Skip to main content

aft/lsp/
manager.rs

1use std::collections::{HashMap, HashSet};
2use std::path::{Path, PathBuf};
3use std::str::FromStr;
4
5use crossbeam_channel::{unbounded, Receiver, RecvTimeoutError, Sender};
6use lsp_types::notification::{
7    DidChangeTextDocument, DidChangeWatchedFiles, DidCloseTextDocument, DidOpenTextDocument,
8};
9use lsp_types::{
10    DidChangeTextDocumentParams, DidChangeWatchedFilesParams, DidCloseTextDocumentParams,
11    DidOpenTextDocumentParams, FileChangeType, FileEvent, TextDocumentContentChangeEvent,
12    TextDocumentIdentifier, TextDocumentItem, VersionedTextDocumentIdentifier,
13};
14
15use crate::config::Config;
16use crate::lsp::child_registry::LspChildRegistry;
17use crate::lsp::client::{LspClient, LspEvent, ServerState};
18use crate::lsp::diagnostics::{
19    from_lsp_diagnostics, DiagnosticEntry, DiagnosticsStore, StoredDiagnostic,
20};
21use crate::lsp::document::DocumentStore;
22use crate::lsp::registry::{resolve_lsp_binary, servers_for_file, ServerDef, ServerKind};
23use crate::lsp::roots::{find_workspace_root, ServerKey};
24use crate::lsp::LspError;
25use crate::slog_error;
26
27/// Outcome of attempting to ensure a server is running for a single matching
28/// `ServerDef`. Returned per matching server so the caller can report exactly
29/// what happened to the user instead of collapsing all failures into "no
30/// server".
31#[derive(Debug, Clone)]
32pub enum ServerAttemptResult {
33    /// Server is running and ready to serve requests for this file.
34    Ok { server_key: ServerKey },
35    /// No workspace root was found by walking up from the file looking for
36    /// any of the server's configured root markers.
37    NoRootMarker { looked_for: Vec<String> },
38    /// The server's binary could not be found on PATH (or override was
39    /// missing/invalid).
40    BinaryNotInstalled { binary: String },
41    /// Binary was found but spawning or initializing the server failed.
42    SpawnFailed { binary: String, reason: String },
43}
44
45/// One server's attempt to handle a file.
46#[derive(Debug, Clone)]
47pub struct ServerAttempt {
48    /// Stable server identifier (kind ID, e.g. "pyright", "rust-analyzer").
49    pub server_id: String,
50    /// Server display name from the registry.
51    pub server_name: String,
52    pub result: ServerAttemptResult,
53}
54
55/// Aggregate outcome of `ensure_server_for_file_detailed`. Distinguishes:
56/// - "No server registered for this file's extension" (`attempts.is_empty()`)
57/// - "Servers registered but none could start" (`successful.is_empty()` but
58///   `!attempts.is_empty()`)
59/// - "At least one server is ready" (`!successful.is_empty()`)
60#[derive(Debug, Clone, Default)]
61pub struct EnsureServerOutcomes {
62    /// Server keys that are now running and ready to serve requests.
63    pub successful: Vec<ServerKey>,
64    /// Per-server attempt records. Empty if no server is registered for the
65    /// file's extension.
66    pub attempts: Vec<ServerAttempt>,
67}
68
69impl EnsureServerOutcomes {
70    /// True if no server in the registry matched this file's extension.
71    pub fn no_server_registered(&self) -> bool {
72        self.attempts.is_empty()
73    }
74}
75
76/// Outcome of a post-edit diagnostics wait. Reports the per-server status
77/// alongside the fresh diagnostics, so the response layer can build an
78/// honest tri-state payload (`success: true` + `complete: bool` + named
79/// gap fields per `crates/aft/src/protocol.rs`).
80///
81/// `diagnostics` only contains entries from servers that proved freshness
82/// (version-match preferred, epoch-fallback for unversioned servers).
83/// Pre-edit cached entries are NEVER included — that's the whole point of
84/// this type.
85#[derive(Debug, Clone, Default)]
86pub struct PostEditWaitOutcome {
87    /// Diagnostics from servers whose response we verified is FOR the
88    /// post-edit document version (or whose epoch we saw advance after our
89    /// pre-edit snapshot, for unversioned servers).
90    pub diagnostics: Vec<StoredDiagnostic>,
91    /// Servers we expected to publish but didn't before the deadline.
92    /// Reported to the agent via `pending_lsp_servers` so they understand
93    /// the result is partial.
94    pub pending_servers: Vec<ServerKey>,
95    /// Servers whose process exited between notification and deadline.
96    /// Reported separately so the agent knows the gap is unrecoverable
97    /// without a server restart, not "wait longer."
98    pub exited_servers: Vec<ServerKey>,
99}
100
/// Freshness baseline for one server/file pair, captured before an edit
/// notification is sent.
#[derive(Clone, Copy, Debug, Default)]
pub struct PreEditSnapshot {
    /// Diagnostics epoch stored for the pair at capture time; `0` when the
    /// server had not published for the file yet.
    pub epoch: u64,
    /// Document version tracked for the file at capture time, or `None` when
    /// no document store reported a version for it.
    pub document_version_at_capture: Option<i32>,
}
107
108pub fn post_edit_entry_is_fresh(
109    entry: &DiagnosticEntry,
110    target_version: i32,
111    pre: PreEditSnapshot,
112) -> bool {
113    if entry.epoch <= pre.epoch {
114        return false;
115    }
116
117    match entry.version {
118        Some(version) => version >= target_version,
119        None => true,
120    }
121}
122
123impl PostEditWaitOutcome {
124    /// True if every expected server reported a fresh result. False means
125    /// the agent should treat the diagnostics as a partial picture.
126    pub fn complete(&self) -> bool {
127        self.pending_servers.is_empty() && self.exited_servers.is_empty()
128    }
129}
130
/// How a single server answered a per-file `textDocument/diagnostic` pull.
#[derive(Clone, Debug)]
pub enum PullFileOutcome {
    /// A full report arrived and its diagnostics were stored.
    Full { diagnostic_count: usize },
    /// The server replied `kind: "unchanged"`, so the cached diagnostics are
    /// still valid.
    Unchanged,
    /// The server answered with a partial-result token. Streamed progress is
    /// not subscribed to, so this counts as a soft empty until the next pull.
    PartialNotSupported,
    /// The server does not advertise pull diagnostics; the caller should rely
    /// on push diagnostics for it.
    PullNotSupported,
    /// The pull request failed (timeout, server error, ...).
    RequestFailed { reason: String },
}
147
148/// Result of `pull_file_diagnostics` for one matching server.
149#[derive(Debug, Clone)]
150pub struct PullFileResult {
151    pub server_key: ServerKey,
152    pub outcome: PullFileOutcome,
153}
154
155/// Result of `pull_workspace_diagnostics` for a single server.
156#[derive(Debug, Clone)]
157pub struct PullWorkspaceResult {
158    pub server_key: ServerKey,
159    /// Files for which a Full report was received and cached. Files that came
160    /// back as `Unchanged` are NOT listed here because their cached entry was
161    /// already authoritative.
162    pub files_reported: Vec<PathBuf>,
163    /// True if the server returned a full response within the timeout.
164    pub complete: bool,
165    /// True if we cancelled (request timed out before the server responded).
166    pub cancelled: bool,
167    /// True if the server advertised workspace pull support. When false, the
168    /// other fields are empty and the caller should fall back to file-mode
169    /// pull or to push semantics.
170    pub supports_workspace: bool,
171}
172
173pub struct LspManager {
174    /// Active server instances, keyed by (ServerKind, workspace_root).
175    clients: HashMap<ServerKey, LspClient>,
176    /// Tracks opened documents and versions per active server.
177    documents: HashMap<ServerKey, DocumentStore>,
178    /// Stored publishDiagnostics payloads across all servers.
179    diagnostics: DiagnosticsStore,
180    /// Unified event channel — all server reader threads send here.
181    event_tx: Sender<LspEvent>,
182    event_rx: Receiver<LspEvent>,
183    /// Optional binary path overrides used by integration tests.
184    binary_overrides: HashMap<ServerKind, PathBuf>,
185    /// Extra env vars merged into every spawned LSP child. Used in tests to
186    /// drive the fake server's behavioral variants (`AFT_FAKE_LSP_PULL=1`,
187    /// `AFT_FAKE_LSP_WORKSPACE=1`, etc.). Production code does not set this.
188    extra_env: HashMap<String, String>,
189    /// Per-(kind,root) cache of spawn failures. Once a server fails to spawn
190    /// for a workspace root, we remember why and skip subsequent attempts for
191    /// the lifetime of this AFT process. Without this, every file open or
192    /// didChange retries `spawn_server` and logs a fresh ERROR — visible as
193    /// repeated `failed to spawn TypeScript Language Server: Could not find a
194    /// valid TypeScript installation` lines per edit.
195    ///
196    /// Entries are NEVER evicted automatically. The expected recovery path is
197    /// for the user to fix their environment (install the missing binary or
198    /// add a `tsconfig.json` / `package.json` with the right dependency) and
199    /// restart OpenCode/Pi, which spawns a fresh `aft` process with an empty
200    /// cache. We deliberately don't auto-retry on file events: the failure
201    /// modes we track here (binary not installed, init handshake failure)
202    /// don't fix themselves at runtime.
203    failed_spawns: HashMap<ServerKey, ServerAttemptResult>,
204    /// Server/root pairs for which we already logged that watched-file
205    /// notifications are skipped because the capability is absent.
206    watched_file_skip_logged: HashSet<ServerKey>,
207    /// Tracks PIDs of spawned LSP child processes so the signal handler can
208    /// kill them on SIGTERM/SIGINT before aft exits, preventing orphans.
209    /// Defaults to empty; production wires this from `AppContext`.
210    child_registry: LspChildRegistry,
211}
212
213impl LspManager {
214    pub fn new() -> Self {
215        let (event_tx, event_rx) = unbounded();
216        Self {
217            clients: HashMap::new(),
218            documents: HashMap::new(),
219            diagnostics: DiagnosticsStore::new(),
220            event_tx,
221            event_rx,
222            binary_overrides: HashMap::new(),
223            extra_env: HashMap::new(),
224            failed_spawns: HashMap::new(),
225            watched_file_skip_logged: HashSet::new(),
226            child_registry: LspChildRegistry::new(),
227        }
228    }
229
230    /// Set the child-PID registry. Must be called before any servers spawn.
231    pub fn set_child_registry(&mut self, registry: LspChildRegistry) {
232        self.child_registry = registry;
233    }
234
235    /// For testing: set an extra environment variable that gets passed to
236    /// every spawned LSP child process. Useful for driving fake-server
237    /// behavioral variants in integration tests.
238    pub fn set_extra_env(&mut self, key: &str, value: &str) {
239        self.extra_env.insert(key.to_string(), value.to_string());
240    }
241
242    /// Count active LSP server instances.
243    pub fn server_count(&self) -> usize {
244        self.clients.len()
245    }
246
247    /// For testing: override the binary for a server kind.
248    pub fn override_binary(&mut self, kind: ServerKind, binary_path: PathBuf) {
249        self.binary_overrides.insert(kind, binary_path);
250    }
251
252    /// Ensure a server is running for the given file. Spawns if needed.
253    /// Returns the active server keys for the file, or an empty vec if none match.
254    ///
255    /// This is the lightweight wrapper around [`ensure_server_for_file_detailed`]
256    /// that drops failure context. Prefer the detailed variant in command
257    /// handlers that need to surface honest error messages to the agent.
258    pub fn ensure_server_for_file(&mut self, file_path: &Path, config: &Config) -> Vec<ServerKey> {
259        self.ensure_server_for_file_detailed(file_path, config)
260            .successful
261    }
262
263    /// Detailed version of [`ensure_server_for_file`] that records every
264    /// matching server's outcome (`Ok` / `NoRootMarker` / `BinaryNotInstalled`
265    /// / `SpawnFailed`).
266    ///
267    /// Use this when the caller wants to honestly report _why_ a file has no
268    /// active server (e.g., to surface "bash-language-server not on PATH" to
269    /// the agent instead of silently returning `total: 0`).
270    pub fn ensure_server_for_file_detailed(
271        &mut self,
272        file_path: &Path,
273        config: &Config,
274    ) -> EnsureServerOutcomes {
275        let defs = servers_for_file(file_path, config);
276        let mut outcomes = EnsureServerOutcomes::default();
277
278        for def in defs {
279            let server_id = def.kind.id_str().to_string();
280            let server_name = def.name.to_string();
281
282            let Some(root) = find_workspace_root(file_path, &def.root_markers) else {
283                outcomes.attempts.push(ServerAttempt {
284                    server_id,
285                    server_name,
286                    result: ServerAttemptResult::NoRootMarker {
287                        looked_for: def.root_markers.iter().map(|s| s.to_string()).collect(),
288                    },
289                });
290                continue;
291            };
292
293            let key = ServerKey {
294                kind: def.kind.clone(),
295                root,
296            };
297
298            if !self.clients.contains_key(&key) {
299                // If we already tried and failed to spawn this server for this
300                // root, return the cached classification without retrying or
301                // re-logging. This prevents per-edit ERROR spam when the user's
302                // environment is missing a dependency the LSP needs (the
303                // typescript-language-server "Could not find a valid TypeScript
304                // installation" case is the canonical example).
305                if let Some(cached) = self.failed_spawns.get(&key) {
306                    outcomes.attempts.push(ServerAttempt {
307                        server_id,
308                        server_name,
309                        result: cached.clone(),
310                    });
311                    continue;
312                }
313
314                match self.spawn_server(&def, &key.root, config) {
315                    Ok(client) => {
316                        self.clients.insert(key.clone(), client);
317                        self.documents.entry(key.clone()).or_default();
318                    }
319                    Err(err) => {
320                        slog_error!("failed to spawn {}: {}", def.name, err);
321                        let result = classify_spawn_error(&def.binary, &err);
322                        // Remember the failure so subsequent file events skip
323                        // this (kind, root) pair instead of producing a fresh
324                        // spawn attempt + ERROR log per request.
325                        self.failed_spawns.insert(key.clone(), result.clone());
326                        outcomes.attempts.push(ServerAttempt {
327                            server_id,
328                            server_name,
329                            result,
330                        });
331                        continue;
332                    }
333                }
334            }
335
336            outcomes.attempts.push(ServerAttempt {
337                server_id,
338                server_name,
339                result: ServerAttemptResult::Ok {
340                    server_key: key.clone(),
341                },
342            });
343            outcomes.successful.push(key);
344        }
345
346        outcomes
347    }
348
349    /// Ensure a server is running using the default LSP registry.
350    /// Kept for integration tests that exercise built-in server helpers directly.
351    pub fn ensure_server_for_file_default(&mut self, file_path: &Path) -> Vec<ServerKey> {
352        self.ensure_server_for_file(file_path, &Config::default())
353    }
354    /// Ensure that servers are running for the file and that the document is open
355    /// in each server's DocumentStore. Reads file content from disk if not already open.
356    /// Returns the server keys for the file.
357    pub fn ensure_file_open(
358        &mut self,
359        file_path: &Path,
360        config: &Config,
361    ) -> Result<Vec<ServerKey>, LspError> {
362        let canonical_path = canonicalize_for_lsp(file_path)?;
363        let server_keys = self.ensure_server_for_file(&canonical_path, config);
364        if server_keys.is_empty() {
365            return Ok(server_keys);
366        }
367
368        let uri = uri_for_path(&canonical_path)?;
369        let language_id = language_id_for_extension(
370            canonical_path
371                .extension()
372                .and_then(|ext| ext.to_str())
373                .unwrap_or_default(),
374        )
375        .to_string();
376
377        for key in &server_keys {
378            let already_open = self
379                .documents
380                .get(key)
381                .is_some_and(|store| store.is_open(&canonical_path));
382
383            if !already_open {
384                let content = std::fs::read_to_string(&canonical_path).map_err(LspError::Io)?;
385                if let Some(client) = self.clients.get_mut(key) {
386                    client.send_notification::<DidOpenTextDocument>(DidOpenTextDocumentParams {
387                        text_document: TextDocumentItem::new(
388                            uri.clone(),
389                            language_id.clone(),
390                            0,
391                            content,
392                        ),
393                    })?;
394                }
395                self.documents
396                    .entry(key.clone())
397                    .or_default()
398                    .open(canonical_path.clone());
399                continue;
400            }
401
402            // Document is already open. Check disk drift — if the file has
403            // been modified outside the AFT pipeline (other tool, manual
404            // edit, sibling session) we MUST send a didChange before any
405            // pull-diagnostic / hover query, otherwise the LSP server
406            // returns results computed from stale in-memory content.
407            //
408            // This is the regression fix Oracle flagged in finding #6:
409            // "ensure_file_open skips already-open files without checking
410            // if disk content changed."
411            let drifted = self
412                .documents
413                .get(key)
414                .is_some_and(|store| store.is_stale_on_disk(&canonical_path));
415            if drifted {
416                let content = std::fs::read_to_string(&canonical_path).map_err(LspError::Io)?;
417                let next_version = self
418                    .documents
419                    .get(key)
420                    .and_then(|store| store.version(&canonical_path))
421                    .map(|v| v + 1)
422                    .unwrap_or(1);
423                if let Some(client) = self.clients.get_mut(key) {
424                    client.send_notification::<DidChangeTextDocument>(
425                        DidChangeTextDocumentParams {
426                            text_document: VersionedTextDocumentIdentifier::new(
427                                uri.clone(),
428                                next_version,
429                            ),
430                            content_changes: vec![TextDocumentContentChangeEvent {
431                                range: None,
432                                range_length: None,
433                                text: content,
434                            }],
435                        },
436                    )?;
437                }
438                if let Some(store) = self.documents.get_mut(key) {
439                    store.bump_version(&canonical_path);
440                }
441            }
442        }
443
444        Ok(server_keys)
445    }
446
447    pub fn ensure_file_open_default(
448        &mut self,
449        file_path: &Path,
450    ) -> Result<Vec<ServerKey>, LspError> {
451        self.ensure_file_open(file_path, &Config::default())
452    }
453
454    /// Notify relevant LSP servers that a file has been written/changed.
455    /// This is the main hook called after every file write in AFT.
456    ///
457    /// If the file's server isn't running yet, starts it (lazy spawn).
458    /// If the file isn't open in LSP yet, sends didOpen. Otherwise sends didChange.
459    pub fn notify_file_changed(
460        &mut self,
461        file_path: &Path,
462        content: &str,
463        config: &Config,
464    ) -> Result<(), LspError> {
465        self.notify_file_changed_versioned(file_path, content, config)
466            .map(|_| ())
467    }
468
469    /// Like `notify_file_changed`, but returns the target document version
470    /// per server so the post-edit waiter can match `publishDiagnostics`
471    /// against the exact version that this notification carried.
472    ///
473    /// Returns: `Vec<(ServerKey, target_version)>`. `target_version` is the
474    /// `version` field on the `VersionedTextDocumentIdentifier` we just sent
475    /// (post-bump). For freshly-opened documents (`didOpen`) the version is
476    /// `0`. Servers that don't honor versioned text document sync will not
477    /// echo this back on `publishDiagnostics`; the caller is expected to
478    /// fall back to the epoch-delta path for those.
479    pub fn notify_file_changed_versioned(
480        &mut self,
481        file_path: &Path,
482        content: &str,
483        config: &Config,
484    ) -> Result<Vec<(ServerKey, i32)>, LspError> {
485        let canonical_path = canonicalize_for_lsp(file_path)?;
486        let server_keys = self.ensure_server_for_file(&canonical_path, config);
487        if server_keys.is_empty() {
488            return Ok(Vec::new());
489        }
490
491        let uri = uri_for_path(&canonical_path)?;
492        let language_id = language_id_for_extension(
493            canonical_path
494                .extension()
495                .and_then(|ext| ext.to_str())
496                .unwrap_or_default(),
497        )
498        .to_string();
499
500        let mut versions: Vec<(ServerKey, i32)> = Vec::with_capacity(server_keys.len());
501
502        for key in server_keys {
503            let current_version = self
504                .documents
505                .get(&key)
506                .and_then(|store| store.version(&canonical_path));
507
508            if let Some(version) = current_version {
509                let next_version = version + 1;
510                if let Some(client) = self.clients.get_mut(&key) {
511                    client.send_notification::<DidChangeTextDocument>(
512                        DidChangeTextDocumentParams {
513                            text_document: VersionedTextDocumentIdentifier::new(
514                                uri.clone(),
515                                next_version,
516                            ),
517                            content_changes: vec![TextDocumentContentChangeEvent {
518                                range: None,
519                                range_length: None,
520                                text: content.to_string(),
521                            }],
522                        },
523                    )?;
524                }
525                if let Some(store) = self.documents.get_mut(&key) {
526                    store.bump_version(&canonical_path);
527                }
528                versions.push((key, next_version));
529                continue;
530            }
531
532            if let Some(client) = self.clients.get_mut(&key) {
533                client.send_notification::<DidOpenTextDocument>(DidOpenTextDocumentParams {
534                    text_document: TextDocumentItem::new(
535                        uri.clone(),
536                        language_id.clone(),
537                        0,
538                        content.to_string(),
539                    ),
540                })?;
541            }
542            self.documents
543                .entry(key.clone())
544                .or_default()
545                .open(canonical_path.clone());
546            // didOpen carries version 0 — that's the version the server
547            // will echo on its first publishDiagnostics for this document.
548            versions.push((key, 0));
549        }
550
551        Ok(versions)
552    }
553
554    pub fn notify_file_changed_default(
555        &mut self,
556        file_path: &Path,
557        content: &str,
558    ) -> Result<(), LspError> {
559        self.notify_file_changed(file_path, content, &Config::default())
560    }
561
562    /// Notify every active server whose workspace contains at least one changed
563    /// path that watched files changed. This is intentionally workspace-scoped
564    /// rather than extension-scoped: configuration edits such as `package.json`
565    /// or `tsconfig.json` affect a server's project graph even though those
566    /// files may not be documents handled by the server itself.
567    pub fn notify_files_watched_changed(
568        &mut self,
569        paths: &[(PathBuf, FileChangeType)],
570        _config: &Config,
571    ) -> Result<(), LspError> {
572        if paths.is_empty() {
573            return Ok(());
574        }
575
576        let mut canonical_events = Vec::with_capacity(paths.len());
577        for (path, typ) in paths {
578            let canonical_path = resolve_for_lsp_uri(path);
579            canonical_events.push((canonical_path, *typ));
580        }
581
582        let keys: Vec<ServerKey> = self.clients.keys().cloned().collect();
583        for key in keys {
584            let mut changes = Vec::new();
585            for (path, typ) in &canonical_events {
586                if !path.starts_with(&key.root) {
587                    continue;
588                }
589                changes.push(FileEvent::new(uri_for_path(path)?, *typ));
590            }
591
592            if changes.is_empty() {
593                continue;
594            }
595
596            if let Some(client) = self.clients.get_mut(&key) {
597                // Only send if the server advertised this capability (#32).
598                // Sending didChangeWatchedFiles to a server that didn't declare
599                // workspace.didChangeWatchedFiles causes spurious errors on some
600                // servers (e.g. older tsserver builds) and is a spec violation.
601                if !client.supports_watched_files() {
602                    if self.watched_file_skip_logged.insert(key.clone()) {
603                        log::debug!(
604                            "skipping didChangeWatchedFiles for {:?} (capability not declared)",
605                            key
606                        );
607                    }
608                    continue;
609                }
610                client.send_notification::<DidChangeWatchedFiles>(DidChangeWatchedFilesParams {
611                    changes,
612                })?;
613            }
614        }
615
616        Ok(())
617    }
618
619    /// Close a document in all servers that have it open.
620    pub fn notify_file_closed(&mut self, file_path: &Path) -> Result<(), LspError> {
621        let canonical_path = canonicalize_for_lsp(file_path)?;
622        let uri = uri_for_path(&canonical_path)?;
623        let keys: Vec<ServerKey> = self.documents.keys().cloned().collect();
624
625        for key in keys {
626            let was_open = self
627                .documents
628                .get(&key)
629                .map(|store| store.is_open(&canonical_path))
630                .unwrap_or(false);
631            if !was_open {
632                continue;
633            }
634
635            if let Some(client) = self.clients.get_mut(&key) {
636                client.send_notification::<DidCloseTextDocument>(DidCloseTextDocumentParams {
637                    text_document: TextDocumentIdentifier::new(uri.clone()),
638                })?;
639            }
640
641            if let Some(store) = self.documents.get_mut(&key) {
642                store.close(&canonical_path);
643            }
644        }
645
646        Ok(())
647    }
648
649    /// Get an active client for a file path, if one exists.
650    pub fn client_for_file(&self, file_path: &Path, config: &Config) -> Option<&LspClient> {
651        let key = self.server_key_for_file(file_path, config)?;
652        self.clients.get(&key)
653    }
654
655    pub fn client_for_file_default(&self, file_path: &Path) -> Option<&LspClient> {
656        self.client_for_file(file_path, &Config::default())
657    }
658
659    /// Get a mutable active client for a file path, if one exists.
660    pub fn client_for_file_mut(
661        &mut self,
662        file_path: &Path,
663        config: &Config,
664    ) -> Option<&mut LspClient> {
665        let key = self.server_key_for_file(file_path, config)?;
666        self.clients.get_mut(&key)
667    }
668
669    pub fn client_for_file_mut_default(&mut self, file_path: &Path) -> Option<&mut LspClient> {
670        self.client_for_file_mut(file_path, &Config::default())
671    }
672
673    /// Number of tracked server clients.
674    pub fn active_client_count(&self) -> usize {
675        self.clients.len()
676    }
677
678    /// Drain all pending LSP events. Call from the main loop.
679    pub fn drain_events(&mut self) -> Vec<LspEvent> {
680        let mut events = Vec::new();
681        while let Ok(event) = self.event_rx.try_recv() {
682            self.handle_event(&event);
683            events.push(event);
684        }
685        events
686    }
687
688    /// Wait for diagnostics to arrive for a specific file until a timeout expires.
689    pub fn wait_for_diagnostics(
690        &mut self,
691        file_path: &Path,
692        config: &Config,
693        timeout: std::time::Duration,
694    ) -> Vec<StoredDiagnostic> {
695        let deadline = std::time::Instant::now() + timeout;
696        self.wait_for_file_diagnostics(file_path, config, deadline)
697    }
698
699    pub fn wait_for_diagnostics_default(
700        &mut self,
701        file_path: &Path,
702        timeout: std::time::Duration,
703    ) -> Vec<StoredDiagnostic> {
704        self.wait_for_diagnostics(file_path, &Config::default(), timeout)
705    }
706
707    /// Test-only accessor for the diagnostics store. Used by integration
708    /// tests that need to inspect per-server entries (e.g., to verify that
709    /// `ServerKey::root` is populated correctly, not the empty path that
710    /// the legacy `publish_with_kind` path produced).
711    #[doc(hidden)]
712    pub fn diagnostics_store_for_test(&self) -> &DiagnosticsStore {
713        &self.diagnostics
714    }
715
716    /// Snapshot the current per-server epoch for every entry that exists
717    /// for `file_path`. Servers without an entry yet (never published)
718    /// are absent from the map; for those, `pre = 0` (any first publish
719    /// will be considered fresh under the epoch-fallback rule).
720    pub fn snapshot_diagnostic_epochs(&self, file_path: &Path) -> HashMap<ServerKey, u64> {
721        let lookup_path = normalize_lookup_path(file_path);
722        self.diagnostics
723            .entries_for_file(&lookup_path)
724            .into_iter()
725            .map(|(key, entry)| (key.clone(), entry.epoch))
726            .collect()
727    }
728
729    /// Snapshot the current diagnostic epoch and document version for every
730    /// active server relevant to `file_path` before a post-edit notification.
731    pub fn snapshot_pre_edit_state(&self, file_path: &Path) -> HashMap<ServerKey, PreEditSnapshot> {
732        let lookup_path = normalize_lookup_path(file_path);
733        let mut snapshots: HashMap<ServerKey, PreEditSnapshot> = self
734            .diagnostics
735            .entries_for_file(&lookup_path)
736            .into_iter()
737            .map(|(key, entry)| {
738                (
739                    key.clone(),
740                    PreEditSnapshot {
741                        epoch: entry.epoch,
742                        document_version_at_capture: None,
743                    },
744                )
745            })
746            .collect();
747
748        for (key, store) in &self.documents {
749            if let Some(version) = store.version(&lookup_path) {
750                snapshots
751                    .entry(key.clone())
752                    .or_default()
753                    .document_version_at_capture = Some(version);
754            }
755        }
756
757        snapshots
758    }
759
760    /// Wait for FRESH per-server diagnostics that match the just-sent
761    /// document version. This is the v0.17.3 post-edit path that fixes the
762    /// stale-diagnostics bug: instead of returning whatever is in the cache
763    /// when the deadline hits, we only return entries whose `version`
764    /// matches the post-edit target version (or, for servers that don't
765    /// participate in versioned sync, whose `epoch` was bumped after the
766    /// pre-edit snapshot).
767    ///
768    /// `expected_versions` should come from `notify_file_changed_versioned`
769    /// — one `(ServerKey, target_version)` per server we sent didChange/
770    /// didOpen to.
771    ///
772    /// `pre_snapshot` is the per-server epoch BEFORE the notification was
773    /// sent; it gates the epoch-fallback path so an old-version publish
774    /// arriving after `drain_events` and before `didChange` cannot be
775    /// mistaken for a fresh response.
776    ///
777    /// Returns a per-server tri-state: `Fresh` (publish matched target
778    /// version OR epoch advanced past snapshot for an unversioned server),
779    /// `Pending` (deadline hit before this server published anything we
780    /// could verify), or `Exited` (server died between notification and
781    /// deadline).
782    pub fn wait_for_post_edit_diagnostics(
783        &mut self,
784        file_path: &Path,
785        // `config` is intentionally accepted (matches sibling wait APIs and
786        // future-proofs us if freshness rules need it). Currently unused
787        // because expected_versions/pre_snapshot fully determine behavior.
788        _config: &Config,
789        expected_versions: &[(ServerKey, i32)],
790        pre_snapshot: &HashMap<ServerKey, PreEditSnapshot>,
791        timeout: std::time::Duration,
792    ) -> PostEditWaitOutcome {
793        let lookup_path = normalize_lookup_path(file_path);
794        let deadline = std::time::Instant::now() + timeout;
795
796        // Drain any events that arrived while we were sending didChange.
797        // The publishDiagnostics handler stores the version, so even
798        // pre-snapshot publishes that landed late won't be mistaken for
799        // fresh — the version-match check will reject them.
800        let _ = self.drain_events_for_file(&lookup_path);
801
802        let mut fresh: HashMap<ServerKey, Vec<StoredDiagnostic>> = HashMap::new();
803        let mut exited: Vec<ServerKey> = Vec::new();
804
805        loop {
806            // Check freshness for every expected server. A server is fresh
807            // if its current entry for this file satisfies either:
808            //   1. version-match: entry.version == Some(target_version), OR
809            //   2. push-only freshness: entry.version is None AND entry.epoch
810            //      advanced strictly after the pre-edit snapshot. Versioned
811            //      publishes must be >= the post-edit target version.
812            // Servers whose process has exited are reported separately.
813            for (key, target_version) in expected_versions {
814                if fresh.contains_key(key) || exited.contains(key) {
815                    continue;
816                }
817                if !self.clients.contains_key(key) {
818                    exited.push(key.clone());
819                    continue;
820                }
821                if let Some(entry) = self
822                    .diagnostics
823                    .entries_for_file(&lookup_path)
824                    .into_iter()
825                    .find_map(|(k, e)| if k == key { Some(e) } else { None })
826                {
827                    let pre = pre_snapshot.get(key).copied().unwrap_or_default();
828                    let is_fresh = post_edit_entry_is_fresh(entry, *target_version, pre);
829                    if is_fresh {
830                        fresh.insert(key.clone(), entry.diagnostics.clone());
831                    }
832                }
833            }
834
835            // All accounted for? Done.
836            if fresh.len() + exited.len() == expected_versions.len() {
837                break;
838            }
839
840            let now = std::time::Instant::now();
841            if now >= deadline {
842                break;
843            }
844
845            let timeout = deadline.saturating_duration_since(now);
846            match self.event_rx.recv_timeout(timeout) {
847                Ok(event) => {
848                    self.handle_event(&event);
849                }
850                Err(RecvTimeoutError::Timeout) | Err(RecvTimeoutError::Disconnected) => break,
851            }
852        }
853
854        // Pending = expected but neither fresh nor exited.
855        let pending: Vec<ServerKey> = expected_versions
856            .iter()
857            .filter(|(k, _)| !fresh.contains_key(k) && !exited.contains(k))
858            .map(|(k, _)| k.clone())
859            .collect();
860
861        // Build deduplicated, sorted diagnostics from the fresh servers only.
862        // Stale or pending servers contribute zero diagnostics.
863        let mut diagnostics: Vec<StoredDiagnostic> = fresh
864            .into_iter()
865            .flat_map(|(_, diags)| diags.into_iter())
866            .collect();
867        diagnostics.sort_by(|a, b| {
868            a.file
869                .cmp(&b.file)
870                .then(a.line.cmp(&b.line))
871                .then(a.column.cmp(&b.column))
872                .then(a.message.cmp(&b.message))
873        });
874
875        PostEditWaitOutcome {
876            diagnostics,
877            pending_servers: pending,
878            exited_servers: exited,
879        }
880    }
881
882    /// Wait for diagnostics to arrive for a specific file until a deadline.
883    ///
884    /// Drains already-queued events first, then blocks on the shared event
885    /// channel only until either `publishDiagnostics` arrives for this file or
886    /// the deadline is reached.
887    pub fn wait_for_file_diagnostics(
888        &mut self,
889        file_path: &Path,
890        config: &Config,
891        deadline: std::time::Instant,
892    ) -> Vec<StoredDiagnostic> {
893        let lookup_path = normalize_lookup_path(file_path);
894
895        if self.server_key_for_file(&lookup_path, config).is_none() {
896            return Vec::new();
897        }
898
899        loop {
900            if self.drain_events_for_file(&lookup_path) {
901                break;
902            }
903
904            let now = std::time::Instant::now();
905            if now >= deadline {
906                break;
907            }
908
909            let timeout = deadline.saturating_duration_since(now);
910            match self.event_rx.recv_timeout(timeout) {
911                Ok(event) => {
912                    if matches!(
913                        self.handle_event(&event),
914                        Some(ref published_file) if published_file.as_path() == lookup_path.as_path()
915                    ) {
916                        break;
917                    }
918                }
919                Err(RecvTimeoutError::Timeout) | Err(RecvTimeoutError::Disconnected) => break,
920            }
921        }
922
923        self.get_diagnostics_for_file(&lookup_path)
924            .into_iter()
925            .cloned()
926            .collect()
927    }
928
929    /// Default timeout for `textDocument/diagnostic` (per-file pull). Servers
930    /// usually respond in under 1s for files they've already analyzed; we
931    /// allow up to 10s before falling back to push semantics. Currently
932    /// surfaced via [`Self::pull_file_timeout`] for callers that want to
933    /// override the wait via the `wait_ms` knob.
934    pub const PULL_FILE_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);
935
936    /// Public accessor so command handlers can reuse the documented default.
937    pub fn pull_file_timeout() -> std::time::Duration {
938        Self::PULL_FILE_TIMEOUT
939    }
940
941    /// Default timeout for `workspace/diagnostic`. The LSP spec allows the
942    /// server to hold this open indefinitely; we cap at 10s and report
943    /// `complete: false` to the agent rather than hanging the bridge.
944    const PULL_WORKSPACE_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);
945
946    /// Issue a `textDocument/diagnostic` (LSP 3.17 per-file pull) request to
947    /// every server that supports pull diagnostics for the given file.
948    ///
949    /// Returns the per-server outcome. If a server reports `kind: "unchanged"`,
950    /// the cached entry's diagnostics are surfaced (deterministic re-use of
951    /// the previous response). If a server doesn't advertise pull capability,
952    /// it's skipped here — the caller should fall back to push for those.
953    ///
954    /// Side effects: results are stored in `DiagnosticsStore` so directory-mode
955    /// queries can aggregate them later.
956    pub fn pull_file_diagnostics(
957        &mut self,
958        file_path: &Path,
959        config: &Config,
960    ) -> Result<Vec<PullFileResult>, LspError> {
961        let canonical_path = canonicalize_for_lsp(file_path)?;
962        // Make sure servers are running and the document is open with fresh
963        // content (handles disk-drift via DocumentStore::is_stale_on_disk).
964        self.ensure_file_open(&canonical_path, config)?;
965
966        let server_keys = self.ensure_server_for_file(&canonical_path, config);
967        if server_keys.is_empty() {
968            return Ok(Vec::new());
969        }
970
971        let uri = uri_for_path(&canonical_path)?;
972        let mut results = Vec::with_capacity(server_keys.len());
973
974        for key in server_keys {
975            let supports_pull = self
976                .clients
977                .get(&key)
978                .and_then(|c| c.diagnostic_capabilities())
979                .is_some_and(|caps| caps.pull_diagnostics);
980
981            if !supports_pull {
982                results.push(PullFileResult {
983                    server_key: key.clone(),
984                    outcome: PullFileOutcome::PullNotSupported,
985                });
986                continue;
987            }
988
989            // Look up previous resultId for incremental requests.
990            let previous_result_id = self
991                .diagnostics
992                .entries_for_file(&canonical_path)
993                .into_iter()
994                .find(|(k, _)| **k == key)
995                .and_then(|(_, entry)| entry.result_id.clone());
996
997            let identifier = self
998                .clients
999                .get(&key)
1000                .and_then(|c| c.diagnostic_capabilities())
1001                .and_then(|caps| caps.identifier.clone());
1002
1003            let params = lsp_types::DocumentDiagnosticParams {
1004                text_document: lsp_types::TextDocumentIdentifier { uri: uri.clone() },
1005                identifier,
1006                previous_result_id,
1007                work_done_progress_params: Default::default(),
1008                partial_result_params: Default::default(),
1009            };
1010
1011            let outcome = match self.send_pull_request(&key, params) {
1012                Ok(report) => self.ingest_document_report(&key, &canonical_path, report),
1013                Err(err) => PullFileOutcome::RequestFailed {
1014                    reason: err.to_string(),
1015                },
1016            };
1017
1018            results.push(PullFileResult {
1019                server_key: key,
1020                outcome,
1021            });
1022        }
1023
1024        Ok(results)
1025    }
1026
1027    /// Issue a `workspace/diagnostic` request to a specific server. Cancels
1028    /// internally if `timeout` elapses before the server responds. Cached
1029    /// entries from the response are stored so directory-mode queries pick
1030    /// them up.
1031    pub fn pull_workspace_diagnostics(
1032        &mut self,
1033        server_key: &ServerKey,
1034        timeout: Option<std::time::Duration>,
1035    ) -> Result<PullWorkspaceResult, LspError> {
1036        let _timeout = timeout.unwrap_or(Self::PULL_WORKSPACE_TIMEOUT);
1037
1038        let supports_workspace = self
1039            .clients
1040            .get(server_key)
1041            .and_then(|c| c.diagnostic_capabilities())
1042            .is_some_and(|caps| caps.workspace_diagnostics);
1043
1044        if !supports_workspace {
1045            return Ok(PullWorkspaceResult {
1046                server_key: server_key.clone(),
1047                files_reported: Vec::new(),
1048                complete: false,
1049                cancelled: false,
1050                supports_workspace: false,
1051            });
1052        }
1053
1054        let identifier = self
1055            .clients
1056            .get(server_key)
1057            .and_then(|c| c.diagnostic_capabilities())
1058            .and_then(|caps| caps.identifier.clone());
1059
1060        let params = lsp_types::WorkspaceDiagnosticParams {
1061            identifier,
1062            previous_result_ids: Vec::new(),
1063            work_done_progress_params: Default::default(),
1064            partial_result_params: Default::default(),
1065        };
1066
1067        // Note: LspClient::send_request currently uses a fixed REQUEST_TIMEOUT
1068        // (30s, see client.rs). For workspace pull this is intentionally not
1069        // overridden because servers like rust-analyzer may legitimately take
1070        // many seconds on first request. The plugin bridge timeout (also 30s)
1071        // is what we ultimately defer to. In a future revision we should plumb
1072        // a custom timeout through send_request — for v0.16 we accept that
1073        // workspace pull obeys the standard request timeout.
1074        let result = match self
1075            .clients
1076            .get_mut(server_key)
1077            .ok_or_else(|| LspError::ServerNotReady("server not found".into()))?
1078            .send_request::<lsp_types::request::WorkspaceDiagnosticRequest>(params)
1079        {
1080            Ok(result) => result,
1081            Err(LspError::Timeout(_)) => {
1082                return Ok(PullWorkspaceResult {
1083                    server_key: server_key.clone(),
1084                    files_reported: Vec::new(),
1085                    complete: false,
1086                    cancelled: true,
1087                    supports_workspace: true,
1088                });
1089            }
1090            Err(err) => return Err(err),
1091        };
1092
1093        // Extract the items list. Partial responses stream via $/progress
1094        // notifications which we don't subscribe to — treat them as soft
1095        // empty (caller will see complete: true with files_reported empty,
1096        // matching "got a partial response, no full report").
1097        let items = match result {
1098            lsp_types::WorkspaceDiagnosticReportResult::Report(report) => report.items,
1099            lsp_types::WorkspaceDiagnosticReportResult::Partial(_) => Vec::new(),
1100        };
1101
1102        // Ingest each file report into the diagnostics store.
1103        let mut files_reported = Vec::with_capacity(items.len());
1104        for item in items {
1105            match item {
1106                lsp_types::WorkspaceDocumentDiagnosticReport::Full(full) => {
1107                    if let Some(file) = uri_to_path(&full.uri) {
1108                        let stored = from_lsp_diagnostics(
1109                            file.clone(),
1110                            full.full_document_diagnostic_report.items.clone(),
1111                        );
1112                        self.diagnostics.publish_with_result_id(
1113                            server_key.clone(),
1114                            file.clone(),
1115                            stored,
1116                            full.full_document_diagnostic_report.result_id.clone(),
1117                        );
1118                        files_reported.push(file);
1119                    }
1120                }
1121                lsp_types::WorkspaceDocumentDiagnosticReport::Unchanged(_unchanged) => {
1122                    // "Unchanged" means the previously cached report is still
1123                    // valid. We left it in place; nothing to do.
1124                }
1125            }
1126        }
1127
1128        Ok(PullWorkspaceResult {
1129            server_key: server_key.clone(),
1130            files_reported,
1131            complete: true,
1132            cancelled: false,
1133            supports_workspace: true,
1134        })
1135    }
1136
1137    /// Issue the per-file diagnostic request and return the report.
1138    fn send_pull_request(
1139        &mut self,
1140        key: &ServerKey,
1141        params: lsp_types::DocumentDiagnosticParams,
1142    ) -> Result<lsp_types::DocumentDiagnosticReportResult, LspError> {
1143        let client = self
1144            .clients
1145            .get_mut(key)
1146            .ok_or_else(|| LspError::ServerNotReady("server not found".into()))?;
1147        client.send_request::<lsp_types::request::DocumentDiagnosticRequest>(params)
1148    }
1149
1150    /// Store the result of a per-file pull request and return a structured
1151    /// outcome the caller can inspect.
1152    fn ingest_document_report(
1153        &mut self,
1154        key: &ServerKey,
1155        canonical_path: &Path,
1156        result: lsp_types::DocumentDiagnosticReportResult,
1157    ) -> PullFileOutcome {
1158        let report = match result {
1159            lsp_types::DocumentDiagnosticReportResult::Report(report) => report,
1160            lsp_types::DocumentDiagnosticReportResult::Partial(_) => {
1161                // Partial results stream in via $/progress notifications which
1162                // we don't currently subscribe to. Treat as a soft-empty
1163                // success — the next pull will get the full version.
1164                return PullFileOutcome::PartialNotSupported;
1165            }
1166        };
1167
1168        match report {
1169            lsp_types::DocumentDiagnosticReport::Full(full) => {
1170                let result_id = full.full_document_diagnostic_report.result_id.clone();
1171                let stored = from_lsp_diagnostics(
1172                    canonical_path.to_path_buf(),
1173                    full.full_document_diagnostic_report.items.clone(),
1174                );
1175                let count = stored.len();
1176                self.diagnostics.publish_with_result_id(
1177                    key.clone(),
1178                    canonical_path.to_path_buf(),
1179                    stored,
1180                    result_id,
1181                );
1182                PullFileOutcome::Full {
1183                    diagnostic_count: count,
1184                }
1185            }
1186            lsp_types::DocumentDiagnosticReport::Unchanged(_unchanged) => {
1187                // The server says cache is still valid. We don't refresh
1188                // anything; the existing entry's diagnostics remain authoritative.
1189                PullFileOutcome::Unchanged
1190            }
1191        }
1192    }
1193
1194    /// Shutdown all servers gracefully.
1195    pub fn shutdown_all(&mut self) {
1196        for (key, mut client) in self.clients.drain() {
1197            if let Err(err) = client.shutdown() {
1198                slog_error!("error shutting down {:?}: {}", key, err);
1199            }
1200        }
1201        self.documents.clear();
1202        self.diagnostics = DiagnosticsStore::new();
1203    }
1204
1205    /// Check if any server is active.
1206    pub fn has_active_servers(&self) -> bool {
1207        self.clients
1208            .values()
1209            .any(|client| client.state() == ServerState::Ready)
1210    }
1211
1212    /// Active server keys (running clients). Used by `lsp_diagnostics`
1213    /// directory mode to know which servers to ask for workspace pull.
1214    pub fn active_server_keys(&self) -> Vec<ServerKey> {
1215        self.clients.keys().cloned().collect()
1216    }
1217
1218    pub fn get_diagnostics_for_file(&self, file: &Path) -> Vec<&StoredDiagnostic> {
1219        let normalized = normalize_lookup_path(file);
1220        self.diagnostics.for_file(&normalized)
1221    }
1222
1223    pub fn get_diagnostics_for_directory(&self, dir: &Path) -> Vec<&StoredDiagnostic> {
1224        let normalized = normalize_lookup_path(dir);
1225        self.diagnostics.for_directory(&normalized)
1226    }
1227
1228    pub fn get_all_diagnostics(&self) -> Vec<&StoredDiagnostic> {
1229        self.diagnostics.all()
1230    }
1231
1232    fn drain_events_for_file(&mut self, file_path: &Path) -> bool {
1233        let mut saw_file_diagnostics = false;
1234        while let Ok(event) = self.event_rx.try_recv() {
1235            if matches!(
1236                self.handle_event(&event),
1237                Some(ref published_file) if published_file.as_path() == file_path
1238            ) {
1239                saw_file_diagnostics = true;
1240            }
1241        }
1242        saw_file_diagnostics
1243    }
1244
1245    fn handle_event(&mut self, event: &LspEvent) -> Option<PathBuf> {
1246        match event {
1247            LspEvent::Notification {
1248                server_kind,
1249                root,
1250                method,
1251                params: Some(params),
1252            } if method == "textDocument/publishDiagnostics" => {
1253                self.handle_publish_diagnostics(server_kind.clone(), root.clone(), params)
1254            }
1255            LspEvent::ServerExited { server_kind, root } => {
1256                let key = ServerKey {
1257                    kind: server_kind.clone(),
1258                    root: root.clone(),
1259                };
1260                self.clients.remove(&key);
1261                self.documents.remove(&key);
1262                self.diagnostics.clear_for_server(&key);
1263                None
1264            }
1265            _ => None,
1266        }
1267    }
1268
1269    fn handle_publish_diagnostics(
1270        &mut self,
1271        server: ServerKind,
1272        root: PathBuf,
1273        params: &serde_json::Value,
1274    ) -> Option<PathBuf> {
1275        if let Ok(publish_params) =
1276            serde_json::from_value::<lsp_types::PublishDiagnosticsParams>(params.clone())
1277        {
1278            let file = uri_to_path(&publish_params.uri)?;
1279            let stored = from_lsp_diagnostics(file.clone(), publish_params.diagnostics);
1280            // v0.17.3: store with real ServerKey { kind, root } and capture
1281            // the document `version` (when the server provided one) so the
1282            // post-edit waiter can reject stale publishes deterministically
1283            // via version-match (preferred) or epoch-delta (fallback). The
1284            // earlier `publish_with_kind` path silently dropped both.
1285            let key = ServerKey { kind: server, root };
1286            self.diagnostics
1287                .publish_full(key, file.clone(), stored, None, publish_params.version);
1288            return Some(file);
1289        }
1290        None
1291    }
1292
1293    fn spawn_server(
1294        &self,
1295        def: &ServerDef,
1296        root: &Path,
1297        config: &Config,
1298    ) -> Result<LspClient, LspError> {
1299        let binary = self.resolve_binary(def, config)?;
1300
1301        // Merge the server-defined env with our test-injected env.
1302        // `extra_env` is empty in production; tests use it to drive fake
1303        // server variants (AFT_FAKE_LSP_PULL=1, etc.).
1304        let mut merged_env = def.env.clone();
1305        for (key, value) in &self.extra_env {
1306            merged_env.insert(key.clone(), value.clone());
1307        }
1308
1309        let mut client = LspClient::spawn(
1310            def.kind.clone(),
1311            root.to_path_buf(),
1312            &binary,
1313            &def.args,
1314            &merged_env,
1315            self.event_tx.clone(),
1316            self.child_registry.clone(),
1317        )?;
1318        client.initialize(root, def.initialization_options.clone())?;
1319        Ok(client)
1320    }
1321
1322    fn resolve_binary(&self, def: &ServerDef, config: &Config) -> Result<PathBuf, LspError> {
1323        if let Some(path) = self.binary_overrides.get(&def.kind) {
1324            if path.exists() {
1325                return Ok(path.clone());
1326            }
1327            return Err(LspError::NotFound(format!(
1328                "override binary for {:?} not found: {}",
1329                def.kind,
1330                path.display()
1331            )));
1332        }
1333
1334        if let Some(path) = env_binary_override(&def.kind) {
1335            if path.exists() {
1336                return Ok(path);
1337            }
1338            return Err(LspError::NotFound(format!(
1339                "environment override binary for {:?} not found: {}",
1340                def.kind,
1341                path.display()
1342            )));
1343        }
1344
1345        // Layered resolution:
1346        //   1. <project_root>/node_modules/.bin/<binary>
1347        //   2. config.lsp_paths_extra (plugin auto-install cache, etc.)
1348        //   3. PATH via `which`
1349        resolve_lsp_binary(
1350            &def.binary,
1351            config.project_root.as_deref(),
1352            &config.lsp_paths_extra,
1353        )
1354        .ok_or_else(|| {
1355            LspError::NotFound(format!(
1356                "language server binary '{}' not found in node_modules/.bin, lsp_paths_extra, or PATH",
1357                def.binary
1358            ))
1359        })
1360    }
1361
1362    fn server_key_for_file(&self, file_path: &Path, config: &Config) -> Option<ServerKey> {
1363        for def in servers_for_file(file_path, config) {
1364            let root = find_workspace_root(file_path, &def.root_markers)?;
1365            let key = ServerKey {
1366                kind: def.kind.clone(),
1367                root,
1368            };
1369            if self.clients.contains_key(&key) {
1370                return Some(key);
1371            }
1372        }
1373        None
1374    }
1375}
1376
1377impl Default for LspManager {
1378    fn default() -> Self {
1379        Self::new()
1380    }
1381}
1382
1383fn canonicalize_for_lsp(file_path: &Path) -> Result<PathBuf, LspError> {
1384    std::fs::canonicalize(file_path).map_err(LspError::from)
1385}
1386
/// Best-effort absolute, symlink-resolved path for building an LSP URI. It
/// works even when `file_path`, or some of its parent directories, does not
/// exist yet.
///
/// If the whole path canonicalizes, that result is returned. Otherwise the
/// function works in three steps:
/// 1. Trailing components that don't exist are peeled off until an existing
///    ancestor is reached, or until no further peeling is possible.
/// 2. That ancestor is canonicalized when possible and used verbatim when not.
/// 3. The peeled components are re-appended in their original order.
fn resolve_for_lsp_uri(file_path: &Path) -> PathBuf {
    // Fast path: the full path exists and resolves cleanly.
    if let Ok(path) = std::fs::canonicalize(file_path) {
        return path;
    }

    // Walk upward, collecting missing components (deepest first) until we
    // reach an ancestor that exists on disk.
    let mut existing = file_path.to_path_buf();
    let mut missing = Vec::new();
    while !existing.exists() {
        // `file_name()` is None when the path ends in `..` or has no final
        // component (root/empty); stop peeling and use what we have.
        let Some(name) = existing.file_name() else {
            break;
        };
        missing.push(name.to_owned());
        let Some(parent) = existing.parent() else {
            break;
        };
        existing = parent.to_path_buf();
    }

    // If even the ancestor can't be canonicalized, fall back to it as-is.
    let mut resolved = std::fs::canonicalize(&existing).unwrap_or(existing);
    // `missing` was filled deepest-first, so reverse to restore path order.
    for segment in missing.into_iter().rev() {
        resolved.push(segment);
    }
    resolved
}
1411
1412fn uri_for_path(path: &Path) -> Result<lsp_types::Uri, LspError> {
1413    let url = url::Url::from_file_path(path).map_err(|_| {
1414        LspError::NotFound(format!(
1415            "failed to convert '{}' to file URI",
1416            path.display()
1417        ))
1418    })?;
1419    lsp_types::Uri::from_str(url.as_str()).map_err(|_| {
1420        LspError::NotFound(format!("failed to parse file URI for '{}'", path.display()))
1421    })
1422}
1423
/// Map a file extension (without the leading dot, matched case-sensitively)
/// to the LSP `languageId` sent when opening a document. Unknown extensions
/// map to `"plaintext"`.
///
/// Identifiers follow the LSP specification's table of well-known language
/// identifiers. `mts`/`cts` are TypeScript's ES-module/CommonJS file
/// variants. `sh`/`bash` map to the spec's shell-script identifier,
/// `shellscript`.
fn language_id_for_extension(ext: &str) -> &'static str {
    match ext {
        "ts" | "mts" | "cts" => "typescript",
        "tsx" => "typescriptreact",
        "js" | "mjs" | "cjs" => "javascript",
        "jsx" => "javascriptreact",
        "py" | "pyi" => "python",
        "rs" => "rust",
        "go" => "go",
        "html" | "htm" => "html",
        "sh" | "bash" => "shellscript",
        _ => "plaintext",
    }
}
1437
/// Key used for diagnostics lookups: the canonical path when `path` can be
/// canonicalized (it exists), otherwise `path` unchanged.
fn normalize_lookup_path(path: &Path) -> PathBuf {
    match std::fs::canonicalize(path) {
        Ok(canonical) => canonical,
        Err(_) => path.to_path_buf(),
    }
}
1441
1442fn uri_to_path(uri: &lsp_types::Uri) -> Option<PathBuf> {
1443    let url = url::Url::parse(uri.as_str()).ok()?;
1444    url.to_file_path()
1445        .ok()
1446        .map(|path| normalize_lookup_path(&path))
1447}
1448
1449/// Classify an error returned by `spawn_server` into a structured
1450/// `ServerAttemptResult`. The two interesting cases for callers are:
1451/// - `BinaryNotInstalled` — the server's binary couldn't be resolved on PATH
1452///   or via override. The agent can be told "install bash-language-server".
1453/// - `SpawnFailed` — binary was found but spawning/initializing failed
1454///   (permissions, missing runtime, server crashed during initialize, etc.).
1455fn classify_spawn_error(binary: &str, err: &LspError) -> ServerAttemptResult {
1456    match err {
1457        // resolve_binary returns NotFound for both missing override paths and
1458        // missing PATH binaries. The "override missing" case is rare in
1459        // practice (only set in tests / env vars); we report all NotFound as
1460        // BinaryNotInstalled so the user sees an actionable install hint.
1461        LspError::NotFound(_) => ServerAttemptResult::BinaryNotInstalled {
1462            binary: binary.to_string(),
1463        },
1464        other => ServerAttemptResult::SpawnFailed {
1465            binary: binary.to_string(),
1466            reason: other.to_string(),
1467        },
1468    }
1469}
1470
1471fn env_binary_override(kind: &ServerKind) -> Option<PathBuf> {
1472    let id = kind.id_str();
1473    let suffix: String = id
1474        .chars()
1475        .map(|ch| {
1476            if ch.is_ascii_alphanumeric() {
1477                ch.to_ascii_uppercase()
1478            } else {
1479                '_'
1480            }
1481        })
1482        .collect();
1483    let key = format!("AFT_LSP_{suffix}_BINARY");
1484    std::env::var_os(key).map(PathBuf::from)
1485}