hyper_fast/server/
http_server.rs

1use std::mem;
2use std::net::SocketAddr;
3use std::sync::Arc;
4use std::time::Instant;
5
6use anyhow::Context;
7use http::{Method, Request};
8use hyper::Body;
9use hyper::server::conn::AddrStream;
10use hyper::service::{make_service_fn, service_fn};
11#[allow(unused_imports)]
12use log::{debug, error, info, warn};
13
14use crate::server::{HttpResult, IN_ROTATION, Service, ServiceBuilder, ServiceDaemon, SHUTDOWN};
15
16use super::health_check::{get_in_rotation_status, oor_handler};
17use super::http_response::HttpResponse;
18use super::HttpRoute;
19#[cfg(any(feature = "access_log", feature = "metrics"))]
20use super::logger;
21#[cfg(feature = "metrics")]
22use super::logger::METRICS_LOGGER;
23
24fn index(route: &HttpRoute<'_>) -> HttpResult {
25    let body = Body::from("Hello, World!");
26    HttpResponse::ok(route, body)
27}
28
29async fn shutdown_signal() {
30    // Wait for the CTRL+C signal
31    info!("Installing server shutdown signal");
32
33    tokio::signal::ctrl_c()
34        .await
35        .expect("failed to install CTRL+C signal handler");
36
37    SHUTDOWN.store(true, std::sync::atomic::Ordering::Relaxed);
38    IN_ROTATION.store(false, std::sync::atomic::Ordering::Relaxed);
39
40    warn!("Received server shutdown signal");
41    std::process::exit(0);
42}
43
44// TODO: payload limit - json_payload_limit_conf()
45async fn route_handler<App>(
46    mut req: Request<Body>,
47    remote_addr: SocketAddr,
48    app: Arc<App>,
49) -> HttpResult
50    where
51        App: 'static + Service,
52{
53    let req_time = chrono::Local::now();
54    let req_instant = Instant::now();
55
56    let req_body = mem::replace(req.body_mut(), Body::empty());
57    let route = HttpRoute::new(&req, req_time, req_instant, remote_addr);
58
59    let parts: Vec<_> = route
60        .path
61        .split("/")
62        .filter(|part| !part.is_empty())
63        .collect();
64
65    let response = match &parts[..] {
66        [] if matches!(route.method, &Method::GET) => index(&route),
67        ["oor"] => oor_handler(&route),
68        ["health"] if matches!(route.method, &Method::GET) => get_in_rotation_status(&route),
69
70        #[cfg(feature = "metrics")]
71        ["metrics", rest @ ..] => METRICS_LOGGER.api_handler(req_body, &route, rest).await,
72
73        ["api", rest @ ..] => app.api_handler(req_body, &route, rest).await,
74        _ => HttpResponse::not_found(route.path),
75    };
76
77    #[cfg(feature = "response_time")]
78        let response = match response {
79        Ok(mut response) => {
80            let time_taken = format!("{}", humantime::Duration::from(req_instant.elapsed()));
81            let time_taken_header = http::HeaderValue::from_str(&time_taken)
82                .with_context(|| format!("Error in building header value time_taken"))?;
83            response
84                .headers_mut()
85                .append("X-time-taken", time_taken_header);
86            Ok(response)
87        }
88        Err(err) => err.into(),
89    };
90
91    // log & metrics
92    #[cfg(any(feature = "access_log", feature = "metrics"))]
93    logger::log_api(&route, &response);
94
95    response
96}
97
98pub async fn start_http_server<App, AppDaemon, AppBuilder>(
99    addr: &str,
100    app_builder: AppBuilder,
101) -> anyhow::Result<()>
102    where
103        App: 'static + Service,
104        AppDaemon: 'static + ServiceDaemon<App>,
105        AppBuilder: 'static + ServiceBuilder<App, AppDaemon>,
106{
107    info!("Starting server at addr: {}", addr);
108
109    let addr = addr
110        .parse::<SocketAddr>()
111        .with_context(|| format!("Parsing node addr '{}' as SocketAddr", addr))?;
112
113    let (app, app_daemon) = app_builder
114        .build()
115        .await
116        .with_context(|| "Error in building app")?;
117    let app = Arc::new(app);
118
119    if let Some(app_daemon) = app_daemon {
120        // TODO: capture join handle and use in shutdown signal
121        let cloned_app = app.clone();
122        tokio::task::spawn(async move {
123            app_daemon.start(cloned_app).await;
124        });
125    }
126
127    let make_svc = make_service_fn(move |transport: &AddrStream| {
128        // TODO: log new connection
129        let remote_addr = transport.remote_addr();
130        let app = app.clone();
131
132        async move {
133            Ok::<_, anyhow::Error>(service_fn(move |req| {
134                // Clone again to ensure that client outlives this closure.
135                route_handler(req, remote_addr, app.clone())
136            }))
137        }
138    });
139
140    let server = hyper::Server::try_bind(&addr)
141        .with_context(|| "Error in binding to address")?
142        .http1_keepalive(true)
143        .http1_preserve_header_case(true)
144        .http1_title_case_headers(true)
145        .serve(make_svc);
146
147    let graceful = server.with_graceful_shutdown(shutdown_signal());
148
149    info!("Started server");
150
151    // Run this server for... forever!
152    graceful.await.with_context(|| "Error in starting server")
153}