Skip to main content

car_multi/patterns/
swarm.rs

1//! Swarm — N agents working on the same problem.
2//!
3//! Modes:
4//! - **Parallel**: all agents run concurrently, then a synthesizer combines results.
5//! - **Sequential**: agents run one after another, each seeing prior agents' outputs.
//! - **Debate**: two parallel rounds — initial answers, then each agent critiques the others' answers — after which the synthesis step combines the outputs of both rounds.
7
8use crate::error::MultiError;
9use crate::mailbox::Mailbox;
10use crate::runner::AgentRunner;
11use crate::shared::SharedInfra;
12use crate::types::{AgentOutput, AgentSpec};
13use serde::{Deserialize, Serialize};
14use std::sync::Arc;
15use std::time::Instant;
16use tracing::instrument;
17
18#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
19#[serde(rename_all = "snake_case")]
20pub enum SwarmMode {
21    Parallel,
22    Sequential,
23    Debate,
24}
25
26#[derive(Debug, Clone, Serialize, Deserialize)]
27pub struct SwarmResult {
28    pub task: String,
29    pub outputs: Vec<AgentOutput>,
30    pub final_summary: String,
31}
32
33pub struct Swarm {
34    pub agents: Vec<AgentSpec>,
35    pub mode: SwarmMode,
36    pub synthesizer: Option<AgentSpec>,
37    /// When true, each agent gets an isolated state overlay.
38    /// Writes go to a per-agent local store; reads fall through to the shared parent.
39    /// On success, local state is merged back to the parent.
40    pub isolated: bool,
41    /// When set (parallel mode), each agent gets an isolated filesystem
42    /// workspace, advertised to the runner via `AgentSpec.metadata["workspace"]`.
43    pub workspaces: Option<crate::workspace::WorkspaceConfig>,
44}
45
46impl Swarm {
47    pub fn new(agents: Vec<AgentSpec>, mode: SwarmMode) -> Self {
48        Self {
49            agents,
50            mode,
51            synthesizer: None,
52            isolated: false,
53            workspaces: None,
54        }
55    }
56
57    pub fn with_synthesizer(mut self, spec: AgentSpec) -> Self {
58        self.synthesizer = Some(spec);
59        self
60    }
61
62    /// Enable per-agent state isolation for this swarm.
63    pub fn with_isolation(mut self) -> Self {
64        self.isolated = true;
65        self
66    }
67
68    /// Provision an isolated filesystem workspace per agent (parallel mode). Each
69    /// agent's [`AgentSpec`] gets a `workspace` metadata entry with its directory;
70    /// the runner is expected to run its file tools there. Workspaces are removed
71    /// when the run completes. Prevents parallel file-mutating agents from
72    /// clobbering one another.
73    pub fn with_workspaces(mut self, config: crate::workspace::WorkspaceConfig) -> Self {
74        self.workspaces = Some(config);
75        self
76    }
77
78    #[instrument(name = "multi.swarm", skip_all)]
79    pub fn run<'a>(
80        &'a self,
81        task: &'a str,
82        runner: &'a Arc<dyn AgentRunner>,
83        infra: &'a SharedInfra,
84    ) -> futures::future::BoxFuture<'a, Result<SwarmResult, MultiError>> {
85        Box::pin(async move {
86            match self.mode {
87                SwarmMode::Parallel => self.run_parallel(task, runner, infra).await,
88                SwarmMode::Sequential => self.run_sequential(task, runner, infra).await,
89                SwarmMode::Debate => self.run_debate(task, runner, infra).await,
90            }
91        })
92    }
93
94    async fn run_parallel(
95        &self,
96        task: &str,
97        runner: &Arc<dyn AgentRunner>,
98        infra: &SharedInfra,
99    ) -> Result<SwarmResult, MultiError> {
100        let mailbox = Arc::new(Mailbox::default());
101
102        // Each agent slot is either spawned (index into `handles`) or pre-empted
103        // by the coordination budget. Keeping per-agent slots preserves output
104        // order even when some agents are skipped.
105        enum Slot {
106            Spawned(usize),
107            Skipped(AgentOutput),
108        }
109
110        // When isolated, each handle returns (Result, Option<AgentContext>) so we
111        // can merge state back on success.  When not isolated, the context is None.
112        let mut handles: Vec<
113            tokio::task::JoinHandle<(
114                Result<AgentOutput, MultiError>,
115                Option<crate::task_context::AgentContext>,
116            )>,
117        > = Vec::new();
118        let mut slots: Vec<Slot> = Vec::new();
119
120        for spec in &self.agents {
121            // Provision an isolated filesystem workspace if configured, and
122            // advertise its path to the runner via the spec's metadata. Done
123            // BEFORE the budget reservation so a provisioning failure doesn't
124            // burn a (non-refundable) agent slot. On failure, fail this agent
125            // closed rather than running it unisolated and risk clobbering a
126            // sibling.
127            let workspace = match &self.workspaces {
128                Some(cfg) => match crate::workspace::AgentWorkspace::provision(cfg, &spec.name) {
129                    Ok(ws) => Some(ws),
130                    Err(e) => {
131                        slots.push(Slot::Skipped(AgentOutput {
132                            name: spec.name.clone(),
133                            answer: String::new(),
134                            turns: 0,
135                            tool_calls: 0,
136                            duration_ms: 0.0,
137                            error: Some(format!("workspace provisioning failed: {e}")),
138                            outcome: None,
139                            tokens: None,
140                        }));
141                        continue;
142                    }
143                },
144                None => None,
145            };
146
147            // Budget pre-flight: a crossed token/cost ceiling or the agent cap
148            // stops further spawns. The whole parallel batch is launched at once,
149            // so this gates the batch rather than metering mid-batch. On denial
150            // the just-provisioned `workspace` guard drops here and cleans up.
151            if let Err(e) = infra.begin_agent() {
152                slots.push(Slot::Skipped(crate::budget::budget_skipped_output(
153                    &spec.name, &e,
154                )));
155                continue;
156            }
157
158            let runner = Arc::clone(runner);
159            let mut spec = spec.clone();
160            if let Some(ws) = &workspace {
161                spec = ws.inject(spec);
162            }
163            let task = task.to_string();
164            let mailbox = Arc::clone(&mailbox);
165
166            if self.isolated {
167                let (rt, ctx) = infra.make_isolated_runtime(&spec.name);
168                for tool in &spec.tools {
169                    rt.register_tool(tool).await;
170                }
171                let ctx_clone = ctx.clone();
172                handles.push(tokio::spawn(async move {
173                    // Hold the workspace guard for the agent's lifetime; dropped
174                    // (cleaned up) when the task finishes.
175                    let _workspace = workspace;
176                    let result = crate::task_context::TaskScope::run(ctx_clone, async {
177                        runner.run(&spec, &task, &rt, &mailbox).await
178                    })
179                    .await;
180                    (result, Some(ctx))
181                }));
182            } else {
183                let rt = infra.make_runtime();
184                for tool in &spec.tools {
185                    rt.register_tool(tool).await;
186                }
187                handles.push(tokio::spawn(async move {
188                    let _workspace = workspace;
189                    let result = runner.run(&spec, &task, &rt, &mailbox).await;
190                    (result, None)
191                }));
192            }
193            slots.push(Slot::Spawned(handles.len() - 1));
194        }
195
196        // Move owned join results out by handle index as each slot is visited.
197        let mut results: Vec<Option<_>> = futures::future::join_all(handles)
198            .await
199            .into_iter()
200            .map(Some)
201            .collect();
202        let mut outputs = Vec::new();
203        for (i, slot) in slots.into_iter().enumerate() {
204            let handle_idx = match slot {
205                Slot::Skipped(output) => {
206                    outputs.push(output);
207                    continue;
208                }
209                Slot::Spawned(idx) => idx,
210            };
211            match results.get_mut(handle_idx).and_then(Option::take) {
212                Some(Ok((Ok(output), ctx))) => {
213                    // Merge isolated state back to parent on success
214                    if let Some(ctx) = ctx {
215                        ctx.merge_to_parent();
216                    }
217                    // Record reported spend against the coordination budget.
218                    infra.record_output(&output);
219                    // Write to shared state
220                    infra.state.set(
221                        &format!("agent.{}.answer", output.name),
222                        serde_json::Value::String(output.answer.clone()),
223                        &format!("swarm.{}", output.name),
224                    );
225                    outputs.push(output);
226                }
227                Some(Ok((Err(e), _ctx))) => {
228                    // Note: an agent that spent tokens before returning Err has
229                    // that spend dropped — the error path carries no token
230                    // payload, so the budget can under-count failed work.
231                    outputs.push(AgentOutput {
232                        name: self.agents[i].name.clone(),
233                        answer: String::new(),
234                        turns: 0,
235                        tool_calls: 0,
236                        duration_ms: 0.0,
237                        error: Some(e.to_string()),
238                        outcome: None,
239                        tokens: None,
240                    });
241                }
242                Some(Err(e)) => {
243                    outputs.push(AgentOutput {
244                        name: self.agents[i].name.clone(),
245                        answer: String::new(),
246                        turns: 0,
247                        tool_calls: 0,
248                        duration_ms: 0.0,
249                        error: Some(format!("join error: {}", e)),
250                        outcome: None,
251                        tokens: None,
252                    });
253                }
254                None => {
255                    outputs.push(AgentOutput {
256                        name: self.agents[i].name.clone(),
257                        answer: String::new(),
258                        turns: 0,
259                        tool_calls: 0,
260                        duration_ms: 0.0,
261                        error: Some("internal: missing join result".to_string()),
262                        outcome: None,
263                        tokens: None,
264                    });
265                }
266            }
267        }
268
269        let summary = self.synthesize(task, &outputs, runner, infra).await;
270
271        Ok(SwarmResult {
272            task: task.to_string(),
273            outputs,
274            final_summary: summary,
275        })
276    }
277
278    async fn run_sequential(
279        &self,
280        task: &str,
281        runner: &Arc<dyn AgentRunner>,
282        infra: &SharedInfra,
283    ) -> Result<SwarmResult, MultiError> {
284        let mailbox = Arc::new(Mailbox::default());
285        let mut outputs = Vec::new();
286
287        for spec in &self.agents {
288            // Budget gate before each agent. In a sequential chain this is real
289            // between-agent enforcement: once a prior agent's reported spend
290            // crosses a limit, the remaining agents are skipped.
291            if let Err(e) = infra.begin_agent() {
292                outputs.push(crate::budget::budget_skipped_output(&spec.name, &e));
293                continue;
294            }
295
296            // Enrich task with prior results
297            let enriched = if outputs.is_empty() {
298                task.to_string()
299            } else {
300                let prior: Vec<String> = outputs
301                    .iter()
302                    .filter_map(|o: &AgentOutput| {
303                        if o.succeeded() {
304                            Some(format!("- {}: {}", o.name, truncate(&o.answer, 300)))
305                        } else {
306                            None
307                        }
308                    })
309                    .collect();
310                format!("{}\n\nPrior agents' findings:\n{}", task, prior.join("\n"))
311            };
312
313            let rt = infra.make_runtime();
314            for tool in &spec.tools {
315                rt.register_tool(tool).await;
316            }
317
318            let start = Instant::now();
319            match runner.run(spec, &enriched, &rt, &mailbox).await {
320                Ok(output) => {
321                    infra.record_output(&output);
322                    infra.state.set(
323                        &format!("agent.{}.answer", output.name),
324                        serde_json::Value::String(output.answer.clone()),
325                        &format!("swarm.{}", output.name),
326                    );
327                    outputs.push(output);
328                }
329                Err(e) => {
330                    outputs.push(AgentOutput {
331                        name: spec.name.clone(),
332                        answer: String::new(),
333                        turns: 0,
334                        tool_calls: 0,
335                        duration_ms: start.elapsed().as_secs_f64() * 1000.0,
336                        error: Some(e.to_string()),
337                        outcome: None,
338                        tokens: None,
339                    });
340                }
341            }
342        }
343
344        let summary = self.synthesize(task, &outputs, runner, infra).await;
345
346        Ok(SwarmResult {
347            task: task.to_string(),
348            outputs,
349            final_summary: summary,
350        })
351    }
352
353    async fn run_debate(
354        &self,
355        task: &str,
356        runner: &Arc<dyn AgentRunner>,
357        infra: &SharedInfra,
358    ) -> Result<SwarmResult, MultiError> {
359        // Round 1: independent answers
360        let round1 = Swarm::new(self.agents.clone(), SwarmMode::Parallel)
361            .run(task, runner, infra)
362            .await?;
363
364        // Round 2: each agent critiques the others
365        let mut critique_specs = Vec::new();
366        for spec in &self.agents {
367            let others: Vec<String> = round1
368                .outputs
369                .iter()
370                .filter(|o| o.name != spec.name && o.succeeded())
371                .map(|o| format!("- {}: {}", o.name, truncate(&o.answer, 300)))
372                .collect();
373
374            let critique_prompt = format!(
375                "{}\n\nOriginal task: {}\n\nOther agents' answers:\n{}\n\n\
376                 Critique these answers and provide your improved response.",
377                spec.system_prompt,
378                task,
379                others.join("\n")
380            );
381
382            let mut critique_spec = spec.clone();
383            critique_spec.name = format!("{}_critique", spec.name);
384            critique_spec.system_prompt = critique_prompt;
385            critique_specs.push(critique_spec);
386        }
387
388        let round2 = Swarm::new(critique_specs, SwarmMode::Parallel)
389            .run(task, runner, infra)
390            .await?;
391
392        // Combine both rounds
393        let mut all_outputs = round1.outputs;
394        all_outputs.extend(round2.outputs);
395
396        let summary = self.synthesize(task, &all_outputs, runner, infra).await;
397
398        Ok(SwarmResult {
399            task: task.to_string(),
400            outputs: all_outputs,
401            final_summary: summary,
402        })
403    }
404
405    async fn synthesize(
406        &self,
407        task: &str,
408        outputs: &[AgentOutput],
409        runner: &Arc<dyn AgentRunner>,
410        infra: &SharedInfra,
411    ) -> String {
412        let answers: Vec<&AgentOutput> = outputs.iter().filter(|o| o.succeeded()).collect();
413        if answers.is_empty() {
414            return "[no agent produced an answer]".to_string();
415        }
416        if answers.len() == 1 {
417            return answers[0].answer.clone();
418        }
419
420        if let Some(synth_spec) = &self.synthesizer {
421            let summaries: Vec<String> = answers
422                .iter()
423                .map(|o| format!("- {}: {}", o.name, truncate(&o.answer, 500)))
424                .collect();
425
426            let synth_task = format!(
427                "Original task: {}\n\nAgent outputs:\n{}\n\nSynthesize these into a single coherent answer.",
428                task,
429                summaries.join("\n")
430            );
431
432            // Gate the synthesizer on the budget too; on denial fall through to
433            // the default concatenation rather than failing the whole run.
434            if infra.begin_agent().is_ok() {
435                let mailbox = Mailbox::default();
436                let rt = infra.make_runtime();
437                if let Ok(output) = runner.run(synth_spec, &synth_task, &rt, &mailbox).await {
438                    infra.record_output(&output);
439                    return output.answer;
440                }
441            }
442        }
443
444        // Default: concatenate with headers
445        answers
446            .iter()
447            .map(|o| format!("## {}\n{}", o.name, o.answer))
448            .collect::<Vec<_>>()
449            .join("\n\n")
450    }
451}
452
/// Return the longest prefix of `s` that is at most `max_len` bytes long and
/// ends on a UTF-8 character boundary.
///
/// `s` is returned unchanged when it already fits.
fn truncate(s: &str, max_len: usize) -> &str {
    if s.len() <= max_len {
        return s;
    }
    // Walk back from `max_len` to the nearest boundary. Index 0 is always a
    // boundary, so the search cannot come up empty.
    let cut = (0..=max_len)
        .rev()
        .find(|&idx| s.is_char_boundary(idx))
        .unwrap_or(0);
    &s[..cut]
}
463
464#[cfg(test)]
465mod tests {
466    use super::*;
467    use crate::error::MultiError;
468    use crate::mailbox::Mailbox;
469    use crate::runner::AgentRunner;
470    use crate::types::{AgentOutput, AgentSpec};
471    use car_engine::Runtime;
472    use std::sync::atomic::{AtomicU32, Ordering};
473
474    struct MockRunner {
475        call_count: AtomicU32,
476    }
477
478    #[async_trait::async_trait]
479    impl AgentRunner for MockRunner {
480        async fn run(
481            &self,
482            spec: &AgentSpec,
483            task: &str,
484            _runtime: &Runtime,
485            _mailbox: &Mailbox,
486        ) -> Result<AgentOutput, MultiError> {
487            let _n = self.call_count.fetch_add(1, Ordering::SeqCst);
488            Ok(AgentOutput {
489                name: spec.name.clone(),
490                answer: format!(
491                    "answer from {} for: {}",
492                    spec.name,
493                    &task[..task.len().min(50)]
494                ),
495                turns: 1,
496                tool_calls: 0,
497                duration_ms: 10.0,
498                error: None,
499                outcome: None,
500                tokens: None,
501            })
502        }
503    }
504
505    #[tokio::test]
506    async fn test_parallel_swarm() {
507        let agents = vec![
508            AgentSpec::new("alice", "You are Alice"),
509            AgentSpec::new("bob", "You are Bob"),
510        ];
511        let runner: Arc<dyn AgentRunner> = Arc::new(MockRunner {
512            call_count: AtomicU32::new(0),
513        });
514        let infra = SharedInfra::new();
515
516        let result = Swarm::new(agents, SwarmMode::Parallel)
517            .run("test task", &runner, &infra)
518            .await
519            .unwrap();
520
521        assert_eq!(result.outputs.len(), 2);
522        assert!(result.outputs.iter().all(|o| o.succeeded()));
523
524        // Check shared state was written
525        assert!(infra.state.get("agent.alice.answer").is_some());
526        assert!(infra.state.get("agent.bob.answer").is_some());
527    }
528
529    #[tokio::test]
530    async fn test_sequential_swarm() {
531        let agents = vec![
532            AgentSpec::new("first", "Go first"),
533            AgentSpec::new("second", "Go second"),
534        ];
535        let runner: Arc<dyn AgentRunner> = Arc::new(MockRunner {
536            call_count: AtomicU32::new(0),
537        });
538        let infra = SharedInfra::new();
539
540        let result = Swarm::new(agents, SwarmMode::Sequential)
541            .run("sequential task", &runner, &infra)
542            .await
543            .unwrap();
544
545        assert_eq!(result.outputs.len(), 2);
546        // Second agent should see first agent's output in enriched task
547        assert!(result.outputs[1].answer.contains("Prior agents"));
548    }
549
550    /// Reports a fixed token spend per call so a budget can meter it.
551    struct TokenRunner {
552        per_call_total: u64,
553    }
554
555    #[async_trait::async_trait]
556    impl AgentRunner for TokenRunner {
557        async fn run(
558            &self,
559            spec: &AgentSpec,
560            _task: &str,
561            _runtime: &Runtime,
562            _mailbox: &Mailbox,
563        ) -> Result<AgentOutput, MultiError> {
564            Ok(AgentOutput {
565                name: spec.name.clone(),
566                answer: format!("answer from {}", spec.name),
567                turns: 1,
568                tool_calls: 0,
569                duration_ms: 1.0,
570                error: None,
571                outcome: None,
572                tokens: Some(crate::types::TokenAccounting::new(
573                    self.per_call_total,
574                    0,
575                    0.0,
576                )),
577            })
578        }
579    }
580
581    #[tokio::test]
582    async fn sequential_budget_stops_chain_when_tokens_exhausted() {
583        // Three agents, each reporting 100 tokens; a 150-token ceiling lets the
584        // first two run (cumulative 200 crosses 150 only after the second) and
585        // denies the third.
586        let agents = vec![
587            AgentSpec::new("a", ""),
588            AgentSpec::new("b", ""),
589            AgentSpec::new("c", ""),
590        ];
591        let runner: Arc<dyn AgentRunner> = Arc::new(TokenRunner { per_call_total: 100 });
592        let infra = SharedInfra::new().with_budget(crate::BudgetLimits {
593            max_total_tokens: Some(150),
594            ..Default::default()
595        });
596
597        let result = Swarm::new(agents, SwarmMode::Sequential)
598            .run("task", &runner, &infra)
599            .await
600            .unwrap();
601
602        assert_eq!(result.outputs.len(), 3);
603        assert!(result.outputs[0].succeeded());
604        assert!(result.outputs[1].succeeded());
605        assert!(!result.outputs[2].succeeded());
606        assert!(crate::is_budget_skipped(&result.outputs[2]));
607        assert_eq!(infra.budget.snapshot().total_tokens, 200);
608    }
609
610    /// Records the `workspace` metadata each agent was handed.
611    struct WorkspaceProbeRunner {
612        seen: std::sync::Arc<std::sync::Mutex<Vec<String>>>,
613    }
614
615    #[async_trait::async_trait]
616    impl AgentRunner for WorkspaceProbeRunner {
617        async fn run(
618            &self,
619            spec: &AgentSpec,
620            _task: &str,
621            _runtime: &Runtime,
622            _mailbox: &Mailbox,
623        ) -> Result<AgentOutput, MultiError> {
624            let ws = spec
625                .metadata
626                .get(crate::workspace::WORKSPACE_METADATA_KEY)
627                .and_then(|v| v.as_str())
628                .unwrap_or("")
629                .to_string();
630            self.seen.lock().unwrap().push(ws.clone());
631            // The directory must exist while the agent runs.
632            assert!(!ws.is_empty() && std::path::Path::new(&ws).is_dir());
633            Ok(AgentOutput {
634                name: spec.name.clone(),
635                answer: "ok".into(),
636                turns: 1,
637                tool_calls: 0,
638                duration_ms: 1.0,
639                error: None,
640                outcome: None,
641                tokens: None,
642            })
643        }
644    }
645
646    #[tokio::test]
647    async fn parallel_workspaces_are_provisioned_and_distinct() {
648        let base = std::env::temp_dir().join(format!("car-swarm-ws-{}", std::process::id()));
649        let seen = std::sync::Arc::new(std::sync::Mutex::new(Vec::new()));
650        let runner: Arc<dyn AgentRunner> = Arc::new(WorkspaceProbeRunner { seen: seen.clone() });
651        let infra = SharedInfra::new();
652
653        let agents = vec![AgentSpec::new("alice", ""), AgentSpec::new("bob", "")];
654        let result = Swarm::new(agents, SwarmMode::Parallel)
655            .with_workspaces(crate::workspace::WorkspaceConfig::directory(&base))
656            .run("task", &runner, &infra)
657            .await
658            .unwrap();
659
660        assert_eq!(result.outputs.len(), 2);
661        assert!(result.outputs.iter().all(|o| o.succeeded()));
662        let paths = seen.lock().unwrap().clone();
663        assert_eq!(paths.len(), 2);
664        assert_ne!(paths[0], paths[1], "each agent gets a distinct workspace");
665        // Cleaned up after the run.
666        for p in &paths {
667            assert!(!std::path::Path::new(p).exists(), "workspace removed on drop");
668        }
669        let _ = std::fs::remove_dir_all(&base);
670    }
671
672    #[tokio::test]
673    async fn parallel_budget_agent_cap_skips_excess() {
674        // Five agents, cap of 2: exactly two run, three are skipped.
675        let agents: Vec<AgentSpec> = (0..5)
676            .map(|i| AgentSpec::new(&format!("a{}", i), ""))
677            .collect();
678        let runner: Arc<dyn AgentRunner> = Arc::new(MockRunner {
679            call_count: AtomicU32::new(0),
680        });
681        let infra = SharedInfra::new().with_budget(crate::BudgetLimits {
682            max_agents: Some(2),
683            ..Default::default()
684        });
685
686        let result = Swarm::new(agents, SwarmMode::Parallel)
687            .run("task", &runner, &infra)
688            .await
689            .unwrap();
690
691        assert_eq!(result.outputs.len(), 5);
692        let ran = result.outputs.iter().filter(|o| o.succeeded()).count();
693        let skipped = result
694            .outputs
695            .iter()
696            .filter(|o| crate::is_budget_skipped(o))
697            .count();
698        assert_eq!(ran, 2);
699        assert_eq!(skipped, 3);
700    }
701
702    #[tokio::test]
703    async fn test_debate_swarm() {
704        let agents = vec![
705            AgentSpec::new("debater_a", "Argue for"),
706            AgentSpec::new("debater_b", "Argue against"),
707        ];
708        let runner: Arc<dyn AgentRunner> = Arc::new(MockRunner {
709            call_count: AtomicU32::new(0),
710        });
711        let infra = SharedInfra::new();
712
713        let result = Swarm::new(agents, SwarmMode::Debate)
714            .run("debate topic", &runner, &infra)
715            .await
716            .unwrap();
717
718        // 2 agents x 2 rounds = 4 outputs
719        assert_eq!(result.outputs.len(), 4);
720    }
721}