remoc/chmux/
listener.rs

1use futures::{
2    ready,
3    stream::Stream,
4    task::{Context, Poll},
5};
6use std::{error::Error, fmt, pin::Pin, sync::Arc};
7use tokio::sync::{Mutex, mpsc, oneshot};
8use tokio_util::sync::ReusableBoxFuture;
9
10use super::{
11    mux::PortEvt,
12    port_allocator::{PortAllocator, PortNumber},
13    receiver::Receiver,
14    sender::Sender,
15};
16use crate::exec;
17
/// A multiplexer listener error.
///
/// Returned by the accept and inspect operations of [`Listener`] and [`Request`].
#[derive(Debug, Clone)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub enum ListenerError {
    /// All local ports are in use.
    LocalPortsExhausted,
    /// A multiplexer error has occurred or it has been terminated.
    MultiplexerError,
}
27
28impl fmt::Display for ListenerError {
29    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
30        match self {
31            Self::LocalPortsExhausted => write!(f, "all local ports are in use"),
32            Self::MultiplexerError => write!(f, "multiplexer error"),
33        }
34    }
35}
36
// Relies entirely on the default methods of the `Error` trait.
impl Error for ListenerError {}
38
39impl From<ListenerError> for std::io::Error {
40    fn from(err: ListenerError) -> Self {
41        use std::io::ErrorKind;
42        match err {
43            ListenerError::LocalPortsExhausted => Self::new(ErrorKind::AddrInUse, err.to_string()),
44            ListenerError::MultiplexerError => Self::new(ErrorKind::ConnectionReset, err.to_string()),
45        }
46    }
47}
48
/// A connection request by the remote endpoint.
///
/// Dropping the request rejects it.
pub struct Request {
    /// Port number of the remote endpoint; echoed back in the accept/reject event.
    remote_port: u32,
    /// Id provided by the remote endpoint.
    id: u32,
    /// Whether accepting should wait for a local port when all are in use.
    wait: bool,
    /// Allocator used to obtain the local port when accepting.
    allocator: PortAllocator,
    /// Channel on which the accept/reject event is sent.
    tx: mpsc::Sender<PortEvt>,
    /// Signalled once the request has been explicitly accepted or rejected.
    /// If it is dropped unsignalled, the task spawned by `new` sends the rejection.
    done_tx: Option<oneshot::Sender<()>>,
}
60
61impl fmt::Debug for Request {
62    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
63        f.debug_struct("Request")
64            .field("remote_port", &self.remote_port)
65            .field("id", &self.id)
66            .field("wait", &self.wait)
67            .finish()
68    }
69}
70
71impl Request {
72    pub(crate) fn new(
73        remote_port: u32, id: u32, wait: bool, allocator: PortAllocator, tx: mpsc::Sender<PortEvt>,
74    ) -> Self {
75        let (done_tx, done_rx) = oneshot::channel();
76        let drop_tx = tx.clone();
77        exec::spawn(async move {
78            if done_rx.await.is_err() {
79                let _ = drop_tx.send(PortEvt::Rejected { remote_port, no_ports: false }).await;
80            }
81        });
82
83        Self { remote_port, id, wait, allocator, tx, done_tx: Some(done_tx) }
84    }
85
86    /// The remote port number.
87    pub fn remote_port(&self) -> u32 {
88        self.remote_port
89    }
90
91    /// The remotely provided id.
92    ///
93    /// If no id was provided, this returns the [`remote port`](Self::remote_port).
94    pub fn id(&self) -> u32 {
95        self.id
96    }
97
98    /// Indicates whether the handler of the request should wait for a local
99    /// port to become available, if all are currently in use.
100    pub fn is_wait(&self) -> bool {
101        self.wait
102    }
103
104    /// Accepts the request using a newly allocated local port.
105    pub async fn accept(self) -> Result<(Sender, Receiver), ListenerError> {
106        let local_port = if self.wait {
107            self.allocator.allocate().await
108        } else {
109            match self.allocator.try_allocate() {
110                Some(local_port) => local_port,
111                None => {
112                    self.reject(true).await;
113                    return Err(ListenerError::LocalPortsExhausted);
114                }
115            }
116        };
117
118        self.accept_from(local_port).await
119    }
120
121    /// Accepts the request using the specified local port.
122    pub async fn accept_from(mut self, local_port: PortNumber) -> Result<(Sender, Receiver), ListenerError> {
123        let (port_tx, port_rx) = oneshot::channel();
124        let _ = self.tx.send(PortEvt::Accepted { local_port, remote_port: self.remote_port, port_tx }).await;
125        let _ = self.done_tx.take().unwrap().send(());
126
127        port_rx.await.map_err(|_| ListenerError::MultiplexerError)
128    }
129
130    /// Rejects the connect request.
131    ///
132    /// Setting `no_ports` to true indicates to the remote endpoint that the request
133    /// was rejected because no local port could be allocated.
134    pub async fn reject(mut self, no_ports: bool) {
135        let _ = self.tx.send(PortEvt::Rejected { remote_port: self.remote_port, no_ports }).await;
136        let _ = self.done_tx.take().unwrap().send(());
137    }
138}
139
impl Drop for Request {
    /// Intentionally empty: no cleanup code runs here.
    fn drop(&mut self) {
        // required for correct drop order
        // NOTE(review): presumably the explicit `Drop` impl exists to stop fields
        // from being moved out of the value individually — confirm before removing.
    }
}
145
/// Remote connect message.
///
/// Item type of the channels that [`Listener`] receives from.
pub(crate) enum RemoteConnectMsg {
    /// Remote connect request.
    Request(Request),
    /// Client of remote endpoint has been dropped.
    ///
    /// Once received, the listener marks itself closed and yields no more requests.
    ClientDropped,
}
153
/// Multiplexer listener.
pub struct Listener {
    /// Incoming connect messages; polled by `inspect` only.
    /// NOTE(review): presumably carries requests with the wait flag set — confirm against the sender side.
    wait_rx: mpsc::Receiver<RemoteConnectMsg>,
    /// Incoming connect messages; polled by both `accept` and `inspect`.
    /// In `accept`, requests from this channel are rejected when no local port
    /// can be allocated immediately.
    no_wait_rx: mpsc::Receiver<RemoteConnectMsg>,
    /// Allocator for local ports.
    port_allocator: PortAllocator,
    /// Channel used by `terminate` to request termination of the multiplexer.
    terminate_tx: mpsc::UnboundedSender<()>,
    /// Set once a `ClientDropped` message has been received.
    closed: bool,
}
162
163impl fmt::Debug for Listener {
164    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
165        f.debug_struct("Listener").field("port_allocator", &self.port_allocator).finish()
166    }
167}
168
169impl Listener {
170    pub(crate) fn new(
171        wait_rx: mpsc::Receiver<RemoteConnectMsg>, no_wait_rx: mpsc::Receiver<RemoteConnectMsg>,
172        port_allocator: PortAllocator, terminate_tx: mpsc::UnboundedSender<()>,
173    ) -> Self {
174        Self { wait_rx, no_wait_rx, port_allocator, terminate_tx, closed: false }
175    }
176
177    /// Obtains the port allocator.
178    pub fn port_allocator(&self) -> PortAllocator {
179        self.port_allocator.clone()
180    }
181
182    /// Accept a connection returning the sender and receiver for the opened port.
183    ///
184    /// Returns [None] when the client of the remote endpoint has been dropped and
185    /// no more connection requests can be made.
186    pub async fn accept(&mut self) -> Result<Option<(Sender, Receiver)>, ListenerError> {
187        if self.closed {
188            return Ok(None);
189        }
190
191        loop {
192            tokio::select! {
193                local_port = self.port_allocator.allocate() => {
194                    match self.inspect().await? {
195                        Some(req) => break Ok(Some(req.accept_from(local_port).await?)),
196                        None => break Ok(None),
197                    }
198                },
199
200                no_wait_req_opt = self.no_wait_rx.recv() => {
201                    match no_wait_req_opt {
202                        Some(RemoteConnectMsg::Request(no_wait_req)) => {
203                            match self.port_allocator.try_allocate() {
204                                Some(local_port) => break Ok(Some(no_wait_req.accept_from(local_port).await?)),
205                                None => no_wait_req.reject(true).await,
206                            }
207                        },
208                        Some(RemoteConnectMsg::ClientDropped) => {
209                            self.closed = true;
210                            break Ok(None);
211                        },
212                        None => break Err(ListenerError::MultiplexerError),
213                    }
214                },
215            }
216        }
217    }
218
219    /// Obtains the next connection request from the remote endpoint.
220    ///
221    /// Connection requests can be stored and accepted or rejected at a later time.
222    /// The maximum number of unanswered connection requests is specified in the
223    /// configuration. If this number is reached, the remote endpoint will
224    /// not send any more connection requests.
225    ///
226    /// Returns [None] when the client of the remote endpoint has been dropped and
227    /// no more connection requests can be made.
228    pub async fn inspect(&mut self) -> Result<Option<Request>, ListenerError> {
229        if self.closed {
230            return Ok(None);
231        }
232
233        let req_opt = tokio::select! {
234            req_opt = self.wait_rx.recv() => req_opt,
235            req_opt = self.no_wait_rx.recv() => req_opt,
236        };
237
238        match req_opt {
239            Some(RemoteConnectMsg::Request(req)) => Ok(Some(req)),
240            Some(RemoteConnectMsg::ClientDropped) => {
241                self.closed = true;
242                Ok(None)
243            }
244            None => Err(ListenerError::MultiplexerError),
245        }
246    }
247
248    /// Convert this into a listener stream.
249    pub fn into_stream(self) -> ListenerStream {
250        ListenerStream::new(self)
251    }
252
253    /// Terminates the multiplexer, forcibly closing all open ports.
254    pub fn terminate(&self) {
255        let _ = self.terminate_tx.send(());
256    }
257}
258
impl Drop for Listener {
    /// Intentionally empty: no cleanup code runs here.
    fn drop(&mut self) {
        // required for correct drop order
        // NOTE(review): presumably the explicit `Drop` impl exists to stop fields
        // from being moved out of the value individually — confirm before removing.
    }
}
264
/// A stream accepting connections and returning senders and receivers.
///
/// Ends when the client is dropped at the remote endpoint.
pub struct ListenerStream {
    /// The wrapped listener; shared with the boxed accept future, which locks it
    /// for the duration of one accept operation.
    server: Arc<Mutex<Listener>>,
    /// The accept operation being polled, if one has been started.
    #[allow(clippy::type_complexity)]
    accept_fut: Option<ReusableBoxFuture<'static, Option<Result<(Sender, Receiver), ListenerError>>>>,
}
273
274impl ListenerStream {
275    fn new(server: Listener) -> Self {
276        Self { server: Arc::new(Mutex::new(server)), accept_fut: None }
277    }
278
279    async fn accept(server: Arc<Mutex<Listener>>) -> Option<Result<(Sender, Receiver), ListenerError>> {
280        let mut server = server.lock().await;
281        server.accept().await.transpose()
282    }
283
284    fn poll_next(&mut self, cx: &mut Context) -> Poll<Option<Result<(Sender, Receiver), ListenerError>>> {
285        if self.accept_fut.is_none() {
286            self.accept_fut = Some(ReusableBoxFuture::new(Self::accept(self.server.clone())));
287        }
288
289        let accept_fut = self.accept_fut.as_mut().unwrap();
290        let res = ready!(accept_fut.poll(cx));
291
292        self.accept_fut = None;
293        Poll::Ready(res)
294    }
295}
296
297impl Stream for ListenerStream {
298    type Item = Result<(Sender, Receiver), ListenerError>;
299
300    fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
301        Pin::into_inner(self).poll_next(cx)
302    }
303}