1use std::collections::{BTreeMap, HashMap, HashSet};
2use std::path::Path;
3use std::time::Instant;
4
5use async_trait::async_trait;
6use futures::StreamExt;
7use serde::{Deserialize, Serialize};
8use serde_json::{json, Value};
9use simple_agent_type::message::{Message, Role};
10use simple_agent_type::request::CompletionRequest;
11use simple_agent_type::response::FinishReason;
12use simple_agent_type::tool::{
13 ToolCall, ToolChoice, ToolChoiceFunction, ToolChoiceMode, ToolChoiceTool, ToolDefinition,
14 ToolFunction, ToolType,
15};
16use simple_agents_core::{
17 CompletionMode, CompletionOptions, CompletionOutcome, SimpleAgentsClient,
18};
19use simple_agents_healing::JsonishParser;
20use thiserror::Error;
21
22use crate::ir::{Node, NodeKind, RouterRoute, WorkflowDefinition, WORKFLOW_IR_V0};
23use crate::observability::tracing::{workflow_tracer, SpanKind, TraceContext, WorkflowSpan};
24use crate::runtime::{
25 LlmExecutionError, LlmExecutionInput, LlmExecutionOutput, LlmExecutor, ToolExecutionError,
26 ToolExecutionInput, ToolExecutor, WorkflowRuntime, WorkflowRuntimeError,
27 WorkflowRuntimeOptions,
28};
29use crate::visualize::workflow_to_mermaid;
30
/// Synthetic start-node id injected when converting a YAML workflow to IR.
const YAML_START_NODE_ID: &str = "__yaml_start";
/// Tool id used to represent YAML `llm_call` nodes as IR tool nodes.
const YAML_LLM_TOOL_ID: &str = "__yaml_llm_call";

// Process-wide sequence mixed into generated trace ids so two ids created in
// the same nanosecond still differ.
static TRACE_ID_COUNTER: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(1);
35
/// Per-node timing entry recorded for every executed workflow step.
#[derive(Debug, Clone, PartialEq, Serialize)]
pub struct YamlStepTiming {
    /// Id of the executed node.
    pub node_id: String,
    /// Node kind label (e.g. "llm_call").
    pub node_kind: String,
    /// Wall-clock duration of the step in milliseconds.
    pub elapsed_ms: u128,
    // Token fields are only populated for LLM steps whose provider reported usage.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub prompt_tokens: Option<u32>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub completion_tokens: Option<u32>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub total_tokens: Option<u32>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub thinking_tokens: Option<u32>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub tokens_per_second: Option<f64>,
}
52
/// Token/latency metrics for a single LLM node execution.
#[derive(Debug, Clone, PartialEq, Serialize)]
pub struct YamlLlmNodeMetrics {
    /// Wall-clock duration of the LLM call in milliseconds.
    pub elapsed_ms: u128,
    pub prompt_tokens: u32,
    pub completion_tokens: u32,
    pub total_tokens: u32,
    /// Present only when the provider reported separate thinking tokens.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub thinking_tokens: Option<u32>,
    /// Completion throughput in tokens per second.
    pub tokens_per_second: f64,
}
63
/// Final result of a YAML workflow run, including telemetry rollups.
#[derive(Debug, Clone, PartialEq, Serialize)]
pub struct YamlWorkflowRunOutput {
    pub workflow_id: String,
    pub entry_node: String,
    pub email_text: String,
    /// Visited node ids (NOTE(review): presumed execution order — confirm against runtime).
    pub trace: Vec<String>,
    /// Per-node outputs keyed by node id.
    pub outputs: BTreeMap<String, Value>,
    pub terminal_node: String,
    pub terminal_output: Option<Value>,
    pub step_timings: Vec<YamlStepTiming>,
    pub llm_node_metrics: BTreeMap<String, YamlLlmNodeMetrics>,
    pub total_elapsed_ms: u128,
    /// Time to first token in milliseconds, when available.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub ttft_ms: Option<u128>,
    pub total_input_tokens: u64,
    pub total_output_tokens: u64,
    pub total_tokens: u64,
    /// Only set when at least one LLM step reported thinking tokens.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub total_thinking_tokens: Option<u64>,
    pub tokens_per_second: f64,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub trace_id: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub metadata: Option<Value>,
}
89
/// Controls whether span payloads are recorded verbatim or redacted to a type tag.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
#[serde(rename_all = "snake_case")]
pub enum YamlWorkflowPayloadMode {
    /// Record the full JSON payload on spans.
    #[default]
    FullPayload,
    /// Record only `{"redacted": true, "value_type": …}`.
    RedactedPayload,
}
97
/// Controls how tool-call arguments/outputs appear in traces.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
#[serde(rename_all = "snake_case")]
pub enum YamlToolTraceMode {
    /// Record tool payloads verbatim.
    #[default]
    Full,
    /// Record only a type-tagged redaction marker.
    Redacted,
    /// Record nothing (`null`).
    Off,
}
106
/// Caller-supplied distributed-tracing context (W3C-style ids) for a run.
/// All fields are optional; missing ids are generated or inherited.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
pub struct YamlWorkflowTraceContextInput {
    #[serde(default)]
    pub trace_id: Option<String>,
    #[serde(default)]
    pub span_id: Option<String>,
    #[serde(default)]
    pub parent_span_id: Option<String>,
    #[serde(default)]
    pub traceparent: Option<String>,
    #[serde(default)]
    pub tracestate: Option<String>,
    /// Key/value baggage propagated to downstream workers.
    #[serde(default)]
    pub baggage: BTreeMap<String, String>,
}
122
/// Tenant/request identifiers attached to spans and worker contexts.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
pub struct YamlWorkflowTraceTenantContext {
    #[serde(default)]
    pub workspace_id: Option<String>,
    #[serde(default)]
    pub user_id: Option<String>,
    #[serde(default)]
    pub conversation_id: Option<String>,
    #[serde(default)]
    pub request_id: Option<String>,
    #[serde(default)]
    pub run_id: Option<String>,
}
136
/// Telemetry knobs for a workflow run.
///
/// Serde defaults mirror `Default for YamlWorkflowTelemetryConfig`; keep the
/// two in sync when changing either.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct YamlWorkflowTelemetryConfig {
    /// Master switch for emitting telemetry.
    #[serde(default = "default_true")]
    pub enabled: bool,
    /// Whether to compute the detailed nerdstats rollup.
    #[serde(default = "default_true")]
    pub nerdstats: bool,
    /// Trace sampling rate (1.0 = always).
    #[serde(default = "default_sample_rate")]
    pub sample_rate: f32,
    #[serde(default)]
    pub payload_mode: YamlWorkflowPayloadMode,
    #[serde(default = "default_retention_days")]
    pub retention_days: u32,
    #[serde(default = "default_true")]
    pub multi_tenant: bool,
    #[serde(default)]
    pub tool_trace_mode: YamlToolTraceMode,
}
154
impl Default for YamlWorkflowTelemetryConfig {
    /// Mirrors the serde field defaults (`default_true`, `default_sample_rate`,
    /// `default_retention_days`) — keep both in sync when changing either.
    fn default() -> Self {
        Self {
            enabled: true,
            nerdstats: true,
            sample_rate: 1.0,
            payload_mode: YamlWorkflowPayloadMode::FullPayload,
            retention_days: 30,
            multi_tenant: true,
            tool_trace_mode: YamlToolTraceMode::Full,
        }
    }
}
168
/// Trace-related inputs for a run: optional incoming context plus tenant ids.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
pub struct YamlWorkflowTraceOptions {
    /// Incoming distributed-tracing context, if the caller has one.
    #[serde(default)]
    pub context: Option<YamlWorkflowTraceContextInput>,
    #[serde(default)]
    pub tenant: YamlWorkflowTraceTenantContext,
}
176
/// Top-level per-run options: telemetry configuration and trace inputs.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Default)]
pub struct YamlWorkflowRunOptions {
    #[serde(default)]
    pub telemetry: YamlWorkflowTelemetryConfig,
    #[serde(default)]
    pub trace: YamlWorkflowTraceOptions,
}
184
/// Provider-reported token usage for a single LLM call.
#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
pub struct YamlLlmTokenUsage {
    pub prompt_tokens: u32,
    pub completion_tokens: u32,
    pub total_tokens: u32,
    /// Present only for providers that report thinking tokens separately.
    pub thinking_tokens: Option<u32>,
}
192
/// Outcome of one LLM node execution: payload plus optional usage/latency data.
#[derive(Debug, Clone, PartialEq, Serialize)]
pub struct YamlLlmExecutionResult {
    /// Parsed structured payload returned by the model.
    pub payload: Value,
    /// Usage counters when the provider reported them.
    pub usage: Option<YamlLlmTokenUsage>,
    /// Time to first token in milliseconds, when streaming.
    pub ttft_ms: Option<u128>,
    /// Tool calls issued during this LLM execution, in order.
    pub tool_calls: Vec<YamlToolCallTrace>,
}
200
/// Trace record for one tool call made during an LLM execution.
#[derive(Debug, Clone, PartialEq, Serialize)]
pub struct YamlToolCallTrace {
    /// Provider-assigned tool-call id.
    pub id: String,
    /// Tool/function name.
    pub name: String,
    /// Arguments as passed to the tool (possibly redacted by trace mode).
    pub arguments: Value,
    /// Tool output, if the call completed.
    pub output: Option<Value>,
    /// Status label for the call (NOTE(review): value set not visible here).
    pub status: String,
    pub elapsed_ms: u128,
    /// Error message when the call failed.
    pub error: Option<String>,
}
211
/// Running token totals accumulated across all LLM steps of a run.
#[derive(Debug, Clone, Default)]
struct YamlTokenTotals {
    input_tokens: u64,
    output_tokens: u64,
    total_tokens: u64,
    // None until at least one usage report includes thinking tokens.
    thinking_tokens: Option<u64>,
}
219
220impl YamlTokenTotals {
221 fn add_usage(&mut self, usage: &YamlLlmTokenUsage) {
222 self.input_tokens += u64::from(usage.prompt_tokens);
223 self.output_tokens += u64::from(usage.completion_tokens);
224 self.total_tokens += u64::from(usage.total_tokens);
225
226 if let Some(thinking_tokens) = usage.thinking_tokens {
227 let next = self.thinking_tokens.unwrap_or(0) + u64::from(thinking_tokens);
228 self.thinking_tokens = Some(next);
229 }
230 }
231
232 fn tokens_per_second(&self, elapsed_ms: u128) -> f64 {
233 if elapsed_ms == 0 {
234 return 0.0;
235 }
236 round_two_decimals((self.output_tokens as f64) * 1000.0 / (elapsed_ms as f64))
237 }
238}
239
/// Rounds a value to two decimal places (half away from zero).
fn round_two_decimals(value: f64) -> f64 {
    let scaled = value * 100.0;
    scaled.round() / 100.0
}
243
244fn completion_tokens_per_second(completion_tokens: u32, elapsed_ms: u128) -> f64 {
245 if elapsed_ms == 0 {
246 return 0.0;
247 }
248 round_two_decimals((completion_tokens as f64) * 1000.0 / (elapsed_ms as f64))
249}
250
// Serde default helpers; must stay consistent with
// `Default for YamlWorkflowTelemetryConfig`.
fn default_true() -> bool {
    true
}

fn default_sample_rate() -> f32 {
    1.0
}

fn default_retention_days() -> u32 {
    30
}
262
263fn generate_trace_id() -> String {
264 use std::time::{SystemTime, UNIX_EPOCH};
265
266 let now_nanos = SystemTime::now()
267 .duration_since(UNIX_EPOCH)
268 .map(|duration| duration.as_nanos())
269 .unwrap_or(0);
270 let sequence = u128::from(TRACE_ID_COUNTER.fetch_add(1, std::sync::atomic::Ordering::Relaxed));
271 format!("{:032x}", now_nanos ^ sequence)
272}
273
274fn resolve_trace_id(options: &YamlWorkflowRunOptions, span_context: &TraceContext) -> String {
275 options
276 .trace
277 .context
278 .as_ref()
279 .and_then(|context| context.trace_id.clone())
280 .or_else(|| span_context.trace_id.clone())
281 .unwrap_or_else(generate_trace_id)
282}
283
/// Builds the `metadata` JSON blob attached to a run output: the effective
/// telemetry settings (with enum values serialized as snake_case strings)
/// plus the tenant identifiers.
fn workflow_metadata_with_trace(options: &YamlWorkflowRunOptions, trace_id: &str) -> Value {
    json!({
        "telemetry": {
            "trace_id": trace_id,
            "enabled": options.telemetry.enabled,
            "nerdstats": options.telemetry.nerdstats,
            "sample_rate": options.telemetry.sample_rate,
            "payload_mode": match options.telemetry.payload_mode {
                YamlWorkflowPayloadMode::FullPayload => "full_payload",
                YamlWorkflowPayloadMode::RedactedPayload => "redacted_payload",
            },
            "retention_days": options.telemetry.retention_days,
            "multi_tenant": options.telemetry.multi_tenant,
            "tool_trace_mode": match options.telemetry.tool_trace_mode {
                YamlToolTraceMode::Full => "full",
                YamlToolTraceMode::Redacted => "redacted",
                YamlToolTraceMode::Off => "off",
            },
        },
        "trace": {
            "tenant": {
                "workspace_id": options.trace.tenant.workspace_id,
                "user_id": options.trace.tenant.user_id,
                "conversation_id": options.trace.tenant.conversation_id,
                "request_id": options.trace.tenant.request_id,
                "run_id": options.trace.tenant.run_id,
            }
        },
    })
}
314
315fn apply_trace_tenant_attributes(span: &mut dyn WorkflowSpan, options: &YamlWorkflowRunOptions) {
316 if let Some(workspace_id) = options.trace.tenant.workspace_id.as_deref() {
317 span.set_attribute("tenant.workspace_id", workspace_id);
318 }
319 if let Some(user_id) = options.trace.tenant.user_id.as_deref() {
320 span.set_attribute("tenant.user_id", user_id);
321 }
322 if let Some(conversation_id) = options.trace.tenant.conversation_id.as_deref() {
323 span.set_attribute("tenant.conversation_id", conversation_id);
324 }
325 if let Some(request_id) = options.trace.tenant.request_id.as_deref() {
326 span.set_attribute("tenant.request_id", request_id);
327 }
328 if let Some(run_id) = options.trace.tenant.run_id.as_deref() {
329 span.set_attribute("tenant.run_id", run_id);
330 }
331}
332
333fn workflow_nerdstats(output: &YamlWorkflowRunOutput) -> Value {
334 let llm_nodes_without_usage: Vec<String> = output
335 .step_timings
336 .iter()
337 .filter(|step| step.node_kind == "llm_call" && step.total_tokens.is_none())
338 .map(|step| step.node_id.clone())
339 .collect();
340 let token_metrics_available = llm_nodes_without_usage.is_empty();
341 let token_metrics_source = if token_metrics_available {
342 "provider_usage"
343 } else {
344 "provider_stream_usage_unavailable"
345 };
346 let total_input_tokens = if token_metrics_available {
347 json!(output.total_input_tokens)
348 } else {
349 Value::Null
350 };
351 let total_output_tokens = if token_metrics_available {
352 json!(output.total_output_tokens)
353 } else {
354 Value::Null
355 };
356 let total_tokens = if token_metrics_available {
357 json!(output.total_tokens)
358 } else {
359 Value::Null
360 };
361 let total_thinking_tokens = if token_metrics_available {
362 json!(output.total_thinking_tokens)
363 } else {
364 Value::Null
365 };
366 let tokens_per_second = if token_metrics_available {
367 json!(output.tokens_per_second)
368 } else {
369 Value::Null
370 };
371
372 json!({
373 "workflow_id": output.workflow_id,
374 "terminal_node": output.terminal_node,
375 "total_elapsed_ms": output.total_elapsed_ms,
376 "ttft_ms": output.ttft_ms,
377 "step_timings": output.step_timings,
378 "llm_node_metrics": output.llm_node_metrics,
379 "total_input_tokens": total_input_tokens,
380 "total_output_tokens": total_output_tokens,
381 "total_tokens": total_tokens,
382 "total_thinking_tokens": total_thinking_tokens,
383 "tokens_per_second": tokens_per_second,
384 "trace_id": output.trace_id,
385 "token_metrics_available": token_metrics_available,
386 "token_metrics_source": token_metrics_source,
387 "llm_nodes_without_usage": llm_nodes_without_usage,
388 })
389}
390
391fn payload_for_span(mode: YamlWorkflowPayloadMode, payload: &Value) -> String {
392 match mode {
393 YamlWorkflowPayloadMode::FullPayload => payload.to_string(),
394 YamlWorkflowPayloadMode::RedactedPayload => json!({
395 "redacted": true,
396 "value_type": match payload {
397 Value::Null => "null",
398 Value::Bool(_) => "bool",
399 Value::Number(_) => "number",
400 Value::String(_) => "string",
401 Value::Array(_) => "array",
402 Value::Object(_) => "object",
403 }
404 })
405 .to_string(),
406 }
407}
408
/// Projects a tool payload for trace recording: full value, a type-only
/// redaction marker, or `null` when tool tracing is off.
fn payload_for_tool_trace(mode: YamlToolTraceMode, payload: &Value) -> Value {
    match mode {
        YamlToolTraceMode::Full => payload.clone(),
        YamlToolTraceMode::Redacted => json!({
            "redacted": true,
            "value_type": json_type_name(payload),
        }),
        YamlToolTraceMode::Off => Value::Null,
    }
}
419
420fn validate_json_schema(schema: &Value) -> Result<(), String> {
421 jsonschema::JSONSchema::compile(schema)
422 .map(|_| ())
423 .map_err(|error| format!("invalid JSON schema: {error}"))
424}
425
426fn validate_schema_instance(schema: &Value, instance: &Value) -> Result<(), String> {
427 let validator = jsonschema::JSONSchema::compile(schema)
428 .map_err(|error| format!("invalid JSON schema: {error}"))?;
429 if let Err(errors) = validator.validate(instance) {
430 let message = errors
431 .into_iter()
432 .next()
433 .map(|error| error.to_string())
434 .unwrap_or_else(|| "unknown schema validation error".to_string());
435 return Err(format!("schema validation failed: {message}"));
436 }
437 Ok(())
438}
439
440fn trace_context_from_options(options: &YamlWorkflowRunOptions) -> Option<TraceContext> {
441 options.trace.context.as_ref().map(|input| TraceContext {
442 trace_id: input.trace_id.clone(),
443 span_id: input.span_id.clone(),
444 parent_span_id: input.parent_span_id.clone(),
445 traceparent: input.traceparent.clone(),
446 tracestate: input.tracestate.clone(),
447 baggage: input.baggage.clone(),
448 })
449}
450
451fn merged_trace_context_for_worker(
452 span_context: Option<&TraceContext>,
453 resolved_trace_id: Option<&str>,
454 options: &YamlWorkflowRunOptions,
455) -> TraceContext {
456 let input_context = options.trace.context.as_ref();
457 let baggage = if let Some(context) = span_context {
458 if !context.baggage.is_empty() {
459 context.baggage.clone()
460 } else {
461 input_context
462 .map(|value| value.baggage.clone())
463 .unwrap_or_default()
464 }
465 } else {
466 input_context
467 .map(|value| value.baggage.clone())
468 .unwrap_or_default()
469 };
470
471 TraceContext {
472 trace_id: span_context
473 .and_then(|context| context.trace_id.clone())
474 .or_else(|| resolved_trace_id.map(|value| value.to_string()))
475 .or_else(|| input_context.and_then(|context| context.trace_id.clone())),
476 span_id: span_context
477 .and_then(|context| context.span_id.clone())
478 .or_else(|| input_context.and_then(|context| context.span_id.clone())),
479 parent_span_id: span_context
480 .and_then(|context| context.parent_span_id.clone())
481 .or_else(|| input_context.and_then(|context| context.parent_span_id.clone())),
482 traceparent: span_context
483 .and_then(|context| context.traceparent.clone())
484 .or_else(|| input_context.and_then(|context| context.traceparent.clone())),
485 tracestate: span_context
486 .and_then(|context| context.tracestate.clone())
487 .or_else(|| input_context.and_then(|context| context.tracestate.clone())),
488 baggage,
489 }
490}
491
492fn custom_worker_context_with_trace(
493 context: &Value,
494 trace_context: &TraceContext,
495 tenant_context: &YamlWorkflowTraceTenantContext,
496) -> Value {
497 let mut context_with_trace = context.clone();
498 let Some(root) = context_with_trace.as_object_mut() else {
499 return context_with_trace;
500 };
501
502 root.insert(
503 "trace".to_string(),
504 json!({
505 "context": {
506 "trace_id": trace_context.trace_id,
507 "span_id": trace_context.span_id,
508 "parent_span_id": trace_context.parent_span_id,
509 "traceparent": trace_context.traceparent,
510 "tracestate": trace_context.tracestate,
511 "baggage": trace_context.baggage,
512 },
513 "tenant": {
514 "workspace_id": tenant_context.workspace_id,
515 "user_id": tenant_context.user_id,
516 "conversation_id": tenant_context.conversation_id,
517 "request_id": tenant_context.request_id,
518 "run_id": tenant_context.run_id,
519 }
520 }),
521 );
522
523 context_with_trace
524}
525
/// Reads the `SIMPLE_AGENTS_WORKFLOW_STREAM_INCLUDE_RAW` env var and treats
/// (case-insensitively, after trimming) "1", "true", "yes", or "on" as
/// enabled; anything else — including an unset variable — is disabled.
fn include_raw_stream_debug_events() -> bool {
    std::env::var("SIMPLE_AGENTS_WORKFLOW_STREAM_INCLUDE_RAW")
        .map(|value| {
            let flag = value.trim().to_ascii_lowercase();
            matches!(flag.as_str(), "1" | "true" | "yes" | "on")
        })
        .unwrap_or(false)
}
535
/// Payload recovered from a streamed structured completion, plus the healing
/// confidence when `JsonishParser` had to repair the JSON.
#[derive(Debug)]
struct StreamedPayloadResolution {
    payload: Value,
    // Some(..) only when the payload came from the healing parser.
    heal_confidence: Option<f32>,
}
541
/// Accumulates streamed JSON text so it can be rendered once, as plain
/// `key: value` lines, after the stream completes (see
/// `render_json_object_as_text`).
#[derive(Debug, Default)]
struct StreamJsonAsTextFormatter {
    raw_json: String,
    // Ensures the rendered text is emitted at most once.
    emitted: bool,
}
547
548impl StreamJsonAsTextFormatter {
549 fn push(&mut self, chunk: &str) {
550 self.raw_json.push_str(chunk);
551 }
552
553 fn emit_if_ready(&mut self, complete: bool) -> Option<String> {
554 if self.emitted || !complete {
555 return None;
556 }
557 self.emitted = true;
558 Some(render_json_object_as_text(self.raw_json.as_str()))
559 }
560}
561
562fn render_json_object_as_text(raw_json: &str) -> String {
563 let value = match serde_json::from_str::<Value>(raw_json) {
564 Ok(value) => value,
565 Err(_) => return raw_json.to_string(),
566 };
567 let Some(object) = value.as_object() else {
568 return raw_json.to_string();
569 };
570
571 let mut lines = Vec::with_capacity(object.len());
572 for (key, value) in object {
573 let rendered = match value {
574 Value::String(text) => text.clone(),
575 _ => value.to_string(),
576 };
577 lines.push(format!("{key}: {rendered}"));
578 }
579 lines.join("\n")
580}
581
/// Classifies a streamed token as user-visible output or model "thinking".
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum YamlWorkflowTokenKind {
    Output,
    Thinking,
}
588
/// Character-level state machine that splits a structured-output delta stream
/// into the JSON object body (output) and any text outside it (thinking).
#[derive(Debug, Default)]
struct StructuredJsonDeltaFilter {
    // True once the opening '{' of the object has been seen.
    started: bool,
    // True once the matching closing '}' has been seen.
    completed: bool,
    // Current brace nesting depth inside the object.
    depth: u32,
    // True while inside a JSON string literal.
    in_string: bool,
    // True when the previous in-string char was an unconsumed backslash.
    escape: bool,
}
597
impl StructuredJsonDeltaFilter {
    /// Routes each character of `delta` to either the output buffer (inside
    /// the first balanced top-level JSON object) or the thinking buffer
    /// (before the object starts and after it completes). Returns
    /// `(output, thinking)` with `None` standing for "no chars in this delta".
    ///
    /// The brace/string bookkeeping persists across calls, so deltas may split
    /// the object at arbitrary character boundaries.
    fn split(&mut self, delta: &str) -> (Option<String>, Option<String>) {
        if delta.is_empty() {
            return (None, None);
        }

        let mut output = String::new();
        let mut thinking = String::new();

        for ch in delta.chars() {
            // Everything after the object closes is thinking text.
            if self.completed {
                thinking.push(ch);
                continue;
            }

            // Everything before the first '{' is thinking text.
            if !self.started {
                if ch != '{' {
                    thinking.push(ch);
                    continue;
                }
                self.started = true;
                self.depth = 1;
                output.push(ch);
                continue;
            }

            output.push(ch);
            // Inside string literals, braces and quotes have no structural
            // meaning; track escapes so \" does not terminate the string.
            if self.in_string {
                if self.escape {
                    self.escape = false;
                    continue;
                }
                if ch == '\\' {
                    self.escape = true;
                    continue;
                }
                if ch == '"' {
                    self.in_string = false;
                }
                continue;
            }

            match ch {
                '"' => self.in_string = true,
                '{' => self.depth = self.depth.saturating_add(1),
                '}' => {
                    if self.depth > 0 {
                        self.depth -= 1;
                    }
                    // Depth back to zero: the top-level object just closed.
                    if self.depth == 0 {
                        self.completed = true;
                    }
                }
                _ => {}
            }
        }

        let output = if output.is_empty() {
            None
        } else {
            Some(output)
        };
        let thinking = if thinking.is_empty() {
            None
        } else {
            Some(thinking)
        };

        (output, thinking)
    }

    /// True once the top-level JSON object has fully closed.
    fn completed(&self) -> bool {
        self.completed
    }
}
673
/// Returns the trimmed contents of the LAST ```json fenced block in `raw`,
/// or `None` when no fence exists, the fence is unterminated, or the block
/// is empty after trimming.
fn extract_last_fenced_json_block(raw: &str) -> Option<&str> {
    const FENCE_OPEN: &str = "```json";
    const FENCE_CLOSE: &str = "```";

    let open_at = raw.rfind(FENCE_OPEN)?;
    let after_open = &raw[open_at + FENCE_OPEN.len()..];
    let close_at = after_open.find(FENCE_CLOSE)?;
    let body = after_open[..close_at].trim();
    (!body.is_empty()).then_some(body)
}
684
/// Scans forward from `start_index` (expected to sit on a '{') and returns the
/// trimmed slice covering the first balanced `{…}` span, honoring JSON string
/// literals and backslash escapes. Returns `None` when braces never balance or
/// an unmatched '}' appears first.
fn extract_balanced_object_from(raw: &str, start_index: usize) -> Option<&str> {
    let mut depth: u32 = 0;
    let mut inside_string = false;
    let mut escaped = false;

    for (offset, ch) in raw[start_index..].char_indices() {
        // Braces inside string literals are not structural; consume escapes
        // so \" does not end the string early.
        if inside_string {
            match ch {
                _ if escaped => escaped = false,
                '\\' => escaped = true,
                '"' => inside_string = false,
                _ => {}
            }
            continue;
        }

        match ch {
            '"' => inside_string = true,
            '{' => depth = depth.saturating_add(1),
            '}' if depth == 0 => return None,
            '}' => {
                depth -= 1;
                if depth == 0 {
                    let end = start_index + offset + ch.len_utf8();
                    return Some(raw[start_index..end].trim());
                }
            }
            _ => {}
        }
    }

    None
}
725
726fn extract_last_parsable_object(raw: &str) -> Option<&str> {
727 let starts: Vec<usize> = raw
728 .char_indices()
729 .filter_map(|(index, ch)| if ch == '{' { Some(index) } else { None })
730 .collect();
731
732 for start in starts.into_iter().rev() {
733 let Some(candidate) = extract_balanced_object_from(raw, start) else {
734 continue;
735 };
736 if serde_json::from_str::<Value>(candidate).is_ok() {
737 return Some(candidate);
738 }
739 }
740
741 None
742}
743
744fn resolve_structured_json_candidate(raw: &str) -> Option<&str> {
745 extract_last_fenced_json_block(raw).or_else(|| extract_last_parsable_object(raw))
746}
747
/// Parses the fully accumulated text of a streamed structured completion.
///
/// Without healing: try the raw text as JSON, then fall back to the best
/// embedded candidate (fenced block or last balanced object), failing with a
/// descriptive message when neither works. With healing: run the candidate
/// (or the raw text when no candidate is found) through `JsonishParser`,
/// which repairs malformed JSON and reports a confidence score.
fn parse_streamed_structured_payload(
    raw: &str,
    heal: bool,
) -> Result<StreamedPayloadResolution, String> {
    if !heal {
        // Fast path: the whole accumulated text is already valid JSON.
        if let Ok(payload) = serde_json::from_str::<Value>(raw) {
            return Ok(StreamedPayloadResolution {
                payload,
                heal_confidence: None,
            });
        }

        let candidate = resolve_structured_json_candidate(raw).ok_or_else(|| {
            "failed to parse streamed structured completion JSON: no JSON object candidate found"
                .to_string()
        })?;
        let payload = serde_json::from_str::<Value>(candidate).map_err(|error| {
            format!(
                "failed to parse streamed structured completion JSON: {error}; candidate={candidate}"
            )
        })?;
        return Ok(StreamedPayloadResolution {
            payload,
            heal_confidence: None,
        });
    }

    // Healing path: hand the best candidate (or everything) to the repairing
    // parser and surface its confidence to the caller.
    let candidate = resolve_structured_json_candidate(raw).unwrap_or(raw);
    let parser = JsonishParser::new();
    let healed = parser
        .parse(candidate)
        .map_err(|error| format!("failed to heal streamed structured completion JSON: {error}"))?;

    Ok(StreamedPayloadResolution {
        payload: healed.value,
        heal_confidence: Some(healed.confidence),
    })
}
786
/// Flattened event delivered to `YamlWorkflowEventSink` consumers. Which
/// optional fields are populated depends on `event_type`.
#[derive(Debug, Clone, PartialEq, Serialize)]
pub struct YamlWorkflowEvent {
    pub event_type: String,
    pub node_id: Option<String>,
    pub step_id: Option<String>,
    pub node_kind: Option<String>,
    pub streamable: Option<bool>,
    pub message: Option<String>,
    /// Incremental streamed text, for token events.
    pub delta: Option<String>,
    /// Whether `delta` is output or thinking text.
    pub token_kind: Option<YamlWorkflowTokenKind>,
    pub is_terminal_node_token: Option<bool>,
    pub elapsed_ms: Option<u128>,
    pub metadata: Option<Value>,
}
801
/// Chat role alias re-exported for workflow callers.
pub type WorkflowMessageRole = Role;

/// A single chat message supplied to (or produced by) a workflow LLM step.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct WorkflowMessage {
    pub role: WorkflowMessageRole,
    pub content: String,
    #[serde(default)]
    pub name: Option<String>,
    /// Links a tool-role message back to its originating tool call; accepts
    /// the camelCase `toolCallId` spelling on input.
    #[serde(default, alias = "toolCallId")]
    pub tool_call_id: Option<String>,
}
813
/// Describes how a single template expression resolved against workflow state.
#[derive(Debug, Clone, PartialEq, Serialize)]
pub struct YamlTemplateBinding {
    /// Position of the expression within the template.
    pub index: usize,
    /// Raw expression text as written in the template.
    pub expression: String,
    /// Path the value was resolved from.
    pub source_path: String,
    /// The resolved value (NOTE(review): value on miss not visible here — confirm).
    pub resolved: Value,
    /// JSON type name of the resolved value.
    pub resolved_type: String,
    /// True when the expression could not be resolved.
    pub missing: bool,
}
823
/// Severity level for workflow validation findings.
#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
pub enum YamlWorkflowDiagnosticSeverity {
    Error,
    Warning,
}

/// A single validation finding, optionally tied to a specific node.
#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
pub struct YamlWorkflowDiagnostic {
    /// Offending node id; `None` for workflow-level findings.
    pub node_id: Option<String>,
    /// Machine-readable diagnostic code.
    pub code: String,
    pub severity: YamlWorkflowDiagnosticSeverity,
    pub message: String,
}
837
/// Errors produced while loading, validating, converting, or executing a
/// YAML workflow. Display text comes from the `thiserror` attributes.
#[derive(Debug, Error)]
pub enum YamlWorkflowRunError {
    #[error("failed to read workflow yaml '{path}': {source}")]
    Read {
        path: String,
        source: std::io::Error,
    },
    #[error("failed to parse workflow yaml '{path}': {source}")]
    Parse {
        path: String,
        source: serde_yaml::Error,
    },
    #[error("workflow '{workflow_id}' has no nodes")]
    EmptyNodes { workflow_id: String },
    #[error("entry node '{entry_node}' does not exist")]
    MissingEntry { entry_node: String },
    #[error("unknown node id '{node_id}'")]
    MissingNode { node_id: String },
    #[error("unsupported node type in '{node_id}'")]
    UnsupportedNodeType { node_id: String },
    #[error("unsupported switch condition format: {condition}")]
    UnsupportedCondition { condition: String },
    #[error("switch node '{node_id}' has no valid next target")]
    InvalidSwitchTarget { node_id: String },
    #[error("llm returned non-object payload for node '{node_id}'")]
    LlmPayloadNotObject { node_id: String },
    #[error("custom worker handler '{handler}' is not supported")]
    UnsupportedCustomHandler { handler: String },
    #[error("llm execution failed for node '{node_id}': {message}")]
    Llm { node_id: String, message: String },
    #[error("custom worker execution failed for node '{node_id}': {message}")]
    CustomWorker { node_id: String, message: String },
    /// Carries every diagnostic so callers can report more than the count.
    #[error("workflow validation failed with {diagnostics_count} error(s)")]
    Validation {
        diagnostics_count: usize,
        diagnostics: Vec<YamlWorkflowDiagnostic>,
    },
    #[error("invalid workflow input: {message}")]
    InvalidInput { message: String },
    #[error("ir runtime execution failed: {message}")]
    IrRuntime { message: String },
    #[error("workflow event stream cancelled: {message}")]
    EventSinkCancelled { message: String },
}
882
/// Receiver for workflow events; implementations must be thread-safe.
pub trait YamlWorkflowEventSink: Send + Sync {
    /// Called for every emitted workflow event.
    fn emit(&self, event: &YamlWorkflowEvent);

    /// Return `true` to request cancellation of the run (checked via
    /// `event_sink_is_cancelled`). Defaults to never cancelling.
    fn is_cancelled(&self) -> bool {
        false
    }
}

/// Event sink that discards every event and never cancels.
pub struct NoopYamlWorkflowEventSink;

impl YamlWorkflowEventSink for NoopYamlWorkflowEventSink {
    fn emit(&self, _event: &YamlWorkflowEvent) {}
}
896
/// Canonical message used when a run stops because the event sink cancelled.
fn workflow_event_sink_cancelled_message() -> &'static str {
    const MESSAGE: &str = "workflow event callback cancelled";
    MESSAGE
}
900
901fn event_sink_is_cancelled(event_sink: Option<&dyn YamlWorkflowEventSink>) -> bool {
902 event_sink.map(|sink| sink.is_cancelled()).unwrap_or(false)
903}
904
/// Errors raised when converting a YAML workflow into the canonical IR.
#[derive(Debug, Clone, PartialEq, Eq, Error)]
pub enum YamlToIrError {
    #[error("entry node '{entry_node}' does not exist")]
    MissingEntry { entry_node: String },
    #[error("node '{node_id}' has multiple outgoing edges in YAML; IR llm/tool nodes require one")]
    MultipleOutgoingEdge { node_id: String },
    #[error("node '{node_id}' is unsupported for IR conversion: {reason}")]
    UnsupportedNode { node_id: String, reason: String },
}
914
915pub fn yaml_workflow_to_mermaid(workflow: &YamlWorkflow) -> String {
917 if let Ok(ir) = yaml_workflow_to_ir(workflow) {
918 return workflow_to_mermaid(&ir);
919 }
920
921 yaml_workflow_to_mermaid_fallback(workflow)
922}
923
/// Mermaid rendering straight from the YAML structure, used when IR
/// conversion is not possible. Emits one labeled box per node, then a
/// deduplicated, sorted edge list combining explicit edges and switch
/// branches (branch condition, or "default", as the edge label).
fn yaml_workflow_to_mermaid_fallback(workflow: &YamlWorkflow) -> String {
    let mut lines = Vec::new();
    lines.push("flowchart TD".to_string());

    // Node declarations: id["label\n(kind)"].
    for node in &workflow.nodes {
        lines.push(format!(
            " {}[\"{}\\n({})\"]",
            sanitize_mermaid_id(&node.id),
            escape_mermaid_label(&node.id),
            node.kind_name()
        ));
    }

    // Collect (from, label, to) triples in a set to drop duplicates.
    let mut emitted: HashSet<(String, String, String)> = HashSet::new();

    for edge in &workflow.edges {
        emitted.insert((edge.from.clone(), String::new(), edge.to.clone()));
    }

    // Switch nodes contribute one labeled edge per branch plus the default.
    for node in &workflow.nodes {
        if let Some(switch) = node.node_type.switch.as_ref() {
            for branch in &switch.branches {
                emitted.insert((
                    node.id.clone(),
                    branch.condition.clone(),
                    branch.target.clone(),
                ));
            }
            emitted.insert((
                node.id.clone(),
                "default".to_string(),
                switch.default.clone(),
            ));
        }
    }

    // Sort for deterministic output regardless of HashSet iteration order.
    let mut edges = emitted.into_iter().collect::<Vec<_>>();
    edges.sort();

    for (from, label, to) in edges {
        if label.is_empty() {
            lines.push(format!(
                " {} --> {}",
                sanitize_mermaid_id(&from),
                sanitize_mermaid_id(&to)
            ));
        } else {
            lines.push(format!(
                " {} -- \"{}\" --> {}",
                sanitize_mermaid_id(&from),
                escape_mermaid_label(&label),
                sanitize_mermaid_id(&to)
            ));
        }
    }

    lines.join("\n")
}
982
983pub fn yaml_workflow_file_to_mermaid(workflow_path: &Path) -> Result<String, YamlWorkflowRunError> {
985 let contents =
986 std::fs::read_to_string(workflow_path).map_err(|source| YamlWorkflowRunError::Read {
987 path: workflow_path.display().to_string(),
988 source,
989 })?;
990
991 let workflow: YamlWorkflow =
992 serde_yaml::from_str(&contents).map_err(|source| YamlWorkflowRunError::Parse {
993 path: workflow_path.display().to_string(),
994 source,
995 })?;
996
997 Ok(yaml_workflow_to_mermaid(&workflow))
998}
999
/// Converts a YAML workflow into the canonical IR.
///
/// Mapping: a synthetic start node points at the YAML entry node; `llm_call`
/// nodes become IR tool nodes invoking the internal `__yaml_llm_call` tool;
/// `custom_worker` nodes become tool nodes on their handler; `switch` nodes
/// become routers with rewritten conditions.
///
/// # Errors
/// - `MissingEntry` when the entry node id is unknown.
/// - `MultipleOutgoingEdge` (via `single_next_for_node`) when an llm/tool
///   node has more than one outgoing edge.
/// - `UnsupportedNode` for features the IR cannot yet express
///   (`set_globals`/`update_globals`, `llm_call.tools`) or unknown node types.
pub fn yaml_workflow_to_ir(workflow: &YamlWorkflow) -> Result<WorkflowDefinition, YamlToIrError> {
    let known_ids: HashSet<&str> = workflow.nodes.iter().map(|n| n.id.as_str()).collect();
    if !known_ids.contains(workflow.entry_node.as_str()) {
        return Err(YamlToIrError::MissingEntry {
            entry_node: workflow.entry_node.clone(),
        });
    }

    // Adjacency list of explicit YAML edges, used to resolve each node's
    // single successor.
    let mut outgoing: HashMap<&str, Vec<&str>> = HashMap::new();
    for edge in &workflow.edges {
        outgoing
            .entry(edge.from.as_str())
            .or_default()
            .push(edge.to.as_str());
    }

    // +1 for the synthetic start node.
    let mut nodes = Vec::with_capacity(workflow.nodes.len() + 1);
    nodes.push(Node {
        id: YAML_START_NODE_ID.to_string(),
        kind: NodeKind::Start {
            next: workflow.entry_node.clone(),
        },
    });

    for node in &workflow.nodes {
        if let Some(llm) = node.node_type.llm_call.as_ref() {
            // Globals mutation has no IR representation yet; refuse rather
            // than silently drop it.
            if node
                .config
                .as_ref()
                .and_then(|c| c.set_globals.as_ref())
                .is_some()
                || node
                    .config
                    .as_ref()
                    .and_then(|c| c.update_globals.as_ref())
                    .is_some()
            {
                return Err(YamlToIrError::UnsupportedNode {
                    node_id: node.id.clone(),
                    reason: "set_globals/update_globals are not represented in canonical IR llm nodes yet"
                        .to_string(),
                });
            }

            if !llm.tools.is_empty() {
                return Err(YamlToIrError::UnsupportedNode {
                    node_id: node.id.clone(),
                    reason: "llm_call.tools are not represented in canonical IR llm nodes yet"
                        .to_string(),
                });
            }

            let next = single_next_for_node(&outgoing, &node.id)?;
            // LLM calls are encoded as invocations of the internal LLM tool,
            // with all call parameters packed into the tool input payload.
            nodes.push(Node {
                id: node.id.clone(),
                kind: NodeKind::Tool {
                    tool: YAML_LLM_TOOL_ID.to_string(),
                    input: json!({
                        "node_id": node.id,
                        "model": llm.model,
                        "prompt_template": node
                            .config
                            .as_ref()
                            .and_then(|c| c.prompt.clone())
                            .unwrap_or_default(),
                        "stream": llm.stream.unwrap_or(false),
                        "stream_json_as_text": llm.stream_json_as_text.unwrap_or(false),
                        "heal": llm.heal.unwrap_or(false),
                        "messages_path": llm.messages_path,
                        "append_prompt_as_user": llm.append_prompt_as_user.unwrap_or(true),
                        "output_schema": node
                            .config
                            .as_ref()
                            .and_then(|c| c.output_schema.clone())
                            .unwrap_or_else(default_llm_output_schema),
                    }),
                    next,
                },
            });
            continue;
        }

        if let Some(worker) = node.node_type.custom_worker.as_ref() {
            // Same restriction as llm_call: globals mutation is unsupported.
            if node
                .config
                .as_ref()
                .and_then(|c| c.set_globals.as_ref())
                .is_some()
                || node
                    .config
                    .as_ref()
                    .and_then(|c| c.update_globals.as_ref())
                    .is_some()
            {
                return Err(YamlToIrError::UnsupportedNode {
                    node_id: node.id.clone(),
                    reason: "set_globals/update_globals are not represented in canonical IR tool nodes yet"
                        .to_string(),
                });
            }

            let next = single_next_for_node(&outgoing, &node.id)?;
            nodes.push(Node {
                id: node.id.clone(),
                kind: NodeKind::Tool {
                    tool: worker.handler.clone(),
                    input: node
                        .config
                        .as_ref()
                        .and_then(|c| c.payload.clone())
                        .unwrap_or_else(|| json!({})),
                    next,
                },
            });
            continue;
        }

        if let Some(switch) = node.node_type.switch.as_ref() {
            // Switch branches map 1:1 onto router routes; conditions are
            // rewritten to the IR path dialect.
            nodes.push(Node {
                id: node.id.clone(),
                kind: NodeKind::Router {
                    routes: switch
                        .branches
                        .iter()
                        .map(|b| RouterRoute {
                            when: rewrite_yaml_condition_to_ir(&b.condition),
                            next: b.target.clone(),
                        })
                        .collect(),
                    default: switch.default.clone(),
                },
            });
            continue;
        }

        return Err(YamlToIrError::UnsupportedNode {
            node_id: node.id.clone(),
            reason: "node_type must be llm_call, switch, or custom_worker".to_string(),
        });
    }

    Ok(WorkflowDefinition {
        version: WORKFLOW_IR_V0.to_string(),
        name: workflow.id.clone(),
        nodes,
    })
}
1147
1148fn single_next_for_node(
1149 outgoing: &HashMap<&str, Vec<&str>>,
1150 node_id: &str,
1151) -> Result<Option<String>, YamlToIrError> {
1152 match outgoing.get(node_id) {
1153 None => Ok(None),
1154 Some(targets) if targets.len() == 1 => Ok(Some(targets[0].to_string())),
1155 Some(_) => Err(YamlToIrError::MultipleOutgoingEdge {
1156 node_id: node_id.to_string(),
1157 }),
1158 }
1159}
1160
/// Rewrites a YAML switch-branch condition path into canonical IR form.
///
/// YAML conditions address node results as `$.nodes.<id>.output...`; the IR
/// exposes them under `$.node_outputs.<id>...`, so this maps the prefix and
/// drops the `.output` segment whether it appears mid-path or as the suffix.
fn rewrite_yaml_condition_to_ir(expr: &str) -> String {
    let rewritten = expr
        .replace("$.nodes.", "$.node_outputs.")
        .replace(".output.", ".");
    match rewritten.strip_suffix(".output") {
        Some(prefix) => prefix.to_string(),
        None => rewritten,
    }
}
1171
/// Converts an arbitrary node id into a Mermaid-safe identifier.
///
/// Mermaid node ids here are restricted to `[A-Za-z0-9_]`. Ids that do not
/// start with an ASCII letter or underscore (including the empty id) are
/// prefixed with `n_`; every remaining disallowed character becomes `_`.
fn sanitize_mermaid_id(id: &str) -> String {
    let starts_ok = id
        .chars()
        .next()
        .is_some_and(|ch| ch.is_ascii_alphabetic() || ch == '_');
    let mut sanitized = String::with_capacity(id.len() + 2);
    if !starts_ok {
        sanitized.push_str("n_");
    }
    for ch in id.chars() {
        if ch.is_ascii_alphanumeric() || ch == '_' {
            sanitized.push(ch);
        } else {
            sanitized.push('_');
        }
    }
    sanitized
}
1195
/// Escapes double quotes in a Mermaid node label so the label can be emitted
/// inside a quoted string. Only `"` is escaped; all other characters pass
/// through unchanged.
fn escape_mermaid_label(label: &str) -> String {
    let mut escaped = String::with_capacity(label.len());
    for ch in label.chars() {
        if ch == '"' {
            escaped.push('\\');
        }
        escaped.push(ch);
    }
    escaped
}
1199
/// Fully-resolved inputs for executing a single `llm_call` workflow node,
/// built by the YAML runner and handed to a `YamlWorkflowLlmExecutor`.
#[derive(Debug, Clone)]
pub struct YamlLlmExecutionRequest {
    /// Id of the YAML node being executed; echoed into emitted events.
    pub node_id: String,
    /// True when this node is the workflow's final node; streamed token
    /// events carry this flag so sinks can single out final-answer tokens.
    pub is_terminal_node: bool,
    /// When true, streamed structured-JSON output is re-rendered as plain
    /// text (via `StreamJsonAsTextFormatter`) before deltas are emitted.
    pub stream_json_as_text: bool,
    /// Model identifier forwarded to the completion request builder.
    pub model: String,
    /// Optional pre-built conversation history; when absent a default
    /// system+user message pair is synthesized from `prompt`.
    pub messages: Option<Vec<Message>>,
    /// When `messages` is provided, append `prompt` as a trailing user
    /// message (skipped if the prompt is blank).
    pub append_prompt_as_user: bool,
    /// Rendered prompt text for this node.
    pub prompt: String,
    /// Source template for `prompt` — not consumed by the client-backed
    /// executor in this module; presumably rendered upstream. TODO confirm.
    pub prompt_template: String,
    /// Bindings associated with `prompt_template`; usage not visible here.
    pub prompt_bindings: Vec<YamlTemplateBinding>,
    /// JSON schema expected of the structured response; enforcement happens
    /// outside this executor (not referenced in this chunk). TODO confirm.
    pub schema: Value,
    /// Request streaming deltas instead of a single response. Incompatible
    /// with `tools` (the executor rejects the combination).
    pub stream: bool,
    /// Enable JSON healing of malformed structured responses (non-stream
    /// healing uses `CompletionMode::HealedJson`).
    pub heal: bool,
    /// Tools exposed to the model for this call; empty for plain calls.
    pub tools: Vec<YamlResolvedTool>,
    /// Optional tool-choice constraint forwarded to the completion request.
    pub tool_choice: Option<ToolChoice>,
    /// Maximum number of model→tool→model roundtrips before erroring out.
    pub max_tool_roundtrips: u8,
    /// NOTE(review): key under which tool-call traces are presumably stored
    /// in workflow globals — usage not visible in this chunk; confirm.
    pub tool_calls_global_key: Option<String>,
    /// Gates tool-call trace events and controls how much payload they carry
    /// (see `payload_for_tool_trace`).
    pub tool_trace_mode: YamlToolTraceMode,
    /// Execution context forwarded verbatim to custom-worker tool handlers.
    pub execution_context: Value,
    /// Raw email text forwarded verbatim to custom-worker tool handlers.
    pub email_text: String,
}
1222
/// A tool resolved from YAML configuration: the definition advertised to the
/// model plus an optional schema used to validate the tool's output.
#[derive(Debug, Clone)]
pub struct YamlResolvedTool {
    /// Tool definition (name, parameters) sent with the completion request.
    pub definition: ToolDefinition,
    /// When present, tool output is validated against this JSON schema after
    /// execution; validation failure aborts the node.
    pub output_schema: Option<Value>,
}
1228
/// Executes a structured LLM completion for a single workflow `llm_call`
/// node. Implementations may emit progress events (stream deltas, healing
/// notices, tool traces) through the optional `event_sink`.
#[async_trait]
pub trait YamlWorkflowLlmExecutor: Send + Sync {
    /// Runs the completion described by `request` and returns the structured
    /// payload plus token usage and tool-call traces.
    ///
    /// Errors are reported as plain strings; the runner wraps them into its
    /// own error type.
    async fn complete_structured(
        &self,
        request: YamlLlmExecutionRequest,
        event_sink: Option<&dyn YamlWorkflowEventSink>,
    ) -> Result<YamlLlmExecutionResult, String>;
}
1237
/// Executes named custom-worker handlers. Also used to service tool calls the
/// model requests during tool-enabled `llm_call` nodes.
#[async_trait]
pub trait YamlWorkflowCustomWorkerExecutor: Send + Sync {
    /// Runs `handler` with the node's JSON `payload`, the workflow's raw
    /// email text, and the current execution context; returns the handler's
    /// JSON output, or a plain-string error.
    async fn execute(
        &self,
        handler: &str,
        payload: &Value,
        email_text: &str,
        context: &Value,
    ) -> Result<Value, String>;
}
1248
1249pub async fn run_workflow_yaml_file(
1250 workflow_path: &Path,
1251 workflow_input: &Value,
1252 executor: &dyn YamlWorkflowLlmExecutor,
1253) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
1254 let contents =
1255 std::fs::read_to_string(workflow_path).map_err(|source| YamlWorkflowRunError::Read {
1256 path: workflow_path.display().to_string(),
1257 source,
1258 })?;
1259
1260 let workflow: YamlWorkflow =
1261 serde_yaml::from_str(&contents).map_err(|source| YamlWorkflowRunError::Parse {
1262 path: workflow_path.display().to_string(),
1263 source,
1264 })?;
1265
1266 run_workflow_yaml(&workflow, workflow_input, executor).await
1267}
1268
1269pub async fn run_email_workflow_yaml_file(
1270 workflow_path: &Path,
1271 email_text: &str,
1272 executor: &dyn YamlWorkflowLlmExecutor,
1273) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
1274 let workflow_input = json!({ "email_text": email_text });
1275 run_workflow_yaml_file(workflow_path, &workflow_input, executor).await
1276}
1277
1278pub async fn run_workflow_yaml_file_with_client(
1279 workflow_path: &Path,
1280 workflow_input: &Value,
1281 client: &SimpleAgentsClient,
1282) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
1283 let contents =
1284 std::fs::read_to_string(workflow_path).map_err(|source| YamlWorkflowRunError::Read {
1285 path: workflow_path.display().to_string(),
1286 source,
1287 })?;
1288
1289 let workflow: YamlWorkflow =
1290 serde_yaml::from_str(&contents).map_err(|source| YamlWorkflowRunError::Parse {
1291 path: workflow_path.display().to_string(),
1292 source,
1293 })?;
1294
1295 run_workflow_yaml_with_client(&workflow, workflow_input, client).await
1296}
1297
1298pub async fn run_email_workflow_yaml_file_with_client(
1299 workflow_path: &Path,
1300 email_text: &str,
1301 client: &SimpleAgentsClient,
1302) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
1303 let workflow_input = json!({ "email_text": email_text });
1304 run_workflow_yaml_file_with_client(workflow_path, &workflow_input, client).await
1305}
1306
1307pub async fn run_workflow_yaml_with_client(
1308 workflow: &YamlWorkflow,
1309 workflow_input: &Value,
1310 client: &SimpleAgentsClient,
1311) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
1312 run_workflow_yaml_with_client_and_custom_worker(workflow, workflow_input, client, None).await
1313}
1314
1315pub async fn run_email_workflow_yaml_with_client(
1316 workflow: &YamlWorkflow,
1317 email_text: &str,
1318 client: &SimpleAgentsClient,
1319) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
1320 let workflow_input = json!({ "email_text": email_text });
1321 run_workflow_yaml_with_client(workflow, &workflow_input, client).await
1322}
1323
1324pub async fn run_workflow_yaml_file_with_client_and_custom_worker(
1325 workflow_path: &Path,
1326 workflow_input: &Value,
1327 client: &SimpleAgentsClient,
1328 custom_worker: Option<&dyn YamlWorkflowCustomWorkerExecutor>,
1329) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
1330 let contents =
1331 std::fs::read_to_string(workflow_path).map_err(|source| YamlWorkflowRunError::Read {
1332 path: workflow_path.display().to_string(),
1333 source,
1334 })?;
1335
1336 let workflow: YamlWorkflow =
1337 serde_yaml::from_str(&contents).map_err(|source| YamlWorkflowRunError::Parse {
1338 path: workflow_path.display().to_string(),
1339 source,
1340 })?;
1341
1342 run_workflow_yaml_with_client_and_custom_worker(
1343 &workflow,
1344 workflow_input,
1345 client,
1346 custom_worker,
1347 )
1348 .await
1349}
1350
1351pub async fn run_email_workflow_yaml_file_with_client_and_custom_worker(
1352 workflow_path: &Path,
1353 email_text: &str,
1354 client: &SimpleAgentsClient,
1355 custom_worker: Option<&dyn YamlWorkflowCustomWorkerExecutor>,
1356) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
1357 let workflow_input = json!({ "email_text": email_text });
1358 run_workflow_yaml_file_with_client_and_custom_worker(
1359 workflow_path,
1360 &workflow_input,
1361 client,
1362 custom_worker,
1363 )
1364 .await
1365}
1366
1367pub async fn run_workflow_yaml_file_with_client_and_custom_worker_and_events(
1368 workflow_path: &Path,
1369 workflow_input: &Value,
1370 client: &SimpleAgentsClient,
1371 custom_worker: Option<&dyn YamlWorkflowCustomWorkerExecutor>,
1372 event_sink: Option<&dyn YamlWorkflowEventSink>,
1373) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
1374 run_workflow_yaml_file_with_client_and_custom_worker_and_events_and_options(
1375 workflow_path,
1376 workflow_input,
1377 client,
1378 custom_worker,
1379 event_sink,
1380 &YamlWorkflowRunOptions::default(),
1381 )
1382 .await
1383}
1384
1385pub async fn run_workflow_yaml_file_with_client_and_custom_worker_and_events_and_options(
1386 workflow_path: &Path,
1387 workflow_input: &Value,
1388 client: &SimpleAgentsClient,
1389 custom_worker: Option<&dyn YamlWorkflowCustomWorkerExecutor>,
1390 event_sink: Option<&dyn YamlWorkflowEventSink>,
1391 options: &YamlWorkflowRunOptions,
1392) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
1393 let contents =
1394 std::fs::read_to_string(workflow_path).map_err(|source| YamlWorkflowRunError::Read {
1395 path: workflow_path.display().to_string(),
1396 source,
1397 })?;
1398
1399 let workflow: YamlWorkflow =
1400 serde_yaml::from_str(&contents).map_err(|source| YamlWorkflowRunError::Parse {
1401 path: workflow_path.display().to_string(),
1402 source,
1403 })?;
1404
1405 run_workflow_yaml_with_client_and_custom_worker_and_events_and_options(
1406 &workflow,
1407 workflow_input,
1408 client,
1409 custom_worker,
1410 event_sink,
1411 options,
1412 )
1413 .await
1414}
1415
1416pub async fn run_email_workflow_yaml_file_with_client_and_custom_worker_and_events(
1417 workflow_path: &Path,
1418 email_text: &str,
1419 client: &SimpleAgentsClient,
1420 custom_worker: Option<&dyn YamlWorkflowCustomWorkerExecutor>,
1421 event_sink: Option<&dyn YamlWorkflowEventSink>,
1422) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
1423 let workflow_input = json!({ "email_text": email_text });
1424 run_workflow_yaml_file_with_client_and_custom_worker_and_events(
1425 workflow_path,
1426 &workflow_input,
1427 client,
1428 custom_worker,
1429 event_sink,
1430 )
1431 .await
1432}
1433
1434pub async fn run_workflow_yaml_with_client_and_custom_worker(
1435 workflow: &YamlWorkflow,
1436 workflow_input: &Value,
1437 client: &SimpleAgentsClient,
1438 custom_worker: Option<&dyn YamlWorkflowCustomWorkerExecutor>,
1439) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
1440 run_workflow_yaml_with_client_and_custom_worker_and_events_and_options(
1441 workflow,
1442 workflow_input,
1443 client,
1444 custom_worker,
1445 None,
1446 &YamlWorkflowRunOptions::default(),
1447 )
1448 .await
1449}
1450
1451pub async fn run_email_workflow_yaml_with_client_and_custom_worker(
1452 workflow: &YamlWorkflow,
1453 email_text: &str,
1454 client: &SimpleAgentsClient,
1455 custom_worker: Option<&dyn YamlWorkflowCustomWorkerExecutor>,
1456) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
1457 let workflow_input = json!({ "email_text": email_text });
1458 run_workflow_yaml_with_client_and_custom_worker(
1459 workflow,
1460 &workflow_input,
1461 client,
1462 custom_worker,
1463 )
1464 .await
1465}
1466
1467pub async fn run_workflow_yaml_with_client_and_custom_worker_and_events(
1468 workflow: &YamlWorkflow,
1469 workflow_input: &Value,
1470 client: &SimpleAgentsClient,
1471 custom_worker: Option<&dyn YamlWorkflowCustomWorkerExecutor>,
1472 event_sink: Option<&dyn YamlWorkflowEventSink>,
1473) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
1474 run_workflow_yaml_with_client_and_custom_worker_and_events_and_options(
1475 workflow,
1476 workflow_input,
1477 client,
1478 custom_worker,
1479 event_sink,
1480 &YamlWorkflowRunOptions::default(),
1481 )
1482 .await
1483}
1484
/// Runs a parsed YAML workflow against a `SimpleAgentsClient`, optionally
/// delegating tool/worker execution to `custom_worker` and emitting progress
/// events to `event_sink`.
///
/// Internally wraps the borrowed client in a private
/// [`YamlWorkflowLlmExecutor`] adapter so the generic executor-based runner
/// can drive it. The adapter implements two execution paths:
/// tool-enabled calls (non-streaming agentic roundtrip loop) and plain
/// structured calls (streaming or single-shot, with optional JSON healing).
pub async fn run_workflow_yaml_with_client_and_custom_worker_and_events_and_options(
    workflow: &YamlWorkflow,
    workflow_input: &Value,
    client: &SimpleAgentsClient,
    custom_worker: Option<&dyn YamlWorkflowCustomWorkerExecutor>,
    event_sink: Option<&dyn YamlWorkflowEventSink>,
    options: &YamlWorkflowRunOptions,
) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
    // Adapter that borrows the client (and optional custom worker) for the
    // duration of the run instead of requiring an owned executor.
    struct BorrowedClientExecutor<'a> {
        client: &'a SimpleAgentsClient,
        custom_worker: Option<&'a dyn YamlWorkflowCustomWorkerExecutor>,
    }

    #[async_trait]
    impl<'a> YamlWorkflowLlmExecutor for BorrowedClientExecutor<'a> {
        async fn complete_structured(
            &self,
            request: YamlLlmExecutionRequest,
            event_sink: Option<&dyn YamlWorkflowEventSink>,
        ) -> Result<YamlLlmExecutionResult, String> {
            // Build the conversation: either the caller-supplied history
            // (optionally with the rendered prompt appended as a user turn),
            // or a default system+user pair synthesized from the prompt.
            let messages = if let Some(mut history) = request.messages.clone() {
                if request.append_prompt_as_user && !request.prompt.trim().is_empty() {
                    history.push(Message::user(&request.prompt));
                }
                history
            } else {
                vec![
                    Message::system("You execute workflow classification steps."),
                    Message::user(&request.prompt),
                ]
            };

            // ---- Tool-enabled path: non-streaming agentic roundtrip loop ----
            if !request.tools.is_empty() {
                if request.stream {
                    return Err(
                        "llm_call.stream=true is not supported when llm_call.tools are configured"
                            .to_string(),
                    );
                }

                let mut tool_traces: Vec<YamlToolCallTrace> = Vec::new();
                let mut conversation = messages;
                let mut usage_total: Option<YamlLlmTokenUsage> = None;

                // `..=` gives one extra iteration past the limit; the guard
                // below only allows that final iteration to yield a non-tool
                // answer, never another roundtrip.
                for roundtrip in 0..=request.max_tool_roundtrips {
                    let mut builder = CompletionRequest::builder()
                        .model(&request.model)
                        .messages(conversation.clone())
                        .tools(request.tools.iter().map(|t| t.definition.clone()).collect());

                    if let Some(choice) = request.tool_choice.clone() {
                        builder = builder.tool_choice(choice);
                    }

                    let completion_request = builder
                        .build()
                        .map_err(|error| format!("failed to build completion request: {error}"))?;

                    let outcome = self
                        .client
                        .complete(&completion_request, CompletionOptions::default())
                        .await
                        .map_err(|error| error.to_string())?;

                    // Healed/coerced outcomes still carry a raw response; use it.
                    let response = match outcome {
                        CompletionOutcome::Response(response) => response,
                        CompletionOutcome::HealedJson(healed) => healed.response,
                        CompletionOutcome::CoercedSchema(coerced) => coerced.response,
                        CompletionOutcome::Stream(_) => {
                            return Err(
                                "streaming outcome is unsupported for tool-enabled llm_call"
                                    .to_string(),
                            )
                        }
                    };

                    // Accumulate token usage across all roundtrips.
                    if let Some(usage) = usage_total.as_mut() {
                        usage.prompt_tokens += response.usage.prompt_tokens;
                        usage.completion_tokens += response.usage.completion_tokens;
                        usage.total_tokens += response.usage.total_tokens;
                    } else {
                        usage_total = Some(YamlLlmTokenUsage {
                            prompt_tokens: response.usage.prompt_tokens,
                            completion_tokens: response.usage.completion_tokens,
                            total_tokens: response.usage.total_tokens,
                            thinking_tokens: None,
                        });
                    }

                    let choice = response
                        .choices
                        .first()
                        .ok_or_else(|| "completion returned no choices".to_string())?;

                    // Non-tool finish: this is the model's final structured
                    // answer — parse it and return.
                    if choice.finish_reason != FinishReason::ToolCalls {
                        let content = response.content().ok_or_else(|| {
                            "completion returned empty content for structured payload".to_string()
                        })?;
                        let payload: Value = serde_json::from_str(content).map_err(|error| {
                            format!("failed to parse structured completion JSON: {error}")
                        })?;
                        return Ok(YamlLlmExecutionResult {
                            payload,
                            usage: usage_total,
                            ttft_ms: None,
                            tool_calls: tool_traces,
                        });
                    }

                    // Model wants more tool calls but we're on the final
                    // permitted iteration — give up.
                    if roundtrip >= request.max_tool_roundtrips {
                        return Err(format!(
                            "tool call roundtrip limit reached for node '{}' (max={})",
                            request.node_id, request.max_tool_roundtrips
                        ));
                    }

                    let tool_calls: Vec<ToolCall> =
                        choice.message.tool_calls.clone().ok_or_else(|| {
                            "finish_reason=tool_calls but no tool calls found".to_string()
                        })?;

                    // The assistant turn (with its tool_calls) must precede
                    // the tool result messages in the conversation.
                    conversation.push(choice.message.clone());

                    for tool_call in tool_calls {
                        let tool_call_id = tool_call.id.clone();
                        let tool_name = tool_call.function.name.clone();
                        let tool_started = Instant::now();
                        let arguments: Value = serde_json::from_str(&tool_call.function.arguments)
                            .map_err(|error| {
                                format!(
                                    "tool '{}' arguments must be valid JSON: {}",
                                    tool_name, error
                                )
                            })?;

                        if request.tool_trace_mode != YamlToolTraceMode::Off {
                            if let Some(sink) = event_sink {
                                sink.emit(&YamlWorkflowEvent {
                                    event_type: "node_tool_call_requested".to_string(),
                                    node_id: Some(request.node_id.clone()),
                                    step_id: Some(request.node_id.clone()),
                                    node_kind: Some("llm_call".to_string()),
                                    streamable: Some(false),
                                    message: Some(format!(
                                        "tool call requested: {}",
                                        tool_name
                                    )),
                                    delta: None,
                                    token_kind: None,
                                    is_terminal_node_token: None,
                                    elapsed_ms: None,
                                    metadata: Some(json!({
                                        "tool_call_id": tool_call_id.clone(),
                                        "tool_name": tool_name.clone(),
                                        "arguments": payload_for_tool_trace(request.tool_trace_mode, &arguments),
                                    })),
                                });
                            }
                        }

                        // Execute via the custom worker when available;
                        // otherwise fall back to the built-in mock handler.
                        let tool_output_result = if let Some(custom_worker) = self.custom_worker {
                            custom_worker
                                .execute(
                                    tool_name.as_str(),
                                    &arguments,
                                    request.email_text.as_str(),
                                    &request.execution_context,
                                )
                                .await
                        } else {
                            mock_custom_worker_output(tool_name.as_str(), &arguments)
                                .map_err(|error| error.to_string())
                        };

                        // NOTE(review): an unknown tool is rejected only AFTER
                        // its handler has already been invoked above — confirm
                        // whether execute-before-validate is intentional.
                        let Some(tool_config) = request
                            .tools
                            .iter()
                            .find(|tool| tool.definition.function.name == tool_name)
                        else {
                            return Err(format!("model requested unknown tool '{}'", tool_name));
                        };

                        // Tool failure: emit a trace event, record the failed
                        // trace, and abort the node.
                        let tool_output = match tool_output_result {
                            Ok(output) => output,
                            Err(message) => {
                                let elapsed_ms = tool_started.elapsed().as_millis();
                                if request.tool_trace_mode != YamlToolTraceMode::Off {
                                    if let Some(sink) = event_sink {
                                        sink.emit(&YamlWorkflowEvent {
                                            event_type: "node_tool_call_failed".to_string(),
                                            node_id: Some(request.node_id.clone()),
                                            step_id: Some(request.node_id.clone()),
                                            node_kind: Some("llm_call".to_string()),
                                            streamable: Some(false),
                                            message: Some(message.clone()),
                                            delta: None,
                                            token_kind: None,
                                            is_terminal_node_token: None,
                                            elapsed_ms: Some(elapsed_ms),
                                            metadata: Some(json!({
                                                "tool_call_id": tool_call_id.clone(),
                                                "tool_name": tool_name.clone(),
                                            })),
                                        });
                                    }
                                }
                                tool_traces.push(YamlToolCallTrace {
                                    id: tool_call_id.clone(),
                                    name: tool_name.clone(),
                                    arguments,
                                    output: None,
                                    status: "error".to_string(),
                                    elapsed_ms,
                                    error: Some(message.clone()),
                                });
                                return Err(format!("tool '{}' failed: {}", tool_name, message));
                            }
                        };

                        // Optional output-schema validation (configured per tool).
                        if let Some(output_schema) = tool_config.output_schema.as_ref() {
                            validate_schema_instance(output_schema, &tool_output).map_err(
                                |message| {
                                    format!(
                                        "tool '{}' output failed schema validation: {}",
                                        tool_name, message
                                    )
                                },
                            )?;
                        }

                        let elapsed_ms = tool_started.elapsed().as_millis();
                        if request.tool_trace_mode != YamlToolTraceMode::Off {
                            if let Some(sink) = event_sink {
                                sink.emit(&YamlWorkflowEvent {
                                    event_type: "node_tool_call_completed".to_string(),
                                    node_id: Some(request.node_id.clone()),
                                    step_id: Some(request.node_id.clone()),
                                    node_kind: Some("llm_call".to_string()),
                                    streamable: Some(false),
                                    message: Some(format!(
                                        "tool call completed: {}",
                                        tool_name
                                    )),
                                    delta: None,
                                    token_kind: None,
                                    is_terminal_node_token: None,
                                    elapsed_ms: Some(elapsed_ms),
                                    metadata: Some(json!({
                                        "tool_call_id": tool_call_id.clone(),
                                        "tool_name": tool_name.clone(),
                                        "arguments": payload_for_tool_trace(request.tool_trace_mode, &arguments),
                                        "output": payload_for_tool_trace(request.tool_trace_mode, &tool_output),
                                    })),
                                });
                            }
                        }

                        tool_traces.push(YamlToolCallTrace {
                            id: tool_call_id.clone(),
                            name: tool_name.clone(),
                            arguments: arguments.clone(),
                            output: Some(tool_output.clone()),
                            status: "ok".to_string(),
                            elapsed_ms,
                            error: None,
                        });

                        // Feed the tool result back into the conversation for
                        // the next roundtrip.
                        conversation.push(Message::tool(
                            serde_json::to_string(&tool_output).map_err(|error| {
                                format!("failed to serialize tool output: {error}")
                            })?,
                            tool_call_id,
                        ));
                    }

                    if request.tool_trace_mode != YamlToolTraceMode::Off {
                        if let Some(sink) = event_sink {
                            sink.emit(&YamlWorkflowEvent {
                                event_type: "node_tool_roundtrip_completed".to_string(),
                                node_id: Some(request.node_id.clone()),
                                step_id: Some(request.node_id.clone()),
                                node_kind: Some("llm_call".to_string()),
                                streamable: Some(false),
                                message: Some(format!(
                                    "tool roundtrip {} completed",
                                    roundtrip + 1
                                )),
                                delta: None,
                                token_kind: None,
                                is_terminal_node_token: None,
                                elapsed_ms: None,
                                metadata: Some(json!({
                                    "roundtrip": roundtrip + 1,
                                    "max_tool_roundtrips": request.max_tool_roundtrips,
                                })),
                            });
                        }
                    }
                }

                // Unreachable in practice: every loop iteration either returns
                // a payload, returns an error, or continues to the next one.
                return Err(format!(
                    "tool-enabled llm_call '{}' exhausted loop without final payload",
                    request.node_id
                ));
            }

            // ---- Plain structured path (no tools) ----
            let mut builder = CompletionRequest::builder()
                .model(&request.model)
                .messages(messages);

            if request.stream {
                builder = builder.stream(true);
            }

            let completion_request = builder
                .build()
                .map_err(|error| format!("failed to build completion request: {error}"))?;

            // Healing is only applied on the non-streaming path here; streamed
            // output is healed after aggregation instead.
            let completion_options = if request.heal && !request.stream {
                CompletionOptions {
                    mode: CompletionMode::HealedJson,
                }
            } else {
                CompletionOptions::default()
            };

            let outcome = self
                .client
                .complete(&completion_request, completion_options)
                .await
                .map_err(|error| error.to_string())?;

            match outcome {
                CompletionOutcome::Stream(mut stream) => {
                    // Aggregate all content deltas; emit per-delta events with
                    // output/thinking split and optional JSON-as-text rendering.
                    let mut aggregated = String::new();
                    let mut final_stream_usage: Option<simple_agent_type::response::Usage> = None;
                    let stream_started = Instant::now();
                    let mut ttft_ms: Option<u128> = None;
                    let mut delta_filter = StructuredJsonDeltaFilter::default();
                    let include_raw_debug = include_raw_stream_debug_events();
                    let mut json_text_formatter = if request.stream_json_as_text {
                        Some(StreamJsonAsTextFormatter::default())
                    } else {
                        None
                    };
                    while let Some(chunk_result) = stream.next().await {
                        // Cancellation is checked both before and after
                        // processing each chunk.
                        if event_sink_is_cancelled(event_sink) {
                            return Err(workflow_event_sink_cancelled_message().to_string());
                        }
                        let chunk = chunk_result.map_err(|error| error.to_string())?;
                        if let Some(usage) = chunk.usage {
                            final_stream_usage = Some(usage);
                        }
                        if let Some(choice) = chunk.choices.first() {
                            // Time-to-first-token: first non-empty content or
                            // reasoning delta.
                            if ttft_ms.is_none()
                                && (choice
                                    .delta
                                    .content
                                    .as_ref()
                                    .is_some_and(|delta| !delta.is_empty())
                                    || choice
                                        .delta
                                        .reasoning_content
                                        .as_ref()
                                        .is_some_and(|delta| !delta.is_empty()))
                            {
                                ttft_ms = Some(stream_started.elapsed().as_millis());
                            }
                            // Raw debug events mirror the unfiltered deltas.
                            if include_raw_debug {
                                if let Some(reasoning_delta) =
                                    choice.delta.reasoning_content.as_ref()
                                {
                                    if let Some(sink) = event_sink {
                                        sink.emit(&YamlWorkflowEvent {
                                            event_type: "node_stream_thinking_delta".to_string(),
                                            node_id: Some(request.node_id.clone()),
                                            step_id: Some(request.node_id.clone()),
                                            node_kind: Some("llm_call".to_string()),
                                            streamable: Some(true),
                                            message: None,
                                            delta: Some(reasoning_delta.clone()),
                                            token_kind: Some(YamlWorkflowTokenKind::Thinking),
                                            is_terminal_node_token: Some(request.is_terminal_node),
                                            elapsed_ms: None,
                                            metadata: None,
                                        });
                                    }
                                }
                            }
                            if let Some(delta) = choice.delta.content.clone() {
                                // Always aggregate the raw delta for final parsing.
                                aggregated.push_str(delta.as_str());
                                // Split inline thinking from structured output,
                                // then optionally render JSON output as text.
                                let (output_delta, thinking_delta) =
                                    delta_filter.split(delta.as_str());
                                let rendered_output_delta = if let Some(output_chunk) = output_delta
                                {
                                    if let Some(formatter) = json_text_formatter.as_mut() {
                                        formatter.push(output_chunk.as_str());
                                        formatter.emit_if_ready(delta_filter.completed())
                                    } else {
                                        Some(output_chunk)
                                    }
                                } else {
                                    None
                                };
                                if include_raw_debug {
                                    if let Some(sink) = event_sink {
                                        if let Some(raw_thinking_delta) = thinking_delta.as_ref() {
                                            sink.emit(&YamlWorkflowEvent {
                                                event_type: "node_stream_thinking_delta"
                                                    .to_string(),
                                                node_id: Some(request.node_id.clone()),
                                                step_id: Some(request.node_id.clone()),
                                                node_kind: Some("llm_call".to_string()),
                                                streamable: Some(true),
                                                message: None,
                                                delta: Some(raw_thinking_delta.clone()),
                                                token_kind: Some(YamlWorkflowTokenKind::Thinking),
                                                is_terminal_node_token: Some(
                                                    request.is_terminal_node,
                                                ),
                                                elapsed_ms: None,
                                                metadata: None,
                                            });
                                        }
                                        if let Some(raw_output_delta) =
                                            rendered_output_delta.as_ref()
                                        {
                                            sink.emit(&YamlWorkflowEvent {
                                                event_type: "node_stream_output_delta".to_string(),
                                                node_id: Some(request.node_id.clone()),
                                                step_id: Some(request.node_id.clone()),
                                                node_kind: Some("llm_call".to_string()),
                                                streamable: Some(true),
                                                message: None,
                                                delta: Some(raw_output_delta.clone()),
                                                token_kind: Some(YamlWorkflowTokenKind::Output),
                                                is_terminal_node_token: Some(
                                                    request.is_terminal_node,
                                                ),
                                                elapsed_ms: None,
                                                metadata: None,
                                            });
                                        }
                                    }
                                }
                                // Primary consumer-facing delta event.
                                if let Some(filtered_delta) = rendered_output_delta {
                                    if let Some(sink) = event_sink {
                                        sink.emit(&YamlWorkflowEvent {
                                            event_type: "node_stream_delta".to_string(),
                                            node_id: Some(request.node_id.clone()),
                                            step_id: Some(request.node_id.clone()),
                                            node_kind: Some("llm_call".to_string()),
                                            streamable: Some(true),
                                            message: None,
                                            delta: Some(filtered_delta),
                                            token_kind: Some(YamlWorkflowTokenKind::Output),
                                            is_terminal_node_token: Some(request.is_terminal_node),
                                            elapsed_ms: None,
                                            metadata: None,
                                        });
                                    }
                                }
                            }
                        }

                        if event_sink_is_cancelled(event_sink) {
                            return Err(workflow_event_sink_cancelled_message().to_string());
                        }
                    }

                    // Parse (and heal, if enabled) the aggregated stream text
                    // into the structured payload.
                    let resolved =
                        parse_streamed_structured_payload(aggregated.as_str(), request.heal)?;
                    if let Some(confidence) = resolved.heal_confidence {
                        if let Some(sink) = event_sink {
                            sink.emit(&YamlWorkflowEvent {
                                event_type: "node_healed".to_string(),
                                node_id: Some(request.node_id.clone()),
                                step_id: Some(request.node_id.clone()),
                                node_kind: Some("llm_call".to_string()),
                                streamable: Some(true),
                                message: Some(format!(
                                    "healed streamed structured response confidence={confidence}"
                                )),
                                delta: None,
                                token_kind: None,
                                is_terminal_node_token: None,
                                elapsed_ms: None,
                                metadata: None,
                            });
                        }
                    }

                    Ok(YamlLlmExecutionResult {
                        payload: resolved.payload,
                        usage: final_stream_usage.map(|usage| YamlLlmTokenUsage {
                            prompt_tokens: usage.prompt_tokens,
                            completion_tokens: usage.completion_tokens,
                            total_tokens: usage.total_tokens,
                            thinking_tokens: None,
                        }),
                        ttft_ms,
                        tool_calls: Vec::new(),
                    })
                }
                // Single-shot response: parse content directly as JSON.
                CompletionOutcome::Response(response) => {
                    let content = response
                        .content()
                        .ok_or_else(|| "completion returned empty content".to_string())?;
                    let payload = serde_json::from_str(content).map_err(|error| {
                        format!("failed to parse structured completion JSON: {error}")
                    })?;

                    Ok(YamlLlmExecutionResult {
                        payload,
                        usage: Some(YamlLlmTokenUsage {
                            prompt_tokens: response.usage.prompt_tokens,
                            completion_tokens: response.usage.completion_tokens,
                            total_tokens: response.usage.total_tokens,
                            thinking_tokens: None,
                        }),
                        ttft_ms: None,
                        tool_calls: Vec::new(),
                    })
                }
                // Healed response: use the healed value and notify the sink.
                CompletionOutcome::HealedJson(healed) => {
                    if let Some(sink) = event_sink {
                        sink.emit(&YamlWorkflowEvent {
                            event_type: "node_healed".to_string(),
                            node_id: Some(request.node_id.clone()),
                            step_id: Some(request.node_id.clone()),
                            node_kind: Some("llm_call".to_string()),
                            streamable: Some(request.stream),
                            message: Some(format!(
                                "healed structured response confidence={}",
                                healed.parsed.confidence
                            )),
                            delta: None,
                            token_kind: None,
                            is_terminal_node_token: None,
                            elapsed_ms: None,
                            metadata: None,
                        });
                    }
                    Ok(YamlLlmExecutionResult {
                        payload: healed.parsed.value,
                        usage: Some(YamlLlmTokenUsage {
                            prompt_tokens: healed.response.usage.prompt_tokens,
                            completion_tokens: healed.response.usage.completion_tokens,
                            total_tokens: healed.response.usage.total_tokens,
                            thinking_tokens: None,
                        }),
                        ttft_ms: None,
                        tool_calls: Vec::new(),
                    })
                }
                // Schema-coerced response: use the coerced value silently.
                CompletionOutcome::CoercedSchema(coerced) => Ok(YamlLlmExecutionResult {
                    payload: coerced.coerced.value,
                    usage: Some(YamlLlmTokenUsage {
                        prompt_tokens: coerced.response.usage.prompt_tokens,
                        completion_tokens: coerced.response.usage.completion_tokens,
                        total_tokens: coerced.response.usage.total_tokens,
                        thinking_tokens: None,
                    }),
                    ttft_ms: None,
                    tool_calls: Vec::new(),
                }),
            }
        }
    }

    let executor = BorrowedClientExecutor {
        client,
        custom_worker,
    };
    // `custom_worker` is passed both inside the executor (for model-requested
    // tool calls) and separately — presumably for custom_worker workflow
    // nodes handled by the generic runner; confirm against that runner.
    run_workflow_yaml_with_custom_worker_and_events_and_options(
        workflow,
        workflow_input,
        &executor,
        custom_worker,
        event_sink,
        options,
    )
    .await
}
2069
2070pub async fn run_email_workflow_yaml_with_client_and_custom_worker_and_events(
2071 workflow: &YamlWorkflow,
2072 email_text: &str,
2073 client: &SimpleAgentsClient,
2074 custom_worker: Option<&dyn YamlWorkflowCustomWorkerExecutor>,
2075 event_sink: Option<&dyn YamlWorkflowEventSink>,
2076) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
2077 let workflow_input = json!({ "email_text": email_text });
2078 run_workflow_yaml_with_client_and_custom_worker_and_events(
2079 workflow,
2080 &workflow_input,
2081 client,
2082 custom_worker,
2083 event_sink,
2084 )
2085 .await
2086}
2087
2088pub async fn run_workflow_yaml(
2089 workflow: &YamlWorkflow,
2090 workflow_input: &Value,
2091 executor: &dyn YamlWorkflowLlmExecutor,
2092) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
2093 run_workflow_yaml_with_custom_worker_and_events(workflow, workflow_input, executor, None, None)
2094 .await
2095}
2096
2097pub async fn run_email_workflow_yaml(
2098 workflow: &YamlWorkflow,
2099 email_text: &str,
2100 executor: &dyn YamlWorkflowLlmExecutor,
2101) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
2102 let workflow_input = json!({ "email_text": email_text });
2103 run_workflow_yaml(workflow, &workflow_input, executor).await
2104}
2105
2106pub async fn run_workflow_yaml_with_custom_worker(
2107 workflow: &YamlWorkflow,
2108 workflow_input: &Value,
2109 executor: &dyn YamlWorkflowLlmExecutor,
2110 custom_worker: Option<&dyn YamlWorkflowCustomWorkerExecutor>,
2111) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
2112 run_workflow_yaml_with_custom_worker_and_events(
2113 workflow,
2114 workflow_input,
2115 executor,
2116 custom_worker,
2117 None,
2118 )
2119 .await
2120}
2121
2122pub async fn run_email_workflow_yaml_with_custom_worker(
2123 workflow: &YamlWorkflow,
2124 email_text: &str,
2125 executor: &dyn YamlWorkflowLlmExecutor,
2126 custom_worker: Option<&dyn YamlWorkflowCustomWorkerExecutor>,
2127) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
2128 let workflow_input = json!({ "email_text": email_text });
2129 run_workflow_yaml_with_custom_worker(workflow, &workflow_input, executor, custom_worker).await
2130}
2131
2132pub async fn run_workflow_yaml_with_custom_worker_and_events(
2133 workflow: &YamlWorkflow,
2134 workflow_input: &Value,
2135 executor: &dyn YamlWorkflowLlmExecutor,
2136 custom_worker: Option<&dyn YamlWorkflowCustomWorkerExecutor>,
2137 event_sink: Option<&dyn YamlWorkflowEventSink>,
2138) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
2139 run_workflow_yaml_with_custom_worker_and_events_and_options(
2140 workflow,
2141 workflow_input,
2142 executor,
2143 custom_worker,
2144 event_sink,
2145 &YamlWorkflowRunOptions::default(),
2146 )
2147 .await
2148}
2149
/// Core interpreter for a YAML workflow with the full option set.
///
/// Order of operations:
/// 1. Reject a non-object `workflow_input`.
/// 2. Run static validation (`verify_yaml_workflow`) and fail fast on any
///    error-severity diagnostic.
/// 3. Try the IR-runtime fast path (`try_run_yaml_via_ir_runtime`); when it
///    yields an output, return it as-is.
/// 4. Otherwise interpret here: start at `entry_node` and follow single
///    outgoing edges until a node with no outgoing edge (the terminal node)
///    completes.
///
/// Supported node kinds are `llm_call`, `switch`, and `custom_worker`; any
/// other kind aborts with `UnsupportedNodeType`. Per-node timings, LLM token
/// usage, tracing spans, and optional `event_sink` notifications are emitted
/// along the way; the sink's cancellation flag is polled at every phase
/// boundary and aborts the run with `EventSinkCancelled`.
pub async fn run_workflow_yaml_with_custom_worker_and_events_and_options(
    workflow: &YamlWorkflow,
    workflow_input: &Value,
    executor: &dyn YamlWorkflowLlmExecutor,
    custom_worker: Option<&dyn YamlWorkflowCustomWorkerExecutor>,
    event_sink: Option<&dyn YamlWorkflowEventSink>,
    options: &YamlWorkflowRunOptions,
) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
    if !workflow_input.is_object() {
        return Err(YamlWorkflowRunError::InvalidInput {
            message: "workflow input must be a JSON object".to_string(),
        });
    }

    // Legacy email workflows read `email_text`; a missing key becomes "".
    let email_text = workflow_input
        .get("email_text")
        .and_then(Value::as_str)
        .unwrap_or_default();

    // Static validation: abort when any error-severity diagnostic exists.
    let diagnostics = verify_yaml_workflow(workflow);
    let errors: Vec<YamlWorkflowDiagnostic> = diagnostics
        .iter()
        .filter(|d| d.severity == YamlWorkflowDiagnosticSeverity::Error)
        .cloned()
        .collect();
    if !errors.is_empty() {
        return Err(YamlWorkflowRunError::Validation {
            diagnostics_count: errors.len(),
            diagnostics: errors,
        });
    }

    // Fast path: when the workflow lowers to the IR runtime, run it there.
    // Note: the IR path does not receive the event sink.
    if let Some(output) =
        try_run_yaml_via_ir_runtime(workflow, workflow_input, executor, custom_worker, options)
            .await?
    {
        return Ok(output);
    }

    // Root tracing span for the whole run; a trace id is only resolved when
    // telemetry is enabled.
    let tracer = workflow_tracer();
    let parent_trace_context = trace_context_from_options(options);
    let (workflow_trace_context, mut workflow_span) = tracer.start_span(
        "workflow.run",
        SpanKind::Workflow,
        parent_trace_context.as_ref(),
    );
    let trace_id = if options.telemetry.enabled {
        let value = resolve_trace_id(options, &workflow_trace_context);
        workflow_span.set_attribute("trace_id", value.as_str());
        apply_trace_tenant_attributes(workflow_span.as_mut(), options);
        Some(value)
    } else {
        None
    };

    // NOTE(review): the error returns below (and in the loop) exit without
    // calling `workflow_span.end()` — confirm the tracer finalizes dropped
    // spans, otherwise these runs produce unterminated spans.
    if workflow.nodes.is_empty() {
        return Err(YamlWorkflowRunError::EmptyNodes {
            workflow_id: workflow.id.clone(),
        });
    }

    // Index nodes by id; the entry node must exist.
    let node_map: HashMap<&str, &YamlNode> = workflow
        .nodes
        .iter()
        .map(|node| (node.id.as_str(), node))
        .collect();
    if !node_map.contains_key(workflow.entry_node.as_str()) {
        return Err(YamlWorkflowRunError::MissingEntry {
            entry_node: workflow.entry_node.clone(),
        });
    }

    // One outgoing edge per node. A duplicate `from` silently overwrites the
    // earlier entry — presumably validation rejects that case upstream.
    let edge_map: HashMap<&str, &str> = workflow
        .edges
        .iter()
        .map(|edge| (edge.from.as_str(), edge.to.as_str()))
        .collect();

    // Mutable interpreter state accumulated across the node walk.
    let mut current = workflow.entry_node.clone();
    let mut trace = Vec::new();
    let mut outputs: BTreeMap<String, Value> = BTreeMap::new();
    let mut globals = serde_json::Map::new();
    let mut step_timings = Vec::new();
    let mut llm_node_metrics: BTreeMap<String, YamlLlmNodeMetrics> = BTreeMap::new();
    let mut token_totals = YamlTokenTotals::default();
    let mut workflow_ttft_ms: Option<u128> = None;
    let started = Instant::now();

    if let Some(sink) = event_sink {
        sink.emit(&YamlWorkflowEvent {
            event_type: "workflow_started".to_string(),
            node_id: None,
            step_id: None,
            node_kind: None,
            streamable: None,
            message: Some(format!("workflow_id={}", workflow.id)),
            delta: None,
            token_kind: None,
            is_terminal_node_token: None,
            elapsed_ms: Some(0),
            metadata: None,
        });
    }

    if event_sink_is_cancelled(event_sink) {
        return Err(YamlWorkflowRunError::EventSinkCancelled {
            message: workflow_event_sink_cancelled_message().to_string(),
        });
    }

    // Main interpreter loop: one iteration per executed node.
    loop {
        if event_sink_is_cancelled(event_sink) {
            return Err(YamlWorkflowRunError::EventSinkCancelled {
                message: workflow_event_sink_cancelled_message().to_string(),
            });
        }

        let node =
            *node_map
                .get(current.as_str())
                .ok_or_else(|| YamlWorkflowRunError::MissingNode {
                    node_id: current.clone(),
                })?;

        trace.push(node.id.clone());
        let step_started = Instant::now();

        // Per-node tracing span (only when telemetry is on).
        let mut node_span = if options.telemetry.enabled {
            let (_, mut span) = tracer.start_span(
                "workflow.node.execute",
                SpanKind::Node,
                Some(&workflow_trace_context),
            );
            span.set_attribute("trace_id", trace_id.as_deref().unwrap_or_default());
            span.set_attribute("node_id", node.id.as_str());
            span.set_attribute("node_kind", node.kind_name());
            Some(span)
        } else {
            None
        };

        // Streamability: an llm_call streams only when `stream` is set AND
        // healing is off (healed output cannot be streamed incrementally).
        let node_streamable = node
            .node_type
            .llm_call
            .as_ref()
            .map(|llm| llm.stream.unwrap_or(false) && !llm.heal.unwrap_or(false));
        let workflow_elapsed_before_node_ms = started.elapsed().as_millis();

        if let Some(sink) = event_sink {
            sink.emit(&YamlWorkflowEvent {
                event_type: "node_started".to_string(),
                node_id: Some(node.id.clone()),
                step_id: Some(node.id.clone()),
                node_kind: Some(node.kind_name().to_string()),
                streamable: node_streamable,
                message: if node_streamable == Some(false) {
                    Some("Node is not streamable; status events only".to_string())
                } else {
                    None
                },
                delta: None,
                token_kind: None,
                is_terminal_node_token: None,
                elapsed_ms: Some(workflow_elapsed_before_node_ms),
                metadata: None,
            });
        }

        if event_sink_is_cancelled(event_sink) {
            return Err(YamlWorkflowRunError::EventSinkCancelled {
                message: workflow_event_sink_cancelled_message().to_string(),
            });
        }

        // Dispatch on node kind; each branch yields the next node id (or
        // None when there is no outgoing edge, ending the loop).
        let mut node_usage: Option<YamlLlmTokenUsage> = None;
        let is_terminal_node = !edge_map.contains_key(node.id.as_str());
        let next = if let Some(llm) = &node.node_type.llm_call {
            // llm_call: interpolate the prompt from the accumulated context,
            // build the execution request, invoke the executor, and record
            // payload + tool calls + token usage.
            let prompt_template = node
                .config
                .as_ref()
                .and_then(|cfg| cfg.prompt.as_deref())
                .unwrap_or_default();
            let context = json!({
                "input": workflow_input,
                "nodes": outputs,
                "globals": Value::Object(globals.clone())
            });
            let messages = if let Some(path) = llm.messages_path.as_deref() {
                Some(
                    parse_messages_from_context(path, &context).map_err(|message| {
                        YamlWorkflowRunError::Llm {
                            node_id: node.id.clone(),
                            message,
                        }
                    })?,
                )
            } else {
                None
            };
            let prompt_bindings = collect_template_bindings(prompt_template, &context);
            let prompt = interpolate_template(prompt_template, &context);
            let schema = llm_output_schema_for_node(node);

            let request = YamlLlmExecutionRequest {
                node_id: node.id.clone(),
                is_terminal_node,
                stream_json_as_text: llm.stream_json_as_text.unwrap_or(false),
                model: llm.model.clone(),
                messages,
                append_prompt_as_user: llm.append_prompt_as_user.unwrap_or(true),
                prompt,
                prompt_template: prompt_template.to_string(),
                prompt_bindings,
                schema,
                stream: llm.stream.unwrap_or(false),
                heal: llm.heal.unwrap_or(false),
                tools: normalize_llm_tools(llm).map_err(|message| YamlWorkflowRunError::Llm {
                    node_id: node.id.clone(),
                    message,
                })?,
                tool_choice: normalize_tool_choice(llm.tool_choice.clone()).map_err(|message| {
                    YamlWorkflowRunError::Llm {
                        node_id: node.id.clone(),
                        message,
                    }
                })?,
                max_tool_roundtrips: llm.max_tool_roundtrips.unwrap_or(1),
                tool_calls_global_key: llm.tool_calls_global_key.clone(),
                tool_trace_mode: options.telemetry.tool_trace_mode,
                execution_context: context.clone(),
                email_text: email_text.to_string(),
            };

            if let Some(span) = node_span.as_mut() {
                span.set_attribute(
                    "node_input",
                    payload_for_span(options.telemetry.payload_mode, &context).as_str(),
                );
            }

            if let Some(sink) = event_sink {
                sink.emit(&YamlWorkflowEvent {
                    event_type: "node_llm_input_resolved".to_string(),
                    node_id: Some(node.id.clone()),
                    step_id: Some(node.id.clone()),
                    node_kind: Some("llm_call".to_string()),
                    streamable: Some(request.stream),
                    message: Some("resolved llm input for telemetry".to_string()),
                    delta: None,
                    token_kind: None,
                    is_terminal_node_token: None,
                    elapsed_ms: Some(started.elapsed().as_millis()),
                    metadata: Some(json!({
                        "model": request.model.clone(),
                        "stream_requested": request.stream,
                        "stream_json_as_text": request.stream_json_as_text,
                        "heal_requested": request.heal,
                        "effective_stream": request.stream,
                        "prompt_template": request.prompt_template.clone(),
                        "prompt": request.prompt.clone(),
                        "schema": request.schema.clone(),
                        "bindings": request.prompt_bindings.clone(),
                        "tools_count": request.tools.len(),
                        "max_tool_roundtrips": request.max_tool_roundtrips,
                    })),
                });
            }

            if event_sink_is_cancelled(event_sink) {
                return Err(YamlWorkflowRunError::EventSinkCancelled {
                    message: workflow_event_sink_cancelled_message().to_string(),
                });
            }

            let llm_result = executor
                .complete_structured(request, event_sink)
                .await
                .map_err(|message| YamlWorkflowRunError::Llm {
                    node_id: node.id.clone(),
                    message,
                })?;

            // Accumulate token usage; capture TTFT only for the first LLM
            // node that reports one (offset by elapsed workflow time).
            if let Some(usage) = llm_result.usage.as_ref() {
                token_totals.add_usage(usage);
            }
            if workflow_ttft_ms.is_none() {
                workflow_ttft_ms = llm_result
                    .ttft_ms
                    .map(|node_ttft_ms| workflow_elapsed_before_node_ms + node_ttft_ms);
            }
            node_usage = llm_result.usage;

            let payload = llm_result.payload;
            let tool_calls = llm_result.tool_calls;

            if !payload.is_object() {
                return Err(YamlWorkflowRunError::LlmPayloadNotObject {
                    node_id: node.id.clone(),
                });
            }

            // Store under `{ "output": ..., ["tool_calls": ...] }` so later
            // templates can reference `nodes.<id>.output`.
            let mut node_output = json!({ "output": payload });
            if !tool_calls.is_empty() {
                if let Some(output_obj) = node_output.as_object_mut() {
                    output_obj.insert("tool_calls".to_string(), json!(tool_calls));
                }
            }
            outputs.insert(node.id.clone(), node_output);
            if let Some(span) = node_span.as_mut() {
                if let Some(output_payload) = outputs.get(node.id.as_str()) {
                    span.set_attribute(
                        "node_output",
                        payload_for_span(options.telemetry.payload_mode, output_payload).as_str(),
                    );
                }
            }
            apply_set_globals(node, &outputs, workflow_input, &mut globals);
            apply_update_globals(node, &outputs, workflow_input, &mut globals);
            // Optionally mirror this node's tool calls into a named global.
            if let Some(global_key) = llm.tool_calls_global_key.as_ref() {
                if let Some(node_tool_calls) = outputs
                    .get(node.id.as_str())
                    .and_then(|value| value.get("tool_calls"))
                    .cloned()
                {
                    globals.insert(global_key.clone(), node_tool_calls);
                }
            }
            edge_map
                .get(node.id.as_str())
                .map(|value| value.to_string())
        } else if let Some(switch) = &node.node_type.switch {
            // switch: first branch whose condition evaluates true wins;
            // otherwise fall through to the declared default target.
            let context = json!({
                "input": workflow_input,
                "nodes": outputs,
                "globals": Value::Object(globals.clone())
            });
            let mut chosen = Some(switch.default.clone());
            for branch in &switch.branches {
                if evaluate_switch_condition(branch.condition.as_str(), &context)? {
                    chosen = Some(branch.target.clone());
                    break;
                }
            }
            // NOTE(review): `chosen` is initialized to Some(default), so this
            // InvalidSwitchTarget error looks unreachable as written.
            let chosen = chosen.ok_or_else(|| YamlWorkflowRunError::InvalidSwitchTarget {
                node_id: node.id.clone(),
            })?;
            Some(chosen)
        } else if let Some(custom) = &node.node_type.custom_worker {
            // custom_worker: invoke the registered handler (or a mock when
            // no executor is supplied) with the configured payload.
            let payload = node
                .config
                .as_ref()
                .and_then(|cfg| cfg.payload.as_ref())
                .cloned()
                .unwrap_or_else(|| json!({}));
            let context = json!({
                "input": workflow_input,
                "nodes": outputs,
                "globals": Value::Object(globals.clone())
            });

            if let Some(span) = node_span.as_mut() {
                span.set_attribute("handler_name", custom.handler.as_str());
                span.set_attribute(
                    "node_input",
                    payload_for_span(options.telemetry.payload_mode, &payload).as_str(),
                );
            }

            // Dedicated child span for the handler invocation itself.
            let mut handler_span_context: Option<TraceContext> = None;
            let mut handler_span = if options.telemetry.enabled {
                let (span_context, mut span) = tracer.start_span(
                    "handler.invoke",
                    SpanKind::Node,
                    Some(&workflow_trace_context),
                );
                handler_span_context = Some(span_context);
                span.set_attribute("trace_id", trace_id.as_deref().unwrap_or_default());
                span.set_attribute("handler_name", custom.handler.as_str());
                apply_trace_tenant_attributes(span.as_mut(), options);
                Some(span)
            } else {
                None
            };

            // Propagate trace + tenant context into the worker's view.
            let worker_trace_context = merged_trace_context_for_worker(
                handler_span_context.as_ref(),
                trace_id.as_deref(),
                options,
            );
            let worker_context = custom_worker_context_with_trace(
                &context,
                &worker_trace_context,
                &options.trace.tenant,
            );

            let worker_output_result = if let Some(custom_worker_executor) = custom_worker {
                custom_worker_executor
                    .execute(
                        custom.handler.as_str(),
                        &payload,
                        email_text,
                        &worker_context,
                    )
                    .await
                    .map_err(|message| YamlWorkflowRunError::CustomWorker {
                        node_id: node.id.clone(),
                        message,
                    })
            } else {
                mock_custom_worker_output(custom.handler.as_str(), &payload)
            };

            // End the handler span before propagating any worker error.
            if let Some(span) = handler_span.take() {
                span.end();
            }

            let worker_output = worker_output_result?;

            outputs.insert(node.id.clone(), json!({ "output": worker_output }));
            if let Some(span) = node_span.as_mut() {
                if let Some(output_payload) = outputs.get(node.id.as_str()) {
                    span.set_attribute(
                        "node_output",
                        payload_for_span(options.telemetry.payload_mode, output_payload).as_str(),
                    );
                }
            }
            apply_set_globals(node, &outputs, workflow_input, &mut globals);
            apply_update_globals(node, &outputs, workflow_input, &mut globals);
            edge_map
                .get(node.id.as_str())
                .map(|value| value.to_string())
        } else {
            return Err(YamlWorkflowRunError::UnsupportedNodeType {
                node_id: node.id.clone(),
            });
        };

        // Record per-node timing and (for llm_call nodes) token metrics.
        let node_kind = node.kind_name().to_string();
        let elapsed_ms = step_started.elapsed().as_millis();
        step_timings.push(YamlStepTiming {
            node_id: node.id.clone(),
            node_kind,
            elapsed_ms,
            prompt_tokens: node_usage.as_ref().map(|usage| usage.prompt_tokens),
            completion_tokens: node_usage.as_ref().map(|usage| usage.completion_tokens),
            total_tokens: node_usage.as_ref().map(|usage| usage.total_tokens),
            thinking_tokens: node_usage.as_ref().and_then(|usage| usage.thinking_tokens),
            tokens_per_second: node_usage
                .as_ref()
                .map(|usage| completion_tokens_per_second(usage.completion_tokens, elapsed_ms)),
        });

        if let Some(usage) = node_usage.as_ref() {
            llm_node_metrics.insert(
                node.id.clone(),
                YamlLlmNodeMetrics {
                    elapsed_ms,
                    prompt_tokens: usage.prompt_tokens,
                    completion_tokens: usage.completion_tokens,
                    total_tokens: usage.total_tokens,
                    thinking_tokens: usage.thinking_tokens,
                    tokens_per_second: completion_tokens_per_second(
                        usage.completion_tokens,
                        elapsed_ms,
                    ),
                },
            );
        }

        if let Some(mut span) = node_span.take() {
            span.set_attribute("elapsed_ms", elapsed_ms.to_string().as_str());
            span.add_event("node_completed");
            span.end();
        }

        if let Some(sink) = event_sink {
            sink.emit(&YamlWorkflowEvent {
                event_type: "node_completed".to_string(),
                node_id: Some(node.id.clone()),
                step_id: Some(node.id.clone()),
                node_kind: Some(node.kind_name().to_string()),
                streamable: node_streamable,
                message: None,
                delta: None,
                token_kind: None,
                is_terminal_node_token: None,
                elapsed_ms: Some(elapsed_ms),
                metadata: None,
            });
        }

        if event_sink_is_cancelled(event_sink) {
            return Err(YamlWorkflowRunError::EventSinkCancelled {
                message: workflow_event_sink_cancelled_message().to_string(),
            });
        }

        // Advance along the outgoing edge, or stop at the terminal node.
        if let Some(next) = next {
            current = next;
            continue;
        }
        break;
    }

    // The last executed node is the terminal node; surface its `output`
    // field (if any) as the workflow's terminal output.
    let terminal_node = trace
        .last()
        .cloned()
        .ok_or_else(|| YamlWorkflowRunError::EmptyNodes {
            workflow_id: workflow.id.clone(),
        })?;

    let terminal_output = outputs
        .get(terminal_node.as_str())
        .and_then(|value| value.get("output"))
        .cloned();

    // Assemble the run output with aggregate timing and token totals.
    let total_elapsed_ms = started.elapsed().as_millis();
    let output = YamlWorkflowRunOutput {
        workflow_id: workflow.id.clone(),
        entry_node: workflow.entry_node.clone(),
        email_text: email_text.to_string(),
        trace,
        outputs,
        terminal_node,
        terminal_output,
        step_timings,
        llm_node_metrics,
        total_elapsed_ms,
        ttft_ms: workflow_ttft_ms,
        total_input_tokens: token_totals.input_tokens,
        total_output_tokens: token_totals.output_tokens,
        total_tokens: token_totals.total_tokens,
        total_thinking_tokens: token_totals.thinking_tokens,
        tokens_per_second: token_totals.tokens_per_second(total_elapsed_ms),
        trace_id: trace_id.clone(),
        metadata: trace_id
            .as_ref()
            .map(|value| workflow_metadata_with_trace(options, value)),
    };

    if let Some(sink) = event_sink {
        // Attach nerdstats to the completion event only when requested.
        let event_metadata = if options.telemetry.nerdstats {
            Some(json!({
                "nerdstats": workflow_nerdstats(&output),
            }))
        } else {
            None
        };
        sink.emit(&YamlWorkflowEvent {
            event_type: "workflow_completed".to_string(),
            node_id: None,
            step_id: None,
            node_kind: None,
            streamable: None,
            message: Some(format!("terminal_node={}", output.terminal_node)),
            delta: None,
            token_kind: None,
            is_terminal_node_token: None,
            elapsed_ms: Some(output.total_elapsed_ms),
            metadata: event_metadata,
        });
    }

    if event_sink_is_cancelled(event_sink) {
        return Err(YamlWorkflowRunError::EventSinkCancelled {
            message: workflow_event_sink_cancelled_message().to_string(),
        });
    }

    workflow_span.set_attribute("workflow_id", workflow.id.as_str());
    if let Some(trace_id_value) = trace_id.as_ref() {
        workflow_span.set_attribute("trace_id", trace_id_value.as_str());
    }
    workflow_span.end();

    Ok(output)
}
2728
/// Attempt to execute the YAML workflow on the IR runtime instead of the
/// in-process interpreter.
///
/// Returns `Ok(None)` — meaning "fall back to the interpreter" — when the
/// workflow cannot be lowered (`UnsupportedNode` / `MultipleOutgoingEdge`)
/// or when the IR runtime rejects it at validation time. Any other lowering
/// or runtime failure is surfaced as a hard error.
///
/// On this path every LLM node is routed through the tool executor as a
/// synthetic `YAML_LLM_TOOL_ID` tool call, so the runtime's direct LLM
/// executor is a stub that must never be reached. Limitations of this path:
/// per-node `elapsed_ms` is reported as 0, TTFT is not tracked, and globals
/// are not propagated into worker contexts.
async fn try_run_yaml_via_ir_runtime(
    workflow: &YamlWorkflow,
    workflow_input: &Value,
    executor: &dyn YamlWorkflowLlmExecutor,
    custom_worker: Option<&dyn YamlWorkflowCustomWorkerExecutor>,
    options: &YamlWorkflowRunOptions,
) -> Result<Option<YamlWorkflowRunOutput>, YamlWorkflowRunError> {
    // Lower to IR; unsupported shapes signal fallback rather than failure.
    let ir = match yaml_workflow_to_ir(workflow) {
        Ok(def) => def,
        Err(YamlToIrError::UnsupportedNode { .. })
        | Err(YamlToIrError::MultipleOutgoingEdge { .. }) => return Ok(None),
        Err(err) => {
            return Err(YamlWorkflowRunError::InvalidInput {
                message: err.to_string(),
            });
        }
    };

    // Root tracing span; trace id only when telemetry is enabled.
    let tracer = workflow_tracer();
    let parent_trace_context = trace_context_from_options(options);
    let (workflow_trace_context, mut workflow_span) = tracer.start_span(
        "workflow.run",
        SpanKind::Workflow,
        parent_trace_context.as_ref(),
    );
    let trace_id = if options.telemetry.enabled {
        let value = resolve_trace_id(options, &workflow_trace_context);
        workflow_span.set_attribute("trace_id", value.as_str());
        apply_trace_tenant_attributes(workflow_span.as_mut(), options);
        Some(value)
    } else {
        None
    };

    // Stub LLM executor: all LLM work goes through the tool path below, so
    // a direct call indicates a lowering bug.
    struct NoopLlm;
    #[async_trait]
    impl LlmExecutor for NoopLlm {
        async fn execute(
            &self,
            _input: LlmExecutionInput,
        ) -> Result<LlmExecutionOutput, LlmExecutionError> {
            Err(LlmExecutionError::UnexpectedOutcome(
                "yaml_ir_uses_tool_path",
            ))
        }
    }

    // Bridges IR tool invocations back to the YAML-level executors.
    // Mutexes are needed because `execute_tool` takes `&self` but must
    // accumulate token totals and per-node usage across calls.
    struct YamlIrToolExecutor<'a> {
        llm_executor: &'a dyn YamlWorkflowLlmExecutor,
        custom_worker: Option<&'a dyn YamlWorkflowCustomWorkerExecutor>,
        token_totals: std::sync::Mutex<YamlTokenTotals>,
        node_usage: std::sync::Mutex<BTreeMap<String, YamlLlmTokenUsage>>,
        trace_id: Option<String>,
        trace_context: Option<TraceContext>,
        trace_input_context: Option<YamlWorkflowTraceContextInput>,
        tenant_context: YamlWorkflowTraceTenantContext,
        payload_mode: YamlWorkflowPayloadMode,
    }

    #[async_trait]
    impl ToolExecutor for YamlIrToolExecutor<'_> {
        async fn execute_tool(
            &self,
            input: ToolExecutionInput,
        ) -> Result<Value, ToolExecutionError> {
            let context = build_yaml_context_from_ir_scope(&input.scoped_input);

            // Synthetic tool id = an LLM node lowered to a tool call:
            // reconstruct the YamlLlmExecutionRequest from the tool input.
            if input.tool == YAML_LLM_TOOL_ID {
                let node_id = input
                    .input
                    .get("node_id")
                    .and_then(Value::as_str)
                    .ok_or_else(|| {
                        ToolExecutionError::Failed("yaml llm call missing node_id".to_string())
                    })?
                    .to_string();
                let node_id_for_metrics = node_id.clone();
                let model = input
                    .input
                    .get("model")
                    .and_then(Value::as_str)
                    .ok_or_else(|| {
                        ToolExecutionError::Failed("yaml llm call missing model".to_string())
                    })?
                    .to_string();
                let prompt_template = input
                    .input
                    .get("prompt_template")
                    .and_then(Value::as_str)
                    .unwrap_or_default()
                    .to_string();
                let stream = input
                    .input
                    .get("stream")
                    .and_then(Value::as_bool)
                    .unwrap_or(false);
                let heal = input
                    .input
                    .get("heal")
                    .and_then(Value::as_bool)
                    .unwrap_or(false);
                let append_prompt_as_user = input
                    .input
                    .get("append_prompt_as_user")
                    .and_then(Value::as_bool)
                    .unwrap_or(true);
                let messages_path = input
                    .input
                    .get("messages_path")
                    .and_then(Value::as_str)
                    .map(str::to_string);

                let messages = if let Some(path) = messages_path.as_deref() {
                    Some(
                        parse_messages_from_context(path, &context)
                            .map_err(ToolExecutionError::Failed)?,
                    )
                } else {
                    None
                };

                let prompt_bindings = collect_template_bindings(&prompt_template, &context);
                let prompt = interpolate_template(&prompt_template, &context);
                let email_text = context
                    .get("input")
                    .and_then(|v| v.get("email_text"))
                    .and_then(Value::as_str)
                    .unwrap_or_default();
                let schema = input
                    .input
                    .get("output_schema")
                    .cloned()
                    .unwrap_or_else(default_llm_output_schema);

                // Tool roundtrips and tool_choice are not supported on the
                // IR path: the request is pinned to a single plain call.
                let request = YamlLlmExecutionRequest {
                    node_id,
                    is_terminal_node: false,
                    stream_json_as_text: input
                        .input
                        .get("stream_json_as_text")
                        .and_then(Value::as_bool)
                        .unwrap_or(false),
                    model,
                    messages,
                    append_prompt_as_user,
                    prompt,
                    prompt_template,
                    prompt_bindings,
                    schema,
                    stream,
                    heal,
                    tools: Vec::new(),
                    tool_choice: None,
                    max_tool_roundtrips: 1,
                    tool_calls_global_key: None,
                    tool_trace_mode: YamlToolTraceMode::Off,
                    execution_context: context.clone(),
                    email_text: email_text.to_string(),
                };

                // No event sink on the IR path.
                let llm_result = self
                    .llm_executor
                    .complete_structured(request, None)
                    .await
                    .map_err(ToolExecutionError::Failed);

                // Best-effort usage accounting; a poisoned lock is ignored.
                if let Ok(ref result) = llm_result {
                    if let Some(usage) = result.usage.as_ref() {
                        if let Ok(mut totals) = self.token_totals.lock() {
                            totals.add_usage(usage);
                        }
                        if let Ok(mut usage_map) = self.node_usage.lock() {
                            usage_map.insert(node_id_for_metrics, usage.clone());
                        }
                    }
                }

                return llm_result.map(|result| result.payload);
            }

            // Any other tool id is a custom-worker handler name.
            let worker = self
                .custom_worker
                .ok_or_else(|| ToolExecutionError::NotFound {
                    tool: input.tool.clone(),
                })?;

            let payload = input.input.clone();
            let email_text = context
                .get("input")
                .and_then(|v| v.get("email_text"))
                .and_then(Value::as_str)
                .unwrap_or_default();

            // Handler span with tenant attributes, mirroring the
            // interpreter path's "handler.invoke" span.
            let tracer = workflow_tracer();
            let mut handler_span_context: Option<TraceContext> = None;
            let mut handler_span = if self.trace_id.is_some() {
                let (span_context, mut span) = tracer.start_span(
                    "handler.invoke",
                    SpanKind::Node,
                    self.trace_context.as_ref(),
                );
                handler_span_context = Some(span_context);
                if let Some(trace_id) = self.trace_id.as_ref() {
                    span.set_attribute("trace_id", trace_id.as_str());
                }
                span.set_attribute("handler_name", input.tool.as_str());
                if let Some(workspace_id) = self.tenant_context.workspace_id.as_deref() {
                    span.set_attribute("tenant.workspace_id", workspace_id);
                }
                if let Some(user_id) = self.tenant_context.user_id.as_deref() {
                    span.set_attribute("tenant.user_id", user_id);
                }
                if let Some(conversation_id) = self.tenant_context.conversation_id.as_deref() {
                    span.set_attribute("tenant.conversation_id", conversation_id);
                }
                if let Some(request_id) = self.tenant_context.request_id.as_deref() {
                    span.set_attribute("tenant.request_id", request_id);
                }
                if let Some(run_id) = self.tenant_context.run_id.as_deref() {
                    span.set_attribute("tenant.run_id", run_id);
                }
                span.set_attribute(
                    "node_input",
                    payload_for_span(self.payload_mode, &payload).as_str(),
                );
                Some(span)
            } else {
                None
            };

            // Rebuild run options from the captured trace/tenant state so
            // the worker context carries the same identifiers.
            let trace_options = YamlWorkflowRunOptions {
                telemetry: YamlWorkflowTelemetryConfig::default(),
                trace: YamlWorkflowTraceOptions {
                    context: self.trace_input_context.clone(),
                    tenant: self.tenant_context.clone(),
                },
            };
            let worker_trace_context = merged_trace_context_for_worker(
                handler_span_context.as_ref(),
                self.trace_id.as_deref(),
                &trace_options,
            );
            let worker_context = custom_worker_context_with_trace(
                &context,
                &worker_trace_context,
                &self.tenant_context,
            );

            let output_result = worker
                .execute(&input.tool, &payload, email_text, &worker_context)
                .await
                .map_err(ToolExecutionError::Failed);

            if let Some(span) = handler_span.as_mut() {
                if output_result.is_ok() {
                    span.add_event("handler.success");
                } else {
                    span.add_event("handler.error");
                }
            }

            if let Some(span) = handler_span.take() {
                span.end();
            }

            output_result
        }
    }

    let tool_executor = YamlIrToolExecutor {
        llm_executor: executor,
        custom_worker,
        token_totals: std::sync::Mutex::new(YamlTokenTotals::default()),
        node_usage: std::sync::Mutex::new(BTreeMap::new()),
        trace_id: trace_id.clone(),
        trace_context: trace_id.as_ref().map(|_| workflow_trace_context.clone()),
        trace_input_context: options.trace.context.clone(),
        tenant_context: options.trace.tenant.clone(),
        payload_mode: options.telemetry.payload_mode,
    };

    let runtime = WorkflowRuntime::new(
        ir,
        &NoopLlm,
        Some(&tool_executor),
        WorkflowRuntimeOptions::default(),
    );

    // Runtime-level validation failure also means "fall back".
    let started = Instant::now();
    let result = match runtime.execute(workflow_input.clone(), None).await {
        Ok(result) => result,
        Err(WorkflowRuntimeError::Validation(_)) => return Ok(None),
        Err(error) => {
            return Err(YamlWorkflowRunError::IrRuntime {
                message: error.to_string(),
            });
        }
    };
    let total_elapsed_ms = started.elapsed().as_millis();

    // Re-wrap node outputs to the YAML shape, skipping the synthetic start
    // node injected during lowering.
    let mut outputs: BTreeMap<String, Value> = BTreeMap::new();
    for (node_id, output) in result.node_outputs {
        if node_id == YAML_START_NODE_ID {
            continue;
        }
        outputs.insert(node_id, json!({"output": output}));
    }

    // Rebuild trace and per-step metrics. Per-node timing is not available
    // from the IR runtime here, so elapsed_ms is reported as 0.
    let mut trace = Vec::new();
    let mut step_timings = Vec::new();
    let node_usage_map = tool_executor
        .node_usage
        .lock()
        .map(|usage| usage.clone())
        .unwrap_or_default();
    let mut llm_node_metrics: BTreeMap<String, YamlLlmNodeMetrics> = BTreeMap::new();
    for execution in result.node_executions {
        if execution.node_id == YAML_START_NODE_ID {
            continue;
        }
        trace.push(execution.node_id.clone());
        let usage = node_usage_map.get(&execution.node_id);
        if let Some(usage) = usage {
            llm_node_metrics.insert(
                execution.node_id.clone(),
                YamlLlmNodeMetrics {
                    elapsed_ms: 0,
                    prompt_tokens: usage.prompt_tokens,
                    completion_tokens: usage.completion_tokens,
                    total_tokens: usage.total_tokens,
                    thinking_tokens: usage.thinking_tokens,
                    tokens_per_second: completion_tokens_per_second(usage.completion_tokens, 0),
                },
            );
        }
        step_timings.push(YamlStepTiming {
            node_id: execution.node_id,
            node_kind: "ir_runtime".to_string(),
            elapsed_ms: 0,
            prompt_tokens: usage.map(|value| value.prompt_tokens),
            completion_tokens: usage.map(|value| value.completion_tokens),
            total_tokens: usage.map(|value| value.total_tokens),
            thinking_tokens: usage.and_then(|value| value.thinking_tokens),
            tokens_per_second: usage
                .map(|value| completion_tokens_per_second(value.completion_tokens, 0)),
        });
    }

    let terminal_node = result.terminal_node_id;
    let terminal_output = outputs
        .get(&terminal_node)
        .and_then(|v| v.get("output"))
        .cloned();

    let email_text = workflow_input
        .get("email_text")
        .and_then(Value::as_str)
        .unwrap_or_default()
        .to_string();

    let token_totals = tool_executor
        .token_totals
        .lock()
        .map(|totals| totals.clone())
        .unwrap_or_default();

    workflow_span.set_attribute("workflow_id", workflow.id.as_str());
    workflow_span.end();

    Ok(Some(YamlWorkflowRunOutput {
        workflow_id: workflow.id.clone(),
        entry_node: workflow.entry_node.clone(),
        email_text,
        trace,
        outputs,
        terminal_node,
        terminal_output,
        step_timings,
        llm_node_metrics,
        total_elapsed_ms,
        ttft_ms: None,
        total_input_tokens: token_totals.input_tokens,
        total_output_tokens: token_totals.output_tokens,
        total_tokens: token_totals.total_tokens,
        total_thinking_tokens: token_totals.thinking_tokens,
        tokens_per_second: token_totals.tokens_per_second(total_elapsed_ms),
        trace_id: trace_id.clone(),
        metadata: trace_id
            .as_ref()
            .map(|value| workflow_metadata_with_trace(options, value)),
    }))
}
3121
3122fn build_yaml_context_from_ir_scope(scoped_input: &Value) -> Value {
3123 let input = scoped_input.get("input").cloned().unwrap_or(Value::Null);
3124
3125 let mut nodes = serde_json::Map::new();
3126 if let Some(node_outputs) = scoped_input.get("node_outputs").and_then(Value::as_object) {
3127 for (node_id, output) in node_outputs {
3128 nodes.insert(node_id.clone(), json!({"output": output.clone()}));
3129 }
3130 }
3131
3132 json!({
3133 "input": input,
3134 "nodes": Value::Object(nodes),
3135 "globals": Value::Object(serde_json::Map::new())
3136 })
3137}
3138
3139pub async fn run_email_workflow_yaml_with_custom_worker_and_events(
3140 workflow: &YamlWorkflow,
3141 email_text: &str,
3142 executor: &dyn YamlWorkflowLlmExecutor,
3143 custom_worker: Option<&dyn YamlWorkflowCustomWorkerExecutor>,
3144 event_sink: Option<&dyn YamlWorkflowEventSink>,
3145) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
3146 let workflow_input = json!({ "email_text": email_text });
3147 run_workflow_yaml_with_custom_worker_and_events(
3148 workflow,
3149 &workflow_input,
3150 executor,
3151 custom_worker,
3152 event_sink,
3153 )
3154 .await
3155}
3156
3157fn evaluate_switch_condition(
3158 condition: &str,
3159 context: &Value,
3160) -> Result<bool, YamlWorkflowRunError> {
3161 let (left, right) =
3162 condition
3163 .split_once("==")
3164 .ok_or_else(|| YamlWorkflowRunError::UnsupportedCondition {
3165 condition: condition.to_string(),
3166 })?;
3167
3168 let left_path = left.trim().trim_start_matches("$.");
3169 let right_literal = right.trim().trim_matches('"').trim_matches('\'');
3170 let left_value = resolve_path(context, left_path);
3171 Ok(left_value
3172 .and_then(Value::as_str)
3173 .map(|value| value == right_literal)
3174 .unwrap_or(false))
3175}
3176
3177fn parse_messages_from_context(path: &str, context: &Value) -> Result<Vec<Message>, String> {
3178 let normalized_path = path.trim().trim_start_matches("$.");
3179 let value = resolve_path(context, normalized_path)
3180 .ok_or_else(|| format!("messages_path not found: {path}"))?;
3181 let list: Vec<WorkflowMessage> = serde_json::from_value(value.clone()).map_err(|err| {
3182 format!("messages_path must resolve to a list of messages: {path}; {err}")
3183 })?;
3184 if list.is_empty() {
3185 return Err(format!(
3186 "messages_path must not resolve to an empty list: {path}"
3187 ));
3188 }
3189
3190 let mut messages = Vec::with_capacity(list.len());
3191 for (index, item) in list.into_iter().enumerate() {
3192 let mut message = match item.role {
3193 Role::System => Message::system(item.content),
3194 Role::User => Message::user(item.content),
3195 Role::Assistant => Message::assistant(item.content),
3196 Role::Tool => {
3197 let tool_call_id = item
3198 .tool_call_id
3199 .ok_or_else(|| format!("tool message at index {index} missing tool_call_id"))?;
3200 Message::tool(item.content, tool_call_id)
3201 }
3202 };
3203
3204 if let Some(name) = item.name {
3205 message = message.with_name(name);
3206 }
3207
3208 messages.push(message);
3209 }
3210
3211 Ok(messages)
3212}
3213
/// Statically validates a parsed YAML workflow and returns all diagnostics
/// found (it never stops at the first problem).
///
/// Checks performed:
/// - the entry node and all edge endpoints reference existing node ids;
/// - `llm_call` nodes have a non-empty model, compatible stream/heal flags,
///   `max_tool_roundtrips >= 1`, a non-empty `tool_calls_global_key` when set,
///   a resolvable `tool_choice`, well-formed tool declarations with unique,
///   non-empty names and valid input/output schemas;
/// - `switch` nodes route only to existing targets (branches and default);
/// - `update_globals` entries use a known op and carry `from` when required.
pub fn verify_yaml_workflow(workflow: &YamlWorkflow) -> Vec<YamlWorkflowDiagnostic> {
    let mut diagnostics = Vec::new();
    // Index of node id -> node for O(1) existence checks below.
    let known_ids: HashMap<&str, &YamlNode> = workflow
        .nodes
        .iter()
        .map(|node| (node.id.as_str(), node))
        .collect();

    if !known_ids.contains_key(workflow.entry_node.as_str()) {
        diagnostics.push(YamlWorkflowDiagnostic {
            node_id: None,
            code: "missing_entry".to_string(),
            severity: YamlWorkflowDiagnosticSeverity::Error,
            message: format!("entry node '{}' does not exist", workflow.entry_node),
        });
    }

    // Every edge must connect two existing nodes.
    for edge in &workflow.edges {
        if !known_ids.contains_key(edge.from.as_str()) {
            diagnostics.push(YamlWorkflowDiagnostic {
                node_id: Some(edge.from.clone()),
                code: "unknown_edge_from".to_string(),
                severity: YamlWorkflowDiagnosticSeverity::Error,
                message: format!("edge.from '{}' does not exist", edge.from),
            });
        }
        if !known_ids.contains_key(edge.to.as_str()) {
            diagnostics.push(YamlWorkflowDiagnostic {
                node_id: Some(edge.to.clone()),
                code: "unknown_edge_to".to_string(),
                severity: YamlWorkflowDiagnosticSeverity::Error,
                message: format!("edge.to '{}' does not exist", edge.to),
            });
        }
    }

    for node in &workflow.nodes {
        if let Some(llm) = &node.node_type.llm_call {
            if llm.model.trim().is_empty() {
                diagnostics.push(YamlWorkflowDiagnostic {
                    node_id: Some(node.id.clone()),
                    code: "empty_model".to_string(),
                    severity: YamlWorkflowDiagnosticSeverity::Error,
                    message: "llm_call.model must not be empty".to_string(),
                });
            }
            // stream+heal together is only a warning: the runtime recovers by
            // disabling streaming rather than failing the run.
            if llm.stream.unwrap_or(false) && llm.heal.unwrap_or(false) {
                diagnostics.push(YamlWorkflowDiagnostic {
                    node_id: Some(node.id.clone()),
                    code: "stream_heal_conflict".to_string(),
                    severity: YamlWorkflowDiagnosticSeverity::Warning,
                    message:
                        "llm_call.stream=true with heal=true is not streamable; runtime will disable streaming"
                            .to_string(),
                });
            }

            if llm.max_tool_roundtrips.unwrap_or(1) == 0 {
                diagnostics.push(YamlWorkflowDiagnostic {
                    node_id: Some(node.id.clone()),
                    code: "invalid_max_tool_roundtrips".to_string(),
                    severity: YamlWorkflowDiagnosticSeverity::Error,
                    message: "llm_call.max_tool_roundtrips must be >= 1".to_string(),
                });
            }

            if let Some(global_key) = llm.tool_calls_global_key.as_ref() {
                if global_key.trim().is_empty() {
                    diagnostics.push(YamlWorkflowDiagnostic {
                        node_id: Some(node.id.clone()),
                        code: "empty_tool_calls_global_key".to_string(),
                        severity: YamlWorkflowDiagnosticSeverity::Error,
                        message: "llm_call.tool_calls_global_key must not be empty".to_string(),
                    });
                }
            }

            // A `tool_choice` naming a specific function must match one of the
            // node's declared tools, in whichever declaration format is active.
            match normalize_tool_choice(llm.tool_choice.clone()) {
                Ok(choice) => {
                    if let Some(ToolChoice::Tool(choice_tool)) = choice.as_ref() {
                        if !llm.tools.iter().any(|tool| match (llm.tools_format, tool) {
                            (YamlToolFormat::Openai, YamlToolDeclaration::OpenAi(openai)) => {
                                openai.function.name == choice_tool.function.name
                            }
                            (
                                YamlToolFormat::Simplified,
                                YamlToolDeclaration::Simplified(simple),
                            ) => simple.name == choice_tool.function.name,
                            _ => false,
                        }) {
                            diagnostics.push(YamlWorkflowDiagnostic {
                                node_id: Some(node.id.clone()),
                                code: "unknown_tool_choice_function".to_string(),
                                severity: YamlWorkflowDiagnosticSeverity::Error,
                                message: format!(
                                    "llm_call.tool_choice references unknown function '{}'",
                                    choice_tool.function.name
                                ),
                            });
                        }
                    }
                }
                Err(message) => {
                    diagnostics.push(YamlWorkflowDiagnostic {
                        node_id: Some(node.id.clone()),
                        code: "invalid_tool_choice".to_string(),
                        severity: YamlWorkflowDiagnosticSeverity::Error,
                        message,
                    });
                }
            }

            // On a format mismatch we still continue with an empty tool list so
            // the remaining per-node checks run.
            let normalized_tools = match normalize_llm_tools(llm) {
                Ok(tools) => tools,
                Err(message) => {
                    diagnostics.push(YamlWorkflowDiagnostic {
                        node_id: Some(node.id.clone()),
                        code: "invalid_tools_format".to_string(),
                        severity: YamlWorkflowDiagnosticSeverity::Error,
                        message,
                    });
                    Vec::new()
                }
            };

            let mut seen_tool_names = HashSet::new();
            for tool in &normalized_tools {
                let name = tool.definition.function.name.trim();
                if name.is_empty() {
                    diagnostics.push(YamlWorkflowDiagnostic {
                        node_id: Some(node.id.clone()),
                        code: "empty_tool_name".to_string(),
                        severity: YamlWorkflowDiagnosticSeverity::Error,
                        message: "tool function name must not be empty".to_string(),
                    });
                }
                // NOTE(review): duplicate detection uses the untrimmed name
                // while the emptiness check trims — names differing only in
                // whitespace are not flagged as duplicates; confirm intended.
                if !seen_tool_names.insert(tool.definition.function.name.clone()) {
                    diagnostics.push(YamlWorkflowDiagnostic {
                        node_id: Some(node.id.clone()),
                        code: "duplicate_tool_name".to_string(),
                        severity: YamlWorkflowDiagnosticSeverity::Error,
                        message: format!(
                            "duplicate tool function name '{}' in node",
                            tool.definition.function.name
                        ),
                    });
                }

                let schema = tool
                    .definition
                    .function
                    .parameters
                    .clone()
                    .unwrap_or(Value::Null);
                if schema.is_null() {
                    diagnostics.push(YamlWorkflowDiagnostic {
                        node_id: Some(node.id.clone()),
                        code: "missing_tool_input_schema".to_string(),
                        severity: YamlWorkflowDiagnosticSeverity::Error,
                        message: format!(
                            "tool '{}' is missing input schema",
                            tool.definition.function.name
                        ),
                    });
                } else if let Err(message) = validate_json_schema(&schema) {
                    diagnostics.push(YamlWorkflowDiagnostic {
                        node_id: Some(node.id.clone()),
                        code: "invalid_tool_input_schema".to_string(),
                        severity: YamlWorkflowDiagnosticSeverity::Error,
                        message: format!(
                            "tool '{}' has invalid input schema: {}",
                            tool.definition.function.name, message
                        ),
                    });
                }

                if let Some(output_schema) = tool.output_schema.as_ref() {
                    if let Err(message) = validate_json_schema(output_schema) {
                        diagnostics.push(YamlWorkflowDiagnostic {
                            node_id: Some(node.id.clone()),
                            code: "invalid_tool_output_schema".to_string(),
                            severity: YamlWorkflowDiagnosticSeverity::Error,
                            message: format!(
                                "tool '{}' has invalid output schema: {}",
                                tool.definition.function.name, message
                            ),
                        });
                    }
                }
            }
        }

        // Switch nodes: every branch target and the default must exist.
        if let Some(switch) = &node.node_type.switch {
            for branch in &switch.branches {
                if !known_ids.contains_key(branch.target.as_str()) {
                    diagnostics.push(YamlWorkflowDiagnostic {
                        node_id: Some(node.id.clone()),
                        code: "unknown_switch_target".to_string(),
                        severity: YamlWorkflowDiagnosticSeverity::Error,
                        message: format!("switch branch target '{}' does not exist", branch.target),
                    });
                }
            }
            if !known_ids.contains_key(switch.default.as_str()) {
                diagnostics.push(YamlWorkflowDiagnostic {
                    node_id: Some(node.id.clone()),
                    code: "unknown_switch_default".to_string(),
                    severity: YamlWorkflowDiagnosticSeverity::Error,
                    message: format!("switch default target '{}' does not exist", switch.default),
                });
            }
        }

        // update_globals entries: known op, and `from` for every op but increment.
        if let Some(config) = node.config.as_ref() {
            if let Some(update_globals) = config.update_globals.as_ref() {
                for (key, update) in update_globals {
                    let is_valid_op =
                        matches!(update.op.as_str(), "set" | "append" | "increment" | "merge");
                    if !is_valid_op {
                        diagnostics.push(YamlWorkflowDiagnostic {
                            node_id: Some(node.id.clone()),
                            code: "unknown_update_op".to_string(),
                            severity: YamlWorkflowDiagnosticSeverity::Error,
                            message: format!(
                                "update_globals key '{}' has unknown op '{}'; expected set|append|increment|merge",
                                key, update.op
                            ),
                        });
                    }

                    if update.op != "increment" && update.from.is_none() {
                        diagnostics.push(YamlWorkflowDiagnostic {
                            node_id: Some(node.id.clone()),
                            code: "missing_update_from".to_string(),
                            severity: YamlWorkflowDiagnosticSeverity::Error,
                            message: format!(
                                "update_globals key '{}' with op '{}' requires 'from'",
                                key, update.op
                            ),
                        });
                    }
                }
            }
        }
    }

    diagnostics
}
3462
3463fn resolve_path<'a>(value: &'a Value, path: &str) -> Option<&'a Value> {
3464 path.split('.')
3465 .filter(|segment| !segment.is_empty())
3466 .try_fold(value, |current, segment| {
3467 if let Ok(index) = segment.parse::<usize>() {
3468 return current.get(index);
3469 }
3470 current.get(segment)
3471 })
3472}
3473
3474fn interpolate_template(template: &str, context: &Value) -> String {
3475 let mut out = String::with_capacity(template.len());
3476 let mut rest = template;
3477
3478 loop {
3479 let Some(start) = rest.find("{{") else {
3480 out.push_str(rest);
3481 break;
3482 };
3483
3484 out.push_str(&rest[..start]);
3485 let after_start = &rest[start + 2..];
3486 let Some(end) = after_start.find("}}") else {
3487 out.push_str(&rest[start..]);
3488 break;
3489 };
3490
3491 let expr = after_start[..end].trim();
3492 let source_path = expr.trim_start_matches("$.");
3493 let replacement = resolve_path(context, source_path)
3494 .map(value_to_template_string)
3495 .unwrap_or_default();
3496 out.push_str(replacement.as_str());
3497
3498 rest = &after_start[end + 2..];
3499 }
3500
3501 out
3502}
3503
3504fn collect_template_bindings(template: &str, context: &Value) -> Vec<YamlTemplateBinding> {
3505 let mut bindings = Vec::new();
3506 let mut rest = template;
3507
3508 loop {
3509 let Some(start) = rest.find("{{") else {
3510 break;
3511 };
3512
3513 let after_start = &rest[start + 2..];
3514 let Some(end) = after_start.find("}}") else {
3515 break;
3516 };
3517
3518 let expr = after_start[..end].trim();
3519 let source_path = expr.trim_start_matches("$.").to_string();
3520 let resolved = resolve_path(context, source_path.as_str()).cloned();
3521 let missing = resolved.is_none();
3522 let resolved_value = resolved.unwrap_or(Value::Null);
3523 bindings.push(YamlTemplateBinding {
3524 index: bindings.len(),
3525 expression: expr.to_string(),
3526 source_path,
3527 resolved_type: json_type_name(&resolved_value).to_string(),
3528 missing,
3529 resolved: resolved_value,
3530 });
3531
3532 rest = &after_start[end + 2..];
3533 }
3534
3535 bindings
3536}
3537
3538fn json_type_name(value: &Value) -> &'static str {
3539 match value {
3540 Value::Null => "null",
3541 Value::Bool(_) => "bool",
3542 Value::Number(_) => "number",
3543 Value::String(_) => "string",
3544 Value::Array(_) => "array",
3545 Value::Object(_) => "object",
3546 }
3547}
3548
3549fn value_to_template_string(value: &Value) -> String {
3550 match value {
3551 Value::Null => String::new(),
3552 Value::Bool(v) => v.to_string(),
3553 Value::Number(v) => v.to_string(),
3554 Value::String(v) => v.clone(),
3555 Value::Array(_) | Value::Object(_) => serde_json::to_string(value).unwrap_or_default(),
3556 }
3557}
3558
3559fn apply_set_globals(
3560 node: &YamlNode,
3561 outputs: &BTreeMap<String, Value>,
3562 workflow_input: &Value,
3563 globals: &mut serde_json::Map<String, Value>,
3564) {
3565 let Some(config) = node.config.as_ref() else {
3566 return;
3567 };
3568 let Some(set_globals) = config.set_globals.as_ref() else {
3569 return;
3570 };
3571
3572 let context = json!({
3573 "input": workflow_input,
3574 "nodes": outputs,
3575 "globals": Value::Object(globals.clone())
3576 });
3577
3578 for (key, expr) in set_globals {
3579 let value = resolve_path(&context, expr.as_str())
3580 .cloned()
3581 .unwrap_or(Value::Null);
3582 globals.insert(key.clone(), value);
3583 }
3584}
3585
/// Applies a node's `update_globals` config to the mutable `globals` map.
///
/// Supported ops:
/// - `set`: resolve `from` and overwrite the key;
/// - `append`: resolve `from` and push onto the key's array (a non-array
///   existing value is converted to a two-element array);
/// - `increment`: add `by` (default 1.0) to the key's numeric value
///   (non-numbers are treated as 0.0);
/// - `merge`: shallow-merge a resolved object into the key's object value.
///
/// Unknown ops are silently ignored here — `verify_yaml_workflow` reports
/// them as diagnostics. Note: the resolution context snapshots `globals`
/// before any update in this batch, so updates do not see each other.
fn apply_update_globals(
    node: &YamlNode,
    outputs: &BTreeMap<String, Value>,
    workflow_input: &Value,
    globals: &mut serde_json::Map<String, Value>,
) {
    let Some(config) = node.config.as_ref() else {
        return;
    };
    let Some(update_globals) = config.update_globals.as_ref() else {
        return;
    };

    // Pre-update snapshot; `from` paths resolve against this, not the map
    // being mutated below.
    let context = json!({
        "input": workflow_input,
        "nodes": outputs,
        "globals": Value::Object(globals.clone())
    });

    for (key, update) in update_globals {
        match update.op.as_str() {
            "set" => {
                if let Some(path) = update.from.as_ref() {
                    let value = resolve_path(&context, path.as_str())
                        .cloned()
                        .unwrap_or(Value::Null);
                    globals.insert(key.clone(), value);
                }
            }
            "append" => {
                if let Some(path) = update.from.as_ref() {
                    let value = resolve_path(&context, path.as_str())
                        .cloned()
                        .unwrap_or(Value::Null);
                    let entry = globals
                        .entry(key.clone())
                        .or_insert_with(|| Value::Array(Vec::new()));
                    match entry {
                        Value::Array(items) => items.push(value),
                        other => {
                            // Promote a scalar/object into [existing, new].
                            let existing = other.clone();
                            *other = Value::Array(vec![existing, value]);
                        }
                    }
                }
            }
            "increment" => {
                let by = update.by.unwrap_or(1.0);
                let current = globals
                    .get(key.as_str())
                    .and_then(Value::as_f64)
                    .unwrap_or(0.0);
                // NaN/Inf results are silently dropped (from_f64 returns None).
                if let Some(next) = serde_json::Number::from_f64(current + by) {
                    globals.insert(key.clone(), Value::Number(next));
                }
            }
            "merge" => {
                if let Some(path) = update.from.as_ref() {
                    let source = resolve_path(&context, path.as_str())
                        .cloned()
                        .unwrap_or(Value::Null);
                    // Only object sources merge; anything else is a no-op.
                    if let Value::Object(source_map) = source {
                        let target = globals
                            .entry(key.clone())
                            .or_insert_with(|| Value::Object(serde_json::Map::new()));
                        if let Value::Object(target_map) = target {
                            target_map.extend(source_map);
                        } else {
                            *target = Value::Object(source_map);
                        }
                    }
                }
            }
            _ => {}
        }
    }
}
3663
3664fn llm_output_schema_for_node(node: &YamlNode) -> Value {
3665 if let Some(schema) = node
3666 .config
3667 .as_ref()
3668 .and_then(|cfg| cfg.output_schema.clone())
3669 {
3670 return schema;
3671 }
3672
3673 default_llm_output_schema()
3674}
3675
3676fn normalize_tool_choice(
3677 config: Option<YamlToolChoiceConfig>,
3678) -> Result<Option<ToolChoice>, String> {
3679 let Some(config) = config else {
3680 return Ok(None);
3681 };
3682
3683 let choice = match config {
3684 YamlToolChoiceConfig::Mode(mode) => ToolChoice::Mode(mode),
3685 YamlToolChoiceConfig::Function { function } => ToolChoice::Tool(ToolChoiceTool {
3686 tool_type: ToolType::Function,
3687 function: ToolChoiceFunction { name: function },
3688 }),
3689 YamlToolChoiceConfig::OpenAi(tool) => ToolChoice::Tool(tool),
3690 };
3691
3692 Ok(Some(choice))
3693}
3694
3695fn normalize_llm_tools(llm: &YamlLlmCall) -> Result<Vec<YamlResolvedTool>, String> {
3696 llm.tools
3697 .iter()
3698 .map(|tool| match (llm.tools_format, tool) {
3699 (YamlToolFormat::Openai, YamlToolDeclaration::OpenAi(openai)) => {
3700 let definition = ToolDefinition {
3701 tool_type: openai.tool_type.unwrap_or(ToolType::Function),
3702 function: ToolFunction {
3703 name: openai.function.name.clone(),
3704 description: openai.function.description.clone(),
3705 parameters: openai.function.parameters.clone(),
3706 },
3707 };
3708 Ok(YamlResolvedTool {
3709 definition,
3710 output_schema: openai.function.output_schema.clone(),
3711 })
3712 }
3713 (YamlToolFormat::Simplified, YamlToolDeclaration::Simplified(simple)) => {
3714 let definition = ToolDefinition {
3715 tool_type: ToolType::Function,
3716 function: ToolFunction {
3717 name: simple.name.clone(),
3718 description: simple.description.clone(),
3719 parameters: Some(simple.input_schema.clone()),
3720 },
3721 };
3722 Ok(YamlResolvedTool {
3723 definition,
3724 output_schema: simple.output_schema.clone(),
3725 })
3726 }
3727 (YamlToolFormat::Openai, _) => {
3728 Err("tools_format=openai requires OpenAI-style tool declarations".to_string())
3729 }
3730 (YamlToolFormat::Simplified, _) => {
3731 Err("tools_format=simplified requires simplified tool declarations".to_string())
3732 }
3733 })
3734 .collect()
3735}
3736
3737fn default_llm_output_schema() -> Value {
3738 json!({
3739 "type": "object",
3740 "additionalProperties": true
3741 })
3742}
3743
3744fn mock_rag(topic: &str) -> Value {
3745 let (kb_source, playbook) = match topic {
3746 "probation" => (
3747 "hr_policy/probation.md",
3748 "Collect manager review, performance evidence, and probation timeline.",
3749 ),
3750 "leave_request" => (
3751 "hr_policy/leave.md",
3752 "Validate leave balance, manager approval, and blackout dates.",
3753 ),
3754 "supply_chain_order_assessment" => (
3755 "supply_chain/order_assessment.md",
3756 "Review order specs, inventory risk, and vendor lead-time guidance.",
3757 ),
3758 "supply_chain_order_replacement" => (
3759 "supply_chain/order_replacement.md",
3760 "Collect order id, damage proof, and replacement SLA policy.",
3761 ),
3762 "termination_first_time_offense" => (
3763 "hr_policy/termination_first_offense.md",
3764 "Validate first-incident criteria and route to HRBP review.",
3765 ),
3766 "termination_repeated_offense" => (
3767 "hr_policy/termination_repeated_offense.md",
3768 "Collect prior warnings and escalation approvals before final action.",
3769 ),
3770 _ => (
3771 "shared/request_clarification.md",
3772 "Request clarifying details before routing.",
3773 ),
3774 };
3775
3776 json!({
3777 "kb_source": kb_source,
3778 "playbook": playbook,
3779 })
3780}
3781
3782fn mock_custom_worker_output(
3783 handler: &str,
3784 payload: &Value,
3785) -> Result<Value, YamlWorkflowRunError> {
3786 if let Some(topic) = payload.get("topic").and_then(Value::as_str) {
3787 let mut value = mock_rag(topic);
3788 if let Value::Object(object) = &mut value {
3789 object.insert("handler".to_string(), Value::String(handler.to_string()));
3790 }
3791 return Ok(value);
3792 }
3793
3794 Err(YamlWorkflowRunError::UnsupportedCustomHandler {
3795 handler: handler.to_string(),
3796 })
3797}
3798
/// Top-level YAML workflow definition: a set of nodes plus explicit edges,
/// entered at `entry_node`.
#[derive(Debug, Clone, Deserialize)]
pub struct YamlWorkflow {
    // Workflow identifier reported in run output and diagnostics.
    pub id: String,
    // Id of the node where execution starts; must name an entry in `nodes`.
    pub entry_node: String,
    #[serde(default)]
    pub nodes: Vec<YamlNode>,
    #[serde(default)]
    pub edges: Vec<YamlEdge>,
}
3808
/// A single workflow node: an id, a node-type payload, and optional per-node
/// configuration (prompt, schemas, globals updates, ...).
#[derive(Debug, Clone, Deserialize)]
pub struct YamlNode {
    pub id: String,
    pub node_type: YamlNodeType,
    pub config: Option<YamlNodeConfig>,
}
3815
3816impl YamlNode {
3817 fn kind_name(&self) -> &'static str {
3818 if self.node_type.llm_call.is_some() {
3819 "llm_call"
3820 } else if self.node_type.switch.is_some() {
3821 "switch"
3822 } else if self.node_type.custom_worker.is_some() {
3823 "custom_worker"
3824 } else {
3825 "unknown"
3826 }
3827 }
3828}
3829
/// Node payload, tagged by field presence. At most one field is normally set
/// (TODO confirm); `YamlNode::kind_name` resolves them in declaration order.
#[derive(Debug, Clone, Deserialize)]
pub struct YamlNodeType {
    pub llm_call: Option<YamlLlmCall>,
    pub switch: Option<YamlSwitch>,
    pub custom_worker: Option<YamlCustomWorker>,
}
3836
/// Configuration for an `llm_call` node.
#[derive(Debug, Clone, Deserialize)]
pub struct YamlLlmCall {
    // Model name; validated non-empty by `verify_yaml_workflow`.
    pub model: String,
    // stream=true together with heal=true is flagged as a conflict at verify
    // time (runtime disables streaming in that case).
    pub stream: Option<bool>,
    pub stream_json_as_text: Option<bool>,
    pub heal: Option<bool>,
    // `$.`-style path resolving to a message list; see
    // `parse_messages_from_context` for the accepted shape.
    pub messages_path: Option<String>,
    pub append_prompt_as_user: Option<bool>,
    #[serde(default)]
    pub tools_format: YamlToolFormat,
    #[serde(default)]
    pub tools: Vec<YamlToolDeclaration>,
    pub tool_choice: Option<YamlToolChoiceConfig>,
    // Must be >= 1 when present (verify-time check).
    pub max_tool_roundtrips: Option<u8>,
    // Global key under which tool calls are recorded; must be non-empty.
    pub tool_calls_global_key: Option<String>,
}
3853
/// Declaration style used by `YamlLlmCall::tools`; defaults to `openai`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Deserialize, Serialize, Default)]
#[serde(rename_all = "snake_case")]
pub enum YamlToolFormat {
    #[default]
    Openai,
    Simplified,
}
3861
/// A tool declaration in either supported shape. Untagged: serde tries the
/// OpenAI shape first, then the simplified one; `normalize_llm_tools`
/// enforces agreement with the node's `tools_format`.
#[derive(Debug, Clone, Deserialize)]
#[serde(untagged)]
pub enum YamlToolDeclaration {
    OpenAi(YamlOpenAiToolDeclaration),
    Simplified(YamlSimplifiedToolDeclaration),
}
3868
/// OpenAI-style tool declaration: optional `type` (defaults to function at
/// normalization time) plus a function description.
#[derive(Debug, Clone, Deserialize)]
pub struct YamlOpenAiToolDeclaration {
    #[serde(rename = "type")]
    pub tool_type: Option<ToolType>,
    pub function: YamlOpenAiToolFunction,
}
3875
/// Function portion of an OpenAI-style tool declaration. `parameters` is the
/// input JSON schema (verify-time checks require it); `output_schema` is a
/// workflow-specific extension validated when present.
#[derive(Debug, Clone, Deserialize)]
pub struct YamlOpenAiToolFunction {
    pub name: String,
    pub description: Option<String>,
    pub parameters: Option<Value>,
    pub output_schema: Option<Value>,
}
3883
/// Simplified tool declaration: flat name/description plus a required input
/// schema and optional output schema.
#[derive(Debug, Clone, Deserialize)]
pub struct YamlSimplifiedToolDeclaration {
    pub name: String,
    pub description: Option<String>,
    pub input_schema: Value,
    pub output_schema: Option<Value>,
}
3891
/// Tool-choice config in any of three accepted shapes (untagged): a bare
/// mode string, a `{ function: <name> }` shorthand, or a full OpenAI
/// tool-choice object. Normalized by `normalize_tool_choice`.
#[derive(Debug, Clone, Deserialize)]
#[serde(untagged)]
pub enum YamlToolChoiceConfig {
    Mode(ToolChoiceMode),
    Function { function: String },
    OpenAi(ToolChoiceTool),
}
3899
/// Switch node: conditional branches plus a mandatory default target taken
/// when no branch condition matches.
#[derive(Debug, Clone, Deserialize)]
pub struct YamlSwitch {
    #[serde(default)]
    pub branches: Vec<YamlSwitchBranch>,
    pub default: String,
}
3906
/// One switch branch: a `<path> == <literal>` condition (see
/// `evaluate_switch_condition`) and the node id to jump to when it holds.
#[derive(Debug, Clone, Deserialize)]
pub struct YamlSwitchBranch {
    pub condition: String,
    pub target: String,
}
3912
/// Custom-worker node: names the handler to dispatch to (executor-defined,
/// with `mock_custom_worker_output` as the built-in fallback).
#[derive(Debug, Clone, Deserialize)]
pub struct YamlCustomWorker {
    pub handler: String,
}
3917
/// Optional per-node configuration shared across node kinds.
#[derive(Debug, Clone, Deserialize)]
pub struct YamlNodeConfig {
    // Prompt template; `{{ ... }}` placeholders are interpolated from context.
    pub prompt: Option<String>,
    // Expected LLM output schema; `schema` accepted as a YAML alias.
    #[serde(default, alias = "schema")]
    pub output_schema: Option<Value>,
    // Payload forwarded to custom workers.
    pub payload: Option<Value>,
    // key -> path expression; see `apply_set_globals`.
    pub set_globals: Option<HashMap<String, String>>,
    // key -> update op; see `apply_update_globals` / verify-time checks.
    pub update_globals: Option<HashMap<String, YamlGlobalUpdate>>,
}
3927
/// One `update_globals` entry. `op` is one of set|append|increment|merge;
/// `from` is required for every op except `increment`; `by` is the increment
/// step (defaults to 1.0).
#[derive(Debug, Clone, Deserialize)]
pub struct YamlGlobalUpdate {
    pub op: String,
    pub from: Option<String>,
    pub by: Option<f64>,
}
3934
/// Static edge between two node ids; both endpoints are verified to exist.
#[derive(Debug, Clone, Deserialize)]
pub struct YamlEdge {
    pub from: String,
    pub to: String,
}
3940
3941#[cfg(test)]
3942mod tests {
3943 use super::*;
3944 use simple_agent_type::provider::{Provider, ProviderRequest, ProviderResponse};
3945 use simple_agent_type::response::{CompletionChoice, CompletionResponse, Usage};
3946 use simple_agent_type::tool::{ToolCallFunction, ToolType};
3947 use simple_agent_type::{Result as SaResult, SimpleAgentsError};
3948 use simple_agents_core::SimpleAgentsClientBuilder;
3949 use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
3950 use std::sync::{Arc, Mutex, OnceLock};
3951
3952 fn stream_debug_env_lock() -> &'static Mutex<()> {
3953 static LOCK: OnceLock<Mutex<()>> = OnceLock::new();
3954 LOCK.get_or_init(|| Mutex::new(()))
3955 }
3956
    // Canned LLM executor keyed off prompt markers; see its
    // YamlWorkflowLlmExecutor impl below.
    struct MockExecutor;
3958
    // Event sink that records every emitted workflow event for later assertions.
    struct RecordingSink {
        events: Mutex<Vec<YamlWorkflowEvent>>,
    }
3962
    // Event sink that requests cancellation as soon as any event is emitted.
    struct CancelAfterFirstEventSink {
        cancelled: AtomicBool,
    }
3966
    impl YamlWorkflowEventSink for CancelAfterFirstEventSink {
        // Flips the cancellation flag on the very first event observed.
        fn emit(&self, _event: &YamlWorkflowEvent) {
            self.cancelled.store(true, Ordering::SeqCst);
        }

        // Reports whether `emit` has run at least once.
        fn is_cancelled(&self) -> bool {
            self.cancelled.load(Ordering::SeqCst)
        }
    }
3976
    // LLM executor stub that counts how many completions were requested.
    struct CountingExecutor {
        call_count: AtomicUsize,
    }
3980
    // Custom-worker stub that captures the context value it was invoked with.
    struct CapturingWorker {
        context: Mutex<Option<Value>>,
    }
3984
    // Mock provider simulating a tool-call round-trip; see its Provider impl.
    struct ToolLoopProvider;
3986
    // Simulates a two-round tool conversation: with tools and no tool result
    // yet, it answers with a tool call; with a tool result present, it
    // finishes with structured JSON; without tools, it echoes the last user
    // message inside a JSON payload.
    #[async_trait]
    impl Provider for ToolLoopProvider {
        fn name(&self) -> &str {
            "openai"
        }

        fn transform_request(&self, req: &CompletionRequest) -> SaResult<ProviderRequest> {
            let body = serde_json::to_value(req).map_err(SimpleAgentsError::from)?;
            Ok(ProviderRequest::new("mock://tool-loop").with_body(body))
        }

        async fn execute(&self, req: ProviderRequest) -> SaResult<ProviderResponse> {
            let request: CompletionRequest =
                serde_json::from_value(req.body).map_err(SimpleAgentsError::from)?;

            let has_tools = request
                .tools
                .as_ref()
                .is_some_and(|tools| !tools.is_empty());
            let has_tool_result = request.messages.iter().any(|m| m.role == Role::Tool);

            // Round 1: tools offered, no tool result yet -> request a tool call.
            let response = if has_tools && !has_tool_result {
                CompletionResponse {
                    id: "resp_tool_1".to_string(),
                    model: request.model.clone(),
                    choices: vec![CompletionChoice {
                        index: 0,
                        message: Message::assistant("").with_tool_calls(vec![ToolCall {
                            id: "call_get_context".to_string(),
                            tool_type: ToolType::Function,
                            function: ToolCallFunction {
                                name: "get_customer_context".to_string(),
                                arguments: "{\"order_id\":\"123\"}".to_string(),
                            },
                        }]),
                        finish_reason: FinishReason::ToolCalls,
                        logprobs: None,
                    }],
                    usage: Usage::new(10, 5),
                    created: None,
                    provider: Some(self.name().to_string()),
                    healing_metadata: None,
                }
            } else if has_tools && has_tool_result {
                // Round 2: tool result is present -> finish with structured JSON.
                CompletionResponse {
                    id: "resp_tool_2".to_string(),
                    model: request.model.clone(),
                    choices: vec![CompletionChoice {
                        index: 0,
                        message: Message::assistant("{\"state\":\"done\"}"),
                        finish_reason: FinishReason::Stop,
                        logprobs: None,
                    }],
                    usage: Usage::new(12, 6),
                    created: None,
                    provider: Some(self.name().to_string()),
                    healing_metadata: None,
                }
            } else {
                // No tools: echo the most recent user message as the body.
                let prompt = request
                    .messages
                    .iter()
                    .rev()
                    .find(|m| m.role == Role::User)
                    .map(|m| m.content.clone())
                    .unwrap_or_default();
                let payload = json!({
                    "subject": "ok",
                    "body": prompt,
                })
                .to_string();
                CompletionResponse {
                    id: "resp_final".to_string(),
                    model: request.model.clone(),
                    choices: vec![CompletionChoice {
                        index: 0,
                        message: Message::assistant(payload),
                        finish_reason: FinishReason::Stop,
                        logprobs: None,
                    }],
                    usage: Usage::new(8, 4),
                    created: None,
                    provider: Some(self.name().to_string()),
                    healing_metadata: None,
                }
            };

            let body = serde_json::to_value(response).map_err(SimpleAgentsError::from)?;
            Ok(ProviderResponse::new(200, body))
        }

        fn transform_response(&self, resp: ProviderResponse) -> SaResult<CompletionResponse> {
            serde_json::from_value(resp.body).map_err(SimpleAgentsError::from)
        }
    }
4082
    // Custom worker that always returns a fixed payload (tool-result stub).
    struct FixedToolWorker {
        payload: Value,
    }
4086
    #[async_trait]
    impl YamlWorkflowCustomWorkerExecutor for FixedToolWorker {
        // Ignores all inputs and returns the configured payload verbatim.
        async fn execute(
            &self,
            _handler: &str,
            _payload: &Value,
            _email_text: &str,
            _context: &Value,
        ) -> Result<Value, String> {
            Ok(self.payload.clone())
        }
    }
4099
    #[async_trait]
    impl YamlWorkflowLlmExecutor for CountingExecutor {
        // Increments the call counter and returns a fixed successful payload
        // with no usage, ttft, or tool calls.
        async fn complete_structured(
            &self,
            _request: YamlLlmExecutionRequest,
            _event_sink: Option<&dyn YamlWorkflowEventSink>,
        ) -> Result<YamlLlmExecutionResult, String> {
            self.call_count.fetch_add(1, Ordering::SeqCst);
            Ok(YamlLlmExecutionResult {
                payload: json!({"state":"ok"}),
                usage: None,
                ttft_ms: None,
                tool_calls: Vec::new(),
            })
        }
    }
4116
    impl YamlWorkflowEventSink for RecordingSink {
        // Appends a clone of each event to the in-memory log.
        fn emit(&self, event: &YamlWorkflowEvent) {
            self.events
                .lock()
                .expect("recording sink lock should not be poisoned")
                .push(event.clone());
        }
    }
4125
    #[async_trait]
    impl YamlWorkflowCustomWorkerExecutor for CapturingWorker {
        // Stores a clone of the invocation context for later inspection and
        // returns a trivial success payload.
        async fn execute(
            &self,
            _handler: &str,
            _payload: &Value,
            _email_text: &str,
            context: &Value,
        ) -> Result<Value, String> {
            let mut guard = self
                .context
                .lock()
                .map_err(|_| "capturing worker lock should not be poisoned".to_string())?;
            *guard = Some(context.clone());
            Ok(json!({"ok": true}))
        }
    }
4143
    // Dispatches on marker phrases in the prompt so each canned test workflow
    // node gets a deterministic classification payload and token usage.
    #[async_trait]
    impl YamlWorkflowLlmExecutor for MockExecutor {
        async fn complete_structured(
            &self,
            request: YamlLlmExecutionRequest,
            _event_sink: Option<&dyn YamlWorkflowEventSink>,
        ) -> Result<YamlLlmExecutionResult, String> {
            let prompt = request.prompt;
            // Top-level classification node.
            if prompt.contains("exactly one category") {
                return Ok(YamlLlmExecutionResult {
                    payload: json!({"category":"termination","reason":"mock"}),
                    usage: Some(YamlLlmTokenUsage {
                        prompt_tokens: 10,
                        completion_tokens: 5,
                        total_tokens: 15,
                        thinking_tokens: None,
                    }),
                    ttft_ms: None,
                    tool_calls: Vec::new(),
                });
            }
            // Termination subtype node.
            if prompt.contains("Determine termination subtype") {
                return Ok(YamlLlmExecutionResult {
                    payload: json!({"subtype":"repeated_offense","reason":"mock"}),
                    usage: Some(YamlLlmTokenUsage {
                        prompt_tokens: 12,
                        completion_tokens: 6,
                        total_tokens: 18,
                        thinking_tokens: None,
                    }),
                    ttft_ms: None,
                    tool_calls: Vec::new(),
                });
            }
            // Supply-chain subtype node.
            if prompt.contains("Determine supply chain subtype") {
                return Ok(YamlLlmExecutionResult {
                    payload: json!({"subtype":"order_replacement","reason":"mock"}),
                    usage: Some(YamlLlmTokenUsage {
                        prompt_tokens: 11,
                        completion_tokens: 4,
                        total_tokens: 15,
                        thinking_tokens: None,
                    }),
                    ttft_ms: None,
                    tool_calls: Vec::new(),
                });
            }
            // Any other prompt means the workflow wiring is wrong.
            Err("unexpected prompt".to_string())
        }
    }
4194
    // End-to-end run over a classify -> route -> subtype -> route -> RAG
    // workflow using MockExecutor; asserts routing, per-step timings, and
    // token-usage totals (10+12 prompt, 5+6 completion across two LLM nodes).
    #[tokio::test]
    async fn runs_yaml_workflow_and_returns_step_timings() {
        let yaml = r#"
id: email-intake-classification
entry_node: classify_top_level
nodes:
  - id: classify_top_level
    node_type:
      llm_call:
        model: gpt-4.1
    config:
      prompt: |
        Classify this email into exactly one category:
        {{ input.email_text }}
  - id: route_top_level
    node_type:
      switch:
        branches:
          - condition: '$.nodes.classify_top_level.output.category == "termination"'
            target: classify_termination_subtype
        default: rag_clarification
  - id: classify_termination_subtype
    node_type:
      llm_call:
        model: gpt-4.1
    config:
      prompt: |
        Determine termination subtype:
        {{ input.email_text }}
  - id: route_termination_subtype
    node_type:
      switch:
        branches:
          - condition: '$.nodes.classify_termination_subtype.output.subtype == "repeated_offense"'
            target: rag_termination_repeated_offense
        default: rag_clarification
  - id: rag_termination_repeated_offense
    node_type:
      custom_worker:
        handler: GetRagData
    config:
      payload:
        topic: termination_repeated_offense
  - id: rag_clarification
    node_type:
      custom_worker:
        handler: GetRagData
    config:
      payload:
        topic: clarification
edges:
  - from: classify_top_level
    to: route_top_level
  - from: classify_termination_subtype
    to: route_termination_subtype
"#;

        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
        let output = run_email_workflow_yaml(&workflow, "test", &MockExecutor)
            .await
            .expect("yaml workflow should execute");

        assert_eq!(output.workflow_id, "email-intake-classification");
        // MockExecutor steers: termination -> repeated_offense -> RAG node.
        assert_eq!(output.terminal_node, "rag_termination_repeated_offense");
        assert!(!output.step_timings.is_empty());
        assert_eq!(output.step_timings.len(), output.trace.len());
        assert!(output
            .outputs
            .contains_key("rag_termination_repeated_offense"));
        assert_eq!(output.total_input_tokens, 22);
        assert_eq!(output.total_output_tokens, 11);
        assert_eq!(output.total_tokens, 33);
    }
4268
    // The runtime must emit a `node_llm_input_resolved` telemetry event whose
    // metadata includes the fully resolved prompt and one binding entry per
    // template placeholder, with source path, resolved value, and type.
    #[tokio::test]
    async fn emits_resolved_llm_input_event_with_bindings() {
        let yaml = r#"
id: email-intake-classification
entry_node: classify_top_level
nodes:
  - id: classify_top_level
    node_type:
      llm_call:
        model: gpt-4.1
        config:
          prompt: |
            Classify this email into exactly one category:
            {{ input.email_text }}
"#;

        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
        let sink = RecordingSink {
            events: Mutex::new(Vec::new()),
        };

        let output = run_email_workflow_yaml_with_custom_worker_and_events(
            &workflow,
            "Need help with termination",
            &MockExecutor,
            None,
            Some(&sink),
        )
        .await
        .expect("yaml workflow should execute");

        assert_eq!(output.terminal_node, "classify_top_level");

        let events = sink
            .events
            .lock()
            .expect("recording sink lock should not be poisoned");
        let llm_event = events
            .iter()
            .find(|event| event.event_type == "node_llm_input_resolved")
            .expect("expected llm input telemetry event");

        let metadata = llm_event
            .metadata
            .as_ref()
            .expect("llm input event must include metadata");
        assert_eq!(metadata["model"], Value::String("gpt-4.1".to_string()));
        assert_eq!(metadata["stream_requested"], Value::Bool(false));
        assert_eq!(metadata["heal_requested"], Value::Bool(false));
        // The email text must be substituted into the prompt template.
        assert!(metadata["prompt"]
            .as_str()
            .expect("prompt should be a string")
            .contains("Need help with termination"));

        // Exactly one `{{ … }}` placeholder, so exactly one binding record.
        let bindings = metadata["bindings"]
            .as_array()
            .expect("bindings should be an array");
        assert_eq!(bindings.len(), 1);
        assert_eq!(
            bindings[0]["source_path"],
            Value::String("input.email_text".to_string())
        );
        assert_eq!(
            bindings[0]["resolved"],
            Value::String("Need help with termination".to_string())
        );
        assert_eq!(bindings[0]["missing"], Value::Bool(false));
        assert_eq!(
            bindings[0]["resolved_type"],
            Value::String("string".to_string())
        );
    }
4341
    // With default run options, the `workflow_completed` event must carry a
    // `nerdstats` metadata block mirroring the run output's id, terminal node,
    // and token totals, sourced from provider usage.
    #[tokio::test]
    async fn workflow_completed_event_includes_nerdstats_by_default() {
        let yaml = r#"
id: nerdstats-default
entry_node: classify
nodes:
  - id: classify
    node_type:
      llm_call:
        model: gpt-4.1
        config:
          prompt: |
            Classify this email into exactly one category:
            {{ input.email_text }}
"#;

        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
        let sink = RecordingSink {
            events: Mutex::new(Vec::new()),
        };

        let output = run_workflow_yaml_with_custom_worker_and_events_and_options(
            &workflow,
            &json!({"email_text":"hello"}),
            &MockExecutor,
            None,
            Some(&sink),
            &YamlWorkflowRunOptions::default(),
        )
        .await
        .expect("workflow should execute");

        let events = sink
            .events
            .lock()
            .expect("recording sink lock should not be poisoned");
        let completed = events
            .iter()
            .find(|event| event.event_type == "workflow_completed")
            .expect("expected workflow_completed event");
        let metadata = completed
            .metadata
            .as_ref()
            .expect("workflow_completed should include metadata by default");
        let nerdstats = metadata
            .get("nerdstats")
            .expect("nerdstats should be present by default");

        // The event metadata must agree with the returned run output.
        assert_eq!(nerdstats["workflow_id"], Value::String(output.workflow_id));
        assert_eq!(
            nerdstats["terminal_node"],
            Value::String(output.terminal_node)
        );
        assert_eq!(
            nerdstats["total_tokens"],
            Value::Number(output.total_tokens.into())
        );
        assert_eq!(nerdstats["token_metrics_available"], Value::Bool(true));
        assert_eq!(
            nerdstats["token_metrics_source"],
            Value::String("provider_usage".to_string())
        );
        // MockExecutor reports no TTFT for non-streaming calls.
        assert_eq!(nerdstats["ttft_ms"], Value::Null);
    }
4406
    // When telemetry explicitly disables nerdstats, the `workflow_completed`
    // event must carry no metadata at all.
    #[tokio::test]
    async fn workflow_completed_event_omits_nerdstats_when_disabled() {
        let yaml = r#"
id: nerdstats-disabled
entry_node: classify
nodes:
  - id: classify
    node_type:
      llm_call:
        model: gpt-4.1
        config:
          prompt: |
            Classify this email into exactly one category:
            {{ input.email_text }}
"#;

        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
        let sink = RecordingSink {
            events: Mutex::new(Vec::new()),
        };
        // Same defaults as the positive test, except nerdstats is switched off.
        let options = YamlWorkflowRunOptions {
            telemetry: YamlWorkflowTelemetryConfig {
                nerdstats: false,
                ..YamlWorkflowTelemetryConfig::default()
            },
            ..YamlWorkflowRunOptions::default()
        };

        run_workflow_yaml_with_custom_worker_and_events_and_options(
            &workflow,
            &json!({"email_text":"hello"}),
            &MockExecutor,
            None,
            Some(&sink),
            &options,
        )
        .await
        .expect("workflow should execute");

        let events = sink
            .events
            .lock()
            .expect("recording sink lock should not be poisoned");
        let completed = events
            .iter()
            .find(|event| event.event_type == "workflow_completed")
            .expect("expected workflow_completed event");
        assert!(completed.metadata.is_none());
    }
4456
    /// Mock LLM executor whose reported TTFT depends on whether the request
    /// asked for streaming (see its `YamlWorkflowLlmExecutor` impl below).
    struct StreamAwareMockExecutor;
4458
4459 #[async_trait]
4460 impl YamlWorkflowLlmExecutor for StreamAwareMockExecutor {
4461 async fn complete_structured(
4462 &self,
4463 request: YamlLlmExecutionRequest,
4464 _event_sink: Option<&dyn YamlWorkflowEventSink>,
4465 ) -> Result<YamlLlmExecutionResult, String> {
4466 Ok(YamlLlmExecutionResult {
4467 payload: json!({"state":"ok"}),
4468 usage: Some(YamlLlmTokenUsage {
4469 prompt_tokens: 20,
4470 completion_tokens: 10,
4471 total_tokens: 30,
4472 thinking_tokens: None,
4473 }),
4474 ttft_ms: if request.stream { Some(12) } else { None },
4475 tool_calls: Vec::new(),
4476 })
4477 }
4478 }
4479
    // Streaming nodes still get full nerdstats when the executor supplies
    // usage and TTFT for the streamed call.
    #[tokio::test]
    async fn workflow_completed_event_includes_nerdstats_for_streaming_nodes() {
        let yaml = r#"
id: nerdstats-streaming
entry_node: classify
nodes:
  - id: classify
    node_type:
      llm_call:
        model: gpt-4.1
        stream: true
        config:
          prompt: |
            Return JSON only:
            {"state":"ok"}
"#;

        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
        let sink = RecordingSink {
            events: Mutex::new(Vec::new()),
        };

        run_workflow_yaml_with_custom_worker_and_events_and_options(
            &workflow,
            &json!({"email_text":"hello"}),
            &StreamAwareMockExecutor,
            None,
            Some(&sink),
            &YamlWorkflowRunOptions::default(),
        )
        .await
        .expect("workflow should execute");

        let events = sink
            .events
            .lock()
            .expect("recording sink lock should not be poisoned");
        let completed = events
            .iter()
            .find(|event| event.event_type == "workflow_completed")
            .expect("expected workflow_completed event");
        let metadata = completed
            .metadata
            .as_ref()
            .expect("workflow_completed should include metadata by default");
        let nerdstats = metadata
            .get("nerdstats")
            .expect("nerdstats should be present by default");

        // StreamAwareMockExecutor reports 30 total tokens and, because
        // `stream: true` was requested, a TTFT of 12ms.
        assert_eq!(nerdstats["token_metrics_available"], Value::Bool(true));
        assert_eq!(nerdstats["total_tokens"], Value::Number(30u64.into()));
        assert_eq!(nerdstats["ttft_ms"], Value::Number(12u64.into()));
    }
4533
4534 #[test]
4535 fn workflow_nerdstats_marks_stream_token_metrics_unavailable() {
4536 let output = YamlWorkflowRunOutput {
4537 workflow_id: "workflow".to_string(),
4538 entry_node: "start".to_string(),
4539 email_text: "hello".to_string(),
4540 trace: vec!["llm_node".to_string()],
4541 outputs: BTreeMap::new(),
4542 terminal_node: "llm_node".to_string(),
4543 terminal_output: None,
4544 step_timings: vec![YamlStepTiming {
4545 node_id: "llm_node".to_string(),
4546 node_kind: "llm_call".to_string(),
4547 elapsed_ms: 100,
4548 prompt_tokens: None,
4549 completion_tokens: None,
4550 total_tokens: None,
4551 thinking_tokens: None,
4552 tokens_per_second: None,
4553 }],
4554 llm_node_metrics: BTreeMap::new(),
4555 total_elapsed_ms: 100,
4556 ttft_ms: None,
4557 total_input_tokens: 0,
4558 total_output_tokens: 0,
4559 total_tokens: 0,
4560 total_thinking_tokens: None,
4561 tokens_per_second: 0.0,
4562 trace_id: Some("trace-1".to_string()),
4563 metadata: None,
4564 };
4565
4566 let nerdstats = workflow_nerdstats(&output);
4567 assert_eq!(nerdstats["token_metrics_available"], Value::Bool(false));
4568 assert_eq!(
4569 nerdstats["token_metrics_source"],
4570 Value::String("provider_stream_usage_unavailable".to_string())
4571 );
4572 assert_eq!(nerdstats["total_tokens"], Value::Null);
4573 assert_eq!(nerdstats["ttft_ms"], Value::Null);
4574 }
4575
4576 #[test]
4577 fn workflow_nerdstats_includes_ttft_when_available() {
4578 let output = YamlWorkflowRunOutput {
4579 workflow_id: "workflow".to_string(),
4580 entry_node: "start".to_string(),
4581 email_text: "hello".to_string(),
4582 trace: vec!["llm_node".to_string()],
4583 outputs: BTreeMap::new(),
4584 terminal_node: "llm_node".to_string(),
4585 terminal_output: None,
4586 step_timings: vec![YamlStepTiming {
4587 node_id: "llm_node".to_string(),
4588 node_kind: "llm_call".to_string(),
4589 elapsed_ms: 100,
4590 prompt_tokens: Some(10),
4591 completion_tokens: Some(15),
4592 total_tokens: Some(25),
4593 thinking_tokens: None,
4594 tokens_per_second: Some(150.0),
4595 }],
4596 llm_node_metrics: BTreeMap::new(),
4597 total_elapsed_ms: 100,
4598 ttft_ms: Some(42),
4599 total_input_tokens: 10,
4600 total_output_tokens: 15,
4601 total_tokens: 25,
4602 total_thinking_tokens: None,
4603 tokens_per_second: 150.0,
4604 trace_id: Some("trace-2".to_string()),
4605 metadata: None,
4606 };
4607
4608 let nerdstats = workflow_nerdstats(&output);
4609 assert_eq!(nerdstats["ttft_ms"], Value::Number(42u64.into()));
4610 }
4611
    /// Mock LLM executor that validates the request carries a two-message chat
    /// history (see its `YamlWorkflowLlmExecutor` impl below).
    struct MessageHistoryExecutor;
4613
4614 #[async_trait]
4615 impl YamlWorkflowLlmExecutor for MessageHistoryExecutor {
4616 async fn complete_structured(
4617 &self,
4618 request: YamlLlmExecutionRequest,
4619 _event_sink: Option<&dyn YamlWorkflowEventSink>,
4620 ) -> Result<YamlLlmExecutionResult, String> {
4621 let messages = request
4622 .messages
4623 .ok_or_else(|| "expected messages in request".to_string())?;
4624 if messages.len() != 2 {
4625 return Err(format!("expected 2 messages, got {}", messages.len()));
4626 }
4627 Ok(YamlLlmExecutionResult {
4628 payload: json!({"category":"termination","reason":"history"}),
4629 usage: Some(YamlLlmTokenUsage {
4630 prompt_tokens: 7,
4631 completion_tokens: 3,
4632 total_tokens: 10,
4633 thinking_tokens: None,
4634 }),
4635 ttft_ms: None,
4636 tool_calls: Vec::new(),
4637 })
4638 }
4639 }
4640
    // A node with `messages_path` must forward the chat history from the
    // workflow input to the executor instead of rendering a prompt;
    // MessageHistoryExecutor fails the run if the history is not 2 messages.
    #[tokio::test]
    async fn supports_messages_path_in_workflow_input() {
        let yaml = r#"
id: email-intake-classification
entry_node: classify_top_level
nodes:
  - id: classify_top_level
    node_type:
      llm_call:
        model: gpt-4.1
        messages_path: input.messages
        append_prompt_as_user: false
"#;

        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
        let input = json!({
            "email_text": "ignored",
            "messages": [
                {"role": "system", "content": "You are a classifier"},
                {"role": "user", "content": "Please classify this"}
            ]
        });

        let output = run_workflow_yaml(&workflow, &input, &MessageHistoryExecutor)
            .await
            .expect("workflow should use chat history from input");

        assert_eq!(output.terminal_node, "classify_top_level");
        // "history" is the marker payload MessageHistoryExecutor returns.
        assert_eq!(
            output.outputs["classify_top_level"]["output"]["reason"],
            Value::String("history".to_string())
        );
    }
4674
    // The three public entrypoints (base, +custom-worker, +events/options)
    // must be behaviorally equivalent for the same workflow and input.
    #[tokio::test]
    async fn wrapper_entrypoints_produce_equivalent_outputs() {
        let yaml = r#"
id: wrapper-equivalence
entry_node: classify
nodes:
  - id: classify
    node_type:
      llm_call:
        model: gpt-4.1
        config:
          prompt: |
            Classify this email into exactly one category:
            {{ input.email_text }}
"#;

        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
        let input = json!({"email_text":"hello"});

        let a = run_workflow_yaml(&workflow, &input, &MockExecutor)
            .await
            .expect("base entrypoint should execute");
        let b = run_workflow_yaml_with_custom_worker(&workflow, &input, &MockExecutor, None)
            .await
            .expect("custom worker wrapper should execute");
        let c = run_workflow_yaml_with_custom_worker_and_events_and_options(
            &workflow,
            &input,
            &MockExecutor,
            None,
            None,
            &YamlWorkflowRunOptions::default(),
        )
        .await
        .expect("events/options wrapper should execute");

        // All three variants must agree on identity, routing, node outputs,
        // and token accounting.
        assert_eq!(a.workflow_id, b.workflow_id);
        assert_eq!(a.workflow_id, c.workflow_id);
        assert_eq!(a.terminal_node, b.terminal_node);
        assert_eq!(a.terminal_node, c.terminal_node);
        assert_eq!(a.outputs, b.outputs);
        assert_eq!(a.outputs, c.outputs);
        assert_eq!(a.total_tokens, b.total_tokens);
        assert_eq!(a.total_tokens, c.total_tokens);
    }
4720
    // Tool-calling round trip: the first node's tool results are captured in
    // its `tool_calls` output and published under the `audit` global, which the
    // second node's prompt then dereferences via `globals.audit.0.…`.
    #[tokio::test]
    async fn yaml_llm_tool_calling_captures_traces_and_supports_globals_reference() {
        let yaml = r#"
id: tool-calling-workflow
entry_node: generate_with_tool
nodes:
  - id: generate_with_tool
    node_type:
      llm_call:
        model: gpt-4.1
        tools_format: simplified
        max_tool_roundtrips: 1
        tool_calls_global_key: audit
        tools:
          - name: get_customer_context
            description: Fetch customer context
            input_schema:
              type: object
              properties:
                order_id: { type: string }
              required: [order_id]
              additionalProperties: false
            output_schema:
              type: object
              properties:
                customer_name: { type: string }
              required: [customer_name]
              additionalProperties: false
        config:
          output_schema:
            type: object
            properties:
              state: { type: string }
            required: [state]
  - id: personalize
    node_type:
      llm_call:
        model: gpt-4.1
        config:
          prompt: |
            Write an email greeting for {{ globals.audit.0.output.customer_name }}.
          output_schema:
            type: object
            properties:
              subject: { type: string }
              body: { type: string }
            required: [subject, body]
edges:
  - from: generate_with_tool
    to: personalize
"#;

        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
        let client = SimpleAgentsClientBuilder::new()
            .with_provider(Arc::new(ToolLoopProvider))
            .build()
            .expect("client should build");
        // The worker answers every tool call with this fixed payload.
        let worker = FixedToolWorker {
            payload: json!({"customer_name": "Ava"}),
        };

        let output = run_workflow_yaml_with_client_and_custom_worker_and_events_and_options(
            &workflow,
            &json!({"email_text":"hello"}),
            &client,
            Some(&worker),
            None,
            &YamlWorkflowRunOptions::default(),
        )
        .await
        .expect("workflow should execute");

        assert_eq!(output.trace, vec!["generate_with_tool", "personalize"]);
        // The tool call trace is recorded on the node that issued it.
        assert_eq!(
            output.outputs["generate_with_tool"]["tool_calls"][0]["output"]["customer_name"],
            Value::String("Ava".to_string())
        );
        // The second node's prompt resolved the globals reference, so the
        // rendered body must mention the tool-provided name.
        let body = output.outputs["personalize"]["output"]["body"]
            .as_str()
            .expect("body should be string");
        assert!(body.contains("Ava"));
    }
4803
    // A tool result that violates the tool's declared `output_schema` must
    // hard-fail the node with an Llm error, not be silently passed through.
    #[tokio::test]
    async fn yaml_llm_tool_output_schema_mismatch_hard_fails_node() {
        let yaml = r#"
id: tool-calling-schema-fail
entry_node: generate_with_tool
nodes:
  - id: generate_with_tool
    node_type:
      llm_call:
        model: gpt-4.1
        tools_format: simplified
        max_tool_roundtrips: 1
        tools:
          - name: get_customer_context
            input_schema:
              type: object
              properties:
                order_id: { type: string }
              required: [order_id]
              additionalProperties: false
            output_schema:
              type: object
              properties:
                customer_name: { type: string }
              required: [customer_name]
              additionalProperties: false
        config:
          output_schema:
            type: object
            properties:
              state: { type: string }
            required: [state]
"#;

        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
        let client = SimpleAgentsClientBuilder::new()
            .with_provider(Arc::new(ToolLoopProvider))
            .build()
            .expect("client should build");
        // Deliberately violates the tool's output_schema (customer_name is
        // required; additionalProperties is false).
        let worker = FixedToolWorker {
            payload: json!({"unexpected": "shape"}),
        };

        let error = run_workflow_yaml_with_client_and_custom_worker_and_events_and_options(
            &workflow,
            &json!({"email_text":"hello"}),
            &client,
            Some(&worker),
            None,
            &YamlWorkflowRunOptions::default(),
        )
        .await
        .expect_err("workflow should hard-fail on schema mismatch");

        match error {
            YamlWorkflowRunError::Llm { message, .. } => {
                assert!(message.contains("output failed schema validation"));
            }
            other => panic!("expected llm error, got {other:?}"),
        }
    }
4865
    // Static verification: declaring tools in the simplified shape while
    // requesting `tools_format: openai` must produce the
    // `invalid_tools_format` diagnostic.
    #[test]
    fn validates_tools_format_mismatch() {
        let yaml = r#"
id: mismatch
entry_node: generate
nodes:
  - id: generate
    node_type:
      llm_call:
        model: gpt-4.1
        tools_format: openai
        tools:
          - name: get_customer_context
            input_schema:
              type: object
              properties:
                order_id: { type: string }
              required: [order_id]
"#;

        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
        let diagnostics = verify_yaml_workflow(&workflow);
        assert!(diagnostics
            .iter()
            .any(|diagnostic| diagnostic.code == "invalid_tools_format"));
    }
4892
    // Custom workers must receive a `trace` block in their context containing
    // both the propagated trace context (trace_id/traceparent) and the tenant
    // context supplied via run options.
    #[tokio::test]
    async fn custom_worker_receives_trace_context_block() {
        let yaml = r#"
id: custom-worker-trace-context
entry_node: lookup
nodes:
  - id: lookup
    node_type:
      custom_worker:
        handler: GetRagData
        config:
          payload:
            topic: demo
"#;

        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
        // CapturingWorker stashes the context it was handed for inspection.
        let worker = CapturingWorker {
            context: Mutex::new(None),
        };
        let options = YamlWorkflowRunOptions {
            trace: YamlWorkflowTraceOptions {
                context: Some(YamlWorkflowTraceContextInput {
                    trace_id: Some("trace-fixed-ctx".to_string()),
                    traceparent: Some("00-trace-fixed-ctx-span-fixed-01".to_string()),
                    ..YamlWorkflowTraceContextInput::default()
                }),
                tenant: YamlWorkflowTraceTenantContext {
                    conversation_id: Some("7fd67af3-c67d-46cb-95af-08f8ed7b06a5".to_string()),
                    request_id: Some("turn-7".to_string()),
                    ..YamlWorkflowTraceTenantContext::default()
                },
            },
            ..YamlWorkflowRunOptions::default()
        };

        run_workflow_yaml_with_custom_worker_and_events_and_options(
            &workflow,
            &json!({"email_text":"hello"}),
            &MockExecutor,
            Some(&worker),
            None,
            &options,
        )
        .await
        .expect("workflow should execute");

        let captured_context = worker
            .context
            .lock()
            .expect("capturing worker lock should not be poisoned")
            .clone()
            .expect("custom worker should receive context");

        assert_eq!(
            captured_context
                .get("trace")
                .and_then(|trace| trace.get("context"))
                .and_then(|context| context.get("trace_id"))
                .and_then(Value::as_str),
            Some("trace-fixed-ctx")
        );
        assert_eq!(
            captured_context
                .get("trace")
                .and_then(|trace| trace.get("context"))
                .and_then(|context| context.get("traceparent"))
                .and_then(Value::as_str),
            Some("00-trace-fixed-ctx-span-fixed-01")
        );
        assert_eq!(
            captured_context
                .get("trace")
                .and_then(|trace| trace.get("tenant"))
                .and_then(|tenant| tenant.get("conversation_id"))
                .and_then(Value::as_str),
            Some("7fd67af3-c67d-46cb-95af-08f8ed7b06a5")
        );
    }
4971
    // A sink that signals cancellation must stop the run before the LLM
    // executor is ever invoked: the error is EventSinkCancelled and the
    // executor's call counter stays at zero.
    #[tokio::test]
    async fn event_sink_cancellation_stops_workflow_before_llm_execution() {
        let yaml = r#"
id: cancellation-test
entry_node: classify
nodes:
  - id: classify
    node_type:
      llm_call:
        model: gpt-4.1
        config:
          prompt: |
            Classify this email into exactly one category:
            {{ input.email_text }}
"#;

        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
        // Counts how many times complete_structured is reached.
        let executor = CountingExecutor {
            call_count: AtomicUsize::new(0),
        };
        // Signals cancellation as soon as the first event is delivered.
        let sink = CancelAfterFirstEventSink {
            cancelled: AtomicBool::new(false),
        };

        let err = run_workflow_yaml_with_custom_worker_and_events_and_options(
            &workflow,
            &json!({"email_text":"hello"}),
            &executor,
            None,
            Some(&sink),
            &YamlWorkflowRunOptions::default(),
        )
        .await
        .expect_err("workflow should stop when sink signals cancellation");

        assert!(matches!(
            err,
            YamlWorkflowRunError::EventSinkCancelled { .. }
        ));
        // Cancellation preempted the node: no LLM call was made.
        assert_eq!(executor.call_count.load(Ordering::SeqCst), 0);
    }
5013
5014 #[tokio::test]
5015 async fn rejects_invalid_messages_path_shape() {
5016 let yaml = r#"
5017id: email-intake-classification
5018entry_node: classify_top_level
5019nodes:
5020 - id: classify_top_level
5021 node_type:
5022 llm_call:
5023 model: gpt-4.1
5024 messages_path: input.messages
5025"#;
5026
5027 let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
5028 let input = json!({
5029 "email_text": "ignored",
5030 "messages": "not-a-list"
5031 });
5032
5033 let err = run_workflow_yaml(&workflow, &input, &MessageHistoryExecutor)
5034 .await
5035 .expect_err("workflow should fail for invalid messages shape");
5036
5037 assert!(matches!(err, YamlWorkflowRunError::Llm { .. }));
5038 }
5039
    // Mermaid rendering: switch branches are labeled routeN / default, and
    // plain edges are rendered unlabeled.
    #[test]
    fn renders_yaml_workflow_to_mermaid_with_switch_labels() {
        let yaml = r#"
id: chat-workflow
entry_node: decide
nodes:
  - id: decide
    node_type:
      switch:
        branches:
          - condition: '$.input.mode == "draft"'
            target: draft
        default: ask
  - id: draft
    node_type:
      llm_call:
        model: gpt-4.1
  - id: ask
    node_type:
      llm_call:
        model: gpt-4.1
edges:
  - from: draft
    to: ask
"#;

        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
        let mermaid = yaml_workflow_to_mermaid(&workflow);

        assert!(mermaid.contains("flowchart TD"));
        // First branch gets a positional "route1" label; the default branch is
        // labeled "default".
        assert!(mermaid.contains("decide -- \"route1\" --> draft"));
        assert!(mermaid.contains("decide -- \"default\" --> ask"));
        assert!(mermaid.contains("draft --> ask"));
    }
5074
    // YAML → IR conversion keeps every declared node and injects the synthetic
    // `__yaml_start` entry node.
    #[test]
    fn converts_yaml_workflow_to_ir_definition() {
        let yaml = r#"
id: chat-workflow
entry_node: classify
nodes:
  - id: classify
    node_type:
      llm_call:
        model: gpt-4.1
        config:
          prompt: |
            classify
  - id: route
    node_type:
      switch:
        branches:
          - condition: '$.nodes.classify.output.kind == "x"'
            target: done
        default: done
  - id: done
    node_type:
      custom_worker:
        handler: GetRagData
        config:
          payload:
            topic: test
edges:
  - from: classify
    to: route
"#;

        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
        let ir = yaml_workflow_to_ir(&workflow).expect("yaml should convert to ir");

        assert_eq!(ir.name, "chat-workflow");
        // Synthetic entry node plus all three declared nodes.
        assert!(ir.nodes.iter().any(|n| n.id == "__yaml_start"));
        assert!(ir.nodes.iter().any(|n| n.id == "classify"));
        assert!(ir.nodes.iter().any(|n| n.id == "route"));
        assert!(ir.nodes.iter().any(|n| n.id == "done"));
    }
5116
    // An llm_call node using `messages_path` converts to a Tool IR node backed
    // by the synthetic `__yaml_llm_call` tool.
    #[test]
    fn supports_yaml_to_ir_when_messages_path_is_used() {
        let yaml = r#"
id: chat-workflow
entry_node: classify
nodes:
  - id: classify
    node_type:
      llm_call:
        model: gpt-4.1
        messages_path: input.messages
"#;

        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
        let ir =
            yaml_workflow_to_ir(&workflow).expect("messages_path should convert to tool-based IR");
        assert!(ir.nodes.iter().any(|node| matches!(
            node.kind,
            crate::ir::NodeKind::Tool { ref tool, .. } if tool == "__yaml_llm_call"
        )));
    }
5138
    // The generated trace id must appear both as `output.trace_id` and inside
    // `output.metadata.telemetry.trace_id`, and the two must match.
    #[tokio::test]
    async fn workflow_output_contains_trace_id_in_both_locations() {
        let yaml = r#"
id: trace-test
entry_node: classify
nodes:
  - id: classify
    node_type:
      llm_call:
        model: gpt-4.1
        config:
          prompt: |
            Classify this email into exactly one category:
            {{ input.email_text }}
"#;

        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
        let output = run_workflow_yaml(&workflow, &json!({"email_text":"hello"}), &MockExecutor)
            .await
            .expect("workflow should execute");

        let trace_id = output
            .trace_id
            .as_deref()
            .expect("trace_id should be present");
        assert!(!trace_id.is_empty());
        assert_eq!(
            output.metadata.as_ref().and_then(|value| {
                value
                    .get("telemetry")
                    .and_then(|telemetry| telemetry.get("trace_id"))
                    .and_then(Value::as_str)
            }),
            Some(trace_id)
        );
    }
5175
    // Explicit run options: a caller-supplied trace id is honored (no fresh id
    // generated) and the chosen payload mode plus tenant context surface in
    // the output metadata.
    #[tokio::test]
    async fn workflow_run_options_use_explicit_trace_id_and_payload_mode() {
        let yaml = r#"
id: trace-options-test
entry_node: classify
nodes:
  - id: classify
    node_type:
      llm_call:
        model: gpt-4.1
        config:
          prompt: |
            Classify this email into exactly one category:
            {{ input.email_text }}
"#;

        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
        let options = YamlWorkflowRunOptions {
            telemetry: YamlWorkflowTelemetryConfig {
                payload_mode: YamlWorkflowPayloadMode::RedactedPayload,
                ..YamlWorkflowTelemetryConfig::default()
            },
            trace: YamlWorkflowTraceOptions {
                context: Some(YamlWorkflowTraceContextInput {
                    trace_id: Some("trace-fixed-123".to_string()),
                    traceparent: Some("00-trace-fixed-123-span-1-01".to_string()),
                    ..YamlWorkflowTraceContextInput::default()
                }),
                tenant: YamlWorkflowTraceTenantContext {
                    conversation_id: Some("6e6d3125-b9f1-4af2-af1f-7cca024a2c42".to_string()),
                    ..YamlWorkflowTraceTenantContext::default()
                },
            },
        };

        let output = run_workflow_yaml_with_custom_worker_and_events_and_options(
            &workflow,
            &json!({"email_text":"hello"}),
            &MockExecutor,
            None,
            None,
            &options,
        )
        .await
        .expect("workflow should execute");

        // The explicit id is used verbatim.
        assert_eq!(output.trace_id.as_deref(), Some("trace-fixed-123"));
        assert_eq!(
            output
                .metadata
                .as_ref()
                .and_then(|value| value.get("telemetry"))
                .and_then(|telemetry| telemetry.get("payload_mode"))
                .and_then(Value::as_str),
            Some("redacted_payload")
        );
        assert_eq!(
            output
                .metadata
                .as_ref()
                .and_then(|value| value.get("trace"))
                .and_then(|trace| trace.get("tenant"))
                .and_then(|tenant| tenant.get("conversation_id"))
                .and_then(Value::as_str),
            Some("6e6d3125-b9f1-4af2-af1f-7cca024a2c42")
        );
    }
5243
5244 #[test]
5245 fn streamed_payload_parser_extracts_last_json_object() {
5246 let raw = r#"{"state":"missing_scenario","reason":"ok"}
5247
5248extra reasoning text
5249
5250{"state":"ready","reason":"final"}"#;
5251
5252 let resolved = parse_streamed_structured_payload(raw, false)
5253 .expect("parser should extract final JSON object");
5254 assert_eq!(resolved.payload["state"], "ready");
5255 assert!(resolved.heal_confidence.is_none());
5256 }
5257
5258 #[test]
5259 fn streamed_payload_parser_handles_unbalanced_reasoning_before_json() {
5260 let raw = "reasoning text with unmatched { braces and thoughts\n{\"state\":\"ready\",\"reason\":\"final\"}";
5261
5262 let resolved = parse_streamed_structured_payload(raw, false)
5263 .expect("parser should recover final structured JSON object");
5264 assert_eq!(resolved.payload["state"], "ready");
5265 }
5266
5267 #[test]
5268 fn streamed_payload_parser_handles_markdown_with_heal() {
5269 let raw = r#"Some preface
5270```json
5271{
5272 "state": "missing_scenario",
5273 "reason": "Need more details"
5274}
5275```
5276Some trailing explanation"#;
5277
5278 let resolved = parse_streamed_structured_payload(raw, true)
5279 .expect("heal path should parse JSON block");
5280 assert_eq!(resolved.payload["state"], "missing_scenario");
5281 assert!(resolved.heal_confidence.is_some());
5282 }
5283
5284 #[test]
5285 fn streamed_payload_parser_errors_when_no_json_candidate_exists() {
5286 let raw = "No JSON in this streamed output";
5287 let error = parse_streamed_structured_payload(raw, false)
5288 .expect_err("strict stream parse should fail without JSON candidate");
5289 assert!(error.contains("no JSON object candidate found"));
5290 }
5291
5292 #[test]
5293 fn include_raw_stream_debug_events_defaults_to_false() {
5294 let _guard = stream_debug_env_lock()
5295 .lock()
5296 .expect("stream debug env lock should not be poisoned");
5297 std::env::remove_var("SIMPLE_AGENTS_WORKFLOW_STREAM_INCLUDE_RAW");
5298 assert!(!include_raw_stream_debug_events());
5299 }
5300
5301 #[test]
5302 fn include_raw_stream_debug_events_accepts_truthy_values() {
5303 let _guard = stream_debug_env_lock()
5304 .lock()
5305 .expect("stream debug env lock should not be poisoned");
5306 std::env::set_var("SIMPLE_AGENTS_WORKFLOW_STREAM_INCLUDE_RAW", "true");
5307 assert!(include_raw_stream_debug_events());
5308 std::env::remove_var("SIMPLE_AGENTS_WORKFLOW_STREAM_INCLUDE_RAW");
5309 }
5310
5311 #[test]
5312 fn structured_json_delta_filter_strips_reasoning_prefix_and_suffix() {
5313 let mut filter = StructuredJsonDeltaFilter::default();
5314 let chunks = vec![
5315 "I will think first... ",
5316 "{\"state\":\"missing_scenario\",",
5317 "\"reason\":\"Need more details\"}",
5318 " additional commentary",
5319 ];
5320
5321 let filtered = chunks
5322 .into_iter()
5323 .filter_map(|chunk| filter.split(chunk).0)
5324 .collect::<String>();
5325
5326 assert_eq!(
5327 filtered,
5328 "{\"state\":\"missing_scenario\",\"reason\":\"Need more details\"}"
5329 );
5330 }
5331
5332 #[test]
5333 fn structured_json_delta_filter_handles_braces_inside_strings() {
5334 let mut filter = StructuredJsonDeltaFilter::default();
5335 let chunks = vec![
5336 "preface ",
5337 "{\"reason\":\"brace } in text\",\"state\":\"ok\"}",
5338 " trailing",
5339 ];
5340
5341 let filtered = chunks
5342 .into_iter()
5343 .filter_map(|chunk| filter.split(chunk).0)
5344 .collect::<String>();
5345
5346 assert_eq!(
5347 filtered,
5348 "{\"reason\":\"brace } in text\",\"state\":\"ok\"}"
5349 );
5350 }
5351
5352 #[test]
5353 fn render_json_object_as_text_converts_top_level_fields() {
5354 let rendered =
5355 render_json_object_as_text(r#"{"question":"q","confidence":0.8,"nested":{"a":1}}"#);
5356 let lines: std::collections::HashSet<&str> = rendered.lines().collect();
5357
5358 assert_eq!(lines.len(), 3);
5359 assert!(lines.contains("question: q"));
5360 assert!(lines.contains("confidence: 0.8"));
5361 assert!(lines.contains("nested: {\"a\":1}"));
5362 }
5363
5364 #[test]
5365 fn stream_json_as_text_formatter_emits_once_when_complete() {
5366 let mut formatter = StreamJsonAsTextFormatter::default();
5367 formatter.push("{\"question\":\"hello\"}");
5368
5369 let first = formatter.emit_if_ready(true);
5370 let second = formatter.emit_if_ready(true);
5371
5372 assert_eq!(first, Some("question: hello".to_string()));
5373 assert_eq!(second, None);
5374 }
5375
5376 #[test]
5377 fn rewrite_yaml_condition_preserves_output_prefix_in_field_names() {
5378 let expr = "$.nodes.classify.output.output_total == 1";
5379 let rewritten = rewrite_yaml_condition_to_ir(expr);
5380 assert_eq!(rewritten, "$.node_outputs.classify.output_total == 1");
5381 }
5382
5383 #[tokio::test]
5384 async fn validates_workflow_input_before_ir_runtime_path() {
5385 let yaml = r#"
5386id: chat-workflow
5387entry_node: classify
5388nodes:
5389 - id: classify
5390 node_type:
5391 llm_call:
5392 model: gpt-4.1
5393 config:
5394 prompt: |
5395 classify
5396"#;
5397
5398 let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
5399 let err = run_workflow_yaml(&workflow, &json!("not-an-object"), &MockExecutor)
5400 .await
5401 .expect_err("non-object input should fail before execution");
5402
5403 assert!(matches!(err, YamlWorkflowRunError::InvalidInput { .. }));
5404 }
5405
5406 #[test]
5407 fn interpolate_template_supports_dollar_prefixed_paths() {
5408 let context = json!({
5409 "input": {
5410 "email_text": "hello"
5411 }
5412 });
5413
5414 let rendered = interpolate_template("value={{ $.input.email_text }}", &context);
5415 assert_eq!(rendered, "value=hello");
5416 }
5417}