#![doc(
html_root_url = "https://docs.rs/spirit-hyper/0.6.0/spirit_hyper/",
test(attr(deny(warnings)))
)]
#![forbid(unsafe_code)]
#![warn(missing_docs)]
use std::error::Error;
use std::fmt::Debug;
use std::io::Error as IoError;
use err_context::prelude::*;
use futures::sync::oneshot::{self, Receiver, Sender};
use futures::{Async, Future, Poll, Stream};
use hyper::body::Payload;
use hyper::server::{Builder, Server};
use hyper::service::{MakeServiceRef, Service};
use hyper::Body;
use log::debug;
use serde::{Deserialize, Serialize};
use spirit::fragment::driver::{CacheSimilar, Comparable, Comparison};
use spirit::fragment::{Fragment, Stackable, Transformation};
use spirit::AnyError;
use spirit::Empty;
use spirit_tokio::installer::FutureInstaller;
use spirit_tokio::net::limits::WithLimits;
use spirit_tokio::net::IntoIncoming;
use spirit_tokio::TcpListen;
#[cfg(feature = "cfg-help")]
use structdoc::StructDoc;
use tokio::io::{AsyncRead, AsyncWrite};
fn default_on() -> bool {
true
}
#[derive(Copy, Clone, Debug, Deserialize, Eq, PartialEq, Ord, PartialOrd, Hash, Serialize)]
#[cfg_attr(feature = "cfg-help", derive(StructDoc))]
#[serde(rename_all = "kebab-case")]
enum HttpMode {
Both,
#[serde(rename = "http1-only")]
Http1Only,
#[serde(rename = "http2-only")]
Http2Only,
}
impl Default for HttpMode {
fn default() -> Self {
HttpMode::Both
}
}
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Ord, PartialOrd, Hash, Serialize)]
#[cfg_attr(feature = "cfg-help", derive(StructDoc))]
#[serde(rename_all = "kebab-case")]
struct HyperCfg {
#[serde(default = "default_on")]
http1_keepalive: bool,
#[serde(default = "default_on")]
http1_writev: bool,
#[serde(default = "default_on")]
http1_half_close: bool,
#[serde(default)]
http_mode: HttpMode,
}
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Ord, PartialOrd, Hash, Serialize)]
#[cfg_attr(feature = "cfg-help", derive(StructDoc))]
#[serde(rename_all = "kebab-case")]
pub struct HyperServer<Transport> {
#[serde(flatten)]
pub transport: Transport,
#[serde(flatten)]
inner: HyperCfg,
}
impl<Transport: Default> Default for HyperServer<Transport> {
fn default() -> Self {
HyperServer {
transport: Transport::default(),
inner: HyperCfg {
http1_keepalive: true,
http1_writev: true,
http1_half_close: true,
http_mode: HttpMode::default(),
},
}
}
}
impl<Transport: Comparable> Comparable for HyperServer<Transport> {
fn compare(&self, other: &Self) -> Comparison {
let transport_cmp = self.transport.compare(&other.transport);
if transport_cmp == Comparison::Same && self.inner != other.inner {
Comparison::Similar
} else {
transport_cmp
}
}
}
impl<Transport> Fragment for HyperServer<Transport>
where
Transport: Fragment + Debug + Clone + Comparable,
Transport::Resource: IntoIncoming,
{
type Driver = CacheSimilar<Self>;
type Installer = ();
type Seed = Transport::Seed;
type Resource = Builder<<<Transport as Fragment>::Resource as IntoIncoming>::Incoming>;
fn make_seed(&self, name: &'static str) -> Result<Self::Seed, AnyError> {
self.transport.make_seed(name)
}
fn make_resource(
&self,
seed: &mut Self::Seed,
name: &'static str,
) -> Result<Self::Resource, AnyError> {
debug!("Creating HTTP server {}", name);
let (h1_only, h2_only) = match self.inner.http_mode {
HttpMode::Both => (false, false),
HttpMode::Http1Only => (true, false),
HttpMode::Http2Only => (false, true),
};
let transport = self.transport.make_resource(seed, name)?;
let builder = Server::builder(transport.into_incoming())
.http1_keepalive(self.inner.http1_keepalive)
.http1_writev(self.inner.http1_writev)
.http1_half_close(self.inner.http1_half_close)
.http1_only(h1_only)
.http2_only(h2_only);
Ok(builder)
}
}
impl<Transport> Stackable for HyperServer<Transport> where Transport: Stackable {}
pub type HttpServer<ExtraCfg = Empty> = HyperServer<WithLimits<TcpListen<ExtraCfg>>>;
struct ActivateInner<Transport, MS> {
server: Server<Transport, MS>,
receiver: Receiver<()>,
}
pub struct Activate<Transport, MS> {
inner: Option<ActivateInner<Transport, MS>>,
sender: Option<Sender<()>>,
name: &'static str,
}
impl<Transport, MS> Drop for Activate<Transport, MS> {
fn drop(&mut self) {
let _ = self.sender.take().expect("Dropped multiple times").send(());
}
}
impl<Transport, MS, B> Future for Activate<Transport, MS>
where
Transport: Stream<Error = IoError> + Send + Sync + 'static,
Transport::Item: AsyncRead + AsyncWrite + Send + Sync,
MS: MakeServiceRef<Transport::Item, ReqBody = Body, ResBody = B> + Send + 'static,
MS::Error: Into<Box<dyn Error + Send + Sync>>,
MS::Future: Send + 'static,
<MS::Future as Future>::Error: Error + Send + Sync,
MS::Service: Send + 'static,
<MS::Service as Service>::Future: Send,
B: Payload,
{
type Item = ();
type Error = ();
fn poll(&mut self) -> Poll<(), ()> {
if let Some(inner) = self.inner.take() {
let name = self.name;
let server = inner
.server
.with_graceful_shutdown(inner.receiver)
.map_err(move |e| {
let e = e.context(format!("HTTP server {} failed", name));
spirit::log_error!(multi Error, e.into());
});
tokio::spawn(server);
}
Ok(Async::NotReady)
}
}
pub struct BuildServer<BS>(pub BS);
impl<Transport, Inst, BS, Incoming, S, B>
Transformation<Builder<Incoming>, Inst, HyperServer<Transport>> for BuildServer<BS>
where
Transport: Fragment + 'static,
Transport::Resource: IntoIncoming<Incoming = Incoming, Connection = Incoming::Item>,
Incoming: Stream<Error = IoError> + Send + Sync + 'static,
Incoming::Item: AsyncRead + AsyncWrite + Send + Sync + 'static,
BS: Fn(Builder<Incoming>, &HyperServer<Transport>, &'static str) -> Server<Incoming, S>,
S: MakeServiceRef<Incoming::Item, ReqBody = Body, ResBody = B> + 'static,
B: Payload,
{
type OutputResource = Activate<Incoming, S>;
type OutputInstaller = FutureInstaller<Self::OutputResource>;
fn installer(&mut self, _ii: Inst, _name: &'static str) -> Self::OutputInstaller {
FutureInstaller::default()
}
fn transform(
&mut self,
builder: Builder<Incoming>,
cfg: &HyperServer<Transport>,
name: &'static str,
) -> Result<Self::OutputResource, AnyError> {
let (sender, receiver) = oneshot::channel();
let server = self.0(builder, cfg, name);
Ok(Activate {
inner: Some(ActivateInner { server, receiver }),
sender: Some(sender),
name,
})
}
}