// aft/lsp/client.rs

1use std::collections::HashMap;
2use std::io::{self, BufReader, BufWriter};
3use std::path::{Path, PathBuf};
4use std::process::{Child, Command, Stdio};
5use std::str::FromStr;
6use std::sync::atomic::{AtomicI64, Ordering};
7use std::sync::{Arc, Mutex};
8use std::thread;
9use std::time::{Duration, Instant};
10
11use crossbeam_channel::{bounded, RecvTimeoutError, Sender};
12use serde::de::DeserializeOwned;
13use serde_json::{json, Value};
14
15use crate::lsp::jsonrpc::{
16    Notification, Request, RequestId, Response as JsonRpcResponse, ServerMessage,
17};
18use crate::lsp::registry::ServerKind;
19use crate::lsp::{transport, LspError};
20
/// Maximum time `send_request` waits for a response before giving up.
const REQUEST_TIMEOUT: Duration = Duration::from_secs(30);
/// Maximum time `shutdown` waits for the server process to exit after `exit` is sent.
const SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(5);
/// Sleep between successive checks of the child's exit status during shutdown.
const EXIT_POLL_INTERVAL: Duration = Duration::from_millis(25);
24
25type PendingMap = HashMap<RequestId, Sender<JsonRpcResponse>>;
26
/// Lifecycle state of a language server as tracked by its client.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ServerState {
    /// The process has been spawned but `initialize` has not been called yet.
    Starting,
    /// The `initialize` handshake is in progress.
    Initializing,
    /// The handshake completed; the server accepts requests and notifications.
    Ready,
    /// A shutdown has been started; new requests and notifications are rejected.
    ShuttingDown,
    /// The server process has been observed to have exited.
    Exited,
}
36
37/// Events sent from background reader threads into the main loop.
38#[derive(Debug)]
39pub enum LspEvent {
40    /// Server sent a notification (e.g. publishDiagnostics).
41    Notification {
42        server_kind: ServerKind,
43        root: PathBuf,
44        method: String,
45        params: Option<Value>,
46    },
47    /// Server sent a request (e.g. workspace/configuration).
48    ServerRequest {
49        server_kind: ServerKind,
50        root: PathBuf,
51        id: RequestId,
52        method: String,
53        params: Option<Value>,
54    },
55    /// Server process exited or the transport stream closed.
56    ServerExited {
57        server_kind: ServerKind,
58        root: PathBuf,
59    },
60}
61
62/// A client connected to one language server process.
63pub struct LspClient {
64    kind: ServerKind,
65    root: PathBuf,
66    state: ServerState,
67    child: Child,
68    writer: Arc<Mutex<BufWriter<std::process::ChildStdin>>>,
69
70    /// Pending request responses, keyed by request ID.
71    pending: Arc<Mutex<PendingMap>>,
72    /// Next request ID counter.
73    next_id: AtomicI64,
74}
75
76impl LspClient {
77    /// Spawn a new language server process and start the background reader thread.
78    pub fn spawn(
79        kind: ServerKind,
80        root: PathBuf,
81        binary: &Path,
82        args: &[&str],
83        event_tx: Sender<LspEvent>,
84    ) -> io::Result<Self> {
85        let mut child = Command::new(binary)
86            .args(args)
87            .current_dir(&root)
88            .stdin(Stdio::piped())
89            .stdout(Stdio::piped())
90            .stderr(Stdio::piped())
91            .spawn()?;
92
93        let stdout = child
94            .stdout
95            .take()
96            .ok_or_else(|| io::Error::other("language server missing stdout pipe"))?;
97        let stdin = child
98            .stdin
99            .take()
100            .ok_or_else(|| io::Error::other("language server missing stdin pipe"))?;
101
102        let writer = Arc::new(Mutex::new(BufWriter::new(stdin)));
103        let pending = Arc::new(Mutex::new(PendingMap::new()));
104        let reader_pending = Arc::clone(&pending);
105        let reader_writer = Arc::clone(&writer);
106        let reader_kind = kind;
107        let reader_root = root.clone();
108    
109
110        thread::spawn(move || {
111            let mut reader = BufReader::new(stdout);
112            loop {
113                match transport::read_message(&mut reader) {
114                    Ok(Some(ServerMessage::Response(response))) => {
115                        if let Ok(mut guard) = reader_pending.lock() {
116                            if let Some(tx) = guard.remove(&response.id) {
117                                let _ = tx.send(response);
118                            }
119                        } else {
120                            let _ = event_tx.send(LspEvent::ServerExited {
121                                server_kind: reader_kind,
122                                root: reader_root.clone(),
123                            });
124                            break;
125                        }
126                    }
127                    Ok(Some(ServerMessage::Notification { method, params })) => {
128                        let _ = event_tx.send(LspEvent::Notification {
129                            server_kind: reader_kind,
130                            root: reader_root.clone(),
131                            method,
132                            params,
133                        });
134                    }
135                    Ok(Some(ServerMessage::Request { id, method, params })) => {
136                        // Auto-respond to server requests to prevent deadlocks.
137                        // Server requests (like client/registerCapability,
138                        // window/workDoneProgress/create) block the server until
139                        // we respond. If we don't respond, the server won't send
140                        // responses to OUR pending requests → deadlock.
141                        if let Ok(mut w) = reader_writer.lock() {
142                            let response = super::jsonrpc::OutgoingResponse::success(
143                                id.clone(),
144                                serde_json::Value::Null,
145                            );
146                            let _ = transport::write_response(&mut *w, &response);
147                        }
148                        // Also forward as event for any interested handlers
149                        let _ = event_tx.send(LspEvent::ServerRequest {
150                            server_kind: reader_kind,
151                            root: reader_root.clone(),
152                            id,
153                            method,
154                            params,
155                        });
156                    }
157                    Ok(None) | Err(_) => {
158                        if let Ok(mut guard) = reader_pending.lock() {
159                            guard.clear();
160                        }
161                        let _ = event_tx.send(LspEvent::ServerExited {
162                            server_kind: reader_kind,
163                            root: reader_root.clone(),
164                        });
165                        break;
166                    }
167                }
168            }
169        });
170
171        Ok(Self {
172            kind,
173            root,
174            state: ServerState::Starting,
175            child,
176            writer,
177            pending,
178            next_id: AtomicI64::new(1),
179        })
180    }
181
182    /// Send the initialize request and wait for response. Transition to Ready.
183    pub fn initialize(
184        &mut self,
185        workspace_root: &Path,
186    ) -> Result<lsp_types::InitializeResult, LspError> {
187        self.ensure_can_send()?;
188        self.state = ServerState::Initializing;
189
190        let root_uri = lsp_types::Uri::from_str(&format!("file://{}", workspace_root.display()))
191            .map_err(|_| {
192                LspError::NotFound(format!(
193                    "failed to convert workspace root '{}' to file URI",
194                    workspace_root.display()
195                ))
196            })?;
197
198        let params = serde_json::from_value::<lsp_types::InitializeParams>(json!({
199            "processId": std::process::id(),
200            "rootUri": root_uri,
201            "capabilities": {
202                "workspace": {
203                    "workspaceFolders": true,
204                    "configuration": true
205                },
206                "textDocument": {
207                    "synchronization": {
208                        "dynamicRegistration": false,
209                        "didSave": true,
210                        "willSave": false,
211                        "willSaveWaitUntil": false
212                    },
213                    "publishDiagnostics": {
214                        "relatedInformation": true,
215                        "versionSupport": true,
216                        "codeDescriptionSupport": true,
217                        "dataSupport": true
218                    }
219                }
220            },
221            "clientInfo": {
222                "name": "aft",
223                "version": env!("CARGO_PKG_VERSION")
224            },
225            "workspaceFolders": [
226                {
227                    "uri": root_uri,
228                    "name": workspace_root
229                        .file_name()
230                        .and_then(|name| name.to_str())
231                        .unwrap_or("workspace")
232                }
233            ]
234        }))?;
235
236        let result = self.send_request::<lsp_types::request::Initialize>(params)?;
237        self.send_notification::<lsp_types::notification::Initialized>(serde_json::from_value(
238            json!({}),
239        )?)?;
240        self.state = ServerState::Ready;
241        Ok(result)
242    }
243
244    /// Send a request and wait for the response.
245    pub fn send_request<R>(&mut self, params: R::Params) -> Result<R::Result, LspError>
246    where
247        R: lsp_types::request::Request,
248        R::Params: serde::Serialize,
249        R::Result: DeserializeOwned,
250    {
251        self.ensure_can_send()?;
252
253        let id = RequestId::Int(self.next_id.fetch_add(1, Ordering::Relaxed));
254        let (tx, rx) = bounded(1);
255        {
256            let mut pending = self.lock_pending()?;
257            pending.insert(id.clone(), tx);
258        }
259
260        let request = Request::new(id.clone(), R::METHOD, Some(serde_json::to_value(params)?));
261        {
262            let mut writer = self
263                .writer
264                .lock()
265                .map_err(|_| LspError::ServerNotReady("writer lock poisoned".to_string()))?;
266            if let Err(err) = transport::write_request(&mut *writer, &request) {
267                self.remove_pending(&id);
268                return Err(err.into());
269            }
270        }
271
272        let response = match rx.recv_timeout(REQUEST_TIMEOUT) {
273            Ok(response) => response,
274            Err(RecvTimeoutError::Timeout) => {
275                self.remove_pending(&id);
276                return Err(LspError::Timeout(format!(
277                    "timed out waiting for '{}' response from {:?}",
278                    R::METHOD,
279                    self.kind
280                )));
281            }
282            Err(RecvTimeoutError::Disconnected) => {
283                self.remove_pending(&id);
284                return Err(LspError::ServerNotReady(format!(
285                    "language server {:?} disconnected while waiting for '{}'",
286                    self.kind,
287                    R::METHOD
288                )));
289            }
290        };
291
292        if let Some(error) = response.error {
293            return Err(LspError::ServerError {
294                code: error.code,
295                message: error.message,
296            });
297        }
298
299        serde_json::from_value(response.result.unwrap_or(Value::Null)).map_err(Into::into)
300    }
301
302    /// Send a notification (fire-and-forget).
303    pub fn send_notification<N>(&mut self, params: N::Params) -> Result<(), LspError>
304    where
305        N: lsp_types::notification::Notification,
306        N::Params: serde::Serialize,
307    {
308        self.ensure_can_send()?;
309        let notification = Notification::new(N::METHOD, Some(serde_json::to_value(params)?));
310        let mut writer = self
311            .writer
312            .lock()
313            .map_err(|_| LspError::ServerNotReady("writer lock poisoned".to_string()))?;
314        transport::write_notification(&mut *writer, &notification)?;
315        Ok(())
316    }
317
318    /// Graceful shutdown: send shutdown request, then exit notification.
319    pub fn shutdown(&mut self) -> Result<(), LspError> {
320        if self.state == ServerState::Exited {
321            return Ok(());
322        }
323
324        if self.child.try_wait()?.is_some() {
325            self.state = ServerState::Exited;
326            return Ok(());
327        }
328
329        if let Err(err) = self.send_request::<lsp_types::request::Shutdown>(()) {
330            self.state = ServerState::ShuttingDown;
331            if self.child.try_wait()?.is_some() {
332                self.state = ServerState::Exited;
333                return Ok(());
334            }
335            return Err(err);
336        }
337
338        self.state = ServerState::ShuttingDown;
339
340        if let Err(err) = self.send_notification::<lsp_types::notification::Exit>(()) {
341            if self.child.try_wait()?.is_some() {
342                self.state = ServerState::Exited;
343                return Ok(());
344            }
345            return Err(err);
346        }
347
348        let deadline = Instant::now() + SHUTDOWN_TIMEOUT;
349        loop {
350            if self.child.try_wait()?.is_some() {
351                self.state = ServerState::Exited;
352                return Ok(());
353            }
354            if Instant::now() >= deadline {
355                return Err(LspError::Timeout(format!(
356                    "timed out waiting for {:?} to exit",
357                    self.kind
358                )));
359            }
360            thread::sleep(EXIT_POLL_INTERVAL);
361        }
362    }
363
364    pub fn state(&self) -> ServerState {
365        self.state
366    }
367
368    pub fn kind(&self) -> ServerKind {
369        self.kind
370    }
371
372    pub fn root(&self) -> &Path {
373        &self.root
374    }
375
376    fn ensure_can_send(&self) -> Result<(), LspError> {
377        if matches!(self.state, ServerState::ShuttingDown | ServerState::Exited) {
378            return Err(LspError::ServerNotReady(format!(
379                "language server {:?} is not ready (state: {:?})",
380                self.kind, self.state
381            )));
382        }
383        Ok(())
384    }
385
386    fn lock_pending(&self) -> Result<std::sync::MutexGuard<'_, PendingMap>, LspError> {
387        self.pending
388            .lock()
389            .map_err(|_| io::Error::other("pending response map poisoned").into())
390    }
391
392    fn remove_pending(&self, id: &RequestId) {
393        if let Ok(mut pending) = self.pending.lock() {
394            pending.remove(id);
395        }
396    }
397}