Skip to main content

rmcp_server_kit/
bounded_limiter.rs

1//! Memory-bounded keyed rate limiter.
2//!
3//! [`crate::bounded_limiter::BoundedKeyedLimiter`] wraps a map of per-key
4//! [`governor::DefaultDirectRateLimiter`] instances behind a hard cap on the
5//! number of tracked keys, with an idle-eviction policy and an LRU fallback
6//! when the cap is reached.
7//!
8//! # Why
9//!
10//! The `governor` crate ships a [`governor::RateLimiter::keyed`] state store
11//! whose memory grows monotonically with the number of distinct keys
12//! observed. For server use cases keyed by source IP this is a
13//! denial-of-service vector: an attacker spraying packets from spoofed or
14//! distinct source addresses can exhaust process memory regardless of the
15//! per-key quota.
16//!
17//! [`crate::bounded_limiter::BoundedKeyedLimiter`] addresses this by:
18//!
19//! 1. Holding a [`std::collections::HashMap`] of `K -> Entry` where each
20//!    `Entry` carries its own direct (per-key) limiter and a `last_seen`
21//!    timestamp.
22//! 2. Capping the map at `max_tracked_keys` entries.
23//! 3. On insert when the map is full, first pruning entries whose
24//!    `last_seen` is older than `idle_eviction`, then -- if still full --
25//!    evicting the entry with the oldest `last_seen` ("LRU eviction").
26//!    The new key is **always** inserted; honest new clients are never
27//!    rejected because the table is full.
28//! 4. Updating `last_seen` on **every** check (including rate-limit
29//!    rejections) so an actively-firing attacker cannot dodge eviction by
30//!    appearing idle.
31//! 5. Optionally spawning a best-effort background prune task. Cap
32//!    enforcement does **not** depend on this task running -- it is
33//!    purely an optimization that reclaims memory between admission
34//!    events.
35//!
36//! # Trade-offs
37//!
38//! - When a previously-evicted key reappears it gets a **fresh** quota.
39//!   This is documented behaviour: a key under sustained load keeps its
40//!   `last_seen` updated and therefore is never evicted; eviction only
41//!   targets idle keys.
42//! - The map uses [`std::sync::Mutex`] (not [`tokio::sync::Mutex`]) since
43//!   admission checks must be synchronous and never `.await`.
44//! - We do not log inside the critical section.
45
46use std::{
47    collections::HashMap,
48    hash::Hash,
49    num::NonZeroU32,
50    sync::{Arc, Mutex, PoisonError, Weak},
51    time::{Duration, Instant},
52};
53
54use governor::{DefaultDirectRateLimiter, Quota, RateLimiter};
55
56/// Reason a [`BoundedKeyedLimiter::check_key`] call rejected a request.
57///
58/// Currently only carries a single variant; modelled as an enum (rather
59/// than a unit struct) so callers can `match` exhaustively and to leave
60/// room for future reasons (e.g. burst-debt or distinct quota classes).
61#[non_exhaustive]
62#[derive(Debug, Clone, Copy, PartialEq, Eq, thiserror::Error)]
63pub enum BoundedLimiterError {
64    /// The key has exceeded its per-key quota for the current window.
65    #[error("rate limit exceeded for key")]
66    RateLimited,
67}
68
69/// Per-key limiter entry: the underlying direct limiter plus the wall-clock
70/// timestamp of the most recent admission attempt for this key.
71struct Entry {
72    limiter: DefaultDirectRateLimiter,
73    last_seen: Instant,
74}
75
76/// Inner shared state. Held behind an [`Arc`] in [`BoundedKeyedLimiter`]
77/// and a [`Weak`] inside the optional background prune task so the task
78/// self-terminates once the limiter is dropped.
79struct Inner<K: Eq + Hash + Clone> {
80    map: Mutex<HashMap<K, Entry>>,
81    quota: Quota,
82    max_tracked_keys: usize,
83    idle_eviction: Duration,
84}
85
86/// Memory-bounded keyed rate limiter.
87///
88/// Cheaply cloneable; clones share state.
89#[allow(
90    missing_debug_implementations,
91    reason = "wraps governor RateLimiter which has no Debug impl"
92)]
93pub struct BoundedKeyedLimiter<K: Eq + Hash + Clone> {
94    inner: Arc<Inner<K>>,
95}
96
97impl<K: Eq + Hash + Clone> Clone for BoundedKeyedLimiter<K> {
98    fn clone(&self) -> Self {
99        Self {
100            inner: Arc::clone(&self.inner),
101        }
102    }
103}
104
105impl<K: Eq + Hash + Clone + Send + Sync + 'static> BoundedKeyedLimiter<K> {
106    /// Create a new bounded keyed limiter.
107    ///
108    /// * `quota` -- the per-key rate-limit quota applied to every entry.
109    /// * `max_tracked_keys` -- hard cap on the number of simultaneously
110    ///   tracked keys. When reached, an insert first prunes idle entries
111    ///   then falls back to LRU eviction.
112    /// * `idle_eviction` -- entries whose `last_seen` is older than this
113    ///   are eligible for opportunistic pruning.
114    ///
115    /// # Background prune task
116    ///
117    /// If a Tokio runtime is available at construction time, a best-effort
118    /// background task is spawned that periodically prunes idle entries.
119    /// Cap enforcement does **not** depend on this task; it is purely an
120    /// optimisation that reclaims memory between admission events. The
121    /// task self-terminates when the last [`BoundedKeyedLimiter`] clone is
122    /// dropped (it holds only a [`Weak`] reference to the inner state).
123    ///
124    /// If no Tokio runtime is available (e.g. unit tests using
125    /// `#[test]` rather than `#[tokio::test]`), no task is spawned and
126    /// pruning happens lazily on every full-table insert. Both behaviours
127    /// are correct.
128    #[must_use]
129    pub(crate) fn new(quota: Quota, max_tracked_keys: usize, idle_eviction: Duration) -> Self {
130        debug_assert!(
131            max_tracked_keys > 0,
132            "max_tracked_keys must be > 0; validated by McpServerConfig::check"
133        );
134        let inner = Arc::new(Inner {
135            map: Mutex::new(HashMap::new()),
136            quota,
137            max_tracked_keys,
138            idle_eviction,
139        });
140        Self::spawn_prune_task(&inner);
141        Self { inner }
142    }
143
144    /// Construct a [`BoundedKeyedLimiter`] with a per-minute quota.
145    ///
146    /// Convenience constructor that builds a per-minute [`Quota`] from
147    /// `requests_per_minute`. The rate is clamped to a minimum of `1`
148    /// request/min so a misconfigured `0` does not panic at startup.
149    ///
150    /// * `requests_per_minute` -- per-key rate, clamped to `>= 1`.
151    /// * `max_tracked_keys` -- hard cap on simultaneously tracked keys.
152    ///   When reached, an insert first prunes idle entries then falls
153    ///   back to LRU eviction.
154    /// * `idle_eviction` -- entries whose `last_seen` is older than this
155    ///   are eligible for opportunistic pruning.
156    #[must_use]
157    pub fn with_per_minute(
158        requests_per_minute: u32,
159        max_tracked_keys: usize,
160        idle_eviction: Duration,
161    ) -> Self {
162        let rate = NonZeroU32::new(requests_per_minute.max(1)).unwrap_or(NonZeroU32::MIN);
163        Self::new(Quota::per_minute(rate), max_tracked_keys, idle_eviction)
164    }
165
166    /// Construct a [`BoundedKeyedLimiter`] with a per-second quota.
167    ///
168    /// Convenience constructor that builds a per-second [`Quota`] from
169    /// `requests_per_second`. The rate is clamped to a minimum of `1`
170    /// request/sec so a misconfigured `0` does not panic at startup.
171    ///
172    /// * `requests_per_second` -- per-key rate, clamped to `>= 1`.
173    /// * `max_tracked_keys` -- hard cap on simultaneously tracked keys.
174    ///   When reached, an insert first prunes idle entries then falls
175    ///   back to LRU eviction.
176    /// * `idle_eviction` -- entries whose `last_seen` is older than this
177    ///   are eligible for opportunistic pruning.
178    #[must_use]
179    pub fn with_per_second(
180        requests_per_second: u32,
181        max_tracked_keys: usize,
182        idle_eviction: Duration,
183    ) -> Self {
184        let rate = NonZeroU32::new(requests_per_second.max(1)).unwrap_or(NonZeroU32::MIN);
185        Self::new(Quota::per_second(rate), max_tracked_keys, idle_eviction)
186    }
187
188    /// Spawn the optional background prune task. No-op if there is no
189    /// current Tokio runtime.
190    fn spawn_prune_task(inner: &Arc<Inner<K>>) {
191        let Ok(handle) = tokio::runtime::Handle::try_current() else {
192            return;
193        };
194        let weak: Weak<Inner<K>> = Arc::downgrade(inner);
195        // Prune at most once every quarter of `idle_eviction`, but never
196        // less than once per minute (to avoid waking up too often when
197        // operators configure a very long eviction window).
198        let interval = (inner.idle_eviction / 4).max(Duration::from_mins(1));
199        handle.spawn(async move {
200            let mut ticker = tokio::time::interval(interval);
201            // We just woke up from `Handle::spawn`; don't burn the first tick.
202            ticker.tick().await;
203            loop {
204                ticker.tick().await;
205                let Some(inner) = weak.upgrade() else {
206                    return;
207                };
208                Self::prune_idle(&inner);
209            }
210        });
211    }
212
213    /// Drop entries whose `last_seen` is older than `idle_eviction`.
214    fn prune_idle(inner: &Inner<K>) {
215        let mut guard = inner.map.lock().unwrap_or_else(PoisonError::into_inner);
216        let cutoff = Instant::now()
217            .checked_sub(inner.idle_eviction)
218            .unwrap_or_else(Instant::now);
219        guard.retain(|_, entry| entry.last_seen >= cutoff);
220    }
221
222    /// Evict the single entry with the oldest `last_seen`. Caller must hold
223    /// the map lock. Used only when the table is full *after* idle pruning.
224    fn evict_lru(map: &mut HashMap<K, Entry>) {
225        let oldest_key = map
226            .iter()
227            .min_by_key(|(_, entry)| entry.last_seen)
228            .map(|(k, _)| k.clone());
229        if let Some(key) = oldest_key {
230            map.remove(&key);
231        }
232    }
233
234    /// Test the per-key quota for `key`.
235    ///
236    /// Returns `Ok(())` if the request is allowed. The `last_seen`
237    /// timestamp is updated on **every** call -- including rate-limit
238    /// rejections -- so an actively firing attacker cannot age out into
239    /// a fresh quota by appearing idle.
240    ///
241    /// When inserting a new key into a full table, idle entries are pruned
242    /// first; if the table is still full, the entry with the oldest
243    /// `last_seen` is evicted (LRU). The new key is always inserted --
244    /// honest new clients are never rejected because the table is full.
245    ///
246    /// # Errors
247    ///
248    /// Returns [`BoundedLimiterError::RateLimited`] when `key` has
249    /// exceeded its per-key quota for the current window.
250    pub fn check_key(&self, key: &K) -> Result<(), BoundedLimiterError> {
251        let mut guard = self
252            .inner
253            .map
254            .lock()
255            .unwrap_or_else(PoisonError::into_inner);
256        let now = Instant::now();
257        if let Some(entry) = guard.get_mut(key) {
258            entry.last_seen = now;
259            return entry
260                .limiter
261                .check()
262                .map_err(|_| BoundedLimiterError::RateLimited);
263        }
264        // New key: make room if necessary, then insert.
265        if guard.len() >= self.inner.max_tracked_keys {
266            // Prune idle first.
267            let cutoff = now
268                .checked_sub(self.inner.idle_eviction)
269                .unwrap_or_else(Instant::now);
270            guard.retain(|_, entry| entry.last_seen >= cutoff);
271            // If still full, evict LRU.
272            if guard.len() >= self.inner.max_tracked_keys {
273                Self::evict_lru(&mut guard);
274            }
275        }
276        let limiter = RateLimiter::direct(self.inner.quota);
277        let result = limiter
278            .check()
279            .map_err(|_| BoundedLimiterError::RateLimited);
280        guard.insert(
281            key.clone(),
282            Entry {
283                limiter,
284                last_seen: now,
285            },
286        );
287        result
288    }
289
290    /// Number of currently tracked keys. Used by tests and admin endpoints.
291    #[must_use]
292    pub fn len(&self) -> usize {
293        self.inner
294            .map
295            .lock()
296            .unwrap_or_else(PoisonError::into_inner)
297            .len()
298    }
299
300    /// `true` when no keys are currently tracked.
301    #[must_use]
302    pub fn is_empty(&self) -> bool {
303        self.len() == 0
304    }
305}
306
307#[cfg(test)]
308mod tests {
309    use std::{net::IpAddr, num::NonZeroU32, time::Duration};
310
311    use governor::Quota;
312
313    use super::BoundedKeyedLimiter;
314
315    fn ip(n: u32) -> IpAddr {
316        IpAddr::from(n.to_be_bytes())
317    }
318
319    /// The hard cap on tracked keys must never be exceeded, even under a
320    /// stream of distinct keys far larger than the cap.
321    #[test]
322    fn never_exceeds_max_tracked_keys() {
323        let quota = Quota::per_minute(NonZeroU32::new(10).unwrap());
324        let limiter: BoundedKeyedLimiter<IpAddr> =
325            BoundedKeyedLimiter::new(quota, 100, Duration::from_hours(1));
326        for i in 0..10_000_u32 {
327            let _ = limiter.check_key(&ip(i));
328            assert!(
329                limiter.len() <= 100,
330                "tracked keys exceeded cap at iteration {i}: {} > 100",
331                limiter.len()
332            );
333        }
334        assert_eq!(limiter.len(), 100, "table should be full at the cap");
335    }
336
337    /// When a previously-evicted key reappears, it must get a fresh quota.
338    /// This is *documented* behaviour, not a bug: keys under sustained
339    /// load keep their `last_seen` updated and therefore are not evicted.
340    #[test]
341    fn evicted_keys_get_fresh_quota() {
342        let quota = Quota::per_minute(NonZeroU32::new(2).unwrap());
343        let limiter: BoundedKeyedLimiter<IpAddr> =
344            BoundedKeyedLimiter::new(quota, 2, Duration::from_hours(1));
345
346        let target = ip(1);
347        // Burn the quota for `target`.
348        assert!(limiter.check_key(&target).is_ok(), "first ok");
349        assert!(limiter.check_key(&target).is_ok(), "second ok");
350        assert!(limiter.check_key(&target).is_err(), "third blocked");
351
352        // Force eviction by inserting two unrelated keys (cap = 2). The
353        // attacker (`target`) is rate-limited -- it has a *recent*
354        // `last_seen` because of the failed check above. So inserting
355        // two new keys must NOT evict the attacker; instead one of the
356        // *other* unrelated keys gets evicted via LRU. We therefore
357        // need three unrelated keys to push `target` out by LRU.
358        //
359        // Sleep a tiny amount so unrelated keys have strictly newer
360        // last_seen than `target`'s last write.
361        std::thread::sleep(Duration::from_millis(5));
362        let _ = limiter.check_key(&ip(2));
363        std::thread::sleep(Duration::from_millis(5));
364        let _ = limiter.check_key(&ip(3));
365        // `target` is now the oldest entry; cap is 2. ip(3) eviction LRU'd
366        // either ip(2) or `target`. Inserting ip(4) again forces another
367        // eviction. After enough fresh inserts, `target` is gone.
368        std::thread::sleep(Duration::from_millis(5));
369        let _ = limiter.check_key(&ip(4));
370        std::thread::sleep(Duration::from_millis(5));
371        let _ = limiter.check_key(&ip(5));
372
373        // `target` should have been evicted by now -- a fresh check_key
374        // re-inserts with a fresh quota.
375        assert!(
376            limiter.check_key(&target).is_ok(),
377            "evicted key gets a fresh quota on reappearance"
378        );
379    }
380
381    /// An actively over-quota key must NOT be evicted just because new
382    /// keys are knocking. `last_seen` is updated on every check including
383    /// rate-limit rejections, so the attacker stays at the front of the
384    /// LRU queue. Other (older) entries are evicted instead.
385    #[test]
386    fn active_over_quota_key_not_evicted() {
387        let quota = Quota::per_minute(NonZeroU32::new(2).unwrap());
388        let limiter: BoundedKeyedLimiter<IpAddr> =
389            BoundedKeyedLimiter::new(quota, 3, Duration::from_hours(1));
390
391        // Seed the table with three idle entries so cap is reached.
392        for i in 100..103_u32 {
393            let _ = limiter.check_key(&ip(i));
394        }
395        assert_eq!(limiter.len(), 3);
396
397        // The attacker now starts firing. First two are allowed
398        // (fills quota), then we expect refusals -- but each refusal
399        // updates last_seen so the attacker stays "current".
400        std::thread::sleep(Duration::from_millis(5));
401        let attacker = ip(200);
402        // Inserting attacker evicts one of the older keys (cap=3).
403        let _ = limiter.check_key(&attacker);
404        let _ = limiter.check_key(&attacker);
405
406        // Interleave attacker hits with new-key knocks. The attacker
407        // keeps firing (last_seen always current), so when new keys
408        // arrive and force eviction, the LRU victim must be one of the
409        // *other* (older) entries, not the attacker.
410        for new_key in 300..310_u32 {
411            std::thread::sleep(Duration::from_millis(2));
412            let _ = limiter.check_key(&attacker); // attacker stays current
413            std::thread::sleep(Duration::from_millis(2));
414            let _ = limiter.check_key(&ip(new_key)); // forces eviction
415        }
416
417        // One final attacker hit immediately before the assertion to
418        // ensure no other key has been touched more recently.
419        let _ = limiter.check_key(&attacker);
420
421        // Attacker must STILL be rate-limited (quota exhausted, not a
422        // freshly-allocated entry). The check returns Err because the
423        // existing entry with exhausted quota is still there.
424        assert!(
425            limiter.check_key(&attacker).is_err(),
426            "actively over-quota attacker must not be evicted into a fresh quota"
427        );
428    }
429}