use serde_json::Value;
use simple_agent_type::message::Message;
use simple_agents_core::{CompletionOptions, CompletionOutcome, SimpleAgentsClient};
use crate::yaml_runner::{
workflow_execution, RunMetadata, StepTiming, WorkflowEventSink, WorkflowRunOutput,
YamlWorkflowEventSink, YamlWorkflowExecutionFlags, YamlWorkflowExecutionRequest,
YamlWorkflowExecutorBinding, YamlWorkflowRunError, YamlWorkflowRunOptions, YamlWorkflowSource,
};
use simple_agent_type::prelude::SimpleAgentsError;
/// Error type returned by [`WorkflowClient::run`] and [`WorkflowClient::stream`].
///
/// Each variant wraps one of the two error types this module converts from
/// through its `From` implementations.
#[derive(Debug)]
pub enum WorkflowError {
/// Wraps a `YamlWorkflowRunError`; displayed with the prefix `workflow error: `.
Workflow(YamlWorkflowRunError),
/// Wraps a `SimpleAgentsError`; displayed with the prefix `core error: `.
Core(SimpleAgentsError),
}
impl std::fmt::Display for WorkflowError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
WorkflowError::Workflow(err) => write!(f, "workflow error: {err}"),
WorkflowError::Core(err) => write!(f, "core error: {err}"),
}
}
}
// Empty impl: only the trait's default methods are used, so `source()` is not
// overridden here and the wrapped error is reachable only through the enum
// variants and the `Display` output.
impl std::error::Error for WorkflowError {}
impl From<SimpleAgentsError> for WorkflowError {
fn from(e: SimpleAgentsError) -> Self {
WorkflowError::Core(e)
}
}
impl From<YamlWorkflowRunError> for WorkflowError {
fn from(e: YamlWorkflowRunError) -> Self {
WorkflowError::Workflow(e)
}
}
/// Per-call settings accepted by [`WorkflowClient::run`] and
/// [`WorkflowClient::stream`].
#[derive(Clone, Default)]
pub struct RunOptions {
/// Passed by reference as the `options` field of the execution request.
pub workflow_options: YamlWorkflowRunOptions,
/// Passed as the `flags` field of the execution request. `stream` sets
/// `workflow_streaming` to `true` before using it.
pub execution_flags: YamlWorkflowExecutionFlags,
}
/// Wrapper around a `SimpleAgentsClient` that offers workflow execution
/// (`run`, `stream`) in addition to plain completions (`complete`).
pub struct WorkflowClient {
// The wrapped client; borrowed out read-only via `client()`.
inner: SimpleAgentsClient,
}
impl WorkflowClient {
pub fn from_client(client: SimpleAgentsClient) -> Self {
Self { inner: client }
}
pub fn client(&self) -> &SimpleAgentsClient {
&self.inner
}
pub async fn run(
&self,
workflow_path: &str,
messages: Vec<Message>,
options: RunOptions,
) -> Result<WorkflowRunOutput, WorkflowError> {
let input = messages_to_value(messages);
let request = YamlWorkflowExecutionRequest {
source: YamlWorkflowSource::File(std::path::Path::new(workflow_path)),
workflow_input: &input,
executor: YamlWorkflowExecutorBinding::Client(&self.inner),
custom_worker: None,
resume: None,
human_response: None,
options: &options.workflow_options,
flags: options.execution_flags,
};
let output = workflow_execution::run(request).await?;
Ok(yaml_output_to_workflow_output(output))
}
pub async fn stream(
&self,
workflow_path: &str,
messages: Vec<Message>,
sink: &dyn WorkflowEventSink,
options: RunOptions,
) -> Result<WorkflowRunOutput, WorkflowError> {
let input = messages_to_value(messages);
let mut flags = options.execution_flags;
flags.workflow_streaming = true;
let request = YamlWorkflowExecutionRequest {
source: YamlWorkflowSource::File(std::path::Path::new(workflow_path)),
workflow_input: &input,
executor: YamlWorkflowExecutorBinding::Client(&self.inner),
custom_worker: None,
resume: None,
human_response: None,
options: &options.workflow_options,
flags,
};
let bridge = EventSinkBridge { inner: sink };
let output = workflow_execution::stream(request, &bridge).await?;
Ok(yaml_output_to_workflow_output(output))
}
pub async fn complete(
&self,
request: &simple_agent_type::request::CompletionRequest,
options: CompletionOptions,
) -> Result<CompletionOutcome, SimpleAgentsError> {
self.inner.complete(request, options).await
}
}
/// Serializes `messages` into the JSON object `{ "messages": [...] }`.
///
/// Serialization is best-effort: a message that fails to serialize is
/// replaced by JSON `null` and a warning is written to stderr, so the
/// resulting array always has one entry per input message.
fn messages_to_value(messages: Vec<Message>) -> Value {
    let mut serialized = Vec::with_capacity(messages.len());
    for message in &messages {
        let entry = serde_json::to_value(message).unwrap_or_else(|err| {
            eprintln!("[simple-agents-workflow] WARN: failed to serialize message: {err}");
            Value::Null
        });
        serialized.push(entry);
    }
    serde_json::json!({ "messages": serialized })
}
/// Adapter that implements `YamlWorkflowEventSink` on top of a borrowed
/// `WorkflowEventSink`.
struct EventSinkBridge<'a> {
// Destination sink that receives the translated events.
inner: &'a dyn WorkflowEventSink,
}
impl YamlWorkflowEventSink for EventSinkBridge<'_> {
    /// Translates a runner event into a `WorkflowEvent` and forwards it to the
    /// wrapped sink.
    ///
    /// Translation is keyed on `event.event_type`. Events with an unrecognized
    /// type are dropped, as are tool-call requested/completed events whose
    /// metadata has no string `tool_name`. Missing optional fields fall back
    /// to their defaults (empty string or JSON `null`).
    fn emit(&self, event: &crate::yaml_runner::YamlWorkflowEvent) {
        use crate::yaml_runner::{TokenKind, WorkflowEvent};

        // Small accessors for the fields most arms need. They are closures so
        // that nothing is cloned unless the matching arm actually uses it.
        let node_id = || event.node_id.clone().unwrap_or_default();
        let whole_metadata = || event.metadata.clone().unwrap_or(Value::Null);
        let metadata_field = |key: &str| {
            event
                .metadata
                .as_ref()
                .and_then(|metadata| metadata.get(key))
                .cloned()
                .unwrap_or(Value::Null)
        };
        let token_delta = |token_kind: TokenKind| WorkflowEvent::LlmTokenDelta {
            node_id: node_id(),
            token: event.delta.clone().unwrap_or_default(),
            token_kind,
        };

        let translated = match event.event_type.as_str() {
            "workflow_started" => Some(WorkflowEvent::WorkflowStarted {
                workflow_id: node_id(),
            }),
            "node_started" => Some(WorkflowEvent::NodeStarted {
                node_id: node_id(),
                node_type: node_type_from_kind(event.node_kind.as_deref()),
            }),
            "node_completed" => {
                // Prefer the snapshot; fall back to the message as a JSON
                // string, then to null.
                let output = match (&event.snapshot, &event.message) {
                    (Some(snapshot), _) => snapshot.clone(),
                    (None, Some(text)) => Value::String(text.clone()),
                    (None, None) => Value::Null,
                };
                Some(WorkflowEvent::NodeCompleted {
                    node_id: node_id(),
                    output,
                })
            }
            "node_tool_call_requested" => {
                tool_name_from_event(event).map(|tool_name| WorkflowEvent::ToolCallRequested {
                    node_id: node_id(),
                    tool_name,
                    arguments: metadata_field("arguments"),
                })
            }
            "node_tool_call_completed" => {
                tool_name_from_event(event).map(|tool_name| WorkflowEvent::ToolCallCompleted {
                    node_id: node_id(),
                    tool_name,
                    output: metadata_field("output"),
                })
            }
            "human_input_requested" => Some(WorkflowEvent::HumanInputRequested {
                node_id: node_id(),
                request: whole_metadata(),
            }),
            "human_input_received" => Some(WorkflowEvent::HumanInputReceived {
                node_id: node_id(),
                response: whole_metadata(),
            }),
            // A failed tool call is surfaced to the sink as a node failure.
            "node_tool_call_failed" => Some(WorkflowEvent::NodeFailed {
                node_id: node_id(),
                error: event
                    .message
                    .clone()
                    .unwrap_or_else(|| String::from("tool call failed without an error message")),
            }),
            "node_stream_delta" | "node_stream_output_delta" => {
                Some(token_delta(TokenKind::Output))
            }
            "node_stream_thinking_delta" => Some(token_delta(TokenKind::Reasoning)),
            "workflow_completed" => Some(WorkflowEvent::WorkflowCompleted {
                output: event.snapshot.clone().unwrap_or(Value::Null),
                metadata: event.metadata.clone(),
            }),
            _ => None,
        };

        if let Some(translated) = translated {
            self.inner.emit(&translated);
        }
    }
}
/// Maps a runner node-kind string to a `NodeType`.
///
/// Returns `NodeType::Unknown` when `kind` is `None` or is not one of the
/// recognized strings.
fn node_type_from_kind(kind: Option<&str>) -> crate::yaml_runner::NodeType {
    use crate::yaml_runner::NodeType;

    kind.map_or(NodeType::Unknown, |name| match name {
        "llm_call" => NodeType::LlmCall,
        "switch" => NodeType::Switch,
        "custom_worker" => NodeType::CustomWorker,
        "human_input" => NodeType::HumanInput,
        "end" => NodeType::End,
        _ => NodeType::Unknown,
    })
}
fn tool_name_from_event(event: &crate::yaml_runner::YamlWorkflowEvent) -> Option<String> {
event
.metadata
.as_ref()
.and_then(|metadata| metadata.get("tool_name"))
.and_then(Value::as_str)
.map(str::to_string)
}
fn yaml_output_to_workflow_output(
output: crate::yaml_runner::YamlWorkflowRunOutput,
) -> WorkflowRunOutput {
let metadata = Some(RunMetadata {
total_elapsed_ms: output.total_elapsed_ms,
ttft_ms: output.ttft_ms,
total_input_tokens: output.total_input_tokens,
total_output_tokens: output.total_output_tokens,
total_tokens: output.total_tokens,
total_reasoning_tokens: output.total_reasoning_tokens,
tokens_per_second: output.tokens_per_second,
step_details: output
.step_timings
.iter()
.map(|step| StepTiming {
node_id: step.node_id.clone(),
node_type: step.node_kind.clone(),
model: step.model_name.clone(),
elapsed_ms: step.elapsed_ms,
input_tokens: step.prompt_tokens.map(u64::from),
output_tokens: step.completion_tokens.map(u64::from),
total_tokens: step.total_tokens.map(u64::from),
reasoning_tokens: step.reasoning_tokens.map(u64::from),
ttft_ms: None,
})
.collect(),
trace_id: output.trace_id.clone(),
});
WorkflowRunOutput {
workflow_id: output.workflow_id,
entry_node: output.entry_node,
trace: output.trace,
outputs: output.outputs,
globals: output.globals,
terminal_node: output.terminal_node,
terminal_output: output.terminal_output,
status: match output.status {
crate::yaml_runner::YamlWorkflowRunStatus::Completed => "completed".to_string(),
crate::yaml_runner::YamlWorkflowRunStatus::AwaitingHumanInput => {
"awaiting_human_input".to_string()
}
},
human_request: output
.human_request
.as_ref()
.and_then(|request| serde_json::to_value(request).ok()),
metadata,
events: None,
}
}