Skip to main content

simple_agents_workflow/
client.rs

1//! [`WorkflowClient`] — wraps [`SimpleAgentsClient`] and adds workflow `run` and `stream`.
2//!
3//! Because `simple-agents-core` cannot depend on `simple-agents-workflow` (that would create a
4//! circular dependency), the workflow methods live here instead of on `SimpleAgentsClient`.
5//!
6//! # Usage
7//!
8//! ```no_run
9//! use simple_agents_workflow::{WorkflowClient, WorkflowRunOptions, DefaultEventPrinter};
10//! use simple_agents_core::SimpleAgentsClient;
11//! use simple_agent_type::prelude::*;
12//! use async_trait::async_trait;
13//! use std::sync::Arc;
14//!
15//! struct MyProvider;
16//!
17//! #[async_trait]
18//! impl Provider for MyProvider {
19//!     fn name(&self) -> &str { "mock" }
20//!     fn transform_request(&self, _req: &CompletionRequest) -> Result<ProviderRequest> {
21//!         Ok(ProviderRequest::new("http://example.com"))
22//!     }
23//!     async fn execute(&self, _req: ProviderRequest) -> Result<ProviderResponse> {
24//!         Ok(ProviderResponse::new(200, serde_json::json!({})))
25//!     }
26//!     fn transform_response(&self, _resp: ProviderResponse) -> Result<CompletionResponse> {
27//!         unimplemented!()
28//!     }
29//! }
30//!
31//! # async fn example() -> std::result::Result<(), Box<dyn std::error::Error>> {
32//! let client = WorkflowClient::from_client(
33//!     SimpleAgentsClient::new(Arc::new(MyProvider)),
34//! );
35//!
36//! let messages = vec![Message::user("Hello")];
37//!
38//! // Run a workflow
39//! let output = client.run("workflow.yaml", messages.clone(), WorkflowRunOptions::default()).await?;
40//!
41//! // Stream a workflow
42//! let output = client.stream("workflow.yaml", messages, &DefaultEventPrinter, WorkflowRunOptions::default()).await?;
43//! # Ok(())
44//! # }
45//! ```
46
47use serde_json::Value;
48use simple_agent_type::message::Message;
49use simple_agents_core::{CompletionOptions, CompletionOutcome, SimpleAgentsClient};
50
51use crate::yaml_runner::{
52    workflow_execution, RunMetadata, StepTiming, WorkflowEventSink, WorkflowRunOutput,
53    YamlWorkflowEventSink, YamlWorkflowExecutionFlags, YamlWorkflowExecutionRequest,
54    YamlWorkflowExecutorBinding, YamlWorkflowRunError, YamlWorkflowRunOptions, YamlWorkflowSource,
55};
56
57use simple_agent_type::prelude::SimpleAgentsError;
58
59/// Error type for workflow operations.
60#[derive(Debug)]
61pub enum WorkflowError {
62    /// Workflow-level run error (preserves the structured error variant).
63    Workflow(YamlWorkflowRunError),
64    /// Core LLM error.
65    Core(SimpleAgentsError),
66}
67
68impl std::fmt::Display for WorkflowError {
69    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
70        match self {
71            WorkflowError::Workflow(err) => write!(f, "workflow error: {err}"),
72            WorkflowError::Core(err) => write!(f, "core error: {err}"),
73        }
74    }
75}
76
77impl std::error::Error for WorkflowError {}
78
79impl From<SimpleAgentsError> for WorkflowError {
80    fn from(e: SimpleAgentsError) -> Self {
81        WorkflowError::Core(e)
82    }
83}
84
85impl From<YamlWorkflowRunError> for WorkflowError {
86    fn from(e: YamlWorkflowRunError) -> Self {
87        WorkflowError::Workflow(e)
88    }
89}
90
91/// Execution options for a workflow run.
92///
93/// Tools are passed via `tool_executor` — an optional callback that handles tool calls
94/// declared in the workflow YAML. This mirrors the OpenAI SDK pattern where tools are
95/// just another optional parameter on the same call.
96#[derive(Clone, Default)]
97pub struct RunOptions {
98    /// Override the `YamlWorkflowRunOptions` (telemetry, trace, model override).
99    pub workflow_options: YamlWorkflowRunOptions,
100    /// Override execution flags (streaming, healing, etc.).
101    pub execution_flags: YamlWorkflowExecutionFlags,
102}
103
104/// A client that wraps [`SimpleAgentsClient`] and exposes workflow operations.
105///
106/// The workflow methods (`run`, `stream`) live here rather than on
107/// `SimpleAgentsClient` because `simple-agents-core` cannot depend on
108/// `simple-agents-workflow` without creating a circular crate dependency.
109pub struct WorkflowClient {
110    inner: SimpleAgentsClient,
111}
112
113impl WorkflowClient {
114    /// Create a `WorkflowClient` from an already-built `SimpleAgentsClient`.
115    pub fn from_client(client: SimpleAgentsClient) -> Self {
116        Self { inner: client }
117    }
118
119    /// Expose the underlying `SimpleAgentsClient` for direct LLM calls.
120    pub fn client(&self) -> &SimpleAgentsClient {
121        &self.inner
122    }
123
124    /// Run a YAML workflow file and return the full output.
125    ///
126    /// Tools are configured in the workflow YAML itself; runtime tool executors
127    /// (Python `custom_worker` / Node handler callbacks) are currently wired
128    /// through the binding layer.
129    pub async fn run(
130        &self,
131        workflow_path: &str,
132        messages: Vec<Message>,
133        options: RunOptions,
134    ) -> Result<WorkflowRunOutput, WorkflowError> {
135        let input = messages_to_value(messages);
136        let request = YamlWorkflowExecutionRequest {
137            source: YamlWorkflowSource::File(std::path::Path::new(workflow_path)),
138            workflow_input: &input,
139            executor: YamlWorkflowExecutorBinding::Client(&self.inner),
140            custom_worker: None,
141            resume: None,
142            human_response: None,
143            options: &options.workflow_options,
144            flags: options.execution_flags,
145        };
146
147        let output = workflow_execution::run(request).await?;
148
149        // Map YamlWorkflowRunOutput to the clean WorkflowRunOutput
150        Ok(yaml_output_to_workflow_output(output))
151    }
152
153    /// Stream a YAML workflow file, emitting events to `sink`.
154    ///
155    /// Pass `&DefaultEventPrinter` to get tokens printed to stdout by default.
156    pub async fn stream(
157        &self,
158        workflow_path: &str,
159        messages: Vec<Message>,
160        sink: &dyn WorkflowEventSink,
161        options: RunOptions,
162    ) -> Result<WorkflowRunOutput, WorkflowError> {
163        let input = messages_to_value(messages);
164        let mut flags = options.execution_flags;
165        flags.workflow_streaming = true;
166
167        let request = YamlWorkflowExecutionRequest {
168            source: YamlWorkflowSource::File(std::path::Path::new(workflow_path)),
169            workflow_input: &input,
170            executor: YamlWorkflowExecutorBinding::Client(&self.inner),
171            custom_worker: None,
172            resume: None,
173            human_response: None,
174            options: &options.workflow_options,
175            flags,
176        };
177
178        // Bridge the typed WorkflowEventSink to YamlWorkflowEventSink
179        let bridge = EventSinkBridge { inner: sink };
180        let output = workflow_execution::stream(request, &bridge).await?;
181
182        Ok(yaml_output_to_workflow_output(output))
183    }
184
185    /// Direct LLM completion (delegates to the inner client).
186    pub async fn complete(
187        &self,
188        request: &simple_agent_type::request::CompletionRequest,
189        options: CompletionOptions,
190    ) -> Result<CompletionOutcome, SimpleAgentsError> {
191        self.inner.complete(request, options).await
192    }
193}
194
195// ---------------------------------------------------------------------------
196// Internal helpers
197// ---------------------------------------------------------------------------
198
199/// Convert a list of messages into the `{"messages": [...]}` Value shape the
200/// YAML executor expects as workflow input.
201fn messages_to_value(messages: Vec<Message>) -> Value {
202    let msgs: Vec<Value> = messages
203        .into_iter()
204        .map(|m| match serde_json::to_value(&m) {
205            Ok(v) => v,
206            Err(err) => {
207                eprintln!("[simple-agents-workflow] WARN: failed to serialize message: {err}");
208                Value::Null
209            }
210        })
211        .collect();
212    serde_json::json!({ "messages": msgs })
213}
214
215/// Bridge from the new typed [`WorkflowEventSink`] to the old
216/// [`YamlWorkflowEventSink`] that the executor uses internally.
217struct EventSinkBridge<'a> {
218    inner: &'a dyn WorkflowEventSink,
219}
220
221impl YamlWorkflowEventSink for EventSinkBridge<'_> {
222    fn emit(&self, event: &crate::yaml_runner::YamlWorkflowEvent) {
223        // Map YamlWorkflowEvent → WorkflowEvent
224        use crate::yaml_runner::{TokenKind, WorkflowEvent};
225        let mapped = match event.event_type.as_str() {
226            "workflow_started" => Some(WorkflowEvent::WorkflowStarted {
227                workflow_id: event.node_id.clone().unwrap_or_default(),
228            }),
229            "node_started" => Some(WorkflowEvent::NodeStarted {
230                node_id: event.node_id.clone().unwrap_or_default(),
231                node_type: node_type_from_kind(event.node_kind.as_deref()),
232            }),
233            "node_completed" => Some(WorkflowEvent::NodeCompleted {
234                node_id: event.node_id.clone().unwrap_or_default(),
235                output: event
236                    .snapshot
237                    .clone()
238                    .or_else(|| event.message.clone().map(Value::String))
239                    .unwrap_or(Value::Null),
240            }),
241            "node_tool_call_requested" => {
242                tool_name_from_event(event).map(|tool_name| WorkflowEvent::ToolCallRequested {
243                    node_id: event.node_id.clone().unwrap_or_default(),
244                    tool_name,
245                    arguments: event
246                        .metadata
247                        .as_ref()
248                        .and_then(|metadata| metadata.get("arguments"))
249                        .cloned()
250                        .unwrap_or(Value::Null),
251                })
252            }
253            "node_tool_call_completed" => {
254                tool_name_from_event(event).map(|tool_name| WorkflowEvent::ToolCallCompleted {
255                    node_id: event.node_id.clone().unwrap_or_default(),
256                    tool_name,
257                    output: event
258                        .metadata
259                        .as_ref()
260                        .and_then(|metadata| metadata.get("output"))
261                        .cloned()
262                        .unwrap_or(Value::Null),
263                })
264            }
265            "human_input_requested" => Some(WorkflowEvent::HumanInputRequested {
266                node_id: event.node_id.clone().unwrap_or_default(),
267                request: event.metadata.clone().unwrap_or(Value::Null),
268            }),
269            "human_input_received" => Some(WorkflowEvent::HumanInputReceived {
270                node_id: event.node_id.clone().unwrap_or_default(),
271                response: event.metadata.clone().unwrap_or(Value::Null),
272            }),
273            "node_tool_call_failed" => Some(WorkflowEvent::NodeFailed {
274                node_id: event.node_id.clone().unwrap_or_default(),
275                error: event
276                    .message
277                    .clone()
278                    .unwrap_or_else(|| "tool call failed without an error message".to_string()),
279            }),
280            "node_stream_delta" | "node_stream_output_delta" => {
281                Some(WorkflowEvent::LlmTokenDelta {
282                    node_id: event.node_id.clone().unwrap_or_default(),
283                    token: event.delta.clone().unwrap_or_default(),
284                    token_kind: TokenKind::Output,
285                })
286            }
287            "node_stream_thinking_delta" => Some(WorkflowEvent::LlmTokenDelta {
288                node_id: event.node_id.clone().unwrap_or_default(),
289                token: event.delta.clone().unwrap_or_default(),
290                token_kind: TokenKind::Reasoning,
291            }),
292            "workflow_completed" => Some(WorkflowEvent::WorkflowCompleted {
293                output: event.snapshot.clone().unwrap_or(Value::Null),
294                metadata: event.metadata.clone(),
295            }),
296            _ => None,
297        };
298        if let Some(ev) = mapped {
299            self.inner.emit(&ev);
300        }
301    }
302}
303
304fn node_type_from_kind(kind: Option<&str>) -> crate::yaml_runner::NodeType {
305    use crate::yaml_runner::NodeType;
306
307    match kind {
308        Some("llm_call") => NodeType::LlmCall,
309        Some("switch") => NodeType::Switch,
310        Some("custom_worker") => NodeType::CustomWorker,
311        Some("human_input") => NodeType::HumanInput,
312        Some("end") => NodeType::End,
313        _ => NodeType::Unknown,
314    }
315}
316
317fn tool_name_from_event(event: &crate::yaml_runner::YamlWorkflowEvent) -> Option<String> {
318    event
319        .metadata
320        .as_ref()
321        .and_then(|metadata| metadata.get("tool_name"))
322        .and_then(Value::as_str)
323        .map(str::to_string)
324}
325
326/// Map the internal `YamlWorkflowRunOutput` to the cleaner public `WorkflowRunOutput`.
327fn yaml_output_to_workflow_output(
328    output: crate::yaml_runner::YamlWorkflowRunOutput,
329) -> WorkflowRunOutput {
330    let metadata = Some(RunMetadata {
331        total_elapsed_ms: output.total_elapsed_ms,
332        ttft_ms: output.ttft_ms,
333        total_input_tokens: output.total_input_tokens,
334        total_output_tokens: output.total_output_tokens,
335        total_tokens: output.total_tokens,
336        total_reasoning_tokens: output.total_reasoning_tokens,
337        tokens_per_second: output.tokens_per_second,
338        step_details: output
339            .step_timings
340            .iter()
341            .map(|step| StepTiming {
342                node_id: step.node_id.clone(),
343                node_type: step.node_kind.clone(),
344                model: step.model_name.clone(),
345                elapsed_ms: step.elapsed_ms,
346                input_tokens: step.prompt_tokens.map(u64::from),
347                output_tokens: step.completion_tokens.map(u64::from),
348                total_tokens: step.total_tokens.map(u64::from),
349                reasoning_tokens: step.reasoning_tokens.map(u64::from),
350                ttft_ms: None,
351            })
352            .collect(),
353        trace_id: output.trace_id.clone(),
354    });
355
356    WorkflowRunOutput {
357        workflow_id: output.workflow_id,
358        entry_node: output.entry_node,
359        trace: output.trace,
360        outputs: output.outputs,
361        globals: output.globals,
362        terminal_node: output.terminal_node,
363        terminal_output: output.terminal_output,
364        status: match output.status {
365            crate::yaml_runner::YamlWorkflowRunStatus::Completed => "completed".to_string(),
366            crate::yaml_runner::YamlWorkflowRunStatus::AwaitingHumanInput => {
367                "awaiting_human_input".to_string()
368            }
369        },
370        human_request: output
371            .human_request
372            .as_ref()
373            .and_then(|request| serde_json::to_value(request).ok()),
374        metadata,
375        events: None,
376    }
377}