1use std::collections::HashMap;
2
3use async_trait::async_trait;
4use serde::{ser::SerializeStruct, Deserialize, Serialize};
5use serde_json::Value;
6use simple_agent_type::message::{MessageContent, Role};
7use simple_agent_type::tool::{ToolChoice, ToolType};
8use thiserror::Error;
9
10use super::{TraceContext, YamlToolTraceMode, YamlWorkflowTraceTenantContext};
11
12#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
13#[serde(rename_all = "snake_case")]
14pub enum YamlWorkflowTokenKind {
15 Output,
16 Thinking,
17}
18
19#[derive(Debug, Clone, PartialEq, Serialize)]
20pub struct YamlWorkflowEvent {
21 pub event_type: String,
22 #[serde(skip_serializing_if = "Option::is_none")]
23 pub node_id: Option<String>,
24 #[serde(skip_serializing_if = "Option::is_none")]
25 pub step_id: Option<String>,
26 #[serde(skip_serializing_if = "Option::is_none")]
27 pub node_kind: Option<String>,
28 #[serde(skip_serializing_if = "Option::is_none")]
29 pub streamable: Option<bool>,
30 #[serde(skip_serializing_if = "Option::is_none")]
31 pub message: Option<String>,
32 #[serde(skip_serializing_if = "Option::is_none")]
33 pub delta: Option<String>,
34 #[serde(skip_serializing_if = "Option::is_none")]
35 pub snapshot: Option<Value>,
36 #[serde(skip_serializing_if = "Option::is_none")]
37 pub token_kind: Option<YamlWorkflowTokenKind>,
38 #[serde(skip_serializing_if = "Option::is_none")]
39 pub is_terminal_node_token: Option<bool>,
40 #[serde(skip_serializing_if = "Option::is_none")]
41 pub elapsed_ms: Option<u128>,
42 #[serde(skip_serializing_if = "Option::is_none")]
43 pub metadata: Option<Value>,
44}
45
46pub type WorkflowMessageRole = Role;
47
48#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
49pub struct WorkflowMessage {
50 pub role: WorkflowMessageRole,
51 pub content: MessageContent,
52 #[serde(default)]
53 pub name: Option<String>,
54 #[serde(default, alias = "toolCallId")]
55 pub tool_call_id: Option<String>,
56}
57
58#[derive(Debug, Clone, PartialEq, Serialize)]
59pub struct YamlTemplateBinding {
60 pub index: usize,
61 pub expression: String,
62 pub source_path: String,
63 pub resolved: Value,
64 pub resolved_type: String,
65 pub missing: bool,
66}
67
68#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
69pub enum YamlWorkflowDiagnosticSeverity {
70 Error,
71 Warning,
72}
73
74#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
75pub struct YamlWorkflowDiagnostic {
76 pub node_id: Option<String>,
77 pub code: String,
78 pub severity: YamlWorkflowDiagnosticSeverity,
79 pub message: String,
80}
81
82#[derive(Debug, Error)]
83pub enum YamlWorkflowRunError {
84 #[error("failed to read workflow yaml '{path}': {source}")]
85 Read {
86 path: String,
87 source: std::io::Error,
88 },
89 #[error("failed to parse workflow yaml '{path}': {source}")]
90 Parse {
91 path: String,
92 source: serde_yaml::Error,
93 },
94 #[error("rejected workflow yaml '{path}': {reason}")]
95 FileRejected { path: String, reason: String },
96 #[error("workflow '{workflow_id}' has no nodes")]
97 EmptyNodes { workflow_id: String },
98 #[error("entry node '{entry_node}' does not exist")]
99 MissingEntry { entry_node: String },
100 #[error("unknown node id '{node_id}'")]
101 MissingNode { node_id: String },
102 #[error("unsupported node type in '{node_id}'")]
103 UnsupportedNodeType { node_id: String },
104 #[error("unsupported switch condition format: {condition}")]
105 UnsupportedCondition { condition: String },
106 #[error("switch node '{node_id}' has no valid next target")]
107 InvalidSwitchTarget { node_id: String },
108 #[error("llm returned non-object payload for node '{node_id}'")]
109 LlmPayloadNotObject { node_id: String },
110 #[error("custom worker handler '{handler}' is not supported")]
111 UnsupportedCustomHandler { handler: String },
112 #[error("llm execution failed for node '{node_id}': {message}")]
113 Llm { node_id: String, message: String },
114 #[error("custom worker execution failed for node '{node_id}': {message}")]
115 CustomWorker { node_id: String, message: String },
116 #[error("workflow validation failed with {diagnostics_count} error(s)")]
117 Validation {
118 diagnostics_count: usize,
119 diagnostics: Vec<YamlWorkflowDiagnostic>,
120 },
121 #[error("invalid workflow input: {message}")]
122 InvalidInput { message: String },
123 #[error("ir runtime execution failed: {message}")]
124 IrRuntime { message: String },
125 #[error("workflow event stream cancelled: {message}")]
126 EventSinkCancelled { message: String },
127}
128
129impl Serialize for YamlWorkflowRunError {
130 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
131 where
132 S: serde::Serializer,
133 {
134 let mut state = serializer.serialize_struct("YamlWorkflowRunError", 2)?;
135 state.serialize_field("code", yaml_workflow_run_error_code(self))?;
136 state.serialize_field("message", &self.to_string())?;
137 state.end()
138 }
139}
140
141fn yaml_workflow_run_error_code(error: &YamlWorkflowRunError) -> &'static str {
142 match error {
143 YamlWorkflowRunError::Read { .. } => "read_failed",
144 YamlWorkflowRunError::Parse { .. } => "parse_failed",
145 YamlWorkflowRunError::FileRejected { .. } => "file_rejected",
146 YamlWorkflowRunError::EmptyNodes { .. } => "empty_nodes",
147 YamlWorkflowRunError::MissingEntry { .. } => "missing_entry",
148 YamlWorkflowRunError::MissingNode { .. } => "missing_node",
149 YamlWorkflowRunError::UnsupportedNodeType { .. } => "unsupported_node_type",
150 YamlWorkflowRunError::UnsupportedCondition { .. } => "unsupported_condition",
151 YamlWorkflowRunError::InvalidSwitchTarget { .. } => "invalid_switch_target",
152 YamlWorkflowRunError::LlmPayloadNotObject { .. } => "llm_payload_not_object",
153 YamlWorkflowRunError::UnsupportedCustomHandler { .. } => "unsupported_custom_handler",
154 YamlWorkflowRunError::Llm { .. } => "llm_failed",
155 YamlWorkflowRunError::CustomWorker { .. } => "custom_worker_failed",
156 YamlWorkflowRunError::Validation { .. } => "validation_failed",
157 YamlWorkflowRunError::InvalidInput { .. } => "invalid_input",
158 YamlWorkflowRunError::IrRuntime { .. } => "ir_runtime_failed",
159 YamlWorkflowRunError::EventSinkCancelled { .. } => "event_sink_cancelled",
160 }
161}
162
163pub trait YamlWorkflowEventSink: Send + Sync {
164 fn emit(&self, event: &YamlWorkflowEvent);
165
166 fn is_cancelled(&self) -> bool {
167 false
168 }
169}
170
171pub fn is_workflow_stream_delta_event(event_type: &str) -> bool {
173 matches!(
174 event_type,
175 "node_stream_delta"
176 | "node_stream_thinking_delta"
177 | "node_stream_output_delta"
178 | "node_stream_snapshot"
179 )
180}
181
182pub struct YamlWorkflowStreamFilterSink<'a> {
184 inner: &'a dyn YamlWorkflowEventSink,
185 workflow_streaming: bool,
186}
187
188impl<'a> YamlWorkflowStreamFilterSink<'a> {
189 pub fn new(inner: &'a dyn YamlWorkflowEventSink, workflow_streaming: bool) -> Self {
190 Self {
191 inner,
192 workflow_streaming,
193 }
194 }
195}
196
197impl YamlWorkflowEventSink for YamlWorkflowStreamFilterSink<'_> {
198 fn emit(&self, event: &YamlWorkflowEvent) {
199 if !self.workflow_streaming && is_workflow_stream_delta_event(event.event_type.as_str()) {
200 return;
201 }
202 self.inner.emit(event);
203 }
204
205 fn is_cancelled(&self) -> bool {
206 self.inner.is_cancelled()
207 }
208}
209
210pub struct NoopYamlWorkflowEventSink;
211
212impl YamlWorkflowEventSink for NoopYamlWorkflowEventSink {
213 fn emit(&self, _event: &YamlWorkflowEvent) {}
214}
215
216pub(super) fn workflow_event_sink_cancelled_message() -> &'static str {
217 "workflow event callback cancelled"
218}
219
220pub(super) fn event_sink_is_cancelled(event_sink: Option<&dyn YamlWorkflowEventSink>) -> bool {
221 event_sink.map(|sink| sink.is_cancelled()).unwrap_or(false)
222}
223
224#[derive(Debug, Clone, PartialEq, Eq, Error)]
225pub enum YamlToIrError {
226 #[error("entry node '{entry_node}' does not exist")]
227 MissingEntry { entry_node: String },
228 #[error("node '{node_id}' has multiple outgoing edges in YAML; IR llm/tool nodes require one")]
229 MultipleOutgoingEdge { node_id: String },
230 #[error("node '{node_id}' is unsupported for IR conversion: {reason}")]
231 UnsupportedNode { node_id: String, reason: String },
232}
233
234#[derive(Debug, Clone)]
235pub struct YamlLlmExecutionRequest {
236 pub node_id: String,
237 pub is_terminal_node: bool,
238 pub stream_json_as_text: bool,
239 pub model: String,
240 pub max_tokens: Option<u32>,
241 pub temperature: Option<f32>,
242 pub top_p: Option<f32>,
243 pub messages: Option<Vec<super::Message>>,
244 pub append_prompt_as_user: bool,
245 pub prompt: String,
246 pub prompt_template: String,
247 pub prompt_bindings: Vec<YamlTemplateBinding>,
248 pub schema: Value,
249 pub stream: bool,
250 pub heal: bool,
251 pub send_schema: bool,
252 pub tools: Vec<YamlResolvedTool>,
253 pub tool_choice: Option<ToolChoice>,
254 pub max_tool_roundtrips: u8,
255 pub tool_calls_global_key: Option<String>,
256 pub tool_trace_mode: YamlToolTraceMode,
257 pub execution_context: Value,
258 pub trace_id: Option<String>,
259 pub trace_context: Option<TraceContext>,
260 pub tenant_context: YamlWorkflowTraceTenantContext,
261 pub trace_sampled: bool,
262 pub split_stream_deltas: bool,
264 pub debug_stream_parse: bool,
266}
267
268#[derive(Debug, Clone)]
269pub struct YamlResolvedTool {
270 pub definition: super::ToolDefinition,
271 pub output_schema: Option<Value>,
272}
273
274#[async_trait]
275pub trait YamlWorkflowLlmExecutor: Send + Sync {
276 async fn complete_structured(
277 &self,
278 request: YamlLlmExecutionRequest,
279 event_sink: Option<&dyn YamlWorkflowEventSink>,
280 ) -> Result<super::YamlLlmExecutionResult, String>;
281}
282
283#[async_trait]
284pub trait YamlWorkflowCustomWorkerExecutor: Send + Sync {
285 async fn execute(
286 &self,
287 handler: &str,
288 handler_file: Option<&str>,
289 payload: &Value,
290 context: &Value,
291 ) -> Result<Value, String>;
292}
293
294#[derive(Debug, Clone, Copy, PartialEq, Eq, Deserialize, Serialize)]
295#[serde(rename_all = "snake_case")]
296pub enum YamlGlobalUpdateOp {
297 Set,
298 Append,
299 Increment,
300 Merge,
301}
302
303impl YamlGlobalUpdateOp {
304 pub fn as_str(self) -> &'static str {
305 match self {
306 YamlGlobalUpdateOp::Set => "set",
307 YamlGlobalUpdateOp::Append => "append",
308 YamlGlobalUpdateOp::Increment => "increment",
309 YamlGlobalUpdateOp::Merge => "merge",
310 }
311 }
312}
313
314#[derive(Debug, Clone, Deserialize)]
315#[serde(deny_unknown_fields)]
316pub struct YamlGlobalUpdate {
317 pub op: YamlGlobalUpdateOp,
318 pub from: Option<String>,
319 pub by: Option<f64>,
320}
321
322#[derive(Debug, Clone, Deserialize)]
323#[serde(deny_unknown_fields)]
324pub struct YamlNodeConfig {
325 pub prompt: Option<String>,
326 #[serde(default, alias = "schema")]
327 pub output_schema: Option<Value>,
328 pub payload: Option<Value>,
329 pub set_globals: Option<HashMap<String, String>>,
330 pub update_globals: Option<HashMap<String, YamlGlobalUpdate>>,
331}
332
333#[derive(Debug, Clone, Deserialize)]
334#[serde(deny_unknown_fields)]
335pub struct YamlCustomWorker {
336 pub handler: String,
337 pub handler_file: Option<String>,
338}
339
340#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
341#[serde(rename_all = "snake_case")]
342pub enum YamlHumanInputType {
343 Choice,
344 Text,
345 Form,
346}
347
348#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
349#[serde(deny_unknown_fields)]
350pub struct YamlHumanInputOption {
351 pub value: String,
352 #[serde(default)]
353 pub label: Option<String>,
354}
355
356#[derive(Debug, Clone, Deserialize)]
357#[serde(deny_unknown_fields)]
358pub struct YamlHumanInput {
359 pub input_type: YamlHumanInputType,
360 pub prompt: Option<String>,
361 pub options: Option<Vec<YamlHumanInputOption>>,
362 pub form_schema: Option<Value>,
363 pub form_prefill: Option<String>,
364 pub timeout_seconds: Option<u64>,
365}
366
367#[derive(Debug, Clone, Deserialize)]
368#[serde(deny_unknown_fields)]
369pub struct YamlSwitchBranch {
370 pub condition: String,
371 pub target: String,
372}
373
374#[derive(Debug, Clone, Deserialize)]
375#[serde(deny_unknown_fields)]
376pub struct YamlSwitch {
377 #[serde(default)]
378 pub branches: Vec<YamlSwitchBranch>,
379 pub default: String,
380}
381
382#[derive(Debug, Clone, Deserialize)]
383#[serde(untagged)]
384pub enum YamlToolChoiceConfig {
385 Mode(super::ToolChoiceMode),
386 Function(YamlToolChoiceFunction),
387 OpenAi(super::ToolChoiceTool),
388}
389
390#[derive(Debug, Clone, Deserialize)]
391#[serde(deny_unknown_fields)]
392pub struct YamlToolChoiceFunction {
393 pub function: String,
394}
395
396#[derive(Debug, Clone, Deserialize)]
397#[serde(deny_unknown_fields)]
398pub struct YamlSimplifiedToolDeclaration {
399 pub name: String,
400 pub description: Option<String>,
401 pub input_schema: Value,
402 pub output_schema: Option<Value>,
403}
404
405#[derive(Debug, Clone, Deserialize)]
406#[serde(deny_unknown_fields)]
407pub struct YamlOpenAiToolFunction {
408 pub name: String,
409 pub description: Option<String>,
410 pub parameters: Option<Value>,
411 pub output_schema: Option<Value>,
412}
413
414#[derive(Debug, Clone, Deserialize)]
415#[serde(deny_unknown_fields)]
416pub struct YamlOpenAiToolDeclaration {
417 #[serde(rename = "type")]
418 pub tool_type: Option<ToolType>,
419 pub function: YamlOpenAiToolFunction,
420}
421
422#[derive(Debug, Clone, Deserialize)]
423#[serde(untagged)]
424pub enum YamlToolDeclaration {
425 OpenAi(YamlOpenAiToolDeclaration),
426 Simplified(YamlSimplifiedToolDeclaration),
427}
428
429#[derive(Debug, Clone, Copy, PartialEq, Eq, Deserialize, Serialize, Default)]
430#[serde(rename_all = "snake_case")]
431pub enum YamlToolFormat {
432 #[default]
433 Openai,
434 Simplified,
435}
436
437#[derive(Debug, Clone, Deserialize)]
438#[serde(deny_unknown_fields)]
439pub struct YamlLlmCall {
440 pub model: String,
441 pub max_tokens: Option<u32>,
442 pub temperature: Option<f32>,
443 pub top_p: Option<f32>,
444 pub stream: Option<bool>,
445 pub stream_json_as_text: Option<bool>,
446 pub heal: Option<bool>,
447 pub send_schema: Option<bool>,
448 pub messages_path: Option<String>,
449 pub append_prompt_as_user: Option<bool>,
450 #[serde(default)]
451 pub tools_format: YamlToolFormat,
452 #[serde(default)]
453 pub tools: Vec<YamlToolDeclaration>,
454 pub tool_choice: Option<YamlToolChoiceConfig>,
455 pub max_tool_roundtrips: Option<u8>,
456 pub tool_calls_global_key: Option<String>,
457}
458
459#[derive(Debug, Clone, Deserialize)]
460#[serde(deny_unknown_fields)]
461pub struct YamlNodeType {
462 pub llm_call: Option<YamlLlmCall>,
463 pub switch: Option<YamlSwitch>,
464 pub custom_worker: Option<YamlCustomWorker>,
465 pub human_input: Option<YamlHumanInput>,
466 pub end: Option<Value>,
467}
468
469#[derive(Debug, Clone, Deserialize)]
470#[serde(deny_unknown_fields)]
471pub struct YamlNode {
472 pub id: String,
473 #[serde(default)]
474 pub name: Option<String>,
475 pub node_type: YamlNodeType,
476 pub config: Option<YamlNodeConfig>,
477}
478
479impl YamlNode {
480 pub(super) fn kind_name(&self) -> &'static str {
481 if self.node_type.llm_call.is_some() {
482 "llm_call"
483 } else if self.node_type.switch.is_some() {
484 "switch"
485 } else if self.node_type.custom_worker.is_some() {
486 "custom_worker"
487 } else if self.node_type.human_input.is_some() {
488 "human_input"
489 } else if self.node_type.end.is_some() {
490 "end"
491 } else {
492 "unknown"
493 }
494 }
495}
496
497#[derive(Debug, Clone, Deserialize)]
498#[serde(deny_unknown_fields)]
499pub struct YamlEdge {
500 pub from: String,
501 pub to: String,
502}
503
504#[derive(Debug, Clone, Deserialize)]
505#[serde(deny_unknown_fields)]
506pub struct YamlWorkflow {
507 pub id: String,
508 #[serde(default)]
509 pub version: Option<String>,
510 #[serde(default)]
511 pub metadata: Option<HashMap<String, Value>>,
512 pub entry_node: String,
513 #[serde(default)]
514 pub nodes: Vec<YamlNode>,
515 #[serde(default)]
516 pub edges: Vec<YamlEdge>,
517}