claude_agent_sdk/orchestration/patterns/
sequential.rs

1//! # Sequential Orchestration Pattern
2//!
3//! Agents execute one after another, with each agent's output becoming the input
4//! for the next agent in the sequence.
5//!
6//! ```text
7//! Input → Agent A → Agent B → Agent C → Output
8//! ```
9//!
10//! Use cases:
11//! - Data processing pipelines
12//! - Multi-step reasoning
13//! - Content generation and refinement
14
15use crate::orchestration::{
16    Result,
17    agent::{Agent, AgentInput, AgentOutput},
18    context::{AgentExecution, ExecutionContext},
19    orchestrator::{BaseOrchestrator, Orchestrator, OrchestratorInput, OrchestratorOutput},
20};
21
/// Sequential orchestrator that executes agents one after another.
///
/// Each successful agent's output content and data are turned into the input
/// of the next agent; the run stops at the first agent whose output is not
/// successful.
pub struct SequentialOrchestrator {
    /// Shared orchestrator helper: supplies the name/description, converts the
    /// orchestrator input into an agent input, and runs agents with retry.
    base: BaseOrchestrator,
    /// Retry budget passed to `BaseOrchestrator::execute_agent_with_retry`
    /// for every agent; `new()` sets it to 3.
    max_retries: usize,
}
27
28impl SequentialOrchestrator {
29    /// Create a new sequential orchestrator
30    pub fn new() -> Self {
31        Self {
32            base: BaseOrchestrator::new(
33                "SequentialOrchestrator",
34                "Executes agents sequentially, passing each output to the next input",
35            ),
36            max_retries: 3,
37        }
38    }
39
40    /// Set max retries per agent
41    pub fn with_max_retries(mut self, max_retries: usize) -> Self {
42        self.max_retries = max_retries;
43        self
44    }
45
46    /// Execute agents sequentially
47    async fn execute_sequential(
48        &self,
49        agents: Vec<Box<dyn Agent>>,
50        mut input: AgentInput,
51        ctx: &ExecutionContext,
52    ) -> Result<Vec<AgentOutput>> {
53        let mut outputs = Vec::new();
54
55        for (index, agent) in agents.iter().enumerate() {
56            // Create execution record
57            let mut exec_record = AgentExecution::new(agent.name(), input.clone());
58
59            if ctx.is_logging_enabled() {
60                println!(
61                    "[{}] Executing agent {}/{}: {}",
62                    self.base.name(),
63                    index + 1,
64                    agents.len(),
65                    agent.name()
66                );
67            }
68
69            // Execute agent with retry
70            let output = self
71                .base
72                .execute_agent_with_retry(agent.as_ref(), input.clone(), self.max_retries)
73                .await;
74
75            let success = output.is_successful();
76
77            if success {
78                exec_record.succeed(output.clone());
79                outputs.push(output.clone());
80
81                // Use this output as input for next agent
82                input = AgentInput::new(&output.content)
83                    .with_context(output.data.clone())
84                    .with_metadata("previous_agent", agent.name());
85            } else {
86                exec_record.fail(output.content.clone());
87                return Err(
88                    crate::orchestration::errors::OrchestrationError::agent_failure(
89                        agent.name(),
90                        output.content,
91                    ),
92                );
93            }
94
95            // Add to trace if enabled
96            if ctx.is_tracing_enabled() {
97                ctx.add_execution(exec_record).await;
98            }
99        }
100
101        Ok(outputs)
102    }
103}
104
105impl Default for SequentialOrchestrator {
106    fn default() -> Self {
107        Self::new()
108    }
109}
110
111#[async_trait::async_trait]
112impl Orchestrator for SequentialOrchestrator {
113    fn name(&self) -> &str {
114        self.base.name()
115    }
116
117    fn description(&self) -> &str {
118        self.base.description()
119    }
120
121    async fn orchestrate(
122        &self,
123        agents: Vec<Box<dyn Agent>>,
124        input: OrchestratorInput,
125    ) -> Result<OrchestratorOutput> {
126        if agents.is_empty() {
127            return Err(
128                crate::orchestration::errors::OrchestrationError::invalid_config(
129                    "At least one agent is required",
130                ),
131            );
132        }
133
134        // Create execution context
135        let config = crate::orchestration::context::ExecutionConfig::new();
136        let ctx = ExecutionContext::new(config);
137
138        let agent_input = self.base.input_to_agent_input(&input);
139
140        // Execute agents sequentially
141        let outputs = match self.execute_sequential(agents, agent_input, &ctx).await {
142            Ok(outputs) => outputs,
143            Err(e) => {
144                ctx.complete_trace().await;
145                let trace = ctx.get_trace().await;
146                return Ok(OrchestratorOutput::failure(e.to_string(), trace));
147            },
148        };
149
150        // Complete trace
151        ctx.complete_trace().await;
152        let trace = ctx.get_trace().await;
153
154        // Get final result
155        let final_output = outputs.last().unwrap();
156        let result = final_output.content.clone();
157
158        Ok(OrchestratorOutput::success(result, outputs, trace))
159    }
160}
161
#[cfg(test)]
mod tests {
    use super::*;
    use crate::orchestration::agent::SimpleAgent;
    use crate::orchestration::errors::OrchestrationError;
    use std::sync::atomic::{AtomicUsize, Ordering};

    /// Three chained agents: each one prefixes the content it receives, so the
    /// final result shows every step in order.
    #[tokio::test]
    async fn test_sequential_orchestrator() {
        let pipeline: Vec<Box<dyn Agent>> = vec![
            Box::new(SimpleAgent::new("Agent1", "First agent", |input| {
                Ok(AgentOutput::new(format!("Step 1: {}", input.content))
                    .with_metadata("step", "1"))
            })),
            Box::new(SimpleAgent::new("Agent2", "Second agent", |input| {
                Ok(AgentOutput::new(format!("Step 2: {}", input.content))
                    .with_metadata("step", "2"))
            })),
            Box::new(SimpleAgent::new("Agent3", "Third agent", |input| {
                Ok(AgentOutput::new(format!("Step 3: {}", input.content))
                    .with_metadata("step", "3"))
            })),
        ];

        let outcome = SequentialOrchestrator::new()
            .orchestrate(pipeline, OrchestratorInput::new("Initial input"))
            .await
            .unwrap();

        // Expected content after each stage of the pipeline.
        let expected = [
            "Step 1: Initial input",
            "Step 2: Step 1: Initial input",
            "Step 3: Step 2: Step 1: Initial input",
        ];

        assert!(outcome.is_successful());
        assert_eq!(outcome.agent_outputs.len(), 3);
        for (produced, wanted) in outcome.agent_outputs.iter().zip(expected) {
            assert_eq!(produced.content, wanted);
        }
        assert_eq!(outcome.result, "Step 3: Step 2: Step 1: Initial input");
    }

    /// An empty agent list is rejected with an `InvalidConfig` error.
    #[tokio::test]
    async fn test_sequential_orchestrator_empty_agents() {
        let no_agents: Vec<Box<dyn Agent>> = Vec::new();

        let outcome = SequentialOrchestrator::new()
            .orchestrate(no_agents, OrchestratorInput::new("Test"))
            .await;

        assert!(matches!(
            outcome,
            Err(OrchestrationError::InvalidConfig(_))
        ));
    }

    /// An agent that fails on its first two calls still succeeds when the
    /// orchestrator is configured with `with_max_retries(2)`.
    #[tokio::test]
    async fn test_sequential_orchestrator_with_retry() {
        let attempts = AtomicUsize::new(0);

        let flaky = SimpleAgent::new("FlakyAgent", "Sometimes fails", move |input| {
            // `fetch_add` returns the count *before* this call.
            let earlier_calls = attempts.fetch_add(1, Ordering::SeqCst);
            if earlier_calls >= 2 {
                Ok(AgentOutput::new(format!("Success: {}", input.content)))
            } else {
                // Fail first two times
                Err(anyhow::anyhow!("Temporary failure").into())
            }
        });

        let pipeline: Vec<Box<dyn Agent>> = vec![Box::new(flaky)];

        let outcome = SequentialOrchestrator::new()
            .with_max_retries(2)
            .orchestrate(pipeline, OrchestratorInput::new("Test"))
            .await
            .unwrap();

        assert!(outcome.is_successful());
        assert_eq!(outcome.agent_outputs[0].content, "Success: Test");
    }
}