rmcp_server_kit/bounded_limiter.rs
1//! Memory-bounded keyed rate limiter.
2//!
3//! [`crate::bounded_limiter::BoundedKeyedLimiter`] wraps a map of per-key
4//! [`governor::DefaultDirectRateLimiter`] instances behind a hard cap on the
5//! number of tracked keys, with an idle-eviction policy and an LRU fallback
6//! when the cap is reached.
7//!
8//! # Why
9//!
10//! The `governor` crate ships a [`governor::RateLimiter::keyed`] state store
11//! whose memory grows monotonically with the number of distinct keys
12//! observed. For server use cases keyed by source IP this is a
13//! denial-of-service vector: an attacker spraying packets from spoofed or
14//! distinct source addresses can exhaust process memory regardless of the
15//! per-key quota.
16//!
17//! [`crate::bounded_limiter::BoundedKeyedLimiter`] addresses this by:
18//!
19//! 1. Holding a [`std::collections::HashMap`] of `K -> Entry` where each
20//! `Entry` carries its own direct (per-key) limiter and a `last_seen`
21//! timestamp.
22//! 2. Capping the map at `max_tracked_keys` entries.
23//! 3. On insert when the map is full, first pruning entries whose
24//! `last_seen` is older than `idle_eviction`, then -- if still full --
25//! evicting the entry with the oldest `last_seen` ("LRU eviction").
26//! The new key is **always** inserted; honest new clients are never
27//! rejected because the table is full.
//! 4. Updating `last_seen` on **every** check (including rate-limit
//!    rejections) so an actively-firing attacker never looks idle and
//!    therefore cannot get itself evicted into a fresh quota.
31//! 5. Optionally spawning a best-effort background prune task. Cap
32//! enforcement does **not** depend on this task running -- it is
33//! purely an optimization that reclaims memory between admission
34//! events.
35//!
36//! # Trade-offs
37//!
//! - When a previously-evicted key reappears it gets a **fresh** quota.
//!   This is documented behaviour: a key under sustained load keeps its
//!   `last_seen` current, so it is never idle-pruned and is the last
//!   candidate for LRU eviction. It can only be evicted when the table is
//!   full and every other tracked key has been seen at least as recently.
42//! - The map uses [`std::sync::Mutex`] (not [`tokio::sync::Mutex`]) since
43//! admission checks must be synchronous and never `.await`.
44//! - We do not log inside the critical section.
45
46use std::{
47 collections::HashMap,
48 hash::Hash,
49 num::NonZeroU32,
50 sync::{Arc, Mutex, PoisonError, Weak},
51 time::{Duration, Instant},
52};
53
54use governor::{DefaultDirectRateLimiter, Quota, RateLimiter};
55
56/// Reason a [`BoundedKeyedLimiter::check_key`] call rejected a request.
57///
58/// Currently only carries a single variant; modelled as an enum (rather
59/// than a unit struct) so callers can `match` exhaustively and to leave
60/// room for future reasons (e.g. burst-debt or distinct quota classes).
61#[non_exhaustive]
62#[derive(Debug, Clone, Copy, PartialEq, Eq, thiserror::Error)]
63pub enum BoundedLimiterError {
64 /// The key has exceeded its per-key quota for the current window.
65 #[error("rate limit exceeded for key")]
66 RateLimited,
67}
68
69/// Per-key limiter entry: the underlying direct limiter plus the wall-clock
70/// timestamp of the most recent admission attempt for this key.
71struct Entry {
72 limiter: DefaultDirectRateLimiter,
73 last_seen: Instant,
74}
75
76/// Inner shared state. Held behind an [`Arc`] in [`BoundedKeyedLimiter`]
77/// and a [`Weak`] inside the optional background prune task so the task
78/// self-terminates once the limiter is dropped.
79struct Inner<K: Eq + Hash + Clone> {
80 map: Mutex<HashMap<K, Entry>>,
81 quota: Quota,
82 max_tracked_keys: usize,
83 idle_eviction: Duration,
84}
85
86/// Memory-bounded keyed rate limiter.
87///
88/// Cheaply cloneable; clones share state.
89#[allow(
90 missing_debug_implementations,
91 reason = "wraps governor RateLimiter which has no Debug impl"
92)]
93pub struct BoundedKeyedLimiter<K: Eq + Hash + Clone> {
94 inner: Arc<Inner<K>>,
95}
96
97impl<K: Eq + Hash + Clone> Clone for BoundedKeyedLimiter<K> {
98 fn clone(&self) -> Self {
99 Self {
100 inner: Arc::clone(&self.inner),
101 }
102 }
103}
104
105impl<K: Eq + Hash + Clone + Send + Sync + 'static> BoundedKeyedLimiter<K> {
106 /// Create a new bounded keyed limiter.
107 ///
108 /// * `quota` -- the per-key rate-limit quota applied to every entry.
109 /// * `max_tracked_keys` -- hard cap on the number of simultaneously
110 /// tracked keys. When reached, an insert first prunes idle entries
111 /// then falls back to LRU eviction.
112 /// * `idle_eviction` -- entries whose `last_seen` is older than this
113 /// are eligible for opportunistic pruning.
114 ///
115 /// # Background prune task
116 ///
117 /// If a Tokio runtime is available at construction time, a best-effort
118 /// background task is spawned that periodically prunes idle entries.
119 /// Cap enforcement does **not** depend on this task; it is purely an
120 /// optimisation that reclaims memory between admission events. The
121 /// task self-terminates when the last [`BoundedKeyedLimiter`] clone is
122 /// dropped (it holds only a [`Weak`] reference to the inner state).
123 ///
124 /// If no Tokio runtime is available (e.g. unit tests using
125 /// `#[test]` rather than `#[tokio::test]`), no task is spawned and
126 /// pruning happens lazily on every full-table insert. Both behaviours
127 /// are correct.
128 #[must_use]
129 pub(crate) fn new(quota: Quota, max_tracked_keys: usize, idle_eviction: Duration) -> Self {
130 let inner = Arc::new(Inner {
131 map: Mutex::new(HashMap::new()),
132 quota,
133 max_tracked_keys,
134 idle_eviction,
135 });
136 Self::spawn_prune_task(&inner);
137 Self { inner }
138 }
139
140 /// Construct a [`BoundedKeyedLimiter`] with a per-minute quota.
141 ///
142 /// Convenience constructor that builds a per-minute [`Quota`] from
143 /// `requests_per_minute`. The rate is clamped to a minimum of `1`
144 /// request/min so a misconfigured `0` does not panic at startup.
145 ///
146 /// * `requests_per_minute` -- per-key rate, clamped to `>= 1`.
147 /// * `max_tracked_keys` -- hard cap on simultaneously tracked keys.
148 /// When reached, an insert first prunes idle entries then falls
149 /// back to LRU eviction.
150 /// * `idle_eviction` -- entries whose `last_seen` is older than this
151 /// are eligible for opportunistic pruning.
152 #[must_use]
153 pub fn with_per_minute(
154 requests_per_minute: u32,
155 max_tracked_keys: usize,
156 idle_eviction: Duration,
157 ) -> Self {
158 let rate = NonZeroU32::new(requests_per_minute.max(1)).unwrap_or(NonZeroU32::MIN);
159 Self::new(Quota::per_minute(rate), max_tracked_keys, idle_eviction)
160 }
161
162 /// Construct a [`BoundedKeyedLimiter`] with a per-second quota.
163 ///
164 /// Convenience constructor that builds a per-second [`Quota`] from
165 /// `requests_per_second`. The rate is clamped to a minimum of `1`
166 /// request/sec so a misconfigured `0` does not panic at startup.
167 ///
168 /// * `requests_per_second` -- per-key rate, clamped to `>= 1`.
169 /// * `max_tracked_keys` -- hard cap on simultaneously tracked keys.
170 /// When reached, an insert first prunes idle entries then falls
171 /// back to LRU eviction.
172 /// * `idle_eviction` -- entries whose `last_seen` is older than this
173 /// are eligible for opportunistic pruning.
174 #[must_use]
175 pub fn with_per_second(
176 requests_per_second: u32,
177 max_tracked_keys: usize,
178 idle_eviction: Duration,
179 ) -> Self {
180 let rate = NonZeroU32::new(requests_per_second.max(1)).unwrap_or(NonZeroU32::MIN);
181 Self::new(Quota::per_second(rate), max_tracked_keys, idle_eviction)
182 }
183
184 /// Spawn the optional background prune task. No-op if there is no
185 /// current Tokio runtime.
186 fn spawn_prune_task(inner: &Arc<Inner<K>>) {
187 let Ok(handle) = tokio::runtime::Handle::try_current() else {
188 return;
189 };
190 let weak: Weak<Inner<K>> = Arc::downgrade(inner);
191 // Prune at most once every quarter of `idle_eviction`, but never
192 // less than once per minute (to avoid waking up too often when
193 // operators configure a very long eviction window).
194 let interval = (inner.idle_eviction / 4).max(Duration::from_mins(1));
195 handle.spawn(async move {
196 let mut ticker = tokio::time::interval(interval);
197 // We just woke up from `Handle::spawn`; don't burn the first tick.
198 ticker.tick().await;
199 loop {
200 ticker.tick().await;
201 let Some(inner) = weak.upgrade() else {
202 return;
203 };
204 Self::prune_idle(&inner);
205 }
206 });
207 }
208
209 /// Drop entries whose `last_seen` is older than `idle_eviction`.
210 fn prune_idle(inner: &Inner<K>) {
211 let mut guard = inner.map.lock().unwrap_or_else(PoisonError::into_inner);
212 let cutoff = Instant::now()
213 .checked_sub(inner.idle_eviction)
214 .unwrap_or_else(Instant::now);
215 guard.retain(|_, entry| entry.last_seen >= cutoff);
216 }
217
218 /// Evict the single entry with the oldest `last_seen`. Caller must hold
219 /// the map lock. Used only when the table is full *after* idle pruning.
220 fn evict_lru(map: &mut HashMap<K, Entry>) {
221 let oldest_key = map
222 .iter()
223 .min_by_key(|(_, entry)| entry.last_seen)
224 .map(|(k, _)| k.clone());
225 if let Some(key) = oldest_key {
226 map.remove(&key);
227 }
228 }
229
230 /// Test the per-key quota for `key`.
231 ///
232 /// Returns `Ok(())` if the request is allowed. The `last_seen`
233 /// timestamp is updated on **every** call -- including rate-limit
234 /// rejections -- so an actively firing attacker cannot age out into
235 /// a fresh quota by appearing idle.
236 ///
237 /// When inserting a new key into a full table, idle entries are pruned
238 /// first; if the table is still full, the entry with the oldest
239 /// `last_seen` is evicted (LRU). The new key is always inserted --
240 /// honest new clients are never rejected because the table is full.
241 ///
242 /// # Errors
243 ///
244 /// Returns [`BoundedLimiterError::RateLimited`] when `key` has
245 /// exceeded its per-key quota for the current window.
246 pub fn check_key(&self, key: &K) -> Result<(), BoundedLimiterError> {
247 let mut guard = self
248 .inner
249 .map
250 .lock()
251 .unwrap_or_else(PoisonError::into_inner);
252 let now = Instant::now();
253 if let Some(entry) = guard.get_mut(key) {
254 entry.last_seen = now;
255 return entry
256 .limiter
257 .check()
258 .map_err(|_| BoundedLimiterError::RateLimited);
259 }
260 // New key: make room if necessary, then insert.
261 if guard.len() >= self.inner.max_tracked_keys {
262 // Prune idle first.
263 let cutoff = now
264 .checked_sub(self.inner.idle_eviction)
265 .unwrap_or_else(Instant::now);
266 guard.retain(|_, entry| entry.last_seen >= cutoff);
267 // If still full, evict LRU.
268 if guard.len() >= self.inner.max_tracked_keys {
269 Self::evict_lru(&mut guard);
270 }
271 }
272 let limiter = RateLimiter::direct(self.inner.quota);
273 let result = limiter
274 .check()
275 .map_err(|_| BoundedLimiterError::RateLimited);
276 guard.insert(
277 key.clone(),
278 Entry {
279 limiter,
280 last_seen: now,
281 },
282 );
283 result
284 }
285
286 /// Number of currently tracked keys. Used by tests and admin endpoints.
287 #[must_use]
288 pub fn len(&self) -> usize {
289 self.inner
290 .map
291 .lock()
292 .unwrap_or_else(PoisonError::into_inner)
293 .len()
294 }
295
296 /// `true` when no keys are currently tracked.
297 #[must_use]
298 pub fn is_empty(&self) -> bool {
299 self.len() == 0
300 }
301}
302
#[cfg(test)]
mod tests {
    use std::{
        net::{IpAddr, Ipv4Addr},
        num::NonZeroU32,
        thread,
        time::Duration,
    };

    use governor::Quota;

    use super::BoundedKeyedLimiter;

    /// Map `n` onto a distinct IPv4 address to use as a limiter key.
    fn ip(n: u32) -> IpAddr {
        IpAddr::V4(Ipv4Addr::from(n))
    }

    /// Build a limiter allowing `per_minute` requests per key and minute,
    /// tracking at most `cap` keys, with a one-hour idle window.
    fn make_limiter(per_minute: u32, cap: usize) -> BoundedKeyedLimiter<IpAddr> {
        let rate = NonZeroU32::new(per_minute).unwrap();
        BoundedKeyedLimiter::new(Quota::per_minute(rate), cap, Duration::from_hours(1))
    }

    /// Sleep for `millis` milliseconds so that the next check gets a
    /// strictly later `last_seen` than the previous one.
    fn pause(millis: u64) {
        thread::sleep(Duration::from_millis(millis));
    }

    /// However many distinct keys are thrown at the limiter, the number
    /// of tracked keys must stay at or below the configured cap.
    #[test]
    fn never_exceeds_max_tracked_keys() {
        let limiter = make_limiter(10, 100);
        for i in 0..10_000_u32 {
            let _ = limiter.check_key(&ip(i));
            let tracked = limiter.len();
            assert!(
                tracked <= 100,
                "tracked keys exceeded cap at iteration {i}: {} > 100",
                tracked
            );
        }
        assert_eq!(limiter.len(), 100, "table should be full at the cap");
    }

    /// A key that was evicted and then shows up again starts over with a
    /// fresh quota. This is *documented* behaviour, not a bug.
    #[test]
    fn evicted_keys_get_fresh_quota() {
        let limiter = make_limiter(2, 2);
        let target = ip(1);

        // Use up the quota of `target`.
        assert!(limiter.check_key(&target).is_ok(), "first ok");
        assert!(limiter.check_key(&target).is_ok(), "second ok");
        assert!(limiter.check_key(&target).is_err(), "third blocked");

        // Push `target` out through LRU eviction (cap = 2). `target` is
        // not touched again, and each unrelated key arrives after a short
        // pause, so `target` holds the oldest `last_seen` once the table
        // is full and is the entry LRU eviction picks. More unrelated keys
        // than strictly necessary are inserted, each forcing a further
        // eviction.
        for n in 2..=5_u32 {
            pause(5);
            let _ = limiter.check_key(&ip(n));
        }

        // `target` is no longer tracked, so this check re-inserts it with
        // a fresh quota.
        assert!(
            limiter.check_key(&target).is_ok(),
            "evicted key gets a fresh quota on reappearance"
        );
    }

    /// A key that keeps firing while over quota must NOT be evicted just
    /// because new keys keep arriving. Every check -- rejected ones
    /// included -- refreshes `last_seen`, so LRU eviction picks one of
    /// the other, older entries instead.
    #[test]
    fn active_over_quota_key_not_evicted() {
        let limiter = make_limiter(2, 3);

        // Fill the table up to its cap with three keys that are never
        // touched again.
        (100..103_u32).for_each(|n| {
            let _ = limiter.check_key(&ip(n));
        });
        assert_eq!(limiter.len(), 3);

        // The attacker shows up. Its insertion evicts one of the older
        // keys (cap = 3), and these two checks use up its quota.
        pause(5);
        let attacker = ip(200);
        for _ in 0..2 {
            let _ = limiter.check_key(&attacker);
        }

        // Alternate attacker hits with brand-new keys. Each new key
        // forces an eviction, but the attacker was checked just before,
        // so the LRU victim has to be one of the *other* entries.
        for new_key in 300..310_u32 {
            pause(2);
            let _ = limiter.check_key(&attacker); // attacker stays current
            pause(2);
            let _ = limiter.check_key(&ip(new_key)); // forces eviction
        }

        // One more attacker hit right before the assertion, so that no
        // other key has been touched more recently.
        let _ = limiter.check_key(&attacker);

        // The attacker's original, exhausted entry must still be there:
        // the check is rejected rather than served from a fresh quota.
        assert!(
            limiter.check_key(&attacker).is_err(),
            "actively over-quota attacker must not be evicted into a fresh quota"
        );
    }
}
425}