apollo_router/services/
supergraph.rs

1#![allow(missing_docs)] // FIXME
2
3use futures::future::ready;
4use futures::stream::StreamExt;
5use futures::stream::once;
6use http::HeaderValue;
7use http::StatusCode;
8use http::Uri;
9use http::header::HeaderName;
10use http::method::Method;
11use mime::APPLICATION_JSON;
12use multimap::MultiMap;
13use serde_json_bytes::ByteString;
14use serde_json_bytes::Map as JsonMap;
15use serde_json_bytes::Value;
16use static_assertions::assert_impl_all;
17use tower::BoxError;
18
19use crate::Context;
20use crate::context::CONTAINS_GRAPHQL_ERROR;
21use crate::error::Error;
22use crate::graphql;
23use crate::http_ext::TryIntoHeaderName;
24use crate::http_ext::TryIntoHeaderValue;
25use crate::http_ext::header_map;
26use crate::json_ext::Path;
27
28pub(crate) mod service;
29#[cfg(test)]
30mod tests;
31
32pub type BoxService = tower::util::BoxService<Request, Response, BoxError>;
33pub type BoxCloneService = tower::util::BoxCloneService<Request, Response, BoxError>;
34pub type ServiceResult = Result<Response, BoxError>;
35
36assert_impl_all!(Request: Send);
37/// Represents the router processing step of the processing pipeline.
38///
39/// This consists of the parsed graphql Request, HTTP headers and contextual data for extensions.
40#[non_exhaustive]
41pub struct Request {
42    /// Original request to the Router.
43    pub supergraph_request: http::Request<graphql::Request>,
44
45    /// Context for extension
46    pub context: Context,
47}
48
49impl From<http::Request<graphql::Request>> for Request {
50    fn from(supergraph_request: http::Request<graphql::Request>) -> Self {
51        Self {
52            supergraph_request,
53            context: Context::new(),
54        }
55    }
56}
57
58impl std::fmt::Debug for Request {
59    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
60        f.debug_struct("Request")
61            // .field("supergraph_request", &self.supergraph_request)
62            .field("context", &self.context)
63            .finish()
64    }
65}
66
67#[buildstructor::buildstructor]
68impl Request {
69    /// This is the constructor (or builder) to use when constructing a real Request.
70    ///
71    /// Required parameters are required in non-testing code to create a Request.
72    #[builder(visibility = "pub")]
73    fn new(
74        query: Option<String>,
75        operation_name: Option<String>,
76        // Skip the `Object` type alias in order to use buildstructor’s map special-casing
77        variables: JsonMap<ByteString, Value>,
78        extensions: JsonMap<ByteString, Value>,
79        context: Context,
80        headers: MultiMap<TryIntoHeaderName, TryIntoHeaderValue>,
81        uri: Uri,
82        method: Method,
83    ) -> Result<Request, BoxError> {
84        let gql_request = graphql::Request::builder()
85            .and_query(query)
86            .and_operation_name(operation_name)
87            .variables(variables)
88            .extensions(extensions)
89            .build();
90        let mut supergraph_request = http::Request::builder()
91            .uri(uri)
92            .method(method)
93            .body(gql_request)?;
94        *supergraph_request.headers_mut() = header_map(headers)?;
95        Ok(Self {
96            supergraph_request,
97            context,
98        })
99    }
100
101    /// This is the constructor (or builder) to use when constructing a "fake" Request.
102    ///
103    /// This does not enforce the provision of the data that is required for a fully functional
104    /// Request. It's usually enough for testing, when a fully constructed Request is
105    /// difficult to construct and not required for the purposes of the test.
106    ///
107    /// In addition, fake requests are expected to be valid, and will panic if given invalid values.
108    #[builder(visibility = "pub")]
109    fn fake_new(
110        query: Option<String>,
111        operation_name: Option<String>,
112        // Skip the `Object` type alias in order to use buildstructor’s map special-casing
113        variables: JsonMap<ByteString, Value>,
114        extensions: JsonMap<ByteString, Value>,
115        context: Option<Context>,
116        mut headers: MultiMap<TryIntoHeaderName, TryIntoHeaderValue>,
117        method: Option<Method>,
118    ) -> Result<Request, BoxError> {
119        // Avoid testing requests getting blocked by the CSRF-prevention plugin
120        headers
121            .entry(http::header::CONTENT_TYPE.into())
122            .or_insert(HeaderValue::from_static(APPLICATION_JSON.essence_str()).into());
123        let context = context.unwrap_or_default();
124
125        Request::new(
126            query,
127            operation_name,
128            variables,
129            extensions,
130            context,
131            headers,
132            Uri::from_static("http://default"),
133            method.unwrap_or(Method::POST),
134        )
135    }
136
137    /// Create a request with an example query, for tests
138    #[builder(visibility = "pub")]
139    fn canned_new(
140        query: Option<String>,
141        operation_name: Option<String>,
142        // Skip the `Object` type alias in order to use buildstructor’s map special-casing
143        extensions: JsonMap<ByteString, Value>,
144        context: Option<Context>,
145        headers: MultiMap<TryIntoHeaderName, TryIntoHeaderValue>,
146    ) -> Result<Request, BoxError> {
147        let default_query = "
148            query TopProducts($first: Int) {
149                topProducts(first: $first) {
150                    upc
151                    name
152                    reviews {
153                        id
154                        product { name }
155                        author { id name }
156                    }
157                }
158            }
159        ";
160        let query = query.unwrap_or(default_query.to_string());
161        let mut variables = JsonMap::new();
162        variables.insert("first", 2_usize.into());
163        Self::fake_new(
164            Some(query),
165            operation_name,
166            variables,
167            extensions,
168            context,
169            headers,
170            None,
171        )
172    }
173}
174
175assert_impl_all!(Response: Send);
176#[non_exhaustive]
177pub struct Response {
178    pub response: http::Response<graphql::ResponseStream>,
179    pub context: Context,
180}
181
182impl std::fmt::Debug for Response {
183    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
184        f.debug_struct("Response")
185            .field("context", &self.context)
186            .finish()
187    }
188}
189
190#[buildstructor::buildstructor]
191impl Response {
192    /// This is the constructor (or builder) to use when constructing a real Response..
193    ///
194    /// Required parameters are required in non-testing code to create a Response..
195    #[builder(visibility = "pub")]
196    fn new(
197        label: Option<String>,
198        data: Option<Value>,
199        path: Option<Path>,
200        errors: Vec<Error>,
201        // Skip the `Object` type alias in order to use buildstructor’s map special-casing
202        extensions: JsonMap<ByteString, Value>,
203        status_code: Option<StatusCode>,
204        headers: MultiMap<TryIntoHeaderName, TryIntoHeaderValue>,
205        context: Context,
206    ) -> Result<Self, BoxError> {
207        if !errors.is_empty() {
208            context.insert_json_value(CONTAINS_GRAPHQL_ERROR, Value::Bool(true));
209        }
210        // Build a response
211        let b = graphql::Response::builder()
212            .and_label(label)
213            .and_path(path)
214            .errors(errors)
215            .extensions(extensions);
216        let res = match data {
217            Some(data) => b.data(data).build(),
218            None => b.build(),
219        };
220
221        // Build an http Response
222        let mut builder = http::Response::builder().status(status_code.unwrap_or(StatusCode::OK));
223        for (key, values) in headers {
224            let header_name: HeaderName = key.try_into()?;
225            for value in values {
226                let header_value: HeaderValue = value.try_into()?;
227                builder = builder.header(header_name.clone(), header_value);
228            }
229        }
230
231        let response = builder.body(once(ready(res)).boxed())?;
232
233        Ok(Self { response, context })
234    }
235
236    /// This is the constructor (or builder) to use when constructing a "fake" Response.
237    ///
238    /// This does not enforce the provision of the data that is required for a fully functional
239    /// Response. It's usually enough for testing, when a fully constructed Response is
240    /// difficult to construct and not required for the purposes of the test.
241    ///
242    /// In addition, fake responses are expected to be valid, and will panic if given invalid values.
243    #[builder(visibility = "pub")]
244    fn fake_new(
245        label: Option<String>,
246        data: Option<Value>,
247        path: Option<Path>,
248        errors: Vec<Error>,
249        // Skip the `Object` type alias in order to use buildstructor’s map special-casing
250        extensions: JsonMap<ByteString, Value>,
251        status_code: Option<StatusCode>,
252        headers: MultiMap<TryIntoHeaderName, TryIntoHeaderValue>,
253        context: Option<Context>,
254    ) -> Result<Self, BoxError> {
255        Response::new(
256            label,
257            data,
258            path,
259            errors,
260            extensions,
261            status_code,
262            headers,
263            context.unwrap_or_default(),
264        )
265    }
266
267    /// This is the constructor (or builder) to use when constructing a "fake" Response stream.
268    ///
269    /// This does not enforce the provision of the data that is required for a fully functional
270    /// Response. It's usually enough for testing, when a fully constructed Response is
271    /// difficult to construct and not required for the purposes of the test.
272    ///
273    /// In addition, fake responses are expected to be valid, and will panic if given invalid values.
274    #[builder(visibility = "pub")]
275    fn fake_stream_new(
276        responses: Vec<graphql::Response>,
277        status_code: Option<StatusCode>,
278        headers: MultiMap<TryIntoHeaderName, TryIntoHeaderValue>,
279        context: Context,
280    ) -> Result<Self, BoxError> {
281        let mut builder = http::Response::builder().status(status_code.unwrap_or(StatusCode::OK));
282        for (key, values) in headers {
283            let header_name: HeaderName = key.try_into()?;
284            for value in values {
285                let header_value: HeaderValue = value.try_into()?;
286                builder = builder.header(header_name.clone(), header_value);
287            }
288        }
289
290        let stream = futures::stream::iter(responses);
291        let response = builder.body(stream.boxed())?;
292        Ok(Self { response, context })
293    }
294
295    /// This is the constructor (or builder) to use when constructing a Response that represents a global error.
296    /// It has no path and no response data.
297    /// This is useful for things such as authentication errors.
298    #[builder(visibility = "pub")]
299    fn error_new(
300        errors: Vec<Error>,
301        status_code: Option<StatusCode>,
302        headers: MultiMap<TryIntoHeaderName, TryIntoHeaderValue>,
303        context: Context,
304    ) -> Result<Self, BoxError> {
305        Response::new(
306            Default::default(),
307            Default::default(),
308            None,
309            errors,
310            Default::default(),
311            status_code,
312            headers,
313            context,
314        )
315    }
316
317    /// This is the constructor (or builder) to use when constructing a real Response..
318    ///
319    /// Required parameters are required in non-testing code to create a Response..
320    #[builder(visibility = "pub(crate)")]
321    fn infallible_new(
322        label: Option<String>,
323        data: Option<Value>,
324        path: Option<Path>,
325        errors: Vec<Error>,
326        // Skip the `Object` type alias in order to use buildstructor’s map special-casing
327        extensions: JsonMap<ByteString, Value>,
328        status_code: Option<StatusCode>,
329        headers: MultiMap<HeaderName, HeaderValue>,
330        context: Context,
331    ) -> Self {
332        if !errors.is_empty() {
333            context.insert_json_value(CONTAINS_GRAPHQL_ERROR, Value::Bool(true));
334        }
335        // Build a response
336        let b = graphql::Response::builder()
337            .and_label(label)
338            .and_path(path)
339            .errors(errors)
340            .extensions(extensions);
341        let res = match data {
342            Some(data) => b.data(data).build(),
343            None => b.build(),
344        };
345
346        // Build an http Response
347        let mut builder = http::Response::builder().status(status_code.unwrap_or(StatusCode::OK));
348        for (header_name, values) in headers {
349            for header_value in values {
350                builder = builder.header(header_name.clone(), header_value);
351            }
352        }
353
354        let response = builder.body(once(ready(res)).boxed()).expect("can't fail");
355
356        Self { response, context }
357    }
358
359    pub(crate) fn new_from_graphql_response(response: graphql::Response, context: Context) -> Self {
360        if !response.errors.is_empty() {
361            context.insert_json_value(CONTAINS_GRAPHQL_ERROR, Value::Bool(true));
362        }
363
364        Self {
365            response: http::Response::new(once(ready(response)).boxed()),
366            context,
367        }
368    }
369}
370
371impl Response {
372    pub async fn next_response(&mut self) -> Option<graphql::Response> {
373        self.response.body_mut().next().await
374    }
375
376    pub(crate) fn new_from_response(
377        response: http::Response<graphql::ResponseStream>,
378        context: Context,
379    ) -> Self {
380        Self { context, response }.check_for_errors()
381    }
382
383    pub fn map<F>(self, f: F) -> Response
384    where
385        F: FnOnce(graphql::ResponseStream) -> graphql::ResponseStream,
386    {
387        Response {
388            context: self.context,
389            response: self.response.map(f),
390        }
391    }
392
393    /// Returns a new supergraph response where each [`graphql::Response`] is mapped through `f`.
394    ///
395    /// In supergraph and execution services, the service response contains
396    /// not just one GraphQL response but a stream of them,
397    /// in order to support features such as `@defer`.
398    /// This method uses [`futures::stream::StreamExt::map`] to map over each item in the stream.
399    ///
400    /// # Example
401    ///
402    /// ```
403    /// use apollo_router::services::supergraph;
404    /// use apollo_router::layers::ServiceExt as _;
405    /// use tower::ServiceExt as _;
406    ///
407    /// struct ExamplePlugin;
408    ///
409    /// #[async_trait::async_trait]
410    /// impl apollo_router::plugin::Plugin for ExamplePlugin {
411    ///     # type Config = ();
412    ///     # async fn new(
413    ///     #     _init: apollo_router::plugin::PluginInit<Self::Config>,
414    ///     # ) -> Result<Self, tower::BoxError> {
415    ///     #     Ok(Self)
416    ///     # }
417    ///     // …
418    ///     fn supergraph_service(&self, inner: supergraph::BoxService) -> supergraph::BoxService {
419    ///         inner
420    ///             .map_response(|supergraph_response| {
421    ///                 supergraph_response.map_stream(|graphql_response| {
422    ///                     // Something interesting here
423    ///                     graphql_response
424    ///                 })
425    ///             })
426    ///             .boxed()
427    ///     }
428    /// }
429    /// ```
430    pub fn map_stream<F>(self, f: F) -> Self
431    where
432        F: 'static + Send + FnMut(graphql::Response) -> graphql::Response,
433    {
434        self.map(move |stream| stream.map(f).boxed())
435    }
436
437    fn check_for_errors(self) -> Self {
438        let context = self.context.clone();
439        self.map_stream(move |response| {
440            if !response.errors.is_empty() {
441                context.insert_json_value(CONTAINS_GRAPHQL_ERROR, Value::Bool(true));
442            }
443            response
444        })
445    }
446}
447
448#[cfg(test)]
449mod test {
450    use http::HeaderValue;
451    use http::Method;
452    use http::Uri;
453    use serde_json::json;
454
455    use super::*;
456    use crate::graphql;
457
458    #[test]
459    fn supergraph_request_builder() {
460        let request = Request::builder()
461            .header("a", "b")
462            .header("a", "c")
463            .uri(Uri::from_static("http://example.com"))
464            .method(Method::POST)
465            .query("query { topProducts }")
466            .operation_name("Default")
467            .context(Context::new())
468            // We need to follow up on this. How can users creat this easily?
469            .extension("foo", json!({}))
470            // We need to follow up on this. How can users creat this easily?
471            .variable("bar", json!({}))
472            .build()
473            .unwrap();
474        assert_eq!(
475            request
476                .supergraph_request
477                .headers()
478                .get_all("a")
479                .into_iter()
480                .collect::<Vec<_>>(),
481            vec![HeaderValue::from_static("b"), HeaderValue::from_static("c")]
482        );
483        assert_eq!(
484            request.supergraph_request.uri(),
485            &Uri::from_static("http://example.com")
486        );
487        assert_eq!(
488            request.supergraph_request.body().extensions.get("foo"),
489            Some(&json!({}).into())
490        );
491        assert_eq!(
492            request.supergraph_request.body().variables.get("bar"),
493            Some(&json!({}).into())
494        );
495        assert_eq!(request.supergraph_request.method(), Method::POST);
496
497        let extensions = serde_json_bytes::Value::from(json!({"foo":{}}))
498            .as_object()
499            .unwrap()
500            .clone();
501
502        let variables = serde_json_bytes::Value::from(json!({"bar":{}}))
503            .as_object()
504            .unwrap()
505            .clone();
506        assert_eq!(
507            request.supergraph_request.body(),
508            &graphql::Request::builder()
509                .variables(variables)
510                .extensions(extensions)
511                .operation_name("Default")
512                .query("query { topProducts }")
513                .build()
514        );
515    }
516
517    #[tokio::test]
518    async fn supergraph_response_builder() {
519        let mut response = Response::builder()
520            .header("a", "b")
521            .header("a", "c")
522            .context(Context::new())
523            .extension("foo", json!({}))
524            .data(json!({}))
525            .build()
526            .unwrap();
527
528        assert_eq!(
529            response
530                .response
531                .headers()
532                .get_all("a")
533                .into_iter()
534                .collect::<Vec<_>>(),
535            vec![HeaderValue::from_static("b"), HeaderValue::from_static("c")]
536        );
537        let extensions = serde_json_bytes::Value::from(json!({"foo":{}}))
538            .as_object()
539            .unwrap()
540            .clone();
541        assert_eq!(
542            response.next_response().await.unwrap(),
543            graphql::Response::builder()
544                .extensions(extensions)
545                .data(json!({}))
546                .build()
547        );
548    }
549}