Skip to main content

mkt_core/http/
rate_limit.rs

1//! Token-bucket rate limiter for API calls.
2//!
3//! Backed by [`governor`] (GCRA): `acquire` genuinely waits until the
4//! bucket has room, unlike the previous semaphore that released permits
5//! immediately and limited nothing. Each provider client constructs its
6//! limiter with a conservative requests-per-second default; reads cost 1
7//! cell and writes 3, so writes consume budget faster.
8
9use std::num::NonZeroU32;
10use std::sync::Arc;
11
12use governor::clock::DefaultClock;
13use governor::state::{InMemoryState, NotKeyed};
14use governor::{Quota, RateLimiter as GovernorLimiter};
15
16type DirectLimiter = GovernorLimiter<NotKeyed, InMemoryState, DefaultClock>;
17
18/// A client-side requests-per-second limiter shared by one provider client.
19#[derive(Debug, Clone)]
20pub struct RateLimiter {
21    limiter: Arc<DirectLimiter>,
22    max_cost: u32,
23}
24
25impl RateLimiter {
26    /// Create a limiter allowing `per_second` request cells per second
27    /// (burst capacity equals one second of budget, minimum 3 so a
28    /// single write always fits).
29    #[must_use]
30    pub fn new(per_second: u32) -> Self {
31        let rate = NonZeroU32::new(per_second.max(1)).unwrap_or(NonZeroU32::MIN);
32        let burst = NonZeroU32::new(per_second.max(3)).unwrap_or(NonZeroU32::MIN);
33        Self {
34            limiter: Arc::new(GovernorLimiter::direct(
35                Quota::per_second(rate).allow_burst(burst),
36            )),
37            max_cost: burst.get(),
38        }
39    }
40
41    /// Wait until `cost` cells are available (1 = read, 3 = write).
42    ///
43    /// Costs above the burst capacity are clamped so the call can ever
44    /// complete.
45    ///
46    /// # Errors
47    ///
48    /// Currently infallible; `Result` is kept for API stability.
49    pub async fn acquire(&self, cost: u32) -> crate::error::Result<()> {
50        let cost = cost.clamp(1, self.max_cost);
51        let cells = NonZeroU32::new(cost).unwrap_or(NonZeroU32::MIN);
52        // until_n_ready only errors when cells > burst, which the clamp
53        // above rules out.
54        if let Err(error) = self.limiter.until_n_ready(cells).await {
55            return Err(crate::error::MktError::ConfigError(format!(
56                "rate limiter misconfigured: {error}"
57            )));
58        }
59        Ok(())
60    }
61}
62
63#[cfg(test)]
64mod tests {
65    #![allow(clippy::unwrap_used)]
66
67    use std::time::{Duration, Instant};
68
69    use super::*;
70
71    #[tokio::test]
72    async fn burst_within_budget_is_immediate() {
73        let limiter = RateLimiter::new(100);
74        let start = Instant::now();
75        for _ in 0..5 {
76            limiter.acquire(1).await.unwrap();
77        }
78        assert!(
79            start.elapsed() < Duration::from_millis(100),
80            "within-burst acquires must not block, took {:?}",
81            start.elapsed()
82        );
83    }
84
85    #[tokio::test]
86    async fn exceeding_the_rate_actually_waits() {
87        // 5 cells/s, burst 5: the 6th cell must wait ~200ms for refill.
88        let limiter = RateLimiter::new(5);
89        let start = Instant::now();
90        for _ in 0..6 {
91            limiter.acquire(1).await.unwrap();
92        }
93        assert!(
94            start.elapsed() >= Duration::from_millis(120),
95            "the limiter must actually delay past the burst, took {:?}",
96            start.elapsed()
97        );
98    }
99
100    #[tokio::test]
101    async fn write_cost_exceeding_burst_is_clamped_not_stuck() {
102        let limiter = RateLimiter::new(1); // burst raised to 3 internally
103        limiter.acquire(3).await.unwrap();
104        // A pathological cost above burst still completes.
105        let limiter = RateLimiter::new(1);
106        limiter.acquire(100).await.unwrap();
107    }
108}