Skip to main content

aft/lsp/
manager.rs

1use std::collections::HashMap;
2use std::path::{Path, PathBuf};
3use std::str::FromStr;
4
5use crossbeam_channel::{unbounded, Receiver, RecvTimeoutError, Sender};
6use lsp_types::notification::{DidChangeTextDocument, DidCloseTextDocument, DidOpenTextDocument};
7use lsp_types::{
8    DidChangeTextDocumentParams, DidCloseTextDocumentParams, DidOpenTextDocumentParams,
9    TextDocumentContentChangeEvent, TextDocumentIdentifier, TextDocumentItem,
10    VersionedTextDocumentIdentifier,
11};
12
13use crate::config::Config;
14use crate::lsp::client::{LspClient, LspEvent, ServerState};
15use crate::lsp::diagnostics::{from_lsp_diagnostics, DiagnosticsStore, StoredDiagnostic};
16use crate::lsp::document::DocumentStore;
17use crate::lsp::registry::{resolve_lsp_binary, servers_for_file, ServerDef, ServerKind};
18use crate::lsp::roots::{find_workspace_root, ServerKey};
19use crate::lsp::LspError;
20
/// Outcome of attempting to ensure a server is running for a single matching
/// `ServerDef`. Returned per matching server so the caller can report exactly
/// what happened to the user instead of collapsing all failures into "no
/// server".
///
/// `LspManager::ensure_server_for_file_detailed` records one value of this
/// type for every server definition that matches the file.
#[derive(Debug, Clone)]
pub enum ServerAttemptResult {
    /// Server is running and ready to serve requests for this file.
    /// `server_key` is the key under which the manager tracks the client.
    Ok { server_key: ServerKey },
    /// No workspace root was found by walking up from the file looking for
    /// any of the server's configured root markers. `looked_for` carries the
    /// marker names from the server definition so they can be shown to the
    /// user.
    NoRootMarker { looked_for: Vec<String> },
    /// The server's binary could not be found on PATH (or override was
    /// missing/invalid). `binary` names the executable that was looked up.
    BinaryNotInstalled { binary: String },
    /// Binary was found but spawning or initializing the server failed.
    /// `reason` is a human-readable description of the failure.
    SpawnFailed { binary: String, reason: String },
}
38
/// One server's attempt to handle a file: which server was tried and what
/// happened when the manager tried to make it available.
#[derive(Debug, Clone)]
pub struct ServerAttempt {
    /// Stable server identifier (kind ID, e.g. "pyright", "rust-analyzer").
    /// Filled from the server definition's `kind.id_str()`.
    pub server_id: String,
    /// Server display name from the registry (the definition's `name`).
    pub server_name: String,
    /// What happened for this server: ready, no root, missing binary, or
    /// spawn failure.
    pub result: ServerAttemptResult,
}
48
/// Aggregate outcome of `ensure_server_for_file_detailed`. Distinguishes:
/// - "No server registered for this file's extension" (`attempts.is_empty()`)
/// - "Servers registered but none could start" (`successful.is_empty()` but
///   `!attempts.is_empty()`)
/// - "At least one server is ready" (`!successful.is_empty()`)
///
/// The `Default` value (both vectors empty) therefore means "no server
/// registered".
#[derive(Debug, Clone, Default)]
pub struct EnsureServerOutcomes {
    /// Server keys that are now running and ready to serve requests. Every
    /// key here also appears in `attempts` inside a `ServerAttemptResult::Ok`.
    pub successful: Vec<ServerKey>,
    /// Per-server attempt records, successes and failures alike. Empty if no
    /// server is registered for the file's extension.
    pub attempts: Vec<ServerAttempt>,
}
62
impl EnsureServerOutcomes {
    /// True if no server in the registry matched this file's extension.
    ///
    /// Implemented as "no attempt was recorded": a server that matched but
    /// failed to start still leaves an entry in `attempts`, so this returns
    /// `false` in that case.
    pub fn no_server_registered(&self) -> bool {
        self.attempts.is_empty()
    }
}
69
/// Per-server outcome of a `textDocument/diagnostic` (per-file pull) request.
///
/// Produced by `LspManager::pull_file_diagnostics`, one per server handling
/// the file.
#[derive(Debug, Clone)]
pub enum PullFileOutcome {
    /// Server returned a full report; diagnostics stored.
    /// `diagnostic_count` is the number of diagnostics in that report.
    Full { diagnostic_count: usize },
    /// Server returned `kind: "unchanged"` — cached diagnostics still valid.
    Unchanged,
    /// Server returned a partial-result token; we don't subscribe to streamed
    /// progress so the response is treated as a soft empty until the next pull.
    PartialNotSupported,
    /// Server doesn't advertise pull capability — caller should fall back to
    /// push diagnostics for this server.
    PullNotSupported,
    /// The pull request failed (timeout, server error, etc.). `reason` is the
    /// error rendered as text.
    RequestFailed { reason: String },
}
86
/// Result of `pull_file_diagnostics` for one matching server.
#[derive(Debug, Clone)]
pub struct PullFileResult {
    /// The server instance the request was addressed to.
    pub server_key: ServerKey,
    /// What that server answered (or why no answer was obtained).
    pub outcome: PullFileOutcome,
}
93
/// Result of `pull_workspace_diagnostics` for a single server.
#[derive(Debug, Clone)]
pub struct PullWorkspaceResult {
    /// The server instance that was queried.
    pub server_key: ServerKey,
    /// Files for which a Full report was received and cached. Files that came
    /// back as `Unchanged` are NOT listed here because their cached entry was
    /// already authoritative.
    pub files_reported: Vec<PathBuf>,
    /// True if the server returned a full response within the timeout.
    ///
    /// NOTE: `pull_workspace_diagnostics` also sets this to `true` when the
    /// server answers with a partial-result token; in that case
    /// `files_reported` is empty.
    pub complete: bool,
    /// True if we cancelled (request timed out before the server responded).
    pub cancelled: bool,
    /// True if the server advertised workspace pull support. When false, the
    /// other fields are empty and the caller should fall back to file-mode
    /// pull or to push semantics.
    pub supports_workspace: bool,
}
111
/// Owns the running LSP server clients together with the per-server
/// open-document tracking, the shared diagnostics store, and the event
/// channel that server reader threads report into.
pub struct LspManager {
    /// Active server instances, keyed by (ServerKind, workspace_root).
    clients: HashMap<ServerKey, LspClient>,
    /// Tracks opened documents and versions per active server.
    documents: HashMap<ServerKey, DocumentStore>,
    /// Stored diagnostics across all servers. Holds pushed
    /// publishDiagnostics payloads as well as reports obtained through the
    /// pull requests issued by this manager.
    diagnostics: DiagnosticsStore,
    /// Unified event channel — all server reader threads send here.
    event_tx: Sender<LspEvent>,
    /// Receiving end of the unified event channel; read by `drain_events`
    /// and the `wait_for_*` methods.
    event_rx: Receiver<LspEvent>,
    /// Optional binary path overrides used by integration tests.
    binary_overrides: HashMap<ServerKind, PathBuf>,
    /// Extra env vars merged into every spawned LSP child. Used in tests to
    /// drive the fake server's behavioral variants (`AFT_FAKE_LSP_PULL=1`,
    /// `AFT_FAKE_LSP_WORKSPACE=1`, etc.). Production code does not set this.
    extra_env: HashMap<String, String>,
}
129
130impl LspManager {
131    pub fn new() -> Self {
132        let (event_tx, event_rx) = unbounded();
133        Self {
134            clients: HashMap::new(),
135            documents: HashMap::new(),
136            diagnostics: DiagnosticsStore::new(),
137            event_tx,
138            event_rx,
139            binary_overrides: HashMap::new(),
140            extra_env: HashMap::new(),
141        }
142    }
143
144    /// For testing: set an extra environment variable that gets passed to
145    /// every spawned LSP child process. Useful for driving fake-server
146    /// behavioral variants in integration tests.
147    pub fn set_extra_env(&mut self, key: &str, value: &str) {
148        self.extra_env.insert(key.to_string(), value.to_string());
149    }
150
    /// Count active LSP server instances.
    ///
    /// This is the number of entries in the client map, i.e. one per
    /// (server kind, workspace root) pair that has been spawned and kept.
    pub fn server_count(&self) -> usize {
        self.clients.len()
    }
155
    /// For testing: override the binary for a server kind.
    ///
    /// A later call for the same `kind` replaces the earlier override.
    pub fn override_binary(&mut self, kind: ServerKind, binary_path: PathBuf) {
        self.binary_overrides.insert(kind, binary_path);
    }
160
161    /// Ensure a server is running for the given file. Spawns if needed.
162    /// Returns the active server keys for the file, or an empty vec if none match.
163    ///
164    /// This is the lightweight wrapper around [`ensure_server_for_file_detailed`]
165    /// that drops failure context. Prefer the detailed variant in command
166    /// handlers that need to surface honest error messages to the agent.
167    pub fn ensure_server_for_file(&mut self, file_path: &Path, config: &Config) -> Vec<ServerKey> {
168        self.ensure_server_for_file_detailed(file_path, config)
169            .successful
170    }
171
172    /// Detailed version of [`ensure_server_for_file`] that records every
173    /// matching server's outcome (`Ok` / `NoRootMarker` / `BinaryNotInstalled`
174    /// / `SpawnFailed`).
175    ///
176    /// Use this when the caller wants to honestly report _why_ a file has no
177    /// active server (e.g., to surface "bash-language-server not on PATH" to
178    /// the agent instead of silently returning `total: 0`).
179    pub fn ensure_server_for_file_detailed(
180        &mut self,
181        file_path: &Path,
182        config: &Config,
183    ) -> EnsureServerOutcomes {
184        let defs = servers_for_file(file_path, config);
185        let mut outcomes = EnsureServerOutcomes::default();
186
187        for def in defs {
188            let server_id = def.kind.id_str().to_string();
189            let server_name = def.name.to_string();
190
191            let Some(root) = find_workspace_root(file_path, &def.root_markers) else {
192                outcomes.attempts.push(ServerAttempt {
193                    server_id,
194                    server_name,
195                    result: ServerAttemptResult::NoRootMarker {
196                        looked_for: def.root_markers.iter().map(|s| s.to_string()).collect(),
197                    },
198                });
199                continue;
200            };
201
202            let key = ServerKey {
203                kind: def.kind.clone(),
204                root,
205            };
206
207            if !self.clients.contains_key(&key) {
208                match self.spawn_server(&def, &key.root, config) {
209                    Ok(client) => {
210                        self.clients.insert(key.clone(), client);
211                        self.documents.entry(key.clone()).or_default();
212                    }
213                    Err(err) => {
214                        log::error!("failed to spawn {}: {}", def.name, err);
215                        let result = classify_spawn_error(&def.binary, &err);
216                        outcomes.attempts.push(ServerAttempt {
217                            server_id,
218                            server_name,
219                            result,
220                        });
221                        continue;
222                    }
223                }
224            }
225
226            outcomes.attempts.push(ServerAttempt {
227                server_id,
228                server_name,
229                result: ServerAttemptResult::Ok {
230                    server_key: key.clone(),
231                },
232            });
233            outcomes.successful.push(key);
234        }
235
236        outcomes
237    }
238
    /// Ensure a server is running using the default LSP registry.
    /// Kept for integration tests that exercise built-in server helpers directly.
    ///
    /// Equivalent to `ensure_server_for_file` with `Config::default()`.
    pub fn ensure_server_for_file_default(&mut self, file_path: &Path) -> Vec<ServerKey> {
        self.ensure_server_for_file(file_path, &Config::default())
    }
244    /// Ensure that servers are running for the file and that the document is open
245    /// in each server's DocumentStore. Reads file content from disk if not already open.
246    /// Returns the server keys for the file.
247    pub fn ensure_file_open(
248        &mut self,
249        file_path: &Path,
250        config: &Config,
251    ) -> Result<Vec<ServerKey>, LspError> {
252        let canonical_path = canonicalize_for_lsp(file_path)?;
253        let server_keys = self.ensure_server_for_file(&canonical_path, config);
254        if server_keys.is_empty() {
255            return Ok(server_keys);
256        }
257
258        let uri = uri_for_path(&canonical_path)?;
259        let language_id = language_id_for_extension(
260            canonical_path
261                .extension()
262                .and_then(|ext| ext.to_str())
263                .unwrap_or_default(),
264        )
265        .to_string();
266
267        for key in &server_keys {
268            let already_open = self
269                .documents
270                .get(key)
271                .is_some_and(|store| store.is_open(&canonical_path));
272
273            if !already_open {
274                let content = std::fs::read_to_string(&canonical_path).map_err(LspError::Io)?;
275                if let Some(client) = self.clients.get_mut(key) {
276                    client.send_notification::<DidOpenTextDocument>(DidOpenTextDocumentParams {
277                        text_document: TextDocumentItem::new(
278                            uri.clone(),
279                            language_id.clone(),
280                            0,
281                            content,
282                        ),
283                    })?;
284                }
285                self.documents
286                    .entry(key.clone())
287                    .or_default()
288                    .open(canonical_path.clone());
289                continue;
290            }
291
292            // Document is already open. Check disk drift — if the file has
293            // been modified outside the AFT pipeline (other tool, manual
294            // edit, sibling session) we MUST send a didChange before any
295            // pull-diagnostic / hover query, otherwise the LSP server
296            // returns results computed from stale in-memory content.
297            //
298            // This is the regression fix Oracle flagged in finding #6:
299            // "ensure_file_open skips already-open files without checking
300            // if disk content changed."
301            let drifted = self
302                .documents
303                .get(key)
304                .is_some_and(|store| store.is_stale_on_disk(&canonical_path));
305            if drifted {
306                let content = std::fs::read_to_string(&canonical_path).map_err(LspError::Io)?;
307                let next_version = self
308                    .documents
309                    .get(key)
310                    .and_then(|store| store.version(&canonical_path))
311                    .map(|v| v + 1)
312                    .unwrap_or(1);
313                if let Some(client) = self.clients.get_mut(key) {
314                    client.send_notification::<DidChangeTextDocument>(
315                        DidChangeTextDocumentParams {
316                            text_document: VersionedTextDocumentIdentifier::new(
317                                uri.clone(),
318                                next_version,
319                            ),
320                            content_changes: vec![TextDocumentContentChangeEvent {
321                                range: None,
322                                range_length: None,
323                                text: content,
324                            }],
325                        },
326                    )?;
327                }
328                if let Some(store) = self.documents.get_mut(key) {
329                    store.bump_version(&canonical_path);
330                }
331            }
332        }
333
334        Ok(server_keys)
335    }
336
    /// Same as `ensure_file_open`, using `Config::default()` instead of a
    /// caller-supplied configuration.
    pub fn ensure_file_open_default(
        &mut self,
        file_path: &Path,
    ) -> Result<Vec<ServerKey>, LspError> {
        self.ensure_file_open(file_path, &Config::default())
    }
343
344    /// Notify relevant LSP servers that a file has been written/changed.
345    /// This is the main hook called after every file write in AFT.
346    ///
347    /// If the file's server isn't running yet, starts it (lazy spawn).
348    /// If the file isn't open in LSP yet, sends didOpen. Otherwise sends didChange.
349    pub fn notify_file_changed(
350        &mut self,
351        file_path: &Path,
352        content: &str,
353        config: &Config,
354    ) -> Result<(), LspError> {
355        let canonical_path = canonicalize_for_lsp(file_path)?;
356        let server_keys = self.ensure_server_for_file(&canonical_path, config);
357        if server_keys.is_empty() {
358            return Ok(());
359        }
360
361        let uri = uri_for_path(&canonical_path)?;
362        let language_id = language_id_for_extension(
363            canonical_path
364                .extension()
365                .and_then(|ext| ext.to_str())
366                .unwrap_or_default(),
367        )
368        .to_string();
369
370        for key in server_keys {
371            let current_version = self
372                .documents
373                .get(&key)
374                .and_then(|store| store.version(&canonical_path));
375
376            if let Some(version) = current_version {
377                let next_version = version + 1;
378                if let Some(client) = self.clients.get_mut(&key) {
379                    client.send_notification::<DidChangeTextDocument>(
380                        DidChangeTextDocumentParams {
381                            text_document: VersionedTextDocumentIdentifier::new(
382                                uri.clone(),
383                                next_version,
384                            ),
385                            content_changes: vec![TextDocumentContentChangeEvent {
386                                range: None,
387                                range_length: None,
388                                text: content.to_string(),
389                            }],
390                        },
391                    )?;
392                }
393                if let Some(store) = self.documents.get_mut(&key) {
394                    store.bump_version(&canonical_path);
395                }
396                continue;
397            }
398
399            if let Some(client) = self.clients.get_mut(&key) {
400                client.send_notification::<DidOpenTextDocument>(DidOpenTextDocumentParams {
401                    text_document: TextDocumentItem::new(
402                        uri.clone(),
403                        language_id.clone(),
404                        0,
405                        content.to_string(),
406                    ),
407                })?;
408            }
409            self.documents
410                .entry(key)
411                .or_default()
412                .open(canonical_path.clone());
413        }
414
415        Ok(())
416    }
417
    /// Same as `notify_file_changed`, using `Config::default()` instead of a
    /// caller-supplied configuration.
    pub fn notify_file_changed_default(
        &mut self,
        file_path: &Path,
        content: &str,
    ) -> Result<(), LspError> {
        self.notify_file_changed(file_path, content, &Config::default())
    }
425
426    /// Close a document in all servers that have it open.
427    pub fn notify_file_closed(&mut self, file_path: &Path) -> Result<(), LspError> {
428        let canonical_path = canonicalize_for_lsp(file_path)?;
429        let uri = uri_for_path(&canonical_path)?;
430        let keys: Vec<ServerKey> = self.documents.keys().cloned().collect();
431
432        for key in keys {
433            let was_open = self
434                .documents
435                .get(&key)
436                .map(|store| store.is_open(&canonical_path))
437                .unwrap_or(false);
438            if !was_open {
439                continue;
440            }
441
442            if let Some(client) = self.clients.get_mut(&key) {
443                client.send_notification::<DidCloseTextDocument>(DidCloseTextDocumentParams {
444                    text_document: TextDocumentIdentifier::new(uri.clone()),
445                })?;
446            }
447
448            if let Some(store) = self.documents.get_mut(&key) {
449                store.close(&canonical_path);
450            }
451        }
452
453        Ok(())
454    }
455
456    /// Get an active client for a file path, if one exists.
457    pub fn client_for_file(&self, file_path: &Path, config: &Config) -> Option<&LspClient> {
458        let key = self.server_key_for_file(file_path, config)?;
459        self.clients.get(&key)
460    }
461
    /// Same as `client_for_file`, using `Config::default()` instead of a
    /// caller-supplied configuration.
    pub fn client_for_file_default(&self, file_path: &Path) -> Option<&LspClient> {
        self.client_for_file(file_path, &Config::default())
    }
465
466    /// Get a mutable active client for a file path, if one exists.
467    pub fn client_for_file_mut(
468        &mut self,
469        file_path: &Path,
470        config: &Config,
471    ) -> Option<&mut LspClient> {
472        let key = self.server_key_for_file(file_path, config)?;
473        self.clients.get_mut(&key)
474    }
475
    /// Same as `client_for_file_mut`, using `Config::default()` instead of a
    /// caller-supplied configuration.
    pub fn client_for_file_mut_default(&mut self, file_path: &Path) -> Option<&mut LspClient> {
        self.client_for_file_mut(file_path, &Config::default())
    }
479
    /// Number of tracked server clients.
    ///
    /// Returns the same value as `server_count` (both read the size of the
    /// client map).
    pub fn active_client_count(&self) -> usize {
        self.clients.len()
    }
484
485    /// Drain all pending LSP events. Call from the main loop.
486    pub fn drain_events(&mut self) -> Vec<LspEvent> {
487        let mut events = Vec::new();
488        while let Ok(event) = self.event_rx.try_recv() {
489            self.handle_event(&event);
490            events.push(event);
491        }
492        events
493    }
494
495    /// Wait for diagnostics to arrive for a specific file until a timeout expires.
496    pub fn wait_for_diagnostics(
497        &mut self,
498        file_path: &Path,
499        config: &Config,
500        timeout: std::time::Duration,
501    ) -> Vec<StoredDiagnostic> {
502        let deadline = std::time::Instant::now() + timeout;
503        self.wait_for_file_diagnostics(file_path, config, deadline)
504    }
505
    /// Same as `wait_for_diagnostics`, using `Config::default()` instead of a
    /// caller-supplied configuration.
    pub fn wait_for_diagnostics_default(
        &mut self,
        file_path: &Path,
        timeout: std::time::Duration,
    ) -> Vec<StoredDiagnostic> {
        self.wait_for_diagnostics(file_path, &Config::default(), timeout)
    }
513
    /// Wait for diagnostics to arrive for a specific file until a deadline.
    ///
    /// Drains already-queued events first, then blocks on the shared event
    /// channel only until either `publishDiagnostics` arrives for this file or
    /// the deadline is reached.
    ///
    /// Returns a copy of the diagnostics stored for the file when waiting
    /// ends, whether or not anything new arrived. Returns an empty vector
    /// immediately, without waiting, when no server key resolves for the file.
    pub fn wait_for_file_diagnostics(
        &mut self,
        file_path: &Path,
        config: &Config,
        deadline: std::time::Instant,
    ) -> Vec<StoredDiagnostic> {
        let lookup_path = normalize_lookup_path(file_path);

        // No server for this file: nothing will ever be published, so don't wait.
        if self.server_key_for_file(&lookup_path, config).is_none() {
            return Vec::new();
        }

        loop {
            // Process whatever is already queued.
            // NOTE(review): presumably returns true when one of the drained
            // events published diagnostics for `lookup_path` — confirm
            // against `drain_events_for_file`.
            if self.drain_events_for_file(&lookup_path) {
                break;
            }

            let now = std::time::Instant::now();
            if now >= deadline {
                break;
            }

            // Block for at most the time remaining until the deadline.
            let timeout = deadline.saturating_duration_since(now);
            match self.event_rx.recv_timeout(timeout) {
                Ok(event) => {
                    // Stop only when the handled event reports this file;
                    // events for other files are handled and the loop goes on.
                    if matches!(
                        self.handle_event(&event),
                        Some(ref published_file) if published_file.as_path() == lookup_path.as_path()
                    ) {
                        break;
                    }
                }
                // Deadline hit or all senders gone: give up waiting.
                Err(RecvTimeoutError::Timeout) | Err(RecvTimeoutError::Disconnected) => break,
            }
        }

        // Hand back owned copies of whatever the store holds for the file now.
        self.get_diagnostics_for_file(&lookup_path)
            .into_iter()
            .cloned()
            .collect()
    }
560
    /// Default timeout for `textDocument/diagnostic` (per-file pull): 10
    /// seconds. Servers usually respond in under 1s for files they've already
    /// analyzed; we allow up to 10s before falling back to push semantics.
    ///
    /// Exposed through [`Self::pull_file_timeout`] so callers that offer a
    /// `wait_ms` knob can reuse it as their default.
    // NOTE(review): `send_pull_request` does not pass this value to the
    // client; confirm where the per-file timeout is actually enforced.
    pub const PULL_FILE_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);
567
    /// Public accessor so command handlers can reuse the documented default.
    ///
    /// Returns `Self::PULL_FILE_TIMEOUT` unchanged.
    pub fn pull_file_timeout() -> std::time::Duration {
        Self::PULL_FILE_TIMEOUT
    }
572
    /// Default timeout for `workspace/diagnostic`: 10 seconds. The LSP spec
    /// allows the server to hold this open indefinitely; the intent is to cap
    /// the wait and report `complete: false` to the agent rather than hanging
    /// the bridge.
    ///
    /// NOTE: `pull_workspace_diagnostics` currently reads this value as the
    /// fallback for its `timeout` argument but does not apply it to the
    /// request; the client's own request timeout governs instead.
    const PULL_WORKSPACE_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);
577
578    /// Issue a `textDocument/diagnostic` (LSP 3.17 per-file pull) request to
579    /// every server that supports pull diagnostics for the given file.
580    ///
581    /// Returns the per-server outcome. If a server reports `kind: "unchanged"`,
582    /// the cached entry's diagnostics are surfaced (deterministic re-use of
583    /// the previous response). If a server doesn't advertise pull capability,
584    /// it's skipped here — the caller should fall back to push for those.
585    ///
586    /// Side effects: results are stored in `DiagnosticsStore` so directory-mode
587    /// queries can aggregate them later.
588    pub fn pull_file_diagnostics(
589        &mut self,
590        file_path: &Path,
591        config: &Config,
592    ) -> Result<Vec<PullFileResult>, LspError> {
593        let canonical_path = canonicalize_for_lsp(file_path)?;
594        // Make sure servers are running and the document is open with fresh
595        // content (handles disk-drift via DocumentStore::is_stale_on_disk).
596        self.ensure_file_open(&canonical_path, config)?;
597
598        let server_keys = self.ensure_server_for_file(&canonical_path, config);
599        if server_keys.is_empty() {
600            return Ok(Vec::new());
601        }
602
603        let uri = uri_for_path(&canonical_path)?;
604        let mut results = Vec::with_capacity(server_keys.len());
605
606        for key in server_keys {
607            let supports_pull = self
608                .clients
609                .get(&key)
610                .and_then(|c| c.diagnostic_capabilities())
611                .is_some_and(|caps| caps.pull_diagnostics);
612
613            if !supports_pull {
614                results.push(PullFileResult {
615                    server_key: key.clone(),
616                    outcome: PullFileOutcome::PullNotSupported,
617                });
618                continue;
619            }
620
621            // Look up previous resultId for incremental requests.
622            let previous_result_id = self
623                .diagnostics
624                .entries_for_file(&canonical_path)
625                .into_iter()
626                .find(|(k, _)| **k == key)
627                .and_then(|(_, entry)| entry.result_id.clone());
628
629            let identifier = self
630                .clients
631                .get(&key)
632                .and_then(|c| c.diagnostic_capabilities())
633                .and_then(|caps| caps.identifier.clone());
634
635            let params = lsp_types::DocumentDiagnosticParams {
636                text_document: lsp_types::TextDocumentIdentifier { uri: uri.clone() },
637                identifier,
638                previous_result_id,
639                work_done_progress_params: Default::default(),
640                partial_result_params: Default::default(),
641            };
642
643            let outcome = match self.send_pull_request(&key, params) {
644                Ok(report) => self.ingest_document_report(&key, &canonical_path, report),
645                Err(err) => PullFileOutcome::RequestFailed {
646                    reason: err.to_string(),
647                },
648            };
649
650            results.push(PullFileResult {
651                server_key: key,
652                outcome,
653            });
654        }
655
656        Ok(results)
657    }
658
    /// Issue a `workspace/diagnostic` request to a specific server. Every
    /// full per-file report in the response is stored so directory-mode
    /// queries pick it up.
    ///
    /// NOTE: `timeout` is currently resolved (falling back to
    /// `PULL_WORKSPACE_TIMEOUT`) but NOT applied. The request is bounded by
    /// the client's own request timeout instead — see the comment before the
    /// request below.
    ///
    /// Result shapes:
    /// - server does not advertise workspace diagnostics (or is not running):
    ///   `supports_workspace: false`, everything else empty/false;
    /// - request timed out: `cancelled: true`, `complete: false`;
    /// - response received: `complete: true`, with `files_reported` listing
    ///   the files that came back with a full report whose URI could be
    ///   mapped to a path. A partial-result response yields an empty list.
    ///
    /// # Errors
    /// Returns `LspError::ServerNotReady` if the client disappears between
    /// the capability check and the request, and propagates any request error
    /// other than a timeout.
    pub fn pull_workspace_diagnostics(
        &mut self,
        server_key: &ServerKey,
        timeout: Option<std::time::Duration>,
    ) -> Result<PullWorkspaceResult, LspError> {
        // Resolved for future use; intentionally unused for now (see below).
        let _timeout = timeout.unwrap_or(Self::PULL_WORKSPACE_TIMEOUT);

        // A missing client or missing capabilities both count as "unsupported".
        let supports_workspace = self
            .clients
            .get(server_key)
            .and_then(|c| c.diagnostic_capabilities())
            .is_some_and(|caps| caps.workspace_diagnostics);

        if !supports_workspace {
            return Ok(PullWorkspaceResult {
                server_key: server_key.clone(),
                files_reported: Vec::new(),
                complete: false,
                cancelled: false,
                supports_workspace: false,
            });
        }

        let identifier = self
            .clients
            .get(server_key)
            .and_then(|c| c.diagnostic_capabilities())
            .and_then(|caps| caps.identifier.clone());

        // No previous result ids are sent, so the request does not identify
        // any earlier report for the server to compare against.
        let params = lsp_types::WorkspaceDiagnosticParams {
            identifier,
            previous_result_ids: Vec::new(),
            work_done_progress_params: Default::default(),
            partial_result_params: Default::default(),
        };

        // Note: LspClient::send_request currently uses a fixed REQUEST_TIMEOUT
        // (30s, see client.rs). For workspace pull this is intentionally not
        // overridden because servers like rust-analyzer may legitimately take
        // many seconds on first request. The plugin bridge timeout (also 30s)
        // is what we ultimately defer to. In a future revision we should plumb
        // a custom timeout through send_request — for v0.16 we accept that
        // workspace pull obeys the standard request timeout.
        let result = match self
            .clients
            .get_mut(server_key)
            .ok_or_else(|| LspError::ServerNotReady("server not found".into()))?
            .send_request::<lsp_types::request::WorkspaceDiagnosticRequest>(params)
        {
            Ok(result) => result,
            // A timeout is an expected outcome, reported as data rather than
            // as an error.
            Err(LspError::Timeout(_)) => {
                return Ok(PullWorkspaceResult {
                    server_key: server_key.clone(),
                    files_reported: Vec::new(),
                    complete: false,
                    cancelled: true,
                    supports_workspace: true,
                });
            }
            Err(err) => return Err(err),
        };

        // Extract the items list. Partial responses stream via $/progress
        // notifications which we don't subscribe to — treat them as soft
        // empty (caller will see complete: true with files_reported empty,
        // matching "got a partial response, no full report").
        let items = match result {
            lsp_types::WorkspaceDiagnosticReportResult::Report(report) => report.items,
            lsp_types::WorkspaceDiagnosticReportResult::Partial(_) => Vec::new(),
        };

        // Ingest each file report into the diagnostics store.
        let mut files_reported = Vec::with_capacity(items.len());
        for item in items {
            match item {
                lsp_types::WorkspaceDocumentDiagnosticReport::Full(full) => {
                    // Reports whose URI cannot be mapped to a local path are
                    // dropped silently.
                    if let Some(file) = uri_to_path(&full.uri) {
                        let stored = from_lsp_diagnostics(
                            file.clone(),
                            full.full_document_diagnostic_report.items.clone(),
                        );
                        // Keep the result id alongside the diagnostics so a
                        // later per-file pull can send it as its previous id.
                        self.diagnostics.publish_with_result_id(
                            server_key.clone(),
                            file.clone(),
                            stored,
                            full.full_document_diagnostic_report.result_id.clone(),
                        );
                        files_reported.push(file);
                    }
                }
                lsp_types::WorkspaceDocumentDiagnosticReport::Unchanged(_unchanged) => {
                    // "Unchanged" means the previously cached report is still
                    // valid. We left it in place; nothing to do.
                }
            }
        }

        Ok(PullWorkspaceResult {
            server_key: server_key.clone(),
            files_reported,
            complete: true,
            cancelled: false,
            supports_workspace: true,
        })
    }
768
769    /// Issue the per-file diagnostic request and return the report.
770    fn send_pull_request(
771        &mut self,
772        key: &ServerKey,
773        params: lsp_types::DocumentDiagnosticParams,
774    ) -> Result<lsp_types::DocumentDiagnosticReportResult, LspError> {
775        let client = self
776            .clients
777            .get_mut(key)
778            .ok_or_else(|| LspError::ServerNotReady("server not found".into()))?;
779        client.send_request::<lsp_types::request::DocumentDiagnosticRequest>(params)
780    }
781
782    /// Store the result of a per-file pull request and return a structured
783    /// outcome the caller can inspect.
784    fn ingest_document_report(
785        &mut self,
786        key: &ServerKey,
787        canonical_path: &Path,
788        result: lsp_types::DocumentDiagnosticReportResult,
789    ) -> PullFileOutcome {
790        let report = match result {
791            lsp_types::DocumentDiagnosticReportResult::Report(report) => report,
792            lsp_types::DocumentDiagnosticReportResult::Partial(_) => {
793                // Partial results stream in via $/progress notifications which
794                // we don't currently subscribe to. Treat as a soft-empty
795                // success — the next pull will get the full version.
796                return PullFileOutcome::PartialNotSupported;
797            }
798        };
799
800        match report {
801            lsp_types::DocumentDiagnosticReport::Full(full) => {
802                let result_id = full.full_document_diagnostic_report.result_id.clone();
803                let stored = from_lsp_diagnostics(
804                    canonical_path.to_path_buf(),
805                    full.full_document_diagnostic_report.items.clone(),
806                );
807                let count = stored.len();
808                self.diagnostics.publish_with_result_id(
809                    key.clone(),
810                    canonical_path.to_path_buf(),
811                    stored,
812                    result_id,
813                );
814                PullFileOutcome::Full {
815                    diagnostic_count: count,
816                }
817            }
818            lsp_types::DocumentDiagnosticReport::Unchanged(_unchanged) => {
819                // The server says cache is still valid. We don't refresh
820                // anything; the existing entry's diagnostics remain authoritative.
821                PullFileOutcome::Unchanged
822            }
823        }
824    }
825
826    /// Shutdown all servers gracefully.
827    pub fn shutdown_all(&mut self) {
828        for (key, mut client) in self.clients.drain() {
829            if let Err(err) = client.shutdown() {
830                log::error!("error shutting down {:?}: {}", key, err);
831            }
832        }
833        self.documents.clear();
834        self.diagnostics = DiagnosticsStore::new();
835    }
836
837    /// Check if any server is active.
838    pub fn has_active_servers(&self) -> bool {
839        self.clients
840            .values()
841            .any(|client| client.state() == ServerState::Ready)
842    }
843
844    /// Active server keys (running clients). Used by `lsp_diagnostics`
845    /// directory mode to know which servers to ask for workspace pull.
846    pub fn active_server_keys(&self) -> Vec<ServerKey> {
847        self.clients.keys().cloned().collect()
848    }
849
850    pub fn get_diagnostics_for_file(&self, file: &Path) -> Vec<&StoredDiagnostic> {
851        let normalized = normalize_lookup_path(file);
852        self.diagnostics.for_file(&normalized)
853    }
854
855    pub fn get_diagnostics_for_directory(&self, dir: &Path) -> Vec<&StoredDiagnostic> {
856        let normalized = normalize_lookup_path(dir);
857        self.diagnostics.for_directory(&normalized)
858    }
859
860    pub fn get_all_diagnostics(&self) -> Vec<&StoredDiagnostic> {
861        self.diagnostics.all()
862    }
863
864    fn drain_events_for_file(&mut self, file_path: &Path) -> bool {
865        let mut saw_file_diagnostics = false;
866        while let Ok(event) = self.event_rx.try_recv() {
867            if matches!(
868                self.handle_event(&event),
869                Some(ref published_file) if published_file.as_path() == file_path
870            ) {
871                saw_file_diagnostics = true;
872            }
873        }
874        saw_file_diagnostics
875    }
876
877    fn handle_event(&mut self, event: &LspEvent) -> Option<PathBuf> {
878        match event {
879            LspEvent::Notification {
880                server_kind,
881                method,
882                params: Some(params),
883                ..
884            } if method == "textDocument/publishDiagnostics" => {
885                self.handle_publish_diagnostics(server_kind.clone(), params)
886            }
887            LspEvent::ServerExited { server_kind, root } => {
888                let key = ServerKey {
889                    kind: server_kind.clone(),
890                    root: root.clone(),
891                };
892                self.clients.remove(&key);
893                self.documents.remove(&key);
894                self.diagnostics.clear_server(server_kind.clone());
895                None
896            }
897            _ => None,
898        }
899    }
900
901    fn handle_publish_diagnostics(
902        &mut self,
903        server: ServerKind,
904        params: &serde_json::Value,
905    ) -> Option<PathBuf> {
906        if let Ok(publish_params) =
907            serde_json::from_value::<lsp_types::PublishDiagnosticsParams>(params.clone())
908        {
909            let file = uri_to_path(&publish_params.uri)?;
910            let stored = from_lsp_diagnostics(file.clone(), publish_params.diagnostics);
911            self.diagnostics.publish_with_kind(server, file, stored);
912            return uri_to_path(&publish_params.uri);
913        }
914        None
915    }
916
917    fn spawn_server(
918        &self,
919        def: &ServerDef,
920        root: &Path,
921        config: &Config,
922    ) -> Result<LspClient, LspError> {
923        let binary = self.resolve_binary(def, config)?;
924
925        // Merge the server-defined env with our test-injected env.
926        // `extra_env` is empty in production; tests use it to drive fake
927        // server variants (AFT_FAKE_LSP_PULL=1, etc.).
928        let mut merged_env = def.env.clone();
929        for (key, value) in &self.extra_env {
930            merged_env.insert(key.clone(), value.clone());
931        }
932
933        let mut client = LspClient::spawn(
934            def.kind.clone(),
935            root.to_path_buf(),
936            &binary,
937            &def.args,
938            &merged_env,
939            self.event_tx.clone(),
940        )?;
941        client.initialize(root, def.initialization_options.clone())?;
942        Ok(client)
943    }
944
945    fn resolve_binary(&self, def: &ServerDef, config: &Config) -> Result<PathBuf, LspError> {
946        if let Some(path) = self.binary_overrides.get(&def.kind) {
947            if path.exists() {
948                return Ok(path.clone());
949            }
950            return Err(LspError::NotFound(format!(
951                "override binary for {:?} not found: {}",
952                def.kind,
953                path.display()
954            )));
955        }
956
957        if let Some(path) = env_binary_override(&def.kind) {
958            if path.exists() {
959                return Ok(path);
960            }
961            return Err(LspError::NotFound(format!(
962                "environment override binary for {:?} not found: {}",
963                def.kind,
964                path.display()
965            )));
966        }
967
968        // Layered resolution:
969        //   1. <project_root>/node_modules/.bin/<binary>
970        //   2. config.lsp_paths_extra (plugin auto-install cache, etc.)
971        //   3. PATH via `which`
972        resolve_lsp_binary(
973            &def.binary,
974            config.project_root.as_deref(),
975            &config.lsp_paths_extra,
976        )
977        .ok_or_else(|| {
978            LspError::NotFound(format!(
979                "language server binary '{}' not found in node_modules/.bin, lsp_paths_extra, or PATH",
980                def.binary
981            ))
982        })
983    }
984
985    fn server_key_for_file(&self, file_path: &Path, config: &Config) -> Option<ServerKey> {
986        for def in servers_for_file(file_path, config) {
987            let root = find_workspace_root(file_path, &def.root_markers)?;
988            let key = ServerKey {
989                kind: def.kind.clone(),
990                root,
991            };
992            if self.clients.contains_key(&key) {
993                return Some(key);
994            }
995        }
996        None
997    }
998}
999
1000impl Default for LspManager {
1001    fn default() -> Self {
1002        Self::new()
1003    }
1004}
1005
1006fn canonicalize_for_lsp(file_path: &Path) -> Result<PathBuf, LspError> {
1007    std::fs::canonicalize(file_path).map_err(LspError::from)
1008}
1009
1010fn uri_for_path(path: &Path) -> Result<lsp_types::Uri, LspError> {
1011    let url = url::Url::from_file_path(path).map_err(|_| {
1012        LspError::NotFound(format!(
1013            "failed to convert '{}' to file URI",
1014            path.display()
1015        ))
1016    })?;
1017    lsp_types::Uri::from_str(url.as_str()).map_err(|_| {
1018        LspError::NotFound(format!("failed to parse file URI for '{}'", path.display()))
1019    })
1020}
1021
/// Map a file extension (without the leading dot) to a language identifier
/// string.
///
/// Matching is exact and case-sensitive. The identifiers follow the language
/// identifier table of the LSP specification. Extensions that are not listed
/// map to `"plaintext"`.
fn language_id_for_extension(ext: &str) -> &'static str {
    match ext {
        // `mts` / `cts` are the ES-module / CommonJS flavours of TypeScript
        // sources, mirroring `mjs` / `cjs` for JavaScript below.
        "ts" | "mts" | "cts" => "typescript",
        "tsx" => "typescriptreact",
        "js" | "mjs" | "cjs" => "javascript",
        "jsx" => "javascriptreact",
        "py" | "pyi" => "python",
        "rs" => "rust",
        "go" => "go",
        "html" | "htm" => "html",
        "sh" | "bash" => "shellscript",
        "c" => "c",
        "cpp" | "cc" | "cxx" | "hpp" | "hh" | "hxx" => "cpp",
        "java" => "java",
        "json" => "json",
        "css" => "css",
        "yaml" | "yml" => "yaml",
        "md" | "markdown" => "markdown",
        "rb" => "ruby",
        "php" => "php",
        _ => "plaintext",
    }
}
1035
/// Best-effort canonicalization for store lookups.
///
/// Returns the canonical form of `path` when `std::fs::canonicalize`
/// succeeds; on any failure the input path is returned unchanged as an owned
/// `PathBuf`.
fn normalize_lookup_path(path: &Path) -> PathBuf {
    match std::fs::canonicalize(path) {
        Ok(resolved) => resolved,
        Err(_) => path.to_path_buf(),
    }
}
1039
1040fn uri_to_path(uri: &lsp_types::Uri) -> Option<PathBuf> {
1041    let url = url::Url::parse(uri.as_str()).ok()?;
1042    url.to_file_path()
1043        .ok()
1044        .map(|path| normalize_lookup_path(&path))
1045}
1046
1047/// Classify an error returned by `spawn_server` into a structured
1048/// `ServerAttemptResult`. The two interesting cases for callers are:
1049/// - `BinaryNotInstalled` — the server's binary couldn't be resolved on PATH
1050///   or via override. The agent can be told "install bash-language-server".
1051/// - `SpawnFailed` — binary was found but spawning/initializing failed
1052///   (permissions, missing runtime, server crashed during initialize, etc.).
1053fn classify_spawn_error(binary: &str, err: &LspError) -> ServerAttemptResult {
1054    match err {
1055        // resolve_binary returns NotFound for both missing override paths and
1056        // missing PATH binaries. The "override missing" case is rare in
1057        // practice (only set in tests / env vars); we report all NotFound as
1058        // BinaryNotInstalled so the user sees an actionable install hint.
1059        LspError::NotFound(_) => ServerAttemptResult::BinaryNotInstalled {
1060            binary: binary.to_string(),
1061        },
1062        other => ServerAttemptResult::SpawnFailed {
1063            binary: binary.to_string(),
1064            reason: other.to_string(),
1065        },
1066    }
1067}
1068
1069fn env_binary_override(kind: &ServerKind) -> Option<PathBuf> {
1070    let id = kind.id_str();
1071    let suffix: String = id
1072        .chars()
1073        .map(|ch| {
1074            if ch.is_ascii_alphanumeric() {
1075                ch.to_ascii_uppercase()
1076            } else {
1077                '_'
1078            }
1079        })
1080        .collect();
1081    let key = format!("AFT_LSP_{suffix}_BINARY");
1082    std::env::var_os(key).map(PathBuf::from)
1083}