subgraph_mock/handle/
mod.rs

1use crate::state::State;
2use http_body_util::{BodyExt, Full, combinators::BoxBody};
3use hyper::{
4    Method, Request, Response, StatusCode,
5    body::{Body, Bytes},
6};
7use std::{error::Error, sync::Arc};
8use tokio::time::{Instant, sleep};
9use tracing::{trace, warn};
10
11pub mod graphql;
12
13pub type ByteResponse = Response<BoxBody<Bytes, hyper::Error>>;
14
15/// Top level handler function that is called for every incoming request from Hyper.
16pub async fn handle_request<B>(req: Request<B>, state: Arc<State>) -> anyhow::Result<ByteResponse>
17where
18    B: Body,
19    B::Error: Error + Send + Sync + 'static,
20{
21    let (parts, body) = req.into_parts();
22    let (method, path) = (parts.method, parts.uri.path());
23    let body_bytes = body.collect().await?.to_bytes().to_vec();
24
25    let config = state.config.read().await;
26
27    let (res, generator_override) = match (&method, path) {
28        // matches routes in the form of `/{subgraph_name}`
29        // all further path elements will be ignored for the sake of not spending too much
30        // compute time on this condition
31        (&Method::POST, route) if route.len() > 1 && route.starts_with('/') => {
32            let subgraph_name = route
33                .split('/')
34                .nth(1)
35                .expect("split will yield at least 2 elements based on the match condition");
36
37            (
38                graphql::handle(body_bytes, Some(subgraph_name), state.clone()).await,
39                config
40                    .subgraph_overrides
41                    .latency_generator
42                    .get(subgraph_name),
43            )
44        }
45        (&Method::POST, "/") => (graphql::handle(body_bytes, None, state.clone()).await, None),
46
47        // default to 404
48        (method, path) => {
49            warn!(%method, %path, "received unexpected request");
50            let mut resp = Response::new(
51                Full::new("Not found\n".into())
52                    .map_err(|never| match never {})
53                    .boxed(),
54            );
55            *resp.status_mut() = StatusCode::NOT_FOUND;
56
57            (Ok(resp), None)
58        }
59    };
60
61    // Skip latency injection when we have a non-2xx response
62    if res.is_ok() {
63        let latency = generator_override
64            .unwrap_or_else(|| &config.latency_generator)
65            .generate(Instant::now());
66        trace!(latency_ms = latency.as_millis(), "injecting latency");
67        sleep(latency).await;
68    }
69
70    res
71}