Skip to main content

synaps_cli/tools/subagent/
start.rs

1//! SubagentStartTool — dispatch a reactive subagent and return a handle_id immediately.
2//!
3//! Unlike the one-shot `subagent` tool, this tool returns *before* the subagent
4//! finishes. The caller gets a `handle_id` they can poll via `subagent_status`,
//! steer via `subagent_steer`, or retrieve the result from via `subagent_collect`
//! (described by this tool as non-blocking: call it repeatedly until done).
6
7use serde_json::{json, Value};
8use std::sync::atomic::Ordering;
9use std::sync::{Arc, RwLock};
10use std::time::Duration;
11use tokio::sync::{mpsc, oneshot};
12
13use crate::{Result, RuntimeError, LlmEvent, SessionEvent, AgentEvent};
14use super::super::{Tool, ToolContext, resolve_agent_prompt, NEXT_SUBAGENT_ID};
15use crate::runtime::subagent::{SubagentHandle, SubagentResult, SubagentStatus, SubagentState};
16
/// Tool that dispatches a subagent on a background thread and returns a
/// `handle_id` to the caller without waiting for the subagent to finish.
///
/// The type carries no state; all behavior lives in its [`Tool`] implementation.
#[derive(Debug, Clone, Copy, Default)]
pub struct SubagentStartTool;
18
19#[async_trait::async_trait]
20impl Tool for SubagentStartTool {
21    fn name(&self) -> &str { "subagent_start" }
22
23    fn description(&self) -> &str {
24        "Dispatch a reactive subagent and return immediately with a handle_id. \
25         The subagent runs in the background — use subagent_status to poll, \
26         subagent_steer to inject guidance mid-run, and subagent_collect to poll for the result (non-blocking — call \
27         repeatedly until done). Use this for parallel execution or when you \
28         want to continue working while the subagent runs. For simple sequential \
29         delegation, use subagent instead. Provide either an agent name (resolves \
30         from ~/.synaps-cli/agents/<name>.md) or a system_prompt string directly."
31    }
32
33    fn parameters(&self) -> Value {
34        json!({
35            "type": "object",
36            "properties": {
37                "agent": {
38                    "type": "string",
39                    "description": "Agent name — resolves to ~/.synaps-cli/agents/<name>.md. Mutually exclusive with system_prompt."
40                },
41                "system_prompt": {
42                    "type": "string",
43                    "description": "Inline system prompt for the subagent. Use when you don't have a named agent file."
44                },
45                "task": {
46                    "type": "string",
47                    "description": "The task/prompt to send to the subagent."
48                },
49                "model": {
50                    "type": "string",
51                    "description": "Model override (default: claude-sonnet-4-6). Use claude-opus-4-7 for complex tasks."
52                },
53                "timeout": {
54                    "type": "integer",
55                    "description": "Timeout in seconds (default: 300). Increase for long-running tasks."
56                }
57            },
58            "required": ["task"]
59        })
60    }
61
62    async fn execute(&self, params: Value, ctx: ToolContext) -> Result<String> {
63        // ── Parse params ───────────────────────────────────────────────────────
64        let task = params["task"].as_str()
65            .ok_or_else(|| RuntimeError::Tool("Missing 'task' parameter".to_string()))?
66            .to_string();
67
68        // Treat blank / whitespace / control-char strings as absent.
69        // Some model providers serialize "unset" as "" rather than omitting the field,
70        // and we must not try to resolve "" (or " ", or "\u{0}") as an agent name —
71        // doing so produces a hard "Agent '' not found" error that the model then
72        // retries forever with sentinel values instead of falling back to system_prompt.
73        let is_blank = |s: &String| s.chars().all(|c| c.is_whitespace() || c.is_control());
74        let agent_name = params["agent"]
75            .as_str()
76            .map(|s| s.to_string())
77            .filter(|s| !is_blank(s));
78        let inline_prompt = params["system_prompt"]
79            .as_str()
80            .map(|s| s.to_string())
81            .filter(|s| !is_blank(s));
82        let model_override = params["model"].as_str().map(|s| s.to_string());
83        let timeout_secs   = params["timeout"].as_u64().unwrap_or(ctx.limits.subagent_timeout);
84
85        let system_prompt = match (&agent_name, &inline_prompt) {
86            (Some(name), _) => resolve_agent_prompt(name).map_err(RuntimeError::Tool)?,
87            (None, Some(p)) => p.clone(),
88            (None, None) => {
89                return Err(RuntimeError::Tool(
90                    "Must provide either 'agent' (name) or 'system_prompt' (inline). Got neither.".to_string()
91                ));
92            }
93        };
94
95        let label = agent_name.as_deref().unwrap_or("inline").to_string();
96        let model = model_override.unwrap_or_else(|| crate::models::default_model().to_string());
97        let task_preview: String = task.chars().take(80).collect();
98        let task_full = task.clone();
99        let subagent_id = NEXT_SUBAGENT_ID.fetch_add(1, Ordering::Relaxed);
100        let handle_id = format!("sa_{}", subagent_id);
101
102        tracing::info!("subagent_start: dispatching '{}' (id={}) model={}", label, handle_id, model);
103
104        // ── Shared state ───────────────────────────────────────────────────────
105        let state = Arc::new(RwLock::new(SubagentState::new()));
106
107        // ── Channels ───────────────────────────────────────────────────────────
108        let (steer_tx, steer_rx) = mpsc::unbounded_channel::<String>();
109        let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>();
110        let (result_tx, result_rx) = oneshot::channel::<SubagentResult>();
111
112        // ── Forward SubagentStart event to TUI ─────────────────────────────────
113        if let Some(ref tx) = ctx.channels.tx_events {
114            let _ = tx.send(crate::StreamEvent::Agent(AgentEvent::SubagentStart {
115                subagent_id,
116                agent_name: label.clone(),
117                task_preview: task_preview.clone(),
118            }));
119        }
120
121        // ── Clone state for the spawned thread ─────────────────────────────────
122        let state_t          = Arc::clone(&state);
123        let task_full_a      = task_full.clone();
124        let label_inner      = label.clone();
125        let model_inner      = model.clone();
126        let tx_events_inner  = ctx.channels.tx_events.clone();
127        let start_time       = std::time::Instant::now();
128
129        // ── Spawn subagent thread (mirrors subagent.rs) ────────────────────────
130        let system_prompt_for_handle = system_prompt.clone();
131        let thread_handle = std::thread::spawn(move || {
132            let panic_result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
133                let rt = match tokio::runtime::Builder::new_current_thread()
134                    .enable_all()
135                    .build()
136                {
137                    Ok(rt) => rt,
138                    Err(e) => {
139                        state_t.write().unwrap().status =
140                            SubagentStatus::Failed(format!("tokio runtime: {}", e));
141                        return;
142                    }
143                };
144
145                // Clones for the async block — the outer closure still needs the originals.
146                let state_a        = Arc::clone(&state_t);
147                let label_a        = label_inner.clone();
148                let model_a        = model_inner.clone();
149                let tx_events_a    = tx_events_inner.clone();
150                let task_for_timeout = task_full_a.clone();
151                let task_for_complete = task_full_a;
152
153                let outcome: std::result::Result<SubagentResult, String> = rt.block_on(async move {
154                    use futures::StreamExt;
155
156                    let mut runtime = match crate::Runtime::new().await {
157                        Ok(r) => r,
158                        Err(e) => return Err(format!("Failed to create subagent runtime: {}", e)),
159                    };
160
161                    runtime.set_system_prompt(system_prompt);
162                    runtime.set_model(model_a.clone());
163                    let tools = if let Some(ext_mgr) = crate::runtime::openai::extension_manager_for_routing() {
164                        let mgr = ext_mgr.read().await;
165                        if let Some(shared) = mgr.tools_shared() {
166                            let extension_tools = shared.read().await;
167                            crate::ToolRegistry::without_subagent_with_extensions(&*extension_tools)
168                        } else {
169                            crate::ToolRegistry::without_subagent()
170                        }
171                    } else {
172                        crate::ToolRegistry::without_subagent()
173                    };
174                    runtime.set_tools(tools);
175
176                    let cancel = crate::CancellationToken::new();
177                    let cancel_inner = cancel.clone();
178                    tokio::spawn(async move {
179                        let _ = shutdown_rx.await;
180                        cancel_inner.cancel();
181                    });
182
183                    let mut stream = runtime.run_stream_with_messages(vec![serde_json::json!({"role": "user", "content": task})], cancel, Some(steer_rx), None).await;
184
185                    let mut tool_count = 0u32;
186                    let mut total_input_tokens = 0u64;
187                    let mut total_output_tokens = 0u64;
188                    let mut total_cache_read = 0u64;
189                    let mut total_cache_creation = 0u64;
190
191                    let timeout_fut = tokio::time::sleep(Duration::from_secs(timeout_secs));
192                    tokio::pin!(timeout_fut);
193
194                    loop {
195                        tokio::select! {
196                            event = stream.next() => {
197                                let Some(event) = event else { break };
198                                match event {
199                                    crate::StreamEvent::Llm(LlmEvent::Thinking(_)) => {
200                                        if let Some(ref tx) = tx_events_a {
201                                            let _ = tx.send(crate::StreamEvent::Agent(AgentEvent::SubagentUpdate {
202                                                subagent_id,
203                                                agent_name: label_a.clone(),
204                                                status: "💭 thinking...".to_string(),
205                                            }));
206                                        }
207                                    }
208                                    crate::StreamEvent::Llm(LlmEvent::Text(text)) => {
209                                        state_a.write().unwrap().partial_text.push_str(&text);
210                                    }
211                                    crate::StreamEvent::Llm(LlmEvent::ToolUseStart { tool_name: name, .. }) => {
212                                        tool_count += 1;
213                                        if let Some(ref tx) = tx_events_a {
214                                            let _ = tx.send(crate::StreamEvent::Agent(AgentEvent::SubagentUpdate {
215                                                subagent_id,
216                                                agent_name: label_a.clone(),
217                                                status: format!("⚙ {} (tool #{})", name, tool_count),
218                                            }));
219                                        }
220                                    }
221                                    crate::StreamEvent::Llm(LlmEvent::ToolUse { tool_name, input, .. }) => {
222                                        let input_str = input.to_string();
223                                        let input_preview: String = input_str.chars().take(200).collect();
224                                        state_a.write().unwrap().tool_log
225                                            .push(format!("[tool_use]: {} — {}", tool_name, input_preview));
226                                        let detail = match tool_name.as_str() {
227                                            "bash" => {
228                                                let cmd = input["command"].as_str().unwrap_or("");
229                                                let preview: String = cmd.chars().take(60).collect();
230                                                format!("$ {}", preview)
231                                            }
232                                            "read"  => format!("reading {}", input["path"].as_str().unwrap_or("?").rsplit('/').next().unwrap_or("?")),
233                                            "write" => format!("writing {}", input["path"].as_str().unwrap_or("?").rsplit('/').next().unwrap_or("?")),
234                                            "edit"  => format!("editing {}", input["path"].as_str().unwrap_or("?").rsplit('/').next().unwrap_or("?")),
235                                            "grep"  => format!("grep /{}/", input["pattern"].as_str().unwrap_or("?").chars().take(30).collect::<String>()),
236                                            "find"  => format!("find {}", input["pattern"].as_str().unwrap_or("?")),
237                                            "ls"    => format!("ls {}", input["path"].as_str().unwrap_or(".").rsplit('/').next().unwrap_or(".")),
238                                            other   => {
239                                                if other.starts_with("ext__") {
240                                                    other.splitn(3, "__").last().unwrap_or(other).to_string()
241                                                } else {
242                                                    other.to_string()
243                                                }
244                                            }
245                                        };
246                                        if let Some(ref tx) = tx_events_a {
247                                            let _ = tx.send(crate::StreamEvent::Agent(AgentEvent::SubagentUpdate {
248                                                subagent_id,
249                                                agent_name: label_a.clone(),
250                                                status: detail,
251                                            }));
252                                        }
253                                    }
254                                    crate::StreamEvent::Llm(LlmEvent::ToolResult { result, .. }) => {
255                                        let preview: String = result.chars().take(300).collect();
256                                        state_a.write().unwrap().tool_log
257                                            .push(format!("[tool_result]: {}", preview));
258                                    }
259                                    crate::StreamEvent::Session(SessionEvent::Usage {
260                                        input_tokens, output_tokens,
261                                        cache_read_input_tokens, cache_creation_input_tokens,
262                                        model: _,
263                                    }) => {
264                                        total_input_tokens    += input_tokens;
265                                        total_output_tokens   += output_tokens;
266                                        total_cache_read      += cache_read_input_tokens;
267                                        total_cache_creation  += cache_creation_input_tokens;
268                                    }
269                                    crate::StreamEvent::Session(SessionEvent::Error(e)) => return Err(e),
270                                    crate::StreamEvent::Session(SessionEvent::Done) => break,
271                                    _ => {}
272                                }
273                            }
274                            _ = &mut timeout_fut => {
275                                let (partial, log) = {
276                                    let mut s = state_a.write().unwrap();
277                                    s.status = SubagentStatus::TimedOut;
278                                    s.conversation_state = vec![
279                                        serde_json::json!({"role": "user", "content": task_for_timeout.clone()}),
280                                        serde_json::json!({"role": "assistant", "content": &s.partial_text}),
281                                    ];
282                                    (s.partial_text.clone(), s.tool_log.clone())
283                                };
284                                let mut text = format!("[TIMED OUT after {}s — partial results below]\n\n", timeout_secs);
285                                if !log.is_empty() {
286                                    text.push_str(&log.join("\n"));
287                                    text.push('\n');
288                                }
289                                if !partial.is_empty() {
290                                    text.push_str("\n[partial response]:\n");
291                                    text.push_str(&partial);
292                                }
293                                return Ok(SubagentResult {
294                                    text,
295                                    model: model_a.clone(),
296                                    input_tokens: total_input_tokens,
297                                    output_tokens: total_output_tokens,
298                                    cache_read: total_cache_read,
299                                    cache_creation: total_cache_creation,
300                                    tool_count,
301                                });
302                            }
303                        }
304                    }
305
306                    Ok(SubagentResult {
307                        text: state_a.write().unwrap().partial_text.clone(),
308                        model: model_a.clone(),
309                        input_tokens: total_input_tokens,
310                        output_tokens: total_output_tokens,
311                        cache_read: total_cache_read,
312                        cache_creation: total_cache_creation,
313                        tool_count,
314                    })
315                });
316
317                match outcome {
318                    Ok(sa_result) => {
319                        // Only overwrite Running → Completed (don't stomp TimedOut).
320                        {
321                            let mut s = state_t.write().unwrap();
322                            if matches!(s.status, SubagentStatus::Running) {
323                                s.status = SubagentStatus::Completed;
324                                s.conversation_state = vec![
325                                    serde_json::json!({"role": "user", "content": task_for_complete.clone()}),
326                                    serde_json::json!({"role": "assistant", "content": sa_result.text.clone()}),
327                                ];
328                            }
329                        }
330                        let elapsed = start_time.elapsed().as_secs_f64();
331                        let preview: String = sa_result.text.chars().take(120).collect();
332                        if let Some(ref tx) = tx_events_inner {
333                            let _ = tx.send(crate::StreamEvent::Agent(AgentEvent::SubagentDone {
334                                subagent_id,
335                                agent_name: label_inner.clone(),
336                                result_preview: preview,
337                                duration_secs: elapsed,
338                            }));
339                        }
340                        let _ = result_tx.send(sa_result);
341                    }
342                    Err(e) => {
343                        state_t.write().unwrap().status = SubagentStatus::Failed(e.clone());
344                        let elapsed = start_time.elapsed().as_secs_f64();
345                        if let Some(ref tx) = tx_events_inner {
346                            let _ = tx.send(crate::StreamEvent::Agent(AgentEvent::SubagentDone {
347                                subagent_id,
348                                agent_name: label_inner.clone(),
349                                result_preview: format!("ERROR: {}", e),
350                                duration_secs: elapsed,
351                            }));
352                        }
353                        // drop result_tx — collect() will surface the closed channel
354                    }
355                }
356            }));
357
358            if let Err(panic_info) = panic_result {
359                let msg = if let Some(s) = panic_info.downcast_ref::<&str>() {
360                    s.to_string()
361                } else if let Some(s) = panic_info.downcast_ref::<String>() {
362                    s.clone()
363                } else {
364                    "unknown panic".to_string()
365                };
366                tracing::error!("Subagent thread panicked: {}", msg);
367                state_t.write().unwrap().status = SubagentStatus::Failed(format!("panic: {}", msg));
368            }
369        });
370
371        // ── Build handle + register ────────────────────────────────────────────
372        let handle = SubagentHandle::new(
373            handle_id.clone(),
374            label.clone(),
375            task_preview,
376            model,
377            system_prompt_for_handle,
378            timeout_secs,
379            state,
380            Some(steer_tx),
381            Some(shutdown_tx),
382            Some(result_rx),
383        );
384
385        if let Some(registry) = &ctx.capabilities.subagent_registry {
386            let mut reg = registry.lock().unwrap();
387            reg.register(handle);
388            if let Some(h) = reg.get_mut(&handle_id) {
389                h.set_thread_handle(thread_handle);
390            }
391        } else {
392            return Err(RuntimeError::Tool(
393                "subagent_start requires a subagent_registry in ToolContext".to_string()
394            ));
395        }
396
397        Ok(json!({
398            "handle_id":  handle_id,
399            "agent_name": label,
400            "status":     "running"
401        }).to_string())
402    }
403}
404
#[cfg(test)]
mod tests {
    use super::*;
    use crate::tools::test_helpers::create_tool_context;
    use crate::tools::{SubagentRegistry, Tool};
    use serde_json::json;
    use std::sync::{Arc, Mutex};

    /// An empty `agent` string must be ignored in favor of the inline
    /// `system_prompt`: the call succeeds, reports the "inline" label and
    /// returns a handle id starting with `sa_`.
    #[tokio::test]
    async fn test_subagent_start_blank_agent_uses_system_prompt() {
        // A registry is attached to the context before the tool is invoked.
        let mut ctx = create_tool_context();
        let registry = Arc::new(Mutex::new(SubagentRegistry::new()));
        ctx.capabilities.subagent_registry = Some(registry);

        let result = SubagentStartTool
            .execute(
                json!({
                    "agent": "",
                    "system_prompt": "You are a concise test subagent. Reply with only: ok",
                    "task": "Say ok",
                    "model": "claude-sonnet-4-6",
                    "timeout": 1
                }),
                ctx,
            )
            .await;
        assert!(result.is_ok(), "blank agent should not be resolved as ~/.synaps-cli/agents/.md: {result:?}");

        let raw = result.unwrap();
        let body: serde_json::Value = serde_json::from_str(&raw).unwrap();
        assert_eq!(body["agent_name"], "inline");

        let handle_id = body["handle_id"].as_str().unwrap_or_default();
        assert!(handle_id.starts_with("sa_"));
    }
}