hematite/agent/lsp/
client.rs1use serde::Serialize;
2use serde_json::{json, Value};
3use std::collections::HashMap;
4use std::process::Stdio;
5use std::sync::Arc;
6use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader};
7use tokio::process::{Child, Command};
8use tokio::sync::{oneshot, Mutex};
9
10#[derive(Serialize)]
12pub struct LspRequest {
13 pub jsonrpc: String,
14 pub id: u64,
15 pub method: String,
16 pub params: Value,
17}
18
19type PendingRequests = Arc<Mutex<HashMap<u64, oneshot::Sender<Result<Value, String>>>>>;
20
21pub struct LspClient {
23 #[allow(dead_code)]
24 child: Child,
25 stdin: Arc<Mutex<tokio::process::ChildStdin>>,
26 pending_requests: PendingRequests,
27 pub next_id: Arc<std::sync::atomic::AtomicU64>,
28 pub diagnostics: Arc<Mutex<HashMap<String, Value>>>,
30}
31
32impl LspClient {
33 pub fn spawn(
34 command: &str,
35 args: &[String],
36 ) -> Result<Self, Box<dyn std::error::Error + Send + Sync>> {
37 let mut child = Command::new(command)
38 .args(args)
39 .stdin(Stdio::piped())
40 .stdout(Stdio::piped())
41 .stderr(Stdio::inherit())
42 .spawn()?;
43
44 let stdin = child.stdin.take().ok_or("Failed to open stdin")?;
45 let stdout = child.stdout.take().ok_or("Failed to open stdout")?;
46
47 let pending_requests: PendingRequests = Arc::new(Mutex::new(HashMap::new()));
48 let next_id = Arc::new(std::sync::atomic::AtomicU64::new(1));
49 let diagnostics: Arc<Mutex<HashMap<String, Value>>> = Arc::new(Mutex::new(HashMap::new()));
50
51 let pending_requests_clone = pending_requests.clone();
52 let diagnostics_clone = diagnostics.clone();
53
54 tokio::spawn(async move {
56 let mut reader = BufReader::new(stdout);
57 let mut line = String::new();
58
59 loop {
60 line.clear();
61 let n = match reader.read_line(&mut line).await {
62 Ok(n) => n,
63 Err(_) => break,
64 };
65 if n == 0 {
66 break;
67 }
68
69 if line.starts_with("Content-Length: ") {
70 let len: usize = line["Content-Length: ".len()..].trim().parse().unwrap_or(0);
71 line.clear();
72 let _ = reader.read_line(&mut line).await;
73
74 let mut body = vec![0u8; len];
75 if (reader.read_exact(&mut body).await).is_err() {
76 break;
77 }
78
79 if let Ok(json_body) = serde_json::from_slice::<Value>(&body) {
80 if let Some(id) = json_body.get("id").and_then(|v| v.as_u64()) {
81 let mut pending = pending_requests_clone.lock().await;
82 if let Some(tx) = pending.remove(&id) {
83 if let Some(err) = json_body.get("error") {
84 let _ = tx.send(Err(err.to_string()));
85 } else {
86 let result =
87 json_body.get("result").cloned().unwrap_or(Value::Null);
88 let _ = tx.send(Ok(result));
89 }
90 }
91 } else if let Some(method) =
92 json_body.get("method").and_then(|v| v.as_str())
93 {
94 if method == "textDocument/publishDiagnostics" {
96 if let Some(params) = json_body.get("params") {
97 if let Some(uri) = params.get("uri").and_then(|v| v.as_str()) {
98 let mut diags = diagnostics_clone.lock().await;
99 diags.insert(
100 uri.to_string(),
101 params
102 .get("diagnostics")
103 .cloned()
104 .unwrap_or(Value::Null),
105 );
106 }
107 }
108 }
109 }
110 }
111 }
112 }
113 });
114
115 Ok(Self {
116 child,
117 stdin: Arc::new(Mutex::new(stdin)),
118 pending_requests,
119 next_id,
120 diagnostics,
121 })
122 }
123
124 pub async fn call(&self, method: &str, params: Value) -> Result<Value, String> {
125 let id = self
126 .next_id
127 .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
128 let (tx, rx) = oneshot::channel();
129
130 {
131 let mut pending = self.pending_requests.lock().await;
132 pending.insert(id, tx);
133 }
134
135 let request = LspRequest {
136 jsonrpc: "2.0".to_string(),
137 id,
138 method: method.to_string(),
139 params,
140 };
141
142 let body = serde_json::to_string(&request).map_err(|e| e.to_string())?;
143 let header = format!("Content-Length: {}\r\n\r\n", body.len());
144
145 {
146 let mut stdin = self.stdin.lock().await;
147 if let Err(e) = stdin.write_all(header.as_bytes()).await {
148 return Err(format!("LSP Stdin Header Fail: {}", e));
149 }
150 if let Err(e) = stdin.write_all(body.as_bytes()).await {
151 return Err(format!("LSP Stdin Body Fail: {}", e));
152 }
153 if let Err(e) = stdin.flush().await {
154 return Err(format!("LSP Stdin Flush Fail: {}", e));
155 }
156 }
157
158 rx.await
159 .map_err(|_| "LSP Response Channel Closed".to_string())?
160 }
161
162 pub async fn notify(&self, method: &str, params: Value) -> Result<(), String> {
164 let notification = json!({
165 "jsonrpc": "2.0",
166 "method": method,
167 "params": params,
168 });
169
170 let body = serde_json::to_string(¬ification).map_err(|e| e.to_string())?;
171 let header = format!("Content-Length: {}\r\n\r\n", body.len());
172
173 {
174 let mut stdin = self.stdin.lock().await;
175 let _ = stdin.write_all(header.as_bytes()).await;
176 let _ = stdin.write_all(body.as_bytes()).await;
177 let _ = stdin.flush().await;
178 }
179 Ok(())
180 }
181}