use std::collections::HashMap;
use std::path::Path;
use std::path::PathBuf;
use std::sync::Arc;
use std::sync::Mutex as StdMutex;
use std::sync::Weak;
use std::time::Duration;
use std::time::Instant;
use anyhow::Result;
use tokio::sync::Mutex;
/// Idle TTL to apply to a client serving `language`.
///
/// Kotlin is pinned to two hours regardless of `global`; every other
/// language uses `global` unchanged.
fn ttl_for_language(language: &str, global: Duration) -> Duration {
    const KOTLIN_TTL_SECS: u64 = 2 * 3600;
    if language == "kotlin" {
        Duration::from_secs(KOTLIN_TTL_SECS)
    } else {
        global
    }
}
/// Relative cost of restarting the server for `language`.
///
/// Returns `2` for Kotlin and Java and `1` for everything else. LRU eviction
/// sorts by this tier first, so lower tiers are evicted before higher ones.
fn restart_cost_tier(language: &str) -> u8 {
    if matches!(language, "kotlin" | "java") {
        2
    } else {
        1
    }
}
use super::client::{LspClient, LspServerConfig};
use super::servers;
/// Identifies one language-server instance: a language plus the project root
/// it serves. Used as the key of every per-client map in [`LspManager`].
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct LspKey {
    /// Language identifier, e.g. `"rust"` or `"kotlin"`.
    pub language: String,
    /// Root directory of the project the server is attached to.
    pub project_root: PathBuf,
}

impl LspKey {
    /// Builds a key by copying `language` and `project_root` into owned values.
    pub fn new(language: &str, project_root: &Path) -> Self {
        let language = language.to_owned();
        let project_root = PathBuf::from(project_root);
        Self {
            language,
            project_root,
        }
    }
}
/// Owns the language-server clients of this process, keyed by [`LspKey`]
/// (language + project root), together with the bookkeeping needed for
/// LRU/idle eviction, single-flight startup and a startup circuit breaker.
pub struct LspManager {
/// Clients that have been started (or connected via mux) and cached.
/// Entries may be dead; callers check `is_alive()` before handing one out.
clients: Mutex<HashMap<LspKey, Arc<LspClient>>>,
/// Last time each key was handed out or (re)started; drives LRU and idle eviction.
last_used: Mutex<HashMap<LspKey, Instant>>,
/// Keys with a startup in flight. Waiters clone the receiver and wait for
/// a `Some(_)` value, which `do_start` sends when the attempt finishes.
starting: StdMutex<HashMap<LspKey, tokio::sync::watch::Receiver<Option<bool>>>>,
/// Capacity at which `evict_lru_if_at_capacity` starts evicting.
max_clients: usize,
/// Idle TTL configured for this manager (set by the `new_arc*` constructors).
idle_ttl: Duration,
/// Row id of the usage-db startup event per key, held until the first
/// response time is recorded by `record_first_response_inner`.
pending_first_response: StdMutex<HashMap<LspKey, i64>>,
/// Why the next start of a key is happening (`"lru_evicted"`, `"idle_evicted"`);
/// `do_start` falls back to `"new_session"` when no entry exists.
pub(crate) pending_reason: StdMutex<HashMap<LspKey, String>>,
/// Root passed to `crate::usage::db::open_db` when recording LSP events;
/// `None` disables that recording.
project_root: Option<std::path::PathBuf>,
/// Circuit-breaker state per key: (failure count, time of the first failure in the window).
startup_failures: StdMutex<HashMap<LspKey, (usize, Instant)>>,
/// Per key, the instant until which startup failures are not counted
/// toward the circuit breaker (set after a successful start).
cold_start_until: StdMutex<HashMap<LspKey, Instant>>,
/// Test-only override that takes precedence over `project_root`.
#[cfg(test)]
project_root_for_test: Option<std::path::PathBuf>,
}
impl Default for LspManager {
fn default() -> Self {
Self::new()
}
}
/// Renders the key as `language@project_root`, e.g. `rust@/my/project`.
impl std::fmt::Display for LspKey {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self {
            language,
            project_root,
        } = self;
        let root = project_root.display();
        write!(f, "{language}@{root}")
    }
}
/// Drop guard that removes `key` from the in-flight `starting` map.
///
/// Because removal happens in `Drop`, the entry is cleared on every exit
/// path of the owning future — success, error, or cancellation.
struct StartingCleanup<'a> {
    /// The manager's in-flight startup map.
    starting: &'a StdMutex<HashMap<LspKey, tokio::sync::watch::Receiver<Option<bool>>>>,
    /// The key whose entry is removed on drop.
    key: LspKey,
}

impl Drop for StartingCleanup<'_> {
    fn drop(&mut self) {
        // Recover from a poisoned mutex instead of skipping the cleanup:
        // every other lock site in this file does the same, and leaving a
        // stale entry behind would make later callers wait on a dead channel.
        self.starting
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner())
            .remove(&self.key);
    }
}
/// Maximum number of trailing stderr lines kept from a spawned mux child;
/// the retained tail is what failure reports quote.
#[cfg(unix)]
const MUX_STDERR_TAIL_LINES: usize = 40;
/// Returns `true` when `detail` contains any of the substrings this module
/// treats as evidence that the LSP index (or the mux lock) is held by
/// another process.
#[cfg(unix)]
pub(super) fn mux_failure_is_index_contention(detail: &str) -> bool {
    const CONTENTION_MARKERS: [&str; 3] = [
        "Resource temporarily unavailable",
        "RocksDBException",
        "another mux instance holds the lock",
    ];
    CONTENTION_MARKERS
        .iter()
        .any(|marker| detail.contains(*marker))
}
/// Builds the `(message, hint)` pair reported when the mux child fails to start.
///
/// The message summarises the failure using, in order of preference, the
/// trimmed stderr tail, the trimmed stdout line, or a fixed placeholder when
/// both are blank. The hint is contention-specific when either stream matches
/// [`mux_failure_is_index_contention`], and generic otherwise.
#[cfg(unix)]
pub(super) fn mux_failure_report(stdout_line: &str, stderr_tail: &[String]) -> (String, String) {
    let detail = stderr_tail.join("\n");
    let stderr_text = detail.trim();
    let stdout_text = stdout_line.trim();
    let summary = match (stderr_text.is_empty(), stdout_text.is_empty()) {
        (false, _) => stderr_text,
        (true, false) => stdout_text,
        (true, true) => "(no diagnostic output — mux exited silently)",
    };
    let message = format!("mux process failed to start: {summary}");

    // Contention is detected on the untrimmed stdout line plus the full stderr tail.
    let combined = format!("{stdout_line}\n{detail}");
    let hint = if mux_failure_is_index_contention(&combined) {
        "The workspace's LSP index is locked by another running server (RocksDB/mux lock). \
         A stale or concurrent LSP for this workspace holds it. Close other sessions on this \
         workspace, or locate the holder with `fuser <lsp-home>/.../rocks/*/LOCK` and stop it, \
         then retry."
    } else {
        "Check that another codescout mux isn't already running for this workspace and that \
         the lock-file directory is writable; the detail above is the mux child's own stderr."
    };
    (message, hint.to_string())
}
/// Reports whether another process holds a conflicting POSIX (`fcntl`) lock
/// on the file at `path`.
///
/// Returns `false` when the file cannot be opened read/write or when the
/// query itself fails.
///
/// The probe uses `F_GETLK`, which only *asks* whether a whole-file write
/// lock could be placed. It never acquires the lock, so it cannot make a
/// legitimate locker racing with this check fail with `EAGAIN`.
///
/// NOTE: POSIX record locks are per-process, so locks held by *this* process
/// are never reported, and closing the probe's descriptor on return releases
/// any such lock this process holds on the same file.
#[cfg(unix)]
fn posix_write_lock_is_held(path: &std::path::Path) -> bool {
    use std::os::unix::io::AsRawFd;
    let Ok(file) = std::fs::OpenOptions::new()
        .read(true)
        .write(true)
        .open(path)
    else {
        return false;
    };

    // SAFETY: `libc::flock` is a plain C struct of integer fields, for which
    // the all-zero bit pattern is a valid value.
    let mut probe: libc::flock = unsafe { std::mem::zeroed() };
    probe.l_type = libc::F_WRLCK as libc::c_short;
    probe.l_whence = libc::SEEK_SET as libc::c_short;
    probe.l_start = 0;
    // A length of 0 means "through end of file", i.e. the whole file.
    probe.l_len = 0;

    // SAFETY: the descriptor stays open for the lifetime of `file`, and
    // `probe` is a valid, writable `flock` that the kernel fills in.
    let rc = unsafe { libc::fcntl(file.as_raw_fd(), libc::F_GETLK, &mut probe) };
    if rc == -1 {
        return false;
    }
    // The kernel rewrites `l_type` to `F_UNLCK` when nothing conflicts;
    // otherwise it describes the lock that is in the way.
    probe.l_type != libc::F_UNLCK as libc::c_short
}
/// Returns `true` when `language` is `"kotlin"` and any file named `LOCK`
/// under this workspace's analyzer directory is currently write-locked.
///
/// Any other language, or a missing analyzer directory, yields `false`.
/// Directory entries that cannot be read during the walk are skipped.
#[cfg(unix)]
fn kotlin_index_lock_held(language: &str, workspace_root: &std::path::Path) -> bool {
    let is_kotlin = language == "kotlin";
    if !is_kotlin {
        return false;
    }

    let ws_hash = crate::lsp::mux::workspace_hash(workspace_root);
    let analyzer_home = crate::lsp::servers::kotlin_analyzer_home(&ws_hash);
    let analyzer_dir = analyzer_home.join(".config/JetBrains/analyzer");
    if !analyzer_dir.exists() {
        return false;
    }

    for entry in walkdir::WalkDir::new(&analyzer_dir) {
        let Ok(entry) = entry else { continue };
        if entry.file_name() == "LOCK" && posix_write_lock_is_held(entry.path()) {
            return true;
        }
    }
    false
}
/// Lists the PIDs (other than this process) that have an open descriptor
/// whose `/proc/<pid>/fd/*` link resolves to `canon` or `lock_path`.
///
/// Returns an empty list when `/proc` cannot be read. Processes whose `fd`
/// directory is unreadable are skipped. Each PID appears at most once.
#[cfg(unix)]
fn pids_holding_fd_on(lock_path: &std::path::Path, canon: &std::path::Path) -> Vec<i32> {
    let Ok(proc_dir) = std::fs::read_dir("/proc") else {
        return Vec::new();
    };
    let own_pid = std::process::id() as i32;

    proc_dir
        .flatten()
        // Keep only numeric entries (process directories) that are not us.
        .filter_map(|entry| {
            let pid = entry.file_name().to_str()?.parse::<i32>().ok()?;
            (pid != own_pid).then_some((pid, entry.path().join("fd")))
        })
        // A process is a holder if any of its fd links points at the lock file.
        .filter(|(_, fd_dir)| {
            std::fs::read_dir(fd_dir).is_ok_and(|fds| {
                fds.flatten()
                    .filter_map(|fd| std::fs::read_link(fd.path()).ok())
                    .any(|target| target.as_path() == canon || target.as_path() == lock_path)
            })
        })
        .map(|(pid, _)| pid)
        .collect()
}
/// Signals every other process that holds an open descriptor on a locked
/// `lock_path`: `SIGTERM` first, then — after a two-second pause — `SIGKILL`
/// for any that still hold it.
///
/// Returns `Ok(false)` when the lock is not held or no holder could be
/// identified, and `Ok(true)` once signals have been sent. The result of
/// `kill` itself is not inspected.
#[cfg(unix)]
async fn reap_holders_of_lock(lock_path: &std::path::Path) -> anyhow::Result<bool> {
    let lock_is_held = posix_write_lock_is_held(lock_path);
    if !lock_is_held {
        return Ok(false);
    }

    // Match fd links against both the canonical and the as-given path.
    let canon = match std::fs::canonicalize(lock_path) {
        Ok(resolved) => resolved,
        Err(_) => lock_path.to_path_buf(),
    };
    let holders = pids_holding_fd_on(lock_path, &canon);
    if holders.is_empty() {
        return Ok(false);
    }

    holders.iter().for_each(|&pid| {
        unsafe {
            libc::kill(pid, libc::SIGTERM);
        }
        tracing::warn!("reaped orphan index-lock holder pid={pid} (SIGTERM)");
    });

    tokio::time::sleep(std::time::Duration::from_secs(2)).await;

    let survivors = pids_holding_fd_on(lock_path, &canon);
    survivors.into_iter().for_each(|pid| {
        unsafe {
            libc::kill(pid, libc::SIGKILL);
        }
        tracing::warn!("orphan index-lock holder pid={pid} survived SIGTERM → SIGKILL");
    });
    Ok(true)
}
/// For Kotlin workspaces, walks the analyzer directory and reaps the holders
/// of every file named `LOCK` found there.
///
/// Returns `Ok(true)` if at least one lock had holders that were signalled,
/// and `Ok(false)` for non-Kotlin languages, a missing analyzer directory, or
/// when nothing needed reaping. Unreadable directory entries are skipped.
#[cfg(unix)]
async fn reap_orphan_index_holder(
    language: &str,
    workspace_root: &std::path::Path,
) -> anyhow::Result<bool> {
    if language != "kotlin" {
        return Ok(false);
    }

    let ws_hash = crate::lsp::mux::workspace_hash(workspace_root);
    let analyzer_home = crate::lsp::servers::kotlin_analyzer_home(&ws_hash);
    let analyzer_dir = analyzer_home.join(".config/JetBrains/analyzer");
    if !analyzer_dir.exists() {
        return Ok(false);
    }

    let lock_files = walkdir::WalkDir::new(&analyzer_dir)
        .into_iter()
        .filter_map(|entry| entry.ok())
        .filter(|entry| entry.file_name() == "LOCK");

    let mut reaped_any = false;
    for lock_file in lock_files {
        // Every lock file is processed even after one has been reaped.
        if reap_holders_of_lock(lock_file.path()).await? {
            reaped_any = true;
        }
    }
    Ok(reaped_any)
}
/// Assembles the argument vector for launching the `mux` subcommand.
///
/// Layout: `mux --socket <socket> --lock <lock> --cwd <workspace>
/// --idle-timeout <secs>`, then one `--env K=V` pair per configured
/// environment entry, then `--` followed by the LSP command and its
/// arguments. The idle timeout defaults to 300 when the config leaves it
/// unset. Paths are converted lossily to strings.
#[cfg(unix)]
pub(super) fn build_mux_args(
    workspace_root: &std::path::Path,
    socket_path: &std::path::Path,
    lock_path: &std::path::Path,
    config: &crate::lsp::client::LspServerConfig,
) -> Vec<String> {
    let lossy = |path: &std::path::Path| path.to_string_lossy().into_owned();
    let idle_timeout = config.idle_timeout_secs.unwrap_or(300);

    let mut argv: Vec<String> = vec![
        String::from("mux"),
        String::from("--socket"),
        lossy(socket_path),
        String::from("--lock"),
        lossy(lock_path),
        String::from("--cwd"),
        lossy(workspace_root),
        String::from("--idle-timeout"),
        idle_timeout.to_string(),
    ];
    argv.extend(
        config
            .env
            .iter()
            .flat_map(|(k, v)| [String::from("--env"), format!("{k}={v}")]),
    );
    argv.push(String::from("--"));
    argv.push(config.command.clone());
    argv.extend(config.args.iter().cloned());
    argv
}
/// If `exe` (converted lossily to text) ends with `" (deleted)"`, returns
/// the path with that suffix removed; otherwise returns `None`.
fn strip_deleted_suffix(exe: &Path) -> Option<PathBuf> {
    let text = exe.to_string_lossy();
    let live = text.strip_suffix(" (deleted)")?;
    Some(PathBuf::from(live))
}
/// Looks for an installed `codescout` binary at `$CARGO_HOME/bin/codescout`,
/// then at `$HOME/.cargo/bin/codescout`, and returns the first that exists.
fn stable_codescout_binary() -> Option<PathBuf> {
    let in_cargo_home = std::env::var_os("CARGO_HOME")
        .map(|cargo_home| PathBuf::from(cargo_home).join("bin").join("codescout"))
        .filter(|candidate| candidate.exists());

    in_cargo_home.or_else(|| {
        std::env::var_os("HOME")
            .map(|home| PathBuf::from(home).join(".cargo/bin/codescout"))
            .filter(|candidate| candidate.exists())
    })
}
/// Picks the executable used to spawn the mux process.
///
/// Preference order:
/// 1. the current executable, if its path still exists;
/// 2. the current executable's path with a trailing `" (deleted)"` removed,
///    if that exists (the binary was replaced while running);
/// 3. a stable install found by [`stable_codescout_binary`].
///
/// # Errors
/// Fails when the current executable's path cannot be determined, or when
/// none of the candidates exists.
fn resolve_mux_binary() -> Result<PathBuf> {
    let exe = match std::env::current_exe() {
        Ok(path) => path,
        Err(e) => anyhow::bail!("failed to determine codescout binary path: {e}"),
    };
    if exe.exists() {
        return Ok(exe);
    }

    if let Some(live) = strip_deleted_suffix(&exe).filter(|candidate| candidate.exists()) {
        tracing::warn!(
            "codescout binary was replaced since this server started; spawning mux from rebuilt {}",
            live.display()
        );
        return Ok(live);
    }

    let Some(fallback) = stable_codescout_binary() else {
        anyhow::bail!(
            "codescout binary at {} is gone (rebuilt mid-session?) and no stable fallback was found — reconnect the MCP server (/mcp) to load the rebuilt binary",
            exe.display()
        )
    };
    tracing::warn!(
        "current_exe() ({}) is gone (binary rebuilt mid-session?); spawning mux from {}",
        exe.display(),
        fallback.display()
    );
    Ok(fallback)
}
/// Decides whether to use the mux: an explicit `override_` wins, otherwise
/// the server's `default` applies.
pub(super) fn resolve_mux_flag(default: bool, override_: Option<bool>) -> bool {
    match override_ {
        Some(forced) => forced,
        None => default,
    }
}
/// Returns `true` when `exe`'s immediate parent directory is named `deps`.
/// Callers in this file use it to recognise a test-runner binary.
#[cfg_attr(windows, allow(dead_code))]
fn is_test_runner_exe(exe: &std::path::Path) -> bool {
    let parent_dir_name = exe.parent().and_then(std::path::Path::file_name);
    parent_dir_name.is_some_and(|name| name == "deps")
}
/// Tries to take an exclusive advisory lock on `lock_path`, creating the
/// file with mode `0o600` if it does not exist (existing content is kept).
///
/// Returns `Ok(Some(file))` holding the lock when it was acquired, and
/// `Ok(None)` when the lock attempt failed for any reason. Only a failure
/// to open or create the file is returned as `Err`.
#[cfg(unix)]
fn claim_mux_lock(lock_path: &std::path::Path) -> std::io::Result<Option<std::fs::File>> {
    use fs4::fs_std::FileExt;
    use std::os::unix::fs::OpenOptionsExt;

    let mut options = std::fs::OpenOptions::new();
    options.create(true).write(true).truncate(false).mode(0o600);
    let lock_file = options.open(lock_path)?;

    let claimed = lock_file.try_lock_exclusive().is_ok();
    Ok(claimed.then_some(lock_file))
}
/// Builds the recoverable error reported when the mux lock is held by some
/// process but its socket cannot be connected to.
#[cfg(unix)]
fn mux_socket_unreachable_error(
    language: &str,
    socket_path: &std::path::Path,
    lock_path: &std::path::Path,
) -> anyhow::Error {
    let lock = lock_path.display();
    let socket = socket_path.display();
    let message = format!(
        "mux for {language} holds its lock ({lock}) but its socket ({socket}) is \
         unreachable — the mux process is wedged or mid-restart"
    );
    let hint = "A live process holds this workspace's mux lock yet isn't accepting \
                connections. Wait a few seconds and retry (it may be restarting); if it \
                persists, locate the holder with `fuser` on the lock file and stop it, \
                then retry.";
    let recoverable = crate::tools::RecoverableError::with_hint(message, hint);
    recoverable.into()
}
impl LspManager {
/// Number of counted startup failures within the window at which
/// `get_or_start` refuses further attempts for a key.
const CIRCUIT_BREAKER_MAX_FAILURES: usize = 5;
/// Length of the failure-counting window, measured from the first counted failure.
const CIRCUIT_BREAKER_WINDOW: Duration = Duration::from_secs(60);
/// After a successful start, failures for the same key are not counted
/// toward the circuit breaker for this long.
const COLD_START_GRACE: Duration = Duration::from_secs(5 * 60);
/// Idle TTL used by `new()` and `new_arc()` (30 minutes).
pub const DEFAULT_IDLE_TTL: Duration = Duration::from_secs(30 * 60);
pub fn new() -> Self {
Self {
clients: Mutex::new(HashMap::new()),
last_used: Mutex::new(HashMap::new()),
starting: StdMutex::new(HashMap::new()),
max_clients: 10,
idle_ttl: Self::DEFAULT_IDLE_TTL,
pending_first_response: StdMutex::new(HashMap::new()),
pending_reason: StdMutex::new(HashMap::new()),
project_root: None,
startup_failures: StdMutex::new(HashMap::new()),
cold_start_until: StdMutex::new(HashMap::new()),
#[cfg(test)]
project_root_for_test: None,
}
}
/// Returns a live client for `(language, workspace_root)`, starting or
/// connecting to a server when none is cached.
///
/// Steps, in order:
/// 1. Fast path — a cached client that is alive is returned after its
///    `last_used` timestamp is refreshed.
/// 2. Circuit breaker — if this key has reached
///    `CIRCUIT_BREAKER_MAX_FAILURES` counted failures inside the window, a
///    recoverable error is returned without trying again.
/// 3. The server config is resolved; `mux_override`, when `Some`, replaces
///    the config's own mux flag.
/// 4. On unix with mux enabled, the client is obtained through the mux
///    process. A mux failure is returned as an error, except inside a test
///    runner, where it falls back to a direct start.
/// 5. Otherwise one caller per key performs the start (`do_start`) while
///    concurrent callers wait on a watch channel and reuse the result.
///
/// # Errors
/// Returns an error when no server is configured for `language`, when the
/// circuit breaker is open, when the mux path fails, or when the direct
/// start fails.
pub async fn get_or_start(
&self,
language: &str,
workspace_root: &Path,
mux_override: Option<bool>,
) -> Result<Arc<LspClient>> {
let key = LspKey::new(language, workspace_root);
// Fast path: reuse a cached, live client.
{
let clients = self.clients.lock().await;
if let Some(client) = clients.get(&key) {
if client.is_alive() {
// Release `clients` before locking `last_used`, so the two
// tokio mutexes are never held at the same time here.
drop(clients);
self.last_used
.lock()
.await
.insert(key.clone(), Instant::now());
// Re-lock and look the client up again; if it was removed in
// the meantime we fall through to the start path below.
let clients = self.clients.lock().await;
if let Some(client) = clients.get(&key) {
return Ok(client.clone());
}
}
}
}
// Circuit breaker: refuse to retry a key that keeps failing.
{
let failures = self
.startup_failures
.lock()
.unwrap_or_else(|e| e.into_inner());
if let Some((count, first_failure)) = failures.get(&key) {
if first_failure.elapsed() < Self::CIRCUIT_BREAKER_WINDOW
&& *count >= Self::CIRCUIT_BREAKER_MAX_FAILURES
{
return Err(crate::tools::RecoverableError::with_hint(
format!(
"LSP server for {} failed to start {} times in {}s — circuit-breaker open",
language,
count,
first_failure.elapsed().as_secs(),
),
format!(
"Another process may hold the workspace lock. Check for other \
codescout instances or editors targeting this project. The breaker \
resets after {}s of inactivity.",
Self::CIRCUIT_BREAKER_WINDOW.as_secs()
),
)
.into());
}
}
}
let mut config = servers::default_config(language, workspace_root).ok_or_else(|| {
anyhow::anyhow!("No LSP server configured for language: {}", language)
})?;
config.mux = resolve_mux_flag(config.mux, mux_override);
// Mux path (unix only). NOTE(review): in the code visible here only
// `do_start` updates `startup_failures`, so mux failures do not appear to
// feed the circuit breaker — confirm that is intended.
#[cfg(unix)]
if config.mux {
match self
.get_or_start_via_mux(language, workspace_root, config.clone())
.await
{
Ok(client) => {
let key = LspKey::new(language, workspace_root);
{
let mut clients = self.clients.lock().await;
clients.insert(key.clone(), client.clone());
}
self.last_used.lock().await.insert(key, Instant::now());
return Ok(client);
}
Err(e) => {
// Errors that already describe index contention are passed through unchanged.
if mux_failure_is_index_contention(&e.to_string()) {
return Err(e);
}
// Otherwise probe the Kotlin index lock directly to give a more specific error.
if kotlin_index_lock_held(language, workspace_root) {
return Err(crate::tools::RecoverableError::with_hint(
format!(
"kotlin LSP index is locked by another process — the mux \
could not start: {e}"
),
"Another kotlin-lsp holds this workspace's RocksDB index lock. \
Close other sessions on this workspace, or locate the holder \
with `fuser <kotlin-lsp-home>/.../rocks/*/LOCK` and stop it, \
then retry.",
)
.into());
}
// Outside a test runner a mux failure is final: no direct-LSP fallback.
let exe_is_test = std::env::current_exe()
.map(|p| is_test_runner_exe(&p))
.unwrap_or(false); if !exe_is_test {
return Err(crate::tools::RecoverableError::with_hint(
format!("mux startup failed for {language}: {e}"),
"codescout will not fall back to a direct LSP for a \
multiplexed language — that would open a second process \
on the shared index. Retry in a moment; if it persists, \
check for an orphaned LSP with \
`fuser <kotlin-lsp-home>/.../rocks/*/LOCK` and stop it.",
)
.into());
}
tracing::warn!(
"Mux startup failed for {language} in a test runner, \
falling back to direct LSP: {e}"
);
config.mux = false;
}
}
}
// Direct start: make room first if the client map is at capacity.
self.evict_lru_if_at_capacity().await;
// Single-flight: the first caller for a key registers a watch channel
// and keeps the sender; later callers clone the receiver and wait.
let mut rx_opt = None;
let tx_opt;
{
let mut starting = self.starting.lock().unwrap_or_else(|e| e.into_inner());
if let Some(existing_rx) = starting.get(&key) {
rx_opt = Some(existing_rx.clone());
tx_opt = None;
} else {
let (tx, rx) = tokio::sync::watch::channel(None);
starting.insert(key.clone(), rx);
tx_opt = Some(tx);
}
}
if let Some(mut rx) = rx_opt {
// Wait for the in-flight start to report an outcome. The result is
// ignored: a dropped sender also ends the wait.
let _ = rx.wait_for(|v| v.is_some()).await;
{
let clients = self.clients.lock().await;
if let Some(client) = clients.get(&key) {
if client.is_alive() {
return Ok(client.clone());
}
}
}
// The other attempt did not leave a live client: start one ourselves.
// NOTE(review): this insert is unconditional, so several waiters that
// reach this point together may each register and each call
// `do_start` — confirm whether concurrent starts are acceptable here.
let (tx, rx) = tokio::sync::watch::channel(None);
{
let mut starting = self.starting.lock().unwrap_or_else(|e| e.into_inner());
starting.insert(key.clone(), rx);
}
return self.do_start(&key, config, tx).await;
}
self.do_start(&key, config, tx_opt.expect("tx_opt is always Some when rx_opt is None — set in the same exclusive branch above"))
.await
}
async fn evict_lru_if_at_capacity(&self) {
let evict_info: Option<(LspKey, Option<Arc<LspClient>>)> = {
let at_capacity = self.clients.lock().await.len() >= self.max_clients;
if at_capacity {
let oldest_key = {
let last_used = self.last_used.lock().await;
last_used
.iter()
.min_by_key(|(k, t)| (restart_cost_tier(&k.language), *t))
.map(|(k, _)| k.clone())
};
if let Some(oldest_key) = oldest_key {
let mut clients = self.clients.lock().await;
if clients.len() >= self.max_clients {
self.pending_reason
.lock()
.unwrap_or_else(|e| e.into_inner())
.insert(oldest_key.clone(), "lru_evicted".to_string());
self.pending_first_response
.lock()
.unwrap_or_else(|e| e.into_inner())
.remove(&oldest_key);
let evict_client = clients.remove(&oldest_key);
Some((oldest_key, evict_client))
} else {
None
}
} else {
None
}
} else {
None
}
};
if let Some((oldest_key, evict_client)) = evict_info {
self.last_used.lock().await.remove(&oldest_key);
if let Some(old) = evict_client {
tracing::info!("LRU evicting LSP client: {}", oldest_key);
let _ = old.shutdown().await;
}
}
}
/// Obtains a client connected to the mux process for this language and
/// workspace, spawning the mux first when its lock file is unclaimed.
///
/// Runs at most two rounds:
/// - If the mux lock can be claimed, nothing is running: the probe lock is
///   released and a mux child is spawned, then waited on (up to 120 s) until
///   it prints a stdout line starting with `ready`.
/// - If the lock is already held, round 0 assumes a mux is running and just
///   connects; on round 1 a still-held lock produces the "socket
///   unreachable" error.
///
/// Connecting is attempted up to five times, 200 ms apart. A connect
/// failure right after spawning is returned immediately; a connect failure
/// against a mux we did not spawn triggers the second round.
///
/// # Errors
/// Fails when the lock file cannot be opened, the mux binary cannot be
/// resolved or spawned, the child does not report ready, or the socket
/// cannot be connected to.
#[cfg(unix)]
async fn get_or_start_via_mux(
&self,
language: &str,
workspace_root: &Path,
config: LspServerConfig,
) -> Result<Arc<LspClient>> {
use anyhow::Context;
let socket_path = crate::lsp::mux::socket_path_for_workspace(language, workspace_root);
let lock_path = crate::lsp::mux::lock_path_for_workspace(language, workspace_root);
let mut last_err: Option<anyhow::Error> = None;
for round in 0..2u32 {
// Probe the lock: claiming it means no mux currently holds it.
let need_spawn =
match claim_mux_lock(&lock_path).context("Failed to open mux lock file")? {
Some(guard) => {
// Release the probe lock right away.
// NOTE(review): presumably the spawned mux takes this lock itself
// (it is passed via `--lock`) — confirm against the mux implementation.
drop(guard);
true
}
None => {
// Second round and the lock is still held: the holder is not serving its socket.
if round > 0 {
return Err(mux_socket_unreachable_error(
language,
&socket_path,
&lock_path,
));
}
tracing::info!(
"mux already running for {}, connecting to {:?}",
language,
socket_path
);
false
}
};
if need_spawn {
let exe = resolve_mux_binary()?;
// Best effort: clear out a leftover index-lock holder before spawning.
// A failing probe is logged and otherwise ignored.
match reap_orphan_index_holder(language, workspace_root).await {
Ok(true) => {
tracing::info!("reaped orphan index holder for {language} before mux spawn")
}
Ok(false) => {}
Err(e) => tracing::warn!("orphan reap probe failed (continuing): {e}"),
}
let mux_args = build_mux_args(workspace_root, &socket_path, &lock_path, &config);
let mut child = tokio::process::Command::new(&exe)
.args(&mux_args)
.stdout(std::process::Stdio::piped())
.stdin(std::process::Stdio::null())
.stderr(std::process::Stdio::piped())
.spawn()
.context("Failed to spawn mux process")?;
// Bounded buffer holding the last MUX_STDERR_TAIL_LINES lines of the child's stderr.
let stderr_tail = std::sync::Arc::new(std::sync::Mutex::new(
std::collections::VecDeque::<String>::with_capacity(MUX_STDERR_TAIL_LINES),
));
// Drain stderr in a background task so its tail can be quoted in errors.
let stderr_drain = child.stderr.take().map(|stderr| {
let tail = stderr_tail.clone();
tokio::spawn(async move {
use tokio::io::AsyncBufReadExt;
let mut reader = tokio::io::BufReader::new(stderr);
let mut line = String::new();
loop {
line.clear();
match reader.read_line(&mut line).await {
// EOF or a read error ends the drain.
Ok(0) | Err(_) => break,
Ok(_) => {
let mut tail = tail.lock().unwrap_or_else(|e| e.into_inner());
// Drop the oldest line once the buffer is full.
if tail.len() == MUX_STDERR_TAIL_LINES {
tail.pop_front();
}
tail.push_back(line.trim_end().to_string());
}
}
}
})
});
// Readiness handshake: the first stdout line must start with "ready".
let stdout = child.stdout.take().expect("stdout piped");
let mut reader = tokio::io::BufReader::new(stdout);
let mut line = String::new();
let read_result = tokio::time::timeout(
std::time::Duration::from_secs(120),
tokio::io::AsyncBufReadExt::read_line(&mut reader, &mut line),
)
.await;
let is_ready =
matches!(read_result, Ok(Ok(n)) if n > 0) && line.trim().starts_with("ready");
if is_ready {
tracing::info!("mux process ready for {} at {:?}", language, socket_path);
} else {
// Give the stderr drain up to 500 ms to finish so the tail is as complete as possible.
// NOTE(review): `child` is not explicitly killed or awaited on this
// failure path — confirm whether the process can outlive this call.
if let Some(handle) = stderr_drain {
let _ = tokio::time::timeout(std::time::Duration::from_millis(500), handle)
.await;
}
let tail: Vec<String> = stderr_tail
.lock()
.unwrap_or_else(|e| e.into_inner())
.iter()
.cloned()
.collect();
return Err(match read_result {
// The outer `Err` is the 120 s timeout elapsing.
Err(_) => {
let extra = if tail.is_empty() {
String::new()
} else {
format!(" — mux stderr: {}", tail.join(" | "))
};
crate::tools::RecoverableError::with_hint(
format!("mux process timed out waiting for ready (120s){extra}"),
"The LSP server is slow to initialize (Gradle/Cargo index?). \
Retry in a moment; if the problem persists, check server logs.",
)
.into()
}
// The read completed, but with EOF, an I/O error, or a non-"ready" line.
Ok(read) => {
let stdout_line = match read {
Ok(_) => line.trim().to_string(),
Err(e) => format!("(stdout read error: {e})"),
};
let (message, hint) = mux_failure_report(&stdout_line, &tail);
crate::tools::RecoverableError::with_hint(message, hint).into()
}
});
}
}
// Connect to the mux socket: up to five attempts, 200 ms apart.
let mut connect_err = None;
let mut connected = None;
for attempt in 0..5u32 {
if attempt > 0 {
tokio::time::sleep(std::time::Duration::from_millis(200)).await;
}
match LspClient::connect(&socket_path, workspace_root.to_path_buf()).await {
Ok(client) => {
connected = Some(client);
break;
}
Err(e) => connect_err = Some(e),
}
}
match connected {
Some(client) => return Ok(Arc::new(client)),
None => {
let e = connect_err.expect("connect loop ran at least once");
// A mux we just spawned and cannot reach is a hard failure.
if need_spawn {
return Err(e);
}
// Otherwise remember the error and run the second round, which re-probes the lock.
last_err = Some(e);
}
}
}
Err(last_err.expect("connect failed at least once before exhausting rounds"))
}
/// Starts a server for `key` with `config`, caches it on success, and
/// reports the outcome to waiters through `tx` (`Some(true)` on success,
/// `Some(false)` on failure).
///
/// The `starting` entry for `key` is removed when this function's future is
/// dropped, including on cancellation, via the `StartingCleanup` guard.
///
/// On success the client is cached, `last_used` is set, a usage event is
/// written (when a project root is configured), the failure counter for the
/// key is cleared, and the cold-start grace period begins.
///
/// On failure a failure event is written and — unless the key is inside its
/// cold-start grace period — the circuit-breaker counter is advanced.
///
/// # Errors
/// Returns the error produced by `LspClient::start`.
async fn do_start(
&self,
key: &LspKey,
config: LspServerConfig,
tx: tokio::sync::watch::Sender<Option<bool>>,
) -> Result<Arc<LspClient>> {
// Removes the `starting` entry on every exit path.
let _cleanup = StartingCleanup {
starting: &self.starting,
key: key.clone(),
};
// Take a dead client out of the map under the lock; shut it down after the lock is released.
let stale_client = {
let mut clients = self.clients.lock().await;
if let Some(client) = clients.get(key) {
if !client.is_alive() {
clients.remove(key)
} else {
None
}
} else {
None
}
};
if let Some(old) = stale_client {
let _ = old.shutdown().await;
}
let start_time = std::time::Instant::now();
let result = LspClient::start(config).await.map(Arc::new);
match result {
Ok(new_client) => {
{
let mut clients = self.clients.lock().await;
clients.insert(key.clone(), new_client.clone());
}
self.last_used
.lock()
.await
.insert(key.clone(), Instant::now());
// Consume the recorded restart reason, defaulting to a fresh session.
let reason = self
.pending_reason
.lock()
.unwrap_or_else(|e| e.into_inner())
.remove(key)
.unwrap_or_else(|| "new_session".to_string());
let handshake_ms = start_time.elapsed().as_millis() as i64;
tracing::info!(
"LSP initialized in {}ms (language: {}, reason: {})",
handshake_ms,
key.language,
reason
);
let project_root_opt = self.project_root.clone();
#[cfg(test)]
let project_root_opt = self.project_root_for_test.clone().or(project_root_opt);
if let Some(root) = project_root_opt {
let lang = key.language.clone();
let reason_clone = reason.clone();
// The usage-db write runs on the blocking thread pool.
let rowid_result = tokio::task::spawn_blocking(move || {
let conn = crate::usage::db::open_db(&root)?;
crate::usage::db::write_lsp_event(&conn, &lang, &reason_clone, handshake_ms)
})
.await;
// Keep the row id so the first-response latency can be attached later;
// a failed write is silently ignored.
if let Ok(Ok(rowid)) = rowid_result {
self.pending_first_response
.lock()
.unwrap_or_else(|e| e.into_inner())
.insert(key.clone(), rowid);
}
}
// A successful start clears the breaker and opens the cold-start grace window.
self.startup_failures
.lock()
.unwrap_or_else(|e| e.into_inner())
.remove(key);
self.cold_start_until
.lock()
.unwrap_or_else(|e| e.into_inner())
.insert(key.clone(), Instant::now() + Self::COLD_START_GRACE);
let _ = tx.send(Some(true));
Ok(new_client)
}
Err(e) => {
{
let handshake_ms = start_time.elapsed().as_millis() as i64;
// The reason is only read here, not removed, so a later attempt still sees it.
let reason = self
.pending_reason
.lock()
.unwrap_or_else(|p| p.into_inner())
.get(key)
.cloned()
.unwrap_or_else(|| "new_session".to_string());
let project_root_opt = self.project_root.clone();
#[cfg(test)]
let project_root_opt = self.project_root_for_test.clone().or(project_root_opt);
if let Some(root) = project_root_opt {
let lang = key.language.clone();
let err_str = e.to_string();
// Best-effort failure record; its result is discarded.
let _ = tokio::task::spawn_blocking(move || {
let conn = crate::usage::db::open_db(&root)?;
crate::usage::db::write_lsp_failure(
&conn,
&lang,
&reason,
handshake_ms,
&err_str,
)
})
.await;
}
}
// Failures inside the grace window after a successful start are not counted.
let in_grace = self
.cold_start_until
.lock()
.unwrap_or_else(|e| e.into_inner())
.get(key)
.is_some_and(|until| Instant::now() < *until);
if in_grace {
tracing::info!(
"LSP startup failure for {} suppressed by cold-start grace period",
key
);
} else {
let mut failures = self
.startup_failures
.lock()
.unwrap_or_else(|e| e.into_inner());
let entry = failures.entry(key.clone()).or_insert((0, Instant::now()));
// Start a new window when the previous one has expired; otherwise count this failure.
if entry.1.elapsed() >= Self::CIRCUIT_BREAKER_WINDOW {
*entry = (1, Instant::now());
} else {
entry.0 += 1;
}
if entry.0 >= Self::CIRCUIT_BREAKER_MAX_FAILURES {
tracing::warn!(
"LSP circuit-breaker tripped for {} ({} failures in {}s)",
key,
entry.0,
entry.1.elapsed().as_secs()
);
}
}
let _ = tx.send(Some(false));
Err(e)
}
}
}
/// Returns the cached client for `(language, project_root)` if one exists
/// and is alive. Never starts a server and does not refresh `last_used`.
pub async fn get(&self, language: &str, project_root: &Path) -> Option<Arc<LspClient>> {
    let key = LspKey::new(language, project_root);
    let clients = self.clients.lock().await;
    match clients.get(&key) {
        Some(client) if client.is_alive() => Some(Arc::clone(client)),
        _ => None,
    }
}
pub async fn shutdown_all(&self) {
let mut clients = self.clients.lock().await;
for (key, client) in clients.drain() {
tracing::info!("Shutting down LSP for: {}", key);
match client.shutdown().await {
Ok(()) => tracing::debug!("LSP server shut down cleanly: {}", key),
Err(e) => tracing::warn!("Error shutting down LSP for {}: {}", key, e),
}
}
self.last_used.lock().await.clear();
}
/// Returns the languages that currently have at least one live client,
/// sorted and without duplicates.
pub async fn active_languages(&self) -> Vec<String> {
    let clients = self.clients.lock().await;
    // An ordered set sorts and de-duplicates in a single pass.
    let unique: std::collections::BTreeSet<String> = clients
        .iter()
        .filter_map(|(key, client)| client.is_alive().then(|| key.language.clone()))
        .collect();
    unique.into_iter().collect()
}
pub async fn notify_file_changed(&self, path: &std::path::Path) {
let clients: Vec<_> = self
.clients
.lock()
.await
.iter()
.filter(|(key, _)| path.starts_with(&key.project_root))
.map(|(_, client)| client.clone())
.collect();
for client in clients {
if client.is_alive() {
let _ = client.did_change(path).await;
}
}
}
/// Records the first-response latency for a freshly started server.
///
/// Does nothing unless a startup event row is pending for
/// `(language, workspace_root)`; the pending entry is consumed either way,
/// so only the first call after a start has any effect. The row is updated
/// on a blocking thread, and database errors are ignored.
pub async fn record_first_response_inner(
    &self,
    language: &str,
    workspace_root: &std::path::Path,
    elapsed_ms: i64,
) {
    let key = LspKey::new(language, workspace_root);
    let rowid = {
        let mut pending = self
            .pending_first_response
            .lock()
            .unwrap_or_else(|e| e.into_inner());
        match pending.remove(&key) {
            Some(rowid) => rowid,
            None => return,
        }
    };
    tracing::debug!(
        "LSP first response in {}ms (language: {})",
        elapsed_ms,
        language
    );

    let root = self.project_root.clone();
    #[cfg(test)]
    let root = self.project_root_for_test.clone().or(root);
    let Some(root) = root else { return };

    let write = tokio::task::spawn_blocking(move || match crate::usage::db::open_db(&root) {
        Ok(conn) => {
            let _ = crate::usage::db::update_lsp_first_response(&conn, rowid, elapsed_ms);
        }
        Err(_) => {}
    });
    let _ = write.await;
}
/// Test helper: number of keys with a startup currently registered as
/// in flight. Panics if the `starting` mutex is poisoned.
#[cfg(test)]
pub fn starting_count_sync(&self) -> usize {
    let in_flight = self.starting.lock().unwrap();
    in_flight.len()
}
/// Test-only variant of `get_or_start` that takes an explicit `config`
/// instead of resolving one from `language`.
///
/// It performs the cached-client check and the single-flight start, keyed
/// by `(language, config.workspace_root)`. It does not consult the circuit
/// breaker, does not use the mux path, does not run LRU eviction, and does
/// not refresh `last_used` on a cache hit.
#[cfg(test)]
pub async fn get_or_start_for_test(
&self,
language: &str,
config: LspServerConfig,
) -> Result<Arc<LspClient>> {
let workspace_root = config.workspace_root.clone();
let key = LspKey::new(language, &workspace_root);
// Fast path: reuse a cached, live client.
{
let clients = self.clients.lock().await;
if let Some(client) = clients.get(&key) {
if client.is_alive() {
return Ok(client.clone());
}
}
}
// Single-flight: the first caller registers a watch channel and keeps the
// sender; later callers clone the receiver and wait.
let mut rx_opt = None;
let tx_opt;
{
let mut starting = self.starting.lock().unwrap_or_else(|e| e.into_inner());
if let Some(existing_rx) = starting.get(&key) {
rx_opt = Some(existing_rx.clone());
tx_opt = None;
} else {
let (tx, rx) = tokio::sync::watch::channel(None);
starting.insert(key.clone(), rx);
tx_opt = Some(tx);
}
}
if let Some(mut rx) = rx_opt {
// Wait for the in-flight start to finish; the wait result itself is ignored.
let _ = rx.wait_for(|v| v.is_some()).await;
{
let clients = self.clients.lock().await;
if let Some(client) = clients.get(&key) {
if client.is_alive() {
return Ok(client.clone());
}
}
}
// No live client was left behind: register a fresh channel and start one ourselves.
let (tx, rx) = tokio::sync::watch::channel(None);
{
let mut starting = self.starting.lock().unwrap_or_else(|e| e.into_inner());
starting.insert(key.clone(), rx);
}
return self.do_start(&key, config, tx).await;
}
self.do_start(&key, config, tx_opt.expect("tx_opt is always Some when rx_opt is None — set in the same exclusive branch above"))
.await
}
/// Test helper: a manager whose usage events are written under
/// `project_root`. No idle-eviction task is spawned.
#[cfg(test)]
pub async fn new_for_test_with_root(project_root: &std::path::Path) -> Arc<Self> {
    let root = project_root.to_path_buf();
    let mut manager = Self::new();
    manager.project_root_for_test = Some(root);
    Arc::new(manager)
}
}
impl LspManager {
/// Shared constructor behind the `new_arc*` functions: builds a manager
/// with the given idle TTL and optional project root, wraps it in an `Arc`,
/// and spawns the background idle-eviction task.
///
/// The task holds only a `Weak` reference, so it does not keep the manager
/// alive. Spawning uses `tokio::spawn`, so this must be called from within
/// a Tokio runtime.
fn new_arc_inner(ttl: Duration, project_root: Option<std::path::PathBuf>) -> Arc<Self> {
    let manager = {
        let mut configured = Self::new();
        configured.idle_ttl = ttl;
        configured.project_root = project_root;
        Arc::new(configured)
    };
    tokio::spawn(Self::idle_eviction_loop(Arc::downgrade(&manager), ttl));
    manager
}
/// Creates a shared manager with [`Self::DEFAULT_IDLE_TTL`] and no project
/// root, and starts its idle-eviction task.
pub fn new_arc() -> Arc<Self> {
    Self::new_arc_inner(Self::DEFAULT_IDLE_TTL, None)
}

/// Creates a shared manager with a custom idle TTL and no project root,
/// and starts its idle-eviction task.
pub fn new_arc_with_ttl(ttl: Duration) -> Arc<Self> {
    Self::new_arc_inner(ttl, None)
}

/// Creates a shared manager that records usage events under `project_root`,
/// using [`Self::DEFAULT_IDLE_TTL`], and starts its idle-eviction task.
pub fn new_arc_with_root(project_root: std::path::PathBuf) -> Arc<Self> {
    // Use the shared constant rather than a second copy of "30 minutes",
    // so this constructor cannot drift from `new_arc`.
    Self::new_arc_inner(Self::DEFAULT_IDLE_TTL, Some(project_root))
}
/// Evicts every client that has been idle longer than its TTL.
///
/// The TTL is per language (see `ttl_for_language`); `ttl` is the global
/// value used for languages without an override. `last_used` entries that
/// have no matching client are removed as well.
///
/// Candidates are snapshotted once, but each one is re-checked immediately
/// before eviction. Shutting down earlier candidates can take a while, and
/// a client used in the meantime must not be torn down under its caller.
pub(crate) async fn evict_idle(&self, ttl: Duration) {
    let now = Instant::now();
    let candidates: Vec<LspKey> = {
        let last_used = self.last_used.lock().await;
        last_used
            .iter()
            .filter(|(k, t)| now.duration_since(**t) > ttl_for_language(&k.language, ttl))
            .map(|(k, _)| k.clone())
            .collect()
    };

    for key in candidates {
        // Re-validate against the current timestamp: the entry may have been
        // refreshed (client reused) or removed (already evicted) since the snapshot.
        let still_idle = {
            let last_used = self.last_used.lock().await;
            last_used
                .get(&key)
                .is_some_and(|t| t.elapsed() > ttl_for_language(&key.language, ttl))
        };
        if !still_idle {
            continue;
        }

        let client = {
            let mut clients = self.clients.lock().await;
            // Record why the next start of this key happens, and drop any
            // first-response bookkeeping that belonged to the old instance.
            self.pending_reason
                .lock()
                .unwrap_or_else(|e| e.into_inner())
                .insert(key.clone(), "idle_evicted".to_string());
            self.pending_first_response
                .lock()
                .unwrap_or_else(|e| e.into_inner())
                .remove(&key);
            clients.remove(&key)
        };
        self.last_used.lock().await.remove(&key);
        // Shut down outside of every lock.
        if let Some(c) = client {
            tracing::info!("Idle TTL evicting LSP client: {}", key);
            let _ = c.shutdown().await;
        }
    }
}
/// Background task that periodically runs [`Self::evict_idle`].
///
/// Polls every `ttl / 4`, with a floor of one millisecond so that a zero
/// (or sub-4 ms) TTL cannot turn the loop into a busy spin of zero-length
/// sleeps. The task ends as soon as the manager has been dropped, which is
/// detected by the `Weak` reference failing to upgrade.
async fn idle_eviction_loop(weak: Weak<Self>, ttl: Duration) {
    const MIN_POLL_INTERVAL: Duration = Duration::from_millis(1);
    let interval = (ttl / 4).max(MIN_POLL_INTERVAL);
    loop {
        tokio::time::sleep(interval).await;
        // The strong reference lives only for one eviction pass, so the
        // loop never keeps the manager alive between polls.
        let Some(mgr) = weak.upgrade() else {
            break;
        };
        mgr.evict_idle(ttl).await;
    }
}
}
#[cfg(test)]
mod tests {
use super::*;
fn ra_config_or_skip(dir: &std::path::Path) -> Option<LspServerConfig> {
use std::process::Command as StdCommand;
if StdCommand::new("rust-analyzer")
.arg("--version")
.output()
.is_err()
{
return None;
}
std::fs::write(
dir.join("Cargo.toml"),
"[package]\nname = \"t\"\nversion = \"0.1.0\"\nedition = \"2021\"\n",
)
.unwrap();
std::fs::create_dir_all(dir.join("src")).unwrap();
std::fs::write(dir.join("src/lib.rs"), "pub fn f() {}").unwrap();
Some(LspServerConfig {
command: "rust-analyzer".into(),
args: vec![],
workspace_root: dir.to_path_buf(),
init_timeout: Some(std::time::Duration::from_secs(30)),
mux: false,
env: vec![],
idle_timeout_secs: None,
})
}
#[tokio::test]
async fn manager_starts_empty() {
let mgr = LspManager::new();
assert!(mgr.active_languages().await.is_empty());
assert!(mgr.get("rust", Path::new("/tmp")).await.is_none());
}
#[tokio::test]
async fn manager_errors_for_unknown_language() {
let mgr = LspManager::new();
let dir = tempfile::tempdir().unwrap();
let result = mgr.get_or_start("brainfuck", dir.path(), None).await;
assert!(result.is_err());
}
#[tokio::test]
async fn manager_shutdown_all_empty() {
let mgr = LspManager::new();
mgr.shutdown_all().await; }
#[tokio::test]
async fn failed_start_cleans_up_starting_map() {
let mgr = LspManager::new();
let dir = tempfile::tempdir().unwrap();
assert_eq!(mgr.starting_count_sync(), 0, "map should start empty");
let result = mgr.get_or_start("brainfuck", dir.path(), None).await;
assert!(result.is_err());
assert_eq!(
mgr.starting_count_sync(),
0,
"map should be clean after failed start"
);
}
#[cfg_attr(
windows,
ignore = "`sleep` is not available in Windows cmd.exe; fake-LSP needs a platform-native infinite-blocker"
)]
#[tokio::test]
async fn cancelled_get_or_start_cleans_up_starting_map() {
let dir = tempfile::tempdir().unwrap();
let mgr = LspManager::new();
assert_eq!(mgr.starting_count_sync(), 0, "map should start empty");
let config = LspServerConfig {
command: "sleep".into(),
args: vec!["99999".into()],
workspace_root: dir.path().to_path_buf(),
init_timeout: Some(std::time::Duration::from_secs(30)),
mux: false,
env: vec![],
idle_timeout_secs: None,
};
let cancelled = tokio::time::timeout(
std::time::Duration::from_millis(100),
mgr.get_or_start_for_test("fake-slow-lsp", config),
)
.await;
assert!(cancelled.is_err(), "expected outer timeout");
assert_eq!(
mgr.starting_count_sync(),
0,
"stale starting entry leaked after cancellation"
);
}
#[tokio::test]
async fn shutdown_all_stops_running_servers() {
use std::process::Command as StdCommand;
if StdCommand::new("rust-analyzer")
.arg("--version")
.output()
.is_err()
{
eprintln!("Skipping: rust-analyzer not installed");
return;
}
let dir = tempfile::tempdir().unwrap();
std::fs::write(
dir.path().join("Cargo.toml"),
"[package]\nname = \"t\"\nversion = \"0.1.0\"\nedition = \"2021\"\n",
)
.unwrap();
std::fs::create_dir_all(dir.path().join("src")).unwrap();
std::fs::write(dir.path().join("src/lib.rs"), "pub fn f() {}").unwrap();
let mgr = LspManager::new();
let client = mgr
.get_or_start("rust", dir.path(), Some(false))
.await
.unwrap();
assert!(client.is_alive());
mgr.shutdown_all().await;
assert!(!client.is_alive());
assert!(mgr.active_languages().await.is_empty());
}
#[tokio::test]
async fn same_language_different_roots_get_separate_clients() {
let key1 = LspKey::new("rust", Path::new("/project-a"));
let key2 = LspKey::new("rust", Path::new("/project-b"));
assert_ne!(key1, key2);
let mut map: HashMap<LspKey, &str> = HashMap::new();
map.insert(key1.clone(), "client-a");
map.insert(key2.clone(), "client-b");
assert_eq!(map.get(&key1), Some(&"client-a"));
assert_eq!(map.get(&key2), Some(&"client-b"));
}
#[test]
fn lsp_key_same_language_same_root_is_equal() {
let k1 = LspKey::new("typescript", Path::new("/workspace/mcp-server"));
let k2 = LspKey::new("typescript", Path::new("/workspace/mcp-server"));
assert_eq!(k1, k2);
}
#[test]
fn lsp_key_display() {
let key = LspKey::new("rust", Path::new("/my/project"));
assert_eq!(format!("{}", key), "rust@/my/project");
}
/// Kotlin's idle TTL is pinned to two hours even when the global TTL is 30 minutes.
#[test]
fn kotlin_gets_2h_ttl_regardless_of_global() {
    let thirty_minutes = Duration::from_secs(30 * 60);
    let two_hours = Duration::from_secs(2 * 3600);
    assert_eq!(ttl_for_language("kotlin", thirty_minutes), two_hours);
}
/// Every language other than Kotlin falls through to the global TTL.
#[test]
fn non_kotlin_languages_use_global_ttl() {
    const LANGUAGES: [&str; 5] = ["rust", "typescript", "java", "python", "go"];
    let global = Duration::from_secs(30 * 60);
    LANGUAGES.iter().copied().for_each(|lang| {
        assert_eq!(
            ttl_for_language(lang, global),
            global,
            "expected global TTL for language: {lang}"
        );
    });
}
#[tokio::test]
async fn evict_idle_clears_stale_last_used_entries() {
let mgr = LspManager::new();
let key = LspKey::new("rust", Path::new("/stale-project"));
let stale = Instant::now()
.checked_sub(std::time::Duration::from_millis(100))
.expect("process has been running > 100ms");
mgr.last_used.lock().await.insert(key.clone(), stale);
assert_eq!(mgr.last_used.lock().await.len(), 1);
mgr.evict_idle(std::time::Duration::from_millis(1)).await;
assert_eq!(mgr.last_used.lock().await.len(), 0);
}
/// Pins the restart-cost tier of each language: tier 1 for the cheap ones,
/// tier 2 for Kotlin and Java.
#[test]
fn restart_cost_tier_orders_cheap_languages_first() {
    let expected: Vec<(&str, u8)> = vec![
        ("rust", 1),
        ("typescript", 1),
        ("python", 1),
        ("javascript", 1),
        ("bash", 1),
        ("html", 1),
        ("kotlin", 2),
        ("java", 2),
    ];
    // Pair each language with its observed tier so a mismatch names the language.
    let observed: Vec<(&str, u8)> = expected
        .iter()
        .map(|&(lang, _)| (lang, restart_cost_tier(lang)))
        .collect();
    assert_eq!(observed, expected);
}
#[tokio::test]
async fn lru_eviction_prefers_cheap_languages_over_kotlin() {
let mgr = LspManager::new();
let kotlin_key = LspKey::new("kotlin", Path::new("/proj-a"));
let rust_key = LspKey::new("rust", Path::new("/proj-b"));
let kotlin_time = Instant::now()
.checked_sub(std::time::Duration::from_secs(60))
.expect("process has been running > 60s");
let rust_time = Instant::now();
{
let mut lu = mgr.last_used.lock().await;
lu.insert(kotlin_key.clone(), kotlin_time);
lu.insert(rust_key.clone(), rust_time);
}
let oldest_key = {
let last_used = mgr.last_used.lock().await;
last_used
.iter()
.min_by_key(|(k, t)| (restart_cost_tier(&k.language), *t))
.map(|(k, _)| k.clone())
};
assert_eq!(
oldest_key,
Some(rust_key),
"cost-aware LRU must pick rust over kotlin even though kotlin is older"
);
}
#[tokio::test]
async fn lru_eviction_evicts_kotlin_only_when_pool_is_all_expensive() {
let mgr = LspManager::new();
let kotlin_key = LspKey::new("kotlin", Path::new("/proj-a"));
let java_key = LspKey::new("java", Path::new("/proj-b"));
let kotlin_time = Instant::now()
.checked_sub(std::time::Duration::from_secs(60))
.expect("process has been running > 60s");
let java_time = Instant::now();
{
let mut lu = mgr.last_used.lock().await;
lu.insert(kotlin_key.clone(), kotlin_time);
lu.insert(java_key.clone(), java_time);
}
let oldest_key = {
let last_used = mgr.last_used.lock().await;
last_used
.iter()
.min_by_key(|(k, t)| (restart_cost_tier(&k.language), *t))
.map(|(k, _)| k.clone())
};
assert_eq!(
oldest_key,
Some(kotlin_key),
"with no cheap victims available, fall back to pure LRU and evict kotlin"
);
}
#[tokio::test]
async fn evict_idle_preserves_recent_entries() {
let mgr = LspManager::new();
let key = LspKey::new("typescript", Path::new("/fresh-project"));
mgr.last_used
.lock()
.await
.insert(key.clone(), Instant::now());
mgr.evict_idle(std::time::Duration::from_secs(3600)).await;
assert_eq!(mgr.last_used.lock().await.len(), 1);
}
#[tokio::test]
async fn cold_start_grace_suppresses_circuit_breaker_increment() {
let mgr = LspManager::new();
let key = LspKey::new("kotlin", std::path::Path::new("/proj"));
mgr.cold_start_until
.lock()
.unwrap()
.insert(key.clone(), Instant::now() + Duration::from_secs(300));
let in_grace = mgr
.cold_start_until
.lock()
.unwrap()
.get(&key)
.is_some_and(|until| Instant::now() < *until);
if !in_grace {
let mut failures = mgr.startup_failures.lock().unwrap();
let entry = failures.entry(key.clone()).or_insert((0, Instant::now()));
entry.0 += 1;
}
assert_eq!(
mgr.startup_failures.lock().unwrap().get(&key).map(|e| e.0),
None,
"circuit-breaker must not be incremented during cold-start grace"
);
}
#[tokio::test]
async fn cold_start_grace_expired_counts_failure() {
let mgr = LspManager::new();
let key = LspKey::new("kotlin", std::path::Path::new("/proj2"));
mgr.cold_start_until
.lock()
.unwrap()
.insert(key.clone(), Instant::now() - Duration::from_secs(1));
let in_grace = mgr
.cold_start_until
.lock()
.unwrap()
.get(&key)
.is_some_and(|until| Instant::now() < *until);
if !in_grace {
let mut failures = mgr.startup_failures.lock().unwrap();
let entry = failures.entry(key.clone()).or_insert((0, Instant::now()));
entry.0 += 1;
}
assert_eq!(
mgr.startup_failures.lock().unwrap().get(&key).map(|e| e.0),
Some(1),
"circuit-breaker must be incremented after grace period expires"
);
}
/// Starts a real rust-analyzer through a manager built with a 300 ms TTL,
/// waits four TTLs, and checks that the manager then reports no active
/// languages.
///
/// The test skips itself (with a note on stderr) unless `rust-analyzer --version`
/// both launches and exits successfully.
#[tokio::test]
async fn idle_background_task_evicts_after_ttl() {
    use std::process::Command as StdCommand;
    // Checking only that the process spawns is not enough: a rustup proxy can be
    // on PATH without the component installed, in which case it launches fine
    // but exits non-zero. Require a successful exit before running the test.
    let ra_usable = StdCommand::new("rust-analyzer")
        .arg("--version")
        .output()
        .is_ok_and(|out| out.status.success());
    if !ra_usable {
        eprintln!("Skipping: rust-analyzer not installed");
        return;
    }
    // Minimal crate layout so the server has a workspace to open.
    let dir = tempfile::tempdir().unwrap();
    std::fs::write(
        dir.path().join("Cargo.toml"),
        "[package]\nname = \"t\"\nversion = \"0.1.0\"\nedition = \"2021\"\n",
    )
    .unwrap();
    std::fs::create_dir_all(dir.path().join("src")).unwrap();
    std::fs::write(dir.path().join("src/lib.rs"), "pub fn f() {}").unwrap();
    let ttl = std::time::Duration::from_millis(300);
    let mgr = LspManager::new_arc_with_ttl(ttl);
    mgr.get_or_start("rust", dir.path(), Some(false))
        .await
        .unwrap();
    assert!(
        !mgr.active_languages().await.is_empty(),
        "client should be alive"
    );
    // Leave several TTLs of slack before checking for eviction.
    tokio::time::sleep(ttl * 4).await;
    assert!(
        mgr.active_languages().await.is_empty(),
        "idle client should have been evicted after TTL"
    );
}
/// A successful start writes exactly one `lsp_events` row, with language
/// `rust` and reason `new_session`.
///
/// Skipped when `ra_config_or_skip` yields no config.
#[tokio::test]
async fn do_start_records_lsp_event_to_db() {
    let workspace = tempfile::TempDir::new().unwrap();
    let root = workspace.path();
    let config = match ra_config_or_skip(root) {
        Some(config) => config,
        None => {
            eprintln!("Skipping: rust-analyzer not installed");
            return;
        }
    };
    let mgr = LspManager::new_for_test_with_root(root).await;
    mgr.get_or_start_for_test("rust", config).await.unwrap();

    let conn = crate::usage::db::open_db(root).unwrap();
    let row_count: i64 = conn
        .query_row("SELECT COUNT(*) FROM lsp_events", [], |row| row.get(0))
        .unwrap();
    assert_eq!(row_count, 1);

    let event: (String, String) = conn
        .query_row(
            "SELECT language, reason FROM lsp_events LIMIT 1",
            [],
            |row| Ok((row.get(0)?, row.get(1)?)),
        )
        .unwrap();
    let (language, reason) = event;
    assert_eq!(language, "rust");
    assert_eq!(reason, "new_session");
}
#[tokio::test]
async fn do_start_records_failure_event_when_start_fails() {
let dir = tempfile::TempDir::new().unwrap();
let mgr = LspManager::new_for_test_with_root(dir.path()).await;
let config = LspServerConfig {
command: "codescout-nonexistent-lsp-binary-xyz".into(),
args: vec![],
workspace_root: dir.path().to_path_buf(),
init_timeout: Some(std::time::Duration::from_secs(5)),
mux: false,
env: vec![],
idle_timeout_secs: None,
};
let result = mgr.get_or_start_for_test("kotlin", config).await;
assert!(result.is_err(), "start with a bogus binary must fail");
let conn = crate::usage::db::open_db(dir.path()).unwrap();
let (outcome, error): (String, Option<String>) = conn
.query_row("SELECT outcome, error FROM lsp_events LIMIT 1", [], |r| {
Ok((r.get(0)?, r.get(1)?))
})
.unwrap();
assert_eq!(outcome, "failed");
assert!(error.is_some());
}
/// A reason pre-seeded in `pending_reason` is taken by the next start: the map
/// ends up empty and the recorded event carries that reason.
///
/// Skipped when `ra_config_or_skip` yields no config.
#[tokio::test]
async fn do_start_reason_evicted_consumes_pending_reason() {
    let workspace = tempfile::TempDir::new().unwrap();
    let root = workspace.path();
    let config = match ra_config_or_skip(root) {
        Some(config) => config,
        None => {
            eprintln!("Skipping: rust-analyzer not installed");
            return;
        }
    };
    let mgr = LspManager::new_for_test_with_root(root).await;
    {
        // Seed the reason, then drop the guard before any await point.
        let mut pending = mgr.pending_reason.lock().unwrap();
        pending.insert(LspKey::new("rust", root), String::from("idle_evicted"));
    }
    mgr.get_or_start_for_test("rust", config).await.unwrap();
    let consumed = mgr.pending_reason.lock().unwrap().is_empty();
    assert!(consumed);

    let conn = crate::usage::db::open_db(root).unwrap();
    let recorded_reason: String = conn
        .query_row("SELECT reason FROM lsp_events LIMIT 1", [], |row| {
            row.get(0)
        })
        .unwrap();
    assert_eq!(recorded_reason, "idle_evicted");
}
/// After a start, `record_first_response_inner` empties
/// `pending_first_response` and stores the supplied latency in the event row.
///
/// Skipped when `ra_config_or_skip` yields no config.
#[tokio::test]
async fn record_first_response_consumes_pending_and_updates_db() {
    let workspace = tempfile::TempDir::new().unwrap();
    let root = workspace.path();
    let config = match ra_config_or_skip(root) {
        Some(config) => config,
        None => {
            eprintln!("Skipping: rust-analyzer not installed");
            return;
        }
    };
    let mgr = LspManager::new_for_test_with_root(root).await;
    mgr.get_or_start_for_test("rust", config).await.unwrap();
    mgr.record_first_response_inner("rust", root, 9100).await;
    let drained = mgr.pending_first_response.lock().unwrap().is_empty();
    assert!(drained);

    let conn = crate::usage::db::open_db(root).unwrap();
    let first_response_ms: Option<i64> = conn
        .query_row(
            "SELECT first_response_ms FROM lsp_events LIMIT 1",
            [],
            |row| row.get(0),
        )
        .unwrap();
    assert_eq!(first_response_ms, Some(9100));
}
/// Calling `record_first_response_inner` without any prior start must simply
/// return; the test passes as long as the call completes without panicking.
#[tokio::test]
async fn record_first_response_noop_when_no_pending() {
    let workspace = tempfile::TempDir::new().unwrap();
    let root = workspace.path();
    let mgr = LspManager::new_for_test_with_root(root).await;
    mgr.record_first_response_inner("rust", root, 5000).await;
}
/// Only the first `record_first_response_inner` call after a start is stored:
/// a second call with a different latency leaves the first value in the row.
///
/// Skipped when `ra_config_or_skip` yields no config.
#[tokio::test]
async fn record_first_response_second_call_is_noop() {
    let workspace = tempfile::TempDir::new().unwrap();
    let root = workspace.path();
    let config = match ra_config_or_skip(root) {
        Some(config) => config,
        None => {
            eprintln!("Skipping: rust-analyzer not installed");
            return;
        }
    };
    let mgr = LspManager::new_for_test_with_root(root).await;
    mgr.get_or_start_for_test("rust", config).await.unwrap();
    // First call wins; the second must not overwrite it.
    for latency_ms in [9100, 1234] {
        mgr.record_first_response_inner("rust", root, latency_ms)
            .await;
    }

    let conn = crate::usage::db::open_db(root).unwrap();
    let first_response_ms: Option<i64> = conn
        .query_row(
            "SELECT first_response_ms FROM lsp_events LIMIT 1",
            [],
            |row| row.get(0),
        )
        .unwrap();
    assert_eq!(first_response_ms, Some(9100));
}
/// The contention detector accepts the three lock-related messages below and
/// rejects a plain spawn failure and the empty string.
#[cfg(unix)]
#[test]
fn mux_failure_is_index_contention_detects_lock_signatures() {
    use super::mux_failure_is_index_contention as is_contention;

    let rocksdb_lock = "org.rocksdb.RocksDBException: While lock file: …/rocks/v492/LOCK: \
                        Resource temporarily unavailable";
    let os_error_11 = "Resource temporarily unavailable (os error 11)";
    let mux_lock_held = "Error: another mux instance holds the lock";
    assert!(is_contention(rocksdb_lock));
    assert!(is_contention(os_error_11));
    assert!(is_contention(mux_lock_held));

    let spawn_failure = "failed to spawn LSP server: kotlin-lsp";
    let empty = "";
    assert!(!is_contention(spawn_failure));
    assert!(!is_contention(empty));
}
/// Given a stderr tail that contains a lock-contention cause, the report's
/// message starts with the standard prefix and includes that cause, and the
/// hint mentions the locked index.
#[cfg(unix)]
#[test]
fn mux_failure_report_surfaces_stderr_cause_with_index_hint() {
    use super::mux_failure_report;

    let stderr_tail = [
        "Error: another mux instance holds the lock",
        "Caused by:",
        "    Resource temporarily unavailable (os error 11)",
    ]
    .map(String::from);
    let (message, hint) = mux_failure_report("", &stderr_tail);

    assert!(message.starts_with("mux process failed to start:"));
    assert!(
        message.contains("Resource temporarily unavailable"),
        "real cause must be surfaced, got: {message}"
    );
    assert!(
        hint.contains("index is locked"),
        "index-contention hint expected, got: {hint}"
    );
}
/// With no detail and an empty stderr tail, the report says there was no
/// diagnostic output and falls back to the generic hint.
#[cfg(unix)]
#[test]
fn mux_failure_report_handles_silent_exit_with_generic_hint() {
    use super::mux_failure_report;

    let report = mux_failure_report("", &[]);
    let (message, hint) = report;
    assert!(message.contains("no diagnostic output"), "got: {message}");
    assert!(
        hint.contains("another codescout mux isn't already running"),
        "generic hint expected when no contention signature, got: {hint}"
    );
}
/// A report built from a RocksDB lock message, once wrapped in a
/// `RecoverableError` and rendered to text, must still be classified as index
/// contention by the detector.
#[cfg(unix)]
#[test]
fn mux_contention_report_round_trips_through_get_or_start_guard() {
    use super::{mux_failure_is_index_contention, mux_failure_report};

    let stderr_tail = [String::from(
        "org.rocksdb.RocksDBException: While lock file: \
         …/analyzer/workspaces/<h>/rocks/v492/LOCK: \
         Resource temporarily unavailable",
    )];
    let (message, hint) = mux_failure_report("", &stderr_tail);
    let err = crate::tools::RecoverableError::with_hint(message, hint);
    let rendered = err.to_string();
    assert!(
        mux_failure_is_index_contention(&rendered),
        "the error get_or_start_via_mux returns for a held RocksDB lock must itself \
         trip get_or_start's contention guard (→ return Err, no direct fallback); got: {err}"
    );
}
/// A report built from an ordinary spawn failure, once wrapped in a
/// `RecoverableError` and rendered to text, must not be classified as index
/// contention.
#[cfg(unix)]
#[test]
fn non_contention_mux_report_does_not_trip_get_or_start_guard() {
    use super::{mux_failure_is_index_contention, mux_failure_report};

    let stderr_tail = [String::from("failed to spawn LSP server: kotlin-lsp")];
    let (message, hint) = mux_failure_report("", &stderr_tail);
    let err = crate::tools::RecoverableError::with_hint(message, hint);
    let rendered = err.to_string();
    assert!(
        !mux_failure_is_index_contention(&rendered),
        "a generic mux failure must NOT trip the contention guard (get_or_start should \
         fall back to direct mode); got: {err}"
    );
}
/// The lock probe reports `false` both for an existing file nobody has locked
/// and for a path that does not exist.
#[cfg(unix)]
#[test]
fn posix_write_lock_is_held_false_on_unlocked_file() {
    let dir = tempfile::tempdir().unwrap();
    let unlocked = dir.path().join("LOCK");
    std::fs::write(&unlocked, b"").unwrap();
    let missing = dir.path().join("nope");

    assert!(!super::posix_write_lock_is_held(&unlocked));
    assert!(!super::posix_write_lock_is_held(&missing));
}
/// Walks `claim_mux_lock` through three states of the same lock file: free
/// (claim succeeds), exclusively locked through another handle (claim yields
/// `None`), and released again (claim succeeds).
#[cfg(unix)]
#[test]
fn claim_mux_lock_some_when_free_none_when_held() {
    use fs4::fs_std::FileExt;

    let dir = tempfile::tempdir().unwrap();
    let lock = dir.path().join("mux.lock");

    {
        // The claim is dropped at the end of this scope, freeing the lock.
        let first_claim = super::claim_mux_lock(&lock).unwrap();
        assert!(first_claim.is_some(), "a free flock should be claimable");
    }

    // Take the lock through an independent file handle.
    let mut open_opts = std::fs::OpenOptions::new();
    open_opts.create(true).write(true).truncate(false);
    let holder = open_opts.open(&lock).unwrap();
    holder.try_lock_exclusive().unwrap();

    let claimed_while_held = super::claim_mux_lock(&lock).unwrap().is_some();
    assert!(
        !claimed_while_held,
        "a held flock must read as None — no live-mux assumption"
    );

    drop(holder);
    let reclaimed = super::claim_mux_lock(&lock).unwrap().is_some();
    assert!(
        reclaimed,
        "lock should be reclaimable after the holder releases"
    );
}
/// The rendered unreachable-socket error must name the wedged/mid-restart
/// situation, mention both the `.lock` and `.sock` paths, and point the reader
/// at `fuser`.
#[cfg(unix)]
#[test]
fn mux_socket_unreachable_error_names_situation_and_routes_to_holder() {
    let sock = std::path::Path::new("/run/user/1000/codescout-kotlin-mux-abc.sock");
    let lock = std::path::Path::new("/run/user/1000/codescout-kotlin-mux-abc.lock");
    let rendered = super::mux_socket_unreachable_error("kotlin", sock, lock).to_string();

    assert!(
        rendered.contains("wedged or mid-restart"),
        "should name the wedged-mux situation: {rendered}"
    );
    let cites_lock = rendered.contains(".lock");
    assert!(
        cites_lock && rendered.contains(".sock"),
        "should cite both the lock and the socket: {rendered}"
    );
    assert!(
        rendered.contains("fuser"),
        "should route the human to the holder: {rendered}"
    );
}
/// `kotlin_index_lock_held` reports `false` for a non-Kotlin language and for
/// Kotlin pointed at a freshly created, empty workspace directory.
#[cfg(unix)]
#[test]
fn kotlin_index_lock_held_false_for_non_kotlin_and_missing_home() {
    let absent_ws = std::path::Path::new("/tmp/cs-nonexistent-ws");
    let held_for_rust = super::kotlin_index_lock_held("rust", absent_ws);
    assert!(!held_for_rust);

    let empty_ws = tempfile::tempdir().unwrap();
    let held_for_kotlin = super::kotlin_index_lock_held("kotlin", empty_ws.path());
    assert!(!held_for_kotlin);
}
/// The lock probe must report `true` while a separate process holds an
/// exclusive `fcntl.lockf` lock on the file.
///
/// Ignored by default because it needs `python3` on PATH to act as the holder.
#[cfg(unix)]
#[test]
#[ignore = "spawns a python3 fcntl holder; gated like the rust-analyzer mux test"]
fn posix_write_lock_is_held_true_when_another_process_holds_it() {
let dir = tempfile::tempdir().unwrap();
let lock = dir.path().join("LOCK");
std::fs::write(&lock, b"").unwrap();
// Child script: open the file, take a non-blocking exclusive lock, print
// "held", then sleep 10 s so the lock stays in place while we probe.
let mut holder = std::process::Command::new("python3")
.arg("-c")
.arg(format!(
"import fcntl,time; f=open(r'{}', 'r+'); \
fcntl.lockf(f, fcntl.LOCK_EX | fcntl.LOCK_NB); \
print('held', flush=True); time.sleep(10)",
lock.display()
))
.stdout(std::process::Stdio::piped())
.spawn()
.expect("spawn python3 holder");
{
use std::io::Read;
// Block on the child's stdout so the probe below runs only after the child
// has written something (or closed the pipe). The read result itself is
// deliberately ignored.
let mut buf = [0u8; 4];
let _ = holder.stdout.as_mut().unwrap().read(&mut buf); }
let held = super::posix_write_lock_is_held(&lock);
// Tear the child down before asserting so a failed assertion does not leave
// it running.
let _ = holder.kill();
let _ = holder.wait();
assert!(
held,
"probe must detect a POSIX write-lock held by another process"
);
}
/// When another process holds the mux lock file via `flock` and the mux socket
/// file does not exist, `get_or_start_via_mux` must return an error whose text
/// names the wedged/mid-restart situation and mentions `fuser`.
///
/// Ignored by default because it needs `python3` on PATH to act as the holder.
#[cfg(unix)]
#[tokio::test]
#[ignore = "spawns a python3 flock holder + drives the mux connect path; gated like the posix_write_lock tests"]
async fn get_or_start_via_mux_surfaces_wedged_error_when_flock_held_socket_absent() {
let dir = tempfile::tempdir().unwrap();
let ws = dir.path();
let lock_path = crate::lsp::mux::lock_path_for_workspace("rust", ws);
let sock_path = crate::lsp::mux::socket_path_for_workspace("rust", ws);
// Arrange the scenario: the lock file exists, the socket file does not.
std::fs::write(&lock_path, b"").unwrap();
let _ = std::fs::remove_file(&sock_path);
// Child script: open the lock file, take an exclusive flock, print "held",
// then sleep 10 s so the lock stays in place during the call under test.
let mut holder = std::process::Command::new("python3")
.arg("-c")
.arg(format!(
"import fcntl,time; f=open(r'{}', 'r+'); \
fcntl.flock(f, fcntl.LOCK_EX); \
print('held', flush=True); time.sleep(10)",
lock_path.display()
))
.stdout(std::process::Stdio::piped())
.spawn()
.expect("spawn python3 flock holder");
{
use std::io::Read;
// Block on the child's stdout so the manager call below runs only after the
// child has written something (or closed the pipe). The read result itself
// is deliberately ignored.
let mut buf = [0u8; 4];
let _ = holder.stdout.as_mut().unwrap().read(&mut buf); }
let mgr = LspManager::new();
let config = LspServerConfig {
command: "rust-analyzer".into(),
args: vec![],
workspace_root: ws.to_path_buf(),
init_timeout: None,
mux: true,
env: vec![],
idle_timeout_secs: None,
};
let result = mgr.get_or_start_via_mux("rust", ws, config).await;
// Tear the child down before asserting so a failed assertion does not leave
// it running.
let _ = holder.kill();
let _ = holder.wait();
let msg = match result {
Ok(_) => panic!("held flock + absent socket must error, not connect or spawn"),
Err(e) => e.to_string(),
};
assert!(
msg.contains("wedged or mid-restart"),
"must surface the wedged-mux situation, got: {msg}"
);
assert!(
msg.contains("fuser"),
"must route the human to the lock holder, got: {msg}"
);
}
/// With a separate process holding an exclusive `fcntl.lockf` lock on the
/// file, `reap_holders_of_lock` must return `true` and the probe must
/// afterwards see the lock as free.
///
/// Ignored by default because it needs `python3` on PATH to act as the holder.
#[cfg(unix)]
#[tokio::test]
#[ignore = "spawns a python3 fcntl holder; gated like posix_write_lock tests"]
async fn reap_holders_of_lock_kills_an_orphan_holder() {
use std::io::Read;
let dir = tempfile::tempdir().unwrap();
let lock = dir.path().join("LOCK");
std::fs::write(&lock, b"").unwrap();
// Child script: open the file, take a non-blocking exclusive lock, print
// "held", then sleep 30 s so it is still around to be reaped.
let mut holder = std::process::Command::new("python3")
.arg("-c")
.arg(format!(
"import fcntl,time; f=open(r'{}', 'r+'); \
fcntl.lockf(f, fcntl.LOCK_EX | fcntl.LOCK_NB); \
print('held', flush=True); time.sleep(30)",
lock.display()
))
.stdout(std::process::Stdio::piped())
.spawn()
.expect("spawn holder");
{
// Block on the child's stdout so the checks below run only after the child
// has written something (or closed the pipe). The read result itself is
// deliberately ignored.
let mut b = [0u8; 4];
let _ = holder.stdout.as_mut().unwrap().read(&mut b);
}
assert!(super::posix_write_lock_is_held(&lock), "precondition: held");
let reaped = super::reap_holders_of_lock(&lock).await.expect("reap ok");
assert!(reaped, "should report a reap happened");
assert!(
!super::posix_write_lock_is_held(&lock),
"lock freed after reap"
);
// Collect the child's exit status; errors from `wait` are ignored.
let _ = holder.wait();
}
/// When nobody holds the lock file, `reap_holders_of_lock` succeeds and
/// reports that nothing was reaped.
#[cfg(unix)]
#[tokio::test]
async fn reap_holders_of_lock_noop_when_unheld() {
    let dir = tempfile::tempdir().unwrap();
    let lock = dir.path().join("LOCK");
    std::fs::write(&lock, b"").unwrap();

    let reaped = super::reap_holders_of_lock(&lock).await.unwrap();
    assert!(!reaped, "no holder → no reap");
}
/// `build_mux_args` must forward the configured idle timeout, emit one
/// `--env KEY=VALUE` pair per configured variable ahead of the `--` separator,
/// and place the server command and its arguments after the separator.
#[cfg(unix)]
#[test]
fn build_mux_args_includes_env_forwarding() {
    use std::path::PathBuf;

    let workspace = PathBuf::from("/tmp/ws");
    let cfg = crate::lsp::client::LspServerConfig {
        command: "fakelsp".into(),
        args: vec!["--stdio".into()],
        workspace_root: workspace.clone(),
        init_timeout: None,
        mux: true,
        env: vec![
            ("GRADLE_USER_HOME".into(), "/tmp/g".into()),
            ("FOO".into(), "bar".into()),
        ],
        idle_timeout_secs: Some(123),
    };
    let args = crate::lsp::manager::build_mux_args(
        &workspace,
        &PathBuf::from("/tmp/sock"),
        &PathBuf::from("/tmp/lock"),
        &cfg,
    );

    let idle_flag_at = args.iter().position(|a| a == "--idle-timeout").unwrap();
    assert_eq!(args[idle_flag_at + 1], "123");

    let separator_at = args.iter().position(|a| a == "--").unwrap();
    // Each `--env` flag is immediately followed by its KEY=VALUE payload, so
    // scan adjacent pairs in the portion before the separator.
    let forwarded: Vec<_> = args[..separator_at]
        .windows(2)
        .filter(|pair| pair[0] == "--env")
        .map(|pair| pair[1].clone())
        .collect();
    assert!(forwarded.contains(&"GRADLE_USER_HOME=/tmp/g".to_string()));
    assert!(forwarded.contains(&"FOO=bar".to_string()));

    assert_eq!(args[separator_at + 1], "fakelsp");
    assert_eq!(args[separator_at + 2], "--stdio");
}
/// With `idle_timeout_secs` unset, `build_mux_args` passes `300` after
/// `--idle-timeout`.
#[cfg(unix)]
#[test]
fn build_mux_args_defaults_idle_timeout_to_300_when_none() {
    use std::path::PathBuf;

    let workspace = PathBuf::from("/tmp/ws");
    let cfg = crate::lsp::client::LspServerConfig {
        command: "x".into(),
        args: Vec::new(),
        workspace_root: workspace.clone(),
        init_timeout: None,
        mux: true,
        env: Vec::new(),
        idle_timeout_secs: None,
    };
    let args = crate::lsp::manager::build_mux_args(
        &workspace,
        &PathBuf::from("/tmp/sock"),
        &PathBuf::from("/tmp/lock"),
        &cfg,
    );

    let idle_flag_at = args
        .iter()
        .position(|arg| arg == "--idle-timeout")
        .unwrap();
    assert_eq!(args[idle_flag_at + 1], "300");
}
/// A path ending in ` (deleted)` is returned with that suffix removed; a path
/// without the suffix yields `None`.
#[test]
fn strip_deleted_suffix_recovers_rebuilt_path() {
    let live = std::path::Path::new("/abs/target/release/codescout");
    let marked_deleted = std::path::Path::new("/abs/target/release/codescout (deleted)");

    let recovered = strip_deleted_suffix(marked_deleted);
    assert_eq!(recovered, Some(live.to_path_buf()));

    let untouched = strip_deleted_suffix(live);
    assert_eq!(untouched, None);
}
/// An explicit override beats the default in both directions.
#[test]
fn resolve_mux_flag_override_wins() {
    use crate::lsp::manager::resolve_mux_flag;

    let forced_off = resolve_mux_flag(true, Some(false));
    assert!(!forced_off);
    let forced_on = resolve_mux_flag(false, Some(true));
    assert!(forced_on);
}
/// Without an override the default is returned unchanged.
#[test]
fn resolve_mux_flag_none_uses_default() {
    use crate::lsp::manager::resolve_mux_flag;

    let default_on = resolve_mux_flag(true, None);
    assert!(default_on);
    let default_off = resolve_mux_flag(false, None);
    assert!(!default_off);
}
/// Executables under a `target/<profile>/deps/` directory are classified as
/// test runners; installed binaries, top-level target binaries and unrelated
/// tools are not.
#[test]
fn is_test_runner_exe_classifies_by_deps_location() {
    use std::path::Path;

    let debug_deps = Path::new("/repo/target/debug/deps/codescout-3ba224a8427ce46d");
    let release_deps = Path::new("/repo/target/release/deps/some_integration_test-9f2a1b3c");
    assert!(super::is_test_runner_exe(debug_deps));
    assert!(super::is_test_runner_exe(release_deps));

    let installed_bin = Path::new("/home/u/.cargo/bin/codescout");
    let debug_bin = Path::new("/repo/target/debug/codescout");
    let system_cargo = Path::new("/usr/bin/cargo");
    assert!(!super::is_test_runner_exe(installed_bin));
    assert!(!super::is_test_runner_exe(debug_bin));
    assert!(!super::is_test_runner_exe(system_cargo));
}
/// A project-level `Some(false)` override must resolve to the direct-process
/// path for Rust whatever the server's default is, and with no override the
/// default must pass through unchanged.
///
/// The default comes from `servers::default_config("rust", ..)`, falling back
/// to `false` when that call yields nothing. No `LspManager` is involved: only
/// the flag resolution is exercised, so none is constructed.
#[tokio::test]
async fn project_override_forces_direct_path_for_rust() {
    let dir = tempfile::tempdir().unwrap();
    let default_mux = servers::default_config("rust", dir.path())
        .map(|c| c.mux)
        .unwrap_or(false);
    let effective = resolve_mux_flag(default_mux, Some(false));
    assert!(!effective, "project opt-out must force direct-process path");
    let effective_default = resolve_mux_flag(default_mux, None);
    assert_eq!(effective_default, default_mux);
}
}