use parking_lot::{Mutex, RwLock};
use std::collections::{HashMap, VecDeque};
use std::fmt;
use std::panic::{AssertUnwindSafe, catch_unwind, resume_unwind};
use std::sync::Arc;
use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
use std::time::Duration;
use crate::types::Time;
/// Configuration for a rate limiter: how many tokens (`rate`) are granted
/// per `period`, how large a burst may be, and what to do when the limit
/// is hit.
#[derive(Clone)]
pub struct RateLimitPolicy {
    /// Human-readable identifier, used for registry lookup and Debug output.
    pub name: String,
    /// Number of tokens replenished per `period`.
    pub rate: u32,
    /// Length of the replenishment period.
    pub period: Duration,
    /// Maximum number of tokens the bucket can hold (bucket capacity).
    pub burst: u32,
    /// Behavior when a request cannot be satisfied immediately.
    pub wait_strategy: WaitStrategy,
    /// Token cost charged by the `*_default` convenience methods.
    pub default_cost: u32,
    /// Algorithm selector carried alongside the numeric parameters.
    pub algorithm: RateLimitAlgorithm,
}
/// What a limiter does with a request that cannot be granted immediately.
#[derive(Clone, Debug, Default)]
pub enum WaitStrategy {
    /// Queue the request and wait indefinitely for tokens.
    #[default]
    Block,
    /// Fail the request immediately.
    Reject,
    /// Queue the request, but give up after the given timeout.
    BlockWithTimeout(Duration),
}
/// Rate-limiting algorithm selector carried by a policy.
#[derive(Clone, Debug, Default)]
pub enum RateLimitAlgorithm {
    /// Classic token bucket.
    #[default]
    TokenBucket,
    /// Sliding window over individual timestamped events.
    SlidingWindowLog {
        /// Width of the sliding window.
        window_size: Duration,
    },
    /// Fixed, non-sliding window.
    FixedWindow,
}
impl fmt::Debug for RateLimitPolicy {
    /// Manual `Debug` that lists every policy field by name.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut dbg = f.debug_struct("RateLimitPolicy");
        dbg.field("name", &self.name);
        dbg.field("rate", &self.rate);
        dbg.field("period", &self.period);
        dbg.field("burst", &self.burst);
        dbg.field("wait_strategy", &self.wait_strategy);
        dbg.field("default_cost", &self.default_cost);
        dbg.field("algorithm", &self.algorithm);
        dbg.finish()
    }
}
impl Default for RateLimitPolicy {
    /// Baseline policy: 100 tokens per second, burst of 10, blocking wait
    /// strategy, unit default cost, token-bucket algorithm.
    fn default() -> Self {
        let one_second = Duration::from_secs(1);
        Self {
            name: String::from("default"),
            rate: 100,
            period: one_second,
            burst: 10,
            wait_strategy: WaitStrategy::default(),
            default_cost: 1,
            algorithm: RateLimitAlgorithm::default(),
        }
    }
}
impl RateLimitPolicy {
    /// Returns the policy with `rate` replaced (by-value builder style).
    #[must_use]
    pub const fn rate(mut self, rate: u32) -> Self {
        self.rate = rate;
        self
    }
    /// Returns the policy with `burst` replaced (by-value builder style).
    #[must_use]
    pub const fn burst(mut self, burst: u32) -> Self {
        self.burst = burst;
        self
    }
}
/// Point-in-time snapshot of a limiter's counters.
#[derive(Clone, Debug, Default)]
pub struct RateLimitMetrics {
    /// Tokens currently in the bucket.
    pub available_tokens: u32,
    /// Requests granted (immediately or via the wait queue).
    pub total_allowed: u64,
    /// Requests rejected outright.
    pub total_rejected: u64,
    /// Requests that were enqueued to wait.
    pub total_waited: u64,
    /// Sum of all recorded wait times.
    pub total_wait_time: Duration,
    /// `total_wait_time / total_waited` (zero when nothing has waited).
    pub avg_wait_time: Duration,
    /// Longest single recorded wait.
    pub max_wait_time: Duration,
    /// NOTE(review): currently always reported as `0` by `RateLimiter::metrics`.
    pub current_rate: u32,
    /// NOTE(review): currently always reported as `None` by `RateLimiter::metrics`.
    pub next_token_available: Option<Duration>,
}
/// Why a queued entry was resolved with an error.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum RejectionReason {
    /// The entry's deadline passed before tokens became available.
    Timeout,
}
// `None` = still pending; `Some(Ok(()))` = granted; `Some(Err(..))` = rejected.
type QueueEntryResult = Option<Result<(), RejectionReason>>;
// Returned by `enqueue` when the request was satisfied without queuing;
// this value is never handed out as a real queue-entry ID.
const IMMEDIATE_ACQUIRE_SENTINEL: u64 = u64::MAX;
/// A single waiter in the FIFO wait queue.
#[derive(Debug)]
struct QueueEntry {
    /// Unique handle returned to the caller by `enqueue`.
    id: u64,
    /// Tokens this waiter needs.
    cost: u32,
    /// Enqueue timestamp (ms), used for wait-time metrics.
    enqueued_at_millis: u64,
    /// Absolute deadline (ms); `u64::MAX` means "wait forever".
    deadline_millis: u64,
    /// Resolution state; see `QueueEntryResult`.
    result: QueueEntryResult,
}
/// Converts a `Duration` to whole milliseconds, saturating at `u64::MAX`.
#[inline]
fn duration_to_millis_saturating(duration: Duration) -> u64 {
    // `as_millis` is u128; clamp anything that does not fit into u64.
    u64::try_from(duration.as_millis()).unwrap_or(u64::MAX)
}
/// Mutable token-bucket state, guarded by `RateLimiter::state`.
struct BucketState {
    /// Whole tokens currently available.
    tokens: u32,
    /// Sub-token refill progress in fixed-point units: each elapsed ms adds
    /// `rate` units, and `period_ms` units make one whole token.
    fractional: u64,
    /// Timestamp (ms) of the last refill computation.
    last_refill: u64,
}
/// Token-bucket rate limiter with an optional FIFO wait queue.
///
/// Time is supplied explicitly by the caller as [`Time`] rather than read
/// from a clock, which keeps the limiter deterministic and testable.
pub struct RateLimiter {
    policy: RateLimitPolicy,
    /// Bucket contents; a single mutex keeps refill + consume atomic.
    state: Mutex<BucketState>,
    /// FIFO waiters; entries keep their slot until collected or cancelled.
    wait_queue: RwLock<VecDeque<QueueEntry>>,
    /// Number of queue entries whose result is still `None`.
    pending_queue_count: AtomicU32,
    /// Monotonic source for queue-entry IDs.
    next_id: AtomicU64,
    // Counters feeding `metrics()`.
    total_allowed: AtomicU64,
    total_rejected: AtomicU64,
    total_waited: AtomicU64,
    total_wait_time_ms: AtomicU64,
    max_wait_time_ms: AtomicU64,
}
/// Drop guard that refunds `cost` tokens unless disarmed; used by
/// `call_weighted` so a panicking operation returns its tokens.
struct TokenRefundGuard<'a> {
    limiter: &'a RateLimiter,
    cost: u32,
    /// Set to `false` once the operation completed (tokens are then spent).
    refund: bool,
}
impl Drop for TokenRefundGuard<'_> {
    /// Returns the reserved tokens to the limiter if the guard is still armed.
    fn drop(&mut self) {
        if !self.refund {
            return;
        }
        self.limiter.refund_tokens(self.cost);
    }
}
impl RateLimiter {
#[must_use]
#[allow(clippy::cast_precision_loss)]
pub fn new(policy: RateLimitPolicy) -> Self {
let burst = policy.burst;
Self {
policy,
state: Mutex::new(BucketState {
tokens: burst,
fractional: 0,
last_refill: 0,
}),
wait_queue: RwLock::new(VecDeque::with_capacity(16)),
pending_queue_count: AtomicU32::new(0),
next_id: AtomicU64::new(0),
total_allowed: AtomicU64::new(0),
total_rejected: AtomicU64::new(0),
total_waited: AtomicU64::new(0),
total_wait_time_ms: AtomicU64::new(0),
max_wait_time_ms: AtomicU64::new(0),
}
}
    /// The policy's name.
    #[must_use]
    pub fn name(&self) -> &str {
        &self.policy.name
    }
    /// The policy this limiter was built with.
    #[must_use]
    pub fn policy(&self) -> &RateLimitPolicy {
        &self.policy
    }
    /// Snapshot of the limiter's counters.
    ///
    /// NOTE(review): `current_rate` and `next_token_available` are not
    /// computed here — they are always `0` / `None`.
    #[must_use]
    #[allow(clippy::cast_precision_loss)]
    pub fn metrics(&self) -> RateLimitMetrics {
        // Read the token count under the lock, then release it before
        // touching the atomic counters.
        let available_tokens = {
            let state = self.state.lock();
            state.tokens
        };
        let total_waited = self.total_waited.load(Ordering::Relaxed);
        let total_wait_time_ms = self.total_wait_time_ms.load(Ordering::Relaxed);
        let max_wait_time_ms = self.max_wait_time_ms.load(Ordering::Relaxed);
        // checked_div yields None when nothing has waited (divide by zero).
        let avg_wait_time = total_wait_time_ms
            .checked_div(total_waited)
            .map_or(Duration::ZERO, Duration::from_millis);
        RateLimitMetrics {
            available_tokens,
            total_allowed: self.total_allowed.load(Ordering::Relaxed),
            total_rejected: self.total_rejected.load(Ordering::Relaxed),
            total_waited,
            total_wait_time: Duration::from_millis(total_wait_time_ms),
            avg_wait_time,
            max_wait_time: Duration::from_millis(max_wait_time_ms),
            current_rate: 0,
            next_token_available: None,
        }
    }
    /// Advances the bucket to `now_millis`, adding accrued tokens.
    ///
    /// Refill is fixed-point: `fractional` accumulates `elapsed_ms * rate`,
    /// and every `period_ms` units make one whole token. Token count
    /// saturates at `burst`.
    #[allow(clippy::cast_precision_loss, clippy::cast_sign_loss)]
    fn refill_inner(&self, state: &mut BucketState, now_millis: u64) {
        // Ignore stale or repeated timestamps; time never flows backwards here.
        if now_millis <= state.last_refill {
            return;
        }
        let elapsed_ms = now_millis - state.last_refill;
        let period_ms = duration_to_millis_saturating(self.policy.period);
        if period_ms > 0 && self.policy.rate > 0 {
            // u128 arithmetic so elapsed_ms * rate cannot overflow.
            let added_fractional = u128::from(elapsed_ms) * u128::from(self.policy.rate);
            let total_fractional = u128::from(state.fractional) + added_fractional;
            let new_tokens =
                (total_fractional / u128::from(period_ms)).min(u128::from(u64::MAX)) as u64;
            let new_fractional = (total_fractional % u128::from(period_ms)) as u64;
            state.tokens =
                (u64::from(state.tokens) + new_tokens).min(u64::from(self.policy.burst)) as u32;
            state.fractional = new_fractional;
            // A saturated bucket clears the remainder: no banked partial
            // progress once the bucket is full.
            if state.tokens == self.policy.burst {
                state.fractional = 0;
            }
        }
        state.last_refill = now_millis;
    }
    /// Refills the bucket up to time `now`.
    pub fn refill(&self, now: Time) {
        let mut state = self.state.lock();
        self.refill_inner(&mut state, now.as_millis());
    }
    /// Attempts to take `cost` tokens at time `now`; `true` on success.
    ///
    /// If waiters are queued, the queue is processed first and the direct
    /// acquire is refused while anything is still pending, so a new request
    /// cannot jump ahead of earlier waiters.
    #[must_use]
    #[allow(clippy::cast_precision_loss)]
    pub fn try_acquire(&self, cost: u32, now: Time) -> bool {
        if self.pending_queue_count.load(Ordering::Relaxed) > 0 {
            let _ = self.process_queue(now);
            if self.pending_queue_count.load(Ordering::Relaxed) > 0 {
                return false;
            }
        }
        let mut state = self.state.lock();
        let now_millis = now.as_millis();
        self.refill_inner(&mut state, now_millis);
        if state.tokens >= cost {
            state.tokens -= cost;
            drop(state);
            self.total_allowed.fetch_add(1, Ordering::Relaxed);
            true
        } else {
            false
        }
    }
    /// Allocates a queue-entry ID that is unique among live entries and is
    /// never `IMMEDIATE_ACQUIRE_SENTINEL`.
    ///
    /// The counter saturates: once it reaches the sentinel the allocator
    /// fails closed with `QueueIdExhausted` instead of wrapping, so a stale
    /// external handle can never alias a newer waiter.
    fn allocate_entry_id(&self, queue: &VecDeque<QueueEntry>) -> Result<u64, RateLimitError<()>> {
        let mut current = self.next_id.load(Ordering::Relaxed);
        loop {
            // Skip over IDs still present in the queue.
            let mut candidate = current;
            while candidate != IMMEDIATE_ACQUIRE_SENTINEL
                && queue.iter().any(|entry| entry.id == candidate)
            {
                candidate = candidate.saturating_add(1);
            }
            if candidate == IMMEDIATE_ACQUIRE_SENTINEL {
                return Err(RateLimitError::QueueIdExhausted);
            }
            let next = candidate.saturating_add(1);
            // Publish the advanced counter; on contention retry with the
            // freshly observed value.
            match self.next_id.compare_exchange_weak(
                current,
                next,
                Ordering::Relaxed,
                Ordering::Relaxed,
            ) {
                Ok(_) => return Ok(candidate),
                Err(observed) => current = observed,
            }
        }
    }
fn refund_tokens(&self, cost: u32) {
if cost == 0 {
return;
}
let mut state = self.state.lock();
state.tokens = state.tokens.saturating_add(cost).min(self.policy.burst);
}
    /// `try_acquire` using the policy's default cost.
    #[must_use]
    pub fn try_acquire_default(&self, now: Time) -> bool {
        self.try_acquire(self.policy.default_cost, now)
    }
    /// Runs `op` under the default cost; see [`Self::call_weighted`].
    pub fn call<T, E, F>(&self, now: Time, op: F) -> Result<T, RateLimitError<E>>
    where
        F: FnOnce() -> Result<T, E>,
    {
        self.call_weighted(now, self.policy.default_cost, op)
    }
    /// Acquires `cost` tokens, runs `op`, and refunds the tokens if `op`
    /// panics (the panic is then resumed).
    ///
    /// # Errors
    /// `RateLimitExceeded` if the tokens cannot be acquired; `Inner` wraps
    /// an error returned by `op` itself — tokens stay consumed in that case
    /// because the operation was attempted.
    pub fn call_weighted<T, E, F>(
        &self,
        now: Time,
        cost: u32,
        op: F,
    ) -> Result<T, RateLimitError<E>>
    where
        F: FnOnce() -> Result<T, E>,
    {
        if !self.try_acquire(cost, now) {
            self.total_rejected.fetch_add(1, Ordering::Relaxed);
            return Err(RateLimitError::RateLimitExceeded);
        }
        // Armed guard refunds on unwind; disarmed on normal completion.
        let mut refund_guard = TokenRefundGuard {
            limiter: self,
            cost,
            refund: true,
        };
        match catch_unwind(AssertUnwindSafe(op)) {
            Ok(Ok(v)) => {
                refund_guard.refund = false;
                Ok(v)
            }
            Ok(Err(e)) => {
                refund_guard.refund = false;
                Err(RateLimitError::Inner(e))
            }
            // Guard is still armed here, so Drop refunds before resuming.
            Err(panic) => resume_unwind(panic),
        }
    }
    /// How long until `cost` tokens could be available, starting from `now`.
    ///
    /// Returns `Duration::MAX` when the request can never be satisfied
    /// (`cost > burst`, or the policy refills nothing).
    #[must_use]
    #[allow(
        clippy::cast_precision_loss,
        clippy::cast_possible_truncation,
        clippy::cast_sign_loss
    )]
    pub fn time_until_available(&self, cost: u32, now: Time) -> Duration {
        if cost > self.policy.burst {
            return Duration::MAX;
        }
        // Refill first so the estimate reflects time already elapsed.
        let (current_tokens, current_fractional) = {
            let mut state = self.state.lock();
            self.refill_inner(&mut state, now.as_millis());
            (state.tokens, state.fractional)
        };
        if current_tokens >= cost {
            return Duration::ZERO;
        }
        if self.policy.rate == 0 || self.policy.period.as_millis() == 0 {
            return Duration::MAX;
        }
        let period_ms = duration_to_millis_saturating(self.policy.period);
        let tokens_needed = cost - current_tokens;
        // Same fixed-point units as refill_inner: one token = `period_ms`
        // fractional units, and each elapsed ms contributes `rate` units.
        let fractional_needed = u128::from(tokens_needed) * u128::from(period_ms);
        let additional_fractional =
            fractional_needed.saturating_sub(u128::from(current_fractional));
        let rate = u128::from(self.policy.rate);
        // div_ceil rounds up so a partial millisecond still counts as one.
        let ms_needed = additional_fractional
            .div_ceil(rate)
            .min(u128::from(u64::MAX)) as u64;
        Duration::from_millis(ms_needed)
    }
    /// Alias for [`Self::time_until_available`].
    #[must_use]
    pub fn retry_after(&self, cost: u32, now: Time) -> Duration {
        self.time_until_available(cost, now)
    }
    /// `retry_after` using the policy's default cost.
    #[must_use]
    pub fn retry_after_default(&self, now: Time) -> Duration {
        self.retry_after(self.policy.default_cost, now)
    }
#[must_use]
#[allow(clippy::cast_precision_loss)]
pub fn available_tokens(&self) -> u32 {
let state = self.state.lock();
state.tokens
}
    /// Tries to acquire `cost` tokens; on failure, enqueues a waiter.
    ///
    /// Returns `IMMEDIATE_ACQUIRE_SENTINEL` (`u64::MAX`) if the tokens were
    /// taken without queuing, otherwise a queue-entry ID to poll with
    /// [`Self::check_entry`].
    ///
    /// # Errors
    /// `RateLimitExceeded` when `cost > burst` (can never be granted) or
    /// the policy's strategy is `Reject`; `QueueIdExhausted` when the ID
    /// space has run out.
    #[allow(clippy::cast_precision_loss, clippy::significant_drop_tightening)]
    pub fn enqueue(&self, cost: u32, now: Time) -> Result<u64, RateLimitError<()>> {
        if self.try_acquire(cost, now) {
            return Ok(IMMEDIATE_ACQUIRE_SENTINEL);
        }
        // Larger than the bucket can ever hold: reject now, don't queue.
        if cost > self.policy.burst {
            self.total_rejected.fetch_add(1, Ordering::Relaxed);
            return Err(RateLimitError::RateLimitExceeded);
        }
        match &self.policy.wait_strategy {
            WaitStrategy::Reject => {
                self.total_rejected.fetch_add(1, Ordering::Relaxed);
                return Err(RateLimitError::RateLimitExceeded);
            }
            WaitStrategy::Block | WaitStrategy::BlockWithTimeout(_) => {
            }
        }
        let now_millis = now.as_millis();
        // A `u64::MAX` deadline means "wait forever".
        let deadline_millis = match &self.policy.wait_strategy {
            WaitStrategy::BlockWithTimeout(timeout) => {
                let timeout_millis = duration_to_millis_saturating(*timeout);
                now_millis.saturating_add(timeout_millis)
            }
            _ => u64::MAX,
        };
        let mut queue = self.wait_queue.write();
        let entry_id = self.allocate_entry_id(&queue)?;
        queue.push_back(QueueEntry {
            id: entry_id,
            cost,
            enqueued_at_millis: now_millis,
            deadline_millis,
            result: None,
        });
        self.pending_queue_count.fetch_add(1, Ordering::Relaxed);
        self.total_waited.fetch_add(1, Ordering::Relaxed);
        Ok(entry_id)
    }
#[allow(clippy::cast_precision_loss, clippy::significant_drop_tightening)]
pub fn process_queue(&self, now: Time) -> Option<u64> {
let now_millis = now.as_millis();
let mut queue = self.wait_queue.write();
let mut state = self.state.lock();
self.refill_inner(&mut state, now_millis);
let mut timeout_count = 0u64;
let mut timeout_wait_ms = 0u64;
let mut max_timeout_wait_ms = 0u64;
let mut fifo_blocked = false;
let mut first_granted = None;
let mut granted_count = 0u64;
let mut acc_wait_time = Duration::ZERO;
let mut max_wait_time = Duration::ZERO;
for entry in queue.iter_mut() {
if entry.result.is_some() {
continue;
}
match now_millis.cmp(&entry.deadline_millis) {
std::cmp::Ordering::Greater => {
entry.result = Some(Err(RejectionReason::Timeout));
timeout_count += 1;
let wait = now_millis.saturating_sub(entry.enqueued_at_millis);
timeout_wait_ms = timeout_wait_ms.saturating_add(wait);
if wait > max_timeout_wait_ms {
max_timeout_wait_ms = wait;
}
}
std::cmp::Ordering::Equal => {
if !fifo_blocked && state.tokens >= entry.cost {
state.tokens -= entry.cost;
entry.result = Some(Ok(()));
self.pending_queue_count.fetch_sub(1, Ordering::Relaxed);
if first_granted.is_none() {
first_granted = Some(entry.id);
}
let wait_ms = now_millis.saturating_sub(entry.enqueued_at_millis);
let wait_duration = Duration::from_millis(wait_ms);
acc_wait_time += wait_duration;
if wait_duration > max_wait_time {
max_wait_time = wait_duration;
}
granted_count += 1;
} else {
entry.result = Some(Err(RejectionReason::Timeout));
timeout_count += 1;
let wait = now_millis.saturating_sub(entry.enqueued_at_millis);
timeout_wait_ms = timeout_wait_ms.saturating_add(wait);
if wait > max_timeout_wait_ms {
max_timeout_wait_ms = wait;
}
}
}
std::cmp::Ordering::Less => {
if !fifo_blocked && state.tokens >= entry.cost {
state.tokens -= entry.cost;
entry.result = Some(Ok(()));
self.pending_queue_count.fetch_sub(1, Ordering::Relaxed);
if first_granted.is_none() {
first_granted = Some(entry.id);
}
let wait_ms = now_millis.saturating_sub(entry.enqueued_at_millis);
let wait_duration = Duration::from_millis(wait_ms);
acc_wait_time += wait_duration;
if wait_duration > max_wait_time {
max_wait_time = wait_duration;
}
granted_count += 1;
} else {
fifo_blocked = true;
}
}
}
}
if timeout_count > 0 {
#[allow(clippy::cast_possible_truncation)]
self.pending_queue_count
.fetch_sub(timeout_count as u32, Ordering::Relaxed);
self.total_wait_time_ms
.fetch_add(timeout_wait_ms, Ordering::Relaxed);
self.max_wait_time_ms
.fetch_max(max_timeout_wait_ms, Ordering::Relaxed);
}
if granted_count > 0 {
self.total_allowed
.fetch_add(granted_count, Ordering::Relaxed);
let wait_ms = duration_to_millis_saturating(acc_wait_time);
self.total_wait_time_ms
.fetch_add(wait_ms, Ordering::Relaxed);
let new_max_ms = duration_to_millis_saturating(max_wait_time);
self.max_wait_time_ms
.fetch_max(new_max_ms, Ordering::Relaxed);
}
first_granted
}
    /// Polls a queue entry: `Ok(true)` once granted, `Ok(false)` while
    /// still pending. A resolved entry is removed from the queue when its
    /// result is collected here.
    ///
    /// Runs a queue-processing pass first so results are up to date.
    ///
    /// # Errors
    /// `Timeout` if the entry expired; `Cancelled` if the ID is unknown
    /// (cancelled or already collected).
    #[allow(clippy::option_if_let_else, clippy::significant_drop_tightening)]
    pub fn check_entry(&self, entry_id: u64, now: Time) -> Result<bool, RateLimitError<()>> {
        // Sentinel IDs were granted immediately at enqueue time.
        if entry_id == IMMEDIATE_ACQUIRE_SENTINEL {
            return Ok(true);
        }
        let _ = self.process_queue(now);
        let mut queue = self.wait_queue.write();
        let entry_idx = queue.iter().position(|e| e.id == entry_id);
        if let Some(idx) = entry_idx {
            match queue[idx].result {
                Some(Ok(())) => {
                    queue.remove(idx);
                    Ok(true)
                }
                Some(Err(RejectionReason::Timeout)) => {
                    let entry = queue.remove(idx).expect("must exist");
                    let wait_ms = now.as_millis().saturating_sub(entry.enqueued_at_millis);
                    Err(RateLimitError::Timeout {
                        waited: Duration::from_millis(wait_ms),
                    })
                }
                None => Ok(false),
            }
        } else {
            Err(RateLimitError::Cancelled)
        }
    }
    /// Removes a queue entry, undoing its effects where needed:
    ///
    /// * granted but never collected — tokens are refunded and the queue is
    ///   re-processed so a later waiter can claim them;
    /// * still pending — pending count and wait-time metrics are updated;
    /// * already timed out — nothing more to undo (accounted at timeout).
    ///
    /// Unknown IDs (including the immediate-acquire sentinel) are ignored.
    pub fn cancel_entry(&self, entry_id: u64, now: Time) {
        if entry_id == IMMEDIATE_ACQUIRE_SENTINEL {
            return;
        }
        let mut queue = self.wait_queue.write();
        if let Some(idx) = queue.iter().position(|e| e.id == entry_id) {
            let entry = queue.remove(idx).expect("must exist");
            let previous_result = entry.result;
            let cost = entry.cost;
            let enqueued_at_millis = entry.enqueued_at_millis;
            // Release the queue lock before touching bucket state.
            drop(queue);
            if previous_result == Some(Ok(())) {
                let mut state = self.state.lock();
                state.tokens = state.tokens.saturating_add(cost).min(self.policy.burst);
                drop(state);
                let _ = self.process_queue(now);
            } else if previous_result.is_none() {
                self.pending_queue_count.fetch_sub(1, Ordering::Relaxed);
                let wait_ms = now.as_millis().saturating_sub(enqueued_at_millis);
                self.total_wait_time_ms
                    .fetch_add(wait_ms, Ordering::Relaxed);
                self.max_wait_time_ms.fetch_max(wait_ms, Ordering::Relaxed);
            }
        }
    }
pub fn reset(&self) {
let initial_tokens = self.policy.burst;
{
let mut state = self.state.lock();
state.tokens = initial_tokens;
state.fractional = 0;
state.last_refill = 0;
}
self.wait_queue.write().clear();
self.pending_queue_count.store(0, Ordering::Relaxed);
}
}
impl fmt::Debug for RateLimiter {
    /// Compact debug view: policy name, live token count, rate and burst.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut dbg = f.debug_struct("RateLimiter");
        dbg.field("name", &self.policy.name);
        dbg.field("available_tokens", &self.available_tokens());
        dbg.field("rate", &self.policy.rate);
        dbg.field("burst", &self.policy.burst);
        dbg.finish_non_exhaustive()
    }
}
/// Sliding-window-log limiter: remembers each grant's timestamp and cost,
/// and admits a request only while the total cost inside the trailing
/// `policy.period` stays within `policy.rate`.
pub struct SlidingWindowRateLimiter {
    policy: RateLimitPolicy,
    /// `(timestamp_ms, cost)` log of grants, oldest first.
    window: RwLock<VecDeque<(u64, u32)>>,
    /// Cached sum of all costs currently in `window`.
    window_cost: AtomicU32,
    total_allowed: AtomicU64,
    total_rejected: AtomicU64,
}
impl SlidingWindowRateLimiter {
#[must_use]
pub fn new(policy: RateLimitPolicy) -> Self {
let window_capacity = usize::try_from(policy.rate.max(policy.burst))
.unwrap_or(usize::MAX)
.max(1);
Self {
policy,
window: RwLock::new(VecDeque::with_capacity(window_capacity)),
window_cost: AtomicU32::new(0),
total_allowed: AtomicU64::new(0),
total_rejected: AtomicU64::new(0),
}
}
    /// The policy's name.
    #[must_use]
    pub fn name(&self) -> &str {
        &self.policy.name
    }
    /// Attempts to admit a request of `cost` at time `now`.
    ///
    /// Evicts log entries older than one period, then admits iff the
    /// remaining windowed cost plus `cost` fits within `policy.rate`.
    /// Zero-cost requests that are admitted are not logged.
    #[must_use]
    #[allow(clippy::significant_drop_tightening, clippy::cast_possible_truncation)]
    pub fn try_acquire(&self, cost: u32, now: Time) -> bool {
        let now_millis = now.as_millis();
        let period_millis = duration_to_millis_saturating(self.policy.period);
        let mut window = self.window.write();
        // Evict entries that have aged out of the trailing window.
        // NOTE(review): with a zero period nothing is ever evicted.
        while let Some((t, c)) = window.front() {
            if period_millis > 0 && now_millis.saturating_sub(*t) >= period_millis {
                let evicted_cost = *c;
                window.pop_front();
                self.window_cost.fetch_sub(evicted_cost, Ordering::Relaxed);
            } else {
                break;
            }
        }
        let usage = self.window_cost.load(Ordering::Relaxed);
        if usage.saturating_add(cost) <= self.policy.rate {
            if cost > 0 {
                window.push_back((now_millis, cost));
                self.window_cost.fetch_add(cost, Ordering::Relaxed);
            }
            drop(window);
            self.total_allowed.fetch_add(1, Ordering::Relaxed);
            true
        } else {
            drop(window);
            self.total_rejected.fetch_add(1, Ordering::Relaxed);
            false
        }
    }
    /// `try_acquire` using the policy's default cost.
    #[must_use]
    pub fn try_acquire_default(&self, now: Time) -> bool {
        self.try_acquire(self.policy.default_cost, now)
    }
    /// How long until a request of `cost` could be admitted, from `now`.
    ///
    /// Walks the (oldest-first) log to find when enough windowed cost will
    /// have expired. Returns `Duration::MAX` when it never can (zero period,
    /// or more cost needed than the whole log could free).
    #[must_use]
    #[allow(clippy::cast_possible_truncation, clippy::significant_drop_tightening)]
    pub fn time_until_available(&self, cost: u32, now: Time) -> Duration {
        let now_millis = now.as_millis();
        let period_millis = duration_to_millis_saturating(self.policy.period);
        let mut window = self.window.write();
        // Same eviction pass as `try_acquire` so the estimate is current.
        while let Some((t, c)) = window.front() {
            if period_millis > 0 && now_millis.saturating_sub(*t) >= period_millis {
                let evicted_cost = *c;
                window.pop_front();
                self.window_cost.fetch_sub(evicted_cost, Ordering::Relaxed);
            } else {
                break;
            }
        }
        let usage = self.window_cost.load(Ordering::Relaxed);
        if usage.saturating_add(cost) <= self.policy.rate {
            return Duration::ZERO;
        }
        if period_millis == 0 {
            return Duration::MAX;
        }
        // Windowed cost that must expire before this request fits.
        let needed = usage.saturating_add(cost).saturating_sub(self.policy.rate);
        let mut freed = 0u32;
        for (t, c) in window.iter() {
            freed += c;
            if freed >= needed {
                let expire_at = t.saturating_add(period_millis);
                return Duration::from_millis(expire_at.saturating_sub(now_millis));
            }
        }
        Duration::MAX
    }
    /// Alias for [`Self::time_until_available`].
    #[must_use]
    pub fn retry_after(&self, cost: u32, now: Time) -> Duration {
        self.time_until_available(cost, now)
    }
    /// Snapshot of this limiter's counters.
    ///
    /// Only `total_allowed` / `total_rejected` are tracked here; the other
    /// fields keep their default values.
    #[must_use]
    pub fn metrics(&self) -> RateLimitMetrics {
        RateLimitMetrics {
            total_allowed: self.total_allowed.load(Ordering::Relaxed),
            total_rejected: self.total_rejected.load(Ordering::Relaxed),
            ..RateLimitMetrics::default()
        }
    }
    /// Clears the event log and its cached cost sum.
    pub fn reset(&self) {
        let mut window = self.window.write();
        window.clear();
        // The cost counter is zeroed while the write lock is still held;
        // the guard is dropped explicitly afterwards.
        self.window_cost.store(0, Ordering::Relaxed);
        drop(window);
    }
}
impl fmt::Debug for SlidingWindowRateLimiter {
    /// Compact debug view: name plus the policy's rate and period.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut dbg = f.debug_struct("SlidingWindowRateLimiter");
        dbg.field("name", &self.policy.name);
        dbg.field("rate", &self.policy.rate);
        dbg.field("period", &self.policy.period);
        dbg.finish_non_exhaustive()
    }
}
/// Errors produced by the rate limiters; `Inner` carries an error from a
/// caller-supplied operation run via `call`/`call_weighted`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RateLimitError<E> {
    /// No tokens available and the request was not (or could not be) queued.
    RateLimitExceeded,
    /// The queue-ID counter reached its end; see `allocate_entry_id`.
    QueueIdExhausted,
    /// A queued request expired before tokens became available.
    Timeout {
        /// How long the request waited before expiring.
        waited: Duration,
    },
    /// The queue entry is unknown (cancelled or already collected).
    Cancelled,
    /// Error returned by the wrapped operation itself.
    Inner(E),
}
impl<E: fmt::Display> fmt::Display for RateLimitError<E> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::RateLimitExceeded => write!(f, "rate limit exceeded"),
            Self::QueueIdExhausted => write!(f, "rate limit queue ID space exhausted"),
            Self::Timeout { waited } => write!(f, "rate limit timeout after {waited:?}"),
            Self::Cancelled => write!(f, "cancelled while waiting for rate limit"),
            Self::Inner(e) => write!(f, "{e}"),
        }
    }
}
// Marker impl: Debug + Display are all `Error` requires (no source()).
impl<E: fmt::Debug + fmt::Display> std::error::Error for RateLimitError<E> {}
/// Fluent builder for [`RateLimitPolicy`], starting from the policy defaults.
#[derive(Default)]
pub struct RateLimitPolicyBuilder {
    policy: RateLimitPolicy,
}
impl RateLimitPolicyBuilder {
    /// Starts a builder seeded with the default policy values.
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }
    /// Sets the policy name.
    #[must_use]
    pub fn name(mut self, name: impl Into<String>) -> Self {
        self.policy.name = name.into();
        self
    }
    /// Sets tokens granted per period.
    #[must_use]
    pub const fn rate(mut self, rate: u32) -> Self {
        self.policy.rate = rate;
        self
    }
    /// Sets the replenishment period.
    #[must_use]
    pub const fn period(mut self, period: Duration) -> Self {
        self.policy.period = period;
        self
    }
    /// Sets the bucket capacity.
    #[must_use]
    pub const fn burst(mut self, burst: u32) -> Self {
        self.policy.burst = burst;
        self
    }
    /// Sets the behavior for requests that cannot be served immediately.
    #[must_use]
    pub fn wait_strategy(mut self, strategy: WaitStrategy) -> Self {
        self.policy.wait_strategy = strategy;
        self
    }
    /// Sets the cost used by the `*_default` convenience methods.
    #[must_use]
    pub const fn default_cost(mut self, cost: u32) -> Self {
        self.policy.default_cost = cost;
        self
    }
    /// Sets the algorithm selector.
    #[must_use]
    pub fn algorithm(mut self, algorithm: RateLimitAlgorithm) -> Self {
        self.policy.algorithm = algorithm;
        self
    }
    /// Consumes the builder and returns the finished policy.
    #[must_use]
    pub fn build(self) -> RateLimitPolicy {
        self.policy
    }
}
/// Keyed collection of shared [`RateLimiter`]s, created on demand from a
/// default policy.
pub struct RateLimiterRegistry {
    limiters: RwLock<HashMap<String, Arc<RateLimiter>>>,
    /// Template cloned (with the name substituted) for new limiters.
    default_policy: RateLimitPolicy,
}
impl RateLimiterRegistry {
    /// Creates an empty registry using `default_policy` for new entries.
    #[must_use]
    pub fn new(default_policy: RateLimitPolicy) -> Self {
        Self {
            limiters: RwLock::new(HashMap::with_capacity(8)),
            default_policy,
        }
    }
    /// Returns the limiter for `name`, creating it from the default policy
    /// (with `name` substituted) if absent.
    ///
    /// Fast path takes only the read lock; the write path re-checks via
    /// `entry` so concurrent creators cannot double-insert.
    pub fn get_or_create(&self, name: &str) -> Arc<RateLimiter> {
        {
            let limiters = self.limiters.read();
            if let Some(l) = limiters.get(name) {
                return l.clone();
            }
        }
        let mut limiters = self.limiters.write();
        limiters
            .entry(name.to_string())
            .or_insert_with(|| {
                Arc::new(RateLimiter::new(RateLimitPolicy {
                    name: name.to_string(),
                    ..self.default_policy.clone()
                }))
            })
            .clone()
    }
    /// Returns the limiter for `name`, creating it from `policy` if absent.
    /// An existing limiter wins; `policy` is dropped in that case.
    pub fn get_or_create_with(&self, name: &str, policy: RateLimitPolicy) -> Arc<RateLimiter> {
        let mut limiters = self.limiters.write();
        limiters
            .entry(name.to_string())
            .or_insert_with(|| Arc::new(RateLimiter::new(policy)))
            .clone()
    }
    /// Metrics snapshot for every registered limiter, keyed by name.
    #[must_use]
    pub fn all_metrics(&self) -> HashMap<String, RateLimitMetrics> {
        let limiters = self.limiters.read();
        limiters
            .iter()
            .map(|(name, l)| (name.clone(), l.metrics()))
            .collect()
    }
    /// Removes and returns the limiter for `name`, if present.
    pub fn remove(&self, name: &str) -> Option<Arc<RateLimiter>> {
        let mut limiters = self.limiters.write();
        limiters.remove(name)
    }
}
impl fmt::Debug for RateLimiterRegistry {
    /// Debug view exposing only the number of registered limiters.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let limiters = self.limiters.read();
        let mut dbg = f.debug_struct("RateLimiterRegistry");
        dbg.field("count", &limiters.len());
        dbg.finish_non_exhaustive()
    }
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn new_limiter_has_burst_tokens() {
let rl = RateLimiter::new(RateLimitPolicy {
rate: 10,
burst: 5,
..Default::default()
});
let tokens = rl.available_tokens();
assert_eq!(tokens, 5);
}
#[test]
fn acquire_reduces_tokens() {
let rl = RateLimiter::new(RateLimitPolicy {
rate: 10,
burst: 10,
..Default::default()
});
let now = Time::from_millis(0);
assert!(rl.try_acquire(3, now));
let tokens = rl.available_tokens();
assert_eq!(tokens, 7);
}
#[test]
fn acquire_fails_when_insufficient_tokens() {
let rl = RateLimiter::new(RateLimitPolicy {
rate: 10,
burst: 5,
..Default::default()
});
let now = Time::from_millis(0);
assert!(rl.try_acquire(5, now));
assert!(!rl.try_acquire(1, now));
}
#[test]
fn tokens_refill_over_time() {
let rl = RateLimiter::new(RateLimitPolicy {
rate: 10, period: Duration::from_secs(1),
burst: 10,
..Default::default()
});
let now = Time::from_millis(0);
assert!(rl.try_acquire(10, now));
assert!(!rl.try_acquire(1, now));
let later = Time::from_millis(100);
rl.refill(later);
let tokens = rl.available_tokens();
assert_eq!(
tokens, 1,
"Expected 1 token after 100ms refill, got {tokens}"
);
}
#[test]
fn tokens_cap_at_burst() {
let rl = RateLimiter::new(RateLimitPolicy {
rate: 100,
period: Duration::from_secs(1),
burst: 10,
..Default::default()
});
let now = Time::from_millis(0);
rl.refill(now);
let later = Time::from_millis(10_000);
rl.refill(later);
assert_eq!(rl.available_tokens(), 10);
}
#[test]
fn zero_cost_always_succeeds() {
let rl = RateLimiter::new(RateLimitPolicy {
rate: 10,
burst: 0, ..Default::default()
});
let now = Time::from_millis(0);
assert!(rl.try_acquire(0, now));
}
#[test]
fn reset_clears_pending_queue_state() {
let rl = RateLimiter::new(RateLimitPolicy {
rate: 1,
burst: 1,
wait_strategy: WaitStrategy::Block,
..Default::default()
});
let now = Time::from_millis(0);
assert!(rl.try_acquire(1, now), "first token should be available");
let entry_id = rl.enqueue(1, now).expect("second request should enqueue");
assert_ne!(entry_id, u64::MAX, "enqueued entries use real IDs");
assert_eq!(
rl.pending_queue_count
.load(std::sync::atomic::Ordering::Relaxed),
1
);
assert_eq!(rl.wait_queue.read().len(), 1);
rl.reset();
assert_eq!(
rl.pending_queue_count
.load(std::sync::atomic::Ordering::Relaxed),
0,
"reset must clear pending queue count"
);
assert!(
rl.wait_queue.read().is_empty(),
"reset must clear wait queue"
);
assert!(
rl.available_tokens() == 1,
"reset must restore full burst capacity"
);
}
#[test]
fn reset_clears_fractional_accumulator() {
let rl = RateLimiter::new(RateLimitPolicy {
rate: 1,
period: Duration::from_secs(10),
burst: 10,
..Default::default()
});
let t0 = Time::from_millis(0);
assert!(rl.try_acquire(10, t0));
let t1 = Time::from_millis(5_000);
rl.refill(t1);
assert_eq!(
rl.available_tokens(),
0,
"half-period yields no whole token"
);
rl.reset();
assert_eq!(rl.available_tokens(), 10);
let t2 = Time::from_millis(100_000);
assert!(rl.try_acquire(10, t2));
let t3 = Time::from_millis(110_000);
rl.refill(t3);
assert_eq!(
rl.available_tokens(),
1,
"exactly one period after reset+drain must yield exactly 1 token"
);
}
#[test]
fn reject_strategy_fails_immediately() {
let rl = RateLimiter::new(RateLimitPolicy {
rate: 1,
burst: 1,
wait_strategy: WaitStrategy::Reject,
..Default::default()
});
let now = Time::from_millis(0);
assert!(rl.try_acquire(1, now));
assert!(!rl.try_acquire(1, now));
}
#[test]
fn enqueue_with_reject_strategy_returns_error() {
let rl = RateLimiter::new(RateLimitPolicy {
rate: 1,
burst: 1,
wait_strategy: WaitStrategy::Reject,
..Default::default()
});
let now = Time::from_millis(0);
assert!(rl.try_acquire(1, now));
let result = rl.enqueue(1, now);
assert!(matches!(result, Err(RateLimitError::RateLimitExceeded)));
}
#[test]
fn weighted_operations_consume_multiple_tokens() {
let rl = RateLimiter::new(RateLimitPolicy {
rate: 100,
burst: 10,
..Default::default()
});
let now = Time::from_millis(0);
assert!(rl.try_acquire(5, now));
assert_eq!(rl.available_tokens(), 5);
assert!(rl.try_acquire(5, now));
assert_eq!(rl.available_tokens(), 0);
assert!(!rl.try_acquire(1, now));
}
#[test]
fn call_panic_restores_consumed_tokens() {
let rl = RateLimiter::new(RateLimitPolicy {
rate: 1,
burst: 1,
period: Duration::from_secs(60),
..Default::default()
});
let now = Time::from_millis(0);
let panic = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
let _ = rl.call(now, || -> Result<(), &'static str> {
panic!("intentional rate-limit call panic")
});
}));
assert!(panic.is_err(), "inner operation should panic");
assert_eq!(
rl.available_tokens(),
1,
"panic path must refund the consumed token"
);
let result = rl.call(now, || Ok::<u32, &'static str>(7));
assert_eq!(result.unwrap(), 7);
}
#[test]
fn time_until_available_when_empty() {
let rl = RateLimiter::new(RateLimitPolicy {
rate: 10, period: Duration::from_secs(1),
burst: 10,
..Default::default()
});
let now = Time::from_millis(0);
assert!(rl.try_acquire(10, now));
let wait = rl.time_until_available(1, now);
assert!(
wait.as_millis() >= 90 && wait.as_millis() <= 110,
"Expected ~100ms, got {wait:?}"
);
}
#[test]
fn time_until_available_zero_when_sufficient() {
let rl = RateLimiter::new(RateLimitPolicy {
rate: 100,
burst: 10,
..Default::default()
});
let now = Time::from_millis(0);
let wait = rl.time_until_available(5, now);
assert_eq!(wait, Duration::ZERO);
}
#[test]
fn retry_after_uses_provided_time() {
let rl = RateLimiter::new(RateLimitPolicy {
rate: 10,
period: Duration::from_secs(1),
burst: 10,
..Default::default()
});
let now = Time::from_millis(0);
assert!(rl.try_acquire(10, now));
let retry = rl.retry_after(1, now);
assert!(retry.as_millis() >= 90 && retry.as_millis() <= 110);
let later = Time::from_millis(50);
let retry_later = rl.retry_after(1, later);
assert!(retry_later < retry);
}
#[test]
fn metrics_initial_values() {
let rl = RateLimiter::new(RateLimitPolicy {
name: "test".into(),
rate: 100,
burst: 10,
..Default::default()
});
let m = rl.metrics();
assert_eq!(m.total_allowed, 0);
assert_eq!(m.total_rejected, 0);
assert_eq!(m.total_waited, 0);
assert_eq!(m.total_wait_time, Duration::ZERO);
assert_eq!(m.max_wait_time, Duration::ZERO);
}
#[test]
fn metrics_track_allowed() {
let rl = RateLimiter::new(RateLimitPolicy {
rate: 100,
burst: 10,
..Default::default()
});
let now = Time::from_millis(0);
assert!(rl.try_acquire(1, now));
assert!(rl.try_acquire(1, now));
assert!(rl.try_acquire(1, now));
assert_eq!(rl.metrics().total_allowed, 3);
}
#[test]
fn enqueue_immediate_acquisition_returns_sentinel() {
let rl = RateLimiter::new(RateLimitPolicy {
rate: 100,
burst: 10,
wait_strategy: WaitStrategy::Block,
..Default::default()
});
let now = Time::from_millis(0);
let result = rl.enqueue(1, now);
assert_eq!(result, Ok(IMMEDIATE_ACQUIRE_SENTINEL));
}
#[test]
fn enqueue_adds_to_queue_when_exhausted() {
let rl = RateLimiter::new(RateLimitPolicy {
rate: 1,
period: Duration::from_secs(1),
burst: 1,
wait_strategy: WaitStrategy::Block,
..Default::default()
});
let now = Time::from_millis(0);
assert!(rl.try_acquire(1, now));
let result = rl.enqueue(1, now);
assert!(result.is_ok());
assert_ne!(result.unwrap(), IMMEDIATE_ACQUIRE_SENTINEL);
}
#[test]
fn queued_entry_ids_fail_closed_when_id_space_exhausts() {
let rl = RateLimiter::new(RateLimitPolicy {
rate: 1,
period: Duration::from_secs(1),
burst: 1,
wait_strategy: WaitStrategy::Block,
..Default::default()
});
let now = Time::from_millis(0);
assert!(rl.try_acquire(1, now), "first token should be consumed");
let first_live_id = rl
.enqueue(1, now)
.expect("first queued entry should be accepted");
assert_eq!(first_live_id, 0);
let last_real_id = rl
.enqueue(1, now)
.expect("queue entry before wrap should be accepted");
assert_eq!(last_real_id, 1);
rl.next_id
.store(IMMEDIATE_ACQUIRE_SENTINEL - 1, Ordering::Relaxed);
let edge_id = rl
.enqueue(1, now)
.expect("queue entry at wrap edge should be accepted");
assert_eq!(edge_id, IMMEDIATE_ACQUIRE_SENTINEL - 1);
let exhausted = rl.enqueue(1, now);
assert_eq!(
exhausted,
Err(RateLimitError::QueueIdExhausted),
"allocator must fail closed instead of reusing queue IDs after exhaustion"
);
assert_ne!(edge_id, IMMEDIATE_ACQUIRE_SENTINEL);
assert_ne!(edge_id, first_live_id);
assert_ne!(edge_id, last_real_id);
}
#[test]
fn stale_queue_handle_cannot_alias_later_waiter_after_exhaustion() {
let rl = RateLimiter::new(RateLimitPolicy {
rate: 1,
period: Duration::from_secs(1),
burst: 1,
wait_strategy: WaitStrategy::Block,
..Default::default()
});
let now = Time::from_millis(0);
assert!(rl.try_acquire(1, now), "first token should be consumed");
let stale_id = rl
.enqueue(1, now)
.expect("first queued entry should be accepted");
assert_eq!(stale_id, 0);
rl.cancel_entry(stale_id, now);
rl.next_id
.store(IMMEDIATE_ACQUIRE_SENTINEL - 1, Ordering::Relaxed);
let live_id = rl
.enqueue(1, now)
.expect("last queue ID before exhaustion should still be usable");
assert_eq!(live_id, IMMEDIATE_ACQUIRE_SENTINEL - 1);
assert_eq!(
rl.enqueue(1, now),
Err(RateLimitError::QueueIdExhausted),
"new waiters must not reuse a stale external handle after exhaustion"
);
rl.cancel_entry(stale_id, now);
assert!(
matches!(rl.check_entry(live_id, now), Ok(false)),
"stale handle cancellation must not affect the later waiter"
);
}
#[test]
fn process_queue_grants_when_tokens_available() {
let rl = RateLimiter::new(RateLimitPolicy {
rate: 10, period: Duration::from_secs(1),
burst: 1,
wait_strategy: WaitStrategy::Block,
..Default::default()
});
let now = Time::from_millis(0);
assert!(rl.try_acquire(1, now));
let entry_id = rl.enqueue(1, now).unwrap();
assert!(rl.process_queue(now).is_none());
let later = Time::from_millis(100);
let granted = rl.process_queue(later);
assert_eq!(granted, Some(entry_id));
}
#[test]
fn check_entry_returns_granted() {
    // check_entry reports Ok(false) while the entry is still waiting and
    // Ok(true) once a refilled token has been granted to it.
    let limiter = RateLimiter::new(RateLimitPolicy {
        rate: 10,
        period: Duration::from_secs(1),
        burst: 1,
        wait_strategy: WaitStrategy::Block,
        ..Default::default()
    });
    let start = Time::from_millis(0);
    assert!(limiter.try_acquire(1, start));
    let waiter = limiter.enqueue(1, start).unwrap();
    // No refill yet: still waiting.
    assert!(matches!(limiter.check_entry(waiter, start), Ok(false)));
    // After 100ms at 10 tokens/s one token is back; the entry is granted.
    assert!(matches!(
        limiter.check_entry(waiter, Time::from_millis(100)),
        Ok(true)
    ));
}
#[test]
fn check_entry_timeout() {
    // With a 100ms wait budget and a 60s refill period, polling the entry
    // at t=200ms must surface a Timeout error.
    let limiter = RateLimiter::new(RateLimitPolicy {
        rate: 1,
        period: Duration::from_secs(60),
        burst: 1,
        wait_strategy: WaitStrategy::BlockWithTimeout(Duration::from_millis(100)),
        ..Default::default()
    });
    let start = Time::from_millis(0);
    assert!(limiter.try_acquire(1, start));
    let waiter = limiter.enqueue(1, start).unwrap();
    let poll = limiter.check_entry(waiter, Time::from_millis(200));
    assert!(matches!(poll, Err(RateLimitError::Timeout { .. })));
}
#[test]
fn check_entry_grants_when_tokens_refill_exactly_at_timeout_boundary() {
    // Refill period and wait timeout are both 100ms, so the refill lands
    // exactly on the deadline; the grant must win over the timeout.
    let limiter = RateLimiter::new(RateLimitPolicy {
        rate: 1,
        period: Duration::from_millis(100),
        burst: 1,
        wait_strategy: WaitStrategy::BlockWithTimeout(Duration::from_millis(100)),
        ..Default::default()
    });
    let start = Time::from_millis(0);
    assert!(limiter.try_acquire(1, start));
    let waiter = limiter.enqueue(1, start).unwrap();
    let result = limiter.check_entry(waiter, Time::from_millis(100));
    assert!(
        matches!(result, Ok(true)),
        "entry should be granted when refill lands exactly on the timeout boundary, got {result:?}"
    );
}
#[test]
fn cancel_entry_triggers_cancelled_error() {
    // A cancelled waiter must observe Cancelled on its next poll.
    let limiter = RateLimiter::new(RateLimitPolicy {
        rate: 1,
        period: Duration::from_secs(60),
        burst: 1,
        wait_strategy: WaitStrategy::Block,
        ..Default::default()
    });
    let start = Time::from_millis(0);
    assert!(limiter.try_acquire(1, start));
    let waiter = limiter.enqueue(1, start).unwrap();
    limiter.cancel_entry(waiter, start);
    assert!(matches!(
        limiter.check_entry(waiter, start),
        Err(RateLimitError::Cancelled)
    ));
}
#[test]
// A waiter that times out behind an already-granted (but unclaimed) entry
// must be removed from the wait queue at the moment its timeout is observed,
// not left behind to leak queue slots.
fn checked_timeout_behind_granted_is_immediately_removed() {
let rl = RateLimiter::new(RateLimitPolicy {
rate: 1,
period: Duration::from_millis(100),
burst: 1,
wait_strategy: WaitStrategy::BlockWithTimeout(Duration::from_millis(150)),
..Default::default()
});
let now = Time::from_millis(0);
// Drain the single burst token, then queue two waiters behind it.
assert!(rl.try_acquire(1, now));
let id1 = rl.enqueue(1, now).unwrap();
let id2 = rl.enqueue(1, now).unwrap();
// At t=100ms one token refills and the head waiter (id1) is granted.
let grant_time = Time::from_millis(100);
assert_eq!(rl.process_queue(grant_time), Some(id1));
// Both entries remain in the queue: id1 granted-but-unclaimed, id2 waiting.
assert_eq!(rl.wait_queue.read().len(), 2);
// At t=200ms id2's 150ms timeout has expired; checking it reports Timeout.
let timeout_time = Time::from_millis(200);
let result = rl.check_entry(id2, timeout_time);
assert!(matches!(result, Err(RateLimitError::Timeout { .. })));
assert_eq!(
rl.wait_queue.read().len(),
1,
"timed-out entry is immediately removed; only the unconsumed granted entry remains"
);
// Claiming the granted entry removes the last queue slot.
let _ = rl.check_entry(id1, timeout_time);
assert_eq!(
rl.wait_queue.read().len(),
0,
"granted entry removed on claim"
);
}
#[test]
fn try_acquire_clears_timed_out_queue_entries_before_rejecting_fast_path() {
    // A waiter whose 10ms timeout expired long ago must be reaped so it
    // cannot hold refilled tokens hostage against fast-path callers.
    let limiter = RateLimiter::new(RateLimitPolicy {
        rate: 1,
        period: Duration::from_millis(100),
        burst: 1,
        wait_strategy: WaitStrategy::BlockWithTimeout(Duration::from_millis(10)),
        ..Default::default()
    });
    let start = Time::from_millis(0);
    assert!(limiter.try_acquire(1, start));
    let _waiter = limiter.enqueue(1, start).expect("second request should enqueue");
    assert!(
        limiter.try_acquire(1, Time::from_millis(200)),
        "timed-out queue entries must not permanently block later fast-path acquires"
    );
}
#[test]
fn try_acquire_processes_queue_grants_before_evaluating_fast_path() {
    // FIFO fairness: a refilled token belongs to the queued waiter, not to
    // a fast-path caller arriving at the same instant.
    let limiter = RateLimiter::new(RateLimitPolicy {
        rate: 1,
        period: Duration::from_millis(100),
        burst: 1,
        wait_strategy: WaitStrategy::Block,
        ..Default::default()
    });
    let start = Time::from_millis(0);
    assert!(limiter.try_acquire(1, start));
    let waiter = limiter.enqueue(1, start).expect("second request should enqueue");
    let refill = Time::from_millis(100);
    assert!(
        !limiter.try_acquire(1, refill),
        "queued waiter must be granted before a new fast-path caller can consume refilled tokens"
    );
    assert!(
        matches!(limiter.check_entry(waiter, refill), Ok(true)),
        "queued waiter should have been granted when try_acquire processed the queue"
    );
}
#[test]
fn metamorphic_appending_tail_waiters_preserves_first_fifo_grant() {
    // Metamorphic relation: appending extra waiters behind the head of the
    // queue must not change which entry wins the first refill.
    let make_limiter = || {
        RateLimiter::new(RateLimitPolicy {
            rate: 1,
            period: Duration::from_millis(100),
            burst: 1,
            wait_strategy: WaitStrategy::Block,
            ..Default::default()
        })
    };
    let base = make_limiter();
    let extended = make_limiter();
    let start = Time::from_millis(0);
    // Drain the burst token on both limiters so subsequent requests queue.
    assert!(base.try_acquire(1, start));
    assert!(extended.try_acquire(1, start));
    let base_head = base.enqueue(1, start).expect("head waiter should enqueue");
    let extended_head = extended.enqueue(1, start).expect("head waiter should enqueue");
    // The "extended" variant differs only by this additional tail waiter.
    let extended_tail = extended.enqueue(1, start).expect("tail waiter should enqueue");
    let refill = Time::from_millis(100);
    assert_eq!(base.process_queue(refill), Some(base_head));
    assert_eq!(extended.process_queue(refill), Some(extended_head));
    assert!(
        matches!(base.check_entry(base_head, refill), Ok(true)),
        "single queued waiter should be granted on first refill"
    );
    assert!(
        matches!(extended.check_entry(extended_head, refill), Ok(true)),
        "head waiter must still win first refill after appending tail waiters"
    );
    assert!(
        matches!(extended.check_entry(extended_tail, refill), Ok(false)),
        "tail waiter must remain queued until a later refill"
    );
}
#[test]
fn sliding_window_enforces_rate() {
    // Five acquisitions fill the one-second window; the sixth at the same
    // instant must be rejected.
    let limiter = SlidingWindowRateLimiter::new(RateLimitPolicy {
        rate: 5,
        period: Duration::from_secs(1),
        ..Default::default()
    });
    let t0 = Time::from_millis(0);
    for _ in 0..5 {
        assert!(limiter.try_acquire(1, t0));
    }
    assert!(!limiter.try_acquire(1, t0));
}
#[test]
fn sliding_window_clears_old_entries() {
    // Once the window slides past the original burst, capacity returns.
    let limiter = SlidingWindowRateLimiter::new(RateLimitPolicy {
        rate: 5,
        period: Duration::from_secs(1),
        ..Default::default()
    });
    let t0 = Time::from_millis(0);
    for _ in 0..5 {
        assert!(limiter.try_acquire(1, t0));
    }
    // At t=1100ms the 1s window no longer covers the first five entries.
    assert!(limiter.try_acquire(1, Time::from_millis(1100)));
}
#[test]
fn sliding_window_time_until_available() {
    // With the window saturated at t=0, the next slot opens when the oldest
    // entry leaves the one-second window — roughly one second out.
    let limiter = SlidingWindowRateLimiter::new(RateLimitPolicy {
        name: "test".into(),
        rate: 5,
        period: Duration::from_secs(1),
        ..Default::default()
    });
    let t0 = Time::from_millis(0);
    for _ in 0..5 {
        assert!(limiter.try_acquire(1, t0));
    }
    let wait = limiter.time_until_available(1, t0);
    assert!(
        wait >= Duration::from_millis(900) && wait <= Duration::from_millis(1100),
        "Expected ~1000ms, got {wait:?}"
    );
}
#[test]
fn sliding_window_period_overflow_does_not_wrap() {
    // 18_446_744_073_709_551s + 616ms is exactly u64::MAX + 1 milliseconds,
    // so a naive as-u64 millisecond conversion would wrap to a tiny window.
    let huge_period = Duration::new(18_446_744_073_709_551, 616_000_000);
    let limiter = SlidingWindowRateLimiter::new(RateLimitPolicy {
        rate: 1,
        period: huge_period,
        ..Default::default()
    });
    assert!(limiter.try_acquire(1, Time::from_millis(0)));
    // The single slot must still be occupied; a wrapped (near-zero) window
    // would incorrectly admit this second request.
    assert!(!limiter.try_acquire(1, Time::from_millis(1)));
}
#[test]
fn registry_creates_named_limiters() {
    // Same name yields the same limiter instance; distinct names yield
    // distinct instances.
    let registry = RateLimiterRegistry::new(RateLimitPolicy::default());
    let first = registry.get_or_create("api-a");
    let other = registry.get_or_create("api-b");
    let again = registry.get_or_create("api-a");
    assert!(Arc::ptr_eq(&first, &again));
    assert!(!Arc::ptr_eq(&first, &other));
}
#[test]
fn registry_uses_provided_name() {
    // The limiter reports the registry key as its name, even though the
    // default policy's own name field is "default".
    let registry = RateLimiterRegistry::new(RateLimitPolicy::default());
    let limiter = registry.get_or_create("my-api");
    assert_eq!(limiter.name(), "my-api");
}
#[test]
fn registry_custom_policy() {
    // A per-name policy override takes effect: a burst of 500 means 500
    // tokens are available up front.
    let registry = RateLimiterRegistry::new(RateLimitPolicy::default());
    let custom = RateLimitPolicy {
        rate: 1000,
        burst: 500,
        ..Default::default()
    };
    let limiter = registry.get_or_create_with("custom", custom);
    assert_eq!(limiter.available_tokens(), 500);
}
#[test]
fn registry_remove() {
    // remove() yields the stored limiter exactly once, then reports absence.
    let registry = RateLimiterRegistry::new(RateLimitPolicy::default());
    let created = registry.get_or_create("temp");
    let removed = registry.remove("temp");
    assert!(removed.is_some());
    assert!(Arc::ptr_eq(&created, &removed.unwrap()));
    // A second removal of the same name finds nothing.
    assert!(registry.remove("temp").is_none());
}
#[test]
fn registry_all_metrics() {
    // all_metrics() snapshots every registered limiter, keyed by name.
    let registry = RateLimiterRegistry::new(RateLimitPolicy::default());
    let first = registry.get_or_create("api-1");
    let second = registry.get_or_create("api-2");
    let t0 = Time::from_millis(0);
    assert!(first.try_acquire(1, t0));
    // A single acquire with cost 2 still counts as one allowed request.
    assert!(second.try_acquire(2, t0));
    let snapshot = registry.all_metrics();
    assert_eq!(snapshot.len(), 2);
    assert_eq!(snapshot.get("api-1").unwrap().total_allowed, 1);
    assert_eq!(snapshot.get("api-2").unwrap().total_allowed, 1);
}
#[test]
fn concurrent_acquire_safe() {
    use std::sync::atomic::AtomicU32;
    use std::thread;
    // 10 threads x 100 acquires against a 1000-token bucket at a fixed
    // timestamp: exactly 1000 must succeed, i.e. no token is handed out
    // twice under contention.
    let limiter = Arc::new(RateLimiter::new(RateLimitPolicy {
        rate: 1000,
        burst: 1000,
        ..Default::default()
    }));
    let t0 = Time::from_millis(0);
    let wins = Arc::new(AtomicU32::new(0));
    let workers: Vec<_> = (0..10)
        .map(|_| {
            let limiter = Arc::clone(&limiter);
            let wins = Arc::clone(&wins);
            thread::spawn(move || {
                for _ in 0..100 {
                    if limiter.try_acquire(1, t0) {
                        wins.fetch_add(1, Ordering::SeqCst);
                    }
                }
            })
        })
        .collect();
    for worker in workers {
        worker.join().unwrap();
    }
    assert_eq!(wins.load(Ordering::SeqCst), 1000);
}
#[test]
fn builder_creates_policy() {
    // Every builder setter must land in the corresponding policy field.
    let half_second = Duration::from_millis(500);
    let policy = RateLimitPolicyBuilder::new()
        .name("test")
        .rate(50)
        .period(half_second)
        .burst(20)
        .default_cost(2)
        .wait_strategy(WaitStrategy::Reject)
        .build();
    assert!(matches!(policy.wait_strategy, WaitStrategy::Reject));
    assert_eq!(policy.name, "test");
    assert_eq!(policy.period, half_second);
    assert_eq!(policy.rate, 50);
    assert_eq!(policy.burst, 20);
    assert_eq!(policy.default_cost, 2);
}
#[test]
fn error_display() {
    // Each variant's Display output must contain its key phrase; Inner
    // passes the wrapped error's message through verbatim.
    let cases: [(RateLimitError<&str>, &str); 4] = [
        (RateLimitError::RateLimitExceeded, "exceeded"),
        (RateLimitError::QueueIdExhausted, "queue ID space exhausted"),
        (
            RateLimitError::Timeout {
                waited: Duration::from_millis(100),
            },
            "timeout",
        ),
        (RateLimitError::Cancelled, "cancelled"),
    ];
    for (err, needle) in cases {
        assert!(err.to_string().contains(needle));
    }
    let inner: RateLimitError<&str> = RateLimitError::Inner("inner error");
    assert_eq!(inner.to_string(), "inner error");
}
#[test]
fn rate_limit_error_debug_clone_eq() {
    // Debug, Clone, and (Partial)Eq must all be usable on the error type.
    let exceeded = RateLimitError::<String>::RateLimitExceeded;
    let dbg = format!("{exceeded:?}");
    assert!(dbg.contains("RateLimitExceeded"), "{dbg}");
    // A clone compares equal to its source.
    assert_eq!(exceeded, exceeded.clone());
    // Distinct variants compare unequal.
    let timeout = RateLimitError::<String>::Timeout {
        waited: Duration::from_millis(200),
    };
    assert_ne!(exceeded, timeout);
}
#[test]
fn test_slow_rate_starvation() {
    // At one token per 2s, each 1ms tick refills only a fraction of a
    // token; per-tick integer rounding could drop that fraction every time
    // and starve the caller forever. Stepping the clock 1ms at a time for
    // 4s must eventually yield a token.
    let limiter = RateLimiter::new(RateLimitPolicy {
        rate: 1,
        period: Duration::from_secs(2),
        burst: 1,
        ..Default::default()
    });
    let start = Time::from_millis(0);
    assert!(limiter.try_acquire(1, start));
    let mut clock = start;
    let mut acquired = false;
    for _ in 0..4000 {
        clock = Time::from_millis(clock.as_millis() + 1);
        if limiter.try_acquire(1, clock) {
            acquired = true;
            break;
        }
    }
    assert!(
        acquired,
        "Failed to acquire token due to precision loss starvation"
    );
}
}