tower/hedge/
delay.rs

1use pin_project_lite::pin_project;
2use std::time::Duration;
3use std::{
4    future::Future,
5    pin::Pin,
6    task::{ready, Context, Poll},
7};
8use tower_service::Service;
9
10use crate::util::Oneshot;
11
/// Decides, per request, how long that request is held back before being sent.
pub trait Policy<Request> {
    /// Returns the duration to wait before `req` is dispatched.
    fn delay(&self, req: &Request) -> Duration;
}
16
/// Middleware that holds every request back for a policy-chosen duration
/// before forwarding it to the wrapped service.
#[derive(Debug)]
pub struct Delay<P, S> {
    // Asked once per request for the time to wait.
    policy: P,
    // The wrapped service; a clone of it is handed to each response future.
    service: S,
}
24
25pin_project! {
26    #[derive(Debug)]
27    pub struct ResponseFuture<Request, S>
28    where
29        S: Service<Request>,
30    {
31        service: Option<S>,
32        #[pin]
33        state: State<Request, Oneshot<S, Request>>,
34    }
35}
36
37pin_project! {
38    #[project = StateProj]
39    #[derive(Debug)]
40    enum State<Request, F> {
41        Delaying {
42            #[pin]
43            delay: tokio::time::Sleep,
44            req: Option<Request>,
45        },
46        Called {
47            #[pin]
48            fut: F,
49        },
50    }
51}
52
53impl<Request, F> State<Request, F> {
54    fn delaying(delay: tokio::time::Sleep, req: Option<Request>) -> Self {
55        Self::Delaying { delay, req }
56    }
57
58    fn called(fut: F) -> Self {
59        Self::Called { fut }
60    }
61}
62
63impl<P, S> Delay<P, S> {
64    pub const fn new<Request>(policy: P, service: S) -> Self
65    where
66        P: Policy<Request>,
67        S: Service<Request> + Clone,
68        S::Error: Into<crate::BoxError>,
69    {
70        Delay { policy, service }
71    }
72}
73
74impl<Request, P, S> Service<Request> for Delay<P, S>
75where
76    P: Policy<Request>,
77    S: Service<Request> + Clone,
78    S::Error: Into<crate::BoxError>,
79{
80    type Response = S::Response;
81    type Error = crate::BoxError;
82    type Future = ResponseFuture<Request, S>;
83
84    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
85        // Calling self.service.poll_ready would reserve a slot for the delayed request,
86        // potentially well in advance of actually making it.  Instead, signal readiness here and
87        // treat the service as a Oneshot in the future.
88        Poll::Ready(Ok(()))
89    }
90
91    fn call(&mut self, request: Request) -> Self::Future {
92        let delay = self.policy.delay(&request);
93        ResponseFuture {
94            service: Some(self.service.clone()),
95            state: State::delaying(tokio::time::sleep(delay), Some(request)),
96        }
97    }
98}
99
100impl<Request, S, T, E> Future for ResponseFuture<Request, S>
101where
102    E: Into<crate::BoxError>,
103    S: Service<Request, Response = T, Error = E>,
104{
105    type Output = Result<T, crate::BoxError>;
106
107    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
108        let mut this = self.project();
109
110        loop {
111            match this.state.as_mut().project() {
112                StateProj::Delaying { delay, req } => {
113                    ready!(delay.poll(cx));
114                    let req = req.take().expect("Missing request in delay");
115                    let svc = this.service.take().expect("Missing service in delay");
116                    let fut = Oneshot::new(svc, req);
117                    this.state.set(State::called(fut));
118                }
119                StateProj::Called { fut } => {
120                    return fut.poll(cx).map_err(Into::into);
121                }
122            };
123        }
124    }
125}