lmn_core/execution/rate_limit.rs
1use std::num::NonZeroU32;
2use std::sync::Arc;
3
4use governor::{DefaultDirectRateLimiter, Quota, RateLimiter};
5
6// ── RpsLimiter ────────────────────────────────────────────────────────────────
7
/// Shared request-rate limiter used to cap aggregate throughput across all VUs.
///
/// Wraps a `governor` token-bucket so VUs can `.acquire().await` one permit per
/// request. Permits refill smoothly at the configured rate, so callers see a
/// steady drip rather than a once-per-second burst.
///
/// The limiter is intended to be shared across many VUs behind an `Arc` and is
/// `Send + Sync`. `RpsLimiter::new` already hands the limiter back wrapped in
/// an `Arc`, so callers only need to clone that handle for each VU.
pub struct RpsLimiter {
    // Wrapped `governor` rate limiter; `acquire` awaits its permits from here.
    inner: DefaultDirectRateLimiter,
}
19
20impl RpsLimiter {
21 /// Constructs a new limiter that permits at most `rps` requests per second.
22 ///
23 /// Returns `None` if `rps` is zero or does not fit in `u32`. Callers should
24 /// interpret `None` as "no rate limit configured" and skip wiring the
25 /// limiter into VUs.
26 pub fn new(rps: usize) -> Option<Arc<Self>> {
27 let rps_u32: u32 = u32::try_from(rps).ok()?;
28 let nz = NonZeroU32::new(rps_u32)?;
29 Some(Arc::new(Self {
30 inner: RateLimiter::direct(Quota::per_second(nz)),
31 }))
32 }
33
34 /// Awaits until one permit is available, then claims it.
35 ///
36 /// Callers should typically wrap this in a `tokio::select!` against a
37 /// cancellation token so VUs do not block past the end of a run.
38 pub async fn acquire(&self) {
39 self.inner.until_ready().await;
40 }
41}
42
43// ── Tests ─────────────────────────────────────────────────────────────────────
44
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::{Duration, Instant};

    /// A rate of zero cannot form a quota, so no limiter is produced.
    #[test]
    fn rps_limiter_zero_returns_none() {
        let limiter = RpsLimiter::new(0);
        assert!(limiter.is_none());
    }

    /// Any positive rate that fits in `u32` produces a limiter.
    #[test]
    fn rps_limiter_valid_rps_returns_some() {
        let limiter = RpsLimiter::new(100);
        assert!(limiter.is_some());
    }

    /// The very first permit must not make the caller wait.
    #[tokio::test(flavor = "current_thread")]
    async fn first_acquire_is_immediate() {
        let limiter = RpsLimiter::new(10).expect("rps=10 is valid");

        let started = Instant::now();
        limiter.acquire().await;
        let waited = started.elapsed();

        assert!(
            waited < Duration::from_millis(50),
            "first acquire should be near-instant (bucket starts full)"
        );
    }
}