Skip to main content

codetether_agent/lsp/
transport.rs

1//! LSP transport layer - stdio implementation with Content-Length framing
2//!
3//! LSP uses a special framing format with Content-Length headers:
4//! ```text
5//! Content-Length: 123\r\n
6//! \r\n
7//! <JSON payload>
8//! ```
9
10use super::types::{JsonRpcNotification, JsonRpcRequest, JsonRpcResponse};
11use anyhow::Result;
12use std::sync::Arc;
13use std::sync::atomic::{AtomicI64, Ordering};
14use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader};
15use tokio::process::{Child, Command};
16use tokio::sync::{RwLock, mpsc, oneshot};
17use tracing::{debug, error, trace, warn};
18
19/// LSP Transport for communicating with language servers
20pub struct LspTransport {
21    /// The child process (kept alive for the transport lifetime)
22    _child: Child,
23    /// Channel for sending messages
24    tx: mpsc::Sender<String>,
25    /// Pending requests waiting for responses
26    pending: Arc<RwLock<std::collections::HashMap<i64, oneshot::Sender<JsonRpcResponse>>>>,
27    /// Request ID counter
28    request_id: AtomicI64,
29    /// Whether the server is initialized
30    initialized: std::sync::atomic::AtomicBool,
31}
32
33impl LspTransport {
34    /// Spawn a language server and create a transport
35    pub async fn spawn(command: &str, args: &[String]) -> Result<Self> {
36        let mut child = Command::new(command)
37            .args(args)
38            .stdin(std::process::Stdio::piped())
39            .stdout(std::process::Stdio::piped())
40            .stderr(std::process::Stdio::inherit())
41            .spawn()?;
42
43        let stdout = child
44            .stdout
45            .take()
46            .ok_or_else(|| anyhow::anyhow!("No stdout"))?;
47        let mut stdin = child
48            .stdin
49            .take()
50            .ok_or_else(|| anyhow::anyhow!("No stdin"))?;
51
52        let (write_tx, mut write_rx) = mpsc::channel::<String>(100);
53        let pending: Arc<RwLock<std::collections::HashMap<i64, oneshot::Sender<JsonRpcResponse>>>> =
54            Arc::new(RwLock::new(std::collections::HashMap::new()));
55
56        // Writer task - sends messages with Content-Length framing
57        let pending_clone = Arc::clone(&pending);
58        tokio::spawn(async move {
59            while let Some(msg) = write_rx.recv().await {
60                let content_length = msg.len();
61                let header = format!("Content-Length: {}\r\n\r\n", content_length);
62                trace!("LSP TX header: {}", header.trim());
63                trace!("LSP TX body: {}", msg);
64
65                if let Err(e) = stdin.write_all(header.as_bytes()).await {
66                    error!("Failed to write header to LSP server: {}", e);
67                    break;
68                }
69                if let Err(e) = stdin.write_all(msg.as_bytes()).await {
70                    error!("Failed to write body to LSP server: {}", e);
71                    break;
72                }
73                if let Err(e) = stdin.flush().await {
74                    error!("Failed to flush LSP server stdin: {}", e);
75                    break;
76                }
77            }
78            // Clear pending requests on shutdown
79            pending_clone.write().await.clear();
80        });
81
82        // Reader task - parses Content-Length framed responses
83        let pending_clone = Arc::clone(&pending);
84        tokio::spawn(async move {
85            let mut reader = BufReader::new(stdout);
86            let mut header_buf = String::new();
87
88            loop {
89                // Read headers until empty line
90                header_buf.clear();
91                let mut content_length: Option<usize> = None;
92
93                loop {
94                    header_buf.clear();
95                    match reader.read_line(&mut header_buf).await {
96                        Ok(0) => {
97                            debug!("LSP server closed connection");
98                            return;
99                        }
100                        Ok(_) => {
101                            let line = header_buf.trim();
102                            if line.is_empty() {
103                                break; // End of headers
104                            }
105                            if let Some(stripped) = line.strip_prefix("Content-Length:") {
106                                if let Ok(len) = stripped.trim().parse::<usize>() {
107                                    content_length = Some(len);
108                                }
109                            }
110                            // Ignore other headers (Content-Type, etc.)
111                        }
112                        Err(e) => {
113                            error!("Failed to read header from LSP server: {}", e);
114                            return;
115                        }
116                    }
117                }
118
119                let Some(len) = content_length else {
120                    warn!("LSP message missing Content-Length header");
121                    continue;
122                };
123
124                // Read the body
125                let mut body_buf = vec![0u8; len];
126                match reader.read_exact(&mut body_buf).await {
127                    Ok(_) => {
128                        let body = String::from_utf8_lossy(&body_buf);
129                        trace!("LSP RX: {}", body);
130
131                        // Parse as JSON-RPC response
132                        match serde_json::from_str::<JsonRpcResponse>(&body) {
133                            Ok(response) => {
134                                // Find and complete the pending request
135                                let mut pending_guard = pending_clone.write().await;
136                                if let Some(tx) = pending_guard.remove(&response.id) {
137                                    let id = response.id;
138                                    if tx.send(response).is_err() {
139                                        warn!("Request {} receiver dropped", id);
140                                    }
141                                } else {
142                                    // Could be a response to a notification or unknown request
143                                    debug!("Received response for unknown request {}", response.id);
144                                }
145                            }
146                            Err(e) => {
147                                // Might be a notification (no id) or malformed
148                                debug!("Failed to parse LSP response: {} - body: {}", e, body);
149                            }
150                        }
151                    }
152                    Err(e) => {
153                        error!("Failed to read LSP message body: {}", e);
154                        return;
155                    }
156                }
157            }
158        });
159
160        Ok(Self {
161            _child: child,
162            tx: write_tx,
163            pending,
164            request_id: AtomicI64::new(1),
165            initialized: std::sync::atomic::AtomicBool::new(false),
166        })
167    }
168
169    /// Send a request and wait for response
170    pub async fn request(
171        &self,
172        method: &str,
173        params: Option<serde_json::Value>,
174    ) -> Result<JsonRpcResponse> {
175        let id = self.request_id.fetch_add(1, Ordering::SeqCst);
176        let request = JsonRpcRequest::new(id, method, params);
177
178        let (tx, rx) = oneshot::channel();
179        self.pending.write().await.insert(id, tx);
180
181        let json = serde_json::to_string(&request)?;
182        self.tx.send(json).await?;
183
184        // Wait for response with timeout
185        let response = tokio::time::timeout(std::time::Duration::from_secs(30), rx)
186            .await
187            .map_err(|_| anyhow::anyhow!("LSP request timeout for method: {}", method))?
188            .map_err(|_| anyhow::anyhow!("LSP response channel closed"))?;
189
190        Ok(response)
191    }
192
193    /// Send a notification (no response expected)
194    pub async fn notify(&self, method: &str, params: Option<serde_json::Value>) -> Result<()> {
195        let notification = JsonRpcNotification::new(method, params);
196        let json = serde_json::to_string(&notification)?;
197        self.tx.send(json).await?;
198        Ok(())
199    }
200
201    /// Check if the server is initialized
202    pub fn is_initialized(&self) -> bool {
203        self.initialized.load(std::sync::atomic::Ordering::SeqCst)
204    }
205
206    /// Mark the server as initialized
207    pub fn set_initialized(&self, value: bool) {
208        self.initialized
209            .store(value, std::sync::atomic::Ordering::SeqCst);
210    }
211}
212
213impl Drop for LspTransport {
214    fn drop(&mut self) {
215        if self.is_initialized() {
216            tracing::debug!("LspTransport dropped while still initialized");
217        }
218    }
219}