wreq_util/tower/delay/
service.rs

1use std::{
2    task::{Context, Poll},
3    time::Duration,
4};
5
6use tower::{BoxError, Service};
7
8use super::{future::ResponseFuture, jittered_duration};
9
/// Tower [`Service`] wrapper that attaches the same fixed delay to every request.
///
/// Each call to the wrapped service is paired with a timer of length `delay`.
#[derive(Clone, Debug)]
pub struct Delay<S> {
    /// The wrapped service that ultimately handles the request.
    inner: S,
    /// Length of the timer created for every call.
    delay: Duration,
}
16
17/// A Tower [`Service`] that conditionally applies fixed delay based on a predicate.
18///
19/// Requests that match the predicate will have the delay applied;
20/// other requests pass through immediately.
21#[derive(Clone, Debug)]
22pub struct DelayWith<S, P> {
23    inner: Delay<S>,
24    predicate: P,
25}
26
/// Tower [`Service`] wrapper that attaches a jittered delay to every request.
///
/// For each call a fresh delay is derived from `base` and `pct` via
/// `jittered_duration`, so the wait varies from request to request.
#[derive(Clone, Debug)]
pub struct JitterDelay<S> {
    /// The wrapped service that ultimately handles the request.
    inner: S,
    /// Base duration the jitter is computed from.
    base: Duration,
    /// Jitter factor; the constructor clamps it into `[0.0, 1.0]`.
    pct: f64,
}
37
38/// A Tower [`Service`] that conditionally applies jittered delay based on a predicate.
39///
40/// Requests that match the predicate will have a jittered delay applied;
41/// other requests pass through immediately.
42#[derive(Clone, Debug)]
43pub struct JitterDelayWith<S, P> {
44    inner: JitterDelay<S>,
45    predicate: P,
46}
47
48// ===== impl Delay =====
49
50impl<S> Delay<S> {
51    /// Create a new [`Delay`] service wrapping the given inner service
52    #[inline]
53    pub fn new(inner: S, delay: Duration) -> Self {
54        Delay { inner, delay }
55    }
56}
57
58impl<S, Request> Service<Request> for Delay<S>
59where
60    S: Service<Request>,
61    S::Error: Into<BoxError>,
62{
63    type Response = S::Response;
64    type Error = BoxError;
65    type Future = ResponseFuture<S::Future>;
66
67    #[inline]
68    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
69        self.inner.poll_ready(cx).map_err(Into::into)
70    }
71
72    fn call(&mut self, req: Request) -> Self::Future {
73        let response = self.inner.call(req);
74        let sleep = tokio::time::sleep(self.delay);
75        ResponseFuture::new(response, sleep)
76    }
77}
78
79// ===== impl DelayWith =====
80
81impl<S, P> DelayWith<S, P> {
82    /// Creates a new [`DelayWith`].
83    #[inline]
84    pub fn new(inner: S, delay: Duration, predicate: P) -> Self {
85        Self {
86            inner: Delay::new(inner, delay),
87            predicate,
88        }
89    }
90}
91
92impl<S, Req, P> Service<Req> for DelayWith<S, P>
93where
94    S: Service<Req>,
95    S::Error: Into<BoxError>,
96    P: Fn(&Req) -> bool,
97{
98    type Response = S::Response;
99    type Error = BoxError;
100    type Future = ResponseFuture<S::Future>;
101
102    #[inline]
103    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
104        self.inner.poll_ready(cx).map_err(Into::into)
105    }
106
107    fn call(&mut self, req: Req) -> Self::Future {
108        if !(self.predicate)(&req) {
109            self.inner.delay = Duration::ZERO;
110        }
111        self.inner.call(req)
112    }
113}
114
115// ===== impl JitterDelay =====
116
117impl<S> JitterDelay<S> {
118    /// Creates a new [`JitterDelay`].
119    #[inline]
120    pub fn new(inner: S, base: Duration, pct: f64) -> Self {
121        Self {
122            inner,
123            base,
124            pct: pct.clamp(0.0, 1.0),
125        }
126    }
127}
128
129impl<S, Req> Service<Req> for JitterDelay<S>
130where
131    S: Service<Req>,
132    S::Error: Into<BoxError>,
133{
134    type Response = S::Response;
135    type Error = BoxError;
136    type Future = ResponseFuture<S::Future>;
137
138    #[inline]
139    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
140        self.inner.poll_ready(cx).map_err(Into::into)
141    }
142
143    fn call(&mut self, req: Req) -> Self::Future {
144        let delay = jittered_duration(self.base, self.pct);
145        let sleep = tokio::time::sleep(delay);
146        let fut = self.inner.call(req);
147        ResponseFuture::new(fut, sleep)
148    }
149}
150
151// ===== impl JitterDelayWith =====
152
153impl<S, P> JitterDelayWith<S, P> {
154    /// Creates a new [`JitterDelayWith`].
155    #[inline]
156    pub fn new(inner: S, base: Duration, pct: f64, predicate: P) -> Self {
157        Self {
158            inner: JitterDelay::new(inner, base, pct),
159            predicate,
160        }
161    }
162}
163
164impl<S, Req, P> Service<Req> for JitterDelayWith<S, P>
165where
166    S: Service<Req>,
167    S::Error: Into<BoxError>,
168    P: Fn(&Req) -> bool,
169{
170    type Response = S::Response;
171    type Error = BoxError;
172    type Future = ResponseFuture<S::Future>;
173
174    #[inline]
175    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
176        self.inner.poll_ready(cx).map_err(Into::into)
177    }
178
179    fn call(&mut self, req: Req) -> Self::Future {
180        let delay = if (self.predicate)(&req) {
181            jittered_duration(self.inner.base, self.inner.pct)
182        } else {
183            Duration::ZERO
184        };
185
186        let sleep = tokio::time::sleep(delay);
187        let fut = self.inner.inner.call(req);
188        ResponseFuture::new(fut, sleep)
189    }
190}