use std::sync::Arc;
use serde_json::Value;
use tokio::sync::mpsc;
use super::hooks::{AfterToolCallContext, BeforeToolCallContext};
use super::inflight::InflightSet;
use super::message::{
AssistantMessage, ContentBlock, EscalationReason, LoopEvent, LoopMessage, ToolResultMessage,
};
use super::result::LoopToolResult;
use super::tool::{AbortSignal, LoopTool, LoopToolUpdate};
use super::types::{Context, LoopConfig};
pub(crate) const SYNTAX_CHECK_PREFIX: &str = "Syntax check failed for ";
pub(crate) fn try_arm_escalation(config: &LoopConfig, reason: EscalationReason) {
use std::sync::atomic::Ordering;
let res = config
.escalation_remaining
.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |v| {
if v == 0 { None } else { Some(v - 1) }
});
if res.is_err() {
tracing::debug!(
target: "dirge::agent_loop::escalation",
cap = %config.escalation_max_per_session,
"escalation budget exhausted; skipping arm",
);
return;
}
if let Ok(mut guard) = config.escalation_pending.lock() {
tracing::debug!(
target: "dirge::agent_loop::escalation",
reason = ?reason,
"escalation armed for next LLM call",
);
*guard = Some(reason);
}
}
#[derive(Debug, Clone)]
pub struct ExecutedToolCallBatch {
pub messages: Vec<ToolResultMessage>,
pub terminate: bool,
}
#[derive(Debug, Clone)]
pub struct ToolCall {
pub id: String,
pub name: String,
pub arguments: Value,
}
enum PrepareOutcome {
Prepared {
tool: Arc<dyn LoopTool>,
args: Value,
notes: Vec<String>,
},
Immediate {
result: LoopToolResult,
is_error: bool,
},
}
struct ExecutedOutcome {
result: LoopToolResult,
is_error: bool,
}
#[derive(Debug, Clone)]
struct FinalizedOutcome {
tool_call: ToolCall,
result: LoopToolResult,
is_error: bool,
}
pub async fn execute_tool_calls_sequential(
context: &Context,
assistant_message: &AssistantMessage,
tool_calls: &[ToolCall],
config: &LoopConfig,
signal: &AbortSignal,
emit: &mpsc::Sender<LoopEvent>,
inflight: &InflightSet,
) -> ExecutedToolCallBatch {
let mut finalized_calls: Vec<FinalizedOutcome> = Vec::with_capacity(tool_calls.len());
let mut messages: Vec<ToolResultMessage> = Vec::with_capacity(tool_calls.len());
for tool_call in tool_calls {
let _ = emit
.send(LoopEvent::ToolExecutionStart {
tool_call_id: tool_call.id.clone(),
tool_name: tool_call.name.clone(),
args: tool_call.arguments.clone(),
})
.await;
let prepared =
prepare_tool_call(context, assistant_message, tool_call, config, signal).await;
let finalized = match prepared {
PrepareOutcome::Immediate { result, is_error } => FinalizedOutcome {
tool_call: tool_call.clone(),
result,
is_error,
},
PrepareOutcome::Prepared { tool, args, notes } => {
let _inflight = inflight.guard(&tool_call.id);
let executed =
execute_prepared_tool_call(&tool, tool_call, &args, signal, emit).await;
let mut finalized = finalize_executed_tool_call(
context,
assistant_message,
tool_call,
&args,
executed,
config,
)
.await;
prepend_notes_to_result(&mut finalized.result, ¬es);
maybe_arm_escalation_for_syntactic_failure(
config,
tool_call,
&finalized.result,
finalized.is_error,
);
finalized
}
};
emit_tool_execution_end(&finalized, emit).await;
let result_msg = create_tool_result_message(&finalized);
emit_tool_result_message(&result_msg, emit).await;
finalized_calls.push(finalized);
messages.push(result_msg);
if signal.is_cancelled() {
break;
}
}
ExecutedToolCallBatch {
messages,
terminate: should_terminate_tool_batch(&finalized_calls),
}
}
async fn prepare_tool_call(
context: &Context,
assistant_message: &AssistantMessage,
tool_call: &ToolCall,
config: &LoopConfig,
signal: &AbortSignal,
) -> PrepareOutcome {
let tool = match context.tools.iter().find(|t| t.name() == tool_call.name) {
Some(t) => t.clone(),
None => {
let names: Vec<&str> = context.tools.iter().map(|t| t.name()).collect();
let mut msg = format!("Tool {} not found", tool_call.name);
if let Some(sugg) = super::suggest::closest(&tool_call.name, &names) {
msg.push_str(&format!(". Did you mean `{sugg}`?"));
}
return PrepareOutcome::Immediate {
result: create_error_tool_result(&msg),
is_error: true,
};
}
};
let prepared_args = tool.prepare_arguments(tool_call.arguments.clone());
let mut repair_notes: Vec<String> = Vec::new();
let mut validated_args = match crate::agent::agent_loop::tool_input_repair::validate_and_repair(
tool.parameters(),
&prepared_args,
) {
Ok(None) => {
prepared_args
}
Ok(Some(rr)) => {
let original_args = serde_json::to_string(&prepared_args).unwrap_or_default();
let original_truncated: String = if original_args.len() > 4096 {
format!(
"{}... ({} bytes truncated)",
crate::text::head(&original_args, 4096),
original_args.len() - 4096
)
} else {
original_args
};
tracing::info!(
target: "tool_repair",
model = config.model_name.as_deref().unwrap_or("unknown"),
tool = %tool_call.name,
repair = ?rr.kinds,
original_args = %original_truncated,
"tool input repaired"
);
let mut seen_kinds: std::collections::HashSet<
crate::agent::agent_loop::tool_input_repair::RepairKind,
> = std::collections::HashSet::new();
for kind in &rr.kinds {
if seen_kinds.insert(*kind) {
config.repair_stats.record(*kind);
}
}
repair_notes = rr.notes;
rr.repaired
}
Err(errors) => {
let msg = crate::agent::agent_loop::tool_input_repair::format_structured_error(
tool.parameters(),
&prepared_args,
&errors,
);
let original_args = serde_json::to_string(&prepared_args).unwrap_or_default();
let original_truncated: String = if original_args.len() > 16384 {
format!(
"{}... ({} bytes truncated)",
crate::text::head(&original_args, 16384),
original_args.len() - 16384
)
} else {
original_args
};
tracing::warn!(
target: "tool_input_invalid",
model = config.model_name.as_deref().unwrap_or("unknown"),
tool = %tool_call.name,
validation_errors = ?errors,
original_args = %original_truncated,
"tool input invalid after repair pass"
);
config.repair_stats.record_invalid();
try_arm_escalation(
config,
EscalationReason::RepairExhausted {
tool: tool_call.name.clone(),
},
);
return PrepareOutcome::Immediate {
result: create_error_tool_result(&msg),
is_error: true,
};
}
};
{
let mut sink = config
.truncation_notes
.lock()
.expect("truncation_notes poisoned");
if let Some(notes) = sink.remove(&tool_call.id) {
repair_notes.extend(notes);
}
}
if let Some(hook) = &config.before_tool_call {
let hook_ctx = BeforeToolCallContext {
assistant_message: assistant_message.clone(),
tool_call_id: tool_call.id.clone(),
tool_call_name: tool_call.name.clone(),
args: validated_args.clone(),
};
let ret = hook(hook_ctx).await;
validated_args = ret.args;
if signal.is_cancelled() {
return PrepareOutcome::Immediate {
result: create_error_tool_result("Operation aborted"),
is_error: true,
};
}
if let Some(before_result) = ret.result
&& before_result.block.unwrap_or(false)
{
let reason = before_result
.reason
.unwrap_or_else(|| "Tool execution was blocked".to_string());
return PrepareOutcome::Immediate {
result: create_error_tool_result(&reason),
is_error: true,
};
}
}
if signal.is_cancelled() {
return PrepareOutcome::Immediate {
result: create_error_tool_result("Operation aborted"),
is_error: true,
};
}
if let Some(tracker) = &config.file_touch_tracker {
tracker.record_tool_call(&tool_call.name, &validated_args);
}
PrepareOutcome::Prepared {
tool,
args: validated_args,
notes: repair_notes,
}
}
async fn execute_prepared_tool_call(
tool: &Arc<dyn LoopTool>,
tool_call: &ToolCall,
args: &Value,
signal: &AbortSignal,
emit: &mpsc::Sender<LoopEvent>,
) -> ExecutedOutcome {
let emit_clone = emit.clone();
let id_clone = tool_call.id.clone();
let name_clone = tool_call.name.clone();
let args_clone = tool_call.arguments.clone();
let dropped_count = std::sync::Arc::new(std::sync::atomic::AtomicU64::new(0));
let dropped_clone = dropped_count.clone();
let on_update: LoopToolUpdate = Arc::new(move |partial: &LoopToolResult| {
let evt = LoopEvent::ToolExecutionUpdate {
tool_call_id: id_clone.clone(),
tool_name: name_clone.clone(),
args: args_clone.clone(),
partial_result: partial.clone(),
};
if emit_clone.try_send(evt).is_err() {
let prev = dropped_clone.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
if prev == 0 {
tracing::warn!(
target: "dirge::agent_loop::tools",
tool = %name_clone,
tool_call_id = %id_clone,
"ToolExecutionUpdate channel full or closed; dropping update events",
);
}
}
});
let exec_future = tool.execute(&tool_call.id, args.clone(), signal.clone(), on_update);
let signal_check = wait_for_cancel(signal.clone());
let outcome = tokio::select! {
biased; _ = signal_check => {
return ExecutedOutcome {
result: create_error_tool_result(
"tool execution aborted by cancellation signal",
),
is_error: true,
};
}
result = exec_future => result,
};
let final_dropped = dropped_count.load(std::sync::atomic::Ordering::Relaxed);
if final_dropped > 0 {
tracing::info!(
target: "dirge::agent_loop::tools",
tool = %tool_call.name,
tool_call_id = %tool_call.id,
dropped = final_dropped,
"ToolExecutionUpdate events dropped during tool execution",
);
}
match outcome {
Ok(result) => ExecutedOutcome {
result,
is_error: false,
},
Err(err) => ExecutedOutcome {
result: create_error_tool_result(&err),
is_error: true,
},
}
}
async fn finalize_executed_tool_call(
context: &Context,
assistant_message: &AssistantMessage,
tool_call: &ToolCall,
args: &Value,
executed: ExecutedOutcome,
config: &LoopConfig,
) -> FinalizedOutcome {
let mut result = executed.result;
let mut is_error = executed.is_error;
if let Some(hook) = &config.after_tool_call {
let hook_ctx = AfterToolCallContext {
assistant_message: assistant_message.clone(),
tool_call_id: tool_call.id.clone(),
tool_call_name: tool_call.name.clone(),
args: args.clone(),
result: result.clone(),
is_error,
};
if let Some(after) = hook(hook_ctx).await {
result = LoopToolResult {
content: after.content.unwrap_or(result.content),
details: after.details.unwrap_or(result.details),
terminate: after.terminate.or(result.terminate),
};
is_error = after.is_error.unwrap_or(is_error);
}
}
let _ = context;
if let Some(verifier) = &config.verifier {
verifier.record_outcome(&tool_call.name, args, &result, is_error);
}
FinalizedOutcome {
tool_call: tool_call.clone(),
result,
is_error,
}
}
fn should_terminate_tool_batch(finalized: &[FinalizedOutcome]) -> bool {
!finalized.is_empty()
&& finalized
.iter()
.all(|f| f.result.terminate.unwrap_or(false))
}
async fn wait_for_cancel(signal: AbortSignal) {
signal.cancelled().await;
}
pub(crate) fn maybe_arm_escalation_for_syntactic_failure(
config: &LoopConfig,
tool_call: &ToolCall,
result: &LoopToolResult,
is_error: bool,
) {
if !is_error {
return;
}
let text = result.content.iter().find_map(|b| {
let obj = b.as_object()?;
if obj.get("type").and_then(|t| t.as_str()) == Some("text") {
obj.get("text").and_then(|t| t.as_str())
} else {
None
}
});
let text = match text {
Some(t) => t,
None => return,
};
if !text.starts_with(SYNTAX_CHECK_PREFIX) {
return;
}
let after = &text[SYNTAX_CHECK_PREFIX.len()..];
let path = match after.find(':') {
Some(i) => after[..i].to_string(),
None => String::new(),
};
try_arm_escalation(
config,
EscalationReason::SyntacticFailure {
tool: tool_call.name.clone(),
path,
},
);
}
fn create_error_tool_result(message: &str) -> LoopToolResult {
LoopToolResult {
content: vec![serde_json::json!({"type": "text", "text": message})],
details: serde_json::json!({}),
terminate: None,
}
}
fn prepend_notes_to_result(result: &mut LoopToolResult, notes: &[String]) {
if notes.is_empty() {
return;
}
let joined = notes.join("\n");
let note_block = serde_json::json!({"type": "text", "text": joined});
result.content.insert(0, note_block);
}
async fn emit_tool_execution_end(finalized: &FinalizedOutcome, emit: &mpsc::Sender<LoopEvent>) {
let _ = emit
.send(LoopEvent::ToolExecutionEnd {
tool_call_id: finalized.tool_call.id.clone(),
tool_name: finalized.tool_call.name.clone(),
result: finalized.result.clone(),
is_error: finalized.is_error,
})
.await;
}
fn create_tool_result_message(finalized: &FinalizedOutcome) -> ToolResultMessage {
let content_blocks: Vec<ContentBlock> = finalized
.result
.content
.iter()
.map(content_value_to_block)
.collect();
ToolResultMessage {
tool_call_id: finalized.tool_call.id.clone(),
tool_name: finalized.tool_call.name.clone(),
content: content_blocks,
details: finalized.result.details.clone(),
is_error: finalized.is_error,
}
}
fn content_value_to_block(value: &Value) -> ContentBlock {
if let Some(obj) = value.as_object()
&& obj.get("type").and_then(|t| t.as_str()) == Some("text")
&& let Some(text) = obj.get("text").and_then(|t| t.as_str())
{
return ContentBlock::Text {
text: crate::sandbox::redact_secrets(text).into_owned(),
};
}
ContentBlock::Text {
text: crate::sandbox::redact_secrets(&value.to_string()).into_owned(),
}
}
async fn emit_tool_result_message(msg: &ToolResultMessage, emit: &mpsc::Sender<LoopEvent>) {
let _ = emit
.send(LoopEvent::MessageStart {
message: LoopMessage::ToolResult(msg.clone()),
})
.await;
let _ = emit
.send(LoopEvent::MessageEnd {
message: LoopMessage::ToolResult(msg.clone()),
})
.await;
}
pub async fn execute_tool_calls_parallel(
context: &Context,
assistant_message: &AssistantMessage,
tool_calls: &[ToolCall],
config: &LoopConfig,
signal: &AbortSignal,
emit: &mpsc::Sender<LoopEvent>,
inflight: &InflightSet,
) -> ExecutedToolCallBatch {
use futures::future::join_all;
use std::pin::Pin;
type ResolveFuture = Pin<Box<dyn Future<Output = FinalizedOutcome> + Send>>;
let mut entries: Vec<ResolveFuture> = Vec::with_capacity(tool_calls.len());
for tool_call in tool_calls {
let _ = emit
.send(LoopEvent::ToolExecutionStart {
tool_call_id: tool_call.id.clone(),
tool_name: tool_call.name.clone(),
args: tool_call.arguments.clone(),
})
.await;
let prepared =
prepare_tool_call(context, assistant_message, tool_call, config, signal).await;
match prepared {
PrepareOutcome::Immediate { result, is_error } => {
let finalized = FinalizedOutcome {
tool_call: tool_call.clone(),
result,
is_error,
};
emit_tool_execution_end(&finalized, emit).await;
entries.push(Box::pin(futures::future::ready(finalized)));
if signal.is_cancelled() {
break;
}
}
PrepareOutcome::Prepared { tool, args, notes } => {
let tool_call_clone = tool_call.clone();
let assistant_clone = assistant_message.clone();
let config_clone = config.clone();
let context_clone = context.clone();
let signal_clone = signal.clone();
let emit_clone = emit.clone();
let inflight_clone = inflight.clone();
let call_id = tool_call.id.clone();
entries.push(Box::pin(async move {
let _guard = inflight_clone.guard(&call_id);
let executed = execute_prepared_tool_call(
&tool,
&tool_call_clone,
&args,
&signal_clone,
&emit_clone,
)
.await;
let mut finalized = finalize_executed_tool_call(
&context_clone,
&assistant_clone,
&tool_call_clone,
&args,
executed,
&config_clone,
)
.await;
prepend_notes_to_result(&mut finalized.result, ¬es);
maybe_arm_escalation_for_syntactic_failure(
&config_clone,
&tool_call_clone,
&finalized.result,
finalized.is_error,
);
emit_tool_execution_end(&finalized, &emit_clone).await;
finalized
}));
if signal.is_cancelled() {
break;
}
}
}
}
let finalized: Vec<FinalizedOutcome> = join_all(entries).await;
let mut messages: Vec<ToolResultMessage> = Vec::with_capacity(finalized.len());
for f in &finalized {
let msg = create_tool_result_message(f);
emit_tool_result_message(&msg, emit).await;
messages.push(msg);
}
ExecutedToolCallBatch {
messages,
terminate: should_terminate_tool_batch(&finalized),
}
}
pub(crate) fn backfill_missing_tool_results(
calls: &[ToolCall],
results: &[ToolResultMessage],
) -> Vec<ToolResultMessage> {
let answered: std::collections::HashSet<&str> =
results.iter().map(|r| r.tool_call_id.as_str()).collect();
calls
.iter()
.filter(|c| !answered.contains(c.id.as_str()))
.map(|c| ToolResultMessage {
tool_call_id: c.id.clone(),
tool_name: c.name.clone(),
content: vec![ContentBlock::Text {
text: "[tool call not executed: it was suppressed as a repeated/looping call, or \
the run was interrupted. Do NOT repeat it — try a different approach.]"
.to_string(),
}],
details: Value::Null,
is_error: true,
})
.collect()
}
pub async fn execute_tool_calls(
context: &Context,
assistant_message: &AssistantMessage,
tool_calls: &[ToolCall],
config: &LoopConfig,
signal: &AbortSignal,
emit: &mpsc::Sender<LoopEvent>,
inflight: &InflightSet,
) -> ExecutedToolCallBatch {
let has_sequential = tool_calls.iter().any(|tc| {
context
.tools
.iter()
.find(|t| t.name() == tc.name)
.and_then(|t| t.execution_mode())
== Some(super::types::ToolExecutionMode::Sequential)
});
if config.tool_execution == super::types::ToolExecutionMode::Sequential || has_sequential {
execute_tool_calls_sequential(
context,
assistant_message,
tool_calls,
config,
signal,
emit,
inflight,
)
.await
} else {
execute_tool_calls_parallel(
context,
assistant_message,
tool_calls,
config,
signal,
emit,
inflight,
)
.await
}
}
#[cfg(test)]
pub async fn execute_tool_calls_from_msg(
context: &Context,
assistant_message: &AssistantMessage,
config: &LoopConfig,
signal: &AbortSignal,
emit: &mpsc::Sender<LoopEvent>,
inflight: &InflightSet,
) -> ExecutedToolCallBatch {
let tool_calls = extract_tool_calls(assistant_message);
execute_tool_calls(
context,
assistant_message,
&tool_calls,
config,
signal,
emit,
inflight,
)
.await
}
pub fn extract_tool_calls(msg: &AssistantMessage) -> Vec<ToolCall> {
msg.content
.iter()
.filter_map(|block| match block {
ContentBlock::ToolCall {
id,
name,
arguments,
} => Some(ToolCall {
id: id.clone(),
name: name.clone(),
arguments: arguments.clone(),
}),
_ => None,
})
.collect()
}
#[cfg(test)]
#[path = "tools_tests.rs"]
mod tests;