Skip to main content

hematite/agent/lsp/
client.rs

1use serde::Serialize;
2use serde_json::{json, Value};
3use std::collections::HashMap;
4use std::process::Stdio;
5use std::sync::Arc;
6use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader};
7use tokio::process::{Child, Command};
8use tokio::sync::{oneshot, Mutex};
9
10/// LSP JSON-RPC Request object
11#[derive(Serialize)]
12pub struct LspRequest {
13    pub jsonrpc: String,
14    pub id: u64,
15    pub method: String,
16    pub params: Value,
17}
18
19/// A robust, async-first LSP client for Hematite-CLI.
20pub struct LspClient {
21    #[allow(dead_code)]
22    child: Child,
23    stdin: Arc<Mutex<tokio::process::ChildStdin>>,
24    pending_requests: Arc<Mutex<HashMap<u64, oneshot::Sender<Result<Value, String>>>>>,
25    pub next_id: Arc<std::sync::atomic::AtomicU64>,
26    /// Layer 9: Diagnostic Storage (Pinned to URI)
27    pub diagnostics: Arc<Mutex<HashMap<String, Value>>>,
28}
29
30impl LspClient {
31    pub fn spawn(
32        command: &str,
33        args: &[String],
34    ) -> Result<Self, Box<dyn std::error::Error + Send + Sync>> {
35        let mut child = Command::new(command)
36            .args(args)
37            .stdin(Stdio::piped())
38            .stdout(Stdio::piped())
39            .stderr(Stdio::inherit())
40            .spawn()?;
41
42        let stdin = child.stdin.take().ok_or("Failed to open stdin")?;
43        let stdout = child.stdout.take().ok_or("Failed to open stdout")?;
44
45        let pending_requests: Arc<Mutex<HashMap<u64, oneshot::Sender<Result<Value, String>>>>> =
46            Arc::new(Mutex::new(HashMap::new()));
47        let next_id = Arc::new(std::sync::atomic::AtomicU64::new(1));
48        let diagnostics: Arc<Mutex<HashMap<String, Value>>> = Arc::new(Mutex::new(HashMap::new()));
49
50        let pending_requests_clone = pending_requests.clone();
51        let diagnostics_clone = diagnostics.clone();
52
53        // Background thread to read LSP stdout
54        tokio::spawn(async move {
55            let mut reader = BufReader::new(stdout);
56            let mut line = String::new();
57
58            loop {
59                line.clear();
60                let n = match reader.read_line(&mut line).await {
61                    Ok(n) => n,
62                    Err(_) => break,
63                };
64                if n == 0 {
65                    break;
66                }
67
68                if line.starts_with("Content-Length: ") {
69                    let len: usize = line["Content-Length: ".len()..].trim().parse().unwrap_or(0);
70                    line.clear();
71                    let _ = reader.read_line(&mut line).await;
72
73                    let mut body = vec![0u8; len];
74                    if let Err(_) = reader.read_exact(&mut body).await {
75                        break;
76                    }
77
78                    if let Ok(json_body) = serde_json::from_slice::<Value>(&body) {
79                        if let Some(id) = json_body.get("id").and_then(|v| v.as_u64()) {
80                            let mut pending = pending_requests_clone.lock().await;
81                            if let Some(tx) = pending.remove(&id) {
82                                if let Some(err) = json_body.get("error") {
83                                    let _ = tx.send(Err(err.to_string()));
84                                } else {
85                                    let result =
86                                        json_body.get("result").cloned().unwrap_or(Value::Null);
87                                    let _ = tx.send(Ok(result));
88                                }
89                            }
90                        } else if let Some(method) =
91                            json_body.get("method").and_then(|v| v.as_str())
92                        {
93                            // This is a notification
94                            if method == "textDocument/publishDiagnostics" {
95                                if let Some(params) = json_body.get("params") {
96                                    if let Some(uri) = params.get("uri").and_then(|v| v.as_str()) {
97                                        let mut diags = diagnostics_clone.lock().await;
98                                        diags.insert(
99                                            uri.to_string(),
100                                            params
101                                                .get("diagnostics")
102                                                .cloned()
103                                                .unwrap_or(Value::Null),
104                                        );
105                                    }
106                                }
107                            }
108                        }
109                    }
110                }
111            }
112        });
113
114        Ok(Self {
115            child,
116            stdin: Arc::new(Mutex::new(stdin)),
117            pending_requests,
118            next_id,
119            diagnostics,
120        })
121    }
122
123    pub async fn call(&self, method: &str, params: Value) -> Result<Value, String> {
124        let id = self
125            .next_id
126            .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
127        let (tx, rx) = oneshot::channel();
128
129        {
130            let mut pending = self.pending_requests.lock().await;
131            pending.insert(id, tx);
132        }
133
134        let request = LspRequest {
135            jsonrpc: "2.0".to_string(),
136            id,
137            method: method.to_string(),
138            params,
139        };
140
141        let body = serde_json::to_string(&request).map_err(|e| e.to_string())?;
142        let header = format!("Content-Length: {}\r\n\r\n", body.len());
143
144        {
145            let mut stdin = self.stdin.lock().await;
146            if let Err(e) = stdin.write_all(header.as_bytes()).await {
147                return Err(format!("LSP Stdin Header Fail: {}", e));
148            }
149            if let Err(e) = stdin.write_all(body.as_bytes()).await {
150                return Err(format!("LSP Stdin Body Fail: {}", e));
151            }
152            if let Err(e) = stdin.flush().await {
153                return Err(format!("LSP Stdin Flush Fail: {}", e));
154            }
155        }
156
157        rx.await
158            .map_err(|_| "LSP Response Channel Closed".to_string())?
159    }
160
161    /// Sends an LSP notification (no response expected).
162    pub async fn notify(&self, method: &str, params: Value) -> Result<(), String> {
163        let notification = json!({
164            "jsonrpc": "2.0",
165            "method": method,
166            "params": params,
167        });
168
169        let body = serde_json::to_string(&notification).map_err(|e| e.to_string())?;
170        let header = format!("Content-Length: {}\r\n\r\n", body.len());
171
172        {
173            let mut stdin = self.stdin.lock().await;
174            let _ = stdin.write_all(header.as_bytes()).await;
175            let _ = stdin.write_all(body.as_bytes()).await;
176            let _ = stdin.flush().await;
177        }
178        Ok(())
179    }
180}