1use std::collections::HashMap;
2use std::io::{self, BufReader, BufWriter};
3use std::path::{Path, PathBuf};
4use std::process::{Child, Command, Stdio};
5use std::str::FromStr;
6use std::sync::atomic::{AtomicI64, Ordering};
7use std::sync::{Arc, Mutex};
8use std::thread;
9use std::time::{Duration, Instant};
10
11use crossbeam_channel::{bounded, RecvTimeoutError, Sender};
12use serde::de::DeserializeOwned;
13use serde_json::{json, Value};
14
15use crate::lsp::jsonrpc::{
16 Notification, Request, RequestId, Response as JsonRpcResponse, ServerMessage,
17};
18use crate::lsp::registry::ServerKind;
19use crate::lsp::{transport, LspError};
20
/// How long `send_request` waits for the matching response before giving up.
const REQUEST_TIMEOUT: Duration = Duration::from_secs(30);

/// How long `shutdown` waits for the server process to exit once the
/// shutdown handshake has been sent.
const SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(5);

/// Sleep between successive exit checks while `shutdown` waits for the process.
const EXIT_POLL_INTERVAL: Duration = Duration::from_millis(25);
24
25type PendingMap = HashMap<RequestId, Sender<JsonRpcResponse>>;
26
/// Lifecycle phase of a language server as tracked by `LspClient`.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum ServerState {
    /// The process has been spawned but `initialize` has not been called yet.
    Starting,
    /// `initialize` is in progress.
    Initializing,
    /// The initialize handshake completed successfully.
    Ready,
    /// `shutdown` has begun; new requests and notifications are rejected.
    ShuttingDown,
    /// The server process has been observed to have exited.
    Exited,
}
36
37#[derive(Debug)]
39pub enum LspEvent {
40 Notification {
42 server_kind: ServerKind,
43 root: PathBuf,
44 method: String,
45 params: Option<Value>,
46 },
47 ServerRequest {
49 server_kind: ServerKind,
50 root: PathBuf,
51 id: RequestId,
52 method: String,
53 params: Option<Value>,
54 },
55 ServerExited {
57 server_kind: ServerKind,
58 root: PathBuf,
59 },
60}
61
62pub struct LspClient {
64 kind: ServerKind,
65 root: PathBuf,
66 state: ServerState,
67 child: Child,
68 writer: Arc<Mutex<BufWriter<std::process::ChildStdin>>>,
69
70 pending: Arc<Mutex<PendingMap>>,
72 next_id: AtomicI64,
74}
75
76impl LspClient {
77 pub fn spawn(
79 kind: ServerKind,
80 root: PathBuf,
81 binary: &Path,
82 args: &[&str],
83 event_tx: Sender<LspEvent>,
84 ) -> io::Result<Self> {
85 let mut child = Command::new(binary)
86 .args(args)
87 .current_dir(&root)
88 .stdin(Stdio::piped())
89 .stdout(Stdio::piped())
90 .stderr(Stdio::null())
93 .spawn()?;
94
95 let stdout = child
96 .stdout
97 .take()
98 .ok_or_else(|| io::Error::other("language server missing stdout pipe"))?;
99 let stdin = child
100 .stdin
101 .take()
102 .ok_or_else(|| io::Error::other("language server missing stdin pipe"))?;
103
104 let writer = Arc::new(Mutex::new(BufWriter::new(stdin)));
105 let pending = Arc::new(Mutex::new(PendingMap::new()));
106 let reader_pending = Arc::clone(&pending);
107 let reader_writer = Arc::clone(&writer);
108 let reader_kind = kind;
109 let reader_root = root.clone();
110
111 thread::spawn(move || {
112 let mut reader = BufReader::new(stdout);
113 loop {
114 match transport::read_message(&mut reader) {
115 Ok(Some(ServerMessage::Response(response))) => {
116 if let Ok(mut guard) = reader_pending.lock() {
117 if let Some(tx) = guard.remove(&response.id) {
118 if tx.send(response).is_err() {
119 log::debug!("[aft-lsp] response channel closed");
120 }
121 }
122 } else {
123 let _ = event_tx.send(LspEvent::ServerExited {
124 server_kind: reader_kind,
125 root: reader_root.clone(),
126 });
127 break;
128 }
129 }
130 Ok(Some(ServerMessage::Notification { method, params })) => {
131 let _ = event_tx.send(LspEvent::Notification {
132 server_kind: reader_kind,
133 root: reader_root.clone(),
134 method,
135 params,
136 });
137 }
138 Ok(Some(ServerMessage::Request { id, method, params })) => {
139 let response_value = if method == "workspace/configuration" {
149 let item_count = params
152 .as_ref()
153 .and_then(|p| p.get("items"))
154 .and_then(|items| items.as_array())
155 .map_or(1, |arr| arr.len());
156 serde_json::Value::Array(vec![serde_json::Value::Null; item_count])
157 } else {
158 serde_json::Value::Null
159 };
160 if let Ok(mut w) = reader_writer.lock() {
161 let response = super::jsonrpc::OutgoingResponse::success(
162 id.clone(),
163 response_value,
164 );
165 let _ = transport::write_response(&mut *w, &response);
166 }
167 let _ = event_tx.send(LspEvent::ServerRequest {
169 server_kind: reader_kind,
170 root: reader_root.clone(),
171 id,
172 method,
173 params,
174 });
175 }
176 Ok(None) | Err(_) => {
177 if let Ok(mut guard) = reader_pending.lock() {
178 guard.clear();
179 }
180 let _ = event_tx.send(LspEvent::ServerExited {
181 server_kind: reader_kind,
182 root: reader_root.clone(),
183 });
184 break;
185 }
186 }
187 }
188 });
189
190 Ok(Self {
191 kind,
192 root,
193 state: ServerState::Starting,
194 child,
195 writer,
196 pending,
197 next_id: AtomicI64::new(1),
198 })
199 }
200
201 pub fn initialize(
203 &mut self,
204 workspace_root: &Path,
205 ) -> Result<lsp_types::InitializeResult, LspError> {
206 self.ensure_can_send()?;
207 self.state = ServerState::Initializing;
208
209 let normalized = normalize_windows_path(workspace_root);
210 let root_url = url::Url::from_file_path(&normalized).map_err(|_| {
211 LspError::NotFound(format!(
212 "failed to convert workspace root '{}' to file URI",
213 workspace_root.display()
214 ))
215 })?;
216 let root_uri = lsp_types::Uri::from_str(root_url.as_str()).map_err(|_| {
217 LspError::NotFound(format!(
218 "failed to convert workspace root '{}' to file URI",
219 workspace_root.display()
220 ))
221 })?;
222
223 let params = serde_json::from_value::<lsp_types::InitializeParams>(json!({
224 "processId": std::process::id(),
225 "rootUri": root_uri,
226 "capabilities": {
227 "workspace": {
228 "workspaceFolders": true,
229 "configuration": true
230 },
231 "textDocument": {
232 "synchronization": {
233 "dynamicRegistration": false,
234 "didSave": true,
235 "willSave": false,
236 "willSaveWaitUntil": false
237 },
238 "publishDiagnostics": {
239 "relatedInformation": true,
240 "versionSupport": true,
241 "codeDescriptionSupport": true,
242 "dataSupport": true
243 }
244 }
245 },
246 "clientInfo": {
247 "name": "aft",
248 "version": env!("CARGO_PKG_VERSION")
249 },
250 "workspaceFolders": [
251 {
252 "uri": root_uri,
253 "name": workspace_root
254 .file_name()
255 .and_then(|name| name.to_str())
256 .unwrap_or("workspace")
257 }
258 ]
259 }))?;
260
261 let result = self.send_request::<lsp_types::request::Initialize>(params)?;
262 self.send_notification::<lsp_types::notification::Initialized>(serde_json::from_value(
263 json!({}),
264 )?)?;
265 self.state = ServerState::Ready;
266 Ok(result)
267 }
268
269 pub fn send_request<R>(&mut self, params: R::Params) -> Result<R::Result, LspError>
271 where
272 R: lsp_types::request::Request,
273 R::Params: serde::Serialize,
274 R::Result: DeserializeOwned,
275 {
276 self.ensure_can_send()?;
277
278 let id = RequestId::Int(self.next_id.fetch_add(1, Ordering::Relaxed));
279 let (tx, rx) = bounded(1);
280 {
281 let mut pending = self.lock_pending()?;
282 pending.insert(id.clone(), tx);
283 }
284
285 let request = Request::new(id.clone(), R::METHOD, Some(serde_json::to_value(params)?));
286 {
287 let mut writer = self
288 .writer
289 .lock()
290 .map_err(|_| LspError::ServerNotReady("writer lock poisoned".to_string()))?;
291 if let Err(err) = transport::write_request(&mut *writer, &request) {
292 self.remove_pending(&id);
293 return Err(err.into());
294 }
295 }
296
297 let response = match rx.recv_timeout(REQUEST_TIMEOUT) {
298 Ok(response) => response,
299 Err(RecvTimeoutError::Timeout) => {
300 self.remove_pending(&id);
301 return Err(LspError::Timeout(format!(
302 "timed out waiting for '{}' response from {:?}",
303 R::METHOD,
304 self.kind
305 )));
306 }
307 Err(RecvTimeoutError::Disconnected) => {
308 self.remove_pending(&id);
309 return Err(LspError::ServerNotReady(format!(
310 "language server {:?} disconnected while waiting for '{}'",
311 self.kind,
312 R::METHOD
313 )));
314 }
315 };
316
317 if let Some(error) = response.error {
318 return Err(LspError::ServerError {
319 code: error.code,
320 message: error.message,
321 });
322 }
323
324 serde_json::from_value(response.result.unwrap_or(Value::Null)).map_err(Into::into)
325 }
326
327 pub fn send_notification<N>(&mut self, params: N::Params) -> Result<(), LspError>
329 where
330 N: lsp_types::notification::Notification,
331 N::Params: serde::Serialize,
332 {
333 self.ensure_can_send()?;
334 let notification = Notification::new(N::METHOD, Some(serde_json::to_value(params)?));
335 let mut writer = self
336 .writer
337 .lock()
338 .map_err(|_| LspError::ServerNotReady("writer lock poisoned".to_string()))?;
339 transport::write_notification(&mut *writer, ¬ification)?;
340 Ok(())
341 }
342
343 pub fn shutdown(&mut self) -> Result<(), LspError> {
345 if self.state == ServerState::Exited {
346 return Ok(());
347 }
348
349 if self.child.try_wait()?.is_some() {
350 self.state = ServerState::Exited;
351 return Ok(());
352 }
353
354 if let Err(err) = self.send_request::<lsp_types::request::Shutdown>(()) {
355 self.state = ServerState::ShuttingDown;
356 if self.child.try_wait()?.is_some() {
357 self.state = ServerState::Exited;
358 return Ok(());
359 }
360 return Err(err);
361 }
362
363 self.state = ServerState::ShuttingDown;
364
365 if let Err(err) = self.send_notification::<lsp_types::notification::Exit>(()) {
366 if self.child.try_wait()?.is_some() {
367 self.state = ServerState::Exited;
368 return Ok(());
369 }
370 return Err(err);
371 }
372
373 let deadline = Instant::now() + SHUTDOWN_TIMEOUT;
374 loop {
375 if self.child.try_wait()?.is_some() {
376 self.state = ServerState::Exited;
377 return Ok(());
378 }
379 if Instant::now() >= deadline {
380 return Err(LspError::Timeout(format!(
381 "timed out waiting for {:?} to exit",
382 self.kind
383 )));
384 }
385 thread::sleep(EXIT_POLL_INTERVAL);
386 }
387 }
388
389 pub fn state(&self) -> ServerState {
390 self.state
391 }
392
393 pub fn kind(&self) -> ServerKind {
394 self.kind
395 }
396
397 pub fn root(&self) -> &Path {
398 &self.root
399 }
400
401 fn ensure_can_send(&self) -> Result<(), LspError> {
402 if matches!(self.state, ServerState::ShuttingDown | ServerState::Exited) {
403 return Err(LspError::ServerNotReady(format!(
404 "language server {:?} is not ready (state: {:?})",
405 self.kind, self.state
406 )));
407 }
408 Ok(())
409 }
410
411 fn lock_pending(&self) -> Result<std::sync::MutexGuard<'_, PendingMap>, LspError> {
412 self.pending
413 .lock()
414 .map_err(|_| io::Error::other("pending response map poisoned").into())
415 }
416
417 fn remove_pending(&self, id: &RequestId) {
418 if let Ok(mut pending) = self.pending.lock() {
419 pending.remove(id);
420 }
421 }
422}
423
/// Removes the Windows verbatim prefix from `path` so that it can be turned
/// into a file URI.
///
/// * `\\?\C:\dir` becomes `C:\dir`.
/// * `\\?\UNC\server\share` becomes `\\server\share`. Dropping only `\\?\`
///   would leave the relative path `UNC\server\share`.
/// * Any other path is returned unchanged.
///
/// Prefix detection works on the lossy UTF-8 rendering of the path, so a
/// prefixed path containing invalid Unicode comes back in its lossy form.
fn normalize_windows_path(path: &Path) -> PathBuf {
    const VERBATIM_UNC_PREFIX: &str = r"\\?\UNC\";
    const VERBATIM_PREFIX: &str = r"\\?\";

    let text = path.to_string_lossy();
    // The UNC form must be tested first because it also starts with `\\?\`.
    if let Some(rest) = text.strip_prefix(VERBATIM_UNC_PREFIX) {
        PathBuf::from(format!(r"\\{}", rest))
    } else if let Some(rest) = text.strip_prefix(VERBATIM_PREFIX) {
        PathBuf::from(rest)
    } else {
        path.to_path_buf()
    }
}