// simple_agents_workflow/yaml_runner/contracts.rs

use std::collections::HashMap;

use async_trait::async_trait;
use serde::{ser::SerializeStruct, Deserialize, Serialize};
use serde_json::Value;
use simple_agent_type::message::{MessageContent, Role};
use simple_agent_type::tool::{ToolChoice, ToolType};
use thiserror::Error;

use super::{TraceContext, YamlToolTraceMode, YamlWorkflowTraceTenantContext};
11
12#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
13#[serde(rename_all = "snake_case")]
14pub enum YamlWorkflowTokenKind {
15    Output,
16    Thinking,
17}
18
19#[derive(Debug, Clone, PartialEq, Serialize)]
20pub struct YamlWorkflowEvent {
21    pub event_type: String,
22    #[serde(skip_serializing_if = "Option::is_none")]
23    pub node_id: Option<String>,
24    #[serde(skip_serializing_if = "Option::is_none")]
25    pub step_id: Option<String>,
26    #[serde(skip_serializing_if = "Option::is_none")]
27    pub node_kind: Option<String>,
28    #[serde(skip_serializing_if = "Option::is_none")]
29    pub streamable: Option<bool>,
30    #[serde(skip_serializing_if = "Option::is_none")]
31    pub message: Option<String>,
32    #[serde(skip_serializing_if = "Option::is_none")]
33    pub delta: Option<String>,
34    #[serde(skip_serializing_if = "Option::is_none")]
35    pub snapshot: Option<Value>,
36    #[serde(skip_serializing_if = "Option::is_none")]
37    pub token_kind: Option<YamlWorkflowTokenKind>,
38    #[serde(skip_serializing_if = "Option::is_none")]
39    pub is_terminal_node_token: Option<bool>,
40    #[serde(skip_serializing_if = "Option::is_none")]
41    pub elapsed_ms: Option<u128>,
42    #[serde(skip_serializing_if = "Option::is_none")]
43    pub metadata: Option<Value>,
44}
45
46pub type WorkflowMessageRole = Role;
47
48#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
49pub struct WorkflowMessage {
50    pub role: WorkflowMessageRole,
51    pub content: MessageContent,
52    #[serde(default)]
53    pub name: Option<String>,
54    #[serde(default, alias = "toolCallId")]
55    pub tool_call_id: Option<String>,
56}
57
58#[derive(Debug, Clone, PartialEq, Serialize)]
59pub struct YamlTemplateBinding {
60    pub index: usize,
61    pub expression: String,
62    pub source_path: String,
63    pub resolved: Value,
64    pub resolved_type: String,
65    pub missing: bool,
66}
67
68#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
69pub enum YamlWorkflowDiagnosticSeverity {
70    Error,
71    Warning,
72}
73
74#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
75pub struct YamlWorkflowDiagnostic {
76    pub node_id: Option<String>,
77    pub code: String,
78    pub severity: YamlWorkflowDiagnosticSeverity,
79    pub message: String,
80}
81
82#[derive(Debug, Error)]
83pub enum YamlWorkflowRunError {
84    #[error("failed to read workflow yaml '{path}': {source}")]
85    Read {
86        path: String,
87        source: std::io::Error,
88    },
89    #[error("failed to parse workflow yaml '{path}': {source}")]
90    Parse {
91        path: String,
92        source: serde_yaml::Error,
93    },
94    #[error("rejected workflow yaml '{path}': {reason}")]
95    FileRejected { path: String, reason: String },
96    #[error("workflow '{workflow_id}' has no nodes")]
97    EmptyNodes { workflow_id: String },
98    #[error("entry node '{entry_node}' does not exist")]
99    MissingEntry { entry_node: String },
100    #[error("unknown node id '{node_id}'")]
101    MissingNode { node_id: String },
102    #[error("unsupported node type in '{node_id}'")]
103    UnsupportedNodeType { node_id: String },
104    #[error("unsupported switch condition format: {condition}")]
105    UnsupportedCondition { condition: String },
106    #[error("switch node '{node_id}' has no valid next target")]
107    InvalidSwitchTarget { node_id: String },
108    #[error("llm returned non-object payload for node '{node_id}'")]
109    LlmPayloadNotObject { node_id: String },
110    #[error("custom worker handler '{handler}' is not supported")]
111    UnsupportedCustomHandler { handler: String },
112    #[error("llm execution failed for node '{node_id}': {message}")]
113    Llm { node_id: String, message: String },
114    #[error("custom worker execution failed for node '{node_id}': {message}")]
115    CustomWorker { node_id: String, message: String },
116    #[error("workflow validation failed with {diagnostics_count} error(s)")]
117    Validation {
118        diagnostics_count: usize,
119        diagnostics: Vec<YamlWorkflowDiagnostic>,
120    },
121    #[error("invalid workflow input: {message}")]
122    InvalidInput { message: String },
123    #[error("ir runtime execution failed: {message}")]
124    IrRuntime { message: String },
125    #[error("workflow event stream cancelled: {message}")]
126    EventSinkCancelled { message: String },
127}
128
129impl Serialize for YamlWorkflowRunError {
130    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
131    where
132        S: serde::Serializer,
133    {
134        let mut state = serializer.serialize_struct("YamlWorkflowRunError", 2)?;
135        state.serialize_field("code", yaml_workflow_run_error_code(self))?;
136        state.serialize_field("message", &self.to_string())?;
137        state.end()
138    }
139}
140
141fn yaml_workflow_run_error_code(error: &YamlWorkflowRunError) -> &'static str {
142    match error {
143        YamlWorkflowRunError::Read { .. } => "read_failed",
144        YamlWorkflowRunError::Parse { .. } => "parse_failed",
145        YamlWorkflowRunError::FileRejected { .. } => "file_rejected",
146        YamlWorkflowRunError::EmptyNodes { .. } => "empty_nodes",
147        YamlWorkflowRunError::MissingEntry { .. } => "missing_entry",
148        YamlWorkflowRunError::MissingNode { .. } => "missing_node",
149        YamlWorkflowRunError::UnsupportedNodeType { .. } => "unsupported_node_type",
150        YamlWorkflowRunError::UnsupportedCondition { .. } => "unsupported_condition",
151        YamlWorkflowRunError::InvalidSwitchTarget { .. } => "invalid_switch_target",
152        YamlWorkflowRunError::LlmPayloadNotObject { .. } => "llm_payload_not_object",
153        YamlWorkflowRunError::UnsupportedCustomHandler { .. } => "unsupported_custom_handler",
154        YamlWorkflowRunError::Llm { .. } => "llm_failed",
155        YamlWorkflowRunError::CustomWorker { .. } => "custom_worker_failed",
156        YamlWorkflowRunError::Validation { .. } => "validation_failed",
157        YamlWorkflowRunError::InvalidInput { .. } => "invalid_input",
158        YamlWorkflowRunError::IrRuntime { .. } => "ir_runtime_failed",
159        YamlWorkflowRunError::EventSinkCancelled { .. } => "event_sink_cancelled",
160    }
161}
162
163pub trait YamlWorkflowEventSink: Send + Sync {
164    fn emit(&self, event: &YamlWorkflowEvent);
165
166    fn is_cancelled(&self) -> bool {
167        false
168    }
169}
170
/// Returns true for stream events that should be gated by `workflow_streaming`.
///
/// The gated event types are exactly `node_stream_delta`,
/// `node_stream_thinking_delta`, `node_stream_output_delta`, and
/// `node_stream_snapshot`. The comparison is exact and case-sensitive; any
/// other string, including the empty string, yields `false`.
pub fn is_workflow_stream_delta_event(event_type: &str) -> bool {
    matches!(
        event_type,
        "node_stream_delta"
            | "node_stream_thinking_delta"
            | "node_stream_output_delta"
            | "node_stream_snapshot"
    )
}
181
182/// Forwards events to `inner` unless `workflow_streaming` is false and the event is a stream delta.
183pub struct YamlWorkflowStreamFilterSink<'a> {
184    inner: &'a dyn YamlWorkflowEventSink,
185    workflow_streaming: bool,
186}
187
188impl<'a> YamlWorkflowStreamFilterSink<'a> {
189    pub fn new(inner: &'a dyn YamlWorkflowEventSink, workflow_streaming: bool) -> Self {
190        Self {
191            inner,
192            workflow_streaming,
193        }
194    }
195}
196
197impl YamlWorkflowEventSink for YamlWorkflowStreamFilterSink<'_> {
198    fn emit(&self, event: &YamlWorkflowEvent) {
199        if !self.workflow_streaming && is_workflow_stream_delta_event(event.event_type.as_str()) {
200            return;
201        }
202        self.inner.emit(event);
203    }
204
205    fn is_cancelled(&self) -> bool {
206        self.inner.is_cancelled()
207    }
208}
209
/// Event sink that discards every event it is given.
pub struct NoopYamlWorkflowEventSink;
211
212impl YamlWorkflowEventSink for NoopYamlWorkflowEventSink {
213    fn emit(&self, _event: &YamlWorkflowEvent) {}
214}
215
216pub(super) fn workflow_event_sink_cancelled_message() -> &'static str {
217    "workflow event callback cancelled"
218}
219
220pub(super) fn event_sink_is_cancelled(event_sink: Option<&dyn YamlWorkflowEventSink>) -> bool {
221    event_sink.map(|sink| sink.is_cancelled()).unwrap_or(false)
222}
223
224#[derive(Debug, Clone, PartialEq, Eq, Error)]
225pub enum YamlToIrError {
226    #[error("entry node '{entry_node}' does not exist")]
227    MissingEntry { entry_node: String },
228    #[error("node '{node_id}' has multiple outgoing edges in YAML; IR llm/tool nodes require one")]
229    MultipleOutgoingEdge { node_id: String },
230    #[error("node '{node_id}' is unsupported for IR conversion: {reason}")]
231    UnsupportedNode { node_id: String, reason: String },
232}
233
234#[derive(Debug, Clone)]
235pub struct YamlLlmExecutionRequest {
236    pub node_id: String,
237    pub is_terminal_node: bool,
238    pub stream_json_as_text: bool,
239    pub model: String,
240    pub max_tokens: Option<u32>,
241    pub temperature: Option<f32>,
242    pub top_p: Option<f32>,
243    pub messages: Option<Vec<super::Message>>,
244    pub user_input_prompt: Option<String>,
245    pub user_input_prompt_template: Option<String>,
246    pub user_input_prompt_bindings: Vec<YamlTemplateBinding>,
247    pub node_system_prompt: Option<String>,
248    pub node_system_prompt_template: Option<String>,
249    pub node_system_prompt_bindings: Vec<YamlTemplateBinding>,
250    pub schema: Value,
251    pub stream: bool,
252    pub heal: bool,
253    pub send_schema: bool,
254    pub tools: Vec<YamlResolvedTool>,
255    pub tool_choice: Option<ToolChoice>,
256    pub max_tool_roundtrips: u8,
257    pub tool_calls_global_key: Option<String>,
258    pub tool_trace_mode: YamlToolTraceMode,
259    pub execution_context: Value,
260    pub trace_id: Option<String>,
261    pub trace_context: Option<TraceContext>,
262    pub tenant_context: YamlWorkflowTraceTenantContext,
263    pub trace_sampled: bool,
264    /// Split thinking/output stream events (mirrors [`YamlWorkflowExecutionFlags::split_stream_deltas`]).
265    pub split_stream_deltas: bool,
266    /// Include partial streamed text in structured JSON parse error messages when debugging.
267    pub debug_stream_parse: bool,
268}
269
270#[derive(Debug, Clone)]
271pub struct YamlResolvedTool {
272    pub definition: super::ToolDefinition,
273    pub output_schema: Option<Value>,
274}
275
276#[async_trait]
277pub trait YamlWorkflowLlmExecutor: Send + Sync {
278    async fn complete_structured(
279        &self,
280        request: YamlLlmExecutionRequest,
281        event_sink: Option<&dyn YamlWorkflowEventSink>,
282    ) -> Result<super::YamlLlmExecutionResult, String>;
283}
284
285#[async_trait]
286pub trait YamlWorkflowCustomWorkerExecutor: Send + Sync {
287    async fn execute(
288        &self,
289        handler: &str,
290        handler_file: Option<&str>,
291        payload: &Value,
292        context: &Value,
293    ) -> Result<Value, String>;
294}
295
296#[derive(Debug, Clone, Copy, PartialEq, Eq, Deserialize, Serialize)]
297#[serde(rename_all = "snake_case")]
298pub enum YamlGlobalUpdateOp {
299    Set,
300    Append,
301    Increment,
302    Merge,
303}
304
305impl YamlGlobalUpdateOp {
306    pub fn as_str(self) -> &'static str {
307        match self {
308            YamlGlobalUpdateOp::Set => "set",
309            YamlGlobalUpdateOp::Append => "append",
310            YamlGlobalUpdateOp::Increment => "increment",
311            YamlGlobalUpdateOp::Merge => "merge",
312        }
313    }
314}
315
316#[derive(Debug, Clone, Deserialize)]
317#[serde(deny_unknown_fields)]
318pub struct YamlGlobalUpdate {
319    pub op: YamlGlobalUpdateOp,
320    pub from: Option<String>,
321    pub by: Option<f64>,
322}
323
324#[derive(Debug, Clone, Deserialize)]
325#[serde(deny_unknown_fields)]
326pub struct YamlNodeConfig {
327    pub user_input_prompt: Option<String>,
328    pub node_system_prompt: Option<String>,
329    #[serde(default, alias = "schema")]
330    pub output_schema: Option<Value>,
331    pub payload: Option<Value>,
332    pub set_globals: Option<HashMap<String, String>>,
333    pub update_globals: Option<HashMap<String, YamlGlobalUpdate>>,
334}
335
336#[derive(Debug, Clone, Deserialize)]
337#[serde(deny_unknown_fields)]
338pub struct YamlCustomWorker {
339    pub handler: String,
340    pub handler_file: Option<String>,
341}
342
343#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
344#[serde(rename_all = "snake_case")]
345pub enum YamlHumanInputType {
346    Choice,
347    Text,
348    Form,
349}
350
351#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
352#[serde(deny_unknown_fields)]
353pub struct YamlHumanInputOption {
354    pub value: String,
355    #[serde(default)]
356    pub label: Option<String>,
357}
358
359#[derive(Debug, Clone, Deserialize)]
360#[serde(deny_unknown_fields)]
361pub struct YamlHumanInput {
362    pub input_type: YamlHumanInputType,
363    pub prompt: Option<String>,
364    pub options: Option<Vec<YamlHumanInputOption>>,
365    pub form_schema: Option<Value>,
366    pub form_prefill: Option<String>,
367    pub timeout_seconds: Option<u64>,
368}
369
370#[derive(Debug, Clone, Deserialize)]
371#[serde(deny_unknown_fields)]
372pub struct YamlSwitchBranch {
373    pub condition: String,
374    pub target: String,
375}
376
377#[derive(Debug, Clone, Deserialize)]
378#[serde(deny_unknown_fields)]
379pub struct YamlSwitch {
380    #[serde(default)]
381    pub branches: Vec<YamlSwitchBranch>,
382    pub default: String,
383}
384
385#[derive(Debug, Clone, Deserialize)]
386#[serde(untagged)]
387pub enum YamlToolChoiceConfig {
388    Mode(super::ToolChoiceMode),
389    Function(YamlToolChoiceFunction),
390    OpenAi(super::ToolChoiceTool),
391}
392
393#[derive(Debug, Clone, Deserialize)]
394#[serde(deny_unknown_fields)]
395pub struct YamlToolChoiceFunction {
396    pub function: String,
397}
398
399#[derive(Debug, Clone, Deserialize)]
400#[serde(deny_unknown_fields)]
401pub struct YamlSimplifiedToolDeclaration {
402    pub name: String,
403    pub description: Option<String>,
404    pub input_schema: Value,
405    pub output_schema: Option<Value>,
406}
407
408#[derive(Debug, Clone, Deserialize)]
409#[serde(deny_unknown_fields)]
410pub struct YamlOpenAiToolFunction {
411    pub name: String,
412    pub description: Option<String>,
413    pub parameters: Option<Value>,
414    pub output_schema: Option<Value>,
415}
416
417#[derive(Debug, Clone, Deserialize)]
418#[serde(deny_unknown_fields)]
419pub struct YamlOpenAiToolDeclaration {
420    #[serde(rename = "type")]
421    pub tool_type: Option<ToolType>,
422    pub function: YamlOpenAiToolFunction,
423}
424
425#[derive(Debug, Clone, Deserialize)]
426#[serde(untagged)]
427pub enum YamlToolDeclaration {
428    OpenAi(YamlOpenAiToolDeclaration),
429    Simplified(YamlSimplifiedToolDeclaration),
430}
431
432#[derive(Debug, Clone, Copy, PartialEq, Eq, Deserialize, Serialize, Default)]
433#[serde(rename_all = "snake_case")]
434pub enum YamlToolFormat {
435    #[default]
436    Openai,
437    Simplified,
438}
439
440#[derive(Debug, Clone, Deserialize)]
441#[serde(deny_unknown_fields)]
442pub struct YamlLlmCall {
443    pub model: String,
444    pub max_tokens: Option<u32>,
445    pub temperature: Option<f32>,
446    pub top_p: Option<f32>,
447    pub stream: Option<bool>,
448    pub stream_json_as_text: Option<bool>,
449    pub heal: Option<bool>,
450    pub send_schema: Option<bool>,
451    pub messages_path: Option<String>,
452    #[serde(default)]
453    pub tools_format: YamlToolFormat,
454    #[serde(default)]
455    pub tools: Vec<YamlToolDeclaration>,
456    pub tool_choice: Option<YamlToolChoiceConfig>,
457    pub max_tool_roundtrips: Option<u8>,
458    pub tool_calls_global_key: Option<String>,
459}
460
461#[derive(Debug, Clone, Deserialize)]
462#[serde(deny_unknown_fields)]
463pub struct YamlNodeType {
464    pub llm_call: Option<YamlLlmCall>,
465    pub switch: Option<YamlSwitch>,
466    pub custom_worker: Option<YamlCustomWorker>,
467    pub human_input: Option<YamlHumanInput>,
468    pub end: Option<Value>,
469}
470
471#[derive(Debug, Clone, Deserialize)]
472#[serde(deny_unknown_fields)]
473pub struct YamlNode {
474    pub id: String,
475    #[serde(default)]
476    pub name: Option<String>,
477    pub node_type: YamlNodeType,
478    pub config: Option<YamlNodeConfig>,
479}
480
481impl YamlNode {
482    pub(super) fn kind_name(&self) -> &'static str {
483        if self.node_type.llm_call.is_some() {
484            "llm_call"
485        } else if self.node_type.switch.is_some() {
486            "switch"
487        } else if self.node_type.custom_worker.is_some() {
488            "custom_worker"
489        } else if self.node_type.human_input.is_some() {
490            "human_input"
491        } else if self.node_type.end.is_some() {
492            "end"
493        } else {
494            "unknown"
495        }
496    }
497}
498
499#[derive(Debug, Clone, Deserialize)]
500#[serde(deny_unknown_fields)]
501pub struct YamlEdge {
502    pub from: String,
503    pub to: String,
504}
505
506#[derive(Debug, Clone, Deserialize)]
507#[serde(deny_unknown_fields)]
508pub struct YamlWorkflow {
509    pub id: String,
510    #[serde(default)]
511    pub version: Option<String>,
512    #[serde(default)]
513    pub metadata: Option<HashMap<String, Value>>,
514    pub entry_node: String,
515    #[serde(default)]
516    pub nodes: Vec<YamlNode>,
517    #[serde(default)]
518    pub edges: Vec<YamlEdge>,
519}