1#![allow(missing_docs)] use futures::future::ready;
4use futures::stream::StreamExt;
5use futures::stream::once;
6use http::HeaderValue;
7use http::StatusCode;
8use http::Uri;
9use http::header::HeaderName;
10use http::method::Method;
11use mime::APPLICATION_JSON;
12use multimap::MultiMap;
13use serde_json_bytes::ByteString;
14use serde_json_bytes::Map as JsonMap;
15use serde_json_bytes::Value;
16use static_assertions::assert_impl_all;
17use tower::BoxError;
18
19use crate::Context;
20use crate::context::CONTAINS_GRAPHQL_ERROR;
21use crate::error::Error;
22use crate::graphql;
23use crate::http_ext::TryIntoHeaderName;
24use crate::http_ext::TryIntoHeaderValue;
25use crate::http_ext::header_map;
26use crate::json_ext::Path;
27
28pub(crate) mod service;
29#[cfg(test)]
30mod tests;
31
/// Type-erased `tower` service that takes a supergraph [`Request`] and yields a [`Response`]
/// or a [`BoxError`].
pub type BoxService = tower::util::BoxService<Request, Response, BoxError>;
/// Same request/response/error types as [`BoxService`], wrapped in `tower`'s `BoxCloneService`.
pub type BoxCloneService = tower::util::BoxCloneService<Request, Response, BoxError>;
/// Result type of a supergraph service call: a [`Response`] on success, a [`BoxError`] otherwise.
pub type ServiceResult = Result<Response, BoxError>;
35
// Compile-time assertion that `Request` implements `Send`.
assert_impl_all!(Request: Send);
/// Request handled at the supergraph stage.
///
/// The struct is `#[non_exhaustive]`: other crates cannot build it with a struct literal and
/// have to go through the `From` impl or the generated builders.
#[non_exhaustive]
pub struct Request {
    /// The HTTP request whose body is the GraphQL request.
    pub supergraph_request: http::Request<graphql::Request>,

    /// The [`Context`] that accompanies this request.
    pub context: Context,
}
48
49impl From<http::Request<graphql::Request>> for Request {
50 fn from(supergraph_request: http::Request<graphql::Request>) -> Self {
51 Self {
52 supergraph_request,
53 context: Context::new(),
54 }
55 }
56}
57
58impl std::fmt::Debug for Request {
59 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
60 f.debug_struct("Request")
61 .field("context", &self.context)
63 .finish()
64 }
65}
66
67#[buildstructor::buildstructor]
68impl Request {
69 #[builder(visibility = "pub")]
73 fn new(
74 query: Option<String>,
75 operation_name: Option<String>,
76 variables: JsonMap<ByteString, Value>,
78 extensions: JsonMap<ByteString, Value>,
79 context: Context,
80 headers: MultiMap<TryIntoHeaderName, TryIntoHeaderValue>,
81 uri: Uri,
82 method: Method,
83 ) -> Result<Request, BoxError> {
84 let gql_request = graphql::Request::builder()
85 .and_query(query)
86 .and_operation_name(operation_name)
87 .variables(variables)
88 .extensions(extensions)
89 .build();
90 let mut supergraph_request = http::Request::builder()
91 .uri(uri)
92 .method(method)
93 .body(gql_request)?;
94 *supergraph_request.headers_mut() = header_map(headers)?;
95 Ok(Self {
96 supergraph_request,
97 context,
98 })
99 }
100
101 #[builder(visibility = "pub")]
109 fn fake_new(
110 query: Option<String>,
111 operation_name: Option<String>,
112 variables: JsonMap<ByteString, Value>,
114 extensions: JsonMap<ByteString, Value>,
115 context: Option<Context>,
116 mut headers: MultiMap<TryIntoHeaderName, TryIntoHeaderValue>,
117 method: Option<Method>,
118 ) -> Result<Request, BoxError> {
119 headers
121 .entry(http::header::CONTENT_TYPE.into())
122 .or_insert(HeaderValue::from_static(APPLICATION_JSON.essence_str()).into());
123 let context = context.unwrap_or_default();
124
125 Request::new(
126 query,
127 operation_name,
128 variables,
129 extensions,
130 context,
131 headers,
132 Uri::from_static("http://default"),
133 method.unwrap_or(Method::POST),
134 )
135 }
136
137 #[builder(visibility = "pub")]
139 fn canned_new(
140 query: Option<String>,
141 operation_name: Option<String>,
142 extensions: JsonMap<ByteString, Value>,
144 context: Option<Context>,
145 headers: MultiMap<TryIntoHeaderName, TryIntoHeaderValue>,
146 ) -> Result<Request, BoxError> {
147 let default_query = "
148 query TopProducts($first: Int) {
149 topProducts(first: $first) {
150 upc
151 name
152 reviews {
153 id
154 product { name }
155 author { id name }
156 }
157 }
158 }
159 ";
160 let query = query.unwrap_or(default_query.to_string());
161 let mut variables = JsonMap::new();
162 variables.insert("first", 2_usize.into());
163 Self::fake_new(
164 Some(query),
165 operation_name,
166 variables,
167 extensions,
168 context,
169 headers,
170 None,
171 )
172 }
173}
174
// Compile-time assertion that `Response` implements `Send`.
assert_impl_all!(Response: Send);
/// Response produced at the supergraph stage.
///
/// The struct is `#[non_exhaustive]`: other crates cannot build it with a struct literal and
/// have to go through the generated builders.
#[non_exhaustive]
pub struct Response {
    /// The HTTP response whose body is a stream of GraphQL responses.
    pub response: http::Response<graphql::ResponseStream>,
    /// The [`Context`] that accompanies this response.
    pub context: Context,
}
181
182impl std::fmt::Debug for Response {
183 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
184 f.debug_struct("Response")
185 .field("context", &self.context)
186 .finish()
187 }
188}
189
190#[buildstructor::buildstructor]
191impl Response {
192 #[builder(visibility = "pub")]
196 fn new(
197 label: Option<String>,
198 data: Option<Value>,
199 path: Option<Path>,
200 errors: Vec<Error>,
201 extensions: JsonMap<ByteString, Value>,
203 status_code: Option<StatusCode>,
204 headers: MultiMap<TryIntoHeaderName, TryIntoHeaderValue>,
205 context: Context,
206 ) -> Result<Self, BoxError> {
207 if !errors.is_empty() {
208 context.insert_json_value(CONTAINS_GRAPHQL_ERROR, Value::Bool(true));
209 }
210 let b = graphql::Response::builder()
212 .and_label(label)
213 .and_path(path)
214 .errors(errors)
215 .extensions(extensions);
216 let res = match data {
217 Some(data) => b.data(data).build(),
218 None => b.build(),
219 };
220
221 let mut builder = http::Response::builder().status(status_code.unwrap_or(StatusCode::OK));
223 for (key, values) in headers {
224 let header_name: HeaderName = key.try_into()?;
225 for value in values {
226 let header_value: HeaderValue = value.try_into()?;
227 builder = builder.header(header_name.clone(), header_value);
228 }
229 }
230
231 let response = builder.body(once(ready(res)).boxed())?;
232
233 Ok(Self { response, context })
234 }
235
236 #[builder(visibility = "pub")]
244 fn fake_new(
245 label: Option<String>,
246 data: Option<Value>,
247 path: Option<Path>,
248 errors: Vec<Error>,
249 extensions: JsonMap<ByteString, Value>,
251 status_code: Option<StatusCode>,
252 headers: MultiMap<TryIntoHeaderName, TryIntoHeaderValue>,
253 context: Option<Context>,
254 ) -> Result<Self, BoxError> {
255 Response::new(
256 label,
257 data,
258 path,
259 errors,
260 extensions,
261 status_code,
262 headers,
263 context.unwrap_or_default(),
264 )
265 }
266
267 #[builder(visibility = "pub")]
275 fn fake_stream_new(
276 responses: Vec<graphql::Response>,
277 status_code: Option<StatusCode>,
278 headers: MultiMap<TryIntoHeaderName, TryIntoHeaderValue>,
279 context: Context,
280 ) -> Result<Self, BoxError> {
281 let mut builder = http::Response::builder().status(status_code.unwrap_or(StatusCode::OK));
282 for (key, values) in headers {
283 let header_name: HeaderName = key.try_into()?;
284 for value in values {
285 let header_value: HeaderValue = value.try_into()?;
286 builder = builder.header(header_name.clone(), header_value);
287 }
288 }
289
290 let stream = futures::stream::iter(responses);
291 let response = builder.body(stream.boxed())?;
292 Ok(Self { response, context })
293 }
294
295 #[builder(visibility = "pub")]
299 fn error_new(
300 errors: Vec<Error>,
301 status_code: Option<StatusCode>,
302 headers: MultiMap<TryIntoHeaderName, TryIntoHeaderValue>,
303 context: Context,
304 ) -> Result<Self, BoxError> {
305 Response::new(
306 Default::default(),
307 Default::default(),
308 None,
309 errors,
310 Default::default(),
311 status_code,
312 headers,
313 context,
314 )
315 }
316
317 #[builder(visibility = "pub(crate)")]
321 fn infallible_new(
322 label: Option<String>,
323 data: Option<Value>,
324 path: Option<Path>,
325 errors: Vec<Error>,
326 extensions: JsonMap<ByteString, Value>,
328 status_code: Option<StatusCode>,
329 headers: MultiMap<HeaderName, HeaderValue>,
330 context: Context,
331 ) -> Self {
332 if !errors.is_empty() {
333 context.insert_json_value(CONTAINS_GRAPHQL_ERROR, Value::Bool(true));
334 }
335 let b = graphql::Response::builder()
337 .and_label(label)
338 .and_path(path)
339 .errors(errors)
340 .extensions(extensions);
341 let res = match data {
342 Some(data) => b.data(data).build(),
343 None => b.build(),
344 };
345
346 let mut builder = http::Response::builder().status(status_code.unwrap_or(StatusCode::OK));
348 for (header_name, values) in headers {
349 for header_value in values {
350 builder = builder.header(header_name.clone(), header_value);
351 }
352 }
353
354 let response = builder.body(once(ready(res)).boxed()).expect("can't fail");
355
356 Self { response, context }
357 }
358
359 pub(crate) fn new_from_graphql_response(response: graphql::Response, context: Context) -> Self {
360 if !response.errors.is_empty() {
361 context.insert_json_value(CONTAINS_GRAPHQL_ERROR, Value::Bool(true));
362 }
363
364 Self {
365 response: http::Response::new(once(ready(response)).boxed()),
366 context,
367 }
368 }
369}
370
371impl Response {
372 pub async fn next_response(&mut self) -> Option<graphql::Response> {
373 self.response.body_mut().next().await
374 }
375
376 pub(crate) fn new_from_response(
377 response: http::Response<graphql::ResponseStream>,
378 context: Context,
379 ) -> Self {
380 Self { context, response }.check_for_errors()
381 }
382
383 pub fn map<F>(self, f: F) -> Response
384 where
385 F: FnOnce(graphql::ResponseStream) -> graphql::ResponseStream,
386 {
387 Response {
388 context: self.context,
389 response: self.response.map(f),
390 }
391 }
392
393 pub fn map_stream<F>(self, f: F) -> Self
431 where
432 F: 'static + Send + FnMut(graphql::Response) -> graphql::Response,
433 {
434 self.map(move |stream| stream.map(f).boxed())
435 }
436
437 fn check_for_errors(self) -> Self {
438 let context = self.context.clone();
439 self.map_stream(move |response| {
440 if !response.errors.is_empty() {
441 context.insert_json_value(CONTAINS_GRAPHQL_ERROR, Value::Bool(true));
442 }
443 response
444 })
445 }
446}
447
#[cfg(test)]
mod test {
    use http::HeaderValue;
    use http::Method;
    use http::Uri;
    use serde_json::json;

    use super::*;
    use crate::graphql;

    /// The request builder must keep repeated headers, the URI, the method and the GraphQL
    /// body exactly as supplied.
    #[test]
    fn supergraph_request_builder() {
        // Turns a `json!` object into the byte-string keyed map used by GraphQL requests.
        let json_object = |value: serde_json::Value| {
            serde_json_bytes::Value::from(value)
                .as_object()
                .unwrap()
                .clone()
        };
        let expected_uri = Uri::from_static("http://example.com");

        let request = Request::builder()
            .header("a", "b")
            .header("a", "c")
            .uri(expected_uri.clone())
            .method(Method::POST)
            .query("query { topProducts }")
            .operation_name("Default")
            .context(Context::new())
            .extension("foo", json!({}))
            .variable("bar", json!({}))
            .build()
            .unwrap();

        // Both values of the repeated header "a" survive, in insertion order.
        let a_values: Vec<_> = request
            .supergraph_request
            .headers()
            .get_all("a")
            .into_iter()
            .collect();
        assert_eq!(
            a_values,
            vec![HeaderValue::from_static("b"), HeaderValue::from_static("c")]
        );

        assert_eq!(request.supergraph_request.uri(), &expected_uri);

        let body = request.supergraph_request.body();
        assert_eq!(body.extensions.get("foo"), Some(&json!({}).into()));
        assert_eq!(body.variables.get("bar"), Some(&json!({}).into()));
        assert_eq!(request.supergraph_request.method(), Method::POST);

        let extensions = json_object(json!({"foo":{}}));
        let variables = json_object(json!({"bar":{}}));
        let expected_body = graphql::Request::builder()
            .query("query { topProducts }")
            .operation_name("Default")
            .variables(variables)
            .extensions(extensions)
            .build();
        assert_eq!(body, &expected_body);
    }

    /// The response builder must keep repeated headers and emit the single GraphQL response
    /// it was given.
    #[tokio::test]
    async fn supergraph_response_builder() {
        let mut response = Response::builder()
            .header("a", "b")
            .header("a", "c")
            .context(Context::new())
            .extension("foo", json!({}))
            .data(json!({}))
            .build()
            .unwrap();

        // Both values of the repeated header "a" survive, in insertion order.
        let a_values: Vec<_> = response
            .response
            .headers()
            .get_all("a")
            .into_iter()
            .collect();
        assert_eq!(
            a_values,
            vec![HeaderValue::from_static("b"), HeaderValue::from_static("c")]
        );

        let extensions = serde_json_bytes::Value::from(json!({"foo":{}}))
            .as_object()
            .unwrap()
            .clone();
        let first = response.next_response().await.unwrap();
        let expected = graphql::Response::builder()
            .extensions(extensions)
            .data(json!({}))
            .build();
        assert_eq!(first, expected);
    }
}