Skip to main content

aft/lsp/
manager.rs

1use std::collections::{HashMap, HashSet};
2use std::path::{Path, PathBuf};
3
4use crossbeam_channel::{unbounded, Receiver, RecvTimeoutError, Sender};
5use lsp_types::notification::{
6    DidChangeTextDocument, DidChangeWatchedFiles, DidCloseTextDocument, DidOpenTextDocument,
7};
8use lsp_types::{
9    DidChangeTextDocumentParams, DidChangeWatchedFilesParams, DidCloseTextDocumentParams,
10    DidOpenTextDocumentParams, FileChangeType, FileEvent, TextDocumentContentChangeEvent,
11    TextDocumentIdentifier, TextDocumentItem, VersionedTextDocumentIdentifier,
12};
13
14use crate::config::Config;
15use crate::lsp::child_registry::LspChildRegistry;
16use crate::lsp::client::{LspClient, LspEvent, ServerState};
17use crate::lsp::diagnostics::{
18    from_lsp_diagnostics, DiagnosticEntry, DiagnosticsStore, StoredDiagnostic,
19};
20use crate::lsp::document::DocumentStore;
21use crate::lsp::position::{uri_for_path, uri_to_path};
22use crate::lsp::registry::{resolve_lsp_binary, servers_for_file, ServerDef, ServerKind};
23use crate::lsp::roots::{find_workspace_root, ServerKey};
24use crate::lsp::LspError;
25use crate::slog_error;
26
/// Byte budget of 2 KiB.
// NOTE(review): presumably caps how much of a server's stderr is embedded in a
// spawn-failure reason; the use site is not visible in this part of the file —
// confirm before relying on this description.
const STDERR_REASON_BYTES: usize = 2 * 1024;
28
/// Outcome of attempting to ensure a server is running for a single matching
/// `ServerDef`. Returned per matching server so the caller can report exactly
/// what happened to the user instead of collapsing all failures into "no
/// server".
///
/// Failure values are also cached in `LspManager::failed_spawns`, which is why
/// the type is `Clone`.
#[derive(Debug, Clone)]
pub enum ServerAttemptResult {
    /// Server is running and ready to serve requests for this file.
    /// `server_key` identifies the (kind, workspace root) instance.
    Ok { server_key: ServerKey },
    /// No workspace root was found by walking up from the file looking for
    /// any of the server's configured root markers. `looked_for` lists the
    /// marker names that were searched for.
    NoRootMarker { looked_for: Vec<String> },
    /// The server's binary could not be found on PATH (or override was
    /// missing/invalid). `binary` is the binary name that was looked up.
    BinaryNotInstalled { binary: String },
    /// Binary was found but spawning or initializing the server failed.
    /// `reason` carries a human-readable explanation of the failure.
    SpawnFailed { binary: String, reason: String },
}
46
/// One server's attempt to handle a file: which server was tried and what
/// came of it.
#[derive(Debug, Clone)]
pub struct ServerAttempt {
    /// Stable server identifier (kind ID, e.g. "pyright", "rust-analyzer").
    pub server_id: String,
    /// Server display name from the registry.
    pub server_name: String,
    /// What happened when we tried to make this server available.
    pub result: ServerAttemptResult,
}
56
/// Aggregate outcome of `ensure_server_for_file_detailed`. Distinguishes:
/// - "No server registered for this file's extension" (`attempts.is_empty()`)
/// - "Servers registered but none could start" (`successful.is_empty()` but
///   `!attempts.is_empty()`)
/// - "At least one server is ready" (`!successful.is_empty()`)
///
/// The `Default` value (both vectors empty) therefore reads as "no server
/// registered".
#[derive(Debug, Clone, Default)]
pub struct EnsureServerOutcomes {
    /// Server keys that are now running and ready to serve requests.
    pub successful: Vec<ServerKey>,
    /// Per-server attempt records, one per matching server definition
    /// (successes included). Empty if no server is registered for the
    /// file's extension.
    pub attempts: Vec<ServerAttempt>,
}
70
71impl EnsureServerOutcomes {
72    /// True if no server in the registry matched this file's extension.
73    pub fn no_server_registered(&self) -> bool {
74        self.attempts.is_empty()
75    }
76}
77
/// Outcome of a post-edit diagnostics wait. Reports the per-server status
/// alongside the fresh diagnostics, so the response layer can build an
/// honest tri-state payload (`success: true` + `complete: bool` + named
/// gap fields per `crates/aft/src/protocol.rs`).
///
/// `diagnostics` only contains entries from servers that proved freshness
/// (version-match preferred, epoch-fallback for unversioned servers).
/// Pre-edit cached entries are NEVER included — that's the whole point of
/// this type.
///
/// NOTE(review): `post_edit_entry_is_fresh` in this file rejects unversioned
/// entries outright, so the "epoch-fallback" wording here may be outdated —
/// confirm against the wait implementation.
#[derive(Debug, Clone, Default)]
pub struct PostEditWaitOutcome {
    /// Diagnostics from servers whose response we verified is FOR the
    /// post-edit document version (or whose epoch we saw advance after our
    /// pre-edit snapshot, for unversioned servers).
    pub diagnostics: Vec<StoredDiagnostic>,
    /// Servers we expected to publish but didn't before the deadline.
    /// Reported to the agent via `pending_lsp_servers` so they understand
    /// the result is partial.
    pub pending_servers: Vec<ServerKey>,
    /// Servers whose process exited between notification and deadline.
    /// Reported separately so the agent knows the gap is unrecoverable
    /// without a server restart, not "wait longer."
    pub exited_servers: Vec<ServerKey>,
}
102
/// Pre-edit freshness snapshot for one server/file pair.
///
/// The `Default` value (`epoch == 0`, no version) stands for a server that
/// had neither a stored diagnostics entry nor a tracked document at capture.
#[derive(Debug, Clone, Copy, Default)]
pub struct PreEditSnapshot {
    /// Epoch of the stored diagnostics entry at capture time.
    pub epoch: u64,
    /// Document version tracked for the file at capture time, or `None` when
    /// no version was recorded for this server.
    pub document_version_at_capture: Option<i32>,
}
109
110pub fn post_edit_entry_is_fresh(
111    entry: &DiagnosticEntry,
112    target_version: i32,
113    pre: PreEditSnapshot,
114) -> bool {
115    if entry.epoch <= pre.epoch {
116        return false;
117    }
118
119    match entry.version {
120        Some(version) => version >= target_version,
121        // Unversioned publishDiagnostics payloads cannot prove which document
122        // state they describe. Epoch advancement only proves arrival order; an
123        // old analysis result can still arrive after our pre-snapshot. Treat as
124        // pending/partial rather than fresh.
125        None => false,
126    }
127}
128
129impl PostEditWaitOutcome {
130    /// True if every expected server reported a fresh result. False means
131    /// the agent should treat the diagnostics as a partial picture.
132    pub fn complete(&self) -> bool {
133        self.pending_servers.is_empty() && self.exited_servers.is_empty()
134    }
135}
136
/// Per-server outcome of a `textDocument/diagnostic` (per-file pull) request.
#[derive(Debug, Clone)]
pub enum PullFileOutcome {
    /// Server returned a full report; diagnostics stored.
    /// `diagnostic_count` is the number of diagnostics in that report.
    Full { diagnostic_count: usize },
    /// Server returned `kind: "unchanged"` — cached diagnostics still valid.
    Unchanged,
    /// Server returned a partial-result token; we don't subscribe to streamed
    /// progress so the response is treated as a soft empty until the next pull.
    PartialNotSupported,
    /// Server doesn't advertise pull capability — caller should fall back to
    /// push diagnostics for this server.
    PullNotSupported,
    /// The pull request failed (timeout, server error, etc.). `reason` is a
    /// human-readable description of the failure.
    RequestFailed { reason: String },
}
153
/// Result of `pull_file_diagnostics` for one matching server.
#[derive(Debug, Clone)]
pub struct PullFileResult {
    /// The server instance that was asked.
    pub server_key: ServerKey,
    /// What that server answered (or why the pull could not be completed).
    pub outcome: PullFileOutcome,
}
160
/// Result of `pull_workspace_diagnostics` for a single server.
#[derive(Debug, Clone)]
pub struct PullWorkspaceResult {
    /// The server instance that was asked.
    pub server_key: ServerKey,
    /// Files for which a Full report was received and cached. Files that came
    /// back as `Unchanged` are NOT listed here because their cached entry was
    /// already authoritative.
    pub files_reported: Vec<PathBuf>,
    /// True if the server returned a full response within the timeout.
    pub complete: bool,
    /// True if we cancelled (request timed out before the server responded).
    pub cancelled: bool,
    /// True if the server advertised workspace pull support. When false, the
    /// other fields are empty and the caller should fall back to file-mode
    /// pull or to push semantics.
    pub supports_workspace: bool,
}
178
/// Owns every running LSP server client together with the per-server
/// bookkeeping (open documents, stored diagnostics, spawn-failure cache)
/// needed to route file events to the right servers.
pub struct LspManager {
    /// Active server instances, keyed by (ServerKind, workspace_root).
    clients: HashMap<ServerKey, LspClient>,
    /// Binary names for active server instances. Kept separate from
    /// `LspClient` so crash handling can report the installable binary name
    /// after a post-initialize process exit.
    server_binaries: HashMap<ServerKey, String>,
    /// Tracks opened documents and versions per active server.
    documents: HashMap<ServerKey, DocumentStore>,
    /// Stored publishDiagnostics payloads across all servers.
    diagnostics: DiagnosticsStore,
    /// Unified event channel — all server reader threads send here.
    /// `event_tx` is the sending half of that channel.
    event_tx: Sender<LspEvent>,
    /// Receiving half of the unified event channel; polled by
    /// `drain_events`.
    event_rx: Receiver<LspEvent>,
    /// Optional binary path overrides used by integration tests.
    binary_overrides: HashMap<ServerKind, PathBuf>,
    /// Extra env vars merged into every spawned LSP child. Used in tests to
    /// drive the fake server's behavioral variants (`AFT_FAKE_LSP_PULL=1`,
    /// `AFT_FAKE_LSP_WORKSPACE=1`, etc.). Production code does not set this.
    extra_env: HashMap<String, String>,
    /// Per-(kind,root) cache of spawn failures. Once a server fails to spawn
    /// for a workspace root, we remember why and skip subsequent attempts for
    /// the lifetime of this AFT process. Without this, every file open or
    /// didChange retries `spawn_server` and logs a fresh ERROR — visible as
    /// repeated `failed to spawn TypeScript Language Server: Could not find a
    /// valid TypeScript installation` lines per edit.
    ///
    /// Entries are NEVER evicted automatically. The expected recovery path is
    /// for the user to fix their environment (install the missing binary or
    /// add a `tsconfig.json` / `package.json` with the right dependency) and
    /// restart OpenCode/Pi, which spawns a fresh `aft` process with an empty
    /// cache. We deliberately don't auto-retry on file events: the failure
    /// modes we track here (binary not installed, init handshake failure)
    /// don't fix themselves at runtime.
    failed_spawns: HashMap<ServerKey, ServerAttemptResult>,
    /// Server/root pairs for which we already logged that watched-file
    /// notifications are skipped because the capability is absent. Keeps the
    /// skip message to one log line per server instance.
    watched_file_skip_logged: HashSet<ServerKey>,
    /// Tracks PIDs of spawned LSP child processes so the signal handler can
    /// kill them on SIGTERM/SIGINT before aft exits, preventing orphans.
    /// Defaults to empty; production wires this from `AppContext`.
    child_registry: LspChildRegistry,
}
222
223impl LspManager {
224    pub fn new() -> Self {
225        let (event_tx, event_rx) = unbounded();
226        Self {
227            clients: HashMap::new(),
228            server_binaries: HashMap::new(),
229            documents: HashMap::new(),
230            diagnostics: DiagnosticsStore::new(),
231            event_tx,
232            event_rx,
233            binary_overrides: HashMap::new(),
234            extra_env: HashMap::new(),
235            failed_spawns: HashMap::new(),
236            watched_file_skip_logged: HashSet::new(),
237            child_registry: LspChildRegistry::new(),
238        }
239    }
240
    /// Set the child-PID registry, replacing the one created by `new`.
    /// Must be called before any servers spawn.
    pub fn set_child_registry(&mut self, registry: LspChildRegistry) {
        self.child_registry = registry;
    }
245
246    /// For testing: set an extra environment variable that gets passed to
247    /// every spawned LSP child process. Useful for driving fake-server
248    /// behavioral variants in integration tests.
249    pub fn set_extra_env(&mut self, key: &str, value: &str) {
250        self.extra_env.insert(key.to_string(), value.to_string());
251    }
252
    /// Count active LSP server instances (one per entry in `clients`).
    pub fn server_count(&self) -> usize {
        self.clients.len()
    }
257
    /// For testing: override the binary for a server kind. A later call for
    /// the same `kind` replaces the earlier path.
    pub fn override_binary(&mut self, kind: ServerKind, binary_path: PathBuf) {
        self.binary_overrides.insert(kind, binary_path);
    }
262
    /// Ensure a server is running for the given file. Spawns if needed.
    /// Returns the active server keys for the file, or an empty vec if none
    /// match or none could be started.
    ///
    /// This is the lightweight wrapper around [`ensure_server_for_file_detailed`]
    /// that drops failure context (it keeps only the `successful` list).
    /// Prefer the detailed variant in command handlers that need to surface
    /// honest error messages to the agent.
    pub fn ensure_server_for_file(&mut self, file_path: &Path, config: &Config) -> Vec<ServerKey> {
        self.ensure_server_for_file_detailed(file_path, config)
            .successful
    }
273
274    /// Detailed version of [`ensure_server_for_file`] that records every
275    /// matching server's outcome (`Ok` / `NoRootMarker` / `BinaryNotInstalled`
276    /// / `SpawnFailed`).
277    ///
278    /// Use this when the caller wants to honestly report _why_ a file has no
279    /// active server (e.g., to surface "bash-language-server not on PATH" to
280    /// the agent instead of silently returning `total: 0`).
281    pub fn ensure_server_for_file_detailed(
282        &mut self,
283        file_path: &Path,
284        config: &Config,
285    ) -> EnsureServerOutcomes {
286        let defs = servers_for_file(file_path, config);
287        let mut outcomes = EnsureServerOutcomes::default();
288
289        for def in defs {
290            let server_id = def.kind.id_str().to_string();
291            let server_name = def.name.to_string();
292
293            let Some(root) = find_workspace_root(file_path, &def.root_markers) else {
294                outcomes.attempts.push(ServerAttempt {
295                    server_id,
296                    server_name,
297                    result: ServerAttemptResult::NoRootMarker {
298                        looked_for: def.root_markers.iter().map(|s| s.to_string()).collect(),
299                    },
300                });
301                continue;
302            };
303
304            let key = ServerKey {
305                kind: def.kind.clone(),
306                root,
307            };
308
309            if !self.clients.contains_key(&key) {
310                // If we already tried and failed to spawn this server for this
311                // root, return the cached classification without retrying or
312                // re-logging. This prevents per-edit ERROR spam when the user's
313                // environment is missing a dependency the LSP needs (the
314                // typescript-language-server "Could not find a valid TypeScript
315                // installation" case is the canonical example).
316                if let Some(cached) = self.failed_spawns.get(&key) {
317                    outcomes.attempts.push(ServerAttempt {
318                        server_id,
319                        server_name,
320                        result: cached.clone(),
321                    });
322                    continue;
323                }
324
325                match self.spawn_server(&def, &key.root, config) {
326                    Ok(client) => {
327                        self.clients.insert(key.clone(), client);
328                        self.server_binaries.insert(key.clone(), def.binary.clone());
329                        self.documents.entry(key.clone()).or_default();
330                    }
331                    Err(err) => {
332                        slog_error!("failed to spawn {}: {}", def.name, err);
333                        let result = classify_spawn_error(&def.binary, &err);
334                        // Remember the failure so subsequent file events skip
335                        // this (kind, root) pair instead of producing a fresh
336                        // spawn attempt + ERROR log per request.
337                        self.failed_spawns.insert(key.clone(), result.clone());
338                        outcomes.attempts.push(ServerAttempt {
339                            server_id,
340                            server_name,
341                            result,
342                        });
343                        continue;
344                    }
345                }
346            }
347
348            outcomes.attempts.push(ServerAttempt {
349                server_id,
350                server_name,
351                result: ServerAttemptResult::Ok {
352                    server_key: key.clone(),
353                },
354            });
355            outcomes.successful.push(key);
356        }
357
358        outcomes
359    }
360
    /// Ensure a server is running using the default LSP registry
    /// (`Config::default()`).
    /// Kept for integration tests that exercise built-in server helpers directly.
    pub fn ensure_server_for_file_default(&mut self, file_path: &Path) -> Vec<ServerKey> {
        self.ensure_server_for_file(file_path, &Config::default())
    }
366    /// Ensure that servers are running for the file and that the document is open
367    /// in each server's DocumentStore. Reads file content from disk if not already open.
368    /// Returns the server keys for the file.
369    pub fn ensure_file_open(
370        &mut self,
371        file_path: &Path,
372        config: &Config,
373    ) -> Result<Vec<ServerKey>, LspError> {
374        let canonical_path = canonicalize_for_lsp(file_path)?;
375        let server_keys = self.ensure_server_for_file(&canonical_path, config);
376        if server_keys.is_empty() {
377            return Ok(server_keys);
378        }
379
380        let uri = uri_for_path(&canonical_path)?;
381        let language_id = language_id_for_extension(
382            canonical_path
383                .extension()
384                .and_then(|ext| ext.to_str())
385                .unwrap_or_default(),
386        )
387        .to_string();
388
389        for key in &server_keys {
390            let already_open = self
391                .documents
392                .get(key)
393                .is_some_and(|store| store.is_open(&canonical_path));
394
395            if !already_open {
396                let content = std::fs::read_to_string(&canonical_path).map_err(LspError::Io)?;
397                if let Some(client) = self.clients.get_mut(key) {
398                    client.send_notification::<DidOpenTextDocument>(DidOpenTextDocumentParams {
399                        text_document: TextDocumentItem::new(
400                            uri.clone(),
401                            language_id.clone(),
402                            0,
403                            content,
404                        ),
405                    })?;
406                }
407                self.documents
408                    .entry(key.clone())
409                    .or_default()
410                    .open(canonical_path.clone());
411                continue;
412            }
413
414            // Document is already open. Check disk drift — if the file has
415            // been modified outside the AFT pipeline (other tool, manual
416            // edit, sibling session) we MUST send a didChange before any
417            // pull-diagnostic / hover query, otherwise the LSP server
418            // returns results computed from stale in-memory content.
419            //
420            // This is the regression fix Oracle flagged in finding #6:
421            // "ensure_file_open skips already-open files without checking
422            // if disk content changed."
423            let drifted = self
424                .documents
425                .get(key)
426                .is_some_and(|store| store.is_stale_on_disk(&canonical_path));
427            if drifted {
428                let content = std::fs::read_to_string(&canonical_path).map_err(LspError::Io)?;
429                let next_version = self
430                    .documents
431                    .get(key)
432                    .and_then(|store| store.version(&canonical_path))
433                    .map(|v| v + 1)
434                    .unwrap_or(1);
435                if let Some(client) = self.clients.get_mut(key) {
436                    client.send_notification::<DidChangeTextDocument>(
437                        DidChangeTextDocumentParams {
438                            text_document: VersionedTextDocumentIdentifier::new(
439                                uri.clone(),
440                                next_version,
441                            ),
442                            content_changes: vec![TextDocumentContentChangeEvent {
443                                range: None,
444                                range_length: None,
445                                text: content,
446                            }],
447                        },
448                    )?;
449                }
450                if let Some(store) = self.documents.get_mut(key) {
451                    store.bump_version(&canonical_path);
452                }
453            }
454        }
455
456        Ok(server_keys)
457    }
458
    /// Same as [`ensure_file_open`], using `Config::default()`.
    pub fn ensure_file_open_default(
        &mut self,
        file_path: &Path,
    ) -> Result<Vec<ServerKey>, LspError> {
        self.ensure_file_open(file_path, &Config::default())
    }
465
466    /// Notify relevant LSP servers that a file has been written/changed.
467    /// This is the main hook called after every file write in AFT.
468    ///
469    /// If the file's server isn't running yet, starts it (lazy spawn).
470    /// If the file isn't open in LSP yet, sends didOpen. Otherwise sends didChange.
471    pub fn notify_file_changed(
472        &mut self,
473        file_path: &Path,
474        content: &str,
475        config: &Config,
476    ) -> Result<(), LspError> {
477        self.notify_file_changed_versioned(file_path, content, config)
478            .map(|_| ())
479    }
480
481    /// Like `notify_file_changed`, but returns the target document version
482    /// per server so the post-edit waiter can match `publishDiagnostics`
483    /// against the exact version that this notification carried.
484    ///
485    /// Returns: `Vec<(ServerKey, target_version)>`. `target_version` is the
486    /// `version` field on the `VersionedTextDocumentIdentifier` we just sent
487    /// (post-bump). For freshly-opened documents (`didOpen`) the version is
488    /// `0`. Servers that don't honor versioned text document sync will not
489    /// echo this back on `publishDiagnostics`; the caller is expected to
490    /// fall back to the epoch-delta path for those.
491    pub fn notify_file_changed_versioned(
492        &mut self,
493        file_path: &Path,
494        content: &str,
495        config: &Config,
496    ) -> Result<Vec<(ServerKey, i32)>, LspError> {
497        let canonical_path = canonicalize_for_lsp(file_path)?;
498        let server_keys = self.ensure_server_for_file(&canonical_path, config);
499        if server_keys.is_empty() {
500            return Ok(Vec::new());
501        }
502
503        let uri = uri_for_path(&canonical_path)?;
504        let language_id = language_id_for_extension(
505            canonical_path
506                .extension()
507                .and_then(|ext| ext.to_str())
508                .unwrap_or_default(),
509        )
510        .to_string();
511
512        let mut versions: Vec<(ServerKey, i32)> = Vec::with_capacity(server_keys.len());
513
514        for key in server_keys {
515            let current_version = self
516                .documents
517                .get(&key)
518                .and_then(|store| store.version(&canonical_path));
519
520            if let Some(version) = current_version {
521                let next_version = version + 1;
522                if let Some(client) = self.clients.get_mut(&key) {
523                    client.send_notification::<DidChangeTextDocument>(
524                        DidChangeTextDocumentParams {
525                            text_document: VersionedTextDocumentIdentifier::new(
526                                uri.clone(),
527                                next_version,
528                            ),
529                            content_changes: vec![TextDocumentContentChangeEvent {
530                                range: None,
531                                range_length: None,
532                                text: content.to_string(),
533                            }],
534                        },
535                    )?;
536                }
537                if let Some(store) = self.documents.get_mut(&key) {
538                    store.bump_version(&canonical_path);
539                }
540                versions.push((key, next_version));
541                continue;
542            }
543
544            if let Some(client) = self.clients.get_mut(&key) {
545                client.send_notification::<DidOpenTextDocument>(DidOpenTextDocumentParams {
546                    text_document: TextDocumentItem::new(
547                        uri.clone(),
548                        language_id.clone(),
549                        0,
550                        content.to_string(),
551                    ),
552                })?;
553            }
554            self.documents
555                .entry(key.clone())
556                .or_default()
557                .open(canonical_path.clone());
558            // didOpen carries version 0 — that's the version the server
559            // will echo on its first publishDiagnostics for this document.
560            versions.push((key, 0));
561        }
562
563        Ok(versions)
564    }
565
    /// Same as [`notify_file_changed`], using `Config::default()`.
    pub fn notify_file_changed_default(
        &mut self,
        file_path: &Path,
        content: &str,
    ) -> Result<(), LspError> {
        self.notify_file_changed(file_path, content, &Config::default())
    }
573
574    /// Notify every active server whose workspace contains at least one changed
575    /// path that watched files changed. This is intentionally workspace-scoped
576    /// rather than extension-scoped: configuration edits such as `package.json`
577    /// or `tsconfig.json` affect a server's project graph even though those
578    /// files may not be documents handled by the server itself.
579    pub fn notify_files_watched_changed(
580        &mut self,
581        paths: &[(PathBuf, FileChangeType)],
582        _config: &Config,
583    ) -> Result<(), LspError> {
584        if paths.is_empty() {
585            return Ok(());
586        }
587
588        let mut canonical_events = Vec::with_capacity(paths.len());
589        for (path, typ) in paths {
590            let canonical_path = resolve_for_lsp_uri(path);
591            canonical_events.push((canonical_path, *typ));
592        }
593
594        let keys: Vec<ServerKey> = self.clients.keys().cloned().collect();
595        for key in keys {
596            let mut changes = Vec::new();
597            for (path, typ) in &canonical_events {
598                if !path.starts_with(&key.root) {
599                    continue;
600                }
601                changes.push(FileEvent::new(uri_for_path(path)?, *typ));
602            }
603
604            if changes.is_empty() {
605                continue;
606            }
607
608            if let Some(client) = self.clients.get_mut(&key) {
609                // Send when the server either advertised initialize-time
610                // watched-file support or dynamically registered a watcher.
611                // The dynamic client capability we send during initialize only
612                // permits runtime registration; it is tracked separately via
613                // `has_watched_file_registration()`.
614                let supports_static_watched_files = client.supports_watched_files();
615                let has_dynamic_registration = client.has_watched_file_registration();
616                if !(supports_static_watched_files || has_dynamic_registration) {
617                    if self.watched_file_skip_logged.insert(key.clone()) {
618                        log::debug!(
619                            "skipping didChangeWatchedFiles for {:?} (not supported or registered)",
620                            key
621                        );
622                    }
623                    continue;
624                }
625                client.send_notification::<DidChangeWatchedFiles>(DidChangeWatchedFilesParams {
626                    changes,
627                })?;
628            }
629        }
630
631        Ok(())
632    }
633
634    /// Close a document in all servers that have it open.
635    pub fn notify_file_closed(&mut self, file_path: &Path) -> Result<(), LspError> {
636        let canonical_path = canonicalize_for_lsp(file_path)?;
637        let uri = uri_for_path(&canonical_path)?;
638        let keys: Vec<ServerKey> = self.documents.keys().cloned().collect();
639
640        for key in keys {
641            let was_open = self
642                .documents
643                .get(&key)
644                .map(|store| store.is_open(&canonical_path))
645                .unwrap_or(false);
646            if !was_open {
647                continue;
648            }
649
650            if let Some(client) = self.clients.get_mut(&key) {
651                client.send_notification::<DidCloseTextDocument>(DidCloseTextDocumentParams {
652                    text_document: TextDocumentIdentifier::new(uri.clone()),
653                })?;
654            }
655
656            if let Some(store) = self.documents.get_mut(&key) {
657                store.close(&canonical_path);
658            }
659            self.diagnostics
660                .clear_for_server_file(&key, &canonical_path);
661        }
662
663        Ok(())
664    }
665
    /// Get an active client for a file path, if one exists.
    ///
    /// Returns `None` when `server_key_for_file` yields no key for the file
    /// or when no client is registered under that key. Never spawns a server.
    pub fn client_for_file(&self, file_path: &Path, config: &Config) -> Option<&LspClient> {
        let key = self.server_key_for_file(file_path, config)?;
        self.clients.get(&key)
    }
671
    /// Same as [`client_for_file`], using `Config::default()`.
    pub fn client_for_file_default(&self, file_path: &Path) -> Option<&LspClient> {
        self.client_for_file(file_path, &Config::default())
    }
675
    /// Get a mutable active client for a file path, if one exists.
    ///
    /// Returns `None` when `server_key_for_file` yields no key for the file
    /// or when no client is registered under that key.
    pub fn client_for_file_mut(
        &mut self,
        file_path: &Path,
        config: &Config,
    ) -> Option<&mut LspClient> {
        let key = self.server_key_for_file(file_path, config)?;
        self.clients.get_mut(&key)
    }
685
    /// Same as [`client_for_file_mut`], using `Config::default()`.
    pub fn client_for_file_mut_default(&mut self, file_path: &Path) -> Option<&mut LspClient> {
        self.client_for_file_mut(file_path, &Config::default())
    }
689
    /// Number of tracked server clients. Returns the same value as
    /// `server_count`.
    pub fn active_client_count(&self) -> usize {
        self.clients.len()
    }
694
695    /// Drain all pending LSP events. Call from the main loop.
696    pub fn drain_events(&mut self) -> Vec<LspEvent> {
697        let mut events = Vec::new();
698        while let Ok(event) = self.event_rx.try_recv() {
699            self.handle_event(&event);
700            events.push(event);
701        }
702        events
703    }
704
    /// Wait for diagnostics to arrive for a specific file until a timeout expires.
    ///
    /// Converts `timeout` into an absolute deadline (now + timeout) and
    /// delegates to `wait_for_file_diagnostics`.
    pub fn wait_for_diagnostics(
        &mut self,
        file_path: &Path,
        config: &Config,
        timeout: std::time::Duration,
    ) -> Vec<StoredDiagnostic> {
        let deadline = std::time::Instant::now() + timeout;
        self.wait_for_file_diagnostics(file_path, config, deadline)
    }
715
    /// Same as [`wait_for_diagnostics`], using `Config::default()`.
    pub fn wait_for_diagnostics_default(
        &mut self,
        file_path: &Path,
        timeout: std::time::Duration,
    ) -> Vec<StoredDiagnostic> {
        self.wait_for_diagnostics(file_path, &Config::default(), timeout)
    }
723
    /// Test-only accessor for the diagnostics store. Used by integration
    /// tests that need to inspect per-server entries (e.g., to verify that
    /// `ServerKey::root` is populated correctly, not the empty path that
    /// the legacy `publish_with_kind` path produced).
    ///
    /// Hidden from generated documentation; returns a shared borrow only.
    #[doc(hidden)]
    pub fn diagnostics_store_for_test(&self) -> &DiagnosticsStore {
        &self.diagnostics
    }
732
733    /// Snapshot the current per-server epoch for every entry that exists
734    /// for `file_path`. Servers without an entry yet (never published)
735    /// are absent from the map; for those, `pre = 0` (any first publish
736    /// will be considered fresh under the epoch-fallback rule).
737    pub fn snapshot_diagnostic_epochs(&self, file_path: &Path) -> HashMap<ServerKey, u64> {
738        let lookup_path = normalize_lookup_path(file_path);
739        self.diagnostics
740            .entries_for_file(&lookup_path)
741            .into_iter()
742            .map(|(key, entry)| (key.clone(), entry.epoch))
743            .collect()
744    }
745
746    /// Snapshot the current diagnostic epoch and document version for every
747    /// active server relevant to `file_path` before a post-edit notification.
748    pub fn snapshot_pre_edit_state(&self, file_path: &Path) -> HashMap<ServerKey, PreEditSnapshot> {
749        let lookup_path = normalize_lookup_path(file_path);
750        let mut snapshots: HashMap<ServerKey, PreEditSnapshot> = self
751            .diagnostics
752            .entries_for_file(&lookup_path)
753            .into_iter()
754            .map(|(key, entry)| {
755                (
756                    key.clone(),
757                    PreEditSnapshot {
758                        epoch: entry.epoch,
759                        document_version_at_capture: None,
760                    },
761                )
762            })
763            .collect();
764
765        for (key, store) in &self.documents {
766            if let Some(version) = store.version(&lookup_path) {
767                snapshots
768                    .entry(key.clone())
769                    .or_default()
770                    .document_version_at_capture = Some(version);
771            }
772        }
773
774        snapshots
775    }
776
777    /// True when the current diagnostic entry for `server_key` can be tied to
778    /// that server's current in-memory document version for `file_path`.
779    ///
780    /// File-mode `lsp_diagnostics` uses this for push-only fallback after it
781    /// has synced/opened the document. Versioned publishes are accepted when
782    /// they match the current document version; unversioned publishes are not
783    /// accepted as fresh because epoch/wall-clock ordering alone is racy.
784    pub fn diagnostic_entry_is_fresh_for_document(
785        &self,
786        file_path: &Path,
787        server_key: &ServerKey,
788        pre: PreEditSnapshot,
789    ) -> bool {
790        let lookup_path = normalize_lookup_path(file_path);
791        let Some(entry) = self
792            .diagnostics
793            .entries_for_file(&lookup_path)
794            .into_iter()
795            .find_map(|(key, entry)| if key == server_key { Some(entry) } else { None })
796        else {
797            return false;
798        };
799
800        let target_version = self
801            .documents
802            .get(server_key)
803            .and_then(|store| store.version(&lookup_path))
804            .or(pre.document_version_at_capture)
805            .unwrap_or(0);
806
807        matches!(entry.version, Some(version) if version >= target_version)
808    }
809
810    /// Wait for FRESH per-server diagnostics that match the just-sent
811    /// document version. This is the v0.17.3 post-edit path that fixes the
812    /// stale-diagnostics bug: instead of returning whatever is in the cache
813    /// when the deadline hits, we only return entries whose `version`
814    /// matches the post-edit target version (or, for servers that don't
815    /// participate in versioned sync, whose `epoch` was bumped after the
816    /// pre-edit snapshot).
817    ///
818    /// `expected_versions` should come from `notify_file_changed_versioned`
819    /// — one `(ServerKey, target_version)` per server we sent didChange/
820    /// didOpen to.
821    ///
822    /// `pre_snapshot` is the per-server epoch BEFORE the notification was
823    /// sent; it gates the epoch-fallback path so an old-version publish
824    /// arriving after `drain_events` and before `didChange` cannot be
825    /// mistaken for a fresh response.
826    ///
827    /// Returns a per-server tri-state: `Fresh` (publish matched target
828    /// version OR epoch advanced past snapshot for an unversioned server),
829    /// `Pending` (deadline hit before this server published anything we
830    /// could verify), or `Exited` (server died between notification and
831    /// deadline).
832    pub fn wait_for_post_edit_diagnostics(
833        &mut self,
834        file_path: &Path,
835        // `config` is intentionally accepted (matches sibling wait APIs and
836        // future-proofs us if freshness rules need it). Currently unused
837        // because expected_versions/pre_snapshot fully determine behavior.
838        _config: &Config,
839        expected_versions: &[(ServerKey, i32)],
840        pre_snapshot: &HashMap<ServerKey, PreEditSnapshot>,
841        timeout: std::time::Duration,
842    ) -> PostEditWaitOutcome {
843        let lookup_path = normalize_lookup_path(file_path);
844        let deadline = std::time::Instant::now() + timeout;
845
846        // Drain any events that arrived while we were sending didChange.
847        // The publishDiagnostics handler stores the version, so even
848        // pre-snapshot publishes that landed late won't be mistaken for
849        // fresh — the version-match check will reject them.
850        let _ = self.drain_events_for_file(&lookup_path);
851
852        let mut fresh: HashMap<ServerKey, Vec<StoredDiagnostic>> = HashMap::new();
853        let mut exited: Vec<ServerKey> = Vec::new();
854
855        loop {
856            // Check freshness for every expected server. A server is fresh
857            // if its current entry for this file satisfies either:
858            //   1. version-match: entry.version == Some(target_version), OR
859            //   2. push-only freshness: entry.version is None AND entry.epoch
860            //      advanced strictly after the pre-edit snapshot. Versioned
861            //      publishes must be >= the post-edit target version.
862            // Servers whose process has exited are reported separately.
863            for (key, target_version) in expected_versions {
864                if fresh.contains_key(key) || exited.contains(key) {
865                    continue;
866                }
867                if !self.clients.contains_key(key) {
868                    exited.push(key.clone());
869                    continue;
870                }
871                if let Some(entry) = self
872                    .diagnostics
873                    .entries_for_file(&lookup_path)
874                    .into_iter()
875                    .find_map(|(k, e)| if k == key { Some(e) } else { None })
876                {
877                    let pre = pre_snapshot.get(key).copied().unwrap_or_default();
878                    let is_fresh = post_edit_entry_is_fresh(entry, *target_version, pre);
879                    if is_fresh {
880                        fresh.insert(key.clone(), entry.diagnostics.clone());
881                    }
882                }
883            }
884
885            // All accounted for? Done.
886            if fresh.len() + exited.len() == expected_versions.len() {
887                break;
888            }
889
890            let now = std::time::Instant::now();
891            if now >= deadline {
892                break;
893            }
894
895            let timeout = deadline.saturating_duration_since(now);
896            match self.event_rx.recv_timeout(timeout) {
897                Ok(event) => {
898                    self.handle_event(&event);
899                }
900                Err(RecvTimeoutError::Timeout) | Err(RecvTimeoutError::Disconnected) => break,
901            }
902        }
903
904        // Pending = expected but neither fresh nor exited.
905        let pending: Vec<ServerKey> = expected_versions
906            .iter()
907            .filter(|(k, _)| !fresh.contains_key(k) && !exited.contains(k))
908            .map(|(k, _)| k.clone())
909            .collect();
910
911        // Build deduplicated, sorted diagnostics from the fresh servers only.
912        // Stale or pending servers contribute zero diagnostics.
913        let mut diagnostics: Vec<StoredDiagnostic> = fresh
914            .into_iter()
915            .flat_map(|(_, diags)| diags.into_iter())
916            .collect();
917        diagnostics.sort_by(|a, b| {
918            a.file
919                .cmp(&b.file)
920                .then(a.line.cmp(&b.line))
921                .then(a.column.cmp(&b.column))
922                .then(a.message.cmp(&b.message))
923        });
924
925        PostEditWaitOutcome {
926            diagnostics,
927            pending_servers: pending,
928            exited_servers: exited,
929        }
930    }
931
932    /// Wait for diagnostics to arrive for a specific file until a deadline.
933    ///
934    /// Drains already-queued events first, then blocks on the shared event
935    /// channel only until either `publishDiagnostics` arrives for this file or
936    /// the deadline is reached.
937    pub fn wait_for_file_diagnostics(
938        &mut self,
939        file_path: &Path,
940        config: &Config,
941        deadline: std::time::Instant,
942    ) -> Vec<StoredDiagnostic> {
943        let lookup_path = normalize_lookup_path(file_path);
944
945        if self.server_key_for_file(&lookup_path, config).is_none() {
946            return Vec::new();
947        }
948
949        loop {
950            if self.drain_events_for_file(&lookup_path) {
951                break;
952            }
953
954            let now = std::time::Instant::now();
955            if now >= deadline {
956                break;
957            }
958
959            let timeout = deadline.saturating_duration_since(now);
960            match self.event_rx.recv_timeout(timeout) {
961                Ok(event) => {
962                    if matches!(
963                        self.handle_event(&event),
964                        Some(ref published_file) if published_file.as_path() == lookup_path.as_path()
965                    ) {
966                        break;
967                    }
968                }
969                Err(RecvTimeoutError::Timeout) | Err(RecvTimeoutError::Disconnected) => break,
970            }
971        }
972
973        self.get_diagnostics_for_file(&lookup_path)
974            .into_iter()
975            .cloned()
976            .collect()
977    }
978
979    /// Default timeout for `textDocument/diagnostic` (per-file pull). Servers
980    /// usually respond in under 1s for files they've already analyzed; we
981    /// allow up to 10s before falling back to push semantics. Currently
982    /// surfaced via [`Self::pull_file_timeout`] for callers that want to
983    /// override the wait via the `wait_ms` knob.
984    pub const PULL_FILE_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);
985
986    /// Public accessor so command handlers can reuse the documented default.
987    pub fn pull_file_timeout() -> std::time::Duration {
988        Self::PULL_FILE_TIMEOUT
989    }
990
991    /// Default timeout for `workspace/diagnostic`. The LSP spec allows the
992    /// server to hold this open indefinitely; we cap at 10s and report
993    /// `complete: false` to the agent rather than hanging the bridge.
994    const PULL_WORKSPACE_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);
995
996    /// Issue a `textDocument/diagnostic` (LSP 3.17 per-file pull) request to
997    /// every server that supports pull diagnostics for the given file.
998    ///
999    /// Returns the per-server outcome. If a server reports `kind: "unchanged"`,
1000    /// the cached entry's diagnostics are surfaced (deterministic re-use of
1001    /// the previous response). If a server doesn't advertise pull capability,
1002    /// it's skipped here — the caller should fall back to push for those.
1003    ///
1004    /// Side effects: results are stored in `DiagnosticsStore` so directory-mode
1005    /// queries can aggregate them later.
1006    pub fn pull_file_diagnostics(
1007        &mut self,
1008        file_path: &Path,
1009        config: &Config,
1010    ) -> Result<Vec<PullFileResult>, LspError> {
1011        let canonical_path = canonicalize_for_lsp(file_path)?;
1012        // Make sure servers are running and the document is open with fresh
1013        // content (handles disk-drift via DocumentStore::is_stale_on_disk).
1014        self.ensure_file_open(&canonical_path, config)?;
1015
1016        let server_keys = self.ensure_server_for_file(&canonical_path, config);
1017        if server_keys.is_empty() {
1018            return Ok(Vec::new());
1019        }
1020
1021        let uri = uri_for_path(&canonical_path)?;
1022        let mut results = Vec::with_capacity(server_keys.len());
1023
1024        for key in server_keys {
1025            let supports_pull = self
1026                .clients
1027                .get(&key)
1028                .and_then(|c| c.diagnostic_capabilities())
1029                .is_some_and(|caps| caps.pull_diagnostics);
1030
1031            if !supports_pull {
1032                results.push(PullFileResult {
1033                    server_key: key.clone(),
1034                    outcome: PullFileOutcome::PullNotSupported,
1035                });
1036                continue;
1037            }
1038
1039            // Look up previous resultId for incremental requests.
1040            let previous_result_id = self
1041                .diagnostics
1042                .entries_for_file(&canonical_path)
1043                .into_iter()
1044                .find(|(k, _)| **k == key)
1045                .and_then(|(_, entry)| entry.result_id.clone());
1046
1047            let identifier = self
1048                .clients
1049                .get(&key)
1050                .and_then(|c| c.diagnostic_capabilities())
1051                .and_then(|caps| caps.identifier.clone());
1052
1053            let params = lsp_types::DocumentDiagnosticParams {
1054                text_document: lsp_types::TextDocumentIdentifier { uri: uri.clone() },
1055                identifier,
1056                previous_result_id,
1057                work_done_progress_params: Default::default(),
1058                partial_result_params: Default::default(),
1059            };
1060
1061            let outcome = match self.send_pull_request(&key, params) {
1062                Ok(report) => self.ingest_document_report(&key, &canonical_path, report),
1063                Err(err) => {
1064                    if let Some(result) = self.cache_post_initialize_exit(&key, &err) {
1065                        PullFileOutcome::RequestFailed {
1066                            reason: server_attempt_result_reason(&result),
1067                        }
1068                    } else if recoverable_pull_rejection(&err)
1069                        && self.clients.get(&key).is_some_and(|client| {
1070                            matches!(
1071                                client.state(),
1072                                ServerState::Ready | ServerState::Initializing
1073                            )
1074                        })
1075                    {
1076                        PullFileOutcome::RequestFailed {
1077                            reason: format!("pull_rejected_push_fallback: {err}"),
1078                        }
1079                    } else {
1080                        PullFileOutcome::RequestFailed {
1081                            reason: err.to_string(),
1082                        }
1083                    }
1084                }
1085            };
1086
1087            results.push(PullFileResult {
1088                server_key: key,
1089                outcome,
1090            });
1091        }
1092
1093        Ok(results)
1094    }
1095
1096    /// Issue a `workspace/diagnostic` request to a specific server. Cancels
1097    /// internally if `timeout` elapses before the server responds. Cached
1098    /// entries from the response are stored so directory-mode queries pick
1099    /// them up.
1100    pub fn pull_workspace_diagnostics(
1101        &mut self,
1102        server_key: &ServerKey,
1103        timeout: Option<std::time::Duration>,
1104    ) -> Result<PullWorkspaceResult, LspError> {
1105        let timeout = timeout.unwrap_or(Self::PULL_WORKSPACE_TIMEOUT);
1106
1107        let supports_workspace = self
1108            .clients
1109            .get(server_key)
1110            .and_then(|c| c.diagnostic_capabilities())
1111            .is_some_and(|caps| caps.workspace_diagnostics);
1112
1113        if !supports_workspace {
1114            return Ok(PullWorkspaceResult {
1115                server_key: server_key.clone(),
1116                files_reported: Vec::new(),
1117                complete: false,
1118                cancelled: false,
1119                supports_workspace: false,
1120            });
1121        }
1122
1123        let identifier = self
1124            .clients
1125            .get(server_key)
1126            .and_then(|c| c.diagnostic_capabilities())
1127            .and_then(|caps| caps.identifier.clone());
1128
1129        let params = lsp_types::WorkspaceDiagnosticParams {
1130            identifier,
1131            previous_result_ids: Vec::new(),
1132            work_done_progress_params: Default::default(),
1133            partial_result_params: Default::default(),
1134        };
1135
1136        let result = match self
1137            .clients
1138            .get_mut(server_key)
1139            .ok_or_else(|| LspError::ServerNotReady("server not found".into()))?
1140            .send_request_with_timeout::<lsp_types::request::WorkspaceDiagnosticRequest>(
1141                params, timeout,
1142            ) {
1143            Ok(result) => result,
1144            Err(LspError::Timeout(_)) => {
1145                return Ok(PullWorkspaceResult {
1146                    server_key: server_key.clone(),
1147                    files_reported: Vec::new(),
1148                    complete: false,
1149                    cancelled: true,
1150                    supports_workspace: true,
1151                });
1152            }
1153            Err(err) => {
1154                if let Some(result) = self.cache_post_initialize_exit(server_key, &err) {
1155                    return Err(LspError::ServerNotReady(server_attempt_result_reason(
1156                        &result,
1157                    )));
1158                }
1159                return Err(err);
1160            }
1161        };
1162
1163        // Extract the items list. Partial responses are not a complete
1164        // workspace view, but the partial payload can still contain useful
1165        // document reports; ingest those while surfacing complete=false.
1166        let (items, complete) = match result {
1167            lsp_types::WorkspaceDiagnosticReportResult::Report(report) => (report.items, true),
1168            lsp_types::WorkspaceDiagnosticReportResult::Partial(partial) => (partial.items, false),
1169        };
1170
1171        // Ingest each file report into the diagnostics store.
1172        let mut files_reported = Vec::with_capacity(items.len());
1173        for item in items {
1174            match item {
1175                lsp_types::WorkspaceDocumentDiagnosticReport::Full(full) => {
1176                    if let Some(file) = uri_to_path(&full.uri) {
1177                        let stored = from_lsp_diagnostics(
1178                            file.clone(),
1179                            full.full_document_diagnostic_report.items.clone(),
1180                        );
1181                        self.diagnostics.publish_with_result_id(
1182                            server_key.clone(),
1183                            file.clone(),
1184                            stored,
1185                            full.full_document_diagnostic_report.result_id.clone(),
1186                        );
1187                        files_reported.push(file);
1188                    }
1189                }
1190                lsp_types::WorkspaceDocumentDiagnosticReport::Unchanged(_unchanged) => {
1191                    // "Unchanged" means the previously cached report is still
1192                    // valid. We left it in place; nothing to do.
1193                }
1194            }
1195        }
1196
1197        Ok(PullWorkspaceResult {
1198            server_key: server_key.clone(),
1199            files_reported,
1200            complete,
1201            cancelled: false,
1202            supports_workspace: true,
1203        })
1204    }
1205
1206    fn cache_post_initialize_exit(
1207        &mut self,
1208        key: &ServerKey,
1209        err: &LspError,
1210    ) -> Option<ServerAttemptResult> {
1211        let binary = self
1212            .server_binaries
1213            .get(key)
1214            .cloned()
1215            .unwrap_or_else(|| key.kind.id_str().to_string());
1216        let (status, stderr_tail) = {
1217            let client = self.clients.get_mut(key)?;
1218            let mut status = client.child_exit_status();
1219            for _ in 0..10 {
1220                if status.is_some() {
1221                    break;
1222                }
1223                std::thread::sleep(std::time::Duration::from_millis(10));
1224                status = client.child_exit_status();
1225            }
1226            let status = status?;
1227            wait_for_stderr_tail(client);
1228            (status, client.stderr_tail())
1229        };
1230        let reason = format_post_initialize_exit_reason(&binary, status, &stderr_tail, err);
1231        let result = ServerAttemptResult::SpawnFailed { binary, reason };
1232        self.clients.remove(key);
1233        self.server_binaries.remove(key);
1234        self.documents.remove(key);
1235        self.diagnostics.clear_for_server(key);
1236        self.failed_spawns.insert(key.clone(), result.clone());
1237        Some(result)
1238    }
1239
1240    /// Issue the per-file diagnostic request and return the report.
1241    fn send_pull_request(
1242        &mut self,
1243        key: &ServerKey,
1244        params: lsp_types::DocumentDiagnosticParams,
1245    ) -> Result<lsp_types::DocumentDiagnosticReportResult, LspError> {
1246        let client = self
1247            .clients
1248            .get_mut(key)
1249            .ok_or_else(|| LspError::ServerNotReady("server not found".into()))?;
1250        client.send_request::<lsp_types::request::DocumentDiagnosticRequest>(params)
1251    }
1252
1253    /// Store the result of a per-file pull request and return a structured
1254    /// outcome the caller can inspect.
1255    fn ingest_document_report(
1256        &mut self,
1257        key: &ServerKey,
1258        canonical_path: &Path,
1259        result: lsp_types::DocumentDiagnosticReportResult,
1260    ) -> PullFileOutcome {
1261        let report = match result {
1262            lsp_types::DocumentDiagnosticReportResult::Report(report) => report,
1263            lsp_types::DocumentDiagnosticReportResult::Partial(_) => {
1264                // Partial results stream in via $/progress notifications which
1265                // we don't currently subscribe to. Treat as a soft-empty
1266                // success — the next pull will get the full version.
1267                return PullFileOutcome::PartialNotSupported;
1268            }
1269        };
1270
1271        match report {
1272            lsp_types::DocumentDiagnosticReport::Full(full) => {
1273                let result_id = full.full_document_diagnostic_report.result_id.clone();
1274                let stored = from_lsp_diagnostics(
1275                    canonical_path.to_path_buf(),
1276                    full.full_document_diagnostic_report.items.clone(),
1277                );
1278                let count = stored.len();
1279                self.diagnostics.publish_with_result_id(
1280                    key.clone(),
1281                    canonical_path.to_path_buf(),
1282                    stored,
1283                    result_id,
1284                );
1285                PullFileOutcome::Full {
1286                    diagnostic_count: count,
1287                }
1288            }
1289            lsp_types::DocumentDiagnosticReport::Unchanged(_unchanged) => {
1290                // The server says cache is still valid. That is only usable if
1291                // we already have a report for this exact server/file; an
1292                // initial `unchanged` response cannot prove freshness.
1293                if self
1294                    .diagnostics
1295                    .has_report_for_server_file(key, canonical_path)
1296                {
1297                    PullFileOutcome::Unchanged
1298                } else {
1299                    PullFileOutcome::RequestFailed {
1300                        reason: "no_cache_for_unchanged".to_string(),
1301                    }
1302                }
1303            }
1304        }
1305    }
1306
1307    /// Shutdown all servers gracefully.
1308    pub fn shutdown_all(&mut self) {
1309        for (key, mut client) in self.clients.drain() {
1310            if let Err(err) = client.shutdown() {
1311                slog_error!("error shutting down {:?}: {}", key, err);
1312            }
1313        }
1314        self.server_binaries.clear();
1315        self.documents.clear();
1316        self.diagnostics = DiagnosticsStore::new();
1317    }
1318
1319    /// Check if any server is active.
1320    pub fn has_active_servers(&self) -> bool {
1321        self.clients
1322            .values()
1323            .any(|client| client.state() == ServerState::Ready)
1324    }
1325
1326    /// Active server keys (running clients). Used by `lsp_diagnostics`
1327    /// directory mode to know which servers to ask for workspace pull.
1328    pub fn active_server_keys(&self) -> Vec<ServerKey> {
1329        self.clients.keys().cloned().collect()
1330    }
1331
1332    pub fn get_diagnostics_for_file(&self, file: &Path) -> Vec<&StoredDiagnostic> {
1333        let normalized = normalize_lookup_path(file);
1334        self.diagnostics.for_file(&normalized)
1335    }
1336
1337    pub fn get_diagnostics_for_directory(&self, dir: &Path) -> Vec<&StoredDiagnostic> {
1338        let normalized = normalize_lookup_path(dir);
1339        self.diagnostics.for_directory(&normalized)
1340    }
1341
1342    pub fn get_all_diagnostics(&self) -> Vec<&StoredDiagnostic> {
1343        self.diagnostics.all()
1344    }
1345
1346    fn drain_events_for_file(&mut self, file_path: &Path) -> bool {
1347        let mut saw_file_diagnostics = false;
1348        while let Ok(event) = self.event_rx.try_recv() {
1349            if matches!(
1350                self.handle_event(&event),
1351                Some(ref published_file) if published_file.as_path() == file_path
1352            ) {
1353                saw_file_diagnostics = true;
1354            }
1355        }
1356        saw_file_diagnostics
1357    }
1358
1359    fn handle_event(&mut self, event: &LspEvent) -> Option<PathBuf> {
1360        match event {
1361            LspEvent::Notification {
1362                server_kind,
1363                root,
1364                method,
1365                params: Some(params),
1366            } if method == "textDocument/publishDiagnostics" => {
1367                self.handle_publish_diagnostics(server_kind.clone(), root.clone(), params)
1368            }
1369            LspEvent::ServerExited { server_kind, root } => {
1370                let key = ServerKey {
1371                    kind: server_kind.clone(),
1372                    root: root.clone(),
1373                };
1374                self.clients.remove(&key);
1375                self.server_binaries.remove(&key);
1376                self.documents.remove(&key);
1377                self.diagnostics.clear_for_server(&key);
1378                None
1379            }
1380            _ => None,
1381        }
1382    }
1383
1384    fn handle_publish_diagnostics(
1385        &mut self,
1386        server: ServerKind,
1387        root: PathBuf,
1388        params: &serde_json::Value,
1389    ) -> Option<PathBuf> {
1390        if let Ok(publish_params) =
1391            serde_json::from_value::<lsp_types::PublishDiagnosticsParams>(params.clone())
1392        {
1393            let file = uri_to_path(&publish_params.uri)?;
1394            let stored = from_lsp_diagnostics(file.clone(), publish_params.diagnostics);
1395            // v0.17.3: store with real ServerKey { kind, root } and capture
1396            // the document `version` (when the server provided one) so the
1397            // post-edit waiter can reject stale publishes deterministically
1398            // via version-match (preferred) or epoch-delta (fallback). The
1399            // earlier `publish_with_kind` path silently dropped both.
1400            let key = ServerKey { kind: server, root };
1401            self.diagnostics
1402                .publish_full(key, file.clone(), stored, None, publish_params.version);
1403            return Some(file);
1404        }
1405        None
1406    }
1407
1408    fn spawn_server(
1409        &self,
1410        def: &ServerDef,
1411        root: &Path,
1412        config: &Config,
1413    ) -> Result<LspClient, LspError> {
1414        let binary = self.resolve_binary(def, config)?;
1415
1416        // Merge the server-defined env with our test-injected env.
1417        // `extra_env` is empty in production; tests use it to drive fake
1418        // server variants (AFT_FAKE_LSP_PULL=1, etc.).
1419        let mut merged_env = def.env.clone();
1420        for (key, value) in &self.extra_env {
1421            merged_env.insert(key.clone(), value.clone());
1422        }
1423
1424        let mut client = LspClient::spawn(
1425            def.kind.clone(),
1426            root.to_path_buf(),
1427            &binary,
1428            &def.args,
1429            &merged_env,
1430            self.event_tx.clone(),
1431            self.child_registry.clone(),
1432        )?;
1433        if let Err(err) = client.initialize(root, def.initialization_options.clone()) {
1434            wait_for_stderr_tail(&mut client);
1435            let stderr_tail = client.stderr_tail();
1436            let reason = if client.child_exited() || !stderr_tail.is_empty() {
1437                format_initialize_failure_reason(&def.binary, &stderr_tail, &err)
1438            } else {
1439                format!("server failed during initialize: {err}")
1440            };
1441            return Err(LspError::ServerNotReady(reason));
1442        }
1443        Ok(client)
1444    }
1445
1446    fn resolve_binary(&self, def: &ServerDef, config: &Config) -> Result<PathBuf, LspError> {
1447        if let Some(path) = self.binary_overrides.get(&def.kind) {
1448            if path.exists() {
1449                return Ok(path.clone());
1450            }
1451            return Err(LspError::NotFound(format!(
1452                "override binary for {:?} not found: {}",
1453                def.kind,
1454                path.display()
1455            )));
1456        }
1457
1458        if let Some(path) = env_binary_override(&def.kind) {
1459            if path.exists() {
1460                return Ok(path);
1461            }
1462            return Err(LspError::NotFound(format!(
1463                "environment override binary for {:?} not found: {}",
1464                def.kind,
1465                path.display()
1466            )));
1467        }
1468
1469        // Layered resolution:
1470        //   1. <project_root>/node_modules/.bin/<binary>
1471        //   2. config.lsp_paths_extra (plugin auto-install cache, etc.)
1472        //   3. PATH via `which`
1473        resolve_lsp_binary(
1474            &def.binary,
1475            config.project_root.as_deref(),
1476            &config.lsp_paths_extra,
1477        )
1478        .ok_or_else(|| {
1479            LspError::NotFound(format!(
1480                "language server binary '{}' not found in node_modules/.bin, lsp_paths_extra, or PATH",
1481                def.binary
1482            ))
1483        })
1484    }
1485
1486    fn server_key_for_file(&self, file_path: &Path, config: &Config) -> Option<ServerKey> {
1487        for def in servers_for_file(file_path, config) {
1488            let root = find_workspace_root(file_path, &def.root_markers)?;
1489            let key = ServerKey {
1490                kind: def.kind.clone(),
1491                root,
1492            };
1493            if self.clients.contains_key(&key) {
1494                return Some(key);
1495            }
1496        }
1497        None
1498    }
1499}
1500
1501impl Default for LspManager {
1502    fn default() -> Self {
1503        Self::new()
1504    }
1505}
1506
1507fn wait_for_stderr_tail(client: &mut LspClient) {
1508    for _ in 0..10 {
1509        if !client.stderr_tail().is_empty() {
1510            break;
1511        }
1512        std::thread::sleep(std::time::Duration::from_millis(10));
1513    }
1514}
1515
1516fn recoverable_pull_rejection(err: &LspError) -> bool {
1517    matches!(
1518        err,
1519        LspError::ServerError {
1520            code: -32601 | -32602,
1521            ..
1522        }
1523    )
1524}
1525
1526fn server_attempt_result_reason(result: &ServerAttemptResult) -> String {
1527    match result {
1528        ServerAttemptResult::SpawnFailed { binary, reason } => {
1529            format!("spawn_failed: {binary} ({reason})")
1530        }
1531        ServerAttemptResult::BinaryNotInstalled { binary } => {
1532            format!("binary_not_installed: {binary}")
1533        }
1534        ServerAttemptResult::NoRootMarker { looked_for } => {
1535            format!("no_root_marker (looked for: {})", looked_for.join(", "))
1536        }
1537        ServerAttemptResult::Ok { .. } => "ok".to_string(),
1538    }
1539}
1540
1541fn format_stderr_tail_for_reason(stderr_tail: &str) -> String {
1542    truncate_stderr_tail_for_reason(stderr_tail)
1543        .lines()
1544        .map(|line| format!("  {line}"))
1545        .collect::<Vec<_>>()
1546        .join("\n")
1547}
1548
1549fn truncate_stderr_tail_for_reason(stderr_tail: &str) -> String {
1550    if stderr_tail.len() <= STDERR_REASON_BYTES {
1551        return stderr_tail.to_string();
1552    }
1553
1554    let ellipsis = "...";
1555    let target_len = STDERR_REASON_BYTES.saturating_sub(ellipsis.len());
1556    let mut start = stderr_tail.len() - target_len;
1557    while start < stderr_tail.len() && !stderr_tail.is_char_boundary(start) {
1558        start += 1;
1559    }
1560    format!("{ellipsis}{}", &stderr_tail[start..])
1561}
1562
1563fn format_initialize_failure_reason(binary: &str, stderr_tail: &str, err: &LspError) -> String {
1564    let mut reason = format!("server crashed during initialize: {err}");
1565    if !stderr_tail.is_empty() {
1566        reason.push_str("; stderr (last 64 lines):\n");
1567        reason.push_str(&format_stderr_tail_for_reason(stderr_tail));
1568        reason.push_str("\n\n");
1569        reason.push_str(&failure_hint(binary, stderr_tail));
1570    }
1571    reason
1572}
1573
1574fn format_post_initialize_exit_reason(
1575    binary: &str,
1576    status: std::process::ExitStatus,
1577    stderr_tail: &str,
1578    err: &LspError,
1579) -> String {
1580    let code = status
1581        .code()
1582        .map(|c| c.to_string())
1583        .unwrap_or_else(|| "signal/unknown".to_string());
1584    let mut reason = format!("server exited after initialize (code {code}): {err}");
1585    if !stderr_tail.is_empty() {
1586        reason.push_str("; stderr (last 64 lines):\n");
1587        reason.push_str(&format_stderr_tail_for_reason(stderr_tail));
1588        reason.push_str("\n\n");
1589        reason.push_str(&failure_hint(binary, stderr_tail));
1590    }
1591    reason
1592}
1593
1594fn failure_hint(binary: &str, stderr_tail: &str) -> String {
1595    if stderr_tail.contains("MODULE_NOT_FOUND") || stderr_tail.contains("Cannot find module") {
1596        let package_manager = infer_package_manager(stderr_tail);
1597        format!(
1598            "Your package-manager shim resolves to a missing file. Try reinstalling: {package_manager} install -g {binary} --force. Common cause: hard-link breakage from fs migration or store prune."
1599        )
1600    } else {
1601        format!("Hint: see stderr above for '{binary}' failure details.")
1602    }
1603}
1604
/// Guess which Node package manager installed the failing binary from the
/// text in `stderr_tail`.
///
/// Matching is ASCII case-insensitive and checked in this order:
/// - `"pnpm"` when the text contains `.pnpm/`, `.pnpm\` or `/pnpm/`;
/// - `"yarn"` when the text contains `yarn` anywhere;
/// - `"npm"` otherwise.
fn infer_package_manager(stderr_tail: &str) -> &'static str {
    const PNPM_MARKERS: [&str; 3] = [".pnpm/", ".pnpm\\", "/pnpm/"];

    let haystack = stderr_tail.to_ascii_lowercase();
    if PNPM_MARKERS.iter().any(|marker| haystack.contains(*marker)) {
        return "pnpm";
    }
    // The path-shaped yarn markers (`.yarn/`, `.yarn\`, `/yarn/`) all contain
    // the bare word, so a single substring test covers every case.
    if haystack.contains("yarn") {
        return "yarn";
    }
    "npm"
}
1619
1620fn canonicalize_for_lsp(file_path: &Path) -> Result<PathBuf, LspError> {
1621    std::fs::canonicalize(file_path).map_err(LspError::from)
1622}
1623
/// Resolve `file_path` to the most canonical form available, even when the
/// file (or some of its parent directories) does not exist.
///
/// If the whole path can be canonicalized, that result is returned. Otherwise
/// the function walks up to the nearest existing ancestor, canonicalizes that
/// ancestor, and re-appends the missing trailing components unchanged.
///
/// A relative path whose components are all missing is anchored at the current
/// directory, so the result is absolute whenever the current directory can be
/// canonicalized. If nothing can be canonicalized, the path is returned with
/// its components intact.
fn resolve_for_lsp_uri(file_path: &Path) -> PathBuf {
    if let Ok(path) = std::fs::canonicalize(file_path) {
        return path;
    }

    // Peel missing components off the end until an existing ancestor is found.
    let mut existing = file_path.to_path_buf();
    let mut missing = Vec::new();
    while !existing.exists() {
        let Some(name) = existing.file_name() else {
            break;
        };
        missing.push(name.to_owned());
        let Some(parent) = existing.parent() else {
            break;
        };
        // `Path::parent` returns the empty path for a single-component
        // relative path. The empty path never exists and cannot be
        // canonicalized, so use "." to anchor at the current directory.
        existing = if parent.as_os_str().is_empty() {
            PathBuf::from(".")
        } else {
            parent.to_path_buf()
        };
    }

    // `missing` was collected leaf-first; re-append it root-first.
    let mut resolved = std::fs::canonicalize(&existing).unwrap_or(existing);
    for segment in missing.into_iter().rev() {
        resolved.push(segment);
    }
    resolved
}
1648
/// Map a file extension (without the leading dot) to a language identifier
/// string.
///
/// Matching is exact and case-sensitive; any extension not in the table maps
/// to `"plaintext"`.
fn language_id_for_extension(ext: &str) -> &'static str {
    const LANGUAGE_IDS: &[(&[&str], &str)] = &[
        (&["ts"], "typescript"),
        (&["tsx"], "typescriptreact"),
        (&["js", "mjs", "cjs"], "javascript"),
        (&["jsx"], "javascriptreact"),
        (&["py", "pyi"], "python"),
        (&["rs"], "rust"),
        (&["go"], "go"),
        (&["html", "htm"], "html"),
    ];

    LANGUAGE_IDS
        .iter()
        .find(|(extensions, _)| extensions.iter().any(|candidate| *candidate == ext))
        .map_or("plaintext", |(_, language_id)| *language_id)
}
1662
/// Canonicalize `path` when possible; otherwise return it unchanged.
///
/// This never fails: any error from `std::fs::canonicalize` (for example, the
/// path does not exist) falls back to an owned copy of the input.
fn normalize_lookup_path(path: &Path) -> PathBuf {
    match std::fs::canonicalize(path) {
        Ok(canonical) => canonical,
        Err(_) => path.to_path_buf(),
    }
}
1666
1667/// Classify an error returned by `spawn_server` into a structured
1668/// `ServerAttemptResult`. The two interesting cases for callers are:
1669/// - `BinaryNotInstalled` — the server's binary couldn't be resolved on PATH
1670///   or via override. The agent can be told "install bash-language-server".
1671/// - `SpawnFailed` — binary was found but spawning/initializing failed
1672///   (permissions, missing runtime, server crashed during initialize, etc.).
1673fn classify_spawn_error(binary: &str, err: &LspError) -> ServerAttemptResult {
1674    match err {
1675        // resolve_binary returns NotFound for both missing override paths and
1676        // missing PATH binaries. The "override missing" case is rare in
1677        // practice (only set in tests / env vars); we report all NotFound as
1678        // BinaryNotInstalled so the user sees an actionable install hint.
1679        LspError::NotFound(_) => ServerAttemptResult::BinaryNotInstalled {
1680            binary: binary.to_string(),
1681        },
1682        other => ServerAttemptResult::SpawnFailed {
1683            binary: binary.to_string(),
1684            reason: other.to_string(),
1685        },
1686    }
1687}
1688
1689fn env_binary_override(kind: &ServerKind) -> Option<PathBuf> {
1690    let id = kind.id_str();
1691    let suffix: String = id
1692        .chars()
1693        .map(|ch| {
1694            if ch.is_ascii_alphanumeric() {
1695                ch.to_ascii_uppercase()
1696            } else {
1697                '_'
1698            }
1699        })
1700        .collect();
1701    let key = format!("AFT_LSP_{suffix}_BINARY");
1702    std::env::var_os(key).map(PathBuf::from)
1703}