1use std::collections::{BTreeMap, HashMap, HashSet};
2use std::path::Path;
3use std::time::Instant;
4
5use async_trait::async_trait;
6use futures::StreamExt;
7use serde::{Deserialize, Serialize};
8use serde_json::{json, Value};
9use simple_agent_type::message::{Message, Role};
10use simple_agent_type::request::CompletionRequest;
11use simple_agent_type::response::FinishReason;
12use simple_agent_type::tool::{
13 ToolCall, ToolChoice, ToolChoiceFunction, ToolChoiceMode, ToolChoiceTool, ToolDefinition,
14 ToolFunction, ToolType,
15};
16use simple_agents_core::{
17 CompletionMode, CompletionOptions, CompletionOutcome, SimpleAgentsClient,
18};
19use simple_agents_healing::JsonishParser;
20use thiserror::Error;
21
22use crate::ir::{Node, NodeKind, RouterRoute, WorkflowDefinition, WORKFLOW_IR_V0};
23use crate::observability::tracing::{
24 flush_workflow_tracer, workflow_tracer, SpanKind, TraceContext, WorkflowSpan,
25};
26use crate::runtime::{
27 LlmExecutionError, LlmExecutionInput, LlmExecutionOutput, LlmExecutor, ToolExecutionError,
28 ToolExecutionInput, ToolExecutor, WorkflowRuntime, WorkflowRuntimeError,
29 WorkflowRuntimeOptions,
30};
31use crate::visualize::workflow_to_mermaid;
32
// Synthetic node ids injected while lowering YAML workflows to IR — TODO
// confirm exact semantics at the yaml->IR conversion use sites (not in view).
const YAML_START_NODE_ID: &str = "__yaml_start";
const YAML_LLM_TOOL_ID: &str = "__yaml_llm_call";

// Process-wide counter XOR-ed into generated trace ids (`generate_trace_id`)
// so two ids minted in the same nanosecond still differ.
static TRACE_ID_COUNTER: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(1);
37
/// Timing (and, for LLM steps, token-usage) record for one executed node.
///
/// Token fields are populated only when the provider reported usage; `None`
/// fields are omitted from the serialized form.
#[derive(Debug, Clone, PartialEq, Serialize)]
pub struct YamlStepTiming {
    /// Id of the executed node.
    pub node_id: String,
    /// Node kind label; "llm_call" marks LLM steps (see `workflow_nerdstats`).
    pub node_kind: String,
    /// Wall-clock duration of this step in milliseconds.
    pub elapsed_ms: u128,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub prompt_tokens: Option<u32>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub completion_tokens: Option<u32>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub total_tokens: Option<u32>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub thinking_tokens: Option<u32>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub tokens_per_second: Option<f64>,
}
54
/// Aggregated metrics for a single LLM node, keyed by node id in
/// `YamlWorkflowRunOutput::llm_node_metrics`.
#[derive(Debug, Clone, PartialEq, Serialize)]
pub struct YamlLlmNodeMetrics {
    /// Wall-clock duration of the LLM call in milliseconds.
    pub elapsed_ms: u128,
    pub prompt_tokens: u32,
    pub completion_tokens: u32,
    pub total_tokens: u32,
    /// Reasoning/thinking tokens, when the provider reports them separately.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub thinking_tokens: Option<u32>,
    /// Completion-token throughput for this call.
    pub tokens_per_second: f64,
}
65
/// Final result of a YAML workflow run: execution trace, per-node outputs,
/// per-step timings, and aggregate token metrics.
#[derive(Debug, Clone, PartialEq, Serialize)]
pub struct YamlWorkflowRunOutput {
    pub workflow_id: String,
    /// Node the run started at.
    pub entry_node: String,
    /// Input text for the run — presumably the source email; confirm at call sites.
    pub email_text: String,
    /// Node ids in execution order.
    pub trace: Vec<String>,
    /// Per-node output payloads, keyed by node id.
    pub outputs: BTreeMap<String, Value>,
    /// Node the run stopped at.
    pub terminal_node: String,
    pub terminal_output: Option<Value>,
    /// One entry per executed step (see `YamlStepTiming`).
    pub step_timings: Vec<YamlStepTiming>,
    /// Aggregated LLM metrics keyed by node id.
    pub llm_node_metrics: BTreeMap<String, YamlLlmNodeMetrics>,
    pub total_elapsed_ms: u128,
    /// Time-to-first-token, when streaming produced one.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub ttft_ms: Option<u128>,
    pub total_input_tokens: u64,
    pub total_output_tokens: u64,
    pub total_tokens: u64,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub total_thinking_tokens: Option<u64>,
    /// Output-token throughput across the whole run.
    pub tokens_per_second: f64,
    /// Telemetry trace id, when telemetry was enabled for the run.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub trace_id: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub metadata: Option<Value>,
}
91
/// Controls whether span payloads are recorded verbatim or replaced by a
/// `{redacted, value_type}` stub (see `payload_for_span`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
#[serde(rename_all = "snake_case")]
pub enum YamlWorkflowPayloadMode {
    #[default]
    FullPayload,
    RedactedPayload,
}
99
/// Controls how tool-call payloads appear in traces: verbatim, redacted to a
/// type stub, or dropped entirely (see `payload_for_tool_trace`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
#[serde(rename_all = "snake_case")]
pub enum YamlToolTraceMode {
    #[default]
    Full,
    Redacted,
    Off,
}
108
/// Caller-supplied distributed-tracing context; all fields optional.
///
/// `traceparent`/`tracestate` follow the W3C Trace Context header format.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
pub struct YamlWorkflowTraceContextInput {
    #[serde(default)]
    pub trace_id: Option<String>,
    #[serde(default)]
    pub span_id: Option<String>,
    #[serde(default)]
    pub parent_span_id: Option<String>,
    #[serde(default)]
    pub traceparent: Option<String>,
    #[serde(default)]
    pub tracestate: Option<String>,
    /// W3C baggage-style key/value pairs propagated to workers.
    #[serde(default)]
    pub baggage: BTreeMap<String, String>,
}
124
/// Tenant identifiers attached to spans and run metadata
/// (see `apply_trace_tenant_attributes` / `workflow_metadata_with_trace`).
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
pub struct YamlWorkflowTraceTenantContext {
    #[serde(default)]
    pub workspace_id: Option<String>,
    #[serde(default)]
    pub user_id: Option<String>,
    #[serde(default)]
    pub conversation_id: Option<String>,
    #[serde(default)]
    pub request_id: Option<String>,
    #[serde(default)]
    pub run_id: Option<String>,
}
138
/// Telemetry knobs for a workflow run.
///
/// Serde defaults mirror the hand-written `Default` impl below: everything on,
/// full payloads, 100% sampling, 30-day retention.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct YamlWorkflowTelemetryConfig {
    /// Master switch; when false no trace id is resolved at all.
    #[serde(default = "default_true")]
    pub enabled: bool,
    /// Emit the detailed stats blob built by `workflow_nerdstats`.
    #[serde(default = "default_true")]
    pub nerdstats: bool,
    /// Probability in [0.0, 1.0] that a trace is sampled (`should_sample_trace`).
    #[serde(default = "default_sample_rate")]
    pub sample_rate: f32,
    #[serde(default)]
    pub payload_mode: YamlWorkflowPayloadMode,
    #[serde(default = "default_retention_days")]
    pub retention_days: u32,
    #[serde(default = "default_true")]
    pub multi_tenant: bool,
    #[serde(default)]
    pub tool_trace_mode: YamlToolTraceMode,
}
156
impl Default for YamlWorkflowTelemetryConfig {
    /// Everything enabled, full payloads, 100% sampling, 30-day retention.
    /// Keep these literals in sync with the serde default helpers
    /// (`default_true`, `default_sample_rate`, `default_retention_days`).
    fn default() -> Self {
        Self {
            enabled: true,
            nerdstats: true,
            sample_rate: 1.0,
            payload_mode: YamlWorkflowPayloadMode::FullPayload,
            retention_days: 30,
            multi_tenant: true,
            tool_trace_mode: YamlToolTraceMode::Full,
        }
    }
}
170
/// Tracing inputs for a run: optional propagated context plus tenant ids.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
pub struct YamlWorkflowTraceOptions {
    #[serde(default)]
    pub context: Option<YamlWorkflowTraceContextInput>,
    #[serde(default)]
    pub tenant: YamlWorkflowTraceTenantContext,
}
178
/// Top-level per-run options: telemetry config, trace context, and an
/// optional model override.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Default)]
pub struct YamlWorkflowRunOptions {
    #[serde(default)]
    pub telemetry: YamlWorkflowTelemetryConfig,
    #[serde(default)]
    pub trace: YamlWorkflowTraceOptions,
    /// Overrides the workflow's model when set — confirm semantics at use sites.
    #[serde(default)]
    pub model: Option<String>,
}
188
/// Provider-reported token usage for one LLM call.
#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
pub struct YamlLlmTokenUsage {
    pub prompt_tokens: u32,
    pub completion_tokens: u32,
    pub total_tokens: u32,
    /// Present only for providers that report reasoning tokens separately.
    pub thinking_tokens: Option<u32>,
}
196
/// Outcome of one LLM node execution: payload, optional usage/TTFT, and any
/// tool-call traces collected along the way.
#[derive(Debug, Clone, PartialEq, Serialize)]
pub struct YamlLlmExecutionResult {
    pub payload: Value,
    pub usage: Option<YamlLlmTokenUsage>,
    /// Time-to-first-token in milliseconds, when streamed.
    pub ttft_ms: Option<u128>,
    pub tool_calls: Vec<YamlToolCallTrace>,
}
204
/// Trace record for a single tool invocation made during an LLM node.
#[derive(Debug, Clone, PartialEq, Serialize)]
pub struct YamlToolCallTrace {
    /// Provider-assigned tool-call id.
    pub id: String,
    /// Tool (function) name.
    pub name: String,
    pub arguments: Value,
    /// Tool result; may be redacted/absent depending on `YamlToolTraceMode`.
    pub output: Option<Value>,
    /// Status label — confirm the exact vocabulary at the producer site.
    pub status: String,
    pub elapsed_ms: u128,
    pub error: Option<String>,
}
215
/// Running token totals accumulated across all LLM calls in a run.
#[derive(Debug, Clone, Default)]
struct YamlTokenTotals {
    input_tokens: u64,
    output_tokens: u64,
    total_tokens: u64,
    // Stays `None` until at least one call reports thinking tokens.
    thinking_tokens: Option<u64>,
}
223
224impl YamlTokenTotals {
225 fn add_usage(&mut self, usage: &YamlLlmTokenUsage) {
226 self.input_tokens += u64::from(usage.prompt_tokens);
227 self.output_tokens += u64::from(usage.completion_tokens);
228 self.total_tokens += u64::from(usage.total_tokens);
229
230 if let Some(thinking_tokens) = usage.thinking_tokens {
231 let next = self.thinking_tokens.unwrap_or(0) + u64::from(thinking_tokens);
232 self.thinking_tokens = Some(next);
233 }
234 }
235
236 fn tokens_per_second(&self, elapsed_ms: u128) -> f64 {
237 if elapsed_ms == 0 {
238 return 0.0;
239 }
240 round_two_decimals((self.output_tokens as f64) * 1000.0 / (elapsed_ms as f64))
241 }
242}
243
/// Rounds `value` to two decimal places using `f64::round`
/// (half-away-from-zero) on the value scaled by 100.
fn round_two_decimals(value: f64) -> f64 {
    let scaled = (value * 100.0).round();
    scaled / 100.0
}
247
/// Completion-token throughput (tokens/sec) rounded to two decimals;
/// returns 0.0 for a zero elapsed time instead of dividing by zero.
fn completion_tokens_per_second(completion_tokens: u32, elapsed_ms: u128) -> f64 {
    match elapsed_ms {
        0 => 0.0,
        ms => {
            let raw = f64::from(completion_tokens) * 1000.0 / (ms as f64);
            // Same rounding as `round_two_decimals`, inlined.
            (raw * 100.0).round() / 100.0
        }
    }
}
254
/// Serde default helper: boolean fields default to `true`.
fn default_true() -> bool {
    true
}
258
/// Serde default helper: sample everything (rate 1.0) unless configured.
fn default_sample_rate() -> f32 {
    1.0
}
262
/// Serde default helper: retain telemetry for 30 days unless configured.
fn default_retention_days() -> u32 {
    30
}
266
267fn validate_sample_rate(sample_rate: f32) -> Result<(), YamlWorkflowRunError> {
268 if sample_rate.is_finite() && (0.0..=1.0).contains(&sample_rate) {
269 return Ok(());
270 }
271
272 Err(YamlWorkflowRunError::InvalidInput {
273 message: format!(
274 "telemetry.sample_rate must be between 0.0 and 1.0 inclusive; received {sample_rate}"
275 ),
276 })
277}
278
/// Deterministic sampling decision for a trace id.
///
/// Rates >= 1.0 always sample and <= 0.0 never sample; otherwise the id is
/// hashed with FNV-1a and the normalized hash is compared against the rate,
/// so the same id always yields the same decision.
fn should_sample_trace(trace_id: &str, sample_rate: f32) -> bool {
    if sample_rate >= 1.0 {
        return true;
    }
    if sample_rate <= 0.0 {
        return false;
    }

    // FNV-1a over the raw id bytes (offset basis / prime below).
    let hash = trace_id.bytes().fold(0xcbf29ce484222325u64, |acc, byte| {
        (acc ^ u64::from(byte)).wrapping_mul(0x100000001b3)
    });

    ((hash as f64) / (u64::MAX as f64)) < f64::from(sample_rate)
}
296
/// Extracts the 32-hex-digit trace id from a W3C `traceparent` header value.
///
/// Expects the version-00 layout `version-traceid-spanid-flags` and returns
/// `None` for malformed values: wrong field count or width, non-hex digits,
/// an all-zero trace id or span id, or the version `ff` that the W3C Trace
/// Context spec explicitly forbids. The returned id is lowercased.
fn trace_id_from_traceparent(traceparent: &str) -> Option<String> {
    let mut parts = traceparent.split('-');
    let version = parts.next()?;
    let trace_id = parts.next()?;
    let span_id = parts.next()?;
    let flags = parts.next()?;
    // Reject trailing fields; this parser only understands the version-00 layout.
    if parts.next().is_some() {
        return None;
    }

    let is_hex = |value: &str| value.chars().all(|ch| ch.is_ascii_hexdigit());
    let all_zero = |value: &str| value.chars().all(|ch| ch == '0');

    // Field widths fixed by the spec: 2 / 32 / 16 / 2 hex digits.
    if version.len() != 2 || trace_id.len() != 32 || span_id.len() != 16 || flags.len() != 2 {
        return None;
    }
    if !is_hex(version) || !is_hex(trace_id) || !is_hex(span_id) || !is_hex(flags) {
        return None;
    }
    // Version 0xff is invalid per spec; all-zero ids mean "absent".
    if version.eq_ignore_ascii_case("ff") || all_zero(trace_id) || all_zero(span_id) {
        return None;
    }

    Some(trace_id.to_ascii_lowercase())
}
323
/// Where a run's trace id came from, in resolution precedence order
/// (see `resolve_run_trace_id_with_source`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum TraceIdSource {
    /// Telemetry disabled; no trace id resolved.
    Disabled,
    /// Explicit `trace.context.trace_id` from the run options.
    ExplicitTraceId,
    /// Inherited from the parent span's trace context.
    ParentTraceId,
    /// Parsed from `trace.context.traceparent` in the run options.
    Traceparent,
    /// Parsed from the parent span's `traceparent`.
    ParentTraceparent,
    /// Freshly minted by `generate_trace_id`.
    Generated,
}
333
/// Resolved telemetry state for a run: the trace id (if any), whether the
/// trace is sampled, and where the id came from.
#[derive(Debug, Clone)]
struct ResolvedTelemetryContext {
    trace_id: Option<String>,
    sampled: bool,
    trace_id_source: TraceIdSource,
}
340
341fn resolve_run_trace_id_with_source(
342 options: &YamlWorkflowRunOptions,
343 parent_trace_context: Option<&TraceContext>,
344) -> (Option<String>, TraceIdSource) {
345 if !options.telemetry.enabled {
346 return (None, TraceIdSource::Disabled);
347 }
348
349 if let Some(trace_id) = options
350 .trace
351 .context
352 .as_ref()
353 .and_then(|context| context.trace_id.clone())
354 {
355 return (Some(trace_id), TraceIdSource::ExplicitTraceId);
356 }
357
358 if let Some(trace_id) = parent_trace_context.and_then(|context| context.trace_id.clone()) {
359 return (Some(trace_id), TraceIdSource::ParentTraceId);
360 }
361
362 if let Some(trace_id) = options
363 .trace
364 .context
365 .as_ref()
366 .and_then(|context| context.traceparent.as_deref())
367 .and_then(trace_id_from_traceparent)
368 {
369 return (Some(trace_id), TraceIdSource::Traceparent);
370 }
371
372 if let Some(trace_id) = parent_trace_context
373 .and_then(|context| context.traceparent.as_deref())
374 .and_then(trace_id_from_traceparent)
375 {
376 return (Some(trace_id), TraceIdSource::ParentTraceparent);
377 }
378
379 (Some(generate_trace_id()), TraceIdSource::Generated)
380}
381
382fn resolve_telemetry_context(
383 options: &YamlWorkflowRunOptions,
384 parent_trace_context: Option<&TraceContext>,
385) -> ResolvedTelemetryContext {
386 let (trace_id, trace_id_source) =
387 resolve_run_trace_id_with_source(options, parent_trace_context);
388 let sampled = trace_id
389 .as_deref()
390 .map(|value| should_sample_trace(value, options.telemetry.sample_rate))
391 .unwrap_or(false);
392
393 ResolvedTelemetryContext {
394 trace_id,
395 sampled,
396 trace_id_source,
397 }
398}
399
400fn generate_trace_id() -> String {
401 use std::time::{SystemTime, UNIX_EPOCH};
402
403 let now_nanos = SystemTime::now()
404 .duration_since(UNIX_EPOCH)
405 .map(|duration| duration.as_nanos())
406 .unwrap_or(0);
407 let sequence = u128::from(TRACE_ID_COUNTER.fetch_add(1, std::sync::atomic::Ordering::Relaxed));
408 format!("{:032x}", now_nanos ^ sequence)
409}
410
411fn workflow_metadata_with_trace(
412 options: &YamlWorkflowRunOptions,
413 trace_id: &str,
414 sampled: bool,
415 trace_id_source: TraceIdSource,
416) -> Value {
417 json!({
418 "telemetry": {
419 "trace_id": trace_id,
420 "trace_id_source": match trace_id_source {
421 TraceIdSource::Disabled => "disabled",
422 TraceIdSource::ExplicitTraceId => "explicit_trace_id",
423 TraceIdSource::ParentTraceId => "parent_trace_id",
424 TraceIdSource::Traceparent => "traceparent",
425 TraceIdSource::ParentTraceparent => "parent_traceparent",
426 TraceIdSource::Generated => "generated",
427 },
428 "enabled": options.telemetry.enabled,
429 "sampled": sampled,
430 "nerdstats": options.telemetry.nerdstats,
431 "sample_rate": options.telemetry.sample_rate,
432 "payload_mode": match options.telemetry.payload_mode {
433 YamlWorkflowPayloadMode::FullPayload => "full_payload",
434 YamlWorkflowPayloadMode::RedactedPayload => "redacted_payload",
435 },
436 "retention_days": options.telemetry.retention_days,
437 "multi_tenant": options.telemetry.multi_tenant,
438 "tool_trace_mode": match options.telemetry.tool_trace_mode {
439 YamlToolTraceMode::Full => "full",
440 YamlToolTraceMode::Redacted => "redacted",
441 YamlToolTraceMode::Off => "off",
442 },
443 },
444 "trace": {
445 "tenant": {
446 "workspace_id": options.trace.tenant.workspace_id,
447 "user_id": options.trace.tenant.user_id,
448 "conversation_id": options.trace.tenant.conversation_id,
449 "request_id": options.trace.tenant.request_id,
450 "run_id": options.trace.tenant.run_id,
451 }
452 },
453 })
454}
455
456fn apply_trace_tenant_attributes(span: &mut dyn WorkflowSpan, options: &YamlWorkflowRunOptions) {
457 if let Some(workspace_id) = options.trace.tenant.workspace_id.as_deref() {
458 span.set_attribute("tenant.workspace_id", workspace_id);
459 }
460 if let Some(user_id) = options.trace.tenant.user_id.as_deref() {
461 span.set_attribute("tenant.user_id", user_id);
462 }
463 if let Some(conversation_id) = options.trace.tenant.conversation_id.as_deref() {
464 span.set_attribute("tenant.conversation_id", conversation_id);
465 }
466 if let Some(request_id) = options.trace.tenant.request_id.as_deref() {
467 span.set_attribute("tenant.request_id", request_id);
468 }
469 if let Some(run_id) = options.trace.tenant.run_id.as_deref() {
470 span.set_attribute("tenant.run_id", run_id);
471 }
472}
473
474fn workflow_nerdstats(output: &YamlWorkflowRunOutput) -> Value {
475 let llm_nodes_without_usage: Vec<String> = output
476 .step_timings
477 .iter()
478 .filter(|step| step.node_kind == "llm_call" && step.total_tokens.is_none())
479 .map(|step| step.node_id.clone())
480 .collect();
481 let token_metrics_available = llm_nodes_without_usage.is_empty();
482 let token_metrics_source = if token_metrics_available {
483 "provider_usage"
484 } else {
485 "provider_stream_usage_unavailable"
486 };
487 let total_input_tokens = if token_metrics_available {
488 json!(output.total_input_tokens)
489 } else {
490 Value::Null
491 };
492 let total_output_tokens = if token_metrics_available {
493 json!(output.total_output_tokens)
494 } else {
495 Value::Null
496 };
497 let total_tokens = if token_metrics_available {
498 json!(output.total_tokens)
499 } else {
500 Value::Null
501 };
502 let total_thinking_tokens = if token_metrics_available {
503 json!(output.total_thinking_tokens)
504 } else {
505 Value::Null
506 };
507 let tokens_per_second = if token_metrics_available {
508 json!(output.tokens_per_second)
509 } else {
510 Value::Null
511 };
512
513 json!({
514 "workflow_id": output.workflow_id,
515 "terminal_node": output.terminal_node,
516 "total_elapsed_ms": output.total_elapsed_ms,
517 "ttft_ms": output.ttft_ms,
518 "step_timings": output.step_timings,
519 "llm_node_metrics": output.llm_node_metrics,
520 "total_input_tokens": total_input_tokens,
521 "total_output_tokens": total_output_tokens,
522 "total_tokens": total_tokens,
523 "total_thinking_tokens": total_thinking_tokens,
524 "tokens_per_second": tokens_per_second,
525 "trace_id": output.trace_id,
526 "token_metrics_available": token_metrics_available,
527 "token_metrics_source": token_metrics_source,
528 "llm_nodes_without_usage": llm_nodes_without_usage,
529 })
530}
531
532fn payload_for_span(mode: YamlWorkflowPayloadMode, payload: &Value) -> String {
533 match mode {
534 YamlWorkflowPayloadMode::FullPayload => payload.to_string(),
535 YamlWorkflowPayloadMode::RedactedPayload => json!({
536 "redacted": true,
537 "value_type": match payload {
538 Value::Null => "null",
539 Value::Bool(_) => "bool",
540 Value::Number(_) => "number",
541 Value::String(_) => "string",
542 Value::Array(_) => "array",
543 Value::Object(_) => "object",
544 }
545 })
546 .to_string(),
547 }
548}
549
550fn payload_for_tool_trace(mode: YamlToolTraceMode, payload: &Value) -> Value {
551 match mode {
552 YamlToolTraceMode::Full => payload.clone(),
553 YamlToolTraceMode::Redacted => json!({
554 "redacted": true,
555 "value_type": json_type_name(payload),
556 }),
557 YamlToolTraceMode::Off => Value::Null,
558 }
559}
560
561fn validate_json_schema(schema: &Value) -> Result<(), String> {
562 jsonschema::JSONSchema::compile(schema)
563 .map(|_| ())
564 .map_err(|error| format!("invalid JSON schema: {error}"))
565}
566
567fn validate_schema_instance(schema: &Value, instance: &Value) -> Result<(), String> {
568 let validator = jsonschema::JSONSchema::compile(schema)
569 .map_err(|error| format!("invalid JSON schema: {error}"))?;
570 if let Err(errors) = validator.validate(instance) {
571 let message = errors
572 .into_iter()
573 .next()
574 .map(|error| error.to_string())
575 .unwrap_or_else(|| "unknown schema validation error".to_string());
576 return Err(format!("schema validation failed: {message}"));
577 }
578 Ok(())
579}
580
581fn schema_type(schema: &Value) -> Option<&str> {
582 schema.get("type").and_then(Value::as_str)
583}
584
585fn schema_expects_object(schema: &Value) -> bool {
586 schema_type(schema) == Some("object")
587}
588
589fn trace_context_from_options(options: &YamlWorkflowRunOptions) -> Option<TraceContext> {
590 options.trace.context.as_ref().map(|input| TraceContext {
591 trace_id: input.trace_id.clone(),
592 span_id: input.span_id.clone(),
593 parent_span_id: input.parent_span_id.clone(),
594 traceparent: input.traceparent.clone(),
595 tracestate: input.tracestate.clone(),
596 baggage: input.baggage.clone(),
597 })
598}
599
600fn merged_trace_context_for_worker(
601 span_context: Option<&TraceContext>,
602 resolved_trace_id: Option<&str>,
603 options: &YamlWorkflowRunOptions,
604) -> TraceContext {
605 let input_context = options.trace.context.as_ref();
606 let baggage = if let Some(context) = span_context {
607 if !context.baggage.is_empty() {
608 context.baggage.clone()
609 } else {
610 input_context
611 .map(|value| value.baggage.clone())
612 .unwrap_or_default()
613 }
614 } else {
615 input_context
616 .map(|value| value.baggage.clone())
617 .unwrap_or_default()
618 };
619
620 TraceContext {
621 trace_id: span_context
622 .and_then(|context| context.trace_id.clone())
623 .or_else(|| resolved_trace_id.map(|value| value.to_string()))
624 .or_else(|| input_context.and_then(|context| context.trace_id.clone())),
625 span_id: span_context
626 .and_then(|context| context.span_id.clone())
627 .or_else(|| input_context.and_then(|context| context.span_id.clone())),
628 parent_span_id: span_context
629 .and_then(|context| context.parent_span_id.clone())
630 .or_else(|| input_context.and_then(|context| context.parent_span_id.clone())),
631 traceparent: span_context
632 .and_then(|context| context.traceparent.clone())
633 .or_else(|| input_context.and_then(|context| context.traceparent.clone())),
634 tracestate: span_context
635 .and_then(|context| context.tracestate.clone())
636 .or_else(|| input_context.and_then(|context| context.tracestate.clone())),
637 baggage,
638 }
639}
640
641fn custom_worker_context_with_trace(
642 context: &Value,
643 trace_context: &TraceContext,
644 tenant_context: &YamlWorkflowTraceTenantContext,
645) -> Value {
646 let mut context_with_trace = context.clone();
647 let Some(root) = context_with_trace.as_object_mut() else {
648 return context_with_trace;
649 };
650
651 root.insert(
652 "trace".to_string(),
653 json!({
654 "context": {
655 "trace_id": trace_context.trace_id,
656 "span_id": trace_context.span_id,
657 "parent_span_id": trace_context.parent_span_id,
658 "traceparent": trace_context.traceparent,
659 "tracestate": trace_context.tracestate,
660 "baggage": trace_context.baggage,
661 },
662 "tenant": {
663 "workspace_id": tenant_context.workspace_id,
664 "user_id": tenant_context.user_id,
665 "conversation_id": tenant_context.conversation_id,
666 "request_id": tenant_context.request_id,
667 "run_id": tenant_context.run_id,
668 }
669 }),
670 );
671
672 context_with_trace
673}
674
/// Debug switch: true when SIMPLE_AGENTS_WORKFLOW_STREAM_INCLUDE_RAW is set
/// to a truthy value ("1"/"true"/"yes"/"on", case-insensitive, trimmed).
fn include_raw_stream_debug_events() -> bool {
    std::env::var("SIMPLE_AGENTS_WORKFLOW_STREAM_INCLUDE_RAW")
        .map(|value| {
            matches!(
                value.trim().to_ascii_lowercase().as_str(),
                "1" | "true" | "yes" | "on"
            )
        })
        .unwrap_or(false)
}
684
/// Result of resolving a streamed completion into structured JSON:
/// the parsed payload plus the healer's confidence when healing was used.
#[derive(Debug)]
struct StreamedPayloadResolution {
    payload: Value,
    // `Some` only when the lenient JsonishParser path produced the payload.
    heal_confidence: Option<f32>,
}
690
/// Accumulates streamed JSON text and, once complete, renders it as
/// human-readable `key: value` lines exactly once.
#[derive(Debug, Default)]
struct StreamJsonAsTextFormatter {
    // Raw JSON accumulated across `push` calls.
    raw_json: String,
    // Guards against emitting the rendered text more than once.
    emitted: bool,
}
696
697impl StreamJsonAsTextFormatter {
698 fn push(&mut self, chunk: &str) {
699 self.raw_json.push_str(chunk);
700 }
701
702 fn emit_if_ready(&mut self, complete: bool) -> Option<String> {
703 if self.emitted || !complete {
704 return None;
705 }
706 self.emitted = true;
707 Some(render_json_object_as_text(self.raw_json.as_str()))
708 }
709}
710
711fn render_json_object_as_text(raw_json: &str) -> String {
712 let value = match serde_json::from_str::<Value>(raw_json) {
713 Ok(value) => value,
714 Err(_) => return raw_json.to_string(),
715 };
716 let Some(object) = value.as_object() else {
717 return raw_json.to_string();
718 };
719
720 let mut lines = Vec::with_capacity(object.len());
721 for (key, value) in object {
722 let rendered = match value {
723 Value::String(text) => text.clone(),
724 _ => value.to_string(),
725 };
726 lines.push(format!("{key}: {rendered}"));
727 }
728 lines.join("\n")
729}
730
/// Classifies a streamed token as part of the structured output or as
/// surrounding "thinking" text (see `StructuredJsonDeltaFilter`).
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum YamlWorkflowTokenKind {
    Output,
    Thinking,
}
737
/// Incremental scanner that separates a streamed top-level JSON object from
/// the text before/after it. State persists across `split` calls so deltas
/// may be cut at arbitrary byte boundaries.
#[derive(Debug, Default)]
struct StructuredJsonDeltaFilter {
    // Seen the opening '{' of the top-level object.
    started: bool,
    // The top-level object has fully closed.
    completed: bool,
    // Current brace nesting depth inside the object.
    depth: u32,
    // Currently inside a JSON string literal.
    in_string: bool,
    // Previous char was a backslash inside a string.
    escape: bool,
}
746
impl StructuredJsonDeltaFilter {
    /// Splits a streamed delta into (structured-output chars, thinking chars).
    ///
    /// Text before the first `{` and after the top-level object closes is
    /// classified as "thinking"; the balanced object in between (including its
    /// braces) is "output". Returns `None` for an empty side.
    fn split(&mut self, delta: &str) -> (Option<String>, Option<String>) {
        if delta.is_empty() {
            return (None, None);
        }

        let mut output = String::new();
        let mut thinking = String::new();

        for ch in delta.chars() {
            // Once the object has closed, everything else is trailing text.
            if self.completed {
                thinking.push(ch);
                continue;
            }

            // Before the first '{', chars are preamble (thinking) text.
            if !self.started {
                if ch != '{' {
                    thinking.push(ch);
                    continue;
                }
                self.started = true;
                self.depth = 1;
                output.push(ch);
                continue;
            }

            output.push(ch);
            // Inside a string literal: only track escapes and the closing
            // quote — braces inside strings must not affect nesting depth.
            if self.in_string {
                if self.escape {
                    self.escape = false;
                    continue;
                }
                if ch == '\\' {
                    self.escape = true;
                    continue;
                }
                if ch == '"' {
                    self.in_string = false;
                }
                continue;
            }

            match ch {
                '"' => self.in_string = true,
                '{' => self.depth = self.depth.saturating_add(1),
                '}' => {
                    if self.depth > 0 {
                        self.depth -= 1;
                    }
                    // Depth back to zero: the top-level object just closed.
                    if self.depth == 0 {
                        self.completed = true;
                    }
                }
                _ => {}
            }
        }

        let output = if output.is_empty() {
            None
        } else {
            Some(output)
        };
        let thinking = if thinking.is_empty() {
            None
        } else {
            Some(thinking)
        };

        (output, thinking)
    }

    /// True once the top-level JSON object has been fully consumed.
    fn completed(&self) -> bool {
        self.completed
    }
}
822
/// Returns the trimmed contents of the LAST ```json fenced block in `raw`,
/// or `None` when there is no fence, no closing ```, or an empty body.
fn extract_last_fenced_json_block(raw: &str) -> Option<&str> {
    // Split at the last opening fence, then take everything up to the next
    // closing fence.
    let (_, tail) = raw.rsplit_once("```json")?;
    let (body, _) = tail.split_once("```")?;
    let candidate = body.trim();
    (!candidate.is_empty()).then_some(candidate)
}
833
/// Scans `raw` from `start_index` and returns the first brace-balanced
/// `{…}` span, respecting string literals and backslash escapes so braces
/// inside strings are ignored. Returns `None` when the braces never balance
/// or a stray `}` appears before any `{`.
fn extract_balanced_object_from(raw: &str, start_index: usize) -> Option<&str> {
    let mut depth: u32 = 0;
    let mut in_string = false;
    let mut escape = false;

    for (offset, ch) in raw[start_index..].char_indices() {
        if in_string {
            if escape {
                escape = false;
            } else if ch == '\\' {
                escape = true;
            } else if ch == '"' {
                in_string = false;
            }
            continue;
        }

        match ch {
            '"' => in_string = true,
            '{' => depth = depth.saturating_add(1),
            // A close brace before any open brace means the span is invalid.
            '}' if depth == 0 => return None,
            '}' => {
                depth -= 1;
                if depth == 0 {
                    let end = start_index + offset + ch.len_utf8();
                    return Some(raw[start_index..end].trim());
                }
            }
            _ => {}
        }
    }

    None
}
874
875fn extract_last_parsable_object(raw: &str) -> Option<&str> {
876 let starts: Vec<usize> = raw
877 .char_indices()
878 .filter_map(|(index, ch)| if ch == '{' { Some(index) } else { None })
879 .collect();
880
881 for start in starts.into_iter().rev() {
882 let Some(candidate) = extract_balanced_object_from(raw, start) else {
883 continue;
884 };
885 if serde_json::from_str::<Value>(candidate).is_ok() {
886 return Some(candidate);
887 }
888 }
889
890 None
891}
892
893fn resolve_structured_json_candidate(raw: &str) -> Option<&str> {
894 extract_last_fenced_json_block(raw).or_else(|| extract_last_parsable_object(raw))
895}
896
/// Parses the accumulated streamed completion text into a JSON payload.
///
/// With `heal == false` the text must parse strictly: first as-is, then via
/// the best candidate from `resolve_structured_json_candidate`; failures are
/// returned as error strings. With `heal == true` the candidate (or the raw
/// text when no candidate is found) goes through the lenient `JsonishParser`,
/// whose confidence is surfaced in the result.
fn parse_streamed_structured_payload(
    raw: &str,
    heal: bool,
) -> Result<StreamedPayloadResolution, String> {
    if !heal {
        // Fast path: the whole stream is already valid JSON.
        if let Ok(payload) = serde_json::from_str::<Value>(raw) {
            return Ok(StreamedPayloadResolution {
                payload,
                heal_confidence: None,
            });
        }

        // Otherwise try to locate an embedded JSON object and parse that.
        let candidate = resolve_structured_json_candidate(raw).ok_or_else(|| {
            "failed to parse streamed structured completion JSON: no JSON object candidate found"
                .to_string()
        })?;
        let payload = serde_json::from_str::<Value>(candidate).map_err(|error| {
            format!(
                "failed to parse streamed structured completion JSON: {error}; candidate={candidate}"
            )
        })?;
        return Ok(StreamedPayloadResolution {
            payload,
            heal_confidence: None,
        });
    }

    // Healing path: feed the candidate (or the raw text) to the lenient parser.
    let candidate = resolve_structured_json_candidate(raw).unwrap_or(raw);
    let parser = JsonishParser::new();
    let healed = parser
        .parse(candidate)
        .map_err(|error| format!("failed to heal streamed structured completion JSON: {error}"))?;

    Ok(StreamedPayloadResolution {
        payload: healed.value,
        heal_confidence: Some(healed.confidence),
    })
}
935
/// A single event emitted to a `YamlWorkflowEventSink` during a run.
///
/// Only `event_type` is always meaningful; the remaining fields are populated
/// per event type — confirm the exact vocabulary at the emission sites.
#[derive(Debug, Clone, PartialEq, Serialize)]
pub struct YamlWorkflowEvent {
    pub event_type: String,
    pub node_id: Option<String>,
    pub step_id: Option<String>,
    pub node_kind: Option<String>,
    pub streamable: Option<bool>,
    pub message: Option<String>,
    /// Incremental streamed text for token events.
    pub delta: Option<String>,
    /// Whether `delta` is output or thinking text.
    pub token_kind: Option<YamlWorkflowTokenKind>,
    pub is_terminal_node_token: Option<bool>,
    pub elapsed_ms: Option<u128>,
    pub metadata: Option<Value>,
}
950
/// Workflow messages reuse the provider `Role` enum directly.
pub type WorkflowMessageRole = Role;

/// A chat message carried through a workflow run.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct WorkflowMessage {
    pub role: WorkflowMessageRole,
    pub content: String,
    /// Optional author/tool name.
    #[serde(default)]
    pub name: Option<String>,
    /// Id of the tool call this message answers; also accepts camelCase
    /// `toolCallId` on deserialization.
    #[serde(default, alias = "toolCallId")]
    pub tool_call_id: Option<String>,
}
962
/// Record of one template-expression resolution: what was asked for, where
/// the value came from, and what it resolved to.
#[derive(Debug, Clone, PartialEq, Serialize)]
pub struct YamlTemplateBinding {
    /// Position of the expression within the template.
    pub index: usize,
    /// The raw template expression text.
    pub expression: String,
    /// Path in the source data the expression resolved against.
    pub source_path: String,
    /// The resolved value (may be `Value::Null`).
    pub resolved: Value,
    /// JSON type name of the resolved value.
    pub resolved_type: String,
    /// True when the expression did not resolve to an existing value.
    pub missing: bool,
}
972
/// Severity of a workflow validation finding.
#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
pub enum YamlWorkflowDiagnosticSeverity {
    Error,
    Warning,
}

/// One validation finding produced while checking a YAML workflow.
#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
pub struct YamlWorkflowDiagnostic {
    /// Offending node id, when the finding is node-specific.
    pub node_id: Option<String>,
    /// Stable machine-readable code for the finding.
    pub code: String,
    pub severity: YamlWorkflowDiagnosticSeverity,
    /// Human-readable description.
    pub message: String,
}
986
/// Errors surfaced while loading, validating, or executing a YAML workflow.
/// Display strings come from the `#[error]` attributes.
#[derive(Debug, Error)]
pub enum YamlWorkflowRunError {
    #[error("failed to read workflow yaml '{path}': {source}")]
    Read {
        path: String,
        source: std::io::Error,
    },
    #[error("failed to parse workflow yaml '{path}': {source}")]
    Parse {
        path: String,
        source: serde_yaml::Error,
    },
    #[error("workflow '{workflow_id}' has no nodes")]
    EmptyNodes { workflow_id: String },
    #[error("entry node '{entry_node}' does not exist")]
    MissingEntry { entry_node: String },
    #[error("unknown node id '{node_id}'")]
    MissingNode { node_id: String },
    #[error("unsupported node type in '{node_id}'")]
    UnsupportedNodeType { node_id: String },
    #[error("unsupported switch condition format: {condition}")]
    UnsupportedCondition { condition: String },
    #[error("switch node '{node_id}' has no valid next target")]
    InvalidSwitchTarget { node_id: String },
    #[error("llm returned non-object payload for node '{node_id}'")]
    LlmPayloadNotObject { node_id: String },
    #[error("custom worker handler '{handler}' is not supported")]
    UnsupportedCustomHandler { handler: String },
    #[error("llm execution failed for node '{node_id}': {message}")]
    Llm { node_id: String, message: String },
    #[error("custom worker execution failed for node '{node_id}': {message}")]
    CustomWorker { node_id: String, message: String },
    /// Validation failed; `diagnostics` carries the full findings while
    /// `diagnostics_count` is the number reported in the message.
    #[error("workflow validation failed with {diagnostics_count} error(s)")]
    Validation {
        diagnostics_count: usize,
        diagnostics: Vec<YamlWorkflowDiagnostic>,
    },
    #[error("invalid workflow input: {message}")]
    InvalidInput { message: String },
    #[error("ir runtime execution failed: {message}")]
    IrRuntime { message: String },
    #[error("workflow event stream cancelled: {message}")]
    EventSinkCancelled { message: String },
}
1031
/// Receiver for workflow events emitted during a run.
///
/// Implementations must be `Send + Sync`; `is_cancelled` defaults to
/// "never cancelled" and may be polled to stop a run early
/// (see `event_sink_is_cancelled`).
pub trait YamlWorkflowEventSink: Send + Sync {
    /// Called once per emitted event.
    fn emit(&self, event: &YamlWorkflowEvent);

    /// Return `true` to request early cancellation of the run.
    fn is_cancelled(&self) -> bool {
        false
    }
}
1039
/// Event sink that silently discards every event.
pub struct NoopYamlWorkflowEventSink;

impl YamlWorkflowEventSink for NoopYamlWorkflowEventSink {
    fn emit(&self, _event: &YamlWorkflowEvent) {}
}
1045
/// Canonical message text used when an event sink reports cancellation.
fn workflow_event_sink_cancelled_message() -> &'static str {
    "workflow event callback cancelled"
}
1049
1050fn event_sink_is_cancelled(event_sink: Option<&dyn YamlWorkflowEventSink>) -> bool {
1051 event_sink.map(|sink| sink.is_cancelled()).unwrap_or(false)
1052}
1053
/// Errors produced while lowering a YAML workflow into the IR.
/// Display strings come from the `#[error]` attributes.
#[derive(Debug, Clone, PartialEq, Eq, Error)]
pub enum YamlToIrError {
    #[error("entry node '{entry_node}' does not exist")]
    MissingEntry { entry_node: String },
    #[error("node '{node_id}' has multiple outgoing edges in YAML; IR llm/tool nodes require one")]
    MultipleOutgoingEdge { node_id: String },
    #[error("node '{node_id}' is unsupported for IR conversion: {reason}")]
    UnsupportedNode { node_id: String, reason: String },
}
1063
1064pub fn yaml_workflow_to_mermaid(workflow: &YamlWorkflow) -> String {
1066 if let Ok(ir) = yaml_workflow_to_ir(workflow) {
1067 return workflow_to_mermaid(&ir);
1068 }
1069
1070 yaml_workflow_to_mermaid_fallback(workflow)
1071}
1072
/// Direct YAML-to-Mermaid renderer used when IR lowering fails.
///
/// Emits a `flowchart TD` with one node per workflow node (labelled
/// "id\n(kind)"), stadium-shaped satellite nodes for each LLM tool linked by
/// dotted edges, plain edges from the workflow edge list, labelled edges for
/// switch branches (plus a "default" edge), and a class styling the tool
/// nodes. Edges are deduplicated and sorted for deterministic output.
fn yaml_workflow_to_mermaid_fallback(workflow: &YamlWorkflow) -> String {
    let mut lines = Vec::new();
    lines.push("flowchart TD".to_string());
    let mut tool_node_ids: Vec<String> = Vec::new();

    // Declare every workflow node; LLM nodes also get one satellite node per
    // attached tool, connected by a dotted edge.
    for node in &workflow.nodes {
        let label = format!("{}\\n({})", node.id, node.kind_name());
        lines.push(format!(
            "    {}[\"{}\"]",
            sanitize_mermaid_id(&node.id),
            escape_mermaid_label(label.as_str()),
        ));

        if let Some(llm) = node.node_type.llm_call.as_ref() {
            for (idx, tool_name) in llm_tool_names(llm).into_iter().enumerate() {
                let tool_id = sanitize_mermaid_id(format!("{}__tool_{}", node.id, idx).as_str());
                lines.push(format!(
                    "    {}([\"{}\"])",
                    tool_id,
                    escape_mermaid_label(format!("tool: {tool_name}").as_str())
                ));
                lines.push(format!(
                    "    {} -.-> {}",
                    sanitize_mermaid_id(&node.id),
                    tool_id
                ));
                tool_node_ids.push(tool_id);
            }
        }
    }

    // Collect (from, label, to) triples in a set to deduplicate; an empty
    // label marks an unconditional edge.
    let mut emitted: HashSet<(String, String, String)> = HashSet::new();

    for edge in &workflow.edges {
        emitted.insert((edge.from.clone(), String::new(), edge.to.clone()));
    }

    // Switch nodes contribute one labelled edge per branch plus the default.
    for node in &workflow.nodes {
        if let Some(switch) = node.node_type.switch.as_ref() {
            for branch in &switch.branches {
                emitted.insert((
                    node.id.clone(),
                    branch.condition.clone(),
                    branch.target.clone(),
                ));
            }
            emitted.insert((
                node.id.clone(),
                "default".to_string(),
                switch.default.clone(),
            ));
        }
    }

    // Sort for deterministic output regardless of HashSet iteration order.
    let mut edges = emitted.into_iter().collect::<Vec<_>>();
    edges.sort();

    for (from, label, to) in edges {
        if label.is_empty() {
            lines.push(format!(
                "    {} --> {}",
                sanitize_mermaid_id(&from),
                sanitize_mermaid_id(&to)
            ));
        } else {
            lines.push(format!(
                "    {} -- \"{}\" --> {}",
                sanitize_mermaid_id(&from),
                escape_mermaid_label(&label),
                sanitize_mermaid_id(&to)
            ));
        }
    }

    if !tool_node_ids.is_empty() {
        lines.push("    classDef toolNode fill:#FFF4D6,stroke:#D97706,color:#7C2D12;".to_string());
        lines.push(format!("    class {} toolNode;", tool_node_ids.join(",")));
    }

    lines.join("\n")
}
1154
1155fn llm_tool_names(llm: &YamlLlmCall) -> Vec<String> {
1156 llm.tools
1157 .iter()
1158 .map(|tool| match tool {
1159 YamlToolDeclaration::OpenAi(openai) => openai.function.name.clone(),
1160 YamlToolDeclaration::Simplified(simple) => simple.name.clone(),
1161 })
1162 .collect()
1163}
1164
1165pub fn yaml_workflow_file_to_mermaid(workflow_path: &Path) -> Result<String, YamlWorkflowRunError> {
1167 let contents =
1168 std::fs::read_to_string(workflow_path).map_err(|source| YamlWorkflowRunError::Read {
1169 path: workflow_path.display().to_string(),
1170 source,
1171 })?;
1172
1173 let workflow: YamlWorkflow =
1174 serde_yaml::from_str(&contents).map_err(|source| YamlWorkflowRunError::Parse {
1175 path: workflow_path.display().to_string(),
1176 source,
1177 })?;
1178
1179 let referenced_subgraphs = discover_referenced_subgraphs(workflow_path, &workflow)?;
1180 if referenced_subgraphs.is_empty() {
1181 return Ok(yaml_workflow_to_mermaid(&workflow));
1182 }
1183
1184 Ok(yaml_workflow_to_mermaid_with_subgraphs(
1185 &workflow,
1186 &referenced_subgraphs,
1187 ))
1188}
1189
/// A sibling workflow referenced by the main workflow, to be rendered as a
/// Mermaid subgraph labelled with `alias`.
#[derive(Debug, Clone)]
struct MermaidSubgraphWorkflow {
    // Workflow id exactly as the caller referenced it (pre-normalization).
    alias: String,
    workflow: YamlWorkflow,
}
1195
/// Intermediate rendering state for one workflow's Mermaid statements.
#[derive(Debug, Default)]
struct MermaidBlockRender {
    // Statements without outer indentation; the caller indents per cluster.
    lines: Vec<String>,
    // All tool satellite node ids (styled via `classDef toolNode`).
    tool_node_ids: Vec<String>,
    // Subset of tool nodes named `run_workflow_graph`; they get dotted
    // call edges into referenced subgraphs.
    run_workflow_tool_node_ids: Vec<String>,
    // Prefixed + sanitized id of the workflow's entry node.
    entry_node_id: String,
}
1203
/// Renders the main workflow plus each referenced sibling workflow as
/// labelled Mermaid subgraph clusters, with dotted edges from every
/// `run_workflow_graph` tool node to each subgraph's entry node.
fn yaml_workflow_to_mermaid_with_subgraphs(
    workflow: &YamlWorkflow,
    subgraphs: &[MermaidSubgraphWorkflow],
) -> String {
    let mut lines = vec!["flowchart TD".to_string()];

    let main_block = render_mermaid_block(workflow, "main");
    lines.push(format!(
        "    subgraph main_graph[\"Main: {}\"]",
        escape_mermaid_label(&workflow.id)
    ));
    for line in &main_block.lines {
        lines.push(format!("        {line}"));
    }
    lines.push("    end".to_string());

    let mut all_tool_nodes = main_block.tool_node_ids.clone();

    for (index, subgraph) in subgraphs.iter().enumerate() {
        let block_id = format!("subgraph_{}", index + 1);
        let block = render_mermaid_block(&subgraph.workflow, &block_id);
        lines.push(format!(
            "    subgraph {}[\"Subgraph: {}\"]",
            sanitize_mermaid_id(&format!("{}_cluster", block_id)),
            escape_mermaid_label(&subgraph.alias)
        ));
        for line in &block.lines {
            lines.push(format!("        {line}"));
        }
        lines.push("    end".to_string());

        // NOTE(review): every run_workflow_graph tool node is linked to every
        // subgraph — nothing here pins which tool calls which subgraph.
        for tool_node in &main_block.run_workflow_tool_node_ids {
            lines.push(format!(
                "    {} -. \"{}\" .-> {}",
                tool_node,
                escape_mermaid_label(&format!("calls {}", subgraph.alias)),
                block.entry_node_id
            ));
        }

        all_tool_nodes.extend(block.tool_node_ids);
    }

    if !all_tool_nodes.is_empty() {
        lines.push("    classDef toolNode fill:#FFF4D6,stroke:#D97706,color:#7C2D12;".to_string());
        lines.push(format!("    class {} toolNode;", all_tool_nodes.join(",")));
    }

    lines.join("\n")
}
1254
/// Renders one workflow into Mermaid statements with every node id prefixed
/// by `prefix`, so multiple workflows can coexist in a single diagram.
fn render_mermaid_block(workflow: &YamlWorkflow, prefix: &str) -> MermaidBlockRender {
    let mut block = MermaidBlockRender {
        entry_node_id: prefixed_mermaid_id(prefix, &workflow.entry_node),
        ..Default::default()
    };

    // Node declarations, plus rounded satellite nodes for llm_call tools.
    for node in &workflow.nodes {
        let node_id = prefixed_mermaid_id(prefix, &node.id);
        let label = format!("{}\\n({})", node.id, node.kind_name());
        block
            .lines
            .push(format!("{}[\"{}\"]", node_id, escape_mermaid_label(&label)));

        if let Some(llm) = node.node_type.llm_call.as_ref() {
            for (idx, tool) in llm.tools.iter().enumerate() {
                let tool_name = tool_declaration_name(tool);
                let tool_id = prefixed_mermaid_id(prefix, &format!("{}__tool_{}", node.id, idx));
                block.lines.push(format!(
                    "{}([\"{}\"])",
                    tool_id,
                    escape_mermaid_label(format!("tool: {tool_name}").as_str())
                ));
                block.lines.push(format!("{} -.-> {}", node_id, tool_id));
                block.tool_node_ids.push(tool_id.clone());

                // Track subgraph-invoking tools so the caller can draw
                // cross-cluster call edges.
                if tool_name == "run_workflow_graph" {
                    block.run_workflow_tool_node_ids.push(tool_id);
                }
            }
        }
    }

    // Deduplicate edges via a set keyed by (from, label, to); plain edges
    // use an empty label, switch branches use their condition text.
    let mut emitted: HashSet<(String, String, String)> = HashSet::new();

    for edge in &workflow.edges {
        emitted.insert((edge.from.clone(), String::new(), edge.to.clone()));
    }

    for node in &workflow.nodes {
        if let Some(switch) = node.node_type.switch.as_ref() {
            for branch in &switch.branches {
                emitted.insert((
                    node.id.clone(),
                    branch.condition.clone(),
                    branch.target.clone(),
                ));
            }
            emitted.insert((
                node.id.clone(),
                "default".to_string(),
                switch.default.clone(),
            ));
        }
    }

    // Sort so output is deterministic despite HashSet iteration order.
    let mut edges = emitted.into_iter().collect::<Vec<_>>();
    edges.sort();

    for (from, label, to) in edges {
        let from_id = prefixed_mermaid_id(prefix, &from);
        let to_id = prefixed_mermaid_id(prefix, &to);
        if label.is_empty() {
            block.lines.push(format!("{} --> {}", from_id, to_id));
        } else {
            block.lines.push(format!(
                "{} -- \"{}\" --> {}",
                from_id,
                escape_mermaid_label(&label),
                to_id
            ));
        }
    }

    block
}
1330
1331fn discover_referenced_subgraphs(
1332 workflow_path: &Path,
1333 workflow: &YamlWorkflow,
1334) -> Result<Vec<MermaidSubgraphWorkflow>, YamlWorkflowRunError> {
1335 let workflow_ids = referenced_workflow_ids(workflow);
1336 if workflow_ids.is_empty() {
1337 return Ok(Vec::new());
1338 }
1339
1340 let parent_dir = workflow_path.parent().unwrap_or(Path::new("."));
1341 let sibling_workflows = load_yaml_sibling_workflows(parent_dir, workflow_path)?;
1342
1343 let mut discovered = Vec::new();
1344 let mut seen = HashSet::new();
1345
1346 for workflow_id in workflow_ids {
1347 let normalized = normalize_workflow_lookup_key(&workflow_id);
1348 if seen.contains(&normalized) {
1349 continue;
1350 }
1351
1352 if let Some((_, subworkflow)) = sibling_workflows.iter().find(|(key, _)| key == &normalized)
1353 {
1354 discovered.push(MermaidSubgraphWorkflow {
1355 alias: workflow_id.clone(),
1356 workflow: subworkflow.clone(),
1357 });
1358 seen.insert(normalized);
1359 }
1360 }
1361
1362 Ok(discovered)
1363}
1364
/// Parses every other YAML file in `parent_dir` and returns lookup entries
/// keyed by both the normalized file stem and the normalized workflow id, so
/// a workflow can be found under either name. Any read/parse failure aborts
/// the whole scan.
fn load_yaml_sibling_workflows(
    parent_dir: &Path,
    workflow_path: &Path,
) -> Result<Vec<(String, YamlWorkflow)>, YamlWorkflowRunError> {
    let mut results = Vec::new();
    let entries = std::fs::read_dir(parent_dir).map_err(|source| YamlWorkflowRunError::Read {
        path: parent_dir.display().to_string(),
        source,
    })?;

    for entry in entries {
        let entry = entry.map_err(|source| YamlWorkflowRunError::Read {
            path: parent_dir.display().to_string(),
            source,
        })?;
        let path = entry.path();
        if !is_yaml_file(&path) {
            continue;
        }

        // NOTE(review): lexical path comparison — a differently spelled path
        // to the same file would not be skipped; confirm callers always pass
        // a path located inside `parent_dir`.
        if path == workflow_path {
            continue;
        }

        let contents =
            std::fs::read_to_string(&path).map_err(|source| YamlWorkflowRunError::Read {
                path: path.display().to_string(),
                source,
            })?;
        let subworkflow: YamlWorkflow =
            serde_yaml::from_str(&contents).map_err(|source| YamlWorkflowRunError::Parse {
                path: path.display().to_string(),
                source,
            })?;

        // Register under the file stem first, then the declared workflow id.
        if let Some(stem) = path.file_stem().and_then(|value| value.to_str()) {
            results.push((normalize_workflow_lookup_key(stem), subworkflow.clone()));
        }
        results.push((normalize_workflow_lookup_key(&subworkflow.id), subworkflow));
    }

    Ok(results)
}
1408
/// Collects workflow ids referenced through `run_workflow_graph` tools, in
/// first-seen order, deduplicated by normalized lookup key. Ids come from
/// the tool's JSON schema and, additionally, from the node's prompt text.
fn referenced_workflow_ids(workflow: &YamlWorkflow) -> Vec<String> {
    let mut ids = Vec::new();
    let mut seen = HashSet::new();

    for node in &workflow.nodes {
        let prompt = node
            .config
            .as_ref()
            .and_then(|config| config.prompt.as_deref());

        if let Some(llm) = node.node_type.llm_call.as_ref() {
            for tool in &llm.tools {
                // Only the subgraph-invoking tool can reference workflows.
                if tool_declaration_name(tool) != "run_workflow_graph" {
                    continue;
                }

                // Ids pinned in the tool's schema (`const` / `enum`).
                for workflow_id in referenced_workflow_ids_from_tool(tool) {
                    let normalized = normalize_workflow_lookup_key(&workflow_id);
                    if seen.insert(normalized) {
                        ids.push(workflow_id);
                    }
                }

                // Ids mentioned inline in the prompt. The prompt is re-scanned
                // once per matching tool; `seen` keeps the output stable.
                if let Some(prompt_text) = prompt {
                    for workflow_id in referenced_workflow_ids_from_prompt(prompt_text) {
                        let normalized = normalize_workflow_lookup_key(&workflow_id);
                        if seen.insert(normalized) {
                            ids.push(workflow_id);
                        }
                    }
                }
            }
        }
    }

    ids
}
1446
1447fn referenced_workflow_ids_from_tool(tool: &YamlToolDeclaration) -> Vec<String> {
1448 let schema = match tool {
1449 YamlToolDeclaration::OpenAi(openai) => openai.function.parameters.as_ref(),
1450 YamlToolDeclaration::Simplified(simple) => Some(&simple.input_schema),
1451 };
1452
1453 let mut ids = Vec::new();
1454 if let Some(schema) = schema {
1455 if let Some(workflow_prop) = schema
1456 .get("properties")
1457 .and_then(Value::as_object)
1458 .and_then(|properties| properties.get("workflow_id"))
1459 {
1460 if let Some(value) = workflow_prop.get("const").and_then(Value::as_str) {
1461 ids.push(value.to_string());
1462 }
1463
1464 if let Some(enum_values) = workflow_prop.get("enum").and_then(Value::as_array) {
1465 for value in enum_values {
1466 if let Some(value) = value.as_str() {
1467 ids.push(value.to_string());
1468 }
1469 }
1470 }
1471 }
1472 }
1473
1474 ids
1475}
1476
/// Scans prompt text for inline `"workflow_id": "<id>"` occurrences and
/// returns every non-empty quoted id, in order, without deduplication.
fn referenced_workflow_ids_from_prompt(prompt: &str) -> Vec<String> {
    const KEY: &str = "\"workflow_id\"";

    let mut ids = Vec::new();
    let mut remaining = prompt;

    while let Some(key_index) = remaining.find(KEY) {
        let after_key = &remaining[key_index + KEY.len()..];
        let Some(colon_index) = after_key.find(':') else {
            break;
        };
        let after_colon = &after_key[colon_index + 1..];

        // A valid value is a double-quoted string; capture its contents and
        // the text following the closing quote.
        let parsed = after_colon.trim_start().strip_prefix('"').and_then(|rest| {
            rest.find('"')
                .map(|end_quote| (rest[..end_quote].trim(), &rest[end_quote + 1..]))
        });

        match parsed {
            Some((workflow_id, rest)) => {
                if !workflow_id.is_empty() {
                    ids.push(workflow_id.to_string());
                }
                remaining = rest;
            }
            // Unquoted or unterminated value: resume scanning after the colon.
            None => {
                remaining = after_colon;
            }
        }
    }

    ids
}
1504
/// Normalizes a workflow id or file stem for lookup: ASCII letters are
/// lowercased and hyphens become underscores, so `My-Flow` and `my_flow`
/// compare equal.
fn normalize_workflow_lookup_key(value: &str) -> String {
    value.to_ascii_lowercase().replace('-', "_")
}
1514
/// Returns `true` for paths whose extension is exactly `yaml` or `yml`
/// (case-sensitive).
fn is_yaml_file(path: &Path) -> bool {
    path.extension()
        .and_then(|ext| ext.to_str())
        .is_some_and(|ext| ext == "yaml" || ext == "yml")
}
1521
1522fn prefixed_mermaid_id(prefix: &str, id: &str) -> String {
1523 sanitize_mermaid_id(format!("{}__{}", prefix, id).as_str())
1524}
1525
/// Returns the declared tool name for either YAML tool declaration shape.
fn tool_declaration_name(tool: &YamlToolDeclaration) -> &str {
    match tool {
        YamlToolDeclaration::OpenAi(openai) => &openai.function.name,
        YamlToolDeclaration::Simplified(simple) => &simple.name,
    }
}
1532
/// Converts a YAML workflow into the canonical IR [`WorkflowDefinition`].
///
/// A synthetic start node (`__yaml_start`) is prepended that points at the
/// YAML entry node. `llm_call` nodes lower to IR tool nodes invoking the
/// built-in `__yaml_llm_call` tool, `custom_worker` nodes lower to plain
/// tool nodes, and `switch` nodes lower to routers. Any feature the IR
/// cannot express yet (globals mutation, llm tools, unknown node types) is
/// rejected with [`YamlToIrError::UnsupportedNode`].
pub fn yaml_workflow_to_ir(workflow: &YamlWorkflow) -> Result<WorkflowDefinition, YamlToIrError> {
    let known_ids: HashSet<&str> = workflow.nodes.iter().map(|n| n.id.as_str()).collect();
    if !known_ids.contains(workflow.entry_node.as_str()) {
        return Err(YamlToIrError::MissingEntry {
            entry_node: workflow.entry_node.clone(),
        });
    }

    // Adjacency list of YAML edges; llm/tool IR nodes allow at most one
    // outgoing edge (enforced below by `single_next_for_node`).
    let mut outgoing: HashMap<&str, Vec<&str>> = HashMap::new();
    for edge in &workflow.edges {
        outgoing
            .entry(edge.from.as_str())
            .or_default()
            .push(edge.to.as_str());
    }

    // Synthetic start node pointing at the YAML entry node.
    let mut nodes = Vec::with_capacity(workflow.nodes.len() + 1);
    nodes.push(Node {
        id: YAML_START_NODE_ID.to_string(),
        kind: NodeKind::Start {
            next: workflow.entry_node.clone(),
        },
    });

    for node in &workflow.nodes {
        if let Some(llm) = node.node_type.llm_call.as_ref() {
            // Globals mutation has no IR representation yet — refuse rather
            // than silently drop it.
            if node
                .config
                .as_ref()
                .and_then(|c| c.set_globals.as_ref())
                .is_some()
                || node
                    .config
                    .as_ref()
                    .and_then(|c| c.update_globals.as_ref())
                    .is_some()
            {
                return Err(YamlToIrError::UnsupportedNode {
                    node_id: node.id.clone(),
                    reason: "set_globals/update_globals are not represented in canonical IR llm nodes yet"
                        .to_string(),
                });
            }

            if !llm.tools.is_empty() {
                return Err(YamlToIrError::UnsupportedNode {
                    node_id: node.id.clone(),
                    reason: "llm_call.tools are not represented in canonical IR llm nodes yet"
                        .to_string(),
                });
            }

            // llm_call lowers to a Tool node invoking `__yaml_llm_call`, with
            // the node's LLM configuration captured as the tool's JSON input.
            let next = single_next_for_node(&outgoing, &node.id)?;
            nodes.push(Node {
                id: node.id.clone(),
                kind: NodeKind::Tool {
                    tool: YAML_LLM_TOOL_ID.to_string(),
                    input: json!({
                        "node_id": node.id,
                        "model": llm.model,
                        "prompt_template": node
                            .config
                            .as_ref()
                            .and_then(|c| c.prompt.clone())
                            .unwrap_or_default(),
                        "stream": llm.stream.unwrap_or(false),
                        "stream_json_as_text": llm.stream_json_as_text.unwrap_or(false),
                        "heal": llm.heal.unwrap_or(false),
                        "messages_path": llm.messages_path,
                        "append_prompt_as_user": llm.append_prompt_as_user.unwrap_or(true),
                        "output_schema": node
                            .config
                            .as_ref()
                            .and_then(|c| c.output_schema.clone())
                            .unwrap_or_else(default_llm_output_schema),
                    }),
                    next,
                },
            });
            continue;
        }

        if let Some(worker) = node.node_type.custom_worker.as_ref() {
            // Same globals restriction as llm_call nodes.
            if node
                .config
                .as_ref()
                .and_then(|c| c.set_globals.as_ref())
                .is_some()
                || node
                    .config
                    .as_ref()
                    .and_then(|c| c.update_globals.as_ref())
                    .is_some()
            {
                return Err(YamlToIrError::UnsupportedNode {
                    node_id: node.id.clone(),
                    reason: "set_globals/update_globals are not represented in canonical IR tool nodes yet"
                        .to_string(),
                });
            }

            // custom_worker lowers directly to a Tool node named after its
            // handler, with the YAML payload (or `{}`) as input.
            let next = single_next_for_node(&outgoing, &node.id)?;
            nodes.push(Node {
                id: node.id.clone(),
                kind: NodeKind::Tool {
                    tool: worker.handler.clone(),
                    input: node
                        .config
                        .as_ref()
                        .and_then(|c| c.payload.clone())
                        .unwrap_or_else(|| json!({})),
                    next,
                },
            });
            continue;
        }

        if let Some(switch) = node.node_type.switch.as_ref() {
            // switch lowers to a Router; branch conditions are rewritten from
            // YAML `$.nodes.*` paths into IR `$.node_outputs.*` paths.
            nodes.push(Node {
                id: node.id.clone(),
                kind: NodeKind::Router {
                    routes: switch
                        .branches
                        .iter()
                        .map(|b| RouterRoute {
                            when: rewrite_yaml_condition_to_ir(&b.condition),
                            next: b.target.clone(),
                        })
                        .collect(),
                    default: switch.default.clone(),
                },
            });
            continue;
        }

        return Err(YamlToIrError::UnsupportedNode {
            node_id: node.id.clone(),
            reason: "node_type must be llm_call, switch, or custom_worker".to_string(),
        });
    }

    Ok(WorkflowDefinition {
        version: WORKFLOW_IR_V0.to_string(),
        name: workflow.id.clone(),
        nodes,
    })
}
1680
1681fn single_next_for_node(
1682 outgoing: &HashMap<&str, Vec<&str>>,
1683 node_id: &str,
1684) -> Result<Option<String>, YamlToIrError> {
1685 match outgoing.get(node_id) {
1686 None => Ok(None),
1687 Some(targets) if targets.len() == 1 => Ok(Some(targets[0].to_string())),
1688 Some(_) => Err(YamlToIrError::MultipleOutgoingEdge {
1689 node_id: node_id.to_string(),
1690 }),
1691 }
1692}
1693
/// Rewrites a YAML switch condition into IR path syntax: `$.nodes.X.output.Y`
/// becomes `$.node_outputs.X.Y`, and a trailing `.output` is dropped.
fn rewrite_yaml_condition_to_ir(expr: &str) -> String {
    let rewritten = expr
        .replace("$.nodes.", "$.node_outputs.")
        .replace(".output.", ".");

    match rewritten.strip_suffix(".output") {
        Some(prefix) => prefix.to_string(),
        None => rewritten,
    }
}
1704
/// Makes an arbitrary node id safe for Mermaid: ids not starting with an
/// ASCII letter or underscore are prefixed with `n_`, and every other
/// character outside `[A-Za-z0-9_]` becomes an underscore.
fn sanitize_mermaid_id(id: &str) -> String {
    let starts_safely = id
        .chars()
        .next()
        .is_some_and(|ch| ch.is_ascii_alphabetic() || ch == '_');

    let mut result = String::with_capacity(id.len() + 2);
    if !starts_safely {
        result.push_str("n_");
    }
    for ch in id.chars() {
        if ch.is_ascii_alphanumeric() || ch == '_' {
            result.push(ch);
        } else {
            result.push('_');
        }
    }
    result
}
1728
/// Escapes double quotes in a Mermaid label so it can sit inside a quoted
/// node/edge label.
fn escape_mermaid_label(label: &str) -> String {
    let mut escaped = String::with_capacity(label.len());
    for ch in label.chars() {
        if ch == '"' {
            escaped.push('\\');
        }
        escaped.push(ch);
    }
    escaped
}
1732
/// Fully-resolved input for one LLM workflow step, handed to a
/// [`YamlWorkflowLlmExecutor`] implementation.
#[derive(Debug, Clone)]
pub struct YamlLlmExecutionRequest {
    pub node_id: String,
    pub is_terminal_node: bool,
    // When true, structured JSON output is surfaced to the sink as plain
    // text deltas during streaming.
    pub stream_json_as_text: bool,
    pub model: String,
    // Explicit conversation history; when absent the executor builds a
    // default system+user pair from `prompt`.
    pub messages: Option<Vec<Message>>,
    // When `messages` is present, append `prompt` as a trailing user turn.
    pub append_prompt_as_user: bool,
    // NOTE(review): presumably `prompt` is `prompt_template` after binding
    // substitution via `prompt_bindings` — confirm at the construction site.
    pub prompt: String,
    pub prompt_template: String,
    pub prompt_bindings: Vec<YamlTemplateBinding>,
    // JSON schema the structured output must satisfy.
    pub schema: Value,
    pub stream: bool,
    pub heal: bool,
    pub tools: Vec<YamlResolvedTool>,
    pub tool_choice: Option<ToolChoice>,
    // Inclusive bound: the client executor loops `0..=max_tool_roundtrips`.
    pub max_tool_roundtrips: u8,
    pub tool_calls_global_key: Option<String>,
    pub tool_trace_mode: YamlToolTraceMode,
    pub execution_context: Value,
    pub email_text: String,
    pub trace_id: Option<String>,
    pub trace_context: Option<TraceContext>,
    pub trace_sampled: bool,
}
1758
/// A tool made available to an LLM step: its wire-format definition plus an
/// optional schema for the tool's output.
#[derive(Debug, Clone)]
pub struct YamlResolvedTool {
    pub definition: ToolDefinition,
    // NOTE(review): presumably used to validate/heal tool results — confirm
    // at the executor call sites.
    pub output_schema: Option<Value>,
}
1764
/// Executes the LLM portion of a YAML workflow step.
#[async_trait]
pub trait YamlWorkflowLlmExecutor: Send + Sync {
    /// Runs one structured completion for `request`, optionally emitting
    /// progress events to `event_sink`. Failures are reported as plain
    /// error strings.
    async fn complete_structured(
        &self,
        request: YamlLlmExecutionRequest,
        event_sink: Option<&dyn YamlWorkflowEventSink>,
    ) -> Result<YamlLlmExecutionResult, String>;
}
1773
/// Executes `custom_worker` workflow nodes outside the LLM path.
#[async_trait]
pub trait YamlWorkflowCustomWorkerExecutor: Send + Sync {
    /// Runs the named `handler` with the node's `payload`, the original
    /// `email_text`, and the current execution `context`; returns the node
    /// output as JSON or an error string.
    async fn execute(
        &self,
        handler: &str,
        payload: &Value,
        email_text: &str,
        context: &Value,
    ) -> Result<Value, String>;
}
1784
1785pub async fn run_workflow_yaml_file(
1786 workflow_path: &Path,
1787 workflow_input: &Value,
1788 executor: &dyn YamlWorkflowLlmExecutor,
1789) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
1790 let contents =
1791 std::fs::read_to_string(workflow_path).map_err(|source| YamlWorkflowRunError::Read {
1792 path: workflow_path.display().to_string(),
1793 source,
1794 })?;
1795
1796 let workflow: YamlWorkflow =
1797 serde_yaml::from_str(&contents).map_err(|source| YamlWorkflowRunError::Parse {
1798 path: workflow_path.display().to_string(),
1799 source,
1800 })?;
1801
1802 run_workflow_yaml(&workflow, workflow_input, executor).await
1803}
1804
/// Convenience wrapper: packages `email_text` as `{"email_text": ...}` and
/// delegates to [`run_workflow_yaml_file`].
pub async fn run_email_workflow_yaml_file(
    workflow_path: &Path,
    email_text: &str,
    executor: &dyn YamlWorkflowLlmExecutor,
) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
    let workflow_input = json!({ "email_text": email_text });
    run_workflow_yaml_file(workflow_path, &workflow_input, executor).await
}
1813
1814pub async fn run_workflow_yaml_file_with_client(
1815 workflow_path: &Path,
1816 workflow_input: &Value,
1817 client: &SimpleAgentsClient,
1818) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
1819 let contents =
1820 std::fs::read_to_string(workflow_path).map_err(|source| YamlWorkflowRunError::Read {
1821 path: workflow_path.display().to_string(),
1822 source,
1823 })?;
1824
1825 let workflow: YamlWorkflow =
1826 serde_yaml::from_str(&contents).map_err(|source| YamlWorkflowRunError::Parse {
1827 path: workflow_path.display().to_string(),
1828 source,
1829 })?;
1830
1831 run_workflow_yaml_with_client(&workflow, workflow_input, client).await
1832}
1833
/// Convenience wrapper: packages `email_text` as `{"email_text": ...}` and
/// delegates to [`run_workflow_yaml_file_with_client`].
pub async fn run_email_workflow_yaml_file_with_client(
    workflow_path: &Path,
    email_text: &str,
    client: &SimpleAgentsClient,
) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
    let workflow_input = json!({ "email_text": email_text });
    run_workflow_yaml_file_with_client(workflow_path, &workflow_input, client).await
}
1842
/// Runs an in-memory workflow with `client`: no custom worker, no event
/// sink, default options.
pub async fn run_workflow_yaml_with_client(
    workflow: &YamlWorkflow,
    workflow_input: &Value,
    client: &SimpleAgentsClient,
) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
    run_workflow_yaml_with_client_and_custom_worker(workflow, workflow_input, client, None).await
}
1850
/// Convenience wrapper: packages `email_text` as `{"email_text": ...}` and
/// delegates to [`run_workflow_yaml_with_client`].
pub async fn run_email_workflow_yaml_with_client(
    workflow: &YamlWorkflow,
    email_text: &str,
    client: &SimpleAgentsClient,
) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
    let workflow_input = json!({ "email_text": email_text });
    run_workflow_yaml_with_client(workflow, &workflow_input, client).await
}
1859
1860pub async fn run_workflow_yaml_file_with_client_and_custom_worker(
1861 workflow_path: &Path,
1862 workflow_input: &Value,
1863 client: &SimpleAgentsClient,
1864 custom_worker: Option<&dyn YamlWorkflowCustomWorkerExecutor>,
1865) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
1866 let contents =
1867 std::fs::read_to_string(workflow_path).map_err(|source| YamlWorkflowRunError::Read {
1868 path: workflow_path.display().to_string(),
1869 source,
1870 })?;
1871
1872 let workflow: YamlWorkflow =
1873 serde_yaml::from_str(&contents).map_err(|source| YamlWorkflowRunError::Parse {
1874 path: workflow_path.display().to_string(),
1875 source,
1876 })?;
1877
1878 run_workflow_yaml_with_client_and_custom_worker(
1879 &workflow,
1880 workflow_input,
1881 client,
1882 custom_worker,
1883 )
1884 .await
1885}
1886
/// Convenience wrapper: packages `email_text` as `{"email_text": ...}` and
/// delegates to [`run_workflow_yaml_file_with_client_and_custom_worker`].
pub async fn run_email_workflow_yaml_file_with_client_and_custom_worker(
    workflow_path: &Path,
    email_text: &str,
    client: &SimpleAgentsClient,
    custom_worker: Option<&dyn YamlWorkflowCustomWorkerExecutor>,
) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
    let workflow_input = json!({ "email_text": email_text });
    run_workflow_yaml_file_with_client_and_custom_worker(
        workflow_path,
        &workflow_input,
        client,
        custom_worker,
    )
    .await
}
1902
1903pub async fn run_workflow_yaml_file_with_client_and_custom_worker_and_events(
1904 workflow_path: &Path,
1905 workflow_input: &Value,
1906 client: &SimpleAgentsClient,
1907 custom_worker: Option<&dyn YamlWorkflowCustomWorkerExecutor>,
1908 event_sink: Option<&dyn YamlWorkflowEventSink>,
1909) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
1910 run_workflow_yaml_file_with_client_and_custom_worker_and_events_and_options(
1911 workflow_path,
1912 workflow_input,
1913 client,
1914 custom_worker,
1915 event_sink,
1916 &YamlWorkflowRunOptions::default(),
1917 )
1918 .await
1919}
1920
1921pub async fn run_workflow_yaml_file_with_client_and_custom_worker_and_events_and_options(
1922 workflow_path: &Path,
1923 workflow_input: &Value,
1924 client: &SimpleAgentsClient,
1925 custom_worker: Option<&dyn YamlWorkflowCustomWorkerExecutor>,
1926 event_sink: Option<&dyn YamlWorkflowEventSink>,
1927 options: &YamlWorkflowRunOptions,
1928) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
1929 let contents =
1930 std::fs::read_to_string(workflow_path).map_err(|source| YamlWorkflowRunError::Read {
1931 path: workflow_path.display().to_string(),
1932 source,
1933 })?;
1934
1935 let workflow: YamlWorkflow =
1936 serde_yaml::from_str(&contents).map_err(|source| YamlWorkflowRunError::Parse {
1937 path: workflow_path.display().to_string(),
1938 source,
1939 })?;
1940
1941 run_workflow_yaml_with_client_and_custom_worker_and_events_and_options(
1942 &workflow,
1943 workflow_input,
1944 client,
1945 custom_worker,
1946 event_sink,
1947 options,
1948 )
1949 .await
1950}
1951
/// Convenience wrapper: packages `email_text` as `{"email_text": ...}` and
/// delegates to
/// [`run_workflow_yaml_file_with_client_and_custom_worker_and_events`].
pub async fn run_email_workflow_yaml_file_with_client_and_custom_worker_and_events(
    workflow_path: &Path,
    email_text: &str,
    client: &SimpleAgentsClient,
    custom_worker: Option<&dyn YamlWorkflowCustomWorkerExecutor>,
    event_sink: Option<&dyn YamlWorkflowEventSink>,
) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
    let workflow_input = json!({ "email_text": email_text });
    run_workflow_yaml_file_with_client_and_custom_worker_and_events(
        workflow_path,
        &workflow_input,
        client,
        custom_worker,
        event_sink,
    )
    .await
}
1969
1970pub async fn run_workflow_yaml_with_client_and_custom_worker(
1971 workflow: &YamlWorkflow,
1972 workflow_input: &Value,
1973 client: &SimpleAgentsClient,
1974 custom_worker: Option<&dyn YamlWorkflowCustomWorkerExecutor>,
1975) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
1976 run_workflow_yaml_with_client_and_custom_worker_and_events_and_options(
1977 workflow,
1978 workflow_input,
1979 client,
1980 custom_worker,
1981 None,
1982 &YamlWorkflowRunOptions::default(),
1983 )
1984 .await
1985}
1986
/// Convenience wrapper: packages `email_text` as `{"email_text": ...}` and
/// delegates to [`run_workflow_yaml_with_client_and_custom_worker`].
pub async fn run_email_workflow_yaml_with_client_and_custom_worker(
    workflow: &YamlWorkflow,
    email_text: &str,
    client: &SimpleAgentsClient,
    custom_worker: Option<&dyn YamlWorkflowCustomWorkerExecutor>,
) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
    let workflow_input = json!({ "email_text": email_text });
    run_workflow_yaml_with_client_and_custom_worker(
        workflow,
        &workflow_input,
        client,
        custom_worker,
    )
    .await
}
2002
/// Event-emitting in-memory variant that runs with default
/// `YamlWorkflowRunOptions`.
pub async fn run_workflow_yaml_with_client_and_custom_worker_and_events(
    workflow: &YamlWorkflow,
    workflow_input: &Value,
    client: &SimpleAgentsClient,
    custom_worker: Option<&dyn YamlWorkflowCustomWorkerExecutor>,
    event_sink: Option<&dyn YamlWorkflowEventSink>,
) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
    run_workflow_yaml_with_client_and_custom_worker_and_events_and_options(
        workflow,
        workflow_input,
        client,
        custom_worker,
        event_sink,
        &YamlWorkflowRunOptions::default(),
    )
    .await
}
2020
2021pub async fn run_workflow_yaml_with_client_and_custom_worker_and_events_and_options(
2022 workflow: &YamlWorkflow,
2023 workflow_input: &Value,
2024 client: &SimpleAgentsClient,
2025 custom_worker: Option<&dyn YamlWorkflowCustomWorkerExecutor>,
2026 event_sink: Option<&dyn YamlWorkflowEventSink>,
2027 options: &YamlWorkflowRunOptions,
2028) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
2029 struct BorrowedClientExecutor<'a> {
2030 client: &'a SimpleAgentsClient,
2031 custom_worker: Option<&'a dyn YamlWorkflowCustomWorkerExecutor>,
2032 run_options: YamlWorkflowRunOptions,
2033 }
2034
2035 #[async_trait]
2036 impl<'a> YamlWorkflowLlmExecutor for BorrowedClientExecutor<'a> {
2037 async fn complete_structured(
2038 &self,
2039 request: YamlLlmExecutionRequest,
2040 event_sink: Option<&dyn YamlWorkflowEventSink>,
2041 ) -> Result<YamlLlmExecutionResult, String> {
2042 let expects_object = schema_expects_object(&request.schema);
2043 let messages = if let Some(mut history) = request.messages.clone() {
2044 if request.append_prompt_as_user && !request.prompt.trim().is_empty() {
2045 history.push(Message::user(&request.prompt));
2046 }
2047 history
2048 } else {
2049 vec![
2050 Message::system("You execute workflow classification steps."),
2051 Message::user(&request.prompt),
2052 ]
2053 };
2054
2055 if !request.tools.is_empty() {
2056 let mut tool_traces: Vec<YamlToolCallTrace> = Vec::new();
2057 let mut conversation = messages;
2058 let mut usage_total: Option<YamlLlmTokenUsage> = None;
2059
2060 for roundtrip in 0..=request.max_tool_roundtrips {
2061 let mut builder = CompletionRequest::builder()
2062 .model(&request.model)
2063 .messages(conversation.clone())
2064 .tools(request.tools.iter().map(|t| t.definition.clone()).collect());
2065
2066 if request.heal && expects_object {
2067 builder = builder.json_schema("workflow_step", request.schema.clone());
2068 }
2069
2070 if let Some(choice) = request.tool_choice.clone() {
2071 builder = builder.tool_choice(choice);
2072 }
2073
2074 if request.stream {
2075 builder = builder.stream(true);
2076 }
2077
2078 let completion_request = builder
2079 .build()
2080 .map_err(|error| format!("failed to build completion request: {error}"))?;
2081
2082 let outcome = self
2083 .client
2084 .complete(&completion_request, CompletionOptions::default())
2085 .await
2086 .map_err(|error| error.to_string())?;
2087
2088 let mut streamed_tool_calls: Option<Vec<ToolCall>> = None;
2089 let mut streamed_content = String::new();
2090 let mut finish_reason = FinishReason::Stop;
2091
2092 match outcome {
2093 CompletionOutcome::Response(response) => {
2094 if let Some(usage) = usage_total.as_mut() {
2095 usage.prompt_tokens += response.usage.prompt_tokens;
2096 usage.completion_tokens += response.usage.completion_tokens;
2097 usage.total_tokens += response.usage.total_tokens;
2098 } else {
2099 usage_total = Some(YamlLlmTokenUsage {
2100 prompt_tokens: response.usage.prompt_tokens,
2101 completion_tokens: response.usage.completion_tokens,
2102 total_tokens: response.usage.total_tokens,
2103 thinking_tokens: None,
2104 });
2105 }
2106
2107 let choice = response
2108 .choices
2109 .first()
2110 .ok_or_else(|| "completion returned no choices".to_string())?;
2111 streamed_content = choice.message.content.clone();
2112 streamed_tool_calls = choice.message.tool_calls.clone();
2113 finish_reason = choice.finish_reason;
2114 }
2115 CompletionOutcome::HealedJson(healed) => {
2116 let response = healed.response;
2117 if let Some(usage) = usage_total.as_mut() {
2118 usage.prompt_tokens += response.usage.prompt_tokens;
2119 usage.completion_tokens += response.usage.completion_tokens;
2120 usage.total_tokens += response.usage.total_tokens;
2121 } else {
2122 usage_total = Some(YamlLlmTokenUsage {
2123 prompt_tokens: response.usage.prompt_tokens,
2124 completion_tokens: response.usage.completion_tokens,
2125 total_tokens: response.usage.total_tokens,
2126 thinking_tokens: None,
2127 });
2128 }
2129
2130 let choice = response
2131 .choices
2132 .first()
2133 .ok_or_else(|| "completion returned no choices".to_string())?;
2134 streamed_content = choice.message.content.clone();
2135 streamed_tool_calls = choice.message.tool_calls.clone();
2136 finish_reason = choice.finish_reason;
2137 }
2138 CompletionOutcome::CoercedSchema(coerced) => {
2139 let response = coerced.response;
2140 if let Some(usage) = usage_total.as_mut() {
2141 usage.prompt_tokens += response.usage.prompt_tokens;
2142 usage.completion_tokens += response.usage.completion_tokens;
2143 usage.total_tokens += response.usage.total_tokens;
2144 } else {
2145 usage_total = Some(YamlLlmTokenUsage {
2146 prompt_tokens: response.usage.prompt_tokens,
2147 completion_tokens: response.usage.completion_tokens,
2148 total_tokens: response.usage.total_tokens,
2149 thinking_tokens: None,
2150 });
2151 }
2152
2153 let choice = response
2154 .choices
2155 .first()
2156 .ok_or_else(|| "completion returned no choices".to_string())?;
2157 streamed_content = choice.message.content.clone();
2158 streamed_tool_calls = choice.message.tool_calls.clone();
2159 finish_reason = choice.finish_reason;
2160 }
2161 CompletionOutcome::Stream(mut stream) => {
2162 let mut final_stream_usage: Option<simple_agent_type::response::Usage> =
2163 None;
2164 let mut delta_filter = StructuredJsonDeltaFilter::default();
2165 let include_raw_debug = include_raw_stream_debug_events();
2166 let mut json_text_formatter = if request.stream_json_as_text {
2167 Some(StreamJsonAsTextFormatter::default())
2168 } else {
2169 None
2170 };
2171 let mut tool_calls_by_index: HashMap<u32, ToolCall> = HashMap::new();
2172
2173 while let Some(chunk_result) = stream.next().await {
2174 if event_sink_is_cancelled(event_sink) {
2175 return Err(workflow_event_sink_cancelled_message().to_string());
2176 }
2177
2178 let chunk = chunk_result.map_err(|error| error.to_string())?;
2179 if let Some(usage) = chunk.usage {
2180 final_stream_usage = Some(usage);
2181 }
2182
2183 if let Some(choice) = chunk.choices.first() {
2184 if let Some(chunk_finish_reason) = choice.finish_reason {
2185 finish_reason = chunk_finish_reason;
2186 }
2187
2188 if include_raw_debug {
2189 if let Some(reasoning_delta) =
2190 choice.delta.reasoning_content.as_ref()
2191 {
2192 if let Some(sink) = event_sink {
2193 sink.emit(&YamlWorkflowEvent {
2194 event_type: "node_stream_thinking_delta"
2195 .to_string(),
2196 node_id: Some(request.node_id.clone()),
2197 step_id: Some(request.node_id.clone()),
2198 node_kind: Some("llm_call".to_string()),
2199 streamable: Some(true),
2200 message: None,
2201 delta: Some(reasoning_delta.clone()),
2202 token_kind: Some(
2203 YamlWorkflowTokenKind::Thinking,
2204 ),
2205 is_terminal_node_token: Some(
2206 request.is_terminal_node,
2207 ),
2208 elapsed_ms: None,
2209 metadata: None,
2210 });
2211 }
2212 }
2213 }
2214
2215 if let Some(delta) = choice.delta.content.clone() {
2216 streamed_content.push_str(delta.as_str());
2217 let (output_delta, thinking_delta) = if expects_object {
2218 delta_filter.split(delta.as_str())
2219 } else {
2220 (Some(delta.clone()), None)
2221 };
2222 let rendered_output_delta = if let Some(output_chunk) =
2223 output_delta
2224 {
2225 if let Some(formatter) = json_text_formatter.as_mut() {
2226 formatter.push(output_chunk.as_str());
2227 formatter.emit_if_ready(delta_filter.completed())
2228 } else {
2229 Some(output_chunk)
2230 }
2231 } else {
2232 None
2233 };
2234
2235 if include_raw_debug {
2236 if let Some(sink) = event_sink {
2237 if let Some(raw_thinking_delta) =
2238 thinking_delta.as_ref()
2239 {
2240 sink.emit(&YamlWorkflowEvent {
2241 event_type: "node_stream_thinking_delta"
2242 .to_string(),
2243 node_id: Some(request.node_id.clone()),
2244 step_id: Some(request.node_id.clone()),
2245 node_kind: Some("llm_call".to_string()),
2246 streamable: Some(true),
2247 message: None,
2248 delta: Some(raw_thinking_delta.clone()),
2249 token_kind: Some(
2250 YamlWorkflowTokenKind::Thinking,
2251 ),
2252 is_terminal_node_token: Some(
2253 request.is_terminal_node,
2254 ),
2255 elapsed_ms: None,
2256 metadata: None,
2257 });
2258 }
2259 if let Some(raw_output_delta) =
2260 rendered_output_delta.as_ref()
2261 {
2262 sink.emit(&YamlWorkflowEvent {
2263 event_type: "node_stream_output_delta"
2264 .to_string(),
2265 node_id: Some(request.node_id.clone()),
2266 step_id: Some(request.node_id.clone()),
2267 node_kind: Some("llm_call".to_string()),
2268 streamable: Some(true),
2269 message: None,
2270 delta: Some(raw_output_delta.clone()),
2271 token_kind: Some(
2272 YamlWorkflowTokenKind::Output,
2273 ),
2274 is_terminal_node_token: Some(
2275 request.is_terminal_node,
2276 ),
2277 elapsed_ms: None,
2278 metadata: None,
2279 });
2280 }
2281 }
2282 }
2283
2284 if let Some(filtered_delta) = rendered_output_delta {
2285 if let Some(sink) = event_sink {
2286 sink.emit(&YamlWorkflowEvent {
2287 event_type: "node_stream_delta".to_string(),
2288 node_id: Some(request.node_id.clone()),
2289 step_id: Some(request.node_id.clone()),
2290 node_kind: Some("llm_call".to_string()),
2291 streamable: Some(true),
2292 message: None,
2293 delta: Some(filtered_delta),
2294 token_kind: Some(YamlWorkflowTokenKind::Output),
2295 is_terminal_node_token: Some(
2296 request.is_terminal_node,
2297 ),
2298 elapsed_ms: None,
2299 metadata: None,
2300 });
2301 }
2302 }
2303 }
2304
2305 if let Some(tool_call_deltas) = choice.delta.tool_calls.as_ref()
2306 {
2307 for tool_call_delta in tool_call_deltas {
2308 let entry = tool_calls_by_index
2309 .entry(tool_call_delta.index)
2310 .or_insert_with(|| ToolCall {
2311 id: tool_call_delta.id.clone().unwrap_or_else(
2312 || {
2313 format!(
2314 "tool_call_{}",
2315 tool_call_delta.index
2316 )
2317 },
2318 ),
2319 tool_type: ToolType::Function,
2320 function:
2321 simple_agent_type::tool::ToolCallFunction {
2322 name: String::new(),
2323 arguments: String::new(),
2324 },
2325 });
2326
2327 if let Some(id) = tool_call_delta.id.as_ref() {
2328 entry.id = id.clone();
2329 }
2330 if let Some(tool_type) = tool_call_delta.tool_type {
2331 entry.tool_type = tool_type;
2332 }
2333 if let Some(function_delta) =
2334 tool_call_delta.function.as_ref()
2335 {
2336 if let Some(name) = function_delta.name.as_ref() {
2337 entry.function.name = name.clone();
2338 }
2339 if let Some(arguments) =
2340 function_delta.arguments.as_ref()
2341 {
2342 entry.function.arguments.push_str(arguments);
2343 }
2344 }
2345 }
2346 }
2347 }
2348
2349 if event_sink_is_cancelled(event_sink) {
2350 return Err(workflow_event_sink_cancelled_message().to_string());
2351 }
2352 }
2353
2354 if let Some(usage) = final_stream_usage {
2355 if let Some(total) = usage_total.as_mut() {
2356 total.prompt_tokens += usage.prompt_tokens;
2357 total.completion_tokens += usage.completion_tokens;
2358 total.total_tokens += usage.total_tokens;
2359 } else {
2360 usage_total = Some(YamlLlmTokenUsage {
2361 prompt_tokens: usage.prompt_tokens,
2362 completion_tokens: usage.completion_tokens,
2363 total_tokens: usage.total_tokens,
2364 thinking_tokens: None,
2365 });
2366 }
2367 }
2368
2369 let mut ordered_tool_calls =
2370 tool_calls_by_index.into_iter().collect::<Vec<_>>();
2371 ordered_tool_calls.sort_by_key(|(index, _)| *index);
2372 if !ordered_tool_calls.is_empty() {
2373 streamed_tool_calls = Some(
2374 ordered_tool_calls
2375 .into_iter()
2376 .map(|(_, tool_call)| tool_call)
2377 .collect::<Vec<_>>(),
2378 );
2379 }
2380 }
2381 }
2382
2383 let has_tool_calls = streamed_tool_calls
2384 .as_ref()
2385 .is_some_and(|calls| !calls.is_empty());
2386 if finish_reason != FinishReason::ToolCalls && !has_tool_calls {
2387 let payload = if expects_object {
2388 parse_streamed_structured_payload(
2389 streamed_content.as_str(),
2390 request.heal,
2391 )
2392 .map_err(|error| {
2393 format!("failed to parse structured completion JSON: {error}")
2394 })?
2395 .payload
2396 } else {
2397 Value::String(streamed_content.clone())
2398 };
2399 return Ok(YamlLlmExecutionResult {
2400 payload,
2401 usage: usage_total,
2402 ttft_ms: None,
2403 tool_calls: tool_traces,
2404 });
2405 }
2406
2407 if roundtrip >= request.max_tool_roundtrips {
2408 return Err(format!(
2409 "tool call roundtrip limit reached for node '{}' (max={})",
2410 request.node_id, request.max_tool_roundtrips
2411 ));
2412 }
2413
2414 let tool_calls: Vec<ToolCall> = streamed_tool_calls.ok_or_else(|| {
2415 "finish_reason=tool_calls but no tool calls found".to_string()
2416 })?;
2417 if tool_calls
2418 .iter()
2419 .any(|tool_call| tool_call.function.name.trim().is_empty())
2420 {
2421 return Err("streamed tool call missing function name".to_string());
2422 }
2423
2424 let assistant_tool_message =
2425 Message::assistant(&streamed_content).with_tool_calls(tool_calls.clone());
2426 conversation.push(assistant_tool_message);
2427
2428 for tool_call in tool_calls {
2429 let tool_call_id = tool_call.id.clone();
2430 let tool_name = tool_call.function.name.clone();
2431 let tool_started = Instant::now();
2432 let arguments: Value = serde_json::from_str(&tool_call.function.arguments)
2433 .map_err(|error| {
2434 format!(
2435 "tool '{}' arguments must be valid JSON: {}",
2436 tool_name, error
2437 )
2438 })?;
2439 let mut tool_span_context: Option<TraceContext> = None;
2440 let mut tool_span = if request.trace_sampled {
2441 let (span_context, mut span) = workflow_tracer().start_span(
2442 "workflow.tool.execute",
2443 SpanKind::Node,
2444 request.trace_context.as_ref(),
2445 );
2446 tool_span_context = Some(span_context);
2447 span.set_attribute(
2448 "trace_id",
2449 request.trace_id.as_deref().unwrap_or_default(),
2450 );
2451 span.set_attribute("node_id", request.node_id.as_str());
2452 span.set_attribute("node_kind", "llm_call");
2453 span.set_attribute("tool_name", tool_name.as_str());
2454 span.set_attribute("tool_call_id", tool_call_id.as_str());
2455 let args_for_span =
2456 payload_for_tool_trace(request.tool_trace_mode, &arguments)
2457 .to_string();
2458 span.set_attribute("tool_arguments", args_for_span.as_str());
2459 Some(span)
2460 } else {
2461 None
2462 };
2463
2464 if request.tool_trace_mode != YamlToolTraceMode::Off {
2465 if let Some(sink) = event_sink {
2466 sink.emit(&YamlWorkflowEvent {
2467 event_type: "node_tool_call_requested".to_string(),
2468 node_id: Some(request.node_id.clone()),
2469 step_id: Some(request.node_id.clone()),
2470 node_kind: Some("llm_call".to_string()),
2471 streamable: Some(false),
2472 message: Some(format!(
2473 "tool call requested: {}",
2474 tool_name
2475 )),
2476 delta: None,
2477 token_kind: None,
2478 is_terminal_node_token: None,
2479 elapsed_ms: None,
2480 metadata: Some(json!({
2481 "tool_call_id": tool_call_id.clone(),
2482 "tool_name": tool_name.clone(),
2483 "arguments": payload_for_tool_trace(request.tool_trace_mode, &arguments),
2484 })),
2485 });
2486 }
2487 }
2488
2489 let Some(tool_config) = request
2490 .tools
2491 .iter()
2492 .find(|tool| tool.definition.function.name == tool_name)
2493 else {
2494 return Err(format!("model requested unknown tool '{}'", tool_name));
2495 };
2496
2497 let tool_output_result = if tool_name == "run_workflow_graph" {
2498 execute_subworkflow_tool_call(
2499 &arguments,
2500 &request.execution_context,
2501 self.client,
2502 self.custom_worker,
2503 &self.run_options,
2504 tool_span_context.as_ref(),
2505 request.trace_id.as_deref(),
2506 )
2507 .await
2508 } else if let Some(custom_worker) = self.custom_worker {
2509 custom_worker
2510 .execute(
2511 tool_name.as_str(),
2512 &arguments,
2513 request.email_text.as_str(),
2514 &request.execution_context,
2515 )
2516 .await
2517 } else {
2518 Err(format!(
2519 "tool '{}' requested but no custom worker executor is configured",
2520 tool_name
2521 ))
2522 };
2523
2524 let tool_output = match tool_output_result {
2525 Ok(output) => output,
2526 Err(message) => {
2527 let elapsed_ms = tool_started.elapsed().as_millis();
2528 if let Some(span) = tool_span.as_mut() {
2529 span.add_event("workflow.tool.execute.error");
2530 span.set_attribute("tool_status", "error");
2531 span.set_attribute("tool_error", message.as_str());
2532 span.set_attribute(
2533 "elapsed_ms",
2534 elapsed_ms.to_string().as_str(),
2535 );
2536 }
2537 if request.tool_trace_mode != YamlToolTraceMode::Off {
2538 if let Some(sink) = event_sink {
2539 sink.emit(&YamlWorkflowEvent {
2540 event_type: "node_tool_call_failed".to_string(),
2541 node_id: Some(request.node_id.clone()),
2542 step_id: Some(request.node_id.clone()),
2543 node_kind: Some("llm_call".to_string()),
2544 streamable: Some(false),
2545 message: Some(message.clone()),
2546 delta: None,
2547 token_kind: None,
2548 is_terminal_node_token: None,
2549 elapsed_ms: Some(elapsed_ms),
2550 metadata: Some(json!({
2551 "tool_call_id": tool_call_id.clone(),
2552 "tool_name": tool_name.clone(),
2553 })),
2554 });
2555 }
2556 }
2557 tool_traces.push(YamlToolCallTrace {
2558 id: tool_call_id.clone(),
2559 name: tool_name.clone(),
2560 arguments,
2561 output: None,
2562 status: "error".to_string(),
2563 elapsed_ms,
2564 error: Some(message.clone()),
2565 });
2566 if let Some(span) = tool_span.take() {
2567 span.end();
2568 }
2569 return Err(format!("tool '{}' failed: {}", tool_name, message));
2570 }
2571 };
2572
2573 if let Some(output_schema) = tool_config.output_schema.as_ref() {
2574 validate_schema_instance(output_schema, &tool_output).map_err(
2575 |message| {
2576 format!(
2577 "tool '{}' output failed schema validation: {}",
2578 tool_name, message
2579 )
2580 },
2581 )?;
2582 }
2583
2584 let elapsed_ms = tool_started.elapsed().as_millis();
2585 if let Some(span) = tool_span.as_mut() {
2586 span.add_event("workflow.tool.execute.completed");
2587 span.set_attribute("tool_status", "ok");
2588 span.set_attribute("elapsed_ms", elapsed_ms.to_string().as_str());
2589 let output_for_span =
2590 payload_for_tool_trace(request.tool_trace_mode, &tool_output)
2591 .to_string();
2592 span.set_attribute("tool_output", output_for_span.as_str());
2593 }
2594 if request.tool_trace_mode != YamlToolTraceMode::Off {
2595 if let Some(sink) = event_sink {
2596 sink.emit(&YamlWorkflowEvent {
2597 event_type: "node_tool_call_completed".to_string(),
2598 node_id: Some(request.node_id.clone()),
2599 step_id: Some(request.node_id.clone()),
2600 node_kind: Some("llm_call".to_string()),
2601 streamable: Some(false),
2602 message: Some(format!(
2603 "tool call completed: {}",
2604 tool_name
2605 )),
2606 delta: None,
2607 token_kind: None,
2608 is_terminal_node_token: None,
2609 elapsed_ms: Some(elapsed_ms),
2610 metadata: Some(json!({
2611 "tool_call_id": tool_call_id.clone(),
2612 "tool_name": tool_name.clone(),
2613 "arguments": payload_for_tool_trace(request.tool_trace_mode, &arguments),
2614 "output": payload_for_tool_trace(request.tool_trace_mode, &tool_output),
2615 })),
2616 });
2617 }
2618 }
2619
2620 tool_traces.push(YamlToolCallTrace {
2621 id: tool_call_id.clone(),
2622 name: tool_name.clone(),
2623 arguments: arguments.clone(),
2624 output: Some(tool_output.clone()),
2625 status: "ok".to_string(),
2626 elapsed_ms,
2627 error: None,
2628 });
2629
2630 conversation.push(Message::tool(
2631 serde_json::to_string(&tool_output).map_err(|error| {
2632 format!("failed to serialize tool output: {error}")
2633 })?,
2634 tool_call_id,
2635 ));
2636 if let Some(span) = tool_span.take() {
2637 span.end();
2638 }
2639 }
2640
2641 if request.tool_trace_mode != YamlToolTraceMode::Off {
2642 if let Some(sink) = event_sink {
2643 sink.emit(&YamlWorkflowEvent {
2644 event_type: "node_tool_roundtrip_completed".to_string(),
2645 node_id: Some(request.node_id.clone()),
2646 step_id: Some(request.node_id.clone()),
2647 node_kind: Some("llm_call".to_string()),
2648 streamable: Some(false),
2649 message: Some(format!(
2650 "tool roundtrip {} completed",
2651 roundtrip + 1
2652 )),
2653 delta: None,
2654 token_kind: None,
2655 is_terminal_node_token: None,
2656 elapsed_ms: None,
2657 metadata: Some(json!({
2658 "roundtrip": roundtrip + 1,
2659 "max_tool_roundtrips": request.max_tool_roundtrips,
2660 })),
2661 });
2662 }
2663 }
2664 }
2665
2666 return Err(format!(
2667 "tool-enabled llm_call '{}' exhausted loop without final payload",
2668 request.node_id
2669 ));
2670 }
2671
2672 let mut builder = CompletionRequest::builder()
2673 .model(&request.model)
2674 .messages(messages);
2675
2676 if request.heal && !request.stream && expects_object {
2677 builder = builder.json_schema("workflow_step", request.schema.clone());
2678 }
2679
2680 if request.stream {
2681 builder = builder.stream(true);
2682 }
2683
2684 let completion_request = builder
2685 .build()
2686 .map_err(|error| format!("failed to build completion request: {error}"))?;
2687
2688 let completion_options = if request.heal && !request.stream && expects_object {
2689 CompletionOptions {
2690 mode: CompletionMode::HealedJson,
2691 }
2692 } else {
2693 CompletionOptions::default()
2694 };
2695
2696 let outcome = self
2697 .client
2698 .complete(&completion_request, completion_options)
2699 .await
2700 .map_err(|error| error.to_string())?;
2701
2702 match outcome {
2703 CompletionOutcome::Stream(mut stream) => {
2704 let mut aggregated = String::new();
2705 let mut final_stream_usage: Option<simple_agent_type::response::Usage> = None;
2706 let stream_started = Instant::now();
2707 let mut ttft_ms: Option<u128> = None;
2708 let mut delta_filter = StructuredJsonDeltaFilter::default();
2709 let include_raw_debug = include_raw_stream_debug_events();
2710 let mut json_text_formatter = if request.stream_json_as_text {
2711 Some(StreamJsonAsTextFormatter::default())
2712 } else {
2713 None
2714 };
2715 while let Some(chunk_result) = stream.next().await {
2716 if event_sink_is_cancelled(event_sink) {
2717 return Err(workflow_event_sink_cancelled_message().to_string());
2718 }
2719 let chunk = chunk_result.map_err(|error| error.to_string())?;
2720 if let Some(usage) = chunk.usage {
2721 final_stream_usage = Some(usage);
2722 }
2723 if let Some(choice) = chunk.choices.first() {
2724 if ttft_ms.is_none()
2725 && (choice
2726 .delta
2727 .content
2728 .as_ref()
2729 .is_some_and(|delta| !delta.is_empty())
2730 || choice
2731 .delta
2732 .reasoning_content
2733 .as_ref()
2734 .is_some_and(|delta| !delta.is_empty()))
2735 {
2736 ttft_ms = Some(stream_started.elapsed().as_millis());
2737 }
2738 if include_raw_debug {
2739 if let Some(reasoning_delta) =
2740 choice.delta.reasoning_content.as_ref()
2741 {
2742 if let Some(sink) = event_sink {
2743 sink.emit(&YamlWorkflowEvent {
2744 event_type: "node_stream_thinking_delta".to_string(),
2745 node_id: Some(request.node_id.clone()),
2746 step_id: Some(request.node_id.clone()),
2747 node_kind: Some("llm_call".to_string()),
2748 streamable: Some(true),
2749 message: None,
2750 delta: Some(reasoning_delta.clone()),
2751 token_kind: Some(YamlWorkflowTokenKind::Thinking),
2752 is_terminal_node_token: Some(request.is_terminal_node),
2753 elapsed_ms: None,
2754 metadata: None,
2755 });
2756 }
2757 }
2758 }
2759 if let Some(delta) = choice.delta.content.clone() {
2760 aggregated.push_str(delta.as_str());
2761 let (output_delta, thinking_delta) = if expects_object {
2762 delta_filter.split(delta.as_str())
2763 } else {
2764 (Some(delta.clone()), None)
2765 };
2766 let rendered_output_delta = if let Some(output_chunk) = output_delta
2767 {
2768 if let Some(formatter) = json_text_formatter.as_mut() {
2769 formatter.push(output_chunk.as_str());
2770 formatter.emit_if_ready(delta_filter.completed())
2771 } else {
2772 Some(output_chunk)
2773 }
2774 } else {
2775 None
2776 };
2777 if include_raw_debug {
2778 if let Some(sink) = event_sink {
2779 if let Some(raw_thinking_delta) = thinking_delta.as_ref() {
2780 sink.emit(&YamlWorkflowEvent {
2781 event_type: "node_stream_thinking_delta"
2782 .to_string(),
2783 node_id: Some(request.node_id.clone()),
2784 step_id: Some(request.node_id.clone()),
2785 node_kind: Some("llm_call".to_string()),
2786 streamable: Some(true),
2787 message: None,
2788 delta: Some(raw_thinking_delta.clone()),
2789 token_kind: Some(YamlWorkflowTokenKind::Thinking),
2790 is_terminal_node_token: Some(
2791 request.is_terminal_node,
2792 ),
2793 elapsed_ms: None,
2794 metadata: None,
2795 });
2796 }
2797 if let Some(raw_output_delta) =
2798 rendered_output_delta.as_ref()
2799 {
2800 sink.emit(&YamlWorkflowEvent {
2801 event_type: "node_stream_output_delta".to_string(),
2802 node_id: Some(request.node_id.clone()),
2803 step_id: Some(request.node_id.clone()),
2804 node_kind: Some("llm_call".to_string()),
2805 streamable: Some(true),
2806 message: None,
2807 delta: Some(raw_output_delta.clone()),
2808 token_kind: Some(YamlWorkflowTokenKind::Output),
2809 is_terminal_node_token: Some(
2810 request.is_terminal_node,
2811 ),
2812 elapsed_ms: None,
2813 metadata: None,
2814 });
2815 }
2816 }
2817 }
2818 if let Some(filtered_delta) = rendered_output_delta {
2819 if let Some(sink) = event_sink {
2820 sink.emit(&YamlWorkflowEvent {
2821 event_type: "node_stream_delta".to_string(),
2822 node_id: Some(request.node_id.clone()),
2823 step_id: Some(request.node_id.clone()),
2824 node_kind: Some("llm_call".to_string()),
2825 streamable: Some(true),
2826 message: None,
2827 delta: Some(filtered_delta),
2828 token_kind: Some(YamlWorkflowTokenKind::Output),
2829 is_terminal_node_token: Some(request.is_terminal_node),
2830 elapsed_ms: None,
2831 metadata: None,
2832 });
2833 }
2834 }
2835 }
2836 }
2837
2838 if event_sink_is_cancelled(event_sink) {
2839 return Err(workflow_event_sink_cancelled_message().to_string());
2840 }
2841 }
2842
2843 let payload = if expects_object {
2844 let resolved =
2845 parse_streamed_structured_payload(aggregated.as_str(), request.heal)?;
2846 if let Some(confidence) = resolved.heal_confidence {
2847 if let Some(sink) = event_sink {
2848 sink.emit(&YamlWorkflowEvent {
2849 event_type: "node_healed".to_string(),
2850 node_id: Some(request.node_id.clone()),
2851 step_id: Some(request.node_id.clone()),
2852 node_kind: Some("llm_call".to_string()),
2853 streamable: Some(true),
2854 message: Some(format!(
2855 "healed streamed structured response confidence={confidence}"
2856 )),
2857 delta: None,
2858 token_kind: None,
2859 is_terminal_node_token: None,
2860 elapsed_ms: None,
2861 metadata: None,
2862 });
2863 }
2864 }
2865 resolved.payload
2866 } else {
2867 Value::String(aggregated)
2868 };
2869
2870 Ok(YamlLlmExecutionResult {
2871 payload,
2872 usage: final_stream_usage.map(|usage| YamlLlmTokenUsage {
2873 prompt_tokens: usage.prompt_tokens,
2874 completion_tokens: usage.completion_tokens,
2875 total_tokens: usage.total_tokens,
2876 thinking_tokens: None,
2877 }),
2878 ttft_ms,
2879 tool_calls: Vec::new(),
2880 })
2881 }
2882 CompletionOutcome::Response(response) => {
2883 let payload = if expects_object {
2884 let content = response
2885 .content()
2886 .ok_or_else(|| "completion returned empty content".to_string())?;
2887 serde_json::from_str(content).map_err(|error| {
2888 format!("failed to parse structured completion JSON: {error}")
2889 })?
2890 } else {
2891 Value::String(response.content().unwrap_or_default().to_string())
2892 };
2893
2894 Ok(YamlLlmExecutionResult {
2895 payload,
2896 usage: Some(YamlLlmTokenUsage {
2897 prompt_tokens: response.usage.prompt_tokens,
2898 completion_tokens: response.usage.completion_tokens,
2899 total_tokens: response.usage.total_tokens,
2900 thinking_tokens: None,
2901 }),
2902 ttft_ms: None,
2903 tool_calls: Vec::new(),
2904 })
2905 }
2906 CompletionOutcome::HealedJson(healed) => {
2907 if !expects_object {
2908 return Err(
2909 "healed json outcome is unsupported for non-object schema".to_string()
2910 );
2911 }
2912 if let Some(sink) = event_sink {
2913 sink.emit(&YamlWorkflowEvent {
2914 event_type: "node_healed".to_string(),
2915 node_id: Some(request.node_id.clone()),
2916 step_id: Some(request.node_id.clone()),
2917 node_kind: Some("llm_call".to_string()),
2918 streamable: Some(request.stream),
2919 message: Some(format!(
2920 "healed structured response confidence={}",
2921 healed.parsed.confidence
2922 )),
2923 delta: None,
2924 token_kind: None,
2925 is_terminal_node_token: None,
2926 elapsed_ms: None,
2927 metadata: None,
2928 });
2929 }
2930 Ok(YamlLlmExecutionResult {
2931 payload: healed.parsed.value,
2932 usage: Some(YamlLlmTokenUsage {
2933 prompt_tokens: healed.response.usage.prompt_tokens,
2934 completion_tokens: healed.response.usage.completion_tokens,
2935 total_tokens: healed.response.usage.total_tokens,
2936 thinking_tokens: None,
2937 }),
2938 ttft_ms: None,
2939 tool_calls: Vec::new(),
2940 })
2941 }
2942 CompletionOutcome::CoercedSchema(coerced) => {
2943 if !expects_object {
2944 return Err(
2945 "coerced schema outcome is unsupported for non-object schema"
2946 .to_string(),
2947 );
2948 }
2949 Ok(YamlLlmExecutionResult {
2950 payload: coerced.coerced.value,
2951 usage: Some(YamlLlmTokenUsage {
2952 prompt_tokens: coerced.response.usage.prompt_tokens,
2953 completion_tokens: coerced.response.usage.completion_tokens,
2954 total_tokens: coerced.response.usage.total_tokens,
2955 thinking_tokens: None,
2956 }),
2957 ttft_ms: None,
2958 tool_calls: Vec::new(),
2959 })
2960 }
2961 }
2962 }
2963 }
2964
2965 let executor = BorrowedClientExecutor {
2966 client,
2967 custom_worker,
2968 run_options: options.clone(),
2969 };
2970 run_workflow_yaml_with_custom_worker_and_events_and_options(
2971 workflow,
2972 workflow_input,
2973 &executor,
2974 custom_worker,
2975 event_sink,
2976 options,
2977 )
2978 .await
2979}
2980
2981pub async fn run_email_workflow_yaml_with_client_and_custom_worker_and_events(
2982 workflow: &YamlWorkflow,
2983 email_text: &str,
2984 client: &SimpleAgentsClient,
2985 custom_worker: Option<&dyn YamlWorkflowCustomWorkerExecutor>,
2986 event_sink: Option<&dyn YamlWorkflowEventSink>,
2987) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
2988 let workflow_input = json!({ "email_text": email_text });
2989 run_workflow_yaml_with_client_and_custom_worker_and_events(
2990 workflow,
2991 &workflow_input,
2992 client,
2993 custom_worker,
2994 event_sink,
2995 )
2996 .await
2997}
2998
2999pub async fn run_workflow_yaml(
3000 workflow: &YamlWorkflow,
3001 workflow_input: &Value,
3002 executor: &dyn YamlWorkflowLlmExecutor,
3003) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
3004 run_workflow_yaml_with_custom_worker_and_events(workflow, workflow_input, executor, None, None)
3005 .await
3006}
3007
3008pub async fn run_email_workflow_yaml(
3009 workflow: &YamlWorkflow,
3010 email_text: &str,
3011 executor: &dyn YamlWorkflowLlmExecutor,
3012) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
3013 let workflow_input = json!({ "email_text": email_text });
3014 run_workflow_yaml(workflow, &workflow_input, executor).await
3015}
3016
3017pub async fn run_workflow_yaml_with_custom_worker(
3018 workflow: &YamlWorkflow,
3019 workflow_input: &Value,
3020 executor: &dyn YamlWorkflowLlmExecutor,
3021 custom_worker: Option<&dyn YamlWorkflowCustomWorkerExecutor>,
3022) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
3023 run_workflow_yaml_with_custom_worker_and_events(
3024 workflow,
3025 workflow_input,
3026 executor,
3027 custom_worker,
3028 None,
3029 )
3030 .await
3031}
3032
3033pub async fn run_email_workflow_yaml_with_custom_worker(
3034 workflow: &YamlWorkflow,
3035 email_text: &str,
3036 executor: &dyn YamlWorkflowLlmExecutor,
3037 custom_worker: Option<&dyn YamlWorkflowCustomWorkerExecutor>,
3038) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
3039 let workflow_input = json!({ "email_text": email_text });
3040 run_workflow_yaml_with_custom_worker(workflow, &workflow_input, executor, custom_worker).await
3041}
3042
3043pub async fn run_workflow_yaml_with_custom_worker_and_events(
3044 workflow: &YamlWorkflow,
3045 workflow_input: &Value,
3046 executor: &dyn YamlWorkflowLlmExecutor,
3047 custom_worker: Option<&dyn YamlWorkflowCustomWorkerExecutor>,
3048 event_sink: Option<&dyn YamlWorkflowEventSink>,
3049) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
3050 run_workflow_yaml_with_custom_worker_and_events_and_options(
3051 workflow,
3052 workflow_input,
3053 executor,
3054 custom_worker,
3055 event_sink,
3056 &YamlWorkflowRunOptions::default(),
3057 )
3058 .await
3059}
3060
3061pub async fn run_workflow_yaml_with_custom_worker_and_events_and_options(
3062 workflow: &YamlWorkflow,
3063 workflow_input: &Value,
3064 executor: &dyn YamlWorkflowLlmExecutor,
3065 custom_worker: Option<&dyn YamlWorkflowCustomWorkerExecutor>,
3066 event_sink: Option<&dyn YamlWorkflowEventSink>,
3067 options: &YamlWorkflowRunOptions,
3068) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
3069 if !workflow_input.is_object() {
3070 return Err(YamlWorkflowRunError::InvalidInput {
3071 message: "workflow input must be a JSON object".to_string(),
3072 });
3073 }
3074
3075 validate_sample_rate(options.telemetry.sample_rate)?;
3076
3077 let email_text = workflow_input
3078 .get("email_text")
3079 .and_then(Value::as_str)
3080 .unwrap_or_default();
3081
3082 let diagnostics = verify_yaml_workflow(workflow);
3083 let errors: Vec<YamlWorkflowDiagnostic> = diagnostics
3084 .iter()
3085 .filter(|d| d.severity == YamlWorkflowDiagnosticSeverity::Error)
3086 .cloned()
3087 .collect();
3088 if !errors.is_empty() {
3089 return Err(YamlWorkflowRunError::Validation {
3090 diagnostics_count: errors.len(),
3091 diagnostics: errors,
3092 });
3093 }
3094
3095 if let Some(output) =
3096 try_run_yaml_via_ir_runtime(workflow, workflow_input, executor, custom_worker, options)
3097 .await?
3098 {
3099 return Ok(output);
3100 }
3101
3102 let parent_trace_context = trace_context_from_options(options);
3103 let telemetry_context = resolve_telemetry_context(options, parent_trace_context.as_ref());
3104
3105 let tracer = workflow_tracer();
3106 let mut workflow_span_context: Option<TraceContext> = None;
3107 let mut workflow_span = if telemetry_context.sampled {
3108 let (span_context, mut span) = tracer.start_span(
3109 "workflow.run",
3110 SpanKind::Workflow,
3111 parent_trace_context.as_ref(),
3112 );
3113 if let Some(trace_id_value) = telemetry_context.trace_id.as_deref() {
3114 span.set_attribute("trace_id", trace_id_value);
3115 }
3116 apply_trace_tenant_attributes(span.as_mut(), options);
3117 workflow_span_context = Some(span_context);
3118 Some(span)
3119 } else {
3120 None
3121 };
3122
3123 if workflow.nodes.is_empty() {
3124 return Err(YamlWorkflowRunError::EmptyNodes {
3125 workflow_id: workflow.id.clone(),
3126 });
3127 }
3128
3129 let node_map: HashMap<&str, &YamlNode> = workflow
3130 .nodes
3131 .iter()
3132 .map(|node| (node.id.as_str(), node))
3133 .collect();
3134 if !node_map.contains_key(workflow.entry_node.as_str()) {
3135 return Err(YamlWorkflowRunError::MissingEntry {
3136 entry_node: workflow.entry_node.clone(),
3137 });
3138 }
3139
3140 let edge_map: HashMap<&str, &str> = workflow
3141 .edges
3142 .iter()
3143 .map(|edge| (edge.from.as_str(), edge.to.as_str()))
3144 .collect();
3145
3146 let mut current = workflow.entry_node.clone();
3147 let mut trace = Vec::new();
3148 let mut outputs: BTreeMap<String, Value> = BTreeMap::new();
3149 let mut globals = serde_json::Map::new();
3150 let mut step_timings = Vec::new();
3151 let mut llm_node_metrics: BTreeMap<String, YamlLlmNodeMetrics> = BTreeMap::new();
3152 let mut token_totals = YamlTokenTotals::default();
3153 let mut workflow_ttft_ms: Option<u128> = None;
3154 let started = Instant::now();
3155
3156 if let Some(sink) = event_sink {
3157 sink.emit(&YamlWorkflowEvent {
3158 event_type: "workflow_started".to_string(),
3159 node_id: None,
3160 step_id: None,
3161 node_kind: None,
3162 streamable: None,
3163 message: Some(format!("workflow_id={}", workflow.id)),
3164 delta: None,
3165 token_kind: None,
3166 is_terminal_node_token: None,
3167 elapsed_ms: Some(0),
3168 metadata: None,
3169 });
3170 }
3171
3172 if event_sink_is_cancelled(event_sink) {
3173 return Err(YamlWorkflowRunError::EventSinkCancelled {
3174 message: workflow_event_sink_cancelled_message().to_string(),
3175 });
3176 }
3177
3178 loop {
3179 if event_sink_is_cancelled(event_sink) {
3180 return Err(YamlWorkflowRunError::EventSinkCancelled {
3181 message: workflow_event_sink_cancelled_message().to_string(),
3182 });
3183 }
3184
3185 let node =
3186 *node_map
3187 .get(current.as_str())
3188 .ok_or_else(|| YamlWorkflowRunError::MissingNode {
3189 node_id: current.clone(),
3190 })?;
3191
3192 trace.push(node.id.clone());
3193 let step_started = Instant::now();
3194
3195 let mut node_span_context: Option<TraceContext> = None;
3196 let mut node_span = if telemetry_context.sampled {
3197 let (span_context, mut span) = tracer.start_span(
3198 "workflow.node.execute",
3199 SpanKind::Node,
3200 workflow_span_context.as_ref(),
3201 );
3202 node_span_context = Some(span_context);
3203 span.set_attribute(
3204 "trace_id",
3205 telemetry_context.trace_id.as_deref().unwrap_or_default(),
3206 );
3207 span.set_attribute("node_id", node.id.as_str());
3208 span.set_attribute("node_kind", node.kind_name());
3209 Some(span)
3210 } else {
3211 None
3212 };
3213
3214 let node_streamable = node
3215 .node_type
3216 .llm_call
3217 .as_ref()
3218 .map(|llm| llm.stream.unwrap_or(false) && !llm.heal.unwrap_or(false));
3219 let workflow_elapsed_before_node_ms = started.elapsed().as_millis();
3220
3221 if let Some(sink) = event_sink {
3222 sink.emit(&YamlWorkflowEvent {
3223 event_type: "node_started".to_string(),
3224 node_id: Some(node.id.clone()),
3225 step_id: Some(node.id.clone()),
3226 node_kind: Some(node.kind_name().to_string()),
3227 streamable: node_streamable,
3228 message: if node_streamable == Some(false) {
3229 Some("Node is not streamable; status events only".to_string())
3230 } else {
3231 None
3232 },
3233 delta: None,
3234 token_kind: None,
3235 is_terminal_node_token: None,
3236 elapsed_ms: Some(workflow_elapsed_before_node_ms),
3237 metadata: None,
3238 });
3239 }
3240
3241 if event_sink_is_cancelled(event_sink) {
3242 return Err(YamlWorkflowRunError::EventSinkCancelled {
3243 message: workflow_event_sink_cancelled_message().to_string(),
3244 });
3245 }
3246
3247 let mut node_usage: Option<YamlLlmTokenUsage> = None;
3248 let is_terminal_node = !edge_map.contains_key(node.id.as_str());
3249 let next = if let Some(llm) = &node.node_type.llm_call {
3250 let prompt_template = node
3251 .config
3252 .as_ref()
3253 .and_then(|cfg| cfg.prompt.as_deref())
3254 .unwrap_or_default();
3255 let context = json!({
3256 "input": workflow_input,
3257 "nodes": outputs,
3258 "globals": Value::Object(globals.clone())
3259 });
3260 let messages = if let Some(path) = llm.messages_path.as_deref() {
3261 Some(
3262 parse_messages_from_context(path, &context).map_err(|message| {
3263 YamlWorkflowRunError::Llm {
3264 node_id: node.id.clone(),
3265 message,
3266 }
3267 })?,
3268 )
3269 } else {
3270 None
3271 };
3272 let prompt_bindings = collect_template_bindings(prompt_template, &context);
3273 let prompt = interpolate_template(prompt_template, &context);
3274 let schema = llm_output_schema_for_node(node);
3275
3276 let request = YamlLlmExecutionRequest {
3277 node_id: node.id.clone(),
3278 is_terminal_node,
3279 stream_json_as_text: llm.stream_json_as_text.unwrap_or(false),
3280 model: options
3281 .model
3282 .as_ref()
3283 .and_then(|model| {
3284 let trimmed = model.trim();
3285 if trimmed.is_empty() {
3286 None
3287 } else {
3288 Some(trimmed.to_string())
3289 }
3290 })
3291 .unwrap_or_else(|| llm.model.clone()),
3292 messages,
3293 append_prompt_as_user: llm.append_prompt_as_user.unwrap_or(true),
3294 prompt,
3295 prompt_template: prompt_template.to_string(),
3296 prompt_bindings,
3297 schema,
3298 stream: llm.stream.unwrap_or(false),
3299 heal: llm.heal.unwrap_or(false),
3300 tools: normalize_llm_tools(llm).map_err(|message| YamlWorkflowRunError::Llm {
3301 node_id: node.id.clone(),
3302 message,
3303 })?,
3304 tool_choice: normalize_tool_choice(llm.tool_choice.clone()).map_err(|message| {
3305 YamlWorkflowRunError::Llm {
3306 node_id: node.id.clone(),
3307 message,
3308 }
3309 })?,
3310 max_tool_roundtrips: llm.max_tool_roundtrips.unwrap_or(1),
3311 tool_calls_global_key: llm.tool_calls_global_key.clone(),
3312 tool_trace_mode: options.telemetry.tool_trace_mode,
3313 execution_context: context.clone(),
3314 email_text: email_text.to_string(),
3315 trace_id: telemetry_context.trace_id.clone(),
3316 trace_context: node_span_context.clone(),
3317 trace_sampled: telemetry_context.sampled,
3318 };
3319
3320 if let Some(span) = node_span.as_mut() {
3321 span.set_attribute(
3322 "node_input",
3323 payload_for_span(options.telemetry.payload_mode, &context).as_str(),
3324 );
3325 }
3326
3327 if let Some(sink) = event_sink {
3328 sink.emit(&YamlWorkflowEvent {
3329 event_type: "node_llm_input_resolved".to_string(),
3330 node_id: Some(node.id.clone()),
3331 step_id: Some(node.id.clone()),
3332 node_kind: Some("llm_call".to_string()),
3333 streamable: Some(request.stream),
3334 message: Some("resolved llm input for telemetry".to_string()),
3335 delta: None,
3336 token_kind: None,
3337 is_terminal_node_token: None,
3338 elapsed_ms: Some(started.elapsed().as_millis()),
3339 metadata: Some(json!({
3340 "model": request.model.clone(),
3341 "stream_requested": request.stream,
3342 "stream_json_as_text": request.stream_json_as_text,
3343 "heal_requested": request.heal,
3344 "effective_stream": request.stream,
3345 "prompt_template": request.prompt_template.clone(),
3346 "prompt": request.prompt.clone(),
3347 "schema": request.schema.clone(),
3348 "bindings": request.prompt_bindings.clone(),
3349 "tools_count": request.tools.len(),
3350 "max_tool_roundtrips": request.max_tool_roundtrips,
3351 })),
3352 });
3353 }
3354
3355 if event_sink_is_cancelled(event_sink) {
3356 return Err(YamlWorkflowRunError::EventSinkCancelled {
3357 message: workflow_event_sink_cancelled_message().to_string(),
3358 });
3359 }
3360
3361 let llm_result = executor
3362 .complete_structured(request, event_sink)
3363 .await
3364 .map_err(|message| YamlWorkflowRunError::Llm {
3365 node_id: node.id.clone(),
3366 message,
3367 })?;
3368
3369 if let Some(usage) = llm_result.usage.as_ref() {
3370 token_totals.add_usage(usage);
3371 }
3372 if workflow_ttft_ms.is_none() {
3373 workflow_ttft_ms = llm_result
3374 .ttft_ms
3375 .map(|node_ttft_ms| workflow_elapsed_before_node_ms + node_ttft_ms);
3376 }
3377 node_usage = llm_result.usage;
3378
3379 let payload = llm_result.payload;
3380 let tool_calls = llm_result.tool_calls;
3381
3382 let mut node_output = json!({ "output": payload });
3383 if !tool_calls.is_empty() {
3384 if let Some(output_obj) = node_output.as_object_mut() {
3385 output_obj.insert("tool_calls".to_string(), json!(tool_calls));
3386 }
3387 }
3388 outputs.insert(node.id.clone(), node_output);
3389 if let Some(span) = node_span.as_mut() {
3390 if let Some(output_payload) = outputs.get(node.id.as_str()) {
3391 span.set_attribute(
3392 "node_output",
3393 payload_for_span(options.telemetry.payload_mode, output_payload).as_str(),
3394 );
3395 }
3396 }
3397 apply_set_globals(node, &outputs, workflow_input, &mut globals);
3398 apply_update_globals(node, &outputs, workflow_input, &mut globals);
3399 if let Some(global_key) = llm.tool_calls_global_key.as_ref() {
3400 if let Some(node_tool_calls) = outputs
3401 .get(node.id.as_str())
3402 .and_then(|value| value.get("tool_calls"))
3403 .cloned()
3404 {
3405 globals.insert(global_key.clone(), node_tool_calls);
3406 }
3407 }
3408 edge_map
3409 .get(node.id.as_str())
3410 .map(|value| value.to_string())
3411 } else if let Some(switch) = &node.node_type.switch {
3412 let context = json!({
3413 "input": workflow_input,
3414 "nodes": outputs,
3415 "globals": Value::Object(globals.clone())
3416 });
3417 let mut chosen = Some(switch.default.clone());
3418 for branch in &switch.branches {
3419 if evaluate_switch_condition(branch.condition.as_str(), &context)? {
3420 chosen = Some(branch.target.clone());
3421 break;
3422 }
3423 }
3424 let chosen = chosen.ok_or_else(|| YamlWorkflowRunError::InvalidSwitchTarget {
3425 node_id: node.id.clone(),
3426 })?;
3427 Some(chosen)
3428 } else if let Some(custom) = &node.node_type.custom_worker {
3429 let payload = node
3430 .config
3431 .as_ref()
3432 .and_then(|cfg| cfg.payload.as_ref())
3433 .cloned()
3434 .unwrap_or_else(|| json!({}));
3435 let context = json!({
3436 "input": workflow_input,
3437 "nodes": outputs,
3438 "globals": Value::Object(globals.clone())
3439 });
3440
3441 if let Some(span) = node_span.as_mut() {
3442 span.set_attribute("handler_name", custom.handler.as_str());
3443 span.set_attribute(
3444 "node_input",
3445 payload_for_span(options.telemetry.payload_mode, &payload).as_str(),
3446 );
3447 }
3448
3449 let mut handler_span_context: Option<TraceContext> = None;
3450 let mut handler_span = if telemetry_context.sampled {
3451 let (span_context, mut span) = tracer.start_span(
3452 "handler.invoke",
3453 SpanKind::Node,
3454 workflow_span_context.as_ref(),
3455 );
3456 handler_span_context = Some(span_context);
3457 span.set_attribute(
3458 "trace_id",
3459 telemetry_context.trace_id.as_deref().unwrap_or_default(),
3460 );
3461 span.set_attribute("handler_name", custom.handler.as_str());
3462 apply_trace_tenant_attributes(span.as_mut(), options);
3463 Some(span)
3464 } else {
3465 None
3466 };
3467
3468 let worker_trace_context = merged_trace_context_for_worker(
3469 handler_span_context.as_ref(),
3470 telemetry_context.trace_id.as_deref(),
3471 options,
3472 );
3473 let worker_context = custom_worker_context_with_trace(
3474 &context,
3475 &worker_trace_context,
3476 &options.trace.tenant,
3477 );
3478
3479 let worker_output_result = if let Some(custom_worker_executor) = custom_worker {
3480 custom_worker_executor
3481 .execute(
3482 custom.handler.as_str(),
3483 &payload,
3484 email_text,
3485 &worker_context,
3486 )
3487 .await
3488 .map_err(|message| YamlWorkflowRunError::CustomWorker {
3489 node_id: node.id.clone(),
3490 message,
3491 })
3492 } else {
3493 mock_custom_worker_output(custom.handler.as_str(), &payload)
3494 };
3495
3496 if let Some(span) = handler_span.take() {
3497 span.end();
3498 }
3499
3500 let worker_output = worker_output_result?;
3501
3502 outputs.insert(node.id.clone(), json!({ "output": worker_output }));
3503 if let Some(span) = node_span.as_mut() {
3504 if let Some(output_payload) = outputs.get(node.id.as_str()) {
3505 span.set_attribute(
3506 "node_output",
3507 payload_for_span(options.telemetry.payload_mode, output_payload).as_str(),
3508 );
3509 }
3510 }
3511 apply_set_globals(node, &outputs, workflow_input, &mut globals);
3512 apply_update_globals(node, &outputs, workflow_input, &mut globals);
3513 edge_map
3514 .get(node.id.as_str())
3515 .map(|value| value.to_string())
3516 } else {
3517 return Err(YamlWorkflowRunError::UnsupportedNodeType {
3518 node_id: node.id.clone(),
3519 });
3520 };
3521
3522 let node_kind = node.kind_name().to_string();
3523 let elapsed_ms = step_started.elapsed().as_millis();
3524 step_timings.push(YamlStepTiming {
3525 node_id: node.id.clone(),
3526 node_kind,
3527 elapsed_ms,
3528 prompt_tokens: node_usage.as_ref().map(|usage| usage.prompt_tokens),
3529 completion_tokens: node_usage.as_ref().map(|usage| usage.completion_tokens),
3530 total_tokens: node_usage.as_ref().map(|usage| usage.total_tokens),
3531 thinking_tokens: node_usage.as_ref().and_then(|usage| usage.thinking_tokens),
3532 tokens_per_second: node_usage
3533 .as_ref()
3534 .map(|usage| completion_tokens_per_second(usage.completion_tokens, elapsed_ms)),
3535 });
3536
3537 if let Some(usage) = node_usage.as_ref() {
3538 llm_node_metrics.insert(
3539 node.id.clone(),
3540 YamlLlmNodeMetrics {
3541 elapsed_ms,
3542 prompt_tokens: usage.prompt_tokens,
3543 completion_tokens: usage.completion_tokens,
3544 total_tokens: usage.total_tokens,
3545 thinking_tokens: usage.thinking_tokens,
3546 tokens_per_second: completion_tokens_per_second(
3547 usage.completion_tokens,
3548 elapsed_ms,
3549 ),
3550 },
3551 );
3552 }
3553
3554 if let Some(mut span) = node_span.take() {
3555 span.set_attribute("elapsed_ms", elapsed_ms.to_string().as_str());
3556 span.add_event("node_completed");
3557 span.end();
3558 }
3559
3560 if let Some(sink) = event_sink {
3561 sink.emit(&YamlWorkflowEvent {
3562 event_type: "node_completed".to_string(),
3563 node_id: Some(node.id.clone()),
3564 step_id: Some(node.id.clone()),
3565 node_kind: Some(node.kind_name().to_string()),
3566 streamable: node_streamable,
3567 message: None,
3568 delta: None,
3569 token_kind: None,
3570 is_terminal_node_token: None,
3571 elapsed_ms: Some(elapsed_ms),
3572 metadata: None,
3573 });
3574 }
3575
3576 if event_sink_is_cancelled(event_sink) {
3577 return Err(YamlWorkflowRunError::EventSinkCancelled {
3578 message: workflow_event_sink_cancelled_message().to_string(),
3579 });
3580 }
3581
3582 if let Some(next) = next {
3583 current = next;
3584 continue;
3585 }
3586 break;
3587 }
3588
3589 let terminal_node = trace
3590 .last()
3591 .cloned()
3592 .ok_or_else(|| YamlWorkflowRunError::EmptyNodes {
3593 workflow_id: workflow.id.clone(),
3594 })?;
3595
3596 let terminal_output = outputs
3597 .get(terminal_node.as_str())
3598 .and_then(|value| value.get("output"))
3599 .cloned();
3600
3601 let total_elapsed_ms = started.elapsed().as_millis();
3602 let output = YamlWorkflowRunOutput {
3603 workflow_id: workflow.id.clone(),
3604 entry_node: workflow.entry_node.clone(),
3605 email_text: email_text.to_string(),
3606 trace,
3607 outputs,
3608 terminal_node,
3609 terminal_output,
3610 step_timings,
3611 llm_node_metrics,
3612 total_elapsed_ms,
3613 ttft_ms: workflow_ttft_ms,
3614 total_input_tokens: token_totals.input_tokens,
3615 total_output_tokens: token_totals.output_tokens,
3616 total_tokens: token_totals.total_tokens,
3617 total_thinking_tokens: token_totals.thinking_tokens,
3618 tokens_per_second: token_totals.tokens_per_second(total_elapsed_ms),
3619 trace_id: telemetry_context.trace_id.clone(),
3620 metadata: telemetry_context.trace_id.as_ref().map(|value| {
3621 workflow_metadata_with_trace(
3622 options,
3623 value,
3624 telemetry_context.sampled,
3625 telemetry_context.trace_id_source,
3626 )
3627 }),
3628 };
3629
3630 if let Some(sink) = event_sink {
3631 let event_metadata = if options.telemetry.nerdstats {
3632 Some(json!({
3633 "nerdstats": workflow_nerdstats(&output),
3634 }))
3635 } else {
3636 None
3637 };
3638 sink.emit(&YamlWorkflowEvent {
3639 event_type: "workflow_completed".to_string(),
3640 node_id: None,
3641 step_id: None,
3642 node_kind: None,
3643 streamable: None,
3644 message: Some(format!("terminal_node={}", output.terminal_node)),
3645 delta: None,
3646 token_kind: None,
3647 is_terminal_node_token: None,
3648 elapsed_ms: Some(output.total_elapsed_ms),
3649 metadata: event_metadata,
3650 });
3651 }
3652
3653 if event_sink_is_cancelled(event_sink) {
3654 return Err(YamlWorkflowRunError::EventSinkCancelled {
3655 message: workflow_event_sink_cancelled_message().to_string(),
3656 });
3657 }
3658
3659 if let Some(mut span) = workflow_span.take() {
3660 span.set_attribute("workflow_id", workflow.id.as_str());
3661 if let Some(trace_id_value) = telemetry_context.trace_id.as_ref() {
3662 span.set_attribute("trace_id", trace_id_value.as_str());
3663 }
3664 span.end();
3665 flush_workflow_tracer();
3666 }
3667
3668 Ok(output)
3669}
3670
/// Attempts to execute a YAML workflow through the IR-based `WorkflowRuntime`
/// instead of the hand-rolled YAML interpreter.
///
/// Returns `Ok(None)` when the workflow cannot (or should not) take the IR
/// path — i.e. the YAML→IR conversion reports an unsupported node or a node
/// with multiple outgoing edges, or the runtime rejects the IR with a
/// validation error — so the caller can fall back to the direct YAML runner.
/// Any other conversion or runtime failure is surfaced as an error.
///
/// LLM and custom-worker calls are both routed through the IR runtime's tool
/// path (see `YamlIrToolExecutor` below); the runtime's own LLM executor slot
/// is filled with a stub that always errors.
async fn try_run_yaml_via_ir_runtime(
    workflow: &YamlWorkflow,
    workflow_input: &Value,
    executor: &dyn YamlWorkflowLlmExecutor,
    custom_worker: Option<&dyn YamlWorkflowCustomWorkerExecutor>,
    options: &YamlWorkflowRunOptions,
) -> Result<Option<YamlWorkflowRunOutput>, YamlWorkflowRunError> {
    // Convert the YAML definition to IR; bail out to the fallback path (None)
    // for the two "shape not supported by IR" error variants.
    let ir = match yaml_workflow_to_ir(workflow) {
        Ok(def) => def,
        Err(YamlToIrError::UnsupportedNode { .. })
        | Err(YamlToIrError::MultipleOutgoingEdge { .. }) => return Ok(None),
        Err(err) => {
            return Err(YamlWorkflowRunError::InvalidInput {
                message: err.to_string(),
            });
        }
    };

    let parent_trace_context = trace_context_from_options(options);
    let telemetry_context = resolve_telemetry_context(options, parent_trace_context.as_ref());

    // Open the top-level "workflow.run" span only when sampling is enabled;
    // its context is captured so child handler spans can parent onto it.
    let tracer = workflow_tracer();
    let mut workflow_span_context: Option<TraceContext> = None;
    let mut workflow_span = if telemetry_context.sampled {
        let (span_context, mut span) = tracer.start_span(
            "workflow.run",
            SpanKind::Workflow,
            parent_trace_context.as_ref(),
        );
        if let Some(trace_id_value) = telemetry_context.trace_id.as_deref() {
            span.set_attribute("trace_id", trace_id_value);
        }
        apply_trace_tenant_attributes(span.as_mut(), options);
        workflow_span_context = Some(span_context);
        Some(span)
    } else {
        None
    };

    // Stub LLM executor: the IR produced by `yaml_workflow_to_ir` expresses
    // LLM calls as tool invocations (`YAML_LLM_TOOL_ID`), so the runtime's
    // direct LLM path should never be exercised. If it is, that is a bug and
    // we fail loudly with a recognizable message.
    struct NoopLlm;
    #[async_trait]
    impl LlmExecutor for NoopLlm {
        async fn execute(
            &self,
            _input: LlmExecutionInput,
        ) -> Result<LlmExecutionOutput, LlmExecutionError> {
            Err(LlmExecutionError::UnexpectedOutcome(
                "yaml_ir_uses_tool_path",
            ))
        }
    }

    // Bridges the IR runtime's `ToolExecutor` interface to the YAML-world
    // executors. Token usage is accumulated behind `Mutex`es because
    // `execute_tool` only receives `&self`.
    struct YamlIrToolExecutor<'a> {
        llm_executor: &'a dyn YamlWorkflowLlmExecutor,
        custom_worker: Option<&'a dyn YamlWorkflowCustomWorkerExecutor>,
        // Workflow-wide token totals, added to after every LLM call.
        token_totals: std::sync::Mutex<YamlTokenTotals>,
        // Per-node usage, keyed by node id, for post-run metrics assembly.
        node_usage: std::sync::Mutex<BTreeMap<String, YamlLlmTokenUsage>>,
        trace_id: Option<String>,
        trace_context: Option<TraceContext>,
        trace_input_context: Option<YamlWorkflowTraceContextInput>,
        tenant_context: YamlWorkflowTraceTenantContext,
        payload_mode: YamlWorkflowPayloadMode,
        trace_sampled: bool,
    }

    #[async_trait]
    impl ToolExecutor for YamlIrToolExecutor<'_> {
        async fn execute_tool(
            &self,
            input: ToolExecutionInput,
        ) -> Result<Value, ToolExecutionError> {
            // Rebuild the `{input, nodes, globals}` context the YAML helpers
            // (template interpolation, messages_path resolution) expect.
            let context = build_yaml_context_from_ir_scope(&input.scoped_input);

            // Branch 1: the synthetic LLM tool. Its `input` object carries the
            // original YAML llm_call configuration, which we unpack back into
            // a `YamlLlmExecutionRequest`.
            if input.tool == YAML_LLM_TOOL_ID {
                let node_id = input
                    .input
                    .get("node_id")
                    .and_then(Value::as_str)
                    .ok_or_else(|| {
                        ToolExecutionError::Failed("yaml llm call missing node_id".to_string())
                    })?
                    .to_string();
                let node_id_for_metrics = node_id.clone();
                let model = input
                    .input
                    .get("model")
                    .and_then(Value::as_str)
                    .ok_or_else(|| {
                        ToolExecutionError::Failed("yaml llm call missing model".to_string())
                    })?
                    .to_string();
                let prompt_template = input
                    .input
                    .get("prompt_template")
                    .and_then(Value::as_str)
                    .unwrap_or_default()
                    .to_string();
                let stream = input
                    .input
                    .get("stream")
                    .and_then(Value::as_bool)
                    .unwrap_or(false);
                let heal = input
                    .input
                    .get("heal")
                    .and_then(Value::as_bool)
                    .unwrap_or(false);
                let append_prompt_as_user = input
                    .input
                    .get("append_prompt_as_user")
                    .and_then(Value::as_bool)
                    .unwrap_or(true);
                let messages_path = input
                    .input
                    .get("messages_path")
                    .and_then(Value::as_str)
                    .map(str::to_string);

                // Resolve explicit chat messages from the context when a
                // messages_path is configured; otherwise rely on the prompt.
                let messages = if let Some(path) = messages_path.as_deref() {
                    Some(
                        parse_messages_from_context(path, &context)
                            .map_err(ToolExecutionError::Failed)?,
                    )
                } else {
                    None
                };

                let prompt_bindings = collect_template_bindings(&prompt_template, &context);
                let prompt = interpolate_template(&prompt_template, &context);
                let email_text = context
                    .get("input")
                    .and_then(|v| v.get("email_text"))
                    .and_then(Value::as_str)
                    .unwrap_or_default();
                let schema = input
                    .input
                    .get("output_schema")
                    .cloned()
                    .unwrap_or_else(default_llm_output_schema);

                // NOTE(review): nested tool use and terminal-node streaming
                // are deliberately disabled on the IR path (tools empty,
                // max_tool_roundtrips=1, is_terminal_node=false,
                // tool_trace_mode=Off).
                let request = YamlLlmExecutionRequest {
                    node_id,
                    is_terminal_node: false,
                    stream_json_as_text: input
                        .input
                        .get("stream_json_as_text")
                        .and_then(Value::as_bool)
                        .unwrap_or(false),
                    model,
                    messages,
                    append_prompt_as_user,
                    prompt,
                    prompt_template,
                    prompt_bindings,
                    schema,
                    stream,
                    heal,
                    tools: Vec::new(),
                    tool_choice: None,
                    max_tool_roundtrips: 1,
                    tool_calls_global_key: None,
                    tool_trace_mode: YamlToolTraceMode::Off,
                    execution_context: context.clone(),
                    email_text: email_text.to_string(),
                    trace_id: self.trace_id.clone(),
                    trace_context: self.trace_context.clone(),
                    trace_sampled: self.trace_sampled,
                };

                let llm_result = self
                    .llm_executor
                    .complete_structured(request, None)
                    .await
                    .map_err(ToolExecutionError::Failed);

                // Record token usage on success; lock failures (poisoned
                // mutex) silently skip metrics rather than failing the call.
                if let Ok(ref result) = llm_result {
                    if let Some(usage) = result.usage.as_ref() {
                        if let Ok(mut totals) = self.token_totals.lock() {
                            totals.add_usage(usage);
                        }
                        if let Ok(mut usage_map) = self.node_usage.lock() {
                            usage_map.insert(node_id_for_metrics, usage.clone());
                        }
                    }
                }

                return llm_result.map(|result| result.payload);
            }

            // Branch 2: any other tool id is a custom-worker handler name.
            let worker = self
                .custom_worker
                .ok_or_else(|| ToolExecutionError::NotFound {
                    tool: input.tool.clone(),
                })?;

            let payload = input.input.clone();
            let email_text = context
                .get("input")
                .and_then(|v| v.get("email_text"))
                .and_then(Value::as_str)
                .unwrap_or_default();

            // Per-handler "handler.invoke" span with tenant attributes, only
            // when sampling is on.
            let tracer = workflow_tracer();
            let mut handler_span_context: Option<TraceContext> = None;
            let mut handler_span = if self.trace_sampled {
                let (span_context, mut span) = tracer.start_span(
                    "handler.invoke",
                    SpanKind::Node,
                    self.trace_context.as_ref(),
                );
                handler_span_context = Some(span_context);
                if let Some(trace_id) = self.trace_id.as_ref() {
                    span.set_attribute("trace_id", trace_id.as_str());
                }
                span.set_attribute("handler_name", input.tool.as_str());
                if let Some(workspace_id) = self.tenant_context.workspace_id.as_deref() {
                    span.set_attribute("tenant.workspace_id", workspace_id);
                }
                if let Some(user_id) = self.tenant_context.user_id.as_deref() {
                    span.set_attribute("tenant.user_id", user_id);
                }
                if let Some(conversation_id) = self.tenant_context.conversation_id.as_deref() {
                    span.set_attribute("tenant.conversation_id", conversation_id);
                }
                if let Some(request_id) = self.tenant_context.request_id.as_deref() {
                    span.set_attribute("tenant.request_id", request_id);
                }
                if let Some(run_id) = self.tenant_context.run_id.as_deref() {
                    span.set_attribute("tenant.run_id", run_id);
                }
                span.set_attribute(
                    "node_input",
                    payload_for_span(self.payload_mode, &payload).as_str(),
                );
                Some(span)
            } else {
                None
            };

            // Reconstitute run options so the worker receives the same merged
            // trace context it would on the non-IR path.
            let trace_options = YamlWorkflowRunOptions {
                telemetry: YamlWorkflowTelemetryConfig::default(),
                trace: YamlWorkflowTraceOptions {
                    context: self.trace_input_context.clone(),
                    tenant: self.tenant_context.clone(),
                },
                model: None,
            };
            let worker_trace_context = merged_trace_context_for_worker(
                handler_span_context.as_ref(),
                self.trace_id.as_deref(),
                &trace_options,
            );
            let worker_context = custom_worker_context_with_trace(
                &context,
                &worker_trace_context,
                &self.tenant_context,
            );

            let output_result = worker
                .execute(&input.tool, &payload, email_text, &worker_context)
                .await
                .map_err(ToolExecutionError::Failed);

            // Mark the span with success/error before ending it, regardless
            // of outcome; the result itself is propagated unchanged.
            if let Some(span) = handler_span.as_mut() {
                if output_result.is_ok() {
                    span.add_event("handler.success");
                } else {
                    span.add_event("handler.error");
                }
            }

            if let Some(span) = handler_span.take() {
                span.end();
            }

            output_result
        }
    }

    let tool_executor = YamlIrToolExecutor {
        llm_executor: executor,
        custom_worker,
        token_totals: std::sync::Mutex::new(YamlTokenTotals::default()),
        node_usage: std::sync::Mutex::new(BTreeMap::new()),
        trace_id: telemetry_context.trace_id.clone(),
        trace_context: workflow_span_context.clone(),
        trace_input_context: options.trace.context.clone(),
        tenant_context: options.trace.tenant.clone(),
        payload_mode: options.telemetry.payload_mode,
        trace_sampled: telemetry_context.sampled,
    };

    let runtime = WorkflowRuntime::new(
        ir,
        &NoopLlm,
        Some(&tool_executor),
        WorkflowRuntimeOptions::default(),
    );

    let started = Instant::now();
    // Validation failures also fall back to the YAML interpreter (Ok(None));
    // every other runtime error is fatal for this run.
    let result = match runtime.execute(workflow_input.clone(), None).await {
        Ok(result) => result,
        Err(WorkflowRuntimeError::Validation(_)) => return Ok(None),
        Err(error) => {
            return Err(YamlWorkflowRunError::IrRuntime {
                message: error.to_string(),
            });
        }
    };
    let total_elapsed_ms = started.elapsed().as_millis();

    // Re-wrap node outputs as {"output": ...} to match the YAML runner's
    // shape, dropping the synthetic IR start node.
    let mut outputs: BTreeMap<String, Value> = BTreeMap::new();
    for (node_id, output) in result.node_outputs {
        if node_id == YAML_START_NODE_ID {
            continue;
        }
        outputs.insert(node_id, json!({"output": output}));
    }

    let mut trace = Vec::new();
    let mut step_timings = Vec::new();
    // Snapshot the per-node usage collected by the tool executor; a poisoned
    // lock degrades to "no usage" rather than erroring.
    let node_usage_map = tool_executor
        .node_usage
        .lock()
        .map(|usage| usage.clone())
        .unwrap_or_default();
    let mut llm_node_metrics: BTreeMap<String, YamlLlmNodeMetrics> = BTreeMap::new();
    for execution in result.node_executions {
        if execution.node_id == YAML_START_NODE_ID {
            continue;
        }
        trace.push(execution.node_id.clone());
        let usage = node_usage_map.get(&execution.node_id);
        if let Some(usage) = usage {
            // elapsed_ms is hardcoded to 0 on this path: the IR runtime does
            // not expose per-node timing here, which also forces
            // tokens_per_second through the elapsed=0 branch of
            // completion_tokens_per_second.
            llm_node_metrics.insert(
                execution.node_id.clone(),
                YamlLlmNodeMetrics {
                    elapsed_ms: 0,
                    prompt_tokens: usage.prompt_tokens,
                    completion_tokens: usage.completion_tokens,
                    total_tokens: usage.total_tokens,
                    thinking_tokens: usage.thinking_tokens,
                    tokens_per_second: completion_tokens_per_second(usage.completion_tokens, 0),
                },
            );
        }
        step_timings.push(YamlStepTiming {
            node_id: execution.node_id,
            node_kind: "ir_runtime".to_string(),
            elapsed_ms: 0,
            prompt_tokens: usage.map(|value| value.prompt_tokens),
            completion_tokens: usage.map(|value| value.completion_tokens),
            total_tokens: usage.map(|value| value.total_tokens),
            thinking_tokens: usage.and_then(|value| value.thinking_tokens),
            tokens_per_second: usage
                .map(|value| completion_tokens_per_second(value.completion_tokens, 0)),
        });
    }

    let terminal_node = result.terminal_node_id;
    let terminal_output = outputs
        .get(&terminal_node)
        .and_then(|v| v.get("output"))
        .cloned();

    let email_text = workflow_input
        .get("email_text")
        .and_then(Value::as_str)
        .unwrap_or_default()
        .to_string();

    let token_totals = tool_executor
        .token_totals
        .lock()
        .map(|totals| totals.clone())
        .unwrap_or_default();

    // Close the workflow span and flush the tracer before returning.
    if let Some(mut span) = workflow_span.take() {
        span.set_attribute("workflow_id", workflow.id.as_str());
        if let Some(trace_id_value) = telemetry_context.trace_id.as_ref() {
            span.set_attribute("trace_id", trace_id_value.as_str());
        }
        span.end();
        flush_workflow_tracer();
    }

    // ttft_ms is unavailable on the IR path (no streaming first-token signal).
    Ok(Some(YamlWorkflowRunOutput {
        workflow_id: workflow.id.clone(),
        entry_node: workflow.entry_node.clone(),
        email_text,
        trace,
        outputs,
        terminal_node,
        terminal_output,
        step_timings,
        llm_node_metrics,
        total_elapsed_ms,
        ttft_ms: None,
        total_input_tokens: token_totals.input_tokens,
        total_output_tokens: token_totals.output_tokens,
        total_tokens: token_totals.total_tokens,
        total_thinking_tokens: token_totals.thinking_tokens,
        tokens_per_second: token_totals.tokens_per_second(total_elapsed_ms),
        trace_id: telemetry_context.trace_id.clone(),
        metadata: telemetry_context.trace_id.as_ref().map(|value| {
            workflow_metadata_with_trace(
                options,
                value,
                telemetry_context.sampled,
                telemetry_context.trace_id_source,
            )
        }),
    }))
}
4085
4086fn build_yaml_context_from_ir_scope(scoped_input: &Value) -> Value {
4087 let input = scoped_input.get("input").cloned().unwrap_or(Value::Null);
4088
4089 let mut nodes = serde_json::Map::new();
4090 if let Some(node_outputs) = scoped_input.get("node_outputs").and_then(Value::as_object) {
4091 for (node_id, output) in node_outputs {
4092 nodes.insert(node_id.clone(), json!({"output": output.clone()}));
4093 }
4094 }
4095
4096 json!({
4097 "input": input,
4098 "nodes": Value::Object(nodes),
4099 "globals": Value::Object(serde_json::Map::new())
4100 })
4101}
4102
4103pub async fn run_email_workflow_yaml_with_custom_worker_and_events(
4104 workflow: &YamlWorkflow,
4105 email_text: &str,
4106 executor: &dyn YamlWorkflowLlmExecutor,
4107 custom_worker: Option<&dyn YamlWorkflowCustomWorkerExecutor>,
4108 event_sink: Option<&dyn YamlWorkflowEventSink>,
4109) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
4110 let workflow_input = json!({ "email_text": email_text });
4111 run_workflow_yaml_with_custom_worker_and_events(
4112 workflow,
4113 &workflow_input,
4114 executor,
4115 custom_worker,
4116 event_sink,
4117 )
4118 .await
4119}
4120
4121fn evaluate_switch_condition(
4122 condition: &str,
4123 context: &Value,
4124) -> Result<bool, YamlWorkflowRunError> {
4125 let (left, right) =
4126 condition
4127 .split_once("==")
4128 .ok_or_else(|| YamlWorkflowRunError::UnsupportedCondition {
4129 condition: condition.to_string(),
4130 })?;
4131
4132 let left_path = left.trim().trim_start_matches("$.");
4133 let right_literal = right.trim().trim_matches('"').trim_matches('\'');
4134 let left_value = resolve_path(context, left_path);
4135 Ok(left_value
4136 .and_then(Value::as_str)
4137 .map(|value| value == right_literal)
4138 .unwrap_or(false))
4139}
4140
4141fn parse_messages_from_context(path: &str, context: &Value) -> Result<Vec<Message>, String> {
4142 let normalized_path = path.trim().trim_start_matches("$.");
4143 let value = resolve_path(context, normalized_path)
4144 .ok_or_else(|| format!("messages_path not found: {path}"))?;
4145 let list: Vec<WorkflowMessage> = serde_json::from_value(value.clone()).map_err(|err| {
4146 format!("messages_path must resolve to a list of messages: {path}; {err}")
4147 })?;
4148 if list.is_empty() {
4149 return Err(format!(
4150 "messages_path must not resolve to an empty list: {path}"
4151 ));
4152 }
4153
4154 let mut messages = Vec::with_capacity(list.len());
4155 for (index, item) in list.into_iter().enumerate() {
4156 let mut message = match item.role {
4157 Role::System => Message::system(item.content),
4158 Role::User => Message::user(item.content),
4159 Role::Assistant => Message::assistant(item.content),
4160 Role::Tool => {
4161 let tool_call_id = item
4162 .tool_call_id
4163 .ok_or_else(|| format!("tool message at index {index} missing tool_call_id"))?;
4164 Message::tool(item.content, tool_call_id)
4165 }
4166 };
4167
4168 if let Some(name) = item.name {
4169 message = message.with_name(name);
4170 }
4171
4172 messages.push(message);
4173 }
4174
4175 Ok(messages)
4176}
4177
/// Statically validates a parsed [`YamlWorkflow`], returning every problem
/// found as a [`YamlWorkflowDiagnostic`].
///
/// All checks run to completion (no early exit), so callers receive the full
/// list of errors and warnings in one pass. An empty vec means the workflow
/// passed every check.
pub fn verify_yaml_workflow(workflow: &YamlWorkflow) -> Vec<YamlWorkflowDiagnostic> {
    let mut diagnostics = Vec::new();
    // Index nodes by id for O(1) existence checks on the entry node, edge
    // endpoints, and switch targets below.
    let known_ids: HashMap<&str, &YamlNode> = workflow
        .nodes
        .iter()
        .map(|node| (node.id.as_str(), node))
        .collect();

    // The entry node must refer to a declared node.
    if !known_ids.contains_key(workflow.entry_node.as_str()) {
        diagnostics.push(YamlWorkflowDiagnostic {
            node_id: None,
            code: "missing_entry".to_string(),
            severity: YamlWorkflowDiagnosticSeverity::Error,
            message: format!("entry node '{}' does not exist", workflow.entry_node),
        });
    }

    // Both endpoints of every edge must be declared nodes.
    for edge in &workflow.edges {
        if !known_ids.contains_key(edge.from.as_str()) {
            diagnostics.push(YamlWorkflowDiagnostic {
                node_id: Some(edge.from.clone()),
                code: "unknown_edge_from".to_string(),
                severity: YamlWorkflowDiagnosticSeverity::Error,
                message: format!("edge.from '{}' does not exist", edge.from),
            });
        }
        if !known_ids.contains_key(edge.to.as_str()) {
            diagnostics.push(YamlWorkflowDiagnostic {
                node_id: Some(edge.to.clone()),
                code: "unknown_edge_to".to_string(),
                severity: YamlWorkflowDiagnosticSeverity::Error,
                message: format!("edge.to '{}' does not exist", edge.to),
            });
        }
    }

    // Per-node checks.
    for node in &workflow.nodes {
        if let Some(llm) = &node.node_type.llm_call {
            if llm.model.trim().is_empty() {
                diagnostics.push(YamlWorkflowDiagnostic {
                    node_id: Some(node.id.clone()),
                    code: "empty_model".to_string(),
                    severity: YamlWorkflowDiagnosticSeverity::Error,
                    message: "llm_call.model must not be empty".to_string(),
                });
            }
            // stream+heal is contradictory; warning (not error) because the
            // runtime degrades gracefully by disabling streaming.
            if llm.stream.unwrap_or(false) && llm.heal.unwrap_or(false) {
                diagnostics.push(YamlWorkflowDiagnostic {
                    node_id: Some(node.id.clone()),
                    code: "stream_heal_conflict".to_string(),
                    severity: YamlWorkflowDiagnosticSeverity::Warning,
                    message:
                        "llm_call.stream=true with heal=true is not streamable; runtime will disable streaming"
                            .to_string(),
                });
            }

            if llm.max_tool_roundtrips.unwrap_or(1) == 0 {
                diagnostics.push(YamlWorkflowDiagnostic {
                    node_id: Some(node.id.clone()),
                    code: "invalid_max_tool_roundtrips".to_string(),
                    severity: YamlWorkflowDiagnosticSeverity::Error,
                    message: "llm_call.max_tool_roundtrips must be >= 1".to_string(),
                });
            }

            if let Some(global_key) = llm.tool_calls_global_key.as_ref() {
                if global_key.trim().is_empty() {
                    diagnostics.push(YamlWorkflowDiagnostic {
                        node_id: Some(node.id.clone()),
                        code: "empty_tool_calls_global_key".to_string(),
                        severity: YamlWorkflowDiagnosticSeverity::Error,
                        message: "llm_call.tool_calls_global_key must not be empty".to_string(),
                    });
                }
            }

            // A tool_choice pinning a specific function must reference a
            // declared tool; the match is done against the raw declarations
            // in the node's configured tools_format.
            match normalize_tool_choice(llm.tool_choice.clone()) {
                Ok(choice) => {
                    if let Some(ToolChoice::Tool(choice_tool)) = choice.as_ref() {
                        if !llm.tools.iter().any(|tool| match (llm.tools_format, tool) {
                            (YamlToolFormat::Openai, YamlToolDeclaration::OpenAi(openai)) => {
                                openai.function.name == choice_tool.function.name
                            }
                            (
                                YamlToolFormat::Simplified,
                                YamlToolDeclaration::Simplified(simple),
                            ) => simple.name == choice_tool.function.name,
                            _ => false,
                        }) {
                            diagnostics.push(YamlWorkflowDiagnostic {
                                node_id: Some(node.id.clone()),
                                code: "unknown_tool_choice_function".to_string(),
                                severity: YamlWorkflowDiagnosticSeverity::Error,
                                message: format!(
                                    "llm_call.tool_choice references unknown function '{}'",
                                    choice_tool.function.name
                                ),
                            });
                        }
                    }
                }
                Err(message) => {
                    diagnostics.push(YamlWorkflowDiagnostic {
                        node_id: Some(node.id.clone()),
                        code: "invalid_tool_choice".to_string(),
                        severity: YamlWorkflowDiagnosticSeverity::Error,
                        message,
                    });
                }
            }

            // On a tools/format mismatch, record one diagnostic and fall back
            // to an empty list so the per-tool checks below are skipped.
            let normalized_tools = match normalize_llm_tools(llm) {
                Ok(tools) => tools,
                Err(message) => {
                    diagnostics.push(YamlWorkflowDiagnostic {
                        node_id: Some(node.id.clone()),
                        code: "invalid_tools_format".to_string(),
                        severity: YamlWorkflowDiagnosticSeverity::Error,
                        message,
                    });
                    Vec::new()
                }
            };

            // Tool names must be non-empty and unique within the node, and
            // input/output schemas must pass validate_json_schema.
            let mut seen_tool_names = HashSet::new();
            for tool in &normalized_tools {
                let name = tool.definition.function.name.trim();
                if name.is_empty() {
                    diagnostics.push(YamlWorkflowDiagnostic {
                        node_id: Some(node.id.clone()),
                        code: "empty_tool_name".to_string(),
                        severity: YamlWorkflowDiagnosticSeverity::Error,
                        message: "tool function name must not be empty".to_string(),
                    });
                }
                if !seen_tool_names.insert(tool.definition.function.name.clone()) {
                    diagnostics.push(YamlWorkflowDiagnostic {
                        node_id: Some(node.id.clone()),
                        code: "duplicate_tool_name".to_string(),
                        severity: YamlWorkflowDiagnosticSeverity::Error,
                        message: format!(
                            "duplicate tool function name '{}' in node",
                            tool.definition.function.name
                        ),
                    });
                }

                let schema = tool
                    .definition
                    .function
                    .parameters
                    .clone()
                    .unwrap_or(Value::Null);
                if schema.is_null() {
                    diagnostics.push(YamlWorkflowDiagnostic {
                        node_id: Some(node.id.clone()),
                        code: "missing_tool_input_schema".to_string(),
                        severity: YamlWorkflowDiagnosticSeverity::Error,
                        message: format!(
                            "tool '{}' is missing input schema",
                            tool.definition.function.name
                        ),
                    });
                } else if let Err(message) = validate_json_schema(&schema) {
                    diagnostics.push(YamlWorkflowDiagnostic {
                        node_id: Some(node.id.clone()),
                        code: "invalid_tool_input_schema".to_string(),
                        severity: YamlWorkflowDiagnosticSeverity::Error,
                        message: format!(
                            "tool '{}' has invalid input schema: {}",
                            tool.definition.function.name, message
                        ),
                    });
                }

                if let Some(output_schema) = tool.output_schema.as_ref() {
                    if let Err(message) = validate_json_schema(output_schema) {
                        diagnostics.push(YamlWorkflowDiagnostic {
                            node_id: Some(node.id.clone()),
                            code: "invalid_tool_output_schema".to_string(),
                            severity: YamlWorkflowDiagnosticSeverity::Error,
                            message: format!(
                                "tool '{}' has invalid output schema: {}",
                                tool.definition.function.name, message
                            ),
                        });
                    }
                }
            }
        }

        // Every switch branch target and the default target must exist.
        if let Some(switch) = &node.node_type.switch {
            for branch in &switch.branches {
                if !known_ids.contains_key(branch.target.as_str()) {
                    diagnostics.push(YamlWorkflowDiagnostic {
                        node_id: Some(node.id.clone()),
                        code: "unknown_switch_target".to_string(),
                        severity: YamlWorkflowDiagnosticSeverity::Error,
                        message: format!("switch branch target '{}' does not exist", branch.target),
                    });
                }
            }
            if !known_ids.contains_key(switch.default.as_str()) {
                diagnostics.push(YamlWorkflowDiagnostic {
                    node_id: Some(node.id.clone()),
                    code: "unknown_switch_default".to_string(),
                    severity: YamlWorkflowDiagnosticSeverity::Error,
                    message: format!("switch default target '{}' does not exist", switch.default),
                });
            }
        }

        // update_globals entries must use a known op, and every op except
        // 'increment' needs a 'from' source path.
        if let Some(config) = node.config.as_ref() {
            if let Some(update_globals) = config.update_globals.as_ref() {
                for (key, update) in update_globals {
                    let is_valid_op =
                        matches!(update.op.as_str(), "set" | "append" | "increment" | "merge");
                    if !is_valid_op {
                        diagnostics.push(YamlWorkflowDiagnostic {
                            node_id: Some(node.id.clone()),
                            code: "unknown_update_op".to_string(),
                            severity: YamlWorkflowDiagnosticSeverity::Error,
                            message: format!(
                                "update_globals key '{}' has unknown op '{}'; expected set|append|increment|merge",
                                key, update.op
                            ),
                        });
                    }

                    if update.op != "increment" && update.from.is_none() {
                        diagnostics.push(YamlWorkflowDiagnostic {
                            node_id: Some(node.id.clone()),
                            code: "missing_update_from".to_string(),
                            severity: YamlWorkflowDiagnosticSeverity::Error,
                            message: format!(
                                "update_globals key '{}' with op '{}' requires 'from'",
                                key, update.op
                            ),
                        });
                    }
                }
            }
        }
    }

    diagnostics
}
4426
4427fn resolve_path<'a>(value: &'a Value, path: &str) -> Option<&'a Value> {
4428 path.split('.')
4429 .filter(|segment| !segment.is_empty())
4430 .try_fold(value, |current, segment| {
4431 if let Ok(index) = segment.parse::<usize>() {
4432 return current.get(index);
4433 }
4434 current.get(segment)
4435 })
4436}
4437
4438fn interpolate_template(template: &str, context: &Value) -> String {
4439 let mut out = String::with_capacity(template.len());
4440 let mut rest = template;
4441
4442 loop {
4443 let Some(start) = rest.find("{{") else {
4444 out.push_str(rest);
4445 break;
4446 };
4447
4448 out.push_str(&rest[..start]);
4449 let after_start = &rest[start + 2..];
4450 let Some(end) = after_start.find("}}") else {
4451 out.push_str(&rest[start..]);
4452 break;
4453 };
4454
4455 let expr = after_start[..end].trim();
4456 let source_path = expr.trim_start_matches("$.");
4457 let replacement = resolve_path(context, source_path)
4458 .map(value_to_template_string)
4459 .unwrap_or_default();
4460 out.push_str(replacement.as_str());
4461
4462 rest = &after_start[end + 2..];
4463 }
4464
4465 out
4466}
4467
4468fn collect_template_bindings(template: &str, context: &Value) -> Vec<YamlTemplateBinding> {
4469 let mut bindings = Vec::new();
4470 let mut rest = template;
4471
4472 loop {
4473 let Some(start) = rest.find("{{") else {
4474 break;
4475 };
4476
4477 let after_start = &rest[start + 2..];
4478 let Some(end) = after_start.find("}}") else {
4479 break;
4480 };
4481
4482 let expr = after_start[..end].trim();
4483 let source_path = expr.trim_start_matches("$.").to_string();
4484 let resolved = resolve_path(context, source_path.as_str()).cloned();
4485 let missing = resolved.is_none();
4486 let resolved_value = resolved.unwrap_or(Value::Null);
4487 bindings.push(YamlTemplateBinding {
4488 index: bindings.len(),
4489 expression: expr.to_string(),
4490 source_path,
4491 resolved_type: json_type_name(&resolved_value).to_string(),
4492 missing,
4493 resolved: resolved_value,
4494 });
4495
4496 rest = &after_start[end + 2..];
4497 }
4498
4499 bindings
4500}
4501
4502fn json_type_name(value: &Value) -> &'static str {
4503 match value {
4504 Value::Null => "null",
4505 Value::Bool(_) => "bool",
4506 Value::Number(_) => "number",
4507 Value::String(_) => "string",
4508 Value::Array(_) => "array",
4509 Value::Object(_) => "object",
4510 }
4511}
4512
4513fn value_to_template_string(value: &Value) -> String {
4514 match value {
4515 Value::Null => String::new(),
4516 Value::Bool(v) => v.to_string(),
4517 Value::Number(v) => v.to_string(),
4518 Value::String(v) => v.clone(),
4519 Value::Array(_) | Value::Object(_) => serde_json::to_string(value).unwrap_or_default(),
4520 }
4521}
4522
4523fn apply_set_globals(
4524 node: &YamlNode,
4525 outputs: &BTreeMap<String, Value>,
4526 workflow_input: &Value,
4527 globals: &mut serde_json::Map<String, Value>,
4528) {
4529 let Some(config) = node.config.as_ref() else {
4530 return;
4531 };
4532 let Some(set_globals) = config.set_globals.as_ref() else {
4533 return;
4534 };
4535
4536 let context = json!({
4537 "input": workflow_input,
4538 "nodes": outputs,
4539 "globals": Value::Object(globals.clone())
4540 });
4541
4542 for (key, expr) in set_globals {
4543 let value = resolve_path(&context, expr.as_str())
4544 .cloned()
4545 .unwrap_or(Value::Null);
4546 globals.insert(key.clone(), value);
4547 }
4548}
4549
4550fn apply_update_globals(
4551 node: &YamlNode,
4552 outputs: &BTreeMap<String, Value>,
4553 workflow_input: &Value,
4554 globals: &mut serde_json::Map<String, Value>,
4555) {
4556 let Some(config) = node.config.as_ref() else {
4557 return;
4558 };
4559 let Some(update_globals) = config.update_globals.as_ref() else {
4560 return;
4561 };
4562
4563 let context = json!({
4564 "input": workflow_input,
4565 "nodes": outputs,
4566 "globals": Value::Object(globals.clone())
4567 });
4568
4569 for (key, update) in update_globals {
4570 match update.op.as_str() {
4571 "set" => {
4572 if let Some(path) = update.from.as_ref() {
4573 let value = resolve_path(&context, path.as_str())
4574 .cloned()
4575 .unwrap_or(Value::Null);
4576 globals.insert(key.clone(), value);
4577 }
4578 }
4579 "append" => {
4580 if let Some(path) = update.from.as_ref() {
4581 let value = resolve_path(&context, path.as_str())
4582 .cloned()
4583 .unwrap_or(Value::Null);
4584 let entry = globals
4585 .entry(key.clone())
4586 .or_insert_with(|| Value::Array(Vec::new()));
4587 match entry {
4588 Value::Array(items) => items.push(value),
4589 other => {
4590 let existing = other.clone();
4591 *other = Value::Array(vec![existing, value]);
4592 }
4593 }
4594 }
4595 }
4596 "increment" => {
4597 let by = update.by.unwrap_or(1.0);
4598 let current = globals
4599 .get(key.as_str())
4600 .and_then(Value::as_f64)
4601 .unwrap_or(0.0);
4602 if let Some(next) = serde_json::Number::from_f64(current + by) {
4603 globals.insert(key.clone(), Value::Number(next));
4604 }
4605 }
4606 "merge" => {
4607 if let Some(path) = update.from.as_ref() {
4608 let source = resolve_path(&context, path.as_str())
4609 .cloned()
4610 .unwrap_or(Value::Null);
4611 if let Value::Object(source_map) = source {
4612 let target = globals
4613 .entry(key.clone())
4614 .or_insert_with(|| Value::Object(serde_json::Map::new()));
4615 if let Value::Object(target_map) = target {
4616 target_map.extend(source_map);
4617 } else {
4618 *target = Value::Object(source_map);
4619 }
4620 }
4621 }
4622 }
4623 _ => {}
4624 }
4625 }
4626}
4627
4628fn llm_output_schema_for_node(node: &YamlNode) -> Value {
4629 if let Some(schema) = node
4630 .config
4631 .as_ref()
4632 .and_then(|cfg| cfg.output_schema.clone())
4633 {
4634 return schema;
4635 }
4636
4637 default_llm_output_schema()
4638}
4639
4640fn normalize_tool_choice(
4641 config: Option<YamlToolChoiceConfig>,
4642) -> Result<Option<ToolChoice>, String> {
4643 let Some(config) = config else {
4644 return Ok(None);
4645 };
4646
4647 let choice = match config {
4648 YamlToolChoiceConfig::Mode(mode) => ToolChoice::Mode(mode),
4649 YamlToolChoiceConfig::Function { function } => ToolChoice::Tool(ToolChoiceTool {
4650 tool_type: ToolType::Function,
4651 function: ToolChoiceFunction { name: function },
4652 }),
4653 YamlToolChoiceConfig::OpenAi(tool) => ToolChoice::Tool(tool),
4654 };
4655
4656 Ok(Some(choice))
4657}
4658
4659fn normalize_llm_tools(llm: &YamlLlmCall) -> Result<Vec<YamlResolvedTool>, String> {
4660 llm.tools
4661 .iter()
4662 .map(|tool| match (llm.tools_format, tool) {
4663 (YamlToolFormat::Openai, YamlToolDeclaration::OpenAi(openai)) => {
4664 let definition = ToolDefinition {
4665 tool_type: openai.tool_type.unwrap_or(ToolType::Function),
4666 function: ToolFunction {
4667 name: openai.function.name.clone(),
4668 description: openai.function.description.clone(),
4669 parameters: openai.function.parameters.clone(),
4670 },
4671 };
4672 Ok(YamlResolvedTool {
4673 definition,
4674 output_schema: openai.function.output_schema.clone(),
4675 })
4676 }
4677 (YamlToolFormat::Simplified, YamlToolDeclaration::Simplified(simple)) => {
4678 let definition = ToolDefinition {
4679 tool_type: ToolType::Function,
4680 function: ToolFunction {
4681 name: simple.name.clone(),
4682 description: simple.description.clone(),
4683 parameters: Some(simple.input_schema.clone()),
4684 },
4685 };
4686 Ok(YamlResolvedTool {
4687 definition,
4688 output_schema: simple.output_schema.clone(),
4689 })
4690 }
4691 (YamlToolFormat::Openai, _) => {
4692 Err("tools_format=openai requires OpenAI-style tool declarations".to_string())
4693 }
4694 (YamlToolFormat::Simplified, _) => {
4695 Err("tools_format=simplified requires simplified tool declarations".to_string())
4696 }
4697 })
4698 .collect()
4699}
4700
4701fn default_llm_output_schema() -> Value {
4702 json!({
4703 "type": "object",
4704 "additionalProperties": true
4705 })
4706}
4707
4708fn mock_rag(topic: &str) -> Value {
4709 let (kb_source, playbook) = match topic {
4710 "probation" => (
4711 "hr_policy/probation.md",
4712 "Collect manager review, performance evidence, and probation timeline.",
4713 ),
4714 "leave_request" => (
4715 "hr_policy/leave.md",
4716 "Validate leave balance, manager approval, and blackout dates.",
4717 ),
4718 "supply_chain_order_assessment" => (
4719 "supply_chain/order_assessment.md",
4720 "Review order specs, inventory risk, and vendor lead-time guidance.",
4721 ),
4722 "supply_chain_order_replacement" => (
4723 "supply_chain/order_replacement.md",
4724 "Collect order id, damage proof, and replacement SLA policy.",
4725 ),
4726 "termination_first_time_offense" => (
4727 "hr_policy/termination_first_offense.md",
4728 "Validate first-incident criteria and route to HRBP review.",
4729 ),
4730 "termination_repeated_offense" => (
4731 "hr_policy/termination_repeated_offense.md",
4732 "Collect prior warnings and escalation approvals before final action.",
4733 ),
4734 _ => (
4735 "shared/request_clarification.md",
4736 "Request clarifying details before routing.",
4737 ),
4738 };
4739
4740 json!({
4741 "kb_source": kb_source,
4742 "playbook": playbook,
4743 })
4744}
4745
/// Executes the built-in `run_workflow_graph` tool call: resolves a child
/// workflow's YAML path through `input.workflow_registry`, runs it with the
/// same client/custom worker, and returns a JSON summary of the child run.
///
/// Recursion is bounded by `__subgraph_depth` / `__subgraph_max_depth`
/// counters threaded through the child input. Errors are returned as plain
/// strings so they surface as tool-result failures.
async fn execute_subworkflow_tool_call(
    payload: &Value,
    context: &Value,
    client: &SimpleAgentsClient,
    custom_worker: Option<&dyn YamlWorkflowCustomWorkerExecutor>,
    parent_options: &YamlWorkflowRunOptions,
    parent_trace_context: Option<&TraceContext>,
    resolved_trace_id: Option<&str>,
) -> Result<Value, String> {
    let workflow_id = payload
        .get("workflow_id")
        .and_then(Value::as_str)
        .ok_or_else(|| "run_workflow_graph requires payload.workflow_id".to_string())?;

    let input_context = context
        .get("input")
        .and_then(Value::as_object)
        .ok_or_else(|| "run_workflow_graph requires context.input".to_string())?;

    // workflow_id -> yaml_path table supplied by the parent workflow's input.
    let registry = input_context
        .get("workflow_registry")
        .and_then(Value::as_object)
        .ok_or_else(|| {
            "run_workflow_graph requires input.workflow_registry map of workflow_id -> yaml_path"
                .to_string()
        })?;

    let workflow_path = registry
        .get(workflow_id)
        .and_then(Value::as_str)
        .ok_or_else(|| {
            format!(
                "workflow_registry has no entry for workflow_id '{}'",
                workflow_id
            )
        })?;

    // Depth guard against runaway subworkflow recursion (default max: 3).
    let parent_depth = input_context
        .get("__subgraph_depth")
        .and_then(Value::as_u64)
        .unwrap_or(0);
    let max_depth = input_context
        .get("__subgraph_max_depth")
        .and_then(Value::as_u64)
        .unwrap_or(3);

    if parent_depth >= max_depth {
        return Err(format!(
            "run_workflow_graph depth limit reached (depth={}, max={})",
            parent_depth, max_depth
        ));
    }

    // Child input starts from payload.input, then inherits selected fields
    // from the parent input only when the payload did not override them.
    let mut subworkflow_input = payload
        .get("input")
        .and_then(Value::as_object)
        .cloned()
        .unwrap_or_default();

    if !subworkflow_input.contains_key("messages") {
        if let Some(messages) = input_context.get("messages") {
            subworkflow_input.insert("messages".to_string(), messages.clone());
        }
    }

    if !subworkflow_input.contains_key("email_text") {
        if let Some(email_text) = input_context.get("email_text") {
            subworkflow_input.insert("email_text".to_string(), email_text.clone());
        }
    }

    // Propagate the registry so the child can spawn its own subworkflows.
    if !subworkflow_input.contains_key("workflow_registry") {
        subworkflow_input.insert(
            "workflow_registry".to_string(),
            Value::Object(registry.clone()),
        );
    }

    // Always overwrite the depth counters so the child cannot reset them.
    subworkflow_input.insert(
        "__subgraph_depth".to_string(),
        Value::Number(serde_json::Number::from(parent_depth + 1)),
    );
    subworkflow_input.insert(
        "__subgraph_max_depth".to_string(),
        Value::Number(serde_json::Number::from(max_depth)),
    );

    // Inherit parent run options, re-rooting the trace context on the parent span.
    let subworkflow_options =
        build_subworkflow_options(parent_options, parent_trace_context, resolved_trace_id);

    let output = run_workflow_yaml_file_with_client_and_custom_worker_and_events_and_options(
        Path::new(workflow_path),
        &Value::Object(subworkflow_input),
        client,
        custom_worker,
        None,
        &subworkflow_options,
    )
    .await
    .map_err(|error| format!("subworkflow '{}' failed: {}", workflow_id, error))?;

    Ok(json!({
        "workflow_id": workflow_id,
        "workflow_path": workflow_path,
        "terminal_node": output.terminal_node,
        "terminal_output": output.terminal_output,
        "trace": output.trace,
    }))
}
4855
4856fn build_subworkflow_options(
4857 parent_options: &YamlWorkflowRunOptions,
4858 parent_trace_context: Option<&TraceContext>,
4859 resolved_trace_id: Option<&str>,
4860) -> YamlWorkflowRunOptions {
4861 let mut subworkflow_options = parent_options.clone();
4862 if parent_trace_context.is_some() || resolved_trace_id.is_some() {
4863 let trace_context = YamlWorkflowTraceContextInput {
4864 trace_id: resolved_trace_id
4865 .map(|value| value.to_string())
4866 .or_else(|| parent_trace_context.and_then(|ctx| ctx.trace_id.clone())),
4867 span_id: parent_trace_context.and_then(|ctx| ctx.span_id.clone()),
4868 parent_span_id: parent_trace_context.and_then(|ctx| ctx.parent_span_id.clone()),
4869 traceparent: parent_trace_context.and_then(|ctx| ctx.traceparent.clone()),
4870 tracestate: parent_trace_context.and_then(|ctx| ctx.tracestate.clone()),
4871 baggage: parent_trace_context
4872 .map(|ctx| ctx.baggage.clone())
4873 .unwrap_or_default(),
4874 };
4875 subworkflow_options.trace.context = Some(trace_context);
4876 }
4877 subworkflow_options
4878}
4879
4880fn mock_custom_worker_output(
4881 handler: &str,
4882 payload: &Value,
4883) -> Result<Value, YamlWorkflowRunError> {
4884 if handler == "get_employee_record" {
4885 let employee_name = payload
4886 .get("employee_name")
4887 .and_then(Value::as_str)
4888 .unwrap_or("Unknown Employee")
4889 .trim();
4890 let normalized_name = if employee_name.is_empty() {
4891 "Unknown Employee"
4892 } else {
4893 employee_name
4894 };
4895
4896 let (employee_id, location) = match normalized_name.to_ascii_lowercase().as_str() {
4897 "alex johnson" => ("EMP-2041", "Austin"),
4898 "priya sharma" => ("EMP-3378", "Bengaluru"),
4899 "marcus lee" => ("EMP-1196", "Singapore"),
4900 "sarah chen" => ("EMP-4450", "Toronto"),
4901 _ => ("EMP-0000", "Unassigned"),
4902 };
4903
4904 return Ok(json!({
4905 "employee_name": normalized_name,
4906 "employee_id": employee_id,
4907 "location": location,
4908 }));
4909 }
4910
4911 if let Some(topic) = payload.get("topic").and_then(Value::as_str) {
4912 let mut value = mock_rag(topic);
4913 if let Value::Object(object) = &mut value {
4914 object.insert("handler".to_string(), Value::String(handler.to_string()));
4915 }
4916 return Ok(value);
4917 }
4918
4919 Err(YamlWorkflowRunError::UnsupportedCustomHandler {
4920 handler: handler.to_string(),
4921 })
4922}
4923
/// Root of a YAML-defined workflow: a set of nodes plus explicit edges.
#[derive(Debug, Clone, Deserialize)]
pub struct YamlWorkflow {
    /// Workflow identifier.
    pub id: String,
    /// Id of the node execution starts from; must name an entry in `nodes`.
    pub entry_node: String,
    #[serde(default)]
    pub nodes: Vec<YamlNode>,
    #[serde(default)]
    pub edges: Vec<YamlEdge>,
}

/// One node of the workflow graph.
#[derive(Debug, Clone, Deserialize)]
pub struct YamlNode {
    /// Id referenced by edges, switch targets, and `entry_node`.
    pub id: String,
    /// Variant container; `kind_name` resolves the first variant present.
    pub node_type: YamlNodeType,
    /// Optional per-node configuration (prompt, schemas, global updates).
    pub config: Option<YamlNodeConfig>,
}
4940
4941impl YamlNode {
4942 fn kind_name(&self) -> &'static str {
4943 if self.node_type.llm_call.is_some() {
4944 "llm_call"
4945 } else if self.node_type.switch.is_some() {
4946 "switch"
4947 } else if self.node_type.custom_worker.is_some() {
4948 "custom_worker"
4949 } else {
4950 "unknown"
4951 }
4952 }
4953}
4954
/// Node variant container; exactly one of the options is expected to be set
/// in practice (see `YamlNode::kind_name` for the precedence when not).
#[derive(Debug, Clone, Deserialize)]
pub struct YamlNodeType {
    pub llm_call: Option<YamlLlmCall>,
    pub switch: Option<YamlSwitch>,
    pub custom_worker: Option<YamlCustomWorker>,
}

/// Configuration of an LLM-call node.
#[derive(Debug, Clone, Deserialize)]
pub struct YamlLlmCall {
    /// Model identifier; must be non-empty (enforced by verify_yaml_workflow).
    pub model: String,
    /// Enable streaming; incompatible with `heal` (runtime disables streaming).
    pub stream: Option<bool>,
    pub stream_json_as_text: Option<bool>,
    /// Enable JSON healing of the model output.
    pub heal: Option<bool>,
    /// Context path to a prebuilt message list to send instead of/with the prompt.
    pub messages_path: Option<String>,
    pub append_prompt_as_user: Option<bool>,
    /// Declaration format for `tools`; defaults to OpenAI-style.
    #[serde(default)]
    pub tools_format: YamlToolFormat,
    #[serde(default)]
    pub tools: Vec<YamlToolDeclaration>,
    pub tool_choice: Option<YamlToolChoiceConfig>,
    /// Max tool round-trips; must be >= 1 when provided.
    pub max_tool_roundtrips: Option<u8>,
    /// Global key under which executed tool calls are recorded; non-empty.
    pub tool_calls_global_key: Option<String>,
}
4978
/// Which schema the entries of `YamlLlmCall::tools` follow.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Deserialize, Serialize, Default)]
#[serde(rename_all = "snake_case")]
pub enum YamlToolFormat {
    #[default]
    Openai,
    Simplified,
}

/// A single tool declaration; untagged, so serde picks whichever variant the
/// YAML shape matches — `normalize_llm_tools` then enforces consistency with
/// the node's `tools_format`.
#[derive(Debug, Clone, Deserialize)]
#[serde(untagged)]
pub enum YamlToolDeclaration {
    OpenAi(YamlOpenAiToolDeclaration),
    Simplified(YamlSimplifiedToolDeclaration),
}

/// OpenAI-style tool declaration (`{ type, function: {...} }`).
#[derive(Debug, Clone, Deserialize)]
pub struct YamlOpenAiToolDeclaration {
    /// Tool type; defaults to `function` when omitted (see normalize_llm_tools).
    #[serde(rename = "type")]
    pub tool_type: Option<ToolType>,
    pub function: YamlOpenAiToolFunction,
}

/// Function portion of an OpenAI-style tool declaration.
#[derive(Debug, Clone, Deserialize)]
pub struct YamlOpenAiToolFunction {
    pub name: String,
    pub description: Option<String>,
    /// Input JSON Schema; required in practice (validated by verify_yaml_workflow).
    pub parameters: Option<Value>,
    /// Optional JSON Schema for the tool's output.
    pub output_schema: Option<Value>,
}

/// Flat tool declaration used when `tools_format: simplified`.
#[derive(Debug, Clone, Deserialize)]
pub struct YamlSimplifiedToolDeclaration {
    pub name: String,
    pub description: Option<String>,
    /// Input JSON Schema (mandatory in this format).
    pub input_schema: Value,
    pub output_schema: Option<Value>,
}

/// Tool-choice config; untagged: a bare mode string, the shorthand
/// `{ function: name }`, or a full OpenAI-style tool-choice object.
#[derive(Debug, Clone, Deserialize)]
#[serde(untagged)]
pub enum YamlToolChoiceConfig {
    Mode(ToolChoiceMode),
    Function { function: String },
    OpenAi(ToolChoiceTool),
}
5024
/// Branching node: first matching branch wins, otherwise `default`.
#[derive(Debug, Clone, Deserialize)]
pub struct YamlSwitch {
    #[serde(default)]
    pub branches: Vec<YamlSwitchBranch>,
    /// Target node id when no branch condition matches; must exist.
    pub default: String,
}

/// One switch branch: a condition expression and its target node id.
#[derive(Debug, Clone, Deserialize)]
pub struct YamlSwitchBranch {
    pub condition: String,
    pub target: String,
}

/// Node delegating to a registered custom-worker handler by name.
#[derive(Debug, Clone, Deserialize)]
pub struct YamlCustomWorker {
    pub handler: String,
}

/// Optional per-node configuration.
#[derive(Debug, Clone, Deserialize)]
pub struct YamlNodeConfig {
    /// Template string (supports `{{ path }}` interpolation).
    pub prompt: Option<String>,
    /// Output JSON Schema; `schema` accepted as a legacy alias.
    #[serde(default, alias = "schema")]
    pub output_schema: Option<Value>,
    pub payload: Option<Value>,
    /// key -> context path; applied via apply_set_globals.
    pub set_globals: Option<HashMap<String, String>>,
    /// key -> update op; applied via apply_update_globals.
    pub update_globals: Option<HashMap<String, YamlGlobalUpdate>>,
}

/// One `update_globals` operation: op is set|append|increment|merge.
#[derive(Debug, Clone, Deserialize)]
pub struct YamlGlobalUpdate {
    pub op: String,
    /// Source context path; required for every op except `increment`.
    pub from: Option<String>,
    /// Increment step; defaults to 1.0 when omitted.
    pub by: Option<f64>,
}

/// Directed edge between two node ids.
#[derive(Debug, Clone, Deserialize)]
pub struct YamlEdge {
    pub from: String,
    pub to: String,
}
5065
5066#[cfg(test)]
5067mod tests {
5068 use super::*;
5069 use simple_agent_type::provider::{Provider, ProviderRequest, ProviderResponse};
5070 use simple_agent_type::response::{CompletionChoice, CompletionResponse, Usage};
5071 use simple_agent_type::tool::{ToolCallFunction, ToolType};
5072 use simple_agent_type::{Result as SaResult, SimpleAgentsError};
5073 use simple_agents_core::SimpleAgentsClientBuilder;
5074 use std::fs;
5075 use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
5076 use std::sync::{Arc, Mutex, OnceLock};
5077
5078 fn stream_debug_env_lock() -> &'static Mutex<()> {
5079 static LOCK: OnceLock<Mutex<()>> = OnceLock::new();
5080 LOCK.get_or_init(|| Mutex::new(()))
5081 }
5082
    // LLM executor stub returning canned payloads keyed off prompt content
    // (see its YamlWorkflowLlmExecutor impl below).
    struct MockExecutor;

    // Event sink that appends every emitted event to an in-memory vec so
    // tests can assert on the exact event stream.
    struct RecordingSink {
        events: Mutex<Vec<YamlWorkflowEvent>>,
    }

    // Event sink that flips its cancellation flag on the first event,
    // exercising the runtime's cooperative-cancellation path.
    struct CancelAfterFirstEventSink {
        cancelled: AtomicBool,
    }

    impl YamlWorkflowEventSink for CancelAfterFirstEventSink {
        // Any emitted event immediately marks the run as cancelled.
        fn emit(&self, _event: &YamlWorkflowEvent) {
            self.cancelled.store(true, Ordering::SeqCst);
        }

        fn is_cancelled(&self) -> bool {
            self.cancelled.load(Ordering::SeqCst)
        }
    }

    // LLM executor that counts how many times it is invoked.
    struct CountingExecutor {
        call_count: AtomicUsize,
    }

    // Custom worker that records the context value it was invoked with so
    // tests can inspect what the runtime passed in.
    struct CapturingWorker {
        context: Mutex<Option<Value>>,
    }

    // Provider that first returns a tool call, then (once a tool result is
    // present in the messages) a final JSON answer — drives the tool loop.
    struct ToolLoopProvider;

    // Provider that always requests a tool the workflow never declared.
    struct UnknownToolProvider;
5114
    #[async_trait]
    impl Provider for ToolLoopProvider {
        fn name(&self) -> &str {
            "openai"
        }

        // Round-trips the request through JSON so execute() can inspect it.
        fn transform_request(&self, req: &CompletionRequest) -> SaResult<ProviderRequest> {
            let body = serde_json::to_value(req).map_err(SimpleAgentsError::from)?;
            Ok(ProviderRequest::new("mock://tool-loop").with_body(body))
        }

        // Scripted conversation: with tools but no tool result yet, emit a
        // tool call; after a tool result arrives, emit a final JSON body;
        // without tools, echo the last user message as JSON.
        async fn execute(&self, req: ProviderRequest) -> SaResult<ProviderResponse> {
            let request: CompletionRequest =
                serde_json::from_value(req.body).map_err(SimpleAgentsError::from)?;

            let has_tools = request
                .tools
                .as_ref()
                .is_some_and(|tools| !tools.is_empty());
            let has_tool_result = request.messages.iter().any(|m| m.role == Role::Tool);

            let response = if has_tools && !has_tool_result {
                // First round: request the get_customer_context tool.
                CompletionResponse {
                    id: "resp_tool_1".to_string(),
                    model: request.model.clone(),
                    choices: vec![CompletionChoice {
                        index: 0,
                        message: Message::assistant("").with_tool_calls(vec![ToolCall {
                            id: "call_get_context".to_string(),
                            tool_type: ToolType::Function,
                            function: ToolCallFunction {
                                name: "get_customer_context".to_string(),
                                arguments: "{\"order_id\":\"123\"}".to_string(),
                            },
                        }]),
                        finish_reason: FinishReason::ToolCalls,
                        logprobs: None,
                    }],
                    usage: Usage::new(10, 5),
                    created: None,
                    provider: Some(self.name().to_string()),
                    healing_metadata: None,
                }
            } else if has_tools && has_tool_result {
                // Second round: the tool result is in, finish with JSON.
                CompletionResponse {
                    id: "resp_tool_2".to_string(),
                    model: request.model.clone(),
                    choices: vec![CompletionChoice {
                        index: 0,
                        message: Message::assistant("{\"state\":\"done\"}"),
                        finish_reason: FinishReason::Stop,
                        logprobs: None,
                    }],
                    usage: Usage::new(12, 6),
                    created: None,
                    provider: Some(self.name().to_string()),
                    healing_metadata: None,
                }
            } else {
                // Toolless call: echo the last user prompt in a JSON body.
                let prompt = request
                    .messages
                    .iter()
                    .rev()
                    .find(|m| m.role == Role::User)
                    .map(|m| m.content.clone())
                    .unwrap_or_default();
                let payload = json!({
                    "subject": "ok",
                    "body": prompt,
                })
                .to_string();
                CompletionResponse {
                    id: "resp_final".to_string(),
                    model: request.model.clone(),
                    choices: vec![CompletionChoice {
                        index: 0,
                        message: Message::assistant(payload),
                        finish_reason: FinishReason::Stop,
                        logprobs: None,
                    }],
                    usage: Usage::new(8, 4),
                    created: None,
                    provider: Some(self.name().to_string()),
                    healing_metadata: None,
                }
            };

            let body = serde_json::to_value(response).map_err(SimpleAgentsError::from)?;
            Ok(ProviderResponse::new(200, body))
        }

        fn transform_response(&self, resp: ProviderResponse) -> SaResult<CompletionResponse> {
            serde_json::from_value(resp.body).map_err(SimpleAgentsError::from)
        }
    }

    #[async_trait]
    impl Provider for UnknownToolProvider {
        fn name(&self) -> &str {
            "openai"
        }

        fn transform_request(&self, req: &CompletionRequest) -> SaResult<ProviderRequest> {
            let body = serde_json::to_value(req).map_err(SimpleAgentsError::from)?;
            Ok(ProviderRequest::new("mock://unknown-tool").with_body(body))
        }

        // Always responds with a call to "unknown_tool", which no workflow
        // declares — used to test the undeclared-tool error path.
        async fn execute(&self, req: ProviderRequest) -> SaResult<ProviderResponse> {
            let request: CompletionRequest =
                serde_json::from_value(req.body).map_err(SimpleAgentsError::from)?;

            let response = CompletionResponse {
                id: "resp_unknown_tool".to_string(),
                model: request.model,
                choices: vec![CompletionChoice {
                    index: 0,
                    message: Message::assistant("").with_tool_calls(vec![ToolCall {
                        id: "call_unknown".to_string(),
                        tool_type: ToolType::Function,
                        function: ToolCallFunction {
                            name: "unknown_tool".to_string(),
                            arguments: "{}".to_string(),
                        },
                    }]),
                    finish_reason: FinishReason::ToolCalls,
                    logprobs: None,
                }],
                usage: Usage::new(5, 2),
                created: None,
                provider: Some(self.name().to_string()),
                healing_metadata: None,
            };

            let body = serde_json::to_value(response).map_err(SimpleAgentsError::from)?;
            Ok(ProviderResponse::new(200, body))
        }

        fn transform_response(&self, resp: ProviderResponse) -> SaResult<CompletionResponse> {
            serde_json::from_value(resp.body).map_err(SimpleAgentsError::from)
        }
    }
5256
    // Custom worker that returns the same fixed payload for every call.
    struct FixedToolWorker {
        payload: Value,
    }

    // Custom worker that counts executions and returns `{"ok": true}`.
    struct CountingToolWorker {
        execute_calls: AtomicUsize,
    }

    #[async_trait]
    impl YamlWorkflowCustomWorkerExecutor for FixedToolWorker {
        // Ignores all inputs and returns the configured payload.
        async fn execute(
            &self,
            _handler: &str,
            _payload: &Value,
            _email_text: &str,
            _context: &Value,
        ) -> Result<Value, String> {
            Ok(self.payload.clone())
        }
    }

    #[async_trait]
    impl YamlWorkflowCustomWorkerExecutor for CountingToolWorker {
        // Bumps the call counter and acknowledges with a fixed payload.
        async fn execute(
            &self,
            _handler: &str,
            _payload: &Value,
            _email_text: &str,
            _context: &Value,
        ) -> Result<Value, String> {
            self.execute_calls.fetch_add(1, Ordering::SeqCst);
            Ok(json!({"ok": true}))
        }
    }
5291
5292 #[async_trait]
5293 impl YamlWorkflowLlmExecutor for CountingExecutor {
5294 async fn complete_structured(
5295 &self,
5296 _request: YamlLlmExecutionRequest,
5297 _event_sink: Option<&dyn YamlWorkflowEventSink>,
5298 ) -> Result<YamlLlmExecutionResult, String> {
5299 self.call_count.fetch_add(1, Ordering::SeqCst);
5300 Ok(YamlLlmExecutionResult {
5301 payload: json!({"state":"ok"}),
5302 usage: None,
5303 ttft_ms: None,
5304 tool_calls: Vec::new(),
5305 })
5306 }
5307 }
5308
5309 impl YamlWorkflowEventSink for RecordingSink {
5310 fn emit(&self, event: &YamlWorkflowEvent) {
5311 self.events
5312 .lock()
5313 .expect("recording sink lock should not be poisoned")
5314 .push(event.clone());
5315 }
5316 }
5317
5318 #[async_trait]
5319 impl YamlWorkflowCustomWorkerExecutor for CapturingWorker {
5320 async fn execute(
5321 &self,
5322 _handler: &str,
5323 _payload: &Value,
5324 _email_text: &str,
5325 context: &Value,
5326 ) -> Result<Value, String> {
5327 let mut guard = self
5328 .context
5329 .lock()
5330 .map_err(|_| "capturing worker lock should not be poisoned".to_string())?;
5331 *guard = Some(context.clone());
5332 Ok(json!({"ok": true}))
5333 }
5334 }
5335
5336 #[async_trait]
5337 impl YamlWorkflowLlmExecutor for MockExecutor {
5338 async fn complete_structured(
5339 &self,
5340 request: YamlLlmExecutionRequest,
5341 _event_sink: Option<&dyn YamlWorkflowEventSink>,
5342 ) -> Result<YamlLlmExecutionResult, String> {
5343 let prompt = request.prompt;
5344 if prompt.contains("exactly one category") {
5345 return Ok(YamlLlmExecutionResult {
5346 payload: json!({"category":"termination","reason":"mock"}),
5347 usage: Some(YamlLlmTokenUsage {
5348 prompt_tokens: 10,
5349 completion_tokens: 5,
5350 total_tokens: 15,
5351 thinking_tokens: None,
5352 }),
5353 ttft_ms: None,
5354 tool_calls: Vec::new(),
5355 });
5356 }
5357 if prompt.contains("Determine termination subtype") {
5358 return Ok(YamlLlmExecutionResult {
5359 payload: json!({"subtype":"repeated_offense","reason":"mock"}),
5360 usage: Some(YamlLlmTokenUsage {
5361 prompt_tokens: 12,
5362 completion_tokens: 6,
5363 total_tokens: 18,
5364 thinking_tokens: None,
5365 }),
5366 ttft_ms: None,
5367 tool_calls: Vec::new(),
5368 });
5369 }
5370 if prompt.contains("Determine supply chain subtype") {
5371 return Ok(YamlLlmExecutionResult {
5372 payload: json!({"subtype":"order_replacement","reason":"mock"}),
5373 usage: Some(YamlLlmTokenUsage {
5374 prompt_tokens: 11,
5375 completion_tokens: 4,
5376 total_tokens: 15,
5377 thinking_tokens: None,
5378 }),
5379 ttft_ms: None,
5380 tool_calls: Vec::new(),
5381 });
5382 }
5383 Err("unexpected prompt".to_string())
5384 }
5385 }
5386
#[tokio::test]
async fn runs_yaml_workflow_and_returns_step_timings() {
    // End-to-end run over a branching workflow: classify -> switch -> subtype
    // classify -> switch -> RAG custom worker. MockExecutor answers
    // "termination" then "repeated_offense", so the run must terminate at
    // rag_termination_repeated_offense.
    let yaml = r#"
id: email-intake-classification
entry_node: classify_top_level
nodes:
  - id: classify_top_level
    node_type:
      llm_call:
        model: gpt-4.1
        config:
          prompt: |
            Classify this email into exactly one category:
            {{ input.email_text }}
  - id: route_top_level
    node_type:
      switch:
        branches:
          - condition: '$.nodes.classify_top_level.output.category == "termination"'
            target: classify_termination_subtype
        default: rag_clarification
  - id: classify_termination_subtype
    node_type:
      llm_call:
        model: gpt-4.1
        config:
          prompt: |
            Determine termination subtype:
            {{ input.email_text }}
  - id: route_termination_subtype
    node_type:
      switch:
        branches:
          - condition: '$.nodes.classify_termination_subtype.output.subtype == "repeated_offense"'
            target: rag_termination_repeated_offense
        default: rag_clarification
  - id: rag_termination_repeated_offense
    node_type:
      custom_worker:
        handler: GetRagData
        config:
          payload:
            topic: termination_repeated_offense
  - id: rag_clarification
    node_type:
      custom_worker:
        handler: GetRagData
        config:
          payload:
            topic: clarification
edges:
  - from: classify_top_level
    to: route_top_level
  - from: classify_termination_subtype
    to: route_termination_subtype
"#;

    let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
    let output = run_email_workflow_yaml(&workflow, "test", &MockExecutor)
        .await
        .expect("yaml workflow should execute");

    assert_eq!(output.workflow_id, "email-intake-classification");
    assert_eq!(output.terminal_node, "rag_termination_repeated_offense");
    // Every executed node must contribute exactly one timing entry.
    assert!(!output.step_timings.is_empty());
    assert_eq!(output.step_timings.len(), output.trace.len());
    assert!(output
        .outputs
        .contains_key("rag_termination_repeated_offense"));
    // MockExecutor usage: 10+5 tokens for the top-level call and 12+6 for the
    // subtype call, so totals are 22 in / 11 out / 33 overall.
    assert_eq!(output.total_input_tokens, 22);
    assert_eq!(output.total_output_tokens, 11);
    assert_eq!(output.total_tokens, 33);
}
5460
5461 #[tokio::test]
5462 async fn emits_resolved_llm_input_event_with_bindings() {
5463 let yaml = r#"
5464id: email-intake-classification
5465entry_node: classify_top_level
5466nodes:
5467 - id: classify_top_level
5468 node_type:
5469 llm_call:
5470 model: gpt-4.1
5471 config:
5472 prompt: |
5473 Classify this email into exactly one category:
5474 {{ input.email_text }}
5475"#;
5476
5477 let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
5478 let sink = RecordingSink {
5479 events: Mutex::new(Vec::new()),
5480 };
5481
5482 let output = run_email_workflow_yaml_with_custom_worker_and_events(
5483 &workflow,
5484 "Need help with termination",
5485 &MockExecutor,
5486 None,
5487 Some(&sink),
5488 )
5489 .await
5490 .expect("yaml workflow should execute");
5491
5492 assert_eq!(output.terminal_node, "classify_top_level");
5493
5494 let events = sink
5495 .events
5496 .lock()
5497 .expect("recording sink lock should not be poisoned");
5498 let llm_event = events
5499 .iter()
5500 .find(|event| event.event_type == "node_llm_input_resolved")
5501 .expect("expected llm input telemetry event");
5502
5503 let metadata = llm_event
5504 .metadata
5505 .as_ref()
5506 .expect("llm input event must include metadata");
5507 assert_eq!(metadata["model"], Value::String("gpt-4.1".to_string()));
5508 assert_eq!(metadata["stream_requested"], Value::Bool(false));
5509 assert_eq!(metadata["heal_requested"], Value::Bool(false));
5510 assert!(metadata["prompt"]
5511 .as_str()
5512 .expect("prompt should be a string")
5513 .contains("Need help with termination"));
5514
5515 let bindings = metadata["bindings"]
5516 .as_array()
5517 .expect("bindings should be an array");
5518 assert_eq!(bindings.len(), 1);
5519 assert_eq!(
5520 bindings[0]["source_path"],
5521 Value::String("input.email_text".to_string())
5522 );
5523 assert_eq!(
5524 bindings[0]["resolved"],
5525 Value::String("Need help with termination".to_string())
5526 );
5527 assert_eq!(bindings[0]["missing"], Value::Bool(false));
5528 assert_eq!(
5529 bindings[0]["resolved_type"],
5530 Value::String("string".to_string())
5531 );
5532 }
5533
5534 #[tokio::test]
5535 async fn workflow_completed_event_includes_nerdstats_by_default() {
5536 let yaml = r#"
5537id: nerdstats-default
5538entry_node: classify
5539nodes:
5540 - id: classify
5541 node_type:
5542 llm_call:
5543 model: gpt-4.1
5544 config:
5545 prompt: |
5546 Classify this email into exactly one category:
5547 {{ input.email_text }}
5548"#;
5549
5550 let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
5551 let sink = RecordingSink {
5552 events: Mutex::new(Vec::new()),
5553 };
5554
5555 let output = run_workflow_yaml_with_custom_worker_and_events_and_options(
5556 &workflow,
5557 &json!({"email_text":"hello"}),
5558 &MockExecutor,
5559 None,
5560 Some(&sink),
5561 &YamlWorkflowRunOptions::default(),
5562 )
5563 .await
5564 .expect("workflow should execute");
5565
5566 let events = sink
5567 .events
5568 .lock()
5569 .expect("recording sink lock should not be poisoned");
5570 let completed = events
5571 .iter()
5572 .find(|event| event.event_type == "workflow_completed")
5573 .expect("expected workflow_completed event");
5574 let metadata = completed
5575 .metadata
5576 .as_ref()
5577 .expect("workflow_completed should include metadata by default");
5578 let nerdstats = metadata
5579 .get("nerdstats")
5580 .expect("nerdstats should be present by default");
5581
5582 assert_eq!(nerdstats["workflow_id"], Value::String(output.workflow_id));
5583 assert_eq!(
5584 nerdstats["terminal_node"],
5585 Value::String(output.terminal_node)
5586 );
5587 assert_eq!(
5588 nerdstats["total_tokens"],
5589 Value::Number(output.total_tokens.into())
5590 );
5591 assert_eq!(nerdstats["token_metrics_available"], Value::Bool(true));
5592 assert_eq!(
5593 nerdstats["token_metrics_source"],
5594 Value::String("provider_usage".to_string())
5595 );
5596 assert_eq!(nerdstats["ttft_ms"], Value::Null);
5597 }
5598
#[tokio::test]
async fn workflow_completed_event_omits_nerdstats_when_disabled() {
    // With telemetry.nerdstats turned off, the workflow_completed event must
    // carry no metadata at all.
    let yaml = r#"
id: nerdstats-disabled
entry_node: classify
nodes:
  - id: classify
    node_type:
      llm_call:
        model: gpt-4.1
        config:
          prompt: |
            Classify this email into exactly one category:
            {{ input.email_text }}
"#;

    let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
    let sink = RecordingSink {
        events: Mutex::new(Vec::new()),
    };
    // Default options except nerdstats disabled.
    let options = YamlWorkflowRunOptions {
        telemetry: YamlWorkflowTelemetryConfig {
            nerdstats: false,
            ..YamlWorkflowTelemetryConfig::default()
        },
        ..YamlWorkflowRunOptions::default()
    };

    run_workflow_yaml_with_custom_worker_and_events_and_options(
        &workflow,
        &json!({"email_text":"hello"}),
        &MockExecutor,
        None,
        Some(&sink),
        &options,
    )
    .await
    .expect("workflow should execute");

    let events = sink
        .events
        .lock()
        .expect("recording sink lock should not be poisoned");
    let completed = events
        .iter()
        .find(|event| event.event_type == "workflow_completed")
        .expect("expected workflow_completed event");
    assert!(completed.metadata.is_none());
}
5648
/// Test-double LLM executor: returns fixed usage and reports a TTFT only when
/// the request asked for streaming (see its `complete_structured` impl below).
struct StreamAwareMockExecutor;
5650
5651 #[async_trait]
5652 impl YamlWorkflowLlmExecutor for StreamAwareMockExecutor {
5653 async fn complete_structured(
5654 &self,
5655 request: YamlLlmExecutionRequest,
5656 _event_sink: Option<&dyn YamlWorkflowEventSink>,
5657 ) -> Result<YamlLlmExecutionResult, String> {
5658 Ok(YamlLlmExecutionResult {
5659 payload: json!({"state":"ok"}),
5660 usage: Some(YamlLlmTokenUsage {
5661 prompt_tokens: 20,
5662 completion_tokens: 10,
5663 total_tokens: 30,
5664 thinking_tokens: None,
5665 }),
5666 ttft_ms: if request.stream { Some(12) } else { None },
5667 tool_calls: Vec::new(),
5668 })
5669 }
5670 }
5671
#[tokio::test]
async fn workflow_completed_event_includes_nerdstats_for_streaming_nodes() {
    // stream: true exercises StreamAwareMockExecutor's streaming path, which
    // reports usage plus a 12ms TTFT; both must surface in nerdstats.
    let yaml = r#"
id: nerdstats-streaming
entry_node: classify
nodes:
  - id: classify
    node_type:
      llm_call:
        model: gpt-4.1
        stream: true
        config:
          prompt: |
            Return JSON only:
            {"state":"ok"}
"#;

    let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
    let sink = RecordingSink {
        events: Mutex::new(Vec::new()),
    };

    run_workflow_yaml_with_custom_worker_and_events_and_options(
        &workflow,
        &json!({"email_text":"hello"}),
        &StreamAwareMockExecutor,
        None,
        Some(&sink),
        &YamlWorkflowRunOptions::default(),
    )
    .await
    .expect("workflow should execute");

    let events = sink
        .events
        .lock()
        .expect("recording sink lock should not be poisoned");
    let completed = events
        .iter()
        .find(|event| event.event_type == "workflow_completed")
        .expect("expected workflow_completed event");
    let metadata = completed
        .metadata
        .as_ref()
        .expect("workflow_completed should include metadata by default");
    let nerdstats = metadata
        .get("nerdstats")
        .expect("nerdstats should be present by default");

    // Usage came from the provider even though the node streamed.
    assert_eq!(nerdstats["token_metrics_available"], Value::Bool(true));
    assert_eq!(nerdstats["total_tokens"], Value::Number(30u64.into()));
    assert_eq!(nerdstats["ttft_ms"], Value::Number(12u64.into()));
}
5725
#[test]
fn workflow_nerdstats_marks_stream_token_metrics_unavailable() {
    // A single streamed llm_call step with no per-step token counts: nerdstats
    // must flag token metrics as unavailable instead of reporting zeros.
    let llm_step = YamlStepTiming {
        node_id: "llm_node".to_string(),
        node_kind: "llm_call".to_string(),
        elapsed_ms: 100,
        prompt_tokens: None,
        completion_tokens: None,
        total_tokens: None,
        thinking_tokens: None,
        tokens_per_second: None,
    };
    let run_output = YamlWorkflowRunOutput {
        workflow_id: "workflow".to_string(),
        entry_node: "start".to_string(),
        email_text: "hello".to_string(),
        trace: vec!["llm_node".to_string()],
        outputs: BTreeMap::new(),
        terminal_node: "llm_node".to_string(),
        terminal_output: None,
        step_timings: vec![llm_step],
        llm_node_metrics: BTreeMap::new(),
        total_elapsed_ms: 100,
        ttft_ms: None,
        total_input_tokens: 0,
        total_output_tokens: 0,
        total_tokens: 0,
        total_thinking_tokens: None,
        tokens_per_second: 0.0,
        trace_id: Some("trace-1".to_string()),
        metadata: None,
    };

    let nerdstats = workflow_nerdstats(&run_output);
    assert_eq!(nerdstats["token_metrics_available"], json!(false));
    assert_eq!(
        nerdstats["token_metrics_source"],
        json!("provider_stream_usage_unavailable")
    );
    // Totals are suppressed (null), not reported as zero.
    assert_eq!(nerdstats["total_tokens"], Value::Null);
    assert_eq!(nerdstats["ttft_ms"], Value::Null);
}
5767
#[test]
fn workflow_nerdstats_includes_ttft_when_available() {
    // When the run recorded a time-to-first-token, nerdstats must surface it.
    let llm_step = YamlStepTiming {
        node_id: "llm_node".to_string(),
        node_kind: "llm_call".to_string(),
        elapsed_ms: 100,
        prompt_tokens: Some(10),
        completion_tokens: Some(15),
        total_tokens: Some(25),
        thinking_tokens: None,
        tokens_per_second: Some(150.0),
    };
    let run_output = YamlWorkflowRunOutput {
        workflow_id: "workflow".to_string(),
        entry_node: "start".to_string(),
        email_text: "hello".to_string(),
        trace: vec!["llm_node".to_string()],
        outputs: BTreeMap::new(),
        terminal_node: "llm_node".to_string(),
        terminal_output: None,
        step_timings: vec![llm_step],
        llm_node_metrics: BTreeMap::new(),
        total_elapsed_ms: 100,
        ttft_ms: Some(42),
        total_input_tokens: 10,
        total_output_tokens: 15,
        total_tokens: 25,
        total_thinking_tokens: None,
        tokens_per_second: 150.0,
        trace_id: Some("trace-2".to_string()),
        metadata: None,
    };

    let nerdstats = workflow_nerdstats(&run_output);
    assert_eq!(nerdstats["ttft_ms"], json!(42));
}
5803
/// Test-double LLM executor that requires exactly two chat messages in the
/// request and answers with a canned classification tagged "history".
struct MessageHistoryExecutor;
5805
5806 #[async_trait]
5807 impl YamlWorkflowLlmExecutor for MessageHistoryExecutor {
5808 async fn complete_structured(
5809 &self,
5810 request: YamlLlmExecutionRequest,
5811 _event_sink: Option<&dyn YamlWorkflowEventSink>,
5812 ) -> Result<YamlLlmExecutionResult, String> {
5813 let messages = request
5814 .messages
5815 .ok_or_else(|| "expected messages in request".to_string())?;
5816 if messages.len() != 2 {
5817 return Err(format!("expected 2 messages, got {}", messages.len()));
5818 }
5819 Ok(YamlLlmExecutionResult {
5820 payload: json!({"category":"termination","reason":"history"}),
5821 usage: Some(YamlLlmTokenUsage {
5822 prompt_tokens: 7,
5823 completion_tokens: 3,
5824 total_tokens: 10,
5825 thinking_tokens: None,
5826 }),
5827 ttft_ms: None,
5828 tool_calls: Vec::new(),
5829 })
5830 }
5831 }
5832
#[tokio::test]
async fn supports_messages_path_in_workflow_input() {
    // messages_path points the llm_call at a chat history inside the input;
    // append_prompt_as_user: false passes the history through unchanged, so
    // MessageHistoryExecutor must see exactly the two messages below.
    let yaml = r#"
id: email-intake-classification
entry_node: classify_top_level
nodes:
  - id: classify_top_level
    node_type:
      llm_call:
        model: gpt-4.1
        messages_path: input.messages
        append_prompt_as_user: false
"#;

    let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
    let input = json!({
        "email_text": "ignored",
        "messages": [
            {"role": "system", "content": "You are a classifier"},
            {"role": "user", "content": "Please classify this"}
        ]
    });

    let output = run_workflow_yaml(&workflow, &input, &MessageHistoryExecutor)
        .await
        .expect("workflow should use chat history from input");

    assert_eq!(output.terminal_node, "classify_top_level");
    // The executor only answers "history" when it saw exactly 2 messages.
    assert_eq!(
        output.outputs["classify_top_level"]["output"]["reason"],
        Value::String("history".to_string())
    );
}
5866
5867 #[tokio::test]
5868 async fn wrapper_entrypoints_produce_equivalent_outputs() {
5869 let yaml = r#"
5870id: wrapper-equivalence
5871entry_node: classify
5872nodes:
5873 - id: classify
5874 node_type:
5875 llm_call:
5876 model: gpt-4.1
5877 config:
5878 prompt: |
5879 Classify this email into exactly one category:
5880 {{ input.email_text }}
5881"#;
5882
5883 let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
5884 let input = json!({"email_text":"hello"});
5885
5886 let a = run_workflow_yaml(&workflow, &input, &MockExecutor)
5887 .await
5888 .expect("base entrypoint should execute");
5889 let b = run_workflow_yaml_with_custom_worker(&workflow, &input, &MockExecutor, None)
5890 .await
5891 .expect("custom worker wrapper should execute");
5892 let c = run_workflow_yaml_with_custom_worker_and_events_and_options(
5893 &workflow,
5894 &input,
5895 &MockExecutor,
5896 None,
5897 None,
5898 &YamlWorkflowRunOptions::default(),
5899 )
5900 .await
5901 .expect("events/options wrapper should execute");
5902
5903 assert_eq!(a.workflow_id, b.workflow_id);
5904 assert_eq!(a.workflow_id, c.workflow_id);
5905 assert_eq!(a.terminal_node, b.terminal_node);
5906 assert_eq!(a.terminal_node, c.terminal_node);
5907 assert_eq!(a.outputs, b.outputs);
5908 assert_eq!(a.outputs, c.outputs);
5909 assert_eq!(a.total_tokens, b.total_tokens);
5910 assert_eq!(a.total_tokens, c.total_tokens);
5911 }
5912
#[tokio::test]
async fn yaml_llm_tool_calling_captures_traces_and_supports_globals_reference() {
    // First node runs one tool roundtrip (FixedToolWorker answers with
    // customer_name "Ava") and records the call under the `audit` global;
    // the second node's prompt references that global. The final greeting
    // must therefore mention "Ava".
    let yaml = r#"
id: tool-calling-workflow
entry_node: generate_with_tool
nodes:
  - id: generate_with_tool
    node_type:
      llm_call:
        model: gpt-4.1
        tools_format: simplified
        max_tool_roundtrips: 1
        tool_calls_global_key: audit
        tools:
          - name: get_customer_context
            description: Fetch customer context
            input_schema:
              type: object
              properties:
                order_id: { type: string }
              required: [order_id]
              additionalProperties: false
            output_schema:
              type: object
              properties:
                customer_name: { type: string }
              required: [customer_name]
              additionalProperties: false
        config:
          output_schema:
            type: object
            properties:
              state: { type: string }
            required: [state]
  - id: personalize
    node_type:
      llm_call:
        model: gpt-4.1
        config:
          prompt: |
            Write an email greeting for {{ globals.audit.0.output.customer_name }}.
          output_schema:
            type: object
            properties:
              subject: { type: string }
              body: { type: string }
            required: [subject, body]
edges:
  - from: generate_with_tool
    to: personalize
"#;

    let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
    let client = SimpleAgentsClientBuilder::new()
        .with_provider(Arc::new(ToolLoopProvider))
        .build()
        .expect("client should build");
    let worker = FixedToolWorker {
        payload: json!({"customer_name": "Ava"}),
    };

    let output = run_workflow_yaml_with_client_and_custom_worker_and_events_and_options(
        &workflow,
        &json!({"email_text":"hello"}),
        &client,
        Some(&worker),
        None,
        &YamlWorkflowRunOptions::default(),
    )
    .await
    .expect("workflow should execute");

    assert_eq!(output.trace, vec!["generate_with_tool", "personalize"]);
    // The node output records a tool_calls trace with the worker's payload.
    assert_eq!(
        output.outputs["generate_with_tool"]["tool_calls"][0]["output"]["customer_name"],
        Value::String("Ava".to_string())
    );
    let body = output.outputs["personalize"]["output"]["body"]
        .as_str()
        .expect("body should be string");
    // The greeting must carry the name resolved via the globals reference.
    assert!(body.contains("Ava"));
}
5995
#[tokio::test]
async fn yaml_llm_tool_output_schema_mismatch_hard_fails_node() {
    // The worker returns a payload that violates the tool's output_schema;
    // the node must hard-fail with a schema-validation Llm error.
    let yaml = r#"
id: tool-calling-schema-fail
entry_node: generate_with_tool
nodes:
  - id: generate_with_tool
    node_type:
      llm_call:
        model: gpt-4.1
        tools_format: simplified
        max_tool_roundtrips: 1
        tools:
          - name: get_customer_context
            input_schema:
              type: object
              properties:
                order_id: { type: string }
              required: [order_id]
              additionalProperties: false
            output_schema:
              type: object
              properties:
                customer_name: { type: string }
              required: [customer_name]
              additionalProperties: false
        config:
          output_schema:
            type: object
            properties:
              state: { type: string }
            required: [state]
"#;

    let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
    let client = SimpleAgentsClientBuilder::new()
        .with_provider(Arc::new(ToolLoopProvider))
        .build()
        .expect("client should build");
    // Deliberately does not match the declared output_schema.
    let worker = FixedToolWorker {
        payload: json!({"unexpected": "shape"}),
    };

    let error = run_workflow_yaml_with_client_and_custom_worker_and_events_and_options(
        &workflow,
        &json!({"email_text":"hello"}),
        &client,
        Some(&worker),
        None,
        &YamlWorkflowRunOptions::default(),
    )
    .await
    .expect_err("workflow should hard-fail on schema mismatch");

    match error {
        YamlWorkflowRunError::Llm { message, .. } => {
            assert!(message.contains("output failed schema validation"));
        }
        other => panic!("expected llm error, got {other:?}"),
    }
}
6057
#[tokio::test]
async fn yaml_llm_unknown_tool_is_rejected_before_custom_worker_execution() {
    // UnknownToolProvider's model asks for a tool the node never declared;
    // the run must fail before the custom worker executes even once.
    let yaml = r#"
id: unknown-tool-rejected
entry_node: generate_with_tool
nodes:
  - id: generate_with_tool
    node_type:
      llm_call:
        model: gpt-4.1
        tools_format: simplified
        max_tool_roundtrips: 1
        tools:
          - name: get_customer_context
            input_schema:
              type: object
              properties:
                order_id: { type: string }
              required: [order_id]
        config:
          output_schema:
            type: object
            properties:
              state: { type: string }
            required: [state]
"#;

    let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
    let client = SimpleAgentsClientBuilder::new()
        .with_provider(Arc::new(UnknownToolProvider))
        .build()
        .expect("client should build");
    let worker = CountingToolWorker {
        execute_calls: AtomicUsize::new(0),
    };

    let error = run_workflow_yaml_with_client_and_custom_worker_and_events_and_options(
        &workflow,
        &json!({"email_text":"hello"}),
        &client,
        Some(&worker),
        None,
        &YamlWorkflowRunOptions::default(),
    )
    .await
    .expect_err("workflow should reject unknown tool before executing worker");

    match error {
        YamlWorkflowRunError::Llm { message, .. } => {
            assert!(message.contains("model requested unknown tool 'unknown_tool'"));
        }
        other => panic!("expected llm error, got {other:?}"),
    }

    // The worker must never have been invoked.
    assert_eq!(worker.execute_calls.load(Ordering::SeqCst), 0);
}
6114
#[test]
fn validates_tools_format_mismatch() {
    // tools_format: openai combined with simplified-style tool entries is a
    // configuration error; static verification should flag it instead of
    // letting it fail at run time.
    let yaml = r#"
id: mismatch
entry_node: generate
nodes:
  - id: generate
    node_type:
      llm_call:
        model: gpt-4.1
        tools_format: openai
        tools:
          - name: get_customer_context
            input_schema:
              type: object
              properties:
                order_id: { type: string }
              required: [order_id]
"#;

    let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
    let diagnostics = verify_yaml_workflow(&workflow);
    assert!(diagnostics
        .iter()
        .any(|diagnostic| diagnostic.code == "invalid_tools_format"));
}
6141
#[test]
fn mock_custom_worker_supports_get_employee_record() {
    // The built-in mock worker registry should resolve an employee lookup to
    // its canned record (EMP-2041, Austin).
    let args = json!({"employee_name": "Alex Johnson"});
    let record = mock_custom_worker_output("get_employee_record", &args)
        .expect("mock tool should resolve employee record");

    assert_eq!(record["employee_id"], json!("EMP-2041"));
    assert_eq!(record["location"], json!("Austin"));
}
6153
#[tokio::test]
async fn custom_worker_receives_trace_context_block() {
    // The runtime should inject a `trace` block (context + tenant) into the
    // context value handed to custom workers, carrying the caller-supplied
    // trace ids and tenant identifiers verbatim.
    let yaml = r#"
id: custom-worker-trace-context
entry_node: lookup
nodes:
  - id: lookup
    node_type:
      custom_worker:
        handler: GetRagData
        config:
          payload:
            topic: demo
"#;

    let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
    let worker = CapturingWorker {
        context: Mutex::new(None),
    };
    let options = YamlWorkflowRunOptions {
        trace: YamlWorkflowTraceOptions {
            context: Some(YamlWorkflowTraceContextInput {
                trace_id: Some("trace-fixed-ctx".to_string()),
                traceparent: Some("00-trace-fixed-ctx-span-fixed-01".to_string()),
                ..YamlWorkflowTraceContextInput::default()
            }),
            tenant: YamlWorkflowTraceTenantContext {
                conversation_id: Some("7fd67af3-c67d-46cb-95af-08f8ed7b06a5".to_string()),
                request_id: Some("turn-7".to_string()),
                ..YamlWorkflowTraceTenantContext::default()
            },
        },
        ..YamlWorkflowRunOptions::default()
    };

    run_workflow_yaml_with_custom_worker_and_events_and_options(
        &workflow,
        &json!({"email_text":"hello"}),
        &MockExecutor,
        Some(&worker),
        None,
        &options,
    )
    .await
    .expect("workflow should execute");

    let captured_context = worker
        .context
        .lock()
        .expect("capturing worker lock should not be poisoned")
        .clone()
        .expect("custom worker should receive context");

    // trace.context carries the caller-supplied trace identifiers.
    assert_eq!(
        captured_context
            .get("trace")
            .and_then(|trace| trace.get("context"))
            .and_then(|context| context.get("trace_id"))
            .and_then(Value::as_str),
        Some("trace-fixed-ctx")
    );
    assert_eq!(
        captured_context
            .get("trace")
            .and_then(|trace| trace.get("context"))
            .and_then(|context| context.get("traceparent"))
            .and_then(Value::as_str),
        Some("00-trace-fixed-ctx-span-fixed-01")
    );
    // trace.tenant carries the conversation-level identifiers.
    assert_eq!(
        captured_context
            .get("trace")
            .and_then(|trace| trace.get("tenant"))
            .and_then(|tenant| tenant.get("conversation_id"))
            .and_then(Value::as_str),
        Some("7fd67af3-c67d-46cb-95af-08f8ed7b06a5")
    );
}
6232
#[tokio::test]
async fn event_sink_cancellation_stops_workflow_before_llm_execution() {
    // A sink that signals cancellation after its first event must abort the
    // run with EventSinkCancelled before the LLM executor is ever invoked.
    let yaml = r#"
id: cancellation-test
entry_node: classify
nodes:
  - id: classify
    node_type:
      llm_call:
        model: gpt-4.1
        config:
          prompt: |
            Classify this email into exactly one category:
            {{ input.email_text }}
"#;

    let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
    let executor = CountingExecutor {
        call_count: AtomicUsize::new(0),
    };
    let sink = CancelAfterFirstEventSink {
        cancelled: AtomicBool::new(false),
    };

    let err = run_workflow_yaml_with_custom_worker_and_events_and_options(
        &workflow,
        &json!({"email_text":"hello"}),
        &executor,
        None,
        Some(&sink),
        &YamlWorkflowRunOptions::default(),
    )
    .await
    .expect_err("workflow should stop when sink signals cancellation");

    assert!(matches!(
        err,
        YamlWorkflowRunError::EventSinkCancelled { .. }
    ));
    // Cancellation took effect before any LLM execution happened.
    assert_eq!(executor.call_count.load(Ordering::SeqCst), 0);
}
6274
#[tokio::test]
async fn rejects_invalid_messages_path_shape() {
    // messages_path resolves to a string instead of an array of messages;
    // the run must fail with an Llm error rather than proceeding.
    let yaml = r#"
id: email-intake-classification
entry_node: classify_top_level
nodes:
  - id: classify_top_level
    node_type:
      llm_call:
        model: gpt-4.1
        messages_path: input.messages
"#;

    let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
    let input = json!({
        "email_text": "ignored",
        "messages": "not-a-list"
    });

    let err = run_workflow_yaml(&workflow, &input, &MessageHistoryExecutor)
        .await
        .expect_err("workflow should fail for invalid messages shape");

    assert!(matches!(err, YamlWorkflowRunError::Llm { .. }));
}
6300
#[test]
fn renders_yaml_workflow_to_mermaid_with_switch_labels() {
    // Switch branches must render as labeled mermaid edges ("route1" for the
    // first branch, "default" for the fallback); plain edges stay unlabeled.
    let yaml = r#"
id: chat-workflow
entry_node: decide
nodes:
  - id: decide
    node_type:
      switch:
        branches:
          - condition: '$.input.mode == "draft"'
            target: draft
        default: ask
  - id: draft
    node_type:
      llm_call:
        model: gpt-4.1
  - id: ask
    node_type:
      llm_call:
        model: gpt-4.1
edges:
  - from: draft
    to: ask
"#;

    let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
    let mermaid = yaml_workflow_to_mermaid(&workflow);

    assert!(mermaid.contains("flowchart TD"));
    assert!(mermaid.contains("decide -- \"route1\" --> draft"));
    assert!(mermaid.contains("decide -- \"default\" --> ask"));
    assert!(mermaid.contains("draft --> ask"));
}
6335
#[test]
fn renders_yaml_workflow_tools_as_colored_tool_nodes() {
    // LLM tools render as synthetic `<node>__tool_<i>` nodes, linked with a
    // dotted edge and styled through the toolNode classDef.
    let yaml = r#"
id: tool-graph
entry_node: chat
nodes:
  - id: chat
    node_type:
      llm_call:
        model: gemini-3-flash
        tools_format: simplified
        tools:
          - name: run_workflow_graph
            input_schema:
              type: object
              properties:
                workflow_id: { type: string }
              required: [workflow_id]
edges: []
"#;

    let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
    let mermaid = yaml_workflow_to_mermaid(&workflow);

    assert!(mermaid.contains("chat__tool_0"));
    assert!(mermaid.contains("chat -.-> chat__tool_0"));
    assert!(mermaid.contains("classDef toolNode"));
    assert!(mermaid.contains("class chat__tool_0 toolNode;"));
}
6365
#[test]
fn renders_yaml_workflow_file_to_mermaid_with_subgraph_cluster_when_present() {
    // Writes an orchestrator yaml whose tool prompt references a sibling
    // subgraph yaml, then renders the file: the output must contain both a
    // Main cluster and a Subgraph cluster with prefixed node ids.
    // NOTE(review): if any assertion below panics, the temp dir is leaked
    // because remove_dir_all only runs on the success path.
    let base_dir = std::env::temp_dir().join(format!(
        "simple_agents_mermaid_subgraph_{}",
        std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .expect("unix epoch")
            .as_nanos()
    ));
    fs::create_dir_all(&base_dir).expect("temp dir should be created");

    let orchestrator_path = base_dir.join("email-chat-orchestrator-with-subgraph-tool.yaml");
    let subgraph_path = base_dir.join("hr-warning-email-subgraph.yaml");

    let orchestrator_yaml = r#"
id: email-chat-orchestrator-with-subgraph-tool
entry_node: respond_casual
nodes:
  - id: respond_casual
    node_type:
      llm_call:
        model: gemini-3-flash
        tools_format: simplified
        tools:
          - name: run_workflow_graph
            input_schema:
              type: object
              properties:
                workflow_id: { type: string }
              required: [workflow_id]
        config:
          prompt: |
            Call with:
            {
              "workflow_id": "hr_warning_email_subgraph"
            }
edges: []
"#;

    let subgraph_yaml = r#"
id: hr-warning-email-subgraph
entry_node: draft_hr_warning_email
nodes:
  - id: draft_hr_warning_email
    node_type:
      llm_call:
        model: gemini-3-flash
edges: []
"#;

    fs::write(&orchestrator_path, orchestrator_yaml).expect("orchestrator yaml written");
    fs::write(&subgraph_path, subgraph_yaml).expect("subgraph yaml written");

    let mermaid =
        yaml_workflow_file_to_mermaid(&orchestrator_path).expect("mermaid should render");

    assert!(mermaid.contains("Main: email-chat-orchestrator-with-subgraph-tool"));
    assert!(mermaid.contains("Subgraph: hr_warning_email_subgraph"));
    assert!(mermaid.contains("calls hr_warning_email_subgraph"));
    assert!(mermaid.contains("subgraph_1__draft_hr_warning_email"));

    fs::remove_dir_all(base_dir).expect("temp dir removed");
}
6429
    #[test]
    fn converts_yaml_workflow_to_ir_definition() {
        // A workflow mixing llm_call, switch, and custom_worker node types
        // should convert to IR with all user nodes present plus the synthetic
        // `__yaml_start` entry node.
        let yaml = r#"
id: chat-workflow
entry_node: classify
nodes:
  - id: classify
    node_type:
      llm_call:
        model: gpt-4.1
    config:
      prompt: |
        classify
  - id: route
    node_type:
      switch:
        branches:
          - condition: '$.nodes.classify.output.kind == "x"'
            target: done
        default: done
  - id: done
    node_type:
      custom_worker:
        handler: GetRagData
    config:
      payload:
        topic: test
edges:
  - from: classify
    to: route
"#;

        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
        let ir = yaml_workflow_to_ir(&workflow).expect("yaml should convert to ir");

        assert_eq!(ir.name, "chat-workflow");
        // The converter injects a synthetic start node alongside user nodes.
        assert!(ir.nodes.iter().any(|n| n.id == "__yaml_start"));
        assert!(ir.nodes.iter().any(|n| n.id == "classify"));
        assert!(ir.nodes.iter().any(|n| n.id == "route"));
        assert!(ir.nodes.iter().any(|n| n.id == "done"));
    }
6471
    #[test]
    fn supports_yaml_to_ir_when_messages_path_is_used() {
        // When an llm_call uses `messages_path` instead of an inline prompt,
        // conversion produces a Tool node backed by `__yaml_llm_call`.
        let yaml = r#"
id: chat-workflow
entry_node: classify
nodes:
  - id: classify
    node_type:
      llm_call:
        model: gpt-4.1
        messages_path: input.messages
"#;

        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
        let ir =
            yaml_workflow_to_ir(&workflow).expect("messages_path should convert to tool-based IR");
        assert!(ir.nodes.iter().any(|node| matches!(
            node.kind,
            crate::ir::NodeKind::Tool { ref tool, .. } if tool == "__yaml_llm_call"
        )));
    }
6493
    #[tokio::test]
    async fn workflow_output_contains_trace_id_in_both_locations() {
        // The trace id must be surfaced both as `output.trace_id` and inside
        // `output.metadata.telemetry.trace_id`, and the two must agree.
        let yaml = r#"
id: trace-test
entry_node: classify
nodes:
  - id: classify
    node_type:
      llm_call:
        model: gpt-4.1
    config:
      prompt: |
        Classify this email into exactly one category:
        {{ input.email_text }}
"#;

        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
        let output = run_workflow_yaml(&workflow, &json!({"email_text":"hello"}), &MockExecutor)
            .await
            .expect("workflow should execute");

        let trace_id = output
            .trace_id
            .as_deref()
            .expect("trace_id should be present");
        assert!(!trace_id.is_empty());
        // The same id is mirrored under metadata.telemetry.trace_id.
        assert_eq!(
            output.metadata.as_ref().and_then(|value| {
                value
                    .get("telemetry")
                    .and_then(|telemetry| telemetry.get("trace_id"))
                    .and_then(Value::as_str)
            }),
            Some(trace_id)
        );
    }
6530
    #[tokio::test]
    async fn workflow_run_options_sample_rate_zero_marks_trace_unsampled() {
        // sample_rate 0.0 must force `sampled: false` in the telemetry
        // metadata while still honoring the caller-supplied trace id.
        let yaml = r#"
id: sample-rate-zero
entry_node: classify
nodes:
  - id: classify
    node_type:
      llm_call:
        model: gpt-4.1
    config:
      prompt: |
        Classify this email into exactly one category:
        {{ input.email_text }}
"#;

        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
        let options = YamlWorkflowRunOptions {
            telemetry: YamlWorkflowTelemetryConfig {
                sample_rate: 0.0,
                ..YamlWorkflowTelemetryConfig::default()
            },
            trace: YamlWorkflowTraceOptions {
                context: Some(YamlWorkflowTraceContextInput {
                    trace_id: Some("trace-sample-zero".to_string()),
                    ..YamlWorkflowTraceContextInput::default()
                }),
                tenant: YamlWorkflowTraceTenantContext::default(),
            },
            model: None,
        };

        let output = run_workflow_yaml_with_custom_worker_and_events_and_options(
            &workflow,
            &json!({"email_text":"hello"}),
            &MockExecutor,
            None,
            None,
            &options,
        )
        .await
        .expect("workflow should execute");

        // Explicit trace id is preserved even when the trace is unsampled.
        assert_eq!(output.trace_id.as_deref(), Some("trace-sample-zero"));
        assert_eq!(
            output
                .metadata
                .as_ref()
                .and_then(|value| value.get("telemetry"))
                .and_then(|telemetry| telemetry.get("sampled"))
                .and_then(Value::as_bool),
            Some(false)
        );
    }
6585
6586 #[test]
6587 fn trace_id_from_traceparent_parses_w3c_header() {
6588 let traceparent = "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01";
6589 assert_eq!(
6590 trace_id_from_traceparent(traceparent).as_deref(),
6591 Some("4bf92f3577b34da6a3ce929d0e0e4736")
6592 );
6593 }
6594
6595 #[test]
6596 fn resolve_telemetry_context_marks_traceparent_source() {
6597 let mut options = YamlWorkflowRunOptions::default();
6598 options.trace.context = Some(YamlWorkflowTraceContextInput {
6599 traceparent: Some(
6600 "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01".to_string(),
6601 ),
6602 ..YamlWorkflowTraceContextInput::default()
6603 });
6604
6605 let context = resolve_telemetry_context(&options, None);
6606
6607 assert_eq!(context.trace_id_source, TraceIdSource::Traceparent);
6608 assert_eq!(
6609 context.trace_id.as_deref(),
6610 Some("4bf92f3577b34da6a3ce929d0e0e4736")
6611 );
6612 }
6613
    #[tokio::test]
    async fn workflow_run_options_derives_trace_id_from_traceparent_when_trace_id_missing() {
        // With no explicit trace_id, the run must extract the trace id from
        // the W3C traceparent header's trace-id field.
        let yaml = r#"
id: traceparent-derived
entry_node: classify
nodes:
  - id: classify
    node_type:
      llm_call:
        model: gpt-4.1
    config:
      prompt: |
        Classify this email into exactly one category:
        {{ input.email_text }}
"#;

        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
        let traceparent = "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01";
        let options = YamlWorkflowRunOptions {
            telemetry: YamlWorkflowTelemetryConfig {
                sample_rate: 0.0,
                ..YamlWorkflowTelemetryConfig::default()
            },
            trace: YamlWorkflowTraceOptions {
                context: Some(YamlWorkflowTraceContextInput {
                    traceparent: Some(traceparent.to_string()),
                    ..YamlWorkflowTraceContextInput::default()
                }),
                tenant: YamlWorkflowTraceTenantContext::default(),
            },
            model: None,
        };

        let output = run_workflow_yaml_with_custom_worker_and_events_and_options(
            &workflow,
            &json!({"email_text":"hello"}),
            &MockExecutor,
            None,
            None,
            &options,
        )
        .await
        .expect("workflow should execute");

        // trace_id equals the middle field of the traceparent header.
        assert_eq!(
            output.trace_id.as_deref(),
            Some("4bf92f3577b34da6a3ce929d0e0e4736")
        );
        assert_eq!(
            output
                .metadata
                .as_ref()
                .and_then(|value| value.get("telemetry"))
                .and_then(|telemetry| telemetry.get("sampled"))
                .and_then(Value::as_bool),
            Some(false)
        );
    }
6672
    #[tokio::test]
    async fn workflow_run_options_sample_rate_one_marks_trace_sampled() {
        // sample_rate 1.0 must force `sampled: true` in telemetry metadata.
        let yaml = r#"
id: sample-rate-one
entry_node: classify
nodes:
  - id: classify
    node_type:
      llm_call:
        model: gpt-4.1
    config:
      prompt: |
        Classify this email into exactly one category:
        {{ input.email_text }}
"#;

        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
        let options = YamlWorkflowRunOptions {
            telemetry: YamlWorkflowTelemetryConfig {
                sample_rate: 1.0,
                ..YamlWorkflowTelemetryConfig::default()
            },
            trace: YamlWorkflowTraceOptions {
                context: Some(YamlWorkflowTraceContextInput {
                    trace_id: Some("trace-sample-one".to_string()),
                    ..YamlWorkflowTraceContextInput::default()
                }),
                tenant: YamlWorkflowTraceTenantContext::default(),
            },
            model: None,
        };

        let output = run_workflow_yaml_with_custom_worker_and_events_and_options(
            &workflow,
            &json!({"email_text":"hello"}),
            &MockExecutor,
            None,
            None,
            &options,
        )
        .await
        .expect("workflow should execute");

        assert_eq!(output.trace_id.as_deref(), Some("trace-sample-one"));
        assert_eq!(
            output
                .metadata
                .as_ref()
                .and_then(|value| value.get("telemetry"))
                .and_then(|telemetry| telemetry.get("sampled"))
                .and_then(Value::as_bool),
            Some(true)
        );
    }
6727
    #[tokio::test]
    async fn workflow_run_options_sampling_is_deterministic_for_fixed_trace_id() {
        // At a fractional sample_rate (0.5) the sampling decision for a fixed
        // trace id must be stable: three runs all produce the same flag.
        let yaml = r#"
id: sample-rate-deterministic
entry_node: classify
nodes:
  - id: classify
    node_type:
      llm_call:
        model: gpt-4.1
    config:
      prompt: |
        Classify this email into exactly one category:
        {{ input.email_text }}
"#;

        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
        let options = YamlWorkflowRunOptions {
            telemetry: YamlWorkflowTelemetryConfig {
                sample_rate: 0.5,
                ..YamlWorkflowTelemetryConfig::default()
            },
            trace: YamlWorkflowTraceOptions {
                context: Some(YamlWorkflowTraceContextInput {
                    trace_id: Some("trace-sample-deterministic".to_string()),
                    ..YamlWorkflowTraceContextInput::default()
                }),
                tenant: YamlWorkflowTraceTenantContext::default(),
            },
            model: None,
        };

        let mut sampled_values = Vec::new();
        for _ in 0..3 {
            let output = run_workflow_yaml_with_custom_worker_and_events_and_options(
                &workflow,
                &json!({"email_text":"hello"}),
                &MockExecutor,
                None,
                None,
                &options,
            )
            .await
            .expect("workflow should execute");

            let sampled = output
                .metadata
                .as_ref()
                .and_then(|value| value.get("telemetry"))
                .and_then(|telemetry| telemetry.get("sampled"))
                .and_then(Value::as_bool)
                .expect("sampled flag should be present");
            sampled_values.push(sampled);
        }

        // All three runs agree (whichever way the coin landed).
        assert!(sampled_values
            .iter()
            .all(|value| *value == sampled_values[0]));
    }
6787
    #[tokio::test]
    async fn workflow_run_options_reject_invalid_sample_rate() {
        // sample_rate > 1.0 is rejected up front with an InvalidInput error
        // whose message points at the offending option.
        let yaml = r#"
id: sample-rate-invalid
entry_node: classify
nodes:
  - id: classify
    node_type:
      llm_call:
        model: gpt-4.1
    config:
      prompt: |
        Classify this email into exactly one category:
        {{ input.email_text }}
"#;

        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
        let options = YamlWorkflowRunOptions {
            telemetry: YamlWorkflowTelemetryConfig {
                sample_rate: 1.1,
                ..YamlWorkflowTelemetryConfig::default()
            },
            ..YamlWorkflowRunOptions::default()
        };

        let err = run_workflow_yaml_with_custom_worker_and_events_and_options(
            &workflow,
            &json!({"email_text":"hello"}),
            &MockExecutor,
            None,
            None,
            &options,
        )
        .await
        .expect_err("invalid sample_rate should fail");

        match err {
            YamlWorkflowRunError::InvalidInput { message } => {
                // The message names the option path for easier debugging.
                assert!(message.contains("telemetry.sample_rate"));
            }
            _ => panic!("expected invalid input error for sample_rate"),
        }
    }
6831
    #[tokio::test]
    async fn workflow_run_options_reject_nan_sample_rate() {
        // NaN is not a valid sample rate and must be rejected as invalid
        // input rather than silently treated as 0 or 1.
        let yaml = r#"
id: sample-rate-invalid
entry_node: classify
nodes:
  - id: classify
    node_type:
      llm_call:
        model: gpt-4.1
    config:
      prompt: |
        Classify this email into exactly one category:
        {{ input.email_text }}
"#;

        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
        let options = YamlWorkflowRunOptions {
            telemetry: YamlWorkflowTelemetryConfig {
                sample_rate: f32::NAN,
                ..YamlWorkflowTelemetryConfig::default()
            },
            ..YamlWorkflowRunOptions::default()
        };

        let err = run_workflow_yaml_with_custom_worker_and_events_and_options(
            &workflow,
            &json!({"email_text":"hello"}),
            &MockExecutor,
            None,
            None,
            &options,
        )
        .await
        .expect_err("nan sample_rate should fail");

        assert!(matches!(err, YamlWorkflowRunError::InvalidInput { .. }));
    }
6870
6871 #[test]
6872 fn subworkflow_options_inherit_parent_telemetry_configuration() {
6873 let mut parent_options = YamlWorkflowRunOptions::default();
6874 parent_options.telemetry.sample_rate = 0.0;
6875 parent_options.telemetry.payload_mode = YamlWorkflowPayloadMode::RedactedPayload;
6876 parent_options.telemetry.tool_trace_mode = YamlToolTraceMode::Redacted;
6877 parent_options.trace.tenant.conversation_id = Some("conv-123".to_string());
6878
6879 let parent_context = TraceContext {
6880 trace_id: Some("trace-parent".to_string()),
6881 span_id: Some("span-parent".to_string()),
6882 parent_span_id: None,
6883 traceparent: Some("00-trace-parent-span-parent-01".to_string()),
6884 tracestate: None,
6885 baggage: BTreeMap::new(),
6886 };
6887
6888 let subworkflow_options =
6889 build_subworkflow_options(&parent_options, Some(&parent_context), None);
6890
6891 assert_eq!(subworkflow_options.telemetry.sample_rate, 0.0);
6892 assert_eq!(
6893 subworkflow_options.telemetry.payload_mode,
6894 YamlWorkflowPayloadMode::RedactedPayload
6895 );
6896 assert_eq!(
6897 subworkflow_options.telemetry.tool_trace_mode,
6898 YamlToolTraceMode::Redacted
6899 );
6900 assert_eq!(
6901 subworkflow_options.trace.tenant.conversation_id.as_deref(),
6902 Some("conv-123")
6903 );
6904 assert_eq!(
6905 subworkflow_options
6906 .trace
6907 .context
6908 .as_ref()
6909 .and_then(|ctx| ctx.trace_id.as_deref()),
6910 Some("trace-parent")
6911 );
6912 }
6913
    #[tokio::test]
    async fn workflow_run_options_use_explicit_trace_id_and_payload_mode() {
        // An explicit trace_id wins over the traceparent header, the payload
        // mode is echoed into telemetry metadata, and tenant context is
        // surfaced under metadata.trace.tenant.
        let yaml = r#"
id: trace-options-test
entry_node: classify
nodes:
  - id: classify
    node_type:
      llm_call:
        model: gpt-4.1
    config:
      prompt: |
        Classify this email into exactly one category:
        {{ input.email_text }}
"#;

        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
        let options = YamlWorkflowRunOptions {
            telemetry: YamlWorkflowTelemetryConfig {
                payload_mode: YamlWorkflowPayloadMode::RedactedPayload,
                ..YamlWorkflowTelemetryConfig::default()
            },
            trace: YamlWorkflowTraceOptions {
                context: Some(YamlWorkflowTraceContextInput {
                    trace_id: Some("trace-fixed-123".to_string()),
                    traceparent: Some("00-trace-fixed-123-span-1-01".to_string()),
                    ..YamlWorkflowTraceContextInput::default()
                }),
                tenant: YamlWorkflowTraceTenantContext {
                    conversation_id: Some("6e6d3125-b9f1-4af2-af1f-7cca024a2c42".to_string()),
                    ..YamlWorkflowTraceTenantContext::default()
                },
            },
            model: None,
        };

        let output = run_workflow_yaml_with_custom_worker_and_events_and_options(
            &workflow,
            &json!({"email_text":"hello"}),
            &MockExecutor,
            None,
            None,
            &options,
        )
        .await
        .expect("workflow should execute");

        assert_eq!(output.trace_id.as_deref(), Some("trace-fixed-123"));
        // payload_mode is serialized in snake_case in telemetry metadata.
        assert_eq!(
            output
                .metadata
                .as_ref()
                .and_then(|value| value.get("telemetry"))
                .and_then(|telemetry| telemetry.get("payload_mode"))
                .and_then(Value::as_str),
            Some("redacted_payload")
        );
        assert_eq!(
            output
                .metadata
                .as_ref()
                .and_then(|value| value.get("trace"))
                .and_then(|trace| trace.get("tenant"))
                .and_then(|tenant| tenant.get("conversation_id"))
                .and_then(Value::as_str),
            Some("6e6d3125-b9f1-4af2-af1f-7cca024a2c42")
        );
    }
6982
6983 #[test]
6984 fn streamed_payload_parser_extracts_last_json_object() {
6985 let raw = r#"{"state":"missing_scenario","reason":"ok"}
6986
6987extra reasoning text
6988
6989{"state":"ready","reason":"final"}"#;
6990
6991 let resolved = parse_streamed_structured_payload(raw, false)
6992 .expect("parser should extract final JSON object");
6993 assert_eq!(resolved.payload["state"], "ready");
6994 assert!(resolved.heal_confidence.is_none());
6995 }
6996
6997 #[test]
6998 fn streamed_payload_parser_handles_unbalanced_reasoning_before_json() {
6999 let raw = "reasoning text with unmatched { braces and thoughts\n{\"state\":\"ready\",\"reason\":\"final\"}";
7000
7001 let resolved = parse_streamed_structured_payload(raw, false)
7002 .expect("parser should recover final structured JSON object");
7003 assert_eq!(resolved.payload["state"], "ready");
7004 }
7005
7006 #[test]
7007 fn streamed_payload_parser_handles_markdown_with_heal() {
7008 let raw = r#"Some preface
7009```json
7010{
7011 "state": "missing_scenario",
7012 "reason": "Need more details"
7013}
7014```
7015Some trailing explanation"#;
7016
7017 let resolved = parse_streamed_structured_payload(raw, true)
7018 .expect("heal path should parse JSON block");
7019 assert_eq!(resolved.payload["state"], "missing_scenario");
7020 assert!(resolved.heal_confidence.is_some());
7021 }
7022
7023 #[test]
7024 fn streamed_payload_parser_errors_when_no_json_candidate_exists() {
7025 let raw = "No JSON in this streamed output";
7026 let error = parse_streamed_structured_payload(raw, false)
7027 .expect_err("strict stream parse should fail without JSON candidate");
7028 assert!(error.contains("no JSON object candidate found"));
7029 }
7030
7031 #[test]
7032 fn include_raw_stream_debug_events_defaults_to_false() {
7033 let _guard = stream_debug_env_lock()
7034 .lock()
7035 .expect("stream debug env lock should not be poisoned");
7036 std::env::remove_var("SIMPLE_AGENTS_WORKFLOW_STREAM_INCLUDE_RAW");
7037 assert!(!include_raw_stream_debug_events());
7038 }
7039
7040 #[test]
7041 fn include_raw_stream_debug_events_accepts_truthy_values() {
7042 let _guard = stream_debug_env_lock()
7043 .lock()
7044 .expect("stream debug env lock should not be poisoned");
7045 std::env::set_var("SIMPLE_AGENTS_WORKFLOW_STREAM_INCLUDE_RAW", "true");
7046 assert!(include_raw_stream_debug_events());
7047 std::env::remove_var("SIMPLE_AGENTS_WORKFLOW_STREAM_INCLUDE_RAW");
7048 }
7049
7050 #[test]
7051 fn structured_json_delta_filter_strips_reasoning_prefix_and_suffix() {
7052 let mut filter = StructuredJsonDeltaFilter::default();
7053 let chunks = vec![
7054 "I will think first... ",
7055 "{\"state\":\"missing_scenario\",",
7056 "\"reason\":\"Need more details\"}",
7057 " additional commentary",
7058 ];
7059
7060 let filtered = chunks
7061 .into_iter()
7062 .filter_map(|chunk| filter.split(chunk).0)
7063 .collect::<String>();
7064
7065 assert_eq!(
7066 filtered,
7067 "{\"state\":\"missing_scenario\",\"reason\":\"Need more details\"}"
7068 );
7069 }
7070
7071 #[test]
7072 fn structured_json_delta_filter_handles_braces_inside_strings() {
7073 let mut filter = StructuredJsonDeltaFilter::default();
7074 let chunks = vec![
7075 "preface ",
7076 "{\"reason\":\"brace } in text\",\"state\":\"ok\"}",
7077 " trailing",
7078 ];
7079
7080 let filtered = chunks
7081 .into_iter()
7082 .filter_map(|chunk| filter.split(chunk).0)
7083 .collect::<String>();
7084
7085 assert_eq!(
7086 filtered,
7087 "{\"reason\":\"brace } in text\",\"state\":\"ok\"}"
7088 );
7089 }
7090
7091 #[test]
7092 fn render_json_object_as_text_converts_top_level_fields() {
7093 let rendered =
7094 render_json_object_as_text(r#"{"question":"q","confidence":0.8,"nested":{"a":1}}"#);
7095 let lines: std::collections::HashSet<&str> = rendered.lines().collect();
7096
7097 assert_eq!(lines.len(), 3);
7098 assert!(lines.contains("question: q"));
7099 assert!(lines.contains("confidence: 0.8"));
7100 assert!(lines.contains("nested: {\"a\":1}"));
7101 }
7102
7103 #[test]
7104 fn stream_json_as_text_formatter_emits_once_when_complete() {
7105 let mut formatter = StreamJsonAsTextFormatter::default();
7106 formatter.push("{\"question\":\"hello\"}");
7107
7108 let first = formatter.emit_if_ready(true);
7109 let second = formatter.emit_if_ready(true);
7110
7111 assert_eq!(first, Some("question: hello".to_string()));
7112 assert_eq!(second, None);
7113 }
7114
7115 #[test]
7116 fn rewrite_yaml_condition_preserves_output_prefix_in_field_names() {
7117 let expr = "$.nodes.classify.output.output_total == 1";
7118 let rewritten = rewrite_yaml_condition_to_ir(expr);
7119 assert_eq!(rewritten, "$.node_outputs.classify.output_total == 1");
7120 }
7121
    #[tokio::test]
    async fn validates_workflow_input_before_ir_runtime_path() {
        // A non-object workflow input must be rejected with InvalidInput
        // before any node executes.
        let yaml = r#"
id: chat-workflow
entry_node: classify
nodes:
  - id: classify
    node_type:
      llm_call:
        model: gpt-4.1
    config:
      prompt: |
        classify
"#;

        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
        // Input is a bare JSON string, not the required object.
        let err = run_workflow_yaml(&workflow, &json!("not-an-object"), &MockExecutor)
            .await
            .expect_err("non-object input should fail before execution");

        assert!(matches!(err, YamlWorkflowRunError::InvalidInput { .. }));
    }
7144
7145 #[test]
7146 fn interpolate_template_supports_dollar_prefixed_paths() {
7147 let context = json!({
7148 "input": {
7149 "email_text": "hello"
7150 }
7151 });
7152
7153 let rendered = interpolate_template("value={{ $.input.email_text }}", &context);
7154 assert_eq!(rendered, "value=hello");
7155 }
7156}