use std::sync::atomic::{AtomicI64, AtomicU32, AtomicU64, Ordering};
/// Common interface of the rate limiters in this module.
///
/// Every method takes `&self`, so a limiter can be driven through a shared reference.
pub trait Limiter {
    /// Registers one hit against a budget of `limit` hits per interval.
    ///
    /// Returns `true` when the hit is admitted and `false` when it is rejected.
    fn inc(&self, limit: u32) -> bool;

    /// Reports the current usage of the budget without registering a hit.
    ///
    /// `LocalLimiter` reports this as a fraction of the last accepted limit.
    fn rate(&self) -> f64;

    /// Refreshes the limiter's internal accounting and then reports the rate.
    fn update_rate(&self) -> f64;
}
/// In-process rate limiter whose mutable state lives entirely in atomics.
///
/// The layout is pinned with `#[repr(C)]`, so the field order below must not change.
#[repr(C)]
pub struct LocalLimiter {
    /// Outstanding hits, scaled by `granularity`: each registered hit adds `granularity`,
    /// and elapsed clock ticks multiplied by the limit are subtracted again on update.
    hit_count: AtomicI64,
    /// Clock reading (as returned by `now()`) taken at the most recent update.
    last_update: AtomicU64,
    /// The `limit` passed to the most recent admitted `inc` call; `0` until then.
    last_limit: AtomicU32,
    /// Length of one interval in clock ticks (`TIME_PER_SECOND` times the number of seconds).
    granularity: i64,
}
/// Number of clock ticks per second used throughout this file. The Unix branch of `now()`
/// builds its timestamp as `tv_sec * TIME_PER_SECOND + tv_nsec`, i.e. in nanoseconds.
const TIME_PER_SECOND: i64 = 1_000_000_000;
/// Reads the platform's monotonic clock and returns the reading in nanoseconds.
///
/// * Windows: `QueryPerformanceCounter`, converted with the counter frequency reported by
///   `QueryPerformanceFrequency` (queried once, then cached in a static).
/// * Everywhere else: `clock_gettime(CLOCK_MONOTONIC)`.
///
/// The status codes of the OS calls are not inspected. If a call fails, the zero-initialised
/// output value is used, which makes this function return `0`.
fn now() -> u64 {
    #[cfg(windows)]
    let now = {
        // Cached counter frequency in ticks per second; 0 means "not queried yet".
        // Threads racing on the first call just store the same value more than once.
        static FREQUENCY: AtomicU64 = AtomicU64::new(0);

        let mut frequency = FREQUENCY.load(Ordering::Relaxed);
        if frequency == 0 {
            let mut raw_frequency: i64 = 0;
            // SAFETY: the pointer refers to a live, writable local `i64`.
            unsafe {
                windows_sys::Win32::System::Performance::QueryPerformanceFrequency(
                    &mut raw_frequency,
                );
            }
            frequency = raw_frequency.max(0) as u64;
            FREQUENCY.store(frequency, Ordering::Relaxed);
        }

        let mut perf_counter: i64 = 0;
        // SAFETY: the pointer refers to a live, writable local `i64`.
        unsafe {
            windows_sys::Win32::System::Performance::QueryPerformanceCounter(&mut perf_counter);
        }
        perf_ticks_to_nanos(perf_counter as u64, frequency)
    };
    #[cfg(not(windows))]
    let now = {
        let mut ts: libc::timespec = libc::timespec {
            tv_sec: 0,
            tv_nsec: 0,
        };
        // SAFETY: `ts` is a live, writable `timespec` for the duration of the call.
        unsafe { libc::clock_gettime(libc::CLOCK_MONOTONIC, &mut ts) };
        // The explicit casts are only written out for 32-bit targets, where the
        // `timespec` fields may be narrower than `i64`.
        #[cfg(target_pointer_width = "32")]
        {
            (ts.tv_sec as i64 * TIME_PER_SECOND + ts.tv_nsec as i64) as u64
        }
        #[cfg(target_pointer_width = "64")]
        {
            (ts.tv_sec * TIME_PER_SECOND + ts.tv_nsec) as u64
        }
    };
    now
}

/// Converts a performance-counter reading (`ticks`) into nanoseconds, given the counter
/// `frequency` in ticks per second: `ticks * TIME_PER_SECOND / frequency`.
///
/// The multiplication is carried out in `u128` because `ticks * 1_000_000_000` exceeds
/// `u64` after a few hours of uptime at typical counter frequencies. A `frequency` of `0`
/// (the query failed) yields `0` instead of dividing by zero.
// Only the Windows branch of `now()` calls this; it stays platform-independent so the
// arithmetic can be unit-tested on any host.
#[cfg_attr(not(windows), allow(dead_code))]
fn perf_ticks_to_nanos(ticks: u64, frequency: u64) -> u64 {
    if frequency == 0 {
        return 0;
    }
    (u128::from(ticks) * TIME_PER_SECOND as u128 / u128::from(frequency)) as u64
}
impl Default for LocalLimiter {
    /// Creates an empty limiter: no hits, no remembered limit, the clock stamped with
    /// the current `now()` reading, and an interval of one second.
    fn default() -> Self {
        let created_at = now();
        Self {
            hit_count: AtomicI64::new(0),
            last_update: AtomicU64::new(created_at),
            last_limit: AtomicU32::new(0),
            granularity: TIME_PER_SECOND,
        }
    }
}
impl LocalLimiter {
pub fn with_granularity(seconds: u32) -> LocalLimiter {
let mut limiter = LocalLimiter::default();
limiter.granularity *= seconds as i64;
limiter
}
pub fn reset(&mut self, seconds: u32) {
self.last_update.store(now(), Ordering::Relaxed);
self.hit_count.store(0, Ordering::Relaxed);
self.last_limit.store(0, Ordering::Relaxed);
self.granularity = TIME_PER_SECOND * seconds as i64;
}
fn update(&self, limit: u32, inc: i64) -> i64 {
let now = now();
let last = self.last_update.swap(now, Ordering::SeqCst);
let clear_limit = limit.max(self.last_limit.load(Ordering::Relaxed));
let clear_counter = (now as i64 - last as i64) * (clear_limit as i64);
let subtract = clear_counter - inc;
let mut previous_hits = self.hit_count.fetch_sub(subtract, Ordering::SeqCst);
if previous_hits < subtract {
let add = clear_counter - previous_hits.max(0);
self.hit_count.fetch_add(add, Ordering::Acquire);
previous_hits += add - clear_counter;
}
previous_hits
}
}
impl Limiter for LocalLimiter {
    /// Registers one hit (worth `granularity` in the scaled counter) against `limit`.
    ///
    /// Returns `false`, after taking the hit back out of the counter, when the count
    /// reported by `update` already amounts to `limit` or more whole hits. Otherwise
    /// remembers `limit` as the last accepted limit and returns `true`.
    fn inc(&self, limit: u32) -> bool {
        let previous_hits = self.update(limit, self.granularity);
        if previous_hits / self.granularity >= i64::from(limit) {
            // Over budget: undo the hit that `update` just added.
            self.hit_count
                .fetch_sub(self.granularity, Ordering::Acquire);
            return false;
        }
        self.last_limit.store(limit, Ordering::Relaxed);
        true
    }

    /// Returns the hit counter as a fraction of `last_limit * granularity`, clamped to
    /// `0.0..=1.0`. The counter is read as-is; no time-based refill is applied here.
    ///
    /// A limiter with no hits and no accepted limit yet reports `0.0`.
    fn rate(&self) -> f64 {
        let last_limit = self.last_limit.load(Ordering::Relaxed);
        let hit_count = self.hit_count.load(Ordering::Relaxed);
        // Multiply in f64: the i64 product overflows for large limits combined with
        // multi-second granularities.
        let capacity = f64::from(last_limit) * self.granularity as f64;
        let ratio = hit_count as f64 / capacity;
        if ratio.is_nan() {
            // 0 / 0: nothing recorded and no limit accepted yet.
            0.0
        } else {
            ratio.clamp(0., 1.)
        }
    }

    /// Runs `update` with a limit of `0` and then reports `rate()`.
    // NOTE(review): `update` is called with `inc = granularity`, so this call itself is
    // counted as one hit - confirm that this is intended.
    fn update_rate(&self) -> f64 {
        self.update(0, self.granularity);
        self.rate()
    }
}
#[cfg(test)]
mod tests {
    use crate::rate_limiter::{Limiter, LocalLimiter, TIME_PER_SECOND};
    use std::sync::atomic::Ordering;
    use std::thread::sleep;
    use std::time::Duration;

    /// Drives a one-second `LocalLimiter` through fill-up, rejection, time-based refill
    /// and a change of limit. Elapsed seconds are simulated by moving `last_update`
    /// backwards instead of sleeping.
    #[test]
    #[cfg_attr(miri, ignore)]
    fn test_rate_limiter() {
        let limiter = LocalLimiter::default();
        // Short real-time gap between consecutive hits.
        let pause = || sleep(Duration::from_micros(100));
        // Pretends that `seconds` more seconds have passed since the last update.
        let rewind = |seconds: u64| {
            limiter
                .last_update
                .fetch_sub(seconds * TIME_PER_SECOND as u64, Ordering::Relaxed);
        };

        // Fill the budget of two hits.
        assert!(limiter.inc(2));
        let rate = limiter.rate();
        assert!(rate > 0.49999 && rate <= 0.5);
        pause();
        assert!(limiter.inc(2));
        let rate = limiter.rate();
        assert!(rate > 0.5 && rate < 1.);
        pause();
        assert!(limiter.inc(2));
        assert_eq!(1., limiter.rate());
        pause();

        // The budget is used up: further hits are rejected.
        assert!(!limiter.inc(2));
        pause();
        assert!(!limiter.inc(2));
        pause();

        // Three simulated seconds later the budget is available again.
        rewind(3);
        assert!(limiter.inc(2));
        let rate = limiter.rate();
        assert!(rate > 0.49999 && rate <= 0.5);
        pause();
        assert!(limiter.inc(2));
        pause();
        assert!(limiter.inc(2));
        pause();
        assert!(!limiter.inc(2));
        pause();

        // A higher limit admits one more hit; after that both limits are exhausted.
        assert!(limiter.inc(3));
        pause();
        assert!(!limiter.inc(3));
        assert!(!limiter.inc(1));

        // Two simulated seconds later a hit at limit 1 is admitted again.
        rewind(2);
        assert!(limiter.inc(1));
    }
}