Skip to main content

aft/lsp/
manager.rs

1use std::collections::{HashMap, HashSet};
2use std::path::{Path, PathBuf};
3
4use crossbeam_channel::{unbounded, Receiver, RecvTimeoutError, Sender};
5use lsp_types::notification::{
6    DidChangeTextDocument, DidChangeWatchedFiles, DidCloseTextDocument, DidOpenTextDocument,
7};
8use lsp_types::{
9    DidChangeTextDocumentParams, DidChangeWatchedFilesParams, DidCloseTextDocumentParams,
10    DidOpenTextDocumentParams, FileChangeType, FileEvent, TextDocumentContentChangeEvent,
11    TextDocumentIdentifier, TextDocumentItem, VersionedTextDocumentIdentifier,
12};
13
14use crate::config::Config;
15use crate::lsp::child_registry::LspChildRegistry;
16use crate::lsp::client::{LspClient, LspEvent, ServerState};
17use crate::lsp::diagnostics::{
18    from_lsp_diagnostics, DiagnosticEntry, DiagnosticsStore, StoredDiagnostic,
19};
20use crate::lsp::document::DocumentStore;
21use crate::lsp::position::{uri_for_path, uri_to_path};
22use crate::lsp::registry::{resolve_lsp_binary, servers_for_file, ServerDef, ServerKind};
23use crate::lsp::roots::{find_workspace_root, ServerKey};
24use crate::lsp::LspError;
25use crate::slog_error;
26
/// Outcome of attempting to ensure a server is running for a single matching
/// `ServerDef`.
///
/// One value is produced per matching server so the caller can report exactly
/// what happened to the user instead of collapsing all failures into "no
/// server".
#[derive(Debug, Clone)]
pub enum ServerAttemptResult {
    /// Server is running and ready to serve requests for this file.
    /// `server_key` identifies the (kind, workspace root) instance.
    Ok { server_key: ServerKey },
    /// No workspace root was found by walking up from the file looking for
    /// any of the server's configured root markers. `looked_for` lists the
    /// markers that were searched for.
    NoRootMarker { looked_for: Vec<String> },
    /// The server's binary could not be found on PATH (or override was
    /// missing/invalid).
    BinaryNotInstalled { binary: String },
    /// Binary was found but spawning or initializing the server failed.
    /// `reason` carries the failure description.
    SpawnFailed { binary: String, reason: String },
}
44
/// Record of one registered server's attempt to handle a file: which server
/// it was and how the attempt ended.
#[derive(Debug, Clone)]
pub struct ServerAttempt {
    /// Stable server identifier (kind ID, e.g. "pyright", "rust-analyzer").
    pub server_id: String,
    /// Server display name from the registry.
    pub server_name: String,
    /// What happened when we tried to make this server available.
    pub result: ServerAttemptResult,
}
54
55/// Aggregate outcome of `ensure_server_for_file_detailed`. Distinguishes:
56/// - "No server registered for this file's extension" (`attempts.is_empty()`)
57/// - "Servers registered but none could start" (`successful.is_empty()` but
58///   `!attempts.is_empty()`)
59/// - "At least one server is ready" (`!successful.is_empty()`)
60#[derive(Debug, Clone, Default)]
61pub struct EnsureServerOutcomes {
62    /// Server keys that are now running and ready to serve requests.
63    pub successful: Vec<ServerKey>,
64    /// Per-server attempt records. Empty if no server is registered for the
65    /// file's extension.
66    pub attempts: Vec<ServerAttempt>,
67}
68
69impl EnsureServerOutcomes {
70    /// True if no server in the registry matched this file's extension.
71    pub fn no_server_registered(&self) -> bool {
72        self.attempts.is_empty()
73    }
74}
75
/// Outcome of a post-edit diagnostics wait. Reports the per-server status
/// alongside the fresh diagnostics, so the response layer can build an
/// honest tri-state payload (`success: true` + `complete: bool` + named
/// gap fields per `crates/aft/src/protocol.rs`).
///
/// `diagnostics` only contains entries from servers that proved freshness
/// (version-match preferred, epoch-fallback for unversioned servers).
/// Pre-edit cached entries are NEVER included — that's the whole point of
/// this type.
///
/// NOTE(review): `post_edit_entry_is_fresh` in this file rejects unversioned
/// publishes outright, so the "epoch-fallback" wording above may be outdated —
/// confirm against the waiter implementation.
#[derive(Debug, Clone, Default)]
pub struct PostEditWaitOutcome {
    /// Diagnostics from servers whose response we verified is FOR the
    /// post-edit document version (or whose epoch we saw advance after our
    /// pre-edit snapshot, for unversioned servers).
    pub diagnostics: Vec<StoredDiagnostic>,
    /// Servers we expected to publish but didn't before the deadline.
    /// Reported to the agent via `pending_lsp_servers` so they understand
    /// the result is partial.
    pub pending_servers: Vec<ServerKey>,
    /// Servers whose process exited between notification and deadline.
    /// Reported separately so the agent knows the gap is unrecoverable
    /// without a server restart, not "wait longer."
    pub exited_servers: Vec<ServerKey>,
}
100
/// Pre-edit freshness snapshot for one server/file pair.
///
/// The `Default` value (epoch `0`, no document version) stands for a server
/// for which nothing was captured before the edit.
#[derive(Debug, Clone, Copy, Default)]
pub struct PreEditSnapshot {
    /// Diagnostic epoch of the stored entry at capture time.
    pub epoch: u64,
    /// Version of the open document at capture time, or `None` when no
    /// version was recorded for it.
    pub document_version_at_capture: Option<i32>,
}
107
108pub fn post_edit_entry_is_fresh(
109    entry: &DiagnosticEntry,
110    target_version: i32,
111    pre: PreEditSnapshot,
112) -> bool {
113    if entry.epoch <= pre.epoch {
114        return false;
115    }
116
117    match entry.version {
118        Some(version) => version >= target_version,
119        // Unversioned publishDiagnostics payloads cannot prove which document
120        // state they describe. Epoch advancement only proves arrival order; an
121        // old analysis result can still arrive after our pre-snapshot. Treat as
122        // pending/partial rather than fresh.
123        None => false,
124    }
125}
126
127impl PostEditWaitOutcome {
128    /// True if every expected server reported a fresh result. False means
129    /// the agent should treat the diagnostics as a partial picture.
130    pub fn complete(&self) -> bool {
131        self.pending_servers.is_empty() && self.exited_servers.is_empty()
132    }
133}
134
/// Per-server outcome of a `textDocument/diagnostic` (per-file pull) request.
#[derive(Debug, Clone)]
pub enum PullFileOutcome {
    /// Server returned a full report; diagnostics stored.
    /// `diagnostic_count` is the number of diagnostics in that report.
    Full { diagnostic_count: usize },
    /// Server returned `kind: "unchanged"` — cached diagnostics still valid.
    Unchanged,
    /// Server returned a partial-result token; we don't subscribe to streamed
    /// progress so the response is treated as a soft empty until the next pull.
    PartialNotSupported,
    /// Server doesn't advertise pull capability — caller should fall back to
    /// push diagnostics for this server.
    PullNotSupported,
    /// The pull request failed (timeout, server error, etc.); `reason`
    /// describes the failure.
    RequestFailed { reason: String },
}
151
/// Result of `pull_file_diagnostics` for one matching server.
#[derive(Debug, Clone)]
pub struct PullFileResult {
    /// Server instance the pull was issued against.
    pub server_key: ServerKey,
    /// What that server answered (or why the pull did not produce a report).
    pub outcome: PullFileOutcome,
}
158
/// Result of `pull_workspace_diagnostics` for a single server.
#[derive(Debug, Clone)]
pub struct PullWorkspaceResult {
    /// Server instance the workspace pull was issued against.
    pub server_key: ServerKey,
    /// Files for which a Full report was received and cached. Files that came
    /// back as `Unchanged` are NOT listed here because their cached entry was
    /// already authoritative.
    pub files_reported: Vec<PathBuf>,
    /// True if the server returned a full response within the timeout.
    pub complete: bool,
    /// True if we cancelled (request timed out before the server responded).
    pub cancelled: bool,
    /// True if the server advertised workspace pull support. When false, the
    /// other fields are empty and the caller should fall back to file-mode
    /// pull or to push semantics.
    pub supports_workspace: bool,
}
176
/// Owns every running LSP server for this process together with the state
/// needed to talk to them: per-server document tracking, the shared
/// diagnostics store, the unified event channel, and the spawn-failure cache.
pub struct LspManager {
    /// Active server instances, keyed by (ServerKind, workspace_root).
    clients: HashMap<ServerKey, LspClient>,
    /// Tracks opened documents and versions per active server.
    documents: HashMap<ServerKey, DocumentStore>,
    /// Stored publishDiagnostics payloads across all servers.
    diagnostics: DiagnosticsStore,
    /// Unified event channel — all server reader threads send here.
    event_tx: Sender<LspEvent>,
    /// Receiving half of the unified event channel.
    event_rx: Receiver<LspEvent>,
    /// Optional binary path overrides used by integration tests.
    binary_overrides: HashMap<ServerKind, PathBuf>,
    /// Extra env vars merged into every spawned LSP child. Used in tests to
    /// drive the fake server's behavioral variants (`AFT_FAKE_LSP_PULL=1`,
    /// `AFT_FAKE_LSP_WORKSPACE=1`, etc.). Production code does not set this.
    extra_env: HashMap<String, String>,
    /// Per-(kind,root) cache of spawn failures. Once a server fails to spawn
    /// for a workspace root, we remember why and skip subsequent attempts for
    /// the lifetime of this AFT process. Without this, every file open or
    /// didChange retries `spawn_server` and logs a fresh ERROR — visible as
    /// repeated `failed to spawn TypeScript Language Server: Could not find a
    /// valid TypeScript installation` lines per edit.
    ///
    /// Entries are NEVER evicted automatically. The expected recovery path is
    /// for the user to fix their environment (install the missing binary or
    /// add a `tsconfig.json` / `package.json` with the right dependency) and
    /// restart OpenCode/Pi, which spawns a fresh `aft` process with an empty
    /// cache. We deliberately don't auto-retry on file events: the failure
    /// modes we track here (binary not installed, init handshake failure)
    /// don't fix themselves at runtime.
    failed_spawns: HashMap<ServerKey, ServerAttemptResult>,
    /// Server/root pairs for which we already logged that watched-file
    /// notifications are skipped because the capability is absent.
    watched_file_skip_logged: HashSet<ServerKey>,
    /// Tracks PIDs of spawned LSP child processes so the signal handler can
    /// kill them on SIGTERM/SIGINT before aft exits, preventing orphans.
    /// Defaults to empty; production wires this from `AppContext`.
    child_registry: LspChildRegistry,
}
216
217impl LspManager {
218    pub fn new() -> Self {
219        let (event_tx, event_rx) = unbounded();
220        Self {
221            clients: HashMap::new(),
222            documents: HashMap::new(),
223            diagnostics: DiagnosticsStore::new(),
224            event_tx,
225            event_rx,
226            binary_overrides: HashMap::new(),
227            extra_env: HashMap::new(),
228            failed_spawns: HashMap::new(),
229            watched_file_skip_logged: HashSet::new(),
230            child_registry: LspChildRegistry::new(),
231        }
232    }
233
234    /// Set the child-PID registry. Must be called before any servers spawn.
235    pub fn set_child_registry(&mut self, registry: LspChildRegistry) {
236        self.child_registry = registry;
237    }
238
239    /// For testing: set an extra environment variable that gets passed to
240    /// every spawned LSP child process. Useful for driving fake-server
241    /// behavioral variants in integration tests.
242    pub fn set_extra_env(&mut self, key: &str, value: &str) {
243        self.extra_env.insert(key.to_string(), value.to_string());
244    }
245
246    /// Count active LSP server instances.
247    pub fn server_count(&self) -> usize {
248        self.clients.len()
249    }
250
251    /// For testing: override the binary for a server kind.
252    pub fn override_binary(&mut self, kind: ServerKind, binary_path: PathBuf) {
253        self.binary_overrides.insert(kind, binary_path);
254    }
255
256    /// Ensure a server is running for the given file. Spawns if needed.
257    /// Returns the active server keys for the file, or an empty vec if none match.
258    ///
259    /// This is the lightweight wrapper around [`ensure_server_for_file_detailed`]
260    /// that drops failure context. Prefer the detailed variant in command
261    /// handlers that need to surface honest error messages to the agent.
262    pub fn ensure_server_for_file(&mut self, file_path: &Path, config: &Config) -> Vec<ServerKey> {
263        self.ensure_server_for_file_detailed(file_path, config)
264            .successful
265    }
266
267    /// Detailed version of [`ensure_server_for_file`] that records every
268    /// matching server's outcome (`Ok` / `NoRootMarker` / `BinaryNotInstalled`
269    /// / `SpawnFailed`).
270    ///
271    /// Use this when the caller wants to honestly report _why_ a file has no
272    /// active server (e.g., to surface "bash-language-server not on PATH" to
273    /// the agent instead of silently returning `total: 0`).
274    pub fn ensure_server_for_file_detailed(
275        &mut self,
276        file_path: &Path,
277        config: &Config,
278    ) -> EnsureServerOutcomes {
279        let defs = servers_for_file(file_path, config);
280        let mut outcomes = EnsureServerOutcomes::default();
281
282        for def in defs {
283            let server_id = def.kind.id_str().to_string();
284            let server_name = def.name.to_string();
285
286            let Some(root) = find_workspace_root(file_path, &def.root_markers) else {
287                outcomes.attempts.push(ServerAttempt {
288                    server_id,
289                    server_name,
290                    result: ServerAttemptResult::NoRootMarker {
291                        looked_for: def.root_markers.iter().map(|s| s.to_string()).collect(),
292                    },
293                });
294                continue;
295            };
296
297            let key = ServerKey {
298                kind: def.kind.clone(),
299                root,
300            };
301
302            if !self.clients.contains_key(&key) {
303                // If we already tried and failed to spawn this server for this
304                // root, return the cached classification without retrying or
305                // re-logging. This prevents per-edit ERROR spam when the user's
306                // environment is missing a dependency the LSP needs (the
307                // typescript-language-server "Could not find a valid TypeScript
308                // installation" case is the canonical example).
309                if let Some(cached) = self.failed_spawns.get(&key) {
310                    outcomes.attempts.push(ServerAttempt {
311                        server_id,
312                        server_name,
313                        result: cached.clone(),
314                    });
315                    continue;
316                }
317
318                match self.spawn_server(&def, &key.root, config) {
319                    Ok(client) => {
320                        self.clients.insert(key.clone(), client);
321                        self.documents.entry(key.clone()).or_default();
322                    }
323                    Err(err) => {
324                        slog_error!("failed to spawn {}: {}", def.name, err);
325                        let result = classify_spawn_error(&def.binary, &err);
326                        // Remember the failure so subsequent file events skip
327                        // this (kind, root) pair instead of producing a fresh
328                        // spawn attempt + ERROR log per request.
329                        self.failed_spawns.insert(key.clone(), result.clone());
330                        outcomes.attempts.push(ServerAttempt {
331                            server_id,
332                            server_name,
333                            result,
334                        });
335                        continue;
336                    }
337                }
338            }
339
340            outcomes.attempts.push(ServerAttempt {
341                server_id,
342                server_name,
343                result: ServerAttemptResult::Ok {
344                    server_key: key.clone(),
345                },
346            });
347            outcomes.successful.push(key);
348        }
349
350        outcomes
351    }
352
353    /// Ensure a server is running using the default LSP registry.
354    /// Kept for integration tests that exercise built-in server helpers directly.
355    pub fn ensure_server_for_file_default(&mut self, file_path: &Path) -> Vec<ServerKey> {
356        self.ensure_server_for_file(file_path, &Config::default())
357    }
358    /// Ensure that servers are running for the file and that the document is open
359    /// in each server's DocumentStore. Reads file content from disk if not already open.
360    /// Returns the server keys for the file.
361    pub fn ensure_file_open(
362        &mut self,
363        file_path: &Path,
364        config: &Config,
365    ) -> Result<Vec<ServerKey>, LspError> {
366        let canonical_path = canonicalize_for_lsp(file_path)?;
367        let server_keys = self.ensure_server_for_file(&canonical_path, config);
368        if server_keys.is_empty() {
369            return Ok(server_keys);
370        }
371
372        let uri = uri_for_path(&canonical_path)?;
373        let language_id = language_id_for_extension(
374            canonical_path
375                .extension()
376                .and_then(|ext| ext.to_str())
377                .unwrap_or_default(),
378        )
379        .to_string();
380
381        for key in &server_keys {
382            let already_open = self
383                .documents
384                .get(key)
385                .is_some_and(|store| store.is_open(&canonical_path));
386
387            if !already_open {
388                let content = std::fs::read_to_string(&canonical_path).map_err(LspError::Io)?;
389                if let Some(client) = self.clients.get_mut(key) {
390                    client.send_notification::<DidOpenTextDocument>(DidOpenTextDocumentParams {
391                        text_document: TextDocumentItem::new(
392                            uri.clone(),
393                            language_id.clone(),
394                            0,
395                            content,
396                        ),
397                    })?;
398                }
399                self.documents
400                    .entry(key.clone())
401                    .or_default()
402                    .open(canonical_path.clone());
403                continue;
404            }
405
406            // Document is already open. Check disk drift — if the file has
407            // been modified outside the AFT pipeline (other tool, manual
408            // edit, sibling session) we MUST send a didChange before any
409            // pull-diagnostic / hover query, otherwise the LSP server
410            // returns results computed from stale in-memory content.
411            //
412            // This is the regression fix Oracle flagged in finding #6:
413            // "ensure_file_open skips already-open files without checking
414            // if disk content changed."
415            let drifted = self
416                .documents
417                .get(key)
418                .is_some_and(|store| store.is_stale_on_disk(&canonical_path));
419            if drifted {
420                let content = std::fs::read_to_string(&canonical_path).map_err(LspError::Io)?;
421                let next_version = self
422                    .documents
423                    .get(key)
424                    .and_then(|store| store.version(&canonical_path))
425                    .map(|v| v + 1)
426                    .unwrap_or(1);
427                if let Some(client) = self.clients.get_mut(key) {
428                    client.send_notification::<DidChangeTextDocument>(
429                        DidChangeTextDocumentParams {
430                            text_document: VersionedTextDocumentIdentifier::new(
431                                uri.clone(),
432                                next_version,
433                            ),
434                            content_changes: vec![TextDocumentContentChangeEvent {
435                                range: None,
436                                range_length: None,
437                                text: content,
438                            }],
439                        },
440                    )?;
441                }
442                if let Some(store) = self.documents.get_mut(key) {
443                    store.bump_version(&canonical_path);
444                }
445            }
446        }
447
448        Ok(server_keys)
449    }
450
451    pub fn ensure_file_open_default(
452        &mut self,
453        file_path: &Path,
454    ) -> Result<Vec<ServerKey>, LspError> {
455        self.ensure_file_open(file_path, &Config::default())
456    }
457
458    /// Notify relevant LSP servers that a file has been written/changed.
459    /// This is the main hook called after every file write in AFT.
460    ///
461    /// If the file's server isn't running yet, starts it (lazy spawn).
462    /// If the file isn't open in LSP yet, sends didOpen. Otherwise sends didChange.
463    pub fn notify_file_changed(
464        &mut self,
465        file_path: &Path,
466        content: &str,
467        config: &Config,
468    ) -> Result<(), LspError> {
469        self.notify_file_changed_versioned(file_path, content, config)
470            .map(|_| ())
471    }
472
473    /// Like `notify_file_changed`, but returns the target document version
474    /// per server so the post-edit waiter can match `publishDiagnostics`
475    /// against the exact version that this notification carried.
476    ///
477    /// Returns: `Vec<(ServerKey, target_version)>`. `target_version` is the
478    /// `version` field on the `VersionedTextDocumentIdentifier` we just sent
479    /// (post-bump). For freshly-opened documents (`didOpen`) the version is
480    /// `0`. Servers that don't honor versioned text document sync will not
481    /// echo this back on `publishDiagnostics`; the caller is expected to
482    /// fall back to the epoch-delta path for those.
483    pub fn notify_file_changed_versioned(
484        &mut self,
485        file_path: &Path,
486        content: &str,
487        config: &Config,
488    ) -> Result<Vec<(ServerKey, i32)>, LspError> {
489        let canonical_path = canonicalize_for_lsp(file_path)?;
490        let server_keys = self.ensure_server_for_file(&canonical_path, config);
491        if server_keys.is_empty() {
492            return Ok(Vec::new());
493        }
494
495        let uri = uri_for_path(&canonical_path)?;
496        let language_id = language_id_for_extension(
497            canonical_path
498                .extension()
499                .and_then(|ext| ext.to_str())
500                .unwrap_or_default(),
501        )
502        .to_string();
503
504        let mut versions: Vec<(ServerKey, i32)> = Vec::with_capacity(server_keys.len());
505
506        for key in server_keys {
507            let current_version = self
508                .documents
509                .get(&key)
510                .and_then(|store| store.version(&canonical_path));
511
512            if let Some(version) = current_version {
513                let next_version = version + 1;
514                if let Some(client) = self.clients.get_mut(&key) {
515                    client.send_notification::<DidChangeTextDocument>(
516                        DidChangeTextDocumentParams {
517                            text_document: VersionedTextDocumentIdentifier::new(
518                                uri.clone(),
519                                next_version,
520                            ),
521                            content_changes: vec![TextDocumentContentChangeEvent {
522                                range: None,
523                                range_length: None,
524                                text: content.to_string(),
525                            }],
526                        },
527                    )?;
528                }
529                if let Some(store) = self.documents.get_mut(&key) {
530                    store.bump_version(&canonical_path);
531                }
532                versions.push((key, next_version));
533                continue;
534            }
535
536            if let Some(client) = self.clients.get_mut(&key) {
537                client.send_notification::<DidOpenTextDocument>(DidOpenTextDocumentParams {
538                    text_document: TextDocumentItem::new(
539                        uri.clone(),
540                        language_id.clone(),
541                        0,
542                        content.to_string(),
543                    ),
544                })?;
545            }
546            self.documents
547                .entry(key.clone())
548                .or_default()
549                .open(canonical_path.clone());
550            // didOpen carries version 0 — that's the version the server
551            // will echo on its first publishDiagnostics for this document.
552            versions.push((key, 0));
553        }
554
555        Ok(versions)
556    }
557
558    pub fn notify_file_changed_default(
559        &mut self,
560        file_path: &Path,
561        content: &str,
562    ) -> Result<(), LspError> {
563        self.notify_file_changed(file_path, content, &Config::default())
564    }
565
566    /// Notify every active server whose workspace contains at least one changed
567    /// path that watched files changed. This is intentionally workspace-scoped
568    /// rather than extension-scoped: configuration edits such as `package.json`
569    /// or `tsconfig.json` affect a server's project graph even though those
570    /// files may not be documents handled by the server itself.
571    pub fn notify_files_watched_changed(
572        &mut self,
573        paths: &[(PathBuf, FileChangeType)],
574        _config: &Config,
575    ) -> Result<(), LspError> {
576        if paths.is_empty() {
577            return Ok(());
578        }
579
580        let mut canonical_events = Vec::with_capacity(paths.len());
581        for (path, typ) in paths {
582            let canonical_path = resolve_for_lsp_uri(path);
583            canonical_events.push((canonical_path, *typ));
584        }
585
586        let keys: Vec<ServerKey> = self.clients.keys().cloned().collect();
587        for key in keys {
588            let mut changes = Vec::new();
589            for (path, typ) in &canonical_events {
590                if !path.starts_with(&key.root) {
591                    continue;
592                }
593                changes.push(FileEvent::new(uri_for_path(path)?, *typ));
594            }
595
596            if changes.is_empty() {
597                continue;
598            }
599
600            if let Some(client) = self.clients.get_mut(&key) {
601                // Only send after the server dynamically registers interest.
602                // `workspace/didChangeWatchedFiles` is client-owned; a server's
603                // initialize-time dynamicRegistration shape is not a subscription.
604                if !client.has_watched_file_registration() {
605                    if self.watched_file_skip_logged.insert(key.clone()) {
606                        log::debug!(
607                            "skipping didChangeWatchedFiles for {:?} (not dynamically registered)",
608                            key
609                        );
610                    }
611                    continue;
612                }
613                client.send_notification::<DidChangeWatchedFiles>(DidChangeWatchedFilesParams {
614                    changes,
615                })?;
616            }
617        }
618
619        Ok(())
620    }
621
622    /// Close a document in all servers that have it open.
623    pub fn notify_file_closed(&mut self, file_path: &Path) -> Result<(), LspError> {
624        let canonical_path = canonicalize_for_lsp(file_path)?;
625        let uri = uri_for_path(&canonical_path)?;
626        let keys: Vec<ServerKey> = self.documents.keys().cloned().collect();
627
628        for key in keys {
629            let was_open = self
630                .documents
631                .get(&key)
632                .map(|store| store.is_open(&canonical_path))
633                .unwrap_or(false);
634            if !was_open {
635                continue;
636            }
637
638            if let Some(client) = self.clients.get_mut(&key) {
639                client.send_notification::<DidCloseTextDocument>(DidCloseTextDocumentParams {
640                    text_document: TextDocumentIdentifier::new(uri.clone()),
641                })?;
642            }
643
644            if let Some(store) = self.documents.get_mut(&key) {
645                store.close(&canonical_path);
646            }
647        }
648
649        Ok(())
650    }
651
652    /// Get an active client for a file path, if one exists.
653    pub fn client_for_file(&self, file_path: &Path, config: &Config) -> Option<&LspClient> {
654        let key = self.server_key_for_file(file_path, config)?;
655        self.clients.get(&key)
656    }
657
658    pub fn client_for_file_default(&self, file_path: &Path) -> Option<&LspClient> {
659        self.client_for_file(file_path, &Config::default())
660    }
661
662    /// Get a mutable active client for a file path, if one exists.
663    pub fn client_for_file_mut(
664        &mut self,
665        file_path: &Path,
666        config: &Config,
667    ) -> Option<&mut LspClient> {
668        let key = self.server_key_for_file(file_path, config)?;
669        self.clients.get_mut(&key)
670    }
671
672    pub fn client_for_file_mut_default(&mut self, file_path: &Path) -> Option<&mut LspClient> {
673        self.client_for_file_mut(file_path, &Config::default())
674    }
675
676    /// Number of tracked server clients.
677    pub fn active_client_count(&self) -> usize {
678        self.clients.len()
679    }
680
681    /// Drain all pending LSP events. Call from the main loop.
682    pub fn drain_events(&mut self) -> Vec<LspEvent> {
683        let mut events = Vec::new();
684        while let Ok(event) = self.event_rx.try_recv() {
685            self.handle_event(&event);
686            events.push(event);
687        }
688        events
689    }
690
691    /// Wait for diagnostics to arrive for a specific file until a timeout expires.
692    pub fn wait_for_diagnostics(
693        &mut self,
694        file_path: &Path,
695        config: &Config,
696        timeout: std::time::Duration,
697    ) -> Vec<StoredDiagnostic> {
698        let deadline = std::time::Instant::now() + timeout;
699        self.wait_for_file_diagnostics(file_path, config, deadline)
700    }
701
702    pub fn wait_for_diagnostics_default(
703        &mut self,
704        file_path: &Path,
705        timeout: std::time::Duration,
706    ) -> Vec<StoredDiagnostic> {
707        self.wait_for_diagnostics(file_path, &Config::default(), timeout)
708    }
709
710    /// Test-only accessor for the diagnostics store. Used by integration
711    /// tests that need to inspect per-server entries (e.g., to verify that
712    /// `ServerKey::root` is populated correctly, not the empty path that
713    /// the legacy `publish_with_kind` path produced).
714    #[doc(hidden)]
715    pub fn diagnostics_store_for_test(&self) -> &DiagnosticsStore {
716        &self.diagnostics
717    }
718
719    /// Snapshot the current per-server epoch for every entry that exists
720    /// for `file_path`. Servers without an entry yet (never published)
721    /// are absent from the map; for those, `pre = 0` (any first publish
722    /// will be considered fresh under the epoch-fallback rule).
723    pub fn snapshot_diagnostic_epochs(&self, file_path: &Path) -> HashMap<ServerKey, u64> {
724        let lookup_path = normalize_lookup_path(file_path);
725        self.diagnostics
726            .entries_for_file(&lookup_path)
727            .into_iter()
728            .map(|(key, entry)| (key.clone(), entry.epoch))
729            .collect()
730    }
731
732    /// Snapshot the current diagnostic epoch and document version for every
733    /// active server relevant to `file_path` before a post-edit notification.
734    pub fn snapshot_pre_edit_state(&self, file_path: &Path) -> HashMap<ServerKey, PreEditSnapshot> {
735        let lookup_path = normalize_lookup_path(file_path);
736        let mut snapshots: HashMap<ServerKey, PreEditSnapshot> = self
737            .diagnostics
738            .entries_for_file(&lookup_path)
739            .into_iter()
740            .map(|(key, entry)| {
741                (
742                    key.clone(),
743                    PreEditSnapshot {
744                        epoch: entry.epoch,
745                        document_version_at_capture: None,
746                    },
747                )
748            })
749            .collect();
750
751        for (key, store) in &self.documents {
752            if let Some(version) = store.version(&lookup_path) {
753                snapshots
754                    .entry(key.clone())
755                    .or_default()
756                    .document_version_at_capture = Some(version);
757            }
758        }
759
760        snapshots
761    }
762
763    /// True when the current diagnostic entry for `server_key` can be tied to
764    /// that server's current in-memory document version for `file_path`.
765    ///
766    /// File-mode `lsp_diagnostics` uses this for push-only fallback after it
767    /// has synced/opened the document. Versioned publishes are accepted when
768    /// they match the current document version; unversioned publishes are not
769    /// accepted as fresh because epoch/wall-clock ordering alone is racy.
770    pub fn diagnostic_entry_is_fresh_for_document(
771        &self,
772        file_path: &Path,
773        server_key: &ServerKey,
774        pre: PreEditSnapshot,
775    ) -> bool {
776        let lookup_path = normalize_lookup_path(file_path);
777        let Some(entry) = self
778            .diagnostics
779            .entries_for_file(&lookup_path)
780            .into_iter()
781            .find_map(|(key, entry)| if key == server_key { Some(entry) } else { None })
782        else {
783            return false;
784        };
785
786        let target_version = self
787            .documents
788            .get(server_key)
789            .and_then(|store| store.version(&lookup_path))
790            .or(pre.document_version_at_capture)
791            .unwrap_or(0);
792
793        matches!(entry.version, Some(version) if version >= target_version)
794    }
795
796    /// Wait for FRESH per-server diagnostics that match the just-sent
797    /// document version. This is the v0.17.3 post-edit path that fixes the
798    /// stale-diagnostics bug: instead of returning whatever is in the cache
799    /// when the deadline hits, we only return entries whose `version`
800    /// matches the post-edit target version (or, for servers that don't
801    /// participate in versioned sync, whose `epoch` was bumped after the
802    /// pre-edit snapshot).
803    ///
804    /// `expected_versions` should come from `notify_file_changed_versioned`
805    /// — one `(ServerKey, target_version)` per server we sent didChange/
806    /// didOpen to.
807    ///
808    /// `pre_snapshot` is the per-server epoch BEFORE the notification was
809    /// sent; it gates the epoch-fallback path so an old-version publish
810    /// arriving after `drain_events` and before `didChange` cannot be
811    /// mistaken for a fresh response.
812    ///
813    /// Returns a per-server tri-state: `Fresh` (publish matched target
814    /// version OR epoch advanced past snapshot for an unversioned server),
815    /// `Pending` (deadline hit before this server published anything we
816    /// could verify), or `Exited` (server died between notification and
817    /// deadline).
818    pub fn wait_for_post_edit_diagnostics(
819        &mut self,
820        file_path: &Path,
821        // `config` is intentionally accepted (matches sibling wait APIs and
822        // future-proofs us if freshness rules need it). Currently unused
823        // because expected_versions/pre_snapshot fully determine behavior.
824        _config: &Config,
825        expected_versions: &[(ServerKey, i32)],
826        pre_snapshot: &HashMap<ServerKey, PreEditSnapshot>,
827        timeout: std::time::Duration,
828    ) -> PostEditWaitOutcome {
829        let lookup_path = normalize_lookup_path(file_path);
830        let deadline = std::time::Instant::now() + timeout;
831
832        // Drain any events that arrived while we were sending didChange.
833        // The publishDiagnostics handler stores the version, so even
834        // pre-snapshot publishes that landed late won't be mistaken for
835        // fresh — the version-match check will reject them.
836        let _ = self.drain_events_for_file(&lookup_path);
837
838        let mut fresh: HashMap<ServerKey, Vec<StoredDiagnostic>> = HashMap::new();
839        let mut exited: Vec<ServerKey> = Vec::new();
840
841        loop {
842            // Check freshness for every expected server. A server is fresh
843            // if its current entry for this file satisfies either:
844            //   1. version-match: entry.version == Some(target_version), OR
845            //   2. push-only freshness: entry.version is None AND entry.epoch
846            //      advanced strictly after the pre-edit snapshot. Versioned
847            //      publishes must be >= the post-edit target version.
848            // Servers whose process has exited are reported separately.
849            for (key, target_version) in expected_versions {
850                if fresh.contains_key(key) || exited.contains(key) {
851                    continue;
852                }
853                if !self.clients.contains_key(key) {
854                    exited.push(key.clone());
855                    continue;
856                }
857                if let Some(entry) = self
858                    .diagnostics
859                    .entries_for_file(&lookup_path)
860                    .into_iter()
861                    .find_map(|(k, e)| if k == key { Some(e) } else { None })
862                {
863                    let pre = pre_snapshot.get(key).copied().unwrap_or_default();
864                    let is_fresh = post_edit_entry_is_fresh(entry, *target_version, pre);
865                    if is_fresh {
866                        fresh.insert(key.clone(), entry.diagnostics.clone());
867                    }
868                }
869            }
870
871            // All accounted for? Done.
872            if fresh.len() + exited.len() == expected_versions.len() {
873                break;
874            }
875
876            let now = std::time::Instant::now();
877            if now >= deadline {
878                break;
879            }
880
881            let timeout = deadline.saturating_duration_since(now);
882            match self.event_rx.recv_timeout(timeout) {
883                Ok(event) => {
884                    self.handle_event(&event);
885                }
886                Err(RecvTimeoutError::Timeout) | Err(RecvTimeoutError::Disconnected) => break,
887            }
888        }
889
890        // Pending = expected but neither fresh nor exited.
891        let pending: Vec<ServerKey> = expected_versions
892            .iter()
893            .filter(|(k, _)| !fresh.contains_key(k) && !exited.contains(k))
894            .map(|(k, _)| k.clone())
895            .collect();
896
897        // Build deduplicated, sorted diagnostics from the fresh servers only.
898        // Stale or pending servers contribute zero diagnostics.
899        let mut diagnostics: Vec<StoredDiagnostic> = fresh
900            .into_iter()
901            .flat_map(|(_, diags)| diags.into_iter())
902            .collect();
903        diagnostics.sort_by(|a, b| {
904            a.file
905                .cmp(&b.file)
906                .then(a.line.cmp(&b.line))
907                .then(a.column.cmp(&b.column))
908                .then(a.message.cmp(&b.message))
909        });
910
911        PostEditWaitOutcome {
912            diagnostics,
913            pending_servers: pending,
914            exited_servers: exited,
915        }
916    }
917
918    /// Wait for diagnostics to arrive for a specific file until a deadline.
919    ///
920    /// Drains already-queued events first, then blocks on the shared event
921    /// channel only until either `publishDiagnostics` arrives for this file or
922    /// the deadline is reached.
923    pub fn wait_for_file_diagnostics(
924        &mut self,
925        file_path: &Path,
926        config: &Config,
927        deadline: std::time::Instant,
928    ) -> Vec<StoredDiagnostic> {
929        let lookup_path = normalize_lookup_path(file_path);
930
931        if self.server_key_for_file(&lookup_path, config).is_none() {
932            return Vec::new();
933        }
934
935        loop {
936            if self.drain_events_for_file(&lookup_path) {
937                break;
938            }
939
940            let now = std::time::Instant::now();
941            if now >= deadline {
942                break;
943            }
944
945            let timeout = deadline.saturating_duration_since(now);
946            match self.event_rx.recv_timeout(timeout) {
947                Ok(event) => {
948                    if matches!(
949                        self.handle_event(&event),
950                        Some(ref published_file) if published_file.as_path() == lookup_path.as_path()
951                    ) {
952                        break;
953                    }
954                }
955                Err(RecvTimeoutError::Timeout) | Err(RecvTimeoutError::Disconnected) => break,
956            }
957        }
958
959        self.get_diagnostics_for_file(&lookup_path)
960            .into_iter()
961            .cloned()
962            .collect()
963    }
964
965    /// Default timeout for `textDocument/diagnostic` (per-file pull). Servers
966    /// usually respond in under 1s for files they've already analyzed; we
967    /// allow up to 10s before falling back to push semantics. Currently
968    /// surfaced via [`Self::pull_file_timeout`] for callers that want to
969    /// override the wait via the `wait_ms` knob.
970    pub const PULL_FILE_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);
971
972    /// Public accessor so command handlers can reuse the documented default.
973    pub fn pull_file_timeout() -> std::time::Duration {
974        Self::PULL_FILE_TIMEOUT
975    }
976
977    /// Default timeout for `workspace/diagnostic`. The LSP spec allows the
978    /// server to hold this open indefinitely; we cap at 10s and report
979    /// `complete: false` to the agent rather than hanging the bridge.
980    const PULL_WORKSPACE_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);
981
982    /// Issue a `textDocument/diagnostic` (LSP 3.17 per-file pull) request to
983    /// every server that supports pull diagnostics for the given file.
984    ///
985    /// Returns the per-server outcome. If a server reports `kind: "unchanged"`,
986    /// the cached entry's diagnostics are surfaced (deterministic re-use of
987    /// the previous response). If a server doesn't advertise pull capability,
988    /// it's skipped here — the caller should fall back to push for those.
989    ///
990    /// Side effects: results are stored in `DiagnosticsStore` so directory-mode
991    /// queries can aggregate them later.
992    pub fn pull_file_diagnostics(
993        &mut self,
994        file_path: &Path,
995        config: &Config,
996    ) -> Result<Vec<PullFileResult>, LspError> {
997        let canonical_path = canonicalize_for_lsp(file_path)?;
998        // Make sure servers are running and the document is open with fresh
999        // content (handles disk-drift via DocumentStore::is_stale_on_disk).
1000        self.ensure_file_open(&canonical_path, config)?;
1001
1002        let server_keys = self.ensure_server_for_file(&canonical_path, config);
1003        if server_keys.is_empty() {
1004            return Ok(Vec::new());
1005        }
1006
1007        let uri = uri_for_path(&canonical_path)?;
1008        let mut results = Vec::with_capacity(server_keys.len());
1009
1010        for key in server_keys {
1011            let supports_pull = self
1012                .clients
1013                .get(&key)
1014                .and_then(|c| c.diagnostic_capabilities())
1015                .is_some_and(|caps| caps.pull_diagnostics);
1016
1017            if !supports_pull {
1018                results.push(PullFileResult {
1019                    server_key: key.clone(),
1020                    outcome: PullFileOutcome::PullNotSupported,
1021                });
1022                continue;
1023            }
1024
1025            // Look up previous resultId for incremental requests.
1026            let previous_result_id = self
1027                .diagnostics
1028                .entries_for_file(&canonical_path)
1029                .into_iter()
1030                .find(|(k, _)| **k == key)
1031                .and_then(|(_, entry)| entry.result_id.clone());
1032
1033            let identifier = self
1034                .clients
1035                .get(&key)
1036                .and_then(|c| c.diagnostic_capabilities())
1037                .and_then(|caps| caps.identifier.clone());
1038
1039            let params = lsp_types::DocumentDiagnosticParams {
1040                text_document: lsp_types::TextDocumentIdentifier { uri: uri.clone() },
1041                identifier,
1042                previous_result_id,
1043                work_done_progress_params: Default::default(),
1044                partial_result_params: Default::default(),
1045            };
1046
1047            let outcome = match self.send_pull_request(&key, params) {
1048                Ok(report) => self.ingest_document_report(&key, &canonical_path, report),
1049                Err(err) => PullFileOutcome::RequestFailed {
1050                    reason: err.to_string(),
1051                },
1052            };
1053
1054            results.push(PullFileResult {
1055                server_key: key,
1056                outcome,
1057            });
1058        }
1059
1060        Ok(results)
1061    }
1062
1063    /// Issue a `workspace/diagnostic` request to a specific server. Cancels
1064    /// internally if `timeout` elapses before the server responds. Cached
1065    /// entries from the response are stored so directory-mode queries pick
1066    /// them up.
1067    pub fn pull_workspace_diagnostics(
1068        &mut self,
1069        server_key: &ServerKey,
1070        timeout: Option<std::time::Duration>,
1071    ) -> Result<PullWorkspaceResult, LspError> {
1072        let timeout = timeout.unwrap_or(Self::PULL_WORKSPACE_TIMEOUT);
1073
1074        let supports_workspace = self
1075            .clients
1076            .get(server_key)
1077            .and_then(|c| c.diagnostic_capabilities())
1078            .is_some_and(|caps| caps.workspace_diagnostics);
1079
1080        if !supports_workspace {
1081            return Ok(PullWorkspaceResult {
1082                server_key: server_key.clone(),
1083                files_reported: Vec::new(),
1084                complete: false,
1085                cancelled: false,
1086                supports_workspace: false,
1087            });
1088        }
1089
1090        let identifier = self
1091            .clients
1092            .get(server_key)
1093            .and_then(|c| c.diagnostic_capabilities())
1094            .and_then(|caps| caps.identifier.clone());
1095
1096        let params = lsp_types::WorkspaceDiagnosticParams {
1097            identifier,
1098            previous_result_ids: Vec::new(),
1099            work_done_progress_params: Default::default(),
1100            partial_result_params: Default::default(),
1101        };
1102
1103        let result = match self
1104            .clients
1105            .get_mut(server_key)
1106            .ok_or_else(|| LspError::ServerNotReady("server not found".into()))?
1107            .send_request_with_timeout::<lsp_types::request::WorkspaceDiagnosticRequest>(
1108                params, timeout,
1109            ) {
1110            Ok(result) => result,
1111            Err(LspError::Timeout(_)) => {
1112                return Ok(PullWorkspaceResult {
1113                    server_key: server_key.clone(),
1114                    files_reported: Vec::new(),
1115                    complete: false,
1116                    cancelled: true,
1117                    supports_workspace: true,
1118                });
1119            }
1120            Err(err) => return Err(err),
1121        };
1122
1123        // Extract the items list. Partial responses are not a complete
1124        // workspace view, but the partial payload can still contain useful
1125        // document reports; ingest those while surfacing complete=false.
1126        let (items, complete) = match result {
1127            lsp_types::WorkspaceDiagnosticReportResult::Report(report) => (report.items, true),
1128            lsp_types::WorkspaceDiagnosticReportResult::Partial(partial) => (partial.items, false),
1129        };
1130
1131        // Ingest each file report into the diagnostics store.
1132        let mut files_reported = Vec::with_capacity(items.len());
1133        for item in items {
1134            match item {
1135                lsp_types::WorkspaceDocumentDiagnosticReport::Full(full) => {
1136                    if let Some(file) = uri_to_path(&full.uri) {
1137                        let stored = from_lsp_diagnostics(
1138                            file.clone(),
1139                            full.full_document_diagnostic_report.items.clone(),
1140                        );
1141                        self.diagnostics.publish_with_result_id(
1142                            server_key.clone(),
1143                            file.clone(),
1144                            stored,
1145                            full.full_document_diagnostic_report.result_id.clone(),
1146                        );
1147                        files_reported.push(file);
1148                    }
1149                }
1150                lsp_types::WorkspaceDocumentDiagnosticReport::Unchanged(_unchanged) => {
1151                    // "Unchanged" means the previously cached report is still
1152                    // valid. We left it in place; nothing to do.
1153                }
1154            }
1155        }
1156
1157        Ok(PullWorkspaceResult {
1158            server_key: server_key.clone(),
1159            files_reported,
1160            complete,
1161            cancelled: false,
1162            supports_workspace: true,
1163        })
1164    }
1165
1166    /// Issue the per-file diagnostic request and return the report.
1167    fn send_pull_request(
1168        &mut self,
1169        key: &ServerKey,
1170        params: lsp_types::DocumentDiagnosticParams,
1171    ) -> Result<lsp_types::DocumentDiagnosticReportResult, LspError> {
1172        let client = self
1173            .clients
1174            .get_mut(key)
1175            .ok_or_else(|| LspError::ServerNotReady("server not found".into()))?;
1176        client.send_request::<lsp_types::request::DocumentDiagnosticRequest>(params)
1177    }
1178
1179    /// Store the result of a per-file pull request and return a structured
1180    /// outcome the caller can inspect.
1181    fn ingest_document_report(
1182        &mut self,
1183        key: &ServerKey,
1184        canonical_path: &Path,
1185        result: lsp_types::DocumentDiagnosticReportResult,
1186    ) -> PullFileOutcome {
1187        let report = match result {
1188            lsp_types::DocumentDiagnosticReportResult::Report(report) => report,
1189            lsp_types::DocumentDiagnosticReportResult::Partial(_) => {
1190                // Partial results stream in via $/progress notifications which
1191                // we don't currently subscribe to. Treat as a soft-empty
1192                // success — the next pull will get the full version.
1193                return PullFileOutcome::PartialNotSupported;
1194            }
1195        };
1196
1197        match report {
1198            lsp_types::DocumentDiagnosticReport::Full(full) => {
1199                let result_id = full.full_document_diagnostic_report.result_id.clone();
1200                let stored = from_lsp_diagnostics(
1201                    canonical_path.to_path_buf(),
1202                    full.full_document_diagnostic_report.items.clone(),
1203                );
1204                let count = stored.len();
1205                self.diagnostics.publish_with_result_id(
1206                    key.clone(),
1207                    canonical_path.to_path_buf(),
1208                    stored,
1209                    result_id,
1210                );
1211                PullFileOutcome::Full {
1212                    diagnostic_count: count,
1213                }
1214            }
1215            lsp_types::DocumentDiagnosticReport::Unchanged(_unchanged) => {
1216                // The server says cache is still valid. We don't refresh
1217                // anything; the existing entry's diagnostics remain authoritative.
1218                PullFileOutcome::Unchanged
1219            }
1220        }
1221    }
1222
1223    /// Shutdown all servers gracefully.
1224    pub fn shutdown_all(&mut self) {
1225        for (key, mut client) in self.clients.drain() {
1226            if let Err(err) = client.shutdown() {
1227                slog_error!("error shutting down {:?}: {}", key, err);
1228            }
1229        }
1230        self.documents.clear();
1231        self.diagnostics = DiagnosticsStore::new();
1232    }
1233
1234    /// Check if any server is active.
1235    pub fn has_active_servers(&self) -> bool {
1236        self.clients
1237            .values()
1238            .any(|client| client.state() == ServerState::Ready)
1239    }
1240
1241    /// Active server keys (running clients). Used by `lsp_diagnostics`
1242    /// directory mode to know which servers to ask for workspace pull.
1243    pub fn active_server_keys(&self) -> Vec<ServerKey> {
1244        self.clients.keys().cloned().collect()
1245    }
1246
1247    pub fn get_diagnostics_for_file(&self, file: &Path) -> Vec<&StoredDiagnostic> {
1248        let normalized = normalize_lookup_path(file);
1249        self.diagnostics.for_file(&normalized)
1250    }
1251
1252    pub fn get_diagnostics_for_directory(&self, dir: &Path) -> Vec<&StoredDiagnostic> {
1253        let normalized = normalize_lookup_path(dir);
1254        self.diagnostics.for_directory(&normalized)
1255    }
1256
1257    pub fn get_all_diagnostics(&self) -> Vec<&StoredDiagnostic> {
1258        self.diagnostics.all()
1259    }
1260
1261    fn drain_events_for_file(&mut self, file_path: &Path) -> bool {
1262        let mut saw_file_diagnostics = false;
1263        while let Ok(event) = self.event_rx.try_recv() {
1264            if matches!(
1265                self.handle_event(&event),
1266                Some(ref published_file) if published_file.as_path() == file_path
1267            ) {
1268                saw_file_diagnostics = true;
1269            }
1270        }
1271        saw_file_diagnostics
1272    }
1273
1274    fn handle_event(&mut self, event: &LspEvent) -> Option<PathBuf> {
1275        match event {
1276            LspEvent::Notification {
1277                server_kind,
1278                root,
1279                method,
1280                params: Some(params),
1281            } if method == "textDocument/publishDiagnostics" => {
1282                self.handle_publish_diagnostics(server_kind.clone(), root.clone(), params)
1283            }
1284            LspEvent::ServerExited { server_kind, root } => {
1285                let key = ServerKey {
1286                    kind: server_kind.clone(),
1287                    root: root.clone(),
1288                };
1289                self.clients.remove(&key);
1290                self.documents.remove(&key);
1291                self.diagnostics.clear_for_server(&key);
1292                None
1293            }
1294            _ => None,
1295        }
1296    }
1297
1298    fn handle_publish_diagnostics(
1299        &mut self,
1300        server: ServerKind,
1301        root: PathBuf,
1302        params: &serde_json::Value,
1303    ) -> Option<PathBuf> {
1304        if let Ok(publish_params) =
1305            serde_json::from_value::<lsp_types::PublishDiagnosticsParams>(params.clone())
1306        {
1307            let file = uri_to_path(&publish_params.uri)?;
1308            let stored = from_lsp_diagnostics(file.clone(), publish_params.diagnostics);
1309            // v0.17.3: store with real ServerKey { kind, root } and capture
1310            // the document `version` (when the server provided one) so the
1311            // post-edit waiter can reject stale publishes deterministically
1312            // via version-match (preferred) or epoch-delta (fallback). The
1313            // earlier `publish_with_kind` path silently dropped both.
1314            let key = ServerKey { kind: server, root };
1315            self.diagnostics
1316                .publish_full(key, file.clone(), stored, None, publish_params.version);
1317            return Some(file);
1318        }
1319        None
1320    }
1321
1322    fn spawn_server(
1323        &self,
1324        def: &ServerDef,
1325        root: &Path,
1326        config: &Config,
1327    ) -> Result<LspClient, LspError> {
1328        let binary = self.resolve_binary(def, config)?;
1329
1330        // Merge the server-defined env with our test-injected env.
1331        // `extra_env` is empty in production; tests use it to drive fake
1332        // server variants (AFT_FAKE_LSP_PULL=1, etc.).
1333        let mut merged_env = def.env.clone();
1334        for (key, value) in &self.extra_env {
1335            merged_env.insert(key.clone(), value.clone());
1336        }
1337
1338        let mut client = LspClient::spawn(
1339            def.kind.clone(),
1340            root.to_path_buf(),
1341            &binary,
1342            &def.args,
1343            &merged_env,
1344            self.event_tx.clone(),
1345            self.child_registry.clone(),
1346        )?;
1347        client.initialize(root, def.initialization_options.clone())?;
1348        Ok(client)
1349    }
1350
1351    fn resolve_binary(&self, def: &ServerDef, config: &Config) -> Result<PathBuf, LspError> {
1352        if let Some(path) = self.binary_overrides.get(&def.kind) {
1353            if path.exists() {
1354                return Ok(path.clone());
1355            }
1356            return Err(LspError::NotFound(format!(
1357                "override binary for {:?} not found: {}",
1358                def.kind,
1359                path.display()
1360            )));
1361        }
1362
1363        if let Some(path) = env_binary_override(&def.kind) {
1364            if path.exists() {
1365                return Ok(path);
1366            }
1367            return Err(LspError::NotFound(format!(
1368                "environment override binary for {:?} not found: {}",
1369                def.kind,
1370                path.display()
1371            )));
1372        }
1373
1374        // Layered resolution:
1375        //   1. <project_root>/node_modules/.bin/<binary>
1376        //   2. config.lsp_paths_extra (plugin auto-install cache, etc.)
1377        //   3. PATH via `which`
1378        resolve_lsp_binary(
1379            &def.binary,
1380            config.project_root.as_deref(),
1381            &config.lsp_paths_extra,
1382        )
1383        .ok_or_else(|| {
1384            LspError::NotFound(format!(
1385                "language server binary '{}' not found in node_modules/.bin, lsp_paths_extra, or PATH",
1386                def.binary
1387            ))
1388        })
1389    }
1390
1391    fn server_key_for_file(&self, file_path: &Path, config: &Config) -> Option<ServerKey> {
1392        for def in servers_for_file(file_path, config) {
1393            let root = find_workspace_root(file_path, &def.root_markers)?;
1394            let key = ServerKey {
1395                kind: def.kind.clone(),
1396                root,
1397            };
1398            if self.clients.contains_key(&key) {
1399                return Some(key);
1400            }
1401        }
1402        None
1403    }
1404}
1405
1406impl Default for LspManager {
1407    fn default() -> Self {
1408        Self::new()
1409    }
1410}
1411
1412fn canonicalize_for_lsp(file_path: &Path) -> Result<PathBuf, LspError> {
1413    std::fs::canonicalize(file_path).map_err(LspError::from)
1414}
1415
/// Best-effort canonical form of `file_path` that also works for paths
/// that do not exist (yet).
///
/// When the whole path canonicalizes, that result is returned. Otherwise the
/// longest existing ancestor is canonicalized (left as-is if even that
/// fails) and the missing trailing components are appended back unchanged.
fn resolve_for_lsp_uri(file_path: &Path) -> PathBuf {
    match std::fs::canonicalize(file_path) {
        Ok(canonical) => return canonical,
        Err(_) => {}
    }

    // Peel components off the end until something on disk is reached,
    // remembering the peeled names (innermost first).
    let mut ancestor = file_path.to_path_buf();
    let mut trailing = Vec::new();
    loop {
        if ancestor.exists() {
            break;
        }
        let Some(name) = ancestor.file_name() else {
            break;
        };
        trailing.push(name.to_owned());
        let Some(parent) = ancestor.parent() else {
            break;
        };
        ancestor = parent.to_path_buf();
    }

    let base = std::fs::canonicalize(&ancestor).unwrap_or(ancestor);
    // Re-attach the missing components in their original order.
    trailing.into_iter().rev().fold(base, |mut path, segment| {
        path.push(segment);
        path
    })
}
1440
/// Map a file extension (without the dot, matched case-sensitively) to the
/// LSP `languageId` string. Unknown extensions map to `"plaintext"`.
fn language_id_for_extension(ext: &str) -> &'static str {
    // (extension, languageId) pairs; several extensions may share an id.
    const LANGUAGE_IDS: &[(&str, &str)] = &[
        ("ts", "typescript"),
        ("tsx", "typescriptreact"),
        ("js", "javascript"),
        ("mjs", "javascript"),
        ("cjs", "javascript"),
        ("jsx", "javascriptreact"),
        ("py", "python"),
        ("pyi", "python"),
        ("rs", "rust"),
        ("go", "go"),
        ("html", "html"),
        ("htm", "html"),
    ];

    LANGUAGE_IDS
        .iter()
        .find(|(known, _)| *known == ext)
        .map_or("plaintext", |(_, language_id)| *language_id)
}
1454
/// Path used as the lookup key for stored state: the canonical form of
/// `path` when it can be resolved on disk, otherwise `path` unchanged.
fn normalize_lookup_path(path: &Path) -> PathBuf {
    match std::fs::canonicalize(path) {
        Ok(canonical) => canonical,
        Err(_) => path.to_path_buf(),
    }
}
1458
1459/// Classify an error returned by `spawn_server` into a structured
1460/// `ServerAttemptResult`. The two interesting cases for callers are:
1461/// - `BinaryNotInstalled` — the server's binary couldn't be resolved on PATH
1462///   or via override. The agent can be told "install bash-language-server".
1463/// - `SpawnFailed` — binary was found but spawning/initializing failed
1464///   (permissions, missing runtime, server crashed during initialize, etc.).
1465fn classify_spawn_error(binary: &str, err: &LspError) -> ServerAttemptResult {
1466    match err {
1467        // resolve_binary returns NotFound for both missing override paths and
1468        // missing PATH binaries. The "override missing" case is rare in
1469        // practice (only set in tests / env vars); we report all NotFound as
1470        // BinaryNotInstalled so the user sees an actionable install hint.
1471        LspError::NotFound(_) => ServerAttemptResult::BinaryNotInstalled {
1472            binary: binary.to_string(),
1473        },
1474        other => ServerAttemptResult::SpawnFailed {
1475            binary: binary.to_string(),
1476            reason: other.to_string(),
1477        },
1478    }
1479}
1480
1481fn env_binary_override(kind: &ServerKind) -> Option<PathBuf> {
1482    let id = kind.id_str();
1483    let suffix: String = id
1484        .chars()
1485        .map(|ch| {
1486            if ch.is_ascii_alphanumeric() {
1487                ch.to_ascii_uppercase()
1488            } else {
1489                '_'
1490            }
1491        })
1492        .collect();
1493    let key = format!("AFT_LSP_{suffix}_BINARY");
1494    std::env::var_os(key).map(PathBuf::from)
1495}