Skip to main content

rate_net/
limiter.rs

1//! The rate limiter and the trait every algorithm shares.
2
3use std::fmt;
4use std::num::NonZeroUsize;
5use std::time::Duration;
6
7use clock_lib::{Clock, Monotonic, SystemClock};
8
9use crate::algo::AlgoState;
10use crate::algorithm::Algorithm;
11use crate::decision::Decision;
12use crate::eviction::Eviction;
13use crate::key::Key;
14use crate::quota::Quota;
15use crate::store::Store;
16
/// Picks the shard count used when the caller has not chosen one.
///
/// Starts from four shards per available core (falling back to a single core
/// when the parallelism cannot be queried), rounds that up to a power of two,
/// and clamps the result to `1..=4096`. More shards mean less contention
/// between unrelated keys, paid for with a little more memory.
pub(crate) fn default_shard_count() -> usize {
    const SHARDS_PER_CORE: usize = 4;
    const MAX_SHARDS: usize = 4096;

    let cores = std::thread::available_parallelism().map_or(1, NonZeroUsize::get);
    let rounded = cores.saturating_mul(SHARDS_PER_CORE).next_power_of_two();
    rounded.clamp(1, MAX_SHARDS)
}
29
30/// The shared rate-limiting surface, independent of the algorithm behind it.
31///
32/// Every limiter — whatever algorithm it uses — answers the same question:
33/// *is this key allowed right now?* `Limiter` is that contract, so generic code
34/// can hold any limiter and call [`check`](Self::check) without naming the
35/// concrete type or its clock. [`RateLimiter`] is the implementation this crate
36/// ships.
37///
38/// Implementors only need to provide [`check_n`](Self::check_n);
39/// [`check`](Self::check) defaults to one unit.
40///
41/// # Examples
42///
43/// ```
44/// use rate_net::{Limiter, RateLimiter};
45///
46/// // Generic over any limiter implementation.
47/// fn admit_one<L: Limiter>(limiter: &L, key: &str) -> bool {
48///     limiter.check(key).is_allow()
49/// }
50///
51/// let limiter = RateLimiter::per_second(2);
52/// assert!(admit_one(&limiter, "user:1"));
53/// ```
54pub trait Limiter {
55    /// Checks `n` units against `key`, returning the [`Decision`].
56    fn check_n(&self, key: impl Into<Key>, n: u32) -> Decision;
57
58    /// Checks a single unit against `key`. Equivalent to `check_n(key, 1)`.
59    fn check(&self, key: impl Into<Key>) -> Decision {
60        self.check_n(key, 1)
61    }
62}
63
/// A keyed rate limiter.
///
/// Tracks an independent allowance for every key it sees and answers
/// [`check`](Self::check) in the time it takes to hash the key and run its
/// bucket. Per-key state lives in a [sharded](Self::with_shards) concurrent
/// store: unrelated keys land in different shards and never contend, an
/// existing-key check takes only a shared read lock plus the bucket's atomic
/// accounting, and memory is bounded by [eviction](Self::with_eviction) so a
/// flood of unique keys hits a cap instead of growing without limit. The
/// limiter is `Send + Sync` and is meant to be shared — behind an
/// [`Arc`](std::sync::Arc), or as a `static` — across all the threads serving
/// requests.
///
/// The default algorithm is the token bucket, whose accounting is delegated to
/// [`better-bucket`](https://crates.io/crates/better-bucket): each key bursts up
/// to its [`Quota`] immediately, then sustains the quota rate as the allowance
/// refills. Time comes from an injectable [`Clock`] — [`SystemClock`] in
/// production, or a `ManualClock` in tests via [`with_clock`](Self::with_clock).
///
/// # Examples
///
/// ```
/// use rate_net::{RateLimiter, Decision};
///
/// // 100 requests per second, per key.
/// let limiter = RateLimiter::per_second(100);
///
/// match limiter.check("user:42") {
///     Decision::Allow => { /* serve the request */ }
///     Decision::Deny { retry_after } => {
///         // 429, Retry-After: retry_after
///         let _ = retry_after;
///     }
///     _ => {}
/// }
/// ```
pub struct RateLimiter<C: Clock + Clone = SystemClock> {
    /// The algorithm used to build each key's state.
    algorithm: Algorithm,
    /// The quota handed to every freshly created per-key state.
    quota: Quota,
    /// The time source: read for elapsed time and cloned into each new
    /// per-key state.
    clock: C,
    /// The clock reading captured when the limiter was assembled; elapsed
    /// time on the check path is measured from this instant.
    epoch: Monotonic,
    /// The shard count as passed at construction, kept so the `with_*`
    /// rebuilds can reuse it. [`shards`](Self::shards) reports the store's
    /// own count instead.
    shards: usize,
    /// The eviction policy the store was built with.
    eviction: Eviction,
    /// The per-key state store, built from the shard count and eviction
    /// policy.
    store: Store<C>,
    /// Whether the check path must read the clock: the token bucket reads its
    /// own clock and capacity-only eviction needs no real time, so the common
    /// case skips the read entirely. Window algorithms (which need `now`) and an
    /// idle TTL (which measures real elapsed time) turn it on.
    reads_clock: bool,
}
114
115impl RateLimiter<SystemClock> {
116    /// Creates a limiter allowing `limit` requests per second, per key, driven
117    /// by the OS monotonic clock.
118    ///
119    /// The headline Tier-1 constructor. A `limit` of `0` yields a limiter that
120    /// denies every request.
121    ///
122    /// # Examples
123    ///
124    /// ```
125    /// use rate_net::RateLimiter;
126    ///
127    /// let limiter = RateLimiter::per_second(10);
128    /// assert_eq!(limiter.quota().limit(), 10);
129    /// ```
130    #[must_use]
131    pub fn per_second(limit: u32) -> Self {
132        Self::with_quota(Quota::per_second(limit))
133    }
134
135    /// Creates a limiter allowing `limit` requests per minute, per key.
136    ///
137    /// # Examples
138    ///
139    /// ```
140    /// use rate_net::RateLimiter;
141    /// use std::time::Duration;
142    ///
143    /// let limiter = RateLimiter::per_minute(600);
144    /// assert_eq!(limiter.quota().period(), Duration::from_secs(60));
145    /// ```
146    #[must_use]
147    pub fn per_minute(limit: u32) -> Self {
148        Self::with_quota(Quota::per_minute(limit))
149    }
150
151    /// Creates a limiter from an explicit [`Quota`], driven by the OS monotonic
152    /// clock, with default sharding and a bounded-memory [`Eviction`] policy.
153    ///
154    /// Use this with [`Quota::rate`] when the window is neither a second nor a
155    /// minute.
156    ///
157    /// # Examples
158    ///
159    /// ```
160    /// use rate_net::{RateLimiter, Quota};
161    /// use std::time::Duration;
162    ///
163    /// let quota = Quota::rate(5, Duration::from_millis(100))?;
164    /// let limiter = RateLimiter::with_quota(quota);
165    /// assert_eq!(limiter.quota().limit(), 5);
166    /// # Ok::<(), rate_net::RateLimiterError>(())
167    /// ```
168    #[must_use]
169    pub fn with_quota(quota: Quota) -> Self {
170        Self::build(
171            Algorithm::default(),
172            quota,
173            SystemClock::new(),
174            default_shard_count(),
175            Eviction::default(),
176        )
177    }
178
179    /// Starts a [`Builder`](crate::Builder) — the Tier-2 path that selects the
180    /// algorithm, quota, burst, shard count, eviction policy, and clock in one
181    /// fluent surface.
182    ///
183    /// # Examples
184    ///
185    /// ```
186    /// use rate_net::{RateLimiter, Eviction};
187    /// use std::time::Duration;
188    ///
189    /// let limiter = RateLimiter::builder()
190    ///     .quota(1000, Duration::from_secs(60)) // 1000 / minute
191    ///     .burst(50)
192    ///     .shards(64)
193    ///     .eviction(Eviction::idle(Duration::from_secs(300)))
194    ///     .build();
195    /// assert_eq!(limiter.quota().limit(), 1000);
196    /// assert_eq!(limiter.quota().burst(), 50);
197    /// ```
198    pub fn builder() -> crate::builder::Builder<SystemClock> {
199        crate::builder::Builder::new()
200    }
201}
202
203impl<C: Clock + Clone> RateLimiter<C> {
204    /// Assembles a limiter from its parts, anchoring the eviction clock. Shared
205    /// with [`Builder`](crate::Builder).
206    pub(crate) fn build(
207        algorithm: Algorithm,
208        quota: Quota,
209        clock: C,
210        shards: usize,
211        eviction: Eviction,
212    ) -> Self {
213        let epoch = clock.now();
214        let store = Store::new(shards, eviction);
215        // The token bucket reads its own clock and capacity-only eviction orders
216        // by a logical counter, so that combination needs no clock read here.
217        let reads_clock = algorithm != Algorithm::TokenBucket || eviction.idle_ttl().is_some();
218        Self {
219            algorithm,
220            quota,
221            clock,
222            epoch,
223            shards,
224            eviction,
225            store,
226            reads_clock,
227        }
228    }
229
230    /// Replaces the limiter's time source, discarding any per-key state.
231    ///
232    /// This is the clock-injection seam, intended for use immediately after
233    /// construction. Injecting a `ManualClock` makes refill behaviour
234    /// deterministic so window and rollover tests run with no `sleep`. The shard
235    /// count and eviction policy are preserved.
236    ///
237    /// # Examples
238    ///
239    /// ```
240    /// use rate_net::RateLimiter;
241    /// use clock_lib::ManualClock;
242    /// use std::sync::Arc;
243    /// use std::time::Duration;
244    ///
245    /// let clock = Arc::new(ManualClock::new());
246    /// let limiter = RateLimiter::per_second(5).with_clock(Arc::clone(&clock));
247    ///
248    /// // Drain the key's allowance.
249    /// for _ in 0..5 {
250    ///     assert!(limiter.check("k").is_allow());
251    /// }
252    /// assert!(limiter.check("k").is_deny());
253    ///
254    /// // Advance one second — no real sleep — and the allowance is back.
255    /// clock.advance(Duration::from_secs(1));
256    /// assert!(limiter.check("k").is_allow());
257    /// ```
258    #[must_use]
259    pub fn with_clock<C2: Clock + Clone>(self, clock: C2) -> RateLimiter<C2> {
260        RateLimiter::build(
261            self.algorithm,
262            self.quota,
263            clock,
264            self.shards,
265            self.eviction,
266        )
267    }
268
269    /// Selects the algorithm, discarding any per-key state.
270    ///
271    /// Intended immediately after construction. The leaky bucket and the window
272    /// algorithms require the `algorithms` feature; without it the only
273    /// selectable variant is [`Algorithm::TokenBucket`].
274    ///
275    /// # Examples
276    ///
277    /// ```
278    /// # #[cfg(feature = "algorithms")] {
279    /// use rate_net::{RateLimiter, Algorithm};
280    ///
281    /// let limiter = RateLimiter::per_second(100).with_algorithm(Algorithm::SlidingWindowCounter);
282    /// assert_eq!(limiter.algorithm(), Algorithm::SlidingWindowCounter);
283    /// # }
284    /// ```
285    #[must_use]
286    pub fn with_algorithm(self, algorithm: Algorithm) -> Self {
287        Self::build(
288            algorithm,
289            self.quota,
290            self.clock,
291            self.shards,
292            self.eviction,
293        )
294    }
295
296    /// Sets the shard count, discarding any per-key state.
297    ///
298    /// Intended immediately after construction. More shards reduce contention
299    /// between unrelated keys; the value is rounded up to a power of two. A good
300    /// starting point is a small multiple of the core count.
301    ///
302    /// # Examples
303    ///
304    /// ```
305    /// use rate_net::RateLimiter;
306    ///
307    /// let limiter = RateLimiter::per_second(1000).with_shards(64);
308    /// assert_eq!(limiter.shards(), 64);
309    /// ```
310    #[must_use]
311    pub fn with_shards(self, shards: usize) -> Self {
312        Self::build(
313            self.algorithm,
314            self.quota,
315            self.clock,
316            shards,
317            self.eviction,
318        )
319    }
320
321    /// Sets the eviction policy, discarding any per-key state.
322    ///
323    /// Intended immediately after construction. The default policy bounds memory
324    /// with a generous key-capacity cap; override it to tune the cap or add an
325    /// idle TTL.
326    ///
327    /// # Examples
328    ///
329    /// ```
330    /// use rate_net::{RateLimiter, Eviction};
331    /// use std::time::Duration;
332    ///
333    /// let limiter = RateLimiter::per_second(1000)
334    ///     .with_eviction(Eviction::capacity(100_000).with_idle(Duration::from_secs(300)));
335    /// assert_eq!(limiter.eviction().max_keys(), Some(100_000));
336    /// ```
337    #[must_use]
338    pub fn with_eviction(self, eviction: Eviction) -> Self {
339        Self::build(
340            self.algorithm,
341            self.quota,
342            self.clock,
343            self.shards,
344            eviction,
345        )
346    }
347
348    /// Checks a single unit against `key`.
349    ///
350    /// Returns [`Decision::Allow`] if the key is within its limit (the unit is
351    /// counted), or [`Decision::Deny`] with the wait until it would be admitted.
352    /// The key can be anything that converts into a [`Key`] — a string, an IP
353    /// address, a user id.
354    ///
355    /// # Examples
356    ///
357    /// ```
358    /// use rate_net::{RateLimiter, Decision};
359    ///
360    /// let limiter = RateLimiter::per_second(1);
361    /// assert_eq!(limiter.check("user:42"), Decision::Allow);
362    /// assert!(limiter.check("user:42").is_deny()); // limit reached
363    /// ```
364    #[inline]
365    pub fn check(&self, key: impl Into<Key>) -> Decision {
366        self.check_inner(key.into(), 1)
367    }
368
369    /// Checks `n` units against `key` in one operation.
370    ///
371    /// Useful when a single request costs more than one unit (a batch, a
372    /// weighted endpoint). Either all `n` units are admitted or none are.
373    /// Requesting `0` always succeeds; requesting more than the quota can never
374    /// succeed, and the denial's `retry_after` is [`Duration::MAX`].
375    ///
376    /// [`Duration::MAX`]: std::time::Duration::MAX
377    ///
378    /// # Examples
379    ///
380    /// ```
381    /// use rate_net::{RateLimiter, Decision};
382    ///
383    /// let limiter = RateLimiter::per_second(10);
384    /// assert_eq!(limiter.check_n("tenant:acme", 4), Decision::Allow);
385    /// assert_eq!(limiter.check_n("tenant:acme", 6), Decision::Allow);
386    /// assert!(limiter.check_n("tenant:acme", 1).is_deny()); // 10 spent
387    /// ```
388    #[inline]
389    pub fn check_n(&self, key: impl Into<Key>, n: u32) -> Decision {
390        self.check_inner(key.into(), n)
391    }
392
393    /// The quota every key is limited to.
394    ///
395    /// # Examples
396    ///
397    /// ```
398    /// use rate_net::RateLimiter;
399    ///
400    /// assert_eq!(RateLimiter::per_second(50).quota().limit(), 50);
401    /// ```
402    #[must_use]
403    pub fn quota(&self) -> Quota {
404        self.quota
405    }
406
    /// The algorithm this limiter applies.
    ///
    /// Returns the configured [`Algorithm`] by value.
    ///
    /// # Examples
    ///
    /// ```
    /// use rate_net::{RateLimiter, Algorithm};
    ///
    /// assert_eq!(RateLimiter::per_second(1).algorithm(), Algorithm::TokenBucket);
    /// ```
    #[must_use]
    pub const fn algorithm(&self) -> Algorithm {
        self.algorithm
    }
420
    /// The eviction policy bounding the per-key store.
    ///
    /// Returns the configured [`Eviction`] by value — the same policy the
    /// store was built with.
    ///
    /// # Examples
    ///
    /// ```
    /// use rate_net::{RateLimiter, DEFAULT_MAX_KEYS};
    ///
    /// assert_eq!(RateLimiter::per_second(1).eviction().max_keys(), Some(DEFAULT_MAX_KEYS));
    /// ```
    #[must_use]
    pub const fn eviction(&self) -> Eviction {
        self.eviction
    }
434
435    /// The number of shards the per-key store is split across (a power of two).
436    ///
437    /// # Examples
438    ///
439    /// ```
440    /// use rate_net::RateLimiter;
441    ///
442    /// assert_eq!(RateLimiter::per_second(1).with_shards(32).shards(), 32);
443    /// ```
444    #[must_use]
445    pub fn shards(&self) -> usize {
446        self.store.shard_count()
447    }
448
449    /// The number of keys with live state right now.
450    ///
451    /// A momentary snapshot, advisory under concurrent access — and bounded by
452    /// the [eviction](Self::eviction) policy.
453    ///
454    /// # Examples
455    ///
456    /// ```
457    /// use rate_net::RateLimiter;
458    ///
459    /// let limiter = RateLimiter::per_second(1);
460    /// assert_eq!(limiter.tracked_keys(), 0);
461    /// let _ = limiter.check("a");
462    /// assert_eq!(limiter.tracked_keys(), 1);
463    /// ```
464    #[must_use]
465    pub fn tracked_keys(&self) -> usize {
466        self.store.len()
467    }
468
469    /// The shared check path: hand the key to the store as of the elapsed time,
470    /// seeding fresh per-key state if this is the first time the key is seen.
471    #[inline]
472    fn check_inner(&self, key: Key, n: u32) -> Decision {
473        let now = if self.reads_clock {
474            self.now()
475        } else {
476            Duration::ZERO
477        };
478        self.store.check(key, n, now, || self.new_state(now))
479    }
480
481    /// Builds fresh per-key state for the configured algorithm and quota,
482    /// anchored at the elapsed time `now`.
483    fn new_state(&self, now: Duration) -> AlgoState<C> {
484        AlgoState::new(self.algorithm, &self.quota, self.clock.clone(), now)
485    }
486
487    /// Monotonic elapsed time since this limiter's epoch, for the window
488    /// algorithms and eviction timestamps. Saturating, so a multi-million-year
489    /// uptime cannot wrap it.
490    fn now(&self) -> Duration {
491        self.clock.now().saturating_duration_since(self.epoch)
492    }
493}
494
495impl<C: Clock + Clone> Limiter for RateLimiter<C> {
496    fn check_n(&self, key: impl Into<Key>, n: u32) -> Decision {
497        self.check_inner(key.into(), n)
498    }
499}
500
501impl<C: Clock + Clone> fmt::Debug for RateLimiter<C> {
502    /// Formats the limiter without exposing any key. Keys can be caller
503    /// identities or other sensitive values, so only the configuration and the
504    /// live key count are shown.
505    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
506        f.debug_struct("RateLimiter")
507            .field("algorithm", &self.algorithm())
508            .field("quota", &self.quota)
509            .field("shards", &self.shards())
510            .field("eviction", &self.eviction)
511            .field("tracked_keys", &self.store.len())
512            .finish()
513    }
514}
515
#[cfg(all(test, not(loom)))]
mod tests {
    #![allow(clippy::unwrap_used)]

    use std::sync::Arc;
    use std::time::Duration;

    use clock_lib::ManualClock;

    use super::{Limiter, RateLimiter};
    use crate::algorithm::Algorithm;
    use crate::decision::Decision;
    use crate::eviction::Eviction;
    use crate::quota::Quota;

    /// A limiter driven by a hand-advanced clock.
    type ManualLimiter = RateLimiter<Arc<ManualClock>>;

    /// A five-per-second limiter on a manual clock, returned with the clock
    /// handle so tests can move time forward themselves.
    fn manual() -> (Arc<ManualClock>, ManualLimiter) {
        let handle = Arc::new(ManualClock::new());
        let limiter = RateLimiter::per_second(5).with_clock(Arc::clone(&handle));
        (handle, limiter)
    }

    /// Issues `times` single-unit checks against `key`, asserting that each
    /// one is admitted.
    fn spend(limiter: &ManualLimiter, key: &str, times: u32) {
        for _ in 0..times {
            assert!(limiter.check(key).is_allow());
        }
    }

    #[test]
    fn test_fresh_key_is_admitted() {
        let limiter = RateLimiter::per_second(1);
        let first = limiter.check("user:1");
        assert_eq!(first, Decision::Allow);
    }

    #[test]
    fn test_quota_is_exhausted_then_refills_on_advance() {
        let (clock, limiter) = manual();

        // Spend the whole allowance.
        for _ in 0..5 {
            assert_eq!(limiter.check("k"), Decision::Allow);
        }

        // The sixth request is refused and says how long to wait.
        let refused = limiter.check("k");
        assert!(refused.is_deny());
        assert!(refused.retry_after().is_some());

        // A full period later the key is admitted again.
        clock.advance(Duration::from_secs(1));
        assert_eq!(limiter.check("k"), Decision::Allow);
    }

    #[test]
    fn test_keys_are_independent() {
        let (_clock, limiter) = manual();

        spend(&limiter, "a", 5);
        assert!(limiter.check("a").is_deny());

        // Exhausting "a" leaves "b" untouched.
        assert!(limiter.check("b").is_allow());
    }

    #[test]
    fn test_check_n_takes_multiple_units_atomically() {
        let (_clock, limiter) = manual();

        // 3 + 2 uses up the quota of five exactly.
        assert_eq!(limiter.check_n("batch", 3), Decision::Allow);
        assert_eq!(limiter.check_n("batch", 2), Decision::Allow);
        assert!(limiter.check_n("batch", 1).is_deny());
    }

    #[test]
    fn test_check_n_zero_always_admits() {
        let (_clock, limiter) = manual();

        spend(&limiter, "k", 5);

        // Even with nothing left, a zero-unit request goes through.
        assert_eq!(limiter.check_n("k", 0), Decision::Allow);
    }

    #[test]
    fn test_request_larger_than_quota_can_never_succeed() {
        let (clock, limiter) = manual();

        let oversized = limiter.check_n("k", 6); // quota is 5
        assert_eq!(
            oversized,
            Decision::Deny {
                retry_after: Duration::MAX
            }
        );

        // Waiting does not help: the request still exceeds the quota.
        clock.advance(Duration::from_secs(10));
        let retried = limiter.check_n("k", 6);
        assert_eq!(retried.retry_after(), Some(Duration::MAX));
    }

    #[test]
    fn test_zero_limit_denies_everything() {
        let quota = Quota::per_second(0);
        let limiter = RateLimiter::with_quota(quota);
        assert!(limiter.check("k").is_deny());
    }

    #[test]
    fn test_partial_refill_admits_proportionally() {
        let clock = Arc::new(ManualClock::new());
        let limiter = RateLimiter::per_second(10).with_clock(Arc::clone(&clock));

        spend(&limiter, "k", 10);
        assert!(limiter.check("k").is_deny());

        // 300 ms at ten per second buys back exactly three units.
        clock.advance(Duration::from_millis(300));
        spend(&limiter, "k", 3);
        assert!(limiter.check("k").is_deny());
    }

    #[test]
    fn test_tracked_keys_counts_distinct_keys() {
        let (_clock, limiter) = manual();
        assert_eq!(limiter.tracked_keys(), 0);

        // "a" appears twice but is one key.
        for key in ["a", "b", "a"] {
            let _ = limiter.check(key);
        }
        assert_eq!(limiter.tracked_keys(), 2);
    }

    #[test]
    fn test_introspection_reports_token_bucket() {
        let reported = RateLimiter::per_second(1).algorithm();
        assert_eq!(reported, Algorithm::TokenBucket);
    }

    #[test]
    fn test_with_shards_rounds_to_power_of_two() {
        let shard_count = RateLimiter::per_second(1).with_shards(5).shards();
        assert_eq!(shard_count, 8);
    }

    #[test]
    fn test_with_eviction_is_reported() {
        let policy = Eviction::capacity(10);
        let limiter = RateLimiter::per_second(1).with_eviction(policy);
        assert_eq!(limiter.eviction().max_keys(), Some(10));
    }

    #[test]
    fn test_unique_key_flood_is_bounded_by_capacity() {
        let limiter = RateLimiter::per_second(1)
            .with_shards(8)
            .with_eviction(Eviction::capacity(100));

        // Far more unique keys than the cap allows.
        for k in 0..50_000u64 {
            let _ = limiter.check(k);
        }

        // Per-shard rounding of a 100-key cap across 8 shards.
        let bound = 100usize.div_ceil(8).max(1) * 8;
        let tracked = limiter.tracked_keys();
        assert!(
            tracked <= bound,
            "flood grew to {} keys, bound {bound}",
            tracked
        );
    }

    #[test]
    fn test_limiter_trait_generic() {
        /// Counts how many of `attempts` single-unit checks are admitted,
        /// going through the trait only.
        fn count_admitted<L: Limiter>(limiter: &L, key: &str, attempts: u32) -> u32 {
            let mut admitted = 0u32;
            for _ in 0..attempts {
                if limiter.check(key).is_allow() {
                    admitted += 1;
                }
            }
            admitted
        }

        let limiter = RateLimiter::per_second(3);
        assert_eq!(count_admitted(&limiter, "k", 10), 3);
    }

    #[test]
    fn test_debug_does_not_leak_keys() {
        let (_clock, limiter) = manual();
        let _ = limiter.check("secret-token-do-not-print");

        let rendered = format!("{limiter:?}");

        // Configuration is shown; the key itself is not.
        assert!(!rendered.contains("secret-token"));
        assert!(rendered.contains("RateLimiter"));
        assert!(rendered.contains("tracked_keys"));
    }
}