matrixcode_core/lsp/
transport.rs1use anyhow::{Result, anyhow};
6use std::sync::atomic::{AtomicU32, Ordering};
7use std::sync::Arc;
8use tokio::io::{AsyncBufReadExt, AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, BufReader};
9use tokio::process::{Child, Command};
10use tokio::sync::Mutex;
11use tokio::time::{Duration, timeout};
12
13pub struct LspTransport {
15 process: Arc<Mutex<Option<Child>>>,
17 stdin: Arc<Mutex<Option<Box<dyn AsyncWrite + Unpin + Send>>>>,
19 stdout_reader: Arc<Mutex<Option<BufReader<Box<dyn AsyncRead + Unpin + Send>>>>>,
21 request_id: AtomicU32,
23 server_name: String,
25}
26
27impl LspTransport {
28 pub async fn spawn(
30 server_name: impl Into<String>,
31 command: &str,
32 args: &[String],
33 ) -> Result<Self> {
34 let server_name = server_name.into();
35
36 let (actual_command, actual_args) = if cfg!(target_os = "windows")
38 && (command == "npx" || command == "npm" || command == "node")
39 {
40 let mut full_args = vec!["/c".to_string(), command.to_string()];
41 full_args.extend(args.iter().cloned());
42 ("cmd.exe".to_string(), full_args)
43 } else {
44 (command.to_string(), args.to_vec())
45 };
46
47 let mut cmd = Command::new(&actual_command);
48 cmd.args(&actual_args)
49 .stdin(std::process::Stdio::piped())
50 .stdout(std::process::Stdio::piped())
51 .stderr(std::process::Stdio::piped())
52 .kill_on_drop(true);
53
54 let mut child = cmd.spawn().map_err(|e| {
55 anyhow!(
56 "Failed to spawn LSP server '{}': {} (command: {} {:?})",
57 server_name,
58 e,
59 actual_command,
60 actual_args
61 )
62 })?;
63
64 let stdin = child.stdin.take().map(|s| Box::new(s) as Box<dyn AsyncWrite + Unpin + Send>);
65 let stdout = child.stdout.take().map(|s| {
66 Box::new(s) as Box<dyn AsyncRead + Unpin + Send>
67 });
68
69 let stdout_reader = stdout.map(|s| BufReader::new(s));
70
71 log::info!(
72 "LSP server '{}' spawned successfully (pid: {:?})",
73 server_name,
74 child.id()
75 );
76
77 Ok(Self {
78 process: Arc::new(Mutex::new(Some(child))),
79 stdin: Arc::new(Mutex::new(stdin)),
80 stdout_reader: Arc::new(Mutex::new(stdout_reader)),
81 request_id: AtomicU32::new(1),
82 server_name,
83 })
84 }
85
86 pub async fn send_request(
88 &self,
89 method: &str,
90 params: serde_json::Value,
91 ) -> Result<serde_json::Value> {
92 let id = self.request_id.fetch_add(1, Ordering::SeqCst);
93
94 let message = serde_json::json!({
95 "jsonrpc": "2.0",
96 "id": id,
97 "method": method,
98 "params": params
99 });
100
101 self.send_message(&message.to_string()).await?;
102 self.receive_response(id).await
103 }
104
105 pub async fn send_notification(
107 &self,
108 method: &str,
109 params: serde_json::Value,
110 ) -> Result<()> {
111 let message = serde_json::json!({
112 "jsonrpc": "2.0",
113 "method": method,
114 "params": params
115 });
116
117 self.send_message(&message.to_string()).await
118 }
119
120 async fn send_message(&self, content: &str) -> Result<()> {
122 let mut stdin = self.stdin.lock().await;
123 let stdin = stdin.as_mut().ok_or_else(|| anyhow!("stdin not available"))?;
124
125 let header = format!("Content-Length: {}\r\n\r\n", content.len());
126 stdin.write_all(header.as_bytes()).await?;
127 stdin.write_all(content.as_bytes()).await?;
128 stdin.flush().await?;
129
130 log::debug!("LSP '{}' sent: {}", self.server_name, content);
131 Ok(())
132 }
133
134 async fn receive_response(&self, expected_id: u32) -> Result<serde_json::Value> {
136 let timeout_duration = Duration::from_secs(30);
137
138 timeout(timeout_duration, async {
139 loop {
140 let message = self.receive_messages().await?;
141
142 let json: serde_json::Value = serde_json::from_str(&message)?;
144
145 if let Some(id) = json.get("id").and_then(|i| i.as_u64()) {
147 if id == expected_id as u64 {
148 if let Some(error) = json.get("error") {
150 return Err(anyhow!("LSP error: {:?}", error));
151 }
152 return Ok(json.get("result").cloned().unwrap_or(serde_json::Value::Null));
153 }
154 }
155
156 log::debug!("LSP '{}' received other message: {}", self.server_name, message);
158 }
159 }).await.map_err(|_| anyhow!("LSP request timeout after {}s", timeout_duration.as_secs()))?
160 }
161
162 pub async fn receive_messages(&self) -> Result<String> {
164 let mut reader = self.stdout_reader.lock().await;
165 let reader = reader.as_mut().ok_or_else(|| anyhow!("stdout not available"))?;
166
167 let mut header_line = String::new();
169 reader.read_line(&mut header_line).await?;
170
171 let content_length: usize = header_line
173 .strip_prefix("Content-Length: ")
174 .and_then(|s| s.trim().parse().ok())
175 .ok_or_else(|| anyhow!("Invalid LSP header: {}", header_line))?;
176
177 let mut empty_line = String::new();
179 reader.read_line(&mut empty_line).await?;
180
181 let mut content = vec![0u8; content_length];
183 reader.read_exact(&mut content).await?;
184
185 let message = String::from_utf8(content)?;
186 log::debug!("LSP '{}' received: {}", self.server_name, message);
187
188 Ok(message)
189 }
190
191 pub async fn close(&self) -> Result<()> {
193 let mut process = self.process.lock().await;
194 if let Some(mut child) = process.take() {
195 child.kill().await?;
196 log::info!("LSP server '{}' stopped", self.server_name);
197 }
198 Ok(())
199 }
200
201 pub fn server_name(&self) -> &str {
203 &self.server_name
204 }
205}