1use serde_json::Value;
48use simple_agent_type::message::Message;
49use simple_agents_core::{CompletionOptions, CompletionOutcome, SimpleAgentsClient};
50
51use crate::yaml_runner::{
52 workflow_execution, RunMetadata, StepTiming, WorkflowEventSink, WorkflowRunOutput,
53 YamlWorkflowEventSink, YamlWorkflowExecutionFlags, YamlWorkflowExecutionRequest,
54 YamlWorkflowExecutorBinding, YamlWorkflowRunOptions, YamlWorkflowSource,
55};
56
57use simple_agent_type::prelude::SimpleAgentsError;
58
59#[derive(Debug)]
61pub enum WorkflowError {
62 Workflow(String),
64 Core(SimpleAgentsError),
66}
67
68impl std::fmt::Display for WorkflowError {
69 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
70 match self {
71 WorkflowError::Workflow(msg) => write!(f, "workflow error: {msg}"),
72 WorkflowError::Core(err) => write!(f, "core error: {err}"),
73 }
74 }
75}
76
77impl std::error::Error for WorkflowError {}
78
79impl From<SimpleAgentsError> for WorkflowError {
80 fn from(e: SimpleAgentsError) -> Self {
81 WorkflowError::Core(e)
82 }
83}
84
85#[derive(Clone, Default)]
91pub struct RunOptions {
92 pub workflow_options: YamlWorkflowRunOptions,
94 pub execution_flags: YamlWorkflowExecutionFlags,
96}
97
98pub struct WorkflowClient {
104 inner: SimpleAgentsClient,
105}
106
107impl WorkflowClient {
108 pub fn from_client(client: SimpleAgentsClient) -> Self {
110 Self { inner: client }
111 }
112
113 pub fn client(&self) -> &SimpleAgentsClient {
115 &self.inner
116 }
117
118 pub async fn run(
124 &self,
125 workflow_path: &str,
126 messages: Vec<Message>,
127 options: RunOptions,
128 ) -> Result<WorkflowRunOutput, WorkflowError> {
129 let input = messages_to_value(messages);
130 let request = YamlWorkflowExecutionRequest {
131 source: YamlWorkflowSource::File(std::path::Path::new(workflow_path)),
132 workflow_input: &input,
133 executor: YamlWorkflowExecutorBinding::Client(&self.inner),
134 custom_worker: None,
135 resume: None,
136 human_response: None,
137 options: &options.workflow_options,
138 flags: options.execution_flags,
139 };
140
141 let output = workflow_execution::run(request)
142 .await
143 .map_err(|e| WorkflowError::Workflow(e.to_string()))?;
144
145 Ok(yaml_output_to_workflow_output(output))
147 }
148
149 pub async fn stream(
153 &self,
154 workflow_path: &str,
155 messages: Vec<Message>,
156 sink: &dyn WorkflowEventSink,
157 options: RunOptions,
158 ) -> Result<WorkflowRunOutput, WorkflowError> {
159 let input = messages_to_value(messages);
160 let mut flags = options.execution_flags;
161 flags.workflow_streaming = true;
162
163 let request = YamlWorkflowExecutionRequest {
164 source: YamlWorkflowSource::File(std::path::Path::new(workflow_path)),
165 workflow_input: &input,
166 executor: YamlWorkflowExecutorBinding::Client(&self.inner),
167 custom_worker: None,
168 resume: None,
169 human_response: None,
170 options: &options.workflow_options,
171 flags,
172 };
173
174 let bridge = EventSinkBridge { inner: sink };
176 let output = workflow_execution::stream(request, &bridge)
177 .await
178 .map_err(|e| WorkflowError::Workflow(e.to_string()))?;
179
180 Ok(yaml_output_to_workflow_output(output))
181 }
182
183 pub async fn complete(
185 &self,
186 request: &simple_agent_type::request::CompletionRequest,
187 options: CompletionOptions,
188 ) -> Result<CompletionOutcome, SimpleAgentsError> {
189 self.inner.complete(request, options).await
190 }
191}
192
193fn messages_to_value(messages: Vec<Message>) -> Value {
200 let msgs: Vec<Value> = messages
201 .into_iter()
202 .map(|m| serde_json::to_value(&m).unwrap_or(Value::Null))
203 .collect();
204 serde_json::json!({ "messages": msgs })
205}
206
207struct EventSinkBridge<'a> {
210 inner: &'a dyn WorkflowEventSink,
211}
212
213impl YamlWorkflowEventSink for EventSinkBridge<'_> {
214 fn emit(&self, event: &crate::yaml_runner::YamlWorkflowEvent) {
215 use crate::yaml_runner::{TokenKind, WorkflowEvent};
217 let mapped = match event.event_type.as_str() {
218 "workflow_started" => Some(WorkflowEvent::WorkflowStarted {
219 workflow_id: event.node_id.clone().unwrap_or_default(),
220 }),
221 "node_started" => Some(WorkflowEvent::NodeStarted {
222 node_id: event.node_id.clone().unwrap_or_default(),
223 node_type: node_type_from_kind(event.node_kind.as_deref()),
224 }),
225 "node_completed" => Some(WorkflowEvent::NodeCompleted {
226 node_id: event.node_id.clone().unwrap_or_default(),
227 output: event
228 .snapshot
229 .clone()
230 .or_else(|| event.message.clone().map(Value::String))
231 .unwrap_or(Value::Null),
232 }),
233 "node_tool_call_requested" => {
234 tool_name_from_event(event).map(|tool_name| WorkflowEvent::ToolCallRequested {
235 node_id: event.node_id.clone().unwrap_or_default(),
236 tool_name,
237 arguments: event
238 .metadata
239 .as_ref()
240 .and_then(|metadata| metadata.get("arguments"))
241 .cloned()
242 .unwrap_or(Value::Null),
243 })
244 }
245 "node_tool_call_completed" => {
246 tool_name_from_event(event).map(|tool_name| WorkflowEvent::ToolCallCompleted {
247 node_id: event.node_id.clone().unwrap_or_default(),
248 tool_name,
249 output: event
250 .metadata
251 .as_ref()
252 .and_then(|metadata| metadata.get("output"))
253 .cloned()
254 .unwrap_or(Value::Null),
255 })
256 }
257 "human_input_requested" => Some(WorkflowEvent::HumanInputRequested {
258 node_id: event.node_id.clone().unwrap_or_default(),
259 request: event.metadata.clone().unwrap_or(Value::Null),
260 }),
261 "human_input_received" => Some(WorkflowEvent::HumanInputReceived {
262 node_id: event.node_id.clone().unwrap_or_default(),
263 response: event.metadata.clone().unwrap_or(Value::Null),
264 }),
265 "node_tool_call_failed" => Some(WorkflowEvent::NodeFailed {
266 node_id: event.node_id.clone().unwrap_or_default(),
267 error: event
268 .message
269 .clone()
270 .unwrap_or_else(|| "tool call failed without an error message".to_string()),
271 }),
272 "node_stream_delta" | "node_stream_output_delta" => {
273 Some(WorkflowEvent::LlmTokenDelta {
274 node_id: event.node_id.clone().unwrap_or_default(),
275 token: event.delta.clone().unwrap_or_default(),
276 token_kind: TokenKind::Output,
277 })
278 }
279 "node_stream_thinking_delta" => Some(WorkflowEvent::LlmTokenDelta {
280 node_id: event.node_id.clone().unwrap_or_default(),
281 token: event.delta.clone().unwrap_or_default(),
282 token_kind: TokenKind::Reasoning,
283 }),
284 "workflow_completed" => Some(WorkflowEvent::WorkflowCompleted {
285 output: event.snapshot.clone().unwrap_or(Value::Null),
286 metadata: event.metadata.clone(),
287 }),
288 _ => None,
289 };
290 if let Some(ev) = mapped {
291 self.inner.emit(&ev);
292 }
293 }
294}
295
296fn node_type_from_kind(kind: Option<&str>) -> crate::yaml_runner::NodeType {
297 use crate::yaml_runner::NodeType;
298
299 match kind {
300 Some("llm_call") => NodeType::LlmCall,
301 Some("switch") => NodeType::Switch,
302 Some("custom_worker") => NodeType::CustomWorker,
303 Some("human_input") => NodeType::HumanInput,
304 Some("end") => NodeType::End,
305 _ => NodeType::Unknown,
306 }
307}
308
309fn tool_name_from_event(event: &crate::yaml_runner::YamlWorkflowEvent) -> Option<String> {
310 event
311 .metadata
312 .as_ref()
313 .and_then(|metadata| metadata.get("tool_name"))
314 .and_then(Value::as_str)
315 .map(str::to_string)
316}
317
318fn yaml_output_to_workflow_output(
320 output: crate::yaml_runner::YamlWorkflowRunOutput,
321) -> WorkflowRunOutput {
322 let metadata = Some(RunMetadata {
323 total_elapsed_ms: output.total_elapsed_ms,
324 ttft_ms: output.ttft_ms,
325 total_input_tokens: output.total_input_tokens,
326 total_output_tokens: output.total_output_tokens,
327 total_tokens: output.total_tokens,
328 total_reasoning_tokens: output.total_reasoning_tokens,
329 tokens_per_second: output.tokens_per_second,
330 step_details: output
331 .step_timings
332 .iter()
333 .map(|step| StepTiming {
334 node_id: step.node_id.clone(),
335 node_type: step.node_kind.clone(),
336 model: step.model_name.clone(),
337 elapsed_ms: step.elapsed_ms,
338 input_tokens: step.prompt_tokens.map(u64::from),
339 output_tokens: step.completion_tokens.map(u64::from),
340 total_tokens: step.total_tokens.map(u64::from),
341 reasoning_tokens: step.reasoning_tokens.map(u64::from),
342 ttft_ms: None,
343 })
344 .collect(),
345 trace_id: output.trace_id.clone(),
346 });
347
348 WorkflowRunOutput {
349 workflow_id: output.workflow_id,
350 entry_node: output.entry_node,
351 trace: output.trace,
352 outputs: output.outputs,
353 globals: output.globals,
354 terminal_node: output.terminal_node,
355 terminal_output: output.terminal_output,
356 status: match output.status {
357 crate::yaml_runner::YamlWorkflowRunStatus::Completed => "completed".to_string(),
358 crate::yaml_runner::YamlWorkflowRunStatus::AwaitingHumanInput => {
359 "awaiting_human_input".to_string()
360 }
361 },
362 human_request: output
363 .human_request
364 .as_ref()
365 .and_then(|request| serde_json::to_value(request).ok()),
366 metadata,
367 events: None,
368 }
369}