Skip to main content

mermaid_cli/agents/
subagent.rs

//! Subagent execution engine
//!
//! Spawns autonomous sub-agents that run in parallel, each with their own
//! conversation context and full tool access (minus `agent`).
//! Progress is tracked via shared state for live UI rendering.

7use std::sync::{Arc, Mutex};
8use std::time::Instant;
9
10use tokio::sync::RwLock;
11use tokio::task::JoinHandle;
12use tracing::{info, warn};
13
14use crate::agents::{ActionResult as AgentActionResult, AgentAction};
15use crate::constants::MAX_CONCURRENT_AGENTS;
16use crate::models::{ChatMessage, Model, ModelConfig, StreamCallback};
17use crate::prompts;
18use crate::runtime::agent_loop::{self, AgentObserver, LoopControl, MAX_AGENT_ITERATIONS};
19use crate::utils::MutexExt;
20
21/// Progress state for a single running subagent.
22/// Written by the subagent task, read by the UI render loop.
23#[derive(Debug, Clone)]
24pub struct SubagentProgress {
25    pub id: usize,
26    pub description: String,
27    pub status: SubagentStatus,
28    pub tool_uses: usize,
29    pub tokens: usize,
30    pub started_at: Instant,
31}
32
/// Lifecycle state of a subagent, as shown in the live UI.
///
/// `PartialEq`/`Eq` are derived so callers can compare statuses directly
/// (e.g. `status == SubagentStatus::Running`) instead of pattern matching.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SubagentStatus {
    /// The subagent task is still executing.
    Running,
    /// The subagent finished successfully.
    Completed,
    /// The subagent stopped with an error; the payload is the error text.
    Failed(String),
}
40
/// Outcome reported once a subagent has stopped, successfully or not.
#[derive(Debug, Clone)]
pub struct SubagentResult {
    /// Agent index within its spawn batch.
    pub id: usize,
    /// Human-readable label supplied by the parent.
    pub description: String,
    /// Final answer text on success, or the error text on failure.
    pub response: String,
    /// Total tool results recorded for this agent.
    pub tool_uses: usize,
    /// Total completion tokens recorded for this agent.
    pub tokens: usize,
    /// Wall-clock runtime in seconds.
    pub duration_secs: f64,
    /// Whether the agent finished without error or interruption.
    pub success: bool,
}
52
53/// Observer that bridges the shared agent loop to subagent progress state.
54struct SubagentObserver {
55    progress: Arc<Mutex<Vec<SubagentProgress>>>,
56    index: usize,
57}
58
59impl AgentObserver for SubagentObserver {
60    fn check_interrupt(&mut self) -> LoopControl {
61        // Subagents don't get interrupted individually.
62        // Parent abort (via JoinHandle::abort()) is the cancellation mechanism.
63        LoopControl::Continue
64    }
65
66    fn on_status(&mut self, _message: &str) {}
67
68    fn on_tool_result(
69        &mut self,
70        _tool_name: &str,
71        _tool_call_id: &str,
72        _action: &AgentAction,
73        _result: &AgentActionResult,
74    ) {
75        let mut progress = self.progress.lock_mut_safe();
76        if let Some(entry) = progress.get_mut(self.index) {
77            entry.tool_uses += 1;
78        }
79    }
80
81    fn on_error(&mut self, error: &str) {
82        warn!(subagent_index = self.index, "Subagent error: {}", error);
83    }
84
85    fn on_generation_start(&mut self) {}
86
87    fn on_generation_complete(&mut self, tokens: usize) {
88        let mut progress = self.progress.lock_mut_safe();
89        if let Some(entry) = progress.get_mut(self.index) {
90            entry.tokens += tokens;
91        }
92    }
93}
94
95/// Run a single subagent to completion.
96async fn run_subagent(
97    model: Arc<RwLock<Box<dyn Model>>>,
98    config: ModelConfig,
99    id: usize,
100    prompt: String,
101    description: String,
102    progress: Arc<Mutex<Vec<SubagentProgress>>>,
103    progress_index: usize,
104) -> SubagentResult {
105    let started_at = Instant::now();
106
107    // Build fresh message history for this subagent
108    let system_prompt = config
109        .system_prompt
110        .clone()
111        .unwrap_or_else(prompts::get_system_prompt);
112    let mut messages = vec![
113        ChatMessage::system(system_prompt),
114        ChatMessage::user(prompt),
115    ];
116
117    // First model call
118    let response_text = Arc::new(std::sync::Mutex::new(String::new()));
119    let response_clone = Arc::clone(&response_text);
120    let callback: StreamCallback = Arc::new(move |chunk: &str| {
121        let mut resp = response_clone.lock_mut_safe();
122        resp.push_str(chunk);
123    });
124
125    let initial_result = {
126        let model_guard = model.read().await;
127        model_guard.chat(&messages, &config, Some(callback)).await
128    };
129
130    let (content, initial_tool_calls, initial_tokens) = match initial_result {
131        Ok(response) => {
132            let callback_content = response_text.lock_mut_safe().clone();
133            let content = if !callback_content.is_empty() {
134                callback_content
135            } else {
136                response.content.clone()
137            };
138            let tokens = response.usage.map(|u| u.completion_tokens).unwrap_or(0);
139            let tool_calls = response.tool_calls.unwrap_or_default();
140
141            {
142                let mut prog = progress.lock_mut_safe();
143                if let Some(entry) = prog.get_mut(progress_index) {
144                    entry.tokens += tokens;
145                }
146            }
147
148            (content, tool_calls, tokens)
149        },
150        Err(e) => {
151            let error_msg = e.to_string();
152            {
153                let mut prog = progress.lock_mut_safe();
154                if let Some(entry) = prog.get_mut(progress_index) {
155                    entry.status = SubagentStatus::Failed(error_msg.clone());
156                }
157            }
158            return SubagentResult {
159                id,
160                description,
161                response: error_msg,
162                tool_uses: 0,
163                tokens: 0,
164                duration_secs: started_at.elapsed().as_secs_f64(),
165                success: false,
166            };
167        },
168    };
169
170    // If no tool calls, subagent is done after the first response
171    if initial_tool_calls.is_empty() {
172        {
173            let mut prog = progress.lock_mut_safe();
174            if let Some(entry) = prog.get_mut(progress_index) {
175                entry.status = SubagentStatus::Completed;
176            }
177        }
178        return SubagentResult {
179            id,
180            description,
181            response: content,
182            tool_uses: 0,
183            tokens: initial_tokens,
184            duration_secs: started_at.elapsed().as_secs_f64(),
185            success: true,
186        };
187    }
188
189    // Has tool calls -- enter agent loop
190    let assistant_msg =
191        ChatMessage::assistant(content.clone()).with_tool_calls(initial_tool_calls.clone());
192    messages.push(assistant_msg);
193
194    let mut observer = SubagentObserver {
195        progress: Arc::clone(&progress),
196        index: progress_index,
197    };
198
199    let loop_result = agent_loop::run_agent_loop(
200        Arc::clone(&model),
201        &config,
202        &mut messages,
203        initial_tool_calls,
204        &mut observer,
205        MAX_AGENT_ITERATIONS,
206    )
207    .await;
208
209    match loop_result {
210        Ok(result) => {
211            let total_tokens = initial_tokens + result.total_tokens;
212            let total_tool_uses = result.tool_results.len();
213            let final_response = if result.final_response.is_empty() {
214                content
215            } else {
216                result.final_response
217            };
218
219            {
220                let mut prog = progress.lock_mut_safe();
221                if let Some(entry) = prog.get_mut(progress_index) {
222                    entry.status = SubagentStatus::Completed;
223                    entry.tokens = total_tokens;
224                    entry.tool_uses = total_tool_uses;
225                }
226            }
227
228            SubagentResult {
229                id,
230                description,
231                response: final_response,
232                tool_uses: total_tool_uses,
233                tokens: total_tokens,
234                duration_secs: started_at.elapsed().as_secs_f64(),
235                success: !result.interrupted,
236            }
237        },
238        Err(e) => {
239            let error_msg = e.to_string();
240            let (tool_uses, tokens) = {
241                let prog = progress.lock_mut_safe();
242                prog.get(progress_index)
243                    .map(|p| (p.tool_uses, p.tokens))
244                    .unwrap_or((0, initial_tokens))
245            };
246
247            {
248                let mut prog = progress.lock_mut_safe();
249                if let Some(entry) = prog.get_mut(progress_index) {
250                    entry.status = SubagentStatus::Failed(error_msg.clone());
251                }
252            }
253
254            SubagentResult {
255                id,
256                description,
257                response: error_msg,
258                tool_uses,
259                tokens,
260                duration_secs: started_at.elapsed().as_secs_f64(),
261                success: false,
262            }
263        },
264    }
265}
266
267/// Spawn multiple subagents in parallel.
268///
269/// Returns JoinHandles so the caller can decide how to wait:
270/// - TUI: polls `is_finished()` with `render_and_check_interrupt` between checks
271/// - Non-interactive: `join_all` directly
272///
273/// Agents beyond `MAX_CONCURRENT_AGENTS` are returned as immediate failed results.
274pub fn spawn_subagents(
275    agents: Vec<(String, String)>,
276    model: Arc<RwLock<Box<dyn Model>>>,
277    config: &ModelConfig,
278    progress: Arc<Mutex<Vec<SubagentProgress>>>,
279) -> (Vec<JoinHandle<SubagentResult>>, Vec<SubagentResult>) {
280    let mut handles = Vec::new();
281    let mut overflow_results = Vec::new();
282
283    // Initialize progress entries
284    {
285        let mut prog = progress.lock_mut_safe();
286        for (i, (_prompt, description)) in agents.iter().enumerate() {
287            if i < MAX_CONCURRENT_AGENTS {
288                prog.push(SubagentProgress {
289                    id: i,
290                    description: description.clone(),
291                    status: SubagentStatus::Running,
292                    tool_uses: 0,
293                    tokens: 0,
294                    started_at: Instant::now(),
295                });
296            }
297        }
298    }
299
300    for (i, (prompt, description)) in agents.into_iter().enumerate() {
301        if i >= MAX_CONCURRENT_AGENTS {
302            warn!(
303                "Exceeded MAX_CONCURRENT_AGENTS ({}), skipping agent: {}",
304                MAX_CONCURRENT_AGENTS, description
305            );
306            overflow_results.push(SubagentResult {
307                id: i,
308                description,
309                response: format!(
310                    "Exceeded maximum of {} concurrent agents. This agent was not spawned.",
311                    MAX_CONCURRENT_AGENTS
312                ),
313                tool_uses: 0,
314                tokens: 0,
315                duration_secs: 0.0,
316                success: false,
317            });
318            continue;
319        }
320
321        // Build subagent-specific config
322        let mut subagent_config = config.clone();
323        subagent_config.is_subagent = true;
324        subagent_config.thinking_enabled = Some(false);
325
326        let model_clone = Arc::clone(&model);
327        let progress_clone = Arc::clone(&progress);
328
329        info!(agent_id = i, description = %description, "Spawning subagent");
330
331        let handle = tokio::spawn(async move {
332            run_subagent(
333                model_clone,
334                subagent_config,
335                i,
336                prompt,
337                description,
338                progress_clone,
339                i,
340            )
341            .await
342        });
343
344        handles.push(handle);
345    }
346
347    (handles, overflow_results)
348}
349
350/// Collect results from completed subagent handles.
351/// Handles panicked/cancelled tasks gracefully.
352pub async fn collect_subagent_results(
353    handles: Vec<JoinHandle<SubagentResult>>,
354    mut overflow_results: Vec<SubagentResult>,
355) -> Vec<SubagentResult> {
356    let mut results = Vec::with_capacity(handles.len() + overflow_results.len());
357
358    for handle in handles {
359        match handle.await {
360            Ok(result) => results.push(result),
361            Err(e) => {
362                warn!("Subagent task failed: {}", e);
363                results.push(SubagentResult {
364                    id: 0,
365                    description: "Unknown".to_string(),
366                    response: format!("Agent task failed: {}", e),
367                    tool_uses: 0,
368                    tokens: 0,
369                    duration_secs: 0.0,
370                    success: false,
371                });
372            },
373        }
374    }
375
376    results.append(&mut overflow_results);
377    results.sort_by_key(|r| r.id);
378    results
379}
380
381/// Format a SubagentResult into a tool result message string for the parent model.
382pub fn format_subagent_tool_result(result: &SubagentResult) -> String {
383    if result.success {
384        format!(
385            "Agent '{}' completed successfully ({} tool uses, {} tokens, {:.1}s):\n\n{}",
386            result.description, result.tool_uses, result.tokens, result.duration_secs,
387            result.response
388        )
389    } else {
390        format!(
391            "Agent '{}' failed: {} ({} tool uses, {} tokens, {:.1}s)",
392            result.description, result.response, result.tool_uses, result.tokens,
393            result.duration_secs
394        )
395    }
396}