apollo_router/layers/
mod.rs

1//! Reusable layers
2//! Layers that are specific to one plugin should not be placed in this module.
3use std::future::Future;
4use std::ops::ControlFlow;
5
6use tower::BoxError;
7use tower::ServiceBuilder;
8use tower::buffer::BufferLayer;
9use tower::layer::util::Stack;
10use tower_service::Service;
11use tracing::Span;
12
13use self::map_first_graphql_response::MapFirstGraphqlResponseLayer;
14use self::map_first_graphql_response::MapFirstGraphqlResponseService;
15use crate::Context;
16use crate::graphql;
17use crate::layers::async_checkpoint::AsyncCheckpointLayer;
18use crate::layers::instrument::InstrumentLayer;
19use crate::layers::map_future_with_request_data::MapFutureWithRequestDataLayer;
20use crate::layers::map_future_with_request_data::MapFutureWithRequestDataService;
21use crate::layers::sync_checkpoint::CheckpointLayer;
22use crate::services::supergraph;
23
24pub mod async_checkpoint;
25pub mod instrument;
26pub mod map_first_graphql_response;
27pub mod map_future_with_request_data;
28pub mod sync_checkpoint;
29
// Note: We use `Buffer` in many places throughout the router. 50_000 represents
// the "maximal number of requests that can be queued for the buffered
// service before backpressure is applied to callers". We set this to be
// so high, 50_000, so that the buffer itself is not normally the limiting
// factor.
// NOTE(review): the original rationale sentence was left unfinished
// ("...many users will want to"); confirm the intended wording.
//
// Think of this as a backstop for when there are no other backpressure
// enforcing limits configured in a router. In future we may tweak this
// value higher or lower or expose it as a configurable.
pub(crate) const DEFAULT_BUFFER_SIZE: usize = 50_000;
39
40/// Extension to the [`ServiceBuilder`] trait to make it easy to add router specific capabilities
41/// (e.g.: checkpoints) to a [`Service`].
42#[allow(clippy::type_complexity)]
43pub trait ServiceBuilderExt<L>: Sized {
44    /// Decide if processing should continue or not, and if not allow returning of a response.
45    ///
46    /// This is useful for validation functionality where you want to abort processing but return a
47    /// valid response.
48    ///
49    /// # Arguments
50    ///
51    /// * `checkpoint_fn`: Ths callback to decides if processing should continue or not.
52    ///
53    /// returns: ServiceBuilder<Stack<CheckpointLayer<S, Request>, L>>
54    ///
55    /// # Examples
56    ///
57    /// ```rust
58    /// # use std::ops::ControlFlow;
59    /// # use http::Method;
60    /// # use tower::ServiceBuilder;
61    /// # use tower_service::Service;
62    /// # use tracing::info_span;
63    /// # use apollo_router::services::supergraph;
64    /// # use apollo_router::layers::ServiceBuilderExt;
65    /// # fn test(service: supergraph::BoxService) {
66    /// let _ = ServiceBuilder::new()
67    ///     .checkpoint(|req: supergraph::Request|{
68    ///         if req.supergraph_request.method() == Method::GET {
69    ///             Ok(ControlFlow::Break(supergraph::Response::builder()
70    ///                 .data("Only get requests allowed")
71    ///                 .context(req.context)
72    ///                 .build()?))
73    ///         } else {
74    ///             Ok(ControlFlow::Continue(req))
75    ///         }
76    ///     })
77    ///     .service(service);
78    /// # }
79    /// ```
80    fn checkpoint<S, Request>(
81        self,
82        checkpoint_fn: impl Fn(
83            Request,
84        ) -> Result<
85            ControlFlow<<S as Service<Request>>::Response, Request>,
86            <S as Service<Request>>::Error,
87        > + Send
88        + Sync
89        + 'static,
90    ) -> ServiceBuilder<Stack<CheckpointLayer<S, Request>, L>>
91    where
92        S: Service<Request> + Send + 'static,
93        Request: Send + 'static,
94        S::Future: Send,
95        S::Response: Send + 'static,
96        S::Error: Into<BoxError> + Send + 'static,
97    {
98        self.layer(CheckpointLayer::new(checkpoint_fn))
99    }
100
101    /// Decide if processing should continue or not, and if not allow returning of a response.
102    /// Unlike checkpoint it is possible to perform async operations in the callback. However
103    /// this requires that the service is `Clone`. This can be achieved using `.buffered()`.
104    ///
105    /// This is useful for things like authentication where you need to make an external call to
106    /// check if a request should proceed or not.
107    ///
108    /// # Arguments
109    ///
110    /// * `async_checkpoint_fn`: The asynchronous callback to decide if processing should continue or not.
111    ///
112    /// returns: ServiceBuilder<Stack<AsyncCheckpointLayer<S, Request>, L>>
113    ///
114    /// # Examples
115    ///
116    /// ```rust
117    /// # use std::ops::ControlFlow;
118    /// use futures::FutureExt;
119    /// # use http::Method;
120    /// # use tower::ServiceBuilder;
121    /// # use tower_service::Service;
122    /// # use tracing::info_span;
123    /// # use apollo_router::services::supergraph;
124    /// # use apollo_router::layers::ServiceBuilderExt;
125    /// # fn test(service: supergraph::BoxService) {
126    /// let _ = ServiceBuilder::new()
127    ///     .checkpoint_async(|req: supergraph::Request|
128    ///         async {
129    ///             if req.supergraph_request.method() == Method::GET {
130    ///                 Ok(ControlFlow::Break(supergraph::Response::builder()
131    ///                     .data("Only get requests allowed")
132    ///                     .context(req.context)
133    ///                     .build()?))
134    ///             } else {
135    ///                 Ok(ControlFlow::Continue(req))
136    ///             }
137    ///         }
138    ///         .boxed()
139    ///     )
140    ///     .buffered()
141    ///     .service(service);
142    /// # }
143    /// ```
144    fn checkpoint_async<F, S, Fut, Request>(
145        self,
146        async_checkpoint_fn: F,
147    ) -> ServiceBuilder<Stack<AsyncCheckpointLayer<S, Fut, Request>, L>>
148    where
149        S: Service<Request, Error = BoxError> + Clone + Send + 'static,
150        Fut: Future<
151            Output = Result<ControlFlow<<S as Service<Request>>::Response, Request>, BoxError>,
152        >,
153        F: Fn(Request) -> Fut + Send + Sync + 'static,
154    {
155        self.layer(AsyncCheckpointLayer::new(async_checkpoint_fn))
156    }
157
158    /// Adds a buffer to the service stack with a default size.
159    ///
160    /// This is useful for making services `Clone` and `Send`
161    ///
162    /// # Examples
163    ///
164    /// ```rust
165    /// # use tower::ServiceBuilder;
166    /// # use tower_service::Service;
167    /// # use tracing::info_span;
168    /// # use apollo_router::services::supergraph;
169    /// # use apollo_router::layers::ServiceBuilderExt;
170    /// # fn test(service: supergraph::BoxService) {
171    /// let _ = ServiceBuilder::new()
172    ///             .buffered()
173    ///             .service(service);
174    /// # }
175    /// ```
176    fn buffered<Request>(self) -> ServiceBuilder<Stack<BufferLayer<Request>, L>>;
177
178    /// Place a span around the request.
179    ///
180    /// This is useful for adding a new span with custom attributes to tracing.
181    ///
182    /// Note that it is not possible to add extra attributes to existing spans. However, you can add
183    /// empty placeholder attributes to your span if you want to supply those attributes later.
184    ///
185    /// # Arguments
186    ///
187    /// * `span_fn`: The callback to create the span given the request.
188    ///
189    /// returns: ServiceBuilder<Stack<InstrumentLayer<F, Request>, L>>
190    ///
191    /// # Examples
192    ///
193    /// ```rust
194    /// # use tower::ServiceBuilder;
195    /// # use tower_service::Service;
196    /// # use tracing::info_span;
197    /// # use apollo_router::services::supergraph;
198    /// # use apollo_router::layers::ServiceBuilderExt;
199    /// # fn test(service: supergraph::BoxService) {
200    /// let instrumented = ServiceBuilder::new()
201    ///             .instrument(|_request| info_span!("query_planning"))
202    ///             .service(service);
203    /// # }
204    /// ```
205    fn instrument<F, Request>(
206        self,
207        span_fn: F,
208    ) -> ServiceBuilder<Stack<InstrumentLayer<F, Request>, L>>
209    where
210        F: Fn(&Request) -> Span,
211    {
212        self.layer(InstrumentLayer::new(span_fn))
213    }
214
215    /// Maps HTTP parts, as well as the first GraphQL response, to different values.
216    ///
217    /// In supergraph and execution services, the service response contains
218    /// not just one GraphQL response but a stream of them,
219    /// in order to support features such as `@defer`.
220    ///
221    /// This method wraps a service and calls a `callback` when the first GraphQL response
222    /// in the stream returned by the inner service becomes available.
223    /// The callback can then access the HTTP parts (headers, status code, etc)
224    /// or the first GraphQL response before returning them.
225    ///
226    /// Note that any subsequent GraphQL responses after the first will be forwarded unmodified.
227    /// In order to inspect or modify all GraphQL responses,
228    /// consider using [`map_response`][tower::ServiceExt::map_response]
229    /// together with [`supergraph::Response::map_stream`] instead.
230    /// (See the example in `map_stream`’s documentation.)
231    /// In that case however HTTP parts cannot be modified because they may have already been sent.
232    ///
233    /// # Example
234    ///
235    /// ```
236    /// use apollo_router::services::supergraph;
237    /// use apollo_router::layers::ServiceBuilderExt as _;
238    /// use tower::ServiceExt as _;
239    ///
240    /// struct ExamplePlugin;
241    ///
242    /// #[async_trait::async_trait]
243    /// impl apollo_router::plugin::Plugin for ExamplePlugin {
244    ///     # type Config = ();
245    ///     # async fn new(
246    ///     #     _init: apollo_router::plugin::PluginInit<Self::Config>,
247    ///     # ) -> Result<Self, tower::BoxError> {
248    ///     #     Ok(Self)
249    ///     # }
250    ///     // …
251    ///     fn supergraph_service(&self, inner: supergraph::BoxService) -> supergraph::BoxService {
252    ///         tower::ServiceBuilder::new()
253    ///             .map_first_graphql_response(|context, mut http_parts, mut graphql_response| {
254    ///                 // Something interesting here
255    ///                 (http_parts, graphql_response)
256    ///             })
257    ///             .service(inner)
258    ///             .boxed()
259    ///     }
260    /// }
261    /// ```
262    fn map_first_graphql_response<Callback>(
263        self,
264        callback: Callback,
265    ) -> ServiceBuilder<Stack<MapFirstGraphqlResponseLayer<Callback>, L>>
266    where
267        Callback: FnOnce(
268                Context,
269                http::response::Parts,
270                graphql::Response,
271            ) -> (http::response::Parts, graphql::Response)
272            + Clone
273            + Send
274            + 'static,
275    {
276        self.layer(MapFirstGraphqlResponseLayer { callback })
277    }
278
279    /// Similar to map_future but also providing an opportunity to extract information out of the
280    /// request for use when constructing the response.
281    ///
282    /// # Arguments
283    ///
284    /// * `req_fn`: The callback to extract data from the request.
285    /// * `map_fn`: The callback to map the future.
286    ///
287    /// returns: ServiceBuilder<Stack<MapFutureWithRequestDataLayer<RF, MF>, L>>
288    ///
289    /// # Examples
290    ///
291    /// ```rust
292    /// # use std::future::Future;
293    /// # use tower::{BoxError, ServiceBuilder, ServiceExt};
294    /// # use tower::util::BoxService;
295    /// # use tower_service::Service;
296    /// # use tracing::info_span;
297    /// # use apollo_router::Context;
298    /// # use apollo_router::services::supergraph;
299    /// # use apollo_router::layers::ServiceBuilderExt;
300    /// # fn test(service: supergraph::BoxService) {
301    /// let _ : supergraph::BoxService = ServiceBuilder::new()
302    ///     .map_future_with_request_data(
303    ///         |req: &supergraph::Request| req.context.clone(),
304    ///         |ctx : Context, fut| async { fut.await })
305    ///     .service(service)
306    ///     .boxed();
307    /// # }
308    /// ```
309    fn map_future_with_request_data<RF, MF>(
310        self,
311        req_fn: RF,
312        map_fn: MF,
313    ) -> ServiceBuilder<Stack<MapFutureWithRequestDataLayer<RF, MF>, L>> {
314        self.layer(MapFutureWithRequestDataLayer::new(req_fn, map_fn))
315    }
316
317    /// Utility function to allow us to specify default methods on this trait rather than duplicating in the impl.
318    ///
319    /// # Arguments
320    ///
321    /// * `layer`: The layer to add to the service stack.
322    ///
323    /// returns: ServiceBuilder<Stack<T, L>>
324    ///
325    fn layer<T>(self, layer: T) -> ServiceBuilder<Stack<T, L>>;
326}
327
328#[allow(clippy::type_complexity)]
329impl<L> ServiceBuilderExt<L> for ServiceBuilder<L> {
330    fn layer<T>(self, layer: T) -> ServiceBuilder<Stack<T, L>> {
331        ServiceBuilder::layer(self, layer)
332    }
333
334    fn buffered<Request>(self) -> ServiceBuilder<Stack<BufferLayer<Request>, L>> {
335        self.buffer(DEFAULT_BUFFER_SIZE)
336    }
337}
338
339/// Extension trait for [`Service`].
340///
341/// Importing both this trait and [`tower::ServiceExt`] could lead a name collision error.
342/// To work around that, use `as _` syntax to make a trait’s methods available in a module
343/// without assigning it a name in that module’s namespace.
344///
345/// ```
346/// use apollo_router::layers::ServiceExt as _;
347/// use tower::ServiceExt as _;
348/// ```
349pub trait ServiceExt<Request>: Service<Request> {
350    /// Maps HTTP parts, as well as the first GraphQL response, to different values.
351    ///
352    /// In supergraph and execution services, the service response contains
353    /// not just one GraphQL response but a stream of them,
354    /// in order to support features such as `@defer`.
355    ///
356    /// This method wraps a service and call `callback` when the first GraphQL response
357    /// in the stream returned by the inner service becomes available.
358    /// The callback can then modify the HTTP parts (headers, status code, etc)
359    /// or the first GraphQL response before returning them.
360    ///
361    /// Note that any subsequent GraphQL responses after the first will be forwarded unmodified.
362    /// In order to inspect or modify all GraphQL responses,
363    /// consider using [`map_response`][tower::ServiceExt::map_response]
364    /// together with [`supergraph::Response::map_stream`] instead.
365    /// (See the example in `map_stream`’s documentation.)
366    /// In that case however HTTP parts cannot be modified because they may have already been sent.
367    ///
368    /// # Example
369    ///
370    /// ```
371    /// use apollo_router::services::supergraph;
372    /// use apollo_router::layers::ServiceExt as _;
373    /// use tower::ServiceExt as _;
374    ///
375    /// struct ExamplePlugin;
376    ///
377    /// #[async_trait::async_trait]
378    /// impl apollo_router::plugin::Plugin for ExamplePlugin {
379    ///     # type Config = ();
380    ///     # async fn new(
381    ///     #     _init: apollo_router::plugin::PluginInit<Self::Config>,
382    ///     # ) -> Result<Self, tower::BoxError> {
383    ///     #     Ok(Self)
384    ///     # }
385    ///     // …
386    ///     fn supergraph_service(&self, inner: supergraph::BoxService) -> supergraph::BoxService {
387    ///         inner
388    ///             .map_first_graphql_response(|context, mut http_parts, mut graphql_response| {
389    ///                 // Something interesting here
390    ///                 (http_parts, graphql_response)
391    ///             })
392    ///             .boxed()
393    ///     }
394    /// }
395    /// ```
396    fn map_first_graphql_response<Callback>(
397        self,
398        callback: Callback,
399    ) -> MapFirstGraphqlResponseService<Self, Callback>
400    where
401        Self: Sized + Service<Request, Response = supergraph::Response>,
402        <Self as Service<Request>>::Future: Send + 'static,
403        Callback: FnOnce(
404                Context,
405                http::response::Parts,
406                graphql::Response,
407            ) -> (http::response::Parts, graphql::Response)
408            + Clone
409            + Send
410            + 'static,
411    {
412        ServiceBuilder::new()
413            .map_first_graphql_response(callback)
414            .service(self)
415    }
416
417    /// Similar to map_future but also providing an opportunity to extract information out of the
418    /// request for use when constructing the response.
419    ///
420    /// # Arguments
421    ///
422    /// * `req_fn`: The callback to extract data from the request.
423    /// * `map_fn`: The callback to map the future.
424    ///
425    /// returns: ServiceBuilder<Stack<MapFutureWithRequestDataLayer<RF, MF>, L>>
426    ///
427    /// # Examples
428    ///
429    /// ```rust
430    /// # use std::future::Future;
431    /// # use tower::{BoxError, ServiceBuilder, ServiceExt};
432    /// # use tower::util::BoxService;
433    /// # use tower_service::Service;
434    /// # use tracing::info_span;
435    /// # use apollo_router::Context;
436    /// # use apollo_router::services::supergraph;
437    /// # use apollo_router::layers::ServiceBuilderExt;
438    /// # use apollo_router::layers::ServiceExt as ApolloServiceExt;
439    /// # fn test(service: supergraph::BoxService) {
440    /// let _ : supergraph::BoxService = service
441    ///     .map_future_with_request_data(
442    ///         |req: &supergraph::Request| req.context.clone(),
443    ///         |ctx : Context, fut| async { fut.await }
444    ///     )
445    ///     .boxed();
446    /// # }
447    /// ```
448    fn map_future_with_request_data<RF, MF>(
449        self,
450        req_fn: RF,
451        map_fn: MF,
452    ) -> MapFutureWithRequestDataService<Self, RF, MF>
453    where
454        Self: Sized,
455        RF: Clone,
456        MF: Clone,
457    {
458        MapFutureWithRequestDataService::new(self, req_fn, map_fn)
459    }
460}
461impl<T: ?Sized, Request> ServiceExt<Request> for T where T: Service<Request> {}