Skip to main content

aft/lsp/
client.rs

1use std::collections::{HashMap, HashSet, VecDeque};
2use std::io::{self, BufRead, BufReader, BufWriter};
3use std::path::{Path, PathBuf};
4use std::process::{Child, Command, Stdio};
5use std::str::FromStr;
6use std::sync::atomic::{AtomicI64, Ordering};
7use std::sync::{Arc, Mutex};
8use std::thread;
9use std::time::{Duration, Instant};
10
11use crossbeam_channel::{bounded, RecvTimeoutError, Sender};
12use serde::de::DeserializeOwned;
13use serde_json::{json, Value};
14
15use crate::lsp::child_registry::LspChildRegistry;
16use crate::lsp::jsonrpc::{
17    Notification, Request, RequestId, Response as JsonRpcResponse, ServerMessage,
18};
19use crate::lsp::position::path_to_uri;
20use crate::lsp::registry::ServerKind;
21use crate::lsp::{transport, LspError};
22
23const REQUEST_TIMEOUT: Duration = Duration::from_secs(30);
24const SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(5);
25const EXIT_POLL_INTERVAL: Duration = Duration::from_millis(25);
26const STDERR_TAIL_LINES: usize = 64;
27const STDERR_LINE_BYTES: usize = 4 * 1024;
28
29type PendingMap = HashMap<RequestId, Sender<JsonRpcResponse>>;
30type WatchedFileRegistrations = Arc<Mutex<HashSet<String>>>;
31
32#[cfg(windows)]
33fn is_windows_batch_file(path: &Path) -> bool {
34    path.extension()
35        .and_then(|ext| ext.to_str())
36        .is_some_and(|ext| ext.eq_ignore_ascii_case("cmd") || ext.eq_ignore_ascii_case("bat"))
37}
38
39/// Lifecycle state of a language server.
40#[derive(Debug, Clone, Copy, PartialEq, Eq)]
41pub enum ServerState {
42    Starting,
43    Initializing,
44    Ready,
45    ShuttingDown,
46    Exited,
47}
48
49/// Events sent from background reader threads into the main loop.
50#[derive(Debug)]
51pub enum LspEvent {
52    /// Server sent a notification (e.g. publishDiagnostics).
53    Notification {
54        server_kind: ServerKind,
55        root: PathBuf,
56        method: String,
57        params: Option<Value>,
58    },
59    /// Server sent a request (e.g. workspace/configuration).
60    ServerRequest {
61        server_kind: ServerKind,
62        root: PathBuf,
63        id: RequestId,
64        method: String,
65        params: Option<Value>,
66    },
67    /// Server process exited or the transport stream closed.
68    ServerExited {
69        server_kind: ServerKind,
70        root: PathBuf,
71    },
72}
73
74/// What this server told us it can do during the LSP `initialize` handshake.
75///
76/// We capture this once and use it to route diagnostic requests:
77/// - `pull_diagnostics` → use `textDocument/diagnostic` instead of waiting for push
78/// - `workspace_diagnostics` → use `workspace/diagnostic` for directory mode
79///
80/// Defaults are conservative: `false` means "fall back to push semantics".
81#[derive(Debug, Clone, Default)]
82pub struct ServerDiagnosticCapabilities {
83    /// Server supports `textDocument/diagnostic` (LSP 3.17 per-file pull).
84    pub pull_diagnostics: bool,
85    /// Server supports `workspace/diagnostic` (LSP 3.17 workspace-wide pull).
86    pub workspace_diagnostics: bool,
87    /// `identifier` field from server's diagnosticProvider, if any.
88    /// Used to scope previousResultId tracking when multiple servers share a file.
89    pub identifier: Option<String>,
90    /// Whether the server requested workspace diagnostic refresh notifications.
91    /// We declare `refreshSupport: false` in our client capabilities so this
92    /// should always be false in practice — kept for completeness.
93    pub refresh_support: bool,
94}
95
96/// A client connected to one language server process.
97pub struct LspClient {
98    kind: ServerKind,
99    root: PathBuf,
100    state: ServerState,
101    child: Child,
102    /// Child PID captured at spawn time. Used by Drop to untrack the
103    /// PID from the shared registry; we capture once rather than reading
104    /// `child.id()` later because Drop ordering with the Child can race.
105    child_pid: u32,
106    writer: Arc<Mutex<BufWriter<std::process::ChildStdin>>>,
107
108    /// Pending request responses, keyed by request ID.
109    pending: Arc<Mutex<PendingMap>>,
110    /// Next request ID counter.
111    next_id: AtomicI64,
112    /// Diagnostic capabilities reported by the server in its initialize response.
113    /// `None` until `initialize()` succeeds; conservative defaults thereafter
114    /// when the server doesn't advertise diagnosticProvider.
115    diagnostic_caps: Option<ServerDiagnosticCapabilities>,
116    /// Whether the server advertised static `workspace.didChangeWatchedFiles`
117    /// support during `initialize`. Dynamic registration is tracked separately
118    /// in `watched_file_registrations`; either path permits notifications.
119    /// Intentional default: `false` (conservative — requires server opt-in).
120    supports_watched_files: bool,
121    /// Dynamic `workspace/didChangeWatchedFiles` registrations requested by
122    /// the server via `client/registerCapability`. Per LSP, the client must
123    /// not send watched-file notifications merely because a server mentions
124    /// dynamic registration during initialize; a real registration is required.
125    watched_file_registrations: WatchedFileRegistrations,
126    /// Shared registry that tracks live LSP child PIDs across the process
127    /// so the signal handler can SIGKILL them on SIGTERM/SIGINT before
128    /// aft exits. Cloned via `Arc` — multiple clients share the same set.
129    child_registry: LspChildRegistry,
130    stderr_tail: Arc<Mutex<VecDeque<String>>>,
131}
132
133impl LspClient {
134    /// Spawn a new language server process and start the background reader thread.
135    ///
136    /// `child_registry` is a shared handle that records this child's PID so
137    /// the signal handler can SIGKILL it on SIGTERM/SIGINT. Tests that don't
138    /// care about signal cleanup can pass `LspChildRegistry::new()`.
139    pub fn spawn(
140        kind: ServerKind,
141        root: PathBuf,
142        binary: &Path,
143        args: &[String],
144        env: &HashMap<String, String>,
145        event_tx: Sender<LspEvent>,
146        child_registry: LspChildRegistry,
147    ) -> io::Result<Self> {
148        #[cfg(windows)]
149        let mut command = if is_windows_batch_file(binary) {
150            let mut command = Command::new("cmd.exe");
151            command.arg("/C").arg(binary.as_os_str());
152            command
153        } else {
154            Command::new(binary)
155        };
156        #[cfg(not(windows))]
157        let mut command = Command::new(binary);
158        command
159            .args(args)
160            .current_dir(&root)
161            .stdin(Stdio::piped())
162            .stdout(Stdio::piped())
163            // Drain stderr on a background thread so failed shims/crashes have
164            // actionable diagnostics without risking pipe-buffer deadlock.
165            .stderr(Stdio::piped());
166        for (key, value) in env {
167            command.env(key, value);
168        }
169
170        // Put each LSP child in its own process group so we can SIGKILL the
171        // whole group on shutdown. Critical for npm-wrapped servers like
172        // biome (`node biome lsp-proxy` spawns `cli-darwin-arm64 biome
173        // lsp-proxy` as a child); killing just the wrapper PID leaves the
174        // real server orphaned to PID 1.
175        #[cfg(unix)]
176        unsafe {
177            use std::os::unix::process::CommandExt;
178            command.pre_exec(|| {
179                #[cfg(target_os = "linux")]
180                {
181                    // If aft is killed with SIGKILL, Rust cleanup and our
182                    // signal-handler thread never run. Ask the kernel to kill
183                    // the LSP process group as soon as the parent dies. This is
184                    // best-effort Linux coverage for the otherwise unhandleable
185                    // parent-death path.
186                    if libc::prctl(libc::PR_SET_PDEATHSIG, libc::SIGKILL) == -1 {
187                        return Err(io::Error::last_os_error());
188                    }
189                    if libc::getppid() == 1 {
190                        return Err(io::Error::other("parent died before LSP spawn completed"));
191                    }
192                }
193                if libc::setsid() == -1 {
194                    return Err(io::Error::last_os_error());
195                }
196                Ok(())
197            });
198        }
199
200        let mut child = child_registry.spawn_tracked(&mut command)?;
201        let child_pid = child.id();
202
203        let stdout = child
204            .stdout
205            .take()
206            .ok_or_else(|| io::Error::other("language server missing stdout pipe"))?;
207        let stdin = child
208            .stdin
209            .take()
210            .ok_or_else(|| io::Error::other("language server missing stdin pipe"))?;
211        let stderr = child
212            .stderr
213            .take()
214            .ok_or_else(|| io::Error::other("language server missing stderr pipe"))?;
215        let stderr_tail = Arc::new(Mutex::new(VecDeque::with_capacity(STDERR_TAIL_LINES)));
216        spawn_stderr_drain_thread(stderr, Arc::clone(&stderr_tail));
217
218        let writer = Arc::new(Mutex::new(BufWriter::new(stdin)));
219        let pending = Arc::new(Mutex::new(PendingMap::new()));
220        let watched_file_registrations = Arc::new(Mutex::new(HashSet::new()));
221        let reader_pending = Arc::clone(&pending);
222        let reader_writer = Arc::clone(&writer);
223        let reader_watched_file_registrations = Arc::clone(&watched_file_registrations);
224        let reader_kind = kind.clone();
225        let reader_root = root.clone();
226
227        thread::spawn(move || {
228            let mut reader = BufReader::new(stdout);
229            loop {
230                match transport::read_message(&mut reader) {
231                    Ok(Some(ServerMessage::Response(response))) => {
232                        if let Ok(mut guard) = reader_pending.lock() {
233                            if let Some(tx) = guard.remove(&response.id) {
234                                if tx.send(response).is_err() {
235                                    log::debug!("response channel closed");
236                                }
237                            }
238                        } else {
239                            let _ = event_tx.send(LspEvent::ServerExited {
240                                server_kind: reader_kind.clone(),
241                                root: reader_root.clone(),
242                            });
243                            break;
244                        }
245                    }
246                    Ok(Some(ServerMessage::Notification { method, params })) => {
247                        let _ = event_tx.send(LspEvent::Notification {
248                            server_kind: reader_kind.clone(),
249                            root: reader_root.clone(),
250                            method,
251                            params,
252                        });
253                    }
254                    Ok(Some(ServerMessage::Request { id, method, params })) => {
255                        record_watched_file_registration(
256                            &reader_watched_file_registrations,
257                            &method,
258                            params.as_ref(),
259                        );
260                        // Auto-respond to server requests to prevent deadlocks.
261                        // Server requests (like client/registerCapability,
262                        // window/workDoneProgress/create) block the server until
263                        // we respond. If we don't respond, the server won't send
264                        // responses to OUR pending requests → deadlock.
265                        //
266                        // Dispatch by method to return correct types:
267                        // - workspace/configuration expects Vec<Value> (one per item)
268                        // - Everything else gets null (safe default for registration/progress)
269                        let response_value = if method == "workspace/configuration" {
270                            // Return an array of null configs — one per requested item.
271                            // Servers fall back to filesystem config (tsconfig, pyrightconfig, etc.)
272                            let item_count = params
273                                .as_ref()
274                                .and_then(|p| p.get("items"))
275                                .and_then(|items| items.as_array())
276                                .map_or(1, |arr| arr.len());
277                            serde_json::Value::Array(vec![serde_json::Value::Null; item_count])
278                        } else {
279                            serde_json::Value::Null
280                        };
281                        if let Ok(mut w) = reader_writer.lock() {
282                            let response = super::jsonrpc::OutgoingResponse::success(
283                                id.clone(),
284                                response_value,
285                            );
286                            let _ = transport::write_response(&mut *w, &response);
287                        }
288                        // Also forward as event for any interested handlers
289                        let _ = event_tx.send(LspEvent::ServerRequest {
290                            server_kind: reader_kind.clone(),
291                            root: reader_root.clone(),
292                            id,
293                            method,
294                            params,
295                        });
296                    }
297                    Ok(None) | Err(_) => {
298                        if let Ok(mut guard) = reader_pending.lock() {
299                            guard.clear();
300                        }
301                        let _ = event_tx.send(LspEvent::ServerExited {
302                            server_kind: reader_kind.clone(),
303                            root: reader_root.clone(),
304                        });
305                        break;
306                    }
307                }
308            }
309        });
310
311        Ok(Self {
312            kind,
313            root,
314            state: ServerState::Starting,
315            child,
316            child_pid,
317            writer,
318            pending,
319            next_id: AtomicI64::new(1),
320            diagnostic_caps: None,
321            supports_watched_files: false,
322            watched_file_registrations,
323            child_registry,
324            stderr_tail,
325        })
326    }
327
328    /// Send the initialize request and wait for response. Transition to Ready.
329    pub fn initialize(
330        &mut self,
331        workspace_root: &Path,
332        initialization_options: Option<serde_json::Value>,
333    ) -> Result<lsp_types::InitializeResult, LspError> {
334        self.ensure_can_send()?;
335        self.state = ServerState::Initializing;
336
337        let root_url = path_to_uri(workspace_root)?;
338        let root_uri = lsp_types::Uri::from_str(root_url.as_str()).map_err(|_| {
339            LspError::NotFound(format!(
340                "failed to convert workspace root '{}' to file URI",
341                workspace_root.display()
342            ))
343        })?;
344
345        let mut params_value = json!({
346            "processId": std::process::id(),
347            "rootUri": root_uri,
348            "capabilities": {
349                "workspace": {
350                    "workspaceFolders": true,
351                    "configuration": true,
352                    "didChangeWatchedFiles": {
353                        "dynamicRegistration": true
354                    },
355                    // LSP 3.17 workspace diagnostic pull. We declare refreshSupport=false
356                    // because we drive diagnostics on-demand via pull/push and re-query
357                    // when the agent calls lsp_diagnostics again — we don't need the
358                    // server to proactively push refresh notifications.
359                    "diagnostic": {
360                        "refreshSupport": false
361                    }
362                },
363                "textDocument": {
364                    "synchronization": {
365                        "dynamicRegistration": false,
366                        "didSave": true,
367                        "willSave": false,
368                        "willSaveWaitUntil": false
369                    },
370                    "publishDiagnostics": {
371                        "relatedInformation": true,
372                        "versionSupport": true,
373                        "codeDescriptionSupport": true,
374                        "dataSupport": true
375                    },
376                    // LSP 3.17 textDocument diagnostic pull. dynamicRegistration=false
377                    // because we use static capability discovery from the InitializeResult.
378                    // relatedDocumentSupport=true to receive cascading diagnostics for
379                    // files that became known while analyzing the requested one.
380                    "diagnostic": {
381                        "dynamicRegistration": false,
382                        "relatedDocumentSupport": true
383                    }
384                }
385            },
386            "clientInfo": {
387                "name": "aft",
388                "version": env!("CARGO_PKG_VERSION")
389            },
390            "workspaceFolders": [
391                {
392                    "uri": root_uri,
393                    "name": workspace_root
394                        .file_name()
395                        .and_then(|name| name.to_str())
396                        .unwrap_or("workspace")
397                }
398            ]
399        });
400        if let Some(initialization_options) = initialization_options {
401            params_value["initializationOptions"] = initialization_options;
402        }
403
404        let params = serde_json::from_value::<lsp_types::InitializeParams>(params_value)?;
405
406        let result_value = self.send_request_value(
407            <lsp_types::request::Initialize as lsp_types::request::Request>::METHOD,
408            params,
409        )?;
410        let result: lsp_types::InitializeResult = serde_json::from_value(result_value.clone())?;
411
412        // Capture diagnostic capabilities from the initialize response. We parse
413        // from a re-serialized JSON Value because the lsp-types crate's
414        // diagnostic_provider strict variants reject some shapes real servers
415        // emit (e.g. bare `true`), and we want defensive Default fallback.
416        let caps_value = result_value
417            .get("capabilities")
418            .cloned()
419            .unwrap_or_else(|| serde_json::to_value(&result.capabilities).unwrap_or(Value::Null));
420        self.diagnostic_caps = Some(parse_diagnostic_capabilities(&caps_value));
421
422        // Capture initialize-time (static) workspace/didChangeWatchedFiles
423        // support. Runtime client/registerCapability subscriptions are recorded
424        // separately by the reader thread. Missing capability is unsupported by
425        // default; callers must not send notifications unless one of those two
426        // server opt-in paths is present.
427        self.supports_watched_files = caps_value
428            .pointer("/workspace/didChangeWatchedFiles/dynamicRegistration")
429            .and_then(|v| v.as_bool())
430            .unwrap_or(false)
431            || caps_value
432                .pointer("/workspace/didChangeWatchedFiles")
433                .map(|v| v.is_object() || v.as_bool() == Some(true))
434                .unwrap_or(false);
435
436        self.send_notification::<lsp_types::notification::Initialized>(serde_json::from_value(
437            json!({}),
438        )?)?;
439        self.state = ServerState::Ready;
440        Ok(result)
441    }
442
443    /// Diagnostic capabilities advertised by the server. Returns `None` until
444    /// `initialize()` has succeeded; returns `Some` with conservative defaults
445    /// (all `false`) when the server didn't advertise diagnosticProvider.
446    pub fn diagnostic_capabilities(&self) -> Option<&ServerDiagnosticCapabilities> {
447        self.diagnostic_caps.as_ref()
448    }
449
450    /// Whether the server advertised initialize-time
451    /// `workspace/didChangeWatchedFiles` support. Dynamic registrations are
452    /// reported by `has_watched_file_registration()`.
453    pub fn supports_watched_files(&self) -> bool {
454        self.supports_watched_files
455    }
456
457    /// Whether this server currently has an active dynamic watched-file
458    /// registration. This, not the initialize-time capability shape, controls
459    /// whether `workspace/didChangeWatchedFiles` may be sent.
460    pub fn has_watched_file_registration(&self) -> bool {
461        self.watched_file_registrations
462            .lock()
463            .map(|registrations| !registrations.is_empty())
464            .unwrap_or(false)
465    }
466
467    /// Send a request and wait for the response.
468    pub fn send_request<R>(&mut self, params: R::Params) -> Result<R::Result, LspError>
469    where
470        R: lsp_types::request::Request,
471        R::Params: serde::Serialize,
472        R::Result: DeserializeOwned,
473    {
474        self.ensure_can_send()?;
475
476        let value = self.send_request_value(R::METHOD, params)?;
477        serde_json::from_value(value).map_err(Into::into)
478    }
479
480    /// Send a request and wait up to `timeout` for the response. If the local
481    /// deadline expires, remove the pending response handler and notify the
482    /// server with `$/cancelRequest` so it can stop work.
483    pub fn send_request_with_timeout<R>(
484        &mut self,
485        params: R::Params,
486        timeout: Duration,
487    ) -> Result<R::Result, LspError>
488    where
489        R: lsp_types::request::Request,
490        R::Params: serde::Serialize,
491        R::Result: DeserializeOwned,
492    {
493        self.ensure_can_send()?;
494
495        let value = self.send_request_value_with_timeout(R::METHOD, params, timeout)?;
496        serde_json::from_value(value).map_err(Into::into)
497    }
498
499    fn send_request_value<P>(&mut self, method: &'static str, params: P) -> Result<Value, LspError>
500    where
501        P: serde::Serialize,
502    {
503        self.send_request_value_with_timeout(method, params, REQUEST_TIMEOUT)
504    }
505
506    fn send_request_value_with_timeout<P>(
507        &mut self,
508        method: &'static str,
509        params: P,
510        timeout: Duration,
511    ) -> Result<Value, LspError>
512    where
513        P: serde::Serialize,
514    {
515        self.ensure_can_send()?;
516
517        let id = RequestId::Int(self.next_id.fetch_add(1, Ordering::Relaxed));
518        let (tx, rx) = bounded(1);
519        {
520            let mut pending = self.lock_pending()?;
521            pending.insert(id.clone(), tx);
522        }
523
524        let request = Request::new(id.clone(), method, Some(serde_json::to_value(params)?));
525        {
526            let mut writer = self
527                .writer
528                .lock()
529                .map_err(|_| LspError::ServerNotReady("writer lock poisoned".to_string()))?;
530            if let Err(err) = transport::write_request(&mut *writer, &request) {
531                self.remove_pending(&id);
532                return Err(err.into());
533            }
534        }
535
536        let response = match rx.recv_timeout(timeout) {
537            Ok(response) => response,
538            Err(RecvTimeoutError::Timeout) => {
539                self.remove_pending(&id);
540                self.send_cancel_request(&id)?;
541                return Err(LspError::Timeout(format!(
542                    "timed out waiting for '{}' response from {:?}",
543                    method, self.kind
544                )));
545            }
546            Err(RecvTimeoutError::Disconnected) => {
547                self.remove_pending(&id);
548                return Err(LspError::ServerNotReady(format!(
549                    "language server {:?} disconnected while waiting for '{}'",
550                    self.kind, method
551                )));
552            }
553        };
554
555        if let Some(error) = response.error {
556            return Err(LspError::ServerError {
557                code: error.code,
558                message: error.message,
559            });
560        }
561
562        Ok(response.result.unwrap_or(Value::Null))
563    }
564
565    /// Send a notification (fire-and-forget).
566    pub fn send_notification<N>(&mut self, params: N::Params) -> Result<(), LspError>
567    where
568        N: lsp_types::notification::Notification,
569        N::Params: serde::Serialize,
570    {
571        self.ensure_can_send()?;
572        let notification = Notification::new(N::METHOD, Some(serde_json::to_value(params)?));
573        let mut writer = self
574            .writer
575            .lock()
576            .map_err(|_| LspError::ServerNotReady("writer lock poisoned".to_string()))?;
577        transport::write_notification(&mut *writer, &notification)?;
578        Ok(())
579    }
580
581    /// Graceful shutdown: send shutdown request, then exit notification.
582    pub fn shutdown(&mut self) -> Result<(), LspError> {
583        if self.state == ServerState::Exited {
584            self.child_registry.untrack(self.child_pid);
585            return Ok(());
586        }
587
588        if self.child.try_wait()?.is_some() {
589            self.state = ServerState::Exited;
590            self.child_registry.untrack(self.child_pid);
591            return Ok(());
592        }
593
594        if let Err(err) = self.send_request::<lsp_types::request::Shutdown>(()) {
595            self.state = ServerState::ShuttingDown;
596            if self.child.try_wait()?.is_some() {
597                self.state = ServerState::Exited;
598                return Ok(());
599            }
600            return Err(err);
601        }
602
603        self.state = ServerState::ShuttingDown;
604
605        if let Err(err) = self.send_notification::<lsp_types::notification::Exit>(()) {
606            if self.child.try_wait()?.is_some() {
607                self.state = ServerState::Exited;
608                return Ok(());
609            }
610            return Err(err);
611        }
612
613        let deadline = Instant::now() + SHUTDOWN_TIMEOUT;
614        loop {
615            if self.child.try_wait()?.is_some() {
616                self.state = ServerState::Exited;
617                return Ok(());
618            }
619            if Instant::now() >= deadline {
620                // Kill the entire process group, not just the wrapper PID, so
621                // npm-wrapped servers (biome's `node biome lsp-proxy` spawns
622                // a separate cli-darwin-arm64 child) don't leak orphans.
623                kill_lsp_child_group(&mut self.child);
624                self.state = ServerState::Exited;
625                return Err(LspError::Timeout(format!(
626                    "timed out waiting for {:?} to exit",
627                    self.kind
628                )));
629            }
630            thread::sleep(EXIT_POLL_INTERVAL);
631        }
632    }
633
634    pub fn stderr_tail(&self) -> String {
635        self.stderr_tail
636            .lock()
637            .map(|tail| stderr_tail_to_string(&tail))
638            .unwrap_or_default()
639    }
640
641    pub fn child_exited(&mut self) -> bool {
642        self.child.try_wait().ok().flatten().is_some()
643    }
644
645    pub fn child_exit_status(&mut self) -> Option<std::process::ExitStatus> {
646        self.child.try_wait().ok().flatten()
647    }
648
649    pub fn state(&self) -> ServerState {
650        self.state
651    }
652
653    pub fn kind(&self) -> ServerKind {
654        self.kind.clone()
655    }
656
657    pub fn root(&self) -> &Path {
658        &self.root
659    }
660
661    fn ensure_can_send(&self) -> Result<(), LspError> {
662        if matches!(self.state, ServerState::ShuttingDown | ServerState::Exited) {
663            return Err(LspError::ServerNotReady(format!(
664                "language server {:?} is not ready (state: {:?})",
665                self.kind, self.state
666            )));
667        }
668        Ok(())
669    }
670
671    fn lock_pending(&self) -> Result<std::sync::MutexGuard<'_, PendingMap>, LspError> {
672        self.pending
673            .lock()
674            .map_err(|_| io::Error::other("pending response map poisoned").into())
675    }
676
677    fn remove_pending(&self, id: &RequestId) {
678        if let Ok(mut pending) = self.pending.lock() {
679            pending.remove(id);
680        }
681    }
682
683    fn send_cancel_request(&mut self, id: &RequestId) -> Result<(), LspError> {
684        let notification = Notification::new("$/cancelRequest", Some(json!({ "id": id })));
685        let mut writer = self
686            .writer
687            .lock()
688            .map_err(|_| LspError::ServerNotReady("writer lock poisoned".to_string()))?;
689        transport::write_notification(&mut *writer, &notification)?;
690        Ok(())
691    }
692}
693
694impl Drop for LspClient {
695    fn drop(&mut self) {
696        // Untrack first so the signal handler can't race with this kill and
697        // try to SIGKILL a PID that's already been reaped.
698        self.child_registry.untrack(self.child_pid);
699        kill_lsp_child_group(&mut self.child);
700    }
701}
702
703fn spawn_stderr_drain_thread(
704    stderr: std::process::ChildStderr,
705    stderr_tail: Arc<Mutex<VecDeque<String>>>,
706) {
707    thread::spawn(move || {
708        let mut reader = BufReader::new(stderr);
709        let mut line = String::new();
710
711        loop {
712            line.clear();
713            match reader.read_line(&mut line) {
714                Ok(0) => break,
715                Ok(_) => {
716                    if let Ok(mut tail) = stderr_tail.lock() {
717                        append_stderr_tail(&mut tail, &line);
718                    } else {
719                        break;
720                    }
721                }
722                Err(_) => break,
723            }
724        }
725    });
726}
727
728fn append_stderr_tail(tail: &mut VecDeque<String>, line: &str) {
729    if tail.len() == STDERR_TAIL_LINES {
730        tail.pop_front();
731    }
732    tail.push_back(trim_stderr_line(line));
733}
734
735fn trim_stderr_line(line: &str) -> String {
736    let line = line.trim_end_matches(|ch| ch == '\r' || ch == '\n');
737    if line.len() <= STDERR_LINE_BYTES {
738        return line.to_string();
739    }
740
741    let mut start = line.len() - STDERR_LINE_BYTES;
742    while start < line.len() && !line.is_char_boundary(start) {
743        start += 1;
744    }
745    format!("...{}", &line[start..])
746}
747
748fn stderr_tail_to_string(tail: &VecDeque<String>) -> String {
749    tail.iter()
750        .map(String::as_str)
751        .collect::<Vec<_>>()
752        .join("\n")
753}
754
755/// Force-terminate an LSP child and its entire process group on Unix.
756/// On Windows, `taskkill /F /T` kills the process tree.
757///
758/// Necessary because some LSP servers ship as npm-installed Node shims that
759/// spawn the real binary as a child. Killing only the wrapper PID leaves the
760/// real server orphaned to PID 1 and accumulates over time.
761fn kill_lsp_child_group(child: &mut std::process::Child) {
762    #[cfg(unix)]
763    {
764        let pgid = child.id() as i32;
765        crate::bash_background::process::terminate_pgid(pgid, Some(child));
766        let _ = child.wait();
767    }
768    #[cfg(not(unix))]
769    {
770        crate::bash_background::process::terminate_process(child);
771        let _ = child.wait();
772    }
773}
774
775fn record_watched_file_registration(
776    registrations: &WatchedFileRegistrations,
777    method: &str,
778    params: Option<&Value>,
779) {
780    match method {
781        "client/registerCapability" => {
782            let Some(items) = params
783                .and_then(|params| params.get("registrations"))
784                .and_then(|registrations| registrations.as_array())
785            else {
786                return;
787            };
788            if let Ok(mut guard) = registrations.lock() {
789                for item in items {
790                    if item.get("method").and_then(Value::as_str)
791                        == Some("workspace/didChangeWatchedFiles")
792                    {
793                        if let Some(id) = item.get("id").and_then(Value::as_str) {
794                            guard.insert(id.to_string());
795                        }
796                    }
797                }
798            }
799        }
800        "client/unregisterCapability" => {
801            let Some(items) = params
802                .and_then(|params| params.get("unregisterations"))
803                .and_then(|registrations| registrations.as_array())
804            else {
805                return;
806            };
807            if let Ok(mut guard) = registrations.lock() {
808                for item in items {
809                    if item.get("method").and_then(Value::as_str)
810                        == Some("workspace/didChangeWatchedFiles")
811                    {
812                        if let Some(id) = item.get("id").and_then(Value::as_str) {
813                            guard.remove(id);
814                        }
815                    }
816                }
817            }
818        }
819        _ => {}
820    }
821}
822
823/// Parse `ServerDiagnosticCapabilities` from a re-serialized
824/// `ServerCapabilities` JSON value.
825///
826/// LSP 3.17 spec for `diagnosticProvider`:
827/// - `capabilities.diagnosticProvider` may be absent (no pull support),
828///   `DiagnosticOptions`, or `DiagnosticRegistrationOptions`.
829/// - If present:
830///   - `interFileDependencies: bool` (we don't currently use this)
831///   - `workspaceDiagnostics: bool` → workspace pull support
832///   - `identifier?: string` → optional identifier scoping result IDs
833///
834/// We parse the raw JSON Value defensively: presence of any
835/// `diagnosticProvider` value (object or `true`) means the server supports
836/// at least `textDocument/diagnostic` pull.
837fn parse_diagnostic_capabilities(value: &Value) -> ServerDiagnosticCapabilities {
838    let mut caps = ServerDiagnosticCapabilities::default();
839
840    if let Some(provider) = value.get("diagnosticProvider") {
841        // diagnosticProvider can be `true` (rare) or an object. Treat both as
842        // pull_diagnostics support.
843        if provider.is_object() || provider.as_bool() == Some(true) {
844            caps.pull_diagnostics = true;
845        }
846
847        if let Some(obj) = provider.as_object() {
848            if obj
849                .get("workspaceDiagnostics")
850                .and_then(|v| v.as_bool())
851                .unwrap_or(false)
852            {
853                caps.workspace_diagnostics = true;
854            }
855            if let Some(identifier) = obj.get("identifier").and_then(|v| v.as_str()) {
856                caps.identifier = Some(identifier.to_string());
857            }
858        }
859    }
860
861    // Workspace diagnostic refresh (rare — most servers don't request this,
862    // and we declared refreshSupport=false in our client capabilities anyway).
863    if let Some(refresh) = value
864        .get("workspace")
865        .and_then(|w| w.get("diagnostic"))
866        .and_then(|d| d.get("refreshSupport"))
867        .and_then(|r| r.as_bool())
868    {
869        caps.refresh_support = refresh;
870    }
871
872    caps
873}
874
875#[cfg(test)]
876mod tests {
877    use super::*;
878
879    #[test]
880    fn parse_caps_no_diagnostic_provider() {
881        let value = json!({});
882        let caps = parse_diagnostic_capabilities(&value);
883        assert!(!caps.pull_diagnostics);
884        assert!(!caps.workspace_diagnostics);
885        assert!(caps.identifier.is_none());
886    }
887
888    #[test]
889    fn parse_caps_basic_pull_only() {
890        let value = json!({
891            "diagnosticProvider": {
892                "interFileDependencies": false,
893                "workspaceDiagnostics": false
894            }
895        });
896        let caps = parse_diagnostic_capabilities(&value);
897        assert!(caps.pull_diagnostics);
898        assert!(!caps.workspace_diagnostics);
899    }
900
901    #[test]
902    fn parse_caps_full_pull_with_workspace() {
903        let value = json!({
904            "diagnosticProvider": {
905                "interFileDependencies": true,
906                "workspaceDiagnostics": true,
907                "identifier": "rust-analyzer"
908            }
909        });
910        let caps = parse_diagnostic_capabilities(&value);
911        assert!(caps.pull_diagnostics);
912        assert!(caps.workspace_diagnostics);
913        assert_eq!(caps.identifier.as_deref(), Some("rust-analyzer"));
914    }
915
916    #[test]
917    fn parse_caps_provider_as_bare_true() {
918        // LSP 3.17 allows DiagnosticOptions OR boolean — treat true as pull_diagnostics
919        let value = json!({
920            "diagnosticProvider": true
921        });
922        let caps = parse_diagnostic_capabilities(&value);
923        assert!(caps.pull_diagnostics);
924        assert!(!caps.workspace_diagnostics);
925    }
926
927    #[test]
928    fn parse_caps_workspace_refresh_support() {
929        let value = json!({
930            "workspace": {
931                "diagnostic": {
932                    "refreshSupport": true
933                }
934            }
935        });
936        let caps = parse_diagnostic_capabilities(&value);
937        assert!(caps.refresh_support);
938        // No diagnosticProvider → pull still false
939        assert!(!caps.pull_diagnostics);
940    }
941}