Skip to main content

apollo_router/layers/
mod.rs

1//! Reusable layers
2//! Layers that are specific to one plugin should not be placed in this module.
3use std::future::Future;
4use std::ops::ControlFlow;
5
6use tower::BoxError;
7use tower::ServiceBuilder;
8use tower::buffer::BufferLayer;
9use tower::layer::util::Stack;
10use tower_service::Service;
11use tracing::Span;
12
13use self::map_first_graphql_response::MapFirstGraphqlResponseLayer;
14use self::map_first_graphql_response::MapFirstGraphqlResponseService;
15use crate::Context;
16use crate::graphql;
17use crate::layers::async_checkpoint::AsyncCheckpointLayer;
18use crate::layers::async_checkpoint::OneShotAsyncCheckpointLayer;
19use crate::layers::instrument::InstrumentLayer;
20use crate::layers::map_future_with_request_data::MapFutureWithRequestDataLayer;
21use crate::layers::map_future_with_request_data::MapFutureWithRequestDataService;
22use crate::layers::sync_checkpoint::CheckpointLayer;
23use crate::services::supergraph;
24
25pub mod async_checkpoint;
26pub mod instrument;
27pub mod map_first_graphql_response;
28pub mod map_future_with_request_data;
29pub mod sync_checkpoint;
30
31pub(crate) const DEFAULT_BUFFER_SIZE: usize = 20_000; // capacity handed to `buffer` by `ServiceBuilderExt::buffered`
32
33/// Extension trait for [`ServiceBuilder`] that makes it easy to add router specific capabilities
34/// (e.g.: checkpoints) to a [`Service`].
35#[allow(clippy::type_complexity)]
36pub trait ServiceBuilderExt<L>: Sized {
37    /// Decide if processing should continue or not, and if not allow returning of a response.
38    ///
39    /// This is useful for validation functionality where you want to abort processing but return a
40    /// valid response.
41    ///
42    /// # Arguments
43    ///
44    /// * `checkpoint_fn`: The callback that decides if processing should continue or not.
45    ///
46    /// returns: ServiceBuilder<Stack<CheckpointLayer<S, Request>, L>>
47    ///
48    /// # Examples
49    ///
50    /// ```rust
51    /// # use std::ops::ControlFlow;
52    /// # use http::Method;
53    /// # use tower::ServiceBuilder;
54    /// # use tower_service::Service;
55    /// # use tracing::info_span;
56    /// # use apollo_router::services::supergraph;
57    /// # use apollo_router::layers::ServiceBuilderExt;
58    /// # fn test(service: supergraph::BoxService) {
59    /// let _ = ServiceBuilder::new()
60    ///     .checkpoint(|req: supergraph::Request|{
61    ///         if req.supergraph_request.method() != Method::GET {
62    ///             Ok(ControlFlow::Break(supergraph::Response::builder()
63    ///                 .data("Only get requests allowed")
64    ///                 .context(req.context)
65    ///                 .build()?))
66    ///         } else {
67    ///             Ok(ControlFlow::Continue(req))
68    ///         }
69    ///     })
70    ///     .service(service);
71    /// # }
72    /// ```
73    fn checkpoint<S, Request>(
74        self,
75        checkpoint_fn: impl Fn(
76            Request,
77        ) -> Result<
78            ControlFlow<<S as Service<Request>>::Response, Request>,
79            <S as Service<Request>>::Error,
80        > + Send
81        + Sync
82        + 'static,
83    ) -> ServiceBuilder<Stack<CheckpointLayer<S, Request>, L>>
84    where
85        S: Service<Request> + Send + 'static,
86        Request: Send + 'static,
87        S::Future: Send,
88        S::Response: Send + 'static,
89        S::Error: Into<BoxError> + Send + 'static,
90    {
91        self.layer(CheckpointLayer::new(checkpoint_fn))
92    }
93
94    /// Decide if processing should continue or not, and if not allow returning of a response.
95    /// Unlike `checkpoint`, it is possible to perform async operations in the callback. However
96    /// this requires that the service is `Clone`. This can be achieved using `.buffered()`.
97    ///
98    /// This is useful for things like authentication where you need to make an external call to
99    /// check if a request should proceed or not.
100    ///
101    /// # Arguments
102    ///
103    /// * `async_checkpoint_fn`: The asynchronous callback to decide if processing should continue or not.
104    ///
105    /// returns: ServiceBuilder<Stack<AsyncCheckpointLayer<S, Fut, Request>, L>>
106    ///
107    /// # Examples
108    ///
109    /// ```rust
110    /// # use std::ops::ControlFlow;
111    /// use futures::FutureExt;
112    /// # use http::Method;
113    /// # use tower::ServiceBuilder;
114    /// # use tower_service::Service;
115    /// # use tracing::info_span;
116    /// # use apollo_router::services::supergraph;
117    /// # use apollo_router::layers::ServiceBuilderExt;
118    /// # fn test(service: supergraph::BoxService) {
119    /// let _ = ServiceBuilder::new()
120    ///     .checkpoint_async(|req: supergraph::Request|
121    ///         async {
122    ///             if req.supergraph_request.method() != Method::GET {
123    ///                 Ok(ControlFlow::Break(supergraph::Response::builder()
124    ///                     .data("Only get requests allowed")
125    ///                     .context(req.context)
126    ///                     .build()?))
127    ///             } else {
128    ///                 Ok(ControlFlow::Continue(req))
129    ///             }
130    ///         }
131    ///         .boxed()
132    ///     )
133    ///     .buffered()
134    ///     .service(service);
135    /// # }
136    /// ```
137    fn checkpoint_async<F, S, Fut, Request>(
138        self,
139        async_checkpoint_fn: F,
140    ) -> ServiceBuilder<Stack<AsyncCheckpointLayer<S, Fut, Request>, L>>
141    where
142        S: Service<Request, Error = BoxError> + Clone + Send + 'static,
143        Fut: Future<
144            Output = Result<ControlFlow<<S as Service<Request>>::Response, Request>, BoxError>,
145        >,
146        F: Fn(Request) -> Fut + Send + Sync + 'static,
147    {
148        self.layer(AsyncCheckpointLayer::new(async_checkpoint_fn))
149    }
150
151    /// Decide if processing should continue or not, and if not allow returning of a response.
152    /// Unlike `checkpoint`, it is possible to perform async operations in the callback. Unlike
153    /// `checkpoint_async`, this does not require that the service is `Clone` and avoids the
154    /// requirement to buffer services.
155    ///
156    /// This is useful for things like authentication where you need to make an external call to
157    /// check if a request should proceed or not.
158    ///
159    /// # Arguments
160    ///
161    /// * `async_checkpoint_fn`: The asynchronous callback to decide if processing should continue or not.
162    ///
163    /// returns: ServiceBuilder<Stack<OneShotAsyncCheckpointLayer<S, Fut, Request>, L>>
164    ///
165    /// # Examples
166    ///
167    /// ```rust
168    /// # use std::ops::ControlFlow;
169    /// use futures::FutureExt;
170    /// # use http::Method;
171    /// # use tower::ServiceBuilder;
172    /// # use tower_service::Service;
173    /// # use tracing::info_span;
174    /// # use apollo_router::services::supergraph;
175    /// # use apollo_router::layers::ServiceBuilderExt;
176    /// # fn test(service: supergraph::BoxService) {
177    /// let _ = ServiceBuilder::new()
178    ///     .oneshot_checkpoint_async(|req: supergraph::Request|
179    ///         async {
180    ///             if req.supergraph_request.method() != Method::GET {
181    ///                 Ok(ControlFlow::Break(supergraph::Response::builder()
182    ///                     .data("Only get requests allowed")
183    ///                     .context(req.context)
184    ///                     .build()?))
185    ///             } else {
186    ///                 Ok(ControlFlow::Continue(req))
187    ///             }
188    ///         }
189    ///         .boxed()
190    ///     )
191    ///     .service(service);
192    /// # }
193    /// ```
194    fn oneshot_checkpoint_async<F, S, Fut, Request>(
195        self,
196        async_checkpoint_fn: F,
197    ) -> ServiceBuilder<Stack<OneShotAsyncCheckpointLayer<S, Fut, Request>, L>>
198    where
199        S: Service<Request, Error = BoxError> + Send + 'static,
200        Fut: Future<
201            Output = Result<ControlFlow<<S as Service<Request>>::Response, Request>, BoxError>,
202        >,
203        F: Fn(Request) -> Fut + Send + Sync + 'static,
204    {
205        self.layer(OneShotAsyncCheckpointLayer::new(async_checkpoint_fn))
206    }
207
208    /// Adds a buffer to the service stack with a default size.
209    ///
210    /// This is useful for making services `Clone` and `Send`.
211    ///
212    /// # Examples
213    ///
214    /// ```rust
215    /// # use tower::ServiceBuilder;
216    /// # use tower_service::Service;
217    /// # use tracing::info_span;
218    /// # use apollo_router::services::supergraph;
219    /// # use apollo_router::layers::ServiceBuilderExt;
220    /// # fn test(service: supergraph::BoxService) {
221    /// let _ = ServiceBuilder::new()
222    ///             .buffered()
223    ///             .service(service);
224    /// # }
225    /// ```
226    fn buffered<Request>(self) -> ServiceBuilder<Stack<BufferLayer<Request>, L>>;
227
228    /// Place a span around the request.
229    ///
230    /// This is useful for adding a new span with custom attributes to tracing.
231    ///
232    /// Note that it is not possible to add extra attributes to existing spans. However, you can add
233    /// empty placeholder attributes to your span if you want to supply those attributes later.
234    ///
235    /// # Arguments
236    ///
237    /// * `span_fn`: The callback to create the span given the request.
238    ///
239    /// returns: ServiceBuilder<Stack<InstrumentLayer<F, Request>, L>>
240    ///
241    /// # Examples
242    ///
243    /// ```rust
244    /// # use tower::ServiceBuilder;
245    /// # use tower_service::Service;
246    /// # use tracing::info_span;
247    /// # use apollo_router::services::supergraph;
248    /// # use apollo_router::layers::ServiceBuilderExt;
249    /// # fn test(service: supergraph::BoxService) {
250    /// let instrumented = ServiceBuilder::new()
251    ///             .instrument(|_request| info_span!("query_planning"))
252    ///             .service(service);
253    /// # }
254    /// ```
255    fn instrument<F, Request>(
256        self,
257        span_fn: F,
258    ) -> ServiceBuilder<Stack<InstrumentLayer<F, Request>, L>>
259    where
260        F: Fn(&Request) -> Span,
261    {
262        self.layer(InstrumentLayer::new(span_fn))
263    }
264
265    /// Maps HTTP parts, as well as the first GraphQL response, to different values.
266    ///
267    /// In supergraph and execution services, the service response contains
268    /// not just one GraphQL response but a stream of them,
269    /// in order to support features such as `@defer`.
270    ///
271    /// This method wraps a service and calls `callback` when the first GraphQL response
272    /// in the stream returned by the inner service becomes available.
273    /// The callback can then inspect or modify the HTTP parts (headers, status code, etc)
274    /// or the first GraphQL response before returning them.
275    ///
276    /// Note that any subsequent GraphQL responses after the first will be forwarded unmodified.
277    /// In order to inspect or modify all GraphQL responses,
278    /// consider using [`map_response`][tower::ServiceExt::map_response]
279    /// together with [`supergraph::Response::map_stream`] instead.
280    /// (See the example in `map_stream`’s documentation.)
281    /// In that case however HTTP parts cannot be modified because they may have already been sent.
282    ///
283    /// # Example
284    ///
285    /// ```
286    /// use apollo_router::services::supergraph;
287    /// use apollo_router::layers::ServiceBuilderExt as _;
288    /// use tower::ServiceExt as _;
289    ///
290    /// struct ExamplePlugin;
291    ///
292    /// #[async_trait::async_trait]
293    /// impl apollo_router::plugin::Plugin for ExamplePlugin {
294    ///     # type Config = ();
295    ///     # async fn new(
296    ///     #     _init: apollo_router::plugin::PluginInit<Self::Config>,
297    ///     # ) -> Result<Self, tower::BoxError> {
298    ///     #     Ok(Self)
299    ///     # }
300    ///     // …
301    ///     fn supergraph_service(&self, inner: supergraph::BoxService) -> supergraph::BoxService {
302    ///         tower::ServiceBuilder::new()
303    ///             .map_first_graphql_response(|context, mut http_parts, mut graphql_response| {
304    ///                 // Something interesting here
305    ///                 (http_parts, graphql_response)
306    ///             })
307    ///             .service(inner)
308    ///             .boxed()
309    ///     }
310    /// }
311    /// ```
312    fn map_first_graphql_response<Callback>(
313        self,
314        callback: Callback,
315    ) -> ServiceBuilder<Stack<MapFirstGraphqlResponseLayer<Callback>, L>>
316    where
317        Callback: FnOnce(
318                Context,
319                http::response::Parts,
320                graphql::Response,
321            ) -> (http::response::Parts, graphql::Response)
322            + Clone
323            + Send
324            + 'static,
325    {
326        self.layer(MapFirstGraphqlResponseLayer { callback })
327    }
328
329    /// Similar to `map_future`, but also provides an opportunity to extract information out of the
330    /// request for use when constructing the response.
331    ///
332    /// # Arguments
333    ///
334    /// * `req_fn`: The callback to extract data from the request.
335    /// * `map_fn`: The callback to map the future.
336    ///
337    /// returns: ServiceBuilder<Stack<MapFutureWithRequestDataLayer<RF, MF>, L>>
338    ///
339    /// # Examples
340    ///
341    /// ```rust
342    /// # use std::future::Future;
343    /// # use tower::{BoxError, ServiceBuilder, ServiceExt};
344    /// # use tower::util::BoxService;
345    /// # use tower_service::Service;
346    /// # use tracing::info_span;
347    /// # use apollo_router::Context;
348    /// # use apollo_router::services::supergraph;
349    /// # use apollo_router::layers::ServiceBuilderExt;
350    /// # fn test(service: supergraph::BoxService) {
351    /// let _ : supergraph::BoxService = ServiceBuilder::new()
352    ///     .map_future_with_request_data(
353    ///         |req: &supergraph::Request| req.context.clone(),
354    ///         |ctx : Context, fut| async { fut.await })
355    ///     .service(service)
356    ///     .boxed();
357    /// # }
358    /// ```
359    fn map_future_with_request_data<RF, MF>(
360        self,
361        req_fn: RF,
362        map_fn: MF,
363    ) -> ServiceBuilder<Stack<MapFutureWithRequestDataLayer<RF, MF>, L>> {
364        self.layer(MapFutureWithRequestDataLayer::new(req_fn, map_fn))
365    }
366
367    /// Utility function that lets this trait provide default methods instead of duplicating them in the impl.
368    ///
369    /// # Arguments
370    ///
371    /// * `layer`: The layer to add to the service stack.
372    ///
373    /// returns: ServiceBuilder<Stack<T, L>>
374    ///
375    fn layer<T>(self, layer: T) -> ServiceBuilder<Stack<T, L>>;
376}
377
378#[allow(clippy::type_complexity)]
379impl<L> ServiceBuilderExt<L> for ServiceBuilder<L> {
380    fn layer<T>(self, layer: T) -> ServiceBuilder<Stack<T, L>> {
381        ServiceBuilder::layer(self, layer) // path call on the `ServiceBuilder` type; presumably tower's inherent `layer` — confirm
382    }
383
384    fn buffered<Request>(self) -> ServiceBuilder<Stack<BufferLayer<Request>, L>> {
385        self.buffer(DEFAULT_BUFFER_SIZE) // the "default size" promised by the trait docs is DEFAULT_BUFFER_SIZE
386    }
387}
388
389/// Extension trait for [`Service`].
390///
391/// Importing both this trait and [`tower::ServiceExt`] could lead to a name collision error.
392/// To work around that, use `as _` syntax to make a trait’s methods available in a module
393/// without assigning it a name in that module’s namespace.
394///
395/// ```
396/// use apollo_router::layers::ServiceExt as _;
397/// use tower::ServiceExt as _;
398/// ```
399pub trait ServiceExt<Request>: Service<Request> {
400    /// Maps HTTP parts, as well as the first GraphQL response, to different values.
401    ///
402    /// In supergraph and execution services, the service response contains
403    /// not just one GraphQL response but a stream of them,
404    /// in order to support features such as `@defer`.
405    ///
406    /// This method wraps a service and calls `callback` when the first GraphQL response
407    /// in the stream returned by the inner service becomes available.
408    /// The callback can then modify the HTTP parts (headers, status code, etc)
409    /// or the first GraphQL response before returning them.
410    ///
411    /// Note that any subsequent GraphQL responses after the first will be forwarded unmodified.
412    /// In order to inspect or modify all GraphQL responses,
413    /// consider using [`map_response`][tower::ServiceExt::map_response]
414    /// together with [`supergraph::Response::map_stream`] instead.
415    /// (See the example in `map_stream`’s documentation.)
416    /// In that case however HTTP parts cannot be modified because they may have already been sent.
417    ///
418    /// # Example
419    ///
420    /// ```
421    /// use apollo_router::services::supergraph;
422    /// use apollo_router::layers::ServiceExt as _;
423    /// use tower::ServiceExt as _;
424    ///
425    /// struct ExamplePlugin;
426    ///
427    /// #[async_trait::async_trait]
428    /// impl apollo_router::plugin::Plugin for ExamplePlugin {
429    ///     # type Config = ();
430    ///     # async fn new(
431    ///     #     _init: apollo_router::plugin::PluginInit<Self::Config>,
432    ///     # ) -> Result<Self, tower::BoxError> {
433    ///     #     Ok(Self)
434    ///     # }
435    ///     // …
436    ///     fn supergraph_service(&self, inner: supergraph::BoxService) -> supergraph::BoxService {
437    ///         inner
438    ///             .map_first_graphql_response(|context, mut http_parts, mut graphql_response| {
439    ///                 // Something interesting here
440    ///                 (http_parts, graphql_response)
441    ///             })
442    ///             .boxed()
443    ///     }
444    /// }
445    /// ```
446    fn map_first_graphql_response<Callback>(
447        self,
448        callback: Callback,
449    ) -> MapFirstGraphqlResponseService<Self, Callback>
450    where
451        Self: Sized + Service<Request, Response = supergraph::Response>,
452        <Self as Service<Request>>::Future: Send + 'static,
453        Callback: FnOnce(
454                Context,
455                http::response::Parts,
456                graphql::Response,
457            ) -> (http::response::Parts, graphql::Response)
458            + Clone
459            + Send
460            + 'static,
461    {
462        ServiceBuilder::new()
463            .map_first_graphql_response(callback)
464            .service(self)
465    }
466
467    /// Similar to `map_future`, but also provides an opportunity to extract information out of the
468    /// request for use when constructing the response.
469    ///
470    /// # Arguments
471    ///
472    /// * `req_fn`: The callback to extract data from the request.
473    /// * `map_fn`: The callback to map the future.
474    ///
475    /// returns: MapFutureWithRequestDataService<Self, RF, MF>
476    ///
477    /// # Examples
478    ///
479    /// ```rust
480    /// # use std::future::Future;
481    /// # use tower::{BoxError, ServiceBuilder, ServiceExt};
482    /// # use tower::util::BoxService;
483    /// # use tower_service::Service;
484    /// # use tracing::info_span;
485    /// # use apollo_router::Context;
486    /// # use apollo_router::services::supergraph;
487    /// # use apollo_router::layers::ServiceBuilderExt;
488    /// # use apollo_router::layers::ServiceExt as ApolloServiceExt;
489    /// # fn test(service: supergraph::BoxService) {
490    /// let _ : supergraph::BoxService = service
491    ///     .map_future_with_request_data(
492    ///         |req: &supergraph::Request| req.context.clone(),
493    ///         |ctx : Context, fut| async { fut.await }
494    ///     )
495    ///     .boxed();
496    /// # }
497    /// ```
498    fn map_future_with_request_data<RF, MF>(
499        self,
500        req_fn: RF,
501        map_fn: MF,
502    ) -> MapFutureWithRequestDataService<Self, RF, MF>
503    where
504        Self: Sized,
505        RF: Clone,
506        MF: Clone,
507    {
508        MapFutureWithRequestDataService::new(self, req_fn, map_fn)
509    }
510}
511impl<T: ?Sized, Request> ServiceExt<Request> for T where T: Service<Request> {} // blanket impl for every `Service<Request>`, unsized included; the provided methods still require `Self: Sized`