salvo_core/conn/
mod.rs

1//! Various listener implementations for handling HTTP connections.
2//!
3//! These listeners include implementations for different TLS libraries such as `rustls`, `native-tls`, and `openssl`.
4//! The module also provides support for HTTP versions 1 and 2, as well as the QUIC protocol.
5//! Additionally, it includes implementations for Unix domain sockets.
6use std::fmt::{self, Debug, Display, Formatter};
7use std::io::Result as IoResult;
8use std::pin::Pin;
9use std::sync::Arc;
10use std::task::{Context, Poll};
11
12use futures_util::future::{BoxFuture, FutureExt};
13use http::uri::Scheme;
14use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
15use tokio_util::sync::CancellationToken;
16
17use crate::fuse::{ArcFuseFactory, ArcFusewire};
18use crate::http::Version;
19use crate::service::HyperHandler;
20
21mod proto;
22pub use proto::HttpBuilder;
23mod stream;
24pub use stream::*;
25
26cfg_feature! {
27    #![feature = "acme"]
28    pub mod acme;
29    pub use acme::AcmeListener;
30}
31cfg_feature! {
32    #![feature = "native-tls"]
33    pub mod native_tls;
34    pub use self::native_tls::NativeTlsListener;
35}
36cfg_feature! {
37    #![feature = "rustls"]
38    pub mod rustls;
39    pub use rustls::RustlsListener;
40}
41cfg_feature! {
42    #![feature = "openssl"]
43    pub mod openssl;
44    pub use self::openssl::OpensslListener;
45}
46cfg_feature! {
47    #![feature = "http1"]
48    pub use hyper::server::conn::http1;
49}
50cfg_feature! {
51    #![feature = "http2"]
52    pub use hyper::server::conn::http2;
53}
54cfg_feature! {
55    #![feature = "quinn"]
56    pub mod quinn;
57    pub use self::quinn::{QuinnListener, QuinnConnection};
58}
59cfg_feature! {
60    #![unix]
61    pub mod unix;
62}
63pub mod addr;
64pub use addr::SocketAddr;
65
66pub mod tcp;
67pub use tcp::TcpListener;
68
69mod joined;
70pub use joined::{JoinedAcceptor, JoinedListener};
71
72cfg_feature! {
73    #![unix]
74    pub use unix::UnixListener;
75}
76
77#[cfg(any(feature = "rustls", feature = "native-tls", feature = "openssl"))]
78/// A type that can convert into TLS config stream.
79pub trait IntoConfigStream<C> {
80    /// TLS config stream.
81    type Stream: futures_util::Stream<Item = C> + Send + 'static;
82
83    /// Consume itself and return TLS config stream.
84    fn into_stream(self) -> Self::Stream;
85}
86
87/// [`Acceptor`]'s return type.
88///
89/// The `Accepted` struct represents an accepted connection and contains information such as the connection itself,
90/// the local and remote addresses, the HTTP scheme, and the HTTP version.
91#[non_exhaustive]
92pub struct Accepted<C, S>
93where
94    C: Coupler<Stream = S>,
95    S: Send + 'static,
96{
97    /// Coupler for couple stream.
98    pub coupler: C,
99    /// Incoming stream.
100    pub stream: S,
101    /// Fusewire for the connection.
102    pub fusewire: Option<ArcFusewire>,
103    /// Local addr.
104    pub local_addr: SocketAddr,
105    /// Remote addr.
106    pub remote_addr: SocketAddr,
107    /// HTTP scheme.
108    pub http_scheme: Scheme,
109}
110impl<C, S> Debug for Accepted<C, S>
111where
112    C: Coupler<Stream = S>,
113    S: Send + 'static,
114{
115    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
116        f.debug_struct("Accepted")
117            .field("local_addr", &self.local_addr)
118            .field("remote_addr", &self.remote_addr)
119            .field("http_scheme", &self.http_scheme)
120            .finish()
121    }
122}
123
124impl<C, S> Accepted<C, S>
125where
126    C: Coupler<Stream = S>,
127    S: Send + 'static,
128{
129    #[inline]
130    #[doc(hidden)]
131    pub fn map_into<TC, TS>(
132        self,
133        coupler_fn: impl FnOnce(C) -> TC,
134        stream_fn: impl FnOnce(S) -> TS,
135    ) -> Accepted<TC, TS>
136    where
137        TC: Coupler<Stream = TS>,
138        TS: Send + 'static,
139    {
140        let Self {
141            coupler,
142            stream,
143            fusewire,
144            local_addr,
145            remote_addr,
146            http_scheme,
147        } = self;
148        Accepted {
149            coupler: coupler_fn(coupler),
150            stream: stream_fn(stream),
151            fusewire,
152            local_addr,
153            remote_addr,
154            http_scheme,
155        }
156    }
157}
158
159/// An acceptor that can accept incoming connections.
160pub trait Acceptor: Send {
161    /// Coupler type.
162    type Coupler: Coupler<Stream = Self::Stream> + Unpin + Send + 'static;
163    /// Stream type.
164    type Stream: Unpin + Send + 'static;
165
166    /// Returns the holding information that this listener is bound to.
167    fn holdings(&self) -> &[Holding];
168
169    /// Accepts a new incoming connection from this listener.
170    fn accept(
171        &mut self,
172        fuse_factory: Option<ArcFuseFactory>,
173    ) -> impl Future<Output = IoResult<Accepted<Self::Coupler, Self::Stream>>> + Send;
174}
175
176// /// Get Http version from alpha.
177// pub fn version_from_alpn(proto: impl AsRef<[u8]>) -> Version {
178//     if proto.as_ref().windows(2).any(|window| window == b"h2") {
179//         Version::HTTP_2
180//     } else {
181//         Version::HTTP_11
182//     }
183// }
184
185/// Holding information.
186#[derive(Clone, Debug)]
187#[non_exhaustive]
188pub struct Holding {
189    /// Local address.
190    pub local_addr: SocketAddr,
191    /// HTTP versions.
192    pub http_versions: Vec<Version>,
193    /// HTTP scheme.
194    pub http_scheme: Scheme,
195}
196impl Display for Holding {
197    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
198        write!(
199            f,
200            "{:?} on {}://{}",
201            self.http_versions,
202            self.http_scheme,
203            self.local_addr.to_string().trim_start_matches("socket://")
204        )
205    }
206}
207/// A trait for couple http stream.
208pub trait Coupler: Send {
209    /// Connection stream type.
210    type Stream: Send + 'static;
211
212    /// Couple http connection.
213    fn couple(
214        &self,
215        stream: Self::Stream,
216        handler: HyperHandler,
217        builder: Arc<HttpBuilder>,
218        graceful_stop_token: Option<CancellationToken>,
219    ) -> BoxFuture<'static, IoResult<()>>;
220}
221
222/// `Listener` represents a listener that can bind to a specific address and port and return an acceptor.
223pub trait Listener: Send {
224    /// Acceptor type.
225    type Acceptor: Acceptor;
226
227    /// Bind and returns acceptor.
228    fn bind(self) -> impl Future<Output = Self::Acceptor> + Send
229    where
230        Self: Sized + Send + 'static,
231    {
232        async move { self.try_bind().await.expect("bind failed") }.boxed()
233    }
234
235    /// Bind and returns acceptor.
236    fn try_bind(self) -> impl Future<Output = crate::Result<Self::Acceptor>> + Send;
237
238    /// Join current listener with the other.
239    #[inline]
240    fn join<T>(self, other: T) -> JoinedListener<Self, T>
241    where
242        Self: Sized + Send,
243    {
244        JoinedListener::new(self, other)
245    }
246}
247
248/// Stream for DynAcceptor.
249pub struct DynStream {
250    reader: Box<dyn AsyncRead + Send + Unpin + 'static>,
251    writer: Box<dyn AsyncWrite + Send + Unpin + 'static>,
252}
253
254impl DynStream {
255    fn new(stream: impl AsyncRead + AsyncWrite + Send + 'static) -> Self {
256        let (reader, writer) = tokio::io::split(stream);
257        Self {
258            reader: Box::new(reader),
259            writer: Box::new(writer),
260        }
261    }
262}
263
264impl Debug for DynStream {
265    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
266        f.debug_struct("DynStream").finish()
267    }
268}
269
270impl AsyncRead for DynStream {
271    fn poll_read(
272        mut self: Pin<&mut Self>,
273        cx: &mut Context<'_>,
274        buf: &mut ReadBuf<'_>,
275    ) -> Poll<IoResult<()>> {
276        let this = &mut *self;
277        Pin::new(&mut this.reader).poll_read(cx, buf)
278    }
279}
280
281impl AsyncWrite for DynStream {
282    fn poll_write(
283        mut self: Pin<&mut Self>,
284        cx: &mut Context<'_>,
285        buf: &[u8],
286    ) -> Poll<IoResult<usize>> {
287        let this = &mut *self;
288        Pin::new(&mut this.writer).poll_write(cx, buf)
289    }
290
291    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<IoResult<()>> {
292        let this = &mut *self;
293        Pin::new(&mut this.writer).poll_flush(cx)
294    }
295
296    fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<IoResult<()>> {
297        let this = &mut *self;
298        Pin::new(&mut this.writer).poll_shutdown(cx)
299    }
300}