trypema 1.1.0

High-performance rate limiting primitives in Rust, designed for concurrency safety, low overhead, and predictable latency.
Documentation
use std::{
    collections::VecDeque,
    sync::atomic::{AtomicU64, Ordering},
    time::{Duration, Instant},
};

use ahash::RandomState;
use dashmap::DashMap;

use crate::{
    LocalRateLimiterOptions,
    common::{InstantRate, RateGroupSizeMs, RateLimit, RateLimitDecision, WindowSizeSeconds},
};

/// Per-key rate-limiting state: the limit captured at creation plus the
/// time-ordered usage buckets and their running sum.
#[derive(Debug)]
pub(crate) struct RateLimitSeries {
    /// Rate limit captured when this series was created.
    pub limit: RateLimit,
    /// Usage buckets; the limiter appends at the back and evicts from the front.
    pub series: VecDeque<InstantRate>,
    /// Running sum of the counts held in `series`.
    pub total: AtomicU64,
}

impl RateLimitSeries {
    /// Create an empty series (no buckets, total of zero) bound to `limit`.
    pub fn new(limit: RateLimit) -> Self {
        let series = VecDeque::default();
        let total = AtomicU64::default();

        Self {
            limit,
            series,
            total,
        }
    }
}

/// Strict sliding-window rate limiter for in-process use.
///
/// Provides deterministic rate limiting with per-key state maintained in memory.
/// Uses a sliding time window to track request counts and enforce limits.
///
/// # Algorithm
///
/// 1. **Window capacity:** `window_size_seconds × rate_limit`, truncated to a whole count
/// 2. **Admission check:** Read the key's running total; if it has reached capacity,
///    evict expired buckets and re-check
/// 3. **Decision:** Allow if `total < capacity`, reject otherwise
/// 4. **Increment:** If allowed, add count to current (or coalesced) bucket
///
/// # Thread Safety
///
/// - Uses [`DashMap`](dashmap::DashMap) for concurrent key access
/// - Uses atomics for per-bucket counters and the per-key running total
/// - Safe for multi-threaded use without external synchronization
///
/// # Semantics & Limitations
///
/// **Sticky rate limits:**
/// - The first call for a key stores the rate limit
/// - Subsequent calls for the same key do not update it
/// - Rationale: Avoids races where concurrent calls specify different limits
///
/// **Best-effort concurrency:**
/// - Admission check and increment are not atomic across calls
/// - Multiple threads can observe "allowed" simultaneously
/// - All may proceed, causing temporary overshoot
/// - This is **expected behavior**, not a bug
///
/// **Eviction granularity:**
/// - The expiry trigger uses `Instant::elapsed().as_millis()` (whole-millisecond truncation)
/// - Buckets expire close to `window_size_seconds` (lazy eviction may delay removal until next call)
///
/// **Memory growth:**
/// - Keys are not automatically removed
/// - Unbounded key cardinality will grow memory
/// - Use `run_cleanup_loop()` to periodically remove stale keys
///
/// **Lazy eviction:**
/// - Expired buckets are only removed when `is_allowed()` or `inc()` is called
///   and the key's running total has reached capacity
/// - Stale buckets remain in memory until then or until cleanup removes the key
///
/// # Performance
///
/// - **Admission check:** O(1) while under capacity (reads the running total); at
///   capacity, eviction is a binary search plus removal of the expired buckets
/// - **Increment:** O(1) amortised (coalesced into existing bucket or appended via `push_back`)
/// - **Memory:** ~50–200 bytes per key (depends on bucket count)
///
/// # Examples
///
/// ```
/// # let rl = trypema::__doctest_helpers::rate_limiter();
/// use trypema::{RateLimit, RateLimitDecision};
///
/// let limiter = rl.local().absolute();
/// let rate = RateLimit::try_from(10.0).unwrap();
///
/// assert!(matches!(limiter.inc("user_123", &rate, 1), RateLimitDecision::Allowed));
/// assert!(matches!(limiter.is_allowed("user_123"), RateLimitDecision::Allowed));
/// ```
#[derive(Debug)]
pub struct AbsoluteLocalRateLimiter {
    /// Sliding-window length in seconds, as supplied in the options.
    window_size_seconds: WindowSizeSeconds,
    /// Window length in milliseconds, pre-computed by the constructor.
    window_size_ms: u128,
    /// Window length as a `Duration`, pre-computed by the constructor.
    window_duration: Duration,
    /// Maximum age (in ms) of the newest bucket for an increment to be merged into it.
    rate_group_size_ms: RateGroupSizeMs,
    /// Per-key state, keyed by the caller-supplied key string.
    series: DashMap<String, RateLimitSeries, RandomState>,
}

impl AbsoluteLocalRateLimiter {
    pub(crate) fn new(options: LocalRateLimiterOptions) -> Self {
        Self {
            window_size_ms: (*options.window_size_seconds as u128).saturating_mul(1000),
            window_duration: Duration::from_secs(*options.window_size_seconds),
            window_size_seconds: options.window_size_seconds,
            rate_group_size_ms: options.rate_group_size_ms,
            series: DashMap::default(),
        }
    } // end constructor

    #[cfg(test)]
    pub(crate) fn series(&self) -> &DashMap<String, RateLimitSeries, RandomState> {
        &self.series
    }

    /// Check admission and, if allowed, record the increment for `key`.
    ///
    /// This is the primary method for rate limiting. It performs an admission check
    /// and, if allowed, records the increment in the key's state.
    ///
    /// # Arguments
    ///
    /// - `key`: Unique identifier for the rate-limited resource (e.g., `"user_123"`, `"api_endpoint"`)
    /// - `rate_limit`: Per-second rate limit. **Sticky:** stored on first call, ignored on subsequent calls
    /// - `count`: Amount to increment (typically `1` for single requests, or batch size)
    ///
    /// # Returns
    ///
    /// - [`RateLimitDecision::Allowed`]: Request admitted, increment recorded
    /// - [`RateLimitDecision::Rejected`]: Over limit, increment **not** recorded
    ///
    /// # Behavior
    ///
    /// 1. Check current window usage via `is_allowed(key)`
    /// 2. If over limit, return `Rejected` (no state change)
    /// 3. If allowed:
    ///    - Check if recent bucket exists within `rate_group_size_ms`
    ///    - If yes: add count to existing bucket (coalescing)
    ///    - If no: create new bucket with count
    ///    - Return `Allowed`
    ///
    /// # Concurrency
    ///
    /// **Not atomic across calls.** Under concurrent load:
    /// - Multiple threads may observe `Allowed` simultaneously
    /// - All may proceed and increment, causing temporary overshoot
    /// - This is **expected** and by design for performance
    ///
    /// For strict enforcement, use external synchronization (e.g., per-key locks).
    ///
    /// # Bucket Coalescing
    ///
    /// Increments within `rate_group_size_ms` of the most recent bucket are merged
    /// into that bucket. This reduces memory usage and improves performance.
    ///
    /// # Examples
    ///
    /// ```
    /// # let rl = trypema::__doctest_helpers::rate_limiter();
    /// use trypema::{RateLimit, RateLimitDecision};
    ///
    /// let limiter = rl.local().absolute();
    /// let rate = RateLimit::try_from(10.0).unwrap();
    ///
    /// // Single request
    /// assert!(matches!(limiter.inc("user_123", &rate, 1), RateLimitDecision::Allowed));
    ///
    /// // Batch of 10
    /// assert!(matches!(limiter.inc("user_456", &rate, 10), RateLimitDecision::Allowed));
    /// ```
    pub fn inc(&self, key: &str, rate_limit: &RateLimit, count: u64) -> RateLimitDecision {
        let is_allowed = self.is_allowed(key);

        if !matches!(is_allowed, RateLimitDecision::Allowed) {
            return is_allowed;
        }

        let rate_limit_series = match self.series.get(key) {
            Some(rate_limit_series) => rate_limit_series,
            None => {
                self.series
                    .entry(key.to_string())
                    .or_insert_with(|| RateLimitSeries::new(*rate_limit));
                let Some(rate_limit_series) = self.series.get(key) else {
                    unreachable!("AbsoluteLocalRateLimiter::inc: key should be in map");
                };

                rate_limit_series
            }
        };

        if let Some(last_entry) = rate_limit_series.series.back()
            && last_entry.timestamp.elapsed().as_millis() <= *self.rate_group_size_ms as u128
        {
            last_entry.count.fetch_add(count, Ordering::Relaxed);
            rate_limit_series.total.fetch_add(count, Ordering::Relaxed);
        } else {
            drop(rate_limit_series);

            let Some(mut rate_limit_series) = self.series.get_mut(key) else {
                unreachable!("AbsoluteLocalRateLimiter::inc: key should be in map");
            };

            rate_limit_series.series.push_back(InstantRate {
                count: count.into(),
                timestamp: Instant::now(),
                declined: AtomicU64::new(0),
            });

            rate_limit_series.total.fetch_add(count, Ordering::Relaxed);
        }

        RateLimitDecision::Allowed
    } // end method inc

    /// Check if `key` is currently under its rate limit (read-only).
    ///
    /// Performs an admission check **without** recording an increment. Useful for
    /// previewing whether a request would be allowed before doing expensive work.
    ///
    /// # Arguments
    ///
    /// - `key`: Unique identifier for the rate-limited resource
    ///
    /// # Returns
    ///
    /// - [`RateLimitDecision::Allowed`]: Key is under limit
    /// - [`RateLimitDecision::Rejected`]: Key is over limit, includes backoff hints
    ///
    /// # Behavior
    ///
    /// 1. If key doesn't exist, return `Allowed` (no state yet)
    /// 2. Perform lazy eviction of expired buckets
    /// 3. Sum remaining bucket counts
    /// 4. Compare against `window_capacity = window_size_seconds × rate_limit`
    /// 5. Return decision with metadata if rejected
    ///
    /// # Side Effects
    ///
    /// - **Lazy eviction:** Removes expired buckets from key's state
    /// - **No increment:** Does not modify counters (read-only check)
    ///
    /// # Use Cases
    ///
    /// - **Preview:** Check before expensive operations
    /// - **Metrics:** Sample rate limit status without affecting state
    /// - **Testing:** Verify rate limit behavior
    ///
    /// # Examples
    ///
    /// ```
    /// # let rl = trypema::__doctest_helpers::rate_limiter();
    /// use trypema::{RateLimit, RateLimitDecision};
    ///
    /// let limiter = rl.local().absolute();
    /// let rate = RateLimit::try_from(10.0).unwrap();
    ///
    /// // Unknown key → always allowed
    /// assert!(matches!(limiter.is_allowed("new_key"), RateLimitDecision::Allowed));
    ///
    /// // Check before recording
    /// if matches!(limiter.is_allowed("user_123"), RateLimitDecision::Allowed) {
    ///     limiter.inc("user_123", &rate, 1);
    /// }
    /// ```
    pub fn is_allowed(&self, key: &str) -> RateLimitDecision {
        let Some(rate_limit) = self.series.get(key) else {
            return RateLimitDecision::Allowed;
        };

        let mut total_count = rate_limit.total.load(Ordering::Relaxed);
        let window_limit = (*self.window_size_seconds as f64 * *rate_limit.limit) as u64;

        if total_count < window_limit {
            return RateLimitDecision::Allowed;
        }

        // Delay cleanup only if there is a possibility of rejection

        let rate_limit = match rate_limit.series.front() {
            None => rate_limit,
            Some(instant_rate)
                if instant_rate.timestamp.elapsed().as_millis() <= self.window_size_ms =>
            {
                rate_limit
            }
            Some(_) => {
                drop(rate_limit);

                let Some(mut rate_limit) = self.series.get_mut(key) else {
                    unreachable!("AbsoluteLocalRateLimiter::is_allowed: key should be in map");
                };

                let now = Instant::now();

                let split = rate_limit
                    .series
                    .partition_point(|r| now.duration_since(r.timestamp) > self.window_duration);

                let total = rate_limit
                    .series
                    .drain(..split)
                    .map(|r| r.count.load(Ordering::Relaxed))
                    .sum::<u64>();

                rate_limit.total.fetch_sub(total, Ordering::Relaxed);
                total_count -= total;

                drop(rate_limit);

                let Some(rate_limit) = self.series.get(key) else {
                    unreachable!("AbsoluteLocalRateLimiter::is_allowed: key should be in map");
                };

                rate_limit
            }
        };

        if total_count < window_limit {
            return RateLimitDecision::Allowed;
        }

        let (retry_after_ms, remaining_after_waiting) = match rate_limit.series.front() {
            None => (0, 0),
            Some(instant_rate) => {
                let elapsed_ms = instant_rate.timestamp.elapsed().as_millis();
                let retry_after_ms = self.window_size_ms.saturating_sub(elapsed_ms);

                let current_total = rate_limit.total.load(Ordering::Relaxed);
                let oldest_count = instant_rate.count.load(Ordering::Relaxed);
                let remaining_after_waiting = current_total.saturating_sub(oldest_count);

                (retry_after_ms, remaining_after_waiting)
            }
        };

        RateLimitDecision::Rejected {
            window_size_seconds: *self.window_size_seconds,
            retry_after_ms,
            remaining_after_waiting,
        }
    } // end method is_allowed

    pub(crate) fn cleanup(&self, stale_after_ms: u64) {
        self.series.retain(
            |_, rate_limit_series| match rate_limit_series.series.back() {
                None => false,
                Some(instant_rate)
                    if instant_rate.timestamp.elapsed().as_millis() > stale_after_ms as u128 =>
                {
                    false
                }
                Some(_) => true,
            },
        );
    } // end method cleanup
} // end of impl