1use std::sync::{Arc, Mutex};
8use std::time::Instant;
9
10use tokio::sync::RwLock;
11use tokio::task::JoinHandle;
12use tracing::{info, warn};
13
14use crate::agents::{ActionResult as AgentActionResult, AgentAction};
15use crate::constants::MAX_CONCURRENT_AGENTS;
16use crate::models::{
17 ChatMessage, Model, ModelConfig, ReasoningLevel, StreamCallback, StreamEvent, ToolCall,
18};
19use crate::prompts;
20use crate::runtime::agent_loop::{self, AgentObserver, LoopControl, MAX_AGENT_ITERATIONS};
21use crate::utils::MutexExt;
22
23#[derive(Debug, Clone)]
26pub struct SubagentProgress {
27 pub id: usize,
28 pub description: String,
29 pub status: SubagentStatus,
30 pub tool_uses: usize,
31 pub tokens: usize,
32 pub started_at: Instant,
33}
34
/// Lifecycle state of a subagent.
///
/// Derives `PartialEq`/`Eq` so states can be compared directly (`==`,
/// `assert_eq!`) instead of requiring a `match` or `matches!` at every call site.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SubagentStatus {
    /// The subagent has been started and has not finished yet.
    Running,
    /// The subagent finished without reporting an error.
    Completed,
    /// The subagent stopped with an error; the payload is the error message.
    Failed(String),
}
42
/// Final outcome of one subagent.
///
/// Derives `PartialEq` so results can be compared in tests and assertions.
/// `Eq` is intentionally not derived because `duration_secs` is an `f64`.
#[derive(Debug, Clone, PartialEq)]
pub struct SubagentResult {
    /// Identifier of the subagent that produced this result.
    pub id: usize,
    /// Description of the subagent's task.
    pub description: String,
    /// Response text on success, or the error message on failure.
    pub response: String,
    /// Number of tool uses attributed to the subagent.
    pub tool_uses: usize,
    /// Number of tokens attributed to the subagent.
    pub tokens: usize,
    /// Wall-clock duration of the run, in seconds.
    pub duration_secs: f64,
    /// Whether the subagent is considered to have succeeded.
    pub success: bool,
}
54
55struct SubagentObserver {
57 progress: Arc<Mutex<Vec<SubagentProgress>>>,
58 index: usize,
59}
60
61impl AgentObserver for SubagentObserver {
62 fn check_interrupt(&mut self) -> LoopControl {
63 LoopControl::Continue
66 }
67
68 fn on_status(&mut self, _message: &str) {}
69
70 fn on_tool_result(
71 &mut self,
72 _tool_name: &str,
73 _tool_call_id: &str,
74 _action: &AgentAction,
75 _result: &AgentActionResult,
76 ) {
77 let mut progress = self.progress.lock_mut_safe();
78 if let Some(entry) = progress.get_mut(self.index) {
79 entry.tool_uses += 1;
80 }
81 }
82
83 fn on_error(&mut self, error: &str) {
84 warn!(subagent_index = self.index, "Subagent error: {}", error);
85 }
86
87 fn on_generation_start(&mut self) {}
88
89 fn on_generation_complete(&mut self, tokens: usize) {
90 let mut progress = self.progress.lock_mut_safe();
91 if let Some(entry) = progress.get_mut(self.index) {
92 entry.tokens += tokens;
93 }
94 }
95}
96
97async fn run_subagent(
99 model: Arc<RwLock<Box<dyn Model>>>,
100 config: ModelConfig,
101 id: usize,
102 prompt: String,
103 description: String,
104 progress: Arc<Mutex<Vec<SubagentProgress>>>,
105 progress_index: usize,
106) -> SubagentResult {
107 let started_at = Instant::now();
108
109 let system_prompt = config
111 .system_prompt
112 .clone()
113 .unwrap_or_else(prompts::get_system_prompt);
114 let mut messages = vec![
115 ChatMessage::system(system_prompt),
116 ChatMessage::user(prompt),
117 ];
118
119 let response_text = Arc::new(std::sync::Mutex::new(String::new()));
124 let typed_tool_calls = Arc::new(std::sync::Mutex::new(Vec::<ToolCall>::new()));
125 let text_clone = Arc::clone(&response_text);
126 let tool_clone = Arc::clone(&typed_tool_calls);
127 let callback: StreamCallback = Arc::new(move |event| match event {
128 StreamEvent::Text(chunk) => {
129 text_clone.lock_mut_safe().push_str(&chunk);
130 },
131 StreamEvent::ToolCall(tc) => {
132 tool_clone.lock_mut_safe().push(tc);
133 },
134 StreamEvent::Reasoning(_) | StreamEvent::Done { .. } => {},
135 });
136
137 let initial_result = {
138 let model_guard = model.read().await;
139 model_guard.chat(&messages, &config, Some(callback)).await
140 };
141
142 let (content, initial_tool_calls, initial_tokens) = match initial_result {
143 Ok(response) => {
144 let streamed_text = response_text.lock_mut_safe().clone();
145 let content = if !streamed_text.is_empty() {
146 streamed_text
147 } else {
148 response.content.clone()
149 };
150 let tokens = response.usage.map(|u| u.total_tokens).unwrap_or(0);
151 let streamed_tool_calls = std::mem::take(&mut *typed_tool_calls.lock_mut_safe());
152 let tool_calls = if !streamed_tool_calls.is_empty() {
153 streamed_tool_calls
154 } else {
155 response.tool_calls.unwrap_or_default()
156 };
157
158 {
159 let mut prog = progress.lock_mut_safe();
160 if let Some(entry) = prog.get_mut(progress_index) {
161 entry.tokens += tokens;
162 }
163 }
164
165 (content, tool_calls, tokens)
166 },
167 Err(e) => {
168 let error_msg = e.to_string();
169 {
170 let mut prog = progress.lock_mut_safe();
171 if let Some(entry) = prog.get_mut(progress_index) {
172 entry.status = SubagentStatus::Failed(error_msg.clone());
173 }
174 }
175 return SubagentResult {
176 id,
177 description,
178 response: error_msg,
179 tool_uses: 0,
180 tokens: 0,
181 duration_secs: started_at.elapsed().as_secs_f64(),
182 success: false,
183 };
184 },
185 };
186
187 if initial_tool_calls.is_empty() {
189 {
190 let mut prog = progress.lock_mut_safe();
191 if let Some(entry) = prog.get_mut(progress_index) {
192 entry.status = SubagentStatus::Completed;
193 }
194 }
195 return SubagentResult {
196 id,
197 description,
198 response: content,
199 tool_uses: 0,
200 tokens: initial_tokens,
201 duration_secs: started_at.elapsed().as_secs_f64(),
202 success: true,
203 };
204 }
205
206 let assistant_msg =
208 ChatMessage::assistant(content.clone()).with_tool_calls(initial_tool_calls.clone());
209 messages.push(assistant_msg);
210
211 let mut observer = SubagentObserver {
212 progress: Arc::clone(&progress),
213 index: progress_index,
214 };
215
216 let loop_result = agent_loop::run_agent_loop(
217 Arc::clone(&model),
218 &config,
219 &mut messages,
220 initial_tool_calls,
221 &mut observer,
222 MAX_AGENT_ITERATIONS,
223 )
224 .await;
225
226 match loop_result {
227 Ok(result) => {
228 let total_tokens = initial_tokens + result.total_tokens;
229 let total_tool_uses = result.tool_results.len();
230 let final_response = if result.final_response.is_empty() {
231 content
232 } else {
233 result.final_response
234 };
235
236 {
237 let mut prog = progress.lock_mut_safe();
238 if let Some(entry) = prog.get_mut(progress_index) {
239 entry.status = SubagentStatus::Completed;
240 entry.tokens = total_tokens;
241 entry.tool_uses = total_tool_uses;
242 }
243 }
244
245 SubagentResult {
246 id,
247 description,
248 response: final_response,
249 tool_uses: total_tool_uses,
250 tokens: total_tokens,
251 duration_secs: started_at.elapsed().as_secs_f64(),
252 success: !result.interrupted,
253 }
254 },
255 Err(e) => {
256 let error_msg = e.to_string();
257 let (tool_uses, tokens) = {
258 let prog = progress.lock_mut_safe();
259 prog.get(progress_index)
260 .map(|p| (p.tool_uses, p.tokens))
261 .unwrap_or((0, initial_tokens))
262 };
263
264 {
265 let mut prog = progress.lock_mut_safe();
266 if let Some(entry) = prog.get_mut(progress_index) {
267 entry.status = SubagentStatus::Failed(error_msg.clone());
268 }
269 }
270
271 SubagentResult {
272 id,
273 description,
274 response: error_msg,
275 tool_uses,
276 tokens,
277 duration_secs: started_at.elapsed().as_secs_f64(),
278 success: false,
279 }
280 },
281 }
282}
283
284pub fn spawn_subagents(
292 agents: Vec<(String, String)>,
293 model: Arc<RwLock<Box<dyn Model>>>,
294 config: &ModelConfig,
295 progress: Arc<Mutex<Vec<SubagentProgress>>>,
296) -> (Vec<JoinHandle<SubagentResult>>, Vec<SubagentResult>) {
297 let mut handles = Vec::new();
298 let mut overflow_results = Vec::new();
299
300 {
302 let mut prog = progress.lock_mut_safe();
303 for (i, (_prompt, description)) in agents.iter().enumerate() {
304 if i < MAX_CONCURRENT_AGENTS {
305 prog.push(SubagentProgress {
306 id: i,
307 description: description.clone(),
308 status: SubagentStatus::Running,
309 tool_uses: 0,
310 tokens: 0,
311 started_at: Instant::now(),
312 });
313 }
314 }
315 }
316
317 for (i, (prompt, description)) in agents.into_iter().enumerate() {
318 if i >= MAX_CONCURRENT_AGENTS {
319 warn!(
320 "Exceeded MAX_CONCURRENT_AGENTS ({}), skipping agent: {}",
321 MAX_CONCURRENT_AGENTS, description
322 );
323 overflow_results.push(SubagentResult {
324 id: i,
325 description,
326 response: format!(
327 "Exceeded maximum of {} concurrent agents. This agent was not spawned.",
328 MAX_CONCURRENT_AGENTS
329 ),
330 tool_uses: 0,
331 tokens: 0,
332 duration_secs: 0.0,
333 success: false,
334 });
335 continue;
336 }
337
338 let mut subagent_config = config.clone();
342 subagent_config.is_subagent = true;
343 subagent_config.reasoning = ReasoningLevel::None;
344
345 let model_clone = Arc::clone(&model);
346 let progress_clone = Arc::clone(&progress);
347
348 info!(agent_id = i, description = %description, "Spawning subagent");
349
350 let handle = tokio::spawn(async move {
351 run_subagent(
352 model_clone,
353 subagent_config,
354 i,
355 prompt,
356 description,
357 progress_clone,
358 i,
359 )
360 .await
361 });
362
363 handles.push(handle);
364 }
365
366 (handles, overflow_results)
367}
368
369pub async fn collect_subagent_results(
372 handles: Vec<JoinHandle<SubagentResult>>,
373 mut overflow_results: Vec<SubagentResult>,
374) -> Vec<SubagentResult> {
375 let mut results = Vec::with_capacity(handles.len() + overflow_results.len());
376
377 for (i, handle) in handles.into_iter().enumerate() {
378 match handle.await {
379 Ok(result) => results.push(result),
380 Err(e) => {
381 warn!("Subagent task failed: {}", e);
382 results.push(SubagentResult {
383 id: i,
384 description: "Unknown".to_string(),
385 response: format!("Agent task failed: {}", e),
386 tool_uses: 0,
387 tokens: 0,
388 duration_secs: 0.0,
389 success: false,
390 });
391 },
392 }
393 }
394
395 results.append(&mut overflow_results);
396 results.sort_by_key(|r| r.id);
397 results
398}
399
400pub fn format_subagent_tool_result(result: &SubagentResult) -> String {
402 if result.success {
403 format!(
404 "Agent '{}' completed successfully ({} tool uses, {} tokens, {:.1}s):\n\n{}",
405 result.description,
406 result.tool_uses,
407 result.tokens,
408 result.duration_secs,
409 result.response
410 )
411 } else {
412 format!(
413 "Agent '{}' failed: {} ({} tool uses, {} tokens, {:.1}s)",
414 result.description,
415 result.response,
416 result.tool_uses,
417 result.tokens,
418 result.duration_secs
419 )
420 }
421}