car_multi/patterns/
pipeline.rs1use crate::error::MultiError;
11use crate::runner::AgentRunner;
12use crate::shared::SharedInfra;
13use crate::types::{AgentOutput, AgentSpec};
14use crate::patterns::swarm::{Swarm, SwarmMode};
15use serde::{Deserialize, Serialize};
16use std::sync::Arc;
17
18#[derive(Debug, Clone, Serialize, Deserialize)]
19pub struct PipelineResult {
20 pub task: String,
21 pub stages: Vec<AgentOutput>,
22 pub final_answer: String,
23}
24
25impl PipelineResult {
26 pub fn all_succeeded(&self) -> bool {
27 self.stages.iter().all(|s| s.succeeded())
28 }
29}
30
31pub struct Pipeline {
32 pub stages: Vec<AgentSpec>,
33}
34
35impl Pipeline {
36 pub fn new(stages: Vec<AgentSpec>) -> Self {
37 Self { stages }
38 }
39
40 pub async fn run(
41 &self,
42 task: &str,
43 runner: &Arc<dyn AgentRunner>,
44 infra: &SharedInfra,
45 ) -> Result<PipelineResult, MultiError> {
46 let swarm = Swarm::new(self.stages.clone(), SwarmMode::Sequential);
47 let result = swarm.run(task, runner, infra).await?;
48
49 let final_answer = result
50 .outputs
51 .last()
52 .filter(|o| o.succeeded())
53 .map(|o| o.answer.clone())
54 .unwrap_or_default();
55
56 Ok(PipelineResult {
57 task: task.to_string(),
58 stages: result.outputs,
59 final_answer,
60 })
61 }
62}
63
64#[cfg(test)]
65mod tests {
66 use super::*;
67 use crate::error::MultiError;
68 use crate::mailbox::Mailbox;
69 use crate::runner::AgentRunner;
70 use crate::types::{AgentOutput, AgentSpec};
71 use car_engine::Runtime;
72
73 struct EchoRunner;
74
75 #[async_trait::async_trait]
76 impl AgentRunner for EchoRunner {
77 async fn run(
78 &self,
79 spec: &AgentSpec,
80 task: &str,
81 _runtime: &Runtime,
82 _mailbox: &Mailbox,
83 ) -> Result<AgentOutput, MultiError> {
84 Ok(AgentOutput {
85 name: spec.name.clone(),
86 answer: format!("[{}] processed: {}", spec.name, &task[..task.len().min(80)]),
87 turns: 1,
88 tool_calls: 0,
89 duration_ms: 5.0,
90 error: None,
91 })
92 }
93 }
94
95 #[tokio::test]
96 async fn test_pipeline() {
97 let stages = vec![
98 AgentSpec::new("researcher", "Find data"),
99 AgentSpec::new("analyst", "Analyze data"),
100 AgentSpec::new("writer", "Write report"),
101 ];
102 let runner: Arc<dyn AgentRunner> = Arc::new(EchoRunner);
103 let infra = SharedInfra::new();
104
105 let result = Pipeline::new(stages).run("analyze trends", &runner, &infra).await.unwrap();
106
107 assert_eq!(result.stages.len(), 3);
108 assert!(result.all_succeeded());
109 assert!(result.final_answer.contains("[writer]"));
110 }
111}