poem/listener/
mod.rs

1//! Commonly used listeners.
2
3#[cfg(feature = "acme-base")]
4#[cfg_attr(docsrs, doc(cfg(feature = "acme-base")))]
5pub mod acme;
6mod combined;
7#[cfg(any(feature = "native-tls", feature = "rustls", feature = "openssl-tls"))]
8mod handshake_stream;
9#[cfg(feature = "native-tls")]
10mod native_tls;
11#[cfg(feature = "openssl-tls")]
12mod openssl_tls;
13#[cfg(feature = "rustls")]
14mod rustls;
15mod tcp;
16#[cfg(any(feature = "rustls", feature = "native-tls", feature = "openssl-tls"))]
17mod tls;
18#[cfg(unix)]
19mod unix;
20
21use std::{
22    convert::Infallible,
23    io::Error,
24    pin::Pin,
25    task::{Context, Poll},
26};
27
28use futures_util::{Future, FutureExt, TryFutureExt, future::BoxFuture};
29use http::uri::Scheme;
30use tokio::io::{AsyncRead, AsyncWrite, ReadBuf, Result as IoResult};
31
32#[cfg(feature = "acme-base")]
33use self::acme::{AutoCert, AutoCertListener};
34#[cfg(any(feature = "native-tls", feature = "rustls", feature = "openssl-tls"))]
35pub use self::handshake_stream::HandshakeStream;
36#[cfg(feature = "native-tls")]
37pub use self::native_tls::{NativeTlsAcceptor, NativeTlsConfig, NativeTlsListener};
38#[cfg(feature = "openssl-tls")]
39pub use self::openssl_tls::{OpensslTlsAcceptor, OpensslTlsConfig, OpensslTlsListener};
40#[cfg(feature = "rustls")]
41pub use self::rustls::{RustlsAcceptor, RustlsCertificate, RustlsConfig, RustlsListener};
42#[cfg(any(feature = "rustls", feature = "native-tls", feature = "openssl-tls"))]
43pub use self::tls::IntoTlsConfigStream;
44#[cfg(unix)]
45pub use self::unix::{UnixAcceptor, UnixListener};
46pub use self::{
47    combined::{Combined, CombinedStream},
48    tcp::{TcpAcceptor, TcpListener},
49};
50use crate::web::{LocalAddr, RemoteAddr};
51
52/// An IO type for BoxAcceptor.
53pub struct BoxIo {
54    reader: Box<dyn AsyncRead + Send + Unpin + 'static>,
55    writer: Box<dyn AsyncWrite + Send + Unpin + 'static>,
56}
57
58impl BoxIo {
59    fn new(io: impl AsyncRead + AsyncWrite + Send + Unpin + 'static) -> Self {
60        let (reader, writer) = tokio::io::split(io);
61        Self {
62            reader: Box::new(reader),
63            writer: Box::new(writer),
64        }
65    }
66}
67
68impl AsyncRead for BoxIo {
69    fn poll_read(
70        mut self: Pin<&mut Self>,
71        cx: &mut Context<'_>,
72        buf: &mut ReadBuf<'_>,
73    ) -> Poll<IoResult<()>> {
74        let this = &mut *self;
75        Pin::new(&mut this.reader).poll_read(cx, buf)
76    }
77}
78
79impl AsyncWrite for BoxIo {
80    fn poll_write(
81        mut self: Pin<&mut Self>,
82        cx: &mut Context<'_>,
83        buf: &[u8],
84    ) -> Poll<Result<usize, Error>> {
85        let this = &mut *self;
86        Pin::new(&mut this.writer).poll_write(cx, buf)
87    }
88
89    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Error>> {
90        let this = &mut *self;
91        Pin::new(&mut this.writer).poll_flush(cx)
92    }
93
94    fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Error>> {
95        let this = &mut *self;
96        Pin::new(&mut this.writer).poll_shutdown(cx)
97    }
98}
99
100/// A `acceptor` that can be dynamically dispatched.
101pub trait DynAcceptor: Send {
102    /// Returns the local address that this listener is bound to.
103    fn local_addr(&self) -> Vec<LocalAddr>;
104
105    /// Accepts a new incoming connection from this listener.
106    ///
107    /// This function will yield once a new TCP connection is established. When
108    /// established, the corresponding IO stream and the remote peer’s
109    /// address will be returned.
110    fn accept(&mut self) -> BoxFuture<'_, IoResult<(BoxIo, LocalAddr, RemoteAddr, Scheme)>>;
111}
112
/// Adapter that wraps an [`Acceptor`] so that it implements [`DynAcceptor`].
///
/// The wrapped acceptor is exposed as the public tuple field.
pub struct ToDynAcceptor<A>(pub A);
115
116impl<A: Acceptor> DynAcceptor for ToDynAcceptor<A> {
117    #[inline]
118    fn local_addr(&self) -> Vec<LocalAddr> {
119        self.0.local_addr()
120    }
121
122    #[inline]
123    fn accept(&mut self) -> BoxFuture<'_, IoResult<(BoxIo, LocalAddr, RemoteAddr, Scheme)>> {
124        async move {
125            let (io, local_addr, remote_addr, scheme) = self.0.accept().await?;
126            let io = BoxIo::new(io);
127            Ok((io, local_addr, remote_addr, scheme))
128        }
129        .boxed()
130    }
131}
132
133impl Acceptor for dyn DynAcceptor + '_ {
134    type Io = BoxIo;
135
136    #[inline]
137    fn local_addr(&self) -> Vec<LocalAddr> {
138        DynAcceptor::local_addr(self)
139    }
140
141    #[inline]
142    async fn accept(&mut self) -> IoResult<(BoxIo, LocalAddr, RemoteAddr, Scheme)> {
143        DynAcceptor::accept(self).await
144    }
145}
146
147/// Represents a acceptor type.
148pub trait Acceptor: Send {
149    /// IO stream type.
150    type Io: AsyncRead + AsyncWrite + Send + Unpin + 'static;
151
152    /// Returns the local address that this listener is bound to.
153    fn local_addr(&self) -> Vec<LocalAddr>;
154
155    /// Accepts a new incoming connection from this listener.
156    ///
157    /// This function will yield once a new TCP connection is established. When
158    /// established, the corresponding IO stream and the remote peer’s
159    /// address will be returned.
160    fn accept(
161        &mut self,
162    ) -> impl Future<Output = IoResult<(Self::Io, LocalAddr, RemoteAddr, Scheme)>> + Send;
163}
164
165/// An owned dynamically typed Acceptor for use in cases where you can’t
166/// statically type your result or need to add some indirection.
167pub type BoxAcceptor = Box<dyn DynAcceptor>;
168
169/// Extension trait for [`Acceptor`].
170pub trait AcceptorExt: Acceptor {
171    /// Combine two acceptors.
172    #[must_use]
173    fn combine<T>(self, other: T) -> Combined<Self, T>
174    where
175        Self: Sized,
176    {
177        Combined::new(self, other)
178    }
179
180    /// Wrap the acceptor in a `Box`.
181    fn boxed(self) -> BoxAcceptor
182    where
183        Self: Sized + 'static,
184    {
185        Box::new(ToDynAcceptor(self))
186    }
187
188    /// Consume this acceptor and return a new TLS acceptor with [`rustls`](https://crates.io/crates/rustls).
189    #[cfg(feature = "rustls")]
190    #[cfg_attr(docsrs, doc(cfg(feature = "rustls")))]
191    fn rustls<S>(self, config_stream: S) -> RustlsAcceptor<Self, S>
192    where
193        Self: Sized,
194        S: futures_util::Stream<Item = RustlsConfig> + Send + Unpin + 'static,
195    {
196        RustlsAcceptor::new(self, config_stream)
197    }
198
199    /// Consume this acceptor and return a new TLS acceptor with [`native-tls`](https://crates.io/crates/native-tls).
200    #[cfg(feature = "native-tls")]
201    #[cfg_attr(docsrs, doc(cfg(feature = "native-tls")))]
202    fn native_tls<S>(self, config_stream: S) -> NativeTlsAcceptor<Self, S>
203    where
204        Self: Sized,
205        S: futures_util::Stream<Item = NativeTlsConfig> + Send + Unpin + 'static,
206    {
207        NativeTlsAcceptor::new(self, config_stream)
208    }
209
210    /// Consume this acceptor and return a new TLS acceptor with [`openssl-tls`](https://crates.io/crates/openssl).
211    #[cfg(feature = "openssl-tls")]
212    #[cfg_attr(docsrs, doc(cfg(feature = "openssl-tls")))]
213    fn openssl_tls<S>(self, config_stream: S) -> OpensslTlsAcceptor<Self, S>
214    where
215        Self: Sized,
216        S: futures_util::Stream<Item = OpensslTlsConfig> + Send + Unpin + 'static,
217    {
218        OpensslTlsAcceptor::new(self, config_stream)
219    }
220}
221
222impl<T: Acceptor> AcceptorExt for T {}
223
224/// Represents a listener that can be listens for incoming connections.
225pub trait Listener: Send {
226    /// The acceptor type.
227    type Acceptor: Acceptor;
228
229    /// Create a acceptor instance.
230    fn into_acceptor(self) -> impl Future<Output = IoResult<Self::Acceptor>> + Send;
231
232    /// Combine two listeners.
233    ///
234    /// You can call this function multiple times to combine more listeners.
235    ///
236    /// # Example
237    ///
238    /// ```
239    /// use poem::listener::{Listener, TcpListener};
240    ///
241    /// let listener = TcpListener::bind("0.0.0.0:80").combine(TcpListener::bind("0.0.0.0:81"));
242    /// ```
243    #[must_use]
244    fn combine<T>(self, other: T) -> Combined<Self, T>
245    where
246        Self: Sized,
247    {
248        Combined::new(self, other)
249    }
250
251    /// Consume this listener and return a new TLS listener with [`rustls`](https://crates.io/crates/rustls).
252    #[cfg(feature = "rustls")]
253    #[cfg_attr(docsrs, doc(cfg(feature = "rustls")))]
254    #[must_use]
255    fn rustls<S: IntoTlsConfigStream<RustlsConfig>>(
256        self,
257        config_stream: S,
258    ) -> RustlsListener<Self, S>
259    where
260        Self: Sized,
261    {
262        RustlsListener::new(self, config_stream)
263    }
264
265    /// Consume this listener and return a new TLS listener with [`native-tls`](https://crates.io/crates/native-tls).
266    #[cfg(feature = "native-tls")]
267    #[cfg_attr(docsrs, doc(cfg(feature = "native-tls")))]
268    #[must_use]
269    fn native_tls<S: IntoTlsConfigStream<NativeTlsConfig>>(
270        self,
271        config_stream: S,
272    ) -> NativeTlsListener<Self, S>
273    where
274        Self: Sized,
275    {
276        NativeTlsListener::new(self, config_stream)
277    }
278
279    /// Consume this listener and return a new TLS listener with [`openssl-tls`](https://crates.io/crates/openssl).
280    #[cfg(feature = "openssl-tls")]
281    #[cfg_attr(docsrs, doc(cfg(feature = "openssl-tls")))]
282    #[must_use]
283    fn openssl_tls<S: IntoTlsConfigStream<OpensslTlsConfig>>(
284        self,
285        config_stream: S,
286    ) -> OpensslTlsListener<Self, S>
287    where
288        Self: Sized,
289    {
290        OpensslTlsListener::new(self, config_stream)
291    }
292
293    /// Consume this listener and return a new ACME listener.
294    ///
295    /// # Example
296    ///
297    /// ```
298    /// use poem::listener::{
299    ///     Listener, TcpListener,
300    ///     acme::{AutoCert, LETS_ENCRYPT_PRODUCTION},
301    /// };
302    ///
303    /// let listener = TcpListener::bind("0.0.0.0:443").acme(
304    ///     AutoCert::builder()
305    ///         .directory_url(LETS_ENCRYPT_PRODUCTION)
306    ///         .domain("poem.rs")
307    ///         .build()
308    ///         .unwrap(),
309    /// );
310    /// ```
311    #[cfg(feature = "acme-base")]
312    #[cfg_attr(docsrs, doc(cfg(feature = "acme-base")))]
313    #[must_use]
314    fn acme(self, auto_cert: AutoCert) -> AutoCertListener<Self>
315    where
316        Self: Sized,
317    {
318        AutoCertListener::new(self, auto_cert)
319    }
320
321    /// Wrap the listener in a `Box`.
322    fn boxed(self) -> BoxListener
323    where
324        Self: Sized + 'static,
325    {
326        BoxListener::new(self)
327    }
328}
329
330impl Listener for Infallible {
331    type Acceptor = Infallible;
332
333    async fn into_acceptor(self) -> IoResult<Self::Acceptor> {
334        unreachable!()
335    }
336}
337
338impl<T: Listener + Sized> Listener for Box<T> {
339    type Acceptor = T::Acceptor;
340
341    async fn into_acceptor(self) -> IoResult<Self::Acceptor> {
342        (*self).into_acceptor().await
343    }
344}
345
346impl<T: Acceptor + ?Sized> Acceptor for Box<T> {
347    type Io = T::Io;
348
349    fn local_addr(&self) -> Vec<LocalAddr> {
350        self.as_ref().local_addr()
351    }
352
353    async fn accept(&mut self) -> IoResult<(Self::Io, LocalAddr, RemoteAddr, Scheme)> {
354        self.as_mut().accept().await
355    }
356}
357
358impl Acceptor for Infallible {
359    type Io = BoxIo;
360
361    fn local_addr(&self) -> Vec<LocalAddr> {
362        vec![]
363    }
364
365    async fn accept(&mut self) -> IoResult<(Self::Io, LocalAddr, RemoteAddr, Scheme)> {
366        unreachable!()
367    }
368}
369
370/// An owned dynamically typed Listener for use in cases where you can’t
371/// statically type your result or need to add some indirection.
372pub struct BoxListener(BoxFuture<'static, IoResult<BoxAcceptor>>);
373
374impl BoxListener {
375    fn new<T: Listener + 'static>(listener: T) -> Self {
376        BoxListener(listener.into_acceptor().map_ok(AcceptorExt::boxed).boxed())
377    }
378}
379
380impl Listener for BoxListener {
381    type Acceptor = BoxAcceptor;
382
383    async fn into_acceptor(self) -> IoResult<Self::Acceptor> {
384        self.0.await
385    }
386}
387
#[cfg(test)]
mod tests {
    use super::*;

    /// Two TCP listeners on loopback port 0 can be combined through
    /// `Listener::combine`.
    #[tokio::test]
    async fn combined_listener() {
        let first = TcpListener::bind("127.0.0.1:0");
        let second = TcpListener::bind("127.0.0.1:0");
        let _ = Listener::combine(first, second);
    }

    /// Two acceptors obtained from TCP listeners on loopback port 0 can be
    /// combined through `AcceptorExt::combine`.
    #[tokio::test]
    async fn combined_acceptor() {
        let first_listener = TcpListener::bind("127.0.0.1:0");
        let first = first_listener.into_acceptor().await.unwrap();

        let second_listener = TcpListener::bind("127.0.0.1:0");
        let second = second_listener.into_acceptor().await.unwrap();

        let _ = AcceptorExt::combine(first, second);
    }
}