use serde::{Deserialize, Serialize};
use std::sync::Arc;
use crate::agent::Agent;
use crate::error::{Error, Result};
/// Strategy an [`AgentTeam`] uses to execute its agents.
///
/// Serialized and deserialized through serde using lowercase variant names.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum Process {
/// Run the agents one after another, in the order they were added. This is the default.
#[default]
Sequential,
/// Send the same task to every agent concurrently and combine the successful outputs.
Parallel,
/// Currently executed the same way as `Sequential`.
Hierarchical,
}
/// Outcome of a single agent invocation within a workflow.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StepResult {
/// Name of the agent that produced this result.
pub agent: String,
/// Text produced by the agent; empty when built with `StepResult::failure`.
pub output: String,
/// `true` when the step completed, `false` when it failed.
pub success: bool,
/// Failure description; `None` when built with `StepResult::success`.
pub error: Option<String>,
}
impl StepResult {
    /// Builds the record of a step that completed.
    ///
    /// `success` is set to `true` and `error` to `None`.
    pub fn success(agent: impl Into<String>, output: impl Into<String>) -> Self {
        let agent = agent.into();
        let output = output.into();
        Self {
            agent,
            output,
            error: None,
            success: true,
        }
    }

    /// Builds the record of a step that failed.
    ///
    /// `success` is set to `false`, `output` is left empty, and `error`
    /// carries the supplied description.
    pub fn failure(agent: impl Into<String>, error: impl Into<String>) -> Self {
        let agent = agent.into();
        let error = Some(error.into());
        Self {
            agent,
            error,
            output: String::new(),
            success: false,
        }
    }
}
/// Mutable state accumulated while a workflow runs.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct WorkflowContext {
/// Named string values; `add_result` also stores each step's output here as `<agent>_output`.
pub variables: std::collections::HashMap<String, String>,
/// Step results in the order they were recorded.
pub results: Vec<StepResult>,
}
impl WorkflowContext {
pub fn new() -> Self {
Self::default()
}
pub fn set(&mut self, key: impl Into<String>, value: impl Into<String>) {
self.variables.insert(key.into(), value.into());
}
pub fn get(&self, key: &str) -> Option<&String> {
self.variables.get(key)
}
pub fn add_result(&mut self, result: StepResult) {
let var_name = format!("{}_output", result.agent);
self.variables.insert(var_name, result.output.clone());
self.results.push(result);
}
pub fn last_result(&self) -> Option<&StepResult> {
self.results.last()
}
}
/// A group of agents that work on a task together according to a [`Process`].
///
/// Constructed through [`AgentTeam::new`], which returns an [`AgentTeamBuilder`].
pub struct AgentTeam {
/// Team members, in the order they were added.
agents: Vec<Arc<Agent>>,
/// Execution strategy that `start` dispatches on.
process: Process,
/// Verbose flag, readable through `is_verbose`.
verbose: bool,
}
impl AgentTeam {
    /// Returns the team's `verbose` flag.
    pub fn is_verbose(&self) -> bool {
        let Self { verbose, .. } = self;
        *verbose
    }
}
impl AgentTeam {
#[allow(clippy::new_ret_no_self)]
pub fn new() -> AgentTeamBuilder {
AgentTeamBuilder::new()
}
pub async fn start(&self, task: &str) -> Result<String> {
match self.process {
Process::Sequential => self.run_sequential(task).await,
Process::Parallel => self.run_parallel(task).await,
Process::Hierarchical => self.run_hierarchical(task).await,
}
}
pub async fn run(&self, task: &str) -> Result<String> {
self.start(task).await
}
async fn run_sequential(&self, task: &str) -> Result<String> {
let mut context = WorkflowContext::new();
context.set("task", task);
let mut current_input = task.to_string();
for agent in &self.agents {
let prompt = if context.results.is_empty() {
current_input.clone()
} else {
let prev_output = context
.last_result()
.map(|r| r.output.as_str())
.unwrap_or("");
format!("{}\n\nPrevious output:\n{}", current_input, prev_output)
};
match agent.chat(&prompt).await {
Ok(output) => {
context.add_result(StepResult::success(agent.name(), &output));
current_input = output;
}
Err(e) => {
context.add_result(StepResult::failure(agent.name(), e.to_string()));
return Err(Error::workflow(format!(
"Agent {} failed: {}",
agent.name(),
e
)));
}
}
}
context
.last_result()
.map(|r| r.output.clone())
.ok_or_else(|| Error::workflow("No results from workflow"))
}
async fn run_parallel(&self, task: &str) -> Result<String> {
use futures::future::join_all;
let futures: Vec<_> = self
.agents
.iter()
.map(|agent| {
let agent = Arc::clone(agent);
let task = task.to_string();
async move {
agent
.chat(&task)
.await
.map(|output| StepResult::success(agent.name(), output))
.unwrap_or_else(|e| StepResult::failure(agent.name(), e.to_string()))
}
})
.collect();
let results = join_all(futures).await;
let combined: Vec<String> = results
.iter()
.filter(|r| r.success)
.map(|r| format!("## {}\n{}", r.agent, r.output))
.collect();
if combined.is_empty() {
Err(Error::workflow("All agents failed"))
} else {
Ok(combined.join("\n\n"))
}
}
async fn run_hierarchical(&self, task: &str) -> Result<String> {
self.run_sequential(task).await
}
pub fn len(&self) -> usize {
self.agents.len()
}
pub fn is_empty(&self) -> bool {
self.agents.is_empty()
}
}
impl Default for AgentTeam {
    /// Creates a team with no agents, the sequential process, and verbose mode off.
    fn default() -> Self {
        let agents = Vec::new();
        let process = Process::Sequential;
        let verbose = false;
        Self {
            agents,
            process,
            verbose,
        }
    }
}
/// Builder that collects agents and options before producing an [`AgentTeam`].
pub struct AgentTeamBuilder {
/// Agents added so far, in insertion order.
agents: Vec<Arc<Agent>>,
/// Execution strategy the built team will use.
process: Process,
/// Verbose flag the built team will carry.
verbose: bool,
}
impl AgentTeamBuilder {
    /// Creates a builder with no agents, the sequential process, and verbose mode off.
    pub fn new() -> Self {
        let agents = Vec::new();
        Self {
            agents,
            process: Process::Sequential,
            verbose: false,
        }
    }

    /// Appends an owned agent, wrapping it in an `Arc`.
    pub fn agent(self, agent: Agent) -> Self {
        self.agent_arc(Arc::new(agent))
    }

    /// Appends an agent that is already behind an `Arc`.
    pub fn agent_arc(self, agent: Arc<Agent>) -> Self {
        let Self {
            mut agents,
            process,
            verbose,
        } = self;
        agents.push(agent);
        Self {
            agents,
            process,
            verbose,
        }
    }

    /// Selects the execution strategy of the team.
    pub fn process(self, process: Process) -> Self {
        Self { process, ..self }
    }

    /// Sets the verbose flag of the team.
    pub fn verbose(self, enabled: bool) -> Self {
        Self {
            verbose: enabled,
            ..self
        }
    }

    /// Consumes the builder and produces the configured team.
    pub fn build(self) -> AgentTeam {
        let Self {
            agents,
            process,
            verbose,
        } = self;
        AgentTeam {
            agents,
            process,
            verbose,
        }
    }
}
impl Default for AgentTeamBuilder {
fn default() -> Self {
Self::new()
}
}
/// An ordered pipeline of [`FlowStep`]s; `run` feeds each step's output into the next step.
pub struct AgentFlow {
/// Steps in execution order.
steps: Vec<FlowStep>,
}
/// One stage of an [`AgentFlow`].
pub enum FlowStep {
/// Pass the current input to a single agent.
Agent(Arc<Agent>),
/// Choose between agents based on a predicate over the current input.
Route(Route),
/// Send the current input to several agents and join their outputs.
Parallel(Parallel),
/// Call one agent once per item of a list.
Loop(Loop),
/// Call one agent several times, feeding each output back in as the next input.
Repeat(Repeat),
}
/// Conditional step: picks an agent according to a predicate on the current input.
pub struct Route {
/// Predicate evaluated on the current input.
pub condition: Box<dyn Fn(&str) -> bool + Send + Sync>,
/// Agent invoked when `condition` returns `true`.
pub if_true: Arc<Agent>,
/// Agent invoked when `condition` returns `false`; when `None` the input passes through unchanged.
pub if_false: Option<Arc<Agent>>,
}
/// Fan-out step: every agent receives the same input.
pub struct Parallel {
/// Agents to invoke; the outputs of those that succeed are joined with a blank line.
pub agents: Vec<Arc<Agent>>,
}
/// Per-item step: one agent call for each entry of `items`.
pub struct Loop {
/// Agent invoked for every item.
pub agent: Arc<Agent>,
/// Items appended, one per call, to the current input as an `Item:` section.
pub items: Vec<String>,
}
/// Iteration step: the agent's output is fed back to it as the next input.
pub struct Repeat {
/// Agent invoked on every iteration.
pub agent: Arc<Agent>,
/// Number of iterations; with `0` the input passes through unchanged.
pub times: usize,
}
impl AgentFlow {
pub fn new() -> Self {
Self { steps: Vec::new() }
}
pub fn step(mut self, step: FlowStep) -> Self {
self.steps.push(step);
self
}
pub fn agent(self, agent: Agent) -> Self {
self.step(FlowStep::Agent(Arc::new(agent)))
}
pub async fn run(&self, input: &str) -> Result<String> {
let mut current = input.to_string();
for step in &self.steps {
current = match step {
FlowStep::Agent(agent) => agent.chat(¤t).await?,
FlowStep::Route(route) => {
if (route.condition)(¤t) {
route.if_true.chat(¤t).await?
} else if let Some(agent) = &route.if_false {
agent.chat(¤t).await?
} else {
current
}
}
FlowStep::Parallel(parallel) => {
use futures::future::join_all;
let futures: Vec<_> =
parallel.agents.iter().map(|a| a.chat(¤t)).collect();
let results = join_all(futures).await;
let outputs: Vec<String> = results.into_iter().filter_map(|r| r.ok()).collect();
outputs.join("\n\n")
}
FlowStep::Loop(loop_step) => {
let mut outputs = Vec::new();
for item in &loop_step.items {
let prompt = format!("{}\n\nItem: {}", current, item);
outputs.push(loop_step.agent.chat(&prompt).await?);
}
outputs.join("\n\n")
}
FlowStep::Repeat(repeat) => {
let mut output = current.clone();
for _ in 0..repeat.times {
output = repeat.agent.chat(&output).await?;
}
output
}
};
}
Ok(current)
}
}
impl Default for AgentFlow {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// A value stored with `set` is returned by `get`.
    #[test]
    fn test_workflow_context() {
        let mut context = WorkflowContext::new();
        context.set("key", "value");

        let stored = context.get("key").map(String::as_str);
        assert_eq!(stored, Some("value"));
    }

    /// The two constructors set the `success` flag accordingly.
    #[test]
    fn test_step_result() {
        let ok = StepResult::success("agent1", "output");
        let failed = StepResult::failure("agent1", "error");

        assert!(ok.success);
        assert!(!failed.success);
    }

    /// A builder that never received an agent produces an empty team.
    #[test]
    fn test_agent_team_builder() {
        let builder = AgentTeam::new().process(Process::Parallel).verbose(true);
        let team = builder.build();

        assert!(team.is_empty());
    }
}