car-multi 0.7.0

Multi-agent coordination patterns for Common Agent Runtime
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
//! Swarm — N agents working on the same problem.
//!
//! Modes:
//! - **Parallel**: all agents run concurrently, then a synthesizer combines results.
//! - **Sequential**: agents run one after another, each seeing prior agents' outputs.
//! - **Debate**: two rounds — initial answers, then critique, then a judge picks the best.

use crate::error::MultiError;
use crate::mailbox::Mailbox;
use crate::runner::AgentRunner;
use crate::shared::SharedInfra;
use crate::types::{AgentOutput, AgentSpec};
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use std::time::Instant;
use tracing::instrument;

#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum SwarmMode {
    Parallel,
    Sequential,
    Debate,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SwarmResult {
    pub task: String,
    pub outputs: Vec<AgentOutput>,
    pub final_summary: String,
}

pub struct Swarm {
    pub agents: Vec<AgentSpec>,
    pub mode: SwarmMode,
    pub synthesizer: Option<AgentSpec>,
    /// When true, each agent gets an isolated state overlay.
    /// Writes go to a per-agent local store; reads fall through to the shared parent.
    /// On success, local state is merged back to the parent.
    pub isolated: bool,
}

impl Swarm {
    pub fn new(agents: Vec<AgentSpec>, mode: SwarmMode) -> Self {
        Self {
            agents,
            mode,
            synthesizer: None,
            isolated: false,
        }
    }

    pub fn with_synthesizer(mut self, spec: AgentSpec) -> Self {
        self.synthesizer = Some(spec);
        self
    }

    /// Enable per-agent state isolation for this swarm.
    pub fn with_isolation(mut self) -> Self {
        self.isolated = true;
        self
    }

    #[instrument(name = "multi.swarm", skip_all)]
    pub fn run<'a>(
        &'a self,
        task: &'a str,
        runner: &'a Arc<dyn AgentRunner>,
        infra: &'a SharedInfra,
    ) -> futures::future::BoxFuture<'a, Result<SwarmResult, MultiError>> {
        Box::pin(async move {
            match self.mode {
                SwarmMode::Parallel => self.run_parallel(task, runner, infra).await,
                SwarmMode::Sequential => self.run_sequential(task, runner, infra).await,
                SwarmMode::Debate => self.run_debate(task, runner, infra).await,
            }
        })
    }

    async fn run_parallel(
        &self,
        task: &str,
        runner: &Arc<dyn AgentRunner>,
        infra: &SharedInfra,
    ) -> Result<SwarmResult, MultiError> {
        let mailbox = Arc::new(Mailbox::default());

        // When isolated, each handle returns (Result, Option<AgentContext>) so we
        // can merge state back on success.  When not isolated, the context is None.
        let mut handles: Vec<
            tokio::task::JoinHandle<(
                Result<AgentOutput, MultiError>,
                Option<crate::task_context::AgentContext>,
            )>,
        > = Vec::new();

        for spec in &self.agents {
            let runner = Arc::clone(runner);
            let spec = spec.clone();
            let task = task.to_string();
            let mailbox = Arc::clone(&mailbox);

            if self.isolated {
                let (rt, ctx) = infra.make_isolated_runtime(&spec.name);
                for tool in &spec.tools {
                    rt.register_tool(tool).await;
                }
                let ctx_clone = ctx.clone();
                handles.push(tokio::spawn(async move {
                    let result = crate::task_context::TaskScope::run(ctx_clone, async {
                        runner.run(&spec, &task, &rt, &mailbox).await
                    })
                    .await;
                    (result, Some(ctx))
                }));
            } else {
                let rt = infra.make_runtime();
                for tool in &spec.tools {
                    rt.register_tool(tool).await;
                }
                handles.push(tokio::spawn(async move {
                    let result = runner.run(&spec, &task, &rt, &mailbox).await;
                    (result, None)
                }));
            }
        }

        let results = futures::future::join_all(handles).await;
        let mut outputs = Vec::new();
        for (i, result) in results.into_iter().enumerate() {
            match result {
                Ok((Ok(output), ctx)) => {
                    // Merge isolated state back to parent on success
                    if let Some(ctx) = ctx {
                        ctx.merge_to_parent();
                    }
                    // Write to shared state
                    infra.state.set(
                        &format!("agent.{}.answer", output.name),
                        serde_json::Value::String(output.answer.clone()),
                        &format!("swarm.{}", output.name),
                    );
                    outputs.push(output);
                }
                Ok((Err(e), _ctx)) => {
                    outputs.push(AgentOutput {
                        name: self.agents[i].name.clone(),
                        answer: String::new(),
                        turns: 0,
                        tool_calls: 0,
                        duration_ms: 0.0,
                        error: Some(e.to_string()),
                        outcome: None,
                        tokens: None,
                    });
                }
                Err(e) => {
                    outputs.push(AgentOutput {
                        name: self.agents[i].name.clone(),
                        answer: String::new(),
                        turns: 0,
                        tool_calls: 0,
                        duration_ms: 0.0,
                        error: Some(format!("join error: {}", e)),
                        outcome: None,
                        tokens: None,
                    });
                }
            }
        }

        let summary = self.synthesize(task, &outputs, runner, infra).await;

        Ok(SwarmResult {
            task: task.to_string(),
            outputs,
            final_summary: summary,
        })
    }

    async fn run_sequential(
        &self,
        task: &str,
        runner: &Arc<dyn AgentRunner>,
        infra: &SharedInfra,
    ) -> Result<SwarmResult, MultiError> {
        let mailbox = Arc::new(Mailbox::default());
        let mut outputs = Vec::new();

        for spec in &self.agents {
            // Enrich task with prior results
            let enriched = if outputs.is_empty() {
                task.to_string()
            } else {
                let prior: Vec<String> = outputs
                    .iter()
                    .filter_map(|o: &AgentOutput| {
                        if o.succeeded() {
                            Some(format!("- {}: {}", o.name, truncate(&o.answer, 300)))
                        } else {
                            None
                        }
                    })
                    .collect();
                format!("{}\n\nPrior agents' findings:\n{}", task, prior.join("\n"))
            };

            let rt = infra.make_runtime();
            for tool in &spec.tools {
                rt.register_tool(tool).await;
            }

            let start = Instant::now();
            match runner.run(spec, &enriched, &rt, &mailbox).await {
                Ok(output) => {
                    infra.state.set(
                        &format!("agent.{}.answer", output.name),
                        serde_json::Value::String(output.answer.clone()),
                        &format!("swarm.{}", output.name),
                    );
                    outputs.push(output);
                }
                Err(e) => {
                    outputs.push(AgentOutput {
                        name: spec.name.clone(),
                        answer: String::new(),
                        turns: 0,
                        tool_calls: 0,
                        duration_ms: start.elapsed().as_secs_f64() * 1000.0,
                        error: Some(e.to_string()),
                        outcome: None,
                        tokens: None,
                    });
                }
            }
        }

        let summary = self.synthesize(task, &outputs, runner, infra).await;

        Ok(SwarmResult {
            task: task.to_string(),
            outputs,
            final_summary: summary,
        })
    }

    async fn run_debate(
        &self,
        task: &str,
        runner: &Arc<dyn AgentRunner>,
        infra: &SharedInfra,
    ) -> Result<SwarmResult, MultiError> {
        // Round 1: independent answers
        let round1 = Swarm::new(self.agents.clone(), SwarmMode::Parallel)
            .run(task, runner, infra)
            .await?;

        // Round 2: each agent critiques the others
        let mut critique_specs = Vec::new();
        for spec in &self.agents {
            let others: Vec<String> = round1
                .outputs
                .iter()
                .filter(|o| o.name != spec.name && o.succeeded())
                .map(|o| format!("- {}: {}", o.name, truncate(&o.answer, 300)))
                .collect();

            let critique_prompt = format!(
                "{}\n\nOriginal task: {}\n\nOther agents' answers:\n{}\n\n\
                 Critique these answers and provide your improved response.",
                spec.system_prompt,
                task,
                others.join("\n")
            );

            let mut critique_spec = spec.clone();
            critique_spec.name = format!("{}_critique", spec.name);
            critique_spec.system_prompt = critique_prompt;
            critique_specs.push(critique_spec);
        }

        let round2 = Swarm::new(critique_specs, SwarmMode::Parallel)
            .run(task, runner, infra)
            .await?;

        // Combine both rounds
        let mut all_outputs = round1.outputs;
        all_outputs.extend(round2.outputs);

        let summary = self.synthesize(task, &all_outputs, runner, infra).await;

        Ok(SwarmResult {
            task: task.to_string(),
            outputs: all_outputs,
            final_summary: summary,
        })
    }

    async fn synthesize(
        &self,
        task: &str,
        outputs: &[AgentOutput],
        runner: &Arc<dyn AgentRunner>,
        infra: &SharedInfra,
    ) -> String {
        let answers: Vec<&AgentOutput> = outputs.iter().filter(|o| o.succeeded()).collect();
        if answers.is_empty() {
            return "[no agent produced an answer]".to_string();
        }
        if answers.len() == 1 {
            return answers[0].answer.clone();
        }

        if let Some(synth_spec) = &self.synthesizer {
            let summaries: Vec<String> = answers
                .iter()
                .map(|o| format!("- {}: {}", o.name, truncate(&o.answer, 500)))
                .collect();

            let synth_task = format!(
                "Original task: {}\n\nAgent outputs:\n{}\n\nSynthesize these into a single coherent answer.",
                task,
                summaries.join("\n")
            );

            let mailbox = Mailbox::default();
            let rt = infra.make_runtime();
            if let Ok(output) = runner.run(synth_spec, &synth_task, &rt, &mailbox).await {
                return output.answer;
            }
        }

        // Default: concatenate with headers
        answers
            .iter()
            .map(|o| format!("## {}\n{}", o.name, o.answer))
            .collect::<Vec<_>>()
            .join("\n\n")
    }
}

fn truncate(s: &str, max_len: usize) -> &str {
    if s.len() <= max_len {
        return s;
    }
    let mut end = max_len;
    while end > 0 && !s.is_char_boundary(end) {
        end -= 1;
    }
    &s[..end]
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::error::MultiError;
    use crate::mailbox::Mailbox;
    use crate::runner::AgentRunner;
    use crate::types::{AgentOutput, AgentSpec};
    use car_engine::Runtime;
    use std::sync::atomic::{AtomicU32, Ordering};

    struct MockRunner {
        call_count: AtomicU32,
    }

    #[async_trait::async_trait]
    impl AgentRunner for MockRunner {
        async fn run(
            &self,
            spec: &AgentSpec,
            task: &str,
            _runtime: &Runtime,
            _mailbox: &Mailbox,
        ) -> Result<AgentOutput, MultiError> {
            let _n = self.call_count.fetch_add(1, Ordering::SeqCst);
            Ok(AgentOutput {
                name: spec.name.clone(),
                answer: format!("answer from {} for: {}", spec.name, &task[..task.len().min(50)]),
                turns: 1,
                tool_calls: 0,
                duration_ms: 10.0,
                error: None,
                outcome: None,
                tokens: None,
            })
        }
    }

    #[tokio::test]
    async fn test_parallel_swarm() {
        let agents = vec![
            AgentSpec::new("alice", "You are Alice"),
            AgentSpec::new("bob", "You are Bob"),
        ];
        let runner: Arc<dyn AgentRunner> = Arc::new(MockRunner {
            call_count: AtomicU32::new(0),
        });
        let infra = SharedInfra::new();

        let result = Swarm::new(agents, SwarmMode::Parallel)
            .run("test task", &runner, &infra)
            .await
            .unwrap();

        assert_eq!(result.outputs.len(), 2);
        assert!(result.outputs.iter().all(|o| o.succeeded()));

        // Check shared state was written
        assert!(infra.state.get("agent.alice.answer").is_some());
        assert!(infra.state.get("agent.bob.answer").is_some());
    }

    #[tokio::test]
    async fn test_sequential_swarm() {
        let agents = vec![
            AgentSpec::new("first", "Go first"),
            AgentSpec::new("second", "Go second"),
        ];
        let runner: Arc<dyn AgentRunner> = Arc::new(MockRunner {
            call_count: AtomicU32::new(0),
        });
        let infra = SharedInfra::new();

        let result = Swarm::new(agents, SwarmMode::Sequential)
            .run("sequential task", &runner, &infra)
            .await
            .unwrap();

        assert_eq!(result.outputs.len(), 2);
        // Second agent should see first agent's output in enriched task
        assert!(result.outputs[1].answer.contains("Prior agents"));
    }

    #[tokio::test]
    async fn test_debate_swarm() {
        let agents = vec![
            AgentSpec::new("debater_a", "Argue for"),
            AgentSpec::new("debater_b", "Argue against"),
        ];
        let runner: Arc<dyn AgentRunner> = Arc::new(MockRunner {
            call_count: AtomicU32::new(0),
        });
        let infra = SharedInfra::new();

        let result = Swarm::new(agents, SwarmMode::Debate)
            .run("debate topic", &runner, &infra)
            .await
            .unwrap();

        // 2 agents x 2 rounds = 4 outputs
        assert_eq!(result.outputs.len(), 4);
    }
}