#![allow(dead_code)]
use std::sync::atomic::{AtomicI64, AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};
use parking_lot::Mutex;
/// Tunable parameters for a [`Throttle`].
#[derive(Debug, Clone)]
pub struct ThrottleConfig {
    /// Maximum number of tokens the bucket holds; a new `Throttle` starts with this many.
    pub capacity: u64,
    /// Tokens credited back to the bucket per second of elapsed time.
    pub refill_rate: f64,
    /// Upper bound on simultaneous entries; `Throttle` treats `0` as "no limit".
    pub max_concurrent: usize,
}

impl ThrottleConfig {
    /// Builds a configuration from explicit values; no validation is performed.
    #[must_use]
    pub fn new(capacity: u64, refill_rate: f64, max_concurrent: usize) -> Self {
        ThrottleConfig {
            capacity,
            refill_rate,
            max_concurrent,
        }
    }
}

impl Default for ThrottleConfig {
    /// Returns a bucket of 100 tokens refilled at 10 tokens per second with no
    /// concurrency limit.
    fn default() -> Self {
        Self::new(100, 10.0, 0)
    }
}
/// Token-bucket rate limiter with an optional cap on concurrent entries.
///
/// The bucket starts full (`capacity` tokens) and is credited `refill_rate`
/// tokens per second of elapsed time, never exceeding `capacity`.
///
/// Lock order: `tokens` is always taken before `last_refill`, and the two are
/// held together while refilling so that advancing the timestamp and crediting
/// the tokens is a single atomic step.
#[derive(Debug)]
pub struct Throttle {
    config: ThrottleConfig,
    /// Tokens currently in the bucket (fractional because refill is continuous).
    tokens: Mutex<f64>,
    /// Instant up to which elapsed time has already been converted into tokens.
    last_refill: Mutex<Instant>,
    /// Number of callers currently admitted by `try_enter_concurrent`.
    concurrent: AtomicUsize,
}

impl Throttle {
    /// Creates a throttle whose bucket starts full.
    #[must_use]
    pub fn new(config: ThrottleConfig) -> Self {
        Self {
            tokens: Mutex::new(config.capacity as f64),
            last_refill: Mutex::new(Instant::now()),
            concurrent: AtomicUsize::new(0),
            config,
        }
    }

    /// Creates a throttle using `ThrottleConfig::default()`.
    #[must_use]
    pub fn default_config() -> Self {
        Self::new(ThrottleConfig::default())
    }

    /// Credits the tokens earned since the last refill.
    ///
    /// The caller must already hold the `tokens` lock and pass the guarded
    /// value in; this keeps "advance timestamp" and "credit tokens" atomic with
    /// respect to every other bucket operation.
    fn refill_locked(&self, tokens: &mut f64) {
        let now = Instant::now();
        let elapsed = {
            let mut last = self.last_refill.lock();
            let elapsed = now.duration_since(*last).as_secs_f64();
            *last = now;
            elapsed
        };
        let gained = elapsed * self.config.refill_rate;
        // Only a positive gain is credited: a negative rate must not drain the
        // bucket, and a NaN product must not reset it to `capacity` (which is
        // what `f64::min` would do, since it ignores a NaN operand).
        if gained > 0.0 {
            *tokens = (*tokens + gained).min(self.config.capacity as f64);
        }
    }

    /// Brings the bucket up to date with the current time.
    fn refill(&self) {
        let mut tokens = self.tokens.lock();
        self.refill_locked(&mut tokens);
    }

    /// Attempts to take `n` tokens without blocking.
    ///
    /// Returns `true` and deducts the tokens if at least `n` are available
    /// after refilling; otherwise returns `false` and leaves the bucket as is.
    pub fn try_acquire(&self, n: u64) -> bool {
        let needed = n as f64;
        let mut tokens = self.tokens.lock();
        self.refill_locked(&mut tokens);
        if *tokens >= needed {
            *tokens -= needed;
            true
        } else {
            false
        }
    }

    /// Blocks the calling thread until `n` tokens have been taken.
    ///
    /// Polls `try_acquire` and sleeps 5 ms between attempts. The bucket never
    /// holds more than `capacity` tokens, so a request for more than that can
    /// never be satisfied and this call will not return.
    pub fn acquire(&self, n: u64) {
        loop {
            if self.try_acquire(n) {
                return;
            }
            std::thread::sleep(Duration::from_millis(5));
        }
    }

    /// Returns the number of tokens available right now, after refilling.
    #[must_use]
    pub fn available_tokens(&self) -> f64 {
        let mut tokens = self.tokens.lock();
        self.refill_locked(&mut tokens);
        *tokens
    }

    /// Tries to claim one concurrency slot.
    ///
    /// Always succeeds (and counts nothing) when `max_concurrent` is `0`.
    /// Otherwise the counter is incremented only if it is below the limit, in a
    /// single compare-and-swap, so it never overshoots and a free slot is never
    /// hidden by another caller's failed attempt.
    pub fn try_enter_concurrent(&self) -> bool {
        let max = self.config.max_concurrent;
        if max == 0 {
            return true;
        }
        self.concurrent
            .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |held| {
                if held < max {
                    Some(held + 1)
                } else {
                    None
                }
            })
            .is_ok()
    }

    /// Releases one concurrency slot.
    ///
    /// Does nothing when `max_concurrent` is `0`. The counter saturates at
    /// zero, so an unmatched call cannot wrap it around and lock out every
    /// later caller.
    pub fn leave_concurrent(&self) {
        if self.config.max_concurrent > 0 {
            // `Err` only means the counter was already zero; nothing to undo.
            let _ = self
                .concurrent
                .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |held| {
                    held.checked_sub(1)
                });
        }
    }

    /// Returns the number of concurrency slots currently held.
    #[must_use]
    pub fn concurrent_count(&self) -> usize {
        self.concurrent.load(Ordering::SeqCst)
    }

    /// Returns the configured bucket capacity.
    #[must_use]
    pub fn capacity(&self) -> u64 {
        self.config.capacity
    }

    /// Returns the configured refill rate in tokens per second.
    #[must_use]
    pub fn refill_rate(&self) -> f64 {
        self.config.refill_rate
    }
}
/// Cloneable handle to a single [`Throttle`]; every clone draws from the same
/// underlying bucket because only the `Arc` is duplicated.
#[derive(Debug, Clone)]
pub struct SharedThrottle(Arc<Throttle>);

impl SharedThrottle {
    /// Creates a new shared throttle from `config`.
    #[must_use]
    pub fn new(config: ThrottleConfig) -> Self {
        let throttle = Throttle::new(config);
        Self(Arc::new(throttle))
    }

    /// Borrows the wrapped throttle.
    fn inner(&self) -> &Throttle {
        &self.0
    }

    /// Attempts to take `n` tokens from the shared bucket without blocking.
    #[must_use]
    pub fn try_acquire(&self, n: u64) -> bool {
        self.inner().try_acquire(n)
    }

    /// Blocks until `n` tokens have been taken from the shared bucket.
    pub fn acquire(&self, n: u64) {
        self.inner().acquire(n);
    }

    /// Returns the number of tokens currently available in the shared bucket.
    #[must_use]
    pub fn available_tokens(&self) -> f64 {
        self.inner().available_tokens()
    }
}
/// Lock-free counter that admits at most `limit` holders at a time.
#[derive(Debug)]
pub struct ConcurrencyLimiter {
    /// Maximum number of simultaneously held slots.
    limit: usize,
    /// Number of slots currently held.
    current: AtomicUsize,
}

impl ConcurrencyLimiter {
    /// Creates a limiter with no slots held. A `limit` of `0` admits nobody.
    #[must_use]
    pub fn new(limit: usize) -> Self {
        Self {
            limit,
            current: AtomicUsize::new(0),
        }
    }

    /// Tries to claim one slot; returns `true` on success.
    ///
    /// The counter is incremented only if it is below `limit`, in a single
    /// compare-and-swap. Unlike an add-then-undo scheme, it never overshoots
    /// the limit and a failed attempt cannot make a free slot look taken.
    pub fn try_acquire(&self) -> bool {
        let limit = self.limit;
        self.current
            .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |held| {
                if held < limit {
                    Some(held + 1)
                } else {
                    None
                }
            })
            .is_ok()
    }

    /// Releases one slot.
    ///
    /// Saturates at zero: an unmatched `release` is ignored instead of wrapping
    /// the counter to `usize::MAX`, which would reject every later acquire.
    pub fn release(&self) {
        // `Err` only means the counter was already zero; nothing to undo.
        let _ = self
            .current
            .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |held| {
                held.checked_sub(1)
            });
    }

    /// Returns the number of slots currently held.
    #[must_use]
    pub fn current(&self) -> usize {
        self.current.load(Ordering::SeqCst)
    }

    /// Returns the configured maximum number of slots.
    #[must_use]
    pub fn limit(&self) -> usize {
        self.limit
    }
}
/// Rate limiter that admits at most `limit` requests within any trailing
/// `window` of time, tracked as a list of admission timestamps.
#[derive(Debug)]
pub struct SlidingWindowCounter {
    /// Length of the trailing window.
    window: Duration,
    /// Maximum number of admitted requests inside one window.
    limit: u64,
    /// Timestamps of admitted requests that have not yet been pruned.
    requests: Mutex<Vec<Instant>>,
}

impl SlidingWindowCounter {
    /// Creates a counter with no recorded requests.
    #[must_use]
    pub fn new(window: Duration, limit: u64) -> Self {
        let requests = Mutex::new(Vec::new());
        Self {
            window,
            limit,
            requests,
        }
    }

    /// Drops every timestamp that is not strictly newer than `now - window`.
    ///
    /// When `now - window` cannot be represented as an `Instant`, nothing is
    /// removed.
    fn evict_expired(&self, stamps: &mut Vec<Instant>, now: Instant) {
        if let Some(cutoff) = now.checked_sub(self.window) {
            stamps.retain(|stamp| *stamp > cutoff);
        }
    }

    /// Tries to record one request at the current time.
    ///
    /// Returns `true` and stores the timestamp if fewer than `limit` requests
    /// remain in the window after pruning; otherwise returns `false` and
    /// stores nothing.
    pub fn record(&self) -> bool {
        let now = Instant::now();
        let mut stamps = self.requests.lock();
        self.evict_expired(&mut stamps, now);
        let has_room = (stamps.len() as u64) < self.limit;
        if has_room {
            stamps.push(now);
        }
        has_room
    }

    /// Returns how many recorded requests are still inside the window,
    /// pruning expired ones as a side effect.
    #[must_use]
    pub fn count_in_window(&self) -> usize {
        let now = Instant::now();
        let mut stamps = self.requests.lock();
        self.evict_expired(&mut stamps, now);
        stamps.len()
    }
}
/// Running totals of throttle decisions, updated with relaxed atomics.
#[derive(Debug, Default)]
pub struct ThrottleStats {
    /// Total number of recorded decisions.
    pub attempts: AtomicI64,
    /// Number of decisions recorded as successful.
    pub successes: AtomicI64,
    /// Number of decisions recorded as rejected.
    pub rejections: AtomicI64,
}

impl ThrottleStats {
    /// Creates a stats block with every counter at zero.
    #[must_use]
    pub fn new() -> Self {
        Self {
            attempts: AtomicI64::new(0),
            successes: AtomicI64::new(0),
            rejections: AtomicI64::new(0),
        }
    }

    /// Records one decision: bumps `attempts`, then either `successes` or
    /// `rejections` depending on `success`.
    pub fn record(&self, success: bool) {
        self.attempts.fetch_add(1, Ordering::Relaxed);
        let outcome = if success {
            &self.successes
        } else {
            &self.rejections
        };
        outcome.fetch_add(1, Ordering::Relaxed);
    }

    /// Returns `rejections / attempts`, or `0.0` when nothing has been
    /// recorded yet.
    ///
    /// The two counters are read separately, so the value is a snapshot that
    /// may be slightly inconsistent while other threads are recording.
    #[must_use]
    pub fn rejection_ratio(&self) -> f64 {
        match self.attempts.load(Ordering::Relaxed) {
            0 => 0.0,
            total => self.rejections.load(Ordering::Relaxed) as f64 / total as f64,
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    // Builds a throttle from the three configuration values in one call.
    fn throttle(capacity: u64, refill_rate: f64, max_concurrent: usize) -> Throttle {
        Throttle::new(ThrottleConfig::new(capacity, refill_rate, max_concurrent))
    }

    #[test]
    fn test_throttle_config_default() {
        let ThrottleConfig {
            capacity,
            refill_rate,
            max_concurrent,
        } = ThrottleConfig::default();
        assert_eq!(capacity, 100);
        assert!(refill_rate > 0.0);
        assert_eq!(max_concurrent, 0);
    }

    #[test]
    fn test_throttle_initial_tokens() {
        // A fresh bucket starts (essentially) full.
        let bucket = throttle(50, 5.0, 0);
        assert!(bucket.available_tokens() > 49.0);
    }

    #[test]
    fn test_throttle_try_acquire_success() {
        let bucket = throttle(10, 1.0, 0);
        assert!(bucket.try_acquire(5));
        assert!(bucket.try_acquire(5));
    }

    #[test]
    fn test_throttle_try_acquire_fail_when_empty() {
        // The refill rate is tiny so the bucket stays empty between the calls.
        let bucket = throttle(3, 0.001, 0);
        assert!(bucket.try_acquire(3));
        assert!(!bucket.try_acquire(1));
    }

    #[test]
    fn test_throttle_capacity() {
        let bucket = throttle(42, 1.0, 0);
        assert_eq!(bucket.capacity(), 42);
    }

    #[test]
    fn test_throttle_refill_rate() {
        let bucket = throttle(10, 3.5, 0);
        let diff = bucket.refill_rate() - 3.5;
        assert!(diff.abs() < 1e-6);
    }

    #[test]
    fn test_concurrent_limit_acquire_release() {
        let gate = throttle(100, 10.0, 2);
        assert!(gate.try_enter_concurrent());
        assert!(gate.try_enter_concurrent());
        // Both slots are taken, so a third entry is refused.
        assert!(!gate.try_enter_concurrent());
        gate.leave_concurrent();
        assert!(gate.try_enter_concurrent());
    }

    #[test]
    fn test_concurrent_count() {
        let gate = throttle(100, 10.0, 5);
        assert_eq!(gate.concurrent_count(), 0);
        let _ = gate.try_enter_concurrent();
        assert_eq!(gate.concurrent_count(), 1);
        gate.leave_concurrent();
        assert_eq!(gate.concurrent_count(), 0);
    }

    #[test]
    fn test_concurrency_limiter_basics() {
        let limiter = ConcurrencyLimiter::new(3);
        for _ in 0..3 {
            assert!(limiter.try_acquire());
        }
        assert!(!limiter.try_acquire());
        limiter.release();
        assert!(limiter.try_acquire());
    }

    #[test]
    fn test_concurrency_limiter_current_and_limit() {
        let limiter = ConcurrencyLimiter::new(5);
        let _ = limiter.try_acquire();
        assert_eq!(limiter.current(), 1);
        assert_eq!(limiter.limit(), 5);
    }

    #[test]
    fn test_sliding_window_counter_allows_within_limit() {
        let counter = SlidingWindowCounter::new(Duration::from_secs(60), 5);
        let mut admitted = 0;
        while admitted < 5 {
            assert!(counter.record());
            admitted += 1;
        }
        // The window is full, so the sixth request is refused.
        assert!(!counter.record());
    }

    #[test]
    fn test_sliding_window_count_in_window() {
        let counter = SlidingWindowCounter::new(Duration::from_secs(60), 10);
        for _ in 0..2 {
            let _ = counter.record();
        }
        assert_eq!(counter.count_in_window(), 2);
    }

    #[test]
    fn test_throttle_stats_record() {
        let stats = ThrottleStats::new();
        for outcome in [true, true, false] {
            stats.record(outcome);
        }
        assert_eq!(stats.attempts.load(Ordering::Relaxed), 3);
        assert_eq!(stats.successes.load(Ordering::Relaxed), 2);
        assert_eq!(stats.rejections.load(Ordering::Relaxed), 1);
    }

    #[test]
    fn test_throttle_stats_rejection_ratio() {
        let stats = ThrottleStats::new();
        assert!(stats.rejection_ratio().abs() < 1e-6);
        stats.record(true);
        stats.record(false);
        let diff = stats.rejection_ratio() - 0.5;
        assert!(diff.abs() < 1e-6);
    }

    #[test]
    fn test_shared_throttle_clone() {
        // Both handles drain the same 20-token bucket.
        let first = SharedThrottle::new(ThrottleConfig::new(20, 2.0, 0));
        let second = first.clone();
        assert!(first.try_acquire(10));
        assert!(second.try_acquire(10));
        assert!(!first.try_acquire(1));
    }
}