apollo_router/layers/mod.rs
1//! Reusable layers
2//! Layers that are specific to one plugin should not be placed in this module.
3use std::future::Future;
4use std::ops::ControlFlow;
5
6use tower::BoxError;
7use tower::ServiceBuilder;
8use tower::buffer::BufferLayer;
9use tower::layer::util::Stack;
10use tower_service::Service;
11use tracing::Span;
12
13use self::map_first_graphql_response::MapFirstGraphqlResponseLayer;
14use self::map_first_graphql_response::MapFirstGraphqlResponseService;
15use crate::Context;
16use crate::graphql;
17use crate::layers::async_checkpoint::AsyncCheckpointLayer;
18use crate::layers::instrument::InstrumentLayer;
19use crate::layers::map_future_with_request_data::MapFutureWithRequestDataLayer;
20use crate::layers::map_future_with_request_data::MapFutureWithRequestDataService;
21use crate::layers::sync_checkpoint::CheckpointLayer;
22use crate::services::supergraph;
23
24pub mod async_checkpoint;
25pub mod instrument;
26pub mod map_first_graphql_response;
27pub mod map_future_with_request_data;
28pub mod sync_checkpoint;
29
// Note: We use Buffer in many places throughout the router. 50_000 represents
// the "maximal number of requests that can be queued for the buffered
// service before backpressure is applied to callers".
//
// The value is deliberately very high. Think of it as a backstop for when
// there are no other backpressure enforcing limits configured in a router.
// In future we may tweak this value higher or lower or expose it as a
// configurable.
//
// NOTE(review): the original rationale sentence ("we anticipate that many
// users will want to ...") was left unfinished; complete it if the intent is
// known.
pub(crate) const DEFAULT_BUFFER_SIZE: usize = 50_000;
39
40/// Extension to the [`ServiceBuilder`] trait to make it easy to add router specific capabilities
41/// (e.g.: checkpoints) to a [`Service`].
42#[allow(clippy::type_complexity)]
43pub trait ServiceBuilderExt<L>: Sized {
44 /// Decide if processing should continue or not, and if not allow returning of a response.
45 ///
46 /// This is useful for validation functionality where you want to abort processing but return a
47 /// valid response.
48 ///
49 /// # Arguments
50 ///
51 /// * `checkpoint_fn`: Ths callback to decides if processing should continue or not.
52 ///
53 /// returns: ServiceBuilder<Stack<CheckpointLayer<S, Request>, L>>
54 ///
55 /// # Examples
56 ///
57 /// ```rust
58 /// # use std::ops::ControlFlow;
59 /// # use http::Method;
60 /// # use tower::ServiceBuilder;
61 /// # use tower_service::Service;
62 /// # use tracing::info_span;
63 /// # use apollo_router::services::supergraph;
64 /// # use apollo_router::layers::ServiceBuilderExt;
65 /// # fn test(service: supergraph::BoxService) {
66 /// let _ = ServiceBuilder::new()
67 /// .checkpoint(|req: supergraph::Request|{
68 /// if req.supergraph_request.method() == Method::GET {
69 /// Ok(ControlFlow::Break(supergraph::Response::builder()
70 /// .data("Only get requests allowed")
71 /// .context(req.context)
72 /// .build()?))
73 /// } else {
74 /// Ok(ControlFlow::Continue(req))
75 /// }
76 /// })
77 /// .service(service);
78 /// # }
79 /// ```
80 fn checkpoint<S, Request>(
81 self,
82 checkpoint_fn: impl Fn(
83 Request,
84 ) -> Result<
85 ControlFlow<<S as Service<Request>>::Response, Request>,
86 <S as Service<Request>>::Error,
87 > + Send
88 + Sync
89 + 'static,
90 ) -> ServiceBuilder<Stack<CheckpointLayer<S, Request>, L>>
91 where
92 S: Service<Request> + Send + 'static,
93 Request: Send + 'static,
94 S::Future: Send,
95 S::Response: Send + 'static,
96 S::Error: Into<BoxError> + Send + 'static,
97 {
98 self.layer(CheckpointLayer::new(checkpoint_fn))
99 }
100
101 /// Decide if processing should continue or not, and if not allow returning of a response.
102 /// Unlike checkpoint it is possible to perform async operations in the callback. However
103 /// this requires that the service is `Clone`. This can be achieved using `.buffered()`.
104 ///
105 /// This is useful for things like authentication where you need to make an external call to
106 /// check if a request should proceed or not.
107 ///
108 /// # Arguments
109 ///
110 /// * `async_checkpoint_fn`: The asynchronous callback to decide if processing should continue or not.
111 ///
112 /// returns: ServiceBuilder<Stack<AsyncCheckpointLayer<S, Request>, L>>
113 ///
114 /// # Examples
115 ///
116 /// ```rust
117 /// # use std::ops::ControlFlow;
118 /// use futures::FutureExt;
119 /// # use http::Method;
120 /// # use tower::ServiceBuilder;
121 /// # use tower_service::Service;
122 /// # use tracing::info_span;
123 /// # use apollo_router::services::supergraph;
124 /// # use apollo_router::layers::ServiceBuilderExt;
125 /// # fn test(service: supergraph::BoxService) {
126 /// let _ = ServiceBuilder::new()
127 /// .checkpoint_async(|req: supergraph::Request|
128 /// async {
129 /// if req.supergraph_request.method() == Method::GET {
130 /// Ok(ControlFlow::Break(supergraph::Response::builder()
131 /// .data("Only get requests allowed")
132 /// .context(req.context)
133 /// .build()?))
134 /// } else {
135 /// Ok(ControlFlow::Continue(req))
136 /// }
137 /// }
138 /// .boxed()
139 /// )
140 /// .buffered()
141 /// .service(service);
142 /// # }
143 /// ```
144 fn checkpoint_async<F, S, Fut, Request>(
145 self,
146 async_checkpoint_fn: F,
147 ) -> ServiceBuilder<Stack<AsyncCheckpointLayer<S, Fut, Request>, L>>
148 where
149 S: Service<Request, Error = BoxError> + Clone + Send + 'static,
150 Fut: Future<
151 Output = Result<ControlFlow<<S as Service<Request>>::Response, Request>, BoxError>,
152 >,
153 F: Fn(Request) -> Fut + Send + Sync + 'static,
154 {
155 self.layer(AsyncCheckpointLayer::new(async_checkpoint_fn))
156 }
157
158 /// Adds a buffer to the service stack with a default size.
159 ///
160 /// This is useful for making services `Clone` and `Send`
161 ///
162 /// # Examples
163 ///
164 /// ```rust
165 /// # use tower::ServiceBuilder;
166 /// # use tower_service::Service;
167 /// # use tracing::info_span;
168 /// # use apollo_router::services::supergraph;
169 /// # use apollo_router::layers::ServiceBuilderExt;
170 /// # fn test(service: supergraph::BoxService) {
171 /// let _ = ServiceBuilder::new()
172 /// .buffered()
173 /// .service(service);
174 /// # }
175 /// ```
176 fn buffered<Request>(self) -> ServiceBuilder<Stack<BufferLayer<Request>, L>>;
177
178 /// Place a span around the request.
179 ///
180 /// This is useful for adding a new span with custom attributes to tracing.
181 ///
182 /// Note that it is not possible to add extra attributes to existing spans. However, you can add
183 /// empty placeholder attributes to your span if you want to supply those attributes later.
184 ///
185 /// # Arguments
186 ///
187 /// * `span_fn`: The callback to create the span given the request.
188 ///
189 /// returns: ServiceBuilder<Stack<InstrumentLayer<F, Request>, L>>
190 ///
191 /// # Examples
192 ///
193 /// ```rust
194 /// # use tower::ServiceBuilder;
195 /// # use tower_service::Service;
196 /// # use tracing::info_span;
197 /// # use apollo_router::services::supergraph;
198 /// # use apollo_router::layers::ServiceBuilderExt;
199 /// # fn test(service: supergraph::BoxService) {
200 /// let instrumented = ServiceBuilder::new()
201 /// .instrument(|_request| info_span!("query_planning"))
202 /// .service(service);
203 /// # }
204 /// ```
205 fn instrument<F, Request>(
206 self,
207 span_fn: F,
208 ) -> ServiceBuilder<Stack<InstrumentLayer<F, Request>, L>>
209 where
210 F: Fn(&Request) -> Span,
211 {
212 self.layer(InstrumentLayer::new(span_fn))
213 }
214
215 /// Maps HTTP parts, as well as the first GraphQL response, to different values.
216 ///
217 /// In supergraph and execution services, the service response contains
218 /// not just one GraphQL response but a stream of them,
219 /// in order to support features such as `@defer`.
220 ///
221 /// This method wraps a service and calls a `callback` when the first GraphQL response
222 /// in the stream returned by the inner service becomes available.
223 /// The callback can then access the HTTP parts (headers, status code, etc)
224 /// or the first GraphQL response before returning them.
225 ///
226 /// Note that any subsequent GraphQLÂ responses after the first will be forwarded unmodified.
227 /// In order to inspect or modify all GraphQL responses,
228 /// consider using [`map_response`][tower::ServiceExt::map_response]
229 /// together with [`supergraph::Response::map_stream`] instead.
230 /// (See the example in `map_stream`’s documentation.)
231 /// In that case however HTTP parts cannot be modified because they may have already been sent.
232 ///
233 /// # Example
234 ///
235 /// ```
236 /// use apollo_router::services::supergraph;
237 /// use apollo_router::layers::ServiceBuilderExt as _;
238 /// use tower::ServiceExt as _;
239 ///
240 /// struct ExamplePlugin;
241 ///
242 /// #[async_trait::async_trait]
243 /// impl apollo_router::plugin::Plugin for ExamplePlugin {
244 /// # type Config = ();
245 /// # async fn new(
246 /// # _init: apollo_router::plugin::PluginInit<Self::Config>,
247 /// # ) -> Result<Self, tower::BoxError> {
248 /// # Ok(Self)
249 /// # }
250 /// // …
251 /// fn supergraph_service(&self, inner: supergraph::BoxService) -> supergraph::BoxService {
252 /// tower::ServiceBuilder::new()
253 /// .map_first_graphql_response(|context, mut http_parts, mut graphql_response| {
254 /// // Something interesting here
255 /// (http_parts, graphql_response)
256 /// })
257 /// .service(inner)
258 /// .boxed()
259 /// }
260 /// }
261 /// ```
262 fn map_first_graphql_response<Callback>(
263 self,
264 callback: Callback,
265 ) -> ServiceBuilder<Stack<MapFirstGraphqlResponseLayer<Callback>, L>>
266 where
267 Callback: FnOnce(
268 Context,
269 http::response::Parts,
270 graphql::Response,
271 ) -> (http::response::Parts, graphql::Response)
272 + Clone
273 + Send
274 + 'static,
275 {
276 self.layer(MapFirstGraphqlResponseLayer { callback })
277 }
278
279 /// Similar to map_future but also providing an opportunity to extract information out of the
280 /// request for use when constructing the response.
281 ///
282 /// # Arguments
283 ///
284 /// * `req_fn`: The callback to extract data from the request.
285 /// * `map_fn`: The callback to map the future.
286 ///
287 /// returns: ServiceBuilder<Stack<MapFutureWithRequestDataLayer<RF, MF>, L>>
288 ///
289 /// # Examples
290 ///
291 /// ```rust
292 /// # use std::future::Future;
293 /// # use tower::{BoxError, ServiceBuilder, ServiceExt};
294 /// # use tower::util::BoxService;
295 /// # use tower_service::Service;
296 /// # use tracing::info_span;
297 /// # use apollo_router::Context;
298 /// # use apollo_router::services::supergraph;
299 /// # use apollo_router::layers::ServiceBuilderExt;
300 /// # fn test(service: supergraph::BoxService) {
301 /// let _ : supergraph::BoxService = ServiceBuilder::new()
302 /// .map_future_with_request_data(
303 /// |req: &supergraph::Request| req.context.clone(),
304 /// |ctx : Context, fut| async { fut.await })
305 /// .service(service)
306 /// .boxed();
307 /// # }
308 /// ```
309 fn map_future_with_request_data<RF, MF>(
310 self,
311 req_fn: RF,
312 map_fn: MF,
313 ) -> ServiceBuilder<Stack<MapFutureWithRequestDataLayer<RF, MF>, L>> {
314 self.layer(MapFutureWithRequestDataLayer::new(req_fn, map_fn))
315 }
316
317 /// Utility function to allow us to specify default methods on this trait rather than duplicating in the impl.
318 ///
319 /// # Arguments
320 ///
321 /// * `layer`: The layer to add to the service stack.
322 ///
323 /// returns: ServiceBuilder<Stack<T, L>>
324 ///
325 fn layer<T>(self, layer: T) -> ServiceBuilder<Stack<T, L>>;
326}
327
328#[allow(clippy::type_complexity)]
329impl<L> ServiceBuilderExt<L> for ServiceBuilder<L> {
330 fn layer<T>(self, layer: T) -> ServiceBuilder<Stack<T, L>> {
331 ServiceBuilder::layer(self, layer)
332 }
333
334 fn buffered<Request>(self) -> ServiceBuilder<Stack<BufferLayer<Request>, L>> {
335 self.buffer(DEFAULT_BUFFER_SIZE)
336 }
337}
338
339/// Extension trait for [`Service`].
340///
341/// Importing both this trait and [`tower::ServiceExt`] could lead a name collision error.
342/// To work around that, use `as _` syntax to make a trait’s methods available in a module
343/// without assigning it a name in that module’s namespace.
344///
345/// ```
346/// use apollo_router::layers::ServiceExt as _;
347/// use tower::ServiceExt as _;
348/// ```
349pub trait ServiceExt<Request>: Service<Request> {
350 /// Maps HTTP parts, as well as the first GraphQL response, to different values.
351 ///
352 /// In supergraph and execution services, the service response contains
353 /// not just one GraphQL response but a stream of them,
354 /// in order to support features such as `@defer`.
355 ///
356 /// This method wraps a service and call `callback` when the first GraphQL response
357 /// in the stream returned by the inner service becomes available.
358 /// The callback can then modify the HTTP parts (headers, status code, etc)
359 /// or the first GraphQL response before returning them.
360 ///
361 /// Note that any subsequent GraphQLÂ responses after the first will be forwarded unmodified.
362 /// In order to inspect or modify all GraphQL responses,
363 /// consider using [`map_response`][tower::ServiceExt::map_response]
364 /// together with [`supergraph::Response::map_stream`] instead.
365 /// (See the example in `map_stream`’s documentation.)
366 /// In that case however HTTP parts cannot be modified because they may have already been sent.
367 ///
368 /// # Example
369 ///
370 /// ```
371 /// use apollo_router::services::supergraph;
372 /// use apollo_router::layers::ServiceExt as _;
373 /// use tower::ServiceExt as _;
374 ///
375 /// struct ExamplePlugin;
376 ///
377 /// #[async_trait::async_trait]
378 /// impl apollo_router::plugin::Plugin for ExamplePlugin {
379 /// # type Config = ();
380 /// # async fn new(
381 /// # _init: apollo_router::plugin::PluginInit<Self::Config>,
382 /// # ) -> Result<Self, tower::BoxError> {
383 /// # Ok(Self)
384 /// # }
385 /// // …
386 /// fn supergraph_service(&self, inner: supergraph::BoxService) -> supergraph::BoxService {
387 /// inner
388 /// .map_first_graphql_response(|context, mut http_parts, mut graphql_response| {
389 /// // Something interesting here
390 /// (http_parts, graphql_response)
391 /// })
392 /// .boxed()
393 /// }
394 /// }
395 /// ```
396 fn map_first_graphql_response<Callback>(
397 self,
398 callback: Callback,
399 ) -> MapFirstGraphqlResponseService<Self, Callback>
400 where
401 Self: Sized + Service<Request, Response = supergraph::Response>,
402 <Self as Service<Request>>::Future: Send + 'static,
403 Callback: FnOnce(
404 Context,
405 http::response::Parts,
406 graphql::Response,
407 ) -> (http::response::Parts, graphql::Response)
408 + Clone
409 + Send
410 + 'static,
411 {
412 ServiceBuilder::new()
413 .map_first_graphql_response(callback)
414 .service(self)
415 }
416
417 /// Similar to map_future but also providing an opportunity to extract information out of the
418 /// request for use when constructing the response.
419 ///
420 /// # Arguments
421 ///
422 /// * `req_fn`: The callback to extract data from the request.
423 /// * `map_fn`: The callback to map the future.
424 ///
425 /// returns: ServiceBuilder<Stack<MapFutureWithRequestDataLayer<RF, MF>, L>>
426 ///
427 /// # Examples
428 ///
429 /// ```rust
430 /// # use std::future::Future;
431 /// # use tower::{BoxError, ServiceBuilder, ServiceExt};
432 /// # use tower::util::BoxService;
433 /// # use tower_service::Service;
434 /// # use tracing::info_span;
435 /// # use apollo_router::Context;
436 /// # use apollo_router::services::supergraph;
437 /// # use apollo_router::layers::ServiceBuilderExt;
438 /// # use apollo_router::layers::ServiceExt as ApolloServiceExt;
439 /// # fn test(service: supergraph::BoxService) {
440 /// let _ : supergraph::BoxService = service
441 /// .map_future_with_request_data(
442 /// |req: &supergraph::Request| req.context.clone(),
443 /// |ctx : Context, fut| async { fut.await }
444 /// )
445 /// .boxed();
446 /// # }
447 /// ```
448 fn map_future_with_request_data<RF, MF>(
449 self,
450 req_fn: RF,
451 map_fn: MF,
452 ) -> MapFutureWithRequestDataService<Self, RF, MF>
453 where
454 Self: Sized,
455 RF: Clone,
456 MF: Clone,
457 {
458 MapFutureWithRequestDataService::new(self, req_fn, map_fn)
459 }
460}
461impl<T: ?Sized, Request> ServiceExt<Request> for T where T: Service<Request> {}