1use async_trait::async_trait;
5use serde::Serialize;
6use serde_json::Value;
7use std::collections::HashMap;
8use std::path::Path;
9use std::process::Stdio;
10use std::sync::atomic::{AtomicI64, Ordering};
11use std::sync::Arc;
12use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader};
13use tokio::process::{Child, ChildStdin, Command};
14use tokio::sync::{oneshot, Mutex};
15use url::Url;
16
17use crate::constants::kind_name;
18use crate::types::{
19 CancelSignal, LspClient, LspHoverResult, LspLocation, LspServerProfile, LspSymbolInfo,
20 Position1, ServerHandle, ServerState,
21};
22
/// Bookkeeping for one spawned language-server process.
struct ServerEntry {
    /// Language this server was started for; also sent as `languageId` in `didOpen`.
    language: String,
    /// Workspace root the server was started with.
    root: String,
    /// Current lifecycle state; copied into every `ServerHandle` that is handed out.
    state: Mutex<ServerState>,
    /// The child's stdin. All outgoing messages are written while holding this lock.
    stdin: Mutex<ChildStdin>,
    /// In-flight requests keyed by JSON-RPC id; shared with the stdout reader task.
    pending: Arc<Mutex<HashMap<i64, oneshot::Sender<Result<Value, String>>>>>,
    /// Source of JSON-RPC request ids.
    next_id: AtomicI64,
    /// Paths for which `textDocument/didOpen` was already sent (the value is always `true`).
    opened_files: Mutex<HashMap<String, bool>>,
    /// Owns the child process handle for as long as the entry exists.
    _child: Mutex<Child>,
}
33
34impl ServerEntry {
35 async fn handle(&self) -> ServerHandle {
36 ServerHandle {
37 language: self.language.clone(),
38 root: self.root.clone(),
39 state: *self.state.lock().await,
40 }
41 }
42}
43
/// `LspClient` implementation that runs language servers as spawned child processes.
pub struct SpawnLspClient {
    /// Known servers, keyed by the `"<language>|<root>"` string built by `key`.
    servers: Mutex<HashMap<String, Arc<ServerEntry>>>,
}
47
48impl SpawnLspClient {
49 pub fn new() -> Self {
50 Self {
51 servers: Mutex::new(HashMap::new()),
52 }
53 }
54
55 fn key(language: &str, root: &str) -> String {
56 format!("{}|{}", language, root)
57 }
58}
59
60impl Default for SpawnLspClient {
61 fn default() -> Self {
62 Self::new()
63 }
64}
65
66async fn send_request(
67 entry: &ServerEntry,
68 method: &str,
69 params: Value,
70 cancel: CancelSignal,
71) -> Result<Value, String> {
72 let id = entry.next_id.fetch_add(1, Ordering::Relaxed);
73 let (tx, rx) = oneshot::channel();
74 {
75 let mut pending = entry.pending.lock().await;
76 pending.insert(id, tx);
77 }
78 let msg = serde_json::json!({
79 "jsonrpc": "2.0",
80 "id": id,
81 "method": method,
82 "params": params,
83 });
84 write_message(entry, &msg).await?;
85
86 let mut cancel_rx = cancel.clone();
87 tokio::select! {
88 res = rx => res.map_err(|_| "request dropped".to_string())?,
89 _ = cancel_rx.changed() => {
90 if *cancel_rx.borrow() {
91 let _ = write_message(entry, &serde_json::json!({
93 "jsonrpc": "2.0",
94 "method": "$/cancelRequest",
95 "params": { "id": id },
96 })).await;
97 let mut pending = entry.pending.lock().await;
98 pending.remove(&id);
99 return Err("aborted".to_string());
100 }
101 Err("cancel channel closed".to_string())
102 }
103 }
104}
105
106async fn send_notification(
107 entry: &ServerEntry,
108 method: &str,
109 params: Value,
110) -> Result<(), String> {
111 let msg = serde_json::json!({
112 "jsonrpc": "2.0",
113 "method": method,
114 "params": params,
115 });
116 write_message(entry, &msg).await
117}
118
119async fn write_message<T: Serialize>(entry: &ServerEntry, msg: &T) -> Result<(), String> {
120 let body = serde_json::to_vec(msg).map_err(|e| e.to_string())?;
121 let header = format!("Content-Length: {}\r\n\r\n", body.len());
122 let mut stdin = entry.stdin.lock().await;
123 stdin
124 .write_all(header.as_bytes())
125 .await
126 .map_err(|e| e.to_string())?;
127 stdin.write_all(&body).await.map_err(|e| e.to_string())?;
128 stdin.flush().await.map_err(|e| e.to_string())?;
129 Ok(())
130}
131
132async fn read_loop<R: tokio::io::AsyncRead + Unpin>(
133 reader: R,
134 pending: Arc<Mutex<HashMap<i64, oneshot::Sender<Result<Value, String>>>>>,
135) {
136 let mut reader = BufReader::new(reader);
137 loop {
138 let mut content_length: Option<usize> = None;
140 loop {
141 let mut line = String::new();
142 let n = match reader.read_line(&mut line).await {
143 Ok(n) => n,
144 Err(_) => return,
145 };
146 if n == 0 {
147 return; }
149 let trimmed = line.trim_end_matches(&['\r', '\n'][..]);
150 if trimmed.is_empty() {
151 break;
152 }
153 if let Some(v) = trimmed
154 .to_ascii_lowercase()
155 .strip_prefix("content-length:")
156 {
157 if let Ok(n) = v.trim().parse::<usize>() {
158 content_length = Some(n);
159 }
160 }
161 }
162 let Some(len) = content_length else {
163 continue;
164 };
165 let mut buf = vec![0u8; len];
166 if reader.read_exact(&mut buf).await.is_err() {
167 return;
168 }
169 let value: Value = match serde_json::from_slice(&buf) {
170 Ok(v) => v,
171 Err(_) => continue,
172 };
173 if let Some(id_raw) = value.get("id") {
175 if let Some(id) = id_raw.as_i64() {
176 let tx_opt = {
177 let mut p = pending.lock().await;
178 p.remove(&id)
179 };
180 if let Some(tx) = tx_opt {
181 if let Some(err) = value.get("error") {
182 let msg = err.get("message").and_then(|m| m.as_str()).unwrap_or("error");
183 let _ = tx.send(Err(msg.to_string()));
184 } else {
185 let r = value.get("result").cloned().unwrap_or(Value::Null);
186 let _ = tx.send(Ok(r));
187 }
188 }
189 }
190 }
191 }
192}
193
194fn file_uri(p: &str) -> String {
195 Url::from_file_path(Path::new(p))
196 .map(|u| u.to_string())
197 .unwrap_or_else(|_| format!("file://{}", p))
198}
199
/// Converts a `file://` URI back into a filesystem path string.
///
/// The `file://` prefix is removed and percent-escapes (`%20`, `%C3%A9`, ...)
/// are decoded, so a path containing spaces or non-ASCII characters round-trips
/// instead of coming back with the escapes still in it. A `%` that is not
/// followed by two hex digits is kept literally. If decoding would not yield
/// valid UTF-8, the undecoded remainder is returned. Input without the
/// `file://` prefix is returned unchanged.
fn file_uri_to_path(uri: &str) -> String {
    /// Value of one ASCII hex digit, or `None` if `b` is not a hex digit.
    fn hex(b: u8) -> Option<u8> {
        (b as char).to_digit(16).map(|d| d as u8)
    }

    let Some(rest) = uri.strip_prefix("file://") else {
        return uri.to_string();
    };
    let raw = rest.as_bytes();
    let mut decoded = Vec::with_capacity(raw.len());
    let mut i = 0;
    while i < raw.len() {
        let escape = match (raw[i], raw.get(i + 1), raw.get(i + 2)) {
            (b'%', Some(&hi), Some(&lo)) => hex(hi).zip(hex(lo)),
            _ => None,
        };
        match escape {
            Some((hi, lo)) => {
                decoded.push(hi * 16 + lo);
                i += 3;
            }
            None => {
                decoded.push(raw[i]);
                i += 1;
            }
        }
    }
    String::from_utf8(decoded).unwrap_or_else(|_| rest.to_string())
}
207
208fn lsp_pos(p: Position1) -> Value {
209 serde_json::json!({
210 "line": p.line.saturating_sub(1),
211 "character": p.character.saturating_sub(1),
212 })
213}
214
/// Converts a 0-based LSP line number into a 1-based line number.
///
/// Negative input yields 1. Values that do not fit in `u32` after the shift
/// are clamped to `u32::MAX` instead of overflowing or wrapping.
fn from_lsp_line(n: i64) -> u32 {
    u32::try_from(n.saturating_add(1).max(1)).unwrap_or(u32::MAX)
}
218
/// Converts a 0-based LSP character offset into a 1-based column.
///
/// Negative input yields 1. Values that do not fit in `u32` after the shift
/// are clamped to `u32::MAX` instead of overflowing or wrapping.
fn from_lsp_char(n: i64) -> u32 {
    u32::try_from(n.saturating_add(1).max(1)).unwrap_or(u32::MAX)
}
222
223async fn did_open_if_needed(entry: &ServerEntry, file_path: &str) -> Result<(), String> {
224 {
225 let opened = entry.opened_files.lock().await;
226 if opened.contains_key(file_path) {
227 return Ok(());
228 }
229 }
230 let text = tokio::fs::read_to_string(file_path)
231 .await
232 .unwrap_or_default();
233 let uri = file_uri(file_path);
234 let language_id = entry.language.clone();
235 send_notification(
236 entry,
237 "textDocument/didOpen",
238 serde_json::json!({
239 "textDocument": {
240 "uri": uri,
241 "languageId": language_id,
242 "version": 1,
243 "text": text,
244 }
245 }),
246 )
247 .await?;
248 let mut opened = entry.opened_files.lock().await;
249 opened.insert(file_path.to_string(), true);
250 Ok(())
251}
252
253async fn preview_line_at(path: &str, zero_indexed_line: i64) -> String {
254 let text = tokio::fs::read_to_string(path).await.unwrap_or_default();
255 let lines: Vec<&str> = text.lines().collect();
256 lines
257 .get(zero_indexed_line.max(0) as usize)
258 .map(|s| s.trim().to_string())
259 .unwrap_or_default()
260}
261
262async fn normalize_location(v: &Value) -> Option<LspLocation> {
263 let uri = v
264 .get("uri")
265 .and_then(|x| x.as_str())
266 .or_else(|| v.get("targetUri").and_then(|x| x.as_str()))?;
267 let range = v
268 .get("range")
269 .or_else(|| v.get("targetSelectionRange"))
270 .or_else(|| v.get("targetRange"))?;
271 let start = range.get("start")?;
272 let line = start.get("line")?.as_i64()?;
273 let character = start.get("character")?.as_i64()?;
274 let path = file_uri_to_path(uri);
275 let preview = preview_line_at(&path, line).await;
276 Some(LspLocation {
277 path,
278 line: from_lsp_line(line),
279 character: from_lsp_char(character),
280 preview,
281 })
282}
283
284fn flatten_hover_contents(contents: &Value) -> (String, bool) {
285 if let Some(s) = contents.as_str() {
286 return (s.to_string(), false);
287 }
288 if let Some(arr) = contents.as_array() {
289 let mut parts: Vec<String> = Vec::new();
290 for c in arr {
291 if let Some(s) = c.as_str() {
292 parts.push(s.to_string());
293 } else if let Some(obj) = c.as_object() {
294 let language = obj.get("language").and_then(|x| x.as_str()).unwrap_or("");
295 let value = obj.get("value").and_then(|x| x.as_str()).unwrap_or("");
296 parts.push(if !language.is_empty() {
297 format!("```{}\n{}\n```", language, value)
298 } else {
299 value.to_string()
300 });
301 }
302 }
303 return (parts.join("\n\n"), true);
304 }
305 if let Some(obj) = contents.as_object() {
306 if let Some(kind) = obj.get("kind").and_then(|x| x.as_str()) {
307 let value = obj.get("value").and_then(|x| x.as_str()).unwrap_or("");
308 return (value.to_string(), kind == "markdown");
309 }
310 if let Some(value) = obj.get("value").and_then(|x| x.as_str()) {
311 let language = obj.get("language").and_then(|x| x.as_str()).unwrap_or("");
312 return (
313 if !language.is_empty() {
314 format!("```{}\n{}\n```", language, value)
315 } else {
316 value.to_string()
317 },
318 true,
319 );
320 }
321 }
322 (String::new(), false)
323}
324
325fn map_document_symbol(v: &Value, file_path: &str) -> LspSymbolInfo {
326 let name = v.get("name").and_then(|x| x.as_str()).unwrap_or("").to_string();
327 let kind = v.get("kind").and_then(|x| x.as_u64()).unwrap_or(0) as u32;
328 let range = v
329 .get("range")
330 .or_else(|| v.get("selectionRange"))
331 .cloned()
332 .unwrap_or(Value::Null);
333 let start = range.get("start").cloned().unwrap_or(Value::Null);
334 let line = start.get("line").and_then(|x| x.as_i64()).unwrap_or(0);
335 let character = start.get("character").and_then(|x| x.as_i64()).unwrap_or(0);
336 let children = v
337 .get("children")
338 .and_then(|x| x.as_array())
339 .map(|arr| arr.iter().map(|c| map_document_symbol(c, file_path)).collect());
340 LspSymbolInfo {
341 name,
342 kind: kind_name(kind).to_string(),
343 path: file_path.to_string(),
344 line: from_lsp_line(line),
345 character: from_lsp_char(character),
346 container_name: None,
347 children,
348 }
349}
350
351fn map_symbol_information(v: &Value) -> LspSymbolInfo {
352 let name = v.get("name").and_then(|x| x.as_str()).unwrap_or("").to_string();
353 let kind = v.get("kind").and_then(|x| x.as_u64()).unwrap_or(0) as u32;
354 let loc = v.get("location").cloned().unwrap_or(Value::Null);
355 let uri = loc.get("uri").and_then(|x| x.as_str()).unwrap_or("");
356 let range = loc.get("range").cloned().unwrap_or(Value::Null);
357 let start = range.get("start").cloned().unwrap_or(Value::Null);
358 let line = start.get("line").and_then(|x| x.as_i64()).unwrap_or(0);
359 let character = start.get("character").and_then(|x| x.as_i64()).unwrap_or(0);
360 let container_name = v
361 .get("containerName")
362 .and_then(|x| x.as_str())
363 .map(|s| s.to_string());
364 LspSymbolInfo {
365 name,
366 kind: kind_name(kind).to_string(),
367 path: file_uri_to_path(uri),
368 line: from_lsp_line(line),
369 character: from_lsp_char(character),
370 container_name,
371 children: None,
372 }
373}
374
375#[async_trait]
376impl LspClient for SpawnLspClient {
377 async fn ensure_server(
378 &self,
379 language: &str,
380 root: &str,
381 profile: &LspServerProfile,
382 ) -> Result<ServerHandle, String> {
383 let key = Self::key(language, root);
384 {
385 let servers = self.servers.lock().await;
386 if let Some(entry) = servers.get(&key) {
387 let st = *entry.state.lock().await;
388 if st != ServerState::Crashed {
389 return Ok(entry.handle().await);
390 }
391 }
392 }
393
394 let (cmd_str, args) = match profile.command.split_first() {
395 Some((c, rest)) => (c.clone(), rest.to_vec()),
396 None => {
397 return Err(format!("LSP profile '{}' has empty command", language));
398 }
399 };
400 let mut cmd = Command::new(&cmd_str);
401 cmd.args(&args);
402 cmd.current_dir(root);
403 cmd.stdin(Stdio::piped());
404 cmd.stdout(Stdio::piped());
405 cmd.stderr(Stdio::piped());
406 cmd.kill_on_drop(true);
407 let mut child = cmd.spawn().map_err(|e| e.to_string())?;
408 let stdout = child.stdout.take().ok_or_else(|| "no stdout".to_string())?;
409 let stdin = child.stdin.take().ok_or_else(|| "no stdin".to_string())?;
410
411 let pending: Arc<Mutex<HashMap<i64, oneshot::Sender<Result<Value, String>>>>> =
412 Arc::new(Mutex::new(HashMap::new()));
413 {
414 let pending_clone = Arc::clone(&pending);
415 tokio::spawn(read_loop(stdout, pending_clone));
416 }
417
418 let entry = Arc::new(ServerEntry {
419 language: language.to_string(),
420 root: root.to_string(),
421 state: Mutex::new(ServerState::Starting),
422 stdin: Mutex::new(stdin),
423 pending: Arc::clone(&pending),
424 next_id: AtomicI64::new(1),
425 opened_files: Mutex::new(HashMap::new()),
426 _child: Mutex::new(child),
427 });
428
429 {
430 let mut servers = self.servers.lock().await;
431 servers.insert(key.clone(), Arc::clone(&entry));
432 }
433
434 let root_uri = Url::from_file_path(Path::new(root))
436 .map(|u| u.to_string())
437 .unwrap_or_else(|_| format!("file://{}", root));
438 let init_params = serde_json::json!({
439 "processId": std::process::id(),
440 "rootUri": root_uri,
441 "workspaceFolders": [
442 { "uri": root_uri, "name": Path::new(root).file_name().and_then(|s| s.to_str()).unwrap_or("workspace") }
443 ],
444 "capabilities": {
445 "textDocument": {
446 "hover": { "contentFormat": ["markdown", "plaintext"] },
447 "definition": { "linkSupport": true },
448 "references": {},
449 "documentSymbol": { "hierarchicalDocumentSymbolSupport": true },
450 "implementation": { "linkSupport": true },
451 },
452 "workspace": { "symbol": {} },
453 },
454 "initializationOptions": profile.initialization_options.clone().unwrap_or(Value::Null),
455 });
456 let (dummy_tx, _dummy_rx) = tokio::sync::watch::channel(false);
457 if let Err(e) =
458 send_request(&entry, "initialize", init_params, dummy_tx.subscribe()).await
459 {
460 let mut servers = self.servers.lock().await;
461 servers.remove(&key);
462 return Err(format!("initialize failed: {}", e));
463 }
464 let _ = send_notification(&entry, "initialized", serde_json::json!({})).await;
465
466 {
467 let mut state = entry.state.lock().await;
468 *state = ServerState::Ready;
469 }
470 Ok(entry.handle().await)
471 }
472
473 async fn hover(
474 &self,
475 handle: &ServerHandle,
476 path: &str,
477 pos: Position1,
478 cancel: CancelSignal,
479 ) -> Result<Option<LspHoverResult>, String> {
480 let entry = self.entry_for(handle).await?;
481 did_open_if_needed(&entry, path).await?;
482 let params = serde_json::json!({
483 "textDocument": { "uri": file_uri(path) },
484 "position": lsp_pos(pos),
485 });
486 let r = send_request(&entry, "textDocument/hover", params, cancel).await?;
487 if r.is_null() {
488 return Ok(None);
489 }
490 let contents = r.get("contents").cloned().unwrap_or(Value::Null);
491 let (text, is_md) = flatten_hover_contents(&contents);
492 if text.is_empty() {
493 return Ok(None);
494 }
495 Ok(Some(LspHoverResult {
496 contents: text,
497 is_markdown: is_md,
498 }))
499 }
500
501 async fn definition(
502 &self,
503 handle: &ServerHandle,
504 path: &str,
505 pos: Position1,
506 cancel: CancelSignal,
507 ) -> Result<Vec<LspLocation>, String> {
508 self.locations_like(handle, path, pos, cancel, "textDocument/definition").await
509 }
510
511 async fn references(
512 &self,
513 handle: &ServerHandle,
514 path: &str,
515 pos: Position1,
516 cancel: CancelSignal,
517 ) -> Result<Vec<LspLocation>, String> {
518 self.locations_like(handle, path, pos, cancel, "textDocument/references").await
519 }
520
521 async fn implementation(
522 &self,
523 handle: &ServerHandle,
524 path: &str,
525 pos: Position1,
526 cancel: CancelSignal,
527 ) -> Result<Vec<LspLocation>, String> {
528 self.locations_like(handle, path, pos, cancel, "textDocument/implementation").await
529 }
530
531 async fn document_symbol(
532 &self,
533 handle: &ServerHandle,
534 path: &str,
535 cancel: CancelSignal,
536 ) -> Result<Vec<LspSymbolInfo>, String> {
537 let entry = self.entry_for(handle).await?;
538 did_open_if_needed(&entry, path).await?;
539 let params = serde_json::json!({
540 "textDocument": { "uri": file_uri(path) },
541 });
542 let r = send_request(&entry, "textDocument/documentSymbol", params, cancel).await?;
543 let Some(arr) = r.as_array() else {
544 return Ok(Vec::new());
545 };
546 if arr.is_empty() {
547 return Ok(Vec::new());
548 }
549 let first = &arr[0];
550 if first.get("location").is_some() {
551 Ok(arr.iter().map(map_symbol_information).collect())
552 } else {
553 Ok(arr.iter().map(|v| map_document_symbol(v, path)).collect())
554 }
555 }
556
557 async fn workspace_symbol(
558 &self,
559 handle: &ServerHandle,
560 query: &str,
561 cancel: CancelSignal,
562 ) -> Result<Vec<LspSymbolInfo>, String> {
563 let entry = self.entry_for(handle).await?;
564 let params = serde_json::json!({ "query": query });
565 let r = send_request(&entry, "workspace/symbol", params, cancel).await?;
566 let Some(arr) = r.as_array() else {
567 return Ok(Vec::new());
568 };
569 Ok(arr.iter().map(map_symbol_information).collect())
570 }
571
572 async fn close_session(&self) {
573 let mut servers = self.servers.lock().await;
574 for (_, entry) in servers.drain() {
575 let (dummy_tx, _) = tokio::sync::watch::channel(false);
577 let _ = send_request(
578 &entry,
579 "shutdown",
580 Value::Null,
581 dummy_tx.subscribe(),
582 )
583 .await;
584 let _ = send_notification(&entry, "exit", Value::Null).await;
585 }
586 }
587}
588
589impl SpawnLspClient {
590 async fn entry_for(&self, handle: &ServerHandle) -> Result<Arc<ServerEntry>, String> {
591 let key = Self::key(&handle.language, &handle.root);
592 let servers = self.servers.lock().await;
593 servers
594 .get(&key)
595 .cloned()
596 .ok_or_else(|| format!("no server entry for {}", key))
597 }
598
599 async fn locations_like(
600 &self,
601 handle: &ServerHandle,
602 path: &str,
603 pos: Position1,
604 cancel: CancelSignal,
605 method: &str,
606 ) -> Result<Vec<LspLocation>, String> {
607 let entry = self.entry_for(handle).await?;
608 did_open_if_needed(&entry, path).await?;
609 let mut params = serde_json::json!({
610 "textDocument": { "uri": file_uri(path) },
611 "position": lsp_pos(pos),
612 });
613 if method == "textDocument/references" {
614 params["context"] = serde_json::json!({ "includeDeclaration": true });
615 }
616 let r = send_request(&entry, method, params, cancel).await?;
617 if r.is_null() {
618 return Ok(Vec::new());
619 }
620 let items: Vec<Value> = if let Some(arr) = r.as_array() {
621 arr.clone()
622 } else {
623 vec![r]
624 };
625 let mut out: Vec<LspLocation> = Vec::new();
626 for item in &items {
627 if let Some(loc) = normalize_location(item).await {
628 out.push(loc);
629 }
630 }
631 Ok(out)
632 }
633}