Skip to main content

aft/lsp/
manager.rs

1use std::collections::{HashMap, HashSet};
2use std::path::{Path, PathBuf};
3
4use crossbeam_channel::{unbounded, Receiver, RecvTimeoutError, Sender};
5use lsp_types::notification::{
6    DidChangeTextDocument, DidChangeWatchedFiles, DidCloseTextDocument, DidOpenTextDocument,
7};
8use lsp_types::{
9    DidChangeTextDocumentParams, DidChangeWatchedFilesParams, DidCloseTextDocumentParams,
10    DidOpenTextDocumentParams, FileChangeType, FileEvent, TextDocumentContentChangeEvent,
11    TextDocumentIdentifier, TextDocumentItem, VersionedTextDocumentIdentifier,
12};
13
14use crate::config::Config;
15use crate::lsp::child_registry::LspChildRegistry;
16use crate::lsp::client::{LspClient, LspEvent, ServerState};
17use crate::lsp::diagnostics::{
18    from_lsp_diagnostics, DiagnosticEntry, DiagnosticsStore, StoredDiagnostic,
19};
20use crate::lsp::document::DocumentStore;
21use crate::lsp::position::{uri_for_path, uri_to_path};
22use crate::lsp::pull_params::{
23    AftDocumentDiagnosticParams, AftDocumentDiagnosticRequest, AftWorkspaceDiagnosticParams,
24    AftWorkspaceDiagnosticRequest,
25};
26use crate::lsp::registry::{resolve_lsp_binary, servers_for_file, ServerDef, ServerKind};
27use crate::lsp::roots::{find_workspace_root, ServerKey};
28use crate::lsp::LspError;
29use crate::slog_error;
30
// Byte budget of 2 KiB. NOTE(review): the use site is outside this excerpt;
// judging by the name it presumably caps how much server stderr is kept in a
// failure reason — confirm where it is used.
const STDERR_REASON_BYTES: usize = 2 * 1024;
32
33/// Outcome of attempting to ensure a server is running for a single matching
34/// `ServerDef`. Returned per matching server so the caller can report exactly
35/// what happened to the user instead of collapsing all failures into "no
36/// server".
37#[derive(Debug, Clone)]
38pub enum ServerAttemptResult {
39    /// Server is running and ready to serve requests for this file.
40    Ok { server_key: ServerKey },
41    /// No workspace root was found by walking up from the file looking for
42    /// any of the server's configured root markers.
43    NoRootMarker { looked_for: Vec<String> },
44    /// The server's binary could not be found on PATH (or override was
45    /// missing/invalid).
46    BinaryNotInstalled { binary: String },
47    /// Binary was found but spawning or initializing the server failed.
48    SpawnFailed { binary: String, reason: String },
49}
50
51/// One server's attempt to handle a file.
52#[derive(Debug, Clone)]
53pub struct ServerAttempt {
54    /// Stable server identifier (kind ID, e.g. "pyright", "rust-analyzer").
55    pub server_id: String,
56    /// Server display name from the registry.
57    pub server_name: String,
58    pub result: ServerAttemptResult,
59}
60
61/// Aggregate outcome of `ensure_server_for_file_detailed`. Distinguishes:
62/// - "No server registered for this file's extension" (`attempts.is_empty()`)
63/// - "Servers registered but none could start" (`successful.is_empty()` but
64///   `!attempts.is_empty()`)
65/// - "At least one server is ready" (`!successful.is_empty()`)
66#[derive(Debug, Clone, Default)]
67pub struct EnsureServerOutcomes {
68    /// Server keys that are now running and ready to serve requests.
69    pub successful: Vec<ServerKey>,
70    /// Per-server attempt records. Empty if no server is registered for the
71    /// file's extension.
72    pub attempts: Vec<ServerAttempt>,
73}
74
75impl EnsureServerOutcomes {
76    /// True if no server in the registry matched this file's extension.
77    pub fn no_server_registered(&self) -> bool {
78        self.attempts.is_empty()
79    }
80
81    /// True when servers matched the file's extension but none actually apply
82    /// to this project — i.e. nothing started and every attempt failed the root
83    /// marker check (e.g. oxlint registered for `.ts` with no `.oxlintrc.json`).
84    /// Distinct from `no_server_registered` (extension unsupported) and from a
85    /// real outage (binary missing / spawn failed): a missing root marker is a
86    /// filesystem fact that never changes mid-scan, so such a file will never
87    /// produce diagnostics and must not be reported as "pending".
88    pub fn only_inapplicable_root_markers(&self) -> bool {
89        self.successful.is_empty()
90            && !self.attempts.is_empty()
91            && self
92                .attempts
93                .iter()
94                .all(|attempt| matches!(attempt.result, ServerAttemptResult::NoRootMarker { .. }))
95    }
96}
97
98/// Outcome of a post-edit diagnostics wait. Reports the per-server status
99/// alongside the fresh diagnostics, so the response layer can build an
100/// honest tri-state payload (`success: true` + `complete: bool` + named
101/// gap fields per `crates/aft/src/protocol.rs`).
102///
103/// `diagnostics` only contains entries from servers that proved freshness
104/// (version-match preferred, epoch-fallback for unversioned servers).
105/// Pre-edit cached entries are NEVER included — that's the whole point of
106/// this type.
107#[derive(Debug, Clone, Default)]
108pub struct PostEditWaitOutcome {
109    /// Diagnostics from servers whose response we verified is FOR the
110    /// post-edit document version (or whose epoch we saw advance after our
111    /// pre-edit snapshot, for unversioned servers).
112    pub diagnostics: Vec<StoredDiagnostic>,
113    /// Servers we expected to publish but didn't before the deadline.
114    /// Reported to the agent via `pending_lsp_servers` so they understand
115    /// the result is partial.
116    pub pending_servers: Vec<ServerKey>,
117    /// Servers whose process exited between notification and deadline.
118    /// Reported separately so the agent knows the gap is unrecoverable
119    /// without a server restart, not "wait longer."
120    pub exited_servers: Vec<ServerKey>,
121}
122
/// Freshness baseline captured before an edit for one server/file pair.
#[derive(Debug, Clone, Copy, Default)]
pub struct PreEditSnapshot {
    /// Diagnostics epoch observed when the snapshot was taken.
    pub epoch: u64,
    /// Document version at capture time, if one was known.
    pub document_version_at_capture: Option<i32>,
}
129
130pub fn post_edit_entry_is_fresh(
131    entry: &DiagnosticEntry,
132    target_version: i32,
133    pre: PreEditSnapshot,
134) -> bool {
135    if entry.epoch <= pre.epoch {
136        return false;
137    }
138
139    match entry.version {
140        Some(version) => version >= target_version,
141        // Unversioned publishDiagnostics payloads cannot prove which document
142        // state they describe. Epoch advancement only proves arrival order; an
143        // old analysis result can still arrive after our pre-snapshot. Treat as
144        // pending/partial rather than fresh.
145        None => false,
146    }
147}
148
149impl PostEditWaitOutcome {
150    /// True if every expected server reported a fresh result. False means
151    /// the agent should treat the diagnostics as a partial picture.
152    pub fn complete(&self) -> bool {
153        self.pending_servers.is_empty() && self.exited_servers.is_empty()
154    }
155}
156
/// How one server answered a `textDocument/diagnostic` (per-file pull)
/// request.
#[derive(Debug, Clone)]
pub enum PullFileOutcome {
    /// A full report came back and its diagnostics were stored.
    Full { diagnostic_count: usize },
    /// The server answered `kind: "unchanged"`; the cached diagnostics remain
    /// valid.
    Unchanged,
    /// The server answered with a partial-result token. Streamed progress is
    /// not subscribed to, so this counts as a soft empty until the next pull.
    PartialNotSupported,
    /// The server does not advertise pull capability; the caller should fall
    /// back to push diagnostics for it.
    PullNotSupported,
    /// The pull request itself failed (timeout, server error, etc.).
    RequestFailed { reason: String },
}
173
174/// Result of `pull_file_diagnostics` for one matching server.
175#[derive(Debug, Clone)]
176pub struct PullFileResult {
177    pub server_key: ServerKey,
178    pub outcome: PullFileOutcome,
179}
180
181/// Result of `pull_workspace_diagnostics` for a single server.
182#[derive(Debug, Clone)]
183pub struct PullWorkspaceResult {
184    pub server_key: ServerKey,
185    /// Files for which a Full report was received and cached. Files that came
186    /// back as `Unchanged` are NOT listed here because their cached entry was
187    /// already authoritative.
188    pub files_reported: Vec<PathBuf>,
189    /// True if the server returned a full response within the timeout.
190    pub complete: bool,
191    /// True if we cancelled (request timed out before the server responded).
192    pub cancelled: bool,
193    /// True if the server advertised workspace pull support. When false, the
194    /// other fields are empty and the caller should fall back to file-mode
195    /// pull or to push semantics.
196    pub supports_workspace: bool,
197}
198
199pub struct LspManager {
200    /// Active server instances, keyed by (ServerKind, workspace_root).
201    clients: HashMap<ServerKey, LspClient>,
202    /// Binary names for active server instances. Kept separate from
203    /// `LspClient` so crash handling can report the installable binary name
204    /// after a post-initialize process exit.
205    server_binaries: HashMap<ServerKey, String>,
206    /// Tracks opened documents and versions per active server.
207    documents: HashMap<ServerKey, DocumentStore>,
208    /// Stored publishDiagnostics payloads across all servers.
209    diagnostics: DiagnosticsStore,
210    /// Unified event channel — all server reader threads send here.
211    event_tx: Sender<LspEvent>,
212    event_rx: Receiver<LspEvent>,
213    /// Optional binary path overrides used by integration tests.
214    binary_overrides: HashMap<ServerKind, PathBuf>,
215    /// Extra env vars merged into every spawned LSP child. Used in tests to
216    /// drive the fake server's behavioral variants (`AFT_FAKE_LSP_PULL=1`,
217    /// `AFT_FAKE_LSP_WORKSPACE=1`, etc.). Production code does not set this.
218    extra_env: HashMap<String, String>,
219    /// Per-(kind,root) cache of spawn failures. Once a server fails to spawn
220    /// for a workspace root, we remember why and skip subsequent attempts for
221    /// the lifetime of this AFT process. Without this, every file open or
222    /// didChange retries `spawn_server` and logs a fresh ERROR — visible as
223    /// repeated `failed to spawn TypeScript Language Server: Could not find a
224    /// valid TypeScript installation` lines per edit.
225    ///
226    /// Entries are NEVER evicted automatically. The expected recovery path is
227    /// for the user to fix their environment (install the missing binary or
228    /// add a `tsconfig.json` / `package.json` with the right dependency) and
229    /// restart OpenCode/Pi, which spawns a fresh `aft` process with an empty
230    /// cache. We deliberately don't auto-retry on file events: the failure
231    /// modes we track here (binary not installed, init handshake failure)
232    /// don't fix themselves at runtime.
233    failed_spawns: HashMap<ServerKey, ServerAttemptResult>,
234    /// Server/root pairs for which we already logged that watched-file
235    /// notifications are skipped because the capability is absent.
236    watched_file_skip_logged: HashSet<ServerKey>,
237    /// Tracks PIDs of spawned LSP child processes so the signal handler can
238    /// kill them on SIGTERM/SIGINT before aft exits, preventing orphans.
239    /// Defaults to empty; production wires this from `AppContext`.
240    child_registry: LspChildRegistry,
241}
242
243impl LspManager {
244    pub fn new() -> Self {
245        let (event_tx, event_rx) = unbounded();
246        Self {
247            clients: HashMap::new(),
248            server_binaries: HashMap::new(),
249            documents: HashMap::new(),
250            diagnostics: DiagnosticsStore::new(),
251            event_tx,
252            event_rx,
253            binary_overrides: HashMap::new(),
254            extra_env: HashMap::new(),
255            failed_spawns: HashMap::new(),
256            watched_file_skip_logged: HashSet::new(),
257            child_registry: LspChildRegistry::new(),
258        }
259    }
260
261    /// Set the child-PID registry. Must be called before any servers spawn.
262    pub fn set_child_registry(&mut self, registry: LspChildRegistry) {
263        self.child_registry = registry;
264    }
265
266    /// For testing: set an extra environment variable that gets passed to
267    /// every spawned LSP child process. Useful for driving fake-server
268    /// behavioral variants in integration tests.
269    pub fn set_extra_env(&mut self, key: &str, value: &str) {
270        self.extra_env.insert(key.to_string(), value.to_string());
271    }
272
273    /// Count active LSP server instances.
274    pub fn server_count(&self) -> usize {
275        self.clients.len()
276    }
277
278    /// For testing: override the binary for a server kind.
279    pub fn override_binary(&mut self, kind: ServerKind, binary_path: PathBuf) {
280        self.binary_overrides.insert(kind, binary_path);
281    }
282
283    /// Ensure a server is running for the given file. Spawns if needed.
284    /// Returns the active server keys for the file, or an empty vec if none match.
285    ///
286    /// This is the lightweight wrapper around [`ensure_server_for_file_detailed`]
287    /// that drops failure context. Prefer the detailed variant in command
288    /// handlers that need to surface honest error messages to the agent.
289    pub fn ensure_server_for_file(&mut self, file_path: &Path, config: &Config) -> Vec<ServerKey> {
290        self.ensure_server_for_file_detailed(file_path, config)
291            .successful
292    }
293
294    /// Detailed version of [`ensure_server_for_file`] that records every
295    /// matching server's outcome (`Ok` / `NoRootMarker` / `BinaryNotInstalled`
296    /// / `SpawnFailed`).
297    ///
298    /// Use this when the caller wants to honestly report _why_ a file has no
299    /// active server (e.g., to surface "bash-language-server not on PATH" to
300    /// the agent instead of silently returning `total: 0`).
301    pub fn ensure_server_for_file_detailed(
302        &mut self,
303        file_path: &Path,
304        config: &Config,
305    ) -> EnsureServerOutcomes {
306        let defs = servers_for_file(file_path, config);
307        let mut outcomes = EnsureServerOutcomes::default();
308
309        for def in defs {
310            let server_id = def.kind.id_str().to_string();
311            let server_name = def.name.to_string();
312
313            let Some(root) = find_workspace_root(file_path, &def.root_markers) else {
314                outcomes.attempts.push(ServerAttempt {
315                    server_id,
316                    server_name,
317                    result: ServerAttemptResult::NoRootMarker {
318                        looked_for: def.root_markers.iter().map(|s| s.to_string()).collect(),
319                    },
320                });
321                continue;
322            };
323
324            let key = ServerKey {
325                kind: def.kind.clone(),
326                root,
327            };
328
329            if !self.clients.contains_key(&key) {
330                // If we already tried and failed to spawn this server for this
331                // root, return the cached classification without retrying or
332                // re-logging. This prevents per-edit ERROR spam when the user's
333                // environment is missing a dependency the LSP needs (the
334                // typescript-language-server "Could not find a valid TypeScript
335                // installation" case is the canonical example).
336                if let Some(cached) = self.failed_spawns.get(&key) {
337                    outcomes.attempts.push(ServerAttempt {
338                        server_id,
339                        server_name,
340                        result: cached.clone(),
341                    });
342                    continue;
343                }
344
345                match self.spawn_server(&def, &key.root, config) {
346                    Ok(client) => {
347                        self.clients.insert(key.clone(), client);
348                        self.server_binaries.insert(key.clone(), def.binary.clone());
349                        self.documents.entry(key.clone()).or_default();
350                    }
351                    Err(err) => {
352                        slog_error!("failed to spawn {}: {}", def.name, err);
353                        let result = classify_spawn_error(&def.binary, &err);
354                        // Remember the failure so subsequent file events skip
355                        // this (kind, root) pair instead of producing a fresh
356                        // spawn attempt + ERROR log per request.
357                        self.failed_spawns.insert(key.clone(), result.clone());
358                        outcomes.attempts.push(ServerAttempt {
359                            server_id,
360                            server_name,
361                            result,
362                        });
363                        continue;
364                    }
365                }
366            }
367
368            outcomes.attempts.push(ServerAttempt {
369                server_id,
370                server_name,
371                result: ServerAttemptResult::Ok {
372                    server_key: key.clone(),
373                },
374            });
375            outcomes.successful.push(key);
376        }
377
378        outcomes
379    }
380
381    /// Ensure a server is running using the default LSP registry.
382    /// Kept for integration tests that exercise built-in server helpers directly.
383    pub fn ensure_server_for_file_default(&mut self, file_path: &Path) -> Vec<ServerKey> {
384        self.ensure_server_for_file(file_path, &Config::default())
385    }
386    /// Ensure that servers are running for the file and that the document is open
387    /// in each server's DocumentStore. Reads file content from disk if not already open.
388    /// Returns the server keys for the file.
389    pub fn ensure_file_open(
390        &mut self,
391        file_path: &Path,
392        config: &Config,
393    ) -> Result<Vec<ServerKey>, LspError> {
394        let canonical_path = canonicalize_for_lsp(file_path)?;
395        let server_keys = self.ensure_server_for_file(&canonical_path, config);
396        if server_keys.is_empty() {
397            return Ok(server_keys);
398        }
399
400        let uri = uri_for_path(&canonical_path)?;
401        let language_id = language_id_for_extension(
402            canonical_path
403                .extension()
404                .and_then(|ext| ext.to_str())
405                .unwrap_or_default(),
406        )
407        .to_string();
408
409        for key in &server_keys {
410            let already_open = self
411                .documents
412                .get(key)
413                .is_some_and(|store| store.is_open(&canonical_path));
414
415            if !already_open {
416                let content = std::fs::read_to_string(&canonical_path).map_err(LspError::Io)?;
417                if let Some(client) = self.clients.get_mut(key) {
418                    client.send_notification::<DidOpenTextDocument>(DidOpenTextDocumentParams {
419                        text_document: TextDocumentItem::new(
420                            uri.clone(),
421                            language_id.clone(),
422                            0,
423                            content,
424                        ),
425                    })?;
426                }
427                self.documents
428                    .entry(key.clone())
429                    .or_default()
430                    .open(canonical_path.clone());
431                continue;
432            }
433
434            // Document is already open. Check disk drift — if the file has
435            // been modified outside the AFT pipeline (other tool, manual
436            // edit, sibling session) we MUST send a didChange before any
437            // pull-diagnostic / hover query, otherwise the LSP server
438            // returns results computed from stale in-memory content.
439            //
440            // This is the regression fix Oracle flagged in finding #6:
441            // "ensure_file_open skips already-open files without checking
442            // if disk content changed."
443            let drifted = self
444                .documents
445                .get(key)
446                .is_some_and(|store| store.is_stale_on_disk(&canonical_path));
447            if drifted {
448                let content = std::fs::read_to_string(&canonical_path).map_err(LspError::Io)?;
449                let next_version = self
450                    .documents
451                    .get(key)
452                    .and_then(|store| store.version(&canonical_path))
453                    .map(|v| v + 1)
454                    .unwrap_or(1);
455                if let Some(client) = self.clients.get_mut(key) {
456                    client.send_notification::<DidChangeTextDocument>(
457                        DidChangeTextDocumentParams {
458                            text_document: VersionedTextDocumentIdentifier::new(
459                                uri.clone(),
460                                next_version,
461                            ),
462                            content_changes: vec![TextDocumentContentChangeEvent {
463                                range: None,
464                                range_length: None,
465                                text: content,
466                            }],
467                        },
468                    )?;
469                }
470                if let Some(store) = self.documents.get_mut(key) {
471                    store.bump_version(&canonical_path);
472                }
473            }
474        }
475
476        Ok(server_keys)
477    }
478
479    pub fn ensure_file_open_default(
480        &mut self,
481        file_path: &Path,
482    ) -> Result<Vec<ServerKey>, LspError> {
483        self.ensure_file_open(file_path, &Config::default())
484    }
485
486    /// Notify relevant LSP servers that a file has been written/changed.
487    /// This is the main hook called after every file write in AFT.
488    ///
489    /// If the file's server isn't running yet, starts it (lazy spawn).
490    /// If the file isn't open in LSP yet, sends didOpen. Otherwise sends didChange.
491    pub fn notify_file_changed(
492        &mut self,
493        file_path: &Path,
494        content: &str,
495        config: &Config,
496    ) -> Result<(), LspError> {
497        self.notify_file_changed_versioned(file_path, content, config)
498            .map(|_| ())
499    }
500
501    /// Like `notify_file_changed`, but returns the target document version
502    /// per server so the post-edit waiter can match `publishDiagnostics`
503    /// against the exact version that this notification carried.
504    ///
505    /// Returns: `Vec<(ServerKey, target_version)>`. `target_version` is the
506    /// `version` field on the `VersionedTextDocumentIdentifier` we just sent
507    /// (post-bump). For freshly-opened documents (`didOpen`) the version is
508    /// `0`. Servers that don't honor versioned text document sync will not
509    /// echo this back on `publishDiagnostics`; the caller is expected to
510    /// fall back to the epoch-delta path for those.
511    pub fn notify_file_changed_versioned(
512        &mut self,
513        file_path: &Path,
514        content: &str,
515        config: &Config,
516    ) -> Result<Vec<(ServerKey, i32)>, LspError> {
517        let canonical_path = canonicalize_for_lsp(file_path)?;
518        let server_keys = self.ensure_server_for_file(&canonical_path, config);
519        if server_keys.is_empty() {
520            return Ok(Vec::new());
521        }
522
523        let uri = uri_for_path(&canonical_path)?;
524        let language_id = language_id_for_extension(
525            canonical_path
526                .extension()
527                .and_then(|ext| ext.to_str())
528                .unwrap_or_default(),
529        )
530        .to_string();
531
532        let mut versions: Vec<(ServerKey, i32)> = Vec::with_capacity(server_keys.len());
533
534        for key in server_keys {
535            let current_version = self
536                .documents
537                .get(&key)
538                .and_then(|store| store.version(&canonical_path));
539
540            if let Some(version) = current_version {
541                let next_version = version + 1;
542                if let Some(client) = self.clients.get_mut(&key) {
543                    client.send_notification::<DidChangeTextDocument>(
544                        DidChangeTextDocumentParams {
545                            text_document: VersionedTextDocumentIdentifier::new(
546                                uri.clone(),
547                                next_version,
548                            ),
549                            content_changes: vec![TextDocumentContentChangeEvent {
550                                range: None,
551                                range_length: None,
552                                text: content.to_string(),
553                            }],
554                        },
555                    )?;
556                }
557                if let Some(store) = self.documents.get_mut(&key) {
558                    store.bump_version(&canonical_path);
559                }
560                versions.push((key, next_version));
561                continue;
562            }
563
564            if let Some(client) = self.clients.get_mut(&key) {
565                client.send_notification::<DidOpenTextDocument>(DidOpenTextDocumentParams {
566                    text_document: TextDocumentItem::new(
567                        uri.clone(),
568                        language_id.clone(),
569                        0,
570                        content.to_string(),
571                    ),
572                })?;
573            }
574            self.documents
575                .entry(key.clone())
576                .or_default()
577                .open(canonical_path.clone());
578            // didOpen carries version 0 — that's the version the server
579            // will echo on its first publishDiagnostics for this document.
580            versions.push((key, 0));
581        }
582
583        Ok(versions)
584    }
585
586    pub fn notify_file_changed_default(
587        &mut self,
588        file_path: &Path,
589        content: &str,
590    ) -> Result<(), LspError> {
591        self.notify_file_changed(file_path, content, &Config::default())
592    }
593
594    /// Notify every active server whose workspace contains at least one changed
595    /// path that watched files changed. This is intentionally workspace-scoped
596    /// rather than extension-scoped: configuration edits such as `package.json`
597    /// or `tsconfig.json` affect a server's project graph even though those
598    /// files may not be documents handled by the server itself.
599    pub fn notify_files_watched_changed(
600        &mut self,
601        paths: &[(PathBuf, FileChangeType)],
602        _config: &Config,
603    ) -> Result<(), LspError> {
604        if paths.is_empty() {
605            return Ok(());
606        }
607
608        let mut canonical_events = Vec::with_capacity(paths.len());
609        for (path, typ) in paths {
610            let canonical_path = resolve_for_lsp_uri(path);
611            canonical_events.push((canonical_path, *typ));
612        }
613
614        let keys: Vec<ServerKey> = self.clients.keys().cloned().collect();
615        for key in keys {
616            let mut changes = Vec::new();
617            for (path, typ) in &canonical_events {
618                if !path.starts_with(&key.root) {
619                    continue;
620                }
621                changes.push(FileEvent::new(uri_for_path(path)?, *typ));
622            }
623
624            if changes.is_empty() {
625                continue;
626            }
627
628            if let Some(client) = self.clients.get_mut(&key) {
629                // Send when the server either advertised initialize-time
630                // watched-file support or dynamically registered a watcher.
631                // The dynamic client capability we send during initialize only
632                // permits runtime registration; it is tracked separately via
633                // `has_watched_file_registration()`.
634                let supports_static_watched_files = client.supports_watched_files();
635                let has_dynamic_registration = client.has_watched_file_registration();
636                if !(supports_static_watched_files || has_dynamic_registration) {
637                    if self.watched_file_skip_logged.insert(key.clone()) {
638                        log::debug!(
639                            "skipping didChangeWatchedFiles for {:?} (not supported or registered)",
640                            key
641                        );
642                    }
643                    continue;
644                }
645                client.send_notification::<DidChangeWatchedFiles>(DidChangeWatchedFilesParams {
646                    changes,
647                })?;
648            }
649        }
650
651        Ok(())
652    }
653
654    /// Close a document in all servers that have it open.
655    pub fn notify_file_closed(&mut self, file_path: &Path) -> Result<(), LspError> {
656        let canonical_path = canonicalize_for_lsp(file_path)?;
657        let uri = uri_for_path(&canonical_path)?;
658        let keys: Vec<ServerKey> = self.documents.keys().cloned().collect();
659
660        for key in keys {
661            let was_open = self
662                .documents
663                .get(&key)
664                .map(|store| store.is_open(&canonical_path))
665                .unwrap_or(false);
666            if !was_open {
667                continue;
668            }
669
670            if let Some(client) = self.clients.get_mut(&key) {
671                client.send_notification::<DidCloseTextDocument>(DidCloseTextDocumentParams {
672                    text_document: TextDocumentIdentifier::new(uri.clone()),
673                })?;
674            }
675
676            if let Some(store) = self.documents.get_mut(&key) {
677                store.close(&canonical_path);
678            }
679            self.diagnostics
680                .clear_for_server_file(&key, &canonical_path);
681        }
682
683        Ok(())
684    }
685
686    /// Get an active client for a file path, if one exists.
687    pub fn client_for_file(&self, file_path: &Path, config: &Config) -> Option<&LspClient> {
688        let key = self.server_key_for_file(file_path, config)?;
689        self.clients.get(&key)
690    }
691
692    pub fn client_for_file_default(&self, file_path: &Path) -> Option<&LspClient> {
693        self.client_for_file(file_path, &Config::default())
694    }
695
696    /// Get a mutable active client for a file path, if one exists.
697    pub fn client_for_file_mut(
698        &mut self,
699        file_path: &Path,
700        config: &Config,
701    ) -> Option<&mut LspClient> {
702        let key = self.server_key_for_file(file_path, config)?;
703        self.clients.get_mut(&key)
704    }
705
706    pub fn client_for_file_mut_default(&mut self, file_path: &Path) -> Option<&mut LspClient> {
707        self.client_for_file_mut(file_path, &Config::default())
708    }
709
710    /// Number of tracked server clients.
711    pub fn active_client_count(&self) -> usize {
712        self.clients.len()
713    }
714
715    /// Drain all pending LSP events. Call from the main loop.
716    pub fn drain_events(&mut self) -> Vec<LspEvent> {
717        let mut events = Vec::new();
718        while let Ok(event) = self.event_rx.try_recv() {
719            self.handle_event(&event);
720            events.push(event);
721        }
722        events
723    }
724
725    /// Wait for diagnostics to arrive for a specific file until a timeout expires.
726    pub fn wait_for_diagnostics(
727        &mut self,
728        file_path: &Path,
729        config: &Config,
730        timeout: std::time::Duration,
731    ) -> Vec<StoredDiagnostic> {
732        let deadline = std::time::Instant::now() + timeout;
733        self.wait_for_file_diagnostics(file_path, config, deadline)
734    }
735
736    pub fn wait_for_diagnostics_default(
737        &mut self,
738        file_path: &Path,
739        timeout: std::time::Duration,
740    ) -> Vec<StoredDiagnostic> {
741        self.wait_for_diagnostics(file_path, &Config::default(), timeout)
742    }
743
744    /// Test-only accessor for the diagnostics store. Used by integration
745    /// tests that need to inspect per-server entries (e.g., to verify that
746    /// `ServerKey::root` is populated correctly, not the empty path that
747    /// the legacy `publish_with_kind` path produced).
748    #[doc(hidden)]
749    pub fn diagnostics_store_for_test(&self) -> &DiagnosticsStore {
750        &self.diagnostics
751    }
752
753    #[doc(hidden)]
754    #[cfg(test)]
755    pub fn diagnostics_store_mut_for_test(&mut self) -> &mut DiagnosticsStore {
756        &mut self.diagnostics
757    }
758
759    /// Error/warning counts across the entire warm diagnostics set (all files
760    /// any server has published for this session). Powers the agent status bar;
761    /// reads the continuously-drained store with no extra LSP round-trip.
762    pub fn warm_error_warning_counts(&self) -> (usize, usize) {
763        self.diagnostics.error_warning_counts()
764    }
765
766    /// Snapshot the current per-server epoch for every entry that exists
767    /// for `file_path`. Servers without an entry yet (never published)
768    /// are absent from the map; for those, `pre = 0` (any first publish
769    /// will be considered fresh under the epoch-fallback rule).
770    pub fn snapshot_diagnostic_epochs(&self, file_path: &Path) -> HashMap<ServerKey, u64> {
771        let lookup_path = normalize_lookup_path(file_path);
772        self.diagnostics
773            .entries_for_file(&lookup_path)
774            .into_iter()
775            .map(|(key, entry)| (key.clone(), entry.epoch))
776            .collect()
777    }
778
779    /// Snapshot the current diagnostic epoch and document version for every
780    /// active server relevant to `file_path` before a post-edit notification.
781    pub fn snapshot_pre_edit_state(&self, file_path: &Path) -> HashMap<ServerKey, PreEditSnapshot> {
782        let lookup_path = normalize_lookup_path(file_path);
783        let mut snapshots: HashMap<ServerKey, PreEditSnapshot> = self
784            .diagnostics
785            .entries_for_file(&lookup_path)
786            .into_iter()
787            .map(|(key, entry)| {
788                (
789                    key.clone(),
790                    PreEditSnapshot {
791                        epoch: entry.epoch,
792                        document_version_at_capture: None,
793                    },
794                )
795            })
796            .collect();
797
798        for (key, store) in &self.documents {
799            if let Some(version) = store.version(&lookup_path) {
800                snapshots
801                    .entry(key.clone())
802                    .or_default()
803                    .document_version_at_capture = Some(version);
804            }
805        }
806
807        snapshots
808    }
809
810    /// True when the current diagnostic entry for `server_key` can be tied to
811    /// that server's current in-memory document version for `file_path`.
812    ///
813    /// File-mode `lsp_diagnostics` uses this for push-only fallback after it
814    /// has synced/opened the document. Versioned publishes are accepted when
815    /// they match the current document version; unversioned publishes are not
816    /// accepted as fresh because epoch/wall-clock ordering alone is racy.
817    pub fn diagnostic_entry_is_fresh_for_document(
818        &self,
819        file_path: &Path,
820        server_key: &ServerKey,
821        pre: PreEditSnapshot,
822    ) -> bool {
823        let lookup_path = normalize_lookup_path(file_path);
824        let Some(entry) = self
825            .diagnostics
826            .entries_for_file(&lookup_path)
827            .into_iter()
828            .find_map(|(key, entry)| if key == server_key { Some(entry) } else { None })
829        else {
830            return false;
831        };
832
833        let target_version = self
834            .documents
835            .get(server_key)
836            .and_then(|store| store.version(&lookup_path))
837            .or(pre.document_version_at_capture)
838            .unwrap_or(0);
839
840        matches!(entry.version, Some(version) if version >= target_version)
841    }
842
843    /// Wait for FRESH per-server diagnostics that match the just-sent
844    /// document version. This is the v0.17.3 post-edit path that fixes the
845    /// stale-diagnostics bug: instead of returning whatever is in the cache
846    /// when the deadline hits, we only return entries whose `version`
847    /// matches the post-edit target version (or, for servers that don't
848    /// participate in versioned sync, whose `epoch` was bumped after the
849    /// pre-edit snapshot).
850    ///
851    /// `expected_versions` should come from `notify_file_changed_versioned`
852    /// — one `(ServerKey, target_version)` per server we sent didChange/
853    /// didOpen to.
854    ///
855    /// `pre_snapshot` is the per-server epoch BEFORE the notification was
856    /// sent; it gates the epoch-fallback path so an old-version publish
857    /// arriving after `drain_events` and before `didChange` cannot be
858    /// mistaken for a fresh response.
859    ///
860    /// Returns a per-server tri-state: `Fresh` (publish matched target
861    /// version OR epoch advanced past snapshot for an unversioned server),
862    /// `Pending` (deadline hit before this server published anything we
863    /// could verify), or `Exited` (server died between notification and
864    /// deadline).
865    pub fn wait_for_post_edit_diagnostics(
866        &mut self,
867        file_path: &Path,
868        // `config` is intentionally accepted (matches sibling wait APIs and
869        // future-proofs us if freshness rules need it). Currently unused
870        // because expected_versions/pre_snapshot fully determine behavior.
871        _config: &Config,
872        expected_versions: &[(ServerKey, i32)],
873        pre_snapshot: &HashMap<ServerKey, PreEditSnapshot>,
874        timeout: std::time::Duration,
875    ) -> PostEditWaitOutcome {
876        let lookup_path = normalize_lookup_path(file_path);
877        let deadline = std::time::Instant::now() + timeout;
878
879        // Drain any events that arrived while we were sending didChange.
880        // The publishDiagnostics handler stores the version, so even
881        // pre-snapshot publishes that landed late won't be mistaken for
882        // fresh — the version-match check will reject them.
883        let _ = self.drain_events_for_file(&lookup_path);
884
885        let mut fresh: HashMap<ServerKey, Vec<StoredDiagnostic>> = HashMap::new();
886        let mut exited: Vec<ServerKey> = Vec::new();
887
888        loop {
889            // Check freshness for every expected server. A server is fresh
890            // if its current entry for this file satisfies either:
891            //   1. version-match: entry.version == Some(target_version), OR
892            //   2. push-only freshness: entry.version is None AND entry.epoch
893            //      advanced strictly after the pre-edit snapshot. Versioned
894            //      publishes must be >= the post-edit target version.
895            // Servers whose process has exited are reported separately.
896            for (key, target_version) in expected_versions {
897                if fresh.contains_key(key) || exited.contains(key) {
898                    continue;
899                }
900                if !self.clients.contains_key(key) {
901                    exited.push(key.clone());
902                    continue;
903                }
904                if let Some(entry) = self
905                    .diagnostics
906                    .entries_for_file(&lookup_path)
907                    .into_iter()
908                    .find_map(|(k, e)| if k == key { Some(e) } else { None })
909                {
910                    let pre = pre_snapshot.get(key).copied().unwrap_or_default();
911                    let is_fresh = post_edit_entry_is_fresh(entry, *target_version, pre);
912                    if is_fresh {
913                        fresh.insert(key.clone(), entry.diagnostics.clone());
914                    }
915                }
916            }
917
918            // All accounted for? Done.
919            if fresh.len() + exited.len() == expected_versions.len() {
920                break;
921            }
922
923            let now = std::time::Instant::now();
924            if now >= deadline {
925                break;
926            }
927
928            let timeout = deadline.saturating_duration_since(now);
929            match self.event_rx.recv_timeout(timeout) {
930                Ok(event) => {
931                    self.handle_event(&event);
932                }
933                Err(RecvTimeoutError::Timeout) | Err(RecvTimeoutError::Disconnected) => break,
934            }
935        }
936
937        // Pending = expected but neither fresh nor exited.
938        let pending: Vec<ServerKey> = expected_versions
939            .iter()
940            .filter(|(k, _)| !fresh.contains_key(k) && !exited.contains(k))
941            .map(|(k, _)| k.clone())
942            .collect();
943
944        // Build deduplicated, sorted diagnostics from the fresh servers only.
945        // Stale or pending servers contribute zero diagnostics.
946        let mut diagnostics: Vec<StoredDiagnostic> = fresh
947            .into_iter()
948            .flat_map(|(_, diags)| diags.into_iter())
949            .collect();
950        diagnostics.sort_by(|a, b| {
951            a.file
952                .cmp(&b.file)
953                .then(a.line.cmp(&b.line))
954                .then(a.column.cmp(&b.column))
955                .then(a.message.cmp(&b.message))
956        });
957
958        PostEditWaitOutcome {
959            diagnostics,
960            pending_servers: pending,
961            exited_servers: exited,
962        }
963    }
964
965    /// Wait for diagnostics to arrive for a specific file until a deadline.
966    ///
967    /// Drains already-queued events first, then blocks on the shared event
968    /// channel only until either `publishDiagnostics` arrives for this file or
969    /// the deadline is reached.
970    pub fn wait_for_file_diagnostics(
971        &mut self,
972        file_path: &Path,
973        config: &Config,
974        deadline: std::time::Instant,
975    ) -> Vec<StoredDiagnostic> {
976        let lookup_path = normalize_lookup_path(file_path);
977
978        if self.server_key_for_file(&lookup_path, config).is_none() {
979            return Vec::new();
980        }
981
982        loop {
983            if self.drain_events_for_file(&lookup_path) {
984                break;
985            }
986
987            let now = std::time::Instant::now();
988            if now >= deadline {
989                break;
990            }
991
992            let timeout = deadline.saturating_duration_since(now);
993            match self.event_rx.recv_timeout(timeout) {
994                Ok(event) => {
995                    if matches!(
996                        self.handle_event(&event),
997                        Some(ref published_file) if published_file.as_path() == lookup_path.as_path()
998                    ) {
999                        break;
1000                    }
1001                }
1002                Err(RecvTimeoutError::Timeout) | Err(RecvTimeoutError::Disconnected) => break,
1003            }
1004        }
1005
1006        self.get_diagnostics_for_file(&lookup_path)
1007            .into_iter()
1008            .cloned()
1009            .collect()
1010    }
1011
    /// Default timeout for `textDocument/diagnostic` (per-file pull): 10 seconds.
    ///
    /// Servers usually respond in under 1s for files they've already analyzed;
    /// the intent is to allow up to 10s before falling back to push semantics.
    /// Exposed through [`Self::pull_file_timeout`] so callers that offer a
    /// `wait_ms` override can reuse the documented default.
    ///
    /// NOTE(review): this constant only declares the value; confirm where it is
    /// applied to the per-file request.
    pub const PULL_FILE_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);
1018
1019    /// Public accessor so command handlers can reuse the documented default.
1020    pub fn pull_file_timeout() -> std::time::Duration {
1021        Self::PULL_FILE_TIMEOUT
1022    }
1023
    /// Default timeout for `workspace/diagnostic`: 10 seconds.
    ///
    /// The LSP spec allows the server to hold this request open indefinitely;
    /// capping the wait lets the caller report `complete: false` to the agent
    /// rather than hanging the bridge. Used by
    /// [`Self::pull_workspace_diagnostics`] when no explicit timeout is given.
    const PULL_WORKSPACE_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);
1028
1029    /// Issue a `textDocument/diagnostic` (LSP 3.17 per-file pull) request to
1030    /// every server that supports pull diagnostics for the given file.
1031    ///
1032    /// Returns the per-server outcome. If a server reports `kind: "unchanged"`,
1033    /// the cached entry's diagnostics are surfaced (deterministic re-use of
1034    /// the previous response). If a server doesn't advertise pull capability,
1035    /// it's skipped here — the caller should fall back to push for those.
1036    ///
1037    /// Side effects: results are stored in `DiagnosticsStore` so directory-mode
1038    /// queries can aggregate them later.
1039    pub fn pull_file_diagnostics(
1040        &mut self,
1041        file_path: &Path,
1042        config: &Config,
1043    ) -> Result<Vec<PullFileResult>, LspError> {
1044        let canonical_path = canonicalize_for_lsp(file_path)?;
1045        // Make sure servers are running and the document is open with fresh
1046        // content (handles disk-drift via DocumentStore::is_stale_on_disk).
1047        self.ensure_file_open(&canonical_path, config)?;
1048
1049        let server_keys = self.ensure_server_for_file(&canonical_path, config);
1050        if server_keys.is_empty() {
1051            return Ok(Vec::new());
1052        }
1053
1054        let uri = uri_for_path(&canonical_path)?;
1055        let mut results = Vec::with_capacity(server_keys.len());
1056
1057        for key in server_keys {
1058            let supports_pull = self
1059                .clients
1060                .get(&key)
1061                .and_then(|c| c.diagnostic_capabilities())
1062                .is_some_and(|caps| caps.pull_diagnostics);
1063
1064            if !supports_pull {
1065                results.push(PullFileResult {
1066                    server_key: key.clone(),
1067                    outcome: PullFileOutcome::PullNotSupported,
1068                });
1069                continue;
1070            }
1071
1072            // Look up previous resultId for incremental requests.
1073            let previous_result_id = self
1074                .diagnostics
1075                .entries_for_file(&canonical_path)
1076                .into_iter()
1077                .find(|(k, _)| **k == key)
1078                .and_then(|(_, entry)| entry.result_id.clone());
1079
1080            let identifier = self
1081                .clients
1082                .get(&key)
1083                .and_then(|c| c.diagnostic_capabilities())
1084                .and_then(|caps| caps.identifier.clone());
1085
1086            let params = AftDocumentDiagnosticParams {
1087                text_document: lsp_types::TextDocumentIdentifier { uri: uri.clone() },
1088                identifier,
1089                previous_result_id,
1090                work_done_progress_params: Default::default(),
1091                partial_result_params: Default::default(),
1092            };
1093
1094            let outcome = match self.send_pull_request(&key, params) {
1095                Ok(report) => self.ingest_document_report(&key, &canonical_path, report),
1096                Err(err) => {
1097                    if let Some(result) = self.cache_post_initialize_exit(&key, &err) {
1098                        PullFileOutcome::RequestFailed {
1099                            reason: server_attempt_result_reason(&result),
1100                        }
1101                    } else if recoverable_pull_rejection(&err)
1102                        && self.clients.get(&key).is_some_and(|client| {
1103                            matches!(
1104                                client.state(),
1105                                ServerState::Ready | ServerState::Initializing
1106                            )
1107                        })
1108                    {
1109                        PullFileOutcome::RequestFailed {
1110                            reason: format!("pull_rejected_push_fallback: {err}"),
1111                        }
1112                    } else {
1113                        PullFileOutcome::RequestFailed {
1114                            reason: err.to_string(),
1115                        }
1116                    }
1117                }
1118            };
1119
1120            results.push(PullFileResult {
1121                server_key: key,
1122                outcome,
1123            });
1124        }
1125
1126        Ok(results)
1127    }
1128
1129    /// Issue a `workspace/diagnostic` request to a specific server. Cancels
1130    /// internally if `timeout` elapses before the server responds. Cached
1131    /// entries from the response are stored so directory-mode queries pick
1132    /// them up.
1133    pub fn pull_workspace_diagnostics(
1134        &mut self,
1135        server_key: &ServerKey,
1136        timeout: Option<std::time::Duration>,
1137    ) -> Result<PullWorkspaceResult, LspError> {
1138        let timeout = timeout.unwrap_or(Self::PULL_WORKSPACE_TIMEOUT);
1139
1140        let supports_workspace = self
1141            .clients
1142            .get(server_key)
1143            .and_then(|c| c.diagnostic_capabilities())
1144            .is_some_and(|caps| caps.workspace_diagnostics);
1145
1146        if !supports_workspace {
1147            return Ok(PullWorkspaceResult {
1148                server_key: server_key.clone(),
1149                files_reported: Vec::new(),
1150                complete: false,
1151                cancelled: false,
1152                supports_workspace: false,
1153            });
1154        }
1155
1156        let identifier = self
1157            .clients
1158            .get(server_key)
1159            .and_then(|c| c.diagnostic_capabilities())
1160            .and_then(|caps| caps.identifier.clone());
1161
1162        let params = AftWorkspaceDiagnosticParams {
1163            identifier,
1164            previous_result_ids: Vec::new(),
1165            work_done_progress_params: Default::default(),
1166            partial_result_params: Default::default(),
1167        };
1168
1169        let result = match self
1170            .clients
1171            .get_mut(server_key)
1172            .ok_or_else(|| LspError::ServerNotReady("server not found".into()))?
1173            .send_request_with_timeout::<AftWorkspaceDiagnosticRequest>(params, timeout)
1174        {
1175            Ok(result) => result,
1176            Err(LspError::Timeout(_)) => {
1177                return Ok(PullWorkspaceResult {
1178                    server_key: server_key.clone(),
1179                    files_reported: Vec::new(),
1180                    complete: false,
1181                    cancelled: true,
1182                    supports_workspace: true,
1183                });
1184            }
1185            Err(err) => {
1186                if let Some(result) = self.cache_post_initialize_exit(server_key, &err) {
1187                    return Err(LspError::ServerNotReady(server_attempt_result_reason(
1188                        &result,
1189                    )));
1190                }
1191                return Err(err);
1192            }
1193        };
1194
1195        // Extract the items list. Partial responses are not a complete
1196        // workspace view, but the partial payload can still contain useful
1197        // document reports; ingest those while surfacing complete=false.
1198        let (items, complete) = match result {
1199            lsp_types::WorkspaceDiagnosticReportResult::Report(report) => (report.items, true),
1200            lsp_types::WorkspaceDiagnosticReportResult::Partial(partial) => (partial.items, false),
1201        };
1202
1203        // Ingest each file report into the diagnostics store.
1204        let mut files_reported = Vec::with_capacity(items.len());
1205        for item in items {
1206            match item {
1207                lsp_types::WorkspaceDocumentDiagnosticReport::Full(full) => {
1208                    if let Some(file) = uri_to_path(&full.uri) {
1209                        let stored = from_lsp_diagnostics(
1210                            file.clone(),
1211                            full.full_document_diagnostic_report.items.clone(),
1212                        );
1213                        self.diagnostics.publish_with_result_id(
1214                            server_key.clone(),
1215                            file.clone(),
1216                            stored,
1217                            full.full_document_diagnostic_report.result_id.clone(),
1218                        );
1219                        files_reported.push(file);
1220                    }
1221                }
1222                lsp_types::WorkspaceDocumentDiagnosticReport::Unchanged(_unchanged) => {
1223                    // "Unchanged" means the previously cached report is still
1224                    // valid. We left it in place; nothing to do.
1225                }
1226            }
1227        }
1228
1229        Ok(PullWorkspaceResult {
1230            server_key: server_key.clone(),
1231            files_reported,
1232            complete,
1233            cancelled: false,
1234            supports_workspace: true,
1235        })
1236    }
1237
1238    fn cache_post_initialize_exit(
1239        &mut self,
1240        key: &ServerKey,
1241        err: &LspError,
1242    ) -> Option<ServerAttemptResult> {
1243        let binary = self
1244            .server_binaries
1245            .get(key)
1246            .cloned()
1247            .unwrap_or_else(|| key.kind.id_str().to_string());
1248        let (status, stderr_tail) = {
1249            let client = self.clients.get_mut(key)?;
1250            let mut status = client.child_exit_status();
1251            for _ in 0..10 {
1252                if status.is_some() {
1253                    break;
1254                }
1255                std::thread::sleep(std::time::Duration::from_millis(10));
1256                status = client.child_exit_status();
1257            }
1258            let status = status?;
1259            wait_for_stderr_tail(client);
1260            (status, client.stderr_tail())
1261        };
1262        let reason = format_post_initialize_exit_reason(&binary, status, &stderr_tail, err);
1263        let result = ServerAttemptResult::SpawnFailed { binary, reason };
1264        self.clients.remove(key);
1265        self.server_binaries.remove(key);
1266        self.documents.remove(key);
1267        self.diagnostics.clear_for_server(key);
1268        self.failed_spawns.insert(key.clone(), result.clone());
1269        Some(result)
1270    }
1271
1272    /// Issue the per-file diagnostic request and return the report.
1273    fn send_pull_request(
1274        &mut self,
1275        key: &ServerKey,
1276        params: AftDocumentDiagnosticParams,
1277    ) -> Result<lsp_types::DocumentDiagnosticReportResult, LspError> {
1278        let client = self
1279            .clients
1280            .get_mut(key)
1281            .ok_or_else(|| LspError::ServerNotReady("server not found".into()))?;
1282        client.send_request::<AftDocumentDiagnosticRequest>(params)
1283    }
1284
1285    /// Store the result of a per-file pull request and return a structured
1286    /// outcome the caller can inspect.
1287    fn ingest_document_report(
1288        &mut self,
1289        key: &ServerKey,
1290        canonical_path: &Path,
1291        result: lsp_types::DocumentDiagnosticReportResult,
1292    ) -> PullFileOutcome {
1293        let report = match result {
1294            lsp_types::DocumentDiagnosticReportResult::Report(report) => report,
1295            lsp_types::DocumentDiagnosticReportResult::Partial(_) => {
1296                // Partial results stream in via $/progress notifications which
1297                // we don't currently subscribe to. Treat as a soft-empty
1298                // success — the next pull will get the full version.
1299                return PullFileOutcome::PartialNotSupported;
1300            }
1301        };
1302
1303        match report {
1304            lsp_types::DocumentDiagnosticReport::Full(full) => {
1305                let result_id = full.full_document_diagnostic_report.result_id.clone();
1306                let stored = from_lsp_diagnostics(
1307                    canonical_path.to_path_buf(),
1308                    full.full_document_diagnostic_report.items.clone(),
1309                );
1310                let count = stored.len();
1311                self.diagnostics.publish_with_result_id(
1312                    key.clone(),
1313                    canonical_path.to_path_buf(),
1314                    stored,
1315                    result_id,
1316                );
1317                PullFileOutcome::Full {
1318                    diagnostic_count: count,
1319                }
1320            }
1321            lsp_types::DocumentDiagnosticReport::Unchanged(_unchanged) => {
1322                // The server says cache is still valid. That is only usable if
1323                // we already have a report for this exact server/file; an
1324                // initial `unchanged` response cannot prove freshness.
1325                if self
1326                    .diagnostics
1327                    .has_report_for_server_file(key, canonical_path)
1328                {
1329                    PullFileOutcome::Unchanged
1330                } else {
1331                    PullFileOutcome::RequestFailed {
1332                        reason: "no_cache_for_unchanged".to_string(),
1333                    }
1334                }
1335            }
1336        }
1337    }
1338
1339    /// Shutdown all servers gracefully.
1340    pub fn shutdown_all(&mut self) {
1341        for (key, mut client) in self.clients.drain() {
1342            if let Err(err) = client.shutdown() {
1343                slog_error!("error shutting down {:?}: {}", key, err);
1344            }
1345        }
1346        self.server_binaries.clear();
1347        self.documents.clear();
1348        self.diagnostics = DiagnosticsStore::new();
1349    }
1350
1351    /// Check if any server is active.
1352    pub fn has_active_servers(&self) -> bool {
1353        self.clients
1354            .values()
1355            .any(|client| client.state() == ServerState::Ready)
1356    }
1357
1358    /// Active server keys (running clients). Used by `lsp_diagnostics`
1359    /// directory mode to know which servers to ask for workspace pull.
1360    pub fn active_server_keys(&self) -> Vec<ServerKey> {
1361        self.clients.keys().cloned().collect()
1362    }
1363
1364    pub fn get_diagnostics_for_file(&self, file: &Path) -> Vec<&StoredDiagnostic> {
1365        let normalized = normalize_lookup_path(file);
1366        self.diagnostics.for_file(&normalized)
1367    }
1368
1369    /// Drop all cached diagnostics for a file across every server. Called when a
1370    /// file is deleted/renamed away so its diagnostics don't linger in the warm
1371    /// set (no server republishes for a vanished path), inflating the
1372    /// error/warning counts in the status bar and `aft_inspect`.
1373    ///
1374    /// The store key is the canonical path from publish time, but a deleted file
1375    /// can no longer be canonicalized directly (`canonicalize` needs the file to
1376    /// exist). We therefore try several equivalent forms: the raw path, the
1377    /// canonicalize-or-fallback form, and — crucially — a reconstruction that
1378    /// canonicalizes the still-present parent directory and rejoins the file
1379    /// name, which reproduces the publish-time key even across `/var`↔
1380    /// `/private/var`-style symlink aliasing. Returns true if anything was
1381    /// removed.
1382    pub fn clear_diagnostics_for_file(&mut self, file: &Path) -> bool {
1383        let mut removed = self.diagnostics.clear_for_file(file);
1384
1385        let normalized = normalize_lookup_path(file);
1386        if normalized != file {
1387            removed |= self.diagnostics.clear_for_file(&normalized);
1388        }
1389
1390        // Reconstruct the canonical key via the parent dir (which still exists
1391        // for a just-deleted file) so symlink-aliased roots still match.
1392        if let (Some(parent), Some(name)) = (file.parent(), file.file_name()) {
1393            if let Ok(canonical_parent) = std::fs::canonicalize(parent) {
1394                let reconstructed = canonical_parent.join(name);
1395                if reconstructed != file && reconstructed != normalized {
1396                    removed |= self.diagnostics.clear_for_file(&reconstructed);
1397                }
1398            }
1399        }
1400
1401        removed
1402    }
1403
1404    pub fn get_diagnostics_for_directory(&self, dir: &Path) -> Vec<&StoredDiagnostic> {
1405        let normalized = normalize_lookup_path(dir);
1406        self.diagnostics.for_directory(&normalized)
1407    }
1408
1409    pub fn get_all_diagnostics(&self) -> Vec<&StoredDiagnostic> {
1410        self.diagnostics.all()
1411    }
1412
1413    /// True if any LSP server has reported diagnostics at least once, including
1414    /// an empty report that proves a checked-clean file. This lets callers avoid
1415    /// treating an empty flattened diagnostic list as trustworthy when no server
1416    /// has actually run.
1417    pub fn has_any_diagnostic_reports(&self) -> bool {
1418        !self.diagnostics.is_empty()
1419    }
1420
1421    /// True if any server has reported for this file, including an empty
1422    /// checked-clean report.
1423    pub fn has_diagnostic_report_for_file(&self, file: &Path) -> bool {
1424        let normalized = normalize_lookup_path(file);
1425        self.diagnostics.has_any_report_for_file(&normalized)
1426    }
1427
1428    /// True if this exact server/file pair has a diagnostic report, including
1429    /// an empty checked-clean report.
1430    pub fn has_diagnostic_report_for_server_file(&self, server: &ServerKey, file: &Path) -> bool {
1431        let normalized = normalize_lookup_path(file);
1432        self.diagnostics
1433            .has_report_for_server_file(server, &normalized)
1434    }
1435
1436    fn drain_events_for_file(&mut self, file_path: &Path) -> bool {
1437        let mut saw_file_diagnostics = false;
1438        while let Ok(event) = self.event_rx.try_recv() {
1439            if matches!(
1440                self.handle_event(&event),
1441                Some(ref published_file) if published_file.as_path() == file_path
1442            ) {
1443                saw_file_diagnostics = true;
1444            }
1445        }
1446        saw_file_diagnostics
1447    }
1448
1449    fn handle_event(&mut self, event: &LspEvent) -> Option<PathBuf> {
1450        match event {
1451            LspEvent::Notification {
1452                server_kind,
1453                root,
1454                method,
1455                params: Some(params),
1456            } if method == "textDocument/publishDiagnostics" => {
1457                self.handle_publish_diagnostics(server_kind.clone(), root.clone(), params)
1458            }
1459            LspEvent::ServerExited { server_kind, root } => {
1460                let key = ServerKey {
1461                    kind: server_kind.clone(),
1462                    root: root.clone(),
1463                };
1464                self.clients.remove(&key);
1465                self.server_binaries.remove(&key);
1466                self.documents.remove(&key);
1467                self.diagnostics.clear_for_server(&key);
1468                None
1469            }
1470            _ => None,
1471        }
1472    }
1473
1474    fn handle_publish_diagnostics(
1475        &mut self,
1476        server: ServerKind,
1477        root: PathBuf,
1478        params: &serde_json::Value,
1479    ) -> Option<PathBuf> {
1480        if let Ok(publish_params) =
1481            serde_json::from_value::<lsp_types::PublishDiagnosticsParams>(params.clone())
1482        {
1483            let file = uri_to_path(&publish_params.uri)?;
1484            let stored = from_lsp_diagnostics(file.clone(), publish_params.diagnostics);
1485            // v0.17.3: store with real ServerKey { kind, root } and capture
1486            // the document `version` (when the server provided one) so the
1487            // post-edit waiter can reject stale publishes deterministically
1488            // via version-match (preferred) or epoch-delta (fallback). The
1489            // earlier `publish_with_kind` path silently dropped both.
1490            let key = ServerKey { kind: server, root };
1491            self.diagnostics
1492                .publish_full(key, file.clone(), stored, None, publish_params.version);
1493            return Some(file);
1494        }
1495        None
1496    }
1497
1498    fn spawn_server(
1499        &self,
1500        def: &ServerDef,
1501        root: &Path,
1502        config: &Config,
1503    ) -> Result<LspClient, LspError> {
1504        let binary = self.resolve_binary(def, config)?;
1505
1506        // Merge the server-defined env with our test-injected env.
1507        // `extra_env` is empty in production; tests use it to drive fake
1508        // server variants (AFT_FAKE_LSP_PULL=1, etc.).
1509        let mut merged_env = def.env.clone();
1510        for (key, value) in &self.extra_env {
1511            merged_env.insert(key.clone(), value.clone());
1512        }
1513
1514        let mut client = LspClient::spawn(
1515            def.kind.clone(),
1516            root.to_path_buf(),
1517            &binary,
1518            &def.args,
1519            &merged_env,
1520            self.event_tx.clone(),
1521            self.child_registry.clone(),
1522        )?;
1523        if let Err(err) = client.initialize(root, def.initialization_options.clone()) {
1524            wait_for_stderr_tail(&mut client);
1525            let stderr_tail = client.stderr_tail();
1526            let reason = if client.child_exited() || !stderr_tail.is_empty() {
1527                format_initialize_failure_reason(&def.binary, &stderr_tail, &err)
1528            } else {
1529                format!("server failed during initialize: {err}")
1530            };
1531            return Err(LspError::ServerNotReady(reason));
1532        }
1533        Ok(client)
1534    }
1535
1536    fn resolve_binary(&self, def: &ServerDef, config: &Config) -> Result<PathBuf, LspError> {
1537        if let Some(path) = self.binary_overrides.get(&def.kind) {
1538            if path.exists() {
1539                return Ok(path.clone());
1540            }
1541            return Err(LspError::NotFound(format!(
1542                "override binary for {:?} not found: {}",
1543                def.kind,
1544                path.display()
1545            )));
1546        }
1547
1548        if let Some(path) = env_binary_override(&def.kind) {
1549            if path.exists() {
1550                return Ok(path);
1551            }
1552            return Err(LspError::NotFound(format!(
1553                "environment override binary for {:?} not found: {}",
1554                def.kind,
1555                path.display()
1556            )));
1557        }
1558
1559        // Layered resolution:
1560        //   1. <project_root>/node_modules/.bin/<binary>
1561        //   2. config.lsp_paths_extra (plugin auto-install cache, etc.)
1562        //   3. PATH via `which`
1563        resolve_lsp_binary(
1564            &def.binary,
1565            config.project_root.as_deref(),
1566            &config.lsp_paths_extra,
1567        )
1568        .ok_or_else(|| {
1569            LspError::NotFound(format!(
1570                "language server binary '{}' not found in node_modules/.bin, lsp_paths_extra, or PATH",
1571                def.binary
1572            ))
1573        })
1574    }
1575
1576    fn server_key_for_file(&self, file_path: &Path, config: &Config) -> Option<ServerKey> {
1577        for def in servers_for_file(file_path, config) {
1578            let root = find_workspace_root(file_path, &def.root_markers)?;
1579            let key = ServerKey {
1580                kind: def.kind.clone(),
1581                root,
1582            };
1583            if self.clients.contains_key(&key) {
1584                return Some(key);
1585            }
1586        }
1587        None
1588    }
1589}
1590
1591impl Default for LspManager {
1592    fn default() -> Self {
1593        Self::new()
1594    }
1595}
1596
1597fn wait_for_stderr_tail(client: &mut LspClient) {
1598    for _ in 0..10 {
1599        if !client.stderr_tail().is_empty() {
1600            break;
1601        }
1602        std::thread::sleep(std::time::Duration::from_millis(10));
1603    }
1604}
1605
1606fn recoverable_pull_rejection(err: &LspError) -> bool {
1607    matches!(
1608        err,
1609        LspError::ServerError {
1610            code: -32601 | -32602,
1611            ..
1612        }
1613    )
1614}
1615
1616fn server_attempt_result_reason(result: &ServerAttemptResult) -> String {
1617    match result {
1618        ServerAttemptResult::SpawnFailed { binary, reason } => {
1619            format!("spawn_failed: {binary} ({reason})")
1620        }
1621        ServerAttemptResult::BinaryNotInstalled { binary } => {
1622            format!("binary_not_installed: {binary}")
1623        }
1624        ServerAttemptResult::NoRootMarker { looked_for } => {
1625            format!("no_root_marker (looked for: {})", looked_for.join(", "))
1626        }
1627        ServerAttemptResult::Ok { .. } => "ok".to_string(),
1628    }
1629}
1630
1631fn format_stderr_tail_for_reason(stderr_tail: &str) -> String {
1632    truncate_stderr_tail_for_reason(stderr_tail)
1633        .lines()
1634        .map(|line| format!("  {line}"))
1635        .collect::<Vec<_>>()
1636        .join("\n")
1637}
1638
1639fn truncate_stderr_tail_for_reason(stderr_tail: &str) -> String {
1640    if stderr_tail.len() <= STDERR_REASON_BYTES {
1641        return stderr_tail.to_string();
1642    }
1643
1644    let ellipsis = "...";
1645    let target_len = STDERR_REASON_BYTES.saturating_sub(ellipsis.len());
1646    let mut start = stderr_tail.len() - target_len;
1647    while start < stderr_tail.len() && !stderr_tail.is_char_boundary(start) {
1648        start += 1;
1649    }
1650    format!("{ellipsis}{}", &stderr_tail[start..])
1651}
1652
1653fn format_initialize_failure_reason(binary: &str, stderr_tail: &str, err: &LspError) -> String {
1654    let mut reason = format!("server crashed during initialize: {err}");
1655    if !stderr_tail.is_empty() {
1656        reason.push_str("; stderr (last 64 lines):\n");
1657        reason.push_str(&format_stderr_tail_for_reason(stderr_tail));
1658        reason.push_str("\n\n");
1659        reason.push_str(&failure_hint(binary, stderr_tail));
1660    }
1661    reason
1662}
1663
1664fn format_post_initialize_exit_reason(
1665    binary: &str,
1666    status: std::process::ExitStatus,
1667    stderr_tail: &str,
1668    err: &LspError,
1669) -> String {
1670    let code = status
1671        .code()
1672        .map(|c| c.to_string())
1673        .unwrap_or_else(|| "signal/unknown".to_string());
1674    let mut reason = format!("server exited after initialize (code {code}): {err}");
1675    if !stderr_tail.is_empty() {
1676        reason.push_str("; stderr (last 64 lines):\n");
1677        reason.push_str(&format_stderr_tail_for_reason(stderr_tail));
1678        reason.push_str("\n\n");
1679        reason.push_str(&failure_hint(binary, stderr_tail));
1680    }
1681    reason
1682}
1683
1684fn failure_hint(binary: &str, stderr_tail: &str) -> String {
1685    if stderr_tail.contains("MODULE_NOT_FOUND") || stderr_tail.contains("Cannot find module") {
1686        let package_manager = infer_package_manager(stderr_tail);
1687        format!(
1688            "Your package-manager shim resolves to a missing file. Try reinstalling: {package_manager} install -g {binary} --force. Common cause: hard-link breakage from fs migration or store prune."
1689        )
1690    } else if let Some(component) = rustup_missing_component(stderr_tail) {
1691        // The binary on PATH is rustup's proxy shim, but the toolchain
1692        // component isn't installed, so rustup rejects the dispatch with
1693        // "Unknown binary '<name>' in ... toolchain". The actionable fix is to
1694        // add the component, not anything about the binary itself.
1695        format!("'{component}' is a rustup proxy but the component is not installed. Install it: rustup component add {component}")
1696    } else {
1697        format!("Hint: see stderr above for '{binary}' failure details.")
1698    }
1699}
1700
/// Recognizes rustup's "proxy shim without an installed component" failure
/// and returns the name of the component to add.
///
/// rustup prints `error: Unknown binary '<name>' in official toolchain
/// '<triple>'` when a `~/.cargo/bin/<name>` proxy is on PATH but the component
/// was never installed (the canonical case is `rust-analyzer`, which ships as
/// an opt-in component).
///
/// Returns `None` when the `Unknown binary '` marker or its closing quote is
/// missing, when the quoted name is empty, or when the text does not mention
/// `toolchain` anywhere.
fn rustup_missing_component(stderr_tail: &str) -> Option<String> {
    let (_, after_marker) = stderr_tail.split_once("Unknown binary '")?;
    let (name, _) = after_marker.split_once('\'')?;

    // Require the toolchain phrasing as well, so an unrelated "Unknown binary"
    // message is not misreported as a rustup component problem.
    let is_rustup_message = !name.is_empty() && stderr_tail.contains("toolchain");
    is_rustup_message.then(|| name.to_owned())
}
1719
/// Guesses which Node package manager installed the failing shim from the
/// stderr text, compared case-insensitively (ASCII only).
///
/// * `pnpm` when the text contains `.pnpm/`, `.pnpm\` or `/pnpm/`.
/// * `yarn` when the text contains `yarn` anywhere. That single check also
///   covers the `.yarn/`, `.yarn\` and `/yarn/` path forms.
/// * `npm` otherwise.
fn infer_package_manager(stderr_tail: &str) -> &'static str {
    const PNPM_MARKERS: [&str; 3] = [".pnpm/", ".pnpm\\", "/pnpm/"];

    let lowered = stderr_tail.to_ascii_lowercase();
    if PNPM_MARKERS.iter().any(|marker| lowered.contains(*marker)) {
        return "pnpm";
    }
    if lowered.contains("yarn") {
        return "yarn";
    }
    "npm"
}
1734
1735fn canonicalize_for_lsp(file_path: &Path) -> Result<PathBuf, LspError> {
1736    std::fs::canonicalize(file_path).map_err(LspError::from)
1737}
1738
/// Best-effort canonicalization that also works for paths that do not exist
/// (yet, or any more).
///
/// If `file_path` canonicalizes, that result is returned. Otherwise the
/// function walks up to the nearest existing ancestor, canonicalizes that
/// (falling back to it verbatim on failure), and re-appends the missing
/// trailing components in their original order.
fn resolve_for_lsp_uri(file_path: &Path) -> PathBuf {
    if let Ok(canonical) = std::fs::canonicalize(file_path) {
        return canonical;
    }

    // Peel trailing components off until something exists on disk. The names
    // are collected innermost-first.
    let mut anchor = file_path;
    let mut trailing = Vec::new();
    while !anchor.exists() {
        match anchor.file_name() {
            Some(name) => trailing.push(name),
            None => break,
        }
        match anchor.parent() {
            Some(parent) => anchor = parent,
            None => break,
        }
    }

    let mut resolved = std::fs::canonicalize(anchor).unwrap_or_else(|_| anchor.to_path_buf());
    // Reverse to restore outermost-first order while re-attaching.
    resolved.extend(trailing.into_iter().rev());
    resolved
}
1763
/// Maps a file extension (without the dot, matched case-sensitively) to an
/// LSP language identifier.
///
/// Extensions that are not in the table map to `"plaintext"`.
fn language_id_for_extension(ext: &str) -> &'static str {
    const LANGUAGE_IDS: &[(&[&str], &str)] = &[
        (&["ts"], "typescript"),
        (&["tsx"], "typescriptreact"),
        (&["js", "mjs", "cjs"], "javascript"),
        (&["jsx"], "javascriptreact"),
        (&["py", "pyi"], "python"),
        (&["rs"], "rust"),
        (&["go"], "go"),
        (&["html", "htm"], "html"),
    ];

    LANGUAGE_IDS
        .iter()
        .find(|(extensions, _)| extensions.iter().any(|candidate| *candidate == ext))
        .map_or("plaintext", |(_, language_id)| *language_id)
}
1777
/// Canonicalizes `path` for use as a lookup key, falling back to the path
/// exactly as given when canonicalization fails (for example because the
/// path does not exist).
fn normalize_lookup_path(path: &Path) -> PathBuf {
    match std::fs::canonicalize(path) {
        Ok(canonical) => canonical,
        Err(_) => path.to_path_buf(),
    }
}
1781
1782/// Classify an error returned by `spawn_server` into a structured
1783/// `ServerAttemptResult`. The two interesting cases for callers are:
1784/// - `BinaryNotInstalled` — the server's binary couldn't be resolved on PATH
1785///   or via override. The agent can be told "install bash-language-server".
1786/// - `SpawnFailed` — binary was found but spawning/initializing failed
1787///   (permissions, missing runtime, server crashed during initialize, etc.).
1788fn classify_spawn_error(binary: &str, err: &LspError) -> ServerAttemptResult {
1789    match err {
1790        // resolve_binary returns NotFound for both missing override paths and
1791        // missing PATH binaries. The "override missing" case is rare in
1792        // practice (only set in tests / env vars); we report all NotFound as
1793        // BinaryNotInstalled so the user sees an actionable install hint.
1794        LspError::NotFound(_) => ServerAttemptResult::BinaryNotInstalled {
1795            binary: binary.to_string(),
1796        },
1797        other => ServerAttemptResult::SpawnFailed {
1798            binary: binary.to_string(),
1799            reason: other.to_string(),
1800        },
1801    }
1802}
1803
1804fn env_binary_override(kind: &ServerKind) -> Option<PathBuf> {
1805    let id = kind.id_str();
1806    let suffix: String = id
1807        .chars()
1808        .map(|ch| {
1809            if ch.is_ascii_alphanumeric() {
1810                ch.to_ascii_uppercase()
1811            } else {
1812                '_'
1813            }
1814        })
1815        .collect();
1816    let key = format!("AFT_LSP_{suffix}_BINARY");
1817    std::env::var_os(key).map(PathBuf::from)
1818}
1819
#[cfg(test)]
mod failure_hint_tests {
    use super::{failure_hint, rustup_missing_component};

    /// rustup's verbatim stderr for a proxy shim whose component is missing
    /// must yield the component name and an actionable install command.
    #[test]
    fn detects_rustup_proxy_without_component() {
        let rustup_stderr = "error: Unknown binary 'rust-analyzer' in official toolchain 'stable-aarch64-apple-darwin'.";

        let component = rustup_missing_component(rustup_stderr);
        assert_eq!(component.as_deref(), Some("rust-analyzer"));

        let hint = failure_hint("rust-analyzer", rustup_stderr);
        assert!(
            hint.contains("rustup component add rust-analyzer"),
            "expected actionable rustup hint, got: {hint}"
        );
    }

    /// "Unknown binary" without rustup's toolchain phrasing must fall through
    /// to the generic hint instead of being blamed on a rustup component.
    #[test]
    fn ignores_unknown_binary_without_toolchain_phrasing() {
        let linker_stderr = "fatal: Unknown binary 'foo' was requested by the linker.";

        assert_eq!(rustup_missing_component(linker_stderr), None);

        let hint = failure_hint("foo", linker_stderr);
        assert!(hint.starts_with("Hint: see stderr"));
    }

    /// The package-manager-shim case keeps priority over the rustup check.
    #[test]
    fn npm_module_not_found_still_wins() {
        let node_stderr = "Error: Cannot find module '/x/typescript-language-server/lib/cli.mjs'";

        let hint = failure_hint("typescript-language-server", node_stderr);
        assert!(hint.contains("install -g"), "got: {hint}");
    }
}
1856
#[cfg(test)]
mod clear_diagnostics_tests {
    use std::path::{Path, PathBuf};

    use super::LspManager;
    use crate::lsp::diagnostics::{DiagnosticSeverity, StoredDiagnostic};
    use crate::lsp::registry::ServerKind;
    use crate::lsp::roots::ServerKey;

    /// Builds a single error-severity diagnostic attached to `file`.
    ///
    /// Takes `&Path` rather than `&PathBuf` so any path-like borrow works.
    fn err_diag(file: &Path) -> StoredDiagnostic {
        StoredDiagnostic {
            file: file.to_path_buf(),
            line: 1,
            column: 1,
            end_line: 1,
            end_column: 2,
            severity: DiagnosticSeverity::Error,
            message: "boom".into(),
            code: None,
            source: None,
        }
    }

    // A just-deleted file can no longer be canonicalized directly, but its
    // store key was the canonical path from publish time. The manager must
    // reconstruct that key via the still-present parent dir so symlink-aliased
    // roots (macOS /var -> /private/var) still match and the diagnostic clears.
    #[test]
    fn clear_diagnostics_for_deleted_file_matches_canonical_key() {
        let dir = tempfile::tempdir().unwrap();
        // Canonicalize the parent the way publish time would have.
        let canonical_dir = std::fs::canonicalize(dir.path()).unwrap();
        let canonical_file = canonical_dir.join("gone.ts");
        // Create the file now; it is removed further down so that its parent
        // exists but the file does not, mirroring the post-delete state the
        // watcher observes.
        std::fs::write(&canonical_file, "x").unwrap();

        let mut manager = LspManager::new();
        let key = ServerKey {
            kind: ServerKind::TypeScript,
            root: canonical_dir.clone(),
        };
        manager.diagnostics_store_mut_for_test().publish(
            key,
            canonical_file.clone(),
            vec![err_diag(&canonical_file)],
        );
        assert_eq!(manager.warm_error_warning_counts(), (1, 0));

        std::fs::remove_file(&canonical_file).unwrap();

        // Clear by the NON-canonical path the watcher might hand us (the raw
        // tempdir path, which on macOS differs from the canonical /private form).
        let watcher_path = dir.path().join("gone.ts");
        let removed = manager.clear_diagnostics_for_file(&watcher_path);

        assert!(removed, "expected the deleted file's diagnostic to clear");
        assert_eq!(manager.warm_error_warning_counts(), (0, 0));
    }

    // Clearing a path that never had diagnostics reports "nothing removed" and
    // leaves the counts untouched.
    #[test]
    fn clear_diagnostics_for_unknown_file_is_noop() {
        let mut manager = LspManager::new();
        assert!(!manager.clear_diagnostics_for_file(&PathBuf::from("/nope/missing.ts")));
        assert_eq!(manager.warm_error_warning_counts(), (0, 0));
    }
}