1use std::collections::HashMap;
2use std::io::{self, BufReader, BufWriter};
3use std::path::{Path, PathBuf};
4use std::process::{Child, Command, Stdio};
5use std::str::FromStr;
6use std::sync::atomic::{AtomicI64, Ordering};
7use std::sync::{Arc, Mutex};
8use std::thread;
9use std::time::{Duration, Instant};
10
11use crossbeam_channel::{bounded, RecvTimeoutError, Sender};
12use serde::de::DeserializeOwned;
13use serde_json::{json, Value};
14
15use crate::lsp::jsonrpc::{
16 Notification, Request, RequestId, Response as JsonRpcResponse, ServerMessage,
17};
18use crate::lsp::registry::ServerKind;
19use crate::lsp::{transport, LspError};
20
21const REQUEST_TIMEOUT: Duration = Duration::from_secs(30);
22const SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(5);
23const EXIT_POLL_INTERVAL: Duration = Duration::from_millis(25);
24
25type PendingMap = HashMap<RequestId, Sender<JsonRpcResponse>>;
26
27#[derive(Debug, Clone, Copy, PartialEq, Eq)]
29pub enum ServerState {
30 Starting,
31 Initializing,
32 Ready,
33 ShuttingDown,
34 Exited,
35}
36
37#[derive(Debug)]
39pub enum LspEvent {
40 Notification {
42 server_kind: ServerKind,
43 root: PathBuf,
44 method: String,
45 params: Option<Value>,
46 },
47 ServerRequest {
49 server_kind: ServerKind,
50 root: PathBuf,
51 id: RequestId,
52 method: String,
53 params: Option<Value>,
54 },
55 ServerExited {
57 server_kind: ServerKind,
58 root: PathBuf,
59 },
60}
61
62pub struct LspClient {
64 kind: ServerKind,
65 root: PathBuf,
66 state: ServerState,
67 child: Child,
68 writer: Arc<Mutex<BufWriter<std::process::ChildStdin>>>,
69
70 pending: Arc<Mutex<PendingMap>>,
72 next_id: AtomicI64,
74}
75
76impl LspClient {
77 pub fn spawn(
79 kind: ServerKind,
80 root: PathBuf,
81 binary: &Path,
82 args: &[&str],
83 event_tx: Sender<LspEvent>,
84 ) -> io::Result<Self> {
85 let mut child = Command::new(binary)
86 .args(args)
87 .current_dir(&root)
88 .stdin(Stdio::piped())
89 .stdout(Stdio::piped())
90 .stderr(Stdio::piped())
91 .spawn()?;
92
93 let stdout = child
94 .stdout
95 .take()
96 .ok_or_else(|| io::Error::other("language server missing stdout pipe"))?;
97 let stdin = child
98 .stdin
99 .take()
100 .ok_or_else(|| io::Error::other("language server missing stdin pipe"))?;
101
102 let writer = Arc::new(Mutex::new(BufWriter::new(stdin)));
103 let pending = Arc::new(Mutex::new(PendingMap::new()));
104 let reader_pending = Arc::clone(&pending);
105 let reader_writer = Arc::clone(&writer);
106 let reader_kind = kind;
107 let reader_root = root.clone();
108
109 thread::spawn(move || {
110 let mut reader = BufReader::new(stdout);
111 loop {
112 match transport::read_message(&mut reader) {
113 Ok(Some(ServerMessage::Response(response))) => {
114 if let Ok(mut guard) = reader_pending.lock() {
115 if let Some(tx) = guard.remove(&response.id) {
116 if tx.send(response).is_err() {
117 log::debug!("[aft-lsp] response channel closed");
118 }
119 }
120 } else {
121 let _ = event_tx.send(LspEvent::ServerExited {
122 server_kind: reader_kind,
123 root: reader_root.clone(),
124 });
125 break;
126 }
127 }
128 Ok(Some(ServerMessage::Notification { method, params })) => {
129 let _ = event_tx.send(LspEvent::Notification {
130 server_kind: reader_kind,
131 root: reader_root.clone(),
132 method,
133 params,
134 });
135 }
136 Ok(Some(ServerMessage::Request { id, method, params })) => {
137 if let Ok(mut w) = reader_writer.lock() {
143 let response = super::jsonrpc::OutgoingResponse::success(
144 id.clone(),
145 serde_json::Value::Null,
146 );
147 let _ = transport::write_response(&mut *w, &response);
148 }
149 let _ = event_tx.send(LspEvent::ServerRequest {
151 server_kind: reader_kind,
152 root: reader_root.clone(),
153 id,
154 method,
155 params,
156 });
157 }
158 Ok(None) | Err(_) => {
159 if let Ok(mut guard) = reader_pending.lock() {
160 guard.clear();
161 }
162 let _ = event_tx.send(LspEvent::ServerExited {
163 server_kind: reader_kind,
164 root: reader_root.clone(),
165 });
166 break;
167 }
168 }
169 }
170 });
171
172 Ok(Self {
173 kind,
174 root,
175 state: ServerState::Starting,
176 child,
177 writer,
178 pending,
179 next_id: AtomicI64::new(1),
180 })
181 }
182
183 pub fn initialize(
185 &mut self,
186 workspace_root: &Path,
187 ) -> Result<lsp_types::InitializeResult, LspError> {
188 self.ensure_can_send()?;
189 self.state = ServerState::Initializing;
190
191 let root_uri = lsp_types::Uri::from_str(&format!("file://{}", workspace_root.display()))
192 .map_err(|_| {
193 LspError::NotFound(format!(
194 "failed to convert workspace root '{}' to file URI",
195 workspace_root.display()
196 ))
197 })?;
198
199 let params = serde_json::from_value::<lsp_types::InitializeParams>(json!({
200 "processId": std::process::id(),
201 "rootUri": root_uri,
202 "capabilities": {
203 "workspace": {
204 "workspaceFolders": true,
205 "configuration": true
206 },
207 "textDocument": {
208 "synchronization": {
209 "dynamicRegistration": false,
210 "didSave": true,
211 "willSave": false,
212 "willSaveWaitUntil": false
213 },
214 "publishDiagnostics": {
215 "relatedInformation": true,
216 "versionSupport": true,
217 "codeDescriptionSupport": true,
218 "dataSupport": true
219 }
220 }
221 },
222 "clientInfo": {
223 "name": "aft",
224 "version": env!("CARGO_PKG_VERSION")
225 },
226 "workspaceFolders": [
227 {
228 "uri": root_uri,
229 "name": workspace_root
230 .file_name()
231 .and_then(|name| name.to_str())
232 .unwrap_or("workspace")
233 }
234 ]
235 }))?;
236
237 let result = self.send_request::<lsp_types::request::Initialize>(params)?;
238 self.send_notification::<lsp_types::notification::Initialized>(serde_json::from_value(
239 json!({}),
240 )?)?;
241 self.state = ServerState::Ready;
242 Ok(result)
243 }
244
245 pub fn send_request<R>(&mut self, params: R::Params) -> Result<R::Result, LspError>
247 where
248 R: lsp_types::request::Request,
249 R::Params: serde::Serialize,
250 R::Result: DeserializeOwned,
251 {
252 self.ensure_can_send()?;
253
254 let id = RequestId::Int(self.next_id.fetch_add(1, Ordering::Relaxed));
255 let (tx, rx) = bounded(1);
256 {
257 let mut pending = self.lock_pending()?;
258 pending.insert(id.clone(), tx);
259 }
260
261 let request = Request::new(id.clone(), R::METHOD, Some(serde_json::to_value(params)?));
262 {
263 let mut writer = self
264 .writer
265 .lock()
266 .map_err(|_| LspError::ServerNotReady("writer lock poisoned".to_string()))?;
267 if let Err(err) = transport::write_request(&mut *writer, &request) {
268 self.remove_pending(&id);
269 return Err(err.into());
270 }
271 }
272
273 let response = match rx.recv_timeout(REQUEST_TIMEOUT) {
274 Ok(response) => response,
275 Err(RecvTimeoutError::Timeout) => {
276 self.remove_pending(&id);
277 return Err(LspError::Timeout(format!(
278 "timed out waiting for '{}' response from {:?}",
279 R::METHOD,
280 self.kind
281 )));
282 }
283 Err(RecvTimeoutError::Disconnected) => {
284 self.remove_pending(&id);
285 return Err(LspError::ServerNotReady(format!(
286 "language server {:?} disconnected while waiting for '{}'",
287 self.kind,
288 R::METHOD
289 )));
290 }
291 };
292
293 if let Some(error) = response.error {
294 return Err(LspError::ServerError {
295 code: error.code,
296 message: error.message,
297 });
298 }
299
300 serde_json::from_value(response.result.unwrap_or(Value::Null)).map_err(Into::into)
301 }
302
303 pub fn send_notification<N>(&mut self, params: N::Params) -> Result<(), LspError>
305 where
306 N: lsp_types::notification::Notification,
307 N::Params: serde::Serialize,
308 {
309 self.ensure_can_send()?;
310 let notification = Notification::new(N::METHOD, Some(serde_json::to_value(params)?));
311 let mut writer = self
312 .writer
313 .lock()
314 .map_err(|_| LspError::ServerNotReady("writer lock poisoned".to_string()))?;
315 transport::write_notification(&mut *writer, ¬ification)?;
316 Ok(())
317 }
318
319 pub fn shutdown(&mut self) -> Result<(), LspError> {
321 if self.state == ServerState::Exited {
322 return Ok(());
323 }
324
325 if self.child.try_wait()?.is_some() {
326 self.state = ServerState::Exited;
327 return Ok(());
328 }
329
330 if let Err(err) = self.send_request::<lsp_types::request::Shutdown>(()) {
331 self.state = ServerState::ShuttingDown;
332 if self.child.try_wait()?.is_some() {
333 self.state = ServerState::Exited;
334 return Ok(());
335 }
336 return Err(err);
337 }
338
339 self.state = ServerState::ShuttingDown;
340
341 if let Err(err) = self.send_notification::<lsp_types::notification::Exit>(()) {
342 if self.child.try_wait()?.is_some() {
343 self.state = ServerState::Exited;
344 return Ok(());
345 }
346 return Err(err);
347 }
348
349 let deadline = Instant::now() + SHUTDOWN_TIMEOUT;
350 loop {
351 if self.child.try_wait()?.is_some() {
352 self.state = ServerState::Exited;
353 return Ok(());
354 }
355 if Instant::now() >= deadline {
356 return Err(LspError::Timeout(format!(
357 "timed out waiting for {:?} to exit",
358 self.kind
359 )));
360 }
361 thread::sleep(EXIT_POLL_INTERVAL);
362 }
363 }
364
365 pub fn state(&self) -> ServerState {
366 self.state
367 }
368
369 pub fn kind(&self) -> ServerKind {
370 self.kind
371 }
372
373 pub fn root(&self) -> &Path {
374 &self.root
375 }
376
377 fn ensure_can_send(&self) -> Result<(), LspError> {
378 if matches!(self.state, ServerState::ShuttingDown | ServerState::Exited) {
379 return Err(LspError::ServerNotReady(format!(
380 "language server {:?} is not ready (state: {:?})",
381 self.kind, self.state
382 )));
383 }
384 Ok(())
385 }
386
387 fn lock_pending(&self) -> Result<std::sync::MutexGuard<'_, PendingMap>, LspError> {
388 self.pending
389 .lock()
390 .map_err(|_| io::Error::other("pending response map poisoned").into())
391 }
392
393 fn remove_pending(&self, id: &RequestId) {
394 if let Ok(mut pending) = self.pending.lock() {
395 pending.remove(id);
396 }
397 }
398}