Skip to main content

hpx_util/tower/delay/
future.rs

1use std::{
2    pin::Pin,
3    task::{Context, Poll, ready},
4};
5
6use pin_project_lite::pin_project;
7use tokio::time::Sleep;
8use tower::{BoxError, Service};
9
10pin_project! {
11    /// Response future for delay middleware.
12    #[derive(Debug)]
13    #[project = ResponseFutureProj]
14    pub enum ResponseFuture<S, Req>
15    where
16        S: Service<Req>,
17    {
18        /// Waiting for the configured delay before issuing the request.
19        Delaying {
20            #[pin]
21            sleep: Sleep,
22            service: Option<S>,
23            request: Option<Req>,
24        },
25        /// Request has been dispatched to inner service.
26        Calling {
27            #[pin]
28            response: S::Future,
29        },
30    }
31}
32
33impl<S, Req> ResponseFuture<S, Req>
34where
35    S: Service<Req>,
36{
37    #[inline]
38    pub(crate) fn new(service: S, request: Req, sleep: Sleep) -> Self {
39        Self::Delaying {
40            sleep,
41            service: Some(service),
42            request: Some(request),
43        }
44    }
45}
46
47impl<S, Req> Future for ResponseFuture<S, Req>
48where
49    S: Service<Req>,
50    S::Error: Into<BoxError>,
51{
52    type Output = Result<S::Response, BoxError>;
53
54    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
55        let mut this = self.as_mut().project();
56
57        loop {
58            match this {
59                ResponseFutureProj::Delaying {
60                    sleep,
61                    service,
62                    request,
63                } => {
64                    ready!(sleep.poll(cx));
65                    let mut inner = service
66                        .take()
67                        .expect("delay future polled after service taken");
68                    let req = request
69                        .take()
70                        .expect("delay future polled after request taken");
71                    let response = inner.call(req);
72                    self.set(Self::Calling { response });
73                    this = self.as_mut().project();
74                }
75                ResponseFutureProj::Calling { response } => {
76                    return response.poll(cx).map_err(Into::into);
77                }
78            }
79        }
80    }
81}