car_multi/patterns/
fleet.rs1use crate::error::MultiError;
14use crate::mailbox::Mailbox;
15use crate::runner::AgentRunner;
16use crate::shared::SharedInfra;
17use crate::types::{AgentOutput, AgentSpec};
18use serde::{Deserialize, Serialize};
19use std::sync::Arc;
20use std::time::Instant;
21use tracing::instrument;
22
23#[derive(Debug, Clone, Serialize, Deserialize)]
25pub struct FleetResult {
26 pub outputs: Vec<AgentOutput>,
28 pub duration_ms: f64,
30 pub succeeded: usize,
32 pub failed: usize,
34}
35
36pub struct Fleet {
51 pub agents: Vec<AgentSpec>,
53 pub agent_timeout_secs: Option<u64>,
55}
56
57impl Fleet {
58 pub fn new(agents: Vec<AgentSpec>) -> Self {
59 Self {
60 agents,
61 agent_timeout_secs: None,
62 }
63 }
64
65 pub fn with_timeout(mut self, timeout_secs: u64) -> Self {
66 self.agent_timeout_secs = Some(timeout_secs);
67 self
68 }
69
70 #[instrument(name = "multi.fleet", skip_all)]
72 pub async fn run(
73 &self,
74 runner: &Arc<dyn AgentRunner>,
75 infra: &SharedInfra,
76 ) -> Result<FleetResult, MultiError> {
77 let start = Instant::now();
78 let mailbox = Arc::new(Mailbox::default());
79
80 let mut handles = Vec::new();
82 for spec in &self.agents {
83 let runner = Arc::clone(runner);
84 let rt = infra.make_runtime();
85 let spec = spec.clone();
86 let mailbox = Arc::clone(&mailbox);
87 let timeout = self.agent_timeout_secs;
88
89 let _rx = mailbox.register(&spec.name).await;
91
92 let handle = tokio::spawn(async move {
93 let task = spec
94 .metadata
95 .get("task")
96 .and_then(|v| v.as_str())
97 .unwrap_or("")
98 .to_string();
99
100 let result = if let Some(secs) = timeout {
101 match tokio::time::timeout(
102 tokio::time::Duration::from_secs(secs),
103 runner.run(&spec, &task, &rt, &mailbox),
104 )
105 .await
106 {
107 Ok(result) => result,
108 Err(_) => Ok(AgentOutput {
109 name: spec.name.clone(),
110 answer: format!("Timed out after {}s", secs),
111 turns: 0,
112 tool_calls: 0,
113 duration_ms: (secs * 1000) as f64,
114 error: Some("timeout".to_string()),
115 outcome: None,
116 tokens: None,
117 }),
118 }
119 } else {
120 runner.run(&spec, &task, &rt, &mailbox).await
121 };
122
123 mailbox.unregister(&spec.name).await;
125
126 result
127 });
128
129 handles.push(handle);
130 }
131
132 let mut outputs = Vec::new();
134 let mut succeeded = 0;
135 let mut failed = 0;
136
137 for handle in handles {
138 match handle.await {
139 Ok(Ok(output)) => {
140 if output.succeeded() {
141 succeeded += 1;
142 } else {
143 failed += 1;
144 }
145 outputs.push(output);
146 }
147 Ok(Err(e)) => {
148 failed += 1;
149 outputs.push(AgentOutput {
150 name: "unknown".to_string(),
151 answer: String::new(),
152 turns: 0,
153 tool_calls: 0,
154 duration_ms: 0.0,
155 error: Some(e.to_string()),
156 outcome: None,
157 tokens: None,
158 });
159 }
160 Err(e) => {
161 failed += 1;
162 outputs.push(AgentOutput {
163 name: "unknown".to_string(),
164 answer: String::new(),
165 turns: 0,
166 tool_calls: 0,
167 duration_ms: 0.0,
168 error: Some(format!("Task join error: {}", e)),
169 outcome: None,
170 tokens: None,
171 });
172 }
173 }
174 }
175
176 Ok(FleetResult {
177 outputs,
178 duration_ms: start.elapsed().as_millis() as f64,
179 succeeded,
180 failed,
181 })
182 }
183}
184
#[cfg(test)]
mod tests {
    use super::*;
    use crate::types::AgentSpec;

    /// A fleet built from two specs keeps both and records the configured timeout.
    #[test]
    fn test_fleet_construction() {
        let specs = vec![
            AgentSpec::new("a", "Agent A system prompt"),
            AgentSpec::new("b", "Agent B system prompt"),
        ];

        let fleet = Fleet::new(specs).with_timeout(60);

        assert_eq!(fleet.agents.len(), 2);
        assert_eq!(fleet.agent_timeout_secs, Some(60));
    }
}