sync_ls/server/
dap_srv.rs1use std::sync::atomic::Ordering;
2
3use dapts::IRequest;
4
5use super::*;
6
7impl<S: 'static> TypedLspClient<S> {
8 pub fn send_dap_event<E: dapts::IEvent>(&self, body: E::Body) {
10 let req_id = self.req_queue.lock().outgoing.alloc_request_id();
11
12 self.send_dap_event_(dap::Event::new(req_id as i64, E::EVENT.to_owned(), body));
13 }
14
15 pub fn send_dap_event_(&self, evt: dap::Event) {
17 self.sender.send_message(evt.into());
18 }
19}
20
21impl<Args: Initializer> LsBuilder<DapMessage, Args>
22where
23 Args::S: 'static,
24{
25 pub fn with_command<R: Serialize + 'static>(
27 mut self,
28 cmd: &'static str,
29 handler: AsyncHandler<Args::S, Vec<JsonValue>, R>,
30 ) -> Self {
31 self.command_handlers.insert(
32 cmd,
33 Box::new(move |s, req| erased_response(handler(s, req))),
34 );
35 self
36 }
37
38 pub fn with_raw_request<R: IRequest>(
41 mut self,
42 handler: RawHandler<Args::S, JsonValue>,
43 ) -> Self {
44 self.req_handlers.insert(R::COMMAND, Box::new(handler));
45 self
46 }
47
48 pub fn with_request_<R: IRequest>(
52 mut self,
53 handler: fn(&mut Args::S, R::Arguments) -> ScheduleResult,
54 ) -> Self {
55 self.req_handlers.insert(
56 R::COMMAND,
57 Box::new(move |s, req| handler(s, from_json(req)?)),
58 );
59 self
60 }
61
62 pub fn with_request<R: IRequest>(
64 mut self,
65 handler: AsyncHandler<Args::S, R::Arguments, R::Response>,
66 ) -> Self {
67 self.req_handlers.insert(
68 R::COMMAND,
69 Box::new(move |s, req| erased_response(handler(s, from_json(req)?))),
70 );
71 self
72 }
73}
74
75#[cfg(feature = "system")]
76impl<Args: Initializer> LsDriver<DapMessage, Args>
77where
78 Args::S: 'static,
79{
80 pub fn start(
88 &mut self,
89 inbox: TConnectionRx<DapMessage>,
90 is_replay: bool,
91 ) -> anyhow::Result<()> {
92 let res = self.start_(inbox);
93
94 if is_replay {
95 let client = self.client.clone();
96 let _ = std::thread::spawn(move || {
97 let since = tinymist_std::time::Instant::now();
98 let timeout = std::env::var("REPLAY_TIMEOUT")
99 .ok()
100 .and_then(|s| s.parse().ok())
101 .unwrap_or(60);
102 client.handle.block_on(async {
103 while client.has_pending_requests() {
104 if since.elapsed().as_secs() > timeout {
105 log::error!("replay timeout reached, {timeout}s");
106 client.begin_panic();
107 }
108
109 tokio::time::sleep(tinymist_std::time::Duration::from_millis(10)).await;
110 }
111 })
112 })
113 .join();
114 }
115
116 res
117 }
118
119 pub fn start_(&mut self, inbox: TConnectionRx<DapMessage>) -> anyhow::Result<()> {
121 use EventOrMessage::*;
122
123 while let Ok(msg) = inbox.recv() {
124 let loop_start = tinymist_std::time::now();
125 match msg {
126 Evt(event) => {
127 let Some(event_handler) = self.events.get(&event.as_ref().type_id()) else {
128 log::warn!("unhandled event: {:?}", event.as_ref().type_id());
129 continue;
130 };
131
132 let s = match &mut self.state {
133 State::Uninitialized(u) => ServiceState::Uninitialized(u.as_deref_mut()),
134 State::Initializing(s) | State::Ready(s) => ServiceState::Ready(s),
135 State::ShuttingDown => {
136 log::warn!("server is shutting down");
137 continue;
138 }
139 };
140
141 event_handler(s, &self.client, event)?;
142 }
143 Msg(DapMessage::Request(req)) => {
144 let client = self.client.clone();
145 let req_id = (req.seq as i32).into();
146 client.register_request(&req.command, &req_id, loop_start);
147 let fut = client.schedule_tail(req_id, self.on_request(req));
148 self.client.handle.spawn(fut);
149 }
150 Msg(DapMessage::Event(not)) => {
151 self.on_event(loop_start, not)?;
152 }
153 Msg(DapMessage::Response(resp)) => {
154 let s = match &mut self.state {
155 State::Ready(s) => s,
156 _ => {
157 log::warn!("server is not ready yet");
158 continue;
159 }
160 };
161
162 self.client.clone().complete_dap_request(s, resp)
163 }
164 }
165 }
166
167 log::warn!("client exited without proper shutdown sequence");
168 Ok(())
169 }
170
171 fn on_request(&mut self, req: dap::Request) -> ScheduleResult {
174 match (&mut self.state, &*req.command) {
175 (State::Uninitialized(args), dapts::request::Initialize::COMMAND) => {
176 let params = serde_json::from_value::<Args::I>(req.arguments);
178 match params {
179 Ok(params) => {
180 let args = args.take().expect("already initialized");
181 let (s, res) = args.initialize(params);
182 self.state = State::Ready(s);
183 res
184 }
185 Err(e) => just_result(Err(invalid_request(e))),
186 }
187 }
188 (State::Uninitialized(..) | State::Initializing(..), _) => {
210 just_result(Err(not_initialized()))
211 }
212 (_, dapts::request::Initialize::COMMAND) => {
213 just_result(Err(invalid_request("server is already initialized")))
214 }
215 (State::Ready(s), _) => 'serve_req: {
220 let method = req.command.as_str();
221
222 let is_disconnect = method == dapts::request::Disconnect::COMMAND;
223
224 let Some(handler) = self.requests.get(method) else {
225 log::warn!("unhandled dap request: {method}");
226 break 'serve_req just_result(Err(method_not_found()));
227 };
228
229 let resp = handler(s, req.arguments);
230
231 if is_disconnect {
232 self.state = State::ShuttingDown;
233 }
234
235 resp
236 }
237 (State::ShuttingDown, _) => {
238 just_result(Err(invalid_request("server is shutting down")))
239 }
240 }
241 }
242
243 fn on_event(&mut self, received_at: Time, not: dap::Event) -> anyhow::Result<()> {
245 let track_id = self.next_not_id.fetch_add(1, Ordering::Relaxed);
246 self.client.hook.start_notification(track_id, ¬.event);
247 let handle = |s,
248 dap::Event {
249 seq: _,
250 event,
251 body,
252 }: dap::Event| {
253 let Some(handler) = self.notifications.get(event.as_str()) else {
254 log::warn!("unhandled event: {event}");
255 return Ok(());
256 };
257
258 let result = handler(s, body);
259 self.client
260 .hook
261 .stop_notification(track_id, &event, received_at, result);
262
263 Ok(())
264 };
265
266 match (&mut self.state, &*not.event) {
267 (State::Ready(state), _) => handle(state, not),
268 (State::Uninitialized(..) | State::Initializing(..), method) => {
270 log::warn!("server is not ready yet, while received event {method}");
271 Ok(())
272 }
273 (State::ShuttingDown, method) => {
274 log::warn!("server is shutting down, while received event {method}");
275 Ok(())
276 }
277 }
278 }
279}