codetether_agent/lsp/
transport.rs1use super::types::{JsonRpcNotification, JsonRpcRequest, JsonRpcResponse};
11use anyhow::Result;
12use std::sync::Arc;
13use std::sync::atomic::{AtomicI64, Ordering};
14use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader};
15use tokio::process::{Child, Command};
16use tokio::sync::{RwLock, mpsc, oneshot};
17use tracing::{debug, error, trace, warn};
18
19pub struct LspTransport {
21 _child: Child,
23 tx: mpsc::Sender<String>,
25 pending: Arc<RwLock<std::collections::HashMap<i64, oneshot::Sender<JsonRpcResponse>>>>,
27 request_id: AtomicI64,
29 initialized: std::sync::atomic::AtomicBool,
31}
32
33impl LspTransport {
34 pub async fn spawn(command: &str, args: &[String]) -> Result<Self> {
36 let mut child = Command::new(command)
37 .args(args)
38 .stdin(std::process::Stdio::piped())
39 .stdout(std::process::Stdio::piped())
40 .stderr(std::process::Stdio::inherit())
41 .spawn()?;
42
43 let stdout = child
44 .stdout
45 .take()
46 .ok_or_else(|| anyhow::anyhow!("No stdout"))?;
47 let mut stdin = child
48 .stdin
49 .take()
50 .ok_or_else(|| anyhow::anyhow!("No stdin"))?;
51
52 let (write_tx, mut write_rx) = mpsc::channel::<String>(100);
53 let pending: Arc<RwLock<std::collections::HashMap<i64, oneshot::Sender<JsonRpcResponse>>>> =
54 Arc::new(RwLock::new(std::collections::HashMap::new()));
55
56 let pending_clone = Arc::clone(&pending);
58 tokio::spawn(async move {
59 while let Some(msg) = write_rx.recv().await {
60 let content_length = msg.len();
61 let header = format!("Content-Length: {}\r\n\r\n", content_length);
62 trace!("LSP TX header: {}", header.trim());
63 trace!("LSP TX body: {}", msg);
64
65 if let Err(e) = stdin.write_all(header.as_bytes()).await {
66 error!("Failed to write header to LSP server: {}", e);
67 break;
68 }
69 if let Err(e) = stdin.write_all(msg.as_bytes()).await {
70 error!("Failed to write body to LSP server: {}", e);
71 break;
72 }
73 if let Err(e) = stdin.flush().await {
74 error!("Failed to flush LSP server stdin: {}", e);
75 break;
76 }
77 }
78 pending_clone.write().await.clear();
80 });
81
82 let pending_clone = Arc::clone(&pending);
84 tokio::spawn(async move {
85 let mut reader = BufReader::new(stdout);
86 let mut header_buf = String::new();
87
88 loop {
89 header_buf.clear();
91 let mut content_length: Option<usize> = None;
92
93 loop {
94 header_buf.clear();
95 match reader.read_line(&mut header_buf).await {
96 Ok(0) => {
97 debug!("LSP server closed connection");
98 return;
99 }
100 Ok(_) => {
101 let line = header_buf.trim();
102 if line.is_empty() {
103 break; }
105 if let Some(stripped) = line.strip_prefix("Content-Length:") {
106 if let Ok(len) = stripped.trim().parse::<usize>() {
107 content_length = Some(len);
108 }
109 }
110 }
112 Err(e) => {
113 error!("Failed to read header from LSP server: {}", e);
114 return;
115 }
116 }
117 }
118
119 let Some(len) = content_length else {
120 warn!("LSP message missing Content-Length header");
121 continue;
122 };
123
124 let mut body_buf = vec![0u8; len];
126 match reader.read_exact(&mut body_buf).await {
127 Ok(_) => {
128 let body = String::from_utf8_lossy(&body_buf);
129 trace!("LSP RX: {}", body);
130
131 match serde_json::from_str::<JsonRpcResponse>(&body) {
133 Ok(response) => {
134 let mut pending_guard = pending_clone.write().await;
136 if let Some(tx) = pending_guard.remove(&response.id) {
137 let id = response.id;
138 if tx.send(response).is_err() {
139 warn!("Request {} receiver dropped", id);
140 }
141 } else {
142 debug!("Received response for unknown request {}", response.id);
144 }
145 }
146 Err(e) => {
147 debug!("Failed to parse LSP response: {} - body: {}", e, body);
149 }
150 }
151 }
152 Err(e) => {
153 error!("Failed to read LSP message body: {}", e);
154 return;
155 }
156 }
157 }
158 });
159
160 Ok(Self {
161 _child: child,
162 tx: write_tx,
163 pending,
164 request_id: AtomicI64::new(1),
165 initialized: std::sync::atomic::AtomicBool::new(false),
166 })
167 }
168
169 pub async fn request(
171 &self,
172 method: &str,
173 params: Option<serde_json::Value>,
174 ) -> Result<JsonRpcResponse> {
175 let id = self.request_id.fetch_add(1, Ordering::SeqCst);
176 let request = JsonRpcRequest::new(id, method, params);
177
178 let (tx, rx) = oneshot::channel();
179 self.pending.write().await.insert(id, tx);
180
181 let json = serde_json::to_string(&request)?;
182 self.tx.send(json).await?;
183
184 let response = tokio::time::timeout(std::time::Duration::from_secs(30), rx)
186 .await
187 .map_err(|_| anyhow::anyhow!("LSP request timeout for method: {}", method))?
188 .map_err(|_| anyhow::anyhow!("LSP response channel closed"))?;
189
190 Ok(response)
191 }
192
193 pub async fn notify(&self, method: &str, params: Option<serde_json::Value>) -> Result<()> {
195 let notification = JsonRpcNotification::new(method, params);
196 let json = serde_json::to_string(¬ification)?;
197 self.tx.send(json).await?;
198 Ok(())
199 }
200
201 pub fn is_initialized(&self) -> bool {
203 self.initialized.load(std::sync::atomic::Ordering::SeqCst)
204 }
205
206 pub fn set_initialized(&self, value: bool) {
208 self.initialized
209 .store(value, std::sync::atomic::Ordering::SeqCst);
210 }
211}
212
213impl Drop for LspTransport {
214 fn drop(&mut self) {
215 if self.is_initialized() {
216 tracing::debug!("LspTransport dropped while still initialized");
217 }
218 }
219}