Skip to main content

agent_engine/tools/subagent/
oneshot.rs

1use serde_json::{json, Value};
2use std::sync::atomic::Ordering;
3use std::time::Duration;
4use crate::{Result, RuntimeError, LlmEvent, SessionEvent, AgentEvent};
5use super::super::{Tool, ToolContext, resolve_agent_prompt, NEXT_SUBAGENT_ID};
6pub use crate::runtime::subagent::SubagentResult;
7
/// Tool that dispatches a blocking, one-shot subagent and returns its final output.
///
/// The type carries no state; all behaviour lives in its `Tool` implementation.
#[derive(Debug, Clone, Copy, Default)]
pub struct SubagentTool;
9
10#[async_trait::async_trait]
11impl Tool for SubagentTool {
12    fn name(&self) -> &str { "subagent" }
13
14    fn description(&self) -> &str {
15        "Dispatch a one-shot subagent with a specific system prompt to perform a task. The subagent gets its own tool suite (bash, read, write, edit, grep, find, ls) and runs autonomously until done. Use this when you need the result before continuing. Blocks until done. For parallel work, use subagent_start instead. Provide either an agent name (resolves from ~/.synaps-cli/agents/<name>.md) or a system_prompt string directly."
16    }
17
18    fn parameters(&self) -> Value {
19        json!({
20            "type": "object",
21            "properties": {
22                "agent": {
23                    "type": "string",
24                    "description": "Agent name — resolves to ~/.synaps-cli/agents/<name>.md. Mutually exclusive with system_prompt."
25                },
26                "system_prompt": {
27                    "type": "string",
28                    "description": "Inline system prompt for the subagent. Use when you don't have a named agent file."
29                },
30                "task": {
31                    "type": "string",
32                    "description": "The task/prompt to send to the subagent."
33                },
34                "model": {
35                    "type": "string",
36                    "description": "Model override (default: claude-sonnet-4-6). Use claude-opus-4-7 for complex tasks."
37                },
38                "timeout": {
39                    "type": "integer",
40                    "description": "Timeout in seconds (default: 300). Increase for long-running tasks."
41                }
42            },
43            "required": ["task"]
44        })
45    }
46
47    async fn execute(&self, params: Value, ctx: ToolContext) -> Result<String> {
48        let task = params["task"].as_str()
49            .ok_or_else(|| RuntimeError::Tool("Missing 'task' parameter".to_string()))?
50            .to_string();
51
52        // Treat blank / whitespace / control-char strings as absent — see
53        // subagent_start.rs for full rationale. Models occasionally pass `agent: ""`
54        // (or "\u{0}", or " ") alongside a real `system_prompt`; without this filter
55        // we'd try to resolve an empty agent name and fail, instead of falling
56        // through to the inline prompt.
57        let is_blank = |s: &String| s.chars().all(|c| c.is_whitespace() || c.is_control());
58        let agent_name = params["agent"]
59            .as_str()
60            .map(|s| s.to_string())
61            .filter(|s| !is_blank(s));
62        let inline_prompt = params["system_prompt"]
63            .as_str()
64            .map(|s| s.to_string())
65            .filter(|s| !is_blank(s));
66        let model_override = params["model"].as_str().map(|s| s.to_string());
67        let timeout_secs = params["timeout"].as_u64().unwrap_or(ctx.limits.subagent_timeout);
68
69        let system_prompt = match (&agent_name, &inline_prompt) {
70            (Some(name), _) => {
71                resolve_agent_prompt(name)
72                    .map_err(RuntimeError::Tool)?
73            }
74            (None, Some(prompt)) => prompt.clone(),
75            (None, None) => {
76                return Err(RuntimeError::Tool(
77                    "Must provide either 'agent' (name) or 'system_prompt' (inline). Got neither.".to_string()
78                ));
79            }
80        };
81
82        let label = agent_name.as_deref().unwrap_or("inline").to_string();
83        let model = model_override.unwrap_or_else(|| crate::models::default_model().to_string());
84        let task_preview: String = task.chars().take(80).collect();
85        let subagent_id = NEXT_SUBAGENT_ID.fetch_add(1, Ordering::Relaxed);
86
87        tracing::info!("Dispatching subagent '{}' (id={}) with model {}", label, subagent_id, model);
88
89        if let Some(ref tx) = ctx.channels.tx_events {
90            let _ = tx.send(crate::StreamEvent::Agent(AgentEvent::SubagentStart {
91                subagent_id,
92                agent_name: label.clone(),
93                task_preview: task_preview.clone(),
94            }));
95        }
96
97        let start_time = std::time::Instant::now();
98
99        let (result_tx, result_rx) = tokio::sync::oneshot::channel::<std::result::Result<SubagentResult, String>>();
100        let label_inner = label.clone();
101        let model_inner = model.clone();
102        let tx_events_inner = ctx.channels.tx_events.clone();
103
104        let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel::<()>();
105
106        let _thread_handle = std::thread::spawn(move || {
107            let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
108                let rt = match tokio::runtime::Builder::new_current_thread()
109                    .enable_all()
110                    .build()
111                {
112                    Ok(rt) => rt,
113                    Err(e) => {
114                        let _ = result_tx.send(Err(format!("Failed to create tokio runtime: {}", e)));
115                        return;
116                    }
117                };
118
119                let result = rt.block_on(async move {
120                    use futures::StreamExt;
121
122                    let mut runtime = match crate::Runtime::new().await {
123                        Ok(r) => r,
124                        Err(e) => return Err(format!("Failed to create subagent runtime: {}", e)),
125                    };
126
127                    runtime.set_system_prompt(system_prompt);
128                    runtime.set_model(model);
129                    runtime.set_tools(crate::ToolRegistry::without_subagent());
130
131                    let cancel = crate::CancellationToken::new();
132                    let cancel_inner = cancel.clone();
133
134                    tokio::spawn(async move {
135                        let _ = shutdown_rx.await;
136                        cancel_inner.cancel();
137                    });
138
139                    let mut stream = runtime.run_stream(task, cancel).await;
140
141                let mut final_text = String::new();
142                let mut tool_count = 0u32;
143                let mut tool_log: Vec<String> = Vec::new();
144                let mut total_input_tokens = 0u64;
145                let mut total_output_tokens = 0u64;
146                let mut total_cache_read = 0u64;
147                let mut total_cache_creation = 0u64;
148                // TTL split: None only if no turn ever reported one; otherwise summed.
149                let mut total_cache_5m: Option<u64> = None;
150                let mut total_cache_1h: Option<u64> = None;
151
152                let timeout_fut = tokio::time::sleep(Duration::from_secs(timeout_secs));
153                tokio::pin!(timeout_fut);
154
155                loop {
156                    tokio::select! {
157                        event = stream.next() => {
158                            let Some(event) = event else { break };
159                            match event {
160                                crate::StreamEvent::Llm(LlmEvent::Thinking(_)) => {
161                                    if let Some(ref tx) = tx_events_inner {
162                                        let _ = tx.send(crate::StreamEvent::Agent(AgentEvent::SubagentUpdate {
163                                            subagent_id,
164                                            agent_name: label_inner.clone(),
165                                            status: "💭 thinking...".to_string(),
166                                        }));
167                                    }
168                                }
169                                crate::StreamEvent::Llm(LlmEvent::Text(text)) => {
170                                    final_text.push_str(&text);
171                                }
172                                crate::StreamEvent::Llm(LlmEvent::ToolUseStart { tool_name: name, .. }) => {
173                                    tool_count += 1;
174                                    if let Some(ref tx) = tx_events_inner {
175                                        let _ = tx.send(crate::StreamEvent::Agent(AgentEvent::SubagentUpdate {
176                                            subagent_id,
177                                            agent_name: label_inner.clone(),
178                                            status: format!("⚙ {} (tool #{})", name, tool_count),
179                                        }));
180                                    }
181                                }
182                                crate::StreamEvent::Llm(LlmEvent::ToolUse { tool_name, input, .. }) => {
183                                    let input_str = input.to_string();
184                                    let input_preview: String = input_str.chars().take(200).collect();
185                                    tool_log.push(format!("[tool_use]: {} — {}", tool_name, input_preview));
186                                    // Build a rich status from the tool input
187                                    let detail = match tool_name.as_str() {
188                                        "bash" => {
189                                            let cmd = input["command"].as_str().unwrap_or("");
190                                            let preview: String = cmd.chars().take(60).collect();
191                                            format!("$ {}", preview)
192                                        }
193                                        "read" => {
194                                            let path = input["path"].as_str().unwrap_or("?");
195                                            let short = path.rsplit('/').next().unwrap_or(path);
196                                            format!("reading {}", short)
197                                        }
198                                        "write" => {
199                                            let path = input["path"].as_str().unwrap_or("?");
200                                            let short = path.rsplit('/').next().unwrap_or(path);
201                                            format!("writing {}", short)
202                                        }
203                                        "edit" => {
204                                            let path = input["path"].as_str().unwrap_or("?");
205                                            let short = path.rsplit('/').next().unwrap_or(path);
206                                            format!("editing {}", short)
207                                        }
208                                        "grep" => {
209                                            let pat = input["pattern"].as_str().unwrap_or("?");
210                                            let preview: String = pat.chars().take(30).collect();
211                                            format!("grep /{}/", preview)
212                                        }
213                                        "find" => {
214                                            let pat = input["pattern"].as_str().unwrap_or("?");
215                                            format!("find {}", pat)
216                                        }
217                                        "ls" => {
218                                            let path = input["path"].as_str().unwrap_or(".");
219                                            let short = path.rsplit('/').next().unwrap_or(path);
220                                            format!("ls {}", short)
221                                        }
222                                        "subagent" => {
223                                            let name = input["agent"].as_str()
224                                                .or_else(|| input["system_prompt"].as_str().map(|s| if s.len() > 20 { "inline" } else { s }))
225                                                .unwrap_or("?");
226                                            format!("spawning {}", name)
227                                        }
228                                        other => {
229                                            // MCP or unknown tools — show tool name + first param
230                                            let short_name = if other.starts_with("ext__") {
231                                                other.splitn(3, "__").last().unwrap_or(other)
232                                            } else {
233                                                other
234                                            };
235                                            short_name.to_string()
236                                        }
237                                    };
238                                    if let Some(ref tx) = tx_events_inner {
239                                        let _ = tx.send(crate::StreamEvent::Agent(AgentEvent::SubagentUpdate {
240                                            subagent_id,
241                                            agent_name: label_inner.clone(),
242                                            status: detail,
243                                        }));
244                                    }
245                                }
246                                crate::StreamEvent::Llm(LlmEvent::ToolResult { result, .. }) => {
247                                    let preview: String = result.chars().take(300).collect();
248                                    tool_log.push(format!("[tool_result]: {}", preview));
249                                }
250                                crate::StreamEvent::Session(SessionEvent::Usage {
251                                    input_tokens, output_tokens,
252                                    cache_read_input_tokens, cache_creation_input_tokens,
253                                    cache_creation_5m, cache_creation_1h,
254                                    model: _,
255                                }) => {
256                                    total_input_tokens += input_tokens;
257                                    total_output_tokens += output_tokens;
258                                    total_cache_read += cache_read_input_tokens;
259                                    total_cache_creation += cache_creation_input_tokens;
260                                    crate::core::rpc_dispatch::merge_split(&mut total_cache_5m, cache_creation_5m);
261                                    crate::core::rpc_dispatch::merge_split(&mut total_cache_1h, cache_creation_1h);
262                                }
263                                crate::StreamEvent::Session(SessionEvent::Error(e)) => {
264                                    return Err(e);
265                                }
266                                crate::StreamEvent::Session(SessionEvent::Done) => break,
267                                _ => {}
268                            }
269                        }
270                        _ = &mut timeout_fut => {
271                            // Return partial work instead of just an error
272                            let mut partial = format!("[TIMED OUT after {}s — partial results below]\n\n", timeout_secs);
273                            if !tool_log.is_empty() {
274                                partial.push_str(&tool_log.join("\n"));
275                                partial.push('\n');
276                            }
277                            if !final_text.is_empty() {
278                                partial.push_str("\n[partial response]:\n");
279                                partial.push_str(&final_text);
280                            }
281                            return Ok(SubagentResult {
282                                text: partial,
283                                model: model_inner,
284                                input_tokens: total_input_tokens,
285                                output_tokens: total_output_tokens,
286                                cache_read: total_cache_read,
287                                cache_creation: total_cache_creation,
288                                cache_creation_5m: total_cache_5m,
289                                cache_creation_1h: total_cache_1h,
290                                tool_count,
291                            });
292                        }
293                    }
294                }
295
296                Ok(SubagentResult {
297                    text: final_text,
298                    model: model_inner,
299                    input_tokens: total_input_tokens,
300                    output_tokens: total_output_tokens,
301                    cache_read: total_cache_read,
302                    cache_creation: total_cache_creation,
303                    cache_creation_5m: total_cache_5m,
304                    cache_creation_1h: total_cache_1h,
305                    tool_count,
306                })
307            });
308
309                let _ = result_tx.send(result);
310            }));
311
312            if let Err(panic_info) = result {
313                let msg = if let Some(s) = panic_info.downcast_ref::<&str>() {
314                    s.to_string()
315                } else if let Some(s) = panic_info.downcast_ref::<String>() {
316                    s.clone()
317                } else {
318                    "unknown panic".to_string()
319                };
320                tracing::error!("Subagent thread panicked: {}", msg);
321                // result_tx is consumed inside the closure, so we can't send here —
322                // the oneshot receiver will see a RecvError, handled below.
323            }
324        });
325
326        let result = result_rx.await;
327        let elapsed = start_time.elapsed().as_secs_f64();
328
329        drop(shutdown_tx);
330
331        let log_dir = crate::config::base_dir().join("logs").join("subagents");
332        let _ = tokio::fs::create_dir_all(&log_dir).await;
333        let timestamp = chrono::Local::now().format("%Y%m%d-%H%M%S");
334
335        match result {
336            Ok(Ok(sa_result)) => {
337                let preview: String = sa_result.text.chars().take(120).collect();
338
339                if let Some(ref tx) = ctx.channels.tx_events {
340                    let _ = tx.send(crate::StreamEvent::Session(SessionEvent::Usage {
341                        input_tokens: sa_result.input_tokens,
342                        output_tokens: sa_result.output_tokens,
343                        cache_read_input_tokens: sa_result.cache_read,
344                        cache_creation_input_tokens: sa_result.cache_creation,
345                        // TTL split aggregated across the subagent's turns —
346                        // forwarded so the parent bills 1h writes at 2.0x, not 1.25x.
347                        cache_creation_5m: sa_result.cache_creation_5m,
348                        cache_creation_1h: sa_result.cache_creation_1h,
349                        model: Some(sa_result.model),
350                    }));
351                    let _ = tx.send(crate::StreamEvent::Agent(AgentEvent::SubagentDone {
352                        subagent_id,
353                        agent_name: label.clone(),
354                        result_preview: preview,
355                        duration_secs: elapsed,
356                    }));
357                }
358
359                let log_content = format!(
360                    "# Subagent: {}\nDate: {}\nModel: {}\nTask: {}\nDuration: {:.1}s\nTokens: {}in/{}out ({}cr/{}cw)\nTools used: {}\n\n## Result\n\n{}\n",
361                    label, timestamp, params["model"].as_str().unwrap_or("sonnet"),
362                    task_preview, elapsed,
363                    sa_result.input_tokens, sa_result.output_tokens,
364                    sa_result.cache_read, sa_result.cache_creation,
365                    sa_result.tool_count, sa_result.text,
366                );
367                let log_path = log_dir.join(format!("{}-{}.md", timestamp, label));
368                let _ = tokio::fs::write(&log_path, &log_content).await;
369
370                Ok(format!("[subagent:{}] {}", label, sa_result.text))
371            }
372            Ok(Err(e)) => {
373                if let Some(ref tx) = ctx.channels.tx_events {
374                    let _ = tx.send(crate::StreamEvent::Agent(AgentEvent::SubagentDone {
375                        subagent_id,
376                        agent_name: label.clone(),
377                        result_preview: format!("ERROR: {}", e),
378                        duration_secs: elapsed,
379                    }));
380                }
381                let log_path = log_dir.join(format!("{}-{}-error.md", timestamp, label));
382                let _ = tokio::fs::write(&log_path, format!("# Subagent ERROR: {}\nTask: {}\nError: {}\n", label, task_preview, e)).await;
383                Ok(format!("[subagent:{} ERROR] {}", label, e))
384            }
385            Err(_) => {
386                if let Some(ref tx) = ctx.channels.tx_events {
387                    let _ = tx.send(crate::StreamEvent::Agent(AgentEvent::SubagentDone {
388                        subagent_id,
389                        agent_name: label.clone(),
390                        result_preview: "Task panicked or dropped".to_string(),
391                        duration_secs: elapsed,
392                    }));
393                }
394                Ok(format!("[subagent:{} ERROR] Subagent task panicked or was dropped", label))
395            }
396        }
397    }
398}
399
400
#[cfg(test)]
mod tests {
    use super::*;
    use crate::tools::test_helpers::create_tool_context;
    use crate::tools::Tool;
    use serde_json::json;

    /// The tool advertises its name, a non-empty description, and an object schema.
    #[test]
    fn test_subagent_tool_schema() {
        let subagent = SubagentTool;
        let schema = subagent.parameters();

        assert_eq!(subagent.name(), "subagent");
        assert!(!subagent.description().is_empty());

        assert_eq!(schema["type"], "object");
        assert!(schema["properties"].is_object());
        assert!(schema["required"].is_array());
    }

    /// An empty `agent` string must be ignored so the inline `system_prompt` is used.
    #[tokio::test]
    async fn test_subagent_blank_agent_uses_system_prompt() {
        let request = json!({
            "agent": "",
            "system_prompt": "You are a concise test subagent. Reply with only: ok",
            "task": "Say ok",
            "model": "claude-sonnet-4-6",
            "timeout": 1
        });

        let result = SubagentTool.execute(request, create_tool_context()).await;
        assert!(result.is_ok(), "blank agent should not be resolved as ~/.synaps-cli/agents/.md: {result:?}");
    }
}