1use futures::{
2 ready,
3 stream::Stream,
4 task::{Context, Poll},
5};
6use std::{error::Error, fmt, pin::Pin, sync::Arc};
7use tokio::sync::{Mutex, mpsc, oneshot};
8use tokio_util::sync::ReusableBoxFuture;
9
10use super::{
11 mux::PortEvt,
12 port_allocator::{PortAllocator, PortNumber},
13 receiver::Receiver,
14 sender::Sender,
15};
16use crate::exec;
17
18#[derive(Debug, Clone)]
20#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
21pub enum ListenerError {
22 LocalPortsExhausted,
24 MultiplexerError,
26}
27
28impl fmt::Display for ListenerError {
29 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
30 match self {
31 Self::LocalPortsExhausted => write!(f, "all local ports are in use"),
32 Self::MultiplexerError => write!(f, "multiplexer error"),
33 }
34 }
35}
36
37impl Error for ListenerError {}
38
39impl From<ListenerError> for std::io::Error {
40 fn from(err: ListenerError) -> Self {
41 use std::io::ErrorKind;
42 match err {
43 ListenerError::LocalPortsExhausted => Self::new(ErrorKind::AddrInUse, err.to_string()),
44 ListenerError::MultiplexerError => Self::new(ErrorKind::ConnectionReset, err.to_string()),
45 }
46 }
47}
48
49pub struct Request {
53 remote_port: u32,
54 id: u32,
55 wait: bool,
56 allocator: PortAllocator,
57 tx: mpsc::Sender<PortEvt>,
58 done_tx: Option<oneshot::Sender<()>>,
59}
60
61impl fmt::Debug for Request {
62 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
63 f.debug_struct("Request")
64 .field("remote_port", &self.remote_port)
65 .field("id", &self.id)
66 .field("wait", &self.wait)
67 .finish()
68 }
69}
70
71impl Request {
72 pub(crate) fn new(
73 remote_port: u32, id: u32, wait: bool, allocator: PortAllocator, tx: mpsc::Sender<PortEvt>,
74 ) -> Self {
75 let (done_tx, done_rx) = oneshot::channel();
76 let drop_tx = tx.clone();
77 exec::spawn(async move {
78 if done_rx.await.is_err() {
79 let _ = drop_tx.send(PortEvt::Rejected { remote_port, no_ports: false }).await;
80 }
81 });
82
83 Self { remote_port, id, wait, allocator, tx, done_tx: Some(done_tx) }
84 }
85
86 pub fn remote_port(&self) -> u32 {
88 self.remote_port
89 }
90
91 pub fn id(&self) -> u32 {
95 self.id
96 }
97
98 pub fn is_wait(&self) -> bool {
101 self.wait
102 }
103
104 pub async fn accept(self) -> Result<(Sender, Receiver), ListenerError> {
106 let local_port = if self.wait {
107 self.allocator.allocate().await
108 } else {
109 match self.allocator.try_allocate() {
110 Some(local_port) => local_port,
111 None => {
112 self.reject(true).await;
113 return Err(ListenerError::LocalPortsExhausted);
114 }
115 }
116 };
117
118 self.accept_from(local_port).await
119 }
120
121 pub async fn accept_from(mut self, local_port: PortNumber) -> Result<(Sender, Receiver), ListenerError> {
123 let (port_tx, port_rx) = oneshot::channel();
124 let _ = self.tx.send(PortEvt::Accepted { local_port, remote_port: self.remote_port, port_tx }).await;
125 let _ = self.done_tx.take().unwrap().send(());
126
127 port_rx.await.map_err(|_| ListenerError::MultiplexerError)
128 }
129
130 pub async fn reject(mut self, no_ports: bool) {
135 let _ = self.tx.send(PortEvt::Rejected { remote_port: self.remote_port, no_ports }).await;
136 let _ = self.done_tx.take().unwrap().send(());
137 }
138}
139
140impl Drop for Request {
141 fn drop(&mut self) {
142 }
144}
145
146pub(crate) enum RemoteConnectMsg {
148 Request(Request),
150 ClientDropped,
152}
153
154pub struct Listener {
156 wait_rx: mpsc::Receiver<RemoteConnectMsg>,
157 no_wait_rx: mpsc::Receiver<RemoteConnectMsg>,
158 port_allocator: PortAllocator,
159 terminate_tx: mpsc::UnboundedSender<()>,
160 closed: bool,
161}
162
163impl fmt::Debug for Listener {
164 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
165 f.debug_struct("Listener").field("port_allocator", &self.port_allocator).finish()
166 }
167}
168
169impl Listener {
170 pub(crate) fn new(
171 wait_rx: mpsc::Receiver<RemoteConnectMsg>, no_wait_rx: mpsc::Receiver<RemoteConnectMsg>,
172 port_allocator: PortAllocator, terminate_tx: mpsc::UnboundedSender<()>,
173 ) -> Self {
174 Self { wait_rx, no_wait_rx, port_allocator, terminate_tx, closed: false }
175 }
176
177 pub fn port_allocator(&self) -> PortAllocator {
179 self.port_allocator.clone()
180 }
181
182 pub async fn accept(&mut self) -> Result<Option<(Sender, Receiver)>, ListenerError> {
187 if self.closed {
188 return Ok(None);
189 }
190
191 loop {
192 tokio::select! {
193 local_port = self.port_allocator.allocate() => {
194 match self.inspect().await? {
195 Some(req) => break Ok(Some(req.accept_from(local_port).await?)),
196 None => break Ok(None),
197 }
198 },
199
200 no_wait_req_opt = self.no_wait_rx.recv() => {
201 match no_wait_req_opt {
202 Some(RemoteConnectMsg::Request(no_wait_req)) => {
203 match self.port_allocator.try_allocate() {
204 Some(local_port) => break Ok(Some(no_wait_req.accept_from(local_port).await?)),
205 None => no_wait_req.reject(true).await,
206 }
207 },
208 Some(RemoteConnectMsg::ClientDropped) => {
209 self.closed = true;
210 break Ok(None);
211 },
212 None => break Err(ListenerError::MultiplexerError),
213 }
214 },
215 }
216 }
217 }
218
219 pub async fn inspect(&mut self) -> Result<Option<Request>, ListenerError> {
229 if self.closed {
230 return Ok(None);
231 }
232
233 let req_opt = tokio::select! {
234 req_opt = self.wait_rx.recv() => req_opt,
235 req_opt = self.no_wait_rx.recv() => req_opt,
236 };
237
238 match req_opt {
239 Some(RemoteConnectMsg::Request(req)) => Ok(Some(req)),
240 Some(RemoteConnectMsg::ClientDropped) => {
241 self.closed = true;
242 Ok(None)
243 }
244 None => Err(ListenerError::MultiplexerError),
245 }
246 }
247
248 pub fn into_stream(self) -> ListenerStream {
250 ListenerStream::new(self)
251 }
252
253 pub fn terminate(&self) {
255 let _ = self.terminate_tx.send(());
256 }
257}
258
259impl Drop for Listener {
260 fn drop(&mut self) {
261 }
263}
264
265pub struct ListenerStream {
269 server: Arc<Mutex<Listener>>,
270 #[allow(clippy::type_complexity)]
271 accept_fut: Option<ReusableBoxFuture<'static, Option<Result<(Sender, Receiver), ListenerError>>>>,
272}
273
274impl ListenerStream {
275 fn new(server: Listener) -> Self {
276 Self { server: Arc::new(Mutex::new(server)), accept_fut: None }
277 }
278
279 async fn accept(server: Arc<Mutex<Listener>>) -> Option<Result<(Sender, Receiver), ListenerError>> {
280 let mut server = server.lock().await;
281 server.accept().await.transpose()
282 }
283
284 fn poll_next(&mut self, cx: &mut Context) -> Poll<Option<Result<(Sender, Receiver), ListenerError>>> {
285 if self.accept_fut.is_none() {
286 self.accept_fut = Some(ReusableBoxFuture::new(Self::accept(self.server.clone())));
287 }
288
289 let accept_fut = self.accept_fut.as_mut().unwrap();
290 let res = ready!(accept_fut.poll(cx));
291
292 self.accept_fut = None;
293 Poll::Ready(res)
294 }
295}
296
297impl Stream for ListenerStream {
298 type Item = Result<(Sender, Receiver), ListenerError>;
299
300 fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
301 Pin::into_inner(self).poll_next(cx)
302 }
303}