Skip to main content

apollo_router/layers/
mod.rs

1//! Reusable layers
2//! Layers that are specific to one plugin should not be placed in this module.
3use std::future::Future;
4use std::ops::ControlFlow;
5
6use tower::BoxError;
7use tower::ServiceBuilder;
8use tower::layer::util::Stack;
9use tower_service::Service;
10use tracing::Span;
11
12use self::map_first_graphql_response::MapFirstGraphqlResponseLayer;
13use self::map_first_graphql_response::MapFirstGraphqlResponseService;
14use crate::Context;
15use crate::graphql;
16use crate::layers::async_checkpoint::AsyncCheckpointLayer;
17use crate::layers::instrument::InstrumentLayer;
18use crate::layers::map_future_with_request_data::MapFutureWithRequestDataLayer;
19use crate::layers::map_future_with_request_data::MapFutureWithRequestDataService;
20use crate::layers::sync_checkpoint::CheckpointLayer;
21use crate::layers::unconstrained_buffer::UnconstrainedBufferLayer;
22use crate::services::supergraph;
23
24pub mod async_checkpoint;
25pub mod instrument;
26pub mod map_first_graphql_response;
27pub mod map_future_with_request_data;
28pub mod sync_checkpoint;
29pub mod unconstrained_buffer;
30
31// Note: We use Buffer in many places throughout the router. 50_000 represents
32// the "maximal number of requests that can be queued for the buffered
33// service before backpressure is applied to callers". We set this to be
34// so high, 50_000, because we anticipate that many users will want to
35//
36// Think of this as a backstop for when there are no other backpressure
37// enforcing limits configured in a router. In future we may tweak this
38// value higher or lower or expose it as a configurable.
39pub(crate) const DEFAULT_BUFFER_SIZE: usize = 50_000;
40
41/// Extension to the [`ServiceBuilder`] trait to make it easy to add router specific capabilities
42/// (e.g.: checkpoints) to a [`Service`].
43#[allow(clippy::type_complexity)]
44pub trait ServiceBuilderExt<L>: Sized {
45    /// Decide if processing should continue or not, and if not allow returning of a response.
46    ///
47    /// This is useful for validation functionality where you want to abort processing but return a
48    /// valid response.
49    ///
50    /// # Arguments
51    ///
52    /// * `checkpoint_fn`: Ths callback to decides if processing should continue or not.
53    ///
54    /// returns: ServiceBuilder<Stack<CheckpointLayer<S, Request>, L>>
55    ///
56    /// # Examples
57    ///
58    /// ```rust
59    /// # use std::ops::ControlFlow;
60    /// # use http::Method;
61    /// # use tower::ServiceBuilder;
62    /// # use tower_service::Service;
63    /// # use tracing::info_span;
64    /// # use apollo_router::services::supergraph;
65    /// # use apollo_router::layers::ServiceBuilderExt;
66    /// # fn test(service: supergraph::BoxService) {
67    /// let _ = ServiceBuilder::new()
68    ///     .checkpoint(|req: supergraph::Request|{
69    ///         if req.supergraph_request.method() == Method::GET {
70    ///             Ok(ControlFlow::Break(supergraph::Response::builder()
71    ///                 .data("Only get requests allowed")
72    ///                 .context(req.context)
73    ///                 .build()?))
74    ///         } else {
75    ///             Ok(ControlFlow::Continue(req))
76    ///         }
77    ///     })
78    ///     .service(service);
79    /// # }
80    /// ```
81    fn checkpoint<S, Request>(
82        self,
83        checkpoint_fn: impl Fn(
84            Request,
85        ) -> Result<
86            ControlFlow<<S as Service<Request>>::Response, Request>,
87            <S as Service<Request>>::Error,
88        > + Send
89        + Sync
90        + 'static,
91    ) -> ServiceBuilder<Stack<CheckpointLayer<S, Request>, L>>
92    where
93        S: Service<Request> + Send + 'static,
94        Request: Send + 'static,
95        S::Future: Send,
96        S::Response: Send + 'static,
97        S::Error: Into<BoxError> + Send + 'static,
98    {
99        self.layer(CheckpointLayer::new(checkpoint_fn))
100    }
101
102    /// Decide if processing should continue or not, and if not allow returning of a response.
103    /// Unlike checkpoint it is possible to perform async operations in the callback. However
104    /// this requires that the service is `Clone`. This can be achieved using `.buffered()`.
105    ///
106    /// This is useful for things like authentication where you need to make an external call to
107    /// check if a request should proceed or not.
108    ///
109    /// # Arguments
110    ///
111    /// * `async_checkpoint_fn`: The asynchronous callback to decide if processing should continue or not.
112    ///
113    /// returns: ServiceBuilder<Stack<AsyncCheckpointLayer<S, Request>, L>>
114    ///
115    /// # Examples
116    ///
117    /// ```rust
118    /// # use std::ops::ControlFlow;
119    /// use futures::FutureExt;
120    /// # use http::Method;
121    /// # use tower::ServiceBuilder;
122    /// # use tower_service::Service;
123    /// # use tracing::info_span;
124    /// # use apollo_router::services::supergraph;
125    /// # use apollo_router::layers::ServiceBuilderExt;
126    /// # fn test(service: supergraph::BoxService) {
127    /// let _ = ServiceBuilder::new()
128    ///     .checkpoint_async(|req: supergraph::Request|
129    ///         async {
130    ///             if req.supergraph_request.method() == Method::GET {
131    ///                 Ok(ControlFlow::Break(supergraph::Response::builder()
132    ///                     .data("Only get requests allowed")
133    ///                     .context(req.context)
134    ///                     .build()?))
135    ///             } else {
136    ///                 Ok(ControlFlow::Continue(req))
137    ///             }
138    ///         }
139    ///         .boxed()
140    ///     )
141    ///     .buffered()
142    ///     .service(service);
143    /// # }
144    /// ```
145    fn checkpoint_async<F, S, Fut, Request>(
146        self,
147        async_checkpoint_fn: F,
148    ) -> ServiceBuilder<Stack<AsyncCheckpointLayer<S, Fut, Request>, L>>
149    where
150        S: Service<Request, Error = BoxError> + Clone + Send + 'static,
151        Fut: Future<
152            Output = Result<ControlFlow<<S as Service<Request>>::Response, Request>, BoxError>,
153        >,
154        F: Fn(Request) -> Fut + Send + Sync + 'static,
155    {
156        self.layer(AsyncCheckpointLayer::new(async_checkpoint_fn))
157    }
158
159    /// Adds a buffer to the service stack with a default size.
160    ///
161    /// This is useful for making services `Clone` and `Send`
162    ///
163    /// # Examples
164    ///
165    /// ```rust
166    /// # use tower::ServiceBuilder;
167    /// # use tower_service::Service;
168    /// # use tracing::info_span;
169    /// # use apollo_router::services::supergraph;
170    /// # use apollo_router::layers::ServiceBuilderExt;
171    /// # fn test(service: supergraph::BoxService) {
172    /// let _ = ServiceBuilder::new()
173    ///             .buffered()
174    ///             .service(service);
175    /// # }
176    /// ```
177    fn buffered<Request>(self) -> ServiceBuilder<Stack<UnconstrainedBufferLayer<Request>, L>>;
178
179    /// Place a span around the request.
180    ///
181    /// This is useful for adding a new span with custom attributes to tracing.
182    ///
183    /// Note that it is not possible to add extra attributes to existing spans. However, you can add
184    /// empty placeholder attributes to your span if you want to supply those attributes later.
185    ///
186    /// # Arguments
187    ///
188    /// * `span_fn`: The callback to create the span given the request.
189    ///
190    /// returns: ServiceBuilder<Stack<InstrumentLayer<F, Request>, L>>
191    ///
192    /// # Examples
193    ///
194    /// ```rust
195    /// # use tower::ServiceBuilder;
196    /// # use tower_service::Service;
197    /// # use tracing::info_span;
198    /// # use apollo_router::services::supergraph;
199    /// # use apollo_router::layers::ServiceBuilderExt;
200    /// # fn test(service: supergraph::BoxService) {
201    /// let instrumented = ServiceBuilder::new()
202    ///             .instrument(|_request| info_span!("query_planning"))
203    ///             .service(service);
204    /// # }
205    /// ```
206    fn instrument<F, Request>(
207        self,
208        span_fn: F,
209    ) -> ServiceBuilder<Stack<InstrumentLayer<F, Request>, L>>
210    where
211        F: Fn(&Request) -> Span,
212    {
213        self.layer(InstrumentLayer::new(span_fn))
214    }
215
216    /// Maps HTTP parts, as well as the first GraphQL response, to different values.
217    ///
218    /// In supergraph and execution services, the service response contains
219    /// not just one GraphQL response but a stream of them,
220    /// in order to support features such as `@defer`.
221    ///
222    /// This method wraps a service and calls a `callback` when the first GraphQL response
223    /// in the stream returned by the inner service becomes available.
224    /// The callback can then access the HTTP parts (headers, status code, etc)
225    /// or the first GraphQL response before returning them.
226    ///
227    /// Note that any subsequent GraphQL responses after the first will be forwarded unmodified.
228    /// In order to inspect or modify all GraphQL responses,
229    /// consider using [`map_response`][tower::ServiceExt::map_response]
230    /// together with [`supergraph::Response::map_stream`] instead.
231    /// (See the example in `map_stream`’s documentation.)
232    /// In that case however HTTP parts cannot be modified because they may have already been sent.
233    ///
234    /// # Example
235    ///
236    /// ```
237    /// use apollo_router::services::supergraph;
238    /// use apollo_router::layers::ServiceBuilderExt as _;
239    /// use tower::ServiceExt as _;
240    ///
241    /// struct ExamplePlugin;
242    ///
243    /// #[async_trait::async_trait]
244    /// impl apollo_router::plugin::Plugin for ExamplePlugin {
245    ///     # type Config = ();
246    ///     # async fn new(
247    ///     #     _init: apollo_router::plugin::PluginInit<Self::Config>,
248    ///     # ) -> Result<Self, tower::BoxError> {
249    ///     #     Ok(Self)
250    ///     # }
251    ///     // …
252    ///     fn supergraph_service(&self, inner: supergraph::BoxService) -> supergraph::BoxService {
253    ///         tower::ServiceBuilder::new()
254    ///             .map_first_graphql_response(|context, mut http_parts, mut graphql_response| {
255    ///                 // Something interesting here
256    ///                 (http_parts, graphql_response)
257    ///             })
258    ///             .service(inner)
259    ///             .boxed()
260    ///     }
261    /// }
262    /// ```
263    fn map_first_graphql_response<Callback>(
264        self,
265        callback: Callback,
266    ) -> ServiceBuilder<Stack<MapFirstGraphqlResponseLayer<Callback>, L>>
267    where
268        Callback: FnOnce(
269                Context,
270                http::response::Parts,
271                graphql::Response,
272            ) -> (http::response::Parts, graphql::Response)
273            + Clone
274            + Send
275            + 'static,
276    {
277        self.layer(MapFirstGraphqlResponseLayer { callback })
278    }
279
280    /// Similar to map_future but also providing an opportunity to extract information out of the
281    /// request for use when constructing the response.
282    ///
283    /// # Arguments
284    ///
285    /// * `req_fn`: The callback to extract data from the request.
286    /// * `map_fn`: The callback to map the future.
287    ///
288    /// returns: ServiceBuilder<Stack<MapFutureWithRequestDataLayer<RF, MF>, L>>
289    ///
290    /// # Examples
291    ///
292    /// ```rust
293    /// # use std::future::Future;
294    /// # use tower::{BoxError, ServiceBuilder, ServiceExt};
295    /// # use tower::util::BoxService;
296    /// # use tower_service::Service;
297    /// # use tracing::info_span;
298    /// # use apollo_router::Context;
299    /// # use apollo_router::services::supergraph;
300    /// # use apollo_router::layers::ServiceBuilderExt;
301    /// # fn test(service: supergraph::BoxService) {
302    /// let _ : supergraph::BoxService = ServiceBuilder::new()
303    ///     .map_future_with_request_data(
304    ///         |req: &supergraph::Request| req.context.clone(),
305    ///         |ctx : Context, fut| async { fut.await })
306    ///     .service(service)
307    ///     .boxed();
308    /// # }
309    /// ```
310    fn map_future_with_request_data<RF, MF>(
311        self,
312        req_fn: RF,
313        map_fn: MF,
314    ) -> ServiceBuilder<Stack<MapFutureWithRequestDataLayer<RF, MF>, L>> {
315        self.layer(MapFutureWithRequestDataLayer::new(req_fn, map_fn))
316    }
317
318    /// Utility function to allow us to specify default methods on this trait rather than duplicating in the impl.
319    ///
320    /// # Arguments
321    ///
322    /// * `layer`: The layer to add to the service stack.
323    ///
324    /// returns: ServiceBuilder<Stack<T, L>>
325    ///
326    fn layer<T>(self, layer: T) -> ServiceBuilder<Stack<T, L>>;
327}
328
329#[allow(clippy::type_complexity)]
330impl<L> ServiceBuilderExt<L> for ServiceBuilder<L> {
331    fn layer<T>(self, layer: T) -> ServiceBuilder<Stack<T, L>> {
332        ServiceBuilder::layer(self, layer)
333    }
334
335    fn buffered<Request>(self) -> ServiceBuilder<Stack<UnconstrainedBufferLayer<Request>, L>> {
336        self.layer(UnconstrainedBufferLayer::new(DEFAULT_BUFFER_SIZE))
337    }
338}
339
340/// Extension trait for [`Service`].
341///
342/// Importing both this trait and [`tower::ServiceExt`] could lead a name collision error.
343/// To work around that, use `as _` syntax to make a trait’s methods available in a module
344/// without assigning it a name in that module’s namespace.
345///
346/// ```
347/// use apollo_router::layers::ServiceExt as _;
348/// use tower::ServiceExt as _;
349/// ```
350pub trait ServiceExt<Request>: Service<Request> {
351    /// Maps HTTP parts, as well as the first GraphQL response, to different values.
352    ///
353    /// In supergraph and execution services, the service response contains
354    /// not just one GraphQL response but a stream of them,
355    /// in order to support features such as `@defer`.
356    ///
357    /// This method wraps a service and call `callback` when the first GraphQL response
358    /// in the stream returned by the inner service becomes available.
359    /// The callback can then modify the HTTP parts (headers, status code, etc)
360    /// or the first GraphQL response before returning them.
361    ///
362    /// Note that any subsequent GraphQL responses after the first will be forwarded unmodified.
363    /// In order to inspect or modify all GraphQL responses,
364    /// consider using [`map_response`][tower::ServiceExt::map_response]
365    /// together with [`supergraph::Response::map_stream`] instead.
366    /// (See the example in `map_stream`’s documentation.)
367    /// In that case however HTTP parts cannot be modified because they may have already been sent.
368    ///
369    /// # Example
370    ///
371    /// ```
372    /// use apollo_router::services::supergraph;
373    /// use apollo_router::layers::ServiceExt as _;
374    /// use tower::ServiceExt as _;
375    ///
376    /// struct ExamplePlugin;
377    ///
378    /// #[async_trait::async_trait]
379    /// impl apollo_router::plugin::Plugin for ExamplePlugin {
380    ///     # type Config = ();
381    ///     # async fn new(
382    ///     #     _init: apollo_router::plugin::PluginInit<Self::Config>,
383    ///     # ) -> Result<Self, tower::BoxError> {
384    ///     #     Ok(Self)
385    ///     # }
386    ///     // …
387    ///     fn supergraph_service(&self, inner: supergraph::BoxService) -> supergraph::BoxService {
388    ///         inner
389    ///             .map_first_graphql_response(|context, mut http_parts, mut graphql_response| {
390    ///                 // Something interesting here
391    ///                 (http_parts, graphql_response)
392    ///             })
393    ///             .boxed()
394    ///     }
395    /// }
396    /// ```
397    fn map_first_graphql_response<Callback>(
398        self,
399        callback: Callback,
400    ) -> MapFirstGraphqlResponseService<Self, Callback>
401    where
402        Self: Sized + Service<Request, Response = supergraph::Response>,
403        <Self as Service<Request>>::Future: Send + 'static,
404        Callback: FnOnce(
405                Context,
406                http::response::Parts,
407                graphql::Response,
408            ) -> (http::response::Parts, graphql::Response)
409            + Clone
410            + Send
411            + 'static,
412    {
413        ServiceBuilder::new()
414            .map_first_graphql_response(callback)
415            .service(self)
416    }
417
418    /// Similar to map_future but also providing an opportunity to extract information out of the
419    /// request for use when constructing the response.
420    ///
421    /// # Arguments
422    ///
423    /// * `req_fn`: The callback to extract data from the request.
424    /// * `map_fn`: The callback to map the future.
425    ///
426    /// returns: ServiceBuilder<Stack<MapFutureWithRequestDataLayer<RF, MF>, L>>
427    ///
428    /// # Examples
429    ///
430    /// ```rust
431    /// # use std::future::Future;
432    /// # use tower::{BoxError, ServiceBuilder, ServiceExt};
433    /// # use tower::util::BoxService;
434    /// # use tower_service::Service;
435    /// # use tracing::info_span;
436    /// # use apollo_router::Context;
437    /// # use apollo_router::services::supergraph;
438    /// # use apollo_router::layers::ServiceBuilderExt;
439    /// # use apollo_router::layers::ServiceExt as ApolloServiceExt;
440    /// # fn test(service: supergraph::BoxService) {
441    /// let _ : supergraph::BoxService = service
442    ///     .map_future_with_request_data(
443    ///         |req: &supergraph::Request| req.context.clone(),
444    ///         |ctx : Context, fut| async { fut.await }
445    ///     )
446    ///     .boxed();
447    /// # }
448    /// ```
449    fn map_future_with_request_data<RF, MF>(
450        self,
451        req_fn: RF,
452        map_fn: MF,
453    ) -> MapFutureWithRequestDataService<Self, RF, MF>
454    where
455        Self: Sized,
456        RF: Clone,
457        MF: Clone,
458    {
459        MapFutureWithRequestDataService::new(self, req_fn, map_fn)
460    }
461}
462impl<T: ?Sized, Request> ServiceExt<Request> for T where T: Service<Request> {}