apollo_router/layers/mod.rs
1//! Reusable layers
2//! Layers that are specific to one plugin should not be placed in this module.
3use std::future::Future;
4use std::ops::ControlFlow;
5
6use tower::BoxError;
7use tower::ServiceBuilder;
8use tower::buffer::BufferLayer;
9use tower::layer::util::Stack;
10use tower_service::Service;
11use tracing::Span;
12
13use self::map_first_graphql_response::MapFirstGraphqlResponseLayer;
14use self::map_first_graphql_response::MapFirstGraphqlResponseService;
15use crate::Context;
16use crate::graphql;
17use crate::layers::async_checkpoint::AsyncCheckpointLayer;
18use crate::layers::async_checkpoint::OneShotAsyncCheckpointLayer;
19use crate::layers::instrument::InstrumentLayer;
20use crate::layers::map_future_with_request_data::MapFutureWithRequestDataLayer;
21use crate::layers::map_future_with_request_data::MapFutureWithRequestDataService;
22use crate::layers::sync_checkpoint::CheckpointLayer;
23use crate::services::supergraph;
24
25pub mod async_checkpoint;
26pub mod instrument;
27pub mod map_first_graphql_response;
28pub mod map_future_with_request_data;
29pub mod sync_checkpoint;
30
31pub(crate) const DEFAULT_BUFFER_SIZE: usize = 20_000;
32
33/// Extension to the [`ServiceBuilder`] trait to make it easy to add router specific capabilities
34/// (e.g.: checkpoints) to a [`Service`].
35#[allow(clippy::type_complexity)]
36pub trait ServiceBuilderExt<L>: Sized {
37 /// Decide if processing should continue or not, and if not allow returning of a response.
38 ///
39 /// This is useful for validation functionality where you want to abort processing but return a
40 /// valid response.
41 ///
42 /// # Arguments
43 ///
44 /// * `checkpoint_fn`: Ths callback to decides if processing should continue or not.
45 ///
46 /// returns: ServiceBuilder<Stack<CheckpointLayer<S, Request>, L>>
47 ///
48 /// # Examples
49 ///
50 /// ```rust
51 /// # use std::ops::ControlFlow;
52 /// # use http::Method;
53 /// # use tower::ServiceBuilder;
54 /// # use tower_service::Service;
55 /// # use tracing::info_span;
56 /// # use apollo_router::services::supergraph;
57 /// # use apollo_router::layers::ServiceBuilderExt;
58 /// # fn test(service: supergraph::BoxService) {
59 /// let _ = ServiceBuilder::new()
60 /// .checkpoint(|req: supergraph::Request|{
61 /// if req.supergraph_request.method() == Method::GET {
62 /// Ok(ControlFlow::Break(supergraph::Response::builder()
63 /// .data("Only get requests allowed")
64 /// .context(req.context)
65 /// .build()?))
66 /// } else {
67 /// Ok(ControlFlow::Continue(req))
68 /// }
69 /// })
70 /// .service(service);
71 /// # }
72 /// ```
73 fn checkpoint<S, Request>(
74 self,
75 checkpoint_fn: impl Fn(
76 Request,
77 ) -> Result<
78 ControlFlow<<S as Service<Request>>::Response, Request>,
79 <S as Service<Request>>::Error,
80 > + Send
81 + Sync
82 + 'static,
83 ) -> ServiceBuilder<Stack<CheckpointLayer<S, Request>, L>>
84 where
85 S: Service<Request> + Send + 'static,
86 Request: Send + 'static,
87 S::Future: Send,
88 S::Response: Send + 'static,
89 S::Error: Into<BoxError> + Send + 'static,
90 {
91 self.layer(CheckpointLayer::new(checkpoint_fn))
92 }
93
94 /// Decide if processing should continue or not, and if not allow returning of a response.
95 /// Unlike checkpoint it is possible to perform async operations in the callback. However
96 /// this requires that the service is `Clone`. This can be achieved using `.buffered()`.
97 ///
98 /// This is useful for things like authentication where you need to make an external call to
99 /// check if a request should proceed or not.
100 ///
101 /// # Arguments
102 ///
103 /// * `async_checkpoint_fn`: The asynchronous callback to decide if processing should continue or not.
104 ///
105 /// returns: ServiceBuilder<Stack<AsyncCheckpointLayer<S, Request>, L>>
106 ///
107 /// # Examples
108 ///
109 /// ```rust
110 /// # use std::ops::ControlFlow;
111 /// use futures::FutureExt;
112 /// # use http::Method;
113 /// # use tower::ServiceBuilder;
114 /// # use tower_service::Service;
115 /// # use tracing::info_span;
116 /// # use apollo_router::services::supergraph;
117 /// # use apollo_router::layers::ServiceBuilderExt;
118 /// # fn test(service: supergraph::BoxService) {
119 /// let _ = ServiceBuilder::new()
120 /// .checkpoint_async(|req: supergraph::Request|
121 /// async {
122 /// if req.supergraph_request.method() == Method::GET {
123 /// Ok(ControlFlow::Break(supergraph::Response::builder()
124 /// .data("Only get requests allowed")
125 /// .context(req.context)
126 /// .build()?))
127 /// } else {
128 /// Ok(ControlFlow::Continue(req))
129 /// }
130 /// }
131 /// .boxed()
132 /// )
133 /// .buffered()
134 /// .service(service);
135 /// # }
136 /// ```
137 fn checkpoint_async<F, S, Fut, Request>(
138 self,
139 async_checkpoint_fn: F,
140 ) -> ServiceBuilder<Stack<AsyncCheckpointLayer<S, Fut, Request>, L>>
141 where
142 S: Service<Request, Error = BoxError> + Clone + Send + 'static,
143 Fut: Future<
144 Output = Result<ControlFlow<<S as Service<Request>>::Response, Request>, BoxError>,
145 >,
146 F: Fn(Request) -> Fut + Send + Sync + 'static,
147 {
148 self.layer(AsyncCheckpointLayer::new(async_checkpoint_fn))
149 }
150
151 /// Decide if processing should continue or not, and if not allow returning of a response.
152 /// Unlike checkpoint it is possible to perform async operations in the callback. Unlike
153 /// checkpoint_async, this does not require that the service is `Clone` and avoids the
154 /// requiremnent to buffer services.
155 ///
156 /// This is useful for things like authentication where you need to make an external call to
157 /// check if a request should proceed or not.
158 ///
159 /// # Arguments
160 ///
161 /// * `async_checkpoint_fn`: The asynchronous callback to decide if processing should continue or not.
162 ///
163 /// returns: ServiceBuilder<Stack<OneShotAsyncCheckpointLayer<S, Request>, L>>
164 ///
165 /// # Examples
166 ///
167 /// ```rust
168 /// # use std::ops::ControlFlow;
169 /// use futures::FutureExt;
170 /// # use http::Method;
171 /// # use tower::ServiceBuilder;
172 /// # use tower_service::Service;
173 /// # use tracing::info_span;
174 /// # use apollo_router::services::supergraph;
175 /// # use apollo_router::layers::ServiceBuilderExt;
176 /// # fn test(service: supergraph::BoxService) {
177 /// let _ = ServiceBuilder::new()
178 /// .oneshot_checkpoint_async(|req: supergraph::Request|
179 /// async {
180 /// if req.supergraph_request.method() == Method::GET {
181 /// Ok(ControlFlow::Break(supergraph::Response::builder()
182 /// .data("Only get requests allowed")
183 /// .context(req.context)
184 /// .build()?))
185 /// } else {
186 /// Ok(ControlFlow::Continue(req))
187 /// }
188 /// }
189 /// .boxed()
190 /// )
191 /// .service(service);
192 /// # }
193 /// ```
194 fn oneshot_checkpoint_async<F, S, Fut, Request>(
195 self,
196 async_checkpoint_fn: F,
197 ) -> ServiceBuilder<Stack<OneShotAsyncCheckpointLayer<S, Fut, Request>, L>>
198 where
199 S: Service<Request, Error = BoxError> + Send + 'static,
200 Fut: Future<
201 Output = Result<ControlFlow<<S as Service<Request>>::Response, Request>, BoxError>,
202 >,
203 F: Fn(Request) -> Fut + Send + Sync + 'static,
204 {
205 self.layer(OneShotAsyncCheckpointLayer::new(async_checkpoint_fn))
206 }
207
208 /// Adds a buffer to the service stack with a default size.
209 ///
210 /// This is useful for making services `Clone` and `Send`
211 ///
212 /// # Examples
213 ///
214 /// ```rust
215 /// # use tower::ServiceBuilder;
216 /// # use tower_service::Service;
217 /// # use tracing::info_span;
218 /// # use apollo_router::services::supergraph;
219 /// # use apollo_router::layers::ServiceBuilderExt;
220 /// # fn test(service: supergraph::BoxService) {
221 /// let _ = ServiceBuilder::new()
222 /// .buffered()
223 /// .service(service);
224 /// # }
225 /// ```
226 fn buffered<Request>(self) -> ServiceBuilder<Stack<BufferLayer<Request>, L>>;
227
228 /// Place a span around the request.
229 ///
230 /// This is useful for adding a new span with custom attributes to tracing.
231 ///
232 /// Note that it is not possible to add extra attributes to existing spans. However, you can add
233 /// empty placeholder attributes to your span if you want to supply those attributes later.
234 ///
235 /// # Arguments
236 ///
237 /// * `span_fn`: The callback to create the span given the request.
238 ///
239 /// returns: ServiceBuilder<Stack<InstrumentLayer<F, Request>, L>>
240 ///
241 /// # Examples
242 ///
243 /// ```rust
244 /// # use tower::ServiceBuilder;
245 /// # use tower_service::Service;
246 /// # use tracing::info_span;
247 /// # use apollo_router::services::supergraph;
248 /// # use apollo_router::layers::ServiceBuilderExt;
249 /// # fn test(service: supergraph::BoxService) {
250 /// let instrumented = ServiceBuilder::new()
251 /// .instrument(|_request| info_span!("query_planning"))
252 /// .service(service);
253 /// # }
254 /// ```
255 fn instrument<F, Request>(
256 self,
257 span_fn: F,
258 ) -> ServiceBuilder<Stack<InstrumentLayer<F, Request>, L>>
259 where
260 F: Fn(&Request) -> Span,
261 {
262 self.layer(InstrumentLayer::new(span_fn))
263 }
264
265 /// Maps HTTP parts, as well as the first GraphQL response, to different values.
266 ///
267 /// In supergraph and execution services, the service response contains
268 /// not just one GraphQL response but a stream of them,
269 /// in order to support features such as `@defer`.
270 ///
271 /// This method wraps a service and calls a `callback` when the first GraphQL response
272 /// in the stream returned by the inner service becomes available.
273 /// The callback can then access the HTTP parts (headers, status code, etc)
274 /// or the first GraphQL response before returning them.
275 ///
276 /// Note that any subsequent GraphQLÂ responses after the first will be forwarded unmodified.
277 /// In order to inspect or modify all GraphQL responses,
278 /// consider using [`map_response`][tower::ServiceExt::map_response]
279 /// together with [`supergraph::Response::map_stream`] instead.
280 /// (See the example in `map_stream`’s documentation.)
281 /// In that case however HTTP parts cannot be modified because they may have already been sent.
282 ///
283 /// # Example
284 ///
285 /// ```
286 /// use apollo_router::services::supergraph;
287 /// use apollo_router::layers::ServiceBuilderExt as _;
288 /// use tower::ServiceExt as _;
289 ///
290 /// struct ExamplePlugin;
291 ///
292 /// #[async_trait::async_trait]
293 /// impl apollo_router::plugin::Plugin for ExamplePlugin {
294 /// # type Config = ();
295 /// # async fn new(
296 /// # _init: apollo_router::plugin::PluginInit<Self::Config>,
297 /// # ) -> Result<Self, tower::BoxError> {
298 /// # Ok(Self)
299 /// # }
300 /// // …
301 /// fn supergraph_service(&self, inner: supergraph::BoxService) -> supergraph::BoxService {
302 /// tower::ServiceBuilder::new()
303 /// .map_first_graphql_response(|context, mut http_parts, mut graphql_response| {
304 /// // Something interesting here
305 /// (http_parts, graphql_response)
306 /// })
307 /// .service(inner)
308 /// .boxed()
309 /// }
310 /// }
311 /// ```
312 fn map_first_graphql_response<Callback>(
313 self,
314 callback: Callback,
315 ) -> ServiceBuilder<Stack<MapFirstGraphqlResponseLayer<Callback>, L>>
316 where
317 Callback: FnOnce(
318 Context,
319 http::response::Parts,
320 graphql::Response,
321 ) -> (http::response::Parts, graphql::Response)
322 + Clone
323 + Send
324 + 'static,
325 {
326 self.layer(MapFirstGraphqlResponseLayer { callback })
327 }
328
329 /// Similar to map_future but also providing an opportunity to extract information out of the
330 /// request for use when constructing the response.
331 ///
332 /// # Arguments
333 ///
334 /// * `req_fn`: The callback to extract data from the request.
335 /// * `map_fn`: The callback to map the future.
336 ///
337 /// returns: ServiceBuilder<Stack<MapFutureWithRequestDataLayer<RF, MF>, L>>
338 ///
339 /// # Examples
340 ///
341 /// ```rust
342 /// # use std::future::Future;
343 /// # use tower::{BoxError, ServiceBuilder, ServiceExt};
344 /// # use tower::util::BoxService;
345 /// # use tower_service::Service;
346 /// # use tracing::info_span;
347 /// # use apollo_router::Context;
348 /// # use apollo_router::services::supergraph;
349 /// # use apollo_router::layers::ServiceBuilderExt;
350 /// # fn test(service: supergraph::BoxService) {
351 /// let _ : supergraph::BoxService = ServiceBuilder::new()
352 /// .map_future_with_request_data(
353 /// |req: &supergraph::Request| req.context.clone(),
354 /// |ctx : Context, fut| async { fut.await })
355 /// .service(service)
356 /// .boxed();
357 /// # }
358 /// ```
359 fn map_future_with_request_data<RF, MF>(
360 self,
361 req_fn: RF,
362 map_fn: MF,
363 ) -> ServiceBuilder<Stack<MapFutureWithRequestDataLayer<RF, MF>, L>> {
364 self.layer(MapFutureWithRequestDataLayer::new(req_fn, map_fn))
365 }
366
367 /// Utility function to allow us to specify default methods on this trait rather than duplicating in the impl.
368 ///
369 /// # Arguments
370 ///
371 /// * `layer`: The layer to add to the service stack.
372 ///
373 /// returns: ServiceBuilder<Stack<T, L>>
374 ///
375 fn layer<T>(self, layer: T) -> ServiceBuilder<Stack<T, L>>;
376}
377
378#[allow(clippy::type_complexity)]
379impl<L> ServiceBuilderExt<L> for ServiceBuilder<L> {
380 fn layer<T>(self, layer: T) -> ServiceBuilder<Stack<T, L>> {
381 ServiceBuilder::layer(self, layer)
382 }
383
384 fn buffered<Request>(self) -> ServiceBuilder<Stack<BufferLayer<Request>, L>> {
385 self.buffer(DEFAULT_BUFFER_SIZE)
386 }
387}
388
389/// Extension trait for [`Service`].
390///
391/// Importing both this trait and [`tower::ServiceExt`] could lead a name collision error.
392/// To work around that, use `as _` syntax to make a trait’s methods available in a module
393/// without assigning it a name in that module’s namespace.
394///
395/// ```
396/// use apollo_router::layers::ServiceExt as _;
397/// use tower::ServiceExt as _;
398/// ```
399pub trait ServiceExt<Request>: Service<Request> {
400 /// Maps HTTP parts, as well as the first GraphQL response, to different values.
401 ///
402 /// In supergraph and execution services, the service response contains
403 /// not just one GraphQL response but a stream of them,
404 /// in order to support features such as `@defer`.
405 ///
406 /// This method wraps a service and call `callback` when the first GraphQL response
407 /// in the stream returned by the inner service becomes available.
408 /// The callback can then modify the HTTP parts (headers, status code, etc)
409 /// or the first GraphQL response before returning them.
410 ///
411 /// Note that any subsequent GraphQLÂ responses after the first will be forwarded unmodified.
412 /// In order to inspect or modify all GraphQL responses,
413 /// consider using [`map_response`][tower::ServiceExt::map_response]
414 /// together with [`supergraph::Response::map_stream`] instead.
415 /// (See the example in `map_stream`’s documentation.)
416 /// In that case however HTTP parts cannot be modified because they may have already been sent.
417 ///
418 /// # Example
419 ///
420 /// ```
421 /// use apollo_router::services::supergraph;
422 /// use apollo_router::layers::ServiceExt as _;
423 /// use tower::ServiceExt as _;
424 ///
425 /// struct ExamplePlugin;
426 ///
427 /// #[async_trait::async_trait]
428 /// impl apollo_router::plugin::Plugin for ExamplePlugin {
429 /// # type Config = ();
430 /// # async fn new(
431 /// # _init: apollo_router::plugin::PluginInit<Self::Config>,
432 /// # ) -> Result<Self, tower::BoxError> {
433 /// # Ok(Self)
434 /// # }
435 /// // …
436 /// fn supergraph_service(&self, inner: supergraph::BoxService) -> supergraph::BoxService {
437 /// inner
438 /// .map_first_graphql_response(|context, mut http_parts, mut graphql_response| {
439 /// // Something interesting here
440 /// (http_parts, graphql_response)
441 /// })
442 /// .boxed()
443 /// }
444 /// }
445 /// ```
446 fn map_first_graphql_response<Callback>(
447 self,
448 callback: Callback,
449 ) -> MapFirstGraphqlResponseService<Self, Callback>
450 where
451 Self: Sized + Service<Request, Response = supergraph::Response>,
452 <Self as Service<Request>>::Future: Send + 'static,
453 Callback: FnOnce(
454 Context,
455 http::response::Parts,
456 graphql::Response,
457 ) -> (http::response::Parts, graphql::Response)
458 + Clone
459 + Send
460 + 'static,
461 {
462 ServiceBuilder::new()
463 .map_first_graphql_response(callback)
464 .service(self)
465 }
466
467 /// Similar to map_future but also providing an opportunity to extract information out of the
468 /// request for use when constructing the response.
469 ///
470 /// # Arguments
471 ///
472 /// * `req_fn`: The callback to extract data from the request.
473 /// * `map_fn`: The callback to map the future.
474 ///
475 /// returns: ServiceBuilder<Stack<MapFutureWithRequestDataLayer<RF, MF>, L>>
476 ///
477 /// # Examples
478 ///
479 /// ```rust
480 /// # use std::future::Future;
481 /// # use tower::{BoxError, ServiceBuilder, ServiceExt};
482 /// # use tower::util::BoxService;
483 /// # use tower_service::Service;
484 /// # use tracing::info_span;
485 /// # use apollo_router::Context;
486 /// # use apollo_router::services::supergraph;
487 /// # use apollo_router::layers::ServiceBuilderExt;
488 /// # use apollo_router::layers::ServiceExt as ApolloServiceExt;
489 /// # fn test(service: supergraph::BoxService) {
490 /// let _ : supergraph::BoxService = service
491 /// .map_future_with_request_data(
492 /// |req: &supergraph::Request| req.context.clone(),
493 /// |ctx : Context, fut| async { fut.await }
494 /// )
495 /// .boxed();
496 /// # }
497 /// ```
498 fn map_future_with_request_data<RF, MF>(
499 self,
500 req_fn: RF,
501 map_fn: MF,
502 ) -> MapFutureWithRequestDataService<Self, RF, MF>
503 where
504 Self: Sized,
505 RF: Clone,
506 MF: Clone,
507 {
508 MapFutureWithRequestDataService::new(self, req_fn, map_fn)
509 }
510}
511impl<T: ?Sized, Request> ServiceExt<Request> for T where T: Service<Request> {}