sync_ls/server/
lsp_srv.rs

1use lsp_types::{notification::Notification as Notif, request::Request as Req, *};
2
3use super::*;
4
5type PureHandler<S, T> = fn(srv: &mut S, args: T) -> LspResult<()>;
6
7impl<S: 'static> TypedLspClient<S> {
8    /// Sends a request to the client and registers a handler handled by the
9    /// service `S`.
10    pub fn send_lsp_request<R: Req>(
11        &self,
12        params: R::Params,
13        handler: impl FnOnce(&mut S, lsp::Response) + Send + Sync + 'static,
14    ) {
15        let caster = self.caster.clone();
16        self.client
17            .send_lsp_request_::<R>(params, move |s, resp| handler(caster(s), resp))
18    }
19}
20
21impl LspClient {
22    /// Sends a request to the client and registers a handler.
23    pub fn send_lsp_request_<R: Req>(
24        &self,
25        params: R::Params,
26        handler: impl FnOnce(&mut dyn Any, lsp::Response) + Send + Sync + 'static,
27    ) {
28        let mut req_queue = self.req_queue.lock();
29        let request = req_queue.outgoing.register(
30            R::METHOD.to_owned(),
31            params,
32            Box::new(|s, resp| handler(s, resp.try_into().unwrap())),
33        );
34
35        self.sender.send_message(request.into());
36    }
37
38    /// Completes an client2server request in the request queue.
39    pub fn respond_lsp(&self, response: lsp::Response) {
40        self.respond(response.id.clone(), response.into())
41    }
42
43    /// Sends a typed notification to the client.
44    pub fn send_notification<N: Notif>(&self, params: &N::Params) {
45        self.send_notification_(lsp::Notification::new(N::METHOD.to_owned(), params));
46    }
47
48    /// Sends an untyped notification to the client.
49    pub fn send_notification_(&self, notif: lsp::Notification) {
50        self.sender.send_message(notif.into());
51    }
52}
53
54impl<Args: Initializer> LsBuilder<LspMessage, Args>
55where
56    Args::S: 'static,
57{
58    /// Registers an raw event handler.
59    pub fn with_command_(
60        mut self,
61        cmd: &'static str,
62        handler: RawHandler<Args::S, Vec<JsonValue>>,
63    ) -> Self {
64        self.command_handlers.insert(cmd, Box::new(handler));
65        self
66    }
67
68    /// Registers an async command handler.
69    pub fn with_command<R: Serialize + 'static>(
70        mut self,
71        cmd: &'static str,
72        handler: AsyncHandler<Args::S, Vec<JsonValue>, R>,
73    ) -> Self {
74        self.command_handlers.insert(
75            cmd,
76            Box::new(move |s, req| erased_response(handler(s, req))),
77        );
78        self
79    }
80
81    /// Registers an untyped notification handler.
82    pub fn with_notification_<R: Notif>(
83        mut self,
84        handler: PureHandler<Args::S, JsonValue>,
85    ) -> Self {
86        self.notif_handlers.insert(R::METHOD, Box::new(handler));
87        self
88    }
89
90    /// Registers a typed notification handler.
91    pub fn with_notification<R: Notif>(mut self, handler: PureHandler<Args::S, R::Params>) -> Self {
92        self.notif_handlers.insert(
93            R::METHOD,
94            Box::new(move |s, req| handler(s, from_json(req)?)),
95        );
96        self
97    }
98
99    /// Registers a raw request handler that handlers a kind of untyped lsp
100    /// request.
101    pub fn with_raw_request<R: Req>(mut self, handler: RawHandler<Args::S, JsonValue>) -> Self {
102        self.req_handlers.insert(R::METHOD, Box::new(handler));
103        self
104    }
105
106    // todo: unsafe typed
107    /// Registers an raw request handler that handlers a kind of typed lsp
108    /// request.
109    pub fn with_request_<R: Req>(
110        mut self,
111        handler: fn(&mut Args::S, R::Params) -> ScheduleResult,
112    ) -> Self {
113        self.req_handlers.insert(
114            R::METHOD,
115            Box::new(move |s, req| handler(s, from_json(req)?)),
116        );
117        self
118    }
119
120    /// Registers a typed request handler.
121    pub fn with_request<R: Req>(
122        mut self,
123        handler: AsyncHandler<Args::S, R::Params, R::Result>,
124    ) -> Self {
125        self.req_handlers.insert(
126            R::METHOD,
127            Box::new(move |s, req| erased_response(handler(s, from_json(req)?))),
128        );
129        self
130    }
131}
132
133impl<Args: Initializer> LsDriver<LspMessage, Args>
134where
135    Args::S: 'static,
136{
137    /// Starts the language server on the given connection.
138    ///
139    /// If `is_replay` is true, the server will wait for all pending requests to
140    /// finish before exiting. This is useful for testing the language server.
141    ///
142    /// See [`transport::MirrorArgs`] for information about the record-replay
143    /// feature.
144    #[cfg(feature = "system")]
145    pub fn start(
146        &mut self,
147        inbox: TConnectionRx<LspMessage>,
148        is_replay: bool,
149    ) -> anyhow::Result<()> {
150        let res = self.start_(inbox);
151
152        if is_replay {
153            let client = self.client.clone();
154            let _ = std::thread::spawn(move || {
155                let since = tinymist_std::time::Instant::now();
156                let timeout = std::env::var("REPLAY_TIMEOUT")
157                    .ok()
158                    .and_then(|s| s.parse().ok())
159                    .unwrap_or(60);
160                client.handle.block_on(async {
161                    while client.has_pending_requests() {
162                        if since.elapsed().as_secs() > timeout {
163                            log::error!("replay timeout reached, {timeout}s");
164                            client.begin_panic();
165                        }
166
167                        tokio::time::sleep(std::time::Duration::from_millis(10)).await;
168                    }
169                })
170            })
171            .join();
172        }
173
174        res
175    }
176
177    /// Starts the language server on the given connection.
178    #[cfg(feature = "system")]
179    pub fn start_(&mut self, inbox: TConnectionRx<LspMessage>) -> anyhow::Result<()> {
180        use EventOrMessage::*;
181        // todo: follow what rust analyzer does
182        // Windows scheduler implements priority boosts: if thread waits for an
183        // event (like a condvar), and event fires, priority of the thread is
184        // temporary bumped. This optimization backfires in our case: each time
185        // the `main_loop` schedules a task to run on a threadpool, the
186        // worker threads gets a higher priority, and (on a machine with
187        // fewer cores) displaces the main loop! We work around this by
188        // marking the main loop as a higher-priority thread.
189        //
190        // https://docs.microsoft.com/en-us/windows/win32/procthread/scheduling-priorities
191        // https://docs.microsoft.com/en-us/windows/win32/procthread/priority-boosts
192        // https://github.com/rust-lang/rust-analyzer/issues/2835
193        // #[cfg(windows)]
194        // unsafe {
195        //     use winapi::um::processthreadsapi::*;
196        //     let thread = GetCurrentThread();
197        //     let thread_priority_above_normal = 1;
198        //     SetThreadPriority(thread, thread_priority_above_normal);
199        // }
200
201        while let Ok(msg) = inbox.recv() {
202            const EXIT_METHOD: &str = notification::Exit::METHOD;
203            let loop_start = tinymist_std::time::now();
204            match msg {
205                Evt(event) => {
206                    let Some(event_handler) = self.events.get(&event.as_ref().type_id()) else {
207                        log::warn!("unhandled event: {:?}", event.as_ref().type_id());
208                        continue;
209                    };
210
211                    let s = match &mut self.state {
212                        State::Uninitialized(u) => ServiceState::Uninitialized(u.as_deref_mut()),
213                        State::Initializing(s) | State::Ready(s) => ServiceState::Ready(s),
214                        State::ShuttingDown => {
215                            log::warn!("server is shutting down");
216                            continue;
217                        }
218                    };
219
220                    event_handler(s, &self.client, event)?;
221                }
222                Msg(LspMessage::Request(req)) => {
223                    let client = self.client.clone();
224                    let req_id = req.id.clone();
225                    client.register_request(&req.method, &req_id, loop_start);
226                    let fut =
227                        client.schedule_tail(req_id, self.on_lsp_request(&req.method, req.params));
228                    self.client.handle.spawn(fut);
229                }
230                Msg(LspMessage::Notification(not)) => {
231                    let is_exit = not.method == EXIT_METHOD;
232                    let track_id = self
233                        .next_not_id
234                        .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
235                    self.client.hook.start_notification(track_id, &not.method);
236                    let result = self.on_notification(&not.method, not.params);
237                    self.client
238                        .hook
239                        .stop_notification(track_id, &not.method, loop_start, result);
240                    if is_exit {
241                        return Ok(());
242                    }
243                }
244                Msg(LspMessage::Response(resp)) => {
245                    let s = match &mut self.state {
246                        State::Ready(s) => s,
247                        _ => {
248                            log::warn!("server is not ready yet");
249                            continue;
250                        }
251                    };
252
253                    self.client.clone().complete_lsp_request(s, resp)
254                }
255            }
256        }
257
258        log::warn!("client exited without proper shutdown sequence");
259        Ok(())
260    }
261
262    /// Handles an incoming server event.
263    #[cfg(feature = "web")]
264    pub fn on_server_event(&mut self, event_id: u32) {
265        let evt = match &self.client.sender {
266            TransportHost::Js { events, .. } => events.lock().remove(&event_id),
267            TransportHost::System(_) => {
268                panic!("cannot send server event in system transport");
269            }
270        };
271
272        if let Some(event) = evt {
273            let Some(event_handler) = self.events.get(&event.as_ref().type_id()) else {
274                log::warn!("unhandled event: {:?}", event.as_ref().type_id());
275                return;
276            };
277
278            let s = match &mut self.state {
279                State::Uninitialized(u) => ServiceState::Uninitialized(u.as_deref_mut()),
280                State::Initializing(s) | State::Ready(s) => ServiceState::Ready(s),
281                State::ShuttingDown => {
282                    log::warn!("server is shutting down");
283                    return;
284                }
285            };
286
287            let res = event_handler(s, &self.client, event);
288            if let Err(err) = res {
289                log::error!("failed to handle server event {event_id}: {err}");
290            }
291        }
292    }
293
294    /// Registers and handles a request. This should only be called once per
295    /// incoming request.
296    pub fn on_lsp_request(&mut self, method: &str, params: JsonValue) -> ScheduleResult {
297        match (&mut self.state, method) {
298            (State::Uninitialized(args), request::Initialize::METHOD) => {
299                // todo: what will happen if the request cannot be deserialized?
300                let params = serde_json::from_value::<Args::I>(params);
301                match params {
302                    Ok(params) => {
303                        let args = args.take().expect("already initialized");
304                        let (s, res) = args.initialize(params);
305                        self.state = State::Initializing(s);
306                        res
307                    }
308                    Err(e) => just_result(Err(invalid_request(e))),
309                }
310            }
311            (State::Uninitialized(..) | State::Initializing(..), _) => {
312                just_result(Err(not_initialized()))
313            }
314            (_, request::Initialize::METHOD) => {
315                just_result(Err(invalid_request("server is already initialized")))
316            }
317            // todo: generalize this
318            (State::Ready(..), request::ExecuteCommand::METHOD) => self.on_execute_command(params),
319            (State::Ready(s), method) => 'serve_req: {
320                let is_shutdown = method == request::Shutdown::METHOD;
321
322                let Some(handler) = self.requests.get(method) else {
323                    log::warn!("unhandled lsp request: {method}");
324                    break 'serve_req just_result(Err(method_not_found()));
325                };
326
327                let resp = handler(s, params);
328
329                if is_shutdown {
330                    self.state = State::ShuttingDown;
331                }
332
333                resp
334            }
335            (State::ShuttingDown, _) => {
336                just_result(Err(invalid_request("server is shutting down")))
337            }
338        }
339    }
340
341    /// The entry point for the `workspace/executeCommand` request.
342    fn on_execute_command(&mut self, params: JsonValue) -> ScheduleResult {
343        let s = self.state.opt_mut().ok_or_else(not_initialized)?;
344
345        let params = from_value::<ExecuteCommandParams>(params)
346            .map_err(|e| invalid_params(e.to_string()))?;
347
348        let ExecuteCommandParams {
349            command, arguments, ..
350        } = params;
351
352        // todo: generalize this
353        if command == "tinymist.getResources" {
354            self.get_resources(arguments)
355        } else {
356            let Some(handler) = self.commands.get(command.as_str()) else {
357                log::error!("asked to execute unknown command: {command}");
358                return Err(method_not_found());
359            };
360            handler(s, arguments)
361        }
362    }
363
364    /// Handles an incoming notification.
365    pub fn on_notification(&mut self, method: &str, params: JsonValue) -> LspResult<()> {
366        let handle = |s, method: &str, params: JsonValue| {
367            let Some(handler) = self.notifications.get(method) else {
368                log::warn!("unhandled notification: {method}");
369                return Ok(());
370            };
371
372            handler(s, params)
373        };
374
375        match (&mut self.state, method) {
376            (state, notification::Initialized::METHOD) => {
377                let mut s = State::ShuttingDown;
378                std::mem::swap(state, &mut s);
379                match s {
380                    State::Initializing(s) => {
381                        *state = State::Ready(s);
382                    }
383                    _ => {
384                        std::mem::swap(state, &mut s);
385                    }
386                }
387
388                let s = match state {
389                    State::Ready(s) => s,
390                    _ => {
391                        log::warn!("server is not ready yet");
392                        return Ok(());
393                    }
394                };
395                handle(s, method, params)
396            }
397            (State::Ready(state), method) => handle(state, method, params),
398            // todo: whether it is safe to ignore notifications
399            (State::Uninitialized(..) | State::Initializing(..), method) => {
400                log::warn!("server is not ready yet, while received notification {method}");
401                Ok(())
402            }
403            (State::ShuttingDown, method) => {
404                log::warn!("server is shutting down, while received notification {method}");
405                Ok(())
406            }
407        }
408    }
409
410    /// Handles an incoming response.
411    pub fn on_lsp_response(&mut self, resp: lsp::Response) {
412        let client = self.client.clone();
413        let Some(s) = self.state_mut() else {
414            log::warn!("server is not ready yet, while received response");
415            return;
416        };
417
418        client.complete_lsp_request(s, resp)
419    }
420}