tower_spawn_ready/
service.rs

1use crate::{future::background_ready, Error};
2use futures_core::ready;
3use futures_util::future::{MapErr, TryFutureExt};
4use std::{
5    future::Future,
6    pin::Pin,
7    task::{Context, Poll},
8};
9use tokio::sync::oneshot;
10use tower_service::Service;
11
12/// Spawns tasks to drive an inner service to readiness.
13///
14/// See crate level documentation for more details.
15#[derive(Debug)]
16pub struct SpawnReady<T> {
17    inner: Inner<T>,
18}
19
20#[derive(Debug)]
21enum Inner<T> {
22    Service(Option<T>),
23    Future(oneshot::Receiver<Result<T, Error>>),
24}
25
26impl<T> SpawnReady<T> {
27    /// Creates a new `SpawnReady` wrapping `service`.
28    pub fn new(service: T) -> Self {
29        Self {
30            inner: Inner::Service(Some(service)),
31        }
32    }
33}
34
35impl<T, Request> Service<Request> for SpawnReady<T>
36where
37    T: Service<Request> + Send + 'static,
38    T::Error: Into<Error>,
39    Request: Send + 'static,
40{
41    type Response = T::Response;
42    type Error = Error;
43    type Future = MapErr<T::Future, fn(T::Error) -> Error>;
44
45    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
46        loop {
47            self.inner = match self.inner {
48                Inner::Service(ref mut svc) => {
49                    if let Poll::Ready(r) = svc.as_mut().expect("illegal state").poll_ready(cx) {
50                        return Poll::Ready(r.map_err(Into::into));
51                    }
52
53                    let (bg, rx) = background_ready(svc.take().expect("illegal state"));
54
55                    tokio::spawn(bg);
56
57                    Inner::Future(rx)
58                }
59                Inner::Future(ref mut fut) => {
60                    let svc = ready!(Pin::new(fut).poll(cx))??;
61                    Inner::Service(Some(svc))
62                }
63            }
64        }
65    }
66
67    fn call(&mut self, request: Request) -> Self::Future {
68        match self.inner {
69            Inner::Service(Some(ref mut svc)) => svc.call(request).map_err(Into::into),
70            _ => unreachable!("poll_ready must be called"),
71        }
72    }
73}