async_graphql_warp/
batch_request.rs1use std::{io, str::FromStr};
2
3use async_graphql::{BatchRequest, Executor, http::MultipartOptions};
4use futures_util::TryStreamExt;
5use warp::{
6 Buf, Filter, Rejection, Reply,
7 http::{HeaderName, HeaderValue},
8 reply::Response as WarpResponse,
9};
10
11use crate::GraphQLBadRequest;
12
13pub fn graphql_batch<E>(
18 executor: E,
19) -> impl Filter<Extract = ((E, BatchRequest),), Error = Rejection> + Clone
20where
21 E: Executor,
22{
23 graphql_batch_opts(executor, Default::default())
24}
25
26pub fn graphql_batch_opts<E>(
29 executor: E,
30 opts: MultipartOptions,
31) -> impl Filter<Extract = ((E, BatchRequest),), Error = Rejection> + Clone
32where
33 E: Executor,
34{
35 warp::any()
36 .and(warp::get().and(warp::filters::query::raw()).and_then(
37 |query_string: String| async move {
38 async_graphql::http::parse_query_string(&query_string)
39 .map(Into::into)
40 .map_err(|e| warp::reject::custom(GraphQLBadRequest(e)))
41 },
42 ))
43 .or(warp::post()
44 .and(warp::header::optional::<String>("content-type"))
45 .and(warp::body::stream())
46 .and_then(move |content_type, body| async move {
47 async_graphql::http::receive_batch_body(
48 content_type,
49 TryStreamExt::map_err(body, io::Error::other)
50 .map_ok(|mut buf| {
51 let remaining = Buf::remaining(&buf);
52 Buf::copy_to_bytes(&mut buf, remaining)
53 })
54 .into_async_read(),
55 opts,
56 )
57 .await
58 .map_err(|e| warp::reject::custom(GraphQLBadRequest(e)))
59 }))
60 .unify()
61 .map(move |res| (executor.clone(), res))
62}
63
64#[derive(Debug)]
66pub struct GraphQLBatchResponse(pub async_graphql::BatchResponse);
67
68impl From<async_graphql::BatchResponse> for GraphQLBatchResponse {
69 fn from(resp: async_graphql::BatchResponse) -> Self {
70 GraphQLBatchResponse(resp)
71 }
72}
73
74impl Reply for GraphQLBatchResponse {
75 fn into_response(self) -> WarpResponse {
76 let mut resp = warp::reply::with_header(
77 warp::reply::json(&self.0),
78 "content-type",
79 "application/json",
80 )
81 .into_response();
82
83 if self.0.is_ok() {
84 if let Some(cache_control) = self.0.cache_control().value() {
85 if let Ok(value) = cache_control.try_into() {
86 resp.headers_mut().insert("cache-control", value);
87 }
88 }
89 }
90
91 resp.headers_mut()
92 .extend(self.0.http_headers().iter().filter_map(|(name, value)| {
93 HeaderName::from_str(name.as_str())
94 .ok()
95 .zip(HeaderValue::from_bytes(value.as_bytes()).ok())
96 }));
97 resp
98 }
99}