Skip to main content

krait/lsp/
pool.rs

1use std::collections::{HashMap, HashSet};
2use std::path::{Path, PathBuf};
3use std::sync::Arc;
4use std::time::{Duration, Instant};
5
6use anyhow::{bail, Context};
7use tokio::sync::{Mutex, OwnedMutexGuard};
8use tracing::{debug, info, warn};
9
10use super::client::LspClient;
11use super::diagnostics::DiagnosticStore;
12use super::files::FileTracker;
13use super::install;
14use crate::detect::{language_for_file, Language};
15
/// Maximum crashes before permanent failure (`MultiRoot` only).
///
/// `ensure_running` compares the per-language crash counter against this value;
/// once the counter reaches it, the language is marked failed instead of restarted.
const MAX_CRASHES: u32 = 5;

/// Backoff delays (seconds) applied before restarting a crashed server.
///
/// Looked up as `crash_count - 1`, so the first crash waits 2s. A crash count
/// past the end of the table falls back to 60s in `ensure_running`.
const BACKOFF_DELAYS_SECS: &[u64] = &[2, 4, 8, 16, 32, 60];

/// After this many seconds of uptime, reset the crash counter.
const STABILITY_RESET_SECS: u64 = 300; // 5 minutes

/// Default max concurrent sessions for LRU fallback.
/// Supports up to 15 language servers + spare capacity for LRU overflow.
const DEFAULT_MAX_LRU_SESSIONS: usize = 20;
28
/// LSP client and file tracker bundled together.
///
/// Callers obtain one through `LanguageState::session_mut` while holding the
/// per-language guard.
pub struct LspSession {
    /// Client for the language server backing this session.
    pub client: LspClient,
    /// File tracker used together with `client`'s transport when opening files.
    pub file_tracker: FileTracker,
}
34
/// Per-server slot with lifecycle state.
struct ServerSlot {
    /// The live client + file tracker for this server.
    session: LspSession,
    /// When the server finished booting; used for the crash-counter stability reset.
    started_at: Instant,
    /// Last time this slot was routed to; drives per-language LRU eviction.
    last_used_at: Instant,
    /// Name reported by the client (`LspClient::server_name`) at boot time.
    server_name: String,
}
42
/// Strategy for managing a language's LSP server(s).
///
/// Chosen once in `boot_first_server`, based on whether the freshly booted
/// client reports workspace-folder support.
enum ServerStrategy {
    /// One server, multiple workspace folders dynamically attached.
    MultiRoot(Box<ServerSlot>),
    /// Multiple servers (one per workspace root), LRU-evicted at cap.
    /// Keyed by workspace root path.
    LruPerRoot(HashMap<PathBuf, ServerSlot>),
}
50
/// Per-language mutable state — one per language, guarded by its own Mutex.
///
/// The outer `LspMultiplexer` holds one `Arc<Mutex<LanguageState>>` per language.
/// Multiple tasks can hold different language locks concurrently.
pub struct LanguageState {
    /// `None` = not started yet (lazy init pending), or cleared by a restart/eviction.
    strategy: Option<ServerStrategy>,
    /// Number of crashes observed by the health check; reset by `restart_language`
    /// and after a server has been up for `STABILITY_RESET_SECS`.
    crash_count: u32,
    /// Set when the language has permanently failed (too many crashes).
    /// While set, `ensure_running` refuses to boot the server.
    pub failed: Option<String>,
}
62
63impl LanguageState {
64    fn new() -> Self {
65        Self {
66            strategy: None,
67            crash_count: 0,
68            failed: None,
69        }
70    }
71
72    /// Get a mutable reference to any active session for this language.
73    pub fn session_mut(&mut self) -> Option<&mut LspSession> {
74        match &mut self.strategy {
75            Some(ServerStrategy::MultiRoot(slot)) => Some(&mut slot.session),
76            Some(ServerStrategy::LruPerRoot(slots)) => {
77                let (_, slot) = slots.iter_mut().max_by_key(|(_, s)| s.last_used_at)?;
78                slot.last_used_at = Instant::now();
79                Some(&mut slot.session)
80            }
81            None => None,
82        }
83    }
84
85    /// Whether a live session exists.
86    #[must_use]
87    pub fn is_ready(&self) -> bool {
88        match &self.strategy {
89            Some(ServerStrategy::MultiRoot(_)) => true,
90            Some(ServerStrategy::LruPerRoot(slots)) => !slots.is_empty(),
91            None => false,
92        }
93    }
94}
95
/// Why a server is not ready for queries.
#[derive(Debug, Clone)]
pub enum NotReadyReason {
    /// Server process has not been started yet (lazy init pending).
    NotStarted,
    /// Server crashed and exceeded max retries; carries the failure reason.
    Failed(String),
    /// No server configured for this language/scope.
    NotFound,
}

/// Human-readable rendering of each reason.
impl std::fmt::Display for NotReadyReason {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::Failed(reason) => write!(f, "server failed: {reason}"),
            Self::NotStarted => f.write_str("server not yet started"),
            Self::NotFound => f.write_str("no server configured for this scope"),
        }
    }
}
116
/// Status info for a language server.
///
/// Produced by `LspMultiplexer::status`, one entry per language.
#[derive(Debug, serde::Serialize)]
pub struct ServerStatus {
    /// Name of the server serving this language.
    pub server_name: String,
    /// Language name (`Language::name`).
    pub language: String,
    /// Short status label, e.g. `"ready"`.
    pub status: &'static str,
    /// Seconds since the server started; reported as 0 when the language lock is busy.
    pub uptime_secs: u64,
    /// Number of open files; reported as 0 when the language lock is busy.
    pub open_files: usize,
    /// Number of attached workspace folders; reported as 0 when the language lock is busy.
    pub attached_folders: usize,
    /// Number of discovered workspace roots for this language.
    pub total_folders: usize,
}
128
/// Readiness summary for the multiplexer.
#[derive(Debug)]
pub struct Readiness {
    /// Number of languages counted as having a running server.
    pub ready: usize,
    /// Number of distinct languages known to the multiplexer.
    pub total: usize,
}

impl Readiness {
    /// `true` when there is at least one language and every language is ready.
    #[must_use]
    pub fn is_all_ready(&self) -> bool {
        // An empty project is never "all ready".
        if self.total == 0 {
            return false;
        }
        self.total <= self.ready
    }
}
142
/// Config values that can be set after construction (written once, read many).
struct PoolConfig {
    /// Per-language cap on LRU sessions; reaching it triggers `evict_lru`.
    max_lru_sessions: usize,
    /// Workspace roots exempt from LRU eviction.
    priority_roots: HashSet<PathBuf>,
    /// Max concurrent language server processes across all languages.
    /// `None` = unlimited (default).
    max_language_servers: Option<usize>,
}
151
/// Manages LSP server instances — one per language (multi-root) or per-root (LRU fallback).
///
/// **Concurrency model**: each language has its own `Arc<Mutex<LanguageState>>`.
/// Callers acquire a per-language `OwnedMutexGuard` for the duration of LSP I/O.
/// Different languages can be queried concurrently — no global lock is held during I/O.
///
/// **Lazy startup**: servers are started on first query, not at daemon boot.
/// **Crash recovery**: exponential backoff for multi-root, silent removal for LRU.
pub struct LspMultiplexer {
    /// Per-language locks. Map is immutable after construction — only inner state changes.
    /// Contains exactly the languages present in `workspace_roots` at construction.
    languages: HashMap<Language, Arc<Mutex<LanguageState>>>,
    /// Read-only after daemon start. Never mutated after `new()` is called.
    /// Also the fallback root when a file or language has no discovered workspace root.
    pub project_root: PathBuf,
    /// Discovered `(language, workspace root)` pairs.
    pub workspace_roots: Vec<(Language, PathBuf)>,
    /// Config settable via `set_max_lru_sessions` / `set_max_language_servers` /
    /// `set_priority_roots` (written once).
    config: std::sync::RwLock<PoolConfig>,
    /// Optional diagnostic store — set once after construction via `set_diagnostic_store()`.
    diagnostic_store: std::sync::OnceLock<Arc<DiagnosticStore>>,
    /// Tracks last-used timestamp per language for global LRU eviction.
    last_used: std::sync::RwLock<HashMap<Language, Instant>>,
}
173
/// Backward-compatible type alias: `LspPool` names the same type as `LspMultiplexer`.
pub type LspPool = LspMultiplexer;
176
177impl LspMultiplexer {
178    /// Create a new multiplexer for a project.
179    ///
180    /// Pre-populates per-language slots for all known languages so the
181    /// `languages` map never needs to grow after construction.
182    #[must_use]
183    pub fn new(project_root: PathBuf, workspace_roots: Vec<(Language, PathBuf)>) -> Self {
184        // Pre-populate one slot per unique language
185        let unique_langs: HashSet<Language> = workspace_roots.iter().map(|(l, _)| *l).collect();
186        let languages = unique_langs
187            .into_iter()
188            .map(|l| (l, Arc::new(Mutex::new(LanguageState::new()))))
189            .collect();
190
191        Self {
192            languages,
193            project_root,
194            workspace_roots,
195            config: std::sync::RwLock::new(PoolConfig {
196                max_lru_sessions: DEFAULT_MAX_LRU_SESSIONS,
197                priority_roots: HashSet::new(),
198                max_language_servers: None,
199            }),
200            diagnostic_store: std::sync::OnceLock::new(),
201            last_used: std::sync::RwLock::new(HashMap::new()),
202        }
203    }
204
205    /// Attach a diagnostic store so all new LSP clients collect diagnostics.
206    ///
207    /// Must be called before the first query. Subsequent calls are no-ops.
208    pub fn set_diagnostic_store(&self, store: Arc<DiagnosticStore>) {
209        let _ = self.diagnostic_store.set(store);
210    }
211
212    /// Set the maximum number of concurrent LRU sessions (from config).
213    pub fn set_max_lru_sessions(&self, max: usize) {
214        if let Ok(mut cfg) = self.config.write() {
215            cfg.max_lru_sessions = max;
216        }
217    }
218
219    /// Set the maximum number of concurrent language server processes (from config).
220    pub fn set_max_language_servers(&self, max: usize) {
221        if let Ok(mut cfg) = self.config.write() {
222            cfg.max_language_servers = Some(max);
223        }
224    }
225
226    /// Set priority workspace roots that are exempt from LRU eviction.
227    pub fn set_priority_roots(&self, roots: HashSet<PathBuf>) {
228        if let Ok(mut cfg) = self.config.write() {
229            cfg.priority_roots = roots;
230        }
231    }
232
233    /// Get the priority workspace roots.
234    #[must_use]
235    pub fn priority_roots(&self) -> HashSet<PathBuf> {
236        self.config
237            .read()
238            .map_or_else(|_| HashSet::new(), |cfg| cfg.priority_roots.clone())
239    }
240
241    /// Acquire a per-language guard for the given language, booting the server if needed.
242    ///
243    /// The returned guard holds the per-language mutex for the duration of LSP I/O.
244    /// Other languages remain accessible concurrently.
245    ///
246    /// # Errors
247    /// Returns an error if the server cannot be started or has permanently failed.
248    pub async fn get_or_start(
249        &self,
250        lang: Language,
251    ) -> anyhow::Result<OwnedMutexGuard<LanguageState>> {
252        let lock = self.language_lock(lang)?;
253        let mut guard = lock.lock_owned().await;
254
255        self.ensure_running(&mut guard, lang).await?;
256
257        // For empty LRU pool, boot the initial root
258        if Self::is_lru_empty(&guard) {
259            let root = self.initial_root(lang);
260            self.boot_lru_session(&mut guard, lang, &root).await?;
261        }
262
263        // Touch last-used for global LRU tracking
264        self.touch_language(lang);
265
266        Ok(guard)
267    }
268
269    /// Route a file to its language server, attaching the workspace folder if needed.
270    ///
271    /// Returns a per-language guard that the caller holds during LSP I/O.
272    ///
273    /// # Errors
274    /// Returns an error if language cannot be detected or server fails to start.
275    pub async fn route_for_file(
276        &self,
277        file_path: &Path,
278    ) -> anyhow::Result<OwnedMutexGuard<LanguageState>> {
279        let lang = language_for_file(file_path)
280            .ok_or_else(|| anyhow::anyhow!("unknown language for {}", file_path.display()))?;
281
282        let root = self
283            .find_nearest_workspace(file_path, lang)
284            .unwrap_or_else(|| self.project_root.clone());
285
286        let lock = self.language_lock(lang)?;
287        let mut guard = lock.lock_owned().await;
288
289        self.ensure_running(&mut guard, lang).await?;
290        self.route_with_root(&mut guard, lang, &root).await?;
291
292        // Touch last-used for global LRU tracking
293        self.touch_language(lang);
294
295        Ok(guard)
296    }
297
298    /// Attach all discovered workspace folders for a language.
299    ///
300    /// Acquires the per-language lock internally.
301    ///
302    /// # Errors
303    /// Returns an error if the server is not running or attachment fails.
304    pub async fn attach_all_workspaces(&self, lang: Language) -> anyhow::Result<()> {
305        let lock = self.language_lock(lang)?;
306        let mut guard = lock.lock_owned().await;
307        self.attach_all_workspaces_inner(&mut guard, lang).await
308    }
309
310    /// Attach all workspaces using an already-held language guard.
311    ///
312    /// Use this variant when you already hold the per-language lock.
313    ///
314    /// # Errors
315    /// Returns an error if attachment fails.
316    pub async fn attach_all_workspaces_with_guard(
317        &self,
318        lang: Language,
319        guard: &mut OwnedMutexGuard<LanguageState>,
320    ) -> anyhow::Result<()> {
321        self.attach_all_workspaces_inner(guard, lang).await
322    }
323
324    /// Find the nearest discovered workspace root for a file.
325    #[must_use]
326    pub fn find_nearest_workspace(&self, file_path: &Path, lang: Language) -> Option<PathBuf> {
327        self.workspace_roots
328            .iter()
329            .filter(|(l, _)| *l == lang)
330            .filter(|(_, root)| file_path.starts_with(root))
331            .max_by_key(|(_, root)| root.components().count())
332            .map(|(_, root)| root.clone())
333    }
334
335    /// Get all unique languages detected in the project.
336    #[must_use]
337    pub fn unique_languages(&self) -> Vec<Language> {
338        let mut langs: Vec<Language> = self
339            .workspace_roots
340            .iter()
341            .map(|(l, _)| *l)
342            .collect::<HashSet<_>>()
343            .into_iter()
344            .collect();
345        langs.sort_by_key(|l| l.name());
346        langs
347    }
348
349    /// Pre-warm LRU sessions for priority workspace roots.
350    ///
351    /// # Errors
352    /// Returns an error if a session fails to boot (non-fatal, logged by caller).
353    pub async fn warm_priority_roots(&self) -> anyhow::Result<()> {
354        let roots_to_warm: Vec<(Language, PathBuf)> = {
355            let cfg = self
356                .config
357                .read()
358                .map_err(|_| anyhow::anyhow!("config lock poisoned"))?;
359            self.workspace_roots
360                .iter()
361                .filter(|(_, root)| cfg.priority_roots.contains(root))
362                .cloned()
363                .collect()
364        };
365
366        for (lang, root) in &roots_to_warm {
367            // Only meaningful for LRU strategy
368            let Ok(lock) = self.language_lock(*lang) else {
369                continue;
370            };
371            let mut guard = lock.lock_owned().await;
372
373            // Skip if not LRU or already warm
374            let is_lru = matches!(guard.strategy, Some(ServerStrategy::LruPerRoot(_)));
375            if !is_lru {
376                continue;
377            }
378            let already_warm = match &guard.strategy {
379                Some(ServerStrategy::LruPerRoot(slots)) => slots.contains_key(root),
380                _ => true,
381            };
382            if already_warm {
383                continue;
384            }
385
386            info!("pre-warming priority workspace: {lang}:{}", root.display());
387            self.boot_lru_session(&mut guard, *lang, root).await?;
388        }
389        Ok(())
390    }
391
392    /// Get all active (running) languages.
393    #[must_use]
394    pub fn active_languages(&self) -> Vec<Language> {
395        let mut langs: Vec<Language> = self
396            .languages
397            .iter()
398            .filter(|(_, lock)| {
399                lock.try_lock().map_or(true, |g| g.is_ready()) // treat "in use" as active
400            })
401            .map(|(l, _)| *l)
402            .collect();
403        langs.sort_by_key(|l| l.name());
404        langs
405    }
406
407    /// Get status info for all known languages. Uses `try_lock` — busy languages show as "ready".
408    #[must_use]
409    pub fn status(&self) -> Vec<ServerStatus> {
410        let mut statuses = Vec::new();
411        let mut seen = HashSet::new();
412
413        for (lang, _) in &self.workspace_roots {
414            if !seen.insert(lang) {
415                continue;
416            }
417
418            let total_folders = self
419                .workspace_roots
420                .iter()
421                .filter(|(l, _)| l == lang)
422                .count();
423
424            let Some(lock) = self.languages.get(lang) else {
425                statuses.push(pending_status(*lang, total_folders));
426                continue;
427            };
428
429            match lock.try_lock() {
430                Ok(guard) => {
431                    statuses.push(slot_status(*lang, &guard, total_folders));
432                }
433                Err(_) => {
434                    // Lock is held — server is busy (actively processing a query)
435                    statuses.push(ServerStatus {
436                        server_name: default_server_name(*lang),
437                        language: lang.name().to_string(),
438                        status: "ready",
439                        uptime_secs: 0,
440                        open_files: 0,
441                        attached_folders: 0,
442                        total_folders,
443                    });
444                }
445            }
446        }
447
448        statuses
449    }
450
451    /// Readiness summary: how many languages have running servers.
452    #[must_use]
453    pub fn readiness(&self) -> Readiness {
454        let unique_langs: HashSet<Language> =
455            self.workspace_roots.iter().map(|(l, _)| *l).collect();
456        let ready = self
457            .languages
458            .iter()
459            .filter(|(_, lock)| lock.try_lock().map_or(true, |g| g.is_ready()))
460            .count();
461        Readiness {
462            ready,
463            total: unique_langs.len(),
464        }
465    }
466
467    /// Check if a language has a running session (best-effort, non-blocking).
468    #[must_use]
469    pub fn is_ready(&self, lang: Language) -> bool {
470        self.languages
471            .get(&lang)
472            .and_then(|l| l.try_lock().ok())
473            .is_some_and(|g| g.is_ready())
474    }
475
476    /// Get all known workspace roots.
477    #[must_use]
478    pub fn workspace_roots(&self) -> &[(Language, PathBuf)] {
479        &self.workspace_roots
480    }
481
482    /// Get the project root.
483    #[must_use]
484    pub fn project_root(&self) -> &Path {
485        &self.project_root
486    }
487
488    /// Shut down all active LSP sessions.
489    pub async fn shutdown_all(&self) {
490        for (lang, lock) in &self.languages {
491            let mut guard = lock.lock().await;
492            match guard.strategy.take() {
493                Some(ServerStrategy::MultiRoot(mut slot)) => {
494                    shutdown_slot(*lang, None, &mut slot).await;
495                }
496                Some(ServerStrategy::LruPerRoot(slots)) => {
497                    for (root, mut slot) in slots {
498                        shutdown_slot(*lang, Some(&root), &mut slot).await;
499                    }
500                }
501                None => {}
502            }
503        }
504    }
505
506    // ─── Internal helpers ──────────────────────────────────────────────
507
508    /// Record that a language was just used (for global LRU tracking).
509    fn touch_language(&self, lang: Language) {
510        if let Ok(mut map) = self.last_used.write() {
511            map.insert(lang, Instant::now());
512        }
513    }
514
515    /// Count how many languages currently have running servers.
516    fn active_language_count(&self) -> usize {
517        self.languages
518            .iter()
519            .filter(|(_, lock)| lock.try_lock().map_or(true, |g| g.is_ready()))
520            .count()
521    }
522
523    /// If the global language-server cap is reached, evict the least-recently-used language.
524    ///
525    /// This shuts down the entire language server (all sessions) for the evicted language.
526    /// The evicted language will be re-booted on next query (lazy init).
527    async fn evict_global_lru_if_needed(&self, current_lang: Language) {
528        let max = match self.config.read() {
529            Ok(cfg) => cfg.max_language_servers,
530            Err(_) => return,
531        };
532        let Some(max) = max else { return };
533
534        let active = self.active_language_count();
535        if active < max {
536            return;
537        }
538
539        // Find the LRU language (not current, not in-use)
540        let victim = {
541            let Ok(last_used) = self.last_used.read() else {
542                return;
543            };
544            self.languages
545                .keys()
546                .filter(|&&l| l != current_lang)
547                .filter(|l| {
548                    self.languages
549                        .get(l)
550                        .and_then(|lock| lock.try_lock().ok())
551                        .is_some_and(|g| g.is_ready())
552                })
553                .min_by_key(|l| last_used.get(l).copied().unwrap_or(Instant::now()))
554                .copied()
555        };
556
557        if let Some(victim_lang) = victim {
558            info!(
559                "global LRU: evicting {victim_lang} (cap={max}, active={active}) to make room for {current_lang}"
560            );
561            if let Err(e) = self.restart_language(victim_lang).await {
562                warn!("global LRU eviction of {victim_lang} failed: {e}");
563            }
564            // Remove from last_used tracking
565            if let Ok(mut map) = self.last_used.write() {
566                map.remove(&victim_lang);
567            }
568        }
569    }
570
571    /// Shut down and clear the server for a language so it will be re-booted on next query.
572    ///
573    /// # Errors
574    /// Returns `Err` if the language is not registered in this multiplexer.
575    pub async fn restart_language(&self, lang: Language) -> anyhow::Result<()> {
576        let lock = self.language_lock(lang)?;
577        let mut guard = lock.lock().await;
578        match guard.strategy.take() {
579            Some(ServerStrategy::MultiRoot(mut slot)) => {
580                shutdown_slot(lang, None, &mut slot).await;
581            }
582            Some(ServerStrategy::LruPerRoot(slots)) => {
583                for (root, mut slot) in slots {
584                    shutdown_slot(lang, Some(&root), &mut slot).await;
585                }
586            }
587            None => {}
588        }
589        guard.failed = None;
590        guard.crash_count = 0;
591        Ok(())
592    }
593
594    /// Get the `Arc<Mutex<LanguageState>>` for a language.
595    ///
596    /// # Errors
597    /// Returns `Err` if the language has no registered slot in this multiplexer.
598    pub fn language_lock(&self, lang: Language) -> anyhow::Result<Arc<Mutex<LanguageState>>> {
599        self.languages
600            .get(&lang)
601            .cloned()
602            .ok_or_else(|| anyhow::anyhow!("no language slot for {lang}"))
603    }
604
605    /// Ensure a server for the given language is healthy or boot one.
606    /// Caller must already hold the per-language guard.
607    async fn ensure_running(
608        &self,
609        state: &mut LanguageState,
610        lang: Language,
611    ) -> anyhow::Result<()> {
612        if let Some(ref reason) = state.failed {
613            bail!("LSP {lang} permanently failed: {reason}");
614        }
615
616        // No strategy yet → first boot
617        if state.strategy.is_none() {
618            // Check global language-server cap before booting
619            self.evict_global_lru_if_needed(lang).await;
620            let root = self.initial_root(lang);
621            return self.boot_first_server(state, lang, &root).await;
622        }
623
624        // Health check
625        let action = check_health(state, lang);
626        match action {
627            HealthAction::Healthy | HealthAction::LruCleaned => Ok(()),
628            HealthAction::MultiRootCrashed { crash_count } => {
629                if crash_count >= MAX_CRASHES {
630                    let reason = format!("crashed {crash_count} times, giving up");
631                    state.failed = Some(reason.clone());
632                    tracing::error!("LSP {lang} permanently failed: {reason}");
633                    bail!("LSP {lang} permanently failed: {reason}");
634                }
635                state.crash_count = crash_count;
636                state.strategy = None;
637                let delay_secs = BACKOFF_DELAYS_SECS
638                    .get(crash_count.saturating_sub(1) as usize)
639                    .copied()
640                    .unwrap_or(60);
641                warn!("LSP {lang} crashed ({crash_count}×), restarting in {delay_secs}s");
642                tokio::time::sleep(Duration::from_secs(delay_secs)).await;
643                let root = self.initial_root(lang);
644                self.boot_first_server(state, lang, &root).await
645            }
646        }
647    }
648
649    /// Boot the first server for a language and determine the strategy.
650    async fn boot_first_server(
651        &self,
652        state: &mut LanguageState,
653        lang: Language,
654        workspace_root: &Path,
655    ) -> anyhow::Result<()> {
656        let slot = self.boot_slot(lang, workspace_root).await?;
657
658        let is_multi_root = slot.session.client.supports_workspace_folders();
659        let strategy_name = if is_multi_root { "multi-root" } else { "LRU" };
660        info!(
661            "multiplexer: {lang} ({}) using {strategy_name} strategy",
662            slot.server_name
663        );
664
665        if is_multi_root {
666            state.strategy = Some(ServerStrategy::MultiRoot(Box::new(slot)));
667        } else {
668            let mut slots = HashMap::new();
669            slots.insert(workspace_root.to_path_buf(), slot);
670            state.strategy = Some(ServerStrategy::LruPerRoot(slots));
671        }
672        Ok(())
673    }
674
675    /// Boot a new LSP session for an LRU slot.
676    async fn boot_lru_session(
677        &self,
678        state: &mut LanguageState,
679        lang: Language,
680        workspace_root: &Path,
681    ) -> anyhow::Result<()> {
682        // Evict if at cap
683        let max_sessions = self
684            .config
685            .read()
686            .map(|c| c.max_lru_sessions)
687            .unwrap_or(DEFAULT_MAX_LRU_SESSIONS);
688
689        let needs_evict = match &state.strategy {
690            Some(ServerStrategy::LruPerRoot(slots)) => slots.len() >= max_sessions,
691            _ => false,
692        };
693        if needs_evict {
694            self.evict_lru(state, lang).await?;
695        }
696
697        let slot = self.boot_slot(lang, workspace_root).await?;
698        info!(
699            "multiplexer: LRU {lang} ({}) @ {} ready",
700            slot.server_name,
701            workspace_root.display()
702        );
703
704        match &mut state.strategy {
705            Some(ServerStrategy::LruPerRoot(slots)) => {
706                slots.insert(workspace_root.to_path_buf(), slot);
707            }
708            _ => bail!("expected LRU strategy for {lang}"),
709        }
710        Ok(())
711    }
712
713    /// Boot a single LSP server slot (shared by both strategies).
714    async fn boot_slot(&self, lang: Language, workspace_root: &Path) -> anyhow::Result<ServerSlot> {
715        let (binary_path, entry) = install::ensure_server(lang).await?;
716
717        let mut client =
718            LspClient::start_with_binary(&binary_path, entry.args, lang, workspace_root)
719                .map_err(|e| anyhow::anyhow!("{e}"))?;
720
721        if let Some(store) = self.diagnostic_store.get() {
722            client.set_diagnostic_store(Arc::clone(store));
723        }
724
725        client
726            .initialize(workspace_root)
727            .await
728            .with_context(|| format!("LSP initialize failed for {lang}"))?;
729
730        let server_name = client.server_name().to_string();
731
732        let mut file_tracker = FileTracker::new(lang);
733        if let Some(warmup_file) = find_warmup_file(workspace_root, lang) {
734            if let Err(e) = file_tracker
735                .ensure_open(&warmup_file, client.transport_mut())
736                .await
737            {
738                debug!("warmup file open failed (non-fatal): {e}");
739            } else {
740                debug!("warmup: opened {}", warmup_file.display());
741                probe_until_ready(&mut client, &warmup_file).await;
742            }
743        }
744
745        let now = Instant::now();
746        Ok(ServerSlot {
747            session: LspSession {
748                client,
749                file_tracker,
750            },
751            started_at: now,
752            last_used_at: now,
753            server_name,
754        })
755    }
756
757    /// Evict the least-recently-used LRU session for a language.
758    async fn evict_lru(&self, state: &mut LanguageState, lang: Language) -> anyhow::Result<()> {
759        let priority_roots = self
760            .config
761            .read()
762            .map(|c| c.priority_roots.clone())
763            .unwrap_or_default();
764
765        let oldest_root = match &state.strategy {
766            Some(ServerStrategy::LruPerRoot(slots)) => slots
767                .iter()
768                .filter(|(root, _)| !priority_roots.contains(*root))
769                .min_by_key(|(_, s)| s.last_used_at)
770                .map(|(root, _)| root.clone()),
771            _ => None,
772        };
773
774        if oldest_root.is_none() {
775            if let Some(ServerStrategy::LruPerRoot(slots)) = &state.strategy {
776                if !slots.is_empty() {
777                    warn!(
778                        "all {} LRU sessions for {lang} are priority — exceeding cap",
779                        slots.len()
780                    );
781                }
782            }
783            return Ok(());
784        }
785
786        if let Some(root) = oldest_root {
787            if let Some(ServerStrategy::LruPerRoot(slots)) = &mut state.strategy {
788                if let Some(mut slot) = slots.remove(&root) {
789                    info!("evicting LRU session for {lang}:{}", root.display());
790                    shutdown_slot(lang, Some(&root), &mut slot).await;
791                }
792            }
793        }
794        Ok(())
795    }
796
797    /// Route to the correct session for a language + workspace root.
798    async fn route_with_root(
799        &self,
800        state: &mut LanguageState,
801        lang: Language,
802        root: &Path,
803    ) -> anyhow::Result<()> {
804        match &state.strategy {
805            Some(ServerStrategy::MultiRoot(_)) => {
806                // Attach folder if not already attached
807                let Some(ServerStrategy::MultiRoot(slot)) = &mut state.strategy else {
808                    anyhow::bail!("unexpected server strategy for {lang}")
809                };
810                if !slot.session.client.is_folder_attached(root) {
811                    slot.session.client.attach_folder(root).await?;
812                }
813                Ok(())
814            }
815            Some(ServerStrategy::LruPerRoot(_)) => {
816                let needs_boot = match &state.strategy {
817                    Some(ServerStrategy::LruPerRoot(slots)) => !slots.contains_key(root),
818                    _ => false,
819                };
820                if needs_boot {
821                    self.boot_lru_session(state, lang, root).await?;
822                }
823                // Touch last_used_at
824                if let Some(ServerStrategy::LruPerRoot(slots)) = &mut state.strategy {
825                    if let Some(slot) = slots.get_mut(root) {
826                        slot.last_used_at = Instant::now();
827                    }
828                }
829                Ok(())
830            }
831            None => bail!("no server for {lang}"),
832        }
833    }
834
835    /// Attach all workspace folders to a multi-root server.
836    async fn attach_all_workspaces_inner(
837        &self,
838        state: &mut LanguageState,
839        lang: Language,
840    ) -> anyhow::Result<()> {
841        // Only meaningful for MultiRoot
842        if !matches!(state.strategy, Some(ServerStrategy::MultiRoot(_))) {
843            return Ok(());
844        }
845
846        let roots: Vec<PathBuf> = self
847            .workspace_roots
848            .iter()
849            .filter(|(l, _)| *l == lang)
850            .map(|(_, r)| r.clone())
851            .collect();
852
853        if let Some(ServerStrategy::MultiRoot(slot)) = &mut state.strategy {
854            for root in &roots {
855                if !slot.session.client.is_folder_attached(root) {
856                    slot.session.client.attach_folder(root).await?;
857                }
858            }
859        }
860        Ok(())
861    }
862
863    /// Check if a language uses LRU strategy with an empty pool.
864    fn is_lru_empty(state: &LanguageState) -> bool {
865        matches!(
866            &state.strategy,
867            Some(ServerStrategy::LruPerRoot(slots)) if slots.is_empty()
868        )
869    }
870
871    /// Get the first workspace root for a language, or fall back to project root.
872    fn initial_root(&self, lang: Language) -> PathBuf {
873        self.workspace_roots
874            .iter()
875            .find(|(l, _)| *l == lang)
876            .map_or_else(|| self.project_root.clone(), |(_, r)| r.clone())
877    }
878}
879
/// Result of checking strategy health.
///
/// Produced by `check_health` for a single language.
enum HealthAction {
    /// Nothing to do: the multi-root server's transport is alive, or the
    /// language has no strategy yet.
    Healthy,
    /// The multi-root server's transport is no longer alive. `crash_count` is
    /// the language's crash counter after this crash has been counted.
    MultiRootCrashed { crash_count: u32 },
    /// The language uses per-root LRU sessions; sessions whose transport was
    /// dead (if any) have been removed from the pool.
    LruCleaned,
}
886
887/// Check the health of a language state (does not hold outer pool lock).
888fn check_health(state: &mut LanguageState, lang: Language) -> HealthAction {
889    match &mut state.strategy {
890        Some(ServerStrategy::MultiRoot(slot)) => {
891            if slot.session.client.transport_mut().is_alive() {
892                if slot.started_at.elapsed().as_secs() >= STABILITY_RESET_SECS {
893                    state.crash_count = 0;
894                }
895                HealthAction::Healthy
896            } else {
897                state.crash_count += 1;
898                let crash_count = state.crash_count;
899                warn!("LSP {lang} crashed (count: {crash_count})");
900                HealthAction::MultiRootCrashed { crash_count }
901            }
902        }
903        Some(ServerStrategy::LruPerRoot(slots)) => {
904            let mut dead = Vec::new();
905            for (root, slot) in slots.iter_mut() {
906                if !slot.session.client.transport_mut().is_alive() {
907                    dead.push(root.clone());
908                }
909            }
910            for r in &dead {
911                warn!("LRU session for {lang}:{} crashed, removed", r.display());
912                slots.remove(r);
913            }
914            HealthAction::LruCleaned
915        }
916        None => HealthAction::Healthy,
917    }
918}
919
920/// Build a pending `ServerStatus` for a language.
921fn pending_status(lang: Language, total_folders: usize) -> ServerStatus {
922    ServerStatus {
923        server_name: default_server_name(lang),
924        language: lang.name().to_string(),
925        status: "pending",
926        uptime_secs: 0,
927        open_files: 0,
928        attached_folders: 0,
929        total_folders,
930    }
931}
932
933/// Build a `ServerStatus` from a `LanguageState` guard.
934fn slot_status(lang: Language, state: &LanguageState, total_folders: usize) -> ServerStatus {
935    match &state.strategy {
936        Some(ServerStrategy::MultiRoot(slot)) => ServerStatus {
937            server_name: slot.server_name.clone(),
938            language: lang.name().to_string(),
939            status: "ready",
940            uptime_secs: slot.started_at.elapsed().as_secs(),
941            open_files: slot.session.file_tracker.open_count(),
942            attached_folders: slot.session.client.attached_folders().len(),
943            total_folders,
944        },
945        Some(ServerStrategy::LruPerRoot(slots)) if !slots.is_empty() => {
946            let total_files: usize = slots
947                .values()
948                .map(|s| s.session.file_tracker.open_count())
949                .sum();
950            let oldest = slots
951                .values()
952                .map(|s| s.started_at)
953                .min()
954                .unwrap_or_else(Instant::now);
955            let name = slots
956                .values()
957                .next()
958                .map_or_else(|| default_server_name(lang), |s| s.server_name.clone());
959            ServerStatus {
960                server_name: name,
961                language: lang.name().to_string(),
962                status: "ready",
963                uptime_secs: oldest.elapsed().as_secs(),
964                open_files: total_files,
965                attached_folders: slots.len(),
966                total_folders,
967            }
968        }
969        _ => {
970            let status = if state.failed.is_some() {
971                "failed"
972            } else {
973                "pending"
974            };
975            ServerStatus {
976                server_name: default_server_name(lang),
977                language: lang.name().to_string(),
978                status,
979                uptime_secs: 0,
980                open_files: 0,
981                attached_folders: 0,
982                total_folders,
983            }
984        }
985    }
986}
987
988/// Gracefully shut down a server slot.
989async fn shutdown_slot(lang: Language, root: Option<&Path>, slot: &mut ServerSlot) {
990    let _ = slot
991        .session
992        .file_tracker
993        .close_all(slot.session.client.transport_mut())
994        .await;
995    let label = root.map_or_else(String::new, |r| format!(":{}", r.display()));
996    if let Err(e) = slot.session.client.shutdown().await {
997        warn!("LSP shutdown error for {lang}{label}: {e}");
998    }
999}
1000
1001/// Get the default server binary name for a language.
1002fn default_server_name(lang: Language) -> String {
1003    use super::registry::get_entry;
1004    get_entry(lang).map_or_else(|| lang.name().to_string(), |e| e.binary_name.to_string())
1005}
1006
1007/// Probe the LSP server with documentSymbol until it responds or max attempts reached.
1008///
1009/// Uses `wait_for_response_with_timeout` so every response is consumed internally —
1010/// no orphaned responses in the transport pipe. Budget: 5 × (2s + 500ms) ≈ 12.5s max.
1011async fn probe_until_ready(client: &mut LspClient, warmup_file: &std::path::Path) {
1012    use super::client::path_to_uri;
1013
1014    const PROBE_TIMEOUT: Duration = Duration::from_secs(2);
1015    const RETRY_DELAY: Duration = Duration::from_millis(500);
1016    const MAX_ATTEMPTS: u8 = 5;
1017
1018    let uri = match path_to_uri(warmup_file) {
1019        Ok(u) => u,
1020        Err(e) => {
1021            debug!("probe_until_ready: could not get URI: {e}");
1022            return;
1023        }
1024    };
1025
1026    for attempt in 0..MAX_ATTEMPTS {
1027        let probe = client
1028            .transport_mut()
1029            .send_request(
1030                "textDocument/documentSymbol",
1031                serde_json::json!({ "textDocument": { "uri": uri.as_str() } }),
1032            )
1033            .await;
1034        if let Ok(req_id) = probe {
1035            match client
1036                .wait_for_response_with_timeout(req_id, PROBE_TIMEOUT)
1037                .await
1038            {
1039                Ok(resp) if resp != serde_json::Value::Null => {
1040                    debug!("probe_until_ready: ready after {} attempts", attempt + 1);
1041                    return;
1042                }
1043                Ok(_) => {
1044                    debug!(
1045                        "probe_until_ready: null response on attempt {}",
1046                        attempt + 1
1047                    );
1048                }
1049                Err(e) => {
1050                    debug!("probe_until_ready: attempt {} failed: {e}", attempt + 1);
1051                }
1052            }
1053        }
1054        tokio::time::sleep(RETRY_DELAY).await;
1055    }
1056    debug!("probe_until_ready: giving up after {MAX_ATTEMPTS} attempts");
1057}
1058
1059/// Find a single representative source file to open for warmup.
1060fn find_warmup_file(workspace_root: &Path, lang: Language) -> Option<PathBuf> {
1061    let extensions = lang.extensions();
1062    let search_dirs = [
1063        workspace_root.join("src"),
1064        workspace_root.join("lib"),
1065        workspace_root.to_path_buf(),
1066    ];
1067
1068    for dir in &search_dirs {
1069        if let Some(f) = find_first_source_file(dir, extensions) {
1070            return Some(f);
1071        }
1072    }
1073    None
1074}
1075
/// Return the first regular file directly inside `dir` whose extension is
/// listed in `extensions` (compared exactly, without the leading dot).
///
/// Yields `None` when `dir` is not a directory, cannot be read, or holds no
/// matching file. Subdirectories are not descended into, and entries that
/// fail to read are skipped. "First" follows the order `read_dir` yields.
fn find_first_source_file(dir: &Path, extensions: &[&str]) -> Option<PathBuf> {
    if !dir.is_dir() {
        return None;
    }

    std::fs::read_dir(dir)
        .ok()?
        .filter_map(Result::ok)
        .map(|entry| entry.path())
        .find(|candidate| {
            candidate.is_file()
                && matches!(
                    candidate.extension().and_then(|e| e.to_str()),
                    Some(ext) if extensions.contains(&ext)
                )
        })
}
1095
// Unit tests for the multiplexer and the warmup-file helpers.
//
// The plain `#[test]` cases never start a language server: they only inspect
// a freshly constructed `LspMultiplexer` or use temporary directories. The
// `#[tokio::test]` cases at the bottom are `#[ignore]`d because they need a
// real `rust-analyzer` binary.
#[cfg(test)]
mod tests {
    use super::*;

    // A multiplexer built with no workspace roots reports nothing.
    #[test]
    fn multiplexer_starts_empty() {
        let mux = LspMultiplexer::new(PathBuf::from("/tmp"), vec![]);
        assert!(mux.active_languages().is_empty());
        assert!(mux.status().is_empty());
    }

    // Pins the user-facing text of each `NotReadyReason` variant.
    #[test]
    fn not_ready_reason_display() {
        assert_eq!(
            NotReadyReason::NotStarted.to_string(),
            "server not yet started"
        );
        assert_eq!(
            NotReadyReason::Failed("crashed".to_string()).to_string(),
            "server failed: crashed"
        );
        assert_eq!(
            NotReadyReason::NotFound.to_string(),
            "no server configured for this scope"
        );
    }

    // Two TypeScript roots + one Rust root count as two languages, none ready.
    #[test]
    fn readiness_tracks_unique_languages() {
        let roots = vec![
            (Language::TypeScript, PathBuf::from("/project/packages/api")),
            (Language::TypeScript, PathBuf::from("/project/packages/web")),
            (Language::Rust, PathBuf::from("/project")),
        ];
        let mux = LspMultiplexer::new(PathBuf::from("/project"), roots);
        let r = mux.readiness();
        assert_eq!(r.ready, 0);
        assert_eq!(r.total, 2);
        assert!(!r.is_all_ready());
    }

    // Several roots of the same language yield a single language entry.
    #[test]
    fn unique_languages_deduplicates() {
        let roots = vec![
            (Language::TypeScript, PathBuf::from("/project/packages/api")),
            (Language::TypeScript, PathBuf::from("/project/packages/web")),
            (Language::Rust, PathBuf::from("/project")),
        ];
        let mux = LspMultiplexer::new(PathBuf::from("/project"), roots);
        let langs = mux.unique_languages();
        assert_eq!(langs.len(), 2);
    }

    // Before any server is started, status is "pending" with the configured
    // folder total and zero attached folders.
    #[test]
    fn status_shows_pending_with_folder_counts() {
        let roots = vec![
            (Language::TypeScript, PathBuf::from("/project/packages/api")),
            (Language::TypeScript, PathBuf::from("/project/packages/web")),
        ];
        let mux = LspMultiplexer::new(PathBuf::from("/project"), roots);
        let statuses = mux.status();
        assert_eq!(statuses.len(), 1);
        assert_eq!(statuses[0].status, "pending");
        assert_eq!(statuses[0].total_folders, 2);
        assert_eq!(statuses[0].attached_folders, 0);
    }

    // With nested roots, the deepest root containing the file is chosen.
    #[test]
    fn find_nearest_workspace_picks_deepest() {
        let roots = vec![
            (Language::TypeScript, PathBuf::from("/project")),
            (Language::TypeScript, PathBuf::from("/project/packages/api")),
        ];
        let mux = LspMultiplexer::new(PathBuf::from("/project"), roots);
        let result = mux.find_nearest_workspace(
            Path::new("/project/packages/api/src/main.ts"),
            Language::TypeScript,
        );
        assert_eq!(result, Some(PathBuf::from("/project/packages/api")));
    }

    // Roots registered for another language are never returned.
    #[test]
    fn find_nearest_workspace_returns_none_for_wrong_lang() {
        let roots = vec![(Language::Rust, PathBuf::from("/project"))];
        let mux = LspMultiplexer::new(PathBuf::from("/project"), roots);
        let result =
            mux.find_nearest_workspace(Path::new("/project/src/index.ts"), Language::TypeScript);
        assert!(result.is_none());
    }

    // The first root listed for the language wins.
    #[test]
    fn initial_root_picks_first_for_language() {
        let roots = vec![
            (Language::TypeScript, PathBuf::from("/project/packages/api")),
            (Language::TypeScript, PathBuf::from("/project/packages/web")),
        ];
        let mux = LspMultiplexer::new(PathBuf::from("/project"), roots);
        assert_eq!(
            mux.initial_root(Language::TypeScript),
            PathBuf::from("/project/packages/api")
        );
    }

    // A language without any root falls back to the project root.
    #[test]
    fn initial_root_falls_back_to_project_root() {
        let roots = vec![(Language::Rust, PathBuf::from("/project"))];
        let mux = LspMultiplexer::new(PathBuf::from("/project"), roots);
        assert_eq!(mux.initial_root(Language::Go), PathBuf::from("/project"));
    }

    // A configured but never-started language is not ready.
    #[test]
    fn is_ready_for_unbooted() {
        let roots = vec![(Language::Rust, PathBuf::from("/project"))];
        let mux = LspMultiplexer::new(PathBuf::from("/project"), roots);
        assert!(!mux.is_ready(Language::Rust));
    }

    // `src/` is searched before the workspace root itself, so `src/main.rs`
    // is preferred over a top-level `build.rs`.
    #[test]
    fn find_warmup_file_prefers_src() {
        let dir = tempfile::tempdir().unwrap();
        let src = dir.path().join("src");
        std::fs::create_dir(&src).unwrap();
        std::fs::write(src.join("main.rs"), "fn main() {}").unwrap();
        std::fs::write(dir.path().join("build.rs"), "fn main() {}").unwrap();

        let result = find_warmup_file(dir.path(), Language::Rust);
        assert!(result.is_some());
        assert!(result.unwrap().starts_with(&src));
    }

    // A `.ts` file is found for TypeScript.
    #[test]
    fn find_warmup_file_finds_ts() {
        let dir = tempfile::tempdir().unwrap();
        let src = dir.path().join("src");
        std::fs::create_dir(&src).unwrap();
        std::fs::write(src.join("index.ts"), "export {}").unwrap();

        let result = find_warmup_file(dir.path(), Language::TypeScript);
        assert!(result.is_some());
        assert_eq!(result.unwrap().extension().unwrap(), "ts");
    }

    // An empty workspace has no warmup file.
    #[test]
    fn find_warmup_file_returns_none_for_empty() {
        let dir = tempfile::tempdir().unwrap();
        let result = find_warmup_file(dir.path(), Language::Go);
        assert!(result.is_none());
    }

    // The LRU session cap set through the setter is visible in the config.
    #[test]
    fn set_max_lru_sessions() {
        let mux = LspMultiplexer::new(PathBuf::from("/project"), vec![]);
        mux.set_max_lru_sessions(5);
        assert_eq!(mux.config.read().unwrap().max_lru_sessions, 5);
    }

    // Priority roots start empty and round-trip through the setter/getter.
    #[test]
    fn set_and_get_priority_roots() {
        let mux = LspMultiplexer::new(PathBuf::from("/project"), vec![]);
        assert!(mux.priority_roots().is_empty());

        let roots: HashSet<PathBuf> = [
            PathBuf::from("/project/packages/core"),
            PathBuf::from("/project/packages/api"),
        ]
        .into();
        mux.set_priority_roots(roots);
        assert_eq!(mux.priority_roots().len(), 2);
        assert!(mux
            .priority_roots()
            .contains(&PathBuf::from("/project/packages/core")));
    }

    // ---- Integration tests below start a real rust-analyzer process. ----

    // Requesting a language starts its server and marks it active.
    #[tokio::test]
    #[ignore = "requires rust-analyzer installed"]
    async fn multiplexer_starts_lsp_on_demand() {
        let fixture = PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("tests/fixtures/rust-hello");
        let roots = vec![(Language::Rust, fixture.clone())];
        let mux = LspMultiplexer::new(fixture.clone(), roots);

        let guard = mux.get_or_start(Language::Rust).await;
        assert!(guard.is_ok());
        assert_eq!(mux.active_languages(), vec![Language::Rust]);

        mux.shutdown_all().await;
    }

    // A second request for the same language does not add another active entry.
    #[tokio::test]
    #[ignore = "requires rust-analyzer installed"]
    async fn multiplexer_reuses_existing_client() {
        let fixture = PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("tests/fixtures/rust-hello");
        let roots = vec![(Language::Rust, fixture.clone())];
        let mux = LspMultiplexer::new(fixture.clone(), roots);

        mux.get_or_start(Language::Rust).await.unwrap();
        assert_eq!(mux.active_languages().len(), 1);

        mux.get_or_start(Language::Rust).await.unwrap();
        assert_eq!(mux.active_languages().len(), 1);

        mux.shutdown_all().await;
    }

    // After `shutdown_all` no language is reported as active.
    #[tokio::test]
    #[ignore = "requires rust-analyzer installed"]
    async fn multiplexer_shutdown_all() {
        let fixture = PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("tests/fixtures/rust-hello");
        let roots = vec![(Language::Rust, fixture.clone())];
        let mux = LspMultiplexer::new(fixture.clone(), roots);

        mux.get_or_start(Language::Rust).await.unwrap();
        assert_eq!(mux.active_languages().len(), 1);

        mux.shutdown_all().await;
        assert!(mux.active_languages().is_empty());
    }

    // A started server shows up in `status()` as "ready" with its name,
    // language and folder total.
    #[tokio::test]
    #[ignore = "requires rust-analyzer installed"]
    async fn multiplexer_status_shows_info() {
        let fixture = PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("tests/fixtures/rust-hello");
        let roots = vec![(Language::Rust, fixture.clone())];
        let mux = LspMultiplexer::new(fixture.clone(), roots);

        mux.get_or_start(Language::Rust).await.unwrap();

        let statuses = mux.status();
        assert_eq!(statuses.len(), 1);
        assert_eq!(statuses[0].language, "rust");
        assert_eq!(statuses[0].server_name, "rust-analyzer");
        assert_eq!(statuses[0].status, "ready");
        assert_eq!(statuses[0].total_folders, 1);

        mux.shutdown_all().await;
    }
}