use super::process;
use super::{McpFunction, McpToolCall, McpToolResult};
use crate::config::{Config, McpConnectionType, McpServerConfig};
use crate::mcp::oauth::{self, token_store};
use anyhow::{anyhow, Result};
use reqwest::header::{HeaderMap, HeaderValue, ACCEPT, AUTHORIZATION, CONTENT_TYPE};
use reqwest::Client;
use serde_json::{json, Value};
use std::collections::HashMap;
use std::sync::{Arc, RwLock};
lazy_static::lazy_static! {
static ref FUNCTION_CACHE: Arc<RwLock<HashMap<String, Vec<McpFunction>>>> =
Arc::new(RwLock::new(HashMap::new()));
static ref HTTP_SESSION_IDS: Arc<RwLock<HashMap<String, String>>> =
Arc::new(RwLock::new(HashMap::new()));
}
pub fn create_tools_list_request() -> Value {
create_tools_list_request_with_cursor(None)
}
pub fn create_tools_list_request_with_cursor(cursor: Option<&str>) -> Value {
let mut params = json!({});
if let Some(c) = cursor {
params["cursor"] = json!(c);
}
json!({
"jsonrpc": "2.0",
"id": 1,
"method": "tools/list",
"params": params
})
}
fn parse_sse_response(body: &str) -> Option<Value> {
for line in body.lines() {
if line.starts_with("data:") {
let json_data = line.trim_start_matches("data:").trim();
if let Ok(value) = serde_json::from_str(json_data) {
return Some(value);
}
}
}
None
}
fn is_sse_response(response: &reqwest::Response) -> bool {
response
.headers()
.get(CONTENT_TYPE)
.and_then(|v| v.to_str().ok())
.map(|ct| ct.contains("text/event-stream"))
.unwrap_or(false)
}
async fn parse_http_response_body(response: reqwest::Response) -> Result<Value> {
if is_sse_response(&response) {
let body = response.text().await?;
crate::log_debug!(
"SSE response body (first 500 chars): {}",
body.chars().take(500).collect::<String>()
);
if let Some(json_value) = parse_sse_response(&body) {
return Ok(json_value);
}
return Err(anyhow!(
"Failed to parse SSE response - no valid JSON data found"
));
}
let jsonrpc_response: Value = response.json().await?;
Ok(jsonrpc_response)
}
pub fn create_initialize_request() -> Value {
json!({
"jsonrpc": "2.0",
"id": 1,
"method": "initialize",
"params": {
"protocolVersion": "2025-03-26",
"capabilities": {},
"clientInfo": {
"name": "octomind-health-check",
"version": env!("CARGO_PKG_VERSION")
}
}
})
}
fn create_session_initialize_request() -> Value {
let (role, spec, project, session_id, workdir) = process::get_session_context();
let session_obj = serde_json::json!({
"role": role,
"spec": spec,
"project": project,
"session_id": session_id,
"workdir": workdir,
});
json!({
"jsonrpc": "2.0",
"id": 1,
"method": "initialize",
"params": {
"protocolVersion": "2025-03-26",
"clientInfo": {
"name": "octomind",
"version": env!("CARGO_PKG_VERSION")
},
"capabilities": {
"experimental": {
"session": session_obj
}
}
}
})
}
fn create_tools_call_request(tool_name: &str, parameters: &Value) -> Value {
json!({
"jsonrpc": "2.0",
"id": 1,
"method": "tools/call",
"params": {
"name": tool_name,
"arguments": parameters
}
})
}
pub fn add_session_id_header(headers: &mut HeaderMap, server_name: &str) {
if let Ok(ids) = HTTP_SESSION_IDS.read() {
if let Some(sid) = ids.get(server_name) {
if let Ok(val) = HeaderValue::from_str(sid) {
headers.insert("mcp-session-id", val);
}
}
}
}
pub fn clear_http_session_id(server_name: &str) {
if let Ok(mut ids) = HTTP_SESSION_IDS.write() {
ids.remove(server_name);
}
}
pub fn parse_tools_from_list_result(
list_result: &rmcp::model::ListToolsResult,
server: &McpServerConfig,
) -> Vec<McpFunction> {
let mut functions = Vec::new();
for tool in &list_result.tools {
let name = tool.name.as_ref();
if server.tools().is_empty()
|| crate::mcp::is_tool_allowed_by_patterns(name, server.tools())
{
let description = tool.description.as_deref().unwrap_or("").to_string();
let parameters = tool.schema_as_json_value();
functions.push(McpFunction {
name: name.to_string(),
description,
parameters,
});
}
}
functions
}
fn parse_tools_from_jsonrpc_response(
response: &Value,
server: &McpServerConfig,
) -> Result<(Vec<McpFunction>, Option<String>)> {
if let Some(error) = response.get("error") {
return Err(anyhow::anyhow!("JSON-RPC error from MCP server: {}", error));
}
if let Some(result_value) = response.get("result").cloned() {
let list_result = serde_json::from_value::<rmcp::model::ListToolsResult>(result_value)
.map_err(|e| anyhow::anyhow!("Failed to deserialize ListToolsResult: {}", e))?;
let next_cursor = list_result.next_cursor.clone();
let functions = parse_tools_from_list_result(&list_result, server);
Ok((functions, next_cursor))
} else {
Err(anyhow::anyhow!(
"Invalid JSON-RPC response: missing 'result' field"
))
}
}
pub async fn get_server_functions(server: &McpServerConfig) -> Result<Vec<McpFunction>> {
match server.connection_type() {
McpConnectionType::Http => {
let server_url = get_server_base_url(server).await?;
let client = Client::new();
let mut headers = HeaderMap::new();
headers.insert(
ACCEPT,
HeaderValue::from_static("application/json, text/event-stream"),
);
headers.insert(CONTENT_TYPE, HeaderValue::from_static("application/json"));
match oauth::discover_oauth_from_mcp_server(&server_url, server.name()).await {
Ok(discovered_oauth) => {
crate::log_debug!(
"MCP Authorization discovery succeeded for server '{}', attempting OAuth flow",
server.name()
);
match oauth::get_access_token(&discovered_oauth, server.name(), false).await {
Ok(Some(token)) => {
headers.insert(
AUTHORIZATION,
HeaderValue::from_str(&format!("Bearer {}", token))?,
);
crate::log_debug!(
"Using discovered OAuth access token for HTTP server '{}'",
server.name()
);
}
Ok(None) => {
crate::log_error!(
"OAuth authentication was cancelled for server '{}'",
server.name()
);
}
Err(e) => {
crate::log_error!(
"Failed to get OAuth access token for server '{}': {}",
server.name(),
e
);
}
}
}
Err(e) => {
crate::log_debug!(
"MCP Authorization discovery failed for server '{}': {}",
server.name(),
e
);
}
}
let schema_url = server_url;
let init_request = create_session_initialize_request();
let init_response = client
.post(&schema_url)
.headers(headers.clone())
.json(&init_request)
.send()
.await?;
if !init_response.status().is_success() {
return Err(anyhow::anyhow!(
"Failed to initialize MCP server: {}",
init_response.status()
));
}
if let Some(session_id) = init_response.headers().get("mcp-session-id") {
if let Ok(sid) = session_id.to_str() {
crate::log_debug!(
"HTTP server '{}' assigned session ID: {}",
server.name(),
sid
);
if let Ok(mut ids) = HTTP_SESSION_IDS.write() {
ids.insert(server.name().to_string(), sid.to_string());
}
}
}
let init_result = parse_http_response_body(init_response).await?;
if init_result.get("error").is_some() {
return Err(anyhow::anyhow!(
"Server returned error during initialization: {}",
init_result["error"]
));
}
if let Some(result_value) = init_result.get("result").cloned() {
match serde_json::from_value::<rmcp::model::InitializeResult>(result_value) {
Ok(init_info) => {
crate::log_debug!(
"HTTP server '{}': {} v{}, protocol {}",
server.name(),
init_info.server_info.name,
init_info.server_info.version,
init_info.protocol_version
);
if let Some(ref instructions) = init_info.instructions {
crate::log_debug!(
"Server '{}' instructions: {}",
server.name(),
instructions
);
}
process::store_server_capabilities(server.name(), init_info);
}
Err(e) => {
crate::log_debug!(
"Failed to parse InitializeResult for '{}': {}",
server.name(),
e
);
}
}
}
let initialized_notification = json!({
"jsonrpc": "2.0",
"method": "notifications/initialized"
});
let mut notif_headers = headers.clone();
add_session_id_header(&mut notif_headers, server.name());
let _ = client
.post(&schema_url)
.headers(notif_headers)
.json(&initialized_notification)
.send()
.await;
let mut all_functions = Vec::new();
let mut cursor: Option<String> = None;
const MAX_PAGES: usize = 20;
for page in 0..MAX_PAGES {
let jsonrpc_request = create_tools_list_request_with_cursor(cursor.as_deref());
crate::log_debug!(
"Making JSON-RPC tools/list request to HTTP server '{}' (page {}, cursor: {:?})",
server.name(),
page + 1,
cursor
);
let mut list_headers = headers.clone();
add_session_id_header(&mut list_headers, server.name());
let response = client
.post(&schema_url)
.headers(list_headers)
.json(&jsonrpc_request)
.send()
.await?;
if !response.status().is_success() {
return Err(anyhow::anyhow!(
"Failed to get schema from MCP server: {}",
response.status()
));
}
let jsonrpc_response = parse_http_response_body(response).await?;
crate::log_debug!(
"JSON-RPC response from server '{}': {}",
server.name(),
serde_json::to_string_pretty(&jsonrpc_response)
.unwrap_or_else(|_| jsonrpc_response.to_string())
);
let (functions, next_cursor) =
parse_tools_from_jsonrpc_response(&jsonrpc_response, server)?;
all_functions.extend(functions);
match next_cursor {
Some(c) if !c.is_empty() => cursor = Some(c),
_ => break,
}
}
Ok(all_functions)
}
McpConnectionType::Stdin => {
process::ensure_server_running(server).await?;
process::get_stdin_server_functions(server).await
}
McpConnectionType::Builtin => {
Err(anyhow::anyhow!(
"Built-in servers should not use get_server_functions"
))
}
}
}
pub async fn get_server_functions_cached(server: &McpServerConfig) -> Result<Vec<McpFunction>> {
let server_id = server.name();
{
let cache = FUNCTION_CACHE.read().unwrap();
if let Some(cached_functions) = cache.get(server_id) {
return Ok(cached_functions.clone());
}
}
let should_fetch = match server.connection_type() {
McpConnectionType::Http => server.url().is_some(), McpConnectionType::Stdin => is_server_running_for_cache_check(server),
McpConnectionType::Builtin => false, };
if should_fetch {
if crate::mcp::oauth::discovery::has_cached_discovery(server_id) {
match token_store::get_valid_token(server_id, 300).await {
Ok(None) => {
crate::log_debug!(
"Server '{}' requires OAuth but no token available - skipping cache fetch",
server_id
);
return Ok(Vec::new());
}
Err(e) => {
crate::log_debug!(
"Failed to check OAuth token for server '{}': {} - skipping cache fetch",
server_id,
e
);
return Ok(Vec::new());
}
Ok(Some(_)) => {
}
}
}
crate::log_debug!("Fetching function definitions from server '{}'", server_id);
match get_server_functions(server).await {
Ok(functions) => {
{
let mut cache = FUNCTION_CACHE.write().unwrap();
cache.insert(server_id.to_string(), functions.clone());
}
crate::log_debug!("Server '{}' returned {} tools", server_id, functions.len());
Ok(functions)
}
Err(e) => {
crate::log_error!(
"Failed to connect to HTTP server '{}': {}. Verify the server is running at the configured URL.",
server_id,
e
);
{
let mut cache = FUNCTION_CACHE.write().unwrap();
cache.insert(server_id.to_string(), Vec::new());
}
Ok(Vec::new())
}
}
} else {
crate::log_debug!(
"Server '{}' is not running - using fallback function definitions",
server_id
);
get_fallback_functions(server)
}
}
fn get_fallback_functions(server: &McpServerConfig) -> Result<Vec<McpFunction>> {
if !server.tools().is_empty() {
Ok(server
.tools()
.iter()
.map(|tool_name| McpFunction {
name: tool_name.clone(),
description: format!(
"External tool '{}' from server '{}' (server not running)",
tool_name,
server.name()
),
parameters: serde_json::json!({}),
})
.collect())
} else {
Ok(vec![])
}
}
fn is_server_running_for_cache_check(server: &McpServerConfig) -> bool {
let processes = process::SERVER_PROCESSES.read().unwrap();
if let Some(process_arc) = processes.get(server.name()) {
if let Ok(mut process) = process_arc.try_lock() {
match &mut *process {
process::ServerProcess::Http(child) => child
.try_wait()
.map(|status| status.is_none())
.unwrap_or(false),
process::ServerProcess::Stdin {
child, is_shutdown, ..
} => {
let process_alive = child
.try_wait()
.map(|status| status.is_none())
.unwrap_or(false);
let not_marked_shutdown =
!is_shutdown.load(std::sync::atomic::Ordering::SeqCst);
process_alive && not_marked_shutdown
}
}
} else {
true
}
} else {
false
}
}
pub fn clear_function_cache_for_server(server_name: &str) {
let mut cache = FUNCTION_CACHE.write().unwrap();
if cache.remove(server_name).is_some() {
crate::log_debug!(
"Cleared function cache for server '{}' due to restart",
server_name
);
}
}
pub fn clear_all_function_cache() {
let mut cache = FUNCTION_CACHE.write().unwrap();
let count = cache.len();
cache.clear();
if count > 0 {
crate::log_debug!("Cleared function cache for {} servers", count);
}
}
pub fn is_server_already_running_with_config(server: &crate::config::McpServerConfig) -> bool {
match server.connection_type() {
McpConnectionType::Builtin => {
{
let mut restart_info_guard = process::SERVER_RESTART_INFO.write().unwrap();
let info = restart_info_guard
.entry(server.name().to_string())
.or_default();
info.health_status = process::ServerHealth::Running;
info.last_health_check = Some(std::time::SystemTime::now());
}
true
}
McpConnectionType::Http | McpConnectionType::Stdin => {
let is_process_running = {
let processes = process::SERVER_PROCESSES.read().unwrap();
if let Some(process_arc) = processes.get(server.name()) {
match process_arc.try_lock() {
Ok(mut process) => match &mut *process {
process::ServerProcess::Http(child) => child
.try_wait()
.map(|status| status.is_none())
.unwrap_or(false),
process::ServerProcess::Stdin {
child, is_shutdown, ..
} => {
let process_alive = child
.try_wait()
.map(|status| status.is_none())
.unwrap_or(false);
let not_marked_shutdown =
!is_shutdown.load(std::sync::atomic::Ordering::SeqCst);
process_alive && not_marked_shutdown
}
},
Err(_) => true,
}
} else {
false
}
};
let health_status = if is_process_running {
process::ServerHealth::Running
} else {
process::ServerHealth::Dead
};
{
let mut restart_info_guard = process::SERVER_RESTART_INFO.write().unwrap();
let info = restart_info_guard
.entry(server.name().to_string())
.or_default();
info.health_status = health_status;
info.last_health_check = Some(std::time::SystemTime::now());
}
is_process_running
}
}
}
pub fn is_server_already_running(server_name: &str) -> bool {
if server_name == "core" || server_name == "filesystem" {
let mut restart_info_guard = process::SERVER_RESTART_INFO.write().unwrap();
let info = restart_info_guard
.entry(server_name.to_string())
.or_default();
info.health_status = process::ServerHealth::Running;
info.last_health_check = Some(std::time::SystemTime::now());
return true;
}
let is_process_running = {
let processes = process::SERVER_PROCESSES.read().unwrap();
if let Some(process_arc) = processes.get(server_name) {
match process_arc.try_lock() {
Ok(mut process) => match &mut *process {
process::ServerProcess::Http(child) => child
.try_wait()
.map(|status| status.is_none())
.unwrap_or(false),
process::ServerProcess::Stdin {
child, is_shutdown, ..
} => {
let process_alive = child
.try_wait()
.map(|status| status.is_none())
.unwrap_or(false);
let not_marked_shutdown =
!is_shutdown.load(std::sync::atomic::Ordering::SeqCst);
process_alive && not_marked_shutdown
}
},
Err(_) => true,
}
} else {
false
}
};
let health_status = if is_process_running {
process::ServerHealth::Running
} else {
process::ServerHealth::Dead
};
{
let mut restart_info_guard = process::SERVER_RESTART_INFO.write().unwrap();
let info = restart_info_guard
.entry(server_name.to_string())
.or_default();
info.health_status = health_status;
info.last_health_check = Some(std::time::SystemTime::now());
}
is_process_running
}
pub async fn execute_tool_call(
call: &McpToolCall,
server: &McpServerConfig,
cancellation_token: Option<tokio::sync::watch::Receiver<bool>>,
) -> Result<McpToolResult> {
if let Some(ref token) = cancellation_token {
if *token.borrow() {
return Err(anyhow::anyhow!("External tool execution cancelled"));
}
}
let server_health = process::get_server_health(server.name());
match server_health {
process::ServerHealth::Failed => {
return Err(anyhow::anyhow!(
"Server '{}' is in failed state. Cannot execute tool '{}'. Server will not be restarted automatically.",
server.name(),
call.tool_name
));
}
process::ServerHealth::Restarting => {
return Err(anyhow::anyhow!(
"Server '{}' is currently starting. Please try again in a moment.",
server.name()
));
}
process::ServerHealth::Dead => {
if server.connection_type() == McpConnectionType::Http {
crate::log_debug!(
"HTTP server '{}' health check failed, but allowing tool execution to proceed with fresh OAuth token",
server.name()
);
} else {
return Err(anyhow::anyhow!(
"Server '{}' is not running. Cannot execute tool '{}'. Server will not be restarted automatically.",
server.name(),
call.tool_name
));
}
}
process::ServerHealth::Unreachable => {
crate::log_debug!(
"Server '{}' marked as unreachable (likely auth issue in health check), but allowing tool execution to proceed with fresh OAuth token",
server.name()
);
}
process::ServerHealth::Running => {
}
}
execute_tool_with_cancellation(call, server, cancellation_token).await
}
async fn execute_tool_with_cancellation(
call: &McpToolCall,
server: &McpServerConfig,
cancellation_token: Option<tokio::sync::watch::Receiver<bool>>,
) -> Result<McpToolResult> {
if let Some(ref token) = cancellation_token {
if *token.borrow() {
return Err(anyhow::anyhow!("External tool execution cancelled"));
}
}
let tool_name = &call.tool_name;
let parameters = &call.parameters;
match server.connection_type() {
McpConnectionType::Http => {
if let Some(ref token) = cancellation_token {
if *token.borrow() {
return Err(anyhow::anyhow!("External tool execution cancelled"));
}
}
let server_url = get_server_base_url(server).await?;
let client = Client::builder()
.timeout(std::time::Duration::from_secs(server.timeout_seconds()))
.build()
.unwrap_or_else(|_| Client::new());
let mut headers = HeaderMap::new();
headers.insert(
ACCEPT,
HeaderValue::from_static("application/json, text/event-stream"),
);
headers.insert(CONTENT_TYPE, HeaderValue::from_static("application/json"));
match oauth::discover_oauth_from_mcp_server(&server_url, server.name()).await {
Ok(discovered_oauth) => {
crate::log_debug!(
"MCP Authorization discovery succeeded for server '{}' tool execution",
server.name()
);
match oauth::get_access_token(&discovered_oauth, server.name(), false).await {
Ok(Some(token)) => {
headers.insert(
AUTHORIZATION,
HeaderValue::from_str(&format!("Bearer {}", token))?,
);
crate::log_debug!(
"Using discovered OAuth access token for HTTP server '{}' tool execution",
server.name()
);
}
Ok(None) => {
crate::log_error!(
"OAuth authentication was cancelled for server '{}'",
server.name()
);
}
Err(e) => {
crate::log_error!(
"Failed to get OAuth access token for server '{}': {}",
server.name(),
e
);
}
}
}
Err(e) => {
crate::log_debug!(
"MCP Authorization discovery failed for server '{}' tool execution: {}",
server.name(),
e
);
}
}
let execute_url = server_url;
let request_body = create_tools_call_request(tool_name, parameters);
if let Some(ref token) = cancellation_token {
if *token.borrow() {
return Err(anyhow::anyhow!("External tool execution cancelled"));
}
}
add_session_id_header(&mut headers, server.name());
let response = client
.post(&execute_url)
.headers(headers)
.json(&request_body)
.send()
.await?;
if !response.status().is_success() {
let status = response.status();
let error_text = response.text().await?;
return Err(anyhow::anyhow!(
"Failed to execute tool on MCP server: {}, {}",
status,
error_text
));
}
let result = parse_http_response_body(response).await?;
let tool_result = if result.get("error").is_some() {
let error_message = result
.get("error")
.and_then(|e| e.get("message"))
.and_then(|m| m.as_str())
.unwrap_or("External MCP server error");
McpToolResult::error(
tool_name.clone(),
call.tool_id.clone(),
error_message.to_string(),
)
} else {
let output = result.get("result").cloned().unwrap_or_default();
let call_tool_result = serde_json::from_value::<rmcp::model::CallToolResult>(
output,
)
.unwrap_or_else(|_| {
rmcp::model::CallToolResult::success(vec![rmcp::model::Content::text(
"No result",
)])
});
McpToolResult {
tool_name: tool_name.clone(),
tool_id: call.tool_id.clone(),
result: call_tool_result,
}
};
Ok(tool_result)
}
McpConnectionType::Stdin => {
process::execute_stdin_tool_call(call, server, cancellation_token).await
}
McpConnectionType::Builtin => {
Err(anyhow::anyhow!(
"Built-in servers should not use execute_tool_call"
))
}
}
}
async fn get_server_base_url(server: &McpServerConfig) -> Result<String> {
match server.connection_type() {
McpConnectionType::Http => {
if let Some(url) = server.url() {
crate::log_debug!(
"Using remote HTTP server '{}' at URL: {}",
server.name(),
url
);
let base_url = url.trim_end_matches("/").to_string();
crate::log_debug!(
"Processed base URL for server '{}': {}",
server.name(),
base_url
);
Ok(base_url)
} else if server.command().is_some() {
crate::log_debug!(
"Starting local HTTP server '{}' with command: {:?}",
server.name(),
server.command()
);
process::ensure_server_running(server).await
} else {
Err(anyhow::anyhow!("Invalid server configuration: neither URL nor command specified for server '{}'", server.name()))
}
}
McpConnectionType::Stdin => {
if server.command().is_some() {
process::ensure_server_running(server).await
} else {
Err(anyhow::anyhow!("Invalid server configuration: command not specified for stdin-based server '{}'", server.name()))
}
}
McpConnectionType::Builtin => {
Err(anyhow::anyhow!("Built-in servers don't have URLs"))
}
}
}
pub async fn get_all_server_functions(
config: &Config,
) -> Result<HashMap<String, (McpFunction, McpServerConfig)>> {
let mut functions = HashMap::new();
if config.mcp.servers.is_empty() {
return Ok(functions);
}
let servers: Vec<crate::config::McpServerConfig> = config.mcp.servers.to_vec();
for server in &servers {
let server_functions = get_server_functions(server).await?;
for func in server_functions {
functions.insert(func.name.clone(), (func, server.clone()));
}
}
Ok(functions)
}
pub fn cleanup_servers() -> Result<()> {
crate::mcp::health_monitor::stop_health_monitor();
process::stop_all_servers()
}
pub fn get_server_health_status(server_name: &str) -> process::ServerHealth {
process::get_server_health(server_name)
}
pub fn get_server_restart_info(server_name: &str) -> process::ServerRestartInfo {
process::get_server_restart_info(server_name)
}
pub fn reset_server_failure_state(server_name: &str) -> Result<()> {
process::reset_server_failure_state(server_name)
}
pub async fn perform_health_check_all_servers(
) -> std::collections::HashMap<String, process::ServerHealth> {
process::perform_health_check_all_servers().await
}
pub fn get_server_status_report(
) -> std::collections::HashMap<String, (process::ServerHealth, process::ServerRestartInfo)> {
process::get_server_status_report()
}