matrixcode_core/lsp/
transport.rs1use anyhow::{Result, anyhow};
6use std::sync::atomic::{AtomicU32, Ordering};
7use std::sync::Arc;
8use tokio::io::{AsyncBufReadExt, AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, BufReader};
9use tokio::process::{Child, Command};
10use tokio::sync::Mutex;
11use tokio::time::{Duration, timeout};
12
13pub struct LspTransport {
15 process: Arc<Mutex<Option<Child>>>,
17 stdin: Arc<Mutex<Option<Box<dyn AsyncWrite + Unpin + Send>>>>,
19 stdout_reader: Arc<Mutex<Option<BufReader<Box<dyn AsyncRead + Unpin + Send>>>>>,
21 request_id: AtomicU32,
23 server_name: String,
25}
26
27impl LspTransport {
28 pub async fn spawn(
30 server_name: impl Into<String>,
31 command: &str,
32 args: &[String],
33 ) -> Result<Self> {
34 let server_name = server_name.into();
35
36 let (actual_command, actual_args) = if cfg!(target_os = "windows")
38 && (command == "npx" || command == "npm" || command == "node")
39 {
40 let mut full_args = vec!["/c".to_string(), command.to_string()];
41 full_args.extend(args.iter().cloned());
42 ("cmd.exe".to_string(), full_args)
43 } else {
44 (command.to_string(), args.to_vec())
45 };
46
47 let mut cmd = Command::new(&actual_command);
48 cmd.args(&actual_args)
49 .stdin(std::process::Stdio::piped())
50 .stdout(std::process::Stdio::piped())
51 .stderr(std::process::Stdio::piped())
52 .kill_on_drop(true);
53
54 let mut child = cmd.spawn().map_err(|e| {
55 anyhow!(
56 "Failed to spawn LSP server '{}': {} (command: {} {:?})",
57 server_name,
58 e,
59 actual_command,
60 actual_args
61 )
62 })?;
63
64 let stdin = child.stdin.take().map(|s| Box::new(s) as Box<dyn AsyncWrite + Unpin + Send>);
65 let stdout = child.stdout.take().map(|s| {
66 Box::new(s) as Box<dyn AsyncRead + Unpin + Send>
67 });
68
69 let stdout_reader = stdout.map(|s| BufReader::new(s));
70
71 let stderr = child.stderr.take().map(|s| {
74 Box::new(s) as Box<dyn AsyncRead + Unpin + Send>
75 });
76
77 if let Some(stderr) = stderr {
78 let server_name_clone = server_name.clone();
79 let stderr_reader = BufReader::new(stderr).lines();
80
81 tokio::spawn(async move {
82 let mut lines = stderr_reader;
83
84 while let Ok(Some(line)) = lines.next_line().await {
85 let line_lower = line.to_lowercase();
87 if line_lower.contains("error") || line_lower.contains("fatal") {
88 log::error!("LSP '{}' stderr: {}", server_name_clone, line);
89 } else if line_lower.contains("warn") || line_lower.contains("warning") {
90 log::warn!("LSP '{}' stderr: {}", server_name_clone, line);
91 } else {
92 log::debug!("LSP '{}' stderr: {}", server_name_clone, line);
93 }
94 }
95
96 log::info!("LSP '{}' stderr stream ended", server_name_clone);
97 });
98 }
99
100 log::info!(
101 "LSP server '{}' spawned successfully (pid: {:?})",
102 server_name,
103 child.id()
104 );
105
106 Ok(Self {
107 process: Arc::new(Mutex::new(Some(child))),
108 stdin: Arc::new(Mutex::new(stdin)),
109 stdout_reader: Arc::new(Mutex::new(stdout_reader)),
110 request_id: AtomicU32::new(1),
111 server_name,
112 })
113 }
114
115 pub async fn send_request(
117 &self,
118 method: &str,
119 params: serde_json::Value,
120 ) -> Result<serde_json::Value> {
121 let id = self.request_id.fetch_add(1, Ordering::SeqCst);
122
123 let message = serde_json::json!({
124 "jsonrpc": "2.0",
125 "id": id,
126 "method": method,
127 "params": params
128 });
129
130 self.send_message(&message.to_string()).await?;
131 self.receive_response(id).await
132 }
133
134 pub async fn send_notification(
136 &self,
137 method: &str,
138 params: serde_json::Value,
139 ) -> Result<()> {
140 let message = serde_json::json!({
141 "jsonrpc": "2.0",
142 "method": method,
143 "params": params
144 });
145
146 self.send_message(&message.to_string()).await
147 }
148
149 async fn send_message(&self, content: &str) -> Result<()> {
151 let mut stdin = self.stdin.lock().await;
152 let stdin = stdin.as_mut().ok_or_else(|| anyhow!("stdin not available"))?;
153
154 let header = format!("Content-Length: {}\r\n\r\n", content.len());
155 stdin.write_all(header.as_bytes()).await?;
156 stdin.write_all(content.as_bytes()).await?;
157 stdin.flush().await?;
158
159 log::debug!("LSP '{}' sent: {}", self.server_name, content);
160 Ok(())
161 }
162
163 async fn receive_response(&self, expected_id: u32) -> Result<serde_json::Value> {
165 let timeout_duration = Duration::from_secs(30);
166
167 timeout(timeout_duration, async {
168 loop {
169 let message = self.receive_messages().await?;
170
171 let json: serde_json::Value = serde_json::from_str(&message)?;
173
174 if let Some(id) = json.get("id").and_then(|i| i.as_u64()) {
176 if id == expected_id as u64 {
177 if let Some(error) = json.get("error") {
179 return Err(anyhow!("LSP error: {:?}", error));
180 }
181 return Ok(json.get("result").cloned().unwrap_or(serde_json::Value::Null));
182 }
183 }
184
185 log::debug!("LSP '{}' received other message: {}", self.server_name, message);
187 }
188 }).await.map_err(|_| anyhow!("LSP request timeout after {}s", timeout_duration.as_secs()))?
189 }
190
191 pub async fn receive_messages(&self) -> Result<String> {
193 let mut reader = self.stdout_reader.lock().await;
194 let reader = reader.as_mut().ok_or_else(|| anyhow!("stdout not available"))?;
195
196 let mut header_line = String::new();
198 reader.read_line(&mut header_line).await?;
199
200 let content_length: usize = header_line
202 .strip_prefix("Content-Length: ")
203 .and_then(|s| s.trim().parse().ok())
204 .ok_or_else(|| anyhow!("Invalid LSP header: {}", header_line))?;
205
206 let mut empty_line = String::new();
208 reader.read_line(&mut empty_line).await?;
209
210 let mut content = vec![0u8; content_length];
212 reader.read_exact(&mut content).await?;
213
214 let message = String::from_utf8(content)?;
215 log::debug!("LSP '{}' received: {}", self.server_name, message);
216
217 Ok(message)
218 }
219
220 pub async fn close(&self) -> Result<()> {
222 let mut process = self.process.lock().await;
223 if let Some(mut child) = process.take() {
224 child.kill().await?;
225 log::info!("LSP server '{}' stopped", self.server_name);
226 }
227 Ok(())
228 }
229
230 pub fn server_name(&self) -> &str {
232 &self.server_name
233 }
234}