async_graphql_warp/
batch_request.rs

1use std::{io, str::FromStr};
2
3use async_graphql::{BatchRequest, Executor, http::MultipartOptions};
4use futures_util::TryStreamExt;
5use warp::{
6    Buf, Filter, Rejection, Reply,
7    http::{HeaderName, HeaderValue},
8    reply::Response as WarpResponse,
9};
10
11use crate::GraphQLBadRequest;
12
13/// GraphQL batch request filter
14///
15/// It outputs a tuple containing the `async_graphql::Executor` and
16/// `async_graphql::BatchRequest`.
17pub fn graphql_batch<E>(
18    executor: E,
19) -> impl Filter<Extract = ((E, BatchRequest),), Error = Rejection> + Clone
20where
21    E: Executor,
22{
23    graphql_batch_opts(executor, Default::default())
24}
25
26/// Similar to graphql_batch, but you can set the options with
27/// :`async_graphql::MultipartOptions`.
28pub fn graphql_batch_opts<E>(
29    executor: E,
30    opts: MultipartOptions,
31) -> impl Filter<Extract = ((E, BatchRequest),), Error = Rejection> + Clone
32where
33    E: Executor,
34{
35    warp::any()
36        .and(warp::get().and(warp::filters::query::raw()).and_then(
37            |query_string: String| async move {
38                async_graphql::http::parse_query_string(&query_string)
39                    .map(Into::into)
40                    .map_err(|e| warp::reject::custom(GraphQLBadRequest(e)))
41            },
42        ))
43        .or(warp::post()
44            .and(warp::header::optional::<String>("content-type"))
45            .and(warp::body::stream())
46            .and_then(move |content_type, body| async move {
47                async_graphql::http::receive_batch_body(
48                    content_type,
49                    TryStreamExt::map_err(body, io::Error::other)
50                        .map_ok(|mut buf| {
51                            let remaining = Buf::remaining(&buf);
52                            Buf::copy_to_bytes(&mut buf, remaining)
53                        })
54                        .into_async_read(),
55                    opts,
56                )
57                .await
58                .map_err(|e| warp::reject::custom(GraphQLBadRequest(e)))
59            }))
60        .unify()
61        .map(move |res| (executor.clone(), res))
62}
63
64/// Reply for `async_graphql::BatchRequest`.
65#[derive(Debug)]
66pub struct GraphQLBatchResponse(pub async_graphql::BatchResponse);
67
68impl From<async_graphql::BatchResponse> for GraphQLBatchResponse {
69    fn from(resp: async_graphql::BatchResponse) -> Self {
70        GraphQLBatchResponse(resp)
71    }
72}
73
74impl Reply for GraphQLBatchResponse {
75    fn into_response(self) -> WarpResponse {
76        let mut resp = warp::reply::with_header(
77            warp::reply::json(&self.0),
78            "content-type",
79            "application/json",
80        )
81        .into_response();
82
83        if self.0.is_ok() {
84            if let Some(cache_control) = self.0.cache_control().value() {
85                if let Ok(value) = cache_control.try_into() {
86                    resp.headers_mut().insert("cache-control", value);
87                }
88            }
89        }
90
91        resp.headers_mut()
92            .extend(self.0.http_headers().iter().filter_map(|(name, value)| {
93                HeaderName::from_str(name.as_str())
94                    .ok()
95                    .zip(HeaderValue::from_bytes(value.as_bytes()).ok())
96            }));
97        resp
98    }
99}