Skip to main content

simple_agents_workflow/
client.rs

1//! [`WorkflowClient`] — wraps [`SimpleAgentsClient`] and adds workflow `run`, `stream`, `resume`.
2//!
3//! Because `simple-agents-core` cannot depend on `simple-agents-workflow` (that would create a
4//! circular dependency), the workflow methods live here instead of on `SimpleAgentsClient`.
5//!
6//! # Usage
7//!
8//! ```no_run
9//! use simple_agents_workflow::{WorkflowClient, WorkflowRunOptions, DefaultEventPrinter};
10//! use simple_agents_core::SimpleAgentsClient;
11//! use simple_agent_type::prelude::*;
12//! use async_trait::async_trait;
13//! use std::sync::Arc;
14//!
15//! struct MyProvider;
16//!
17//! #[async_trait]
18//! impl Provider for MyProvider {
19//!     fn name(&self) -> &str { "mock" }
20//!     fn transform_request(&self, _req: &CompletionRequest) -> Result<ProviderRequest> {
21//!         Ok(ProviderRequest::new("http://example.com"))
22//!     }
23//!     async fn execute(&self, _req: ProviderRequest) -> Result<ProviderResponse> {
24//!         Ok(ProviderResponse::new(200, serde_json::json!({})))
25//!     }
26//!     fn transform_response(&self, _resp: ProviderResponse) -> Result<CompletionResponse> {
27//!         unimplemented!()
28//!     }
29//! }
30//!
31//! # async fn example() -> std::result::Result<(), Box<dyn std::error::Error>> {
32//! let client = WorkflowClient::from_client(
33//!     SimpleAgentsClient::new(Arc::new(MyProvider)),
34//! );
35//!
36//! let messages = vec![Message::user("Hello")];
37//!
38//! // Run a workflow
39//! let output = client.run("workflow.yaml", messages.clone(), WorkflowRunOptions::default()).await?;
40//!
41//! // Stream a workflow
42//! let output = client.stream("workflow.yaml", messages, &DefaultEventPrinter, WorkflowRunOptions::default()).await?;
43//! # Ok(())
44//! # }
45//! ```
46
47use serde_json::Value;
48use simple_agent_type::message::Message;
49use simple_agents_core::{CompletionOptions, CompletionOutcome, SimpleAgentsClient};
50
51use crate::yaml_runner::{
52    workflow_execution, WorkflowCheckpoint, WorkflowEventSink, WorkflowRunOutput,
53    YamlWorkflowEventSink, YamlWorkflowExecutionFlags, YamlWorkflowExecutionRequest,
54    YamlWorkflowExecutorBinding, YamlWorkflowRunOptions, YamlWorkflowSource,
55};
56
57use simple_agent_type::prelude::SimpleAgentsError;
58
/// Error type for workflow operations.
///
/// Workflow execution failures are stored as a rendered message string, while
/// errors from the core client are kept as the original [`SimpleAgentsError`].
#[derive(Debug)]
pub enum WorkflowError {
    /// Workflow-level run error, carried as a human-readable message.
    Workflow(String),
    /// Core LLM error, wrapped unchanged.
    Core(SimpleAgentsError),
}
67
68impl std::fmt::Display for WorkflowError {
69    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
70        match self {
71            WorkflowError::Workflow(msg) => write!(f, "workflow error: {msg}"),
72            WorkflowError::Core(err) => write!(f, "core error: {err}"),
73        }
74    }
75}
76
// `source()` is intentionally left at its default: the wrapped core error is
// already rendered inline by the `Display` impl for the `Core` variant.
impl std::error::Error for WorkflowError {}
78
79impl From<SimpleAgentsError> for WorkflowError {
80    fn from(e: SimpleAgentsError) -> Self {
81        WorkflowError::Core(e)
82    }
83}
84
/// Execution options for a workflow run.
///
/// Bundles the two option sets that the client forwards to the YAML executor.
/// `RunOptions::default()` uses the `Default` value of each field.
///
/// NOTE(review): this struct has no `tool_executor` field. The client methods
/// in this file always pass `custom_worker: None` to the executor, so tool
/// callbacks are not configurable through these options.
#[derive(Clone, Default)]
pub struct RunOptions {
    /// Override the `YamlWorkflowRunOptions` (telemetry, trace, model override).
    pub workflow_options: YamlWorkflowRunOptions,
    /// Override execution flags (streaming, healing, etc.).
    pub execution_flags: YamlWorkflowExecutionFlags,
}
97
/// A client that wraps [`SimpleAgentsClient`] and exposes workflow operations.
///
/// The workflow methods (`run`, `stream`, `resume`) live here rather than on
/// `SimpleAgentsClient` because `simple-agents-core` cannot depend on
/// `simple-agents-workflow` without creating a circular crate dependency.
pub struct WorkflowClient {
    /// The wrapped core client; used as the executor binding for workflow
    /// runs and as the target of delegated direct completions.
    inner: SimpleAgentsClient,
}
106
107impl WorkflowClient {
108    /// Create a `WorkflowClient` from an already-built `SimpleAgentsClient`.
109    pub fn from_client(client: SimpleAgentsClient) -> Self {
110        Self { inner: client }
111    }
112
113    /// Expose the underlying `SimpleAgentsClient` for direct LLM calls.
114    pub fn client(&self) -> &SimpleAgentsClient {
115        &self.inner
116    }
117
118    /// Run a YAML workflow file and return the full output.
119    ///
120    /// Tools are configured in the workflow YAML itself; runtime tool executors
121    /// (Python `custom_worker` / Node handler callbacks) are currently wired
122    /// through the binding layer.
123    pub async fn run(
124        &self,
125        workflow_path: &str,
126        messages: Vec<Message>,
127        options: RunOptions,
128    ) -> Result<WorkflowRunOutput, WorkflowError> {
129        let input = messages_to_value(messages);
130        let request = YamlWorkflowExecutionRequest {
131            source: YamlWorkflowSource::File(std::path::Path::new(workflow_path)),
132            workflow_input: &input,
133            executor: YamlWorkflowExecutorBinding::Client(&self.inner),
134            custom_worker: None,
135            options: &options.workflow_options,
136            flags: options.execution_flags,
137        };
138
139        let output = workflow_execution::run(request)
140            .await
141            .map_err(|e| WorkflowError::Workflow(e.to_string()))?;
142
143        // Map YamlWorkflowRunOutput to the clean WorkflowRunOutput
144        Ok(yaml_output_to_workflow_output(output))
145    }
146
147    /// Stream a YAML workflow file, emitting events to `sink`.
148    ///
149    /// Pass `&DefaultEventPrinter` to get tokens printed to stdout by default.
150    pub async fn stream(
151        &self,
152        workflow_path: &str,
153        messages: Vec<Message>,
154        sink: &dyn WorkflowEventSink,
155        options: RunOptions,
156    ) -> Result<WorkflowRunOutput, WorkflowError> {
157        let input = messages_to_value(messages);
158        let mut flags = options.execution_flags;
159        flags.workflow_streaming = true;
160
161        let request = YamlWorkflowExecutionRequest {
162            source: YamlWorkflowSource::File(std::path::Path::new(workflow_path)),
163            workflow_input: &input,
164            executor: YamlWorkflowExecutorBinding::Client(&self.inner),
165            custom_worker: None,
166            options: &options.workflow_options,
167            flags,
168        };
169
170        // Bridge the typed WorkflowEventSink to YamlWorkflowEventSink
171        let bridge = EventSinkBridge { inner: sink };
172        let output = workflow_execution::stream(request, &bridge)
173            .await
174            .map_err(|e| WorkflowError::Workflow(e.to_string()))?;
175
176        Ok(yaml_output_to_workflow_output(output))
177    }
178
179    /// Resume a workflow from a saved checkpoint.
180    pub async fn resume(
181        &self,
182        checkpoint: WorkflowCheckpoint,
183        options: RunOptions,
184    ) -> Result<WorkflowRunOutput, WorkflowError> {
185        // Reconstruct a messages-based input from the checkpoint
186        let messages = checkpoint.original_messages.clone();
187        self.run(&checkpoint.workflow_path, messages, options).await
188    }
189
190    /// Direct LLM completion (delegates to the inner client).
191    pub async fn complete(
192        &self,
193        request: &simple_agent_type::request::CompletionRequest,
194        options: CompletionOptions,
195    ) -> Result<CompletionOutcome, SimpleAgentsError> {
196        self.inner.complete(request, options).await
197    }
198}
199
200// ---------------------------------------------------------------------------
201// Internal helpers
202// ---------------------------------------------------------------------------
203
204/// Convert a list of messages into the `{"messages": [...]}` Value shape the
205/// YAML executor expects as workflow input.
206fn messages_to_value(messages: Vec<Message>) -> Value {
207    let msgs: Vec<Value> = messages
208        .into_iter()
209        .map(|m| serde_json::to_value(&m).unwrap_or(Value::Null))
210        .collect();
211    serde_json::json!({ "messages": msgs })
212}
213
/// Bridge from the new typed [`WorkflowEventSink`] to the old
/// [`YamlWorkflowEventSink`] that the executor uses internally.
///
/// Borrows the caller's sink for `'a`; it does not own it.
struct EventSinkBridge<'a> {
    /// The typed sink that translated events are forwarded to.
    inner: &'a dyn WorkflowEventSink,
}
219
220impl YamlWorkflowEventSink for EventSinkBridge<'_> {
221    fn emit(&self, event: &crate::yaml_runner::YamlWorkflowEvent) {
222        // Map YamlWorkflowEvent → WorkflowEvent
223        use crate::yaml_runner::{NodeType, TokenKind, WorkflowEvent};
224        let mapped = match event.event_type.as_str() {
225            "workflow_started" => Some(WorkflowEvent::WorkflowStarted {
226                workflow_id: event.node_id.clone().unwrap_or_default(),
227            }),
228            "node_started" => Some(WorkflowEvent::NodeStarted {
229                node_id: event.node_id.clone().unwrap_or_default(),
230                node_type: NodeType::LlmCall,
231            }),
232            "node_completed" => Some(WorkflowEvent::NodeCompleted {
233                node_id: event.node_id.clone().unwrap_or_default(),
234                output: event
235                    .message
236                    .clone()
237                    .map(Value::String)
238                    .unwrap_or(Value::Null),
239            }),
240            "node_stream_delta" | "node_stream_output_delta" => {
241                Some(WorkflowEvent::LlmTokenDelta {
242                    node_id: event.node_id.clone().unwrap_or_default(),
243                    token: event.delta.clone().unwrap_or_default(),
244                    token_kind: TokenKind::Output,
245                })
246            }
247            "node_stream_thinking_delta" => Some(WorkflowEvent::LlmTokenDelta {
248                node_id: event.node_id.clone().unwrap_or_default(),
249                token: event.delta.clone().unwrap_or_default(),
250                token_kind: TokenKind::Reasoning,
251            }),
252            "workflow_completed" => Some(WorkflowEvent::WorkflowCompleted {
253                output: Value::Null,
254                metadata: None,
255            }),
256            _ => None,
257        };
258        if let Some(ev) = mapped {
259            self.inner.emit(&ev);
260        }
261    }
262}
263
264/// Map the internal `YamlWorkflowRunOutput` to the cleaner public `WorkflowRunOutput`.
265fn yaml_output_to_workflow_output(
266    output: crate::yaml_runner::YamlWorkflowRunOutput,
267) -> WorkflowRunOutput {
268    WorkflowRunOutput {
269        workflow_id: output.workflow_id,
270        entry_node: output.entry_node,
271        trace: output.trace,
272        outputs: output.outputs,
273        terminal_node: output.terminal_node,
274        terminal_output: output.terminal_output,
275        metadata: None,
276        events: None,
277    }
278}