codetether_agent/lsp/
transport.rs1use super::types::{JsonRpcNotification, JsonRpcRequest, JsonRpcResponse};
11use anyhow::{Context, Result};
12use std::collections::HashMap;
13use std::sync::Arc;
14use std::sync::atomic::{AtomicI64, Ordering};
15use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader};
16use tokio::process::{Child, Command};
17use tokio::sync::{RwLock, mpsc, oneshot};
18use tracing::{debug, error, trace, warn};
19
20pub struct LspTransport {
22 _child: Child,
24 tx: mpsc::Sender<String>,
26 pending: Arc<RwLock<HashMap<i64, oneshot::Sender<JsonRpcResponse>>>>,
28 request_id: AtomicI64,
30 initialized: std::sync::atomic::AtomicBool,
32 timeout_ms: u64,
34 recent_stderr: Arc<RwLock<Vec<String>>>,
36 command: String,
38 diagnostics: Arc<RwLock<HashMap<String, Vec<lsp_types::Diagnostic>>>>,
40}
41
42impl LspTransport {
43 pub async fn spawn(command: &str, args: &[String], timeout_ms: u64) -> Result<Self> {
45 let mut child = Command::new(command)
46 .args(args)
47 .stdin(std::process::Stdio::piped())
48 .stdout(std::process::Stdio::piped())
49 .stderr(std::process::Stdio::piped())
50 .spawn()
51 .with_context(|| format!("Failed to spawn language server '{command}'"))?;
52
53 let stdout = child
54 .stdout
55 .take()
56 .ok_or_else(|| anyhow::anyhow!("No stdout"))?;
57 let stderr = child
58 .stderr
59 .take()
60 .ok_or_else(|| anyhow::anyhow!("No stderr"))?;
61 let mut stdin = child
62 .stdin
63 .take()
64 .ok_or_else(|| anyhow::anyhow!("No stdin"))?;
65
66 let (write_tx, mut write_rx) = mpsc::channel::<String>(100);
67 let pending: Arc<RwLock<HashMap<i64, oneshot::Sender<JsonRpcResponse>>>> =
68 Arc::new(RwLock::new(HashMap::new()));
69 let recent_stderr = Arc::new(RwLock::new(Vec::new()));
70 let diagnostics = Arc::new(RwLock::new(HashMap::new()));
71
72 let pending_clone = Arc::clone(&pending);
74 tokio::spawn(async move {
75 while let Some(msg) = write_rx.recv().await {
76 let content_length = msg.len();
77 let header = format!("Content-Length: {}\r\n\r\n", content_length);
78 trace!("LSP TX header: {}", header.trim());
79 trace!("LSP TX body: {}", msg);
80
81 if let Err(e) = stdin.write_all(header.as_bytes()).await {
82 error!("Failed to write header to LSP server: {}", e);
83 break;
84 }
85 if let Err(e) = stdin.write_all(msg.as_bytes()).await {
86 error!("Failed to write body to LSP server: {}", e);
87 break;
88 }
89 if let Err(e) = stdin.flush().await {
90 error!("Failed to flush LSP server stdin: {}", e);
91 break;
92 }
93 }
94 pending_clone.write().await.clear();
95 });
96
97 let recent_stderr_clone = Arc::clone(&recent_stderr);
99 let stderr_command = command.to_string();
100 tokio::spawn(async move {
101 let mut reader = BufReader::new(stderr);
102 let mut line = String::new();
103 loop {
104 line.clear();
105 match reader.read_line(&mut line).await {
106 Ok(0) => return,
107 Ok(_) => {
108 let trimmed = line.trim().to_string();
109 if trimmed.is_empty() {
110 continue;
111 }
112 warn!(command = %stderr_command, stderr = %trimmed, "Language server stderr");
113 let mut guard = recent_stderr_clone.write().await;
114 guard.push(trimmed);
115 if guard.len() > 20 {
116 let excess = guard.len() - 20;
117 guard.drain(0..excess);
118 }
119 }
120 Err(e) => {
121 warn!(command = %stderr_command, error = %e, "Failed reading language server stderr");
122 return;
123 }
124 }
125 }
126 });
127
128 let pending_clone = Arc::clone(&pending);
130 let diagnostics_clone = Arc::clone(&diagnostics);
131 tokio::spawn(async move {
132 let mut reader = BufReader::new(stdout);
133 let mut header_buf = String::new();
134
135 loop {
136 header_buf.clear();
137 let mut content_length: Option<usize> = None;
138
139 loop {
140 header_buf.clear();
141 match reader.read_line(&mut header_buf).await {
142 Ok(0) => {
143 debug!("LSP server closed connection");
144 return;
145 }
146 Ok(_) => {
147 let line = header_buf.trim();
148 if line.is_empty() {
149 break;
150 }
151 if let Some(stripped) = line.strip_prefix("Content-Length:")
152 && let Ok(len) = stripped.trim().parse::<usize>()
153 {
154 content_length = Some(len);
155 }
156 }
157 Err(e) => {
158 error!("Failed to read header from LSP server: {}", e);
159 return;
160 }
161 }
162 }
163
164 let Some(len) = content_length else {
165 warn!("LSP message missing Content-Length header");
166 continue;
167 };
168
169 let mut body_buf = vec![0u8; len];
170 match reader.read_exact(&mut body_buf).await {
171 Ok(_) => {
172 let body = String::from_utf8_lossy(&body_buf);
173 trace!("LSP RX: {}", body);
174
175 if let Ok(response) = serde_json::from_str::<JsonRpcResponse>(&body) {
176 let mut pending_guard = pending_clone.write().await;
177 if let Some(tx) = pending_guard.remove(&response.id) {
178 let id = response.id;
179 if tx.send(response).is_err() {
180 warn!("Request {} receiver dropped", id);
181 }
182 } else {
183 debug!("Received response for unknown request {}", response.id);
184 }
185 continue;
186 }
187
188 match serde_json::from_str::<serde_json::Value>(&body) {
189 Ok(value) => {
190 if value.get("method").and_then(serde_json::Value::as_str)
191 == Some("textDocument/publishDiagnostics")
192 {
193 if let Some(params) = value.get("params") {
194 let uri = params
195 .get("uri")
196 .and_then(serde_json::Value::as_str)
197 .unwrap_or_default()
198 .to_string();
199 let diagnostics = params
200 .get("diagnostics")
201 .cloned()
202 .and_then(|v| serde_json::from_value(v).ok())
203 .unwrap_or_default();
204 if !uri.is_empty() {
205 diagnostics_clone
206 .write()
207 .await
208 .insert(uri, diagnostics);
209 }
210 }
211 } else {
212 debug!(
213 "Ignoring LSP notification/message without tracked handler: {}",
214 body
215 );
216 }
217 }
218 Err(e) => {
219 debug!("Failed to parse LSP message: {} - body: {}", e, body);
220 }
221 }
222 }
223 Err(e) => {
224 error!("Failed to read LSP message body: {}", e);
225 return;
226 }
227 }
228 }
229 });
230
231 Ok(Self {
232 _child: child,
233 tx: write_tx,
234 pending,
235 request_id: AtomicI64::new(1),
236 initialized: std::sync::atomic::AtomicBool::new(false),
237 timeout_ms,
238 recent_stderr,
239 command: command.to_string(),
240 diagnostics,
241 })
242 }
243
244 pub async fn request(
246 &self,
247 method: &str,
248 params: Option<serde_json::Value>,
249 ) -> Result<JsonRpcResponse> {
250 let id = self.request_id.fetch_add(1, Ordering::SeqCst);
251 let request = JsonRpcRequest::new(id, method, params);
252
253 let (tx, rx) = oneshot::channel();
254 self.pending.write().await.insert(id, tx);
255
256 let json = serde_json::to_string(&request)?;
257 self.tx.send(json).await?;
258
259 let response = tokio::time::timeout(std::time::Duration::from_millis(self.timeout_ms), rx)
260 .await
261 .map_err(|_| {
262 let stderr_summary = self.stderr_summary();
263 anyhow::anyhow!(
264 "LSP request timeout for method: {} (server: {}, timeout: {}ms{})",
265 method,
266 self.command,
267 self.timeout_ms,
268 stderr_summary
269 .as_deref()
270 .map(|summary| format!(", recent stderr: {summary}"))
271 .unwrap_or_default()
272 )
273 })?
274 .map_err(|_| anyhow::anyhow!("LSP response channel closed"))?;
275
276 Ok(response)
277 }
278
279 fn stderr_summary(&self) -> Option<String> {
280 self.recent_stderr.try_read().ok().and_then(|lines| {
281 if lines.is_empty() {
282 None
283 } else {
284 Some(lines.join(" | "))
285 }
286 })
287 }
288
289 pub async fn notify(&self, method: &str, params: Option<serde_json::Value>) -> Result<()> {
291 let notification = JsonRpcNotification::new(method, params);
292 let json = serde_json::to_string(¬ification)?;
293 self.tx.send(json).await?;
294 Ok(())
295 }
296
297 pub async fn diagnostics_snapshot(&self) -> HashMap<String, Vec<lsp_types::Diagnostic>> {
299 self.diagnostics.read().await.clone()
300 }
301
302 pub fn is_initialized(&self) -> bool {
304 self.initialized.load(std::sync::atomic::Ordering::SeqCst)
305 }
306
307 pub fn set_initialized(&self, value: bool) {
309 self.initialized
310 .store(value, std::sync::atomic::Ordering::SeqCst);
311 }
312}
313
314impl Drop for LspTransport {
315 fn drop(&mut self) {
316 if self.is_initialized() {
317 tracing::debug!("LspTransport dropped while still initialized");
318 }
319 }
320}