Skip to main content

car_multi/patterns/
fleet.rs

1//! Fleet — independent agents with shared knowledge graph.
2//!
3//! Unlike other patterns:
4//! - **Not Swarm**: agents work on *different* problems, not the same one
5//! - **Not Pipeline**: no ordering between agents
6//! - **Not Supervisor**: no review loop
7//!
8//! Agents run fully independently but share a `car-memgine` knowledge graph.
9//! Each agent can publish facts that others discover via spreading activation.
10//! This maps to Hydra's "heads" concept: parallel missions with explicit
11//! knowledge sharing.
12
13use crate::error::MultiError;
14use crate::mailbox::Mailbox;
15use crate::runner::AgentRunner;
16use crate::shared::SharedInfra;
17use crate::types::{AgentOutput, AgentSpec};
18use serde::{Deserialize, Serialize};
19use std::sync::Arc;
20use std::time::Instant;
21use tracing::instrument;
22
23/// Result from a Fleet execution.
24#[derive(Debug, Clone, Serialize, Deserialize)]
25pub struct FleetResult {
26    /// Outputs from all agents.
27    pub outputs: Vec<AgentOutput>,
28    /// Total wall-clock duration.
29    pub duration_ms: f64,
30    /// Number of agents that succeeded.
31    pub succeeded: usize,
32    /// Number of agents that failed.
33    pub failed: usize,
34}
35
36/// Fleet orchestrator — runs independent agents concurrently with shared knowledge.
37///
38/// Each agent gets its own Runtime (via SharedInfra) but they share state,
39/// event log, and policies. Agents communicate through the shared state
40/// (and optionally via Mailbox for real-time messaging).
41///
42/// # Example
43/// ```ignore
44/// let fleet = Fleet::new(vec![
45///     AgentSpec::new("researcher", "Research competitor pricing"),
46///     AgentSpec::new("analyst", "Analyze market trends"),
47/// ]);
48/// let result = fleet.run(&runner, &infra).await?;
49/// ```
50pub struct Fleet {
51    /// Agent specifications — each runs independently.
52    pub agents: Vec<AgentSpec>,
53    /// Optional timeout per agent (seconds).
54    pub agent_timeout_secs: Option<u64>,
55}
56
57impl Fleet {
58    pub fn new(agents: Vec<AgentSpec>) -> Self {
59        Self {
60            agents,
61            agent_timeout_secs: None,
62        }
63    }
64
65    pub fn with_timeout(mut self, timeout_secs: u64) -> Self {
66        self.agent_timeout_secs = Some(timeout_secs);
67        self
68    }
69
70    /// Run all agents concurrently. Returns when all complete (or timeout).
71    #[instrument(name = "multi.fleet", skip_all)]
72    pub async fn run(
73        &self,
74        runner: &Arc<dyn AgentRunner>,
75        infra: &SharedInfra,
76    ) -> Result<FleetResult, MultiError> {
77        let start = Instant::now();
78        let mailbox = Arc::new(Mailbox::default());
79
80        // Spawn all agents concurrently
81        let mut handles = Vec::new();
82        let mut skipped: Vec<AgentOutput> = Vec::new();
83        for spec in &self.agents {
84            // Budget gate per agent. The fleet runs independent missions, so a
85            // denied agent is simply dropped from the run and reported as failed.
86            if let Err(e) = infra.begin_agent() {
87                skipped.push(crate::budget::budget_skipped_output(&spec.name, &e));
88                continue;
89            }
90
91            let runner = Arc::clone(runner);
92            let rt = infra.make_runtime();
93            let spec = spec.clone();
94            let mailbox = Arc::clone(&mailbox);
95            let timeout = self.agent_timeout_secs;
96
97            // Register with mailbox for optional inter-agent messaging
98            let _rx = mailbox.register(&spec.name).await;
99
100            let handle = tokio::spawn(async move {
101                let task = spec
102                    .metadata
103                    .get("task")
104                    .and_then(|v| v.as_str())
105                    .unwrap_or("")
106                    .to_string();
107
108                let result = if let Some(secs) = timeout {
109                    match tokio::time::timeout(
110                        tokio::time::Duration::from_secs(secs),
111                        runner.run(&spec, &task, &rt, &mailbox),
112                    )
113                    .await
114                    {
115                        Ok(result) => result,
116                        Err(_) => Ok(AgentOutput {
117                            name: spec.name.clone(),
118                            answer: format!("Timed out after {}s", secs),
119                            turns: 0,
120                            tool_calls: 0,
121                            duration_ms: (secs * 1000) as f64,
122                            error: Some("timeout".to_string()),
123                            outcome: None,
124                            tokens: None,
125                        }),
126                    }
127                } else {
128                    runner.run(&spec, &task, &rt, &mailbox).await
129                };
130
131                // Unregister from mailbox
132                mailbox.unregister(&spec.name).await;
133
134                result
135            });
136
137            handles.push(handle);
138        }
139
140        // Collect results
141        let mut outputs = Vec::new();
142        let mut succeeded = 0;
143        let mut failed = 0;
144
145        for handle in handles {
146            match handle.await {
147                Ok(Ok(output)) => {
148                    infra.record_output(&output);
149                    if output.succeeded() {
150                        succeeded += 1;
151                    } else {
152                        failed += 1;
153                    }
154                    outputs.push(output);
155                }
156                Ok(Err(e)) => {
157                    // Spend before a runner Err is not metered (no token payload
158                    // on the error path) — the budget can under-count failures.
159                    failed += 1;
160                    outputs.push(AgentOutput {
161                        name: "unknown".to_string(),
162                        answer: String::new(),
163                        turns: 0,
164                        tool_calls: 0,
165                        duration_ms: 0.0,
166                        error: Some(e.to_string()),
167                        outcome: None,
168                        tokens: None,
169                    });
170                }
171                Err(e) => {
172                    failed += 1;
173                    outputs.push(AgentOutput {
174                        name: "unknown".to_string(),
175                        answer: String::new(),
176                        turns: 0,
177                        tool_calls: 0,
178                        duration_ms: 0.0,
179                        error: Some(format!("Task join error: {}", e)),
180                        outcome: None,
181                        tokens: None,
182                    });
183                }
184            }
185        }
186
187        // Append budget-skipped agents as failures.
188        failed += skipped.len();
189        outputs.extend(skipped);
190
191        Ok(FleetResult {
192            outputs,
193            duration_ms: start.elapsed().as_millis() as f64,
194            succeeded,
195            failed,
196        })
197    }
198}
199
#[cfg(test)]
mod tests {
    use super::*;
    use crate::types::AgentSpec;

    /// The builder keeps every supplied spec and stores the per-agent timeout.
    #[test]
    fn test_fleet_construction() {
        let specs = vec![
            AgentSpec::new("a", "Agent A system prompt"),
            AgentSpec::new("b", "Agent B system prompt"),
        ];
        let fleet = Fleet::new(specs).with_timeout(60);

        assert_eq!(fleet.agent_timeout_secs, Some(60));
        assert_eq!(fleet.agents.len(), 2);
    }
}
216}