Skip to main content

agent_engine/tools/subagent/
resume.rs

//! SubagentResumeTool — restart a finished, failed, or timed-out subagent with new instructions.
//!
//! Takes the conversation state stored in the prior `SubagentHandle` (falling back to that
//! handle's partial output when no conversation state was recorded), prepends the new
//! instructions, and dispatches a fresh subagent via the same flow as `subagent_start`.
//! The caller gets a new `handle_id` for the continuation run; the prior handle is only
//! read, never modified.
6
7use serde_json::{json, Value};
8use std::sync::atomic::Ordering;
9use std::sync::{Arc, RwLock};
10use std::time::Duration;
11use tokio::sync::{mpsc, oneshot};
12
13use crate::{Result, RuntimeError, LlmEvent, SessionEvent, AgentEvent};
14use super::super::{Tool, ToolContext, NEXT_SUBAGENT_ID};
15use crate::runtime::subagent::{SubagentHandle, SubagentResult, SubagentStatus, SubagentState};
16
17pub struct SubagentResumeTool;
18
19#[async_trait::async_trait]
20impl Tool for SubagentResumeTool {
21    fn name(&self) -> &str { "subagent_resume" }
22
23    fn description(&self) -> &str {
24        "Resume a finished or timed-out reactive subagent with new instructions. \
25         The previous subagent's conversation state is prepended as context so the \
26         new run has full history. Returns a new handle_id — the original handle \
27         remains readable for comparison. Only works on subagents in \
28         finished/timed_out/failed state."
29    }
30
31    fn parameters(&self) -> Value {
32        json!({
33            "type": "object",
34            "properties": {
35                "handle_id": {
36                    "type": "string",
37                    "description": "Handle ID of the completed subagent to resume (e.g. \"sa_3\")."
38                },
39                "instructions": {
40                    "type": "string",
41                    "description": "New task or context to prepend to the resumed subagent. \
42                                    Injected before the prior conversation history."
43                }
44            },
45            "required": ["handle_id", "instructions"]
46        })
47    }
48
49    async fn execute(&self, params: Value, ctx: ToolContext) -> Result<String> {
50        let prior_handle_id = params["handle_id"].as_str()
51            .ok_or_else(|| RuntimeError::Tool("Missing 'handle_id' parameter".to_string()))?
52            .to_string();
53
54        let instructions = params["instructions"].as_str()
55            .ok_or_else(|| RuntimeError::Tool("Missing 'instructions' parameter".to_string()))?
56            .to_string();
57
58        let registry = ctx.capabilities.subagent_registry.as_ref()
59            .ok_or_else(|| RuntimeError::Tool(
60                "SubagentRegistry not available on this ToolContext".to_string()
61            ))?;
62
63        // Extract prior state under the lock, release immediately.
64        let (agent_name, model, prior_context, prior_system_prompt, prior_timeout) = {
65            let reg = registry.lock().unwrap();
66            let handle = reg.get(&prior_handle_id)
67                .ok_or_else(|| RuntimeError::Tool(
68                    format!("No subagent found with handle_id '{}'", prior_handle_id)
69                ))?;
70
71            if handle.status() == SubagentStatus::Running {
72                return Err(RuntimeError::Tool(format!(
73                    "Subagent '{}' is still running. Call subagent_collect first, \
74                     or wait until it finishes.",
75                    prior_handle_id
76                )));
77            }
78
79            let prior = {
80                let state = handle.conversation_state();
81                if state.is_empty() {
82                    handle.partial_output()
83                } else {
84                    serde_json::to_string(&state).unwrap_or_else(|_| handle.partial_output())
85                }
86            };
87
88            (handle.agent_name.clone(), handle.model.clone(), prior, handle.system_prompt.clone(), handle.timeout_secs)
89        };
90
91        // ── Build resumed task: new instructions → separator → prior context.
92        let resumed_task = format!(
93            "{instructions}\n\n\
94             ---\n\
95             [Prior conversation context from handle {prior_handle_id}]\n\
96             {prior_context}"
97        );
98
99        // Restore the original system prompt and timeout from the prior handle
100        let system_prompt = prior_system_prompt;
101        let timeout_secs = prior_timeout;
102        let label = agent_name.clone();
103        let task_preview: String = resumed_task.chars().take(80).collect();
104        let task_full = resumed_task.clone();
105        let subagent_id = NEXT_SUBAGENT_ID.fetch_add(1, Ordering::Relaxed);
106        let handle_id = format!("sa_{}", subagent_id);
107
108        tracing::info!(
109            "subagent_resume: dispatching '{}' (id={}, resumed_from={}) model={}",
110            label, handle_id, prior_handle_id, model
111        );
112
113        let state = Arc::new(RwLock::new(SubagentState::new()));
114
115        let (steer_tx, steer_rx) = mpsc::unbounded_channel::<String>();
116        let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>();
117        let (result_tx, result_rx) = oneshot::channel::<SubagentResult>();
118
119        if let Some(ref tx) = ctx.channels.tx_events {
120            let _ = tx.send(crate::StreamEvent::Agent(AgentEvent::SubagentStart {
121                subagent_id,
122                agent_name: label.clone(),
123                task_preview: task_preview.clone(),
124            }));
125        }
126
127        let state_t         = Arc::clone(&state);
128        let task_full_a     = task_full.clone();
129        let label_inner     = label.clone();
130        let model_inner     = model.clone();
131        let tx_events_inner = ctx.channels.tx_events.clone();
132        let start_time      = std::time::Instant::now();
133
134        let system_prompt_for_handle = system_prompt.clone();
135        let thread_handle = std::thread::spawn(move || {
136            let panic_result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
137                let rt = match tokio::runtime::Builder::new_current_thread()
138                    .enable_all()
139                    .build()
140                {
141                    Ok(rt) => rt,
142                    Err(e) => {
143                        state_t.write().unwrap().status =
144                            SubagentStatus::Failed(format!("tokio runtime: {}", e));
145                        return;
146                    }
147                };
148
149                let state_a           = Arc::clone(&state_t);
150                let label_a           = label_inner.clone();
151                let model_a           = model_inner.clone();
152                let tx_events_a       = tx_events_inner.clone();
153                let task_for_timeout  = task_full_a.clone();
154                let task_for_complete = task_full_a.clone();
155                let task_for_stream   = task_full_a;
156
157                let outcome: std::result::Result<SubagentResult, String> = rt.block_on(async move {
158                    use futures::StreamExt;
159
160                    let mut runtime = match crate::Runtime::new().await {
161                        Ok(r) => r,
162                        Err(e) => return Err(format!("Failed to create subagent runtime: {}", e)),
163                    };
164
165                    runtime.set_system_prompt(system_prompt);
166                    runtime.set_model(model_a.clone());
167                    runtime.set_tools(crate::ToolRegistry::without_subagent());
168
169                    let cancel = crate::CancellationToken::new();
170                    let cancel_inner = cancel.clone();
171                    tokio::spawn(async move {
172                        let _ = shutdown_rx.await;
173                        cancel_inner.cancel();
174                    });
175
176                    let mut stream = runtime.run_stream_with_messages(
177                        vec![serde_json::json!({"role": "user", "content": task_for_stream})],
178                        cancel,
179                        Some(steer_rx),
180                        None,
181                        false,
182                    ).await;
183
184                    let mut tool_count = 0u32;
185                    let mut total_input_tokens = 0u64;
186                    let mut total_output_tokens = 0u64;
187                    let mut total_cache_read = 0u64;
188                    let mut total_cache_creation = 0u64;
189                    // TTL split: None only if no turn ever reported one; otherwise summed.
190                    let mut total_cache_5m: Option<u64> = None;
191                    let mut total_cache_1h: Option<u64> = None;
192
193                    let timeout_fut = tokio::time::sleep(Duration::from_secs(timeout_secs));
194                    tokio::pin!(timeout_fut);
195
196                    loop {
197                        tokio::select! {
198                            event = stream.next() => {
199                                let Some(event) = event else { break };
200                                match event {
201                                    crate::StreamEvent::Llm(LlmEvent::Thinking(_)) => {
202                                        if let Some(ref tx) = tx_events_a {
203                                            let _ = tx.send(crate::StreamEvent::Agent(AgentEvent::SubagentUpdate {
204                                                subagent_id,
205                                                agent_name: label_a.clone(),
206                                                status: "💭 thinking...".to_string(),
207                                            }));
208                                        }
209                                    }
210                                    crate::StreamEvent::Llm(LlmEvent::Text(text)) => {
211                                        state_a.write().unwrap().partial_text.push_str(&text);
212                                    }
213                                    crate::StreamEvent::Llm(LlmEvent::ToolUseStart { tool_name: name, .. }) => {
214                                        tool_count += 1;
215                                        if let Some(ref tx) = tx_events_a {
216                                            let _ = tx.send(crate::StreamEvent::Agent(AgentEvent::SubagentUpdate {
217                                                subagent_id,
218                                                agent_name: label_a.clone(),
219                                                status: format!("⚙ {} (tool #{})", name, tool_count),
220                                            }));
221                                        }
222                                    }
223                                    crate::StreamEvent::Llm(LlmEvent::ToolUse { tool_name, input, .. }) => {
224                                        let input_str = input.to_string();
225                                        let input_preview: String = input_str.chars().take(200).collect();
226                                        state_a.write().unwrap().tool_log
227                                            .push(format!("[tool_use]: {} — {}", tool_name, input_preview));
228                                        let detail = match tool_name.as_str() {
229                                            "bash" => {
230                                                let cmd = input["command"].as_str().unwrap_or("");
231                                                let preview: String = cmd.chars().take(60).collect();
232                                                format!("$ {}", preview)
233                                            }
234                                            "read"  => format!("reading {}", input["path"].as_str().unwrap_or("?").rsplit('/').next().unwrap_or("?")),
235                                            "write" => format!("writing {}", input["path"].as_str().unwrap_or("?").rsplit('/').next().unwrap_or("?")),
236                                            "edit"  => format!("editing {}", input["path"].as_str().unwrap_or("?").rsplit('/').next().unwrap_or("?")),
237                                            "grep"  => format!("grep /{}/", input["pattern"].as_str().unwrap_or("?").chars().take(30).collect::<String>()),
238                                            "find"  => format!("find {}", input["pattern"].as_str().unwrap_or("?")),
239                                            "ls"    => format!("ls {}", input["path"].as_str().unwrap_or(".").rsplit('/').next().unwrap_or(".")),
240                                            other   => {
241                                                if other.starts_with("ext__") {
242                                                    other.splitn(3, "__").last().unwrap_or(other).to_string()
243                                                } else {
244                                                    other.to_string()
245                                                }
246                                            }
247                                        };
248                                        if let Some(ref tx) = tx_events_a {
249                                            let _ = tx.send(crate::StreamEvent::Agent(AgentEvent::SubagentUpdate {
250                                                subagent_id,
251                                                agent_name: label_a.clone(),
252                                                status: detail,
253                                            }));
254                                        }
255                                    }
256                                    crate::StreamEvent::Llm(LlmEvent::ToolResult { result, .. }) => {
257                                        let preview: String = result.chars().take(300).collect();
258                                        state_a.write().unwrap().tool_log
259                                            .push(format!("[tool_result]: {}", preview));
260                                    }
261                                    crate::StreamEvent::Session(SessionEvent::Usage {
262                                        input_tokens, output_tokens,
263                                        cache_read_input_tokens, cache_creation_input_tokens,
264                                        cache_creation_5m, cache_creation_1h,
265                                        model: _,
266                                    }) => {
267                                        total_input_tokens    += input_tokens;
268                                        total_output_tokens   += output_tokens;
269                                        total_cache_read      += cache_read_input_tokens;
270                                        total_cache_creation  += cache_creation_input_tokens;
271                                        crate::core::rpc_dispatch::merge_split(&mut total_cache_5m, cache_creation_5m);
272                                        crate::core::rpc_dispatch::merge_split(&mut total_cache_1h, cache_creation_1h);
273                                    }
274                                    crate::StreamEvent::Session(SessionEvent::Error(e)) => return Err(e),
275                                    crate::StreamEvent::Session(SessionEvent::Done) => break,
276                                    _ => {}
277                                }
278                            }
279                            _ = &mut timeout_fut => {
280                                let (partial, log) = {
281                                    let mut s = state_a.write().unwrap();
282                                    s.status = SubagentStatus::TimedOut;
283                                    s.conversation_state = vec![
284                                        serde_json::json!({"role": "user", "content": task_for_timeout.clone()}),
285                                        serde_json::json!({"role": "assistant", "content": &s.partial_text}),
286                                    ];
287                                    (s.partial_text.clone(), s.tool_log.clone())
288                                };
289                                let mut text = format!("[TIMED OUT after {}s — partial results below]\n\n", timeout_secs);
290                                if !log.is_empty() {
291                                    text.push_str(&log.join("\n"));
292                                    text.push('\n');
293                                }
294                                if !partial.is_empty() {
295                                    text.push_str("\n[partial response]:\n");
296                                    text.push_str(&partial);
297                                }
298                                return Ok(SubagentResult {
299                                    text,
300                                    model: model_a.clone(),
301                                    input_tokens: total_input_tokens,
302                                    output_tokens: total_output_tokens,
303                                    cache_read: total_cache_read,
304                                    cache_creation: total_cache_creation,
305                                    cache_creation_5m: total_cache_5m,
306                                    cache_creation_1h: total_cache_1h,
307                                    tool_count,
308                                });
309                            }
310                        }
311                    }
312
313                    Ok(SubagentResult {
314                        text: state_a.write().unwrap().partial_text.clone(),
315                        model: model_a.clone(),
316                        input_tokens: total_input_tokens,
317                        output_tokens: total_output_tokens,
318                        cache_read: total_cache_read,
319                        cache_creation: total_cache_creation,
320                        cache_creation_5m: total_cache_5m,
321                        cache_creation_1h: total_cache_1h,
322                        tool_count,
323                    })
324                });
325
326                match outcome {
327                    Ok(sa_result) => {
328                        {
329                            let mut s = state_t.write().unwrap();
330                            if matches!(s.status, SubagentStatus::Running) {
331                                s.status = SubagentStatus::Completed;
332                                s.conversation_state = vec![
333                                    serde_json::json!({"role": "user", "content": task_for_complete.clone()}),
334                                    serde_json::json!({"role": "assistant", "content": sa_result.text.clone()}),
335                                ];
336                            }
337                        }
338                        let elapsed = start_time.elapsed().as_secs_f64();
339                        let preview: String = sa_result.text.chars().take(120).collect();
340                        if let Some(ref tx) = tx_events_inner {
341                            let _ = tx.send(crate::StreamEvent::Agent(AgentEvent::SubagentDone {
342                                subagent_id,
343                                agent_name: label_inner.clone(),
344                                result_preview: preview,
345                                duration_secs: elapsed,
346                            }));
347                        }
348                        let _ = result_tx.send(sa_result);
349                    }
350                    Err(e) => {
351                        state_t.write().unwrap().status = SubagentStatus::Failed(e.clone());
352                        let elapsed = start_time.elapsed().as_secs_f64();
353                        if let Some(ref tx) = tx_events_inner {
354                            let _ = tx.send(crate::StreamEvent::Agent(AgentEvent::SubagentDone {
355                                subagent_id,
356                                agent_name: label_inner.clone(),
357                                result_preview: format!("ERROR: {}", e),
358                                duration_secs: elapsed,
359                            }));
360                        }
361                    }
362                }
363            }));
364
365            if let Err(panic_info) = panic_result {
366                let msg = if let Some(s) = panic_info.downcast_ref::<&str>() {
367                    s.to_string()
368                } else if let Some(s) = panic_info.downcast_ref::<String>() {
369                    s.clone()
370                } else {
371                    "unknown panic".to_string()
372                };
373                tracing::error!("Resumed subagent thread panicked: {}", msg);
374                state_t.write().unwrap().status = SubagentStatus::Failed(format!("panic: {}", msg));
375            }
376        });
377
378        let handle = SubagentHandle::new(
379            handle_id.clone(),
380            label.clone(),
381            task_preview,
382            model,
383            system_prompt_for_handle,
384            timeout_secs,
385            state,
386            Some(steer_tx),
387            Some(shutdown_tx),
388            Some(result_rx),
389        );
390
391        {
392            let mut reg = registry.lock().unwrap();
393            reg.register(handle);
394            if let Some(h) = reg.get_mut(&handle_id) {
395                h.set_thread_handle(thread_handle);
396            }
397        }
398
399        Ok(json!({
400            "handle_id":    handle_id,
401            "resumed_from": prior_handle_id,
402            "agent_name":   label,
403            "status":       "running"
404        }).to_string())
405    }
406}