wreq_util/tower/delay/
service.rs1use std::{
2 task::{Context, Poll},
3 time::Duration,
4};
5
6use tower::{BoxError, Service};
7
8use super::{future::ResponseFuture, jittered_duration};
9
10#[derive(Debug, Clone)]
12pub struct Delay<S> {
13 inner: S,
14 delay: Duration,
15}
16
17#[derive(Clone, Debug)]
22pub struct DelayWith<S, P> {
23 inner: Delay<S>,
24 predicate: P,
25}
26
27#[derive(Clone, Debug)]
32pub struct JitterDelay<S> {
33 inner: S,
34 base: Duration,
35 pct: f64,
36}
37
38#[derive(Clone, Debug)]
43pub struct JitterDelayWith<S, P> {
44 inner: JitterDelay<S>,
45 predicate: P,
46}
47
48impl<S> Delay<S> {
51 #[inline]
53 pub fn new(inner: S, delay: Duration) -> Self {
54 Delay { inner, delay }
55 }
56}
57
58impl<S, Request> Service<Request> for Delay<S>
59where
60 S: Service<Request>,
61 S::Error: Into<BoxError>,
62{
63 type Response = S::Response;
64 type Error = BoxError;
65 type Future = ResponseFuture<S::Future>;
66
67 #[inline]
68 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
69 self.inner.poll_ready(cx).map_err(Into::into)
70 }
71
72 fn call(&mut self, req: Request) -> Self::Future {
73 let response = self.inner.call(req);
74 let sleep = tokio::time::sleep(self.delay);
75 ResponseFuture::new(response, sleep)
76 }
77}
78
79impl<S, P> DelayWith<S, P> {
82 #[inline]
84 pub fn new(inner: S, delay: Duration, predicate: P) -> Self {
85 Self {
86 inner: Delay::new(inner, delay),
87 predicate,
88 }
89 }
90}
91
92impl<S, Req, P> Service<Req> for DelayWith<S, P>
93where
94 S: Service<Req>,
95 S::Error: Into<BoxError>,
96 P: Fn(&Req) -> bool,
97{
98 type Response = S::Response;
99 type Error = BoxError;
100 type Future = ResponseFuture<S::Future>;
101
102 #[inline]
103 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
104 self.inner.poll_ready(cx).map_err(Into::into)
105 }
106
107 fn call(&mut self, req: Req) -> Self::Future {
108 if !(self.predicate)(&req) {
109 self.inner.delay = Duration::ZERO;
110 }
111 self.inner.call(req)
112 }
113}
114
115impl<S> JitterDelay<S> {
118 #[inline]
120 pub fn new(inner: S, base: Duration, pct: f64) -> Self {
121 Self {
122 inner,
123 base,
124 pct: pct.clamp(0.0, 1.0),
125 }
126 }
127}
128
129impl<S, Req> Service<Req> for JitterDelay<S>
130where
131 S: Service<Req>,
132 S::Error: Into<BoxError>,
133{
134 type Response = S::Response;
135 type Error = BoxError;
136 type Future = ResponseFuture<S::Future>;
137
138 #[inline]
139 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
140 self.inner.poll_ready(cx).map_err(Into::into)
141 }
142
143 fn call(&mut self, req: Req) -> Self::Future {
144 let delay = jittered_duration(self.base, self.pct);
145 let sleep = tokio::time::sleep(delay);
146 let fut = self.inner.call(req);
147 ResponseFuture::new(fut, sleep)
148 }
149}
150
151impl<S, P> JitterDelayWith<S, P> {
154 #[inline]
156 pub fn new(inner: S, base: Duration, pct: f64, predicate: P) -> Self {
157 Self {
158 inner: JitterDelay::new(inner, base, pct),
159 predicate,
160 }
161 }
162}
163
164impl<S, Req, P> Service<Req> for JitterDelayWith<S, P>
165where
166 S: Service<Req>,
167 S::Error: Into<BoxError>,
168 P: Fn(&Req) -> bool,
169{
170 type Response = S::Response;
171 type Error = BoxError;
172 type Future = ResponseFuture<S::Future>;
173
174 #[inline]
175 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
176 self.inner.poll_ready(cx).map_err(Into::into)
177 }
178
179 fn call(&mut self, req: Req) -> Self::Future {
180 let delay = if (self.predicate)(&req) {
181 jittered_duration(self.inner.base, self.inner.pct)
182 } else {
183 Duration::ZERO
184 };
185
186 let sleep = tokio::time::sleep(delay);
187 let fut = self.inner.inner.call(req);
188 ResponseFuture::new(fut, sleep)
189 }
190}