use serde_json::Value;
use simple_agent_type::message::Message;
use simple_agents_core::{CompletionOptions, CompletionOutcome, SimpleAgentsClient};
use crate::yaml_runner::{
workflow_execution, WorkflowCheckpoint, WorkflowEventSink, WorkflowRunOutput,
YamlWorkflowEventSink, YamlWorkflowExecutionFlags, YamlWorkflowExecutionRequest,
YamlWorkflowExecutorBinding, YamlWorkflowRunOptions, YamlWorkflowSource,
};
use simple_agent_type::prelude::SimpleAgentsError;
/// Error type returned by the workflow-facing methods of `WorkflowClient`.
#[derive(Debug)]
pub enum WorkflowError {
/// A workflow execution failure, carried as a message string (the
/// `run`/`stream` methods in this file fill it with the stringified
/// executor error).
Workflow(String),
/// An error from the underlying agents client, wrapped unchanged.
Core(SimpleAgentsError),
}
impl std::fmt::Display for WorkflowError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
WorkflowError::Workflow(msg) => write!(f, "workflow error: {msg}"),
WorkflowError::Core(err) => write!(f, "core error: {err}"),
}
}
}
impl std::error::Error for WorkflowError {}
impl From<SimpleAgentsError> for WorkflowError {
fn from(e: SimpleAgentsError) -> Self {
WorkflowError::Core(e)
}
}
/// Per-call settings accepted by the workflow methods of `WorkflowClient`.
#[derive(Clone, Default)]
pub struct RunOptions {
/// Passed by reference as the `options` field of the execution request.
pub workflow_options: YamlWorkflowRunOptions,
/// Passed as the `flags` field of the execution request; `stream` sets
/// `workflow_streaming` to `true` on its copy before use.
pub execution_flags: YamlWorkflowExecutionFlags,
}
/// Wrapper around a `SimpleAgentsClient` that adds YAML workflow execution
/// (`run`, `stream`, `resume`) alongside plain completions.
pub struct WorkflowClient {
// The wrapped client; used as the workflow executor and for `complete`.
inner: SimpleAgentsClient,
}
impl WorkflowClient {
    /// Builds a workflow client on top of an existing [`SimpleAgentsClient`].
    pub fn from_client(client: SimpleAgentsClient) -> Self {
        Self { inner: client }
    }

    /// Returns a shared reference to the wrapped [`SimpleAgentsClient`].
    pub fn client(&self) -> &SimpleAgentsClient {
        &self.inner
    }

    /// Executes the YAML workflow file at `workflow_path` and returns its output.
    ///
    /// `messages` are serialized into the workflow input as
    /// `{ "messages": [...] }`. The wrapped client is used as the executor
    /// and no custom worker is supplied.
    ///
    /// # Errors
    ///
    /// Returns [`WorkflowError::Workflow`] containing the stringified error
    /// when the underlying workflow execution fails.
    pub async fn run(
        &self,
        workflow_path: &str,
        messages: Vec<Message>,
        options: RunOptions,
    ) -> Result<WorkflowRunOutput, WorkflowError> {
        let input = messages_to_value(messages);
        let request = YamlWorkflowExecutionRequest {
            source: YamlWorkflowSource::File(std::path::Path::new(workflow_path)),
            workflow_input: &input,
            executor: YamlWorkflowExecutorBinding::Client(&self.inner),
            custom_worker: None,
            options: &options.workflow_options,
            flags: options.execution_flags,
        };
        let output = workflow_execution::run(request)
            .await
            .map_err(|e| WorkflowError::Workflow(e.to_string()))?;
        Ok(yaml_output_to_workflow_output(output))
    }

    /// Executes the YAML workflow file at `workflow_path`, forwarding
    /// progress events to `sink` while it runs.
    ///
    /// Behaves like [`WorkflowClient::run`], except that the
    /// `workflow_streaming` execution flag is always forced to `true`
    /// (whatever the caller set) and runner events are translated for
    /// `sink` through an internal bridge.
    ///
    /// # Errors
    ///
    /// Returns [`WorkflowError::Workflow`] containing the stringified error
    /// when the underlying streaming execution fails.
    pub async fn stream(
        &self,
        workflow_path: &str,
        messages: Vec<Message>,
        sink: &dyn WorkflowEventSink,
        options: RunOptions,
    ) -> Result<WorkflowRunOutput, WorkflowError> {
        let input = messages_to_value(messages);
        // Streaming is mandatory for this entry point, so override the flag.
        let mut flags = options.execution_flags;
        flags.workflow_streaming = true;
        let request = YamlWorkflowExecutionRequest {
            source: YamlWorkflowSource::File(std::path::Path::new(workflow_path)),
            workflow_input: &input,
            executor: YamlWorkflowExecutorBinding::Client(&self.inner),
            custom_worker: None,
            options: &options.workflow_options,
            flags,
        };
        let bridge = EventSinkBridge { inner: sink };
        let output = workflow_execution::stream(request, &bridge)
            .await
            .map_err(|e| WorkflowError::Workflow(e.to_string()))?;
        Ok(yaml_output_to_workflow_output(output))
    }

    /// Re-runs the workflow described by `checkpoint`.
    ///
    /// Calls [`WorkflowClient::run`] with the checkpoint's `workflow_path`
    /// and `original_messages`.
    ///
    /// NOTE(review): only those two checkpoint fields are read here, so this
    /// appears to restart the workflow from its original input rather than
    /// continue from saved progress — confirm that this is intended.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by [`WorkflowClient::run`].
    pub async fn resume(
        &self,
        checkpoint: WorkflowCheckpoint,
        options: RunOptions,
    ) -> Result<WorkflowRunOutput, WorkflowError> {
        // The checkpoint is owned, so the messages can be moved out of it
        // instead of being deep-cloned.
        let messages = checkpoint.original_messages;
        self.run(&checkpoint.workflow_path, messages, options).await
    }

    /// Sends a plain completion request through the wrapped client.
    ///
    /// # Errors
    ///
    /// Returns whatever [`SimpleAgentsError`] the wrapped client's
    /// `complete` call produces.
    pub async fn complete(
        &self,
        request: &simple_agent_type::request::CompletionRequest,
        options: CompletionOptions,
    ) -> Result<CompletionOutcome, SimpleAgentsError> {
        self.inner.complete(request, options).await
    }
}
/// Packs `messages` into the JSON object `{ "messages": [...] }` used as
/// workflow input.
///
/// Each message is serialized on its own; one that fails to serialize is
/// replaced by JSON `null` so the array keeps one entry per message.
fn messages_to_value(messages: Vec<Message>) -> Value {
    let mut serialized: Vec<Value> = Vec::with_capacity(messages.len());
    for message in messages {
        let encoded = match serde_json::to_value(&message) {
            Ok(value) => value,
            Err(_) => Value::Null,
        };
        serialized.push(encoded);
    }
    serde_json::json!({ "messages": serialized })
}
/// Adapter that implements `YamlWorkflowEventSink` on top of a borrowed
/// `WorkflowEventSink`, translating runner events before forwarding them.
struct EventSinkBridge<'a> {
// The caller-supplied sink that receives the translated events.
inner: &'a dyn WorkflowEventSink,
}
impl YamlWorkflowEventSink for EventSinkBridge<'_> {
    /// Translates a runner event into a `WorkflowEvent` and forwards it to
    /// the wrapped sink.
    ///
    /// The translation is keyed on `event.event_type`; event types that are
    /// not listed below are dropped without notifying the sink. Missing
    /// `node_id` / `delta` values fall back to their `Default`.
    fn emit(&self, event: &crate::yaml_runner::YamlWorkflowEvent) {
        use crate::yaml_runner::{NodeType, TokenKind, WorkflowEvent};

        // Evaluated lazily so that an ignored event never clones anything.
        let node_id = || event.node_id.clone().unwrap_or_default();
        let token_delta = |token_kind: TokenKind| WorkflowEvent::LlmTokenDelta {
            node_id: node_id(),
            token: event.delta.clone().unwrap_or_default(),
            token_kind,
        };

        let translated = match event.event_type.as_str() {
            // The event's `node_id` is what gets reported as the workflow id.
            "workflow_started" => WorkflowEvent::WorkflowStarted {
                workflow_id: node_id(),
            },
            // NOTE(review): every started node is reported as `LlmCall`,
            // whatever kind of node it is — confirm this is acceptable.
            "node_started" => WorkflowEvent::NodeStarted {
                node_id: node_id(),
                node_type: NodeType::LlmCall,
            },
            "node_completed" => WorkflowEvent::NodeCompleted {
                node_id: node_id(),
                output: event.message.clone().map_or(Value::Null, Value::String),
            },
            "node_stream_delta" | "node_stream_output_delta" => token_delta(TokenKind::Output),
            "node_stream_thinking_delta" => token_delta(TokenKind::Reasoning),
            // NOTE(review): the completion event carries no output or
            // metadata here; only the fact of completion is forwarded.
            "workflow_completed" => WorkflowEvent::WorkflowCompleted {
                output: Value::Null,
                metadata: None,
            },
            _ => return,
        };

        self.inner.emit(&translated);
    }
}
/// Converts the YAML runner's output into a `WorkflowRunOutput`.
///
/// The identifying, trace and output fields are moved across unchanged;
/// `metadata` and `events` are always set to `None`.
fn yaml_output_to_workflow_output(
    output: crate::yaml_runner::YamlWorkflowRunOutput,
) -> WorkflowRunOutput {
    let crate::yaml_runner::YamlWorkflowRunOutput {
        workflow_id,
        entry_node,
        trace,
        outputs,
        terminal_node,
        terminal_output,
        ..
    } = output;

    WorkflowRunOutput {
        workflow_id,
        entry_node,
        trace,
        outputs,
        terminal_node,
        terminal_output,
        metadata: None,
        events: None,
    }
}