1use super::*;
2
3use lsp_types::{notification::Notification as Notif, request::Request as Req, *};
4
/// A plain function pointer that handles arguments of type `T` against the
/// mutable service state `S` and reports only success or an LSP error.
///
/// Used in this file as the handler type accepted by the notification
/// registration methods.
type PureHandler<S, T> = fn(srv: &mut S, args: T) -> LspResult<()>;
6
/// Adapts a scheduled result so it can be handed to the client's scheduler.
///
/// - `Ok(Some(()))`: returns from the *enclosing function* immediately.
/// - `Ok(None)`: evaluates to an already-completed future yielding JSON `null`.
/// - `Err(e)`: evaluates to the same error.
macro_rules! reschedule {
    ($scheduled:expr) => {
        match $scheduled {
            Err(err) => Err(err),
            Ok(None) => Ok(futures::future::MaybeDone::Done(Ok(
                serde_json::Value::Null,
            ))),
            // Nothing left to schedule: leave the calling function right away.
            Ok(Some(())) => return,
        }
    };
}
19
20impl<S: 'static> TypedLspClient<S> {
21 pub fn send_lsp_request<R: Req>(
24 &self,
25 params: R::Params,
26 handler: impl FnOnce(&mut S, lsp::Response) + Send + Sync + 'static,
27 ) {
28 let caster = self.caster.clone();
29 self.client
30 .send_lsp_request_::<R>(params, move |s, resp| handler(caster(s), resp))
31 }
32}
33
34impl LspClient {
35 pub fn send_lsp_request_<R: Req>(
37 &self,
38 params: R::Params,
39 handler: impl FnOnce(&mut dyn Any, lsp::Response) + Send + Sync + 'static,
40 ) {
41 let mut req_queue = self.req_queue.lock();
42 let request = req_queue.outgoing.register(
43 R::METHOD.to_owned(),
44 params,
45 Box::new(|s, resp| handler(s, resp.try_into().unwrap())),
46 );
47
48 let Some(sender) = self.sender.upgrade() else {
49 log::warn!("failed to send request: connection closed");
50 return;
51 };
52 if let Err(res) = sender.lsp.send(request.into()) {
53 log::warn!("failed to send request: {res:?}");
54 }
55 }
56
57 pub fn respond_lsp(&self, response: lsp::Response) {
59 self.respond(response.id.clone(), response.into())
60 }
61
62 pub fn send_notification<N: Notif>(&self, params: &N::Params) {
64 self.send_notification_(lsp::Notification::new(N::METHOD.to_owned(), params));
65 }
66
67 pub fn send_notification_(&self, notif: lsp::Notification) {
69 let method = ¬if.method;
70 let Some(sender) = self.sender.upgrade() else {
71 log::warn!("failed to send notification ({method}): connection closed");
72 return;
73 };
74 if let Err(res) = sender.lsp.send(notif.into()) {
75 log::warn!("failed to send notification: {res:?}");
76 }
77 }
78}
79
80impl<Args: Initializer> LsBuilder<LspMessage, Args>
81where
82 Args::S: 'static,
83{
84 pub fn with_command_(
86 mut self,
87 cmd: &'static str,
88 handler: RawHandler<Args::S, Vec<JsonValue>>,
89 ) -> Self {
90 self.command_handlers.insert(cmd, raw_to_boxed(handler));
91 self
92 }
93
94 pub fn with_command<R: Serialize + 'static>(
96 mut self,
97 cmd: &'static str,
98 handler: AsyncHandler<Args::S, Vec<JsonValue>, R>,
99 ) -> Self {
100 self.command_handlers.insert(
101 cmd,
102 Box::new(move |s, client, req_id, req| client.schedule(req_id, handler(s, req))),
103 );
104 self
105 }
106
107 pub fn with_notification_<R: Notif>(
109 mut self,
110 handler: PureHandler<Args::S, JsonValue>,
111 ) -> Self {
112 self.notif_handlers.insert(R::METHOD, Box::new(handler));
113 self
114 }
115
116 pub fn with_notification<R: Notif>(mut self, handler: PureHandler<Args::S, R::Params>) -> Self {
118 self.notif_handlers.insert(
119 R::METHOD,
120 Box::new(move |s, req| handler(s, from_json(req)?)),
121 );
122 self
123 }
124
125 pub fn with_raw_request<R: Req>(mut self, handler: RawHandler<Args::S, JsonValue>) -> Self {
128 self.req_handlers.insert(R::METHOD, raw_to_boxed(handler));
129 self
130 }
131
132 pub fn with_request_<R: Req>(
136 mut self,
137 handler: fn(&mut Args::S, RequestId, R::Params) -> ScheduledResult,
138 ) -> Self {
139 self.req_handlers.insert(
140 R::METHOD,
141 Box::new(move |s, _client, req_id, req| handler(s, req_id, from_json(req)?)),
142 );
143 self
144 }
145
146 pub fn with_request<R: Req>(
148 mut self,
149 handler: AsyncHandler<Args::S, R::Params, R::Result>,
150 ) -> Self {
151 self.req_handlers.insert(
152 R::METHOD,
153 Box::new(move |s, client, req_id, req| {
154 client.schedule(req_id, handler(s, from_json(req)?))
155 }),
156 );
157 self
158 }
159}
160
161impl<Args: Initializer> LsDriver<LspMessage, Args>
162where
163 Args::S: 'static,
164{
165 pub fn start(
173 &mut self,
174 inbox: TConnectionRx<LspMessage>,
175 is_replay: bool,
176 ) -> anyhow::Result<()> {
177 let res = self.start_(inbox);
178
179 if is_replay {
180 let client = self.client.clone();
181 let _ = std::thread::spawn(move || {
182 let since = std::time::Instant::now();
183 let timeout = std::env::var("REPLAY_TIMEOUT")
184 .ok()
185 .and_then(|s| s.parse().ok())
186 .unwrap_or(60);
187 client.handle.block_on(async {
188 while client.has_pending_requests() {
189 if since.elapsed().as_secs() > timeout {
190 log::error!("replay timeout reached, {timeout}s");
191 client.begin_panic();
192 }
193
194 tokio::time::sleep(std::time::Duration::from_millis(10)).await;
195 }
196 })
197 })
198 .join();
199 }
200
201 res
202 }
203
204 pub fn start_(&mut self, inbox: TConnectionRx<LspMessage>) -> anyhow::Result<()> {
206 use EventOrMessage::*;
207 while let Ok(msg) = inbox.recv() {
228 const EXIT_METHOD: &str = notification::Exit::METHOD;
229 let loop_start = Instant::now();
230 match msg {
231 Evt(event) => {
232 let Some(event_handler) = self.events.get(&event.as_ref().type_id()) else {
233 log::warn!("unhandled event: {:?}", event.as_ref().type_id());
234 continue;
235 };
236
237 let s = match &mut self.state {
238 State::Uninitialized(u) => ServiceState::Uninitialized(u.as_deref_mut()),
239 State::Initializing(s) | State::Ready(s) => ServiceState::Ready(s),
240 State::ShuttingDown => {
241 log::warn!("server is shutting down");
242 continue;
243 }
244 };
245
246 event_handler(s, &self.client, event)?;
247 }
248 Msg(LspMessage::Request(req)) => self.on_lsp_request(loop_start, req),
249 Msg(LspMessage::Notification(not)) => {
250 let is_exit = not.method == EXIT_METHOD;
251 self.on_notification(loop_start, not)?;
252 if is_exit {
253 return Ok(());
254 }
255 }
256 Msg(LspMessage::Response(resp)) => {
257 let s = match &mut self.state {
258 State::Ready(s) => s,
259 _ => {
260 log::warn!("server is not ready yet");
261 continue;
262 }
263 };
264
265 self.client.clone().complete_lsp_request(s, resp)
266 }
267 }
268 }
269
270 log::warn!("client exited without proper shutdown sequence");
271 Ok(())
272 }
273
/// Handles one incoming LSP request according to the current server state.
///
/// The request is first registered with the client (method, id, receive
/// time). It is then dispatched on the pair (state, method):
///
/// - `initialize` while uninitialized: decodes the parameters and runs the
///   initializer, moving the server to `Initializing`.
/// - any other request before the server is ready: "not initialized" error.
/// - `initialize` in any later state: "invalid request" error.
/// - `workspace/executeCommand` while ready: delegated to
///   `on_execute_command`.
/// - any other request while ready: dispatched to the registered handler, or
///   "method not found" if none is registered.
/// - any request while shutting down: "invalid request" error.
///
/// # Panics
///
/// Panics with "already initialized" if the uninitialized state no longer
/// holds its initializer arguments.
fn on_lsp_request(&mut self, request_received: Instant, req: Request) {
    self.client
        .register_request(&req.method, &req.id, request_received);

    let req_id = req.id.clone();
    // Arm order matters: the catch-all "not initialized" arm precedes the
    // "already initialized" arm, so the latter only applies to `Ready` and
    // `ShuttingDown`.
    let resp = match (&mut self.state, &*req.method) {
        (State::Uninitialized(args), request::Initialize::METHOD) => {
            let params = serde_json::from_value::<Args::I>(req.params);
            match params {
                Ok(params) => {
                    // The initializer arguments are consumed exactly once.
                    let args = args.take().expect("already initialized");
                    let (s, res) = args.initialize(params);
                    self.state = State::Initializing(s);
                    res
                }
                // On a decoding failure the state is left untouched.
                Err(e) => just_result(Err(invalid_request(e))),
            }
        }
        (State::Uninitialized(..) | State::Initializing(..), _) => {
            just_result(Err(not_initialized()))
        }
        (_, request::Initialize::METHOD) => {
            just_result(Err(invalid_request("server is already initialized")))
        }
        (State::Ready(..), request::ExecuteCommand::METHOD) => {
            // `reschedule!` returns from this function on `Ok(Some(()))`.
            reschedule!(self.on_execute_command(req))
        }
        (State::Ready(s), _) => 'serve_req: {
            let method = req.method.as_str();
            let is_shutdown = method == request::Shutdown::METHOD;

            let Some(handler) = self.requests.get(method) else {
                log::warn!("unhandled lsp request: {method}");
                break 'serve_req just_result(Err(method_not_found()));
            };

            let result = handler(s, &self.client, req_id.clone(), req.params);
            self.client.schedule_tail(req_id, result);

            // The state only switches after the shutdown handler has been invoked.
            if is_shutdown {
                self.state = State::ShuttingDown;
            }

            // This arm has already passed its result to `schedule_tail`, so it
            // skips the common tail below.
            return;
        }
        (State::ShuttingDown, _) => {
            just_result(Err(invalid_request("server is shutting down")))
        }
    };

    // Common tail for every arm that produced `resp` instead of returning.
    let result = self.client.schedule(req_id.clone(), resp);
    self.client.schedule_tail(req_id, result);
}
331
332 fn on_execute_command(&mut self, req: Request) -> ScheduledResult {
334 let s = self.state.opt_mut().ok_or_else(not_initialized)?;
335
336 let params = from_value::<ExecuteCommandParams>(req.params)
337 .map_err(|e| invalid_params(e.to_string()))?;
338
339 let ExecuteCommandParams {
340 command, arguments, ..
341 } = params;
342
343 if command == "tinymist.getResources" {
345 self.get_resources(req.id, arguments)
346 } else {
347 let Some(handler) = self.commands.get(command.as_str()) else {
348 log::error!("asked to execute unknown command: {command}");
349 return Err(method_not_found());
350 };
351 handler(s, &self.client, req.id, arguments)
352 }
353 }
354
355 fn on_notification(&mut self, received_at: Instant, not: Notification) -> anyhow::Result<()> {
357 self.client.hook.start_notification(¬.method);
358 let handle = |s, Notification { method, params }: Notification| {
359 let Some(handler) = self.notifications.get(method.as_str()) else {
360 log::warn!("unhandled notification: {method}");
361 return Ok(());
362 };
363
364 let result = handler(s, params);
365 self.client
366 .hook
367 .stop_notification(&method, received_at, result);
368
369 Ok(())
370 };
371
372 match (&mut self.state, &*not.method) {
373 (state, notification::Initialized::METHOD) => {
374 let mut s = State::ShuttingDown;
375 std::mem::swap(state, &mut s);
376 match s {
377 State::Initializing(s) => {
378 *state = State::Ready(s);
379 }
380 _ => {
381 std::mem::swap(state, &mut s);
382 }
383 }
384
385 let s = match state {
386 State::Ready(s) => s,
387 _ => {
388 log::warn!("server is not ready yet");
389 return Ok(());
390 }
391 };
392 handle(s, not)
393 }
394 (State::Ready(state), _) => handle(state, not),
395 (State::Uninitialized(..) | State::Initializing(..), method) => {
397 log::warn!("server is not ready yet, while received notification {method}");
398 Ok(())
399 }
400 (State::ShuttingDown, method) => {
401 log::warn!("server is shutting down, while received notification {method}");
402 Ok(())
403 }
404 }
405 }
406}