//! [`InProcessTtlCache`] — per-pod in-process [`super::Cache`] impl.

use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;

use async_trait::async_trait;
use ppoppo_clock::ArcClock;
use ppoppo_clock::native::WallClock;
use tokio::sync::RwLock;

use super::Cache;

/// In-process TTL cache backing the [`super::Cache`] port. Per-pod —
/// cross-pod convergence happens at TTL expiry, NOT via shared
/// substrate. Default for RCW/CTW Slice 4/5 wiring (RFC §3.5 Row 5*).
///
/// ## TTL semantics
///
/// Entries are evicted lazily: a `get` at or past `inserted_at + ttl`
/// returns `None` and removes the row. There is no background reaper; memory
/// reclamation happens at next access. For pre-launch workloads with
/// bounded user counts this is appropriate; production hot-paths with
/// millions of unique subs should swap in a bounded-capacity Cache impl
/// (e.g. an LRU). The capacity question is intentionally NOT folded
/// into this struct — `STANDARDS_SHARED_CACHE.md` §3.1 documents the
/// canonical key shape, and bounded-capacity is a substrate concern
/// not visible at the port surface.
///
/// ## Concurrency
///
/// `RwLock<HashMap>` — reads scale; writes serialize. Under the sv-axis
/// load profile (95%+ cache hits past warmup, writes only on a miss or
/// expiry eviction), the read-heavy lock split is a good fit.
///
/// ## Why not shared KVRocks
///
/// Shared KVRocks (`STANDARDS_SHARED_CACHE.md` §3.1) is the textbook
/// chat-as-infrastructure substrate — sub-millisecond reads,
/// real-time cross-pod convergence, single canonical write site (PAS
/// on break-glass). 11.Z deliberately defers KVRocks ACL extension to
/// RCW/CTW (per user direction 2026-05-09). Wiring is a one-line
/// `Arc<dyn Cache>` swap when ACL extends in 11.AB+.
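///
/// ## Example
///
/// A minimal wiring sketch (not a fixed API; the import path for
/// [`super::SV_CACHE_TTL`] and the `Arc<dyn Cache>` binding below are
/// illustrative):
///
/// ```rust,ignore
/// use std::sync::Arc;
///
/// // Construct with the canonical sv-cache TTL and expose it behind the
/// // `Cache` port. A KVRocks-backed impl is the same one-line swap at
/// // this call site once the ACL extends.
/// let cache: Arc<dyn Cache> = Arc::new(InProcessTtlCache::new(SV_CACHE_TTL));
/// ```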
pub struct InProcessTtlCache {
    inner: RwLock<HashMap<String, (i64, i64)>>,
    ttl: Duration,
    clock: ArcClock,
}

impl InProcessTtlCache {
    /// Build with the specified TTL. Use [`super::SV_CACHE_TTL`]
    /// (re-exported from `ppoppo-token`, currently 60 seconds) to match
    /// the canonical `sv:{sub}` cache contract.
    #[must_use]
    pub fn new(ttl: Duration) -> Self {
        Self {
            inner: RwLock::new(HashMap::new()),
            ttl,
            clock: Arc::new(WallClock),
        }
    }

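    /// Swap the default [`WallClock`] for an injected clock, e.g. for
    /// deterministic expiry tests that advance time without sleeping.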
    #[must_use]
    pub fn with_clock(mut self, clock: ArcClock) -> Self {
        self.clock = clock;
        self
    }
}

impl std::fmt::Debug for InProcessTtlCache {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("InProcessTtlCache")
            .field("ttl", &self.ttl)
            .finish_non_exhaustive()
    }
}

#[async_trait]
impl Cache for InProcessTtlCache {
    async fn get(&self, key: &str) -> Option<i64> {
        let now_ms = self.clock.now_unix_millis();
        let ttl_ms = self.ttl.as_millis() as i64;
        // Read-fast path: hit + not-expired returns immediately under
        // the read guard. Expired-or-miss falls through to the write
        // path (eviction if expired, return None either way).
        {
            let read = self.inner.read().await;
            if let Some((sv, inserted_at_ms)) = read.get(key) {
                if now_ms - inserted_at_ms < ttl_ms {
                    return Some(*sv);
                }
            } else {
                return None;
            }
        }

        // Expired entry — acquire writer to evict so memory reclaims.
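        // The re-check below protects against a concurrent `set` that
        // refreshed this key between the read-guard drop and the write
        // acquisition; a freshly written row is left untouched.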
        let mut write = self.inner.write().await;
        if let Some((_, inserted_at_ms)) = write.get(key) {
            if now_ms - inserted_at_ms >= ttl_ms {
                write.remove(key);
            }
        }
        None
    }

    async fn set(&self, key: &str, sv: i64, _ttl: Duration) {
        // We use this cache's own configured TTL, not the per-call
        // value — the call-site `ttl` is the contract surface for
        // backends with native expiry (Redis `SETEX`, KVRocks). The
        // in-process impl's TTL is fixed at construction.
        let mut write = self.inner.write().await;
        write.insert(key.to_string(), (sv, self.clock.now_unix_millis()));
    }
}

#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;

    #[tokio::test]
    async fn miss_returns_none() {
        let cache = InProcessTtlCache::new(Duration::from_secs(60));
        assert_eq!(cache.get("sv:nonexistent").await, None);
    }

    #[tokio::test]
    async fn hit_after_set() {
        let cache = InProcessTtlCache::new(Duration::from_secs(60));
        cache.set("sv:abc", 7, Duration::from_secs(60)).await;
        assert_eq!(cache.get("sv:abc").await, Some(7));
    }

    #[tokio::test]
    async fn expired_entry_evicts_on_access() {
        let cache = InProcessTtlCache::new(Duration::from_millis(10));
        cache.set("sv:abc", 5, Duration::from_millis(10)).await;
        assert_eq!(cache.get("sv:abc").await, Some(5));
        tokio::time::sleep(Duration::from_millis(25)).await;
        assert_eq!(cache.get("sv:abc").await, None);

        // Eviction occurred — the next set succeeds and returns the
        // new value, not a stale ghost.
        cache.set("sv:abc", 11, Duration::from_secs(60)).await;
        assert_eq!(cache.get("sv:abc").await, Some(11));
    }

    #[tokio::test]
    async fn set_overwrites_existing_entry() {
        let cache = InProcessTtlCache::new(Duration::from_secs(60));
        cache.set("sv:abc", 7, Duration::from_secs(60)).await;
        cache.set("sv:abc", 8, Duration::from_secs(60)).await;
        assert_eq!(cache.get("sv:abc").await, Some(8));
    }

    #[tokio::test]
    async fn distinct_keys_dont_collide() {
        let cache = InProcessTtlCache::new(Duration::from_secs(60));
        cache.set("sv:alpha", 1, Duration::from_secs(60)).await;
        cache.set("sv:beta", 2, Duration::from_secs(60)).await;
        assert_eq!(cache.get("sv:alpha").await, Some(1));
        assert_eq!(cache.get("sv:beta").await, Some(2));
    }
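
    #[tokio::test]
    async fn per_call_ttl_is_ignored() {
        // Added sketch of the documented fixed-TTL behavior: the `ttl`
        // argument to `set` is a contract surface for backends with
        // native expiry; this impl keeps its construction-time TTL, so
        // a tiny per-call value does not shorten the entry's life.
        let cache = InProcessTtlCache::new(Duration::from_secs(60));
        cache.set("sv:abc", 3, Duration::from_millis(1)).await;
        tokio::time::sleep(Duration::from_millis(20)).await;
        assert_eq!(cache.get("sv:abc").await, Some(3));
    }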
}