Skip to main content

lean_ctx/lsp/
client.rs

1#![allow(clippy::wildcard_imports, clippy::default_trait_access)]
2
3use lsp_types::*;
4use serde::{Deserialize, Serialize};
5use serde_json::Value;
6use std::io::{BufRead, BufReader, Write};
7use std::process::{Child, ChildStdin, ChildStdout, Command, Stdio};
8use std::sync::atomic::{AtomicI64, Ordering};
9use std::sync::mpsc::{self, Receiver};
10use std::time::{Duration, Instant};
11
12use super::config::LspServerConfig;
13
14const INIT_TIMEOUT_SECS: u64 = 60;
15const REQUEST_TIMEOUT_SECS: u64 = 30;
16const SHUTDOWN_TIMEOUT_SECS: u64 = 5;
17
18pub fn file_path_to_uri(path: &str) -> Result<Uri, String> {
19    let abs = if path.starts_with('/') || (path.len() >= 2 && path.as_bytes()[1] == b':') {
20        path.to_string()
21    } else {
22        std::fs::canonicalize(path)
23            .map(|p| p.to_string_lossy().to_string())
24            .map_err(|e| format!("Cannot resolve path '{path}': {e}"))?
25    };
26    let normalized = abs.replace('\\', "/");
27    let uri_str = if normalized.starts_with('/') {
28        format!("file://{normalized}")
29    } else {
30        format!("file:///{normalized}")
31    };
32    uri_str
33        .parse::<Uri>()
34        .map_err(|e| format!("Invalid URI: {e}"))
35}
36
37pub fn uri_to_file_path(uri: &Uri) -> Option<String> {
38    let s = uri.as_str();
39    s.strip_prefix("file://")
40        .map(|p| urlencoding::decode(p).map_or_else(|_| p.to_string(), |d| d.to_string()))
41}
42
43pub struct LspClient {
44    child: Child,
45    stdin: ChildStdin,
46    response_rx: Receiver<Result<Value, String>>,
47    next_id: AtomicI64,
48    initialized: bool,
49}
50
51#[derive(Serialize)]
52struct JsonRpcRequest {
53    jsonrpc: &'static str,
54    id: i64,
55    method: String,
56    params: Value,
57}
58
59#[derive(Deserialize)]
60struct JsonRpcResponse {
61    #[allow(dead_code)]
62    id: Option<i64>,
63    result: Option<Value>,
64    error: Option<JsonRpcError>,
65}
66
67#[derive(Deserialize)]
68struct JsonRpcError {
69    #[allow(dead_code)]
70    code: i64,
71    message: String,
72}
73
74fn read_one_message(reader: &mut BufReader<ChildStdout>) -> Result<Value, String> {
75    let mut content_length = 0usize;
76    loop {
77        let mut line = String::new();
78        let bytes_read = reader
79            .read_line(&mut line)
80            .map_err(|e| format!("Read header: {e}"))?;
81        if bytes_read == 0 {
82            return Err("LSP server closed connection (EOF)".into());
83        }
84        let trimmed = line.trim();
85        if trimmed.is_empty() {
86            break;
87        }
88        if let Some(val) = trimmed.strip_prefix("Content-Length: ") {
89            content_length = val.parse().map_err(|e| format!("Parse length: {e}"))?;
90        }
91    }
92    if content_length == 0 {
93        return Err("Zero content length from LSP server".into());
94    }
95    let mut body = vec![0u8; content_length];
96    std::io::Read::read_exact(reader, &mut body).map_err(|e| format!("Read body: {e}"))?;
97    let text = String::from_utf8_lossy(&body);
98    serde_json::from_str(&text).map_err(|e| format!("Parse response: {e}"))
99}
100
101fn spawn_reader(stdout: ChildStdout) -> Receiver<Result<Value, String>> {
102    let (tx, rx) = mpsc::channel();
103    std::thread::Builder::new()
104        .name("lsp-reader".into())
105        .spawn(move || {
106            let mut reader = BufReader::new(stdout);
107            loop {
108                match read_one_message(&mut reader) {
109                    Ok(msg) => {
110                        if tx.send(Ok(msg)).is_err() {
111                            break;
112                        }
113                    }
114                    Err(e) => {
115                        let _ = tx.send(Err(e));
116                        break;
117                    }
118                }
119            }
120        })
121        .ok();
122    rx
123}
124
125impl LspClient {
126    pub fn start(config: &LspServerConfig, root_uri: &Uri) -> Result<Self, String> {
127        let mut child = Command::new(&config.command)
128            .args(&config.args)
129            .stdin(Stdio::piped())
130            .stdout(Stdio::piped())
131            .stderr(Stdio::null())
132            .spawn()
133            .map_err(|e| format!("Failed to start LSP server '{}': {e}", config.command))?;
134
135        let stdin = child.stdin.take().ok_or("No stdin")?;
136        let stdout = child.stdout.take().ok_or("No stdout")?;
137        let response_rx = spawn_reader(stdout);
138
139        let mut client = Self {
140            child,
141            stdin,
142            response_rx,
143            next_id: AtomicI64::new(1),
144            initialized: false,
145        };
146
147        client.initialize(root_uri)?;
148        Ok(client)
149    }
150
151    fn check_alive(&mut self) -> Result<(), String> {
152        match self.child.try_wait() {
153            Ok(Some(status)) => Err(format!("LSP server exited: {status}")),
154            Ok(None) => Ok(()),
155            Err(e) => Err(format!("Cannot check LSP server status: {e}")),
156        }
157    }
158
159    #[allow(deprecated)]
160    fn initialize(&mut self, root_uri: &Uri) -> Result<(), String> {
161        let params = InitializeParams {
162            root_uri: Some(root_uri.clone()),
163            capabilities: ClientCapabilities {
164                text_document: Some(TextDocumentClientCapabilities {
165                    rename: Some(RenameClientCapabilities {
166                        dynamic_registration: Some(false),
167                        prepare_support: Some(true),
168                        ..Default::default()
169                    }),
170                    references: Some(DynamicRegistrationClientCapabilities {
171                        dynamic_registration: Some(false),
172                    }),
173                    definition: Some(GotoCapability {
174                        dynamic_registration: Some(false),
175                        link_support: Some(false),
176                    }),
177                    implementation: Some(GotoCapability {
178                        dynamic_registration: Some(false),
179                        link_support: Some(false),
180                    }),
181                    ..Default::default()
182                }),
183                ..Default::default()
184            },
185            ..Default::default()
186        };
187
188        let _result = self.request_with_timeout::<request::Initialize>(
189            params,
190            Duration::from_secs(INIT_TIMEOUT_SECS),
191        )?;
192        self.send_notification::<notification::Initialized>(InitializedParams {})?;
193        self.initialized = true;
194        Ok(())
195    }
196
197    pub fn did_open(&mut self, uri: &Uri, language_id: &str, text: &str) -> Result<(), String> {
198        self.check_alive()?;
199        self.send_notification::<notification::DidOpenTextDocument>(DidOpenTextDocumentParams {
200            text_document: TextDocumentItem {
201                uri: uri.clone(),
202                language_id: language_id.to_string(),
203                version: 1,
204                text: text.to_string(),
205            },
206        })
207    }
208
209    pub fn references(&mut self, uri: &Uri, position: Position) -> Result<Vec<Location>, String> {
210        self.check_alive()?;
211        let params = ReferenceParams {
212            text_document_position: TextDocumentPositionParams {
213                text_document: TextDocumentIdentifier { uri: uri.clone() },
214                position,
215            },
216            context: ReferenceContext {
217                include_declaration: true,
218            },
219            work_done_progress_params: Default::default(),
220            partial_result_params: Default::default(),
221        };
222        let result = self.request_with_timeout::<request::References>(
223            params,
224            Duration::from_secs(REQUEST_TIMEOUT_SECS),
225        )?;
226        Ok(result.unwrap_or_default())
227    }
228
229    pub fn definition(
230        &mut self,
231        uri: &Uri,
232        position: Position,
233    ) -> Result<GotoDefinitionResponse, String> {
234        self.check_alive()?;
235        let params = GotoDefinitionParams {
236            text_document_position_params: TextDocumentPositionParams {
237                text_document: TextDocumentIdentifier { uri: uri.clone() },
238                position,
239            },
240            work_done_progress_params: Default::default(),
241            partial_result_params: Default::default(),
242        };
243        let result = self.request_with_timeout::<request::GotoDefinition>(
244            params,
245            Duration::from_secs(REQUEST_TIMEOUT_SECS),
246        )?;
247        Ok(result.unwrap_or(GotoDefinitionResponse::Array(vec![])))
248    }
249
250    pub fn rename(
251        &mut self,
252        uri: &Uri,
253        position: Position,
254        new_name: &str,
255    ) -> Result<Option<WorkspaceEdit>, String> {
256        self.check_alive()?;
257        let params = RenameParams {
258            text_document_position: TextDocumentPositionParams {
259                text_document: TextDocumentIdentifier { uri: uri.clone() },
260                position,
261            },
262            new_name: new_name.to_string(),
263            work_done_progress_params: Default::default(),
264        };
265        self.request_with_timeout::<request::Rename>(
266            params,
267            Duration::from_secs(REQUEST_TIMEOUT_SECS),
268        )
269    }
270
271    pub fn implementations(
272        &mut self,
273        uri: &Uri,
274        position: Position,
275    ) -> Result<Vec<Location>, String> {
276        self.check_alive()?;
277        let params = GotoDefinitionParams {
278            text_document_position_params: TextDocumentPositionParams {
279                text_document: TextDocumentIdentifier { uri: uri.clone() },
280                position,
281            },
282            work_done_progress_params: Default::default(),
283            partial_result_params: Default::default(),
284        };
285        let value = self.request_raw_with_timeout(
286            "textDocument/implementation",
287            serde_json::to_value(params).unwrap_or_default(),
288            Duration::from_secs(REQUEST_TIMEOUT_SECS),
289        )?;
290        match value {
291            Some(v) => {
292                let locations: Vec<Location> = serde_json::from_value(v).unwrap_or_default();
293                Ok(locations)
294            }
295            None => Ok(vec![]),
296        }
297    }
298
299    fn request_with_timeout<R: request::Request>(
300        &mut self,
301        params: R::Params,
302        timeout: Duration,
303    ) -> Result<R::Result, String>
304    where
305        R::Params: Serialize,
306        R::Result: for<'de> Deserialize<'de>,
307    {
308        let value = self.request_raw_with_timeout(
309            R::METHOD,
310            serde_json::to_value(params).map_err(|e| e.to_string())?,
311            timeout,
312        )?;
313        match value {
314            Some(v) => serde_json::from_value(v).map_err(|e| format!("Deserialize error: {e}")),
315            None => serde_json::from_value(Value::Null).map_err(|e| format!("Null result: {e}")),
316        }
317    }
318
319    fn request_raw_with_timeout(
320        &mut self,
321        method: &str,
322        params: Value,
323        timeout: Duration,
324    ) -> Result<Option<Value>, String> {
325        let id = self.next_id.fetch_add(1, Ordering::Relaxed);
326        let req = JsonRpcRequest {
327            jsonrpc: "2.0",
328            id,
329            method: method.to_string(),
330            params,
331        };
332        self.send_message(&serde_json::to_value(req).unwrap())?;
333        self.read_response(id, timeout)
334    }
335
336    fn send_notification<N: notification::Notification>(
337        &mut self,
338        params: N::Params,
339    ) -> Result<(), String>
340    where
341        N::Params: Serialize,
342    {
343        let msg = serde_json::json!({
344            "jsonrpc": "2.0",
345            "method": N::METHOD,
346            "params": serde_json::to_value(params).map_err(|e| e.to_string())?
347        });
348        self.send_message(&msg)
349    }
350
351    fn send_message(&mut self, msg: &Value) -> Result<(), String> {
352        let body = serde_json::to_string(msg).map_err(|e| e.to_string())?;
353        let header = format!("Content-Length: {}\r\n\r\n", body.len());
354        self.stdin
355            .write_all(header.as_bytes())
356            .map_err(|e| format!("Write to LSP server: {e}"))?;
357        self.stdin
358            .write_all(body.as_bytes())
359            .map_err(|e| format!("Write to LSP server: {e}"))?;
360        self.stdin
361            .flush()
362            .map_err(|e| format!("Flush LSP server: {e}"))?;
363        Ok(())
364    }
365
366    fn read_response(&self, expected_id: i64, timeout: Duration) -> Result<Option<Value>, String> {
367        let deadline = Instant::now() + timeout;
368        loop {
369            let remaining = deadline.saturating_duration_since(Instant::now());
370            if remaining.is_zero() {
371                return Err(format!(
372                    "LSP response timeout ({}s) for request id={expected_id}",
373                    timeout.as_secs()
374                ));
375            }
376
377            match self.response_rx.recv_timeout(remaining) {
378                Ok(Ok(msg)) => {
379                    if msg.get("id").and_then(Value::as_i64) == Some(expected_id) {
380                        let resp: JsonRpcResponse =
381                            serde_json::from_value(msg).map_err(|e| e.to_string())?;
382                        if let Some(err) = resp.error {
383                            return Err(format!("LSP error: {}", err.message));
384                        }
385                        return Ok(resp.result);
386                    }
387                }
388                Ok(Err(e)) => return Err(format!("LSP reader error: {e}")),
389                Err(mpsc::RecvTimeoutError::Timeout) => {
390                    return Err(format!("LSP response timeout ({}s)", timeout.as_secs()));
391                }
392                Err(mpsc::RecvTimeoutError::Disconnected) => {
393                    return Err("LSP server connection lost".into());
394                }
395            }
396        }
397    }
398
399    pub fn shutdown(&mut self) {
400        let _ = self.request_raw_with_timeout(
401            "shutdown",
402            Value::Null,
403            Duration::from_secs(SHUTDOWN_TIMEOUT_SECS),
404        );
405        let _ = self.send_notification::<notification::Exit>(());
406        let _ = self.child.wait();
407    }
408}
409
410impl Drop for LspClient {
411    fn drop(&mut self) {
412        if self.initialized {
413            self.shutdown();
414        }
415    }
416}