use super::{McpFunction, McpToolCall, McpToolResult};
use crate::config::{McpConnectionType, McpServerConfig};
use anyhow::Result;
use serde_json::{json, Value};
use std::collections::HashMap;
use std::io::{BufRead, BufReader, BufWriter, Write};
use std::process::{Child, Command, Stdio};
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::{Arc, Mutex, RwLock};
use std::time::{Duration, Instant, SystemTime};
use tokio::time::sleep;
/// Coarse lifecycle state recorded for a managed MCP server.
///
/// The value is stored per server in `ServerRestartInfo::health_status`.
/// `Eq` and `Hash` are derived so the state can be used in sets and as a map key.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum ServerHealth {
    /// The process was observed alive, or was just started successfully.
    Running,
    /// The process was observed exited, its pipe broke, or a request to it was cancelled.
    Dead,
    /// NOTE(review): never assigned in this part of the file — presumably "restart in progress".
    Restarting,
    /// The most recent attempt to start the server returned an error.
    Failed,
    /// NOTE(review): never assigned in this part of the file — presumably "cannot be contacted".
    Unreachable,
}
/// Restart and health bookkeeping kept for a single MCP server.
#[derive(Debug, Clone)]
pub struct ServerRestartInfo {
    /// Number of successful starts recorded for the server (the first start counts too).
    pub restart_count: u32,
    /// When the server was last started successfully, if ever.
    pub last_restart_time: Option<SystemTime>,
    /// Most recently recorded lifecycle state.
    pub health_status: ServerHealth,
    /// Start attempts that failed since the last successful start.
    pub consecutive_failures: u32,
    /// When the server's liveness was last checked, if ever.
    pub last_health_check: Option<SystemTime>,
}
impl Default for ServerRestartInfo {
fn default() -> Self {
Self {
restart_count: 0,
last_restart_time: None,
health_status: ServerHealth::Running,
consecutive_failures: 0,
last_health_check: None,
}
}
}
lazy_static::lazy_static! {
    /// Restart/health bookkeeping for every known server, keyed by server name.
    pub static ref SERVER_RESTART_INFO: Arc<RwLock<HashMap<String, ServerRestartInfo>>> =
        Arc::default();
    /// One async mutex per server name, used to serialize start/restart attempts.
    static ref SERVER_RESTART_MUTEXES: Arc<RwLock<HashMap<String, Arc<tokio::sync::Mutex<()>>>>> =
        Arc::default();
}
type InFlightHandle =
Arc<std::sync::Mutex<Option<tokio::task::JoinHandle<anyhow::Result<serde_json::Value>>>>>;
lazy_static::lazy_static! {
    /// Registry of spawned server processes, keyed by server name.
    pub static ref SERVER_PROCESSES: Arc<RwLock<HashMap<String, Arc<Mutex<ServerProcess>>>>> =
        Arc::default();
    /// In-flight request slot for each stdin server, keyed by server name.
    static ref SERVER_IN_FLIGHT: Arc<RwLock<HashMap<String, InFlightHandle>>> =
        Arc::default();
}
lazy_static::lazy_static! {
    /// Fallback notification channel used when no session-scoped sender is available.
    static ref CLI_NOTIFICATION_SENDER: RwLock<Option<tokio::sync::mpsc::UnboundedSender<crate::websocket::ServerMessage>>> =
        RwLock::default();
    /// Notifications emitted before a fallback sender was installed; flushed when one is set.
    static ref CLI_PENDING_NOTIFICATIONS: RwLock<Vec<crate::websocket::ServerMessage>> =
        RwLock::default();
}
lazy_static::lazy_static! {
    /// Process-wide `(role, project)` pair used when no session-scoped context applies.
    static ref CLI_SESSION_CONTEXT: RwLock<(String, String)> = RwLock::default();
}
/// Derives a short, stable project identifier for the current working directory.
///
/// The identifier is the first 16 hex characters of the SHA-256 of the `origin` git remote
/// URL. When `git` cannot be run, fails, or prints nothing, the current directory path is
/// hashed instead (an empty path if the current directory cannot be determined).
pub fn derive_project_id() -> String {
    use sha2::{Digest, Sha256};

    let git_output = std::process::Command::new("git")
        .args(["remote", "get-url", "origin"])
        .output();

    // Only a successful, non-empty, valid-UTF-8 answer counts as a remote URL.
    let remote_url = match git_output {
        Ok(out) if out.status.success() => String::from_utf8(out.stdout)
            .ok()
            .map(|raw| raw.trim().to_string())
            .filter(|url| !url.is_empty()),
        _ => None,
    };

    let identity = match remote_url {
        Some(url) => url,
        None => std::env::current_dir()
            .unwrap_or_default()
            .to_string_lossy()
            .into_owned(),
    };

    let hex_digest = format!("{:x}", Sha256::digest(identity.as_bytes()));
    hex_digest[..16].to_string()
}
/// Derives a short, stable project identifier for the project located at `path`.
///
/// `git remote get-url origin` is run with `path` as its working directory; the first
/// 16 hex characters of the SHA-256 of the trimmed URL are returned. If that command
/// cannot be run, fails, or prints nothing, `path` itself is hashed instead.
pub fn derive_project_id_from_path(path: &std::path::Path) -> String {
    use sha2::{Digest, Sha256};

    let git_output = std::process::Command::new("git")
        .args(["remote", "get-url", "origin"])
        .current_dir(path)
        .output();

    // Only a successful, non-empty, valid-UTF-8 answer counts as a remote URL.
    let remote_url = match git_output {
        Ok(out) if out.status.success() => String::from_utf8(out.stdout)
            .ok()
            .map(|raw| raw.trim().to_string())
            .filter(|url| !url.is_empty()),
        _ => None,
    };

    let identity = match remote_url {
        Some(url) => url,
        None => path.to_string_lossy().into_owned(),
    };

    let hex_digest = format!("{:x}", Sha256::digest(identity.as_bytes()));
    hex_digest[..16].to_string()
}
/// Stores `role` and `project` as the process-wide (CLI) session context.
pub fn set_session_context(role: &str, project: &str) {
    // NOTE(review): the current session id is looked up but the result is unused;
    // confirm whether a session-scoped store was intended here.
    let _ = crate::session::context::current_session_id();

    let mut cli_context = CLI_SESSION_CONTEXT.write().unwrap();
    *cli_context = (role.to_string(), project.to_string());
}
/// Returns the `(role, project)` pair that applies to the caller.
///
/// When a current session exists and has a role, that role is returned together with a
/// project id derived from the session's workdir anchor (empty if there is no anchor).
/// Otherwise the process-wide CLI context is returned.
pub fn get_session_context() -> (String, String) {
    let session = crate::session::context::current_session_id();
    let session_scoped = session.as_ref().and_then(|sid| {
        crate::session::context::get_session_role(sid).map(|role| {
            let project = crate::session::context::get_session_workdir_anchor(sid)
                .map(|p| crate::mcp::process::derive_project_id_from_path(&p))
                .unwrap_or_default();
            (role, project)
        })
    });

    match session_scoped {
        Some(context) => context,
        None => CLI_SESSION_CONTEXT.read().unwrap().clone(),
    }
}
/// Initializes the session context with `role` and a project id derived from the
/// current working directory.
pub fn init_session_context(role: &str) {
    set_session_context(role, &derive_project_id());
}
/// Installs a notification sender.
///
/// With a `session_id`, the sender is registered for that session. Without one, it becomes
/// the process-wide fallback sender; any notifications buffered while no fallback sender
/// existed are forwarded to it first (send failures are ignored).
pub fn set_notification_sender(
    session_id: Option<String>,
    tx: tokio::sync::mpsc::UnboundedSender<crate::websocket::ServerMessage>,
) {
    if let Some(sid) = session_id {
        crate::session::context::register_notification_sender(sid, tx);
        return;
    }

    // Empty the backlog under the lock, then release it before sending.
    let backlog = std::mem::take(&mut *CLI_PENDING_NOTIFICATIONS.write().unwrap());
    backlog.into_iter().for_each(|msg| {
        let _ = tx.send(msg);
    });

    *CLI_NOTIFICATION_SENDER.write().unwrap() = Some(tx);
}
/// Removes a previously installed notification sender.
///
/// With a `session_id`, the session's sender is unregistered; without one, the
/// process-wide fallback sender is cleared.
pub fn clear_notification_sender(session_id: Option<String>) {
    if let Some(sid) = session_id {
        crate::session::context::unregister_notification_sender(sid);
    } else {
        *CLI_NOTIFICATION_SENDER.write().unwrap() = None;
    }
}
/// Delivers `msg` to the current session's notification sender, or to the process-wide
/// fallback sender when the session has none.
///
/// The message is dropped when neither sender exists; send failures are ignored.
pub fn send_notification_message(msg: crate::websocket::ServerMessage) {
    let session = crate::session::context::current_session_id();
    let session_sender = session
        .as_ref()
        .and_then(|sid| crate::session::context::get_notification_sender_by_id(sid));

    match session_sender {
        Some(sender) => {
            let _ = sender.send(msg);
        }
        None => {
            let cli_sender = CLI_NOTIFICATION_SENDER.read().unwrap();
            if let Some(tx) = cli_sender.as_ref() {
                let _ = tx.send(msg);
            }
        }
    }
}
/// Wraps a server-originated notification in a `ServerMessage::McpNotification` and routes it.
///
/// Routing order: the current session's sender, then the process-wide fallback sender.
/// If neither exists the message is appended to the pending backlog so it can be
/// delivered once a fallback sender is installed.
fn emit_notification(server_name: &str, method: &str, params: &serde_json::Value) {
    let payload = crate::websocket::McpNotificationPayload {
        server: server_name.to_string(),
        method: method.to_string(),
        params: params.clone(),
    };
    let msg = crate::websocket::ServerMessage::McpNotification(payload);

    let session = crate::session::context::current_session_id();
    let session_sender = session
        .as_ref()
        .and_then(|sid| crate::session::context::get_notification_sender_by_id(sid));
    if let Some(sender) = session_sender {
        let _ = sender.send(msg);
        return;
    }

    let cli_sender = CLI_NOTIFICATION_SENDER.read().unwrap();
    if let Some(tx) = cli_sender.as_ref() {
        let _ = tx.send(msg);
        return;
    }
    // Release the sender lock before touching the backlog lock.
    drop(cli_sender);
    CLI_PENDING_NOTIFICATIONS.write().unwrap().push(msg);
}
/// Handle to a spawned MCP server child process plus the state needed to talk to it.
pub enum ServerProcess {
    /// A child process reached over HTTP; only the process handle is kept.
    Http(Child),
    /// A child process spoken to over its stdin/stdout pipes.
    Stdin {
        /// The spawned child process.
        child: Child,
        /// Buffered reader over the child's stdout (responses are read line by line).
        reader: BufReader<std::process::ChildStdout>,
        /// Buffered writer over the child's stdin (requests are written one per line).
        writer: BufWriter<std::process::ChildStdin>,
        /// Source of request ids for calls that do not supply their own.
        next_id: Arc<AtomicU64>,
        /// Set once shutdown has begun; further communication is refused.
        is_shutdown: Arc<AtomicBool>,
    },
}
impl ServerProcess {
/// Terminates the child process and waits (bounded) for it to exit.
///
/// Returns `Ok(())` once the process has exited, and also when it has not exited within
/// the 5 second wait (that case is only logged). Returns `Err` if killing an HTTP child
/// fails or if polling the exit status fails during the final wait.
///
/// This method blocks the calling thread with `std::thread::sleep` while it waits.
pub fn kill(&mut self) -> Result<()> {
match self {
ServerProcess::Http(child) => {
child
.kill()
.map_err(|e| anyhow::anyhow!("Failed to kill HTTP process: {}", e))?;
// Poll every 100 ms, for at most 5 s, until the child has been reaped.
let start = std::time::Instant::now();
let timeout = std::time::Duration::from_secs(5);
while start.elapsed() < timeout {
match child.try_wait() {
Ok(Some(_)) => return Ok(()), Ok(None) => std::thread::sleep(std::time::Duration::from_millis(100)),
Err(e) => {
return Err(anyhow::anyhow!("Error waiting for HTTP process: {}", e))
}
}
}
crate::log_debug!("HTTP process did not terminate within timeout, may be zombie");
Ok(())
}
ServerProcess::Stdin {
child,
is_shutdown,
writer,
..
} => {
// Mark the server as shut down first so concurrent callers stop using it.
is_shutdown.store(true, Ordering::SeqCst);
if let Err(e) = writer.flush() {
crate::log_debug!("Failed to flush stdin before shutdown: {}", e);
}
// Give the process a short moment to exit on its own.
// NOTE(review): the log messages below say "after stdin close", but the writer is
// only flushed here, not dropped — confirm whether stdin is meant to be closed.
std::thread::sleep(std::time::Duration::from_millis(100));
match child.try_wait() {
Ok(Some(_)) => {
crate::log_debug!("Process terminated gracefully after stdin close");
return Ok(());
}
Ok(None) => {
crate::log_debug!(
"Process didn't terminate after stdin close, sending SIGTERM"
);
}
Err(e) => {
crate::log_debug!("Error checking process status: {}", e);
}
}
#[cfg(unix)]
{
// A negative pid passed to kill(2) addresses a whole process group; the child's
// pid is used as the group id (assumes the child leads its own group — TODO confirm
// against the spawn code).
let pid = child.id();
let pgid = pid as libc::pid_t;
unsafe {
libc::kill(-pgid, libc::SIGTERM);
}
crate::log_debug!(
"Sent SIGTERM to process group {} for graceful shutdown",
pgid
);
// Allow 200 ms for SIGTERM to take effect before escalating.
std::thread::sleep(std::time::Duration::from_millis(200));
match child.try_wait() {
Ok(Some(_)) => {
crate::log_debug!("Process terminated after SIGTERM");
return Ok(());
}
_ => {
crate::log_debug!("Process still alive after SIGTERM, sending SIGKILL");
}
}
unsafe {
libc::kill(-pgid, libc::SIGKILL);
}
}
#[cfg(not(unix))]
{
// No process-group signalling here; fall back to killing the child directly.
let _ = child.kill();
}
// Poll every 100 ms, for at most 5 s, until the child has been reaped.
let start = std::time::Instant::now();
let timeout = std::time::Duration::from_secs(5);
while start.elapsed() < timeout {
match child.try_wait() {
Ok(Some(_)) => return Ok(()), Ok(None) => std::thread::sleep(std::time::Duration::from_millis(100)),
Err(e) => {
return Err(anyhow::anyhow!("Error waiting for stdin process: {}", e))
}
}
}
crate::log_debug!("Stdin process did not terminate within timeout, may be zombie");
Ok(())
}
}
}
/// Checks, without blocking, whether the child process has exited.
///
/// Returns `Ok(None)` while the process is still running, `Ok(Some(status))` once it has
/// exited, and `Err` if the status could not be queried.
pub fn try_wait(&mut self) -> Result<Option<std::process::ExitStatus>> {
match self {
ServerProcess::Http(child) => child
.try_wait()
.map_err(|e| anyhow::anyhow!("Failed to check HTTP process: {}", e)),
ServerProcess::Stdin { child, .. } => child
.try_wait()
.map_err(|e| anyhow::anyhow!("Failed to check stdin process: {}", e)),
}
}
}
/// Returns the restart mutex for `server_id`, creating and registering it on first use.
fn get_server_restart_mutex(server_id: &str) -> Arc<tokio::sync::Mutex<()>> {
    // Fast path: the mutex usually exists already, so a read lock is enough.
    {
        let registry = SERVER_RESTART_MUTEXES.read().unwrap();
        if let Some(existing) = registry.get(server_id) {
            return Arc::clone(existing);
        }
    }

    // Slow path: take the write lock and re-check, since another caller may have
    // inserted the mutex between the two lock acquisitions.
    let mut registry = SERVER_RESTART_MUTEXES.write().unwrap();
    Arc::clone(
        registry
            .entry(server_id.to_string())
            .or_insert_with(|| Arc::new(tokio::sync::Mutex::new(()))),
    )
}
/// Forgets the restart mutex registered for `server_id`, if any.
fn cleanup_server_restart_mutex(server_id: &str) {
    SERVER_RESTART_MUTEXES.write().unwrap().remove(server_id);
}
/// Makes sure the given server is running, starting it if necessary.
///
/// Calls for the same server name are serialized through that server's restart mutex.
/// Returns the value produced by `start_server_once_if_needed` (the server's URL string).
pub async fn ensure_server_running(server: &McpServerConfig) -> Result<String> {
    let server_id = server.name();

    // Hold the per-server lock for the whole check-and-start sequence.
    let restart_lock = get_server_restart_mutex(server_id);
    let _serialized = restart_lock.lock().await;

    crate::log_debug!("Checking server '{}' status for potential start", server_id);
    let outcome = start_server_once_if_needed(server).await;
    crate::log_debug!("Completed server '{}' check", server_id);
    outcome
}
/// Returns the URL of a live server, or (re)starts the server process and returns its URL.
///
/// If the registry holds a live process for the server, its health is recorded as `Running`
/// and its URL is returned without starting anything. Otherwise any stale registry entries
/// are removed and one start attempt is made; the outcome is recorded in `SERVER_RESTART_INFO`.
///
/// Returns `Err` when the start attempt fails. Hits `unreachable!` if a live process is
/// found for a server whose connection type is `Builtin`.
async fn start_server_once_if_needed(server: &McpServerConfig) -> Result<String> {
let server_id = server.name();
// Phase 1: inspect the registry. All std lock guards live inside this block so that
// none of them is held across the `.await` further down.
{
let processes = SERVER_PROCESSES.read().unwrap();
if let Some(process_arc) = processes.get(server_id) {
let mut process = process_arc.lock().unwrap();
// `try_wait` returning `Ok(None)` means the child is still running; any error
// while querying is treated as "not alive".
let is_alive = match &mut *process {
ServerProcess::Http(child) => child
.try_wait()
.map(|status| status.is_none())
.unwrap_or(false),
ServerProcess::Stdin {
child, is_shutdown, ..
} => {
let process_alive = child
.try_wait()
.map(|status| status.is_none())
.unwrap_or(false);
// A stdin server flagged as shut down counts as dead even if the OS process lingers.
let not_marked_shutdown = !is_shutdown.load(Ordering::SeqCst);
process_alive && not_marked_shutdown
}
};
if is_alive {
{
let mut restart_info_guard = SERVER_RESTART_INFO.write().unwrap();
let info = restart_info_guard.entry(server_id.to_string()).or_default();
info.health_status = ServerHealth::Running;
info.last_health_check = Some(SystemTime::now());
}
crate::log_debug!("Server '{}' is already running and healthy", server_id);
match server.connection_type() {
McpConnectionType::Http => return get_server_url(server),
McpConnectionType::Stdin => return Ok("stdin://".to_string() + server_id),
McpConnectionType::Builtin => {
unreachable!("Builtin servers should not use this function")
}
}
} else {
crate::log_info!(
"Server '{}' process is dead - cleaning up before restart",
server_id
);
// Best-effort kill; a failure is only logged. Note that `kill` sleeps on the
// calling thread while it waits for the process to exit.
if let Err(e) = process.kill() {
crate::log_debug!("Failed to kill dead server process '{}': {}", server_id, e);
}
{
let mut restart_info_guard = SERVER_RESTART_INFO.write().unwrap();
let info = restart_info_guard.entry(server_id.to_string()).or_default();
info.health_status = ServerHealth::Dead;
}
}
} else {
crate::log_debug!(
"Server '{}' not found in registry - needs initial start",
server_id
);
}
}
// Phase 2: drop any stale registry and in-flight entries before spawning a new process.
{
let mut processes = SERVER_PROCESSES.write().unwrap();
processes.remove(server_id);
}
{
let mut in_flight_map = SERVER_IN_FLIGHT.write().unwrap();
in_flight_map.remove(server_id);
}
// Phase 3: a single start attempt, with the result recorded in the restart bookkeeping.
crate::log_info!("Starting MCP server: {}", server_id);
match start_server_process(server).await {
Ok(url) => {
{
let mut restart_info_guard = SERVER_RESTART_INFO.write().unwrap();
let info = restart_info_guard.entry(server_id.to_string()).or_default();
info.health_status = ServerHealth::Running;
// Incremented on every successful start, including the very first one.
info.restart_count += 1; info.last_restart_time = Some(SystemTime::now());
info.last_health_check = Some(SystemTime::now());
info.consecutive_failures = 0;
}
crate::log_info!("Successfully started server '{}'", server_id);
Ok(url)
}
Err(e) => {
{
let mut restart_info_guard = SERVER_RESTART_INFO.write().unwrap();
let info = restart_info_guard.entry(server_id.to_string()).or_default();
info.health_status = ServerHealth::Failed;
info.consecutive_failures += 1;
}
crate::log_error!("Failed to start server '{}': {}", server_id, e);
Err(anyhow::anyhow!(
"Failed to start server '{}': {}",
server_id,
e
))
}
}
}
/// Spawns the external process for a `Stdin`-configured server and registers it.
///
/// `Http` and `Builtin` configurations are rejected with an error before anything is
/// spawned. For the stdin connection type the child is registered, its function cache is
/// cleared and the MCP `initialize` handshake is performed; on handshake failure the
/// process is cleaned up again and an error is returned.
///
/// Returns the server URL on success (`stdin://<name>` for stdin servers).
async fn start_server_process(server: &McpServerConfig) -> Result<String> {
// Only the `Stdin` config variant carries a command line to execute.
let (command, args) = match server {
McpServerConfig::Stdin { command, args, .. } => (command.as_str(), args.as_slice()),
McpServerConfig::Http { url, .. } => {
return Err(anyhow::anyhow!(
"HTTP server '{}' should not be started as a process (URL: {}) - use Stdin type for local processes",
server.name(),
url
));
}
McpServerConfig::Builtin { .. } => {
return Err(anyhow::anyhow!(
"Builtin server '{}' should not be started as external process",
server.name()
));
}
};
let mut cmd = Command::new(command);
if !args.is_empty() {
cmd.args(args);
}
#[cfg(unix)]
{
use std::os::unix::process::CommandExt;
// Process group id 0 makes the child the leader of a new process group.
cmd.process_group(0); }
#[cfg(windows)]
{
use std::os::windows::process::CommandExt;
// 0x00000200 is the Win32 CREATE_NEW_PROCESS_GROUP creation flag.
cmd.creation_flags(0x00000200); }
match server.connection_type() {
McpConnectionType::Http => {
// NOTE(review): stdout/stderr are piped but never read in this function — confirm
// nothing relies on the child being able to write unbounded output.
cmd.stdout(Stdio::piped()).stderr(Stdio::piped());
crate::log_debug!(
"🚀 Starting MCP server (HTTP mode, signal-isolated): {}",
server.name()
);
let child = cmd.spawn().map_err(|e| {
anyhow::anyhow!("Failed to start MCP server '{}': {}", server.name(), e)
})?;
{
let mut processes = SERVER_PROCESSES.write().unwrap();
processes.insert(
server.name().to_string(),
Arc::new(Mutex::new(ServerProcess::Http(child))),
);
}
crate::mcp::server::clear_function_cache_for_server(server.name());
// Poll the server URL every 500 ms until it answers, giving up after 10 s.
let start_time = Instant::now();
let max_wait = Duration::from_secs(10);
let server_url = get_server_url(server)?;
loop {
if start_time.elapsed() > max_wait {
return Err(anyhow::anyhow!(
"Timed out waiting for MCP server to start: {}",
server.name()
));
}
if can_connect(&server_url).await {
crate::log_debug!("✅ MCP server started: {} at {}", server.name(), server_url);
return Ok(server_url);
}
sleep(Duration::from_millis(500)).await;
}
}
McpConnectionType::Stdin => {
// NOTE(review): stderr is piped but not read anywhere in this function — confirm
// whether it is drained elsewhere.
cmd.stdin(Stdio::piped())
.stdout(Stdio::piped())
.stderr(Stdio::piped());
crate::log_debug!(
"🚀 Starting MCP server (stdin mode, signal-isolated): {}",
server.name()
);
let mut child = cmd.spawn().map_err(|e| {
anyhow::anyhow!("Failed to start MCP server '{}': {}", server.name(), e)
})?;
// Take ownership of the pipe ends so they can be wrapped in buffered reader/writer.
let child_stdin = child.stdin.take().ok_or_else(|| {
anyhow::anyhow!("Failed to open stdin for MCP server: {}", server.name())
})?;
let child_stdout = child.stdout.take().ok_or_else(|| {
anyhow::anyhow!("Failed to open stdout for MCP server: {}", server.name())
})?;
let writer = BufWriter::new(child_stdin);
let reader = BufReader::new(child_stdout);
let server_process = ServerProcess::Stdin {
child,
reader,
writer,
next_id: Arc::new(AtomicU64::new(1)),
is_shutdown: Arc::new(AtomicBool::new(false)),
};
// Register an empty in-flight slot and the process itself before the handshake,
// because the handshake looks both up by server name.
{
let mut in_flight_map = SERVER_IN_FLIGHT.write().unwrap();
in_flight_map.insert(
server.name().to_string(),
Arc::new(std::sync::Mutex::new(None)),
);
}
{
let mut processes = SERVER_PROCESSES.write().unwrap();
processes.insert(
server.name().to_string(),
Arc::new(Mutex::new(server_process)),
);
}
crate::mcp::server::clear_function_cache_for_server(server.name());
// Sanity check that the registration above is visible; the handle itself is unused.
let _process_arc = {
let processes = SERVER_PROCESSES.read().unwrap();
processes.get(server.name()).cloned().ok_or_else(|| {
anyhow::anyhow!("Server not found right after creation: {}", server.name())
})?
};
let init_result = initialize_stdin_server(server.name()).await;
if let Err(e) = &init_result {
crate::log_error!(
"Failed to initialize stdin MCP server '{}': {}",
server.name(),
e
);
// Undo the registration and kill the process; a cleanup failure is only logged.
if let Err(cleanup_err) = cleanup_server_process(server.name()) {
crate::log_debug!(
"Failed to cleanup server '{}' after init failure: {}",
server.name(),
cleanup_err
);
}
return Err(anyhow::anyhow!(
"Failed to initialize stdin MCP server '{}': {}",
server.name(),
e
));
}
let stdin_url = format!("stdin://{}", server.name());
Ok(stdin_url)
}
McpConnectionType::Builtin => Err(anyhow::anyhow!(
"Builtin servers should not use process management"
)),
}
}
/// Performs the MCP `initialize` handshake with a freshly started stdin server.
///
/// Sends an `initialize` request (id 1) carrying the client info, protocol version and the
/// current session's role/project, then sends `notifications/initialized`.
///
/// Returns `Err` if the request fails, if the response carries an `error`, or if it has
/// no `result`. A failure while sending the follow-up notification is ignored.
async fn initialize_stdin_server(server_name: &str) -> Result<()> {
    let (role, project) = get_session_context();

    let client_info = json!({
        "name": "octomind",
        "version": env!("CARGO_PKG_VERSION")
    });
    let capabilities = json!({
        "experimental": {
            "session": {
                "role": role,
                "project": project
            }
        }
    });
    let init_message = json!({
        "jsonrpc": "2.0",
        "id": 1,
        "method": "initialize",
        "params": {
            "clientInfo": client_info,
            "protocolVersion": "2025-03-26",
            "capabilities": capabilities
        }
    });

    let response = communicate_with_stdin_server(server_name, &init_message, 1, None).await?;
    match (response.get("error"), response.get("result")) {
        (Some(error), _) => {
            return Err(anyhow::anyhow!(
                "Server returned error during initialization: {}",
                error
            ));
        }
        (None, None) => {
            return Err(anyhow::anyhow!(
                "Server did not return a valid result during initialization"
            ));
        }
        (None, Some(_)) => {}
    }

    // Tell the server that the handshake is complete; the outcome is deliberately ignored.
    let initialized_message = json!({
        "jsonrpc": "2.0",
        "method": "notifications/initialized",
        "params": {}
    });
    let _ = try_communicate_with_stdin_server(server_name, &initialized_message, 0).await;
    Ok(())
}
/// Reports whether the server behind `url` is reachable.
///
/// `stdin://` URLs are always considered reachable. Any other URL is probed with an HTTP
/// GET and counts as reachable only when the response status is a success status.
async fn can_connect(url: &str) -> bool {
    if url.starts_with("stdin://") {
        return true;
    }

    reqwest::Client::new()
        .get(url)
        .send()
        .await
        .map(|response| response.status().is_success())
        .unwrap_or(false)
}
/// Resolves the URL used to reach `server`.
///
/// The configured URL wins when present; otherwise stdin servers get `stdin://<name>` and
/// everything else falls back to `http://localhost:8008`. Always returns `Ok`.
fn get_server_url(server: &McpServerConfig) -> Result<String> {
    let resolved = match server.url() {
        Some(url) => url.to_string(),
        None => match server.connection_type() {
            McpConnectionType::Stdin => format!("stdin://{}", server.name()),
            _ => "http://localhost:8008".to_string(),
        },
    };
    Ok(resolved)
}
/// Sends `message` to a stdin server and waits for its response, using a 15 second timeout.
///
/// Thin wrapper around `communicate_with_stdin_server_extended_timeout`; all arguments are
/// forwarded unchanged.
pub async fn communicate_with_stdin_server(
    server_name: &str,
    message: &Value,
    override_id: u64,
    cancellation_token: Option<tokio::sync::watch::Receiver<bool>>,
) -> Result<Value> {
    const DEFAULT_TIMEOUT_SECS: u64 = 15;

    let exchange = communicate_with_stdin_server_extended_timeout(
        server_name,
        message,
        override_id,
        DEFAULT_TIMEOUT_SECS,
        cancellation_token,
    );
    exchange.await
}
/// Sends one JSON-RPC message to a stdin-based MCP server and waits for its response.
///
/// The write and the blocking line-by-line read run on a `spawn_blocking` task while this
/// future races that task against a timeout and an optional cancellation signal.
///
/// # Arguments
/// * `server_name` - key of the server in the process registry.
/// * `message` - JSON message to send; when it is an object, an `id` is always written
///   into it and `jsonrpc: "2.0"` is added if missing.
/// * `override_id` - when greater than zero it is used as the request id and the response
///   id must match it; when zero an id is taken from the server's counter and the response
///   id is not checked.
/// * `timeout_seconds` - how long to wait for the blocking exchange.
/// * `cancellation_token` - optional watch channel; a `true` value cancels the call.
///
/// # Errors
/// Returns `Err` when the call is cancelled, the server or its in-flight slot is unknown,
/// the server is shut down or not stdin-based, a previous in-flight exchange does not
/// finish within 5 seconds, writing/reading fails, the response id mismatches, or the
/// timeout elapses.
///
/// # Side effects
/// Lines from the server that are not valid JSON are printed to stderr; messages with a
/// `method` but no `id` are forwarded as notifications. On broken pipe, on a stuck
/// previous exchange and on cancellation the server is marked `Dead`. On cancellation
/// (unix only) the server's process group is sent SIGTERM and then SIGKILL.
pub async fn communicate_with_stdin_server_extended_timeout(
    server_name: &str,
    message: &Value,
    override_id: u64,
    timeout_seconds: u64,
    cancellation_token: Option<tokio::sync::watch::Receiver<bool>>,
) -> Result<Value> {
    // Refuse to start any work if cancellation was already requested.
    if let Some(ref token) = cancellation_token {
        if *token.borrow() {
            return Err(anyhow::anyhow!("Operation cancelled before communication"));
        }
    }

    let server_process = {
        let processes = SERVER_PROCESSES
            .read()
            .map_err(|_| anyhow::anyhow!("Failed to acquire read lock on server processes"))?;
        processes
            .get(server_name)
            .cloned()
            .ok_or_else(|| anyhow::anyhow!("Server not found: {}", server_name))?
    };
    let in_flight_arc = {
        let in_flight_map = SERVER_IN_FLIGHT.read().unwrap();
        in_flight_map
            .get(server_name)
            .cloned()
            .ok_or_else(|| anyhow::anyhow!("No in-flight slot for server: {}", server_name))?
    };

    // If an earlier (cancelled) exchange is still running on the blocking pool, give it a
    // bounded amount of time to finish before touching the pipes again.
    let previous_handle = in_flight_arc.lock().unwrap().take();
    if let Some(handle) = previous_handle {
        let wait_secs = std::time::Duration::from_secs(5);
        if tokio::time::timeout(wait_secs, handle).await.is_err() {
            crate::log_debug!(
                "Previous in-flight task for server '{}' did not finish in time — marking dead for restart",
                server_name
            );
            let mut restart_info_guard = SERVER_RESTART_INFO.write().unwrap();
            let info = restart_info_guard
                .entry(server_name.to_string())
                .or_default();
            info.health_status = ServerHealth::Dead;
            return Err(anyhow::anyhow!(
                "Server '{}' previous operation timed out — will restart on next call",
                server_name
            ));
        }
    }

    // Prepare the final message and capture the child's pid while briefly holding the lock.
    let (final_message, request_id, child_pid) = {
        let mut process_guard = server_process
            .lock()
            .map_err(|_| anyhow::anyhow!("Failed to acquire lock on server process"))?;
        match &mut *process_guard {
            ServerProcess::Stdin {
                next_id,
                is_shutdown,
                child,
                ..
            } => {
                if is_shutdown.load(Ordering::SeqCst) {
                    return Err(anyhow::anyhow!("Server {} is shut down", server_name));
                }
                let actual_id = if override_id > 0 {
                    override_id
                } else {
                    next_id.fetch_add(1, Ordering::SeqCst)
                };
                let mut final_msg = message.clone();
                if let Some(obj) = final_msg.as_object_mut() {
                    obj.insert("id".to_string(), json!(actual_id));
                    if !obj.contains_key("jsonrpc") {
                        obj.insert("jsonrpc".to_string(), json!("2.0"));
                    }
                }
                let pid = child.id();
                (final_msg, actual_id, pid)
            }
            _ => {
                return Err(anyhow::anyhow!(
                    "Server {} is not a stdin-based server",
                    server_name
                ))
            }
        }
    };

    let server_name_for_error = server_name.to_string();
    let server_name_for_closure = server_name.to_string();
    let final_message_clone = final_message.clone();
    let request_id_clone = request_id;

    // The pipe I/O is synchronous, so it runs on the blocking pool. The task holds the
    // process mutex for the whole write/read exchange.
    let mut blocking_handle = tokio::task::spawn_blocking(move || {
        let mut process = server_process
            .lock()
            .map_err(|_| anyhow::anyhow!("Failed to acquire lock on server process"))?;
        match &mut *process {
            ServerProcess::Stdin {
                writer,
                reader,
                is_shutdown,
                ..
            } => {
                if is_shutdown.load(Ordering::SeqCst) {
                    return Err(anyhow::anyhow!(
                        "Server {} is shut down",
                        server_name_for_closure
                    ));
                }

                // One message per line: strip trailing whitespace and terminate with '\n'.
                let mut message_str = serde_json::to_string(&final_message_clone)?
                    .trim_end()
                    .to_string();
                message_str.push('\n');

                match writer.write_all(message_str.as_bytes()) {
                    Ok(_) => {}
                    Err(e) => {
                        if e.kind() == std::io::ErrorKind::BrokenPipe {
                            {
                                let mut restart_info_guard = SERVER_RESTART_INFO.write().unwrap();
                                let info = restart_info_guard
                                    .entry(server_name_for_closure.clone())
                                    .or_default();
                                info.health_status = ServerHealth::Dead;
                            }
                            crate::log_debug!("Broken pipe detected on write for server '{}', marking for cleanup", server_name_for_closure);
                            return Err(anyhow::anyhow!(
                                "Server '{}' appears to have died (broken pipe on write). Will attempt restart on next call.",
                                server_name_for_closure
                            ));
                        }
                        return Err(anyhow::anyhow!("Failed to write to stdin: {}", e));
                    }
                }
                match writer.flush() {
                    Ok(_) => {}
                    Err(e) => {
                        if e.kind() == std::io::ErrorKind::BrokenPipe {
                            {
                                let mut restart_info_guard = SERVER_RESTART_INFO.write().unwrap();
                                let info = restart_info_guard
                                    .entry(server_name_for_closure.clone())
                                    .or_default();
                                info.health_status = ServerHealth::Dead;
                            }
                            crate::log_debug!("Broken pipe detected on flush for server '{}', marking for cleanup", server_name_for_closure);
                            return Err(anyhow::anyhow!(
                                "Server '{}' appears to have died (broken pipe on flush). Will attempt restart on next call.",
                                server_name_for_closure
                            ));
                        }
                        return Err(anyhow::anyhow!("Failed to flush stdin: {}", e));
                    }
                }

                // Read lines until one is a JSON message that is not a notification.
                let response = loop {
                    let mut response_str = String::new();
                    let read_result = reader
                        .read_line(&mut response_str)
                        .map_err(|e| anyhow::anyhow!("Failed to read from stdout: {}", e))?;
                    if read_result == 0 {
                        // Zero bytes read means end of stream.
                        return Err(anyhow::anyhow!(
                            "Server closed connection while reading response"
                        ));
                    }
                    let msg: Value = match serde_json::from_str(&response_str) {
                        Ok(v) => v,
                        Err(_) => {
                            // Not JSON: surface the line on stderr and keep reading.
                            let trimmed = response_str.trim();
                            if !trimmed.is_empty() {
                                eprintln!(
                                    "⚠️ MCP '{}' prints: {}",
                                    server_name_for_closure, trimmed
                                );
                            }
                            continue;
                        }
                    };
                    // A message with a method but no id is a notification, not our response.
                    if msg.get("method").is_some() && msg.get("id").is_none() {
                        let method = msg
                            .get("method")
                            .and_then(|m| m.as_str())
                            .unwrap_or("unknown");
                        let params = msg
                            .get("params")
                            .cloned()
                            .unwrap_or(serde_json::Value::Null);
                        emit_notification(&server_name_for_closure, method, &params);
                        continue;
                    }
                    break msg;
                };

                // The id is only enforced when the caller pinned it via `override_id`.
                let response_id = response.get("id").and_then(|id| id.as_u64()).unwrap_or(0);
                if response_id != request_id_clone && override_id > 0 {
                    return Err(anyhow::anyhow!(
                        "Response ID {} does not match request ID {}",
                        response_id,
                        request_id_clone
                    ));
                }
                Ok(response)
            }
            ServerProcess::Http(_) => Err(anyhow::anyhow!(
                "Server {} is not a stdin-based server",
                server_name_for_closure
            )),
        }
    });

    // Resolves once the token reports `true` or its sender is dropped; never resolves
    // when no token was supplied.
    let cancellation_token_clone = cancellation_token.clone();
    let cancellation_future = async move {
        if let Some(mut token) = cancellation_token_clone {
            while !*token.borrow() {
                if token.changed().await.is_err() {
                    break;
                }
            }
        } else {
            std::future::pending::<()>().await;
        }
    };

    // `select!` evaluates every branch's future expression before polling any of them, so
    // the join handle is only *lent* to the timeout branch. Moving it there would leave
    // nothing for the cancellation branch to park in the in-flight slot.
    tokio::select! {
        result = tokio::time::timeout(
            std::time::Duration::from_secs(timeout_seconds),
            &mut blocking_handle,
        ) => {
            *in_flight_arc.lock().unwrap() = None;
            match result {
                Ok(task_result) => task_result?,
                Err(_) => Err(anyhow::anyhow!("Timeout ({} seconds) communicating with stdin server: {}", timeout_seconds, server_name_for_error))
            }
        },
        _ = cancellation_future => {
            // Park the still-running task so the next call can wait for it to finish.
            *in_flight_arc.lock().unwrap() = Some(blocking_handle);
            #[cfg(unix)]
            {
                // A negative pid addresses the whole process group led by the child.
                let pgid = child_pid as libc::pid_t;
                unsafe {
                    libc::kill(-pgid, libc::SIGTERM);
                }
                crate::log_debug!(
                    "Sent SIGTERM to process group {} (server '{}') on cancellation",
                    pgid,
                    server_name_for_error
                );
                // NOTE(review): this blocks the executor thread for 200 ms. It is kept
                // synchronous so the SIGKILL below cannot be skipped if the caller drops
                // this future while waiting — confirm that trade-off is wanted.
                std::thread::sleep(std::time::Duration::from_millis(200));
                unsafe {
                    libc::kill(-pgid, libc::SIGKILL);
                }
                crate::log_debug!(
                    "Sent SIGKILL to process group {} (server '{}') on cancellation",
                    pgid,
                    server_name_for_error
                );
            }
            {
                let mut restart_info_guard = SERVER_RESTART_INFO.write().unwrap();
                let info = restart_info_guard
                    .entry(server_name_for_error.clone())
                    .or_default();
                info.health_status = ServerHealth::Dead;
            }
            Err(anyhow::anyhow!("Operation cancelled while communicating with server: {}", server_name_for_error))
        }
    }
}
/// Asks a stdin server for its tools (`tools/list`) and converts them to `McpFunction`s.
///
/// Tools are filtered by the server's configured tool patterns (an empty pattern list
/// allows everything). A tool without a `name` is skipped. A tool without a `description`
/// is kept with an empty description, since the field is optional in the MCP tool schema.
/// A missing `inputSchema` becomes an empty JSON object.
///
/// # Errors
/// Returns `Err` only when the request itself fails. An `error` response from the server
/// is logged and yields an empty list; a response without `result` is logged and also
/// yields an empty list.
pub async fn get_stdin_server_functions(server: &McpServerConfig) -> Result<Vec<McpFunction>> {
    let message = json!({
        "jsonrpc": "2.0",
        "id": 1,
        "method": "tools/list",
        "params": {}
    });
    let response = communicate_with_stdin_server(server.name(), &message, 1, None).await?;

    let mut functions = Vec::new();
    if let Some(error) = response.get("error") {
        crate::log_error!(
            "Warning: Server returned error during tools/list: {}",
            error
        );
        return Ok(functions);
    }

    if let Some(result) = response.get("result") {
        if let Some(tools) = result.get("tools").and_then(|t| t.as_array()) {
            for tool in tools {
                // A tool cannot be called without a name, so nameless entries are skipped.
                let name = match tool.get("name").and_then(|n| n.as_str()) {
                    Some(name) => name,
                    None => continue,
                };
                if !server.tools().is_empty()
                    && !crate::mcp::is_tool_allowed_by_patterns(name, server.tools())
                {
                    continue;
                }
                let description = tool
                    .get("description")
                    .and_then(|d| d.as_str())
                    .unwrap_or("");
                let parameters = tool
                    .get("inputSchema")
                    .cloned()
                    .unwrap_or_else(|| json!({}));
                functions.push(McpFunction {
                    name: name.to_string(),
                    description: description.to_string(),
                    parameters,
                });
            }
        }
    } else {
        crate::log_debug!("Invalid response format from tools/list: {}", response);
    }
    Ok(functions)
}
/// Executes a tool call (`tools/call`) on a stdin server and wraps the outcome.
///
/// The request uses the server's configured timeout and honours `cancellation_token`.
/// Every path shown returns `Ok`: transport failures and JSON-RPC `error` responses are
/// converted into `McpToolResult::error`, and a successful response is converted into
/// `McpToolResult::success` with the extracted content of `result` (or of the string
/// `"No result"` when the response has no `result`).
pub async fn execute_stdin_tool_call(
    call: &McpToolCall,
    server: &McpServerConfig,
    cancellation_token: Option<tokio::sync::watch::Receiver<bool>>,
) -> Result<McpToolResult> {
    let message = json!({
        "jsonrpc": "2.0",
        "id": 1,
        "method": "tools/call",
        "params": {
            "name": call.tool_name,
            "arguments": call.parameters
        }
    });

    let response = match communicate_with_stdin_server_extended_timeout(
        server.name(),
        &message,
        1,
        server.timeout_seconds(),
        cancellation_token,
    )
    .await
    {
        Ok(resp) => resp,
        Err(e) => {
            crate::log_error!("Error executing tool call '{}': {}", call.tool_name, e);
            return Ok(McpToolResult::error(
                call.tool_name.clone(),
                call.tool_id.clone(),
                format!("Error executing tool: {}", e),
            ));
        }
    };

    // A JSON-RPC error object is reported as "<message> (code: <code>)", with fallbacks
    // for a missing message or code.
    if let Some(error) = response.get("error") {
        let error_message = error
            .get("message")
            .and_then(|m| m.as_str())
            .unwrap_or("Unknown error");
        let error_code = error.get("code").and_then(|c| c.as_i64()).unwrap_or(-1);
        return Ok(McpToolResult::error(
            call.tool_name.clone(),
            call.tool_id.clone(),
            format!("{} (code: {})", error_message, error_code),
        ));
    }

    let output = response
        .get("result")
        .cloned()
        .unwrap_or_else(|| json!("No result"));
    Ok(McpToolResult::success(
        call.tool_name.clone(),
        call.tool_id.clone(),
        crate::mcp::extract_mcp_content(&output),
    ))
}
/// Stops every registered server and clears all per-server registries.
///
/// Each process is killed if its lock can be taken without blocking; a busy process is
/// only logged and skipped. Afterwards the process registry, the in-flight slots, the
/// function cache and the restart mutexes are all cleared. Always returns `Ok(())`.
pub fn stop_all_servers() -> Result<()> {
    let mut registry = SERVER_PROCESSES.write().unwrap();
    for (name, process_arc) in registry.drain() {
        crate::log_debug!("Stopping MCP server: {}", name);
        match process_arc.try_lock() {
            Err(_) => {
                crate::log_debug!("Could not acquire lock for server '{}', may be busy", name);
            }
            Ok(mut process) => {
                if let Err(e) = process.kill() {
                    crate::log_error!("Failed to kill MCP server '{}': {}", name, e);
                }
            }
        }
    }

    SERVER_IN_FLIGHT.write().unwrap().clear();
    crate::mcp::server::clear_all_function_cache();

    SERVER_RESTART_MUTEXES.write().unwrap().clear();
    crate::log_debug!("Cleared all server restart mutexes");
    Ok(())
}
/// Removes one server from the registry and tears down its associated state.
///
/// The process is killed if its lock can be taken without blocking (otherwise this is only
/// logged). The server's function cache, in-flight slot and restart mutex are removed.
///
/// # Errors
/// Returns `Err` when `server_name` is not present in the process registry.
pub fn cleanup_server_process(server_name: &str) -> Result<()> {
    let mut registry = SERVER_PROCESSES.write().unwrap();
    let process_arc = match registry.remove(server_name) {
        Some(found) => found,
        None => {
            return Err(anyhow::anyhow!(
                "Server '{}' not found in registry",
                server_name
            ));
        }
    };

    match process_arc.try_lock() {
        Err(_) => {
            crate::log_debug!(
                "Could not acquire lock for server '{}' during cleanup",
                server_name
            );
        }
        Ok(mut process) => {
            crate::log_debug!("Cleaning up server process '{}'", server_name);
            if let Err(e) = process.kill() {
                crate::log_debug!("Failed to kill server process '{}': {}", server_name, e);
            }
        }
    }

    crate::mcp::server::clear_function_cache_for_server(server_name);
    SERVER_IN_FLIGHT.write().unwrap().remove(server_name);
    cleanup_server_restart_mutex(server_name);
    crate::log_debug!("Server '{}' removed from registry", server_name);
    Ok(())
}
/// Checks whether the registered process for `server_name` is still alive and records the
/// result in the restart bookkeeping.
///
/// For a registered server the health status becomes `Running` or `Dead` and the check
/// time is stamped. For an unknown server only the check time is stamped and `false` is
/// returned. An error while querying the process counts as "not alive".
pub fn is_server_running(server_name: &str) -> bool {
    let registry = SERVER_PROCESSES.read().unwrap();
    let process_arc = match registry.get(server_name) {
        Some(found) => found,
        None => {
            SERVER_RESTART_INFO
                .write()
                .unwrap()
                .entry(server_name.to_string())
                .or_default()
                .last_health_check = Some(SystemTime::now());
            return false;
        }
    };

    let mut process = process_arc.lock().unwrap();
    // `Ok(None)` from `try_wait` is the only answer that means "still running".
    let is_alive = matches!(process.try_wait(), Ok(None));

    let mut tracker = SERVER_RESTART_INFO.write().unwrap();
    let info = tracker.entry(server_name.to_string()).or_default();
    info.health_status = if is_alive {
        ServerHealth::Running
    } else {
        ServerHealth::Dead
    };
    info.last_health_check = Some(SystemTime::now());
    is_alive
}
/// Returns the last recorded health status for `server_name`, or `ServerHealth::Dead`
/// when nothing has been recorded for it.
pub fn get_server_health(server_name: &str) -> ServerHealth {
    let tracker = SERVER_RESTART_INFO.read().unwrap();
    match tracker.get(server_name) {
        Some(info) => info.health_status,
        None => ServerHealth::Dead,
    }
}
pub fn get_server_restart_info(server_name: &str) -> ServerRestartInfo {
let restart_info_guard = SERVER_RESTART_INFO.read().unwrap();
restart_info_guard
.get(server_name)
.cloned()
.unwrap_or_default()
}
/// Clears the restart and failure counters of a tracked server and marks it `Dead`.
///
/// # Errors
/// Returns `Err` when `server_name` has no entry in the restart bookkeeping.
pub fn reset_server_failure_state(server_name: &str) -> Result<()> {
    let mut tracker = SERVER_RESTART_INFO.write().unwrap();
    let info = tracker.get_mut(server_name).ok_or_else(|| {
        anyhow::anyhow!("Server '{}' not found in restart tracking", server_name)
    })?;

    info.restart_count = 0;
    info.consecutive_failures = 0;
    info.health_status = ServerHealth::Dead;
    crate::log_debug!("Reset failure state for server '{}'", server_name);
    Ok(())
}
/// Runs a liveness check on every registered server and returns the result per server name.
///
/// Each server is reported as `Running` or `Dead`; the checks also update the restart
/// bookkeeping through `is_server_running`.
pub async fn perform_health_check_all_servers() -> HashMap<String, ServerHealth> {
    // Snapshot the names first so the registry lock is released before the checks run.
    let server_names: Vec<String> = SERVER_PROCESSES.read().unwrap().keys().cloned().collect();

    let mut report = HashMap::with_capacity(server_names.len());
    for server_name in server_names {
        let health = if is_server_running(&server_name) {
            ServerHealth::Running
        } else {
            ServerHealth::Dead
        };
        crate::log_debug!("Health check: Server '{}' is {:?}", server_name, health);
        report.insert(server_name, health);
    }
    report
}
/// Returns, for every tracked server, its recorded health status together with a copy of
/// its restart bookkeeping.
///
/// The health status is read from the entry already held under the read lock instead of
/// re-acquiring the same `RwLock` per server: a second read acquisition on a thread that
/// already holds one may deadlock (or panic) when a writer is waiting.
pub fn get_server_status_report() -> HashMap<String, (ServerHealth, ServerRestartInfo)> {
    let restart_info_guard = SERVER_RESTART_INFO.read().unwrap();
    let report = restart_info_guard
        .iter()
        .map(|(server_name, info)| (server_name.clone(), (info.health_status, info.clone())))
        .collect();
    report
}
/// Best-effort variant of `communicate_with_stdin_server`: the response is discarded and a
/// failure is only logged. Always returns `Ok(())`.
async fn try_communicate_with_stdin_server(
    server_name: &str,
    message: &Value,
    override_id: u64,
) -> Result<()> {
    match communicate_with_stdin_server(server_name, message, override_id, None).await {
        Ok(_) => {}
        Err(e) => {
            crate::log_error!("Warning: Error sending notification to MCP server: {}", e);
        }
    }
    Ok(())
}