Skip to main content

aft/lsp/
manager.rs

1use std::collections::{HashMap, HashSet};
2use std::path::{Path, PathBuf};
3
4use crossbeam_channel::{unbounded, Receiver, RecvTimeoutError, Sender};
5use lsp_types::notification::{
6    DidChangeTextDocument, DidChangeWatchedFiles, DidCloseTextDocument, DidOpenTextDocument,
7};
8use lsp_types::{
9    DidChangeTextDocumentParams, DidChangeWatchedFilesParams, DidCloseTextDocumentParams,
10    DidOpenTextDocumentParams, FileChangeType, FileEvent, TextDocumentContentChangeEvent,
11    TextDocumentIdentifier, TextDocumentItem, VersionedTextDocumentIdentifier,
12};
13
14use crate::config::Config;
15use crate::lsp::child_registry::LspChildRegistry;
16use crate::lsp::client::{LspClient, LspEvent, ServerState};
17use crate::lsp::diagnostics::{
18    from_lsp_diagnostics, DiagnosticEntry, DiagnosticsStore, StoredDiagnostic,
19};
20use crate::lsp::document::DocumentStore;
21use crate::lsp::position::{uri_for_path, uri_to_path};
22use crate::lsp::pull_params::{
23    AftDocumentDiagnosticParams, AftDocumentDiagnosticRequest, AftWorkspaceDiagnosticParams,
24    AftWorkspaceDiagnosticRequest,
25};
26use crate::lsp::registry::{resolve_lsp_binary, servers_for_file, ServerDef, ServerKind};
27use crate::lsp::roots::{find_workspace_root, ServerKey};
28use crate::lsp::LspError;
29use crate::slog_error;
30
// 2 KiB byte budget. NOTE(review): judging by the name this caps how much of a
// child's stderr is quoted in a failure reason, but the use site is not in
// this part of the file — confirm before relying on that.
const STDERR_REASON_BYTES: usize = 2 * 1024;
32
/// Outcome of attempting to ensure a server is running for a single matching
/// `ServerDef`. Returned per matching server so the caller can report exactly
/// what happened to the user instead of collapsing all failures into "no
/// server".
///
/// `LspManager::ensure_server_for_file_detailed` produces these values. The
/// classification of a failed spawn is additionally remembered in the
/// manager's `failed_spawns` cache and cloned back out on later calls, which
/// is why the type is `Clone`.
#[derive(Debug, Clone)]
pub enum ServerAttemptResult {
    /// Server is running and ready to serve requests for this file.
    Ok { server_key: ServerKey },
    /// No workspace root was found by walking up from the file looking for
    /// any of the server's configured root markers. `looked_for` holds the
    /// markers that were searched for.
    NoRootMarker { looked_for: Vec<String> },
    /// The server's binary could not be found on PATH (or override was
    /// missing/invalid).
    BinaryNotInstalled { binary: String },
    /// Binary was found but spawning or initializing the server failed.
    SpawnFailed { binary: String, reason: String },
}
50
/// One server's attempt to handle a file: which server it was, plus what
/// happened when the manager tried to make it available.
#[derive(Debug, Clone)]
pub struct ServerAttempt {
    /// Stable server identifier (kind ID, e.g. "pyright", "rust-analyzer").
    pub server_id: String,
    /// Server display name from the registry.
    pub server_name: String,
    /// What happened for this server (ready, no root, missing binary, or
    /// spawn failure).
    pub result: ServerAttemptResult,
}
60
/// Aggregate outcome of `ensure_server_for_file_detailed`. Distinguishes:
/// - "No server registered for this file's extension" (`attempts.is_empty()`)
/// - "Servers registered but none could start" (`successful.is_empty()` but
///   `!attempts.is_empty()`)
/// - "At least one server is ready" (`!successful.is_empty()`)
///
/// The `Default` value (both vectors empty) is the starting point that
/// `ensure_server_for_file_detailed` fills in.
#[derive(Debug, Clone, Default)]
pub struct EnsureServerOutcomes {
    /// Server keys that are now running and ready to serve requests.
    pub successful: Vec<ServerKey>,
    /// Per-server attempt records. Empty if no server is registered for the
    /// file's extension.
    pub attempts: Vec<ServerAttempt>,
}
74
75impl EnsureServerOutcomes {
76    /// True if no server in the registry matched this file's extension.
77    pub fn no_server_registered(&self) -> bool {
78        self.attempts.is_empty()
79    }
80}
81
/// Outcome of a post-edit diagnostics wait. Reports the per-server status
/// alongside the fresh diagnostics, so the response layer can build an
/// honest tri-state payload (`success: true` + `complete: bool` + named
/// gap fields per `crates/aft/src/protocol.rs`).
///
/// `diagnostics` only contains entries from servers that proved freshness
/// (version-match preferred, epoch-fallback for unversioned servers).
/// Pre-edit cached entries are NEVER included — that's the whole point of
/// this type.
///
/// NOTE(review): `post_edit_entry_is_fresh` in this file returns `false` for
/// entries without a version, so the "epoch-fallback" wording here and on
/// `diagnostics` may be stale — confirm against the code that builds this
/// value.
#[derive(Debug, Clone, Default)]
pub struct PostEditWaitOutcome {
    /// Diagnostics from servers whose response we verified is FOR the
    /// post-edit document version (or whose epoch we saw advance after our
    /// pre-edit snapshot, for unversioned servers).
    pub diagnostics: Vec<StoredDiagnostic>,
    /// Servers we expected to publish but didn't before the deadline.
    /// Reported to the agent via `pending_lsp_servers` so they understand
    /// the result is partial.
    pub pending_servers: Vec<ServerKey>,
    /// Servers whose process exited between notification and deadline.
    /// Reported separately so the agent knows the gap is unrecoverable
    /// without a server restart, not "wait longer."
    pub exited_servers: Vec<ServerKey>,
}
106
/// Pre-edit freshness snapshot for one server/file pair.
///
/// The `Default` value is epoch `0` with no recorded document version.
#[derive(Debug, Clone, Copy, Default)]
pub struct PreEditSnapshot {
    /// Diagnostics epoch stored for the server/file entry when the snapshot
    /// was taken. `post_edit_entry_is_fresh` requires a later entry to have a
    /// strictly greater epoch than this.
    pub epoch: u64,
    /// Document version known at capture time, if any. Not consulted by
    /// `post_edit_entry_is_fresh`.
    pub document_version_at_capture: Option<i32>,
}
113
114pub fn post_edit_entry_is_fresh(
115    entry: &DiagnosticEntry,
116    target_version: i32,
117    pre: PreEditSnapshot,
118) -> bool {
119    if entry.epoch <= pre.epoch {
120        return false;
121    }
122
123    match entry.version {
124        Some(version) => version >= target_version,
125        // Unversioned publishDiagnostics payloads cannot prove which document
126        // state they describe. Epoch advancement only proves arrival order; an
127        // old analysis result can still arrive after our pre-snapshot. Treat as
128        // pending/partial rather than fresh.
129        None => false,
130    }
131}
132
133impl PostEditWaitOutcome {
134    /// True if every expected server reported a fresh result. False means
135    /// the agent should treat the diagnostics as a partial picture.
136    pub fn complete(&self) -> bool {
137        self.pending_servers.is_empty() && self.exited_servers.is_empty()
138    }
139}
140
/// Per-server outcome of a `textDocument/diagnostic` (per-file pull) request.
/// Paired with the originating server in `PullFileResult`.
#[derive(Debug, Clone)]
pub enum PullFileOutcome {
    /// Server returned a full report; diagnostics stored. `diagnostic_count`
    /// is the number of diagnostics in that report.
    Full { diagnostic_count: usize },
    /// Server returned `kind: "unchanged"` — cached diagnostics still valid.
    Unchanged,
    /// Server returned a partial-result token; we don't subscribe to streamed
    /// progress so the response is treated as a soft empty until the next pull.
    PartialNotSupported,
    /// Server doesn't advertise pull capability — caller should fall back to
    /// push diagnostics for this server.
    PullNotSupported,
    /// The pull request failed (timeout, server error, etc.).
    RequestFailed { reason: String },
}
157
/// Result of `pull_file_diagnostics` for one matching server.
#[derive(Debug, Clone)]
pub struct PullFileResult {
    /// The server instance (kind + workspace root) that was asked.
    pub server_key: ServerKey,
    /// What that server answered, or why no answer was obtained.
    pub outcome: PullFileOutcome,
}
164
/// Result of `pull_workspace_diagnostics` for a single server.
#[derive(Debug, Clone)]
pub struct PullWorkspaceResult {
    /// The server instance (kind + workspace root) that was asked.
    pub server_key: ServerKey,
    /// Files for which a Full report was received and cached. Files that came
    /// back as `Unchanged` are NOT listed here because their cached entry was
    /// already authoritative.
    pub files_reported: Vec<PathBuf>,
    /// True if the server returned a full response within the timeout.
    pub complete: bool,
    /// True if we cancelled (request timed out before the server responded).
    pub cancelled: bool,
    /// True if the server advertised workspace pull support. When false, the
    /// other fields are empty and the caller should fall back to file-mode
    /// pull or to push semantics.
    pub supports_workspace: bool,
}
182
/// Owns the language-server clients started by this process together with
/// the state kept alongside them: per-server document tracking, the
/// diagnostics store, the shared event channel, test overrides, and the
/// cache of spawn failures.
pub struct LspManager {
    /// Active server instances, keyed by (ServerKind, workspace_root).
    clients: HashMap<ServerKey, LspClient>,
    /// Binary names for active server instances. Kept separate from
    /// `LspClient` so crash handling can report the installable binary name
    /// after a post-initialize process exit.
    server_binaries: HashMap<ServerKey, String>,
    /// Tracks opened documents and versions per active server.
    documents: HashMap<ServerKey, DocumentStore>,
    /// Stored publishDiagnostics payloads across all servers.
    diagnostics: DiagnosticsStore,
    /// Unified event channel — all server reader threads send here.
    event_tx: Sender<LspEvent>,
    /// Receiving half of the unified event channel; emptied by
    /// `drain_events`.
    event_rx: Receiver<LspEvent>,
    /// Optional binary path overrides used by integration tests.
    binary_overrides: HashMap<ServerKind, PathBuf>,
    /// Extra env vars merged into every spawned LSP child. Used in tests to
    /// drive the fake server's behavioral variants (`AFT_FAKE_LSP_PULL=1`,
    /// `AFT_FAKE_LSP_WORKSPACE=1`, etc.). Production code does not set this.
    extra_env: HashMap<String, String>,
    /// Per-(kind,root) cache of spawn failures. Once a server fails to spawn
    /// for a workspace root, we remember why and skip subsequent attempts for
    /// the lifetime of this AFT process. Without this, every file open or
    /// didChange retries `spawn_server` and logs a fresh ERROR — visible as
    /// repeated `failed to spawn TypeScript Language Server: Could not find a
    /// valid TypeScript installation` lines per edit.
    ///
    /// Entries are NEVER evicted automatically. The expected recovery path is
    /// for the user to fix their environment (install the missing binary or
    /// add a `tsconfig.json` / `package.json` with the right dependency) and
    /// restart OpenCode/Pi, which spawns a fresh `aft` process with an empty
    /// cache. We deliberately don't auto-retry on file events: the failure
    /// modes we track here (binary not installed, init handshake failure)
    /// don't fix themselves at runtime.
    failed_spawns: HashMap<ServerKey, ServerAttemptResult>,
    /// Server/root pairs for which we already logged that watched-file
    /// notifications are skipped because the capability is absent.
    watched_file_skip_logged: HashSet<ServerKey>,
    /// Tracks PIDs of spawned LSP child processes so the signal handler can
    /// kill them on SIGTERM/SIGINT before aft exits, preventing orphans.
    /// Defaults to empty; production wires this from `AppContext`.
    child_registry: LspChildRegistry,
}
226
227impl LspManager {
228    pub fn new() -> Self {
229        let (event_tx, event_rx) = unbounded();
230        Self {
231            clients: HashMap::new(),
232            server_binaries: HashMap::new(),
233            documents: HashMap::new(),
234            diagnostics: DiagnosticsStore::new(),
235            event_tx,
236            event_rx,
237            binary_overrides: HashMap::new(),
238            extra_env: HashMap::new(),
239            failed_spawns: HashMap::new(),
240            watched_file_skip_logged: HashSet::new(),
241            child_registry: LspChildRegistry::new(),
242        }
243    }
244
245    /// Set the child-PID registry. Must be called before any servers spawn.
246    pub fn set_child_registry(&mut self, registry: LspChildRegistry) {
247        self.child_registry = registry;
248    }
249
250    /// For testing: set an extra environment variable that gets passed to
251    /// every spawned LSP child process. Useful for driving fake-server
252    /// behavioral variants in integration tests.
253    pub fn set_extra_env(&mut self, key: &str, value: &str) {
254        self.extra_env.insert(key.to_string(), value.to_string());
255    }
256
257    /// Count active LSP server instances.
258    pub fn server_count(&self) -> usize {
259        self.clients.len()
260    }
261
262    /// For testing: override the binary for a server kind.
263    pub fn override_binary(&mut self, kind: ServerKind, binary_path: PathBuf) {
264        self.binary_overrides.insert(kind, binary_path);
265    }
266
267    /// Ensure a server is running for the given file. Spawns if needed.
268    /// Returns the active server keys for the file, or an empty vec if none match.
269    ///
270    /// This is the lightweight wrapper around [`ensure_server_for_file_detailed`]
271    /// that drops failure context. Prefer the detailed variant in command
272    /// handlers that need to surface honest error messages to the agent.
273    pub fn ensure_server_for_file(&mut self, file_path: &Path, config: &Config) -> Vec<ServerKey> {
274        self.ensure_server_for_file_detailed(file_path, config)
275            .successful
276    }
277
278    /// Detailed version of [`ensure_server_for_file`] that records every
279    /// matching server's outcome (`Ok` / `NoRootMarker` / `BinaryNotInstalled`
280    /// / `SpawnFailed`).
281    ///
282    /// Use this when the caller wants to honestly report _why_ a file has no
283    /// active server (e.g., to surface "bash-language-server not on PATH" to
284    /// the agent instead of silently returning `total: 0`).
285    pub fn ensure_server_for_file_detailed(
286        &mut self,
287        file_path: &Path,
288        config: &Config,
289    ) -> EnsureServerOutcomes {
290        let defs = servers_for_file(file_path, config);
291        let mut outcomes = EnsureServerOutcomes::default();
292
293        for def in defs {
294            let server_id = def.kind.id_str().to_string();
295            let server_name = def.name.to_string();
296
297            let Some(root) = find_workspace_root(file_path, &def.root_markers) else {
298                outcomes.attempts.push(ServerAttempt {
299                    server_id,
300                    server_name,
301                    result: ServerAttemptResult::NoRootMarker {
302                        looked_for: def.root_markers.iter().map(|s| s.to_string()).collect(),
303                    },
304                });
305                continue;
306            };
307
308            let key = ServerKey {
309                kind: def.kind.clone(),
310                root,
311            };
312
313            if !self.clients.contains_key(&key) {
314                // If we already tried and failed to spawn this server for this
315                // root, return the cached classification without retrying or
316                // re-logging. This prevents per-edit ERROR spam when the user's
317                // environment is missing a dependency the LSP needs (the
318                // typescript-language-server "Could not find a valid TypeScript
319                // installation" case is the canonical example).
320                if let Some(cached) = self.failed_spawns.get(&key) {
321                    outcomes.attempts.push(ServerAttempt {
322                        server_id,
323                        server_name,
324                        result: cached.clone(),
325                    });
326                    continue;
327                }
328
329                match self.spawn_server(&def, &key.root, config) {
330                    Ok(client) => {
331                        self.clients.insert(key.clone(), client);
332                        self.server_binaries.insert(key.clone(), def.binary.clone());
333                        self.documents.entry(key.clone()).or_default();
334                    }
335                    Err(err) => {
336                        slog_error!("failed to spawn {}: {}", def.name, err);
337                        let result = classify_spawn_error(&def.binary, &err);
338                        // Remember the failure so subsequent file events skip
339                        // this (kind, root) pair instead of producing a fresh
340                        // spawn attempt + ERROR log per request.
341                        self.failed_spawns.insert(key.clone(), result.clone());
342                        outcomes.attempts.push(ServerAttempt {
343                            server_id,
344                            server_name,
345                            result,
346                        });
347                        continue;
348                    }
349                }
350            }
351
352            outcomes.attempts.push(ServerAttempt {
353                server_id,
354                server_name,
355                result: ServerAttemptResult::Ok {
356                    server_key: key.clone(),
357                },
358            });
359            outcomes.successful.push(key);
360        }
361
362        outcomes
363    }
364
365    /// Ensure a server is running using the default LSP registry.
366    /// Kept for integration tests that exercise built-in server helpers directly.
367    pub fn ensure_server_for_file_default(&mut self, file_path: &Path) -> Vec<ServerKey> {
368        self.ensure_server_for_file(file_path, &Config::default())
369    }
370    /// Ensure that servers are running for the file and that the document is open
371    /// in each server's DocumentStore. Reads file content from disk if not already open.
372    /// Returns the server keys for the file.
373    pub fn ensure_file_open(
374        &mut self,
375        file_path: &Path,
376        config: &Config,
377    ) -> Result<Vec<ServerKey>, LspError> {
378        let canonical_path = canonicalize_for_lsp(file_path)?;
379        let server_keys = self.ensure_server_for_file(&canonical_path, config);
380        if server_keys.is_empty() {
381            return Ok(server_keys);
382        }
383
384        let uri = uri_for_path(&canonical_path)?;
385        let language_id = language_id_for_extension(
386            canonical_path
387                .extension()
388                .and_then(|ext| ext.to_str())
389                .unwrap_or_default(),
390        )
391        .to_string();
392
393        for key in &server_keys {
394            let already_open = self
395                .documents
396                .get(key)
397                .is_some_and(|store| store.is_open(&canonical_path));
398
399            if !already_open {
400                let content = std::fs::read_to_string(&canonical_path).map_err(LspError::Io)?;
401                if let Some(client) = self.clients.get_mut(key) {
402                    client.send_notification::<DidOpenTextDocument>(DidOpenTextDocumentParams {
403                        text_document: TextDocumentItem::new(
404                            uri.clone(),
405                            language_id.clone(),
406                            0,
407                            content,
408                        ),
409                    })?;
410                }
411                self.documents
412                    .entry(key.clone())
413                    .or_default()
414                    .open(canonical_path.clone());
415                continue;
416            }
417
418            // Document is already open. Check disk drift — if the file has
419            // been modified outside the AFT pipeline (other tool, manual
420            // edit, sibling session) we MUST send a didChange before any
421            // pull-diagnostic / hover query, otherwise the LSP server
422            // returns results computed from stale in-memory content.
423            //
424            // This is the regression fix Oracle flagged in finding #6:
425            // "ensure_file_open skips already-open files without checking
426            // if disk content changed."
427            let drifted = self
428                .documents
429                .get(key)
430                .is_some_and(|store| store.is_stale_on_disk(&canonical_path));
431            if drifted {
432                let content = std::fs::read_to_string(&canonical_path).map_err(LspError::Io)?;
433                let next_version = self
434                    .documents
435                    .get(key)
436                    .and_then(|store| store.version(&canonical_path))
437                    .map(|v| v + 1)
438                    .unwrap_or(1);
439                if let Some(client) = self.clients.get_mut(key) {
440                    client.send_notification::<DidChangeTextDocument>(
441                        DidChangeTextDocumentParams {
442                            text_document: VersionedTextDocumentIdentifier::new(
443                                uri.clone(),
444                                next_version,
445                            ),
446                            content_changes: vec![TextDocumentContentChangeEvent {
447                                range: None,
448                                range_length: None,
449                                text: content,
450                            }],
451                        },
452                    )?;
453                }
454                if let Some(store) = self.documents.get_mut(key) {
455                    store.bump_version(&canonical_path);
456                }
457            }
458        }
459
460        Ok(server_keys)
461    }
462
463    pub fn ensure_file_open_default(
464        &mut self,
465        file_path: &Path,
466    ) -> Result<Vec<ServerKey>, LspError> {
467        self.ensure_file_open(file_path, &Config::default())
468    }
469
470    /// Notify relevant LSP servers that a file has been written/changed.
471    /// This is the main hook called after every file write in AFT.
472    ///
473    /// If the file's server isn't running yet, starts it (lazy spawn).
474    /// If the file isn't open in LSP yet, sends didOpen. Otherwise sends didChange.
475    pub fn notify_file_changed(
476        &mut self,
477        file_path: &Path,
478        content: &str,
479        config: &Config,
480    ) -> Result<(), LspError> {
481        self.notify_file_changed_versioned(file_path, content, config)
482            .map(|_| ())
483    }
484
485    /// Like `notify_file_changed`, but returns the target document version
486    /// per server so the post-edit waiter can match `publishDiagnostics`
487    /// against the exact version that this notification carried.
488    ///
489    /// Returns: `Vec<(ServerKey, target_version)>`. `target_version` is the
490    /// `version` field on the `VersionedTextDocumentIdentifier` we just sent
491    /// (post-bump). For freshly-opened documents (`didOpen`) the version is
492    /// `0`. Servers that don't honor versioned text document sync will not
493    /// echo this back on `publishDiagnostics`; the caller is expected to
494    /// fall back to the epoch-delta path for those.
495    pub fn notify_file_changed_versioned(
496        &mut self,
497        file_path: &Path,
498        content: &str,
499        config: &Config,
500    ) -> Result<Vec<(ServerKey, i32)>, LspError> {
501        let canonical_path = canonicalize_for_lsp(file_path)?;
502        let server_keys = self.ensure_server_for_file(&canonical_path, config);
503        if server_keys.is_empty() {
504            return Ok(Vec::new());
505        }
506
507        let uri = uri_for_path(&canonical_path)?;
508        let language_id = language_id_for_extension(
509            canonical_path
510                .extension()
511                .and_then(|ext| ext.to_str())
512                .unwrap_or_default(),
513        )
514        .to_string();
515
516        let mut versions: Vec<(ServerKey, i32)> = Vec::with_capacity(server_keys.len());
517
518        for key in server_keys {
519            let current_version = self
520                .documents
521                .get(&key)
522                .and_then(|store| store.version(&canonical_path));
523
524            if let Some(version) = current_version {
525                let next_version = version + 1;
526                if let Some(client) = self.clients.get_mut(&key) {
527                    client.send_notification::<DidChangeTextDocument>(
528                        DidChangeTextDocumentParams {
529                            text_document: VersionedTextDocumentIdentifier::new(
530                                uri.clone(),
531                                next_version,
532                            ),
533                            content_changes: vec![TextDocumentContentChangeEvent {
534                                range: None,
535                                range_length: None,
536                                text: content.to_string(),
537                            }],
538                        },
539                    )?;
540                }
541                if let Some(store) = self.documents.get_mut(&key) {
542                    store.bump_version(&canonical_path);
543                }
544                versions.push((key, next_version));
545                continue;
546            }
547
548            if let Some(client) = self.clients.get_mut(&key) {
549                client.send_notification::<DidOpenTextDocument>(DidOpenTextDocumentParams {
550                    text_document: TextDocumentItem::new(
551                        uri.clone(),
552                        language_id.clone(),
553                        0,
554                        content.to_string(),
555                    ),
556                })?;
557            }
558            self.documents
559                .entry(key.clone())
560                .or_default()
561                .open(canonical_path.clone());
562            // didOpen carries version 0 — that's the version the server
563            // will echo on its first publishDiagnostics for this document.
564            versions.push((key, 0));
565        }
566
567        Ok(versions)
568    }
569
570    pub fn notify_file_changed_default(
571        &mut self,
572        file_path: &Path,
573        content: &str,
574    ) -> Result<(), LspError> {
575        self.notify_file_changed(file_path, content, &Config::default())
576    }
577
578    /// Notify every active server whose workspace contains at least one changed
579    /// path that watched files changed. This is intentionally workspace-scoped
580    /// rather than extension-scoped: configuration edits such as `package.json`
581    /// or `tsconfig.json` affect a server's project graph even though those
582    /// files may not be documents handled by the server itself.
583    pub fn notify_files_watched_changed(
584        &mut self,
585        paths: &[(PathBuf, FileChangeType)],
586        _config: &Config,
587    ) -> Result<(), LspError> {
588        if paths.is_empty() {
589            return Ok(());
590        }
591
592        let mut canonical_events = Vec::with_capacity(paths.len());
593        for (path, typ) in paths {
594            let canonical_path = resolve_for_lsp_uri(path);
595            canonical_events.push((canonical_path, *typ));
596        }
597
598        let keys: Vec<ServerKey> = self.clients.keys().cloned().collect();
599        for key in keys {
600            let mut changes = Vec::new();
601            for (path, typ) in &canonical_events {
602                if !path.starts_with(&key.root) {
603                    continue;
604                }
605                changes.push(FileEvent::new(uri_for_path(path)?, *typ));
606            }
607
608            if changes.is_empty() {
609                continue;
610            }
611
612            if let Some(client) = self.clients.get_mut(&key) {
613                // Send when the server either advertised initialize-time
614                // watched-file support or dynamically registered a watcher.
615                // The dynamic client capability we send during initialize only
616                // permits runtime registration; it is tracked separately via
617                // `has_watched_file_registration()`.
618                let supports_static_watched_files = client.supports_watched_files();
619                let has_dynamic_registration = client.has_watched_file_registration();
620                if !(supports_static_watched_files || has_dynamic_registration) {
621                    if self.watched_file_skip_logged.insert(key.clone()) {
622                        log::debug!(
623                            "skipping didChangeWatchedFiles for {:?} (not supported or registered)",
624                            key
625                        );
626                    }
627                    continue;
628                }
629                client.send_notification::<DidChangeWatchedFiles>(DidChangeWatchedFilesParams {
630                    changes,
631                })?;
632            }
633        }
634
635        Ok(())
636    }
637
638    /// Close a document in all servers that have it open.
639    pub fn notify_file_closed(&mut self, file_path: &Path) -> Result<(), LspError> {
640        let canonical_path = canonicalize_for_lsp(file_path)?;
641        let uri = uri_for_path(&canonical_path)?;
642        let keys: Vec<ServerKey> = self.documents.keys().cloned().collect();
643
644        for key in keys {
645            let was_open = self
646                .documents
647                .get(&key)
648                .map(|store| store.is_open(&canonical_path))
649                .unwrap_or(false);
650            if !was_open {
651                continue;
652            }
653
654            if let Some(client) = self.clients.get_mut(&key) {
655                client.send_notification::<DidCloseTextDocument>(DidCloseTextDocumentParams {
656                    text_document: TextDocumentIdentifier::new(uri.clone()),
657                })?;
658            }
659
660            if let Some(store) = self.documents.get_mut(&key) {
661                store.close(&canonical_path);
662            }
663            self.diagnostics
664                .clear_for_server_file(&key, &canonical_path);
665        }
666
667        Ok(())
668    }
669
670    /// Get an active client for a file path, if one exists.
671    pub fn client_for_file(&self, file_path: &Path, config: &Config) -> Option<&LspClient> {
672        let key = self.server_key_for_file(file_path, config)?;
673        self.clients.get(&key)
674    }
675
676    pub fn client_for_file_default(&self, file_path: &Path) -> Option<&LspClient> {
677        self.client_for_file(file_path, &Config::default())
678    }
679
680    /// Get a mutable active client for a file path, if one exists.
681    pub fn client_for_file_mut(
682        &mut self,
683        file_path: &Path,
684        config: &Config,
685    ) -> Option<&mut LspClient> {
686        let key = self.server_key_for_file(file_path, config)?;
687        self.clients.get_mut(&key)
688    }
689
690    pub fn client_for_file_mut_default(&mut self, file_path: &Path) -> Option<&mut LspClient> {
691        self.client_for_file_mut(file_path, &Config::default())
692    }
693
694    /// Number of tracked server clients.
695    pub fn active_client_count(&self) -> usize {
696        self.clients.len()
697    }
698
699    /// Drain all pending LSP events. Call from the main loop.
700    pub fn drain_events(&mut self) -> Vec<LspEvent> {
701        let mut events = Vec::new();
702        while let Ok(event) = self.event_rx.try_recv() {
703            self.handle_event(&event);
704            events.push(event);
705        }
706        events
707    }
708
709    /// Wait for diagnostics to arrive for a specific file until a timeout expires.
710    pub fn wait_for_diagnostics(
711        &mut self,
712        file_path: &Path,
713        config: &Config,
714        timeout: std::time::Duration,
715    ) -> Vec<StoredDiagnostic> {
716        let deadline = std::time::Instant::now() + timeout;
717        self.wait_for_file_diagnostics(file_path, config, deadline)
718    }
719
720    pub fn wait_for_diagnostics_default(
721        &mut self,
722        file_path: &Path,
723        timeout: std::time::Duration,
724    ) -> Vec<StoredDiagnostic> {
725        self.wait_for_diagnostics(file_path, &Config::default(), timeout)
726    }
727
728    /// Test-only accessor for the diagnostics store. Used by integration
729    /// tests that need to inspect per-server entries (e.g., to verify that
730    /// `ServerKey::root` is populated correctly, not the empty path that
731    /// the legacy `publish_with_kind` path produced).
732    #[doc(hidden)]
733    pub fn diagnostics_store_for_test(&self) -> &DiagnosticsStore {
734        &self.diagnostics
735    }
736
737    /// Snapshot the current per-server epoch for every entry that exists
738    /// for `file_path`. Servers without an entry yet (never published)
739    /// are absent from the map; for those, `pre = 0` (any first publish
740    /// will be considered fresh under the epoch-fallback rule).
741    pub fn snapshot_diagnostic_epochs(&self, file_path: &Path) -> HashMap<ServerKey, u64> {
742        let lookup_path = normalize_lookup_path(file_path);
743        self.diagnostics
744            .entries_for_file(&lookup_path)
745            .into_iter()
746            .map(|(key, entry)| (key.clone(), entry.epoch))
747            .collect()
748    }
749
750    /// Snapshot the current diagnostic epoch and document version for every
751    /// active server relevant to `file_path` before a post-edit notification.
752    pub fn snapshot_pre_edit_state(&self, file_path: &Path) -> HashMap<ServerKey, PreEditSnapshot> {
753        let lookup_path = normalize_lookup_path(file_path);
754        let mut snapshots: HashMap<ServerKey, PreEditSnapshot> = self
755            .diagnostics
756            .entries_for_file(&lookup_path)
757            .into_iter()
758            .map(|(key, entry)| {
759                (
760                    key.clone(),
761                    PreEditSnapshot {
762                        epoch: entry.epoch,
763                        document_version_at_capture: None,
764                    },
765                )
766            })
767            .collect();
768
769        for (key, store) in &self.documents {
770            if let Some(version) = store.version(&lookup_path) {
771                snapshots
772                    .entry(key.clone())
773                    .or_default()
774                    .document_version_at_capture = Some(version);
775            }
776        }
777
778        snapshots
779    }
780
781    /// True when the current diagnostic entry for `server_key` can be tied to
782    /// that server's current in-memory document version for `file_path`.
783    ///
784    /// File-mode `lsp_diagnostics` uses this for push-only fallback after it
785    /// has synced/opened the document. Versioned publishes are accepted when
786    /// they match the current document version; unversioned publishes are not
787    /// accepted as fresh because epoch/wall-clock ordering alone is racy.
788    pub fn diagnostic_entry_is_fresh_for_document(
789        &self,
790        file_path: &Path,
791        server_key: &ServerKey,
792        pre: PreEditSnapshot,
793    ) -> bool {
794        let lookup_path = normalize_lookup_path(file_path);
795        let Some(entry) = self
796            .diagnostics
797            .entries_for_file(&lookup_path)
798            .into_iter()
799            .find_map(|(key, entry)| if key == server_key { Some(entry) } else { None })
800        else {
801            return false;
802        };
803
804        let target_version = self
805            .documents
806            .get(server_key)
807            .and_then(|store| store.version(&lookup_path))
808            .or(pre.document_version_at_capture)
809            .unwrap_or(0);
810
811        matches!(entry.version, Some(version) if version >= target_version)
812    }
813
814    /// Wait for FRESH per-server diagnostics that match the just-sent
815    /// document version. This is the v0.17.3 post-edit path that fixes the
816    /// stale-diagnostics bug: instead of returning whatever is in the cache
817    /// when the deadline hits, we only return entries whose `version`
818    /// matches the post-edit target version (or, for servers that don't
819    /// participate in versioned sync, whose `epoch` was bumped after the
820    /// pre-edit snapshot).
821    ///
822    /// `expected_versions` should come from `notify_file_changed_versioned`
823    /// — one `(ServerKey, target_version)` per server we sent didChange/
824    /// didOpen to.
825    ///
826    /// `pre_snapshot` is the per-server epoch BEFORE the notification was
827    /// sent; it gates the epoch-fallback path so an old-version publish
828    /// arriving after `drain_events` and before `didChange` cannot be
829    /// mistaken for a fresh response.
830    ///
831    /// Returns a per-server tri-state: `Fresh` (publish matched target
832    /// version OR epoch advanced past snapshot for an unversioned server),
833    /// `Pending` (deadline hit before this server published anything we
834    /// could verify), or `Exited` (server died between notification and
835    /// deadline).
836    pub fn wait_for_post_edit_diagnostics(
837        &mut self,
838        file_path: &Path,
839        // `config` is intentionally accepted (matches sibling wait APIs and
840        // future-proofs us if freshness rules need it). Currently unused
841        // because expected_versions/pre_snapshot fully determine behavior.
842        _config: &Config,
843        expected_versions: &[(ServerKey, i32)],
844        pre_snapshot: &HashMap<ServerKey, PreEditSnapshot>,
845        timeout: std::time::Duration,
846    ) -> PostEditWaitOutcome {
847        let lookup_path = normalize_lookup_path(file_path);
848        let deadline = std::time::Instant::now() + timeout;
849
850        // Drain any events that arrived while we were sending didChange.
851        // The publishDiagnostics handler stores the version, so even
852        // pre-snapshot publishes that landed late won't be mistaken for
853        // fresh — the version-match check will reject them.
854        let _ = self.drain_events_for_file(&lookup_path);
855
856        let mut fresh: HashMap<ServerKey, Vec<StoredDiagnostic>> = HashMap::new();
857        let mut exited: Vec<ServerKey> = Vec::new();
858
859        loop {
860            // Check freshness for every expected server. A server is fresh
861            // if its current entry for this file satisfies either:
862            //   1. version-match: entry.version == Some(target_version), OR
863            //   2. push-only freshness: entry.version is None AND entry.epoch
864            //      advanced strictly after the pre-edit snapshot. Versioned
865            //      publishes must be >= the post-edit target version.
866            // Servers whose process has exited are reported separately.
867            for (key, target_version) in expected_versions {
868                if fresh.contains_key(key) || exited.contains(key) {
869                    continue;
870                }
871                if !self.clients.contains_key(key) {
872                    exited.push(key.clone());
873                    continue;
874                }
875                if let Some(entry) = self
876                    .diagnostics
877                    .entries_for_file(&lookup_path)
878                    .into_iter()
879                    .find_map(|(k, e)| if k == key { Some(e) } else { None })
880                {
881                    let pre = pre_snapshot.get(key).copied().unwrap_or_default();
882                    let is_fresh = post_edit_entry_is_fresh(entry, *target_version, pre);
883                    if is_fresh {
884                        fresh.insert(key.clone(), entry.diagnostics.clone());
885                    }
886                }
887            }
888
889            // All accounted for? Done.
890            if fresh.len() + exited.len() == expected_versions.len() {
891                break;
892            }
893
894            let now = std::time::Instant::now();
895            if now >= deadline {
896                break;
897            }
898
899            let timeout = deadline.saturating_duration_since(now);
900            match self.event_rx.recv_timeout(timeout) {
901                Ok(event) => {
902                    self.handle_event(&event);
903                }
904                Err(RecvTimeoutError::Timeout) | Err(RecvTimeoutError::Disconnected) => break,
905            }
906        }
907
908        // Pending = expected but neither fresh nor exited.
909        let pending: Vec<ServerKey> = expected_versions
910            .iter()
911            .filter(|(k, _)| !fresh.contains_key(k) && !exited.contains(k))
912            .map(|(k, _)| k.clone())
913            .collect();
914
915        // Build deduplicated, sorted diagnostics from the fresh servers only.
916        // Stale or pending servers contribute zero diagnostics.
917        let mut diagnostics: Vec<StoredDiagnostic> = fresh
918            .into_iter()
919            .flat_map(|(_, diags)| diags.into_iter())
920            .collect();
921        diagnostics.sort_by(|a, b| {
922            a.file
923                .cmp(&b.file)
924                .then(a.line.cmp(&b.line))
925                .then(a.column.cmp(&b.column))
926                .then(a.message.cmp(&b.message))
927        });
928
929        PostEditWaitOutcome {
930            diagnostics,
931            pending_servers: pending,
932            exited_servers: exited,
933        }
934    }
935
936    /// Wait for diagnostics to arrive for a specific file until a deadline.
937    ///
938    /// Drains already-queued events first, then blocks on the shared event
939    /// channel only until either `publishDiagnostics` arrives for this file or
940    /// the deadline is reached.
941    pub fn wait_for_file_diagnostics(
942        &mut self,
943        file_path: &Path,
944        config: &Config,
945        deadline: std::time::Instant,
946    ) -> Vec<StoredDiagnostic> {
947        let lookup_path = normalize_lookup_path(file_path);
948
949        if self.server_key_for_file(&lookup_path, config).is_none() {
950            return Vec::new();
951        }
952
953        loop {
954            if self.drain_events_for_file(&lookup_path) {
955                break;
956            }
957
958            let now = std::time::Instant::now();
959            if now >= deadline {
960                break;
961            }
962
963            let timeout = deadline.saturating_duration_since(now);
964            match self.event_rx.recv_timeout(timeout) {
965                Ok(event) => {
966                    if matches!(
967                        self.handle_event(&event),
968                        Some(ref published_file) if published_file.as_path() == lookup_path.as_path()
969                    ) {
970                        break;
971                    }
972                }
973                Err(RecvTimeoutError::Timeout) | Err(RecvTimeoutError::Disconnected) => break,
974            }
975        }
976
977        self.get_diagnostics_for_file(&lookup_path)
978            .into_iter()
979            .cloned()
980            .collect()
981    }
982
983    /// Default timeout for `textDocument/diagnostic` (per-file pull). Servers
984    /// usually respond in under 1s for files they've already analyzed; we
985    /// allow up to 10s before falling back to push semantics. Currently
986    /// surfaced via [`Self::pull_file_timeout`] for callers that want to
987    /// override the wait via the `wait_ms` knob.
988    pub const PULL_FILE_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);
989
990    /// Public accessor so command handlers can reuse the documented default.
991    pub fn pull_file_timeout() -> std::time::Duration {
992        Self::PULL_FILE_TIMEOUT
993    }
994
995    /// Default timeout for `workspace/diagnostic`. The LSP spec allows the
996    /// server to hold this open indefinitely; we cap at 10s and report
997    /// `complete: false` to the agent rather than hanging the bridge.
998    const PULL_WORKSPACE_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);
999
1000    /// Issue a `textDocument/diagnostic` (LSP 3.17 per-file pull) request to
1001    /// every server that supports pull diagnostics for the given file.
1002    ///
1003    /// Returns the per-server outcome. If a server reports `kind: "unchanged"`,
1004    /// the cached entry's diagnostics are surfaced (deterministic re-use of
1005    /// the previous response). If a server doesn't advertise pull capability,
1006    /// it's skipped here — the caller should fall back to push for those.
1007    ///
1008    /// Side effects: results are stored in `DiagnosticsStore` so directory-mode
1009    /// queries can aggregate them later.
1010    pub fn pull_file_diagnostics(
1011        &mut self,
1012        file_path: &Path,
1013        config: &Config,
1014    ) -> Result<Vec<PullFileResult>, LspError> {
1015        let canonical_path = canonicalize_for_lsp(file_path)?;
1016        // Make sure servers are running and the document is open with fresh
1017        // content (handles disk-drift via DocumentStore::is_stale_on_disk).
1018        self.ensure_file_open(&canonical_path, config)?;
1019
1020        let server_keys = self.ensure_server_for_file(&canonical_path, config);
1021        if server_keys.is_empty() {
1022            return Ok(Vec::new());
1023        }
1024
1025        let uri = uri_for_path(&canonical_path)?;
1026        let mut results = Vec::with_capacity(server_keys.len());
1027
1028        for key in server_keys {
1029            let supports_pull = self
1030                .clients
1031                .get(&key)
1032                .and_then(|c| c.diagnostic_capabilities())
1033                .is_some_and(|caps| caps.pull_diagnostics);
1034
1035            if !supports_pull {
1036                results.push(PullFileResult {
1037                    server_key: key.clone(),
1038                    outcome: PullFileOutcome::PullNotSupported,
1039                });
1040                continue;
1041            }
1042
1043            // Look up previous resultId for incremental requests.
1044            let previous_result_id = self
1045                .diagnostics
1046                .entries_for_file(&canonical_path)
1047                .into_iter()
1048                .find(|(k, _)| **k == key)
1049                .and_then(|(_, entry)| entry.result_id.clone());
1050
1051            let identifier = self
1052                .clients
1053                .get(&key)
1054                .and_then(|c| c.diagnostic_capabilities())
1055                .and_then(|caps| caps.identifier.clone());
1056
1057            let params = AftDocumentDiagnosticParams {
1058                text_document: lsp_types::TextDocumentIdentifier { uri: uri.clone() },
1059                identifier,
1060                previous_result_id,
1061                work_done_progress_params: Default::default(),
1062                partial_result_params: Default::default(),
1063            };
1064
1065            let outcome = match self.send_pull_request(&key, params) {
1066                Ok(report) => self.ingest_document_report(&key, &canonical_path, report),
1067                Err(err) => {
1068                    if let Some(result) = self.cache_post_initialize_exit(&key, &err) {
1069                        PullFileOutcome::RequestFailed {
1070                            reason: server_attempt_result_reason(&result),
1071                        }
1072                    } else if recoverable_pull_rejection(&err)
1073                        && self.clients.get(&key).is_some_and(|client| {
1074                            matches!(
1075                                client.state(),
1076                                ServerState::Ready | ServerState::Initializing
1077                            )
1078                        })
1079                    {
1080                        PullFileOutcome::RequestFailed {
1081                            reason: format!("pull_rejected_push_fallback: {err}"),
1082                        }
1083                    } else {
1084                        PullFileOutcome::RequestFailed {
1085                            reason: err.to_string(),
1086                        }
1087                    }
1088                }
1089            };
1090
1091            results.push(PullFileResult {
1092                server_key: key,
1093                outcome,
1094            });
1095        }
1096
1097        Ok(results)
1098    }
1099
1100    /// Issue a `workspace/diagnostic` request to a specific server. Cancels
1101    /// internally if `timeout` elapses before the server responds. Cached
1102    /// entries from the response are stored so directory-mode queries pick
1103    /// them up.
1104    pub fn pull_workspace_diagnostics(
1105        &mut self,
1106        server_key: &ServerKey,
1107        timeout: Option<std::time::Duration>,
1108    ) -> Result<PullWorkspaceResult, LspError> {
1109        let timeout = timeout.unwrap_or(Self::PULL_WORKSPACE_TIMEOUT);
1110
1111        let supports_workspace = self
1112            .clients
1113            .get(server_key)
1114            .and_then(|c| c.diagnostic_capabilities())
1115            .is_some_and(|caps| caps.workspace_diagnostics);
1116
1117        if !supports_workspace {
1118            return Ok(PullWorkspaceResult {
1119                server_key: server_key.clone(),
1120                files_reported: Vec::new(),
1121                complete: false,
1122                cancelled: false,
1123                supports_workspace: false,
1124            });
1125        }
1126
1127        let identifier = self
1128            .clients
1129            .get(server_key)
1130            .and_then(|c| c.diagnostic_capabilities())
1131            .and_then(|caps| caps.identifier.clone());
1132
1133        let params = AftWorkspaceDiagnosticParams {
1134            identifier,
1135            previous_result_ids: Vec::new(),
1136            work_done_progress_params: Default::default(),
1137            partial_result_params: Default::default(),
1138        };
1139
1140        let result = match self
1141            .clients
1142            .get_mut(server_key)
1143            .ok_or_else(|| LspError::ServerNotReady("server not found".into()))?
1144            .send_request_with_timeout::<AftWorkspaceDiagnosticRequest>(params, timeout)
1145        {
1146            Ok(result) => result,
1147            Err(LspError::Timeout(_)) => {
1148                return Ok(PullWorkspaceResult {
1149                    server_key: server_key.clone(),
1150                    files_reported: Vec::new(),
1151                    complete: false,
1152                    cancelled: true,
1153                    supports_workspace: true,
1154                });
1155            }
1156            Err(err) => {
1157                if let Some(result) = self.cache_post_initialize_exit(server_key, &err) {
1158                    return Err(LspError::ServerNotReady(server_attempt_result_reason(
1159                        &result,
1160                    )));
1161                }
1162                return Err(err);
1163            }
1164        };
1165
1166        // Extract the items list. Partial responses are not a complete
1167        // workspace view, but the partial payload can still contain useful
1168        // document reports; ingest those while surfacing complete=false.
1169        let (items, complete) = match result {
1170            lsp_types::WorkspaceDiagnosticReportResult::Report(report) => (report.items, true),
1171            lsp_types::WorkspaceDiagnosticReportResult::Partial(partial) => (partial.items, false),
1172        };
1173
1174        // Ingest each file report into the diagnostics store.
1175        let mut files_reported = Vec::with_capacity(items.len());
1176        for item in items {
1177            match item {
1178                lsp_types::WorkspaceDocumentDiagnosticReport::Full(full) => {
1179                    if let Some(file) = uri_to_path(&full.uri) {
1180                        let stored = from_lsp_diagnostics(
1181                            file.clone(),
1182                            full.full_document_diagnostic_report.items.clone(),
1183                        );
1184                        self.diagnostics.publish_with_result_id(
1185                            server_key.clone(),
1186                            file.clone(),
1187                            stored,
1188                            full.full_document_diagnostic_report.result_id.clone(),
1189                        );
1190                        files_reported.push(file);
1191                    }
1192                }
1193                lsp_types::WorkspaceDocumentDiagnosticReport::Unchanged(_unchanged) => {
1194                    // "Unchanged" means the previously cached report is still
1195                    // valid. We left it in place; nothing to do.
1196                }
1197            }
1198        }
1199
1200        Ok(PullWorkspaceResult {
1201            server_key: server_key.clone(),
1202            files_reported,
1203            complete,
1204            cancelled: false,
1205            supports_workspace: true,
1206        })
1207    }
1208
1209    fn cache_post_initialize_exit(
1210        &mut self,
1211        key: &ServerKey,
1212        err: &LspError,
1213    ) -> Option<ServerAttemptResult> {
1214        let binary = self
1215            .server_binaries
1216            .get(key)
1217            .cloned()
1218            .unwrap_or_else(|| key.kind.id_str().to_string());
1219        let (status, stderr_tail) = {
1220            let client = self.clients.get_mut(key)?;
1221            let mut status = client.child_exit_status();
1222            for _ in 0..10 {
1223                if status.is_some() {
1224                    break;
1225                }
1226                std::thread::sleep(std::time::Duration::from_millis(10));
1227                status = client.child_exit_status();
1228            }
1229            let status = status?;
1230            wait_for_stderr_tail(client);
1231            (status, client.stderr_tail())
1232        };
1233        let reason = format_post_initialize_exit_reason(&binary, status, &stderr_tail, err);
1234        let result = ServerAttemptResult::SpawnFailed { binary, reason };
1235        self.clients.remove(key);
1236        self.server_binaries.remove(key);
1237        self.documents.remove(key);
1238        self.diagnostics.clear_for_server(key);
1239        self.failed_spawns.insert(key.clone(), result.clone());
1240        Some(result)
1241    }
1242
1243    /// Issue the per-file diagnostic request and return the report.
1244    fn send_pull_request(
1245        &mut self,
1246        key: &ServerKey,
1247        params: AftDocumentDiagnosticParams,
1248    ) -> Result<lsp_types::DocumentDiagnosticReportResult, LspError> {
1249        let client = self
1250            .clients
1251            .get_mut(key)
1252            .ok_or_else(|| LspError::ServerNotReady("server not found".into()))?;
1253        client.send_request::<AftDocumentDiagnosticRequest>(params)
1254    }
1255
1256    /// Store the result of a per-file pull request and return a structured
1257    /// outcome the caller can inspect.
1258    fn ingest_document_report(
1259        &mut self,
1260        key: &ServerKey,
1261        canonical_path: &Path,
1262        result: lsp_types::DocumentDiagnosticReportResult,
1263    ) -> PullFileOutcome {
1264        let report = match result {
1265            lsp_types::DocumentDiagnosticReportResult::Report(report) => report,
1266            lsp_types::DocumentDiagnosticReportResult::Partial(_) => {
1267                // Partial results stream in via $/progress notifications which
1268                // we don't currently subscribe to. Treat as a soft-empty
1269                // success — the next pull will get the full version.
1270                return PullFileOutcome::PartialNotSupported;
1271            }
1272        };
1273
1274        match report {
1275            lsp_types::DocumentDiagnosticReport::Full(full) => {
1276                let result_id = full.full_document_diagnostic_report.result_id.clone();
1277                let stored = from_lsp_diagnostics(
1278                    canonical_path.to_path_buf(),
1279                    full.full_document_diagnostic_report.items.clone(),
1280                );
1281                let count = stored.len();
1282                self.diagnostics.publish_with_result_id(
1283                    key.clone(),
1284                    canonical_path.to_path_buf(),
1285                    stored,
1286                    result_id,
1287                );
1288                PullFileOutcome::Full {
1289                    diagnostic_count: count,
1290                }
1291            }
1292            lsp_types::DocumentDiagnosticReport::Unchanged(_unchanged) => {
1293                // The server says cache is still valid. That is only usable if
1294                // we already have a report for this exact server/file; an
1295                // initial `unchanged` response cannot prove freshness.
1296                if self
1297                    .diagnostics
1298                    .has_report_for_server_file(key, canonical_path)
1299                {
1300                    PullFileOutcome::Unchanged
1301                } else {
1302                    PullFileOutcome::RequestFailed {
1303                        reason: "no_cache_for_unchanged".to_string(),
1304                    }
1305                }
1306            }
1307        }
1308    }
1309
1310    /// Shutdown all servers gracefully.
1311    pub fn shutdown_all(&mut self) {
1312        for (key, mut client) in self.clients.drain() {
1313            if let Err(err) = client.shutdown() {
1314                slog_error!("error shutting down {:?}: {}", key, err);
1315            }
1316        }
1317        self.server_binaries.clear();
1318        self.documents.clear();
1319        self.diagnostics = DiagnosticsStore::new();
1320    }
1321
1322    /// Check if any server is active.
1323    pub fn has_active_servers(&self) -> bool {
1324        self.clients
1325            .values()
1326            .any(|client| client.state() == ServerState::Ready)
1327    }
1328
1329    /// Active server keys (running clients). Used by `lsp_diagnostics`
1330    /// directory mode to know which servers to ask for workspace pull.
1331    pub fn active_server_keys(&self) -> Vec<ServerKey> {
1332        self.clients.keys().cloned().collect()
1333    }
1334
1335    pub fn get_diagnostics_for_file(&self, file: &Path) -> Vec<&StoredDiagnostic> {
1336        let normalized = normalize_lookup_path(file);
1337        self.diagnostics.for_file(&normalized)
1338    }
1339
1340    pub fn get_diagnostics_for_directory(&self, dir: &Path) -> Vec<&StoredDiagnostic> {
1341        let normalized = normalize_lookup_path(dir);
1342        self.diagnostics.for_directory(&normalized)
1343    }
1344
1345    pub fn get_all_diagnostics(&self) -> Vec<&StoredDiagnostic> {
1346        self.diagnostics.all()
1347    }
1348
1349    fn drain_events_for_file(&mut self, file_path: &Path) -> bool {
1350        let mut saw_file_diagnostics = false;
1351        while let Ok(event) = self.event_rx.try_recv() {
1352            if matches!(
1353                self.handle_event(&event),
1354                Some(ref published_file) if published_file.as_path() == file_path
1355            ) {
1356                saw_file_diagnostics = true;
1357            }
1358        }
1359        saw_file_diagnostics
1360    }
1361
1362    fn handle_event(&mut self, event: &LspEvent) -> Option<PathBuf> {
1363        match event {
1364            LspEvent::Notification {
1365                server_kind,
1366                root,
1367                method,
1368                params: Some(params),
1369            } if method == "textDocument/publishDiagnostics" => {
1370                self.handle_publish_diagnostics(server_kind.clone(), root.clone(), params)
1371            }
1372            LspEvent::ServerExited { server_kind, root } => {
1373                let key = ServerKey {
1374                    kind: server_kind.clone(),
1375                    root: root.clone(),
1376                };
1377                self.clients.remove(&key);
1378                self.server_binaries.remove(&key);
1379                self.documents.remove(&key);
1380                self.diagnostics.clear_for_server(&key);
1381                None
1382            }
1383            _ => None,
1384        }
1385    }
1386
1387    fn handle_publish_diagnostics(
1388        &mut self,
1389        server: ServerKind,
1390        root: PathBuf,
1391        params: &serde_json::Value,
1392    ) -> Option<PathBuf> {
1393        if let Ok(publish_params) =
1394            serde_json::from_value::<lsp_types::PublishDiagnosticsParams>(params.clone())
1395        {
1396            let file = uri_to_path(&publish_params.uri)?;
1397            let stored = from_lsp_diagnostics(file.clone(), publish_params.diagnostics);
1398            // v0.17.3: store with real ServerKey { kind, root } and capture
1399            // the document `version` (when the server provided one) so the
1400            // post-edit waiter can reject stale publishes deterministically
1401            // via version-match (preferred) or epoch-delta (fallback). The
1402            // earlier `publish_with_kind` path silently dropped both.
1403            let key = ServerKey { kind: server, root };
1404            self.diagnostics
1405                .publish_full(key, file.clone(), stored, None, publish_params.version);
1406            return Some(file);
1407        }
1408        None
1409    }
1410
1411    fn spawn_server(
1412        &self,
1413        def: &ServerDef,
1414        root: &Path,
1415        config: &Config,
1416    ) -> Result<LspClient, LspError> {
1417        let binary = self.resolve_binary(def, config)?;
1418
1419        // Merge the server-defined env with our test-injected env.
1420        // `extra_env` is empty in production; tests use it to drive fake
1421        // server variants (AFT_FAKE_LSP_PULL=1, etc.).
1422        let mut merged_env = def.env.clone();
1423        for (key, value) in &self.extra_env {
1424            merged_env.insert(key.clone(), value.clone());
1425        }
1426
1427        let mut client = LspClient::spawn(
1428            def.kind.clone(),
1429            root.to_path_buf(),
1430            &binary,
1431            &def.args,
1432            &merged_env,
1433            self.event_tx.clone(),
1434            self.child_registry.clone(),
1435        )?;
1436        if let Err(err) = client.initialize(root, def.initialization_options.clone()) {
1437            wait_for_stderr_tail(&mut client);
1438            let stderr_tail = client.stderr_tail();
1439            let reason = if client.child_exited() || !stderr_tail.is_empty() {
1440                format_initialize_failure_reason(&def.binary, &stderr_tail, &err)
1441            } else {
1442                format!("server failed during initialize: {err}")
1443            };
1444            return Err(LspError::ServerNotReady(reason));
1445        }
1446        Ok(client)
1447    }
1448
1449    fn resolve_binary(&self, def: &ServerDef, config: &Config) -> Result<PathBuf, LspError> {
1450        if let Some(path) = self.binary_overrides.get(&def.kind) {
1451            if path.exists() {
1452                return Ok(path.clone());
1453            }
1454            return Err(LspError::NotFound(format!(
1455                "override binary for {:?} not found: {}",
1456                def.kind,
1457                path.display()
1458            )));
1459        }
1460
1461        if let Some(path) = env_binary_override(&def.kind) {
1462            if path.exists() {
1463                return Ok(path);
1464            }
1465            return Err(LspError::NotFound(format!(
1466                "environment override binary for {:?} not found: {}",
1467                def.kind,
1468                path.display()
1469            )));
1470        }
1471
1472        // Layered resolution:
1473        //   1. <project_root>/node_modules/.bin/<binary>
1474        //   2. config.lsp_paths_extra (plugin auto-install cache, etc.)
1475        //   3. PATH via `which`
1476        resolve_lsp_binary(
1477            &def.binary,
1478            config.project_root.as_deref(),
1479            &config.lsp_paths_extra,
1480        )
1481        .ok_or_else(|| {
1482            LspError::NotFound(format!(
1483                "language server binary '{}' not found in node_modules/.bin, lsp_paths_extra, or PATH",
1484                def.binary
1485            ))
1486        })
1487    }
1488
1489    fn server_key_for_file(&self, file_path: &Path, config: &Config) -> Option<ServerKey> {
1490        for def in servers_for_file(file_path, config) {
1491            let root = find_workspace_root(file_path, &def.root_markers)?;
1492            let key = ServerKey {
1493                kind: def.kind.clone(),
1494                root,
1495            };
1496            if self.clients.contains_key(&key) {
1497                return Some(key);
1498            }
1499        }
1500        None
1501    }
1502}
1503
1504impl Default for LspManager {
1505    fn default() -> Self {
1506        Self::new()
1507    }
1508}
1509
1510fn wait_for_stderr_tail(client: &mut LspClient) {
1511    for _ in 0..10 {
1512        if !client.stderr_tail().is_empty() {
1513            break;
1514        }
1515        std::thread::sleep(std::time::Duration::from_millis(10));
1516    }
1517}
1518
1519fn recoverable_pull_rejection(err: &LspError) -> bool {
1520    matches!(
1521        err,
1522        LspError::ServerError {
1523            code: -32601 | -32602,
1524            ..
1525        }
1526    )
1527}
1528
1529fn server_attempt_result_reason(result: &ServerAttemptResult) -> String {
1530    match result {
1531        ServerAttemptResult::SpawnFailed { binary, reason } => {
1532            format!("spawn_failed: {binary} ({reason})")
1533        }
1534        ServerAttemptResult::BinaryNotInstalled { binary } => {
1535            format!("binary_not_installed: {binary}")
1536        }
1537        ServerAttemptResult::NoRootMarker { looked_for } => {
1538            format!("no_root_marker (looked for: {})", looked_for.join(", "))
1539        }
1540        ServerAttemptResult::Ok { .. } => "ok".to_string(),
1541    }
1542}
1543
1544fn format_stderr_tail_for_reason(stderr_tail: &str) -> String {
1545    truncate_stderr_tail_for_reason(stderr_tail)
1546        .lines()
1547        .map(|line| format!("  {line}"))
1548        .collect::<Vec<_>>()
1549        .join("\n")
1550}
1551
1552fn truncate_stderr_tail_for_reason(stderr_tail: &str) -> String {
1553    if stderr_tail.len() <= STDERR_REASON_BYTES {
1554        return stderr_tail.to_string();
1555    }
1556
1557    let ellipsis = "...";
1558    let target_len = STDERR_REASON_BYTES.saturating_sub(ellipsis.len());
1559    let mut start = stderr_tail.len() - target_len;
1560    while start < stderr_tail.len() && !stderr_tail.is_char_boundary(start) {
1561        start += 1;
1562    }
1563    format!("{ellipsis}{}", &stderr_tail[start..])
1564}
1565
1566fn format_initialize_failure_reason(binary: &str, stderr_tail: &str, err: &LspError) -> String {
1567    let mut reason = format!("server crashed during initialize: {err}");
1568    if !stderr_tail.is_empty() {
1569        reason.push_str("; stderr (last 64 lines):\n");
1570        reason.push_str(&format_stderr_tail_for_reason(stderr_tail));
1571        reason.push_str("\n\n");
1572        reason.push_str(&failure_hint(binary, stderr_tail));
1573    }
1574    reason
1575}
1576
1577fn format_post_initialize_exit_reason(
1578    binary: &str,
1579    status: std::process::ExitStatus,
1580    stderr_tail: &str,
1581    err: &LspError,
1582) -> String {
1583    let code = status
1584        .code()
1585        .map(|c| c.to_string())
1586        .unwrap_or_else(|| "signal/unknown".to_string());
1587    let mut reason = format!("server exited after initialize (code {code}): {err}");
1588    if !stderr_tail.is_empty() {
1589        reason.push_str("; stderr (last 64 lines):\n");
1590        reason.push_str(&format_stderr_tail_for_reason(stderr_tail));
1591        reason.push_str("\n\n");
1592        reason.push_str(&failure_hint(binary, stderr_tail));
1593    }
1594    reason
1595}
1596
/// Builds the remediation hint shown after a language server's stderr excerpt.
///
/// When `stderr_tail` contains `MODULE_NOT_FOUND` or `Cannot find module`, the
/// hint suggests reinstalling `binary` globally with the package manager
/// inferred from the stderr text. Otherwise it points the reader at the
/// stderr output.
fn failure_hint(binary: &str, stderr_tail: &str) -> String {
    let missing_module =
        stderr_tail.contains("MODULE_NOT_FOUND") || stderr_tail.contains("Cannot find module");
    if !missing_module {
        return format!("Hint: see stderr above for '{binary}' failure details.");
    }

    // Yarn has no `install -g`: global packages are managed through
    // `yarn global add`, so it needs its own command shape.
    let reinstall = match infer_package_manager(stderr_tail) {
        "yarn" => format!("yarn global add {binary} --force"),
        package_manager => format!("{package_manager} install -g {binary} --force"),
    };
    format!(
        "Your package-manager shim resolves to a missing file. Try reinstalling: {reinstall}. Common cause: hard-link breakage from fs migration or store prune."
    )
}

/// Guesses which Node package manager installed the failing binary.
///
/// The match is a case-insensitive substring search over `stderr_tail`:
/// pnpm path markers win first, then anything mentioning `yarn`, and `npm`
/// is the fallback.
fn infer_package_manager(stderr_tail: &str) -> &'static str {
    let lower = stderr_tail.to_ascii_lowercase();
    if lower.contains(".pnpm/") || lower.contains(".pnpm\\") || lower.contains("/pnpm/") {
        "pnpm"
    } else if lower.contains(".yarn/")
        || lower.contains(".yarn\\")
        || lower.contains("/yarn/")
        // NOTE: the bare "yarn" check subsumes the three path-style checks above.
        || lower.contains("yarn")
    {
        "yarn"
    } else {
        "npm"
    }
}
1622
1623fn canonicalize_for_lsp(file_path: &Path) -> Result<PathBuf, LspError> {
1624    std::fs::canonicalize(file_path).map_err(LspError::from)
1625}
1626
/// Resolves `file_path` to a canonical form even when the file is missing.
///
/// If the whole path canonicalizes, that result is returned. Otherwise the
/// function walks up to the nearest ancestor that exists, canonicalizes that
/// ancestor (falling back to it unchanged if canonicalization fails), and
/// re-appends the missing trailing components in their original order.
fn resolve_for_lsp_uri(file_path: &Path) -> PathBuf {
    if let Ok(canonical) = std::fs::canonicalize(file_path) {
        return canonical;
    }

    // Peel components off the end until something on disk is reached,
    // remembering the peeled names (innermost first).
    let mut anchor = file_path.to_path_buf();
    let mut trailing = Vec::new();
    loop {
        if anchor.exists() {
            break;
        }
        let Some(name) = anchor.file_name() else {
            break;
        };
        trailing.push(name.to_owned());
        match anchor.parent().map(Path::to_path_buf) {
            Some(parent) => anchor = parent,
            None => break,
        }
    }

    let mut resolved = std::fs::canonicalize(&anchor).unwrap_or(anchor);
    resolved.extend(trailing.into_iter().rev());
    resolved
}
1651
/// Maps a file extension (without the dot) to a language identifier string.
///
/// The comparison is case-sensitive. Extensions not in the table map to
/// `"plaintext"`.
fn language_id_for_extension(ext: &str) -> &'static str {
    const LANGUAGE_IDS: &[(&str, &str)] = &[
        ("ts", "typescript"),
        ("tsx", "typescriptreact"),
        ("js", "javascript"),
        ("mjs", "javascript"),
        ("cjs", "javascript"),
        ("jsx", "javascriptreact"),
        ("py", "python"),
        ("pyi", "python"),
        ("rs", "rust"),
        ("go", "go"),
        ("html", "html"),
        ("htm", "html"),
    ];

    LANGUAGE_IDS
        .iter()
        .find(|&&(known, _)| known == ext)
        .map_or("plaintext", |&(_, language_id)| language_id)
}
1665
/// Returns the canonical form of `path`, or `path` itself (as an owned
/// `PathBuf`) when canonicalization fails.
fn normalize_lookup_path(path: &Path) -> PathBuf {
    match std::fs::canonicalize(path) {
        Ok(canonical) => canonical,
        Err(_) => path.to_path_buf(),
    }
}
1669
1670/// Classify an error returned by `spawn_server` into a structured
1671/// `ServerAttemptResult`. The two interesting cases for callers are:
1672/// - `BinaryNotInstalled` — the server's binary couldn't be resolved on PATH
1673///   or via override. The agent can be told "install bash-language-server".
1674/// - `SpawnFailed` — binary was found but spawning/initializing failed
1675///   (permissions, missing runtime, server crashed during initialize, etc.).
1676fn classify_spawn_error(binary: &str, err: &LspError) -> ServerAttemptResult {
1677    match err {
1678        // resolve_binary returns NotFound for both missing override paths and
1679        // missing PATH binaries. The "override missing" case is rare in
1680        // practice (only set in tests / env vars); we report all NotFound as
1681        // BinaryNotInstalled so the user sees an actionable install hint.
1682        LspError::NotFound(_) => ServerAttemptResult::BinaryNotInstalled {
1683            binary: binary.to_string(),
1684        },
1685        other => ServerAttemptResult::SpawnFailed {
1686            binary: binary.to_string(),
1687            reason: other.to_string(),
1688        },
1689    }
1690}
1691
1692fn env_binary_override(kind: &ServerKind) -> Option<PathBuf> {
1693    let id = kind.id_str();
1694    let suffix: String = id
1695        .chars()
1696        .map(|ch| {
1697            if ch.is_ascii_alphanumeric() {
1698                ch.to_ascii_uppercase()
1699            } else {
1700                '_'
1701            }
1702        })
1703        .collect();
1704    let key = format!("AFT_LSP_{suffix}_BINARY");
1705    std::env::var_os(key).map(PathBuf::from)
1706}