hyper_fast/server/
http_server.rs1use std::mem;
2use std::net::SocketAddr;
3use std::sync::Arc;
4use std::time::Instant;
5
6use anyhow::Context;
7use http::{Method, Request};
8use hyper::Body;
9use hyper::server::conn::AddrStream;
10use hyper::service::{make_service_fn, service_fn};
11#[allow(unused_imports)]
12use log::{debug, error, info, warn};
13
14use crate::server::{HttpResult, IN_ROTATION, Service, ServiceBuilder, ServiceDaemon, SHUTDOWN};
15
16use super::health_check::{get_in_rotation_status, oor_handler};
17use super::http_response::HttpResponse;
18use super::HttpRoute;
19#[cfg(any(feature = "access_log", feature = "metrics"))]
20use super::logger;
21#[cfg(feature = "metrics")]
22use super::logger::METRICS_LOGGER;
23
24fn index(route: &HttpRoute<'_>) -> HttpResult {
25 let body = Body::from("Hello, World!");
26 HttpResponse::ok(route, body)
27}
28
29async fn shutdown_signal() {
30 info!("Installing server shutdown signal");
32
33 tokio::signal::ctrl_c()
34 .await
35 .expect("failed to install CTRL+C signal handler");
36
37 SHUTDOWN.store(true, std::sync::atomic::Ordering::Relaxed);
38 IN_ROTATION.store(false, std::sync::atomic::Ordering::Relaxed);
39
40 warn!("Received server shutdown signal");
41 std::process::exit(0);
42}
43
44async fn route_handler<App>(
46 mut req: Request<Body>,
47 remote_addr: SocketAddr,
48 app: Arc<App>,
49) -> HttpResult
50 where
51 App: 'static + Service,
52{
53 let req_time = chrono::Local::now();
54 let req_instant = Instant::now();
55
56 let req_body = mem::replace(req.body_mut(), Body::empty());
57 let route = HttpRoute::new(&req, req_time, req_instant, remote_addr);
58
59 let parts: Vec<_> = route
60 .path
61 .split("/")
62 .filter(|part| !part.is_empty())
63 .collect();
64
65 let response = match &parts[..] {
66 [] if matches!(route.method, &Method::GET) => index(&route),
67 ["oor"] => oor_handler(&route),
68 ["health"] if matches!(route.method, &Method::GET) => get_in_rotation_status(&route),
69
70 #[cfg(feature = "metrics")]
71 ["metrics", rest @ ..] => METRICS_LOGGER.api_handler(req_body, &route, rest).await,
72
73 ["api", rest @ ..] => app.api_handler(req_body, &route, rest).await,
74 _ => HttpResponse::not_found(route.path),
75 };
76
77 #[cfg(feature = "response_time")]
78 let response = match response {
79 Ok(mut response) => {
80 let time_taken = format!("{}", humantime::Duration::from(req_instant.elapsed()));
81 let time_taken_header = http::HeaderValue::from_str(&time_taken)
82 .with_context(|| format!("Error in building header value time_taken"))?;
83 response
84 .headers_mut()
85 .append("X-time-taken", time_taken_header);
86 Ok(response)
87 }
88 Err(err) => err.into(),
89 };
90
91 #[cfg(any(feature = "access_log", feature = "metrics"))]
93 logger::log_api(&route, &response);
94
95 response
96}
97
98pub async fn start_http_server<App, AppDaemon, AppBuilder>(
99 addr: &str,
100 app_builder: AppBuilder,
101) -> anyhow::Result<()>
102 where
103 App: 'static + Service,
104 AppDaemon: 'static + ServiceDaemon<App>,
105 AppBuilder: 'static + ServiceBuilder<App, AppDaemon>,
106{
107 info!("Starting server at addr: {}", addr);
108
109 let addr = addr
110 .parse::<SocketAddr>()
111 .with_context(|| format!("Parsing node addr '{}' as SocketAddr", addr))?;
112
113 let (app, app_daemon) = app_builder
114 .build()
115 .await
116 .with_context(|| "Error in building app")?;
117 let app = Arc::new(app);
118
119 if let Some(app_daemon) = app_daemon {
120 let cloned_app = app.clone();
122 tokio::task::spawn(async move {
123 app_daemon.start(cloned_app).await;
124 });
125 }
126
127 let make_svc = make_service_fn(move |transport: &AddrStream| {
128 let remote_addr = transport.remote_addr();
130 let app = app.clone();
131
132 async move {
133 Ok::<_, anyhow::Error>(service_fn(move |req| {
134 route_handler(req, remote_addr, app.clone())
136 }))
137 }
138 });
139
140 let server = hyper::Server::try_bind(&addr)
141 .with_context(|| "Error in binding to address")?
142 .http1_keepalive(true)
143 .http1_preserve_header_case(true)
144 .http1_title_case_headers(true)
145 .serve(make_svc);
146
147 let graceful = server.with_graceful_shutdown(shutdown_signal());
148
149 info!("Started server");
150
151 graceful.await.with_context(|| "Error in starting server")
153}