1use crate::error::MultiError;
9use crate::mailbox::Mailbox;
10use crate::runner::AgentRunner;
11use crate::shared::SharedInfra;
12use crate::types::{AgentOutput, AgentSpec};
13use serde::{Deserialize, Serialize};
14use std::sync::Arc;
15use std::time::Instant;
16use tracing::instrument;
17
18#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
19#[serde(rename_all = "snake_case")]
20pub enum SwarmMode {
21 Parallel,
22 Sequential,
23 Debate,
24}
25
26#[derive(Debug, Clone, Serialize, Deserialize)]
27pub struct SwarmResult {
28 pub task: String,
29 pub outputs: Vec<AgentOutput>,
30 pub final_summary: String,
31}
32
33pub struct Swarm {
34 pub agents: Vec<AgentSpec>,
35 pub mode: SwarmMode,
36 pub synthesizer: Option<AgentSpec>,
37 pub isolated: bool,
41 pub workspaces: Option<crate::workspace::WorkspaceConfig>,
44}
45
46impl Swarm {
47 pub fn new(agents: Vec<AgentSpec>, mode: SwarmMode) -> Self {
48 Self {
49 agents,
50 mode,
51 synthesizer: None,
52 isolated: false,
53 workspaces: None,
54 }
55 }
56
57 pub fn with_synthesizer(mut self, spec: AgentSpec) -> Self {
58 self.synthesizer = Some(spec);
59 self
60 }
61
62 pub fn with_isolation(mut self) -> Self {
64 self.isolated = true;
65 self
66 }
67
68 pub fn with_workspaces(mut self, config: crate::workspace::WorkspaceConfig) -> Self {
74 self.workspaces = Some(config);
75 self
76 }
77
78 #[instrument(name = "multi.swarm", skip_all)]
79 pub fn run<'a>(
80 &'a self,
81 task: &'a str,
82 runner: &'a Arc<dyn AgentRunner>,
83 infra: &'a SharedInfra,
84 ) -> futures::future::BoxFuture<'a, Result<SwarmResult, MultiError>> {
85 Box::pin(async move {
86 match self.mode {
87 SwarmMode::Parallel => self.run_parallel(task, runner, infra).await,
88 SwarmMode::Sequential => self.run_sequential(task, runner, infra).await,
89 SwarmMode::Debate => self.run_debate(task, runner, infra).await,
90 }
91 })
92 }
93
94 async fn run_parallel(
95 &self,
96 task: &str,
97 runner: &Arc<dyn AgentRunner>,
98 infra: &SharedInfra,
99 ) -> Result<SwarmResult, MultiError> {
100 let mailbox = Arc::new(Mailbox::default());
101
102 enum Slot {
106 Spawned(usize),
107 Skipped(AgentOutput),
108 }
109
110 let mut handles: Vec<
113 tokio::task::JoinHandle<(
114 Result<AgentOutput, MultiError>,
115 Option<crate::task_context::AgentContext>,
116 )>,
117 > = Vec::new();
118 let mut slots: Vec<Slot> = Vec::new();
119
120 for spec in &self.agents {
121 let workspace = match &self.workspaces {
128 Some(cfg) => match crate::workspace::AgentWorkspace::provision(cfg, &spec.name) {
129 Ok(ws) => Some(ws),
130 Err(e) => {
131 slots.push(Slot::Skipped(AgentOutput {
132 name: spec.name.clone(),
133 answer: String::new(),
134 turns: 0,
135 tool_calls: 0,
136 duration_ms: 0.0,
137 error: Some(format!("workspace provisioning failed: {e}")),
138 outcome: None,
139 tokens: None,
140 }));
141 continue;
142 }
143 },
144 None => None,
145 };
146
147 if let Err(e) = infra.begin_agent() {
152 slots.push(Slot::Skipped(crate::budget::budget_skipped_output(
153 &spec.name, &e,
154 )));
155 continue;
156 }
157
158 let runner = Arc::clone(runner);
159 let mut spec = spec.clone();
160 if let Some(ws) = &workspace {
161 spec = ws.inject(spec);
162 }
163 let task = task.to_string();
164 let mailbox = Arc::clone(&mailbox);
165
166 if self.isolated {
167 let (rt, ctx) = infra.make_isolated_runtime(&spec.name);
168 for tool in &spec.tools {
169 rt.register_tool(tool).await;
170 }
171 let ctx_clone = ctx.clone();
172 handles.push(tokio::spawn(async move {
173 let _workspace = workspace;
176 let result = crate::task_context::TaskScope::run(ctx_clone, async {
177 runner.run(&spec, &task, &rt, &mailbox).await
178 })
179 .await;
180 (result, Some(ctx))
181 }));
182 } else {
183 let rt = infra.make_runtime();
184 for tool in &spec.tools {
185 rt.register_tool(tool).await;
186 }
187 handles.push(tokio::spawn(async move {
188 let _workspace = workspace;
189 let result = runner.run(&spec, &task, &rt, &mailbox).await;
190 (result, None)
191 }));
192 }
193 slots.push(Slot::Spawned(handles.len() - 1));
194 }
195
196 let mut results: Vec<Option<_>> = futures::future::join_all(handles)
198 .await
199 .into_iter()
200 .map(Some)
201 .collect();
202 let mut outputs = Vec::new();
203 for (i, slot) in slots.into_iter().enumerate() {
204 let handle_idx = match slot {
205 Slot::Skipped(output) => {
206 outputs.push(output);
207 continue;
208 }
209 Slot::Spawned(idx) => idx,
210 };
211 match results.get_mut(handle_idx).and_then(Option::take) {
212 Some(Ok((Ok(output), ctx))) => {
213 if let Some(ctx) = ctx {
215 ctx.merge_to_parent();
216 }
217 infra.record_output(&output);
219 infra.state.set(
221 &format!("agent.{}.answer", output.name),
222 serde_json::Value::String(output.answer.clone()),
223 &format!("swarm.{}", output.name),
224 );
225 outputs.push(output);
226 }
227 Some(Ok((Err(e), _ctx))) => {
228 outputs.push(AgentOutput {
232 name: self.agents[i].name.clone(),
233 answer: String::new(),
234 turns: 0,
235 tool_calls: 0,
236 duration_ms: 0.0,
237 error: Some(e.to_string()),
238 outcome: None,
239 tokens: None,
240 });
241 }
242 Some(Err(e)) => {
243 outputs.push(AgentOutput {
244 name: self.agents[i].name.clone(),
245 answer: String::new(),
246 turns: 0,
247 tool_calls: 0,
248 duration_ms: 0.0,
249 error: Some(format!("join error: {}", e)),
250 outcome: None,
251 tokens: None,
252 });
253 }
254 None => {
255 outputs.push(AgentOutput {
256 name: self.agents[i].name.clone(),
257 answer: String::new(),
258 turns: 0,
259 tool_calls: 0,
260 duration_ms: 0.0,
261 error: Some("internal: missing join result".to_string()),
262 outcome: None,
263 tokens: None,
264 });
265 }
266 }
267 }
268
269 let summary = self.synthesize(task, &outputs, runner, infra).await;
270
271 Ok(SwarmResult {
272 task: task.to_string(),
273 outputs,
274 final_summary: summary,
275 })
276 }
277
278 async fn run_sequential(
279 &self,
280 task: &str,
281 runner: &Arc<dyn AgentRunner>,
282 infra: &SharedInfra,
283 ) -> Result<SwarmResult, MultiError> {
284 let mailbox = Arc::new(Mailbox::default());
285 let mut outputs = Vec::new();
286
287 for spec in &self.agents {
288 if let Err(e) = infra.begin_agent() {
292 outputs.push(crate::budget::budget_skipped_output(&spec.name, &e));
293 continue;
294 }
295
296 let enriched = if outputs.is_empty() {
298 task.to_string()
299 } else {
300 let prior: Vec<String> = outputs
301 .iter()
302 .filter_map(|o: &AgentOutput| {
303 if o.succeeded() {
304 Some(format!("- {}: {}", o.name, truncate(&o.answer, 300)))
305 } else {
306 None
307 }
308 })
309 .collect();
310 format!("{}\n\nPrior agents' findings:\n{}", task, prior.join("\n"))
311 };
312
313 let rt = infra.make_runtime();
314 for tool in &spec.tools {
315 rt.register_tool(tool).await;
316 }
317
318 let start = Instant::now();
319 match runner.run(spec, &enriched, &rt, &mailbox).await {
320 Ok(output) => {
321 infra.record_output(&output);
322 infra.state.set(
323 &format!("agent.{}.answer", output.name),
324 serde_json::Value::String(output.answer.clone()),
325 &format!("swarm.{}", output.name),
326 );
327 outputs.push(output);
328 }
329 Err(e) => {
330 outputs.push(AgentOutput {
331 name: spec.name.clone(),
332 answer: String::new(),
333 turns: 0,
334 tool_calls: 0,
335 duration_ms: start.elapsed().as_secs_f64() * 1000.0,
336 error: Some(e.to_string()),
337 outcome: None,
338 tokens: None,
339 });
340 }
341 }
342 }
343
344 let summary = self.synthesize(task, &outputs, runner, infra).await;
345
346 Ok(SwarmResult {
347 task: task.to_string(),
348 outputs,
349 final_summary: summary,
350 })
351 }
352
353 async fn run_debate(
354 &self,
355 task: &str,
356 runner: &Arc<dyn AgentRunner>,
357 infra: &SharedInfra,
358 ) -> Result<SwarmResult, MultiError> {
359 let round1 = Swarm::new(self.agents.clone(), SwarmMode::Parallel)
361 .run(task, runner, infra)
362 .await?;
363
364 let mut critique_specs = Vec::new();
366 for spec in &self.agents {
367 let others: Vec<String> = round1
368 .outputs
369 .iter()
370 .filter(|o| o.name != spec.name && o.succeeded())
371 .map(|o| format!("- {}: {}", o.name, truncate(&o.answer, 300)))
372 .collect();
373
374 let critique_prompt = format!(
375 "{}\n\nOriginal task: {}\n\nOther agents' answers:\n{}\n\n\
376 Critique these answers and provide your improved response.",
377 spec.system_prompt,
378 task,
379 others.join("\n")
380 );
381
382 let mut critique_spec = spec.clone();
383 critique_spec.name = format!("{}_critique", spec.name);
384 critique_spec.system_prompt = critique_prompt;
385 critique_specs.push(critique_spec);
386 }
387
388 let round2 = Swarm::new(critique_specs, SwarmMode::Parallel)
389 .run(task, runner, infra)
390 .await?;
391
392 let mut all_outputs = round1.outputs;
394 all_outputs.extend(round2.outputs);
395
396 let summary = self.synthesize(task, &all_outputs, runner, infra).await;
397
398 Ok(SwarmResult {
399 task: task.to_string(),
400 outputs: all_outputs,
401 final_summary: summary,
402 })
403 }
404
405 async fn synthesize(
406 &self,
407 task: &str,
408 outputs: &[AgentOutput],
409 runner: &Arc<dyn AgentRunner>,
410 infra: &SharedInfra,
411 ) -> String {
412 let answers: Vec<&AgentOutput> = outputs.iter().filter(|o| o.succeeded()).collect();
413 if answers.is_empty() {
414 return "[no agent produced an answer]".to_string();
415 }
416 if answers.len() == 1 {
417 return answers[0].answer.clone();
418 }
419
420 if let Some(synth_spec) = &self.synthesizer {
421 let summaries: Vec<String> = answers
422 .iter()
423 .map(|o| format!("- {}: {}", o.name, truncate(&o.answer, 500)))
424 .collect();
425
426 let synth_task = format!(
427 "Original task: {}\n\nAgent outputs:\n{}\n\nSynthesize these into a single coherent answer.",
428 task,
429 summaries.join("\n")
430 );
431
432 if infra.begin_agent().is_ok() {
435 let mailbox = Mailbox::default();
436 let rt = infra.make_runtime();
437 if let Ok(output) = runner.run(synth_spec, &synth_task, &rt, &mailbox).await {
438 infra.record_output(&output);
439 return output.answer;
440 }
441 }
442 }
443
444 answers
446 .iter()
447 .map(|o| format!("## {}\n{}", o.name, o.answer))
448 .collect::<Vec<_>>()
449 .join("\n\n")
450 }
451}
452
/// Returns the longest prefix of `s` that is at most `max_len` bytes long and
/// ends on a UTF-8 character boundary, so a multi-byte character is never split.
fn truncate(s: &str, max_len: usize) -> &str {
    if max_len >= s.len() {
        return s;
    }
    // Walk back from `max_len` to the nearest boundary. Index 0 is always a
    // boundary, so the search always succeeds.
    let cut = (0..=max_len)
        .rev()
        .find(|&idx| s.is_char_boundary(idx))
        .unwrap_or(0);
    &s[..cut]
}
463
#[cfg(test)]
mod tests {
    use super::*;
    use crate::error::MultiError;
    use crate::mailbox::Mailbox;
    use crate::runner::AgentRunner;
    use crate::types::{AgentOutput, AgentSpec};
    use car_engine::Runtime;
    use std::sync::atomic::{AtomicU32, Ordering};

    /// Runner that answers immediately, naming the agent and echoing the start
    /// of the task, and counts how many times it was invoked.
    struct MockRunner {
        call_count: AtomicU32,
    }

    impl MockRunner {
        fn new() -> Arc<Self> {
            Arc::new(Self {
                call_count: AtomicU32::new(0),
            })
        }

        fn calls(&self) -> u32 {
            self.call_count.load(Ordering::SeqCst)
        }
    }

    #[async_trait::async_trait]
    impl AgentRunner for MockRunner {
        async fn run(
            &self,
            spec: &AgentSpec,
            task: &str,
            _runtime: &Runtime,
            _mailbox: &Mailbox,
        ) -> Result<AgentOutput, MultiError> {
            self.call_count.fetch_add(1, Ordering::SeqCst);
            Ok(AgentOutput {
                name: spec.name.clone(),
                // `truncate` stops on a char boundary, unlike a raw byte slice,
                // so a non-ASCII task cannot make the mock panic.
                answer: format!("answer from {} for: {}", spec.name, truncate(task, 50)),
                turns: 1,
                tool_calls: 0,
                duration_ms: 10.0,
                error: None,
                outcome: None,
                tokens: None,
            })
        }
    }

    #[tokio::test]
    async fn test_parallel_swarm() {
        let agents = vec![
            AgentSpec::new("alice", "You are Alice"),
            AgentSpec::new("bob", "You are Bob"),
        ];
        let mock = MockRunner::new();
        let runner: Arc<dyn AgentRunner> = mock.clone();
        let infra = SharedInfra::new();

        let result = Swarm::new(agents, SwarmMode::Parallel)
            .run("test task", &runner, &infra)
            .await
            .unwrap();

        assert_eq!(result.outputs.len(), 2);
        assert!(result.outputs.iter().all(|o| o.succeeded()));
        // No synthesizer is configured, so only the two agents reach the runner.
        assert_eq!(mock.calls(), 2);

        assert!(infra.state.get("agent.alice.answer").is_some());
        assert!(infra.state.get("agent.bob.answer").is_some());
    }

    #[tokio::test]
    async fn test_sequential_swarm() {
        let agents = vec![
            AgentSpec::new("first", "Go first"),
            AgentSpec::new("second", "Go second"),
        ];
        let mock = MockRunner::new();
        let runner: Arc<dyn AgentRunner> = mock.clone();
        let infra = SharedInfra::new();

        let result = Swarm::new(agents, SwarmMode::Sequential)
            .run("sequential task", &runner, &infra)
            .await
            .unwrap();

        assert_eq!(result.outputs.len(), 2);
        assert!(result.outputs[1].answer.contains("Prior agents"));
        assert_eq!(mock.calls(), 2);
    }

    /// Runner that reports a fixed total token count for every call.
    struct TokenRunner {
        per_call_total: u64,
    }

    #[async_trait::async_trait]
    impl AgentRunner for TokenRunner {
        async fn run(
            &self,
            spec: &AgentSpec,
            _task: &str,
            _runtime: &Runtime,
            _mailbox: &Mailbox,
        ) -> Result<AgentOutput, MultiError> {
            Ok(AgentOutput {
                name: spec.name.clone(),
                answer: format!("answer from {}", spec.name),
                turns: 1,
                tool_calls: 0,
                duration_ms: 1.0,
                error: None,
                outcome: None,
                tokens: Some(crate::types::TokenAccounting::new(
                    self.per_call_total,
                    0,
                    0.0,
                )),
            })
        }
    }

    #[tokio::test]
    async fn sequential_budget_stops_chain_when_tokens_exhausted() {
        let agents = vec![
            AgentSpec::new("a", ""),
            AgentSpec::new("b", ""),
            AgentSpec::new("c", ""),
        ];
        let runner: Arc<dyn AgentRunner> = Arc::new(TokenRunner { per_call_total: 100 });
        let infra = SharedInfra::new().with_budget(crate::BudgetLimits {
            max_total_tokens: Some(150),
            ..Default::default()
        });

        let result = Swarm::new(agents, SwarmMode::Sequential)
            .run("task", &runner, &infra)
            .await
            .unwrap();

        assert_eq!(result.outputs.len(), 3);
        assert!(result.outputs[0].succeeded());
        assert!(result.outputs[1].succeeded());
        assert!(!result.outputs[2].succeeded());
        assert!(crate::is_budget_skipped(&result.outputs[2]));
        assert_eq!(infra.budget.snapshot().total_tokens, 200);
    }

    /// Runner that records the workspace path injected into each spec and
    /// asserts that the path is an existing directory.
    struct WorkspaceProbeRunner {
        seen: std::sync::Arc<std::sync::Mutex<Vec<String>>>,
    }

    #[async_trait::async_trait]
    impl AgentRunner for WorkspaceProbeRunner {
        async fn run(
            &self,
            spec: &AgentSpec,
            _task: &str,
            _runtime: &Runtime,
            _mailbox: &Mailbox,
        ) -> Result<AgentOutput, MultiError> {
            let ws = spec
                .metadata
                .get(crate::workspace::WORKSPACE_METADATA_KEY)
                .and_then(|v| v.as_str())
                .unwrap_or("")
                .to_string();
            self.seen.lock().unwrap().push(ws.clone());
            assert!(!ws.is_empty() && std::path::Path::new(&ws).is_dir());
            Ok(AgentOutput {
                name: spec.name.clone(),
                answer: "ok".into(),
                turns: 1,
                tool_calls: 0,
                duration_ms: 1.0,
                error: None,
                outcome: None,
                tokens: None,
            })
        }
    }

    /// Removes the wrapped directory on drop, so a failing assertion does not
    /// leak the test's temporary workspace root.
    struct RemoveDirOnDrop(std::path::PathBuf);

    impl Drop for RemoveDirOnDrop {
        fn drop(&mut self) {
            let _ = std::fs::remove_dir_all(&self.0);
        }
    }

    #[tokio::test]
    async fn parallel_workspaces_are_provisioned_and_distinct() {
        let base = std::env::temp_dir().join(format!("car-swarm-ws-{}", std::process::id()));
        let _cleanup = RemoveDirOnDrop(base.clone());
        let seen = std::sync::Arc::new(std::sync::Mutex::new(Vec::new()));
        let runner: Arc<dyn AgentRunner> = Arc::new(WorkspaceProbeRunner { seen: seen.clone() });
        let infra = SharedInfra::new();

        let agents = vec![AgentSpec::new("alice", ""), AgentSpec::new("bob", "")];
        let result = Swarm::new(agents, SwarmMode::Parallel)
            .with_workspaces(crate::workspace::WorkspaceConfig::directory(&base))
            .run("task", &runner, &infra)
            .await
            .unwrap();

        assert_eq!(result.outputs.len(), 2);
        assert!(result.outputs.iter().all(|o| o.succeeded()));
        let paths = seen.lock().unwrap().clone();
        assert_eq!(paths.len(), 2);
        assert_ne!(paths[0], paths[1], "each agent gets a distinct workspace");
        for p in &paths {
            assert!(!std::path::Path::new(p).exists(), "workspace removed on drop");
        }
    }

    #[tokio::test]
    async fn parallel_budget_agent_cap_skips_excess() {
        let agents: Vec<AgentSpec> = (0..5)
            .map(|i| AgentSpec::new(&format!("a{}", i), ""))
            .collect();
        let mock = MockRunner::new();
        let runner: Arc<dyn AgentRunner> = mock.clone();
        let infra = SharedInfra::new().with_budget(crate::BudgetLimits {
            max_agents: Some(2),
            ..Default::default()
        });

        let result = Swarm::new(agents, SwarmMode::Parallel)
            .run("task", &runner, &infra)
            .await
            .unwrap();

        assert_eq!(result.outputs.len(), 5);
        let ran = result.outputs.iter().filter(|o| o.succeeded()).count();
        let skipped = result
            .outputs
            .iter()
            .filter(|o| crate::is_budget_skipped(o))
            .count();
        assert_eq!(ran, 2);
        assert_eq!(skipped, 3);
        // Budget-skipped agents must never reach the runner.
        assert_eq!(mock.calls(), 2);
    }

    #[tokio::test]
    async fn test_debate_swarm() {
        let agents = vec![
            AgentSpec::new("debater_a", "Argue for"),
            AgentSpec::new("debater_b", "Argue against"),
        ];
        let mock = MockRunner::new();
        let runner: Arc<dyn AgentRunner> = mock.clone();
        let infra = SharedInfra::new();

        let result = Swarm::new(agents, SwarmMode::Debate)
            .run("debate topic", &runner, &infra)
            .await
            .unwrap();

        assert_eq!(result.outputs.len(), 4);
        let names: Vec<&str> = result.outputs.iter().map(|o| o.name.as_str()).collect();
        assert_eq!(
            names,
            ["debater_a", "debater_b", "debater_a_critique", "debater_b_critique"]
        );
        // Two answering calls plus two critique calls; no synthesizer configured.
        assert_eq!(mock.calls(), 4);
    }
}