use super::{SharedAgent, StepOutput, Workflow, WorkflowOutput, shared_agent};
use echo_core::agent::Agent;
use echo_core::error::Result;
use futures::future::BoxFuture;
use std::time::Instant;
use tracing::{debug, info};
type MergeFn = Box<dyn Fn(Vec<String>) -> String + Send + Sync>;
fn default_merge(results: Vec<String>) -> String {
results.join("\n---\n")
}
pub struct ConcurrentWorkflow {
agents: Vec<SharedAgent>,
merge: MergeFn,
}
impl ConcurrentWorkflow {
pub fn builder() -> ConcurrentWorkflowBuilder {
ConcurrentWorkflowBuilder {
agents: Vec::new(),
merge: None,
}
}
}
impl Workflow for ConcurrentWorkflow {
fn run<'a>(&'a mut self, input: &'a str) -> BoxFuture<'a, Result<WorkflowOutput>> {
Box::pin(async move {
let total_start = Instant::now();
let agent_count = self.agents.len();
info!(
workflow = "concurrent",
agents = agent_count,
"⚡ Executing {} agents concurrently",
agent_count
);
let mut handles = Vec::with_capacity(agent_count);
for agent_handle in &self.agents {
let agent_handle = agent_handle.clone();
let input = input.to_string();
handles.push(tokio::spawn(async move {
let step_start = Instant::now();
let agent = agent_handle.lock().await;
let agent_name = agent.name().to_string();
debug!(workflow = "concurrent", agent = %agent_name, "▶ Starting execution");
let result = agent.execute(&input).await;
let elapsed = step_start.elapsed();
(agent_name, input, result, elapsed)
}));
}
let mut step_outputs = Vec::with_capacity(agent_count);
let mut results = Vec::with_capacity(agent_count);
for handle in handles {
let (agent_name, step_input, result, elapsed) = handle.await.map_err(|e| {
echo_core::error::ReactError::Other(format!("task join error: {e}"))
})?;
let output = result?;
info!(
workflow = "concurrent",
agent = %agent_name,
elapsed_ms = elapsed.as_millis(),
"✓ Agent completed"
);
step_outputs.push(StepOutput {
agent_name,
input: step_input,
output: output.clone(),
elapsed,
});
results.push(output);
}
let merged = (self.merge)(results);
Ok(WorkflowOutput {
result: merged,
steps: step_outputs,
elapsed: total_start.elapsed(),
})
})
}
}
pub struct ConcurrentWorkflowBuilder {
agents: Vec<SharedAgent>,
merge: Option<MergeFn>,
}
impl ConcurrentWorkflowBuilder {
pub fn agent(mut self, agent: impl Agent + 'static) -> Self {
self.agents.push(shared_agent(agent));
self
}
pub fn agent_shared(mut self, agent: SharedAgent) -> Self {
self.agents.push(agent);
self
}
pub fn merge(mut self, f: impl Fn(Vec<String>) -> String + Send + Sync + 'static) -> Self {
self.merge = Some(Box::new(f));
self
}
pub fn build(self) -> ConcurrentWorkflow {
ConcurrentWorkflow {
agents: self.agents,
merge: self.merge.unwrap_or_else(|| Box::new(default_merge)),
}
}
}