1use std::collections::HashMap;
2use std::io::{self, BufReader, BufWriter};
3use std::path::{Path, PathBuf};
4use std::process::{Child, Command, Stdio};
5use std::str::FromStr;
6use std::sync::atomic::{AtomicI64, Ordering};
7use std::sync::{Arc, Mutex};
8use std::thread;
9use std::time::{Duration, Instant};
10
11use crossbeam_channel::{bounded, Receiver, RecvTimeoutError, Sender};
12use serde::de::DeserializeOwned;
13use serde_json::{json, Value};
14
15use crate::lsp::jsonrpc::{
16 Notification, Request, RequestId, Response as JsonRpcResponse, ServerMessage,
17};
18use crate::lsp::registry::ServerKind;
19use crate::lsp::{transport, LspError};
20
21const REQUEST_TIMEOUT: Duration = Duration::from_secs(30);
22const SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(5);
23const EXIT_POLL_INTERVAL: Duration = Duration::from_millis(25);
24
25type PendingMap = HashMap<RequestId, Sender<JsonRpcResponse>>;
26
27#[derive(Debug, Clone, Copy, PartialEq, Eq)]
29pub enum ServerState {
30 Starting,
31 Initializing,
32 Ready,
33 ShuttingDown,
34 Exited,
35}
36
37#[derive(Debug)]
39pub enum LspEvent {
40 Notification {
42 server_kind: ServerKind,
43 root: PathBuf,
44 method: String,
45 params: Option<Value>,
46 },
47 ServerRequest {
49 server_kind: ServerKind,
50 root: PathBuf,
51 id: RequestId,
52 method: String,
53 params: Option<Value>,
54 },
55 ServerExited {
57 server_kind: ServerKind,
58 root: PathBuf,
59 },
60}
61
62pub struct LspClient {
64 kind: ServerKind,
65 root: PathBuf,
66 state: ServerState,
67 child: Child,
68 writer: BufWriter<std::process::ChildStdin>,
69 #[allow(dead_code)]
72 event_rx: Receiver<LspEvent>,
73 pending: Arc<Mutex<PendingMap>>,
75 next_id: AtomicI64,
77}
78
79impl LspClient {
80 pub fn spawn(
82 kind: ServerKind,
83 root: PathBuf,
84 binary: &Path,
85 args: &[&str],
86 event_tx: Sender<LspEvent>,
87 ) -> io::Result<Self> {
88 let mut child = Command::new(binary)
89 .args(args)
90 .current_dir(&root)
91 .stdin(Stdio::piped())
92 .stdout(Stdio::piped())
93 .stderr(Stdio::piped())
94 .spawn()?;
95
96 let stdout = child
97 .stdout
98 .take()
99 .ok_or_else(|| io::Error::other("language server missing stdout pipe"))?;
100 let stdin = child
101 .stdin
102 .take()
103 .ok_or_else(|| io::Error::other("language server missing stdin pipe"))?;
104
105 let pending = Arc::new(Mutex::new(PendingMap::new()));
106 let reader_pending = Arc::clone(&pending);
107 let reader_kind = kind;
108 let reader_root = root.clone();
109 let (_client_event_tx, event_rx) = crossbeam_channel::unbounded();
110
111 thread::spawn(move || {
112 let mut reader = BufReader::new(stdout);
113 loop {
114 match transport::read_message(&mut reader) {
115 Ok(Some(ServerMessage::Response(response))) => {
116 if let Ok(mut guard) = reader_pending.lock() {
117 if let Some(tx) = guard.remove(&response.id) {
118 let _ = tx.send(response);
119 }
120 } else {
121 let _ = event_tx.send(LspEvent::ServerExited {
122 server_kind: reader_kind,
123 root: reader_root.clone(),
124 });
125 break;
126 }
127 }
128 Ok(Some(ServerMessage::Notification { method, params })) => {
129 let _ = event_tx.send(LspEvent::Notification {
130 server_kind: reader_kind,
131 root: reader_root.clone(),
132 method,
133 params,
134 });
135 }
136 Ok(Some(ServerMessage::Request { id, method, params })) => {
137 let _ = event_tx.send(LspEvent::ServerRequest {
138 server_kind: reader_kind,
139 root: reader_root.clone(),
140 id,
141 method,
142 params,
143 });
144 }
145 Ok(None) | Err(_) => {
146 if let Ok(mut guard) = reader_pending.lock() {
147 guard.clear();
148 }
149 let _ = event_tx.send(LspEvent::ServerExited {
150 server_kind: reader_kind,
151 root: reader_root.clone(),
152 });
153 break;
154 }
155 }
156 }
157 });
158
159 Ok(Self {
160 kind,
161 root,
162 state: ServerState::Starting,
163 child,
164 writer: BufWriter::new(stdin),
165 event_rx,
166 pending,
167 next_id: AtomicI64::new(1),
168 })
169 }
170
171 pub fn initialize(
173 &mut self,
174 workspace_root: &Path,
175 ) -> Result<lsp_types::InitializeResult, LspError> {
176 self.ensure_can_send()?;
177 self.state = ServerState::Initializing;
178
179 let root_uri = lsp_types::Uri::from_str(&format!("file://{}", workspace_root.display()))
180 .map_err(|_| {
181 LspError::NotFound(format!(
182 "failed to convert workspace root '{}' to file URI",
183 workspace_root.display()
184 ))
185 })?;
186
187 let params = serde_json::from_value::<lsp_types::InitializeParams>(json!({
188 "processId": std::process::id(),
189 "rootUri": root_uri,
190 "capabilities": {
191 "workspace": {
192 "workspaceFolders": true,
193 "configuration": true
194 },
195 "textDocument": {
196 "synchronization": {
197 "dynamicRegistration": false,
198 "didSave": true,
199 "willSave": false,
200 "willSaveWaitUntil": false
201 },
202 "publishDiagnostics": {
203 "relatedInformation": true,
204 "versionSupport": true,
205 "codeDescriptionSupport": true,
206 "dataSupport": true
207 }
208 }
209 },
210 "clientInfo": {
211 "name": "aft",
212 "version": env!("CARGO_PKG_VERSION")
213 },
214 "workspaceFolders": [
215 {
216 "uri": root_uri,
217 "name": workspace_root
218 .file_name()
219 .and_then(|name| name.to_str())
220 .unwrap_or("workspace")
221 }
222 ]
223 }))?;
224
225 let result = self.send_request::<lsp_types::request::Initialize>(params)?;
226 self.send_notification::<lsp_types::notification::Initialized>(serde_json::from_value(
227 json!({}),
228 )?)?;
229 self.state = ServerState::Ready;
230 Ok(result)
231 }
232
233 pub fn send_request<R>(&mut self, params: R::Params) -> Result<R::Result, LspError>
235 where
236 R: lsp_types::request::Request,
237 R::Params: serde::Serialize,
238 R::Result: DeserializeOwned,
239 {
240 self.ensure_can_send()?;
241
242 let id = RequestId::Int(self.next_id.fetch_add(1, Ordering::Relaxed));
243 let (tx, rx) = bounded(1);
244 {
245 let mut pending = self.lock_pending()?;
246 pending.insert(id.clone(), tx);
247 }
248
249 let request = Request::new(id.clone(), R::METHOD, Some(serde_json::to_value(params)?));
250 if let Err(err) = transport::write_request(&mut self.writer, &request) {
251 self.remove_pending(&id);
252 return Err(err.into());
253 }
254
255 let response = match rx.recv_timeout(REQUEST_TIMEOUT) {
256 Ok(response) => response,
257 Err(RecvTimeoutError::Timeout) => {
258 self.remove_pending(&id);
259 return Err(LspError::Timeout(format!(
260 "timed out waiting for '{}' response from {:?}",
261 R::METHOD,
262 self.kind
263 )));
264 }
265 Err(RecvTimeoutError::Disconnected) => {
266 self.remove_pending(&id);
267 return Err(LspError::ServerNotReady(format!(
268 "language server {:?} disconnected while waiting for '{}'",
269 self.kind,
270 R::METHOD
271 )));
272 }
273 };
274
275 if let Some(error) = response.error {
276 return Err(LspError::ServerError {
277 code: error.code,
278 message: error.message,
279 });
280 }
281
282 serde_json::from_value(response.result.unwrap_or(Value::Null)).map_err(Into::into)
283 }
284
285 pub fn send_notification<N>(&mut self, params: N::Params) -> Result<(), LspError>
287 where
288 N: lsp_types::notification::Notification,
289 N::Params: serde::Serialize,
290 {
291 self.ensure_can_send()?;
292 let notification = Notification::new(N::METHOD, Some(serde_json::to_value(params)?));
293 transport::write_notification(&mut self.writer, ¬ification)?;
294 Ok(())
295 }
296
297 pub fn shutdown(&mut self) -> Result<(), LspError> {
299 if self.state == ServerState::Exited {
300 return Ok(());
301 }
302
303 if self.child.try_wait()?.is_some() {
304 self.state = ServerState::Exited;
305 return Ok(());
306 }
307
308 if let Err(err) = self.send_request::<lsp_types::request::Shutdown>(()) {
309 self.state = ServerState::ShuttingDown;
310 if self.child.try_wait()?.is_some() {
311 self.state = ServerState::Exited;
312 return Ok(());
313 }
314 return Err(err);
315 }
316
317 self.state = ServerState::ShuttingDown;
318
319 if let Err(err) = self.send_notification::<lsp_types::notification::Exit>(()) {
320 if self.child.try_wait()?.is_some() {
321 self.state = ServerState::Exited;
322 return Ok(());
323 }
324 return Err(err);
325 }
326
327 let deadline = Instant::now() + SHUTDOWN_TIMEOUT;
328 loop {
329 if self.child.try_wait()?.is_some() {
330 self.state = ServerState::Exited;
331 return Ok(());
332 }
333 if Instant::now() >= deadline {
334 return Err(LspError::Timeout(format!(
335 "timed out waiting for {:?} to exit",
336 self.kind
337 )));
338 }
339 thread::sleep(EXIT_POLL_INTERVAL);
340 }
341 }
342
343 pub fn state(&self) -> ServerState {
344 self.state
345 }
346
347 pub fn kind(&self) -> ServerKind {
348 self.kind
349 }
350
351 pub fn root(&self) -> &Path {
352 &self.root
353 }
354
355 fn ensure_can_send(&self) -> Result<(), LspError> {
356 if matches!(self.state, ServerState::ShuttingDown | ServerState::Exited) {
357 return Err(LspError::ServerNotReady(format!(
358 "language server {:?} is not ready (state: {:?})",
359 self.kind, self.state
360 )));
361 }
362 Ok(())
363 }
364
365 fn lock_pending(&self) -> Result<std::sync::MutexGuard<'_, PendingMap>, LspError> {
366 self.pending
367 .lock()
368 .map_err(|_| io::Error::other("pending response map poisoned").into())
369 }
370
371 fn remove_pending(&self, id: &RequestId) {
372 if let Ok(mut pending) = self.pending.lock() {
373 pending.remove(id);
374 }
375 }
376}