Skip to main content

codetether_agent/lsp/
transport.rs

1//! LSP transport layer - stdio implementation with Content-Length framing
2//!
3//! LSP uses a special framing format with Content-Length headers:
4//! ```text
5//! Content-Length: 123\r\n
6//! \r\n
7//! <JSON payload>
8//! ```
9
10use super::types::{JsonRpcNotification, JsonRpcRequest, JsonRpcResponse};
11use anyhow::{Context, Result};
12use std::collections::HashMap;
13use std::sync::Arc;
14use std::sync::atomic::{AtomicI64, Ordering};
15use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader};
16use tokio::process::{Child, Command};
17use tokio::sync::{RwLock, mpsc, oneshot};
18use tracing::{debug, error, trace, warn};
19
20/// LSP Transport for communicating with language servers
21pub struct LspTransport {
22    /// The child process (kept alive for the transport lifetime)
23    _child: Child,
24    /// Channel for sending messages
25    tx: mpsc::Sender<String>,
26    /// Pending requests waiting for responses
27    pending: Arc<RwLock<HashMap<i64, oneshot::Sender<JsonRpcResponse>>>>,
28    /// Request ID counter
29    request_id: AtomicI64,
30    /// Whether the server is initialized
31    initialized: std::sync::atomic::AtomicBool,
32    /// Per-request timeout in milliseconds.
33    timeout_ms: u64,
34    /// Recent stderr lines from the language server for diagnostics.
35    recent_stderr: Arc<RwLock<Vec<String>>>,
36    /// Server command for diagnostics.
37    command: String,
38    /// Last diagnostics published by the language server, keyed by URI.
39    diagnostics: Arc<RwLock<HashMap<String, Vec<lsp_types::Diagnostic>>>>,
40}
41
42impl LspTransport {
43    /// Spawn a language server and create a transport
44    pub async fn spawn(command: &str, args: &[String], timeout_ms: u64) -> Result<Self> {
45        let mut child = Command::new(command)
46            .args(args)
47            .stdin(std::process::Stdio::piped())
48            .stdout(std::process::Stdio::piped())
49            .stderr(std::process::Stdio::piped())
50            .spawn()
51            .with_context(|| format!("Failed to spawn language server '{command}'"))?;
52
53        let stdout = child
54            .stdout
55            .take()
56            .ok_or_else(|| anyhow::anyhow!("No stdout"))?;
57        let stderr = child
58            .stderr
59            .take()
60            .ok_or_else(|| anyhow::anyhow!("No stderr"))?;
61        let mut stdin = child
62            .stdin
63            .take()
64            .ok_or_else(|| anyhow::anyhow!("No stdin"))?;
65
66        let (write_tx, mut write_rx) = mpsc::channel::<String>(100);
67        let pending: Arc<RwLock<HashMap<i64, oneshot::Sender<JsonRpcResponse>>>> =
68            Arc::new(RwLock::new(HashMap::new()));
69        let recent_stderr = Arc::new(RwLock::new(Vec::new()));
70        let diagnostics = Arc::new(RwLock::new(HashMap::new()));
71
72        // Writer task - sends messages with Content-Length framing
73        let pending_clone = Arc::clone(&pending);
74        tokio::spawn(async move {
75            while let Some(msg) = write_rx.recv().await {
76                let content_length = msg.len();
77                let header = format!("Content-Length: {}\r\n\r\n", content_length);
78                trace!("LSP TX header: {}", header.trim());
79                trace!("LSP TX body: {}", msg);
80
81                if let Err(e) = stdin.write_all(header.as_bytes()).await {
82                    error!("Failed to write header to LSP server: {}", e);
83                    break;
84                }
85                if let Err(e) = stdin.write_all(msg.as_bytes()).await {
86                    error!("Failed to write body to LSP server: {}", e);
87                    break;
88                }
89                if let Err(e) = stdin.flush().await {
90                    error!("Failed to flush LSP server stdin: {}", e);
91                    break;
92                }
93            }
94            pending_clone.write().await.clear();
95        });
96
97        // Stderr task - capture recent diagnostics from the language server.
98        let recent_stderr_clone = Arc::clone(&recent_stderr);
99        let stderr_command = command.to_string();
100        tokio::spawn(async move {
101            let mut reader = BufReader::new(stderr);
102            let mut line = String::new();
103            loop {
104                line.clear();
105                match reader.read_line(&mut line).await {
106                    Ok(0) => return,
107                    Ok(_) => {
108                        let trimmed = line.trim().to_string();
109                        if trimmed.is_empty() {
110                            continue;
111                        }
112                        warn!(command = %stderr_command, stderr = %trimmed, "Language server stderr");
113                        let mut guard = recent_stderr_clone.write().await;
114                        guard.push(trimmed);
115                        if guard.len() > 20 {
116                            let excess = guard.len() - 20;
117                            guard.drain(0..excess);
118                        }
119                    }
120                    Err(e) => {
121                        warn!(command = %stderr_command, error = %e, "Failed reading language server stderr");
122                        return;
123                    }
124                }
125            }
126        });
127
128        // Reader task - parses Content-Length framed responses and notifications.
129        let pending_clone = Arc::clone(&pending);
130        let diagnostics_clone = Arc::clone(&diagnostics);
131        tokio::spawn(async move {
132            let mut reader = BufReader::new(stdout);
133            let mut header_buf = String::new();
134
135            loop {
136                header_buf.clear();
137                let mut content_length: Option<usize> = None;
138
139                loop {
140                    header_buf.clear();
141                    match reader.read_line(&mut header_buf).await {
142                        Ok(0) => {
143                            debug!("LSP server closed connection");
144                            return;
145                        }
146                        Ok(_) => {
147                            let line = header_buf.trim();
148                            if line.is_empty() {
149                                break;
150                            }
151                            if let Some(stripped) = line.strip_prefix("Content-Length:")
152                                && let Ok(len) = stripped.trim().parse::<usize>()
153                            {
154                                content_length = Some(len);
155                            }
156                        }
157                        Err(e) => {
158                            error!("Failed to read header from LSP server: {}", e);
159                            return;
160                        }
161                    }
162                }
163
164                let Some(len) = content_length else {
165                    warn!("LSP message missing Content-Length header");
166                    continue;
167                };
168
169                let mut body_buf = vec![0u8; len];
170                match reader.read_exact(&mut body_buf).await {
171                    Ok(_) => {
172                        let body = String::from_utf8_lossy(&body_buf);
173                        trace!("LSP RX: {}", body);
174
175                        if let Ok(response) = serde_json::from_str::<JsonRpcResponse>(&body) {
176                            let mut pending_guard = pending_clone.write().await;
177                            if let Some(tx) = pending_guard.remove(&response.id) {
178                                let id = response.id;
179                                if tx.send(response).is_err() {
180                                    warn!("Request {} receiver dropped", id);
181                                }
182                            } else {
183                                debug!("Received response for unknown request {}", response.id);
184                            }
185                            continue;
186                        }
187
188                        match serde_json::from_str::<serde_json::Value>(&body) {
189                            Ok(value) => {
190                                if value.get("method").and_then(serde_json::Value::as_str)
191                                    == Some("textDocument/publishDiagnostics")
192                                {
193                                    if let Some(params) = value.get("params") {
194                                        let uri = params
195                                            .get("uri")
196                                            .and_then(serde_json::Value::as_str)
197                                            .unwrap_or_default()
198                                            .to_string();
199                                        let diagnostics = params
200                                            .get("diagnostics")
201                                            .cloned()
202                                            .and_then(|v| serde_json::from_value(v).ok())
203                                            .unwrap_or_default();
204                                        if !uri.is_empty() {
205                                            diagnostics_clone
206                                                .write()
207                                                .await
208                                                .insert(uri, diagnostics);
209                                        }
210                                    }
211                                } else {
212                                    debug!(
213                                        "Ignoring LSP notification/message without tracked handler: {}",
214                                        body
215                                    );
216                                }
217                            }
218                            Err(e) => {
219                                debug!("Failed to parse LSP message: {} - body: {}", e, body);
220                            }
221                        }
222                    }
223                    Err(e) => {
224                        error!("Failed to read LSP message body: {}", e);
225                        return;
226                    }
227                }
228            }
229        });
230
231        Ok(Self {
232            _child: child,
233            tx: write_tx,
234            pending,
235            request_id: AtomicI64::new(1),
236            initialized: std::sync::atomic::AtomicBool::new(false),
237            timeout_ms,
238            recent_stderr,
239            command: command.to_string(),
240            diagnostics,
241        })
242    }
243
244    /// Send a request and wait for response
245    pub async fn request(
246        &self,
247        method: &str,
248        params: Option<serde_json::Value>,
249    ) -> Result<JsonRpcResponse> {
250        let id = self.request_id.fetch_add(1, Ordering::SeqCst);
251        let request = JsonRpcRequest::new(id, method, params);
252
253        let (tx, rx) = oneshot::channel();
254        self.pending.write().await.insert(id, tx);
255
256        let json = serde_json::to_string(&request)?;
257        self.tx.send(json).await?;
258
259        let response = tokio::time::timeout(std::time::Duration::from_millis(self.timeout_ms), rx)
260            .await
261            .map_err(|_| {
262                let stderr_summary = self.stderr_summary();
263                anyhow::anyhow!(
264                    "LSP request timeout for method: {} (server: {}, timeout: {}ms{})",
265                    method,
266                    self.command,
267                    self.timeout_ms,
268                    stderr_summary
269                        .as_deref()
270                        .map(|summary| format!(", recent stderr: {summary}"))
271                        .unwrap_or_default()
272                )
273            })?
274            .map_err(|_| anyhow::anyhow!("LSP response channel closed"))?;
275
276        Ok(response)
277    }
278
279    fn stderr_summary(&self) -> Option<String> {
280        self.recent_stderr.try_read().ok().and_then(|lines| {
281            if lines.is_empty() {
282                None
283            } else {
284                Some(lines.join(" | "))
285            }
286        })
287    }
288
289    /// Send a notification (no response expected)
290    pub async fn notify(&self, method: &str, params: Option<serde_json::Value>) -> Result<()> {
291        let notification = JsonRpcNotification::new(method, params);
292        let json = serde_json::to_string(&notification)?;
293        self.tx.send(json).await?;
294        Ok(())
295    }
296
297    /// Return the last diagnostics published by the language server.
298    pub async fn diagnostics_snapshot(&self) -> HashMap<String, Vec<lsp_types::Diagnostic>> {
299        self.diagnostics.read().await.clone()
300    }
301
302    /// Check if the server is initialized
303    pub fn is_initialized(&self) -> bool {
304        self.initialized.load(std::sync::atomic::Ordering::SeqCst)
305    }
306
307    /// Mark the server as initialized
308    pub fn set_initialized(&self, value: bool) {
309        self.initialized
310            .store(value, std::sync::atomic::Ordering::SeqCst);
311    }
312}
313
314impl Drop for LspTransport {
315    fn drop(&mut self) {
316        if self.is_initialized() {
317            tracing::debug!("LspTransport dropped while still initialized");
318        }
319    }
320}