use crate::error::MultiError;
use crate::mailbox::Mailbox;
use crate::runner::AgentRunner;
use crate::shared::SharedInfra;
use crate::types::{AgentOutput, AgentSpec};
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use std::time::Instant;
use tracing::instrument;
/// Strategy a [`Swarm`] uses to execute its agents.
///
/// Variant names are (de)serialized in snake_case via the serde attribute below.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum SwarmMode {
/// Every agent is spawned as its own task on the same task text and all are awaited together.
Parallel,
/// Agents run one after another; later agents receive the task plus truncated answers
/// of the earlier agents that succeeded.
Sequential,
/// Two parallel rounds: independent answers first, then each agent is re-run as
/// `<name>_critique` with the other agents' answers embedded in its system prompt.
Debate,
}
/// Result of one swarm run: the per-agent outputs plus a combined summary.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SwarmResult {
/// The task text the swarm was asked to work on.
pub task: String,
/// One entry per agent run; agents that failed are represented by an entry with `error` set.
pub outputs: Vec<AgentOutput>,
/// Combined answer built from the successful outputs (see `Swarm::synthesize`).
pub final_summary: String,
}
/// A group of agents that work on one task according to a [`SwarmMode`].
pub struct Swarm {
/// Specifications of the agents to run, in execution/output order.
pub agents: Vec<AgentSpec>,
/// Execution strategy selected for `run`.
pub mode: SwarmMode,
/// Optional agent used to merge multiple successful answers into one summary.
/// When absent, the answers are concatenated under per-agent headings instead.
pub synthesizer: Option<AgentSpec>,
/// When true, the parallel execution path gives each agent an isolated runtime
/// (`SharedInfra::make_isolated_runtime`) instead of a plain one.
pub isolated: bool,
}
impl Swarm {
/// Creates a swarm over `agents` that will execute in `mode`.
///
/// The new swarm has no synthesizer and isolation is switched off.
pub fn new(agents: Vec<AgentSpec>, mode: SwarmMode) -> Self {
    let synthesizer = None;
    let isolated = false;
    Self {
        agents,
        mode,
        synthesizer,
        isolated,
    }
}
pub fn with_synthesizer(mut self, spec: AgentSpec) -> Self {
self.synthesizer = Some(spec);
self
}
pub fn with_isolation(mut self) -> Self {
self.isolated = true;
self
}
/// Runs the swarm on `task`, dispatching on `self.mode` to the parallel,
/// sequential or debate implementation.
///
/// Returns a boxed future instead of being an `async fn`: `run_debate` calls
/// `run` on nested swarms, and boxing gives that recursive future a fixed size.
///
/// # Errors
///
/// Propagates whatever `MultiError` the selected mode implementation returns.
// NOTE(review): `#[instrument]` presumably relies on the `Box::pin(async move { .. })`
// tail-expression shape to attach the span to the inner future — confirm before
// restructuring this body.
#[instrument(name = "multi.swarm", skip_all)]
pub fn run<'a>(
&'a self,
task: &'a str,
runner: &'a Arc<dyn AgentRunner>,
infra: &'a SharedInfra,
) -> futures::future::BoxFuture<'a, Result<SwarmResult, MultiError>> {
Box::pin(async move {
match self.mode {
SwarmMode::Parallel => self.run_parallel(task, runner, infra).await,
SwarmMode::Sequential => self.run_sequential(task, runner, infra).await,
SwarmMode::Debate => self.run_debate(task, runner, infra).await,
}
})
}
async fn run_parallel(
&self,
task: &str,
runner: &Arc<dyn AgentRunner>,
infra: &SharedInfra,
) -> Result<SwarmResult, MultiError> {
let mailbox = Arc::new(Mailbox::default());
let mut handles: Vec<
tokio::task::JoinHandle<(
Result<AgentOutput, MultiError>,
Option<crate::task_context::AgentContext>,
)>,
> = Vec::new();
for spec in &self.agents {
let runner = Arc::clone(runner);
let spec = spec.clone();
let task = task.to_string();
let mailbox = Arc::clone(&mailbox);
if self.isolated {
let (rt, ctx) = infra.make_isolated_runtime(&spec.name);
for tool in &spec.tools {
rt.register_tool(tool).await;
}
let ctx_clone = ctx.clone();
handles.push(tokio::spawn(async move {
let result = crate::task_context::TaskScope::run(ctx_clone, async {
runner.run(&spec, &task, &rt, &mailbox).await
})
.await;
(result, Some(ctx))
}));
} else {
let rt = infra.make_runtime();
for tool in &spec.tools {
rt.register_tool(tool).await;
}
handles.push(tokio::spawn(async move {
let result = runner.run(&spec, &task, &rt, &mailbox).await;
(result, None)
}));
}
}
let results = futures::future::join_all(handles).await;
let mut outputs = Vec::new();
for (i, result) in results.into_iter().enumerate() {
match result {
Ok((Ok(output), ctx)) => {
if let Some(ctx) = ctx {
ctx.merge_to_parent();
}
infra.state.set(
&format!("agent.{}.answer", output.name),
serde_json::Value::String(output.answer.clone()),
&format!("swarm.{}", output.name),
);
outputs.push(output);
}
Ok((Err(e), _ctx)) => {
outputs.push(AgentOutput {
name: self.agents[i].name.clone(),
answer: String::new(),
turns: 0,
tool_calls: 0,
duration_ms: 0.0,
error: Some(e.to_string()),
outcome: None,
tokens: None,
});
}
Err(e) => {
outputs.push(AgentOutput {
name: self.agents[i].name.clone(),
answer: String::new(),
turns: 0,
tool_calls: 0,
duration_ms: 0.0,
error: Some(format!("join error: {}", e)),
outcome: None,
tokens: None,
});
}
}
}
let summary = self.synthesize(task, &outputs, runner, infra).await;
Ok(SwarmResult {
task: task.to_string(),
outputs,
final_summary: summary,
})
}
/// Runs the agents one after another, feeding each the findings of its predecessors.
///
/// The first agent receives `task` unchanged. Every later agent receives `task`
/// followed by a "Prior agents' findings" list containing one bullet (answer
/// truncated to 300 bytes) per earlier agent that succeeded. If no earlier agent
/// succeeded, the task is passed through unchanged rather than with an empty list.
///
/// Each agent gets a fresh runtime with its tools registered; all agents share one
/// mailbox. A failing agent does not stop the chain: it is recorded as an
/// `AgentOutput` with `error` set and the elapsed time of the attempt. Successful
/// answers are also stored in `infra.state` under `agent.<name>.answer`.
///
/// # Errors
///
/// This implementation always returns `Ok`; per-agent failures are reported inside
/// `SwarmResult::outputs`.
async fn run_sequential(
    &self,
    task: &str,
    runner: &Arc<dyn AgentRunner>,
    infra: &SharedInfra,
) -> Result<SwarmResult, MultiError> {
    let mailbox = Arc::new(Mailbox::default());
    let mut outputs: Vec<AgentOutput> = Vec::with_capacity(self.agents.len());
    // Bullet lines of the successful agents so far; built once per agent instead of
    // re-formatting every earlier output on each iteration.
    let mut findings: Vec<String> = Vec::new();

    for spec in &self.agents {
        let enriched = if findings.is_empty() {
            task.to_string()
        } else {
            format!(
                "{}\n\nPrior agents' findings:\n{}",
                task,
                findings.join("\n")
            )
        };

        let rt = infra.make_runtime();
        for tool in &spec.tools {
            rt.register_tool(tool).await;
        }

        let start = Instant::now();
        match runner.run(spec, &enriched, &rt, &mailbox).await {
            Ok(output) => {
                infra.state.set(
                    &format!("agent.{}.answer", output.name),
                    serde_json::Value::String(output.answer.clone()),
                    &format!("swarm.{}", output.name),
                );
                // A runner may return `Ok` with an output that still did not succeed;
                // only genuine successes are shown to later agents.
                if output.succeeded() {
                    findings.push(format!(
                        "- {}: {}",
                        output.name,
                        truncate(&output.answer, 300)
                    ));
                }
                outputs.push(output);
            }
            Err(e) => {
                outputs.push(AgentOutput {
                    name: spec.name.clone(),
                    answer: String::new(),
                    turns: 0,
                    tool_calls: 0,
                    duration_ms: start.elapsed().as_secs_f64() * 1000.0,
                    error: Some(e.to_string()),
                    outcome: None,
                    tokens: None,
                });
            }
        }
    }

    let summary = self.synthesize(task, &outputs, runner, infra).await;
    Ok(SwarmResult {
        task: task.to_string(),
        outputs,
        final_summary: summary,
    })
}
async fn run_debate(
&self,
task: &str,
runner: &Arc<dyn AgentRunner>,
infra: &SharedInfra,
) -> Result<SwarmResult, MultiError> {
let round1 = Swarm::new(self.agents.clone(), SwarmMode::Parallel)
.run(task, runner, infra)
.await?;
let mut critique_specs = Vec::new();
for spec in &self.agents {
let others: Vec<String> = round1
.outputs
.iter()
.filter(|o| o.name != spec.name && o.succeeded())
.map(|o| format!("- {}: {}", o.name, truncate(&o.answer, 300)))
.collect();
let critique_prompt = format!(
"{}\n\nOriginal task: {}\n\nOther agents' answers:\n{}\n\n\
Critique these answers and provide your improved response.",
spec.system_prompt,
task,
others.join("\n")
);
let mut critique_spec = spec.clone();
critique_spec.name = format!("{}_critique", spec.name);
critique_spec.system_prompt = critique_prompt;
critique_specs.push(critique_spec);
}
let round2 = Swarm::new(critique_specs, SwarmMode::Parallel)
.run(task, runner, infra)
.await?;
let mut all_outputs = round1.outputs;
all_outputs.extend(round2.outputs);
let summary = self.synthesize(task, &all_outputs, runner, infra).await;
Ok(SwarmResult {
task: task.to_string(),
outputs: all_outputs,
final_summary: summary,
})
}
async fn synthesize(
&self,
task: &str,
outputs: &[AgentOutput],
runner: &Arc<dyn AgentRunner>,
infra: &SharedInfra,
) -> String {
let answers: Vec<&AgentOutput> = outputs.iter().filter(|o| o.succeeded()).collect();
if answers.is_empty() {
return "[no agent produced an answer]".to_string();
}
if answers.len() == 1 {
return answers[0].answer.clone();
}
if let Some(synth_spec) = &self.synthesizer {
let summaries: Vec<String> = answers
.iter()
.map(|o| format!("- {}: {}", o.name, truncate(&o.answer, 500)))
.collect();
let synth_task = format!(
"Original task: {}\n\nAgent outputs:\n{}\n\nSynthesize these into a single coherent answer.",
task,
summaries.join("\n")
);
let mailbox = Mailbox::default();
let rt = infra.make_runtime();
if let Ok(output) = runner.run(synth_spec, &synth_task, &rt, &mailbox).await {
return output.answer;
}
}
answers
.iter()
.map(|o| format!("## {}\n{}", o.name, o.answer))
.collect::<Vec<_>>()
.join("\n\n")
}
}
/// Returns the longest prefix of `s` that is at most `max_len` bytes long and ends
/// on a UTF-8 character boundary.
///
/// Strings that already fit are returned unchanged. The cut never splits a
/// multi-byte character, so the result may be shorter than `max_len` bytes.
fn truncate(s: &str, max_len: usize) -> &str {
    if max_len >= s.len() {
        return s;
    }
    // Walk back from `max_len` to the nearest boundary; index 0 always is one,
    // so the search cannot come up empty.
    let cut = (0..=max_len)
        .rev()
        .find(|&idx| s.is_char_boundary(idx))
        .unwrap_or(0);
    &s[..cut]
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::error::MultiError;
    use crate::mailbox::Mailbox;
    use crate::runner::AgentRunner;
    use crate::types::{AgentOutput, AgentSpec};
    use car_engine::Runtime;
    use std::sync::atomic::{AtomicU32, Ordering};

    /// Runner that never fails and echoes the agent name plus the start of the task.
    struct MockRunner {
        call_count: AtomicU32,
    }

    #[async_trait::async_trait]
    impl AgentRunner for MockRunner {
        async fn run(
            &self,
            spec: &AgentSpec,
            task: &str,
            _runtime: &Runtime,
            _mailbox: &Mailbox,
        ) -> Result<AgentOutput, MultiError> {
            self.call_count.fetch_add(1, Ordering::SeqCst);
            Ok(AgentOutput {
                name: spec.name.clone(),
                // `truncate` keeps the echo on a char boundary; slicing by raw byte
                // index would panic on multi-byte task text.
                answer: format!("answer from {} for: {}", spec.name, truncate(task, 50)),
                turns: 1,
                tool_calls: 0,
                duration_ms: 10.0,
                error: None,
                outcome: None,
                tokens: None,
            })
        }
    }

    /// Fresh mock runner behind the trait object the swarm API expects.
    fn mock_runner() -> Arc<dyn AgentRunner> {
        Arc::new(MockRunner {
            call_count: AtomicU32::new(0),
        })
    }

    #[test]
    fn test_truncate_respects_char_boundaries() {
        assert_eq!(truncate("hello", 10), "hello");
        assert_eq!(truncate("hello", 3), "hel");
        assert_eq!(truncate("h\u{e9}llo", 2), "h");
        assert_eq!(truncate("\u{65e5}\u{672c}\u{8a9e}", 4), "\u{65e5}");
    }

    #[tokio::test]
    async fn test_parallel_swarm() {
        let agents = vec![
            AgentSpec::new("alice", "You are Alice"),
            AgentSpec::new("bob", "You are Bob"),
        ];
        let runner = mock_runner();
        let infra = SharedInfra::new();
        let result = Swarm::new(agents, SwarmMode::Parallel)
            .run("test task", &runner, &infra)
            .await
            .unwrap();
        assert_eq!(result.outputs.len(), 2);
        assert!(result.outputs.iter().all(|o| o.succeeded()));
        assert!(infra.state.get("agent.alice.answer").is_some());
        assert!(infra.state.get("agent.bob.answer").is_some());
    }

    #[tokio::test]
    async fn test_sequential_swarm() {
        let agents = vec![
            AgentSpec::new("first", "Go first"),
            AgentSpec::new("second", "Go second"),
        ];
        let runner = mock_runner();
        let infra = SharedInfra::new();
        let result = Swarm::new(agents, SwarmMode::Sequential)
            .run("sequential task", &runner, &infra)
            .await
            .unwrap();
        assert_eq!(result.outputs.len(), 2);
        // The second agent's echoed task must carry the first agent's findings.
        assert!(result.outputs[1].answer.contains("Prior agents"));
    }

    #[tokio::test]
    async fn test_debate_swarm() {
        let agents = vec![
            AgentSpec::new("debater_a", "Argue for"),
            AgentSpec::new("debater_b", "Argue against"),
        ];
        let runner = mock_runner();
        let infra = SharedInfra::new();
        let result = Swarm::new(agents, SwarmMode::Debate)
            .run("debate topic", &runner, &infra)
            .await
            .unwrap();
        // Two agents x two rounds (answer + critique).
        assert_eq!(result.outputs.len(), 4);
    }
}