mysteriouspants_throttle/
lib.rs

1//! A simple throttle, used for slowing down repeated code. Use this to avoid drowning out
2//! downstream systems. For example, if I were reading the contents of a file repeatedly (polling
3//! for data, perhaps), or calling an external network resource, I could use a `Throttle` to slow
4//! that down to avoid resource contention or browning out a downstream service. Another potential
5//! use of a `Throttle` is in video game code to lock a framerate lower to promote predictable
6//! gameplay or to avoid burning up user's graphics hardware unnecessarily.
7//!
8//! This ranges in utility from a simple TPS throttle, "never go faster than *x* transactions per
9//! second,"
10//!
11//! ```rust
12//! # extern crate mysteriouspants_throttle;
13//! # use std::time::Instant;
14//! # use mysteriouspants_throttle::Throttle;
15//! # fn main() {
16//! // create a new Throttle that rate limits to 10 TPS
17//! let mut throttle = Throttle::new_tps_throttle(10.0);
18//!
19//! let iteration_start = Instant::now();
20//!
21//! // iterate eleven times, which at 10 TPS should take just over 1 second
22//! for _i in 0..11 {
23//!   throttle.acquire(());
24//!   // do the needful
25//! }
26//!
27//! // prove that it did, in fact, take 1 second
28//! assert_eq!(iteration_start.elapsed().as_secs() == 1, true);
29//! # }
30//! ```
31//!
32//! To more complicated variable-rate throttles, which may be as advanced as to slow in response to
33//! backpressure.
34//!
35//! ```rust
36//! # extern crate mysteriouspants_throttle;
37//! # use std::time::{Duration, Instant};
38//! # use mysteriouspants_throttle::Throttle;
39//! # fn main() {
40//! let mut throttle = Throttle::new_variable_throttle(
41//!     |arg: u64, _| Duration::from_millis(arg));
42//!
43//! let iteration_start = Instant::now();
44//!
45//! for i in 0..5 {
46//!   throttle.acquire(i * 100);
47//! }
48//!
49//! assert_eq!(iteration_start.elapsed().as_secs() == 1, true);
50//! # }
51//! ```
52//!
53//! When using your throttle, keep in mind that you are responsible for sharing it between
54//! threads safely and responsibly.
55
56use std::time::{Duration, Instant};
57use std::thread::sleep;
58
59#[derive(Copy, Clone)]
60enum ThrottleState {
61    Uninitialized,
62    Initialized {
63        previous_invocation: Instant
64    }
65}
66
67/// A simple configurable throttle for slowing down code, a little struct holding some state.
68pub struct Throttle<TArg> {
69    delay_calculator: Box<Fn(TArg, Duration) -> Duration + Send + Sync>,
70    state: ThrottleState
71}
72
73impl <TArg> Throttle<TArg> {
74    /// Creates a new `Throttle` with a variable delay controlled by a closure. `delay_calculator`
75    /// itself is an interesting type, any closure which satisfies `Fn(TArg, Duration) -> Duration`.
76    ///
77    /// This lambda is called to determine the duration between iterations of your code.
78    ///
79    /// ```text
80    /// |TArg, Duration| -> Duration
81    ///   |      |             |
82    ///   |      |             |
83    ///   |      |             v
84    ///   |      |    Duration that ought to have elapsed between calls
85    ///   |      |    to acquire. If the Duration you return is less
86    ///   |      |    than the Duration passed to you, or is zero, that
87    ///   |      |    means that no additional time will to be waited.
88    ///   |      |
89    ///   |      +--> The time since the previous call to acquire and now.
90    ///   |
91    ///   |           An argument passed through from the call to acquire.
92    ///   +---------> You can use this to change the behavior of your
93    ///               Throttle based on conditions in your calling code.
94    /// ```
95    ///
96    /// Expressed differently, on the axis of time,
97    ///
98    /// ```text
99    ///   /------------lambda return--------------------\
100    ///   /----duration arg-----\                       |
101    ///  +-----------------------------------------------+
102    ///  ^                       ^\------additional-----/
103    ///  |                       |       time waited
104    ///  |                       |
105    ///  previous call           acquire called
106    ///  to acquire
107    /// ```
108    ///
109    /// An example use of a variable-rate throttle might be to wait different periods of time
110    /// depending on whether your program is in backpressure, so "ease up" on your downstream call
111    /// rate, so to speak.
112    ///
113    /// ```
114    /// # extern crate mysteriouspants_throttle;
115    /// # use std::time::{Duration, Instant};
116    /// # use mysteriouspants_throttle::Throttle;
117    /// let mut throttle = Throttle::new_variable_throttle(
118    ///     |in_backpressure: bool, time_since_previous_acquire: Duration|
119    ///         match in_backpressure {
120    ///             true => Duration::from_millis(210),
121    ///             false => Duration::from_millis(110)
122    ///         });
123    ///
124    /// // the first one is free!
125    /// throttle.acquire(false);
126    ///
127    /// let start_nopressure = Instant::now();
128    /// throttle.acquire(false);
129    /// assert_eq!(start_nopressure.elapsed().as_secs() == 0, true);
130    /// assert_eq!(start_nopressure.elapsed().subsec_nanos() >= 100_000_000, true);
131    ///
132    /// let start_yespressure = Instant::now();
133    /// throttle.acquire(true);
134    /// assert_eq!(start_yespressure.elapsed().as_secs() == 0, true);
135    /// assert_eq!(start_yespressure.elapsed().subsec_nanos() >= 200_000_000, true);
136    /// ```
137    pub fn new_variable_throttle<TDelayCalculator: Fn(TArg, Duration) -> Duration + 'static + Send + Sync>(
138        delay_calculator: TDelayCalculator) -> Throttle<TArg> {
139        return Throttle {
140            delay_calculator: Box::new(delay_calculator),
141            state: ThrottleState::Uninitialized
142        };
143    }
144
145    /// Creates a new `Throttle` with a constant delay of `tps`<sup>-1</sup> &middot; 1000 ms, or
146    /// `tps`-transactions per second.
147    ///
148    /// ```rust
149    /// # extern crate mysteriouspants_throttle;
150    /// # use std::time::{Duration, Instant};
151    /// # use mysteriouspants_throttle::Throttle;
152    /// let mut throttle = Throttle::new_tps_throttle(0.9);
153    ///
154    /// // the first one is free!
155    /// throttle.acquire(());
156    ///
157    /// let start = Instant::now();
158    /// throttle.acquire(());
159    /// assert_eq!(start.elapsed().as_secs() == 1, true);
160    /// ```
161    pub fn new_tps_throttle(tps: f32) -> Throttle<TArg> {
162        let wait_for_millis = ((1.0 / tps) * 1000.0) as u64;
163        return Throttle {
164            delay_calculator: Box::new(move |_, _|
165                Duration::from_millis(wait_for_millis)),
166            state: ThrottleState::Uninitialized
167        };
168    }
169
170    /// Acquires the throttle, waiting (sleeping the current thread) until enough time has passed
171    /// for the running code to be at or slower than the throttle allows. The first call to
172    /// `acquire` will never wait because there has been an undefined or arguably infinite amount
173    /// of time from the previous time acquire was called. The argument `arg` is passed to the
174    /// closure governing the wait time.
175    pub fn acquire(&mut self, arg: TArg) {
176        match self.state {
177            ThrottleState::Initialized { previous_invocation } => {
178                let time_since_previous_acquire =
179                    Instant::now().duration_since(previous_invocation);
180                let delay_time = (self.delay_calculator)(arg, time_since_previous_acquire);
181
182                if delay_time > Duration::from_secs(0)
183                        && delay_time > time_since_previous_acquire {
184                    let additional_delay_required = delay_time - time_since_previous_acquire;
185
186                    if additional_delay_required > Duration::from_secs(0) {
187                        sleep(additional_delay_required);
188                    }
189                }
190
191                self.state = ThrottleState::Initialized { previous_invocation: Instant::now() };
192            },
193            ThrottleState::Uninitialized => {
194                self.state = ThrottleState::Initialized { previous_invocation: Instant::now() };
195            }
196        }
197    }
198}
199
200#[cfg(test)]
201mod tests {
202    use std::time::{Duration, Instant};
203    use std::thread::sleep;
204    use Throttle;
205
206    #[test]
207    fn it_works() {
208        // simple throttle configured for 10 TPS
209        let mut throttle = Throttle::new_tps_throttle(10.0);
210
211        // the first one is free
212        throttle.acquire(());
213
214        let iteration_start = Instant::now();
215
216        for _i in 0..10 {
217            throttle.acquire(());
218        }
219
220        assert_eq!(iteration_start.elapsed().as_secs() == 1, true);
221    }
222
223    #[test]
224    fn it_works_more_complicated() {
225        let mut throttle = Throttle::new_variable_throttle(
226            |arg: u64, _| Duration::from_millis(arg));
227
228        let iteration_start = Instant::now();
229
230        for i in 0..5 {
231            throttle.acquire(i * 100);
232        }
233
234        assert_eq!(iteration_start.elapsed().as_secs() == 1, true);
235    }
236
237    // from a user-perspective, a delay of zero ought to mean "no delay," and I don't want to
238    // worry about pesky panics trying to subtract durations!
239
240    #[test]
241    fn it_works_with_no_delay_at_all_tps() {
242        let mut throttle = Throttle::new_tps_throttle(0.0);
243
244        throttle.acquire(());
245        throttle.acquire(());
246
247        // no panic, no problem!
248    }
249
250    #[test]
251    fn it_works_with_no_delay_at_all_variable() {
252        let mut throttle = Throttle::new_variable_throttle(
253            |_, _| Duration::from_millis(0));
254
255        throttle.acquire(());
256        throttle.acquire(());
257
258        // no panic, no problem!
259    }
260
261    #[test]
262    fn it_works_with_duration_smaller_than_already_elapsed_time() {
263        // iterate every 10 ms
264        let mut throttle = Throttle::new_tps_throttle(100.0);
265
266        // the first one is free!
267        throttle.acquire(());
268
269        sleep(Duration::from_millis(20));
270
271        throttle.acquire(());
272
273        // no panic, no problem!
274    }
275
276    // these break if the struct loses it's sync+send status
277
278    fn is_send<T: Send>() { }
279
280    #[test]
281    fn enforce_send() {
282        is_send::<Throttle<()>>()
283    }
284
285    fn is_sync<T: Sync>() { }
286
287    #[test]
288    fn enforce_sync() {
289        is_sync::<Throttle<()>>()
290    }
291}