s4_server/rate_limit.rs
1//! Per-(principal, bucket) token-bucket rate limiting (v0.4 #19).
2//!
3//! Operators describe the rules in JSON:
4//!
5//! ```json
6//! [
7//! {"principal": "AKIATENANT_A", "bucket": "tenant-a-*", "rps": 100, "burst": 500},
8//! {"principal": "*", "bucket": "*", "rps": 20, "burst": 60}
9//! ]
10//! ```
11//!
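//! Match precedence is **first match wins**, in file order: rules are
//! checked in the order they appear in the JSON, so put narrow rules above
//! wildcards. Patterns are simple globs: `*` matches any sequence (including
//! an empty one) and `?` matches any single byte (one character for ASCII
//! names). Matching is case-sensitive.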
15//!
//! On each PUT / GET / DELETE / List, one token is consumed from the first
//! matching rule's limiter for that principal/bucket pair (or from the
//! rule's shared overflow limiter once the per-pair pool is full). If that
//! limiter is empty the request is rejected with `S3ErrorCode::SlowDown`
//! (HTTP 503; the AWS-spec response for "you're making requests faster than
//! I can handle"). The `s4_rate_limit_throttled_total{principal,bucket}`
//! Prometheus counter is bumped on every reject.
22
23use std::num::NonZeroU32;
24use std::path::Path;
25use std::sync::Arc;
26
27use dashmap::DashMap;
28use governor::clock::DefaultClock;
29use governor::state::{InMemoryState, NotKeyed};
30use governor::{Quota, RateLimiter};
31use serde::Deserialize;
32
33use crate::policy; // re-use the glob_match helper if exposed; otherwise inline below
34
35#[derive(Debug, Clone, Deserialize)]
36pub struct Rule {
37 /// `*` for any principal.
38 pub principal: String,
39 /// `*` for any bucket.
40 pub bucket: String,
41 /// Sustained requests per second.
42 pub rps: u32,
43 /// Initial / replenished bucket size.
44 pub burst: u32,
45}
46
47/// Compiled per-(principal, bucket) limiter pool. Rules are evaluated in
48/// the order they appear in the JSON; the first match wins.
49#[derive(Clone)]
50pub struct RateLimits {
51 rules: Arc<Vec<Rule>>,
52 /// Per-(rule index, principal, bucket) limiters. Created lazily —
53 /// the first request from a given principal/bucket pair instantiates
54 /// the limiter, subsequent requests reuse it.
55 limiters: Arc<DashMap<(usize, String, String), Arc<KeyLimiter>>>,
56 /// v0.8.12 HIGH-11 fix: per-rule shared limiter used as the
57 /// "overflow" fallback once `limiters` is at capacity. Lazily
58 /// initialised per rule on first overflow so steady-state
59 /// workloads under the cap don't allocate.
60 overflow: Arc<DashMap<usize, Arc<KeyLimiter>>>,
61}
62
63/// v0.8.12 HIGH-11 fix: hard cap on the per-(rule, principal, bucket)
64/// limiter pool. Without this, every distinct access-key-id /
65/// bucket-name combo a (potentially attacker-controlled) request
66/// stream presents allocates a fresh `Arc<KeyLimiter>` + a key tuple
67/// that's never reclaimed. At 1k req/s of unique fake principals the
68/// pool grows by ~3.6M entries/hour, each pinning ~200 B of memory
69/// → gateway OOMs in a single day. Once the pool fills, new keys
70/// fold onto a per-rule shared `overflow` limiter — they still get
71/// rate-limited (correctly, by the matching rule's quota), just
72/// share one bucket with every other overflowed key. Operators
73/// who legitimately need millions of distinct principals can raise
74/// the cap via `with_max_active_limiters(...)`; the default is a
75/// trade between memory budget (~3 MiB at this cap) and the realistic
76/// upper bound of distinct principals × buckets in a steady-state
77/// workload.
78pub const DEFAULT_MAX_ACTIVE_LIMITERS: usize = 16_384;
79
80type KeyLimiter = RateLimiter<NotKeyed, InMemoryState, DefaultClock>;
81
82impl RateLimits {
83 pub fn from_json_str(s: &str) -> Result<Self, String> {
84 let rules: Vec<Rule> =
85 serde_json::from_str(s).map_err(|e| format!("rate-limit JSON parse error: {e}"))?;
86 for r in &rules {
87 if r.rps == 0 || r.burst == 0 {
88 return Err(format!(
89 "rate-limit rule has rps=0 or burst=0 (would deny everything): {r:?}"
90 ));
91 }
92 }
93 Ok(Self {
94 rules: Arc::new(rules),
95 limiters: Arc::new(DashMap::new()),
96 overflow: Arc::new(DashMap::new()),
97 })
98 }
99
100 /// v0.8.12 HIGH-11 fix: current per-(rule, principal, bucket)
101 /// limiter count. Surfaced for tests and the
102 /// `rate_limit::active_limiters` Prometheus gauge.
103 pub fn active_limiter_count(&self) -> usize {
104 self.limiters.len()
105 }
106
107 pub fn from_path(path: &Path) -> Result<Self, String> {
108 let txt = std::fs::read_to_string(path)
109 .map_err(|e| format!("failed to read {}: {e}", path.display()))?;
110 Self::from_json_str(&txt)
111 }
112
113 /// Returns `true` if the request passes the limiter, `false` if
114 /// throttled. `principal_id` may be `None` (anonymous); rules with
115 /// `"principal": "*"` still match.
116 pub fn check(&self, principal_id: Option<&str>, bucket: &str) -> bool {
117 let principal = principal_id.unwrap_or("");
118 for (idx, rule) in self.rules.iter().enumerate() {
119 if !glob_match(&rule.principal, principal) {
120 continue;
121 }
122 if !glob_match(&rule.bucket, bucket) {
123 continue;
124 }
125 // v0.8.12 HIGH-11 fix: if the per-key pool is already at
126 // the cap AND we'd be inserting a new entry, fold this
127 // request onto the rule's shared `overflow` limiter
128 // instead. Pool growth is bounded by
129 // `DEFAULT_MAX_ACTIVE_LIMITERS` regardless of how many
130 // distinct principal / bucket strings the request stream
131 // presents.
132 let key = (idx, principal.to_owned(), bucket.to_owned());
133 let limiter = if let Some(existing) = self.limiters.get(&key) {
134 existing.clone()
135 } else if self.limiters.len() >= DEFAULT_MAX_ACTIVE_LIMITERS {
136 self.overflow
137 .entry(idx)
138 .or_insert_with(|| Self::build_limiter(rule))
139 .clone()
140 } else {
141 self.limiters
142 .entry(key)
143 .or_insert_with(|| Self::build_limiter(rule))
144 .clone()
145 };
146 return limiter.check().is_ok();
147 }
148 // No rule matched → no limit applies.
149 true
150 }
151
152 /// Helper: build a fresh `KeyLimiter` from a `Rule`'s `rps` /
153 /// `burst`. Pulled out so the cap-driven overflow fallback and
154 /// the normal-path `or_insert_with` share one construction
155 /// recipe.
156 fn build_limiter(rule: &Rule) -> Arc<KeyLimiter> {
157 let burst = NonZeroU32::new(rule.burst).expect("burst > 0 (validated)");
158 let rps = NonZeroU32::new(rule.rps).expect("rps > 0 (validated)");
159 let quota = Quota::per_second(rps).allow_burst(burst);
160 Arc::new(RateLimiter::direct(quota))
161 }
162}
163
164impl std::fmt::Debug for RateLimits {
165 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
166 f.debug_struct("RateLimits")
167 .field("rules", &self.rules.len())
168 .field("active_limiters", &self.limiters.len())
169 .finish()
170 }
171}
172
173pub type SharedRateLimits = Arc<RateLimits>;
174
/// Local minimal glob matcher. It follows the same matching shape as
/// `policy::glob_match` but is reimplemented here so `policy` does not have
/// to expose its internals.
///
/// `*` = any sequence (including empty), `?` = any single byte (one
/// character for ASCII input). Case-sensitive.
fn glob_match(pattern: &str, s: &str) -> bool {
    glob_match_bytes(pattern.as_bytes(), s.as_bytes())
}

/// Iterative wildcard matcher with single-star backtracking.
///
/// On a mismatch it rewinds to the most recent `*` and lets that star absorb
/// one more byte of `s`. Worst case is O(len(p) · len(s)), with no recursion.
fn glob_match_bytes(p: &[u8], s: &[u8]) -> bool {
    let mut pi = 0;
    let mut si = 0;
    // (pattern index of the most recent `*`, subject index it is anchored at)
    let mut star: Option<(usize, usize)> = None;
    while si < s.len() {
        match p.get(pi) {
            // The wildcard must be recognised *before* the literal comparison.
            // Otherwise a literal `*` byte in the subject is consumed as an
            // exact match without recording a backtrack point. Pattern `*`
            // would then reject the subject `*x`, and such a principal or
            // bucket would slip past a catch-all rule.
            Some(b'*') => {
                star = Some((pi, si));
                pi += 1;
            }
            Some(&c) if c == b'?' || c == s[si] => {
                pi += 1;
                si += 1;
            }
            _ => match star {
                Some((sp, ss)) => {
                    pi = sp + 1;
                    si = ss + 1;
                    star = Some((sp, si));
                }
                None => return false,
            },
        }
    }
    // Subject exhausted: only trailing `*`s may remain in the pattern.
    p[pi..].iter().all(|&c| c == b'*')
}
206
207// Touch a policy item to keep the import live (otherwise unused-imports fires
208// without changing visibility); we use the same matching shape on purpose.
209#[allow(dead_code)]
210fn _link() -> Option<policy::Effect> {
211 None
212}
213
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    /// Build a `RateLimits` from inline JSON, panicking on a bad fixture.
    fn limits(json: &str) -> RateLimits {
        RateLimits::from_json_str(json).expect("rate-limit parse")
    }

    #[test]
    fn parse_rejects_zero_rps_or_burst() {
        // Each fixture zeroes one quota field; parsing must fail with a
        // message that mentions it.
        let fixtures = [
            (
                r#"[{"principal": "*", "bucket": "*", "rps": 0, "burst": 10}]"#,
                "rps=0",
            ),
            (
                r#"[{"principal": "*", "bucket": "*", "rps": 1, "burst": 0}]"#,
                "burst=0",
            ),
        ];
        for (json, expected) in fixtures {
            let message = RateLimits::from_json_str(json).unwrap_err();
            assert!(message.contains(expected));
        }
    }

    #[test]
    fn match_principal_and_bucket_globs() {
        let pool = limits(
            r#"[
                {"principal": "AKIA*", "bucket": "tenant-a-*", "rps": 1000, "burst": 1000},
                {"principal": "*", "bucket": "*", "rps": 1, "burst": 1}
            ]"#,
        );
        // The narrow first rule matches and grants a generous quota.
        assert!(pool.check(Some("AKIATEST"), "tenant-a-foo"));
        // Everyone else lands on the catch-all: its single-token burst
        // admits one request...
        assert!(pool.check(Some("anonymous"), "any"));
        // ...and throttles the next.
        assert!(!pool.check(Some("anonymous"), "any"));
    }

    #[test]
    fn no_rule_means_no_limit() {
        let pool = limits(r#"[{"principal": "AKIATENANT", "bucket": "*", "rps": 1, "burst": 1}]"#);
        // The only rule targets another principal, so nothing limits us.
        (0..100).for_each(|_| assert!(pool.check(Some("AKIAOTHER"), "anything")));
    }

    #[test]
    fn refill_after_wait() {
        let pool = limits(r#"[{"principal": "*", "bucket": "*", "rps": 100, "burst": 1}]"#);
        assert!(pool.check(None, "b"));
        assert!(!pool.check(None, "b"));
        // At 100 rps one token comes back every 10 ms; 15 ms leaves margin.
        let refill_wait = Duration::from_millis(15);
        std::thread::sleep(refill_wait);
        assert!(pool.check(None, "b"));
    }
}