1use std::future::Future;
2use std::ops::ControlFlow;
3use std::pin::Pin;
4use std::task::{Context, Poll};
5
6use futures::channel::oneshot;
7use futures::FutureExt;
8use tower_service::Service;
9
10use crate::{
11 AnyEvent, AnyNotification, AnyRequest, AnyResponse, ClientSocket, ErrorCode, LspService,
12 MainLoopEvent, Message, PeerSocket, ResponseError, Result, ServerSocket,
13};
14
15pub struct PeerSocketResponseFuture {
16 rx: oneshot::Receiver<AnyResponse>,
17}
18
19impl Future for PeerSocketResponseFuture {
20 type Output = Result<serde_json::Value, ResponseError>;
21
22 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
23 match self.rx.poll_unpin(cx) {
24 Poll::Ready(Ok(resp)) => Poll::Ready(match resp.error {
25 None => Ok(resp.result.unwrap_or_default()),
26 Some(resp_err) => Err(resp_err),
27 }),
28 Poll::Ready(Err(_closed)) => Poll::Ready(Err(ResponseError::new(
29 ErrorCode::INTERNAL_ERROR,
30 "forwarding service stopped",
31 ))),
32 Poll::Pending => Poll::Pending,
33 }
34 }
35}
36
37impl PeerSocket {
38 fn on_call(&mut self, req: AnyRequest) -> PeerSocketResponseFuture {
39 let (tx, rx) = oneshot::channel();
40 let _: Result<_, _> = self.send(MainLoopEvent::OutgoingRequest(req, tx));
41 PeerSocketResponseFuture { rx }
42 }
43
44 fn on_notify(&mut self, notif: AnyNotification) -> ControlFlow<Result<()>> {
45 match self.send(MainLoopEvent::Outgoing(Message::Notification(notif))) {
46 Ok(()) => ControlFlow::Continue(()),
47 Err(err) => ControlFlow::Break(Err(err)),
48 }
49 }
50
51 fn on_emit(&mut self, event: AnyEvent) -> ControlFlow<Result<()>> {
52 match self.send(MainLoopEvent::Any(event)) {
53 Ok(()) => ControlFlow::Continue(()),
54 Err(err) => ControlFlow::Break(Err(err)),
55 }
56 }
57}
58
59macro_rules! define_socket_wrappers {
60 ($($ty:ty),*) => {
61 $(
62 impl Service<AnyRequest> for $ty {
63 type Response = serde_json::Value;
64 type Error = ResponseError;
65 type Future = PeerSocketResponseFuture;
66
67 fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
68 Poll::Ready(Ok(()))
70 }
71
72 fn call(&mut self, req: AnyRequest) -> Self::Future {
73 self.0.on_call(req)
74 }
75 }
76
77 impl LspService for $ty {
78 fn notify(&mut self, notif: AnyNotification) -> ControlFlow<Result<()>> {
79 self.0.on_notify(notif)
80 }
81
82 fn emit(&mut self, event: AnyEvent) -> ControlFlow<Result<()>> {
83 self.0.on_emit(event)
84 }
85 }
86 )*
87 };
88}
89
90define_socket_wrappers!(ClientSocket, ServerSocket);