use std::{
fmt::Debug,
ops::RangeInclusive,
sync::atomic::{AtomicUsize, Ordering},
time::Duration,
};
use async_trait::async_trait;
use tokio::sync::Mutex;
use crate::{limiter::Outcome, limits::defaults};
use super::{aimd::multiplicative_decrease, defaults::MIN_SAMPLE_LATENCY, LimitAlgorithm, Sample};
/// Delay-based congestion-avoidance limit algorithm, loosely modelled on TCP Vegas.
///
/// Estimates the number of queued jobs from how far each sample's latency sits
/// above the lowest latency observed so far, and nudges the concurrency limit
/// up or down to keep that estimate between the `alpha` and `beta` thresholds.
pub struct Vegas {
    /// Inclusive lower bound for the limit; always at least 1.
    min_limit: usize,
    /// Inclusive upper bound for the limit.
    max_limit: usize,
    /// Lower queued-jobs threshold as a function of the current limit:
    /// below it (at high utilisation) the limit is increased.
    alpha: Box<dyn (Fn(usize) -> f64) + Send + Sync>,
    /// Upper queued-jobs threshold as a function of the current limit:
    /// above it the limit is decreased.
    beta: Box<dyn (Fn(usize) -> f64) + Send + Sync>,
    /// Current concurrency limit.
    limit: AtomicUsize,
    /// Mutable state (minimum observed latency) guarded by an async lock.
    inner: Mutex<Inner>,
}
/// State guarded by `Vegas::inner`.
#[derive(Debug)]
struct Inner {
    // Lowest latency observed so far; starts at `Duration::MAX` ("no sample
    // yet") and only ever decreases. Serves as the queue-free baseline.
    base_latency: Duration,
}
impl Vegas {
const DEFAULT_ALPHA_MULTIPLIER: f64 = 3_f64;
const DEFAULT_BETA_MULTIPLIER: f64 = 6_f64;
const DEFAULT_DECREASE_FACTOR: f64 = 0.9;
const DEFAULT_INCREASE_MIN_UTILISATION: f64 = 0.8;
#[allow(missing_docs)]
pub fn new_with_initial_limit(initial_limit: usize) -> Self {
Self::new(
initial_limit,
defaults::DEFAULT_MIN_LIMIT..=defaults::DEFAULT_MAX_LIMIT,
)
}
#[allow(missing_docs)]
pub fn new(initial_limit: usize, limit_range: RangeInclusive<usize>) -> Self {
assert!(*limit_range.start() >= 1, "Limits must be at least 1");
assert!(
initial_limit >= *limit_range.start(),
"Initial limit less than minimum"
);
assert!(
initial_limit <= *limit_range.end(),
"Initial limit more than maximum"
);
Self {
limit: AtomicUsize::new(initial_limit),
min_limit: *limit_range.start(),
max_limit: *limit_range.end(),
alpha: Box::new(|limit| {
Self::DEFAULT_ALPHA_MULTIPLIER * (limit as f64).log10().max(1_f64)
}),
beta: Box::new(|limit| {
Self::DEFAULT_BETA_MULTIPLIER * (limit as f64).log10().max(1_f64)
}),
inner: Mutex::new(Inner {
base_latency: Duration::MAX,
}),
}
}
#[allow(missing_docs)]
pub fn with_max_limit(self, max: usize) -> Self {
assert!(max > 0);
Self {
max_limit: max,
..self
}
}
}
#[async_trait]
impl LimitAlgorithm for Vegas {
    /// Returns the current concurrency limit.
    fn limit(&self) -> usize {
        self.limit.load(Ordering::Acquire)
    }

    /// Folds one latency sample into the limit and returns the new limit.
    async fn update(&self, sample: Sample) -> usize {
        // Discard implausibly fast samples so they can't pollute the
        // base-latency estimate.
        if sample.latency < MIN_SAMPLE_LATENCY {
            return self.limit.load(Ordering::Acquire);
        }

        // tokio Mutex: held across the atomic update below, but never across an .await.
        let mut inner = self.inner.lock().await;

        // Track the lowest latency ever seen as the "no queueing" baseline.
        // NOTE(review): the baseline only ever decreases and is never aged out,
        // so a single unrealistically fast sample sticks forever — confirm intended.
        if sample.latency < inner.base_latency {
            inner.base_latency = sample.latency;
        }

        let update_limit = |limit: usize| {
            // Little's law: queued jobs ≈ throughput × extra latency, where
            // extra latency is how far this sample sits above the baseline.
            let actual_rate = sample.in_flight as f64 / sample.latency.as_secs_f64();
            let extra_latency = sample.latency.as_secs_f64() - inner.base_latency.as_secs_f64();
            let estimated_queued_jobs = actual_rate * extra_latency;

            let utilisation = sample.in_flight as f64 / limit as f64;

            // Additive step grows with the magnitude of the limit (always >= 1).
            let increment = limit.ilog10().max(1) as usize;

            let limit = if sample.outcome == Outcome::Overload {
                // Explicit overload signal: back off multiplicatively.
                multiplicative_decrease(limit, Self::DEFAULT_DECREASE_FACTOR)
            } else if estimated_queued_jobs > (self.beta)(limit) {
                // Too much queueing: additive decrease.
                limit - increment
            } else if estimated_queued_jobs < (self.alpha)(limit)
                && utilisation >= Self::DEFAULT_INCREASE_MIN_UTILISATION
            {
                // Little queueing and the limit is actually being used: additive increase.
                limit + increment
            } else {
                // Within the [alpha, beta] band, or under-utilised: hold steady.
                limit
            };

            // Always Some, so fetch_update below cannot return Err.
            Some(limit.clamp(self.min_limit, self.max_limit))
        };

        self.limit
            .fetch_update(Ordering::SeqCst, Ordering::SeqCst, update_limit)
            .expect("we always return Some(limit)");

        self.limit.load(Ordering::SeqCst)
    }
}
impl Debug for Vegas {
    /// Manual `Debug` impl: the `alpha`/`beta` closures are not themselves
    /// `Debug`, so each is shown by sampling it at a limit of 1.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut out = f.debug_struct("Vegas");
        out.field("limit", &self.limit);
        out.field("min_limit", &self.min_limit);
        out.field("max_limit", &self.max_limit);
        out.field("alpha(1)", &(self.alpha)(1));
        out.field("beta(1)", &(self.beta)(1));
        out.field("inner", &self.inner);
        out.finish()
    }
}
#[cfg(test)]
mod tests {
    use std::{collections::VecDeque, time::Duration};

    use itertools::Itertools;

    use crate::limiter::{DefaultLimiter, Limiter, Outcome};

    use super::*;

    /// End-to-end behaviour of `Vegas` through `DefaultLimiter`:
    /// steady latency at high utilisation raises the limit; a latency jump
    /// lowers it.
    #[tokio::test]
    async fn it_works() {
        static INIT_LIMIT: usize = 10;
        let vegas = Vegas::new_with_initial_limit(INIT_LIMIT);
        let limiter = DefaultLimiter::new(vegas);

        // Warm-up: 5 concurrent jobs at 25ms establish the base latency.
        let mut tokens = Vec::with_capacity(5);
        for _ in 0..5 {
            let token = limiter.try_acquire().await.unwrap();
            tokens.push(token);
        }
        for mut token in tokens {
            token.set_latency(Duration::from_millis(25));
            limiter.release(token, Some(Outcome::Success)).await;
        }

        // 9 concurrent jobs at the same latency => 90% utilisation with no
        // extra queueing, which should trigger an additive increase.
        let mut tokens = Vec::with_capacity(9);
        for _ in 0..9 {
            let token = limiter.try_acquire().await.unwrap();
            tokens.push(token);
        }
        for mut token in tokens {
            token.set_latency(Duration::from_millis(25));
            limiter.release(token, Some(Outcome::Success)).await;
        }
        let higher_limit = limiter.limit();
        assert!(
            higher_limit > INIT_LIMIT,
            "Steady latency + high concurrency => increase limit"
        );

        // 10x the latency => large estimated queue => decrease the limit.
        let mut tokens = Vec::with_capacity(10);
        for _ in 0..10 {
            let mut token = limiter.try_acquire().await.unwrap();
            token.set_latency(Duration::from_millis(250));
            tokens.push(token);
        }
        for token in tokens {
            limiter.release(token, Some(Outcome::Success)).await;
        }
        assert!(
            limiter.limit() < higher_limit,
            "Increased latency => decrease limit"
        );
    }

    /// Same behaviour as `it_works`, but with `Vegas` wrapped in a `Windowed`
    /// percentile aggregator: samples are batched (min 3 per window) before
    /// the limit is updated, so each phase re-acquires tokens as it releases
    /// them to keep concurrency high across window boundaries.
    #[tokio::test]
    async fn windowed() {
        use crate::aggregation::Percentile;
        use crate::limits::Windowed;

        static INIT_LIMIT: usize = 10;
        let vegas = Windowed::new(
            Vegas::new_with_initial_limit(INIT_LIMIT),
            Percentile::default(),
        )
        .with_min_samples(3)
        // Zero-length windows so the time constraint never delays an update.
        .with_min_window(Duration::ZERO)
        .with_max_window(Duration::ZERO);
        let limiter = DefaultLimiter::new(vegas);

        // Fill up to 9 in-flight tokens.
        let mut next_tokens = VecDeque::with_capacity(9);
        for _ in 0..9 {
            let token = limiter.try_acquire().await.unwrap();
            next_tokens.push_back(token);
        }

        // First pass at 25ms: release each token and immediately re-acquire,
        // keeping utilisation high while the window fills.
        let release_tokens = next_tokens.drain(0..).collect_vec();
        for mut token in release_tokens {
            token.set_latency(Duration::from_millis(25));
            limiter.release(token, Some(Outcome::Success)).await;
            let token = limiter.try_acquire().await.unwrap();
            next_tokens.push_back(token);
        }

        // Second pass at the same latency: should push the limit up.
        let release_tokens = next_tokens.drain(0..).collect_vec();
        for mut token in release_tokens {
            token.set_latency(Duration::from_millis(25));
            limiter.release(token, Some(Outcome::Success)).await;
            let token = limiter.try_acquire().await.unwrap();
            next_tokens.push_back(token);
        }
        let higher_limit = limiter.limit();
        assert!(
            higher_limit > INIT_LIMIT,
            "Steady latency + high concurrency => increase limit. Limit: {}",
            higher_limit
        );

        // Third pass at 1000ms: large latency jump should pull the limit down.
        let release_tokens = next_tokens.drain(0..).collect_vec();
        for mut token in release_tokens {
            token.set_latency(Duration::from_millis(1000));
            limiter.release(token, Some(Outcome::Success)).await;
            let token = limiter.try_acquire().await.unwrap();
            next_tokens.push_back(token);
        }
        let lower_limit = limiter.limit();
        assert!(
            lower_limit < higher_limit,
            "Increased latency => decrease limit. Limit: {}",
            lower_limit
        );
    }
}