Skip to main content

aft/lsp/
manager.rs

1use std::collections::HashMap;
2use std::path::{Path, PathBuf};
3use std::str::FromStr;
4
5use crossbeam_channel::{unbounded, Receiver, RecvTimeoutError, Sender};
6use lsp_types::notification::{DidChangeTextDocument, DidCloseTextDocument, DidOpenTextDocument};
7use lsp_types::{
8    DidChangeTextDocumentParams, DidCloseTextDocumentParams, DidOpenTextDocumentParams,
9    TextDocumentContentChangeEvent, TextDocumentIdentifier, TextDocumentItem,
10    VersionedTextDocumentIdentifier,
11};
12
13use crate::config::Config;
14use crate::lsp::client::{LspClient, LspEvent, ServerState};
15use crate::lsp::diagnostics::{from_lsp_diagnostics, DiagnosticsStore, StoredDiagnostic};
16use crate::lsp::document::DocumentStore;
17use crate::lsp::registry::{resolve_lsp_binary, servers_for_file, ServerDef, ServerKind};
18use crate::lsp::roots::{find_workspace_root, ServerKey};
19use crate::lsp::LspError;
20
21/// Outcome of attempting to ensure a server is running for a single matching
22/// `ServerDef`. Returned per matching server so the caller can report exactly
23/// what happened to the user instead of collapsing all failures into "no
24/// server".
25#[derive(Debug, Clone)]
26pub enum ServerAttemptResult {
27    /// Server is running and ready to serve requests for this file.
28    Ok { server_key: ServerKey },
29    /// No workspace root was found by walking up from the file looking for
30    /// any of the server's configured root markers.
31    NoRootMarker { looked_for: Vec<String> },
32    /// The server's binary could not be found on PATH (or override was
33    /// missing/invalid).
34    BinaryNotInstalled { binary: String },
35    /// Binary was found but spawning or initializing the server failed.
36    SpawnFailed { binary: String, reason: String },
37}
38
39/// One server's attempt to handle a file.
40#[derive(Debug, Clone)]
41pub struct ServerAttempt {
42    /// Stable server identifier (kind ID, e.g. "pyright", "rust-analyzer").
43    pub server_id: String,
44    /// Server display name from the registry.
45    pub server_name: String,
46    pub result: ServerAttemptResult,
47}
48
49/// Aggregate outcome of `ensure_server_for_file_detailed`. Distinguishes:
50/// - "No server registered for this file's extension" (`attempts.is_empty()`)
51/// - "Servers registered but none could start" (`successful.is_empty()` but
52///   `!attempts.is_empty()`)
53/// - "At least one server is ready" (`!successful.is_empty()`)
54#[derive(Debug, Clone, Default)]
55pub struct EnsureServerOutcomes {
56    /// Server keys that are now running and ready to serve requests.
57    pub successful: Vec<ServerKey>,
58    /// Per-server attempt records. Empty if no server is registered for the
59    /// file's extension.
60    pub attempts: Vec<ServerAttempt>,
61}
62
63impl EnsureServerOutcomes {
64    /// True if no server in the registry matched this file's extension.
65    pub fn no_server_registered(&self) -> bool {
66        self.attempts.is_empty()
67    }
68}
69
70/// Outcome of a post-edit diagnostics wait. Reports the per-server status
71/// alongside the fresh diagnostics, so the response layer can build an
72/// honest tri-state payload (`success: true` + `complete: bool` + named
73/// gap fields per `crates/aft/src/protocol.rs`).
74///
75/// `diagnostics` only contains entries from servers that proved freshness
76/// (version-match preferred, epoch-fallback for unversioned servers).
77/// Pre-edit cached entries are NEVER included — that's the whole point of
78/// this type.
79#[derive(Debug, Clone, Default)]
80pub struct PostEditWaitOutcome {
81    /// Diagnostics from servers whose response we verified is FOR the
82    /// post-edit document version (or whose epoch we saw advance after our
83    /// pre-edit snapshot, for unversioned servers).
84    pub diagnostics: Vec<StoredDiagnostic>,
85    /// Servers we expected to publish but didn't before the deadline.
86    /// Reported to the agent via `pending_lsp_servers` so they understand
87    /// the result is partial.
88    pub pending_servers: Vec<ServerKey>,
89    /// Servers whose process exited between notification and deadline.
90    /// Reported separately so the agent knows the gap is unrecoverable
91    /// without a server restart, not "wait longer."
92    pub exited_servers: Vec<ServerKey>,
93}
94
95impl PostEditWaitOutcome {
96    /// True if every expected server reported a fresh result. False means
97    /// the agent should treat the diagnostics as a partial picture.
98    pub fn complete(&self) -> bool {
99        self.pending_servers.is_empty() && self.exited_servers.is_empty()
100    }
101}
102
/// Per-server outcome of a `textDocument/diagnostic` (per-file pull) request.
///
/// Every payload is a plain `usize` or `String`, so the enum derives
/// `PartialEq`/`Eq` and outcomes can be compared directly (for example with
/// `assert_eq!`) in addition to being pattern-matched.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PullFileOutcome {
    /// Server returned a full report; diagnostics stored.
    Full { diagnostic_count: usize },
    /// Server returned `kind: "unchanged"` — cached diagnostics still valid.
    Unchanged,
    /// Server returned a partial-result token; we don't subscribe to streamed
    /// progress so the response is treated as a soft empty until the next pull.
    PartialNotSupported,
    /// Server doesn't advertise pull capability — caller should fall back to
    /// push diagnostics for this server.
    PullNotSupported,
    /// The pull request failed (timeout, server error, etc.).
    RequestFailed { reason: String },
}
119
120/// Result of `pull_file_diagnostics` for one matching server.
121#[derive(Debug, Clone)]
122pub struct PullFileResult {
123    pub server_key: ServerKey,
124    pub outcome: PullFileOutcome,
125}
126
127/// Result of `pull_workspace_diagnostics` for a single server.
128#[derive(Debug, Clone)]
129pub struct PullWorkspaceResult {
130    pub server_key: ServerKey,
131    /// Files for which a Full report was received and cached. Files that came
132    /// back as `Unchanged` are NOT listed here because their cached entry was
133    /// already authoritative.
134    pub files_reported: Vec<PathBuf>,
135    /// True if the server returned a full response within the timeout.
136    pub complete: bool,
137    /// True if we cancelled (request timed out before the server responded).
138    pub cancelled: bool,
139    /// True if the server advertised workspace pull support. When false, the
140    /// other fields are empty and the caller should fall back to file-mode
141    /// pull or to push semantics.
142    pub supports_workspace: bool,
143}
144
/// Owns the running LSP clients together with the per-server document
/// tracking, the stored diagnostics, and the shared event channel that the
/// manager drains.
pub struct LspManager {
    /// Active server instances, keyed by (ServerKind, workspace_root).
    clients: HashMap<ServerKey, LspClient>,
    /// Tracks opened documents and versions per active server.
    documents: HashMap<ServerKey, DocumentStore>,
    /// Stored publishDiagnostics payloads across all servers.
    diagnostics: DiagnosticsStore,
    /// Sending half of the unified event channel — all server reader threads
    /// send here.
    event_tx: Sender<LspEvent>,
    /// Receiving half of the unified event channel; the manager reads events
    /// from it when draining or waiting for diagnostics.
    event_rx: Receiver<LspEvent>,
    /// Optional binary path overrides used by integration tests.
    binary_overrides: HashMap<ServerKind, PathBuf>,
    /// Extra env vars merged into every spawned LSP child. Used in tests to
    /// drive the fake server's behavioral variants (`AFT_FAKE_LSP_PULL=1`,
    /// `AFT_FAKE_LSP_WORKSPACE=1`, etc.). Production code does not set this.
    extra_env: HashMap<String, String>,
}
162
163impl LspManager {
164    pub fn new() -> Self {
165        let (event_tx, event_rx) = unbounded();
166        Self {
167            clients: HashMap::new(),
168            documents: HashMap::new(),
169            diagnostics: DiagnosticsStore::new(),
170            event_tx,
171            event_rx,
172            binary_overrides: HashMap::new(),
173            extra_env: HashMap::new(),
174        }
175    }
176
177    /// For testing: set an extra environment variable that gets passed to
178    /// every spawned LSP child process. Useful for driving fake-server
179    /// behavioral variants in integration tests.
180    pub fn set_extra_env(&mut self, key: &str, value: &str) {
181        self.extra_env.insert(key.to_string(), value.to_string());
182    }
183
184    /// Count active LSP server instances.
185    pub fn server_count(&self) -> usize {
186        self.clients.len()
187    }
188
189    /// For testing: override the binary for a server kind.
190    pub fn override_binary(&mut self, kind: ServerKind, binary_path: PathBuf) {
191        self.binary_overrides.insert(kind, binary_path);
192    }
193
194    /// Ensure a server is running for the given file. Spawns if needed.
195    /// Returns the active server keys for the file, or an empty vec if none match.
196    ///
197    /// This is the lightweight wrapper around [`ensure_server_for_file_detailed`]
198    /// that drops failure context. Prefer the detailed variant in command
199    /// handlers that need to surface honest error messages to the agent.
200    pub fn ensure_server_for_file(&mut self, file_path: &Path, config: &Config) -> Vec<ServerKey> {
201        self.ensure_server_for_file_detailed(file_path, config)
202            .successful
203    }
204
205    /// Detailed version of [`ensure_server_for_file`] that records every
206    /// matching server's outcome (`Ok` / `NoRootMarker` / `BinaryNotInstalled`
207    /// / `SpawnFailed`).
208    ///
209    /// Use this when the caller wants to honestly report _why_ a file has no
210    /// active server (e.g., to surface "bash-language-server not on PATH" to
211    /// the agent instead of silently returning `total: 0`).
212    pub fn ensure_server_for_file_detailed(
213        &mut self,
214        file_path: &Path,
215        config: &Config,
216    ) -> EnsureServerOutcomes {
217        let defs = servers_for_file(file_path, config);
218        let mut outcomes = EnsureServerOutcomes::default();
219
220        for def in defs {
221            let server_id = def.kind.id_str().to_string();
222            let server_name = def.name.to_string();
223
224            let Some(root) = find_workspace_root(file_path, &def.root_markers) else {
225                outcomes.attempts.push(ServerAttempt {
226                    server_id,
227                    server_name,
228                    result: ServerAttemptResult::NoRootMarker {
229                        looked_for: def.root_markers.iter().map(|s| s.to_string()).collect(),
230                    },
231                });
232                continue;
233            };
234
235            let key = ServerKey {
236                kind: def.kind.clone(),
237                root,
238            };
239
240            if !self.clients.contains_key(&key) {
241                match self.spawn_server(&def, &key.root, config) {
242                    Ok(client) => {
243                        self.clients.insert(key.clone(), client);
244                        self.documents.entry(key.clone()).or_default();
245                    }
246                    Err(err) => {
247                        log::error!("failed to spawn {}: {}", def.name, err);
248                        let result = classify_spawn_error(&def.binary, &err);
249                        outcomes.attempts.push(ServerAttempt {
250                            server_id,
251                            server_name,
252                            result,
253                        });
254                        continue;
255                    }
256                }
257            }
258
259            outcomes.attempts.push(ServerAttempt {
260                server_id,
261                server_name,
262                result: ServerAttemptResult::Ok {
263                    server_key: key.clone(),
264                },
265            });
266            outcomes.successful.push(key);
267        }
268
269        outcomes
270    }
271
272    /// Ensure a server is running using the default LSP registry.
273    /// Kept for integration tests that exercise built-in server helpers directly.
274    pub fn ensure_server_for_file_default(&mut self, file_path: &Path) -> Vec<ServerKey> {
275        self.ensure_server_for_file(file_path, &Config::default())
276    }
277    /// Ensure that servers are running for the file and that the document is open
278    /// in each server's DocumentStore. Reads file content from disk if not already open.
279    /// Returns the server keys for the file.
280    pub fn ensure_file_open(
281        &mut self,
282        file_path: &Path,
283        config: &Config,
284    ) -> Result<Vec<ServerKey>, LspError> {
285        let canonical_path = canonicalize_for_lsp(file_path)?;
286        let server_keys = self.ensure_server_for_file(&canonical_path, config);
287        if server_keys.is_empty() {
288            return Ok(server_keys);
289        }
290
291        let uri = uri_for_path(&canonical_path)?;
292        let language_id = language_id_for_extension(
293            canonical_path
294                .extension()
295                .and_then(|ext| ext.to_str())
296                .unwrap_or_default(),
297        )
298        .to_string();
299
300        for key in &server_keys {
301            let already_open = self
302                .documents
303                .get(key)
304                .is_some_and(|store| store.is_open(&canonical_path));
305
306            if !already_open {
307                let content = std::fs::read_to_string(&canonical_path).map_err(LspError::Io)?;
308                if let Some(client) = self.clients.get_mut(key) {
309                    client.send_notification::<DidOpenTextDocument>(DidOpenTextDocumentParams {
310                        text_document: TextDocumentItem::new(
311                            uri.clone(),
312                            language_id.clone(),
313                            0,
314                            content,
315                        ),
316                    })?;
317                }
318                self.documents
319                    .entry(key.clone())
320                    .or_default()
321                    .open(canonical_path.clone());
322                continue;
323            }
324
325            // Document is already open. Check disk drift — if the file has
326            // been modified outside the AFT pipeline (other tool, manual
327            // edit, sibling session) we MUST send a didChange before any
328            // pull-diagnostic / hover query, otherwise the LSP server
329            // returns results computed from stale in-memory content.
330            //
331            // This is the regression fix Oracle flagged in finding #6:
332            // "ensure_file_open skips already-open files without checking
333            // if disk content changed."
334            let drifted = self
335                .documents
336                .get(key)
337                .is_some_and(|store| store.is_stale_on_disk(&canonical_path));
338            if drifted {
339                let content = std::fs::read_to_string(&canonical_path).map_err(LspError::Io)?;
340                let next_version = self
341                    .documents
342                    .get(key)
343                    .and_then(|store| store.version(&canonical_path))
344                    .map(|v| v + 1)
345                    .unwrap_or(1);
346                if let Some(client) = self.clients.get_mut(key) {
347                    client.send_notification::<DidChangeTextDocument>(
348                        DidChangeTextDocumentParams {
349                            text_document: VersionedTextDocumentIdentifier::new(
350                                uri.clone(),
351                                next_version,
352                            ),
353                            content_changes: vec![TextDocumentContentChangeEvent {
354                                range: None,
355                                range_length: None,
356                                text: content,
357                            }],
358                        },
359                    )?;
360                }
361                if let Some(store) = self.documents.get_mut(key) {
362                    store.bump_version(&canonical_path);
363                }
364            }
365        }
366
367        Ok(server_keys)
368    }
369
370    pub fn ensure_file_open_default(
371        &mut self,
372        file_path: &Path,
373    ) -> Result<Vec<ServerKey>, LspError> {
374        self.ensure_file_open(file_path, &Config::default())
375    }
376
377    /// Notify relevant LSP servers that a file has been written/changed.
378    /// This is the main hook called after every file write in AFT.
379    ///
380    /// If the file's server isn't running yet, starts it (lazy spawn).
381    /// If the file isn't open in LSP yet, sends didOpen. Otherwise sends didChange.
382    pub fn notify_file_changed(
383        &mut self,
384        file_path: &Path,
385        content: &str,
386        config: &Config,
387    ) -> Result<(), LspError> {
388        self.notify_file_changed_versioned(file_path, content, config)
389            .map(|_| ())
390    }
391
392    /// Like `notify_file_changed`, but returns the target document version
393    /// per server so the post-edit waiter can match `publishDiagnostics`
394    /// against the exact version that this notification carried.
395    ///
396    /// Returns: `Vec<(ServerKey, target_version)>`. `target_version` is the
397    /// `version` field on the `VersionedTextDocumentIdentifier` we just sent
398    /// (post-bump). For freshly-opened documents (`didOpen`) the version is
399    /// `0`. Servers that don't honor versioned text document sync will not
400    /// echo this back on `publishDiagnostics`; the caller is expected to
401    /// fall back to the epoch-delta path for those.
402    pub fn notify_file_changed_versioned(
403        &mut self,
404        file_path: &Path,
405        content: &str,
406        config: &Config,
407    ) -> Result<Vec<(ServerKey, i32)>, LspError> {
408        let canonical_path = canonicalize_for_lsp(file_path)?;
409        let server_keys = self.ensure_server_for_file(&canonical_path, config);
410        if server_keys.is_empty() {
411            return Ok(Vec::new());
412        }
413
414        let uri = uri_for_path(&canonical_path)?;
415        let language_id = language_id_for_extension(
416            canonical_path
417                .extension()
418                .and_then(|ext| ext.to_str())
419                .unwrap_or_default(),
420        )
421        .to_string();
422
423        let mut versions: Vec<(ServerKey, i32)> = Vec::with_capacity(server_keys.len());
424
425        for key in server_keys {
426            let current_version = self
427                .documents
428                .get(&key)
429                .and_then(|store| store.version(&canonical_path));
430
431            if let Some(version) = current_version {
432                let next_version = version + 1;
433                if let Some(client) = self.clients.get_mut(&key) {
434                    client.send_notification::<DidChangeTextDocument>(
435                        DidChangeTextDocumentParams {
436                            text_document: VersionedTextDocumentIdentifier::new(
437                                uri.clone(),
438                                next_version,
439                            ),
440                            content_changes: vec![TextDocumentContentChangeEvent {
441                                range: None,
442                                range_length: None,
443                                text: content.to_string(),
444                            }],
445                        },
446                    )?;
447                }
448                if let Some(store) = self.documents.get_mut(&key) {
449                    store.bump_version(&canonical_path);
450                }
451                versions.push((key, next_version));
452                continue;
453            }
454
455            if let Some(client) = self.clients.get_mut(&key) {
456                client.send_notification::<DidOpenTextDocument>(DidOpenTextDocumentParams {
457                    text_document: TextDocumentItem::new(
458                        uri.clone(),
459                        language_id.clone(),
460                        0,
461                        content.to_string(),
462                    ),
463                })?;
464            }
465            self.documents
466                .entry(key.clone())
467                .or_default()
468                .open(canonical_path.clone());
469            // didOpen carries version 0 — that's the version the server
470            // will echo on its first publishDiagnostics for this document.
471            versions.push((key, 0));
472        }
473
474        Ok(versions)
475    }
476
477    pub fn notify_file_changed_default(
478        &mut self,
479        file_path: &Path,
480        content: &str,
481    ) -> Result<(), LspError> {
482        self.notify_file_changed(file_path, content, &Config::default())
483    }
484
485    /// Close a document in all servers that have it open.
486    pub fn notify_file_closed(&mut self, file_path: &Path) -> Result<(), LspError> {
487        let canonical_path = canonicalize_for_lsp(file_path)?;
488        let uri = uri_for_path(&canonical_path)?;
489        let keys: Vec<ServerKey> = self.documents.keys().cloned().collect();
490
491        for key in keys {
492            let was_open = self
493                .documents
494                .get(&key)
495                .map(|store| store.is_open(&canonical_path))
496                .unwrap_or(false);
497            if !was_open {
498                continue;
499            }
500
501            if let Some(client) = self.clients.get_mut(&key) {
502                client.send_notification::<DidCloseTextDocument>(DidCloseTextDocumentParams {
503                    text_document: TextDocumentIdentifier::new(uri.clone()),
504                })?;
505            }
506
507            if let Some(store) = self.documents.get_mut(&key) {
508                store.close(&canonical_path);
509            }
510        }
511
512        Ok(())
513    }
514
515    /// Get an active client for a file path, if one exists.
516    pub fn client_for_file(&self, file_path: &Path, config: &Config) -> Option<&LspClient> {
517        let key = self.server_key_for_file(file_path, config)?;
518        self.clients.get(&key)
519    }
520
521    pub fn client_for_file_default(&self, file_path: &Path) -> Option<&LspClient> {
522        self.client_for_file(file_path, &Config::default())
523    }
524
525    /// Get a mutable active client for a file path, if one exists.
526    pub fn client_for_file_mut(
527        &mut self,
528        file_path: &Path,
529        config: &Config,
530    ) -> Option<&mut LspClient> {
531        let key = self.server_key_for_file(file_path, config)?;
532        self.clients.get_mut(&key)
533    }
534
535    pub fn client_for_file_mut_default(&mut self, file_path: &Path) -> Option<&mut LspClient> {
536        self.client_for_file_mut(file_path, &Config::default())
537    }
538
539    /// Number of tracked server clients.
540    pub fn active_client_count(&self) -> usize {
541        self.clients.len()
542    }
543
544    /// Drain all pending LSP events. Call from the main loop.
545    pub fn drain_events(&mut self) -> Vec<LspEvent> {
546        let mut events = Vec::new();
547        while let Ok(event) = self.event_rx.try_recv() {
548            self.handle_event(&event);
549            events.push(event);
550        }
551        events
552    }
553
554    /// Wait for diagnostics to arrive for a specific file until a timeout expires.
555    pub fn wait_for_diagnostics(
556        &mut self,
557        file_path: &Path,
558        config: &Config,
559        timeout: std::time::Duration,
560    ) -> Vec<StoredDiagnostic> {
561        let deadline = std::time::Instant::now() + timeout;
562        self.wait_for_file_diagnostics(file_path, config, deadline)
563    }
564
565    pub fn wait_for_diagnostics_default(
566        &mut self,
567        file_path: &Path,
568        timeout: std::time::Duration,
569    ) -> Vec<StoredDiagnostic> {
570        self.wait_for_diagnostics(file_path, &Config::default(), timeout)
571    }
572
573    /// Test-only accessor for the diagnostics store. Used by integration
574    /// tests that need to inspect per-server entries (e.g., to verify that
575    /// `ServerKey::root` is populated correctly, not the empty path that
576    /// the legacy `publish_with_kind` path produced).
577    #[doc(hidden)]
578    pub fn diagnostics_store_for_test(&self) -> &DiagnosticsStore {
579        &self.diagnostics
580    }
581
582    /// Snapshot the current per-server epoch for every entry that exists
583    /// for `file_path`. Servers without an entry yet (never published)
584    /// are absent from the map; for those, `pre = 0` (any first publish
585    /// will be considered fresh under the epoch-fallback rule).
586    pub fn snapshot_diagnostic_epochs(&self, file_path: &Path) -> HashMap<ServerKey, u64> {
587        let lookup_path = normalize_lookup_path(file_path);
588        self.diagnostics
589            .entries_for_file(&lookup_path)
590            .into_iter()
591            .map(|(key, entry)| (key.clone(), entry.epoch))
592            .collect()
593    }
594
595    /// Wait for FRESH per-server diagnostics that match the just-sent
596    /// document version. This is the v0.17.3 post-edit path that fixes the
597    /// stale-diagnostics bug: instead of returning whatever is in the cache
598    /// when the deadline hits, we only return entries whose `version`
599    /// matches the post-edit target version (or, for servers that don't
600    /// participate in versioned sync, whose `epoch` was bumped after the
601    /// pre-edit snapshot).
602    ///
603    /// `expected_versions` should come from `notify_file_changed_versioned`
604    /// — one `(ServerKey, target_version)` per server we sent didChange/
605    /// didOpen to.
606    ///
607    /// `pre_snapshot` is the per-server epoch BEFORE the notification was
608    /// sent; it gates the epoch-fallback path so an old-version publish
609    /// arriving after `drain_events` and before `didChange` cannot be
610    /// mistaken for a fresh response.
611    ///
612    /// Returns a per-server tri-state: `Fresh` (publish matched target
613    /// version OR epoch advanced past snapshot for an unversioned server),
614    /// `Pending` (deadline hit before this server published anything we
615    /// could verify), or `Exited` (server died between notification and
616    /// deadline).
617    pub fn wait_for_post_edit_diagnostics(
618        &mut self,
619        file_path: &Path,
620        // `config` is intentionally accepted (matches sibling wait APIs and
621        // future-proofs us if freshness rules need it). Currently unused
622        // because expected_versions/pre_snapshot fully determine behavior.
623        _config: &Config,
624        expected_versions: &[(ServerKey, i32)],
625        pre_snapshot: &HashMap<ServerKey, u64>,
626        timeout: std::time::Duration,
627    ) -> PostEditWaitOutcome {
628        let lookup_path = normalize_lookup_path(file_path);
629        let deadline = std::time::Instant::now() + timeout;
630
631        // Drain any events that arrived while we were sending didChange.
632        // The publishDiagnostics handler stores the version, so even
633        // pre-snapshot publishes that landed late won't be mistaken for
634        // fresh — the version-match check will reject them.
635        let _ = self.drain_events_for_file(&lookup_path);
636
637        let mut fresh: HashMap<ServerKey, Vec<StoredDiagnostic>> = HashMap::new();
638        let mut exited: Vec<ServerKey> = Vec::new();
639
640        loop {
641            // Check freshness for every expected server. A server is fresh
642            // if its current entry for this file satisfies either:
643            //   1. version-match: entry.version == Some(target_version), OR
644            //   2. epoch-fallback: entry.version is None AND
645            //      entry.epoch > pre_snapshot.get(&key).copied().unwrap_or(0)
646            // Servers whose process has exited are reported separately.
647            for (key, target_version) in expected_versions {
648                if fresh.contains_key(key) || exited.contains(key) {
649                    continue;
650                }
651                if !self.clients.contains_key(key) {
652                    exited.push(key.clone());
653                    continue;
654                }
655                if let Some(entry) = self
656                    .diagnostics
657                    .entries_for_file(&lookup_path)
658                    .into_iter()
659                    .find_map(|(k, e)| if k == key { Some(e) } else { None })
660                {
661                    let is_fresh = match entry.version {
662                        Some(v) => v == *target_version,
663                        None => {
664                            let pre = pre_snapshot.get(key).copied().unwrap_or(0);
665                            entry.epoch > pre
666                        }
667                    };
668                    if is_fresh {
669                        fresh.insert(key.clone(), entry.diagnostics.clone());
670                    }
671                }
672            }
673
674            // All accounted for? Done.
675            if fresh.len() + exited.len() == expected_versions.len() {
676                break;
677            }
678
679            let now = std::time::Instant::now();
680            if now >= deadline {
681                break;
682            }
683
684            let timeout = deadline.saturating_duration_since(now);
685            match self.event_rx.recv_timeout(timeout) {
686                Ok(event) => {
687                    self.handle_event(&event);
688                }
689                Err(RecvTimeoutError::Timeout) | Err(RecvTimeoutError::Disconnected) => break,
690            }
691        }
692
693        // Pending = expected but neither fresh nor exited.
694        let pending: Vec<ServerKey> = expected_versions
695            .iter()
696            .filter(|(k, _)| !fresh.contains_key(k) && !exited.contains(k))
697            .map(|(k, _)| k.clone())
698            .collect();
699
700        // Build deduplicated, sorted diagnostics from the fresh servers only.
701        // Stale or pending servers contribute zero diagnostics.
702        let mut diagnostics: Vec<StoredDiagnostic> = fresh
703            .into_iter()
704            .flat_map(|(_, diags)| diags.into_iter())
705            .collect();
706        diagnostics.sort_by(|a, b| {
707            a.file
708                .cmp(&b.file)
709                .then(a.line.cmp(&b.line))
710                .then(a.column.cmp(&b.column))
711                .then(a.message.cmp(&b.message))
712        });
713
714        PostEditWaitOutcome {
715            diagnostics,
716            pending_servers: pending,
717            exited_servers: exited,
718        }
719    }
720
721    /// Wait for diagnostics to arrive for a specific file until a deadline.
722    ///
723    /// Drains already-queued events first, then blocks on the shared event
724    /// channel only until either `publishDiagnostics` arrives for this file or
725    /// the deadline is reached.
726    pub fn wait_for_file_diagnostics(
727        &mut self,
728        file_path: &Path,
729        config: &Config,
730        deadline: std::time::Instant,
731    ) -> Vec<StoredDiagnostic> {
732        let lookup_path = normalize_lookup_path(file_path);
733
734        if self.server_key_for_file(&lookup_path, config).is_none() {
735            return Vec::new();
736        }
737
738        loop {
739            if self.drain_events_for_file(&lookup_path) {
740                break;
741            }
742
743            let now = std::time::Instant::now();
744            if now >= deadline {
745                break;
746            }
747
748            let timeout = deadline.saturating_duration_since(now);
749            match self.event_rx.recv_timeout(timeout) {
750                Ok(event) => {
751                    if matches!(
752                        self.handle_event(&event),
753                        Some(ref published_file) if published_file.as_path() == lookup_path.as_path()
754                    ) {
755                        break;
756                    }
757                }
758                Err(RecvTimeoutError::Timeout) | Err(RecvTimeoutError::Disconnected) => break,
759            }
760        }
761
762        self.get_diagnostics_for_file(&lookup_path)
763            .into_iter()
764            .cloned()
765            .collect()
766    }
767
768    /// Default timeout for `textDocument/diagnostic` (per-file pull). Servers
769    /// usually respond in under 1s for files they've already analyzed; we
770    /// allow up to 10s before falling back to push semantics. Currently
771    /// surfaced via [`Self::pull_file_timeout`] for callers that want to
772    /// override the wait via the `wait_ms` knob.
773    pub const PULL_FILE_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);
774
775    /// Public accessor so command handlers can reuse the documented default.
776    pub fn pull_file_timeout() -> std::time::Duration {
777        Self::PULL_FILE_TIMEOUT
778    }
779
780    /// Default timeout for `workspace/diagnostic`. The LSP spec allows the
781    /// server to hold this open indefinitely; we cap at 10s and report
782    /// `complete: false` to the agent rather than hanging the bridge.
783    const PULL_WORKSPACE_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);
784
785    /// Issue a `textDocument/diagnostic` (LSP 3.17 per-file pull) request to
786    /// every server that supports pull diagnostics for the given file.
787    ///
788    /// Returns the per-server outcome. If a server reports `kind: "unchanged"`,
789    /// the cached entry's diagnostics are surfaced (deterministic re-use of
790    /// the previous response). If a server doesn't advertise pull capability,
791    /// it's skipped here — the caller should fall back to push for those.
792    ///
793    /// Side effects: results are stored in `DiagnosticsStore` so directory-mode
794    /// queries can aggregate them later.
795    pub fn pull_file_diagnostics(
796        &mut self,
797        file_path: &Path,
798        config: &Config,
799    ) -> Result<Vec<PullFileResult>, LspError> {
800        let canonical_path = canonicalize_for_lsp(file_path)?;
801        // Make sure servers are running and the document is open with fresh
802        // content (handles disk-drift via DocumentStore::is_stale_on_disk).
803        self.ensure_file_open(&canonical_path, config)?;
804
805        let server_keys = self.ensure_server_for_file(&canonical_path, config);
806        if server_keys.is_empty() {
807            return Ok(Vec::new());
808        }
809
810        let uri = uri_for_path(&canonical_path)?;
811        let mut results = Vec::with_capacity(server_keys.len());
812
813        for key in server_keys {
814            let supports_pull = self
815                .clients
816                .get(&key)
817                .and_then(|c| c.diagnostic_capabilities())
818                .is_some_and(|caps| caps.pull_diagnostics);
819
820            if !supports_pull {
821                results.push(PullFileResult {
822                    server_key: key.clone(),
823                    outcome: PullFileOutcome::PullNotSupported,
824                });
825                continue;
826            }
827
828            // Look up previous resultId for incremental requests.
829            let previous_result_id = self
830                .diagnostics
831                .entries_for_file(&canonical_path)
832                .into_iter()
833                .find(|(k, _)| **k == key)
834                .and_then(|(_, entry)| entry.result_id.clone());
835
836            let identifier = self
837                .clients
838                .get(&key)
839                .and_then(|c| c.diagnostic_capabilities())
840                .and_then(|caps| caps.identifier.clone());
841
842            let params = lsp_types::DocumentDiagnosticParams {
843                text_document: lsp_types::TextDocumentIdentifier { uri: uri.clone() },
844                identifier,
845                previous_result_id,
846                work_done_progress_params: Default::default(),
847                partial_result_params: Default::default(),
848            };
849
850            let outcome = match self.send_pull_request(&key, params) {
851                Ok(report) => self.ingest_document_report(&key, &canonical_path, report),
852                Err(err) => PullFileOutcome::RequestFailed {
853                    reason: err.to_string(),
854                },
855            };
856
857            results.push(PullFileResult {
858                server_key: key,
859                outcome,
860            });
861        }
862
863        Ok(results)
864    }
865
866    /// Issue a `workspace/diagnostic` request to a specific server. Cancels
867    /// internally if `timeout` elapses before the server responds. Cached
868    /// entries from the response are stored so directory-mode queries pick
869    /// them up.
870    pub fn pull_workspace_diagnostics(
871        &mut self,
872        server_key: &ServerKey,
873        timeout: Option<std::time::Duration>,
874    ) -> Result<PullWorkspaceResult, LspError> {
875        let _timeout = timeout.unwrap_or(Self::PULL_WORKSPACE_TIMEOUT);
876
877        let supports_workspace = self
878            .clients
879            .get(server_key)
880            .and_then(|c| c.diagnostic_capabilities())
881            .is_some_and(|caps| caps.workspace_diagnostics);
882
883        if !supports_workspace {
884            return Ok(PullWorkspaceResult {
885                server_key: server_key.clone(),
886                files_reported: Vec::new(),
887                complete: false,
888                cancelled: false,
889                supports_workspace: false,
890            });
891        }
892
893        let identifier = self
894            .clients
895            .get(server_key)
896            .and_then(|c| c.diagnostic_capabilities())
897            .and_then(|caps| caps.identifier.clone());
898
899        let params = lsp_types::WorkspaceDiagnosticParams {
900            identifier,
901            previous_result_ids: Vec::new(),
902            work_done_progress_params: Default::default(),
903            partial_result_params: Default::default(),
904        };
905
906        // Note: LspClient::send_request currently uses a fixed REQUEST_TIMEOUT
907        // (30s, see client.rs). For workspace pull this is intentionally not
908        // overridden because servers like rust-analyzer may legitimately take
909        // many seconds on first request. The plugin bridge timeout (also 30s)
910        // is what we ultimately defer to. In a future revision we should plumb
911        // a custom timeout through send_request — for v0.16 we accept that
912        // workspace pull obeys the standard request timeout.
913        let result = match self
914            .clients
915            .get_mut(server_key)
916            .ok_or_else(|| LspError::ServerNotReady("server not found".into()))?
917            .send_request::<lsp_types::request::WorkspaceDiagnosticRequest>(params)
918        {
919            Ok(result) => result,
920            Err(LspError::Timeout(_)) => {
921                return Ok(PullWorkspaceResult {
922                    server_key: server_key.clone(),
923                    files_reported: Vec::new(),
924                    complete: false,
925                    cancelled: true,
926                    supports_workspace: true,
927                });
928            }
929            Err(err) => return Err(err),
930        };
931
932        // Extract the items list. Partial responses stream via $/progress
933        // notifications which we don't subscribe to — treat them as soft
934        // empty (caller will see complete: true with files_reported empty,
935        // matching "got a partial response, no full report").
936        let items = match result {
937            lsp_types::WorkspaceDiagnosticReportResult::Report(report) => report.items,
938            lsp_types::WorkspaceDiagnosticReportResult::Partial(_) => Vec::new(),
939        };
940
941        // Ingest each file report into the diagnostics store.
942        let mut files_reported = Vec::with_capacity(items.len());
943        for item in items {
944            match item {
945                lsp_types::WorkspaceDocumentDiagnosticReport::Full(full) => {
946                    if let Some(file) = uri_to_path(&full.uri) {
947                        let stored = from_lsp_diagnostics(
948                            file.clone(),
949                            full.full_document_diagnostic_report.items.clone(),
950                        );
951                        self.diagnostics.publish_with_result_id(
952                            server_key.clone(),
953                            file.clone(),
954                            stored,
955                            full.full_document_diagnostic_report.result_id.clone(),
956                        );
957                        files_reported.push(file);
958                    }
959                }
960                lsp_types::WorkspaceDocumentDiagnosticReport::Unchanged(_unchanged) => {
961                    // "Unchanged" means the previously cached report is still
962                    // valid. We left it in place; nothing to do.
963                }
964            }
965        }
966
967        Ok(PullWorkspaceResult {
968            server_key: server_key.clone(),
969            files_reported,
970            complete: true,
971            cancelled: false,
972            supports_workspace: true,
973        })
974    }
975
976    /// Issue the per-file diagnostic request and return the report.
977    fn send_pull_request(
978        &mut self,
979        key: &ServerKey,
980        params: lsp_types::DocumentDiagnosticParams,
981    ) -> Result<lsp_types::DocumentDiagnosticReportResult, LspError> {
982        let client = self
983            .clients
984            .get_mut(key)
985            .ok_or_else(|| LspError::ServerNotReady("server not found".into()))?;
986        client.send_request::<lsp_types::request::DocumentDiagnosticRequest>(params)
987    }
988
989    /// Store the result of a per-file pull request and return a structured
990    /// outcome the caller can inspect.
991    fn ingest_document_report(
992        &mut self,
993        key: &ServerKey,
994        canonical_path: &Path,
995        result: lsp_types::DocumentDiagnosticReportResult,
996    ) -> PullFileOutcome {
997        let report = match result {
998            lsp_types::DocumentDiagnosticReportResult::Report(report) => report,
999            lsp_types::DocumentDiagnosticReportResult::Partial(_) => {
1000                // Partial results stream in via $/progress notifications which
1001                // we don't currently subscribe to. Treat as a soft-empty
1002                // success — the next pull will get the full version.
1003                return PullFileOutcome::PartialNotSupported;
1004            }
1005        };
1006
1007        match report {
1008            lsp_types::DocumentDiagnosticReport::Full(full) => {
1009                let result_id = full.full_document_diagnostic_report.result_id.clone();
1010                let stored = from_lsp_diagnostics(
1011                    canonical_path.to_path_buf(),
1012                    full.full_document_diagnostic_report.items.clone(),
1013                );
1014                let count = stored.len();
1015                self.diagnostics.publish_with_result_id(
1016                    key.clone(),
1017                    canonical_path.to_path_buf(),
1018                    stored,
1019                    result_id,
1020                );
1021                PullFileOutcome::Full {
1022                    diagnostic_count: count,
1023                }
1024            }
1025            lsp_types::DocumentDiagnosticReport::Unchanged(_unchanged) => {
1026                // The server says cache is still valid. We don't refresh
1027                // anything; the existing entry's diagnostics remain authoritative.
1028                PullFileOutcome::Unchanged
1029            }
1030        }
1031    }
1032
1033    /// Shutdown all servers gracefully.
1034    pub fn shutdown_all(&mut self) {
1035        for (key, mut client) in self.clients.drain() {
1036            if let Err(err) = client.shutdown() {
1037                log::error!("error shutting down {:?}: {}", key, err);
1038            }
1039        }
1040        self.documents.clear();
1041        self.diagnostics = DiagnosticsStore::new();
1042    }
1043
1044    /// Check if any server is active.
1045    pub fn has_active_servers(&self) -> bool {
1046        self.clients
1047            .values()
1048            .any(|client| client.state() == ServerState::Ready)
1049    }
1050
1051    /// Active server keys (running clients). Used by `lsp_diagnostics`
1052    /// directory mode to know which servers to ask for workspace pull.
1053    pub fn active_server_keys(&self) -> Vec<ServerKey> {
1054        self.clients.keys().cloned().collect()
1055    }
1056
1057    pub fn get_diagnostics_for_file(&self, file: &Path) -> Vec<&StoredDiagnostic> {
1058        let normalized = normalize_lookup_path(file);
1059        self.diagnostics.for_file(&normalized)
1060    }
1061
1062    pub fn get_diagnostics_for_directory(&self, dir: &Path) -> Vec<&StoredDiagnostic> {
1063        let normalized = normalize_lookup_path(dir);
1064        self.diagnostics.for_directory(&normalized)
1065    }
1066
1067    pub fn get_all_diagnostics(&self) -> Vec<&StoredDiagnostic> {
1068        self.diagnostics.all()
1069    }
1070
1071    fn drain_events_for_file(&mut self, file_path: &Path) -> bool {
1072        let mut saw_file_diagnostics = false;
1073        while let Ok(event) = self.event_rx.try_recv() {
1074            if matches!(
1075                self.handle_event(&event),
1076                Some(ref published_file) if published_file.as_path() == file_path
1077            ) {
1078                saw_file_diagnostics = true;
1079            }
1080        }
1081        saw_file_diagnostics
1082    }
1083
1084    fn handle_event(&mut self, event: &LspEvent) -> Option<PathBuf> {
1085        match event {
1086            LspEvent::Notification {
1087                server_kind,
1088                root,
1089                method,
1090                params: Some(params),
1091            } if method == "textDocument/publishDiagnostics" => {
1092                self.handle_publish_diagnostics(server_kind.clone(), root.clone(), params)
1093            }
1094            LspEvent::ServerExited { server_kind, root } => {
1095                let key = ServerKey {
1096                    kind: server_kind.clone(),
1097                    root: root.clone(),
1098                };
1099                self.clients.remove(&key);
1100                self.documents.remove(&key);
1101                self.diagnostics.clear_server(server_kind.clone());
1102                None
1103            }
1104            _ => None,
1105        }
1106    }
1107
1108    fn handle_publish_diagnostics(
1109        &mut self,
1110        server: ServerKind,
1111        root: PathBuf,
1112        params: &serde_json::Value,
1113    ) -> Option<PathBuf> {
1114        if let Ok(publish_params) =
1115            serde_json::from_value::<lsp_types::PublishDiagnosticsParams>(params.clone())
1116        {
1117            let file = uri_to_path(&publish_params.uri)?;
1118            let stored = from_lsp_diagnostics(file.clone(), publish_params.diagnostics);
1119            // v0.17.3: store with real ServerKey { kind, root } and capture
1120            // the document `version` (when the server provided one) so the
1121            // post-edit waiter can reject stale publishes deterministically
1122            // via version-match (preferred) or epoch-delta (fallback). The
1123            // earlier `publish_with_kind` path silently dropped both.
1124            let key = ServerKey { kind: server, root };
1125            self.diagnostics
1126                .publish_full(key, file.clone(), stored, None, publish_params.version);
1127            return Some(file);
1128        }
1129        None
1130    }
1131
1132    fn spawn_server(
1133        &self,
1134        def: &ServerDef,
1135        root: &Path,
1136        config: &Config,
1137    ) -> Result<LspClient, LspError> {
1138        let binary = self.resolve_binary(def, config)?;
1139
1140        // Merge the server-defined env with our test-injected env.
1141        // `extra_env` is empty in production; tests use it to drive fake
1142        // server variants (AFT_FAKE_LSP_PULL=1, etc.).
1143        let mut merged_env = def.env.clone();
1144        for (key, value) in &self.extra_env {
1145            merged_env.insert(key.clone(), value.clone());
1146        }
1147
1148        let mut client = LspClient::spawn(
1149            def.kind.clone(),
1150            root.to_path_buf(),
1151            &binary,
1152            &def.args,
1153            &merged_env,
1154            self.event_tx.clone(),
1155        )?;
1156        client.initialize(root, def.initialization_options.clone())?;
1157        Ok(client)
1158    }
1159
1160    fn resolve_binary(&self, def: &ServerDef, config: &Config) -> Result<PathBuf, LspError> {
1161        if let Some(path) = self.binary_overrides.get(&def.kind) {
1162            if path.exists() {
1163                return Ok(path.clone());
1164            }
1165            return Err(LspError::NotFound(format!(
1166                "override binary for {:?} not found: {}",
1167                def.kind,
1168                path.display()
1169            )));
1170        }
1171
1172        if let Some(path) = env_binary_override(&def.kind) {
1173            if path.exists() {
1174                return Ok(path);
1175            }
1176            return Err(LspError::NotFound(format!(
1177                "environment override binary for {:?} not found: {}",
1178                def.kind,
1179                path.display()
1180            )));
1181        }
1182
1183        // Layered resolution:
1184        //   1. <project_root>/node_modules/.bin/<binary>
1185        //   2. config.lsp_paths_extra (plugin auto-install cache, etc.)
1186        //   3. PATH via `which`
1187        resolve_lsp_binary(
1188            &def.binary,
1189            config.project_root.as_deref(),
1190            &config.lsp_paths_extra,
1191        )
1192        .ok_or_else(|| {
1193            LspError::NotFound(format!(
1194                "language server binary '{}' not found in node_modules/.bin, lsp_paths_extra, or PATH",
1195                def.binary
1196            ))
1197        })
1198    }
1199
1200    fn server_key_for_file(&self, file_path: &Path, config: &Config) -> Option<ServerKey> {
1201        for def in servers_for_file(file_path, config) {
1202            let root = find_workspace_root(file_path, &def.root_markers)?;
1203            let key = ServerKey {
1204                kind: def.kind.clone(),
1205                root,
1206            };
1207            if self.clients.contains_key(&key) {
1208                return Some(key);
1209            }
1210        }
1211        None
1212    }
1213}
1214
1215impl Default for LspManager {
1216    fn default() -> Self {
1217        Self::new()
1218    }
1219}
1220
1221fn canonicalize_for_lsp(file_path: &Path) -> Result<PathBuf, LspError> {
1222    std::fs::canonicalize(file_path).map_err(LspError::from)
1223}
1224
1225fn uri_for_path(path: &Path) -> Result<lsp_types::Uri, LspError> {
1226    let url = url::Url::from_file_path(path).map_err(|_| {
1227        LspError::NotFound(format!(
1228            "failed to convert '{}' to file URI",
1229            path.display()
1230        ))
1231    })?;
1232    lsp_types::Uri::from_str(url.as_str()).map_err(|_| {
1233        LspError::NotFound(format!("failed to parse file URI for '{}'", path.display()))
1234    })
1235}
1236
/// Map a file extension (without the dot) to a language identifier.
///
/// Matching is exact and case-sensitive; any extension not in the table maps
/// to `"plaintext"`.
fn language_id_for_extension(ext: &str) -> &'static str {
    // (extension, language id) pairs.
    const LANGUAGE_IDS: &[(&str, &str)] = &[
        ("ts", "typescript"),
        ("tsx", "typescriptreact"),
        ("js", "javascript"),
        ("mjs", "javascript"),
        ("cjs", "javascript"),
        ("jsx", "javascriptreact"),
        ("py", "python"),
        ("pyi", "python"),
        ("rs", "rust"),
        ("go", "go"),
        ("html", "html"),
        ("htm", "html"),
    ];

    LANGUAGE_IDS
        .iter()
        .find(|entry| entry.0 == ext)
        .map_or("plaintext", |entry| entry.1)
}
1250
/// Best-effort canonical form of `path` for use as a lookup key.
///
/// Returns the canonicalized path when the filesystem can resolve it, and an
/// unchanged copy of `path` otherwise (for example when it does not exist).
fn normalize_lookup_path(path: &Path) -> PathBuf {
    match std::fs::canonicalize(path) {
        Ok(resolved) => resolved,
        Err(_) => path.to_path_buf(),
    }
}
1254
1255fn uri_to_path(uri: &lsp_types::Uri) -> Option<PathBuf> {
1256    let url = url::Url::parse(uri.as_str()).ok()?;
1257    url.to_file_path()
1258        .ok()
1259        .map(|path| normalize_lookup_path(&path))
1260}
1261
1262/// Classify an error returned by `spawn_server` into a structured
1263/// `ServerAttemptResult`. The two interesting cases for callers are:
1264/// - `BinaryNotInstalled` — the server's binary couldn't be resolved on PATH
1265///   or via override. The agent can be told "install bash-language-server".
1266/// - `SpawnFailed` — binary was found but spawning/initializing failed
1267///   (permissions, missing runtime, server crashed during initialize, etc.).
1268fn classify_spawn_error(binary: &str, err: &LspError) -> ServerAttemptResult {
1269    match err {
1270        // resolve_binary returns NotFound for both missing override paths and
1271        // missing PATH binaries. The "override missing" case is rare in
1272        // practice (only set in tests / env vars); we report all NotFound as
1273        // BinaryNotInstalled so the user sees an actionable install hint.
1274        LspError::NotFound(_) => ServerAttemptResult::BinaryNotInstalled {
1275            binary: binary.to_string(),
1276        },
1277        other => ServerAttemptResult::SpawnFailed {
1278            binary: binary.to_string(),
1279            reason: other.to_string(),
1280        },
1281    }
1282}
1283
1284fn env_binary_override(kind: &ServerKind) -> Option<PathBuf> {
1285    let id = kind.id_str();
1286    let suffix: String = id
1287        .chars()
1288        .map(|ch| {
1289            if ch.is_ascii_alphanumeric() {
1290                ch.to_ascii_uppercase()
1291            } else {
1292                '_'
1293            }
1294        })
1295        .collect();
1296    let key = format!("AFT_LSP_{suffix}_BINARY");
1297    std::env::var_os(key).map(PathBuf::from)
1298}