1use std::collections::{BTreeMap, HashMap, HashSet};
2use std::path::Path;
3use std::time::Instant;
4
5use async_trait::async_trait;
6use futures::StreamExt;
7use serde::{Deserialize, Serialize};
8use serde_json::{json, Value};
9use simple_agent_type::message::{Message, Role};
10use simple_agent_type::request::CompletionRequest;
11use simple_agent_type::response::FinishReason;
12use simple_agent_type::tool::{
13 ToolCall, ToolChoice, ToolChoiceFunction, ToolChoiceMode, ToolChoiceTool, ToolDefinition,
14 ToolFunction, ToolType,
15};
16use simple_agents_core::{
17 CompletionMode, CompletionOptions, CompletionOutcome, SimpleAgentsClient,
18};
19use simple_agents_healing::JsonishParser;
20use thiserror::Error;
21
22use crate::ir::{Node, NodeKind, RouterRoute, WorkflowDefinition, WORKFLOW_IR_V0};
23use crate::observability::tracing::{
24 flush_workflow_tracer, workflow_tracer, SpanKind, TraceContext, WorkflowSpan,
25};
26use crate::runtime::{
27 LlmExecutionError, LlmExecutionInput, LlmExecutionOutput, LlmExecutor, ToolExecutionError,
28 ToolExecutionInput, ToolExecutor, WorkflowRuntime, WorkflowRuntimeError,
29 WorkflowRuntimeOptions,
30};
31use crate::visualize::workflow_to_mermaid;
32
/// Synthetic node id used when lowering a YAML workflow to the IR.
/// NOTE(review): consumed elsewhere in this module; name suggests the injected start node.
const YAML_START_NODE_ID: &str = "__yaml_start";
/// Synthetic node id for the implicit LLM tool-call step — TODO confirm usage site.
const YAML_LLM_TOOL_ID: &str = "__yaml_llm_call";

/// Process-wide monotonic counter mixed into generated trace ids (see
/// `generate_trace_id`) so ids minted within the same nanosecond still differ.
static TRACE_ID_COUNTER: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(1);
37
/// Timing (and optional token usage) recorded for one executed workflow step.
#[derive(Debug, Clone, PartialEq, Serialize)]
pub struct YamlStepTiming {
    /// Id of the executed node.
    pub node_id: String,
    /// Node kind label; `workflow_nerdstats` compares this against "llm_call".
    pub node_kind: String,
    /// Wall-clock duration of the step, in milliseconds.
    pub elapsed_ms: u128,
    // Token fields are omitted from serialized output when absent.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub prompt_tokens: Option<u32>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub completion_tokens: Option<u32>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub total_tokens: Option<u32>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub thinking_tokens: Option<u32>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub tokens_per_second: Option<f64>,
}
54
/// Per-LLM-node metrics; unlike `YamlStepTiming`, token counts here are
/// required (this record presumably exists only when usage was reported).
#[derive(Debug, Clone, PartialEq, Serialize)]
pub struct YamlLlmNodeMetrics {
    /// Wall-clock duration of the node, in milliseconds.
    pub elapsed_ms: u128,
    pub prompt_tokens: u32,
    pub completion_tokens: u32,
    pub total_tokens: u32,
    /// Reasoning/"thinking" tokens, when the provider reports them.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub thinking_tokens: Option<u32>,
    /// Completion tokens per second, rounded to two decimals.
    pub tokens_per_second: f64,
}
65
/// Aggregate result of running a YAML workflow: execution trace, per-node
/// outputs, and run-level timing/token statistics.
#[derive(Debug, Clone, PartialEq, Serialize)]
pub struct YamlWorkflowRunOutput {
    pub workflow_id: String,
    pub entry_node: String,
    /// Input text for the run. NOTE(review): name suggests an email-processing
    /// use case — confirm against callers.
    pub email_text: String,
    /// Ordered list of visited node ids — TODO confirm ordering semantics.
    pub trace: Vec<String>,
    /// Output payload per node id (BTreeMap keeps serialization order stable).
    pub outputs: BTreeMap<String, Value>,
    pub terminal_node: String,
    pub terminal_output: Option<Value>,
    pub step_timings: Vec<YamlStepTiming>,
    pub llm_node_metrics: BTreeMap<String, YamlLlmNodeMetrics>,
    /// Model actually used per LLM node id.
    pub llm_node_models: BTreeMap<String, String>,
    pub total_elapsed_ms: u128,
    /// Time to first token across the run, when streaming.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub ttft_ms: Option<u128>,
    pub total_input_tokens: u64,
    pub total_output_tokens: u64,
    pub total_tokens: u64,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub total_thinking_tokens: Option<u64>,
    pub tokens_per_second: f64,
    /// Resolved telemetry trace id (absent when telemetry is disabled).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub trace_id: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub metadata: Option<Value>,
}
92
/// How much of node payloads is recorded on telemetry spans
/// (see `payload_for_span`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
#[serde(rename_all = "snake_case")]
pub enum YamlWorkflowPayloadMode {
    /// Record payloads verbatim (the default).
    #[default]
    FullPayload,
    /// Record only a redaction marker plus the payload's JSON type.
    RedactedPayload,
}

/// How tool-call payloads are captured in traces (see `payload_for_tool_trace`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
#[serde(rename_all = "snake_case")]
pub enum YamlToolTraceMode {
    /// Capture tool payloads verbatim (the default).
    #[default]
    Full,
    /// Capture only a redaction marker plus the payload's JSON type.
    Redacted,
    /// Capture nothing; payloads are recorded as JSON null.
    Off,
}
109
/// Caller-supplied trace context used to join an existing distributed trace.
/// All fields are optional; precedence among them is decided by
/// `resolve_run_trace_id_with_source`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
pub struct YamlWorkflowTraceContextInput {
    #[serde(default)]
    pub trace_id: Option<String>,
    #[serde(default)]
    pub span_id: Option<String>,
    #[serde(default)]
    pub parent_span_id: Option<String>,
    /// Full W3C `traceparent` header; used as a trace-id fallback when no
    /// explicit `trace_id` is supplied.
    #[serde(default)]
    pub traceparent: Option<String>,
    #[serde(default)]
    pub tracestate: Option<String>,
    /// Key/value baggage propagated to custom workers.
    #[serde(default)]
    pub baggage: BTreeMap<String, String>,
}

/// Tenant identifiers attached to spans (`tenant.*` attributes) and forwarded
/// to custom workers for multi-tenant attribution.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
pub struct YamlWorkflowTraceTenantContext {
    #[serde(default)]
    pub workspace_id: Option<String>,
    #[serde(default)]
    pub user_id: Option<String>,
    #[serde(default)]
    pub conversation_id: Option<String>,
    #[serde(default)]
    pub request_id: Option<String>,
    #[serde(default)]
    pub run_id: Option<String>,
}
139
/// Telemetry settings for a workflow run. Serde defaults are kept in sync
/// with `Default for YamlWorkflowTelemetryConfig`.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct YamlWorkflowTelemetryConfig {
    /// Master switch; when false no trace id is resolved at all.
    #[serde(default = "default_true")]
    pub enabled: bool,
    /// Whether to produce the "nerd stats" summary.
    #[serde(default = "default_true")]
    pub nerdstats: bool,
    /// Head-sampling rate in [0.0, 1.0]; checked by `validate_sample_rate`.
    #[serde(default = "default_sample_rate")]
    pub sample_rate: f32,
    /// Span payload redaction mode.
    #[serde(default)]
    pub payload_mode: YamlWorkflowPayloadMode,
    /// Retention hint in days, carried into run metadata.
    #[serde(default = "default_retention_days")]
    pub retention_days: u32,
    #[serde(default = "default_true")]
    pub multi_tenant: bool,
    /// Tool-call payload redaction mode.
    #[serde(default)]
    pub tool_trace_mode: YamlToolTraceMode,
}
157
158impl Default for YamlWorkflowTelemetryConfig {
159 fn default() -> Self {
160 Self {
161 enabled: true,
162 nerdstats: true,
163 sample_rate: 1.0,
164 payload_mode: YamlWorkflowPayloadMode::FullPayload,
165 retention_days: 30,
166 multi_tenant: true,
167 tool_trace_mode: YamlToolTraceMode::Full,
168 }
169 }
170}
171
/// Trace-related inputs for a run: optional joining context plus tenant ids.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
pub struct YamlWorkflowTraceOptions {
    /// Existing trace context to join, when provided by the caller.
    #[serde(default)]
    pub context: Option<YamlWorkflowTraceContextInput>,
    #[serde(default)]
    pub tenant: YamlWorkflowTraceTenantContext,
}

/// Top-level options controlling a YAML workflow run.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Default)]
pub struct YamlWorkflowRunOptions {
    #[serde(default)]
    pub telemetry: YamlWorkflowTelemetryConfig,
    #[serde(default)]
    pub trace: YamlWorkflowTraceOptions,
    /// Optional run-level model override; a non-blank value replaces each
    /// node's configured model (see `resolve_requested_model`).
    #[serde(default)]
    pub model: Option<String>,
}
189
/// Token usage reported by the provider for a single LLM call.
#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
pub struct YamlLlmTokenUsage {
    pub prompt_tokens: u32,
    pub completion_tokens: u32,
    pub total_tokens: u32,
    /// Reasoning/"thinking" tokens, when the provider reports them.
    pub thinking_tokens: Option<u32>,
}

/// Result of executing one LLM node: the parsed payload plus optional
/// usage/latency metrics and any tool-call traces.
#[derive(Debug, Clone, PartialEq, Serialize)]
pub struct YamlLlmExecutionResult {
    pub payload: Value,
    pub usage: Option<YamlLlmTokenUsage>,
    /// Time to first token in milliseconds, when streaming.
    pub ttft_ms: Option<u128>,
    pub tool_calls: Vec<YamlToolCallTrace>,
}

/// Trace record for a single tool invocation made during an LLM node.
#[derive(Debug, Clone, PartialEq, Serialize)]
pub struct YamlToolCallTrace {
    pub id: String,
    pub name: String,
    /// Tool arguments; may be redacted per `YamlToolTraceMode`.
    pub arguments: Value,
    pub output: Option<Value>,
    /// Status label — NOTE(review): value set is defined at the call site; confirm.
    pub status: String,
    pub elapsed_ms: u128,
    pub error: Option<String>,
}
216
/// Running token totals accumulated across all LLM calls in a run.
#[derive(Debug, Clone, Default)]
struct YamlTokenTotals {
    input_tokens: u64,
    output_tokens: u64,
    total_tokens: u64,
    // Stays `None` until at least one usage report includes thinking tokens.
    thinking_tokens: Option<u64>,
}
224
225impl YamlTokenTotals {
226 fn add_usage(&mut self, usage: &YamlLlmTokenUsage) {
227 self.input_tokens += u64::from(usage.prompt_tokens);
228 self.output_tokens += u64::from(usage.completion_tokens);
229 self.total_tokens += u64::from(usage.total_tokens);
230
231 if let Some(thinking_tokens) = usage.thinking_tokens {
232 let next = self.thinking_tokens.unwrap_or(0) + u64::from(thinking_tokens);
233 self.thinking_tokens = Some(next);
234 }
235 }
236
237 fn tokens_per_second(&self, elapsed_ms: u128) -> f64 {
238 if elapsed_ms == 0 {
239 return 0.0;
240 }
241 round_two_decimals((self.output_tokens as f64) * 1000.0 / (elapsed_ms as f64))
242 }
243}
244
/// Rounds `value` to two decimal places using `f64::round` on the scaled value.
fn round_two_decimals(value: f64) -> f64 {
    let scaled = value * 100.0;
    scaled.round() / 100.0
}
248
/// Converts a completion-token count and elapsed milliseconds into a
/// tokens-per-second rate, rounded to two decimals; zero elapsed time → 0.0.
fn completion_tokens_per_second(completion_tokens: u32, elapsed_ms: u128) -> f64 {
    if elapsed_ms == 0 {
        return 0.0;
    }
    let rate = f64::from(completion_tokens) * 1000.0 / (elapsed_ms as f64);
    (rate * 100.0).round() / 100.0
}
255
/// Picks the model to request: a non-blank run-level override wins, otherwise
/// the node's configured model is used. The override is trimmed before use.
fn resolve_requested_model(run_model_override: Option<&str>, node_model: &str) -> String {
    match run_model_override.map(str::trim) {
        Some(overridden) if !overridden.is_empty() => overridden.to_string(),
        _ => node_model.to_string(),
    }
}
268
/// Serde default helper referenced by `#[serde(default = "default_true")]`.
fn default_true() -> bool {
    true
}

/// Default telemetry sample rate: record every trace.
fn default_sample_rate() -> f32 {
    1.0
}

/// Default telemetry retention, in days.
fn default_retention_days() -> u32 {
    30
}
280
281fn validate_sample_rate(sample_rate: f32) -> Result<(), YamlWorkflowRunError> {
282 if sample_rate.is_finite() && (0.0..=1.0).contains(&sample_rate) {
283 return Ok(());
284 }
285
286 Err(YamlWorkflowRunError::InvalidInput {
287 message: format!(
288 "telemetry.sample_rate must be between 0.0 and 1.0 inclusive; received {sample_rate}"
289 ),
290 })
291}
292
/// Deterministic head-based sampling: hashes the trace id with 64-bit FNV-1a
/// and admits the trace when the hash, normalized to [0, 1), falls below
/// `sample_rate`. The same id always yields the same decision for a rate.
fn should_sample_trace(trace_id: &str, sample_rate: f32) -> bool {
    // Fast paths: the extremes never need the hash.
    if sample_rate >= 1.0 {
        return true;
    }
    if sample_rate <= 0.0 {
        return false;
    }

    // FNV-1a over the id bytes (standard offset basis and prime).
    let hash = trace_id
        .as_bytes()
        .iter()
        .fold(0xcbf29ce484222325u64, |acc, &byte| {
            (acc ^ u64::from(byte)).wrapping_mul(0x100000001b3)
        });

    ((hash as f64) / (u64::MAX as f64)) < f64::from(sample_rate)
}
310
/// Extracts the 32-hex-digit trace id from a W3C `traceparent` header value.
///
/// Expected shape: `{version}-{trace_id}-{span_id}-{flags}` with 2, 32, 16,
/// and 2 hex digits respectively. Returns `None` for malformed headers, the
/// forbidden version `ff`, or all-zero trace/span ids — the previous version
/// never validated the span-id or flags segments, accepting headers that are
/// invalid per the Trace Context spec. The returned id is lowercased.
fn trace_id_from_traceparent(traceparent: &str) -> Option<String> {
    /// True when `value` is exactly `len` ASCII hex digits.
    fn is_hex(value: &str, len: usize) -> bool {
        value.len() == len && value.chars().all(|ch| ch.is_ascii_hexdigit())
    }
    /// True when every character is '0' (invalid for trace/span ids).
    fn is_all_zero(value: &str) -> bool {
        value.chars().all(|ch| ch == '0')
    }

    let mut parts = traceparent.split('-');
    let version = parts.next()?;
    let trace_id = parts.next()?;
    let span_id = parts.next()?;
    let flags = parts.next()?;
    if parts.next().is_some() {
        return None;
    }

    if !is_hex(version, 2) || !is_hex(trace_id, 32) || !is_hex(span_id, 16) || !is_hex(flags, 2) {
        return None;
    }
    // Version 0xff is explicitly forbidden by the Trace Context spec.
    if version.eq_ignore_ascii_case("ff") {
        return None;
    }
    if is_all_zero(trace_id) || is_all_zero(span_id) {
        return None;
    }

    Some(trace_id.to_ascii_lowercase())
}
337
/// Where the run's trace id came from, listed in resolution priority order
/// (see `resolve_run_trace_id_with_source`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum TraceIdSource {
    /// Telemetry disabled — no trace id at all.
    Disabled,
    /// Explicit `trace.context.trace_id` from the run options.
    ExplicitTraceId,
    /// Inherited from the parent span's trace context.
    ParentTraceId,
    /// Parsed from the run options' `traceparent` header.
    Traceparent,
    /// Parsed from the parent context's `traceparent` header.
    ParentTraceparent,
    /// Freshly generated for this run.
    Generated,
}

/// Outcome of telemetry resolution: the trace id (if any), the sampling
/// decision, and how the id was obtained.
#[derive(Debug, Clone)]
struct ResolvedTelemetryContext {
    trace_id: Option<String>,
    sampled: bool,
    trace_id_source: TraceIdSource,
}
354
355fn resolve_run_trace_id_with_source(
356 options: &YamlWorkflowRunOptions,
357 parent_trace_context: Option<&TraceContext>,
358) -> (Option<String>, TraceIdSource) {
359 if !options.telemetry.enabled {
360 return (None, TraceIdSource::Disabled);
361 }
362
363 if let Some(trace_id) = options
364 .trace
365 .context
366 .as_ref()
367 .and_then(|context| context.trace_id.clone())
368 {
369 return (Some(trace_id), TraceIdSource::ExplicitTraceId);
370 }
371
372 if let Some(trace_id) = parent_trace_context.and_then(|context| context.trace_id.clone()) {
373 return (Some(trace_id), TraceIdSource::ParentTraceId);
374 }
375
376 if let Some(trace_id) = options
377 .trace
378 .context
379 .as_ref()
380 .and_then(|context| context.traceparent.as_deref())
381 .and_then(trace_id_from_traceparent)
382 {
383 return (Some(trace_id), TraceIdSource::Traceparent);
384 }
385
386 if let Some(trace_id) = parent_trace_context
387 .and_then(|context| context.traceparent.as_deref())
388 .and_then(trace_id_from_traceparent)
389 {
390 return (Some(trace_id), TraceIdSource::ParentTraceparent);
391 }
392
393 (Some(generate_trace_id()), TraceIdSource::Generated)
394}
395
396fn resolve_telemetry_context(
397 options: &YamlWorkflowRunOptions,
398 parent_trace_context: Option<&TraceContext>,
399) -> ResolvedTelemetryContext {
400 let (trace_id, trace_id_source) =
401 resolve_run_trace_id_with_source(options, parent_trace_context);
402 let sampled = trace_id
403 .as_deref()
404 .map(|value| should_sample_trace(value, options.telemetry.sample_rate))
405 .unwrap_or(false);
406
407 ResolvedTelemetryContext {
408 trace_id,
409 sampled,
410 trace_id_source,
411 }
412}
413
414fn generate_trace_id() -> String {
415 use std::time::{SystemTime, UNIX_EPOCH};
416
417 let now_nanos = SystemTime::now()
418 .duration_since(UNIX_EPOCH)
419 .map(|duration| duration.as_nanos())
420 .unwrap_or(0);
421 let sequence = u128::from(TRACE_ID_COUNTER.fetch_add(1, std::sync::atomic::Ordering::Relaxed));
422 format!("{:032x}", now_nanos ^ sequence)
423}
424
425fn workflow_metadata_with_trace(
426 options: &YamlWorkflowRunOptions,
427 trace_id: &str,
428 sampled: bool,
429 trace_id_source: TraceIdSource,
430) -> Value {
431 json!({
432 "telemetry": {
433 "trace_id": trace_id,
434 "trace_id_source": match trace_id_source {
435 TraceIdSource::Disabled => "disabled",
436 TraceIdSource::ExplicitTraceId => "explicit_trace_id",
437 TraceIdSource::ParentTraceId => "parent_trace_id",
438 TraceIdSource::Traceparent => "traceparent",
439 TraceIdSource::ParentTraceparent => "parent_traceparent",
440 TraceIdSource::Generated => "generated",
441 },
442 "enabled": options.telemetry.enabled,
443 "sampled": sampled,
444 "nerdstats": options.telemetry.nerdstats,
445 "sample_rate": options.telemetry.sample_rate,
446 "payload_mode": match options.telemetry.payload_mode {
447 YamlWorkflowPayloadMode::FullPayload => "full_payload",
448 YamlWorkflowPayloadMode::RedactedPayload => "redacted_payload",
449 },
450 "retention_days": options.telemetry.retention_days,
451 "multi_tenant": options.telemetry.multi_tenant,
452 "tool_trace_mode": match options.telemetry.tool_trace_mode {
453 YamlToolTraceMode::Full => "full",
454 YamlToolTraceMode::Redacted => "redacted",
455 YamlToolTraceMode::Off => "off",
456 },
457 },
458 "trace": {
459 "tenant": {
460 "workspace_id": options.trace.tenant.workspace_id,
461 "user_id": options.trace.tenant.user_id,
462 "conversation_id": options.trace.tenant.conversation_id,
463 "request_id": options.trace.tenant.request_id,
464 "run_id": options.trace.tenant.run_id,
465 }
466 },
467 })
468}
469
470fn apply_trace_tenant_attributes(span: &mut dyn WorkflowSpan, options: &YamlWorkflowRunOptions) {
471 if let Some(workspace_id) = options.trace.tenant.workspace_id.as_deref() {
472 span.set_attribute("tenant.workspace_id", workspace_id);
473 }
474 if let Some(user_id) = options.trace.tenant.user_id.as_deref() {
475 span.set_attribute("tenant.user_id", user_id);
476 }
477 if let Some(conversation_id) = options.trace.tenant.conversation_id.as_deref() {
478 span.set_attribute("tenant.conversation_id", conversation_id);
479 }
480 if let Some(request_id) = options.trace.tenant.request_id.as_deref() {
481 span.set_attribute("tenant.request_id", request_id);
482 }
483 if let Some(run_id) = options.trace.tenant.run_id.as_deref() {
484 span.set_attribute("tenant.run_id", run_id);
485 }
486}
487
488fn workflow_nerdstats(output: &YamlWorkflowRunOutput) -> Value {
489 let llm_nodes_without_usage: Vec<String> = output
490 .step_timings
491 .iter()
492 .filter(|step| step.node_kind == "llm_call" && step.total_tokens.is_none())
493 .map(|step| step.node_id.clone())
494 .collect();
495 let token_metrics_available = llm_nodes_without_usage.is_empty();
496 let token_metrics_source = if token_metrics_available {
497 "provider_usage"
498 } else {
499 "provider_stream_usage_unavailable"
500 };
501 let total_input_tokens = if token_metrics_available {
502 json!(output.total_input_tokens)
503 } else {
504 Value::Null
505 };
506 let total_output_tokens = if token_metrics_available {
507 json!(output.total_output_tokens)
508 } else {
509 Value::Null
510 };
511 let total_tokens = if token_metrics_available {
512 json!(output.total_tokens)
513 } else {
514 Value::Null
515 };
516 let total_thinking_tokens = if token_metrics_available {
517 json!(output.total_thinking_tokens)
518 } else {
519 Value::Null
520 };
521 let tokens_per_second = if token_metrics_available {
522 json!(output.tokens_per_second)
523 } else {
524 Value::Null
525 };
526
527 json!({
528 "workflow_id": output.workflow_id,
529 "terminal_node": output.terminal_node,
530 "total_elapsed_ms": output.total_elapsed_ms,
531 "ttft_ms": output.ttft_ms,
532 "step_details": output.step_timings,
533 "llm_node_models": output.llm_node_models,
534 "total_input_tokens": total_input_tokens,
535 "total_output_tokens": total_output_tokens,
536 "total_tokens": total_tokens,
537 "total_thinking_tokens": total_thinking_tokens,
538 "tokens_per_second": tokens_per_second,
539 "trace_id": output.trace_id,
540 "token_metrics_available": token_metrics_available,
541 "token_metrics_source": token_metrics_source,
542 "llm_nodes_without_usage": llm_nodes_without_usage,
543 })
544}
545
546fn payload_for_span(mode: YamlWorkflowPayloadMode, payload: &Value) -> String {
547 match mode {
548 YamlWorkflowPayloadMode::FullPayload => payload.to_string(),
549 YamlWorkflowPayloadMode::RedactedPayload => json!({
550 "redacted": true,
551 "value_type": match payload {
552 Value::Null => "null",
553 Value::Bool(_) => "bool",
554 Value::Number(_) => "number",
555 Value::String(_) => "string",
556 Value::Array(_) => "array",
557 Value::Object(_) => "object",
558 }
559 })
560 .to_string(),
561 }
562}
563
564fn payload_for_tool_trace(mode: YamlToolTraceMode, payload: &Value) -> Value {
565 match mode {
566 YamlToolTraceMode::Full => payload.clone(),
567 YamlToolTraceMode::Redacted => json!({
568 "redacted": true,
569 "value_type": json_type_name(payload),
570 }),
571 YamlToolTraceMode::Off => Value::Null,
572 }
573}
574
575fn validate_json_schema(schema: &Value) -> Result<(), String> {
576 jsonschema::JSONSchema::compile(schema)
577 .map(|_| ())
578 .map_err(|error| format!("invalid JSON schema: {error}"))
579}
580
581fn validate_schema_instance(schema: &Value, instance: &Value) -> Result<(), String> {
582 let validator = jsonschema::JSONSchema::compile(schema)
583 .map_err(|error| format!("invalid JSON schema: {error}"))?;
584 if let Err(errors) = validator.validate(instance) {
585 let message = errors
586 .into_iter()
587 .next()
588 .map(|error| error.to_string())
589 .unwrap_or_else(|| "unknown schema validation error".to_string());
590 return Err(format!("schema validation failed: {message}"));
591 }
592 Ok(())
593}
594
595fn schema_type(schema: &Value) -> Option<&str> {
596 schema.get("type").and_then(Value::as_str)
597}
598
599fn schema_expects_object(schema: &Value) -> bool {
600 schema_type(schema) == Some("object")
601}
602
603fn trace_context_from_options(options: &YamlWorkflowRunOptions) -> Option<TraceContext> {
604 options.trace.context.as_ref().map(|input| TraceContext {
605 trace_id: input.trace_id.clone(),
606 span_id: input.span_id.clone(),
607 parent_span_id: input.parent_span_id.clone(),
608 traceparent: input.traceparent.clone(),
609 tracestate: input.tracestate.clone(),
610 baggage: input.baggage.clone(),
611 })
612}
613
614fn merged_trace_context_for_worker(
615 span_context: Option<&TraceContext>,
616 resolved_trace_id: Option<&str>,
617 options: &YamlWorkflowRunOptions,
618) -> TraceContext {
619 let input_context = options.trace.context.as_ref();
620 let baggage = if let Some(context) = span_context {
621 if !context.baggage.is_empty() {
622 context.baggage.clone()
623 } else {
624 input_context
625 .map(|value| value.baggage.clone())
626 .unwrap_or_default()
627 }
628 } else {
629 input_context
630 .map(|value| value.baggage.clone())
631 .unwrap_or_default()
632 };
633
634 TraceContext {
635 trace_id: span_context
636 .and_then(|context| context.trace_id.clone())
637 .or_else(|| resolved_trace_id.map(|value| value.to_string()))
638 .or_else(|| input_context.and_then(|context| context.trace_id.clone())),
639 span_id: span_context
640 .and_then(|context| context.span_id.clone())
641 .or_else(|| input_context.and_then(|context| context.span_id.clone())),
642 parent_span_id: span_context
643 .and_then(|context| context.parent_span_id.clone())
644 .or_else(|| input_context.and_then(|context| context.parent_span_id.clone())),
645 traceparent: span_context
646 .and_then(|context| context.traceparent.clone())
647 .or_else(|| input_context.and_then(|context| context.traceparent.clone())),
648 tracestate: span_context
649 .and_then(|context| context.tracestate.clone())
650 .or_else(|| input_context.and_then(|context| context.tracestate.clone())),
651 baggage,
652 }
653}
654
655fn custom_worker_context_with_trace(
656 context: &Value,
657 trace_context: &TraceContext,
658 tenant_context: &YamlWorkflowTraceTenantContext,
659) -> Value {
660 let mut context_with_trace = context.clone();
661 let Some(root) = context_with_trace.as_object_mut() else {
662 return context_with_trace;
663 };
664
665 root.insert(
666 "trace".to_string(),
667 json!({
668 "context": {
669 "trace_id": trace_context.trace_id,
670 "span_id": trace_context.span_id,
671 "parent_span_id": trace_context.parent_span_id,
672 "traceparent": trace_context.traceparent,
673 "tracestate": trace_context.tracestate,
674 "baggage": trace_context.baggage,
675 },
676 "tenant": {
677 "workspace_id": tenant_context.workspace_id,
678 "user_id": tenant_context.user_id,
679 "conversation_id": tenant_context.conversation_id,
680 "request_id": tenant_context.request_id,
681 "run_id": tenant_context.run_id,
682 }
683 }),
684 );
685
686 context_with_trace
687}
688
/// Reads `SIMPLE_AGENTS_WORKFLOW_STREAM_INCLUDE_RAW` and treats a trimmed,
/// case-insensitive "1"/"true"/"yes"/"on" as enabled; anything else — or an
/// unset/unreadable variable — disables raw stream debug events.
fn include_raw_stream_debug_events() -> bool {
    std::env::var("SIMPLE_AGENTS_WORKFLOW_STREAM_INCLUDE_RAW")
        .map(|value| {
            matches!(
                value.trim().to_ascii_lowercase().as_str(),
                "1" | "true" | "yes" | "on"
            )
        })
        .unwrap_or(false)
}
698
/// Final payload recovered from a streamed structured completion, with the
/// healer's confidence when JSON healing was applied.
#[derive(Debug)]
struct StreamedPayloadResolution {
    payload: Value,
    // `Some` only when `JsonishParser` healing produced the payload.
    heal_confidence: Option<f32>,
}

/// Accumulates streamed JSON text and emits it once, re-rendered as plain
/// "key: value" lines (see `render_json_object_as_text`).
#[derive(Debug, Default)]
struct StreamJsonAsTextFormatter {
    raw_json: String,
    // Guards against emitting the rendered text more than once.
    emitted: bool,
}
710
711impl StreamJsonAsTextFormatter {
712 fn push(&mut self, chunk: &str) {
713 self.raw_json.push_str(chunk);
714 }
715
716 fn emit_if_ready(&mut self, complete: bool) -> Option<String> {
717 if self.emitted || !complete {
718 return None;
719 }
720 self.emitted = true;
721 Some(render_json_object_as_text(self.raw_json.as_str()))
722 }
723}
724
725fn render_json_object_as_text(raw_json: &str) -> String {
726 let value = match serde_json::from_str::<Value>(raw_json) {
727 Ok(value) => value,
728 Err(_) => return raw_json.to_string(),
729 };
730 let Some(object) = value.as_object() else {
731 return raw_json.to_string();
732 };
733
734 let mut lines = Vec::with_capacity(object.len());
735 for (key, value) in object {
736 let rendered = match value {
737 Value::String(text) => text.clone(),
738 _ => value.to_string(),
739 };
740 lines.push(format!("{key}: {rendered}"));
741 }
742 lines.join("\n")
743}
744
/// Classifies a streamed token delta: structured output proper vs. the
/// model's surrounding "thinking" text (see `StructuredJsonDeltaFilter`).
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum YamlWorkflowTokenKind {
    Output,
    Thinking,
}
751
/// Streaming state machine that separates the first top-level JSON object in
/// a token stream ("output") from everything around it ("thinking" text).
/// Tracks brace depth and string/escape state across chunk boundaries.
#[derive(Debug, Default)]
struct StructuredJsonDeltaFilter {
    started: bool,
    completed: bool,
    depth: u32,
    in_string: bool,
    escape: bool,
}

impl StructuredJsonDeltaFilter {
    /// Routes one streamed delta into (output, thinking) fragments. Either
    /// side is `None` when no characters landed there.
    fn split(&mut self, delta: &str) -> (Option<String>, Option<String>) {
        if delta.is_empty() {
            return (None, None);
        }

        let mut output = String::new();
        let mut thinking = String::new();
        for ch in delta.chars() {
            if self.consume(ch) {
                output.push(ch);
            } else {
                thinking.push(ch);
            }
        }

        let non_empty = |text: String| if text.is_empty() { None } else { Some(text) };
        (non_empty(output), non_empty(thinking))
    }

    /// Advances the state machine by one character; returns true when the
    /// character belongs to the JSON object payload.
    fn consume(&mut self, ch: char) -> bool {
        // Everything after the object closes is thinking text.
        if self.completed {
            return false;
        }
        // Before the first '{', everything is thinking text.
        if !self.started {
            if ch != '{' {
                return false;
            }
            self.started = true;
            self.depth = 1;
            return true;
        }
        // Inside a string literal: braces/quotes lose structural meaning.
        if self.in_string {
            if self.escape {
                self.escape = false;
            } else if ch == '\\' {
                self.escape = true;
            } else if ch == '"' {
                self.in_string = false;
            }
            return true;
        }
        match ch {
            '"' => self.in_string = true,
            '{' => self.depth = self.depth.saturating_add(1),
            '}' => {
                if self.depth > 0 {
                    self.depth -= 1;
                }
                if self.depth == 0 {
                    self.completed = true;
                }
            }
            _ => {}
        }
        true
    }

    /// True once the top-level object has been fully consumed.
    fn completed(&self) -> bool {
        self.completed
    }
}
836
/// Returns the trimmed contents of the last ```json fenced block in `raw`,
/// or `None` when there is no closed, non-empty fence.
fn extract_last_fenced_json_block(raw: &str) -> Option<&str> {
    const FENCE_OPEN: &str = "```json";
    const FENCE_CLOSE: &str = "```";

    let start = raw.rfind(FENCE_OPEN)?;
    let remainder = &raw[start + FENCE_OPEN.len()..];
    let end = remainder.find(FENCE_CLOSE)?;
    let candidate = remainder[..end].trim();
    (!candidate.is_empty()).then_some(candidate)
}
847
/// Scans forward from `start_index` (a byte offset into `raw`) and returns
/// the brace-balanced `{…}` slice beginning there, honoring JSON string and
/// escape rules so braces inside strings are ignored. Returns `None` when the
/// object never closes or a '}' appears before any '{'.
fn extract_balanced_object_from(raw: &str, start_index: usize) -> Option<&str> {
    let mut depth: u32 = 0;
    let mut in_string = false;
    let mut escape = false;

    for (relative_index, ch) in raw[start_index..].char_indices() {
        if in_string {
            if escape {
                escape = false;
            } else {
                match ch {
                    '\\' => escape = true,
                    '"' => in_string = false,
                    _ => {}
                }
            }
            continue;
        }

        match ch {
            '"' => in_string = true,
            '{' => depth = depth.saturating_add(1),
            '}' => {
                // A '}' with no open '{' means the span is not an object.
                depth = depth.checked_sub(1)?;
                if depth == 0 {
                    let end_index = start_index + relative_index + ch.len_utf8();
                    return Some(raw[start_index..end_index].trim());
                }
            }
            _ => {}
        }
    }

    None
}
888
889fn extract_last_parsable_object(raw: &str) -> Option<&str> {
890 let starts: Vec<usize> = raw
891 .char_indices()
892 .filter_map(|(index, ch)| if ch == '{' { Some(index) } else { None })
893 .collect();
894
895 for start in starts.into_iter().rev() {
896 let Some(candidate) = extract_balanced_object_from(raw, start) else {
897 continue;
898 };
899 if serde_json::from_str::<Value>(candidate).is_ok() {
900 return Some(candidate);
901 }
902 }
903
904 None
905}
906
907fn resolve_structured_json_candidate(raw: &str) -> Option<&str> {
908 extract_last_fenced_json_block(raw).or_else(|| extract_last_parsable_object(raw))
909}
910
/// Parses the fully streamed completion text into a structured JSON payload.
///
/// Without healing: try the raw text as JSON first, then fall back to the
/// best candidate substring (fenced block or last balanced object), failing
/// with a descriptive message when neither parses.
/// With healing: hand the candidate — or the raw text when no candidate is
/// found — to `JsonishParser`, which repairs near-JSON and reports a
/// confidence score surfaced via `heal_confidence`.
fn parse_streamed_structured_payload(
    raw: &str,
    heal: bool,
) -> Result<StreamedPayloadResolution, String> {
    if !heal {
        // Fast path: the stream may already be exactly one JSON document.
        if let Ok(payload) = serde_json::from_str::<Value>(raw) {
            return Ok(StreamedPayloadResolution {
                payload,
                heal_confidence: None,
            });
        }

        let candidate = resolve_structured_json_candidate(raw).ok_or_else(|| {
            "failed to parse streamed structured completion JSON: no JSON object candidate found"
                .to_string()
        })?;
        let payload = serde_json::from_str::<Value>(candidate).map_err(|error| {
            format!(
                "failed to parse streamed structured completion JSON: {error}; candidate={candidate}"
            )
        })?;
        return Ok(StreamedPayloadResolution {
            payload,
            heal_confidence: None,
        });
    }

    // Healing path: prefer an extracted candidate, but fall back to the raw
    // text so the lenient parser still gets a chance at malformed output.
    let candidate = resolve_structured_json_candidate(raw).unwrap_or(raw);
    let parser = JsonishParser::new();
    let healed = parser
        .parse(candidate)
        .map_err(|error| format!("failed to heal streamed structured completion JSON: {error}"))?;

    Ok(StreamedPayloadResolution {
        payload: healed.value,
        heal_confidence: Some(healed.confidence),
    })
}
949
/// A single event delivered to a `YamlWorkflowEventSink` during a run. All
/// fields except `event_type` are optional so one shape can describe node
/// lifecycle, token deltas, and terminal events alike.
#[derive(Debug, Clone, PartialEq, Serialize)]
pub struct YamlWorkflowEvent {
    /// Discriminator string — NOTE(review): value set defined at emit sites; confirm.
    pub event_type: String,
    pub node_id: Option<String>,
    pub step_id: Option<String>,
    pub node_kind: Option<String>,
    pub streamable: Option<bool>,
    pub message: Option<String>,
    /// Incremental token text for streaming events.
    pub delta: Option<String>,
    /// Whether `delta` is structured output or "thinking" text.
    pub token_kind: Option<YamlWorkflowTokenKind>,
    pub is_terminal_node_token: Option<bool>,
    pub elapsed_ms: Option<u128>,
    pub metadata: Option<Value>,
}

/// Role alias re-exported from the shared agent message types.
pub type WorkflowMessageRole = Role;

/// A chat message exchanged within a workflow; accepts `toolCallId` as a
/// serde alias for `tool_call_id` on input.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct WorkflowMessage {
    pub role: WorkflowMessageRole,
    pub content: String,
    #[serde(default)]
    pub name: Option<String>,
    #[serde(default, alias = "toolCallId")]
    pub tool_call_id: Option<String>,
}
976
/// Resolution record for one template expression bound during prompt
/// rendering. NOTE(review): the template syntax and resolution rules live
/// elsewhere in this module; confirm field semantics against that code.
#[derive(Debug, Clone, PartialEq, Serialize)]
pub struct YamlTemplateBinding {
    /// Position of the expression within its template.
    pub index: usize,
    pub expression: String,
    pub source_path: String,
    pub resolved: Value,
    pub resolved_type: String,
    /// True when `source_path` did not resolve to a value.
    pub missing: bool,
}

/// Severity of a workflow validation diagnostic.
#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
pub enum YamlWorkflowDiagnosticSeverity {
    Error,
    Warning,
}

/// One finding produced while validating a workflow definition.
#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
pub struct YamlWorkflowDiagnostic {
    /// Offending node, when the problem is node-specific.
    pub node_id: Option<String>,
    /// Machine-readable code identifying the kind of finding.
    pub code: String,
    pub severity: YamlWorkflowDiagnosticSeverity,
    pub message: String,
}
1000
/// Errors produced while loading, validating, converting, or executing a YAML
/// workflow. Display messages come from the `#[error]` attributes.
#[derive(Debug, Error)]
pub enum YamlWorkflowRunError {
    #[error("failed to read workflow yaml '{path}': {source}")]
    Read {
        path: String,
        source: std::io::Error,
    },
    #[error("failed to parse workflow yaml '{path}': {source}")]
    Parse {
        path: String,
        source: serde_yaml::Error,
    },
    #[error("workflow '{workflow_id}' has no nodes")]
    EmptyNodes { workflow_id: String },
    #[error("entry node '{entry_node}' does not exist")]
    MissingEntry { entry_node: String },
    #[error("unknown node id '{node_id}'")]
    MissingNode { node_id: String },
    #[error("unsupported node type in '{node_id}'")]
    UnsupportedNodeType { node_id: String },
    #[error("unsupported switch condition format: {condition}")]
    UnsupportedCondition { condition: String },
    #[error("switch node '{node_id}' has no valid next target")]
    InvalidSwitchTarget { node_id: String },
    #[error("llm returned non-object payload for node '{node_id}'")]
    LlmPayloadNotObject { node_id: String },
    #[error("custom worker handler '{handler}' is not supported")]
    UnsupportedCustomHandler { handler: String },
    #[error("llm execution failed for node '{node_id}': {message}")]
    Llm { node_id: String, message: String },
    #[error("custom worker execution failed for node '{node_id}': {message}")]
    CustomWorker { node_id: String, message: String },
    /// Validation failure; carries the full diagnostic list for reporting.
    #[error("workflow validation failed with {diagnostics_count} error(s)")]
    Validation {
        diagnostics_count: usize,
        diagnostics: Vec<YamlWorkflowDiagnostic>,
    },
    #[error("invalid workflow input: {message}")]
    InvalidInput { message: String },
    #[error("ir runtime execution failed: {message}")]
    IrRuntime { message: String },
    #[error("workflow event stream cancelled: {message}")]
    EventSinkCancelled { message: String },
}
1045
/// Receiver for workflow events; implementations must be `Send + Sync`.
pub trait YamlWorkflowEventSink: Send + Sync {
    /// Called for every event produced during a run.
    fn emit(&self, event: &YamlWorkflowEvent);

    /// Cooperative-cancellation hook; defaults to "never cancelled".
    /// NOTE(review): presumably polled by the runner via
    /// `event_sink_is_cancelled` to abort runs — confirm at the call sites.
    fn is_cancelled(&self) -> bool {
        false
    }
}

/// Event sink that discards all events and never cancels.
pub struct NoopYamlWorkflowEventSink;

impl YamlWorkflowEventSink for NoopYamlWorkflowEventSink {
    fn emit(&self, _event: &YamlWorkflowEvent) {}
}
1059
1060fn workflow_event_sink_cancelled_message() -> &'static str {
1061 "workflow event callback cancelled"
1062}
1063
1064fn event_sink_is_cancelled(event_sink: Option<&dyn YamlWorkflowEventSink>) -> bool {
1065 event_sink.map(|sink| sink.is_cancelled()).unwrap_or(false)
1066}
1067
/// Errors raised when converting a YAML workflow into the workflow IR.
#[derive(Debug, Clone, PartialEq, Eq, Error)]
pub enum YamlToIrError {
    #[error("entry node '{entry_node}' does not exist")]
    MissingEntry { entry_node: String },
    #[error("node '{node_id}' has multiple outgoing edges in YAML; IR llm/tool nodes require one")]
    MultipleOutgoingEdge { node_id: String },
    #[error("node '{node_id}' is unsupported for IR conversion: {reason}")]
    UnsupportedNode { node_id: String, reason: String },
}
1077
1078pub fn yaml_workflow_to_mermaid(workflow: &YamlWorkflow) -> String {
1080 if let Ok(ir) = yaml_workflow_to_ir(workflow) {
1081 return workflow_to_mermaid(&ir);
1082 }
1083
1084 yaml_workflow_to_mermaid_fallback(workflow)
1085}
1086
/// Direct YAML → Mermaid rendering, used when the workflow cannot be lowered
/// to the canonical IR.
///
/// Output layout: one rectangular node per YAML node; a stadium-shaped node
/// plus dashed edge for every tool attached to an `llm_call` node; then all
/// plain and switch edges (deduplicated and sorted for deterministic output);
/// finally a `classDef` styling the tool nodes.
fn yaml_workflow_to_mermaid_fallback(workflow: &YamlWorkflow) -> String {
    let mut lines = Vec::new();
    lines.push("flowchart TD".to_string());
    let mut tool_node_ids: Vec<String> = Vec::new();

    for node in &workflow.nodes {
        // Label shows both the node id and its kind, e.g. "classify\n(llm_call)".
        let label = format!("{}\\n({})", node.id, node.kind_name());
        lines.push(format!(
            "    {}[\"{}\"]",
            sanitize_mermaid_id(&node.id),
            escape_mermaid_label(label.as_str()),
        ));

        if let Some(llm) = node.node_type.llm_call.as_ref() {
            for (idx, tool_name) in llm_tool_names(llm).into_iter().enumerate() {
                let tool_id = sanitize_mermaid_id(format!("{}__tool_{}", node.id, idx).as_str());
                lines.push(format!(
                    "    {}([\"{}\"])",
                    tool_id,
                    escape_mermaid_label(format!("tool: {tool_name}").as_str())
                ));
                // Dashed edge: tools are invoked by the node, not control flow.
                lines.push(format!(
                    "    {} -.-> {}",
                    sanitize_mermaid_id(&node.id),
                    tool_id
                ));
                tool_node_ids.push(tool_id);
            }
        }
    }

    // Collect (from, label, to) triples into a set so duplicate edges collapse.
    let mut emitted: HashSet<(String, String, String)> = HashSet::new();

    for edge in &workflow.edges {
        emitted.insert((edge.from.clone(), String::new(), edge.to.clone()));
    }

    // Switch nodes contribute one labeled edge per branch plus a "default" edge.
    for node in &workflow.nodes {
        if let Some(switch) = node.node_type.switch.as_ref() {
            for branch in &switch.branches {
                emitted.insert((
                    node.id.clone(),
                    branch.condition.clone(),
                    branch.target.clone(),
                ));
            }
            emitted.insert((
                node.id.clone(),
                "default".to_string(),
                switch.default.clone(),
            ));
        }
    }

    // Sort so diagram output is stable across runs (HashSet order is random).
    let mut edges = emitted.into_iter().collect::<Vec<_>>();
    edges.sort();

    for (from, label, to) in edges {
        if label.is_empty() {
            lines.push(format!(
                "    {} --> {}",
                sanitize_mermaid_id(&from),
                sanitize_mermaid_id(&to)
            ));
        } else {
            lines.push(format!(
                "    {} -- \"{}\" --> {}",
                sanitize_mermaid_id(&from),
                escape_mermaid_label(&label),
                sanitize_mermaid_id(&to)
            ));
        }
    }

    if !tool_node_ids.is_empty() {
        lines.push("    classDef toolNode fill:#FFF4D6,stroke:#D97706,color:#7C2D12;".to_string());
        lines.push(format!("    class {} toolNode;", tool_node_ids.join(",")));
    }

    lines.join("\n")
}
1168
1169fn llm_tool_names(llm: &YamlLlmCall) -> Vec<String> {
1170 llm.tools
1171 .iter()
1172 .map(|tool| match tool {
1173 YamlToolDeclaration::OpenAi(openai) => openai.function.name.clone(),
1174 YamlToolDeclaration::Simplified(simple) => simple.name.clone(),
1175 })
1176 .collect()
1177}
1178
1179pub fn yaml_workflow_file_to_mermaid(workflow_path: &Path) -> Result<String, YamlWorkflowRunError> {
1181 let contents =
1182 std::fs::read_to_string(workflow_path).map_err(|source| YamlWorkflowRunError::Read {
1183 path: workflow_path.display().to_string(),
1184 source,
1185 })?;
1186
1187 let workflow: YamlWorkflow =
1188 serde_yaml::from_str(&contents).map_err(|source| YamlWorkflowRunError::Parse {
1189 path: workflow_path.display().to_string(),
1190 source,
1191 })?;
1192
1193 let referenced_subgraphs = discover_referenced_subgraphs(workflow_path, &workflow)?;
1194 if referenced_subgraphs.is_empty() {
1195 return Ok(yaml_workflow_to_mermaid(&workflow));
1196 }
1197
1198 Ok(yaml_workflow_to_mermaid_with_subgraphs(
1199 &workflow,
1200 &referenced_subgraphs,
1201 ))
1202}
1203
/// A sibling workflow to render as a Mermaid subgraph.
#[derive(Debug, Clone)]
struct MermaidSubgraphWorkflow {
    /// The reference string exactly as it appeared in the parent workflow.
    alias: String,
    /// The parsed sibling workflow definition.
    workflow: YamlWorkflow,
}
1209
/// Intermediate rendering state for one workflow block inside a combined
/// Mermaid diagram (produced by `render_mermaid_block`).
#[derive(Debug, Default)]
struct MermaidBlockRender {
    /// Unindented Mermaid statements for this block; the caller indents them
    /// into a `subgraph`.
    lines: Vec<String>,
    /// Mermaid ids of every tool node, used for class styling.
    tool_node_ids: Vec<String>,
    /// Subset of tool nodes that are `run_workflow_graph` calls, used to draw
    /// cross-subgraph "calls" edges.
    run_workflow_tool_node_ids: Vec<String>,
    /// Prefixed Mermaid id of the block's entry node.
    entry_node_id: String,
}
1217
/// Renders the main workflow and each referenced sibling workflow as Mermaid
/// subgraphs in a single flowchart, with dashed "calls" edges from the main
/// block's `run_workflow_graph` tool nodes to each subgraph's entry node.
///
/// NOTE(review): every `run_workflow_graph` tool node is linked to every
/// subgraph's entry — the concrete target is not resolved statically, so the
/// diagram may over-connect when multiple launcher tools/subgraphs exist;
/// confirm this is the intended visualization.
fn yaml_workflow_to_mermaid_with_subgraphs(
    workflow: &YamlWorkflow,
    subgraphs: &[MermaidSubgraphWorkflow],
) -> String {
    let mut lines = vec!["flowchart TD".to_string()];

    // Main workflow, wrapped in its own subgraph cluster.
    let main_block = render_mermaid_block(workflow, "main");
    lines.push(format!(
        "    subgraph main_graph[\"Main: {}\"]",
        escape_mermaid_label(&workflow.id)
    ));
    for line in &main_block.lines {
        lines.push(format!("        {line}"));
    }
    lines.push("    end".to_string());

    let mut all_tool_nodes = main_block.tool_node_ids.clone();

    for (index, subgraph) in subgraphs.iter().enumerate() {
        // Unique per-subgraph prefix keeps node ids from colliding across blocks.
        let block_id = format!("subgraph_{}", index + 1);
        let block = render_mermaid_block(&subgraph.workflow, &block_id);
        lines.push(format!(
            "    subgraph {}[\"Subgraph: {}\"]",
            sanitize_mermaid_id(&format!("{}_cluster", block_id)),
            escape_mermaid_label(&subgraph.alias)
        ));
        for line in &block.lines {
            lines.push(format!("        {line}"));
        }
        lines.push("    end".to_string());

        // Dashed "calls" edges from the main block's launcher tools.
        for tool_node in &main_block.run_workflow_tool_node_ids {
            lines.push(format!(
                "    {} -. \"{}\" .-> {}",
                tool_node,
                escape_mermaid_label(&format!("calls {}", subgraph.alias)),
                block.entry_node_id
            ));
        }

        all_tool_nodes.extend(block.tool_node_ids);
    }

    if !all_tool_nodes.is_empty() {
        lines.push("    classDef toolNode fill:#FFF4D6,stroke:#D97706,color:#7C2D12;".to_string());
        lines.push(format!("    class {} toolNode;", all_tool_nodes.join(",")));
    }

    lines.join("\n")
}
1268
/// Renders one workflow as unindented Mermaid statements with every node id
/// namespaced by `prefix`, so multiple workflow blocks can coexist in one
/// diagram without id collisions. The caller indents `lines` into a subgraph.
fn render_mermaid_block(workflow: &YamlWorkflow, prefix: &str) -> MermaidBlockRender {
    let mut block = MermaidBlockRender {
        entry_node_id: prefixed_mermaid_id(prefix, &workflow.entry_node),
        ..Default::default()
    };

    for node in &workflow.nodes {
        let node_id = prefixed_mermaid_id(prefix, &node.id);
        // Label keeps the raw (unprefixed) id plus the node kind.
        let label = format!("{}\\n({})", node.id, node.kind_name());
        block
            .lines
            .push(format!("{}[\"{}\"]", node_id, escape_mermaid_label(&label)));

        if let Some(llm) = node.node_type.llm_call.as_ref() {
            for (idx, tool) in llm.tools.iter().enumerate() {
                let tool_name = tool_declaration_name(tool);
                let tool_id = prefixed_mermaid_id(prefix, &format!("{}__tool_{}", node.id, idx));
                block.lines.push(format!(
                    "{}([\"{}\"])",
                    tool_id,
                    escape_mermaid_label(format!("tool: {tool_name}").as_str())
                ));
                // Dashed edge from the node to its tool.
                block.lines.push(format!("{} -.-> {}", node_id, tool_id));
                block.tool_node_ids.push(tool_id.clone());

                // Track sub-workflow launchers so the caller can draw
                // cross-subgraph "calls" edges from them.
                if tool_name == "run_workflow_graph" {
                    block.run_workflow_tool_node_ids.push(tool_id);
                }
            }
        }
    }

    // Deduplicate (from, label, to) edges via a set.
    let mut emitted: HashSet<(String, String, String)> = HashSet::new();

    for edge in &workflow.edges {
        emitted.insert((edge.from.clone(), String::new(), edge.to.clone()));
    }

    // Switch nodes contribute one labeled edge per branch plus "default".
    for node in &workflow.nodes {
        if let Some(switch) = node.node_type.switch.as_ref() {
            for branch in &switch.branches {
                emitted.insert((
                    node.id.clone(),
                    branch.condition.clone(),
                    branch.target.clone(),
                ));
            }
            emitted.insert((
                node.id.clone(),
                "default".to_string(),
                switch.default.clone(),
            ));
        }
    }

    // Sort for deterministic output (HashSet iteration order is random).
    let mut edges = emitted.into_iter().collect::<Vec<_>>();
    edges.sort();

    for (from, label, to) in edges {
        let from_id = prefixed_mermaid_id(prefix, &from);
        let to_id = prefixed_mermaid_id(prefix, &to);
        if label.is_empty() {
            block.lines.push(format!("{} --> {}", from_id, to_id));
        } else {
            block.lines.push(format!(
                "{} -- \"{}\" --> {}",
                from_id,
                escape_mermaid_label(&label),
                to_id
            ));
        }
    }

    block
}
1344
1345fn discover_referenced_subgraphs(
1346 workflow_path: &Path,
1347 workflow: &YamlWorkflow,
1348) -> Result<Vec<MermaidSubgraphWorkflow>, YamlWorkflowRunError> {
1349 let workflow_ids = referenced_workflow_ids(workflow);
1350 if workflow_ids.is_empty() {
1351 return Ok(Vec::new());
1352 }
1353
1354 let parent_dir = workflow_path.parent().unwrap_or(Path::new("."));
1355 let sibling_workflows = load_yaml_sibling_workflows(parent_dir, workflow_path)?;
1356
1357 let mut discovered = Vec::new();
1358 let mut seen = HashSet::new();
1359
1360 for workflow_id in workflow_ids {
1361 let normalized = normalize_workflow_lookup_key(&workflow_id);
1362 if seen.contains(&normalized) {
1363 continue;
1364 }
1365
1366 if let Some((_, subworkflow)) = sibling_workflows.iter().find(|(key, _)| key == &normalized)
1367 {
1368 discovered.push(MermaidSubgraphWorkflow {
1369 alias: workflow_id.clone(),
1370 workflow: subworkflow.clone(),
1371 });
1372 seen.insert(normalized);
1373 }
1374 }
1375
1376 Ok(discovered)
1377}
1378
/// Parses every other YAML workflow file in `parent_dir` and registers each
/// under two normalized lookup keys: its file stem and its declared `id`.
/// Lookups via `find` return the first match, so the stem-derived key takes
/// precedence when both keys are probed.
///
/// NOTE(review): `path == workflow_path` compares paths as spelled, without
/// canonicalization. Callers derive `parent_dir` from `workflow_path`, so the
/// forms should line up — confirm if a caller could ever pass a canonicalized
/// or differently-spelled path (the file would then be loaded as its own
/// sibling).
fn load_yaml_sibling_workflows(
    parent_dir: &Path,
    workflow_path: &Path,
) -> Result<Vec<(String, YamlWorkflow)>, YamlWorkflowRunError> {
    let mut results = Vec::new();
    let entries = std::fs::read_dir(parent_dir).map_err(|source| YamlWorkflowRunError::Read {
        path: parent_dir.display().to_string(),
        source,
    })?;

    for entry in entries {
        let entry = entry.map_err(|source| YamlWorkflowRunError::Read {
            path: parent_dir.display().to_string(),
            source,
        })?;
        let path = entry.path();
        // Only consider .yaml/.yml files.
        if !is_yaml_file(&path) {
            continue;
        }

        // Skip the workflow being rendered itself.
        if path == workflow_path {
            continue;
        }

        let contents =
            std::fs::read_to_string(&path).map_err(|source| YamlWorkflowRunError::Read {
                path: path.display().to_string(),
                source,
            })?;
        let subworkflow: YamlWorkflow =
            serde_yaml::from_str(&contents).map_err(|source| YamlWorkflowRunError::Parse {
                path: path.display().to_string(),
                source,
            })?;

        // Register under the file stem (if representable as UTF-8)…
        if let Some(stem) = path.file_stem().and_then(|value| value.to_str()) {
            results.push((normalize_workflow_lookup_key(stem), subworkflow.clone()));
        }
        // …and under the workflow's own declared id.
        results.push((normalize_workflow_lookup_key(&subworkflow.id), subworkflow));
    }

    Ok(results)
}
1422
1423fn referenced_workflow_ids(workflow: &YamlWorkflow) -> Vec<String> {
1424 let mut ids = Vec::new();
1425 let mut seen = HashSet::new();
1426
1427 for node in &workflow.nodes {
1428 let prompt = node
1429 .config
1430 .as_ref()
1431 .and_then(|config| config.prompt.as_deref());
1432
1433 if let Some(llm) = node.node_type.llm_call.as_ref() {
1434 for tool in &llm.tools {
1435 if tool_declaration_name(tool) != "run_workflow_graph" {
1436 continue;
1437 }
1438
1439 for workflow_id in referenced_workflow_ids_from_tool(tool) {
1440 let normalized = normalize_workflow_lookup_key(&workflow_id);
1441 if seen.insert(normalized) {
1442 ids.push(workflow_id);
1443 }
1444 }
1445
1446 if let Some(prompt_text) = prompt {
1447 for workflow_id in referenced_workflow_ids_from_prompt(prompt_text) {
1448 let normalized = normalize_workflow_lookup_key(&workflow_id);
1449 if seen.insert(normalized) {
1450 ids.push(workflow_id);
1451 }
1452 }
1453 }
1454 }
1455 }
1456 }
1457
1458 ids
1459}
1460
1461fn referenced_workflow_ids_from_tool(tool: &YamlToolDeclaration) -> Vec<String> {
1462 let schema = match tool {
1463 YamlToolDeclaration::OpenAi(openai) => openai.function.parameters.as_ref(),
1464 YamlToolDeclaration::Simplified(simple) => Some(&simple.input_schema),
1465 };
1466
1467 let mut ids = Vec::new();
1468 if let Some(schema) = schema {
1469 if let Some(workflow_prop) = schema
1470 .get("properties")
1471 .and_then(Value::as_object)
1472 .and_then(|properties| properties.get("workflow_id"))
1473 {
1474 if let Some(value) = workflow_prop.get("const").and_then(Value::as_str) {
1475 ids.push(value.to_string());
1476 }
1477
1478 if let Some(enum_values) = workflow_prop.get("enum").and_then(Value::as_array) {
1479 for value in enum_values {
1480 if let Some(value) = value.as_str() {
1481 ids.push(value.to_string());
1482 }
1483 }
1484 }
1485 }
1486 }
1487
1488 ids
1489}
1490
/// Extracts workflow ids from JSON-ish `"workflow_id": "<id>"` snippets
/// embedded in prompt text.
///
/// Scans left to right; values that are not double-quoted strings are
/// skipped, surrounding whitespace inside the quotes is trimmed, and empty
/// ids are dropped. Duplicates are preserved — callers deduplicate.
fn referenced_workflow_ids_from_prompt(prompt: &str) -> Vec<String> {
    const KEY: &str = "\"workflow_id\"";

    let mut found = Vec::new();
    let mut cursor = prompt;

    while let Some(key_index) = cursor.find(KEY) {
        let after_key = &cursor[key_index + KEY.len()..];
        let Some(colon_index) = after_key.find(':') else {
            break;
        };
        let after_colon = &after_key[colon_index + 1..];

        // Accept only a double-quoted value with a closing quote.
        let quoted = after_colon
            .trim_start()
            .strip_prefix('"')
            .and_then(|rest| rest.find('"').map(|end| (rest, end)));

        match quoted {
            Some((rest, end_quote_index)) => {
                let id = rest[..end_quote_index].trim();
                if !id.is_empty() {
                    found.push(id.to_string());
                }
                cursor = &rest[end_quote_index + 1..];
            }
            // Not a quoted string — resume the scan just past the colon.
            None => cursor = after_colon,
        }
    }

    found
}
1518
/// Builds a case-insensitive lookup key: ASCII-lowercases the input and folds
/// hyphens into underscores, so `My-Flow` and `my_flow` resolve identically.
fn normalize_workflow_lookup_key(value: &str) -> String {
    value.to_ascii_lowercase().replace('-', "_")
}
1528
/// Returns `true` when the path's extension is `yaml` or `yml`.
///
/// The comparison is ASCII case-insensitive so files named e.g. `FLOW.YML`
/// (common on case-insensitive filesystems) are also recognized; the
/// previous exact match silently skipped them.
fn is_yaml_file(path: &Path) -> bool {
    path.extension()
        .and_then(|ext| ext.to_str())
        .is_some_and(|ext| ext.eq_ignore_ascii_case("yaml") || ext.eq_ignore_ascii_case("yml"))
}
1535
1536fn prefixed_mermaid_id(prefix: &str, id: &str) -> String {
1537 sanitize_mermaid_id(format!("{}__{}", prefix, id).as_str())
1538}
1539
/// Returns the declared name of a tool regardless of whether it was written
/// in OpenAI function form or the simplified form.
fn tool_declaration_name(tool: &YamlToolDeclaration) -> &str {
    match tool {
        YamlToolDeclaration::OpenAi(openai) => &openai.function.name,
        YamlToolDeclaration::Simplified(simple) => &simple.name,
    }
}
1546
/// Lowers a YAML workflow into the canonical IR (`WorkflowDefinition`).
///
/// A synthetic start node (`YAML_START_NODE_ID`) is prepended that jumps to
/// the YAML entry node. `llm_call` nodes become IR tool nodes invoking the
/// internal `YAML_LLM_TOOL_ID` tool with the call configuration encoded as
/// JSON input; `custom_worker` nodes become tool nodes for their handler;
/// `switch` nodes become routers. Features the IR cannot express yet
/// (globals mutation, llm tools) are rejected with
/// `YamlToIrError::UnsupportedNode`.
///
/// # Errors
/// `MissingEntry` when `entry_node` is unknown, `MultipleOutgoingEdge` when
/// an llm/worker node fans out, `UnsupportedNode` for unconvertible nodes.
pub fn yaml_workflow_to_ir(workflow: &YamlWorkflow) -> Result<WorkflowDefinition, YamlToIrError> {
    let known_ids: HashSet<&str> = workflow.nodes.iter().map(|n| n.id.as_str()).collect();
    if !known_ids.contains(workflow.entry_node.as_str()) {
        return Err(YamlToIrError::MissingEntry {
            entry_node: workflow.entry_node.clone(),
        });
    }

    // Index edges by source node so llm/worker nodes can resolve their single
    // successor via `single_next_for_node`.
    let mut outgoing: HashMap<&str, Vec<&str>> = HashMap::new();
    for edge in &workflow.edges {
        outgoing
            .entry(edge.from.as_str())
            .or_default()
            .push(edge.to.as_str());
    }

    let mut nodes = Vec::with_capacity(workflow.nodes.len() + 1);
    // Synthetic IR start node pointing at the YAML entry node.
    nodes.push(Node {
        id: YAML_START_NODE_ID.to_string(),
        kind: NodeKind::Start {
            next: workflow.entry_node.clone(),
        },
    });

    for node in &workflow.nodes {
        if let Some(llm) = node.node_type.llm_call.as_ref() {
            // Globals mutation has no IR representation yet — refuse rather
            // than silently drop it.
            if node
                .config
                .as_ref()
                .and_then(|c| c.set_globals.as_ref())
                .is_some()
                || node
                    .config
                    .as_ref()
                    .and_then(|c| c.update_globals.as_ref())
                    .is_some()
            {
                return Err(YamlToIrError::UnsupportedNode {
                    node_id: node.id.clone(),
                    reason: "set_globals/update_globals are not represented in canonical IR llm nodes yet"
                        .to_string(),
                });
            }

            if !llm.tools.is_empty() {
                return Err(YamlToIrError::UnsupportedNode {
                    node_id: node.id.clone(),
                    reason: "llm_call.tools are not represented in canonical IR llm nodes yet"
                        .to_string(),
                });
            }

            let next = single_next_for_node(&outgoing, &node.id)?;
            // Encode the llm call configuration as the internal tool's input
            // payload; unset booleans default to false except
            // append_prompt_as_user, which defaults to true.
            nodes.push(Node {
                id: node.id.clone(),
                kind: NodeKind::Tool {
                    tool: YAML_LLM_TOOL_ID.to_string(),
                    input: json!({
                        "node_id": node.id,
                        "model": llm.model,
                        "prompt_template": node
                            .config
                            .as_ref()
                            .and_then(|c| c.prompt.clone())
                            .unwrap_or_default(),
                        "stream": llm.stream.unwrap_or(false),
                        "stream_json_as_text": llm.stream_json_as_text.unwrap_or(false),
                        "heal": llm.heal.unwrap_or(false),
                        "messages_path": llm.messages_path,
                        "append_prompt_as_user": llm.append_prompt_as_user.unwrap_or(true),
                        "output_schema": node
                            .config
                            .as_ref()
                            .and_then(|c| c.output_schema.clone())
                            .unwrap_or_else(default_llm_output_schema),
                    }),
                    next,
                },
            });
            continue;
        }

        if let Some(worker) = node.node_type.custom_worker.as_ref() {
            // Same globals restriction as for llm nodes.
            if node
                .config
                .as_ref()
                .and_then(|c| c.set_globals.as_ref())
                .is_some()
                || node
                    .config
                    .as_ref()
                    .and_then(|c| c.update_globals.as_ref())
                    .is_some()
            {
                return Err(YamlToIrError::UnsupportedNode {
                    node_id: node.id.clone(),
                    reason: "set_globals/update_globals are not represented in canonical IR tool nodes yet"
                        .to_string(),
                });
            }

            let next = single_next_for_node(&outgoing, &node.id)?;
            nodes.push(Node {
                id: node.id.clone(),
                kind: NodeKind::Tool {
                    tool: worker.handler.clone(),
                    // The worker's payload becomes the tool input; default to
                    // an empty object when unset.
                    input: node
                        .config
                        .as_ref()
                        .and_then(|c| c.payload.clone())
                        .unwrap_or_else(|| json!({})),
                    next,
                },
            });
            continue;
        }

        if let Some(switch) = node.node_type.switch.as_ref() {
            // Switch → router: one route per branch with the condition
            // rewritten into IR path syntax, plus the default target.
            nodes.push(Node {
                id: node.id.clone(),
                kind: NodeKind::Router {
                    routes: switch
                        .branches
                        .iter()
                        .map(|b| RouterRoute {
                            when: rewrite_yaml_condition_to_ir(&b.condition),
                            next: b.target.clone(),
                        })
                        .collect(),
                    default: switch.default.clone(),
                },
            });
            continue;
        }

        return Err(YamlToIrError::UnsupportedNode {
            node_id: node.id.clone(),
            reason: "node_type must be llm_call, switch, or custom_worker".to_string(),
        });
    }

    Ok(WorkflowDefinition {
        version: WORKFLOW_IR_V0.to_string(),
        name: workflow.id.clone(),
        nodes,
    })
}
1694
1695fn single_next_for_node(
1696 outgoing: &HashMap<&str, Vec<&str>>,
1697 node_id: &str,
1698) -> Result<Option<String>, YamlToIrError> {
1699 match outgoing.get(node_id) {
1700 None => Ok(None),
1701 Some(targets) if targets.len() == 1 => Ok(Some(targets[0].to_string())),
1702 Some(_) => Err(YamlToIrError::MultipleOutgoingEdge {
1703 node_id: node_id.to_string(),
1704 }),
1705 }
1706}
1707
/// Translates a YAML condition path expression into IR path syntax:
/// `$.nodes.` becomes `$.node_outputs.`, interior `.output.` segments are
/// collapsed, and a trailing `.output` segment is dropped entirely.
fn rewrite_yaml_condition_to_ir(expr: &str) -> String {
    let mut rewritten = expr
        .replace("$.nodes.", "$.node_outputs.")
        .replace(".output.", ".");
    if rewritten.ends_with(".output") {
        rewritten.truncate(rewritten.len() - ".output".len());
    }
    rewritten
}
1718
/// Converts an arbitrary id into a Mermaid-safe identifier: every character
/// outside `[A-Za-z0-9_]` becomes `_`, and ids that do not start with an
/// ASCII letter or underscore (including the empty id) gain an `n_` prefix.
fn sanitize_mermaid_id(id: &str) -> String {
    let starts_safely = id
        .chars()
        .next()
        .is_some_and(|ch| ch.is_ascii_alphabetic() || ch == '_');

    let mut sanitized = String::with_capacity(id.len() + 2);
    if !starts_safely {
        sanitized.push_str("n_");
    }
    for ch in id.chars() {
        if ch.is_ascii_alphanumeric() || ch == '_' {
            sanitized.push(ch);
        } else {
            sanitized.push('_');
        }
    }
    sanitized
}
1742
/// Escapes embedded double quotes (`"` → `\"`) so a label can be placed
/// inside a quoted Mermaid string.
fn escape_mermaid_label(label: &str) -> String {
    let mut escaped = String::with_capacity(label.len());
    for ch in label.chars() {
        if ch == '"' {
            escaped.push('\\');
        }
        escaped.push(ch);
    }
    escaped
}
1746
/// Fully resolved inputs for executing one `llm_call` workflow node.
#[derive(Debug, Clone)]
pub struct YamlLlmExecutionRequest {
    /// Id of the YAML node being executed.
    pub node_id: String,
    /// Presumably marks the workflow's final node (no successor) — name-based;
    /// confirm at the call site that builds this request.
    pub is_terminal_node: bool,
    /// When `true`, streamed structured-JSON output is re-emitted as plain
    /// text deltas (see `StreamJsonAsTextFormatter` usage in the executor).
    pub stream_json_as_text: bool,
    /// Model identifier forwarded to the completion request.
    pub model: String,
    /// Optional pre-built conversation history; when `None`, the executor
    /// builds a default system + user pair from `prompt`.
    pub messages: Option<Vec<Message>>,
    /// When `messages` is provided and this is `true`, `prompt` is appended
    /// as a trailing user turn (skipped when the prompt is blank).
    pub append_prompt_as_user: bool,
    /// Rendered prompt text for this node.
    pub prompt: String,
    /// Unrendered prompt template — NOTE(review): not consumed by the visible
    /// executor; presumably kept for tracing/diagnostics.
    pub prompt_template: String,
    /// Bindings substituted into the prompt — assumed; usage not visible here.
    pub prompt_bindings: Vec<YamlTemplateBinding>,
    /// JSON schema the structured output should satisfy; passed as a
    /// `json_schema` response format when healing is enabled.
    pub schema: Value,
    /// Request streaming completions from the client.
    pub stream: bool,
    /// Enable JSON healing/schema coercion of the model output.
    pub heal: bool,
    /// Tools exposed to the model for this node.
    pub tools: Vec<YamlResolvedTool>,
    /// Optional tool-choice directive forwarded to the model.
    pub tool_choice: Option<ToolChoice>,
    /// Upper bound on model→tool→model roundtrips (inclusive loop bound).
    pub max_tool_roundtrips: u8,
    /// Global key under which tool-call traces are stored, if any — assumed
    /// from the name; usage not visible in this chunk.
    pub tool_calls_global_key: Option<String>,
    /// How tool calls are recorded/reported — see `YamlToolTraceMode`.
    pub tool_trace_mode: YamlToolTraceMode,
    /// Snapshot of the workflow execution context available to the step.
    pub execution_context: Value,
    /// Raw email text the workflow was invoked with.
    pub email_text: String,
    /// Externally supplied trace id, when tracing is enabled.
    pub trace_id: Option<String>,
    /// Parent trace context for span correlation.
    pub trace_context: Option<TraceContext>,
    /// Whether this run is sampled for tracing.
    pub trace_sampled: bool,
}
1772
/// A tool made available to the LLM for a workflow step.
#[derive(Debug, Clone)]
pub struct YamlResolvedTool {
    /// Wire-format tool definition sent with the completion request.
    pub definition: ToolDefinition,
    /// Optional schema for the tool's output — presumably used to validate or
    /// heal tool results; confirm against the tool-execution path.
    pub output_schema: Option<Value>,
}
1778
/// Executes a single LLM step of a YAML workflow.
///
/// Implementations receive the fully resolved request for one `llm_call`
/// node and return its structured result; failures are reported as plain
/// `String` messages. Implementations may stream and should honor
/// cancellation requested through `event_sink`.
#[async_trait]
pub trait YamlWorkflowLlmExecutor: Send + Sync {
    async fn complete_structured(
        &self,
        request: YamlLlmExecutionRequest,
        event_sink: Option<&dyn YamlWorkflowEventSink>,
    ) -> Result<YamlLlmExecutionResult, String>;
}
1787
/// Executes `custom_worker` workflow nodes.
///
/// `handler` names the worker to run, `payload` is the node's configured
/// input, and `email_text`/`context` expose the workflow's original input
/// and execution context. Returns the worker's JSON output, or an error
/// message string on failure.
#[async_trait]
pub trait YamlWorkflowCustomWorkerExecutor: Send + Sync {
    async fn execute(
        &self,
        handler: &str,
        payload: &Value,
        email_text: &str,
        context: &Value,
    ) -> Result<Value, String>;
}
1798
1799pub async fn run_workflow_yaml_file(
1800 workflow_path: &Path,
1801 workflow_input: &Value,
1802 executor: &dyn YamlWorkflowLlmExecutor,
1803) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
1804 let contents =
1805 std::fs::read_to_string(workflow_path).map_err(|source| YamlWorkflowRunError::Read {
1806 path: workflow_path.display().to_string(),
1807 source,
1808 })?;
1809
1810 let workflow: YamlWorkflow =
1811 serde_yaml::from_str(&contents).map_err(|source| YamlWorkflowRunError::Parse {
1812 path: workflow_path.display().to_string(),
1813 source,
1814 })?;
1815
1816 run_workflow_yaml(&workflow, workflow_input, executor).await
1817}
1818
1819pub async fn run_email_workflow_yaml_file(
1820 workflow_path: &Path,
1821 email_text: &str,
1822 executor: &dyn YamlWorkflowLlmExecutor,
1823) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
1824 let workflow_input = json!({ "email_text": email_text });
1825 run_workflow_yaml_file(workflow_path, &workflow_input, executor).await
1826}
1827
1828pub async fn run_workflow_yaml_file_with_client(
1829 workflow_path: &Path,
1830 workflow_input: &Value,
1831 client: &SimpleAgentsClient,
1832) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
1833 let contents =
1834 std::fs::read_to_string(workflow_path).map_err(|source| YamlWorkflowRunError::Read {
1835 path: workflow_path.display().to_string(),
1836 source,
1837 })?;
1838
1839 let workflow: YamlWorkflow =
1840 serde_yaml::from_str(&contents).map_err(|source| YamlWorkflowRunError::Parse {
1841 path: workflow_path.display().to_string(),
1842 source,
1843 })?;
1844
1845 run_workflow_yaml_with_client(&workflow, workflow_input, client).await
1846}
1847
1848pub async fn run_email_workflow_yaml_file_with_client(
1849 workflow_path: &Path,
1850 email_text: &str,
1851 client: &SimpleAgentsClient,
1852) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
1853 let workflow_input = json!({ "email_text": email_text });
1854 run_workflow_yaml_file_with_client(workflow_path, &workflow_input, client).await
1855}
1856
1857pub async fn run_workflow_yaml_with_client(
1858 workflow: &YamlWorkflow,
1859 workflow_input: &Value,
1860 client: &SimpleAgentsClient,
1861) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
1862 run_workflow_yaml_with_client_and_custom_worker(workflow, workflow_input, client, None).await
1863}
1864
1865pub async fn run_email_workflow_yaml_with_client(
1866 workflow: &YamlWorkflow,
1867 email_text: &str,
1868 client: &SimpleAgentsClient,
1869) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
1870 let workflow_input = json!({ "email_text": email_text });
1871 run_workflow_yaml_with_client(workflow, &workflow_input, client).await
1872}
1873
1874pub async fn run_workflow_yaml_file_with_client_and_custom_worker(
1875 workflow_path: &Path,
1876 workflow_input: &Value,
1877 client: &SimpleAgentsClient,
1878 custom_worker: Option<&dyn YamlWorkflowCustomWorkerExecutor>,
1879) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
1880 let contents =
1881 std::fs::read_to_string(workflow_path).map_err(|source| YamlWorkflowRunError::Read {
1882 path: workflow_path.display().to_string(),
1883 source,
1884 })?;
1885
1886 let workflow: YamlWorkflow =
1887 serde_yaml::from_str(&contents).map_err(|source| YamlWorkflowRunError::Parse {
1888 path: workflow_path.display().to_string(),
1889 source,
1890 })?;
1891
1892 run_workflow_yaml_with_client_and_custom_worker(
1893 &workflow,
1894 workflow_input,
1895 client,
1896 custom_worker,
1897 )
1898 .await
1899}
1900
1901pub async fn run_email_workflow_yaml_file_with_client_and_custom_worker(
1902 workflow_path: &Path,
1903 email_text: &str,
1904 client: &SimpleAgentsClient,
1905 custom_worker: Option<&dyn YamlWorkflowCustomWorkerExecutor>,
1906) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
1907 let workflow_input = json!({ "email_text": email_text });
1908 run_workflow_yaml_file_with_client_and_custom_worker(
1909 workflow_path,
1910 &workflow_input,
1911 client,
1912 custom_worker,
1913 )
1914 .await
1915}
1916
1917pub async fn run_workflow_yaml_file_with_client_and_custom_worker_and_events(
1918 workflow_path: &Path,
1919 workflow_input: &Value,
1920 client: &SimpleAgentsClient,
1921 custom_worker: Option<&dyn YamlWorkflowCustomWorkerExecutor>,
1922 event_sink: Option<&dyn YamlWorkflowEventSink>,
1923) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
1924 run_workflow_yaml_file_with_client_and_custom_worker_and_events_and_options(
1925 workflow_path,
1926 workflow_input,
1927 client,
1928 custom_worker,
1929 event_sink,
1930 &YamlWorkflowRunOptions::default(),
1931 )
1932 .await
1933}
1934
1935pub async fn run_workflow_yaml_file_with_client_and_custom_worker_and_events_and_options(
1936 workflow_path: &Path,
1937 workflow_input: &Value,
1938 client: &SimpleAgentsClient,
1939 custom_worker: Option<&dyn YamlWorkflowCustomWorkerExecutor>,
1940 event_sink: Option<&dyn YamlWorkflowEventSink>,
1941 options: &YamlWorkflowRunOptions,
1942) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
1943 let contents =
1944 std::fs::read_to_string(workflow_path).map_err(|source| YamlWorkflowRunError::Read {
1945 path: workflow_path.display().to_string(),
1946 source,
1947 })?;
1948
1949 let workflow: YamlWorkflow =
1950 serde_yaml::from_str(&contents).map_err(|source| YamlWorkflowRunError::Parse {
1951 path: workflow_path.display().to_string(),
1952 source,
1953 })?;
1954
1955 run_workflow_yaml_with_client_and_custom_worker_and_events_and_options(
1956 &workflow,
1957 workflow_input,
1958 client,
1959 custom_worker,
1960 event_sink,
1961 options,
1962 )
1963 .await
1964}
1965
1966pub async fn run_email_workflow_yaml_file_with_client_and_custom_worker_and_events(
1967 workflow_path: &Path,
1968 email_text: &str,
1969 client: &SimpleAgentsClient,
1970 custom_worker: Option<&dyn YamlWorkflowCustomWorkerExecutor>,
1971 event_sink: Option<&dyn YamlWorkflowEventSink>,
1972) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
1973 let workflow_input = json!({ "email_text": email_text });
1974 run_workflow_yaml_file_with_client_and_custom_worker_and_events(
1975 workflow_path,
1976 &workflow_input,
1977 client,
1978 custom_worker,
1979 event_sink,
1980 )
1981 .await
1982}
1983
1984pub async fn run_workflow_yaml_with_client_and_custom_worker(
1985 workflow: &YamlWorkflow,
1986 workflow_input: &Value,
1987 client: &SimpleAgentsClient,
1988 custom_worker: Option<&dyn YamlWorkflowCustomWorkerExecutor>,
1989) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
1990 run_workflow_yaml_with_client_and_custom_worker_and_events_and_options(
1991 workflow,
1992 workflow_input,
1993 client,
1994 custom_worker,
1995 None,
1996 &YamlWorkflowRunOptions::default(),
1997 )
1998 .await
1999}
2000
2001pub async fn run_email_workflow_yaml_with_client_and_custom_worker(
2002 workflow: &YamlWorkflow,
2003 email_text: &str,
2004 client: &SimpleAgentsClient,
2005 custom_worker: Option<&dyn YamlWorkflowCustomWorkerExecutor>,
2006) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
2007 let workflow_input = json!({ "email_text": email_text });
2008 run_workflow_yaml_with_client_and_custom_worker(
2009 workflow,
2010 &workflow_input,
2011 client,
2012 custom_worker,
2013 )
2014 .await
2015}
2016
2017pub async fn run_workflow_yaml_with_client_and_custom_worker_and_events(
2018 workflow: &YamlWorkflow,
2019 workflow_input: &Value,
2020 client: &SimpleAgentsClient,
2021 custom_worker: Option<&dyn YamlWorkflowCustomWorkerExecutor>,
2022 event_sink: Option<&dyn YamlWorkflowEventSink>,
2023) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
2024 run_workflow_yaml_with_client_and_custom_worker_and_events_and_options(
2025 workflow,
2026 workflow_input,
2027 client,
2028 custom_worker,
2029 event_sink,
2030 &YamlWorkflowRunOptions::default(),
2031 )
2032 .await
2033}
2034
2035pub async fn run_workflow_yaml_with_client_and_custom_worker_and_events_and_options(
2036 workflow: &YamlWorkflow,
2037 workflow_input: &Value,
2038 client: &SimpleAgentsClient,
2039 custom_worker: Option<&dyn YamlWorkflowCustomWorkerExecutor>,
2040 event_sink: Option<&dyn YamlWorkflowEventSink>,
2041 options: &YamlWorkflowRunOptions,
2042) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
2043 struct BorrowedClientExecutor<'a> {
2044 client: &'a SimpleAgentsClient,
2045 custom_worker: Option<&'a dyn YamlWorkflowCustomWorkerExecutor>,
2046 run_options: YamlWorkflowRunOptions,
2047 }
2048
2049 #[async_trait]
2050 impl<'a> YamlWorkflowLlmExecutor for BorrowedClientExecutor<'a> {
2051 async fn complete_structured(
2052 &self,
2053 request: YamlLlmExecutionRequest,
2054 event_sink: Option<&dyn YamlWorkflowEventSink>,
2055 ) -> Result<YamlLlmExecutionResult, String> {
2056 let expects_object = schema_expects_object(&request.schema);
2057 let messages = if let Some(mut history) = request.messages.clone() {
2058 if request.append_prompt_as_user && !request.prompt.trim().is_empty() {
2059 history.push(Message::user(&request.prompt));
2060 }
2061 history
2062 } else {
2063 vec![
2064 Message::system("You execute workflow classification steps."),
2065 Message::user(&request.prompt),
2066 ]
2067 };
2068
2069 if !request.tools.is_empty() {
2070 let mut tool_traces: Vec<YamlToolCallTrace> = Vec::new();
2071 let mut conversation = messages;
2072 let mut usage_total: Option<YamlLlmTokenUsage> = None;
2073
2074 for roundtrip in 0..=request.max_tool_roundtrips {
2075 let mut builder = CompletionRequest::builder()
2076 .model(&request.model)
2077 .messages(conversation.clone())
2078 .tools(request.tools.iter().map(|t| t.definition.clone()).collect());
2079
2080 if request.heal && expects_object {
2081 builder = builder.json_schema("workflow_step", request.schema.clone());
2082 }
2083
2084 if let Some(choice) = request.tool_choice.clone() {
2085 builder = builder.tool_choice(choice);
2086 }
2087
2088 if request.stream {
2089 builder = builder.stream(true);
2090 }
2091
2092 let completion_request = builder
2093 .build()
2094 .map_err(|error| format!("failed to build completion request: {error}"))?;
2095
2096 let outcome = self
2097 .client
2098 .complete(&completion_request, CompletionOptions::default())
2099 .await
2100 .map_err(|error| error.to_string())?;
2101
2102 let mut streamed_tool_calls: Option<Vec<ToolCall>> = None;
2103 let mut streamed_content = String::new();
2104 let mut finish_reason = FinishReason::Stop;
2105
2106 match outcome {
2107 CompletionOutcome::Response(response) => {
2108 if let Some(usage) = usage_total.as_mut() {
2109 usage.prompt_tokens += response.usage.prompt_tokens;
2110 usage.completion_tokens += response.usage.completion_tokens;
2111 usage.total_tokens += response.usage.total_tokens;
2112 } else {
2113 usage_total = Some(YamlLlmTokenUsage {
2114 prompt_tokens: response.usage.prompt_tokens,
2115 completion_tokens: response.usage.completion_tokens,
2116 total_tokens: response.usage.total_tokens,
2117 thinking_tokens: None,
2118 });
2119 }
2120
2121 let choice = response
2122 .choices
2123 .first()
2124 .ok_or_else(|| "completion returned no choices".to_string())?;
2125 streamed_content = choice.message.content.clone();
2126 streamed_tool_calls = choice.message.tool_calls.clone();
2127 finish_reason = choice.finish_reason;
2128 }
2129 CompletionOutcome::HealedJson(healed) => {
2130 let response = healed.response;
2131 if let Some(usage) = usage_total.as_mut() {
2132 usage.prompt_tokens += response.usage.prompt_tokens;
2133 usage.completion_tokens += response.usage.completion_tokens;
2134 usage.total_tokens += response.usage.total_tokens;
2135 } else {
2136 usage_total = Some(YamlLlmTokenUsage {
2137 prompt_tokens: response.usage.prompt_tokens,
2138 completion_tokens: response.usage.completion_tokens,
2139 total_tokens: response.usage.total_tokens,
2140 thinking_tokens: None,
2141 });
2142 }
2143
2144 let choice = response
2145 .choices
2146 .first()
2147 .ok_or_else(|| "completion returned no choices".to_string())?;
2148 streamed_content = choice.message.content.clone();
2149 streamed_tool_calls = choice.message.tool_calls.clone();
2150 finish_reason = choice.finish_reason;
2151 }
2152 CompletionOutcome::CoercedSchema(coerced) => {
2153 let response = coerced.response;
2154 if let Some(usage) = usage_total.as_mut() {
2155 usage.prompt_tokens += response.usage.prompt_tokens;
2156 usage.completion_tokens += response.usage.completion_tokens;
2157 usage.total_tokens += response.usage.total_tokens;
2158 } else {
2159 usage_total = Some(YamlLlmTokenUsage {
2160 prompt_tokens: response.usage.prompt_tokens,
2161 completion_tokens: response.usage.completion_tokens,
2162 total_tokens: response.usage.total_tokens,
2163 thinking_tokens: None,
2164 });
2165 }
2166
2167 let choice = response
2168 .choices
2169 .first()
2170 .ok_or_else(|| "completion returned no choices".to_string())?;
2171 streamed_content = choice.message.content.clone();
2172 streamed_tool_calls = choice.message.tool_calls.clone();
2173 finish_reason = choice.finish_reason;
2174 }
2175 CompletionOutcome::Stream(mut stream) => {
2176 let mut final_stream_usage: Option<simple_agent_type::response::Usage> =
2177 None;
2178 let mut delta_filter = StructuredJsonDeltaFilter::default();
2179 let include_raw_debug = include_raw_stream_debug_events();
2180 let mut json_text_formatter = if request.stream_json_as_text {
2181 Some(StreamJsonAsTextFormatter::default())
2182 } else {
2183 None
2184 };
2185 let mut tool_calls_by_index: HashMap<u32, ToolCall> = HashMap::new();
2186
2187 while let Some(chunk_result) = stream.next().await {
2188 if event_sink_is_cancelled(event_sink) {
2189 return Err(workflow_event_sink_cancelled_message().to_string());
2190 }
2191
2192 let chunk = chunk_result.map_err(|error| error.to_string())?;
2193 if let Some(usage) = chunk.usage {
2194 final_stream_usage = Some(usage);
2195 }
2196
2197 if let Some(choice) = chunk.choices.first() {
2198 if let Some(chunk_finish_reason) = choice.finish_reason {
2199 finish_reason = chunk_finish_reason;
2200 }
2201
2202 if include_raw_debug {
2203 if let Some(reasoning_delta) =
2204 choice.delta.reasoning_content.as_ref()
2205 {
2206 if let Some(sink) = event_sink {
2207 sink.emit(&YamlWorkflowEvent {
2208 event_type: "node_stream_thinking_delta"
2209 .to_string(),
2210 node_id: Some(request.node_id.clone()),
2211 step_id: Some(request.node_id.clone()),
2212 node_kind: Some("llm_call".to_string()),
2213 streamable: Some(true),
2214 message: None,
2215 delta: Some(reasoning_delta.clone()),
2216 token_kind: Some(
2217 YamlWorkflowTokenKind::Thinking,
2218 ),
2219 is_terminal_node_token: Some(
2220 request.is_terminal_node,
2221 ),
2222 elapsed_ms: None,
2223 metadata: None,
2224 });
2225 }
2226 }
2227 }
2228
2229 if let Some(delta) = choice.delta.content.clone() {
2230 streamed_content.push_str(delta.as_str());
2231 let (output_delta, thinking_delta) = if expects_object {
2232 delta_filter.split(delta.as_str())
2233 } else {
2234 (Some(delta.clone()), None)
2235 };
2236 let rendered_output_delta = if let Some(output_chunk) =
2237 output_delta
2238 {
2239 if let Some(formatter) = json_text_formatter.as_mut() {
2240 formatter.push(output_chunk.as_str());
2241 formatter.emit_if_ready(delta_filter.completed())
2242 } else {
2243 Some(output_chunk)
2244 }
2245 } else {
2246 None
2247 };
2248
2249 if include_raw_debug {
2250 if let Some(sink) = event_sink {
2251 if let Some(raw_thinking_delta) =
2252 thinking_delta.as_ref()
2253 {
2254 sink.emit(&YamlWorkflowEvent {
2255 event_type: "node_stream_thinking_delta"
2256 .to_string(),
2257 node_id: Some(request.node_id.clone()),
2258 step_id: Some(request.node_id.clone()),
2259 node_kind: Some("llm_call".to_string()),
2260 streamable: Some(true),
2261 message: None,
2262 delta: Some(raw_thinking_delta.clone()),
2263 token_kind: Some(
2264 YamlWorkflowTokenKind::Thinking,
2265 ),
2266 is_terminal_node_token: Some(
2267 request.is_terminal_node,
2268 ),
2269 elapsed_ms: None,
2270 metadata: None,
2271 });
2272 }
2273 if let Some(raw_output_delta) =
2274 rendered_output_delta.as_ref()
2275 {
2276 sink.emit(&YamlWorkflowEvent {
2277 event_type: "node_stream_output_delta"
2278 .to_string(),
2279 node_id: Some(request.node_id.clone()),
2280 step_id: Some(request.node_id.clone()),
2281 node_kind: Some("llm_call".to_string()),
2282 streamable: Some(true),
2283 message: None,
2284 delta: Some(raw_output_delta.clone()),
2285 token_kind: Some(
2286 YamlWorkflowTokenKind::Output,
2287 ),
2288 is_terminal_node_token: Some(
2289 request.is_terminal_node,
2290 ),
2291 elapsed_ms: None,
2292 metadata: None,
2293 });
2294 }
2295 }
2296 }
2297
2298 if let Some(filtered_delta) = rendered_output_delta {
2299 if let Some(sink) = event_sink {
2300 sink.emit(&YamlWorkflowEvent {
2301 event_type: "node_stream_delta".to_string(),
2302 node_id: Some(request.node_id.clone()),
2303 step_id: Some(request.node_id.clone()),
2304 node_kind: Some("llm_call".to_string()),
2305 streamable: Some(true),
2306 message: None,
2307 delta: Some(filtered_delta),
2308 token_kind: Some(YamlWorkflowTokenKind::Output),
2309 is_terminal_node_token: Some(
2310 request.is_terminal_node,
2311 ),
2312 elapsed_ms: None,
2313 metadata: None,
2314 });
2315 }
2316 }
2317 }
2318
2319 if let Some(tool_call_deltas) = choice.delta.tool_calls.as_ref()
2320 {
2321 for tool_call_delta in tool_call_deltas {
2322 let entry = tool_calls_by_index
2323 .entry(tool_call_delta.index)
2324 .or_insert_with(|| ToolCall {
2325 id: tool_call_delta.id.clone().unwrap_or_else(
2326 || {
2327 format!(
2328 "tool_call_{}",
2329 tool_call_delta.index
2330 )
2331 },
2332 ),
2333 tool_type: ToolType::Function,
2334 function:
2335 simple_agent_type::tool::ToolCallFunction {
2336 name: String::new(),
2337 arguments: String::new(),
2338 },
2339 });
2340
2341 if let Some(id) = tool_call_delta.id.as_ref() {
2342 entry.id = id.clone();
2343 }
2344 if let Some(tool_type) = tool_call_delta.tool_type {
2345 entry.tool_type = tool_type;
2346 }
2347 if let Some(function_delta) =
2348 tool_call_delta.function.as_ref()
2349 {
2350 if let Some(name) = function_delta.name.as_ref() {
2351 entry.function.name = name.clone();
2352 }
2353 if let Some(arguments) =
2354 function_delta.arguments.as_ref()
2355 {
2356 entry.function.arguments.push_str(arguments);
2357 }
2358 }
2359 }
2360 }
2361 }
2362
2363 if event_sink_is_cancelled(event_sink) {
2364 return Err(workflow_event_sink_cancelled_message().to_string());
2365 }
2366 }
2367
2368 if let Some(usage) = final_stream_usage {
2369 if let Some(total) = usage_total.as_mut() {
2370 total.prompt_tokens += usage.prompt_tokens;
2371 total.completion_tokens += usage.completion_tokens;
2372 total.total_tokens += usage.total_tokens;
2373 } else {
2374 usage_total = Some(YamlLlmTokenUsage {
2375 prompt_tokens: usage.prompt_tokens,
2376 completion_tokens: usage.completion_tokens,
2377 total_tokens: usage.total_tokens,
2378 thinking_tokens: None,
2379 });
2380 }
2381 }
2382
2383 let mut ordered_tool_calls =
2384 tool_calls_by_index.into_iter().collect::<Vec<_>>();
2385 ordered_tool_calls.sort_by_key(|(index, _)| *index);
2386 if !ordered_tool_calls.is_empty() {
2387 streamed_tool_calls = Some(
2388 ordered_tool_calls
2389 .into_iter()
2390 .map(|(_, tool_call)| tool_call)
2391 .collect::<Vec<_>>(),
2392 );
2393 }
2394 }
2395 }
2396
2397 let has_tool_calls = streamed_tool_calls
2398 .as_ref()
2399 .is_some_and(|calls| !calls.is_empty());
2400 if finish_reason != FinishReason::ToolCalls && !has_tool_calls {
2401 let payload = if expects_object {
2402 parse_streamed_structured_payload(
2403 streamed_content.as_str(),
2404 request.heal,
2405 )
2406 .map_err(|error| {
2407 format!("failed to parse structured completion JSON: {error}")
2408 })?
2409 .payload
2410 } else {
2411 Value::String(streamed_content.clone())
2412 };
2413 return Ok(YamlLlmExecutionResult {
2414 payload,
2415 usage: usage_total,
2416 ttft_ms: None,
2417 tool_calls: tool_traces,
2418 });
2419 }
2420
2421 if roundtrip >= request.max_tool_roundtrips {
2422 return Err(format!(
2423 "tool call roundtrip limit reached for node '{}' (max={})",
2424 request.node_id, request.max_tool_roundtrips
2425 ));
2426 }
2427
2428 let tool_calls: Vec<ToolCall> = streamed_tool_calls.ok_or_else(|| {
2429 "finish_reason=tool_calls but no tool calls found".to_string()
2430 })?;
2431 if tool_calls
2432 .iter()
2433 .any(|tool_call| tool_call.function.name.trim().is_empty())
2434 {
2435 return Err("streamed tool call missing function name".to_string());
2436 }
2437
2438 let assistant_tool_message =
2439 Message::assistant(&streamed_content).with_tool_calls(tool_calls.clone());
2440 conversation.push(assistant_tool_message);
2441
2442 for tool_call in tool_calls {
2443 let tool_call_id = tool_call.id.clone();
2444 let tool_name = tool_call.function.name.clone();
2445 let tool_started = Instant::now();
2446 let arguments: Value = serde_json::from_str(&tool_call.function.arguments)
2447 .map_err(|error| {
2448 format!(
2449 "tool '{}' arguments must be valid JSON: {}",
2450 tool_name, error
2451 )
2452 })?;
2453 let mut tool_span_context: Option<TraceContext> = None;
2454 let mut tool_span = if request.trace_sampled {
2455 let (span_context, mut span) = workflow_tracer().start_span(
2456 "workflow.tool.execute",
2457 SpanKind::Node,
2458 request.trace_context.as_ref(),
2459 );
2460 tool_span_context = Some(span_context);
2461 span.set_attribute(
2462 "trace_id",
2463 request.trace_id.as_deref().unwrap_or_default(),
2464 );
2465 span.set_attribute("node_id", request.node_id.as_str());
2466 span.set_attribute("node_kind", "llm_call");
2467 span.set_attribute("tool_name", tool_name.as_str());
2468 span.set_attribute("tool_call_id", tool_call_id.as_str());
2469 let args_for_span =
2470 payload_for_tool_trace(request.tool_trace_mode, &arguments)
2471 .to_string();
2472 span.set_attribute("tool_arguments", args_for_span.as_str());
2473 Some(span)
2474 } else {
2475 None
2476 };
2477
2478 if request.tool_trace_mode != YamlToolTraceMode::Off {
2479 if let Some(sink) = event_sink {
2480 sink.emit(&YamlWorkflowEvent {
2481 event_type: "node_tool_call_requested".to_string(),
2482 node_id: Some(request.node_id.clone()),
2483 step_id: Some(request.node_id.clone()),
2484 node_kind: Some("llm_call".to_string()),
2485 streamable: Some(false),
2486 message: Some(format!(
2487 "tool call requested: {}",
2488 tool_name
2489 )),
2490 delta: None,
2491 token_kind: None,
2492 is_terminal_node_token: None,
2493 elapsed_ms: None,
2494 metadata: Some(json!({
2495 "tool_call_id": tool_call_id.clone(),
2496 "tool_name": tool_name.clone(),
2497 "arguments": payload_for_tool_trace(request.tool_trace_mode, &arguments),
2498 })),
2499 });
2500 }
2501 }
2502
2503 let Some(tool_config) = request
2504 .tools
2505 .iter()
2506 .find(|tool| tool.definition.function.name == tool_name)
2507 else {
2508 return Err(format!("model requested unknown tool '{}'", tool_name));
2509 };
2510
2511 let tool_output_result = if tool_name == "run_workflow_graph" {
2512 execute_subworkflow_tool_call(
2513 &arguments,
2514 &request.execution_context,
2515 self.client,
2516 self.custom_worker,
2517 &self.run_options,
2518 tool_span_context.as_ref(),
2519 request.trace_id.as_deref(),
2520 )
2521 .await
2522 } else if let Some(custom_worker) = self.custom_worker {
2523 custom_worker
2524 .execute(
2525 tool_name.as_str(),
2526 &arguments,
2527 request.email_text.as_str(),
2528 &request.execution_context,
2529 )
2530 .await
2531 } else {
2532 Err(format!(
2533 "tool '{}' requested but no custom worker executor is configured",
2534 tool_name
2535 ))
2536 };
2537
2538 let tool_output = match tool_output_result {
2539 Ok(output) => output,
2540 Err(message) => {
2541 let elapsed_ms = tool_started.elapsed().as_millis();
2542 if let Some(span) = tool_span.as_mut() {
2543 span.add_event("workflow.tool.execute.error");
2544 span.set_attribute("tool_status", "error");
2545 span.set_attribute("tool_error", message.as_str());
2546 span.set_attribute(
2547 "elapsed_ms",
2548 elapsed_ms.to_string().as_str(),
2549 );
2550 }
2551 if request.tool_trace_mode != YamlToolTraceMode::Off {
2552 if let Some(sink) = event_sink {
2553 sink.emit(&YamlWorkflowEvent {
2554 event_type: "node_tool_call_failed".to_string(),
2555 node_id: Some(request.node_id.clone()),
2556 step_id: Some(request.node_id.clone()),
2557 node_kind: Some("llm_call".to_string()),
2558 streamable: Some(false),
2559 message: Some(message.clone()),
2560 delta: None,
2561 token_kind: None,
2562 is_terminal_node_token: None,
2563 elapsed_ms: Some(elapsed_ms),
2564 metadata: Some(json!({
2565 "tool_call_id": tool_call_id.clone(),
2566 "tool_name": tool_name.clone(),
2567 })),
2568 });
2569 }
2570 }
2571 tool_traces.push(YamlToolCallTrace {
2572 id: tool_call_id.clone(),
2573 name: tool_name.clone(),
2574 arguments,
2575 output: None,
2576 status: "error".to_string(),
2577 elapsed_ms,
2578 error: Some(message.clone()),
2579 });
2580 if let Some(span) = tool_span.take() {
2581 span.end();
2582 }
2583 return Err(format!("tool '{}' failed: {}", tool_name, message));
2584 }
2585 };
2586
2587 if let Some(output_schema) = tool_config.output_schema.as_ref() {
2588 validate_schema_instance(output_schema, &tool_output).map_err(
2589 |message| {
2590 format!(
2591 "tool '{}' output failed schema validation: {}",
2592 tool_name, message
2593 )
2594 },
2595 )?;
2596 }
2597
2598 let elapsed_ms = tool_started.elapsed().as_millis();
2599 if let Some(span) = tool_span.as_mut() {
2600 span.add_event("workflow.tool.execute.completed");
2601 span.set_attribute("tool_status", "ok");
2602 span.set_attribute("elapsed_ms", elapsed_ms.to_string().as_str());
2603 let output_for_span =
2604 payload_for_tool_trace(request.tool_trace_mode, &tool_output)
2605 .to_string();
2606 span.set_attribute("tool_output", output_for_span.as_str());
2607 }
2608 if request.tool_trace_mode != YamlToolTraceMode::Off {
2609 if let Some(sink) = event_sink {
2610 sink.emit(&YamlWorkflowEvent {
2611 event_type: "node_tool_call_completed".to_string(),
2612 node_id: Some(request.node_id.clone()),
2613 step_id: Some(request.node_id.clone()),
2614 node_kind: Some("llm_call".to_string()),
2615 streamable: Some(false),
2616 message: Some(format!(
2617 "tool call completed: {}",
2618 tool_name
2619 )),
2620 delta: None,
2621 token_kind: None,
2622 is_terminal_node_token: None,
2623 elapsed_ms: Some(elapsed_ms),
2624 metadata: Some(json!({
2625 "tool_call_id": tool_call_id.clone(),
2626 "tool_name": tool_name.clone(),
2627 "arguments": payload_for_tool_trace(request.tool_trace_mode, &arguments),
2628 "output": payload_for_tool_trace(request.tool_trace_mode, &tool_output),
2629 })),
2630 });
2631 }
2632 }
2633
2634 tool_traces.push(YamlToolCallTrace {
2635 id: tool_call_id.clone(),
2636 name: tool_name.clone(),
2637 arguments: arguments.clone(),
2638 output: Some(tool_output.clone()),
2639 status: "ok".to_string(),
2640 elapsed_ms,
2641 error: None,
2642 });
2643
2644 conversation.push(Message::tool(
2645 serde_json::to_string(&tool_output).map_err(|error| {
2646 format!("failed to serialize tool output: {error}")
2647 })?,
2648 tool_call_id,
2649 ));
2650 if let Some(span) = tool_span.take() {
2651 span.end();
2652 }
2653 }
2654
2655 if request.tool_trace_mode != YamlToolTraceMode::Off {
2656 if let Some(sink) = event_sink {
2657 sink.emit(&YamlWorkflowEvent {
2658 event_type: "node_tool_roundtrip_completed".to_string(),
2659 node_id: Some(request.node_id.clone()),
2660 step_id: Some(request.node_id.clone()),
2661 node_kind: Some("llm_call".to_string()),
2662 streamable: Some(false),
2663 message: Some(format!(
2664 "tool roundtrip {} completed",
2665 roundtrip + 1
2666 )),
2667 delta: None,
2668 token_kind: None,
2669 is_terminal_node_token: None,
2670 elapsed_ms: None,
2671 metadata: Some(json!({
2672 "roundtrip": roundtrip + 1,
2673 "max_tool_roundtrips": request.max_tool_roundtrips,
2674 })),
2675 });
2676 }
2677 }
2678 }
2679
2680 return Err(format!(
2681 "tool-enabled llm_call '{}' exhausted loop without final payload",
2682 request.node_id
2683 ));
2684 }
2685
2686 let mut builder = CompletionRequest::builder()
2687 .model(&request.model)
2688 .messages(messages);
2689
2690 if request.heal && !request.stream && expects_object {
2691 builder = builder.json_schema("workflow_step", request.schema.clone());
2692 }
2693
2694 if request.stream {
2695 builder = builder.stream(true);
2696 }
2697
2698 let completion_request = builder
2699 .build()
2700 .map_err(|error| format!("failed to build completion request: {error}"))?;
2701
2702 let completion_options = if request.heal && !request.stream && expects_object {
2703 CompletionOptions {
2704 mode: CompletionMode::HealedJson,
2705 }
2706 } else {
2707 CompletionOptions::default()
2708 };
2709
2710 let outcome = self
2711 .client
2712 .complete(&completion_request, completion_options)
2713 .await
2714 .map_err(|error| error.to_string())?;
2715
2716 match outcome {
2717 CompletionOutcome::Stream(mut stream) => {
2718 let mut aggregated = String::new();
2719 let mut final_stream_usage: Option<simple_agent_type::response::Usage> = None;
2720 let stream_started = Instant::now();
2721 let mut ttft_ms: Option<u128> = None;
2722 let mut delta_filter = StructuredJsonDeltaFilter::default();
2723 let include_raw_debug = include_raw_stream_debug_events();
2724 let mut json_text_formatter = if request.stream_json_as_text {
2725 Some(StreamJsonAsTextFormatter::default())
2726 } else {
2727 None
2728 };
2729 while let Some(chunk_result) = stream.next().await {
2730 if event_sink_is_cancelled(event_sink) {
2731 return Err(workflow_event_sink_cancelled_message().to_string());
2732 }
2733 let chunk = chunk_result.map_err(|error| error.to_string())?;
2734 if let Some(usage) = chunk.usage {
2735 final_stream_usage = Some(usage);
2736 }
2737 if let Some(choice) = chunk.choices.first() {
2738 if ttft_ms.is_none()
2739 && (choice
2740 .delta
2741 .content
2742 .as_ref()
2743 .is_some_and(|delta| !delta.is_empty())
2744 || choice
2745 .delta
2746 .reasoning_content
2747 .as_ref()
2748 .is_some_and(|delta| !delta.is_empty()))
2749 {
2750 ttft_ms = Some(stream_started.elapsed().as_millis());
2751 }
2752 if include_raw_debug {
2753 if let Some(reasoning_delta) =
2754 choice.delta.reasoning_content.as_ref()
2755 {
2756 if let Some(sink) = event_sink {
2757 sink.emit(&YamlWorkflowEvent {
2758 event_type: "node_stream_thinking_delta".to_string(),
2759 node_id: Some(request.node_id.clone()),
2760 step_id: Some(request.node_id.clone()),
2761 node_kind: Some("llm_call".to_string()),
2762 streamable: Some(true),
2763 message: None,
2764 delta: Some(reasoning_delta.clone()),
2765 token_kind: Some(YamlWorkflowTokenKind::Thinking),
2766 is_terminal_node_token: Some(request.is_terminal_node),
2767 elapsed_ms: None,
2768 metadata: None,
2769 });
2770 }
2771 }
2772 }
2773 if let Some(delta) = choice.delta.content.clone() {
2774 aggregated.push_str(delta.as_str());
2775 let (output_delta, thinking_delta) = if expects_object {
2776 delta_filter.split(delta.as_str())
2777 } else {
2778 (Some(delta.clone()), None)
2779 };
2780 let rendered_output_delta = if let Some(output_chunk) = output_delta
2781 {
2782 if let Some(formatter) = json_text_formatter.as_mut() {
2783 formatter.push(output_chunk.as_str());
2784 formatter.emit_if_ready(delta_filter.completed())
2785 } else {
2786 Some(output_chunk)
2787 }
2788 } else {
2789 None
2790 };
2791 if include_raw_debug {
2792 if let Some(sink) = event_sink {
2793 if let Some(raw_thinking_delta) = thinking_delta.as_ref() {
2794 sink.emit(&YamlWorkflowEvent {
2795 event_type: "node_stream_thinking_delta"
2796 .to_string(),
2797 node_id: Some(request.node_id.clone()),
2798 step_id: Some(request.node_id.clone()),
2799 node_kind: Some("llm_call".to_string()),
2800 streamable: Some(true),
2801 message: None,
2802 delta: Some(raw_thinking_delta.clone()),
2803 token_kind: Some(YamlWorkflowTokenKind::Thinking),
2804 is_terminal_node_token: Some(
2805 request.is_terminal_node,
2806 ),
2807 elapsed_ms: None,
2808 metadata: None,
2809 });
2810 }
2811 if let Some(raw_output_delta) =
2812 rendered_output_delta.as_ref()
2813 {
2814 sink.emit(&YamlWorkflowEvent {
2815 event_type: "node_stream_output_delta".to_string(),
2816 node_id: Some(request.node_id.clone()),
2817 step_id: Some(request.node_id.clone()),
2818 node_kind: Some("llm_call".to_string()),
2819 streamable: Some(true),
2820 message: None,
2821 delta: Some(raw_output_delta.clone()),
2822 token_kind: Some(YamlWorkflowTokenKind::Output),
2823 is_terminal_node_token: Some(
2824 request.is_terminal_node,
2825 ),
2826 elapsed_ms: None,
2827 metadata: None,
2828 });
2829 }
2830 }
2831 }
2832 if let Some(filtered_delta) = rendered_output_delta {
2833 if let Some(sink) = event_sink {
2834 sink.emit(&YamlWorkflowEvent {
2835 event_type: "node_stream_delta".to_string(),
2836 node_id: Some(request.node_id.clone()),
2837 step_id: Some(request.node_id.clone()),
2838 node_kind: Some("llm_call".to_string()),
2839 streamable: Some(true),
2840 message: None,
2841 delta: Some(filtered_delta),
2842 token_kind: Some(YamlWorkflowTokenKind::Output),
2843 is_terminal_node_token: Some(request.is_terminal_node),
2844 elapsed_ms: None,
2845 metadata: None,
2846 });
2847 }
2848 }
2849 }
2850 }
2851
2852 if event_sink_is_cancelled(event_sink) {
2853 return Err(workflow_event_sink_cancelled_message().to_string());
2854 }
2855 }
2856
2857 let payload = if expects_object {
2858 let resolved =
2859 parse_streamed_structured_payload(aggregated.as_str(), request.heal)?;
2860 if let Some(confidence) = resolved.heal_confidence {
2861 if let Some(sink) = event_sink {
2862 sink.emit(&YamlWorkflowEvent {
2863 event_type: "node_healed".to_string(),
2864 node_id: Some(request.node_id.clone()),
2865 step_id: Some(request.node_id.clone()),
2866 node_kind: Some("llm_call".to_string()),
2867 streamable: Some(true),
2868 message: Some(format!(
2869 "healed streamed structured response confidence={confidence}"
2870 )),
2871 delta: None,
2872 token_kind: None,
2873 is_terminal_node_token: None,
2874 elapsed_ms: None,
2875 metadata: None,
2876 });
2877 }
2878 }
2879 resolved.payload
2880 } else {
2881 Value::String(aggregated)
2882 };
2883
2884 Ok(YamlLlmExecutionResult {
2885 payload,
2886 usage: final_stream_usage.map(|usage| YamlLlmTokenUsage {
2887 prompt_tokens: usage.prompt_tokens,
2888 completion_tokens: usage.completion_tokens,
2889 total_tokens: usage.total_tokens,
2890 thinking_tokens: None,
2891 }),
2892 ttft_ms,
2893 tool_calls: Vec::new(),
2894 })
2895 }
2896 CompletionOutcome::Response(response) => {
2897 let payload = if expects_object {
2898 let content = response
2899 .content()
2900 .ok_or_else(|| "completion returned empty content".to_string())?;
2901 serde_json::from_str(content).map_err(|error| {
2902 format!("failed to parse structured completion JSON: {error}")
2903 })?
2904 } else {
2905 Value::String(response.content().unwrap_or_default().to_string())
2906 };
2907
2908 Ok(YamlLlmExecutionResult {
2909 payload,
2910 usage: Some(YamlLlmTokenUsage {
2911 prompt_tokens: response.usage.prompt_tokens,
2912 completion_tokens: response.usage.completion_tokens,
2913 total_tokens: response.usage.total_tokens,
2914 thinking_tokens: None,
2915 }),
2916 ttft_ms: None,
2917 tool_calls: Vec::new(),
2918 })
2919 }
2920 CompletionOutcome::HealedJson(healed) => {
2921 if !expects_object {
2922 return Err(
2923 "healed json outcome is unsupported for non-object schema".to_string()
2924 );
2925 }
2926 if let Some(sink) = event_sink {
2927 sink.emit(&YamlWorkflowEvent {
2928 event_type: "node_healed".to_string(),
2929 node_id: Some(request.node_id.clone()),
2930 step_id: Some(request.node_id.clone()),
2931 node_kind: Some("llm_call".to_string()),
2932 streamable: Some(request.stream),
2933 message: Some(format!(
2934 "healed structured response confidence={}",
2935 healed.parsed.confidence
2936 )),
2937 delta: None,
2938 token_kind: None,
2939 is_terminal_node_token: None,
2940 elapsed_ms: None,
2941 metadata: None,
2942 });
2943 }
2944 Ok(YamlLlmExecutionResult {
2945 payload: healed.parsed.value,
2946 usage: Some(YamlLlmTokenUsage {
2947 prompt_tokens: healed.response.usage.prompt_tokens,
2948 completion_tokens: healed.response.usage.completion_tokens,
2949 total_tokens: healed.response.usage.total_tokens,
2950 thinking_tokens: None,
2951 }),
2952 ttft_ms: None,
2953 tool_calls: Vec::new(),
2954 })
2955 }
2956 CompletionOutcome::CoercedSchema(coerced) => {
2957 if !expects_object {
2958 return Err(
2959 "coerced schema outcome is unsupported for non-object schema"
2960 .to_string(),
2961 );
2962 }
2963 Ok(YamlLlmExecutionResult {
2964 payload: coerced.coerced.value,
2965 usage: Some(YamlLlmTokenUsage {
2966 prompt_tokens: coerced.response.usage.prompt_tokens,
2967 completion_tokens: coerced.response.usage.completion_tokens,
2968 total_tokens: coerced.response.usage.total_tokens,
2969 thinking_tokens: None,
2970 }),
2971 ttft_ms: None,
2972 tool_calls: Vec::new(),
2973 })
2974 }
2975 }
2976 }
2977 }
2978
2979 let executor = BorrowedClientExecutor {
2980 client,
2981 custom_worker,
2982 run_options: options.clone(),
2983 };
2984 run_workflow_yaml_with_custom_worker_and_events_and_options(
2985 workflow,
2986 workflow_input,
2987 &executor,
2988 custom_worker,
2989 event_sink,
2990 options,
2991 )
2992 .await
2993}
2994
2995pub async fn run_email_workflow_yaml_with_client_and_custom_worker_and_events(
2996 workflow: &YamlWorkflow,
2997 email_text: &str,
2998 client: &SimpleAgentsClient,
2999 custom_worker: Option<&dyn YamlWorkflowCustomWorkerExecutor>,
3000 event_sink: Option<&dyn YamlWorkflowEventSink>,
3001) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
3002 let workflow_input = json!({ "email_text": email_text });
3003 run_workflow_yaml_with_client_and_custom_worker_and_events(
3004 workflow,
3005 &workflow_input,
3006 client,
3007 custom_worker,
3008 event_sink,
3009 )
3010 .await
3011}
3012
3013pub async fn run_workflow_yaml(
3014 workflow: &YamlWorkflow,
3015 workflow_input: &Value,
3016 executor: &dyn YamlWorkflowLlmExecutor,
3017) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
3018 run_workflow_yaml_with_custom_worker_and_events(workflow, workflow_input, executor, None, None)
3019 .await
3020}
3021
3022pub async fn run_email_workflow_yaml(
3023 workflow: &YamlWorkflow,
3024 email_text: &str,
3025 executor: &dyn YamlWorkflowLlmExecutor,
3026) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
3027 let workflow_input = json!({ "email_text": email_text });
3028 run_workflow_yaml(workflow, &workflow_input, executor).await
3029}
3030
3031pub async fn run_workflow_yaml_with_custom_worker(
3032 workflow: &YamlWorkflow,
3033 workflow_input: &Value,
3034 executor: &dyn YamlWorkflowLlmExecutor,
3035 custom_worker: Option<&dyn YamlWorkflowCustomWorkerExecutor>,
3036) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
3037 run_workflow_yaml_with_custom_worker_and_events(
3038 workflow,
3039 workflow_input,
3040 executor,
3041 custom_worker,
3042 None,
3043 )
3044 .await
3045}
3046
3047pub async fn run_email_workflow_yaml_with_custom_worker(
3048 workflow: &YamlWorkflow,
3049 email_text: &str,
3050 executor: &dyn YamlWorkflowLlmExecutor,
3051 custom_worker: Option<&dyn YamlWorkflowCustomWorkerExecutor>,
3052) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
3053 let workflow_input = json!({ "email_text": email_text });
3054 run_workflow_yaml_with_custom_worker(workflow, &workflow_input, executor, custom_worker).await
3055}
3056
3057pub async fn run_workflow_yaml_with_custom_worker_and_events(
3058 workflow: &YamlWorkflow,
3059 workflow_input: &Value,
3060 executor: &dyn YamlWorkflowLlmExecutor,
3061 custom_worker: Option<&dyn YamlWorkflowCustomWorkerExecutor>,
3062 event_sink: Option<&dyn YamlWorkflowEventSink>,
3063) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
3064 run_workflow_yaml_with_custom_worker_and_events_and_options(
3065 workflow,
3066 workflow_input,
3067 executor,
3068 custom_worker,
3069 event_sink,
3070 &YamlWorkflowRunOptions::default(),
3071 )
3072 .await
3073}
3074
3075pub async fn run_workflow_yaml_with_custom_worker_and_events_and_options(
3076 workflow: &YamlWorkflow,
3077 workflow_input: &Value,
3078 executor: &dyn YamlWorkflowLlmExecutor,
3079 custom_worker: Option<&dyn YamlWorkflowCustomWorkerExecutor>,
3080 event_sink: Option<&dyn YamlWorkflowEventSink>,
3081 options: &YamlWorkflowRunOptions,
3082) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
3083 if !workflow_input.is_object() {
3084 return Err(YamlWorkflowRunError::InvalidInput {
3085 message: "workflow input must be a JSON object".to_string(),
3086 });
3087 }
3088
3089 validate_sample_rate(options.telemetry.sample_rate)?;
3090
3091 let email_text = workflow_input
3092 .get("email_text")
3093 .and_then(Value::as_str)
3094 .unwrap_or_default();
3095
3096 let diagnostics = verify_yaml_workflow(workflow);
3097 let errors: Vec<YamlWorkflowDiagnostic> = diagnostics
3098 .iter()
3099 .filter(|d| d.severity == YamlWorkflowDiagnosticSeverity::Error)
3100 .cloned()
3101 .collect();
3102 if !errors.is_empty() {
3103 return Err(YamlWorkflowRunError::Validation {
3104 diagnostics_count: errors.len(),
3105 diagnostics: errors,
3106 });
3107 }
3108
3109 if let Some(output) =
3110 try_run_yaml_via_ir_runtime(workflow, workflow_input, executor, custom_worker, options)
3111 .await?
3112 {
3113 return Ok(output);
3114 }
3115
3116 let parent_trace_context = trace_context_from_options(options);
3117 let telemetry_context = resolve_telemetry_context(options, parent_trace_context.as_ref());
3118
3119 let tracer = workflow_tracer();
3120 let mut workflow_span_context: Option<TraceContext> = None;
3121 let mut workflow_span = if telemetry_context.sampled {
3122 let (span_context, mut span) = tracer.start_span(
3123 "workflow.run",
3124 SpanKind::Workflow,
3125 parent_trace_context.as_ref(),
3126 );
3127 if let Some(trace_id_value) = telemetry_context.trace_id.as_deref() {
3128 span.set_attribute("trace_id", trace_id_value);
3129 }
3130 apply_trace_tenant_attributes(span.as_mut(), options);
3131 workflow_span_context = Some(span_context);
3132 Some(span)
3133 } else {
3134 None
3135 };
3136
3137 if workflow.nodes.is_empty() {
3138 return Err(YamlWorkflowRunError::EmptyNodes {
3139 workflow_id: workflow.id.clone(),
3140 });
3141 }
3142
3143 let node_map: HashMap<&str, &YamlNode> = workflow
3144 .nodes
3145 .iter()
3146 .map(|node| (node.id.as_str(), node))
3147 .collect();
3148 if !node_map.contains_key(workflow.entry_node.as_str()) {
3149 return Err(YamlWorkflowRunError::MissingEntry {
3150 entry_node: workflow.entry_node.clone(),
3151 });
3152 }
3153
3154 let edge_map: HashMap<&str, &str> = workflow
3155 .edges
3156 .iter()
3157 .map(|edge| (edge.from.as_str(), edge.to.as_str()))
3158 .collect();
3159
3160 let mut current = workflow.entry_node.clone();
3161 let mut trace = Vec::new();
3162 let mut outputs: BTreeMap<String, Value> = BTreeMap::new();
3163 let mut globals = serde_json::Map::new();
3164 let mut step_timings = Vec::new();
3165 let mut llm_node_metrics: BTreeMap<String, YamlLlmNodeMetrics> = BTreeMap::new();
3166 let mut llm_node_models: BTreeMap<String, String> = BTreeMap::new();
3167 let mut token_totals = YamlTokenTotals::default();
3168 let mut workflow_ttft_ms: Option<u128> = None;
3169 let started = Instant::now();
3170
3171 if let Some(sink) = event_sink {
3172 sink.emit(&YamlWorkflowEvent {
3173 event_type: "workflow_started".to_string(),
3174 node_id: None,
3175 step_id: None,
3176 node_kind: None,
3177 streamable: None,
3178 message: Some(format!("workflow_id={}", workflow.id)),
3179 delta: None,
3180 token_kind: None,
3181 is_terminal_node_token: None,
3182 elapsed_ms: Some(0),
3183 metadata: None,
3184 });
3185 }
3186
3187 if event_sink_is_cancelled(event_sink) {
3188 return Err(YamlWorkflowRunError::EventSinkCancelled {
3189 message: workflow_event_sink_cancelled_message().to_string(),
3190 });
3191 }
3192
3193 loop {
3194 if event_sink_is_cancelled(event_sink) {
3195 return Err(YamlWorkflowRunError::EventSinkCancelled {
3196 message: workflow_event_sink_cancelled_message().to_string(),
3197 });
3198 }
3199
3200 let node =
3201 *node_map
3202 .get(current.as_str())
3203 .ok_or_else(|| YamlWorkflowRunError::MissingNode {
3204 node_id: current.clone(),
3205 })?;
3206
3207 trace.push(node.id.clone());
3208 let step_started = Instant::now();
3209
3210 let mut node_span_context: Option<TraceContext> = None;
3211 let mut node_span = if telemetry_context.sampled {
3212 let (span_context, mut span) = tracer.start_span(
3213 "workflow.node.execute",
3214 SpanKind::Node,
3215 workflow_span_context.as_ref(),
3216 );
3217 node_span_context = Some(span_context);
3218 span.set_attribute(
3219 "trace_id",
3220 telemetry_context.trace_id.as_deref().unwrap_or_default(),
3221 );
3222 span.set_attribute("node_id", node.id.as_str());
3223 span.set_attribute("node_kind", node.kind_name());
3224 Some(span)
3225 } else {
3226 None
3227 };
3228
3229 let node_streamable = node
3230 .node_type
3231 .llm_call
3232 .as_ref()
3233 .map(|llm| llm.stream.unwrap_or(false) && !llm.heal.unwrap_or(false));
3234 let workflow_elapsed_before_node_ms = started.elapsed().as_millis();
3235
3236 if let Some(sink) = event_sink {
3237 sink.emit(&YamlWorkflowEvent {
3238 event_type: "node_started".to_string(),
3239 node_id: Some(node.id.clone()),
3240 step_id: Some(node.id.clone()),
3241 node_kind: Some(node.kind_name().to_string()),
3242 streamable: node_streamable,
3243 message: if node_streamable == Some(false) {
3244 Some("Node is not streamable; status events only".to_string())
3245 } else {
3246 None
3247 },
3248 delta: None,
3249 token_kind: None,
3250 is_terminal_node_token: None,
3251 elapsed_ms: Some(workflow_elapsed_before_node_ms),
3252 metadata: None,
3253 });
3254 }
3255
3256 if event_sink_is_cancelled(event_sink) {
3257 return Err(YamlWorkflowRunError::EventSinkCancelled {
3258 message: workflow_event_sink_cancelled_message().to_string(),
3259 });
3260 }
3261
3262 let mut node_usage: Option<YamlLlmTokenUsage> = None;
3263 let is_terminal_node = !edge_map.contains_key(node.id.as_str());
3264 let next = if let Some(llm) = &node.node_type.llm_call {
3265 let prompt_template = node
3266 .config
3267 .as_ref()
3268 .and_then(|cfg| cfg.prompt.as_deref())
3269 .unwrap_or_default();
3270 let context = json!({
3271 "input": workflow_input,
3272 "nodes": outputs,
3273 "globals": Value::Object(globals.clone())
3274 });
3275 let messages = if let Some(path) = llm.messages_path.as_deref() {
3276 Some(
3277 parse_messages_from_context(path, &context).map_err(|message| {
3278 YamlWorkflowRunError::Llm {
3279 node_id: node.id.clone(),
3280 message,
3281 }
3282 })?,
3283 )
3284 } else {
3285 None
3286 };
3287 let prompt_bindings = collect_template_bindings(prompt_template, &context);
3288 let prompt = interpolate_template(prompt_template, &context);
3289 let schema = llm_output_schema_for_node(node);
3290
3291 let request = YamlLlmExecutionRequest {
3292 node_id: node.id.clone(),
3293 is_terminal_node,
3294 stream_json_as_text: llm.stream_json_as_text.unwrap_or(false),
3295 model: resolve_requested_model(options.model.as_deref(), &llm.model),
3296 messages,
3297 append_prompt_as_user: llm.append_prompt_as_user.unwrap_or(true),
3298 prompt,
3299 prompt_template: prompt_template.to_string(),
3300 prompt_bindings,
3301 schema,
3302 stream: llm.stream.unwrap_or(false),
3303 heal: llm.heal.unwrap_or(false),
3304 tools: normalize_llm_tools(llm).map_err(|message| YamlWorkflowRunError::Llm {
3305 node_id: node.id.clone(),
3306 message,
3307 })?,
3308 tool_choice: normalize_tool_choice(llm.tool_choice.clone()).map_err(|message| {
3309 YamlWorkflowRunError::Llm {
3310 node_id: node.id.clone(),
3311 message,
3312 }
3313 })?,
3314 max_tool_roundtrips: llm.max_tool_roundtrips.unwrap_or(1),
3315 tool_calls_global_key: llm.tool_calls_global_key.clone(),
3316 tool_trace_mode: options.telemetry.tool_trace_mode,
3317 execution_context: context.clone(),
3318 email_text: email_text.to_string(),
3319 trace_id: telemetry_context.trace_id.clone(),
3320 trace_context: node_span_context.clone(),
3321 trace_sampled: telemetry_context.sampled,
3322 };
3323
3324 if let Some(span) = node_span.as_mut() {
3325 span.set_attribute(
3326 "node_input",
3327 payload_for_span(options.telemetry.payload_mode, &context).as_str(),
3328 );
3329 }
3330
3331 if let Some(sink) = event_sink {
3332 sink.emit(&YamlWorkflowEvent {
3333 event_type: "node_llm_input_resolved".to_string(),
3334 node_id: Some(node.id.clone()),
3335 step_id: Some(node.id.clone()),
3336 node_kind: Some("llm_call".to_string()),
3337 streamable: Some(request.stream),
3338 message: Some("resolved llm input for telemetry".to_string()),
3339 delta: None,
3340 token_kind: None,
3341 is_terminal_node_token: None,
3342 elapsed_ms: Some(started.elapsed().as_millis()),
3343 metadata: Some(json!({
3344 "model": request.model.clone(),
3345 "stream_requested": request.stream,
3346 "stream_json_as_text": request.stream_json_as_text,
3347 "heal_requested": request.heal,
3348 "effective_stream": request.stream,
3349 "prompt_template": request.prompt_template.clone(),
3350 "prompt": request.prompt.clone(),
3351 "schema": request.schema.clone(),
3352 "bindings": request.prompt_bindings.clone(),
3353 "tools_count": request.tools.len(),
3354 "max_tool_roundtrips": request.max_tool_roundtrips,
3355 })),
3356 });
3357 }
3358
3359 llm_node_models.insert(node.id.clone(), request.model.clone());
3360
3361 if event_sink_is_cancelled(event_sink) {
3362 return Err(YamlWorkflowRunError::EventSinkCancelled {
3363 message: workflow_event_sink_cancelled_message().to_string(),
3364 });
3365 }
3366
3367 let llm_result = executor
3368 .complete_structured(request, event_sink)
3369 .await
3370 .map_err(|message| YamlWorkflowRunError::Llm {
3371 node_id: node.id.clone(),
3372 message,
3373 })?;
3374
3375 if let Some(usage) = llm_result.usage.as_ref() {
3376 token_totals.add_usage(usage);
3377 }
3378 if workflow_ttft_ms.is_none() {
3379 workflow_ttft_ms = llm_result
3380 .ttft_ms
3381 .map(|node_ttft_ms| workflow_elapsed_before_node_ms + node_ttft_ms);
3382 }
3383 node_usage = llm_result.usage;
3384
3385 let payload = llm_result.payload;
3386 let tool_calls = llm_result.tool_calls;
3387
3388 let mut node_output = json!({ "output": payload });
3389 if !tool_calls.is_empty() {
3390 if let Some(output_obj) = node_output.as_object_mut() {
3391 output_obj.insert("tool_calls".to_string(), json!(tool_calls));
3392 }
3393 }
3394 outputs.insert(node.id.clone(), node_output);
3395 if let Some(span) = node_span.as_mut() {
3396 if let Some(output_payload) = outputs.get(node.id.as_str()) {
3397 span.set_attribute(
3398 "node_output",
3399 payload_for_span(options.telemetry.payload_mode, output_payload).as_str(),
3400 );
3401 }
3402 }
3403 apply_set_globals(node, &outputs, workflow_input, &mut globals);
3404 apply_update_globals(node, &outputs, workflow_input, &mut globals);
3405 if let Some(global_key) = llm.tool_calls_global_key.as_ref() {
3406 if let Some(node_tool_calls) = outputs
3407 .get(node.id.as_str())
3408 .and_then(|value| value.get("tool_calls"))
3409 .cloned()
3410 {
3411 globals.insert(global_key.clone(), node_tool_calls);
3412 }
3413 }
3414 edge_map
3415 .get(node.id.as_str())
3416 .map(|value| value.to_string())
3417 } else if let Some(switch) = &node.node_type.switch {
3418 let context = json!({
3419 "input": workflow_input,
3420 "nodes": outputs,
3421 "globals": Value::Object(globals.clone())
3422 });
3423 let mut chosen = Some(switch.default.clone());
3424 for branch in &switch.branches {
3425 if evaluate_switch_condition(branch.condition.as_str(), &context)? {
3426 chosen = Some(branch.target.clone());
3427 break;
3428 }
3429 }
3430 let chosen = chosen.ok_or_else(|| YamlWorkflowRunError::InvalidSwitchTarget {
3431 node_id: node.id.clone(),
3432 })?;
3433 Some(chosen)
3434 } else if let Some(custom) = &node.node_type.custom_worker {
3435 let payload = node
3436 .config
3437 .as_ref()
3438 .and_then(|cfg| cfg.payload.as_ref())
3439 .cloned()
3440 .unwrap_or_else(|| json!({}));
3441 let context = json!({
3442 "input": workflow_input,
3443 "nodes": outputs,
3444 "globals": Value::Object(globals.clone())
3445 });
3446
3447 if let Some(span) = node_span.as_mut() {
3448 span.set_attribute("handler_name", custom.handler.as_str());
3449 span.set_attribute(
3450 "node_input",
3451 payload_for_span(options.telemetry.payload_mode, &payload).as_str(),
3452 );
3453 }
3454
3455 let mut handler_span_context: Option<TraceContext> = None;
3456 let mut handler_span = if telemetry_context.sampled {
3457 let (span_context, mut span) = tracer.start_span(
3458 "handler.invoke",
3459 SpanKind::Node,
3460 workflow_span_context.as_ref(),
3461 );
3462 handler_span_context = Some(span_context);
3463 span.set_attribute(
3464 "trace_id",
3465 telemetry_context.trace_id.as_deref().unwrap_or_default(),
3466 );
3467 span.set_attribute("handler_name", custom.handler.as_str());
3468 apply_trace_tenant_attributes(span.as_mut(), options);
3469 Some(span)
3470 } else {
3471 None
3472 };
3473
3474 let worker_trace_context = merged_trace_context_for_worker(
3475 handler_span_context.as_ref(),
3476 telemetry_context.trace_id.as_deref(),
3477 options,
3478 );
3479 let worker_context = custom_worker_context_with_trace(
3480 &context,
3481 &worker_trace_context,
3482 &options.trace.tenant,
3483 );
3484
3485 let worker_output_result = if let Some(custom_worker_executor) = custom_worker {
3486 custom_worker_executor
3487 .execute(
3488 custom.handler.as_str(),
3489 &payload,
3490 email_text,
3491 &worker_context,
3492 )
3493 .await
3494 .map_err(|message| YamlWorkflowRunError::CustomWorker {
3495 node_id: node.id.clone(),
3496 message,
3497 })
3498 } else {
3499 mock_custom_worker_output(custom.handler.as_str(), &payload)
3500 };
3501
3502 if let Some(span) = handler_span.take() {
3503 span.end();
3504 }
3505
3506 let worker_output = worker_output_result?;
3507
3508 outputs.insert(node.id.clone(), json!({ "output": worker_output }));
3509 if let Some(span) = node_span.as_mut() {
3510 if let Some(output_payload) = outputs.get(node.id.as_str()) {
3511 span.set_attribute(
3512 "node_output",
3513 payload_for_span(options.telemetry.payload_mode, output_payload).as_str(),
3514 );
3515 }
3516 }
3517 apply_set_globals(node, &outputs, workflow_input, &mut globals);
3518 apply_update_globals(node, &outputs, workflow_input, &mut globals);
3519 edge_map
3520 .get(node.id.as_str())
3521 .map(|value| value.to_string())
3522 } else {
3523 return Err(YamlWorkflowRunError::UnsupportedNodeType {
3524 node_id: node.id.clone(),
3525 });
3526 };
3527
3528 let node_kind = node.kind_name().to_string();
3529 let elapsed_ms = step_started.elapsed().as_millis();
3530 step_timings.push(YamlStepTiming {
3531 node_id: node.id.clone(),
3532 node_kind,
3533 elapsed_ms,
3534 prompt_tokens: node_usage.as_ref().map(|usage| usage.prompt_tokens),
3535 completion_tokens: node_usage.as_ref().map(|usage| usage.completion_tokens),
3536 total_tokens: node_usage.as_ref().map(|usage| usage.total_tokens),
3537 thinking_tokens: node_usage.as_ref().and_then(|usage| usage.thinking_tokens),
3538 tokens_per_second: node_usage
3539 .as_ref()
3540 .map(|usage| completion_tokens_per_second(usage.completion_tokens, elapsed_ms)),
3541 });
3542
3543 if let Some(usage) = node_usage.as_ref() {
3544 llm_node_metrics.insert(
3545 node.id.clone(),
3546 YamlLlmNodeMetrics {
3547 elapsed_ms,
3548 prompt_tokens: usage.prompt_tokens,
3549 completion_tokens: usage.completion_tokens,
3550 total_tokens: usage.total_tokens,
3551 thinking_tokens: usage.thinking_tokens,
3552 tokens_per_second: completion_tokens_per_second(
3553 usage.completion_tokens,
3554 elapsed_ms,
3555 ),
3556 },
3557 );
3558 }
3559
3560 if let Some(mut span) = node_span.take() {
3561 span.set_attribute("elapsed_ms", elapsed_ms.to_string().as_str());
3562 span.add_event("node_completed");
3563 span.end();
3564 }
3565
3566 if let Some(sink) = event_sink {
3567 sink.emit(&YamlWorkflowEvent {
3568 event_type: "node_completed".to_string(),
3569 node_id: Some(node.id.clone()),
3570 step_id: Some(node.id.clone()),
3571 node_kind: Some(node.kind_name().to_string()),
3572 streamable: node_streamable,
3573 message: None,
3574 delta: None,
3575 token_kind: None,
3576 is_terminal_node_token: None,
3577 elapsed_ms: Some(elapsed_ms),
3578 metadata: None,
3579 });
3580 }
3581
3582 if event_sink_is_cancelled(event_sink) {
3583 return Err(YamlWorkflowRunError::EventSinkCancelled {
3584 message: workflow_event_sink_cancelled_message().to_string(),
3585 });
3586 }
3587
3588 if let Some(next) = next {
3589 current = next;
3590 continue;
3591 }
3592 break;
3593 }
3594
3595 let terminal_node = trace
3596 .last()
3597 .cloned()
3598 .ok_or_else(|| YamlWorkflowRunError::EmptyNodes {
3599 workflow_id: workflow.id.clone(),
3600 })?;
3601
3602 let terminal_output = outputs
3603 .get(terminal_node.as_str())
3604 .and_then(|value| value.get("output"))
3605 .cloned();
3606
3607 let total_elapsed_ms = started.elapsed().as_millis();
3608 let output = YamlWorkflowRunOutput {
3609 workflow_id: workflow.id.clone(),
3610 entry_node: workflow.entry_node.clone(),
3611 email_text: email_text.to_string(),
3612 trace,
3613 outputs,
3614 terminal_node,
3615 terminal_output,
3616 step_timings,
3617 llm_node_metrics,
3618 llm_node_models,
3619 total_elapsed_ms,
3620 ttft_ms: workflow_ttft_ms,
3621 total_input_tokens: token_totals.input_tokens,
3622 total_output_tokens: token_totals.output_tokens,
3623 total_tokens: token_totals.total_tokens,
3624 total_thinking_tokens: token_totals.thinking_tokens,
3625 tokens_per_second: token_totals.tokens_per_second(total_elapsed_ms),
3626 trace_id: telemetry_context.trace_id.clone(),
3627 metadata: telemetry_context.trace_id.as_ref().map(|value| {
3628 workflow_metadata_with_trace(
3629 options,
3630 value,
3631 telemetry_context.sampled,
3632 telemetry_context.trace_id_source,
3633 )
3634 }),
3635 };
3636
3637 if let Some(sink) = event_sink {
3638 let event_metadata = if options.telemetry.nerdstats {
3639 Some(json!({
3640 "nerdstats": workflow_nerdstats(&output),
3641 }))
3642 } else {
3643 None
3644 };
3645 sink.emit(&YamlWorkflowEvent {
3646 event_type: "workflow_completed".to_string(),
3647 node_id: None,
3648 step_id: None,
3649 node_kind: None,
3650 streamable: None,
3651 message: Some(format!("terminal_node={}", output.terminal_node)),
3652 delta: None,
3653 token_kind: None,
3654 is_terminal_node_token: None,
3655 elapsed_ms: Some(output.total_elapsed_ms),
3656 metadata: event_metadata,
3657 });
3658 }
3659
3660 if event_sink_is_cancelled(event_sink) {
3661 return Err(YamlWorkflowRunError::EventSinkCancelled {
3662 message: workflow_event_sink_cancelled_message().to_string(),
3663 });
3664 }
3665
3666 if let Some(mut span) = workflow_span.take() {
3667 span.set_attribute("workflow_id", workflow.id.as_str());
3668 if let Some(trace_id_value) = telemetry_context.trace_id.as_ref() {
3669 span.set_attribute("trace_id", trace_id_value.as_str());
3670 }
3671 span.end();
3672 flush_workflow_tracer();
3673 }
3674
3675 Ok(output)
3676}
3677
/// Attempts to execute a YAML workflow by lowering it to the workflow IR and
/// running it on [`WorkflowRuntime`] instead of the interpreter loop.
///
/// Returns `Ok(None)` — meaning "fall back to the YAML interpreter" — when the
/// workflow cannot be lowered (`UnsupportedNode` / `MultipleOutgoingEdge`) or
/// when the runtime rejects the lowered IR with a validation error. Any other
/// lowering or runtime failure is surfaced as a hard error.
///
/// On this path every LLM node is executed through the `YAML_LLM_TOOL_ID`
/// pseudo-tool and every custom-worker node through its handler name, both
/// bridged by the local `YamlIrToolExecutor`. Per-node wall-clock timings are
/// not measured here: `elapsed_ms` is reported as `0` for each step and
/// `ttft_ms` is always `None`; only the workflow-level total is timed.
async fn try_run_yaml_via_ir_runtime(
    workflow: &YamlWorkflow,
    workflow_input: &Value,
    executor: &dyn YamlWorkflowLlmExecutor,
    custom_worker: Option<&dyn YamlWorkflowCustomWorkerExecutor>,
    options: &YamlWorkflowRunOptions,
) -> Result<Option<YamlWorkflowRunOutput>, YamlWorkflowRunError> {
    // Lowering failures that merely mean "this workflow shape is not
    // representable in the IR" signal fallback; anything else is a real error.
    let ir = match yaml_workflow_to_ir(workflow) {
        Ok(def) => def,
        Err(YamlToIrError::UnsupportedNode { .. })
        | Err(YamlToIrError::MultipleOutgoingEdge { .. }) => return Ok(None),
        Err(err) => {
            return Err(YamlWorkflowRunError::InvalidInput {
                message: err.to_string(),
            });
        }
    };

    let parent_trace_context = trace_context_from_options(options);
    let telemetry_context = resolve_telemetry_context(options, parent_trace_context.as_ref());

    // Open the root workflow span only when this run is sampled.
    let tracer = workflow_tracer();
    let mut workflow_span_context: Option<TraceContext> = None;
    let mut workflow_span = if telemetry_context.sampled {
        let (span_context, mut span) = tracer.start_span(
            "workflow.run",
            SpanKind::Workflow,
            parent_trace_context.as_ref(),
        );
        if let Some(trace_id_value) = telemetry_context.trace_id.as_deref() {
            span.set_attribute("trace_id", trace_id_value);
        }
        apply_trace_tenant_attributes(span.as_mut(), options);
        workflow_span_context = Some(span_context);
        Some(span)
    } else {
        None
    };

    // The IR runtime requires an `LlmExecutor`, but on this path all LLM work
    // is routed through the tool executor below, so this should never be
    // invoked; if it is, fail loudly with a recognizable marker.
    struct NoopLlm;
    #[async_trait]
    impl LlmExecutor for NoopLlm {
        async fn execute(
            &self,
            _input: LlmExecutionInput,
        ) -> Result<LlmExecutionOutput, LlmExecutionError> {
            Err(LlmExecutionError::UnexpectedOutcome(
                "yaml_ir_uses_tool_path",
            ))
        }
    }

    // Bridges IR tool executions back onto the YAML LLM executor and custom
    // worker, while accumulating token/model metrics. The Mutexes exist
    // because `execute_tool` only receives `&self` yet must record results
    // across multiple invocations.
    struct YamlIrToolExecutor<'a> {
        llm_executor: &'a dyn YamlWorkflowLlmExecutor,
        custom_worker: Option<&'a dyn YamlWorkflowCustomWorkerExecutor>,
        token_totals: std::sync::Mutex<YamlTokenTotals>,
        node_usage: std::sync::Mutex<BTreeMap<String, YamlLlmTokenUsage>>,
        node_models: std::sync::Mutex<BTreeMap<String, String>>,
        model_override: Option<String>,
        trace_id: Option<String>,
        trace_context: Option<TraceContext>,
        trace_input_context: Option<YamlWorkflowTraceContextInput>,
        tenant_context: YamlWorkflowTraceTenantContext,
        payload_mode: YamlWorkflowPayloadMode,
        trace_sampled: bool,
    }

    #[async_trait]
    impl ToolExecutor for YamlIrToolExecutor<'_> {
        async fn execute_tool(
            &self,
            input: ToolExecutionInput,
        ) -> Result<Value, ToolExecutionError> {
            // Rebuild the YAML-style `{input, nodes, globals}` context from
            // the runtime's scoped input.
            let context = build_yaml_context_from_ir_scope(&input.scoped_input);

            // LLM pseudo-tool path: unpack the request fields the lowering
            // packed into the tool input and call the YAML LLM executor.
            if input.tool == YAML_LLM_TOOL_ID {
                let node_id = input
                    .input
                    .get("node_id")
                    .and_then(Value::as_str)
                    .ok_or_else(|| {
                        ToolExecutionError::Failed("yaml llm call missing node_id".to_string())
                    })?
                    .to_string();
                let node_id_for_metrics = node_id.clone();
                let model = input
                    .input
                    .get("model")
                    .and_then(Value::as_str)
                    .ok_or_else(|| {
                        ToolExecutionError::Failed("yaml llm call missing model".to_string())
                    })?
                    .to_string();
                // A run-level model override takes precedence over the node's model.
                let resolved_model =
                    resolve_requested_model(self.model_override.as_deref(), &model);
                let prompt_template = input
                    .input
                    .get("prompt_template")
                    .and_then(Value::as_str)
                    .unwrap_or_default()
                    .to_string();
                let stream = input
                    .input
                    .get("stream")
                    .and_then(Value::as_bool)
                    .unwrap_or(false);
                let heal = input
                    .input
                    .get("heal")
                    .and_then(Value::as_bool)
                    .unwrap_or(false);
                let append_prompt_as_user = input
                    .input
                    .get("append_prompt_as_user")
                    .and_then(Value::as_bool)
                    .unwrap_or(true);
                let messages_path = input
                    .input
                    .get("messages_path")
                    .and_then(Value::as_str)
                    .map(str::to_string);

                // Optional explicit message list resolved out of the context.
                let messages = if let Some(path) = messages_path.as_deref() {
                    Some(
                        parse_messages_from_context(path, &context)
                            .map_err(ToolExecutionError::Failed)?,
                    )
                } else {
                    None
                };

                let prompt_bindings = collect_template_bindings(&prompt_template, &context);
                let prompt = interpolate_template(&prompt_template, &context);
                let email_text = context
                    .get("input")
                    .and_then(|v| v.get("email_text"))
                    .and_then(Value::as_str)
                    .unwrap_or_default();
                let schema = input
                    .input
                    .get("output_schema")
                    .cloned()
                    .unwrap_or_else(default_llm_output_schema);

                // NOTE: tools/tool-choice are intentionally disabled on the IR
                // path (empty tools, single roundtrip, no tool-call globals).
                let request = YamlLlmExecutionRequest {
                    node_id,
                    is_terminal_node: false,
                    stream_json_as_text: input
                        .input
                        .get("stream_json_as_text")
                        .and_then(Value::as_bool)
                        .unwrap_or(false),
                    model: resolved_model.clone(),
                    messages,
                    append_prompt_as_user,
                    prompt,
                    prompt_template,
                    prompt_bindings,
                    schema,
                    stream,
                    heal,
                    tools: Vec::new(),
                    tool_choice: None,
                    max_tool_roundtrips: 1,
                    tool_calls_global_key: None,
                    tool_trace_mode: YamlToolTraceMode::Off,
                    execution_context: context.clone(),
                    email_text: email_text.to_string(),
                    trace_id: self.trace_id.clone(),
                    trace_context: self.trace_context.clone(),
                    trace_sampled: self.trace_sampled,
                };

                let llm_result = self
                    .llm_executor
                    .complete_structured(request, None)
                    .await
                    .map_err(ToolExecutionError::Failed);

                // Record usage/model metrics on success; a poisoned lock is
                // silently skipped (metrics are best-effort).
                if let Ok(ref result) = llm_result {
                    if let Some(usage) = result.usage.as_ref() {
                        if let Ok(mut totals) = self.token_totals.lock() {
                            totals.add_usage(usage);
                        }
                        if let Ok(mut usage_map) = self.node_usage.lock() {
                            usage_map.insert(node_id_for_metrics.clone(), usage.clone());
                        }
                    }
                    if let Ok(mut model_map) = self.node_models.lock() {
                        model_map.insert(node_id_for_metrics, resolved_model);
                    }
                }

                return llm_result.map(|result| result.payload);
            }

            // Custom-worker path: the tool name is the handler name. Without a
            // configured worker the tool is reported as not found.
            let worker = self
                .custom_worker
                .ok_or_else(|| ToolExecutionError::NotFound {
                    tool: input.tool.clone(),
                })?;

            let payload = input.input.clone();
            let email_text = context
                .get("input")
                .and_then(|v| v.get("email_text"))
                .and_then(Value::as_str)
                .unwrap_or_default();

            // Per-handler span with tenant attributes, only when sampled.
            let tracer = workflow_tracer();
            let mut handler_span_context: Option<TraceContext> = None;
            let mut handler_span = if self.trace_sampled {
                let (span_context, mut span) = tracer.start_span(
                    "handler.invoke",
                    SpanKind::Node,
                    self.trace_context.as_ref(),
                );
                handler_span_context = Some(span_context);
                if let Some(trace_id) = self.trace_id.as_ref() {
                    span.set_attribute("trace_id", trace_id.as_str());
                }
                span.set_attribute("handler_name", input.tool.as_str());
                if let Some(workspace_id) = self.tenant_context.workspace_id.as_deref() {
                    span.set_attribute("tenant.workspace_id", workspace_id);
                }
                if let Some(user_id) = self.tenant_context.user_id.as_deref() {
                    span.set_attribute("tenant.user_id", user_id);
                }
                if let Some(conversation_id) = self.tenant_context.conversation_id.as_deref() {
                    span.set_attribute("tenant.conversation_id", conversation_id);
                }
                if let Some(request_id) = self.tenant_context.request_id.as_deref() {
                    span.set_attribute("tenant.request_id", request_id);
                }
                if let Some(run_id) = self.tenant_context.run_id.as_deref() {
                    span.set_attribute("tenant.run_id", run_id);
                }
                span.set_attribute(
                    "node_input",
                    payload_for_span(self.payload_mode, &payload).as_str(),
                );
                Some(span)
            } else {
                None
            };

            // Reconstitute run options (trace/tenant only) so the worker sees
            // the same trace context shape the YAML interpreter path builds.
            let trace_options = YamlWorkflowRunOptions {
                telemetry: YamlWorkflowTelemetryConfig::default(),
                trace: YamlWorkflowTraceOptions {
                    context: self.trace_input_context.clone(),
                    tenant: self.tenant_context.clone(),
                },
                model: None,
            };
            let worker_trace_context = merged_trace_context_for_worker(
                handler_span_context.as_ref(),
                self.trace_id.as_deref(),
                &trace_options,
            );
            let worker_context = custom_worker_context_with_trace(
                &context,
                &worker_trace_context,
                &self.tenant_context,
            );

            let output_result = worker
                .execute(&input.tool, &payload, email_text, &worker_context)
                .await
                .map_err(ToolExecutionError::Failed);

            // Mark success/failure on the span before ending it.
            if let Some(span) = handler_span.as_mut() {
                if output_result.is_ok() {
                    span.add_event("handler.success");
                } else {
                    span.add_event("handler.error");
                }
            }

            if let Some(span) = handler_span.take() {
                span.end();
            }

            output_result
        }
    }

    let tool_executor = YamlIrToolExecutor {
        llm_executor: executor,
        custom_worker,
        token_totals: std::sync::Mutex::new(YamlTokenTotals::default()),
        node_usage: std::sync::Mutex::new(BTreeMap::new()),
        node_models: std::sync::Mutex::new(BTreeMap::new()),
        model_override: options.model.clone(),
        trace_id: telemetry_context.trace_id.clone(),
        trace_context: workflow_span_context.clone(),
        trace_input_context: options.trace.context.clone(),
        tenant_context: options.trace.tenant.clone(),
        payload_mode: options.telemetry.payload_mode,
        trace_sampled: telemetry_context.sampled,
    };

    let runtime = WorkflowRuntime::new(
        ir,
        &NoopLlm,
        Some(&tool_executor),
        WorkflowRuntimeOptions::default(),
    );

    let started = Instant::now();
    // A validation error means the IR was not acceptable to the runtime:
    // fall back to the interpreter rather than failing the run.
    let result = match runtime.execute(workflow_input.clone(), None).await {
        Ok(result) => result,
        Err(WorkflowRuntimeError::Validation(_)) => return Ok(None),
        Err(error) => {
            return Err(YamlWorkflowRunError::IrRuntime {
                message: error.to_string(),
            });
        }
    };
    let total_elapsed_ms = started.elapsed().as_millis();

    // Re-wrap each node output as `{"output": ...}` to match the YAML path,
    // dropping the synthetic start node injected during lowering.
    let mut outputs: BTreeMap<String, Value> = BTreeMap::new();
    for (node_id, output) in result.node_outputs {
        if node_id == YAML_START_NODE_ID {
            continue;
        }
        outputs.insert(node_id, json!({"output": output}));
    }

    let mut trace = Vec::new();
    let mut step_timings = Vec::new();
    // Snapshot collected metrics; a poisoned lock degrades to empty maps.
    let node_usage_map = tool_executor
        .node_usage
        .lock()
        .map(|usage| usage.clone())
        .unwrap_or_default();
    let llm_node_models = tool_executor
        .node_models
        .lock()
        .map(|models| models.clone())
        .unwrap_or_default();
    let mut llm_node_metrics: BTreeMap<String, YamlLlmNodeMetrics> = BTreeMap::new();
    for execution in result.node_executions {
        if execution.node_id == YAML_START_NODE_ID {
            continue;
        }
        trace.push(execution.node_id.clone());
        let usage = node_usage_map.get(&execution.node_id);
        if let Some(usage) = usage {
            // Per-node elapsed time is not tracked on the IR path (0 ms), so
            // tokens_per_second is computed against a zero duration.
            llm_node_metrics.insert(
                execution.node_id.clone(),
                YamlLlmNodeMetrics {
                    elapsed_ms: 0,
                    prompt_tokens: usage.prompt_tokens,
                    completion_tokens: usage.completion_tokens,
                    total_tokens: usage.total_tokens,
                    thinking_tokens: usage.thinking_tokens,
                    tokens_per_second: completion_tokens_per_second(usage.completion_tokens, 0),
                },
            );
        }
        step_timings.push(YamlStepTiming {
            node_id: execution.node_id,
            node_kind: "ir_runtime".to_string(),
            elapsed_ms: 0,
            prompt_tokens: usage.map(|value| value.prompt_tokens),
            completion_tokens: usage.map(|value| value.completion_tokens),
            total_tokens: usage.map(|value| value.total_tokens),
            thinking_tokens: usage.and_then(|value| value.thinking_tokens),
            tokens_per_second: usage
                .map(|value| completion_tokens_per_second(value.completion_tokens, 0)),
        });
    }

    let terminal_node = result.terminal_node_id;
    let terminal_output = outputs
        .get(&terminal_node)
        .and_then(|v| v.get("output"))
        .cloned();

    // Echo the email text back out of the workflow input for the output record.
    let email_text = workflow_input
        .get("email_text")
        .and_then(Value::as_str)
        .unwrap_or_default()
        .to_string();

    let token_totals = tool_executor
        .token_totals
        .lock()
        .map(|totals| totals.clone())
        .unwrap_or_default();

    // Close the workflow span (if sampled) and flush buffered spans.
    if let Some(mut span) = workflow_span.take() {
        span.set_attribute("workflow_id", workflow.id.as_str());
        if let Some(trace_id_value) = telemetry_context.trace_id.as_ref() {
            span.set_attribute("trace_id", trace_id_value.as_str());
        }
        span.end();
        flush_workflow_tracer();
    }

    Ok(Some(YamlWorkflowRunOutput {
        workflow_id: workflow.id.clone(),
        entry_node: workflow.entry_node.clone(),
        email_text,
        trace,
        outputs,
        terminal_node,
        terminal_output,
        step_timings,
        llm_node_metrics,
        llm_node_models,
        total_elapsed_ms,
        ttft_ms: None,
        total_input_tokens: token_totals.input_tokens,
        total_output_tokens: token_totals.output_tokens,
        total_tokens: token_totals.total_tokens,
        total_thinking_tokens: token_totals.thinking_tokens,
        tokens_per_second: token_totals.tokens_per_second(total_elapsed_ms),
        trace_id: telemetry_context.trace_id.clone(),
        metadata: telemetry_context.trace_id.as_ref().map(|value| {
            workflow_metadata_with_trace(
                options,
                value,
                telemetry_context.sampled,
                telemetry_context.trace_id_source,
            )
        }),
    }))
}
4107
4108fn build_yaml_context_from_ir_scope(scoped_input: &Value) -> Value {
4109 let input = scoped_input.get("input").cloned().unwrap_or(Value::Null);
4110
4111 let mut nodes = serde_json::Map::new();
4112 if let Some(node_outputs) = scoped_input.get("node_outputs").and_then(Value::as_object) {
4113 for (node_id, output) in node_outputs {
4114 nodes.insert(node_id.clone(), json!({"output": output.clone()}));
4115 }
4116 }
4117
4118 json!({
4119 "input": input,
4120 "nodes": Value::Object(nodes),
4121 "globals": Value::Object(serde_json::Map::new())
4122 })
4123}
4124
4125pub async fn run_email_workflow_yaml_with_custom_worker_and_events(
4126 workflow: &YamlWorkflow,
4127 email_text: &str,
4128 executor: &dyn YamlWorkflowLlmExecutor,
4129 custom_worker: Option<&dyn YamlWorkflowCustomWorkerExecutor>,
4130 event_sink: Option<&dyn YamlWorkflowEventSink>,
4131) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
4132 let workflow_input = json!({ "email_text": email_text });
4133 run_workflow_yaml_with_custom_worker_and_events(
4134 workflow,
4135 &workflow_input,
4136 executor,
4137 custom_worker,
4138 event_sink,
4139 )
4140 .await
4141}
4142
4143fn evaluate_switch_condition(
4144 condition: &str,
4145 context: &Value,
4146) -> Result<bool, YamlWorkflowRunError> {
4147 let (left, right) =
4148 condition
4149 .split_once("==")
4150 .ok_or_else(|| YamlWorkflowRunError::UnsupportedCondition {
4151 condition: condition.to_string(),
4152 })?;
4153
4154 let left_path = left.trim().trim_start_matches("$.");
4155 let right_literal = right.trim().trim_matches('"').trim_matches('\'');
4156 let left_value = resolve_path(context, left_path);
4157 Ok(left_value
4158 .and_then(Value::as_str)
4159 .map(|value| value == right_literal)
4160 .unwrap_or(false))
4161}
4162
4163fn parse_messages_from_context(path: &str, context: &Value) -> Result<Vec<Message>, String> {
4164 let normalized_path = path.trim().trim_start_matches("$.");
4165 let value = resolve_path(context, normalized_path)
4166 .ok_or_else(|| format!("messages_path not found: {path}"))?;
4167 let list: Vec<WorkflowMessage> = serde_json::from_value(value.clone()).map_err(|err| {
4168 format!("messages_path must resolve to a list of messages: {path}; {err}")
4169 })?;
4170 if list.is_empty() {
4171 return Err(format!(
4172 "messages_path must not resolve to an empty list: {path}"
4173 ));
4174 }
4175
4176 let mut messages = Vec::with_capacity(list.len());
4177 for (index, item) in list.into_iter().enumerate() {
4178 let mut message = match item.role {
4179 Role::System => Message::system(item.content),
4180 Role::User => Message::user(item.content),
4181 Role::Assistant => Message::assistant(item.content),
4182 Role::Tool => {
4183 let tool_call_id = item
4184 .tool_call_id
4185 .ok_or_else(|| format!("tool message at index {index} missing tool_call_id"))?;
4186 Message::tool(item.content, tool_call_id)
4187 }
4188 };
4189
4190 if let Some(name) = item.name {
4191 message = message.with_name(name);
4192 }
4193
4194 messages.push(message);
4195 }
4196
4197 Ok(messages)
4198}
4199
4200pub fn verify_yaml_workflow(workflow: &YamlWorkflow) -> Vec<YamlWorkflowDiagnostic> {
4201 let mut diagnostics = Vec::new();
4202 let known_ids: HashMap<&str, &YamlNode> = workflow
4203 .nodes
4204 .iter()
4205 .map(|node| (node.id.as_str(), node))
4206 .collect();
4207
4208 if !known_ids.contains_key(workflow.entry_node.as_str()) {
4209 diagnostics.push(YamlWorkflowDiagnostic {
4210 node_id: None,
4211 code: "missing_entry".to_string(),
4212 severity: YamlWorkflowDiagnosticSeverity::Error,
4213 message: format!("entry node '{}' does not exist", workflow.entry_node),
4214 });
4215 }
4216
4217 for edge in &workflow.edges {
4218 if !known_ids.contains_key(edge.from.as_str()) {
4219 diagnostics.push(YamlWorkflowDiagnostic {
4220 node_id: Some(edge.from.clone()),
4221 code: "unknown_edge_from".to_string(),
4222 severity: YamlWorkflowDiagnosticSeverity::Error,
4223 message: format!("edge.from '{}' does not exist", edge.from),
4224 });
4225 }
4226 if !known_ids.contains_key(edge.to.as_str()) {
4227 diagnostics.push(YamlWorkflowDiagnostic {
4228 node_id: Some(edge.to.clone()),
4229 code: "unknown_edge_to".to_string(),
4230 severity: YamlWorkflowDiagnosticSeverity::Error,
4231 message: format!("edge.to '{}' does not exist", edge.to),
4232 });
4233 }
4234 }
4235
4236 for node in &workflow.nodes {
4237 if let Some(llm) = &node.node_type.llm_call {
4238 if llm.model.trim().is_empty() {
4239 diagnostics.push(YamlWorkflowDiagnostic {
4240 node_id: Some(node.id.clone()),
4241 code: "empty_model".to_string(),
4242 severity: YamlWorkflowDiagnosticSeverity::Error,
4243 message: "llm_call.model must not be empty".to_string(),
4244 });
4245 }
4246 if llm.stream.unwrap_or(false) && llm.heal.unwrap_or(false) {
4247 diagnostics.push(YamlWorkflowDiagnostic {
4248 node_id: Some(node.id.clone()),
4249 code: "stream_heal_conflict".to_string(),
4250 severity: YamlWorkflowDiagnosticSeverity::Warning,
4251 message:
4252 "llm_call.stream=true with heal=true is not streamable; runtime will disable streaming"
4253 .to_string(),
4254 });
4255 }
4256
4257 if llm.max_tool_roundtrips.unwrap_or(1) == 0 {
4258 diagnostics.push(YamlWorkflowDiagnostic {
4259 node_id: Some(node.id.clone()),
4260 code: "invalid_max_tool_roundtrips".to_string(),
4261 severity: YamlWorkflowDiagnosticSeverity::Error,
4262 message: "llm_call.max_tool_roundtrips must be >= 1".to_string(),
4263 });
4264 }
4265
4266 if let Some(global_key) = llm.tool_calls_global_key.as_ref() {
4267 if global_key.trim().is_empty() {
4268 diagnostics.push(YamlWorkflowDiagnostic {
4269 node_id: Some(node.id.clone()),
4270 code: "empty_tool_calls_global_key".to_string(),
4271 severity: YamlWorkflowDiagnosticSeverity::Error,
4272 message: "llm_call.tool_calls_global_key must not be empty".to_string(),
4273 });
4274 }
4275 }
4276
4277 match normalize_tool_choice(llm.tool_choice.clone()) {
4278 Ok(choice) => {
4279 if let Some(ToolChoice::Tool(choice_tool)) = choice.as_ref() {
4280 if !llm.tools.iter().any(|tool| match (llm.tools_format, tool) {
4281 (YamlToolFormat::Openai, YamlToolDeclaration::OpenAi(openai)) => {
4282 openai.function.name == choice_tool.function.name
4283 }
4284 (
4285 YamlToolFormat::Simplified,
4286 YamlToolDeclaration::Simplified(simple),
4287 ) => simple.name == choice_tool.function.name,
4288 _ => false,
4289 }) {
4290 diagnostics.push(YamlWorkflowDiagnostic {
4291 node_id: Some(node.id.clone()),
4292 code: "unknown_tool_choice_function".to_string(),
4293 severity: YamlWorkflowDiagnosticSeverity::Error,
4294 message: format!(
4295 "llm_call.tool_choice references unknown function '{}'",
4296 choice_tool.function.name
4297 ),
4298 });
4299 }
4300 }
4301 }
4302 Err(message) => {
4303 diagnostics.push(YamlWorkflowDiagnostic {
4304 node_id: Some(node.id.clone()),
4305 code: "invalid_tool_choice".to_string(),
4306 severity: YamlWorkflowDiagnosticSeverity::Error,
4307 message,
4308 });
4309 }
4310 }
4311
4312 let normalized_tools = match normalize_llm_tools(llm) {
4313 Ok(tools) => tools,
4314 Err(message) => {
4315 diagnostics.push(YamlWorkflowDiagnostic {
4316 node_id: Some(node.id.clone()),
4317 code: "invalid_tools_format".to_string(),
4318 severity: YamlWorkflowDiagnosticSeverity::Error,
4319 message,
4320 });
4321 Vec::new()
4322 }
4323 };
4324
4325 let mut seen_tool_names = HashSet::new();
4326 for tool in &normalized_tools {
4327 let name = tool.definition.function.name.trim();
4328 if name.is_empty() {
4329 diagnostics.push(YamlWorkflowDiagnostic {
4330 node_id: Some(node.id.clone()),
4331 code: "empty_tool_name".to_string(),
4332 severity: YamlWorkflowDiagnosticSeverity::Error,
4333 message: "tool function name must not be empty".to_string(),
4334 });
4335 }
4336 if !seen_tool_names.insert(tool.definition.function.name.clone()) {
4337 diagnostics.push(YamlWorkflowDiagnostic {
4338 node_id: Some(node.id.clone()),
4339 code: "duplicate_tool_name".to_string(),
4340 severity: YamlWorkflowDiagnosticSeverity::Error,
4341 message: format!(
4342 "duplicate tool function name '{}' in node",
4343 tool.definition.function.name
4344 ),
4345 });
4346 }
4347
4348 let schema = tool
4349 .definition
4350 .function
4351 .parameters
4352 .clone()
4353 .unwrap_or(Value::Null);
4354 if schema.is_null() {
4355 diagnostics.push(YamlWorkflowDiagnostic {
4356 node_id: Some(node.id.clone()),
4357 code: "missing_tool_input_schema".to_string(),
4358 severity: YamlWorkflowDiagnosticSeverity::Error,
4359 message: format!(
4360 "tool '{}' is missing input schema",
4361 tool.definition.function.name
4362 ),
4363 });
4364 } else if let Err(message) = validate_json_schema(&schema) {
4365 diagnostics.push(YamlWorkflowDiagnostic {
4366 node_id: Some(node.id.clone()),
4367 code: "invalid_tool_input_schema".to_string(),
4368 severity: YamlWorkflowDiagnosticSeverity::Error,
4369 message: format!(
4370 "tool '{}' has invalid input schema: {}",
4371 tool.definition.function.name, message
4372 ),
4373 });
4374 }
4375
4376 if let Some(output_schema) = tool.output_schema.as_ref() {
4377 if let Err(message) = validate_json_schema(output_schema) {
4378 diagnostics.push(YamlWorkflowDiagnostic {
4379 node_id: Some(node.id.clone()),
4380 code: "invalid_tool_output_schema".to_string(),
4381 severity: YamlWorkflowDiagnosticSeverity::Error,
4382 message: format!(
4383 "tool '{}' has invalid output schema: {}",
4384 tool.definition.function.name, message
4385 ),
4386 });
4387 }
4388 }
4389 }
4390 }
4391
4392 if let Some(switch) = &node.node_type.switch {
4393 for branch in &switch.branches {
4394 if !known_ids.contains_key(branch.target.as_str()) {
4395 diagnostics.push(YamlWorkflowDiagnostic {
4396 node_id: Some(node.id.clone()),
4397 code: "unknown_switch_target".to_string(),
4398 severity: YamlWorkflowDiagnosticSeverity::Error,
4399 message: format!("switch branch target '{}' does not exist", branch.target),
4400 });
4401 }
4402 }
4403 if !known_ids.contains_key(switch.default.as_str()) {
4404 diagnostics.push(YamlWorkflowDiagnostic {
4405 node_id: Some(node.id.clone()),
4406 code: "unknown_switch_default".to_string(),
4407 severity: YamlWorkflowDiagnosticSeverity::Error,
4408 message: format!("switch default target '{}' does not exist", switch.default),
4409 });
4410 }
4411 }
4412
4413 if let Some(config) = node.config.as_ref() {
4414 if let Some(update_globals) = config.update_globals.as_ref() {
4415 for (key, update) in update_globals {
4416 let is_valid_op =
4417 matches!(update.op.as_str(), "set" | "append" | "increment" | "merge");
4418 if !is_valid_op {
4419 diagnostics.push(YamlWorkflowDiagnostic {
4420 node_id: Some(node.id.clone()),
4421 code: "unknown_update_op".to_string(),
4422 severity: YamlWorkflowDiagnosticSeverity::Error,
4423 message: format!(
4424 "update_globals key '{}' has unknown op '{}'; expected set|append|increment|merge",
4425 key, update.op
4426 ),
4427 });
4428 }
4429
4430 if update.op != "increment" && update.from.is_none() {
4431 diagnostics.push(YamlWorkflowDiagnostic {
4432 node_id: Some(node.id.clone()),
4433 code: "missing_update_from".to_string(),
4434 severity: YamlWorkflowDiagnosticSeverity::Error,
4435 message: format!(
4436 "update_globals key '{}' with op '{}' requires 'from'",
4437 key, update.op
4438 ),
4439 });
4440 }
4441 }
4442 }
4443 }
4444 }
4445
4446 diagnostics
4447}
4448
4449fn resolve_path<'a>(value: &'a Value, path: &str) -> Option<&'a Value> {
4450 path.split('.')
4451 .filter(|segment| !segment.is_empty())
4452 .try_fold(value, |current, segment| {
4453 if let Ok(index) = segment.parse::<usize>() {
4454 return current.get(index);
4455 }
4456 current.get(segment)
4457 })
4458}
4459
4460fn interpolate_template(template: &str, context: &Value) -> String {
4461 let mut out = String::with_capacity(template.len());
4462 let mut rest = template;
4463
4464 loop {
4465 let Some(start) = rest.find("{{") else {
4466 out.push_str(rest);
4467 break;
4468 };
4469
4470 out.push_str(&rest[..start]);
4471 let after_start = &rest[start + 2..];
4472 let Some(end) = after_start.find("}}") else {
4473 out.push_str(&rest[start..]);
4474 break;
4475 };
4476
4477 let expr = after_start[..end].trim();
4478 let source_path = expr.trim_start_matches("$.");
4479 let replacement = resolve_path(context, source_path)
4480 .map(value_to_template_string)
4481 .unwrap_or_default();
4482 out.push_str(replacement.as_str());
4483
4484 rest = &after_start[end + 2..];
4485 }
4486
4487 out
4488}
4489
4490fn collect_template_bindings(template: &str, context: &Value) -> Vec<YamlTemplateBinding> {
4491 let mut bindings = Vec::new();
4492 let mut rest = template;
4493
4494 loop {
4495 let Some(start) = rest.find("{{") else {
4496 break;
4497 };
4498
4499 let after_start = &rest[start + 2..];
4500 let Some(end) = after_start.find("}}") else {
4501 break;
4502 };
4503
4504 let expr = after_start[..end].trim();
4505 let source_path = expr.trim_start_matches("$.").to_string();
4506 let resolved = resolve_path(context, source_path.as_str()).cloned();
4507 let missing = resolved.is_none();
4508 let resolved_value = resolved.unwrap_or(Value::Null);
4509 bindings.push(YamlTemplateBinding {
4510 index: bindings.len(),
4511 expression: expr.to_string(),
4512 source_path,
4513 resolved_type: json_type_name(&resolved_value).to_string(),
4514 missing,
4515 resolved: resolved_value,
4516 });
4517
4518 rest = &after_start[end + 2..];
4519 }
4520
4521 bindings
4522}
4523
4524fn json_type_name(value: &Value) -> &'static str {
4525 match value {
4526 Value::Null => "null",
4527 Value::Bool(_) => "bool",
4528 Value::Number(_) => "number",
4529 Value::String(_) => "string",
4530 Value::Array(_) => "array",
4531 Value::Object(_) => "object",
4532 }
4533}
4534
4535fn value_to_template_string(value: &Value) -> String {
4536 match value {
4537 Value::Null => String::new(),
4538 Value::Bool(v) => v.to_string(),
4539 Value::Number(v) => v.to_string(),
4540 Value::String(v) => v.clone(),
4541 Value::Array(_) | Value::Object(_) => serde_json::to_string(value).unwrap_or_default(),
4542 }
4543}
4544
4545fn apply_set_globals(
4546 node: &YamlNode,
4547 outputs: &BTreeMap<String, Value>,
4548 workflow_input: &Value,
4549 globals: &mut serde_json::Map<String, Value>,
4550) {
4551 let Some(config) = node.config.as_ref() else {
4552 return;
4553 };
4554 let Some(set_globals) = config.set_globals.as_ref() else {
4555 return;
4556 };
4557
4558 let context = json!({
4559 "input": workflow_input,
4560 "nodes": outputs,
4561 "globals": Value::Object(globals.clone())
4562 });
4563
4564 for (key, expr) in set_globals {
4565 let value = resolve_path(&context, expr.as_str())
4566 .cloned()
4567 .unwrap_or(Value::Null);
4568 globals.insert(key.clone(), value);
4569 }
4570}
4571
/// Applies the node's `update_globals` operations to the shared globals map.
///
/// Supported ops are `set`, `append`, `increment`, and `merge`; unknown ops
/// are silently skipped here because `verify_yaml_workflow` reports them.
/// All `from` paths are resolved against a snapshot of the context taken
/// before any update in this batch is applied.
/// NOTE(review): `update_globals` is a HashMap, so the order in which keys
/// are applied within one node is unspecified — confirm no workflow relies
/// on two updates targeting the same global key.
fn apply_update_globals(
    node: &YamlNode,
    outputs: &BTreeMap<String, Value>,
    workflow_input: &Value,
    globals: &mut serde_json::Map<String, Value>,
) {
    let Some(config) = node.config.as_ref() else {
        return;
    };
    let Some(update_globals) = config.update_globals.as_ref() else {
        return;
    };

    // Pre-update snapshot: `from` paths see globals as they were on entry,
    // even after earlier keys in this loop have been written.
    let context = json!({
        "input": workflow_input,
        "nodes": outputs,
        "globals": Value::Object(globals.clone())
    });

    for (key, update) in update_globals {
        match update.op.as_str() {
            "set" => {
                // Overwrite the key; an unresolved path stores null. A missing
                // `from` is a no-op (flagged by the verifier).
                if let Some(path) = update.from.as_ref() {
                    let value = resolve_path(&context, path.as_str())
                        .cloned()
                        .unwrap_or(Value::Null);
                    globals.insert(key.clone(), value);
                }
            }
            "append" => {
                if let Some(path) = update.from.as_ref() {
                    let value = resolve_path(&context, path.as_str())
                        .cloned()
                        .unwrap_or(Value::Null);
                    let entry = globals
                        .entry(key.clone())
                        .or_insert_with(|| Value::Array(Vec::new()));
                    match entry {
                        Value::Array(items) => items.push(value),
                        // Existing non-array value: promote it to a
                        // two-element array [existing, new].
                        other => {
                            let existing = other.clone();
                            *other = Value::Array(vec![existing, value]);
                        }
                    }
                }
            }
            "increment" => {
                // Default step is 1.0; a non-numeric current value counts as 0.
                let by = update.by.unwrap_or(1.0);
                let current = globals
                    .get(key.as_str())
                    .and_then(Value::as_f64)
                    .unwrap_or(0.0);
                // from_f64 returns None for NaN/infinity, in which case the
                // key is left untouched.
                if let Some(next) = serde_json::Number::from_f64(current + by) {
                    globals.insert(key.clone(), Value::Number(next));
                }
            }
            "merge" => {
                if let Some(path) = update.from.as_ref() {
                    let source = resolve_path(&context, path.as_str())
                        .cloned()
                        .unwrap_or(Value::Null);
                    // Only object sources merge; anything else is ignored.
                    if let Value::Object(source_map) = source {
                        let target = globals
                            .entry(key.clone())
                            .or_insert_with(|| Value::Object(serde_json::Map::new()));
                        if let Value::Object(target_map) = target {
                            // Shallow merge: source keys overwrite target keys.
                            target_map.extend(source_map);
                        } else {
                            // Non-object target is replaced wholesale.
                            *target = Value::Object(source_map);
                        }
                    }
                }
            }
            _ => {}
        }
    }
}
4649
4650fn llm_output_schema_for_node(node: &YamlNode) -> Value {
4651 if let Some(schema) = node
4652 .config
4653 .as_ref()
4654 .and_then(|cfg| cfg.output_schema.clone())
4655 {
4656 return schema;
4657 }
4658
4659 default_llm_output_schema()
4660}
4661
4662fn normalize_tool_choice(
4663 config: Option<YamlToolChoiceConfig>,
4664) -> Result<Option<ToolChoice>, String> {
4665 let Some(config) = config else {
4666 return Ok(None);
4667 };
4668
4669 let choice = match config {
4670 YamlToolChoiceConfig::Mode(mode) => ToolChoice::Mode(mode),
4671 YamlToolChoiceConfig::Function { function } => ToolChoice::Tool(ToolChoiceTool {
4672 tool_type: ToolType::Function,
4673 function: ToolChoiceFunction { name: function },
4674 }),
4675 YamlToolChoiceConfig::OpenAi(tool) => ToolChoice::Tool(tool),
4676 };
4677
4678 Ok(Some(choice))
4679}
4680
4681fn normalize_llm_tools(llm: &YamlLlmCall) -> Result<Vec<YamlResolvedTool>, String> {
4682 llm.tools
4683 .iter()
4684 .map(|tool| match (llm.tools_format, tool) {
4685 (YamlToolFormat::Openai, YamlToolDeclaration::OpenAi(openai)) => {
4686 let definition = ToolDefinition {
4687 tool_type: openai.tool_type.unwrap_or(ToolType::Function),
4688 function: ToolFunction {
4689 name: openai.function.name.clone(),
4690 description: openai.function.description.clone(),
4691 parameters: openai.function.parameters.clone(),
4692 },
4693 };
4694 Ok(YamlResolvedTool {
4695 definition,
4696 output_schema: openai.function.output_schema.clone(),
4697 })
4698 }
4699 (YamlToolFormat::Simplified, YamlToolDeclaration::Simplified(simple)) => {
4700 let definition = ToolDefinition {
4701 tool_type: ToolType::Function,
4702 function: ToolFunction {
4703 name: simple.name.clone(),
4704 description: simple.description.clone(),
4705 parameters: Some(simple.input_schema.clone()),
4706 },
4707 };
4708 Ok(YamlResolvedTool {
4709 definition,
4710 output_schema: simple.output_schema.clone(),
4711 })
4712 }
4713 (YamlToolFormat::Openai, _) => {
4714 Err("tools_format=openai requires OpenAI-style tool declarations".to_string())
4715 }
4716 (YamlToolFormat::Simplified, _) => {
4717 Err("tools_format=simplified requires simplified tool declarations".to_string())
4718 }
4719 })
4720 .collect()
4721}
4722
4723fn default_llm_output_schema() -> Value {
4724 json!({
4725 "type": "object",
4726 "additionalProperties": true
4727 })
4728}
4729
4730fn mock_rag(topic: &str) -> Value {
4731 let (kb_source, playbook) = match topic {
4732 "probation" => (
4733 "hr_policy/probation.md",
4734 "Collect manager review, performance evidence, and probation timeline.",
4735 ),
4736 "leave_request" => (
4737 "hr_policy/leave.md",
4738 "Validate leave balance, manager approval, and blackout dates.",
4739 ),
4740 "supply_chain_order_assessment" => (
4741 "supply_chain/order_assessment.md",
4742 "Review order specs, inventory risk, and vendor lead-time guidance.",
4743 ),
4744 "supply_chain_order_replacement" => (
4745 "supply_chain/order_replacement.md",
4746 "Collect order id, damage proof, and replacement SLA policy.",
4747 ),
4748 "termination_first_time_offense" => (
4749 "hr_policy/termination_first_offense.md",
4750 "Validate first-incident criteria and route to HRBP review.",
4751 ),
4752 "termination_repeated_offense" => (
4753 "hr_policy/termination_repeated_offense.md",
4754 "Collect prior warnings and escalation approvals before final action.",
4755 ),
4756 _ => (
4757 "shared/request_clarification.md",
4758 "Request clarifying details before routing.",
4759 ),
4760 };
4761
4762 json!({
4763 "kb_source": kb_source,
4764 "playbook": playbook,
4765 })
4766}
4767
/// Runs a nested workflow in response to a `run_workflow_graph` tool call.
///
/// The target workflow is looked up by `payload.workflow_id` in the
/// `workflow_registry` map carried on the parent's `context.input`. The
/// subworkflow inherits `messages`, `email_text`, and the registry from the
/// parent unless the payload overrides them, and recursion is bounded by the
/// `__subgraph_depth` / `__subgraph_max_depth` counters (default max 3).
///
/// Returns a JSON object with the subworkflow's id, path, terminal node,
/// terminal output, and trace; all failures are reported as error strings.
async fn execute_subworkflow_tool_call(
    payload: &Value,
    context: &Value,
    client: &SimpleAgentsClient,
    custom_worker: Option<&dyn YamlWorkflowCustomWorkerExecutor>,
    parent_options: &YamlWorkflowRunOptions,
    parent_trace_context: Option<&TraceContext>,
    resolved_trace_id: Option<&str>,
) -> Result<Value, String> {
    let workflow_id = payload
        .get("workflow_id")
        .and_then(Value::as_str)
        .ok_or_else(|| "run_workflow_graph requires payload.workflow_id".to_string())?;

    let input_context = context
        .get("input")
        .and_then(Value::as_object)
        .ok_or_else(|| "run_workflow_graph requires context.input".to_string())?;

    // Registry maps workflow ids to YAML file paths on disk.
    let registry = input_context
        .get("workflow_registry")
        .and_then(Value::as_object)
        .ok_or_else(|| {
            "run_workflow_graph requires input.workflow_registry map of workflow_id -> yaml_path"
                .to_string()
        })?;

    let workflow_path = registry
        .get(workflow_id)
        .and_then(Value::as_str)
        .ok_or_else(|| {
            format!(
                "workflow_registry has no entry for workflow_id '{}'",
                workflow_id
            )
        })?;

    // Depth guard against unbounded subworkflow recursion.
    let parent_depth = input_context
        .get("__subgraph_depth")
        .and_then(Value::as_u64)
        .unwrap_or(0);
    let max_depth = input_context
        .get("__subgraph_max_depth")
        .and_then(Value::as_u64)
        .unwrap_or(3);

    if parent_depth >= max_depth {
        return Err(format!(
            "run_workflow_graph depth limit reached (depth={}, max={})",
            parent_depth, max_depth
        ));
    }

    // Start from the payload's explicit input (if any), then backfill
    // inherited fields without overwriting caller-provided values.
    let mut subworkflow_input = payload
        .get("input")
        .and_then(Value::as_object)
        .cloned()
        .unwrap_or_default();

    if !subworkflow_input.contains_key("messages") {
        if let Some(messages) = input_context.get("messages") {
            subworkflow_input.insert("messages".to_string(), messages.clone());
        }
    }

    if !subworkflow_input.contains_key("email_text") {
        if let Some(email_text) = input_context.get("email_text") {
            subworkflow_input.insert("email_text".to_string(), email_text.clone());
        }
    }

    // Propagate the registry so the subworkflow can spawn its own children.
    if !subworkflow_input.contains_key("workflow_registry") {
        subworkflow_input.insert(
            "workflow_registry".to_string(),
            Value::Object(registry.clone()),
        );
    }

    // Depth counters are always overwritten, never inherited from the payload.
    subworkflow_input.insert(
        "__subgraph_depth".to_string(),
        Value::Number(serde_json::Number::from(parent_depth + 1)),
    );
    subworkflow_input.insert(
        "__subgraph_max_depth".to_string(),
        Value::Number(serde_json::Number::from(max_depth)),
    );

    // Carry the parent's tracing context into the child run options.
    let subworkflow_options =
        build_subworkflow_options(parent_options, parent_trace_context, resolved_trace_id);

    let output = run_workflow_yaml_file_with_client_and_custom_worker_and_events_and_options(
        Path::new(workflow_path),
        &Value::Object(subworkflow_input),
        client,
        custom_worker,
        None,
        &subworkflow_options,
    )
    .await
    .map_err(|error| format!("subworkflow '{}' failed: {}", workflow_id, error))?;

    Ok(json!({
        "workflow_id": workflow_id,
        "workflow_path": workflow_path,
        "terminal_node": output.terminal_node,
        "terminal_output": output.terminal_output,
        "trace": output.trace,
    }))
}
4877
4878fn build_subworkflow_options(
4879 parent_options: &YamlWorkflowRunOptions,
4880 parent_trace_context: Option<&TraceContext>,
4881 resolved_trace_id: Option<&str>,
4882) -> YamlWorkflowRunOptions {
4883 let mut subworkflow_options = parent_options.clone();
4884 if parent_trace_context.is_some() || resolved_trace_id.is_some() {
4885 let trace_context = YamlWorkflowTraceContextInput {
4886 trace_id: resolved_trace_id
4887 .map(|value| value.to_string())
4888 .or_else(|| parent_trace_context.and_then(|ctx| ctx.trace_id.clone())),
4889 span_id: parent_trace_context.and_then(|ctx| ctx.span_id.clone()),
4890 parent_span_id: parent_trace_context.and_then(|ctx| ctx.parent_span_id.clone()),
4891 traceparent: parent_trace_context.and_then(|ctx| ctx.traceparent.clone()),
4892 tracestate: parent_trace_context.and_then(|ctx| ctx.tracestate.clone()),
4893 baggage: parent_trace_context
4894 .map(|ctx| ctx.baggage.clone())
4895 .unwrap_or_default(),
4896 };
4897 subworkflow_options.trace.context = Some(trace_context);
4898 }
4899 subworkflow_options
4900}
4901
4902fn mock_custom_worker_output(
4903 handler: &str,
4904 payload: &Value,
4905) -> Result<Value, YamlWorkflowRunError> {
4906 if handler == "get_employee_record" {
4907 let employee_name = payload
4908 .get("employee_name")
4909 .and_then(Value::as_str)
4910 .unwrap_or("Unknown Employee")
4911 .trim();
4912 let normalized_name = if employee_name.is_empty() {
4913 "Unknown Employee"
4914 } else {
4915 employee_name
4916 };
4917
4918 let (employee_id, location) = match normalized_name.to_ascii_lowercase().as_str() {
4919 "alex johnson" => ("EMP-2041", "Austin"),
4920 "priya sharma" => ("EMP-3378", "Bengaluru"),
4921 "marcus lee" => ("EMP-1196", "Singapore"),
4922 "sarah chen" => ("EMP-4450", "Toronto"),
4923 _ => ("EMP-0000", "Unassigned"),
4924 };
4925
4926 return Ok(json!({
4927 "employee_name": normalized_name,
4928 "employee_id": employee_id,
4929 "location": location,
4930 }));
4931 }
4932
4933 if let Some(topic) = payload.get("topic").and_then(Value::as_str) {
4934 let mut value = mock_rag(topic);
4935 if let Value::Object(object) = &mut value {
4936 object.insert("handler".to_string(), Value::String(handler.to_string()));
4937 }
4938 return Ok(value);
4939 }
4940
4941 Err(YamlWorkflowRunError::UnsupportedCustomHandler {
4942 handler: handler.to_string(),
4943 })
4944}
4945
/// Top-level YAML workflow document: a set of nodes plus directed edges,
/// with execution starting at `entry_node`.
#[derive(Debug, Clone, Deserialize)]
pub struct YamlWorkflow {
    /// Workflow identifier.
    pub id: String,
    /// Id of the node where execution starts; must name an entry in `nodes`.
    pub entry_node: String,
    /// Node definitions; ids should be unique (checked by `verify_yaml_workflow`).
    #[serde(default)]
    pub nodes: Vec<YamlNode>,
    /// Directed edges between node ids.
    #[serde(default)]
    pub edges: Vec<YamlEdge>,
}
4955
/// One workflow node: an id, exactly one node-type payload, and optional
/// per-node configuration.
#[derive(Debug, Clone, Deserialize)]
pub struct YamlNode {
    /// Unique node id referenced by edges, switch targets, and the entry node.
    pub id: String,
    /// Which kind of node this is (llm_call / switch / custom_worker).
    pub node_type: YamlNodeType,
    /// Optional prompt, schema, payload, and globals configuration.
    pub config: Option<YamlNodeConfig>,
}
4962
4963impl YamlNode {
4964 fn kind_name(&self) -> &'static str {
4965 if self.node_type.llm_call.is_some() {
4966 "llm_call"
4967 } else if self.node_type.switch.is_some() {
4968 "switch"
4969 } else if self.node_type.custom_worker.is_some() {
4970 "custom_worker"
4971 } else {
4972 "unknown"
4973 }
4974 }
4975}
4976
/// Node-type payload; exactly one field is expected to be set.
/// NOTE(review): nothing here rejects a node with multiple variants set —
/// `kind_name` resolves ambiguity by priority order. Confirm intended.
#[derive(Debug, Clone, Deserialize)]
pub struct YamlNodeType {
    /// LLM completion node.
    pub llm_call: Option<YamlLlmCall>,
    /// Conditional routing node.
    pub switch: Option<YamlSwitch>,
    /// Host-provided custom handler node.
    pub custom_worker: Option<YamlCustomWorker>,
}
4983
/// Configuration for an LLM completion node.
#[derive(Debug, Clone, Deserialize)]
pub struct YamlLlmCall {
    /// Model identifier; must be non-empty (verified).
    pub model: String,
    /// Stream the completion; incompatible with `heal` (runtime disables streaming).
    pub stream: Option<bool>,
    // Assumed to control whether streamed JSON is emitted as plain text —
    // TODO confirm against the runtime implementation.
    pub stream_json_as_text: Option<bool>,
    /// Run JSON healing on the model output.
    pub heal: Option<bool>,
    /// Optional `$.`-prefixed context path resolving to a message list.
    pub messages_path: Option<String>,
    /// Append the node prompt as a trailing user message.
    pub append_prompt_as_user: Option<bool>,
    /// Declaration style for `tools`; defaults to OpenAI format.
    #[serde(default)]
    pub tools_format: YamlToolFormat,
    /// Tools exposed to the model; must match `tools_format` (verified).
    #[serde(default)]
    pub tools: Vec<YamlToolDeclaration>,
    /// Optional forced/constrained tool choice.
    pub tool_choice: Option<YamlToolChoiceConfig>,
    /// Maximum tool-call roundtrips; must be >= 1 (verified, defaults to 1).
    pub max_tool_roundtrips: Option<u8>,
    /// Optional non-empty globals key under which tool calls are recorded.
    pub tool_calls_global_key: Option<String>,
}
5000
/// How tool declarations are written in the YAML; defaults to OpenAI style.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Deserialize, Serialize, Default)]
#[serde(rename_all = "snake_case")]
pub enum YamlToolFormat {
    /// OpenAI-style `{ type, function: { name, parameters, … } }` objects.
    #[default]
    Openai,
    /// Flat `{ name, input_schema, … }` objects.
    Simplified,
}
5008
/// A tool declaration in either supported format. Untagged: serde tries the
/// variants in declaration order, so OpenAI-shaped objects win on ambiguity.
#[derive(Debug, Clone, Deserialize)]
#[serde(untagged)]
pub enum YamlToolDeclaration {
    OpenAi(YamlOpenAiToolDeclaration),
    Simplified(YamlSimplifiedToolDeclaration),
}
5015
/// OpenAI-style tool declaration wrapper.
#[derive(Debug, Clone, Deserialize)]
pub struct YamlOpenAiToolDeclaration {
    /// Tool type; defaults to `function` when omitted (see `normalize_llm_tools`).
    #[serde(rename = "type")]
    pub tool_type: Option<ToolType>,
    pub function: YamlOpenAiToolFunction,
}
5022
/// Function payload of an OpenAI-style tool declaration.
#[derive(Debug, Clone, Deserialize)]
pub struct YamlOpenAiToolFunction {
    /// Function name; must be non-empty and unique per node (verified).
    pub name: String,
    pub description: Option<String>,
    /// JSON-Schema input parameters; required by the verifier.
    pub parameters: Option<Value>,
    /// Optional JSON-Schema for the tool's output (validated when present).
    pub output_schema: Option<Value>,
}
5030
/// Simplified (flat) tool declaration.
#[derive(Debug, Clone, Deserialize)]
pub struct YamlSimplifiedToolDeclaration {
    /// Function name; must be non-empty and unique per node (verified).
    pub name: String,
    pub description: Option<String>,
    /// JSON-Schema for the tool input; mandatory in this format.
    pub input_schema: Value,
    /// Optional JSON-Schema for the tool's output (validated when present).
    pub output_schema: Option<Value>,
}
5038
/// Tool-choice shorthand accepted in YAML; normalized by
/// `normalize_tool_choice`. Untagged: variants are tried in order.
#[derive(Debug, Clone, Deserialize)]
#[serde(untagged)]
pub enum YamlToolChoiceConfig {
    /// Bare mode string (e.g. auto/none/required).
    Mode(ToolChoiceMode),
    /// `{ function: "name" }` shorthand forcing a specific function.
    Function { function: String },
    /// Full OpenAI-style tool-choice object.
    OpenAi(ToolChoiceTool),
}
5046
/// Conditional routing node: first matching branch wins, otherwise `default`.
#[derive(Debug, Clone, Deserialize)]
pub struct YamlSwitch {
    /// Ordered condition/target pairs.
    #[serde(default)]
    pub branches: Vec<YamlSwitchBranch>,
    /// Target node id when no branch matches; must exist (verified).
    pub default: String,
}
5053
/// One switch branch: a condition expression and the node id to jump to.
#[derive(Debug, Clone, Deserialize)]
pub struct YamlSwitchBranch {
    /// Condition expression evaluated against the workflow context.
    pub condition: String,
    /// Target node id; must exist (verified).
    pub target: String,
}
5059
/// Custom-worker node: delegates to a host-registered handler by name.
#[derive(Debug, Clone, Deserialize)]
pub struct YamlCustomWorker {
    /// Handler name passed to the `YamlWorkflowCustomWorkerExecutor`.
    pub handler: String,
}
5064
/// Optional per-node configuration shared by all node kinds.
#[derive(Debug, Clone, Deserialize)]
pub struct YamlNodeConfig {
    /// Prompt template (supports `{{ $.path }}` interpolation).
    pub prompt: Option<String>,
    /// Expected output JSON-Schema; `schema` is accepted as an alias.
    #[serde(default, alias = "schema")]
    pub output_schema: Option<Value>,
    /// Free-form payload forwarded to the node's executor.
    pub payload: Option<Value>,
    /// Global key -> context path: plain assignments applied after the node runs.
    pub set_globals: Option<HashMap<String, String>>,
    /// Global key -> structured update (set/append/increment/merge).
    pub update_globals: Option<HashMap<String, YamlGlobalUpdate>>,
}
5074
/// One `update_globals` operation (see `apply_update_globals`).
#[derive(Debug, Clone, Deserialize)]
pub struct YamlGlobalUpdate {
    /// Operation name: set | append | increment | merge (verified).
    pub op: String,
    /// Source context path; required for every op except `increment`.
    pub from: Option<String>,
    /// Increment step; defaults to 1.0 when omitted.
    pub by: Option<f64>,
}
5081
/// Directed edge between two node ids; both endpoints must exist (verified).
#[derive(Debug, Clone, Deserialize)]
pub struct YamlEdge {
    pub from: String,
    pub to: String,
}
5087
5088#[cfg(test)]
5089mod tests {
5090 use super::*;
5091 use simple_agent_type::provider::{Provider, ProviderRequest, ProviderResponse};
5092 use simple_agent_type::response::{CompletionChoice, CompletionResponse, Usage};
5093 use simple_agent_type::tool::{ToolCallFunction, ToolType};
5094 use simple_agent_type::{Result as SaResult, SimpleAgentsError};
5095 use simple_agents_core::SimpleAgentsClientBuilder;
5096 use std::fs;
5097 use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
5098 use std::sync::{Arc, Mutex, OnceLock};
5099
5100 fn stream_debug_env_lock() -> &'static Mutex<()> {
5101 static LOCK: OnceLock<Mutex<()>> = OnceLock::new();
5102 LOCK.get_or_init(|| Mutex::new(()))
5103 }
5104
5105 struct MockExecutor;
5106
5107 struct RecordingSink {
5108 events: Mutex<Vec<YamlWorkflowEvent>>,
5109 }
5110
5111 struct CancelAfterFirstEventSink {
5112 cancelled: AtomicBool,
5113 }
5114
5115 impl YamlWorkflowEventSink for CancelAfterFirstEventSink {
5116 fn emit(&self, _event: &YamlWorkflowEvent) {
5117 self.cancelled.store(true, Ordering::SeqCst);
5118 }
5119
5120 fn is_cancelled(&self) -> bool {
5121 self.cancelled.load(Ordering::SeqCst)
5122 }
5123 }
5124
5125 struct CountingExecutor {
5126 call_count: AtomicUsize,
5127 }
5128
5129 struct CapturingWorker {
5130 context: Mutex<Option<Value>>,
5131 }
5132
5133 struct ToolLoopProvider;
5134
5135 struct UnknownToolProvider;
5136
5137 #[async_trait]
5138 impl Provider for ToolLoopProvider {
5139 fn name(&self) -> &str {
5140 "openai"
5141 }
5142
5143 fn transform_request(&self, req: &CompletionRequest) -> SaResult<ProviderRequest> {
5144 let body = serde_json::to_value(req).map_err(SimpleAgentsError::from)?;
5145 Ok(ProviderRequest::new("mock://tool-loop").with_body(body))
5146 }
5147
5148 async fn execute(&self, req: ProviderRequest) -> SaResult<ProviderResponse> {
5149 let request: CompletionRequest =
5150 serde_json::from_value(req.body).map_err(SimpleAgentsError::from)?;
5151
5152 let has_tools = request
5153 .tools
5154 .as_ref()
5155 .is_some_and(|tools| !tools.is_empty());
5156 let has_tool_result = request.messages.iter().any(|m| m.role == Role::Tool);
5157
5158 let response = if has_tools && !has_tool_result {
5159 CompletionResponse {
5160 id: "resp_tool_1".to_string(),
5161 model: request.model.clone(),
5162 choices: vec![CompletionChoice {
5163 index: 0,
5164 message: Message::assistant("").with_tool_calls(vec![ToolCall {
5165 id: "call_get_context".to_string(),
5166 tool_type: ToolType::Function,
5167 function: ToolCallFunction {
5168 name: "get_customer_context".to_string(),
5169 arguments: "{\"order_id\":\"123\"}".to_string(),
5170 },
5171 }]),
5172 finish_reason: FinishReason::ToolCalls,
5173 logprobs: None,
5174 }],
5175 usage: Usage::new(10, 5),
5176 created: None,
5177 provider: Some(self.name().to_string()),
5178 healing_metadata: None,
5179 }
5180 } else if has_tools && has_tool_result {
5181 CompletionResponse {
5182 id: "resp_tool_2".to_string(),
5183 model: request.model.clone(),
5184 choices: vec![CompletionChoice {
5185 index: 0,
5186 message: Message::assistant("{\"state\":\"done\"}"),
5187 finish_reason: FinishReason::Stop,
5188 logprobs: None,
5189 }],
5190 usage: Usage::new(12, 6),
5191 created: None,
5192 provider: Some(self.name().to_string()),
5193 healing_metadata: None,
5194 }
5195 } else {
5196 let prompt = request
5197 .messages
5198 .iter()
5199 .rev()
5200 .find(|m| m.role == Role::User)
5201 .map(|m| m.content.clone())
5202 .unwrap_or_default();
5203 let payload = json!({
5204 "subject": "ok",
5205 "body": prompt,
5206 })
5207 .to_string();
5208 CompletionResponse {
5209 id: "resp_final".to_string(),
5210 model: request.model.clone(),
5211 choices: vec![CompletionChoice {
5212 index: 0,
5213 message: Message::assistant(payload),
5214 finish_reason: FinishReason::Stop,
5215 logprobs: None,
5216 }],
5217 usage: Usage::new(8, 4),
5218 created: None,
5219 provider: Some(self.name().to_string()),
5220 healing_metadata: None,
5221 }
5222 };
5223
5224 let body = serde_json::to_value(response).map_err(SimpleAgentsError::from)?;
5225 Ok(ProviderResponse::new(200, body))
5226 }
5227
5228 fn transform_response(&self, resp: ProviderResponse) -> SaResult<CompletionResponse> {
5229 serde_json::from_value(resp.body).map_err(SimpleAgentsError::from)
5230 }
5231 }
5232
5233 #[async_trait]
5234 impl Provider for UnknownToolProvider {
5235 fn name(&self) -> &str {
5236 "openai"
5237 }
5238
5239 fn transform_request(&self, req: &CompletionRequest) -> SaResult<ProviderRequest> {
5240 let body = serde_json::to_value(req).map_err(SimpleAgentsError::from)?;
5241 Ok(ProviderRequest::new("mock://unknown-tool").with_body(body))
5242 }
5243
5244 async fn execute(&self, req: ProviderRequest) -> SaResult<ProviderResponse> {
5245 let request: CompletionRequest =
5246 serde_json::from_value(req.body).map_err(SimpleAgentsError::from)?;
5247
5248 let response = CompletionResponse {
5249 id: "resp_unknown_tool".to_string(),
5250 model: request.model,
5251 choices: vec![CompletionChoice {
5252 index: 0,
5253 message: Message::assistant("").with_tool_calls(vec![ToolCall {
5254 id: "call_unknown".to_string(),
5255 tool_type: ToolType::Function,
5256 function: ToolCallFunction {
5257 name: "unknown_tool".to_string(),
5258 arguments: "{}".to_string(),
5259 },
5260 }]),
5261 finish_reason: FinishReason::ToolCalls,
5262 logprobs: None,
5263 }],
5264 usage: Usage::new(5, 2),
5265 created: None,
5266 provider: Some(self.name().to_string()),
5267 healing_metadata: None,
5268 };
5269
5270 let body = serde_json::to_value(response).map_err(SimpleAgentsError::from)?;
5271 Ok(ProviderResponse::new(200, body))
5272 }
5273
5274 fn transform_response(&self, resp: ProviderResponse) -> SaResult<CompletionResponse> {
5275 serde_json::from_value(resp.body).map_err(SimpleAgentsError::from)
5276 }
5277 }
5278
// Custom-worker stub that returns the same preconfigured payload for every
// tool invocation, regardless of handler or arguments.
struct FixedToolWorker {
    payload: Value,
}
5282
// Custom-worker stub that counts how many times `execute` is invoked, so tests
// can assert a worker was (or was not) reached.
struct CountingToolWorker {
    execute_calls: AtomicUsize,
}
5286
5287 #[async_trait]
5288 impl YamlWorkflowCustomWorkerExecutor for FixedToolWorker {
5289 async fn execute(
5290 &self,
5291 _handler: &str,
5292 _payload: &Value,
5293 _email_text: &str,
5294 _context: &Value,
5295 ) -> Result<Value, String> {
5296 Ok(self.payload.clone())
5297 }
5298 }
5299
5300 #[async_trait]
5301 impl YamlWorkflowCustomWorkerExecutor for CountingToolWorker {
5302 async fn execute(
5303 &self,
5304 _handler: &str,
5305 _payload: &Value,
5306 _email_text: &str,
5307 _context: &Value,
5308 ) -> Result<Value, String> {
5309 self.execute_calls.fetch_add(1, Ordering::SeqCst);
5310 Ok(json!({"ok": true}))
5311 }
5312 }
5313
5314 #[async_trait]
5315 impl YamlWorkflowLlmExecutor for CountingExecutor {
5316 async fn complete_structured(
5317 &self,
5318 _request: YamlLlmExecutionRequest,
5319 _event_sink: Option<&dyn YamlWorkflowEventSink>,
5320 ) -> Result<YamlLlmExecutionResult, String> {
5321 self.call_count.fetch_add(1, Ordering::SeqCst);
5322 Ok(YamlLlmExecutionResult {
5323 payload: json!({"state":"ok"}),
5324 usage: None,
5325 ttft_ms: None,
5326 tool_calls: Vec::new(),
5327 })
5328 }
5329 }
5330
5331 impl YamlWorkflowEventSink for RecordingSink {
5332 fn emit(&self, event: &YamlWorkflowEvent) {
5333 self.events
5334 .lock()
5335 .expect("recording sink lock should not be poisoned")
5336 .push(event.clone());
5337 }
5338 }
5339
5340 #[async_trait]
5341 impl YamlWorkflowCustomWorkerExecutor for CapturingWorker {
5342 async fn execute(
5343 &self,
5344 _handler: &str,
5345 _payload: &Value,
5346 _email_text: &str,
5347 context: &Value,
5348 ) -> Result<Value, String> {
5349 let mut guard = self
5350 .context
5351 .lock()
5352 .map_err(|_| "capturing worker lock should not be poisoned".to_string())?;
5353 *guard = Some(context.clone());
5354 Ok(json!({"ok": true}))
5355 }
5356 }
5357
5358 #[async_trait]
5359 impl YamlWorkflowLlmExecutor for MockExecutor {
5360 async fn complete_structured(
5361 &self,
5362 request: YamlLlmExecutionRequest,
5363 _event_sink: Option<&dyn YamlWorkflowEventSink>,
5364 ) -> Result<YamlLlmExecutionResult, String> {
5365 let prompt = request.prompt;
5366 if prompt.contains("exactly one category") {
5367 return Ok(YamlLlmExecutionResult {
5368 payload: json!({"category":"termination","reason":"mock"}),
5369 usage: Some(YamlLlmTokenUsage {
5370 prompt_tokens: 10,
5371 completion_tokens: 5,
5372 total_tokens: 15,
5373 thinking_tokens: None,
5374 }),
5375 ttft_ms: None,
5376 tool_calls: Vec::new(),
5377 });
5378 }
5379 if prompt.contains("Determine termination subtype") {
5380 return Ok(YamlLlmExecutionResult {
5381 payload: json!({"subtype":"repeated_offense","reason":"mock"}),
5382 usage: Some(YamlLlmTokenUsage {
5383 prompt_tokens: 12,
5384 completion_tokens: 6,
5385 total_tokens: 18,
5386 thinking_tokens: None,
5387 }),
5388 ttft_ms: None,
5389 tool_calls: Vec::new(),
5390 });
5391 }
5392 if prompt.contains("Determine supply chain subtype") {
5393 return Ok(YamlLlmExecutionResult {
5394 payload: json!({"subtype":"order_replacement","reason":"mock"}),
5395 usage: Some(YamlLlmTokenUsage {
5396 prompt_tokens: 11,
5397 completion_tokens: 4,
5398 total_tokens: 15,
5399 thinking_tokens: None,
5400 }),
5401 ttft_ms: None,
5402 tool_calls: Vec::new(),
5403 });
5404 }
5405 Err("unexpected prompt".to_string())
5406 }
5407 }
5408
// Happy path: MockExecutor classifies "termination" -> "repeated_offense", so
// routing must land on the matching RAG worker node, with one timing entry per
// executed node and token totals aggregated from both LLM calls.
#[tokio::test]
async fn runs_yaml_workflow_and_returns_step_timings() {
    let yaml = r#"
id: email-intake-classification
entry_node: classify_top_level
nodes:
  - id: classify_top_level
    node_type:
      llm_call:
        model: gpt-4.1
        config:
          prompt: |
            Classify this email into exactly one category:
            {{ input.email_text }}
  - id: route_top_level
    node_type:
      switch:
        branches:
          - condition: '$.nodes.classify_top_level.output.category == "termination"'
            target: classify_termination_subtype
        default: rag_clarification
  - id: classify_termination_subtype
    node_type:
      llm_call:
        model: gpt-4.1
        config:
          prompt: |
            Determine termination subtype:
            {{ input.email_text }}
  - id: route_termination_subtype
    node_type:
      switch:
        branches:
          - condition: '$.nodes.classify_termination_subtype.output.subtype == "repeated_offense"'
            target: rag_termination_repeated_offense
        default: rag_clarification
  - id: rag_termination_repeated_offense
    node_type:
      custom_worker:
        handler: GetRagData
        config:
          payload:
            topic: termination_repeated_offense
  - id: rag_clarification
    node_type:
      custom_worker:
        handler: GetRagData
        config:
          payload:
            topic: clarification
edges:
  - from: classify_top_level
    to: route_top_level
  - from: classify_termination_subtype
    to: route_termination_subtype
"#;

    let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
    let output = run_email_workflow_yaml(&workflow, "test", &MockExecutor)
        .await
        .expect("yaml workflow should execute");

    assert_eq!(output.workflow_id, "email-intake-classification");
    assert_eq!(output.terminal_node, "rag_termination_repeated_offense");
    // Every traced node should have a corresponding timing record.
    assert!(!output.step_timings.is_empty());
    assert_eq!(output.step_timings.len(), output.trace.len());
    assert!(output
        .outputs
        .contains_key("rag_termination_repeated_offense"));
    // MockExecutor usage: 10+12 prompt and 5+6 completion tokens across the two LLM nodes.
    assert_eq!(output.total_input_tokens, 22);
    assert_eq!(output.total_output_tokens, 11);
    assert_eq!(output.total_tokens, 33);
}
5482
// The node_llm_input_resolved telemetry event must expose the model, stream /
// heal flags, the fully rendered prompt, and the template bindings that were
// substituted into it.
#[tokio::test]
async fn emits_resolved_llm_input_event_with_bindings() {
    let yaml = r#"
id: email-intake-classification
entry_node: classify_top_level
nodes:
  - id: classify_top_level
    node_type:
      llm_call:
        model: gpt-4.1
        config:
          prompt: |
            Classify this email into exactly one category:
            {{ input.email_text }}
"#;

    let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
    let sink = RecordingSink {
        events: Mutex::new(Vec::new()),
    };

    let output = run_email_workflow_yaml_with_custom_worker_and_events(
        &workflow,
        "Need help with termination",
        &MockExecutor,
        None,
        Some(&sink),
    )
    .await
    .expect("yaml workflow should execute");

    assert_eq!(output.terminal_node, "classify_top_level");

    let events = sink
        .events
        .lock()
        .expect("recording sink lock should not be poisoned");
    let llm_event = events
        .iter()
        .find(|event| event.event_type == "node_llm_input_resolved")
        .expect("expected llm input telemetry event");

    let metadata = llm_event
        .metadata
        .as_ref()
        .expect("llm input event must include metadata");
    assert_eq!(metadata["model"], Value::String("gpt-4.1".to_string()));
    assert_eq!(metadata["stream_requested"], Value::Bool(false));
    assert_eq!(metadata["heal_requested"], Value::Bool(false));
    // The rendered prompt should contain the substituted email text.
    assert!(metadata["prompt"]
        .as_str()
        .expect("prompt should be a string")
        .contains("Need help with termination"));

    // Exactly one binding: input.email_text resolved to the raw email string.
    let bindings = metadata["bindings"]
        .as_array()
        .expect("bindings should be an array");
    assert_eq!(bindings.len(), 1);
    assert_eq!(
        bindings[0]["source_path"],
        Value::String("input.email_text".to_string())
    );
    assert_eq!(
        bindings[0]["resolved"],
        Value::String("Need help with termination".to_string())
    );
    assert_eq!(bindings[0]["missing"], Value::Bool(false));
    assert_eq!(
        bindings[0]["resolved_type"],
        Value::String("string".to_string())
    );
}
5555
// With default run options, the workflow_completed event carries a "nerdstats"
// metadata payload summarizing token totals, per-step details, and per-node
// models — but not the verbose step_timings/llm_node_metrics breakdowns.
#[tokio::test]
async fn workflow_completed_event_includes_nerdstats_by_default() {
    let yaml = r#"
id: nerdstats-default
entry_node: classify
nodes:
  - id: classify
    node_type:
      llm_call:
        model: gpt-4.1
        config:
          prompt: |
            Classify this email into exactly one category:
            {{ input.email_text }}
"#;

    let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
    let sink = RecordingSink {
        events: Mutex::new(Vec::new()),
    };

    let output = run_workflow_yaml_with_custom_worker_and_events_and_options(
        &workflow,
        &json!({"email_text":"hello"}),
        &MockExecutor,
        None,
        Some(&sink),
        &YamlWorkflowRunOptions::default(),
    )
    .await
    .expect("workflow should execute");

    let events = sink
        .events
        .lock()
        .expect("recording sink lock should not be poisoned");
    let completed = events
        .iter()
        .find(|event| event.event_type == "workflow_completed")
        .expect("expected workflow_completed event");
    let metadata = completed
        .metadata
        .as_ref()
        .expect("workflow_completed should include metadata by default");
    let nerdstats = metadata
        .get("nerdstats")
        .expect("nerdstats should be present by default");

    // Nerdstats must agree with the run output itself.
    assert_eq!(nerdstats["workflow_id"], Value::String(output.workflow_id));
    assert_eq!(
        nerdstats["terminal_node"],
        Value::String(output.terminal_node)
    );
    assert_eq!(
        nerdstats["total_tokens"],
        Value::Number(output.total_tokens.into())
    );
    assert_eq!(nerdstats["token_metrics_available"], Value::Bool(true));
    assert_eq!(
        nerdstats["token_metrics_source"],
        Value::String("provider_usage".to_string())
    );
    // Compact step_details/llm_node_models replace the verbose breakdowns.
    assert!(nerdstats.get("step_timings").is_none());
    assert!(nerdstats.get("llm_node_metrics").is_none());
    assert!(nerdstats.get("step_details").is_some());
    assert!(nerdstats.get("llm_node_models").is_some());
    assert_eq!(
        nerdstats["llm_node_models"]["classify"],
        Value::String("gpt-4.1".to_string())
    );
    assert_eq!(
        nerdstats["step_details"][0]["node_id"],
        Value::String("classify".to_string())
    );
    // Non-streaming run: no time-to-first-token is measured.
    assert_eq!(nerdstats["ttft_ms"], Value::Null);
}
5632
// Disabling nerdstats via telemetry options must strip the metadata from the
// workflow_completed event entirely.
#[tokio::test]
async fn workflow_completed_event_omits_nerdstats_when_disabled() {
    let yaml = r#"
id: nerdstats-disabled
entry_node: classify
nodes:
  - id: classify
    node_type:
      llm_call:
        model: gpt-4.1
        config:
          prompt: |
            Classify this email into exactly one category:
            {{ input.email_text }}
"#;

    let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
    let sink = RecordingSink {
        events: Mutex::new(Vec::new()),
    };
    // Same defaults as the previous test, except nerdstats is switched off.
    let options = YamlWorkflowRunOptions {
        telemetry: YamlWorkflowTelemetryConfig {
            nerdstats: false,
            ..YamlWorkflowTelemetryConfig::default()
        },
        ..YamlWorkflowRunOptions::default()
    };

    run_workflow_yaml_with_custom_worker_and_events_and_options(
        &workflow,
        &json!({"email_text":"hello"}),
        &MockExecutor,
        None,
        Some(&sink),
        &options,
    )
    .await
    .expect("workflow should execute");

    let events = sink
        .events
        .lock()
        .expect("recording sink lock should not be poisoned");
    let completed = events
        .iter()
        .find(|event| event.event_type == "workflow_completed")
        .expect("expected workflow_completed event");
    assert!(completed.metadata.is_none());
}
5682
// Mock LLM executor that reports a TTFT measurement only when the request asked for streaming.
struct StreamAwareMockExecutor;
5684
5685 #[async_trait]
5686 impl YamlWorkflowLlmExecutor for StreamAwareMockExecutor {
5687 async fn complete_structured(
5688 &self,
5689 request: YamlLlmExecutionRequest,
5690 _event_sink: Option<&dyn YamlWorkflowEventSink>,
5691 ) -> Result<YamlLlmExecutionResult, String> {
5692 Ok(YamlLlmExecutionResult {
5693 payload: json!({"state":"ok"}),
5694 usage: Some(YamlLlmTokenUsage {
5695 prompt_tokens: 20,
5696 completion_tokens: 10,
5697 total_tokens: 30,
5698 thinking_tokens: None,
5699 }),
5700 ttft_ms: if request.stream { Some(12) } else { None },
5701 tool_calls: Vec::new(),
5702 })
5703 }
5704 }
5705
// Streaming node: nerdstats must still report provider usage totals, plus the
// TTFT that the stream-aware executor measured.
#[tokio::test]
async fn workflow_completed_event_includes_nerdstats_for_streaming_nodes() {
    let yaml = r#"
id: nerdstats-streaming
entry_node: classify
nodes:
  - id: classify
    node_type:
      llm_call:
        model: gpt-4.1
        stream: true
        config:
          prompt: |
            Return JSON only:
            {"state":"ok"}
"#;

    let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
    let sink = RecordingSink {
        events: Mutex::new(Vec::new()),
    };

    run_workflow_yaml_with_custom_worker_and_events_and_options(
        &workflow,
        &json!({"email_text":"hello"}),
        &StreamAwareMockExecutor,
        None,
        Some(&sink),
        &YamlWorkflowRunOptions::default(),
    )
    .await
    .expect("workflow should execute");

    let events = sink
        .events
        .lock()
        .expect("recording sink lock should not be poisoned");
    let completed = events
        .iter()
        .find(|event| event.event_type == "workflow_completed")
        .expect("expected workflow_completed event");
    let metadata = completed
        .metadata
        .as_ref()
        .expect("workflow_completed should include metadata by default");
    let nerdstats = metadata
        .get("nerdstats")
        .expect("nerdstats should be present by default");

    // StreamAwareMockExecutor reports usage (20+10) and a 12ms TTFT when streaming.
    assert_eq!(nerdstats["token_metrics_available"], Value::Bool(true));
    assert_eq!(nerdstats["total_tokens"], Value::Number(30u64.into()));
    assert_eq!(nerdstats["ttft_ms"], Value::Number(12u64.into()));
    assert!(nerdstats.get("step_timings").is_none());
    assert!(nerdstats.get("llm_node_metrics").is_none());
    assert_eq!(
        nerdstats["llm_node_models"]["classify"],
        Value::String("gpt-4.1".to_string())
    );
    assert_eq!(
        nerdstats["step_details"][0]["node_id"],
        Value::String("classify".to_string())
    );
}
5769
// When an LLM step ran without any usage data (e.g. a stream whose provider
// reported no usage), nerdstats must mark token metrics unavailable and null
// out totals rather than presenting zeros as real counts.
#[test]
fn workflow_nerdstats_marks_stream_token_metrics_unavailable() {
    // Hand-built run output: one llm_call step with no token fields at all.
    let output = YamlWorkflowRunOutput {
        workflow_id: "workflow".to_string(),
        entry_node: "start".to_string(),
        email_text: "hello".to_string(),
        trace: vec!["llm_node".to_string()],
        outputs: BTreeMap::new(),
        terminal_node: "llm_node".to_string(),
        terminal_output: None,
        step_timings: vec![YamlStepTiming {
            node_id: "llm_node".to_string(),
            node_kind: "llm_call".to_string(),
            elapsed_ms: 100,
            prompt_tokens: None,
            completion_tokens: None,
            total_tokens: None,
            thinking_tokens: None,
            tokens_per_second: None,
        }],
        llm_node_metrics: BTreeMap::new(),
        llm_node_models: BTreeMap::new(),
        total_elapsed_ms: 100,
        ttft_ms: None,
        total_input_tokens: 0,
        total_output_tokens: 0,
        total_tokens: 0,
        total_thinking_tokens: None,
        tokens_per_second: 0.0,
        trace_id: Some("trace-1".to_string()),
        metadata: None,
    };

    let nerdstats = workflow_nerdstats(&output);
    assert_eq!(nerdstats["token_metrics_available"], Value::Bool(false));
    assert_eq!(
        nerdstats["token_metrics_source"],
        Value::String("provider_stream_usage_unavailable".to_string())
    );
    // Totals are null (not zero) when metrics are unavailable.
    assert_eq!(nerdstats["total_tokens"], Value::Null);
    assert_eq!(nerdstats["ttft_ms"], Value::Null);
    assert!(nerdstats.get("step_timings").is_none());
    assert!(nerdstats.get("llm_node_metrics").is_none());
    assert_eq!(
        nerdstats["step_details"][0]["node_id"],
        Value::String("llm_node".to_string())
    );
    assert!(nerdstats["llm_node_models"]
        .as_object()
        .expect("llm_node_models should be an object")
        .is_empty());
}
5822
// A run output that carries a measured TTFT must surface it in nerdstats.
#[test]
fn workflow_nerdstats_includes_ttft_when_available() {
    // Hand-built run output: one llm_call step with full usage and ttft_ms set.
    let output = YamlWorkflowRunOutput {
        workflow_id: "workflow".to_string(),
        entry_node: "start".to_string(),
        email_text: "hello".to_string(),
        trace: vec!["llm_node".to_string()],
        outputs: BTreeMap::new(),
        terminal_node: "llm_node".to_string(),
        terminal_output: None,
        step_timings: vec![YamlStepTiming {
            node_id: "llm_node".to_string(),
            node_kind: "llm_call".to_string(),
            elapsed_ms: 100,
            prompt_tokens: Some(10),
            completion_tokens: Some(15),
            total_tokens: Some(25),
            thinking_tokens: None,
            tokens_per_second: Some(150.0),
        }],
        llm_node_metrics: BTreeMap::new(),
        llm_node_models: BTreeMap::new(),
        total_elapsed_ms: 100,
        ttft_ms: Some(42),
        total_input_tokens: 10,
        total_output_tokens: 15,
        total_tokens: 25,
        total_thinking_tokens: None,
        tokens_per_second: 150.0,
        trace_id: Some("trace-2".to_string()),
        metadata: None,
    };

    let nerdstats = workflow_nerdstats(&output);
    assert_eq!(nerdstats["ttft_ms"], Value::Number(42u64.into()));
    assert!(nerdstats.get("step_timings").is_none());
    assert!(nerdstats.get("llm_node_metrics").is_none());
    assert_eq!(
        nerdstats["step_details"][0]["node_id"],
        Value::String("llm_node".to_string())
    );
    assert!(nerdstats["llm_node_models"]
        .as_object()
        .expect("llm_node_models should be an object")
        .is_empty());
}
5869
// Mock executor that fails unless exactly two chat messages were forwarded from the workflow input.
struct MessageHistoryExecutor;
5871
5872 #[async_trait]
5873 impl YamlWorkflowLlmExecutor for MessageHistoryExecutor {
5874 async fn complete_structured(
5875 &self,
5876 request: YamlLlmExecutionRequest,
5877 _event_sink: Option<&dyn YamlWorkflowEventSink>,
5878 ) -> Result<YamlLlmExecutionResult, String> {
5879 let messages = request
5880 .messages
5881 .ok_or_else(|| "expected messages in request".to_string())?;
5882 if messages.len() != 2 {
5883 return Err(format!("expected 2 messages, got {}", messages.len()));
5884 }
5885 Ok(YamlLlmExecutionResult {
5886 payload: json!({"category":"termination","reason":"history"}),
5887 usage: Some(YamlLlmTokenUsage {
5888 prompt_tokens: 7,
5889 completion_tokens: 3,
5890 total_tokens: 10,
5891 thinking_tokens: None,
5892 }),
5893 ttft_ms: None,
5894 tool_calls: Vec::new(),
5895 })
5896 }
5897 }
5898
// messages_path lets a node take chat history from the workflow input instead
// of a rendered prompt; append_prompt_as_user: false keeps the history as-is.
// MessageHistoryExecutor asserts it received exactly the two messages below.
#[tokio::test]
async fn supports_messages_path_in_workflow_input() {
    let yaml = r#"
id: email-intake-classification
entry_node: classify_top_level
nodes:
  - id: classify_top_level
    node_type:
      llm_call:
        model: gpt-4.1
        messages_path: input.messages
        append_prompt_as_user: false
"#;

    let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
    let input = json!({
        "email_text": "ignored",
        "messages": [
            {"role": "system", "content": "You are a classifier"},
            {"role": "user", "content": "Please classify this"}
        ]
    });

    let output = run_workflow_yaml(&workflow, &input, &MessageHistoryExecutor)
        .await
        .expect("workflow should use chat history from input");

    assert_eq!(output.terminal_node, "classify_top_level");
    // "history" reason proves the history-based executor produced the output.
    assert_eq!(
        output.outputs["classify_top_level"]["output"]["reason"],
        Value::String("history".to_string())
    );
}
5932
// All three public entrypoint wrappers must be thin shims over the same core
// runner: identical workflow ids, terminal nodes, outputs, and token totals.
#[tokio::test]
async fn wrapper_entrypoints_produce_equivalent_outputs() {
    let yaml = r#"
id: wrapper-equivalence
entry_node: classify
nodes:
  - id: classify
    node_type:
      llm_call:
        model: gpt-4.1
        config:
          prompt: |
            Classify this email into exactly one category:
            {{ input.email_text }}
"#;

    let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
    let input = json!({"email_text":"hello"});

    // Same workflow/input/executor through each wrapper.
    let a = run_workflow_yaml(&workflow, &input, &MockExecutor)
        .await
        .expect("base entrypoint should execute");
    let b = run_workflow_yaml_with_custom_worker(&workflow, &input, &MockExecutor, None)
        .await
        .expect("custom worker wrapper should execute");
    let c = run_workflow_yaml_with_custom_worker_and_events_and_options(
        &workflow,
        &input,
        &MockExecutor,
        None,
        None,
        &YamlWorkflowRunOptions::default(),
    )
    .await
    .expect("events/options wrapper should execute");

    assert_eq!(a.workflow_id, b.workflow_id);
    assert_eq!(a.workflow_id, c.workflow_id);
    assert_eq!(a.terminal_node, b.terminal_node);
    assert_eq!(a.terminal_node, c.terminal_node);
    assert_eq!(a.outputs, b.outputs);
    assert_eq!(a.outputs, c.outputs);
    assert_eq!(a.total_tokens, b.total_tokens);
    assert_eq!(a.total_tokens, c.total_tokens);
}
5978
// End-to-end tool calling: the tool result is schema-validated, recorded under
// the node's tool_calls trace, published to the `audit` global, and consumable
// by a later node via a globals template reference.
#[tokio::test]
async fn yaml_llm_tool_calling_captures_traces_and_supports_globals_reference() {
    let yaml = r#"
id: tool-calling-workflow
entry_node: generate_with_tool
nodes:
  - id: generate_with_tool
    node_type:
      llm_call:
        model: gpt-4.1
        tools_format: simplified
        max_tool_roundtrips: 1
        tool_calls_global_key: audit
        tools:
          - name: get_customer_context
            description: Fetch customer context
            input_schema:
              type: object
              properties:
                order_id: { type: string }
              required: [order_id]
              additionalProperties: false
            output_schema:
              type: object
              properties:
                customer_name: { type: string }
              required: [customer_name]
              additionalProperties: false
        config:
          output_schema:
            type: object
            properties:
              state: { type: string }
            required: [state]
  - id: personalize
    node_type:
      llm_call:
        model: gpt-4.1
        config:
          prompt: |
            Write an email greeting for {{ globals.audit.0.output.customer_name }}.
          output_schema:
            type: object
            properties:
              subject: { type: string }
              body: { type: string }
            required: [subject, body]
edges:
  - from: generate_with_tool
    to: personalize
"#;

    let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
    // ToolLoopProvider drives the tool-call roundtrip; FixedToolWorker answers it.
    let client = SimpleAgentsClientBuilder::new()
        .with_provider(Arc::new(ToolLoopProvider))
        .build()
        .expect("client should build");
    let worker = FixedToolWorker {
        payload: json!({"customer_name": "Ava"}),
    };

    let output = run_workflow_yaml_with_client_and_custom_worker_and_events_and_options(
        &workflow,
        &json!({"email_text":"hello"}),
        &client,
        Some(&worker),
        None,
        &YamlWorkflowRunOptions::default(),
    )
    .await
    .expect("workflow should execute");

    assert_eq!(output.trace, vec!["generate_with_tool", "personalize"]);
    // The tool result is captured in the first node's tool_calls trace.
    assert_eq!(
        output.outputs["generate_with_tool"]["tool_calls"][0]["output"]["customer_name"],
        Value::String("Ava".to_string())
    );
    // The second node resolved the name via the `audit` global.
    let body = output.outputs["personalize"]["output"]["body"]
        .as_str()
        .expect("body should be string");
    assert!(body.contains("Ava"));
}
6061
// A tool result that violates the declared output_schema must hard-fail the
// node with an Llm error instead of being silently fed back to the model.
#[tokio::test]
async fn yaml_llm_tool_output_schema_mismatch_hard_fails_node() {
    let yaml = r#"
id: tool-calling-schema-fail
entry_node: generate_with_tool
nodes:
  - id: generate_with_tool
    node_type:
      llm_call:
        model: gpt-4.1
        tools_format: simplified
        max_tool_roundtrips: 1
        tools:
          - name: get_customer_context
            input_schema:
              type: object
              properties:
                order_id: { type: string }
              required: [order_id]
              additionalProperties: false
            output_schema:
              type: object
              properties:
                customer_name: { type: string }
              required: [customer_name]
              additionalProperties: false
        config:
          output_schema:
            type: object
            properties:
              state: { type: string }
            required: [state]
"#;

    let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
    let client = SimpleAgentsClientBuilder::new()
        .with_provider(Arc::new(ToolLoopProvider))
        .build()
        .expect("client should build");
    // Worker returns a payload that does not match the tool's output_schema.
    let worker = FixedToolWorker {
        payload: json!({"unexpected": "shape"}),
    };

    let error = run_workflow_yaml_with_client_and_custom_worker_and_events_and_options(
        &workflow,
        &json!({"email_text":"hello"}),
        &client,
        Some(&worker),
        None,
        &YamlWorkflowRunOptions::default(),
    )
    .await
    .expect_err("workflow should hard-fail on schema mismatch");

    match error {
        YamlWorkflowRunError::Llm { message, .. } => {
            assert!(message.contains("output failed schema validation"));
        }
        other => panic!("expected llm error, got {other:?}"),
    }
}
6123
// If the model requests a tool that the node never declared, the run must fail
// before the custom worker is ever invoked (the counter stays at zero).
#[tokio::test]
async fn yaml_llm_unknown_tool_is_rejected_before_custom_worker_execution() {
    let yaml = r#"
id: unknown-tool-rejected
entry_node: generate_with_tool
nodes:
  - id: generate_with_tool
    node_type:
      llm_call:
        model: gpt-4.1
        tools_format: simplified
        max_tool_roundtrips: 1
        tools:
          - name: get_customer_context
            input_schema:
              type: object
              properties:
                order_id: { type: string }
              required: [order_id]
        config:
          output_schema:
            type: object
            properties:
              state: { type: string }
            required: [state]
"#;

    let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
    // UnknownToolProvider always calls a tool name the workflow never declared.
    let client = SimpleAgentsClientBuilder::new()
        .with_provider(Arc::new(UnknownToolProvider))
        .build()
        .expect("client should build");
    let worker = CountingToolWorker {
        execute_calls: AtomicUsize::new(0),
    };

    let error = run_workflow_yaml_with_client_and_custom_worker_and_events_and_options(
        &workflow,
        &json!({"email_text":"hello"}),
        &client,
        Some(&worker),
        None,
        &YamlWorkflowRunOptions::default(),
    )
    .await
    .expect_err("workflow should reject unknown tool before executing worker");

    match error {
        YamlWorkflowRunError::Llm { message, .. } => {
            assert!(message.contains("model requested unknown tool 'unknown_tool'"));
        }
        other => panic!("expected llm error, got {other:?}"),
    }

    // The custom worker must never have run.
    assert_eq!(worker.execute_calls.load(Ordering::SeqCst), 0);
}
6180
// Static verification: declaring simplified-style tools under
// tools_format: openai must produce an invalid_tools_format diagnostic.
#[test]
fn validates_tools_format_mismatch() {
    let yaml = r#"
id: mismatch
entry_node: generate
nodes:
  - id: generate
    node_type:
      llm_call:
        model: gpt-4.1
        tools_format: openai
        tools:
          - name: get_customer_context
            input_schema:
              type: object
              properties:
                order_id: { type: string }
              required: [order_id]
"#;

    let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
    let diagnostics = verify_yaml_workflow(&workflow);
    assert!(diagnostics
        .iter()
        .any(|diagnostic| diagnostic.code == "invalid_tools_format"));
}
6207
// The built-in mock custom worker resolves canned employee records by name.
#[test]
fn mock_custom_worker_supports_get_employee_record() {
    let result = mock_custom_worker_output(
        "get_employee_record",
        &json!({"employee_name": "Alex Johnson"}),
    )
    .expect("mock tool should resolve employee record");

    assert_eq!(result["employee_id"], Value::String("EMP-2041".to_string()));
    assert_eq!(result["location"], Value::String("Austin".to_string()));
}
6219
// The context passed to a custom worker must carry a `trace` block containing
// both the caller-provided trace context (trace_id/traceparent) and the tenant
// context (conversation/request ids) from the run options.
#[tokio::test]
async fn custom_worker_receives_trace_context_block() {
    let yaml = r#"
id: custom-worker-trace-context
entry_node: lookup
nodes:
  - id: lookup
    node_type:
      custom_worker:
        handler: GetRagData
        config:
          payload:
            topic: demo
"#;

    let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
    let worker = CapturingWorker {
        context: Mutex::new(None),
    };
    // Fixed trace/tenant identifiers so the assertions below can match exactly.
    let options = YamlWorkflowRunOptions {
        trace: YamlWorkflowTraceOptions {
            context: Some(YamlWorkflowTraceContextInput {
                trace_id: Some("trace-fixed-ctx".to_string()),
                traceparent: Some("00-trace-fixed-ctx-span-fixed-01".to_string()),
                ..YamlWorkflowTraceContextInput::default()
            }),
            tenant: YamlWorkflowTraceTenantContext {
                conversation_id: Some("7fd67af3-c67d-46cb-95af-08f8ed7b06a5".to_string()),
                request_id: Some("turn-7".to_string()),
                ..YamlWorkflowTraceTenantContext::default()
            },
        },
        ..YamlWorkflowRunOptions::default()
    };

    run_workflow_yaml_with_custom_worker_and_events_and_options(
        &workflow,
        &json!({"email_text":"hello"}),
        &MockExecutor,
        Some(&worker),
        None,
        &options,
    )
    .await
    .expect("workflow should execute");

    let captured_context = worker
        .context
        .lock()
        .expect("capturing worker lock should not be poisoned")
        .clone()
        .expect("custom worker should receive context");

    assert_eq!(
        captured_context
            .get("trace")
            .and_then(|trace| trace.get("context"))
            .and_then(|context| context.get("trace_id"))
            .and_then(Value::as_str),
        Some("trace-fixed-ctx")
    );
    assert_eq!(
        captured_context
            .get("trace")
            .and_then(|trace| trace.get("context"))
            .and_then(|context| context.get("traceparent"))
            .and_then(Value::as_str),
        Some("00-trace-fixed-ctx-span-fixed-01")
    );
    assert_eq!(
        captured_context
            .get("trace")
            .and_then(|trace| trace.get("tenant"))
            .and_then(|tenant| tenant.get("conversation_id"))
            .and_then(Value::as_str),
        Some("7fd67af3-c67d-46cb-95af-08f8ed7b06a5")
    );
}
6298
// A sink that signals cancellation on its first event must abort the run with
// EventSinkCancelled before any LLM call is made (executor counter stays 0).
#[tokio::test]
async fn event_sink_cancellation_stops_workflow_before_llm_execution() {
    let yaml = r#"
id: cancellation-test
entry_node: classify
nodes:
  - id: classify
    node_type:
      llm_call:
        model: gpt-4.1
        config:
          prompt: |
            Classify this email into exactly one category:
            {{ input.email_text }}
"#;

    let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
    let executor = CountingExecutor {
        call_count: AtomicUsize::new(0),
    };
    let sink = CancelAfterFirstEventSink {
        cancelled: AtomicBool::new(false),
    };

    let err = run_workflow_yaml_with_custom_worker_and_events_and_options(
        &workflow,
        &json!({"email_text":"hello"}),
        &executor,
        None,
        Some(&sink),
        &YamlWorkflowRunOptions::default(),
    )
    .await
    .expect_err("workflow should stop when sink signals cancellation");

    assert!(matches!(
        err,
        YamlWorkflowRunError::EventSinkCancelled { .. }
    ));
    // The LLM executor must never have been reached.
    assert_eq!(executor.call_count.load(Ordering::SeqCst), 0);
}
6340
// input.messages that is not an array must surface as an Llm error rather
// than being coerced or silently ignored.
#[tokio::test]
async fn rejects_invalid_messages_path_shape() {
    let yaml = r#"
id: email-intake-classification
entry_node: classify_top_level
nodes:
  - id: classify_top_level
    node_type:
      llm_call:
        model: gpt-4.1
        messages_path: input.messages
"#;

    let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
    let input = json!({
        "email_text": "ignored",
        "messages": "not-a-list"
    });

    let err = run_workflow_yaml(&workflow, &input, &MessageHistoryExecutor)
        .await
        .expect_err("workflow should fail for invalid messages shape");

    assert!(matches!(err, YamlWorkflowRunError::Llm { .. }));
}
6366
// Mermaid rendering: switch branches become labeled edges ("route1",
// "default"), while explicit workflow edges render unlabeled.
#[test]
fn renders_yaml_workflow_to_mermaid_with_switch_labels() {
    let yaml = r#"
id: chat-workflow
entry_node: decide
nodes:
  - id: decide
    node_type:
      switch:
        branches:
          - condition: '$.input.mode == "draft"'
            target: draft
        default: ask
  - id: draft
    node_type:
      llm_call:
        model: gpt-4.1
  - id: ask
    node_type:
      llm_call:
        model: gpt-4.1
edges:
  - from: draft
    to: ask
"#;

    let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
    let mermaid = yaml_workflow_to_mermaid(&workflow);

    assert!(mermaid.contains("flowchart TD"));
    assert!(mermaid.contains("decide -- \"route1\" --> draft"));
    assert!(mermaid.contains("decide -- \"default\" --> ask"));
    assert!(mermaid.contains("draft --> ask"));
}
6401
6402 #[test]
6403 fn renders_yaml_workflow_tools_as_colored_tool_nodes() {
6404 let yaml = r#"
6405id: tool-graph
6406entry_node: chat
6407nodes:
6408 - id: chat
6409 node_type:
6410 llm_call:
6411 model: gemini-3-flash
6412 tools_format: simplified
6413 tools:
6414 - name: run_workflow_graph
6415 input_schema:
6416 type: object
6417 properties:
6418 workflow_id: { type: string }
6419 required: [workflow_id]
6420edges: []
6421"#;
6422
6423 let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
6424 let mermaid = yaml_workflow_to_mermaid(&workflow);
6425
6426 assert!(mermaid.contains("chat__tool_0"));
6427 assert!(mermaid.contains("chat -.-> chat__tool_0"));
6428 assert!(mermaid.contains("classDef toolNode"));
6429 assert!(mermaid.contains("class chat__tool_0 toolNode;"));
6430 }
6431
6432 #[test]
6433 fn renders_yaml_workflow_file_to_mermaid_with_subgraph_cluster_when_present() {
6434 let base_dir = std::env::temp_dir().join(format!(
6435 "simple_agents_mermaid_subgraph_{}",
6436 std::time::SystemTime::now()
6437 .duration_since(std::time::UNIX_EPOCH)
6438 .expect("unix epoch")
6439 .as_nanos()
6440 ));
6441 fs::create_dir_all(&base_dir).expect("temp dir should be created");
6442
6443 let orchestrator_path = base_dir.join("email-chat-orchestrator-with-subgraph-tool.yaml");
6444 let subgraph_path = base_dir.join("hr-warning-email-subgraph.yaml");
6445
6446 let orchestrator_yaml = r#"
6447id: email-chat-orchestrator-with-subgraph-tool
6448entry_node: respond_casual
6449nodes:
6450 - id: respond_casual
6451 node_type:
6452 llm_call:
6453 model: gemini-3-flash
6454 tools_format: simplified
6455 tools:
6456 - name: run_workflow_graph
6457 input_schema:
6458 type: object
6459 properties:
6460 workflow_id: { type: string }
6461 required: [workflow_id]
6462 config:
6463 prompt: |
6464 Call with:
6465 {
6466 "workflow_id": "hr_warning_email_subgraph"
6467 }
6468edges: []
6469"#;
6470
6471 let subgraph_yaml = r#"
6472id: hr-warning-email-subgraph
6473entry_node: draft_hr_warning_email
6474nodes:
6475 - id: draft_hr_warning_email
6476 node_type:
6477 llm_call:
6478 model: gemini-3-flash
6479edges: []
6480"#;
6481
6482 fs::write(&orchestrator_path, orchestrator_yaml).expect("orchestrator yaml written");
6483 fs::write(&subgraph_path, subgraph_yaml).expect("subgraph yaml written");
6484
6485 let mermaid =
6486 yaml_workflow_file_to_mermaid(&orchestrator_path).expect("mermaid should render");
6487
6488 assert!(mermaid.contains("Main: email-chat-orchestrator-with-subgraph-tool"));
6489 assert!(mermaid.contains("Subgraph: hr_warning_email_subgraph"));
6490 assert!(mermaid.contains("calls hr_warning_email_subgraph"));
6491 assert!(mermaid.contains("subgraph_1__draft_hr_warning_email"));
6492
6493 fs::remove_dir_all(base_dir).expect("temp dir removed");
6494 }
6495
6496 #[test]
6497 fn converts_yaml_workflow_to_ir_definition() {
6498 let yaml = r#"
6499id: chat-workflow
6500entry_node: classify
6501nodes:
6502 - id: classify
6503 node_type:
6504 llm_call:
6505 model: gpt-4.1
6506 config:
6507 prompt: |
6508 classify
6509 - id: route
6510 node_type:
6511 switch:
6512 branches:
6513 - condition: '$.nodes.classify.output.kind == "x"'
6514 target: done
6515 default: done
6516 - id: done
6517 node_type:
6518 custom_worker:
6519 handler: GetRagData
6520 config:
6521 payload:
6522 topic: test
6523edges:
6524 - from: classify
6525 to: route
6526"#;
6527
6528 let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
6529 let ir = yaml_workflow_to_ir(&workflow).expect("yaml should convert to ir");
6530
6531 assert_eq!(ir.name, "chat-workflow");
6532 assert!(ir.nodes.iter().any(|n| n.id == "__yaml_start"));
6533 assert!(ir.nodes.iter().any(|n| n.id == "classify"));
6534 assert!(ir.nodes.iter().any(|n| n.id == "route"));
6535 assert!(ir.nodes.iter().any(|n| n.id == "done"));
6536 }
6537
6538 #[test]
6539 fn supports_yaml_to_ir_when_messages_path_is_used() {
6540 let yaml = r#"
6541id: chat-workflow
6542entry_node: classify
6543nodes:
6544 - id: classify
6545 node_type:
6546 llm_call:
6547 model: gpt-4.1
6548 messages_path: input.messages
6549"#;
6550
6551 let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
6552 let ir =
6553 yaml_workflow_to_ir(&workflow).expect("messages_path should convert to tool-based IR");
6554 assert!(ir.nodes.iter().any(|node| matches!(
6555 node.kind,
6556 crate::ir::NodeKind::Tool { ref tool, .. } if tool == "__yaml_llm_call"
6557 )));
6558 }
6559
    /// The generated trace id must be exposed both as `output.trace_id` and
    /// under `metadata.telemetry.trace_id`, and the two values must agree.
    #[tokio::test]
    async fn workflow_output_contains_trace_id_in_both_locations() {
        let yaml = r#"
id: trace-test
entry_node: classify
nodes:
  - id: classify
    node_type:
      llm_call:
        model: gpt-4.1
        config:
          prompt: |
            Classify this email into exactly one category:
            {{ input.email_text }}
"#;

        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
        let output = run_workflow_yaml(&workflow, &json!({"email_text":"hello"}), &MockExecutor)
            .await
            .expect("workflow should execute");

        let trace_id = output
            .trace_id
            .as_deref()
            .expect("trace_id should be present");
        assert!(!trace_id.is_empty());
        // The same id must be mirrored into the telemetry metadata block.
        assert_eq!(
            output.metadata.as_ref().and_then(|value| {
                value
                    .get("telemetry")
                    .and_then(|telemetry| telemetry.get("trace_id"))
                    .and_then(Value::as_str)
            }),
            Some(trace_id)
        );
    }
6596
    /// `sample_rate: 0.0` must mark the trace as unsampled while still
    /// honoring the caller-supplied trace id.
    #[tokio::test]
    async fn workflow_run_options_sample_rate_zero_marks_trace_unsampled() {
        let yaml = r#"
id: sample-rate-zero
entry_node: classify
nodes:
  - id: classify
    node_type:
      llm_call:
        model: gpt-4.1
        config:
          prompt: |
            Classify this email into exactly one category:
            {{ input.email_text }}
"#;

        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
        let options = YamlWorkflowRunOptions {
            telemetry: YamlWorkflowTelemetryConfig {
                sample_rate: 0.0,
                ..YamlWorkflowTelemetryConfig::default()
            },
            trace: YamlWorkflowTraceOptions {
                context: Some(YamlWorkflowTraceContextInput {
                    trace_id: Some("trace-sample-zero".to_string()),
                    ..YamlWorkflowTraceContextInput::default()
                }),
                tenant: YamlWorkflowTraceTenantContext::default(),
            },
            model: None,
        };

        let output = run_workflow_yaml_with_custom_worker_and_events_and_options(
            &workflow,
            &json!({"email_text":"hello"}),
            &MockExecutor,
            None,
            None,
            &options,
        )
        .await
        .expect("workflow should execute");

        // The explicit trace id is preserved even for unsampled traces.
        assert_eq!(output.trace_id.as_deref(), Some("trace-sample-zero"));
        // The telemetry metadata records the (negative) sampling decision.
        assert_eq!(
            output
                .metadata
                .as_ref()
                .and_then(|value| value.get("telemetry"))
                .and_then(|telemetry| telemetry.get("sampled"))
                .and_then(Value::as_bool),
            Some(false)
        );
    }
6651
6652 #[test]
6653 fn trace_id_from_traceparent_parses_w3c_header() {
6654 let traceparent = "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01";
6655 assert_eq!(
6656 trace_id_from_traceparent(traceparent).as_deref(),
6657 Some("4bf92f3577b34da6a3ce929d0e0e4736")
6658 );
6659 }
6660
6661 #[test]
6662 fn resolve_telemetry_context_marks_traceparent_source() {
6663 let mut options = YamlWorkflowRunOptions::default();
6664 options.trace.context = Some(YamlWorkflowTraceContextInput {
6665 traceparent: Some(
6666 "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01".to_string(),
6667 ),
6668 ..YamlWorkflowTraceContextInput::default()
6669 });
6670
6671 let context = resolve_telemetry_context(&options, None);
6672
6673 assert_eq!(context.trace_id_source, TraceIdSource::Traceparent);
6674 assert_eq!(
6675 context.trace_id.as_deref(),
6676 Some("4bf92f3577b34da6a3ce929d0e0e4736")
6677 );
6678 }
6679
    /// When no explicit `trace_id` is supplied, the run derives it from the
    /// W3C `traceparent` header instead of generating a fresh one.
    #[tokio::test]
    async fn workflow_run_options_derives_trace_id_from_traceparent_when_trace_id_missing() {
        let yaml = r#"
id: traceparent-derived
entry_node: classify
nodes:
  - id: classify
    node_type:
      llm_call:
        model: gpt-4.1
        config:
          prompt: |
            Classify this email into exactly one category:
            {{ input.email_text }}
"#;

        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
        let traceparent = "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01";
        let options = YamlWorkflowRunOptions {
            telemetry: YamlWorkflowTelemetryConfig {
                // Sampling is forced off so the test also pins the sampled flag.
                sample_rate: 0.0,
                ..YamlWorkflowTelemetryConfig::default()
            },
            trace: YamlWorkflowTraceOptions {
                context: Some(YamlWorkflowTraceContextInput {
                    traceparent: Some(traceparent.to_string()),
                    ..YamlWorkflowTraceContextInput::default()
                }),
                tenant: YamlWorkflowTraceTenantContext::default(),
            },
            model: None,
        };

        let output = run_workflow_yaml_with_custom_worker_and_events_and_options(
            &workflow,
            &json!({"email_text":"hello"}),
            &MockExecutor,
            None,
            None,
            &options,
        )
        .await
        .expect("workflow should execute");

        // The trace id is the middle segment of the traceparent header.
        assert_eq!(
            output.trace_id.as_deref(),
            Some("4bf92f3577b34da6a3ce929d0e0e4736")
        );
        assert_eq!(
            output
                .metadata
                .as_ref()
                .and_then(|value| value.get("telemetry"))
                .and_then(|telemetry| telemetry.get("sampled"))
                .and_then(Value::as_bool),
            Some(false)
        );
    }
6738
    /// `sample_rate: 1.0` must mark the trace as sampled; the companion test
    /// with `0.0` covers the unsampled path.
    #[tokio::test]
    async fn workflow_run_options_sample_rate_one_marks_trace_sampled() {
        let yaml = r#"
id: sample-rate-one
entry_node: classify
nodes:
  - id: classify
    node_type:
      llm_call:
        model: gpt-4.1
        config:
          prompt: |
            Classify this email into exactly one category:
            {{ input.email_text }}
"#;

        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
        let options = YamlWorkflowRunOptions {
            telemetry: YamlWorkflowTelemetryConfig {
                sample_rate: 1.0,
                ..YamlWorkflowTelemetryConfig::default()
            },
            trace: YamlWorkflowTraceOptions {
                context: Some(YamlWorkflowTraceContextInput {
                    trace_id: Some("trace-sample-one".to_string()),
                    ..YamlWorkflowTraceContextInput::default()
                }),
                tenant: YamlWorkflowTraceTenantContext::default(),
            },
            model: None,
        };

        let output = run_workflow_yaml_with_custom_worker_and_events_and_options(
            &workflow,
            &json!({"email_text":"hello"}),
            &MockExecutor,
            None,
            None,
            &options,
        )
        .await
        .expect("workflow should execute");

        assert_eq!(output.trace_id.as_deref(), Some("trace-sample-one"));
        // With rate 1.0 every trace must be recorded as sampled.
        assert_eq!(
            output
                .metadata
                .as_ref()
                .and_then(|value| value.get("telemetry"))
                .and_then(|telemetry| telemetry.get("sampled"))
                .and_then(Value::as_bool),
            Some(true)
        );
    }
6793
    /// With a fractional sample rate, the sampling decision must be a pure
    /// function of the (fixed) trace id: repeated runs all agree.
    #[tokio::test]
    async fn workflow_run_options_sampling_is_deterministic_for_fixed_trace_id() {
        let yaml = r#"
id: sample-rate-deterministic
entry_node: classify
nodes:
  - id: classify
    node_type:
      llm_call:
        model: gpt-4.1
        config:
          prompt: |
            Classify this email into exactly one category:
            {{ input.email_text }}
"#;

        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
        let options = YamlWorkflowRunOptions {
            telemetry: YamlWorkflowTelemetryConfig {
                // 0.5 leaves the decision genuinely rate-dependent.
                sample_rate: 0.5,
                ..YamlWorkflowTelemetryConfig::default()
            },
            trace: YamlWorkflowTraceOptions {
                context: Some(YamlWorkflowTraceContextInput {
                    trace_id: Some("trace-sample-deterministic".to_string()),
                    ..YamlWorkflowTraceContextInput::default()
                }),
                tenant: YamlWorkflowTraceTenantContext::default(),
            },
            model: None,
        };

        let mut sampled_values = Vec::new();
        // Run the identical workflow three times with identical options.
        for _ in 0..3 {
            let output = run_workflow_yaml_with_custom_worker_and_events_and_options(
                &workflow,
                &json!({"email_text":"hello"}),
                &MockExecutor,
                None,
                None,
                &options,
            )
            .await
            .expect("workflow should execute");

            let sampled = output
                .metadata
                .as_ref()
                .and_then(|value| value.get("telemetry"))
                .and_then(|telemetry| telemetry.get("sampled"))
                .and_then(Value::as_bool)
                .expect("sampled flag should be present");
            sampled_values.push(sampled);
        }

        // Each run must reach the same decision (true or false, but stable).
        assert!(sampled_values
            .iter()
            .all(|value| *value == sampled_values[0]));
    }
6853
    /// A `sample_rate` above 1.0 is rejected up front with an `InvalidInput`
    /// error whose message names the offending option.
    #[tokio::test]
    async fn workflow_run_options_reject_invalid_sample_rate() {
        let yaml = r#"
id: sample-rate-invalid
entry_node: classify
nodes:
  - id: classify
    node_type:
      llm_call:
        model: gpt-4.1
        config:
          prompt: |
            Classify this email into exactly one category:
            {{ input.email_text }}
"#;

        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
        let options = YamlWorkflowRunOptions {
            telemetry: YamlWorkflowTelemetryConfig {
                // 1.1 is out of the valid [0.0, 1.0] range.
                sample_rate: 1.1,
                ..YamlWorkflowTelemetryConfig::default()
            },
            ..YamlWorkflowRunOptions::default()
        };

        let err = run_workflow_yaml_with_custom_worker_and_events_and_options(
            &workflow,
            &json!({"email_text":"hello"}),
            &MockExecutor,
            None,
            None,
            &options,
        )
        .await
        .expect_err("invalid sample_rate should fail");

        match err {
            YamlWorkflowRunError::InvalidInput { message } => {
                // The message should point the caller at the bad option.
                assert!(message.contains("telemetry.sample_rate"));
            }
            _ => panic!("expected invalid input error for sample_rate"),
        }
    }
6897
    /// A NaN `sample_rate` must also be rejected as `InvalidInput` — a plain
    /// range comparison would silently accept NaN, so validation must handle
    /// it explicitly.
    #[tokio::test]
    async fn workflow_run_options_reject_nan_sample_rate() {
        let yaml = r#"
id: sample-rate-invalid
entry_node: classify
nodes:
  - id: classify
    node_type:
      llm_call:
        model: gpt-4.1
        config:
          prompt: |
            Classify this email into exactly one category:
            {{ input.email_text }}
"#;

        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
        let options = YamlWorkflowRunOptions {
            telemetry: YamlWorkflowTelemetryConfig {
                sample_rate: f32::NAN,
                ..YamlWorkflowTelemetryConfig::default()
            },
            ..YamlWorkflowRunOptions::default()
        };

        let err = run_workflow_yaml_with_custom_worker_and_events_and_options(
            &workflow,
            &json!({"email_text":"hello"}),
            &MockExecutor,
            None,
            None,
            &options,
        )
        .await
        .expect_err("nan sample_rate should fail");

        assert!(matches!(err, YamlWorkflowRunError::InvalidInput { .. }));
    }
6936
6937 #[test]
6938 fn subworkflow_options_inherit_parent_telemetry_configuration() {
6939 let mut parent_options = YamlWorkflowRunOptions::default();
6940 parent_options.telemetry.sample_rate = 0.0;
6941 parent_options.telemetry.payload_mode = YamlWorkflowPayloadMode::RedactedPayload;
6942 parent_options.telemetry.tool_trace_mode = YamlToolTraceMode::Redacted;
6943 parent_options.trace.tenant.conversation_id = Some("conv-123".to_string());
6944
6945 let parent_context = TraceContext {
6946 trace_id: Some("trace-parent".to_string()),
6947 span_id: Some("span-parent".to_string()),
6948 parent_span_id: None,
6949 traceparent: Some("00-trace-parent-span-parent-01".to_string()),
6950 tracestate: None,
6951 baggage: BTreeMap::new(),
6952 };
6953
6954 let subworkflow_options =
6955 build_subworkflow_options(&parent_options, Some(&parent_context), None);
6956
6957 assert_eq!(subworkflow_options.telemetry.sample_rate, 0.0);
6958 assert_eq!(
6959 subworkflow_options.telemetry.payload_mode,
6960 YamlWorkflowPayloadMode::RedactedPayload
6961 );
6962 assert_eq!(
6963 subworkflow_options.telemetry.tool_trace_mode,
6964 YamlToolTraceMode::Redacted
6965 );
6966 assert_eq!(
6967 subworkflow_options.trace.tenant.conversation_id.as_deref(),
6968 Some("conv-123")
6969 );
6970 assert_eq!(
6971 subworkflow_options
6972 .trace
6973 .context
6974 .as_ref()
6975 .and_then(|ctx| ctx.trace_id.as_deref()),
6976 Some("trace-parent")
6977 );
6978 }
6979
    /// An explicitly supplied trace id wins over the traceparent, the chosen
    /// payload mode is echoed into telemetry metadata, and the tenant
    /// conversation id appears under `metadata.trace.tenant`.
    #[tokio::test]
    async fn workflow_run_options_use_explicit_trace_id_and_payload_mode() {
        let yaml = r#"
id: trace-options-test
entry_node: classify
nodes:
  - id: classify
    node_type:
      llm_call:
        model: gpt-4.1
        config:
          prompt: |
            Classify this email into exactly one category:
            {{ input.email_text }}
"#;

        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
        let options = YamlWorkflowRunOptions {
            telemetry: YamlWorkflowTelemetryConfig {
                payload_mode: YamlWorkflowPayloadMode::RedactedPayload,
                ..YamlWorkflowTelemetryConfig::default()
            },
            trace: YamlWorkflowTraceOptions {
                // Both trace_id and traceparent are given; trace_id wins.
                context: Some(YamlWorkflowTraceContextInput {
                    trace_id: Some("trace-fixed-123".to_string()),
                    traceparent: Some("00-trace-fixed-123-span-1-01".to_string()),
                    ..YamlWorkflowTraceContextInput::default()
                }),
                tenant: YamlWorkflowTraceTenantContext {
                    conversation_id: Some("6e6d3125-b9f1-4af2-af1f-7cca024a2c42".to_string()),
                    ..YamlWorkflowTraceTenantContext::default()
                },
            },
            model: None,
        };

        let output = run_workflow_yaml_with_custom_worker_and_events_and_options(
            &workflow,
            &json!({"email_text":"hello"}),
            &MockExecutor,
            None,
            None,
            &options,
        )
        .await
        .expect("workflow should execute");

        assert_eq!(output.trace_id.as_deref(), Some("trace-fixed-123"));
        // Payload mode is serialized in snake_case into telemetry metadata.
        assert_eq!(
            output
                .metadata
                .as_ref()
                .and_then(|value| value.get("telemetry"))
                .and_then(|telemetry| telemetry.get("payload_mode"))
                .and_then(Value::as_str),
            Some("redacted_payload")
        );
        assert_eq!(
            output
                .metadata
                .as_ref()
                .and_then(|value| value.get("trace"))
                .and_then(|trace| trace.get("tenant"))
                .and_then(|tenant| tenant.get("conversation_id"))
                .and_then(Value::as_str),
            Some("6e6d3125-b9f1-4af2-af1f-7cca024a2c42")
        );
    }
7048
7049 #[test]
7050 fn streamed_payload_parser_extracts_last_json_object() {
7051 let raw = r#"{"state":"missing_scenario","reason":"ok"}
7052
7053extra reasoning text
7054
7055{"state":"ready","reason":"final"}"#;
7056
7057 let resolved = parse_streamed_structured_payload(raw, false)
7058 .expect("parser should extract final JSON object");
7059 assert_eq!(resolved.payload["state"], "ready");
7060 assert!(resolved.heal_confidence.is_none());
7061 }
7062
7063 #[test]
7064 fn streamed_payload_parser_handles_unbalanced_reasoning_before_json() {
7065 let raw = "reasoning text with unmatched { braces and thoughts\n{\"state\":\"ready\",\"reason\":\"final\"}";
7066
7067 let resolved = parse_streamed_structured_payload(raw, false)
7068 .expect("parser should recover final structured JSON object");
7069 assert_eq!(resolved.payload["state"], "ready");
7070 }
7071
    /// With healing enabled, a JSON object wrapped in a markdown code fence
    /// (with prose before and after) is still parsed, and a heal confidence
    /// score is reported.
    #[test]
    fn streamed_payload_parser_handles_markdown_with_heal() {
        let raw = r#"Some preface
```json
{
  "state": "missing_scenario",
  "reason": "Need more details"
}
```
Some trailing explanation"#;

        let resolved = parse_streamed_structured_payload(raw, true)
            .expect("heal path should parse JSON block");
        assert_eq!(resolved.payload["state"], "missing_scenario");
        // The heal path always attaches a confidence score.
        assert!(resolved.heal_confidence.is_some());
    }
7088
7089 #[test]
7090 fn streamed_payload_parser_errors_when_no_json_candidate_exists() {
7091 let raw = "No JSON in this streamed output";
7092 let error = parse_streamed_structured_payload(raw, false)
7093 .expect_err("strict stream parse should fail without JSON candidate");
7094 assert!(error.contains("no JSON object candidate found"));
7095 }
7096
    /// Without the env var set, raw stream debug events are disabled.
    #[test]
    fn include_raw_stream_debug_events_defaults_to_false() {
        // Serialize access to the shared env var across tests that mutate it.
        let _guard = stream_debug_env_lock()
            .lock()
            .expect("stream debug env lock should not be poisoned");
        std::env::remove_var("SIMPLE_AGENTS_WORKFLOW_STREAM_INCLUDE_RAW");
        assert!(!include_raw_stream_debug_events());
    }
7105
7106 #[test]
7107 fn include_raw_stream_debug_events_accepts_truthy_values() {
7108 let _guard = stream_debug_env_lock()
7109 .lock()
7110 .expect("stream debug env lock should not be poisoned");
7111 std::env::set_var("SIMPLE_AGENTS_WORKFLOW_STREAM_INCLUDE_RAW", "true");
7112 assert!(include_raw_stream_debug_events());
7113 std::env::remove_var("SIMPLE_AGENTS_WORKFLOW_STREAM_INCLUDE_RAW");
7114 }
7115
7116 #[test]
7117 fn structured_json_delta_filter_strips_reasoning_prefix_and_suffix() {
7118 let mut filter = StructuredJsonDeltaFilter::default();
7119 let chunks = vec![
7120 "I will think first... ",
7121 "{\"state\":\"missing_scenario\",",
7122 "\"reason\":\"Need more details\"}",
7123 " additional commentary",
7124 ];
7125
7126 let filtered = chunks
7127 .into_iter()
7128 .filter_map(|chunk| filter.split(chunk).0)
7129 .collect::<String>();
7130
7131 assert_eq!(
7132 filtered,
7133 "{\"state\":\"missing_scenario\",\"reason\":\"Need more details\"}"
7134 );
7135 }
7136
    /// A closing brace inside a JSON string literal must not terminate the
    /// filter early; the full object passes through intact.
    #[test]
    fn structured_json_delta_filter_handles_braces_inside_strings() {
        let mut filter = StructuredJsonDeltaFilter::default();
        let chunks = vec![
            "preface ",
            "{\"reason\":\"brace } in text\",\"state\":\"ok\"}",
            " trailing",
        ];

        // `split` yields the JSON-bearing part of each delta chunk, if any.
        let filtered = chunks
            .into_iter()
            .filter_map(|chunk| filter.split(chunk).0)
            .collect::<String>();

        assert_eq!(
            filtered,
            "{\"reason\":\"brace } in text\",\"state\":\"ok\"}"
        );
    }
7156
7157 #[test]
7158 fn render_json_object_as_text_converts_top_level_fields() {
7159 let rendered =
7160 render_json_object_as_text(r#"{"question":"q","confidence":0.8,"nested":{"a":1}}"#);
7161 let lines: std::collections::HashSet<&str> = rendered.lines().collect();
7162
7163 assert_eq!(lines.len(), 3);
7164 assert!(lines.contains("question: q"));
7165 assert!(lines.contains("confidence: 0.8"));
7166 assert!(lines.contains("nested: {\"a\":1}"));
7167 }
7168
7169 #[test]
7170 fn stream_json_as_text_formatter_emits_once_when_complete() {
7171 let mut formatter = StreamJsonAsTextFormatter::default();
7172 formatter.push("{\"question\":\"hello\"}");
7173
7174 let first = formatter.emit_if_ready(true);
7175 let second = formatter.emit_if_ready(true);
7176
7177 assert_eq!(first, Some("question: hello".to_string()));
7178 assert_eq!(second, None);
7179 }
7180
7181 #[test]
7182 fn rewrite_yaml_condition_preserves_output_prefix_in_field_names() {
7183 let expr = "$.nodes.classify.output.output_total == 1";
7184 let rewritten = rewrite_yaml_condition_to_ir(expr);
7185 assert_eq!(rewritten, "$.node_outputs.classify.output_total == 1");
7186 }
7187
    /// Workflow input must be a JSON object; any other shape is rejected as
    /// `InvalidInput` before any node executes.
    #[tokio::test]
    async fn validates_workflow_input_before_ir_runtime_path() {
        let yaml = r#"
id: chat-workflow
entry_node: classify
nodes:
  - id: classify
    node_type:
      llm_call:
        model: gpt-4.1
        config:
          prompt: |
            classify
"#;

        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
        // A bare string is valid JSON but not a valid workflow input object.
        let err = run_workflow_yaml(&workflow, &json!("not-an-object"), &MockExecutor)
            .await
            .expect_err("non-object input should fail before execution");

        assert!(matches!(err, YamlWorkflowRunError::InvalidInput { .. }));
    }
7210
7211 #[test]
7212 fn interpolate_template_supports_dollar_prefixed_paths() {
7213 let context = json!({
7214 "input": {
7215 "email_text": "hello"
7216 }
7217 });
7218
7219 let rendered = interpolate_template("value={{ $.input.email_text }}", &context);
7220 assert_eq!(rendered, "value=hello");
7221 }
7222}