async_lsp/
forward.rs

1use std::future::Future;
2use std::ops::ControlFlow;
3use std::pin::Pin;
4use std::task::{Context, Poll};
5
6use futures::channel::oneshot;
7use futures::FutureExt;
8use tower_service::Service;
9
10use crate::{
11    AnyEvent, AnyNotification, AnyRequest, AnyResponse, ClientSocket, ErrorCode, LspService,
12    MainLoopEvent, Message, PeerSocket, ResponseError, Result, ServerSocket,
13};
14
15pub struct PeerSocketResponseFuture {
16    rx: oneshot::Receiver<AnyResponse>,
17}
18
19impl Future for PeerSocketResponseFuture {
20    type Output = Result<serde_json::Value, ResponseError>;
21
22    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
23        match self.rx.poll_unpin(cx) {
24            Poll::Ready(Ok(resp)) => Poll::Ready(match resp.error {
25                None => Ok(resp.result.unwrap_or_default()),
26                Some(resp_err) => Err(resp_err),
27            }),
28            Poll::Ready(Err(_closed)) => Poll::Ready(Err(ResponseError::new(
29                ErrorCode::INTERNAL_ERROR,
30                "forwarding service stopped",
31            ))),
32            Poll::Pending => Poll::Pending,
33        }
34    }
35}
36
37impl PeerSocket {
38    fn on_call(&mut self, req: AnyRequest) -> PeerSocketResponseFuture {
39        let (tx, rx) = oneshot::channel();
40        let _: Result<_, _> = self.send(MainLoopEvent::OutgoingRequest(req, tx));
41        PeerSocketResponseFuture { rx }
42    }
43
44    fn on_notify(&mut self, notif: AnyNotification) -> ControlFlow<Result<()>> {
45        match self.send(MainLoopEvent::Outgoing(Message::Notification(notif))) {
46            Ok(()) => ControlFlow::Continue(()),
47            Err(err) => ControlFlow::Break(Err(err)),
48        }
49    }
50
51    fn on_emit(&mut self, event: AnyEvent) -> ControlFlow<Result<()>> {
52        match self.send(MainLoopEvent::Any(event)) {
53            Ok(()) => ControlFlow::Continue(()),
54            Err(err) => ControlFlow::Break(Err(err)),
55        }
56    }
57}
58
59macro_rules! define_socket_wrappers {
60    ($($ty:ty),*) => {
61        $(
62        impl Service<AnyRequest> for $ty {
63            type Response = serde_json::Value;
64            type Error = ResponseError;
65            type Future = PeerSocketResponseFuture;
66
67            fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
68                // `*Socket` has an unbounded buffer, thus is always ready.
69                Poll::Ready(Ok(()))
70            }
71
72            fn call(&mut self, req: AnyRequest) -> Self::Future {
73                self.0.on_call(req)
74            }
75        }
76
77        impl LspService for $ty {
78            fn notify(&mut self, notif: AnyNotification) -> ControlFlow<Result<()>> {
79                self.0.on_notify(notif)
80            }
81
82            fn emit(&mut self, event: AnyEvent) -> ControlFlow<Result<()>> {
83                self.0.on_emit(event)
84            }
85        }
86        )*
87    };
88}
89
90define_socket_wrappers!(ClientSocket, ServerSocket);