1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
//! The Generic Cell Rate Algorithm

use thread_safety::ThreadsafeWrapper;
use {
    algorithms::{Algorithm, NonConformance, RateLimitState},
    InconsistentCapacity, NegativeMultiDecision,
};

use evmap::ShallowCopy;

use std::cmp;
use std::num::NonZeroU32;
use std::time::{Duration, Instant};

/// The GCRA's state about a single rate limiting history.
///
/// This is a newtype around a `ThreadsafeWrapper` holding a `Tat`,
/// i.e. the optional instant that the algorithm compares incoming
/// cells against. All reads and updates performed by [`GCRA`] go
/// through that wrapper.
#[derive(Debug, Eq, PartialEq, Default, Clone)]
pub struct State(ThreadsafeWrapper<Tat>);

impl ShallowCopy for State {
    /// Produces a shallow copy of this state by shallow-copying the
    /// wrapped `ThreadsafeWrapper` and re-wrapping the result.
    ///
    /// # Safety
    /// This only delegates to the inner value's `shallow_copy`; any
    /// contract that call places on its caller applies here unchanged.
    unsafe fn shallow_copy(&mut self) -> Self {
        let wrapper_copy = self.0.shallow_copy();
        State(wrapper_copy)
    }
}

impl RateLimitState<GCRA> for State {
    /// Returns the instant recorded in this state, shifted forward by
    /// the limiter's `tau`.
    ///
    /// When no instant has been recorded yet, the current time
    /// (`Instant::now()`) is used as the starting point instead.
    fn last_touched(&self, params: &GCRA) -> Instant {
        let snapshot = self.0.snapshot();
        let reference = match snapshot.0 {
            Some(recorded) => recorded,
            None => Instant::now(),
        };
        reference + params.tau
    }
}

/// The instant that GCRA compares incoming cells against (the "TAT",
/// theoretical arrival time, in GCRA terminology).
///
/// `None` means that no instant has been recorded yet; the algorithm
/// substitutes the time of the measurement in that case. The derived
/// `Default` yields exactly that empty value, `Tat(None)`.
#[derive(Debug, Eq, PartialEq, Clone, Default)]
struct Tat(Option<Instant>);

/// Returned in case of a negative rate-limiting decision. Indicates
/// the earliest instant that a cell might get accepted again.
///
/// The wrapped `Instant` is the value reported through the
/// `NonConformance` implementation for this type; its display text is
/// `rate-limited until <instant>`, using the instant's `Debug` form.
///
/// To avoid thundering herd effects, client code should always add a
/// random amount of jitter to wait time estimates.
#[derive(Fail, Debug, PartialEq)]
#[fail(display = "rate-limited until {:?}", _0)]
pub struct NotUntil(Instant);

impl NonConformance for NotUntil {
    /// Returns the instant stored in this negative decision.
    fn earliest_possible(&self) -> Instant {
        self.0
    }

    /// Returns how long a caller at `from` has to wait until the
    /// stored instant is reached.
    ///
    /// If `from` is already at or past the stored instant there is
    /// nothing left to wait for and a zero `Duration` is returned.
    fn wait_time_from(&self, from: Instant) -> Duration {
        // `Instant::duration_since` panics on older toolchains when its
        // argument is the later of the two instants (and the standard
        // library reserves the right to do so again), so the "already
        // elapsed" case is handled explicitly instead of relying on it.
        if from >= self.0 {
            Duration::new(0, 0)
        } else {
            self.0.duration_since(from)
        }
    }
}

/// Implements the virtual scheduling description of the Generic Cell
/// Rate Algorithm, attributed to ITU-T in recommendation I.371
/// Traffic control and congestion control in B-ISDN; from
/// [Wikipedia](https://en.wikipedia.org/wiki/Generic_cell_rate_algorithm).
///
///
/// While algorithms like leaky-bucket rate limiters allow cells to be
/// distributed across time in any way, GCRA is a rate-limiting *and*
/// traffic-shaping algorithm. It mandates that a minimum amount of
/// time passes between cells being measured. For example, if your API
/// mandates that only 20 requests can be made per second, GCRA will
/// ensure that each request is at least 50ms apart from the previous
/// request. Note that this spacing only takes effect once the burst
/// tolerance (τ) is used up: as the example below shows, a fresh
/// limiter accepts a burst of cells arriving at the same instant.
/// This makes GCRA suitable for shaping traffic in
/// networking and telecom equipment (it was initially made for
/// asynchronous transfer mode networks), or for outgoing workloads on
/// *consumers* of attention, e.g. distributing outgoing emails across
/// a day.
///
/// # A note about batch decisions
/// In a blatant side-stepping of the above traffic-shaping criteria,
/// this implementation of GCRA comes with an extension that allows
/// measuring multiple cells at once, assuming that if a pause of
/// `n*(the minimum time between cells)` has passed, we can allow a
/// single big batch of `n` cells through. This assumption may not be
/// correct for your application, so if you depend on GCRA's
/// traffic-shaping properties, it's better to not use the `_n`
/// suffixed check functions.
///
/// # Example
/// In this example, we construct a rate-limiter with the GCR
/// algorithm that can accommodate 20 cells per second. This translates
/// to the GCRA parameters τ=1s, T=50ms (that's 1s / 20 cells).
///
/// ```
/// # use ratelimit_meter::{DirectRateLimiter, GCRA};
/// # use std::num::NonZeroU32;
/// # use std::time::{Instant, Duration};
/// # #[macro_use] extern crate nonzero_ext;
/// # extern crate ratelimit_meter;
/// # fn main () {
/// let mut limiter = DirectRateLimiter::<GCRA>::per_second(nonzero!(20u32));
/// let now = Instant::now();
/// let ms = Duration::from_millis(1);
/// assert_eq!(Ok(()), limiter.check_at(now)); // the first cell is free
/// for i in 0..20 {
///     // Spam a lot:
///     assert!(limiter.check_at(now).is_ok(), "at {}", i);
/// }
/// // We have exceeded the bucket capacity:
/// assert!(limiter.check_at(now).is_err());
///
/// // After a sufficient time period, cells are allowed again:
/// assert_eq!(Ok(()), limiter.check_at(now + ms*50));
/// # }
/// ```
#[derive(Debug, Clone)]
pub struct GCRA {
    // T: the "weight" of a single packet in units of time. `construct`
    // derives it as `(per_time_unit / capacity) * cell_weight`; every
    // accepted cell advances the stored instant by this much.
    t: Duration,

    // τ: the "capacity" of the bucket, expressed as a time span.
    // `construct` sets it to `per_time_unit`; a cell is rejected when it
    // arrives more than this long before the stored instant.
    tau: Duration,
}

impl Algorithm for GCRA {
    type BucketState = State;

    type NegativeDecision = NotUntil;

    /// Derives the GCRA parameters for a limiter that admits
    /// `capacity` units of weight per `per_time_unit`, where each cell
    /// weighs `cell_weight` units.
    ///
    /// The per-cell increment is `(per_time_unit / capacity) *
    /// cell_weight` and the tolerance is `per_time_unit` itself.
    ///
    /// # Errors
    /// Returns `InconsistentCapacity` if a single cell weighs more
    /// than the whole capacity.
    fn construct(
        capacity: NonZeroU32,
        cell_weight: NonZeroU32,
        per_time_unit: Duration,
    ) -> Result<Self, InconsistentCapacity> {
        if capacity < cell_weight {
            return Err(InconsistentCapacity {
                capacity,
                cell_weight,
            });
        }
        Ok(GCRA {
            t: (per_time_unit / capacity.get()) * cell_weight.get(),
            tau: per_time_unit,
        })
    }

    /// Tests if a single cell can be accommodated by the
    /// rate-limiter. This is a threadsafe implementation of the
    /// method described directly in the GCRA algorithm.
    ///
    /// A cell arriving at `t0` is rejected when it comes more than
    /// `tau` before the stored instant; the stored instant is then
    /// reported in the `NotUntil` error and the state is left alone.
    /// Otherwise the state advances to `max(stored, t0) + t`. A state
    /// without a stored instant is treated as if it held `t0`.
    fn test_and_update(
        &self,
        state: &Self::BucketState,
        t0: Instant,
    ) -> Result<(), Self::NegativeDecision> {
        let tau = self.tau;
        let t = self.t;
        state.0.measure_and_replace(|tat| {
            let tat = tat.0.unwrap_or(t0);
            // Same test as `t0 < tat - tau`, but written as an addition:
            // subtracting a large `tau` from an `Instant` can underflow
            // and panic, whereas adding `tau` to an instant is something
            // this module already relies on elsewhere.
            if t0 + tau < tat {
                (Err(NotUntil(tat)), None)
            } else {
                (Ok(()), Some(Tat(Some(cmp::max(tat, t0) + t))))
            }
        })
    }

    /// Tests if `n` cells can be accommodated by the rate-limiter
    /// and updates rate limiter state iff they can be.
    ///
    /// * `n == 0` always succeeds and does not consume any capacity.
    /// * If `n` cells together weigh more than `tau`, the batch can
    ///   never fit and `InsufficientCapacity(n)` is returned.
    /// * Otherwise the batch conforms if its *last* cell would: the
    ///   stored instant shifted by `(n - 1) * t` must not lie more
    ///   than `tau` after `t0`. On failure that shifted instant is
    ///   reported via `BatchNonConforming`; on success the state
    ///   advances to `max(stored, t0) + n * t`.
    ///
    /// As this method is an extension of GCRA (using multiplication),
    /// it is likely not as fast (and not as obviously "right") as the
    /// single-cell variant.
    fn test_n_and_update(
        &self,
        state: &Self::BucketState,
        n: u32,
        t0: Instant,
    ) -> Result<(), NegativeMultiDecision<Self::NegativeDecision>> {
        let tau = self.tau;
        let t = self.t;
        state.0.measure_and_replace(|tat| {
            let stored = tat.0.unwrap_or(t0);

            // How far beyond the stored instant the last cell of the
            // batch has to look when it is checked for conformance.
            let lookahead = match n {
                // An empty batch must not erase outstanding debt: keep a
                // stored instant that lies in the future instead of
                // overwriting it with `t0`.
                0 => return (Ok(()), Some(Tat(Some(cmp::max(stored, t0))))),
                1 => Duration::new(0, 0),
                _ => {
                    let weight = t * (n - 1);
                    if (weight + t) > tau {
                        // The bucket capacity can never accommodate this request
                        return (Err(NegativeMultiDecision::InsufficientCapacity(n)), None);
                    }
                    weight
                }
            };

            let batch_tat = stored + lookahead;
            // `t0 < batch_tat - tau`, rearranged so that no subtraction
            // from an `Instant` (which may underflow and panic) is needed.
            if t0 + tau < batch_tat {
                (
                    Err(NegativeMultiDecision::BatchNonConforming(
                        n,
                        NotUntil(batch_tat),
                    )),
                    None,
                )
            } else {
                // Charge exactly `n` cells on top of the *unshifted*
                // stored instant; the look-ahead is only part of the
                // conformance test and must not be charged a second time.
                (Ok(()), Some(Tat(Some(cmp::max(stored, t0) + t * n))))
            }
        })
    }
}