mkt_core/http/
rate_limit.rs1use std::num::NonZeroU32;
10use std::sync::Arc;
11
12use governor::clock::DefaultClock;
13use governor::state::{InMemoryState, NotKeyed};
14use governor::{Quota, RateLimiter as GovernorLimiter};
15
16type DirectLimiter = GovernorLimiter<NotKeyed, InMemoryState, DefaultClock>;
17
18#[derive(Debug, Clone)]
20pub struct RateLimiter {
21 limiter: Arc<DirectLimiter>,
22 max_cost: u32,
23}
24
25impl RateLimiter {
26 #[must_use]
30 pub fn new(per_second: u32) -> Self {
31 let rate = NonZeroU32::new(per_second.max(1)).unwrap_or(NonZeroU32::MIN);
32 let burst = NonZeroU32::new(per_second.max(3)).unwrap_or(NonZeroU32::MIN);
33 Self {
34 limiter: Arc::new(GovernorLimiter::direct(
35 Quota::per_second(rate).allow_burst(burst),
36 )),
37 max_cost: burst.get(),
38 }
39 }
40
41 pub async fn acquire(&self, cost: u32) -> crate::error::Result<()> {
50 let cost = cost.clamp(1, self.max_cost);
51 let cells = NonZeroU32::new(cost).unwrap_or(NonZeroU32::MIN);
52 if let Err(error) = self.limiter.until_n_ready(cells).await {
55 return Err(crate::error::MktError::ConfigError(format!(
56 "rate limiter misconfigured: {error}"
57 )));
58 }
59 Ok(())
60 }
61}
62
#[cfg(test)]
mod tests {
    // Unwrapping is fine here: a failed acquire should fail the test at once.
    #![allow(clippy::unwrap_used)]

    use std::time::{Duration, Instant};

    use super::*;

    /// Five single-cell acquires fit inside a burst of 100 and must not block.
    #[tokio::test]
    async fn burst_within_budget_is_immediate() {
        let bucket = RateLimiter::new(100);
        let began = Instant::now();
        for _ in 0..5 {
            bucket.acquire(1).await.unwrap();
        }
        assert!(
            began.elapsed() < Duration::from_millis(100),
            "within-burst acquires must not block, took {:?}",
            began.elapsed()
        );
    }

    /// With a burst of 5, the sixth single-cell acquire has to wait for a
    /// cell to be replenished.
    #[tokio::test]
    async fn exceeding_the_rate_actually_waits() {
        let bucket = RateLimiter::new(5);
        let began = Instant::now();
        for _ in 0..6 {
            bucket.acquire(1).await.unwrap();
        }
        assert!(
            began.elapsed() >= Duration::from_millis(120),
            "the limiter must actually delay past the burst, took {:?}",
            began.elapsed()
        );
    }

    /// A cost equal to the burst succeeds, and a cost far above the burst is
    /// clamped instead of erroring or hanging.
    #[tokio::test]
    async fn write_cost_exceeding_burst_is_clamped_not_stuck() {
        // `new(1)` still grants a burst of 3, so a cost of exactly 3 fits.
        let exact_burst = RateLimiter::new(1);
        exact_burst.acquire(3).await.unwrap();

        // A fresh limiter: a cost of 100 is clamped down to the burst size.
        let oversized = RateLimiter::new(1);
        oversized.acquire(100).await.unwrap();
    }
}