ppoppo-sdk-core 0.2.0

Internal shared primitives for the Ppoppo SDK family (pas-external, pas-plims, pcs-external) — verifier port, audit trait, session liveness port, OIDC discovery, perimeter Bearer-auth Layer kit, identity types. Not a stable public API; do not depend on this crate directly. Consume the SDK crates that re-export from it (e.g. `pas-external`).
Documentation
//! `RateLimitedAuditSink` — composition adapter wrapping any
//! [`AuditSink`] with any [`RateLimiter`] (refinement #1 from
//! Phase 9 deep-module audit).
//!
//! See module-level docs in `audit/mod.rs` for the broader
//! "composition over orchestration" rationale: `PasJwtVerifier` holds
//! ONE port (`Arc<dyn AuditSink>`), and the consumer chooses whether
//! to wrap a real sink in this layer for log-flood DoS defense (M49).
//!
//! ── Why non-generic ─────────────────────────────────────────────────────
//!
//! The verifier always erases to `Arc<dyn AuditSink>`. A generic
//! `RateLimitedAuditSink<S, L>` would defer the type erasure one frame
//! and force every test fixture to spell out the concrete types. The
//! two virtual calls per failure (inner sink + limiter) are
//! negligible against the audit emission cost itself — failures are
//! rare and the limiter substrate is the slow path.
//!
//! ── Why a counter, not a log ────────────────────────────────────────────
//!
//! The whole point of M49 is to prevent log spam on the audit pipeline.
//! Logging "I rate-limited an event" once per drop *partially defeats
//! that*. Instead we keep an `AtomicU64` counter readable via
//! [`RateLimitedAuditSink::dropped_total`]; the consumer holds an
//! `Arc<RateLimitedAuditSink>` outside the verifier (before coercing
//! to `Arc<dyn AuditSink>`) and reads the counter on demand from
//! health probes / ops dashboards.

use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};

use async_trait::async_trait;

use super::{AuditEvent, AuditSink, RateLimiter};

/// Wraps any [`AuditSink`] with any [`RateLimiter`].
///
/// On every [`AuditSink::record_failure`] call:
///
/// 1. Compute `event.rate_limit_key()` (Phase 9 design call (e):
///    compound `client_id_hint ‖ kid_hint`).
/// 2. Consult the limiter: if admitted, forward to inner sink.
/// 3. If denied, increment `dropped_total` and silently drop the event.
///
/// The verify hot path NEVER blocks on rate-limit denial — the dropped
/// event simply never reaches the inner sink.
///
/// **Ops observation pattern** (per refinement #1 audit): hold an
/// `Arc<RateLimitedAuditSink>` outside the verifier wiring, then
/// coerce a clone to `Arc<dyn AuditSink>` for the verifier:
///
/// ```ignore
/// use std::sync::Arc;
/// use pas_external::{
///     AuditSink, MemoryRateLimiter, RateLimitedAuditSink, RateLimiter,
/// };
/// # let real_sink: Arc<dyn AuditSink> = unimplemented!();
/// # let verifier: pas_external::PasJwtVerifier = unimplemented!();
/// let limited = Arc::new(RateLimitedAuditSink::new(
///     real_sink,
///     Arc::new(MemoryRateLimiter::default()),
/// ));
/// let dropped_handle = Arc::clone(&limited);
/// let verifier = verifier.with_audit(limited as Arc<dyn AuditSink>);
/// // Later, in a health probe:
/// let _ = dropped_handle.dropped_total();
/// ```
#[derive(Debug)]
pub struct RateLimitedAuditSink {
    inner: Arc<dyn AuditSink>,
    limiter: Arc<dyn RateLimiter>,
    dropped_total: AtomicU64,
}

impl RateLimitedAuditSink {
    /// Compose an inner sink with a limiter.
    #[must_use]
    pub fn new(inner: Arc<dyn AuditSink>, limiter: Arc<dyn RateLimiter>) -> Self {
        Self {
            inner,
            limiter,
            dropped_total: AtomicU64::new(0),
        }
    }

    /// Total events dropped due to rate-limit denial since
    /// construction.
    ///
    /// Useful for ops dashboards / health probes that want to detect
    /// "we're suppressing audit signal because of upstream pressure."
    /// `Relaxed` ordering — the counter is monotonic and readers
    /// don't need cross-thread happens-before guarantees against
    /// audit emissions.
    #[must_use]
    pub fn dropped_total(&self) -> u64 {
        self.dropped_total.load(Ordering::Relaxed)
    }
}

#[async_trait]
impl AuditSink for RateLimitedAuditSink {
    async fn record_failure(&self, event: AuditEvent) {
        let key = event.rate_limit_key();
        if self.limiter.allow(&key).await {
            self.inner.record_failure(event).await;
        } else {
            self.dropped_total.fetch_add(1, Ordering::Relaxed);
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::audit::{
        AuditEvent, MemoryAuditSink, MemoryRateLimiter, RateLimiter, VerifyErrorKind,
    };
    use std::collections::BTreeMap;
    use std::time::Duration;
    use time::OffsetDateTime;

    fn fixture(source_id: &str) -> AuditEvent {
        AuditEvent {
            kind: VerifyErrorKind::SignatureInvalid,
            occurred_at: OffsetDateTime::UNIX_EPOCH,
            source_id: source_id.to_owned(),
            client_id_hint: None,
            kid_hint: None,
            metadata: BTreeMap::new(),
        }
    }

    /// Sanity: forwards admitted events through to the inner sink.
    #[tokio::test]
    async fn admitted_event_reaches_inner_sink() {
        let memory = Arc::new(MemoryAuditSink::new());
        let limiter: Arc<dyn RateLimiter> =
            Arc::new(MemoryRateLimiter::new(10, Duration::from_secs(60)));
        let limited =
            RateLimitedAuditSink::new(memory.clone() as Arc<dyn AuditSink>, limiter);

        limited.record_failure(fixture("rcw::k1")).await;
        assert_eq!(memory.len(), 1);
        assert_eq!(limited.dropped_total(), 0);
    }

    /// Per-key independence: exhausting one key does not affect the
    /// other (Phase 9 design call (e) compound key).
    #[tokio::test]
    async fn dropped_total_only_counts_actual_drops() {
        let memory = Arc::new(MemoryAuditSink::new());
        let limiter: Arc<dyn RateLimiter> =
            Arc::new(MemoryRateLimiter::new(1, Duration::from_secs(60)));
        let limited =
            RateLimitedAuditSink::new(memory.clone() as Arc<dyn AuditSink>, limiter);

        // 1 admit + 2 drops on the same key.
        limited.record_failure(fixture("rcw::k1")).await;
        limited.record_failure(fixture("rcw::k1")).await;
        limited.record_failure(fixture("rcw::k1")).await;
        assert_eq!(memory.len(), 1);
        assert_eq!(limited.dropped_total(), 2);

        // Different key has its own bucket.
        limited.record_failure(fixture("rcw::k2")).await;
        assert_eq!(memory.len(), 2);
        assert_eq!(limited.dropped_total(), 2);
    }
}