claude_agent_sdk/orchestration/patterns/
sequential.rs1use crate::orchestration::{
16 Result,
17 agent::{Agent, AgentInput, AgentOutput},
18 context::{AgentExecution, ExecutionContext},
19 orchestrator::{BaseOrchestrator, Orchestrator, OrchestratorInput, OrchestratorOutput},
20};
21
22pub struct SequentialOrchestrator {
24 base: BaseOrchestrator,
25 max_retries: usize,
26}
27
28impl SequentialOrchestrator {
29 pub fn new() -> Self {
31 Self {
32 base: BaseOrchestrator::new(
33 "SequentialOrchestrator",
34 "Executes agents sequentially, passing each output to the next input",
35 ),
36 max_retries: 3,
37 }
38 }
39
40 pub fn with_max_retries(mut self, max_retries: usize) -> Self {
42 self.max_retries = max_retries;
43 self
44 }
45
46 async fn execute_sequential(
48 &self,
49 agents: Vec<Box<dyn Agent>>,
50 mut input: AgentInput,
51 ctx: &ExecutionContext,
52 ) -> Result<Vec<AgentOutput>> {
53 let mut outputs = Vec::new();
54
55 for (index, agent) in agents.iter().enumerate() {
56 let mut exec_record = AgentExecution::new(agent.name(), input.clone());
58
59 if ctx.is_logging_enabled() {
60 println!(
61 "[{}] Executing agent {}/{}: {}",
62 self.base.name(),
63 index + 1,
64 agents.len(),
65 agent.name()
66 );
67 }
68
69 let output = self
71 .base
72 .execute_agent_with_retry(agent.as_ref(), input.clone(), self.max_retries)
73 .await;
74
75 let success = output.is_successful();
76
77 if success {
78 exec_record.succeed(output.clone());
79 outputs.push(output.clone());
80
81 input = AgentInput::new(&output.content)
83 .with_context(output.data.clone())
84 .with_metadata("previous_agent", agent.name());
85 } else {
86 exec_record.fail(output.content.clone());
87 return Err(
88 crate::orchestration::errors::OrchestrationError::agent_failure(
89 agent.name(),
90 output.content,
91 ),
92 );
93 }
94
95 if ctx.is_tracing_enabled() {
97 ctx.add_execution(exec_record).await;
98 }
99 }
100
101 Ok(outputs)
102 }
103}
104
105impl Default for SequentialOrchestrator {
106 fn default() -> Self {
107 Self::new()
108 }
109}
110
111#[async_trait::async_trait]
112impl Orchestrator for SequentialOrchestrator {
113 fn name(&self) -> &str {
114 self.base.name()
115 }
116
117 fn description(&self) -> &str {
118 self.base.description()
119 }
120
121 async fn orchestrate(
122 &self,
123 agents: Vec<Box<dyn Agent>>,
124 input: OrchestratorInput,
125 ) -> Result<OrchestratorOutput> {
126 if agents.is_empty() {
127 return Err(
128 crate::orchestration::errors::OrchestrationError::invalid_config(
129 "At least one agent is required",
130 ),
131 );
132 }
133
134 let config = crate::orchestration::context::ExecutionConfig::new();
136 let ctx = ExecutionContext::new(config);
137
138 let agent_input = self.base.input_to_agent_input(&input);
139
140 let outputs = match self.execute_sequential(agents, agent_input, &ctx).await {
142 Ok(outputs) => outputs,
143 Err(e) => {
144 ctx.complete_trace().await;
145 let trace = ctx.get_trace().await;
146 return Ok(OrchestratorOutput::failure(e.to_string(), trace));
147 },
148 };
149
150 ctx.complete_trace().await;
152 let trace = ctx.get_trace().await;
153
154 let final_output = outputs.last().unwrap();
156 let result = final_output.content.clone();
157
158 Ok(OrchestratorOutput::success(result, outputs, trace))
159 }
160}
161
162#[cfg(test)]
163mod tests {
164 use super::*;
165 use crate::orchestration::agent::SimpleAgent;
166
167 #[tokio::test]
168 async fn test_sequential_orchestrator() {
169 let orchestrator = SequentialOrchestrator::new();
170
171 let agent1 = SimpleAgent::new("Agent1", "First agent", |input| {
173 Ok(AgentOutput::new(format!("Step 1: {}", input.content)).with_metadata("step", "1"))
174 });
175
176 let agent2 = SimpleAgent::new("Agent2", "Second agent", |input| {
177 Ok(AgentOutput::new(format!("Step 2: {}", input.content)).with_metadata("step", "2"))
178 });
179
180 let agent3 = SimpleAgent::new("Agent3", "Third agent", |input| {
181 Ok(AgentOutput::new(format!("Step 3: {}", input.content)).with_metadata("step", "3"))
182 });
183
184 let agents: Vec<Box<dyn Agent>> =
185 vec![Box::new(agent1), Box::new(agent2), Box::new(agent3)];
186
187 let input = OrchestratorInput::new("Initial input");
188
189 let output = orchestrator.orchestrate(agents, input).await.unwrap();
190
191 assert!(output.is_successful());
192 assert_eq!(output.agent_outputs.len(), 3);
193 assert_eq!(output.agent_outputs[0].content, "Step 1: Initial input");
194 assert_eq!(
195 output.agent_outputs[1].content,
196 "Step 2: Step 1: Initial input"
197 );
198 assert_eq!(
199 output.agent_outputs[2].content,
200 "Step 3: Step 2: Step 1: Initial input"
201 );
202 assert_eq!(output.result, "Step 3: Step 2: Step 1: Initial input");
203 }
204
205 #[tokio::test]
206 async fn test_sequential_orchestrator_empty_agents() {
207 let orchestrator = SequentialOrchestrator::new();
208 let agents: Vec<Box<dyn Agent>> = vec![];
209 let input = OrchestratorInput::new("Test");
210
211 let result = orchestrator.orchestrate(agents, input).await;
212
213 assert!(result.is_err());
214 assert!(matches!(
215 result.unwrap_err(),
216 crate::orchestration::errors::OrchestrationError::InvalidConfig(_)
217 ));
218 }
219
220 #[tokio::test]
221 async fn test_sequential_orchestrator_with_retry() {
222 let orchestrator = SequentialOrchestrator::new().with_max_retries(2);
223
224 let call_count = std::sync::atomic::AtomicUsize::new(0);
225
226 let agent = SimpleAgent::new("FlakyAgent", "Sometimes fails", move |input| {
227 let count = call_count.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
228 if count < 2 {
229 Err(anyhow::anyhow!("Temporary failure").into())
231 } else {
232 Ok(AgentOutput::new(format!("Success: {}", input.content)))
233 }
234 });
235
236 let agents: Vec<Box<dyn Agent>> = vec![Box::new(agent)];
237 let input = OrchestratorInput::new("Test");
238
239 let output = orchestrator.orchestrate(agents, input).await.unwrap();
240
241 assert!(output.is_successful());
242 assert_eq!(output.agent_outputs[0].content, "Success: Test");
243 }
244}