car-multi 0.14.0

Multi-agent coordination patterns for Common Agent Runtime
Documentation
//! Pipeline — agent A's output feeds agent B's input.
//!
//! A linear chain where each agent transforms or enriches the result.
//! State flows forward through the pipeline.
//!
//! ```text
//! researcher → analyst → writer → final report
//! ```

use crate::error::MultiError;
use crate::patterns::swarm::{Swarm, SwarmMode};
use crate::runner::AgentRunner;
use crate::shared::SharedInfra;
use crate::types::{AgentOutput, AgentSpec};
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use tracing::instrument;

/// Outcome of a [`Pipeline`] run: the task it was given, the per-stage
/// outputs, and the answer taken from the last stage.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PipelineResult {
    /// The task string that was passed to `Pipeline::run`.
    pub task: String,
    /// Outputs returned by the underlying sequential `Swarm` run
    /// (presumably one per stage, in stage order — confirm against `Swarm`).
    pub stages: Vec<AgentOutput>,
    /// As populated by `Pipeline::run`: the last output's answer when that
    /// output succeeded, otherwise an empty string.
    pub final_answer: String,
}

impl PipelineResult {
    /// Returns `true` when none of the recorded stage outputs reports failure.
    ///
    /// An empty `stages` list yields `true`.
    pub fn all_succeeded(&self) -> bool {
        for stage in &self.stages {
            // Stop at the first failed stage; later stages need not be checked.
            if !stage.succeeded() {
                return false;
            }
        }
        true
    }
}

/// A linear chain of agents, described by their specs.
pub struct Pipeline {
    /// Agent specs handed to a sequential `Swarm` when the pipeline runs.
    pub stages: Vec<AgentSpec>,
}

impl Pipeline {
    pub fn new(stages: Vec<AgentSpec>) -> Self {
        Self { stages }
    }

    #[instrument(name = "multi.pipeline", skip_all)]
    pub async fn run(
        &self,
        task: &str,
        runner: &Arc<dyn AgentRunner>,
        infra: &SharedInfra,
    ) -> Result<PipelineResult, MultiError> {
        let swarm = Swarm::new(self.stages.clone(), SwarmMode::Sequential);
        let result = swarm.run(task, runner, infra).await?;

        let final_answer = result
            .outputs
            .last()
            .filter(|o| o.succeeded())
            .map(|o| o.answer.clone())
            .unwrap_or_default();

        Ok(PipelineResult {
            task: task.to_string(),
            stages: result.outputs,
            final_answer,
        })
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::error::MultiError;
    use crate::mailbox::Mailbox;
    use crate::runner::AgentRunner;
    use crate::types::{AgentOutput, AgentSpec};
    use car_engine::Runtime;

    /// Returns the longest prefix of `text` that is at most `max_bytes` bytes
    /// long and ends on a UTF-8 character boundary.
    ///
    /// Slicing a `&str` at an arbitrary byte index panics when the index falls
    /// inside a multi-byte character, so the cut point is moved back until it
    /// lands on a boundary.
    fn truncate_to_char_boundary(text: &str, max_bytes: usize) -> &str {
        let mut end = text.len().min(max_bytes);
        // Index 0 is always a char boundary, so this loop terminates.
        while !text.is_char_boundary(end) {
            end -= 1;
        }
        &text[..end]
    }

    /// Test runner that echoes a bounded prefix of the task it receives,
    /// tagged with the agent's name.
    struct EchoRunner;

    #[async_trait::async_trait]
    impl AgentRunner for EchoRunner {
        async fn run(
            &self,
            spec: &AgentSpec,
            task: &str,
            _runtime: &Runtime,
            _mailbox: &Mailbox,
        ) -> Result<AgentOutput, MultiError> {
            Ok(AgentOutput {
                name: spec.name.clone(),
                // Cap the echoed task at 80 bytes without splitting a character.
                answer: format!(
                    "[{}] processed: {}",
                    spec.name,
                    truncate_to_char_boundary(task, 80)
                ),
                turns: 1,
                tool_calls: 0,
                duration_ms: 5.0,
                error: None,
                outcome: None,
                tokens: None,
            })
        }
    }

    /// A three-stage pipeline produces three successful outputs and the final
    /// answer comes from the last stage.
    #[tokio::test]
    async fn test_pipeline() {
        let stages = vec![
            AgentSpec::new("researcher", "Find data"),
            AgentSpec::new("analyst", "Analyze data"),
            AgentSpec::new("writer", "Write report"),
        ];
        let runner: Arc<dyn AgentRunner> = Arc::new(EchoRunner);
        let infra = SharedInfra::new();

        let result = Pipeline::new(stages)
            .run("analyze trends", &runner, &infra)
            .await
            .unwrap();

        assert_eq!(result.stages.len(), 3);
        assert!(result.all_succeeded());
        assert!(result.final_answer.contains("[writer]"));
    }

    /// Truncation never splits a multi-byte character and leaves short or
    /// ASCII input unchanged up to the byte limit.
    #[test]
    fn test_truncate_to_char_boundary() {
        assert_eq!(truncate_to_char_boundary("short", 80), "short");
        assert_eq!(truncate_to_char_boundary("", 80), "");
        assert_eq!(truncate_to_char_boundary(&"x".repeat(100), 80).len(), 80);

        // 'é' is two bytes, so byte 80 falls in the middle of the last one.
        let text = format!("a{}", "é".repeat(40));
        assert_eq!(text.len(), 81);
        let cut = truncate_to_char_boundary(&text, 80);
        assert_eq!(cut.len(), 79);
        assert!(text.starts_with(cut));
    }
}