spirit_hyper/lib.rs
1#![doc(test(attr(deny(warnings))))]
2// Our program-long snippets are more readable with main
3#![allow(clippy::needless_doctest_main)]
4#![forbid(unsafe_code)]
5#![warn(missing_docs)]
6#![cfg_attr(docsrs, feature(doc_cfg))]
7
8//! [Spirit][spirit] extension for Hyper servers.
9//!
10//! This allows having Hyper servers auto-spawned from configuration. It is possible to put them on
11//! top of arbitrary stream-style IO objects (TcpStream, UdsStream, these wrapped in SSL...).
12//!
13//! # Tokio runtime
14//!
//! This uses the [`spirit_tokio`] crate under the hood. The same drawback regarding the
//! initialization of a runtime applies here too (see the [`spirit_tokio`] docs for details).
17//!
18//! # Examples
19//!
20//! ```rust
21//! use hyper::{Body, Request, Response};
22//! use serde::Deserialize;
23//! use spirit::{Empty, Pipeline, Spirit};
24//! use spirit::prelude::*;
25//! use spirit_hyper::{server_from_handler, BuildServer, HttpServer};
26//!
27//! const DEFAULT_CONFIG: &str = r#"
28//! [server]
29//! port = 2234
30//! "#;
31//!
32//! #[derive(Default, Deserialize)]
33//! struct Config {
34//! server: HttpServer,
35//! }
36//!
37//! impl Config {
38//! fn server(&self) -> HttpServer {
39//! self.server.clone()
40//! }
41//! }
42//!
43//! async fn request(_req: Request<Body>) -> Response<Body> {
44//! Response::new(Body::from("Hello world\n"))
45//! }
46//!
47//! fn main() {
48//! Spirit::<Empty, Config>::new()
49//! .config_defaults(DEFAULT_CONFIG)
50//! .with(
51//! // Let's build a http server as configured by the user
52//! Pipeline::new("listen")
53//! .extract_cfg(Config::server)
54//! // This is where we teach the server what it serves. It is the usual stuff from
55//! // hyper.
56//! .transform(BuildServer(server_from_handler(request)))
57//! )
58//! .run(|spirit| {
59//! # let spirit = std::sync::Arc::clone(spirit);
60//! # std::thread::spawn(move || spirit.terminate());
61//! Ok(())
62//! });
63//! }
64//! ```
65//!
66//! Further examples (with more flexible handling) are in the
67//! [git repository](https://github.com/vorner/spirit/tree/master/spirit-hyper/examples).
68
69use std::convert::Infallible;
70use std::fmt::Debug;
71use std::future::Future;
72use std::io::Error as IoError;
73use std::pin::Pin;
74use std::task::{Context, Poll};
75use std::time::Duration;
76
77use err_context::prelude::*;
78use err_context::AnyError;
79use hyper::body::Body;
80use hyper::server::accept::Accept as HyperAccept;
81use hyper::server::{Builder, Server};
82use hyper::service::{make_service_fn, service_fn};
83use hyper::{Error as HyperError, Request, Response};
84use log::{debug, trace};
85use pin_project::pin_project;
86use serde::{Deserialize, Serialize};
87use spirit::fragment::driver::{CacheSimilar, Comparable, Comparison};
88use spirit::fragment::{Fragment, Stackable, Transformation};
89use spirit::utils::{deserialize_opt_duration, is_default, is_true, serialize_opt_duration};
90use spirit::{log_error, Empty};
91use spirit_tokio::net::limits::WithLimits;
92use spirit_tokio::net::{Accept as SpiritAccept, TcpListen};
93use spirit_tokio::runtime::{self, ShutGuard};
94use spirit_tokio::FutureInstaller;
95#[cfg(feature = "cfg-help")]
96use structdoc::StructDoc;
97use tokio::io::{AsyncRead, AsyncWrite};
98use tokio::sync::oneshot::{self, Receiver, Sender};
99use tokio::task::JoinHandle;
100
/// The HTTP2 keep-alive ping timeout used when the configuration doesn't set one.
const KEEPALIVE_TIMEOUT: Duration = Duration::from_secs(20);

/// Serde `skip_serializing_if` predicate for the keep-alive timeout.
///
/// Returns `true` when `timeout` equals [`KEEPALIVE_TIMEOUT`], so the default value is left out of
/// the serialized configuration.
fn is_default_timeout(timeout: &Duration) -> bool {
    timeout == &KEEPALIVE_TIMEOUT
}
106
107/// Configuration of the selected HTTP protocol version.
108#[derive(Copy, Clone, Debug, Deserialize, Eq, PartialEq, Ord, PartialOrd, Hash, Serialize)]
109#[cfg_attr(feature = "cfg-help", derive(StructDoc))]
110#[serde(rename_all = "kebab-case")]
111#[non_exhaustive]
112pub enum HttpMode {
113 /// Enable both HTTP1 and HTTP2 protocols.
114 Both,
115
116 /// Disable the HTTP2 protocol.
117 #[serde(rename = "http1-only")]
118 Http1Only,
119
120 /// Disable the HTTP1 protocol.
121 #[serde(rename = "http2-only")]
122 Http2Only,
123}
124
125impl Default for HttpMode {
126 fn default() -> Self {
127 HttpMode::Both
128 }
129}
130
131/// Configuration of Hyper HTTP servers.
132///
133/// This are the things that are extra over the transport. It doesn't contain any kind of ports or
134/// SSL certificates, these are added inside the [`HyperServer`]. This is only for configuring the
135/// HTTP protocol itself.
136#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Ord, PartialOrd, Hash, Serialize)]
137#[cfg_attr(feature = "cfg-help", derive(StructDoc))]
138#[serde(rename_all = "kebab-case", default)]
139#[non_exhaustive]
140pub struct HyperCfg {
141 /// The HTTP keepalive.
142 ///
143 /// <https://en.wikipedia.org/wiki/HTTP_persistent_connection>.
144 ///
145 /// Default is on, can be turned off.
146 #[serde(skip_serializing_if = "is_true")]
147 pub http1_keepalive: bool,
148
149 /// When a http1 client closes its write end, keep the connection open until the reply is sent.
150 ///
151 /// If set to false, if the client closes its connection, server does too.
152 #[serde(skip_serializing_if = "is_true")]
153 pub http1_half_close: bool,
154
155 /// Maximum buffer size of HTTP1.
156 #[serde(skip_serializing_if = "Option::is_none")]
157 pub http1_max_buf_size: Option<usize>,
158
159 /// Initial window size.
160 #[serde(skip_serializing_if = "Option::is_none")]
161 pub http2_initial_stream_window_size: Option<u32>,
162
163 /// Initial window size.
164 #[serde(skip_serializing_if = "Option::is_none")]
165 pub http2_initial_connection_window_size: Option<u32>,
166
167 /// Choose the window sizes dynamically at runtime.
168 ///
169 /// If turned off (the default), uses the values configured.
170 #[serde(default, skip_serializing_if = "is_default")]
171 pub http2_adaptive_window: bool,
172
173 /// Maximum number of concurrent streams.
174 ///
175 /// Defaults to no limit.
176 #[serde(skip_serializing_if = "Option::is_none")]
177 pub http2_max_concurrent_streams: Option<u32>,
178
179 /// The maximum frame size of http2.
180 #[serde(skip_serializing_if = "Option::is_none")]
181 pub http2_max_frame_size: Option<u32>,
182
183 /// How often to send keep alive/ping frames.
184 ///
185 /// Defaults to disabled.
186 #[serde(
187 deserialize_with = "deserialize_opt_duration",
188 serialize_with = "serialize_opt_duration",
189 skip_serializing_if = "Option::is_none"
190 )]
191 pub http2_keep_alive_interval: Option<Duration>,
192
193 /// Close connection if no response for ping in this time.
194 ///
195 /// Defaults to 20s.
196 #[serde(skip_serializing_if = "is_default_timeout")]
197 pub http2_keep_alive_timeout: Duration,
198
199 /// What protocols are enabled.
200 #[serde(default, skip_serializing_if = "is_default")]
201 pub http_mode: HttpMode,
202}
203
204impl HyperCfg {
205 /// Constructs a Hyper server [`Builder`] based on this configuration.
206 pub fn builder<I>(&self, incoming: I) -> Builder<I> {
207 let (h1_only, h2_only) = match self.http_mode {
208 HttpMode::Both => (false, false),
209 HttpMode::Http1Only => (true, false),
210 HttpMode::Http2Only => (false, true),
211 };
212
213 let mut builder = Server::builder(incoming)
214 .http1_keepalive(self.http1_keepalive)
215 .http1_half_close(self.http1_half_close)
216 .http2_initial_connection_window_size(self.http2_initial_connection_window_size)
217 .http2_initial_stream_window_size(self.http2_initial_stream_window_size)
218 .http2_adaptive_window(self.http2_adaptive_window)
219 .http2_max_concurrent_streams(self.http2_max_concurrent_streams)
220 .http2_max_frame_size(self.http2_max_frame_size)
221 .http2_keep_alive_interval(self.http2_keep_alive_interval)
222 .http2_keep_alive_timeout(self.http2_keep_alive_timeout)
223 .http1_only(h1_only)
224 .http2_only(h2_only);
225
226 if let Some(size) = self.http1_max_buf_size {
227 builder = builder.http1_max_buf_size(size);
228 }
229
230 builder
231 }
232}
233
234impl Default for HyperCfg {
235 fn default() -> Self {
236 HyperCfg {
237 http1_keepalive: true,
238 http1_half_close: true,
239 http1_max_buf_size: None,
240 http2_initial_connection_window_size: None,
241 http2_initial_stream_window_size: None,
242 http2_adaptive_window: false,
243 http2_max_concurrent_streams: None,
244 http2_max_frame_size: None,
245 http2_keep_alive_interval: None,
246 http2_keep_alive_timeout: KEEPALIVE_TIMEOUT,
247 http_mode: HttpMode::default(),
248 }
249 }
250}
251
252/// A plumbing wrapper type.
253///
254/// Not of direct interest to users, though it might leak to some function signatures at an
255/// occasion. The real listener socket is wrapped inside.
256#[pin_project]
257#[derive(Copy, Clone, Debug)]
258pub struct Acceptor<A>(#[pin] A);
259
260impl<A: SpiritAccept> HyperAccept for Acceptor<A> {
261 type Conn = A::Connection;
262 type Error = IoError;
263 fn poll_accept(
264 self: Pin<&mut Self>,
265 ctx: &mut Context,
266 ) -> Poll<Option<Result<Self::Conn, IoError>>> {
267 self.project()
268 .0
269 .poll_accept(ctx)
270 .map(|p| p.map(Some).transpose())
271 }
272}
273
274/// A [`Fragment`] for hyper servers.
275///
276/// This is a wrapper around a `Transport` [`Fragment`]. It takes something that accepts
277/// connections ‒ like [`TcpListen`] and adds configuration specific for a HTTP server.
278///
279/// The [`Fragment`] produces [hyper] [Builder]. The [`BuildServer`] transformation can be used to
280/// make it into a [`Server`] and install it into a tokio runtime.
281///
282/// See also the [`HttpServer`] type alias.
283///
284/// # Configuration options
285///
286/// In addition to options already provided by the `Transport`, these options are added:
287///
288/// * `http1-keepalive`: boolean, default true.
289/// * `http-mode`: One of `"both"`, `"http1-only"` or `"http2-only"`. Defaults to `"both"`.
290#[derive(Clone, Debug, Default, Deserialize, Eq, PartialEq, Ord, PartialOrd, Hash, Serialize)]
291#[cfg_attr(feature = "cfg-help", derive(StructDoc))]
292#[serde(rename_all = "kebab-case")]
293#[non_exhaustive]
294pub struct HyperServer<Transport> {
295 /// The inner transport.
296 ///
297 /// This is accessible by the user in case it contains something of use to the
298 /// [`Transformation`]s.
299 #[serde(flatten)]
300 pub transport: Transport,
301
302 /// Configuration of Hyper.
303 ///
304 /// The HTTP configuration is inside this.
305 #[serde(flatten)]
306 pub hyper_cfg: HyperCfg,
307}
308
309impl<Transport: Comparable> Comparable for HyperServer<Transport> {
310 fn compare(&self, other: &Self) -> Comparison {
311 let transport_cmp = self.transport.compare(&other.transport);
312 if transport_cmp == Comparison::Same && self.hyper_cfg != other.hyper_cfg {
313 Comparison::Similar
314 } else {
315 transport_cmp
316 }
317 }
318}
319
320impl<Transport> Fragment for HyperServer<Transport>
321where
322 Transport: Fragment + Debug + Clone + Comparable,
323{
324 type Driver = CacheSimilar<Self>;
325 type Installer = ();
326 type Seed = Transport::Seed;
327 type Resource = Builder<Acceptor<Transport::Resource>>;
328 fn make_seed(&self, name: &'static str) -> Result<Self::Seed, AnyError> {
329 self.transport.make_seed(name)
330 }
331 fn make_resource(
332 &self,
333 seed: &mut Self::Seed,
334 name: &'static str,
335 ) -> Result<Self::Resource, AnyError> {
336 debug!("Creating HTTP server {}", name);
337
338 let transport = self.transport.make_resource(seed, name)?;
339 let builder = self.hyper_cfg.builder(Acceptor(transport));
340
341 Ok(builder)
342 }
343}
344
345impl<Transport> Stackable for HyperServer<Transport> where Transport: Stackable {}
346
347/// A type alias for http (plain TCP) hyper server.
348pub type HttpServer<ExtraCfg = Empty> = HyperServer<WithLimits<TcpListen<ExtraCfg>>>;
349
350/// A plumbing helper type.
351pub struct Activate<Fut> {
352 build_server: Option<Box<dyn FnOnce(Receiver<()>) -> Fut + Send>>,
353 shut_guard: Option<ShutGuard>,
354 sender: Option<Sender<()>>,
355 join: Option<JoinHandle<()>>,
356 name: &'static str,
357}
358
359impl<Fut> Drop for Activate<Fut> {
360 fn drop(&mut self) {
361 // Tell the server to terminate
362 if let Some(sender) = self.sender.take() {
363 let _ = sender.send(());
364 }
365 }
366}
367
368impl<Fut, E> Future for Activate<Fut>
369where
370 Fut: Future<Output = Result<(), E>> + Send + 'static,
371 E: Into<AnyError>,
372{
373 type Output = ();
374
375 fn poll(mut self: Pin<&mut Self>, ctx: &mut Context) -> Poll<()> {
376 trace!("Poll on Activate({})", self.name);
377 if let Some(build_server) = self.build_server.take() {
378 trace!("Activating {}", self.name);
379 // Hack: Hyper is very unfriendly to constraining type parameters (not only there's
380 // like a whole paragraph on their methods, some of the traits are private so one can't
381 // really ask for them :-(.
382 //
383 // We don't want to box our future, and we want to remotely ask for graceful shutdown.
384 // Therefore we spawn the returned future separately and signal the shutdown from our
385 // drop.
386 //
387 // We also propagate termination of that server to us.
388 let (sender, receiver) = oneshot::channel();
389 let server = build_server(receiver);
390 let name = self.name;
391 let shut_guard = self.shut_guard.take();
392 let server = async move {
393 // Make sure the runtime doesn't terminate before the graceful shutdown of the
394 // server.
395 let _shut_guard = shut_guard;
396 if let Err(e) = server.await {
397 log_error!(
398 Error,
399 e.into()
400 .context(format!("HTTP server error {}", name))
401 .into()
402 );
403 }
404 trace!("Server {} terminated", name);
405 };
406 let join = tokio::spawn(server);
407 self.join = Some(join);
408 self.sender = Some(sender);
409 }
410
411 match Pin::new(self.join.as_mut().expect("Missing join handle")).poll(ctx) {
412 Poll::Ready(Ok(())) => {
413 debug!("Future of server {} terminated", self.name);
414 Poll::Ready(())
415 }
416 Poll::Ready(Err(e)) => {
417 debug!("Future of server {} errored out", self.name);
418 log_error!(
419 Error,
420 e.context(format!("HTTP server {} failed", self.name))
421 .into()
422 );
423 Poll::Ready(())
424 }
425 Poll::Pending => {
426 trace!("Future of server {} is still pending", self.name);
427 Poll::Pending
428 }
429 }
430 }
431}
432
/// A [`Transformation`] to turn a [`Builder`] into a [`Server`].
///
/// An instance of [`ServerBuilder`] goes inside. Usually, that is a closure taking these
/// parameters:
///
/// * The [`Builder`].
/// * The configuration fragment ([`HyperServer`]).
/// * A `&str` name.
/// * A [`Receiver<()>`][Receiver].
///
/// The closure shall return a server wrapped with a graceful shutdown. Technically, any future
/// that terminates once the [`Receiver`] contains something will do.
///
/// Writing the server builder by hand is a bit of a chore (a lot of cloning, lifetimes, and it is
/// something like a 3rd-order function ‒ a function that returns a function that returns a
/// function...). In many cases, [`server_from_handler`] can help.
pub struct BuildServer<BS>(pub BS);
450
451impl<Tr, Inst, BS> Transformation<Builder<Acceptor<Tr::Resource>>, Inst, HyperServer<Tr>>
452 for BuildServer<BS>
453where
454 Tr: Fragment + Clone + Send + 'static,
455 Tr::Resource: Send,
456 BS: ServerBuilder<Tr> + Clone + Send + 'static,
457 BS::OutputFut: Future<Output = Result<(), HyperError>>,
458{
459 type OutputResource = Activate<BS::OutputFut>;
460 type OutputInstaller = FutureInstaller;
461 fn installer(&mut self, _ii: Inst, _name: &'static str) -> Self::OutputInstaller {
462 FutureInstaller::default()
463 }
464 fn transform(
465 &mut self,
466 builder: Builder<Acceptor<Tr::Resource>>,
467 cfg: &HyperServer<Tr>,
468 name: &'static str,
469 ) -> Result<Self::OutputResource, AnyError> {
470 let build_server = self.0.clone();
471 let cfg = cfg.clone();
472 let build_server = move |receiver| build_server.build(builder, &cfg, name, receiver);
473 Ok(Activate {
474 build_server: Some(Box::new(build_server)),
475 shut_guard: runtime::shut_guard(),
476 join: None,
477 name,
478 sender: None,
479 })
480 }
481}
482
483/// A trait abstracting the creation of servers.
484///
485/// When spawning a server, there are 3 layers.
486///
487/// * A layer creating the [`Server`] from the builder.
488/// * A layer creating a service for each connection.
489/// * A layer responding to one request.
490///
491/// Each layer must be able to create new instances of the lower layer (by cloning, creating new
492/// instances, etc).
493///
494/// This represents the top-level layer. This shall do:
495///
496/// * Create the [`Server`].
497/// * Call the [`with_graceful_shutdown`][Server::with_graceful_shutdown] on it, tying it to the
498/// passed `shutdown` parameter.
499///
500/// You don't have to implement the trait by hand, a closure with the corresponding signature (see
501/// [`build`][ServerBuilder::build]) does the job.
502///
503/// This exists for two reasons:
504///
505/// * To enable different implementations than just closures.
506/// * To allow it to live in `impl Trait` position.
507///
508/// # Examples
509///
510/// ```
511/// use std::convert::Infallible;
512///
513/// use hyper::{Body, Request, Response};
514/// use hyper::server::Builder;
515/// use hyper::service::{make_service_fn, service_fn};
516/// use serde::Deserialize;
517/// use spirit::{Empty, Pipeline, Spirit};
518/// use spirit::prelude::*;
519/// use spirit_hyper::{BuildServer, HttpServer};
520/// use spirit_tokio::net::limits::Tracked;
521/// use tokio::net::TcpStream;
522/// use tokio::sync::oneshot::Receiver;
523///
524/// const DEFAULT_CONFIG: &str = r#"
525/// [server]
526/// port = 2235
527/// "#;
528///
529/// #[derive(Default, Deserialize)]
530/// struct Config {
531/// server: HttpServer,
532/// }
533///
534/// impl Config {
535/// fn server(&self) -> HttpServer {
536/// self.server.clone()
537/// }
538/// }
539///
540/// async fn request() -> Response<Body> {
541/// Response::new(Body::from("Hello world\n"))
542/// }
543///
544/// type Connection = Tracked<TcpStream>;
545///
546/// fn main() {
547/// let build_server =
548/// |builder: Builder<_>, cfg: &HttpServer, name: &str, shutdown: Receiver<()>| {
549/// eprintln!("Creating server {} for {:?}", name, cfg);
550/// builder
551/// .serve(make_service_fn(|conn: &Connection| {
552/// let conn_addr = conn.peer_addr().expect("Peer address doesn't fail");
553/// eprintln!("New connection {}", conn_addr);
554/// async {
555/// Ok::<_, Infallible>(service_fn(|_req: Request<Body>| async {
556/// Ok::<_, Infallible>(request().await)
557/// }))
558/// }
559/// }))
560/// .with_graceful_shutdown(async {
561/// // Shutting down both by receiving a message and the other end being
562/// // dropped.
563/// let _ = shutdown.await;
564/// })
565/// };
566/// Spirit::<Empty, Config>::new()
567/// .config_defaults(DEFAULT_CONFIG)
568/// .with(
569/// // Let's build a http server as configured by the user
570/// Pipeline::new("listen")
571/// .extract_cfg(Config::server)
572/// // This is where we teach the server what it serves. It is the usual stuff from
573/// // hyper.
574/// .transform(BuildServer(build_server))
575/// .check()
576/// )
577/// .run(|spirit| {
578/// # let spirit = std::sync::Arc::clone(spirit);
579/// # std::thread::spawn(move || spirit.terminate());
580/// Ok(())
581/// });
582/// }
583/// ```
584pub trait ServerBuilder<Tr>
585where
586 Tr: Fragment,
587{
588 /// The future returned by the build.
589 ///
590 /// The future shall represent the graceful shut down server.
591 type OutputFut: Future<Output = Result<(), HyperError>> + Send;
592
593 /// Invokes the build with the parameters.
594 ///
595 /// Directly corresponds to calling the closure for the blank implementation.
596 fn build(
597 &self,
598 builder: Builder<Acceptor<Tr::Resource>>,
599 cfg: &HyperServer<Tr>,
600 name: &'static str,
601 shutdown: Receiver<()>,
602 ) -> Self::OutputFut;
603}
604
605impl<F, Tr, Fut> ServerBuilder<Tr> for F
606where
607 Tr: Fragment,
608 F: Fn(Builder<Acceptor<Tr::Resource>>, &HyperServer<Tr>, &'static str, Receiver<()>) -> Fut,
609 Fut: Future<Output = Result<(), HyperError>> + Send,
610{
611 type OutputFut = Fut;
612
613 fn build(
614 &self,
615 builder: Builder<Acceptor<Tr::Resource>>,
616 cfg: &HyperServer<Tr>,
617 name: &'static str,
618 shutdown: Receiver<()>,
619 ) -> Fut {
620 self(builder, cfg, name, shutdown)
621 }
622}
623
624/// A simplified version of creating the [`ServerBuilder`].
625///
626/// Implementing the [`ServerBuilder`] by hand is possible and sometimes necessary (it is more
627/// flexible ‒ one can receive parameters on the way ‒ the configuration of the specific server
628/// instance in case there are multiple, getting access to the connection for which a service is
629/// being created, unusual ways of creating the instances). But it is quite a chore with a lot of
630/// `async` and `move` involved (you can have a look at source code of this function to get the
631/// idea, the `[SRC]` button at the right).
632///
633/// This gets it done for the very simple case ‒ in case there's an async function/closure that
634/// just takes a [`Request<Body>`][Request] and returns a [`Response<Body>`][Response].
635///
636/// See the [crate level][self] example.
637pub fn server_from_handler<H, Tr, S>(handler: H) -> impl ServerBuilder<Tr> + Clone + Send
638where
639 Tr: Fragment,
640 Tr::Resource: SpiritAccept + Unpin,
641 <Tr::Resource as SpiritAccept>::Connection: AsyncRead + AsyncWrite + Unpin,
642 H: Clone + Send + Sync + Fn(Request<Body>) -> S + 'static,
643 S: Future<Output = Response<Body>> + Send + 'static,
644{
645 move |builder: Builder<Acceptor<Tr::Resource>>, _: &_, name, shutdown| {
646 debug!("Creating server instance {}", name);
647 let handler = handler.clone();
648 builder
649 .serve(make_service_fn(move |_conn| {
650 trace!("Creating a service for {}", name);
651 let handler = handler.clone();
652 async move {
653 Ok::<_, Infallible>(service_fn(move |req| {
654 let handler = handler.clone();
655 async move { Ok::<_, Infallible>(handler(req).await) }
656 }))
657 }
658 }))
659 .with_graceful_shutdown(async move {
660 let _ = shutdown.await;
661 })
662 }
663}