use std::time::{Duration, Instant};
use throttle_core::{SystemClock, Throttle as Inner};
pub use throttle_core::ThrottleError;
use crate::runtime::metrics;
/// Named throttle that wraps a `throttle_core` throttle.
///
/// The wrapper adds a `name`, which is used as the label value for the
/// `throttle_wait` metric recorded by `acquire`.
pub struct Throttle {
/// Label value passed to `metrics::throttle_wait()` for this throttle.
name: &'static str,
/// Underlying `throttle_core` throttle, parameterised with `SystemClock`.
inner: Inner<SystemClock>,
}
impl Throttle {
    /// Creates a throttle that reports its wait metric under the `"default"` label.
    ///
    /// Equivalent to `Throttle::with_name("default", capacity, refill_per_sec)`.
    pub fn new(capacity: u64, refill_per_sec: u64) -> Self {
        Self::with_name("default", capacity, refill_per_sec)
    }

    /// Creates a throttle whose wait metric is labelled with `name`.
    ///
    /// `capacity` and `refill_per_sec` are forwarded unchanged to the
    /// underlying `throttle_core` constructor.
    pub fn with_name(name: &'static str, capacity: u64, refill_per_sec: u64) -> Self {
        // Look the labelled series up once and discard the handle.
        // NOTE(review): presumably this makes the series exist before the
        // first wait is recorded — confirm against the metrics backend.
        let _ = metrics::throttle_wait().with_label_values(&[name]);
        let inner = Inner::new(capacity, refill_per_sec);
        Self { name, inner }
    }

    /// Returns the capacity reported by the underlying throttle.
    pub fn capacity(&self) -> u64 {
        self.inner.capacity()
    }

    /// Returns the per-second refill rate reported by the underlying throttle.
    pub fn refill_per_sec(&self) -> u64 {
        self.inner.refill_per_sec()
    }

    /// Returns the number of tokens the underlying throttle currently reports
    /// as available.
    pub fn available(&self) -> u64 {
        self.inner.available()
    }

    /// Forwards a request for `n` tokens to the underlying throttle's
    /// `try_acquire` and returns its result.
    ///
    /// This wrapper does not sleep or record any metric on this path.
    pub fn try_acquire(&self, n: u64) -> bool {
        self.inner.try_acquire(n)
    }

    /// Waits until `n` tokens have been acquired from the underlying throttle.
    ///
    /// Returns immediately, without recording a metric, when `n` is zero or
    /// when the first `try_acquire(n)` succeeds. Otherwise the call sleeps and
    /// retries until an attempt succeeds, then records the total elapsed time
    /// (in seconds, measured from just before the first attempt) on the
    /// `throttle_wait` metric labelled with this throttle's name.
    ///
    /// Each sleep is the time the current shortfall (at least one token) would
    /// take to refill at `refill_per_sec`, bounded to between 1 ms and 1 s.
    ///
    /// # Panics
    ///
    /// Panics if `n` exceeds the throttle's capacity, or if the first attempt
    /// fails and the refill rate is zero.
    pub async fn acquire(&self, n: u64) {
        const NANOS_PER_SEC: u128 = 1_000_000_000;
        // Shortest and longest single sleep between retries.
        const MIN_PAUSE_NANOS: u128 = 1_000_000;
        const MAX_PAUSE_NANOS: u128 = 1_000_000_000;

        /// Time for `shortfall` tokens to refill at `refill` tokens per
        /// second, bounded to the pause limits above. `refill` must be
        /// non-zero.
        fn pause_for(shortfall: u64, refill: u64) -> Duration {
            let estimate =
                u128::from(shortfall).saturating_mul(NANOS_PER_SEC) / u128::from(refill);
            let bounded = estimate.max(MIN_PAUSE_NANOS).min(MAX_PAUSE_NANOS);
            Duration::from_nanos(u64::try_from(bounded).unwrap_or(u64::MAX))
        }

        let capacity = self.inner.capacity();
        assert!(
            n <= capacity,
            "throttle: requested {n} tokens > capacity {capacity}",
        );
        if n == 0 {
            return;
        }

        let started = Instant::now();
        if self.inner.try_acquire(n) {
            // Fast path: nothing was waited for, so nothing is recorded.
            return;
        }

        // Only checked once waiting is actually required.
        let refill = self.inner.refill_per_sec();
        assert!(
            refill > 0,
            "throttle: zero refill rate cannot satisfy acquire({n})",
        );

        loop {
            // Always wait for at least one token's worth of refill, even if
            // `available` already reports enough.
            let shortfall = n.saturating_sub(self.inner.available()).max(1);
            tokio::time::sleep(pause_for(shortfall, refill)).await;
            if self.inner.try_acquire(n) {
                break;
            }
        }

        let waited_secs = started.elapsed().as_secs_f64();
        metrics::throttle_wait()
            .with_label_values(&[self.name])
            .observe(waited_secs);
    }
}
/// Debug output lists the throttle's name together with the capacity, refill
/// rate and available count read from the underlying throttle at format time.
/// The output is marked non-exhaustive because `inner` itself is not printed.
impl std::fmt::Debug for Throttle {
    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut out = formatter.debug_struct("Throttle");
        out.field("name", &self.name);
        out.field("capacity", &self.capacity());
        out.field("refill_per_sec", &self.refill_per_sec());
        out.field("available", &self.available());
        out.finish_non_exhaustive()
    }
}