1use serde_json::{json, Value};
8use std::sync::atomic::Ordering;
9use std::sync::{Arc, RwLock};
10use std::time::Duration;
11use tokio::sync::{mpsc, oneshot};
12
13use crate::{Result, RuntimeError, LlmEvent, SessionEvent, AgentEvent};
14use super::super::{Tool, ToolContext, resolve_agent_prompt, NEXT_SUBAGENT_ID};
15use crate::runtime::subagent::{SubagentHandle, SubagentResult, SubagentStatus, SubagentState};
16
17pub struct SubagentStartTool;
18
19#[async_trait::async_trait]
20impl Tool for SubagentStartTool {
21 fn name(&self) -> &str { "subagent_start" }
22
23 fn description(&self) -> &str {
24 "Dispatch a reactive subagent and return immediately with a handle_id. \
25 The subagent runs in the background — use subagent_status to poll, \
26 subagent_steer to inject guidance mid-run, and subagent_collect to poll for the result (non-blocking — call \
27 repeatedly until done). Use this for parallel execution or when you \
28 want to continue working while the subagent runs. For simple sequential \
29 delegation, use subagent instead. Provide either an agent name (resolves \
30 from ~/.synaps-cli/agents/<name>.md) or a system_prompt string directly."
31 }
32
33 fn parameters(&self) -> Value {
34 json!({
35 "type": "object",
36 "properties": {
37 "agent": {
38 "type": "string",
39 "description": "Agent name — resolves to ~/.synaps-cli/agents/<name>.md. Mutually exclusive with system_prompt."
40 },
41 "system_prompt": {
42 "type": "string",
43 "description": "Inline system prompt for the subagent. Use when you don't have a named agent file."
44 },
45 "task": {
46 "type": "string",
47 "description": "The task/prompt to send to the subagent."
48 },
49 "model": {
50 "type": "string",
51 "description": "Model override (default: claude-sonnet-4-6). Use claude-opus-4-7 for complex tasks."
52 },
53 "timeout": {
54 "type": "integer",
55 "description": "Timeout in seconds (default: 300). Increase for long-running tasks."
56 }
57 },
58 "required": ["task"]
59 })
60 }
61
62 async fn execute(&self, params: Value, ctx: ToolContext) -> Result<String> {
63 let task = params["task"].as_str()
65 .ok_or_else(|| RuntimeError::Tool("Missing 'task' parameter".to_string()))?
66 .to_string();
67
68 let is_blank = |s: &String| s.chars().all(|c| c.is_whitespace() || c.is_control());
74 let agent_name = params["agent"]
75 .as_str()
76 .map(|s| s.to_string())
77 .filter(|s| !is_blank(s));
78 let inline_prompt = params["system_prompt"]
79 .as_str()
80 .map(|s| s.to_string())
81 .filter(|s| !is_blank(s));
82 let model_override = params["model"].as_str().map(|s| s.to_string());
83 let timeout_secs = params["timeout"].as_u64().unwrap_or(ctx.limits.subagent_timeout);
84
85 let system_prompt = match (&agent_name, &inline_prompt) {
86 (Some(name), _) => resolve_agent_prompt(name).map_err(RuntimeError::Tool)?,
87 (None, Some(p)) => p.clone(),
88 (None, None) => {
89 return Err(RuntimeError::Tool(
90 "Must provide either 'agent' (name) or 'system_prompt' (inline). Got neither.".to_string()
91 ));
92 }
93 };
94
95 let label = agent_name.as_deref().unwrap_or("inline").to_string();
96 let model = model_override.unwrap_or_else(|| crate::models::default_model().to_string());
97 let task_preview: String = task.chars().take(80).collect();
98 let task_full = task.clone();
99 let subagent_id = NEXT_SUBAGENT_ID.fetch_add(1, Ordering::Relaxed);
100 let handle_id = format!("sa_{}", subagent_id);
101
102 tracing::info!("subagent_start: dispatching '{}' (id={}) model={}", label, handle_id, model);
103
104 let state = Arc::new(RwLock::new(SubagentState::new()));
106
107 let (steer_tx, steer_rx) = mpsc::unbounded_channel::<String>();
109 let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>();
110 let (result_tx, result_rx) = oneshot::channel::<SubagentResult>();
111
112 if let Some(ref tx) = ctx.channels.tx_events {
114 let _ = tx.send(crate::StreamEvent::Agent(AgentEvent::SubagentStart {
115 subagent_id,
116 agent_name: label.clone(),
117 task_preview: task_preview.clone(),
118 }));
119 }
120
121 let state_t = Arc::clone(&state);
123 let task_full_a = task_full.clone();
124 let label_inner = label.clone();
125 let model_inner = model.clone();
126 let tx_events_inner = ctx.channels.tx_events.clone();
127 let start_time = std::time::Instant::now();
128
129 let system_prompt_for_handle = system_prompt.clone();
131 let thread_handle = std::thread::spawn(move || {
132 let panic_result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
133 let rt = match tokio::runtime::Builder::new_current_thread()
134 .enable_all()
135 .build()
136 {
137 Ok(rt) => rt,
138 Err(e) => {
139 state_t.write().unwrap().status =
140 SubagentStatus::Failed(format!("tokio runtime: {}", e));
141 return;
142 }
143 };
144
145 let state_a = Arc::clone(&state_t);
147 let label_a = label_inner.clone();
148 let model_a = model_inner.clone();
149 let tx_events_a = tx_events_inner.clone();
150 let task_for_timeout = task_full_a.clone();
151 let task_for_complete = task_full_a;
152
153 let outcome: std::result::Result<SubagentResult, String> = rt.block_on(async move {
154 use futures::StreamExt;
155
156 let mut runtime = match crate::Runtime::new().await {
157 Ok(r) => r,
158 Err(e) => return Err(format!("Failed to create subagent runtime: {}", e)),
159 };
160
161 runtime.set_system_prompt(system_prompt);
162 runtime.set_model(model_a.clone());
163 let tools = if let Some(ext_mgr) = crate::runtime::openai::extension_manager_for_routing() {
164 let mgr = ext_mgr.read().await;
165 if let Some(shared) = mgr.tools_shared() {
166 let extension_tools = shared.read().await;
167 crate::ToolRegistry::without_subagent_with_extensions(&extension_tools)
168 } else {
169 crate::ToolRegistry::without_subagent()
170 }
171 } else {
172 crate::ToolRegistry::without_subagent()
173 };
174 runtime.set_tools(tools);
175
176 let cancel = crate::CancellationToken::new();
177 let cancel_inner = cancel.clone();
178 tokio::spawn(async move {
179 let _ = shutdown_rx.await;
180 cancel_inner.cancel();
181 });
182
183 let mut stream = runtime.run_stream_with_messages(vec![serde_json::json!({"role": "user", "content": task})], cancel, Some(steer_rx), None, false).await;
184
185 let mut tool_count = 0u32;
186 let mut total_input_tokens = 0u64;
187 let mut total_output_tokens = 0u64;
188 let mut total_cache_read = 0u64;
189 let mut total_cache_creation = 0u64;
190 let mut total_cache_5m: Option<u64> = None;
192 let mut total_cache_1h: Option<u64> = None;
193
194 let timeout_fut = tokio::time::sleep(Duration::from_secs(timeout_secs));
195 tokio::pin!(timeout_fut);
196
197 loop {
198 tokio::select! {
199 event = stream.next() => {
200 let Some(event) = event else { break };
201 match event {
202 crate::StreamEvent::Llm(LlmEvent::Thinking(_)) => {
203 if let Some(ref tx) = tx_events_a {
204 let _ = tx.send(crate::StreamEvent::Agent(AgentEvent::SubagentUpdate {
205 subagent_id,
206 agent_name: label_a.clone(),
207 status: "💭 thinking...".to_string(),
208 }));
209 }
210 }
211 crate::StreamEvent::Llm(LlmEvent::Text(text)) => {
212 state_a.write().unwrap().partial_text.push_str(&text);
213 }
214 crate::StreamEvent::Llm(LlmEvent::ToolUseStart { tool_name: name, .. }) => {
215 tool_count += 1;
216 if let Some(ref tx) = tx_events_a {
217 let _ = tx.send(crate::StreamEvent::Agent(AgentEvent::SubagentUpdate {
218 subagent_id,
219 agent_name: label_a.clone(),
220 status: format!("⚙ {} (tool #{})", name, tool_count),
221 }));
222 }
223 }
224 crate::StreamEvent::Llm(LlmEvent::ToolUse { tool_name, input, .. }) => {
225 let input_str = input.to_string();
226 let input_preview: String = input_str.chars().take(200).collect();
227 state_a.write().unwrap().tool_log
228 .push(format!("[tool_use]: {} — {}", tool_name, input_preview));
229 let detail = match tool_name.as_str() {
230 "bash" => {
231 let cmd = input["command"].as_str().unwrap_or("");
232 let preview: String = cmd.chars().take(60).collect();
233 format!("$ {}", preview)
234 }
235 "read" => format!("reading {}", input["path"].as_str().unwrap_or("?").rsplit('/').next().unwrap_or("?")),
236 "write" => format!("writing {}", input["path"].as_str().unwrap_or("?").rsplit('/').next().unwrap_or("?")),
237 "edit" => format!("editing {}", input["path"].as_str().unwrap_or("?").rsplit('/').next().unwrap_or("?")),
238 "grep" => format!("grep /{}/", input["pattern"].as_str().unwrap_or("?").chars().take(30).collect::<String>()),
239 "find" => format!("find {}", input["pattern"].as_str().unwrap_or("?")),
240 "ls" => format!("ls {}", input["path"].as_str().unwrap_or(".").rsplit('/').next().unwrap_or(".")),
241 other => {
242 if other.starts_with("ext__") {
243 other.splitn(3, "__").last().unwrap_or(other).to_string()
244 } else {
245 other.to_string()
246 }
247 }
248 };
249 if let Some(ref tx) = tx_events_a {
250 let _ = tx.send(crate::StreamEvent::Agent(AgentEvent::SubagentUpdate {
251 subagent_id,
252 agent_name: label_a.clone(),
253 status: detail,
254 }));
255 }
256 }
257 crate::StreamEvent::Llm(LlmEvent::ToolResult { result, .. }) => {
258 let preview: String = result.chars().take(300).collect();
259 state_a.write().unwrap().tool_log
260 .push(format!("[tool_result]: {}", preview));
261 }
262 crate::StreamEvent::Session(SessionEvent::Usage {
263 input_tokens, output_tokens,
264 cache_read_input_tokens, cache_creation_input_tokens,
265 cache_creation_5m, cache_creation_1h,
266 model: _,
267 }) => {
268 total_input_tokens += input_tokens;
269 total_output_tokens += output_tokens;
270 total_cache_read += cache_read_input_tokens;
271 total_cache_creation += cache_creation_input_tokens;
272 crate::core::rpc_dispatch::merge_split(&mut total_cache_5m, cache_creation_5m);
273 crate::core::rpc_dispatch::merge_split(&mut total_cache_1h, cache_creation_1h);
274 }
275 crate::StreamEvent::Session(SessionEvent::Error(e)) => return Err(e),
276 crate::StreamEvent::Session(SessionEvent::Done) => break,
277 _ => {}
278 }
279 }
280 _ = &mut timeout_fut => {
281 let (partial, log) = {
282 let mut s = state_a.write().unwrap();
283 s.status = SubagentStatus::TimedOut;
284 s.conversation_state = vec![
285 serde_json::json!({"role": "user", "content": task_for_timeout.clone()}),
286 serde_json::json!({"role": "assistant", "content": &s.partial_text}),
287 ];
288 (s.partial_text.clone(), s.tool_log.clone())
289 };
290 let mut text = format!("[TIMED OUT after {}s — partial results below]\n\n", timeout_secs);
291 if !log.is_empty() {
292 text.push_str(&log.join("\n"));
293 text.push('\n');
294 }
295 if !partial.is_empty() {
296 text.push_str("\n[partial response]:\n");
297 text.push_str(&partial);
298 }
299 return Ok(SubagentResult {
300 text,
301 model: model_a.clone(),
302 input_tokens: total_input_tokens,
303 output_tokens: total_output_tokens,
304 cache_read: total_cache_read,
305 cache_creation: total_cache_creation,
306 cache_creation_5m: total_cache_5m,
307 cache_creation_1h: total_cache_1h,
308 tool_count,
309 });
310 }
311 }
312 }
313
314 Ok(SubagentResult {
315 text: state_a.write().unwrap().partial_text.clone(),
316 model: model_a.clone(),
317 input_tokens: total_input_tokens,
318 output_tokens: total_output_tokens,
319 cache_read: total_cache_read,
320 cache_creation: total_cache_creation,
321 cache_creation_5m: total_cache_5m,
322 cache_creation_1h: total_cache_1h,
323 tool_count,
324 })
325 });
326
327 match outcome {
328 Ok(sa_result) => {
329 {
331 let mut s = state_t.write().unwrap();
332 if matches!(s.status, SubagentStatus::Running) {
333 s.status = SubagentStatus::Completed;
334 s.conversation_state = vec![
335 serde_json::json!({"role": "user", "content": task_for_complete.clone()}),
336 serde_json::json!({"role": "assistant", "content": sa_result.text.clone()}),
337 ];
338 }
339 }
340 let elapsed = start_time.elapsed().as_secs_f64();
341 let preview: String = sa_result.text.chars().take(120).collect();
342 if let Some(ref tx) = tx_events_inner {
343 let _ = tx.send(crate::StreamEvent::Agent(AgentEvent::SubagentDone {
344 subagent_id,
345 agent_name: label_inner.clone(),
346 result_preview: preview,
347 duration_secs: elapsed,
348 }));
349 }
350 let _ = result_tx.send(sa_result);
351 }
352 Err(e) => {
353 state_t.write().unwrap().status = SubagentStatus::Failed(e.clone());
354 let elapsed = start_time.elapsed().as_secs_f64();
355 if let Some(ref tx) = tx_events_inner {
356 let _ = tx.send(crate::StreamEvent::Agent(AgentEvent::SubagentDone {
357 subagent_id,
358 agent_name: label_inner.clone(),
359 result_preview: format!("ERROR: {}", e),
360 duration_secs: elapsed,
361 }));
362 }
363 }
365 }
366 }));
367
368 if let Err(panic_info) = panic_result {
369 let msg = if let Some(s) = panic_info.downcast_ref::<&str>() {
370 s.to_string()
371 } else if let Some(s) = panic_info.downcast_ref::<String>() {
372 s.clone()
373 } else {
374 "unknown panic".to_string()
375 };
376 tracing::error!("Subagent thread panicked: {}", msg);
377 state_t.write().unwrap().status = SubagentStatus::Failed(format!("panic: {}", msg));
378 }
379 });
380
381 let handle = SubagentHandle::new(
383 handle_id.clone(),
384 label.clone(),
385 task_preview,
386 model,
387 system_prompt_for_handle,
388 timeout_secs,
389 state,
390 Some(steer_tx),
391 Some(shutdown_tx),
392 Some(result_rx),
393 );
394
395 if let Some(registry) = &ctx.capabilities.subagent_registry {
396 let mut reg = registry.lock().unwrap();
397 reg.register(handle);
398 if let Some(h) = reg.get_mut(&handle_id) {
399 h.set_thread_handle(thread_handle);
400 }
401 } else {
402 return Err(RuntimeError::Tool(
403 "subagent_start requires a subagent_registry in ToolContext".to_string()
404 ));
405 }
406
407 Ok(json!({
408 "handle_id": handle_id,
409 "agent_name": label,
410 "status": "running"
411 }).to_string())
412 }
413}
414
415#[cfg(test)]
416mod tests {
417 use super::*;
418 use crate::tools::test_helpers::create_tool_context;
419 use crate::tools::{SubagentRegistry, Tool};
420 use serde_json::json;
421 use std::sync::{Arc, Mutex};
422
423 #[tokio::test]
424 async fn test_subagent_start_blank_agent_uses_system_prompt() {
425 let tool = SubagentStartTool;
426 let mut ctx = create_tool_context();
427 ctx.capabilities.subagent_registry = Some(Arc::new(Mutex::new(SubagentRegistry::new())));
428
429 let params = json!({
430 "agent": "",
431 "system_prompt": "You are a concise test subagent. Reply with only: ok",
432 "task": "Say ok",
433 "model": "claude-sonnet-4-6",
434 "timeout": 1
435 });
436
437 let result = tool.execute(params, ctx).await;
438 assert!(result.is_ok(), "blank agent should not be resolved as ~/.synaps-cli/agents/.md: {result:?}");
439 let body: serde_json::Value = serde_json::from_str(&result.unwrap()).unwrap();
440 assert_eq!(body["agent_name"], "inline");
441 assert!(body["handle_id"].as_str().unwrap_or_default().starts_with("sa_"));
442 }
443}