realhydroper_lsp/service/client/
socket.rs

1//! Loopback connection to the language client.
2
3use std::pin::Pin;
4use std::rc::Rc;
5use std::sync::Arc;
6use std::task::{Context, Poll};
7
8use futures::channel::mpsc::Receiver;
9use futures::sink::Sink;
10use futures::stream::{FusedStream, Stream, StreamExt};
11
12use super::{ExitedError, Pending, ServerState, State};
13use crate::jsonrpc::{Request, Response};
14
/// A loopback channel for server-to-client communication.
///
/// The socket is both a `Stream` of outgoing [`Request`]s and a `Sink` for incoming
/// [`Response`]s. Use [`ClientSocket::split`] to drive the two directions separately.
#[derive(Debug)]
pub struct ClientSocket {
    /// Receiving end of the channel that carries server-to-client requests.
    pub(super) rx: Receiver<Request>,
    /// Store into which responses sent through the sink are inserted.
    pub(super) pending: Arc<Pending>,
    /// Server state; once it reads `State::Exited` the socket yields no more requests and
    /// rejects further responses.
    pub(super) state: Rc<ServerState>,
}
22
23impl ClientSocket {
24    /// Splits this `ClientSocket` into two halves capable of operating independently.
25    ///
26    /// The two halves returned implement the [`Stream`] and [`Sink`] traits, respectively.
27    ///
28    /// [`Stream`]: futures::Stream
29    /// [`Sink`]: futures::Sink
30    pub fn split(self) -> (RequestStream, ResponseSink) {
31        let ClientSocket { rx, pending, state } = self;
32        let state_ = state.clone();
33
34        (
35            RequestStream { rx, state: state_ },
36            ResponseSink { pending, state },
37        )
38    }
39}
40
41/// Yields a stream of pending server-to-client requests.
42impl Stream for ClientSocket {
43    type Item = Request;
44
45    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
46        if self.state.get() == State::Exited || self.rx.is_terminated() {
47            Poll::Ready(None)
48        } else {
49            self.rx.poll_next_unpin(cx)
50        }
51    }
52
53    fn size_hint(&self) -> (usize, Option<usize>) {
54        self.rx.size_hint()
55    }
56}
57
58impl FusedStream for ClientSocket {
59    fn is_terminated(&self) -> bool {
60        self.rx.is_terminated()
61    }
62}
63
64/// Routes client-to-server responses back to the server.
65impl Sink<Response> for ClientSocket {
66    type Error = ExitedError;
67
68    fn poll_ready(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
69        if self.state.get() == State::Exited || self.rx.is_terminated() {
70            Poll::Ready(Err(ExitedError(())))
71        } else {
72            Poll::Ready(Ok(()))
73        }
74    }
75
76    fn start_send(self: Pin<&mut Self>, item: Response) -> Result<(), Self::Error> {
77        self.pending.insert(item);
78        Ok(())
79    }
80
81    fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
82        Poll::Ready(Ok(()))
83    }
84
85    fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
86        Poll::Ready(Ok(()))
87    }
88}
89
/// Yields a stream of pending server-to-client requests.
///
/// This is the read half produced by `ClientSocket::split`.
#[derive(Debug)]
#[must_use = "streams do nothing unless polled"]
pub struct RequestStream {
    /// Receiving end of the channel that carries server-to-client requests.
    rx: Receiver<Request>,
    /// Server state; once it reads `State::Exited` the stream yields no more requests.
    state: Rc<ServerState>,
}
97
98impl Stream for RequestStream {
99    type Item = Request;
100
101    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
102        if self.state.get() == State::Exited || self.rx.is_terminated() {
103            Poll::Ready(None)
104        } else {
105            self.rx.poll_next_unpin(cx)
106        }
107    }
108
109    fn size_hint(&self) -> (usize, Option<usize>) {
110        self.rx.size_hint()
111    }
112}
113
114impl FusedStream for RequestStream {
115    fn is_terminated(&self) -> bool {
116        self.rx.is_terminated()
117    }
118}
119
/// Routes client-to-server responses back to the server.
///
/// This is the write half produced by `ClientSocket::split`.
#[derive(Debug)]
pub struct ResponseSink {
    /// Store into which responses sent through the sink are inserted.
    pending: Arc<Pending>,
    /// Server state; once it reads `State::Exited` the sink reports `ExitedError`.
    state: Rc<ServerState>,
}
126
127impl Sink<Response> for ResponseSink {
128    type Error = ExitedError;
129
130    fn poll_ready(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
131        if self.state.get() == State::Exited {
132            Poll::Ready(Err(ExitedError(())))
133        } else {
134            Poll::Ready(Ok(()))
135        }
136    }
137
138    fn start_send(self: Pin<&mut Self>, item: Response) -> Result<(), Self::Error> {
139        self.pending.insert(item);
140        Ok(())
141    }
142
143    fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
144        Poll::Ready(Ok(()))
145    }
146
147    fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
148        Poll::Ready(Ok(()))
149    }
150}