tower_spawn_ready/
future.rs

1//! Background readiness types
2
3use crate::Error;
4use futures_core::ready;
5use pin_project::pin_project;
6use std::marker::PhantomData;
7use std::{
8    future::Future,
9    pin::Pin,
10    task::{Context, Poll},
11};
12use tokio::sync::oneshot;
13use tower_service::Service;
14
15/// Drives a service to readiness.
16#[pin_project]
17#[derive(Debug)]
18pub struct BackgroundReady<T, Request> {
19    service: Option<T>,
20    tx: Option<oneshot::Sender<Result<T, Error>>>,
21    _req: PhantomData<Request>,
22}
23
24pub(crate) fn background_ready<T, Request>(
25    service: T,
26) -> (
27    BackgroundReady<T, Request>,
28    oneshot::Receiver<Result<T, Error>>,
29)
30where
31    T: Service<Request>,
32    T::Error: Into<Error>,
33{
34    let (tx, rx) = oneshot::channel();
35    let bg = BackgroundReady {
36        service: Some(service),
37        tx: Some(tx),
38        _req: PhantomData,
39    };
40    (bg, rx)
41}
42
43impl<T, Request> Future for BackgroundReady<T, Request>
44where
45    T: Service<Request>,
46    T::Error: Into<Error>,
47{
48    type Output = ();
49
50    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
51        let this = self.project();
52
53        if let Poll::Ready(_) = Pin::new(this.tx.as_mut().expect("illegal state")).poll_closed(cx) {
54            return Poll::Ready(());
55        }
56
57        let result = ready!(this.service.as_mut().expect("illegal state").poll_ready(cx))
58            .map(|()| this.service.take().expect("illegal state"));
59
60        let _ = this
61            .tx
62            .take()
63            .expect("illegal state")
64            .send(result.map_err(Into::into));
65
66        Poll::Ready(())
67    }
68}