sync_ls/
server.rs

1//! A synchronous language server implementation.
2
3#[cfg(feature = "dap")]
4mod dap_srv;
5
6#[cfg(feature = "lsp")]
7mod lsp_srv;
8
9use core::fmt;
10use std::any::Any;
11use std::collections::HashMap;
12use std::path::{Path, PathBuf};
13use std::pin::Pin;
14use std::sync::{Arc, Weak};
15use std::time::Instant;
16
17use futures::future::MaybeDone;
18use parking_lot::Mutex;
19use serde::Serialize;
20use serde_json::{from_value, Value as JsonValue};
21
22#[cfg(feature = "lsp")]
23use crate::lsp::{Notification, Request};
24use crate::msg::*;
25use crate::req_queue;
26use crate::*;
27
/// A shared, immutable path (`Arc<Path>`).
type ImmutPath = Arc<Path>;

/// A future that is either already resolved in place or still pending as a
/// boxed, `Send` future.
pub type ResponseFuture<T> = MaybeDone<Pin<Box<dyn std::future::Future<Output = T> + Send>>>;
/// A response future that may be rejected before it is actually started.
pub type LspResponseFuture<T> = LspResult<ResponseFuture<T>>;
/// An `LspResponseFuture` whose output is itself an `LspResult`, i.e. the
/// response can be rejected up front or fail once the future resolves.
pub type SchedulableResponse<T> = LspResponseFuture<LspResult<T>>;
/// The common response future type for language servers.
pub type AnySchedulableResponse = SchedulableResponse<JsonValue>;
/// The result of a scheduled response, which is finally handled by
/// `schedule_tail`.
/// - `Ok(Some(()))`: already responded
/// - `Ok(None)`: still needs to respond with none
/// - `Err(..)`: still needs to respond with the error
pub type ScheduledResult = LspResult<Option<()>>;

/// The untyped connection sender for language servers.
pub type ConnectionTx = TConnectionTx<Message>;
/// The untyped connection receiver for language servers.
pub type ConnectionRx = TConnectionRx<Message>;
49
/// The sender half of the language server connection.
///
/// `M` is only a type-level marker: the `lsp` channel always carries the
/// untyped `Message`.
#[derive(Debug, Clone)]
pub struct TConnectionTx<M> {
    /// The sender of the events.
    pub event: crossbeam_channel::Sender<Event>,
    /// The sender of the LSP messages.
    pub lsp: crossbeam_channel::Sender<Message>,
    /// Zero-sized marker recording the typed message kind `M`.
    pub(crate) marker: std::marker::PhantomData<M>,
}
59
/// The receiver half of the language server connection.
///
/// `M` is only a type-level marker: the `lsp` channel always carries the
/// untyped `Message`, which `recv` converts into `M`.
#[derive(Debug, Clone)]
pub struct TConnectionRx<M> {
    /// The receiver of the events.
    pub event: crossbeam_channel::Receiver<Event>,
    /// The receiver of the LSP messages.
    pub lsp: crossbeam_channel::Receiver<Message>,
    /// Zero-sized marker recording the typed message kind `M`.
    pub(crate) marker: std::marker::PhantomData<M>,
}
69
70impl<M: TryFrom<Message, Error = anyhow::Error>> TConnectionRx<M> {
71    /// Receives a message or an event.
72    pub(crate) fn recv(&self) -> anyhow::Result<EventOrMessage<M>> {
73        crossbeam_channel::select_biased! {
74            recv(self.lsp) -> msg => Ok(EventOrMessage::Msg(msg?.try_into()?)),
75            recv(self.event) -> event => Ok(event.map(EventOrMessage::Evt)?),
76        }
77    }
78}
79
/// A helper enum to handle both events and messages received from a
/// connection.
pub(crate) enum EventOrMessage<M> {
    /// An event received from the `event` channel.
    Evt(Event),
    /// A typed message converted from the `lsp` channel.
    Msg(M),
}
85
/// A connection is just a pair of channel halves: the senders and the
/// receivers of events and LSP messages.
pub struct Connection<M> {
    /// The senders of the connection.
    pub sender: TConnectionTx<M>,
    /// The receivers of the connection.
    pub receiver: TConnectionRx<M>,
}
93
94impl<M: TryFrom<Message, Error = anyhow::Error>> From<Connection<Message>> for Connection<M> {
95    fn from(conn: Connection<Message>) -> Self {
96        Self {
97            sender: TConnectionTx {
98                event: conn.sender.event,
99                lsp: conn.sender.lsp,
100                marker: std::marker::PhantomData,
101            },
102            receiver: TConnectionRx {
103                event: conn.receiver.event,
104                lsp: conn.receiver.lsp,
105                marker: std::marker::PhantomData,
106            },
107        }
108    }
109}
110
111impl<M: TryFrom<Message, Error = anyhow::Error>> From<TConnectionTx<M>> for ConnectionTx {
112    fn from(conn: TConnectionTx<M>) -> Self {
113        Self {
114            event: conn.event,
115            lsp: conn.lsp,
116            marker: std::marker::PhantomData,
117        }
118    }
119}
120
/// A shared function that projects a type-erased service (`&mut dyn Any`) to a
/// concrete `&mut S`.
type AnyCaster<S> = Arc<dyn Fn(&mut dyn Any) -> &mut S + Send + Sync>;

/// A Lsp client with typed service `S`.
pub struct TypedLspClient<S> {
    /// The underlying untyped client.
    client: LspClient,
    /// The projection from the type-erased service to `S`.
    caster: AnyCaster<S>,
}
128
129impl<S> TypedLspClient<S> {
130    /// Converts the client to an untyped client.
131    pub fn to_untyped(self) -> LspClient {
132        self.client
133    }
134}
135
136impl<S: 'static> TypedLspClient<S> {
137    /// Returns the untyped lsp client.
138    pub fn untyped(&self) -> &LspClient {
139        &self.client
140    }
141
142    /// Casts the service to another type.
143    pub fn cast<T: 'static>(&self, f: fn(&mut S) -> &mut T) -> TypedLspClient<T> {
144        let caster = self.caster.clone();
145        TypedLspClient {
146            client: self.client.clone(),
147            caster: Arc::new(move |s| f(caster(s))),
148        }
149    }
150
151    /// Sends a event to the client itself.
152    pub fn send_event<T: std::any::Any + Send + 'static>(&self, event: T) {
153        let Some(sender) = self.sender.upgrade() else {
154            log::warn!("failed to send request: connection closed");
155            return;
156        };
157
158        let Err(res) = sender.event.send(Box::new(event)) else {
159            return;
160        };
161        log::warn!("failed to send event: {res:?}");
162    }
163}
164
165impl<S> Clone for TypedLspClient<S> {
166    fn clone(&self) -> Self {
167        Self {
168            client: self.client.clone(),
169            caster: self.caster.clone(),
170        }
171    }
172}
173
174impl<S> std::ops::Deref for TypedLspClient<S> {
175    type Target = LspClient;
176
177    fn deref(&self) -> &Self::Target {
178        &self.client
179    }
180}
181
/// The root of the language server host.
/// Will close connection when dropped.
#[derive(Debug, Clone)]
pub struct LspClientRoot {
    /// The client handed out by `weak()`; it only holds a `Weak` reference to
    /// the connection sender.
    weak: LspClient,
    /// The strong reference that keeps the connection sender alive.
    _strong: Arc<ConnectionTx>,
}
189
190impl LspClientRoot {
191    /// Creates a new language server host.
192    pub fn new<M: TryFrom<Message, Error = anyhow::Error> + GetMessageKind>(
193        handle: tokio::runtime::Handle,
194        sender: TConnectionTx<M>,
195    ) -> Self {
196        let _strong = Arc::new(sender.into());
197        let weak = LspClient {
198            handle,
199            msg_kind: M::get_message_kind(),
200            sender: Arc::downgrade(&_strong),
201            req_queue: Arc::new(Mutex::new(ReqQueue::default())),
202        };
203        Self { weak, _strong }
204    }
205
206    /// Returns the weak reference to the language server host.
207    pub fn weak(&self) -> LspClient {
208        self.weak.clone()
209    }
210}
211
/// A one-shot callback run when the response to a server2client request
/// arrives; it receives the type-erased service and the response.
type ReqHandler = Box<dyn for<'a> FnOnce(&'a mut dyn Any, LspOrDapResponse) + Send + Sync>;
/// The request queue: incoming requests carry `(method, received_at)`,
/// outgoing requests carry their response handler.
type ReqQueue = req_queue::ReqQueue<(String, Instant), ReqHandler>;

/// The host for the language server, or known as the LSP client.
#[derive(Debug, Clone)]
pub struct LspClient {
    /// The tokio handle.
    pub handle: tokio::runtime::Handle,

    /// The kind of protocol messages used when building responses.
    pub(crate) msg_kind: MessageKind,
    /// A weak reference to the connection sender; upgrading fails once the
    /// connection is closed.
    pub(crate) sender: Weak<ConnectionTx>,
    /// The queue of pending incoming and outgoing requests, shared between
    /// clones of this client.
    pub(crate) req_queue: Arc<Mutex<ReqQueue>>,
}
225
226impl LspClient {
227    /// Returns the untyped lsp client.
228    pub fn untyped(&self) -> &Self {
229        self
230    }
231
232    /// converts the client to a typed client.
233    pub fn to_typed<S: Any>(&self) -> TypedLspClient<S> {
234        TypedLspClient {
235            client: self.clone(),
236            caster: Arc::new(|s| s.downcast_mut().expect("invalid cast")),
237        }
238    }
239
240    /// Checks if there are pending requests.
241    pub fn has_pending_requests(&self) -> bool {
242        self.req_queue.lock().incoming.has_pending()
243    }
244
245    /// Prints states of the request queue and panics.
246    pub fn begin_panic(&self) {
247        self.req_queue.lock().begin_panic();
248    }
249
250    /// Sends a event to the server itself.
251    pub fn send_event<T: std::any::Any + Send + 'static>(&self, event: T) {
252        let Some(sender) = self.sender.upgrade() else {
253            log::warn!("failed to send request: connection closed");
254            return;
255        };
256
257        if let Err(res) = sender.event.send(Box::new(event)) {
258            log::warn!("failed to send event: {res:?}");
259        }
260    }
261
262    /// Completes an server2client request in the request queue.
263    #[cfg(feature = "lsp")]
264    pub fn complete_lsp_request<S: Any>(&self, service: &mut S, response: lsp::Response) {
265        let mut req_queue = self.req_queue.lock();
266        let Some(handler) = req_queue.outgoing.complete(response.id.clone()) else {
267            log::warn!("received response for unknown request");
268            return;
269        };
270        drop(req_queue);
271        handler(service, response.into())
272    }
273
274    /// Completes an server2client request in the request queue.
275    #[cfg(feature = "dap")]
276    pub fn complete_dap_request<S: Any>(&self, service: &mut S, response: dap::Response) {
277        let mut req_queue = self.req_queue.lock();
278        let Some(handler) = req_queue
279            .outgoing
280            // todo: casting i64 to i32
281            .complete((response.request_seq as i32).into())
282        else {
283            log::warn!("received response for unknown request");
284            return;
285        };
286        drop(req_queue);
287        handler(service, response.into())
288    }
289
290    /// Registers an client2server request in the request queue.
291    pub fn register_request(&self, method: &str, id: &RequestId, received_at: Instant) {
292        let mut req_queue = self.req_queue.lock();
293        self.start_request(id, method);
294        req_queue
295            .incoming
296            .register(id.clone(), (method.to_owned(), received_at));
297    }
298
299    /// Responds a typed result to the client.
300    pub fn respond_result<T: Serialize>(&self, id: RequestId, result: LspResult<T>) {
301        let result = result.and_then(|t| serde_json::to_value(t).map_err(internal_error));
302        self.respond_any_result(id, result);
303    }
304
305    fn respond_any_result(&self, id: RequestId, result: LspResult<JsonValue>) {
306        let req_id = id.clone();
307        let msg: Message = match (self.msg_kind, result) {
308            #[cfg(feature = "lsp")]
309            (MessageKind::Lsp, Ok(resp)) => lsp::Response::new_ok(id, resp).into(),
310            #[cfg(feature = "lsp")]
311            (MessageKind::Lsp, Err(e)) => lsp::Response::new_err(id, e.code, e.message).into(),
312            #[cfg(feature = "dap")]
313            (MessageKind::Dap, Ok(resp)) => dap::Response::success(RequestId::dap(id), resp).into(),
314            #[cfg(feature = "dap")]
315            (MessageKind::Dap, Err(e)) => {
316                dap::Response::error(RequestId::dap(id), Some(e.message), None).into()
317            }
318        };
319
320        self.respond(req_id, msg);
321    }
322
323    /// Completes an client2server request in the request queue.
324    pub fn respond(&self, id: RequestId, response: Message) {
325        let mut req_queue = self.req_queue.lock();
326        let Some((method, received_at)) = req_queue.incoming.complete(&id) else {
327            return;
328        };
329
330        self.stop_request(&id, &method, received_at);
331
332        let Some(sender) = self.sender.upgrade() else {
333            log::warn!("failed to send response ({method}, {id}): connection closed");
334            return;
335        };
336        if let Err(res) = sender.lsp.send(response) {
337            log::warn!("failed to send response ({method}, {id}): {res:?}");
338        }
339    }
340}
341
342impl LspClient {
343    /// Schedules a request from the client.
344    pub fn schedule<T: Serialize + 'static>(
345        &self,
346        req_id: RequestId,
347        resp: SchedulableResponse<T>,
348    ) -> ScheduledResult {
349        let resp = resp?;
350
351        use futures::future::MaybeDone::*;
352        match resp {
353            Done(output) => {
354                self.respond_result(req_id, output);
355            }
356            Future(fut) => {
357                let client = self.clone();
358                let req_id = req_id.clone();
359                self.handle.spawn(async move {
360                    client.respond_result(req_id, fut.await);
361                });
362            }
363            Gone => {
364                log::warn!("response for request({req_id:?}) already taken");
365            }
366        };
367
368        Ok(Some(()))
369    }
370
371    /// Finally sends the response if it is not sent before.
372    /// From the definition, the response is already sent if it is `Some(())`.
373    pub(crate) fn schedule_tail(&self, req_id: RequestId, resp: ScheduledResult) {
374        match resp {
375            // Already responded
376            Ok(Some(())) => {}
377            // The requests that doesn't start.
378            _ => self.respond_result(req_id, resp),
379        }
380    }
381}
382
383impl LspClient {
384    fn start_request(&self, req_id: &RequestId, method: &str) {
385        log::info!("handling {method} - ({req_id})");
386    }
387
388    fn stop_request(&self, req_id: &RequestId, method: &str, received_at: Instant) {
389        let duration = received_at.elapsed();
390        log::info!("handled  {method} - ({req_id}) in {duration:0.2?}");
391    }
392
393    fn start_notification(&self, method: &str) {
394        log::info!("notifying {method}");
395    }
396
397    fn stop_notification(&self, method: &str, received_at: Instant, result: LspResult<()>) {
398        let request_duration = received_at.elapsed();
399        if let Err(err) = result {
400            log::error!("notify {method} failed in {request_duration:0.2?}: {err:?}");
401        } else {
402            log::info!("notify {method} succeeded in {request_duration:0.2?}");
403        }
404    }
405}
406
/// A handler that takes the service and the arguments and returns a
/// schedulable response.
type AsyncHandler<S, T, R> = fn(srv: &mut S, args: T) -> SchedulableResponse<R>;
/// A handler that additionally takes the request id and returns a
/// `ScheduledResult` itself.
type RawHandler<S, T> = fn(srv: &mut S, req_id: RequestId, args: T) -> ScheduledResult;
/// A boxed handler that produces no response value.
type BoxPureHandler<S, T> = Box<dyn Fn(&mut S, T) -> LspResult<()>>;
/// A boxed handler that takes the service, the client, the request id and the
/// arguments.
type BoxHandler<S, T> = Box<dyn Fn(&mut S, &LspClient, RequestId, T) -> ScheduledResult>;
/// Command handlers keyed by a static name, taking a list of JSON arguments.
type ExecuteCmdMap<S> = HashMap<&'static str, BoxHandler<S, Vec<JsonValue>>>;
/// Request handlers keyed by a static name, taking a single JSON value.
type RegularCmdMap<S> = HashMap<&'static str, BoxHandler<S, JsonValue>>;
/// Notification handlers keyed by a static name, taking a single JSON value.
type NotifyCmdMap<S> = HashMap<&'static str, BoxPureHandler<S, JsonValue>>;
/// Resource handlers keyed by the resource path.
type ResourceMap<S> = HashMap<ImmutPath, BoxHandler<S, Vec<JsonValue>>>;
/// A boxed handler that receives a `ServiceState`, i.e. it can run whether or
/// not the service is ready.
type MayInitBoxHandler<A, S, T> =
    Box<dyn for<'a> Fn(ServiceState<'a, A, S>, &LspClient, T) -> anyhow::Result<()>>;
/// Event handlers keyed by the `TypeId` of the event type.
type EventMap<A, S> = HashMap<core::any::TypeId, MayInitBoxHandler<A, S, Event>>;
418
/// A trait that initializes the language server.
pub trait Initializer {
    /// The type of the initialization request.
    type I: for<'de> serde::Deserialize<'de>;
    /// The type of the service.
    type S;

    /// Handles the initialization request.
    /// If the underlying protocol is the standard LSP, the request is
    /// `InitializeParams`.
    ///
    /// Consumes the initializer and returns the created service together with
    /// the response to the initialization request.
    fn initialize(self, req: Self::I) -> (Self::S, AnySchedulableResponse);
}
431
/// The language server builder serving LSP.
#[cfg(feature = "lsp")]
pub type LspBuilder<Args> = LsBuilder<LspMessage, Args>;
/// The language server builder serving DAP.
#[cfg(feature = "dap")]
pub type DapBuilder<Args> = LsBuilder<DapMessage, Args>;

/// The builder pattern for the language server.
///
/// `M` is a type-level marker for the message type; it is not stored.
pub struct LsBuilder<M, Args: Initializer> {
    /// The extra initialization arguments.
    pub args: Args,
    /// The client surface for the implementing language server.
    pub client: LspClient,
    /// The event handlers.
    pub events: EventMap<Args, Args::S>,
    /// The command handlers.
    pub command_handlers: ExecuteCmdMap<Args::S>,
    /// The notification handlers.
    pub notif_handlers: NotifyCmdMap<Args::S>,
    /// The LSP request handlers.
    pub req_handlers: RegularCmdMap<Args::S>,
    /// The resource handlers.
    pub resource_handlers: ResourceMap<Args::S>,
    /// Zero-sized marker recording the message type `M`.
    _marker: std::marker::PhantomData<M>,
}
457
458impl<M, Args: Initializer> LsBuilder<M, Args>
459where
460    Args::S: 'static,
461{
462    /// Creates a new language server builder.
463    pub fn new(args: Args, client: LspClient) -> Self {
464        Self {
465            args,
466            client,
467            events: EventMap::new(),
468            command_handlers: ExecuteCmdMap::new(),
469            notif_handlers: NotifyCmdMap::new(),
470            req_handlers: RegularCmdMap::new(),
471            resource_handlers: ResourceMap::new(),
472            _marker: std::marker::PhantomData,
473        }
474    }
475
476    /// Registers an event handler.
477    pub fn with_event<T: std::any::Any>(
478        mut self,
479        ins: &T,
480        handler: impl for<'a> Fn(ServiceState<'a, Args, Args::S>, T) -> anyhow::Result<()> + 'static,
481    ) -> Self {
482        self.events.insert(
483            ins.type_id(),
484            Box::new(move |s, _client, req| handler(s, *req.downcast().unwrap())),
485        );
486        self
487    }
488
489    /// Registers a raw resource handler.
490    pub fn with_resource_(
491        mut self,
492        path: ImmutPath,
493        handler: RawHandler<Args::S, Vec<JsonValue>>,
494    ) -> Self {
495        self.resource_handlers.insert(path, raw_to_boxed(handler));
496        self
497    }
498
499    /// Registers an async resource handler.
500    pub fn with_resource(
501        mut self,
502        path: &'static str,
503        handler: fn(&mut Args::S, Vec<JsonValue>) -> AnySchedulableResponse,
504    ) -> Self {
505        self.resource_handlers.insert(
506            Path::new(path).into(),
507            Box::new(move |s, client, req_id, req| client.schedule(req_id, handler(s, req))),
508        );
509        self
510    }
511
512    /// Builds the language server driver.
513    pub fn build(self) -> LsDriver<M, Args> {
514        LsDriver {
515            state: State::Uninitialized(Some(Box::new(self.args))),
516            events: self.events,
517            client: self.client,
518            commands: self.command_handlers,
519            notifications: self.notif_handlers,
520            requests: self.req_handlers,
521            resources: self.resource_handlers,
522            _marker: std::marker::PhantomData,
523        }
524    }
525}
526
/// An enum to represent the state of the language server, as seen by
/// handlers that may run before the service is ready.
pub enum ServiceState<'a, A, S> {
    /// The service is uninitialized; optionally carries the initializer
    /// arguments.
    Uninitialized(Option<&'a mut A>),
    /// The service is ready.
    Ready(&'a mut S),
}
534
535impl<A, S> ServiceState<'_, A, S> {
536    /// Converts the state to an option holding the ready service.
537    pub fn ready(&mut self) -> Option<&mut S> {
538        match self {
539            ServiceState::Ready(s) => Some(s),
540            _ => None,
541        }
542    }
543}
544
/// The internal state of the language server driver.
#[derive(Debug, Clone, PartialEq, Eq)]
// NOTE(review): presumably allowed because not every variant is constructed
// in this module — confirm.
#[allow(dead_code)]
enum State<Args, S> {
    /// Not initialized yet; holds the initializer arguments until they are
    /// taken by the initialization.
    Uninitialized(Option<Box<Args>>),
    /// The service is initializing.
    Initializing(S),
    /// The service is ready.
    Ready(S),
    /// The server is shutting down.
    ShuttingDown,
}
553
554impl<Args, S> State<Args, S> {
555    fn opt(&self) -> Option<&S> {
556        match &self {
557            State::Ready(s) => Some(s),
558            _ => None,
559        }
560    }
561
562    fn opt_mut(&mut self) -> Option<&mut S> {
563        match self {
564            State::Ready(s) => Some(s),
565            _ => None,
566        }
567    }
568}
569
/// The language server driver.
///
/// `M` is a type-level marker for the message type; it is not stored.
pub struct LsDriver<M, Args: Initializer> {
    /// State to synchronize with the client.
    state: State<Args, Args::S>,
    /// The language server client.
    pub client: LspClient,

    // Handle maps
    /// Events for dispatching.
    pub events: EventMap<Args, Args::S>,
    /// Extra commands provided with `workspace/executeCommand`.
    pub commands: ExecuteCmdMap<Args::S>,
    /// Notifications for dispatching.
    pub notifications: NotifyCmdMap<Args::S>,
    /// Requests for dispatching.
    pub requests: RegularCmdMap<Args::S>,
    /// Resources for dispatching.
    pub resources: ResourceMap<Args::S>,
    /// Zero-sized marker recording the message type `M`.
    _marker: std::marker::PhantomData<M>,
}
590
591impl<M, Args: Initializer> LsDriver<M, Args> {
592    /// Gets the state of the language server.
593    pub fn state(&self) -> Option<&Args::S> {
594        self.state.opt()
595    }
596
597    /// Gets the mutable state of the language server.
598    pub fn state_mut(&mut self) -> Option<&mut Args::S> {
599        self.state.opt_mut()
600    }
601
602    /// Makes the language server ready.
603    pub fn ready(&mut self, params: Args::I) -> AnySchedulableResponse {
604        let args = match &mut self.state {
605            State::Uninitialized(args) => args,
606            _ => return just_result(Err(invalid_request("server is already initialized"))),
607        };
608
609        let args = args.take().expect("already initialized");
610        let (s, res) = args.initialize(params);
611        self.state = State::Ready(s);
612
613        res
614    }
615
616    /// Get static resources with help of tinymist service, for example, a
617    /// static help pages for some typst function.
618    pub fn get_resources(&mut self, req_id: RequestId, args: Vec<JsonValue>) -> ScheduledResult {
619        let s = self.state.opt_mut().ok_or_else(not_initialized)?;
620
621        let path =
622            from_value::<PathBuf>(args[0].clone()).map_err(|e| invalid_params(e.to_string()))?;
623
624        let Some(handler) = self.resources.get(path.as_path()) else {
625            log::error!("asked for unknown resource: {path:?}");
626            return Err(method_not_found());
627        };
628
629        // Note our redirection will keep the first path argument in the args vec.
630        handler(s, &self.client, req_id, args)
631    }
632}
633
634/// A helper function to create a `LspResponseFuture`
635pub fn just_ok<T, E>(res: T) -> Result<ResponseFuture<Result<T, E>>, E> {
636    Ok(futures::future::MaybeDone::Done(Ok(res)))
637}
638
639/// A helper function to create a `LspResponseFuture`
640pub fn just_result<T, E>(res: Result<T, E>) -> Result<ResponseFuture<Result<T, E>>, E> {
641    Ok(futures::future::MaybeDone::Done(res))
642}
643
644/// A helper function to create a `LspResponseFuture`
645pub fn just_future<T, E>(
646    fut: impl std::future::Future<Output = Result<T, E>> + Send + 'static,
647) -> Result<ResponseFuture<Result<T, E>>, E> {
648    Ok(futures::future::MaybeDone::Future(Box::pin(fut)))
649}
650
651/// Creates an invalid params error.
652pub fn invalid_params(msg: impl fmt::Display) -> ResponseError {
653    resp_err(ErrorCode::InvalidParams, msg)
654}
655
656/// Creates an internal error.
657pub fn internal_error(msg: impl fmt::Display) -> ResponseError {
658    resp_err(ErrorCode::InternalError, msg)
659}
660
661/// Creates a not initialized error.
662pub fn not_initialized() -> ResponseError {
663    resp_err(ErrorCode::ServerNotInitialized, "not initialized yet")
664}
665
666/// Creates a method not found error.
667pub fn method_not_found() -> ResponseError {
668    resp_err(ErrorCode::MethodNotFound, "method not found")
669}
670
671/// Creates an invalid request error.
672pub fn invalid_request(msg: impl fmt::Display) -> ResponseError {
673    resp_err(ErrorCode::InvalidRequest, msg)
674}
675
676fn from_json<T: serde::de::DeserializeOwned>(json: JsonValue) -> LspResult<T> {
677    serde_json::from_value(json).map_err(invalid_request)
678}
679
680fn raw_to_boxed<S: 'static, T: 'static>(handler: RawHandler<S, T>) -> BoxHandler<S, T> {
681    Box::new(move |s, _client, req_id, req| handler(s, req_id, req))
682}
683
684fn resp_err(code: ErrorCode, msg: impl fmt::Display) -> ResponseError {
685    ResponseError {
686        code: code as i32,
687        message: msg.to_string(),
688        data: None,
689    }
690}