Skip to main content

matrixcode_core/lsp/
transport.rs

1//! LSP Transport Layer
2//!
3//! 通过 stdio 与 LSP 服务器进程通信,处理 JSON-RPC 2.0 消息格式。
4
5use anyhow::{Result, anyhow};
6use std::sync::atomic::{AtomicU32, Ordering};
7use std::sync::Arc;
8use tokio::io::{AsyncBufReadExt, AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, BufReader};
9use tokio::process::{Child, Command};
10use tokio::sync::Mutex;
11use tokio::time::{Duration, timeout};
12
13/// LSP 传输层
14pub struct LspTransport {
15    /// 子进程
16    process: Arc<Mutex<Option<Child>>>,
17    /// 写入端 (进程 stdin)
18    stdin: Arc<Mutex<Option<Box<dyn AsyncWrite + Unpin + Send>>>>,
19    /// 读取端 (进程 stdout)
20    stdout_reader: Arc<Mutex<Option<BufReader<Box<dyn AsyncRead + Unpin + Send>>>>>,
21    /// 请求 ID 计数器
22    request_id: AtomicU32,
23    /// 服务器名称(用于日志)
24    server_name: String,
25}
26
27impl LspTransport {
28    /// 启动 LSP 服务器进程
29    pub async fn spawn(
30        server_name: impl Into<String>,
31        command: &str,
32        args: &[String],
33    ) -> Result<Self> {
34        let server_name = server_name.into();
35
36        // Windows 兼容性处理
37        let (actual_command, actual_args) = if cfg!(target_os = "windows")
38            && (command == "npx" || command == "npm" || command == "node")
39        {
40            let mut full_args = vec!["/c".to_string(), command.to_string()];
41            full_args.extend(args.iter().cloned());
42            ("cmd.exe".to_string(), full_args)
43        } else {
44            (command.to_string(), args.to_vec())
45        };
46
47        let mut cmd = Command::new(&actual_command);
48        cmd.args(&actual_args)
49            .stdin(std::process::Stdio::piped())
50            .stdout(std::process::Stdio::piped())
51            .stderr(std::process::Stdio::piped())
52            .kill_on_drop(true);
53
54        let mut child = cmd.spawn().map_err(|e| {
55            anyhow!(
56                "Failed to spawn LSP server '{}': {} (command: {} {:?})",
57                server_name,
58                e,
59                actual_command,
60                actual_args
61            )
62        })?;
63
64        let stdin = child.stdin.take().map(|s| Box::new(s) as Box<dyn AsyncWrite + Unpin + Send>);
65        let stdout = child.stdout.take().map(|s| {
66            Box::new(s) as Box<dyn AsyncRead + Unpin + Send>
67        });
68
69        let stdout_reader = stdout.map(|s| BufReader::new(s));
70
71        // Take stderr and spawn background task to continuously read it
72        // This prevents stderr buffer overflow which would block the LSP process
73        let stderr = child.stderr.take().map(|s| {
74            Box::new(s) as Box<dyn AsyncRead + Unpin + Send>
75        });
76
77        if let Some(stderr) = stderr {
78            let server_name_clone = server_name.clone();
79            let stderr_reader = BufReader::new(stderr).lines();
80
81            tokio::spawn(async move {
82                let mut lines = stderr_reader;
83
84                while let Ok(Some(line)) = lines.next_line().await {
85                    // Determine log level based on content
86                    let line_lower = line.to_lowercase();
87                    if line_lower.contains("error") || line_lower.contains("fatal") {
88                        log::error!("LSP '{}' stderr: {}", server_name_clone, line);
89                    } else if line_lower.contains("warn") || line_lower.contains("warning") {
90                        log::warn!("LSP '{}' stderr: {}", server_name_clone, line);
91                    } else {
92                        log::debug!("LSP '{}' stderr: {}", server_name_clone, line);
93                    }
94                }
95
96                log::info!("LSP '{}' stderr stream ended", server_name_clone);
97            });
98        }
99
100        log::info!(
101            "LSP server '{}' spawned successfully (pid: {:?})",
102            server_name,
103            child.id()
104        );
105
106        Ok(Self {
107            process: Arc::new(Mutex::new(Some(child))),
108            stdin: Arc::new(Mutex::new(stdin)),
109            stdout_reader: Arc::new(Mutex::new(stdout_reader)),
110            request_id: AtomicU32::new(1),
111            server_name,
112        })
113    }
114
115    /// 发送请求并等待响应
116    pub async fn send_request(
117        &self,
118        method: &str,
119        params: serde_json::Value,
120    ) -> Result<serde_json::Value> {
121        let id = self.request_id.fetch_add(1, Ordering::SeqCst);
122
123        let message = serde_json::json!({
124            "jsonrpc": "2.0",
125            "id": id,
126            "method": method,
127            "params": params
128        });
129
130        self.send_message(&message.to_string()).await?;
131        self.receive_response(id).await
132    }
133
134    /// 发送通知(无需响应)
135    pub async fn send_notification(
136        &self,
137        method: &str,
138        params: serde_json::Value,
139    ) -> Result<()> {
140        let message = serde_json::json!({
141            "jsonrpc": "2.0",
142            "method": method,
143            "params": params
144        });
145
146        self.send_message(&message.to_string()).await
147    }
148
149    /// 发送原始消息(带 Content-Length header)
150    async fn send_message(&self, content: &str) -> Result<()> {
151        let mut stdin = self.stdin.lock().await;
152        let stdin = stdin.as_mut().ok_or_else(|| anyhow!("stdin not available"))?;
153
154        let header = format!("Content-Length: {}\r\n\r\n", content.len());
155        stdin.write_all(header.as_bytes()).await?;
156        stdin.write_all(content.as_bytes()).await?;
157        stdin.flush().await?;
158
159        log::debug!("LSP '{}' sent: {}", self.server_name, content);
160        Ok(())
161    }
162
163    /// 接收响应
164    async fn receive_response(&self, expected_id: u32) -> Result<serde_json::Value> {
165        let timeout_duration = Duration::from_secs(30);
166
167        timeout(timeout_duration, async {
168            loop {
169                let message = self.receive_messages().await?;
170
171                // 解析消息
172                let json: serde_json::Value = serde_json::from_str(&message)?;
173
174                // 检查是否是我们要的响应
175                if let Some(id) = json.get("id").and_then(|i| i.as_u64()) {
176                    if id == expected_id as u64 {
177                        // 检查是否有错误
178                        if let Some(error) = json.get("error") {
179                            return Err(anyhow!("LSP error: {:?}", error));
180                        }
181                        return Ok(json.get("result").cloned().unwrap_or(serde_json::Value::Null));
182                    }
183                }
184
185                // 不是我们要的响应,继续等待(可能是 notification)
186                log::debug!("LSP '{}' received other message: {}", self.server_name, message);
187            }
188        }).await.map_err(|_| anyhow!("LSP request timeout after {}s", timeout_duration.as_secs()))?
189    }
190
191    /// 接收一条消息
192    pub async fn receive_messages(&self) -> Result<String> {
193        let mut reader = self.stdout_reader.lock().await;
194        let reader = reader.as_mut().ok_or_else(|| anyhow!("stdout not available"))?;
195
196        // 读取 Content-Length header
197        let mut header_line = String::new();
198        reader.read_line(&mut header_line).await?;
199
200        // 解析 Content-Length
201        let content_length: usize = header_line
202            .strip_prefix("Content-Length: ")
203            .and_then(|s| s.trim().parse().ok())
204            .ok_or_else(|| anyhow!("Invalid LSP header: {}", header_line))?;
205
206        // 读取空行
207        let mut empty_line = String::new();
208        reader.read_line(&mut empty_line).await?;
209
210        // 读取内容
211        let mut content = vec![0u8; content_length];
212        reader.read_exact(&mut content).await?;
213
214        let message = String::from_utf8(content)?;
215        log::debug!("LSP '{}' received: {}", self.server_name, message);
216
217        Ok(message)
218    }
219
220    /// 关闭连接
221    pub async fn close(&self) -> Result<()> {
222        let mut process = self.process.lock().await;
223        if let Some(mut child) = process.take() {
224            child.kill().await?;
225            log::info!("LSP server '{}' stopped", self.server_name);
226        }
227        Ok(())
228    }
229
230    /// 获取服务器名称
231    pub fn server_name(&self) -> &str {
232        &self.server_name
233    }
234}