// openlatch-provider 0.0.0
//
// Self-service onboarding CLI + runtime daemon for OpenLatch Editors and Providers
//! Replay-cache for inbound `webhook-id`s.
//!
//! Standard Webhooks v1 timestamp skew (±5 min) bounds freshness, but an
//! attacker that captured a request inside that window could replay it
//! verbatim. The cache is the second-line defense: a `webhook-id` we've
//! already processed comes back as the *previously cached signed verdict*
//! (idempotent — see Codex review #6 in `INDEX.md`), never as a fresh proxy
//! call, and never as a 4xx.
//!
//! TTL is set to match the Standard Webhooks skew window (5 min). LRU
//! eviction caps memory at `capacity` entries; at the default 1000 entries on
//! a host receiving 500 events/sec, the cache holds about two seconds of
//! history before pressure — plenty to absorb retry storms without unbounded
//! growth.
//!
//! `parking_lot::Mutex` would shave nanoseconds; we use `std::sync::Mutex`
//! for stdlib-only blast-radius. P3.T4 has a criterion bench that flips to a
//! sharded LruCache if p95 contention insert exceeds 10 µs at 1000-inflight.

use std::num::NonZeroUsize;
use std::sync::Mutex;
use std::time::{Duration, Instant};

use bytes::Bytes;
use lru::LruCache;

/// What we replay back to the platform on a cache hit. Stored verbatim so
/// every retry produces a byte-identical response (including the signature
/// over the original timestamp — the platform recomputes against the headers
/// we replay, not against `now()`).
#[derive(Debug, Clone)]
pub struct CachedVerdict {
    /// HTTP status of the original response.
    pub status: u16,
    /// Exact response body bytes of the original response.
    pub body: Bytes,
    /// The `webhook-id` header value this verdict was cached under.
    pub webhook_id: String,
    /// Original `webhook-timestamp` header value — replayed, never recomputed.
    pub webhook_timestamp: i64,
    /// Original `webhook-signature` header value (e.g. `v1,Zm9v`).
    pub webhook_signature: String,
    /// Processing time of the original call, in milliseconds.
    pub processing_ms: u64,
}

/// Bounded, TTL-expiring map from `webhook-id` to the verdict previously
/// served for it. Thread-safe via a single coarse `Mutex` (see module docs
/// for the contention trade-off).
pub struct ReplayCache {
    /// LRU map; each entry carries its insertion `Instant` for TTL checks.
    inner: Mutex<LruCache<String, (Instant, CachedVerdict)>>,
    /// Entries older than this are treated as absent and evicted lazily.
    ttl: Duration,
}

impl ReplayCache {
    /// Create a cache holding at most `capacity` entries, each valid for `ttl`.
    ///
    /// `capacity == 0` is clamped to 1: a zero-capacity replay cache would
    /// silently disable the replay defense, which no caller wants.
    pub fn new(capacity: usize, ttl: Duration) -> Self {
        let cap = NonZeroUsize::new(capacity.max(1)).expect("non-zero (clamped above)");
        Self {
            inner: Mutex::new(LruCache::new(cap)),
            ttl,
        }
    }

    /// Spec defaults: 1000 entries, 5-min TTL (matches the Standard Webhooks
    /// timestamp-skew window — see module docs).
    pub fn standard() -> Self {
        Self::new(1000, Duration::from_secs(300))
    }

    /// Look up by `webhook-id`. Returns the cached verdict on hit (and refreshes
    /// LRU recency). Expired entries are evicted lazily.
    pub fn lookup(&self, webhook_id: &str) -> Option<CachedVerdict> {
        let mut cache = self.inner.lock().expect("replay cache poisoned");
        // Single `get` (not `peek` + `get`): one map lookup, MRU promotion on a
        // fresh hit, and exactly one clone — of the verdict alone, not the
        // whole `(Instant, CachedVerdict)` entry.
        match cache.get(webhook_id) {
            Some((inserted, verdict)) if inserted.elapsed() <= self.ttl => {
                return Some(verdict.clone());
            }
            // Present but past TTL — fall through to evict it below (the
            // borrow from `get` must end before we can call `pop`).
            Some(_) => {}
            None => return None,
        }
        cache.pop(webhook_id);
        None
    }

    /// Record the verdict served for `webhook_id`, stamped with the insertion
    /// time for TTL accounting. May evict the LRU entry if at capacity.
    pub fn insert(&self, webhook_id: String, verdict: CachedVerdict) {
        let mut cache = self.inner.lock().expect("replay cache poisoned");
        cache.put(webhook_id, (Instant::now(), verdict));
    }

    /// Number of entries currently stored. Note: expiry is lazy (see
    /// `lookup`), so expired-but-not-yet-looked-up entries still count.
    pub fn len(&self) -> usize {
        self.inner.lock().expect("replay cache poisoned").len()
    }

    /// True when no entries are stored (expired-but-unevicted entries count
    /// as stored — see `len`).
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Canned verdict keyed on `id`; every other field is a fixed dummy so
    /// assertions can pin exact values.
    fn fixture(id: &str) -> CachedVerdict {
        CachedVerdict {
            webhook_id: id.into(),
            status: 200,
            body: Bytes::from_static(b"{}"),
            webhook_timestamp: 1_700_000_000,
            webhook_signature: "v1,Zm9v".into(),
            processing_ms: 5,
        }
    }

    #[test]
    fn lookup_returns_none_for_missing_id() {
        let cache = ReplayCache::standard();
        assert!(cache.lookup("msg_x").is_none());
    }

    #[test]
    fn insert_then_lookup_round_trips() {
        let cache = ReplayCache::standard();
        cache.insert("msg_a".into(), fixture("msg_a"));
        let hit = cache.lookup("msg_a").expect("should hit");
        assert_eq!(hit.webhook_id, "msg_a");
        assert_eq!(hit.body, Bytes::from_static(b"{}"));
    }

    #[test]
    fn lookup_evicts_expired_entries() {
        let cache = ReplayCache::new(8, Duration::from_millis(10));
        cache.insert("msg_a".into(), fixture("msg_a"));
        std::thread::sleep(Duration::from_millis(25));
        assert!(cache.lookup("msg_a").is_none());
        // Lazy eviction popped the stale entry, so nothing is left behind.
        assert!(cache.is_empty());
    }

    #[test]
    fn lru_evicts_oldest_when_capacity_exceeded() {
        let cache = ReplayCache::new(2, Duration::from_secs(60));
        for id in ["a", "b", "c"] {
            cache.insert(id.into(), fixture(id));
        }
        assert!(cache.lookup("a").is_none(), "a should have been evicted");
        assert!(cache.lookup("b").is_some());
        assert!(cache.lookup("c").is_some());
    }

    #[test]
    fn lookup_refreshes_recency() {
        let cache = ReplayCache::new(2, Duration::from_secs(60));
        cache.insert("a".into(), fixture("a"));
        cache.insert("b".into(), fixture("b"));
        // Touching `a` demotes `b` to the least-recently-used slot.
        let _ = cache.lookup("a");
        cache.insert("c".into(), fixture("c"));
        assert!(cache.lookup("a").is_some(), "a was promoted, must survive");
        assert!(cache.lookup("b").is_none(), "b should have been evicted");
    }

    #[test]
    fn cache_remains_useful_at_zero_capacity_clamp() {
        // Capacity 0 is clamped to 1, so a single entry still fits.
        let cache = ReplayCache::new(0, Duration::from_secs(1));
        cache.insert("only".into(), fixture("only"));
        assert!(cache.lookup("only").is_some());
    }
}
}