use super::process::{self, is_server_running, ServerHealth};
use crate::config::{Config, McpConnectionType, McpServerConfig};
use serde_json::Value;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;
use tokio::time::interval;
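/// Outcome of an HTTP health probe against an external MCP server.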
enum HttpHealthResult {
    Healthy,
    Unreachable,
    Dead,
}
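/// Guards against spawning more than one monitor task at a time.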
static HEALTH_MONITOR_RUNNING: AtomicBool = AtomicBool::new(false);
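/// How often each external server is probed.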
const HEALTH_CHECK_INTERVAL_SECONDS: u64 = 120;
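/// Extracts the first JSON payload from a Server-Sent Events body by scanning
/// for `data:` lines.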
fn parse_sse_response(body: &str) -> Option<Value> {
for line in body.lines() {
if line.starts_with("data:") {
let json_data = line.trim_start_matches("data:").trim();
if let Ok(value) = serde_json::from_str(json_data) {
return Some(value);
}
}
}
None
}
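/// Returns true if the response declares a `text/event-stream` content type.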
fn is_sse_response(response: &reqwest::Response) -> bool {
response
.headers()
.get(reqwest::header::CONTENT_TYPE)
.and_then(|v| v.to_str().ok())
.map(|ct| ct.contains("text/event-stream"))
.unwrap_or(false)
}
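/// Parses a health-check response body as JSON, transparently unwrapping SSE
/// framing when the server streams its reply.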
async fn parse_http_response_body(response: reqwest::Response) -> Result<Value, anyhow::Error> {
if is_sse_response(&response) {
let body = response.text().await?;
crate::log_debug!(
"SSE response body (first 500 chars): {}",
body.chars().take(500).collect::<String>()
);
if let Some(json_value) = parse_sse_response(&body) {
return Ok(json_value);
}
return Err(anyhow::anyhow!(
"Failed to parse SSE response - no valid JSON data found"
));
}
let jsonrpc_response: Value = response.json().await?;
Ok(jsonrpc_response)
}
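/// Spawns a background task that periodically checks every external (HTTP or
/// stdio) MCP server and restarts dead ones. A no-op if a monitor is already
/// running or there are no external servers to watch.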
pub async fn start_health_monitor(config: Arc<Config>) -> Result<(), anyhow::Error> {
if HEALTH_MONITOR_RUNNING
.compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
.is_err()
{
crate::log_debug!("Health monitor is already running");
return Ok(());
}
crate::log_debug!(
"Starting MCP server health monitor (checking every {}s)",
HEALTH_CHECK_INTERVAL_SECONDS
);
let external_servers: Vec<McpServerConfig> = config
.mcp
.servers
.iter()
.filter(|server| {
matches!(
server.connection_type(),
McpConnectionType::Http | McpConnectionType::Stdin
)
})
.cloned()
.collect();
if external_servers.is_empty() {
crate::log_debug!("No external servers to monitor, health monitor stopping");
HEALTH_MONITOR_RUNNING.store(false, Ordering::SeqCst);
return Ok(());
}
crate::log_debug!(
"Health monitor will track {} external servers: {}",
external_servers.len(),
external_servers
.iter()
.map(|s| {
let server_type = match s.connection_type() {
McpConnectionType::Stdin => "stdio",
McpConnectionType::Http => "http",
McpConnectionType::Builtin => "builtin",
};
format!("{}({})", s.name(), server_type)
})
.collect::<Vec<_>>()
.join(", ")
);
tokio::spawn(async move {
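        // Short startup delay so servers launched at boot can initialize
        // before the first probe.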
tokio::time::sleep(Duration::from_secs(2)).await;
let mut check_interval = interval(Duration::from_secs(HEALTH_CHECK_INTERVAL_SECONDS));
loop {
check_interval.tick().await;
if !HEALTH_MONITOR_RUNNING.load(Ordering::SeqCst) {
crate::log_debug!("Health monitor stopping");
break;
}
for server in &external_servers {
if let Err(e) = check_server_health_and_restart_if_dead(server).await {
crate::log_error!(
"Health check failed for server '{}': {}. Verify the server is running at the configured URL.",
server.name(),
e
);
}
}
}
crate::log_debug!("Health monitor task completed");
});
Ok(())
}
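/// Signals the monitor task to exit at its next tick.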
pub fn stop_health_monitor() {
if HEALTH_MONITOR_RUNNING
.compare_exchange(true, false, Ordering::SeqCst, Ordering::SeqCst)
.is_ok()
{
crate::log_debug!("Stopping health monitor");
}
}
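/// Probes a single server, records the result in the shared restart table, and
/// restarts the server if it is dead and eligible (under the attempt limit and
/// outside the restart cooldown).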
async fn check_server_health_and_restart_if_dead(
server: &McpServerConfig,
) -> Result<(), anyhow::Error> {
let health_status = match server.connection_type() {
McpConnectionType::Stdin => {
if is_server_running(server.name()) {
ServerHealth::Running
} else {
ServerHealth::Dead
}
}
McpConnectionType::Http => {
match perform_http_health_check(server).await {
Ok(HttpHealthResult::Healthy) => ServerHealth::Running,
Ok(HttpHealthResult::Unreachable) => ServerHealth::Unreachable,
Ok(HttpHealthResult::Dead) | Err(_) => ServerHealth::Dead,
}
}
        // Builtin servers run in-process; they are always considered healthy.
        McpConnectionType::Builtin => ServerHealth::Running,
};
let restart_info = process::get_server_restart_info(server.name());
crate::log_debug!(
"Health check: server '{}' status = {:?}, restart_count = {}",
server.name(),
health_status,
restart_info.restart_count
);
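    // Record the latest status while holding the write lock as briefly as possible.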
{
let mut restart_info_guard = process::SERVER_RESTART_INFO.write().unwrap();
let info = restart_info_guard
.entry(server.name().to_string())
.or_default();
info.health_status = health_status;
info.last_health_check = Some(std::time::SystemTime::now());
}
match health_status {
ServerHealth::Dead => {
crate::log_debug!(
"Health monitor detected dead server '{}' - attempting restart",
server.name()
);
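            // Give up after three consecutive restart attempts.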
if restart_info.restart_count >= 3 {
crate::log_debug!(
"Server '{}' has exceeded max restart attempts ({}), marking as failed",
server.name(),
restart_info.restart_count
);
let mut restart_info_guard = process::SERVER_RESTART_INFO.write().unwrap();
if let Some(info) = restart_info_guard.get_mut(server.name()) {
info.health_status = ServerHealth::Failed;
}
return Ok(());
}
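            // Enforce a 30-second cooldown between restart attempts.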
if let Some(last_restart) = restart_info.last_restart_time {
let time_since_restart = std::time::SystemTime::now()
.duration_since(last_restart)
.unwrap_or(std::time::Duration::from_secs(0));
if time_since_restart < Duration::from_secs(30) {
crate::log_debug!(
"Server '{}' is in cooldown period, skipping restart attempt",
server.name()
);
return Ok(());
}
}
match restart_dead_server(server).await {
Ok(()) => {
crate::log_info!(
"Health monitor successfully restarted dead server '{}'",
server.name()
);
}
Err(e) => {
crate::log_debug!(
"Health monitor failed to restart dead server '{}': {}",
server.name(),
e
);
}
}
}
ServerHealth::Unreachable => {
crate::log_debug!(
"Health monitor: server '{}' is unreachable - check configuration/authentication",
server.name()
);
}
ServerHealth::Failed => {
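            // After five minutes, clear the failed state so restarts can be retried.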
if let Some(last_restart) = restart_info.last_restart_time {
let time_since_last_restart = std::time::SystemTime::now()
.duration_since(last_restart)
.unwrap_or(std::time::Duration::from_secs(0));
if time_since_last_restart > Duration::from_secs(300) {
crate::log_debug!(
"Resetting failed state for server '{}' after cooldown period",
server.name()
);
if let Err(e) = process::reset_server_failure_state(server.name()) {
crate::log_debug!(
"Failed to reset failure state for server '{}': {}",
server.name(),
e
);
}
}
}
}
ServerHealth::Running => {
if !verify_server_responsiveness(server).await {
crate::log_debug!(
"Health monitor: server '{}' process is running but not responsive (this is normal for failed requests)",
server.name()
);
}
}
ServerHealth::Restarting => {
crate::log_debug!(
"Health monitor: server '{}' is currently restarting",
server.name()
);
}
}
Ok(())
}
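/// Attempts to relaunch a dead server. Only locally-managed servers (stdio, or
/// HTTP with a launch command) can be restarted; remote servers are skipped.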
async fn restart_dead_server(server: &McpServerConfig) -> Result<(), anyhow::Error> {
    let can_restart = match server.connection_type() {
        McpConnectionType::Stdin => true,
        McpConnectionType::Http => server.command().is_some(),
        McpConnectionType::Builtin => false,
    };
    if !can_restart {
        crate::log_debug!(
            "Server '{}' is a remote server and cannot be restarted by health monitor",
            server.name()
        );
        return Ok(());
    }
crate::log_debug!(
"Health monitor attempting to restart dead server '{}'",
server.name()
);
match process::ensure_server_running(server).await {
Ok(_) => {
crate::log_info!(
"Health monitor successfully restarted dead server '{}'",
server.name()
);
Ok(())
}
Err(e) => {
crate::log_debug!(
"Health monitor failed to restart dead server '{}': {}",
server.name(),
e
);
Err(e)
}
}
}
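/// Lightweight responsiveness check. This currently only confirms the process
/// is tracked as running; it does not issue a request to the server.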
async fn verify_server_responsiveness(server: &McpServerConfig) -> bool {
    match server.connection_type() {
        // For stdio and locally-managed HTTP servers, a live process entry is
        // the cheapest signal available without issuing another request.
        McpConnectionType::Stdin | McpConnectionType::Http => {
            process::is_server_running(server.name())
        }
        // Builtin servers run in-process and are always responsive.
        McpConnectionType::Builtin => true,
    }
}
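/// Reports whether the background monitor task is active.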
pub fn is_health_monitor_running() -> bool {
HEALTH_MONITOR_RUNNING.load(Ordering::SeqCst)
}
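/// Runs an immediate health check over all external servers, independent of
/// the background monitor's schedule.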
pub async fn force_health_check(config: &Config) -> Result<(), anyhow::Error> {
crate::log_debug!("Forcing health check on all external servers");
let external_servers: Vec<McpServerConfig> = config
.mcp
.servers
.iter()
.filter(|server| {
matches!(
server.connection_type(),
McpConnectionType::Http | McpConnectionType::Stdin
)
})
.cloned()
.collect();
for server in &external_servers {
if let Err(e) = check_server_health_and_restart_if_dead(server).await {
crate::log_debug!(
"Force health check error for server '{}': {}",
server.name(),
e
);
}
}
Ok(())
}
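/// Probes an HTTP server by POSTing a `tools/list` JSON-RPC request, attaching
/// a discovered OAuth bearer token when available, and classifying the result.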
async fn perform_http_health_check(
server: &McpServerConfig,
) -> Result<HttpHealthResult, anyhow::Error> {
if let Some(url) = server.url() {
        let client = reqwest::Client::builder()
            .timeout(std::time::Duration::from_secs(5))
            .build()?;
        let health_url = url.trim_end_matches('/');
let mut headers = reqwest::header::HeaderMap::new();
headers.insert(
reqwest::header::ACCEPT,
reqwest::header::HeaderValue::from_static("application/json, text/event-stream"),
);
headers.insert(
reqwest::header::CONTENT_TYPE,
reqwest::header::HeaderValue::from_static("application/json"),
);
match crate::mcp::oauth::discover_oauth_from_mcp_server(health_url, server.name()).await {
Ok(discovered_oauth) => {
crate::log_debug!(
"HEALTH_CHECK: MCP Authorization discovery succeeded for server '{}', attempting OAuth",
server.name()
);
match crate::mcp::oauth::get_access_token(&discovered_oauth, server.name(), false)
.await
{
Ok(Some(token)) => {
headers.insert(
reqwest::header::AUTHORIZATION,
reqwest::header::HeaderValue::from_str(&format!("Bearer {}", token))?,
);
crate::log_debug!(
"HEALTH_CHECK: Using discovered OAuth access token for server '{}', token_prefix='{}...'",
server.name(),
token.chars().take(10).collect::<String>()
);
}
Ok(None) => {
crate::log_debug!(
"HEALTH_CHECK: OAuth authentication was cancelled for server '{}'",
server.name()
);
}
Err(e) => {
crate::log_debug!(
"HEALTH_CHECK: Failed to get OAuth access token for server '{}': {}",
server.name(),
e
);
}
}
}
Err(e) => {
crate::log_debug!(
"HEALTH_CHECK: MCP Authorization discovery failed for server '{}': {}",
server.name(),
e
);
}
}
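        // Use `tools/list` as a lightweight liveness probe: any well-formed
        // JSON-RPC reply (result or error) proves the server is alive.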
let jsonrpc_request = crate::mcp::server::create_tools_list_request();
crate::mcp::server::add_session_id_header(&mut headers, server.name());
match client
.post(health_url)
.headers(headers)
.json(&jsonrpc_request)
.send()
.await
{
Ok(response) => {
let status = response.status();
if status.is_success() {
match parse_http_response_body(response).await {
Ok(json_response) => {
if json_response.get("result").is_some() {
crate::log_debug!(
"HTTP health check for '{}': ✅ Healthy (status: {}, valid JSON-RPC response)",
server.name(),
status
);
Ok(HttpHealthResult::Healthy)
} else if json_response.get("error").is_some() {
crate::log_debug!(
"HTTP health check for '{}': ⚠️ Server returned JSON-RPC error",
server.name()
);
                                Ok(HttpHealthResult::Healthy)
                            } else {
crate::log_error!(
"HTTP health check for '{}': ❌ Invalid JSON-RPC response",
server.name()
);
Ok(HttpHealthResult::Dead)
}
}
Err(e) => {
crate::log_error!(
"HTTP health check for '{}': ❌ Failed to parse response body: {}",
server.name(),
e
);
Ok(HttpHealthResult::Dead)
}
}
} else if status == 401 || status == 403 {
crate::log_error!(
"HTTP health check for '{}': 🔒 Authentication failed (status: {}) - check your credentials",
server.name(),
status
);
Ok(HttpHealthResult::Unreachable)
} else if status.is_server_error() {
crate::log_error!(
"HTTP health check for '{}': ⚠️ Server error (status: {})",
server.name(),
status
);
Ok(HttpHealthResult::Dead)
} else {
crate::log_error!(
"HTTP health check for '{}': ❌ Unhealthy (status: {})",
server.name(),
status
);
Ok(HttpHealthResult::Dead)
}
}
Err(e) => {
crate::log_error!(
"HTTP health check for '{}': ❌ Connection failed - {}",
server.name(),
e
);
Ok(HttpHealthResult::Dead)
}
}
} else {
Err(anyhow::anyhow!("No URL configured for HTTP server"))
}
}