Skip to main content

mermaid_cli/agents/
subagent.rs

1//! Subagent execution engine
2//!
3//! Spawns autonomous sub-agents that run in parallel, each with their own
4//! conversation context and full tool access (minus `agent`).
5//! Progress is tracked via shared state for live UI rendering.
6
7use std::sync::{Arc, Mutex};
8use std::time::Instant;
9
10use tokio::sync::RwLock;
11use tokio::task::JoinHandle;
12use tracing::{info, warn};
13
14use crate::agents::{ActionResult as AgentActionResult, AgentAction};
15use crate::constants::MAX_CONCURRENT_AGENTS;
16use crate::models::{
17    ChatMessage, Model, ModelConfig, ReasoningLevel, StreamCallback, StreamEvent, ToolCall,
18};
19use crate::prompts;
20use crate::runtime::agent_loop::{self, AgentObserver, LoopControl, MAX_AGENT_ITERATIONS};
21use crate::utils::MutexExt;
22
/// Progress state for a single running subagent.
/// Written by the subagent task, read by the UI render loop.
///
/// One entry per spawned subagent lives in a shared
/// `Arc<Mutex<Vec<SubagentProgress>>>`; the subagent addresses its own entry
/// by position in that vector.
#[derive(Debug, Clone)]
pub struct SubagentProgress {
    /// Identifier of the subagent; `spawn_subagents` sets it to the agent's
    /// position in the requested agent list.
    pub id: usize,
    /// Description supplied by the caller for this subagent.
    pub description: String,
    /// Current lifecycle state (starts as `Running`).
    pub status: SubagentStatus,
    /// Tool invocations so far: bumped by one per tool result while the agent
    /// loop runs, then overwritten with the final count on success.
    pub tool_uses: usize,
    /// Tokens consumed so far: accumulated after each generation, then
    /// overwritten with the final total on success.
    pub tokens: usize,
    /// Instant at which the progress entry was created (just before spawning).
    pub started_at: Instant,
}
34
/// Status of a running subagent
#[derive(Debug, Clone)]
pub enum SubagentStatus {
    /// Initial state assigned when the progress entry is created.
    Running,
    /// The subagent produced its response: either the first model reply had
    /// no tool calls, or the agent loop returned `Ok`.
    /// NOTE(review): this is also set when the loop result is flagged
    /// `interrupted`, in which case the returned `SubagentResult.success`
    /// is `false` — confirm that mismatch is intended.
    Completed,
    /// The initial model call or the agent loop returned an error; carries
    /// the error rendered via `to_string()`.
    Failed(String),
}
42
/// Final result returned when a subagent finishes
#[derive(Debug, Clone)]
pub struct SubagentResult {
    /// Identifier of the subagent (its position in the requested agent list);
    /// `collect_subagent_results` sorts by this field.
    pub id: usize,
    /// Caller-supplied description, or `"Unknown"` when the task itself
    /// failed to join and the description could not be recovered.
    pub description: String,
    /// Final response text on success; the error message on failure.
    pub response: String,
    /// Number of tool invocations attributed to this subagent.
    pub tool_uses: usize,
    /// Total tokens attributed to this subagent.
    pub tokens: usize,
    /// Wall-clock run time in seconds; `0.0` for agents that never ran
    /// (overflow) or whose task failed to join.
    pub duration_secs: f64,
    /// `true` when the subagent finished without error and without being
    /// flagged as interrupted.
    pub success: bool,
}
54
/// Observer that bridges the shared agent loop to subagent progress state.
///
/// Each callback from the agent loop is translated into an update of this
/// subagent's entry in the shared progress vector.
struct SubagentObserver {
    /// Progress entries shared with the UI and with every other subagent.
    progress: Arc<Mutex<Vec<SubagentProgress>>>,
    /// Position of this subagent's entry inside `progress`.
    index: usize,
}
60
61impl AgentObserver for SubagentObserver {
62    fn check_interrupt(&mut self) -> LoopControl {
63        // Subagents don't get interrupted individually.
64        // Parent abort (via JoinHandle::abort()) is the cancellation mechanism.
65        LoopControl::Continue
66    }
67
68    fn on_status(&mut self, _message: &str) {}
69
70    fn on_tool_result(
71        &mut self,
72        _tool_name: &str,
73        _tool_call_id: &str,
74        _action: &AgentAction,
75        _result: &AgentActionResult,
76    ) {
77        let mut progress = self.progress.lock_mut_safe();
78        if let Some(entry) = progress.get_mut(self.index) {
79            entry.tool_uses += 1;
80        }
81    }
82
83    fn on_error(&mut self, error: &str) {
84        warn!(subagent_index = self.index, "Subagent error: {}", error);
85    }
86
87    fn on_generation_start(&mut self) {}
88
89    fn on_generation_complete(&mut self, tokens: usize) {
90        let mut progress = self.progress.lock_mut_safe();
91        if let Some(entry) = progress.get_mut(self.index) {
92            entry.tokens += tokens;
93        }
94    }
95}
96
97/// Run a single subagent to completion.
98async fn run_subagent(
99    model: Arc<RwLock<Box<dyn Model>>>,
100    config: ModelConfig,
101    id: usize,
102    prompt: String,
103    description: String,
104    progress: Arc<Mutex<Vec<SubagentProgress>>>,
105    progress_index: usize,
106) -> SubagentResult {
107    let started_at = Instant::now();
108
109    // Build fresh message history for this subagent
110    let system_prompt = config
111        .system_prompt
112        .clone()
113        .unwrap_or_else(prompts::get_system_prompt);
114    let mut messages = vec![
115        ChatMessage::system(system_prompt),
116        ChatMessage::user(prompt),
117    ];
118
119    // First model call. Subagents don't surface reasoning to the parent
120    // (the parent just sees the final summary), so we drop Reasoning
121    // chunks. Text + tool calls feed the agent loop the same way the
122    // legacy text accumulator did.
123    let response_text = Arc::new(std::sync::Mutex::new(String::new()));
124    let typed_tool_calls = Arc::new(std::sync::Mutex::new(Vec::<ToolCall>::new()));
125    let text_clone = Arc::clone(&response_text);
126    let tool_clone = Arc::clone(&typed_tool_calls);
127    let callback: StreamCallback = Arc::new(move |event| match event {
128        StreamEvent::Text(chunk) => {
129            text_clone.lock_mut_safe().push_str(&chunk);
130        },
131        StreamEvent::ToolCall(tc) => {
132            tool_clone.lock_mut_safe().push(tc);
133        },
134        StreamEvent::Reasoning(_) | StreamEvent::Done { .. } => {},
135    });
136
137    let initial_result = {
138        let model_guard = model.read().await;
139        model_guard.chat(&messages, &config, Some(callback)).await
140    };
141
142    let (content, initial_tool_calls, initial_tokens) = match initial_result {
143        Ok(response) => {
144            let streamed_text = response_text.lock_mut_safe().clone();
145            let content = if !streamed_text.is_empty() {
146                streamed_text
147            } else {
148                response.content.clone()
149            };
150            let tokens = response.usage.map(|u| u.total_tokens).unwrap_or(0);
151            let streamed_tool_calls = std::mem::take(&mut *typed_tool_calls.lock_mut_safe());
152            let tool_calls = if !streamed_tool_calls.is_empty() {
153                streamed_tool_calls
154            } else {
155                response.tool_calls.unwrap_or_default()
156            };
157
158            {
159                let mut prog = progress.lock_mut_safe();
160                if let Some(entry) = prog.get_mut(progress_index) {
161                    entry.tokens += tokens;
162                }
163            }
164
165            (content, tool_calls, tokens)
166        },
167        Err(e) => {
168            let error_msg = e.to_string();
169            {
170                let mut prog = progress.lock_mut_safe();
171                if let Some(entry) = prog.get_mut(progress_index) {
172                    entry.status = SubagentStatus::Failed(error_msg.clone());
173                }
174            }
175            return SubagentResult {
176                id,
177                description,
178                response: error_msg,
179                tool_uses: 0,
180                tokens: 0,
181                duration_secs: started_at.elapsed().as_secs_f64(),
182                success: false,
183            };
184        },
185    };
186
187    // If no tool calls, subagent is done after the first response
188    if initial_tool_calls.is_empty() {
189        {
190            let mut prog = progress.lock_mut_safe();
191            if let Some(entry) = prog.get_mut(progress_index) {
192                entry.status = SubagentStatus::Completed;
193            }
194        }
195        return SubagentResult {
196            id,
197            description,
198            response: content,
199            tool_uses: 0,
200            tokens: initial_tokens,
201            duration_secs: started_at.elapsed().as_secs_f64(),
202            success: true,
203        };
204    }
205
206    // Has tool calls -- enter agent loop
207    let assistant_msg =
208        ChatMessage::assistant(content.clone()).with_tool_calls(initial_tool_calls.clone());
209    messages.push(assistant_msg);
210
211    let mut observer = SubagentObserver {
212        progress: Arc::clone(&progress),
213        index: progress_index,
214    };
215
216    let loop_result = agent_loop::run_agent_loop(
217        Arc::clone(&model),
218        &config,
219        &mut messages,
220        initial_tool_calls,
221        &mut observer,
222        MAX_AGENT_ITERATIONS,
223    )
224    .await;
225
226    match loop_result {
227        Ok(result) => {
228            let total_tokens = initial_tokens + result.total_tokens;
229            let total_tool_uses = result.tool_results.len();
230            let final_response = if result.final_response.is_empty() {
231                content
232            } else {
233                result.final_response
234            };
235
236            {
237                let mut prog = progress.lock_mut_safe();
238                if let Some(entry) = prog.get_mut(progress_index) {
239                    entry.status = SubagentStatus::Completed;
240                    entry.tokens = total_tokens;
241                    entry.tool_uses = total_tool_uses;
242                }
243            }
244
245            SubagentResult {
246                id,
247                description,
248                response: final_response,
249                tool_uses: total_tool_uses,
250                tokens: total_tokens,
251                duration_secs: started_at.elapsed().as_secs_f64(),
252                success: !result.interrupted,
253            }
254        },
255        Err(e) => {
256            let error_msg = e.to_string();
257            let (tool_uses, tokens) = {
258                let prog = progress.lock_mut_safe();
259                prog.get(progress_index)
260                    .map(|p| (p.tool_uses, p.tokens))
261                    .unwrap_or((0, initial_tokens))
262            };
263
264            {
265                let mut prog = progress.lock_mut_safe();
266                if let Some(entry) = prog.get_mut(progress_index) {
267                    entry.status = SubagentStatus::Failed(error_msg.clone());
268                }
269            }
270
271            SubagentResult {
272                id,
273                description,
274                response: error_msg,
275                tool_uses,
276                tokens,
277                duration_secs: started_at.elapsed().as_secs_f64(),
278                success: false,
279            }
280        },
281    }
282}
283
284/// Spawn multiple subagents in parallel.
285///
286/// Returns JoinHandles so the caller can decide how to wait:
287/// - TUI: polls `is_finished()` with `render_and_check_interrupt` between checks
288/// - Non-interactive: `join_all` directly
289///
290/// Agents beyond `MAX_CONCURRENT_AGENTS` are returned as immediate failed results.
291pub fn spawn_subagents(
292    agents: Vec<(String, String)>,
293    model: Arc<RwLock<Box<dyn Model>>>,
294    config: &ModelConfig,
295    progress: Arc<Mutex<Vec<SubagentProgress>>>,
296) -> (Vec<JoinHandle<SubagentResult>>, Vec<SubagentResult>) {
297    let mut handles = Vec::new();
298    let mut overflow_results = Vec::new();
299
300    // Initialize progress entries
301    {
302        let mut prog = progress.lock_mut_safe();
303        for (i, (_prompt, description)) in agents.iter().enumerate() {
304            if i < MAX_CONCURRENT_AGENTS {
305                prog.push(SubagentProgress {
306                    id: i,
307                    description: description.clone(),
308                    status: SubagentStatus::Running,
309                    tool_uses: 0,
310                    tokens: 0,
311                    started_at: Instant::now(),
312                });
313            }
314        }
315    }
316
317    for (i, (prompt, description)) in agents.into_iter().enumerate() {
318        if i >= MAX_CONCURRENT_AGENTS {
319            warn!(
320                "Exceeded MAX_CONCURRENT_AGENTS ({}), skipping agent: {}",
321                MAX_CONCURRENT_AGENTS, description
322            );
323            overflow_results.push(SubagentResult {
324                id: i,
325                description,
326                response: format!(
327                    "Exceeded maximum of {} concurrent agents. This agent was not spawned.",
328                    MAX_CONCURRENT_AGENTS
329                ),
330                tool_uses: 0,
331                tokens: 0,
332                duration_secs: 0.0,
333                success: false,
334            });
335            continue;
336        }
337
338        // Build subagent-specific config. Subagents skip reasoning to
339        // keep them fast and focused — the orchestrating turn already
340        // did the planning.
341        let mut subagent_config = config.clone();
342        subagent_config.is_subagent = true;
343        subagent_config.reasoning = ReasoningLevel::None;
344
345        let model_clone = Arc::clone(&model);
346        let progress_clone = Arc::clone(&progress);
347
348        info!(agent_id = i, description = %description, "Spawning subagent");
349
350        let handle = tokio::spawn(async move {
351            run_subagent(
352                model_clone,
353                subagent_config,
354                i,
355                prompt,
356                description,
357                progress_clone,
358                i,
359            )
360            .await
361        });
362
363        handles.push(handle);
364    }
365
366    (handles, overflow_results)
367}
368
369/// Collect results from completed subagent handles.
370/// Handles panicked/cancelled tasks gracefully.
371pub async fn collect_subagent_results(
372    handles: Vec<JoinHandle<SubagentResult>>,
373    mut overflow_results: Vec<SubagentResult>,
374) -> Vec<SubagentResult> {
375    let mut results = Vec::with_capacity(handles.len() + overflow_results.len());
376
377    for (i, handle) in handles.into_iter().enumerate() {
378        match handle.await {
379            Ok(result) => results.push(result),
380            Err(e) => {
381                warn!("Subagent task failed: {}", e);
382                results.push(SubagentResult {
383                    id: i,
384                    description: "Unknown".to_string(),
385                    response: format!("Agent task failed: {}", e),
386                    tool_uses: 0,
387                    tokens: 0,
388                    duration_secs: 0.0,
389                    success: false,
390                });
391            },
392        }
393    }
394
395    results.append(&mut overflow_results);
396    results.sort_by_key(|r| r.id);
397    results
398}
399
400/// Format a SubagentResult into a tool result message string for the parent model.
401pub fn format_subagent_tool_result(result: &SubagentResult) -> String {
402    if result.success {
403        format!(
404            "Agent '{}' completed successfully ({} tool uses, {} tokens, {:.1}s):\n\n{}",
405            result.description,
406            result.tool_uses,
407            result.tokens,
408            result.duration_secs,
409            result.response
410        )
411    } else {
412        format!(
413            "Agent '{}' failed: {} ({} tool uses, {} tokens, {:.1}s)",
414            result.description,
415            result.response,
416            result.tool_uses,
417            result.tokens,
418            result.duration_secs
419        )
420    }
421}