Skip to main content

lmn_core/execution/
rate_limit.rs

1use std::num::NonZeroU32;
2use std::sync::Arc;
3
4use governor::{DefaultDirectRateLimiter, Quota, RateLimiter};
5
6// ── RpsLimiter ────────────────────────────────────────────────────────────────
7
8/// Shared request-rate limiter used to cap aggregate throughput across all VUs.
9///
10/// Wraps a `governor` token-bucket so VUs can `.acquire().await` one permit per
11/// request. Permits refill smoothly at the configured rate, so callers see a
12/// steady drip rather than a once-per-second burst.
13///
14/// The limiter is intended to be shared across many VUs behind an `Arc` and is
15/// `Send + Sync`.
16pub struct RpsLimiter {
17    inner: DefaultDirectRateLimiter,
18}
19
20impl RpsLimiter {
21    /// Constructs a new limiter that permits at most `rps` requests per second.
22    ///
23    /// Returns `None` if `rps` is zero or does not fit in `u32`. Callers should
24    /// interpret `None` as "no rate limit configured" and skip wiring the
25    /// limiter into VUs.
26    pub fn new(rps: usize) -> Option<Arc<Self>> {
27        let rps_u32: u32 = u32::try_from(rps).ok()?;
28        let nz = NonZeroU32::new(rps_u32)?;
29        Some(Arc::new(Self {
30            inner: RateLimiter::direct(Quota::per_second(nz)),
31        }))
32    }
33
34    /// Awaits until one permit is available, then claims it.
35    ///
36    /// Callers should typically wrap this in a `tokio::select!` against a
37    /// cancellation token so VUs do not block past the end of a run.
38    pub async fn acquire(&self) {
39        self.inner.until_ready().await;
40    }
41}
42
43// ── Tests ─────────────────────────────────────────────────────────────────────
44
#[cfg(test)]
mod tests {
    use std::time::{Duration, Instant};

    use super::*;

    /// A rate of zero must be rejected by the constructor.
    #[test]
    fn rps_limiter_zero_returns_none() {
        let limiter = RpsLimiter::new(0);
        assert!(limiter.is_none());
    }

    /// An ordinary positive rate must produce a limiter.
    #[test]
    fn rps_limiter_valid_rps_returns_some() {
        let limiter = RpsLimiter::new(100);
        assert!(limiter.is_some());
    }

    /// The very first permit of a new limiter should be handed out without
    /// any noticeable wait.
    #[tokio::test(flavor = "current_thread")]
    async fn first_acquire_is_immediate() {
        let limiter = RpsLimiter::new(10).expect("rps=10 is valid");

        let started_at = Instant::now();
        limiter.acquire().await;
        let waited = started_at.elapsed();

        assert!(
            waited < Duration::from_millis(50),
            "first acquire should be near-instant (bucket starts full)"
        );
    }
}