1use std::sync::{Arc, Mutex};
8use std::time::Instant;
9
10use tokio::sync::RwLock;
11use tokio::task::JoinHandle;
12use tracing::{info, warn};
13
14use crate::agents::{ActionResult as AgentActionResult, AgentAction};
15use crate::constants::MAX_CONCURRENT_AGENTS;
16use crate::models::{ChatMessage, Model, ModelConfig, StreamCallback};
17use crate::prompts;
18use crate::runtime::agent_loop::{self, AgentObserver, LoopControl, MAX_AGENT_ITERATIONS};
19use crate::utils::MutexExt;
20
/// Live progress record for a single subagent.
///
/// Records are stored in a shared `Arc<Mutex<Vec<SubagentProgress>>>` and are
/// updated while the subagent runs, so readers of that vector can observe the
/// current state of every registered subagent.
#[derive(Debug, Clone)]
pub struct SubagentProgress {
    /// Ordinal of the agent within the batch passed to `spawn_subagents`.
    pub id: usize,
    /// Caller-supplied description of the subagent's task.
    pub description: String,
    /// Current lifecycle state of the subagent.
    pub status: SubagentStatus,
    /// Number of tool results recorded for this subagent so far.
    pub tool_uses: usize,
    /// Tokens accumulated by this subagent so far.
    pub tokens: usize,
    /// Moment at which this progress record was created.
    pub started_at: Instant,
}
32
/// Lifecycle state of a subagent as recorded in its [`SubagentProgress`].
#[derive(Debug, Clone)]
pub enum SubagentStatus {
    /// The subagent has been registered and has not finished yet.
    Running,
    /// The subagent finished without returning an error.
    Completed,
    /// The subagent stopped with an error; the payload is the error message.
    Failed(String),
}
40
/// Final outcome of one subagent, returned once it has finished (or was
/// never started).
#[derive(Debug, Clone)]
pub struct SubagentResult {
    /// Ordinal of the agent within the batch it was submitted in.
    pub id: usize,
    /// Caller-supplied description of the subagent's task.
    pub description: String,
    /// The subagent's response text on success, or the error message on failure.
    pub response: String,
    /// Number of tool uses attributed to the subagent.
    pub tool_uses: usize,
    /// Number of tokens attributed to the subagent.
    pub tokens: usize,
    /// Wall-clock run time in seconds (`0.0` for agents that never ran).
    pub duration_secs: f64,
    /// `true` when the subagent finished without an error or interruption.
    pub success: bool,
}
52
/// Agent-loop observer that mirrors loop events into the shared progress list.
struct SubagentObserver {
    /// Shared list of progress records for all registered subagents.
    progress: Arc<Mutex<Vec<SubagentProgress>>>,
    /// Position within `progress` of the record this observer updates.
    index: usize,
}
58
59impl AgentObserver for SubagentObserver {
60 fn check_interrupt(&mut self) -> LoopControl {
61 LoopControl::Continue
64 }
65
66 fn on_status(&mut self, _message: &str) {}
67
68 fn on_tool_result(
69 &mut self,
70 _tool_name: &str,
71 _tool_call_id: &str,
72 _action: &AgentAction,
73 _result: &AgentActionResult,
74 ) {
75 let mut progress = self.progress.lock_mut_safe();
76 if let Some(entry) = progress.get_mut(self.index) {
77 entry.tool_uses += 1;
78 }
79 }
80
81 fn on_error(&mut self, error: &str) {
82 warn!(subagent_index = self.index, "Subagent error: {}", error);
83 }
84
85 fn on_generation_start(&mut self) {}
86
87 fn on_generation_complete(&mut self, tokens: usize) {
88 let mut progress = self.progress.lock_mut_safe();
89 if let Some(entry) = progress.get_mut(self.index) {
90 entry.tokens += tokens;
91 }
92 }
93}
94
95async fn run_subagent(
97 model: Arc<RwLock<Box<dyn Model>>>,
98 config: ModelConfig,
99 id: usize,
100 prompt: String,
101 description: String,
102 progress: Arc<Mutex<Vec<SubagentProgress>>>,
103 progress_index: usize,
104) -> SubagentResult {
105 let started_at = Instant::now();
106
107 let system_prompt = config
109 .system_prompt
110 .clone()
111 .unwrap_or_else(prompts::get_system_prompt);
112 let mut messages = vec![
113 ChatMessage::system(system_prompt),
114 ChatMessage::user(prompt),
115 ];
116
117 let response_text = Arc::new(std::sync::Mutex::new(String::new()));
119 let response_clone = Arc::clone(&response_text);
120 let callback: StreamCallback = Arc::new(move |chunk: &str| {
121 let mut resp = response_clone.lock_mut_safe();
122 resp.push_str(chunk);
123 });
124
125 let initial_result = {
126 let model_guard = model.read().await;
127 model_guard.chat(&messages, &config, Some(callback)).await
128 };
129
130 let (content, initial_tool_calls, initial_tokens) = match initial_result {
131 Ok(response) => {
132 let callback_content = response_text.lock_mut_safe().clone();
133 let content = if !callback_content.is_empty() {
134 callback_content
135 } else {
136 response.content.clone()
137 };
138 let tokens = response.usage.map(|u| u.completion_tokens).unwrap_or(0);
139 let tool_calls = response.tool_calls.unwrap_or_default();
140
141 {
142 let mut prog = progress.lock_mut_safe();
143 if let Some(entry) = prog.get_mut(progress_index) {
144 entry.tokens += tokens;
145 }
146 }
147
148 (content, tool_calls, tokens)
149 },
150 Err(e) => {
151 let error_msg = e.to_string();
152 {
153 let mut prog = progress.lock_mut_safe();
154 if let Some(entry) = prog.get_mut(progress_index) {
155 entry.status = SubagentStatus::Failed(error_msg.clone());
156 }
157 }
158 return SubagentResult {
159 id,
160 description,
161 response: error_msg,
162 tool_uses: 0,
163 tokens: 0,
164 duration_secs: started_at.elapsed().as_secs_f64(),
165 success: false,
166 };
167 },
168 };
169
170 if initial_tool_calls.is_empty() {
172 {
173 let mut prog = progress.lock_mut_safe();
174 if let Some(entry) = prog.get_mut(progress_index) {
175 entry.status = SubagentStatus::Completed;
176 }
177 }
178 return SubagentResult {
179 id,
180 description,
181 response: content,
182 tool_uses: 0,
183 tokens: initial_tokens,
184 duration_secs: started_at.elapsed().as_secs_f64(),
185 success: true,
186 };
187 }
188
189 let assistant_msg =
191 ChatMessage::assistant(content.clone()).with_tool_calls(initial_tool_calls.clone());
192 messages.push(assistant_msg);
193
194 let mut observer = SubagentObserver {
195 progress: Arc::clone(&progress),
196 index: progress_index,
197 };
198
199 let loop_result = agent_loop::run_agent_loop(
200 Arc::clone(&model),
201 &config,
202 &mut messages,
203 initial_tool_calls,
204 &mut observer,
205 MAX_AGENT_ITERATIONS,
206 )
207 .await;
208
209 match loop_result {
210 Ok(result) => {
211 let total_tokens = initial_tokens + result.total_tokens;
212 let total_tool_uses = result.tool_results.len();
213 let final_response = if result.final_response.is_empty() {
214 content
215 } else {
216 result.final_response
217 };
218
219 {
220 let mut prog = progress.lock_mut_safe();
221 if let Some(entry) = prog.get_mut(progress_index) {
222 entry.status = SubagentStatus::Completed;
223 entry.tokens = total_tokens;
224 entry.tool_uses = total_tool_uses;
225 }
226 }
227
228 SubagentResult {
229 id,
230 description,
231 response: final_response,
232 tool_uses: total_tool_uses,
233 tokens: total_tokens,
234 duration_secs: started_at.elapsed().as_secs_f64(),
235 success: !result.interrupted,
236 }
237 },
238 Err(e) => {
239 let error_msg = e.to_string();
240 let (tool_uses, tokens) = {
241 let prog = progress.lock_mut_safe();
242 prog.get(progress_index)
243 .map(|p| (p.tool_uses, p.tokens))
244 .unwrap_or((0, initial_tokens))
245 };
246
247 {
248 let mut prog = progress.lock_mut_safe();
249 if let Some(entry) = prog.get_mut(progress_index) {
250 entry.status = SubagentStatus::Failed(error_msg.clone());
251 }
252 }
253
254 SubagentResult {
255 id,
256 description,
257 response: error_msg,
258 tool_uses,
259 tokens,
260 duration_secs: started_at.elapsed().as_secs_f64(),
261 success: false,
262 }
263 },
264 }
265}
266
267pub fn spawn_subagents(
275 agents: Vec<(String, String)>,
276 model: Arc<RwLock<Box<dyn Model>>>,
277 config: &ModelConfig,
278 progress: Arc<Mutex<Vec<SubagentProgress>>>,
279) -> (Vec<JoinHandle<SubagentResult>>, Vec<SubagentResult>) {
280 let mut handles = Vec::new();
281 let mut overflow_results = Vec::new();
282
283 {
285 let mut prog = progress.lock_mut_safe();
286 for (i, (_prompt, description)) in agents.iter().enumerate() {
287 if i < MAX_CONCURRENT_AGENTS {
288 prog.push(SubagentProgress {
289 id: i,
290 description: description.clone(),
291 status: SubagentStatus::Running,
292 tool_uses: 0,
293 tokens: 0,
294 started_at: Instant::now(),
295 });
296 }
297 }
298 }
299
300 for (i, (prompt, description)) in agents.into_iter().enumerate() {
301 if i >= MAX_CONCURRENT_AGENTS {
302 warn!(
303 "Exceeded MAX_CONCURRENT_AGENTS ({}), skipping agent: {}",
304 MAX_CONCURRENT_AGENTS, description
305 );
306 overflow_results.push(SubagentResult {
307 id: i,
308 description,
309 response: format!(
310 "Exceeded maximum of {} concurrent agents. This agent was not spawned.",
311 MAX_CONCURRENT_AGENTS
312 ),
313 tool_uses: 0,
314 tokens: 0,
315 duration_secs: 0.0,
316 success: false,
317 });
318 continue;
319 }
320
321 let mut subagent_config = config.clone();
323 subagent_config.is_subagent = true;
324 subagent_config.thinking_enabled = Some(false);
325
326 let model_clone = Arc::clone(&model);
327 let progress_clone = Arc::clone(&progress);
328
329 info!(agent_id = i, description = %description, "Spawning subagent");
330
331 let handle = tokio::spawn(async move {
332 run_subagent(
333 model_clone,
334 subagent_config,
335 i,
336 prompt,
337 description,
338 progress_clone,
339 i,
340 )
341 .await
342 });
343
344 handles.push(handle);
345 }
346
347 (handles, overflow_results)
348}
349
350pub async fn collect_subagent_results(
353 handles: Vec<JoinHandle<SubagentResult>>,
354 mut overflow_results: Vec<SubagentResult>,
355) -> Vec<SubagentResult> {
356 let mut results = Vec::with_capacity(handles.len() + overflow_results.len());
357
358 for handle in handles {
359 match handle.await {
360 Ok(result) => results.push(result),
361 Err(e) => {
362 warn!("Subagent task failed: {}", e);
363 results.push(SubagentResult {
364 id: 0,
365 description: "Unknown".to_string(),
366 response: format!("Agent task failed: {}", e),
367 tool_uses: 0,
368 tokens: 0,
369 duration_secs: 0.0,
370 success: false,
371 });
372 },
373 }
374 }
375
376 results.append(&mut overflow_results);
377 results.sort_by_key(|r| r.id);
378 results
379}
380
381pub fn format_subagent_tool_result(result: &SubagentResult) -> String {
383 if result.success {
384 format!(
385 "Agent '{}' completed successfully ({} tool uses, {} tokens, {:.1}s):\n\n{}",
386 result.description, result.tool_uses, result.tokens, result.duration_secs,
387 result.response
388 )
389 } else {
390 format!(
391 "Agent '{}' failed: {} ({} tool uses, {} tokens, {:.1}s)",
392 result.description, result.response, result.tool_uses, result.tokens,
393 result.duration_secs
394 )
395 }
396}