sync_ls/
lib.rs

1//! A synchronous language server implementation.
2
3#[cfg(feature = "dap")]
4pub mod dap;
5#[cfg(feature = "lsp")]
6pub mod lsp;
7pub mod req_queue;
8pub mod transport;
9
10pub use error::*;
11pub use msg::*;
12pub use server::*;
13
14mod error;
15mod msg;
16mod server;
17
18use std::any::Any;
19use std::pin::Pin;
20use std::sync::{Arc, Weak};
21use std::time::Instant;
22
23use futures::future::MaybeDone;
24use parking_lot::Mutex;
25use serde::Serialize;
26use serde_json::Value as JsonValue;
27
/// A type-erased event payload that can be moved across threads.
type Event = Box<dyn std::any::Any + Send>;
29
30/// The sender of the language server.
31#[derive(Debug, Clone)]
32pub struct TConnectionTx<M> {
33    /// The sender of the events.
34    pub event: crossbeam_channel::Sender<Event>,
35    /// The sender of the LSP messages.
36    pub lsp: crossbeam_channel::Sender<Message>,
37    marker: std::marker::PhantomData<M>,
38}
39
40/// The sender of the language server.
41#[derive(Debug, Clone)]
42pub struct TConnectionRx<M> {
43    /// The receiver of the events.
44    pub event: crossbeam_channel::Receiver<Event>,
45    /// The receiver of the LSP messages.
46    pub lsp: crossbeam_channel::Receiver<Message>,
47    marker: std::marker::PhantomData<M>,
48}
49
50impl<M: TryFrom<Message, Error = anyhow::Error>> TConnectionRx<M> {
51    /// Receives a message or an event.
52    fn recv(&self) -> anyhow::Result<EventOrMessage<M>> {
53        crossbeam_channel::select_biased! {
54            recv(self.lsp) -> msg => Ok(EventOrMessage::Msg(msg?.try_into()?)),
55            recv(self.event) -> event => Ok(event.map(EventOrMessage::Evt)?),
56        }
57    }
58}
59
60/// The untyped connect tx for the language server.
61pub type ConnectionTx = TConnectionTx<Message>;
62/// The untyped connect rx for the language server.
63pub type ConnectionRx = TConnectionRx<Message>;
64
65/// This is a helper enum to handle both events and messages.
66enum EventOrMessage<M> {
67    Evt(Event),
68    Msg(M),
69}
70
71/// Connection is just a pair of channels of LSP messages.
72pub struct Connection<M> {
73    /// The senders of the connection.
74    pub sender: TConnectionTx<M>,
75    /// The receivers of the connection.
76    pub receiver: TConnectionRx<M>,
77}
78
79impl<M: TryFrom<Message, Error = anyhow::Error>> From<Connection<Message>> for Connection<M> {
80    fn from(conn: Connection<Message>) -> Self {
81        Self {
82            sender: TConnectionTx {
83                event: conn.sender.event,
84                lsp: conn.sender.lsp,
85                marker: std::marker::PhantomData,
86            },
87            receiver: TConnectionRx {
88                event: conn.receiver.event,
89                lsp: conn.receiver.lsp,
90                marker: std::marker::PhantomData,
91            },
92        }
93    }
94}
95
96impl<M: TryFrom<Message, Error = anyhow::Error>> From<TConnectionTx<M>> for ConnectionTx {
97    fn from(conn: TConnectionTx<M>) -> Self {
98        Self {
99            event: conn.event,
100            lsp: conn.lsp,
101            marker: std::marker::PhantomData,
102        }
103    }
104}
105
106/// The common error type for the language server.
107pub use msg::ResponseError;
108/// The common result type for the language server.
109pub type LspResult<T> = Result<T, ResponseError>;
110/// A future that may be done in place or not.
111pub type ResponseFuture<T> = MaybeDone<Pin<Box<dyn std::future::Future<Output = T> + Send>>>;
112/// A future that may be rejected before actual started.
113pub type LspResponseFuture<T> = LspResult<ResponseFuture<T>>;
114/// A future that could be rejected by common error in `LspResponseFuture`.
115pub type SchedulableResponse<T> = LspResponseFuture<LspResult<T>>;
116/// The common future type for the language server.
117pub type AnySchedulableResponse = SchedulableResponse<JsonValue>;
118/// The result of a scheduled response which could be finally caught by
119/// `schedule_tail`.
120/// - Returns Ok(Some()) -> Already responded
121/// - Returns Ok(None) -> Need to respond none
122/// - Returns Err(..) -> Need to respond error
123pub type ScheduledResult = LspResult<Option<()>>;
124
125/// A helper function to create a `LspResponseFuture`
126pub fn just_ok<T, E>(res: T) -> Result<ResponseFuture<Result<T, E>>, E> {
127    Ok(futures::future::MaybeDone::Done(Ok(res)))
128}
129/// A helper function to create a `LspResponseFuture`
130pub fn just_result<T, E>(res: Result<T, E>) -> Result<ResponseFuture<Result<T, E>>, E> {
131    Ok(futures::future::MaybeDone::Done(res))
132}
133/// A helper function to create a `LspResponseFuture`
134pub fn just_future<T, E>(
135    fut: impl std::future::Future<Output = Result<T, E>> + Send + 'static,
136) -> Result<ResponseFuture<Result<T, E>>, E> {
137    Ok(futures::future::MaybeDone::Future(Box::pin(fut)))
138}
139
/// A shared, thread-safe function that projects a type-erased service to a
/// mutable reference of type `S`.
type AnyCaster<S> = Arc<dyn Fn(&mut dyn std::any::Any) -> &mut S + Send + Sync>;
141
142/// A Lsp client with typed service `S`.
143pub struct TypedLspClient<S> {
144    client: LspClient,
145    caster: AnyCaster<S>,
146}
147
148impl<S> TypedLspClient<S> {
149    /// Converts the client to an untyped client.
150    pub fn to_untyped(self) -> LspClient {
151        self.client
152    }
153}
154
155impl<S: 'static> TypedLspClient<S> {
156    /// Returns the untyped lsp client.
157    pub fn untyped(&self) -> &LspClient {
158        &self.client
159    }
160
161    /// Casts the service to another type.
162    pub fn cast<T: 'static>(&self, f: fn(&mut S) -> &mut T) -> TypedLspClient<T> {
163        let caster = self.caster.clone();
164        TypedLspClient {
165            client: self.client.clone(),
166            caster: Arc::new(move |s| f(caster(s))),
167        }
168    }
169
170    /// Sends a event to the client itself.
171    pub fn send_event<T: std::any::Any + Send + 'static>(&self, event: T) {
172        let Some(sender) = self.sender.upgrade() else {
173            log::warn!("failed to send request: connection closed");
174            return;
175        };
176
177        let Err(res) = sender.event.send(Box::new(event)) else {
178            return;
179        };
180        log::warn!("failed to send event: {res:?}");
181    }
182}
183
184impl<S> Clone for TypedLspClient<S> {
185    fn clone(&self) -> Self {
186        Self {
187            client: self.client.clone(),
188            caster: self.caster.clone(),
189        }
190    }
191}
192
193impl<S> std::ops::Deref for TypedLspClient<S> {
194    type Target = LspClient;
195
196    fn deref(&self) -> &Self::Target {
197        &self.client
198    }
199}
200
201/// The root of the language server host.
202/// Will close connection when dropped.
203#[derive(Debug, Clone)]
204pub struct LspClientRoot {
205    weak: LspClient,
206    _strong: Arc<ConnectionTx>,
207}
208
209impl LspClientRoot {
210    /// Creates a new language server host.
211    pub fn new<M: TryFrom<Message, Error = anyhow::Error> + GetMessageKind>(
212        handle: tokio::runtime::Handle,
213        sender: TConnectionTx<M>,
214    ) -> Self {
215        let _strong = Arc::new(sender.into());
216        let weak = LspClient {
217            handle,
218            msg_kind: M::get_message_kind(),
219            sender: Arc::downgrade(&_strong),
220            req_queue: Arc::new(Mutex::new(ReqQueue::default())),
221        };
222        Self { weak, _strong }
223    }
224
225    /// Returns the weak reference to the language server host.
226    pub fn weak(&self) -> LspClient {
227        self.weak.clone()
228    }
229}
230
231type ReqHandler = Box<dyn for<'a> FnOnce(&'a mut dyn Any, LspOrDapResponse) + Send + Sync>;
232type ReqQueue = req_queue::ReqQueue<(String, Instant), ReqHandler>;
233
234/// The host for the language server, or known as the LSP client.
235#[derive(Debug, Clone)]
236pub struct LspClient {
237    /// The tokio handle.
238    pub handle: tokio::runtime::Handle,
239
240    msg_kind: MessageKind,
241    sender: Weak<ConnectionTx>,
242    req_queue: Arc<Mutex<ReqQueue>>,
243}
244
245impl LspClient {
246    /// Returns the untyped lsp client.
247    pub fn untyped(&self) -> &Self {
248        self
249    }
250
251    /// converts the client to a typed client.
252    pub fn to_typed<S: Any>(&self) -> TypedLspClient<S> {
253        TypedLspClient {
254            client: self.clone(),
255            caster: Arc::new(|s| s.downcast_mut().expect("invalid cast")),
256        }
257    }
258
259    /// Checks if there are pending requests.
260    pub fn has_pending_requests(&self) -> bool {
261        self.req_queue.lock().incoming.has_pending()
262    }
263
264    /// Prints states of the request queue and panics.
265    pub fn begin_panic(&self) {
266        self.req_queue.lock().begin_panic();
267    }
268
269    /// Sends a event to the server itself.
270    pub fn send_event<T: std::any::Any + Send + 'static>(&self, event: T) {
271        let Some(sender) = self.sender.upgrade() else {
272            log::warn!("failed to send request: connection closed");
273            return;
274        };
275
276        if let Err(res) = sender.event.send(Box::new(event)) {
277            log::warn!("failed to send event: {res:?}");
278        }
279    }
280
281    /// Completes an server2client request in the request queue.
282    #[cfg(feature = "lsp")]
283    pub fn complete_lsp_request<S: Any>(&self, service: &mut S, response: lsp::Response) {
284        let mut req_queue = self.req_queue.lock();
285        let Some(handler) = req_queue.outgoing.complete(response.id.clone()) else {
286            log::warn!("received response for unknown request");
287            return;
288        };
289        drop(req_queue);
290        handler(service, response.into())
291    }
292
293    /// Completes an server2client request in the request queue.
294    #[cfg(feature = "dap")]
295    pub fn complete_dap_request<S: Any>(&self, service: &mut S, response: dap::Response) {
296        let mut req_queue = self.req_queue.lock();
297        let Some(handler) = req_queue
298            .outgoing
299            // todo: casting i64 to i32
300            .complete((response.request_seq as i32).into())
301        else {
302            log::warn!("received response for unknown request");
303            return;
304        };
305        drop(req_queue);
306        handler(service, response.into())
307    }
308
309    /// Registers an client2server request in the request queue.
310    pub fn register_request(&self, method: &str, id: &RequestId, received_at: Instant) {
311        let mut req_queue = self.req_queue.lock();
312        self.start_request(id, method);
313        req_queue
314            .incoming
315            .register(id.clone(), (method.to_owned(), received_at));
316    }
317
318    /// Responds a typed result to the client.
319    pub fn respond_result<T: Serialize>(&self, id: RequestId, result: LspResult<T>) {
320        let result = result.and_then(|t| serde_json::to_value(t).map_err(internal_error));
321        self.respond_any_result(id, result);
322    }
323
324    fn respond_any_result(&self, id: RequestId, result: LspResult<JsonValue>) {
325        let req_id = id.clone();
326        let msg: Message = match (self.msg_kind, result) {
327            #[cfg(feature = "lsp")]
328            (MessageKind::Lsp, Ok(resp)) => lsp::Response::new_ok(id, resp).into(),
329            #[cfg(feature = "lsp")]
330            (MessageKind::Lsp, Err(e)) => lsp::Response::new_err(id, e.code, e.message).into(),
331            #[cfg(feature = "dap")]
332            (MessageKind::Dap, Ok(resp)) => dap::Response::success(RequestId::dap(id), resp).into(),
333            #[cfg(feature = "dap")]
334            (MessageKind::Dap, Err(e)) => {
335                dap::Response::error(RequestId::dap(id), Some(e.message), None).into()
336            }
337        };
338
339        self.respond(req_id, msg);
340    }
341
342    /// Completes an client2server request in the request queue.
343    pub fn respond(&self, id: RequestId, response: Message) {
344        let mut req_queue = self.req_queue.lock();
345        let Some((method, received_at)) = req_queue.incoming.complete(&id) else {
346            return;
347        };
348
349        self.stop_request(&id, &method, received_at);
350
351        let Some(sender) = self.sender.upgrade() else {
352            log::warn!("failed to send response ({method}, {id}): connection closed");
353            return;
354        };
355        if let Err(res) = sender.lsp.send(response) {
356            log::warn!("failed to send response ({method}, {id}): {res:?}");
357        }
358    }
359}
360
361impl LspClient {
362    /// Schedules a request from the client.
363    pub fn schedule<T: Serialize + 'static>(
364        &self,
365        req_id: RequestId,
366        resp: SchedulableResponse<T>,
367    ) -> ScheduledResult {
368        let resp = resp?;
369
370        use futures::future::MaybeDone::*;
371        match resp {
372            Done(output) => {
373                self.respond_result(req_id, output);
374            }
375            Future(fut) => {
376                let client = self.clone();
377                let req_id = req_id.clone();
378                self.handle.spawn(async move {
379                    client.respond_result(req_id, fut.await);
380                });
381            }
382            Gone => {
383                log::warn!("response for request({req_id:?}) already taken");
384            }
385        };
386
387        Ok(Some(()))
388    }
389
390    /// Catch the early rejected requests.
391    fn schedule_tail(&self, req_id: RequestId, resp: ScheduledResult) {
392        match resp {
393            // Already responded
394            Ok(Some(())) => {}
395            // The requests that doesn't start.
396            _ => self.respond_result(req_id, resp),
397        }
398    }
399}
400
401impl LspClient {
402    fn start_request(&self, req_id: &RequestId, method: &str) {
403        log::info!("handling {method} - ({req_id})");
404    }
405
406    fn stop_request(&self, req_id: &RequestId, method: &str, received_at: Instant) {
407        let duration = received_at.elapsed();
408        log::info!("handled  {method} - ({req_id}) in {duration:0.2?}");
409    }
410
411    fn start_notification(&self, method: &str) {
412        log::info!("notifying {method}");
413    }
414
415    fn stop_notification(&self, method: &str, received_at: Instant, result: LspResult<()>) {
416        let request_duration = received_at.elapsed();
417        if let Err(err) = result {
418            log::error!("notify {method} failed in {request_duration:0.2?}: {err:?}");
419        } else {
420            log::info!("notify {method} succeeded in {request_duration:0.2?}");
421        }
422    }
423}