use super::lifecycle::spawn_proxy_if_needed;
use crate::{core, mcp_stdio, tools};
use anyhow::Result;
pub(super) fn run_mcp_server() -> Result<()> {
use rmcp::ServiceExt;
std::env::set_var("LEAN_CTX_MCP_SERVER", "1");
crate::core::startup_guard::crash_loop_backoff(crate::core::startup_guard::MCP_PROCESS_NAME);
cleanup_orphan_mcp_processes();
let startup_lock = crate::core::startup_guard::try_acquire_lock(
"mcp-startup",
std::time::Duration::from_secs(3),
std::time::Duration::from_secs(30),
);
let parallelism = std::thread::available_parallelism().map_or(2, std::num::NonZeroUsize::get);
let worker_threads = resolve_worker_threads(parallelism);
let max_blocking_threads = (worker_threads * 4).clamp(8, 32);
let rt = tokio::runtime::Builder::new_multi_thread()
.worker_threads(worker_threads)
.max_blocking_threads(max_blocking_threads)
.enable_all()
.build()?;
let server = tools::create_server();
drop(startup_lock);
spawn_proxy_if_needed();
rt.block_on(async {
core::logging::init_mcp_logging();
core::protocol::set_mcp_context(true);
tracing::info!(
"lean-ctx v{} MCP server starting",
env!("CARGO_PKG_VERSION")
);
spawn_parent_watchdog();
let transport =
mcp_stdio::HybridStdioTransport::new_server(tokio::io::stdin(), tokio::io::stdout());
let server_handle = server.clone();
let service = match server.serve(transport).await {
Ok(s) => s,
Err(e) => {
let msg = e.to_string();
if msg.contains("expect initialized")
|| msg.contains("context canceled")
|| msg.contains("broken pipe")
{
tracing::debug!("Client disconnected before init: {msg}");
return Ok(());
}
return Err(e.into());
}
};
match service.waiting().await {
Ok(reason) => {
tracing::info!("MCP server stopped: {reason:?}");
}
Err(e) => {
let msg = e.to_string();
if msg.contains("broken pipe")
|| msg.contains("connection reset")
|| msg.contains("context canceled")
{
tracing::info!("MCP server: transport closed ({msg})");
} else {
tracing::error!("MCP server error: {msg}");
}
}
}
server_handle.shutdown().await;
core::stats::flush();
core::heatmap::flush();
core::mode_predictor::ModePredictor::flush();
core::feedback::FeedbackStore::flush();
Ok(())
})
}
/// Terminates other `lean-ctx` processes that `is_orphan_mcp` classifies as
/// orphaned MCP servers.
///
/// The current process is always skipped. The result of the termination call
/// is ignored. On non-Unix targets this function does nothing.
fn cleanup_orphan_mcp_processes() {
    #[cfg(unix)]
    {
        let own_pid = std::process::id();

        crate::ipc::process::find_pids_by_name("lean-ctx")
            .into_iter()
            // Skip ourselves first so `ps` is never spawned for our own PID.
            .filter(|&candidate| candidate != own_pid && is_orphan_mcp(candidate))
            .for_each(|pid| {
                tracing::info!("[orphan-cleanup] killing orphan MCP process {pid} (parent=1)");
                let _ = crate::ipc::process::terminate_gracefully(pid);
            });
    }
}
/// Reports whether `pid` looks like an orphaned lean-ctx MCP server.
///
/// Runs `ps -o ppid=,command= -p <pid>` and inspects the output: the first
/// whitespace-separated token is read as the parent PID, and the whole line is
/// searched for command keywords.
///
/// Returns `true` only when the parent PID is 0 or 1 and the line contains
/// `serve` or `mcp`, or does not contain `daemon`.
///
/// Returns `false` when `ps` cannot be run, prints nothing, or prints a parent
/// PID that does not parse. An unknown parent is never treated as "orphaned",
/// because the caller terminates every process this function accepts.
#[cfg(unix)]
fn is_orphan_mcp(pid: u32) -> bool {
    let Ok(output) = std::process::Command::new("ps")
        .args(["-o", "ppid=,command=", "-p", &pid.to_string()])
        .output()
    else {
        return false;
    };

    let text = String::from_utf8_lossy(&output.stdout);
    let line = text.trim();

    // Empty output yields no first token; garbage yields a parse error.
    // Both mean "cannot tell", which must not count as an orphan.
    let Some(Ok(ppid)) = line.split_whitespace().next().map(str::parse::<u32>) else {
        return false;
    };

    ppid <= 1 && (line.contains("serve") || line.contains("mcp") || !line.contains("daemon"))
}
fn spawn_parent_watchdog() {
#[cfg(unix)]
{
let ppid = unsafe { libc::getppid() } as u32;
if ppid <= 1 {
return;
}
std::thread::Builder::new()
.name("parent-watchdog".into())
.spawn(move || {
loop {
std::thread::sleep(std::time::Duration::from_secs(5));
let current_ppid = unsafe { libc::getppid() } as u32;
if current_ppid != ppid || current_ppid <= 1 {
tracing::info!(
"[parent-watchdog] parent PID changed ({ppid} → {current_ppid}), \
IDE likely closed — exiting to prevent orphan"
);
core::stats::flush();
core::heatmap::flush();
std::process::exit(0);
}
}
})
.ok();
}
}
/// Chooses the number of Tokio worker threads for the MCP runtime.
///
/// The `LEAN_CTX_WORKER_THREADS` environment variable takes precedence when it
/// holds a positive integer; surrounding whitespace is ignored. If it is
/// unset, unparseable or zero, the result is `parallelism` clamped to `1..=4`.
///
/// Zero is rejected because the Tokio runtime builder panics when asked for
/// zero worker threads.
pub(super) fn resolve_worker_threads(parallelism: usize) -> usize {
    std::env::var("LEAN_CTX_WORKER_THREADS")
        .ok()
        .and_then(|raw| raw.trim().parse::<usize>().ok())
        .filter(|&threads| threads > 0)
        .unwrap_or_else(|| parallelism.clamp(1, 4))
}