sync_ls/server/
dap_srv.rs

1use std::sync::atomic::Ordering;
2
3use dapts::IRequest;
4
5use super::*;
6
7impl<S: 'static> TypedLspClient<S> {
8    /// Sends a dap event to the client.
9    pub fn send_dap_event<E: dapts::IEvent>(&self, body: E::Body) {
10        let req_id = self.req_queue.lock().outgoing.alloc_request_id();
11
12        self.send_dap_event_(dap::Event::new(req_id as i64, E::EVENT.to_owned(), body));
13    }
14
15    /// Sends an untyped dap_event to the client.
16    pub fn send_dap_event_(&self, evt: dap::Event) {
17        self.sender.send_message(evt.into());
18    }
19}
20
21impl<Args: Initializer> LsBuilder<DapMessage, Args>
22where
23    Args::S: 'static,
24{
25    /// Registers an async command handler.
26    pub fn with_command<R: Serialize + 'static>(
27        mut self,
28        cmd: &'static str,
29        handler: AsyncHandler<Args::S, Vec<JsonValue>, R>,
30    ) -> Self {
31        self.command_handlers.insert(
32            cmd,
33            Box::new(move |s, req| erased_response(handler(s, req))),
34        );
35        self
36    }
37
38    /// Registers a raw request handler that handlers a kind of untyped lsp
39    /// request.
40    pub fn with_raw_request<R: IRequest>(
41        mut self,
42        handler: RawHandler<Args::S, JsonValue>,
43    ) -> Self {
44        self.req_handlers.insert(R::COMMAND, Box::new(handler));
45        self
46    }
47
48    // todo: unsafe typed
49    /// Registers an raw request handler that handlers a kind of typed lsp
50    /// request.
51    pub fn with_request_<R: IRequest>(
52        mut self,
53        handler: fn(&mut Args::S, R::Arguments) -> ScheduleResult,
54    ) -> Self {
55        self.req_handlers.insert(
56            R::COMMAND,
57            Box::new(move |s, req| handler(s, from_json(req)?)),
58        );
59        self
60    }
61
62    /// Registers a typed request handler.
63    pub fn with_request<R: IRequest>(
64        mut self,
65        handler: AsyncHandler<Args::S, R::Arguments, R::Response>,
66    ) -> Self {
67        self.req_handlers.insert(
68            R::COMMAND,
69            Box::new(move |s, req| erased_response(handler(s, from_json(req)?))),
70        );
71        self
72    }
73}
74
75#[cfg(feature = "system")]
76impl<Args: Initializer> LsDriver<DapMessage, Args>
77where
78    Args::S: 'static,
79{
80    /// Starts the debug adaptor on the given connection.
81    ///
82    /// If `is_replay` is true, the server will wait for all pending requests to
83    /// finish before exiting. This is useful for testing the language server.
84    ///
85    /// See [`transport::MirrorArgs`] for information about the record-replay
86    /// feature.
87    pub fn start(
88        &mut self,
89        inbox: TConnectionRx<DapMessage>,
90        is_replay: bool,
91    ) -> anyhow::Result<()> {
92        let res = self.start_(inbox);
93
94        if is_replay {
95            let client = self.client.clone();
96            let _ = std::thread::spawn(move || {
97                let since = tinymist_std::time::Instant::now();
98                let timeout = std::env::var("REPLAY_TIMEOUT")
99                    .ok()
100                    .and_then(|s| s.parse().ok())
101                    .unwrap_or(60);
102                client.handle.block_on(async {
103                    while client.has_pending_requests() {
104                        if since.elapsed().as_secs() > timeout {
105                            log::error!("replay timeout reached, {timeout}s");
106                            client.begin_panic();
107                        }
108
109                        tokio::time::sleep(tinymist_std::time::Duration::from_millis(10)).await;
110                    }
111                })
112            })
113            .join();
114        }
115
116        res
117    }
118
119    /// Starts the debug adaptor on the given connection.
120    pub fn start_(&mut self, inbox: TConnectionRx<DapMessage>) -> anyhow::Result<()> {
121        use EventOrMessage::*;
122
123        while let Ok(msg) = inbox.recv() {
124            let loop_start = tinymist_std::time::now();
125            match msg {
126                Evt(event) => {
127                    let Some(event_handler) = self.events.get(&event.as_ref().type_id()) else {
128                        log::warn!("unhandled event: {:?}", event.as_ref().type_id());
129                        continue;
130                    };
131
132                    let s = match &mut self.state {
133                        State::Uninitialized(u) => ServiceState::Uninitialized(u.as_deref_mut()),
134                        State::Initializing(s) | State::Ready(s) => ServiceState::Ready(s),
135                        State::ShuttingDown => {
136                            log::warn!("server is shutting down");
137                            continue;
138                        }
139                    };
140
141                    event_handler(s, &self.client, event)?;
142                }
143                Msg(DapMessage::Request(req)) => {
144                    let client = self.client.clone();
145                    let req_id = (req.seq as i32).into();
146                    client.register_request(&req.command, &req_id, loop_start);
147                    let fut = client.schedule_tail(req_id, self.on_request(req));
148                    self.client.handle.spawn(fut);
149                }
150                Msg(DapMessage::Event(not)) => {
151                    self.on_event(loop_start, not)?;
152                }
153                Msg(DapMessage::Response(resp)) => {
154                    let s = match &mut self.state {
155                        State::Ready(s) => s,
156                        _ => {
157                            log::warn!("server is not ready yet");
158                            continue;
159                        }
160                    };
161
162                    self.client.clone().complete_dap_request(s, resp)
163                }
164            }
165        }
166
167        log::warn!("client exited without proper shutdown sequence");
168        Ok(())
169    }
170
171    /// Registers and handles a request. This should only be called once per
172    /// incoming request.
173    fn on_request(&mut self, req: dap::Request) -> ScheduleResult {
174        match (&mut self.state, &*req.command) {
175            (State::Uninitialized(args), dapts::request::Initialize::COMMAND) => {
176                // todo: what will happen if the request cannot be deserialized?
177                let params = serde_json::from_value::<Args::I>(req.arguments);
178                match params {
179                    Ok(params) => {
180                        let args = args.take().expect("already initialized");
181                        let (s, res) = args.initialize(params);
182                        self.state = State::Ready(s);
183                        res
184                    }
185                    Err(e) => just_result(Err(invalid_request(e))),
186                }
187            }
188            // (state, dap::events::Initialized::METHOD) => {
189            //     let mut s = State::ShuttingDown;
190            //     std::mem::swap(state, &mut s);
191            //     match s {
192            //         State::Initializing(s) => {
193            //             *state = State::Ready(s);
194            //         }
195            //         _ => {
196            //             std::mem::swap(state, &mut s);
197            //         }
198            //     }
199
200            //     let s = match state {
201            //         State::Ready(s) => s,
202            //         _ => {
203            //             log::warn!("server is not ready yet");
204            //             return Ok(());
205            //         }
206            //     };
207            //     handle(s, not)
208            // }
209            (State::Uninitialized(..) | State::Initializing(..), _) => {
210                just_result(Err(not_initialized()))
211            }
212            (_, dapts::request::Initialize::COMMAND) => {
213                just_result(Err(invalid_request("server is already initialized")))
214            }
215            // todo: generalize this
216            // (State::Ready(..), request::ExecuteCommand::METHOD) => {
217            // reschedule!(self.on_execute_command(req))
218            // }
219            (State::Ready(s), _) => 'serve_req: {
220                let method = req.command.as_str();
221
222                let is_disconnect = method == dapts::request::Disconnect::COMMAND;
223
224                let Some(handler) = self.requests.get(method) else {
225                    log::warn!("unhandled dap request: {method}");
226                    break 'serve_req just_result(Err(method_not_found()));
227                };
228
229                let resp = handler(s, req.arguments);
230
231                if is_disconnect {
232                    self.state = State::ShuttingDown;
233                }
234
235                resp
236            }
237            (State::ShuttingDown, _) => {
238                just_result(Err(invalid_request("server is shutting down")))
239            }
240        }
241    }
242
243    /// Handles an incoming event.
244    fn on_event(&mut self, received_at: Time, not: dap::Event) -> anyhow::Result<()> {
245        let track_id = self.next_not_id.fetch_add(1, Ordering::Relaxed);
246        self.client.hook.start_notification(track_id, &not.event);
247        let handle = |s,
248                      dap::Event {
249                          seq: _,
250                          event,
251                          body,
252                      }: dap::Event| {
253            let Some(handler) = self.notifications.get(event.as_str()) else {
254                log::warn!("unhandled event: {event}");
255                return Ok(());
256            };
257
258            let result = handler(s, body);
259            self.client
260                .hook
261                .stop_notification(track_id, &event, received_at, result);
262
263            Ok(())
264        };
265
266        match (&mut self.state, &*not.event) {
267            (State::Ready(state), _) => handle(state, not),
268            // todo: whether it is safe to ignore events
269            (State::Uninitialized(..) | State::Initializing(..), method) => {
270                log::warn!("server is not ready yet, while received event {method}");
271                Ok(())
272            }
273            (State::ShuttingDown, method) => {
274                log::warn!("server is shutting down, while received event {method}");
275                Ok(())
276            }
277        }
278    }
279}