Skip to main content

aft/lsp/
client.rs

1use std::collections::HashMap;
2use std::io::{self, BufReader, BufWriter};
3use std::path::{Path, PathBuf};
4use std::process::{Child, Command, Stdio};
5use std::str::FromStr;
6use std::sync::atomic::{AtomicI64, Ordering};
7use std::sync::{Arc, Mutex};
8use std::thread;
9use std::time::{Duration, Instant};
10
11use crossbeam_channel::{bounded, Receiver, RecvTimeoutError, Sender};
12use serde::de::DeserializeOwned;
13use serde_json::{json, Value};
14
15use crate::lsp::jsonrpc::{
16    Notification, Request, RequestId, Response as JsonRpcResponse, ServerMessage,
17};
18use crate::lsp::registry::ServerKind;
19use crate::lsp::{transport, LspError};
20
21const REQUEST_TIMEOUT: Duration = Duration::from_secs(30);
22const SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(5);
23const EXIT_POLL_INTERVAL: Duration = Duration::from_millis(25);
24
25type PendingMap = HashMap<RequestId, Sender<JsonRpcResponse>>;
26
27/// Lifecycle state of a language server.
28#[derive(Debug, Clone, Copy, PartialEq, Eq)]
29pub enum ServerState {
30    Starting,
31    Initializing,
32    Ready,
33    ShuttingDown,
34    Exited,
35}
36
37/// Events sent from background reader threads into the main loop.
38#[derive(Debug)]
39pub enum LspEvent {
40    /// Server sent a notification (e.g. publishDiagnostics).
41    Notification {
42        server_kind: ServerKind,
43        root: PathBuf,
44        method: String,
45        params: Option<Value>,
46    },
47    /// Server sent a request (e.g. workspace/configuration).
48    ServerRequest {
49        server_kind: ServerKind,
50        root: PathBuf,
51        id: RequestId,
52        method: String,
53        params: Option<Value>,
54    },
55    /// Server process exited or the transport stream closed.
56    ServerExited {
57        server_kind: ServerKind,
58        root: PathBuf,
59    },
60}
61
62/// A client connected to one language server process.
63pub struct LspClient {
64    kind: ServerKind,
65    root: PathBuf,
66    state: ServerState,
67    child: Child,
68    writer: BufWriter<std::process::ChildStdin>,
69    /// Channel kept on the client to reflect the per-server event stream.
70    /// Reader threads send into the manager's shared event channel.
71    #[allow(dead_code)]
72    event_rx: Receiver<LspEvent>,
73    /// Pending request responses, keyed by request ID.
74    pending: Arc<Mutex<PendingMap>>,
75    /// Next request ID counter.
76    next_id: AtomicI64,
77}
78
79impl LspClient {
80    /// Spawn a new language server process and start the background reader thread.
81    pub fn spawn(
82        kind: ServerKind,
83        root: PathBuf,
84        binary: &Path,
85        args: &[&str],
86        event_tx: Sender<LspEvent>,
87    ) -> io::Result<Self> {
88        let mut child = Command::new(binary)
89            .args(args)
90            .current_dir(&root)
91            .stdin(Stdio::piped())
92            .stdout(Stdio::piped())
93            .stderr(Stdio::piped())
94            .spawn()?;
95
96        let stdout = child
97            .stdout
98            .take()
99            .ok_or_else(|| io::Error::other("language server missing stdout pipe"))?;
100        let stdin = child
101            .stdin
102            .take()
103            .ok_or_else(|| io::Error::other("language server missing stdin pipe"))?;
104
105        let pending = Arc::new(Mutex::new(PendingMap::new()));
106        let reader_pending = Arc::clone(&pending);
107        let reader_kind = kind;
108        let reader_root = root.clone();
109        let (_client_event_tx, event_rx) = crossbeam_channel::unbounded();
110
111        thread::spawn(move || {
112            let mut reader = BufReader::new(stdout);
113            loop {
114                match transport::read_message(&mut reader) {
115                    Ok(Some(ServerMessage::Response(response))) => {
116                        if let Ok(mut guard) = reader_pending.lock() {
117                            if let Some(tx) = guard.remove(&response.id) {
118                                let _ = tx.send(response);
119                            }
120                        } else {
121                            let _ = event_tx.send(LspEvent::ServerExited {
122                                server_kind: reader_kind,
123                                root: reader_root.clone(),
124                            });
125                            break;
126                        }
127                    }
128                    Ok(Some(ServerMessage::Notification { method, params })) => {
129                        let _ = event_tx.send(LspEvent::Notification {
130                            server_kind: reader_kind,
131                            root: reader_root.clone(),
132                            method,
133                            params,
134                        });
135                    }
136                    Ok(Some(ServerMessage::Request { id, method, params })) => {
137                        let _ = event_tx.send(LspEvent::ServerRequest {
138                            server_kind: reader_kind,
139                            root: reader_root.clone(),
140                            id,
141                            method,
142                            params,
143                        });
144                    }
145                    Ok(None) | Err(_) => {
146                        if let Ok(mut guard) = reader_pending.lock() {
147                            guard.clear();
148                        }
149                        let _ = event_tx.send(LspEvent::ServerExited {
150                            server_kind: reader_kind,
151                            root: reader_root.clone(),
152                        });
153                        break;
154                    }
155                }
156            }
157        });
158
159        Ok(Self {
160            kind,
161            root,
162            state: ServerState::Starting,
163            child,
164            writer: BufWriter::new(stdin),
165            event_rx,
166            pending,
167            next_id: AtomicI64::new(1),
168        })
169    }
170
171    /// Send the initialize request and wait for response. Transition to Ready.
172    pub fn initialize(
173        &mut self,
174        workspace_root: &Path,
175    ) -> Result<lsp_types::InitializeResult, LspError> {
176        self.ensure_can_send()?;
177        self.state = ServerState::Initializing;
178
179        let root_uri = lsp_types::Uri::from_str(&format!("file://{}", workspace_root.display()))
180            .map_err(|_| {
181                LspError::NotFound(format!(
182                    "failed to convert workspace root '{}' to file URI",
183                    workspace_root.display()
184                ))
185            })?;
186
187        let params = serde_json::from_value::<lsp_types::InitializeParams>(json!({
188            "processId": std::process::id(),
189            "rootUri": root_uri,
190            "capabilities": {
191                "workspace": {
192                    "workspaceFolders": true,
193                    "configuration": true
194                },
195                "textDocument": {
196                    "synchronization": {
197                        "dynamicRegistration": false,
198                        "didSave": true,
199                        "willSave": false,
200                        "willSaveWaitUntil": false
201                    },
202                    "publishDiagnostics": {
203                        "relatedInformation": true,
204                        "versionSupport": true,
205                        "codeDescriptionSupport": true,
206                        "dataSupport": true
207                    }
208                }
209            },
210            "clientInfo": {
211                "name": "aft",
212                "version": env!("CARGO_PKG_VERSION")
213            },
214            "workspaceFolders": [
215                {
216                    "uri": root_uri,
217                    "name": workspace_root
218                        .file_name()
219                        .and_then(|name| name.to_str())
220                        .unwrap_or("workspace")
221                }
222            ]
223        }))?;
224
225        let result = self.send_request::<lsp_types::request::Initialize>(params)?;
226        self.send_notification::<lsp_types::notification::Initialized>(serde_json::from_value(
227            json!({}),
228        )?)?;
229        self.state = ServerState::Ready;
230        Ok(result)
231    }
232
233    /// Send a request and wait for the response.
234    pub fn send_request<R>(&mut self, params: R::Params) -> Result<R::Result, LspError>
235    where
236        R: lsp_types::request::Request,
237        R::Params: serde::Serialize,
238        R::Result: DeserializeOwned,
239    {
240        self.ensure_can_send()?;
241
242        let id = RequestId::Int(self.next_id.fetch_add(1, Ordering::Relaxed));
243        let (tx, rx) = bounded(1);
244        {
245            let mut pending = self.lock_pending()?;
246            pending.insert(id.clone(), tx);
247        }
248
249        let request = Request::new(id.clone(), R::METHOD, Some(serde_json::to_value(params)?));
250        if let Err(err) = transport::write_request(&mut self.writer, &request) {
251            self.remove_pending(&id);
252            return Err(err.into());
253        }
254
255        let response = match rx.recv_timeout(REQUEST_TIMEOUT) {
256            Ok(response) => response,
257            Err(RecvTimeoutError::Timeout) => {
258                self.remove_pending(&id);
259                return Err(LspError::Timeout(format!(
260                    "timed out waiting for '{}' response from {:?}",
261                    R::METHOD,
262                    self.kind
263                )));
264            }
265            Err(RecvTimeoutError::Disconnected) => {
266                self.remove_pending(&id);
267                return Err(LspError::ServerNotReady(format!(
268                    "language server {:?} disconnected while waiting for '{}'",
269                    self.kind,
270                    R::METHOD
271                )));
272            }
273        };
274
275        if let Some(error) = response.error {
276            return Err(LspError::ServerError {
277                code: error.code,
278                message: error.message,
279            });
280        }
281
282        serde_json::from_value(response.result.unwrap_or(Value::Null)).map_err(Into::into)
283    }
284
285    /// Send a notification (fire-and-forget).
286    pub fn send_notification<N>(&mut self, params: N::Params) -> Result<(), LspError>
287    where
288        N: lsp_types::notification::Notification,
289        N::Params: serde::Serialize,
290    {
291        self.ensure_can_send()?;
292        let notification = Notification::new(N::METHOD, Some(serde_json::to_value(params)?));
293        transport::write_notification(&mut self.writer, &notification)?;
294        Ok(())
295    }
296
297    /// Graceful shutdown: send shutdown request, then exit notification.
298    pub fn shutdown(&mut self) -> Result<(), LspError> {
299        if self.state == ServerState::Exited {
300            return Ok(());
301        }
302
303        if self.child.try_wait()?.is_some() {
304            self.state = ServerState::Exited;
305            return Ok(());
306        }
307
308        if let Err(err) = self.send_request::<lsp_types::request::Shutdown>(()) {
309            self.state = ServerState::ShuttingDown;
310            if self.child.try_wait()?.is_some() {
311                self.state = ServerState::Exited;
312                return Ok(());
313            }
314            return Err(err);
315        }
316
317        self.state = ServerState::ShuttingDown;
318
319        if let Err(err) = self.send_notification::<lsp_types::notification::Exit>(()) {
320            if self.child.try_wait()?.is_some() {
321                self.state = ServerState::Exited;
322                return Ok(());
323            }
324            return Err(err);
325        }
326
327        let deadline = Instant::now() + SHUTDOWN_TIMEOUT;
328        loop {
329            if self.child.try_wait()?.is_some() {
330                self.state = ServerState::Exited;
331                return Ok(());
332            }
333            if Instant::now() >= deadline {
334                return Err(LspError::Timeout(format!(
335                    "timed out waiting for {:?} to exit",
336                    self.kind
337                )));
338            }
339            thread::sleep(EXIT_POLL_INTERVAL);
340        }
341    }
342
343    pub fn state(&self) -> ServerState {
344        self.state
345    }
346
347    pub fn kind(&self) -> ServerKind {
348        self.kind
349    }
350
351    pub fn root(&self) -> &Path {
352        &self.root
353    }
354
355    fn ensure_can_send(&self) -> Result<(), LspError> {
356        if matches!(self.state, ServerState::ShuttingDown | ServerState::Exited) {
357            return Err(LspError::ServerNotReady(format!(
358                "language server {:?} is not ready (state: {:?})",
359                self.kind, self.state
360            )));
361        }
362        Ok(())
363    }
364
365    fn lock_pending(&self) -> Result<std::sync::MutexGuard<'_, PendingMap>, LspError> {
366        self.pending
367            .lock()
368            .map_err(|_| io::Error::other("pending response map poisoned").into())
369    }
370
371    fn remove_pending(&self, id: &RequestId) {
372        if let Ok(mut pending) = self.pending.lock() {
373            pending.remove(id);
374        }
375    }
376}