car_multi/patterns/fleet.rs
1//! Fleet — independent agents with shared knowledge graph.
2//!
3//! Unlike other patterns:
4//! - **Not Swarm**: agents work on *different* problems, not the same one
5//! - **Not Pipeline**: no ordering between agents
6//! - **Not Supervisor**: no review loop
7//!
8//! Agents run fully independently but share a `car-memgine` knowledge graph.
9//! Each agent can publish facts that others discover via spreading activation.
10//! This maps to Hydra's "heads" concept: parallel missions with explicit
11//! knowledge sharing.
12
13use crate::error::MultiError;
14use crate::mailbox::Mailbox;
15use crate::runner::AgentRunner;
16use crate::shared::SharedInfra;
17use crate::types::{AgentOutput, AgentSpec};
18use serde::{Deserialize, Serialize};
19use std::sync::Arc;
20use std::time::Instant;
21use tracing::instrument;
22
23/// Result from a Fleet execution.
24#[derive(Debug, Clone, Serialize, Deserialize)]
25pub struct FleetResult {
26 /// Outputs from all agents.
27 pub outputs: Vec<AgentOutput>,
28 /// Total wall-clock duration.
29 pub duration_ms: f64,
30 /// Number of agents that succeeded.
31 pub succeeded: usize,
32 /// Number of agents that failed.
33 pub failed: usize,
34}
35
36/// Fleet orchestrator — runs independent agents concurrently with shared knowledge.
37///
38/// Each agent gets its own Runtime (via SharedInfra) but they share state,
39/// event log, and policies. Agents communicate through the shared state
40/// (and optionally via Mailbox for real-time messaging).
41///
42/// # Example
43/// ```ignore
44/// let fleet = Fleet::new(vec![
45/// AgentSpec::new("researcher", "Research competitor pricing"),
46/// AgentSpec::new("analyst", "Analyze market trends"),
47/// ]);
48/// let result = fleet.run(&runner, &infra).await?;
49/// ```
50pub struct Fleet {
51 /// Agent specifications — each runs independently.
52 pub agents: Vec<AgentSpec>,
53 /// Optional timeout per agent (seconds).
54 pub agent_timeout_secs: Option<u64>,
55}
56
57impl Fleet {
58 pub fn new(agents: Vec<AgentSpec>) -> Self {
59 Self {
60 agents,
61 agent_timeout_secs: None,
62 }
63 }
64
65 pub fn with_timeout(mut self, timeout_secs: u64) -> Self {
66 self.agent_timeout_secs = Some(timeout_secs);
67 self
68 }
69
70 /// Run all agents concurrently. Returns when all complete (or timeout).
71 #[instrument(name = "multi.fleet", skip_all)]
72 pub async fn run(
73 &self,
74 runner: &Arc<dyn AgentRunner>,
75 infra: &SharedInfra,
76 ) -> Result<FleetResult, MultiError> {
77 let start = Instant::now();
78 let mailbox = Arc::new(Mailbox::default());
79
80 // Spawn all agents concurrently
81 let mut handles = Vec::new();
82 let mut skipped: Vec<AgentOutput> = Vec::new();
83 for spec in &self.agents {
84 // Budget gate per agent. The fleet runs independent missions, so a
85 // denied agent is simply dropped from the run and reported as failed.
86 if let Err(e) = infra.begin_agent() {
87 skipped.push(crate::budget::budget_skipped_output(&spec.name, &e));
88 continue;
89 }
90
91 let runner = Arc::clone(runner);
92 let rt = infra.make_runtime();
93 let spec = spec.clone();
94 let mailbox = Arc::clone(&mailbox);
95 let timeout = self.agent_timeout_secs;
96
97 // Register with mailbox for optional inter-agent messaging
98 let _rx = mailbox.register(&spec.name).await;
99
100 let handle = tokio::spawn(async move {
101 let task = spec
102 .metadata
103 .get("task")
104 .and_then(|v| v.as_str())
105 .unwrap_or("")
106 .to_string();
107
108 let result = if let Some(secs) = timeout {
109 match tokio::time::timeout(
110 tokio::time::Duration::from_secs(secs),
111 runner.run(&spec, &task, &rt, &mailbox),
112 )
113 .await
114 {
115 Ok(result) => result,
116 Err(_) => Ok(AgentOutput {
117 name: spec.name.clone(),
118 answer: format!("Timed out after {}s", secs),
119 turns: 0,
120 tool_calls: 0,
121 duration_ms: (secs * 1000) as f64,
122 error: Some("timeout".to_string()),
123 outcome: None,
124 tokens: None,
125 }),
126 }
127 } else {
128 runner.run(&spec, &task, &rt, &mailbox).await
129 };
130
131 // Unregister from mailbox
132 mailbox.unregister(&spec.name).await;
133
134 result
135 });
136
137 handles.push(handle);
138 }
139
140 // Collect results
141 let mut outputs = Vec::new();
142 let mut succeeded = 0;
143 let mut failed = 0;
144
145 for handle in handles {
146 match handle.await {
147 Ok(Ok(output)) => {
148 infra.record_output(&output);
149 if output.succeeded() {
150 succeeded += 1;
151 } else {
152 failed += 1;
153 }
154 outputs.push(output);
155 }
156 Ok(Err(e)) => {
157 // Spend before a runner Err is not metered (no token payload
158 // on the error path) — the budget can under-count failures.
159 failed += 1;
160 outputs.push(AgentOutput {
161 name: "unknown".to_string(),
162 answer: String::new(),
163 turns: 0,
164 tool_calls: 0,
165 duration_ms: 0.0,
166 error: Some(e.to_string()),
167 outcome: None,
168 tokens: None,
169 });
170 }
171 Err(e) => {
172 failed += 1;
173 outputs.push(AgentOutput {
174 name: "unknown".to_string(),
175 answer: String::new(),
176 turns: 0,
177 tool_calls: 0,
178 duration_ms: 0.0,
179 error: Some(format!("Task join error: {}", e)),
180 outcome: None,
181 tokens: None,
182 });
183 }
184 }
185 }
186
187 // Append budget-skipped agents as failures.
188 failed += skipped.len();
189 outputs.extend(skipped);
190
191 Ok(FleetResult {
192 outputs,
193 duration_ms: start.elapsed().as_millis() as f64,
194 succeeded,
195 failed,
196 })
197 }
198}
199
#[cfg(test)]
mod tests {
    use super::*;
    use crate::types::AgentSpec;

    /// Building a fleet keeps every spec and records the configured timeout.
    #[test]
    fn test_fleet_construction() {
        let specs = vec![
            AgentSpec::new("a", "Agent A system prompt"),
            AgentSpec::new("b", "Agent B system prompt"),
        ];

        let fleet = Fleet::new(specs).with_timeout(60);

        assert_eq!(fleet.agents.len(), 2);
        assert_eq!(fleet.agent_timeout_secs, Some(60));
    }
}