1use std::collections::HashMap;
2use std::io::{self, BufReader, BufWriter};
3use std::path::{Path, PathBuf};
4use std::process::{Child, Command, Stdio};
5use std::str::FromStr;
6use std::sync::atomic::{AtomicI64, Ordering};
7use std::sync::{Arc, Mutex};
8use std::thread;
9use std::time::{Duration, Instant};
10
11use crossbeam_channel::{bounded, Receiver, RecvTimeoutError, Sender};
12use serde::de::DeserializeOwned;
13use serde_json::{json, Value};
14
15use crate::lsp::jsonrpc::{
16 Notification, Request, RequestId, Response as JsonRpcResponse, ServerMessage,
17};
18use crate::lsp::registry::ServerKind;
19use crate::lsp::{transport, LspError};
20
/// Maximum time `send_request` blocks waiting for the matching response.
const REQUEST_TIMEOUT: Duration = Duration::from_secs(30);
/// Upper bound on how long `shutdown` polls for the child process to exit.
const SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(5);
/// Sleep between consecutive `try_wait` polls while `shutdown` waits for exit.
const EXIT_POLL_INTERVAL: Duration = Duration::from_millis(25);

/// In-flight requests keyed by id. The reader thread removes an entry when the
/// matching response arrives and forwards the response through the stored sender.
type PendingMap = HashMap<RequestId, Sender<JsonRpcResponse>>;
26
/// Lifecycle state of a language server as tracked by [`LspClient`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ServerState {
    /// Process has been spawned; `initialize` has not been called yet.
    Starting,
    /// `initialize` is in progress (request sent, handshake not finished).
    Initializing,
    /// The `initialize` / `initialized` handshake completed successfully.
    Ready,
    /// `shutdown` has been attempted; no further messages may be sent.
    ShuttingDown,
    /// The child process was observed to have exited.
    Exited,
}
36
/// Events forwarded by the reader thread to the `event_tx` channel passed to
/// `LspClient::spawn`. Every variant identifies the originating server by its
/// kind and workspace root.
#[derive(Debug)]
pub enum LspEvent {
    /// The server sent a notification (a message without an id).
    Notification {
        server_kind: ServerKind,
        root: PathBuf,
        method: String,
        params: Option<Value>,
    },
    /// The server sent a request. The reader thread has already attempted to
    /// answer it with a `null` success result before emitting this event.
    ServerRequest {
        server_kind: ServerKind,
        root: PathBuf,
        id: RequestId,
        method: String,
        params: Option<Value>,
    },
    /// The reader thread stopped: the server's stdout reached end-of-stream,
    /// reading failed, or the pending-request map lock was poisoned.
    ServerExited {
        server_kind: ServerKind,
        root: PathBuf,
    },
}
61
/// Handle to one running language server process, communicating over the
/// child's stdin/stdout.
pub struct LspClient {
    /// Which kind of server this is; used in error messages and events.
    kind: ServerKind,
    /// Workspace root the server was started in (its working directory).
    root: PathBuf,
    /// Current lifecycle state, updated by `initialize` and `shutdown`.
    state: ServerState,
    /// The spawned server process; polled with `try_wait` during shutdown.
    child: Child,
    /// Buffered writer over the child's stdin, shared with the reader thread
    /// (which uses it to answer server-initiated requests).
    writer: Arc<Mutex<BufWriter<std::process::ChildStdin>>>,
    // Receiver of a channel created in `spawn`; none of the methods in this
    // impl read from it, hence the lint allowance.
    // NOTE(review): the matching sender is dropped when `spawn` returns —
    // confirm whether this field is still needed.
    #[allow(dead_code)]
    event_rx: Receiver<LspEvent>,
    /// Requests awaiting a response, shared with the reader thread.
    pending: Arc<Mutex<PendingMap>>,
    /// Source of integer request ids; starts at 1 and increments per request.
    next_id: AtomicI64,
}
78
79impl LspClient {
80 pub fn spawn(
82 kind: ServerKind,
83 root: PathBuf,
84 binary: &Path,
85 args: &[&str],
86 event_tx: Sender<LspEvent>,
87 ) -> io::Result<Self> {
88 let mut child = Command::new(binary)
89 .args(args)
90 .current_dir(&root)
91 .stdin(Stdio::piped())
92 .stdout(Stdio::piped())
93 .stderr(Stdio::piped())
94 .spawn()?;
95
96 let stdout = child
97 .stdout
98 .take()
99 .ok_or_else(|| io::Error::other("language server missing stdout pipe"))?;
100 let stdin = child
101 .stdin
102 .take()
103 .ok_or_else(|| io::Error::other("language server missing stdin pipe"))?;
104
105 let writer = Arc::new(Mutex::new(BufWriter::new(stdin)));
106 let pending = Arc::new(Mutex::new(PendingMap::new()));
107 let reader_pending = Arc::clone(&pending);
108 let reader_writer = Arc::clone(&writer);
109 let reader_kind = kind;
110 let reader_root = root.clone();
111 let (_client_event_tx, event_rx) = crossbeam_channel::unbounded();
112
113 thread::spawn(move || {
114 let mut reader = BufReader::new(stdout);
115 loop {
116 match transport::read_message(&mut reader) {
117 Ok(Some(ServerMessage::Response(response))) => {
118 if let Ok(mut guard) = reader_pending.lock() {
119 if let Some(tx) = guard.remove(&response.id) {
120 let _ = tx.send(response);
121 }
122 } else {
123 let _ = event_tx.send(LspEvent::ServerExited {
124 server_kind: reader_kind,
125 root: reader_root.clone(),
126 });
127 break;
128 }
129 }
130 Ok(Some(ServerMessage::Notification { method, params })) => {
131 let _ = event_tx.send(LspEvent::Notification {
132 server_kind: reader_kind,
133 root: reader_root.clone(),
134 method,
135 params,
136 });
137 }
138 Ok(Some(ServerMessage::Request { id, method, params })) => {
139 if let Ok(mut w) = reader_writer.lock() {
145 let response = super::jsonrpc::OutgoingResponse::success(
146 id.clone(),
147 serde_json::Value::Null,
148 );
149 let _ = transport::write_response(&mut *w, &response);
150 }
151 let _ = event_tx.send(LspEvent::ServerRequest {
153 server_kind: reader_kind,
154 root: reader_root.clone(),
155 id,
156 method,
157 params,
158 });
159 }
160 Ok(None) | Err(_) => {
161 if let Ok(mut guard) = reader_pending.lock() {
162 guard.clear();
163 }
164 let _ = event_tx.send(LspEvent::ServerExited {
165 server_kind: reader_kind,
166 root: reader_root.clone(),
167 });
168 break;
169 }
170 }
171 }
172 });
173
174 Ok(Self {
175 kind,
176 root,
177 state: ServerState::Starting,
178 child,
179 writer,
180 event_rx,
181 pending,
182 next_id: AtomicI64::new(1),
183 })
184 }
185
186 pub fn initialize(
188 &mut self,
189 workspace_root: &Path,
190 ) -> Result<lsp_types::InitializeResult, LspError> {
191 self.ensure_can_send()?;
192 self.state = ServerState::Initializing;
193
194 let root_uri = lsp_types::Uri::from_str(&format!("file://{}", workspace_root.display()))
195 .map_err(|_| {
196 LspError::NotFound(format!(
197 "failed to convert workspace root '{}' to file URI",
198 workspace_root.display()
199 ))
200 })?;
201
202 let params = serde_json::from_value::<lsp_types::InitializeParams>(json!({
203 "processId": std::process::id(),
204 "rootUri": root_uri,
205 "capabilities": {
206 "workspace": {
207 "workspaceFolders": true,
208 "configuration": true
209 },
210 "textDocument": {
211 "synchronization": {
212 "dynamicRegistration": false,
213 "didSave": true,
214 "willSave": false,
215 "willSaveWaitUntil": false
216 },
217 "publishDiagnostics": {
218 "relatedInformation": true,
219 "versionSupport": true,
220 "codeDescriptionSupport": true,
221 "dataSupport": true
222 }
223 }
224 },
225 "clientInfo": {
226 "name": "aft",
227 "version": env!("CARGO_PKG_VERSION")
228 },
229 "workspaceFolders": [
230 {
231 "uri": root_uri,
232 "name": workspace_root
233 .file_name()
234 .and_then(|name| name.to_str())
235 .unwrap_or("workspace")
236 }
237 ]
238 }))?;
239
240 let result = self.send_request::<lsp_types::request::Initialize>(params)?;
241 self.send_notification::<lsp_types::notification::Initialized>(serde_json::from_value(
242 json!({}),
243 )?)?;
244 self.state = ServerState::Ready;
245 Ok(result)
246 }
247
248 pub fn send_request<R>(&mut self, params: R::Params) -> Result<R::Result, LspError>
250 where
251 R: lsp_types::request::Request,
252 R::Params: serde::Serialize,
253 R::Result: DeserializeOwned,
254 {
255 self.ensure_can_send()?;
256
257 let id = RequestId::Int(self.next_id.fetch_add(1, Ordering::Relaxed));
258 let (tx, rx) = bounded(1);
259 {
260 let mut pending = self.lock_pending()?;
261 pending.insert(id.clone(), tx);
262 }
263
264 let request = Request::new(id.clone(), R::METHOD, Some(serde_json::to_value(params)?));
265 {
266 let mut writer = self
267 .writer
268 .lock()
269 .map_err(|_| LspError::ServerNotReady("writer lock poisoned".to_string()))?;
270 if let Err(err) = transport::write_request(&mut *writer, &request) {
271 self.remove_pending(&id);
272 return Err(err.into());
273 }
274 }
275
276 let response = match rx.recv_timeout(REQUEST_TIMEOUT) {
277 Ok(response) => response,
278 Err(RecvTimeoutError::Timeout) => {
279 self.remove_pending(&id);
280 return Err(LspError::Timeout(format!(
281 "timed out waiting for '{}' response from {:?}",
282 R::METHOD,
283 self.kind
284 )));
285 }
286 Err(RecvTimeoutError::Disconnected) => {
287 self.remove_pending(&id);
288 return Err(LspError::ServerNotReady(format!(
289 "language server {:?} disconnected while waiting for '{}'",
290 self.kind,
291 R::METHOD
292 )));
293 }
294 };
295
296 if let Some(error) = response.error {
297 return Err(LspError::ServerError {
298 code: error.code,
299 message: error.message,
300 });
301 }
302
303 serde_json::from_value(response.result.unwrap_or(Value::Null)).map_err(Into::into)
304 }
305
306 pub fn send_notification<N>(&mut self, params: N::Params) -> Result<(), LspError>
308 where
309 N: lsp_types::notification::Notification,
310 N::Params: serde::Serialize,
311 {
312 self.ensure_can_send()?;
313 let notification = Notification::new(N::METHOD, Some(serde_json::to_value(params)?));
314 let mut writer = self
315 .writer
316 .lock()
317 .map_err(|_| LspError::ServerNotReady("writer lock poisoned".to_string()))?;
318 transport::write_notification(&mut *writer, ¬ification)?;
319 Ok(())
320 }
321
322 pub fn shutdown(&mut self) -> Result<(), LspError> {
324 if self.state == ServerState::Exited {
325 return Ok(());
326 }
327
328 if self.child.try_wait()?.is_some() {
329 self.state = ServerState::Exited;
330 return Ok(());
331 }
332
333 if let Err(err) = self.send_request::<lsp_types::request::Shutdown>(()) {
334 self.state = ServerState::ShuttingDown;
335 if self.child.try_wait()?.is_some() {
336 self.state = ServerState::Exited;
337 return Ok(());
338 }
339 return Err(err);
340 }
341
342 self.state = ServerState::ShuttingDown;
343
344 if let Err(err) = self.send_notification::<lsp_types::notification::Exit>(()) {
345 if self.child.try_wait()?.is_some() {
346 self.state = ServerState::Exited;
347 return Ok(());
348 }
349 return Err(err);
350 }
351
352 let deadline = Instant::now() + SHUTDOWN_TIMEOUT;
353 loop {
354 if self.child.try_wait()?.is_some() {
355 self.state = ServerState::Exited;
356 return Ok(());
357 }
358 if Instant::now() >= deadline {
359 return Err(LspError::Timeout(format!(
360 "timed out waiting for {:?} to exit",
361 self.kind
362 )));
363 }
364 thread::sleep(EXIT_POLL_INTERVAL);
365 }
366 }
367
    /// Returns the client's current lifecycle state.
    pub fn state(&self) -> ServerState {
        self.state
    }
371
    /// Returns which kind of language server this client talks to.
    pub fn kind(&self) -> ServerKind {
        self.kind
    }
375
376 pub fn root(&self) -> &Path {
377 &self.root
378 }
379
380 fn ensure_can_send(&self) -> Result<(), LspError> {
381 if matches!(self.state, ServerState::ShuttingDown | ServerState::Exited) {
382 return Err(LspError::ServerNotReady(format!(
383 "language server {:?} is not ready (state: {:?})",
384 self.kind, self.state
385 )));
386 }
387 Ok(())
388 }
389
390 fn lock_pending(&self) -> Result<std::sync::MutexGuard<'_, PendingMap>, LspError> {
391 self.pending
392 .lock()
393 .map_err(|_| io::Error::other("pending response map poisoned").into())
394 }
395
396 fn remove_pending(&self, id: &RequestId) {
397 if let Ok(mut pending) = self.pending.lock() {
398 pending.remove(id);
399 }
400 }
401}