spirit_hyper/
lib.rs

1#![doc(test(attr(deny(warnings))))]
2// Our program-long snippets are more readable with main
3#![allow(clippy::needless_doctest_main)]
4#![forbid(unsafe_code)]
5#![warn(missing_docs)]
6#![cfg_attr(docsrs, feature(doc_cfg))]
7
8//! [Spirit][spirit] extension for Hyper servers.
9//!
10//! This allows having Hyper servers auto-spawned from configuration. It is possible to put them on
11//! top of arbitrary stream-style IO objects (TcpStream, UdsStream, these wrapped in SSL...).
12//!
13//! # Tokio runtime
14//!
15//! This uses the [`spirit_tokio`] crate under the hood. Similar drawback with initializing a
16//! runtime applies here too (see the [`spirit_tokio`] docs for details).
17//!
18//! # Examples
19//!
20//! ```rust
21//! use hyper::{Body, Request, Response};
22//! use serde::Deserialize;
23//! use spirit::{Empty, Pipeline, Spirit};
24//! use spirit::prelude::*;
25//! use spirit_hyper::{server_from_handler, BuildServer, HttpServer};
26//!
27//! const DEFAULT_CONFIG: &str = r#"
28//! [server]
29//! port = 2234
30//! "#;
31//!
32//! #[derive(Default, Deserialize)]
33//! struct Config {
34//!     server: HttpServer,
35//! }
36//!
37//! impl Config {
38//!     fn server(&self) -> HttpServer {
39//!         self.server.clone()
40//!     }
41//! }
42//!
43//! async fn request(_req: Request<Body>) -> Response<Body> {
44//!     Response::new(Body::from("Hello world\n"))
45//! }
46//!
47//! fn main() {
48//!     Spirit::<Empty, Config>::new()
49//!         .config_defaults(DEFAULT_CONFIG)
50//!         .with(
51//!             // Let's build a http server as configured by the user
52//!             Pipeline::new("listen")
53//!                 .extract_cfg(Config::server)
54//!                 // This is where we teach the server what it serves. It is the usual stuff from
55//!                 // hyper.
56//!                 .transform(BuildServer(server_from_handler(request)))
57//!         )
58//!         .run(|spirit| {
59//! #           let spirit = std::sync::Arc::clone(spirit);
60//! #           std::thread::spawn(move || spirit.terminate());
61//!             Ok(())
62//!         });
63//! }
64//! ```
65//!
66//! Further examples (with more flexible handling) are in the
67//! [git repository](https://github.com/vorner/spirit/tree/master/spirit-hyper/examples).
68
69use std::convert::Infallible;
70use std::fmt::Debug;
71use std::future::Future;
72use std::io::Error as IoError;
73use std::pin::Pin;
74use std::task::{Context, Poll};
75use std::time::Duration;
76
77use err_context::prelude::*;
78use err_context::AnyError;
79use hyper::body::Body;
80use hyper::server::accept::Accept as HyperAccept;
81use hyper::server::{Builder, Server};
82use hyper::service::{make_service_fn, service_fn};
83use hyper::{Error as HyperError, Request, Response};
84use log::{debug, trace};
85use pin_project::pin_project;
86use serde::{Deserialize, Serialize};
87use spirit::fragment::driver::{CacheSimilar, Comparable, Comparison};
88use spirit::fragment::{Fragment, Stackable, Transformation};
89use spirit::utils::{deserialize_opt_duration, is_default, is_true, serialize_opt_duration};
90use spirit::{log_error, Empty};
91use spirit_tokio::net::limits::WithLimits;
92use spirit_tokio::net::{Accept as SpiritAccept, TcpListen};
93use spirit_tokio::runtime::{self, ShutGuard};
94use spirit_tokio::FutureInstaller;
95#[cfg(feature = "cfg-help")]
96use structdoc::StructDoc;
97use tokio::io::{AsyncRead, AsyncWrite};
98use tokio::sync::oneshot::{self, Receiver, Sender};
99use tokio::task::JoinHandle;
100
101const KEEPALIVE_TIMEOUT: Duration = Duration::from_secs(20);
102
103fn is_default_timeout(t: &Duration) -> bool {
104    *t == KEEPALIVE_TIMEOUT
105}
106
107/// Configuration of the selected HTTP protocol version.
108#[derive(Copy, Clone, Debug, Deserialize, Eq, PartialEq, Ord, PartialOrd, Hash, Serialize)]
109#[cfg_attr(feature = "cfg-help", derive(StructDoc))]
110#[serde(rename_all = "kebab-case")]
111#[non_exhaustive]
112pub enum HttpMode {
113    /// Enable both HTTP1 and HTTP2 protocols.
114    Both,
115
116    /// Disable the HTTP2 protocol.
117    #[serde(rename = "http1-only")]
118    Http1Only,
119
120    /// Disable the HTTP1 protocol.
121    #[serde(rename = "http2-only")]
122    Http2Only,
123}
124
125impl Default for HttpMode {
126    fn default() -> Self {
127        HttpMode::Both
128    }
129}
130
131/// Configuration of Hyper HTTP servers.
132///
133/// This are the things that are extra over the transport. It doesn't contain any kind of ports or
134/// SSL certificates, these are added inside the [`HyperServer`]. This is only for configuring the
135/// HTTP protocol itself.
136#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Ord, PartialOrd, Hash, Serialize)]
137#[cfg_attr(feature = "cfg-help", derive(StructDoc))]
138#[serde(rename_all = "kebab-case", default)]
139#[non_exhaustive]
140pub struct HyperCfg {
141    /// The HTTP keepalive.
142    ///
143    /// <https://en.wikipedia.org/wiki/HTTP_persistent_connection>.
144    ///
145    /// Default is on, can be turned off.
146    #[serde(skip_serializing_if = "is_true")]
147    pub http1_keepalive: bool,
148
149    /// When a http1 client closes its write end, keep the connection open until the reply is sent.
150    ///
151    /// If set to false, if the client closes its connection, server does too.
152    #[serde(skip_serializing_if = "is_true")]
153    pub http1_half_close: bool,
154
155    /// Maximum buffer size of HTTP1.
156    #[serde(skip_serializing_if = "Option::is_none")]
157    pub http1_max_buf_size: Option<usize>,
158
159    /// Initial window size.
160    #[serde(skip_serializing_if = "Option::is_none")]
161    pub http2_initial_stream_window_size: Option<u32>,
162
163    /// Initial window size.
164    #[serde(skip_serializing_if = "Option::is_none")]
165    pub http2_initial_connection_window_size: Option<u32>,
166
167    /// Choose the window sizes dynamically at runtime.
168    ///
169    /// If turned off (the default), uses the values configured.
170    #[serde(default, skip_serializing_if = "is_default")]
171    pub http2_adaptive_window: bool,
172
173    /// Maximum number of concurrent streams.
174    ///
175    /// Defaults to no limit.
176    #[serde(skip_serializing_if = "Option::is_none")]
177    pub http2_max_concurrent_streams: Option<u32>,
178
179    /// The maximum frame size of http2.
180    #[serde(skip_serializing_if = "Option::is_none")]
181    pub http2_max_frame_size: Option<u32>,
182
183    /// How often to send keep alive/ping frames.
184    ///
185    /// Defaults to disabled.
186    #[serde(
187        deserialize_with = "deserialize_opt_duration",
188        serialize_with = "serialize_opt_duration",
189        skip_serializing_if = "Option::is_none"
190    )]
191    pub http2_keep_alive_interval: Option<Duration>,
192
193    /// Close connection if no response for ping in this time.
194    ///
195    /// Defaults to 20s.
196    #[serde(skip_serializing_if = "is_default_timeout")]
197    pub http2_keep_alive_timeout: Duration,
198
199    /// What protocols are enabled.
200    #[serde(default, skip_serializing_if = "is_default")]
201    pub http_mode: HttpMode,
202}
203
204impl HyperCfg {
205    /// Constructs a Hyper server [`Builder`] based on this configuration.
206    pub fn builder<I>(&self, incoming: I) -> Builder<I> {
207        let (h1_only, h2_only) = match self.http_mode {
208            HttpMode::Both => (false, false),
209            HttpMode::Http1Only => (true, false),
210            HttpMode::Http2Only => (false, true),
211        };
212
213        let mut builder = Server::builder(incoming)
214            .http1_keepalive(self.http1_keepalive)
215            .http1_half_close(self.http1_half_close)
216            .http2_initial_connection_window_size(self.http2_initial_connection_window_size)
217            .http2_initial_stream_window_size(self.http2_initial_stream_window_size)
218            .http2_adaptive_window(self.http2_adaptive_window)
219            .http2_max_concurrent_streams(self.http2_max_concurrent_streams)
220            .http2_max_frame_size(self.http2_max_frame_size)
221            .http2_keep_alive_interval(self.http2_keep_alive_interval)
222            .http2_keep_alive_timeout(self.http2_keep_alive_timeout)
223            .http1_only(h1_only)
224            .http2_only(h2_only);
225
226        if let Some(size) = self.http1_max_buf_size {
227            builder = builder.http1_max_buf_size(size);
228        }
229
230        builder
231    }
232}
233
234impl Default for HyperCfg {
235    fn default() -> Self {
236        HyperCfg {
237            http1_keepalive: true,
238            http1_half_close: true,
239            http1_max_buf_size: None,
240            http2_initial_connection_window_size: None,
241            http2_initial_stream_window_size: None,
242            http2_adaptive_window: false,
243            http2_max_concurrent_streams: None,
244            http2_max_frame_size: None,
245            http2_keep_alive_interval: None,
246            http2_keep_alive_timeout: KEEPALIVE_TIMEOUT,
247            http_mode: HttpMode::default(),
248        }
249    }
250}
251
252/// A plumbing wrapper type.
253///
254/// Not of direct interest to users, though it might leak to some function signatures at an
255/// occasion. The real listener socket is wrapped inside.
256#[pin_project]
257#[derive(Copy, Clone, Debug)]
258pub struct Acceptor<A>(#[pin] A);
259
260impl<A: SpiritAccept> HyperAccept for Acceptor<A> {
261    type Conn = A::Connection;
262    type Error = IoError;
263    fn poll_accept(
264        self: Pin<&mut Self>,
265        ctx: &mut Context,
266    ) -> Poll<Option<Result<Self::Conn, IoError>>> {
267        self.project()
268            .0
269            .poll_accept(ctx)
270            .map(|p| p.map(Some).transpose())
271    }
272}
273
274/// A [`Fragment`] for hyper servers.
275///
276/// This is a wrapper around a `Transport` [`Fragment`]. It takes something that accepts
277/// connections ‒ like [`TcpListen`] and adds configuration specific for a HTTP server.
278///
279/// The [`Fragment`] produces [hyper] [Builder]. The [`BuildServer`] transformation can be used to
280/// make it into a [`Server`] and install it into a tokio runtime.
281///
282/// See also the [`HttpServer`] type alias.
283///
284/// # Configuration options
285///
286/// In addition to options already provided by the `Transport`, these options are added:
287///
288/// * `http1-keepalive`: boolean, default true.
289/// * `http-mode`: One of `"both"`, `"http1-only"` or `"http2-only"`. Defaults to `"both"`.
290#[derive(Clone, Debug, Default, Deserialize, Eq, PartialEq, Ord, PartialOrd, Hash, Serialize)]
291#[cfg_attr(feature = "cfg-help", derive(StructDoc))]
292#[serde(rename_all = "kebab-case")]
293#[non_exhaustive]
294pub struct HyperServer<Transport> {
295    /// The inner transport.
296    ///
297    /// This is accessible by the user in case it contains something of use to the
298    /// [`Transformation`]s.
299    #[serde(flatten)]
300    pub transport: Transport,
301
302    /// Configuration of Hyper.
303    ///
304    /// The HTTP configuration is inside this.
305    #[serde(flatten)]
306    pub hyper_cfg: HyperCfg,
307}
308
309impl<Transport: Comparable> Comparable for HyperServer<Transport> {
310    fn compare(&self, other: &Self) -> Comparison {
311        let transport_cmp = self.transport.compare(&other.transport);
312        if transport_cmp == Comparison::Same && self.hyper_cfg != other.hyper_cfg {
313            Comparison::Similar
314        } else {
315            transport_cmp
316        }
317    }
318}
319
320impl<Transport> Fragment for HyperServer<Transport>
321where
322    Transport: Fragment + Debug + Clone + Comparable,
323{
324    type Driver = CacheSimilar<Self>;
325    type Installer = ();
326    type Seed = Transport::Seed;
327    type Resource = Builder<Acceptor<Transport::Resource>>;
328    fn make_seed(&self, name: &'static str) -> Result<Self::Seed, AnyError> {
329        self.transport.make_seed(name)
330    }
331    fn make_resource(
332        &self,
333        seed: &mut Self::Seed,
334        name: &'static str,
335    ) -> Result<Self::Resource, AnyError> {
336        debug!("Creating HTTP server {}", name);
337
338        let transport = self.transport.make_resource(seed, name)?;
339        let builder = self.hyper_cfg.builder(Acceptor(transport));
340
341        Ok(builder)
342    }
343}
344
345impl<Transport> Stackable for HyperServer<Transport> where Transport: Stackable {}
346
347/// A type alias for http (plain TCP) hyper server.
348pub type HttpServer<ExtraCfg = Empty> = HyperServer<WithLimits<TcpListen<ExtraCfg>>>;
349
350/// A plumbing helper type.
351pub struct Activate<Fut> {
352    build_server: Option<Box<dyn FnOnce(Receiver<()>) -> Fut + Send>>,
353    shut_guard: Option<ShutGuard>,
354    sender: Option<Sender<()>>,
355    join: Option<JoinHandle<()>>,
356    name: &'static str,
357}
358
359impl<Fut> Drop for Activate<Fut> {
360    fn drop(&mut self) {
361        // Tell the server to terminate
362        if let Some(sender) = self.sender.take() {
363            let _ = sender.send(());
364        }
365    }
366}
367
368impl<Fut, E> Future for Activate<Fut>
369where
370    Fut: Future<Output = Result<(), E>> + Send + 'static,
371    E: Into<AnyError>,
372{
373    type Output = ();
374
375    fn poll(mut self: Pin<&mut Self>, ctx: &mut Context) -> Poll<()> {
376        trace!("Poll on Activate({})", self.name);
377        if let Some(build_server) = self.build_server.take() {
378            trace!("Activating {}", self.name);
379            // Hack: Hyper is very unfriendly to constraining type parameters (not only there's
380            // like a whole paragraph on their methods, some of the traits are private so one can't
381            // really ask for them :-(.
382            //
383            // We don't want to box our future, and we want to remotely ask for graceful shutdown.
384            // Therefore we spawn the returned future separately and signal the shutdown from our
385            // drop.
386            //
387            // We also propagate termination of that server to us.
388            let (sender, receiver) = oneshot::channel();
389            let server = build_server(receiver);
390            let name = self.name;
391            let shut_guard = self.shut_guard.take();
392            let server = async move {
393                // Make sure the runtime doesn't terminate before the graceful shutdown of the
394                // server.
395                let _shut_guard = shut_guard;
396                if let Err(e) = server.await {
397                    log_error!(
398                        Error,
399                        e.into()
400                            .context(format!("HTTP server error {}", name))
401                            .into()
402                    );
403                }
404                trace!("Server {} terminated", name);
405            };
406            let join = tokio::spawn(server);
407            self.join = Some(join);
408            self.sender = Some(sender);
409        }
410
411        match Pin::new(self.join.as_mut().expect("Missing join handle")).poll(ctx) {
412            Poll::Ready(Ok(())) => {
413                debug!("Future of server {} terminated", self.name);
414                Poll::Ready(())
415            }
416            Poll::Ready(Err(e)) => {
417                debug!("Future of server {} errored out", self.name);
418                log_error!(
419                    Error,
420                    e.context(format!("HTTP server {} failed", self.name))
421                        .into()
422                );
423                Poll::Ready(())
424            }
425            Poll::Pending => {
426                trace!("Future of server {} is still pending", self.name);
427                Poll::Pending
428            }
429        }
430    }
431}
432
433/// A [`Transformation`] to turn a [`Builder`] into a [`Server`].
434///
435/// The value wrapped in this shall be a closure taking:
436/// * The [`Builder`]
437/// * The configuration fragment ([`HyperServer`])
438/// * A `&str` name
439/// * A [`Receiver<()>`][Receiver].
440///
441/// It shall produce a server wrapped with a graceful shutdown. Technically, it can produce
442/// whatever future that'll terminate once the [`Receiver`] contains something.
443///
444/// Constructing the server builder directly is a bit of a chore (bunch of cloning, lifetimes, and
445/// it's something like 3rd-order function ‒ function that returns a function that returns a
446/// function...). In many cases, the [`server_from_handler`] can help.
447///
448/// An instance of [`ServerBuilder`] goes inside.
449pub struct BuildServer<BS>(pub BS);
450
451impl<Tr, Inst, BS> Transformation<Builder<Acceptor<Tr::Resource>>, Inst, HyperServer<Tr>>
452    for BuildServer<BS>
453where
454    Tr: Fragment + Clone + Send + 'static,
455    Tr::Resource: Send,
456    BS: ServerBuilder<Tr> + Clone + Send + 'static,
457    BS::OutputFut: Future<Output = Result<(), HyperError>>,
458{
459    type OutputResource = Activate<BS::OutputFut>;
460    type OutputInstaller = FutureInstaller;
461    fn installer(&mut self, _ii: Inst, _name: &'static str) -> Self::OutputInstaller {
462        FutureInstaller::default()
463    }
464    fn transform(
465        &mut self,
466        builder: Builder<Acceptor<Tr::Resource>>,
467        cfg: &HyperServer<Tr>,
468        name: &'static str,
469    ) -> Result<Self::OutputResource, AnyError> {
470        let build_server = self.0.clone();
471        let cfg = cfg.clone();
472        let build_server = move |receiver| build_server.build(builder, &cfg, name, receiver);
473        Ok(Activate {
474            build_server: Some(Box::new(build_server)),
475            shut_guard: runtime::shut_guard(),
476            join: None,
477            name,
478            sender: None,
479        })
480    }
481}
482
483/// A trait abstracting the creation of servers.
484///
485/// When spawning a server, there are 3 layers.
486///
487/// * A layer creating the [`Server`] from the builder.
488/// * A layer creating a service for each connection.
489/// * A layer responding to one request.
490///
491/// Each layer must be able to create new instances of the lower layer (by cloning, creating new
492/// instances, etc).
493///
494/// This represents the top-level layer. This shall do:
495///
496/// * Create the [`Server`].
497/// * Call the [`with_graceful_shutdown`][Server::with_graceful_shutdown] on it, tying it to the
498///   passed `shutdown` parameter.
499///
500/// You don't have to implement the trait by hand, a closure with the corresponding signature (see
501/// [`build`][ServerBuilder::build]) does the job.
502///
503/// This exists for two reasons:
504///
505/// * To enable different implementations than just closures.
506/// * To allow it to live in `impl Trait` position.
507///
508/// # Examples
509///
510/// ```
511/// use std::convert::Infallible;
512///
513/// use hyper::{Body, Request, Response};
514/// use hyper::server::Builder;
515/// use hyper::service::{make_service_fn, service_fn};
516/// use serde::Deserialize;
517/// use spirit::{Empty, Pipeline, Spirit};
518/// use spirit::prelude::*;
519/// use spirit_hyper::{BuildServer, HttpServer};
520/// use spirit_tokio::net::limits::Tracked;
521/// use tokio::net::TcpStream;
522/// use tokio::sync::oneshot::Receiver;
523///
524/// const DEFAULT_CONFIG: &str = r#"
525/// [server]
526/// port = 2235
527/// "#;
528///
529/// #[derive(Default, Deserialize)]
530/// struct Config {
531///     server: HttpServer,
532/// }
533///
534/// impl Config {
535///     fn server(&self) -> HttpServer {
536///         self.server.clone()
537///     }
538/// }
539///
540/// async fn request() -> Response<Body> {
541///     Response::new(Body::from("Hello world\n"))
542/// }
543///
544/// type Connection = Tracked<TcpStream>;
545///
546/// fn main() {
547///     let build_server =
548///         |builder: Builder<_>, cfg: &HttpServer, name: &str, shutdown: Receiver<()>| {
549///             eprintln!("Creating server {} for {:?}", name, cfg);
550///             builder
551///                 .serve(make_service_fn(|conn: &Connection| {
552///                     let conn_addr = conn.peer_addr().expect("Peer address doesn't fail");
553///                     eprintln!("New connection {}", conn_addr);
554///                     async {
555///                         Ok::<_, Infallible>(service_fn(|_req: Request<Body>| async {
556///                             Ok::<_, Infallible>(request().await)
557///                         }))
558///                     }
559///                 }))
560///                 .with_graceful_shutdown(async {
561///                     // Shutting down both by receiving a message and the other end being
562///                     // dropped.
563///                     let _ = shutdown.await;
564///                 })
565///         };
566///     Spirit::<Empty, Config>::new()
567///         .config_defaults(DEFAULT_CONFIG)
568///         .with(
569///             // Let's build a http server as configured by the user
570///             Pipeline::new("listen")
571///                 .extract_cfg(Config::server)
572///                 // This is where we teach the server what it serves. It is the usual stuff from
573///                 // hyper.
574///                 .transform(BuildServer(build_server))
575///                 .check()
576///         )
577///         .run(|spirit| {
578/// #           let spirit = std::sync::Arc::clone(spirit);
579/// #           std::thread::spawn(move || spirit.terminate());
580///             Ok(())
581///         });
582/// }
583/// ```
584pub trait ServerBuilder<Tr>
585where
586    Tr: Fragment,
587{
588    /// The future returned by the build.
589    ///
590    /// The future shall represent the graceful shut down server.
591    type OutputFut: Future<Output = Result<(), HyperError>> + Send;
592
593    /// Invokes the build with the parameters.
594    ///
595    /// Directly corresponds to calling the closure for the blank implementation.
596    fn build(
597        &self,
598        builder: Builder<Acceptor<Tr::Resource>>,
599        cfg: &HyperServer<Tr>,
600        name: &'static str,
601        shutdown: Receiver<()>,
602    ) -> Self::OutputFut;
603}
604
605impl<F, Tr, Fut> ServerBuilder<Tr> for F
606where
607    Tr: Fragment,
608    F: Fn(Builder<Acceptor<Tr::Resource>>, &HyperServer<Tr>, &'static str, Receiver<()>) -> Fut,
609    Fut: Future<Output = Result<(), HyperError>> + Send,
610{
611    type OutputFut = Fut;
612
613    fn build(
614        &self,
615        builder: Builder<Acceptor<Tr::Resource>>,
616        cfg: &HyperServer<Tr>,
617        name: &'static str,
618        shutdown: Receiver<()>,
619    ) -> Fut {
620        self(builder, cfg, name, shutdown)
621    }
622}
623
624/// A simplified version of creating the [`ServerBuilder`].
625///
626/// Implementing the [`ServerBuilder`] by hand is possible and sometimes necessary (it is more
627/// flexible ‒ one can receive parameters on the way ‒ the configuration of the specific server
628/// instance in case there are multiple, getting access to the connection for which a service is
629/// being created, unusual ways of creating the instances). But it is quite a chore with a lot of
630/// `async` and `move` involved (you can have a look at source code of this function to get the
631/// idea, the `[SRC]` button at the right).
632///
633/// This gets it done for the very simple case ‒ in case there's an async function/closure that
634/// just takes a [`Request<Body>`][Request] and returns a [`Response<Body>`][Response].
635///
636/// See the [crate level][self] example.
637pub fn server_from_handler<H, Tr, S>(handler: H) -> impl ServerBuilder<Tr> + Clone + Send
638where
639    Tr: Fragment,
640    Tr::Resource: SpiritAccept + Unpin,
641    <Tr::Resource as SpiritAccept>::Connection: AsyncRead + AsyncWrite + Unpin,
642    H: Clone + Send + Sync + Fn(Request<Body>) -> S + 'static,
643    S: Future<Output = Response<Body>> + Send + 'static,
644{
645    move |builder: Builder<Acceptor<Tr::Resource>>, _: &_, name, shutdown| {
646        debug!("Creating server instance {}", name);
647        let handler = handler.clone();
648        builder
649            .serve(make_service_fn(move |_conn| {
650                trace!("Creating a service for {}", name);
651                let handler = handler.clone();
652                async move {
653                    Ok::<_, Infallible>(service_fn(move |req| {
654                        let handler = handler.clone();
655                        async move { Ok::<_, Infallible>(handler(req).await) }
656                    }))
657                }
658            }))
659            .with_graceful_shutdown(async move {
660                let _ = shutdown.await;
661            })
662    }
663}