Skip to main content

simple_agents_workflow/yaml_runner/
contracts.rs

1use std::collections::HashMap;
2
3use async_trait::async_trait;
4use serde::{Deserialize, Serialize};
5use serde_json::Value;
6use simple_agent_type::message::{MessageContent, Role};
7use simple_agent_type::tool::{ToolChoice, ToolType};
8use thiserror::Error;
9
10use super::{TraceContext, YamlToolTraceMode, YamlWorkflowTraceTenantContext};
11
12#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
13#[serde(rename_all = "snake_case")]
14pub enum YamlWorkflowTokenKind {
15    Output,
16    Thinking,
17}
18
19#[derive(Debug, Clone, PartialEq, Serialize)]
20pub struct YamlWorkflowEvent {
21    pub event_type: String,
22    #[serde(skip_serializing_if = "Option::is_none")]
23    pub node_id: Option<String>,
24    #[serde(skip_serializing_if = "Option::is_none")]
25    pub step_id: Option<String>,
26    #[serde(skip_serializing_if = "Option::is_none")]
27    pub node_kind: Option<String>,
28    #[serde(skip_serializing_if = "Option::is_none")]
29    pub streamable: Option<bool>,
30    #[serde(skip_serializing_if = "Option::is_none")]
31    pub message: Option<String>,
32    #[serde(skip_serializing_if = "Option::is_none")]
33    pub delta: Option<String>,
34    #[serde(skip_serializing_if = "Option::is_none")]
35    pub snapshot: Option<Value>,
36    #[serde(skip_serializing_if = "Option::is_none")]
37    pub token_kind: Option<YamlWorkflowTokenKind>,
38    #[serde(skip_serializing_if = "Option::is_none")]
39    pub is_terminal_node_token: Option<bool>,
40    #[serde(skip_serializing_if = "Option::is_none")]
41    pub elapsed_ms: Option<u128>,
42    #[serde(skip_serializing_if = "Option::is_none")]
43    pub metadata: Option<Value>,
44}
45
46pub type WorkflowMessageRole = Role;
47
48#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
49pub struct WorkflowMessage {
50    pub role: WorkflowMessageRole,
51    pub content: MessageContent,
52    #[serde(default)]
53    pub name: Option<String>,
54    #[serde(default, alias = "toolCallId")]
55    pub tool_call_id: Option<String>,
56}
57
58#[derive(Debug, Clone, PartialEq, Serialize)]
59pub struct YamlTemplateBinding {
60    pub index: usize,
61    pub expression: String,
62    pub source_path: String,
63    pub resolved: Value,
64    pub resolved_type: String,
65    pub missing: bool,
66}
67
68#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
69pub enum YamlWorkflowDiagnosticSeverity {
70    Error,
71    Warning,
72}
73
74#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
75pub struct YamlWorkflowDiagnostic {
76    pub node_id: Option<String>,
77    pub code: String,
78    pub severity: YamlWorkflowDiagnosticSeverity,
79    pub message: String,
80}
81
82#[derive(Debug, Error)]
83pub enum YamlWorkflowRunError {
84    #[error("failed to read workflow yaml '{path}': {source}")]
85    Read {
86        path: String,
87        source: std::io::Error,
88    },
89    #[error("failed to parse workflow yaml '{path}': {source}")]
90    Parse {
91        path: String,
92        source: serde_yaml::Error,
93    },
94    #[error("rejected workflow yaml '{path}': {reason}")]
95    FileRejected { path: String, reason: String },
96    #[error("workflow '{workflow_id}' has no nodes")]
97    EmptyNodes { workflow_id: String },
98    #[error("entry node '{entry_node}' does not exist")]
99    MissingEntry { entry_node: String },
100    #[error("unknown node id '{node_id}'")]
101    MissingNode { node_id: String },
102    #[error("unsupported node type in '{node_id}'")]
103    UnsupportedNodeType { node_id: String },
104    #[error("unsupported switch condition format: {condition}")]
105    UnsupportedCondition { condition: String },
106    #[error("switch node '{node_id}' has no valid next target")]
107    InvalidSwitchTarget { node_id: String },
108    #[error("llm returned non-object payload for node '{node_id}'")]
109    LlmPayloadNotObject { node_id: String },
110    #[error("custom worker handler '{handler}' is not supported")]
111    UnsupportedCustomHandler { handler: String },
112    #[error("llm execution failed for node '{node_id}': {message}")]
113    Llm { node_id: String, message: String },
114    #[error("custom worker execution failed for node '{node_id}': {message}")]
115    CustomWorker { node_id: String, message: String },
116    #[error("workflow validation failed with {diagnostics_count} error(s)")]
117    Validation {
118        diagnostics_count: usize,
119        diagnostics: Vec<YamlWorkflowDiagnostic>,
120    },
121    #[error("invalid workflow input: {message}")]
122    InvalidInput { message: String },
123    #[error("ir runtime execution failed: {message}")]
124    IrRuntime { message: String },
125    #[error("workflow event stream cancelled: {message}")]
126    EventSinkCancelled { message: String },
127}
128
129pub trait YamlWorkflowEventSink: Send + Sync {
130    fn emit(&self, event: &YamlWorkflowEvent);
131
132    fn is_cancelled(&self) -> bool {
133        false
134    }
135}
136
137/// Returns true for token delta events that should be gated by `workflow_streaming`.
138pub fn is_workflow_stream_delta_event(event_type: &str) -> bool {
139    matches!(
140        event_type,
141        "node_stream_delta"
142            | "node_stream_thinking_delta"
143            | "node_stream_output_delta"
144            | "node_stream_snapshot"
145    )
146}
147
148/// Forwards events to `inner` unless `workflow_streaming` is false and the event is a stream delta.
149pub struct YamlWorkflowStreamFilterSink<'a> {
150    inner: &'a dyn YamlWorkflowEventSink,
151    workflow_streaming: bool,
152}
153
154impl<'a> YamlWorkflowStreamFilterSink<'a> {
155    pub fn new(inner: &'a dyn YamlWorkflowEventSink, workflow_streaming: bool) -> Self {
156        Self {
157            inner,
158            workflow_streaming,
159        }
160    }
161}
162
163impl YamlWorkflowEventSink for YamlWorkflowStreamFilterSink<'_> {
164    fn emit(&self, event: &YamlWorkflowEvent) {
165        if !self.workflow_streaming && is_workflow_stream_delta_event(event.event_type.as_str()) {
166            return;
167        }
168        self.inner.emit(event);
169    }
170
171    fn is_cancelled(&self) -> bool {
172        self.inner.is_cancelled()
173    }
174}
175
176pub struct NoopYamlWorkflowEventSink;
177
178impl YamlWorkflowEventSink for NoopYamlWorkflowEventSink {
179    fn emit(&self, _event: &YamlWorkflowEvent) {}
180}
181
182pub(super) fn workflow_event_sink_cancelled_message() -> &'static str {
183    "workflow event callback cancelled"
184}
185
186pub(super) fn event_sink_is_cancelled(event_sink: Option<&dyn YamlWorkflowEventSink>) -> bool {
187    event_sink.map(|sink| sink.is_cancelled()).unwrap_or(false)
188}
189
190#[derive(Debug, Clone, PartialEq, Eq, Error)]
191pub enum YamlToIrError {
192    #[error("entry node '{entry_node}' does not exist")]
193    MissingEntry { entry_node: String },
194    #[error("node '{node_id}' has multiple outgoing edges in YAML; IR llm/tool nodes require one")]
195    MultipleOutgoingEdge { node_id: String },
196    #[error("node '{node_id}' is unsupported for IR conversion: {reason}")]
197    UnsupportedNode { node_id: String, reason: String },
198}
199
200#[derive(Debug, Clone)]
201pub struct YamlLlmExecutionRequest {
202    pub node_id: String,
203    pub is_terminal_node: bool,
204    pub stream_json_as_text: bool,
205    pub model: String,
206    pub max_tokens: Option<u32>,
207    pub temperature: Option<f32>,
208    pub top_p: Option<f32>,
209    pub messages: Option<Vec<super::Message>>,
210    pub append_prompt_as_user: bool,
211    pub prompt: String,
212    pub prompt_template: String,
213    pub prompt_bindings: Vec<YamlTemplateBinding>,
214    pub schema: Value,
215    pub stream: bool,
216    pub heal: bool,
217    pub send_schema: bool,
218    pub tools: Vec<YamlResolvedTool>,
219    pub tool_choice: Option<ToolChoice>,
220    pub max_tool_roundtrips: u8,
221    pub tool_calls_global_key: Option<String>,
222    pub tool_trace_mode: YamlToolTraceMode,
223    pub execution_context: Value,
224    pub trace_id: Option<String>,
225    pub trace_context: Option<TraceContext>,
226    pub tenant_context: YamlWorkflowTraceTenantContext,
227    pub trace_sampled: bool,
228    /// Split thinking/output stream events (mirrors [`YamlWorkflowExecutionFlags::split_stream_deltas`]).
229    pub split_stream_deltas: bool,
230}
231
232#[derive(Debug, Clone)]
233pub struct YamlResolvedTool {
234    pub definition: super::ToolDefinition,
235    pub output_schema: Option<Value>,
236}
237
238#[async_trait]
239pub trait YamlWorkflowLlmExecutor: Send + Sync {
240    async fn complete_structured(
241        &self,
242        request: YamlLlmExecutionRequest,
243        event_sink: Option<&dyn YamlWorkflowEventSink>,
244    ) -> Result<super::YamlLlmExecutionResult, String>;
245}
246
247#[async_trait]
248pub trait YamlWorkflowCustomWorkerExecutor: Send + Sync {
249    async fn execute(
250        &self,
251        handler: &str,
252        handler_file: Option<&str>,
253        payload: &Value,
254        context: &Value,
255    ) -> Result<Value, String>;
256}
257
258#[derive(Debug, Clone, Deserialize)]
259#[serde(deny_unknown_fields)]
260pub struct YamlGlobalUpdate {
261    pub op: String,
262    pub from: Option<String>,
263    pub by: Option<f64>,
264}
265
266#[derive(Debug, Clone, Deserialize)]
267#[serde(deny_unknown_fields)]
268pub struct YamlNodeConfig {
269    pub prompt: Option<String>,
270    #[serde(default, alias = "schema")]
271    pub output_schema: Option<Value>,
272    pub payload: Option<Value>,
273    pub set_globals: Option<HashMap<String, String>>,
274    pub update_globals: Option<HashMap<String, YamlGlobalUpdate>>,
275}
276
277#[derive(Debug, Clone, Deserialize)]
278#[serde(deny_unknown_fields)]
279pub struct YamlCustomWorker {
280    pub handler: String,
281    pub handler_file: Option<String>,
282}
283
284#[derive(Debug, Clone, Deserialize)]
285#[serde(deny_unknown_fields)]
286pub struct YamlSwitchBranch {
287    pub condition: String,
288    pub target: String,
289}
290
291#[derive(Debug, Clone, Deserialize)]
292#[serde(deny_unknown_fields)]
293pub struct YamlSwitch {
294    #[serde(default)]
295    pub branches: Vec<YamlSwitchBranch>,
296    pub default: String,
297}
298
299#[derive(Debug, Clone, Deserialize)]
300#[serde(untagged)]
301pub enum YamlToolChoiceConfig {
302    Mode(super::ToolChoiceMode),
303    Function(YamlToolChoiceFunction),
304    OpenAi(super::ToolChoiceTool),
305}
306
307#[derive(Debug, Clone, Deserialize)]
308#[serde(deny_unknown_fields)]
309pub struct YamlToolChoiceFunction {
310    pub function: String,
311}
312
313#[derive(Debug, Clone, Deserialize)]
314#[serde(deny_unknown_fields)]
315pub struct YamlSimplifiedToolDeclaration {
316    pub name: String,
317    pub description: Option<String>,
318    pub input_schema: Value,
319    pub output_schema: Option<Value>,
320}
321
322#[derive(Debug, Clone, Deserialize)]
323#[serde(deny_unknown_fields)]
324pub struct YamlOpenAiToolFunction {
325    pub name: String,
326    pub description: Option<String>,
327    pub parameters: Option<Value>,
328    pub output_schema: Option<Value>,
329}
330
331#[derive(Debug, Clone, Deserialize)]
332#[serde(deny_unknown_fields)]
333pub struct YamlOpenAiToolDeclaration {
334    #[serde(rename = "type")]
335    pub tool_type: Option<ToolType>,
336    pub function: YamlOpenAiToolFunction,
337}
338
339#[derive(Debug, Clone, Deserialize)]
340#[serde(untagged)]
341pub enum YamlToolDeclaration {
342    OpenAi(YamlOpenAiToolDeclaration),
343    Simplified(YamlSimplifiedToolDeclaration),
344}
345
346#[derive(Debug, Clone, Copy, PartialEq, Eq, Deserialize, Serialize, Default)]
347#[serde(rename_all = "snake_case")]
348pub enum YamlToolFormat {
349    #[default]
350    Openai,
351    Simplified,
352}
353
354#[derive(Debug, Clone, Deserialize)]
355#[serde(deny_unknown_fields)]
356pub struct YamlLlmCall {
357    pub model: String,
358    pub max_tokens: Option<u32>,
359    pub temperature: Option<f32>,
360    pub top_p: Option<f32>,
361    pub stream: Option<bool>,
362    pub stream_json_as_text: Option<bool>,
363    pub heal: Option<bool>,
364    pub send_schema: Option<bool>,
365    pub messages_path: Option<String>,
366    pub append_prompt_as_user: Option<bool>,
367    #[serde(default)]
368    pub tools_format: YamlToolFormat,
369    #[serde(default)]
370    pub tools: Vec<YamlToolDeclaration>,
371    pub tool_choice: Option<YamlToolChoiceConfig>,
372    pub max_tool_roundtrips: Option<u8>,
373    pub tool_calls_global_key: Option<String>,
374}
375
376#[derive(Debug, Clone, Deserialize)]
377#[serde(deny_unknown_fields)]
378pub struct YamlNodeType {
379    pub llm_call: Option<YamlLlmCall>,
380    pub switch: Option<YamlSwitch>,
381    pub custom_worker: Option<YamlCustomWorker>,
382    pub end: Option<Value>,
383}
384
385#[derive(Debug, Clone, Deserialize)]
386#[serde(deny_unknown_fields)]
387pub struct YamlNode {
388    pub id: String,
389    #[serde(default)]
390    pub name: Option<String>,
391    pub node_type: YamlNodeType,
392    pub config: Option<YamlNodeConfig>,
393}
394
395impl YamlNode {
396    pub(super) fn kind_name(&self) -> &'static str {
397        if self.node_type.llm_call.is_some() {
398            "llm_call"
399        } else if self.node_type.switch.is_some() {
400            "switch"
401        } else if self.node_type.custom_worker.is_some() {
402            "custom_worker"
403        } else if self.node_type.end.is_some() {
404            "end"
405        } else {
406            "unknown"
407        }
408    }
409}
410
411#[derive(Debug, Clone, Deserialize)]
412#[serde(deny_unknown_fields)]
413pub struct YamlEdge {
414    pub from: String,
415    pub to: String,
416}
417
418#[derive(Debug, Clone, Deserialize)]
419#[serde(deny_unknown_fields)]
420pub struct YamlWorkflow {
421    pub id: String,
422    #[serde(default)]
423    pub version: Option<String>,
424    #[serde(default)]
425    pub metadata: Option<HashMap<String, Value>>,
426    pub entry_node: String,
427    #[serde(default)]
428    pub nodes: Vec<YamlNode>,
429    #[serde(default)]
430    pub edges: Vec<YamlEdge>,
431}