Skip to main content

apollo_router/services/
supergraph.rs

1#![allow(missing_docs)] // FIXME
2
3use futures::future::ready;
4use futures::stream::StreamExt;
5use futures::stream::once;
6use http::HeaderValue;
7use http::StatusCode;
8use http::Uri;
9use http::header::HeaderName;
10use http::method::Method;
11use mime::APPLICATION_JSON;
12use multimap::MultiMap;
13use serde_json_bytes::ByteString;
14use serde_json_bytes::Map as JsonMap;
15use serde_json_bytes::Value;
16use static_assertions::assert_impl_all;
17use tower::BoxError;
18
19use crate::Context;
20use crate::context::CHUNK_CONTAINS_GRAPHQL_ERROR;
21use crate::context::CONTAINS_GRAPHQL_ERROR;
22use crate::error::Error;
23use crate::graphql;
24use crate::http_ext::TryIntoHeaderName;
25use crate::http_ext::TryIntoHeaderValue;
26use crate::http_ext::header_map;
27use crate::json_ext::Path;
28
29pub(crate) mod service;
30#[cfg(test)]
31mod tests;
32
/// Alias for [`tower::util::BoxService`] over this module's [`Request`] and [`Response`],
/// with [`BoxError`] as the error type.
pub type BoxService = tower::util::BoxService<Request, Response, BoxError>;
/// Alias for [`tower::util::BoxCloneService`] over this module's [`Request`] and [`Response`],
/// with [`BoxError`] as the error type.
pub type BoxCloneService = tower::util::BoxCloneService<Request, Response, BoxError>;
/// Result of a service call in this module: a [`Response`] on success, a [`BoxError`] otherwise.
pub type ServiceResult = Result<Response, BoxError>;
36
// Compile-time assertion that `Request` implements `Send`.
assert_impl_all!(Request: Send);
/// Request handled at the supergraph step of the processing pipeline.
///
/// This consists of the parsed GraphQL request (carried as the body of an HTTP request,
/// alongside its headers) and contextual data for extensions.
#[non_exhaustive]
pub struct Request {
    /// Original request to the Router: an HTTP request whose body is the GraphQL request.
    pub supergraph_request: http::Request<graphql::Request>,

    /// Context for extension
    pub context: Context,
}
49
50impl From<http::Request<graphql::Request>> for Request {
51    fn from(supergraph_request: http::Request<graphql::Request>) -> Self {
52        Self {
53            supergraph_request,
54            context: Context::new(),
55        }
56    }
57}
58
59impl std::fmt::Debug for Request {
60    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
61        f.debug_struct("Request")
62            // .field("supergraph_request", &self.supergraph_request)
63            .field("context", &self.context)
64            .finish()
65    }
66}
67
68#[buildstructor::buildstructor]
69impl Request {
70    /// This is the constructor (or builder) to use when constructing a real Request.
71    ///
72    /// Required parameters are required in non-testing code to create a Request.
73    #[builder(visibility = "pub")]
74    fn new(
75        query: Option<String>,
76        operation_name: Option<String>,
77        // Skip the `Object` type alias in order to use buildstructor’s map special-casing
78        variables: JsonMap<ByteString, Value>,
79        extensions: JsonMap<ByteString, Value>,
80        context: Context,
81        headers: MultiMap<TryIntoHeaderName, TryIntoHeaderValue>,
82        uri: Uri,
83        method: Method,
84    ) -> Result<Request, BoxError> {
85        let gql_request = graphql::Request::builder()
86            .and_query(query)
87            .and_operation_name(operation_name)
88            .variables(variables)
89            .extensions(extensions)
90            .build();
91        let mut supergraph_request = http::Request::builder()
92            .uri(uri)
93            .method(method)
94            .body(gql_request)?;
95        *supergraph_request.headers_mut() = header_map(headers)?;
96        Ok(Self {
97            supergraph_request,
98            context,
99        })
100    }
101
102    /// This is the constructor (or builder) to use when constructing a "fake" Request.
103    ///
104    /// This does not enforce the provision of the data that is required for a fully functional
105    /// Request. It's usually enough for testing, when a fully constructed Request is
106    /// difficult to construct and not required for the purposes of the test.
107    ///
108    /// In addition, fake requests are expected to be valid, and will panic if given invalid values.
109    #[builder(visibility = "pub")]
110    fn fake_new(
111        query: Option<String>,
112        operation_name: Option<String>,
113        // Skip the `Object` type alias in order to use buildstructor’s map special-casing
114        variables: JsonMap<ByteString, Value>,
115        extensions: JsonMap<ByteString, Value>,
116        context: Option<Context>,
117        mut headers: MultiMap<TryIntoHeaderName, TryIntoHeaderValue>,
118        method: Option<Method>,
119    ) -> Result<Request, BoxError> {
120        // Avoid testing requests getting blocked by the CSRF-prevention plugin
121        headers
122            .entry(http::header::CONTENT_TYPE.into())
123            .or_insert(HeaderValue::from_static(APPLICATION_JSON.essence_str()).into());
124        let context = context.unwrap_or_default();
125
126        Request::new(
127            query,
128            operation_name,
129            variables,
130            extensions,
131            context,
132            headers,
133            Uri::from_static("http://default"),
134            method.unwrap_or(Method::POST),
135        )
136    }
137
138    /// Create a request with an example query, for tests
139    #[builder(visibility = "pub")]
140    fn canned_new(
141        query: Option<String>,
142        operation_name: Option<String>,
143        // Skip the `Object` type alias in order to use buildstructor’s map special-casing
144        extensions: JsonMap<ByteString, Value>,
145        context: Option<Context>,
146        headers: MultiMap<TryIntoHeaderName, TryIntoHeaderValue>,
147    ) -> Result<Request, BoxError> {
148        let default_query = "
149            query TopProducts($first: Int) {
150                topProducts(first: $first) {
151                    upc
152                    name
153                    reviews {
154                        id
155                        product { name }
156                        author { id name }
157                    }
158                }
159            }
160        ";
161        let query = query.unwrap_or(default_query.to_string());
162        let mut variables = JsonMap::new();
163        variables.insert("first", 2_usize.into());
164        Self::fake_new(
165            Some(query),
166            operation_name,
167            variables,
168            extensions,
169            context,
170            headers,
171            None,
172        )
173    }
174}
175
// Compile-time assertion that `Response` implements `Send`.
assert_impl_all!(Response: Send);
/// Response counterpart of [`Request`]: an HTTP response whose body is a stream of GraphQL
/// responses, together with the context.
#[non_exhaustive]
pub struct Response {
    /// HTTP response (status, headers) whose body is a [`graphql::ResponseStream`].
    pub response: http::Response<graphql::ResponseStream>,
    /// Context carried alongside the response.
    pub context: Context,
}
182
183impl std::fmt::Debug for Response {
184    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
185        f.debug_struct("Response")
186            .field("context", &self.context)
187            .finish()
188    }
189}
190
191#[buildstructor::buildstructor]
192impl Response {
193    /// This is the constructor (or builder) to use when constructing a real Response..
194    ///
195    /// Required parameters are required in non-testing code to create a Response..
196    #[builder(visibility = "pub")]
197    fn new(
198        label: Option<String>,
199        data: Option<Value>,
200        path: Option<Path>,
201        errors: Vec<Error>,
202        // Skip the `Object` type alias in order to use buildstructor’s map special-casing
203        extensions: JsonMap<ByteString, Value>,
204        status_code: Option<StatusCode>,
205        headers: MultiMap<TryIntoHeaderName, TryIntoHeaderValue>,
206        context: Context,
207    ) -> Result<Self, BoxError> {
208        let has_errors = !errors.is_empty();
209        if has_errors {
210            context.insert_json_value(CONTAINS_GRAPHQL_ERROR, Value::Bool(true));
211        }
212        context.insert_json_value(CHUNK_CONTAINS_GRAPHQL_ERROR, Value::Bool(has_errors));
213        // Build a response
214        let b = graphql::Response::builder()
215            .and_label(label)
216            .and_path(path)
217            .errors(errors)
218            .extensions(extensions);
219        let res = match data {
220            Some(data) => b.data(data).build(),
221            None => b.build(),
222        };
223
224        // Build an http Response
225        let mut builder = http::Response::builder().status(status_code.unwrap_or(StatusCode::OK));
226        for (key, values) in headers {
227            let header_name: HeaderName = key.try_into()?;
228            for value in values {
229                let header_value: HeaderValue = value.try_into()?;
230                builder = builder.header(header_name.clone(), header_value);
231            }
232        }
233
234        let response = builder.body(once(ready(res)).boxed())?;
235
236        Ok(Self { response, context })
237    }
238
239    /// This is the constructor (or builder) to use when constructing a "fake" Response.
240    ///
241    /// This does not enforce the provision of the data that is required for a fully functional
242    /// Response. It's usually enough for testing, when a fully constructed Response is
243    /// difficult to construct and not required for the purposes of the test.
244    ///
245    /// In addition, fake responses are expected to be valid, and will panic if given invalid values.
246    #[builder(visibility = "pub")]
247    fn fake_new(
248        label: Option<String>,
249        data: Option<Value>,
250        path: Option<Path>,
251        errors: Vec<Error>,
252        // Skip the `Object` type alias in order to use buildstructor’s map special-casing
253        extensions: JsonMap<ByteString, Value>,
254        status_code: Option<StatusCode>,
255        headers: MultiMap<TryIntoHeaderName, TryIntoHeaderValue>,
256        context: Option<Context>,
257    ) -> Result<Self, BoxError> {
258        Response::new(
259            label,
260            data,
261            path,
262            errors,
263            extensions,
264            status_code,
265            headers,
266            context.unwrap_or_default(),
267        )
268    }
269
270    /// This is the constructor (or builder) to use when constructing a "fake" Response stream.
271    ///
272    /// This does not enforce the provision of the data that is required for a fully functional
273    /// Response. It's usually enough for testing, when a fully constructed Response is
274    /// difficult to construct and not required for the purposes of the test.
275    ///
276    /// In addition, fake responses are expected to be valid, and will panic if given invalid values.
277    #[builder(visibility = "pub")]
278    fn fake_stream_new(
279        responses: Vec<graphql::Response>,
280        status_code: Option<StatusCode>,
281        headers: MultiMap<TryIntoHeaderName, TryIntoHeaderValue>,
282        context: Context,
283    ) -> Result<Self, BoxError> {
284        let mut builder = http::Response::builder().status(status_code.unwrap_or(StatusCode::OK));
285        for (key, values) in headers {
286            let header_name: HeaderName = key.try_into()?;
287            for value in values {
288                let header_value: HeaderValue = value.try_into()?;
289                builder = builder.header(header_name.clone(), header_value);
290            }
291        }
292
293        let stream = futures::stream::iter(responses);
294        let response = builder.body(stream.boxed())?;
295        Ok(Self { response, context })
296    }
297
298    /// This is the constructor (or builder) to use when constructing a Response that represents a global error.
299    /// It has no path and no response data.
300    /// This is useful for things such as authentication errors.
301    #[builder(visibility = "pub")]
302    fn error_new(
303        errors: Vec<Error>,
304        status_code: Option<StatusCode>,
305        headers: MultiMap<TryIntoHeaderName, TryIntoHeaderValue>,
306        context: Context,
307    ) -> Result<Self, BoxError> {
308        Response::new(
309            Default::default(),
310            Default::default(),
311            None,
312            errors,
313            Default::default(),
314            status_code,
315            headers,
316            context,
317        )
318    }
319
320    /// This is the constructor (or builder) to use when constructing a real Response..
321    ///
322    /// Required parameters are required in non-testing code to create a Response..
323    #[builder(visibility = "pub(crate)")]
324    fn infallible_new(
325        label: Option<String>,
326        data: Option<Value>,
327        path: Option<Path>,
328        errors: Vec<Error>,
329        // Skip the `Object` type alias in order to use buildstructor’s map special-casing
330        extensions: JsonMap<ByteString, Value>,
331        status_code: Option<StatusCode>,
332        headers: MultiMap<HeaderName, HeaderValue>,
333        context: Context,
334    ) -> Self {
335        let has_errors = !errors.is_empty();
336        if has_errors {
337            context.insert_json_value(CONTAINS_GRAPHQL_ERROR, Value::Bool(true));
338        }
339        context.insert_json_value(CHUNK_CONTAINS_GRAPHQL_ERROR, Value::Bool(has_errors));
340        // Build a response
341        let b = graphql::Response::builder()
342            .and_label(label)
343            .and_path(path)
344            .errors(errors)
345            .extensions(extensions);
346        let res = match data {
347            Some(data) => b.data(data).build(),
348            None => b.build(),
349        };
350
351        // Build an http Response
352        let mut builder = http::Response::builder().status(status_code.unwrap_or(StatusCode::OK));
353        for (header_name, values) in headers {
354            for header_value in values {
355                builder = builder.header(header_name.clone(), header_value);
356            }
357        }
358
359        let response = builder.body(once(ready(res)).boxed()).expect("can't fail");
360
361        Self { response, context }
362    }
363
364    pub(crate) fn new_from_graphql_response(response: graphql::Response, context: Context) -> Self {
365        let has_errors = !response.errors.is_empty();
366        if has_errors {
367            context.insert_json_value(CONTAINS_GRAPHQL_ERROR, Value::Bool(true));
368        }
369        context.insert_json_value(CHUNK_CONTAINS_GRAPHQL_ERROR, Value::Bool(has_errors));
370
371        Self {
372            response: http::Response::new(once(ready(response)).boxed()),
373            context,
374        }
375    }
376}
377
378impl Response {
379    pub async fn next_response(&mut self) -> Option<graphql::Response> {
380        self.response.body_mut().next().await
381    }
382
383    pub(crate) fn new_from_response(
384        response: http::Response<graphql::ResponseStream>,
385        context: Context,
386    ) -> Self {
387        Self { context, response }.check_for_errors()
388    }
389
390    pub fn map<F>(self, f: F) -> Response
391    where
392        F: FnOnce(graphql::ResponseStream) -> graphql::ResponseStream,
393    {
394        Response {
395            context: self.context,
396            response: self.response.map(f),
397        }
398    }
399
400    /// Returns a new supergraph response where each [`graphql::Response`] is mapped through `f`.
401    ///
402    /// In supergraph and execution services, the service response contains
403    /// not just one GraphQL response but a stream of them,
404    /// in order to support features such as `@defer`.
405    /// This method uses [`futures::stream::StreamExt::map`] to map over each item in the stream.
406    ///
407    /// # Example
408    ///
409    /// ```
410    /// use apollo_router::services::supergraph;
411    /// use apollo_router::layers::ServiceExt as _;
412    /// use tower::ServiceExt as _;
413    ///
414    /// struct ExamplePlugin;
415    ///
416    /// #[async_trait::async_trait]
417    /// impl apollo_router::plugin::Plugin for ExamplePlugin {
418    ///     # type Config = ();
419    ///     # async fn new(
420    ///     #     _init: apollo_router::plugin::PluginInit<Self::Config>,
421    ///     # ) -> Result<Self, tower::BoxError> {
422    ///     #     Ok(Self)
423    ///     # }
424    ///     // …
425    ///     fn supergraph_service(&self, inner: supergraph::BoxService) -> supergraph::BoxService {
426    ///         inner
427    ///             .map_response(|supergraph_response| {
428    ///                 supergraph_response.map_stream(|graphql_response| {
429    ///                     // Something interesting here
430    ///                     graphql_response
431    ///                 })
432    ///             })
433    ///             .boxed()
434    ///     }
435    /// }
436    /// ```
437    pub fn map_stream<F>(self, f: F) -> Self
438    where
439        F: 'static + Send + FnMut(graphql::Response) -> graphql::Response,
440    {
441        self.map(move |stream| stream.map(f).boxed())
442    }
443
444    fn check_for_errors(self) -> Self {
445        let context = self.context.clone();
446        self.map_stream(move |response| {
447            let has_errors = response.contains_errors();
448            if has_errors {
449                context.insert_json_value(CONTAINS_GRAPHQL_ERROR, Value::Bool(true));
450            }
451            context.insert_json_value(CHUNK_CONTAINS_GRAPHQL_ERROR, Value::Bool(has_errors));
452            response
453        })
454    }
455}
456
#[cfg(test)]
mod test {
    use http::HeaderValue;
    use http::Method;
    use http::Uri;
    use serde_json::json;

    use super::*;
    use crate::graphql;

    /// Converts a `serde_json` value into a `serde_json_bytes` object map.
    /// Panics when the value is not a JSON object.
    fn json_object(value: serde_json::Value) -> JsonMap<ByteString, Value> {
        serde_json_bytes::Value::from(value)
            .as_object()
            .unwrap()
            .clone()
    }

    /// The request builder must carry headers, URI, method, extensions and variables over to
    /// the resulting HTTP request and its GraphQL body.
    #[test]
    fn supergraph_request_builder() {
        let built = Request::builder()
            .header("a", "b")
            .header("a", "c")
            .uri(Uri::from_static("http://example.com"))
            .method(Method::POST)
            .query("query { topProducts }")
            .operation_name("Default")
            .context(Context::new())
            // TODO: follow up on this. How can users create this easily?
            .extension("foo", json!({}))
            // TODO: follow up on this. How can users create this easily?
            .variable("bar", json!({}))
            .build()
            .unwrap();
        let http_request = &built.supergraph_request;

        // Both values given for header "a" must be present, in insertion order.
        let a_values = http_request
            .headers()
            .get_all("a")
            .into_iter()
            .collect::<Vec<_>>();
        assert_eq!(
            a_values,
            vec![HeaderValue::from_static("b"), HeaderValue::from_static("c")]
        );
        assert_eq!(
            http_request.uri(),
            &Uri::from_static("http://example.com")
        );

        let body = http_request.body();
        assert_eq!(body.extensions.get("foo"), Some(&json!({}).into()));
        assert_eq!(body.variables.get("bar"), Some(&json!({}).into()));
        assert_eq!(http_request.method(), Method::POST);

        // The whole body must equal a GraphQL request built directly from the same parts.
        let extensions = json_object(json!({"foo":{}}));
        let variables = json_object(json!({"bar":{}}));
        let expected_body = graphql::Request::builder()
            .variables(variables)
            .extensions(extensions)
            .operation_name("Default")
            .query("query { topProducts }")
            .build();
        assert_eq!(body, &expected_body);
    }

    /// The response builder must carry headers over to the HTTP response and yield the
    /// expected GraphQL response as the first item of the body stream.
    #[tokio::test]
    async fn supergraph_response_builder() {
        let mut built = Response::builder()
            .header("a", "b")
            .header("a", "c")
            .context(Context::new())
            .extension("foo", json!({}))
            .data(json!({}))
            .build()
            .unwrap();

        let a_values = built
            .response
            .headers()
            .get_all("a")
            .into_iter()
            .collect::<Vec<_>>();
        assert_eq!(
            a_values,
            vec![HeaderValue::from_static("b"), HeaderValue::from_static("c")]
        );

        let expected = graphql::Response::builder()
            .extensions(json_object(json!({"foo":{}})))
            .data(json!({}))
            .build();
        let first = built.next_response().await.unwrap();
        assert_eq!(first, expected);
    }
}