// cerebro 1.1.8
//
// A blazing-fast AI memory layer that enables teams of specialized agents to collaborate through a shared cognitive architecture.
// Documentation
//! # Sequential Pipeline Pattern
//!
//! Agents execute one after another in a defined order.
//! Each agent's output becomes the next agent's input, like an assembly line.
//!
//! ```text
//! [Agent A] → [Agent B] → [Agent C] → Final Output
//! ```

use async_trait::async_trait;
use std::collections::HashMap;
use std::time::Instant;

use super::{SwarmPatternExecutor, SwarmResult};
use crate::swarm::agent::{AgentConfig, AgentRuntime, ChatMessage, Role};
use crate::swarm::llm::LlmClient;
use crate::swarm::memory_bus::CerebroMemoryBus;
use crate::swarm::trace::{ExecutionTracer, RunStatus, TraceAction, TraceStep};
use crate::traits::Result;

/// Sequential pipeline: agents execute in order, each building on the last.
///
/// The pattern itself only holds the ordering; it derives `Debug` and `Clone`
/// so it can be logged and duplicated like any other plain configuration value.
#[derive(Debug, Clone)]
pub struct SequentialPattern {
    /// Ordered list of agent IDs defining the pipeline.
    pub agent_order: Vec<String>,
}

impl SequentialPattern {
    /// Creates a pipeline that runs the given agent IDs in the order supplied.
    ///
    /// The list is stored as-is: no deduplication is performed and the IDs are
    /// not checked against any registry at construction time.
    pub fn new(agent_order: Vec<String>) -> Self {
        Self { agent_order }
    }
}

#[async_trait]
impl SwarmPatternExecutor for SequentialPattern {
    /// Runs the pipeline: walks `agent_order` front to back, handing each agent
    /// the previous agent's output as its input (the first agent receives `input`).
    ///
    /// For every agent ID that is present in `agents`, this method, in order:
    /// 1. records an `AgentStart` trace step;
    /// 2. asks `memory` for context related to the current input and, when the
    ///    returned context is non-empty, prepends it to the user message;
    /// 3. pushes that user message to both the agent runtime and `memory`;
    /// 4. drives the agent through `super::execute_agent_loop`;
    /// 5. commits the agent's output through `memory.commit_to_semantic`,
    ///    updates the agent's state entries, records an `AgentComplete` trace
    ///    step, and records a handoff when another ID follows in `agent_order`.
    ///
    /// IDs that are missing from `agents` are reported on stderr and skipped;
    /// the input carried forward is left untouched in that case.
    ///
    /// # Returns
    /// A `SwarmResult` holding the output of the last agent that ran (an empty
    /// string if none ran), the finalized trace, the summed token count, and the
    /// elapsed time of this call in milliseconds.
    ///
    /// # Errors
    /// Propagates any error returned by `super::execute_agent_loop`; the loop
    /// stops at that point and the trace is not finalized by this method.
    /// Failures of `memory.set_state` are discarded, and a failed
    /// `memory.commit_to_semantic` is replaced by the document ID `"unknown"`.
    async fn execute(
        &self,
        agents: &HashMap<String, AgentConfig>,
        tools: &HashMap<String, std::sync::Arc<dyn crate::swarm::tools::AgentTool>>,
        memory: &CerebroMemoryBus,
        tracer: &ExecutionTracer,
        llm: &LlmClient,
        input: &str,
    ) -> Result<SwarmResult> {
        let run_start = Instant::now();
        // `current_input` is what the next agent will see; `final_output` only
        // changes when an agent actually runs, so it stays empty if none do.
        let mut current_input = input.to_string();
        let mut total_tokens: usize = 0;
        let mut final_output = String::new();

        // `pipeline_step` is the index within `agent_order`, so skipped IDs leave
        // gaps in the step numbers recorded in the trace.
        for (pipeline_step, agent_id) in self.agent_order.iter().enumerate() {
            let config = match agents.get(agent_id) {
                Some(c) => c.clone(),
                None => {
                    // Unknown ID: warn and move on without aborting the pipeline.
                    eprintln!(
                        "[SwarmForge] Agent '{}' not found in registry, skipping.",
                        agent_id
                    );
                    continue;
                }
            };

            let agent_name = config.name.clone();

            // Record agent start (input is shortened to 200 bytes for the trace)
            tracer.record(TraceStep {
                agent_id: agent_id.clone(),
                agent_name: agent_name.clone(),
                step_number: pipeline_step,
                action: TraceAction::AgentStart,
                input_summary: truncate(&current_input, 200),
                output_summary: String::new(),
                duration_ms: 0,
                tokens_used: 0,
                timestamp: chrono::Utc::now().to_rfc3339(),
            });

            // Initialize agent runtime; a fresh one is built for every agent, and
            // `config` is cloned because it is still borrowed further down.
            let mut runtime = AgentRuntime::new(config.clone());

            // Query semantic memory for relevant context
            let context = memory.recall_as_context(&current_input, 3).await;
            // NOTE(review): the last argument is 3 whenever any context came back,
            // 0 otherwise — it mirrors the value passed to `recall_as_context`,
            // not a measured hit count. Confirm what the tracer expects here.
            tracer.record_memory_query(
                agent_id,
                &agent_name,
                &current_input,
                if context.is_empty() { 0 } else { 3 },
            );

            // Build the user message with context (context first, then a blank
            // line, then the input); without context the input is sent verbatim.
            let user_content = if context.is_empty() {
                current_input.clone()
            } else {
                format!("{}\n\n{}", context, current_input)
            };

            runtime.push_message(ChatMessage::new(Role::User, &user_content));

            // Store in episodic memory
            memory.push_message(agent_id, ChatMessage::new(Role::User, &user_content));

            // Update working memory state; `.ok()` deliberately drops any error
            // so a failed state write does not stop the pipeline.
            memory.set_state(agent_id, "status", "running").await.ok();
            memory
                .set_state(agent_id, "pipeline_step", &pipeline_step.to_string())
                .await
                .ok();

            // Execute agent loop (handles tool calls). An error here returns from
            // `execute` immediately via `?`, leaving the "status" entry written
            // above as "running".
            let (final_agent_output, step_tokens, llm_duration) = super::execute_agent_loop(
                agent_id,
                &agent_name,
                &config,
                tools,
                memory,
                tracer,
                llm,
                pipeline_step,
                &mut runtime,
            )
            .await?;

            total_tokens += step_tokens;

            // Commit output to semantic memory for future agents to recall; if the
            // commit fails the placeholder ID "unknown" is traced instead.
            let doc_id = memory
                .commit_to_semantic(agent_id, tracer.run_id(), &final_agent_output)
                .await
                .unwrap_or_else(|_| "unknown".into());
            tracer.record_memory_commit(agent_id, &agent_name, &doc_id);

            // Update working memory (errors ignored, as above)
            memory.set_state(agent_id, "status", "completed").await.ok();
            memory
                .set_state(agent_id, "last_output", &final_agent_output)
                .await
                .ok();

            // Record agent completion. `duration_ms` is the third value returned
            // by `execute_agent_loop` — presumably time spent in the LLM; verify
            // against that helper.
            tracer.record(TraceStep {
                agent_id: agent_id.clone(),
                agent_name: agent_name.clone(),
                step_number: pipeline_step,
                action: TraceAction::AgentComplete,
                input_summary: truncate(&current_input, 200),
                output_summary: truncate(&final_agent_output, 500),
                duration_ms: llm_duration,
                tokens_used: step_tokens,
                timestamp: chrono::Utc::now().to_rfc3339(),
            });

            // Record handoff to next agent (if not the last). The subtraction
            // cannot underflow: being inside the loop means `agent_order` is
            // non-empty. The next ID is taken from `agent_order` without checking
            // that it exists in `agents`.
            if pipeline_step < self.agent_order.len() - 1 {
                let next_id = &self.agent_order[pipeline_step + 1];
                tracer.record_handoff(agent_id, &agent_name, next_id);
            }

            // Pass output as input to next agent
            final_output = final_agent_output.clone();
            current_input = final_agent_output;

            // Progress line on stdout, numbered from 1.
            println!(
                "  [{}] {} ✓ ({} tokens, {}ms)",
                pipeline_step + 1,
                agent_name,
                step_tokens,
                llm_duration
            );
        }

        // Elapsed time for the whole call, including skipped IDs and memory I/O.
        let total_duration = run_start.elapsed().as_millis() as u64;

        Ok(SwarmResult {
            final_output,
            trace: tracer.finalize(RunStatus::Completed),
            total_tokens,
            total_duration_ms: total_duration,
        })
    }
}

/// Shortens `s` to at most `max` bytes for display, appending `"..."` when
/// anything was cut off.
///
/// Strings whose byte length is already `<= max` are returned unchanged.
/// Otherwise the cut is made at the last UTF-8 character boundary at or before
/// byte `max`, so the function never panics on multi-byte text; the result may
/// therefore hold slightly fewer than `max` bytes before the ellipsis.
fn truncate(s: &str, max: usize) -> String {
    if s.len() <= max {
        return s.to_string();
    }

    // Slicing a `str` at a byte offset inside a multi-byte character panics, so
    // step back until we land on a boundary. Offset 0 is always a boundary,
    // which guarantees the loop terminates.
    let mut end = max;
    while !s.is_char_boundary(end) {
        end -= 1;
    }

    format!("{}...", &s[..end])
}