Skip to main content

arcly_http/resilience/
distributed_rate_limit.rs

1//! Cluster-wide rate limiting behind a pluggable async backend.
2//!
3//! The local [`RateLimit`](crate::resilience::RateLimit) is per-instance: on a
4//! 10-replica deployment a "30 req/min" limit silently becomes 300 req/min and
5//! resets on every pod restart. `DistributedRateLimit` delegates the counting
6//! to a shared backend (Redis sliding window in the enterprise example) so the
7//! limit holds across the whole fleet.
8//!
9//! ## Design rules
10//!
11//! - **Async by nature** — a network counter cannot hide behind the sync
12//!   [`Guard`](crate::auth::guards::Guard) trait, so this type exposes
13//!   `async fn check(&ctx)`; handlers `.await` it explicitly. No `block_on`,
14//!   no executor stalls.
15//! - **One atomic round-trip** — `RateLimitBackend::hit` must count and decide
16//!   in a single atomic operation (Lua script / GETDEL-style). Split
17//!   check-then-increment is a TOCTOU bug, the same class this codebase
18//!   already eliminated from refresh-token rotation.
19//! - **Explicit failure policy** — "backend down" is its own decision
20//!   ([`RateDecision::Unavailable`]); whether that allows or denies is the
21//!   caller's declared [`FailurePolicy`], never an accident of error mapping.
22//! - **Optional by construction** — no backend in the DI container means the
23//!   check is a no-op, so dev environments need zero configuration.
24
25use std::net::IpAddr;
26
27use futures::future::BoxFuture;
28
29use crate::web::{Error, RequestContext};
30
/// Outcome of one counted hit against the shared limiter.
///
/// Returned by [`RateLimitBackend::hit`]. The type is plain data (two `u32`
/// payloads and a unit variant), so it derives the usual value-type traits:
/// backends and tests can log, copy, and compare decisions directly.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RateDecision {
    /// The hit was counted and the request may proceed.
    Allow {
        /// Budget reported by the backend after this hit.
        // NOTE(review): presumably "hits left in the current window" — the
        // exact meaning is defined by the backend implementation; confirm.
        remaining: u32,
    },
    /// The hit was rejected because the limit is exhausted.
    Deny {
        /// Back-off hint reported by the backend.
        // NOTE(review): presumably seconds until the caller may retry
        // (a `Retry-After` value) — confirm against the backend.
        retry_after_secs: u32,
    },
    /// The backend did not answer — resolved by [`FailurePolicy`].
    Unavailable,
}
42
/// Shared counting backend (Redis, Memcached, …). Object-safe via `BoxFuture`,
/// following the same pattern as `SessionStore` and `OAuth2Provider`.
///
/// Implementations MUST count-and-decide atomically in a single round-trip.
///
/// The `Send + Sync + 'static` bounds let one backend instance be shared as a
/// trait object (`Box<dyn RateLimitBackend>`), which is how
/// `DistributedRateLimit::check` looks it up.
pub trait RateLimitBackend: Send + Sync + 'static {
    /// Records one hit for `key` and reports the resulting decision.
    ///
    /// - `key`: bucket identifier. `DistributedRateLimit::check` builds it as
    ///   `rl:<limiter name>:<principal>`.
    /// - `max`: the limiter's configured `max`, passed through unchanged.
    /// - `window_secs`: the limiter's configured `window_secs`, passed through
    ///   unchanged (NOTE(review): presumably the window length in seconds, as
    ///   the name suggests — confirm).
    ///
    /// The signature has no error channel: a backend that cannot answer is
    /// expected to report [`RateDecision::Unavailable`].
    fn hit<'a>(&'a self, key: &'a str, max: u32, window_secs: u32) -> BoxFuture<'a, RateDecision>;
}
50
/// What to do when the backend is unreachable.
///
/// [`FailurePolicy::FailOpen`] is the `Default`, matching the policy that
/// `DistributedRateLimit::new` starts with.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Hash)]
pub enum FailurePolicy {
    /// Let traffic through (availability over strictness) — sane default.
    #[default]
    FailOpen,
    /// Reject with `503` — for abuse-sensitive endpoints like `/auth/login`.
    FailClosed,
}
59
/// Cluster-wide limiter. Build as a `static` and call from handlers:
///
/// ```ignore
/// static LOGIN_RATE: DistributedRateLimit =
///     DistributedRateLimit::new("login", 10, 60).fail_closed();
///
/// // in the handler:
/// LOGIN_RATE.check(&ctx).await?;
/// ```
///
/// The struct holds configuration only; all counting happens in the
/// `RateLimitBackend` that `check` looks up from the request context.
pub struct DistributedRateLimit {
    /// Namespace of this limiter; becomes the middle segment of every
    /// backend key (`rl:<name>:<principal>`).
    pub name: &'static str,
    /// Limit handed to the backend on every hit.
    pub max: u32,
    /// Window length handed to the backend on every hit.
    pub window_secs: u32,
    /// Applied only when the backend answers `RateDecision::Unavailable`.
    pub policy: FailurePolicy,
}
75
76impl DistributedRateLimit {
77    pub const fn new(name: &'static str, max: u32, window_secs: u32) -> Self {
78        Self {
79            name,
80            max,
81            window_secs,
82            policy: FailurePolicy::FailOpen,
83        }
84    }
85
86    /// Reject with `503` when the backend is down (instead of letting traffic through).
87    pub const fn fail_closed(mut self) -> Self {
88        self.policy = FailurePolicy::FailClosed;
89        self
90    }
91
92    /// Rate-limit key for this request's principal.
93    ///
94    /// Authenticated identity (`sub` claim) wins over network address: it
95    /// keeps users behind one NAT from sharing a bucket, and an attacker with
96    /// a stolen account can't escape their per-user limit by rotating IPs.
97    /// Anonymous traffic falls back to the first `X-Forwarded-For` hop.
98    fn principal(ctx: &RequestContext) -> String {
99        if let Some(sub) = ctx
100            .claims()
101            .and_then(|c| c.get("sub"))
102            .and_then(|v| v.as_str())
103        {
104            return format!("sub:{sub}");
105        }
106        ctx.header("x-forwarded-for")
107            .and_then(|h| h.split(',').next())
108            .and_then(|s| s.trim().parse::<IpAddr>().ok())
109            .map(|ip| format!("ip:{ip}"))
110            .unwrap_or_else(|| "anon".to_owned())
111    }
112
113    pub async fn check(&self, ctx: &RequestContext) -> Result<(), Error> {
114        // No backend registered → limiter disabled (dev mode). The local
115        // `RateLimit` guard can still provide per-instance protection.
116        let Some(backend) = ctx.try_inject::<Box<dyn RateLimitBackend>>() else {
117            return Ok(());
118        };
119
120        let key = format!("rl:{}:{}", self.name, Self::principal(ctx));
121        match backend.hit(&key, self.max, self.window_secs).await {
122            RateDecision::Allow { .. } => Ok(()),
123            RateDecision::Deny { .. } => Err(Error::TooManyRequests),
124            RateDecision::Unavailable => match self.policy {
125                FailurePolicy::FailOpen => Ok(()),
126                FailurePolicy::FailClosed => {
127                    Err(Error::ServiceUnavailable("rate limit backend unavailable"))
128                }
129            },
130        }
131    }
132}