// cerebro 1.1.1
//
// A high-performance semantic memory engine + multi-agent swarm orchestrator for AI, written in pure Rust.
// Documentation
//! # Hierarchical Supervisor Pattern
//!
//! A supervisor agent decomposes a complex task, delegates sub-tasks to workers
//! (run one after another), and synthesizes their reports into the final output.
//!
//! ```text
//!         [Supervisor Agent]
//!        /        |         \
//! [Worker A]  [Worker B]  [Worker C]
//! ```

use async_trait::async_trait;
use std::collections::HashMap;
use std::time::Instant;

use crate::traits::Result;
use super::{SwarmPatternExecutor, SwarmResult};
use crate::swarm::agent::{AgentConfig, AgentRuntime, ChatMessage, Role};
use crate::swarm::memory_bus::CerebroMemoryBus;
use crate::swarm::trace::{ExecutionTracer, RunStatus};
use crate::swarm::llm::LlmClient;

/// Hierarchical pattern: a supervisor delegates to workers and synthesizes results.
///
/// Both fields hold agent IDs. They are resolved against the agent map handed to
/// `execute`, so an ID that is missing from that map is only detected at run time.
#[derive(Debug, Clone)]
pub struct HierarchicalPattern {
    /// The supervisor agent ID.
    pub supervisor: String,
    /// Worker agent IDs the supervisor can delegate to; these are the agents
    /// advertised to the supervisor in the decomposition prompt.
    pub workers: Vec<String>,
}

impl HierarchicalPattern {
    pub fn new(supervisor: String, workers: Vec<String>) -> Self {
        Self {
            supervisor,
            workers,
        }
    }
}

#[async_trait]
impl SwarmPatternExecutor for HierarchicalPattern {
    /// Runs the hierarchical flow in three phases:
    ///
    /// 1. The supervisor is asked to split `input` into `DELEGATE <id>: <task>` lines.
    /// 2. Each parsed delegation is executed by the named worker, one after another.
    /// 3. The supervisor is asked to synthesize the worker reports into the final output.
    ///
    /// Progress is printed to stdout; skipped workers and non-fatal problems go to stderr.
    ///
    /// # Errors
    ///
    /// Returns `CerebroError::StorageError` when `self.supervisor` is not a key of
    /// `agents`, and propagates any error returned by `llm.chat`.
    async fn execute(
        &self,
        agents: &HashMap<String, AgentConfig>,
        memory: &CerebroMemoryBus,
        tracer: &ExecutionTracer,
        llm: &LlmClient,
        input: &str,
    ) -> Result<SwarmResult> {
        let run_start = Instant::now();
        let mut total_tokens: usize = 0;

        // Borrow the config from the map; it is cloned once, for the runtime only.
        let supervisor_config = agents.get(&self.supervisor).ok_or_else(|| {
            crate::traits::CerebroError::StorageError(format!(
                "Supervisor agent '{}' not found",
                self.supervisor
            ))
        })?;

        let supervisor_name = supervisor_config.name.clone();
        println!("  [Supervisor] {} analyzing task...", supervisor_name);

        // ─── Phase 1: Supervisor decomposes the task ─────────────────

        // One bullet per configured worker that exists in `agents`; the system
        // prompt is cut to 100 characters to keep the roster short.
        let worker_list = self
            .workers
            .iter()
            .filter_map(|id| agents.get(id))
            .map(|a| {
                format!(
                    "- {} (id: {}): {}",
                    a.name,
                    a.id,
                    a.system_prompt.chars().take(100).collect::<String>()
                )
            })
            .collect::<Vec<_>>()
            .join("\n");

        let decompose_prompt = format!(
            "You are a supervisor coordinating a team of specialists.\n\n\
             **Task:** {}\n\n\
             **Available Workers:**\n{}\n\n\
             Please decompose this task into sub-tasks, one per worker. \
             For each worker, specify the exact sub-task they should perform.\n\n\
             Respond in this exact format for each worker:\n\
             DELEGATE [worker_id]: [specific sub-task instruction]\n\n\
             Then add a final line:\n\
             SYNTHESIS_PROMPT: [instruction for how you'll combine their results]",
            input, worker_list
        );

        let mut supervisor_runtime = AgentRuntime::new(supervisor_config.clone());
        supervisor_runtime.push_message(ChatMessage::new(Role::User, &decompose_prompt));

        let llm_start = Instant::now();
        let decompose_response = llm.chat(&supervisor_config.model, &supervisor_runtime.messages).await?;
        let llm_duration = llm_start.elapsed().as_millis() as u64;
        let step_tokens = decompose_response.input_tokens + decompose_response.output_tokens;
        total_tokens += step_tokens;

        tracer.record_llm_call(
            &self.supervisor,
            &supervisor_name,
            0,
            &decompose_prompt,
            &decompose_response.content,
            llm_duration,
            step_tokens,
        );

        // Keep the supervisor's own plan in its conversation. Without this the
        // synthesis call would see two consecutive user turns and would have no
        // record of the decomposition (or the SYNTHESIS_PROMPT line) it wrote.
        supervisor_runtime.push_message(ChatMessage::new(Role::Assistant, &decompose_response.content));

        println!("    [Decompose] ✓ ({} tokens, {}ms)", step_tokens, llm_duration);

        // ─── Phase 2: Execute workers with delegated sub-tasks ───────

        let delegations = parse_delegations(&decompose_response.content);
        if delegations.is_empty() {
            eprintln!("    [Warn] No DELEGATE lines could be parsed from the supervisor response");
        }

        let mut worker_results: Vec<(String, String, String)> = Vec::new(); // (id, name, output)

        for (worker_id, sub_task) in &delegations {
            let config = match agents.get(worker_id) {
                Some(config) => config,
                None => {
                    eprintln!("    [Skip] Worker '{}' not found", worker_id);
                    continue;
                }
            };

            let worker_name = config.name.clone();
            println!("    [Worker] {} executing...", worker_name);

            tracer.record_handoff(&self.supervisor, &supervisor_name, worker_id);

            let mut worker_runtime = AgentRuntime::new(config.clone());

            // Recall relevant context and, when there is any, put it ahead of the sub-task.
            let context = memory.recall_as_context(sub_task, 3).await;
            let worker_input = if context.is_empty() {
                sub_task.clone()
            } else {
                format!("{}\n\n{}", context, sub_task)
            };

            worker_runtime.push_message(ChatMessage::new(Role::User, &worker_input));
            memory.push_message(worker_id, ChatMessage::new(Role::User, &worker_input));

            let llm_start = Instant::now();
            let response = llm.chat(&config.model, &worker_runtime.messages).await?;
            let llm_duration = llm_start.elapsed().as_millis() as u64;
            let step_tokens = response.input_tokens + response.output_tokens;
            total_tokens += step_tokens;

            tracer.record_llm_call(worker_id, &worker_name, 1, sub_task, &response.content, llm_duration, step_tokens);

            memory.push_message(worker_id, ChatMessage::new(Role::Assistant, &response.content));
            // Persisting the report is best-effort: a failure is reported but does not abort the run.
            if memory.commit_to_semantic(worker_id, tracer.run_id(), &response.content).await.is_err() {
                eprintln!("    [Warn] Could not commit output of worker '{}' to semantic memory", worker_id);
            }

            println!("      ✓ ({} tokens, {}ms)", step_tokens, llm_duration);

            worker_results.push((worker_id.clone(), worker_name, response.content));
        }

        // ─── Phase 3: Supervisor synthesizes results ─────────────────

        println!("  [Supervisor] {} synthesizing...", supervisor_name);

        let mut synthesis_prompt = format!(
            "You previously decomposed a task and delegated to workers. Here are their results:\n\n\
             **Original Task:** {}\n\n",
            input
        );

        for (_, name, output) in &worker_results {
            synthesis_prompt.push_str(&format!("### {} Report\n{}\n\n", name, output));
        }

        synthesis_prompt.push_str("Please provide a comprehensive synthesis of all worker reports into a final, cohesive result.");

        supervisor_runtime.push_message(ChatMessage::new(Role::User, &synthesis_prompt));

        let llm_start = Instant::now();
        let synthesis_response = llm.chat(&supervisor_config.model, &supervisor_runtime.messages).await?;
        let llm_duration = llm_start.elapsed().as_millis() as u64;
        let step_tokens = synthesis_response.input_tokens + synthesis_response.output_tokens;
        total_tokens += step_tokens;

        // The trace stores the label "Synthesis" rather than the full prompt,
        // which already repeats every worker report recorded above.
        tracer.record_llm_call(
            &self.supervisor,
            &supervisor_name,
            2,
            "Synthesis",
            &synthesis_response.content,
            llm_duration,
            step_tokens,
        );

        // Best-effort as well: the synthesized answer is still returned if this fails.
        if memory.commit_to_semantic(&self.supervisor, tracer.run_id(), &synthesis_response.content).await.is_err() {
            eprintln!("    [Warn] Could not commit supervisor synthesis to semantic memory");
        }

        println!("    [Synthesis] ✓ ({} tokens, {}ms)", step_tokens, llm_duration);

        // NOTE(review): the trace is finalized only on this success path; the
        // early `?` returns above leave it unfinalized — confirm that is intended.
        Ok(SwarmResult {
            final_output: synthesis_response.content,
            trace: tracer.finalize(RunStatus::Completed),
            total_tokens,
            total_duration_ms: run_start.elapsed().as_millis() as u64,
        })
    }
}

/// Parse `DELEGATE <worker_id>: <sub-task>` lines from the supervisor's response.
///
/// Each line is trimmed and must start with `DELEGATE `. The worker ID is the
/// text before the first `:` and the sub-task is everything after it. The
/// decomposition prompt spells the format with literal square brackets
/// (`DELEGATE [worker_id]: [sub-task]`), so a model may echo them back; one pair
/// of brackets that encloses the whole worker ID or the whole sub-task is
/// therefore removed. Brackets inside the text are left untouched.
///
/// Lines without the prefix, without a colon, or whose worker ID or sub-task is
/// empty are ignored. Returns `(worker_id, sub_task)` pairs in input order.
fn parse_delegations(response: &str) -> Vec<(String, String)> {
    /// Strips a single `[`…`]` pair when the opening bracket is closed by the
    /// final character, i.e. when the pair wraps the entire value.
    fn unwrap_brackets(value: &str) -> &str {
        let inner = match value.strip_prefix('[').and_then(|rest| rest.strip_suffix(']')) {
            Some(inner) => inner,
            None => return value,
        };

        // Track nesting inside the candidate: a `]` at depth 0 means the leading
        // bracket was closed early (e.g. "[a] then [b]"), so nothing is stripped.
        let mut depth = 0usize;
        for ch in inner.chars() {
            match ch {
                '[' => depth += 1,
                ']' if depth == 0 => return value,
                ']' => depth -= 1,
                _ => {}
            }
        }

        if depth == 0 {
            inner.trim()
        } else {
            value
        }
    }

    response
        .lines()
        .filter_map(|line| {
            let rest = line.trim().strip_prefix("DELEGATE ")?;
            let (raw_id, raw_task) = rest.split_once(':')?;
            let worker_id = unwrap_brackets(raw_id.trim());
            let sub_task = unwrap_brackets(raw_task.trim());
            if worker_id.is_empty() || sub_task.is_empty() {
                return None;
            }
            Some((worker_id.to_string(), sub_task.to_string()))
        })
        .collect()
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Well-formed response: DELEGATE lines are extracted in order and the
    /// SYNTHESIS_PROMPT line is not treated as a delegation.
    #[test]
    fn test_parse_delegations() {
        let input = "\
            DELEGATE security-agent: Check for SQL injection vulnerabilities\n\
            DELEGATE perf-agent: Analyze time complexity and suggest optimizations\n\
            SYNTHESIS_PROMPT: Combine results into a unified report\n";

        let result = parse_delegations(input);
        assert_eq!(result.len(), 2);
        assert_eq!(result[0].0, "security-agent");
        assert!(result[0].1.contains("SQL injection"));
        assert_eq!(result[1].0, "perf-agent");
    }

    /// Text without any DELEGATE line yields nothing.
    #[test]
    fn test_parse_delegations_empty() {
        assert!(parse_delegations("No delegations here").is_empty());
        assert!(parse_delegations("").is_empty());
    }

    /// Lines missing the colon, the worker ID, or the sub-task are dropped.
    #[test]
    fn test_parse_delegations_skips_malformed_lines() {
        let input = "DELEGATE worker-a do something\nDELEGATE : orphan task\nDELEGATE worker-b:\nDELEGATE\n";
        assert!(parse_delegations(input).is_empty());
    }

    /// Surrounding whitespace is ignored and only the first colon separates
    /// the worker ID from the sub-task.
    #[test]
    fn test_parse_delegations_trims_and_splits_on_first_colon() {
        let result = parse_delegations("   DELEGATE worker-a :  step 1: gather data   ");
        assert_eq!(result.len(), 1);
        assert_eq!(result[0].0, "worker-a");
        assert_eq!(result[0].1, "step 1: gather data");
    }
}