Skip to main content

bamboo_agent_core/tools/
result_handler.rs

1use std::collections::VecDeque;
2use std::sync::Arc;
3
4use tokio::sync::mpsc;
5
6use crate::composition::CompositionExecutor;
7use crate::tools::executor::execute_tool_call_with_context;
8use crate::tools::{
9    convert_from_standard_result, AgenticToolResult, ToolCall, ToolError, ToolExecutionContext,
10    ToolExecutor, ToolResult,
11};
12use crate::{AgentEvent, Message, Session};
13
/// What the caller should do once a tool result has been recorded in the session.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum ToolHandlingOutcome {
    /// Nothing is blocking; processing can carry on.
    Continue,
    /// A clarification request was emitted and an answer is pending.
    AwaitingClarification,
    /// The tool result carried the `waiting_for_children` runtime control signal.
    WaitingForChildren,
}
20
21fn is_waiting_for_children_control(result: &ToolResult) -> bool {
22    if result.display_preference.as_deref() == Some("runtime_control:waiting_for_children") {
23        return true;
24    }
25
26    result.result.trim_start().starts_with('{')
27        && serde_json::from_str::<serde_json::Value>(&result.result)
28            .ok()
29            .and_then(|value| value.get("runtime_control").cloned())
30            .and_then(|control| control.as_str().map(str::to_string))
31            .is_some_and(|control| control == "waiting_for_children")
32}
33
/// Upper bound on how many sub-actions a single `execute_sub_actions` call will run,
/// counting follow-up actions queued by earlier sub-actions.
pub const MAX_SUB_ACTIONS: usize = 64;
35
36pub fn parse_tool_args(arguments: &str) -> std::result::Result<serde_json::Value, ToolError> {
37    let args_raw = arguments.trim();
38
39    if args_raw.is_empty() {
40        return Ok(serde_json::json!({}));
41    }
42
43    serde_json::from_str(args_raw)
44        .map_err(|error| ToolError::InvalidArguments(format!("Invalid JSON arguments: {error}")))
45}
46
/// Shortens `value` in place so that it no longer ends with whitespace.
fn trim_end_whitespace_in_place(value: &mut String) {
    let kept_len = value.trim_end().len();
    value.truncate(kept_len);
}
51
52fn strip_trailing_commas_in_place(value: &mut String) {
53    loop {
54        trim_end_whitespace_in_place(value);
55        if value.ends_with(',') {
56            value.pop();
57            continue;
58        }
59        break;
60    }
61}
62
/// Builds a single-line preview of `value` for log output.
///
/// At most `max_chars` characters are kept, `...` is appended when anything was
/// cut off, and line feeds / carriage returns are shown as the two-character
/// sequences `\n` / `\r`. Escaping happens after clipping, so the returned
/// string can be longer than `max_chars`.
fn preview_for_log(value: &str, max_chars: usize) -> String {
    let mut remaining = value.chars();
    let mut clipped: String = remaining.by_ref().take(max_chars).collect();
    if remaining.next().is_some() {
        clipped.push_str("...");
    }
    clipped.replace('\n', "\\n").replace('\r', "\\r")
}
77
78fn attempt_repair_truncated_json(arguments: &str) -> Option<String> {
79    let args_raw = arguments.trim();
80    if args_raw.is_empty() {
81        return None;
82    }
83    if !args_raw.starts_with('{') && !args_raw.starts_with('[') {
84        return None;
85    }
86
87    let mut stack: Vec<char> = Vec::new();
88    let mut in_string = false;
89    let mut escaped = false;
90
91    for ch in args_raw.chars() {
92        if in_string {
93            if escaped {
94                escaped = false;
95                continue;
96            }
97            match ch {
98                '\\' => escaped = true,
99                '"' => in_string = false,
100                _ => {}
101            }
102            continue;
103        }
104
105        match ch {
106            '"' => in_string = true,
107            '{' => stack.push('}'),
108            '[' => stack.push(']'),
109            '}' | ']' => {
110                if stack.last().copied() == Some(ch) {
111                    stack.pop();
112                } else {
113                    return None;
114                }
115            }
116            _ => {}
117        }
118    }
119
120    if !in_string && stack.is_empty() {
121        return None;
122    }
123
124    let mut repaired = args_raw.to_string();
125    if in_string {
126        repaired.push('"');
127    }
128
129    while let Some(closing) = stack.pop() {
130        strip_trailing_commas_in_place(&mut repaired);
131        repaired.push(closing);
132    }
133
134    strip_trailing_commas_in_place(&mut repaired);
135    Some(repaired)
136}
137
138/// Parse tool args with graceful fallback:
139/// 1) strict JSON parse
140/// 2) attempt repair for truncated/incomplete JSON
141/// 3) fallback to empty object to keep the session alive
142pub fn parse_tool_args_best_effort(arguments: &str) -> (serde_json::Value, Option<String>) {
143    let args_raw = arguments.trim();
144    if args_raw.is_empty() {
145        return (serde_json::json!({}), None);
146    }
147
148    match serde_json::from_str::<serde_json::Value>(args_raw) {
149        Ok(parsed) => (parsed, None),
150        Err(primary_error) => {
151            if let Some(repaired_json) = attempt_repair_truncated_json(args_raw) {
152                match serde_json::from_str::<serde_json::Value>(&repaired_json) {
153                    Ok(parsed) => {
154                        let warning = format!(
155                            "Invalid JSON arguments recovered via auto-repair: original_error={}, repaired_preview=\"{}\"",
156                            primary_error,
157                            preview_for_log(&repaired_json, 180)
158                        );
159                        return (parsed, Some(warning));
160                    }
161                    Err(repair_error) => {
162                        let warning = format!(
163                            "Invalid JSON arguments: {} (auto-repair failed: {}); falling back to empty object",
164                            primary_error, repair_error
165                        );
166                        return (serde_json::json!({}), Some(warning));
167                    }
168                }
169            }
170
171            let warning = format!(
172                "Invalid JSON arguments: {}; falling back to empty object",
173                primary_error
174            );
175            (serde_json::json!({}), Some(warning))
176        }
177    }
178}
179
180pub fn try_parse_agentic_result(result: &ToolResult) -> Option<AgenticToolResult> {
181    if result.result.trim_start().starts_with('{') {
182        if let Ok(parsed) = serde_json::from_str::<AgenticToolResult>(&result.result) {
183            return Some(parsed);
184        }
185    }
186
187    match result.display_preference.as_deref() {
188        Some("clarification") | Some("actions_needed") => {
189            Some(convert_from_standard_result(result.clone()))
190        }
191        _ => None,
192    }
193}
194
195pub async fn handle_tool_result_with_agentic_support(
196    result: &ToolResult,
197    tool_call: &ToolCall,
198    event_tx: &mpsc::Sender<AgentEvent>,
199    session: &mut Session,
200    tools: &dyn ToolExecutor,
201    composition_executor: Option<Arc<CompositionExecutor>>,
202) -> ToolHandlingOutcome {
203    let should_wait_for_children = is_waiting_for_children_control(result);
204    if should_wait_for_children {
205        session.metadata.insert(
206            "runtime.suspend_reason".to_string(),
207            "waiting_for_children".to_string(),
208        );
209    }
210    let Some(agentic_result) = try_parse_agentic_result(result) else {
211        session.add_message(Message::tool_result_with_status(
212            tool_call.id.clone(),
213            result.result.clone(),
214            result.success,
215        ));
216        return if should_wait_for_children {
217            ToolHandlingOutcome::WaitingForChildren
218        } else {
219            ToolHandlingOutcome::Continue
220        };
221    };
222
223    match agentic_result {
224        AgenticToolResult::Success { result } => {
225            session.add_message(Message::tool_result(tool_call.id.clone(), result));
226            if should_wait_for_children {
227                ToolHandlingOutcome::WaitingForChildren
228            } else {
229                ToolHandlingOutcome::Continue
230            }
231        }
232        AgenticToolResult::Error { error } => {
233            let _ = event_tx
234                .send(AgentEvent::ToolError {
235                    tool_call_id: tool_call.id.clone(),
236                    error: error.clone(),
237                })
238                .await;
239
240            session.add_message(Message::tool_result_with_status(
241                tool_call.id.clone(),
242                format!("Error: {error}"),
243                false,
244            ));
245
246            ToolHandlingOutcome::Continue
247        }
248        AgenticToolResult::NeedClarification { question, options } => {
249            send_clarification_request(event_tx, question.clone(), options).await;
250
251            session.add_message(Message::tool_result(
252                tool_call.id.clone(),
253                format!("Clarification needed: {question}"),
254            ));
255
256            ToolHandlingOutcome::AwaitingClarification
257        }
258        AgenticToolResult::NeedMoreActions { actions, reason } => {
259            session.add_message(Message::tool_result(
260                tool_call.id.clone(),
261                format!(
262                    "Need more actions: {reason} ({} actions pending)",
263                    actions.len()
264                ),
265            ));
266
267            execute_sub_actions(&actions, event_tx, session, tools, composition_executor).await
268        }
269    }
270}
271
272pub async fn send_clarification_request(
273    event_tx: &mpsc::Sender<AgentEvent>,
274    question: String,
275    options: Option<Vec<String>>,
276) {
277    let _ = event_tx
278        .send(AgentEvent::NeedClarification {
279            question,
280            options,
281            tool_call_id: None,
282            allow_custom: true,
283        })
284        .await;
285}
286
287pub async fn execute_sub_actions(
288    actions: &[ToolCall],
289    event_tx: &mpsc::Sender<AgentEvent>,
290    session: &mut Session,
291    tools: &dyn ToolExecutor,
292    composition_executor: Option<Arc<CompositionExecutor>>,
293) -> ToolHandlingOutcome {
294    let mut pending: VecDeque<ToolCall> = actions.iter().cloned().collect();
295    let mut processed = 0usize;
296    let available_tools = tools.list_tools();
297
298    while let Some(action) = pending.pop_front() {
299        if processed >= MAX_SUB_ACTIONS {
300            let error = format!("Reached max sub-action limit ({MAX_SUB_ACTIONS})");
301            let _ = event_tx
302                .send(AgentEvent::ToolError {
303                    tool_call_id: action.id.clone(),
304                    error: error.clone(),
305                })
306                .await;
307            session.add_message(Message::tool_result_with_status(
308                action.id.clone(),
309                error,
310                false,
311            ));
312            return ToolHandlingOutcome::Continue;
313        }
314
315        processed += 1;
316
317        let args =
318            parse_tool_args(&action.function.arguments).unwrap_or_else(|_| serde_json::json!({}));
319
320        let _ = event_tx
321            .send(AgentEvent::ToolStart {
322                tool_call_id: action.id.clone(),
323                tool_name: action.function.name.clone(),
324                arguments: args,
325            })
326            .await;
327
328        let tool_ctx = ToolExecutionContext {
329            session_id: Some(&session.id),
330            tool_call_id: &action.id,
331            event_tx: Some(event_tx),
332            available_tool_schemas: Some(available_tools.as_slice()),
333        };
334
335        match execute_tool_call_with_context(&action, tools, composition_executor.clone(), tool_ctx)
336            .await
337        {
338            Ok(result) => {
339                let _ = event_tx
340                    .send(AgentEvent::ToolComplete {
341                        tool_call_id: action.id.clone(),
342                        result: result.clone(),
343                    })
344                    .await;
345
346                match try_parse_agentic_result(&result) {
347                    Some(AgenticToolResult::Success { result }) => {
348                        session.add_message(Message::tool_result(action.id.clone(), result));
349                    }
350                    Some(AgenticToolResult::Error { error }) => {
351                        let _ = event_tx
352                            .send(AgentEvent::ToolError {
353                                tool_call_id: action.id.clone(),
354                                error: error.clone(),
355                            })
356                            .await;
357                        session.add_message(Message::tool_result_with_status(
358                            action.id.clone(),
359                            format!("Error: {error}"),
360                            false,
361                        ));
362                    }
363                    Some(AgenticToolResult::NeedClarification { question, options }) => {
364                        send_clarification_request(event_tx, question.clone(), options).await;
365                        session.add_message(Message::tool_result(
366                            action.id.clone(),
367                            format!("Clarification needed: {question}"),
368                        ));
369                        return ToolHandlingOutcome::AwaitingClarification;
370                    }
371                    Some(AgenticToolResult::NeedMoreActions {
372                        actions: next_actions,
373                        reason,
374                    }) => {
375                        session.add_message(Message::tool_result(
376                            action.id.clone(),
377                            format!(
378                                "Need more actions: {reason} ({} actions pending)",
379                                next_actions.len()
380                            ),
381                        ));
382                        pending.extend(next_actions);
383                    }
384                    None => {
385                        session.add_message(Message::tool_result_with_status(
386                            action.id.clone(),
387                            result.result.clone(),
388                            result.success,
389                        ));
390                    }
391                }
392            }
393            Err(error) => {
394                let error_msg = error.to_string();
395                let _ = event_tx
396                    .send(AgentEvent::ToolError {
397                        tool_call_id: action.id.clone(),
398                        error: error_msg.clone(),
399                    })
400                    .await;
401                session.add_message(Message::tool_result_with_status(
402                    action.id.clone(),
403                    format!("Error: {error_msg}"),
404                    false,
405                ));
406            }
407        }
408    }
409
410    ToolHandlingOutcome::Continue
411}
412
#[cfg(test)]
mod tests {
    use async_trait::async_trait;
    use std::collections::HashMap;
    use std::sync::Arc;
    use tokio::sync::mpsc;

    use crate::tools::{FunctionCall, ToolSchema};

    use super::*;

    /// Test double that answers each call with a canned result keyed by tool name.
    struct StaticExecutor {
        results: HashMap<String, ToolResult>,
    }

    impl StaticExecutor {
        fn new(results: HashMap<String, ToolResult>) -> Self {
            Self { results }
        }
    }

    #[async_trait]
    impl ToolExecutor for StaticExecutor {
        async fn execute(&self, call: &ToolCall) -> crate::tools::executor::Result<ToolResult> {
            match self.results.get(&call.function.name) {
                Some(canned) => Ok(canned.clone()),
                None => Err(ToolError::NotFound(call.function.name.clone())),
            }
        }

        fn list_tools(&self) -> Vec<ToolSchema> {
            Vec::new()
        }
    }

    /// Builds a `function`-typed tool call with the given id, tool name and raw arguments.
    fn make_tool_call(id: &str, name: &str, arguments: &str) -> ToolCall {
        let function = FunctionCall {
            name: name.to_string(),
            arguments: arguments.to_string(),
        };
        ToolCall {
            id: id.to_string(),
            tool_type: "function".to_string(),
            function,
        }
    }

    /// Builds a successful result with the given payload and no display preference.
    fn text_result(payload: String) -> ToolResult {
        ToolResult {
            success: true,
            result: payload,
            display_preference: None,
        }
    }

    #[tokio::test]
    async fn need_clarification_sends_event() {
        let (event_tx, mut event_rx) = mpsc::channel(8);
        let tools: Arc<dyn ToolExecutor> = Arc::new(StaticExecutor::new(HashMap::new()));
        let mut session = Session::new("s1", "test-model");
        let tool_call = make_tool_call("call_parent", "smart_tool", "{}");

        let expected_options = vec!["src/main.rs".to_string(), "src/lib.rs".to_string()];
        let clarification = AgenticToolResult::NeedClarification {
            question: "Which file should I inspect?".to_string(),
            options: Some(expected_options.clone()),
        };
        let result = text_result(serde_json::to_string(&clarification).unwrap());

        let outcome = handle_tool_result_with_agentic_support(
            &result,
            &tool_call,
            &event_tx,
            &mut session,
            tools.as_ref(),
            None,
        )
        .await;

        assert_eq!(outcome, ToolHandlingOutcome::AwaitingClarification);

        match event_rx.recv().await.expect("missing clarification event") {
            AgentEvent::NeedClarification {
                question, options, ..
            } => {
                assert_eq!(question, "Which file should I inspect?");
                assert_eq!(options, Some(expected_options));
            }
            other => panic!("unexpected event: {other:?}"),
        }
    }

    #[tokio::test]
    async fn need_more_actions_executes_sub_actions() {
        let (event_tx, mut event_rx) = mpsc::channel(16);
        let sub_action = make_tool_call("call_sub", "sub_tool", "{}");
        let parent_call = make_tool_call("call_parent", "smart_tool", "{}");

        let canned = HashMap::from([(
            "sub_tool".to_string(),
            text_result("sub-action-done".to_string()),
        )]);
        let tools: Arc<dyn ToolExecutor> = Arc::new(StaticExecutor::new(canned));
        let mut session = Session::new("s2", "test-model");

        let request = AgenticToolResult::NeedMoreActions {
            actions: vec![sub_action],
            reason: "Need workspace context".to_string(),
        };
        let result = text_result(serde_json::to_string(&request).unwrap());

        let outcome = handle_tool_result_with_agentic_support(
            &result,
            &parent_call,
            &event_tx,
            &mut session,
            tools.as_ref(),
            None,
        )
        .await;

        assert_eq!(outcome, ToolHandlingOutcome::Continue);

        let sub_result_recorded = session.messages.iter().any(|message| {
            message.tool_call_id.as_deref() == Some("call_sub")
                && message.content == "sub-action-done"
        });
        assert!(sub_result_recorded);

        let mut saw_sub_start = false;
        let mut saw_sub_complete = false;

        // Drain whatever was emitted; events for other tool calls are irrelevant here.
        while let Ok(event) = event_rx.try_recv() {
            match event {
                AgentEvent::ToolStart { tool_call_id, .. } if tool_call_id == "call_sub" => {
                    saw_sub_start = true;
                }
                AgentEvent::ToolComplete { tool_call_id, .. } if tool_call_id == "call_sub" => {
                    saw_sub_complete = true;
                }
                _ => {}
            }
        }

        assert!(saw_sub_start);
        assert!(saw_sub_complete);
    }

    #[test]
    fn parse_tool_args_rejects_invalid_json() {
        let error = parse_tool_args("not-json").expect_err("invalid json should fail");
        assert!(matches!(error, ToolError::InvalidArguments(_)));
    }

    #[test]
    fn parse_tool_args_best_effort_repairs_truncated_json() {
        let (parsed, warning) = parse_tool_args_best_effort(r#"{"path":"README.md""#);

        let path = parsed.get("path").and_then(|value| value.as_str());
        assert_eq!(path, Some("README.md"));
        assert!(warning.is_some());
    }

    #[test]
    fn parse_tool_args_best_effort_falls_back_to_empty_object() {
        let (parsed, warning) = parse_tool_args_best_effort("not-json");

        assert_eq!(parsed, serde_json::json!({}));
        assert!(warning.is_some());
    }
}
593}