Skip to main content

aft/lsp/
manager.rs

1use std::collections::HashMap;
2use std::path::{Path, PathBuf};
3use std::str::FromStr;
4
5use crossbeam_channel::{unbounded, Receiver, RecvTimeoutError, Sender};
6use lsp_types::notification::{DidChangeTextDocument, DidCloseTextDocument, DidOpenTextDocument};
7use lsp_types::{
8    DidChangeTextDocumentParams, DidCloseTextDocumentParams, DidOpenTextDocumentParams,
9    TextDocumentContentChangeEvent, TextDocumentIdentifier, TextDocumentItem,
10    VersionedTextDocumentIdentifier,
11};
12
13use crate::lsp::client::{LspClient, LspEvent, ServerState};
14use crate::lsp::diagnostics::{from_lsp_diagnostics, DiagnosticsStore, StoredDiagnostic};
15use crate::lsp::document::DocumentStore;
16use crate::lsp::registry::{servers_for_file, ServerDef, ServerKind};
17use crate::lsp::roots::{find_workspace_root, ServerKey};
18use crate::lsp::LspError;
19
20pub struct LspManager {
21    /// Active server instances, keyed by (ServerKind, workspace_root).
22    clients: HashMap<ServerKey, LspClient>,
23    /// Tracks opened documents and versions per active server.
24    documents: HashMap<ServerKey, DocumentStore>,
25    /// Stored publishDiagnostics payloads across all servers.
26    diagnostics: DiagnosticsStore,
27    /// Unified event channel — all server reader threads send here.
28    event_tx: Sender<LspEvent>,
29    event_rx: Receiver<LspEvent>,
30    /// Optional binary path overrides used by integration tests.
31    binary_overrides: HashMap<ServerKind, PathBuf>,
32}
33
34impl LspManager {
35    pub fn new() -> Self {
36        let (event_tx, event_rx) = unbounded();
37        Self {
38            clients: HashMap::new(),
39            documents: HashMap::new(),
40            diagnostics: DiagnosticsStore::new(),
41            event_tx,
42            event_rx,
43            binary_overrides: HashMap::new(),
44        }
45    }
46
47    /// Count active LSP server instances.
48    pub fn server_count(&self) -> usize {
49        self.clients.len()
50    }
51
52    /// For testing: override the binary for a server kind.
53    pub fn override_binary(&mut self, kind: ServerKind, binary_path: PathBuf) {
54        self.binary_overrides.insert(kind, binary_path);
55    }
56
57    /// Ensure a server is running for the given file. Spawns if needed.
58    /// Returns the active server keys for the file, or an empty vec if none match.
59    pub fn ensure_server_for_file(&mut self, file_path: &Path) -> Vec<ServerKey> {
60        let defs = servers_for_file(file_path);
61        let mut keys = Vec::new();
62
63        for def in defs {
64            let Some(root) = find_workspace_root(file_path, def.root_markers) else {
65                continue;
66            };
67
68            let key = ServerKey {
69                kind: def.kind,
70                root,
71            };
72
73            if !self.clients.contains_key(&key) {
74                match self.spawn_server(def, &key.root) {
75                    Ok(client) => {
76                        self.clients.insert(key.clone(), client);
77                        self.documents.entry(key.clone()).or_default();
78                    }
79                    Err(err) => {
80                        log::error!("failed to spawn {}: {}", def.name, err);
81                        continue;
82                    }
83                }
84            }
85
86            keys.push(key);
87        }
88
89        keys
90    }
91    /// Ensure that servers are running for the file and that the document is open
92    /// in each server's DocumentStore. Reads file content from disk if not already open.
93    /// Returns the server keys for the file.
94    pub fn ensure_file_open(&mut self, file_path: &Path) -> Result<Vec<ServerKey>, LspError> {
95        let canonical_path = canonicalize_for_lsp(file_path)?;
96        let server_keys = self.ensure_server_for_file(&canonical_path);
97        if server_keys.is_empty() {
98            return Ok(server_keys);
99        }
100
101        let uri = uri_for_path(&canonical_path)?;
102        let language_id = language_id_for_extension(
103            canonical_path
104                .extension()
105                .and_then(|ext| ext.to_str())
106                .unwrap_or_default(),
107        )
108        .to_string();
109
110        for key in &server_keys {
111            let already_open = self
112                .documents
113                .get(key)
114                .map_or(false, |store| store.is_open(&canonical_path));
115
116            if !already_open {
117                let content = std::fs::read_to_string(&canonical_path).map_err(LspError::Io)?;
118                if let Some(client) = self.clients.get_mut(key) {
119                    client.send_notification::<DidOpenTextDocument>(DidOpenTextDocumentParams {
120                        text_document: TextDocumentItem::new(
121                            uri.clone(),
122                            language_id.clone(),
123                            0,
124                            content,
125                        ),
126                    })?;
127                }
128                self.documents
129                    .entry(key.clone())
130                    .or_default()
131                    .open(canonical_path.clone());
132            }
133        }
134
135        Ok(server_keys)
136    }
137
138    /// Notify relevant LSP servers that a file has been written/changed.
139    /// This is the main hook called after every file write in AFT.
140    ///
141    /// If the file's server isn't running yet, starts it (lazy spawn).
142    /// If the file isn't open in LSP yet, sends didOpen. Otherwise sends didChange.
143    pub fn notify_file_changed(&mut self, file_path: &Path, content: &str) -> Result<(), LspError> {
144        let canonical_path = canonicalize_for_lsp(file_path)?;
145        let server_keys = self.ensure_server_for_file(&canonical_path);
146        if server_keys.is_empty() {
147            return Ok(());
148        }
149
150        let uri = uri_for_path(&canonical_path)?;
151        let language_id = language_id_for_extension(
152            canonical_path
153                .extension()
154                .and_then(|ext| ext.to_str())
155                .unwrap_or_default(),
156        )
157        .to_string();
158
159        for key in server_keys {
160            let current_version = self
161                .documents
162                .get(&key)
163                .and_then(|store| store.version(&canonical_path));
164
165            if let Some(version) = current_version {
166                let next_version = version + 1;
167                if let Some(client) = self.clients.get_mut(&key) {
168                    client.send_notification::<DidChangeTextDocument>(
169                        DidChangeTextDocumentParams {
170                            text_document: VersionedTextDocumentIdentifier::new(
171                                uri.clone(),
172                                next_version,
173                            ),
174                            content_changes: vec![TextDocumentContentChangeEvent {
175                                range: None,
176                                range_length: None,
177                                text: content.to_string(),
178                            }],
179                        },
180                    )?;
181                }
182                if let Some(store) = self.documents.get_mut(&key) {
183                    store.bump_version(&canonical_path);
184                }
185                continue;
186            }
187
188            if let Some(client) = self.clients.get_mut(&key) {
189                client.send_notification::<DidOpenTextDocument>(DidOpenTextDocumentParams {
190                    text_document: TextDocumentItem::new(
191                        uri.clone(),
192                        language_id.clone(),
193                        0,
194                        content.to_string(),
195                    ),
196                })?;
197            }
198            self.documents
199                .entry(key)
200                .or_default()
201                .open(canonical_path.clone());
202        }
203
204        Ok(())
205    }
206
207    /// Close a document in all servers that have it open.
208    pub fn notify_file_closed(&mut self, file_path: &Path) -> Result<(), LspError> {
209        let canonical_path = canonicalize_for_lsp(file_path)?;
210        let uri = uri_for_path(&canonical_path)?;
211        let keys: Vec<ServerKey> = self.documents.keys().cloned().collect();
212
213        for key in keys {
214            let was_open = self
215                .documents
216                .get(&key)
217                .map(|store| store.is_open(&canonical_path))
218                .unwrap_or(false);
219            if !was_open {
220                continue;
221            }
222
223            if let Some(client) = self.clients.get_mut(&key) {
224                client.send_notification::<DidCloseTextDocument>(DidCloseTextDocumentParams {
225                    text_document: TextDocumentIdentifier::new(uri.clone()),
226                })?;
227            }
228
229            if let Some(store) = self.documents.get_mut(&key) {
230                store.close(&canonical_path);
231            }
232        }
233
234        Ok(())
235    }
236
237    /// Get an active client for a file path, if one exists.
238    pub fn client_for_file(&self, file_path: &Path) -> Option<&LspClient> {
239        let key = self.server_key_for_file(file_path)?;
240        self.clients.get(&key)
241    }
242
243    /// Get a mutable active client for a file path, if one exists.
244    pub fn client_for_file_mut(&mut self, file_path: &Path) -> Option<&mut LspClient> {
245        let key = self.server_key_for_file(file_path)?;
246        self.clients.get_mut(&key)
247    }
248
249    /// Number of tracked server clients.
250    pub fn active_client_count(&self) -> usize {
251        self.clients.len()
252    }
253
254    /// Drain all pending LSP events. Call from the main loop.
255    pub fn drain_events(&mut self) -> Vec<LspEvent> {
256        let mut events = Vec::new();
257        while let Ok(event) = self.event_rx.try_recv() {
258            self.handle_event(&event);
259            events.push(event);
260        }
261        events
262    }
263
264    /// Wait for diagnostics to arrive for a specific file until a timeout expires.
265    pub fn wait_for_diagnostics(
266        &mut self,
267        file_path: &Path,
268        timeout: std::time::Duration,
269    ) -> Vec<StoredDiagnostic> {
270        let deadline = std::time::Instant::now() + timeout;
271        self.wait_for_file_diagnostics(file_path, deadline)
272    }
273
274    /// Wait for diagnostics to arrive for a specific file until a deadline.
275    ///
276    /// Drains already-queued events first, then blocks on the shared event
277    /// channel only until either `publishDiagnostics` arrives for this file or
278    /// the deadline is reached.
279    pub fn wait_for_file_diagnostics(
280        &mut self,
281        file_path: &Path,
282        deadline: std::time::Instant,
283    ) -> Vec<StoredDiagnostic> {
284        let lookup_path = normalize_lookup_path(file_path);
285
286        if self.server_key_for_file(&lookup_path).is_none() {
287            return Vec::new();
288        }
289
290        loop {
291            if self.drain_events_for_file(&lookup_path) {
292                break;
293            }
294
295            let now = std::time::Instant::now();
296            if now >= deadline {
297                break;
298            }
299
300            let timeout = deadline.saturating_duration_since(now);
301            match self.event_rx.recv_timeout(timeout) {
302                Ok(event) => {
303                    if matches!(
304                        self.handle_event(&event),
305                        Some(ref published_file) if published_file.as_path() == lookup_path.as_path()
306                    ) {
307                        break;
308                    }
309                }
310                Err(RecvTimeoutError::Timeout) | Err(RecvTimeoutError::Disconnected) => break,
311            }
312        }
313
314        self.get_diagnostics_for_file(&lookup_path)
315            .into_iter()
316            .cloned()
317            .collect()
318    }
319
320    /// Shutdown all servers gracefully.
321    pub fn shutdown_all(&mut self) {
322        for (key, mut client) in self.clients.drain() {
323            if let Err(err) = client.shutdown() {
324                log::error!("error shutting down {:?}: {}", key, err);
325            }
326        }
327        self.documents.clear();
328        self.diagnostics = DiagnosticsStore::new();
329    }
330
331    /// Check if any server is active.
332    pub fn has_active_servers(&self) -> bool {
333        self.clients
334            .values()
335            .any(|client| client.state() == ServerState::Ready)
336    }
337
338    pub fn get_diagnostics_for_file(&self, file: &Path) -> Vec<&StoredDiagnostic> {
339        let normalized = normalize_lookup_path(file);
340        self.diagnostics.for_file(&normalized)
341    }
342
343    pub fn get_diagnostics_for_directory(&self, dir: &Path) -> Vec<&StoredDiagnostic> {
344        let normalized = normalize_lookup_path(dir);
345        self.diagnostics.for_directory(&normalized)
346    }
347
348    pub fn get_all_diagnostics(&self) -> Vec<&StoredDiagnostic> {
349        self.diagnostics.all()
350    }
351
352    fn drain_events_for_file(&mut self, file_path: &Path) -> bool {
353        let mut saw_file_diagnostics = false;
354        while let Ok(event) = self.event_rx.try_recv() {
355            if matches!(
356                self.handle_event(&event),
357                Some(ref published_file) if published_file.as_path() == file_path
358            ) {
359                saw_file_diagnostics = true;
360            }
361        }
362        saw_file_diagnostics
363    }
364
365    fn handle_event(&mut self, event: &LspEvent) -> Option<PathBuf> {
366        match event {
367            LspEvent::Notification {
368                server_kind,
369                method,
370                params: Some(params),
371                ..
372            } if method == "textDocument/publishDiagnostics" => {
373                self.handle_publish_diagnostics(*server_kind, params)
374            }
375            LspEvent::ServerExited { server_kind, root } => {
376                let key = ServerKey {
377                    kind: *server_kind,
378                    root: root.clone(),
379                };
380                self.clients.remove(&key);
381                self.documents.remove(&key);
382                self.diagnostics.clear_server(*server_kind);
383                None
384            }
385            _ => None,
386        }
387    }
388
389    fn handle_publish_diagnostics(
390        &mut self,
391        server: ServerKind,
392        params: &serde_json::Value,
393    ) -> Option<PathBuf> {
394        if let Ok(publish_params) =
395            serde_json::from_value::<lsp_types::PublishDiagnosticsParams>(params.clone())
396        {
397            let Some(file) = uri_to_path(&publish_params.uri) else {
398                return None;
399            };
400            let stored = from_lsp_diagnostics(file.clone(), publish_params.diagnostics);
401            self.diagnostics.publish(server, file, stored);
402            return Some(uri_to_path(&publish_params.uri)?);
403        }
404        None
405    }
406
407    fn spawn_server(&self, def: &ServerDef, root: &Path) -> Result<LspClient, LspError> {
408        let binary = self.resolve_binary(def)?;
409        let mut client = LspClient::spawn(
410            def.kind,
411            root.to_path_buf(),
412            &binary,
413            def.args,
414            self.event_tx.clone(),
415        )?;
416        client.initialize(root)?;
417        Ok(client)
418    }
419
420    fn resolve_binary(&self, def: &ServerDef) -> Result<PathBuf, LspError> {
421        if let Some(path) = self.binary_overrides.get(&def.kind) {
422            if path.exists() {
423                return Ok(path.clone());
424            }
425            return Err(LspError::NotFound(format!(
426                "override binary for {:?} not found: {}",
427                def.kind,
428                path.display()
429            )));
430        }
431
432        if let Some(path) = env_binary_override(def.kind) {
433            if path.exists() {
434                return Ok(path);
435            }
436            return Err(LspError::NotFound(format!(
437                "environment override binary for {:?} not found: {}",
438                def.kind,
439                path.display()
440            )));
441        }
442
443        which::which(def.binary).map_err(|_| {
444            LspError::NotFound(format!(
445                "language server binary '{}' not found on PATH",
446                def.binary
447            ))
448        })
449    }
450
451    fn server_key_for_file(&self, file_path: &Path) -> Option<ServerKey> {
452        for def in servers_for_file(file_path) {
453            let root = find_workspace_root(file_path, def.root_markers)?;
454            let key = ServerKey {
455                kind: def.kind,
456                root,
457            };
458            if self.clients.contains_key(&key) {
459                return Some(key);
460            }
461        }
462        None
463    }
464}
465
466impl Default for LspManager {
467    fn default() -> Self {
468        Self::new()
469    }
470}
471
472fn canonicalize_for_lsp(file_path: &Path) -> Result<PathBuf, LspError> {
473    std::fs::canonicalize(file_path).map_err(LspError::from)
474}
475
476fn uri_for_path(path: &Path) -> Result<lsp_types::Uri, LspError> {
477    let url = url::Url::from_file_path(path).map_err(|_| {
478        LspError::NotFound(format!(
479            "failed to convert '{}' to file URI",
480            path.display()
481        ))
482    })?;
483    lsp_types::Uri::from_str(url.as_str()).map_err(|_| {
484        LspError::NotFound(format!("failed to parse file URI for '{}'", path.display()))
485    })
486}
487
/// Map a file extension (without the leading dot) to the language identifier
/// sent in `textDocument/didOpen`.
///
/// Matching is exact and case-sensitive; any extension not listed, including
/// the empty string, maps to `"plaintext"`.
fn language_id_for_extension(ext: &str) -> &'static str {
    // Each entry pairs the extensions of one language with its identifier.
    const LANGUAGE_IDS: &[(&[&str], &str)] = &[
        (&["ts"], "typescript"),
        (&["tsx"], "typescriptreact"),
        (&["js", "mjs", "cjs"], "javascript"),
        (&["jsx"], "javascriptreact"),
        (&["py", "pyi"], "python"),
        (&["rs"], "rust"),
        (&["go"], "go"),
        (&["html", "htm"], "html"),
    ];

    LANGUAGE_IDS
        .iter()
        .find(|(extensions, _)| extensions.contains(&ext))
        .map_or("plaintext", |(_, language_id)| *language_id)
}
501
/// Best-effort canonicalization used for diagnostics lookups: returns the
/// canonical form of `path` when the filesystem can resolve it, and an owned
/// copy of `path` unchanged otherwise.
fn normalize_lookup_path(path: &Path) -> PathBuf {
    match std::fs::canonicalize(path) {
        Ok(resolved) => resolved,
        Err(_) => path.to_path_buf(),
    }
}
505
506fn uri_to_path(uri: &lsp_types::Uri) -> Option<PathBuf> {
507    let url = url::Url::parse(uri.as_str()).ok()?;
508    url.to_file_path()
509        .ok()
510        .map(|path| normalize_lookup_path(&path))
511}
512
513fn env_binary_override(kind: ServerKind) -> Option<PathBuf> {
514    let key = match kind {
515        ServerKind::TypeScript => "AFT_LSP_TYPESCRIPT_BINARY",
516        ServerKind::Python => "AFT_LSP_PYTHON_BINARY",
517        ServerKind::Rust => "AFT_LSP_RUST_BINARY",
518        ServerKind::Go => "AFT_LSP_GO_BINARY",
519    };
520    std::env::var_os(key).map(PathBuf::from)
521}