arcly_http/resilience/distributed_rate_limit.rs
1//! Cluster-wide rate limiting behind a pluggable async backend.
2//!
3//! The local [`RateLimit`](crate::resilience::RateLimit) is per-instance: on a
4//! 10-replica deployment a "30 req/min" limit silently becomes 300 req/min and
5//! resets on every pod restart. `DistributedRateLimit` delegates the counting
6//! to a shared backend (Redis sliding window in the enterprise example) so the
7//! limit holds across the whole fleet.
8//!
9//! ## Design rules
10//!
11//! - **Async by nature** — a network counter cannot hide behind the sync
12//! [`Guard`](crate::auth::guards::Guard) trait, so this type exposes
13//! `async fn check(&ctx)`; handlers `.await` it explicitly. No `block_on`,
14//! no executor stalls.
15//! - **One atomic round-trip** — `RateLimitBackend::hit` must count and decide
16//! in a single atomic operation (Lua script / GETDEL-style). Split
17//! check-then-increment is a TOCTOU bug, the same class this codebase
18//! already eliminated from refresh-token rotation.
19//! - **Explicit failure policy** — "backend down" is its own decision
20//! ([`RateDecision::Unavailable`]); whether that allows or denies is the
21//! caller's declared [`FailurePolicy`], never an accident of error mapping.
22//! - **Optional by construction** — no backend in the DI container means the
23//! check is a no-op, so dev environments need zero configuration.
24
25use std::net::IpAddr;
26
27use futures::future::BoxFuture;
28
29use crate::web::{Error, RequestContext};
30
/// Outcome of one counted hit against the shared limiter.
///
/// Produced by [`RateLimitBackend::hit`]; plain data, so it is cheap to copy,
/// compare in tests, and print in logs.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RateDecision {
    /// The hit was counted and is within the limit.
    Allow {
        /// Backend-reported number of hits left in the current window.
        remaining: u32,
    },
    /// The hit exceeds the limit for this key.
    Deny {
        /// Backend-reported number of seconds until the key may be retried.
        retry_after_secs: u32,
    },
    /// The backend did not answer — resolved by [`FailurePolicy`].
    Unavailable,
}
42
43/// Shared counting backend (Redis, Memcached, …). Object-safe via `BoxFuture`,
44/// following the same pattern as `SessionStore` and `OAuth2Provider`.
45///
46/// Implementations MUST count-and-decide atomically in a single round-trip.
47pub trait RateLimitBackend: Send + Sync + 'static {
48 fn hit<'a>(&'a self, key: &'a str, max: u32, window_secs: u32) -> BoxFuture<'a, RateDecision>;
49}
50
/// What to do when the backend is unreachable.
///
/// `FailOpen` is the [`Default`], matching what a freshly built limiter uses.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Hash)]
pub enum FailurePolicy {
    /// Let traffic through (availability over strictness) — sane default.
    #[default]
    FailOpen,
    /// Reject with `503` — for abuse-sensitive endpoints like `/auth/login`.
    FailClosed,
}
59
60/// Cluster-wide limiter. Build as a `static` and call from handlers:
61///
62/// ```ignore
63/// static LOGIN_RATE: DistributedRateLimit =
64/// DistributedRateLimit::new("login", 10, 60).fail_closed();
65///
66/// // in the handler:
67/// LOGIN_RATE.check(&ctx).await?;
68/// ```
69pub struct DistributedRateLimit {
70 pub name: &'static str,
71 pub max: u32,
72 pub window_secs: u32,
73 pub policy: FailurePolicy,
74}
75
76impl DistributedRateLimit {
77 pub const fn new(name: &'static str, max: u32, window_secs: u32) -> Self {
78 Self {
79 name,
80 max,
81 window_secs,
82 policy: FailurePolicy::FailOpen,
83 }
84 }
85
86 /// Reject with `503` when the backend is down (instead of letting traffic through).
87 pub const fn fail_closed(mut self) -> Self {
88 self.policy = FailurePolicy::FailClosed;
89 self
90 }
91
92 /// Rate-limit key for this request's principal.
93 ///
94 /// Authenticated identity (`sub` claim) wins over network address: it
95 /// keeps users behind one NAT from sharing a bucket, and an attacker with
96 /// a stolen account can't escape their per-user limit by rotating IPs.
97 /// Anonymous traffic falls back to the first `X-Forwarded-For` hop.
98 fn principal(ctx: &RequestContext) -> String {
99 if let Some(sub) = ctx
100 .claims()
101 .and_then(|c| c.get("sub"))
102 .and_then(|v| v.as_str())
103 {
104 return format!("sub:{sub}");
105 }
106 ctx.header("x-forwarded-for")
107 .and_then(|h| h.split(',').next())
108 .and_then(|s| s.trim().parse::<IpAddr>().ok())
109 .map(|ip| format!("ip:{ip}"))
110 .unwrap_or_else(|| "anon".to_owned())
111 }
112
113 pub async fn check(&self, ctx: &RequestContext) -> Result<(), Error> {
114 // No backend registered → limiter disabled (dev mode). The local
115 // `RateLimit` guard can still provide per-instance protection.
116 let Some(backend) = ctx.try_inject::<Box<dyn RateLimitBackend>>() else {
117 return Ok(());
118 };
119
120 let key = format!("rl:{}:{}", self.name, Self::principal(ctx));
121 match backend.hit(&key, self.max, self.window_secs).await {
122 RateDecision::Allow { .. } => Ok(()),
123 RateDecision::Deny { .. } => Err(Error::TooManyRequests),
124 RateDecision::Unavailable => match self.policy {
125 FailurePolicy::FailOpen => Ok(()),
126 FailurePolicy::FailClosed => {
127 Err(Error::ServiceUnavailable("rate limit backend unavailable"))
128 }
129 },
130 }
131 }
132}