use crate::{Quota, QuotaPolicy, QuotaResult};
use dashmap::DashMap;
use parking_lot::Mutex;
use std::borrow::Borrow;
use std::hash::Hash;
use std::time::Instant;
pub type QuotaKey = String;
pub struct QuotaPool<K = QuotaKey>
where
K: Eq + Hash,
{
initial_available_tokens: u64,
policy: QuotaPolicy,
entries: DashMap<K, Mutex<QuotaSlot>>,
epoch: Instant,
}
struct QuotaSlot {
quota: Quota,
last_access_time_ns: u64,
last_refill_time_ns: u64,
}
impl QuotaSlot {
#[inline]
fn new(initial_available_tokens: u64) -> Self {
Self {
quota: Quota::with_initial_tokens(initial_available_tokens),
last_access_time_ns: 0,
last_refill_time_ns: 0,
}
}
#[inline]
fn consume(&mut self, policy: &QuotaPolicy, now_ns: u64, cost: u64) -> QuotaResult {
let delta_ns = now_ns.saturating_sub(self.last_access_time_ns);
self.last_access_time_ns = now_ns;
let elapsed_since_refill_ns = now_ns.saturating_sub(self.last_refill_time_ns);
let did_refill = policy.tick_with_interval(delta_ns, &self.quota, elapsed_since_refill_ns);
if did_refill {
self.last_refill_time_ns = now_ns;
}
self.quota.consume(cost)
}
}
impl QuotaPool {
#[inline]
pub fn new(policy: QuotaPolicy, initial_available_tokens: u64) -> Self {
Self::with_key_type(policy, initial_available_tokens)
}
}
impl<K: Eq + Hash> QuotaPool<K> {
#[inline]
pub fn with_key_type(policy: QuotaPolicy, initial_available_tokens: u64) -> Self {
Self::with_capacity(policy, initial_available_tokens, 0)
}
#[inline]
pub fn with_capacity(
policy: QuotaPolicy,
initial_available_tokens: u64,
capacity: usize,
) -> Self {
Self {
initial_available_tokens,
policy,
entries: DashMap::with_capacity(capacity),
epoch: Instant::now(),
}
}
#[inline]
pub fn len(&self) -> usize {
self.entries.len()
}
#[inline]
pub fn is_empty(&self) -> bool {
self.entries.is_empty()
}
#[inline]
pub fn contains_key<Q>(&self, key: &Q) -> bool
where
K: Borrow<Q>,
Q: Eq + Hash + ?Sized,
{
self.entries.contains_key(key)
}
#[inline]
pub fn insert_keys<'a, Q, I>(&self, keys: I)
where
K: Borrow<Q>,
Q: Eq + Hash + ToOwned<Owned = K> + ?Sized + 'a,
I: IntoIterator<Item = &'a Q>,
{
for key in keys {
self.insert_key(key);
}
}
#[inline]
pub fn insert_key<Q>(&self, key: &Q)
where
K: Borrow<Q>,
Q: Eq + Hash + ToOwned<Owned = K> + ?Sized,
{
if self.entries.get(key).is_some() {
return;
}
self.entries
.entry(key.to_owned())
.or_insert_with(|| Mutex::new(QuotaSlot::new(self.initial_available_tokens)));
}
#[inline]
pub fn consume<Q>(&self, key: &Q, cost: u64) -> QuotaResult
where
K: Borrow<Q>,
Q: Eq + Hash + ToOwned<Owned = K> + ?Sized,
{
if let Some(entry) = self.entries.get(key) {
return self.consume_slot(&entry, cost);
}
let entry = self
.entries
.entry(key.to_owned())
.or_insert_with(|| Mutex::new(QuotaSlot::new(self.initial_available_tokens)));
self.consume_slot(&entry, cost)
}
#[inline]
pub fn consume_owned(&self, key: K, cost: u64) -> QuotaResult {
if let Some(entry) = self.entries.get(&key) {
return self.consume_slot(&entry, cost);
}
let entry = self
.entries
.entry(key)
.or_insert_with(|| Mutex::new(QuotaSlot::new(self.initial_available_tokens)));
self.consume_slot(&entry, cost)
}
#[inline]
fn consume_slot(&self, slot: &Mutex<QuotaSlot>, cost: u64) -> QuotaResult {
let mut slot = slot.lock();
let now_ns = self.epoch.elapsed().as_nanos() as u64;
slot.consume(&self.policy, now_ns, cost)
}
}
#[cfg(test)]
mod tests;