synaps_cli/tools/subagent/
oneshot.rs1use serde_json::{json, Value};
2use std::sync::atomic::Ordering;
3use std::time::Duration;
4use crate::{Result, RuntimeError, LlmEvent, SessionEvent, AgentEvent};
5use super::super::{Tool, ToolContext, resolve_agent_prompt, NEXT_SUBAGENT_ID};
6pub use crate::runtime::subagent::SubagentResult;
7
/// Blocking, one-shot subagent tool.
///
/// The type carries no state; all behaviour lives in its [`Tool`]
/// implementation. The derives make it cheap to copy, printable in
/// diagnostics, and constructible via `Default`.
#[derive(Debug, Clone, Copy, Default)]
pub struct SubagentTool;
9
10#[async_trait::async_trait]
11impl Tool for SubagentTool {
12 fn name(&self) -> &str { "subagent" }
13
14 fn description(&self) -> &str {
15 "Dispatch a one-shot subagent with a specific system prompt to perform a task. The subagent gets its own tool suite (bash, read, write, edit, grep, find, ls) and runs autonomously until done. Use this when you need the result before continuing. Blocks until done. For parallel work, use subagent_start instead. Provide either an agent name (resolves from ~/.synaps-cli/agents/<name>.md) or a system_prompt string directly."
16 }
17
18 fn parameters(&self) -> Value {
19 json!({
20 "type": "object",
21 "properties": {
22 "agent": {
23 "type": "string",
24 "description": "Agent name — resolves to ~/.synaps-cli/agents/<name>.md. Mutually exclusive with system_prompt."
25 },
26 "system_prompt": {
27 "type": "string",
28 "description": "Inline system prompt for the subagent. Use when you don't have a named agent file."
29 },
30 "task": {
31 "type": "string",
32 "description": "The task/prompt to send to the subagent."
33 },
34 "model": {
35 "type": "string",
36 "description": "Model override (default: claude-sonnet-4-6). Use claude-opus-4-7 for complex tasks."
37 },
38 "timeout": {
39 "type": "integer",
40 "description": "Timeout in seconds (default: 300). Increase for long-running tasks."
41 }
42 },
43 "required": ["task"]
44 })
45 }
46
47 async fn execute(&self, params: Value, ctx: ToolContext) -> Result<String> {
48 let task = params["task"].as_str()
49 .ok_or_else(|| RuntimeError::Tool("Missing 'task' parameter".to_string()))?
50 .to_string();
51
52 let is_blank = |s: &String| s.chars().all(|c| c.is_whitespace() || c.is_control());
58 let agent_name = params["agent"]
59 .as_str()
60 .map(|s| s.to_string())
61 .filter(|s| !is_blank(s));
62 let inline_prompt = params["system_prompt"]
63 .as_str()
64 .map(|s| s.to_string())
65 .filter(|s| !is_blank(s));
66 let model_override = params["model"].as_str().map(|s| s.to_string());
67 let timeout_secs = params["timeout"].as_u64().unwrap_or(ctx.limits.subagent_timeout);
68
69 let system_prompt = match (&agent_name, &inline_prompt) {
70 (Some(name), _) => {
71 resolve_agent_prompt(name)
72 .map_err(RuntimeError::Tool)?
73 }
74 (None, Some(prompt)) => prompt.clone(),
75 (None, None) => {
76 return Err(RuntimeError::Tool(
77 "Must provide either 'agent' (name) or 'system_prompt' (inline). Got neither.".to_string()
78 ));
79 }
80 };
81
82 let label = agent_name.as_deref().unwrap_or("inline").to_string();
83 let model = model_override.unwrap_or_else(|| crate::models::default_model().to_string());
84 let task_preview: String = task.chars().take(80).collect();
85 let subagent_id = NEXT_SUBAGENT_ID.fetch_add(1, Ordering::Relaxed);
86
87 tracing::info!("Dispatching subagent '{}' (id={}) with model {}", label, subagent_id, model);
88
89 if let Some(ref tx) = ctx.channels.tx_events {
90 let _ = tx.send(crate::StreamEvent::Agent(AgentEvent::SubagentStart {
91 subagent_id,
92 agent_name: label.clone(),
93 task_preview: task_preview.clone(),
94 }));
95 }
96
97 let start_time = std::time::Instant::now();
98
99 let (result_tx, result_rx) = tokio::sync::oneshot::channel::<std::result::Result<SubagentResult, String>>();
100 let label_inner = label.clone();
101 let model_inner = model.clone();
102 let tx_events_inner = ctx.channels.tx_events.clone();
103
104 let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel::<()>();
105
106 let _thread_handle = std::thread::spawn(move || {
107 let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
108 let rt = match tokio::runtime::Builder::new_current_thread()
109 .enable_all()
110 .build()
111 {
112 Ok(rt) => rt,
113 Err(e) => {
114 let _ = result_tx.send(Err(format!("Failed to create tokio runtime: {}", e)));
115 return;
116 }
117 };
118
119 let result = rt.block_on(async move {
120 use futures::StreamExt;
121
122 let mut runtime = match crate::Runtime::new().await {
123 Ok(r) => r,
124 Err(e) => return Err(format!("Failed to create subagent runtime: {}", e)),
125 };
126
127 runtime.set_system_prompt(system_prompt);
128 runtime.set_model(model);
129 runtime.set_tools(crate::ToolRegistry::without_subagent());
130
131 let cancel = crate::CancellationToken::new();
132 let cancel_inner = cancel.clone();
133
134 tokio::spawn(async move {
135 let _ = shutdown_rx.await;
136 cancel_inner.cancel();
137 });
138
139 let mut stream = runtime.run_stream(task, cancel).await;
140
141 let mut final_text = String::new();
142 let mut tool_count = 0u32;
143 let mut tool_log: Vec<String> = Vec::new();
144 let mut total_input_tokens = 0u64;
145 let mut total_output_tokens = 0u64;
146 let mut total_cache_read = 0u64;
147 let mut total_cache_creation = 0u64;
148
149 let timeout_fut = tokio::time::sleep(Duration::from_secs(timeout_secs));
150 tokio::pin!(timeout_fut);
151
152 loop {
153 tokio::select! {
154 event = stream.next() => {
155 let Some(event) = event else { break };
156 match event {
157 crate::StreamEvent::Llm(LlmEvent::Thinking(_)) => {
158 if let Some(ref tx) = tx_events_inner {
159 let _ = tx.send(crate::StreamEvent::Agent(AgentEvent::SubagentUpdate {
160 subagent_id,
161 agent_name: label_inner.clone(),
162 status: "💭 thinking...".to_string(),
163 }));
164 }
165 }
166 crate::StreamEvent::Llm(LlmEvent::Text(text)) => {
167 final_text.push_str(&text);
168 }
169 crate::StreamEvent::Llm(LlmEvent::ToolUseStart { tool_name: name, .. }) => {
170 tool_count += 1;
171 if let Some(ref tx) = tx_events_inner {
172 let _ = tx.send(crate::StreamEvent::Agent(AgentEvent::SubagentUpdate {
173 subagent_id,
174 agent_name: label_inner.clone(),
175 status: format!("⚙ {} (tool #{})", name, tool_count),
176 }));
177 }
178 }
179 crate::StreamEvent::Llm(LlmEvent::ToolUse { tool_name, input, .. }) => {
180 let input_str = input.to_string();
181 let input_preview: String = input_str.chars().take(200).collect();
182 tool_log.push(format!("[tool_use]: {} — {}", tool_name, input_preview));
183 let detail = match tool_name.as_str() {
185 "bash" => {
186 let cmd = input["command"].as_str().unwrap_or("");
187 let preview: String = cmd.chars().take(60).collect();
188 format!("$ {}", preview)
189 }
190 "read" => {
191 let path = input["path"].as_str().unwrap_or("?");
192 let short = path.rsplit('/').next().unwrap_or(path);
193 format!("reading {}", short)
194 }
195 "write" => {
196 let path = input["path"].as_str().unwrap_or("?");
197 let short = path.rsplit('/').next().unwrap_or(path);
198 format!("writing {}", short)
199 }
200 "edit" => {
201 let path = input["path"].as_str().unwrap_or("?");
202 let short = path.rsplit('/').next().unwrap_or(path);
203 format!("editing {}", short)
204 }
205 "grep" => {
206 let pat = input["pattern"].as_str().unwrap_or("?");
207 let preview: String = pat.chars().take(30).collect();
208 format!("grep /{}/", preview)
209 }
210 "find" => {
211 let pat = input["pattern"].as_str().unwrap_or("?");
212 format!("find {}", pat)
213 }
214 "ls" => {
215 let path = input["path"].as_str().unwrap_or(".");
216 let short = path.rsplit('/').next().unwrap_or(path);
217 format!("ls {}", short)
218 }
219 "subagent" => {
220 let name = input["agent"].as_str()
221 .or_else(|| input["system_prompt"].as_str().map(|s| if s.len() > 20 { "inline" } else { s }))
222 .unwrap_or("?");
223 format!("spawning {}", name)
224 }
225 other => {
226 let short_name = if other.starts_with("ext__") {
228 other.splitn(3, "__").last().unwrap_or(other)
229 } else {
230 other
231 };
232 short_name.to_string()
233 }
234 };
235 if let Some(ref tx) = tx_events_inner {
236 let _ = tx.send(crate::StreamEvent::Agent(AgentEvent::SubagentUpdate {
237 subagent_id,
238 agent_name: label_inner.clone(),
239 status: detail,
240 }));
241 }
242 }
243 crate::StreamEvent::Llm(LlmEvent::ToolResult { result, .. }) => {
244 let preview: String = result.chars().take(300).collect();
245 tool_log.push(format!("[tool_result]: {}", preview));
246 }
247 crate::StreamEvent::Session(SessionEvent::Usage {
248 input_tokens, output_tokens,
249 cache_read_input_tokens, cache_creation_input_tokens,
250 model: _,
251 }) => {
252 total_input_tokens += input_tokens;
253 total_output_tokens += output_tokens;
254 total_cache_read += cache_read_input_tokens;
255 total_cache_creation += cache_creation_input_tokens;
256 }
257 crate::StreamEvent::Session(SessionEvent::Error(e)) => {
258 return Err(e);
259 }
260 crate::StreamEvent::Session(SessionEvent::Done) => break,
261 _ => {}
262 }
263 }
264 _ = &mut timeout_fut => {
265 let mut partial = format!("[TIMED OUT after {}s — partial results below]\n\n", timeout_secs);
267 if !tool_log.is_empty() {
268 partial.push_str(&tool_log.join("\n"));
269 partial.push('\n');
270 }
271 if !final_text.is_empty() {
272 partial.push_str("\n[partial response]:\n");
273 partial.push_str(&final_text);
274 }
275 return Ok(SubagentResult {
276 text: partial,
277 model: model_inner,
278 input_tokens: total_input_tokens,
279 output_tokens: total_output_tokens,
280 cache_read: total_cache_read,
281 cache_creation: total_cache_creation,
282 tool_count,
283 });
284 }
285 }
286 }
287
288 Ok(SubagentResult {
289 text: final_text,
290 model: model_inner,
291 input_tokens: total_input_tokens,
292 output_tokens: total_output_tokens,
293 cache_read: total_cache_read,
294 cache_creation: total_cache_creation,
295 tool_count,
296 })
297 });
298
299 let _ = result_tx.send(result);
300 }));
301
302 if let Err(panic_info) = result {
303 let msg = if let Some(s) = panic_info.downcast_ref::<&str>() {
304 s.to_string()
305 } else if let Some(s) = panic_info.downcast_ref::<String>() {
306 s.clone()
307 } else {
308 "unknown panic".to_string()
309 };
310 tracing::error!("Subagent thread panicked: {}", msg);
311 }
314 });
315
316 let result = result_rx.await;
317 let elapsed = start_time.elapsed().as_secs_f64();
318
319 drop(shutdown_tx);
320
321 let log_dir = crate::config::base_dir().join("logs").join("subagents");
322 let _ = tokio::fs::create_dir_all(&log_dir).await;
323 let timestamp = chrono::Local::now().format("%Y%m%d-%H%M%S");
324
325 match result {
326 Ok(Ok(sa_result)) => {
327 let preview: String = sa_result.text.chars().take(120).collect();
328
329 if let Some(ref tx) = ctx.channels.tx_events {
330 let _ = tx.send(crate::StreamEvent::Session(SessionEvent::Usage {
331 input_tokens: sa_result.input_tokens,
332 output_tokens: sa_result.output_tokens,
333 cache_read_input_tokens: sa_result.cache_read,
334 cache_creation_input_tokens: sa_result.cache_creation,
335 model: Some(sa_result.model),
336 }));
337 let _ = tx.send(crate::StreamEvent::Agent(AgentEvent::SubagentDone {
338 subagent_id,
339 agent_name: label.clone(),
340 result_preview: preview,
341 duration_secs: elapsed,
342 }));
343 }
344
345 let log_content = format!(
346 "# Subagent: {}\nDate: {}\nModel: {}\nTask: {}\nDuration: {:.1}s\nTokens: {}in/{}out ({}cr/{}cw)\nTools used: {}\n\n## Result\n\n{}\n",
347 label, timestamp, params["model"].as_str().unwrap_or("sonnet"),
348 task_preview, elapsed,
349 sa_result.input_tokens, sa_result.output_tokens,
350 sa_result.cache_read, sa_result.cache_creation,
351 sa_result.tool_count, sa_result.text,
352 );
353 let log_path = log_dir.join(format!("{}-{}.md", timestamp, label));
354 let _ = tokio::fs::write(&log_path, &log_content).await;
355
356 Ok(format!("[subagent:{}] {}", label, sa_result.text))
357 }
358 Ok(Err(e)) => {
359 if let Some(ref tx) = ctx.channels.tx_events {
360 let _ = tx.send(crate::StreamEvent::Agent(AgentEvent::SubagentDone {
361 subagent_id,
362 agent_name: label.clone(),
363 result_preview: format!("ERROR: {}", e),
364 duration_secs: elapsed,
365 }));
366 }
367 let log_path = log_dir.join(format!("{}-{}-error.md", timestamp, label));
368 let _ = tokio::fs::write(&log_path, format!("# Subagent ERROR: {}\nTask: {}\nError: {}\n", label, task_preview, e)).await;
369 Ok(format!("[subagent:{} ERROR] {}", label, e))
370 }
371 Err(_) => {
372 if let Some(ref tx) = ctx.channels.tx_events {
373 let _ = tx.send(crate::StreamEvent::Agent(AgentEvent::SubagentDone {
374 subagent_id,
375 agent_name: label.clone(),
376 result_preview: "Task panicked or dropped".to_string(),
377 duration_secs: elapsed,
378 }));
379 }
380 Ok(format!("[subagent:{} ERROR] Subagent task panicked or was dropped", label))
381 }
382 }
383 }
384}
385
386
#[cfg(test)]
mod tests {
    use super::*;
    use crate::tools::test_helpers::create_tool_context;
    use crate::tools::Tool;
    use serde_json::json;

    /// The schema advertises an object with the documented properties and
    /// requires only `task`.
    #[test]
    fn test_subagent_tool_schema() {
        let tool = SubagentTool;
        assert_eq!(tool.name(), "subagent");
        assert!(!tool.description().is_empty());

        let params = tool.parameters();
        assert_eq!(params["type"], "object");
        assert!(params["properties"].is_object());
        assert!(params["required"].is_array());
        assert_eq!(params["required"], json!(["task"]));
        for key in ["agent", "system_prompt", "task", "model", "timeout"] {
            assert!(
                params["properties"][key].is_object(),
                "schema is missing property '{key}'"
            );
        }
    }

    /// An empty `agent` must be ignored in favour of `system_prompt` rather
    /// than being resolved as an agent file with an empty name.
    #[tokio::test]
    async fn test_subagent_blank_agent_uses_system_prompt() {
        let tool = SubagentTool;
        let ctx = create_tool_context();
        let params = json!({
            "agent": "",
            "system_prompt": "You are a concise test subagent. Reply with only: ok",
            "task": "Say ok",
            "model": "claude-sonnet-4-6",
            "timeout": 1
        });

        let result = tool.execute(params, ctx).await;
        assert!(result.is_ok(), "blank agent should not be resolved as ~/.synaps-cli/agents/.md: {result:?}");
    }

    /// A missing `task` is rejected before any subagent is started.
    #[tokio::test]
    async fn test_subagent_missing_task_is_rejected() {
        let tool = SubagentTool;
        let ctx = create_tool_context();
        let params = json!({ "system_prompt": "You are a test subagent." });

        let result = tool.execute(params, ctx).await;
        assert!(result.is_err(), "missing 'task' should be an error: {result:?}");
    }

    /// Whitespace-only `agent` and `system_prompt` count as "neither given".
    #[tokio::test]
    async fn test_subagent_requires_agent_or_system_prompt() {
        let tool = SubagentTool;
        let ctx = create_tool_context();
        let params = json!({
            "agent": "  ",
            "system_prompt": "\t\n",
            "task": "Say ok"
        });

        let result = tool.execute(params, ctx).await;
        assert!(result.is_err(), "blank agent and system_prompt should be an error: {result:?}");
    }
}