use crate::config::McpConnectionType;
use crate::log_debug;
use anyhow::Result;
use serde::{Deserialize, Serialize};
use serde_json::{json, Value};
use std::io::{IsTerminal, Write};
use std::sync::{Arc, RwLock};
/// Progress events reported to an optional callback while MCP servers are initialized.
#[derive(Debug, Clone)]
pub enum McpInitProgress {
/// Emitted once before initialization begins, listing the names of the servers about to be
/// initialized.
Starting { servers: Vec<String> },
/// Emitted after the initialization attempt for a single server has finished.
Completed {
/// Name of the server whose initialization attempt finished.
server: String,
/// `true` when the server's function list was retrieved successfully.
success: bool,
/// Number of functions the server reported; `0` when initialization failed.
function_count: usize,
},
}
pub mod hint_accumulator;
pub mod tool_map;
pub mod utils;
pub mod workdir;
pub use utils::{
ensure_tool_call_ids, extract_mcp_content, guess_tool_category, parse_tool_calls,
tool_results_to_messages, ToolResponseMessage,
};
pub use workdir::{
get_thread_original_working_directory, get_thread_working_directory,
set_session_working_directory, set_thread_working_directory,
};
// Process-wide cache of already-filtered function lists, keyed by a string derived from the
// server type and its allowed-tool patterns. It is populated by `get_filtered_server_functions`
// and emptied by `clear_function_cache`.
lazy_static::lazy_static! {
static ref INTERNAL_FUNCTION_CACHE: Arc<RwLock<std::collections::HashMap<String, Vec<McpFunction>>>> =
Arc::new(RwLock::new(std::collections::HashMap::new()));
}
pub mod oauth;
pub mod agent;
pub mod core;
pub mod health_monitor;
pub mod process;
pub mod server;
pub mod shared_utils;
/// A request to run a single MCP tool.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct McpToolCall {
/// Name of the tool to invoke; used to look up the server that owns the tool.
pub tool_name: String,
/// Arguments for the tool as an arbitrary JSON value.
pub parameters: Value,
/// Identifier copied onto the matching `McpToolResult`; deserializes to an empty string
/// when the field is absent.
#[serde(default)]
pub tool_id: String,
}
/// The outcome of running a single MCP tool.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct McpToolResult {
/// Name of the tool that produced this result.
pub tool_name: String,
/// JSON payload; the constructors in this module store a `content` array and an `isError`
/// flag here.
pub result: Value,
/// Identifier of the originating call; deserializes to an empty string when the field is
/// absent.
#[serde(default)]
pub tool_id: String,
}
impl McpToolResult {
pub fn success(tool_name: String, tool_id: String, content: String) -> Self {
Self {
tool_name,
tool_id,
result: json!({
"content": [
{
"type": "text",
"text": content
}
],
"isError": false
}),
}
}
pub fn success_with_metadata(
tool_name: String,
tool_id: String,
content: String,
metadata: serde_json::Value,
) -> Self {
Self {
tool_name,
tool_id,
result: json!({
"content": [
{
"type": "text",
"text": content
}
],
"isError": false,
"metadata": metadata
}),
}
}
pub fn error(tool_name: String, tool_id: String, error_message: String) -> Self {
Self {
tool_name,
tool_id,
result: json!({
"content": [
{
"type": "text",
"text": error_message
}
],
"isError": true
}),
}
}
pub fn is_error(&self) -> bool {
self.result
.get("isError")
.and_then(|v| v.as_bool())
.unwrap_or(false)
}
}
/// Description of a tool (function) exposed by an MCP server.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct McpFunction {
/// Tool name; this is the value matched against allowed-tool patterns and used as the key
/// of the tool-to-server map.
pub name: String,
/// Human-readable description of the tool.
pub description: String,
/// JSON value describing the tool's parameters (presumably a JSON schema; verify against
/// the servers that produce it).
pub parameters: Value,
}
/// Initializes the MCP servers listed in `config` without progress reporting.
///
/// Equivalent to calling [`initialize_servers_for_role_with_callback`] with no callback.
pub async fn initialize_servers_for_role(config: &crate::config::Config) -> Result<()> {
initialize_servers_for_role_with_callback(config, None).await
}
pub async fn initialize_servers_for_role_with_callback(
config: &crate::config::Config,
progress_callback: Option<&(dyn Fn(McpInitProgress) + Send + Sync)>,
) -> Result<()> {
if config.mcp.servers.is_empty() {
crate::log_debug!("No MCP servers enabled for this role");
return Ok(());
}
crate::log_debug!(
"Initializing {} MCP servers for role",
config.mcp.servers.len()
);
let external_servers: Vec<_> = config
.mcp
.servers
.iter()
.filter(|server| {
matches!(
server.connection_type(),
McpConnectionType::Http | McpConnectionType::Stdin
)
})
.filter(|server| !server::is_server_already_running_with_config(server))
.collect();
for server in config.mcp.servers.iter().filter(|s| {
!matches!(
s.connection_type(),
McpConnectionType::Http | McpConnectionType::Stdin
)
}) {
crate::log_debug!(
"Skipping initialization for internal server: {} ({:?})",
server.name(),
server.connection_type()
);
}
if let Some(callback) = &progress_callback {
callback(McpInitProgress::Starting {
servers: external_servers
.iter()
.map(|s| s.name().to_string())
.collect(),
});
}
let init_futures: Vec<_> = external_servers
.into_iter()
.map(|server| {
let name = server.name().to_string();
let callback = progress_callback;
async move {
crate::log_debug!("Initializing external server: {}", name);
let result = server::get_server_functions(server).await;
let (success, function_count) = match &result {
Ok(functions) => (true, functions.len()),
Err(_) => (false, 0),
};
if let Some(cb) = callback {
cb(McpInitProgress::Completed {
server: name.clone(),
success,
function_count,
});
}
(name, result)
}
})
.collect();
let results = futures::future::join_all(init_futures).await;
for (server_name, result) in results {
match result {
Ok(functions) => {
crate::log_debug!(
"Successfully initialized server '{}' with {} functions",
server_name,
functions.len()
);
for func in &functions {
crate::log_debug!(" - Available: {}", func.name);
}
}
Err(e) => {
crate::log_debug!(
"Failed to initialize server '{}': {} (will retry on first use)",
server_name,
e
);
}
}
}
let config_arc = std::sync::Arc::new(config.clone());
if let Err(e) = health_monitor::start_health_monitor(config_arc).await {
crate::log_debug!("Failed to start health monitor: {}", e);
}
crate::log_debug!("MCP server initialization completed");
Ok(())
}
/// Initializes MCP for `role` without progress reporting.
///
/// Equivalent to calling [`initialize_mcp_for_role_with_callback`] with no callback.
pub async fn initialize_mcp_for_role(role: &str, config: &crate::config::Config) -> Result<()> {
initialize_mcp_for_role_with_callback(role, config, None).await
}
/// Prepares MCP for `role`: merges the role-specific configuration, initializes the session
/// context, initializes the servers and then builds the tool map.
///
/// Failures of server initialization or tool-map construction are logged and otherwise
/// ignored, so this function always returns `Ok(())`.
pub async fn initialize_mcp_for_role_with_callback(
    role: &str,
    config: &crate::config::Config,
    progress_callback: Option<&(dyn Fn(McpInitProgress) + Send + Sync)>,
) -> Result<()> {
    let role_config = config.get_merged_config_for_role(role);
    process::init_session_context(role);

    let servers_outcome =
        initialize_servers_for_role_with_callback(&role_config, progress_callback).await;
    if let Err(e) = servers_outcome {
        crate::log_debug!("Warning: Failed to initialize MCP servers: {}", e);
    }

    let tool_map_outcome = tool_map::initialize_tool_map(&role_config).await;
    if let Err(e) = tool_map_outcome {
        crate::log_debug!("Warning: Failed to initialize tool map: {}", e);
    }

    Ok(())
}
/// Returns the functions that `server` exposes, restricted to its allowed-tool patterns.
///
/// External (HTTP/stdin) servers are answered from `server::get_server_functions_cached`; a
/// lookup failure is logged and yields an empty list. Builtin servers are dispatched by name
/// (`core` or `agent`); an unknown builtin name is logged and yields an empty list.
async fn server_functions_for(
    server: &crate::config::McpServerConfig,
    config: &crate::config::Config,
) -> Vec<McpFunction> {
    let builtin_name = match server.connection_type() {
        McpConnectionType::Builtin => server.name(),
        McpConnectionType::Http | McpConnectionType::Stdin => {
            return server::get_server_functions_cached(server)
                .await
                .map(|fns| filter_tools_by_patterns(fns, server.tools()))
                .unwrap_or_else(|e| {
                    crate::log_error!(
                        "Failed to get cached functions from external server '{}': {} (will be available when server starts)",
                        server.name(),
                        e
                    );
                    Vec::new()
                });
        }
    };

    match builtin_name {
        "core" => get_filtered_server_functions("core", server.tools(), core::get_all_functions),
        "agent" => filter_tools_by_patterns(agent::get_all_functions(config), server.tools()),
        other => {
            crate::log_debug!("Unknown builtin server: {}", other);
            Vec::new()
        }
    }
}
/// Collects every function currently available through the servers in `config`.
///
/// Returns an empty list when no servers are configured. Otherwise the per-server lists are
/// concatenated in configuration order, followed by the functions reported by
/// `core::dynamic` and `core::dynamic_agents`.
pub async fn get_available_functions(config: &crate::config::Config) -> Vec<McpFunction> {
    let servers = &config.mcp.servers;
    if servers.is_empty() {
        crate::log_debug!("MCP has no servers configured, no functions available");
        return Vec::new();
    }

    let mut collected = Vec::new();
    for server in servers {
        let mut from_server = server_functions_for(server, config).await;
        collected.append(&mut from_server);
    }
    collected.extend(crate::mcp::core::dynamic::get_all_functions());
    collected.extend(crate::mcp::core::dynamic_agents::get_all_functions());
    collected
}
/// Keeps only the tools whose names match one of `allowed_tools`.
///
/// An empty pattern list means "no restriction": the input is returned untouched. The
/// relative order of the surviving tools is preserved.
fn filter_tools_by_patterns(
    mut tools: Vec<McpFunction>,
    allowed_tools: &[String],
) -> Vec<McpFunction> {
    if !allowed_tools.is_empty() {
        tools.retain(|func| is_tool_allowed_by_patterns(&func.name, allowed_tools));
    }
    tools
}
/// Reports whether `tool_name` is permitted by the `allowed_tools` patterns.
///
/// * An empty pattern list permits every tool.
/// * A pattern ending in `*` is a prefix pattern: everything before the final `*` must be a
///   prefix of `tool_name` (so a bare `*` permits every tool).
/// * Any other pattern must equal `tool_name` exactly; a `*` that is not the last character
///   has no special meaning.
pub fn is_tool_allowed_by_patterns(tool_name: &str, allowed_tools: &[String]) -> bool {
    if allowed_tools.is_empty() {
        return true;
    }
    allowed_tools
        .iter()
        .any(|pattern| match pattern.strip_suffix('*') {
            Some(prefix) => tool_name.starts_with(prefix),
            None => tool_name == pattern.as_str(),
        })
}
pub fn get_filtered_server_functions<F>(
server_type: &str,
allowed_tools: &[String],
get_functions: F,
) -> Vec<McpFunction>
where
F: FnOnce() -> Vec<McpFunction>,
{
let cache_key = if allowed_tools.is_empty() {
format!("{}_all", server_type)
} else {
format!("{}_{}", server_type, allowed_tools.join(","))
};
{
let cache = INTERNAL_FUNCTION_CACHE.read().unwrap();
if let Some(cached_functions) = cache.get(&cache_key) {
return cached_functions.clone();
}
}
crate::log_debug!("Computing and caching {} functions", server_type);
let all_functions = get_functions();
let filtered_functions = if allowed_tools.is_empty() {
all_functions
} else {
all_functions
.into_iter()
.filter(|func| is_tool_allowed_by_patterns(&func.name, allowed_tools))
.collect()
};
{
let mut cache = INTERNAL_FUNCTION_CACHE.write().unwrap();
cache.insert(cache_key, filtered_functions.clone());
}
filtered_functions
}
/// Empties the internal function cache, logging how many entries were dropped when there
/// were any.
///
/// # Panics
///
/// Panics if the cache lock has been poisoned by a panic in another thread.
pub fn clear_function_cache() {
    let mut guard = INTERNAL_FUNCTION_CACHE.write().unwrap();
    let entries_before = guard.len();
    guard.clear();
    if entries_before == 0 {
        return;
    }
    crate::log_debug!(
        "Cleared internal function cache for {} entries",
        entries_before
    );
}
/// Executes a single tool call and reports how long it took.
///
/// Returns the tool result together with the elapsed wall-clock time in milliseconds.
///
/// # Errors
///
/// Fails when no MCP servers are configured, when `cancellation_token` already signals
/// cancellation, or when routing/executing the tool fails.
pub async fn execute_tool_call(
    call: &McpToolCall,
    config: &crate::config::Config,
    cancellation_token: Option<tokio::sync::watch::Receiver<bool>>,
) -> Result<(McpToolResult, u64)> {
    log_debug!("Debug: Executing tool call: {}", call.tool_name);
    log_debug!(
        "Debug: MCP has {} servers configured",
        config.mcp.servers.len()
    );
    if let Ok(params) = serde_json::to_string_pretty(&call.parameters) {
        log_debug!("Debug: Tool parameters: {}", params);
    }

    if config.mcp.servers.is_empty() {
        return Err(anyhow::anyhow!("MCP has no servers configured"));
    }
    if let Some(token) = &cancellation_token {
        if *token.borrow() {
            return Err(anyhow::anyhow!("Tool execution cancelled"));
        }
    }

    let tool_start = std::time::Instant::now();
    let tool_result = try_execute_tool_call(call, config, cancellation_token).await?;
    // `as_millis` yields a u128; saturate instead of silently truncating with `as`.
    let tool_time_ms = u64::try_from(tool_start.elapsed().as_millis()).unwrap_or(u64::MAX);
    Ok((tool_result, tool_time_ms))
}
/// Builds a map from tool name to the configuration of the server that provides it.
///
/// Servers are visited in configuration order, and the first server that offers a given tool
/// name keeps it; later servers offering the same name are ignored for that tool.
pub async fn build_tool_server_map(
    config: &crate::config::Config,
) -> std::collections::HashMap<String, crate::config::McpServerConfig> {
    let mut owners = std::collections::HashMap::new();
    for server in &config.mcp.servers {
        for function in server_functions_for(server, config).await {
            // Clone the server config only when this tool name has no owner yet.
            owners
                .entry(function.name)
                .or_insert_with(|| server.clone());
        }
    }
    crate::log_debug!("Built tool-to-server map with {} tools", owners.len());
    owners
}
/// Runs the tool call, racing it against the optional cancellation signal.
///
/// Without a token the tool future is simply awaited. With a token, cancellation is checked
/// first on every poll (`biased` select), so a raised flag wins over a tool result that
/// becomes ready at the same time.
///
/// # Errors
///
/// Fails when no MCP servers are configured, when cancellation is signalled before or during
/// execution, or when the tool execution itself fails.
async fn try_execute_tool_call(
    call: &McpToolCall,
    config: &crate::config::Config,
    cancellation_token: Option<tokio::sync::watch::Receiver<bool>>,
) -> Result<McpToolResult> {
    if config.mcp.servers.is_empty() {
        return Err(anyhow::anyhow!("MCP has no servers configured"));
    }

    let tool_execution_future =
        execute_tool_without_cancellation(call, config, cancellation_token.clone());

    let Some(mut cancel_receiver) = cancellation_token else {
        return tool_execution_future.await;
    };
    if *cancel_receiver.borrow() {
        return Err(anyhow::anyhow!("Tool execution cancelled"));
    }

    let cancellation_future = async move {
        loop {
            if *cancel_receiver.borrow() {
                return;
            }
            if cancel_receiver.changed().await.is_err() {
                // The sender was dropped, so `changed()` now fails immediately on every call.
                // Looping again would spin without yielding and starve the tool future, so
                // honour a final `true` value and otherwise never resolve.
                if !*cancel_receiver.borrow() {
                    std::future::pending::<()>().await;
                }
                return;
            }
        }
    };

    tokio::select! {
        biased;
        _ = cancellation_future => {
            Err(anyhow::anyhow!("Tool execution cancelled during execution"))
        }
        result = tool_execution_future => {
            result
        }
    }
}
async fn route_builtin_tool(
call: &McpToolCall,
server_name: &str,
config: &crate::config::Config,
cancellation_token: Option<tokio::sync::watch::Receiver<bool>>,
) -> Result<McpToolResult> {
match server_name {
"core" => {
crate::log_debug!("Executing '{}' via core builtin server", call.tool_name);
let result = match call.tool_name.as_str() {
"plan" => core::execute_plan(call)
.await
.map_err(|e| format!("Plan execution failed: {}", e)),
"mcp" => core::execute_mcp_command(call, config)
.await
.map_err(|e| format!("MCP management failed: {}", e)),
"agent" => core::execute_agent_tool_command(call)
.await
.map_err(|e| format!("Agent management failed: {}", e)),
"schedule" => core::execute_schedule_tool(call)
.await
.map_err(|e| format!("Schedule execution failed: {}", e)),
"skill" => core::execute_skill_tool(call)
.await
.map_err(|e| format!("Skill tool failed: {}", e)),
other => {
return Err(anyhow::anyhow!(
"Tool '{}' not implemented in core server",
other
))
}
};
match result {
Ok(mut r) => {
r.tool_id = call.tool_id.clone();
Ok(r)
}
Err(msg) => Ok(McpToolResult::error(
call.tool_name.clone(),
call.tool_id.clone(),
msg,
)),
}
}
"agent" => {
if !call.tool_name.starts_with("agent_") {
return Err(anyhow::anyhow!(
"Tool '{}' not implemented in agent server",
call.tool_name
));
}
crate::log_debug!(
"Executing agent tool '{}' via agent builtin server",
call.tool_name
);
let mut result = agent::execute_agent_command(call, config, cancellation_token).await?;
result.tool_id = call.tool_id.clone();
Ok(result)
}
other => Err(anyhow::anyhow!("Unknown builtin server: {}", other)),
}
}
/// Looks up the server that owns `call.tool_name` in the tool map and executes the call
/// there.
///
/// Builtin servers are handled by `route_builtin_tool` (which receives the cancellation
/// token); external servers are called through `server::execute_tool_call`. The returned
/// result carries the `tool_id` of `call`.
///
/// # Errors
///
/// Fails when the tool is not present in the tool map (the message lists the known tool
/// names) or when the selected server fails to execute the call.
async fn execute_tool_without_cancellation(
    call: &McpToolCall,
    config: &crate::config::Config,
    cancellation_token: Option<tokio::sync::watch::Receiver<bool>>,
) -> Result<McpToolResult> {
    let Some(target_server) = tool_map::get_server_for_tool(&call.tool_name) else {
        let known_tools = tool_map::get_all_tool_names();
        let listing = if known_tools.is_empty() {
            "none (tool map not initialized)".to_string()
        } else {
            known_tools.join(", ")
        };
        return Err(anyhow::anyhow!(
            "Tool '{}' not found in any configured MCP server. Available tools: {}",
            call.tool_name,
            listing
        ));
    };

    crate::log_debug!(
        "Routing tool '{}' to server '{}' ({:?})",
        call.tool_name,
        target_server.name(),
        target_server.connection_type()
    );

    let connection = target_server.connection_type();
    match connection {
        McpConnectionType::Builtin => {
            route_builtin_tool(call, target_server.name(), config, cancellation_token).await
        }
        McpConnectionType::Http | McpConnectionType::Stdin => {
            let mut remote_result = server::execute_tool_call(call, &target_server, None).await?;
            remote_result.tool_id = call.tool_id.clone();
            Ok(remote_result)
        }
    }
}
/// Guards the conversation against unexpectedly large tool outputs.
///
/// The serialized result is measured with `crate::session::estimate_tokens`. When the
/// configured warning threshold is positive and the estimate exceeds it:
///
/// * a warning is printed unless `mode` suppresses CLI output;
/// * if CLI output is suppressed, `mode` is not interactive, or stdin is not a terminal, the
///   output is declined automatically and replaced by an error result;
/// * otherwise the user is asked for confirmation. Any answer not starting with `y`/`Y`,
///   including a failed read from stdin, declines the output.
///
/// Results at or below the threshold, and accepted large results, are returned unchanged.
/// This function currently never returns `Err`.
pub async fn handle_large_response(
    result: McpToolResult,
    config: &crate::config::Config,
    mode: crate::session::output::OutputMode,
) -> Result<McpToolResult> {
    use crate::session::chat::get_animation_manager;
    use colored::Colorize;

    let estimated_tokens = crate::session::estimate_tokens(&result.result.to_string());
    let threshold = config.mcp_response_warning_threshold;
    let is_large = threshold > 0 && estimated_tokens > threshold;
    if !is_large {
        return Ok(result);
    }

    let suppress_cli_output = mode.should_suppress_cli_output();
    let non_interactive = !mode.is_interactive() || !std::io::stdin().is_terminal();
    let server_name =
        crate::session::chat::response::get_tool_server_name_async(&result.tool_name, config)
            .await;

    if !suppress_cli_output {
        let id_suffix = if result.tool_id.is_empty() {
            String::new()
        } else {
            format!(" [ID: {}]", result.tool_id)
        };
        println!(
            "{}",
            format!(
                "! WARNING: Tool '{}' ({}){} produced a large output ({} tokens)",
                result.tool_name, server_name, id_suffix, estimated_tokens
            )
            .bright_yellow()
        );
        println!(
            "{}",
            "This may consume significant tokens and impact your usage limits.".bright_yellow()
        );
    }

    if suppress_cli_output || non_interactive {
        if !suppress_cli_output {
            println!(
                "{}",
                format!(
                    "Large output from '{}' ({}) automatically declined in non-interactive mode. Continuing...",
                    result.tool_name, server_name
                )
                .bright_red()
            );
        }
        let message = format!("Large output from tool '{}' ({} tokens) was automatically declined in non-interactive mode to avoid excessive token usage. The tool executed successfully but the output was too large.", result.tool_name, estimated_tokens);
        return Ok(McpToolResult::error(
            result.tool_name,
            result.tool_id,
            message,
        ));
    }

    let animation_manager = get_animation_manager();
    animation_manager.suspend().await;
    print!(
        "{}",
        "Do you want to continue with this large output? [y/N]: ".bright_cyan()
    );
    // Best effort: a failed flush must not panic and leave the animation suspended.
    let _ = std::io::stdout().flush();
    let mut input = String::new();
    // NOTE(review): this is a blocking read inside an async fn; left as is to keep the
    // prompt flow unchanged.
    if std::io::stdin().read_line(&mut input).is_err() {
        // A failed read counts as "no answer", which declines the output below.
        input.clear();
    }
    animation_manager.resume();

    if !input.trim().to_lowercase().starts_with('y') {
        println!(
            "{}",
            format!(
                "Large output from '{}' ({}) declined by user. Continuing conversation...",
                result.tool_name, server_name
            )
            .bright_red()
        );
        let message = format!("User declined to process large output from tool '{}' ({} tokens). The tool executed successfully but the output was too large and the user chose not to include it in the conversation to avoid excessive token usage.", result.tool_name, estimated_tokens);
        return Ok(McpToolResult::error(
            result.tool_name,
            result.tool_id,
            message,
        ));
    }

    println!("{}", "Proceeding with full output...".bright_green());
    Ok(result)
}
/// Executes a tool call on behalf of a layer after checking the layer's MCP permissions.
///
/// The owning server name is looked up in the tool map (falling back to `"unknown"`) and
/// passed with the tool name to the layer's `is_tool_allowed` check before the call is
/// forwarded to [`execute_tool_call`].
///
/// # Errors
///
/// Fails when the layer references no servers, when the tool is not allowed for the layer,
/// or when the underlying execution fails.
pub async fn execute_layer_tool_call(
    call: &McpToolCall,
    config: &crate::config::Config,
    layer_config: &crate::session::layers::LayerConfig,
    cancellation_token: Option<tokio::sync::watch::Receiver<bool>>,
) -> Result<(McpToolResult, u64)> {
    let layer_mcp = &layer_config.mcp;
    if layer_mcp.server_refs.is_empty() {
        return Err(anyhow::anyhow!("Tool execution is disabled for this layer"));
    }

    let server_name = match crate::mcp::tool_map::get_tool_server_name(&call.tool_name) {
        Some(name) => name,
        None => "unknown".to_string(),
    };

    if layer_mcp.is_tool_allowed(&call.tool_name, &server_name) {
        return execute_tool_call(call, config, cancellation_token).await;
    }

    Err(anyhow::anyhow!(
        "Tool '{}' is not allowed for this layer",
        call.tool_name
    ))
}
/// Executes `calls` one after another, without a cancellation token, and returns one outcome
/// per call in the same order.
///
/// A failing call does not stop the remaining calls from running.
pub async fn execute_tool_calls(
    calls: &[McpToolCall],
    config: &crate::config::Config,
) -> Vec<Result<(McpToolResult, u64)>> {
    let mut outcomes = Vec::with_capacity(calls.len());
    for call in calls.iter() {
        outcomes.push(execute_tool_call(call, config, None).await);
    }
    outcomes
}
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Wraps a hand-written payload in a result with the fixed test name and id.
    fn result_with_payload(payload: serde_json::Value) -> McpToolResult {
        McpToolResult {
            tool_name: "test_tool".to_string(),
            tool_id: "test_id".to_string(),
            result: payload,
        }
    }

    /// `is_error` must follow the `isError` flag and treat a missing flag as success.
    #[test]
    fn test_mcp_tool_result_is_error() {
        // Results built through the constructors.
        let from_success = McpToolResult::success(
            "test_tool".to_string(),
            "test_id".to_string(),
            "Success message".to_string(),
        );
        assert!(
            !from_success.is_error(),
            "Success result should not be an error"
        );

        let from_error = McpToolResult::error(
            "test_tool".to_string(),
            "test_id".to_string(),
            "Error message".to_string(),
        );
        assert!(from_error.is_error(), "Error result should be an error");

        // Results with hand-written payloads.
        let without_flag = result_with_payload(json!({
            "content": [{"type": "text", "text": "No isError field"}]
        }));
        assert!(
            !without_flag.is_error(),
            "Result without isError field should default to false"
        );

        let flag_false = result_with_payload(json!({
            "content": [{"type": "text", "text": "Explicit false"}],
            "isError": false
        }));
        assert!(
            !flag_false.is_error(),
            "Result with isError: false should not be an error"
        );

        let flag_true = result_with_payload(json!({
            "content": [{"type": "text", "text": "Explicit true"}],
            "isError": true
        }));
        assert!(
            flag_true.is_error(),
            "Result with isError: true should be an error"
        );
    }
}