Skip to main content

synaps_cli/tools/subagent/
oneshot.rs

1use serde_json::{json, Value};
2use std::sync::atomic::Ordering;
3use std::time::Duration;
4use crate::{Result, RuntimeError, LlmEvent, SessionEvent, AgentEvent};
5use super::super::{Tool, ToolContext, resolve_agent_prompt, NEXT_SUBAGENT_ID};
6pub use crate::runtime::subagent::SubagentResult;
7
/// Tool that dispatches a blocking, one-shot subagent.
///
/// The type is a stateless unit struct; all behaviour lives in its `Tool`
/// implementation. The derives make it printable in diagnostics and trivially
/// copyable/constructible.
#[derive(Debug, Clone, Copy, Default)]
pub struct SubagentTool;
9
10#[async_trait::async_trait]
11impl Tool for SubagentTool {
12    fn name(&self) -> &str { "subagent" }
13
14    fn description(&self) -> &str {
15        "Dispatch a one-shot subagent with a specific system prompt to perform a task. The subagent gets its own tool suite (bash, read, write, edit, grep, find, ls) and runs autonomously until done. Use this when you need the result before continuing. Blocks until done. For parallel work, use subagent_start instead. Provide either an agent name (resolves from ~/.synaps-cli/agents/<name>.md) or a system_prompt string directly."
16    }
17
18    fn parameters(&self) -> Value {
19        json!({
20            "type": "object",
21            "properties": {
22                "agent": {
23                    "type": "string",
24                    "description": "Agent name — resolves to ~/.synaps-cli/agents/<name>.md. Mutually exclusive with system_prompt."
25                },
26                "system_prompt": {
27                    "type": "string",
28                    "description": "Inline system prompt for the subagent. Use when you don't have a named agent file."
29                },
30                "task": {
31                    "type": "string",
32                    "description": "The task/prompt to send to the subagent."
33                },
34                "model": {
35                    "type": "string",
36                    "description": "Model override (default: claude-sonnet-4-6). Use claude-opus-4-7 for complex tasks."
37                },
38                "timeout": {
39                    "type": "integer",
40                    "description": "Timeout in seconds (default: 300). Increase for long-running tasks."
41                }
42            },
43            "required": ["task"]
44        })
45    }
46
47    async fn execute(&self, params: Value, ctx: ToolContext) -> Result<String> {
48        let task = params["task"].as_str()
49            .ok_or_else(|| RuntimeError::Tool("Missing 'task' parameter".to_string()))?
50            .to_string();
51
52        // Treat blank / whitespace / control-char strings as absent — see
53        // subagent_start.rs for full rationale. Models occasionally pass `agent: ""`
54        // (or "\u{0}", or " ") alongside a real `system_prompt`; without this filter
55        // we'd try to resolve an empty agent name and fail, instead of falling
56        // through to the inline prompt.
57        let is_blank = |s: &String| s.chars().all(|c| c.is_whitespace() || c.is_control());
58        let agent_name = params["agent"]
59            .as_str()
60            .map(|s| s.to_string())
61            .filter(|s| !is_blank(s));
62        let inline_prompt = params["system_prompt"]
63            .as_str()
64            .map(|s| s.to_string())
65            .filter(|s| !is_blank(s));
66        let model_override = params["model"].as_str().map(|s| s.to_string());
67        let timeout_secs = params["timeout"].as_u64().unwrap_or(ctx.limits.subagent_timeout);
68
69        let system_prompt = match (&agent_name, &inline_prompt) {
70            (Some(name), _) => {
71                resolve_agent_prompt(name)
72                    .map_err(RuntimeError::Tool)?
73            }
74            (None, Some(prompt)) => prompt.clone(),
75            (None, None) => {
76                return Err(RuntimeError::Tool(
77                    "Must provide either 'agent' (name) or 'system_prompt' (inline). Got neither.".to_string()
78                ));
79            }
80        };
81
82        let label = agent_name.as_deref().unwrap_or("inline").to_string();
83        let model = model_override.unwrap_or_else(|| crate::models::default_model().to_string());
84        let task_preview: String = task.chars().take(80).collect();
85        let subagent_id = NEXT_SUBAGENT_ID.fetch_add(1, Ordering::Relaxed);
86
87        tracing::info!("Dispatching subagent '{}' (id={}) with model {}", label, subagent_id, model);
88
89        if let Some(ref tx) = ctx.channels.tx_events {
90            let _ = tx.send(crate::StreamEvent::Agent(AgentEvent::SubagentStart {
91                subagent_id,
92                agent_name: label.clone(),
93                task_preview: task_preview.clone(),
94            }));
95        }
96
97        let start_time = std::time::Instant::now();
98
99        let (result_tx, result_rx) = tokio::sync::oneshot::channel::<std::result::Result<SubagentResult, String>>();
100        let label_inner = label.clone();
101        let model_inner = model.clone();
102        let tx_events_inner = ctx.channels.tx_events.clone();
103
104        let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel::<()>();
105
106        let _thread_handle = std::thread::spawn(move || {
107            let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
108                let rt = match tokio::runtime::Builder::new_current_thread()
109                    .enable_all()
110                    .build()
111                {
112                    Ok(rt) => rt,
113                    Err(e) => {
114                        let _ = result_tx.send(Err(format!("Failed to create tokio runtime: {}", e)));
115                        return;
116                    }
117                };
118
119                let result = rt.block_on(async move {
120                    use futures::StreamExt;
121
122                    let mut runtime = match crate::Runtime::new().await {
123                        Ok(r) => r,
124                        Err(e) => return Err(format!("Failed to create subagent runtime: {}", e)),
125                    };
126
127                    runtime.set_system_prompt(system_prompt);
128                    runtime.set_model(model);
129                    runtime.set_tools(crate::ToolRegistry::without_subagent());
130
131                    let cancel = crate::CancellationToken::new();
132                    let cancel_inner = cancel.clone();
133
134                    tokio::spawn(async move {
135                        let _ = shutdown_rx.await;
136                        cancel_inner.cancel();
137                    });
138
139                    let mut stream = runtime.run_stream(task, cancel).await;
140
141                let mut final_text = String::new();
142                let mut tool_count = 0u32;
143                let mut tool_log: Vec<String> = Vec::new();
144                let mut total_input_tokens = 0u64;
145                let mut total_output_tokens = 0u64;
146                let mut total_cache_read = 0u64;
147                let mut total_cache_creation = 0u64;
148
149                let timeout_fut = tokio::time::sleep(Duration::from_secs(timeout_secs));
150                tokio::pin!(timeout_fut);
151
152                loop {
153                    tokio::select! {
154                        event = stream.next() => {
155                            let Some(event) = event else { break };
156                            match event {
157                                crate::StreamEvent::Llm(LlmEvent::Thinking(_)) => {
158                                    if let Some(ref tx) = tx_events_inner {
159                                        let _ = tx.send(crate::StreamEvent::Agent(AgentEvent::SubagentUpdate {
160                                            subagent_id,
161                                            agent_name: label_inner.clone(),
162                                            status: "💭 thinking...".to_string(),
163                                        }));
164                                    }
165                                }
166                                crate::StreamEvent::Llm(LlmEvent::Text(text)) => {
167                                    final_text.push_str(&text);
168                                }
169                                crate::StreamEvent::Llm(LlmEvent::ToolUseStart { tool_name: name, .. }) => {
170                                    tool_count += 1;
171                                    if let Some(ref tx) = tx_events_inner {
172                                        let _ = tx.send(crate::StreamEvent::Agent(AgentEvent::SubagentUpdate {
173                                            subagent_id,
174                                            agent_name: label_inner.clone(),
175                                            status: format!("⚙ {} (tool #{})", name, tool_count),
176                                        }));
177                                    }
178                                }
179                                crate::StreamEvent::Llm(LlmEvent::ToolUse { tool_name, input, .. }) => {
180                                    let input_str = input.to_string();
181                                    let input_preview: String = input_str.chars().take(200).collect();
182                                    tool_log.push(format!("[tool_use]: {} — {}", tool_name, input_preview));
183                                    // Build a rich status from the tool input
184                                    let detail = match tool_name.as_str() {
185                                        "bash" => {
186                                            let cmd = input["command"].as_str().unwrap_or("");
187                                            let preview: String = cmd.chars().take(60).collect();
188                                            format!("$ {}", preview)
189                                        }
190                                        "read" => {
191                                            let path = input["path"].as_str().unwrap_or("?");
192                                            let short = path.rsplit('/').next().unwrap_or(path);
193                                            format!("reading {}", short)
194                                        }
195                                        "write" => {
196                                            let path = input["path"].as_str().unwrap_or("?");
197                                            let short = path.rsplit('/').next().unwrap_or(path);
198                                            format!("writing {}", short)
199                                        }
200                                        "edit" => {
201                                            let path = input["path"].as_str().unwrap_or("?");
202                                            let short = path.rsplit('/').next().unwrap_or(path);
203                                            format!("editing {}", short)
204                                        }
205                                        "grep" => {
206                                            let pat = input["pattern"].as_str().unwrap_or("?");
207                                            let preview: String = pat.chars().take(30).collect();
208                                            format!("grep /{}/", preview)
209                                        }
210                                        "find" => {
211                                            let pat = input["pattern"].as_str().unwrap_or("?");
212                                            format!("find {}", pat)
213                                        }
214                                        "ls" => {
215                                            let path = input["path"].as_str().unwrap_or(".");
216                                            let short = path.rsplit('/').next().unwrap_or(path);
217                                            format!("ls {}", short)
218                                        }
219                                        "subagent" => {
220                                            let name = input["agent"].as_str()
221                                                .or_else(|| input["system_prompt"].as_str().map(|s| if s.len() > 20 { "inline" } else { s }))
222                                                .unwrap_or("?");
223                                            format!("spawning {}", name)
224                                        }
225                                        other => {
226                                            // MCP or unknown tools — show tool name + first param
227                                            let short_name = if other.starts_with("ext__") {
228                                                other.splitn(3, "__").last().unwrap_or(other)
229                                            } else {
230                                                other
231                                            };
232                                            short_name.to_string()
233                                        }
234                                    };
235                                    if let Some(ref tx) = tx_events_inner {
236                                        let _ = tx.send(crate::StreamEvent::Agent(AgentEvent::SubagentUpdate {
237                                            subagent_id,
238                                            agent_name: label_inner.clone(),
239                                            status: detail,
240                                        }));
241                                    }
242                                }
243                                crate::StreamEvent::Llm(LlmEvent::ToolResult { result, .. }) => {
244                                    let preview: String = result.chars().take(300).collect();
245                                    tool_log.push(format!("[tool_result]: {}", preview));
246                                }
247                                crate::StreamEvent::Session(SessionEvent::Usage {
248                                    input_tokens, output_tokens,
249                                    cache_read_input_tokens, cache_creation_input_tokens,
250                                    model: _,
251                                }) => {
252                                    total_input_tokens += input_tokens;
253                                    total_output_tokens += output_tokens;
254                                    total_cache_read += cache_read_input_tokens;
255                                    total_cache_creation += cache_creation_input_tokens;
256                                }
257                                crate::StreamEvent::Session(SessionEvent::Error(e)) => {
258                                    return Err(e);
259                                }
260                                crate::StreamEvent::Session(SessionEvent::Done) => break,
261                                _ => {}
262                            }
263                        }
264                        _ = &mut timeout_fut => {
265                            // Return partial work instead of just an error
266                            let mut partial = format!("[TIMED OUT after {}s — partial results below]\n\n", timeout_secs);
267                            if !tool_log.is_empty() {
268                                partial.push_str(&tool_log.join("\n"));
269                                partial.push('\n');
270                            }
271                            if !final_text.is_empty() {
272                                partial.push_str("\n[partial response]:\n");
273                                partial.push_str(&final_text);
274                            }
275                            return Ok(SubagentResult {
276                                text: partial,
277                                model: model_inner,
278                                input_tokens: total_input_tokens,
279                                output_tokens: total_output_tokens,
280                                cache_read: total_cache_read,
281                                cache_creation: total_cache_creation,
282                                tool_count,
283                            });
284                        }
285                    }
286                }
287
288                Ok(SubagentResult {
289                    text: final_text,
290                    model: model_inner,
291                    input_tokens: total_input_tokens,
292                    output_tokens: total_output_tokens,
293                    cache_read: total_cache_read,
294                    cache_creation: total_cache_creation,
295                    tool_count,
296                })
297            });
298
299                let _ = result_tx.send(result);
300            }));
301
302            if let Err(panic_info) = result {
303                let msg = if let Some(s) = panic_info.downcast_ref::<&str>() {
304                    s.to_string()
305                } else if let Some(s) = panic_info.downcast_ref::<String>() {
306                    s.clone()
307                } else {
308                    "unknown panic".to_string()
309                };
310                tracing::error!("Subagent thread panicked: {}", msg);
311                // result_tx is consumed inside the closure, so we can't send here —
312                // the oneshot receiver will see a RecvError, handled below.
313            }
314        });
315
316        let result = result_rx.await;
317        let elapsed = start_time.elapsed().as_secs_f64();
318
319        drop(shutdown_tx);
320
321        let log_dir = crate::config::base_dir().join("logs").join("subagents");
322        let _ = tokio::fs::create_dir_all(&log_dir).await;
323        let timestamp = chrono::Local::now().format("%Y%m%d-%H%M%S");
324
325        match result {
326            Ok(Ok(sa_result)) => {
327                let preview: String = sa_result.text.chars().take(120).collect();
328
329                if let Some(ref tx) = ctx.channels.tx_events {
330                    let _ = tx.send(crate::StreamEvent::Session(SessionEvent::Usage {
331                        input_tokens: sa_result.input_tokens,
332                        output_tokens: sa_result.output_tokens,
333                        cache_read_input_tokens: sa_result.cache_read,
334                        cache_creation_input_tokens: sa_result.cache_creation,
335                        model: Some(sa_result.model),
336                    }));
337                    let _ = tx.send(crate::StreamEvent::Agent(AgentEvent::SubagentDone {
338                        subagent_id,
339                        agent_name: label.clone(),
340                        result_preview: preview,
341                        duration_secs: elapsed,
342                    }));
343                }
344
345                let log_content = format!(
346                    "# Subagent: {}\nDate: {}\nModel: {}\nTask: {}\nDuration: {:.1}s\nTokens: {}in/{}out ({}cr/{}cw)\nTools used: {}\n\n## Result\n\n{}\n",
347                    label, timestamp, params["model"].as_str().unwrap_or("sonnet"),
348                    task_preview, elapsed,
349                    sa_result.input_tokens, sa_result.output_tokens,
350                    sa_result.cache_read, sa_result.cache_creation,
351                    sa_result.tool_count, sa_result.text,
352                );
353                let log_path = log_dir.join(format!("{}-{}.md", timestamp, label));
354                let _ = tokio::fs::write(&log_path, &log_content).await;
355
356                Ok(format!("[subagent:{}] {}", label, sa_result.text))
357            }
358            Ok(Err(e)) => {
359                if let Some(ref tx) = ctx.channels.tx_events {
360                    let _ = tx.send(crate::StreamEvent::Agent(AgentEvent::SubagentDone {
361                        subagent_id,
362                        agent_name: label.clone(),
363                        result_preview: format!("ERROR: {}", e),
364                        duration_secs: elapsed,
365                    }));
366                }
367                let log_path = log_dir.join(format!("{}-{}-error.md", timestamp, label));
368                let _ = tokio::fs::write(&log_path, format!("# Subagent ERROR: {}\nTask: {}\nError: {}\n", label, task_preview, e)).await;
369                Ok(format!("[subagent:{} ERROR] {}", label, e))
370            }
371            Err(_) => {
372                if let Some(ref tx) = ctx.channels.tx_events {
373                    let _ = tx.send(crate::StreamEvent::Agent(AgentEvent::SubagentDone {
374                        subagent_id,
375                        agent_name: label.clone(),
376                        result_preview: "Task panicked or dropped".to_string(),
377                        duration_secs: elapsed,
378                    }));
379                }
380                Ok(format!("[subagent:{} ERROR] Subagent task panicked or was dropped", label))
381            }
382        }
383    }
384}
385
386
#[cfg(test)]
mod tests {
    use super::*;
    use crate::tools::test_helpers::create_tool_context;
    use crate::tools::Tool;
    use serde_json::json;

    /// The tool advertises its name, a non-empty description, and an
    /// object-shaped parameter schema.
    #[test]
    fn test_subagent_tool_schema() {
        let subagent = SubagentTool;
        assert_eq!(subagent.name(), "subagent");
        assert!(!subagent.description().is_empty());

        let schema = subagent.parameters();
        assert_eq!(schema["type"], "object");
        assert!(schema["properties"].is_object());
        assert!(schema["required"].is_array());
    }

    /// An empty `agent` string must be ignored so the inline `system_prompt`
    /// is used instead of attempting to resolve an agent file.
    #[tokio::test]
    async fn test_subagent_blank_agent_uses_system_prompt() {
        let request = json!({
            "agent": "",
            "system_prompt": "You are a concise test subagent. Reply with only: ok",
            "task": "Say ok",
            "model": "claude-sonnet-4-6",
            "timeout": 1
        });

        let result = SubagentTool.execute(request, create_tool_context()).await;
        assert!(result.is_ok(), "blank agent should not be resolved as ~/.synaps-cli/agents/.md: {result:?}");
    }
}