1use std::fmt::{self, Debug, Display, Formatter};
7use std::io::Result as IoResult;
8use std::pin::Pin;
9use std::sync::Arc;
10use std::task::{Context, Poll};
11
12use futures_util::future::{BoxFuture, FutureExt};
13use http::uri::Scheme;
14use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
15use tokio_util::sync::CancellationToken;
16
17use crate::fuse::{ArcFuseFactory, ArcFusewire};
18use crate::http::Version;
19use crate::service::HyperHandler;
20
21mod proto;
22pub use proto::HttpBuilder;
23mod stream;
24pub use stream::*;
25
26cfg_feature! {
27 #![feature = "acme"]
28 pub mod acme;
29 pub use acme::AcmeListener;
30}
31cfg_feature! {
32 #![feature = "native-tls"]
33 pub mod native_tls;
34 pub use self::native_tls::NativeTlsListener;
35}
36cfg_feature! {
37 #![feature = "rustls"]
38 pub mod rustls;
39 pub use rustls::RustlsListener;
40}
41cfg_feature! {
42 #![feature = "openssl"]
43 pub mod openssl;
44 pub use self::openssl::OpensslListener;
45}
46cfg_feature! {
47 #![feature = "http1"]
48 pub use hyper::server::conn::http1;
49}
50cfg_feature! {
51 #![feature = "http2"]
52 pub use hyper::server::conn::http2;
53}
54cfg_feature! {
55 #![feature = "quinn"]
56 pub mod quinn;
57 pub use self::quinn::{QuinnListener, QuinnConnection};
58}
59cfg_feature! {
60 #![unix]
61 pub mod unix;
62}
63pub mod addr;
64pub use addr::SocketAddr;
65
66pub mod tcp;
67pub use tcp::TcpListener;
68
69mod joined;
70pub use joined::{JoinedAcceptor, JoinedListener};
71
72cfg_feature! {
73 #![unix]
74 pub use unix::UnixListener;
75}
76
/// Conversion of a value into a stream that yields configuration items of type `C`.
///
/// Only compiled when at least one of the `rustls`, `native-tls` or `openssl`
/// features is enabled.
#[cfg(any(feature = "rustls", feature = "native-tls", feature = "openssl"))]
pub trait IntoConfigStream<C> {
    /// The stream type produced by [`into_stream`](Self::into_stream); it yields `C` values
    /// and must be `Send + 'static`.
    type Stream: futures_util::Stream<Item = C> + Send + 'static;

    /// Consumes `self` and returns the configuration stream.
    fn into_stream(self) -> Self::Stream;
}
86
/// The value an [`Acceptor`] resolves to from `accept`: a stream bundled with
/// the [`Coupler`] for that stream type and metadata about the connection.
///
/// The coupler's `Stream` associated type is required to be exactly `S`.
#[non_exhaustive]
pub struct Accepted<C, S>
where
    C: Coupler<Stream = S>,
    S: Send + 'static,
{
    /// Coupler whose `Stream` type matches [`stream`](Self::stream).
    pub coupler: C,
    /// The accepted stream.
    pub stream: S,
    /// Optional fusewire attached to this connection (`None` when absent).
    pub fusewire: Option<ArcFusewire>,
    /// Local socket address of the connection.
    pub local_addr: SocketAddr,
    /// Remote (peer) socket address of the connection.
    pub remote_addr: SocketAddr,
    /// URI scheme recorded for the connection.
    pub http_scheme: Scheme,
}
110impl<C, S> Debug for Accepted<C, S>
111where
112 C: Coupler<Stream = S>,
113 S: Send + 'static,
114{
115 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
116 f.debug_struct("Accepted")
117 .field("local_addr", &self.local_addr)
118 .field("remote_addr", &self.remote_addr)
119 .field("http_scheme", &self.http_scheme)
120 .finish()
121 }
122}
123
124impl<C, S> Accepted<C, S>
125where
126 C: Coupler<Stream = S>,
127 S: Send + 'static,
128{
129 #[inline]
130 #[doc(hidden)]
131 pub fn map_into<TC, TS>(
132 self,
133 coupler_fn: impl FnOnce(C) -> TC,
134 stream_fn: impl FnOnce(S) -> TS,
135 ) -> Accepted<TC, TS>
136 where
137 TC: Coupler<Stream = TS>,
138 TS: Send + 'static,
139 {
140 let Self {
141 coupler,
142 stream,
143 fusewire,
144 local_addr,
145 remote_addr,
146 http_scheme,
147 } = self;
148 Accepted {
149 coupler: coupler_fn(coupler),
150 stream: stream_fn(stream),
151 fusewire,
152 local_addr,
153 remote_addr,
154 http_scheme,
155 }
156 }
157}
158
/// A source of accepted connections.
///
/// Each call to [`accept`](Self::accept) resolves to an [`Accepted`] value that
/// pairs a stream with the coupler for that stream type.
pub trait Acceptor: Send {
    /// Coupler handed out with each accepted stream; its `Stream` type must be
    /// [`Self::Stream`].
    type Coupler: Coupler<Stream = Self::Stream> + Unpin + Send + 'static;
    /// Stream type produced by [`accept`](Self::accept).
    type Stream: Unpin + Send + 'static;

    /// Returns the [`Holding`] entries describing this acceptor.
    fn holdings(&self) -> &[Holding];

    /// Waits for the next connection.
    ///
    /// `fuse_factory` is optional; how it is used is up to the implementation.
    /// The returned future is `Send` and resolves to an I/O result carrying the
    /// accepted connection.
    fn accept(
        &mut self,
        fuse_factory: Option<ArcFuseFactory>,
    ) -> impl Future<Output = IoResult<Accepted<Self::Coupler, Self::Stream>>> + Send;
}
175
/// Description of one endpoint reported by [`Acceptor::holdings`]: an address
/// together with the HTTP versions and URI scheme recorded for it.
#[derive(Clone, Debug)]
#[non_exhaustive]
pub struct Holding {
    /// Local socket address.
    pub local_addr: SocketAddr,
    /// HTTP versions associated with this entry.
    pub http_versions: Vec<Version>,
    /// URI scheme associated with this entry.
    pub http_scheme: Scheme,
}
196impl Display for Holding {
197 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
198 write!(
199 f,
200 "{:?} on {}://{}",
201 self.http_versions,
202 self.http_scheme,
203 self.local_addr.to_string().trim_start_matches("socket://")
204 )
205 }
206}
/// Connects an accepted stream to a request handler.
pub trait Coupler: Send {
    /// Stream type this coupler accepts.
    type Stream: Send + 'static;

    /// Takes ownership of `stream` and returns a boxed `'static` future that
    /// resolves to an I/O result once the work for that stream is finished.
    ///
    /// * `handler` - the [`HyperHandler`] to use for the stream.
    /// * `builder` - shared [`HttpBuilder`].
    /// * `graceful_stop_token` - optional cancellation token; presumably used to
    ///   request a graceful stop (the exact semantics are implementation-defined).
    fn couple(
        &self,
        stream: Self::Stream,
        handler: HyperHandler,
        builder: Arc<HttpBuilder>,
        graceful_stop_token: Option<CancellationToken>,
    ) -> BoxFuture<'static, IoResult<()>>;
}
221
/// Something that can be bound to produce an [`Acceptor`].
pub trait Listener: Send {
    /// Acceptor type produced by binding.
    type Acceptor: Acceptor;

    /// Binds the listener and returns its acceptor.
    ///
    /// This is a convenience wrapper over [`try_bind`](Self::try_bind).
    ///
    /// # Panics
    ///
    /// The returned future panics with the message `bind failed` if
    /// [`try_bind`](Self::try_bind) resolves to an error.
    fn bind(self) -> impl Future<Output = Self::Acceptor> + Send
    where
        Self: Sized + Send + 'static,
    {
        // NOTE(review): the return type is already `impl Future`, so the `.boxed()`
        // heap allocation looks unnecessary; confirm before removing it, since
        // `FutureExt` may then become an unused import in this file.
        async move { self.try_bind().await.expect("bind failed") }.boxed()
    }

    /// Binds the listener, reporting failure as an error instead of panicking.
    fn try_bind(self) -> impl Future<Output = crate::Result<Self::Acceptor>> + Send;

    /// Combines this listener with `other` into a [`JoinedListener`].
    #[inline]
    fn join<T>(self, other: T) -> JoinedListener<Self, T>
    where
        Self: Sized + Send,
    {
        JoinedListener::new(self, other)
    }
}
247
/// A type-erased bidirectional stream, stored as a boxed reader and a boxed
/// writer.
pub struct DynStream {
    /// Boxed read side.
    reader: Box<dyn AsyncRead + Send + Unpin + 'static>,
    /// Boxed write side.
    writer: Box<dyn AsyncWrite + Send + Unpin + 'static>,
}
253
254impl DynStream {
255 fn new(stream: impl AsyncRead + AsyncWrite + Send + 'static) -> Self {
256 let (reader, writer) = tokio::io::split(stream);
257 Self {
258 reader: Box::new(reader),
259 writer: Box::new(writer),
260 }
261 }
262}
263
264impl Debug for DynStream {
265 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
266 f.debug_struct("DynStream").finish()
267 }
268}
269
270impl AsyncRead for DynStream {
271 fn poll_read(
272 mut self: Pin<&mut Self>,
273 cx: &mut Context<'_>,
274 buf: &mut ReadBuf<'_>,
275 ) -> Poll<IoResult<()>> {
276 let this = &mut *self;
277 Pin::new(&mut this.reader).poll_read(cx, buf)
278 }
279}
280
281impl AsyncWrite for DynStream {
282 fn poll_write(
283 mut self: Pin<&mut Self>,
284 cx: &mut Context<'_>,
285 buf: &[u8],
286 ) -> Poll<IoResult<usize>> {
287 let this = &mut *self;
288 Pin::new(&mut this.writer).poll_write(cx, buf)
289 }
290
291 fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<IoResult<()>> {
292 let this = &mut *self;
293 Pin::new(&mut this.writer).poll_flush(cx)
294 }
295
296 fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<IoResult<()>> {
297 let this = &mut *self;
298 Pin::new(&mut this.writer).poll_shutdown(cx)
299 }
300}