use async_trait::async_trait;
use std::collections::HashMap;
use std::time::Instant;
use super::{SwarmPatternExecutor, SwarmResult};
use crate::swarm::agent::{AgentConfig, AgentRuntime, ChatMessage, Role};
use crate::swarm::llm::LlmClient;
use crate::swarm::memory_bus::CerebroMemoryBus;
use crate::swarm::trace::{ExecutionTracer, RunStatus};
use crate::traits::Result;
pub struct HierarchicalPattern {
pub supervisor: String,
pub workers: Vec<String>,
}
impl HierarchicalPattern {
pub fn new(supervisor: String, workers: Vec<String>) -> Self {
Self {
supervisor,
workers,
}
}
}
#[async_trait]
impl SwarmPatternExecutor for HierarchicalPattern {
async fn execute(
&self,
agents: &HashMap<String, AgentConfig>,
tools: &HashMap<String, std::sync::Arc<dyn crate::swarm::tools::AgentTool>>,
memory: &CerebroMemoryBus,
tracer: &ExecutionTracer,
llm: &LlmClient,
input: &str,
) -> Result<SwarmResult> {
let run_start = Instant::now();
let mut total_tokens: usize = 0;
let supervisor_config = match agents.get(&self.supervisor) {
Some(c) => c.clone(),
None => {
return Err(crate::traits::CerebroError::StorageError(format!(
"Supervisor agent '{}' not found",
self.supervisor
)));
}
};
let supervisor_name = supervisor_config.name.clone();
println!(" [Supervisor] {} analyzing task...", supervisor_name);
let worker_list = self
.workers
.iter()
.filter_map(|id| {
agents.get(id).map(|a| {
format!(
"- {} (id: {}): {}",
a.name,
a.id,
a.system_prompt.chars().take(100).collect::<String>()
)
})
})
.collect::<Vec<_>>()
.join("\n");
let decompose_prompt = format!(
"You are a supervisor coordinating a team of specialists.\n\n\
**Task:** {}\n\n\
**Available Workers:**\n{}\n\n\
Please decompose this task into sub-tasks, one per worker. \
For each worker, specify the exact sub-task they should perform.\n\n\
Respond in this exact format for each worker:\n\
DELEGATE [worker_id]: [specific sub-task instruction]\n\n\
Then add a final line:\n\
SYNTHESIS_PROMPT: [instruction for how you'll combine their results]",
input, worker_list
);
let mut supervisor_runtime = AgentRuntime::new(supervisor_config.clone());
supervisor_runtime.push_message(ChatMessage::new(Role::User, &decompose_prompt));
let (decompose_response, step_tokens, llm_duration) = super::execute_agent_loop(
&self.supervisor,
&supervisor_name,
&supervisor_config,
tools,
memory,
tracer,
llm,
0,
&mut supervisor_runtime,
)
.await?;
total_tokens += step_tokens;
println!(
" [Decompose] ✓ ({} tokens, {}ms)",
step_tokens, llm_duration
);
let delegations = parse_delegations(&decompose_response);
let mut worker_results: Vec<(String, String, String)> = Vec::new();
for (worker_id, sub_task) in &delegations {
let config = match agents.get(worker_id) {
Some(c) => c.clone(),
None => {
eprintln!(" [Skip] Worker '{}' not found", worker_id);
continue;
}
};
let worker_name = config.name.clone();
println!(" [Worker] {} executing...", worker_name);
tracer.record_handoff(&self.supervisor, &supervisor_name, worker_id);
let mut worker_runtime = AgentRuntime::new(config.clone());
let context = memory.recall_as_context(sub_task, 3).await;
let worker_input = if context.is_empty() {
sub_task.clone()
} else {
format!("{}\n\n{}", context, sub_task)
};
worker_runtime.push_message(ChatMessage::new(Role::User, &worker_input));
memory.push_message(worker_id, ChatMessage::new(Role::User, &worker_input));
let (worker_output, step_tokens, llm_duration) = super::execute_agent_loop(
worker_id,
&worker_name,
&config,
tools,
memory,
tracer,
llm,
1,
&mut worker_runtime,
)
.await?;
total_tokens += step_tokens;
memory
.commit_to_semantic(worker_id, tracer.run_id(), &worker_output)
.await
.ok();
println!(" ✓ ({} tokens, {}ms)", step_tokens, llm_duration);
worker_results.push((worker_id.clone(), worker_name, worker_output));
}
println!(" [Supervisor] {} synthesizing...", supervisor_name);
let mut synthesis_prompt = format!(
"You previously decomposed a task and delegated to workers. Here are their results:\n\n\
**Original Task:** {}\n\n",
input
);
for (_, name, output) in &worker_results {
synthesis_prompt.push_str(&format!("### {} Report\n{}\n\n", name, output));
}
synthesis_prompt.push_str("Please provide a comprehensive synthesis of all worker reports into a final, cohesive result.");
supervisor_runtime.push_message(ChatMessage::new(Role::User, &synthesis_prompt));
let (synthesis_response, step_tokens, llm_duration) = super::execute_agent_loop(
&self.supervisor,
&supervisor_name,
&supervisor_config,
tools,
memory,
tracer,
llm,
2,
&mut supervisor_runtime,
)
.await?;
total_tokens += step_tokens;
memory
.commit_to_semantic(&self.supervisor, tracer.run_id(), &synthesis_response)
.await
.ok();
println!(
" [Synthesis] ✓ ({} tokens, {}ms)",
step_tokens, llm_duration
);
Ok(SwarmResult {
final_output: synthesis_response,
trace: tracer.finalize(RunStatus::Completed),
total_tokens,
total_duration_ms: run_start.elapsed().as_millis() as u64,
})
}
}
fn parse_delegations(response: &str) -> Vec<(String, String)> {
let mut delegations = Vec::new();
for line in response.lines() {
let trimmed = line.trim();
if let Some(rest) = trimmed.strip_prefix("DELEGATE ") {
if let Some(colon_pos) = rest.find(':') {
let worker_id = rest[..colon_pos].trim().to_string();
let sub_task = rest[colon_pos + 1..].trim().to_string();
if !worker_id.is_empty() && !sub_task.is_empty() {
delegations.push((worker_id, sub_task));
}
}
}
}
delegations
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_parse_delegations() {
let input = "\
DELEGATE security-agent: Check for SQL injection vulnerabilities\n\
DELEGATE perf-agent: Analyze time complexity and suggest optimizations\n\
SYNTHESIS_PROMPT: Combine results into a unified report\n";
let result = parse_delegations(input);
assert_eq!(result.len(), 2);
assert_eq!(result[0].0, "security-agent");
assert!(result[0].1.contains("SQL injection"));
assert_eq!(result[1].0, "perf-agent");
}
#[test]
fn test_parse_delegations_empty() {
assert!(parse_delegations("No delegations here").is_empty());
}
}