// realhydroper_lsp/service/client/socket.rs
use std::pin::Pin;
4use std::rc::Rc;
5use std::sync::Arc;
6use std::task::{Context, Poll};
7
8use futures::channel::mpsc::Receiver;
9use futures::sink::Sink;
10use futures::stream::{FusedStream, Stream, StreamExt};
11
12use super::{ExitedError, Pending, ServerState, State};
13use crate::jsonrpc::{Request, Response};
14
15#[derive(Debug)]
17pub struct ClientSocket {
18 pub(super) rx: Receiver<Request>,
19 pub(super) pending: Arc<Pending>,
20 pub(super) state: Rc<ServerState>,
21}
22
23impl ClientSocket {
24 pub fn split(self) -> (RequestStream, ResponseSink) {
31 let ClientSocket { rx, pending, state } = self;
32 let state_ = state.clone();
33
34 (
35 RequestStream { rx, state: state_ },
36 ResponseSink { pending, state },
37 )
38 }
39}
40
41impl Stream for ClientSocket {
43 type Item = Request;
44
45 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
46 if self.state.get() == State::Exited || self.rx.is_terminated() {
47 Poll::Ready(None)
48 } else {
49 self.rx.poll_next_unpin(cx)
50 }
51 }
52
53 fn size_hint(&self) -> (usize, Option<usize>) {
54 self.rx.size_hint()
55 }
56}
57
58impl FusedStream for ClientSocket {
59 fn is_terminated(&self) -> bool {
60 self.rx.is_terminated()
61 }
62}
63
64impl Sink<Response> for ClientSocket {
66 type Error = ExitedError;
67
68 fn poll_ready(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
69 if self.state.get() == State::Exited || self.rx.is_terminated() {
70 Poll::Ready(Err(ExitedError(())))
71 } else {
72 Poll::Ready(Ok(()))
73 }
74 }
75
76 fn start_send(self: Pin<&mut Self>, item: Response) -> Result<(), Self::Error> {
77 self.pending.insert(item);
78 Ok(())
79 }
80
81 fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
82 Poll::Ready(Ok(()))
83 }
84
85 fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
86 Poll::Ready(Ok(()))
87 }
88}
89
90#[derive(Debug)]
92#[must_use = "streams do nothing unless polled"]
93pub struct RequestStream {
94 rx: Receiver<Request>,
95 state: Rc<ServerState>,
96}
97
98impl Stream for RequestStream {
99 type Item = Request;
100
101 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
102 if self.state.get() == State::Exited || self.rx.is_terminated() {
103 Poll::Ready(None)
104 } else {
105 self.rx.poll_next_unpin(cx)
106 }
107 }
108
109 fn size_hint(&self) -> (usize, Option<usize>) {
110 self.rx.size_hint()
111 }
112}
113
114impl FusedStream for RequestStream {
115 fn is_terminated(&self) -> bool {
116 self.rx.is_terminated()
117 }
118}
119
120#[derive(Debug)]
122pub struct ResponseSink {
123 pending: Arc<Pending>,
124 state: Rc<ServerState>,
125}
126
127impl Sink<Response> for ResponseSink {
128 type Error = ExitedError;
129
130 fn poll_ready(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
131 if self.state.get() == State::Exited {
132 Poll::Ready(Err(ExitedError(())))
133 } else {
134 Poll::Ready(Ok(()))
135 }
136 }
137
138 fn start_send(self: Pin<&mut Self>, item: Response) -> Result<(), Self::Error> {
139 self.pending.insert(item);
140 Ok(())
141 }
142
143 fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
144 Poll::Ready(Ok(()))
145 }
146
147 fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
148 Poll::Ready(Ok(()))
149 }
150}