1use dapts::IRequest;
2
3use super::*;
4
5impl<S: 'static> TypedLspClient<S> {
6 pub fn send_dap_event<E: dapts::IEvent>(&self, body: E::Body) {
8 let req_id = self.req_queue.lock().outgoing.alloc_request_id();
9
10 self.send_dap_event_(dap::Event::new(req_id as i64, E::EVENT.to_owned(), body));
11 }
12
13 pub fn send_dap_event_(&self, evt: dap::Event) {
15 let method = &evt.event;
16 let Some(sender) = self.sender.upgrade() else {
17 log::warn!("failed to send dap event ({method}): connection closed");
18 return;
19 };
20 if let Err(res) = sender.lsp.send(evt.into()) {
21 log::warn!("failed to send dap event: {res:?}");
22 }
23 }
24}
25
26impl<Args: Initializer> LsBuilder<DapMessage, Args>
27where
28 Args::S: 'static,
29{
30 pub fn with_command_(
32 mut self,
33 cmd: &'static str,
34 handler: RawHandler<Args::S, Vec<JsonValue>>,
35 ) -> Self {
36 self.command_handlers.insert(cmd, raw_to_boxed(handler));
37 self
38 }
39
40 pub fn with_command<R: Serialize + 'static>(
42 mut self,
43 cmd: &'static str,
44 handler: AsyncHandler<Args::S, Vec<JsonValue>, R>,
45 ) -> Self {
46 self.command_handlers.insert(
47 cmd,
48 Box::new(move |s, client, req_id, req| client.schedule(req_id, handler(s, req))),
49 );
50 self
51 }
52
53 pub fn with_raw_request<R: dapts::IRequest>(
56 mut self,
57 handler: RawHandler<Args::S, JsonValue>,
58 ) -> Self {
59 self.req_handlers.insert(R::COMMAND, raw_to_boxed(handler));
60 self
61 }
62
63 pub fn with_request_<R: dapts::IRequest>(
67 mut self,
68 handler: fn(&mut Args::S, RequestId, R::Arguments) -> ScheduledResult,
69 ) -> Self {
70 self.req_handlers.insert(
71 R::COMMAND,
72 Box::new(move |s, _client, req_id, req| handler(s, req_id, from_json(req)?)),
73 );
74 self
75 }
76
77 pub fn with_request<R: dapts::IRequest>(
79 mut self,
80 handler: AsyncHandler<Args::S, R::Arguments, R::Response>,
81 ) -> Self {
82 self.req_handlers.insert(
83 R::COMMAND,
84 Box::new(move |s, client, req_id, req| {
85 client.schedule(req_id, handler(s, from_json(req)?))
86 }),
87 );
88 self
89 }
90}
91
92impl<Args: Initializer> LsDriver<DapMessage, Args>
93where
94 Args::S: 'static,
95{
96 pub fn start(
104 &mut self,
105 inbox: TConnectionRx<DapMessage>,
106 is_replay: bool,
107 ) -> anyhow::Result<()> {
108 let res = self.start_(inbox);
109
110 if is_replay {
111 let client = self.client.clone();
112 let _ = std::thread::spawn(move || {
113 let since = std::time::Instant::now();
114 let timeout = std::env::var("REPLAY_TIMEOUT")
115 .ok()
116 .and_then(|s| s.parse().ok())
117 .unwrap_or(60);
118 client.handle.block_on(async {
119 while client.has_pending_requests() {
120 if since.elapsed().as_secs() > timeout {
121 log::error!("replay timeout reached, {timeout}s");
122 client.begin_panic();
123 }
124
125 tokio::time::sleep(std::time::Duration::from_millis(10)).await;
126 }
127 })
128 })
129 .join();
130 }
131
132 res
133 }
134
135 pub fn start_(&mut self, inbox: TConnectionRx<DapMessage>) -> anyhow::Result<()> {
137 use EventOrMessage::*;
138
139 while let Ok(msg) = inbox.recv() {
140 let loop_start = Instant::now();
141 match msg {
142 Evt(event) => {
143 let Some(event_handler) = self.events.get(&event.as_ref().type_id()) else {
144 log::warn!("unhandled event: {:?}", event.as_ref().type_id());
145 continue;
146 };
147
148 let s = match &mut self.state {
149 State::Uninitialized(u) => ServiceState::Uninitialized(u.as_deref_mut()),
150 State::Initializing(s) | State::Ready(s) => ServiceState::Ready(s),
151 State::ShuttingDown => {
152 log::warn!("server is shutting down");
153 continue;
154 }
155 };
156
157 event_handler(s, &self.client, event)?;
158 }
159 Msg(DapMessage::Request(req)) => self.on_request(loop_start, req),
160 Msg(DapMessage::Event(not)) => {
161 self.on_event(loop_start, not)?;
162 }
163 Msg(DapMessage::Response(resp)) => {
164 let s = match &mut self.state {
165 State::Ready(s) => s,
166 _ => {
167 log::warn!("server is not ready yet");
168 continue;
169 }
170 };
171
172 self.client.clone().complete_dap_request(s, resp)
173 }
174 }
175 }
176
177 log::warn!("client exited without proper shutdown sequence");
178 Ok(())
179 }
180
181 fn on_request(&mut self, request_received: Instant, req: dap::Request) {
184 let req_id = (req.seq as i32).into();
185 self.client
186 .register_request(&req.command, &req_id, request_received);
187
188 let resp = match (&mut self.state, &*req.command) {
189 (State::Uninitialized(args), dapts::request::Initialize::COMMAND) => {
190 let params = serde_json::from_value::<Args::I>(req.arguments);
192 match params {
193 Ok(params) => {
194 let args = args.take().expect("already initialized");
195 let (s, res) = args.initialize(params);
196 self.state = State::Ready(s);
197 res
198 }
199 Err(e) => just_result(Err(invalid_request(e))),
200 }
201 }
202 (State::Uninitialized(..) | State::Initializing(..), _) => {
224 just_result(Err(not_initialized()))
225 }
226 (_, dapts::request::Initialize::COMMAND) => {
227 just_result(Err(invalid_request("server is already initialized")))
228 }
229 (State::Ready(s), _) => 'serve_req: {
234 let method = req.command.as_str();
235
236 let is_disconnect = method == dapts::request::Disconnect::COMMAND;
237
238 let Some(handler) = self.requests.get(method) else {
239 log::warn!("unhandled dap request: {method}");
240 break 'serve_req just_result(Err(method_not_found()));
241 };
242
243 let result = handler(s, &self.client, req_id.clone(), req.arguments);
244 self.client.schedule_tail(req_id, result);
245
246 if is_disconnect {
247 self.state = State::ShuttingDown;
248 }
249
250 return;
251 }
252 (State::ShuttingDown, _) => {
253 just_result(Err(invalid_request("server is shutting down")))
254 }
255 };
256
257 let result = self.client.schedule(req_id.clone(), resp);
258 self.client.schedule_tail(req_id, result);
259 }
260
261 fn on_event(&mut self, received_at: Instant, not: dap::Event) -> anyhow::Result<()> {
263 self.client.hook.start_notification(¬.event);
264 let handle = |s,
265 dap::Event {
266 seq: _,
267 event,
268 body,
269 }: dap::Event| {
270 let Some(handler) = self.notifications.get(event.as_str()) else {
271 log::warn!("unhandled event: {event}");
272 return Ok(());
273 };
274
275 let result = handler(s, body);
276 self.client
277 .hook
278 .stop_notification(&event, received_at, result);
279
280 Ok(())
281 };
282
283 match (&mut self.state, &*not.event) {
284 (State::Ready(state), _) => handle(state, not),
285 (State::Uninitialized(..) | State::Initializing(..), method) => {
287 log::warn!("server is not ready yet, while received event {method}");
288 Ok(())
289 }
290 (State::ShuttingDown, method) => {
291 log::warn!("server is shutting down, while received event {method}");
292 Ok(())
293 }
294 }
295 }
296}