// pas_external/audit/rate_limit.rs
1//! `RateLimiter` port + `MemoryRateLimiter` token-bucket adapter (M49).
2//!
3//! See module-level docs in `audit/mod.rs` for the broader composition
4//! story (`RateLimitedAuditSink<S, L>` lands in Phase 9.C).
5//!
6//! ── Why a separate port from `AuditSink` ────────────────────────────────
7//!
8//! Per the Phase 9 architecture-watch (`feedback_planning_for_healthy_architecture`):
9//! `RateLimiter` has more than one consumer planned — Phase 9.C composes
10//! it into [`RateLimitedAuditSink`](super::rate_limited_sink::RateLimitedAuditSink)
11//! (when 9.C lands), but Phase 10.11 (RP middleware nonce-store throttle),
12//! OAuth callback PKCE-attempt throttle, and any future verify-side
13//! throttling all benefit from the same port. Single-use abstraction
14//! would force later callers to either re-invent or import an
15//! audit-flavored type.
16//!
17//! ── Substrate-failure policy: `bool`, not `Result<bool, _>` ────────────
18//!
19//! The trait returns plain `bool`. Each adapter chooses its own
20//! substrate-failure policy and embeds it: [`MemoryRateLimiter`] has
21//! no substrate to fail; a future `KvrocksRateLimiter` chooses
22//! fail-open (admit on outage — audit-pipeline use case) or
23//! fail-closed (deny on outage — hard-throttle use case) at
24//! construction time. Pushing the policy onto callers via `Result`
25//! would force every call site to make the same decision identically
26//! — an anti-pattern for a port whose policy is substrate-specific.
27//!
28//! ── HPA replicas implications ───────────────────────────────────────────
29//!
30//! [`MemoryRateLimiter`] is per-process: under HPA `replicas=10`, each
31//! pod tolerates `capacity` failures/window independently — total
32//! `10 * capacity` allowed. This is the right default for "log-flood
33//! DoS on the audit pipeline" (the threat model is each replica's
34//! local logging budget); a coordinated attacker would still need to
35//! flood every replica. Distributed substrates (KVRocks, Redis) come
36//! later when use cases need per-account global limits (Phase 10.11+).
37
38use std::collections::HashMap;
39use std::sync::Mutex;
40use std::time::{Duration, Instant};
41
42use async_trait::async_trait;
43
44use super::RateLimitKey;
45
/// Per-source rate-limiting port (M49).
///
/// Returns `true` when the call is admitted (under quota), `false`
/// when rate-limited. Adapters are responsible for atomic
/// check-and-decrement semantics — naive split shapes
/// (`check + decrement`) leak admits under concurrent calls.
/// Single-method enforces atomicity at the trait surface.
///
/// `&self` (not `&mut self`) so a single limiter serves many
/// concurrent callers; interior mutability lives in the adapter.
#[async_trait]
pub trait RateLimiter: std::fmt::Debug + Send + Sync {
    /// Attempt to admit one call attributed to `key`.
    ///
    /// Returns `true` when admitted, `false` when rate-limited.
    /// Substrate failures are absorbed by the adapter's own
    /// fail-open/fail-closed policy (see module docs) — callers
    /// never see an error, only an admit/refuse decision.
    async fn allow(&self, key: &RateLimitKey) -> bool;
}
60
/// In-memory token-bucket limiter — Phase 9 default substrate.
///
/// Per-key independent buckets. Each bucket holds up to `capacity`
/// tokens; one token is consumed per `allow` call. Buckets refill at
/// `capacity / window` tokens per second.
///
/// Algorithm: **lazy refill on access** — no background timer needed.
/// Bucket state is `(tokens_remaining, last_refill_instant)`. On every
/// `allow`, compute elapsed since `last_refill`, add proportional
/// tokens (capped at `capacity` so a long-idle bucket doesn't bank
/// more than its instantaneous quota), then attempt consume.
///
/// **Capacity semantics**: how many failures tolerated in a refill
/// window. For "audit emission rate-limiting" with `capacity=10`,
/// `window=60s`: each (client, kid) bucket admits 10 audit events
/// per minute, then refuses until the next refill epoch. Bursts of 10
/// admit instantly; sustained pressure drops to 1-per-6s steady state.
///
/// NOTE(review): `buckets` grows one entry per distinct key and is
/// never evicted — fine while the key space is bounded (client × kid),
/// but worth confirming before pointing attacker-controlled keys at it.
#[derive(Debug)]
pub struct MemoryRateLimiter {
    // Maximum tokens per bucket; also the burst size (asserted > 0 in `new`).
    capacity: u32,
    // Refill window: a full bucket's worth of tokens accrues per `window`.
    window: Duration,
    // Per-key bucket state; `Mutex` gives the `&self` trait method its
    // interior mutability. No `.await` occurs while the lock is held.
    buckets: Mutex<HashMap<RateLimitKey, Bucket>>,
}
84
/// Per-key token-bucket state (internal to [`MemoryRateLimiter`]).
#[derive(Debug)]
struct Bucket {
    // Fractional tokens remaining; `f64` so sub-token refill accrues
    // smoothly between calls instead of quantizing to whole windows.
    tokens: f64,
    // Monotonic timestamp of the last lazy refill — the basis for the
    // elapsed-time calculation on the next `allow`.
    last_refill: Instant,
}
90
91impl MemoryRateLimiter {
92    /// Construct a limiter with `capacity` tokens per `window`.
93    ///
94    /// `assert!` invariants — these are configuration bugs that
95    /// should fail loudly at startup, not silently at first failure
96    /// (per `feedback_audit_grilled_decisions` "easy path" check).
97    /// Both arguments are user-supplied at builder time, so this
98    /// runs once per verifier construction, not on the hot path.
99    ///
100    /// # Panics
101    ///
102    /// - if `capacity == 0` (every call would refuse)
103    /// - if `window` is zero (refill rate undefined)
104    #[must_use]
105    pub fn new(capacity: u32, window: Duration) -> Self {
106        assert!(capacity > 0, "RateLimiter capacity must be > 0");
107        assert!(!window.is_zero(), "RateLimiter window must be > 0");
108        Self {
109            capacity,
110            window,
111            buckets: Mutex::new(HashMap::new()),
112        }
113    }
114
115    fn refill_per_second(&self) -> f64 {
116        f64::from(self.capacity) / self.window.as_secs_f64()
117    }
118}
119
120impl Default for MemoryRateLimiter {
121    /// Default: 10 admits per 60s per key. Reasonable for audit
122    /// emission throttling — one event every 6s per source under
123    /// sustained pressure; bursts of 10 admit instantly.
124    fn default() -> Self {
125        Self::new(10, Duration::from_secs(60))
126    }
127}
128
129#[async_trait]
130impl RateLimiter for MemoryRateLimiter {
131    async fn allow(&self, key: &RateLimitKey) -> bool {
132        let mut buckets = self
133            .buckets
134            .lock()
135            .unwrap_or_else(|poisoned| poisoned.into_inner());
136        let now = Instant::now();
137        let bucket = buckets.entry(key.clone()).or_insert_with(|| Bucket {
138            tokens: f64::from(self.capacity),
139            last_refill: now,
140        });
141
142        // Lazy refill: add tokens based on elapsed wall-clock since
143        // last access. The `.min(capacity)` cap prevents banked tokens
144        // — a long-idle bucket grants only its instantaneous quota,
145        // not the full elapsed-time × refill-rate (which would let an
146        // attacker "save up" by going silent before a burst).
147        let elapsed = now.saturating_duration_since(bucket.last_refill);
148        let earned = elapsed.as_secs_f64() * self.refill_per_second();
149        bucket.tokens = (bucket.tokens + earned).min(f64::from(self.capacity));
150        bucket.last_refill = now;
151
152        if bucket.tokens >= 1.0 {
153            bucket.tokens -= 1.0;
154            true
155        } else {
156            false
157        }
158    }
159}
160
#[cfg(test)]
mod tests {
    use super::*;

    /// Bucket admits up to `capacity` calls in immediate succession.
    /// `capacity=3` proves the parameter is honored, not collapsed.
    #[tokio::test]
    async fn bucket_admits_under_quota() {
        let limiter = MemoryRateLimiter::new(3, Duration::from_secs(60));
        let key = RateLimitKey::new("rcw::k1");
        assert!(limiter.allow(&key).await);
        assert!(limiter.allow(&key).await);
        assert!(limiter.allow(&key).await);
    }

    /// (capacity + 1)-th call refuses. The 60s window guarantees no
    /// refill on this CPU, so no sleep needed — pure logic test.
    #[tokio::test]
    async fn bucket_refuses_at_quota() {
        let limiter = MemoryRateLimiter::new(2, Duration::from_secs(60));
        let key = RateLimitKey::new("rcw::k1");
        assert!(limiter.allow(&key).await);
        assert!(limiter.allow(&key).await);
        assert!(!limiter.allow(&key).await);
    }

    /// After the window passes, the bucket refills. Tiny window
    /// (50ms) so the test runs in real time without flakiness.
    ///
    /// NOTE(review): real-clock test — the second `allow` must land
    /// within 50ms of the first or it earns a full token and the
    /// `!allow` assert fails. Margin looks adequate, but confirm on
    /// a heavily loaded CI runner.
    #[tokio::test]
    async fn bucket_refills_after_window() {
        let limiter = MemoryRateLimiter::new(1, Duration::from_millis(50));
        let key = RateLimitKey::new("rcw::k1");
        assert!(limiter.allow(&key).await);
        assert!(!limiter.allow(&key).await);
        // Sleep slightly longer than window for full refill.
        tokio::time::sleep(Duration::from_millis(75)).await;
        assert!(limiter.allow(&key).await);
    }

    /// Different keys have independent buckets — exhausting one does
    /// not affect another. Critical for the per-(client, kid) bucket
    /// strategy from Phase 9 design call (e). A regression here would
    /// silently bucket all sources together (the (d) "global" pattern),
    /// defeating the per-source attribution.
    #[tokio::test]
    async fn key_isolation_keeps_buckets_independent() {
        let limiter = MemoryRateLimiter::new(1, Duration::from_secs(60));
        let key_a = RateLimitKey::new("rcw::k1");
        let key_b = RateLimitKey::new("rcw::k2"); // same client, different kid
        let key_c = RateLimitKey::new("ctw::k1"); // different client

        assert!(limiter.allow(&key_a).await);
        assert!(limiter.allow(&key_b).await);
        assert!(limiter.allow(&key_c).await);

        // All three exhausted; further calls refuse independently.
        assert!(!limiter.allow(&key_a).await);
        assert!(!limiter.allow(&key_b).await);
        assert!(!limiter.allow(&key_c).await);
    }

    /// Default constructor (10 / 60s) admits initial burst of 10,
    /// then refuses. Documents the magic numbers in the code.
    #[tokio::test]
    async fn default_admits_initial_burst_then_refuses() {
        let limiter = MemoryRateLimiter::default();
        let key = RateLimitKey::new("default-test");
        for _ in 0..10 {
            assert!(limiter.allow(&key).await);
        }
        assert!(!limiter.allow(&key).await);
    }

    /// Compile-time guard: a `MemoryRateLimiter` is usable behind
    /// `Arc<dyn RateLimiter>` — the runtime shape adapter wrappers
    /// inject. Object-safety regression catches generic-method
    /// additions at compile time. Never executed — the assignment
    /// type-checking is the whole test.
    #[allow(dead_code)]
    fn dyn_object_safety() {
        use std::sync::Arc;
        let _: Arc<dyn RateLimiter> = Arc::new(MemoryRateLimiter::default());
    }

    /// Banking attack defense: a bucket idle past `window` does NOT
    /// grant more than `capacity` tokens. Without the `.min(capacity)`
    /// cap, an attacker who waits 10×window then bursts would get
    /// 10×capacity admits.
    ///
    /// NOTE(review): with a 20ms window and capacity 2, the third
    /// `allow` must run within ~10ms of the first two to refuse —
    /// the tightest timing margin in this module; verify it holds
    /// under CI scheduling jitter.
    #[tokio::test]
    async fn idle_bucket_does_not_bank_beyond_capacity() {
        let limiter = MemoryRateLimiter::new(2, Duration::from_millis(20));
        let key = RateLimitKey::new("attacker");

        // Initial burst: 2 admits.
        assert!(limiter.allow(&key).await);
        assert!(limiter.allow(&key).await);
        assert!(!limiter.allow(&key).await);

        // Idle for 10× window (200ms) — without the cap, we'd accrue
        // 2 × 10 = 20 tokens. With the cap, max 2.
        tokio::time::sleep(Duration::from_millis(200)).await;

        assert!(limiter.allow(&key).await);
        assert!(limiter.allow(&key).await);
        assert!(!limiter.allow(&key).await, "cap must hold");
    }
}