use std::{cmp, num::NonZeroU32};
use metrics::gauge;
use tracing::info;
use crate::target;
use super::{Clock, RealClock};
const INTERVAL_TICKS: u64 = 1_000_000;
#[derive(thiserror::Error, Debug, Clone, Copy)]
pub(crate) enum Error {
#[error("Requested capacity is greater than maximum allowed capacity")]
Capacity,
}
#[derive(Debug)]
pub(crate) struct Predictive<C = RealClock> {
last_tick: u64,
spare_capacity: u64,
maximum_capacity: u64,
requested_budget: u64,
projected_budget: u64,
interval: u64,
refill_per_tick: u64,
clock: C,
labels: Vec<(String, String)>,
}
impl<C> Predictive<C>
where
C: Clock + Send + Sync,
{
#[inline]
pub(crate) async fn wait(&mut self) -> Result<(), Error> {
let one = unsafe { NonZeroU32::new_unchecked(1_u32) };
self.wait_for(one).await
}
pub(crate) async fn wait_for(&mut self, request: NonZeroU32) -> Result<(), Error> {
gauge!(
"throttle_spare_capacity",
self.spare_capacity as f64,
&self.labels
);
gauge!(
"throttle_refills_per_tick",
self.refill_per_tick as f64,
&self.labels
);
gauge!(
"throttle_requested_budget",
self.requested_budget as f64,
&self.labels
);
gauge!(
"throttle_projected_budget",
self.projected_budget as f64,
&self.labels
);
loop {
if target::Meta::rss_bytes_limit_exceeded() {
info!("RSS byte limit exceeded, backing off...");
self.clock.wait(INTERVAL_TICKS).await;
} else {
break;
}
}
if u64::from(request.get()) > self.maximum_capacity {
return Err(Error::Capacity);
}
let ticks_since_start = self.clock.ticks_elapsed();
let ticks_since_last_wait = ticks_since_start.saturating_sub(self.last_tick);
self.last_tick = ticks_since_start;
let refilled_capacity: u64 = cmp::min(
ticks_since_last_wait
.saturating_mul(self.refill_per_tick)
.saturating_add(self.spare_capacity),
self.projected_budget,
);
let current_interval = ticks_since_start / INTERVAL_TICKS;
if current_interval == self.interval {
} else {
if self.requested_budget <= self.projected_budget {
let diff = self.projected_budget - self.requested_budget;
self.projected_budget -= diff / 8;
} else {
let diff = self.requested_budget - self.projected_budget;
self.projected_budget =
cmp::min(self.projected_budget + (diff / 4), self.maximum_capacity);
}
assert!(current_interval > self.interval);
self.interval = current_interval;
self.requested_budget = 0;
}
self.requested_budget = cmp::min(
self.requested_budget.wrapping_add(u64::from(request.get())),
self.maximum_capacity,
);
let capacity_request = u64::from(request.get());
if refilled_capacity > capacity_request {
self.spare_capacity = refilled_capacity - capacity_request;
} else {
self.spare_capacity = 0;
let slop = (capacity_request - refilled_capacity) / self.refill_per_tick;
self.clock.wait(slop).await;
}
Ok(())
}
pub(crate) fn with_clock(
maximum_capacity: NonZeroU32,
clock: C,
labels: Vec<(String, String)>,
) -> Self {
let refill_per_tick = cmp::max(1, u64::from(maximum_capacity.get()) / INTERVAL_TICKS);
Self {
last_tick: clock.ticks_elapsed(),
maximum_capacity: u64::from(maximum_capacity.get()),
refill_per_tick,
requested_budget: 0,
projected_budget: u64::from(maximum_capacity.get()),
interval: 0,
spare_capacity: 0,
clock,
labels,
}
}
#[cfg(test)]
fn maximum_capacity(&self) -> u64 {
self.maximum_capacity
}
#[cfg(test)]
fn requested_budget(&self) -> u64 {
self.requested_budget
}
#[cfg(test)]
fn projected_budget(&self) -> u64 {
self.projected_budget
}
#[cfg(test)]
fn interval(&self) -> u64 {
self.interval
}
}
#[cfg(test)]
mod test {
use std::{
num::NonZeroU32,
sync::Mutex,
task::{Context, Poll},
};
use async_trait::async_trait;
use futures::{task::noop_waker, Future, FutureExt};
use proptest::prelude::*;
use super::{Clock, Predictive};
#[derive(Debug)]
struct TestClockMeta {
idx: usize,
ticks_elapsed: u64,
}
#[derive(Debug)]
struct TestClock {
meta: Mutex<TestClockMeta>,
tick_progressions: Vec<u8>, }
impl TestClock {
fn new(tick_progressions: Vec<u8>) -> Self {
Self {
meta: Mutex::new(TestClockMeta {
idx: 0,
ticks_elapsed: 0,
}),
tick_progressions,
}
}
}
#[async_trait]
impl Clock for TestClock {
fn ticks_elapsed(&self) -> u64 {
self.meta.lock().unwrap().ticks_elapsed
}
async fn wait(&self, request: u64) {
let mut meta = self.meta.lock().unwrap();
let current_tick = self.tick_progressions[meta.idx];
meta.ticks_elapsed += u64::from(current_tick) + request;
meta.idx = (meta.idx + 1) % self.tick_progressions.len();
}
}
fn drive<T>(future: impl Future<Output = T>) -> T {
let waker = noop_waker();
let mut ctx = Context::from_waker(&waker);
let mut future = Box::pin(future);
loop {
match future.poll_unpin(&mut ctx) {
Poll::Pending => continue,
Poll::Ready(res) => return res,
}
}
}
fn ticks() -> impl Strategy<Value = Vec<u8>> {
prop::collection::vec(any::<u8>(), 1..5000)
}
fn nonempty_requests() -> impl Strategy<Value = Vec<u32>> {
let vals = any::<u32>().prop_filter("non-zero", |x| *x != 0_u32);
prop::collection::vec(vals, 1..10_000)
}
proptest! {
#![proptest_config(ProptestConfig {
cases: 1_000,
max_shrink_iters: 100_000,
.. ProptestConfig::default()
})]
#[test]
fn projected_budget_always_le_max(
tick_progressions in ticks(),
maximum_capacity in 1_u32..u32::MAX,
requests in nonempty_requests(),
) {
let maximum_capacity = NonZeroU32::new(maximum_capacity).unwrap();
let requests: Vec<NonZeroU32> = requests
.into_iter()
.filter(|x| *x > 0)
.map(|x| NonZeroU32::new(x).unwrap())
.collect();
let clock = TestClock::new(tick_progressions);
let mut throttle = Predictive::with_clock(maximum_capacity, clock, vec![]);
for request in requests {
assert!(throttle.maximum_capacity() >= throttle.projected_budget());
let _: Result<(), _> = drive(throttle.wait_for(request));
assert!(throttle.maximum_capacity() >= throttle.projected_budget());
}
}
}
proptest! {
#![proptest_config(ProptestConfig {
cases: 1_000,
max_shrink_iters: 100_000,
.. ProptestConfig::default()
})]
#[test]
fn requested_budget_always_le_max(
tick_progressions in ticks(),
maximum_capacity in 1_u32..u32::MAX,
requests in nonempty_requests(),
) {
let maximum_capacity = NonZeroU32::new(maximum_capacity).unwrap();
let requests: Vec<NonZeroU32> = requests
.into_iter()
.filter(|x| *x > 0)
.map(|x| NonZeroU32::new(x).unwrap())
.collect();
let clock = TestClock::new(tick_progressions);
let mut throttle = Predictive::with_clock(maximum_capacity, clock, vec![]);
for request in requests {
assert!(throttle.maximum_capacity() >= throttle.requested_budget());
let _: Result<(), _> = drive(throttle.wait_for(request));
assert!(throttle.maximum_capacity() >= throttle.requested_budget());
}
}
}
proptest! {
#![proptest_config(ProptestConfig {
cases: 1_000,
max_shrink_iters: 100_000,
.. ProptestConfig::default()
})]
#[test]
fn time_advances(
tick_progressions in ticks(),
maximum_capacity in 1_u32..u32::MAX,
requests in nonempty_requests(),
) {
let maximum_capacity = NonZeroU32::new(maximum_capacity).unwrap();
let requests: Vec<NonZeroU32> = requests
.into_iter()
.filter(|x| *x > 0)
.map(|x| NonZeroU32::new(x).unwrap())
.collect();
let clock = TestClock::new(tick_progressions);
let mut throttle = Predictive::with_clock(maximum_capacity, clock, vec![]);
let mut prev_interval = throttle.interval();
for request in requests {
let _: Result<(), _> = drive(throttle.wait_for(request));
assert!(throttle.interval() >= prev_interval);
prev_interval = throttle.interval();
}
}
}
}