sync_ls/server/
dap_srv.rs

1use dapts::IRequest;
2
3use super::*;
4
5impl<S: 'static> TypedLspClient<S> {
6    /// Sends a dap event to the client.
7    pub fn send_dap_event<E: dapts::IEvent>(&self, body: E::Body) {
8        let req_id = self.req_queue.lock().outgoing.alloc_request_id();
9
10        self.send_dap_event_(dap::Event::new(req_id as i64, E::EVENT.to_owned(), body));
11    }
12
13    /// Sends an untyped dap_event to the client.
14    pub fn send_dap_event_(&self, evt: dap::Event) {
15        let method = &evt.event;
16        let Some(sender) = self.sender.upgrade() else {
17            log::warn!("failed to send dap event ({method}): connection closed");
18            return;
19        };
20        if let Err(res) = sender.lsp.send(evt.into()) {
21            log::warn!("failed to send dap event: {res:?}");
22        }
23    }
24}
25
26impl<Args: Initializer> LsBuilder<DapMessage, Args>
27where
28    Args::S: 'static,
29{
30    /// Registers an raw event handler.
31    pub fn with_command_(
32        mut self,
33        cmd: &'static str,
34        handler: RawHandler<Args::S, Vec<JsonValue>>,
35    ) -> Self {
36        self.command_handlers.insert(cmd, raw_to_boxed(handler));
37        self
38    }
39
40    /// Registers an async command handler.
41    pub fn with_command<R: Serialize + 'static>(
42        mut self,
43        cmd: &'static str,
44        handler: AsyncHandler<Args::S, Vec<JsonValue>, R>,
45    ) -> Self {
46        self.command_handlers.insert(
47            cmd,
48            Box::new(move |s, client, req_id, req| client.schedule(req_id, handler(s, req))),
49        );
50        self
51    }
52
53    /// Registers a raw request handler that handlers a kind of untyped lsp
54    /// request.
55    pub fn with_raw_request<R: dapts::IRequest>(
56        mut self,
57        handler: RawHandler<Args::S, JsonValue>,
58    ) -> Self {
59        self.req_handlers.insert(R::COMMAND, raw_to_boxed(handler));
60        self
61    }
62
63    // todo: unsafe typed
64    /// Registers an raw request handler that handlers a kind of typed lsp
65    /// request.
66    pub fn with_request_<R: dapts::IRequest>(
67        mut self,
68        handler: fn(&mut Args::S, RequestId, R::Arguments) -> ScheduledResult,
69    ) -> Self {
70        self.req_handlers.insert(
71            R::COMMAND,
72            Box::new(move |s, _client, req_id, req| handler(s, req_id, from_json(req)?)),
73        );
74        self
75    }
76
77    /// Registers a typed request handler.
78    pub fn with_request<R: dapts::IRequest>(
79        mut self,
80        handler: AsyncHandler<Args::S, R::Arguments, R::Response>,
81    ) -> Self {
82        self.req_handlers.insert(
83            R::COMMAND,
84            Box::new(move |s, client, req_id, req| {
85                client.schedule(req_id, handler(s, from_json(req)?))
86            }),
87        );
88        self
89    }
90}
91
92impl<Args: Initializer> LsDriver<DapMessage, Args>
93where
94    Args::S: 'static,
95{
96    /// Starts the debug adaptor on the given connection.
97    ///
98    /// If `is_replay` is true, the server will wait for all pending requests to
99    /// finish before exiting. This is useful for testing the language server.
100    ///
101    /// See [`transport::MirrorArgs`] for information about the record-replay
102    /// feature.
103    pub fn start(
104        &mut self,
105        inbox: TConnectionRx<DapMessage>,
106        is_replay: bool,
107    ) -> anyhow::Result<()> {
108        let res = self.start_(inbox);
109
110        if is_replay {
111            let client = self.client.clone();
112            let _ = std::thread::spawn(move || {
113                let since = std::time::Instant::now();
114                let timeout = std::env::var("REPLAY_TIMEOUT")
115                    .ok()
116                    .and_then(|s| s.parse().ok())
117                    .unwrap_or(60);
118                client.handle.block_on(async {
119                    while client.has_pending_requests() {
120                        if since.elapsed().as_secs() > timeout {
121                            log::error!("replay timeout reached, {timeout}s");
122                            client.begin_panic();
123                        }
124
125                        tokio::time::sleep(std::time::Duration::from_millis(10)).await;
126                    }
127                })
128            })
129            .join();
130        }
131
132        res
133    }
134
135    /// Starts the debug adaptor on the given connection.
136    pub fn start_(&mut self, inbox: TConnectionRx<DapMessage>) -> anyhow::Result<()> {
137        use EventOrMessage::*;
138
139        while let Ok(msg) = inbox.recv() {
140            let loop_start = Instant::now();
141            match msg {
142                Evt(event) => {
143                    let Some(event_handler) = self.events.get(&event.as_ref().type_id()) else {
144                        log::warn!("unhandled event: {:?}", event.as_ref().type_id());
145                        continue;
146                    };
147
148                    let s = match &mut self.state {
149                        State::Uninitialized(u) => ServiceState::Uninitialized(u.as_deref_mut()),
150                        State::Initializing(s) | State::Ready(s) => ServiceState::Ready(s),
151                        State::ShuttingDown => {
152                            log::warn!("server is shutting down");
153                            continue;
154                        }
155                    };
156
157                    event_handler(s, &self.client, event)?;
158                }
159                Msg(DapMessage::Request(req)) => self.on_request(loop_start, req),
160                Msg(DapMessage::Event(not)) => {
161                    self.on_event(loop_start, not)?;
162                }
163                Msg(DapMessage::Response(resp)) => {
164                    let s = match &mut self.state {
165                        State::Ready(s) => s,
166                        _ => {
167                            log::warn!("server is not ready yet");
168                            continue;
169                        }
170                    };
171
172                    self.client.clone().complete_dap_request(s, resp)
173                }
174            }
175        }
176
177        log::warn!("client exited without proper shutdown sequence");
178        Ok(())
179    }
180
181    /// Registers and handles a request. This should only be called once per
182    /// incoming request.
183    fn on_request(&mut self, request_received: Instant, req: dap::Request) {
184        let req_id = (req.seq as i32).into();
185        self.client
186            .register_request(&req.command, &req_id, request_received);
187
188        let resp = match (&mut self.state, &*req.command) {
189            (State::Uninitialized(args), dapts::request::Initialize::COMMAND) => {
190                // todo: what will happen if the request cannot be deserialized?
191                let params = serde_json::from_value::<Args::I>(req.arguments);
192                match params {
193                    Ok(params) => {
194                        let args = args.take().expect("already initialized");
195                        let (s, res) = args.initialize(params);
196                        self.state = State::Ready(s);
197                        res
198                    }
199                    Err(e) => just_result(Err(invalid_request(e))),
200                }
201            }
202            // (state, dap::events::Initialized::METHOD) => {
203            //     let mut s = State::ShuttingDown;
204            //     std::mem::swap(state, &mut s);
205            //     match s {
206            //         State::Initializing(s) => {
207            //             *state = State::Ready(s);
208            //         }
209            //         _ => {
210            //             std::mem::swap(state, &mut s);
211            //         }
212            //     }
213
214            //     let s = match state {
215            //         State::Ready(s) => s,
216            //         _ => {
217            //             log::warn!("server is not ready yet");
218            //             return Ok(());
219            //         }
220            //     };
221            //     handle(s, not)
222            // }
223            (State::Uninitialized(..) | State::Initializing(..), _) => {
224                just_result(Err(not_initialized()))
225            }
226            (_, dapts::request::Initialize::COMMAND) => {
227                just_result(Err(invalid_request("server is already initialized")))
228            }
229            // todo: generalize this
230            // (State::Ready(..), request::ExecuteCommand::METHOD) => {
231            // reschedule!(self.on_execute_command(req))
232            // }
233            (State::Ready(s), _) => 'serve_req: {
234                let method = req.command.as_str();
235
236                let is_disconnect = method == dapts::request::Disconnect::COMMAND;
237
238                let Some(handler) = self.requests.get(method) else {
239                    log::warn!("unhandled dap request: {method}");
240                    break 'serve_req just_result(Err(method_not_found()));
241                };
242
243                let result = handler(s, &self.client, req_id.clone(), req.arguments);
244                self.client.schedule_tail(req_id, result);
245
246                if is_disconnect {
247                    self.state = State::ShuttingDown;
248                }
249
250                return;
251            }
252            (State::ShuttingDown, _) => {
253                just_result(Err(invalid_request("server is shutting down")))
254            }
255        };
256
257        let result = self.client.schedule(req_id.clone(), resp);
258        self.client.schedule_tail(req_id, result);
259    }
260
261    /// Handles an incoming event.
262    fn on_event(&mut self, received_at: Instant, not: dap::Event) -> anyhow::Result<()> {
263        self.client.start_notification(&not.event);
264        let handle = |s,
265                      dap::Event {
266                          seq: _,
267                          event,
268                          body,
269                      }: dap::Event| {
270            let Some(handler) = self.notifications.get(event.as_str()) else {
271                log::warn!("unhandled event: {event}");
272                return Ok(());
273            };
274
275            let result = handler(s, body);
276            self.client.stop_notification(&event, received_at, result);
277
278            Ok(())
279        };
280
281        match (&mut self.state, &*not.event) {
282            (State::Ready(state), _) => handle(state, not),
283            // todo: whether it is safe to ignore events
284            (State::Uninitialized(..) | State::Initializing(..), method) => {
285                log::warn!("server is not ready yet, while received event {method}");
286                Ok(())
287            }
288            (State::ShuttingDown, method) => {
289                log::warn!("server is shutting down, while received event {method}");
290                Ok(())
291            }
292        }
293    }
294}