use crate::config::Config;
use crate::session::chat::session::ChatSession;
use crate::session::chat::ToolProcessor;
use crate::session::output::OutputMode;
use crate::{log_debug, log_info};
use anyhow::Result;
use colored::Colorize;
use std::io::IsTerminal;
pub enum ToolExecutionContext<'a> {
MainSession {
chat_session: &'a mut ChatSession,
tool_processor: &'a mut ToolProcessor,
},
Layer {
session_name: String,
layer_config: Box<crate::session::layers::LayerConfig>,
layer_name: String,
},
}
impl ToolExecutionContext<'_> {
pub fn session_name(&self) -> &str {
match self {
ToolExecutionContext::MainSession { chat_session, .. } => {
&chat_session.session.info.name
}
ToolExecutionContext::Layer { session_name, .. } => session_name,
}
}
pub fn execution_context(&self) -> Option<String> {
match self {
ToolExecutionContext::MainSession { .. } => None, ToolExecutionContext::Layer { layer_name, .. } => Some(layer_name.clone()),
}
}
pub fn is_tool_allowed(&self, tool_name: &str) -> bool {
match self {
ToolExecutionContext::MainSession { .. } => true, ToolExecutionContext::Layer { layer_config, .. } => {
let server_name = crate::mcp::tool_map::get_tool_server_name(tool_name)
.unwrap_or_else(|| "unknown".to_string());
layer_config.mcp.is_tool_allowed(tool_name, &server_name)
}
}
}
pub fn error_tracker(
&mut self,
) -> Option<&mut crate::session::chat::tool_error_tracker::ToolErrorTracker> {
match self {
ToolExecutionContext::MainSession { tool_processor, .. } => {
Some(&mut tool_processor.error_tracker)
}
ToolExecutionContext::Layer { .. } => None, }
}
pub fn increment_tool_calls(&mut self) {
if let ToolExecutionContext::MainSession { chat_session, .. } = self {
chat_session.session.info.tool_calls += 1;
}
}
pub fn handle_declined_output(&mut self, tool_id: &str) {
if let ToolExecutionContext::MainSession { chat_session, .. } = self {
handle_declined_in_session(tool_id, chat_session);
}
}
}
pub async fn execute_tools_in_context(
current_tool_calls: Vec<crate::mcp::McpToolCall>,
context: &mut ToolExecutionContext<'_>,
config: &Config,
operation_cancelled: Option<tokio::sync::watch::Receiver<bool>>,
mode: OutputMode,
) -> Result<(Vec<crate::mcp::McpToolResult>, u64)> {
let allowed_tool_calls: Vec<_> = current_tool_calls
.into_iter()
.filter(|tool_call| {
if context.is_tool_allowed(&tool_call.tool_name) {
true
} else {
if !mode.should_suppress_cli_output() {
println!(
"{} {} {}",
"Tool".red(),
tool_call.tool_name,
"not allowed in this context".red()
);
}
false
}
})
.collect();
if allowed_tool_calls.is_empty() {
return Ok((Vec::new(), 0));
}
execute_tools_with_context(
allowed_tool_calls,
context,
config,
operation_cancelled,
mode,
)
.await
}
pub async fn execute_tools_parallel(
current_tool_calls: Vec<crate::mcp::McpToolCall>,
chat_session: &mut ChatSession,
config: &Config,
tool_processor: &mut ToolProcessor,
operation_cancelled: tokio::sync::watch::Receiver<bool>,
mode: OutputMode,
) -> Result<(Vec<crate::mcp::McpToolResult>, u64)> {
let mut context = ToolExecutionContext::MainSession {
chat_session,
tool_processor,
};
let result = execute_tools_in_context(
current_tool_calls.clone(),
&mut context,
config,
Some(operation_cancelled),
mode,
)
.await;
result
}
async fn execute_tools_with_context(
current_tool_calls: Vec<crate::mcp::McpToolCall>,
context: &mut ToolExecutionContext<'_>,
config: &Config,
operation_cancelled: Option<tokio::sync::watch::Receiver<bool>>,
mode: OutputMode,
) -> Result<(Vec<crate::mcp::McpToolResult>, u64)> {
let mut tool_tasks = Vec::new();
let is_single_tool = current_tool_calls.len() == 1;
for (index, tool_call) in current_tool_calls.clone().iter().enumerate() {
context.increment_tool_calls();
let original_tool_id = tool_call.tool_id.clone();
let tool_name = tool_call.tool_name.clone();
let tool_index = index + 1;
let config_clone = config.clone();
let params_clone = tool_call.parameters.clone();
let _ = crate::session::logger::log_tool_call(
context.session_name(),
&tool_name,
&original_tool_id,
¶ms_clone,
);
let tool_id_for_task = original_tool_id.clone();
let tool_call_clone = tool_call.clone(); let cancel_token_for_task = operation_cancelled.clone();
let session_id = context.session_name().to_string();
let task = match context {
ToolExecutionContext::MainSession { .. } => {
tokio::spawn(async move {
crate::session::context::with_session_id(session_id, async move {
let mut call_with_id = tool_call_clone.clone();
call_with_id.tool_id = tool_id_for_task.clone();
crate::mcp::execute_tool_call(
&call_with_id,
&config_clone,
cancel_token_for_task,
)
.await
})
.await
})
}
ToolExecutionContext::Layer { layer_config, .. } => {
let layer_config_clone = (**layer_config).clone();
tokio::spawn(async move {
crate::session::context::with_session_id(session_id, async move {
let mut call_with_id = tool_call_clone.clone();
call_with_id.tool_id = tool_id_for_task.clone();
crate::mcp::execute_layer_tool_call(
&call_with_id,
&config_clone,
&layer_config_clone,
cancel_token_for_task, )
.await
})
.await
})
}
};
tool_tasks.push((tool_name, task, original_tool_id, tool_index));
}
let mut tool_results = Vec::new();
let mut _has_error = false;
let mut total_tool_time_ms = 0;
let task_info: Vec<(String, String, usize)> = tool_tasks
.iter()
.map(|(tool_name, _, tool_id, tool_index)| {
(tool_name.clone(), tool_id.clone(), *tool_index)
})
.collect();
let tasks: Vec<_> = tool_tasks.into_iter().map(|(_, task, _, _)| task).collect();
let all_tasks = futures::future::join_all(tasks);
tokio::pin!(all_tasks);
let task_results: Vec<_> = tokio::select! {
results = &mut all_tasks => results,
_ = async {
if let Some(ref cancel_rx) = operation_cancelled {
let mut cancel_rx_clone = cancel_rx.clone();
while !*cancel_rx_clone.borrow() {
if cancel_rx_clone.changed().await.is_err() {
break;
}
}
} else {
std::future::pending::<()>().await;
}
} => {
use colored::*;
if !mode.should_suppress_cli_output() {
println!(
"{}",
"🛑 All tool execution cancelled - returning to input".bright_yellow()
);
}
if !mode.should_suppress_cli_output() {
for (tool_name, _, _) in task_info {
println!(
"{}",
format!("🛑 Tool '{}' cancelled - server preserved", tool_name).bright_yellow()
);
}
}
return Ok((Vec::new(), total_tool_time_ms));
}
};
if !mode.should_suppress_cli_output() {
use crate::session::chat::get_animation_manager;
get_animation_manager().stop_current().await;
}
for ((tool_name, tool_id, tool_index), task_result) in task_info.into_iter().zip(task_results) {
let tool_call_info = current_tool_calls
.iter()
.find(|tc| tc.tool_id == tool_id)
.or_else(|| {
current_tool_calls
.iter()
.find(|tc| tc.tool_name == tool_name)
});
let stored_tool_call = tool_call_info.cloned();
match task_result {
Ok(result) => match result {
Ok((res, tool_time_ms)) => {
if res.is_error() {
_has_error = true;
if let Some(error_tracker) = context.error_tracker() {
let has_hit_threshold = error_tracker.record_error(&tool_name);
if has_hit_threshold {
crate::log_debug!("Tool '{}' has hit error threshold", tool_name);
}
}
let error_content = res.extract_content();
let error = anyhow::anyhow!("{}", error_content);
display_tool_error(
&stored_tool_call,
&tool_name,
&error,
tool_index,
config,
mode,
context.execution_context(),
)
.await;
tool_results.push(res.clone());
total_tool_time_ms += tool_time_ms;
} else {
if let Some(error_tracker) = context.error_tracker() {
error_tracker.record_success(&tool_name);
}
let display_params = ToolDisplayParams {
stored_tool_call: &stored_tool_call,
tool_name: &tool_name,
tool_id: &tool_id,
tool_index,
is_single_tool,
};
display_tool_success(
display_params,
&res,
tool_time_ms,
config,
mode,
context.session_name(),
context.execution_context(),
)
.await;
tool_results.push(res.clone());
total_tool_time_ms += tool_time_ms;
}
}
Err(e) => {
_has_error = true;
if e.to_string().contains("LARGE_OUTPUT_DECLINED_BY_USER") {
context.handle_declined_output(&tool_id);
continue;
}
display_tool_error(
&stored_tool_call,
&tool_name,
&e,
tool_index,
config,
mode,
context.execution_context(),
)
.await;
let loop_detected = if let Some(error_tracker) = context.error_tracker() {
error_tracker.record_error(&tool_name)
} else {
false
};
if loop_detected {
if let Some(error_tracker) = context.error_tracker() {
if !mode.should_suppress_cli_output() {
println!("{}", format!("⚠ Warning: {} failed {} times in a row - AI should try a different approach",
tool_name, error_tracker.max_consecutive_errors()).bright_yellow());
}
let loop_error_result = crate::mcp::McpToolResult::error(
tool_name.clone(),
tool_id.clone(),
format!("LOOP DETECTED: Tool '{}' failed {} consecutive times. Last error: {}. Please try a completely different approach or ask the user for guidance.", tool_name, error_tracker.max_consecutive_errors(), e),
);
tool_results.push(loop_error_result);
}
} else {
let error_result = if let Some(error_tracker) = context.error_tracker() {
crate::mcp::McpToolResult::error(
tool_name.clone(),
tool_id.clone(),
format!(
"Tool execution failed (attempt {}/{}): {}",
error_tracker.get_error_count(&tool_name),
error_tracker.max_consecutive_errors(),
e
),
)
} else {
crate::mcp::McpToolResult::error(
tool_name.clone(),
tool_id.clone(),
format!("Tool execution failed: {}", e),
)
};
tool_results.push(error_result);
if let Some(error_tracker) = context.error_tracker() {
log_info!(
"Tool '{}' failed {} of {} times. Adding error to context.",
tool_name,
error_tracker.get_error_count(&tool_name),
error_tracker.max_consecutive_errors()
);
}
}
}
},
Err(e) => {
_has_error = true;
if e.to_string().contains("LARGE_OUTPUT_DECLINED_BY_USER") {
context.handle_declined_output(&tool_id);
continue;
}
display_tool_error(
&stored_tool_call,
&tool_name,
&anyhow::anyhow!("{}", e),
tool_index,
config,
mode,
context.execution_context(),
)
.await;
if !mode.should_suppress_cli_output() {
println!("✗ Task error for '{}': {}", tool_name, e);
}
let error_result = crate::mcp::McpToolResult::error(
tool_name.clone(),
tool_id.clone(),
format!("Internal task error: {}", e),
);
tool_results.push(error_result);
}
}
}
let processed_results = handle_large_tool_results(tool_results, config, mode).await?;
Ok((processed_results, total_tool_time_ms))
}
async fn handle_large_tool_results(
results: Vec<crate::mcp::McpToolResult>,
config: &Config,
mode: OutputMode,
) -> Result<Vec<crate::mcp::McpToolResult>> {
use colored::Colorize;
use std::io::{stdin, stdout, Write};
let results: Vec<crate::mcp::McpToolResult> = results
.into_iter()
.map(|mut result| {
let content_str = result.extract_content();
let (truncated, was_truncated) = crate::utils::truncation::truncate_mcp_response_global(
&content_str,
config.mcp_response_tokens_threshold,
);
if was_truncated {
result.result =
rmcp::model::CallToolResult::success(vec![rmcp::model::Content::text(
truncated,
)]);
}
result
})
.collect();
let mut large_indices = Vec::new();
let mut total_tokens = 0;
for (index, result) in results.iter().enumerate() {
let estimated_tokens = crate::session::estimate_tokens(&result.extract_content());
if config.mcp_response_warning_threshold > 0
&& estimated_tokens > config.mcp_response_warning_threshold
{
large_indices.push((index, estimated_tokens));
total_tokens += estimated_tokens;
}
}
if large_indices.is_empty() {
return Ok(results);
}
if large_indices.len() == 1 {
let (index, _) = large_indices[0];
let result = &results[index];
let processed = crate::mcp::handle_large_response(result.clone(), config, mode).await?;
let mut new_results = results;
new_results[index] = processed;
return Ok(new_results);
}
if mode.should_suppress_cli_output() || !std::io::stdin().is_terminal() {
if !mode.should_suppress_cli_output() {
println!(
"{}",
format!(
"Large outputs from {} tools ({} total tokens) automatically declined in non-interactive mode.",
large_indices.len(), total_tokens
)
.bright_red()
);
}
let mut processed_results = results;
for (index, tokens) in large_indices {
processed_results[index] = crate::mcp::McpToolResult::error(
processed_results[index].tool_name.clone(),
processed_results[index].tool_id.clone(),
format!("Large output from tool '{}' ({} tokens) was automatically declined in non-interactive mode.", processed_results[index].tool_name, tokens)
);
}
return Ok(processed_results);
}
use crate::session::chat::get_animation_manager;
let animation_manager = get_animation_manager();
animation_manager.suspend().await;
println!(
"{}",
format!(
"⚠️ WARNING: {} tools produced large outputs (total: {} tokens)",
large_indices.len(),
total_tokens
)
.bright_yellow()
);
for (i, (index, tokens)) in large_indices.iter().enumerate() {
let result = &results[*index];
let server_name =
crate::session::chat::response::get_tool_server_name_async(&result.tool_name, config)
.await;
println!(
"{}",
format!(
"[{}] {} ({}) - {} tokens{}",
i + 1,
result.tool_name,
server_name,
tokens,
if !result.tool_id.is_empty() {
format!(" [ID: {}]", result.tool_id)
} else {
String::new()
}
)
.bright_yellow()
);
}
println!(
"{}",
"This may consume significant tokens and impact your usage limits.".bright_yellow()
);
print!("{}", "Do you want to continue? [y/N/1,2,3]: ".bright_cyan());
stdout().flush().unwrap();
let mut input = String::new();
stdin().read_line(&mut input).unwrap_or_default();
let input = input.trim();
animation_manager.resume();
let mut processed_results = results;
if input.is_empty() || input.to_lowercase().starts_with('n') {
println!("{}", "All large outputs declined by user.".bright_red());
for (index, tokens) in large_indices {
processed_results[index] = crate::mcp::McpToolResult::error(
processed_results[index].tool_name.clone(),
processed_results[index].tool_id.clone(),
format!(
"User declined large output from tool '{}' ({} tokens).",
processed_results[index].tool_name, tokens
),
);
}
} else if input.to_lowercase().starts_with('y') {
println!("{}", "Proceeding with all outputs...".bright_green());
} else {
let selected: std::collections::HashSet<usize> = input
.split(',')
.filter_map(|s| s.trim().parse::<usize>().ok())
.filter(|&i| i > 0 && i <= large_indices.len())
.collect();
if selected.is_empty() {
println!("{}", "Invalid selection. Declining all...".bright_red());
for (index, tokens) in large_indices {
processed_results[index] = crate::mcp::McpToolResult::error(
processed_results[index].tool_name.clone(),
processed_results[index].tool_id.clone(),
format!(
"User declined large output from tool '{}' ({} tokens).",
processed_results[index].tool_name, tokens
),
);
}
} else {
let kept_count = selected.len();
let declined_count = large_indices.len() - kept_count;
println!(
"{}",
format!(
"✓ Keeping {} tools, declining {} tools",
kept_count, declined_count
)
.bright_green()
);
for (i, (index, tokens)) in large_indices.iter().enumerate() {
if !selected.contains(&(i + 1)) {
processed_results[*index] = crate::mcp::McpToolResult::error(
processed_results[*index].tool_name.clone(),
processed_results[*index].tool_id.clone(),
format!(
"User selectively declined large output from tool '{}' ({} tokens).",
processed_results[*index].tool_name, tokens
),
);
}
}
}
}
Ok(processed_results)
}
struct ToolDisplayParams<'a> {
stored_tool_call: &'a Option<crate::mcp::McpToolCall>,
tool_name: &'a str,
tool_id: &'a str,
tool_index: usize,
is_single_tool: bool,
}
async fn display_tool_success(
params: ToolDisplayParams<'_>,
res: &crate::mcp::McpToolResult,
tool_time_ms: u64,
config: &Config,
mode: OutputMode,
session_name: &str,
execution_context: Option<String>, ) {
if !mode.should_suppress_cli_output() && (!params.is_single_tool || execution_context.is_some())
{
crate::session::chat::tool_display::display_individual_tool_header_with_context(
params.tool_name,
params.stored_tool_call,
config,
params.tool_index,
execution_context.as_deref(), )
.await;
}
if !mode.should_suppress_cli_output()
&& (config.get_log_level().is_info_enabled() || config.get_log_level().is_debug_enabled())
{
let content = res.extract_content();
if !content.trim().is_empty() {
if config.get_log_level().is_debug_enabled() {
println!("{}", content);
} else {
crate::session::chat::tool_display::display_tool_output_smart(&content);
}
}
}
if !mode.should_suppress_cli_output() {
let content = res.extract_content();
let token_count = crate::session::token_counter::estimate_tokens(&content);
let formatted_tokens = crate::session::chat::format_number(token_count as u64);
println!(
"✓ Tool '{}' completed in {}ms [{}]",
params.tool_name, tool_time_ms, formatted_tokens
);
println!("──────────────────");
}
let _ = crate::session::logger::log_tool_result(
session_name,
params.tool_id,
&serde_json::to_value(&res.result).unwrap_or_default(),
tool_time_ms,
);
}
async fn display_tool_error(
stored_tool_call: &Option<crate::mcp::McpToolCall>,
tool_name: &str,
error: &anyhow::Error,
tool_index: usize,
config: &Config,
mode: OutputMode,
execution_context: Option<String>,
) {
if mode.should_suppress_cli_output() {
return;
}
crate::session::chat::tool_display::display_individual_tool_header_with_context(
tool_name,
stored_tool_call,
config,
tool_index,
execution_context.as_deref(),
)
.await;
println!("✗ Tool '{}' failed: {}", tool_name, error);
}
fn handle_declined_in_session(tool_id: &str, chat_session: &mut ChatSession) {
println!("⚠ Tool output declined by user - removing tool call from conversation");
if let Some(last_msg) = chat_session.session.messages.last_mut() {
if last_msg.role == "assistant" {
if let Some(tool_calls_value) = &last_msg.tool_calls {
if let Ok(mut tool_calls_array) =
serde_json::from_value::<Vec<serde_json::Value>>(tool_calls_value.clone())
{
tool_calls_array
.retain(|tc| tc.get("id").and_then(|id| id.as_str()) != Some(tool_id));
if tool_calls_array.is_empty() {
last_msg.tool_calls = None;
log_debug!("Removed all tool calls from assistant message after user declined large output");
} else {
last_msg.tool_calls =
Some(serde_json::to_value(tool_calls_array).unwrap_or_default());
log_debug!(
"Removed declined tool call '{}' from assistant message",
tool_id
);
}
}
}
}
}
}
pub struct LayerToolExecutionParams {
pub tool_calls: Vec<crate::mcp::McpToolCall>,
pub session_name: String,
pub layer_config: crate::session::layers::LayerConfig,
pub layer_name: String,
pub operation_cancelled: Option<tokio::sync::watch::Receiver<bool>>,
pub mode: OutputMode,
}
pub async fn execute_layer_tool_calls_parallel(
config: &Config,
params: LayerToolExecutionParams,
) -> Result<(Vec<crate::mcp::McpToolResult>, u64)> {
let mut context = ToolExecutionContext::Layer {
session_name: params.session_name,
layer_config: Box::new(params.layer_config),
layer_name: params.layer_name,
};
execute_tools_in_context(
params.tool_calls,
&mut context,
config,
params.operation_cancelled,
params.mode,
)
.await
}