rate_net/limiter.rs
1//! The rate limiter and the trait every algorithm shares.
2
3use std::fmt;
4use std::num::NonZeroUsize;
5use std::time::Duration;
6
7use clock_lib::{Clock, Monotonic, SystemClock};
8
9use crate::algo::AlgoState;
10use crate::algorithm::Algorithm;
11use crate::decision::Decision;
12use crate::eviction::Eviction;
13use crate::key::Key;
14use crate::quota::Quota;
15use crate::store::Store;
16
/// Picks the shard count used when the caller does not specify one.
///
/// The count is four shards for every unit of available parallelism, rounded
/// up to the next power of two and kept within `1..=4096`. When the available
/// parallelism cannot be determined, a single unit is assumed. A higher shard
/// count lowers contention between unrelated keys in exchange for a little
/// more memory.
pub(crate) fn default_shard_count() -> usize {
    // Fall back to one unit of parallelism when the platform cannot report it.
    let cores = std::thread::available_parallelism().map_or(1, NonZeroUsize::get);
    let wanted = cores.saturating_mul(4);
    wanted.next_power_of_two().clamp(1, 4096)
}
29
30/// The shared rate-limiting surface, independent of the algorithm behind it.
31///
32/// Every limiter — whatever algorithm it uses — answers the same question:
33/// *is this key allowed right now?* `Limiter` is that contract, so generic code
34/// can hold any limiter and call [`check`](Self::check) without naming the
35/// concrete type or its clock. [`RateLimiter`] is the implementation this crate
36/// ships.
37///
38/// Implementors only need to provide [`check_n`](Self::check_n);
39/// [`check`](Self::check) defaults to one unit.
40///
41/// # Examples
42///
43/// ```
44/// use rate_net::{Limiter, RateLimiter};
45///
46/// // Generic over any limiter implementation.
47/// fn admit_one<L: Limiter>(limiter: &L, key: &str) -> bool {
48/// limiter.check(key).is_allow()
49/// }
50///
51/// let limiter = RateLimiter::per_second(2);
52/// assert!(admit_one(&limiter, "user:1"));
53/// ```
54pub trait Limiter {
55 /// Checks `n` units against `key`, returning the [`Decision`].
56 fn check_n(&self, key: impl Into<Key>, n: u32) -> Decision;
57
58 /// Checks a single unit against `key`. Equivalent to `check_n(key, 1)`.
59 fn check(&self, key: impl Into<Key>) -> Decision {
60 self.check_n(key, 1)
61 }
62}
63
64/// A keyed rate limiter.
65///
66/// Tracks an independent allowance for every key it sees and answers
67/// [`check`](Self::check) in the time it takes to hash the key and run its
68/// bucket. Per-key state lives in a [sharded](Self::with_shards) concurrent
69/// store: unrelated keys land in different shards and never contend, an
70/// existing-key check takes only a shared read lock plus the bucket's atomic
71/// accounting, and memory is bounded by [eviction](Self::with_eviction) so a
72/// flood of unique keys hits a cap instead of growing without limit. The
73/// limiter is `Send + Sync` and is meant to be shared — behind an
74/// [`Arc`](std::sync::Arc), or as a `static` — across all the threads serving
75/// requests.
76///
77/// The default algorithm is the token bucket, whose accounting is delegated to
78/// [`better-bucket`](https://crates.io/crates/better-bucket): each key bursts up
79/// to its [`Quota`] immediately, then sustains the quota rate as the allowance
80/// refills. Time comes from an injectable [`Clock`] — [`SystemClock`] in
81/// production, or a `ManualClock` in tests via [`with_clock`](Self::with_clock).
82///
83/// # Examples
84///
85/// ```
86/// use rate_net::{RateLimiter, Decision};
87///
88/// // 100 requests per second, per key.
89/// let limiter = RateLimiter::per_second(100);
90///
91/// match limiter.check("user:42") {
92/// Decision::Allow => { /* serve the request */ }
93/// Decision::Deny { retry_after } => {
94/// // 429, Retry-After: retry_after
95/// let _ = retry_after;
96/// }
97/// _ => {}
98/// }
99/// ```
100pub struct RateLimiter<C: Clock + Clone = SystemClock> {
101 algorithm: Algorithm,
102 quota: Quota,
103 clock: C,
104 epoch: Monotonic,
105 shards: usize,
106 eviction: Eviction,
107 store: Store<C>,
108 /// Whether the check path must read the clock: the token bucket reads its
109 /// own clock and capacity-only eviction needs no real time, so the common
110 /// case skips the read entirely. Window algorithms (which need `now`) and an
111 /// idle TTL (which measures real elapsed time) turn it on.
112 reads_clock: bool,
113}
114
115impl RateLimiter<SystemClock> {
116 /// Creates a limiter allowing `limit` requests per second, per key, driven
117 /// by the OS monotonic clock.
118 ///
119 /// The headline Tier-1 constructor. A `limit` of `0` yields a limiter that
120 /// denies every request.
121 ///
122 /// # Examples
123 ///
124 /// ```
125 /// use rate_net::RateLimiter;
126 ///
127 /// let limiter = RateLimiter::per_second(10);
128 /// assert_eq!(limiter.quota().limit(), 10);
129 /// ```
130 #[must_use]
131 pub fn per_second(limit: u32) -> Self {
132 Self::with_quota(Quota::per_second(limit))
133 }
134
135 /// Creates a limiter allowing `limit` requests per minute, per key.
136 ///
137 /// # Examples
138 ///
139 /// ```
140 /// use rate_net::RateLimiter;
141 /// use std::time::Duration;
142 ///
143 /// let limiter = RateLimiter::per_minute(600);
144 /// assert_eq!(limiter.quota().period(), Duration::from_secs(60));
145 /// ```
146 #[must_use]
147 pub fn per_minute(limit: u32) -> Self {
148 Self::with_quota(Quota::per_minute(limit))
149 }
150
151 /// Creates a limiter from an explicit [`Quota`], driven by the OS monotonic
152 /// clock, with default sharding and a bounded-memory [`Eviction`] policy.
153 ///
154 /// Use this with [`Quota::rate`] when the window is neither a second nor a
155 /// minute.
156 ///
157 /// # Examples
158 ///
159 /// ```
160 /// use rate_net::{RateLimiter, Quota};
161 /// use std::time::Duration;
162 ///
163 /// let quota = Quota::rate(5, Duration::from_millis(100))?;
164 /// let limiter = RateLimiter::with_quota(quota);
165 /// assert_eq!(limiter.quota().limit(), 5);
166 /// # Ok::<(), rate_net::RateLimiterError>(())
167 /// ```
168 #[must_use]
169 pub fn with_quota(quota: Quota) -> Self {
170 Self::build(
171 Algorithm::default(),
172 quota,
173 SystemClock::new(),
174 default_shard_count(),
175 Eviction::default(),
176 )
177 }
178
179 /// Starts a [`Builder`](crate::Builder) — the Tier-2 path that selects the
180 /// algorithm, quota, burst, shard count, eviction policy, and clock in one
181 /// fluent surface.
182 ///
183 /// # Examples
184 ///
185 /// ```
186 /// use rate_net::{RateLimiter, Eviction};
187 /// use std::time::Duration;
188 ///
189 /// let limiter = RateLimiter::builder()
190 /// .quota(1000, Duration::from_secs(60)) // 1000 / minute
191 /// .burst(50)
192 /// .shards(64)
193 /// .eviction(Eviction::idle(Duration::from_secs(300)))
194 /// .build();
195 /// assert_eq!(limiter.quota().limit(), 1000);
196 /// assert_eq!(limiter.quota().burst(), 50);
197 /// ```
198 pub fn builder() -> crate::builder::Builder<SystemClock> {
199 crate::builder::Builder::new()
200 }
201}
202
203impl<C: Clock + Clone> RateLimiter<C> {
204 /// Assembles a limiter from its parts, anchoring the eviction clock. Shared
205 /// with [`Builder`](crate::Builder).
206 pub(crate) fn build(
207 algorithm: Algorithm,
208 quota: Quota,
209 clock: C,
210 shards: usize,
211 eviction: Eviction,
212 ) -> Self {
213 let epoch = clock.now();
214 let store = Store::new(shards, eviction);
215 // The token bucket reads its own clock and capacity-only eviction orders
216 // by a logical counter, so that combination needs no clock read here.
217 let reads_clock = algorithm != Algorithm::TokenBucket || eviction.idle_ttl().is_some();
218 Self {
219 algorithm,
220 quota,
221 clock,
222 epoch,
223 shards,
224 eviction,
225 store,
226 reads_clock,
227 }
228 }
229
230 /// Replaces the limiter's time source, discarding any per-key state.
231 ///
232 /// This is the clock-injection seam, intended for use immediately after
233 /// construction. Injecting a `ManualClock` makes refill behaviour
234 /// deterministic so window and rollover tests run with no `sleep`. The shard
235 /// count and eviction policy are preserved.
236 ///
237 /// # Examples
238 ///
239 /// ```
240 /// use rate_net::RateLimiter;
241 /// use clock_lib::ManualClock;
242 /// use std::sync::Arc;
243 /// use std::time::Duration;
244 ///
245 /// let clock = Arc::new(ManualClock::new());
246 /// let limiter = RateLimiter::per_second(5).with_clock(Arc::clone(&clock));
247 ///
248 /// // Drain the key's allowance.
249 /// for _ in 0..5 {
250 /// assert!(limiter.check("k").is_allow());
251 /// }
252 /// assert!(limiter.check("k").is_deny());
253 ///
254 /// // Advance one second — no real sleep — and the allowance is back.
255 /// clock.advance(Duration::from_secs(1));
256 /// assert!(limiter.check("k").is_allow());
257 /// ```
258 #[must_use]
259 pub fn with_clock<C2: Clock + Clone>(self, clock: C2) -> RateLimiter<C2> {
260 RateLimiter::build(
261 self.algorithm,
262 self.quota,
263 clock,
264 self.shards,
265 self.eviction,
266 )
267 }
268
269 /// Selects the algorithm, discarding any per-key state.
270 ///
271 /// Intended immediately after construction. The leaky bucket and the window
272 /// algorithms require the `algorithms` feature; without it the only
273 /// selectable variant is [`Algorithm::TokenBucket`].
274 ///
275 /// # Examples
276 ///
277 /// ```
278 /// # #[cfg(feature = "algorithms")] {
279 /// use rate_net::{RateLimiter, Algorithm};
280 ///
281 /// let limiter = RateLimiter::per_second(100).with_algorithm(Algorithm::SlidingWindowCounter);
282 /// assert_eq!(limiter.algorithm(), Algorithm::SlidingWindowCounter);
283 /// # }
284 /// ```
285 #[must_use]
286 pub fn with_algorithm(self, algorithm: Algorithm) -> Self {
287 Self::build(
288 algorithm,
289 self.quota,
290 self.clock,
291 self.shards,
292 self.eviction,
293 )
294 }
295
296 /// Sets the shard count, discarding any per-key state.
297 ///
298 /// Intended immediately after construction. More shards reduce contention
299 /// between unrelated keys; the value is rounded up to a power of two. A good
300 /// starting point is a small multiple of the core count.
301 ///
302 /// # Examples
303 ///
304 /// ```
305 /// use rate_net::RateLimiter;
306 ///
307 /// let limiter = RateLimiter::per_second(1000).with_shards(64);
308 /// assert_eq!(limiter.shards(), 64);
309 /// ```
310 #[must_use]
311 pub fn with_shards(self, shards: usize) -> Self {
312 Self::build(
313 self.algorithm,
314 self.quota,
315 self.clock,
316 shards,
317 self.eviction,
318 )
319 }
320
321 /// Sets the eviction policy, discarding any per-key state.
322 ///
323 /// Intended immediately after construction. The default policy bounds memory
324 /// with a generous key-capacity cap; override it to tune the cap or add an
325 /// idle TTL.
326 ///
327 /// # Examples
328 ///
329 /// ```
330 /// use rate_net::{RateLimiter, Eviction};
331 /// use std::time::Duration;
332 ///
333 /// let limiter = RateLimiter::per_second(1000)
334 /// .with_eviction(Eviction::capacity(100_000).with_idle(Duration::from_secs(300)));
335 /// assert_eq!(limiter.eviction().max_keys(), Some(100_000));
336 /// ```
337 #[must_use]
338 pub fn with_eviction(self, eviction: Eviction) -> Self {
339 Self::build(
340 self.algorithm,
341 self.quota,
342 self.clock,
343 self.shards,
344 eviction,
345 )
346 }
347
348 /// Checks a single unit against `key`.
349 ///
350 /// Returns [`Decision::Allow`] if the key is within its limit (the unit is
351 /// counted), or [`Decision::Deny`] with the wait until it would be admitted.
352 /// The key can be anything that converts into a [`Key`] — a string, an IP
353 /// address, a user id.
354 ///
355 /// # Examples
356 ///
357 /// ```
358 /// use rate_net::{RateLimiter, Decision};
359 ///
360 /// let limiter = RateLimiter::per_second(1);
361 /// assert_eq!(limiter.check("user:42"), Decision::Allow);
362 /// assert!(limiter.check("user:42").is_deny()); // limit reached
363 /// ```
364 #[inline]
365 pub fn check(&self, key: impl Into<Key>) -> Decision {
366 self.check_inner(key.into(), 1)
367 }
368
369 /// Checks `n` units against `key` in one operation.
370 ///
371 /// Useful when a single request costs more than one unit (a batch, a
372 /// weighted endpoint). Either all `n` units are admitted or none are.
373 /// Requesting `0` always succeeds; requesting more than the quota can never
374 /// succeed, and the denial's `retry_after` is [`Duration::MAX`].
375 ///
376 /// [`Duration::MAX`]: std::time::Duration::MAX
377 ///
378 /// # Examples
379 ///
380 /// ```
381 /// use rate_net::{RateLimiter, Decision};
382 ///
383 /// let limiter = RateLimiter::per_second(10);
384 /// assert_eq!(limiter.check_n("tenant:acme", 4), Decision::Allow);
385 /// assert_eq!(limiter.check_n("tenant:acme", 6), Decision::Allow);
386 /// assert!(limiter.check_n("tenant:acme", 1).is_deny()); // 10 spent
387 /// ```
388 #[inline]
389 pub fn check_n(&self, key: impl Into<Key>, n: u32) -> Decision {
390 self.check_inner(key.into(), n)
391 }
392
393 /// The quota every key is limited to.
394 ///
395 /// # Examples
396 ///
397 /// ```
398 /// use rate_net::RateLimiter;
399 ///
400 /// assert_eq!(RateLimiter::per_second(50).quota().limit(), 50);
401 /// ```
402 #[must_use]
403 pub fn quota(&self) -> Quota {
404 self.quota
405 }
406
407 /// The algorithm this limiter applies.
408 ///
409 /// # Examples
410 ///
411 /// ```
412 /// use rate_net::{RateLimiter, Algorithm};
413 ///
414 /// assert_eq!(RateLimiter::per_second(1).algorithm(), Algorithm::TokenBucket);
415 /// ```
416 #[must_use]
417 pub const fn algorithm(&self) -> Algorithm {
418 self.algorithm
419 }
420
421 /// The eviction policy bounding the per-key store.
422 ///
423 /// # Examples
424 ///
425 /// ```
426 /// use rate_net::{RateLimiter, DEFAULT_MAX_KEYS};
427 ///
428 /// assert_eq!(RateLimiter::per_second(1).eviction().max_keys(), Some(DEFAULT_MAX_KEYS));
429 /// ```
430 #[must_use]
431 pub const fn eviction(&self) -> Eviction {
432 self.eviction
433 }
434
435 /// The number of shards the per-key store is split across (a power of two).
436 ///
437 /// # Examples
438 ///
439 /// ```
440 /// use rate_net::RateLimiter;
441 ///
442 /// assert_eq!(RateLimiter::per_second(1).with_shards(32).shards(), 32);
443 /// ```
444 #[must_use]
445 pub fn shards(&self) -> usize {
446 self.store.shard_count()
447 }
448
449 /// The number of keys with live state right now.
450 ///
451 /// A momentary snapshot, advisory under concurrent access — and bounded by
452 /// the [eviction](Self::eviction) policy.
453 ///
454 /// # Examples
455 ///
456 /// ```
457 /// use rate_net::RateLimiter;
458 ///
459 /// let limiter = RateLimiter::per_second(1);
460 /// assert_eq!(limiter.tracked_keys(), 0);
461 /// let _ = limiter.check("a");
462 /// assert_eq!(limiter.tracked_keys(), 1);
463 /// ```
464 #[must_use]
465 pub fn tracked_keys(&self) -> usize {
466 self.store.len()
467 }
468
469 /// The shared check path: hand the key to the store as of the elapsed time,
470 /// seeding fresh per-key state if this is the first time the key is seen.
471 #[inline]
472 fn check_inner(&self, key: Key, n: u32) -> Decision {
473 let now = if self.reads_clock {
474 self.now()
475 } else {
476 Duration::ZERO
477 };
478 self.store.check(key, n, now, || self.new_state(now))
479 }
480
481 /// Builds fresh per-key state for the configured algorithm and quota,
482 /// anchored at the elapsed time `now`.
483 fn new_state(&self, now: Duration) -> AlgoState<C> {
484 AlgoState::new(self.algorithm, &self.quota, self.clock.clone(), now)
485 }
486
487 /// Monotonic elapsed time since this limiter's epoch, for the window
488 /// algorithms and eviction timestamps. Saturating, so a multi-million-year
489 /// uptime cannot wrap it.
490 fn now(&self) -> Duration {
491 self.clock.now().saturating_duration_since(self.epoch)
492 }
493}
494
495impl<C: Clock + Clone> Limiter for RateLimiter<C> {
496 fn check_n(&self, key: impl Into<Key>, n: u32) -> Decision {
497 self.check_inner(key.into(), n)
498 }
499}
500
501impl<C: Clock + Clone> fmt::Debug for RateLimiter<C> {
502 /// Formats the limiter without exposing any key. Keys can be caller
503 /// identities or other sensitive values, so only the configuration and the
504 /// live key count are shown.
505 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
506 f.debug_struct("RateLimiter")
507 .field("algorithm", &self.algorithm())
508 .field("quota", &self.quota)
509 .field("shards", &self.shards())
510 .field("eviction", &self.eviction)
511 .field("tracked_keys", &self.store.len())
512 .finish()
513 }
514}
515
#[cfg(all(test, not(loom)))]
mod tests {
    // Kept from the original module: unwrapping is permitted in test code.
    #![allow(clippy::unwrap_used)]

    use std::sync::Arc;
    use std::time::Duration;

    use clock_lib::ManualClock;

    use super::{Limiter, RateLimiter};
    use crate::algorithm::Algorithm;
    use crate::decision::Decision;
    use crate::eviction::Eviction;
    use crate::quota::Quota;

    /// A five-per-second limiter driven by a manual clock, returned together
    /// with a handle to that clock so tests can advance time.
    fn manual() -> (Arc<ManualClock>, RateLimiter<Arc<ManualClock>>) {
        let clock = Arc::new(ManualClock::new());
        let handle = Arc::clone(&clock);
        (clock, RateLimiter::per_second(5).with_clock(handle))
    }

    #[test]
    fn test_fresh_key_is_admitted() {
        let limiter = RateLimiter::per_second(1);
        let first = limiter.check("user:1");
        assert_eq!(first, Decision::Allow);
    }

    #[test]
    fn test_quota_is_exhausted_then_refills_on_advance() {
        let (clock, limiter) = manual();

        // Spend the whole allowance.
        (0..5).for_each(|_| assert_eq!(limiter.check("k"), Decision::Allow));

        let denied = limiter.check("k");
        assert!(denied.is_deny());
        assert!(denied.retry_after().is_some());

        // One second later the key is admitted again.
        clock.advance(Duration::from_secs(1));
        assert_eq!(limiter.check("k"), Decision::Allow);
    }

    #[test]
    fn test_keys_are_independent() {
        let (_clock, limiter) = manual();

        (0..5).for_each(|_| assert!(limiter.check("a").is_allow()));
        assert!(limiter.check("a").is_deny());

        // Exhausting "a" leaves "b" untouched.
        assert!(limiter.check("b").is_allow());
    }

    #[test]
    fn test_check_n_takes_multiple_units_atomically() {
        let (_clock, limiter) = manual();
        let first = limiter.check_n("batch", 3);
        assert_eq!(first, Decision::Allow);
        let second = limiter.check_n("batch", 2);
        assert_eq!(second, Decision::Allow);
        let third = limiter.check_n("batch", 1);
        assert!(third.is_deny());
    }

    #[test]
    fn test_check_n_zero_always_admits() {
        let (_clock, limiter) = manual();
        (0..5).for_each(|_| assert!(limiter.check("k").is_allow()));
        // The allowance is spent, yet a zero-unit request still passes.
        assert_eq!(limiter.check_n("k", 0), Decision::Allow);
    }

    #[test]
    fn test_request_larger_than_quota_can_never_succeed() {
        let (clock, limiter) = manual();
        let never = Decision::Deny {
            retry_after: Duration::MAX,
        };
        // The quota is 5, so 6 units cannot fit.
        assert_eq!(limiter.check_n("k", 6), never);

        clock.advance(Duration::from_secs(10));
        let later = limiter.check_n("k", 6);
        assert_eq!(later.retry_after(), Some(Duration::MAX));
    }

    #[test]
    fn test_zero_limit_denies_everything() {
        let quota = Quota::per_second(0);
        let limiter = RateLimiter::with_quota(quota);
        assert!(limiter.check("k").is_deny());
    }

    #[test]
    fn test_partial_refill_admits_proportionally() {
        let clock = Arc::new(ManualClock::new());
        let limiter = RateLimiter::per_second(10).with_clock(Arc::clone(&clock));
        (0..10).for_each(|_| assert!(limiter.check("k").is_allow()));
        assert!(limiter.check("k").is_deny());

        // 300 ms at 10 per second admits exactly three more requests.
        clock.advance(Duration::from_millis(300));
        for _ in 0..3 {
            assert!(limiter.check("k").is_allow());
        }
        assert!(limiter.check("k").is_deny());
    }

    #[test]
    fn test_tracked_keys_counts_distinct_keys() {
        let (_clock, limiter) = manual();
        assert_eq!(limiter.tracked_keys(), 0);
        for key in ["a", "b", "a"] {
            let _ = limiter.check(key);
        }
        assert_eq!(limiter.tracked_keys(), 2);
    }

    #[test]
    fn test_introspection_reports_token_bucket() {
        let reported = RateLimiter::per_second(1).algorithm();
        assert_eq!(reported, Algorithm::TokenBucket);
    }

    #[test]
    fn test_with_shards_rounds_to_power_of_two() {
        let rounded = RateLimiter::per_second(1).with_shards(5).shards();
        assert_eq!(rounded, 8);
    }

    #[test]
    fn test_with_eviction_is_reported() {
        let policy = Eviction::capacity(10);
        let limiter = RateLimiter::per_second(1).with_eviction(policy);
        assert_eq!(limiter.eviction().max_keys(), Some(10));
    }

    #[test]
    fn test_unique_key_flood_is_bounded_by_capacity() {
        let limiter = RateLimiter::per_second(1)
            .with_shards(8)
            .with_eviction(Eviction::capacity(100));
        (0..50_000u64).for_each(|k| {
            let _ = limiter.check(k);
        });
        // Per-shard rounding of a 100-key cap across 8 shards.
        let bound = 100usize.div_ceil(8).max(1) * 8;
        let tracked = limiter.tracked_keys();
        assert!(
            tracked <= bound,
            "flood grew to {} keys, bound {bound}",
            limiter.tracked_keys()
        );
    }

    #[test]
    fn test_limiter_trait_generic() {
        /// Counts how many of `attempts` checks are admitted, going through
        /// the `Limiter` trait only.
        fn count_admitted<L: Limiter>(limiter: &L, key: &str, attempts: u32) -> u32 {
            (0..attempts)
                .map(|_| u32::from(limiter.check(key).is_allow()))
                .sum()
        }
        let limiter = RateLimiter::per_second(3);
        assert_eq!(count_admitted(&limiter, "k", 10), 3);
    }

    #[test]
    fn test_debug_does_not_leak_keys() {
        let (_clock, limiter) = manual();
        let _ = limiter.check("secret-token-do-not-print");
        let rendered = format!("{limiter:?}");
        assert!(!rendered.contains("secret-token"));
        for expected in ["RateLimiter", "tracked_keys"] {
            assert!(rendered.contains(expected));
        }
    }
}