Skip to main content

car_multi/patterns/
swarm.rs

1//! Swarm — N agents working on the same problem.
2//!
3//! Modes:
4//! - **Parallel**: all agents run concurrently, then a synthesizer combines results.
5//! - **Sequential**: agents run one after another, each seeing prior agents' outputs.
6//! - **Debate**: two rounds — initial answers, then critique, then a judge picks the best.
7
8use crate::error::MultiError;
9use crate::mailbox::Mailbox;
10use crate::runner::AgentRunner;
11use crate::shared::SharedInfra;
12use crate::types::{AgentOutput, AgentSpec};
13use serde::{Deserialize, Serialize};
14use std::sync::Arc;
15use std::time::Instant;
16
17#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
18#[serde(rename_all = "snake_case")]
19pub enum SwarmMode {
20    Parallel,
21    Sequential,
22    Debate,
23}
24
25#[derive(Debug, Clone, Serialize, Deserialize)]
26pub struct SwarmResult {
27    pub task: String,
28    pub outputs: Vec<AgentOutput>,
29    pub final_summary: String,
30}
31
32pub struct Swarm {
33    pub agents: Vec<AgentSpec>,
34    pub mode: SwarmMode,
35    pub synthesizer: Option<AgentSpec>,
36}
37
38impl Swarm {
39    pub fn new(agents: Vec<AgentSpec>, mode: SwarmMode) -> Self {
40        Self {
41            agents,
42            mode,
43            synthesizer: None,
44        }
45    }
46
47    pub fn with_synthesizer(mut self, spec: AgentSpec) -> Self {
48        self.synthesizer = Some(spec);
49        self
50    }
51
52    pub fn run<'a>(
53        &'a self,
54        task: &'a str,
55        runner: &'a Arc<dyn AgentRunner>,
56        infra: &'a SharedInfra,
57    ) -> futures::future::BoxFuture<'a, Result<SwarmResult, MultiError>> {
58        Box::pin(async move {
59            match self.mode {
60                SwarmMode::Parallel => self.run_parallel(task, runner, infra).await,
61                SwarmMode::Sequential => self.run_sequential(task, runner, infra).await,
62                SwarmMode::Debate => self.run_debate(task, runner, infra).await,
63            }
64        })
65    }
66
67    async fn run_parallel(
68        &self,
69        task: &str,
70        runner: &Arc<dyn AgentRunner>,
71        infra: &SharedInfra,
72    ) -> Result<SwarmResult, MultiError> {
73        let mailbox = Arc::new(Mailbox::default());
74
75        let mut handles = Vec::new();
76        for spec in &self.agents {
77            let runner = Arc::clone(runner);
78            let rt = infra.make_runtime();
79            let spec = spec.clone();
80            let task = task.to_string();
81            let mailbox = Arc::clone(&mailbox);
82
83            // Register tools
84            for tool in &spec.tools {
85                rt.register_tool(tool).await;
86            }
87
88            handles.push(tokio::spawn(async move {
89                runner.run(&spec, &task, &rt, &mailbox).await
90            }));
91        }
92
93        let results = futures::future::join_all(handles).await;
94        let mut outputs = Vec::new();
95        for (i, result) in results.into_iter().enumerate() {
96            match result {
97                Ok(Ok(output)) => {
98                    // Write to shared state
99                    infra.state.set(
100                        &format!("agent.{}.answer", output.name),
101                        serde_json::Value::String(output.answer.clone()),
102                        &format!("swarm.{}", output.name),
103                    );
104                    outputs.push(output);
105                }
106                Ok(Err(e)) => {
107                    outputs.push(AgentOutput {
108                        name: self.agents[i].name.clone(),
109                        answer: String::new(),
110                        turns: 0,
111                        tool_calls: 0,
112                        duration_ms: 0.0,
113                        error: Some(e.to_string()),
114                    });
115                }
116                Err(e) => {
117                    outputs.push(AgentOutput {
118                        name: self.agents[i].name.clone(),
119                        answer: String::new(),
120                        turns: 0,
121                        tool_calls: 0,
122                        duration_ms: 0.0,
123                        error: Some(format!("join error: {}", e)),
124                    });
125                }
126            }
127        }
128
129        let summary = self.synthesize(task, &outputs, runner, infra).await;
130
131        Ok(SwarmResult {
132            task: task.to_string(),
133            outputs,
134            final_summary: summary,
135        })
136    }
137
138    async fn run_sequential(
139        &self,
140        task: &str,
141        runner: &Arc<dyn AgentRunner>,
142        infra: &SharedInfra,
143    ) -> Result<SwarmResult, MultiError> {
144        let mailbox = Arc::new(Mailbox::default());
145        let mut outputs = Vec::new();
146
147        for spec in &self.agents {
148            // Enrich task with prior results
149            let enriched = if outputs.is_empty() {
150                task.to_string()
151            } else {
152                let prior: Vec<String> = outputs
153                    .iter()
154                    .filter_map(|o: &AgentOutput| {
155                        if o.succeeded() {
156                            Some(format!("- {}: {}", o.name, truncate(&o.answer, 300)))
157                        } else {
158                            None
159                        }
160                    })
161                    .collect();
162                format!("{}\n\nPrior agents' findings:\n{}", task, prior.join("\n"))
163            };
164
165            let rt = infra.make_runtime();
166            for tool in &spec.tools {
167                rt.register_tool(tool).await;
168            }
169
170            let start = Instant::now();
171            match runner.run(spec, &enriched, &rt, &mailbox).await {
172                Ok(output) => {
173                    infra.state.set(
174                        &format!("agent.{}.answer", output.name),
175                        serde_json::Value::String(output.answer.clone()),
176                        &format!("swarm.{}", output.name),
177                    );
178                    outputs.push(output);
179                }
180                Err(e) => {
181                    outputs.push(AgentOutput {
182                        name: spec.name.clone(),
183                        answer: String::new(),
184                        turns: 0,
185                        tool_calls: 0,
186                        duration_ms: start.elapsed().as_secs_f64() * 1000.0,
187                        error: Some(e.to_string()),
188                    });
189                }
190            }
191        }
192
193        let summary = self.synthesize(task, &outputs, runner, infra).await;
194
195        Ok(SwarmResult {
196            task: task.to_string(),
197            outputs,
198            final_summary: summary,
199        })
200    }
201
202    async fn run_debate(
203        &self,
204        task: &str,
205        runner: &Arc<dyn AgentRunner>,
206        infra: &SharedInfra,
207    ) -> Result<SwarmResult, MultiError> {
208        // Round 1: independent answers
209        let round1 = Swarm::new(self.agents.clone(), SwarmMode::Parallel)
210            .run(task, runner, infra)
211            .await?;
212
213        // Round 2: each agent critiques the others
214        let mut critique_specs = Vec::new();
215        for spec in &self.agents {
216            let others: Vec<String> = round1
217                .outputs
218                .iter()
219                .filter(|o| o.name != spec.name && o.succeeded())
220                .map(|o| format!("- {}: {}", o.name, truncate(&o.answer, 300)))
221                .collect();
222
223            let critique_prompt = format!(
224                "{}\n\nOriginal task: {}\n\nOther agents' answers:\n{}\n\n\
225                 Critique these answers and provide your improved response.",
226                spec.system_prompt,
227                task,
228                others.join("\n")
229            );
230
231            let mut critique_spec = spec.clone();
232            critique_spec.name = format!("{}_critique", spec.name);
233            critique_spec.system_prompt = critique_prompt;
234            critique_specs.push(critique_spec);
235        }
236
237        let round2 = Swarm::new(critique_specs, SwarmMode::Parallel)
238            .run(task, runner, infra)
239            .await?;
240
241        // Combine both rounds
242        let mut all_outputs = round1.outputs;
243        all_outputs.extend(round2.outputs);
244
245        let summary = self.synthesize(task, &all_outputs, runner, infra).await;
246
247        Ok(SwarmResult {
248            task: task.to_string(),
249            outputs: all_outputs,
250            final_summary: summary,
251        })
252    }
253
254    async fn synthesize(
255        &self,
256        task: &str,
257        outputs: &[AgentOutput],
258        runner: &Arc<dyn AgentRunner>,
259        infra: &SharedInfra,
260    ) -> String {
261        let answers: Vec<&AgentOutput> = outputs.iter().filter(|o| o.succeeded()).collect();
262        if answers.is_empty() {
263            return "[no agent produced an answer]".to_string();
264        }
265        if answers.len() == 1 {
266            return answers[0].answer.clone();
267        }
268
269        if let Some(synth_spec) = &self.synthesizer {
270            let summaries: Vec<String> = answers
271                .iter()
272                .map(|o| format!("- {}: {}", o.name, truncate(&o.answer, 500)))
273                .collect();
274
275            let synth_task = format!(
276                "Original task: {}\n\nAgent outputs:\n{}\n\nSynthesize these into a single coherent answer.",
277                task,
278                summaries.join("\n")
279            );
280
281            let mailbox = Mailbox::default();
282            let rt = infra.make_runtime();
283            if let Ok(output) = runner.run(synth_spec, &synth_task, &rt, &mailbox).await {
284                return output.answer;
285            }
286        }
287
288        // Default: concatenate with headers
289        answers
290            .iter()
291            .map(|o| format!("## {}\n{}", o.name, o.answer))
292            .collect::<Vec<_>>()
293            .join("\n\n")
294    }
295}
296
297fn truncate(s: &str, max_len: usize) -> &str {
298    if s.len() <= max_len {
299        return s;
300    }
301    let mut end = max_len;
302    while end > 0 && !s.is_char_boundary(end) {
303        end -= 1;
304    }
305    &s[..end]
306}
307
308#[cfg(test)]
309mod tests {
310    use super::*;
311    use crate::error::MultiError;
312    use crate::mailbox::Mailbox;
313    use crate::runner::AgentRunner;
314    use crate::types::{AgentOutput, AgentSpec};
315    use car_engine::Runtime;
316    use std::sync::atomic::{AtomicU32, Ordering};
317
318    struct MockRunner {
319        call_count: AtomicU32,
320    }
321
322    #[async_trait::async_trait]
323    impl AgentRunner for MockRunner {
324        async fn run(
325            &self,
326            spec: &AgentSpec,
327            task: &str,
328            _runtime: &Runtime,
329            _mailbox: &Mailbox,
330        ) -> Result<AgentOutput, MultiError> {
331            let _n = self.call_count.fetch_add(1, Ordering::SeqCst);
332            Ok(AgentOutput {
333                name: spec.name.clone(),
334                answer: format!("answer from {} for: {}", spec.name, &task[..task.len().min(50)]),
335                turns: 1,
336                tool_calls: 0,
337                duration_ms: 10.0,
338                error: None,
339            })
340        }
341    }
342
343    #[tokio::test]
344    async fn test_parallel_swarm() {
345        let agents = vec![
346            AgentSpec::new("alice", "You are Alice"),
347            AgentSpec::new("bob", "You are Bob"),
348        ];
349        let runner: Arc<dyn AgentRunner> = Arc::new(MockRunner {
350            call_count: AtomicU32::new(0),
351        });
352        let infra = SharedInfra::new();
353
354        let result = Swarm::new(agents, SwarmMode::Parallel)
355            .run("test task", &runner, &infra)
356            .await
357            .unwrap();
358
359        assert_eq!(result.outputs.len(), 2);
360        assert!(result.outputs.iter().all(|o| o.succeeded()));
361
362        // Check shared state was written
363        assert!(infra.state.get("agent.alice.answer").is_some());
364        assert!(infra.state.get("agent.bob.answer").is_some());
365    }
366
367    #[tokio::test]
368    async fn test_sequential_swarm() {
369        let agents = vec![
370            AgentSpec::new("first", "Go first"),
371            AgentSpec::new("second", "Go second"),
372        ];
373        let runner: Arc<dyn AgentRunner> = Arc::new(MockRunner {
374            call_count: AtomicU32::new(0),
375        });
376        let infra = SharedInfra::new();
377
378        let result = Swarm::new(agents, SwarmMode::Sequential)
379            .run("sequential task", &runner, &infra)
380            .await
381            .unwrap();
382
383        assert_eq!(result.outputs.len(), 2);
384        // Second agent should see first agent's output in enriched task
385        assert!(result.outputs[1].answer.contains("Prior agents"));
386    }
387
388    #[tokio::test]
389    async fn test_debate_swarm() {
390        let agents = vec![
391            AgentSpec::new("debater_a", "Argue for"),
392            AgentSpec::new("debater_b", "Argue against"),
393        ];
394        let runner: Arc<dyn AgentRunner> = Arc::new(MockRunner {
395            call_count: AtomicU32::new(0),
396        });
397        let infra = SharedInfra::new();
398
399        let result = Swarm::new(agents, SwarmMode::Debate)
400            .run("debate topic", &runner, &infra)
401            .await
402            .unwrap();
403
404        // 2 agents x 2 rounds = 4 outputs
405        assert_eq!(result.outputs.len(), 4);
406    }
407}