1use serde_json::Value;
48use simple_agent_type::message::Message;
49use simple_agents_core::{CompletionOptions, CompletionOutcome, SimpleAgentsClient};
50
51use crate::yaml_runner::{
52 workflow_execution, RunMetadata, StepTiming, WorkflowEventSink, WorkflowRunOutput,
53 YamlWorkflowEventSink, YamlWorkflowExecutionFlags, YamlWorkflowExecutionRequest,
54 YamlWorkflowExecutorBinding, YamlWorkflowRunError, YamlWorkflowRunOptions, YamlWorkflowSource,
55};
56
57use simple_agent_type::prelude::SimpleAgentsError;
58
59#[derive(Debug)]
61pub enum WorkflowError {
62 Workflow(YamlWorkflowRunError),
64 Core(SimpleAgentsError),
66}
67
68impl std::fmt::Display for WorkflowError {
69 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
70 match self {
71 WorkflowError::Workflow(err) => write!(f, "workflow error: {err}"),
72 WorkflowError::Core(err) => write!(f, "core error: {err}"),
73 }
74 }
75}
76
77impl std::error::Error for WorkflowError {}
78
79impl From<SimpleAgentsError> for WorkflowError {
80 fn from(e: SimpleAgentsError) -> Self {
81 WorkflowError::Core(e)
82 }
83}
84
85impl From<YamlWorkflowRunError> for WorkflowError {
86 fn from(e: YamlWorkflowRunError) -> Self {
87 WorkflowError::Workflow(e)
88 }
89}
90
91#[derive(Clone, Default)]
97pub struct RunOptions {
98 pub workflow_options: YamlWorkflowRunOptions,
100 pub execution_flags: YamlWorkflowExecutionFlags,
102}
103
104pub struct WorkflowClient {
110 inner: SimpleAgentsClient,
111}
112
113impl WorkflowClient {
114 pub fn from_client(client: SimpleAgentsClient) -> Self {
116 Self { inner: client }
117 }
118
119 pub fn client(&self) -> &SimpleAgentsClient {
121 &self.inner
122 }
123
124 pub async fn run(
130 &self,
131 workflow_path: &str,
132 messages: Vec<Message>,
133 options: RunOptions,
134 ) -> Result<WorkflowRunOutput, WorkflowError> {
135 let input = messages_to_value(messages);
136 let request = YamlWorkflowExecutionRequest {
137 source: YamlWorkflowSource::File(std::path::Path::new(workflow_path)),
138 workflow_input: &input,
139 executor: YamlWorkflowExecutorBinding::Client(&self.inner),
140 custom_worker: None,
141 resume: None,
142 human_response: None,
143 options: &options.workflow_options,
144 flags: options.execution_flags,
145 };
146
147 let output = workflow_execution::run(request).await?;
148
149 Ok(yaml_output_to_workflow_output(output))
151 }
152
153 pub async fn stream(
157 &self,
158 workflow_path: &str,
159 messages: Vec<Message>,
160 sink: &dyn WorkflowEventSink,
161 options: RunOptions,
162 ) -> Result<WorkflowRunOutput, WorkflowError> {
163 let input = messages_to_value(messages);
164 let mut flags = options.execution_flags;
165 flags.workflow_streaming = true;
166
167 let request = YamlWorkflowExecutionRequest {
168 source: YamlWorkflowSource::File(std::path::Path::new(workflow_path)),
169 workflow_input: &input,
170 executor: YamlWorkflowExecutorBinding::Client(&self.inner),
171 custom_worker: None,
172 resume: None,
173 human_response: None,
174 options: &options.workflow_options,
175 flags,
176 };
177
178 let bridge = EventSinkBridge { inner: sink };
180 let output = workflow_execution::stream(request, &bridge).await?;
181
182 Ok(yaml_output_to_workflow_output(output))
183 }
184
185 pub async fn complete(
187 &self,
188 request: &simple_agent_type::request::CompletionRequest,
189 options: CompletionOptions,
190 ) -> Result<CompletionOutcome, SimpleAgentsError> {
191 self.inner.complete(request, options).await
192 }
193}
194
195fn messages_to_value(messages: Vec<Message>) -> Value {
202 let msgs: Vec<Value> = messages
203 .into_iter()
204 .map(|m| match serde_json::to_value(&m) {
205 Ok(v) => v,
206 Err(err) => {
207 eprintln!("[simple-agents-workflow] WARN: failed to serialize message: {err}");
208 Value::Null
209 }
210 })
211 .collect();
212 serde_json::json!({ "messages": msgs })
213}
214
215struct EventSinkBridge<'a> {
218 inner: &'a dyn WorkflowEventSink,
219}
220
221impl YamlWorkflowEventSink for EventSinkBridge<'_> {
222 fn emit(&self, event: &crate::yaml_runner::YamlWorkflowEvent) {
223 use crate::yaml_runner::{TokenKind, WorkflowEvent};
225 let mapped = match event.event_type.as_str() {
226 "workflow_started" => Some(WorkflowEvent::WorkflowStarted {
227 workflow_id: event.node_id.clone().unwrap_or_default(),
228 }),
229 "node_started" => Some(WorkflowEvent::NodeStarted {
230 node_id: event.node_id.clone().unwrap_or_default(),
231 node_type: node_type_from_kind(event.node_kind.as_deref()),
232 }),
233 "node_completed" => Some(WorkflowEvent::NodeCompleted {
234 node_id: event.node_id.clone().unwrap_or_default(),
235 output: event
236 .snapshot
237 .clone()
238 .or_else(|| event.message.clone().map(Value::String))
239 .unwrap_or(Value::Null),
240 }),
241 "node_tool_call_requested" => {
242 tool_name_from_event(event).map(|tool_name| WorkflowEvent::ToolCallRequested {
243 node_id: event.node_id.clone().unwrap_or_default(),
244 tool_name,
245 arguments: event
246 .metadata
247 .as_ref()
248 .and_then(|metadata| metadata.get("arguments"))
249 .cloned()
250 .unwrap_or(Value::Null),
251 })
252 }
253 "node_tool_call_completed" => {
254 tool_name_from_event(event).map(|tool_name| WorkflowEvent::ToolCallCompleted {
255 node_id: event.node_id.clone().unwrap_or_default(),
256 tool_name,
257 output: event
258 .metadata
259 .as_ref()
260 .and_then(|metadata| metadata.get("output"))
261 .cloned()
262 .unwrap_or(Value::Null),
263 })
264 }
265 "human_input_requested" => Some(WorkflowEvent::HumanInputRequested {
266 node_id: event.node_id.clone().unwrap_or_default(),
267 request: event.metadata.clone().unwrap_or(Value::Null),
268 }),
269 "human_input_received" => Some(WorkflowEvent::HumanInputReceived {
270 node_id: event.node_id.clone().unwrap_or_default(),
271 response: event.metadata.clone().unwrap_or(Value::Null),
272 }),
273 "node_tool_call_failed" => Some(WorkflowEvent::NodeFailed {
274 node_id: event.node_id.clone().unwrap_or_default(),
275 error: event
276 .message
277 .clone()
278 .unwrap_or_else(|| "tool call failed without an error message".to_string()),
279 }),
280 "node_stream_delta" | "node_stream_output_delta" => {
281 Some(WorkflowEvent::LlmTokenDelta {
282 node_id: event.node_id.clone().unwrap_or_default(),
283 token: event.delta.clone().unwrap_or_default(),
284 token_kind: TokenKind::Output,
285 })
286 }
287 "node_stream_thinking_delta" => Some(WorkflowEvent::LlmTokenDelta {
288 node_id: event.node_id.clone().unwrap_or_default(),
289 token: event.delta.clone().unwrap_or_default(),
290 token_kind: TokenKind::Reasoning,
291 }),
292 "workflow_completed" => Some(WorkflowEvent::WorkflowCompleted {
293 output: event.snapshot.clone().unwrap_or(Value::Null),
294 metadata: event.metadata.clone(),
295 }),
296 _ => None,
297 };
298 if let Some(ev) = mapped {
299 self.inner.emit(&ev);
300 }
301 }
302}
303
304fn node_type_from_kind(kind: Option<&str>) -> crate::yaml_runner::NodeType {
305 use crate::yaml_runner::NodeType;
306
307 match kind {
308 Some("llm_call") => NodeType::LlmCall,
309 Some("switch") => NodeType::Switch,
310 Some("custom_worker") => NodeType::CustomWorker,
311 Some("human_input") => NodeType::HumanInput,
312 Some("end") => NodeType::End,
313 _ => NodeType::Unknown,
314 }
315}
316
317fn tool_name_from_event(event: &crate::yaml_runner::YamlWorkflowEvent) -> Option<String> {
318 event
319 .metadata
320 .as_ref()
321 .and_then(|metadata| metadata.get("tool_name"))
322 .and_then(Value::as_str)
323 .map(str::to_string)
324}
325
326fn yaml_output_to_workflow_output(
328 output: crate::yaml_runner::YamlWorkflowRunOutput,
329) -> WorkflowRunOutput {
330 let metadata = Some(RunMetadata {
331 total_elapsed_ms: output.total_elapsed_ms,
332 ttft_ms: output.ttft_ms,
333 total_input_tokens: output.total_input_tokens,
334 total_output_tokens: output.total_output_tokens,
335 total_tokens: output.total_tokens,
336 total_reasoning_tokens: output.total_reasoning_tokens,
337 tokens_per_second: output.tokens_per_second,
338 step_details: output
339 .step_timings
340 .iter()
341 .map(|step| StepTiming {
342 node_id: step.node_id.clone(),
343 node_type: step.node_kind.clone(),
344 model: step.model_name.clone(),
345 elapsed_ms: step.elapsed_ms,
346 input_tokens: step.prompt_tokens.map(u64::from),
347 output_tokens: step.completion_tokens.map(u64::from),
348 total_tokens: step.total_tokens.map(u64::from),
349 reasoning_tokens: step.reasoning_tokens.map(u64::from),
350 ttft_ms: None,
351 })
352 .collect(),
353 trace_id: output.trace_id.clone(),
354 });
355
356 WorkflowRunOutput {
357 workflow_id: output.workflow_id,
358 entry_node: output.entry_node,
359 trace: output.trace,
360 outputs: output.outputs,
361 globals: output.globals,
362 terminal_node: output.terminal_node,
363 terminal_output: output.terminal_output,
364 status: match output.status {
365 crate::yaml_runner::YamlWorkflowRunStatus::Completed => "completed".to_string(),
366 crate::yaml_runner::YamlWorkflowRunStatus::AwaitingHumanInput => {
367 "awaiting_human_input".to_string()
368 }
369 },
370 human_request: output
371 .human_request
372 .as_ref()
373 .and_then(|request| serde_json::to_value(request).ok()),
374 metadata,
375 events: None,
376 }
377}