Skip to main content

car_multi/patterns/
fleet.rs

1//! Fleet — independent agents with shared knowledge graph.
2//!
3//! Unlike other patterns:
4//! - **Not Swarm**: agents work on *different* problems, not the same one
5//! - **Not Pipeline**: no ordering between agents
6//! - **Not Supervisor**: no review loop
7//!
8//! Agents run fully independently but share a `car-memgine` knowledge graph.
9//! Each agent can publish facts that others discover via spreading activation.
10//! This maps to Hydra's "heads" concept: parallel missions with explicit
11//! knowledge sharing.
12
13use crate::error::MultiError;
14use crate::mailbox::Mailbox;
15use crate::runner::AgentRunner;
16use crate::shared::SharedInfra;
17use crate::types::{AgentOutput, AgentSpec};
18use serde::{Deserialize, Serialize};
19use std::sync::Arc;
20use std::time::Instant;
21use tracing::instrument;
22
23/// Result from a Fleet execution.
24#[derive(Debug, Clone, Serialize, Deserialize)]
25pub struct FleetResult {
26    /// Outputs from all agents.
27    pub outputs: Vec<AgentOutput>,
28    /// Total wall-clock duration.
29    pub duration_ms: f64,
30    /// Number of agents that succeeded.
31    pub succeeded: usize,
32    /// Number of agents that failed.
33    pub failed: usize,
34}
35
36/// Fleet orchestrator — runs independent agents concurrently with shared knowledge.
37///
38/// Each agent gets its own Runtime (via SharedInfra) but they share state,
39/// event log, and policies. Agents communicate through the shared state
40/// (and optionally via Mailbox for real-time messaging).
41///
42/// # Example
43/// ```ignore
44/// let fleet = Fleet::new(vec![
45///     AgentSpec::new("researcher", "Research competitor pricing"),
46///     AgentSpec::new("analyst", "Analyze market trends"),
47/// ]);
48/// let result = fleet.run(&runner, &infra).await?;
49/// ```
50pub struct Fleet {
51    /// Agent specifications — each runs independently.
52    pub agents: Vec<AgentSpec>,
53    /// Optional timeout per agent (seconds).
54    pub agent_timeout_secs: Option<u64>,
55}
56
57impl Fleet {
58    pub fn new(agents: Vec<AgentSpec>) -> Self {
59        Self {
60            agents,
61            agent_timeout_secs: None,
62        }
63    }
64
65    pub fn with_timeout(mut self, timeout_secs: u64) -> Self {
66        self.agent_timeout_secs = Some(timeout_secs);
67        self
68    }
69
70    /// Run all agents concurrently. Returns when all complete (or timeout).
71    #[instrument(name = "multi.fleet", skip_all)]
72    pub async fn run(
73        &self,
74        runner: &Arc<dyn AgentRunner>,
75        infra: &SharedInfra,
76    ) -> Result<FleetResult, MultiError> {
77        let start = Instant::now();
78        let mailbox = Arc::new(Mailbox::default());
79
80        // Spawn all agents concurrently
81        let mut handles = Vec::new();
82        for spec in &self.agents {
83            let runner = Arc::clone(runner);
84            let rt = infra.make_runtime();
85            let spec = spec.clone();
86            let mailbox = Arc::clone(&mailbox);
87            let timeout = self.agent_timeout_secs;
88
89            // Register with mailbox for optional inter-agent messaging
90            let _rx = mailbox.register(&spec.name).await;
91
92            let handle = tokio::spawn(async move {
93                let task = spec
94                    .metadata
95                    .get("task")
96                    .and_then(|v| v.as_str())
97                    .unwrap_or("")
98                    .to_string();
99
100                let result = if let Some(secs) = timeout {
101                    match tokio::time::timeout(
102                        tokio::time::Duration::from_secs(secs),
103                        runner.run(&spec, &task, &rt, &mailbox),
104                    )
105                    .await
106                    {
107                        Ok(result) => result,
108                        Err(_) => Ok(AgentOutput {
109                            name: spec.name.clone(),
110                            answer: format!("Timed out after {}s", secs),
111                            turns: 0,
112                            tool_calls: 0,
113                            duration_ms: (secs * 1000) as f64,
114                            error: Some("timeout".to_string()),
115                            outcome: None,
116                            tokens: None,
117                        }),
118                    }
119                } else {
120                    runner.run(&spec, &task, &rt, &mailbox).await
121                };
122
123                // Unregister from mailbox
124                mailbox.unregister(&spec.name).await;
125
126                result
127            });
128
129            handles.push(handle);
130        }
131
132        // Collect results
133        let mut outputs = Vec::new();
134        let mut succeeded = 0;
135        let mut failed = 0;
136
137        for handle in handles {
138            match handle.await {
139                Ok(Ok(output)) => {
140                    if output.succeeded() {
141                        succeeded += 1;
142                    } else {
143                        failed += 1;
144                    }
145                    outputs.push(output);
146                }
147                Ok(Err(e)) => {
148                    failed += 1;
149                    outputs.push(AgentOutput {
150                        name: "unknown".to_string(),
151                        answer: String::new(),
152                        turns: 0,
153                        tool_calls: 0,
154                        duration_ms: 0.0,
155                        error: Some(e.to_string()),
156                        outcome: None,
157                        tokens: None,
158                    });
159                }
160                Err(e) => {
161                    failed += 1;
162                    outputs.push(AgentOutput {
163                        name: "unknown".to_string(),
164                        answer: String::new(),
165                        turns: 0,
166                        tool_calls: 0,
167                        duration_ms: 0.0,
168                        error: Some(format!("Task join error: {}", e)),
169                        outcome: None,
170                        tokens: None,
171                    });
172                }
173            }
174        }
175
176        Ok(FleetResult {
177            outputs,
178            duration_ms: start.elapsed().as_millis() as f64,
179            succeeded,
180            failed,
181        })
182    }
183}
184
185#[cfg(test)]
186mod tests {
187    use super::*;
188    use crate::types::AgentSpec;
189
190    #[test]
191    fn test_fleet_construction() {
192        let fleet = Fleet::new(vec![
193            AgentSpec::new("a", "Agent A system prompt"),
194            AgentSpec::new("b", "Agent B system prompt"),
195        ])
196        .with_timeout(60);
197
198        assert_eq!(fleet.agents.len(), 2);
199        assert_eq!(fleet.agent_timeout_secs, Some(60));
200    }
201}