use async_trait::async_trait;
use std::collections::HashMap;
use std::time::Instant;
use crate::traits::Result;
use super::{SwarmPatternExecutor, SwarmResult};
use crate::swarm::agent::{AgentConfig, AgentRuntime, ChatMessage, Role};
use crate::swarm::memory_bus::CerebroMemoryBus;
use crate::swarm::trace::{ExecutionTracer, RunStatus, TraceAction, TraceStep};
use crate::swarm::llm::LlmClient;
pub struct ParallelPattern {
pub parallel_agents: Vec<String>,
pub merger_agent: String,
}
impl ParallelPattern {
pub fn new(parallel_agents: Vec<String>, merger_agent: String) -> Self {
Self {
parallel_agents,
merger_agent,
}
}
}
#[async_trait]
impl SwarmPatternExecutor for ParallelPattern {
async fn execute(
&self,
agents: &HashMap<String, AgentConfig>,
memory: &CerebroMemoryBus,
tracer: &ExecutionTracer,
llm: &LlmClient,
input: &str,
) -> Result<SwarmResult> {
let run_start = Instant::now();
let mut total_tokens: usize = 0;
println!(" [Fan-Out] Dispatching to {} parallel agents...", self.parallel_agents.len());
let mut parallel_outputs: Vec<(String, String, String)> = Vec::new();
for (idx, agent_id) in self.parallel_agents.iter().enumerate() {
let config = match agents.get(agent_id) {
Some(c) => c.clone(),
None => continue,
};
let agent_name = config.name.clone();
tracer.record(TraceStep {
agent_id: agent_id.clone(),
agent_name: agent_name.clone(),
step_number: idx,
action: TraceAction::AgentStart,
input_summary: truncate(input, 200),
output_summary: String::new(),
duration_ms: 0,
tokens_used: 0,
timestamp: chrono::Utc::now().to_rfc3339(),
});
let mut runtime = AgentRuntime::new(config.clone());
let context = memory.recall_as_context(input, 3).await;
let user_content = if context.is_empty() {
input.to_string()
} else {
format!("{}\n\n{}", context, input)
};
runtime.push_message(ChatMessage::new(Role::User, &user_content));
memory.push_message(agent_id, ChatMessage::new(Role::User, &user_content));
memory.set_state(agent_id, "status", "running").await.ok();
let llm_start = Instant::now();
let response = llm.chat(&config.model, &runtime.messages).await?;
let llm_duration = llm_start.elapsed().as_millis() as u64;
let step_tokens = response.input_tokens + response.output_tokens;
total_tokens += step_tokens;
tracer.record_llm_call(
agent_id,
&agent_name,
idx,
input,
&response.content,
llm_duration,
step_tokens,
);
memory.push_message(
agent_id,
ChatMessage::new(Role::Assistant, &response.content),
);
memory
.commit_to_semantic(agent_id, tracer.run_id(), &response.content)
.await
.ok();
memory.set_state(agent_id, "status", "completed").await.ok();
println!(
" [{}] {} ✓ ({} tokens, {}ms)",
idx + 1,
agent_name,
step_tokens,
llm_duration
);
parallel_outputs.push((agent_id.clone(), agent_name, response.content));
}
println!(" [Fan-In] Synthesizing {} results...", parallel_outputs.len());
let merger_config = match agents.get(&self.merger_agent) {
Some(c) => c.clone(),
None => {
let combined = parallel_outputs
.iter()
.map(|(_, name, out)| format!("## {}\n{}", name, out))
.collect::<Vec<_>>()
.join("\n\n---\n\n");
return Ok(SwarmResult {
final_output: combined,
trace: tracer.finalize(RunStatus::Completed),
total_tokens,
total_duration_ms: run_start.elapsed().as_millis() as u64,
});
}
};
let merger_name = merger_config.name.clone();
tracer.record(TraceStep {
agent_id: self.merger_agent.clone(),
agent_name: merger_name.clone(),
step_number: self.parallel_agents.len(),
action: TraceAction::AgentStart,
input_summary: "Merging parallel results".into(),
output_summary: String::new(),
duration_ms: 0,
tokens_used: 0,
timestamp: chrono::Utc::now().to_rfc3339(),
});
let mut merge_input = format!(
"You are synthesizing the results from {} specialist agents who analyzed the following input:\n\n\
**Original Input:**\n{}\n\n\
**Agent Results:**\n",
parallel_outputs.len(),
input
);
for (_, name, output) in ¶llel_outputs {
merge_input.push_str(&format!("\n### {} Report\n{}\n", name, output));
}
merge_input.push_str("\n\nPlease provide a unified, comprehensive synthesis of all the above reports.");
let mut merger_runtime = AgentRuntime::new(merger_config.clone());
merger_runtime.push_message(ChatMessage::new(Role::User, &merge_input));
let llm_start = Instant::now();
let response = llm.chat(&merger_config.model, &merger_runtime.messages).await?;
let llm_duration = llm_start.elapsed().as_millis() as u64;
let step_tokens = response.input_tokens + response.output_tokens;
total_tokens += step_tokens;
tracer.record_llm_call(
&self.merger_agent,
&merger_name,
self.parallel_agents.len(),
&merge_input,
&response.content,
llm_duration,
step_tokens,
);
memory
.commit_to_semantic(&self.merger_agent, tracer.run_id(), &response.content)
.await
.ok();
println!(" [Merger] {} ✓ ({} tokens, {}ms)", merger_name, step_tokens, llm_duration);
Ok(SwarmResult {
final_output: response.content,
trace: tracer.finalize(RunStatus::Completed),
total_tokens,
total_duration_ms: run_start.elapsed().as_millis() as u64,
})
}
}
fn truncate(s: &str, max: usize) -> String {
if s.len() <= max {
s.to_string()
} else {
format!("{}...", &s[..max])
}
}