use std::{cmp, num::NonZeroU32};
use metrics::gauge;
use super::{Clock, RealClock};
const INTERVAL_TICKS: u64 = 1_000_000;
#[derive(thiserror::Error, Debug, Clone, Copy)]
pub(crate) enum Error {
#[error("Capacity")]
Capacity,
}
#[derive(Debug)]
pub(crate) struct Stable<C = RealClock> {
last_tick: u64,
spare_capacity: u64,
maximum_capacity: u64,
refill_per_tick: u64,
clock: C,
labels: Vec<(String, String)>,
}
impl<C> Stable<C>
where
C: Clock + Send + Sync,
{
#[inline]
pub(crate) async fn wait(&mut self) -> Result<(), Error> {
let one = unsafe { NonZeroU32::new_unchecked(1_u32) };
self.wait_for(one).await
}
pub(crate) async fn wait_for(&mut self, request: NonZeroU32) -> Result<(), Error> {
gauge!(
"throttle_refills_per_tick",
self.refill_per_tick as f64,
&self.labels
);
if u64::from(request.get()) > self.maximum_capacity {
return Err(Error::Capacity);
}
let ticks_since_start = self.clock.ticks_elapsed();
let ticks_since_last_wait = ticks_since_start.saturating_sub(self.last_tick);
self.last_tick = ticks_since_start;
let refilled_capacity: u64 = cmp::min(
ticks_since_last_wait
.saturating_mul(self.refill_per_tick)
.saturating_add(self.spare_capacity),
self.maximum_capacity,
);
let capacity_request = u64::from(request.get());
if refilled_capacity > capacity_request {
self.spare_capacity = refilled_capacity - capacity_request;
} else {
self.spare_capacity = 0;
let slop = (capacity_request - refilled_capacity) / self.refill_per_tick;
self.clock.wait(slop).await;
}
Ok(())
}
pub(crate) fn with_clock(
maximum_capacity: NonZeroU32,
clock: C,
labels: Vec<(String, String)>,
) -> Self {
let refill_per_tick = cmp::max(1, u64::from(maximum_capacity.get()) / INTERVAL_TICKS);
Self {
last_tick: clock.ticks_elapsed(),
maximum_capacity: u64::from(maximum_capacity.get()),
refill_per_tick,
spare_capacity: 0,
clock,
labels,
}
}
}