1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
//! Background readiness types

use crate::error::Error;
use futures_core::ready;
use pin_project::pin_project;
use std::marker::PhantomData;
use std::{
    future::Future,
    pin::Pin,
    task::{Context, Poll},
};
use tokio_executor::TypedExecutor;
use tokio_sync::oneshot;
use tower_service::Service;

/// Drives a service to readiness.
///
/// This is a future that polls the wrapped service's `poll_ready` and, once
/// it resolves, hands the outcome (the ready service or the error) to the
/// paired `oneshot` receiver.
#[pin_project]
#[derive(Debug)]
pub struct BackgroundReady<T, Request> {
    // The service being driven; taken out (left as `None`) once it reports ready.
    service: Option<T>,
    // Sending half used to deliver the result; taken out when the result is sent.
    tx: Option<oneshot::Sender<Result<T, Error>>>,
    // Ties the struct to the `Request` type parameter without storing a request.
    _req: PhantomData<Request>,
}

/// An executor capable of running [`BackgroundReady`] futures.
///
/// This trait allows you to use either Tokio's threaded runtime's executor or
/// the `current_thread` runtime's executor, depending on whether `T` is `Send`
/// or `!Send`.
///
/// The trait declares no methods of its own; it only names the
/// `TypedExecutor<BackgroundReady<T, Request>>` bound together with the
/// service bounds below.
pub trait BackgroundReadyExecutor<T, Request>: TypedExecutor<BackgroundReady<T, Request>>
where
    T: Service<Request>,
    T::Error: Into<Error>,
{
}

// Blanket implementation: every typed executor that can spawn a
// `BackgroundReady<T, Request>` is automatically a `BackgroundReadyExecutor`,
// so callers never implement the trait by hand.
impl<T, Request, E> BackgroundReadyExecutor<T, Request> for E
where
    E: TypedExecutor<BackgroundReady<T, Request>>,
    T: Service<Request>,
    T::Error: Into<Error>,
{
}

/// Wraps `service` in a [`BackgroundReady`] future and returns it together
/// with the receiving half of a fresh `oneshot` channel.
///
/// The returned future owns both the service and the sending half; the
/// receiver is where the future delivers its `Result<T, Error>`.
pub(crate) fn background_ready<T, Request>(
    service: T,
) -> (
    BackgroundReady<T, Request>,
    oneshot::Receiver<Result<T, Error>>,
)
where
    T: Service<Request>,
    T::Error: Into<Error>,
{
    let (sender, receiver) = oneshot::channel();

    (
        BackgroundReady {
            service: Some(service),
            tx: Some(sender),
            _req: PhantomData,
        },
        receiver,
    )
}

impl<T, Request> Future for BackgroundReady<T, Request>
where
    T: Service<Request>,
    T::Error: Into<Error>,
{
    type Output = ();

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.project();

        if let Poll::Ready(_) = Pin::new(this.tx.as_mut().expect("illegal state")).poll_closed(cx) {
            return Poll::Ready(());
        }

        let result = ready!(this.service.as_mut().expect("illegal state").poll_ready(cx))
            .map(|()| this.service.take().expect("illegal state"));

        let _ = this
            .tx
            .take()
            .expect("illegal state")
            .send(result.map_err(Into::into));

        Poll::Ready(())
    }
}