Skip to main content

aft/lsp/
client.rs

1use std::collections::HashMap;
2use std::io::{self, BufReader, BufWriter};
3use std::path::{Path, PathBuf};
4use std::process::{Child, Command, Stdio};
5use std::str::FromStr;
6use std::sync::atomic::{AtomicI64, Ordering};
7use std::sync::{Arc, Mutex};
8use std::thread;
9use std::time::{Duration, Instant};
10
11use crossbeam_channel::{bounded, RecvTimeoutError, Sender};
12use serde::de::DeserializeOwned;
13use serde_json::{json, Value};
14
15use crate::lsp::jsonrpc::{
16    Notification, Request, RequestId, Response as JsonRpcResponse, ServerMessage,
17};
18use crate::lsp::registry::ServerKind;
19use crate::lsp::{transport, LspError};
20
/// Longest time `send_request` waits on the response channel before giving up.
const REQUEST_TIMEOUT: Duration = Duration::from_secs(30);
/// Longest time `shutdown` waits for the child process to exit after `exit` is sent.
const SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(5);
/// Sleep between successive `try_wait` polls while waiting for the child to exit.
const EXIT_POLL_INTERVAL: Duration = Duration::from_millis(25);
24
25type PendingMap = HashMap<RequestId, Sender<JsonRpcResponse>>;
26
/// Where a language server currently is in its lifecycle, as tracked by the client.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ServerState {
    /// The process has been spawned but `initialize` has not been called yet.
    Starting,
    /// The `initialize` handshake has begun and has not completed.
    Initializing,
    /// The handshake finished; the server accepts regular traffic.
    Ready,
    /// Shutdown is in progress; new requests and notifications are rejected.
    ShuttingDown,
    /// The child process has been observed to have exited.
    Exited,
}
36
37/// Events sent from background reader threads into the main loop.
38#[derive(Debug)]
39pub enum LspEvent {
40    /// Server sent a notification (e.g. publishDiagnostics).
41    Notification {
42        server_kind: ServerKind,
43        root: PathBuf,
44        method: String,
45        params: Option<Value>,
46    },
47    /// Server sent a request (e.g. workspace/configuration).
48    ServerRequest {
49        server_kind: ServerKind,
50        root: PathBuf,
51        id: RequestId,
52        method: String,
53        params: Option<Value>,
54    },
55    /// Server process exited or the transport stream closed.
56    ServerExited {
57        server_kind: ServerKind,
58        root: PathBuf,
59    },
60}
61
62/// A client connected to one language server process.
63pub struct LspClient {
64    kind: ServerKind,
65    root: PathBuf,
66    state: ServerState,
67    child: Child,
68    writer: Arc<Mutex<BufWriter<std::process::ChildStdin>>>,
69
70    /// Pending request responses, keyed by request ID.
71    pending: Arc<Mutex<PendingMap>>,
72    /// Next request ID counter.
73    next_id: AtomicI64,
74}
75
76impl LspClient {
77    /// Spawn a new language server process and start the background reader thread.
78    pub fn spawn(
79        kind: ServerKind,
80        root: PathBuf,
81        binary: &Path,
82        args: &[&str],
83        event_tx: Sender<LspEvent>,
84    ) -> io::Result<Self> {
85        let mut child = Command::new(binary)
86            .args(args)
87            .current_dir(&root)
88            .stdin(Stdio::piped())
89            .stdout(Stdio::piped())
90            // Use null() instead of piped() to prevent deadlock when the server
91            // writes more than ~64KB to stderr (piped buffer fills, server blocks)
92            .stderr(Stdio::null())
93            .spawn()?;
94
95        let stdout = child
96            .stdout
97            .take()
98            .ok_or_else(|| io::Error::other("language server missing stdout pipe"))?;
99        let stdin = child
100            .stdin
101            .take()
102            .ok_or_else(|| io::Error::other("language server missing stdin pipe"))?;
103
104        let writer = Arc::new(Mutex::new(BufWriter::new(stdin)));
105        let pending = Arc::new(Mutex::new(PendingMap::new()));
106        let reader_pending = Arc::clone(&pending);
107        let reader_writer = Arc::clone(&writer);
108        let reader_kind = kind;
109        let reader_root = root.clone();
110
111        thread::spawn(move || {
112            let mut reader = BufReader::new(stdout);
113            loop {
114                match transport::read_message(&mut reader) {
115                    Ok(Some(ServerMessage::Response(response))) => {
116                        if let Ok(mut guard) = reader_pending.lock() {
117                            if let Some(tx) = guard.remove(&response.id) {
118                                if tx.send(response).is_err() {
119                                    log::debug!("[aft-lsp] response channel closed");
120                                }
121                            }
122                        } else {
123                            let _ = event_tx.send(LspEvent::ServerExited {
124                                server_kind: reader_kind,
125                                root: reader_root.clone(),
126                            });
127                            break;
128                        }
129                    }
130                    Ok(Some(ServerMessage::Notification { method, params })) => {
131                        let _ = event_tx.send(LspEvent::Notification {
132                            server_kind: reader_kind,
133                            root: reader_root.clone(),
134                            method,
135                            params,
136                        });
137                    }
138                    Ok(Some(ServerMessage::Request { id, method, params })) => {
139                        // Auto-respond to server requests to prevent deadlocks.
140                        // Server requests (like client/registerCapability,
141                        // window/workDoneProgress/create) block the server until
142                        // we respond. If we don't respond, the server won't send
143                        // responses to OUR pending requests → deadlock.
144                        //
145                        // Dispatch by method to return correct types:
146                        // - workspace/configuration expects Vec<Value> (one per item)
147                        // - Everything else gets null (safe default for registration/progress)
148                        let response_value = if method == "workspace/configuration" {
149                            // Return an array of null configs — one per requested item.
150                            // Servers fall back to filesystem config (tsconfig, pyrightconfig, etc.)
151                            let item_count = params
152                                .as_ref()
153                                .and_then(|p| p.get("items"))
154                                .and_then(|items| items.as_array())
155                                .map_or(1, |arr| arr.len());
156                            serde_json::Value::Array(vec![serde_json::Value::Null; item_count])
157                        } else {
158                            serde_json::Value::Null
159                        };
160                        if let Ok(mut w) = reader_writer.lock() {
161                            let response = super::jsonrpc::OutgoingResponse::success(
162                                id.clone(),
163                                response_value,
164                            );
165                            let _ = transport::write_response(&mut *w, &response);
166                        }
167                        // Also forward as event for any interested handlers
168                        let _ = event_tx.send(LspEvent::ServerRequest {
169                            server_kind: reader_kind,
170                            root: reader_root.clone(),
171                            id,
172                            method,
173                            params,
174                        });
175                    }
176                    Ok(None) | Err(_) => {
177                        if let Ok(mut guard) = reader_pending.lock() {
178                            guard.clear();
179                        }
180                        let _ = event_tx.send(LspEvent::ServerExited {
181                            server_kind: reader_kind,
182                            root: reader_root.clone(),
183                        });
184                        break;
185                    }
186                }
187            }
188        });
189
190        Ok(Self {
191            kind,
192            root,
193            state: ServerState::Starting,
194            child,
195            writer,
196            pending,
197            next_id: AtomicI64::new(1),
198        })
199    }
200
201    /// Send the initialize request and wait for response. Transition to Ready.
202    pub fn initialize(
203        &mut self,
204        workspace_root: &Path,
205    ) -> Result<lsp_types::InitializeResult, LspError> {
206        self.ensure_can_send()?;
207        self.state = ServerState::Initializing;
208
209        let normalized = normalize_windows_path(workspace_root);
210        let root_url = url::Url::from_file_path(&normalized).map_err(|_| {
211            LspError::NotFound(format!(
212                "failed to convert workspace root '{}' to file URI",
213                workspace_root.display()
214            ))
215        })?;
216        let root_uri = lsp_types::Uri::from_str(root_url.as_str()).map_err(|_| {
217            LspError::NotFound(format!(
218                "failed to convert workspace root '{}' to file URI",
219                workspace_root.display()
220            ))
221        })?;
222
223        let params = serde_json::from_value::<lsp_types::InitializeParams>(json!({
224            "processId": std::process::id(),
225            "rootUri": root_uri,
226            "capabilities": {
227                "workspace": {
228                    "workspaceFolders": true,
229                    "configuration": true
230                },
231                "textDocument": {
232                    "synchronization": {
233                        "dynamicRegistration": false,
234                        "didSave": true,
235                        "willSave": false,
236                        "willSaveWaitUntil": false
237                    },
238                    "publishDiagnostics": {
239                        "relatedInformation": true,
240                        "versionSupport": true,
241                        "codeDescriptionSupport": true,
242                        "dataSupport": true
243                    }
244                }
245            },
246            "clientInfo": {
247                "name": "aft",
248                "version": env!("CARGO_PKG_VERSION")
249            },
250            "workspaceFolders": [
251                {
252                    "uri": root_uri,
253                    "name": workspace_root
254                        .file_name()
255                        .and_then(|name| name.to_str())
256                        .unwrap_or("workspace")
257                }
258            ]
259        }))?;
260
261        let result = self.send_request::<lsp_types::request::Initialize>(params)?;
262        self.send_notification::<lsp_types::notification::Initialized>(serde_json::from_value(
263            json!({}),
264        )?)?;
265        self.state = ServerState::Ready;
266        Ok(result)
267    }
268
269    /// Send a request and wait for the response.
270    pub fn send_request<R>(&mut self, params: R::Params) -> Result<R::Result, LspError>
271    where
272        R: lsp_types::request::Request,
273        R::Params: serde::Serialize,
274        R::Result: DeserializeOwned,
275    {
276        self.ensure_can_send()?;
277
278        let id = RequestId::Int(self.next_id.fetch_add(1, Ordering::Relaxed));
279        let (tx, rx) = bounded(1);
280        {
281            let mut pending = self.lock_pending()?;
282            pending.insert(id.clone(), tx);
283        }
284
285        let request = Request::new(id.clone(), R::METHOD, Some(serde_json::to_value(params)?));
286        {
287            let mut writer = self
288                .writer
289                .lock()
290                .map_err(|_| LspError::ServerNotReady("writer lock poisoned".to_string()))?;
291            if let Err(err) = transport::write_request(&mut *writer, &request) {
292                self.remove_pending(&id);
293                return Err(err.into());
294            }
295        }
296
297        let response = match rx.recv_timeout(REQUEST_TIMEOUT) {
298            Ok(response) => response,
299            Err(RecvTimeoutError::Timeout) => {
300                self.remove_pending(&id);
301                return Err(LspError::Timeout(format!(
302                    "timed out waiting for '{}' response from {:?}",
303                    R::METHOD,
304                    self.kind
305                )));
306            }
307            Err(RecvTimeoutError::Disconnected) => {
308                self.remove_pending(&id);
309                return Err(LspError::ServerNotReady(format!(
310                    "language server {:?} disconnected while waiting for '{}'",
311                    self.kind,
312                    R::METHOD
313                )));
314            }
315        };
316
317        if let Some(error) = response.error {
318            return Err(LspError::ServerError {
319                code: error.code,
320                message: error.message,
321            });
322        }
323
324        serde_json::from_value(response.result.unwrap_or(Value::Null)).map_err(Into::into)
325    }
326
327    /// Send a notification (fire-and-forget).
328    pub fn send_notification<N>(&mut self, params: N::Params) -> Result<(), LspError>
329    where
330        N: lsp_types::notification::Notification,
331        N::Params: serde::Serialize,
332    {
333        self.ensure_can_send()?;
334        let notification = Notification::new(N::METHOD, Some(serde_json::to_value(params)?));
335        let mut writer = self
336            .writer
337            .lock()
338            .map_err(|_| LspError::ServerNotReady("writer lock poisoned".to_string()))?;
339        transport::write_notification(&mut *writer, &notification)?;
340        Ok(())
341    }
342
343    /// Graceful shutdown: send shutdown request, then exit notification.
344    pub fn shutdown(&mut self) -> Result<(), LspError> {
345        if self.state == ServerState::Exited {
346            return Ok(());
347        }
348
349        if self.child.try_wait()?.is_some() {
350            self.state = ServerState::Exited;
351            return Ok(());
352        }
353
354        if let Err(err) = self.send_request::<lsp_types::request::Shutdown>(()) {
355            self.state = ServerState::ShuttingDown;
356            if self.child.try_wait()?.is_some() {
357                self.state = ServerState::Exited;
358                return Ok(());
359            }
360            return Err(err);
361        }
362
363        self.state = ServerState::ShuttingDown;
364
365        if let Err(err) = self.send_notification::<lsp_types::notification::Exit>(()) {
366            if self.child.try_wait()?.is_some() {
367                self.state = ServerState::Exited;
368                return Ok(());
369            }
370            return Err(err);
371        }
372
373        let deadline = Instant::now() + SHUTDOWN_TIMEOUT;
374        loop {
375            if self.child.try_wait()?.is_some() {
376                self.state = ServerState::Exited;
377                return Ok(());
378            }
379            if Instant::now() >= deadline {
380                return Err(LspError::Timeout(format!(
381                    "timed out waiting for {:?} to exit",
382                    self.kind
383                )));
384            }
385            thread::sleep(EXIT_POLL_INTERVAL);
386        }
387    }
388
389    pub fn state(&self) -> ServerState {
390        self.state
391    }
392
393    pub fn kind(&self) -> ServerKind {
394        self.kind
395    }
396
397    pub fn root(&self) -> &Path {
398        &self.root
399    }
400
401    fn ensure_can_send(&self) -> Result<(), LspError> {
402        if matches!(self.state, ServerState::ShuttingDown | ServerState::Exited) {
403            return Err(LspError::ServerNotReady(format!(
404                "language server {:?} is not ready (state: {:?})",
405                self.kind, self.state
406            )));
407        }
408        Ok(())
409    }
410
411    fn lock_pending(&self) -> Result<std::sync::MutexGuard<'_, PendingMap>, LspError> {
412        self.pending
413            .lock()
414            .map_err(|_| io::Error::other("pending response map poisoned").into())
415    }
416
417    fn remove_pending(&self, id: &RequestId) {
418        if let Ok(mut pending) = self.pending.lock() {
419            pending.remove(id);
420        }
421    }
422}
423
/// Normalize a path for file URI conversion by removing a Windows
/// extended-length (verbatim) prefix.
///
/// * `\\?\UNC\server\share\...` becomes `\\server\share\...`
/// * `\\?\C:\...` becomes `C:\...`
/// * any other path is returned unchanged
///
/// The check is purely textual and is applied on every platform; paths that
/// do not start with `\\?\` are never modified. The textual form is obtained
/// with `to_string_lossy`, so invalid Unicode in a prefixed path is replaced
/// with U+FFFD.
fn normalize_windows_path(path: &Path) -> PathBuf {
    let text = path.to_string_lossy();
    if let Some(unc_tail) = text.strip_prefix(r"\\?\UNC\") {
        // Verbatim UNC: dropping only `\\?\` would leave the relative path
        // `UNC\server\share`, which cannot be converted to a file URI.
        // Restore the regular `\\server\share` form instead.
        PathBuf::from(format!(r"\\{unc_tail}"))
    } else if let Some(rest) = text.strip_prefix(r"\\?\") {
        PathBuf::from(rest)
    } else {
        path.to_path_buf()
    }
}
434}