synaps_cli/tools/subagent/
start.rs1use serde_json::{json, Value};
8use std::sync::atomic::Ordering;
9use std::sync::{Arc, RwLock};
10use std::time::Duration;
11use tokio::sync::{mpsc, oneshot};
12
13use crate::{Result, RuntimeError, LlmEvent, SessionEvent, AgentEvent};
14use super::super::{Tool, ToolContext, resolve_agent_prompt, NEXT_SUBAGENT_ID};
15use crate::runtime::subagent::{SubagentHandle, SubagentResult, SubagentStatus, SubagentState};
16
17pub struct SubagentStartTool;
18
19#[async_trait::async_trait]
20impl Tool for SubagentStartTool {
21 fn name(&self) -> &str { "subagent_start" }
22
23 fn description(&self) -> &str {
24 "Dispatch a reactive subagent and return immediately with a handle_id. \
25 The subagent runs in the background — use subagent_status to poll, \
26 subagent_steer to inject guidance mid-run, and subagent_collect to poll for the result (non-blocking — call \
27 repeatedly until done). Use this for parallel execution or when you \
28 want to continue working while the subagent runs. For simple sequential \
29 delegation, use subagent instead. Provide either an agent name (resolves \
30 from ~/.synaps-cli/agents/<name>.md) or a system_prompt string directly."
31 }
32
33 fn parameters(&self) -> Value {
34 json!({
35 "type": "object",
36 "properties": {
37 "agent": {
38 "type": "string",
39 "description": "Agent name — resolves to ~/.synaps-cli/agents/<name>.md. Mutually exclusive with system_prompt."
40 },
41 "system_prompt": {
42 "type": "string",
43 "description": "Inline system prompt for the subagent. Use when you don't have a named agent file."
44 },
45 "task": {
46 "type": "string",
47 "description": "The task/prompt to send to the subagent."
48 },
49 "model": {
50 "type": "string",
51 "description": "Model override (default: claude-sonnet-4-6). Use claude-opus-4-7 for complex tasks."
52 },
53 "timeout": {
54 "type": "integer",
55 "description": "Timeout in seconds (default: 300). Increase for long-running tasks."
56 }
57 },
58 "required": ["task"]
59 })
60 }
61
62 async fn execute(&self, params: Value, ctx: ToolContext) -> Result<String> {
63 let task = params["task"].as_str()
65 .ok_or_else(|| RuntimeError::Tool("Missing 'task' parameter".to_string()))?
66 .to_string();
67
68 let is_blank = |s: &String| s.chars().all(|c| c.is_whitespace() || c.is_control());
74 let agent_name = params["agent"]
75 .as_str()
76 .map(|s| s.to_string())
77 .filter(|s| !is_blank(s));
78 let inline_prompt = params["system_prompt"]
79 .as_str()
80 .map(|s| s.to_string())
81 .filter(|s| !is_blank(s));
82 let model_override = params["model"].as_str().map(|s| s.to_string());
83 let timeout_secs = params["timeout"].as_u64().unwrap_or(ctx.limits.subagent_timeout);
84
85 let system_prompt = match (&agent_name, &inline_prompt) {
86 (Some(name), _) => resolve_agent_prompt(name).map_err(RuntimeError::Tool)?,
87 (None, Some(p)) => p.clone(),
88 (None, None) => {
89 return Err(RuntimeError::Tool(
90 "Must provide either 'agent' (name) or 'system_prompt' (inline). Got neither.".to_string()
91 ));
92 }
93 };
94
95 let label = agent_name.as_deref().unwrap_or("inline").to_string();
96 let model = model_override.unwrap_or_else(|| crate::models::default_model().to_string());
97 let task_preview: String = task.chars().take(80).collect();
98 let task_full = task.clone();
99 let subagent_id = NEXT_SUBAGENT_ID.fetch_add(1, Ordering::Relaxed);
100 let handle_id = format!("sa_{}", subagent_id);
101
102 tracing::info!("subagent_start: dispatching '{}' (id={}) model={}", label, handle_id, model);
103
104 let state = Arc::new(RwLock::new(SubagentState::new()));
106
107 let (steer_tx, steer_rx) = mpsc::unbounded_channel::<String>();
109 let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>();
110 let (result_tx, result_rx) = oneshot::channel::<SubagentResult>();
111
112 if let Some(ref tx) = ctx.channels.tx_events {
114 let _ = tx.send(crate::StreamEvent::Agent(AgentEvent::SubagentStart {
115 subagent_id,
116 agent_name: label.clone(),
117 task_preview: task_preview.clone(),
118 }));
119 }
120
121 let state_t = Arc::clone(&state);
123 let task_full_a = task_full.clone();
124 let label_inner = label.clone();
125 let model_inner = model.clone();
126 let tx_events_inner = ctx.channels.tx_events.clone();
127 let start_time = std::time::Instant::now();
128
129 let system_prompt_for_handle = system_prompt.clone();
131 let thread_handle = std::thread::spawn(move || {
132 let panic_result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
133 let rt = match tokio::runtime::Builder::new_current_thread()
134 .enable_all()
135 .build()
136 {
137 Ok(rt) => rt,
138 Err(e) => {
139 state_t.write().unwrap().status =
140 SubagentStatus::Failed(format!("tokio runtime: {}", e));
141 return;
142 }
143 };
144
145 let state_a = Arc::clone(&state_t);
147 let label_a = label_inner.clone();
148 let model_a = model_inner.clone();
149 let tx_events_a = tx_events_inner.clone();
150 let task_for_timeout = task_full_a.clone();
151 let task_for_complete = task_full_a;
152
153 let outcome: std::result::Result<SubagentResult, String> = rt.block_on(async move {
154 use futures::StreamExt;
155
156 let mut runtime = match crate::Runtime::new().await {
157 Ok(r) => r,
158 Err(e) => return Err(format!("Failed to create subagent runtime: {}", e)),
159 };
160
161 runtime.set_system_prompt(system_prompt);
162 runtime.set_model(model_a.clone());
163 let tools = if let Some(ext_mgr) = crate::runtime::openai::extension_manager_for_routing() {
164 let mgr = ext_mgr.read().await;
165 if let Some(shared) = mgr.tools_shared() {
166 let extension_tools = shared.read().await;
167 crate::ToolRegistry::without_subagent_with_extensions(&*extension_tools)
168 } else {
169 crate::ToolRegistry::without_subagent()
170 }
171 } else {
172 crate::ToolRegistry::without_subagent()
173 };
174 runtime.set_tools(tools);
175
176 let cancel = crate::CancellationToken::new();
177 let cancel_inner = cancel.clone();
178 tokio::spawn(async move {
179 let _ = shutdown_rx.await;
180 cancel_inner.cancel();
181 });
182
183 let mut stream = runtime.run_stream_with_messages(vec![serde_json::json!({"role": "user", "content": task})], cancel, Some(steer_rx), None).await;
184
185 let mut tool_count = 0u32;
186 let mut total_input_tokens = 0u64;
187 let mut total_output_tokens = 0u64;
188 let mut total_cache_read = 0u64;
189 let mut total_cache_creation = 0u64;
190
191 let timeout_fut = tokio::time::sleep(Duration::from_secs(timeout_secs));
192 tokio::pin!(timeout_fut);
193
194 loop {
195 tokio::select! {
196 event = stream.next() => {
197 let Some(event) = event else { break };
198 match event {
199 crate::StreamEvent::Llm(LlmEvent::Thinking(_)) => {
200 if let Some(ref tx) = tx_events_a {
201 let _ = tx.send(crate::StreamEvent::Agent(AgentEvent::SubagentUpdate {
202 subagent_id,
203 agent_name: label_a.clone(),
204 status: "💭 thinking...".to_string(),
205 }));
206 }
207 }
208 crate::StreamEvent::Llm(LlmEvent::Text(text)) => {
209 state_a.write().unwrap().partial_text.push_str(&text);
210 }
211 crate::StreamEvent::Llm(LlmEvent::ToolUseStart { tool_name: name, .. }) => {
212 tool_count += 1;
213 if let Some(ref tx) = tx_events_a {
214 let _ = tx.send(crate::StreamEvent::Agent(AgentEvent::SubagentUpdate {
215 subagent_id,
216 agent_name: label_a.clone(),
217 status: format!("⚙ {} (tool #{})", name, tool_count),
218 }));
219 }
220 }
221 crate::StreamEvent::Llm(LlmEvent::ToolUse { tool_name, input, .. }) => {
222 let input_str = input.to_string();
223 let input_preview: String = input_str.chars().take(200).collect();
224 state_a.write().unwrap().tool_log
225 .push(format!("[tool_use]: {} — {}", tool_name, input_preview));
226 let detail = match tool_name.as_str() {
227 "bash" => {
228 let cmd = input["command"].as_str().unwrap_or("");
229 let preview: String = cmd.chars().take(60).collect();
230 format!("$ {}", preview)
231 }
232 "read" => format!("reading {}", input["path"].as_str().unwrap_or("?").rsplit('/').next().unwrap_or("?")),
233 "write" => format!("writing {}", input["path"].as_str().unwrap_or("?").rsplit('/').next().unwrap_or("?")),
234 "edit" => format!("editing {}", input["path"].as_str().unwrap_or("?").rsplit('/').next().unwrap_or("?")),
235 "grep" => format!("grep /{}/", input["pattern"].as_str().unwrap_or("?").chars().take(30).collect::<String>()),
236 "find" => format!("find {}", input["pattern"].as_str().unwrap_or("?")),
237 "ls" => format!("ls {}", input["path"].as_str().unwrap_or(".").rsplit('/').next().unwrap_or(".")),
238 other => {
239 if other.starts_with("ext__") {
240 other.splitn(3, "__").last().unwrap_or(other).to_string()
241 } else {
242 other.to_string()
243 }
244 }
245 };
246 if let Some(ref tx) = tx_events_a {
247 let _ = tx.send(crate::StreamEvent::Agent(AgentEvent::SubagentUpdate {
248 subagent_id,
249 agent_name: label_a.clone(),
250 status: detail,
251 }));
252 }
253 }
254 crate::StreamEvent::Llm(LlmEvent::ToolResult { result, .. }) => {
255 let preview: String = result.chars().take(300).collect();
256 state_a.write().unwrap().tool_log
257 .push(format!("[tool_result]: {}", preview));
258 }
259 crate::StreamEvent::Session(SessionEvent::Usage {
260 input_tokens, output_tokens,
261 cache_read_input_tokens, cache_creation_input_tokens,
262 model: _,
263 }) => {
264 total_input_tokens += input_tokens;
265 total_output_tokens += output_tokens;
266 total_cache_read += cache_read_input_tokens;
267 total_cache_creation += cache_creation_input_tokens;
268 }
269 crate::StreamEvent::Session(SessionEvent::Error(e)) => return Err(e),
270 crate::StreamEvent::Session(SessionEvent::Done) => break,
271 _ => {}
272 }
273 }
274 _ = &mut timeout_fut => {
275 let (partial, log) = {
276 let mut s = state_a.write().unwrap();
277 s.status = SubagentStatus::TimedOut;
278 s.conversation_state = vec![
279 serde_json::json!({"role": "user", "content": task_for_timeout.clone()}),
280 serde_json::json!({"role": "assistant", "content": &s.partial_text}),
281 ];
282 (s.partial_text.clone(), s.tool_log.clone())
283 };
284 let mut text = format!("[TIMED OUT after {}s — partial results below]\n\n", timeout_secs);
285 if !log.is_empty() {
286 text.push_str(&log.join("\n"));
287 text.push('\n');
288 }
289 if !partial.is_empty() {
290 text.push_str("\n[partial response]:\n");
291 text.push_str(&partial);
292 }
293 return Ok(SubagentResult {
294 text,
295 model: model_a.clone(),
296 input_tokens: total_input_tokens,
297 output_tokens: total_output_tokens,
298 cache_read: total_cache_read,
299 cache_creation: total_cache_creation,
300 tool_count,
301 });
302 }
303 }
304 }
305
306 Ok(SubagentResult {
307 text: state_a.write().unwrap().partial_text.clone(),
308 model: model_a.clone(),
309 input_tokens: total_input_tokens,
310 output_tokens: total_output_tokens,
311 cache_read: total_cache_read,
312 cache_creation: total_cache_creation,
313 tool_count,
314 })
315 });
316
317 match outcome {
318 Ok(sa_result) => {
319 {
321 let mut s = state_t.write().unwrap();
322 if matches!(s.status, SubagentStatus::Running) {
323 s.status = SubagentStatus::Completed;
324 s.conversation_state = vec![
325 serde_json::json!({"role": "user", "content": task_for_complete.clone()}),
326 serde_json::json!({"role": "assistant", "content": sa_result.text.clone()}),
327 ];
328 }
329 }
330 let elapsed = start_time.elapsed().as_secs_f64();
331 let preview: String = sa_result.text.chars().take(120).collect();
332 if let Some(ref tx) = tx_events_inner {
333 let _ = tx.send(crate::StreamEvent::Agent(AgentEvent::SubagentDone {
334 subagent_id,
335 agent_name: label_inner.clone(),
336 result_preview: preview,
337 duration_secs: elapsed,
338 }));
339 }
340 let _ = result_tx.send(sa_result);
341 }
342 Err(e) => {
343 state_t.write().unwrap().status = SubagentStatus::Failed(e.clone());
344 let elapsed = start_time.elapsed().as_secs_f64();
345 if let Some(ref tx) = tx_events_inner {
346 let _ = tx.send(crate::StreamEvent::Agent(AgentEvent::SubagentDone {
347 subagent_id,
348 agent_name: label_inner.clone(),
349 result_preview: format!("ERROR: {}", e),
350 duration_secs: elapsed,
351 }));
352 }
353 }
355 }
356 }));
357
358 if let Err(panic_info) = panic_result {
359 let msg = if let Some(s) = panic_info.downcast_ref::<&str>() {
360 s.to_string()
361 } else if let Some(s) = panic_info.downcast_ref::<String>() {
362 s.clone()
363 } else {
364 "unknown panic".to_string()
365 };
366 tracing::error!("Subagent thread panicked: {}", msg);
367 state_t.write().unwrap().status = SubagentStatus::Failed(format!("panic: {}", msg));
368 }
369 });
370
371 let handle = SubagentHandle::new(
373 handle_id.clone(),
374 label.clone(),
375 task_preview,
376 model,
377 system_prompt_for_handle,
378 timeout_secs,
379 state,
380 Some(steer_tx),
381 Some(shutdown_tx),
382 Some(result_rx),
383 );
384
385 if let Some(registry) = &ctx.capabilities.subagent_registry {
386 let mut reg = registry.lock().unwrap();
387 reg.register(handle);
388 if let Some(h) = reg.get_mut(&handle_id) {
389 h.set_thread_handle(thread_handle);
390 }
391 } else {
392 return Err(RuntimeError::Tool(
393 "subagent_start requires a subagent_registry in ToolContext".to_string()
394 ));
395 }
396
397 Ok(json!({
398 "handle_id": handle_id,
399 "agent_name": label,
400 "status": "running"
401 }).to_string())
402 }
403}
404
405#[cfg(test)]
406mod tests {
407 use super::*;
408 use crate::tools::test_helpers::create_tool_context;
409 use crate::tools::{SubagentRegistry, Tool};
410 use serde_json::json;
411 use std::sync::{Arc, Mutex};
412
413 #[tokio::test]
414 async fn test_subagent_start_blank_agent_uses_system_prompt() {
415 let tool = SubagentStartTool;
416 let mut ctx = create_tool_context();
417 ctx.capabilities.subagent_registry = Some(Arc::new(Mutex::new(SubagentRegistry::new())));
418
419 let params = json!({
420 "agent": "",
421 "system_prompt": "You are a concise test subagent. Reply with only: ok",
422 "task": "Say ok",
423 "model": "claude-sonnet-4-6",
424 "timeout": 1
425 });
426
427 let result = tool.execute(params, ctx).await;
428 assert!(result.is_ok(), "blank agent should not be resolved as ~/.synaps-cli/agents/.md: {result:?}");
429 let body: serde_json::Value = serde_json::from_str(&result.unwrap()).unwrap();
430 assert_eq!(body["agent_name"], "inline");
431 assert!(body["handle_id"].as_str().unwrap_or_default().starts_with("sa_"));
432 }
433}