pas_external/audit/rate_limit.rs
1//! `RateLimiter` port + `MemoryRateLimiter` token-bucket adapter (M49).
2//!
3//! See module-level docs in `audit/mod.rs` for the broader composition
4//! story (`RateLimitedAuditSink<S, L>` lands in Phase 9.C).
5//!
6//! ── Why a separate port from `AuditSink` ────────────────────────────────
7//!
8//! Per the Phase 9 architecture-watch (`feedback_planning_for_healthy_architecture`):
9//! `RateLimiter` has more than one consumer planned — Phase 9.C composes
10//! it into [`RateLimitedAuditSink`](super::rate_limited_sink::RateLimitedAuditSink)
11//! (when 9.C lands), but Phase 10.11 (RP middleware nonce-store throttle),
12//! OAuth callback PKCE-attempt throttle, and any future verify-side
13//! throttling all benefit from the same port. Single-use abstraction
14//! would force later callers to either re-invent or import an
15//! audit-flavored type.
16//!
17//! ── Substrate-failure policy: `bool`, not `Result<bool, _>` ────────────
18//!
19//! The trait returns plain `bool`. Each adapter chooses its own
20//! substrate-failure policy and embeds it: [`MemoryRateLimiter`] has
21//! no substrate to fail; a future `KvrocksRateLimiter` chooses
22//! fail-open (admit on outage — audit-pipeline use case) or
23//! fail-closed (deny on outage — hard-throttle use case) at
24//! construction time. Pushing the policy onto callers via `Result`
25//! would force every call site to make the same decision identically
26//! — an anti-pattern for a port whose policy is substrate-specific.
27//!
28//! ── HPA replicas implications ───────────────────────────────────────────
29//!
30//! [`MemoryRateLimiter`] is per-process: under HPA `replicas=10`, each
31//! pod tolerates `capacity` failures/window independently — total
32//! `10 * capacity` allowed. This is the right default for "log-flood
33//! DoS on the audit pipeline" (the threat model is each replica's
34//! local logging budget); a coordinated attacker would still need to
35//! flood every replica. Distributed substrates (KVRocks, Redis) come
36//! later when use cases need per-account global limits (Phase 10.11+).
37
38use std::collections::HashMap;
39use std::sync::Mutex;
40use std::time::{Duration, Instant};
41
42use async_trait::async_trait;
43
44use super::RateLimitKey;
45
/// Per-source rate-limiting port (M49).
///
/// Returns `true` when the call is admitted (under quota), `false`
/// when rate-limited. Adapters are responsible for atomic
/// check-and-decrement semantics — naive split shapes
/// (`check + decrement`) leak admits under concurrent calls.
/// Single-method enforces atomicity at the trait surface.
///
/// `&self` (not `&mut self`) so a single limiter serves many
/// concurrent callers; interior mutability lives in the adapter.
#[async_trait]
pub trait RateLimiter: std::fmt::Debug + Send + Sync {
    /// Atomically check-and-consume one unit of quota for `key`.
    ///
    /// `true` = admitted (quota consumed), `false` = rate-limited.
    /// Infallible by design: the substrate-failure policy (fail-open
    /// vs fail-closed) is each adapter's own choice, not pushed onto
    /// callers via `Result` (see module docs).
    async fn allow(&self, key: &RateLimitKey) -> bool;
}
60
/// In-memory token-bucket limiter — Phase 9 default substrate.
///
/// Per-key independent buckets. Each bucket holds up to `capacity`
/// tokens; one token is consumed per `allow` call. Buckets refill at
/// `capacity / window` tokens per second.
///
/// Algorithm: **lazy refill on access** — no background timer needed.
/// Bucket state is `(tokens_remaining, last_refill_instant)`. On every
/// `allow`, compute elapsed since `last_refill`, add proportional
/// tokens (capped at `capacity` so a long-idle bucket doesn't bank
/// more than its instantaneous quota), then attempt consume.
///
/// **Capacity semantics**: how many failures tolerated in a refill
/// window. For "audit emission rate-limiting" with `capacity=10`,
/// `window=60s`: each (client, kid) bucket admits 10 audit events
/// per minute, then refuses until the next refill epoch. Bursts of 10
/// admit instantly; sustained pressure drops to 1-per-6s steady state.
#[derive(Debug)]
pub struct MemoryRateLimiter {
    /// Maximum tokens a bucket can hold — also the burst size.
    capacity: u32,
    /// Time for an empty bucket to refill back to `capacity`.
    window: Duration,
    /// Per-key bucket state. `Mutex` (not `RwLock`): every `allow`
    /// mutates, so there is no read-mostly path to optimize.
    /// NOTE(review): entries are never evicted in this file — memory
    /// grows with the number of distinct keys ever seen. Fine for
    /// bounded (client, kid) key spaces; revisit if keys become
    /// attacker-controlled and unbounded.
    buckets: Mutex<HashMap<RateLimitKey, Bucket>>,
}
84
/// Per-key token-bucket state (see [`MemoryRateLimiter`] algorithm docs).
#[derive(Debug)]
struct Bucket {
    /// Fractional tokens remaining; `f64` so sub-token refill accrues
    /// smoothly between closely-spaced calls.
    tokens: f64,
    /// Instant of the last lazy refill; elapsed time since this point
    /// earns new tokens on the next `allow`.
    last_refill: Instant,
}
90
91impl MemoryRateLimiter {
92 /// Construct a limiter with `capacity` tokens per `window`.
93 ///
94 /// `assert!` invariants — these are configuration bugs that
95 /// should fail loudly at startup, not silently at first failure
96 /// (per `feedback_audit_grilled_decisions` "easy path" check).
97 /// Both arguments are user-supplied at builder time, so this
98 /// runs once per verifier construction, not on the hot path.
99 ///
100 /// # Panics
101 ///
102 /// - if `capacity == 0` (every call would refuse)
103 /// - if `window` is zero (refill rate undefined)
104 #[must_use]
105 pub fn new(capacity: u32, window: Duration) -> Self {
106 assert!(capacity > 0, "RateLimiter capacity must be > 0");
107 assert!(!window.is_zero(), "RateLimiter window must be > 0");
108 Self {
109 capacity,
110 window,
111 buckets: Mutex::new(HashMap::new()),
112 }
113 }
114
115 fn refill_per_second(&self) -> f64 {
116 f64::from(self.capacity) / self.window.as_secs_f64()
117 }
118}
119
120impl Default for MemoryRateLimiter {
121 /// Default: 10 admits per 60s per key. Reasonable for audit
122 /// emission throttling — one event every 6s per source under
123 /// sustained pressure; bursts of 10 admit instantly.
124 fn default() -> Self {
125 Self::new(10, Duration::from_secs(60))
126 }
127}
128
129#[async_trait]
130impl RateLimiter for MemoryRateLimiter {
131 async fn allow(&self, key: &RateLimitKey) -> bool {
132 let mut buckets = self
133 .buckets
134 .lock()
135 .unwrap_or_else(|poisoned| poisoned.into_inner());
136 let now = Instant::now();
137 let bucket = buckets.entry(key.clone()).or_insert_with(|| Bucket {
138 tokens: f64::from(self.capacity),
139 last_refill: now,
140 });
141
142 // Lazy refill: add tokens based on elapsed wall-clock since
143 // last access. The `.min(capacity)` cap prevents banked tokens
144 // — a long-idle bucket grants only its instantaneous quota,
145 // not the full elapsed-time × refill-rate (which would let an
146 // attacker "save up" by going silent before a burst).
147 let elapsed = now.saturating_duration_since(bucket.last_refill);
148 let earned = elapsed.as_secs_f64() * self.refill_per_second();
149 bucket.tokens = (bucket.tokens + earned).min(f64::from(self.capacity));
150 bucket.last_refill = now;
151
152 if bucket.tokens >= 1.0 {
153 bucket.tokens -= 1.0;
154 true
155 } else {
156 false
157 }
158 }
159}
160
#[cfg(test)]
mod tests {
    use super::*;

    /// A bucket admits up to `capacity` calls in immediate succession.
    /// `capacity=3` proves the parameter is honored, not collapsed.
    #[tokio::test]
    async fn bucket_admits_under_quota() {
        let rl = MemoryRateLimiter::new(3, Duration::from_secs(60));
        let k = RateLimitKey::new("rcw::k1");
        for _ in 0..3 {
            assert!(rl.allow(&k).await);
        }
    }

    /// The (capacity + 1)-th call refuses. The 60s window guarantees
    /// no meaningful refill during the test, so no sleep is needed —
    /// pure logic test.
    #[tokio::test]
    async fn bucket_refuses_at_quota() {
        let rl = MemoryRateLimiter::new(2, Duration::from_secs(60));
        let k = RateLimitKey::new("rcw::k1");
        for _ in 0..2 {
            assert!(rl.allow(&k).await);
        }
        assert!(!rl.allow(&k).await);
    }

    /// After the window passes, the bucket refills. Tiny window
    /// (50ms) so the test runs in real time without flakiness.
    #[tokio::test]
    async fn bucket_refills_after_window() {
        let rl = MemoryRateLimiter::new(1, Duration::from_millis(50));
        let k = RateLimitKey::new("rcw::k1");
        assert!(rl.allow(&k).await);
        assert!(!rl.allow(&k).await);
        // Sleep slightly longer than the window for a full refill.
        tokio::time::sleep(Duration::from_millis(75)).await;
        assert!(rl.allow(&k).await);
    }

    /// Different keys have independent buckets — exhausting one does
    /// not affect another. Critical for the per-(client, kid) bucket
    /// strategy from Phase 9 design call (e). A regression here would
    /// silently bucket all sources together (the (d) "global" pattern),
    /// defeating the per-source attribution.
    #[tokio::test]
    async fn key_isolation_keeps_buckets_independent() {
        let rl = MemoryRateLimiter::new(1, Duration::from_secs(60));
        let keys = [
            RateLimitKey::new("rcw::k1"),
            RateLimitKey::new("rcw::k2"), // same client, different kid
            RateLimitKey::new("ctw::k1"), // different client
        ];

        // Each key's single token admits independently...
        for k in &keys {
            assert!(rl.allow(k).await);
        }
        // ...and each exhausted bucket refuses independently.
        for k in &keys {
            assert!(!rl.allow(k).await);
        }
    }

    /// Default constructor (10 / 60s) admits an initial burst of 10,
    /// then refuses. Documents the magic numbers in the code.
    #[tokio::test]
    async fn default_admits_initial_burst_then_refuses() {
        let rl = MemoryRateLimiter::default();
        let k = RateLimitKey::new("default-test");
        for _ in 0..10 {
            assert!(rl.allow(&k).await);
        }
        assert!(!rl.allow(&k).await);
    }

    /// Compile-time guard: a `MemoryRateLimiter` is usable behind
    /// `Arc<dyn RateLimiter>` — the runtime shape adapter wrappers
    /// inject. Object-safety regression catches generic-method
    /// additions at compile time.
    #[allow(dead_code)]
    fn dyn_object_safety() {
        use std::sync::Arc;
        let _: Arc<dyn RateLimiter> = Arc::new(MemoryRateLimiter::default());
    }

    /// Banking attack defense: a bucket idle past `window` does NOT
    /// grant more than `capacity` tokens. Without the `.min(capacity)`
    /// cap, an attacker who waits 10×window then bursts would get
    /// 10×capacity admits.
    ///
    /// NOTE(review): the 20ms window means ~0.1 token/ms refill; the
    /// refusal asserts assume the burst calls complete in well under
    /// 10ms, which holds comfortably on any non-pathological runner.
    #[tokio::test]
    async fn idle_bucket_does_not_bank_beyond_capacity() {
        let rl = MemoryRateLimiter::new(2, Duration::from_millis(20));
        let k = RateLimitKey::new("attacker");

        // Initial burst: 2 admits, then refusal.
        assert!(rl.allow(&k).await);
        assert!(rl.allow(&k).await);
        assert!(!rl.allow(&k).await);

        // Idle for 10× window (200ms) — without the cap, we'd accrue
        // 2 × 10 = 20 tokens. With the cap, max 2.
        tokio::time::sleep(Duration::from_millis(200)).await;

        assert!(rl.allow(&k).await);
        assert!(rl.allow(&k).await);
        assert!(!rl.allow(&k).await, "cap must hold");
    }
}
266}