use std::fmt::{self, Debug, Display, Formatter};
use std::io::Result as IoResult;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use futures_util::future::{BoxFuture, FutureExt};
use http::uri::Scheme;
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
use tokio_util::sync::CancellationToken;
use crate::fuse::{ArcFuseFactory, ArcFusewire};
use crate::http::Version;
use crate::service::HyperHandler;
mod proto;
pub use proto::HttpBuilder;
mod stream;
pub use stream::*;
cfg_feature! {
#![feature = "native-tls"]
pub mod native_tls;
pub use self::native_tls::NativeTlsListener;
}
cfg_feature! {
#![feature = "rustls"]
pub mod rustls;
pub use rustls::RustlsListener;
}
cfg_feature! {
#![feature = "openssl"]
pub mod openssl;
pub use self::openssl::OpensslListener;
}
cfg_feature! {
#![feature = "http1"]
pub use hyper::server::conn::http1;
}
cfg_feature! {
#![feature = "http2"]
pub use hyper::server::conn::http2;
}
cfg_feature! {
#![feature = "quinn"]
pub mod quinn;
pub use self::quinn::{QuinnListener, QuinnConnection};
}
cfg_feature! {
#![all(feature = "unix", unix)]
pub mod unix;
}
pub mod addr;
pub use addr::SocketAddr;
pub mod tcp;
pub use tcp::TcpListener;
mod joined;
pub use joined::{JoinedAcceptor, JoinedListener};
cfg_feature! {
#![all(feature = "unix", unix)]
pub use unix::UnixListener;
}
#[cfg(any(feature = "rustls", feature = "native-tls", feature = "openssl"))]
pub trait IntoConfigStream<C> {
type Stream: futures_util::Stream<Item = C> + Send + 'static;
fn into_stream(self) -> Self::Stream;
}
pub struct Accepted<C, S>
where
C: Coupler<Stream = S>,
S: Send + 'static,
{
pub coupler: C,
pub stream: S,
pub fusewire: Option<ArcFusewire>,
pub local_addr: SocketAddr,
pub remote_addr: SocketAddr,
pub http_scheme: Scheme,
}
impl<C, S> Debug for Accepted<C, S>
where
C: Coupler<Stream = S>,
S: Send + 'static,
{
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
f.debug_struct("Accepted")
.field("local_addr", &self.local_addr)
.field("remote_addr", &self.remote_addr)
.field("http_scheme", &self.http_scheme)
.finish()
}
}
impl<C, S> Accepted<C, S>
where
C: Coupler<Stream = S>,
S: Send + 'static,
{
#[inline]
#[doc(hidden)]
pub fn map_into<TC, TS>(
self,
coupler_fn: impl FnOnce(C) -> TC,
stream_fn: impl FnOnce(S) -> TS,
) -> Accepted<TC, TS>
where
TC: Coupler<Stream = TS>,
TS: Send + 'static,
{
let Self {
coupler,
stream,
fusewire,
local_addr,
remote_addr,
http_scheme,
} = self;
Accepted {
coupler: coupler_fn(coupler),
stream: stream_fn(stream),
fusewire,
local_addr,
remote_addr,
http_scheme,
}
}
}
pub trait Acceptor: Send {
type Coupler: Coupler<Stream = Self::Stream> + Unpin + Send + 'static;
type Stream: Unpin + Send + 'static;
fn holdings(&self) -> &[Holding];
fn accept(
&mut self,
fuse_factory: Option<ArcFuseFactory>,
) -> impl Future<Output = IoResult<Accepted<Self::Coupler, Self::Stream>>> + Send;
}
#[derive(Clone, Debug)]
pub struct Holding {
pub local_addr: SocketAddr,
pub http_versions: Vec<Version>,
pub http_scheme: Scheme,
}
impl Display for Holding {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
write!(
f,
"{:?} on {}://{}",
self.http_versions,
self.http_scheme,
self.local_addr.to_string().trim_start_matches("socket://")
)
}
}
pub trait Coupler: Send {
type Stream: Send + 'static;
fn couple(
&self,
stream: Self::Stream,
handler: HyperHandler,
builder: Arc<HttpBuilder>,
graceful_stop_token: Option<CancellationToken>,
) -> BoxFuture<'static, IoResult<()>>;
}
pub trait Listener: Send {
type Acceptor: Acceptor;
fn bind(self) -> impl Future<Output = Self::Acceptor> + Send
where
Self: Sized + Send + 'static,
{
async move { self.try_bind().await.expect("bind failed") }.boxed()
}
fn try_bind(self) -> impl Future<Output = crate::Result<Self::Acceptor>> + Send;
#[inline]
fn join<T>(self, other: T) -> JoinedListener<Self, T>
where
Self: Sized + Send,
{
JoinedListener::new(self, other)
}
}
pub struct DynStream {
reader: Box<dyn AsyncRead + Send + Unpin + 'static>,
writer: Box<dyn AsyncWrite + Send + Unpin + 'static>,
}
impl DynStream {
fn new(stream: impl AsyncRead + AsyncWrite + Send + 'static) -> Self {
let (reader, writer) = tokio::io::split(stream);
Self {
reader: Box::new(reader),
writer: Box::new(writer),
}
}
}
impl Debug for DynStream {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
f.debug_struct("DynStream").finish()
}
}
impl AsyncRead for DynStream {
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<IoResult<()>> {
let this = &mut *self;
Pin::new(&mut this.reader).poll_read(cx, buf)
}
}
impl AsyncWrite for DynStream {
fn poll_write(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<IoResult<usize>> {
let this = &mut *self;
Pin::new(&mut this.writer).poll_write(cx, buf)
}
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<IoResult<()>> {
let this = &mut *self;
Pin::new(&mut this.writer).poll_flush(cx)
}
fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<IoResult<()>> {
let this = &mut *self;
Pin::new(&mut this.writer).poll_shutdown(cx)
}
}