Skip to main content

construct/agent/
tool_execution.rs

1//! Tool execution helpers extracted from `loop_`.
2//!
3//! Contains the functions responsible for invoking tools (single, parallel,
4//! sequential) and the decision logic for choosing between parallel and
5//! sequential execution.
6
7use anyhow::Result;
8use std::time::{Duration, Instant};
9use tokio_util::sync::CancellationToken;
10
11use crate::approval::ApprovalManager;
12use crate::observability::{Observer, ObserverEvent};
13use crate::tools::Tool;
14use crate::util::truncate_with_ellipsis;
15
16// Items that still live in `loop_` — import via the parent module.
17use super::loop_::{ParsedToolCall, ToolLoopCancelled, scrub_credentials};
18
19// ── Helpers ──────────────────────────────────────────────────────────────
20
21/// Look up a tool by name in a slice of boxed `dyn Tool` values.
22pub(crate) fn find_tool<'a>(tools: &'a [Box<dyn Tool>], name: &str) -> Option<&'a dyn Tool> {
23    tools.iter().find(|t| t.name() == name).map(|t| t.as_ref())
24}
25
26// ── Outcome ──────────────────────────────────────────────────────────────
27
28pub(crate) struct ToolExecutionOutcome {
29    pub(crate) output: String,
30    pub(crate) success: bool,
31    pub(crate) error_reason: Option<String>,
32    pub(crate) duration: Duration,
33}
34
35// ── Single tool execution ────────────────────────────────────────────────
36
37pub(crate) async fn execute_one_tool(
38    call_name: &str,
39    call_arguments: serde_json::Value,
40    tools_registry: &[Box<dyn Tool>],
41    activated_tools: Option<&std::sync::Arc<std::sync::Mutex<crate::tools::ActivatedToolSet>>>,
42    observer: &dyn Observer,
43    cancellation_token: Option<&CancellationToken>,
44) -> Result<ToolExecutionOutcome> {
45    let args_summary = truncate_with_ellipsis(&call_arguments.to_string(), 300);
46    observer.record_event(&ObserverEvent::ToolCallStart {
47        tool: call_name.to_string(),
48        arguments: Some(args_summary),
49    });
50    let start = Instant::now();
51
52    let static_tool = find_tool(tools_registry, call_name);
53    let activated_arc = if static_tool.is_none() {
54        activated_tools.and_then(|at| at.lock().unwrap().get_resolved(call_name))
55    } else {
56        None
57    };
58    let Some(tool) = static_tool.or(activated_arc.as_deref()) else {
59        let reason = format!("Unknown tool: {call_name}");
60        let duration = start.elapsed();
61        observer.record_event(&ObserverEvent::ToolCall {
62            tool: call_name.to_string(),
63            duration,
64            success: false,
65        });
66        return Ok(ToolExecutionOutcome {
67            output: reason.clone(),
68            success: false,
69            error_reason: Some(scrub_credentials(&reason)),
70            duration,
71        });
72    };
73
74    let tool_future = tool.execute(call_arguments);
75    let tool_result = if let Some(token) = cancellation_token {
76        tokio::select! {
77            () = token.cancelled() => return Err(ToolLoopCancelled.into()),
78            result = tool_future => result,
79        }
80    } else {
81        tool_future.await
82    };
83
84    match tool_result {
85        Ok(r) => {
86            let duration = start.elapsed();
87            observer.record_event(&ObserverEvent::ToolCall {
88                tool: call_name.to_string(),
89                duration,
90                success: r.success,
91            });
92            if r.success {
93                Ok(ToolExecutionOutcome {
94                    output: scrub_credentials(&r.output),
95                    success: true,
96                    error_reason: None,
97                    duration,
98                })
99            } else {
100                let reason = r.error.unwrap_or(r.output);
101                Ok(ToolExecutionOutcome {
102                    output: format!("Error: {reason}"),
103                    success: false,
104                    error_reason: Some(scrub_credentials(&reason)),
105                    duration,
106                })
107            }
108        }
109        Err(e) => {
110            let duration = start.elapsed();
111            observer.record_event(&ObserverEvent::ToolCall {
112                tool: call_name.to_string(),
113                duration,
114                success: false,
115            });
116            let reason = format!("Error executing {call_name}: {e}");
117            Ok(ToolExecutionOutcome {
118                output: reason.clone(),
119                success: false,
120                error_reason: Some(scrub_credentials(&reason)),
121                duration,
122            })
123        }
124    }
125}
126
127// ── Parallel / sequential decision ───────────────────────────────────────
128
129pub(crate) fn should_execute_tools_in_parallel(
130    tool_calls: &[ParsedToolCall],
131    approval: Option<&ApprovalManager>,
132) -> bool {
133    if tool_calls.len() <= 1 {
134        return false;
135    }
136
137    // tool_search activates deferred MCP tools into ActivatedToolSet.
138    // Running tool_search in parallel with the tools it activates causes a
139    // race condition where the tool lookup happens before activation completes.
140    // Force sequential execution whenever tool_search is in the batch.
141    if tool_calls.iter().any(|call| call.name == "tool_search") {
142        return false;
143    }
144
145    if let Some(mgr) = approval {
146        if tool_calls.iter().any(|call| mgr.needs_approval(&call.name)) {
147            // Approval-gated calls must keep sequential handling so the caller can
148            // enforce CLI prompt/deny policy consistently.
149            return false;
150        }
151    }
152
153    true
154}
155
156// ── Parallel execution ───────────────────────────────────────────────────
157
158pub(crate) async fn execute_tools_parallel(
159    tool_calls: &[ParsedToolCall],
160    tools_registry: &[Box<dyn Tool>],
161    activated_tools: Option<&std::sync::Arc<std::sync::Mutex<crate::tools::ActivatedToolSet>>>,
162    observer: &dyn Observer,
163    cancellation_token: Option<&CancellationToken>,
164) -> Result<Vec<ToolExecutionOutcome>> {
165    let futures: Vec<_> = tool_calls
166        .iter()
167        .map(|call| {
168            execute_one_tool(
169                &call.name,
170                call.arguments.clone(),
171                tools_registry,
172                activated_tools,
173                observer,
174                cancellation_token,
175            )
176        })
177        .collect();
178
179    let results = futures_util::future::join_all(futures).await;
180    results.into_iter().collect()
181}
182
183// ── Sequential execution ─────────────────────────────────────────────────
184
185pub(crate) async fn execute_tools_sequential(
186    tool_calls: &[ParsedToolCall],
187    tools_registry: &[Box<dyn Tool>],
188    activated_tools: Option<&std::sync::Arc<std::sync::Mutex<crate::tools::ActivatedToolSet>>>,
189    observer: &dyn Observer,
190    cancellation_token: Option<&CancellationToken>,
191) -> Result<Vec<ToolExecutionOutcome>> {
192    let mut outcomes = Vec::with_capacity(tool_calls.len());
193
194    for call in tool_calls {
195        outcomes.push(
196            execute_one_tool(
197                &call.name,
198                call.arguments.clone(),
199                tools_registry,
200                activated_tools,
201                observer,
202                cancellation_token,
203            )
204            .await?,
205        );
206    }
207
208    Ok(outcomes)
209}