hematite/agent/lsp/
client.rs1use serde::Serialize;
2use serde_json::{json, Value};
3use std::collections::HashMap;
4use std::process::Stdio;
5use std::sync::Arc;
6use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader};
7use tokio::process::{Child, Command};
8use tokio::sync::{oneshot, Mutex};
9
10#[derive(Serialize)]
12pub struct LspRequest {
13 pub jsonrpc: String,
14 pub id: u64,
15 pub method: String,
16 pub params: Value,
17}
18
19pub struct LspClient {
21 #[allow(dead_code)]
22 child: Child,
23 stdin: Arc<Mutex<tokio::process::ChildStdin>>,
24 pending_requests: Arc<Mutex<HashMap<u64, oneshot::Sender<Result<Value, String>>>>>,
25 pub next_id: Arc<std::sync::atomic::AtomicU64>,
26 pub diagnostics: Arc<Mutex<HashMap<String, Value>>>,
28}
29
30impl LspClient {
31 pub fn spawn(
32 command: &str,
33 args: &[String],
34 ) -> Result<Self, Box<dyn std::error::Error + Send + Sync>> {
35 let mut child = Command::new(command)
36 .args(args)
37 .stdin(Stdio::piped())
38 .stdout(Stdio::piped())
39 .stderr(Stdio::inherit())
40 .spawn()?;
41
42 let stdin = child.stdin.take().ok_or("Failed to open stdin")?;
43 let stdout = child.stdout.take().ok_or("Failed to open stdout")?;
44
45 let pending_requests: Arc<Mutex<HashMap<u64, oneshot::Sender<Result<Value, String>>>>> =
46 Arc::new(Mutex::new(HashMap::new()));
47 let next_id = Arc::new(std::sync::atomic::AtomicU64::new(1));
48 let diagnostics: Arc<Mutex<HashMap<String, Value>>> = Arc::new(Mutex::new(HashMap::new()));
49
50 let pending_requests_clone = pending_requests.clone();
51 let diagnostics_clone = diagnostics.clone();
52
53 tokio::spawn(async move {
55 let mut reader = BufReader::new(stdout);
56 let mut line = String::new();
57
58 loop {
59 line.clear();
60 let n = match reader.read_line(&mut line).await {
61 Ok(n) => n,
62 Err(_) => break,
63 };
64 if n == 0 {
65 break;
66 }
67
68 if line.starts_with("Content-Length: ") {
69 let len: usize = line["Content-Length: ".len()..].trim().parse().unwrap_or(0);
70 line.clear();
71 let _ = reader.read_line(&mut line).await;
72
73 let mut body = vec![0u8; len];
74 if let Err(_) = reader.read_exact(&mut body).await {
75 break;
76 }
77
78 if let Ok(json_body) = serde_json::from_slice::<Value>(&body) {
79 if let Some(id) = json_body.get("id").and_then(|v| v.as_u64()) {
80 let mut pending = pending_requests_clone.lock().await;
81 if let Some(tx) = pending.remove(&id) {
82 if let Some(err) = json_body.get("error") {
83 let _ = tx.send(Err(err.to_string()));
84 } else {
85 let result =
86 json_body.get("result").cloned().unwrap_or(Value::Null);
87 let _ = tx.send(Ok(result));
88 }
89 }
90 } else if let Some(method) =
91 json_body.get("method").and_then(|v| v.as_str())
92 {
93 if method == "textDocument/publishDiagnostics" {
95 if let Some(params) = json_body.get("params") {
96 if let Some(uri) = params.get("uri").and_then(|v| v.as_str()) {
97 let mut diags = diagnostics_clone.lock().await;
98 diags.insert(
99 uri.to_string(),
100 params
101 .get("diagnostics")
102 .cloned()
103 .unwrap_or(Value::Null),
104 );
105 }
106 }
107 }
108 }
109 }
110 }
111 }
112 });
113
114 Ok(Self {
115 child,
116 stdin: Arc::new(Mutex::new(stdin)),
117 pending_requests,
118 next_id,
119 diagnostics,
120 })
121 }
122
123 pub async fn call(&self, method: &str, params: Value) -> Result<Value, String> {
124 let id = self
125 .next_id
126 .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
127 let (tx, rx) = oneshot::channel();
128
129 {
130 let mut pending = self.pending_requests.lock().await;
131 pending.insert(id, tx);
132 }
133
134 let request = LspRequest {
135 jsonrpc: "2.0".to_string(),
136 id,
137 method: method.to_string(),
138 params,
139 };
140
141 let body = serde_json::to_string(&request).map_err(|e| e.to_string())?;
142 let header = format!("Content-Length: {}\r\n\r\n", body.len());
143
144 {
145 let mut stdin = self.stdin.lock().await;
146 if let Err(e) = stdin.write_all(header.as_bytes()).await {
147 return Err(format!("LSP Stdin Header Fail: {}", e));
148 }
149 if let Err(e) = stdin.write_all(body.as_bytes()).await {
150 return Err(format!("LSP Stdin Body Fail: {}", e));
151 }
152 if let Err(e) = stdin.flush().await {
153 return Err(format!("LSP Stdin Flush Fail: {}", e));
154 }
155 }
156
157 rx.await
158 .map_err(|_| "LSP Response Channel Closed".to_string())?
159 }
160
161 pub async fn notify(&self, method: &str, params: Value) -> Result<(), String> {
163 let notification = json!({
164 "jsonrpc": "2.0",
165 "method": method,
166 "params": params,
167 });
168
169 let body = serde_json::to_string(¬ification).map_err(|e| e.to_string())?;
170 let header = format!("Content-Length: {}\r\n\r\n", body.len());
171
172 {
173 let mut stdin = self.stdin.lock().await;
174 let _ = stdin.write_all(header.as_bytes()).await;
175 let _ = stdin.write_all(body.as_bytes()).await;
176 let _ = stdin.flush().await;
177 }
178 Ok(())
179 }
180}