actr-hyper 0.2.1

Hyper — Actor platform infrastructure: sandbox, transport, scheduler, WASM engine, signing, AIS bootstrap, persistence & crypto primitives
Documentation
//! Request deduplication with TTL-based response caching.
//!
//! ## Design
//!
//! When a caller retries a request (e.g. after a transient send failure), the receiver
//! may process the same `request_id` twice, leading to double side-effects.
//!
//! `DedupState` prevents this by caching the response for each `request_id` for a
//! fixed TTL (default 15 s).  A second call with the same `request_id` within the TTL
//! returns the cached response without re-invoking the handler.
//!
//! ## Eviction
//!
//! Entries older than the TTL are evicted lazily on every `check_or_mark` call.
//! No background task is required.

use actr_framework::Bytes;
use actr_protocol::ActorResult;
use std::collections::HashMap;
use std::time::{Duration, Instant};

/// Default TTL: must be comfortably longer than the longest RpcReliable retry window.
/// RpcReliable: 5 attempts, up to 5s per gap → worst-case ~20s total; 15s covers most cases.
pub(crate) const DEDUP_TTL: Duration = Duration::from_secs(15);

/// Cached outcome of a completed request.
#[derive(Clone, Debug)]
enum CachedResult {
    /// Request processed and response stored.
    Done(ActorResult<Bytes>),
    /// Request is currently in-flight (guards against concurrent duplicates).
    InFlight,
}

/// Entry stored per request_id.
#[derive(Debug)]
struct Entry {
    received_at: Instant,
    result: CachedResult,
}

/// Framework-side request deduplication state.
///
/// Not `Clone` intentionally; share via `Arc<Mutex<DedupState>>`.
#[derive(Debug, Default)]
pub(crate) struct DedupState {
    entries: HashMap<String, Entry>,
    ttl: Duration,
}

/// Outcome of `check_or_mark`.
#[derive(Debug)]
pub(crate) enum DedupOutcome {
    /// First time we see this request_id; proceed with handling.
    Fresh,
    /// Request is already being processed concurrently.
    InFlight,
    /// Request was already processed; the cached response is returned.
    Duplicate(ActorResult<Bytes>),
}

impl DedupState {
    /// Create a dedup state with the default 15 s TTL.
    pub(crate) fn new() -> Self {
        Self {
            entries: HashMap::new(),
            ttl: DEDUP_TTL,
        }
    }

    /// Check whether `request_id` is a duplicate.
    ///
    /// - If fresh: inserts an `InFlight` marker and returns `DedupOutcome::Fresh`.
    /// - If already seen and still in TTL: returns `InFlight` or `Duplicate`.
    /// - Expired entries are evicted on every call.
    pub(crate) fn check_or_mark(&mut self, request_id: &str) -> DedupOutcome {
        let now = Instant::now();
        self.evict_expired(now);

        match self.entries.get(request_id) {
            None => {
                self.entries.insert(
                    request_id.to_string(),
                    Entry {
                        received_at: now,
                        result: CachedResult::InFlight,
                    },
                );
                DedupOutcome::Fresh
            }
            Some(entry) => match &entry.result {
                CachedResult::InFlight => DedupOutcome::InFlight,
                CachedResult::Done(r) => DedupOutcome::Duplicate(r.clone()),
            },
        }
    }

    /// Record the completed response for `request_id`.
    ///
    /// Call this after the handler finishes (success or error).
    pub(crate) fn complete(&mut self, request_id: &str, result: ActorResult<Bytes>) {
        if let Some(entry) = self.entries.get_mut(request_id) {
            entry.result = CachedResult::Done(result);
        }
    }

    /// Evict entries that have exceeded the TTL.
    fn evict_expired(&mut self, now: Instant) {
        self.entries
            .retain(|_, e| now.duration_since(e.received_at) < self.ttl);
    }

    /// Number of entries currently tracked (for monitoring / tests).
    #[cfg(test)]
    pub(crate) fn len(&self) -> usize {
        self.entries.len()
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    fn ok_bytes(s: &str) -> ActorResult<Bytes> {
        Ok(Bytes::from(s.as_bytes().to_vec()))
    }

    #[test]
    fn fresh_request_is_marked_in_flight() {
        let mut d = DedupState::new();
        assert!(matches!(d.check_or_mark("req-1"), DedupOutcome::Fresh));
        assert_eq!(d.len(), 1);
    }

    #[test]
    fn concurrent_duplicate_returns_in_flight() {
        let mut d = DedupState::new();
        d.check_or_mark("req-1"); // marks InFlight
        assert!(matches!(d.check_or_mark("req-1"), DedupOutcome::InFlight));
    }

    #[test]
    fn completed_duplicate_returns_cached_response() {
        let mut d = DedupState::new();
        d.check_or_mark("req-1");
        d.complete("req-1", ok_bytes("hello"));

        let outcome = d.check_or_mark("req-1");
        assert!(
            matches!(outcome, DedupOutcome::Duplicate(Ok(ref b)) if b == "hello"),
            "expected cached Ok(\"hello\")"
        );
    }

    #[test]
    fn error_response_is_cached_and_returned() {
        use actr_protocol::ActrError;
        let mut d = DedupState::new();
        d.check_or_mark("req-err");
        d.complete(
            "req-err",
            Err(ActrError::InvalidArgument("bad input".to_string())),
        );

        let outcome = d.check_or_mark("req-err");
        assert!(
            matches!(
                outcome,
                DedupOutcome::Duplicate(Err(ActrError::InvalidArgument(_)))
            ),
            "expected cached Err"
        );
    }

    #[test]
    fn expired_entry_is_evicted_and_treated_as_fresh() {
        let mut d = DedupState {
            ttl: Duration::from_nanos(1), // expire immediately
            ..DedupState::new()
        };
        d.check_or_mark("req-old");
        d.complete("req-old", ok_bytes("v1"));

        // Force TTL expiry by advancing: we can't time-travel Instant in stable Rust,
        // so set TTL to 0 and trigger eviction on the next check.
        // Insert a fresh entry to trigger evict_expired, then re-check the old one.
        d.check_or_mark("req-new"); // triggers evict with ttl=1ns, old entry expires
        assert!(matches!(d.check_or_mark("req-old"), DedupOutcome::Fresh));
    }

    #[test]
    fn different_request_ids_are_independent() {
        let mut d = DedupState::new();
        d.check_or_mark("req-a");
        d.complete("req-a", ok_bytes("a"));

        assert!(matches!(d.check_or_mark("req-b"), DedupOutcome::Fresh));
        assert!(matches!(
            d.check_or_mark("req-a"),
            DedupOutcome::Duplicate(_)
        ));
    }
}