use anyhow::Result;
use std::time::{Duration, Instant};
use tokio_util::sync::CancellationToken;
use crate::approval::ApprovalManager;
use crate::observability::{Observer, ObserverEvent};
use crate::tools::Tool;
use crate::util::truncate_with_ellipsis;
use super::loop_::{ParsedToolCall, ToolLoopCancelled, scrub_credentials};
/// Looks up a tool by name.
///
/// Returns the first entry of `tools` whose `name()` equals `name`, borrowed
/// for as long as the slice itself, or `None` when no entry matches.
pub(crate) fn find_tool<'a>(tools: &'a [Box<dyn Tool>], name: &str) -> Option<&'a dyn Tool> {
    for candidate in tools {
        if candidate.name() == name {
            return Some(candidate.as_ref());
        }
    }
    None
}
/// Result of running (or refusing to run) a single tool call.
///
/// Derives the common value traits so outcomes can be logged with `{:?}`,
/// duplicated, and compared directly in assertions.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct ToolExecutionOutcome {
    /// Text reported for the call: the tool's output on success, or an error
    /// description on failure.
    pub(crate) output: String,
    /// Whether the tool ran and reported success.
    pub(crate) success: bool,
    /// `None` on success; on failure, the failure reason after it has been
    /// passed through `scrub_credentials`.
    pub(crate) error_reason: Option<String>,
    /// Time elapsed between the start of the call and the point where its
    /// completion was recorded.
    pub(crate) duration: Duration,
}
pub(crate) async fn execute_one_tool(
call_name: &str,
call_arguments: serde_json::Value,
tools_registry: &[Box<dyn Tool>],
activated_tools: Option<&std::sync::Arc<std::sync::Mutex<crate::tools::ActivatedToolSet>>>,
observer: &dyn Observer,
cancellation_token: Option<&CancellationToken>,
) -> Result<ToolExecutionOutcome> {
let args_summary = truncate_with_ellipsis(&call_arguments.to_string(), 300);
observer.record_event(&ObserverEvent::ToolCallStart {
tool: call_name.to_string(),
arguments: Some(args_summary),
});
let start = Instant::now();
let static_tool = find_tool(tools_registry, call_name);
let activated_arc = if static_tool.is_none() {
activated_tools.and_then(|at| at.lock().unwrap().get_resolved(call_name))
} else {
None
};
let Some(tool) = static_tool.or(activated_arc.as_deref()) else {
let reason = format!("Unknown tool: {call_name}");
let duration = start.elapsed();
observer.record_event(&ObserverEvent::ToolCall {
tool: call_name.to_string(),
duration,
success: false,
});
return Ok(ToolExecutionOutcome {
output: reason.clone(),
success: false,
error_reason: Some(scrub_credentials(&reason)),
duration,
});
};
let tool_future = tool.execute(call_arguments);
let tool_result = if let Some(token) = cancellation_token {
tokio::select! {
() = token.cancelled() => return Err(ToolLoopCancelled.into()),
result = tool_future => result,
}
} else {
tool_future.await
};
match tool_result {
Ok(r) => {
let duration = start.elapsed();
observer.record_event(&ObserverEvent::ToolCall {
tool: call_name.to_string(),
duration,
success: r.success,
});
if r.success {
Ok(ToolExecutionOutcome {
output: scrub_credentials(&r.output),
success: true,
error_reason: None,
duration,
})
} else {
let reason = r.error.unwrap_or(r.output);
Ok(ToolExecutionOutcome {
output: format!("Error: {reason}"),
success: false,
error_reason: Some(scrub_credentials(&reason)),
duration,
})
}
}
Err(e) => {
let duration = start.elapsed();
observer.record_event(&ObserverEvent::ToolCall {
tool: call_name.to_string(),
duration,
success: false,
});
let reason = format!("Error executing {call_name}: {e}");
Ok(ToolExecutionOutcome {
output: reason.clone(),
success: false,
error_reason: Some(scrub_credentials(&reason)),
duration,
})
}
}
}
/// Decides whether a batch of tool calls may be run concurrently.
///
/// Returns `false` when:
/// - the batch holds fewer than two calls,
/// - any call targets the `tool_search` tool, or
/// - an approval manager is supplied and reports that at least one of the
///   called tools needs approval.
///
/// Returns `true` otherwise.
pub(crate) fn should_execute_tools_in_parallel(
    tool_calls: &[ParsedToolCall],
    approval: Option<&ApprovalManager>,
) -> bool {
    // Zero or one call: there is nothing to run side by side.
    if tool_calls.len() < 2 {
        return false;
    }

    let free_of_tool_search = tool_calls.iter().all(|call| call.name != "tool_search");
    if !free_of_tool_search {
        return false;
    }

    // Without an approval manager nothing can require approval.
    let Some(manager) = approval else {
        return true;
    };
    tool_calls
        .iter()
        .all(|call| !manager.needs_approval(&call.name))
}
/// Runs every call in `tool_calls` concurrently through `execute_one_tool`.
///
/// All calls are awaited together with `join_all`, so each one runs to its
/// own completion before anything is returned. On success the outcomes are
/// returned in the same order as `tool_calls`.
///
/// # Errors
///
/// If one or more calls return an error, the error of the earliest such call
/// (in `tool_calls` order) is returned and the other outcomes are discarded.
pub(crate) async fn execute_tools_parallel(
    tool_calls: &[ParsedToolCall],
    tools_registry: &[Box<dyn Tool>],
    activated_tools: Option<&std::sync::Arc<std::sync::Mutex<crate::tools::ActivatedToolSet>>>,
    observer: &dyn Observer,
    cancellation_token: Option<&CancellationToken>,
) -> Result<Vec<ToolExecutionOutcome>> {
    // Building the futures is lazy; nothing executes until `join_all` polls them.
    let pending = tool_calls.iter().map(|call| {
        execute_one_tool(
            &call.name,
            call.arguments.clone(),
            tools_registry,
            activated_tools,
            observer,
            cancellation_token,
        )
    });

    futures_util::future::join_all(pending)
        .await
        .into_iter()
        .collect()
}
pub(crate) async fn execute_tools_sequential(
tool_calls: &[ParsedToolCall],
tools_registry: &[Box<dyn Tool>],
activated_tools: Option<&std::sync::Arc<std::sync::Mutex<crate::tools::ActivatedToolSet>>>,
observer: &dyn Observer,
cancellation_token: Option<&CancellationToken>,
) -> Result<Vec<ToolExecutionOutcome>> {
let mut outcomes = Vec::with_capacity(tool_calls.len());
for call in tool_calls {
outcomes.push(
execute_one_tool(
&call.name,
call.arguments.clone(),
tools_registry,
activated_tools,
observer,
cancellation_token,
)
.await?,
);
}
Ok(outcomes)
}