tower 0.4.13

Tower is a library of modular and reusable components for building robust clients and servers.
Documentation
use futures_util::ready;
use pin_project_lite::pin_project;
use std::time::Duration;
use std::{
    future::Future,
    pin::Pin,
    task::{Context, Poll},
};
use tower_service::Service;

use crate::util::Oneshot;

/// A policy which specifies how long each request should be delayed for.
pub trait Policy<Request> {
    fn delay(&self, req: &Request) -> Duration;
}

/// A middleware which delays sending the request to the underlying service
/// for an amount of time specified by the policy.
#[derive(Debug)]
pub struct Delay<P, S> {
    policy: P,
    service: S,
}

pin_project! {
    #[derive(Debug)]
    pub struct ResponseFuture<Request, S>
    where
        S: Service<Request>,
    {
        service: Option<S>,
        #[pin]
        state: State<Request, Oneshot<S, Request>>,
    }
}

pin_project! {
    #[project = StateProj]
    #[derive(Debug)]
    enum State<Request, F> {
        Delaying {
            #[pin]
            delay: tokio::time::Sleep,
            req: Option<Request>,
        },
        Called {
            #[pin]
            fut: F,
        },
    }
}

impl<Request, F> State<Request, F> {
    fn delaying(delay: tokio::time::Sleep, req: Option<Request>) -> Self {
        Self::Delaying { delay, req }
    }

    fn called(fut: F) -> Self {
        Self::Called { fut }
    }
}

impl<P, S> Delay<P, S> {
    pub fn new<Request>(policy: P, service: S) -> Self
    where
        P: Policy<Request>,
        S: Service<Request> + Clone,
        S::Error: Into<crate::BoxError>,
    {
        Delay { policy, service }
    }
}

impl<Request, P, S> Service<Request> for Delay<P, S>
where
    P: Policy<Request>,
    S: Service<Request> + Clone,
    S::Error: Into<crate::BoxError>,
{
    type Response = S::Response;
    type Error = crate::BoxError;
    type Future = ResponseFuture<Request, S>;

    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        // Calling self.service.poll_ready would reserve a slot for the delayed request,
        // potentially well in advance of actually making it.  Instead, signal readiness here and
        // treat the service as a Oneshot in the future.
        Poll::Ready(Ok(()))
    }

    fn call(&mut self, request: Request) -> Self::Future {
        let delay = self.policy.delay(&request);
        ResponseFuture {
            service: Some(self.service.clone()),
            state: State::delaying(tokio::time::sleep(delay), Some(request)),
        }
    }
}

impl<Request, S, T, E> Future for ResponseFuture<Request, S>
where
    E: Into<crate::BoxError>,
    S: Service<Request, Response = T, Error = E>,
{
    type Output = Result<T, crate::BoxError>;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let mut this = self.project();

        loop {
            match this.state.as_mut().project() {
                StateProj::Delaying { delay, req } => {
                    ready!(delay.poll(cx));
                    let req = req.take().expect("Missing request in delay");
                    let svc = this.service.take().expect("Missing service in delay");
                    let fut = Oneshot::new(svc, req);
                    this.state.set(State::called(fut));
                }
                StateProj::Called { fut } => {
                    return fut.poll(cx).map_err(Into::into);
                }
            };
        }
    }
}