mod gcra;
mod slot;
use self::gcra::{burst_tokens, duration_ns, initial_tat_ns, ns_per_token};
#[cfg(test)]
use self::slot::QuotaSlot;
use self::slot::{ContinuousQuotaSlot, IntervalQuotaSlot};
use crate::quota_map::QuotaMap;
use crate::{QuotaPolicy, QuotaResult};
use std::borrow::Borrow;
use std::hash::Hash;
/// Key type used by [`QuotaPool`] when no explicit key type parameter is given.
pub type QuotaKey = String;
/// Keyed quota pool: one quota slot per key plus the timing parameters that
/// are derived once from a [`QuotaPolicy`] at construction and shared by
/// every slot.
///
/// `K` is the key type and defaults to [`QuotaKey`].
pub struct QuotaPool<K = QuotaKey>
where
K: Eq + Hash,
{
/// Token count passed to the slot constructor whenever a new key is inserted.
initial_available_tokens: u64,
/// `true` when the policy's `refill_rate_ns()` is greater than zero; when
/// `false`, `now_ns` skips the clock and reports 0.
refill_enabled: bool,
/// Value of the policy's `refill_interval_ns()`; 0 selects the continuous
/// slot storage, any other value selects the interval storage.
refill_interval_ns: u64,
/// Result of `gcra::ns_per_token` for the policy's refill rate
/// (presumably nanoseconds needed to refill one token — see `gcra`).
ns_per_token: f64,
/// `gcra::duration_ns(1.0, ns_per_token)`: the duration of one token.
token_ns: u64,
/// Result of `gcra::burst_tokens` for the policy, initial tokens and refill rate.
burst_tokens: f64,
/// `gcra::duration_ns(burst_tokens, ns_per_token)`: the duration of the whole burst.
burst_ns: u64,
/// `burst_ns` minus `token_ns`, saturating at zero.
burst_after_one_ns: u64,
/// Result of `gcra::initial_tat_ns`; passed to the slot constructor for new keys.
initial_tat_ns: u64,
/// Per-key slot storage (continuous or interval flavour).
entries: QuotaEntries<K>,
/// Clock used to measure elapsed time.
clock: quanta::Clock,
/// Raw clock reading taken at construction; `now_ns` is measured relative to it.
start_raw: u64,
}
/// Per-key slot storage. The variant is chosen once, in
/// `QuotaPool::with_capacity`, from the policy's refill interval.
enum QuotaEntries<K> {
/// Chosen when the policy's `refill_interval_ns()` is 0.
Continuous(QuotaMap<K, ContinuousQuotaSlot>),
/// Chosen when the policy's `refill_interval_ns()` is non-zero.
Interval(QuotaMap<K, IntervalQuotaSlot>),
}
impl QuotaPool {
#[inline]
pub fn new(policy: QuotaPolicy, initial_available_tokens: u64) -> Self {
Self::with_key_type(policy, initial_available_tokens)
}
}
impl<K: Eq + Hash + Clone> QuotaPool<K> {
/// Creates a pool for the key type `K`, requesting a capacity of zero from
/// `with_capacity`.
#[inline]
pub fn with_key_type(policy: QuotaPolicy, initial_available_tokens: u64) -> Self {
    let capacity = 0;
    Self::with_capacity(policy, initial_available_tokens, capacity)
}
/// Creates a pool whose slot map is built with the given `capacity`.
///
/// All timing parameters are derived here, once, from `policy` and
/// `initial_available_tokens` using the helpers in the `gcra` module. The
/// slot storage flavour is chosen from the policy's refill interval:
/// 0 gives continuous slots, anything else gives interval slots.
#[inline]
pub fn with_capacity(
    policy: QuotaPolicy,
    initial_available_tokens: u64,
    capacity: usize,
) -> Self {
    // Raw policy inputs.
    let rate = policy.refill_rate_ns();
    let interval_ns = policy.refill_interval_ns();

    // Derived GCRA parameters.
    let per_token = ns_per_token(rate);
    let one_token_ns = duration_ns(1.0, per_token);
    let burst = burst_tokens(&policy, initial_available_tokens, rate);
    let burst_window_ns = duration_ns(burst, per_token);
    let start_tat = initial_tat_ns(burst, initial_available_tokens, per_token);

    // Capture the time origin before building the map, as the pool's
    // elapsed-time readings are all relative to this raw value.
    let clock = quanta::Clock::default();
    let start_raw = clock.raw();

    let entries = match interval_ns {
        0 => QuotaEntries::Continuous(QuotaMap::with_capacity(capacity)),
        _ => QuotaEntries::Interval(QuotaMap::with_capacity(capacity)),
    };

    Self {
        initial_available_tokens,
        refill_enabled: rate > 0.0,
        refill_interval_ns: interval_ns,
        ns_per_token: per_token,
        token_ns: one_token_ns,
        burst_tokens: burst,
        burst_ns: burst_window_ns,
        burst_after_one_ns: burst_window_ns.saturating_sub(one_token_ns),
        initial_tat_ns: start_tat,
        entries,
        clock,
        start_raw,
    }
}
/// Returns the length reported by the underlying slot map.
#[inline]
pub fn len(&self) -> usize {
    match self.entries {
        QuotaEntries::Continuous(ref map) => map.len(),
        QuotaEntries::Interval(ref map) => map.len(),
    }
}
/// Returns the underlying slot map's `is_empty()` result.
#[inline]
pub fn is_empty(&self) -> bool {
    match self.entries {
        QuotaEntries::Continuous(ref map) => map.is_empty(),
        QuotaEntries::Interval(ref map) => map.is_empty(),
    }
}
/// Asks the underlying slot map whether it holds `key`.
///
/// `key` may be any borrowed form of `K` (for example `&str` when `K` is
/// `String`).
#[inline]
pub fn contains_key<Q>(&self, key: &Q) -> bool
where
    K: Borrow<Q>,
    Q: Eq + Hash + ?Sized,
{
    match self.entries {
        QuotaEntries::Continuous(ref map) => map.contains_key(key),
        QuotaEntries::Interval(ref map) => map.contains_key(key),
    }
}
/// Calls `insert_key` for every key yielded by `keys`, in iteration order.
#[inline]
pub fn insert_keys<'a, Q, I>(&self, keys: I)
where
    K: Borrow<Q>,
    Q: Eq + Hash + ToOwned<Owned = K> + ?Sized + 'a,
    I: IntoIterator<Item = &'a Q>,
{
    keys.into_iter().for_each(|key| self.insert_key(key));
}
/// Registers `key` with the pool without consuming or checking anything.
///
/// Delegates to the map's `with_or_insert`: the initializer builds a slot
/// from the pool's initial token count and initial TAT, and the accessor
/// closure does nothing. (Presumably an existing slot is left untouched —
/// confirm against `QuotaMap::with_or_insert`.)
#[inline]
pub fn insert_key<Q>(&self, key: &Q)
where
    K: Borrow<Q>,
    Q: Eq + Hash + ToOwned<Owned = K> + ?Sized,
{
    let tokens = self.initial_available_tokens;
    let tat_ns = self.initial_tat_ns;
    match self.entries {
        QuotaEntries::Continuous(ref map) => {
            map.with_or_insert(
                key,
                |_| ContinuousQuotaSlot::with_initial_tat(tokens, tat_ns),
                |_slot| (),
            );
        }
        QuotaEntries::Interval(ref map) => {
            map.with_or_insert(
                key,
                |_| IntervalQuotaSlot::with_initial_tat(tokens, tat_ns),
                |_slot| (),
            );
        }
    }
}
/// Attempts to consume `cost` tokens from the slot belonging to `key`.
///
/// The current time is sampled once, before the map is touched. The slot
/// is obtained through the map's `with_or_insert`, whose initializer
/// builds a slot from the pool's initial token count and initial TAT; the
/// slot-level consume routine then produces the returned [`QuotaResult`].
#[inline]
pub fn consume<Q>(&self, key: &Q, cost: u64) -> QuotaResult
where
    K: Borrow<Q>,
    Q: Eq + Hash + ToOwned<Owned = K> + ?Sized,
{
    let now_ns = self.now_ns();
    let tokens = self.initial_available_tokens;
    let tat_ns = self.initial_tat_ns;
    match self.entries {
        QuotaEntries::Continuous(ref map) => map.with_or_insert(
            key,
            |_| ContinuousQuotaSlot::with_initial_tat(tokens, tat_ns),
            |slot| self.consume_continuous_slot_at(slot, cost, now_ns),
        ),
        QuotaEntries::Interval(ref map) => map.with_or_insert(
            key,
            |_| IntervalQuotaSlot::with_initial_tat(tokens, tat_ns),
            |slot| self.consume_interval_slot_at(slot, cost, now_ns),
        ),
    }
}
/// Same as `consume`, but takes the key by value; the key is only borrowed
/// for the call and dropped afterwards.
#[inline]
pub fn consume_owned(&self, key: K, cost: u64) -> QuotaResult {
    self.consume::<K>(&key, cost)
}
/// Runs the slot-level check for `cost` tokens against the slot of `key`
/// and returns its boolean verdict.
///
/// The current time is sampled once, before the map is touched. Like
/// `consume`, the slot is obtained through the map's `with_or_insert`,
/// with an initializer that builds a slot from the pool's initial token
/// count and initial TAT.
#[inline]
pub fn check<Q>(&self, key: &Q, cost: u64) -> bool
where
    K: Borrow<Q>,
    Q: Eq + Hash + ToOwned<Owned = K> + ?Sized,
{
    let now_ns = self.now_ns();
    let tokens = self.initial_available_tokens;
    let tat_ns = self.initial_tat_ns;
    match self.entries {
        QuotaEntries::Continuous(ref map) => map.with_or_insert(
            key,
            |_| ContinuousQuotaSlot::with_initial_tat(tokens, tat_ns),
            |slot| self.check_continuous_slot_at(slot, cost, now_ns),
        ),
        QuotaEntries::Interval(ref map) => map.with_or_insert(
            key,
            |_| IntervalQuotaSlot::with_initial_tat(tokens, tat_ns),
            |slot| self.check_interval_slot_at(slot, cost, now_ns),
        ),
    }
}
/// Same as `check`, but takes the key by value; the key is only borrowed
/// for the call and dropped afterwards.
#[inline]
pub fn check_owned(&self, key: K, cost: u64) -> bool {
    self.check::<K>(&key, cost)
}
/// Removes `key` from the underlying slot map and returns the map's
/// `remove` result (presumably whether the key was present — confirm
/// against `QuotaMap::remove`).
#[inline]
pub fn remove<Q>(&self, key: &Q) -> bool
where
    K: Borrow<Q>,
    Q: Eq + Hash + ?Sized,
{
    match self.entries {
        QuotaEntries::Continuous(ref map) => map.remove(key),
        QuotaEntries::Interval(ref map) => map.remove(key),
    }
}
/// Nanoseconds elapsed since the pool was constructed, or 0 when refill is
/// disabled (the clock is not read at all in that case).
#[inline]
fn now_ns(&self) -> u64 {
    if !self.refill_enabled {
        return 0;
    }
    let raw_now = self.clock.raw();
    self.clock.delta_as_nanos(self.start_raw, raw_now)
}
#[inline]
fn consume_continuous_slot_at(
&self,
slot: &ContinuousQuotaSlot,
cost: u64,
now_ns: u64,
) -> QuotaResult {
slot.consume_with_gcra(
now_ns,
cost,
self.ns_per_token,
self.token_ns,
self.burst_tokens,
self.burst_ns,
self.burst_after_one_ns,
)
}
#[inline]
fn consume_interval_slot_at(
&self,
slot: &IntervalQuotaSlot,
cost: u64,
now_ns: u64,
) -> QuotaResult {
slot.consume_with_gcra(
now_ns,
cost,
self.refill_enabled,
self.refill_interval_ns,
self.ns_per_token,
self.token_ns,
self.burst_tokens,
self.burst_ns,
self.burst_after_one_ns,
)
}
#[inline]
fn check_continuous_slot_at(&self, slot: &ContinuousQuotaSlot, cost: u64, now_ns: u64) -> bool {
if cost == 1 {
return slot.check_one_with_gcra(now_ns, self.token_ns, self.burst_after_one_ns);
}
slot.check_with_gcra(
now_ns,
cost,
self.ns_per_token,
self.token_ns,
self.burst_ns,
self.burst_after_one_ns,
)
}
#[inline]
fn check_interval_slot_at(&self, slot: &IntervalQuotaSlot, cost: u64, now_ns: u64) -> bool {
if cost == 1 {
return slot.check_one_with_gcra(
now_ns,
self.refill_enabled,
self.refill_interval_ns,
self.token_ns,
self.burst_after_one_ns,
);
}
slot.check_with_gcra(
now_ns,
cost,
self.refill_enabled,
self.refill_interval_ns,
self.ns_per_token,
self.token_ns,
self.burst_ns,
self.burst_after_one_ns,
)
}
}
#[cfg(test)]
mod tests;