use std::sync::Arc;
use agent_client_protocol_schema::{
Content as AcpContent, ContentBlock, ToolCallContent, ToolCallId, ToolCallStatus,
ToolCallUpdateFields,
};
use futures::StreamExt;
use serde_json::Value as JsonValue;
use tokio::task::JoinSet;
use tokio_util::sync::CancellationToken;
use tracing::Instrument;
use crate::event::{AgentEvent, PermissionResolution};
use crate::fs::FsBackend;
use crate::http::HttpClient;
use crate::llm::{ImageData, Message, MessageContent, Role, ToolResultBody, ToolResultContent};
use crate::policy::{PolicyCtx, PolicyDecision, RecordedOutcome};
use crate::session::TurnError;
use crate::session::events::EventEmitter;
use crate::shell::ShellBackend;
use crate::tool::{Tool, ToolContext, ToolError, ToolEvent};
use super::TurnRunner;
use super::hooks::PreToolHookFlow;
use super::llm_drive::{ToolUseAccumulated, parse_args};
impl TurnRunner<'_> {
pub(super) async fn decide_permissions(
&self,
tool_uses: &[ToolUseAccumulated],
) -> Result<DecisionFlow, TurnError> {
let mut approved: Vec<Approved> = Vec::with_capacity(tool_uses.len());
for tu in tool_uses {
let id = ToolCallId::new(tu.id.clone());
let Some(tool) = self.tools.get(&tu.name) else {
let reason = format!("tool not found: {}", tu.name);
self.emit_tool_failed(&id, &tu.name, reason.clone()).await;
approved.push(Approved::FailedArgs {
id: id.clone(),
tool_use_id: tu.id.clone(),
name: tu.name.clone(),
reason,
});
continue;
};
let mut args: JsonValue = match parse_args(&tu.args_buf) {
Ok(v) => v,
Err(reason) => {
let reason = format!("invalid args: {reason}");
self.emit_tool_failed(&id, &tu.name, reason.clone()).await;
approved.push(Approved::FailedArgs {
id: id.clone(),
tool_use_id: tu.id.clone(),
name: tu.name.clone(),
reason,
});
continue;
}
};
let safety_hint_pre = tool.safety_hint(&args);
match self
.fire_pre_tool_use(&id, &tu.name, &args, safety_hint_pre)
.await
{
PreToolHookFlow::Continue { args: new_args } => {
args = new_args;
}
PreToolHookFlow::Block(reason) => {
self.emit_tool_failed(&id, &tu.name, reason).await;
approved.push(Approved::Denied {
id: id.clone(),
tool_use_id: tu.id.clone(),
name: tu.name.clone(),
});
continue;
}
}
let describe_ctx = ToolContext::new(
self.cwd,
self.cancel.clone(),
self.fs.clone(),
self.shell.clone(),
self.http.clone(),
&self.config.model,
)
.with_current_provider(&self.config.provider);
let description = tool.describe(&args, describe_ctx).await;
let mut started_fields =
with_status(description.fields.clone(), ToolCallStatus::Pending);
if started_fields.raw_input.is_none() {
started_fields.raw_input = Some(args.clone());
}
self.events
.emit(AgentEvent::ToolCallStarted {
id: id.clone(),
name: tu.name.clone(),
fields: started_fields,
})
.await;
let safety_hint = tool.safety_hint(&args);
let decision =
self.policy
.classify(PolicyCtx::new(&tu.name, safety_hint, &args, self.cwd));
self.events
.emit(AgentEvent::PolicyDecision {
id: id.clone(),
decision: decision.clone(),
})
.await;
match decision {
PolicyDecision::Allow => approved.push(Approved::Run {
id,
tool_use_id: tu.id.clone(),
tool: tool.clone(),
args,
}),
PolicyDecision::Deny => {
self.emit_tool_failed(&id, &tu.name, "denied by policy".to_string())
.await;
approved.push(Approved::Denied {
id: id.clone(),
tool_use_id: tu.id.clone(),
name: tu.name.clone(),
});
}
PolicyDecision::Ask(ask) => {
if ask.options.is_empty() {
self.emit_tool_failed(&id, &tu.name, "denied by policy".to_string())
.await;
approved.push(Approved::Denied {
id: id.clone(),
tool_use_id: tu.id.clone(),
name: tu.name.clone(),
});
continue;
}
let outcome = self.permissions.wait(id.clone(), self.cancel.clone()).await;
self.events
.emit(AgentEvent::PermissionResolved {
id: id.clone(),
outcome: outcome.clone(),
})
.await;
match outcome {
PermissionResolution::Selected { option_id } => {
let allows = ask
.options
.iter()
.find(|o| o.id == option_id)
.map(|o| o.allows)
.unwrap_or(false);
self.policy.record(
PolicyCtx::new(&tu.name, safety_hint, &args, self.cwd),
RecordedOutcome::Selected { option_id, allows },
);
if allows {
approved.push(Approved::Run {
id,
tool_use_id: tu.id.clone(),
tool: tool.clone(),
args,
});
} else {
self.emit_tool_failed(&id, &tu.name, "denied by user".to_string())
.await;
approved.push(Approved::Denied {
id: id.clone(),
tool_use_id: tu.id.clone(),
name: tu.name.clone(),
});
}
}
PermissionResolution::Cancelled => {
self.policy.record(
PolicyCtx::new(&tu.name, safety_hint, &args, self.cwd),
RecordedOutcome::Cancelled,
);
return Ok(DecisionFlow::Cancelled);
}
}
}
}
}
Ok(DecisionFlow::Continue(approved))
}
async fn emit_tool_failed(&self, id: &ToolCallId, name: &str, text: String) {
let fields = failed_fields_text(text);
self.events
.emit(AgentEvent::ToolCallStarted {
id: id.clone(),
name: name.to_owned(),
fields: fields.clone(),
})
.await;
self.events
.emit(AgentEvent::ToolCallFinished {
id: id.clone(),
fields,
})
.await;
}
pub(super) async fn run_tools_concurrently(&self, approved: Vec<Approved>) -> Vec<ToolResult> {
let mut joinset: JoinSet<ToolResult> = JoinSet::new();
let mut results: Vec<ToolResult> = Vec::with_capacity(approved.len());
let semaphore = (self.config.max_concurrent_tools > 0).then(|| {
Arc::new(tokio::sync::Semaphore::new(
self.config.max_concurrent_tools,
))
});
for a in approved {
match a {
Approved::Run {
id,
tool_use_id,
tool,
args,
} => {
let cancel = self.cancel.child_token();
let events = self.events.clone();
let cwd = self.cwd.to_path_buf();
let fs = self.fs.clone();
let shell = self.shell.clone();
let http = self.http.clone();
let model = self.config.model.clone();
let provider = self.config.provider.clone();
let background = self.background.clone();
let goal = self.goal.clone();
let policy = self.policy.clone();
let subagent_depth = self.config.subagent_max_depth;
let name = tool.schema().name.clone();
let span = tracing::info_span!(
"tool_call",
tool = %name,
tool_call_id = %id,
);
let semaphore = semaphore.clone();
joinset.spawn(
async move {
let _permit = match semaphore {
Some(sem) => {
Some(sem.acquire_owned().await.expect("semaphore not closed"))
}
None => None,
};
drive_tool_stream(
id,
tool_use_id,
name,
tool,
args,
cwd,
cancel,
events,
fs,
shell,
http,
model,
provider,
background,
goal,
policy,
subagent_depth,
)
.await
}
.instrument(span),
);
}
Approved::Denied {
id,
tool_use_id,
name,
} => {
results.push(ToolResult {
id,
name,
tool_use_id,
body: ToolResultBody::Text {
text: "denied".to_string(),
},
is_error: true,
fields: None,
error: Some("denied".to_string()),
});
}
Approved::FailedArgs {
id,
tool_use_id,
name,
reason,
} => {
results.push(ToolResult {
id,
name,
tool_use_id,
body: ToolResultBody::Text {
text: reason.clone(),
},
is_error: true,
fields: None,
error: Some(reason),
});
}
}
}
while let Some(res) = joinset.join_next().await {
match res {
Ok(r) => results.push(r),
Err(join_err) => {
tracing::error!(error = ?join_err, "tool task panicked");
results.push(ToolResult {
id: ToolCallId::new(""),
name: String::new(),
tool_use_id: String::new(),
body: ToolResultBody::Text {
text: format!("tool task crashed: {join_err}"),
},
is_error: true,
fields: None,
error: Some(format!("tool task crashed: {join_err}")),
});
}
}
}
for result in results.iter_mut() {
self.fire_post_tool_hook(result).await;
}
results
}
}
impl<'a> TurnRunner<'a> {}
pub(super) enum Approved {
Run {
id: ToolCallId,
tool_use_id: String,
tool: Arc<dyn Tool>,
args: JsonValue,
},
Denied {
id: ToolCallId,
tool_use_id: String,
name: String,
},
FailedArgs {
id: ToolCallId,
tool_use_id: String,
name: String,
reason: String,
},
}
pub(super) enum DecisionFlow {
Continue(Vec<Approved>),
Cancelled,
}
pub(super) struct ToolResult {
#[allow(dead_code)]
pub(super) id: ToolCallId,
pub(super) name: String,
pub(super) tool_use_id: String,
pub(super) body: ToolResultBody,
pub(super) is_error: bool,
#[allow(dead_code)]
pub(super) fields: Option<ToolCallUpdateFields>,
#[allow(dead_code)]
pub(super) error: Option<String>,
}
pub(super) fn approved_tool_name(a: &Approved) -> String {
match a {
Approved::Run { tool, .. } => tool.schema().name.clone(),
Approved::Denied { name, .. } | Approved::FailedArgs { name, .. } => name.clone(),
}
}
pub(super) fn oversized_rejection_text(tokens: u64, window: u64) -> String {
format!(
"[tool output rejected: ~{tokens} tokens exceeds the model context window of \
{window} tokens, so it cannot be added to the conversation. Re-run the tool with \
a narrower request — e.g. a tighter pattern, fewer files, a smaller line range, \
or pagination — so the result fits.]"
)
}
pub(super) fn reject_oversized_results(results: &mut [ToolResult], window: Option<u64>) -> usize {
let Some(window) = window else {
return 0;
};
let mut rejected = 0;
for r in results.iter_mut() {
let tokens = super::microcompact::estimate_tool_result_tokens(&r.body);
if tokens > window {
r.body = ToolResultBody::Text {
text: oversized_rejection_text(tokens, window),
};
r.is_error = true;
rejected += 1;
}
}
rejected
}
pub(super) fn tool_results_message(results: Vec<ToolResult>) -> Message {
Message {
role: Role::User,
content: results
.into_iter()
.map(|r| MessageContent::ToolResult {
tool_use_id: r.tool_use_id,
output: r.body,
is_error: r.is_error,
})
.collect(),
}
}
fn with_status(mut f: ToolCallUpdateFields, status: ToolCallStatus) -> ToolCallUpdateFields {
f.status = Some(status);
f
}
fn failed_fields_text(text: String) -> ToolCallUpdateFields {
let mut f = ToolCallUpdateFields::default();
f.status = Some(ToolCallStatus::Failed);
f.content = Some(vec![ToolCallContent::Content(AcpContent::new(text))]);
f
}
fn extract_body(fields: &ToolCallUpdateFields) -> Option<ToolResultBody> {
let raw = fields.content.as_ref()?;
let mut blocks: Vec<ToolResultContent> = Vec::new();
let mut has_image = false;
for c in raw {
let ToolCallContent::Content(inner) = c else {
continue;
};
match &inner.content {
ContentBlock::Text(t) => blocks.push(ToolResultContent::Text {
text: t.text.clone(),
}),
ContentBlock::Image(img) => {
has_image = true;
blocks.push(ToolResultContent::Image {
mime: img.mime_type.clone(),
data: ImageData::Base64 {
encoded: img.data.clone(),
},
});
}
_ => {}
}
}
if blocks.is_empty() {
return None;
}
if has_image {
return Some(ToolResultBody::Content { blocks });
}
let text = blocks
.into_iter()
.filter_map(|b| match b {
ToolResultContent::Text { text } => Some(text),
_ => None,
})
.collect::<Vec<_>>()
.join("\n");
Some(ToolResultBody::Text { text })
}
#[allow(clippy::too_many_arguments)]
async fn drive_tool_stream(
id: ToolCallId,
tool_use_id: String,
name: String,
tool: Arc<dyn Tool>,
args: JsonValue,
cwd: std::path::PathBuf,
cancel: CancellationToken,
events: Arc<EventEmitter>,
fs: Arc<dyn FsBackend>,
shell: Arc<dyn ShellBackend>,
http: Arc<dyn HttpClient>,
model: String,
provider: String,
background: Option<crate::session::BackgroundTasks>,
goal: Option<Arc<crate::session::GoalState>>,
policy: Arc<dyn crate::policy::SandboxPolicy>,
subagent_depth: u32,
) -> ToolResult {
let mut ctx = ToolContext::new(
&cwd,
cancel.clone(),
fs.clone(),
shell.clone(),
http.clone(),
&model,
)
.with_current_provider(&provider)
.with_policy(policy)
.with_subagent_depth(subagent_depth);
if let Some(bg) = background {
ctx = ctx.with_background(bg);
}
if let Some(goal) = goal {
ctx = ctx.with_goal(goal);
}
ctx = ctx.with_subagent_bridge(crate::tool::SubagentBridge {
parent_events: events.clone(),
parent_tool_call_id: id.clone(),
});
let mut stream = tool.execute(args, ctx);
let mut last_body: Option<ToolResultBody> = None;
while let Some(ev) = stream.next().await {
match ev {
ToolEvent::Progress(fields) => {
if let Some(body) = extract_body(&fields) {
last_body = Some(body);
}
events
.emit(AgentEvent::ToolCallProgress {
id: id.clone(),
fields: with_status(fields, ToolCallStatus::InProgress),
})
.await;
}
ToolEvent::Completed(fields) => {
if let Some(body) = extract_body(&fields) {
last_body = Some(body);
}
let fields = with_status(fields, ToolCallStatus::Completed);
events
.emit(AgentEvent::ToolCallFinished {
id: id.clone(),
fields: fields.clone(),
})
.await;
return ToolResult {
id,
name,
tool_use_id,
body: last_body.unwrap_or(ToolResultBody::Text {
text: String::new(),
}),
is_error: false,
fields: Some(fields),
error: None,
};
}
ToolEvent::Failed(err) => {
let text = err.to_string();
let is_cancel = matches!(err, ToolError::Canceled);
events
.emit(AgentEvent::ToolCallFinished {
id: id.clone(),
fields: failed_fields_text(text.clone()),
})
.await;
return ToolResult {
id,
name,
tool_use_id,
body: ToolResultBody::Text { text: text.clone() },
is_error: !is_cancel,
fields: None,
error: Some(text),
};
}
}
}
events
.emit(AgentEvent::ToolCallFinished {
id: id.clone(),
fields: failed_fields_text("tool stream closed without terminal event".to_string()),
})
.await;
let text = "tool stream closed without terminal event".to_string();
ToolResult {
id,
name,
tool_use_id,
body: ToolResultBody::Text { text: text.clone() },
is_error: true,
fields: None,
error: Some(text),
}
}
#[cfg(test)]
mod tests;