tower_spawn_ready/
service.rs1use crate::{future::background_ready, Error};
2use futures_core::ready;
3use futures_util::future::{MapErr, TryFutureExt};
4use std::{
5 future::Future,
6 pin::Pin,
7 task::{Context, Poll},
8};
9use tokio::sync::oneshot;
10use tower_service::Service;
11
12#[derive(Debug)]
16pub struct SpawnReady<T> {
17 inner: Inner<T>,
18}
19
20#[derive(Debug)]
21enum Inner<T> {
22 Service(Option<T>),
23 Future(oneshot::Receiver<Result<T, Error>>),
24}
25
26impl<T> SpawnReady<T> {
27 pub fn new(service: T) -> Self {
29 Self {
30 inner: Inner::Service(Some(service)),
31 }
32 }
33}
34
35impl<T, Request> Service<Request> for SpawnReady<T>
36where
37 T: Service<Request> + Send + 'static,
38 T::Error: Into<Error>,
39 Request: Send + 'static,
40{
41 type Response = T::Response;
42 type Error = Error;
43 type Future = MapErr<T::Future, fn(T::Error) -> Error>;
44
45 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
46 loop {
47 self.inner = match self.inner {
48 Inner::Service(ref mut svc) => {
49 if let Poll::Ready(r) = svc.as_mut().expect("illegal state").poll_ready(cx) {
50 return Poll::Ready(r.map_err(Into::into));
51 }
52
53 let (bg, rx) = background_ready(svc.take().expect("illegal state"));
54
55 tokio::spawn(bg);
56
57 Inner::Future(rx)
58 }
59 Inner::Future(ref mut fut) => {
60 let svc = ready!(Pin::new(fut).poll(cx))??;
61 Inner::Service(Some(svc))
62 }
63 }
64 }
65 }
66
67 fn call(&mut self, request: Request) -> Self::Future {
68 match self.inner {
69 Inner::Service(Some(ref mut svc)) => svc.call(request).map_err(Into::into),
70 _ => unreachable!("poll_ready must be called"),
71 }
72 }
73}