use crate::net::Reactor;
use crate::{executor, Body, Request, Response};
use std::convert::Infallible;
use std::future::Future;
use std::io;
use std::net::{SocketAddr, TcpListener, ToSocketAddrs};
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::Duration;
use hyper::rt::Executor;
use hyper_util::server::conn::auto::Builder;
/// Builder-style configuration for an HTTP server.
///
/// A defaulted value has no listener and no overrides: every `Option` is
/// `None` and every flag is `false`. Options that are left unset are simply
/// not applied to the connection builder when the server is configured.
#[derive(Default)]
pub struct Server {
    /// Listening socket; populated by `Server::bind`, `None` otherwise.
    listener: Option<TcpListener>,

    // --- HTTP/1 ---------------------------------------------------------
    // Each `Some` value below is forwarded to the option of the same name
    // (minus the `http1_` prefix) on the connection builder's HTTP/1 settings.
    /// Override for the HTTP/1 `keep_alive` option.
    http1_keep_alive: Option<bool>,
    /// Override for the HTTP/1 `half_close` option.
    http1_half_close: Option<bool>,
    /// Override for the HTTP/1 `max_buf_size` option.
    http1_max_buf_size: Option<usize>,
    /// Override for the HTTP/1 `pipeline_flush` option.
    http1_pipeline_flush: Option<bool>,
    /// Override for the HTTP/1 `writev` option.
    http1_writev: Option<bool>,
    /// Override for the HTTP/1 `title_case_headers` option.
    http1_title_case_headers: Option<bool>,
    /// Override for the HTTP/1 `preserve_header_case` option.
    http1_preserve_header_case: Option<bool>,
    /// When `true`, the connection builder is switched to `http1_only()`.
    http1_only: bool,

    // --- HTTP/2 (only compiled with the `http2` feature) ------------------
    // Same forwarding scheme as above, targeting the builder's HTTP/2 settings.
    /// When `true`, the connection builder is switched to `http2_only()`.
    #[cfg(feature = "http2")]
    http2_only: bool,
    /// Override for the HTTP/2 `initial_stream_window_size` option.
    #[cfg(feature = "http2")]
    http2_initial_stream_window_size: Option<u32>,
    /// When `true`, `enable_connect_protocol()` is called on the HTTP/2 settings.
    #[cfg(feature = "http2")]
    http2_enable_connect_protocol: bool,
    /// Override for the HTTP/2 `initial_connection_window_size` option.
    #[cfg(feature = "http2")]
    http2_initial_connection_window_size: Option<u32>,
    /// Override for the HTTP/2 `adaptive_window` option.
    #[cfg(feature = "http2")]
    http2_adaptive_window: Option<bool>,
    /// Override for the HTTP/2 `max_frame_size` option.
    #[cfg(feature = "http2")]
    http2_max_frame_size: Option<u32>,
    /// Override for the HTTP/2 `max_concurrent_streams` option.
    #[cfg(feature = "http2")]
    http2_max_concurrent_streams: Option<u32>,
    /// Override for the HTTP/2 `max_send_buf_size` option.
    #[cfg(feature = "http2")]
    http2_max_send_buf_size: Option<usize>,
    /// Override for the HTTP/2 `max_header_list_size` option.
    #[cfg(feature = "http2")]
    http2_max_header_list_size: Option<u32>,

    // --- Executor ---------------------------------------------------------
    /// Second argument given to `executor::Executor::new` when serving.
    worker_keep_alive: Option<Duration>,
    /// First argument given to `executor::Executor::new` when serving.
    max_workers: Option<usize>,
}
/// Per-connection metadata handed to a service alongside each request.
#[derive(Debug, Clone)]
pub struct ConnectionInfo {
    /// Remote address of the connection, or `None` when it was not available
    /// at the time the connection was accepted.
    peer_addr: Option<SocketAddr>,
}

impl ConnectionInfo {
    /// Returns the remote socket address of the connection, if one was recorded.
    pub fn peer_addr(&self) -> Option<SocketAddr> {
        // `Option<SocketAddr>` is `Copy`, so this hands out a copy of the field.
        let Self { peer_addr } = self;
        *peer_addr
    }
}
/// A synchronous request handler.
///
/// Implementors must be `Send + 'static`. `call` takes `&self`, receives the
/// request together with information about the connection it arrived on, and
/// returns the response directly rather than a future.
pub trait Service: Send + 'static {
    /// Produces the [`Response`] for `request`.
    ///
    /// `info` describes the connection the request was received on.
    fn call(&self, request: Request, info: ConnectionInfo) -> Response;
}
/// Lets any `Send + 'static` function or closure of the shape
/// `Fn(Request, ConnectionInfo) -> Response` be used as a [`Service`].
impl<F> Service for F
where
    F: Fn(Request, ConnectionInfo) -> Response + Send + 'static,
{
    /// Invokes the wrapped function with the request and connection info.
    fn call(&self, request: Request, info: ConnectionInfo) -> Response {
        let handler: &F = self;
        handler(request, info)
    }
}
/// A shared, reference-counted service is itself a [`Service`]; calls are
/// forwarded to the value inside the `Arc`.
impl<S> Service for Arc<S>
where
    S: Service + Sync,
{
    /// Delegates to the inner service's `call`.
    fn call(&self, request: Request, info: ConnectionInfo) -> Response {
        // Name the inner impl explicitly so the call cannot resolve back to
        // this `Arc<S>` implementation.
        <S as Service>::call(&**self, request, info)
    }
}
impl Server {
pub fn bind(addr: impl ToSocketAddrs) -> Server {
let listener = std::net::TcpListener::bind(addr).expect("failed to bind listener");
Server {
listener: Some(listener),
..Default::default()
}
}
pub fn serve<S>(self, service: S) -> io::Result<()>
where
S: Service + Sync,
{
self.serve_clone(Arc::new(service))
}
pub fn serve_clone<S>(self, service: S) -> io::Result<()>
where
S: Service + Clone,
{
let executor = executor::Executor::new(self.max_workers, self.worker_keep_alive);
let http = self.configure(Builder::new(executor.clone()));
let reactor = Reactor::new().expect("failed to create reactor");
for conn in self.listener.unwrap().incoming() {
let conn = conn.and_then(|stream| reactor.register(stream))?;
let service = service.clone();
let builder = http.clone();
let info = ConnectionInfo {
peer_addr: conn.sys.peer_addr().ok(),
};
executor.execute(async move {
if let Err(err) = builder
.clone()
.serve_connection(conn, service::HyperService(service, info))
.await
{
log::error!("error serving connection: {}", err);
}
});
}
Ok(())
}
pub fn max_workers(mut self, val: usize) -> Self {
self.max_workers = Some(val);
self
}
pub fn worker_keep_alive(mut self, val: Duration) -> Self {
self.worker_keep_alive = Some(val);
self
}
pub fn http1_keep_alive(mut self, val: bool) -> Self {
self.http1_keep_alive = Some(val);
self
}
pub fn http1_half_close(mut self, val: bool) -> Self {
self.http1_half_close = Some(val);
self
}
pub fn http1_max_buf_size(mut self, val: usize) -> Self {
self.http1_max_buf_size = Some(val);
self
}
pub fn http1_pipeline_flush(mut self, val: bool) -> Self {
self.http1_pipeline_flush = Some(val);
self
}
pub fn http1_writev(mut self, enabled: bool) -> Self {
self.http1_writev = Some(enabled);
self
}
pub fn http1_title_case_headers(mut self, val: bool) -> Self {
self.http1_title_case_headers = Some(val);
self
}
pub fn http1_preserve_header_case(mut self, val: bool) -> Self {
self.http1_preserve_header_case = Some(val);
self
}
pub fn http1_only(mut self) -> Self {
self.http1_only = true;
self
}
#[cfg(feature = "http2")]
pub fn http2_only(mut self) -> Self {
self.http2_only = true;
self
}
#[cfg(feature = "http2")]
pub fn http2_initial_stream_window_size(mut self, sz: impl Into<Option<u32>>) -> Self {
self.http2_initial_stream_window_size = sz.into();
self
}
#[cfg(feature = "http2")]
pub fn http2_enable_connect_protocol(mut self) -> Self {
self.http2_enable_connect_protocol = true;
self
}
#[cfg(feature = "http2")]
pub fn http2_initial_connection_window_size(mut self, sz: impl Into<Option<u32>>) -> Self {
self.http2_initial_connection_window_size = sz.into();
self
}
#[cfg(feature = "http2")]
pub fn http2_adaptive_window(mut self, enabled: bool) -> Self {
self.http2_adaptive_window = Some(enabled);
self
}
#[cfg(feature = "http2")]
pub fn http2_max_frame_size(mut self, sz: impl Into<Option<u32>>) -> Self {
self.http2_max_frame_size = sz.into();
self
}
#[cfg(feature = "http2")]
pub fn http2_max_concurrent_streams(mut self, max: impl Into<Option<u32>>) -> Self {
self.http2_max_concurrent_streams = max.into();
self
}
#[cfg(feature = "http2")]
pub fn http2_max_send_buf_size(mut self, max: usize) -> Self {
self.http2_max_send_buf_size = Some(max);
self
}
pub fn local_addr(&self) -> io::Result<SocketAddr> {
self.listener
.as_ref()
.ok_or_else(|| io::Error::new(io::ErrorKind::Other, "Server::bind not called yet"))?
.local_addr()
}
fn configure<T>(&self, mut http: Builder<T>) -> Builder<T> {
macro_rules! configure {
($self:ident.$option:ident => $other:ident.$builder:ident.$other_option:ident) => {{
if let Some(val) = $self.$option {
$other.$builder().$other_option(val);
}
}};
($self:ident.$option:ident => $other:ident.$builder:ident.$other_option:ident()) => {{
if $self.$option {
$other.$builder().$other_option();
}
}};
}
if self.http1_only {
http = http.http1_only();
}
#[cfg(feature = "http2")]
if self.http2_only {
http = http.http2_only();
}
configure!(self.http1_keep_alive => http.http1.keep_alive);
configure!(self.http1_half_close => http.http1.half_close);
configure!(self.http1_max_buf_size => http.http1.max_buf_size);
configure!(self.http1_pipeline_flush => http.http1.pipeline_flush);
configure!(self.http1_writev => http.http1.writev);
configure!(self.http1_title_case_headers => http.http1.title_case_headers);
configure!(self.http1_preserve_header_case => http.http1.preserve_header_case);
#[cfg(feature = "http2")]
{
configure!(self.http2_initial_stream_window_size => http.http2.initial_stream_window_size);
configure!(self.http2_enable_connect_protocol => http.http2.enable_connect_protocol());
configure!(self.http2_initial_connection_window_size => http.http2.initial_connection_window_size);
configure!(self.http2_adaptive_window => http.http2.adaptive_window);
configure!(self.http2_max_frame_size => http.http2.max_frame_size);
configure!(self.http2_max_concurrent_streams => http.http2.max_concurrent_streams);
configure!(self.http2_max_send_buf_size => http.http2.max_send_buf_size);
configure!(self.http2_max_header_list_size => http.http2.max_header_list_size);
}
http
}
}
/// Glue between this crate's synchronous [`Service`] trait and
/// `hyper::service::Service`.
mod service {
    use super::*;
    use http_body_util::combinators::UnsyncBoxBody;
    use http_body_util::BodyExt;

    /// Request type hyper hands to a service for an incoming connection.
    type HyperRequest = hyper::Request<hyper::body::Incoming>;

    /// Pairs a [`Service`] with the [`ConnectionInfo`] of the connection it
    /// serves so that it can be given to hyper.
    pub struct HyperService<S>(pub S, pub ConnectionInfo);

    impl<S> hyper::service::Service<HyperRequest> for HyperService<S>
    where
        S: Service + Clone,
    {
        type Response = Response;
        type Error = Infallible;
        type Future = Lazy<S>;

        /// Packages the request with clones of the service and connection
        /// info; the service itself is not invoked until the returned future
        /// is polled.
        fn call(&self, request: HyperRequest) -> Self::Future {
            let HyperService(handler, conn_info) = self;
            Lazy {
                request: Some(request),
                service: handler.clone(),
                info: conn_info.clone(),
            }
        }
    }

    /// Future that runs the service on its first poll and completes
    /// immediately with the service's response.
    pub struct Lazy<S> {
        service: S,
        /// The pending request; taken (left as `None`) by the first poll.
        request: Option<HyperRequest>,
        info: ConnectionInfo,
    }

    // `Lazy` is declared `Unpin` unconditionally, which is what allows `poll`
    // to obtain a plain `&mut Self` from the pinned reference.
    impl<S> Unpin for Lazy<S> {}

    impl<S> Future for Lazy<S>
    where
        S: Service,
    {
        type Output = Result<Response, Infallible>;

        /// Converts the hyper request into the crate's [`Request`], calls the
        /// service synchronously and returns `Poll::Ready` with its response.
        ///
        /// # Panics
        ///
        /// Panics if polled again after it has already completed, since the
        /// request has been taken by then.
        fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
            let this = self.get_mut();

            let incoming = this.request.take().unwrap();
            let (head, payload) = incoming.into_parts();

            // Body errors are converted to `io::Error` before the body is boxed.
            let payload = payload.map_err(io::Error::other);
            let request = Request::from_parts(head, Body(UnsyncBoxBody::new(payload)));

            let response = this.service.call(request, this.info.clone());
            Poll::Ready(Ok(response))
        }
    }
}