Skip to main content

agent_engine/tools/subagent/
start.rs

1//! SubagentStartTool — dispatch a reactive subagent and return a handle_id immediately.
2//!
3//! Unlike the one-shot `subagent` tool, this tool returns *before* the subagent
4//! finishes. The caller gets a `handle_id` they can poll via `subagent_status`,
//! steer via `subagent_steer`, or collect via `subagent_collect` (non-blocking — call it repeatedly until done).
6
7use serde_json::{json, Value};
8use std::sync::atomic::Ordering;
9use std::sync::{Arc, RwLock};
10use std::time::Duration;
11use tokio::sync::{mpsc, oneshot};
12
13use crate::{Result, RuntimeError, LlmEvent, SessionEvent, AgentEvent};
14use super::super::{Tool, ToolContext, resolve_agent_prompt, NEXT_SUBAGENT_ID};
15use crate::runtime::subagent::{SubagentHandle, SubagentResult, SubagentStatus, SubagentState};
16
/// Stateless tool type registered under the name `subagent_start`.
///
/// Its `Tool::execute` implementation spawns a subagent on a dedicated OS
/// thread and returns a JSON payload containing a `handle_id` without waiting
/// for the subagent to finish.
pub struct SubagentStartTool;
18
19#[async_trait::async_trait]
20impl Tool for SubagentStartTool {
21    fn name(&self) -> &str { "subagent_start" }
22
23    fn description(&self) -> &str {
24        "Dispatch a reactive subagent and return immediately with a handle_id. \
25         The subagent runs in the background — use subagent_status to poll, \
26         subagent_steer to inject guidance mid-run, and subagent_collect to poll for the result (non-blocking — call \
27         repeatedly until done). Use this for parallel execution or when you \
28         want to continue working while the subagent runs. For simple sequential \
29         delegation, use subagent instead. Provide either an agent name (resolves \
30         from ~/.synaps-cli/agents/<name>.md) or a system_prompt string directly."
31    }
32
33    fn parameters(&self) -> Value {
34        json!({
35            "type": "object",
36            "properties": {
37                "agent": {
38                    "type": "string",
39                    "description": "Agent name — resolves to ~/.synaps-cli/agents/<name>.md. Mutually exclusive with system_prompt."
40                },
41                "system_prompt": {
42                    "type": "string",
43                    "description": "Inline system prompt for the subagent. Use when you don't have a named agent file."
44                },
45                "task": {
46                    "type": "string",
47                    "description": "The task/prompt to send to the subagent."
48                },
49                "model": {
50                    "type": "string",
51                    "description": "Model override (default: claude-sonnet-4-6). Use claude-opus-4-7 for complex tasks."
52                },
53                "timeout": {
54                    "type": "integer",
55                    "description": "Timeout in seconds (default: 300). Increase for long-running tasks."
56                }
57            },
58            "required": ["task"]
59        })
60    }
61
62    async fn execute(&self, params: Value, ctx: ToolContext) -> Result<String> {
63        // ── Parse params ───────────────────────────────────────────────────────
64        let task = params["task"].as_str()
65            .ok_or_else(|| RuntimeError::Tool("Missing 'task' parameter".to_string()))?
66            .to_string();
67
68        // Treat blank / whitespace / control-char strings as absent.
69        // Some model providers serialize "unset" as "" rather than omitting the field,
70        // and we must not try to resolve "" (or " ", or "\u{0}") as an agent name —
71        // doing so produces a hard "Agent '' not found" error that the model then
72        // retries forever with sentinel values instead of falling back to system_prompt.
73        let is_blank = |s: &String| s.chars().all(|c| c.is_whitespace() || c.is_control());
74        let agent_name = params["agent"]
75            .as_str()
76            .map(|s| s.to_string())
77            .filter(|s| !is_blank(s));
78        let inline_prompt = params["system_prompt"]
79            .as_str()
80            .map(|s| s.to_string())
81            .filter(|s| !is_blank(s));
82        let model_override = params["model"].as_str().map(|s| s.to_string());
83        let timeout_secs   = params["timeout"].as_u64().unwrap_or(ctx.limits.subagent_timeout);
84
85        let system_prompt = match (&agent_name, &inline_prompt) {
86            (Some(name), _) => resolve_agent_prompt(name).map_err(RuntimeError::Tool)?,
87            (None, Some(p)) => p.clone(),
88            (None, None) => {
89                return Err(RuntimeError::Tool(
90                    "Must provide either 'agent' (name) or 'system_prompt' (inline). Got neither.".to_string()
91                ));
92            }
93        };
94
95        let label = agent_name.as_deref().unwrap_or("inline").to_string();
96        let model = model_override.unwrap_or_else(|| crate::models::default_model().to_string());
97        let task_preview: String = task.chars().take(80).collect();
98        let task_full = task.clone();
99        let subagent_id = NEXT_SUBAGENT_ID.fetch_add(1, Ordering::Relaxed);
100        let handle_id = format!("sa_{}", subagent_id);
101
102        tracing::info!("subagent_start: dispatching '{}' (id={}) model={}", label, handle_id, model);
103
104        // ── Shared state ───────────────────────────────────────────────────────
105        let state = Arc::new(RwLock::new(SubagentState::new()));
106
107        // ── Channels ───────────────────────────────────────────────────────────
108        let (steer_tx, steer_rx) = mpsc::unbounded_channel::<String>();
109        let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>();
110        let (result_tx, result_rx) = oneshot::channel::<SubagentResult>();
111
112        // ── Forward SubagentStart event to TUI ─────────────────────────────────
113        if let Some(ref tx) = ctx.channels.tx_events {
114            let _ = tx.send(crate::StreamEvent::Agent(AgentEvent::SubagentStart {
115                subagent_id,
116                agent_name: label.clone(),
117                task_preview: task_preview.clone(),
118            }));
119        }
120
121        // ── Clone state for the spawned thread ─────────────────────────────────
122        let state_t          = Arc::clone(&state);
123        let task_full_a      = task_full.clone();
124        let label_inner      = label.clone();
125        let model_inner      = model.clone();
126        let tx_events_inner  = ctx.channels.tx_events.clone();
127        let start_time       = std::time::Instant::now();
128
129        // ── Spawn subagent thread (mirrors subagent.rs) ────────────────────────
130        let system_prompt_for_handle = system_prompt.clone();
131        let thread_handle = std::thread::spawn(move || {
132            let panic_result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
133                let rt = match tokio::runtime::Builder::new_current_thread()
134                    .enable_all()
135                    .build()
136                {
137                    Ok(rt) => rt,
138                    Err(e) => {
139                        state_t.write().unwrap().status =
140                            SubagentStatus::Failed(format!("tokio runtime: {}", e));
141                        return;
142                    }
143                };
144
145                // Clones for the async block — the outer closure still needs the originals.
146                let state_a        = Arc::clone(&state_t);
147                let label_a        = label_inner.clone();
148                let model_a        = model_inner.clone();
149                let tx_events_a    = tx_events_inner.clone();
150                let task_for_timeout = task_full_a.clone();
151                let task_for_complete = task_full_a;
152
153                let outcome: std::result::Result<SubagentResult, String> = rt.block_on(async move {
154                    use futures::StreamExt;
155
156                    let mut runtime = match crate::Runtime::new().await {
157                        Ok(r) => r,
158                        Err(e) => return Err(format!("Failed to create subagent runtime: {}", e)),
159                    };
160
161                    runtime.set_system_prompt(system_prompt);
162                    runtime.set_model(model_a.clone());
163                    let tools = if let Some(ext_mgr) = crate::runtime::openai::extension_manager_for_routing() {
164                        let mgr = ext_mgr.read().await;
165                        if let Some(shared) = mgr.tools_shared() {
166                            let extension_tools = shared.read().await;
167                            crate::ToolRegistry::without_subagent_with_extensions(&extension_tools)
168                        } else {
169                            crate::ToolRegistry::without_subagent()
170                        }
171                    } else {
172                        crate::ToolRegistry::without_subagent()
173                    };
174                    runtime.set_tools(tools);
175
176                    let cancel = crate::CancellationToken::new();
177                    let cancel_inner = cancel.clone();
178                    tokio::spawn(async move {
179                        let _ = shutdown_rx.await;
180                        cancel_inner.cancel();
181                    });
182
183                    let mut stream = runtime.run_stream_with_messages(vec![serde_json::json!({"role": "user", "content": task})], cancel, Some(steer_rx), None, false).await;
184
185                    let mut tool_count = 0u32;
186                    let mut total_input_tokens = 0u64;
187                    let mut total_output_tokens = 0u64;
188                    let mut total_cache_read = 0u64;
189                    let mut total_cache_creation = 0u64;
190                    // TTL split: None only if no turn ever reported one; otherwise summed.
191                    let mut total_cache_5m: Option<u64> = None;
192                    let mut total_cache_1h: Option<u64> = None;
193
194                    let timeout_fut = tokio::time::sleep(Duration::from_secs(timeout_secs));
195                    tokio::pin!(timeout_fut);
196
197                    loop {
198                        tokio::select! {
199                            event = stream.next() => {
200                                let Some(event) = event else { break };
201                                match event {
202                                    crate::StreamEvent::Llm(LlmEvent::Thinking(_)) => {
203                                        if let Some(ref tx) = tx_events_a {
204                                            let _ = tx.send(crate::StreamEvent::Agent(AgentEvent::SubagentUpdate {
205                                                subagent_id,
206                                                agent_name: label_a.clone(),
207                                                status: "💭 thinking...".to_string(),
208                                            }));
209                                        }
210                                    }
211                                    crate::StreamEvent::Llm(LlmEvent::Text(text)) => {
212                                        state_a.write().unwrap().partial_text.push_str(&text);
213                                    }
214                                    crate::StreamEvent::Llm(LlmEvent::ToolUseStart { tool_name: name, .. }) => {
215                                        tool_count += 1;
216                                        if let Some(ref tx) = tx_events_a {
217                                            let _ = tx.send(crate::StreamEvent::Agent(AgentEvent::SubagentUpdate {
218                                                subagent_id,
219                                                agent_name: label_a.clone(),
220                                                status: format!("⚙ {} (tool #{})", name, tool_count),
221                                            }));
222                                        }
223                                    }
224                                    crate::StreamEvent::Llm(LlmEvent::ToolUse { tool_name, input, .. }) => {
225                                        let input_str = input.to_string();
226                                        let input_preview: String = input_str.chars().take(200).collect();
227                                        state_a.write().unwrap().tool_log
228                                            .push(format!("[tool_use]: {} — {}", tool_name, input_preview));
229                                        let detail = match tool_name.as_str() {
230                                            "bash" => {
231                                                let cmd = input["command"].as_str().unwrap_or("");
232                                                let preview: String = cmd.chars().take(60).collect();
233                                                format!("$ {}", preview)
234                                            }
235                                            "read"  => format!("reading {}", input["path"].as_str().unwrap_or("?").rsplit('/').next().unwrap_or("?")),
236                                            "write" => format!("writing {}", input["path"].as_str().unwrap_or("?").rsplit('/').next().unwrap_or("?")),
237                                            "edit"  => format!("editing {}", input["path"].as_str().unwrap_or("?").rsplit('/').next().unwrap_or("?")),
238                                            "grep"  => format!("grep /{}/", input["pattern"].as_str().unwrap_or("?").chars().take(30).collect::<String>()),
239                                            "find"  => format!("find {}", input["pattern"].as_str().unwrap_or("?")),
240                                            "ls"    => format!("ls {}", input["path"].as_str().unwrap_or(".").rsplit('/').next().unwrap_or(".")),
241                                            other   => {
242                                                if other.starts_with("ext__") {
243                                                    other.splitn(3, "__").last().unwrap_or(other).to_string()
244                                                } else {
245                                                    other.to_string()
246                                                }
247                                            }
248                                        };
249                                        if let Some(ref tx) = tx_events_a {
250                                            let _ = tx.send(crate::StreamEvent::Agent(AgentEvent::SubagentUpdate {
251                                                subagent_id,
252                                                agent_name: label_a.clone(),
253                                                status: detail,
254                                            }));
255                                        }
256                                    }
257                                    crate::StreamEvent::Llm(LlmEvent::ToolResult { result, .. }) => {
258                                        let preview: String = result.chars().take(300).collect();
259                                        state_a.write().unwrap().tool_log
260                                            .push(format!("[tool_result]: {}", preview));
261                                    }
262                                    crate::StreamEvent::Session(SessionEvent::Usage {
263                                        input_tokens, output_tokens,
264                                        cache_read_input_tokens, cache_creation_input_tokens,
265                                        cache_creation_5m, cache_creation_1h,
266                                        model: _,
267                                    }) => {
268                                        total_input_tokens    += input_tokens;
269                                        total_output_tokens   += output_tokens;
270                                        total_cache_read      += cache_read_input_tokens;
271                                        total_cache_creation  += cache_creation_input_tokens;
272                                        crate::core::rpc_dispatch::merge_split(&mut total_cache_5m, cache_creation_5m);
273                                        crate::core::rpc_dispatch::merge_split(&mut total_cache_1h, cache_creation_1h);
274                                    }
275                                    crate::StreamEvent::Session(SessionEvent::Error(e)) => return Err(e),
276                                    crate::StreamEvent::Session(SessionEvent::Done) => break,
277                                    _ => {}
278                                }
279                            }
280                            _ = &mut timeout_fut => {
281                                let (partial, log) = {
282                                    let mut s = state_a.write().unwrap();
283                                    s.status = SubagentStatus::TimedOut;
284                                    s.conversation_state = vec![
285                                        serde_json::json!({"role": "user", "content": task_for_timeout.clone()}),
286                                        serde_json::json!({"role": "assistant", "content": &s.partial_text}),
287                                    ];
288                                    (s.partial_text.clone(), s.tool_log.clone())
289                                };
290                                let mut text = format!("[TIMED OUT after {}s — partial results below]\n\n", timeout_secs);
291                                if !log.is_empty() {
292                                    text.push_str(&log.join("\n"));
293                                    text.push('\n');
294                                }
295                                if !partial.is_empty() {
296                                    text.push_str("\n[partial response]:\n");
297                                    text.push_str(&partial);
298                                }
299                                return Ok(SubagentResult {
300                                    text,
301                                    model: model_a.clone(),
302                                    input_tokens: total_input_tokens,
303                                    output_tokens: total_output_tokens,
304                                    cache_read: total_cache_read,
305                                    cache_creation: total_cache_creation,
306                                    cache_creation_5m: total_cache_5m,
307                                    cache_creation_1h: total_cache_1h,
308                                    tool_count,
309                                });
310                            }
311                        }
312                    }
313
314                    Ok(SubagentResult {
315                        text: state_a.write().unwrap().partial_text.clone(),
316                        model: model_a.clone(),
317                        input_tokens: total_input_tokens,
318                        output_tokens: total_output_tokens,
319                        cache_read: total_cache_read,
320                        cache_creation: total_cache_creation,
321                        cache_creation_5m: total_cache_5m,
322                        cache_creation_1h: total_cache_1h,
323                        tool_count,
324                    })
325                });
326
327                match outcome {
328                    Ok(sa_result) => {
329                        // Only overwrite Running → Completed (don't stomp TimedOut).
330                        {
331                            let mut s = state_t.write().unwrap();
332                            if matches!(s.status, SubagentStatus::Running) {
333                                s.status = SubagentStatus::Completed;
334                                s.conversation_state = vec![
335                                    serde_json::json!({"role": "user", "content": task_for_complete.clone()}),
336                                    serde_json::json!({"role": "assistant", "content": sa_result.text.clone()}),
337                                ];
338                            }
339                        }
340                        let elapsed = start_time.elapsed().as_secs_f64();
341                        let preview: String = sa_result.text.chars().take(120).collect();
342                        if let Some(ref tx) = tx_events_inner {
343                            let _ = tx.send(crate::StreamEvent::Agent(AgentEvent::SubagentDone {
344                                subagent_id,
345                                agent_name: label_inner.clone(),
346                                result_preview: preview,
347                                duration_secs: elapsed,
348                            }));
349                        }
350                        let _ = result_tx.send(sa_result);
351                    }
352                    Err(e) => {
353                        state_t.write().unwrap().status = SubagentStatus::Failed(e.clone());
354                        let elapsed = start_time.elapsed().as_secs_f64();
355                        if let Some(ref tx) = tx_events_inner {
356                            let _ = tx.send(crate::StreamEvent::Agent(AgentEvent::SubagentDone {
357                                subagent_id,
358                                agent_name: label_inner.clone(),
359                                result_preview: format!("ERROR: {}", e),
360                                duration_secs: elapsed,
361                            }));
362                        }
363                        // drop result_tx — collect() will surface the closed channel
364                    }
365                }
366            }));
367
368            if let Err(panic_info) = panic_result {
369                let msg = if let Some(s) = panic_info.downcast_ref::<&str>() {
370                    s.to_string()
371                } else if let Some(s) = panic_info.downcast_ref::<String>() {
372                    s.clone()
373                } else {
374                    "unknown panic".to_string()
375                };
376                tracing::error!("Subagent thread panicked: {}", msg);
377                state_t.write().unwrap().status = SubagentStatus::Failed(format!("panic: {}", msg));
378            }
379        });
380
381        // ── Build handle + register ────────────────────────────────────────────
382        let handle = SubagentHandle::new(
383            handle_id.clone(),
384            label.clone(),
385            task_preview,
386            model,
387            system_prompt_for_handle,
388            timeout_secs,
389            state,
390            Some(steer_tx),
391            Some(shutdown_tx),
392            Some(result_rx),
393        );
394
395        if let Some(registry) = &ctx.capabilities.subagent_registry {
396            let mut reg = registry.lock().unwrap();
397            reg.register(handle);
398            if let Some(h) = reg.get_mut(&handle_id) {
399                h.set_thread_handle(thread_handle);
400            }
401        } else {
402            return Err(RuntimeError::Tool(
403                "subagent_start requires a subagent_registry in ToolContext".to_string()
404            ));
405        }
406
407        Ok(json!({
408            "handle_id":  handle_id,
409            "agent_name": label,
410            "status":     "running"
411        }).to_string())
412    }
413}
414
#[cfg(test)]
mod tests {
    use super::*;
    use crate::tools::test_helpers::create_tool_context;
    use crate::tools::{SubagentRegistry, Tool};
    use serde_json::json;
    use std::sync::{Arc, Mutex};

    /// An empty `agent` string must be treated as "not provided", so the call
    /// succeeds using the inline `system_prompt` and is labelled `inline`.
    #[tokio::test]
    async fn test_subagent_start_blank_agent_uses_system_prompt() {
        // The tool refuses to run without a registry to store the handle in.
        let registry = Arc::new(Mutex::new(SubagentRegistry::new()));
        let mut ctx = create_tool_context();
        ctx.capabilities.subagent_registry = Some(registry);

        let request = json!({
            "agent": "",
            "system_prompt": "You are a concise test subagent. Reply with only: ok",
            "task": "Say ok",
            "model": "claude-sonnet-4-6",
            "timeout": 1
        });

        let result = SubagentStartTool.execute(request, ctx).await;
        assert!(result.is_ok(), "blank agent should not be resolved as ~/.synaps-cli/agents/.md: {result:?}");

        let raw = result.unwrap();
        let body: serde_json::Value = serde_json::from_str(&raw).unwrap();
        assert_eq!(body["agent_name"], "inline");

        let has_handle_prefix = body["handle_id"]
            .as_str()
            .map_or(false, |id| id.starts_with("sa_"));
        assert!(has_handle_prefix);
    }
}
443}