1use pin_project_lite::pin_project;
2use std::time::Duration;
3use std::{
4 future::Future,
5 pin::Pin,
6 task::{ready, Context, Poll},
7};
8use tower_service::Service;
9
10use crate::util::Oneshot;
11
/// Decides how long a given request should be held back.
///
/// Implementors inspect the request by reference and return the
/// [`Duration`] to wait for it.
pub trait Policy<Request> {
    /// Returns the delay to apply to `req`.
    fn delay(&self, req: &Request) -> Duration;
}
16
/// Pairs a delay policy with the service that requests are forwarded to.
///
/// `P` supplies the per-request delay and `S` is the wrapped service.
#[derive(Debug)]
pub struct Delay<P, S> {
    /// Source of the per-request delay.
    policy: P,
    /// Wrapped service; cloned for each call.
    service: S,
}
24
25pin_project! {
26 #[derive(Debug)]
27 pub struct ResponseFuture<Request, S>
28 where
29 S: Service<Request>,
30 {
31 service: Option<S>,
32 #[pin]
33 state: State<Request, Oneshot<S, Request>>,
34 }
35}
36
37pin_project! {
38 #[project = StateProj]
39 #[derive(Debug)]
40 enum State<Request, F> {
41 Delaying {
42 #[pin]
43 delay: tokio::time::Sleep,
44 req: Option<Request>,
45 },
46 Called {
47 #[pin]
48 fut: F,
49 },
50 }
51}
52
53impl<Request, F> State<Request, F> {
54 fn delaying(delay: tokio::time::Sleep, req: Option<Request>) -> Self {
55 Self::Delaying { delay, req }
56 }
57
58 fn called(fut: F) -> Self {
59 Self::Called { fut }
60 }
61}
62
63impl<P, S> Delay<P, S> {
64 pub const fn new<Request>(policy: P, service: S) -> Self
65 where
66 P: Policy<Request>,
67 S: Service<Request> + Clone,
68 S::Error: Into<crate::BoxError>,
69 {
70 Delay { policy, service }
71 }
72}
73
74impl<Request, P, S> Service<Request> for Delay<P, S>
75where
76 P: Policy<Request>,
77 S: Service<Request> + Clone,
78 S::Error: Into<crate::BoxError>,
79{
80 type Response = S::Response;
81 type Error = crate::BoxError;
82 type Future = ResponseFuture<Request, S>;
83
84 fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
85 Poll::Ready(Ok(()))
89 }
90
91 fn call(&mut self, request: Request) -> Self::Future {
92 let delay = self.policy.delay(&request);
93 ResponseFuture {
94 service: Some(self.service.clone()),
95 state: State::delaying(tokio::time::sleep(delay), Some(request)),
96 }
97 }
98}
99
100impl<Request, S, T, E> Future for ResponseFuture<Request, S>
101where
102 E: Into<crate::BoxError>,
103 S: Service<Request, Response = T, Error = E>,
104{
105 type Output = Result<T, crate::BoxError>;
106
107 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
108 let mut this = self.project();
109
110 loop {
111 match this.state.as_mut().project() {
112 StateProj::Delaying { delay, req } => {
113 ready!(delay.poll(cx));
114 let req = req.take().expect("Missing request in delay");
115 let svc = this.service.take().expect("Missing service in delay");
116 let fut = Oneshot::new(svc, req);
117 this.state.set(State::called(fut));
118 }
119 StateProj::Called { fut } => {
120 return fut.poll(cx).map_err(Into::into);
121 }
122 };
123 }
124 }
125}