1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
//! Exposes an HTTP-based interface for interacting with mmids.  Routes are defined by consumers,
//! who supply the code that should execute when each route gets hit.

pub mod handlers;
pub mod routing;

use crate::http_api::routing::RoutingTable;
use hyper::header::HeaderName;
use hyper::server::conn::AddrStream;
use hyper::service::{make_service_fn, service_fn};
use hyper::{Body, Request, Response, Server, StatusCode};
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Instant;
use tokio::sync::oneshot::{channel, Receiver, Sender};
use tracing::{error, info, instrument};
use uuid::Uuid;

/// Message type carried by the one-shot channel whose sender is returned by `start_http_api`.
///
/// The HTTP server begins its graceful shutdown once the receiving half of that channel
/// resolves, which happens when a value of this type is sent or when the sender is dropped.
pub struct HttpApiShutdownSignal {}

pub fn start_http_api(
    bind_address: SocketAddr,
    routes: RoutingTable,
) -> Sender<HttpApiShutdownSignal> {
    let routes = Arc::new(routes);
    let service = make_service_fn(move |socket: &AddrStream| {
        let remote_address = socket.remote_addr();
        let routes_clone = routes.clone();
        async move {
            Ok::<_, hyper::Error>(service_fn(move |request: Request<Body>| {
                execute_request(
                    request,
                    remote_address,
                    routes_clone.clone(),
                    Uuid::new_v4().to_string(),
                )
            }))
        }
    });

    let (sender, receiver) = channel();
    let server = Server::bind(&bind_address)
        .serve(service)
        .with_graceful_shutdown(graceful_shutdown(receiver));

    info!("Starting HTTP api on {}", bind_address);
    tokio::spawn(async { server.await });

    sender
}

/// Completes once the shutdown channel resolves, which tells the server to stop.
///
/// Both outcomes are treated the same way: an explicitly sent signal and a receive error (the
/// sender having been dropped) each end the wait.
async fn graceful_shutdown(shutdown_signal: Receiver<HttpApiShutdownSignal>) {
    match shutdown_signal.await {
        Ok(_signal) => (),
        Err(_closed) => (),
    }
}

#[instrument(
    skip(request, client_address, routes),
    fields(
        http_method = %request.method(),
        http_uri = %request.uri(),
        client_ip = %client_address.ip(),
    )
)]
async fn execute_request(
    mut request: Request<Body>,
    client_address: SocketAddr,
    routes: Arc<RoutingTable>,
    request_id: String,
) -> Result<Response<Body>, hyper::Error> {
    info!(
        "Incoming HTTP request for {} {} from {}",
        request.method(),
        request.uri(),
        client_address.ip()
    );

    let started_at = Instant::now();

    let parts = request
        .uri()
        .path()
        .split('/')
        .filter(|x| x.trim() != "")
        .collect::<Vec<_>>();

    match routes.get_route(request.method(), &parts) {
        Some(route) => {
            let parameters = route.get_parameters(&parts);
            match route
                .handler
                .execute(&mut request, parameters, request_id.clone())
                .await
            {
                Ok(mut response) => {
                    let elapsed = started_at.elapsed();
                    info!(
                        duration = %elapsed.as_millis(),
                        "Request returning status code {} in {} ms", response.status(), elapsed.as_millis()
                    );

                    let headers = response.headers_mut();
                    headers.insert(
                        HeaderName::from_lowercase(b"x-request-id").unwrap(),
                        request_id.parse().unwrap(),
                    );

                    Ok(response)
                }

                Err(error) => {
                    let elapsed = started_at.elapsed();
                    error!(
                        duration = %elapsed.as_millis(),
                        "Request thrown error: {:?}", error
                    );

                    Err(error)
                }
            }
        }

        None => {
            info!("No route found for this URL, returning 404");
            let mut response = Response::new(Body::from("Invalid URL"));
            *response.status_mut() = StatusCode::NOT_FOUND;

            Ok(response)
        }
    }
}