apollo_router/layers/mod.rs
1//! Reusable layers
2//! Layers that are specific to one plugin should not be placed in this module.
3use std::future::Future;
4use std::ops::ControlFlow;
5
6use tower::BoxError;
7use tower::ServiceBuilder;
8use tower::layer::util::Stack;
9use tower_service::Service;
10use tracing::Span;
11
12use self::map_first_graphql_response::MapFirstGraphqlResponseLayer;
13use self::map_first_graphql_response::MapFirstGraphqlResponseService;
14use crate::Context;
15use crate::graphql;
16use crate::layers::async_checkpoint::AsyncCheckpointLayer;
17use crate::layers::instrument::InstrumentLayer;
18use crate::layers::map_future_with_request_data::MapFutureWithRequestDataLayer;
19use crate::layers::map_future_with_request_data::MapFutureWithRequestDataService;
20use crate::layers::sync_checkpoint::CheckpointLayer;
21use crate::layers::unconstrained_buffer::UnconstrainedBufferLayer;
22use crate::services::supergraph;
23
24pub mod async_checkpoint;
25pub mod instrument;
26pub mod map_first_graphql_response;
27pub mod map_future_with_request_data;
28pub mod sync_checkpoint;
29pub mod unconstrained_buffer;
30
// Note: We use Buffer in many places throughout the router. 50_000 represents
// the "maximal number of requests that can be queued for the buffered
// service before backpressure is applied to callers". We deliberately set
// this very high (50_000).
// NOTE(review): the original rationale sentence ("because we anticipate that
// many users will want to ...") was left unfinished; confirm the intended
// reason and complete it here.
//
// Think of this as a backstop for when there are no other backpressure
// enforcing limits configured in a router. In future we may tweak this
// value higher or lower or expose it as a configurable.
pub(crate) const DEFAULT_BUFFER_SIZE: usize = 50_000;
40
41/// Extension to the [`ServiceBuilder`] trait to make it easy to add router specific capabilities
42/// (e.g.: checkpoints) to a [`Service`].
43#[allow(clippy::type_complexity)]
44pub trait ServiceBuilderExt<L>: Sized {
45 /// Decide if processing should continue or not, and if not allow returning of a response.
46 ///
47 /// This is useful for validation functionality where you want to abort processing but return a
48 /// valid response.
49 ///
50 /// # Arguments
51 ///
52 /// * `checkpoint_fn`: Ths callback to decides if processing should continue or not.
53 ///
54 /// returns: ServiceBuilder<Stack<CheckpointLayer<S, Request>, L>>
55 ///
56 /// # Examples
57 ///
58 /// ```rust
59 /// # use std::ops::ControlFlow;
60 /// # use http::Method;
61 /// # use tower::ServiceBuilder;
62 /// # use tower_service::Service;
63 /// # use tracing::info_span;
64 /// # use apollo_router::services::supergraph;
65 /// # use apollo_router::layers::ServiceBuilderExt;
66 /// # fn test(service: supergraph::BoxService) {
67 /// let _ = ServiceBuilder::new()
68 /// .checkpoint(|req: supergraph::Request|{
69 /// if req.supergraph_request.method() == Method::GET {
70 /// Ok(ControlFlow::Break(supergraph::Response::builder()
71 /// .data("Only get requests allowed")
72 /// .context(req.context)
73 /// .build()?))
74 /// } else {
75 /// Ok(ControlFlow::Continue(req))
76 /// }
77 /// })
78 /// .service(service);
79 /// # }
80 /// ```
81 fn checkpoint<S, Request>(
82 self,
83 checkpoint_fn: impl Fn(
84 Request,
85 ) -> Result<
86 ControlFlow<<S as Service<Request>>::Response, Request>,
87 <S as Service<Request>>::Error,
88 > + Send
89 + Sync
90 + 'static,
91 ) -> ServiceBuilder<Stack<CheckpointLayer<S, Request>, L>>
92 where
93 S: Service<Request> + Send + 'static,
94 Request: Send + 'static,
95 S::Future: Send,
96 S::Response: Send + 'static,
97 S::Error: Into<BoxError> + Send + 'static,
98 {
99 self.layer(CheckpointLayer::new(checkpoint_fn))
100 }
101
102 /// Decide if processing should continue or not, and if not allow returning of a response.
103 /// Unlike checkpoint it is possible to perform async operations in the callback. However
104 /// this requires that the service is `Clone`. This can be achieved using `.buffered()`.
105 ///
106 /// This is useful for things like authentication where you need to make an external call to
107 /// check if a request should proceed or not.
108 ///
109 /// # Arguments
110 ///
111 /// * `async_checkpoint_fn`: The asynchronous callback to decide if processing should continue or not.
112 ///
113 /// returns: ServiceBuilder<Stack<AsyncCheckpointLayer<S, Request>, L>>
114 ///
115 /// # Examples
116 ///
117 /// ```rust
118 /// # use std::ops::ControlFlow;
119 /// use futures::FutureExt;
120 /// # use http::Method;
121 /// # use tower::ServiceBuilder;
122 /// # use tower_service::Service;
123 /// # use tracing::info_span;
124 /// # use apollo_router::services::supergraph;
125 /// # use apollo_router::layers::ServiceBuilderExt;
126 /// # fn test(service: supergraph::BoxService) {
127 /// let _ = ServiceBuilder::new()
128 /// .checkpoint_async(|req: supergraph::Request|
129 /// async {
130 /// if req.supergraph_request.method() == Method::GET {
131 /// Ok(ControlFlow::Break(supergraph::Response::builder()
132 /// .data("Only get requests allowed")
133 /// .context(req.context)
134 /// .build()?))
135 /// } else {
136 /// Ok(ControlFlow::Continue(req))
137 /// }
138 /// }
139 /// .boxed()
140 /// )
141 /// .buffered()
142 /// .service(service);
143 /// # }
144 /// ```
145 fn checkpoint_async<F, S, Fut, Request>(
146 self,
147 async_checkpoint_fn: F,
148 ) -> ServiceBuilder<Stack<AsyncCheckpointLayer<S, Fut, Request>, L>>
149 where
150 S: Service<Request, Error = BoxError> + Clone + Send + 'static,
151 Fut: Future<
152 Output = Result<ControlFlow<<S as Service<Request>>::Response, Request>, BoxError>,
153 >,
154 F: Fn(Request) -> Fut + Send + Sync + 'static,
155 {
156 self.layer(AsyncCheckpointLayer::new(async_checkpoint_fn))
157 }
158
159 /// Adds a buffer to the service stack with a default size.
160 ///
161 /// This is useful for making services `Clone` and `Send`
162 ///
163 /// # Examples
164 ///
165 /// ```rust
166 /// # use tower::ServiceBuilder;
167 /// # use tower_service::Service;
168 /// # use tracing::info_span;
169 /// # use apollo_router::services::supergraph;
170 /// # use apollo_router::layers::ServiceBuilderExt;
171 /// # fn test(service: supergraph::BoxService) {
172 /// let _ = ServiceBuilder::new()
173 /// .buffered()
174 /// .service(service);
175 /// # }
176 /// ```
177 fn buffered<Request>(self) -> ServiceBuilder<Stack<UnconstrainedBufferLayer<Request>, L>>;
178
179 /// Place a span around the request.
180 ///
181 /// This is useful for adding a new span with custom attributes to tracing.
182 ///
183 /// Note that it is not possible to add extra attributes to existing spans. However, you can add
184 /// empty placeholder attributes to your span if you want to supply those attributes later.
185 ///
186 /// # Arguments
187 ///
188 /// * `span_fn`: The callback to create the span given the request.
189 ///
190 /// returns: ServiceBuilder<Stack<InstrumentLayer<F, Request>, L>>
191 ///
192 /// # Examples
193 ///
194 /// ```rust
195 /// # use tower::ServiceBuilder;
196 /// # use tower_service::Service;
197 /// # use tracing::info_span;
198 /// # use apollo_router::services::supergraph;
199 /// # use apollo_router::layers::ServiceBuilderExt;
200 /// # fn test(service: supergraph::BoxService) {
201 /// let instrumented = ServiceBuilder::new()
202 /// .instrument(|_request| info_span!("query_planning"))
203 /// .service(service);
204 /// # }
205 /// ```
206 fn instrument<F, Request>(
207 self,
208 span_fn: F,
209 ) -> ServiceBuilder<Stack<InstrumentLayer<F, Request>, L>>
210 where
211 F: Fn(&Request) -> Span,
212 {
213 self.layer(InstrumentLayer::new(span_fn))
214 }
215
216 /// Maps HTTP parts, as well as the first GraphQL response, to different values.
217 ///
218 /// In supergraph and execution services, the service response contains
219 /// not just one GraphQL response but a stream of them,
220 /// in order to support features such as `@defer`.
221 ///
222 /// This method wraps a service and calls a `callback` when the first GraphQL response
223 /// in the stream returned by the inner service becomes available.
224 /// The callback can then access the HTTP parts (headers, status code, etc)
225 /// or the first GraphQL response before returning them.
226 ///
227 /// Note that any subsequent GraphQLÂ responses after the first will be forwarded unmodified.
228 /// In order to inspect or modify all GraphQL responses,
229 /// consider using [`map_response`][tower::ServiceExt::map_response]
230 /// together with [`supergraph::Response::map_stream`] instead.
231 /// (See the example in `map_stream`’s documentation.)
232 /// In that case however HTTP parts cannot be modified because they may have already been sent.
233 ///
234 /// # Example
235 ///
236 /// ```
237 /// use apollo_router::services::supergraph;
238 /// use apollo_router::layers::ServiceBuilderExt as _;
239 /// use tower::ServiceExt as _;
240 ///
241 /// struct ExamplePlugin;
242 ///
243 /// #[async_trait::async_trait]
244 /// impl apollo_router::plugin::Plugin for ExamplePlugin {
245 /// # type Config = ();
246 /// # async fn new(
247 /// # _init: apollo_router::plugin::PluginInit<Self::Config>,
248 /// # ) -> Result<Self, tower::BoxError> {
249 /// # Ok(Self)
250 /// # }
251 /// // …
252 /// fn supergraph_service(&self, inner: supergraph::BoxService) -> supergraph::BoxService {
253 /// tower::ServiceBuilder::new()
254 /// .map_first_graphql_response(|context, mut http_parts, mut graphql_response| {
255 /// // Something interesting here
256 /// (http_parts, graphql_response)
257 /// })
258 /// .service(inner)
259 /// .boxed()
260 /// }
261 /// }
262 /// ```
263 fn map_first_graphql_response<Callback>(
264 self,
265 callback: Callback,
266 ) -> ServiceBuilder<Stack<MapFirstGraphqlResponseLayer<Callback>, L>>
267 where
268 Callback: FnOnce(
269 Context,
270 http::response::Parts,
271 graphql::Response,
272 ) -> (http::response::Parts, graphql::Response)
273 + Clone
274 + Send
275 + 'static,
276 {
277 self.layer(MapFirstGraphqlResponseLayer { callback })
278 }
279
280 /// Similar to map_future but also providing an opportunity to extract information out of the
281 /// request for use when constructing the response.
282 ///
283 /// # Arguments
284 ///
285 /// * `req_fn`: The callback to extract data from the request.
286 /// * `map_fn`: The callback to map the future.
287 ///
288 /// returns: ServiceBuilder<Stack<MapFutureWithRequestDataLayer<RF, MF>, L>>
289 ///
290 /// # Examples
291 ///
292 /// ```rust
293 /// # use std::future::Future;
294 /// # use tower::{BoxError, ServiceBuilder, ServiceExt};
295 /// # use tower::util::BoxService;
296 /// # use tower_service::Service;
297 /// # use tracing::info_span;
298 /// # use apollo_router::Context;
299 /// # use apollo_router::services::supergraph;
300 /// # use apollo_router::layers::ServiceBuilderExt;
301 /// # fn test(service: supergraph::BoxService) {
302 /// let _ : supergraph::BoxService = ServiceBuilder::new()
303 /// .map_future_with_request_data(
304 /// |req: &supergraph::Request| req.context.clone(),
305 /// |ctx : Context, fut| async { fut.await })
306 /// .service(service)
307 /// .boxed();
308 /// # }
309 /// ```
310 fn map_future_with_request_data<RF, MF>(
311 self,
312 req_fn: RF,
313 map_fn: MF,
314 ) -> ServiceBuilder<Stack<MapFutureWithRequestDataLayer<RF, MF>, L>> {
315 self.layer(MapFutureWithRequestDataLayer::new(req_fn, map_fn))
316 }
317
318 /// Utility function to allow us to specify default methods on this trait rather than duplicating in the impl.
319 ///
320 /// # Arguments
321 ///
322 /// * `layer`: The layer to add to the service stack.
323 ///
324 /// returns: ServiceBuilder<Stack<T, L>>
325 ///
326 fn layer<T>(self, layer: T) -> ServiceBuilder<Stack<T, L>>;
327}
328
329#[allow(clippy::type_complexity)]
330impl<L> ServiceBuilderExt<L> for ServiceBuilder<L> {
331 fn layer<T>(self, layer: T) -> ServiceBuilder<Stack<T, L>> {
332 ServiceBuilder::layer(self, layer)
333 }
334
335 fn buffered<Request>(self) -> ServiceBuilder<Stack<UnconstrainedBufferLayer<Request>, L>> {
336 self.layer(UnconstrainedBufferLayer::new(DEFAULT_BUFFER_SIZE))
337 }
338}
339
340/// Extension trait for [`Service`].
341///
342/// Importing both this trait and [`tower::ServiceExt`] could lead a name collision error.
343/// To work around that, use `as _` syntax to make a trait’s methods available in a module
344/// without assigning it a name in that module’s namespace.
345///
346/// ```
347/// use apollo_router::layers::ServiceExt as _;
348/// use tower::ServiceExt as _;
349/// ```
350pub trait ServiceExt<Request>: Service<Request> {
351 /// Maps HTTP parts, as well as the first GraphQL response, to different values.
352 ///
353 /// In supergraph and execution services, the service response contains
354 /// not just one GraphQL response but a stream of them,
355 /// in order to support features such as `@defer`.
356 ///
357 /// This method wraps a service and call `callback` when the first GraphQL response
358 /// in the stream returned by the inner service becomes available.
359 /// The callback can then modify the HTTP parts (headers, status code, etc)
360 /// or the first GraphQL response before returning them.
361 ///
362 /// Note that any subsequent GraphQLÂ responses after the first will be forwarded unmodified.
363 /// In order to inspect or modify all GraphQL responses,
364 /// consider using [`map_response`][tower::ServiceExt::map_response]
365 /// together with [`supergraph::Response::map_stream`] instead.
366 /// (See the example in `map_stream`’s documentation.)
367 /// In that case however HTTP parts cannot be modified because they may have already been sent.
368 ///
369 /// # Example
370 ///
371 /// ```
372 /// use apollo_router::services::supergraph;
373 /// use apollo_router::layers::ServiceExt as _;
374 /// use tower::ServiceExt as _;
375 ///
376 /// struct ExamplePlugin;
377 ///
378 /// #[async_trait::async_trait]
379 /// impl apollo_router::plugin::Plugin for ExamplePlugin {
380 /// # type Config = ();
381 /// # async fn new(
382 /// # _init: apollo_router::plugin::PluginInit<Self::Config>,
383 /// # ) -> Result<Self, tower::BoxError> {
384 /// # Ok(Self)
385 /// # }
386 /// // …
387 /// fn supergraph_service(&self, inner: supergraph::BoxService) -> supergraph::BoxService {
388 /// inner
389 /// .map_first_graphql_response(|context, mut http_parts, mut graphql_response| {
390 /// // Something interesting here
391 /// (http_parts, graphql_response)
392 /// })
393 /// .boxed()
394 /// }
395 /// }
396 /// ```
397 fn map_first_graphql_response<Callback>(
398 self,
399 callback: Callback,
400 ) -> MapFirstGraphqlResponseService<Self, Callback>
401 where
402 Self: Sized + Service<Request, Response = supergraph::Response>,
403 <Self as Service<Request>>::Future: Send + 'static,
404 Callback: FnOnce(
405 Context,
406 http::response::Parts,
407 graphql::Response,
408 ) -> (http::response::Parts, graphql::Response)
409 + Clone
410 + Send
411 + 'static,
412 {
413 ServiceBuilder::new()
414 .map_first_graphql_response(callback)
415 .service(self)
416 }
417
418 /// Similar to map_future but also providing an opportunity to extract information out of the
419 /// request for use when constructing the response.
420 ///
421 /// # Arguments
422 ///
423 /// * `req_fn`: The callback to extract data from the request.
424 /// * `map_fn`: The callback to map the future.
425 ///
426 /// returns: ServiceBuilder<Stack<MapFutureWithRequestDataLayer<RF, MF>, L>>
427 ///
428 /// # Examples
429 ///
430 /// ```rust
431 /// # use std::future::Future;
432 /// # use tower::{BoxError, ServiceBuilder, ServiceExt};
433 /// # use tower::util::BoxService;
434 /// # use tower_service::Service;
435 /// # use tracing::info_span;
436 /// # use apollo_router::Context;
437 /// # use apollo_router::services::supergraph;
438 /// # use apollo_router::layers::ServiceBuilderExt;
439 /// # use apollo_router::layers::ServiceExt as ApolloServiceExt;
440 /// # fn test(service: supergraph::BoxService) {
441 /// let _ : supergraph::BoxService = service
442 /// .map_future_with_request_data(
443 /// |req: &supergraph::Request| req.context.clone(),
444 /// |ctx : Context, fut| async { fut.await }
445 /// )
446 /// .boxed();
447 /// # }
448 /// ```
449 fn map_future_with_request_data<RF, MF>(
450 self,
451 req_fn: RF,
452 map_fn: MF,
453 ) -> MapFutureWithRequestDataService<Self, RF, MF>
454 where
455 Self: Sized,
456 RF: Clone,
457 MF: Clone,
458 {
459 MapFutureWithRequestDataService::new(self, req_fn, map_fn)
460 }
461}
462impl<T: ?Sized, Request> ServiceExt<Request> for T where T: Service<Request> {}