Skip to main content

aft/lsp/
manager.rs

1use std::collections::{HashMap, HashSet};
2use std::path::{Path, PathBuf};
3
4use crossbeam_channel::{unbounded, Receiver, RecvTimeoutError, Sender};
5use lsp_types::notification::{
6    DidChangeTextDocument, DidChangeWatchedFiles, DidCloseTextDocument, DidOpenTextDocument,
7};
8use lsp_types::{
9    DidChangeTextDocumentParams, DidChangeWatchedFilesParams, DidCloseTextDocumentParams,
10    DidOpenTextDocumentParams, FileChangeType, FileEvent, TextDocumentContentChangeEvent,
11    TextDocumentIdentifier, TextDocumentItem, VersionedTextDocumentIdentifier,
12};
13
14use crate::config::Config;
15use crate::lsp::child_registry::LspChildRegistry;
16use crate::lsp::client::{LspClient, LspEvent, ServerState};
17use crate::lsp::diagnostics::{
18    from_lsp_diagnostics, DiagnosticEntry, DiagnosticsStore, StoredDiagnostic,
19};
20use crate::lsp::document::DocumentStore;
21use crate::lsp::position::{uri_for_path, uri_to_path};
22use crate::lsp::registry::{resolve_lsp_binary, servers_for_file, ServerDef, ServerKind};
23use crate::lsp::roots::{find_workspace_root, ServerKey};
24use crate::lsp::LspError;
25use crate::slog_error;
26
27/// Outcome of attempting to ensure a server is running for a single matching
28/// `ServerDef`. Returned per matching server so the caller can report exactly
29/// what happened to the user instead of collapsing all failures into "no
30/// server".
31#[derive(Debug, Clone)]
32pub enum ServerAttemptResult {
33    /// Server is running and ready to serve requests for this file.
34    Ok { server_key: ServerKey },
35    /// No workspace root was found by walking up from the file looking for
36    /// any of the server's configured root markers.
37    NoRootMarker { looked_for: Vec<String> },
38    /// The server's binary could not be found on PATH (or override was
39    /// missing/invalid).
40    BinaryNotInstalled { binary: String },
41    /// Binary was found but spawning or initializing the server failed.
42    SpawnFailed { binary: String, reason: String },
43}
44
45/// One server's attempt to handle a file.
46#[derive(Debug, Clone)]
47pub struct ServerAttempt {
48    /// Stable server identifier (kind ID, e.g. "pyright", "rust-analyzer").
49    pub server_id: String,
50    /// Server display name from the registry.
51    pub server_name: String,
52    pub result: ServerAttemptResult,
53}
54
55/// Aggregate outcome of `ensure_server_for_file_detailed`. Distinguishes:
56/// - "No server registered for this file's extension" (`attempts.is_empty()`)
57/// - "Servers registered but none could start" (`successful.is_empty()` but
58///   `!attempts.is_empty()`)
59/// - "At least one server is ready" (`!successful.is_empty()`)
60#[derive(Debug, Clone, Default)]
61pub struct EnsureServerOutcomes {
62    /// Server keys that are now running and ready to serve requests.
63    pub successful: Vec<ServerKey>,
64    /// Per-server attempt records. Empty if no server is registered for the
65    /// file's extension.
66    pub attempts: Vec<ServerAttempt>,
67}
68
69impl EnsureServerOutcomes {
70    /// True if no server in the registry matched this file's extension.
71    pub fn no_server_registered(&self) -> bool {
72        self.attempts.is_empty()
73    }
74}
75
76/// Outcome of a post-edit diagnostics wait. Reports the per-server status
77/// alongside the fresh diagnostics, so the response layer can build an
78/// honest tri-state payload (`success: true` + `complete: bool` + named
79/// gap fields per `crates/aft/src/protocol.rs`).
80///
81/// `diagnostics` only contains entries from servers that proved freshness
82/// (version-match preferred, epoch-fallback for unversioned servers).
83/// Pre-edit cached entries are NEVER included — that's the whole point of
84/// this type.
85#[derive(Debug, Clone, Default)]
86pub struct PostEditWaitOutcome {
87    /// Diagnostics from servers whose response we verified is FOR the
88    /// post-edit document version (or whose epoch we saw advance after our
89    /// pre-edit snapshot, for unversioned servers).
90    pub diagnostics: Vec<StoredDiagnostic>,
91    /// Servers we expected to publish but didn't before the deadline.
92    /// Reported to the agent via `pending_lsp_servers` so they understand
93    /// the result is partial.
94    pub pending_servers: Vec<ServerKey>,
95    /// Servers whose process exited between notification and deadline.
96    /// Reported separately so the agent knows the gap is unrecoverable
97    /// without a server restart, not "wait longer."
98    pub exited_servers: Vec<ServerKey>,
99}
100
/// Freshness baseline captured for one server/file pair before an edit is
/// announced to the server.
#[derive(Debug, Clone, Copy, Default)]
pub struct PreEditSnapshot {
    /// Diagnostic epoch observed at capture time (0 when nothing had been
    /// published yet, via `Default`).
    pub epoch: u64,
    /// Document version tracked at capture time, if the document was known.
    pub document_version_at_capture: Option<i32>,
}
107
108pub fn post_edit_entry_is_fresh(
109    entry: &DiagnosticEntry,
110    target_version: i32,
111    pre: PreEditSnapshot,
112) -> bool {
113    if entry.epoch <= pre.epoch {
114        return false;
115    }
116
117    match entry.version {
118        Some(version) => version >= target_version,
119        // Unversioned publishDiagnostics payloads cannot prove which document
120        // state they describe. Epoch advancement only proves arrival order; an
121        // old analysis result can still arrive after our pre-snapshot. Treat as
122        // pending/partial rather than fresh.
123        None => false,
124    }
125}
126
127impl PostEditWaitOutcome {
128    /// True if every expected server reported a fresh result. False means
129    /// the agent should treat the diagnostics as a partial picture.
130    pub fn complete(&self) -> bool {
131        self.pending_servers.is_empty() && self.exited_servers.is_empty()
132    }
133}
134
/// How one server answered a `textDocument/diagnostic` (per-file pull)
/// request.
#[derive(Debug, Clone)]
pub enum PullFileOutcome {
    /// A full report came back and its diagnostics were stored.
    Full { diagnostic_count: usize },
    /// The server answered `kind: "unchanged"`: the cached diagnostics are
    /// still valid.
    Unchanged,
    /// The server answered with a partial-result token. Streamed progress is
    /// not subscribed to, so this is treated as a soft empty result until
    /// the next pull.
    PartialNotSupported,
    /// The server does not advertise pull capability; the caller should fall
    /// back to push diagnostics for it.
    PullNotSupported,
    /// The pull request itself failed (timeout, server error, etc.).
    RequestFailed { reason: String },
}
151
152/// Result of `pull_file_diagnostics` for one matching server.
153#[derive(Debug, Clone)]
154pub struct PullFileResult {
155    pub server_key: ServerKey,
156    pub outcome: PullFileOutcome,
157}
158
159/// Result of `pull_workspace_diagnostics` for a single server.
160#[derive(Debug, Clone)]
161pub struct PullWorkspaceResult {
162    pub server_key: ServerKey,
163    /// Files for which a Full report was received and cached. Files that came
164    /// back as `Unchanged` are NOT listed here because their cached entry was
165    /// already authoritative.
166    pub files_reported: Vec<PathBuf>,
167    /// True if the server returned a full response within the timeout.
168    pub complete: bool,
169    /// True if we cancelled (request timed out before the server responded).
170    pub cancelled: bool,
171    /// True if the server advertised workspace pull support. When false, the
172    /// other fields are empty and the caller should fall back to file-mode
173    /// pull or to push semantics.
174    pub supports_workspace: bool,
175}
176
177pub struct LspManager {
178    /// Active server instances, keyed by (ServerKind, workspace_root).
179    clients: HashMap<ServerKey, LspClient>,
180    /// Tracks opened documents and versions per active server.
181    documents: HashMap<ServerKey, DocumentStore>,
182    /// Stored publishDiagnostics payloads across all servers.
183    diagnostics: DiagnosticsStore,
184    /// Unified event channel — all server reader threads send here.
185    event_tx: Sender<LspEvent>,
186    event_rx: Receiver<LspEvent>,
187    /// Optional binary path overrides used by integration tests.
188    binary_overrides: HashMap<ServerKind, PathBuf>,
189    /// Extra env vars merged into every spawned LSP child. Used in tests to
190    /// drive the fake server's behavioral variants (`AFT_FAKE_LSP_PULL=1`,
191    /// `AFT_FAKE_LSP_WORKSPACE=1`, etc.). Production code does not set this.
192    extra_env: HashMap<String, String>,
193    /// Per-(kind,root) cache of spawn failures. Once a server fails to spawn
194    /// for a workspace root, we remember why and skip subsequent attempts for
195    /// the lifetime of this AFT process. Without this, every file open or
196    /// didChange retries `spawn_server` and logs a fresh ERROR — visible as
197    /// repeated `failed to spawn TypeScript Language Server: Could not find a
198    /// valid TypeScript installation` lines per edit.
199    ///
200    /// Entries are NEVER evicted automatically. The expected recovery path is
201    /// for the user to fix their environment (install the missing binary or
202    /// add a `tsconfig.json` / `package.json` with the right dependency) and
203    /// restart OpenCode/Pi, which spawns a fresh `aft` process with an empty
204    /// cache. We deliberately don't auto-retry on file events: the failure
205    /// modes we track here (binary not installed, init handshake failure)
206    /// don't fix themselves at runtime.
207    failed_spawns: HashMap<ServerKey, ServerAttemptResult>,
208    /// Server/root pairs for which we already logged that watched-file
209    /// notifications are skipped because the capability is absent.
210    watched_file_skip_logged: HashSet<ServerKey>,
211    /// Tracks PIDs of spawned LSP child processes so the signal handler can
212    /// kill them on SIGTERM/SIGINT before aft exits, preventing orphans.
213    /// Defaults to empty; production wires this from `AppContext`.
214    child_registry: LspChildRegistry,
215}
216
217impl LspManager {
218    pub fn new() -> Self {
219        let (event_tx, event_rx) = unbounded();
220        Self {
221            clients: HashMap::new(),
222            documents: HashMap::new(),
223            diagnostics: DiagnosticsStore::new(),
224            event_tx,
225            event_rx,
226            binary_overrides: HashMap::new(),
227            extra_env: HashMap::new(),
228            failed_spawns: HashMap::new(),
229            watched_file_skip_logged: HashSet::new(),
230            child_registry: LspChildRegistry::new(),
231        }
232    }
233
234    /// Set the child-PID registry. Must be called before any servers spawn.
235    pub fn set_child_registry(&mut self, registry: LspChildRegistry) {
236        self.child_registry = registry;
237    }
238
239    /// For testing: set an extra environment variable that gets passed to
240    /// every spawned LSP child process. Useful for driving fake-server
241    /// behavioral variants in integration tests.
242    pub fn set_extra_env(&mut self, key: &str, value: &str) {
243        self.extra_env.insert(key.to_string(), value.to_string());
244    }
245
246    /// Count active LSP server instances.
247    pub fn server_count(&self) -> usize {
248        self.clients.len()
249    }
250
251    /// For testing: override the binary for a server kind.
252    pub fn override_binary(&mut self, kind: ServerKind, binary_path: PathBuf) {
253        self.binary_overrides.insert(kind, binary_path);
254    }
255
256    /// Ensure a server is running for the given file. Spawns if needed.
257    /// Returns the active server keys for the file, or an empty vec if none match.
258    ///
259    /// This is the lightweight wrapper around [`ensure_server_for_file_detailed`]
260    /// that drops failure context. Prefer the detailed variant in command
261    /// handlers that need to surface honest error messages to the agent.
262    pub fn ensure_server_for_file(&mut self, file_path: &Path, config: &Config) -> Vec<ServerKey> {
263        self.ensure_server_for_file_detailed(file_path, config)
264            .successful
265    }
266
267    /// Detailed version of [`ensure_server_for_file`] that records every
268    /// matching server's outcome (`Ok` / `NoRootMarker` / `BinaryNotInstalled`
269    /// / `SpawnFailed`).
270    ///
271    /// Use this when the caller wants to honestly report _why_ a file has no
272    /// active server (e.g., to surface "bash-language-server not on PATH" to
273    /// the agent instead of silently returning `total: 0`).
274    pub fn ensure_server_for_file_detailed(
275        &mut self,
276        file_path: &Path,
277        config: &Config,
278    ) -> EnsureServerOutcomes {
279        let defs = servers_for_file(file_path, config);
280        let mut outcomes = EnsureServerOutcomes::default();
281
282        for def in defs {
283            let server_id = def.kind.id_str().to_string();
284            let server_name = def.name.to_string();
285
286            let Some(root) = find_workspace_root(file_path, &def.root_markers) else {
287                outcomes.attempts.push(ServerAttempt {
288                    server_id,
289                    server_name,
290                    result: ServerAttemptResult::NoRootMarker {
291                        looked_for: def.root_markers.iter().map(|s| s.to_string()).collect(),
292                    },
293                });
294                continue;
295            };
296
297            let key = ServerKey {
298                kind: def.kind.clone(),
299                root,
300            };
301
302            if !self.clients.contains_key(&key) {
303                // If we already tried and failed to spawn this server for this
304                // root, return the cached classification without retrying or
305                // re-logging. This prevents per-edit ERROR spam when the user's
306                // environment is missing a dependency the LSP needs (the
307                // typescript-language-server "Could not find a valid TypeScript
308                // installation" case is the canonical example).
309                if let Some(cached) = self.failed_spawns.get(&key) {
310                    outcomes.attempts.push(ServerAttempt {
311                        server_id,
312                        server_name,
313                        result: cached.clone(),
314                    });
315                    continue;
316                }
317
318                match self.spawn_server(&def, &key.root, config) {
319                    Ok(client) => {
320                        self.clients.insert(key.clone(), client);
321                        self.documents.entry(key.clone()).or_default();
322                    }
323                    Err(err) => {
324                        slog_error!("failed to spawn {}: {}", def.name, err);
325                        let result = classify_spawn_error(&def.binary, &err);
326                        // Remember the failure so subsequent file events skip
327                        // this (kind, root) pair instead of producing a fresh
328                        // spawn attempt + ERROR log per request.
329                        self.failed_spawns.insert(key.clone(), result.clone());
330                        outcomes.attempts.push(ServerAttempt {
331                            server_id,
332                            server_name,
333                            result,
334                        });
335                        continue;
336                    }
337                }
338            }
339
340            outcomes.attempts.push(ServerAttempt {
341                server_id,
342                server_name,
343                result: ServerAttemptResult::Ok {
344                    server_key: key.clone(),
345                },
346            });
347            outcomes.successful.push(key);
348        }
349
350        outcomes
351    }
352
353    /// Ensure a server is running using the default LSP registry.
354    /// Kept for integration tests that exercise built-in server helpers directly.
355    pub fn ensure_server_for_file_default(&mut self, file_path: &Path) -> Vec<ServerKey> {
356        self.ensure_server_for_file(file_path, &Config::default())
357    }
358    /// Ensure that servers are running for the file and that the document is open
359    /// in each server's DocumentStore. Reads file content from disk if not already open.
360    /// Returns the server keys for the file.
361    pub fn ensure_file_open(
362        &mut self,
363        file_path: &Path,
364        config: &Config,
365    ) -> Result<Vec<ServerKey>, LspError> {
366        let canonical_path = canonicalize_for_lsp(file_path)?;
367        let server_keys = self.ensure_server_for_file(&canonical_path, config);
368        if server_keys.is_empty() {
369            return Ok(server_keys);
370        }
371
372        let uri = uri_for_path(&canonical_path)?;
373        let language_id = language_id_for_extension(
374            canonical_path
375                .extension()
376                .and_then(|ext| ext.to_str())
377                .unwrap_or_default(),
378        )
379        .to_string();
380
381        for key in &server_keys {
382            let already_open = self
383                .documents
384                .get(key)
385                .is_some_and(|store| store.is_open(&canonical_path));
386
387            if !already_open {
388                let content = std::fs::read_to_string(&canonical_path).map_err(LspError::Io)?;
389                if let Some(client) = self.clients.get_mut(key) {
390                    client.send_notification::<DidOpenTextDocument>(DidOpenTextDocumentParams {
391                        text_document: TextDocumentItem::new(
392                            uri.clone(),
393                            language_id.clone(),
394                            0,
395                            content,
396                        ),
397                    })?;
398                }
399                self.documents
400                    .entry(key.clone())
401                    .or_default()
402                    .open(canonical_path.clone());
403                continue;
404            }
405
406            // Document is already open. Check disk drift — if the file has
407            // been modified outside the AFT pipeline (other tool, manual
408            // edit, sibling session) we MUST send a didChange before any
409            // pull-diagnostic / hover query, otherwise the LSP server
410            // returns results computed from stale in-memory content.
411            //
412            // This is the regression fix Oracle flagged in finding #6:
413            // "ensure_file_open skips already-open files without checking
414            // if disk content changed."
415            let drifted = self
416                .documents
417                .get(key)
418                .is_some_and(|store| store.is_stale_on_disk(&canonical_path));
419            if drifted {
420                let content = std::fs::read_to_string(&canonical_path).map_err(LspError::Io)?;
421                let next_version = self
422                    .documents
423                    .get(key)
424                    .and_then(|store| store.version(&canonical_path))
425                    .map(|v| v + 1)
426                    .unwrap_or(1);
427                if let Some(client) = self.clients.get_mut(key) {
428                    client.send_notification::<DidChangeTextDocument>(
429                        DidChangeTextDocumentParams {
430                            text_document: VersionedTextDocumentIdentifier::new(
431                                uri.clone(),
432                                next_version,
433                            ),
434                            content_changes: vec![TextDocumentContentChangeEvent {
435                                range: None,
436                                range_length: None,
437                                text: content,
438                            }],
439                        },
440                    )?;
441                }
442                if let Some(store) = self.documents.get_mut(key) {
443                    store.bump_version(&canonical_path);
444                }
445            }
446        }
447
448        Ok(server_keys)
449    }
450
451    pub fn ensure_file_open_default(
452        &mut self,
453        file_path: &Path,
454    ) -> Result<Vec<ServerKey>, LspError> {
455        self.ensure_file_open(file_path, &Config::default())
456    }
457
458    /// Notify relevant LSP servers that a file has been written/changed.
459    /// This is the main hook called after every file write in AFT.
460    ///
461    /// If the file's server isn't running yet, starts it (lazy spawn).
462    /// If the file isn't open in LSP yet, sends didOpen. Otherwise sends didChange.
463    pub fn notify_file_changed(
464        &mut self,
465        file_path: &Path,
466        content: &str,
467        config: &Config,
468    ) -> Result<(), LspError> {
469        self.notify_file_changed_versioned(file_path, content, config)
470            .map(|_| ())
471    }
472
473    /// Like `notify_file_changed`, but returns the target document version
474    /// per server so the post-edit waiter can match `publishDiagnostics`
475    /// against the exact version that this notification carried.
476    ///
477    /// Returns: `Vec<(ServerKey, target_version)>`. `target_version` is the
478    /// `version` field on the `VersionedTextDocumentIdentifier` we just sent
479    /// (post-bump). For freshly-opened documents (`didOpen`) the version is
480    /// `0`. Servers that don't honor versioned text document sync will not
481    /// echo this back on `publishDiagnostics`; the caller is expected to
482    /// fall back to the epoch-delta path for those.
483    pub fn notify_file_changed_versioned(
484        &mut self,
485        file_path: &Path,
486        content: &str,
487        config: &Config,
488    ) -> Result<Vec<(ServerKey, i32)>, LspError> {
489        let canonical_path = canonicalize_for_lsp(file_path)?;
490        let server_keys = self.ensure_server_for_file(&canonical_path, config);
491        if server_keys.is_empty() {
492            return Ok(Vec::new());
493        }
494
495        let uri = uri_for_path(&canonical_path)?;
496        let language_id = language_id_for_extension(
497            canonical_path
498                .extension()
499                .and_then(|ext| ext.to_str())
500                .unwrap_or_default(),
501        )
502        .to_string();
503
504        let mut versions: Vec<(ServerKey, i32)> = Vec::with_capacity(server_keys.len());
505
506        for key in server_keys {
507            let current_version = self
508                .documents
509                .get(&key)
510                .and_then(|store| store.version(&canonical_path));
511
512            if let Some(version) = current_version {
513                let next_version = version + 1;
514                if let Some(client) = self.clients.get_mut(&key) {
515                    client.send_notification::<DidChangeTextDocument>(
516                        DidChangeTextDocumentParams {
517                            text_document: VersionedTextDocumentIdentifier::new(
518                                uri.clone(),
519                                next_version,
520                            ),
521                            content_changes: vec![TextDocumentContentChangeEvent {
522                                range: None,
523                                range_length: None,
524                                text: content.to_string(),
525                            }],
526                        },
527                    )?;
528                }
529                if let Some(store) = self.documents.get_mut(&key) {
530                    store.bump_version(&canonical_path);
531                }
532                versions.push((key, next_version));
533                continue;
534            }
535
536            if let Some(client) = self.clients.get_mut(&key) {
537                client.send_notification::<DidOpenTextDocument>(DidOpenTextDocumentParams {
538                    text_document: TextDocumentItem::new(
539                        uri.clone(),
540                        language_id.clone(),
541                        0,
542                        content.to_string(),
543                    ),
544                })?;
545            }
546            self.documents
547                .entry(key.clone())
548                .or_default()
549                .open(canonical_path.clone());
550            // didOpen carries version 0 — that's the version the server
551            // will echo on its first publishDiagnostics for this document.
552            versions.push((key, 0));
553        }
554
555        Ok(versions)
556    }
557
558    pub fn notify_file_changed_default(
559        &mut self,
560        file_path: &Path,
561        content: &str,
562    ) -> Result<(), LspError> {
563        self.notify_file_changed(file_path, content, &Config::default())
564    }
565
566    /// Notify every active server whose workspace contains at least one changed
567    /// path that watched files changed. This is intentionally workspace-scoped
568    /// rather than extension-scoped: configuration edits such as `package.json`
569    /// or `tsconfig.json` affect a server's project graph even though those
570    /// files may not be documents handled by the server itself.
571    pub fn notify_files_watched_changed(
572        &mut self,
573        paths: &[(PathBuf, FileChangeType)],
574        _config: &Config,
575    ) -> Result<(), LspError> {
576        if paths.is_empty() {
577            return Ok(());
578        }
579
580        let mut canonical_events = Vec::with_capacity(paths.len());
581        for (path, typ) in paths {
582            let canonical_path = resolve_for_lsp_uri(path);
583            canonical_events.push((canonical_path, *typ));
584        }
585
586        let keys: Vec<ServerKey> = self.clients.keys().cloned().collect();
587        for key in keys {
588            let mut changes = Vec::new();
589            for (path, typ) in &canonical_events {
590                if !path.starts_with(&key.root) {
591                    continue;
592                }
593                changes.push(FileEvent::new(uri_for_path(path)?, *typ));
594            }
595
596            if changes.is_empty() {
597                continue;
598            }
599
600            if let Some(client) = self.clients.get_mut(&key) {
601                // Only send after the server dynamically registers interest.
602                // `workspace/didChangeWatchedFiles` is client-owned; a server's
603                // initialize-time dynamicRegistration shape is not a subscription.
604                if !client.has_watched_file_registration() {
605                    if self.watched_file_skip_logged.insert(key.clone()) {
606                        log::debug!(
607                            "skipping didChangeWatchedFiles for {:?} (not dynamically registered)",
608                            key
609                        );
610                    }
611                    continue;
612                }
613                client.send_notification::<DidChangeWatchedFiles>(DidChangeWatchedFilesParams {
614                    changes,
615                })?;
616            }
617        }
618
619        Ok(())
620    }
621
622    /// Close a document in all servers that have it open.
623    pub fn notify_file_closed(&mut self, file_path: &Path) -> Result<(), LspError> {
624        let canonical_path = canonicalize_for_lsp(file_path)?;
625        let uri = uri_for_path(&canonical_path)?;
626        let keys: Vec<ServerKey> = self.documents.keys().cloned().collect();
627
628        for key in keys {
629            let was_open = self
630                .documents
631                .get(&key)
632                .map(|store| store.is_open(&canonical_path))
633                .unwrap_or(false);
634            if !was_open {
635                continue;
636            }
637
638            if let Some(client) = self.clients.get_mut(&key) {
639                client.send_notification::<DidCloseTextDocument>(DidCloseTextDocumentParams {
640                    text_document: TextDocumentIdentifier::new(uri.clone()),
641                })?;
642            }
643
644            if let Some(store) = self.documents.get_mut(&key) {
645                store.close(&canonical_path);
646            }
647        }
648
649        Ok(())
650    }
651
652    /// Get an active client for a file path, if one exists.
653    pub fn client_for_file(&self, file_path: &Path, config: &Config) -> Option<&LspClient> {
654        let key = self.server_key_for_file(file_path, config)?;
655        self.clients.get(&key)
656    }
657
658    pub fn client_for_file_default(&self, file_path: &Path) -> Option<&LspClient> {
659        self.client_for_file(file_path, &Config::default())
660    }
661
662    /// Get a mutable active client for a file path, if one exists.
663    pub fn client_for_file_mut(
664        &mut self,
665        file_path: &Path,
666        config: &Config,
667    ) -> Option<&mut LspClient> {
668        let key = self.server_key_for_file(file_path, config)?;
669        self.clients.get_mut(&key)
670    }
671
672    pub fn client_for_file_mut_default(&mut self, file_path: &Path) -> Option<&mut LspClient> {
673        self.client_for_file_mut(file_path, &Config::default())
674    }
675
676    /// Number of tracked server clients.
677    pub fn active_client_count(&self) -> usize {
678        self.clients.len()
679    }
680
681    /// Drain all pending LSP events. Call from the main loop.
682    pub fn drain_events(&mut self) -> Vec<LspEvent> {
683        let mut events = Vec::new();
684        while let Ok(event) = self.event_rx.try_recv() {
685            self.handle_event(&event);
686            events.push(event);
687        }
688        events
689    }
690
691    /// Wait for diagnostics to arrive for a specific file until a timeout expires.
692    pub fn wait_for_diagnostics(
693        &mut self,
694        file_path: &Path,
695        config: &Config,
696        timeout: std::time::Duration,
697    ) -> Vec<StoredDiagnostic> {
698        let deadline = std::time::Instant::now() + timeout;
699        self.wait_for_file_diagnostics(file_path, config, deadline)
700    }
701
702    pub fn wait_for_diagnostics_default(
703        &mut self,
704        file_path: &Path,
705        timeout: std::time::Duration,
706    ) -> Vec<StoredDiagnostic> {
707        self.wait_for_diagnostics(file_path, &Config::default(), timeout)
708    }
709
710    /// Test-only accessor for the diagnostics store. Used by integration
711    /// tests that need to inspect per-server entries (e.g., to verify that
712    /// `ServerKey::root` is populated correctly, not the empty path that
713    /// the legacy `publish_with_kind` path produced).
714    #[doc(hidden)]
715    pub fn diagnostics_store_for_test(&self) -> &DiagnosticsStore {
716        &self.diagnostics
717    }
718
719    /// Snapshot the current per-server epoch for every entry that exists
720    /// for `file_path`. Servers without an entry yet (never published)
721    /// are absent from the map; for those, `pre = 0` (any first publish
722    /// will be considered fresh under the epoch-fallback rule).
723    pub fn snapshot_diagnostic_epochs(&self, file_path: &Path) -> HashMap<ServerKey, u64> {
724        let lookup_path = normalize_lookup_path(file_path);
725        self.diagnostics
726            .entries_for_file(&lookup_path)
727            .into_iter()
728            .map(|(key, entry)| (key.clone(), entry.epoch))
729            .collect()
730    }
731
732    /// Snapshot the current diagnostic epoch and document version for every
733    /// active server relevant to `file_path` before a post-edit notification.
734    pub fn snapshot_pre_edit_state(&self, file_path: &Path) -> HashMap<ServerKey, PreEditSnapshot> {
735        let lookup_path = normalize_lookup_path(file_path);
736        let mut snapshots: HashMap<ServerKey, PreEditSnapshot> = self
737            .diagnostics
738            .entries_for_file(&lookup_path)
739            .into_iter()
740            .map(|(key, entry)| {
741                (
742                    key.clone(),
743                    PreEditSnapshot {
744                        epoch: entry.epoch,
745                        document_version_at_capture: None,
746                    },
747                )
748            })
749            .collect();
750
751        for (key, store) in &self.documents {
752            if let Some(version) = store.version(&lookup_path) {
753                snapshots
754                    .entry(key.clone())
755                    .or_default()
756                    .document_version_at_capture = Some(version);
757            }
758        }
759
760        snapshots
761    }
762
763    /// True when the current diagnostic entry for `server_key` can be tied to
764    /// that server's current in-memory document version for `file_path`.
765    ///
766    /// File-mode `lsp_diagnostics` uses this for push-only fallback after it
767    /// has synced/opened the document. Versioned publishes are accepted when
768    /// they match the current document version; unversioned publishes are not
769    /// accepted as fresh because epoch/wall-clock ordering alone is racy.
770    pub fn diagnostic_entry_is_fresh_for_document(
771        &self,
772        file_path: &Path,
773        server_key: &ServerKey,
774        pre: PreEditSnapshot,
775    ) -> bool {
776        let lookup_path = normalize_lookup_path(file_path);
777        let Some(entry) = self
778            .diagnostics
779            .entries_for_file(&lookup_path)
780            .into_iter()
781            .find_map(|(key, entry)| if key == server_key { Some(entry) } else { None })
782        else {
783            return false;
784        };
785
786        let target_version = self
787            .documents
788            .get(server_key)
789            .and_then(|store| store.version(&lookup_path))
790            .or(pre.document_version_at_capture)
791            .unwrap_or(0);
792
793        matches!(entry.version, Some(version) if version >= target_version)
794    }
795
796    /// Wait for FRESH per-server diagnostics that match the just-sent
797    /// document version. This is the v0.17.3 post-edit path that fixes the
798    /// stale-diagnostics bug: instead of returning whatever is in the cache
799    /// when the deadline hits, we only return entries whose `version`
800    /// matches the post-edit target version (or, for servers that don't
801    /// participate in versioned sync, whose `epoch` was bumped after the
802    /// pre-edit snapshot).
803    ///
804    /// `expected_versions` should come from `notify_file_changed_versioned`
805    /// — one `(ServerKey, target_version)` per server we sent didChange/
806    /// didOpen to.
807    ///
808    /// `pre_snapshot` is the per-server epoch BEFORE the notification was
809    /// sent; it gates the epoch-fallback path so an old-version publish
810    /// arriving after `drain_events` and before `didChange` cannot be
811    /// mistaken for a fresh response.
812    ///
813    /// Returns a per-server tri-state: `Fresh` (publish matched target
814    /// version OR epoch advanced past snapshot for an unversioned server),
815    /// `Pending` (deadline hit before this server published anything we
816    /// could verify), or `Exited` (server died between notification and
817    /// deadline).
818    pub fn wait_for_post_edit_diagnostics(
819        &mut self,
820        file_path: &Path,
821        // `config` is intentionally accepted (matches sibling wait APIs and
822        // future-proofs us if freshness rules need it). Currently unused
823        // because expected_versions/pre_snapshot fully determine behavior.
824        _config: &Config,
825        expected_versions: &[(ServerKey, i32)],
826        pre_snapshot: &HashMap<ServerKey, PreEditSnapshot>,
827        timeout: std::time::Duration,
828    ) -> PostEditWaitOutcome {
829        let lookup_path = normalize_lookup_path(file_path);
830        let deadline = std::time::Instant::now() + timeout;
831
832        // Drain any events that arrived while we were sending didChange.
833        // The publishDiagnostics handler stores the version, so even
834        // pre-snapshot publishes that landed late won't be mistaken for
835        // fresh — the version-match check will reject them.
836        let _ = self.drain_events_for_file(&lookup_path);
837
838        let mut fresh: HashMap<ServerKey, Vec<StoredDiagnostic>> = HashMap::new();
839        let mut exited: Vec<ServerKey> = Vec::new();
840
841        loop {
842            // Check freshness for every expected server. A server is fresh
843            // if its current entry for this file satisfies either:
844            //   1. version-match: entry.version == Some(target_version), OR
845            //   2. push-only freshness: entry.version is None AND entry.epoch
846            //      advanced strictly after the pre-edit snapshot. Versioned
847            //      publishes must be >= the post-edit target version.
848            // Servers whose process has exited are reported separately.
849            for (key, target_version) in expected_versions {
850                if fresh.contains_key(key) || exited.contains(key) {
851                    continue;
852                }
853                if !self.clients.contains_key(key) {
854                    exited.push(key.clone());
855                    continue;
856                }
857                if let Some(entry) = self
858                    .diagnostics
859                    .entries_for_file(&lookup_path)
860                    .into_iter()
861                    .find_map(|(k, e)| if k == key { Some(e) } else { None })
862                {
863                    let pre = pre_snapshot.get(key).copied().unwrap_or_default();
864                    let is_fresh = post_edit_entry_is_fresh(entry, *target_version, pre);
865                    if is_fresh {
866                        fresh.insert(key.clone(), entry.diagnostics.clone());
867                    }
868                }
869            }
870
871            // All accounted for? Done.
872            if fresh.len() + exited.len() == expected_versions.len() {
873                break;
874            }
875
876            let now = std::time::Instant::now();
877            if now >= deadline {
878                break;
879            }
880
881            let timeout = deadline.saturating_duration_since(now);
882            match self.event_rx.recv_timeout(timeout) {
883                Ok(event) => {
884                    self.handle_event(&event);
885                }
886                Err(RecvTimeoutError::Timeout) | Err(RecvTimeoutError::Disconnected) => break,
887            }
888        }
889
890        // Pending = expected but neither fresh nor exited.
891        let pending: Vec<ServerKey> = expected_versions
892            .iter()
893            .filter(|(k, _)| !fresh.contains_key(k) && !exited.contains(k))
894            .map(|(k, _)| k.clone())
895            .collect();
896
897        // Build deduplicated, sorted diagnostics from the fresh servers only.
898        // Stale or pending servers contribute zero diagnostics.
899        let mut diagnostics: Vec<StoredDiagnostic> = fresh
900            .into_iter()
901            .flat_map(|(_, diags)| diags.into_iter())
902            .collect();
903        diagnostics.sort_by(|a, b| {
904            a.file
905                .cmp(&b.file)
906                .then(a.line.cmp(&b.line))
907                .then(a.column.cmp(&b.column))
908                .then(a.message.cmp(&b.message))
909        });
910
911        PostEditWaitOutcome {
912            diagnostics,
913            pending_servers: pending,
914            exited_servers: exited,
915        }
916    }
917
918    /// Wait for diagnostics to arrive for a specific file until a deadline.
919    ///
920    /// Drains already-queued events first, then blocks on the shared event
921    /// channel only until either `publishDiagnostics` arrives for this file or
922    /// the deadline is reached.
923    pub fn wait_for_file_diagnostics(
924        &mut self,
925        file_path: &Path,
926        config: &Config,
927        deadline: std::time::Instant,
928    ) -> Vec<StoredDiagnostic> {
929        let lookup_path = normalize_lookup_path(file_path);
930
931        if self.server_key_for_file(&lookup_path, config).is_none() {
932            return Vec::new();
933        }
934
935        loop {
936            if self.drain_events_for_file(&lookup_path) {
937                break;
938            }
939
940            let now = std::time::Instant::now();
941            if now >= deadline {
942                break;
943            }
944
945            let timeout = deadline.saturating_duration_since(now);
946            match self.event_rx.recv_timeout(timeout) {
947                Ok(event) => {
948                    if matches!(
949                        self.handle_event(&event),
950                        Some(ref published_file) if published_file.as_path() == lookup_path.as_path()
951                    ) {
952                        break;
953                    }
954                }
955                Err(RecvTimeoutError::Timeout) | Err(RecvTimeoutError::Disconnected) => break,
956            }
957        }
958
959        self.get_diagnostics_for_file(&lookup_path)
960            .into_iter()
961            .cloned()
962            .collect()
963    }
964
    /// Default timeout for `textDocument/diagnostic` (per-file pull). Servers
    /// usually respond in under 1s for files they've already analyzed; we
    /// allow up to 10s before falling back to push semantics. Currently
    /// surfaced via [`Self::pull_file_timeout`] for callers that want to
    /// override the wait via the `wait_ms` knob.
    pub const PULL_FILE_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);

    /// Public accessor so command handlers can reuse the documented default.
    /// Returns [`Self::PULL_FILE_TIMEOUT`] unchanged.
    pub fn pull_file_timeout() -> std::time::Duration {
        Self::PULL_FILE_TIMEOUT
    }

    /// Default timeout for `workspace/diagnostic`. The LSP spec allows the
    /// server to hold this open indefinitely; we cap at 10s and report
    /// `complete: false` to the agent rather than hanging the bridge.
    /// Applied by `pull_workspace_diagnostics` when its `timeout` argument
    /// is `None`.
    const PULL_WORKSPACE_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);
981
982    /// Issue a `textDocument/diagnostic` (LSP 3.17 per-file pull) request to
983    /// every server that supports pull diagnostics for the given file.
984    ///
985    /// Returns the per-server outcome. If a server reports `kind: "unchanged"`,
986    /// the cached entry's diagnostics are surfaced (deterministic re-use of
987    /// the previous response). If a server doesn't advertise pull capability,
988    /// it's skipped here — the caller should fall back to push for those.
989    ///
990    /// Side effects: results are stored in `DiagnosticsStore` so directory-mode
991    /// queries can aggregate them later.
992    pub fn pull_file_diagnostics(
993        &mut self,
994        file_path: &Path,
995        config: &Config,
996    ) -> Result<Vec<PullFileResult>, LspError> {
997        let canonical_path = canonicalize_for_lsp(file_path)?;
998        // Make sure servers are running and the document is open with fresh
999        // content (handles disk-drift via DocumentStore::is_stale_on_disk).
1000        self.ensure_file_open(&canonical_path, config)?;
1001
1002        let server_keys = self.ensure_server_for_file(&canonical_path, config);
1003        if server_keys.is_empty() {
1004            return Ok(Vec::new());
1005        }
1006
1007        let uri = uri_for_path(&canonical_path)?;
1008        let mut results = Vec::with_capacity(server_keys.len());
1009
1010        for key in server_keys {
1011            let supports_pull = self
1012                .clients
1013                .get(&key)
1014                .and_then(|c| c.diagnostic_capabilities())
1015                .is_some_and(|caps| caps.pull_diagnostics);
1016
1017            if !supports_pull {
1018                results.push(PullFileResult {
1019                    server_key: key.clone(),
1020                    outcome: PullFileOutcome::PullNotSupported,
1021                });
1022                continue;
1023            }
1024
1025            // Look up previous resultId for incremental requests.
1026            let previous_result_id = self
1027                .diagnostics
1028                .entries_for_file(&canonical_path)
1029                .into_iter()
1030                .find(|(k, _)| **k == key)
1031                .and_then(|(_, entry)| entry.result_id.clone());
1032
1033            let identifier = self
1034                .clients
1035                .get(&key)
1036                .and_then(|c| c.diagnostic_capabilities())
1037                .and_then(|caps| caps.identifier.clone());
1038
1039            let params = lsp_types::DocumentDiagnosticParams {
1040                text_document: lsp_types::TextDocumentIdentifier { uri: uri.clone() },
1041                identifier,
1042                previous_result_id,
1043                work_done_progress_params: Default::default(),
1044                partial_result_params: Default::default(),
1045            };
1046
1047            let outcome = match self.send_pull_request(&key, params) {
1048                Ok(report) => self.ingest_document_report(&key, &canonical_path, report),
1049                Err(err) => {
1050                    if let Some(result) =
1051                        self.cache_post_initialize_exit(&key, key.kind.id_str(), &err)
1052                    {
1053                        PullFileOutcome::RequestFailed {
1054                            reason: server_attempt_result_reason(&result),
1055                        }
1056                    } else {
1057                        PullFileOutcome::RequestFailed {
1058                            reason: err.to_string(),
1059                        }
1060                    }
1061                }
1062            };
1063
1064            results.push(PullFileResult {
1065                server_key: key,
1066                outcome,
1067            });
1068        }
1069
1070        Ok(results)
1071    }
1072
1073    /// Issue a `workspace/diagnostic` request to a specific server. Cancels
1074    /// internally if `timeout` elapses before the server responds. Cached
1075    /// entries from the response are stored so directory-mode queries pick
1076    /// them up.
1077    pub fn pull_workspace_diagnostics(
1078        &mut self,
1079        server_key: &ServerKey,
1080        timeout: Option<std::time::Duration>,
1081    ) -> Result<PullWorkspaceResult, LspError> {
1082        let timeout = timeout.unwrap_or(Self::PULL_WORKSPACE_TIMEOUT);
1083
1084        let supports_workspace = self
1085            .clients
1086            .get(server_key)
1087            .and_then(|c| c.diagnostic_capabilities())
1088            .is_some_and(|caps| caps.workspace_diagnostics);
1089
1090        if !supports_workspace {
1091            return Ok(PullWorkspaceResult {
1092                server_key: server_key.clone(),
1093                files_reported: Vec::new(),
1094                complete: false,
1095                cancelled: false,
1096                supports_workspace: false,
1097            });
1098        }
1099
1100        let identifier = self
1101            .clients
1102            .get(server_key)
1103            .and_then(|c| c.diagnostic_capabilities())
1104            .and_then(|caps| caps.identifier.clone());
1105
1106        let params = lsp_types::WorkspaceDiagnosticParams {
1107            identifier,
1108            previous_result_ids: Vec::new(),
1109            work_done_progress_params: Default::default(),
1110            partial_result_params: Default::default(),
1111        };
1112
1113        let result = match self
1114            .clients
1115            .get_mut(server_key)
1116            .ok_or_else(|| LspError::ServerNotReady("server not found".into()))?
1117            .send_request_with_timeout::<lsp_types::request::WorkspaceDiagnosticRequest>(
1118                params, timeout,
1119            ) {
1120            Ok(result) => result,
1121            Err(LspError::Timeout(_)) => {
1122                return Ok(PullWorkspaceResult {
1123                    server_key: server_key.clone(),
1124                    files_reported: Vec::new(),
1125                    complete: false,
1126                    cancelled: true,
1127                    supports_workspace: true,
1128                });
1129            }
1130            Err(err) => {
1131                if let Some(result) =
1132                    self.cache_post_initialize_exit(server_key, server_key.kind.id_str(), &err)
1133                {
1134                    return Err(LspError::ServerNotReady(server_attempt_result_reason(
1135                        &result,
1136                    )));
1137                }
1138                return Err(err);
1139            }
1140        };
1141
1142        // Extract the items list. Partial responses are not a complete
1143        // workspace view, but the partial payload can still contain useful
1144        // document reports; ingest those while surfacing complete=false.
1145        let (items, complete) = match result {
1146            lsp_types::WorkspaceDiagnosticReportResult::Report(report) => (report.items, true),
1147            lsp_types::WorkspaceDiagnosticReportResult::Partial(partial) => (partial.items, false),
1148        };
1149
1150        // Ingest each file report into the diagnostics store.
1151        let mut files_reported = Vec::with_capacity(items.len());
1152        for item in items {
1153            match item {
1154                lsp_types::WorkspaceDocumentDiagnosticReport::Full(full) => {
1155                    if let Some(file) = uri_to_path(&full.uri) {
1156                        let stored = from_lsp_diagnostics(
1157                            file.clone(),
1158                            full.full_document_diagnostic_report.items.clone(),
1159                        );
1160                        self.diagnostics.publish_with_result_id(
1161                            server_key.clone(),
1162                            file.clone(),
1163                            stored,
1164                            full.full_document_diagnostic_report.result_id.clone(),
1165                        );
1166                        files_reported.push(file);
1167                    }
1168                }
1169                lsp_types::WorkspaceDocumentDiagnosticReport::Unchanged(_unchanged) => {
1170                    // "Unchanged" means the previously cached report is still
1171                    // valid. We left it in place; nothing to do.
1172                }
1173            }
1174        }
1175
1176        Ok(PullWorkspaceResult {
1177            server_key: server_key.clone(),
1178            files_reported,
1179            complete,
1180            cancelled: false,
1181            supports_workspace: true,
1182        })
1183    }
1184
1185    fn cache_post_initialize_exit(
1186        &mut self,
1187        key: &ServerKey,
1188        binary: &str,
1189        err: &LspError,
1190    ) -> Option<ServerAttemptResult> {
1191        let (status, stderr_tail) = {
1192            let client = self.clients.get_mut(key)?;
1193            let mut status = client.child_exit_status();
1194            for _ in 0..10 {
1195                if status.is_some() {
1196                    break;
1197                }
1198                std::thread::sleep(std::time::Duration::from_millis(10));
1199                status = client.child_exit_status();
1200            }
1201            let status = status?;
1202            wait_for_stderr_tail(client);
1203            (status, client.stderr_tail())
1204        };
1205        let reason = format_post_initialize_exit_reason(binary, status, &stderr_tail, err);
1206        let result = ServerAttemptResult::SpawnFailed {
1207            binary: binary.to_string(),
1208            reason,
1209        };
1210        self.clients.remove(key);
1211        self.documents.remove(key);
1212        self.diagnostics.clear_for_server(key);
1213        self.failed_spawns.insert(key.clone(), result.clone());
1214        Some(result)
1215    }
1216
1217    /// Issue the per-file diagnostic request and return the report.
1218    fn send_pull_request(
1219        &mut self,
1220        key: &ServerKey,
1221        params: lsp_types::DocumentDiagnosticParams,
1222    ) -> Result<lsp_types::DocumentDiagnosticReportResult, LspError> {
1223        let client = self
1224            .clients
1225            .get_mut(key)
1226            .ok_or_else(|| LspError::ServerNotReady("server not found".into()))?;
1227        client.send_request::<lsp_types::request::DocumentDiagnosticRequest>(params)
1228    }
1229
1230    /// Store the result of a per-file pull request and return a structured
1231    /// outcome the caller can inspect.
1232    fn ingest_document_report(
1233        &mut self,
1234        key: &ServerKey,
1235        canonical_path: &Path,
1236        result: lsp_types::DocumentDiagnosticReportResult,
1237    ) -> PullFileOutcome {
1238        let report = match result {
1239            lsp_types::DocumentDiagnosticReportResult::Report(report) => report,
1240            lsp_types::DocumentDiagnosticReportResult::Partial(_) => {
1241                // Partial results stream in via $/progress notifications which
1242                // we don't currently subscribe to. Treat as a soft-empty
1243                // success — the next pull will get the full version.
1244                return PullFileOutcome::PartialNotSupported;
1245            }
1246        };
1247
1248        match report {
1249            lsp_types::DocumentDiagnosticReport::Full(full) => {
1250                let result_id = full.full_document_diagnostic_report.result_id.clone();
1251                let stored = from_lsp_diagnostics(
1252                    canonical_path.to_path_buf(),
1253                    full.full_document_diagnostic_report.items.clone(),
1254                );
1255                let count = stored.len();
1256                self.diagnostics.publish_with_result_id(
1257                    key.clone(),
1258                    canonical_path.to_path_buf(),
1259                    stored,
1260                    result_id,
1261                );
1262                PullFileOutcome::Full {
1263                    diagnostic_count: count,
1264                }
1265            }
1266            lsp_types::DocumentDiagnosticReport::Unchanged(_unchanged) => {
1267                // The server says cache is still valid. We don't refresh
1268                // anything; the existing entry's diagnostics remain authoritative.
1269                PullFileOutcome::Unchanged
1270            }
1271        }
1272    }
1273
1274    /// Shutdown all servers gracefully.
1275    pub fn shutdown_all(&mut self) {
1276        for (key, mut client) in self.clients.drain() {
1277            if let Err(err) = client.shutdown() {
1278                slog_error!("error shutting down {:?}: {}", key, err);
1279            }
1280        }
1281        self.documents.clear();
1282        self.diagnostics = DiagnosticsStore::new();
1283    }
1284
1285    /// Check if any server is active.
1286    pub fn has_active_servers(&self) -> bool {
1287        self.clients
1288            .values()
1289            .any(|client| client.state() == ServerState::Ready)
1290    }
1291
1292    /// Active server keys (running clients). Used by `lsp_diagnostics`
1293    /// directory mode to know which servers to ask for workspace pull.
1294    pub fn active_server_keys(&self) -> Vec<ServerKey> {
1295        self.clients.keys().cloned().collect()
1296    }
1297
1298    pub fn get_diagnostics_for_file(&self, file: &Path) -> Vec<&StoredDiagnostic> {
1299        let normalized = normalize_lookup_path(file);
1300        self.diagnostics.for_file(&normalized)
1301    }
1302
1303    pub fn get_diagnostics_for_directory(&self, dir: &Path) -> Vec<&StoredDiagnostic> {
1304        let normalized = normalize_lookup_path(dir);
1305        self.diagnostics.for_directory(&normalized)
1306    }
1307
    /// Every diagnostic the store reports via `DiagnosticsStore::all`,
    /// borrowed from the manager; no path filtering is applied here.
    pub fn get_all_diagnostics(&self) -> Vec<&StoredDiagnostic> {
        self.diagnostics.all()
    }
1311
1312    fn drain_events_for_file(&mut self, file_path: &Path) -> bool {
1313        let mut saw_file_diagnostics = false;
1314        while let Ok(event) = self.event_rx.try_recv() {
1315            if matches!(
1316                self.handle_event(&event),
1317                Some(ref published_file) if published_file.as_path() == file_path
1318            ) {
1319                saw_file_diagnostics = true;
1320            }
1321        }
1322        saw_file_diagnostics
1323    }
1324
1325    fn handle_event(&mut self, event: &LspEvent) -> Option<PathBuf> {
1326        match event {
1327            LspEvent::Notification {
1328                server_kind,
1329                root,
1330                method,
1331                params: Some(params),
1332            } if method == "textDocument/publishDiagnostics" => {
1333                self.handle_publish_diagnostics(server_kind.clone(), root.clone(), params)
1334            }
1335            LspEvent::ServerExited { server_kind, root } => {
1336                let key = ServerKey {
1337                    kind: server_kind.clone(),
1338                    root: root.clone(),
1339                };
1340                self.clients.remove(&key);
1341                self.documents.remove(&key);
1342                self.diagnostics.clear_for_server(&key);
1343                None
1344            }
1345            _ => None,
1346        }
1347    }
1348
1349    fn handle_publish_diagnostics(
1350        &mut self,
1351        server: ServerKind,
1352        root: PathBuf,
1353        params: &serde_json::Value,
1354    ) -> Option<PathBuf> {
1355        if let Ok(publish_params) =
1356            serde_json::from_value::<lsp_types::PublishDiagnosticsParams>(params.clone())
1357        {
1358            let file = uri_to_path(&publish_params.uri)?;
1359            let stored = from_lsp_diagnostics(file.clone(), publish_params.diagnostics);
1360            // v0.17.3: store with real ServerKey { kind, root } and capture
1361            // the document `version` (when the server provided one) so the
1362            // post-edit waiter can reject stale publishes deterministically
1363            // via version-match (preferred) or epoch-delta (fallback). The
1364            // earlier `publish_with_kind` path silently dropped both.
1365            let key = ServerKey { kind: server, root };
1366            self.diagnostics
1367                .publish_full(key, file.clone(), stored, None, publish_params.version);
1368            return Some(file);
1369        }
1370        None
1371    }
1372
1373    fn spawn_server(
1374        &self,
1375        def: &ServerDef,
1376        root: &Path,
1377        config: &Config,
1378    ) -> Result<LspClient, LspError> {
1379        let binary = self.resolve_binary(def, config)?;
1380
1381        // Merge the server-defined env with our test-injected env.
1382        // `extra_env` is empty in production; tests use it to drive fake
1383        // server variants (AFT_FAKE_LSP_PULL=1, etc.).
1384        let mut merged_env = def.env.clone();
1385        for (key, value) in &self.extra_env {
1386            merged_env.insert(key.clone(), value.clone());
1387        }
1388
1389        let mut client = LspClient::spawn(
1390            def.kind.clone(),
1391            root.to_path_buf(),
1392            &binary,
1393            &def.args,
1394            &merged_env,
1395            self.event_tx.clone(),
1396            self.child_registry.clone(),
1397        )?;
1398        if let Err(err) = client.initialize(root, def.initialization_options.clone()) {
1399            wait_for_stderr_tail(&mut client);
1400            let stderr_tail = client.stderr_tail();
1401            let reason = if client.child_exited() || !stderr_tail.is_empty() {
1402                format_initialize_failure_reason(&def.binary, &stderr_tail, &err)
1403            } else {
1404                format!("server failed during initialize: {err}")
1405            };
1406            return Err(LspError::ServerNotReady(reason));
1407        }
1408        Ok(client)
1409    }
1410
1411    fn resolve_binary(&self, def: &ServerDef, config: &Config) -> Result<PathBuf, LspError> {
1412        if let Some(path) = self.binary_overrides.get(&def.kind) {
1413            if path.exists() {
1414                return Ok(path.clone());
1415            }
1416            return Err(LspError::NotFound(format!(
1417                "override binary for {:?} not found: {}",
1418                def.kind,
1419                path.display()
1420            )));
1421        }
1422
1423        if let Some(path) = env_binary_override(&def.kind) {
1424            if path.exists() {
1425                return Ok(path);
1426            }
1427            return Err(LspError::NotFound(format!(
1428                "environment override binary for {:?} not found: {}",
1429                def.kind,
1430                path.display()
1431            )));
1432        }
1433
1434        // Layered resolution:
1435        //   1. <project_root>/node_modules/.bin/<binary>
1436        //   2. config.lsp_paths_extra (plugin auto-install cache, etc.)
1437        //   3. PATH via `which`
1438        resolve_lsp_binary(
1439            &def.binary,
1440            config.project_root.as_deref(),
1441            &config.lsp_paths_extra,
1442        )
1443        .ok_or_else(|| {
1444            LspError::NotFound(format!(
1445                "language server binary '{}' not found in node_modules/.bin, lsp_paths_extra, or PATH",
1446                def.binary
1447            ))
1448        })
1449    }
1450
1451    fn server_key_for_file(&self, file_path: &Path, config: &Config) -> Option<ServerKey> {
1452        for def in servers_for_file(file_path, config) {
1453            let root = find_workspace_root(file_path, &def.root_markers)?;
1454            let key = ServerKey {
1455                kind: def.kind.clone(),
1456                root,
1457            };
1458            if self.clients.contains_key(&key) {
1459                return Some(key);
1460            }
1461        }
1462        None
1463    }
1464}
1465
1466impl Default for LspManager {
1467    fn default() -> Self {
1468        Self::new()
1469    }
1470}
1471
1472fn wait_for_stderr_tail(client: &mut LspClient) {
1473    for _ in 0..10 {
1474        if !client.stderr_tail().is_empty() {
1475            break;
1476        }
1477        std::thread::sleep(std::time::Duration::from_millis(10));
1478    }
1479}
1480
1481fn server_attempt_result_reason(result: &ServerAttemptResult) -> String {
1482    match result {
1483        ServerAttemptResult::SpawnFailed { binary, reason } => {
1484            format!("spawn_failed: {binary} ({reason})")
1485        }
1486        ServerAttemptResult::BinaryNotInstalled { binary } => {
1487            format!("binary_not_installed: {binary}")
1488        }
1489        ServerAttemptResult::NoRootMarker { looked_for } => {
1490            format!("no_root_marker (looked for: {})", looked_for.join(", "))
1491        }
1492        ServerAttemptResult::Ok { .. } => "ok".to_string(),
1493    }
1494}
1495
/// Returns the last `max_lines` lines of `stderr_tail`, each prefixed with two
/// spaces and joined with `\n`.
///
/// Lines keep their original order. Empty input, or `max_lines == 0`, gives
/// an empty string.
fn indent_tail(stderr_tail: &str, max_lines: usize) -> String {
    let all_lines: Vec<&str> = stderr_tail.lines().collect();
    // Index of the first line that belongs to the kept tail.
    let first_kept = all_lines.len().saturating_sub(max_lines);

    let mut indented = String::new();
    for (position, line) in all_lines[first_kept..].iter().enumerate() {
        if position > 0 {
            indented.push('\n');
        }
        indented.push_str("  ");
        indented.push_str(line);
    }
    indented
}
1508
1509fn format_initialize_failure_reason(binary: &str, stderr_tail: &str, err: &LspError) -> String {
1510    let mut reason = String::from("server crashed during initialize");
1511    if !stderr_tail.is_empty() {
1512        reason.push_str(". stderr tail (last 8 lines):\n");
1513        reason.push_str(&indent_tail(stderr_tail, 8));
1514    } else {
1515        reason.push_str(&format!(": {err}"));
1516    }
1517    reason.push_str("\n\n");
1518    reason.push_str(&failure_hint(binary, stderr_tail));
1519    reason
1520}
1521
1522fn format_post_initialize_exit_reason(
1523    binary: &str,
1524    status: std::process::ExitStatus,
1525    stderr_tail: &str,
1526    err: &LspError,
1527) -> String {
1528    let code = status
1529        .code()
1530        .map(|c| c.to_string())
1531        .unwrap_or_else(|| "signal/unknown".to_string());
1532    let mut reason = format!("server exited after initialize (code {code}): {err}");
1533    if !stderr_tail.is_empty() {
1534        reason.push_str(". stderr tail (last 8 lines):\n");
1535        reason.push_str(&indent_tail(stderr_tail, 8));
1536        reason.push_str("\n\n");
1537        reason.push_str(&failure_hint(binary, stderr_tail));
1538    }
1539    reason
1540}
1541
/// Produces a one-paragraph hint for a server failure.
///
/// When `stderr_tail` contains `MODULE_NOT_FOUND` or `Cannot find module`, the
/// hint describes a shim that points at a missing module file and suggests a
/// reinstall. Any other tail gives a generic pointer back to the stderr output.
fn failure_hint(binary: &str, stderr_tail: &str) -> String {
    let missing_module = ["MODULE_NOT_FOUND", "Cannot find module"]
        .iter()
        .any(|marker| stderr_tail.contains(*marker));

    if !missing_module {
        return format!("Hint: see stderr above for '{binary}' failure details.");
    }

    format!(
        "Hint: '{binary}' shim resolves to a missing module file. \
         Common cause: package-manager store corruption from filesystem migration, \
         backup-restore, or store pruning. \
         Fix: reinstall (e.g. `npm install -g <package> --force` or pnpm/yarn equivalent), \
         or remove `lsp.servers.<id>` from your config to fall back to AFT's built-in \
         (if available)."
    )
}
1554
1555fn canonicalize_for_lsp(file_path: &Path) -> Result<PathBuf, LspError> {
1556    std::fs::canonicalize(file_path).map_err(LspError::from)
1557}
1558
/// Resolves `file_path` to a canonical path as far as the filesystem allows.
///
/// If the whole path canonicalizes, that result is returned. Otherwise the
/// path is walked upward until an ancestor exists (or no further component
/// can be removed). That ancestor is canonicalized when possible, and the
/// removed trailing components are appended to it in their original order.
fn resolve_for_lsp_uri(file_path: &Path) -> PathBuf {
    match std::fs::canonicalize(file_path) {
        Ok(canonical) => return canonical,
        Err(_) => {}
    }

    let mut ancestor = file_path.to_path_buf();
    // Components stripped from the end, deepest first.
    let mut trailing = Vec::new();
    while !ancestor.exists() {
        match ancestor.file_name() {
            Some(name) => trailing.push(name.to_owned()),
            None => break,
        }
        // `pop` truncates to the parent; it reports `false` when there is none.
        if !ancestor.pop() {
            break;
        }
    }

    let base = std::fs::canonicalize(&ancestor).unwrap_or(ancestor);
    trailing.iter().rev().fold(base, |mut joined, segment| {
        joined.push(segment);
        joined
    })
}
1583
/// Maps a file extension (without the dot) to a language identifier string.
///
/// Matching is exact and case-sensitive. Any extension not in the table maps
/// to `"plaintext"`.
fn language_id_for_extension(ext: &str) -> &'static str {
    const LANGUAGE_IDS: &[(&str, &str)] = &[
        ("ts", "typescript"),
        ("tsx", "typescriptreact"),
        ("js", "javascript"),
        ("mjs", "javascript"),
        ("cjs", "javascript"),
        ("jsx", "javascriptreact"),
        ("py", "python"),
        ("pyi", "python"),
        ("rs", "rust"),
        ("go", "go"),
        ("html", "html"),
        ("htm", "html"),
    ];

    LANGUAGE_IDS
        .iter()
        .find(|&&(known, _)| known == ext)
        .map_or("plaintext", |&(_, language_id)| language_id)
}
1597
/// Canonicalizes `path` when possible; otherwise returns an owned copy of
/// `path` unchanged.
fn normalize_lookup_path(path: &Path) -> PathBuf {
    match std::fs::canonicalize(path) {
        Ok(canonical) => canonical,
        Err(_) => path.to_path_buf(),
    }
}
1601
1602/// Classify an error returned by `spawn_server` into a structured
1603/// `ServerAttemptResult`. The two interesting cases for callers are:
1604/// - `BinaryNotInstalled` — the server's binary couldn't be resolved on PATH
1605///   or via override. The agent can be told "install bash-language-server".
1606/// - `SpawnFailed` — binary was found but spawning/initializing failed
1607///   (permissions, missing runtime, server crashed during initialize, etc.).
1608fn classify_spawn_error(binary: &str, err: &LspError) -> ServerAttemptResult {
1609    match err {
1610        // resolve_binary returns NotFound for both missing override paths and
1611        // missing PATH binaries. The "override missing" case is rare in
1612        // practice (only set in tests / env vars); we report all NotFound as
1613        // BinaryNotInstalled so the user sees an actionable install hint.
1614        LspError::NotFound(_) => ServerAttemptResult::BinaryNotInstalled {
1615            binary: binary.to_string(),
1616        },
1617        other => ServerAttemptResult::SpawnFailed {
1618            binary: binary.to_string(),
1619            reason: other.to_string(),
1620        },
1621    }
1622}
1623
1624fn env_binary_override(kind: &ServerKind) -> Option<PathBuf> {
1625    let id = kind.id_str();
1626    let suffix: String = id
1627        .chars()
1628        .map(|ch| {
1629            if ch.is_ascii_alphanumeric() {
1630                ch.to_ascii_uppercase()
1631            } else {
1632                '_'
1633            }
1634        })
1635        .collect();
1636    let key = format!("AFT_LSP_{suffix}_BINARY");
1637    std::env::var_os(key).map(PathBuf::from)
1638}