dbs_utils/
rate_limiter.rs

1// Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
2// SPDX-License-Identifier: Apache-2.0
3
4#![deny(missing_docs)]
5//! # Rate Limiter
6//!
7//! Provides a rate limiter written in Rust useful for IO operations that need to
8//! be throttled.
9//!
10//! ## Behavior
11//!
12//! The rate limiter starts off as 'unblocked' with two token buckets configured
13//! with the values passed in the `RateLimiter::new()` constructor.
14//! All subsequent accounting is done independently for each token bucket based
15//! on the `TokenType` used. If any of the buckets runs out of budget, the limiter
16//! goes in the 'blocked' state. At this point an internal timer is set up which
17//! will later 'wake up' the user in order to retry sending data. The 'wake up'
18//! notification will be dispatched as an event on the FD provided by the `AsRawFD`
19//! trait implementation.
20//!
21//! The contract is that the user shall also call the `event_handler()` method on
22//! receipt of such an event.
23//!
24//! The token buckets are replenished every time a `consume()` is called, before
25//! actually trying to consume the requested amount of tokens. The amount of tokens
26//! replenished is automatically calculated to respect the `complete_refill_time`
27//! configuration parameter provided by the user. The token buckets will never
28//! replenish above their respective `size`.
29//!
30//! Each token bucket can start off with a `one_time_burst` initial extra capacity
31//! on top of their `size`. This initial extra credit does not replenish and
32//! can be used for an initial burst of data.
33//!
34//! The granularity for 'wake up' events when the rate limiter is blocked is
35//! currently hardcoded to `10 milliseconds`.
36//!
37//! ## Limitations
38//!
39//! This rate limiter implementation relies on the *Linux kernel's timerfd* so its
40//! usage is limited to Linux systems.
41//!
42//! Another particularity of this implementation is that it is not self-driving.
43//! It is meant to be used in an external event loop and thus implements the `AsRawFd`
44//! trait and provides an *event-handler* as part of its API. This *event-handler*
45//! needs to be called by the user on every event on the rate limiter's `AsRawFd` FD.
46
47use std::os::unix::io::{AsRawFd, RawFd};
48use std::time::{Duration, Instant};
49use std::{fmt, io};
50
51use log::error;
52use timerfd::{ClockId, SetTimeFlags, TimerFd, TimerState};
53
#[derive(Debug)]
/// Describes the errors that may occur while handling rate limiter events.
pub enum Error {
    /// The event handler was called spuriously, i.e. without the internal
    /// refill timer having actually fired. Carries a static description.
    SpuriousRateLimiterEvent(&'static str),
}
60
// Interval at which the refill timer will run when limiter is at capacity.
const REFILL_TIMER_INTERVAL_MS: u64 = 10;
// One-shot timer state used to schedule a "retry consume" wake-up event
// `REFILL_TIMER_INTERVAL_MS` milliseconds in the future.
const TIMER_REFILL_STATE: TimerState =
    TimerState::Oneshot(Duration::from_millis(REFILL_TIMER_INTERVAL_MS));

// Number of nanoseconds in one millisecond, used to convert refill times.
const NANOSEC_IN_ONE_MILLISEC: u64 = 1_000_000;
67
// Euclid's algorithm, expressed recursively: gcd(x, y) == gcd(y, x mod y),
// bottoming out at gcd(x, 0) == x.
fn gcd(x: u64, y: u64) -> u64 {
    if y == 0 {
        x
    } else {
        gcd(y, x % y)
    }
}
79
/// Enum describing the outcomes of a `reduce()` call on a `TokenBucket`.
#[derive(Clone, Debug, PartialEq)]
pub enum BucketReduction {
    /// There are not enough tokens to satisfy the request.
    Failure,
    /// Part of the available tokens have been consumed.
    Success,
    /// A number of tokens `inner` times larger than the bucket size have been consumed.
    OverConsumption(f64),
}
90
/// TokenBucket provides a lower level interface to rate limiting with a
/// configurable capacity, refill-rate and initial burst.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct TokenBucket {
    // Bucket defining traits.

    // Maximum number of tokens the bucket can hold.
    size: u64,
    // Initial burst size, kept for inspection; never mutated after creation.
    initial_one_time_burst: u64,
    // Complete refill time in milliseconds.
    refill_time: u64,

    // Internal state descriptors.

    // Number of free initial tokens, that can be consumed at no cost.
    // Decreases as burst credit is spent; does not replenish.
    one_time_burst: u64,
    // Current token budget.
    budget: u64,
    // Last time this token bucket saw activity.
    last_update: Instant,

    // Fields used for pre-processing optimizations: `size` and the refill
    // time (in ns) reduced by their greatest common divisor, so refill
    // arithmetic in `auto_replenish()` is less likely to overflow.
    processed_capacity: u64,
    processed_refill_time: u64,
}
115
impl TokenBucket {
    /// Creates a new `TokenBucket`.
    ///
    /// The bucket is created with `size` total capacity and takes `complete_refill_time_ms`
    /// milliseconds to go from zero tokens to total capacity. The `one_time_burst` is initial
    /// extra credit on top of total capacity, that does not replenish and which can be used
    /// for an initial burst of data.
    ///
    /// Both `size` and `complete_refill_time_ms` must be non-zero; this is only enforced
    /// through a `debug_assert!`. Callers that want zero values to mean "rate limiting
    /// disabled" should use `RateLimiter::make_bucket`, which returns an `Option<TokenBucket>`.
    pub fn new(size: u64, one_time_burst: u64, complete_refill_time_ms: u64) -> Self {
        // Bucket capacity and refill time must both be non-zero (checked in debug builds only).
        debug_assert!(size != 0 && complete_refill_time_ms != 0);

        // Formula for computing current refill amount:
        // refill_token_count = (delta_time * size) / (complete_refill_time_ms * 1_000_000)
        // In order to avoid overflows, simplify the fractions by computing greatest common divisor.

        let complete_refill_time_ns = complete_refill_time_ms * NANOSEC_IN_ONE_MILLISEC;
        // Get the greatest common factor between `size` and `complete_refill_time_ns`.
        let common_factor = gcd(size, complete_refill_time_ns);
        // The division will be exact since `common_factor` is a factor of `size`.
        let processed_capacity: u64 = size / common_factor;
        // The division will be exact since `common_factor` is a factor of `complete_refill_time_ns`.
        let processed_refill_time: u64 = complete_refill_time_ns / common_factor;

        TokenBucket {
            size,
            one_time_burst,
            initial_one_time_burst: one_time_burst,
            refill_time: complete_refill_time_ms,
            // Start off full.
            budget: size,
            // Last updated is now.
            last_update: Instant::now(),
            processed_capacity,
            processed_refill_time,
        }
    }

    // Replenishes token bucket based on elapsed time. Should only be called internally by `Self`.
    fn auto_replenish(&mut self) {
        // Compute time passed since last refill/update.
        let time_delta = self.last_update.elapsed().as_nanos() as u64;
        self.last_update = Instant::now();

        // At each 'time_delta' nanoseconds the bucket should refill with:
        // refill_amount = (time_delta * size) / (complete_refill_time_ms * 1_000_000)
        // `processed_capacity` and `processed_refill_time` are the result of simplifying above
        // fraction formula with their greatest-common-factor.
        let tokens = (time_delta * self.processed_capacity) / self.processed_refill_time;
        // Never grow the budget beyond the configured bucket capacity.
        self.budget = std::cmp::min(self.budget + tokens, self.size);
    }

    /// Attempts to consume `tokens` from the bucket and returns whether the action succeeded.
    pub fn reduce(&mut self, mut tokens: u64) -> BucketReduction {
        // First things first: consume the one-time-burst budget.
        if self.one_time_burst > 0 {
            // We still have burst budget for *all* tokens requests.
            if self.one_time_burst >= tokens {
                self.one_time_burst -= tokens;
                self.last_update = Instant::now();
                // No need to continue to the refill process, we still have burst budget to consume from.
                return BucketReduction::Success;
            } else {
                // We still have burst budget for *some* of the tokens requests.
                // The tokens left unfulfilled will be consumed from current `self.budget`.
                tokens -= self.one_time_burst;
                self.one_time_burst = 0;
            }
        }

        if tokens > self.budget {
            // Hit the bucket bottom, let's auto-replenish and try again.
            self.auto_replenish();

            // This operation requests a bandwidth higher than the bucket size
            if tokens > self.size {
                error!(
                    "Consumed {} tokens from bucket of size {}",
                    tokens, self.size
                );
                // Empty the bucket and report an overconsumption of
                // (remaining tokens / size) times larger than the bucket size
                tokens -= self.budget;
                self.budget = 0;
                return BucketReduction::OverConsumption(tokens as f64 / self.size as f64);
            }

            if tokens > self.budget {
                // Still not enough tokens even after replenishing; report failure.
                return BucketReduction::Failure;
            }
        }

        self.budget -= tokens;
        BucketReduction::Success
    }

    /// "Manually" adds tokens to bucket.
    pub fn force_replenish(&mut self, tokens: u64) {
        // This means we are still during the burst interval.
        // Of course there is a very small chance that the last reduce() also used up burst
        // budget which should now be replenished, but for performance and code-complexity
        // reasons we're just gonna let that slide since it's practically inconsequential.
        if self.one_time_burst > 0 {
            self.one_time_burst += tokens;
            return;
        }
        // Cap the replenished budget at the bucket capacity.
        self.budget = std::cmp::min(self.budget + tokens, self.size);
    }

    /// Returns the capacity of the token bucket.
    pub fn capacity(&self) -> u64 {
        self.size
    }

    /// Returns the remaining one time burst budget.
    pub fn one_time_burst(&self) -> u64 {
        self.one_time_burst
    }

    /// Returns the time in milliseconds required to completely fill the bucket.
    pub fn refill_time_ms(&self) -> u64 {
        self.refill_time
    }

    /// Returns the current budget (one time burst allowance notwithstanding).
    pub fn budget(&self) -> u64 {
        self.budget
    }

    /// Returns the initially configured one time burst budget.
    pub fn initial_one_time_burst(&self) -> u64 {
        self.initial_one_time_burst
    }
}
252
/// Enum that describes the type of token used.
pub enum TokenType {
    /// Token type used for bandwidth limiting (one token per byte).
    Bytes,
    /// Token type used for operations/second limiting (one token per op).
    Ops,
}
260
/// Enum that describes the type of token bucket update.
#[derive(Clone, Debug)]
pub enum BucketUpdate {
    /// No Update - keep the bucket exactly as it was before.
    None,
    /// Rate Limiting is disabled on this bucket (the bucket is removed).
    Disabled,
    /// Rate Limiting enabled with updated bucket.
    Update(TokenBucket),
}
271
/// Rate Limiter that works on both bandwidth and ops/s limiting.
///
/// Bandwidth (bytes/s) and ops/s limiting can be used at the same time or individually.
///
/// Implementation uses a single timer through TimerFd to refresh either or
/// both token buckets.
///
/// Its internal buckets are 'passively' replenished as they're being used (as
/// part of `consume()` operations).
/// A timer is enabled and used to 'actively' replenish the token buckets when
/// limiting is in effect and `consume()` operations are disabled.
///
/// RateLimiters will generate events on the FDs provided by their `AsRawFd` trait
/// implementation. These events are meant to be consumed by the user of this struct.
/// On each such event, the user must call the `event_handler()` method.
pub struct RateLimiter {
    /// Bandwidth limit in bytes/s; `None` means bandwidth limiting is disabled.
    bandwidth: Option<TokenBucket>,
    /// Operate limit in ops/s; `None` means ops limiting is disabled.
    ops: Option<TokenBucket>,
    /// Timer handle, always created even when both buckets are disabled.
    timer_fd: TimerFd,
    /// Internal flag that quickly determines timer state; `true` while the
    /// limiter is blocked waiting on a refill event.
    timer_active: bool,
}
297
298impl PartialEq for RateLimiter {
299    fn eq(&self, other: &RateLimiter) -> bool {
300        self.bandwidth == other.bandwidth && self.ops == other.ops
301    }
302}
303
304impl fmt::Debug for RateLimiter {
305    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
306        write!(
307            f,
308            "RateLimiter {{ bandwidth: {:?}, ops: {:?} }}",
309            self.bandwidth, self.ops
310        )
311    }
312}
313
impl RateLimiter {
    /// This function creates a `TokenBucket` wrapped in an `Option` with a given total capacity,
    /// one time burst, and complete refill time (in milliseconds). If the total capacity or the
    /// complete refill time are zero, then `None` is returned, meaning rate limiting is
    /// disabled for that bucket.
    pub fn make_bucket(
        total_capacity: u64,
        one_time_burst: u64,
        complete_refill_time_ms: u64,
    ) -> Option<TokenBucket> {
        // If either token bucket capacity or refill time is 0, disable limiting.
        if total_capacity != 0 && complete_refill_time_ms != 0 {
            Some(TokenBucket::new(
                total_capacity,
                one_time_burst,
                complete_refill_time_ms,
            ))
        } else {
            None
        }
    }
    /// Creates a new Rate Limiter that can limit on both bytes/s and ops/s.
    ///
    /// # Arguments
    ///
    /// * `bytes_total_capacity` - the total capacity of the `TokenType::Bytes` token bucket.
    /// * `bytes_one_time_burst` - initial extra credit on top of `bytes_total_capacity`,
    /// that does not replenish and which can be used for an initial burst of data.
    /// * `bytes_complete_refill_time_ms` - number of milliseconds for the `TokenType::Bytes`
    /// token bucket to go from zero Bytes to `bytes_total_capacity` Bytes.
    /// * `ops_total_capacity` - the total capacity of the `TokenType::Ops` token bucket.
    /// * `ops_one_time_burst` - initial extra credit on top of `ops_total_capacity`,
    /// that does not replenish and which can be used for an initial burst of data.
    /// * `ops_complete_refill_time_ms` - number of milliseconds for the `TokenType::Ops` token
    /// bucket to go from zero Ops to `ops_total_capacity` Ops.
    ///
    /// If either bytes/ops *size* or *refill_time* are **zero**, the limiter
    /// is **disabled** for that respective token type.
    ///
    /// # Errors
    ///
    /// If the timerfd creation fails, an error is returned.
    pub fn new(
        bytes_total_capacity: u64,
        bytes_one_time_burst: u64,
        bytes_complete_refill_time_ms: u64,
        ops_total_capacity: u64,
        ops_one_time_burst: u64,
        ops_complete_refill_time_ms: u64,
    ) -> io::Result<Self> {
        let bytes_token_bucket = Self::make_bucket(
            bytes_total_capacity,
            bytes_one_time_burst,
            bytes_complete_refill_time_ms,
        );

        let ops_token_bucket = Self::make_bucket(
            ops_total_capacity,
            ops_one_time_burst,
            ops_complete_refill_time_ms,
        );

        // We'll need a timer_fd, even if our current config effectively disables rate limiting,
        // because `Self::update_buckets()` might re-enable it later, and we might be
        // seccomp-blocked from creating the timer_fd at that time.
        let timer_fd = TimerFd::new_custom(ClockId::Monotonic, true, true)?;

        Ok(RateLimiter {
            bandwidth: bytes_token_bucket,
            ops: ops_token_bucket,
            timer_fd,
            timer_active: false,
        })
    }

    // Arm the timer of the rate limiter with the provided `TimerState`.
    fn activate_timer(&mut self, timer_state: TimerState) {
        // Register the timer; don't care about its previous state
        self.timer_fd.set_state(timer_state, SetTimeFlags::Default);
        self.timer_active = true;
    }

    /// Attempts to consume tokens and returns whether that is possible.
    ///
    /// If rate limiting is disabled on provided `token_type`, this function will always succeed.
    pub fn consume(&mut self, tokens: u64, token_type: TokenType) -> bool {
        // If the timer is active, we can't consume tokens from any bucket and the function fails.
        if self.timer_active {
            return false;
        }

        // Identify the required token bucket.
        let token_bucket = match token_type {
            TokenType::Bytes => self.bandwidth.as_mut(),
            TokenType::Ops => self.ops.as_mut(),
        };
        // Try to consume from the token bucket.
        if let Some(bucket) = token_bucket {
            let refill_time = bucket.refill_time_ms();
            match bucket.reduce(tokens) {
                // When we report budget is over, there will be no further calls here,
                // register a timer to replenish the bucket and resume processing;
                // make sure there is only one running timer for this limiter.
                BucketReduction::Failure => {
                    // `timer_active` is necessarily false at this point (we returned
                    // early above otherwise), so this guard is purely defensive.
                    if !self.timer_active {
                        self.activate_timer(TIMER_REFILL_STATE);
                    }
                    false
                }
                // The operation succeeded and further calls can be made.
                BucketReduction::Success => true,
                // The operation succeeded as the tokens have been consumed
                // but the timer still needs to be armed.
                BucketReduction::OverConsumption(ratio) => {
                    // The operation "borrowed" a number of tokens `ratio` times
                    // greater than the size of the bucket, and since it takes
                    // `refill_time` milliseconds to fill an empty bucket, in
                    // order to enforce the bandwidth limit we need to prevent
                    // further calls to the rate limiter for
                    // `ratio * refill_time` milliseconds.
                    self.activate_timer(TimerState::Oneshot(Duration::from_millis(
                        (ratio * refill_time as f64) as u64,
                    )));
                    true
                }
            }
        } else {
            // If bucket is not present rate limiting is disabled on token type,
            // consume() will always succeed.
            true
        }
    }

    /// Adds tokens of `token_type` to their respective bucket.
    ///
    /// Can be used to *manually* add tokens to a bucket. Useful for reverting a
    /// `consume()` if needed.
    pub fn manual_replenish(&mut self, tokens: u64, token_type: TokenType) {
        // Identify the required token bucket.
        let token_bucket = match token_type {
            TokenType::Bytes => self.bandwidth.as_mut(),
            TokenType::Ops => self.ops.as_mut(),
        };
        // Add tokens to the token bucket.
        if let Some(bucket) = token_bucket {
            bucket.force_replenish(tokens);
        }
    }

    /// Returns whether this rate limiter is blocked.
    ///
    /// The limiter 'blocks' when a `consume()` operation fails because there was not enough
    /// budget for it.
    /// An event will be generated on the exported FD when the limiter 'unblocks'.
    pub fn is_blocked(&self) -> bool {
        self.timer_active
    }

    /// This function needs to be called every time there is an event on the
    /// FD provided by this object's `AsRawFd` trait implementation.
    ///
    /// # Errors
    ///
    /// If the rate limiter is disabled or is not blocked, an error is returned.
    pub fn event_handler(&mut self) -> Result<(), Error> {
        match self.timer_fd.read() {
            // A zero read means the timer has not actually fired (possible since
            // the timerfd is created in non-blocking mode), so this call was spurious.
            0 => Err(Error::SpuriousRateLimiterEvent(
                "Rate limiter event handler called without a present timer",
            )),
            _ => {
                // The timer fired; unblock the limiter so `consume()` works again.
                self.timer_active = false;
                Ok(())
            }
        }
    }

    /// Updates the parameters of the token buckets associated with this RateLimiter.
    // TODO: Please note that, right now, the buckets become full after being updated.
    pub fn update_buckets(&mut self, bytes: BucketUpdate, ops: BucketUpdate) {
        match bytes {
            BucketUpdate::Disabled => self.bandwidth = None,
            BucketUpdate::Update(tb) => self.bandwidth = Some(tb),
            BucketUpdate::None => (),
        };
        match ops {
            BucketUpdate::Disabled => self.ops = None,
            BucketUpdate::Update(tb) => self.ops = Some(tb),
            BucketUpdate::None => (),
        };
    }
    /// Returns an immutable view of the inner bandwidth token bucket.
    pub fn bandwidth(&self) -> Option<&TokenBucket> {
        self.bandwidth.as_ref()
    }

    /// Returns an immutable view of the inner ops token bucket.
    pub fn ops(&self) -> Option<&TokenBucket> {
        self.ops.as_ref()
    }
}
513
impl AsRawFd for RateLimiter {
    /// Provides a FD which needs to be monitored for POLLIN events.
    ///
    /// This object's `event_handler()` method must be called on such events.
    ///
    /// The FD is always valid: a timerfd is created by `RateLimiter::new()`
    /// even when rate limiting is disabled on both token types, so that
    /// limiting can later be re-enabled via `update_buckets()`.
    fn as_raw_fd(&self) -> RawFd {
        self.timer_fd.as_raw_fd()
    }
}
525
impl Default for RateLimiter {
    /// Default RateLimiter is a no-op limiter with infinite budget.
    fn default() -> Self {
        // All-zero parameters disable both buckets, so `consume()` always succeeds.
        // `new()` still creates the timerfd, hence the only possible failure is
        // timerfd creation itself, which we treat as fatal.
        RateLimiter::new(0, 0, 0, 0, 0, 0).expect("Failed to build default RateLimiter")
    }
}
533
534#[cfg(test)]
535mod tests {
536    use super::*;
537    use std::thread;
538    use std::time::Duration;
539    const TEST_REFILL_TIMER_INTERVAL_MS: u64 = 100;
    impl TokenBucket {
        // Resets the token bucket: budget set to max capacity and last-updated set to now.
        fn reset(&mut self) {
            self.budget = self.size;
            self.last_update = Instant::now();
        }

        // Test-only accessor for the `last_update` timestamp.
        fn get_last_update(&self) -> &Instant {
            &self.last_update
        }

        // Test-only accessor for the gcd-reduced capacity numerator.
        fn get_processed_capacity(&self) -> u64 {
            self.processed_capacity
        }

        // Test-only accessor for the gcd-reduced refill-time denominator.
        fn get_processed_refill_time(&self) -> u64 {
            self.processed_refill_time
        }

        // After a restore, we cannot be certain that the last_update field has the same value,
        // so compare every observable field except `last_update`.
        pub fn partial_eq(&self, other: &TokenBucket) -> bool {
            (other.capacity() == self.capacity())
                && (other.one_time_burst() == self.one_time_burst())
                && (other.refill_time_ms() == self.refill_time_ms())
                && (other.budget() == self.budget())
        }
    }
567
    impl RateLimiter {
        // Test-only helper: borrow the bucket backing the given token type, if enabled.
        fn get_token_bucket(&self, token_type: TokenType) -> Option<&TokenBucket> {
            match token_type {
                TokenType::Bytes => self.bandwidth.as_ref(),
                TokenType::Ops => self.ops.as_ref(),
            }
        }
    }
576
577    #[test]
578    fn test_token_bucket_create() {
579        let before = Instant::now();
580        let tb = TokenBucket::new(1000, 0, 1000);
581        assert_eq!(tb.capacity(), 1000);
582        assert_eq!(tb.budget(), 1000);
583        assert_eq!(tb.initial_one_time_burst(), 0);
584        assert!(*tb.get_last_update() >= before);
585        let after = Instant::now();
586        assert!(*tb.get_last_update() <= after);
587        assert_eq!(tb.get_processed_capacity(), 1);
588        assert_eq!(tb.get_processed_refill_time(), 1_000_000);
589    }
590
591    #[test]
592    fn test_token_bucket_preprocess() {
593        let tb = TokenBucket::new(1000, 0, 1000);
594        assert_eq!(tb.get_processed_capacity(), 1);
595        assert_eq!(tb.get_processed_refill_time(), NANOSEC_IN_ONE_MILLISEC);
596
597        let thousand = 1000;
598        let tb = TokenBucket::new(3 * 7 * 11 * 19 * thousand, 0, 7 * 11 * 13 * 17);
599        assert_eq!(tb.get_processed_capacity(), 3 * 19);
600        assert_eq!(
601            tb.get_processed_refill_time(),
602            13 * 17 * (NANOSEC_IN_ONE_MILLISEC / thousand)
603        );
604    }
605
    #[test]
    fn test_token_bucket_reduce() {
        // token bucket with capacity 1000 and refill time of 1000 milliseconds
        // allowing rate of 1 token/ms.
        let capacity = 1000;
        let refill_ms = 1000;
        let mut tb = TokenBucket::new(capacity, 0, refill_ms as u64);

        assert_eq!(tb.reduce(123), BucketReduction::Success);
        assert_eq!(tb.budget(), capacity - 123);
        // A full `capacity` request must fail: 123 tokens were just spent and
        // essentially no time has passed to replenish them.
        assert_eq!(tb.reduce(capacity), BucketReduction::Failure);

        // Since the CI machine might be slow, we should sleep less milliseconds here than desired 123 ms to avoid errors caused by CI machines.
        thread::sleep(Duration::from_millis(80));
        assert_eq!(tb.reduce(1), BucketReduction::Success);
        assert_eq!(tb.reduce(100), BucketReduction::Success);
        assert_eq!(tb.reduce(capacity), BucketReduction::Failure);

        // token bucket with capacity 1000, a 1100-token one-time burst and a
        // refill time of 1000 milliseconds.
        let mut tb = TokenBucket::new(1000, 1100, 1000);
        // safely assuming the thread can run these 3 commands in less than 500ms
        assert_eq!(tb.reduce(1000), BucketReduction::Success);
        assert_eq!(tb.one_time_burst(), 100);
        assert_eq!(tb.reduce(500), BucketReduction::Success);
        assert_eq!(tb.one_time_burst(), 0);
        assert_eq!(tb.reduce(500), BucketReduction::Success);
        assert_eq!(tb.reduce(500), BucketReduction::Failure);
        thread::sleep(Duration::from_millis(500));
        assert_eq!(tb.reduce(500), BucketReduction::Success);
        thread::sleep(Duration::from_millis(1000));
        // After a full refill interval the budget is capped back at 1000, so
        // requesting 2500 borrows 1500 beyond it: 1.5x the bucket size.
        assert_eq!(tb.reduce(2500), BucketReduction::OverConsumption(1.5));

        // `reset()` (test helper) restores a full budget and refreshes `last_update`.
        let before = Instant::now();
        tb.reset();
        assert_eq!(tb.capacity(), 1000);
        assert_eq!(tb.budget(), 1000);
        assert!(*tb.get_last_update() >= before);
        let after = Instant::now();
        assert!(*tb.get_last_update() <= after);
    }
646
647    #[test]
648    fn test_rate_limiter_default() {
649        let mut l = RateLimiter::default();
650
651        // limiter should not be blocked
652        assert!(!l.is_blocked());
653        // limiter should be disabled so consume(whatever) should work
654        assert!(l.consume(u64::max_value(), TokenType::Ops));
655        assert!(l.consume(u64::max_value(), TokenType::Bytes));
656        // calling the handler without there having been an event should error
657        assert!(l.event_handler().is_err());
658        assert_eq!(
659            format!("{:?}", l.event_handler().err().unwrap()),
660            "SpuriousRateLimiterEvent(\
661             \"Rate limiter event handler called without a present timer\")"
662        );
663    }
664
665    #[test]
666    fn test_rate_limiter_new() {
667        let l = RateLimiter::new(1000, 1001, 1002, 1003, 1004, 1005).unwrap();
668
669        let bw = l.bandwidth.unwrap();
670        assert_eq!(bw.capacity(), 1000);
671        assert_eq!(bw.one_time_burst(), 1001);
672        assert_eq!(bw.initial_one_time_burst(), 1001);
673        assert_eq!(bw.refill_time_ms(), 1002);
674        assert_eq!(bw.budget(), 1000);
675
676        let ops = l.ops.unwrap();
677        assert_eq!(ops.capacity(), 1003);
678        assert_eq!(ops.one_time_burst(), 1004);
679        assert_eq!(ops.initial_one_time_burst(), 1004);
680        assert_eq!(ops.refill_time_ms(), 1005);
681        assert_eq!(ops.budget(), 1003);
682    }
683
684    #[test]
685    fn test_rate_limiter_manual_replenish() {
686        // rate limiter with limit of 1000 bytes/s and 1000 ops/s
687        let mut l = RateLimiter::new(1000, 0, 1000, 1000, 0, 1000).unwrap();
688
689        // consume 123 bytes
690        assert!(l.consume(123, TokenType::Bytes));
691        l.manual_replenish(23, TokenType::Bytes);
692        {
693            let bytes_tb = l.get_token_bucket(TokenType::Bytes).unwrap();
694            assert_eq!(bytes_tb.budget(), 900);
695        }
696        // consume 123 ops
697        assert!(l.consume(123, TokenType::Ops));
698        l.manual_replenish(23, TokenType::Ops);
699        {
700            let bytes_tb = l.get_token_bucket(TokenType::Ops).unwrap();
701            assert_eq!(bytes_tb.budget(), 900);
702        }
703    }
704
    #[test]
    fn test_rate_limiter_bandwidth() {
        // rate limiter with limit of 1000 bytes/s
        let mut l = RateLimiter::new(1000, 0, 1000, 0, 0, 0).unwrap();

        // limiter should not be blocked
        assert!(!l.is_blocked());
        // the timerfd-backed raw FD must be valid even though ops limiting is disabled
        assert!(l.as_raw_fd() > 0);

        // ops/s limiter should be disabled so consume(whatever) should work
        assert!(l.consume(u64::max_value(), TokenType::Ops));

        // do full 1000 bytes
        assert!(l.consume(1000, TokenType::Bytes));
        // try and fail on another 100
        assert!(!l.consume(100, TokenType::Bytes));
        // since consume failed, limiter should be blocked now
        assert!(l.is_blocked());
        // wait half the timer period
        thread::sleep(Duration::from_millis(TEST_REFILL_TIMER_INTERVAL_MS / 2));
        // limiter still reports blocked: `is_blocked()` only clears after
        // `event_handler()` consumes the timer event, regardless of elapsed time
        assert!(l.is_blocked());
        // wait the other half of the timer period
        thread::sleep(Duration::from_millis(TEST_REFILL_TIMER_INTERVAL_MS / 2));
        // the timer_fd should have an event on it by now
        assert!(l.event_handler().is_ok());
        // limiter should now be unblocked
        assert!(!l.is_blocked());
        // try and succeed on another 100 bytes this time
        assert!(l.consume(100, TokenType::Bytes));
    }
737
    #[test]
    fn test_rate_limiter_ops() {
        // rate limiter with limit of 1000 ops/s
        let mut l = RateLimiter::new(0, 0, 0, 1000, 0, 1000).unwrap();

        // limiter should not be blocked
        assert!(!l.is_blocked());
        // the timerfd-backed raw FD must be valid even though bytes limiting is disabled
        assert!(l.as_raw_fd() > 0);

        // bytes/s limiter should be disabled so consume(whatever) should work
        assert!(l.consume(u64::max_value(), TokenType::Bytes));

        // do full 1000 ops
        assert!(l.consume(1000, TokenType::Ops));
        // try and fail on another 100
        assert!(!l.consume(100, TokenType::Ops));
        // since consume failed, limiter should be blocked now
        assert!(l.is_blocked());
        // wait half the timer period
        thread::sleep(Duration::from_millis(TEST_REFILL_TIMER_INTERVAL_MS / 2));
        // limiter still reports blocked: `is_blocked()` only clears after
        // `event_handler()` consumes the timer event, regardless of elapsed time
        assert!(l.is_blocked());
        // wait the other half of the timer period
        thread::sleep(Duration::from_millis(TEST_REFILL_TIMER_INTERVAL_MS / 2));
        // the timer_fd should have an event on it by now
        assert!(l.event_handler().is_ok());
        // limiter should now be unblocked
        assert!(!l.is_blocked());
        // try and succeed on another 100 ops this time
        assert!(l.consume(100, TokenType::Ops));
    }
770
    #[test]
    fn test_rate_limiter_full() {
        // rate limiter with limit of 1000 bytes/s and 1000 ops/s
        let mut l = RateLimiter::new(1000, 0, 1000, 1000, 0, 1000).unwrap();

        // limiter should not be blocked
        assert!(!l.is_blocked());
        // raw FD for this limiter should be valid
        assert!(l.as_raw_fd() > 0);

        // do full 1000 ops
        assert!(l.consume(1000, TokenType::Ops));
        // do full 1000 bytes
        assert!(l.consume(1000, TokenType::Bytes));
        // try and fail on another 100 ops
        assert!(!l.consume(100, TokenType::Ops));
        // try and fail on another 100 bytes
        assert!(!l.consume(100, TokenType::Bytes));
        // since consume failed, limiter should be blocked now
        assert!(l.is_blocked());
        // wait half the timer period
        thread::sleep(Duration::from_millis(TEST_REFILL_TIMER_INTERVAL_MS / 2));
        // limiter should still be blocked
        assert!(l.is_blocked());
        // wait the other half of the timer period
        thread::sleep(Duration::from_millis(TEST_REFILL_TIMER_INTERVAL_MS / 2));
        // the timer_fd should have an event on it by now
        assert!(l.event_handler().is_ok());
        // limiter should now be unblocked
        assert!(!l.is_blocked());
        // try and succeed on another 100 ops this time
        assert!(l.consume(100, TokenType::Ops));
        // try and succeed on another 100 bytes this time
        assert!(l.consume(100, TokenType::Bytes));
    }
806
    #[test]
    fn test_rate_limiter_overconsumption() {
        // initialize the rate limiter: 1000 bytes/s with a 1000 ms refill time
        let mut l = RateLimiter::new(1000, 0, 1000, 1000, 0, 1000).unwrap();
        // try to consume 2.5x the bucket size
        // we are "borrowing" 1.5x the bucket size in tokens since
        // the bucket is full
        assert!(l.consume(2500, TokenType::Bytes));

        // check that even after a whole second passes, the rate limiter
        // is still blocked
        thread::sleep(Duration::from_millis(1000));
        assert!(l.event_handler().is_err());
        assert!(l.is_blocked());

        // after 1.5x the replenish time has passed, the rate limiter
        // is available again
        thread::sleep(Duration::from_millis(500));
        assert!(l.event_handler().is_ok());
        assert!(!l.is_blocked());

        // reset the rate limiter
        let mut l = RateLimiter::new(1000, 0, 1000, 1000, 0, 1000).unwrap();
        // try to consume 1.5x the bucket size
        // we are "borrowing" 1.5x the bucket size in tokens since
        // the bucket is full, should arm the timer to 0.5x replenish
        // time, which is 500 ms
        assert!(l.consume(1500, TokenType::Bytes));

        // check that after more than the minimum refill time,
        // the rate limiter is still blocked
        thread::sleep(Duration::from_millis(200));
        assert!(l.event_handler().is_err());
        assert!(l.is_blocked());

        // try to consume some tokens, which should fail as the timer
        // is still active
        assert!(!l.consume(100, TokenType::Bytes));
        assert!(l.event_handler().is_err());
        assert!(l.is_blocked());

        // check that after the minimum refill time, the timer was not
        // overwritten and the rate limiter is still blocked from the
        // borrowing we performed earlier
        thread::sleep(Duration::from_millis(90));
        assert!(l.event_handler().is_err());
        assert!(l.is_blocked());
        assert!(!l.consume(100, TokenType::Bytes));

        // after waiting out the full duration, rate limiter should be
        // available again
        thread::sleep(Duration::from_millis(210));
        assert!(l.event_handler().is_ok());
        assert!(!l.is_blocked());
        assert!(l.consume(100, TokenType::Bytes));
    }
863
    #[test]
    fn test_update_buckets() {
        let mut x = RateLimiter::new(1000, 2000, 1000, 10, 20, 1000).unwrap();

        let initial_bw = x.bandwidth.clone();
        let initial_ops = x.ops.clone();

        // BucketUpdate::None must leave both buckets untouched
        x.update_buckets(BucketUpdate::None, BucketUpdate::None);
        assert_eq!(x.bandwidth, initial_bw);
        assert_eq!(x.ops, initial_ops);

        let new_bw = RateLimiter::make_bucket(123, 0, 57).unwrap();
        let new_ops = RateLimiter::make_bucket(321, 12346, 89).unwrap();
        x.update_buckets(
            BucketUpdate::Update(new_bw.clone()),
            BucketUpdate::Update(new_ops.clone()),
        );

        // We have to manually adjust the last_update field, because it changes when
        // update_buckets() constructs new buckets (and thus gets a different value for
        // last_update). We do this so it makes sense to test the following assertions.
        x.bandwidth.as_mut().unwrap().last_update = new_bw.last_update;
        x.ops.as_mut().unwrap().last_update = new_ops.last_update;

        assert_eq!(x.bandwidth, Some(new_bw));
        assert_eq!(x.ops, Some(new_ops));

        // BucketUpdate::Disabled must clear both buckets
        x.update_buckets(BucketUpdate::Disabled, BucketUpdate::Disabled);
        assert_eq!(x.bandwidth, None);
        assert_eq!(x.ops, None);
    }
895
896    #[test]
897    fn test_rate_limiter_debug() {
898        let l = RateLimiter::new(1, 2, 3, 4, 5, 6).unwrap();
899        assert_eq!(
900            format!("{:?}", l),
901            format!(
902                "RateLimiter {{ bandwidth: {:?}, ops: {:?} }}",
903                l.bandwidth(),
904                l.ops()
905            ),
906        );
907    }
908}