construct/agent/
tool_execution.rs1use anyhow::Result;
8use std::time::{Duration, Instant};
9use tokio_util::sync::CancellationToken;
10
11use crate::approval::ApprovalManager;
12use crate::observability::{Observer, ObserverEvent};
13use crate::tools::Tool;
14use crate::util::truncate_with_ellipsis;
15
16use super::loop_::{ParsedToolCall, ToolLoopCancelled, scrub_credentials};
18
19pub(crate) fn find_tool<'a>(tools: &'a [Box<dyn Tool>], name: &str) -> Option<&'a dyn Tool> {
23 tools.iter().find(|t| t.name() == name).map(|t| t.as_ref())
24}
25
26pub(crate) struct ToolExecutionOutcome {
29 pub(crate) output: String,
30 pub(crate) success: bool,
31 pub(crate) error_reason: Option<String>,
32 pub(crate) duration: Duration,
33}
34
35pub(crate) async fn execute_one_tool(
38 call_name: &str,
39 call_arguments: serde_json::Value,
40 tools_registry: &[Box<dyn Tool>],
41 activated_tools: Option<&std::sync::Arc<std::sync::Mutex<crate::tools::ActivatedToolSet>>>,
42 observer: &dyn Observer,
43 cancellation_token: Option<&CancellationToken>,
44) -> Result<ToolExecutionOutcome> {
45 let args_summary = truncate_with_ellipsis(&call_arguments.to_string(), 300);
46 observer.record_event(&ObserverEvent::ToolCallStart {
47 tool: call_name.to_string(),
48 arguments: Some(args_summary),
49 });
50 let start = Instant::now();
51
52 let static_tool = find_tool(tools_registry, call_name);
53 let activated_arc = if static_tool.is_none() {
54 activated_tools.and_then(|at| at.lock().unwrap().get_resolved(call_name))
55 } else {
56 None
57 };
58 let Some(tool) = static_tool.or(activated_arc.as_deref()) else {
59 let reason = format!("Unknown tool: {call_name}");
60 let duration = start.elapsed();
61 observer.record_event(&ObserverEvent::ToolCall {
62 tool: call_name.to_string(),
63 duration,
64 success: false,
65 });
66 return Ok(ToolExecutionOutcome {
67 output: reason.clone(),
68 success: false,
69 error_reason: Some(scrub_credentials(&reason)),
70 duration,
71 });
72 };
73
74 let tool_future = tool.execute(call_arguments);
75 let tool_result = if let Some(token) = cancellation_token {
76 tokio::select! {
77 () = token.cancelled() => return Err(ToolLoopCancelled.into()),
78 result = tool_future => result,
79 }
80 } else {
81 tool_future.await
82 };
83
84 match tool_result {
85 Ok(r) => {
86 let duration = start.elapsed();
87 observer.record_event(&ObserverEvent::ToolCall {
88 tool: call_name.to_string(),
89 duration,
90 success: r.success,
91 });
92 if r.success {
93 Ok(ToolExecutionOutcome {
94 output: scrub_credentials(&r.output),
95 success: true,
96 error_reason: None,
97 duration,
98 })
99 } else {
100 let reason = r.error.unwrap_or(r.output);
101 Ok(ToolExecutionOutcome {
102 output: format!("Error: {reason}"),
103 success: false,
104 error_reason: Some(scrub_credentials(&reason)),
105 duration,
106 })
107 }
108 }
109 Err(e) => {
110 let duration = start.elapsed();
111 observer.record_event(&ObserverEvent::ToolCall {
112 tool: call_name.to_string(),
113 duration,
114 success: false,
115 });
116 let reason = format!("Error executing {call_name}: {e}");
117 Ok(ToolExecutionOutcome {
118 output: reason.clone(),
119 success: false,
120 error_reason: Some(scrub_credentials(&reason)),
121 duration,
122 })
123 }
124 }
125}
126
127pub(crate) fn should_execute_tools_in_parallel(
130 tool_calls: &[ParsedToolCall],
131 approval: Option<&ApprovalManager>,
132) -> bool {
133 if tool_calls.len() <= 1 {
134 return false;
135 }
136
137 if tool_calls.iter().any(|call| call.name == "tool_search") {
142 return false;
143 }
144
145 if let Some(mgr) = approval {
146 if tool_calls.iter().any(|call| mgr.needs_approval(&call.name)) {
147 return false;
150 }
151 }
152
153 true
154}
155
156pub(crate) async fn execute_tools_parallel(
159 tool_calls: &[ParsedToolCall],
160 tools_registry: &[Box<dyn Tool>],
161 activated_tools: Option<&std::sync::Arc<std::sync::Mutex<crate::tools::ActivatedToolSet>>>,
162 observer: &dyn Observer,
163 cancellation_token: Option<&CancellationToken>,
164) -> Result<Vec<ToolExecutionOutcome>> {
165 let futures: Vec<_> = tool_calls
166 .iter()
167 .map(|call| {
168 execute_one_tool(
169 &call.name,
170 call.arguments.clone(),
171 tools_registry,
172 activated_tools,
173 observer,
174 cancellation_token,
175 )
176 })
177 .collect();
178
179 let results = futures_util::future::join_all(futures).await;
180 results.into_iter().collect()
181}
182
183pub(crate) async fn execute_tools_sequential(
186 tool_calls: &[ParsedToolCall],
187 tools_registry: &[Box<dyn Tool>],
188 activated_tools: Option<&std::sync::Arc<std::sync::Mutex<crate::tools::ActivatedToolSet>>>,
189 observer: &dyn Observer,
190 cancellation_token: Option<&CancellationToken>,
191) -> Result<Vec<ToolExecutionOutcome>> {
192 let mut outcomes = Vec::with_capacity(tool_calls.len());
193
194 for call in tool_calls {
195 outcomes.push(
196 execute_one_tool(
197 &call.name,
198 call.arguments.clone(),
199 tools_registry,
200 activated_tools,
201 observer,
202 cancellation_token,
203 )
204 .await?,
205 );
206 }
207
208 Ok(outcomes)
209}