Skip to main content

car_multi/patterns/
fleet.rs

//! Fleet — independent agents with shared knowledge graph.
//!
//! Unlike other patterns:
//! - **Not Swarm**: agents work on *different* problems, not the same one
//! - **Not Pipeline**: no ordering between agents
//! - **Not Supervisor**: no review loop
//!
//! Agents run fully independently but share a `car-memgine` knowledge graph.
//! Each agent can publish facts that others discover via spreading activation.
//! This maps to Hydra's "heads" concept: parallel missions with explicit
//! knowledge sharing.
12
13use crate::error::MultiError;
14use crate::mailbox::Mailbox;
15use crate::runner::AgentRunner;
16use crate::shared::SharedInfra;
17use crate::types::{AgentOutput, AgentSpec};
18use serde::{Deserialize, Serialize};
19use std::sync::Arc;
20use std::time::Instant;
21
22/// Result from a Fleet execution.
23#[derive(Debug, Clone, Serialize, Deserialize)]
24pub struct FleetResult {
25    /// Outputs from all agents.
26    pub outputs: Vec<AgentOutput>,
27    /// Total wall-clock duration.
28    pub duration_ms: f64,
29    /// Number of agents that succeeded.
30    pub succeeded: usize,
31    /// Number of agents that failed.
32    pub failed: usize,
33}
34
35/// Fleet orchestrator — runs independent agents concurrently with shared knowledge.
36///
37/// Each agent gets its own Runtime (via SharedInfra) but they share state,
38/// event log, and policies. Agents communicate through the shared state
39/// (and optionally via Mailbox for real-time messaging).
40///
41/// # Example
42/// ```ignore
43/// let fleet = Fleet::new(vec![
44///     AgentSpec::new("researcher", "Research competitor pricing"),
45///     AgentSpec::new("analyst", "Analyze market trends"),
46/// ]);
47/// let result = fleet.run(&runner, &infra).await?;
48/// ```
49pub struct Fleet {
50    /// Agent specifications — each runs independently.
51    pub agents: Vec<AgentSpec>,
52    /// Optional timeout per agent (seconds).
53    pub agent_timeout_secs: Option<u64>,
54}
55
56impl Fleet {
57    pub fn new(agents: Vec<AgentSpec>) -> Self {
58        Self {
59            agents,
60            agent_timeout_secs: None,
61        }
62    }
63
64    pub fn with_timeout(mut self, timeout_secs: u64) -> Self {
65        self.agent_timeout_secs = Some(timeout_secs);
66        self
67    }
68
69    /// Run all agents concurrently. Returns when all complete (or timeout).
70    pub async fn run(
71        &self,
72        runner: &Arc<dyn AgentRunner>,
73        infra: &SharedInfra,
74    ) -> Result<FleetResult, MultiError> {
75        let start = Instant::now();
76        let mailbox = Arc::new(Mailbox::default());
77
78        // Spawn all agents concurrently
79        let mut handles = Vec::new();
80        for spec in &self.agents {
81            let runner = Arc::clone(runner);
82            let rt = infra.make_runtime();
83            let spec = spec.clone();
84            let mailbox = Arc::clone(&mailbox);
85            let timeout = self.agent_timeout_secs;
86
87            // Register with mailbox for optional inter-agent messaging
88            let _rx = mailbox.register(&spec.name).await;
89
90            let handle = tokio::spawn(async move {
91                let task = spec
92                    .metadata
93                    .get("task")
94                    .and_then(|v| v.as_str())
95                    .unwrap_or("")
96                    .to_string();
97
98                let result = if let Some(secs) = timeout {
99                    match tokio::time::timeout(
100                        tokio::time::Duration::from_secs(secs),
101                        runner.run(&spec, &task, &rt, &mailbox),
102                    )
103                    .await
104                    {
105                        Ok(result) => result,
106                        Err(_) => Ok(AgentOutput {
107                            name: spec.name.clone(),
108                            answer: format!("Timed out after {}s", secs),
109                            turns: 0,
110                            tool_calls: 0,
111                            duration_ms: (secs * 1000) as f64,
112                            error: Some("timeout".to_string()),
113                        }),
114                    }
115                } else {
116                    runner.run(&spec, &task, &rt, &mailbox).await
117                };
118
119                // Unregister from mailbox
120                mailbox.unregister(&spec.name).await;
121
122                result
123            });
124
125            handles.push(handle);
126        }
127
128        // Collect results
129        let mut outputs = Vec::new();
130        let mut succeeded = 0;
131        let mut failed = 0;
132
133        for handle in handles {
134            match handle.await {
135                Ok(Ok(output)) => {
136                    if output.succeeded() {
137                        succeeded += 1;
138                    } else {
139                        failed += 1;
140                    }
141                    outputs.push(output);
142                }
143                Ok(Err(e)) => {
144                    failed += 1;
145                    outputs.push(AgentOutput {
146                        name: "unknown".to_string(),
147                        answer: String::new(),
148                        turns: 0,
149                        tool_calls: 0,
150                        duration_ms: 0.0,
151                        error: Some(e.to_string()),
152                    });
153                }
154                Err(e) => {
155                    failed += 1;
156                    outputs.push(AgentOutput {
157                        name: "unknown".to_string(),
158                        answer: String::new(),
159                        turns: 0,
160                        tool_calls: 0,
161                        duration_ms: 0.0,
162                        error: Some(format!("Task join error: {}", e)),
163                    });
164                }
165            }
166        }
167
168        Ok(FleetResult {
169            outputs,
170            duration_ms: start.elapsed().as_millis() as f64,
171            succeeded,
172            failed,
173        })
174    }
175}
176
#[cfg(test)]
mod tests {
    use super::*;
    use crate::types::AgentSpec;

    /// The builder keeps every supplied spec and records the timeout.
    #[test]
    fn test_fleet_construction() {
        let specs = vec![
            AgentSpec::new("a", "Agent A system prompt"),
            AgentSpec::new("b", "Agent B system prompt"),
        ];

        let fleet = Fleet::new(specs).with_timeout(60);

        assert_eq!(fleet.agents.len(), 2);
        assert_eq!(fleet.agent_timeout_secs, Some(60));
    }
}