quota 0.3.3

Fastest Lane-parallel Rate-limiter for Rust
Documentation
mod gcra;
mod slot;

use self::gcra::{burst_tokens, duration_ns, initial_tat_ns, ns_per_token};
#[cfg(test)]
use self::slot::QuotaSlot;
use self::slot::{ContinuousQuotaSlot, IntervalQuotaSlot};
use crate::quota_map::QuotaMap;
use crate::{QuotaPolicy, QuotaResult};
use std::borrow::Borrow;
use std::hash::Hash;

/// The default owned key type for a keyed quota pool.
///
/// `QuotaPool` uses this alias as the default for its key type parameter, so a bare
/// `QuotaPool` (as built by `QuotaPool::new`) is keyed by `String`.
pub type QuotaKey = String;

/// A concurrent keyed quota pool.
///
/// Each key gets an independent GCRA state. Continuous refill policies use a compact
/// one-atomic slot per key, while policies with a discrete refill interval use a slot that also
/// tracks the last refill tick.
///
/// The scalar fields below are derived once from the policy by `with_capacity` and are then
/// passed as-is into every slot operation.
pub struct QuotaPool<K = QuotaKey>
where
    K: Eq + Hash,
{
    /// Constructor argument, stored as given and handed to every newly created slot.
    initial_available_tokens: u64,
    /// `true` when the policy's refill rate is greater than `0.0`. When `false`, `now_ns`
    /// skips the clock and reports `0`.
    refill_enabled: bool,
    /// The policy's refill interval in nanoseconds. A value of `0` selects the continuous
    /// slot map; any other value selects the interval slot map.
    refill_interval_ns: u64,
    /// Result of `gcra::ns_per_token` for the policy's refill rate.
    /// NOTE(review): presumably the nanoseconds needed to refill one token; confirm in `gcra`.
    ns_per_token: f64,
    /// `gcra::duration_ns(1.0, ns_per_token)`: the single-token duration as an integer.
    token_ns: u64,
    /// Result of `gcra::burst_tokens` for the policy, the initial token count, and the rate.
    /// NOTE(review): presumably the bucket capacity in tokens; confirm in `gcra`.
    burst_tokens: f64,
    /// `gcra::duration_ns(burst_tokens, ns_per_token)`: the burst expressed as a duration.
    burst_ns: u64,
    /// `burst_ns` minus `token_ns`, saturating at zero; precomputed for the slot checks.
    burst_after_one_ns: u64,
    /// Result of `gcra::initial_tat_ns`; handed to every newly created slot as its starting
    /// state. NOTE(review): "TAT" presumably means theoretical arrival time; confirm in `gcra`.
    initial_tat_ns: u64,
    /// Per-key slots; the variant is chosen once at construction from `refill_interval_ns`.
    entries: QuotaEntries<K>,
    /// Clock used by `now_ns` to take raw readings.
    clock: quanta::Clock,
    /// Raw clock reading captured at construction; `now_ns` reports time relative to it.
    start_raw: u64,
}

/// Per-key slot storage for a `QuotaPool`, specialised by slot type.
///
/// The variant is picked once in `QuotaPool::with_capacity` from the policy's refill interval,
/// and every pool operation dispatches on it.
enum QuotaEntries<K> {
    /// Used when the policy's refill interval is `0` ns.
    Continuous(QuotaMap<K, ContinuousQuotaSlot>),
    /// Used when the policy's refill interval is non-zero.
    Interval(QuotaMap<K, IntervalQuotaSlot>),
}

impl QuotaPool {
    #[inline]
    /// Creates a `String`-keyed quota pool with no initial map reservation.
    pub fn new(policy: QuotaPolicy, initial_available_tokens: u64) -> Self {
        Self::with_key_type(policy, initial_available_tokens)
    }
}

impl<K: Eq + Hash + Clone> QuotaPool<K> {
    /// Creates a quota pool using the caller's key type.
    ///
    /// Shorthand for [`Self::with_capacity`] with a capacity of `0`.
    #[inline]
    pub fn with_key_type(policy: QuotaPolicy, initial_available_tokens: u64) -> Self {
        Self::with_capacity(policy, initial_available_tokens, 0)
    }

    /// Creates a quota pool and reserves enough map capacity for the expected key count.
    ///
    /// All timing parameters are derived here, once, from `policy` and
    /// `initial_available_tokens`; later operations only read them. The slot map variant is
    /// chosen from the policy's refill interval: `0` selects the continuous map, anything else
    /// the interval map. `capacity` is forwarded to the map constructor.
    #[inline]
    pub fn with_capacity(
        policy: QuotaPolicy,
        initial_available_tokens: u64,
        capacity: usize,
    ) -> Self {
        let refill_rate_ns = policy.refill_rate_ns();
        let refill_enabled = refill_rate_ns > 0.0;
        let refill_interval_ns = policy.refill_interval_ns();

        // Each `let` below shadows the imported `gcra` helper of the same name; the right-hand
        // side is evaluated first, so it still refers to the helper function.
        let ns_per_token = ns_per_token(refill_rate_ns);
        let token_ns = duration_ns(1.0, ns_per_token);
        let burst_tokens = burst_tokens(&policy, initial_available_tokens, refill_rate_ns);
        let burst_ns = duration_ns(burst_tokens, ns_per_token);
        let burst_after_one_ns = burst_ns.saturating_sub(token_ns);
        let initial_tat_ns = initial_tat_ns(burst_tokens, initial_available_tokens, ns_per_token);

        // All `now_ns` readings are measured relative to this raw timestamp.
        let clock = quanta::Clock::default();
        let start_raw = clock.raw();

        let entries = if refill_interval_ns == 0 {
            QuotaEntries::Continuous(QuotaMap::with_capacity(capacity))
        } else {
            QuotaEntries::Interval(QuotaMap::with_capacity(capacity))
        };

        Self {
            initial_available_tokens,
            refill_enabled,
            refill_interval_ns,
            ns_per_token,
            token_ns,
            burst_tokens,
            burst_ns,
            burst_after_one_ns,
            initial_tat_ns,
            entries,
            clock,
            start_raw,
        }
    }

    /// Returns the number of keys currently tracked by the pool.
    #[inline]
    pub fn len(&self) -> usize {
        match &self.entries {
            QuotaEntries::Continuous(entries) => entries.len(),
            QuotaEntries::Interval(entries) => entries.len(),
        }
    }

    /// Returns whether the pool currently tracks no keys.
    #[inline]
    pub fn is_empty(&self) -> bool {
        match &self.entries {
            QuotaEntries::Continuous(entries) => entries.is_empty(),
            QuotaEntries::Interval(entries) => entries.is_empty(),
        }
    }

    /// Returns whether the pool already has a quota slot for `key`.
    ///
    /// This is a pure lookup: unlike [`Self::consume`] and [`Self::check`] it is not given a
    /// slot factory, so it cannot create a slot for a missing key.
    #[inline]
    pub fn contains_key<Q>(&self, key: &Q) -> bool
    where
        K: Borrow<Q>,
        Q: Eq + Hash + ?Sized,
    {
        match &self.entries {
            QuotaEntries::Continuous(entries) => entries.contains_key(key),
            QuotaEntries::Interval(entries) => entries.contains_key(key),
        }
    }

    /// Inserts several keys without consuming quota.
    ///
    /// Calls [`Self::insert_key`] for each item of `keys`, in iteration order.
    #[inline]
    pub fn insert_keys<'a, Q, I>(&self, keys: I)
    where
        K: Borrow<Q>,
        Q: Eq + Hash + ToOwned<Owned = K> + ?Sized + 'a,
        I: IntoIterator<Item = &'a Q>,
    {
        for key in keys {
            self.insert_key(key);
        }
    }

    /// Inserts a key without consuming quota.
    ///
    /// The map's `with_or_insert` receives a factory for a fresh slot and a no-op visitor, so
    /// this method itself never changes the state of a slot it is shown.
    #[inline]
    pub fn insert_key<Q>(&self, key: &Q)
    where
        K: Borrow<Q>,
        Q: Eq + Hash + ToOwned<Owned = K> + ?Sized,
    {
        match &self.entries {
            QuotaEntries::Continuous(entries) => {
                entries.with_or_insert(key, |_| self.fresh_continuous_slot(), |_| ());
            }
            QuotaEntries::Interval(entries) => {
                entries.with_or_insert(key, |_| self.fresh_interval_slot(), |_| ());
            }
        }
    }

    /// Attempts to consume `cost` tokens for `key`.
    ///
    /// The clock is sampled once, before the map is consulted. The map's `with_or_insert` is
    /// given a factory for a fresh slot and a visitor that runs the GCRA consume on the slot;
    /// the visitor's `QuotaResult` is returned.
    #[inline]
    pub fn consume<Q>(&self, key: &Q, cost: u64) -> QuotaResult
    where
        K: Borrow<Q>,
        Q: Eq + Hash + ToOwned<Owned = K> + ?Sized,
    {
        let now_ns = self.now_ns();

        match &self.entries {
            QuotaEntries::Continuous(entries) => entries.with_or_insert(
                key,
                |_| self.fresh_continuous_slot(),
                |slot| self.consume_continuous_slot_at(slot, cost, now_ns),
            ),
            QuotaEntries::Interval(entries) => entries.with_or_insert(
                key,
                |_| self.fresh_interval_slot(),
                |slot| self.consume_interval_slot_at(slot, cost, now_ns),
            ),
        }
    }

    /// Attempts to consume `cost` tokens for an owned key.
    ///
    /// Takes the key by value and forwards a borrow of it to [`Self::consume`].
    #[inline]
    pub fn consume_owned(&self, key: K, cost: u64) -> QuotaResult {
        self.consume(&key, cost)
    }

    /// Fast boolean form of `consume` for callers that do not need the quota snapshot.
    ///
    /// Follows the same shape as [`Self::consume`] (one clock sample, then `with_or_insert`
    /// with a fresh-slot factory) but the visitor returns only a `bool`. A `cost` of exactly
    /// `1` is routed to the slot's dedicated single-token check.
    #[inline]
    pub fn check<Q>(&self, key: &Q, cost: u64) -> bool
    where
        K: Borrow<Q>,
        Q: Eq + Hash + ToOwned<Owned = K> + ?Sized,
    {
        let now_ns = self.now_ns();

        match &self.entries {
            QuotaEntries::Continuous(entries) => entries.with_or_insert(
                key,
                |_| self.fresh_continuous_slot(),
                |slot| self.check_continuous_slot_at(slot, cost, now_ns),
            ),
            QuotaEntries::Interval(entries) => entries.with_or_insert(
                key,
                |_| self.fresh_interval_slot(),
                |slot| self.check_interval_slot_at(slot, cost, now_ns),
            ),
        }
    }

    /// Fast boolean form of `consume_owned`.
    ///
    /// Takes the key by value and forwards a borrow of it to [`Self::check`].
    #[inline]
    pub fn check_owned(&self, key: K, cost: u64) -> bool {
        self.check(&key, cost)
    }

    /// Removes a key and its quota state from the pool.
    ///
    /// Returns the map's `remove` result unchanged.
    /// NOTE(review): presumably `true` when the key was present; confirm in `QuotaMap`.
    #[inline]
    pub fn remove<Q>(&self, key: &Q) -> bool
    where
        K: Borrow<Q>,
        Q: Eq + Hash + ?Sized,
    {
        match &self.entries {
            QuotaEntries::Continuous(entries) => entries.remove(key),
            QuotaEntries::Interval(entries) => entries.remove(key),
        }
    }

    /// Builds the slot a key starts with in the continuous map.
    ///
    /// Single source of truth for the initial state used by `insert_key`, `consume` and
    /// `check`, so the three entry points cannot drift apart.
    #[inline]
    fn fresh_continuous_slot(&self) -> ContinuousQuotaSlot {
        ContinuousQuotaSlot::with_initial_tat(self.initial_available_tokens, self.initial_tat_ns)
    }

    /// Builds the slot a key starts with in the interval map.
    ///
    /// Interval-map counterpart of `fresh_continuous_slot`; takes the same two inputs.
    #[inline]
    fn fresh_interval_slot(&self) -> IntervalQuotaSlot {
        IntervalQuotaSlot::with_initial_tat(self.initial_available_tokens, self.initial_tat_ns)
    }

    /// Returns the nanoseconds elapsed since the pool was constructed.
    ///
    /// When refill is disabled the clock is not read and `0` is returned.
    #[inline]
    fn now_ns(&self) -> u64 {
        if self.refill_enabled {
            self.clock.delta_as_nanos(self.start_raw, self.clock.raw())
        } else {
            0
        }
    }

    /// Runs the GCRA consume on a continuous slot at time `now_ns`.
    ///
    /// Supplies the pool's precomputed timing parameters to the slot.
    #[inline]
    fn consume_continuous_slot_at(
        &self,
        slot: &ContinuousQuotaSlot,
        cost: u64,
        now_ns: u64,
    ) -> QuotaResult {
        slot.consume_with_gcra(
            now_ns,
            cost,
            self.ns_per_token,
            self.token_ns,
            self.burst_tokens,
            self.burst_ns,
            self.burst_after_one_ns,
        )
    }

    /// Runs the GCRA consume on an interval slot at time `now_ns`.
    ///
    /// Same as the continuous variant, plus the refill flag and refill interval that the
    /// interval slot takes.
    #[inline]
    fn consume_interval_slot_at(
        &self,
        slot: &IntervalQuotaSlot,
        cost: u64,
        now_ns: u64,
    ) -> QuotaResult {
        slot.consume_with_gcra(
            now_ns,
            cost,
            self.refill_enabled,
            self.refill_interval_ns,
            self.ns_per_token,
            self.token_ns,
            self.burst_tokens,
            self.burst_ns,
            self.burst_after_one_ns,
        )
    }

    /// Runs the boolean GCRA check on a continuous slot at time `now_ns`.
    ///
    /// A `cost` of exactly `1` uses the slot's single-token entry point, which takes fewer
    /// parameters; every other cost uses the general one.
    #[inline]
    fn check_continuous_slot_at(&self, slot: &ContinuousQuotaSlot, cost: u64, now_ns: u64) -> bool {
        if cost == 1 {
            slot.check_one_with_gcra(now_ns, self.token_ns, self.burst_after_one_ns)
        } else {
            slot.check_with_gcra(
                now_ns,
                cost,
                self.ns_per_token,
                self.token_ns,
                self.burst_ns,
                self.burst_after_one_ns,
            )
        }
    }

    /// Runs the boolean GCRA check on an interval slot at time `now_ns`.
    ///
    /// A `cost` of exactly `1` uses the slot's single-token entry point; every other cost uses
    /// the general one. Both also receive the refill flag and refill interval.
    #[inline]
    fn check_interval_slot_at(&self, slot: &IntervalQuotaSlot, cost: u64, now_ns: u64) -> bool {
        if cost == 1 {
            slot.check_one_with_gcra(
                now_ns,
                self.refill_enabled,
                self.refill_interval_ns,
                self.token_ns,
                self.burst_after_one_ns,
            )
        } else {
            slot.check_with_gcra(
                now_ns,
                cost,
                self.refill_enabled,
                self.refill_interval_ns,
                self.ns_per_token,
                self.token_ns,
                self.burst_ns,
                self.burst_after_one_ns,
            )
        }
    }
}

#[cfg(test)]
mod tests;