cerebro 1.1.8

A blazing-fast AI memory layer that enables teams of specialized agents to collaborate through a shared cognitive architecture.
Documentation
//! # Parallel Fan-Out / Fan-In Pattern
//!
//! Multiple agents work simultaneously on the same input,
//! then a merger agent synthesizes all their outputs into a final result.
//!
//! ```text
//!                ┌→ [Agent A] ──┐
//! [Input] ───────┼→ [Agent B] ──┼→ [Merger Agent] → Final Output
//!                └→ [Agent C] ──┘
//! ```

use async_trait::async_trait;
use std::collections::HashMap;
use std::time::Instant;

use super::{SwarmPatternExecutor, SwarmResult};
use crate::swarm::agent::{AgentConfig, AgentRuntime, ChatMessage, Role};
use crate::swarm::llm::LlmClient;
use crate::swarm::memory_bus::CerebroMemoryBus;
use crate::swarm::trace::{ExecutionTracer, RunStatus, TraceAction, TraceStep};
use crate::traits::Result;

/// Parallel pattern: multiple agents run concurrently, results merged by a synthesizer.
pub struct ParallelPattern {
    /// Agent IDs to execute in parallel.
    pub parallel_agents: Vec<String>,
    /// Agent ID responsible for merging all parallel outputs.
    pub merger_agent: String,
}

impl ParallelPattern {
    pub fn new(parallel_agents: Vec<String>, merger_agent: String) -> Self {
        Self {
            parallel_agents,
            merger_agent,
        }
    }
}

#[async_trait]
impl SwarmPatternExecutor for ParallelPattern {
    async fn execute(
        &self,
        agents: &HashMap<String, AgentConfig>,
        tools: &HashMap<String, std::sync::Arc<dyn crate::swarm::tools::AgentTool>>,
        memory: &CerebroMemoryBus,
        tracer: &ExecutionTracer,
        llm: &LlmClient,
        input: &str,
    ) -> Result<SwarmResult> {
        let run_start = Instant::now();
        let mut total_tokens: usize = 0;

        println!(
            "  [Fan-Out] Dispatching to {} parallel agents...",
            self.parallel_agents.len()
        );

        // ─── Fan-Out: Execute all parallel agents ───────────────────
        // Note: We execute them sequentially here but with independent contexts.
        // True async parallelism (tokio::join!) requires cloning the LLM client
        // and would be added in a future iteration.
        let mut parallel_outputs: Vec<(String, String, String)> = Vec::new(); // (agent_id, agent_name, output)

        for (idx, agent_id) in self.parallel_agents.iter().enumerate() {
            let config = match agents.get(agent_id) {
                Some(c) => c.clone(),
                None => continue,
            };

            let agent_name = config.name.clone();

            tracer.record(TraceStep {
                agent_id: agent_id.clone(),
                agent_name: agent_name.clone(),
                step_number: idx,
                action: TraceAction::AgentStart,
                input_summary: truncate(input, 200),
                output_summary: String::new(),
                duration_ms: 0,
                tokens_used: 0,
                timestamp: chrono::Utc::now().to_rfc3339(),
            });

            let mut runtime = AgentRuntime::new(config.clone());

            // Query memory for context
            let context = memory.recall_as_context(input, 3).await;
            let user_content = if context.is_empty() {
                input.to_string()
            } else {
                format!("{}\n\n{}", context, input)
            };

            runtime.push_message(ChatMessage::new(Role::User, &user_content));
            memory.push_message(agent_id, ChatMessage::new(Role::User, &user_content));
            memory.set_state(agent_id, "status", "running").await.ok();

            // Execute agent loop (handles tool calls)
            let (final_agent_output, step_tokens, llm_duration) = super::execute_agent_loop(
                agent_id,
                &agent_name,
                &config,
                tools,
                memory,
                tracer,
                llm,
                idx,
                &mut runtime,
            )
            .await?;

            total_tokens += step_tokens;

            memory
                .commit_to_semantic(agent_id, tracer.run_id(), &final_agent_output)
                .await
                .ok();

            memory.set_state(agent_id, "status", "completed").await.ok();

            println!(
                "    [{}] {} ✓ ({} tokens, {}ms)",
                idx + 1,
                agent_name,
                step_tokens,
                llm_duration
            );

            parallel_outputs.push((agent_id.clone(), agent_name, final_agent_output));
        }

        // ─── Fan-In: Merge results via the merger agent ─────────────

        println!(
            "  [Fan-In] Synthesizing {} results...",
            parallel_outputs.len()
        );

        let merger_config = match agents.get(&self.merger_agent) {
            Some(c) => c.clone(),
            None => {
                // If no merger, concatenate all outputs
                let combined = parallel_outputs
                    .iter()
                    .map(|(_, name, out)| format!("## {}\n{}", name, out))
                    .collect::<Vec<_>>()
                    .join("\n\n---\n\n");

                return Ok(SwarmResult {
                    final_output: combined,
                    trace: tracer.finalize(RunStatus::Completed),
                    total_tokens,
                    total_duration_ms: run_start.elapsed().as_millis() as u64,
                });
            }
        };

        let merger_name = merger_config.name.clone();

        tracer.record(TraceStep {
            agent_id: self.merger_agent.clone(),
            agent_name: merger_name.clone(),
            step_number: self.parallel_agents.len(),
            action: TraceAction::AgentStart,
            input_summary: "Merging parallel results".into(),
            output_summary: String::new(),
            duration_ms: 0,
            tokens_used: 0,
            timestamp: chrono::Utc::now().to_rfc3339(),
        });

        // Build merge prompt with all parallel outputs
        let mut merge_input = format!(
            "You are synthesizing the results from {} specialist agents who analyzed the following input:\n\n\
             **Original Input:**\n{}\n\n\
             **Agent Results:**\n",
            parallel_outputs.len(),
            input
        );

        for (_, name, output) in &parallel_outputs {
            merge_input.push_str(&format!("\n### {} Report\n{}\n", name, output));
        }

        merge_input.push_str(
            "\n\nPlease provide a unified, comprehensive synthesis of all the above reports.",
        );

        let mut merger_runtime = AgentRuntime::new(merger_config.clone());
        merger_runtime.push_message(ChatMessage::new(Role::User, &merge_input));

        let (final_output, step_tokens, llm_duration) = super::execute_agent_loop(
            &self.merger_agent,
            &merger_name,
            &merger_config,
            tools,
            memory,
            tracer,
            llm,
            self.parallel_agents.len(),
            &mut merger_runtime,
        )
        .await?;

        total_tokens += step_tokens;

        memory
            .commit_to_semantic(&self.merger_agent, tracer.run_id(), &final_output)
            .await
            .ok();

        println!(
            "    [Merger] {} ✓ ({} tokens, {}ms)",
            merger_name, step_tokens, llm_duration
        );

        Ok(SwarmResult {
            final_output,
            trace: tracer.finalize(RunStatus::Completed),
            total_tokens,
            total_duration_ms: run_start.elapsed().as_millis() as u64,
        })
    }
}

fn truncate(s: &str, max: usize) -> String {
    if s.len() <= max {
        s.to_string()
    } else {
        format!("{}...", &s[..max])
    }
}