Skip to main content

hematite/agent/lsp/
client.rs

1use serde::Serialize;
2use serde_json::{json, Value};
3use std::collections::HashMap;
4use std::process::Stdio;
5use std::sync::Arc;
6use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader};
7use tokio::process::{Child, Command};
8use tokio::sync::{oneshot, Mutex};
9
10/// LSP JSON-RPC Request object
11#[derive(Serialize)]
12pub struct LspRequest {
13    pub jsonrpc: String,
14    pub id: u64,
15    pub method: String,
16    pub params: Value,
17}
18
19type PendingRequests = Arc<Mutex<HashMap<u64, oneshot::Sender<Result<Value, String>>>>>;
20
21/// A robust, async-first LSP client for Hematite-CLI.
22pub struct LspClient {
23    #[allow(dead_code)]
24    child: Child,
25    stdin: Arc<Mutex<tokio::process::ChildStdin>>,
26    pending_requests: PendingRequests,
27    pub next_id: Arc<std::sync::atomic::AtomicU64>,
28    /// Layer 9: Diagnostic Storage (Pinned to URI)
29    pub diagnostics: Arc<Mutex<HashMap<String, Value>>>,
30}
31
32impl LspClient {
33    pub fn spawn(
34        command: &str,
35        args: &[String],
36    ) -> Result<Self, Box<dyn std::error::Error + Send + Sync>> {
37        let mut child = Command::new(command)
38            .args(args)
39            .stdin(Stdio::piped())
40            .stdout(Stdio::piped())
41            .stderr(Stdio::inherit())
42            .spawn()?;
43
44        let stdin = child.stdin.take().ok_or("Failed to open stdin")?;
45        let stdout = child.stdout.take().ok_or("Failed to open stdout")?;
46
47        let pending_requests: PendingRequests = Arc::new(Mutex::new(HashMap::new()));
48        let next_id = Arc::new(std::sync::atomic::AtomicU64::new(1));
49        let diagnostics: Arc<Mutex<HashMap<String, Value>>> = Arc::new(Mutex::new(HashMap::new()));
50
51        let pending_requests_clone = pending_requests.clone();
52        let diagnostics_clone = diagnostics.clone();
53
54        // Background thread to read LSP stdout
55        tokio::spawn(async move {
56            let mut reader = BufReader::new(stdout);
57            let mut line = String::new();
58
59            loop {
60                line.clear();
61                let n = match reader.read_line(&mut line).await {
62                    Ok(n) => n,
63                    Err(_) => break,
64                };
65                if n == 0 {
66                    break;
67                }
68
69                if line.starts_with("Content-Length: ") {
70                    let len: usize = line["Content-Length: ".len()..].trim().parse().unwrap_or(0);
71                    line.clear();
72                    let _ = reader.read_line(&mut line).await;
73
74                    let mut body = vec![0u8; len];
75                    if (reader.read_exact(&mut body).await).is_err() {
76                        break;
77                    }
78
79                    if let Ok(json_body) = serde_json::from_slice::<Value>(&body) {
80                        if let Some(id) = json_body.get("id").and_then(|v| v.as_u64()) {
81                            let mut pending = pending_requests_clone.lock().await;
82                            if let Some(tx) = pending.remove(&id) {
83                                if let Some(err) = json_body.get("error") {
84                                    let _ = tx.send(Err(err.to_string()));
85                                } else {
86                                    let result =
87                                        json_body.get("result").cloned().unwrap_or(Value::Null);
88                                    let _ = tx.send(Ok(result));
89                                }
90                            }
91                        } else if let Some(method) =
92                            json_body.get("method").and_then(|v| v.as_str())
93                        {
94                            // This is a notification
95                            if method == "textDocument/publishDiagnostics" {
96                                if let Some(params) = json_body.get("params") {
97                                    if let Some(uri) = params.get("uri").and_then(|v| v.as_str()) {
98                                        let mut diags = diagnostics_clone.lock().await;
99                                        diags.insert(
100                                            uri.to_string(),
101                                            params
102                                                .get("diagnostics")
103                                                .cloned()
104                                                .unwrap_or(Value::Null),
105                                        );
106                                    }
107                                }
108                            }
109                        }
110                    }
111                }
112            }
113        });
114
115        Ok(Self {
116            child,
117            stdin: Arc::new(Mutex::new(stdin)),
118            pending_requests,
119            next_id,
120            diagnostics,
121        })
122    }
123
124    pub async fn call(&self, method: &str, params: Value) -> Result<Value, String> {
125        let id = self
126            .next_id
127            .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
128        let (tx, rx) = oneshot::channel();
129
130        {
131            let mut pending = self.pending_requests.lock().await;
132            pending.insert(id, tx);
133        }
134
135        let request = LspRequest {
136            jsonrpc: "2.0".to_string(),
137            id,
138            method: method.to_string(),
139            params,
140        };
141
142        let body = serde_json::to_string(&request).map_err(|e| e.to_string())?;
143        let header = format!("Content-Length: {}\r\n\r\n", body.len());
144
145        {
146            let mut stdin = self.stdin.lock().await;
147            if let Err(e) = stdin.write_all(header.as_bytes()).await {
148                return Err(format!("LSP Stdin Header Fail: {}", e));
149            }
150            if let Err(e) = stdin.write_all(body.as_bytes()).await {
151                return Err(format!("LSP Stdin Body Fail: {}", e));
152            }
153            if let Err(e) = stdin.flush().await {
154                return Err(format!("LSP Stdin Flush Fail: {}", e));
155            }
156        }
157
158        rx.await
159            .map_err(|_| "LSP Response Channel Closed".to_string())?
160    }
161
162    /// Sends an LSP notification (no response expected).
163    pub async fn notify(&self, method: &str, params: Value) -> Result<(), String> {
164        let notification = json!({
165            "jsonrpc": "2.0",
166            "method": method,
167            "params": params,
168        });
169
170        let body = serde_json::to_string(&notification).map_err(|e| e.to_string())?;
171        let header = format!("Content-Length: {}\r\n\r\n", body.len());
172
173        {
174            let mut stdin = self.stdin.lock().await;
175            let _ = stdin.write_all(header.as_bytes()).await;
176            let _ = stdin.write_all(body.as_bytes()).await;
177            let _ = stdin.flush().await;
178        }
179        Ok(())
180    }
181}