Skip to main content

apollo_router/services/
supergraph.rs

1#![allow(missing_docs)] // FIXME
2
3use futures::future::ready;
4use futures::stream::StreamExt;
5use futures::stream::once;
6use http::HeaderValue;
7use http::StatusCode;
8use http::Uri;
9use http::header::HeaderName;
10use http::method::Method;
11use mime::APPLICATION_JSON;
12use multimap::MultiMap;
13use serde_json_bytes::ByteString;
14use serde_json_bytes::Map as JsonMap;
15use serde_json_bytes::Value;
16use static_assertions::assert_impl_all;
17use tower::BoxError;
18
19use crate::Context;
20use crate::error::Error;
21use crate::graphql;
22use crate::http_ext::TryIntoHeaderName;
23use crate::http_ext::TryIntoHeaderValue;
24use crate::http_ext::header_map;
25use crate::json_ext::Path;
26
27pub(crate) mod service;
28#[cfg(test)]
29mod tests;
30
/// Boxed [`tower`] service that takes a supergraph [`Request`] and produces a
/// supergraph [`Response`], failing with a [`BoxError`].
pub type BoxService = tower::util::BoxService<Request, Response, BoxError>;
/// Same request/response/error types as [`BoxService`], but based on
/// [`tower::util::BoxCloneService`].
pub type BoxCloneService = tower::util::BoxCloneService<Request, Response, BoxError>;
/// Result type pairing a supergraph [`Response`] with a [`BoxError`].
pub type ServiceResult = Result<Response, BoxError>;
34
// Compile-time assertion that `Request` implements `Send`.
assert_impl_all!(Request: Send);
/// Represents the supergraph processing step of the processing pipeline.
///
/// This consists of the parsed GraphQL request, wrapped in an HTTP request (which carries
/// the HTTP headers, URI and method), and contextual data for extensions.
#[non_exhaustive]
pub struct Request {
    /// Original request to the Router.
    pub supergraph_request: http::Request<graphql::Request>,

    /// Context for extension
    pub context: Context,
}
47
48impl From<http::Request<graphql::Request>> for Request {
49    fn from(supergraph_request: http::Request<graphql::Request>) -> Self {
50        Self {
51            supergraph_request,
52            context: Context::new(),
53        }
54    }
55}
56
57impl std::fmt::Debug for Request {
58    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
59        f.debug_struct("Request")
60            // .field("supergraph_request", &self.supergraph_request)
61            .field("context", &self.context)
62            .finish()
63    }
64}
65
66#[buildstructor::buildstructor]
67impl Request {
68    /// This is the constructor (or builder) to use when constructing a real Request.
69    ///
70    /// Required parameters are required in non-testing code to create a Request.
71    #[builder(visibility = "pub")]
72    fn new(
73        query: Option<String>,
74        operation_name: Option<String>,
75        // Skip the `Object` type alias in order to use buildstructor’s map special-casing
76        variables: JsonMap<ByteString, Value>,
77        extensions: JsonMap<ByteString, Value>,
78        context: Context,
79        headers: MultiMap<TryIntoHeaderName, TryIntoHeaderValue>,
80        uri: Uri,
81        method: Method,
82    ) -> Result<Request, BoxError> {
83        let gql_request = graphql::Request::builder()
84            .and_query(query)
85            .and_operation_name(operation_name)
86            .variables(variables)
87            .extensions(extensions)
88            .build();
89        let mut supergraph_request = http::Request::builder()
90            .uri(uri)
91            .method(method)
92            .body(gql_request)?;
93        *supergraph_request.headers_mut() = header_map(headers)?;
94        Ok(Self {
95            supergraph_request,
96            context,
97        })
98    }
99
100    /// This is the constructor (or builder) to use when constructing a "fake" Request.
101    ///
102    /// This does not enforce the provision of the data that is required for a fully functional
103    /// Request. It's usually enough for testing, when a fully constructed Request is
104    /// difficult to construct and not required for the purposes of the test.
105    ///
106    /// In addition, fake requests are expected to be valid, and will panic if given invalid values.
107    #[builder(visibility = "pub")]
108    fn fake_new(
109        query: Option<String>,
110        operation_name: Option<String>,
111        // Skip the `Object` type alias in order to use buildstructor’s map special-casing
112        variables: JsonMap<ByteString, Value>,
113        extensions: JsonMap<ByteString, Value>,
114        context: Option<Context>,
115        mut headers: MultiMap<TryIntoHeaderName, TryIntoHeaderValue>,
116        method: Option<Method>,
117    ) -> Result<Request, BoxError> {
118        // Avoid testing requests getting blocked by the CSRF-prevention plugin
119        headers
120            .entry(http::header::CONTENT_TYPE.into())
121            .or_insert(HeaderValue::from_static(APPLICATION_JSON.essence_str()).into());
122        let context = context.unwrap_or_default();
123
124        Request::new(
125            query,
126            operation_name,
127            variables,
128            extensions,
129            context,
130            headers,
131            Uri::from_static("http://default"),
132            method.unwrap_or(Method::POST),
133        )
134    }
135
136    /// Create a request with an example query, for tests
137    #[builder(visibility = "pub")]
138    fn canned_new(
139        query: Option<String>,
140        operation_name: Option<String>,
141        // Skip the `Object` type alias in order to use buildstructor’s map special-casing
142        extensions: JsonMap<ByteString, Value>,
143        context: Option<Context>,
144        headers: MultiMap<TryIntoHeaderName, TryIntoHeaderValue>,
145    ) -> Result<Request, BoxError> {
146        let default_query = "
147            query TopProducts($first: Int) {
148                topProducts(first: $first) {
149                    upc
150                    name
151                    reviews {
152                        id
153                        product { name }
154                        author { id name }
155                    }
156                }
157            }
158        ";
159        let query = query.unwrap_or(default_query.to_string());
160        let mut variables = JsonMap::new();
161        variables.insert("first", 2_usize.into());
162        Self::fake_new(
163            Some(query),
164            operation_name,
165            variables,
166            extensions,
167            context,
168            headers,
169            None,
170        )
171    }
172}
173
// Compile-time assertion that `Response` implements `Send`.
assert_impl_all!(Response: Send);
/// Supergraph response: an HTTP response whose body is a stream of GraphQL responses,
/// together with its [`Context`].
#[non_exhaustive]
pub struct Response {
    /// HTTP response whose body is a [`graphql::ResponseStream`].
    pub response: http::Response<graphql::ResponseStream>,
    /// Context carried alongside the response.
    pub context: Context,
}
180
181#[buildstructor::buildstructor]
182impl Response {
183    /// This is the constructor (or builder) to use when constructing a real Response..
184    ///
185    /// Required parameters are required in non-testing code to create a Response..
186    #[builder(visibility = "pub")]
187    fn new(
188        label: Option<String>,
189        data: Option<Value>,
190        path: Option<Path>,
191        errors: Vec<Error>,
192        // Skip the `Object` type alias in order to use buildstructor’s map special-casing
193        extensions: JsonMap<ByteString, Value>,
194        status_code: Option<StatusCode>,
195        headers: MultiMap<TryIntoHeaderName, TryIntoHeaderValue>,
196        context: Context,
197    ) -> Result<Self, BoxError> {
198        // Build a response
199        let b = graphql::Response::builder()
200            .and_label(label)
201            .and_path(path)
202            .errors(errors)
203            .extensions(extensions);
204        let res = match data {
205            Some(data) => b.data(data).build(),
206            None => b.build(),
207        };
208
209        // Build an http Response
210        let mut builder = http::Response::builder().status(status_code.unwrap_or(StatusCode::OK));
211        for (key, values) in headers {
212            let header_name: HeaderName = key.try_into()?;
213            for value in values {
214                let header_value: HeaderValue = value.try_into()?;
215                builder = builder.header(header_name.clone(), header_value);
216            }
217        }
218
219        let response = builder.body(once(ready(res)).boxed())?;
220
221        Ok(Self { response, context })
222    }
223
224    /// This is the constructor (or builder) to use when constructing a "fake" Response.
225    ///
226    /// This does not enforce the provision of the data that is required for a fully functional
227    /// Response. It's usually enough for testing, when a fully constructed Response is
228    /// difficult to construct and not required for the purposes of the test.
229    ///
230    /// In addition, fake responses are expected to be valid, and will panic if given invalid values.
231    #[builder(visibility = "pub")]
232    fn fake_new(
233        label: Option<String>,
234        data: Option<Value>,
235        path: Option<Path>,
236        errors: Vec<Error>,
237        // Skip the `Object` type alias in order to use buildstructor’s map special-casing
238        extensions: JsonMap<ByteString, Value>,
239        status_code: Option<StatusCode>,
240        headers: MultiMap<TryIntoHeaderName, TryIntoHeaderValue>,
241        context: Option<Context>,
242    ) -> Result<Self, BoxError> {
243        Response::new(
244            label,
245            data,
246            path,
247            errors,
248            extensions,
249            status_code,
250            headers,
251            context.unwrap_or_default(),
252        )
253    }
254
255    /// This is the constructor (or builder) to use when constructing a "fake" Response stream.
256    ///
257    /// This does not enforce the provision of the data that is required for a fully functional
258    /// Response. It's usually enough for testing, when a fully constructed Response is
259    /// difficult to construct and not required for the purposes of the test.
260    ///
261    /// In addition, fake responses are expected to be valid, and will panic if given invalid values.
262    #[builder(visibility = "pub")]
263    fn fake_stream_new(
264        responses: Vec<graphql::Response>,
265        status_code: Option<StatusCode>,
266        headers: MultiMap<TryIntoHeaderName, TryIntoHeaderValue>,
267        context: Context,
268    ) -> Result<Self, BoxError> {
269        let mut builder = http::Response::builder().status(status_code.unwrap_or(StatusCode::OK));
270        for (key, values) in headers {
271            let header_name: HeaderName = key.try_into()?;
272            for value in values {
273                let header_value: HeaderValue = value.try_into()?;
274                builder = builder.header(header_name.clone(), header_value);
275            }
276        }
277
278        let stream = futures::stream::iter(responses);
279        let response = builder.body(stream.boxed())?;
280        Ok(Self { response, context })
281    }
282
283    /// This is the constructor (or builder) to use when constructing a Response that represents a global error.
284    /// It has no path and no response data.
285    /// This is useful for things such as authentication errors.
286    #[builder(visibility = "pub")]
287    fn error_new(
288        errors: Vec<Error>,
289        status_code: Option<StatusCode>,
290        headers: MultiMap<TryIntoHeaderName, TryIntoHeaderValue>,
291        context: Context,
292    ) -> Result<Self, BoxError> {
293        Response::new(
294            Default::default(),
295            Default::default(),
296            None,
297            errors,
298            Default::default(),
299            status_code,
300            headers,
301            context,
302        )
303    }
304
305    /// This is the constructor (or builder) to use when constructing a real Response..
306    ///
307    /// Required parameters are required in non-testing code to create a Response..
308    #[builder(visibility = "pub(crate)")]
309    fn infallible_new(
310        label: Option<String>,
311        data: Option<Value>,
312        path: Option<Path>,
313        errors: Vec<Error>,
314        // Skip the `Object` type alias in order to use buildstructor’s map special-casing
315        extensions: JsonMap<ByteString, Value>,
316        status_code: Option<StatusCode>,
317        headers: MultiMap<HeaderName, HeaderValue>,
318        context: Context,
319    ) -> Self {
320        // Build a response
321        let b = graphql::Response::builder()
322            .and_label(label)
323            .and_path(path)
324            .errors(errors)
325            .extensions(extensions);
326        let res = match data {
327            Some(data) => b.data(data).build(),
328            None => b.build(),
329        };
330
331        // Build an http Response
332        let mut builder = http::Response::builder().status(status_code.unwrap_or(StatusCode::OK));
333        for (header_name, values) in headers {
334            for header_value in values {
335                builder = builder.header(header_name.clone(), header_value);
336            }
337        }
338
339        let response = builder.body(once(ready(res)).boxed()).expect("can't fail");
340
341        Self { response, context }
342    }
343
344    pub(crate) fn new_from_graphql_response(response: graphql::Response, context: Context) -> Self {
345        Self {
346            response: http::Response::new(once(ready(response)).boxed()),
347            context,
348        }
349    }
350}
351
352impl Response {
353    pub async fn next_response(&mut self) -> Option<graphql::Response> {
354        self.response.body_mut().next().await
355    }
356
357    pub(crate) fn new_from_response(
358        response: http::Response<graphql::ResponseStream>,
359        context: Context,
360    ) -> Self {
361        Self { response, context }
362    }
363
364    pub fn map<F>(self, f: F) -> Response
365    where
366        F: FnOnce(graphql::ResponseStream) -> graphql::ResponseStream,
367    {
368        Response {
369            context: self.context,
370            response: self.response.map(f),
371        }
372    }
373
374    /// Returns a new supergraph response where each [`graphql::Response`] is mapped through `f`.
375    ///
376    /// In supergraph and execution services, the service response contains
377    /// not just one GraphQL response but a stream of them,
378    /// in order to support features such as `@defer`.
379    /// This method uses [`futures::stream::StreamExt::map`] to map over each item in the stream.
380    ///
381    /// # Example
382    ///
383    /// ```
384    /// use apollo_router::services::supergraph;
385    /// use apollo_router::layers::ServiceExt as _;
386    /// use tower::ServiceExt as _;
387    ///
388    /// struct ExamplePlugin;
389    ///
390    /// #[async_trait::async_trait]
391    /// impl apollo_router::plugin::Plugin for ExamplePlugin {
392    ///     # type Config = ();
393    ///     # async fn new(
394    ///     #     _init: apollo_router::plugin::PluginInit<Self::Config>,
395    ///     # ) -> Result<Self, tower::BoxError> {
396    ///     #     Ok(Self)
397    ///     # }
398    ///     // …
399    ///     fn supergraph_service(&self, inner: supergraph::BoxService) -> supergraph::BoxService {
400    ///         inner
401    ///             .map_response(|supergraph_response| {
402    ///                 supergraph_response.map_stream(|graphql_response| {
403    ///                     // Something interesting here
404    ///                     graphql_response
405    ///                 })
406    ///             })
407    ///             .boxed()
408    ///     }
409    /// }
410    /// ```
411    pub fn map_stream<F>(self, f: F) -> Self
412    where
413        F: 'static + Send + FnMut(graphql::Response) -> graphql::Response,
414    {
415        self.map(move |stream| stream.map(f).boxed())
416    }
417}
418
#[cfg(test)]
mod test {
    use http::HeaderValue;
    use http::Method;
    use http::Uri;
    use serde_json::json;

    use super::*;
    use crate::graphql;

    /// Converts a `serde_json` object literal into the `serde_json_bytes` map type
    /// used by GraphQL requests and responses. Panics if `value` is not an object.
    fn json_object(
        value: serde_json::Value,
    ) -> serde_json_bytes::Map<serde_json_bytes::ByteString, serde_json_bytes::Value> {
        serde_json_bytes::Value::from(value)
            .as_object()
            .unwrap()
            .clone()
    }

    /// The request builder must keep repeated headers, the URI, the method and the
    /// GraphQL body exactly as provided.
    #[test]
    fn supergraph_request_builder() {
        let request = Request::builder()
            .header("a", "b")
            .header("a", "c")
            .uri(Uri::from_static("http://example.com"))
            .method(Method::POST)
            .query("query { topProducts }")
            .operation_name("Default")
            .context(Context::new())
            // We need to follow up on this. How can users create this easily?
            .extension("foo", json!({}))
            // We need to follow up on this. How can users create this easily?
            .variable("bar", json!({}))
            .build()
            .unwrap();
        let http_request = &request.supergraph_request;

        // Both values of the repeated header are kept, in insertion order.
        let header_values = http_request
            .headers()
            .get_all("a")
            .into_iter()
            .collect::<Vec<_>>();
        assert_eq!(
            header_values,
            vec![HeaderValue::from_static("b"), HeaderValue::from_static("c")]
        );
        assert_eq!(http_request.uri(), &Uri::from_static("http://example.com"));
        assert_eq!(
            http_request.body().extensions.get("foo"),
            Some(&json!({}).into())
        );
        assert_eq!(
            http_request.body().variables.get("bar"),
            Some(&json!({}).into())
        );
        assert_eq!(http_request.method(), Method::POST);

        // The whole GraphQL body matches a request built directly.
        let expected_extensions = json_object(json!({"foo":{}}));
        let expected_variables = json_object(json!({"bar":{}}));
        let expected_body = graphql::Request::builder()
            .variables(expected_variables)
            .extensions(expected_extensions)
            .operation_name("Default")
            .query("query { topProducts }")
            .build();
        assert_eq!(http_request.body(), &expected_body);
    }

    /// The response builder must keep repeated headers and yield the GraphQL response
    /// built from the provided data and extensions.
    #[tokio::test]
    async fn supergraph_response_builder() {
        let mut response = Response::builder()
            .header("a", "b")
            .header("a", "c")
            .context(Context::new())
            .extension("foo", json!({}))
            .data(json!({}))
            .build()
            .unwrap();

        let header_values = response
            .response
            .headers()
            .get_all("a")
            .into_iter()
            .collect::<Vec<_>>();
        assert_eq!(
            header_values,
            vec![HeaderValue::from_static("b"), HeaderValue::from_static("c")]
        );

        let expected_body = graphql::Response::builder()
            .extensions(json_object(json!({"foo":{}})))
            .data(json!({}))
            .build();
        assert_eq!(response.next_response().await.unwrap(), expected_body);
    }
}