tower_async/util/mod.rs
1//! Various utility types and functions that are generally used with Tower.
2
3mod and_then;
4mod either;
5
6mod map_err;
7mod map_request;
8mod map_response;
9mod map_result;
10
11mod service_fn;
12mod then;
13
14pub mod backoff;
15pub mod rng;
16
17pub use self::{
18 and_then::{AndThen, AndThenLayer},
19 either::Either,
20 map_err::{MapErr, MapErrLayer},
21 map_request::{MapRequest, MapRequestLayer},
22 map_response::{MapResponse, MapResponseLayer},
23 map_result::{MapResult, MapResultLayer},
24 service_fn::{service_fn, ServiceFn},
25 then::{Then, ThenLayer},
26};
27
28use std::future::Future;
29
30use crate::layer::util::Identity;
31
32/// An extension trait for `Service`s that provides a variety of convenient
33/// adapters
34pub trait ServiceExt<Request>: tower_async_service::Service<Request> {
35 /// Consume this `Service`, calling it with the provided request once and only once.
36 fn oneshot(
37 self,
38 req: Request,
39 ) -> impl std::future::Future<Output = Result<Self::Response, Self::Error>>
40 where
41 Self: Sized,
42 {
43 async move { self.call(req).await }
44 }
45
46 /// Executes a new future after this service's future resolves.
47 ///
48 /// This method can be used to change the [`Response`] type of the service
49 /// into a different type. You can use this method to chain along a computation once the
50 /// service's response has been resolved.
51 ///
52 /// [`Response`]: crate::Service::Response
53 ///
54 /// # Example
55 /// ```
56 /// # use tower_async::{Service, ServiceExt};
57 /// #
58 /// # struct DatabaseService;
59 /// # impl DatabaseService {
60 /// # fn new(address: &str) -> Self {
61 /// # DatabaseService
62 /// # }
63 /// # }
64 /// #
65 /// # struct Record {
66 /// # pub name: String,
67 /// # pub age: u16
68 /// # }
69 /// #
70 /// # impl Service<u32> for DatabaseService {
71 /// # type Response = Record;
72 /// # type Error = u8;
73 /// #
74 /// # async fn call(&self, request: u32) -> Result<Self::Response, Self::Error> {
75 /// # Ok(Record { name: "Jack".into(), age: 32 })
76 /// # }
77 /// # }
78 /// #
79 /// # async fn avatar_lookup(name: String) -> Result<Vec<u8>, u8> { Ok(vec![]) }
80 /// #
81 /// # fn main() {
82 /// # async {
83 /// // A service returning Result<Record, _>
84 /// let service = DatabaseService::new("127.0.0.1:8080");
85 ///
86 /// // Map the response into a new response
87 /// let mut new_service = service.and_then(|record: Record| async move {
88 /// let name = record.name;
89 /// avatar_lookup(name).await
90 /// });
91 ///
92 /// // Call the new service
93 /// let id = 13;
94 /// let avatar = new_service.call(id).await.unwrap();
95 /// # };
96 /// # }
97 /// ```
98 fn and_then<F>(self, f: F) -> AndThen<Self, F>
99 where
100 Self: Sized,
101 F: Clone,
102 {
103 AndThen::new(self, f)
104 }
105
106 /// Maps this service's response value to a different value.
107 ///
108 /// This method can be used to change the [`Response`] type of the service
109 /// into a different type. It is similar to the [`Result::map`]
110 /// method. You can use this method to chain along a computation once the
111 /// service's response has been resolved.
112 ///
113 /// [`Response`]: crate::Service::Response
114 ///
115 /// # Example
116 /// ```
117 /// # use tower_async::{Service, ServiceExt};
118 /// #
119 /// # struct DatabaseService;
120 /// # impl DatabaseService {
121 /// # fn new(address: &str) -> Self {
122 /// # DatabaseService
123 /// # }
124 /// # }
125 /// #
126 /// # struct Record {
127 /// # pub name: String,
128 /// # pub age: u16
129 /// # }
130 /// #
131 /// # impl Service<u32> for DatabaseService {
132 /// # type Response = Record;
133 /// # type Error = u8;
134 /// #
135 /// # async fn call(&self, request: u32) -> Result<Self::Response, Self::Error> {
136 /// # Ok(Record { name: "Jack".into(), age: 32 })
137 /// # }
138 /// # }
139 /// #
140 /// # fn main() {
141 /// # async {
142 /// // A service returning Result<Record, _>
143 /// let service = DatabaseService::new("127.0.0.1:8080");
144 ///
145 /// // Map the response into a new response
146 /// let mut new_service = service.map_response(|record| record.name);
147 ///
148 /// // Call the new service
149 /// let id = 13;
150 /// let name = new_service
151 /// .call(id)
152 /// .await?;
153 /// # Ok::<(), u8>(())
154 /// # };
155 /// # }
156 /// ```
157 fn map_response<F, Response>(self, f: F) -> MapResponse<Self, F>
158 where
159 Self: Sized,
160 F: Fn(Self::Response) -> Response,
161 {
162 MapResponse::new(self, f)
163 }
164
165 /// Maps this service's error value to a different value.
166 ///
167 /// This method can be used to change the [`Error`] type of the service
168 /// into a different type. It is similar to the [`Result::map_err`] method.
169 ///
170 /// [`Error`]: crate::Service::Error
171 ///
172 /// # Example
173 /// ```
174 /// # use tower_async::{Service, ServiceExt};
175 /// #
176 /// # struct DatabaseService;
177 /// # impl DatabaseService {
178 /// # fn new(address: &str) -> Self {
179 /// # DatabaseService
180 /// # }
181 /// # }
182 /// #
183 /// # struct Error {
184 /// # pub code: u32,
185 /// # pub message: String
186 /// # }
187 /// #
188 /// # impl Service<u32> for DatabaseService {
189 /// # type Response = String;
190 /// # type Error = Error;
191 /// #
192 /// # async fn call(&self, request: u32) -> Result<Self::Response, Self::Error> {
193 /// # Ok(String::new())
194 /// # }
195 /// # }
196 /// #
197 /// # fn main() {
198 /// # async {
199 /// // A service returning Result<_, Error>
200 /// let service = DatabaseService::new("127.0.0.1:8080");
201 ///
202 /// // Map the error to a new error
203 /// let mut new_service = service.map_err(|err| err.code);
204 ///
205 /// // Call the new service
206 /// let id = 13;
207 /// let code = new_service
208 /// .call(id)
209 /// .await
210 /// .unwrap_err();
211 /// # Ok::<(), u32>(())
212 /// # };
213 /// # }
214 /// ```
215 fn map_err<F, Error>(self, f: F) -> MapErr<Self, F>
216 where
217 Self: Sized,
218 F: Fn(Self::Error) -> Error,
219 {
220 MapErr::new(self, f)
221 }
222
223 /// Maps this service's result type (`Result<Self::Response, Self::Error>`)
224 /// to a different value, regardless of whether the future succeeds or
225 /// fails.
226 ///
227 /// This is similar to the [`map_response`] and [`map_err`] combinators,
228 /// except that the *same* function is invoked when the service's future
229 /// completes, whether it completes successfully or fails. This function
230 /// takes the [`Result`] returned by the service's future, and returns a
231 /// [`Result`].
232 ///
233 /// Like the standard library's [`Result::and_then`], this method can be
234 /// used to implement control flow based on `Result` values. For example, it
235 /// may be used to implement error recovery, by turning some [`Err`]
236 /// responses from the service into [`Ok`] responses. Similarly, some
237 /// successful responses from the service could be rejected, by returning an
238 /// [`Err`] conditionally, depending on the value inside the [`Ok`.] Finally,
239 /// this method can also be used to implement behaviors that must run when a
240 /// service's future completes, regardless of whether it succeeded or failed.
241 ///
242 /// This method can be used to change the [`Response`] type of the service
243 /// into a different type. It can also be used to change the [`Error`] type
244 /// of the service.
245 ///
246 /// # Examples
247 ///
248 /// Recovering from certain errors:
249 ///
250 /// ```
251 /// # use tower_async::{Service, ServiceExt};
252 /// #
253 /// # struct DatabaseService;
254 /// # impl DatabaseService {
255 /// # fn new(address: &str) -> Self {
256 /// # DatabaseService
257 /// # }
258 /// # }
259 /// #
260 /// # struct Record {
261 /// # pub name: String,
262 /// # pub age: u16
263 /// # }
264 /// # #[derive(Debug)]
265 /// # enum DbError {
266 /// # Parse(std::num::ParseIntError),
267 /// # NoRecordsFound,
268 /// # }
269 /// #
270 /// # impl Service<u32> for DatabaseService {
271 /// # type Response = Vec<Record>;
272 /// # type Error = DbError;
273 /// #
274 /// # async fn call(&self, request: u32) -> Result<Self::Response, Self::Error> {
275 /// # Ok(vec![Record { name: "Jack".into(), age: 32 }])
276 /// # }
277 /// # }
278 /// #
279 /// # fn main() {
280 /// # async {
281 /// // A service returning Result<Vec<Record>, DbError>
282 /// let service = DatabaseService::new("127.0.0.1:8080");
283 ///
284 /// // If the database returns no records for the query, we just want an empty `Vec`.
285 /// let mut new_service = service.map_result(|result| match result {
286 /// // If the error indicates that no records matched the query, return an empty
287 /// // `Vec` instead.
288 /// Err(DbError::NoRecordsFound) => Ok(Vec::new()),
289 /// // Propagate all other responses (`Ok` and `Err`) unchanged
290 /// x => x,
291 /// });
292 ///
293 /// // Call the new service
294 /// let id = 13;
295 /// let name = new_service
296 /// .call(id)
297 /// .await?;
298 /// # Ok::<(), DbError>(())
299 /// # };
300 /// # }
301 /// ```
302 ///
303 /// Rejecting some `Ok` responses:
304 ///
305 /// ```
306 /// # use tower_async::{Service, ServiceExt};
307 /// #
308 /// # struct DatabaseService;
309 /// # impl DatabaseService {
310 /// # fn new(address: &str) -> Self {
311 /// # DatabaseService
312 /// # }
313 /// # }
314 /// #
315 /// # struct Record {
316 /// # pub name: String,
317 /// # pub age: u16
318 /// # }
319 /// # type DbError = String;
320 /// # type AppError = String;
321 /// #
322 /// # impl Service<u32> for DatabaseService {
323 /// # type Response = Record;
324 /// # type Error = DbError;
325 /// #
326 /// # async fn call(&self, request: u32) -> Result<Self::Response, Self::Error> {
327 /// # Ok(Record { name: "Jack".into(), age: 32 })
328 /// # }
329 /// # }
330 /// #
331 /// # fn main() {
332 /// # async {
333 /// use tower_async::BoxError;
334 ///
335 /// // A service returning Result<Record, DbError>
336 /// let service = DatabaseService::new("127.0.0.1:8080");
337 ///
338 /// // If the user is zero years old, return an error.
339 /// let mut new_service = service.map_result(|result| {
340 /// let record = result?;
341 ///
342 /// if record.age == 0 {
343 /// // Users must have been born to use our app!
344 /// let app_error = AppError::from("users cannot be 0 years old!");
345 ///
346 /// // Box the error to erase its type (as it can be an `AppError`
347 /// // *or* the inner service's `DbError`).
348 /// return Err(BoxError::from(app_error));
349 /// }
350 ///
351 /// // Otherwise, return the record.
352 /// Ok(record)
353 /// });
354 ///
355 /// // Call the new service
356 /// let id = 13;
357 /// let record = new_service
358 /// .call(id)
359 /// .await?;
360 /// # Ok::<(), BoxError>(())
361 /// # };
362 /// # }
363 /// ```
364 ///
365 /// Performing an action that must be run for both successes and failures:
366 ///
367 /// ```
368 /// # use std::convert::TryFrom;
369 /// # use tower_async::{Service, ServiceExt};
370 /// #
371 /// # struct DatabaseService;
372 /// # impl DatabaseService {
373 /// # fn new(address: &str) -> Self {
374 /// # DatabaseService
375 /// # }
376 /// # }
377 /// #
378 /// # impl Service<u32> for DatabaseService {
379 /// # type Response = String;
380 /// # type Error = u8;
381 /// #
382 /// # async fn call(&self, request: u32) -> Result<Self::Response, Self::Error> {
383 /// # Ok(String::new())
384 /// # }
385 /// # }
386 /// #
387 /// # fn main() {
388 /// # async {
389 /// // A service returning Result<Record, DbError>
390 /// let service = DatabaseService::new("127.0.0.1:8080");
391 ///
392 /// // Print a message whenever a query completes.
393 /// let mut new_service = service.map_result(|result| {
394 /// println!("query completed; success={}", result.is_ok());
395 /// result
396 /// });
397 ///
398 /// // Call the new service
399 /// let id = 13;
400 /// let response = new_service
401 /// .call(id)
402 /// .await;
403 /// # response
404 /// # };
405 /// # }
406 /// ```
407 ///
408 /// [`map_response`]: ServiceExt::map_response
409 /// [`map_err`]: ServiceExt::map_err
410 /// [`map_result`]: ServiceExt::map_result
411 /// [`Error`]: crate::Service::Error
412 /// [`Response`]: crate::Service::Response
413 /// [`BoxError`]: crate::BoxError
414 fn map_result<F, Response, Error>(self, f: F) -> MapResult<Self, F>
415 where
416 Self: Sized,
417 Error: From<Self::Error>,
418 F: Fn(Result<Self::Response, Self::Error>) -> Result<Response, Error>,
419 {
420 MapResult::new(self, f)
421 }
422
423 /// Composes a function *in front of* the service.
424 ///
425 /// This adapter produces a new service that passes each value through the
426 /// given function `f` before sending it to `self`.
427 ///
428 /// # Example
429 /// ```
430 /// # use std::convert::TryFrom;
431 /// # use tower_async::{Service, ServiceExt};
432 /// #
433 /// # struct DatabaseService;
434 /// # impl DatabaseService {
435 /// # fn new(address: &str) -> Self {
436 /// # DatabaseService
437 /// # }
438 /// # }
439 /// #
440 /// # impl Service<String> for DatabaseService {
441 /// # type Response = String;
442 /// # type Error = u8;
443 /// #
444 /// # async fn call(&self, request: String) -> Result<Self::Response, Self::Error> {
445 /// # Ok(String::new())
446 /// # }
447 /// # }
448 /// #
449 /// # fn main() {
450 /// # async {
451 /// // A service taking a String as a request
452 /// let service = DatabaseService::new("127.0.0.1:8080");
453 ///
454 /// // Map the request to a new request
455 /// let mut new_service = service.map_request(|id: u32| id.to_string());
456 ///
457 /// // Call the new service
458 /// let id = 13;
459 /// let response = new_service
460 /// .call(id)
461 /// .await;
462 /// # response
463 /// # };
464 /// # }
465 /// ```
466 fn map_request<F, NewRequest>(self, f: F) -> MapRequest<Self, F>
467 where
468 Self: Sized,
469 F: Fn(NewRequest) -> Request,
470 {
471 MapRequest::new(self, f)
472 }
473
474 /// Composes this service with a [`Filter`] that conditionally accepts or
475 /// rejects requests based on a [predicate].
476 ///
477 /// This adapter produces a new service that passes each value through the
478 /// given function `predicate` before sending it to `self`.
479 ///
480 /// # Example
481 /// ```
482 /// # use std::convert::TryFrom;
483 /// # use tower_async::{Service, ServiceExt};
484 /// #
485 /// # struct DatabaseService;
486 /// # impl DatabaseService {
487 /// # fn new(address: &str) -> Self {
488 /// # DatabaseService
489 /// # }
490 /// # }
491 /// #
492 /// # #[derive(Debug)] enum DbError {
493 /// # Parse(std::num::ParseIntError)
494 /// # }
495 /// #
496 /// # impl std::fmt::Display for DbError {
497 /// # fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { std::fmt::Debug::fmt(self, f) }
498 /// # }
499 /// # impl std::error::Error for DbError {}
500 /// # impl Service<u32> for DatabaseService {
501 /// # type Response = String;
502 /// # type Error = DbError;
503 /// #
504 /// # async fn call(&self, request: u32) -> Result<Self::Response, Self::Error> {
505 /// # Ok(String::new())
506 /// # }
507 /// # }
508 /// #
509 /// # fn main() {
510 /// # async {
511 /// // A service taking a u32 as a request and returning Result<_, DbError>
512 /// let service = DatabaseService::new("127.0.0.1:8080");
513 ///
514 /// // Fallibly map the request to a new request
515 /// let mut new_service = service
516 /// .filter(|id_str: &str| id_str.parse().map_err(DbError::Parse));
517 ///
518 /// // Call the new service
519 /// let id = "13";
520 /// let response = new_service
521 /// .call(id)
522 /// .await;
523 /// # response
524 /// # };
525 /// # }
526 /// ```
527 ///
528 /// [`Filter`]: crate::filter::Filter
529 /// [predicate]: crate::filter::Predicate
530 #[cfg(feature = "filter")]
531 fn filter<F, NewRequest>(self, filter: F) -> crate::filter::Filter<Self, F>
532 where
533 Self: Sized,
534 F: crate::filter::Predicate<NewRequest>,
535 {
536 crate::filter::Filter::new(self, filter)
537 }
538
539 /// Composes this service with an [`AsyncFilter`] that conditionally accepts or
540 /// rejects requests based on an [async predicate].
541 ///
542 /// This adapter produces a new service that passes each value through the
543 /// given function `predicate` before sending it to `self`.
544 ///
545 /// # Example
546 /// ```
547 /// # use std::convert::TryFrom;
548 /// # use tower_async::{Service, ServiceExt};
549 /// #
550 /// # #[derive(Clone)] struct DatabaseService;
551 /// # impl DatabaseService {
552 /// # fn new(address: &str) -> Self {
553 /// # DatabaseService
554 /// # }
555 /// # }
556 /// # #[derive(Debug)]
557 /// # enum DbError {
558 /// # Rejected
559 /// # }
560 /// # impl std::fmt::Display for DbError {
561 /// # fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { std::fmt::Debug::fmt(self, f) }
562 /// # }
563 /// # impl std::error::Error for DbError {}
564 /// #
565 /// # impl Service<u32> for DatabaseService {
566 /// # type Response = String;
567 /// # type Error = DbError;
568 /// #
569 /// # async fn call(&self, request: u32) -> Result<Self::Response, Self::Error> {
570 /// # Ok(String::new())
571 /// # }
572 /// # }
573 /// #
574 /// # fn main() {
575 /// # async {
576 /// // A service taking a u32 as a request and returning Result<_, DbError>
577 /// let service = DatabaseService::new("127.0.0.1:8080");
578 ///
579 /// /// Returns `true` if we should query the database for an ID.
580 /// async fn should_query(id: u32) -> bool {
581 /// // ...
582 /// # true
583 /// }
584 ///
585 /// // Filter requests based on `should_query`.
586 /// let mut new_service = service
587 /// .filter_async(|id: u32| async move {
588 /// if should_query(id).await {
589 /// return Ok(id);
590 /// }
591 ///
592 /// Err(DbError::Rejected)
593 /// });
594 ///
595 /// // Call the new service
596 /// let id = 13;
597 /// # let id: u32 = id;
598 /// let response = new_service
599 /// .call(id)
600 /// .await;
601 /// # response
602 /// # };
603 /// # }
604 /// ```
605 ///
606 /// [`AsyncFilter`]: crate::filter::AsyncFilter
607 /// [asynchronous predicate]: crate::filter::AsyncPredicate
608 #[cfg(feature = "filter")]
609 fn filter_async<F, NewRequest>(self, filter: F) -> crate::filter::AsyncFilter<Self, F>
610 where
611 Self: Sized,
612 F: crate::filter::AsyncPredicate<NewRequest>,
613 {
614 crate::filter::AsyncFilter::new(self, filter)
615 }
616
617 /// Composes an asynchronous function *after* this service.
618 ///
619 /// This takes a function or closure returning a future, and returns a new
620 /// `Service` that chains that function after this service's Future. The
621 /// new `Service`'s future will consist of this service's future, followed
622 /// by the future returned by calling the chained function with the future's
623 /// [`Output`] type. The chained function is called regardless of whether
624 /// this service's future completes with a successful response or with an
625 /// error.
626 ///
627 /// This method can be thought of as an equivalent to the [`futures`
628 /// crate]'s [`FutureExt::then`] combinator, but acting on `Service`s that
629 /// _return_ futures, rather than on an individual future. Similarly to that
630 /// combinator, [`ServiceExt::then`] can be used to implement asynchronous
631 /// error recovery, by calling some asynchronous function with errors
632 /// returned by this service. Alternatively, it may also be used to call a
633 /// fallible async function with the successful response of this service.
634 ///
635 /// This method can be used to change the [`Response`] type of the service
636 /// into a different type. It can also be used to change the [`Error`] type
637 /// of the service.
638 ///
639 /// # Examples
640 ///
641 /// ```
642 /// # use tower_async::{Service, ServiceExt};
643 /// #
644 /// # struct DatabaseService;
645 /// # impl DatabaseService {
646 /// # fn new(address: &str) -> Self {
647 /// # DatabaseService
648 /// # }
649 /// # }
650 /// #
651 /// # type Record = ();
652 /// # type DbError = ();
653 /// #
654 /// # impl Service<u32> for DatabaseService {
655 /// # type Response = Record;
656 /// # type Error = DbError;
657 /// #
658 /// # async fn call(&self, request: u32) -> Result<Self::Response, Self::Error> {
659 /// # Ok(())
660 /// # }
661 /// # }
662 /// #
663 /// # fn main() {
664 /// // A service returning Result<Record, DbError>
665 /// let service = DatabaseService::new("127.0.0.1:8080");
666 ///
667 /// // An async function that attempts to recover from errors returned by the
668 /// // database.
669 /// async fn recover_from_error(error: DbError) -> Result<Record, DbError> {
670 /// // ...
671 /// # Ok(())
672 /// }
673 /// # async {
674 ///
675 /// // If the database service returns an error, attempt to recover by
676 /// // calling `recover_from_error`. Otherwise, return the successful response.
677 /// let new_service = service.then(|result| async move {
678 /// match result {
679 /// Ok(record) => Ok(record),
680 /// Err(e) => recover_from_error(e).await,
681 /// }
682 /// });
683 ///
684 /// // Call the new service
685 /// let id = 13;
686 /// let record = new_service
687 /// .call(id)
688 /// .await?;
689 /// # Ok::<(), DbError>(())
690 /// # };
691 /// # }
692 /// ```
693 ///
694 /// [`Output`]: std::future::Future::Output
695 /// [`futures` crate]: https://docs.rs/futures
696 /// [`FutureExt::then`]: https://docs.rs/futures/latest/futures/future/trait.FutureExt.html#method.then
697 /// [`Error`]: crate::Service::Error
698 /// [`Response`]: crate::Service::Response
699 /// [`BoxError`]: crate::BoxError
700 fn then<F, Response, Error, Fut>(self, f: F) -> Then<Self, F>
701 where
702 Self: Sized,
703 Error: From<Self::Error>,
704 F: Fn(Result<Self::Response, Self::Error>) -> Fut,
705 Fut: Future<Output = Result<Response, Error>>,
706 {
707 Then::new(self, f)
708 }
709}
710
711impl<T: ?Sized, Request> ServiceExt<Request> for T where T: tower_async_service::Service<Request> {}
712
713/// Convert an `Option<Layer>` into a [`Layer`].
714///
715/// ```
716/// # use std::time::Duration;
717/// # use tower_async::Service;
718/// # use tower_async::builder::ServiceBuilder;
719/// use tower_async::util::option_layer;
720/// # use tower_async::timeout::TimeoutLayer;
721/// # async fn wrap<S>(svc: S) where S: Service<(), Error = &'static str> + 'static + Send {
722/// # let timeout = Some(Duration::new(10, 0));
723/// // Layer to apply a timeout if configured
724/// let maybe_timeout = option_layer(timeout.map(TimeoutLayer::new));
725///
726/// ServiceBuilder::new()
727/// .layer(maybe_timeout)
728/// .service(svc);
729/// # }
730/// ```
731///
732/// [`Layer`]: crate::layer::Layer
733pub fn option_layer<L>(layer: Option<L>) -> Either<L, Identity> {
734 if let Some(layer) = layer {
735 Either::Left(layer)
736 } else {
737 Either::Right(Identity::new())
738 }
739}