#[cfg(test)]
use super::gcra::{burst_tokens, duration_ns, initial_tat_ns, ns_per_token};
use super::gcra::{check_one_tat, check_tat, consume_tat};
#[cfg(test)]
use crate::QuotaPolicy;
use crate::QuotaResult;
use std::sync::atomic::{AtomicU64, Ordering};
/// Quota slot whose only state is a single atomic timestamp.
///
/// The methods on this type forward the caller's `now_ns` unchanged to the
/// `consume_tat` / `check_tat` / `check_one_tat` helpers from `super::gcra`.
pub(super) struct ContinuousQuotaSlot {
/// Atomic value handed by reference to the `gcra` helpers.
/// NOTE(review): presumably the GCRA "theoretical arrival time" in
/// nanoseconds; confirm against the `gcra` module.
tat_ns: AtomicU64,
}
/// Quota slot that additionally remembers the last refill tick it has observed.
///
/// When refill is enabled, the methods on this type replace the caller's
/// `now_ns` with the value returned by `effective_now` before calling the
/// `gcra` helpers; when refill is disabled they pass `0` instead.
pub(super) struct IntervalQuotaSlot {
/// Initial token count, kept only in test builds so the test-only `consume`
/// method can recompute the burst size from a `QuotaPolicy`.
#[cfg(test)]
initial_available_tokens: u64,
/// Atomic value handed by reference to the `gcra` helpers.
/// NOTE(review): presumably the GCRA "theoretical arrival time" in
/// nanoseconds; confirm against the `gcra` module.
tat_ns: AtomicU64,
/// Most recent tick stored by `effective_now`. Starts at 0 and is only ever
/// advanced by a whole multiple of the `refill_interval_ns` passed to that call.
last_tick_time_ns: AtomicU64,
}
/// Test-only alias: in test builds `QuotaSlot` names `IntervalQuotaSlot`.
#[cfg(test)]
pub(super) type QuotaSlot = IntervalQuotaSlot;
/// Forwarding layer between a `ContinuousQuotaSlot` and the `gcra` helpers.
///
/// Every method hands the slot's `tat_ns` atomic and the caller's `now_ns`
/// (unmodified) to the matching helper and returns that helper's result.
impl ContinuousQuotaSlot {
    /// Creates a slot whose `tat_ns` starts at `initial_tat_ns`.
    ///
    /// `_initial_available_tokens` is accepted but never read by this type.
    #[inline]
    pub(super) fn with_initial_tat(_initial_available_tokens: u64, initial_tat_ns: u64) -> Self {
        let tat_ns = AtomicU64::new(initial_tat_ns);
        Self { tat_ns }
    }

    /// Forwards to `consume_tat` with this slot's `tat_ns` and returns its
    /// `QuotaResult` as-is.
    #[inline]
    pub(super) fn consume_with_gcra(
        &self,
        now_ns: u64,
        cost: u64,
        ns_per_token: f64,
        token_ns: u64,
        burst_tokens: f64,
        burst_ns: u64,
        burst_after_one_ns: u64,
    ) -> QuotaResult {
        let Self { tat_ns } = self;
        consume_tat(
            tat_ns,
            now_ns,
            cost,
            ns_per_token,
            token_ns,
            burst_tokens,
            burst_ns,
            burst_after_one_ns,
        )
    }

    /// Forwards to `check_tat` with this slot's `tat_ns` and returns its
    /// verdict as-is.
    #[inline]
    pub(super) fn check_with_gcra(
        &self,
        now_ns: u64,
        cost: u64,
        ns_per_token: f64,
        token_ns: u64,
        burst_ns: u64,
        burst_after_one_ns: u64,
    ) -> bool {
        let Self { tat_ns } = self;
        check_tat(
            tat_ns,
            now_ns,
            cost,
            ns_per_token,
            token_ns,
            burst_ns,
            burst_after_one_ns,
        )
    }

    /// Forwards to `check_one_tat` with this slot's `tat_ns` and returns its
    /// verdict as-is.
    #[inline]
    pub(super) fn check_one_with_gcra(
        &self,
        now_ns: u64,
        token_ns: u64,
        burst_after_one_ns: u64,
    ) -> bool {
        let Self { tat_ns } = self;
        check_one_tat(tat_ns, now_ns, token_ns, burst_after_one_ns)
    }
}
/// Operations on an `IntervalQuotaSlot`.
///
/// The `*_with_gcra` methods call the `gcra` helpers with a timestamp chosen
/// by `gcra_clock_ns`: the tick from `effective_now` when refill is enabled,
/// otherwise `0`.
impl IntervalQuotaSlot {
    /// Test-only constructor: derives the starting `tat_ns` from `policy` and
    /// `initial_available_tokens` via the `gcra` helpers.
    #[inline]
    #[cfg(test)]
    pub(super) fn new(policy: &QuotaPolicy, initial_available_tokens: u64) -> Self {
        let rate_ns = policy.refill_rate_ns();
        let per_token_ns = ns_per_token(rate_ns);
        let burst = burst_tokens(policy, initial_available_tokens, rate_ns);
        let start_tat_ns = initial_tat_ns(burst, initial_available_tokens, per_token_ns);
        // In test builds `with_initial_tat` also records the initial token count.
        Self::with_initial_tat(initial_available_tokens, start_tat_ns)
    }

    /// Creates a slot whose `tat_ns` starts at `initial_tat_ns` and whose last
    /// tick starts at 0.
    ///
    /// `_initial_available_tokens` is stored only in test builds.
    #[inline]
    pub(super) fn with_initial_tat(_initial_available_tokens: u64, initial_tat_ns: u64) -> Self {
        let tat_ns = AtomicU64::new(initial_tat_ns);
        let last_tick_time_ns = AtomicU64::new(0);
        Self {
            #[cfg(test)]
            initial_available_tokens: _initial_available_tokens,
            tat_ns,
            last_tick_time_ns,
        }
    }

    /// Test-only convenience: derives every GCRA parameter from `policy` and
    /// the stored initial token count, then calls `consume_with_gcra`.
    ///
    /// Refill is treated as enabled exactly when `policy.refill_rate_ns()` is
    /// greater than `0.0`.
    #[inline]
    #[cfg(test)]
    pub(super) fn consume(&self, policy: &QuotaPolicy, now_ns: u64, cost: u64) -> QuotaResult {
        let rate_ns = policy.refill_rate_ns();
        let per_token_ns = ns_per_token(rate_ns);
        let one_token_ns = duration_ns(1.0, per_token_ns);
        let burst = burst_tokens(policy, self.initial_available_tokens, rate_ns);
        let full_burst_ns = duration_ns(burst, per_token_ns);
        let burst_minus_one_ns = full_burst_ns.saturating_sub(one_token_ns);
        let refill_enabled = rate_ns > 0.0;
        self.consume_with_gcra(
            now_ns,
            cost,
            refill_enabled,
            policy.refill_interval_ns(),
            per_token_ns,
            one_token_ns,
            burst,
            full_burst_ns,
            burst_minus_one_ns,
        )
    }

    /// Calls `consume_tat` on this slot's `tat_ns`, using the timestamp from
    /// `gcra_clock_ns` in place of `now_ns`, and returns its `QuotaResult`.
    #[inline]
    pub(super) fn consume_with_gcra(
        &self,
        now_ns: u64,
        cost: u64,
        refill_enabled: bool,
        refill_interval_ns: u64,
        ns_per_token: f64,
        token_ns: u64,
        burst_tokens: f64,
        burst_ns: u64,
        burst_after_one_ns: u64,
    ) -> QuotaResult {
        let clock_ns = self.gcra_clock_ns(now_ns, refill_enabled, refill_interval_ns);
        consume_tat(
            &self.tat_ns,
            clock_ns,
            cost,
            ns_per_token,
            token_ns,
            burst_tokens,
            burst_ns,
            burst_after_one_ns,
        )
    }

    /// Calls `check_tat` on this slot's `tat_ns`, using the timestamp from
    /// `gcra_clock_ns` in place of `now_ns`.
    ///
    /// A `cost` of zero returns `true` immediately, before the stored tick is
    /// read or advanced.
    #[inline]
    pub(super) fn check_with_gcra(
        &self,
        now_ns: u64,
        cost: u64,
        refill_enabled: bool,
        refill_interval_ns: u64,
        ns_per_token: f64,
        token_ns: u64,
        burst_ns: u64,
        burst_after_one_ns: u64,
    ) -> bool {
        match cost {
            0 => true,
            _ => {
                let clock_ns = self.gcra_clock_ns(now_ns, refill_enabled, refill_interval_ns);
                check_tat(
                    &self.tat_ns,
                    clock_ns,
                    cost,
                    ns_per_token,
                    token_ns,
                    burst_ns,
                    burst_after_one_ns,
                )
            }
        }
    }

    /// Calls `check_one_tat` on this slot's `tat_ns`, using the timestamp from
    /// `gcra_clock_ns` in place of `now_ns`.
    #[inline]
    pub(super) fn check_one_with_gcra(
        &self,
        now_ns: u64,
        refill_enabled: bool,
        refill_interval_ns: u64,
        token_ns: u64,
        burst_after_one_ns: u64,
    ) -> bool {
        let clock_ns = self.gcra_clock_ns(now_ns, refill_enabled, refill_interval_ns);
        check_one_tat(&self.tat_ns, clock_ns, token_ns, burst_after_one_ns)
    }

    /// Chooses the timestamp passed to the `gcra` helpers: `0` when refill is
    /// disabled, otherwise the result of `effective_now`.
    #[inline]
    fn gcra_clock_ns(&self, now_ns: u64, refill_enabled: bool, refill_interval_ns: u64) -> u64 {
        if !refill_enabled {
            return 0;
        }
        self.effective_now(refill_interval_ns, now_ns)
    }

    /// Maps `now_ns` onto the stored tick, advancing that tick when at least
    /// one full interval has elapsed since it.
    ///
    /// * `refill_interval_ns == 0`: returns `now_ns` unchanged and touches no
    ///   state.
    /// * Less than one interval since the stored tick (including `now_ns`
    ///   being behind it): returns the stored tick.
    /// * Otherwise: tries to move the stored tick forward by the whole number
    ///   of intervals elapsed and returns the new tick. If another writer
    ///   changed the tick first, the decision is re-evaluated against the
    ///   value that writer left.
    ///
    /// All atomic accesses use `Ordering::Relaxed`.
    #[inline]
    fn effective_now(&self, refill_interval_ns: u64, now_ns: u64) -> u64 {
        if refill_interval_ns == 0 {
            return now_ns;
        }
        let tick_cell = &self.last_tick_time_ns;
        let mut seen_tick_ns = tick_cell.load(Ordering::Relaxed);
        loop {
            let since_tick_ns = now_ns.saturating_sub(seen_tick_ns);
            if since_tick_ns < refill_interval_ns {
                break seen_tick_ns;
            }
            // Largest multiple of the interval that fits in the elapsed span.
            let whole_intervals_ns = since_tick_ns - since_tick_ns % refill_interval_ns;
            let advanced_tick_ns = seen_tick_ns.saturating_add(whole_intervals_ns);
            let swap = tick_cell.compare_exchange_weak(
                seen_tick_ns,
                advanced_tick_ns,
                Ordering::Relaxed,
                Ordering::Relaxed,
            );
            match swap {
                Ok(_) => break advanced_tick_ns,
                // Lost the race or failed spuriously: retry from the current value.
                Err(current_tick_ns) => seen_tick_ns = current_tick_ns,
            }
        }
    }
}