use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::time::{Duration, Instant};
const MIN_PACING_RATE: u64 = 1_024;
const MAX_BURST_BYTES: u64 = 64 * 1_024;
pub struct Pacer {
tokens: AtomicU64,
rate_bps: AtomicU64,
max_burst: u64,
last_refill_ns: AtomicU64,
enabled: AtomicBool,
epoch: Instant,
}
impl Pacer {
pub fn new(initial_rate_bps: u64) -> Self {
let rate = initial_rate_bps.max(MIN_PACING_RATE);
let epoch = Instant::now();
Self {
tokens: AtomicU64::new(MAX_BURST_BYTES),
rate_bps: AtomicU64::new(rate),
max_burst: MAX_BURST_BYTES,
last_refill_ns: AtomicU64::new(0),
enabled: AtomicBool::new(true),
epoch,
}
}
pub fn unlimited() -> Self {
let pacer = Self::new(u64::MAX);
pacer.enabled.store(false, Ordering::Relaxed);
pacer
}
pub fn try_consume(&self, bytes: u64) -> bool {
if !self.enabled.load(Ordering::Relaxed) {
return true;
}
self.refill_tokens();
let current = self.tokens.load(Ordering::Relaxed);
if current >= bytes {
self.tokens.fetch_sub(bytes, Ordering::Relaxed);
true
} else {
false
}
}
pub fn time_until_available(&self, bytes: u64) -> Duration {
if !self.enabled.load(Ordering::Relaxed) {
return Duration::ZERO;
}
self.refill_tokens();
let current = self.tokens.load(Ordering::Relaxed);
if current >= bytes {
return Duration::ZERO;
}
let deficit = bytes - current;
let rate = self.rate_bps.load(Ordering::Relaxed).max(1);
Duration::from_nanos(deficit * 1_000_000_000 / rate)
}
pub fn set_rate(&self, rate_bps: u64) {
let rate = rate_bps.max(MIN_PACING_RATE);
self.rate_bps.store(rate, Ordering::Relaxed);
}
pub fn rate(&self) -> u64 {
self.rate_bps.load(Ordering::Relaxed)
}
pub fn set_enabled(&self, enabled: bool) {
self.enabled.store(enabled, Ordering::Relaxed);
}
pub fn is_enabled(&self) -> bool {
self.enabled.load(Ordering::Relaxed)
}
pub fn available_tokens(&self) -> u64 {
self.refill_tokens();
self.tokens.load(Ordering::Relaxed)
}
fn refill_tokens(&self) {
let now_ns = self.epoch.elapsed().as_nanos() as u64;
let last_ns = self.last_refill_ns.load(Ordering::Relaxed);
let elapsed_ns = now_ns.saturating_sub(last_ns);
if elapsed_ns == 0 {
return;
}
let rate = self.rate_bps.load(Ordering::Relaxed);
let new_tokens = (rate as u128 * elapsed_ns as u128 / 1_000_000_000) as u64;
if new_tokens > 0 {
loop {
let current = self.tokens.load(Ordering::Relaxed);
let updated = (current + new_tokens).min(self.max_burst);
if self
.tokens
.compare_exchange_weak(current, updated, Ordering::Relaxed, Ordering::Relaxed)
.is_ok()
{
break;
}
}
self.last_refill_ns.store(now_ns, Ordering::Relaxed);
}
}
}
impl std::fmt::Debug for Pacer {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Pacer")
.field("rate_bps", &self.rate_bps.load(Ordering::Relaxed))
.field("tokens", &self.tokens.load(Ordering::Relaxed))
.field("enabled", &self.enabled.load(Ordering::Relaxed))
.finish()
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::thread;
#[test]
fn test_pacer_allows_burst() {
let pacer = Pacer::new(1_000_000); assert!(pacer.try_consume(1400)); assert!(pacer.try_consume(1400)); }
#[test]
fn test_pacer_blocks_when_empty() {
let pacer = Pacer::new(1_024); assert!(pacer.try_consume(MAX_BURST_BYTES));
assert!(!pacer.try_consume(1));
}
#[test]
fn test_pacer_refills_over_time() {
let pacer = Pacer::new(100_000); assert!(pacer.try_consume(MAX_BURST_BYTES));
assert!(!pacer.try_consume(1));
thread::sleep(Duration::from_millis(50));
let available = pacer.available_tokens();
assert!(
available > 0,
"expected tokens after sleep, got {}",
available
);
}
#[test]
fn test_pacer_rate_update() {
let pacer = Pacer::new(1_000_000);
assert_eq!(pacer.rate(), 1_000_000);
pacer.set_rate(2_000_000);
assert_eq!(pacer.rate(), 2_000_000);
}
#[test]
fn test_pacer_min_rate() {
let pacer = Pacer::new(0); assert_eq!(pacer.rate(), MIN_PACING_RATE);
}
#[test]
fn test_pacer_unlimited() {
let pacer = Pacer::unlimited();
assert!(!pacer.is_enabled());
assert!(pacer.try_consume(u64::MAX));
assert_eq!(pacer.time_until_available(1), Duration::ZERO);
}
#[test]
fn test_pacer_time_until_available() {
let pacer = Pacer::new(1_000_000); pacer.try_consume(MAX_BURST_BYTES);
let wait = pacer.time_until_available(10_000);
assert!(wait > Duration::ZERO);
assert!(wait < Duration::from_millis(50), "wait was {:?}", wait);
}
}