//! `aft/lsp/client.rs` — client for a single language server process.

use std::collections::HashMap;
use std::fmt::Write as _;
use std::io::{self, BufReader, BufWriter};
use std::path::{Path, PathBuf};
use std::process::{Child, Command, Stdio};
use std::str::FromStr;
use std::sync::atomic::{AtomicI64, Ordering};
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::{Duration, Instant};

use crossbeam_channel::{bounded, Receiver, RecvTimeoutError, Sender};
use serde::de::DeserializeOwned;
use serde_json::{json, Value};

use crate::lsp::jsonrpc::{
    Notification, Request, RequestId, Response as JsonRpcResponse, ServerMessage,
};
use crate::lsp::registry::ServerKind;
use crate::lsp::{transport, LspError};

/// Maximum time `send_request` blocks waiting for the response to one request.
const REQUEST_TIMEOUT: Duration = Duration::from_secs(30);
/// Maximum time `shutdown` waits for the server process to exit.
const SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(5);
/// Sleep between successive `try_wait` polls while waiting for the process to exit.
const EXIT_POLL_INTERVAL: Duration = Duration::from_millis(25);

25type PendingMap = HashMap<RequestId, Sender<JsonRpcResponse>>;
26
/// Lifecycle state of a language server.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ServerState {
    /// Process has been spawned; `initialize` has not been called yet.
    Starting,
    /// The `initialize` request is in progress.
    Initializing,
    /// Initialization handshake completed.
    Ready,
    /// Shutdown has been started; no further requests or notifications are accepted.
    ShuttingDown,
    /// The server process was observed to have exited.
    Exited,
}

37/// Events sent from background reader threads into the main loop.
38#[derive(Debug)]
39pub enum LspEvent {
40    /// Server sent a notification (e.g. publishDiagnostics).
41    Notification {
42        server_kind: ServerKind,
43        root: PathBuf,
44        method: String,
45        params: Option<Value>,
46    },
47    /// Server sent a request (e.g. workspace/configuration).
48    ServerRequest {
49        server_kind: ServerKind,
50        root: PathBuf,
51        id: RequestId,
52        method: String,
53        params: Option<Value>,
54    },
55    /// Server process exited or the transport stream closed.
56    ServerExited {
57        server_kind: ServerKind,
58        root: PathBuf,
59    },
60}
61
62/// A client connected to one language server process.
63pub struct LspClient {
64    kind: ServerKind,
65    root: PathBuf,
66    state: ServerState,
67    child: Child,
68    writer: Arc<Mutex<BufWriter<std::process::ChildStdin>>>,
69    /// Channel kept on the client to reflect the per-server event stream.
70    /// Reader threads send into the manager's shared event channel.
71    #[allow(dead_code)]
72    event_rx: Receiver<LspEvent>,
73    /// Pending request responses, keyed by request ID.
74    pending: Arc<Mutex<PendingMap>>,
75    /// Next request ID counter.
76    next_id: AtomicI64,
77}
78
79impl LspClient {
80    /// Spawn a new language server process and start the background reader thread.
81    pub fn spawn(
82        kind: ServerKind,
83        root: PathBuf,
84        binary: &Path,
85        args: &[&str],
86        event_tx: Sender<LspEvent>,
87    ) -> io::Result<Self> {
88        let mut child = Command::new(binary)
89            .args(args)
90            .current_dir(&root)
91            .stdin(Stdio::piped())
92            .stdout(Stdio::piped())
93            .stderr(Stdio::piped())
94            .spawn()?;
95
96        let stdout = child
97            .stdout
98            .take()
99            .ok_or_else(|| io::Error::other("language server missing stdout pipe"))?;
100        let stdin = child
101            .stdin
102            .take()
103            .ok_or_else(|| io::Error::other("language server missing stdin pipe"))?;
104
105        let writer = Arc::new(Mutex::new(BufWriter::new(stdin)));
106        let pending = Arc::new(Mutex::new(PendingMap::new()));
107        let reader_pending = Arc::clone(&pending);
108        let reader_writer = Arc::clone(&writer);
109        let reader_kind = kind;
110        let reader_root = root.clone();
111        let (_client_event_tx, event_rx) = crossbeam_channel::unbounded();
112
113        thread::spawn(move || {
114            let mut reader = BufReader::new(stdout);
115            loop {
116                match transport::read_message(&mut reader) {
117                    Ok(Some(ServerMessage::Response(response))) => {
118                        if let Ok(mut guard) = reader_pending.lock() {
119                            if let Some(tx) = guard.remove(&response.id) {
120                                let _ = tx.send(response);
121                            }
122                        } else {
123                            let _ = event_tx.send(LspEvent::ServerExited {
124                                server_kind: reader_kind,
125                                root: reader_root.clone(),
126                            });
127                            break;
128                        }
129                    }
130                    Ok(Some(ServerMessage::Notification { method, params })) => {
131                        let _ = event_tx.send(LspEvent::Notification {
132                            server_kind: reader_kind,
133                            root: reader_root.clone(),
134                            method,
135                            params,
136                        });
137                    }
138                    Ok(Some(ServerMessage::Request { id, method, params })) => {
139                        // Auto-respond to server requests to prevent deadlocks.
140                        // Server requests (like client/registerCapability,
141                        // window/workDoneProgress/create) block the server until
142                        // we respond. If we don't respond, the server won't send
143                        // responses to OUR pending requests → deadlock.
144                        if let Ok(mut w) = reader_writer.lock() {
145                            let response = super::jsonrpc::OutgoingResponse::success(
146                                id.clone(),
147                                serde_json::Value::Null,
148                            );
149                            let _ = transport::write_response(&mut *w, &response);
150                        }
151                        // Also forward as event for any interested handlers
152                        let _ = event_tx.send(LspEvent::ServerRequest {
153                            server_kind: reader_kind,
154                            root: reader_root.clone(),
155                            id,
156                            method,
157                            params,
158                        });
159                    }
160                    Ok(None) | Err(_) => {
161                        if let Ok(mut guard) = reader_pending.lock() {
162                            guard.clear();
163                        }
164                        let _ = event_tx.send(LspEvent::ServerExited {
165                            server_kind: reader_kind,
166                            root: reader_root.clone(),
167                        });
168                        break;
169                    }
170                }
171            }
172        });
173
174        Ok(Self {
175            kind,
176            root,
177            state: ServerState::Starting,
178            child,
179            writer,
180            event_rx,
181            pending,
182            next_id: AtomicI64::new(1),
183        })
184    }
185
186    /// Send the initialize request and wait for response. Transition to Ready.
187    pub fn initialize(
188        &mut self,
189        workspace_root: &Path,
190    ) -> Result<lsp_types::InitializeResult, LspError> {
191        self.ensure_can_send()?;
192        self.state = ServerState::Initializing;
193
194        let root_uri = lsp_types::Uri::from_str(&format!("file://{}", workspace_root.display()))
195            .map_err(|_| {
196                LspError::NotFound(format!(
197                    "failed to convert workspace root '{}' to file URI",
198                    workspace_root.display()
199                ))
200            })?;
201
202        let params = serde_json::from_value::<lsp_types::InitializeParams>(json!({
203            "processId": std::process::id(),
204            "rootUri": root_uri,
205            "capabilities": {
206                "workspace": {
207                    "workspaceFolders": true,
208                    "configuration": true
209                },
210                "textDocument": {
211                    "synchronization": {
212                        "dynamicRegistration": false,
213                        "didSave": true,
214                        "willSave": false,
215                        "willSaveWaitUntil": false
216                    },
217                    "publishDiagnostics": {
218                        "relatedInformation": true,
219                        "versionSupport": true,
220                        "codeDescriptionSupport": true,
221                        "dataSupport": true
222                    }
223                }
224            },
225            "clientInfo": {
226                "name": "aft",
227                "version": env!("CARGO_PKG_VERSION")
228            },
229            "workspaceFolders": [
230                {
231                    "uri": root_uri,
232                    "name": workspace_root
233                        .file_name()
234                        .and_then(|name| name.to_str())
235                        .unwrap_or("workspace")
236                }
237            ]
238        }))?;
239
240        let result = self.send_request::<lsp_types::request::Initialize>(params)?;
241        self.send_notification::<lsp_types::notification::Initialized>(serde_json::from_value(
242            json!({}),
243        )?)?;
244        self.state = ServerState::Ready;
245        Ok(result)
246    }
247
248    /// Send a request and wait for the response.
249    pub fn send_request<R>(&mut self, params: R::Params) -> Result<R::Result, LspError>
250    where
251        R: lsp_types::request::Request,
252        R::Params: serde::Serialize,
253        R::Result: DeserializeOwned,
254    {
255        self.ensure_can_send()?;
256
257        let id = RequestId::Int(self.next_id.fetch_add(1, Ordering::Relaxed));
258        let (tx, rx) = bounded(1);
259        {
260            let mut pending = self.lock_pending()?;
261            pending.insert(id.clone(), tx);
262        }
263
264        let request = Request::new(id.clone(), R::METHOD, Some(serde_json::to_value(params)?));
265        {
266            let mut writer = self
267                .writer
268                .lock()
269                .map_err(|_| LspError::ServerNotReady("writer lock poisoned".to_string()))?;
270            if let Err(err) = transport::write_request(&mut *writer, &request) {
271                self.remove_pending(&id);
272                return Err(err.into());
273            }
274        }
275
276        let response = match rx.recv_timeout(REQUEST_TIMEOUT) {
277            Ok(response) => response,
278            Err(RecvTimeoutError::Timeout) => {
279                self.remove_pending(&id);
280                return Err(LspError::Timeout(format!(
281                    "timed out waiting for '{}' response from {:?}",
282                    R::METHOD,
283                    self.kind
284                )));
285            }
286            Err(RecvTimeoutError::Disconnected) => {
287                self.remove_pending(&id);
288                return Err(LspError::ServerNotReady(format!(
289                    "language server {:?} disconnected while waiting for '{}'",
290                    self.kind,
291                    R::METHOD
292                )));
293            }
294        };
295
296        if let Some(error) = response.error {
297            return Err(LspError::ServerError {
298                code: error.code,
299                message: error.message,
300            });
301        }
302
303        serde_json::from_value(response.result.unwrap_or(Value::Null)).map_err(Into::into)
304    }
305
306    /// Send a notification (fire-and-forget).
307    pub fn send_notification<N>(&mut self, params: N::Params) -> Result<(), LspError>
308    where
309        N: lsp_types::notification::Notification,
310        N::Params: serde::Serialize,
311    {
312        self.ensure_can_send()?;
313        let notification = Notification::new(N::METHOD, Some(serde_json::to_value(params)?));
314        let mut writer = self
315            .writer
316            .lock()
317            .map_err(|_| LspError::ServerNotReady("writer lock poisoned".to_string()))?;
318        transport::write_notification(&mut *writer, &notification)?;
319        Ok(())
320    }
321
322    /// Graceful shutdown: send shutdown request, then exit notification.
323    pub fn shutdown(&mut self) -> Result<(), LspError> {
324        if self.state == ServerState::Exited {
325            return Ok(());
326        }
327
328        if self.child.try_wait()?.is_some() {
329            self.state = ServerState::Exited;
330            return Ok(());
331        }
332
333        if let Err(err) = self.send_request::<lsp_types::request::Shutdown>(()) {
334            self.state = ServerState::ShuttingDown;
335            if self.child.try_wait()?.is_some() {
336                self.state = ServerState::Exited;
337                return Ok(());
338            }
339            return Err(err);
340        }
341
342        self.state = ServerState::ShuttingDown;
343
344        if let Err(err) = self.send_notification::<lsp_types::notification::Exit>(()) {
345            if self.child.try_wait()?.is_some() {
346                self.state = ServerState::Exited;
347                return Ok(());
348            }
349            return Err(err);
350        }
351
352        let deadline = Instant::now() + SHUTDOWN_TIMEOUT;
353        loop {
354            if self.child.try_wait()?.is_some() {
355                self.state = ServerState::Exited;
356                return Ok(());
357            }
358            if Instant::now() >= deadline {
359                return Err(LspError::Timeout(format!(
360                    "timed out waiting for {:?} to exit",
361                    self.kind
362                )));
363            }
364            thread::sleep(EXIT_POLL_INTERVAL);
365        }
366    }
367
368    pub fn state(&self) -> ServerState {
369        self.state
370    }
371
372    pub fn kind(&self) -> ServerKind {
373        self.kind
374    }
375
376    pub fn root(&self) -> &Path {
377        &self.root
378    }
379
380    fn ensure_can_send(&self) -> Result<(), LspError> {
381        if matches!(self.state, ServerState::ShuttingDown | ServerState::Exited) {
382            return Err(LspError::ServerNotReady(format!(
383                "language server {:?} is not ready (state: {:?})",
384                self.kind, self.state
385            )));
386        }
387        Ok(())
388    }
389
390    fn lock_pending(&self) -> Result<std::sync::MutexGuard<'_, PendingMap>, LspError> {
391        self.pending
392            .lock()
393            .map_err(|_| io::Error::other("pending response map poisoned").into())
394    }
395
396    fn remove_pending(&self, id: &RequestId) {
397        if let Ok(mut pending) = self.pending.lock() {
398            pending.remove(id);
399        }
400    }
401}