use std::io;
use std::pin::Pin;
use crate::body::AdaptIncomingService;
use crate::bridge::io::TokioIo;
use crate::bridge::rt::TokioExecutor;
use crate::bridge::service::TowerHyperService;
pub use hyper::server::conn::http1;
pub use hyper::server::conn::http2;
use thiserror::Error;
use tokio::io::AsyncRead;
use tokio::io::AsyncWrite;
use crate::server::Protocol;
pub use acceptor::Acceptor;
#[cfg(feature = "stream")]
pub use acceptor::AcceptorCore;
pub use info::{
ConnectionWithInfo, MakeServiceConnectionInfoLayer, MakeServiceConnectionInfoService,
};
pub use stream::{Accept, AcceptExt, AcceptOne, Stream};
mod acceptor;
pub mod auto;
mod connecting;
mod info;
mod stream;
#[cfg(feature = "tls")]
pub mod tls;
#[derive(Debug, Error)]
pub enum ConnectionError {
#[error(transparent)]
Hyper(#[from] hyper::Error),
#[error("service: {0}")]
Service(#[source] Box<dyn std::error::Error + Send + Sync>),
#[error("protocol: {0}")]
Protocol(#[source] io::Error),
}
type Adapted<S> = TowerHyperService<AdaptIncomingService<S>>;
pub trait Connection {
fn graceful_shutdown(self: Pin<&mut Self>);
}
impl<S, IO> Connection for http1::UpgradeableConnection<TokioIo<IO>, Adapted<S>>
where
S: tower::Service<crate::body::Request, Response = crate::body::Response>
+ Clone
+ Send
+ 'static,
S::Future: Send + 'static,
S::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
IO: AsyncRead + AsyncWrite + Send + Unpin + 'static,
{
fn graceful_shutdown(self: Pin<&mut Self>) {
self.graceful_shutdown()
}
}
impl<S, IO> Protocol<S, IO> for http1::Builder
where
S: tower::Service<crate::body::Request, Response = crate::body::Response>
+ Clone
+ Send
+ 'static,
S::Future: Send + 'static,
S::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
IO: AsyncRead + AsyncWrite + Send + Unpin + 'static,
{
type Connection = http1::UpgradeableConnection<TokioIo<IO>, Adapted<S>>;
type Error = hyper::Error;
fn serve_connection_with_upgrades(&self, stream: IO, service: S) -> Self::Connection
where
IO: AsyncRead + AsyncWrite + Send + Unpin + 'static,
{
let conn = self.serve_connection(
TokioIo::new(stream),
TowerHyperService::new(AdaptIncomingService::new(service)),
);
conn.with_upgrades()
}
}
impl<S, IO> Connection for http2::Connection<TokioIo<IO>, Adapted<S>, TokioExecutor>
where
S: tower::Service<crate::body::Request, Response = crate::body::Response>
+ Clone
+ Send
+ 'static,
S::Future: Send + 'static,
S::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
IO: AsyncRead + AsyncWrite + Send + Unpin + 'static,
{
fn graceful_shutdown(self: Pin<&mut Self>) {
self.graceful_shutdown()
}
}
impl<S, IO> Protocol<S, IO> for http2::Builder<TokioExecutor>
where
S: tower::Service<crate::body::Request, Response = crate::body::Response>
+ Clone
+ Send
+ 'static,
S::Future: Send + 'static,
S::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
IO: AsyncRead + AsyncWrite + Send + Unpin + 'static,
{
type Connection = http2::Connection<TokioIo<IO>, Adapted<S>, TokioExecutor>;
type Error = hyper::Error;
fn serve_connection_with_upgrades(&self, stream: IO, service: S) -> Self::Connection {
self.serve_connection(
TokioIo::new(stream),
TowerHyperService::new(AdaptIncomingService::new(service)),
)
}
}