agent_engine/tools/subagent/
resume.rs1use serde_json::{json, Value};
8use std::sync::atomic::Ordering;
9use std::sync::{Arc, RwLock};
10use std::time::Duration;
11use tokio::sync::{mpsc, oneshot};
12
13use crate::{Result, RuntimeError, LlmEvent, SessionEvent, AgentEvent};
14use super::super::{Tool, ToolContext, NEXT_SUBAGENT_ID};
15use crate::runtime::subagent::{SubagentHandle, SubagentResult, SubagentStatus, SubagentState};
16
/// Tool that starts a new subagent run seeded with the recorded context of a
/// previously registered subagent plus fresh instructions.
///
/// The type carries no state; everything it needs is taken from the
/// `ToolContext` handed to `execute`.
#[derive(Debug, Clone, Copy, Default)]
pub struct SubagentResumeTool;
18
19#[async_trait::async_trait]
20impl Tool for SubagentResumeTool {
21 fn name(&self) -> &str { "subagent_resume" }
22
23 fn description(&self) -> &str {
24 "Resume a finished or timed-out reactive subagent with new instructions. \
25 The previous subagent's conversation state is prepended as context so the \
26 new run has full history. Returns a new handle_id — the original handle \
27 remains readable for comparison. Only works on subagents in \
28 finished/timed_out/failed state."
29 }
30
31 fn parameters(&self) -> Value {
32 json!({
33 "type": "object",
34 "properties": {
35 "handle_id": {
36 "type": "string",
37 "description": "Handle ID of the completed subagent to resume (e.g. \"sa_3\")."
38 },
39 "instructions": {
40 "type": "string",
41 "description": "New task or context to prepend to the resumed subagent. \
42 Injected before the prior conversation history."
43 }
44 },
45 "required": ["handle_id", "instructions"]
46 })
47 }
48
49 async fn execute(&self, params: Value, ctx: ToolContext) -> Result<String> {
50 let prior_handle_id = params["handle_id"].as_str()
51 .ok_or_else(|| RuntimeError::Tool("Missing 'handle_id' parameter".to_string()))?
52 .to_string();
53
54 let instructions = params["instructions"].as_str()
55 .ok_or_else(|| RuntimeError::Tool("Missing 'instructions' parameter".to_string()))?
56 .to_string();
57
58 let registry = ctx.capabilities.subagent_registry.as_ref()
59 .ok_or_else(|| RuntimeError::Tool(
60 "SubagentRegistry not available on this ToolContext".to_string()
61 ))?;
62
63 let (agent_name, model, prior_context, prior_system_prompt, prior_timeout) = {
65 let reg = registry.lock().unwrap();
66 let handle = reg.get(&prior_handle_id)
67 .ok_or_else(|| RuntimeError::Tool(
68 format!("No subagent found with handle_id '{}'", prior_handle_id)
69 ))?;
70
71 if handle.status() == SubagentStatus::Running {
72 return Err(RuntimeError::Tool(format!(
73 "Subagent '{}' is still running. Call subagent_collect first, \
74 or wait until it finishes.",
75 prior_handle_id
76 )));
77 }
78
79 let prior = {
80 let state = handle.conversation_state();
81 if state.is_empty() {
82 handle.partial_output()
83 } else {
84 serde_json::to_string(&state).unwrap_or_else(|_| handle.partial_output())
85 }
86 };
87
88 (handle.agent_name.clone(), handle.model.clone(), prior, handle.system_prompt.clone(), handle.timeout_secs)
89 };
90
91 let resumed_task = format!(
93 "{instructions}\n\n\
94 ---\n\
95 [Prior conversation context from handle {prior_handle_id}]\n\
96 {prior_context}"
97 );
98
99 let system_prompt = prior_system_prompt;
101 let timeout_secs = prior_timeout;
102 let label = agent_name.clone();
103 let task_preview: String = resumed_task.chars().take(80).collect();
104 let task_full = resumed_task.clone();
105 let subagent_id = NEXT_SUBAGENT_ID.fetch_add(1, Ordering::Relaxed);
106 let handle_id = format!("sa_{}", subagent_id);
107
108 tracing::info!(
109 "subagent_resume: dispatching '{}' (id={}, resumed_from={}) model={}",
110 label, handle_id, prior_handle_id, model
111 );
112
113 let state = Arc::new(RwLock::new(SubagentState::new()));
114
115 let (steer_tx, steer_rx) = mpsc::unbounded_channel::<String>();
116 let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>();
117 let (result_tx, result_rx) = oneshot::channel::<SubagentResult>();
118
119 if let Some(ref tx) = ctx.channels.tx_events {
120 let _ = tx.send(crate::StreamEvent::Agent(AgentEvent::SubagentStart {
121 subagent_id,
122 agent_name: label.clone(),
123 task_preview: task_preview.clone(),
124 }));
125 }
126
127 let state_t = Arc::clone(&state);
128 let task_full_a = task_full.clone();
129 let label_inner = label.clone();
130 let model_inner = model.clone();
131 let tx_events_inner = ctx.channels.tx_events.clone();
132 let start_time = std::time::Instant::now();
133
134 let system_prompt_for_handle = system_prompt.clone();
135 let thread_handle = std::thread::spawn(move || {
136 let panic_result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
137 let rt = match tokio::runtime::Builder::new_current_thread()
138 .enable_all()
139 .build()
140 {
141 Ok(rt) => rt,
142 Err(e) => {
143 state_t.write().unwrap().status =
144 SubagentStatus::Failed(format!("tokio runtime: {}", e));
145 return;
146 }
147 };
148
149 let state_a = Arc::clone(&state_t);
150 let label_a = label_inner.clone();
151 let model_a = model_inner.clone();
152 let tx_events_a = tx_events_inner.clone();
153 let task_for_timeout = task_full_a.clone();
154 let task_for_complete = task_full_a.clone();
155 let task_for_stream = task_full_a;
156
157 let outcome: std::result::Result<SubagentResult, String> = rt.block_on(async move {
158 use futures::StreamExt;
159
160 let mut runtime = match crate::Runtime::new().await {
161 Ok(r) => r,
162 Err(e) => return Err(format!("Failed to create subagent runtime: {}", e)),
163 };
164
165 runtime.set_system_prompt(system_prompt);
166 runtime.set_model(model_a.clone());
167 runtime.set_tools(crate::ToolRegistry::without_subagent());
168
169 let cancel = crate::CancellationToken::new();
170 let cancel_inner = cancel.clone();
171 tokio::spawn(async move {
172 let _ = shutdown_rx.await;
173 cancel_inner.cancel();
174 });
175
176 let mut stream = runtime.run_stream_with_messages(
177 vec![serde_json::json!({"role": "user", "content": task_for_stream})],
178 cancel,
179 Some(steer_rx),
180 None,
181 false,
182 ).await;
183
184 let mut tool_count = 0u32;
185 let mut total_input_tokens = 0u64;
186 let mut total_output_tokens = 0u64;
187 let mut total_cache_read = 0u64;
188 let mut total_cache_creation = 0u64;
189 let mut total_cache_5m: Option<u64> = None;
191 let mut total_cache_1h: Option<u64> = None;
192
193 let timeout_fut = tokio::time::sleep(Duration::from_secs(timeout_secs));
194 tokio::pin!(timeout_fut);
195
196 loop {
197 tokio::select! {
198 event = stream.next() => {
199 let Some(event) = event else { break };
200 match event {
201 crate::StreamEvent::Llm(LlmEvent::Thinking(_)) => {
202 if let Some(ref tx) = tx_events_a {
203 let _ = tx.send(crate::StreamEvent::Agent(AgentEvent::SubagentUpdate {
204 subagent_id,
205 agent_name: label_a.clone(),
206 status: "💭 thinking...".to_string(),
207 }));
208 }
209 }
210 crate::StreamEvent::Llm(LlmEvent::Text(text)) => {
211 state_a.write().unwrap().partial_text.push_str(&text);
212 }
213 crate::StreamEvent::Llm(LlmEvent::ToolUseStart { tool_name: name, .. }) => {
214 tool_count += 1;
215 if let Some(ref tx) = tx_events_a {
216 let _ = tx.send(crate::StreamEvent::Agent(AgentEvent::SubagentUpdate {
217 subagent_id,
218 agent_name: label_a.clone(),
219 status: format!("⚙ {} (tool #{})", name, tool_count),
220 }));
221 }
222 }
223 crate::StreamEvent::Llm(LlmEvent::ToolUse { tool_name, input, .. }) => {
224 let input_str = input.to_string();
225 let input_preview: String = input_str.chars().take(200).collect();
226 state_a.write().unwrap().tool_log
227 .push(format!("[tool_use]: {} — {}", tool_name, input_preview));
228 let detail = match tool_name.as_str() {
229 "bash" => {
230 let cmd = input["command"].as_str().unwrap_or("");
231 let preview: String = cmd.chars().take(60).collect();
232 format!("$ {}", preview)
233 }
234 "read" => format!("reading {}", input["path"].as_str().unwrap_or("?").rsplit('/').next().unwrap_or("?")),
235 "write" => format!("writing {}", input["path"].as_str().unwrap_or("?").rsplit('/').next().unwrap_or("?")),
236 "edit" => format!("editing {}", input["path"].as_str().unwrap_or("?").rsplit('/').next().unwrap_or("?")),
237 "grep" => format!("grep /{}/", input["pattern"].as_str().unwrap_or("?").chars().take(30).collect::<String>()),
238 "find" => format!("find {}", input["pattern"].as_str().unwrap_or("?")),
239 "ls" => format!("ls {}", input["path"].as_str().unwrap_or(".").rsplit('/').next().unwrap_or(".")),
240 other => {
241 if other.starts_with("ext__") {
242 other.splitn(3, "__").last().unwrap_or(other).to_string()
243 } else {
244 other.to_string()
245 }
246 }
247 };
248 if let Some(ref tx) = tx_events_a {
249 let _ = tx.send(crate::StreamEvent::Agent(AgentEvent::SubagentUpdate {
250 subagent_id,
251 agent_name: label_a.clone(),
252 status: detail,
253 }));
254 }
255 }
256 crate::StreamEvent::Llm(LlmEvent::ToolResult { result, .. }) => {
257 let preview: String = result.chars().take(300).collect();
258 state_a.write().unwrap().tool_log
259 .push(format!("[tool_result]: {}", preview));
260 }
261 crate::StreamEvent::Session(SessionEvent::Usage {
262 input_tokens, output_tokens,
263 cache_read_input_tokens, cache_creation_input_tokens,
264 cache_creation_5m, cache_creation_1h,
265 model: _,
266 }) => {
267 total_input_tokens += input_tokens;
268 total_output_tokens += output_tokens;
269 total_cache_read += cache_read_input_tokens;
270 total_cache_creation += cache_creation_input_tokens;
271 crate::core::rpc_dispatch::merge_split(&mut total_cache_5m, cache_creation_5m);
272 crate::core::rpc_dispatch::merge_split(&mut total_cache_1h, cache_creation_1h);
273 }
274 crate::StreamEvent::Session(SessionEvent::Error(e)) => return Err(e),
275 crate::StreamEvent::Session(SessionEvent::Done) => break,
276 _ => {}
277 }
278 }
279 _ = &mut timeout_fut => {
280 let (partial, log) = {
281 let mut s = state_a.write().unwrap();
282 s.status = SubagentStatus::TimedOut;
283 s.conversation_state = vec![
284 serde_json::json!({"role": "user", "content": task_for_timeout.clone()}),
285 serde_json::json!({"role": "assistant", "content": &s.partial_text}),
286 ];
287 (s.partial_text.clone(), s.tool_log.clone())
288 };
289 let mut text = format!("[TIMED OUT after {}s — partial results below]\n\n", timeout_secs);
290 if !log.is_empty() {
291 text.push_str(&log.join("\n"));
292 text.push('\n');
293 }
294 if !partial.is_empty() {
295 text.push_str("\n[partial response]:\n");
296 text.push_str(&partial);
297 }
298 return Ok(SubagentResult {
299 text,
300 model: model_a.clone(),
301 input_tokens: total_input_tokens,
302 output_tokens: total_output_tokens,
303 cache_read: total_cache_read,
304 cache_creation: total_cache_creation,
305 cache_creation_5m: total_cache_5m,
306 cache_creation_1h: total_cache_1h,
307 tool_count,
308 });
309 }
310 }
311 }
312
313 Ok(SubagentResult {
314 text: state_a.write().unwrap().partial_text.clone(),
315 model: model_a.clone(),
316 input_tokens: total_input_tokens,
317 output_tokens: total_output_tokens,
318 cache_read: total_cache_read,
319 cache_creation: total_cache_creation,
320 cache_creation_5m: total_cache_5m,
321 cache_creation_1h: total_cache_1h,
322 tool_count,
323 })
324 });
325
326 match outcome {
327 Ok(sa_result) => {
328 {
329 let mut s = state_t.write().unwrap();
330 if matches!(s.status, SubagentStatus::Running) {
331 s.status = SubagentStatus::Completed;
332 s.conversation_state = vec![
333 serde_json::json!({"role": "user", "content": task_for_complete.clone()}),
334 serde_json::json!({"role": "assistant", "content": sa_result.text.clone()}),
335 ];
336 }
337 }
338 let elapsed = start_time.elapsed().as_secs_f64();
339 let preview: String = sa_result.text.chars().take(120).collect();
340 if let Some(ref tx) = tx_events_inner {
341 let _ = tx.send(crate::StreamEvent::Agent(AgentEvent::SubagentDone {
342 subagent_id,
343 agent_name: label_inner.clone(),
344 result_preview: preview,
345 duration_secs: elapsed,
346 }));
347 }
348 let _ = result_tx.send(sa_result);
349 }
350 Err(e) => {
351 state_t.write().unwrap().status = SubagentStatus::Failed(e.clone());
352 let elapsed = start_time.elapsed().as_secs_f64();
353 if let Some(ref tx) = tx_events_inner {
354 let _ = tx.send(crate::StreamEvent::Agent(AgentEvent::SubagentDone {
355 subagent_id,
356 agent_name: label_inner.clone(),
357 result_preview: format!("ERROR: {}", e),
358 duration_secs: elapsed,
359 }));
360 }
361 }
362 }
363 }));
364
365 if let Err(panic_info) = panic_result {
366 let msg = if let Some(s) = panic_info.downcast_ref::<&str>() {
367 s.to_string()
368 } else if let Some(s) = panic_info.downcast_ref::<String>() {
369 s.clone()
370 } else {
371 "unknown panic".to_string()
372 };
373 tracing::error!("Resumed subagent thread panicked: {}", msg);
374 state_t.write().unwrap().status = SubagentStatus::Failed(format!("panic: {}", msg));
375 }
376 });
377
378 let handle = SubagentHandle::new(
379 handle_id.clone(),
380 label.clone(),
381 task_preview,
382 model,
383 system_prompt_for_handle,
384 timeout_secs,
385 state,
386 Some(steer_tx),
387 Some(shutdown_tx),
388 Some(result_rx),
389 );
390
391 {
392 let mut reg = registry.lock().unwrap();
393 reg.register(handle);
394 if let Some(h) = reg.get_mut(&handle_id) {
395 h.set_thread_handle(thread_handle);
396 }
397 }
398
399 Ok(json!({
400 "handle_id": handle_id,
401 "resumed_from": prior_handle_id,
402 "agent_name": label,
403 "status": "running"
404 }).to_string())
405 }
406}