Skip to main content

matrixcode_core/lsp/
transport.rs

//! LSP Transport Layer
//!
//! Communicates with an LSP server process over stdio and handles the JSON-RPC 2.0 message format.
4
5use anyhow::{Result, anyhow};
6use std::sync::atomic::{AtomicU32, Ordering};
7use std::sync::Arc;
8use tokio::io::{AsyncBufReadExt, AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, BufReader};
9use tokio::process::{Child, Command};
10use tokio::sync::Mutex;
11use tokio::time::{Duration, timeout};
12
13/// LSP 传输层
14pub struct LspTransport {
15    /// 子进程
16    process: Arc<Mutex<Option<Child>>>,
17    /// 写入端 (进程 stdin)
18    stdin: Arc<Mutex<Option<Box<dyn AsyncWrite + Unpin + Send>>>>,
19    /// 读取端 (进程 stdout)
20    stdout_reader: Arc<Mutex<Option<BufReader<Box<dyn AsyncRead + Unpin + Send>>>>>,
21    /// 请求 ID 计数器
22    request_id: AtomicU32,
23    /// 服务器名称(用于日志)
24    server_name: String,
25}
26
27impl LspTransport {
28    /// 启动 LSP 服务器进程
29    pub async fn spawn(
30        server_name: impl Into<String>,
31        command: &str,
32        args: &[String],
33    ) -> Result<Self> {
34        let server_name = server_name.into();
35
36        // Windows 兼容性处理
37        let (actual_command, actual_args) = if cfg!(target_os = "windows")
38            && (command == "npx" || command == "npm" || command == "node")
39        {
40            let mut full_args = vec!["/c".to_string(), command.to_string()];
41            full_args.extend(args.iter().cloned());
42            ("cmd.exe".to_string(), full_args)
43        } else {
44            (command.to_string(), args.to_vec())
45        };
46
47        let mut cmd = Command::new(&actual_command);
48        cmd.args(&actual_args)
49            .stdin(std::process::Stdio::piped())
50            .stdout(std::process::Stdio::piped())
51            .stderr(std::process::Stdio::piped())
52            .kill_on_drop(true);
53
54        let mut child = cmd.spawn().map_err(|e| {
55            anyhow!(
56                "Failed to spawn LSP server '{}': {} (command: {} {:?})",
57                server_name,
58                e,
59                actual_command,
60                actual_args
61            )
62        })?;
63
64        let stdin = child.stdin.take().map(|s| Box::new(s) as Box<dyn AsyncWrite + Unpin + Send>);
65        let stdout = child.stdout.take().map(|s| {
66            Box::new(s) as Box<dyn AsyncRead + Unpin + Send>
67        });
68
69        let stdout_reader = stdout.map(|s| BufReader::new(s));
70
71        log::info!(
72            "LSP server '{}' spawned successfully (pid: {:?})",
73            server_name,
74            child.id()
75        );
76
77        Ok(Self {
78            process: Arc::new(Mutex::new(Some(child))),
79            stdin: Arc::new(Mutex::new(stdin)),
80            stdout_reader: Arc::new(Mutex::new(stdout_reader)),
81            request_id: AtomicU32::new(1),
82            server_name,
83        })
84    }
85
86    /// 发送请求并等待响应
87    pub async fn send_request(
88        &self,
89        method: &str,
90        params: serde_json::Value,
91    ) -> Result<serde_json::Value> {
92        let id = self.request_id.fetch_add(1, Ordering::SeqCst);
93
94        let message = serde_json::json!({
95            "jsonrpc": "2.0",
96            "id": id,
97            "method": method,
98            "params": params
99        });
100
101        self.send_message(&message.to_string()).await?;
102        self.receive_response(id).await
103    }
104
105    /// 发送通知(无需响应)
106    pub async fn send_notification(
107        &self,
108        method: &str,
109        params: serde_json::Value,
110    ) -> Result<()> {
111        let message = serde_json::json!({
112            "jsonrpc": "2.0",
113            "method": method,
114            "params": params
115        });
116
117        self.send_message(&message.to_string()).await
118    }
119
120    /// 发送原始消息(带 Content-Length header)
121    async fn send_message(&self, content: &str) -> Result<()> {
122        let mut stdin = self.stdin.lock().await;
123        let stdin = stdin.as_mut().ok_or_else(|| anyhow!("stdin not available"))?;
124
125        let header = format!("Content-Length: {}\r\n\r\n", content.len());
126        stdin.write_all(header.as_bytes()).await?;
127        stdin.write_all(content.as_bytes()).await?;
128        stdin.flush().await?;
129
130        log::debug!("LSP '{}' sent: {}", self.server_name, content);
131        Ok(())
132    }
133
134    /// 接收响应
135    async fn receive_response(&self, expected_id: u32) -> Result<serde_json::Value> {
136        let timeout_duration = Duration::from_secs(30);
137
138        timeout(timeout_duration, async {
139            loop {
140                let message = self.receive_messages().await?;
141
142                // 解析消息
143                let json: serde_json::Value = serde_json::from_str(&message)?;
144
145                // 检查是否是我们要的响应
146                if let Some(id) = json.get("id").and_then(|i| i.as_u64()) {
147                    if id == expected_id as u64 {
148                        // 检查是否有错误
149                        if let Some(error) = json.get("error") {
150                            return Err(anyhow!("LSP error: {:?}", error));
151                        }
152                        return Ok(json.get("result").cloned().unwrap_or(serde_json::Value::Null));
153                    }
154                }
155
156                // 不是我们要的响应,继续等待(可能是 notification)
157                log::debug!("LSP '{}' received other message: {}", self.server_name, message);
158            }
159        }).await.map_err(|_| anyhow!("LSP request timeout after {}s", timeout_duration.as_secs()))?
160    }
161
162    /// 接收一条消息
163    pub async fn receive_messages(&self) -> Result<String> {
164        let mut reader = self.stdout_reader.lock().await;
165        let reader = reader.as_mut().ok_or_else(|| anyhow!("stdout not available"))?;
166
167        // 读取 Content-Length header
168        let mut header_line = String::new();
169        reader.read_line(&mut header_line).await?;
170
171        // 解析 Content-Length
172        let content_length: usize = header_line
173            .strip_prefix("Content-Length: ")
174            .and_then(|s| s.trim().parse().ok())
175            .ok_or_else(|| anyhow!("Invalid LSP header: {}", header_line))?;
176
177        // 读取空行
178        let mut empty_line = String::new();
179        reader.read_line(&mut empty_line).await?;
180
181        // 读取内容
182        let mut content = vec![0u8; content_length];
183        reader.read_exact(&mut content).await?;
184
185        let message = String::from_utf8(content)?;
186        log::debug!("LSP '{}' received: {}", self.server_name, message);
187
188        Ok(message)
189    }
190
191    /// 关闭连接
192    pub async fn close(&self) -> Result<()> {
193        let mut process = self.process.lock().await;
194        if let Some(mut child) = process.take() {
195            child.kill().await?;
196            log::info!("LSP server '{}' stopped", self.server_name);
197        }
198        Ok(())
199    }
200
201    /// 获取服务器名称
202    pub fn server_name(&self) -> &str {
203        &self.server_name
204    }
205}