Skip to main content

harness_lsp/
spawn_client.rs

1//! Spawn-based LSP client. Talks LSP JSON-RPC over stdio to a child
2//! language-server. Framing: `Content-Length: N\r\n\r\n<body>`.
3
4use async_trait::async_trait;
5use serde::Serialize;
6use serde_json::Value;
7use std::collections::HashMap;
8use std::path::Path;
9use std::process::Stdio;
10use std::sync::atomic::{AtomicI64, Ordering};
11use std::sync::Arc;
12use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader};
13use tokio::process::{Child, ChildStdin, Command};
14use tokio::sync::{oneshot, Mutex};
15use url::Url;
16
17use crate::constants::kind_name;
18use crate::types::{
19    CancelSignal, LspClient, LspHoverResult, LspLocation, LspServerProfile, LspSymbolInfo,
20    Position1, ServerHandle, ServerState,
21};
22
23struct ServerEntry {
24    language: String,
25    root: String,
26    state: Mutex<ServerState>,
27    stdin: Mutex<ChildStdin>,
28    pending: Arc<Mutex<HashMap<i64, oneshot::Sender<Result<Value, String>>>>>,
29    next_id: AtomicI64,
30    opened_files: Mutex<HashMap<String, bool>>,
31    _child: Mutex<Child>,
32}
33
34impl ServerEntry {
35    async fn handle(&self) -> ServerHandle {
36        ServerHandle {
37            language: self.language.clone(),
38            root: self.root.clone(),
39            state: *self.state.lock().await,
40        }
41    }
42}
43
44pub struct SpawnLspClient {
45    servers: Mutex<HashMap<String, Arc<ServerEntry>>>,
46}
47
48impl SpawnLspClient {
49    pub fn new() -> Self {
50        Self {
51            servers: Mutex::new(HashMap::new()),
52        }
53    }
54
55    fn key(language: &str, root: &str) -> String {
56        format!("{}|{}", language, root)
57    }
58}
59
60impl Default for SpawnLspClient {
61    fn default() -> Self {
62        Self::new()
63    }
64}
65
66async fn send_request(
67    entry: &ServerEntry,
68    method: &str,
69    params: Value,
70    cancel: CancelSignal,
71) -> Result<Value, String> {
72    let id = entry.next_id.fetch_add(1, Ordering::Relaxed);
73    let (tx, rx) = oneshot::channel();
74    {
75        let mut pending = entry.pending.lock().await;
76        pending.insert(id, tx);
77    }
78    let msg = serde_json::json!({
79        "jsonrpc": "2.0",
80        "id": id,
81        "method": method,
82        "params": params,
83    });
84    write_message(entry, &msg).await?;
85
86    let mut cancel_rx = cancel.clone();
87    tokio::select! {
88        res = rx => res.map_err(|_| "request dropped".to_string())?,
89        _ = cancel_rx.changed() => {
90            if *cancel_rx.borrow() {
91                // Best-effort cancel notification.
92                let _ = write_message(entry, &serde_json::json!({
93                    "jsonrpc": "2.0",
94                    "method": "$/cancelRequest",
95                    "params": { "id": id },
96                })).await;
97                let mut pending = entry.pending.lock().await;
98                pending.remove(&id);
99                return Err("aborted".to_string());
100            }
101            Err("cancel channel closed".to_string())
102        }
103    }
104}
105
106async fn send_notification(
107    entry: &ServerEntry,
108    method: &str,
109    params: Value,
110) -> Result<(), String> {
111    let msg = serde_json::json!({
112        "jsonrpc": "2.0",
113        "method": method,
114        "params": params,
115    });
116    write_message(entry, &msg).await
117}
118
119async fn write_message<T: Serialize>(entry: &ServerEntry, msg: &T) -> Result<(), String> {
120    let body = serde_json::to_vec(msg).map_err(|e| e.to_string())?;
121    let header = format!("Content-Length: {}\r\n\r\n", body.len());
122    let mut stdin = entry.stdin.lock().await;
123    stdin
124        .write_all(header.as_bytes())
125        .await
126        .map_err(|e| e.to_string())?;
127    stdin.write_all(&body).await.map_err(|e| e.to_string())?;
128    stdin.flush().await.map_err(|e| e.to_string())?;
129    Ok(())
130}
131
132async fn read_loop<R: tokio::io::AsyncRead + Unpin>(
133    reader: R,
134    pending: Arc<Mutex<HashMap<i64, oneshot::Sender<Result<Value, String>>>>>,
135) {
136    let mut reader = BufReader::new(reader);
137    loop {
138        // Parse headers
139        let mut content_length: Option<usize> = None;
140        loop {
141            let mut line = String::new();
142            let n = match reader.read_line(&mut line).await {
143                Ok(n) => n,
144                Err(_) => return,
145            };
146            if n == 0 {
147                return; // EOF
148            }
149            let trimmed = line.trim_end_matches(&['\r', '\n'][..]);
150            if trimmed.is_empty() {
151                break;
152            }
153            if let Some(v) = trimmed
154                .to_ascii_lowercase()
155                .strip_prefix("content-length:")
156            {
157                if let Ok(n) = v.trim().parse::<usize>() {
158                    content_length = Some(n);
159                }
160            }
161        }
162        let Some(len) = content_length else {
163            continue;
164        };
165        let mut buf = vec![0u8; len];
166        if reader.read_exact(&mut buf).await.is_err() {
167            return;
168        }
169        let value: Value = match serde_json::from_slice(&buf) {
170            Ok(v) => v,
171            Err(_) => continue,
172        };
173        // Route responses only (ignore server-pushed notifications for now).
174        if let Some(id_raw) = value.get("id") {
175            if let Some(id) = id_raw.as_i64() {
176                let tx_opt = {
177                    let mut p = pending.lock().await;
178                    p.remove(&id)
179                };
180                if let Some(tx) = tx_opt {
181                    if let Some(err) = value.get("error") {
182                        let msg = err.get("message").and_then(|m| m.as_str()).unwrap_or("error");
183                        let _ = tx.send(Err(msg.to_string()));
184                    } else {
185                        let r = value.get("result").cloned().unwrap_or(Value::Null);
186                        let _ = tx.send(Ok(r));
187                    }
188                }
189            }
190        }
191    }
192}
193
194fn file_uri(p: &str) -> String {
195    Url::from_file_path(Path::new(p))
196        .map(|u| u.to_string())
197        .unwrap_or_else(|_| format!("file://{}", p))
198}
199
/// Converts a `file://` URI to a filesystem path string.
///
/// * Strings that do not start with `file://` are returned unchanged.
/// * An explicit `localhost` authority (`file://localhost/...`) is dropped.
/// * Percent-escapes (`%20`, `%3A`, multi-byte UTF-8 sequences, ...) are
///   decoded. Malformed escapes are kept literally, and if decoding would
///   produce invalid UTF-8 the undecoded text is returned instead.
/// * A Windows-style path (`/C:/...`, `/c:\...`, `/C:`) loses its leading
///   `/`; every other path is kept as-is.
fn file_uri_to_path(uri: &str) -> String {
    /// Numeric value of one ASCII hex digit.
    fn hex_value(b: u8) -> Option<u8> {
        match b {
            b'0'..=b'9' => Some(b - b'0'),
            b'a'..=b'f' => Some(b - b'a' + 10),
            b'A'..=b'F' => Some(b - b'A' + 10),
            _ => None,
        }
    }

    /// Decodes `%XX` escapes; falls back to `raw` on invalid UTF-8.
    fn percent_decode(raw: &str) -> String {
        let bytes = raw.as_bytes();
        let mut out: Vec<u8> = Vec::with_capacity(bytes.len());
        let mut i = 0;
        while i < bytes.len() {
            let escaped = if bytes[i] == b'%' {
                let hi = bytes.get(i + 1).copied().and_then(hex_value);
                let lo = bytes.get(i + 2).copied().and_then(hex_value);
                match (hi, lo) {
                    (Some(hi), Some(lo)) => Some(hi * 16 + lo),
                    _ => None,
                }
            } else {
                None
            };
            match escaped {
                Some(byte) => {
                    out.push(byte);
                    i += 3;
                }
                None => {
                    out.push(bytes[i]);
                    i += 1;
                }
            }
        }
        String::from_utf8(out).unwrap_or_else(|_| raw.to_string())
    }

    let rest = match uri.strip_prefix("file://") {
        Some(rest) => rest,
        None => return uri.to_string(),
    };
    let rest = match rest.strip_prefix("localhost") {
        Some(after) if after.starts_with('/') => after,
        _ => rest,
    };

    let path = percent_decode(rest);

    // Drop the leading '/' iff the path is Windows-style ("/C:/...").
    let has_drive = {
        let b = path.as_bytes();
        b.len() >= 3
            && b[0] == b'/'
            && b[1].is_ascii_alphabetic()
            && b[2] == b':'
            && (b.len() == 3 || b[3] == b'/' || b[3] == b'\\')
    };
    if has_drive {
        path[1..].to_string()
    } else {
        path
    }
}
207
208fn lsp_pos(p: Position1) -> Value {
209    serde_json::json!({
210        "line": p.line.saturating_sub(1),
211        "character": p.character.saturating_sub(1),
212    })
213}
214
/// Converts a zero-based LSP line number to a one-based line number.
///
/// The result is clamped to `1..=u32::MAX`: negative inputs map to `1`, and
/// inputs too large for `u32` saturate instead of overflowing or wrapping.
fn from_lsp_line(n: i64) -> u32 {
    let one_based = n.saturating_add(1).clamp(1, i64::from(u32::MAX));
    // Lossless: the value was just clamped into the u32 range.
    one_based as u32
}
218
/// Converts a zero-based LSP character offset to a one-based column.
///
/// The result is clamped to `1..=u32::MAX`: negative inputs map to `1`, and
/// inputs too large for `u32` saturate instead of overflowing or wrapping.
fn from_lsp_char(n: i64) -> u32 {
    let one_based = n.saturating_add(1).clamp(1, i64::from(u32::MAX));
    // Lossless: the value was just clamped into the u32 range.
    one_based as u32
}
222
223async fn did_open_if_needed(entry: &ServerEntry, file_path: &str) -> Result<(), String> {
224    {
225        let opened = entry.opened_files.lock().await;
226        if opened.contains_key(file_path) {
227            return Ok(());
228        }
229    }
230    let text = tokio::fs::read_to_string(file_path)
231        .await
232        .unwrap_or_default();
233    let uri = file_uri(file_path);
234    let language_id = entry.language.clone();
235    send_notification(
236        entry,
237        "textDocument/didOpen",
238        serde_json::json!({
239            "textDocument": {
240                "uri": uri,
241                "languageId": language_id,
242                "version": 1,
243                "text": text,
244            }
245        }),
246    )
247    .await?;
248    let mut opened = entry.opened_files.lock().await;
249    opened.insert(file_path.to_string(), true);
250    Ok(())
251}
252
253async fn preview_line_at(path: &str, zero_indexed_line: i64) -> String {
254    let text = tokio::fs::read_to_string(path).await.unwrap_or_default();
255    let lines: Vec<&str> = text.lines().collect();
256    lines
257        .get(zero_indexed_line.max(0) as usize)
258        .map(|s| s.trim().to_string())
259        .unwrap_or_default()
260}
261
262async fn normalize_location(v: &Value) -> Option<LspLocation> {
263    let uri = v
264        .get("uri")
265        .and_then(|x| x.as_str())
266        .or_else(|| v.get("targetUri").and_then(|x| x.as_str()))?;
267    let range = v
268        .get("range")
269        .or_else(|| v.get("targetSelectionRange"))
270        .or_else(|| v.get("targetRange"))?;
271    let start = range.get("start")?;
272    let line = start.get("line")?.as_i64()?;
273    let character = start.get("character")?.as_i64()?;
274    let path = file_uri_to_path(uri);
275    let preview = preview_line_at(&path, line).await;
276    Some(LspLocation {
277        path,
278        line: from_lsp_line(line),
279        character: from_lsp_char(character),
280        preview,
281    })
282}
283
284fn flatten_hover_contents(contents: &Value) -> (String, bool) {
285    if let Some(s) = contents.as_str() {
286        return (s.to_string(), false);
287    }
288    if let Some(arr) = contents.as_array() {
289        let mut parts: Vec<String> = Vec::new();
290        for c in arr {
291            if let Some(s) = c.as_str() {
292                parts.push(s.to_string());
293            } else if let Some(obj) = c.as_object() {
294                let language = obj.get("language").and_then(|x| x.as_str()).unwrap_or("");
295                let value = obj.get("value").and_then(|x| x.as_str()).unwrap_or("");
296                parts.push(if !language.is_empty() {
297                    format!("```{}\n{}\n```", language, value)
298                } else {
299                    value.to_string()
300                });
301            }
302        }
303        return (parts.join("\n\n"), true);
304    }
305    if let Some(obj) = contents.as_object() {
306        if let Some(kind) = obj.get("kind").and_then(|x| x.as_str()) {
307            let value = obj.get("value").and_then(|x| x.as_str()).unwrap_or("");
308            return (value.to_string(), kind == "markdown");
309        }
310        if let Some(value) = obj.get("value").and_then(|x| x.as_str()) {
311            let language = obj.get("language").and_then(|x| x.as_str()).unwrap_or("");
312            return (
313                if !language.is_empty() {
314                    format!("```{}\n{}\n```", language, value)
315                } else {
316                    value.to_string()
317                },
318                true,
319            );
320        }
321    }
322    (String::new(), false)
323}
324
325fn map_document_symbol(v: &Value, file_path: &str) -> LspSymbolInfo {
326    let name = v.get("name").and_then(|x| x.as_str()).unwrap_or("").to_string();
327    let kind = v.get("kind").and_then(|x| x.as_u64()).unwrap_or(0) as u32;
328    let range = v
329        .get("range")
330        .or_else(|| v.get("selectionRange"))
331        .cloned()
332        .unwrap_or(Value::Null);
333    let start = range.get("start").cloned().unwrap_or(Value::Null);
334    let line = start.get("line").and_then(|x| x.as_i64()).unwrap_or(0);
335    let character = start.get("character").and_then(|x| x.as_i64()).unwrap_or(0);
336    let children = v
337        .get("children")
338        .and_then(|x| x.as_array())
339        .map(|arr| arr.iter().map(|c| map_document_symbol(c, file_path)).collect());
340    LspSymbolInfo {
341        name,
342        kind: kind_name(kind).to_string(),
343        path: file_path.to_string(),
344        line: from_lsp_line(line),
345        character: from_lsp_char(character),
346        container_name: None,
347        children,
348    }
349}
350
351fn map_symbol_information(v: &Value) -> LspSymbolInfo {
352    let name = v.get("name").and_then(|x| x.as_str()).unwrap_or("").to_string();
353    let kind = v.get("kind").and_then(|x| x.as_u64()).unwrap_or(0) as u32;
354    let loc = v.get("location").cloned().unwrap_or(Value::Null);
355    let uri = loc.get("uri").and_then(|x| x.as_str()).unwrap_or("");
356    let range = loc.get("range").cloned().unwrap_or(Value::Null);
357    let start = range.get("start").cloned().unwrap_or(Value::Null);
358    let line = start.get("line").and_then(|x| x.as_i64()).unwrap_or(0);
359    let character = start.get("character").and_then(|x| x.as_i64()).unwrap_or(0);
360    let container_name = v
361        .get("containerName")
362        .and_then(|x| x.as_str())
363        .map(|s| s.to_string());
364    LspSymbolInfo {
365        name,
366        kind: kind_name(kind).to_string(),
367        path: file_uri_to_path(uri),
368        line: from_lsp_line(line),
369        character: from_lsp_char(character),
370        container_name,
371        children: None,
372    }
373}
374
375#[async_trait]
376impl LspClient for SpawnLspClient {
377    async fn ensure_server(
378        &self,
379        language: &str,
380        root: &str,
381        profile: &LspServerProfile,
382    ) -> Result<ServerHandle, String> {
383        let key = Self::key(language, root);
384        {
385            let servers = self.servers.lock().await;
386            if let Some(entry) = servers.get(&key) {
387                let st = *entry.state.lock().await;
388                if st != ServerState::Crashed {
389                    return Ok(entry.handle().await);
390                }
391            }
392        }
393
394        let (cmd_str, args) = match profile.command.split_first() {
395            Some((c, rest)) => (c.clone(), rest.to_vec()),
396            None => {
397                return Err(format!("LSP profile '{}' has empty command", language));
398            }
399        };
400        let mut cmd = Command::new(&cmd_str);
401        cmd.args(&args);
402        cmd.current_dir(root);
403        cmd.stdin(Stdio::piped());
404        cmd.stdout(Stdio::piped());
405        cmd.stderr(Stdio::piped());
406        cmd.kill_on_drop(true);
407        let mut child = cmd.spawn().map_err(|e| e.to_string())?;
408        let stdout = child.stdout.take().ok_or_else(|| "no stdout".to_string())?;
409        let stdin = child.stdin.take().ok_or_else(|| "no stdin".to_string())?;
410
411        let pending: Arc<Mutex<HashMap<i64, oneshot::Sender<Result<Value, String>>>>> =
412            Arc::new(Mutex::new(HashMap::new()));
413        {
414            let pending_clone = Arc::clone(&pending);
415            tokio::spawn(read_loop(stdout, pending_clone));
416        }
417
418        let entry = Arc::new(ServerEntry {
419            language: language.to_string(),
420            root: root.to_string(),
421            state: Mutex::new(ServerState::Starting),
422            stdin: Mutex::new(stdin),
423            pending: Arc::clone(&pending),
424            next_id: AtomicI64::new(1),
425            opened_files: Mutex::new(HashMap::new()),
426            _child: Mutex::new(child),
427        });
428
429        {
430            let mut servers = self.servers.lock().await;
431            servers.insert(key.clone(), Arc::clone(&entry));
432        }
433
434        // initialize handshake
435        let root_uri = Url::from_file_path(Path::new(root))
436            .map(|u| u.to_string())
437            .unwrap_or_else(|_| format!("file://{}", root));
438        let init_params = serde_json::json!({
439            "processId": std::process::id(),
440            "rootUri": root_uri,
441            "workspaceFolders": [
442                { "uri": root_uri, "name": Path::new(root).file_name().and_then(|s| s.to_str()).unwrap_or("workspace") }
443            ],
444            "capabilities": {
445                "textDocument": {
446                    "hover": { "contentFormat": ["markdown", "plaintext"] },
447                    "definition": { "linkSupport": true },
448                    "references": {},
449                    "documentSymbol": { "hierarchicalDocumentSymbolSupport": true },
450                    "implementation": { "linkSupport": true },
451                },
452                "workspace": { "symbol": {} },
453            },
454            "initializationOptions": profile.initialization_options.clone().unwrap_or(Value::Null),
455        });
456        let (dummy_tx, _dummy_rx) = tokio::sync::watch::channel(false);
457        if let Err(e) =
458            send_request(&entry, "initialize", init_params, dummy_tx.subscribe()).await
459        {
460            let mut servers = self.servers.lock().await;
461            servers.remove(&key);
462            return Err(format!("initialize failed: {}", e));
463        }
464        let _ = send_notification(&entry, "initialized", serde_json::json!({})).await;
465
466        {
467            let mut state = entry.state.lock().await;
468            *state = ServerState::Ready;
469        }
470        Ok(entry.handle().await)
471    }
472
473    async fn hover(
474        &self,
475        handle: &ServerHandle,
476        path: &str,
477        pos: Position1,
478        cancel: CancelSignal,
479    ) -> Result<Option<LspHoverResult>, String> {
480        let entry = self.entry_for(handle).await?;
481        did_open_if_needed(&entry, path).await?;
482        let params = serde_json::json!({
483            "textDocument": { "uri": file_uri(path) },
484            "position": lsp_pos(pos),
485        });
486        let r = send_request(&entry, "textDocument/hover", params, cancel).await?;
487        if r.is_null() {
488            return Ok(None);
489        }
490        let contents = r.get("contents").cloned().unwrap_or(Value::Null);
491        let (text, is_md) = flatten_hover_contents(&contents);
492        if text.is_empty() {
493            return Ok(None);
494        }
495        Ok(Some(LspHoverResult {
496            contents: text,
497            is_markdown: is_md,
498        }))
499    }
500
501    async fn definition(
502        &self,
503        handle: &ServerHandle,
504        path: &str,
505        pos: Position1,
506        cancel: CancelSignal,
507    ) -> Result<Vec<LspLocation>, String> {
508        self.locations_like(handle, path, pos, cancel, "textDocument/definition").await
509    }
510
511    async fn references(
512        &self,
513        handle: &ServerHandle,
514        path: &str,
515        pos: Position1,
516        cancel: CancelSignal,
517    ) -> Result<Vec<LspLocation>, String> {
518        self.locations_like(handle, path, pos, cancel, "textDocument/references").await
519    }
520
521    async fn implementation(
522        &self,
523        handle: &ServerHandle,
524        path: &str,
525        pos: Position1,
526        cancel: CancelSignal,
527    ) -> Result<Vec<LspLocation>, String> {
528        self.locations_like(handle, path, pos, cancel, "textDocument/implementation").await
529    }
530
531    async fn document_symbol(
532        &self,
533        handle: &ServerHandle,
534        path: &str,
535        cancel: CancelSignal,
536    ) -> Result<Vec<LspSymbolInfo>, String> {
537        let entry = self.entry_for(handle).await?;
538        did_open_if_needed(&entry, path).await?;
539        let params = serde_json::json!({
540            "textDocument": { "uri": file_uri(path) },
541        });
542        let r = send_request(&entry, "textDocument/documentSymbol", params, cancel).await?;
543        let Some(arr) = r.as_array() else {
544            return Ok(Vec::new());
545        };
546        if arr.is_empty() {
547            return Ok(Vec::new());
548        }
549        let first = &arr[0];
550        if first.get("location").is_some() {
551            Ok(arr.iter().map(map_symbol_information).collect())
552        } else {
553            Ok(arr.iter().map(|v| map_document_symbol(v, path)).collect())
554        }
555    }
556
557    async fn workspace_symbol(
558        &self,
559        handle: &ServerHandle,
560        query: &str,
561        cancel: CancelSignal,
562    ) -> Result<Vec<LspSymbolInfo>, String> {
563        let entry = self.entry_for(handle).await?;
564        let params = serde_json::json!({ "query": query });
565        let r = send_request(&entry, "workspace/symbol", params, cancel).await?;
566        let Some(arr) = r.as_array() else {
567            return Ok(Vec::new());
568        };
569        Ok(arr.iter().map(map_symbol_information).collect())
570    }
571
572    async fn close_session(&self) {
573        let mut servers = self.servers.lock().await;
574        for (_, entry) in servers.drain() {
575            // Best-effort shutdown.
576            let (dummy_tx, _) = tokio::sync::watch::channel(false);
577            let _ = send_request(
578                &entry,
579                "shutdown",
580                Value::Null,
581                dummy_tx.subscribe(),
582            )
583            .await;
584            let _ = send_notification(&entry, "exit", Value::Null).await;
585        }
586    }
587}
588
589impl SpawnLspClient {
590    async fn entry_for(&self, handle: &ServerHandle) -> Result<Arc<ServerEntry>, String> {
591        let key = Self::key(&handle.language, &handle.root);
592        let servers = self.servers.lock().await;
593        servers
594            .get(&key)
595            .cloned()
596            .ok_or_else(|| format!("no server entry for {}", key))
597    }
598
599    async fn locations_like(
600        &self,
601        handle: &ServerHandle,
602        path: &str,
603        pos: Position1,
604        cancel: CancelSignal,
605        method: &str,
606    ) -> Result<Vec<LspLocation>, String> {
607        let entry = self.entry_for(handle).await?;
608        did_open_if_needed(&entry, path).await?;
609        let mut params = serde_json::json!({
610            "textDocument": { "uri": file_uri(path) },
611            "position": lsp_pos(pos),
612        });
613        if method == "textDocument/references" {
614            params["context"] = serde_json::json!({ "includeDeclaration": true });
615        }
616        let r = send_request(&entry, method, params, cancel).await?;
617        if r.is_null() {
618            return Ok(Vec::new());
619        }
620        let items: Vec<Value> = if let Some(arr) = r.as_array() {
621            arr.clone()
622        } else {
623            vec![r]
624        };
625        let mut out: Vec<LspLocation> = Vec::new();
626        for item in &items {
627            if let Some(loc) = normalize_location(item).await {
628                out.push(loc);
629            }
630        }
631        Ok(out)
632    }
633}