sync_ls/
server.rs

1//! A synchronous language server implementation.
2
3#[cfg(feature = "dap")]
4mod dap_srv;
5
6#[cfg(feature = "lsp")]
7mod lsp_srv;
8
9use core::fmt;
10use std::any::Any;
11use std::collections::HashMap;
12use std::path::{Path, PathBuf};
13use std::pin::Pin;
14use std::sync::{Arc, Weak};
15use std::time::Instant;
16
17use futures::future::MaybeDone;
18use parking_lot::Mutex;
19use serde::Serialize;
20use serde_json::{from_value, Value as JsonValue};
21
22#[cfg(feature = "lsp")]
23use crate::lsp::{Notification, Request};
24use crate::msg::*;
25use crate::req_queue;
26use crate::*;
27
28type ImmutPath = Arc<Path>;
29
30/// A future that may be done in place or not.
31pub type ResponseFuture<T> = MaybeDone<Pin<Box<dyn std::future::Future<Output = T> + Send>>>;
32/// A future that may be rejected before actual started.
33pub type LspResponseFuture<T> = LspResult<ResponseFuture<T>>;
34/// A future that could be rejected by common error in `LspResponseFuture`.
35pub type SchedulableResponse<T> = LspResponseFuture<LspResult<T>>;
36/// The common response future type for language servers.
37pub type AnySchedulableResponse = SchedulableResponse<JsonValue>;
38/// The result of a scheduled response which could be finally caught by
39/// `schedule_tail`.
40/// - Returns Ok(Some()) -> Already responded
41/// - Returns Ok(None) -> Need to respond none
42/// - Returns Err(..) -> Need to respond error
43pub type ScheduledResult = LspResult<Option<()>>;
44
45/// The untyped connect tx for language servers.
46pub type ConnectionTx = TConnectionTx<Message>;
47/// The untyped connect rx for language servers.
48pub type ConnectionRx = TConnectionRx<Message>;
49
50/// The sender of the language server.
51#[derive(Debug, Clone)]
52pub struct TConnectionTx<M> {
53    /// The sender of the events.
54    pub event: crossbeam_channel::Sender<Event>,
55    /// The sender of the LSP messages.
56    pub lsp: crossbeam_channel::Sender<Message>,
57    pub(crate) marker: std::marker::PhantomData<M>,
58}
59
60/// The sender of the language server.
61#[derive(Debug, Clone)]
62pub struct TConnectionRx<M> {
63    /// The receiver of the events.
64    pub event: crossbeam_channel::Receiver<Event>,
65    /// The receiver of the LSP messages.
66    pub lsp: crossbeam_channel::Receiver<Message>,
67    pub(crate) marker: std::marker::PhantomData<M>,
68}
69
70impl<M: TryFrom<Message, Error = anyhow::Error>> TConnectionRx<M> {
71    /// Receives a message or an event.
72    pub(crate) fn recv(&self) -> anyhow::Result<EventOrMessage<M>> {
73        crossbeam_channel::select_biased! {
74            recv(self.lsp) -> msg => Ok(EventOrMessage::Msg(msg?.try_into()?)),
75            recv(self.event) -> event => Ok(event.map(EventOrMessage::Evt)?),
76        }
77    }
78}
79
80/// This is a helper enum to handle both events and messages.
81pub(crate) enum EventOrMessage<M> {
82    Evt(Event),
83    Msg(M),
84}
85
86/// Connection is just a pair of channels of LSP messages.
87pub struct Connection<M> {
88    /// The senders of the connection.
89    pub sender: TConnectionTx<M>,
90    /// The receivers of the connection.
91    pub receiver: TConnectionRx<M>,
92}
93
94impl<M: TryFrom<Message, Error = anyhow::Error>> From<Connection<Message>> for Connection<M> {
95    fn from(conn: Connection<Message>) -> Self {
96        Self {
97            sender: TConnectionTx {
98                event: conn.sender.event,
99                lsp: conn.sender.lsp,
100                marker: std::marker::PhantomData,
101            },
102            receiver: TConnectionRx {
103                event: conn.receiver.event,
104                lsp: conn.receiver.lsp,
105                marker: std::marker::PhantomData,
106            },
107        }
108    }
109}
110
111impl<M: TryFrom<Message, Error = anyhow::Error>> From<TConnectionTx<M>> for ConnectionTx {
112    fn from(conn: TConnectionTx<M>) -> Self {
113        Self {
114            event: conn.event,
115            lsp: conn.lsp,
116            marker: std::marker::PhantomData,
117        }
118    }
119}
120
121type AnyCaster<S> = Arc<dyn Fn(&mut dyn Any) -> &mut S + Send + Sync>;
122
123/// A Lsp client with typed service `S`.
124pub struct TypedLspClient<S> {
125    client: LspClient,
126    caster: AnyCaster<S>,
127}
128
129impl<S> TypedLspClient<S> {
130    /// Converts the client to an untyped client.
131    pub fn to_untyped(self) -> LspClient {
132        self.client
133    }
134}
135
136impl<S: 'static> TypedLspClient<S> {
137    /// Returns the untyped lsp client.
138    pub fn untyped(&self) -> &LspClient {
139        &self.client
140    }
141
142    /// Casts the service to another type.
143    pub fn cast<T: 'static>(&self, f: fn(&mut S) -> &mut T) -> TypedLspClient<T> {
144        let caster = self.caster.clone();
145        TypedLspClient {
146            client: self.client.clone(),
147            caster: Arc::new(move |s| f(caster(s))),
148        }
149    }
150
151    /// Sends a event to the client itself.
152    pub fn send_event<T: std::any::Any + Send + 'static>(&self, event: T) {
153        let Some(sender) = self.sender.upgrade() else {
154            log::warn!("failed to send request: connection closed");
155            return;
156        };
157
158        let Err(res) = sender.event.send(Box::new(event)) else {
159            return;
160        };
161        log::warn!("failed to send event: {res:?}");
162    }
163}
164
165impl<S> Clone for TypedLspClient<S> {
166    fn clone(&self) -> Self {
167        Self {
168            client: self.client.clone(),
169            caster: self.caster.clone(),
170        }
171    }
172}
173
174impl<S> std::ops::Deref for TypedLspClient<S> {
175    type Target = LspClient;
176
177    fn deref(&self) -> &Self::Target {
178        &self.client
179    }
180}
181
182/// The root of the language server host.
183/// Will close connection when dropped.
184#[derive(Debug, Clone)]
185pub struct LspClientRoot {
186    weak: LspClient,
187    _strong: Arc<ConnectionTx>,
188}
189
190impl LspClientRoot {
191    /// Creates a new language server host.
192    pub fn new<M: TryFrom<Message, Error = anyhow::Error> + GetMessageKind>(
193        handle: tokio::runtime::Handle,
194        sender: TConnectionTx<M>,
195    ) -> Self {
196        let _strong = Arc::new(sender.into());
197        let weak = LspClient {
198            handle,
199            msg_kind: M::get_message_kind(),
200            sender: Arc::downgrade(&_strong),
201            req_queue: Arc::new(Mutex::new(ReqQueue::default())),
202        };
203        Self { weak, _strong }
204    }
205
206    /// Returns the weak reference to the language server host.
207    pub fn weak(&self) -> LspClient {
208        self.weak.clone()
209    }
210}
211
212type ReqHandler = Box<dyn for<'a> FnOnce(&'a mut dyn Any, LspOrDapResponse) + Send + Sync>;
213type ReqQueue = req_queue::ReqQueue<(String, Instant), ReqHandler>;
214
215/// The host for the language server, or known as the LSP client.
216#[derive(Debug, Clone)]
217pub struct LspClient {
218    /// The tokio handle.
219    pub handle: tokio::runtime::Handle,
220
221    pub(crate) msg_kind: MessageKind,
222    pub(crate) sender: Weak<ConnectionTx>,
223    pub(crate) req_queue: Arc<Mutex<ReqQueue>>,
224}
225
226impl LspClient {
227    /// Returns the untyped lsp client.
228    pub fn untyped(&self) -> &Self {
229        self
230    }
231
232    /// converts the client to a typed client.
233    pub fn to_typed<S: Any>(&self) -> TypedLspClient<S> {
234        TypedLspClient {
235            client: self.clone(),
236            caster: Arc::new(|s| s.downcast_mut().expect("invalid cast")),
237        }
238    }
239
240    /// Checks if there are pending requests.
241    pub fn has_pending_requests(&self) -> bool {
242        self.req_queue.lock().incoming.has_pending()
243    }
244
245    /// Prints states of the request queue and panics.
246    pub fn begin_panic(&self) {
247        self.req_queue.lock().begin_panic();
248    }
249
250    /// Sends a event to the server itself.
251    pub fn send_event<T: std::any::Any + Send + 'static>(&self, event: T) {
252        let Some(sender) = self.sender.upgrade() else {
253            log::warn!("failed to send request: connection closed");
254            return;
255        };
256
257        if let Err(res) = sender.event.send(Box::new(event)) {
258            log::warn!("failed to send event: {res:?}");
259        }
260    }
261
262    /// Completes an server2client request in the request queue.
263    #[cfg(feature = "lsp")]
264    pub fn complete_lsp_request<S: Any>(&self, service: &mut S, response: lsp::Response) {
265        let mut req_queue = self.req_queue.lock();
266        let Some(handler) = req_queue.outgoing.complete(response.id.clone()) else {
267            log::warn!("received response for unknown request");
268            return;
269        };
270        drop(req_queue);
271        handler(service, response.into())
272    }
273
274    /// Completes an server2client request in the request queue.
275    #[cfg(feature = "dap")]
276    pub fn complete_dap_request<S: Any>(&self, service: &mut S, response: dap::Response) {
277        let mut req_queue = self.req_queue.lock();
278        let Some(handler) = req_queue
279            .outgoing
280            // todo: casting i64 to i32
281            .complete((response.request_seq as i32).into())
282        else {
283            log::warn!("received response for unknown request");
284            return;
285        };
286        drop(req_queue);
287        handler(service, response.into())
288    }
289
290    /// Registers an client2server request in the request queue.
291    pub fn register_request(&self, method: &str, id: &RequestId, received_at: Instant) {
292        let mut req_queue = self.req_queue.lock();
293        self.start_request(id, method);
294        req_queue
295            .incoming
296            .register(id.clone(), (method.to_owned(), received_at));
297    }
298
299    /// Responds a typed result to the client.
300    pub fn respond_result<T: Serialize>(&self, id: RequestId, result: LspResult<T>) {
301        let result = result.and_then(|t| serde_json::to_value(t).map_err(internal_error));
302        self.respond_any_result(id, result);
303    }
304
305    fn respond_any_result(&self, id: RequestId, result: LspResult<JsonValue>) {
306        let req_id = id.clone();
307        let msg: Message = match (self.msg_kind, result) {
308            #[cfg(feature = "lsp")]
309            (MessageKind::Lsp, Ok(resp)) => lsp::Response::new_ok(id, resp).into(),
310            #[cfg(feature = "lsp")]
311            (MessageKind::Lsp, Err(e)) => lsp::Response::new_err(id, e.code, e.message).into(),
312            #[cfg(feature = "dap")]
313            (MessageKind::Dap, Ok(resp)) => dap::Response::success(RequestId::dap(id), resp).into(),
314            #[cfg(feature = "dap")]
315            (MessageKind::Dap, Err(e)) => {
316                dap::Response::error(RequestId::dap(id), Some(e.message), None).into()
317            }
318        };
319
320        self.respond(req_id, msg);
321    }
322
323    /// Completes an client2server request in the request queue.
324    pub fn respond(&self, id: RequestId, response: Message) {
325        let mut req_queue = self.req_queue.lock();
326        let Some((method, received_at)) = req_queue.incoming.complete(&id) else {
327            return;
328        };
329
330        self.stop_request(&id, &method, received_at);
331
332        let Some(sender) = self.sender.upgrade() else {
333            log::warn!("failed to send response ({method}, {id}): connection closed");
334            return;
335        };
336        if let Err(res) = sender.lsp.send(response) {
337            log::warn!("failed to send response ({method}, {id}): {res:?}");
338        }
339    }
340}
341
342impl LspClient {
343    /// Schedules a request from the client.
344    pub fn schedule<T: Serialize + 'static>(
345        &self,
346        req_id: RequestId,
347        resp: SchedulableResponse<T>,
348    ) -> ScheduledResult {
349        let resp = resp?;
350
351        use futures::future::MaybeDone::*;
352        match resp {
353            Done(output) => {
354                self.respond_result(req_id, output);
355            }
356            Future(fut) => {
357                let client = self.clone();
358                let req_id = req_id.clone();
359                self.handle.spawn(async move {
360                    client.respond_result(req_id, fut.await);
361                });
362            }
363            Gone => {
364                log::warn!("response for request({req_id:?}) already taken");
365            }
366        };
367
368        Ok(Some(()))
369    }
370
371    /// Catch the early rejected requests.
372    pub(crate) fn schedule_tail(&self, req_id: RequestId, resp: ScheduledResult) {
373        match resp {
374            // Already responded
375            Ok(Some(())) => {}
376            // The requests that doesn't start.
377            _ => self.respond_result(req_id, resp),
378        }
379    }
380}
381
382impl LspClient {
383    fn start_request(&self, req_id: &RequestId, method: &str) {
384        log::info!("handling {method} - ({req_id})");
385    }
386
387    fn stop_request(&self, req_id: &RequestId, method: &str, received_at: Instant) {
388        let duration = received_at.elapsed();
389        log::info!("handled  {method} - ({req_id}) in {duration:0.2?}");
390    }
391
392    fn start_notification(&self, method: &str) {
393        log::info!("notifying {method}");
394    }
395
396    fn stop_notification(&self, method: &str, received_at: Instant, result: LspResult<()>) {
397        let request_duration = received_at.elapsed();
398        if let Err(err) = result {
399            log::error!("notify {method} failed in {request_duration:0.2?}: {err:?}");
400        } else {
401            log::info!("notify {method} succeeded in {request_duration:0.2?}");
402        }
403    }
404}
405
406type AsyncHandler<S, T, R> = fn(srv: &mut S, args: T) -> SchedulableResponse<R>;
407type RawHandler<S, T> = fn(srv: &mut S, req_id: RequestId, args: T) -> ScheduledResult;
408type BoxPureHandler<S, T> = Box<dyn Fn(&mut S, T) -> LspResult<()>>;
409type BoxHandler<S, T> = Box<dyn Fn(&mut S, &LspClient, RequestId, T) -> ScheduledResult>;
410type ExecuteCmdMap<S> = HashMap<&'static str, BoxHandler<S, Vec<JsonValue>>>;
411type RegularCmdMap<S> = HashMap<&'static str, BoxHandler<S, JsonValue>>;
412type NotifyCmdMap<S> = HashMap<&'static str, BoxPureHandler<S, JsonValue>>;
413type ResourceMap<S> = HashMap<ImmutPath, BoxHandler<S, Vec<JsonValue>>>;
414type MayInitBoxHandler<A, S, T> =
415    Box<dyn for<'a> Fn(ServiceState<'a, A, S>, &LspClient, T) -> anyhow::Result<()>>;
416type EventMap<A, S> = HashMap<core::any::TypeId, MayInitBoxHandler<A, S, Event>>;
417
418/// A trait that initializes the language server.
419pub trait Initializer {
420    /// The type of the initialization request.
421    type I: for<'de> serde::Deserialize<'de>;
422    /// The type of the service.
423    type S;
424
425    /// Handles the initialization request.
426    /// If the behind protocol is the standard LSP, the request is
427    /// `InitializeParams`.
428    fn initialize(self, req: Self::I) -> (Self::S, AnySchedulableResponse);
429}
430
431/// The language server builder serving LSP.
432#[cfg(feature = "lsp")]
433pub type LspBuilder<Args> = LsBuilder<LspMessage, Args>;
434/// The language server builder serving DAP.
435#[cfg(feature = "dap")]
436pub type DapBuilder<Args> = LsBuilder<DapMessage, Args>;
437
438/// The builder pattern for the language server.
439pub struct LsBuilder<M, Args: Initializer> {
440    /// The extra initialization arguments.
441    pub args: Args,
442    /// The client surface for the implementing language server.
443    pub client: LspClient,
444    /// The event handlers.
445    pub events: EventMap<Args, Args::S>,
446    /// The command handlers.
447    pub command_handlers: ExecuteCmdMap<Args::S>,
448    /// The notification handlers.
449    pub notif_handlers: NotifyCmdMap<Args::S>,
450    /// The LSP request handlers.
451    pub req_handlers: RegularCmdMap<Args::S>,
452    /// The resource handlers.
453    pub resource_handlers: ResourceMap<Args::S>,
454    _marker: std::marker::PhantomData<M>,
455}
456
457impl<M, Args: Initializer> LsBuilder<M, Args>
458where
459    Args::S: 'static,
460{
461    /// Creates a new language server builder.
462    pub fn new(args: Args, client: LspClient) -> Self {
463        Self {
464            args,
465            client,
466            events: EventMap::new(),
467            command_handlers: ExecuteCmdMap::new(),
468            notif_handlers: NotifyCmdMap::new(),
469            req_handlers: RegularCmdMap::new(),
470            resource_handlers: ResourceMap::new(),
471            _marker: std::marker::PhantomData,
472        }
473    }
474
475    /// Registers an event handler.
476    pub fn with_event<T: std::any::Any>(
477        mut self,
478        ins: &T,
479        handler: impl for<'a> Fn(ServiceState<'a, Args, Args::S>, T) -> anyhow::Result<()> + 'static,
480    ) -> Self {
481        self.events.insert(
482            ins.type_id(),
483            Box::new(move |s, _client, req| handler(s, *req.downcast().unwrap())),
484        );
485        self
486    }
487
488    /// Registers a raw resource handler.
489    pub fn with_resource_(
490        mut self,
491        path: ImmutPath,
492        handler: RawHandler<Args::S, Vec<JsonValue>>,
493    ) -> Self {
494        self.resource_handlers.insert(path, raw_to_boxed(handler));
495        self
496    }
497
498    /// Registers an async resource handler.
499    pub fn with_resource(
500        mut self,
501        path: &'static str,
502        handler: fn(&mut Args::S, Vec<JsonValue>) -> AnySchedulableResponse,
503    ) -> Self {
504        self.resource_handlers.insert(
505            Path::new(path).into(),
506            Box::new(move |s, client, req_id, req| client.schedule(req_id, handler(s, req))),
507        );
508        self
509    }
510
511    /// Builds the language server driver.
512    pub fn build(self) -> LsDriver<M, Args> {
513        LsDriver {
514            state: State::Uninitialized(Some(Box::new(self.args))),
515            events: self.events,
516            client: self.client,
517            commands: self.command_handlers,
518            notifications: self.notif_handlers,
519            requests: self.req_handlers,
520            resources: self.resource_handlers,
521            _marker: std::marker::PhantomData,
522        }
523    }
524}
525
526/// An enum to represent the state of the language server.
527pub enum ServiceState<'a, A, S> {
528    /// The service is uninitialized.
529    Uninitialized(Option<&'a mut A>),
530    /// The service is initializing.
531    Ready(&'a mut S),
532}
533
534impl<A, S> ServiceState<'_, A, S> {
535    /// Converts the state to an option holding the ready service.
536    pub fn ready(&mut self) -> Option<&mut S> {
537        match self {
538            ServiceState::Ready(s) => Some(s),
539            _ => None,
540        }
541    }
542}
543
544#[derive(Debug, Clone, PartialEq, Eq)]
545#[allow(dead_code)]
546enum State<Args, S> {
547    Uninitialized(Option<Box<Args>>),
548    Initializing(S),
549    Ready(S),
550    ShuttingDown,
551}
552
553impl<Args, S> State<Args, S> {
554    fn opt(&self) -> Option<&S> {
555        match &self {
556            State::Ready(s) => Some(s),
557            _ => None,
558        }
559    }
560
561    fn opt_mut(&mut self) -> Option<&mut S> {
562        match self {
563            State::Ready(s) => Some(s),
564            _ => None,
565        }
566    }
567}
568
569/// The language server driver.
570pub struct LsDriver<M, Args: Initializer> {
571    /// State to synchronize with the client.
572    state: State<Args, Args::S>,
573    /// The language server client.
574    pub client: LspClient,
575
576    // Handle maps
577    /// Events for dispatching.
578    pub events: EventMap<Args, Args::S>,
579    /// Extra commands provided with `textDocument/executeCommand`.
580    pub commands: ExecuteCmdMap<Args::S>,
581    /// Notifications for dispatching.
582    pub notifications: NotifyCmdMap<Args::S>,
583    /// Requests for dispatching.
584    pub requests: RegularCmdMap<Args::S>,
585    /// Resources for dispatching.
586    pub resources: ResourceMap<Args::S>,
587    _marker: std::marker::PhantomData<M>,
588}
589
590impl<M, Args: Initializer> LsDriver<M, Args> {
591    /// Gets the state of the language server.
592    pub fn state(&self) -> Option<&Args::S> {
593        self.state.opt()
594    }
595
596    /// Gets the mutable state of the language server.
597    pub fn state_mut(&mut self) -> Option<&mut Args::S> {
598        self.state.opt_mut()
599    }
600
601    /// Makes the language server ready.
602    pub fn ready(&mut self, params: Args::I) -> AnySchedulableResponse {
603        let args = match &mut self.state {
604            State::Uninitialized(args) => args,
605            _ => return just_result(Err(invalid_request("server is already initialized"))),
606        };
607
608        let args = args.take().expect("already initialized");
609        let (s, res) = args.initialize(params);
610        self.state = State::Ready(s);
611
612        res
613    }
614
615    /// Get static resources with help of tinymist service, for example, a
616    /// static help pages for some typst function.
617    pub fn get_resources(&mut self, req_id: RequestId, args: Vec<JsonValue>) -> ScheduledResult {
618        let s = self.state.opt_mut().ok_or_else(not_initialized)?;
619
620        let path =
621            from_value::<PathBuf>(args[0].clone()).map_err(|e| invalid_params(e.to_string()))?;
622
623        let Some(handler) = self.resources.get(path.as_path()) else {
624            log::error!("asked for unknown resource: {path:?}");
625            return Err(method_not_found());
626        };
627
628        // Note our redirection will keep the first path argument in the args vec.
629        handler(s, &self.client, req_id, args)
630    }
631}
632
633/// A helper function to create a `LspResponseFuture`
634pub fn just_ok<T, E>(res: T) -> Result<ResponseFuture<Result<T, E>>, E> {
635    Ok(futures::future::MaybeDone::Done(Ok(res)))
636}
637
638/// A helper function to create a `LspResponseFuture`
639pub fn just_result<T, E>(res: Result<T, E>) -> Result<ResponseFuture<Result<T, E>>, E> {
640    Ok(futures::future::MaybeDone::Done(res))
641}
642
643/// A helper function to create a `LspResponseFuture`
644pub fn just_future<T, E>(
645    fut: impl std::future::Future<Output = Result<T, E>> + Send + 'static,
646) -> Result<ResponseFuture<Result<T, E>>, E> {
647    Ok(futures::future::MaybeDone::Future(Box::pin(fut)))
648}
649
650/// Creates an invalid params error.
651pub fn invalid_params(msg: impl fmt::Display) -> ResponseError {
652    resp_err(ErrorCode::InvalidParams, msg)
653}
654
655/// Creates an internal error.
656pub fn internal_error(msg: impl fmt::Display) -> ResponseError {
657    resp_err(ErrorCode::InternalError, msg)
658}
659
660/// Creates a not initialized error.
661pub fn not_initialized() -> ResponseError {
662    resp_err(ErrorCode::ServerNotInitialized, "not initialized yet")
663}
664
665/// Creates a method not found error.
666pub fn method_not_found() -> ResponseError {
667    resp_err(ErrorCode::MethodNotFound, "method not found")
668}
669
670/// Creates an invalid request error.
671pub fn invalid_request(msg: impl fmt::Display) -> ResponseError {
672    resp_err(ErrorCode::InvalidRequest, msg)
673}
674
675fn from_json<T: serde::de::DeserializeOwned>(json: JsonValue) -> LspResult<T> {
676    serde_json::from_value(json).map_err(invalid_request)
677}
678
679fn raw_to_boxed<S: 'static, T: 'static>(handler: RawHandler<S, T>) -> BoxHandler<S, T> {
680    Box::new(move |s, _client, req_id, req| handler(s, req_id, req))
681}
682
683fn resp_err(code: ErrorCode, msg: impl fmt::Display) -> ResponseError {
684    ResponseError {
685        code: code as i32,
686        message: msg.to_string(),
687        data: None,
688    }
689}