1use serde_json::Value;
48use simple_agent_type::message::Message;
49use simple_agents_core::{CompletionOptions, CompletionOutcome, SimpleAgentsClient};
50
51use crate::yaml_runner::{
52 workflow_execution, RunMetadata, StepTiming, WorkflowCheckpoint, WorkflowEventSink,
53 WorkflowRunOutput, YamlWorkflowEventSink, YamlWorkflowExecutionFlags,
54 YamlWorkflowExecutionRequest, YamlWorkflowExecutorBinding, YamlWorkflowRunOptions,
55 YamlWorkflowSource,
56};
57
58use simple_agent_type::prelude::SimpleAgentsError;
59
60#[derive(Debug)]
62pub enum WorkflowError {
63 Workflow(String),
65 Core(SimpleAgentsError),
67}
68
69impl std::fmt::Display for WorkflowError {
70 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
71 match self {
72 WorkflowError::Workflow(msg) => write!(f, "workflow error: {msg}"),
73 WorkflowError::Core(err) => write!(f, "core error: {err}"),
74 }
75 }
76}
77
78impl std::error::Error for WorkflowError {}
79
80impl From<SimpleAgentsError> for WorkflowError {
81 fn from(e: SimpleAgentsError) -> Self {
82 WorkflowError::Core(e)
83 }
84}
85
86#[derive(Clone, Default)]
92pub struct RunOptions {
93 pub workflow_options: YamlWorkflowRunOptions,
95 pub execution_flags: YamlWorkflowExecutionFlags,
97}
98
99pub struct WorkflowClient {
105 inner: SimpleAgentsClient,
106}
107
108impl WorkflowClient {
109 pub fn from_client(client: SimpleAgentsClient) -> Self {
111 Self { inner: client }
112 }
113
114 pub fn client(&self) -> &SimpleAgentsClient {
116 &self.inner
117 }
118
119 pub async fn run(
125 &self,
126 workflow_path: &str,
127 messages: Vec<Message>,
128 options: RunOptions,
129 ) -> Result<WorkflowRunOutput, WorkflowError> {
130 let input = messages_to_value(messages);
131 let request = YamlWorkflowExecutionRequest {
132 source: YamlWorkflowSource::File(std::path::Path::new(workflow_path)),
133 workflow_input: &input,
134 executor: YamlWorkflowExecutorBinding::Client(&self.inner),
135 custom_worker: None,
136 options: &options.workflow_options,
137 flags: options.execution_flags,
138 };
139
140 let output = workflow_execution::run(request)
141 .await
142 .map_err(|e| WorkflowError::Workflow(e.to_string()))?;
143
144 Ok(yaml_output_to_workflow_output(output))
146 }
147
148 pub async fn stream(
152 &self,
153 workflow_path: &str,
154 messages: Vec<Message>,
155 sink: &dyn WorkflowEventSink,
156 options: RunOptions,
157 ) -> Result<WorkflowRunOutput, WorkflowError> {
158 let input = messages_to_value(messages);
159 let mut flags = options.execution_flags;
160 flags.workflow_streaming = true;
161
162 let request = YamlWorkflowExecutionRequest {
163 source: YamlWorkflowSource::File(std::path::Path::new(workflow_path)),
164 workflow_input: &input,
165 executor: YamlWorkflowExecutorBinding::Client(&self.inner),
166 custom_worker: None,
167 options: &options.workflow_options,
168 flags,
169 };
170
171 let bridge = EventSinkBridge { inner: sink };
173 let output = workflow_execution::stream(request, &bridge)
174 .await
175 .map_err(|e| WorkflowError::Workflow(e.to_string()))?;
176
177 Ok(yaml_output_to_workflow_output(output))
178 }
179
180 pub async fn resume(
182 &self,
183 checkpoint: WorkflowCheckpoint,
184 options: RunOptions,
185 ) -> Result<WorkflowRunOutput, WorkflowError> {
186 let messages = checkpoint.original_messages.clone();
188 self.run(&checkpoint.workflow_path, messages, options).await
189 }
190
191 pub async fn complete(
193 &self,
194 request: &simple_agent_type::request::CompletionRequest,
195 options: CompletionOptions,
196 ) -> Result<CompletionOutcome, SimpleAgentsError> {
197 self.inner.complete(request, options).await
198 }
199}
200
201fn messages_to_value(messages: Vec<Message>) -> Value {
208 let msgs: Vec<Value> = messages
209 .into_iter()
210 .map(|m| serde_json::to_value(&m).unwrap_or(Value::Null))
211 .collect();
212 serde_json::json!({ "messages": msgs })
213}
214
215struct EventSinkBridge<'a> {
218 inner: &'a dyn WorkflowEventSink,
219}
220
221impl YamlWorkflowEventSink for EventSinkBridge<'_> {
222 fn emit(&self, event: &crate::yaml_runner::YamlWorkflowEvent) {
223 use crate::yaml_runner::{TokenKind, WorkflowEvent};
225 let mapped = match event.event_type.as_str() {
226 "workflow_started" => Some(WorkflowEvent::WorkflowStarted {
227 workflow_id: event.node_id.clone().unwrap_or_default(),
228 }),
229 "node_started" => Some(WorkflowEvent::NodeStarted {
230 node_id: event.node_id.clone().unwrap_or_default(),
231 node_type: node_type_from_kind(event.node_kind.as_deref()),
232 }),
233 "node_completed" => Some(WorkflowEvent::NodeCompleted {
234 node_id: event.node_id.clone().unwrap_or_default(),
235 output: event
236 .snapshot
237 .clone()
238 .or_else(|| event.message.clone().map(Value::String))
239 .unwrap_or(Value::Null),
240 }),
241 "node_tool_call_requested" => {
242 tool_name_from_event(event).map(|tool_name| WorkflowEvent::ToolCallRequested {
243 node_id: event.node_id.clone().unwrap_or_default(),
244 tool_name,
245 arguments: event
246 .metadata
247 .as_ref()
248 .and_then(|metadata| metadata.get("arguments"))
249 .cloned()
250 .unwrap_or(Value::Null),
251 })
252 }
253 "node_tool_call_completed" => {
254 tool_name_from_event(event).map(|tool_name| WorkflowEvent::ToolCallCompleted {
255 node_id: event.node_id.clone().unwrap_or_default(),
256 tool_name,
257 output: event
258 .metadata
259 .as_ref()
260 .and_then(|metadata| metadata.get("output"))
261 .cloned()
262 .unwrap_or(Value::Null),
263 })
264 }
265 "node_tool_call_failed" => Some(WorkflowEvent::NodeFailed {
266 node_id: event.node_id.clone().unwrap_or_default(),
267 error: event
268 .message
269 .clone()
270 .unwrap_or_else(|| "tool call failed without an error message".to_string()),
271 }),
272 "node_stream_delta" | "node_stream_output_delta" => {
273 Some(WorkflowEvent::LlmTokenDelta {
274 node_id: event.node_id.clone().unwrap_or_default(),
275 token: event.delta.clone().unwrap_or_default(),
276 token_kind: TokenKind::Output,
277 })
278 }
279 "node_stream_thinking_delta" => Some(WorkflowEvent::LlmTokenDelta {
280 node_id: event.node_id.clone().unwrap_or_default(),
281 token: event.delta.clone().unwrap_or_default(),
282 token_kind: TokenKind::Reasoning,
283 }),
284 "workflow_completed" => Some(WorkflowEvent::WorkflowCompleted {
285 output: event.snapshot.clone().unwrap_or(Value::Null),
286 metadata: event.metadata.clone(),
287 }),
288 _ => None,
289 };
290 if let Some(ev) = mapped {
291 self.inner.emit(&ev);
292 }
293 }
294}
295
296fn node_type_from_kind(kind: Option<&str>) -> crate::yaml_runner::NodeType {
297 use crate::yaml_runner::NodeType;
298
299 match kind {
300 Some("llm_call") => NodeType::LlmCall,
301 Some("switch") => NodeType::Switch,
302 Some("custom_worker") => NodeType::CustomWorker,
303 Some("end") => NodeType::End,
304 _ => NodeType::Unknown,
305 }
306}
307
308fn tool_name_from_event(event: &crate::yaml_runner::YamlWorkflowEvent) -> Option<String> {
309 event
310 .metadata
311 .as_ref()
312 .and_then(|metadata| metadata.get("tool_name"))
313 .and_then(Value::as_str)
314 .map(str::to_string)
315}
316
317fn yaml_output_to_workflow_output(
319 output: crate::yaml_runner::YamlWorkflowRunOutput,
320) -> WorkflowRunOutput {
321 let metadata = Some(RunMetadata {
322 total_elapsed_ms: output.total_elapsed_ms,
323 ttft_ms: output.ttft_ms,
324 total_input_tokens: output.total_input_tokens,
325 total_output_tokens: output.total_output_tokens,
326 total_tokens: output.total_tokens,
327 total_reasoning_tokens: output.total_reasoning_tokens,
328 tokens_per_second: output.tokens_per_second,
329 step_details: output
330 .step_timings
331 .iter()
332 .map(|step| StepTiming {
333 node_id: step.node_id.clone(),
334 node_type: step.node_kind.clone(),
335 model: step.model_name.clone(),
336 elapsed_ms: step.elapsed_ms,
337 input_tokens: step.prompt_tokens.map(u64::from),
338 output_tokens: step.completion_tokens.map(u64::from),
339 total_tokens: step.total_tokens.map(u64::from),
340 reasoning_tokens: step.reasoning_tokens.map(u64::from),
341 ttft_ms: None,
342 })
343 .collect(),
344 trace_id: output.trace_id.clone(),
345 });
346
347 WorkflowRunOutput {
348 workflow_id: output.workflow_id,
349 entry_node: output.entry_node,
350 trace: output.trace,
351 outputs: output.outputs,
352 terminal_node: output.terminal_node,
353 terminal_output: output.terminal_output,
354 metadata,
355 events: None,
356 }
357}