use std::sync::Arc;
use std::time::Instant;
use futures::StreamExt;
use tokio_util::sync::CancellationToken;
use super::helpers::{catch_tool_panic, millis_to_u64, send_event, wrap_and_send};
use super::idempotency::{execute_with_idempotency, try_get_cached_result};
use super::listen::{
ListenWaitError, build_listen_confirmation_input, cancel_listen_with_warning,
wait_for_listen_ready,
};
use super::types::{
ConfirmedToolExecutionContext, ListenReady, ListenUpdateContext, ListenWaitParams,
ToolCallExecutionContext, ToolExecutionOutcome,
};
use crate::authority::EventAuthority;
use crate::events::AgentEvent;
use crate::hooks::{AgentHooks, ToolAuditSink, ToolDecision};
use crate::llm::{Content, ContentBlock, Message, Role};
use crate::stores::{EventStore, MessageStore};
use crate::tools::{
ErasedAsyncTool, ErasedListenTool, ErasedTool, ErasedToolStatus, ListenStopReason, ToolContext,
};
use crate::types::{
AgentError, ListenExecutionContext, PendingToolCallInfo, ThreadId, ToolInvocation, ToolOutcome,
ToolResult, ToolTier,
};
use agent_sdk_foundation::audit::{
AuditProvenance, ToolAuditOutcome, ToolAuditRecord, ToolAuditRecordParams,
};
fn build_invocation(pending: &PendingToolCallInfo, tier: ToolTier) -> ToolInvocation {
ToolInvocation {
tool_call_id: pending.id.clone(),
tool_name: pending.name.clone(),
display_name: pending.display_name.clone(),
tier,
requested_input: pending.input.clone(),
effective_input: pending.effective_input.clone(),
listen_context: pending.listen_context.clone(),
}
}
async fn emit_audit(
sink: &Arc<dyn ToolAuditSink>,
provenance: &AuditProvenance,
pending: &PendingToolCallInfo,
tier: ToolTier,
turn: usize,
outcome: ToolAuditOutcome,
) {
let record = ToolAuditRecord::new(ToolAuditRecordParams {
tool_call_id: pending.id.clone(),
tool_name: pending.name.clone(),
display_name: pending.display_name.clone(),
tier,
requested_input: pending.input.clone(),
effective_input: pending.effective_input.clone(),
turn,
provenance: provenance.clone(),
outcome,
});
sink.record(record).await;
}
pub(super) async fn execute_tool_call<Ctx, H>(
pending: &PendingToolCallInfo,
ctx: &ToolCallExecutionContext<'_, Ctx, H>,
) -> ToolExecutionOutcome
where
Ctx: Send + Sync + Clone + 'static,
H: AgentHooks,
{
#[cfg(feature = "otel")]
let (mut tool_span, tool_kind) = start_tool_span(pending, ctx);
let outcome = execute_tool_call_inner(
pending,
ctx,
#[cfg(feature = "otel")]
&mut tool_span,
)
.await;
#[cfg(feature = "otel")]
finish_tool_span(&mut tool_span, &outcome, &pending.name, tool_kind);
outcome
}
#[cfg(feature = "otel")]
fn start_tool_span<Ctx, H>(
pending: &PendingToolCallInfo,
ctx: &ToolCallExecutionContext<'_, Ctx, H>,
) -> (opentelemetry::global::BoxedSpan, &'static str)
where
Ctx: Send + Sync + 'static,
H: AgentHooks,
{
use crate::observability::{attrs, baggage, langfuse, spans};
use opentelemetry::KeyValue;
let mut span_attrs = vec![
KeyValue::new(attrs::GEN_AI_OPERATION_NAME, "execute_tool"),
KeyValue::new(attrs::GEN_AI_TOOL_NAME, pending.name.clone()),
KeyValue::new(attrs::GEN_AI_TOOL_CALL_ID, pending.id.clone()),
];
if !pending.display_name.is_empty() {
span_attrs.push(KeyValue::new(
attrs::SDK_TOOL_DISPLAY_NAME,
pending.display_name.clone(),
));
}
let tool_kind: &'static str = if let Some(tool) = ctx.tools.get(&pending.name) {
span_attrs.push(KeyValue::new(
attrs::SDK_TOOL_TIER,
attrs::tool_tier_str(tool.tier()),
));
"sync"
} else if let Some(tool) = ctx.tools.get_async(&pending.name) {
span_attrs.push(KeyValue::new(
attrs::SDK_TOOL_TIER,
attrs::tool_tier_str(tool.tier()),
));
"async"
} else if let Some(tool) = ctx.tools.get_listen(&pending.name) {
span_attrs.push(KeyValue::new(
attrs::SDK_TOOL_TIER,
attrs::tool_tier_str(tool.tier()),
));
"listen"
} else {
"unknown"
};
span_attrs.push(KeyValue::new(attrs::SDK_TOOL_KIND, tool_kind));
let mut span = spans::start_internal_span("execute_tool", span_attrs);
baggage::copy_baggage_to_active_span(&mut span);
langfuse::tag_observation(&mut span, langfuse::ObservationType::Tool);
(span, tool_kind)
}
#[cfg(feature = "otel")]
fn finish_tool_span(
span: &mut opentelemetry::global::BoxedSpan,
outcome: &ToolExecutionOutcome,
tool_name: &str,
tool_kind: &'static str,
) {
use crate::observability::{attrs, metrics, spans};
use opentelemetry::KeyValue;
use opentelemetry::trace::Span;
let metrics_handle = metrics::Metrics::global();
let (outcome_str, duration_ms) = match outcome {
ToolExecutionOutcome::Completed { result, .. } => {
let outcome_str = if result.output.starts_with("Unknown tool:") {
span.set_attribute(KeyValue::new(attrs::ERROR_TYPE, "unknown_tool"));
span.set_status(opentelemetry::trace::Status::error(result.output.clone()));
"error"
} else if result.output.starts_with("Blocked:") {
"blocked"
} else if result.output.starts_with("Rejected:") {
"rejected"
} else if result.success {
"success"
} else {
"error"
};
span.set_attribute(KeyValue::new(attrs::SDK_TOOL_OUTCOME, outcome_str));
if let Some(ms) = result.duration_ms {
span.set_attribute(attrs::kv_i64(
attrs::SDK_TOOL_DURATION_MS,
i64::try_from(ms).unwrap_or(i64::MAX),
));
}
(outcome_str, result.duration_ms)
}
ToolExecutionOutcome::RequiresConfirmation { tool_name, .. } => {
span.set_attribute(attrs::kv_bool(attrs::SDK_TOOL_CONFIRMATION_REQUIRED, true));
span.set_attribute(KeyValue::new(
attrs::SDK_TOOL_OUTCOME,
"awaiting_confirmation",
));
spans::add_event(
span,
"tool.confirmation_required",
vec![KeyValue::new(attrs::GEN_AI_TOOL_NAME, tool_name.clone())],
);
("awaiting_confirmation", None)
}
ToolExecutionOutcome::Error(error) => {
span.set_attribute(KeyValue::new(attrs::ERROR_TYPE, "event_store"));
span.set_status(opentelemetry::trace::Status::error(error.message.clone()));
span.set_attribute(KeyValue::new(attrs::SDK_TOOL_OUTCOME, "error"));
("error", None)
}
};
metrics_handle.record_tool_execution(tool_name, tool_kind, outcome_str, duration_ms);
span.end();
}
async fn execute_tool_call_inner<Ctx, H>(
pending: &PendingToolCallInfo,
ctx: &ToolCallExecutionContext<'_, Ctx, H>,
#[cfg(feature = "otel")] tool_span: &mut opentelemetry::global::BoxedSpan,
) -> ToolExecutionOutcome
where
Ctx: Send + Sync + Clone + 'static,
H: AgentHooks,
{
if let Some(cached_result) = try_get_cached_result(ctx.execution_store, &pending.id).await {
emit_audit(
ctx.audit_sink,
ctx.provenance,
pending,
pending.tier,
ctx.turn,
ToolAuditOutcome::Cached {
result: cached_result.clone(),
},
)
.await;
#[cfg(feature = "otel")]
crate::observability::spans::add_event(
tool_span,
"tool.cached_result_returned",
vec![opentelemetry::KeyValue::new(
crate::observability::attrs::GEN_AI_TOOL_CALL_ID,
pending.id.clone(),
)],
);
return ToolExecutionOutcome::Completed {
tool_id: pending.id.clone(),
result: cached_result,
};
}
if let Some(listen_tool) = ctx.tools.get_listen(&pending.name) {
return execute_listen_tool_call(pending, listen_tool, ctx).await;
}
if let Some(async_tool) = ctx.tools.get_async(&pending.name) {
return execute_async_tool_call(
pending,
async_tool,
ctx,
#[cfg(feature = "otel")]
tool_span,
)
.await;
}
let Some(tool) = ctx.tools.get(&pending.name) else {
let result = ToolResult::error(format!("Unknown tool: {}", pending.name));
emit_audit(
ctx.audit_sink,
ctx.provenance,
pending,
ToolTier::Confirm,
ctx.turn,
ToolAuditOutcome::Completed {
result: result.clone(),
},
)
.await;
return ToolExecutionOutcome::Completed {
tool_id: pending.id.clone(),
result,
};
};
execute_sync_tool_call(pending, tool, ctx).await
}
pub(super) async fn execute_listen_tool_call<Ctx, H>(
pending: &PendingToolCallInfo,
listen_tool: &Arc<dyn ErasedListenTool<Ctx>>,
ctx: &ToolCallExecutionContext<'_, Ctx, H>,
) -> ToolExecutionOutcome
where
Ctx: Send + Sync + Clone + 'static,
H: AgentHooks,
{
let tier = listen_tool.tier();
if let Err(error) = send_event(
ctx.event_store,
ctx.thread_id,
ctx.turn,
ctx.hooks,
ctx.authority,
AgentEvent::tool_call_start(
&pending.id,
&pending.name,
&pending.display_name,
pending.input.clone(),
tier,
),
)
.await
{
emit_audit(
ctx.audit_sink,
ctx.provenance,
pending,
tier,
ctx.turn,
ToolAuditOutcome::PersistenceFailed {
result: None,
error: error.message.clone(),
},
)
.await;
return ToolExecutionOutcome::Error(error);
}
let tool_start = Instant::now();
let ready = match wait_for_listen_ready(ListenWaitParams {
pending,
tool: listen_tool,
tool_context: ctx.tool_context,
update_ctx: ListenUpdateContext {
pending,
hooks: ctx.hooks,
event_store: ctx.event_store,
thread_id: ctx.thread_id,
turn: ctx.turn,
authority: ctx.authority,
},
})
.await
{
Ok(ready) => ready,
Err(ListenWaitError::Tool(result)) => {
return finish_listen_ready_failure(pending, ctx, tool_start, result).await;
}
Err(ListenWaitError::Event(error)) => return ToolExecutionOutcome::Error(error),
};
match ctx
.hooks
.pre_tool_use(&build_invocation(pending, tier))
.await
{
ToolDecision::Allow => {
handle_listen_tool_allow(pending, listen_tool, ctx, &ready, tool_start).await
}
ToolDecision::Block(reason) => {
handle_listen_tool_block(pending, listen_tool, ctx, &ready, reason).await
}
ToolDecision::RequiresConfirmation(description) => {
handle_listen_tool_confirmation(pending, ctx, ready, description).await
}
_ => {
handle_listen_tool_block(
pending,
listen_tool,
ctx,
&ready,
"unrecognized tool decision".to_string(),
)
.await
}
}
}
pub(super) async fn finish_listen_ready_failure<Ctx, H>(
pending: &PendingToolCallInfo,
ctx: &ToolCallExecutionContext<'_, Ctx, H>,
tool_start: Instant,
mut result: ToolResult,
) -> ToolExecutionOutcome
where
Ctx: Send + Sync + Clone + 'static,
H: AgentHooks,
{
result.duration_ms = Some(millis_to_u64(tool_start.elapsed().as_millis()));
ctx.hooks.post_tool_use(&pending.name, &result).await;
let tier = ctx
.tools
.get_listen(&pending.name)
.map_or(ToolTier::Confirm, |t| t.tier());
emit_audit(
ctx.audit_sink,
ctx.provenance,
pending,
tier,
ctx.turn,
ToolAuditOutcome::Invalidated {
reason: result.output.clone(),
},
)
.await;
if let Err(error) = send_event(
ctx.event_store,
ctx.thread_id,
ctx.turn,
ctx.hooks,
ctx.authority,
AgentEvent::tool_call_end(
&pending.id,
&pending.name,
&pending.display_name,
result.clone(),
),
)
.await
{
emit_audit(
ctx.audit_sink,
ctx.provenance,
pending,
tier,
ctx.turn,
ToolAuditOutcome::PersistenceFailed {
result: Some(result),
error: error.message.clone(),
},
)
.await;
return ToolExecutionOutcome::Error(error);
}
ToolExecutionOutcome::Completed {
tool_id: pending.id.clone(),
result,
}
}
pub(super) async fn handle_listen_tool_allow<Ctx, H>(
pending: &PendingToolCallInfo,
listen_tool: &Arc<dyn ErasedListenTool<Ctx>>,
ctx: &ToolCallExecutionContext<'_, Ctx, H>,
ready: &ListenReady,
tool_start: Instant,
) -> ToolExecutionOutcome
where
Ctx: Send + Sync + Clone + 'static,
H: AgentHooks,
{
let tier = listen_tool.tier();
let controls = ToolBoundaryControls::from_tool_context(ctx.tool_context);
let result =
match execute_with_idempotency(ctx.execution_store, pending, ctx.thread_id, async {
race_boundary_result(
&controls,
tool_start,
catch_tool_panic(listen_execute_ok(
listen_tool,
ctx.tool_context,
&ready.operation_id,
ready.revision,
tool_start,
)),
)
.await
})
.await
{
Ok(result) => result,
Err(error) => {
emit_audit(
ctx.audit_sink,
ctx.provenance,
pending,
tier,
ctx.turn,
ToolAuditOutcome::PersistenceFailed {
result: None,
error: error.message.clone(),
},
)
.await;
return ToolExecutionOutcome::Error(error);
}
};
ctx.hooks.post_tool_use(&pending.name, &result).await;
emit_audit(
ctx.audit_sink,
ctx.provenance,
pending,
tier,
ctx.turn,
ToolAuditOutcome::Completed {
result: result.clone(),
},
)
.await;
if let Err(error) = send_event(
ctx.event_store,
ctx.thread_id,
ctx.turn,
ctx.hooks,
ctx.authority,
AgentEvent::tool_call_end(
&pending.id,
&pending.name,
&pending.display_name,
result.clone(),
),
)
.await
{
emit_audit(
ctx.audit_sink,
ctx.provenance,
pending,
tier,
ctx.turn,
ToolAuditOutcome::PersistenceFailed {
result: Some(result),
error: error.message.clone(),
},
)
.await;
return ToolExecutionOutcome::Error(error);
}
ToolExecutionOutcome::Completed {
tool_id: pending.id.clone(),
result,
}
}
pub(super) async fn handle_listen_tool_block<Ctx, H>(
pending: &PendingToolCallInfo,
listen_tool: &Arc<dyn ErasedListenTool<Ctx>>,
ctx: &ToolCallExecutionContext<'_, Ctx, H>,
ready: &ListenReady,
reason: String,
) -> ToolExecutionOutcome
where
Ctx: Send + Sync + Clone + 'static,
H: AgentHooks,
{
let tier = listen_tool.tier();
cancel_listen_with_warning(
listen_tool,
ctx.tool_context,
&ready.operation_id,
ListenStopReason::Blocked,
&pending.id,
&pending.name,
)
.await;
let result = ToolResult::error(format!("Blocked: {reason}"));
emit_audit(
ctx.audit_sink,
ctx.provenance,
pending,
tier,
ctx.turn,
ToolAuditOutcome::Blocked {
reason: reason.clone(),
},
)
.await;
if let Err(error) = send_event(
ctx.event_store,
ctx.thread_id,
ctx.turn,
ctx.hooks,
ctx.authority,
AgentEvent::tool_call_end(
&pending.id,
&pending.name,
&pending.display_name,
result.clone(),
),
)
.await
{
emit_audit(
ctx.audit_sink,
ctx.provenance,
pending,
tier,
ctx.turn,
ToolAuditOutcome::PersistenceFailed {
result: Some(result),
error: error.message.clone(),
},
)
.await;
return ToolExecutionOutcome::Error(error);
}
ToolExecutionOutcome::Completed {
tool_id: pending.id.clone(),
result,
}
}
pub(super) async fn handle_listen_tool_confirmation<Ctx, H>(
pending: &PendingToolCallInfo,
ctx: &ToolCallExecutionContext<'_, Ctx, H>,
ready: ListenReady,
description: String,
) -> ToolExecutionOutcome
where
Ctx: Send + Sync + Clone + 'static,
H: AgentHooks,
{
let tier = ctx
.tools
.get_listen(&pending.name)
.map_or(ToolTier::Confirm, |t| t.tier());
let input = build_listen_confirmation_input(&pending.input, &ready);
let listen_context = ListenExecutionContext {
operation_id: ready.operation_id.clone(),
revision: ready.revision,
snapshot: ready.snapshot.clone(),
expires_at: ready.expires_at,
};
emit_audit(
ctx.audit_sink,
ctx.provenance,
pending,
tier,
ctx.turn,
ToolAuditOutcome::RequiresConfirmation {
description: description.clone(),
listen_context: Some(listen_context.clone()),
},
)
.await;
if let Err(error) = send_event(
ctx.event_store,
ctx.thread_id,
ctx.turn,
ctx.hooks,
ctx.authority,
AgentEvent::ToolRequiresConfirmation {
id: pending.id.clone(),
name: pending.name.clone(),
display_name: pending.display_name.clone(),
input: input.clone(),
description: description.clone(),
},
)
.await
{
emit_audit(
ctx.audit_sink,
ctx.provenance,
pending,
tier,
ctx.turn,
ToolAuditOutcome::PersistenceFailed {
result: None,
error: error.message.clone(),
},
)
.await;
return ToolExecutionOutcome::Error(error);
}
ToolExecutionOutcome::RequiresConfirmation {
tool_id: pending.id.clone(),
tool_name: pending.name.clone(),
display_name: pending.display_name.clone(),
input,
description,
listen_context: Some(listen_context),
}
}
async fn send_tool_call_start_event<Ctx, H>(
pending: &PendingToolCallInfo,
tier: ToolTier,
ctx: &ToolCallExecutionContext<'_, Ctx, H>,
) -> Result<(), AgentError>
where
Ctx: Send + Sync + Clone + 'static,
H: AgentHooks,
{
send_event(
ctx.event_store,
ctx.thread_id,
ctx.turn,
ctx.hooks,
ctx.authority,
AgentEvent::tool_call_start(
&pending.id,
&pending.name,
&pending.display_name,
pending.input.clone(),
tier,
),
)
.await
}
async fn send_tool_call_end_event<Ctx, H>(
pending: &PendingToolCallInfo,
result: &ToolResult,
ctx: &ToolCallExecutionContext<'_, Ctx, H>,
) -> Result<(), AgentError>
where
Ctx: Send + Sync + Clone + 'static,
H: AgentHooks,
{
send_event(
ctx.event_store,
ctx.thread_id,
ctx.turn,
ctx.hooks,
ctx.authority,
AgentEvent::tool_call_end(
&pending.id,
&pending.name,
&pending.display_name,
result.clone(),
),
)
.await
}
async fn block_tool_call<Ctx, H>(
pending: &PendingToolCallInfo,
ctx: &ToolCallExecutionContext<'_, Ctx, H>,
tier: ToolTier,
reason: String,
) -> ToolExecutionOutcome
where
Ctx: Send + Sync + Clone + 'static,
H: AgentHooks,
{
let result = ToolResult::error(format!("Blocked: {reason}"));
emit_audit(
ctx.audit_sink,
ctx.provenance,
pending,
tier,
ctx.turn,
ToolAuditOutcome::Blocked {
reason: reason.clone(),
},
)
.await;
if let Err(error) = send_tool_call_end_event(pending, &result, ctx).await {
emit_audit(
ctx.audit_sink,
ctx.provenance,
pending,
tier,
ctx.turn,
ToolAuditOutcome::PersistenceFailed {
result: Some(result),
error: error.message.clone(),
},
)
.await;
return ToolExecutionOutcome::Error(error);
}
ToolExecutionOutcome::Completed {
tool_id: pending.id.clone(),
result,
}
}
async fn require_tool_confirmation<Ctx, H>(
pending: &PendingToolCallInfo,
ctx: &ToolCallExecutionContext<'_, Ctx, H>,
tier: ToolTier,
description: String,
) -> ToolExecutionOutcome
where
Ctx: Send + Sync + Clone + 'static,
H: AgentHooks,
{
emit_audit(
ctx.audit_sink,
ctx.provenance,
pending,
tier,
ctx.turn,
ToolAuditOutcome::RequiresConfirmation {
description: description.clone(),
listen_context: None,
},
)
.await;
if let Err(error) = send_event(
ctx.event_store,
ctx.thread_id,
ctx.turn,
ctx.hooks,
ctx.authority,
AgentEvent::ToolRequiresConfirmation {
id: pending.id.clone(),
name: pending.name.clone(),
display_name: pending.display_name.clone(),
input: pending.input.clone(),
description: description.clone(),
},
)
.await
{
emit_audit(
ctx.audit_sink,
ctx.provenance,
pending,
tier,
ctx.turn,
ToolAuditOutcome::PersistenceFailed {
result: None,
error: error.message.clone(),
},
)
.await;
return ToolExecutionOutcome::Error(error);
}
ToolExecutionOutcome::RequiresConfirmation {
tool_id: pending.id.clone(),
tool_name: pending.name.clone(),
display_name: pending.display_name.clone(),
input: pending.input.clone(),
description,
listen_context: None,
}
}
async fn complete_async_tool_call<Ctx, H>(
pending: &PendingToolCallInfo,
async_tool: &Arc<dyn ErasedAsyncTool<Ctx>>,
ctx: &ToolCallExecutionContext<'_, Ctx, H>,
#[cfg(feature = "otel")] tool_span: &mut opentelemetry::global::BoxedSpan,
) -> ToolExecutionOutcome
where
Ctx: Send + Sync + Clone + 'static,
H: AgentHooks,
{
let tier = async_tool.tier();
let result = match execute_with_idempotency(
ctx.execution_store,
pending,
ctx.thread_id,
execute_async_tool(AsyncToolExecutionParams {
pending,
tool: async_tool,
tool_context: ctx.tool_context,
event_store: ctx.event_store,
thread_id: ctx.thread_id,
turn: ctx.turn,
authority: ctx.authority,
#[cfg(feature = "otel")]
tool_span: Some(tool_span),
}),
)
.await
{
Ok(result) => result,
Err(error) => {
emit_audit(
ctx.audit_sink,
ctx.provenance,
pending,
tier,
ctx.turn,
ToolAuditOutcome::PersistenceFailed {
result: None,
error: error.message.clone(),
},
)
.await;
return ToolExecutionOutcome::Error(error);
}
};
ctx.hooks.post_tool_use(&pending.name, &result).await;
emit_audit(
ctx.audit_sink,
ctx.provenance,
pending,
tier,
ctx.turn,
ToolAuditOutcome::Completed {
result: result.clone(),
},
)
.await;
if let Err(error) = send_tool_call_end_event(pending, &result, ctx).await {
emit_audit(
ctx.audit_sink,
ctx.provenance,
pending,
tier,
ctx.turn,
ToolAuditOutcome::PersistenceFailed {
result: Some(result),
error: error.message.clone(),
},
)
.await;
return ToolExecutionOutcome::Error(error);
}
ToolExecutionOutcome::Completed {
tool_id: pending.id.clone(),
result,
}
}
const CANCELLED_BY_USER: &str = "Cancelled by user";
const TIMED_OUT_AT_BOUNDARY: &str = "Tool timed out";
#[derive(Clone)]
pub(super) struct ToolBoundaryControls {
cancel: Option<CancellationToken>,
timeout: Option<std::time::Duration>,
}
impl ToolBoundaryControls {
pub(super) fn from_tool_context<Ctx>(tool_context: &ToolContext<Ctx>) -> Self {
Self {
cancel: tool_context.cancel_token(),
timeout: tool_context.tool_timeout(),
}
}
const fn is_noop(&self) -> bool {
self.cancel.is_none() && self.timeout.is_none()
}
}
#[derive(Clone, Copy)]
enum BoundaryStop {
Cancelled,
TimedOut,
}
impl BoundaryStop {
fn synthesize(self, tool_start: Instant) -> ToolResult {
let duration = millis_to_u64(tool_start.elapsed().as_millis());
match self {
Self::Cancelled => ToolResult::success(CANCELLED_BY_USER).with_duration(duration),
Self::TimedOut => ToolResult::error(TIMED_OUT_AT_BOUNDARY).with_duration(duration),
}
}
}
async fn wait_for_boundary_stop(controls: &ToolBoundaryControls) -> BoundaryStop {
let cancel = controls.cancel.clone();
let cancelled = async move {
match cancel {
Some(token) => token.cancelled().await,
None => std::future::pending::<()>().await,
}
};
let timeout = controls.timeout;
let timed_out = async move {
match timeout {
Some(duration) => tokio::time::sleep(duration).await,
None => std::future::pending::<()>().await,
}
};
tokio::select! {
biased;
() = cancelled => BoundaryStop::Cancelled,
() = timed_out => BoundaryStop::TimedOut,
}
}
async fn race_boundary_result<F>(
controls: &ToolBoundaryControls,
tool_start: Instant,
produce: F,
) -> Result<ToolResult, AgentError>
where
F: std::future::Future<Output = Result<ToolResult, AgentError>>,
{
if controls.is_noop() {
return produce.await;
}
tokio::select! {
biased;
result = produce => result,
stop = wait_for_boundary_stop(controls) => Ok(stop.synthesize(tool_start)),
}
}
async fn sync_execute_ok<F>(execute: F, tool_start: Instant) -> Result<ToolResult, AgentError>
where
F: std::future::Future<Output = anyhow::Result<ToolResult>>,
{
Ok(match execute.await {
Ok(mut value) => {
value.duration_ms = Some(millis_to_u64(tool_start.elapsed().as_millis()));
value
}
Err(error) => ToolResult::error(format!("Tool error: {error:#}"))
.with_duration(millis_to_u64(tool_start.elapsed().as_millis())),
})
}
async fn listen_execute_ok<Ctx>(
listen_tool: &Arc<dyn ErasedListenTool<Ctx>>,
tool_context: &ToolContext<Ctx>,
operation_id: &str,
revision: u64,
tool_start: Instant,
) -> Result<ToolResult, AgentError>
where
Ctx: Send + Sync + Clone,
{
Ok(
match listen_tool
.execute(tool_context, operation_id, revision)
.await
{
Ok(mut value) => {
value.duration_ms = Some(millis_to_u64(tool_start.elapsed().as_millis()));
value
}
Err(error) => ToolResult::error(format!("Listen execute error: {error}"))
.with_duration(millis_to_u64(tool_start.elapsed().as_millis())),
},
)
}
async fn complete_sync_tool_call<Ctx, H>(
pending: &PendingToolCallInfo,
tool: &Arc<dyn ErasedTool<Ctx>>,
ctx: &ToolCallExecutionContext<'_, Ctx, H>,
) -> ToolExecutionOutcome
where
Ctx: Send + Sync + Clone + 'static,
H: AgentHooks,
{
let tier = tool.tier();
let tool_start = Instant::now();
let controls = ToolBoundaryControls::from_tool_context(ctx.tool_context);
let result =
match execute_with_idempotency(ctx.execution_store, pending, ctx.thread_id, async {
race_boundary_result(
&controls,
tool_start,
catch_tool_panic(sync_execute_ok(
tool.execute(ctx.tool_context, pending.input.clone()),
tool_start,
)),
)
.await
})
.await
{
Ok(result) => result,
Err(error) => {
emit_audit(
ctx.audit_sink,
ctx.provenance,
pending,
tier,
ctx.turn,
ToolAuditOutcome::PersistenceFailed {
result: None,
error: error.message.clone(),
},
)
.await;
return ToolExecutionOutcome::Error(error);
}
};
ctx.hooks.post_tool_use(&pending.name, &result).await;
emit_audit(
ctx.audit_sink,
ctx.provenance,
pending,
tier,
ctx.turn,
ToolAuditOutcome::Completed {
result: result.clone(),
},
)
.await;
if let Err(error) = send_tool_call_end_event(pending, &result, ctx).await {
emit_audit(
ctx.audit_sink,
ctx.provenance,
pending,
tier,
ctx.turn,
ToolAuditOutcome::PersistenceFailed {
result: Some(result),
error: error.message.clone(),
},
)
.await;
return ToolExecutionOutcome::Error(error);
}
ToolExecutionOutcome::Completed {
tool_id: pending.id.clone(),
result,
}
}
pub(super) async fn execute_async_tool_call<Ctx, H>(
pending: &PendingToolCallInfo,
async_tool: &Arc<dyn ErasedAsyncTool<Ctx>>,
ctx: &ToolCallExecutionContext<'_, Ctx, H>,
#[cfg(feature = "otel")] tool_span: &mut opentelemetry::global::BoxedSpan,
) -> ToolExecutionOutcome
where
Ctx: Send + Sync + Clone + 'static,
H: AgentHooks,
{
let tier = async_tool.tier();
if let Err(error) = send_tool_call_start_event(pending, tier, ctx).await {
emit_audit(
ctx.audit_sink,
ctx.provenance,
pending,
tier,
ctx.turn,
ToolAuditOutcome::PersistenceFailed {
result: None,
error: error.message.clone(),
},
)
.await;
return ToolExecutionOutcome::Error(error);
}
match ctx
.hooks
.pre_tool_use(&build_invocation(pending, tier))
.await
{
ToolDecision::Allow => {
complete_async_tool_call(
pending,
async_tool,
ctx,
#[cfg(feature = "otel")]
tool_span,
)
.await
}
ToolDecision::Block(reason) => block_tool_call(pending, ctx, tier, reason).await,
ToolDecision::RequiresConfirmation(description) => {
require_tool_confirmation(pending, ctx, tier, description).await
}
_ => block_tool_call(pending, ctx, tier, "unrecognized tool decision".to_string()).await,
}
}
pub(super) async fn execute_sync_tool_call<Ctx, H>(
pending: &PendingToolCallInfo,
tool: &Arc<dyn ErasedTool<Ctx>>,
ctx: &ToolCallExecutionContext<'_, Ctx, H>,
) -> ToolExecutionOutcome
where
Ctx: Send + Sync + Clone + 'static,
H: AgentHooks,
{
let tier = tool.tier();
if let Err(error) = send_tool_call_start_event(pending, tier, ctx).await {
emit_audit(
ctx.audit_sink,
ctx.provenance,
pending,
tier,
ctx.turn,
ToolAuditOutcome::PersistenceFailed {
result: None,
error: error.message.clone(),
},
)
.await;
return ToolExecutionOutcome::Error(error);
}
match ctx
.hooks
.pre_tool_use(&build_invocation(pending, tier))
.await
{
ToolDecision::Allow => complete_sync_tool_call(pending, tool, ctx).await,
ToolDecision::Block(reason) => block_tool_call(pending, ctx, tier, reason).await,
ToolDecision::RequiresConfirmation(description) => {
require_tool_confirmation(pending, ctx, tier, description).await
}
_ => block_tool_call(pending, ctx, tier, "unrecognized tool decision".to_string()).await,
}
}
pub(super) struct AsyncToolExecutionParams<'a, Ctx> {
pub(super) pending: &'a PendingToolCallInfo,
pub(super) tool: &'a Arc<dyn ErasedAsyncTool<Ctx>>,
pub(super) tool_context: &'a ToolContext<Ctx>,
pub(super) event_store: &'a Arc<dyn EventStore>,
pub(super) thread_id: &'a ThreadId,
pub(super) turn: usize,
pub(super) authority: &'a Arc<dyn EventAuthority>,
#[cfg(feature = "otel")]
pub(super) tool_span: Option<&'a mut opentelemetry::global::BoxedSpan>,
}
pub(super) async fn execute_async_tool<Ctx>(
params: AsyncToolExecutionParams<'_, Ctx>,
) -> Result<ToolResult, AgentError>
where
Ctx: Send + Sync + Clone,
{
let controls = ToolBoundaryControls::from_tool_context(params.tool_context);
let tool_start = Instant::now();
race_boundary_result(
&controls,
tool_start,
catch_tool_panic(execute_async_tool_inner(params, tool_start)),
)
.await
}
async fn execute_async_tool_inner<Ctx>(
params: AsyncToolExecutionParams<'_, Ctx>,
tool_start: Instant,
) -> Result<ToolResult, AgentError>
where
Ctx: Send + Sync + Clone,
{
let AsyncToolExecutionParams {
pending,
tool,
tool_context,
event_store,
thread_id,
turn,
authority,
#[cfg(feature = "otel")]
tool_span,
} = params;
let outcome = match tool.execute(tool_context, pending.input.clone()).await {
Ok(o) => o,
Err(e) => {
return Ok(ToolResult::error(format!("Tool error: {e:#}"))
.with_duration(millis_to_u64(tool_start.elapsed().as_millis())));
}
};
match outcome {
ToolOutcome::Success(mut result) | ToolOutcome::Failed(mut result) => {
result.duration_ms = Some(millis_to_u64(tool_start.elapsed().as_millis()));
Ok(result)
}
ToolOutcome::InProgress {
operation_id,
message,
} => {
stream_async_tool_progress(StreamAsyncToolProgressParams {
pending,
tool,
tool_context,
event_store,
thread_id,
turn,
authority,
operation_id,
initial_message: message,
tool_start,
#[cfg(feature = "otel")]
tool_span,
})
.await
}
}
}
struct StreamAsyncToolProgressParams<'a, Ctx> {
pending: &'a PendingToolCallInfo,
tool: &'a Arc<dyn ErasedAsyncTool<Ctx>>,
tool_context: &'a ToolContext<Ctx>,
event_store: &'a Arc<dyn EventStore>,
thread_id: &'a ThreadId,
turn: usize,
authority: &'a Arc<dyn EventAuthority>,
operation_id: String,
initial_message: String,
tool_start: Instant,
#[cfg(feature = "otel")]
tool_span: Option<&'a mut opentelemetry::global::BoxedSpan>,
}
async fn stream_async_tool_progress<Ctx>(
params: StreamAsyncToolProgressParams<'_, Ctx>,
) -> Result<ToolResult, AgentError>
where
Ctx: Send + Sync + Clone,
{
let StreamAsyncToolProgressParams {
pending,
tool,
tool_context,
event_store,
thread_id,
turn,
authority,
operation_id,
initial_message,
tool_start,
#[cfg(feature = "otel")]
mut tool_span,
} = params;
#[cfg(feature = "otel")]
if let Some(span) = tool_span.as_deref_mut() {
crate::observability::spans::add_event(
span,
"tool.async.started",
vec![opentelemetry::KeyValue::new(
crate::observability::attrs::GEN_AI_TOOL_NAME,
pending.name.clone(),
)],
);
}
wrap_and_send(
event_store,
thread_id,
turn,
AgentEvent::tool_progress(
&pending.id,
&pending.name,
&pending.display_name,
"started",
&initial_message,
None,
),
authority,
)
.await?;
let mut stream = tool.check_status_stream(tool_context, &operation_id);
#[cfg(feature = "otel")]
let mut poll_index: u64 = 0;
while let Some(status) = stream.next().await {
match status {
ErasedToolStatus::Progress {
stage,
message,
data,
} => {
#[cfg(feature = "otel")]
if let Some(span) = tool_span.as_deref_mut() {
poll_index += 1;
crate::observability::spans::add_event(
span,
"tool.async.poll",
vec![
opentelemetry::KeyValue::new(
crate::observability::attrs::SDK_TOOL_PROGRESS_STAGE,
stage.clone(),
),
crate::observability::attrs::kv_i64(
crate::observability::attrs::SDK_TOOL_POLL_INDEX,
i64::try_from(poll_index).unwrap_or(i64::MAX),
),
],
);
}
wrap_and_send(
event_store,
thread_id,
turn,
AgentEvent::tool_progress(
&pending.id,
&pending.name,
&pending.display_name,
stage,
message,
data,
),
authority,
)
.await?;
}
ErasedToolStatus::Completed(mut result) | ErasedToolStatus::Failed(mut result) => {
result.duration_ms = Some(millis_to_u64(tool_start.elapsed().as_millis()));
return Ok(result);
}
}
}
Ok(
ToolResult::error("Async tool stream ended without completion")
.with_duration(millis_to_u64(tool_start.elapsed().as_millis())),
)
}
pub(super) async fn execute_confirmed_tool<Ctx, H>(
awaiting_tool: &PendingToolCallInfo,
rejection_reason: Option<String>,
ctx: &ConfirmedToolExecutionContext<'_, Ctx, H>,
) -> Result<ToolResult, AgentError>
where
Ctx: Send + Sync + Clone + 'static,
H: AgentHooks,
{
if let Some(reason) = rejection_reason {
return handle_confirmed_tool_rejection(awaiting_tool, ctx, reason).await;
}
let tier = awaiting_tool.tier;
let hook_decision = ctx
.hooks
.pre_tool_use(&build_invocation(awaiting_tool, tier))
.await;
if let ToolDecision::Block(reason) = &hook_decision {
log::warn!(
"pre_tool_use returned Block for confirmed tool '{}': {reason} -- rejecting at resume time",
awaiting_tool.name
);
emit_audit(
ctx.audit_sink,
ctx.provenance,
awaiting_tool,
tier,
ctx.turn,
ToolAuditOutcome::Blocked {
reason: reason.clone(),
},
)
.await;
let result = ToolResult::error(format!("Blocked at resume: {reason}"));
return finish_confirmed_tool(awaiting_tool, ctx, result).await;
}
if let Some(cached_result) = try_get_cached_result(ctx.execution_store, &awaiting_tool.id).await
{
emit_audit(
ctx.audit_sink,
ctx.provenance,
awaiting_tool,
tier,
ctx.turn,
ToolAuditOutcome::Cached {
result: cached_result.clone(),
},
)
.await;
return finish_confirmed_tool(awaiting_tool, ctx, cached_result).await;
}
let result = execute_confirmed_tool_inner(awaiting_tool, ctx).await?;
finish_confirmed_tool(awaiting_tool, ctx, result).await
}
pub(super) async fn handle_confirmed_tool_rejection<Ctx, H>(
awaiting_tool: &PendingToolCallInfo,
ctx: &ConfirmedToolExecutionContext<'_, Ctx, H>,
reason: String,
) -> Result<ToolResult, AgentError>
where
Ctx: Send + Sync + Clone + 'static,
H: AgentHooks,
{
if let Some(listen_tool) = ctx.tools.get_listen(&awaiting_tool.name)
&& let Some(listen) = awaiting_tool.listen_context.as_ref()
{
cancel_listen_with_warning(
listen_tool,
ctx.tool_context,
&listen.operation_id,
ListenStopReason::UserRejected,
&awaiting_tool.id,
&awaiting_tool.name,
)
.await;
}
let tier = awaiting_tool.tier;
emit_audit(
ctx.audit_sink,
ctx.provenance,
awaiting_tool,
tier,
ctx.turn,
ToolAuditOutcome::Blocked {
reason: reason.clone(),
},
)
.await;
let result = ToolResult::error(format!("Rejected: {reason}"));
if let Err(error) = send_event(
ctx.event_store,
ctx.thread_id,
ctx.turn,
ctx.hooks,
ctx.authority,
AgentEvent::tool_call_end(
&awaiting_tool.id,
&awaiting_tool.name,
&awaiting_tool.display_name,
result.clone(),
),
)
.await
{
emit_audit(
ctx.audit_sink,
ctx.provenance,
awaiting_tool,
tier,
ctx.turn,
ToolAuditOutcome::PersistenceFailed {
result: Some(result.clone()),
error: error.message.clone(),
},
)
.await;
return Err(error);
}
Ok(result)
}
pub(super) async fn execute_confirmed_tool_inner<Ctx, H>(
awaiting_tool: &PendingToolCallInfo,
ctx: &ConfirmedToolExecutionContext<'_, Ctx, H>,
) -> Result<ToolResult, AgentError>
where
Ctx: Send + Sync + Clone + 'static,
H: AgentHooks,
{
if let Some(listen_tool) = ctx.tools.get_listen(&awaiting_tool.name) {
let Some(listen) = awaiting_tool.listen_context.as_ref() else {
return Ok(ToolResult::error(format!(
"Listen context missing for tool: {}",
awaiting_tool.name
)));
};
let tool_start = Instant::now();
let controls = ToolBoundaryControls::from_tool_context(ctx.tool_context);
return execute_with_idempotency(
ctx.execution_store,
awaiting_tool,
ctx.thread_id,
async {
race_boundary_result(
&controls,
tool_start,
catch_tool_panic(listen_execute_ok(
listen_tool,
ctx.tool_context,
&listen.operation_id,
listen.revision,
tool_start,
)),
)
.await
},
)
.await;
}
if let Some(async_tool) = ctx.tools.get_async(&awaiting_tool.name) {
return execute_with_idempotency(
ctx.execution_store,
awaiting_tool,
ctx.thread_id,
execute_async_tool(AsyncToolExecutionParams {
pending: awaiting_tool,
tool: async_tool,
tool_context: ctx.tool_context,
event_store: ctx.event_store,
thread_id: ctx.thread_id,
turn: ctx.turn,
authority: ctx.authority,
#[cfg(feature = "otel")]
tool_span: None,
}),
)
.await;
}
if let Some(tool) = ctx.tools.get(&awaiting_tool.name) {
let tool_start = Instant::now();
let controls = ToolBoundaryControls::from_tool_context(ctx.tool_context);
return execute_with_idempotency(
ctx.execution_store,
awaiting_tool,
ctx.thread_id,
async {
race_boundary_result(
&controls,
tool_start,
catch_tool_panic(sync_execute_ok(
tool.execute(ctx.tool_context, awaiting_tool.input.clone()),
tool_start,
)),
)
.await
},
)
.await;
}
Ok(ToolResult::error(format!(
"Unknown tool: {}",
awaiting_tool.name
)))
}
pub(super) async fn finish_confirmed_tool<Ctx, H>(
awaiting_tool: &PendingToolCallInfo,
ctx: &ConfirmedToolExecutionContext<'_, Ctx, H>,
result: ToolResult,
) -> Result<ToolResult, AgentError>
where
Ctx: Send + Sync + Clone + 'static,
H: AgentHooks,
{
ctx.hooks.post_tool_use(&awaiting_tool.name, &result).await;
let tier = awaiting_tool.tier;
emit_audit(
ctx.audit_sink,
ctx.provenance,
awaiting_tool,
tier,
ctx.turn,
ToolAuditOutcome::Completed {
result: result.clone(),
},
)
.await;
if let Err(error) = send_event(
ctx.event_store,
ctx.thread_id,
ctx.turn,
ctx.hooks,
ctx.authority,
AgentEvent::tool_call_end(
&awaiting_tool.id,
&awaiting_tool.name,
&awaiting_tool.display_name,
result.clone(),
),
)
.await
{
emit_audit(
ctx.audit_sink,
ctx.provenance,
awaiting_tool,
tier,
ctx.turn,
ToolAuditOutcome::PersistenceFailed {
result: Some(result.clone()),
error: error.message.clone(),
},
)
.await;
return Err(error);
}
Ok(result)
}
pub(super) async fn append_tool_results<M>(
tool_results: &[(String, ToolResult)],
thread_id: &ThreadId,
message_store: &Arc<M>,
) -> Result<(), AgentError>
where
M: MessageStore,
{
if tool_results.is_empty() {
return Ok(());
}
let mut blocks: Vec<ContentBlock> = Vec::new();
for (tool_id, result) in tool_results {
blocks.push(ContentBlock::ToolResult {
tool_use_id: tool_id.clone(),
content: result.output.clone(),
is_error: if result.success { None } else { Some(true) },
});
for doc in &result.documents {
if doc.media_type.starts_with("image/") {
blocks.push(ContentBlock::Image {
source: doc.clone(),
});
} else {
blocks.push(ContentBlock::Document {
source: doc.clone(),
});
}
}
}
let batch_msg = Message {
role: Role::User,
content: Content::Blocks(blocks),
};
if let Err(e) = message_store.append(thread_id, batch_msg).await {
return Err(AgentError::new(
format!("Failed to append tool results: {e}"),
false,
));
}
Ok(())
}
#[cfg(test)]
mod tests {
use super::append_tool_results;
use crate::llm::{Content, ContentBlock};
use crate::stores::{InMemoryStore, MessageStore};
use crate::types::{ThreadId, ToolResult};
use std::sync::Arc;
#[tokio::test]
async fn test_append_tool_results_preserves_raw_output_content() -> anyhow::Result<()> {
let store = Arc::new(InMemoryStore::new());
let thread_id = ThreadId::from_string("thread-structured");
let result = ToolResult::error("command failed").with_duration(17);
append_tool_results(&[("tool_1".to_string(), result)], &thread_id, &store).await?;
let history = store.get_history(&thread_id).await?;
let Content::Blocks(blocks) = &history[0].content else {
anyhow::bail!("expected blocks")
};
let ContentBlock::ToolResult {
content, is_error, ..
} = &blocks[0]
else {
anyhow::bail!("expected tool result block")
};
assert_eq!(content, "command failed");
assert_eq!(*is_error, Some(true));
Ok(())
}
#[tokio::test]
async fn test_append_tool_results_uses_image_block_for_images() -> anyhow::Result<()> {
let store = Arc::new(InMemoryStore::new());
let thread_id = ThreadId::from_string("thread-1");
let result = ToolResult::success("attached image").with_documents(vec![
crate::llm::ContentSource::new("image/png", "ZmFrZQ=="),
]);
append_tool_results(&[("tool_1".to_string(), result)], &thread_id, &store).await?;
let history = store.get_history(&thread_id).await?;
assert_eq!(history.len(), 1);
let Content::Blocks(blocks) = &history[0].content else {
anyhow::bail!("expected blocks")
};
assert!(matches!(blocks[0], ContentBlock::ToolResult { .. }));
assert!(matches!(blocks[1], ContentBlock::Image { .. }));
Ok(())
}
#[tokio::test]
async fn test_append_tool_results_uses_document_block_for_pdfs() -> anyhow::Result<()> {
let store = Arc::new(InMemoryStore::new());
let thread_id = ThreadId::from_string("thread-2");
let result = ToolResult::success("attached pdf").with_documents(vec![
crate::llm::ContentSource::new("application/pdf", "ZmFrZQ=="),
]);
append_tool_results(&[("tool_1".to_string(), result)], &thread_id, &store).await?;
let history = store.get_history(&thread_id).await?;
let Content::Blocks(blocks) = &history[0].content else {
anyhow::bail!("expected blocks")
};
assert!(matches!(blocks[1], ContentBlock::Document { .. }));
Ok(())
}
}