// quota 0.2.1
//
// High-performance rate limiter
// Documentation
use crate::{Quota, QuotaPolicy, QuotaResult};
use dashmap::DashMap;
use parking_lot::Mutex;
use std::borrow::Borrow;
use std::hash::Hash;
use std::time::Instant;

/// Default key type of [`QuotaPool`], used when no key type parameter is given.
pub type QuotaKey = String;

/// A concurrent collection of per-key quota slots that share one policy.
///
/// Each key maps to its own mutex-protected slot; slots for unknown keys are
/// created on first use with `initial_available_tokens` tokens.
pub struct QuotaPool<K = QuotaKey>
where
    K: Eq + Hash,
{
    /// Token count handed to every newly created slot.
    initial_available_tokens: u64,
    /// Policy passed to each slot whenever tokens are consumed.
    policy: QuotaPolicy,
    /// Per-key slots, each guarded by its own mutex.
    entries: DashMap<K, Mutex<QuotaSlot>>,
    /// Instant captured when the pool was constructed; slot timestamps are
    /// nanoseconds elapsed since this point.
    epoch: Instant,
}

/// Per-key state: the quota itself plus the timestamps needed to drive the policy.
struct QuotaSlot {
    /// The quota tokens are consumed from.
    quota: Quota,
    /// Timestamp (ns) passed to the most recent `consume` call; 0 until the first call.
    last_access_time_ns: u64,
    /// Timestamp (ns) of the most recent `consume` call for which the policy
    /// reported a refill; 0 until that first happens.
    last_refill_time_ns: u64,
}

impl QuotaSlot {
    /// Builds a slot whose quota starts with `initial_available_tokens` tokens.
    ///
    /// Both timestamps start at zero.
    #[inline]
    fn new(initial_available_tokens: u64) -> Self {
        let quota = Quota::with_initial_tokens(initial_available_tokens);
        Self {
            quota,
            last_access_time_ns: 0,
            last_refill_time_ns: 0,
        }
    }

    /// Advances the policy to `now_ns` and then tries to consume `cost` tokens.
    ///
    /// `now_ns` is the caller's current timestamp in nanoseconds. The policy is
    /// given the time since the previous access and the time since the last
    /// recorded refill; when it reports a refill, the refill timestamp is moved
    /// to `now_ns`. Returns whatever `Quota::consume` returns for `cost`.
    #[inline]
    fn consume(&mut self, policy: &QuotaPolicy, now_ns: u64, cost: u64) -> QuotaResult {
        // Saturating: a timestamp older than the stored one yields a zero interval.
        let since_last_access_ns = now_ns.saturating_sub(self.last_access_time_ns);
        let since_last_refill_ns = now_ns.saturating_sub(self.last_refill_time_ns);
        self.last_access_time_ns = now_ns;

        if policy.tick_with_interval(since_last_access_ns, &self.quota, since_last_refill_ns) {
            self.last_refill_time_ns = now_ns;
        }

        self.quota.consume(cost)
    }
}

impl QuotaPool {
    /// Creates an empty pool that uses the default key type, [`QuotaKey`].
    ///
    /// `policy` governs every slot; `initial_available_tokens` is the token
    /// count each newly created slot starts with.
    #[inline]
    pub fn new(policy: QuotaPolicy, initial_available_tokens: u64) -> Self {
        // Same as `with_key_type`: no capacity is reserved up front.
        Self::with_capacity(policy, initial_available_tokens, 0)
    }
}

impl<K: Eq + Hash> QuotaPool<K> {
    /// Creates an empty pool keyed by `K` without reserving any capacity.
    ///
    /// Equivalent to [`QuotaPool::with_capacity`] with `capacity = 0`.
    #[inline]
    pub fn with_key_type(policy: QuotaPolicy, initial_available_tokens: u64) -> Self {
        Self::with_capacity(policy, initial_available_tokens, 0)
    }

    /// Creates an empty pool whose map is built with the given `capacity`.
    ///
    /// `policy` is applied to every slot and `initial_available_tokens` is the
    /// token count each newly created slot starts with. The pool's epoch (the
    /// zero point of all slot timestamps) is captured here.
    #[inline]
    pub fn with_capacity(
        policy: QuotaPolicy,
        initial_available_tokens: u64,
        capacity: usize,
    ) -> Self {
        Self {
            initial_available_tokens,
            policy,
            entries: DashMap::with_capacity(capacity),
            epoch: Instant::now(),
        }
    }

    /// Returns the number of keys that currently have a slot.
    #[inline]
    pub fn len(&self) -> usize {
        self.entries.len()
    }

    /// Returns `true` when no key has a slot yet.
    #[inline]
    pub fn is_empty(&self) -> bool {
        self.entries.is_empty()
    }

    /// Returns `true` when a slot exists for `key`.
    #[inline]
    pub fn contains_key<Q>(&self, key: &Q) -> bool
    where
        K: Borrow<Q>,
        Q: Eq + Hash + ?Sized,
    {
        self.entries.contains_key(key)
    }

    /// Ensures a slot exists for every key in `keys`.
    ///
    /// Calls [`QuotaPool::insert_key`] for each item, so existing slots are
    /// left untouched.
    #[inline]
    pub fn insert_keys<'a, Q, I>(&self, keys: I)
    where
        K: Borrow<Q>,
        Q: Eq + Hash + ToOwned<Owned = K> + ?Sized + 'a,
        I: IntoIterator<Item = &'a Q>,
    {
        for key in keys {
            self.insert_key(key);
        }
    }

    /// Ensures a slot exists for `key`, creating one with the initial token
    /// count if it is missing. An existing slot is never replaced.
    #[inline]
    pub fn insert_key<Q>(&self, key: &Q)
    where
        K: Borrow<Q>,
        Q: Eq + Hash + ToOwned<Owned = K> + ?Sized,
    {
        // Cheap borrowed lookup first so `to_owned` only runs for new keys.
        if self.entries.contains_key(key) {
            return;
        }

        // `entry` re-checks under the map's own locking, so a concurrent insert
        // between the check above and this call is not overwritten.
        self.entries
            .entry(key.to_owned())
            .or_insert_with(|| self.new_slot());
    }

    /// Tries to consume `cost` tokens from the slot for `key`.
    ///
    /// If no slot exists yet, one is created with the initial token count and
    /// the consumption is applied to it. The key is converted to its owned form
    /// only when a slot has to be created.
    #[inline]
    pub fn consume<Q>(&self, key: &Q, cost: u64) -> QuotaResult
    where
        K: Borrow<Q>,
        Q: Eq + Hash + ToOwned<Owned = K> + ?Sized,
    {
        if let Some(entry) = self.entries.get(key) {
            return self.consume_slot(&entry, cost);
        }

        let entry = self
            .entries
            .entry(key.to_owned())
            .or_insert_with(|| self.new_slot());

        self.consume_slot(&entry, cost)
    }

    /// Same as [`QuotaPool::consume`], but takes the key by value so that
    /// creating a missing slot does not need to clone it.
    #[inline]
    pub fn consume_owned(&self, key: K, cost: u64) -> QuotaResult {
        if let Some(entry) = self.entries.get(&key) {
            return self.consume_slot(&entry, cost);
        }

        let entry = self
            .entries
            .entry(key)
            .or_insert_with(|| self.new_slot());

        self.consume_slot(&entry, cost)
    }

    /// Builds a fresh, mutex-wrapped slot holding the pool's initial token count.
    #[inline]
    fn new_slot(&self) -> Mutex<QuotaSlot> {
        Mutex::new(QuotaSlot::new(self.initial_available_tokens))
    }

    /// Locks `slot` and consumes `cost` tokens at the current pool time.
    #[inline]
    fn consume_slot(&self, slot: &Mutex<QuotaSlot>, cost: u64) -> QuotaResult {
        let mut slot = slot.lock();
        // The clock is read only once the lock is held, so each holder sees a
        // timestamp no earlier than the previous holder's.
        // Clamp before narrowing: the cast below is then lossless, and an
        // out-of-range value saturates at `u64::MAX` instead of wrapping.
        let now_ns = self.epoch.elapsed().as_nanos().min(u128::from(u64::MAX)) as u64;
        slot.consume(&self.policy, now_ns, cost)
    }
}

// Unit tests live in the separate `tests` module and are compiled only for test builds.
#[cfg(test)]
mod tests;