1use serde_json::Value;
48use simple_agent_type::message::Message;
49use simple_agents_core::{CompletionOptions, CompletionOutcome, SimpleAgentsClient};
50
51use crate::yaml_runner::{
52 workflow_execution, WorkflowCheckpoint, WorkflowEventSink, WorkflowRunOutput,
53 YamlWorkflowEventSink, YamlWorkflowExecutionFlags, YamlWorkflowExecutionRequest,
54 YamlWorkflowExecutorBinding, YamlWorkflowRunOptions, YamlWorkflowSource,
55};
56
57use simple_agent_type::prelude::SimpleAgentsError;
58
59#[derive(Debug)]
61pub enum WorkflowError {
62 Workflow(String),
64 Core(SimpleAgentsError),
66}
67
68impl std::fmt::Display for WorkflowError {
69 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
70 match self {
71 WorkflowError::Workflow(msg) => write!(f, "workflow error: {msg}"),
72 WorkflowError::Core(err) => write!(f, "core error: {err}"),
73 }
74 }
75}
76
77impl std::error::Error for WorkflowError {}
78
79impl From<SimpleAgentsError> for WorkflowError {
80 fn from(e: SimpleAgentsError) -> Self {
81 WorkflowError::Core(e)
82 }
83}
84
85#[derive(Clone, Default)]
91pub struct RunOptions {
92 pub workflow_options: YamlWorkflowRunOptions,
94 pub execution_flags: YamlWorkflowExecutionFlags,
96}
97
98pub struct WorkflowClient {
104 inner: SimpleAgentsClient,
105}
106
107impl WorkflowClient {
108 pub fn from_client(client: SimpleAgentsClient) -> Self {
110 Self { inner: client }
111 }
112
113 pub fn client(&self) -> &SimpleAgentsClient {
115 &self.inner
116 }
117
118 pub async fn run(
124 &self,
125 workflow_path: &str,
126 messages: Vec<Message>,
127 options: RunOptions,
128 ) -> Result<WorkflowRunOutput, WorkflowError> {
129 let input = messages_to_value(messages);
130 let request = YamlWorkflowExecutionRequest {
131 source: YamlWorkflowSource::File(std::path::Path::new(workflow_path)),
132 workflow_input: &input,
133 executor: YamlWorkflowExecutorBinding::Client(&self.inner),
134 custom_worker: None,
135 options: &options.workflow_options,
136 flags: options.execution_flags,
137 };
138
139 let output = workflow_execution::run(request)
140 .await
141 .map_err(|e| WorkflowError::Workflow(e.to_string()))?;
142
143 Ok(yaml_output_to_workflow_output(output))
145 }
146
147 pub async fn stream(
151 &self,
152 workflow_path: &str,
153 messages: Vec<Message>,
154 sink: &dyn WorkflowEventSink,
155 options: RunOptions,
156 ) -> Result<WorkflowRunOutput, WorkflowError> {
157 let input = messages_to_value(messages);
158 let mut flags = options.execution_flags;
159 flags.workflow_streaming = true;
160
161 let request = YamlWorkflowExecutionRequest {
162 source: YamlWorkflowSource::File(std::path::Path::new(workflow_path)),
163 workflow_input: &input,
164 executor: YamlWorkflowExecutorBinding::Client(&self.inner),
165 custom_worker: None,
166 options: &options.workflow_options,
167 flags,
168 };
169
170 let bridge = EventSinkBridge { inner: sink };
172 let output = workflow_execution::stream(request, &bridge)
173 .await
174 .map_err(|e| WorkflowError::Workflow(e.to_string()))?;
175
176 Ok(yaml_output_to_workflow_output(output))
177 }
178
179 pub async fn resume(
181 &self,
182 checkpoint: WorkflowCheckpoint,
183 options: RunOptions,
184 ) -> Result<WorkflowRunOutput, WorkflowError> {
185 let messages = checkpoint.original_messages.clone();
187 self.run(&checkpoint.workflow_path, messages, options).await
188 }
189
190 pub async fn complete(
192 &self,
193 request: &simple_agent_type::request::CompletionRequest,
194 options: CompletionOptions,
195 ) -> Result<CompletionOutcome, SimpleAgentsError> {
196 self.inner.complete(request, options).await
197 }
198}
199
200fn messages_to_value(messages: Vec<Message>) -> Value {
207 let msgs: Vec<Value> = messages
208 .into_iter()
209 .map(|m| serde_json::to_value(&m).unwrap_or(Value::Null))
210 .collect();
211 serde_json::json!({ "messages": msgs })
212}
213
214struct EventSinkBridge<'a> {
217 inner: &'a dyn WorkflowEventSink,
218}
219
220impl YamlWorkflowEventSink for EventSinkBridge<'_> {
221 fn emit(&self, event: &crate::yaml_runner::YamlWorkflowEvent) {
222 use crate::yaml_runner::{NodeType, TokenKind, WorkflowEvent};
224 let mapped = match event.event_type.as_str() {
225 "workflow_started" => Some(WorkflowEvent::WorkflowStarted {
226 workflow_id: event.node_id.clone().unwrap_or_default(),
227 }),
228 "node_started" => Some(WorkflowEvent::NodeStarted {
229 node_id: event.node_id.clone().unwrap_or_default(),
230 node_type: NodeType::LlmCall,
231 }),
232 "node_completed" => Some(WorkflowEvent::NodeCompleted {
233 node_id: event.node_id.clone().unwrap_or_default(),
234 output: event
235 .message
236 .clone()
237 .map(Value::String)
238 .unwrap_or(Value::Null),
239 }),
240 "node_stream_delta" | "node_stream_output_delta" => {
241 Some(WorkflowEvent::LlmTokenDelta {
242 node_id: event.node_id.clone().unwrap_or_default(),
243 token: event.delta.clone().unwrap_or_default(),
244 token_kind: TokenKind::Output,
245 })
246 }
247 "node_stream_thinking_delta" => Some(WorkflowEvent::LlmTokenDelta {
248 node_id: event.node_id.clone().unwrap_or_default(),
249 token: event.delta.clone().unwrap_or_default(),
250 token_kind: TokenKind::Reasoning,
251 }),
252 "workflow_completed" => Some(WorkflowEvent::WorkflowCompleted {
253 output: Value::Null,
254 metadata: None,
255 }),
256 _ => None,
257 };
258 if let Some(ev) = mapped {
259 self.inner.emit(&ev);
260 }
261 }
262}
263
264fn yaml_output_to_workflow_output(
266 output: crate::yaml_runner::YamlWorkflowRunOutput,
267) -> WorkflowRunOutput {
268 WorkflowRunOutput {
269 workflow_id: output.workflow_id,
270 entry_node: output.entry_node,
271 trace: output.trace,
272 outputs: output.outputs,
273 terminal_node: output.terminal_node,
274 terminal_output: output.terminal_output,
275 metadata: None,
276 events: None,
277 }
278}