1use crate::error::MultiError;
9use crate::mailbox::Mailbox;
10use crate::runner::AgentRunner;
11use crate::shared::SharedInfra;
12use crate::types::{AgentOutput, AgentSpec};
13use serde::{Deserialize, Serialize};
14use std::sync::Arc;
15use std::time::Instant;
16use tracing::instrument;
17
/// Execution strategy a [`Swarm`] uses to drive its agents.
///
/// Serialized through serde with `snake_case` variant names.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum SwarmMode {
    /// Every agent is spawned as its own task and all tasks are awaited together.
    Parallel,
    /// Agents run one after another; later agents receive the task text extended
    /// with truncated answers from earlier successful agents.
    Sequential,
    /// A parallel round, followed by a second parallel round in which
    /// `<name>_critique` copies of the agents critique the other agents' answers.
    Debate,
}
25
/// Aggregate outcome of one swarm run.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SwarmResult {
    /// The task text the swarm was asked to work on.
    pub task: String,
    /// One entry per agent run, including entries for agents that failed
    /// (those carry `error: Some(..)` and an empty answer). In debate mode the
    /// first-round outputs are followed by the critique-round outputs.
    pub outputs: Vec<AgentOutput>,
    /// Combined answer produced by the synthesis step over the successful outputs.
    pub final_summary: String,
}
32
/// A group of agents that work on the same task under a given [`SwarmMode`].
pub struct Swarm {
    /// Specifications of the agents taking part in the run.
    pub agents: Vec<AgentSpec>,
    /// How the agents are scheduled.
    pub mode: SwarmMode,
    /// Optional agent asked to merge the answers when more than one agent
    /// succeeded; without it the answers are concatenated under `## <name>` headings.
    pub synthesizer: Option<AgentSpec>,
    /// When `true`, the parallel execution path builds each agent's runtime with
    /// `SharedInfra::make_isolated_runtime` and merges the agent's context back
    /// into its parent after a successful run.
    pub isolated: bool,
}
42
43impl Swarm {
44 pub fn new(agents: Vec<AgentSpec>, mode: SwarmMode) -> Self {
45 Self {
46 agents,
47 mode,
48 synthesizer: None,
49 isolated: false,
50 }
51 }
52
53 pub fn with_synthesizer(mut self, spec: AgentSpec) -> Self {
54 self.synthesizer = Some(spec);
55 self
56 }
57
58 pub fn with_isolation(mut self) -> Self {
60 self.isolated = true;
61 self
62 }
63
    /// Runs the swarm on `task`, dispatching on [`SwarmMode`].
    ///
    /// * `task` - the task text handed to the agents.
    /// * `runner` - executes a single agent against a runtime and mailbox.
    /// * `infra` - shared infrastructure used to create runtimes and record state.
    ///
    /// Returns a boxed future resolving to the [`SwarmResult`] of the selected
    /// mode. The future is boxed rather than declared `async fn`; debate mode
    /// calls `run` again on nested swarms, which is presumably why the return
    /// type is kept nameable this way.
    #[instrument(name = "multi.swarm", skip_all)]
    pub fn run<'a>(
        &'a self,
        task: &'a str,
        runner: &'a Arc<dyn AgentRunner>,
        infra: &'a SharedInfra,
    ) -> futures::future::BoxFuture<'a, Result<SwarmResult, MultiError>> {
        // NOTE(review): keep `Box::pin(async move { .. })` as the tail expression.
        // `#[instrument]` appears to special-case this shape so the span covers the
        // future's execution and not just its construction — confirm against the
        // tracing-attributes version in use before restructuring.
        Box::pin(async move {
            match self.mode {
                SwarmMode::Parallel => self.run_parallel(task, runner, infra).await,
                SwarmMode::Sequential => self.run_sequential(task, runner, infra).await,
                SwarmMode::Debate => self.run_debate(task, runner, infra).await,
            }
        })
    }
79
80 async fn run_parallel(
81 &self,
82 task: &str,
83 runner: &Arc<dyn AgentRunner>,
84 infra: &SharedInfra,
85 ) -> Result<SwarmResult, MultiError> {
86 let mailbox = Arc::new(Mailbox::default());
87
88 let mut handles: Vec<
91 tokio::task::JoinHandle<(
92 Result<AgentOutput, MultiError>,
93 Option<crate::task_context::AgentContext>,
94 )>,
95 > = Vec::new();
96
97 for spec in &self.agents {
98 let runner = Arc::clone(runner);
99 let spec = spec.clone();
100 let task = task.to_string();
101 let mailbox = Arc::clone(&mailbox);
102
103 if self.isolated {
104 let (rt, ctx) = infra.make_isolated_runtime(&spec.name);
105 for tool in &spec.tools {
106 rt.register_tool(tool).await;
107 }
108 let ctx_clone = ctx.clone();
109 handles.push(tokio::spawn(async move {
110 let result = crate::task_context::TaskScope::run(ctx_clone, async {
111 runner.run(&spec, &task, &rt, &mailbox).await
112 })
113 .await;
114 (result, Some(ctx))
115 }));
116 } else {
117 let rt = infra.make_runtime();
118 for tool in &spec.tools {
119 rt.register_tool(tool).await;
120 }
121 handles.push(tokio::spawn(async move {
122 let result = runner.run(&spec, &task, &rt, &mailbox).await;
123 (result, None)
124 }));
125 }
126 }
127
128 let results = futures::future::join_all(handles).await;
129 let mut outputs = Vec::new();
130 for (i, result) in results.into_iter().enumerate() {
131 match result {
132 Ok((Ok(output), ctx)) => {
133 if let Some(ctx) = ctx {
135 ctx.merge_to_parent();
136 }
137 infra.state.set(
139 &format!("agent.{}.answer", output.name),
140 serde_json::Value::String(output.answer.clone()),
141 &format!("swarm.{}", output.name),
142 );
143 outputs.push(output);
144 }
145 Ok((Err(e), _ctx)) => {
146 outputs.push(AgentOutput {
147 name: self.agents[i].name.clone(),
148 answer: String::new(),
149 turns: 0,
150 tool_calls: 0,
151 duration_ms: 0.0,
152 error: Some(e.to_string()),
153 outcome: None,
154 tokens: None,
155 });
156 }
157 Err(e) => {
158 outputs.push(AgentOutput {
159 name: self.agents[i].name.clone(),
160 answer: String::new(),
161 turns: 0,
162 tool_calls: 0,
163 duration_ms: 0.0,
164 error: Some(format!("join error: {}", e)),
165 outcome: None,
166 tokens: None,
167 });
168 }
169 }
170 }
171
172 let summary = self.synthesize(task, &outputs, runner, infra).await;
173
174 Ok(SwarmResult {
175 task: task.to_string(),
176 outputs,
177 final_summary: summary,
178 })
179 }
180
181 async fn run_sequential(
182 &self,
183 task: &str,
184 runner: &Arc<dyn AgentRunner>,
185 infra: &SharedInfra,
186 ) -> Result<SwarmResult, MultiError> {
187 let mailbox = Arc::new(Mailbox::default());
188 let mut outputs = Vec::new();
189
190 for spec in &self.agents {
191 let enriched = if outputs.is_empty() {
193 task.to_string()
194 } else {
195 let prior: Vec<String> = outputs
196 .iter()
197 .filter_map(|o: &AgentOutput| {
198 if o.succeeded() {
199 Some(format!("- {}: {}", o.name, truncate(&o.answer, 300)))
200 } else {
201 None
202 }
203 })
204 .collect();
205 format!("{}\n\nPrior agents' findings:\n{}", task, prior.join("\n"))
206 };
207
208 let rt = infra.make_runtime();
209 for tool in &spec.tools {
210 rt.register_tool(tool).await;
211 }
212
213 let start = Instant::now();
214 match runner.run(spec, &enriched, &rt, &mailbox).await {
215 Ok(output) => {
216 infra.state.set(
217 &format!("agent.{}.answer", output.name),
218 serde_json::Value::String(output.answer.clone()),
219 &format!("swarm.{}", output.name),
220 );
221 outputs.push(output);
222 }
223 Err(e) => {
224 outputs.push(AgentOutput {
225 name: spec.name.clone(),
226 answer: String::new(),
227 turns: 0,
228 tool_calls: 0,
229 duration_ms: start.elapsed().as_secs_f64() * 1000.0,
230 error: Some(e.to_string()),
231 outcome: None,
232 tokens: None,
233 });
234 }
235 }
236 }
237
238 let summary = self.synthesize(task, &outputs, runner, infra).await;
239
240 Ok(SwarmResult {
241 task: task.to_string(),
242 outputs,
243 final_summary: summary,
244 })
245 }
246
247 async fn run_debate(
248 &self,
249 task: &str,
250 runner: &Arc<dyn AgentRunner>,
251 infra: &SharedInfra,
252 ) -> Result<SwarmResult, MultiError> {
253 let round1 = Swarm::new(self.agents.clone(), SwarmMode::Parallel)
255 .run(task, runner, infra)
256 .await?;
257
258 let mut critique_specs = Vec::new();
260 for spec in &self.agents {
261 let others: Vec<String> = round1
262 .outputs
263 .iter()
264 .filter(|o| o.name != spec.name && o.succeeded())
265 .map(|o| format!("- {}: {}", o.name, truncate(&o.answer, 300)))
266 .collect();
267
268 let critique_prompt = format!(
269 "{}\n\nOriginal task: {}\n\nOther agents' answers:\n{}\n\n\
270 Critique these answers and provide your improved response.",
271 spec.system_prompt,
272 task,
273 others.join("\n")
274 );
275
276 let mut critique_spec = spec.clone();
277 critique_spec.name = format!("{}_critique", spec.name);
278 critique_spec.system_prompt = critique_prompt;
279 critique_specs.push(critique_spec);
280 }
281
282 let round2 = Swarm::new(critique_specs, SwarmMode::Parallel)
283 .run(task, runner, infra)
284 .await?;
285
286 let mut all_outputs = round1.outputs;
288 all_outputs.extend(round2.outputs);
289
290 let summary = self.synthesize(task, &all_outputs, runner, infra).await;
291
292 Ok(SwarmResult {
293 task: task.to_string(),
294 outputs: all_outputs,
295 final_summary: summary,
296 })
297 }
298
299 async fn synthesize(
300 &self,
301 task: &str,
302 outputs: &[AgentOutput],
303 runner: &Arc<dyn AgentRunner>,
304 infra: &SharedInfra,
305 ) -> String {
306 let answers: Vec<&AgentOutput> = outputs.iter().filter(|o| o.succeeded()).collect();
307 if answers.is_empty() {
308 return "[no agent produced an answer]".to_string();
309 }
310 if answers.len() == 1 {
311 return answers[0].answer.clone();
312 }
313
314 if let Some(synth_spec) = &self.synthesizer {
315 let summaries: Vec<String> = answers
316 .iter()
317 .map(|o| format!("- {}: {}", o.name, truncate(&o.answer, 500)))
318 .collect();
319
320 let synth_task = format!(
321 "Original task: {}\n\nAgent outputs:\n{}\n\nSynthesize these into a single coherent answer.",
322 task,
323 summaries.join("\n")
324 );
325
326 let mailbox = Mailbox::default();
327 let rt = infra.make_runtime();
328 if let Ok(output) = runner.run(synth_spec, &synth_task, &rt, &mailbox).await {
329 return output.answer;
330 }
331 }
332
333 answers
335 .iter()
336 .map(|o| format!("## {}\n{}", o.name, o.answer))
337 .collect::<Vec<_>>()
338 .join("\n\n")
339 }
340}
341
/// Returns the longest prefix of `s` that is at most `max_len` bytes long and
/// ends on a UTF-8 character boundary.
///
/// `s` is returned unchanged when it already fits. The result may be shorter
/// than `max_len` (down to empty) when `max_len` falls inside a multi-byte
/// character.
fn truncate(s: &str, max_len: usize) -> &str {
    if max_len >= s.len() {
        return s;
    }
    // Index 0 is always a boundary, so the search cannot come up empty.
    let cut = (0..=max_len)
        .rev()
        .find(|&idx| s.is_char_boundary(idx))
        .unwrap_or(0);
    &s[..cut]
}
352
353#[cfg(test)]
354mod tests {
355 use super::*;
356 use crate::error::MultiError;
357 use crate::mailbox::Mailbox;
358 use crate::runner::AgentRunner;
359 use crate::types::{AgentOutput, AgentSpec};
360 use car_engine::Runtime;
361 use std::sync::atomic::{AtomicU32, Ordering};
362
    /// Test double for `AgentRunner` that always succeeds with a canned answer.
    struct MockRunner {
        /// Number of times `run` has been invoked.
        call_count: AtomicU32,
    }
366
367 #[async_trait::async_trait]
368 impl AgentRunner for MockRunner {
369 async fn run(
370 &self,
371 spec: &AgentSpec,
372 task: &str,
373 _runtime: &Runtime,
374 _mailbox: &Mailbox,
375 ) -> Result<AgentOutput, MultiError> {
376 let _n = self.call_count.fetch_add(1, Ordering::SeqCst);
377 Ok(AgentOutput {
378 name: spec.name.clone(),
379 answer: format!(
380 "answer from {} for: {}",
381 spec.name,
382 &task[..task.len().min(50)]
383 ),
384 turns: 1,
385 tool_calls: 0,
386 duration_ms: 10.0,
387 error: None,
388 outcome: None,
389 tokens: None,
390 })
391 }
392 }
393
394 #[tokio::test]
395 async fn test_parallel_swarm() {
396 let agents = vec![
397 AgentSpec::new("alice", "You are Alice"),
398 AgentSpec::new("bob", "You are Bob"),
399 ];
400 let runner: Arc<dyn AgentRunner> = Arc::new(MockRunner {
401 call_count: AtomicU32::new(0),
402 });
403 let infra = SharedInfra::new();
404
405 let result = Swarm::new(agents, SwarmMode::Parallel)
406 .run("test task", &runner, &infra)
407 .await
408 .unwrap();
409
410 assert_eq!(result.outputs.len(), 2);
411 assert!(result.outputs.iter().all(|o| o.succeeded()));
412
413 assert!(infra.state.get("agent.alice.answer").is_some());
415 assert!(infra.state.get("agent.bob.answer").is_some());
416 }
417
418 #[tokio::test]
419 async fn test_sequential_swarm() {
420 let agents = vec![
421 AgentSpec::new("first", "Go first"),
422 AgentSpec::new("second", "Go second"),
423 ];
424 let runner: Arc<dyn AgentRunner> = Arc::new(MockRunner {
425 call_count: AtomicU32::new(0),
426 });
427 let infra = SharedInfra::new();
428
429 let result = Swarm::new(agents, SwarmMode::Sequential)
430 .run("sequential task", &runner, &infra)
431 .await
432 .unwrap();
433
434 assert_eq!(result.outputs.len(), 2);
435 assert!(result.outputs[1].answer.contains("Prior agents"));
437 }
438
439 #[tokio::test]
440 async fn test_debate_swarm() {
441 let agents = vec![
442 AgentSpec::new("debater_a", "Argue for"),
443 AgentSpec::new("debater_b", "Argue against"),
444 ];
445 let runner: Arc<dyn AgentRunner> = Arc::new(MockRunner {
446 call_count: AtomicU32::new(0),
447 });
448 let infra = SharedInfra::new();
449
450 let result = Swarm::new(agents, SwarmMode::Debate)
451 .run("debate topic", &runner, &infra)
452 .await
453 .unwrap();
454
455 assert_eq!(result.outputs.len(), 4);
457 }
458}