Skip to main content

car_multi/patterns/
pipeline.rs

1//! Pipeline — agent A's output feeds agent B's input.
2//!
3//! A linear chain where each agent transforms or enriches the result.
4//! State flows forward through the pipeline.
5//!
6//! ```text
7//! researcher → analyst → writer → final report
8//! ```
9
10use crate::error::MultiError;
11use crate::runner::AgentRunner;
12use crate::shared::SharedInfra;
13use crate::types::{AgentOutput, AgentSpec};
14use crate::patterns::swarm::{Swarm, SwarmMode};
15use serde::{Deserialize, Serialize};
16use std::sync::Arc;
17
18#[derive(Debug, Clone, Serialize, Deserialize)]
19pub struct PipelineResult {
20    pub task: String,
21    pub stages: Vec<AgentOutput>,
22    pub final_answer: String,
23}
24
25impl PipelineResult {
26    pub fn all_succeeded(&self) -> bool {
27        self.stages.iter().all(|s| s.succeeded())
28    }
29}
30
31pub struct Pipeline {
32    pub stages: Vec<AgentSpec>,
33}
34
35impl Pipeline {
36    pub fn new(stages: Vec<AgentSpec>) -> Self {
37        Self { stages }
38    }
39
40    pub async fn run(
41        &self,
42        task: &str,
43        runner: &Arc<dyn AgentRunner>,
44        infra: &SharedInfra,
45    ) -> Result<PipelineResult, MultiError> {
46        let swarm = Swarm::new(self.stages.clone(), SwarmMode::Sequential);
47        let result = swarm.run(task, runner, infra).await?;
48
49        let final_answer = result
50            .outputs
51            .last()
52            .filter(|o| o.succeeded())
53            .map(|o| o.answer.clone())
54            .unwrap_or_default();
55
56        Ok(PipelineResult {
57            task: task.to_string(),
58            stages: result.outputs,
59            final_answer,
60        })
61    }
62}
63
64#[cfg(test)]
65mod tests {
66    use super::*;
67    use crate::error::MultiError;
68    use crate::mailbox::Mailbox;
69    use crate::runner::AgentRunner;
70    use crate::types::{AgentOutput, AgentSpec};
71    use car_engine::Runtime;
72
73    struct EchoRunner;
74
75    #[async_trait::async_trait]
76    impl AgentRunner for EchoRunner {
77        async fn run(
78            &self,
79            spec: &AgentSpec,
80            task: &str,
81            _runtime: &Runtime,
82            _mailbox: &Mailbox,
83        ) -> Result<AgentOutput, MultiError> {
84            Ok(AgentOutput {
85                name: spec.name.clone(),
86                answer: format!("[{}] processed: {}", spec.name, &task[..task.len().min(80)]),
87                turns: 1,
88                tool_calls: 0,
89                duration_ms: 5.0,
90                error: None,
91            })
92        }
93    }
94
95    #[tokio::test]
96    async fn test_pipeline() {
97        let stages = vec![
98            AgentSpec::new("researcher", "Find data"),
99            AgentSpec::new("analyst", "Analyze data"),
100            AgentSpec::new("writer", "Write report"),
101        ];
102        let runner: Arc<dyn AgentRunner> = Arc::new(EchoRunner);
103        let infra = SharedInfra::new();
104
105        let result = Pipeline::new(stages).run("analyze trends", &runner, &infra).await.unwrap();
106
107        assert_eq!(result.stages.len(), 3);
108        assert!(result.all_succeeded());
109        assert!(result.final_answer.contains("[writer]"));
110    }
111}