mmids_core/http_api/
mod.rs1pub mod handlers;
5pub mod routing;
6
7use crate::http_api::routing::RoutingTable;
8use hyper::header::HeaderName;
9use hyper::server::conn::AddrStream;
10use hyper::service::{make_service_fn, service_fn};
11use hyper::{Body, Request, Response, Server, StatusCode};
12use std::net::SocketAddr;
13use std::sync::Arc;
14use std::time::Instant;
15use tokio::sync::oneshot::{channel, Receiver, Sender};
16use tracing::{error, info, instrument};
17use uuid::Uuid;
18
/// Marker message sent over the oneshot channel returned by `start_http_api` to request that
/// the HTTP API server shut down.
///
/// `Debug` is derived so that the `Result<(), HttpApiShutdownSignal>` returned by the oneshot
/// `Sender::send` can be passed to `unwrap()` / `expect()` by callers.
#[derive(Debug)]
pub struct HttpApiShutdownSignal {}
20
21pub fn start_http_api(
22 bind_address: SocketAddr,
23 routes: RoutingTable,
24) -> Sender<HttpApiShutdownSignal> {
25 let routes = Arc::new(routes);
26 let service = make_service_fn(move |socket: &AddrStream| {
27 let remote_address = socket.remote_addr();
28 let routes_clone = routes.clone();
29 async move {
30 Ok::<_, hyper::Error>(service_fn(move |request: Request<Body>| {
31 execute_request(
32 request,
33 remote_address,
34 routes_clone.clone(),
35 Uuid::new_v4().to_string(),
36 )
37 }))
38 }
39 });
40
41 let (sender, receiver) = channel();
42 let server = Server::bind(&bind_address)
43 .serve(service)
44 .with_graceful_shutdown(graceful_shutdown(receiver));
45
46 info!("Starting HTTP api on {}", bind_address);
47 tokio::spawn(async { server.await });
48
49 sender
50}
51
52async fn graceful_shutdown(shutdown_signal: Receiver<HttpApiShutdownSignal>) {
53 let _ = shutdown_signal.await;
54}
55
56#[instrument(
57 skip(request, client_address, routes),
58 fields(
59 http_method = %request.method(),
60 http_uri = %request.uri(),
61 client_ip = %client_address.ip(),
62 )
63)]
64async fn execute_request(
65 mut request: Request<Body>,
66 client_address: SocketAddr,
67 routes: Arc<RoutingTable>,
68 request_id: String,
69) -> Result<Response<Body>, hyper::Error> {
70 info!(
71 "Incoming HTTP request for {} {} from {}",
72 request.method(),
73 request.uri(),
74 client_address.ip()
75 );
76
77 let started_at = Instant::now();
78
79 let parts = request
80 .uri()
81 .path()
82 .split('/')
83 .filter(|x| x.trim() != "")
84 .collect::<Vec<_>>();
85
86 match routes.get_route(request.method(), &parts) {
87 Some(route) => {
88 let parameters = route.get_parameters(&parts);
89 match route
90 .handler
91 .execute(&mut request, parameters, request_id.clone())
92 .await
93 {
94 Ok(mut response) => {
95 let elapsed = started_at.elapsed();
96 info!(
97 duration = %elapsed.as_millis(),
98 "Request returning status code {} in {} ms", response.status(), elapsed.as_millis()
99 );
100
101 let headers = response.headers_mut();
102 headers.insert(
103 HeaderName::from_lowercase(b"x-request-id").unwrap(),
104 request_id.parse().unwrap(),
105 );
106
107 Ok(response)
108 }
109
110 Err(error) => {
111 let elapsed = started_at.elapsed();
112 error!(
113 duration = %elapsed.as_millis(),
114 "Request thrown error: {:?}", error
115 );
116
117 Err(error)
118 }
119 }
120 }
121
122 None => {
123 info!("No route found for this URL, returning 404");
124 let mut response = Response::new(Body::from("Invalid URL"));
125 *response.status_mut() = StatusCode::NOT_FOUND;
126
127 Ok(response)
128 }
129 }
130}