sync_ls/server/
lsp_srv.rs

1use super::*;
2
3use lsp_types::{notification::Notification as Notif, request::Request as Req, *};
4
/// A plain function pointer that handles arguments of type `T` against the
/// mutable service state `S` and reports only success or failure.
///
/// In this file it is the handler shape accepted by the notification
/// registration methods of the builder.
type PureHandler<S, T> = fn(srv: &mut S, args: T) -> LspResult<()>;
6
/// Turns a `ScheduledResult` into a `SchedulableResponse`.
///
/// The expression is matched as follows:
///
/// - `Err(err)` is forwarded unchanged.
/// - `Ok(None)` becomes an already-completed future carrying JSON `null`.
/// - `Ok(Some(()))` makes the macro `return` from the *function that invoked
///   it*, so nothing after the macro call runs in that case.
macro_rules! reschedule {
    ($scheduled:expr) => {
        match $scheduled {
            Err(err) => Err(err),
            Ok(None) => Ok(futures::future::MaybeDone::Done(Ok(
                serde_json::Value::Null,
            ))),
            Ok(Some(())) => return,
        }
    };
}
19
20impl<S: 'static> TypedLspClient<S> {
21    /// Sends a request to the client and registers a handler handled by the
22    /// service `S`.
23    pub fn send_lsp_request<R: Req>(
24        &self,
25        params: R::Params,
26        handler: impl FnOnce(&mut S, lsp::Response) + Send + Sync + 'static,
27    ) {
28        let caster = self.caster.clone();
29        self.client
30            .send_lsp_request_::<R>(params, move |s, resp| handler(caster(s), resp))
31    }
32}
33
34impl LspClient {
35    /// Sends a request to the client and registers a handler.
36    pub fn send_lsp_request_<R: Req>(
37        &self,
38        params: R::Params,
39        handler: impl FnOnce(&mut dyn Any, lsp::Response) + Send + Sync + 'static,
40    ) {
41        let mut req_queue = self.req_queue.lock();
42        let request = req_queue.outgoing.register(
43            R::METHOD.to_owned(),
44            params,
45            Box::new(|s, resp| handler(s, resp.try_into().unwrap())),
46        );
47
48        let Some(sender) = self.sender.upgrade() else {
49            log::warn!("failed to send request: connection closed");
50            return;
51        };
52        if let Err(res) = sender.lsp.send(request.into()) {
53            log::warn!("failed to send request: {res:?}");
54        }
55    }
56
57    /// Completes an client2server request in the request queue.
58    pub fn respond_lsp(&self, response: lsp::Response) {
59        self.respond(response.id.clone(), response.into())
60    }
61
62    /// Sends a typed notification to the client.
63    pub fn send_notification<N: Notif>(&self, params: &N::Params) {
64        self.send_notification_(lsp::Notification::new(N::METHOD.to_owned(), params));
65    }
66
67    /// Sends an untyped notification to the client.
68    pub fn send_notification_(&self, notif: lsp::Notification) {
69        let method = &notif.method;
70        let Some(sender) = self.sender.upgrade() else {
71            log::warn!("failed to send notification ({method}): connection closed");
72            return;
73        };
74        if let Err(res) = sender.lsp.send(notif.into()) {
75            log::warn!("failed to send notification: {res:?}");
76        }
77    }
78}
79
80impl<Args: Initializer> LsBuilder<LspMessage, Args>
81where
82    Args::S: 'static,
83{
84    /// Registers an raw event handler.
85    pub fn with_command_(
86        mut self,
87        cmd: &'static str,
88        handler: RawHandler<Args::S, Vec<JsonValue>>,
89    ) -> Self {
90        self.command_handlers.insert(cmd, raw_to_boxed(handler));
91        self
92    }
93
94    /// Registers an async command handler.
95    pub fn with_command<R: Serialize + 'static>(
96        mut self,
97        cmd: &'static str,
98        handler: AsyncHandler<Args::S, Vec<JsonValue>, R>,
99    ) -> Self {
100        self.command_handlers.insert(
101            cmd,
102            Box::new(move |s, client, req_id, req| client.schedule(req_id, handler(s, req))),
103        );
104        self
105    }
106
107    /// Registers an untyped notification handler.
108    pub fn with_notification_<R: Notif>(
109        mut self,
110        handler: PureHandler<Args::S, JsonValue>,
111    ) -> Self {
112        self.notif_handlers.insert(R::METHOD, Box::new(handler));
113        self
114    }
115
116    /// Registers a typed notification handler.
117    pub fn with_notification<R: Notif>(mut self, handler: PureHandler<Args::S, R::Params>) -> Self {
118        self.notif_handlers.insert(
119            R::METHOD,
120            Box::new(move |s, req| handler(s, from_json(req)?)),
121        );
122        self
123    }
124
125    /// Registers a raw request handler that handlers a kind of untyped lsp
126    /// request.
127    pub fn with_raw_request<R: Req>(mut self, handler: RawHandler<Args::S, JsonValue>) -> Self {
128        self.req_handlers.insert(R::METHOD, raw_to_boxed(handler));
129        self
130    }
131
132    // todo: unsafe typed
133    /// Registers an raw request handler that handlers a kind of typed lsp
134    /// request.
135    pub fn with_request_<R: Req>(
136        mut self,
137        handler: fn(&mut Args::S, RequestId, R::Params) -> ScheduledResult,
138    ) -> Self {
139        self.req_handlers.insert(
140            R::METHOD,
141            Box::new(move |s, _client, req_id, req| handler(s, req_id, from_json(req)?)),
142        );
143        self
144    }
145
146    /// Registers a typed request handler.
147    pub fn with_request<R: Req>(
148        mut self,
149        handler: AsyncHandler<Args::S, R::Params, R::Result>,
150    ) -> Self {
151        self.req_handlers.insert(
152            R::METHOD,
153            Box::new(move |s, client, req_id, req| {
154                client.schedule(req_id, handler(s, from_json(req)?))
155            }),
156        );
157        self
158    }
159}
160
161impl<Args: Initializer> LsDriver<LspMessage, Args>
162where
163    Args::S: 'static,
164{
165    /// Starts the language server on the given connection.
166    ///
167    /// If `is_replay` is true, the server will wait for all pending requests to
168    /// finish before exiting. This is useful for testing the language server.
169    ///
170    /// See [`transport::MirrorArgs`] for information about the record-replay
171    /// feature.
172    pub fn start(
173        &mut self,
174        inbox: TConnectionRx<LspMessage>,
175        is_replay: bool,
176    ) -> anyhow::Result<()> {
177        let res = self.start_(inbox);
178
179        if is_replay {
180            let client = self.client.clone();
181            let _ = std::thread::spawn(move || {
182                let since = std::time::Instant::now();
183                let timeout = std::env::var("REPLAY_TIMEOUT")
184                    .ok()
185                    .and_then(|s| s.parse().ok())
186                    .unwrap_or(60);
187                client.handle.block_on(async {
188                    while client.has_pending_requests() {
189                        if since.elapsed().as_secs() > timeout {
190                            log::error!("replay timeout reached, {timeout}s");
191                            client.begin_panic();
192                        }
193
194                        tokio::time::sleep(std::time::Duration::from_millis(10)).await;
195                    }
196                })
197            })
198            .join();
199        }
200
201        res
202    }
203
204    /// Starts the language server on the given connection.
205    pub fn start_(&mut self, inbox: TConnectionRx<LspMessage>) -> anyhow::Result<()> {
206        use EventOrMessage::*;
207        // todo: follow what rust analyzer does
208        // Windows scheduler implements priority boosts: if thread waits for an
209        // event (like a condvar), and event fires, priority of the thread is
210        // temporary bumped. This optimization backfires in our case: each time
211        // the `main_loop` schedules a task to run on a threadpool, the
212        // worker threads gets a higher priority, and (on a machine with
213        // fewer cores) displaces the main loop! We work around this by
214        // marking the main loop as a higher-priority thread.
215        //
216        // https://docs.microsoft.com/en-us/windows/win32/procthread/scheduling-priorities
217        // https://docs.microsoft.com/en-us/windows/win32/procthread/priority-boosts
218        // https://github.com/rust-lang/rust-analyzer/issues/2835
219        // #[cfg(windows)]
220        // unsafe {
221        //     use winapi::um::processthreadsapi::*;
222        //     let thread = GetCurrentThread();
223        //     let thread_priority_above_normal = 1;
224        //     SetThreadPriority(thread, thread_priority_above_normal);
225        // }
226
227        while let Ok(msg) = inbox.recv() {
228            const EXIT_METHOD: &str = notification::Exit::METHOD;
229            let loop_start = Instant::now();
230            match msg {
231                Evt(event) => {
232                    let Some(event_handler) = self.events.get(&event.as_ref().type_id()) else {
233                        log::warn!("unhandled event: {:?}", event.as_ref().type_id());
234                        continue;
235                    };
236
237                    let s = match &mut self.state {
238                        State::Uninitialized(u) => ServiceState::Uninitialized(u.as_deref_mut()),
239                        State::Initializing(s) | State::Ready(s) => ServiceState::Ready(s),
240                        State::ShuttingDown => {
241                            log::warn!("server is shutting down");
242                            continue;
243                        }
244                    };
245
246                    event_handler(s, &self.client, event)?;
247                }
248                Msg(LspMessage::Request(req)) => self.on_lsp_request(loop_start, req),
249                Msg(LspMessage::Notification(not)) => {
250                    let is_exit = not.method == EXIT_METHOD;
251                    self.on_notification(loop_start, not)?;
252                    if is_exit {
253                        return Ok(());
254                    }
255                }
256                Msg(LspMessage::Response(resp)) => {
257                    let s = match &mut self.state {
258                        State::Ready(s) => s,
259                        _ => {
260                            log::warn!("server is not ready yet");
261                            continue;
262                        }
263                    };
264
265                    self.client.clone().complete_lsp_request(s, resp)
266                }
267            }
268        }
269
270        log::warn!("client exited without proper shutdown sequence");
271        Ok(())
272    }
273
    /// Registers and handles a request. This should only be called once per
    /// incoming request.
    ///
    /// The request is first registered with the client together with the
    /// time it was received. It is then dispatched on the pair of the current
    /// server state and the request method:
    ///
    /// - `initialize` while uninitialized: the parameters are parsed and the
    ///   stored arguments are consumed to create the service; the server
    ///   moves to `State::Initializing`.
    /// - any other request before the server is ready: a "not initialized"
    ///   error.
    /// - `initialize` in any later state: an "invalid request" error.
    /// - `workspace/executeCommand` while ready: forwarded to
    ///   `on_execute_command`.
    /// - any other request while ready: looked up in the registered request
    ///   handlers; a `shutdown` request moves the server to
    ///   `State::ShuttingDown` after its handler has been invoked.
    /// - any request while shutting down: an "invalid request" error.
    fn on_lsp_request(&mut self, request_received: Instant, req: Request) {
        self.client
            .register_request(&req.method, &req.id, request_received);

        // Cloned up front because `req` is moved (wholly or in parts) by the
        // arms below.
        let req_id = req.id.clone();
        let resp = match (&mut self.state, &*req.method) {
            (State::Uninitialized(args), request::Initialize::METHOD) => {
                // todo: what will happen if the request cannot be deserialized?
                let params = serde_json::from_value::<Args::I>(req.params);
                match params {
                    Ok(params) => {
                        // The arguments are taken out only after the
                        // parameters have parsed successfully.
                        let args = args.take().expect("already initialized");
                        let (s, res) = args.initialize(params);
                        self.state = State::Initializing(s);
                        res
                    }
                    Err(e) => just_result(Err(invalid_request(e))),
                }
            }
            (State::Uninitialized(..) | State::Initializing(..), _) => {
                just_result(Err(not_initialized()))
            }
            (_, request::Initialize::METHOD) => {
                just_result(Err(invalid_request("server is already initialized")))
            }
            // todo: generalize this
            (State::Ready(..), request::ExecuteCommand::METHOD) => {
                // `reschedule!` returns from this function when the command
                // path yields `Ok(Some(()))`; otherwise it produces the
                // response that is scheduled at the bottom.
                reschedule!(self.on_execute_command(req))
            }
            (State::Ready(s), _) => 'serve_req: {
                let method = req.method.as_str();
                // Determined before the handler runs; the state change itself
                // happens after the handler has been invoked.
                let is_shutdown = method == request::Shutdown::METHOD;

                let Some(handler) = self.requests.get(method) else {
                    log::warn!("unhandled lsp request: {method}");
                    // Leaves the labelled block with an error response, which
                    // is then scheduled at the bottom like the other arms.
                    break 'serve_req just_result(Err(method_not_found()));
                };

                let result = handler(s, &self.client, req_id.clone(), req.params);
                self.client.schedule_tail(req_id, result);

                if is_shutdown {
                    self.state = State::ShuttingDown;
                }

                // The handler's result has already been passed to
                // `schedule_tail`, so the common tail below is skipped.
                return;
            }
            (State::ShuttingDown, _) => {
                just_result(Err(invalid_request("server is shutting down")))
            }
        };

        // Common tail for every arm that produced a response value.
        let result = self.client.schedule(req_id.clone(), resp);
        self.client.schedule_tail(req_id, result);
    }
331
332    /// The entry point for the `workspace/executeCommand` request.
333    fn on_execute_command(&mut self, req: Request) -> ScheduledResult {
334        let s = self.state.opt_mut().ok_or_else(not_initialized)?;
335
336        let params = from_value::<ExecuteCommandParams>(req.params)
337            .map_err(|e| invalid_params(e.to_string()))?;
338
339        let ExecuteCommandParams {
340            command, arguments, ..
341        } = params;
342
343        // todo: generalize this
344        if command == "tinymist.getResources" {
345            self.get_resources(req.id, arguments)
346        } else {
347            let Some(handler) = self.commands.get(command.as_str()) else {
348                log::error!("asked to execute unknown command: {command}");
349                return Err(method_not_found());
350            };
351            handler(s, &self.client, req.id, arguments)
352        }
353    }
354
355    /// Handles an incoming notification.
356    fn on_notification(&mut self, received_at: Instant, not: Notification) -> anyhow::Result<()> {
357        self.client.start_notification(&not.method);
358        let handle = |s, Notification { method, params }: Notification| {
359            let Some(handler) = self.notifications.get(method.as_str()) else {
360                log::warn!("unhandled notification: {method}");
361                return Ok(());
362            };
363
364            let result = handler(s, params);
365            self.client.stop_notification(&method, received_at, result);
366
367            Ok(())
368        };
369
370        match (&mut self.state, &*not.method) {
371            (state, notification::Initialized::METHOD) => {
372                let mut s = State::ShuttingDown;
373                std::mem::swap(state, &mut s);
374                match s {
375                    State::Initializing(s) => {
376                        *state = State::Ready(s);
377                    }
378                    _ => {
379                        std::mem::swap(state, &mut s);
380                    }
381                }
382
383                let s = match state {
384                    State::Ready(s) => s,
385                    _ => {
386                        log::warn!("server is not ready yet");
387                        return Ok(());
388                    }
389                };
390                handle(s, not)
391            }
392            (State::Ready(state), _) => handle(state, not),
393            // todo: whether it is safe to ignore notifications
394            (State::Uninitialized(..) | State::Initializing(..), method) => {
395                log::warn!("server is not ready yet, while received notification {method}");
396                Ok(())
397            }
398            (State::ShuttingDown, method) => {
399                log::warn!("server is shutting down, while received notification {method}");
400                Ok(())
401            }
402        }
403    }
404}