ppoppo-token 0.3.0

JWT (RFC 9068, EdDSA) issuance + verification engine for the Ppoppo ecosystem. Single deep module with a small interface (issue, verify) hiding RFC 8725 mitigations M01-M45, JWKS handling, and substrate ports (epoch, session, replay).
Documentation
//! M37 propagation observability — engine layer (Phase 5 commit 5.3).
//!
//! Verifies that every port call site emits the canonical
//! `revocation.checked` tracing event so ops dashboards can measure
//! the ≤5min SLA. The substrate-side propagation mechanism (KVRocks
//! TTL + push paths) lives outside ppoppo-token; the engine's
//! responsibility is to emit the data needed to MEASURE it.
//!
//! Field contract (locked here; dashboards key off these fields):
//!
//! - `target  = "ppoppo_token::revocation"` — RUST_LOG filter selector
//! - `message = "revocation.checked"`       — captured into the `message`
//!   field by `tracing`'s `Display`-formatted final string arg
//! - `port    = "replay" | "session" | "epoch"` — which axis fired
//! - `outcome = "admit" | "reject" | "transient"`
//! - `reason  = "replayed" | "revoked" | "stale"` (only on reject)
//! - `sub`    — subject ULID for per-account correlation
//! - `detail` — adapter detail for ops triage (only on transient)
//!
//! Drift between this contract and the engine emit sites would silently
//! break operator dashboards — these tests are the regression guard.

#![allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]

mod common;

use std::sync::Arc;
use std::time::Duration;

use common::{CapturedEvent, EventCapture, MemoryReplayDefense, MemorySessionRevocation};
use ppoppo_token::access_token::{IssueConfig, IssueRequest, VerifyConfig, issue, verify};
use ppoppo_token::{SigningKey};
use tracing_subscriber::Registry;
use tracing_subscriber::layer::SubscriberExt;

const TEST_SUB: &str = "01HSAB00000000000000000000";
const TEST_CLIENT_ID: &str = "ppoppo-internal";
const TEST_SID: &str = "01HSESSION0000000000000000";
const TTL_15M: Duration = Duration::from_secs(900);
const ISSUER: &str = "https://accounts.ppoppo.com";
const AUDIENCE: &str = "ppoppo";

/// Mint a signed access token for the fixed test subject and client.
///
/// The request always uses `TEST_SUB`, `TEST_CLIENT_ID` and the 15-minute
/// TTL; `with_sid` decides whether `TEST_SID` is attached to it. The
/// current UTC unix timestamp is handed to `issue` as the time argument.
///
/// Panics if `issue` returns an error.
fn mint_token(signer: &SigningKey, with_sid: bool) -> String {
    let config = IssueConfig::access_token(ISSUER, AUDIENCE, signer.kid());
    let base = IssueRequest::new(TEST_SUB, TEST_CLIENT_ID, TTL_15M);
    let request = if with_sid { base.with_sid(TEST_SID) } else { base };
    let now = time::OffsetDateTime::now_utc().unix_timestamp();
    issue(&request, &config, signer, now).expect("issue should succeed")
}

/// Find the single revocation event matching `port` + `outcome`.
/// Asserts uniqueness — multiple matches mean the engine fired the
/// event twice for one verify call (regression).
fn one_event(events: &[CapturedEvent], port: &str, outcome: &str) -> CapturedEvent {
    let matches: Vec<_> = events
        .iter()
        .filter(|e| {
            e.fields.get("port").map(|s| s.as_str()) == Some(port)
                && e.fields.get("outcome").map(|s| s.as_str()) == Some(outcome)
        })
        .collect();
    assert_eq!(
        matches.len(),
        1,
        "expected exactly one event with port={port:?} outcome={outcome:?}, got {}: {:?}",
        matches.len(),
        events,
    );
    matches[0].clone()
}

// ── M35 replay port observability ────────────────────────────────────────

/// A successful verify through a fresh replay port emits exactly one
/// `replay`/`admit` event carrying the canonical message, the token
/// subject, and TRACE level.
#[tokio::test]
async fn replay_admit_emits_revocation_checked_admit() {
    let sink = EventCapture::default();
    // Events are captured only while this guard is alive.
    let guard = tracing::subscriber::set_default(Registry::default().with(sink.clone()));

    let (signer, key_set) = SigningKey::test_pair();
    let token = mint_token(&signer, false);
    let cfg = VerifyConfig::access_token(ISSUER, AUDIENCE)
        .with_replay_defense(Arc::new(MemoryReplayDefense::new()));
    let now = time::OffsetDateTime::now_utc().unix_timestamp();
    verify(&token, &cfg, &key_set, now).await.expect("verify");

    // Stop capturing before inspecting what was recorded.
    drop(guard);

    let recorded = sink.revocation_events();
    let admitted = one_event(&recorded, "replay", "admit");

    let message = admitted.fields.get("message").map(String::as_str);
    assert_eq!(
        message,
        Some("revocation.checked"),
        "message field carries the canonical event name",
    );

    let subject = admitted.fields.get("sub").map(String::as_str);
    assert_eq!(subject, Some(TEST_SUB));

    assert_eq!(admitted.level, "TRACE", "admit fires at TRACE (high-frequency path)");
}

/// Presenting the same token twice through one replay port: the first
/// verify admits, the second must be rejected AND must emit exactly one
/// `replay`/`reject` event with `reason = "replayed"` at WARN level.
#[tokio::test]
async fn replay_replayed_emits_reject_with_reason_replayed() {
    let capture = EventCapture::default();
    let subscriber = Registry::default().with(capture.clone());
    // Events are captured only while this guard is alive.
    let guard = tracing::subscriber::set_default(subscriber);

    let (signer, key_set) = SigningKey::test_pair();
    let token = mint_token(&signer, false);
    let port = Arc::new(MemoryReplayDefense::new());
    let cfg = VerifyConfig::access_token(ISSUER, AUDIENCE).with_replay_defense(port);
    verify(&token, &cfg, &key_set, time::OffsetDateTime::now_utc().unix_timestamp())
        .await
        .expect("first verify admits");
    // Keep the second result instead of discarding it: an engine that logs
    // the reject event but still returns Ok would otherwise pass this test.
    let second = verify(&token, &cfg, &key_set, time::OffsetDateTime::now_utc().unix_timestamp()).await;

    // Stop capturing before inspecting what was recorded.
    drop(guard);

    assert!(
        second.is_err(),
        "second presentation of the same token must be rejected by verify",
    );

    let events = capture.revocation_events();
    let event = one_event(&events, "replay", "reject");
    assert_eq!(
        event.fields.get("reason").map(String::as_str),
        Some("replayed"),
    );
    assert_eq!(event.level, "WARN", "reject fires at WARN (security-relevant)");
}

/// A replay port built with `failing()` yields exactly one
/// `replay`/`transient` event that carries a `detail` field and fires at
/// WARN level.
#[tokio::test]
async fn replay_transient_emits_transient_outcome() {
    let sink = EventCapture::default();
    // Events are captured only while this guard is alive.
    let guard = tracing::subscriber::set_default(Registry::default().with(sink.clone()));

    let (signer, key_set) = SigningKey::test_pair();
    let token = mint_token(&signer, false);
    let cfg = VerifyConfig::access_token(ISSUER, AUDIENCE)
        .with_replay_defense(Arc::new(MemoryReplayDefense::failing()));
    let now = time::OffsetDateTime::now_utc().unix_timestamp();
    // The verify result is deliberately ignored; only the emitted event is
    // under test here.
    let _ = verify(&token, &cfg, &key_set, now).await;

    // Stop capturing before inspecting what was recorded.
    drop(guard);

    let recorded = sink.revocation_events();
    let transient = one_event(&recorded, "replay", "transient");
    let has_detail = transient.fields.contains_key("detail");
    assert!(
        has_detail,
        "transient event must carry adapter detail for ops triage",
    );
    assert_eq!(transient.level, "WARN", "transient fires at WARN (ops-relevant)");
}

// ── M36 session port observability ───────────────────────────────────────

/// A token carrying `sid`, verified through a session port with nothing
/// revoked, emits exactly one `session`/`admit` event with the canonical
/// message and the token subject.
#[tokio::test]
async fn session_admit_emits_admit_outcome() {
    let sink = EventCapture::default();
    // Events are captured only while this guard is alive.
    let guard = tracing::subscriber::set_default(Registry::default().with(sink.clone()));

    let (signer, key_set) = SigningKey::test_pair();
    let token = mint_token(&signer, true);
    let cfg = VerifyConfig::access_token(ISSUER, AUDIENCE)
        .with_session_revocation(Arc::new(MemorySessionRevocation::new()));
    let now = time::OffsetDateTime::now_utc().unix_timestamp();
    verify(&token, &cfg, &key_set, now).await.expect("verify");

    // Stop capturing before inspecting what was recorded.
    drop(guard);

    let recorded = sink.revocation_events();
    let admitted = one_event(&recorded, "session", "admit");

    let message = admitted.fields.get("message").map(String::as_str);
    assert_eq!(message, Some("revocation.checked"));

    let subject = admitted.fields.get("sub").map(String::as_str);
    assert_eq!(subject, Some(TEST_SUB));
}

/// Revoking `(TEST_SUB, TEST_SID)` on the session port before verifying a
/// token that carries that `sid` emits exactly one `session`/`reject`
/// event with `reason = "revoked"`.
#[tokio::test]
async fn session_revoked_emits_reject_with_reason_revoked() {
    let sink = EventCapture::default();
    // Events are captured only while this guard is alive.
    let guard = tracing::subscriber::set_default(Registry::default().with(sink.clone()));

    let (signer, key_set) = SigningKey::test_pair();
    let token = mint_token(&signer, true);

    // Mark the session revoked before the port is handed to the config.
    let sessions = Arc::new(MemorySessionRevocation::new());
    sessions.revoke(TEST_SUB, TEST_SID);
    let cfg = VerifyConfig::access_token(ISSUER, AUDIENCE).with_session_revocation(sessions);

    let now = time::OffsetDateTime::now_utc().unix_timestamp();
    // The verify result is deliberately ignored; only the emitted event is
    // under test here.
    let _ = verify(&token, &cfg, &key_set, now).await;

    // Stop capturing before inspecting what was recorded.
    drop(guard);

    let recorded = sink.revocation_events();
    let rejected = one_event(&recorded, "session", "reject");
    let reason = rejected.fields.get("reason").map(String::as_str);
    assert_eq!(reason, Some("revoked"));
}

/// A session port built with `failing()` yields exactly one
/// `session`/`transient` event that carries a `detail` field.
#[tokio::test]
async fn session_transient_emits_transient_outcome() {
    let sink = EventCapture::default();
    // Events are captured only while this guard is alive.
    let guard = tracing::subscriber::set_default(Registry::default().with(sink.clone()));

    let (signer, key_set) = SigningKey::test_pair();
    let token = mint_token(&signer, true);
    let cfg = VerifyConfig::access_token(ISSUER, AUDIENCE)
        .with_session_revocation(Arc::new(MemorySessionRevocation::failing()));
    let now = time::OffsetDateTime::now_utc().unix_timestamp();
    // The verify result is deliberately ignored; only the emitted event is
    // under test here.
    let _ = verify(&token, &cfg, &key_set, now).await;

    // Stop capturing before inspecting what was recorded.
    drop(guard);

    let recorded = sink.revocation_events();
    let event = one_event(&recorded, "session", "transient");
    assert!(event.fields.contains_key("detail"));
}

// ── Short-circuit silence ────────────────────────────────────────────────

/// With no port wired into the verify config, a successful verify must
/// leave the revocation event stream empty.
#[tokio::test]
async fn no_event_when_replay_port_is_none() {
    // Dashboards measuring the SLA count substrate calls. A verify that
    // short-circuits (nothing wired) would dilute the propagation-latency
    // averages if it emitted an event, so it must stay silent.
    let sink = EventCapture::default();
    // Events are captured only while this guard is alive.
    let guard = tracing::subscriber::set_default(Registry::default().with(sink.clone()));

    let (signer, key_set) = SigningKey::test_pair();
    let token = mint_token(&signer, false);
    let cfg = VerifyConfig::access_token(ISSUER, AUDIENCE);
    let now = time::OffsetDateTime::now_utc().unix_timestamp();
    verify(&token, &cfg, &key_set, now).await.expect("verify");

    // Stop capturing before inspecting what was recorded.
    drop(guard);

    let recorded = sink.revocation_events();
    assert!(
        recorded.is_empty(),
        "no port wired → no event (SLA dashboards must not count short-circuits)",
    );
}

/// A token minted without `sid` must not produce any `session` event even
/// though a session port is wired into the verify config.
#[tokio::test]
async fn no_session_event_when_token_has_no_sid() {
    // Dashboard purity applies to the session axis too: without `sid` the
    // gate short-circuits and the port is never consulted.
    let sink = EventCapture::default();
    // Events are captured only while this guard is alive.
    let guard = tracing::subscriber::set_default(Registry::default().with(sink.clone()));

    let (signer, key_set) = SigningKey::test_pair();
    let sidless_token = mint_token(&signer, false);
    let cfg = VerifyConfig::access_token(ISSUER, AUDIENCE)
        .with_session_revocation(Arc::new(MemorySessionRevocation::new()));
    let now = time::OffsetDateTime::now_utc().unix_timestamp();
    verify(&sidless_token, &cfg, &key_set, now).await.expect("verify");

    // Stop capturing before inspecting what was recorded.
    drop(guard);

    // Keep only events attributed to the session port.
    let recorded = sink.revocation_events();
    let session_events: Vec<_> = recorded
        .into_iter()
        .filter(|ev| ev.fields.get("port").map(String::as_str) == Some("session"))
        .collect();
    assert!(
        session_events.is_empty(),
        "no sid → session gate skipped → no session event, got {session_events:?}",
    );
}

// ── Cross-axis: replay + session in one verify ──────────────────────────

/// With both the replay and session ports wired and a token carrying
/// `sid`, a single verify emits one `replay`/`admit` and one
/// `session`/`admit` event, and nothing else.
#[tokio::test]
async fn both_ports_emit_distinct_events_in_one_verify() {
    // Each axis reports through its own event. The two events share the
    // same `message` and `target` and differ in the `port` field, so this
    // guards against the axes accidentally collapsing into one event.
    let sink = EventCapture::default();
    // Events are captured only while this guard is alive.
    let guard = tracing::subscriber::set_default(Registry::default().with(sink.clone()));

    let (signer, key_set) = SigningKey::test_pair();
    let token = mint_token(&signer, true);
    let replay_port = Arc::new(MemoryReplayDefense::new());
    let session_port = Arc::new(MemorySessionRevocation::new());
    let cfg = VerifyConfig::access_token(ISSUER, AUDIENCE)
        .with_replay_defense(replay_port)
        .with_session_revocation(session_port);
    let now = time::OffsetDateTime::now_utc().unix_timestamp();
    verify(&token, &cfg, &key_set, now).await.expect("verify");

    // Stop capturing before inspecting what was recorded.
    drop(guard);

    let events = sink.revocation_events();
    // Each call panics unless exactly one matching event was captured.
    one_event(&events, "replay", "admit");
    one_event(&events, "session", "admit");
    assert_eq!(events.len(), 2, "exactly 2 events: replay + session, got {events:?}");
}