pub mod middleware;
pub use self::imp::{start, ServerConfig, ServerError, ServerResult, ServiceBuilder};
mod imp {
use std::error;
use std::fmt;
use std::net::{IpAddr, SocketAddr};
use std::sync::Arc;
use failure;
use failure::Fallible;
use futures::{future, Future, Stream};
use http::{Request, Response};
use hyper::body::{Body, Payload};
use hyper::server as hyper_server;
use hyper::server::conn::AddrIncoming;
use hyper::service as hyper_service;
use tokio::io::{AsyncRead, AsyncWrite};
use tokio::runtime::Runtime;
use tower_service;
use tower_service::NewService;
#[cfg(feature = "tower-web")]
use tower_web;
use app::App;
use endpoint::OutputEndpoint;
use rt::{with_set_runtime_mode, RuntimeMode};
#[cfg(feature = "tower-web")]
use super::middleware::TowerWebMiddleware;
use super::middleware::{Chain, Middleware};
/// Error type returned by the server entry points of this module.
///
/// It wraps the underlying `failure::Error`; the wrapped value is included in
/// the `Display` output of this type.
#[derive(Debug)]
pub struct ServerError {
// The underlying cause, produced while building or running the server.
inner: failure::Error,
}
impl fmt::Display for ServerError {
    /// Formats the error as a fixed prefix followed by the `Display` output of
    /// the wrapped `failure::Error`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let cause = &self.inner;
        f.write_fmt(format_args!("failed to start the server: {}", cause))
    }
}
/// Makes `ServerError` usable wherever a `std::error::Error` is expected.
impl error::Error for ServerError {
/// Returns a fixed, cause-independent description; the wrapped error is only
/// visible through the `Display` implementation.
fn description(&self) -> &str {
"failed to start the server"
}
}
/// Shorthand for a `Result` whose error type is `ServerError`.
pub type ServerResult<T> = Result<T, ServerError>;
/// Creates a `ServiceBuilder` for the given endpoint.
///
/// The endpoint is wrapped in an `App`, which then becomes the service
/// factory held by the returned builder.
pub fn start<E>(endpoint: E) -> ServiceBuilder<App<E>>
where
    for<'a> E: OutputEndpoint<'a> + 'static,
{
    let app = App::new(endpoint);
    ServiceBuilder::new(app)
}
/// Builder that holds a service factory, lets middleware be chained onto it,
/// and finally serves it over HTTP.
#[derive(Debug)]
pub struct ServiceBuilder<S> {
// The wrapped service factory; the impl blocks require it to be a `NewService`.
new_service: S,
}
impl<S> ServiceBuilder<S>
where
    S: NewService,
{
    /// Creates a builder around the given service factory.
    pub fn new(new_service: S) -> Self {
        Self { new_service }
    }

    /// Returns a new builder whose service factory is the current one chained
    /// with `middleware` through `Chain`.
    pub fn with_middleware<M>(self, middleware: M) -> ServiceBuilder<Chain<S, M>>
    where
        M: Middleware<S::Service> + Clone,
    {
        let Self { new_service: inner } = self;
        let chained = Chain::new(inner, middleware);
        ServiceBuilder {
            new_service: chained,
        }
    }

    /// Same as `with_middleware`, but accepts a `tower_web` middleware, which
    /// is first wrapped in `TowerWebMiddleware`.
    ///
    /// Only available when the `tower-web` feature is enabled.
    #[cfg(feature = "tower-web")]
    pub fn with_tower_middleware<M>(
        self,
        middleware: M,
    ) -> ServiceBuilder<Chain<S, TowerWebMiddleware<M>>>
    where
        M: tower_web::middleware::Middleware<S::Service>,
    {
        let Self { new_service: inner } = self;
        let adapted = TowerWebMiddleware::new(middleware);
        ServiceBuilder {
            new_service: Chain::new(inner, adapted),
        }
    }
}
/// `ServiceBuilder` is itself a `NewService`: every associated type and the
/// `new_service` call are forwarded to the wrapped factory unchanged.
impl<S> NewService for ServiceBuilder<S>
where
    S: NewService,
{
    type Request = S::Request;
    type Response = S::Response;
    type Error = S::Error;
    type Service = S::Service;
    type InitError = S::InitError;
    type Future = S::Future;

    /// Delegates to the wrapped factory.
    #[inline]
    fn new_service(&self) -> Self::Future {
        // Qualified call: the field and the trait method share the same name.
        let inner = &self.new_service;
        S::new_service(inner)
    }
}
impl<S, Bd> ServiceBuilder<S>
where
    S: NewService<Request = Request<Body>, Response = Response<Bd>> + Send + Sync + 'static,
    S::Error: Into<Box<dyn error::Error + Send + Sync + 'static>>,
    S::InitError: Into<Box<dyn error::Error + Send + Sync + 'static>>,
    S::Service: Send,
    S::Future: Send + 'static,
    <S::Service as tower_service::Service>::Future: Send + 'static,
    Bd: Payload,
{
    /// Serves this builder using the given configuration.
    ///
    /// Equivalent to `serve_with_graceful_shutdown` with `future::empty()` as
    /// the shutdown signal.
    pub fn serve(self, config: impl ServerConfig) -> ServerResult<()> {
        let never = future::empty::<(), ()>();
        self.serve_with_graceful_shutdown(config, never)
    }

    /// Builds a `Server` from `server_config` and runs this builder on it,
    /// passing `signal` through as the graceful-shutdown signal.
    ///
    /// # Errors
    ///
    /// A failure from building the server or from running it is wrapped in a
    /// `ServerError`.
    pub fn serve_with_graceful_shutdown(
        self,
        server_config: impl ServerConfig,
        signal: impl Future<Item = ()> + Send + 'static,
    ) -> ServerResult<()> {
        let server = match server_config.build() {
            Ok(server) => server,
            Err(inner) => return Err(ServerError { inner }),
        };
        match server.serve_with_graceful_shutdown(self, signal) {
            Ok(()) => Ok(()),
            Err(inner) => Err(ServerError { inner }),
        }
    }
}
/// A bound-but-not-yet-running HTTP server: a hyper server builder paired with
/// the Tokio runtime the server future is spawned onto.
#[derive(Debug)]
pub struct Server<I> {
// hyper builder over the incoming-connection stream `I`.
builder: hyper_server::Builder<I>,
// Runtime that executes the server future.
rt: Runtime,
}
impl<I> Server<I>
where
I: Stream + Send + 'static,
I::Item: AsyncRead + AsyncWrite + Send + 'static,
I::Error: Into<Box<dyn error::Error + Send + Sync + 'static>>,
{
fn serve_with_graceful_shutdown<S, Bd>(
self,
new_service: S,
signal: impl Future<Item = ()> + Send + 'static,
) -> Fallible<()>
where
S: NewService<Request = Request<Body>, Response = Response<Bd>> + Send + Sync + 'static,
S::Error: Into<Box<dyn error::Error + Send + Sync + 'static>>,
S::InitError: Into<Box<dyn error::Error + Send + Sync + 'static>>,
S::Service: Send,
S::Future: Send + 'static,
<S::Service as tower_service::Service>::Future: Send + 'static,
Bd: Payload,
{
let Self { builder, mut rt } = self;
let new_service = Arc::new(new_service);
let mut server = builder
.serve(NewHttpService(new_service.clone()))
.with_graceful_shutdown(signal)
.map_err(|err| error!("server error: {}", err));
let server = future::poll_fn(move || {
with_set_runtime_mode(RuntimeMode::ThreadPool, || server.poll())
});
rt.spawn(server);
rt.shutdown_on_idle().wait().unwrap();
Ok(())
}
}
/// Public marker trait for values accepted as a server configuration by
/// `ServiceBuilder::serve` and `ServiceBuilder::serve_with_graceful_shutdown`.
///
/// It declares no items of its own; the actual work is done by its supertrait
/// `ServerConfigImpl`, which is not in the `pub use` list at the top of the file.
pub trait ServerConfig: ServerConfigImpl {}
// Marker impls, one per type that implements `ServerConfigImpl` in this module.
impl<'a> ServerConfig for &'a str {}
impl ServerConfig for String {}
impl<I: Into<IpAddr>> ServerConfig for (I, u16) {}
impl ServerConfig for SocketAddr {}
impl<'a> ServerConfig for &'a SocketAddr {}
/// Implementation half of `ServerConfig`: turns a configuration value into a
/// `Server` over some stream of incoming connections.
pub trait ServerConfigImpl {
/// Connection type yielded by `Incoming`.
type Item: AsyncRead + AsyncWrite + Send + 'static;
/// Error type yielded by `Incoming`.
type Error: Into<Box<dyn error::Error + Send + Sync + 'static>>;
/// Stream of incoming connections the built server accepts from.
type Incoming: Stream<Item = Self::Item, Error = Self::Error> + Send + 'static;
/// Consumes the configuration and builds a `Server` over `Self::Incoming`.
fn build(self) -> Fallible<Server<Self::Incoming>>;
}
/// A string slice is parsed as a `SocketAddr` and then handled by the
/// `&SocketAddr` implementation.
impl<'a> ServerConfigImpl for &'a str {
    type Item = <AddrIncoming as Stream>::Item;
    type Error = <AddrIncoming as Stream>::Error;
    type Incoming = AddrIncoming;

    /// Fails if the string is not a valid socket address, or if the
    /// `&SocketAddr` implementation fails.
    fn build(self) -> Fallible<Server<Self::Incoming>> {
        let addr: SocketAddr = self.parse()?;
        <&SocketAddr as ServerConfigImpl>::build(&addr)
    }
}
/// An owned string is handled exactly like the `&str` it borrows as.
impl ServerConfigImpl for String {
    type Item = <AddrIncoming as Stream>::Item;
    type Error = <AddrIncoming as Stream>::Error;
    type Incoming = AddrIncoming;

    /// Delegates to the `&str` implementation.
    fn build(self) -> Fallible<Server<Self::Incoming>> {
        let text: &str = self.as_str();
        <&str as ServerConfigImpl>::build(text)
    }
}
/// An `(address, port)` pair is combined into a `SocketAddr` and then handled
/// by the `SocketAddr` implementation.
impl<I: Into<IpAddr>> ServerConfigImpl for (I, u16) {
    type Item = <AddrIncoming as Stream>::Item;
    type Error = <AddrIncoming as Stream>::Error;
    type Incoming = AddrIncoming;

    /// Delegates to the `SocketAddr` implementation.
    fn build(self) -> Fallible<Server<Self::Incoming>> {
        let (ip, port) = self;
        let addr = SocketAddr::new(ip.into(), port);
        <SocketAddr as ServerConfigImpl>::build(addr)
    }
}
/// An owned `SocketAddr` is handled by borrowing it and using the
/// `&SocketAddr` implementation.
impl ServerConfigImpl for SocketAddr {
    type Item = <AddrIncoming as Stream>::Item;
    type Error = <AddrIncoming as Stream>::Error;
    type Incoming = AddrIncoming;

    /// Delegates to the `&SocketAddr` implementation.
    fn build(self) -> Fallible<Server<Self::Incoming>> {
        let addr = &self;
        <&SocketAddr as ServerConfigImpl>::build(addr)
    }
}
/// The base implementation that all other configurations in this module
/// delegate to: binds a hyper server to the address and creates a Tokio runtime.
impl<'a> ServerConfigImpl for &'a SocketAddr {
    type Item = <AddrIncoming as Stream>::Item;
    type Error = <AddrIncoming as Stream>::Error;
    type Incoming = AddrIncoming;

    /// Fails if hyper's `try_bind` fails for the address or the runtime
    /// cannot be created.
    fn build(self) -> Fallible<Server<Self::Incoming>> {
        // Field expressions are evaluated in the order written: bind first,
        // then create the runtime.
        Ok(Server {
            builder: hyper_server::Server::try_bind(self)?,
            rt: Runtime::new()?,
        })
    }
}
/// Adapter that lets a `tower_service::NewService` be used where hyper expects
/// its own `NewService` trait (see the `hyper_service::NewService` impl).
#[derive(Debug)]
struct NewHttpService<S>(S);
/// Implements hyper's `NewService` on top of a tower `NewService`: each
/// service produced by the wrapped factory is wrapped in `HttpService`.
impl<S, ReqBody, ResBody> hyper_service::NewService for NewHttpService<S>
where
    S: tower_service::NewService<Request = Request<ReqBody>, Response = Response<ResBody>>,
    ReqBody: Payload,
    ResBody: Payload,
    S::Error: Into<Box<dyn error::Error + Send + Sync + 'static>>,
    S::InitError: Into<Box<dyn error::Error + Send + Sync + 'static>>,
{
    type ReqBody = ReqBody;
    type ResBody = ResBody;
    type Error = S::Error;
    type Service = HttpService<S::Service>;
    type InitError = S::InitError;
    #[cfg_attr(feature = "cargo-clippy", allow(type_complexity))]
    type Future = future::Map<S::Future, fn(S::Service) -> HttpService<S::Service>>;

    /// Asks the wrapped factory for a new service and maps the result into an
    /// `HttpService`.
    #[inline]
    fn new_service(&self) -> Self::Future {
        // Bind the tuple-struct constructor as a plain fn pointer so the
        // mapped future has the nameable type declared in `Self::Future`.
        let wrap: fn(S::Service) -> HttpService<S::Service> = HttpService;
        let pending = self.0.new_service();
        pending.map(wrap)
    }
}
/// Adapter that lets a `tower_service::Service` be used where hyper expects
/// its own `Service` trait (see the `hyper_service::Service` impl).
#[derive(Debug)]
struct HttpService<S>(S);
/// Implements hyper's `Service` by forwarding each request to the wrapped
/// tower service; error and future types are passed through unchanged.
impl<S, ReqBody, ResBody> hyper_service::Service for HttpService<S>
where
    S: tower_service::Service<Request = Request<ReqBody>, Response = Response<ResBody>>,
    ReqBody: Payload,
    ResBody: Payload,
    S::Error: Into<Box<dyn error::Error + Send + Sync + 'static>>,
{
    type ReqBody = ReqBody;
    type ResBody = ResBody;
    type Error = S::Error;
    type Future = S::Future;

    /// Forwards `request` to the wrapped service's `call`.
    #[inline]
    fn call(&mut self, request: Request<ReqBody>) -> Self::Future {
        let inner = &mut self.0;
        S::call(inner, request)
    }
}
}