use async_trait::async_trait;
use std::collections::HashMap;
use std::time::Instant;
use crate::traits::Result;
use super::{SwarmPatternExecutor, SwarmResult};
use crate::swarm::agent::{AgentConfig, AgentRuntime, ChatMessage, Role};
use crate::swarm::memory_bus::CerebroMemoryBus;
use crate::swarm::trace::{ExecutionTracer, RunStatus};
use crate::swarm::llm::LlmClient;
/// Supervisor/worker orchestration pattern.
///
/// One supervisor agent decomposes the incoming task, a set of worker agents
/// each handle a delegated sub-task, and the supervisor then synthesizes the
/// worker outputs into the final answer.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct HierarchicalPattern {
    /// Key of the supervisor agent in the agent registry passed to `execute`.
    pub supervisor: String,
    /// Registry keys of the agents advertised to the supervisor as available
    /// delegation targets.
    pub workers: Vec<String>,
}
impl HierarchicalPattern {
pub fn new(supervisor: String, workers: Vec<String>) -> Self {
Self {
supervisor,
workers,
}
}
}
#[async_trait]
impl SwarmPatternExecutor for HierarchicalPattern {
    /// Runs the hierarchical pattern in three phases:
    ///
    /// 1. **Decompose** – the supervisor is asked to split `input` into one
    ///    `DELEGATE <worker_id>: <sub-task>` line per worker.
    /// 2. **Delegate** – each parsed delegation is executed sequentially by the
    ///    named worker, with recalled memory context prepended when available.
    /// 3. **Synthesize** – the supervisor receives every worker report and
    ///    produces the final output.
    ///
    /// Every LLM call and hand-off is recorded on `tracer`; worker and
    /// supervisor outputs are committed to semantic memory on a best-effort
    /// basis (commit failures are ignored).
    ///
    /// # Errors
    ///
    /// Returns `CerebroError::StorageError` when `self.supervisor` is not a key
    /// of `agents`, and propagates any error returned by `llm.chat`.
    ///
    /// NOTE(review): on those early-return paths `tracer.finalize` is never
    /// called; confirm whether a failed run should still be finalized.
    async fn execute(
        &self,
        agents: &HashMap<String, AgentConfig>,
        memory: &CerebroMemoryBus,
        tracer: &ExecutionTracer,
        llm: &LlmClient,
        input: &str,
    ) -> Result<SwarmResult> {
        let run_start = Instant::now();
        let mut total_tokens: usize = 0;

        // Borrow the supervisor's config straight from the registry; it is only
        // cloned once, when it is handed to the runtime.
        let supervisor_config = match agents.get(&self.supervisor) {
            Some(config) => config,
            None => {
                return Err(crate::traits::CerebroError::StorageError(format!(
                    "Supervisor agent '{}' not found",
                    self.supervisor
                )));
            }
        };
        let supervisor_name = supervisor_config.name.clone();
        println!(" [Supervisor] {} analyzing task...", supervisor_name);

        // ---- Phase 1: decomposition -------------------------------------
        // Workers missing from the registry are simply left out of the roster.
        let worker_list = self
            .workers
            .iter()
            .filter_map(|id| agents.get(id))
            .map(|a| {
                format!(
                    "- {} (id: {}): {}",
                    a.name,
                    a.id,
                    a.system_prompt.chars().take(100).collect::<String>()
                )
            })
            .collect::<Vec<_>>()
            .join("\n");
        let decompose_prompt = build_decompose_prompt(input, &worker_list);

        let mut supervisor_runtime = AgentRuntime::new(supervisor_config.clone());
        supervisor_runtime.push_message(ChatMessage::new(Role::User, &decompose_prompt));

        let llm_start = Instant::now();
        let decompose_response = llm
            .chat(&supervisor_config.model, &supervisor_runtime.messages)
            .await?;
        let llm_duration = elapsed_ms(llm_start);
        let step_tokens = decompose_response.input_tokens + decompose_response.output_tokens;
        total_tokens += step_tokens;
        tracer.record_llm_call(
            &self.supervisor,
            &supervisor_name,
            0,
            &decompose_prompt,
            &decompose_response.content,
            llm_duration,
            step_tokens,
        );
        println!(" [Decompose] ✓ ({} tokens, {}ms)", step_tokens, llm_duration);

        // Keep the supervisor's own plan in its conversation history so the
        // synthesis turn follows an assistant message instead of sending two
        // consecutive user messages with the plan missing.
        supervisor_runtime.push_message(ChatMessage::new(
            Role::Assistant,
            &decompose_response.content,
        ));

        // ---- Phase 2: delegation ----------------------------------------
        let delegations = parse_delegations(&decompose_response.content);
        if delegations.is_empty() {
            eprintln!(" [Warn] No delegations parsed from supervisor response");
        }

        // (worker display name, worker output), in execution order.
        let mut worker_reports: Vec<(String, String)> = Vec::with_capacity(delegations.len());
        for (worker_id, sub_task) in &delegations {
            // Only agents configured as workers of this pattern may be
            // targeted; the model's output must not be able to reach arbitrary
            // agents in the registry (or the supervisor itself).
            if !self.workers.contains(worker_id) {
                eprintln!(" [Skip] Worker '{}' is not a worker of this pattern", worker_id);
                continue;
            }
            let config = match agents.get(worker_id) {
                Some(config) => config,
                None => {
                    eprintln!(" [Skip] Worker '{}' not found", worker_id);
                    continue;
                }
            };
            let worker_name = config.name.clone();
            println!(" [Worker] {} executing...", worker_name);
            tracer.record_handoff(&self.supervisor, &supervisor_name, worker_id);

            let mut worker_runtime = AgentRuntime::new(config.clone());

            // Prepend recalled memory context (if any) to the sub-task.
            let context = memory.recall_as_context(sub_task, 3).await;
            let worker_input = if context.is_empty() {
                sub_task.clone()
            } else {
                format!("{}\n\n{}", context, sub_task)
            };
            worker_runtime.push_message(ChatMessage::new(Role::User, &worker_input));
            memory.push_message(worker_id, ChatMessage::new(Role::User, &worker_input));

            let llm_start = Instant::now();
            let response = llm.chat(&config.model, &worker_runtime.messages).await?;
            let llm_duration = elapsed_ms(llm_start);
            let step_tokens = response.input_tokens + response.output_tokens;
            total_tokens += step_tokens;
            tracer.record_llm_call(
                worker_id,
                &worker_name,
                1,
                sub_task,
                &response.content,
                llm_duration,
                step_tokens,
            );
            memory.push_message(worker_id, ChatMessage::new(Role::Assistant, &response.content));
            // Best effort: a failed semantic commit must not abort the run.
            memory
                .commit_to_semantic(worker_id, tracer.run_id(), &response.content)
                .await
                .ok();
            println!(" ✓ ({} tokens, {}ms)", step_tokens, llm_duration);
            worker_reports.push((worker_name, response.content));
        }

        // ---- Phase 3: synthesis -----------------------------------------
        println!(" [Supervisor] {} synthesizing...", supervisor_name);
        let synthesis_prompt = build_synthesis_prompt(input, &worker_reports);
        supervisor_runtime.push_message(ChatMessage::new(Role::User, &synthesis_prompt));

        let llm_start = Instant::now();
        let synthesis_response = llm
            .chat(&supervisor_config.model, &supervisor_runtime.messages)
            .await?;
        let llm_duration = elapsed_ms(llm_start);
        let step_tokens = synthesis_response.input_tokens + synthesis_response.output_tokens;
        total_tokens += step_tokens;
        tracer.record_llm_call(
            &self.supervisor,
            &supervisor_name,
            2,
            "Synthesis",
            &synthesis_response.content,
            llm_duration,
            step_tokens,
        );
        // Best effort, same as for the worker outputs.
        memory
            .commit_to_semantic(&self.supervisor, tracer.run_id(), &synthesis_response.content)
            .await
            .ok();
        println!(" [Synthesis] ✓ ({} tokens, {}ms)", step_tokens, llm_duration);

        Ok(SwarmResult {
            final_output: synthesis_response.content,
            trace: tracer.finalize(RunStatus::Completed),
            total_tokens,
            total_duration_ms: elapsed_ms(run_start),
        })
    }
}

/// Milliseconds elapsed since `since`, as the `u64` the tracer and result use.
fn elapsed_ms(since: Instant) -> u64 {
    // `as_millis` yields u128; overflowing u64 would take hundreds of millions
    // of years, so the narrowing cast is safe in practice.
    since.elapsed().as_millis() as u64
}

/// Builds the supervisor prompt that asks for one `DELEGATE` line per worker.
fn build_decompose_prompt(input: &str, worker_list: &str) -> String {
    format!(
        "You are a supervisor coordinating a team of specialists.\n\n\
         **Task:** {}\n\n\
         **Available Workers:**\n{}\n\n\
         Please decompose this task into sub-tasks, one per worker. \
         For each worker, specify the exact sub-task they should perform.\n\n\
         Respond in this exact format for each worker:\n\
         DELEGATE [worker_id]: [specific sub-task instruction]\n\n\
         Then add a final line:\n\
         SYNTHESIS_PROMPT: [instruction for how you'll combine their results]",
        input, worker_list
    )
}

/// Builds the supervisor prompt that presents every `(worker name, output)`
/// report and asks for a combined final result.
fn build_synthesis_prompt(input: &str, reports: &[(String, String)]) -> String {
    let mut prompt = format!(
        "You previously decomposed a task and delegated to workers. Here are their results:\n\n\
         **Original Task:** {}\n\n",
        input
    );
    for (name, output) in reports {
        prompt.push_str(&format!("### {} Report\n{}\n\n", name, output));
    }
    prompt.push_str(
        "Please provide a comprehensive synthesis of all worker reports into a final, cohesive result.",
    );
    prompt
}
/// Extracts `(worker_id, sub_task)` pairs from a supervisor response.
///
/// A line is recognized when, after trimming, it starts with `DELEGATE `
/// and contains a `:`. Everything before the first colon is the worker id,
/// everything after it is the sub-task; both are trimmed. Lines with an empty
/// id or an empty sub-task are ignored, as are all other lines (including the
/// `SYNTHESIS_PROMPT:` line).
///
/// The prompt template shows the id as `[worker_id]`, and models frequently
/// copy those brackets literally, so a single pair of surrounding square
/// brackets around the id is stripped (`DELEGATE [sec]: ...` yields `sec`).
fn parse_delegations(response: &str) -> Vec<(String, String)> {
    response
        .lines()
        .filter_map(|line| {
            let rest = line.trim().strip_prefix("DELEGATE ")?;
            let (raw_id, raw_task) = rest.split_once(':')?;

            let raw_id = raw_id.trim();
            // Accept `[id]` as well as `id`; leave unbalanced brackets alone.
            let worker_id = raw_id
                .strip_prefix('[')
                .and_then(|inner| inner.strip_suffix(']'))
                .map_or(raw_id, str::trim);
            let sub_task = raw_task.trim();

            (!worker_id.is_empty() && !sub_task.is_empty())
                .then(|| (worker_id.to_string(), sub_task.to_string()))
        })
        .collect()
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Well-formed `DELEGATE` lines are returned in order; the
    /// `SYNTHESIS_PROMPT` line is not treated as a delegation.
    #[test]
    fn test_parse_delegations() {
        let input = "\
            DELEGATE security-agent: Check for SQL injection vulnerabilities\n\
            DELEGATE perf-agent: Analyze time complexity and suggest optimizations\n\
            SYNTHESIS_PROMPT: Combine results into a unified report\n";
        let result = parse_delegations(input);
        assert_eq!(result.len(), 2);
        assert_eq!(result[0].0, "security-agent");
        assert!(result[0].1.contains("SQL injection"));
        assert_eq!(result[1].0, "perf-agent");
    }

    /// Text without any `DELEGATE` line yields no delegations.
    #[test]
    fn test_parse_delegations_empty() {
        assert!(parse_delegations("No delegations here").is_empty());
        assert!(parse_delegations("").is_empty());
    }

    /// Lines missing the colon, the worker id, or the sub-task are dropped.
    #[test]
    fn test_parse_delegations_skips_malformed_lines() {
        let input = "DELEGATE no-colon-here\n\
                     DELEGATE : missing worker id\n\
                     DELEGATE worker-a:\n\
                     DELEGATE worker-b:   \n";
        assert!(parse_delegations(input).is_empty());
    }

    /// Surrounding whitespace is trimmed and only the first colon separates
    /// the worker id from the sub-task.
    #[test]
    fn test_parse_delegations_trims_and_splits_on_first_colon() {
        let input = "   DELEGATE  docs-agent  :  Summarize https://example.com/spec: section 2   ";
        let result = parse_delegations(input);
        assert_eq!(
            result,
            vec![(
                "docs-agent".to_string(),
                "Summarize https://example.com/spec: section 2".to_string()
            )]
        );
    }

    /// Commentary between delegations is ignored and order is preserved.
    #[test]
    fn test_parse_delegations_ignores_interleaved_prose() {
        let input = "Here is the plan:\n\
                     DELEGATE a: one\n\
                     Some commentary\n\
                     DELEGATE b: two\n";
        let result = parse_delegations(input);
        assert_eq!(
            result,
            vec![
                ("a".to_string(), "one".to_string()),
                ("b".to_string(), "two".to_string()),
            ]
        );
    }
}