pas-external 0.10.0

Ppoppo Accounts System (PAS) external SDK — OAuth2 PKCE, JWT verification port, Axum middleware, session liveness
Documentation
//! `RateLimiter` port + `MemoryRateLimiter` token-bucket adapter (M49).
//!
//! See module-level docs in `audit/mod.rs` for the broader composition
//! story (`RateLimitedAuditSink<S, L>` lands in Phase 9.C).
//!
//! ── Why a separate port from `AuditSink` ────────────────────────────────
//!
//! Per the Phase 9 architecture-watch (`feedback_planning_for_healthy_architecture`):
//! `RateLimiter` has more than one consumer planned — Phase 9.C composes
//! it into [`RateLimitedAuditSink`](super::rate_limited_sink::RateLimitedAuditSink)
//! (when 9.C lands), but Phase 10.11 (RP middleware nonce-store throttle),
//! OAuth callback PKCE-attempt throttle, and any future verify-side
//! throttling all benefit from the same port. Single-use abstraction
//! would force later callers to either re-invent or import an
//! audit-flavored type.
//!
//! ── Substrate-failure policy: `bool`, not `Result<bool, _>` ────────────
//!
//! The trait returns plain `bool`. Each adapter chooses its own
//! substrate-failure policy and embeds it: [`MemoryRateLimiter`] has
//! no substrate to fail; a future `KvrocksRateLimiter` chooses
//! fail-open (admit on outage — audit-pipeline use case) or
//! fail-closed (deny on outage — hard-throttle use case) at
//! construction time. Pushing the policy onto callers via `Result`
//! would force every call site to make the same decision identically
//! — an anti-pattern for a port whose policy is substrate-specific.
//!
//! ── HPA replicas implications ───────────────────────────────────────────
//!
//! [`MemoryRateLimiter`] is per-process: under HPA `replicas=10`, each
//! pod tolerates `capacity` failures/window independently — total
//! `10 * capacity` allowed. This is the right default for "log-flood
//! DoS on the audit pipeline" (the threat model is each replica's
//! local logging budget); a coordinated attacker would still need to
//! flood every replica. Distributed substrates (KVRocks, Redis) come
//! later when use cases need per-account global limits (Phase 10.11+).

use std::collections::HashMap;
use std::sync::Mutex;
use std::time::{Duration, Instant};

use async_trait::async_trait;

use super::RateLimitKey;

/// Per-source rate-limiting port (M49).
///
/// Returns `true` when the call is admitted (under quota), `false`
/// when rate-limited. Adapters are responsible for atomic
/// check-and-decrement semantics — naive split shapes
/// (`check + decrement`) leak admits under concurrent calls.
/// Single-method enforces atomicity at the trait surface.
///
/// `&self` (not `&mut self`) so a single limiter serves many
/// concurrent callers; interior mutability lives in the adapter.
#[async_trait]
pub trait RateLimiter: std::fmt::Debug + Send + Sync {
    async fn allow(&self, key: &RateLimitKey) -> bool;
}

/// In-memory token-bucket limiter — Phase 9 default substrate.
///
/// Per-key independent buckets. Each bucket holds up to `capacity`
/// tokens; one token is consumed per `allow` call. Buckets refill at
/// `capacity / window` tokens per second.
///
/// Algorithm: **lazy refill on access** — no background timer needed.
/// Bucket state is `(tokens_remaining, last_refill_instant)`. On every
/// `allow`, compute elapsed since `last_refill`, add proportional
/// tokens (capped at `capacity` so a long-idle bucket doesn't bank
/// more than its instantaneous quota), then attempt consume.
///
/// **Capacity semantics**: how many failures tolerated in a refill
/// window. For "audit emission rate-limiting" with `capacity=10`,
/// `window=60s`: each (client, kid) bucket admits 10 audit events
/// per minute, then refuses until the next refill epoch. Bursts of 10
/// admit instantly; sustained pressure drops to 1-per-6s steady state.
#[derive(Debug)]
pub struct MemoryRateLimiter {
    capacity: u32,
    window: Duration,
    buckets: Mutex<HashMap<RateLimitKey, Bucket>>,
}

#[derive(Debug)]
struct Bucket {
    tokens: f64,
    last_refill: Instant,
}

impl MemoryRateLimiter {
    /// Construct a limiter with `capacity` tokens per `window`.
    ///
    /// `assert!` invariants — these are configuration bugs that
    /// should fail loudly at startup, not silently at first failure
    /// (per `feedback_audit_grilled_decisions` "easy path" check).
    /// Both arguments are user-supplied at builder time, so this
    /// runs once per verifier construction, not on the hot path.
    ///
    /// # Panics
    ///
    /// - if `capacity == 0` (every call would refuse)
    /// - if `window` is zero (refill rate undefined)
    #[must_use]
    pub fn new(capacity: u32, window: Duration) -> Self {
        assert!(capacity > 0, "RateLimiter capacity must be > 0");
        assert!(!window.is_zero(), "RateLimiter window must be > 0");
        Self {
            capacity,
            window,
            buckets: Mutex::new(HashMap::new()),
        }
    }

    fn refill_per_second(&self) -> f64 {
        f64::from(self.capacity) / self.window.as_secs_f64()
    }
}

impl Default for MemoryRateLimiter {
    /// Default: 10 admits per 60s per key. Reasonable for audit
    /// emission throttling — one event every 6s per source under
    /// sustained pressure; bursts of 10 admit instantly.
    fn default() -> Self {
        Self::new(10, Duration::from_secs(60))
    }
}

#[async_trait]
impl RateLimiter for MemoryRateLimiter {
    async fn allow(&self, key: &RateLimitKey) -> bool {
        let mut buckets = self
            .buckets
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner());
        let now = Instant::now();
        let bucket = buckets.entry(key.clone()).or_insert_with(|| Bucket {
            tokens: f64::from(self.capacity),
            last_refill: now,
        });

        // Lazy refill: add tokens based on elapsed wall-clock since
        // last access. The `.min(capacity)` cap prevents banked tokens
        // — a long-idle bucket grants only its instantaneous quota,
        // not the full elapsed-time × refill-rate (which would let an
        // attacker "save up" by going silent before a burst).
        let elapsed = now.saturating_duration_since(bucket.last_refill);
        let earned = elapsed.as_secs_f64() * self.refill_per_second();
        bucket.tokens = (bucket.tokens + earned).min(f64::from(self.capacity));
        bucket.last_refill = now;

        if bucket.tokens >= 1.0 {
            bucket.tokens -= 1.0;
            true
        } else {
            false
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Bucket admits up to `capacity` calls in immediate succession.
    /// `capacity=3` proves the parameter is honored, not collapsed.
    #[tokio::test]
    async fn bucket_admits_under_quota() {
        let limiter = MemoryRateLimiter::new(3, Duration::from_secs(60));
        let key = RateLimitKey::new("rcw::k1");
        assert!(limiter.allow(&key).await);
        assert!(limiter.allow(&key).await);
        assert!(limiter.allow(&key).await);
    }

    /// (capacity + 1)-th call refuses. The 60s window guarantees no
    /// refill on this CPU, so no sleep needed — pure logic test.
    #[tokio::test]
    async fn bucket_refuses_at_quota() {
        let limiter = MemoryRateLimiter::new(2, Duration::from_secs(60));
        let key = RateLimitKey::new("rcw::k1");
        assert!(limiter.allow(&key).await);
        assert!(limiter.allow(&key).await);
        assert!(!limiter.allow(&key).await);
    }

    /// After the window passes, the bucket refills. Tiny window
    /// (50ms) so the test runs in real time without flakiness.
    #[tokio::test]
    async fn bucket_refills_after_window() {
        let limiter = MemoryRateLimiter::new(1, Duration::from_millis(50));
        let key = RateLimitKey::new("rcw::k1");
        assert!(limiter.allow(&key).await);
        assert!(!limiter.allow(&key).await);
        // Sleep slightly longer than window for full refill.
        tokio::time::sleep(Duration::from_millis(75)).await;
        assert!(limiter.allow(&key).await);
    }

    /// Different keys have independent buckets — exhausting one does
    /// not affect another. Critical for the per-(client, kid) bucket
    /// strategy from Phase 9 design call (e). A regression here would
    /// silently bucket all sources together (the (d) "global" pattern),
    /// defeating the per-source attribution.
    #[tokio::test]
    async fn key_isolation_keeps_buckets_independent() {
        let limiter = MemoryRateLimiter::new(1, Duration::from_secs(60));
        let key_a = RateLimitKey::new("rcw::k1");
        let key_b = RateLimitKey::new("rcw::k2"); // same client, different kid
        let key_c = RateLimitKey::new("ctw::k1"); // different client

        assert!(limiter.allow(&key_a).await);
        assert!(limiter.allow(&key_b).await);
        assert!(limiter.allow(&key_c).await);

        // All three exhausted; further calls refuse independently.
        assert!(!limiter.allow(&key_a).await);
        assert!(!limiter.allow(&key_b).await);
        assert!(!limiter.allow(&key_c).await);
    }

    /// Default constructor (10 / 60s) admits initial burst of 10,
    /// then refuses. Documents the magic numbers in the code.
    #[tokio::test]
    async fn default_admits_initial_burst_then_refuses() {
        let limiter = MemoryRateLimiter::default();
        let key = RateLimitKey::new("default-test");
        for _ in 0..10 {
            assert!(limiter.allow(&key).await);
        }
        assert!(!limiter.allow(&key).await);
    }

    /// Compile-time guard: a `MemoryRateLimiter` is usable behind
    /// `Arc<dyn RateLimiter>` — the runtime shape adapter wrappers
    /// inject. Object-safety regression catches generic-method
    /// additions at compile time.
    #[allow(dead_code)]
    fn dyn_object_safety() {
        use std::sync::Arc;
        let _: Arc<dyn RateLimiter> = Arc::new(MemoryRateLimiter::default());
    }

    /// Banking attack defense: a bucket idle past `window` does NOT
    /// grant more than `capacity` tokens. Without the `.min(capacity)`
    /// cap, an attacker who waits 10×window then bursts would get
    /// 10×capacity admits.
    #[tokio::test]
    async fn idle_bucket_does_not_bank_beyond_capacity() {
        let limiter = MemoryRateLimiter::new(2, Duration::from_millis(20));
        let key = RateLimitKey::new("attacker");

        // Initial burst: 2 admits.
        assert!(limiter.allow(&key).await);
        assert!(limiter.allow(&key).await);
        assert!(!limiter.allow(&key).await);

        // Idle for 10× window (200ms) — without the cap, we'd accrue
        // 2 × 10 = 20 tokens. With the cap, max 2.
        tokio::time::sleep(Duration::from_millis(200)).await;

        assert!(limiter.allow(&key).await);
        assert!(limiter.allow(&key).await);
        assert!(!limiter.allow(&key).await, "cap must hold");
    }
}