sync_ls/server/
dap_srv.rs1#[cfg(feature = "system")]
2use std::sync::atomic::Ordering;
3
4use dapts::IRequest;
5
6use super::*;
7
8impl<S: 'static> TypedLspClient<S> {
9 pub fn send_dap_event<E: dapts::IEvent>(&self, body: E::Body) {
11 let req_id = self.req_queue.lock().outgoing.alloc_request_id();
12
13 self.send_dap_event_(dap::Event::new(req_id as i64, E::EVENT.to_owned(), body));
14 }
15
16 pub fn send_dap_event_(&self, evt: dap::Event) {
18 self.sender.send_message(evt.into());
19 }
20}
21
22impl<Args: Initializer> LsBuilder<DapMessage, Args>
23where
24 Args::S: 'static,
25{
26 pub fn with_command<R: Serialize + 'static>(
28 mut self,
29 cmd: &'static str,
30 handler: AsyncHandler<Args::S, Vec<JsonValue>, R>,
31 ) -> Self {
32 self.command_handlers.insert(
33 cmd,
34 Box::new(move |s, req| erased_response(handler(s, req))),
35 );
36 self
37 }
38
39 pub fn with_raw_request<R: IRequest>(
42 mut self,
43 handler: RawHandler<Args::S, JsonValue>,
44 ) -> Self {
45 self.req_handlers.insert(R::COMMAND, Box::new(handler));
46 self
47 }
48
49 pub fn with_request_<R: IRequest>(
53 mut self,
54 handler: fn(&mut Args::S, R::Arguments) -> ScheduleResult,
55 ) -> Self {
56 self.req_handlers.insert(
57 R::COMMAND,
58 Box::new(move |s, req| handler(s, from_json(req)?)),
59 );
60 self
61 }
62
63 pub fn with_request<R: IRequest>(
65 mut self,
66 handler: AsyncHandler<Args::S, R::Arguments, R::Response>,
67 ) -> Self {
68 self.req_handlers.insert(
69 R::COMMAND,
70 Box::new(move |s, req| erased_response(handler(s, from_json(req)?))),
71 );
72 self
73 }
74}
75
#[cfg(feature = "system")]
impl<Args: Initializer> LsDriver<DapMessage, Args>
where
    Args::S: 'static,
{
    /// Runs the DAP message loop on `inbox` and returns the loop's result.
    ///
    /// When `is_replay` is set, this additionally waits, after the loop has
    /// ended, until the client reports no pending requests. The wait polls
    /// every 10 ms. The timeout in seconds is read from the `REPLAY_TIMEOUT`
    /// environment variable and defaults to 60 when the variable is unset or
    /// cannot be parsed; once exceeded, an error is logged and
    /// `client.begin_panic()` is called.
    pub fn start(
        &mut self,
        inbox: TConnectionRx<DapMessage>,
        is_replay: bool,
    ) -> anyhow::Result<()> {
        let res = self.start_(inbox);

        if is_replay {
            let client = self.client.clone();
            // NOTE(review): the wait runs on a freshly spawned thread that is
            // joined right away, presumably so that `block_on` is not called
            // from within a runtime context -- confirm.
            let _ = std::thread::spawn(move || {
                let since = tinymist_std::time::Instant::now();
                let timeout = std::env::var("REPLAY_TIMEOUT")
                    .ok()
                    .and_then(|s| s.parse().ok())
                    .unwrap_or(60);
                client.handle.block_on(async {
                    while client.has_pending_requests() {
                        if since.elapsed().as_secs() > timeout {
                            log::error!("replay timeout reached, {timeout}s");
                            // NOTE(review): the loop keeps polling after this
                            // call; presumably `begin_panic` terminates the
                            // process -- confirm.
                            client.begin_panic();
                        }

                        tokio::time::sleep(tinymist_std::time::Duration::from_millis(10)).await;
                    }
                })
            })
            .join();
        }

        res
    }

    /// Receives from `inbox` until `recv` fails and dispatches each item.
    ///
    /// - Internal events are routed to the handler registered for the event's
    ///   type id; events without a handler, or arriving while shutting down,
    ///   are logged and skipped.
    /// - Requests are registered on the client, handled by `on_request`, and
    ///   the scheduled result is spawned on the client's runtime handle.
    /// - DAP events are forwarded to `on_event`.
    /// - Responses complete an outstanding request, but only while the server
    ///   is in the `Ready` state.
    ///
    /// # Errors
    ///
    /// Returns the first error produced by an internal event handler or by
    /// `on_event`. Reaching the end of the inbox is reported as `Ok(())`
    /// after logging a warning.
    pub fn start_(&mut self, inbox: TConnectionRx<DapMessage>) -> anyhow::Result<()> {
        use EventOrMessage::*;

        while let Ok(msg) = inbox.recv() {
            // Taken once per received item and handed to the request/event
            // bookkeeping below.
            let loop_start = tinymist_std::time::now();
            match msg {
                Evt(event) => {
                    let Some(event_handler) = self.events.get(&event.as_ref().type_id()) else {
                        log::warn!("unhandled event: {:?}", event.as_ref().type_id());
                        continue;
                    };

                    let s = match &mut self.state {
                        State::Uninitialized(u) => ServiceState::Uninitialized(u.as_deref_mut()),
                        State::Initializing(s) | State::Ready(s) => ServiceState::Ready(s),
                        State::ShuttingDown => {
                            log::warn!("server is shutting down");
                            continue;
                        }
                    };

                    event_handler(s, &self.client, event)?;
                }
                Msg(DapMessage::Request(req)) => {
                    let client = self.client.clone();
                    // NOTE(review): `seq` is narrowed with `as i32`, which
                    // silently truncates out-of-range values -- confirm that
                    // sequence numbers always fit.
                    let req_id = (req.seq as i32).into();
                    client.register_request(&req.command, &req_id, loop_start);
                    let fut = client.schedule_tail(req_id, self.on_request(req));
                    self.client.handle.spawn(fut);
                }
                Msg(DapMessage::Event(not)) => {
                    self.on_event(loop_start, not)?;
                }
                Msg(DapMessage::Response(resp)) => {
                    let s = match &mut self.state {
                        State::Ready(s) => s,
                        _ => {
                            log::warn!("server is not ready yet");
                            continue;
                        }
                    };

                    self.client.clone().complete_dap_request(s, resp)
                }
            }
        }

        log::warn!("client exited without proper shutdown sequence");
        Ok(())
    }

    /// Handles a single DAP request according to the current server state.
    ///
    /// - `Uninitialized` + `initialize`: decodes the arguments, initializes
    ///   the service and moves to `Ready`; undecodable arguments yield an
    ///   invalid-request error and leave the state untouched.
    /// - `Uninitialized`/`Initializing` + anything else: not-initialized error.
    /// - `initialize` in any other state: invalid-request error.
    /// - `Ready`: looks up the handler registered for the command; an unknown
    ///   command yields a method-not-found error. After a `disconnect`
    ///   request has been handled, the state becomes `ShuttingDown`.
    /// - `ShuttingDown`: invalid-request error.
    ///
    /// # Panics
    ///
    /// Panics if the state is `Uninitialized` but the initializer arguments
    /// have already been taken.
    fn on_request(&mut self, req: dap::Request) -> ScheduleResult {
        match (&mut self.state, &*req.command) {
            (State::Uninitialized(args), dapts::request::Initialize::COMMAND) => {
                let params = serde_json::from_value::<Args::I>(req.arguments);
                match params {
                    Ok(params) => {
                        let args = args.take().expect("already initialized");
                        let (s, res) = args.initialize(params);
                        self.state = State::Ready(s);
                        res
                    }
                    Err(e) => just_result(Err(invalid_request(e))),
                }
            }
            (State::Uninitialized(..) | State::Initializing(..), _) => {
                just_result(Err(not_initialized()))
            }
            (_, dapts::request::Initialize::COMMAND) => {
                just_result(Err(invalid_request("server is already initialized")))
            }
            (State::Ready(s), _) => 'serve_req: {
                let method = req.command.as_str();

                // Decided before the arguments are moved into the handler.
                let is_disconnect = method == dapts::request::Disconnect::COMMAND;

                let Some(handler) = self.requests.get(method) else {
                    log::warn!("unhandled dap request: {method}");
                    break 'serve_req just_result(Err(method_not_found()));
                };

                let resp = handler(s, req.arguments);

                // The state only changes after the disconnect handler ran, so
                // the handler itself still sees the ready service state.
                if is_disconnect {
                    self.state = State::ShuttingDown;
                }

                resp
            }
            (State::ShuttingDown, _) => {
                just_result(Err(invalid_request("server is shutting down")))
            }
        }
    }

    /// Handles a DAP event received from the peer.
    ///
    /// A tracking id is allocated and the hook's `start_notification` is
    /// called for every event. Only in the `Ready` state is the event routed
    /// to the handler registered under its name; the handler's result is then
    /// reported through the hook's `stop_notification` together with
    /// `received_at`. In every other state the event is logged and dropped.
    ///
    /// Every path visible here returns `Ok(())`.
    fn on_event(&mut self, received_at: Time, not: dap::Event) -> anyhow::Result<()> {
        let track_id = self.next_not_id.fetch_add(1, Ordering::Relaxed);
        // NOTE(review): `stop_notification` is only reached when a handler is
        // found in the `Ready` state; the other paths leave this start call
        // unmatched -- confirm that this is intended.
        self.client.hook.start_notification(track_id, &not.event);
        let handle = |s,
                      dap::Event {
                          seq: _,
                          event,
                          body,
                      }: dap::Event| {
            let Some(handler) = self.notifications.get(event.as_str()) else {
                log::warn!("unhandled event: {event}");
                return Ok(());
            };

            let result = handler(s, body);
            self.client
                .hook
                .stop_notification(track_id, &event, received_at, result);

            Ok(())
        };

        match (&mut self.state, &*not.event) {
            (State::Ready(state), _) => handle(state, not),
            (State::Uninitialized(..) | State::Initializing(..), method) => {
                log::warn!("server is not ready yet, while received event {method}");
                Ok(())
            }
            (State::ShuttingDown, method) => {
                log::warn!("server is shutting down, while received event {method}");
                Ok(())
            }
        }
    }
}