mysteriouspants_throttle/lib.rs
1//! A simple throttle, used for slowing down repeated code. Use this to avoid drowning out
2//! downstream systems. For example, if I were reading the contents of a file repeatedly (polling
3//! for data, perhaps), or calling an external network resource, I could use a `Throttle` to slow
4//! that down to avoid resource contention or browning out a downstream service. Another potential
5//! use of a `Throttle` is in video game code to lock a framerate lower to promote predictable
6//! gameplay or to avoid burning up user's graphics hardware unnecessarily.
7//!
8//! This ranges in utility from a simple TPS throttle, "never go faster than *x* transactions per
9//! second,"
10//!
11//! ```rust
12//! # extern crate mysteriouspants_throttle;
13//! # use std::time::Instant;
14//! # use mysteriouspants_throttle::Throttle;
15//! # fn main() {
16//! // create a new Throttle that rate limits to 10 TPS
17//! let mut throttle = Throttle::new_tps_throttle(10.0);
18//!
19//! let iteration_start = Instant::now();
20//!
21//! // iterate eleven times, which at 10 TPS should take just over 1 second
22//! for _i in 0..11 {
23//! throttle.acquire(());
24//! // do the needful
25//! }
26//!
27//! // prove that it did, in fact, take 1 second
28//! assert_eq!(iteration_start.elapsed().as_secs() == 1, true);
29//! # }
30//! ```
31//!
32//! To more complicated variable-rate throttles, which may be as advanced as to slow in response to
33//! backpressure.
34//!
35//! ```rust
36//! # extern crate mysteriouspants_throttle;
37//! # use std::time::{Duration, Instant};
38//! # use mysteriouspants_throttle::Throttle;
39//! # fn main() {
40//! let mut throttle = Throttle::new_variable_throttle(
41//! |arg: u64, _| Duration::from_millis(arg));
42//!
43//! let iteration_start = Instant::now();
44//!
45//! for i in 0..5 {
46//! throttle.acquire(i * 100);
47//! }
48//!
49//! assert_eq!(iteration_start.elapsed().as_secs() == 1, true);
50//! # }
51//! ```
52//!
53//! When using your throttle, keep in mind that you are responsible for sharing it between
54//! threads safely and responsibly.
55
56use std::time::{Duration, Instant};
57use std::thread::sleep;
58
59#[derive(Copy, Clone)]
60enum ThrottleState {
61 Uninitialized,
62 Initialized {
63 previous_invocation: Instant
64 }
65}
66
67/// A simple configurable throttle for slowing down code, a little struct holding some state.
68pub struct Throttle<TArg> {
69 delay_calculator: Box<Fn(TArg, Duration) -> Duration + Send + Sync>,
70 state: ThrottleState
71}
72
73impl <TArg> Throttle<TArg> {
74 /// Creates a new `Throttle` with a variable delay controlled by a closure. `delay_calculator`
75 /// itself is an interesting type, any closure which satisfies `Fn(TArg, Duration) -> Duration`.
76 ///
77 /// This lambda is called to determine the duration between iterations of your code.
78 ///
79 /// ```text
80 /// |TArg, Duration| -> Duration
81 /// | | |
82 /// | | |
83 /// | | v
84 /// | | Duration that ought to have elapsed between calls
85 /// | | to acquire. If the Duration you return is less
86 /// | | than the Duration passed to you, or is zero, that
87 /// | | means that no additional time will to be waited.
88 /// | |
89 /// | +--> The time since the previous call to acquire and now.
90 /// |
91 /// | An argument passed through from the call to acquire.
92 /// +---------> You can use this to change the behavior of your
93 /// Throttle based on conditions in your calling code.
94 /// ```
95 ///
96 /// Expressed differently, on the axis of time,
97 ///
98 /// ```text
99 /// /------------lambda return--------------------\
100 /// /----duration arg-----\ |
101 /// +-----------------------------------------------+
102 /// ^ ^\------additional-----/
103 /// | | time waited
104 /// | |
105 /// previous call acquire called
106 /// to acquire
107 /// ```
108 ///
109 /// An example use of a variable-rate throttle might be to wait different periods of time
110 /// depending on whether your program is in backpressure, so "ease up" on your downstream call
111 /// rate, so to speak.
112 ///
113 /// ```
114 /// # extern crate mysteriouspants_throttle;
115 /// # use std::time::{Duration, Instant};
116 /// # use mysteriouspants_throttle::Throttle;
117 /// let mut throttle = Throttle::new_variable_throttle(
118 /// |in_backpressure: bool, time_since_previous_acquire: Duration|
119 /// match in_backpressure {
120 /// true => Duration::from_millis(210),
121 /// false => Duration::from_millis(110)
122 /// });
123 ///
124 /// // the first one is free!
125 /// throttle.acquire(false);
126 ///
127 /// let start_nopressure = Instant::now();
128 /// throttle.acquire(false);
129 /// assert_eq!(start_nopressure.elapsed().as_secs() == 0, true);
130 /// assert_eq!(start_nopressure.elapsed().subsec_nanos() >= 100_000_000, true);
131 ///
132 /// let start_yespressure = Instant::now();
133 /// throttle.acquire(true);
134 /// assert_eq!(start_yespressure.elapsed().as_secs() == 0, true);
135 /// assert_eq!(start_yespressure.elapsed().subsec_nanos() >= 200_000_000, true);
136 /// ```
137 pub fn new_variable_throttle<TDelayCalculator: Fn(TArg, Duration) -> Duration + 'static + Send + Sync>(
138 delay_calculator: TDelayCalculator) -> Throttle<TArg> {
139 return Throttle {
140 delay_calculator: Box::new(delay_calculator),
141 state: ThrottleState::Uninitialized
142 };
143 }
144
145 /// Creates a new `Throttle` with a constant delay of `tps`<sup>-1</sup> · 1000 ms, or
146 /// `tps`-transactions per second.
147 ///
148 /// ```rust
149 /// # extern crate mysteriouspants_throttle;
150 /// # use std::time::{Duration, Instant};
151 /// # use mysteriouspants_throttle::Throttle;
152 /// let mut throttle = Throttle::new_tps_throttle(0.9);
153 ///
154 /// // the first one is free!
155 /// throttle.acquire(());
156 ///
157 /// let start = Instant::now();
158 /// throttle.acquire(());
159 /// assert_eq!(start.elapsed().as_secs() == 1, true);
160 /// ```
161 pub fn new_tps_throttle(tps: f32) -> Throttle<TArg> {
162 let wait_for_millis = ((1.0 / tps) * 1000.0) as u64;
163 return Throttle {
164 delay_calculator: Box::new(move |_, _|
165 Duration::from_millis(wait_for_millis)),
166 state: ThrottleState::Uninitialized
167 };
168 }
169
170 /// Acquires the throttle, waiting (sleeping the current thread) until enough time has passed
171 /// for the running code to be at or slower than the throttle allows. The first call to
172 /// `acquire` will never wait because there has been an undefined or arguably infinite amount
173 /// of time from the previous time acquire was called. The argument `arg` is passed to the
174 /// closure governing the wait time.
175 pub fn acquire(&mut self, arg: TArg) {
176 match self.state {
177 ThrottleState::Initialized { previous_invocation } => {
178 let time_since_previous_acquire =
179 Instant::now().duration_since(previous_invocation);
180 let delay_time = (self.delay_calculator)(arg, time_since_previous_acquire);
181
182 if delay_time > Duration::from_secs(0)
183 && delay_time > time_since_previous_acquire {
184 let additional_delay_required = delay_time - time_since_previous_acquire;
185
186 if additional_delay_required > Duration::from_secs(0) {
187 sleep(additional_delay_required);
188 }
189 }
190
191 self.state = ThrottleState::Initialized { previous_invocation: Instant::now() };
192 },
193 ThrottleState::Uninitialized => {
194 self.state = ThrottleState::Initialized { previous_invocation: Instant::now() };
195 }
196 }
197 }
198}
199
200#[cfg(test)]
201mod tests {
202 use std::time::{Duration, Instant};
203 use std::thread::sleep;
204 use Throttle;
205
206 #[test]
207 fn it_works() {
208 // simple throttle configured for 10 TPS
209 let mut throttle = Throttle::new_tps_throttle(10.0);
210
211 // the first one is free
212 throttle.acquire(());
213
214 let iteration_start = Instant::now();
215
216 for _i in 0..10 {
217 throttle.acquire(());
218 }
219
220 assert_eq!(iteration_start.elapsed().as_secs() == 1, true);
221 }
222
223 #[test]
224 fn it_works_more_complicated() {
225 let mut throttle = Throttle::new_variable_throttle(
226 |arg: u64, _| Duration::from_millis(arg));
227
228 let iteration_start = Instant::now();
229
230 for i in 0..5 {
231 throttle.acquire(i * 100);
232 }
233
234 assert_eq!(iteration_start.elapsed().as_secs() == 1, true);
235 }
236
237 // from a user-perspective, a delay of zero ought to mean "no delay," and I don't want to
238 // worry about pesky panics trying to subtract durations!
239
240 #[test]
241 fn it_works_with_no_delay_at_all_tps() {
242 let mut throttle = Throttle::new_tps_throttle(0.0);
243
244 throttle.acquire(());
245 throttle.acquire(());
246
247 // no panic, no problem!
248 }
249
250 #[test]
251 fn it_works_with_no_delay_at_all_variable() {
252 let mut throttle = Throttle::new_variable_throttle(
253 |_, _| Duration::from_millis(0));
254
255 throttle.acquire(());
256 throttle.acquire(());
257
258 // no panic, no problem!
259 }
260
261 #[test]
262 fn it_works_with_duration_smaller_than_already_elapsed_time() {
263 // iterate every 10 ms
264 let mut throttle = Throttle::new_tps_throttle(100.0);
265
266 // the first one is free!
267 throttle.acquire(());
268
269 sleep(Duration::from_millis(20));
270
271 throttle.acquire(());
272
273 // no panic, no problem!
274 }
275
276 // these break if the struct loses it's sync+send status
277
278 fn is_send<T: Send>() { }
279
280 #[test]
281 fn enforce_send() {
282 is_send::<Throttle<()>>()
283 }
284
285 fn is_sync<T: Sync>() { }
286
287 #[test]
288 fn enforce_sync() {
289 is_sync::<Throttle<()>>()
290 }
291}