1use std::collections::VecDeque;
2use std::sync::Arc;
3
4use tokio::sync::mpsc;
5
6use crate::composition::CompositionExecutor;
7use crate::tools::executor::execute_tool_call_with_context;
8use crate::tools::{
9 convert_from_standard_result, AgenticToolResult, ToolCall, ToolError, ToolExecutionContext,
10 ToolExecutor, ToolResult,
11};
12use crate::{AgentEvent, Message, Session};
13
14#[derive(Debug, Clone, Copy, PartialEq, Eq)]
15pub enum ToolHandlingOutcome {
16 Continue,
17 AwaitingClarification,
18 WaitingForChildren,
19}
20
21fn is_waiting_for_children_control(result: &ToolResult) -> bool {
22 if result.display_preference.as_deref() == Some("runtime_control:waiting_for_children") {
23 return true;
24 }
25
26 result.result.trim_start().starts_with('{')
27 && serde_json::from_str::<serde_json::Value>(&result.result)
28 .ok()
29 .and_then(|value| value.get("runtime_control").cloned())
30 .and_then(|control| control.as_str().map(str::to_string))
31 .is_some_and(|control| control == "waiting_for_children")
32}
33
34pub const MAX_SUB_ACTIONS: usize = 64;
35
36pub fn parse_tool_args(arguments: &str) -> std::result::Result<serde_json::Value, ToolError> {
37 let args_raw = arguments.trim();
38
39 if args_raw.is_empty() {
40 return Ok(serde_json::json!({}));
41 }
42
43 serde_json::from_str(args_raw)
44 .map_err(|error| ToolError::InvalidArguments(format!("Invalid JSON arguments: {error}")))
45}
46
47fn trim_end_whitespace_in_place(value: &mut String) {
48 let trimmed_len = value.trim_end_matches(char::is_whitespace).len();
49 value.truncate(trimmed_len);
50}
51
52fn strip_trailing_commas_in_place(value: &mut String) {
53 loop {
54 trim_end_whitespace_in_place(value);
55 if value.ends_with(',') {
56 value.pop();
57 continue;
58 }
59 break;
60 }
61}
62
63fn preview_for_log(value: &str, max_chars: usize) -> String {
64 let mut iter = value.chars();
65 let mut preview = String::new();
66 for _ in 0..max_chars {
67 match iter.next() {
68 Some(ch) => preview.push(ch),
69 None => break,
70 }
71 }
72 if iter.next().is_some() {
73 preview.push_str("...");
74 }
75 preview.replace('\n', "\\n").replace('\r', "\\r")
76}
77
78fn attempt_repair_truncated_json(arguments: &str) -> Option<String> {
79 let args_raw = arguments.trim();
80 if args_raw.is_empty() {
81 return None;
82 }
83 if !args_raw.starts_with('{') && !args_raw.starts_with('[') {
84 return None;
85 }
86
87 let mut stack: Vec<char> = Vec::new();
88 let mut in_string = false;
89 let mut escaped = false;
90
91 for ch in args_raw.chars() {
92 if in_string {
93 if escaped {
94 escaped = false;
95 continue;
96 }
97 match ch {
98 '\\' => escaped = true,
99 '"' => in_string = false,
100 _ => {}
101 }
102 continue;
103 }
104
105 match ch {
106 '"' => in_string = true,
107 '{' => stack.push('}'),
108 '[' => stack.push(']'),
109 '}' | ']' => {
110 if stack.last().copied() == Some(ch) {
111 stack.pop();
112 } else {
113 return None;
114 }
115 }
116 _ => {}
117 }
118 }
119
120 if !in_string && stack.is_empty() {
121 return None;
122 }
123
124 let mut repaired = args_raw.to_string();
125 if in_string {
126 repaired.push('"');
127 }
128
129 while let Some(closing) = stack.pop() {
130 strip_trailing_commas_in_place(&mut repaired);
131 repaired.push(closing);
132 }
133
134 strip_trailing_commas_in_place(&mut repaired);
135 Some(repaired)
136}
137
138pub fn parse_tool_args_best_effort(arguments: &str) -> (serde_json::Value, Option<String>) {
143 let args_raw = arguments.trim();
144 if args_raw.is_empty() {
145 return (serde_json::json!({}), None);
146 }
147
148 match serde_json::from_str::<serde_json::Value>(args_raw) {
149 Ok(parsed) => (parsed, None),
150 Err(primary_error) => {
151 if let Some(repaired_json) = attempt_repair_truncated_json(args_raw) {
152 match serde_json::from_str::<serde_json::Value>(&repaired_json) {
153 Ok(parsed) => {
154 let warning = format!(
155 "Invalid JSON arguments recovered via auto-repair: original_error={}, repaired_preview=\"{}\"",
156 primary_error,
157 preview_for_log(&repaired_json, 180)
158 );
159 return (parsed, Some(warning));
160 }
161 Err(repair_error) => {
162 let warning = format!(
163 "Invalid JSON arguments: {} (auto-repair failed: {}); falling back to empty object",
164 primary_error, repair_error
165 );
166 return (serde_json::json!({}), Some(warning));
167 }
168 }
169 }
170
171 let warning = format!(
172 "Invalid JSON arguments: {}; falling back to empty object",
173 primary_error
174 );
175 (serde_json::json!({}), Some(warning))
176 }
177 }
178}
179
180pub fn try_parse_agentic_result(result: &ToolResult) -> Option<AgenticToolResult> {
181 if result.result.trim_start().starts_with('{') {
182 if let Ok(parsed) = serde_json::from_str::<AgenticToolResult>(&result.result) {
183 return Some(parsed);
184 }
185 }
186
187 match result.display_preference.as_deref() {
188 Some("clarification") | Some("actions_needed") => {
189 Some(convert_from_standard_result(result.clone()))
190 }
191 _ => None,
192 }
193}
194
195pub async fn handle_tool_result_with_agentic_support(
196 result: &ToolResult,
197 tool_call: &ToolCall,
198 event_tx: &mpsc::Sender<AgentEvent>,
199 session: &mut Session,
200 tools: &dyn ToolExecutor,
201 composition_executor: Option<Arc<CompositionExecutor>>,
202) -> ToolHandlingOutcome {
203 let should_wait_for_children = is_waiting_for_children_control(result);
204 if should_wait_for_children {
205 session.metadata.insert(
206 "runtime.suspend_reason".to_string(),
207 "waiting_for_children".to_string(),
208 );
209 }
210 let Some(agentic_result) = try_parse_agentic_result(result) else {
211 session.add_message(Message::tool_result_with_status(
212 tool_call.id.clone(),
213 result.result.clone(),
214 result.success,
215 ));
216 return if should_wait_for_children {
217 ToolHandlingOutcome::WaitingForChildren
218 } else {
219 ToolHandlingOutcome::Continue
220 };
221 };
222
223 match agentic_result {
224 AgenticToolResult::Success { result } => {
225 session.add_message(Message::tool_result(tool_call.id.clone(), result));
226 if should_wait_for_children {
227 ToolHandlingOutcome::WaitingForChildren
228 } else {
229 ToolHandlingOutcome::Continue
230 }
231 }
232 AgenticToolResult::Error { error } => {
233 let _ = event_tx
234 .send(AgentEvent::ToolError {
235 tool_call_id: tool_call.id.clone(),
236 error: error.clone(),
237 })
238 .await;
239
240 session.add_message(Message::tool_result_with_status(
241 tool_call.id.clone(),
242 format!("Error: {error}"),
243 false,
244 ));
245
246 ToolHandlingOutcome::Continue
247 }
248 AgenticToolResult::NeedClarification { question, options } => {
249 send_clarification_request(event_tx, question.clone(), options).await;
250
251 session.add_message(Message::tool_result(
252 tool_call.id.clone(),
253 format!("Clarification needed: {question}"),
254 ));
255
256 ToolHandlingOutcome::AwaitingClarification
257 }
258 AgenticToolResult::NeedMoreActions { actions, reason } => {
259 session.add_message(Message::tool_result(
260 tool_call.id.clone(),
261 format!(
262 "Need more actions: {reason} ({} actions pending)",
263 actions.len()
264 ),
265 ));
266
267 execute_sub_actions(&actions, event_tx, session, tools, composition_executor).await
268 }
269 }
270}
271
272pub async fn send_clarification_request(
273 event_tx: &mpsc::Sender<AgentEvent>,
274 question: String,
275 options: Option<Vec<String>>,
276) {
277 let _ = event_tx
278 .send(AgentEvent::NeedClarification {
279 question,
280 options,
281 tool_call_id: None,
282 allow_custom: true,
283 })
284 .await;
285}
286
287pub async fn execute_sub_actions(
288 actions: &[ToolCall],
289 event_tx: &mpsc::Sender<AgentEvent>,
290 session: &mut Session,
291 tools: &dyn ToolExecutor,
292 composition_executor: Option<Arc<CompositionExecutor>>,
293) -> ToolHandlingOutcome {
294 let mut pending: VecDeque<ToolCall> = actions.iter().cloned().collect();
295 let mut processed = 0usize;
296 let available_tools = tools.list_tools();
297
298 while let Some(action) = pending.pop_front() {
299 if processed >= MAX_SUB_ACTIONS {
300 let error = format!("Reached max sub-action limit ({MAX_SUB_ACTIONS})");
301 let _ = event_tx
302 .send(AgentEvent::ToolError {
303 tool_call_id: action.id.clone(),
304 error: error.clone(),
305 })
306 .await;
307 session.add_message(Message::tool_result_with_status(
308 action.id.clone(),
309 error,
310 false,
311 ));
312 return ToolHandlingOutcome::Continue;
313 }
314
315 processed += 1;
316
317 let args =
318 parse_tool_args(&action.function.arguments).unwrap_or_else(|_| serde_json::json!({}));
319
320 let _ = event_tx
321 .send(AgentEvent::ToolStart {
322 tool_call_id: action.id.clone(),
323 tool_name: action.function.name.clone(),
324 arguments: args,
325 })
326 .await;
327
328 let tool_ctx = ToolExecutionContext {
329 session_id: Some(&session.id),
330 tool_call_id: &action.id,
331 event_tx: Some(event_tx),
332 available_tool_schemas: Some(available_tools.as_slice()),
333 };
334
335 match execute_tool_call_with_context(&action, tools, composition_executor.clone(), tool_ctx)
336 .await
337 {
338 Ok(result) => {
339 let _ = event_tx
340 .send(AgentEvent::ToolComplete {
341 tool_call_id: action.id.clone(),
342 result: result.clone(),
343 })
344 .await;
345
346 match try_parse_agentic_result(&result) {
347 Some(AgenticToolResult::Success { result }) => {
348 session.add_message(Message::tool_result(action.id.clone(), result));
349 }
350 Some(AgenticToolResult::Error { error }) => {
351 let _ = event_tx
352 .send(AgentEvent::ToolError {
353 tool_call_id: action.id.clone(),
354 error: error.clone(),
355 })
356 .await;
357 session.add_message(Message::tool_result_with_status(
358 action.id.clone(),
359 format!("Error: {error}"),
360 false,
361 ));
362 }
363 Some(AgenticToolResult::NeedClarification { question, options }) => {
364 send_clarification_request(event_tx, question.clone(), options).await;
365 session.add_message(Message::tool_result(
366 action.id.clone(),
367 format!("Clarification needed: {question}"),
368 ));
369 return ToolHandlingOutcome::AwaitingClarification;
370 }
371 Some(AgenticToolResult::NeedMoreActions {
372 actions: next_actions,
373 reason,
374 }) => {
375 session.add_message(Message::tool_result(
376 action.id.clone(),
377 format!(
378 "Need more actions: {reason} ({} actions pending)",
379 next_actions.len()
380 ),
381 ));
382 pending.extend(next_actions);
383 }
384 None => {
385 session.add_message(Message::tool_result_with_status(
386 action.id.clone(),
387 result.result.clone(),
388 result.success,
389 ));
390 }
391 }
392 }
393 Err(error) => {
394 let error_msg = error.to_string();
395 let _ = event_tx
396 .send(AgentEvent::ToolError {
397 tool_call_id: action.id.clone(),
398 error: error_msg.clone(),
399 })
400 .await;
401 session.add_message(Message::tool_result_with_status(
402 action.id.clone(),
403 format!("Error: {error_msg}"),
404 false,
405 ));
406 }
407 }
408 }
409
410 ToolHandlingOutcome::Continue
411}
412
413#[cfg(test)]
414mod tests {
415 use async_trait::async_trait;
416 use std::collections::HashMap;
417 use std::sync::Arc;
418 use tokio::sync::mpsc;
419
420 use crate::tools::{FunctionCall, ToolSchema};
421
422 use super::*;
423
424 struct StaticExecutor {
425 results: HashMap<String, ToolResult>,
426 }
427
428 impl StaticExecutor {
429 fn new(results: HashMap<String, ToolResult>) -> Self {
430 Self { results }
431 }
432 }
433
434 #[async_trait]
435 impl ToolExecutor for StaticExecutor {
436 async fn execute(&self, call: &ToolCall) -> crate::tools::executor::Result<ToolResult> {
437 self.results
438 .get(&call.function.name)
439 .cloned()
440 .ok_or_else(|| ToolError::NotFound(call.function.name.clone()))
441 }
442
443 fn list_tools(&self) -> Vec<ToolSchema> {
444 Vec::new()
445 }
446 }
447
448 fn make_tool_call(id: &str, name: &str, arguments: &str) -> ToolCall {
449 ToolCall {
450 id: id.to_string(),
451 tool_type: "function".to_string(),
452 function: FunctionCall {
453 name: name.to_string(),
454 arguments: arguments.to_string(),
455 },
456 }
457 }
458
459 #[tokio::test]
460 async fn need_clarification_sends_event() {
461 let (event_tx, mut event_rx) = mpsc::channel(8);
462 let tools: Arc<dyn ToolExecutor> = Arc::new(StaticExecutor::new(HashMap::new()));
463 let mut session = Session::new("s1", "test-model");
464 let tool_call = make_tool_call("call_parent", "smart_tool", "{}");
465
466 let result = ToolResult {
467 success: true,
468 result: serde_json::to_string(&AgenticToolResult::NeedClarification {
469 question: "Which file should I inspect?".to_string(),
470 options: Some(vec!["src/main.rs".to_string(), "src/lib.rs".to_string()]),
471 })
472 .unwrap(),
473 display_preference: None,
474 };
475
476 let outcome = handle_tool_result_with_agentic_support(
477 &result,
478 &tool_call,
479 &event_tx,
480 &mut session,
481 tools.as_ref(),
482 None,
483 )
484 .await;
485
486 assert_eq!(outcome, ToolHandlingOutcome::AwaitingClarification);
487
488 let event = event_rx.recv().await.expect("missing clarification event");
489 match event {
490 AgentEvent::NeedClarification {
491 question, options, ..
492 } => {
493 assert_eq!(question, "Which file should I inspect?");
494 assert_eq!(
495 options,
496 Some(vec!["src/main.rs".to_string(), "src/lib.rs".to_string()])
497 );
498 }
499 other => panic!("unexpected event: {other:?}"),
500 }
501 }
502
503 #[tokio::test]
504 async fn need_more_actions_executes_sub_actions() {
505 let (event_tx, mut event_rx) = mpsc::channel(16);
506 let sub_action = make_tool_call("call_sub", "sub_tool", "{}");
507 let parent_call = make_tool_call("call_parent", "smart_tool", "{}");
508
509 let mut results = HashMap::new();
510 results.insert(
511 "sub_tool".to_string(),
512 ToolResult {
513 success: true,
514 result: "sub-action-done".to_string(),
515 display_preference: None,
516 },
517 );
518 let tools: Arc<dyn ToolExecutor> = Arc::new(StaticExecutor::new(results));
519 let mut session = Session::new("s2", "test-model");
520
521 let result = ToolResult {
522 success: true,
523 result: serde_json::to_string(&AgenticToolResult::NeedMoreActions {
524 actions: vec![sub_action],
525 reason: "Need workspace context".to_string(),
526 })
527 .unwrap(),
528 display_preference: None,
529 };
530
531 let outcome = handle_tool_result_with_agentic_support(
532 &result,
533 &parent_call,
534 &event_tx,
535 &mut session,
536 tools.as_ref(),
537 None,
538 )
539 .await;
540
541 assert_eq!(outcome, ToolHandlingOutcome::Continue);
542 assert!(session
543 .messages
544 .iter()
545 .any(
546 |message| message.tool_call_id.as_deref() == Some("call_sub")
547 && message.content == "sub-action-done"
548 ));
549
550 let mut saw_sub_start = false;
551 let mut saw_sub_complete = false;
552
553 while let Ok(event) = event_rx.try_recv() {
554 match event {
555 AgentEvent::ToolStart { tool_call_id, .. } if tool_call_id == "call_sub" => {
556 saw_sub_start = true;
557 }
558 AgentEvent::ToolComplete { tool_call_id, .. } if tool_call_id == "call_sub" => {
559 saw_sub_complete = true;
560 }
561 _ => {}
562 }
563 }
564
565 assert!(saw_sub_start);
566 assert!(saw_sub_complete);
567 }
568
569 #[test]
570 fn parse_tool_args_rejects_invalid_json() {
571 let error = parse_tool_args("not-json").expect_err("invalid json should fail");
572 assert!(matches!(error, ToolError::InvalidArguments(_)));
573 }
574
575 #[test]
576 fn parse_tool_args_best_effort_repairs_truncated_json() {
577 let (parsed, warning) = parse_tool_args_best_effort(r#"{"path":"README.md""#);
578
579 assert_eq!(
580 parsed.get("path").and_then(|v| v.as_str()),
581 Some("README.md")
582 );
583 assert!(warning.is_some());
584 }
585
586 #[test]
587 fn parse_tool_args_best_effort_falls_back_to_empty_object() {
588 let (parsed, warning) = parse_tool_args_best_effort("not-json");
589
590 assert_eq!(parsed, serde_json::json!({}));
591 assert!(warning.is_some());
592 }
593}