Skip to main content

rmcp_server_kit/
bounded_limiter.rs

1//! Memory-bounded keyed rate limiter.
2//!
3//! [`crate::bounded_limiter::BoundedKeyedLimiter`] wraps a map of per-key
4//! [`governor::DefaultDirectRateLimiter`] instances behind a hard cap on the
5//! number of tracked keys, with an idle-eviction policy and an LRU fallback
6//! when the cap is reached.
7//!
8//! # Why
9//!
10//! The `governor` crate ships a [`governor::RateLimiter::keyed`] state store
11//! whose memory grows monotonically with the number of distinct keys
12//! observed. For server use cases keyed by source IP this is a
13//! denial-of-service vector: an attacker spraying packets from spoofed or
14//! distinct source addresses can exhaust process memory regardless of the
15//! per-key quota.
16//!
17//! [`crate::bounded_limiter::BoundedKeyedLimiter`] addresses this by:
18//!
19//! 1. Holding a [`std::collections::HashMap`] of `K -> Entry` where each
20//!    `Entry` carries its own direct (per-key) limiter and a `last_seen`
21//!    timestamp.
22//! 2. Capping the map at `max_tracked_keys` entries.
23//! 3. On insert when the map is full, first pruning entries whose
24//!    `last_seen` is older than `idle_eviction`, then -- if still full --
25//!    evicting the entry with the oldest `last_seen` ("LRU eviction").
26//!    The new key is **always** inserted; honest new clients are never
27//!    rejected because the table is full.
28//! 4. Updating `last_seen` on **every** check (including rate-limit
29//!    rejections) so an actively-firing attacker cannot dodge eviction by
30//!    appearing idle.
31//! 5. Optionally spawning a best-effort background prune task. Cap
32//!    enforcement does **not** depend on this task running -- it is
33//!    purely an optimization that reclaims memory between admission
34//!    events.
35//!
36//! # Trade-offs
37//!
38//! - When a previously-evicted key reappears it gets a **fresh** quota.
39//!   This is documented behaviour: a key under sustained load keeps its
40//!   `last_seen` updated and therefore is never evicted; eviction only
41//!   targets idle keys.
42//! - The map uses [`std::sync::Mutex`] (not [`tokio::sync::Mutex`]) since
43//!   admission checks must be synchronous and never `.await`.
44//! - We do not log inside the critical section.
45
46use std::{
47    collections::HashMap,
48    hash::Hash,
49    num::NonZeroU32,
50    sync::{Arc, Mutex, PoisonError, Weak},
51    time::{Duration, Instant},
52};
53
54use governor::{
55    DefaultDirectRateLimiter, Quota, RateLimiter,
56    clock::{Clock as _, DefaultClock},
57};
58
59/// Reason a [`BoundedKeyedLimiter::check_key`] call rejected a request.
60///
61/// Currently only carries a single variant; modelled as an enum (rather
62/// than a unit struct) so callers can `match` exhaustively and to leave
63/// room for future reasons (e.g. burst-debt or distinct quota classes).
64#[non_exhaustive]
65#[derive(Debug, Clone, Copy, PartialEq, Eq, thiserror::Error)]
66pub enum BoundedLimiterError {
67    /// The key has exceeded its per-key quota for the current window.
68    #[error("rate limit exceeded for key")]
69    RateLimited,
70}
71
72/// Per-key limiter entry: the underlying direct limiter plus the wall-clock
73/// timestamp of the most recent admission attempt for this key.
74struct Entry {
75    limiter: DefaultDirectRateLimiter,
76    last_seen: Instant,
77}
78
79/// Inner shared state. Held behind an [`Arc`] in [`BoundedKeyedLimiter`]
80/// and a [`Weak`] inside the optional background prune task so the task
81/// self-terminates once the limiter is dropped.
82struct Inner<K: Eq + Hash + Clone> {
83    map: Mutex<HashMap<K, Entry>>,
84    quota: Quota,
85    max_tracked_keys: usize,
86    idle_eviction: Duration,
87}
88
89/// Memory-bounded keyed rate limiter.
90///
91/// Cheaply cloneable; clones share state.
92#[allow(
93    missing_debug_implementations,
94    reason = "wraps governor RateLimiter which has no Debug impl"
95)]
96pub struct BoundedKeyedLimiter<K: Eq + Hash + Clone> {
97    inner: Arc<Inner<K>>,
98}
99
100impl<K: Eq + Hash + Clone> Clone for BoundedKeyedLimiter<K> {
101    fn clone(&self) -> Self {
102        Self {
103            inner: Arc::clone(&self.inner),
104        }
105    }
106}
107
108impl<K: Eq + Hash + Clone + Send + Sync + 'static> BoundedKeyedLimiter<K> {
109    /// Create a new bounded keyed limiter.
110    ///
111    /// * `quota` -- the per-key rate-limit quota applied to every entry.
112    /// * `max_tracked_keys` -- hard cap on the number of simultaneously
113    ///   tracked keys. When reached, an insert first prunes idle entries
114    ///   then falls back to LRU eviction.
115    /// * `idle_eviction` -- entries whose `last_seen` is older than this
116    ///   are eligible for opportunistic pruning.
117    ///
118    /// # Background prune task
119    ///
120    /// If a Tokio runtime is available at construction time, a best-effort
121    /// background task is spawned that periodically prunes idle entries.
122    /// Cap enforcement does **not** depend on this task; it is purely an
123    /// optimisation that reclaims memory between admission events. The
124    /// task self-terminates when the last [`BoundedKeyedLimiter`] clone is
125    /// dropped (it holds only a [`Weak`] reference to the inner state).
126    ///
127    /// If no Tokio runtime is available (e.g. unit tests using
128    /// `#[test]` rather than `#[tokio::test]`), no task is spawned and
129    /// pruning happens lazily on every full-table insert. Both behaviours
130    /// are correct.
131    #[must_use]
132    pub(crate) fn new(quota: Quota, max_tracked_keys: usize, idle_eviction: Duration) -> Self {
133        debug_assert!(
134            max_tracked_keys > 0,
135            "max_tracked_keys must be > 0; validated by McpServerConfig::check"
136        );
137        let inner = Arc::new(Inner {
138            map: Mutex::new(HashMap::new()),
139            quota,
140            max_tracked_keys,
141            idle_eviction,
142        });
143        Self::spawn_prune_task(&inner);
144        Self { inner }
145    }
146
147    /// Construct a [`BoundedKeyedLimiter`] with a per-minute quota.
148    ///
149    /// Convenience constructor that builds a per-minute [`Quota`] from
150    /// `requests_per_minute`. The rate is clamped to a minimum of `1`
151    /// request/min so a misconfigured `0` does not panic at startup.
152    ///
153    /// * `requests_per_minute` -- per-key rate, clamped to `>= 1`.
154    /// * `max_tracked_keys` -- hard cap on simultaneously tracked keys.
155    ///   When reached, an insert first prunes idle entries then falls
156    ///   back to LRU eviction.
157    /// * `idle_eviction` -- entries whose `last_seen` is older than this
158    ///   are eligible for opportunistic pruning.
159    #[must_use]
160    pub fn with_per_minute(
161        requests_per_minute: u32,
162        max_tracked_keys: usize,
163        idle_eviction: Duration,
164    ) -> Self {
165        let rate = NonZeroU32::new(requests_per_minute.max(1)).unwrap_or(NonZeroU32::MIN);
166        Self::new(Quota::per_minute(rate), max_tracked_keys, idle_eviction)
167    }
168
169    /// Construct a [`BoundedKeyedLimiter`] with a per-second quota.
170    ///
171    /// Convenience constructor that builds a per-second [`Quota`] from
172    /// `requests_per_second`. The rate is clamped to a minimum of `1`
173    /// request/sec so a misconfigured `0` does not panic at startup.
174    ///
175    /// * `requests_per_second` -- per-key rate, clamped to `>= 1`.
176    /// * `max_tracked_keys` -- hard cap on simultaneously tracked keys.
177    ///   When reached, an insert first prunes idle entries then falls
178    ///   back to LRU eviction.
179    /// * `idle_eviction` -- entries whose `last_seen` is older than this
180    ///   are eligible for opportunistic pruning.
181    #[must_use]
182    pub fn with_per_second(
183        requests_per_second: u32,
184        max_tracked_keys: usize,
185        idle_eviction: Duration,
186    ) -> Self {
187        let rate = NonZeroU32::new(requests_per_second.max(1)).unwrap_or(NonZeroU32::MIN);
188        Self::new(Quota::per_second(rate), max_tracked_keys, idle_eviction)
189    }
190
191    /// Spawn the optional background prune task. No-op if there is no
192    /// current Tokio runtime.
193    fn spawn_prune_task(inner: &Arc<Inner<K>>) {
194        let Ok(handle) = tokio::runtime::Handle::try_current() else {
195            return;
196        };
197        let weak: Weak<Inner<K>> = Arc::downgrade(inner);
198        // Prune at most once every quarter of `idle_eviction`, but never
199        // less than once per minute (to avoid waking up too often when
200        // operators configure a very long eviction window).
201        let interval = (inner.idle_eviction / 4).max(Duration::from_mins(1));
202        handle.spawn(async move {
203            let mut ticker = tokio::time::interval(interval);
204            // We just woke up from `Handle::spawn`; don't burn the first tick.
205            ticker.tick().await;
206            loop {
207                ticker.tick().await;
208                let Some(inner) = weak.upgrade() else {
209                    return;
210                };
211                Self::prune_idle(&inner);
212            }
213        });
214    }
215
216    /// Drop entries whose `last_seen` is older than `idle_eviction`.
217    fn prune_idle(inner: &Inner<K>) {
218        let mut guard = inner.map.lock().unwrap_or_else(PoisonError::into_inner);
219        let cutoff = Instant::now()
220            .checked_sub(inner.idle_eviction)
221            .unwrap_or_else(Instant::now);
222        guard.retain(|_, entry| entry.last_seen >= cutoff);
223    }
224
225    /// Evict the single entry with the oldest `last_seen`. Caller must hold
226    /// the map lock. Used only when the table is full *after* idle pruning.
227    fn evict_lru(map: &mut HashMap<K, Entry>) {
228        let oldest_key = map
229            .iter()
230            .min_by_key(|(_, entry)| entry.last_seen)
231            .map(|(k, _)| k.clone());
232        if let Some(key) = oldest_key {
233            map.remove(&key);
234        }
235    }
236
237    /// Test the per-key quota for `key`.
238    ///
239    /// Returns `Ok(())` if the request is allowed. The `last_seen`
240    /// timestamp is updated on **every** call -- including rate-limit
241    /// rejections -- so an actively firing attacker cannot age out into
242    /// a fresh quota by appearing idle.
243    ///
244    /// When inserting a new key into a full table, idle entries are pruned
245    /// first; if the table is still full, the entry with the oldest
246    /// `last_seen` is evicted (LRU). The new key is always inserted --
247    /// honest new clients are never rejected because the table is full.
248    ///
249    /// # Errors
250    ///
251    /// Returns [`BoundedLimiterError::RateLimited`] when `key` has
252    /// exceeded its per-key quota for the current window.
253    pub fn check_key(&self, key: &K) -> Result<(), BoundedLimiterError> {
254        self.check_key_wait(key)
255            .map_err(|_| BoundedLimiterError::RateLimited)
256    }
257
258    /// Test the per-key quota for `key`, returning the wait time on deny.
259    ///
260    /// Identical admission semantics to [`check_key`](Self::check_key)
261    /// (same `last_seen` refresh, idle-prune, and LRU-eviction behavior);
262    /// the two methods share one code path.
263    ///
264    /// # Errors
265    ///
266    /// On deny, returns the **best-effort current wait** until the next
267    /// request for this key could be admitted, measured against
268    /// governor's default clock at the moment of the failed check. The
269    /// value is a raw [`Duration`]; rounding (e.g. ceiling to whole
270    /// seconds for a `Retry-After` header) is the caller's concern.
271    pub fn check_key_wait(&self, key: &K) -> Result<(), Duration> {
272        let mut guard = self
273            .inner
274            .map
275            .lock()
276            .unwrap_or_else(PoisonError::into_inner);
277        let now = Instant::now();
278        if let Some(entry) = guard.get_mut(key) {
279            entry.last_seen = now;
280            return entry
281                .limiter
282                .check()
283                .map_err(|not_until| not_until.wait_time_from(DefaultClock::default().now()));
284        }
285        // New key: make room if necessary, then insert.
286        if guard.len() >= self.inner.max_tracked_keys {
287            // Prune idle first.
288            let cutoff = now
289                .checked_sub(self.inner.idle_eviction)
290                .unwrap_or_else(Instant::now);
291            guard.retain(|_, entry| entry.last_seen >= cutoff);
292            // If still full, evict LRU.
293            if guard.len() >= self.inner.max_tracked_keys {
294                Self::evict_lru(&mut guard);
295            }
296        }
297        let limiter = RateLimiter::direct(self.inner.quota);
298        let result = limiter
299            .check()
300            .map_err(|not_until| not_until.wait_time_from(DefaultClock::default().now()));
301        guard.insert(
302            key.clone(),
303            Entry {
304                limiter,
305                last_seen: now,
306            },
307        );
308        result
309    }
310
311    /// Number of currently tracked keys. Used by tests and admin endpoints.
312    #[must_use]
313    pub fn len(&self) -> usize {
314        self.inner
315            .map
316            .lock()
317            .unwrap_or_else(PoisonError::into_inner)
318            .len()
319    }
320
321    /// `true` when no keys are currently tracked.
322    #[must_use]
323    pub fn is_empty(&self) -> bool {
324        self.len() == 0
325    }
326}
327
328#[cfg(test)]
329mod tests {
330    use std::{net::IpAddr, num::NonZeroU32, time::Duration};
331
332    use governor::Quota;
333
334    use super::BoundedKeyedLimiter;
335
336    fn ip(n: u32) -> IpAddr {
337        IpAddr::from(n.to_be_bytes())
338    }
339
340    /// Deny on the existing-key branch must report a positive,
341    /// quota-bounded wait time.
342    #[test]
343    fn check_key_wait_existing_key_deny_returns_bounded_wait() {
344        let quota = Quota::per_minute(NonZeroU32::new(1).unwrap());
345        let limiter: BoundedKeyedLimiter<IpAddr> =
346            BoundedKeyedLimiter::new(quota, 10, Duration::from_hours(1));
347        assert!(limiter.check_key_wait(&ip(1)).is_ok(), "burst admits first");
348        let wait = limiter
349            .check_key_wait(&ip(1))
350            .expect_err("second call within the window must deny");
351        assert!(wait > Duration::ZERO, "wait must be positive, got {wait:?}");
352        assert!(
353            wait <= Duration::from_secs(60),
354            "per-minute quota wait must be <= 60s, got {wait:?}"
355        );
356    }
357
358    /// The new-key branch always admits the first check: a freshly
359    /// constructed governor limiter starts with a full bucket and burst
360    /// capacity is `NonZeroU32` (>= 1). The deny arm on that branch is
361    /// defensive symmetry, not a reachable path.
362    #[test]
363    fn check_key_wait_new_key_first_check_admits() {
364        let quota = Quota::per_minute(NonZeroU32::new(1).unwrap());
365        let limiter: BoundedKeyedLimiter<IpAddr> =
366            BoundedKeyedLimiter::new(quota, 10, Duration::from_hours(1));
367        for i in 0..5_u32 {
368            assert!(
369                limiter.check_key_wait(&ip(i)).is_ok(),
370                "first check for new key {i} must admit"
371            );
372        }
373    }
374
375    /// `check_key` delegates to `check_key_wait`: identical admission
376    /// decisions, error mapped to the reason-only enum.
377    #[test]
378    fn check_key_delegates_to_wait_path() {
379        let quota = Quota::per_minute(NonZeroU32::new(1).unwrap());
380        let limiter: BoundedKeyedLimiter<IpAddr> =
381            BoundedKeyedLimiter::new(quota, 10, Duration::from_hours(1));
382        assert!(limiter.check_key(&ip(7)).is_ok());
383        assert_eq!(
384            limiter.check_key(&ip(7)),
385            Err(super::BoundedLimiterError::RateLimited)
386        );
387    }
388
389    /// The hard cap on tracked keys must never be exceeded, even under a
390    /// stream of distinct keys far larger than the cap.
391    #[test]
392    fn never_exceeds_max_tracked_keys() {
393        let quota = Quota::per_minute(NonZeroU32::new(10).unwrap());
394        let limiter: BoundedKeyedLimiter<IpAddr> =
395            BoundedKeyedLimiter::new(quota, 100, Duration::from_hours(1));
396        for i in 0..10_000_u32 {
397            let _ = limiter.check_key(&ip(i));
398            assert!(
399                limiter.len() <= 100,
400                "tracked keys exceeded cap at iteration {i}: {} > 100",
401                limiter.len()
402            );
403        }
404        assert_eq!(limiter.len(), 100, "table should be full at the cap");
405    }
406
407    /// When a previously-evicted key reappears, it must get a fresh quota.
408    /// This is *documented* behaviour, not a bug: keys under sustained
409    /// load keep their `last_seen` updated and therefore are not evicted.
410    #[test]
411    fn evicted_keys_get_fresh_quota() {
412        let quota = Quota::per_minute(NonZeroU32::new(2).unwrap());
413        let limiter: BoundedKeyedLimiter<IpAddr> =
414            BoundedKeyedLimiter::new(quota, 2, Duration::from_hours(1));
415
416        let target = ip(1);
417        // Burn the quota for `target`.
418        assert!(limiter.check_key(&target).is_ok(), "first ok");
419        assert!(limiter.check_key(&target).is_ok(), "second ok");
420        assert!(limiter.check_key(&target).is_err(), "third blocked");
421
422        // Force eviction by inserting two unrelated keys (cap = 2). The
423        // attacker (`target`) is rate-limited -- it has a *recent*
424        // `last_seen` because of the failed check above. So inserting
425        // two new keys must NOT evict the attacker; instead one of the
426        // *other* unrelated keys gets evicted via LRU. We therefore
427        // need three unrelated keys to push `target` out by LRU.
428        //
429        // Sleep a tiny amount so unrelated keys have strictly newer
430        // last_seen than `target`'s last write.
431        std::thread::sleep(Duration::from_millis(5));
432        let _ = limiter.check_key(&ip(2));
433        std::thread::sleep(Duration::from_millis(5));
434        let _ = limiter.check_key(&ip(3));
435        // `target` is now the oldest entry; cap is 2. ip(3) eviction LRU'd
436        // either ip(2) or `target`. Inserting ip(4) again forces another
437        // eviction. After enough fresh inserts, `target` is gone.
438        std::thread::sleep(Duration::from_millis(5));
439        let _ = limiter.check_key(&ip(4));
440        std::thread::sleep(Duration::from_millis(5));
441        let _ = limiter.check_key(&ip(5));
442
443        // `target` should have been evicted by now -- a fresh check_key
444        // re-inserts with a fresh quota.
445        assert!(
446            limiter.check_key(&target).is_ok(),
447            "evicted key gets a fresh quota on reappearance"
448        );
449    }
450
451    /// An actively over-quota key must NOT be evicted just because new
452    /// keys are knocking. `last_seen` is updated on every check including
453    /// rate-limit rejections, so the attacker stays at the front of the
454    /// LRU queue. Other (older) entries are evicted instead.
455    #[test]
456    fn active_over_quota_key_not_evicted() {
457        let quota = Quota::per_minute(NonZeroU32::new(2).unwrap());
458        let limiter: BoundedKeyedLimiter<IpAddr> =
459            BoundedKeyedLimiter::new(quota, 3, Duration::from_hours(1));
460
461        // Seed the table with three idle entries so cap is reached.
462        for i in 100..103_u32 {
463            let _ = limiter.check_key(&ip(i));
464        }
465        assert_eq!(limiter.len(), 3);
466
467        // The attacker now starts firing. First two are allowed
468        // (fills quota), then we expect refusals -- but each refusal
469        // updates last_seen so the attacker stays "current".
470        std::thread::sleep(Duration::from_millis(5));
471        let attacker = ip(200);
472        // Inserting attacker evicts one of the older keys (cap=3).
473        let _ = limiter.check_key(&attacker);
474        let _ = limiter.check_key(&attacker);
475
476        // Interleave attacker hits with new-key knocks. The attacker
477        // keeps firing (last_seen always current), so when new keys
478        // arrive and force eviction, the LRU victim must be one of the
479        // *other* (older) entries, not the attacker.
480        for new_key in 300..310_u32 {
481            std::thread::sleep(Duration::from_millis(2));
482            let _ = limiter.check_key(&attacker); // attacker stays current
483            std::thread::sleep(Duration::from_millis(2));
484            let _ = limiter.check_key(&ip(new_key)); // forces eviction
485        }
486
487        // One final attacker hit immediately before the assertion to
488        // ensure no other key has been touched more recently.
489        let _ = limiter.check_key(&attacker);
490
491        // Attacker must STILL be rate-limited (quota exhausted, not a
492        // freshly-allocated entry). The check returns Err because the
493        // existing entry with exhausted quota is still there.
494        assert!(
495            limiter.check_key(&attacker).is_err(),
496            "actively over-quota attacker must not be evicted into a fresh quota"
497        );
498    }
499}