1use crate::error::MultiError;
9use crate::mailbox::Mailbox;
10use crate::runner::AgentRunner;
11use crate::shared::SharedInfra;
12use crate::types::{AgentOutput, AgentSpec};
13use serde::{Deserialize, Serialize};
14use std::sync::Arc;
15use std::time::Instant;
16
17#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
18#[serde(rename_all = "snake_case")]
19pub enum SwarmMode {
20 Parallel,
21 Sequential,
22 Debate,
23}
24
25#[derive(Debug, Clone, Serialize, Deserialize)]
26pub struct SwarmResult {
27 pub task: String,
28 pub outputs: Vec<AgentOutput>,
29 pub final_summary: String,
30}
31
32pub struct Swarm {
33 pub agents: Vec<AgentSpec>,
34 pub mode: SwarmMode,
35 pub synthesizer: Option<AgentSpec>,
36}
37
38impl Swarm {
39 pub fn new(agents: Vec<AgentSpec>, mode: SwarmMode) -> Self {
40 Self {
41 agents,
42 mode,
43 synthesizer: None,
44 }
45 }
46
47 pub fn with_synthesizer(mut self, spec: AgentSpec) -> Self {
48 self.synthesizer = Some(spec);
49 self
50 }
51
52 pub fn run<'a>(
53 &'a self,
54 task: &'a str,
55 runner: &'a Arc<dyn AgentRunner>,
56 infra: &'a SharedInfra,
57 ) -> futures::future::BoxFuture<'a, Result<SwarmResult, MultiError>> {
58 Box::pin(async move {
59 match self.mode {
60 SwarmMode::Parallel => self.run_parallel(task, runner, infra).await,
61 SwarmMode::Sequential => self.run_sequential(task, runner, infra).await,
62 SwarmMode::Debate => self.run_debate(task, runner, infra).await,
63 }
64 })
65 }
66
67 async fn run_parallel(
68 &self,
69 task: &str,
70 runner: &Arc<dyn AgentRunner>,
71 infra: &SharedInfra,
72 ) -> Result<SwarmResult, MultiError> {
73 let mailbox = Arc::new(Mailbox::default());
74
75 let mut handles = Vec::new();
76 for spec in &self.agents {
77 let runner = Arc::clone(runner);
78 let rt = infra.make_runtime();
79 let spec = spec.clone();
80 let task = task.to_string();
81 let mailbox = Arc::clone(&mailbox);
82
83 for tool in &spec.tools {
85 rt.register_tool(tool).await;
86 }
87
88 handles.push(tokio::spawn(async move {
89 runner.run(&spec, &task, &rt, &mailbox).await
90 }));
91 }
92
93 let results = futures::future::join_all(handles).await;
94 let mut outputs = Vec::new();
95 for (i, result) in results.into_iter().enumerate() {
96 match result {
97 Ok(Ok(output)) => {
98 infra.state.set(
100 &format!("agent.{}.answer", output.name),
101 serde_json::Value::String(output.answer.clone()),
102 &format!("swarm.{}", output.name),
103 );
104 outputs.push(output);
105 }
106 Ok(Err(e)) => {
107 outputs.push(AgentOutput {
108 name: self.agents[i].name.clone(),
109 answer: String::new(),
110 turns: 0,
111 tool_calls: 0,
112 duration_ms: 0.0,
113 error: Some(e.to_string()),
114 });
115 }
116 Err(e) => {
117 outputs.push(AgentOutput {
118 name: self.agents[i].name.clone(),
119 answer: String::new(),
120 turns: 0,
121 tool_calls: 0,
122 duration_ms: 0.0,
123 error: Some(format!("join error: {}", e)),
124 });
125 }
126 }
127 }
128
129 let summary = self.synthesize(task, &outputs, runner, infra).await;
130
131 Ok(SwarmResult {
132 task: task.to_string(),
133 outputs,
134 final_summary: summary,
135 })
136 }
137
138 async fn run_sequential(
139 &self,
140 task: &str,
141 runner: &Arc<dyn AgentRunner>,
142 infra: &SharedInfra,
143 ) -> Result<SwarmResult, MultiError> {
144 let mailbox = Arc::new(Mailbox::default());
145 let mut outputs = Vec::new();
146
147 for spec in &self.agents {
148 let enriched = if outputs.is_empty() {
150 task.to_string()
151 } else {
152 let prior: Vec<String> = outputs
153 .iter()
154 .filter_map(|o: &AgentOutput| {
155 if o.succeeded() {
156 Some(format!("- {}: {}", o.name, truncate(&o.answer, 300)))
157 } else {
158 None
159 }
160 })
161 .collect();
162 format!("{}\n\nPrior agents' findings:\n{}", task, prior.join("\n"))
163 };
164
165 let rt = infra.make_runtime();
166 for tool in &spec.tools {
167 rt.register_tool(tool).await;
168 }
169
170 let start = Instant::now();
171 match runner.run(spec, &enriched, &rt, &mailbox).await {
172 Ok(output) => {
173 infra.state.set(
174 &format!("agent.{}.answer", output.name),
175 serde_json::Value::String(output.answer.clone()),
176 &format!("swarm.{}", output.name),
177 );
178 outputs.push(output);
179 }
180 Err(e) => {
181 outputs.push(AgentOutput {
182 name: spec.name.clone(),
183 answer: String::new(),
184 turns: 0,
185 tool_calls: 0,
186 duration_ms: start.elapsed().as_secs_f64() * 1000.0,
187 error: Some(e.to_string()),
188 });
189 }
190 }
191 }
192
193 let summary = self.synthesize(task, &outputs, runner, infra).await;
194
195 Ok(SwarmResult {
196 task: task.to_string(),
197 outputs,
198 final_summary: summary,
199 })
200 }
201
202 async fn run_debate(
203 &self,
204 task: &str,
205 runner: &Arc<dyn AgentRunner>,
206 infra: &SharedInfra,
207 ) -> Result<SwarmResult, MultiError> {
208 let round1 = Swarm::new(self.agents.clone(), SwarmMode::Parallel)
210 .run(task, runner, infra)
211 .await?;
212
213 let mut critique_specs = Vec::new();
215 for spec in &self.agents {
216 let others: Vec<String> = round1
217 .outputs
218 .iter()
219 .filter(|o| o.name != spec.name && o.succeeded())
220 .map(|o| format!("- {}: {}", o.name, truncate(&o.answer, 300)))
221 .collect();
222
223 let critique_prompt = format!(
224 "{}\n\nOriginal task: {}\n\nOther agents' answers:\n{}\n\n\
225 Critique these answers and provide your improved response.",
226 spec.system_prompt,
227 task,
228 others.join("\n")
229 );
230
231 let mut critique_spec = spec.clone();
232 critique_spec.name = format!("{}_critique", spec.name);
233 critique_spec.system_prompt = critique_prompt;
234 critique_specs.push(critique_spec);
235 }
236
237 let round2 = Swarm::new(critique_specs, SwarmMode::Parallel)
238 .run(task, runner, infra)
239 .await?;
240
241 let mut all_outputs = round1.outputs;
243 all_outputs.extend(round2.outputs);
244
245 let summary = self.synthesize(task, &all_outputs, runner, infra).await;
246
247 Ok(SwarmResult {
248 task: task.to_string(),
249 outputs: all_outputs,
250 final_summary: summary,
251 })
252 }
253
254 async fn synthesize(
255 &self,
256 task: &str,
257 outputs: &[AgentOutput],
258 runner: &Arc<dyn AgentRunner>,
259 infra: &SharedInfra,
260 ) -> String {
261 let answers: Vec<&AgentOutput> = outputs.iter().filter(|o| o.succeeded()).collect();
262 if answers.is_empty() {
263 return "[no agent produced an answer]".to_string();
264 }
265 if answers.len() == 1 {
266 return answers[0].answer.clone();
267 }
268
269 if let Some(synth_spec) = &self.synthesizer {
270 let summaries: Vec<String> = answers
271 .iter()
272 .map(|o| format!("- {}: {}", o.name, truncate(&o.answer, 500)))
273 .collect();
274
275 let synth_task = format!(
276 "Original task: {}\n\nAgent outputs:\n{}\n\nSynthesize these into a single coherent answer.",
277 task,
278 summaries.join("\n")
279 );
280
281 let mailbox = Mailbox::default();
282 let rt = infra.make_runtime();
283 if let Ok(output) = runner.run(synth_spec, &synth_task, &rt, &mailbox).await {
284 return output.answer;
285 }
286 }
287
288 answers
290 .iter()
291 .map(|o| format!("## {}\n{}", o.name, o.answer))
292 .collect::<Vec<_>>()
293 .join("\n\n")
294 }
295}
296
297fn truncate(s: &str, max_len: usize) -> &str {
298 if s.len() <= max_len {
299 return s;
300 }
301 let mut end = max_len;
302 while end > 0 && !s.is_char_boundary(end) {
303 end -= 1;
304 }
305 &s[..end]
306}
307
308#[cfg(test)]
309mod tests {
310 use super::*;
311 use crate::error::MultiError;
312 use crate::mailbox::Mailbox;
313 use crate::runner::AgentRunner;
314 use crate::types::{AgentOutput, AgentSpec};
315 use car_engine::Runtime;
316 use std::sync::atomic::{AtomicU32, Ordering};
317
318 struct MockRunner {
319 call_count: AtomicU32,
320 }
321
322 #[async_trait::async_trait]
323 impl AgentRunner for MockRunner {
324 async fn run(
325 &self,
326 spec: &AgentSpec,
327 task: &str,
328 _runtime: &Runtime,
329 _mailbox: &Mailbox,
330 ) -> Result<AgentOutput, MultiError> {
331 let _n = self.call_count.fetch_add(1, Ordering::SeqCst);
332 Ok(AgentOutput {
333 name: spec.name.clone(),
334 answer: format!("answer from {} for: {}", spec.name, &task[..task.len().min(50)]),
335 turns: 1,
336 tool_calls: 0,
337 duration_ms: 10.0,
338 error: None,
339 })
340 }
341 }
342
343 #[tokio::test]
344 async fn test_parallel_swarm() {
345 let agents = vec![
346 AgentSpec::new("alice", "You are Alice"),
347 AgentSpec::new("bob", "You are Bob"),
348 ];
349 let runner: Arc<dyn AgentRunner> = Arc::new(MockRunner {
350 call_count: AtomicU32::new(0),
351 });
352 let infra = SharedInfra::new();
353
354 let result = Swarm::new(agents, SwarmMode::Parallel)
355 .run("test task", &runner, &infra)
356 .await
357 .unwrap();
358
359 assert_eq!(result.outputs.len(), 2);
360 assert!(result.outputs.iter().all(|o| o.succeeded()));
361
362 assert!(infra.state.get("agent.alice.answer").is_some());
364 assert!(infra.state.get("agent.bob.answer").is_some());
365 }
366
367 #[tokio::test]
368 async fn test_sequential_swarm() {
369 let agents = vec![
370 AgentSpec::new("first", "Go first"),
371 AgentSpec::new("second", "Go second"),
372 ];
373 let runner: Arc<dyn AgentRunner> = Arc::new(MockRunner {
374 call_count: AtomicU32::new(0),
375 });
376 let infra = SharedInfra::new();
377
378 let result = Swarm::new(agents, SwarmMode::Sequential)
379 .run("sequential task", &runner, &infra)
380 .await
381 .unwrap();
382
383 assert_eq!(result.outputs.len(), 2);
384 assert!(result.outputs[1].answer.contains("Prior agents"));
386 }
387
388 #[tokio::test]
389 async fn test_debate_swarm() {
390 let agents = vec![
391 AgentSpec::new("debater_a", "Argue for"),
392 AgentSpec::new("debater_b", "Argue against"),
393 ];
394 let runner: Arc<dyn AgentRunner> = Arc::new(MockRunner {
395 call_count: AtomicU32::new(0),
396 });
397 let infra = SharedInfra::new();
398
399 let result = Swarm::new(agents, SwarmMode::Debate)
400 .run("debate topic", &runner, &infra)
401 .await
402 .unwrap();
403
404 assert_eq!(result.outputs.len(), 4);
406 }
407}