// aft/lsp/client.rs

1use std::collections::{HashMap, HashSet, VecDeque};
2use std::io::{self, BufRead, BufReader, BufWriter};
3use std::path::{Path, PathBuf};
4use std::process::{Child, Command, Stdio};
5use std::str::FromStr;
6use std::sync::atomic::{AtomicI64, Ordering};
7use std::sync::{Arc, Mutex};
8use std::thread;
9use std::time::{Duration, Instant};
10
11use crossbeam_channel::{bounded, RecvTimeoutError, Sender};
12use serde::de::DeserializeOwned;
13use serde_json::{json, Value};
14
15use crate::lsp::child_registry::LspChildRegistry;
16use crate::lsp::jsonrpc::{
17    Notification, Request, RequestId, Response as JsonRpcResponse, ServerMessage,
18};
19use crate::lsp::position::path_to_uri;
20use crate::lsp::registry::ServerKind;
21use crate::lsp::{transport, LspError};
22
/// Default timeout for interactive LSP requests (hover, goto-def, references, rename).
const INTERACTIVE_REQUEST_TIMEOUT: Duration = Duration::from_secs(8);
/// Longer budget for one-shot handshake requests (initialize, shutdown).
const HANDSHAKE_REQUEST_TIMEOUT: Duration = Duration::from_secs(30);
/// How long `shutdown()` waits for the child to exit after the `exit`
/// notification before force-killing its process group.
const SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(5);
/// Sleep between `try_wait` polls while `shutdown()` waits for the child to exit.
const EXIT_POLL_INTERVAL: Duration = Duration::from_millis(25);
/// Maximum number of stderr lines retained in the in-memory tail.
const STDERR_TAIL_LINES: usize = 64;
/// Maximum number of bytes kept per retained stderr line; longer lines keep
/// only their end.
const STDERR_LINE_BYTES: usize = 4 * 1024;

/// In-flight requests: maps each request ID to the channel on which its
/// response is delivered.
type PendingMap = HashMap<RequestId, Sender<JsonRpcResponse>>;
/// Shared set of registration IDs for dynamic
/// `workspace/didChangeWatchedFiles` registrations.
type WatchedFileRegistrations = Arc<Mutex<HashSet<String>>>;
34
/// Returns `true` when `path` has a `.cmd` or `.bat` extension (compared
/// ASCII case-insensitively). Paths without an extension, or whose extension
/// is not valid UTF-8, are not batch files.
#[cfg(windows)]
fn is_windows_batch_file(path: &Path) -> bool {
    let Some(extension) = path.extension().and_then(|raw| raw.to_str()) else {
        return false;
    };
    ["cmd", "bat"]
        .iter()
        .any(|candidate| extension.eq_ignore_ascii_case(candidate))
}
41
/// Lifecycle state of a language server.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ServerState {
    /// Process has been spawned; `initialize()` has not been called yet.
    Starting,
    /// `initialize()` is in progress (request sent, handshake not finished).
    Initializing,
    /// Handshake completed; the `initialized` notification has been sent.
    Ready,
    /// `shutdown()` has started; new requests and notifications are rejected.
    ShuttingDown,
    /// The child process was observed to have exited, or was force-killed.
    Exited,
}
51
/// Events sent from background reader threads into the main loop.
///
/// Every variant carries the `server_kind` and `root` of the client whose
/// reader thread produced it, so the receiver can tell clients apart.
#[derive(Debug)]
pub enum LspEvent {
    /// Server sent a notification (e.g. publishDiagnostics).
    Notification {
        server_kind: ServerKind,
        root: PathBuf,
        /// JSON-RPC method name of the notification.
        method: String,
        /// Raw notification params, if the server sent any.
        params: Option<Value>,
    },
    /// Server sent a request (e.g. workspace/configuration).
    ///
    /// The reader thread has already attempted to write a default response
    /// before forwarding this event, so receivers are informed only.
    ServerRequest {
        server_kind: ServerKind,
        root: PathBuf,
        /// ID of the server's request.
        id: RequestId,
        /// JSON-RPC method name of the request.
        method: String,
        /// Raw request params, if the server sent any.
        params: Option<Value>,
    },
    /// Server process exited or the transport stream closed.
    ///
    /// Also sent when the reader thread stops because the pending-response
    /// map lock was poisoned.
    ServerExited {
        server_kind: ServerKind,
        root: PathBuf,
    },
}
76
/// What this server told us it can do during the LSP `initialize` handshake.
///
/// We capture this once and use it to route diagnostic requests:
/// - `pull_diagnostics` → use `textDocument/diagnostic` instead of waiting for push
/// - `workspace_diagnostics` → use `workspace/diagnostic` for directory mode
///
/// Defaults are conservative: `false` means "fall back to push semantics".
/// `LspClient::initialize` fills this in from the `capabilities` object of the
/// initialize response.
#[derive(Debug, Clone, Default)]
pub struct ServerDiagnosticCapabilities {
    /// Server supports `textDocument/diagnostic` (LSP 3.17 per-file pull).
    pub pull_diagnostics: bool,
    /// Server supports `workspace/diagnostic` (LSP 3.17 workspace-wide pull).
    pub workspace_diagnostics: bool,
    /// `identifier` field from server's diagnosticProvider, if any.
    /// Used to scope previousResultId tracking when multiple servers share a file.
    pub identifier: Option<String>,
    /// Whether the server requested workspace diagnostic refresh notifications.
    /// We declare `refreshSupport: false` in our client capabilities so this
    /// should always be false in practice — kept for completeness.
    pub refresh_support: bool,
}
98
/// A client connected to one language server process.
///
/// Created by `LspClient::spawn`, which also starts a reader thread for the
/// child's stdout and a drain thread for its stderr. Dropping the client
/// untracks the child PID and force-kills the child's process group.
pub struct LspClient {
    /// Which language server this client talks to.
    kind: ServerKind,
    /// Root path given to `spawn`; also the child's working directory.
    root: PathBuf,
    /// Current lifecycle state; gates whether messages may be sent.
    state: ServerState,
    /// Handle to the spawned language server process.
    child: Child,
    /// Child PID captured at spawn time. Used by Drop to untrack the
    /// PID from the shared registry; we capture once rather than reading
    /// `child.id()` later because Drop ordering with the Child can race.
    child_pid: u32,
    /// Buffered writer over the child's stdin. Shared with the reader thread,
    /// which uses it to auto-respond to server-initiated requests.
    writer: Arc<Mutex<BufWriter<std::process::ChildStdin>>>,

    /// Pending request responses, keyed by request ID.
    pending: Arc<Mutex<PendingMap>>,
    /// Next request ID counter.
    next_id: AtomicI64,
    /// Diagnostic capabilities reported by the server in its initialize response.
    /// `None` until `initialize()` succeeds; conservative defaults thereafter
    /// when the server doesn't advertise diagnosticProvider.
    diagnostic_caps: Option<ServerDiagnosticCapabilities>,
    /// Whether the server advertised static `workspace.didChangeWatchedFiles`
    /// support during `initialize`. Dynamic registration is tracked separately
    /// in `watched_file_registrations`; either path permits notifications.
    /// Intentional default: `false` (conservative — requires server opt-in).
    supports_watched_files: bool,
    /// Dynamic `workspace/didChangeWatchedFiles` registrations requested by
    /// the server via `client/registerCapability`. Per LSP, the client must
    /// not send watched-file notifications merely because a server mentions
    /// dynamic registration during initialize; a real registration is required.
    watched_file_registrations: WatchedFileRegistrations,
    /// Shared registry that tracks live LSP child PIDs across the process
    /// so the signal handler can SIGKILL them on SIGTERM/SIGINT before
    /// aft exits. Cloned via `Arc` — multiple clients share the same set.
    child_registry: LspChildRegistry,
    /// Most recent stderr lines from the child (bounded), filled by the
    /// stderr drain thread and exposed through `stderr_tail()`.
    stderr_tail: Arc<Mutex<VecDeque<String>>>,
}
135
136impl LspClient {
137    /// Spawn a new language server process and start the background reader thread.
138    ///
139    /// `child_registry` is a shared handle that records this child's PID so
140    /// the signal handler can SIGKILL it on SIGTERM/SIGINT. Tests that don't
141    /// care about signal cleanup can pass `LspChildRegistry::new()`.
142    pub fn spawn(
143        kind: ServerKind,
144        root: PathBuf,
145        binary: &Path,
146        args: &[String],
147        env: &HashMap<String, String>,
148        event_tx: Sender<LspEvent>,
149        child_registry: LspChildRegistry,
150    ) -> io::Result<Self> {
151        #[cfg(windows)]
152        let mut command = if is_windows_batch_file(binary) {
153            let mut command = Command::new("cmd.exe");
154            command.arg("/C").arg(binary.as_os_str());
155            command
156        } else {
157            Command::new(binary)
158        };
159        #[cfg(not(windows))]
160        let mut command = Command::new(binary);
161        command
162            .args(args)
163            .current_dir(&root)
164            .stdin(Stdio::piped())
165            .stdout(Stdio::piped())
166            // Drain stderr on a background thread so failed shims/crashes have
167            // actionable diagnostics without risking pipe-buffer deadlock.
168            .stderr(Stdio::piped());
169        for (key, value) in env {
170            command.env(key, value);
171        }
172
173        // Put each LSP child in its own process group so we can SIGKILL the
174        // whole group on shutdown. Critical for npm-wrapped servers like
175        // biome (`node biome lsp-proxy` spawns `cli-darwin-arm64 biome
176        // lsp-proxy` as a child); killing just the wrapper PID leaves the
177        // real server orphaned to PID 1.
178        #[cfg(unix)]
179        unsafe {
180            use std::os::unix::process::CommandExt;
181            command.pre_exec(|| {
182                #[cfg(target_os = "linux")]
183                {
184                    // If aft is killed with SIGKILL, Rust cleanup and our
185                    // signal-handler thread never run. Ask the kernel to kill
186                    // the LSP process group as soon as the parent dies. This is
187                    // best-effort Linux coverage for the otherwise unhandleable
188                    // parent-death path.
189                    if libc::prctl(libc::PR_SET_PDEATHSIG, libc::SIGKILL) == -1 {
190                        return Err(io::Error::last_os_error());
191                    }
192                    if libc::getppid() == 1 {
193                        return Err(io::Error::other("parent died before LSP spawn completed"));
194                    }
195                }
196                if libc::setsid() == -1 {
197                    return Err(io::Error::last_os_error());
198                }
199                Ok(())
200            });
201        }
202
203        let mut child = child_registry.spawn_tracked(&mut command)?;
204        let child_pid = child.id();
205
206        let stdout = child
207            .stdout
208            .take()
209            .ok_or_else(|| io::Error::other("language server missing stdout pipe"))?;
210        let stdin = child
211            .stdin
212            .take()
213            .ok_or_else(|| io::Error::other("language server missing stdin pipe"))?;
214        let stderr = child
215            .stderr
216            .take()
217            .ok_or_else(|| io::Error::other("language server missing stderr pipe"))?;
218        let stderr_tail = Arc::new(Mutex::new(VecDeque::with_capacity(STDERR_TAIL_LINES)));
219        spawn_stderr_drain_thread(stderr, Arc::clone(&stderr_tail));
220
221        let writer = Arc::new(Mutex::new(BufWriter::new(stdin)));
222        let pending = Arc::new(Mutex::new(PendingMap::new()));
223        let watched_file_registrations = Arc::new(Mutex::new(HashSet::new()));
224        let reader_pending = Arc::clone(&pending);
225        let reader_writer = Arc::clone(&writer);
226        let reader_watched_file_registrations = Arc::clone(&watched_file_registrations);
227        let reader_kind = kind.clone();
228        let reader_root = root.clone();
229
230        thread::spawn(move || {
231            let mut reader = BufReader::new(stdout);
232            loop {
233                match transport::read_message(&mut reader) {
234                    Ok(Some(ServerMessage::Response(response))) => {
235                        if let Ok(mut guard) = reader_pending.lock() {
236                            if let Some(tx) = guard.remove(&response.id) {
237                                if tx.send(response).is_err() {
238                                    log::debug!("response channel closed");
239                                }
240                            }
241                        } else {
242                            let _ = event_tx.send(LspEvent::ServerExited {
243                                server_kind: reader_kind.clone(),
244                                root: reader_root.clone(),
245                            });
246                            break;
247                        }
248                    }
249                    Ok(Some(ServerMessage::Notification { method, params })) => {
250                        let _ = event_tx.send(LspEvent::Notification {
251                            server_kind: reader_kind.clone(),
252                            root: reader_root.clone(),
253                            method,
254                            params,
255                        });
256                    }
257                    Ok(Some(ServerMessage::Request { id, method, params })) => {
258                        record_watched_file_registration(
259                            &reader_watched_file_registrations,
260                            &method,
261                            params.as_ref(),
262                        );
263                        // Auto-respond to server requests to prevent deadlocks.
264                        // Server requests (like client/registerCapability,
265                        // window/workDoneProgress/create) block the server until
266                        // we respond. If we don't respond, the server won't send
267                        // responses to OUR pending requests → deadlock.
268                        //
269                        // Dispatch by method to return correct types:
270                        // - workspace/configuration expects Vec<Value> (one per item)
271                        // - Everything else gets null (safe default for registration/progress)
272                        let response_value = if method == "workspace/configuration" {
273                            // Return an array of null configs — one per requested item.
274                            // Servers fall back to filesystem config (tsconfig, pyrightconfig, etc.)
275                            let item_count = params
276                                .as_ref()
277                                .and_then(|p| p.get("items"))
278                                .and_then(|items| items.as_array())
279                                .map_or(1, |arr| arr.len());
280                            serde_json::Value::Array(vec![serde_json::Value::Null; item_count])
281                        } else {
282                            serde_json::Value::Null
283                        };
284                        if let Ok(mut w) = reader_writer.lock() {
285                            let response = super::jsonrpc::OutgoingResponse::success(
286                                id.clone(),
287                                response_value,
288                            );
289                            let _ = transport::write_response(&mut *w, &response);
290                        }
291                        // Also forward as event for any interested handlers
292                        let _ = event_tx.send(LspEvent::ServerRequest {
293                            server_kind: reader_kind.clone(),
294                            root: reader_root.clone(),
295                            id,
296                            method,
297                            params,
298                        });
299                    }
300                    Ok(None) | Err(_) => {
301                        if let Ok(mut guard) = reader_pending.lock() {
302                            guard.clear();
303                        }
304                        let _ = event_tx.send(LspEvent::ServerExited {
305                            server_kind: reader_kind.clone(),
306                            root: reader_root.clone(),
307                        });
308                        break;
309                    }
310                }
311            }
312        });
313
314        Ok(Self {
315            kind,
316            root,
317            state: ServerState::Starting,
318            child,
319            child_pid,
320            writer,
321            pending,
322            next_id: AtomicI64::new(1),
323            diagnostic_caps: None,
324            supports_watched_files: false,
325            watched_file_registrations,
326            child_registry,
327            stderr_tail,
328        })
329    }
330
331    /// Send the initialize request and wait for response. Transition to Ready.
332    pub fn initialize(
333        &mut self,
334        workspace_root: &Path,
335        initialization_options: Option<serde_json::Value>,
336    ) -> Result<lsp_types::InitializeResult, LspError> {
337        self.ensure_can_send()?;
338        self.state = ServerState::Initializing;
339
340        let root_url = path_to_uri(workspace_root)?;
341        let root_uri = lsp_types::Uri::from_str(root_url.as_str()).map_err(|_| {
342            LspError::NotFound(format!(
343                "failed to convert workspace root '{}' to file URI",
344                workspace_root.display()
345            ))
346        })?;
347
348        let mut params_value = json!({
349            "processId": std::process::id(),
350            "rootUri": root_uri,
351            "capabilities": {
352                "workspace": {
353                    "workspaceFolders": true,
354                    "configuration": true,
355                    "didChangeWatchedFiles": {
356                        "dynamicRegistration": true
357                    },
358                    // LSP 3.17 workspace diagnostic pull. We declare refreshSupport=false
359                    // because we drive diagnostics on-demand via pull/push and re-query
360                    // when the agent calls lsp_diagnostics again — we don't need the
361                    // server to proactively push refresh notifications.
362                    "diagnostic": {
363                        "refreshSupport": false
364                    }
365                },
366                "textDocument": {
367                    "synchronization": {
368                        "dynamicRegistration": false,
369                        "didSave": true,
370                        "willSave": false,
371                        "willSaveWaitUntil": false
372                    },
373                    "publishDiagnostics": {
374                        "relatedInformation": true,
375                        "versionSupport": true,
376                        "codeDescriptionSupport": true,
377                        "dataSupport": true
378                    },
379                    // LSP 3.17 textDocument diagnostic pull. dynamicRegistration=false
380                    // because we use static capability discovery from the InitializeResult.
381                    // relatedDocumentSupport=true to receive cascading diagnostics for
382                    // files that became known while analyzing the requested one.
383                    "diagnostic": {
384                        "dynamicRegistration": false,
385                        "relatedDocumentSupport": true
386                    }
387                }
388            },
389            "clientInfo": {
390                "name": "aft",
391                "version": env!("CARGO_PKG_VERSION")
392            },
393            "workspaceFolders": [
394                {
395                    "uri": root_uri,
396                    "name": workspace_root
397                        .file_name()
398                        .and_then(|name| name.to_str())
399                        .unwrap_or("workspace")
400                }
401            ]
402        });
403        if let Some(initialization_options) = initialization_options {
404            params_value["initializationOptions"] = initialization_options;
405        }
406
407        let params = serde_json::from_value::<lsp_types::InitializeParams>(params_value)?;
408
409        let result_value = self.send_request_value_with_timeout(
410            <lsp_types::request::Initialize as lsp_types::request::Request>::METHOD,
411            params,
412            HANDSHAKE_REQUEST_TIMEOUT,
413        )?;
414        let result: lsp_types::InitializeResult = serde_json::from_value(result_value.clone())?;
415
416        // Capture diagnostic capabilities from the initialize response. We parse
417        // from a re-serialized JSON Value because the lsp-types crate's
418        // diagnostic_provider strict variants reject some shapes real servers
419        // emit (e.g. bare `true`), and we want defensive Default fallback.
420        let caps_value = result_value
421            .get("capabilities")
422            .cloned()
423            .unwrap_or_else(|| serde_json::to_value(&result.capabilities).unwrap_or(Value::Null));
424        self.diagnostic_caps = Some(parse_diagnostic_capabilities(&caps_value));
425
426        // Capture initialize-time (static) workspace/didChangeWatchedFiles
427        // support. Runtime client/registerCapability subscriptions are recorded
428        // separately by the reader thread. Missing capability is unsupported by
429        // default; callers must not send notifications unless one of those two
430        // server opt-in paths is present.
431        self.supports_watched_files = caps_value
432            .pointer("/workspace/didChangeWatchedFiles/dynamicRegistration")
433            .and_then(|v| v.as_bool())
434            .unwrap_or(false)
435            || caps_value
436                .pointer("/workspace/didChangeWatchedFiles")
437                .map(|v| v.is_object() || v.as_bool() == Some(true))
438                .unwrap_or(false);
439
440        self.send_notification::<lsp_types::notification::Initialized>(serde_json::from_value(
441            json!({}),
442        )?)?;
443        self.state = ServerState::Ready;
444        Ok(result)
445    }
446
    /// Diagnostic capabilities advertised by the server. Returns `None` until
    /// `initialize()` has succeeded; returns `Some` with conservative defaults
    /// (all `false`) when the server didn't advertise diagnosticProvider.
    ///
    /// The returned reference borrows from this client; clone it if it must
    /// outlive the borrow.
    pub fn diagnostic_capabilities(&self) -> Option<&ServerDiagnosticCapabilities> {
        self.diagnostic_caps.as_ref()
    }
453
    /// Whether the server advertised initialize-time
    /// `workspace/didChangeWatchedFiles` support. Dynamic registrations are
    /// reported by `has_watched_file_registration()`.
    ///
    /// Always `false` before `initialize()` has run.
    pub fn supports_watched_files(&self) -> bool {
        self.supports_watched_files
    }
460
461    /// Whether this server currently has an active dynamic watched-file
462    /// registration. This, not the initialize-time capability shape, controls
463    /// whether `workspace/didChangeWatchedFiles` may be sent.
464    pub fn has_watched_file_registration(&self) -> bool {
465        self.watched_file_registrations
466            .lock()
467            .map(|registrations| !registrations.is_empty())
468            .unwrap_or(false)
469    }
470
471    /// Send a request and wait for the response.
472    pub fn send_request<R>(&mut self, params: R::Params) -> Result<R::Result, LspError>
473    where
474        R: lsp_types::request::Request,
475        R::Params: serde::Serialize,
476        R::Result: DeserializeOwned,
477    {
478        self.ensure_can_send()?;
479
480        let value = self.send_request_value(R::METHOD, params)?;
481        serde_json::from_value(value).map_err(Into::into)
482    }
483
484    /// Send a request and wait up to `timeout` for the response. If the local
485    /// deadline expires, remove the pending response handler and notify the
486    /// server with `$/cancelRequest` so it can stop work.
487    pub fn send_request_with_timeout<R>(
488        &mut self,
489        params: R::Params,
490        timeout: Duration,
491    ) -> Result<R::Result, LspError>
492    where
493        R: lsp_types::request::Request,
494        R::Params: serde::Serialize,
495        R::Result: DeserializeOwned,
496    {
497        self.ensure_can_send()?;
498
499        let value = self.send_request_value_with_timeout(R::METHOD, params, timeout)?;
500        serde_json::from_value(value).map_err(Into::into)
501    }
502
    /// Send `method` with `params` and wait for the raw JSON result, using
    /// `INTERACTIVE_REQUEST_TIMEOUT` as the deadline.
    fn send_request_value<P>(&mut self, method: &'static str, params: P) -> Result<Value, LspError>
    where
        P: serde::Serialize,
    {
        self.send_request_value_with_timeout(method, params, INTERACTIVE_REQUEST_TIMEOUT)
    }
509
510    fn send_request_value_with_timeout<P>(
511        &mut self,
512        method: &'static str,
513        params: P,
514        timeout: Duration,
515    ) -> Result<Value, LspError>
516    where
517        P: serde::Serialize,
518    {
519        self.ensure_can_send()?;
520
521        let id = RequestId::Int(self.next_id.fetch_add(1, Ordering::Relaxed));
522        let (tx, rx) = bounded(1);
523        {
524            let mut pending = self.lock_pending()?;
525            pending.insert(id.clone(), tx);
526        }
527
528        let request = Request::new(id.clone(), method, Some(serde_json::to_value(params)?));
529        {
530            let mut writer = self
531                .writer
532                .lock()
533                .map_err(|_| LspError::ServerNotReady("writer lock poisoned".to_string()))?;
534            if let Err(err) = transport::write_request(&mut *writer, &request) {
535                self.remove_pending(&id);
536                return Err(err.into());
537            }
538        }
539
540        let response = match rx.recv_timeout(timeout) {
541            Ok(response) => response,
542            Err(RecvTimeoutError::Timeout) => {
543                self.remove_pending(&id);
544                self.send_cancel_request(&id)?;
545                return Err(LspError::Timeout(format!(
546                    "timed out waiting for '{}' response from {:?}",
547                    method, self.kind
548                )));
549            }
550            Err(RecvTimeoutError::Disconnected) => {
551                self.remove_pending(&id);
552                return Err(LspError::ServerNotReady(format!(
553                    "language server {:?} disconnected while waiting for '{}'",
554                    self.kind, method
555                )));
556            }
557        };
558
559        if let Some(error) = response.error {
560            return Err(LspError::ServerError {
561                code: error.code,
562                message: error.message,
563            });
564        }
565
566        Ok(response.result.unwrap_or(Value::Null))
567    }
568
569    /// Send a notification (fire-and-forget).
570    pub fn send_notification<N>(&mut self, params: N::Params) -> Result<(), LspError>
571    where
572        N: lsp_types::notification::Notification,
573        N::Params: serde::Serialize,
574    {
575        self.ensure_can_send()?;
576        let notification = Notification::new(N::METHOD, Some(serde_json::to_value(params)?));
577        let mut writer = self
578            .writer
579            .lock()
580            .map_err(|_| LspError::ServerNotReady("writer lock poisoned".to_string()))?;
581        transport::write_notification(&mut *writer, &notification)?;
582        Ok(())
583    }
584
585    /// Graceful shutdown: send shutdown request, then exit notification.
586    pub fn shutdown(&mut self) -> Result<(), LspError> {
587        if self.state == ServerState::Exited {
588            self.child_registry.untrack(self.child_pid);
589            return Ok(());
590        }
591
592        if self.child.try_wait()?.is_some() {
593            self.state = ServerState::Exited;
594            self.child_registry.untrack(self.child_pid);
595            return Ok(());
596        }
597
598        if let Err(err) = self.send_request_with_timeout::<lsp_types::request::Shutdown>(
599            (),
600            HANDSHAKE_REQUEST_TIMEOUT,
601        ) {
602            self.state = ServerState::ShuttingDown;
603            if self.child.try_wait()?.is_some() {
604                self.state = ServerState::Exited;
605                return Ok(());
606            }
607            return Err(err);
608        }
609
610        self.state = ServerState::ShuttingDown;
611
612        if let Err(err) = self.send_notification::<lsp_types::notification::Exit>(()) {
613            if self.child.try_wait()?.is_some() {
614                self.state = ServerState::Exited;
615                return Ok(());
616            }
617            return Err(err);
618        }
619
620        let deadline = Instant::now() + SHUTDOWN_TIMEOUT;
621        loop {
622            if self.child.try_wait()?.is_some() {
623                self.state = ServerState::Exited;
624                return Ok(());
625            }
626            if Instant::now() >= deadline {
627                // Kill the entire process group, not just the wrapper PID, so
628                // npm-wrapped servers (biome's `node biome lsp-proxy` spawns
629                // a separate cli-darwin-arm64 child) don't leak orphans.
630                kill_lsp_child_group(&mut self.child);
631                self.state = ServerState::Exited;
632                return Err(LspError::Timeout(format!(
633                    "timed out waiting for {:?} to exit",
634                    self.kind
635                )));
636            }
637            thread::sleep(EXIT_POLL_INTERVAL);
638        }
639    }
640
641    pub fn stderr_tail(&self) -> String {
642        self.stderr_tail
643            .lock()
644            .map(|tail| stderr_tail_to_string(&tail))
645            .unwrap_or_default()
646    }
647
648    pub fn child_exited(&mut self) -> bool {
649        self.child.try_wait().ok().flatten().is_some()
650    }
651
652    pub fn child_exit_status(&mut self) -> Option<std::process::ExitStatus> {
653        self.child.try_wait().ok().flatten()
654    }
655
    /// Current lifecycle state of this client.
    pub fn state(&self) -> ServerState {
        self.state
    }
659
    /// The kind of language server this client is connected to (returned as
    /// a clone).
    pub fn kind(&self) -> ServerKind {
        self.kind.clone()
    }
663
    /// The root path this client was spawned with.
    pub fn root(&self) -> &Path {
        &self.root
    }
667
668    fn ensure_can_send(&self) -> Result<(), LspError> {
669        if matches!(self.state, ServerState::ShuttingDown | ServerState::Exited) {
670            return Err(LspError::ServerNotReady(format!(
671                "language server {:?} is not ready (state: {:?})",
672                self.kind, self.state
673            )));
674        }
675        Ok(())
676    }
677
678    fn lock_pending(&self) -> Result<std::sync::MutexGuard<'_, PendingMap>, LspError> {
679        self.pending
680            .lock()
681            .map_err(|_| io::Error::other("pending response map poisoned").into())
682    }
683
684    fn remove_pending(&self, id: &RequestId) {
685        if let Ok(mut pending) = self.pending.lock() {
686            pending.remove(id);
687        }
688    }
689
690    fn send_cancel_request(&mut self, id: &RequestId) -> Result<(), LspError> {
691        let notification = Notification::new("$/cancelRequest", Some(json!({ "id": id })));
692        let mut writer = self
693            .writer
694            .lock()
695            .map_err(|_| LspError::ServerNotReady("writer lock poisoned".to_string()))?;
696        transport::write_notification(&mut *writer, &notification)?;
697        Ok(())
698    }
699}
700
/// Dropping a client force-terminates its language server: the PID is removed
/// from the shared registry and the child's process group is killed.
impl Drop for LspClient {
    fn drop(&mut self) {
        // Untrack first so the signal handler can't race with this kill and
        // try to SIGKILL a PID that's already been reaped.
        self.child_registry.untrack(self.child_pid);
        kill_lsp_child_group(&mut self.child);
    }
}
709
710fn spawn_stderr_drain_thread(
711    stderr: std::process::ChildStderr,
712    stderr_tail: Arc<Mutex<VecDeque<String>>>,
713) {
714    thread::spawn(move || {
715        let mut reader = BufReader::new(stderr);
716        let mut line = String::new();
717
718        loop {
719            line.clear();
720            match reader.read_line(&mut line) {
721                Ok(0) => break,
722                Ok(_) => {
723                    if let Ok(mut tail) = stderr_tail.lock() {
724                        append_stderr_tail(&mut tail, &line);
725                    } else {
726                        break;
727                    }
728                }
729                Err(_) => break,
730            }
731        }
732    });
733}
734
735fn append_stderr_tail(tail: &mut VecDeque<String>, line: &str) {
736    if tail.len() == STDERR_TAIL_LINES {
737        tail.pop_front();
738    }
739    tail.push_back(trim_stderr_line(line));
740}
741
742fn trim_stderr_line(line: &str) -> String {
743    let line = line.trim_end_matches(|ch| ch == '\r' || ch == '\n');
744    if line.len() <= STDERR_LINE_BYTES {
745        return line.to_string();
746    }
747
748    let mut start = line.len() - STDERR_LINE_BYTES;
749    while start < line.len() && !line.is_char_boundary(start) {
750        start += 1;
751    }
752    format!("...{}", &line[start..])
753}
754
/// Join the retained stderr lines, oldest first, separated by `\n` (no
/// trailing newline).
fn stderr_tail_to_string(tail: &VecDeque<String>) -> String {
    let mut joined = String::new();
    for (index, line) in tail.iter().enumerate() {
        if index > 0 {
            joined.push('\n');
        }
        joined.push_str(line);
    }
    joined
}
761
762/// Force-terminate an LSP child and its entire process group on Unix.
763/// On Windows, `taskkill /F /T` kills the process tree.
764///
765/// Necessary because some LSP servers ship as npm-installed Node shims that
766/// spawn the real binary as a child. Killing only the wrapper PID leaves the
767/// real server orphaned to PID 1 and accumulates over time.
768fn kill_lsp_child_group(child: &mut std::process::Child) {
769    #[cfg(unix)]
770    {
771        let pgid = child.id() as i32;
772        crate::bash_background::process::terminate_pgid(pgid, Some(child));
773        let _ = child.wait();
774    }
775    #[cfg(not(unix))]
776    {
777        crate::bash_background::process::terminate_process(child);
778        let _ = child.wait();
779    }
780}
781
782fn record_watched_file_registration(
783    registrations: &WatchedFileRegistrations,
784    method: &str,
785    params: Option<&Value>,
786) {
787    match method {
788        "client/registerCapability" => {
789            let Some(items) = params
790                .and_then(|params| params.get("registrations"))
791                .and_then(|registrations| registrations.as_array())
792            else {
793                return;
794            };
795            if let Ok(mut guard) = registrations.lock() {
796                for item in items {
797                    if item.get("method").and_then(Value::as_str)
798                        == Some("workspace/didChangeWatchedFiles")
799                    {
800                        if let Some(id) = item.get("id").and_then(Value::as_str) {
801                            guard.insert(id.to_string());
802                        }
803                    }
804                }
805            }
806        }
807        "client/unregisterCapability" => {
808            let Some(items) = params
809                .and_then(|params| params.get("unregisterations"))
810                .and_then(|registrations| registrations.as_array())
811            else {
812                return;
813            };
814            if let Ok(mut guard) = registrations.lock() {
815                for item in items {
816                    if item.get("method").and_then(Value::as_str)
817                        == Some("workspace/didChangeWatchedFiles")
818                    {
819                        if let Some(id) = item.get("id").and_then(Value::as_str) {
820                            guard.remove(id);
821                        }
822                    }
823                }
824            }
825        }
826        _ => {}
827    }
828}
829
830/// Parse `ServerDiagnosticCapabilities` from a re-serialized
831/// `ServerCapabilities` JSON value.
832///
833/// LSP 3.17 spec for `diagnosticProvider`:
834/// - `capabilities.diagnosticProvider` may be absent (no pull support),
835///   `DiagnosticOptions`, or `DiagnosticRegistrationOptions`.
836/// - If present:
837///   - `interFileDependencies: bool` (we don't currently use this)
838///   - `workspaceDiagnostics: bool` → workspace pull support
839///   - `identifier?: string` → optional identifier scoping result IDs
840///
841/// We parse the raw JSON Value defensively: presence of any
842/// `diagnosticProvider` value (object or `true`) means the server supports
843/// at least `textDocument/diagnostic` pull.
844fn parse_diagnostic_capabilities(value: &Value) -> ServerDiagnosticCapabilities {
845    let mut caps = ServerDiagnosticCapabilities::default();
846
847    if let Some(provider) = value.get("diagnosticProvider") {
848        // diagnosticProvider can be `true` (rare) or an object. Treat both as
849        // pull_diagnostics support.
850        if provider.is_object() || provider.as_bool() == Some(true) {
851            caps.pull_diagnostics = true;
852        }
853
854        if let Some(obj) = provider.as_object() {
855            if obj
856                .get("workspaceDiagnostics")
857                .and_then(|v| v.as_bool())
858                .unwrap_or(false)
859            {
860                caps.workspace_diagnostics = true;
861            }
862            if let Some(identifier) = obj.get("identifier").and_then(|v| v.as_str()) {
863                caps.identifier = Some(identifier.to_string());
864            }
865        }
866    }
867
868    // Workspace diagnostic refresh (rare — most servers don't request this,
869    // and we declared refreshSupport=false in our client capabilities anyway).
870    if let Some(refresh) = value
871        .get("workspace")
872        .and_then(|w| w.get("diagnostic"))
873        .and_then(|d| d.get("refreshSupport"))
874        .and_then(|r| r.as_bool())
875    {
876        caps.refresh_support = refresh;
877    }
878
879    caps
880}
881
#[cfg(test)]
mod tests {
    use super::*;

    /// An empty capabilities object advertises no diagnostic support at all.
    #[test]
    fn parse_caps_no_diagnostic_provider() {
        let parsed = parse_diagnostic_capabilities(&json!({}));

        assert!(!parsed.pull_diagnostics);
        assert!(!parsed.workspace_diagnostics);
        assert!(parsed.identifier.is_none());
    }

    /// An options object with `workspaceDiagnostics: false` enables pull only.
    #[test]
    fn parse_caps_basic_pull_only() {
        let parsed = parse_diagnostic_capabilities(&json!({
            "diagnosticProvider": {
                "interFileDependencies": false,
                "workspaceDiagnostics": false
            }
        }));

        assert!(parsed.pull_diagnostics);
        assert!(!parsed.workspace_diagnostics);
    }

    /// Full options: pull, workspace pull, and the identifier are all picked up.
    #[test]
    fn parse_caps_full_pull_with_workspace() {
        let parsed = parse_diagnostic_capabilities(&json!({
            "diagnosticProvider": {
                "interFileDependencies": true,
                "workspaceDiagnostics": true,
                "identifier": "rust-analyzer"
            }
        }));

        assert!(parsed.pull_diagnostics);
        assert!(parsed.workspace_diagnostics);
        assert_eq!(parsed.identifier.as_deref(), Some("rust-analyzer"));
    }

    /// A bare `true` is tolerated defensively by the parser and is treated as
    /// pull support without workspace pull.
    #[test]
    fn parse_caps_provider_as_bare_true() {
        let parsed = parse_diagnostic_capabilities(&json!({
            "diagnosticProvider": true
        }));

        assert!(parsed.pull_diagnostics);
        assert!(!parsed.workspace_diagnostics);
    }

    /// Pins the interactive request budget.
    #[test]
    fn interactive_request_timeout_is_eight_seconds() {
        let expected = Duration::from_secs(8);
        assert_eq!(INTERACTIVE_REQUEST_TIMEOUT, expected);
    }

    /// Pins the handshake request budget.
    #[test]
    fn handshake_request_timeout_remains_thirty_seconds() {
        let expected = Duration::from_secs(30);
        assert_eq!(HANDSHAKE_REQUEST_TIMEOUT, expected);
    }

    /// `workspace.diagnostic.refreshSupport` is read independently of
    /// `diagnosticProvider`.
    #[test]
    fn parse_caps_workspace_refresh_support() {
        let parsed = parse_diagnostic_capabilities(&json!({
            "workspace": {
                "diagnostic": {
                    "refreshSupport": true
                }
            }
        }));

        assert!(parsed.refresh_support);
        // No diagnosticProvider → pull still false
        assert!(!parsed.pull_diagnostics);
    }
}