use crate::error::MultiError;
use crate::runner::AgentRunner;
use crate::shared::SharedInfra;
use crate::types::{AgentOutput, AgentSpec};
use crate::patterns::swarm::{Swarm, SwarmMode};
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use tracing::instrument;
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PipelineResult {
pub task: String,
pub stages: Vec<AgentOutput>,
pub final_answer: String,
}
impl PipelineResult {
pub fn all_succeeded(&self) -> bool {
self.stages.iter().all(|s| s.succeeded())
}
}
pub struct Pipeline {
pub stages: Vec<AgentSpec>,
}
impl Pipeline {
pub fn new(stages: Vec<AgentSpec>) -> Self {
Self { stages }
}
#[instrument(name = "multi.pipeline", skip_all)]
pub async fn run(
&self,
task: &str,
runner: &Arc<dyn AgentRunner>,
infra: &SharedInfra,
) -> Result<PipelineResult, MultiError> {
let swarm = Swarm::new(self.stages.clone(), SwarmMode::Sequential);
let result = swarm.run(task, runner, infra).await?;
let final_answer = result
.outputs
.last()
.filter(|o| o.succeeded())
.map(|o| o.answer.clone())
.unwrap_or_default();
Ok(PipelineResult {
task: task.to_string(),
stages: result.outputs,
final_answer,
})
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::error::MultiError;
use crate::mailbox::Mailbox;
use crate::runner::AgentRunner;
use crate::types::{AgentOutput, AgentSpec};
use car_engine::Runtime;
struct EchoRunner;
#[async_trait::async_trait]
impl AgentRunner for EchoRunner {
async fn run(
&self,
spec: &AgentSpec,
task: &str,
_runtime: &Runtime,
_mailbox: &Mailbox,
) -> Result<AgentOutput, MultiError> {
Ok(AgentOutput {
name: spec.name.clone(),
answer: format!("[{}] processed: {}", spec.name, &task[..task.len().min(80)]),
turns: 1,
tool_calls: 0,
duration_ms: 5.0,
error: None,
outcome: None,
tokens: None,
})
}
}
#[tokio::test]
async fn test_pipeline() {
let stages = vec![
AgentSpec::new("researcher", "Find data"),
AgentSpec::new("analyst", "Analyze data"),
AgentSpec::new("writer", "Write report"),
];
let runner: Arc<dyn AgentRunner> = Arc::new(EchoRunner);
let infra = SharedInfra::new();
let result = Pipeline::new(stages).run("analyze trends", &runner, &infra).await.unwrap();
assert_eq!(result.stages.len(), 3);
assert!(result.all_succeeded());
assert!(result.final_answer.contains("[writer]"));
}
}