Skip to main content

car_multi/patterns/
swarm.rs

1//! Swarm — N agents working on the same problem.
2//!
3//! Modes:
4//! - **Parallel**: all agents run concurrently, then a synthesizer combines results.
5//! - **Sequential**: agents run one after another, each seeing prior agents' outputs.
6//! - **Debate**: two rounds — initial answers, then critique, then a judge picks the best.
7
8use crate::error::MultiError;
9use crate::mailbox::Mailbox;
10use crate::runner::AgentRunner;
11use crate::shared::SharedInfra;
12use crate::types::{AgentOutput, AgentSpec};
13use serde::{Deserialize, Serialize};
14use std::sync::Arc;
15use std::time::Instant;
16use tracing::instrument;
17
18#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
19#[serde(rename_all = "snake_case")]
20pub enum SwarmMode {
21    Parallel,
22    Sequential,
23    Debate,
24}
25
26#[derive(Debug, Clone, Serialize, Deserialize)]
27pub struct SwarmResult {
28    pub task: String,
29    pub outputs: Vec<AgentOutput>,
30    pub final_summary: String,
31}
32
33pub struct Swarm {
34    pub agents: Vec<AgentSpec>,
35    pub mode: SwarmMode,
36    pub synthesizer: Option<AgentSpec>,
37    /// When true, each agent gets an isolated state overlay.
38    /// Writes go to a per-agent local store; reads fall through to the shared parent.
39    /// On success, local state is merged back to the parent.
40    pub isolated: bool,
41}
42
43impl Swarm {
44    pub fn new(agents: Vec<AgentSpec>, mode: SwarmMode) -> Self {
45        Self {
46            agents,
47            mode,
48            synthesizer: None,
49            isolated: false,
50        }
51    }
52
53    pub fn with_synthesizer(mut self, spec: AgentSpec) -> Self {
54        self.synthesizer = Some(spec);
55        self
56    }
57
58    /// Enable per-agent state isolation for this swarm.
59    pub fn with_isolation(mut self) -> Self {
60        self.isolated = true;
61        self
62    }
63
64    #[instrument(name = "multi.swarm", skip_all)]
65    pub fn run<'a>(
66        &'a self,
67        task: &'a str,
68        runner: &'a Arc<dyn AgentRunner>,
69        infra: &'a SharedInfra,
70    ) -> futures::future::BoxFuture<'a, Result<SwarmResult, MultiError>> {
71        Box::pin(async move {
72            match self.mode {
73                SwarmMode::Parallel => self.run_parallel(task, runner, infra).await,
74                SwarmMode::Sequential => self.run_sequential(task, runner, infra).await,
75                SwarmMode::Debate => self.run_debate(task, runner, infra).await,
76            }
77        })
78    }
79
80    async fn run_parallel(
81        &self,
82        task: &str,
83        runner: &Arc<dyn AgentRunner>,
84        infra: &SharedInfra,
85    ) -> Result<SwarmResult, MultiError> {
86        let mailbox = Arc::new(Mailbox::default());
87
88        // When isolated, each handle returns (Result, Option<AgentContext>) so we
89        // can merge state back on success.  When not isolated, the context is None.
90        let mut handles: Vec<
91            tokio::task::JoinHandle<(
92                Result<AgentOutput, MultiError>,
93                Option<crate::task_context::AgentContext>,
94            )>,
95        > = Vec::new();
96
97        for spec in &self.agents {
98            let runner = Arc::clone(runner);
99            let spec = spec.clone();
100            let task = task.to_string();
101            let mailbox = Arc::clone(&mailbox);
102
103            if self.isolated {
104                let (rt, ctx) = infra.make_isolated_runtime(&spec.name);
105                for tool in &spec.tools {
106                    rt.register_tool(tool).await;
107                }
108                let ctx_clone = ctx.clone();
109                handles.push(tokio::spawn(async move {
110                    let result = crate::task_context::TaskScope::run(ctx_clone, async {
111                        runner.run(&spec, &task, &rt, &mailbox).await
112                    })
113                    .await;
114                    (result, Some(ctx))
115                }));
116            } else {
117                let rt = infra.make_runtime();
118                for tool in &spec.tools {
119                    rt.register_tool(tool).await;
120                }
121                handles.push(tokio::spawn(async move {
122                    let result = runner.run(&spec, &task, &rt, &mailbox).await;
123                    (result, None)
124                }));
125            }
126        }
127
128        let results = futures::future::join_all(handles).await;
129        let mut outputs = Vec::new();
130        for (i, result) in results.into_iter().enumerate() {
131            match result {
132                Ok((Ok(output), ctx)) => {
133                    // Merge isolated state back to parent on success
134                    if let Some(ctx) = ctx {
135                        ctx.merge_to_parent();
136                    }
137                    // Write to shared state
138                    infra.state.set(
139                        &format!("agent.{}.answer", output.name),
140                        serde_json::Value::String(output.answer.clone()),
141                        &format!("swarm.{}", output.name),
142                    );
143                    outputs.push(output);
144                }
145                Ok((Err(e), _ctx)) => {
146                    outputs.push(AgentOutput {
147                        name: self.agents[i].name.clone(),
148                        answer: String::new(),
149                        turns: 0,
150                        tool_calls: 0,
151                        duration_ms: 0.0,
152                        error: Some(e.to_string()),
153                        outcome: None,
154                        tokens: None,
155                    });
156                }
157                Err(e) => {
158                    outputs.push(AgentOutput {
159                        name: self.agents[i].name.clone(),
160                        answer: String::new(),
161                        turns: 0,
162                        tool_calls: 0,
163                        duration_ms: 0.0,
164                        error: Some(format!("join error: {}", e)),
165                        outcome: None,
166                        tokens: None,
167                    });
168                }
169            }
170        }
171
172        let summary = self.synthesize(task, &outputs, runner, infra).await;
173
174        Ok(SwarmResult {
175            task: task.to_string(),
176            outputs,
177            final_summary: summary,
178        })
179    }
180
181    async fn run_sequential(
182        &self,
183        task: &str,
184        runner: &Arc<dyn AgentRunner>,
185        infra: &SharedInfra,
186    ) -> Result<SwarmResult, MultiError> {
187        let mailbox = Arc::new(Mailbox::default());
188        let mut outputs = Vec::new();
189
190        for spec in &self.agents {
191            // Enrich task with prior results
192            let enriched = if outputs.is_empty() {
193                task.to_string()
194            } else {
195                let prior: Vec<String> = outputs
196                    .iter()
197                    .filter_map(|o: &AgentOutput| {
198                        if o.succeeded() {
199                            Some(format!("- {}: {}", o.name, truncate(&o.answer, 300)))
200                        } else {
201                            None
202                        }
203                    })
204                    .collect();
205                format!("{}\n\nPrior agents' findings:\n{}", task, prior.join("\n"))
206            };
207
208            let rt = infra.make_runtime();
209            for tool in &spec.tools {
210                rt.register_tool(tool).await;
211            }
212
213            let start = Instant::now();
214            match runner.run(spec, &enriched, &rt, &mailbox).await {
215                Ok(output) => {
216                    infra.state.set(
217                        &format!("agent.{}.answer", output.name),
218                        serde_json::Value::String(output.answer.clone()),
219                        &format!("swarm.{}", output.name),
220                    );
221                    outputs.push(output);
222                }
223                Err(e) => {
224                    outputs.push(AgentOutput {
225                        name: spec.name.clone(),
226                        answer: String::new(),
227                        turns: 0,
228                        tool_calls: 0,
229                        duration_ms: start.elapsed().as_secs_f64() * 1000.0,
230                        error: Some(e.to_string()),
231                        outcome: None,
232                        tokens: None,
233                    });
234                }
235            }
236        }
237
238        let summary = self.synthesize(task, &outputs, runner, infra).await;
239
240        Ok(SwarmResult {
241            task: task.to_string(),
242            outputs,
243            final_summary: summary,
244        })
245    }
246
247    async fn run_debate(
248        &self,
249        task: &str,
250        runner: &Arc<dyn AgentRunner>,
251        infra: &SharedInfra,
252    ) -> Result<SwarmResult, MultiError> {
253        // Round 1: independent answers
254        let round1 = Swarm::new(self.agents.clone(), SwarmMode::Parallel)
255            .run(task, runner, infra)
256            .await?;
257
258        // Round 2: each agent critiques the others
259        let mut critique_specs = Vec::new();
260        for spec in &self.agents {
261            let others: Vec<String> = round1
262                .outputs
263                .iter()
264                .filter(|o| o.name != spec.name && o.succeeded())
265                .map(|o| format!("- {}: {}", o.name, truncate(&o.answer, 300)))
266                .collect();
267
268            let critique_prompt = format!(
269                "{}\n\nOriginal task: {}\n\nOther agents' answers:\n{}\n\n\
270                 Critique these answers and provide your improved response.",
271                spec.system_prompt,
272                task,
273                others.join("\n")
274            );
275
276            let mut critique_spec = spec.clone();
277            critique_spec.name = format!("{}_critique", spec.name);
278            critique_spec.system_prompt = critique_prompt;
279            critique_specs.push(critique_spec);
280        }
281
282        let round2 = Swarm::new(critique_specs, SwarmMode::Parallel)
283            .run(task, runner, infra)
284            .await?;
285
286        // Combine both rounds
287        let mut all_outputs = round1.outputs;
288        all_outputs.extend(round2.outputs);
289
290        let summary = self.synthesize(task, &all_outputs, runner, infra).await;
291
292        Ok(SwarmResult {
293            task: task.to_string(),
294            outputs: all_outputs,
295            final_summary: summary,
296        })
297    }
298
299    async fn synthesize(
300        &self,
301        task: &str,
302        outputs: &[AgentOutput],
303        runner: &Arc<dyn AgentRunner>,
304        infra: &SharedInfra,
305    ) -> String {
306        let answers: Vec<&AgentOutput> = outputs.iter().filter(|o| o.succeeded()).collect();
307        if answers.is_empty() {
308            return "[no agent produced an answer]".to_string();
309        }
310        if answers.len() == 1 {
311            return answers[0].answer.clone();
312        }
313
314        if let Some(synth_spec) = &self.synthesizer {
315            let summaries: Vec<String> = answers
316                .iter()
317                .map(|o| format!("- {}: {}", o.name, truncate(&o.answer, 500)))
318                .collect();
319
320            let synth_task = format!(
321                "Original task: {}\n\nAgent outputs:\n{}\n\nSynthesize these into a single coherent answer.",
322                task,
323                summaries.join("\n")
324            );
325
326            let mailbox = Mailbox::default();
327            let rt = infra.make_runtime();
328            if let Ok(output) = runner.run(synth_spec, &synth_task, &rt, &mailbox).await {
329                return output.answer;
330            }
331        }
332
333        // Default: concatenate with headers
334        answers
335            .iter()
336            .map(|o| format!("## {}\n{}", o.name, o.answer))
337            .collect::<Vec<_>>()
338            .join("\n\n")
339    }
340}
341
342fn truncate(s: &str, max_len: usize) -> &str {
343    if s.len() <= max_len {
344        return s;
345    }
346    let mut end = max_len;
347    while end > 0 && !s.is_char_boundary(end) {
348        end -= 1;
349    }
350    &s[..end]
351}
352
353#[cfg(test)]
354mod tests {
355    use super::*;
356    use crate::error::MultiError;
357    use crate::mailbox::Mailbox;
358    use crate::runner::AgentRunner;
359    use crate::types::{AgentOutput, AgentSpec};
360    use car_engine::Runtime;
361    use std::sync::atomic::{AtomicU32, Ordering};
362
363    struct MockRunner {
364        call_count: AtomicU32,
365    }
366
367    #[async_trait::async_trait]
368    impl AgentRunner for MockRunner {
369        async fn run(
370            &self,
371            spec: &AgentSpec,
372            task: &str,
373            _runtime: &Runtime,
374            _mailbox: &Mailbox,
375        ) -> Result<AgentOutput, MultiError> {
376            let _n = self.call_count.fetch_add(1, Ordering::SeqCst);
377            Ok(AgentOutput {
378                name: spec.name.clone(),
379                answer: format!(
380                    "answer from {} for: {}",
381                    spec.name,
382                    &task[..task.len().min(50)]
383                ),
384                turns: 1,
385                tool_calls: 0,
386                duration_ms: 10.0,
387                error: None,
388                outcome: None,
389                tokens: None,
390            })
391        }
392    }
393
394    #[tokio::test]
395    async fn test_parallel_swarm() {
396        let agents = vec![
397            AgentSpec::new("alice", "You are Alice"),
398            AgentSpec::new("bob", "You are Bob"),
399        ];
400        let runner: Arc<dyn AgentRunner> = Arc::new(MockRunner {
401            call_count: AtomicU32::new(0),
402        });
403        let infra = SharedInfra::new();
404
405        let result = Swarm::new(agents, SwarmMode::Parallel)
406            .run("test task", &runner, &infra)
407            .await
408            .unwrap();
409
410        assert_eq!(result.outputs.len(), 2);
411        assert!(result.outputs.iter().all(|o| o.succeeded()));
412
413        // Check shared state was written
414        assert!(infra.state.get("agent.alice.answer").is_some());
415        assert!(infra.state.get("agent.bob.answer").is_some());
416    }
417
418    #[tokio::test]
419    async fn test_sequential_swarm() {
420        let agents = vec![
421            AgentSpec::new("first", "Go first"),
422            AgentSpec::new("second", "Go second"),
423        ];
424        let runner: Arc<dyn AgentRunner> = Arc::new(MockRunner {
425            call_count: AtomicU32::new(0),
426        });
427        let infra = SharedInfra::new();
428
429        let result = Swarm::new(agents, SwarmMode::Sequential)
430            .run("sequential task", &runner, &infra)
431            .await
432            .unwrap();
433
434        assert_eq!(result.outputs.len(), 2);
435        // Second agent should see first agent's output in enriched task
436        assert!(result.outputs[1].answer.contains("Prior agents"));
437    }
438
439    #[tokio::test]
440    async fn test_debate_swarm() {
441        let agents = vec![
442            AgentSpec::new("debater_a", "Argue for"),
443            AgentSpec::new("debater_b", "Argue against"),
444        ];
445        let runner: Arc<dyn AgentRunner> = Arc::new(MockRunner {
446            call_count: AtomicU32::new(0),
447        });
448        let infra = SharedInfra::new();
449
450        let result = Swarm::new(agents, SwarmMode::Debate)
451            .run("debate topic", &runner, &infra)
452            .await
453            .unwrap();
454
455        // 2 agents x 2 rounds = 4 outputs
456        assert_eq!(result.outputs.len(), 4);
457    }
458}