1#![allow(missing_docs)] use futures::future::ready;
4use futures::stream::StreamExt;
5use futures::stream::once;
6use http::HeaderValue;
7use http::StatusCode;
8use http::Uri;
9use http::header::HeaderName;
10use http::method::Method;
11use mime::APPLICATION_JSON;
12use multimap::MultiMap;
13use serde_json_bytes::ByteString;
14use serde_json_bytes::Map as JsonMap;
15use serde_json_bytes::Value;
16use static_assertions::assert_impl_all;
17use tower::BoxError;
18
19use crate::Context;
20use crate::context::CHUNK_CONTAINS_GRAPHQL_ERROR;
21use crate::context::CONTAINS_GRAPHQL_ERROR;
22use crate::error::Error;
23use crate::graphql;
24use crate::http_ext::TryIntoHeaderName;
25use crate::http_ext::TryIntoHeaderValue;
26use crate::http_ext::header_map;
27use crate::json_ext::Path;
28
// Submodule with this module's service implementation (crate-private).
pub(crate) mod service;
// Out-of-line unit tests, compiled only for test builds.
#[cfg(test)]
mod tests;

/// Boxed [`tower`] service that takes a [`Request`] and yields a [`Response`],
/// failing with a [`BoxError`].
pub type BoxService = tower::util::BoxService<Request, Response, BoxError>;
/// Same request/response/error types as [`BoxService`], but built on
/// `tower::util::BoxCloneService`.
pub type BoxCloneService = tower::util::BoxCloneService<Request, Response, BoxError>;
/// Result of a service call: a [`Response`] on success, a [`BoxError`] on failure.
pub type ServiceResult = Result<Response, BoxError>;
36
// Compile-time assertion that `Request` implements `Send`.
assert_impl_all!(Request: Send);
/// Supergraph request: an HTTP request whose body is a GraphQL request,
/// paired with the [`Context`] that accompanies it.
#[non_exhaustive]
pub struct Request {
    /// The HTTP request (URI, method, headers) with the GraphQL request as its body.
    pub supergraph_request: http::Request<graphql::Request>,

    /// The [`Context`] attached to this request.
    pub context: Context,
}
49
50impl From<http::Request<graphql::Request>> for Request {
51 fn from(supergraph_request: http::Request<graphql::Request>) -> Self {
52 Self {
53 supergraph_request,
54 context: Context::new(),
55 }
56 }
57}
58
59impl std::fmt::Debug for Request {
60 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
61 f.debug_struct("Request")
62 .field("context", &self.context)
64 .finish()
65 }
66}
67
68#[buildstructor::buildstructor]
69impl Request {
70 #[builder(visibility = "pub")]
74 fn new(
75 query: Option<String>,
76 operation_name: Option<String>,
77 variables: JsonMap<ByteString, Value>,
79 extensions: JsonMap<ByteString, Value>,
80 context: Context,
81 headers: MultiMap<TryIntoHeaderName, TryIntoHeaderValue>,
82 uri: Uri,
83 method: Method,
84 ) -> Result<Request, BoxError> {
85 let gql_request = graphql::Request::builder()
86 .and_query(query)
87 .and_operation_name(operation_name)
88 .variables(variables)
89 .extensions(extensions)
90 .build();
91 let mut supergraph_request = http::Request::builder()
92 .uri(uri)
93 .method(method)
94 .body(gql_request)?;
95 *supergraph_request.headers_mut() = header_map(headers)?;
96 Ok(Self {
97 supergraph_request,
98 context,
99 })
100 }
101
102 #[builder(visibility = "pub")]
110 fn fake_new(
111 query: Option<String>,
112 operation_name: Option<String>,
113 variables: JsonMap<ByteString, Value>,
115 extensions: JsonMap<ByteString, Value>,
116 context: Option<Context>,
117 mut headers: MultiMap<TryIntoHeaderName, TryIntoHeaderValue>,
118 method: Option<Method>,
119 ) -> Result<Request, BoxError> {
120 headers
122 .entry(http::header::CONTENT_TYPE.into())
123 .or_insert(HeaderValue::from_static(APPLICATION_JSON.essence_str()).into());
124 let context = context.unwrap_or_default();
125
126 Request::new(
127 query,
128 operation_name,
129 variables,
130 extensions,
131 context,
132 headers,
133 Uri::from_static("http://default"),
134 method.unwrap_or(Method::POST),
135 )
136 }
137
138 #[builder(visibility = "pub")]
140 fn canned_new(
141 query: Option<String>,
142 operation_name: Option<String>,
143 extensions: JsonMap<ByteString, Value>,
145 context: Option<Context>,
146 headers: MultiMap<TryIntoHeaderName, TryIntoHeaderValue>,
147 ) -> Result<Request, BoxError> {
148 let default_query = "
149 query TopProducts($first: Int) {
150 topProducts(first: $first) {
151 upc
152 name
153 reviews {
154 id
155 product { name }
156 author { id name }
157 }
158 }
159 }
160 ";
161 let query = query.unwrap_or(default_query.to_string());
162 let mut variables = JsonMap::new();
163 variables.insert("first", 2_usize.into());
164 Self::fake_new(
165 Some(query),
166 operation_name,
167 variables,
168 extensions,
169 context,
170 headers,
171 None,
172 )
173 }
174}
175
// Compile-time assertion that `Response` implements `Send`.
assert_impl_all!(Response: Send);
/// Supergraph response: an HTTP response whose body is a stream of GraphQL
/// responses, paired with the [`Context`] that accompanies it.
#[non_exhaustive]
pub struct Response {
    /// The HTTP response (status, headers) with a `graphql::ResponseStream` body.
    pub response: http::Response<graphql::ResponseStream>,
    /// The [`Context`] attached to this response.
    pub context: Context,
}
182
183impl std::fmt::Debug for Response {
184 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
185 f.debug_struct("Response")
186 .field("context", &self.context)
187 .finish()
188 }
189}
190
191#[buildstructor::buildstructor]
192impl Response {
193 #[builder(visibility = "pub")]
197 fn new(
198 label: Option<String>,
199 data: Option<Value>,
200 path: Option<Path>,
201 errors: Vec<Error>,
202 extensions: JsonMap<ByteString, Value>,
204 status_code: Option<StatusCode>,
205 headers: MultiMap<TryIntoHeaderName, TryIntoHeaderValue>,
206 context: Context,
207 ) -> Result<Self, BoxError> {
208 let has_errors = !errors.is_empty();
209 if has_errors {
210 context.insert_json_value(CONTAINS_GRAPHQL_ERROR, Value::Bool(true));
211 }
212 context.insert_json_value(CHUNK_CONTAINS_GRAPHQL_ERROR, Value::Bool(has_errors));
213 let b = graphql::Response::builder()
215 .and_label(label)
216 .and_path(path)
217 .errors(errors)
218 .extensions(extensions);
219 let res = match data {
220 Some(data) => b.data(data).build(),
221 None => b.build(),
222 };
223
224 let mut builder = http::Response::builder().status(status_code.unwrap_or(StatusCode::OK));
226 for (key, values) in headers {
227 let header_name: HeaderName = key.try_into()?;
228 for value in values {
229 let header_value: HeaderValue = value.try_into()?;
230 builder = builder.header(header_name.clone(), header_value);
231 }
232 }
233
234 let response = builder.body(once(ready(res)).boxed())?;
235
236 Ok(Self { response, context })
237 }
238
239 #[builder(visibility = "pub")]
247 fn fake_new(
248 label: Option<String>,
249 data: Option<Value>,
250 path: Option<Path>,
251 errors: Vec<Error>,
252 extensions: JsonMap<ByteString, Value>,
254 status_code: Option<StatusCode>,
255 headers: MultiMap<TryIntoHeaderName, TryIntoHeaderValue>,
256 context: Option<Context>,
257 ) -> Result<Self, BoxError> {
258 Response::new(
259 label,
260 data,
261 path,
262 errors,
263 extensions,
264 status_code,
265 headers,
266 context.unwrap_or_default(),
267 )
268 }
269
270 #[builder(visibility = "pub")]
278 fn fake_stream_new(
279 responses: Vec<graphql::Response>,
280 status_code: Option<StatusCode>,
281 headers: MultiMap<TryIntoHeaderName, TryIntoHeaderValue>,
282 context: Context,
283 ) -> Result<Self, BoxError> {
284 let mut builder = http::Response::builder().status(status_code.unwrap_or(StatusCode::OK));
285 for (key, values) in headers {
286 let header_name: HeaderName = key.try_into()?;
287 for value in values {
288 let header_value: HeaderValue = value.try_into()?;
289 builder = builder.header(header_name.clone(), header_value);
290 }
291 }
292
293 let stream = futures::stream::iter(responses);
294 let response = builder.body(stream.boxed())?;
295 Ok(Self { response, context })
296 }
297
298 #[builder(visibility = "pub")]
302 fn error_new(
303 errors: Vec<Error>,
304 status_code: Option<StatusCode>,
305 headers: MultiMap<TryIntoHeaderName, TryIntoHeaderValue>,
306 context: Context,
307 ) -> Result<Self, BoxError> {
308 Response::new(
309 Default::default(),
310 Default::default(),
311 None,
312 errors,
313 Default::default(),
314 status_code,
315 headers,
316 context,
317 )
318 }
319
320 #[builder(visibility = "pub(crate)")]
324 fn infallible_new(
325 label: Option<String>,
326 data: Option<Value>,
327 path: Option<Path>,
328 errors: Vec<Error>,
329 extensions: JsonMap<ByteString, Value>,
331 status_code: Option<StatusCode>,
332 headers: MultiMap<HeaderName, HeaderValue>,
333 context: Context,
334 ) -> Self {
335 let has_errors = !errors.is_empty();
336 if has_errors {
337 context.insert_json_value(CONTAINS_GRAPHQL_ERROR, Value::Bool(true));
338 }
339 context.insert_json_value(CHUNK_CONTAINS_GRAPHQL_ERROR, Value::Bool(has_errors));
340 let b = graphql::Response::builder()
342 .and_label(label)
343 .and_path(path)
344 .errors(errors)
345 .extensions(extensions);
346 let res = match data {
347 Some(data) => b.data(data).build(),
348 None => b.build(),
349 };
350
351 let mut builder = http::Response::builder().status(status_code.unwrap_or(StatusCode::OK));
353 for (header_name, values) in headers {
354 for header_value in values {
355 builder = builder.header(header_name.clone(), header_value);
356 }
357 }
358
359 let response = builder.body(once(ready(res)).boxed()).expect("can't fail");
360
361 Self { response, context }
362 }
363
364 pub(crate) fn new_from_graphql_response(response: graphql::Response, context: Context) -> Self {
365 let has_errors = !response.errors.is_empty();
366 if has_errors {
367 context.insert_json_value(CONTAINS_GRAPHQL_ERROR, Value::Bool(true));
368 }
369 context.insert_json_value(CHUNK_CONTAINS_GRAPHQL_ERROR, Value::Bool(has_errors));
370
371 Self {
372 response: http::Response::new(once(ready(response)).boxed()),
373 context,
374 }
375 }
376}
377
378impl Response {
379 pub async fn next_response(&mut self) -> Option<graphql::Response> {
380 self.response.body_mut().next().await
381 }
382
383 pub(crate) fn new_from_response(
384 response: http::Response<graphql::ResponseStream>,
385 context: Context,
386 ) -> Self {
387 Self { context, response }.check_for_errors()
388 }
389
390 pub fn map<F>(self, f: F) -> Response
391 where
392 F: FnOnce(graphql::ResponseStream) -> graphql::ResponseStream,
393 {
394 Response {
395 context: self.context,
396 response: self.response.map(f),
397 }
398 }
399
400 pub fn map_stream<F>(self, f: F) -> Self
438 where
439 F: 'static + Send + FnMut(graphql::Response) -> graphql::Response,
440 {
441 self.map(move |stream| stream.map(f).boxed())
442 }
443
444 fn check_for_errors(self) -> Self {
445 let context = self.context.clone();
446 self.map_stream(move |response| {
447 let has_errors = response.contains_errors();
448 if has_errors {
449 context.insert_json_value(CONTAINS_GRAPHQL_ERROR, Value::Bool(true));
450 }
451 context.insert_json_value(CHUNK_CONTAINS_GRAPHQL_ERROR, Value::Bool(has_errors));
452 response
453 })
454 }
455}
456
#[cfg(test)]
mod test {
    use http::HeaderValue;
    use http::Method;
    use http::Uri;
    use serde_json::json;

    use super::*;
    use crate::graphql;

    /// Converts a `serde_json` value into a `serde_json_bytes` object map.
    /// Panics if the value is not a JSON object.
    fn json_object(value: serde_json::Value) -> JsonMap<ByteString, Value> {
        serde_json_bytes::Value::from(value)
            .as_object()
            .unwrap()
            .clone()
    }

    /// `Request::builder()` must carry headers, URI, method and the GraphQL
    /// body through to the built request.
    #[test]
    fn supergraph_request_builder() {
        let built = Request::builder()
            .header("a", "b")
            .header("a", "c")
            .uri(Uri::from_static("http://example.com"))
            .method(Method::POST)
            .query("query { topProducts }")
            .operation_name("Default")
            .context(Context::new())
            .extension("foo", json!({}))
            .variable("bar", json!({}))
            .build()
            .unwrap();
        let http_request = &built.supergraph_request;

        // Both values given for header "a" must be present, in insertion order.
        let a_values: Vec<_> = http_request.headers().get_all("a").into_iter().collect();
        assert_eq!(
            a_values,
            vec![HeaderValue::from_static("b"), HeaderValue::from_static("c")]
        );

        assert_eq!(
            http_request.uri(),
            &Uri::from_static("http://example.com")
        );
        assert_eq!(
            http_request.body().extensions.get("foo"),
            Some(&json!({}).into())
        );
        assert_eq!(
            http_request.body().variables.get("bar"),
            Some(&json!({}).into())
        );
        assert_eq!(http_request.method(), Method::POST);

        // The whole body must equal a GraphQL request built directly.
        let expected_body = graphql::Request::builder()
            .variables(json_object(json!({"bar":{}})))
            .extensions(json_object(json!({"foo":{}})))
            .operation_name("Default")
            .query("query { topProducts }")
            .build();
        assert_eq!(http_request.body(), &expected_body);
    }

    /// `Response::builder()` must carry headers through and yield the
    /// expected GraphQL response as the first stream item.
    #[tokio::test]
    async fn supergraph_response_builder() {
        let mut response = Response::builder()
            .header("a", "b")
            .header("a", "c")
            .context(Context::new())
            .extension("foo", json!({}))
            .data(json!({}))
            .build()
            .unwrap();

        let a_values: Vec<_> = response
            .response
            .headers()
            .get_all("a")
            .into_iter()
            .collect();
        assert_eq!(
            a_values,
            vec![HeaderValue::from_static("b"), HeaderValue::from_static("c")]
        );

        let expected_chunk = graphql::Response::builder()
            .extensions(json_object(json!({"foo":{}})))
            .data(json!({}))
            .build();
        let first_chunk = response.next_response().await.unwrap();
        assert_eq!(first_chunk, expected_chunk);
    }
}