sync_ls/server/
dap_srv.rs

1#[cfg(feature = "system")]
2use std::sync::atomic::Ordering;
3
4use dapts::IRequest;
5
6use super::*;
7
8impl<S: 'static> TypedLspClient<S> {
9    /// Sends a dap event to the client.
10    pub fn send_dap_event<E: dapts::IEvent>(&self, body: E::Body) {
11        let req_id = self.req_queue.lock().outgoing.alloc_request_id();
12
13        self.send_dap_event_(dap::Event::new(req_id as i64, E::EVENT.to_owned(), body));
14    }
15
16    /// Sends an untyped dap_event to the client.
17    pub fn send_dap_event_(&self, evt: dap::Event) {
18        self.sender.send_message(evt.into());
19    }
20}
21
22impl<Args: Initializer> LsBuilder<DapMessage, Args>
23where
24    Args::S: 'static,
25{
26    /// Registers an async command handler.
27    pub fn with_command<R: Serialize + 'static>(
28        mut self,
29        cmd: &'static str,
30        handler: AsyncHandler<Args::S, Vec<JsonValue>, R>,
31    ) -> Self {
32        self.command_handlers.insert(
33            cmd,
34            Box::new(move |s, req| erased_response(handler(s, req))),
35        );
36        self
37    }
38
39    /// Registers a raw request handler that handlers a kind of untyped lsp
40    /// request.
41    pub fn with_raw_request<R: IRequest>(
42        mut self,
43        handler: RawHandler<Args::S, JsonValue>,
44    ) -> Self {
45        self.req_handlers.insert(R::COMMAND, Box::new(handler));
46        self
47    }
48
49    // todo: unsafe typed
50    /// Registers an raw request handler that handlers a kind of typed lsp
51    /// request.
52    pub fn with_request_<R: IRequest>(
53        mut self,
54        handler: fn(&mut Args::S, R::Arguments) -> ScheduleResult,
55    ) -> Self {
56        self.req_handlers.insert(
57            R::COMMAND,
58            Box::new(move |s, req| handler(s, from_json(req)?)),
59        );
60        self
61    }
62
63    /// Registers a typed request handler.
64    pub fn with_request<R: IRequest>(
65        mut self,
66        handler: AsyncHandler<Args::S, R::Arguments, R::Response>,
67    ) -> Self {
68        self.req_handlers.insert(
69            R::COMMAND,
70            Box::new(move |s, req| erased_response(handler(s, from_json(req)?))),
71        );
72        self
73    }
74}
75
76#[cfg(feature = "system")]
77impl<Args: Initializer> LsDriver<DapMessage, Args>
78where
79    Args::S: 'static,
80{
81    /// Starts the debug adaptor on the given connection.
82    ///
83    /// If `is_replay` is true, the server will wait for all pending requests to
84    /// finish before exiting. This is useful for testing the language server.
85    ///
86    /// See [`transport::MirrorArgs`] for information about the record-replay
87    /// feature.
88    pub fn start(
89        &mut self,
90        inbox: TConnectionRx<DapMessage>,
91        is_replay: bool,
92    ) -> anyhow::Result<()> {
93        let res = self.start_(inbox);
94
95        if is_replay {
96            let client = self.client.clone();
97            let _ = std::thread::spawn(move || {
98                let since = tinymist_std::time::Instant::now();
99                let timeout = std::env::var("REPLAY_TIMEOUT")
100                    .ok()
101                    .and_then(|s| s.parse().ok())
102                    .unwrap_or(60);
103                client.handle.block_on(async {
104                    while client.has_pending_requests() {
105                        if since.elapsed().as_secs() > timeout {
106                            log::error!("replay timeout reached, {timeout}s");
107                            client.begin_panic();
108                        }
109
110                        tokio::time::sleep(tinymist_std::time::Duration::from_millis(10)).await;
111                    }
112                })
113            })
114            .join();
115        }
116
117        res
118    }
119
120    /// Starts the debug adaptor on the given connection.
121    pub fn start_(&mut self, inbox: TConnectionRx<DapMessage>) -> anyhow::Result<()> {
122        use EventOrMessage::*;
123
124        while let Ok(msg) = inbox.recv() {
125            let loop_start = tinymist_std::time::now();
126            match msg {
127                Evt(event) => {
128                    let Some(event_handler) = self.events.get(&event.as_ref().type_id()) else {
129                        log::warn!("unhandled event: {:?}", event.as_ref().type_id());
130                        continue;
131                    };
132
133                    let s = match &mut self.state {
134                        State::Uninitialized(u) => ServiceState::Uninitialized(u.as_deref_mut()),
135                        State::Initializing(s) | State::Ready(s) => ServiceState::Ready(s),
136                        State::ShuttingDown => {
137                            log::warn!("server is shutting down");
138                            continue;
139                        }
140                    };
141
142                    event_handler(s, &self.client, event)?;
143                }
144                Msg(DapMessage::Request(req)) => {
145                    let client = self.client.clone();
146                    let req_id = (req.seq as i32).into();
147                    client.register_request(&req.command, &req_id, loop_start);
148                    let fut = client.schedule_tail(req_id, self.on_request(req));
149                    self.client.handle.spawn(fut);
150                }
151                Msg(DapMessage::Event(not)) => {
152                    self.on_event(loop_start, not)?;
153                }
154                Msg(DapMessage::Response(resp)) => {
155                    let s = match &mut self.state {
156                        State::Ready(s) => s,
157                        _ => {
158                            log::warn!("server is not ready yet");
159                            continue;
160                        }
161                    };
162
163                    self.client.clone().complete_dap_request(s, resp)
164                }
165            }
166        }
167
168        log::warn!("client exited without proper shutdown sequence");
169        Ok(())
170    }
171
172    /// Registers and handles a request. This should only be called once per
173    /// incoming request.
174    fn on_request(&mut self, req: dap::Request) -> ScheduleResult {
175        match (&mut self.state, &*req.command) {
176            (State::Uninitialized(args), dapts::request::Initialize::COMMAND) => {
177                // todo: what will happen if the request cannot be deserialized?
178                let params = serde_json::from_value::<Args::I>(req.arguments);
179                match params {
180                    Ok(params) => {
181                        let args = args.take().expect("already initialized");
182                        let (s, res) = args.initialize(params);
183                        self.state = State::Ready(s);
184                        res
185                    }
186                    Err(e) => just_result(Err(invalid_request(e))),
187                }
188            }
189            // (state, dap::events::Initialized::METHOD) => {
190            //     let mut s = State::ShuttingDown;
191            //     std::mem::swap(state, &mut s);
192            //     match s {
193            //         State::Initializing(s) => {
194            //             *state = State::Ready(s);
195            //         }
196            //         _ => {
197            //             std::mem::swap(state, &mut s);
198            //         }
199            //     }
200
201            //     let s = match state {
202            //         State::Ready(s) => s,
203            //         _ => {
204            //             log::warn!("server is not ready yet");
205            //             return Ok(());
206            //         }
207            //     };
208            //     handle(s, not)
209            // }
210            (State::Uninitialized(..) | State::Initializing(..), _) => {
211                just_result(Err(not_initialized()))
212            }
213            (_, dapts::request::Initialize::COMMAND) => {
214                just_result(Err(invalid_request("server is already initialized")))
215            }
216            // todo: generalize this
217            // (State::Ready(..), request::ExecuteCommand::METHOD) => {
218            // reschedule!(self.on_execute_command(req))
219            // }
220            (State::Ready(s), _) => 'serve_req: {
221                let method = req.command.as_str();
222
223                let is_disconnect = method == dapts::request::Disconnect::COMMAND;
224
225                let Some(handler) = self.requests.get(method) else {
226                    log::warn!("unhandled dap request: {method}");
227                    break 'serve_req just_result(Err(method_not_found()));
228                };
229
230                let resp = handler(s, req.arguments);
231
232                if is_disconnect {
233                    self.state = State::ShuttingDown;
234                }
235
236                resp
237            }
238            (State::ShuttingDown, _) => {
239                just_result(Err(invalid_request("server is shutting down")))
240            }
241        }
242    }
243
244    /// Handles an incoming event.
245    fn on_event(&mut self, received_at: Time, not: dap::Event) -> anyhow::Result<()> {
246        let track_id = self.next_not_id.fetch_add(1, Ordering::Relaxed);
247        self.client.hook.start_notification(track_id, &not.event);
248        let handle = |s,
249                      dap::Event {
250                          seq: _,
251                          event,
252                          body,
253                      }: dap::Event| {
254            let Some(handler) = self.notifications.get(event.as_str()) else {
255                log::warn!("unhandled event: {event}");
256                return Ok(());
257            };
258
259            let result = handler(s, body);
260            self.client
261                .hook
262                .stop_notification(track_id, &event, received_at, result);
263
264            Ok(())
265        };
266
267        match (&mut self.state, &*not.event) {
268            (State::Ready(state), _) => handle(state, not),
269            // todo: whether it is safe to ignore events
270            (State::Uninitialized(..) | State::Initializing(..), method) => {
271                log::warn!("server is not ready yet, while received event {method}");
272                Ok(())
273            }
274            (State::ShuttingDown, method) => {
275                log::warn!("server is shutting down, while received event {method}");
276                Ok(())
277            }
278        }
279    }
280}