car_multi/patterns/
fleet.rs1use crate::error::MultiError;
14use crate::mailbox::Mailbox;
15use crate::runner::AgentRunner;
16use crate::shared::SharedInfra;
17use crate::types::{AgentOutput, AgentSpec};
18use serde::{Deserialize, Serialize};
19use std::sync::Arc;
20use std::time::Instant;
21
22#[derive(Debug, Clone, Serialize, Deserialize)]
24pub struct FleetResult {
25 pub outputs: Vec<AgentOutput>,
27 pub duration_ms: f64,
29 pub succeeded: usize,
31 pub failed: usize,
33}
34
35pub struct Fleet {
50 pub agents: Vec<AgentSpec>,
52 pub agent_timeout_secs: Option<u64>,
54}
55
56impl Fleet {
57 pub fn new(agents: Vec<AgentSpec>) -> Self {
58 Self {
59 agents,
60 agent_timeout_secs: None,
61 }
62 }
63
64 pub fn with_timeout(mut self, timeout_secs: u64) -> Self {
65 self.agent_timeout_secs = Some(timeout_secs);
66 self
67 }
68
69 pub async fn run(
71 &self,
72 runner: &Arc<dyn AgentRunner>,
73 infra: &SharedInfra,
74 ) -> Result<FleetResult, MultiError> {
75 let start = Instant::now();
76 let mailbox = Arc::new(Mailbox::default());
77
78 let mut handles = Vec::new();
80 for spec in &self.agents {
81 let runner = Arc::clone(runner);
82 let rt = infra.make_runtime();
83 let spec = spec.clone();
84 let mailbox = Arc::clone(&mailbox);
85 let timeout = self.agent_timeout_secs;
86
87 let _rx = mailbox.register(&spec.name).await;
89
90 let handle = tokio::spawn(async move {
91 let task = spec
92 .metadata
93 .get("task")
94 .and_then(|v| v.as_str())
95 .unwrap_or("")
96 .to_string();
97
98 let result = if let Some(secs) = timeout {
99 match tokio::time::timeout(
100 tokio::time::Duration::from_secs(secs),
101 runner.run(&spec, &task, &rt, &mailbox),
102 )
103 .await
104 {
105 Ok(result) => result,
106 Err(_) => Ok(AgentOutput {
107 name: spec.name.clone(),
108 answer: format!("Timed out after {}s", secs),
109 turns: 0,
110 tool_calls: 0,
111 duration_ms: (secs * 1000) as f64,
112 error: Some("timeout".to_string()),
113 }),
114 }
115 } else {
116 runner.run(&spec, &task, &rt, &mailbox).await
117 };
118
119 mailbox.unregister(&spec.name).await;
121
122 result
123 });
124
125 handles.push(handle);
126 }
127
128 let mut outputs = Vec::new();
130 let mut succeeded = 0;
131 let mut failed = 0;
132
133 for handle in handles {
134 match handle.await {
135 Ok(Ok(output)) => {
136 if output.succeeded() {
137 succeeded += 1;
138 } else {
139 failed += 1;
140 }
141 outputs.push(output);
142 }
143 Ok(Err(e)) => {
144 failed += 1;
145 outputs.push(AgentOutput {
146 name: "unknown".to_string(),
147 answer: String::new(),
148 turns: 0,
149 tool_calls: 0,
150 duration_ms: 0.0,
151 error: Some(e.to_string()),
152 });
153 }
154 Err(e) => {
155 failed += 1;
156 outputs.push(AgentOutput {
157 name: "unknown".to_string(),
158 answer: String::new(),
159 turns: 0,
160 tool_calls: 0,
161 duration_ms: 0.0,
162 error: Some(format!("Task join error: {}", e)),
163 });
164 }
165 }
166 }
167
168 Ok(FleetResult {
169 outputs,
170 duration_ms: start.elapsed().as_millis() as f64,
171 succeeded,
172 failed,
173 })
174 }
175}
176
177#[cfg(test)]
178mod tests {
179 use super::*;
180 use crate::types::AgentSpec;
181
182 #[test]
183 fn test_fleet_construction() {
184 let fleet = Fleet::new(vec![
185 AgentSpec::new("a", "Agent A system prompt"),
186 AgentSpec::new("b", "Agent B system prompt"),
187 ])
188 .with_timeout(60);
189
190 assert_eq!(fleet.agents.len(), 2);
191 assert_eq!(fleet.agent_timeout_secs, Some(60));
192 }
193}