use std::time::Duration;
use crate::Algorithm;
/// Additive-increase / multiplicative-decrease (AIMD) estimator for a concurrency limit.
///
/// The estimate grows by one on a successful sample taken under load and is multiplied
/// by `backoff_ratio` on an error or a timed-out sample. Values are created through
/// [`Aimd::builder`] or [`Default`].
#[derive(Debug, Clone)]
pub struct Aimd {
    /// Current limit estimate. Stored as a float so that the fractional part produced
    /// by a multiplicative backoff is carried over to the next update.
    estimated_limit: f64,
    /// Lower bound applied to the estimate and to the reported concurrency.
    min_limit: usize,
    /// Upper bound applied to the estimate and to the reported concurrency.
    max_limit: usize,
    /// Factor the estimate is multiplied by when a sample is an error or a timeout.
    backoff_ratio: f64,
    /// A sample whose round-trip time is strictly greater than this is treated like an error.
    timeout: Duration,
}
impl Aimd {
pub fn builder() -> AimdBuilder {
AimdBuilder::default()
}
}
impl Default for Aimd {
    /// Builds an `Aimd` from an unmodified default [`AimdBuilder`].
    fn default() -> Self {
        let builder = AimdBuilder::default();
        builder.build()
    }
}
impl Algorithm for Aimd {
    /// Returns the current estimate as an integer within `[min_limit, max_limit]`.
    ///
    /// The float estimate is converted with an `as` cast, which truncates toward zero
    /// (an estimate of 9.9 reports 9), and the result is then clamped to the bounds.
    fn max_concurrency(&self) -> usize {
        (self.estimated_limit as usize).clamp(self.min_limit, self.max_limit)
    }

    /// Feeds one sample into the estimator.
    ///
    /// * `rtt` - round-trip time of the sample; a value strictly greater than the
    ///   configured timeout is handled exactly like an error.
    /// * `num_inflight` - compared, doubled, against the truncated estimate to decide
    ///   whether the limit is being used enough to grow (presumably the number of
    ///   requests in flight for this sample; confirm against the caller).
    /// * `is_error` - when set, the estimate is multiplied by `backoff_ratio`.
    /// * `is_canceled` - when set, the sample is ignored entirely, even if `is_error`
    ///   is also set.
    ///
    /// A successful sample grows the estimate by one only when
    /// `num_inflight * 2 >= estimate`; otherwise the estimate is left untouched.
    /// Every new estimate is clamped to `[min_limit, max_limit]`.
    fn update(&mut self, rtt: Duration, num_inflight: usize, is_error: bool, is_canceled: bool) {
        if is_canceled {
            return;
        }

        let limit = self.estimated_limit;
        let new_limit = if is_error || rtt > self.timeout {
            // Multiplicative decrease.
            limit * self.backoff_ratio
        } else if num_inflight.saturating_mul(2) >= limit as usize {
            // Additive increase. `saturating_mul` keeps a huge `num_inflight` from
            // overflowing: a plain `* 2` would panic in debug builds and wrap in
            // release builds, where the wrapped value could suppress the increase.
            limit + 1.0
        } else {
            // Lightly loaded: no evidence that a higher limit is needed.
            return;
        };

        self.estimated_limit = new_limit.clamp(self.min_limit as f64, self.max_limit as f64);
    }
}
/// Builder for [`Aimd`].
///
/// Start from [`Aimd::builder`] (or `AimdBuilder::default()`), override settings with
/// the chained setter methods, then call `build`. The settings are validated only in
/// `build`, not in the individual setters.
#[derive(Debug, Clone)]
pub struct AimdBuilder {
    /// Estimate the built `Aimd` starts from; must lie in `[min_limit, max_limit]`.
    initial_limit: usize,
    /// Lower bound for the estimate; must be greater than zero.
    min_limit: usize,
    /// Upper bound for the estimate; must not be below `min_limit`.
    max_limit: usize,
    /// Multiplier applied on error or timeout; must lie in `[0.5, 1.0)`.
    backoff_ratio: f64,
    /// Round-trip time above which a sample is treated like an error.
    timeout: Duration,
}
impl Default for AimdBuilder {
fn default() -> Self {
Self {
initial_limit: 20,
min_limit: 20,
max_limit: 200,
backoff_ratio: 0.9,
timeout: Duration::from_secs(5),
}
}
}
impl AimdBuilder {
    /// Sets the estimate the built [`Aimd`] starts from.
    pub fn initial_limit(self, limit: usize) -> Self {
        Self {
            initial_limit: limit,
            ..self
        }
    }

    /// Sets the lower bound for the estimate.
    pub fn min_limit(self, limit: usize) -> Self {
        Self {
            min_limit: limit,
            ..self
        }
    }

    /// Sets the upper bound for the estimate.
    pub fn max_limit(self, limit: usize) -> Self {
        Self {
            max_limit: limit,
            ..self
        }
    }

    /// Sets the multiplier applied to the estimate on error or timeout.
    pub fn backoff_ratio(self, ratio: f64) -> Self {
        Self {
            backoff_ratio: ratio,
            ..self
        }
    }

    /// Sets the round-trip time above which a sample is treated like an error.
    pub fn timeout(self, timeout: Duration) -> Self {
        Self { timeout, ..self }
    }

    /// Validates the configuration and produces the [`Aimd`].
    ///
    /// # Panics
    ///
    /// Panics on the first of these checks that fails, in this order:
    ///
    /// 1. `backoff_ratio` is not in `[0.5, 1.0)` (this includes NaN),
    /// 2. `min_limit` is zero,
    /// 3. `min_limit` is greater than `max_limit`,
    /// 4. `initial_limit` is outside `[min_limit, max_limit]`.
    pub fn build(self) -> Aimd {
        let Self {
            initial_limit,
            min_limit,
            max_limit,
            backoff_ratio,
            timeout,
        } = self;

        assert!(
            (0.5..1.0).contains(&backoff_ratio),
            "backoff_ratio must be in [0.5, 1.0), got {}",
            backoff_ratio,
        );
        assert!(min_limit > 0, "min_limit must be > 0");
        assert!(
            min_limit <= max_limit,
            "min_limit ({}) must be <= max_limit ({})",
            min_limit,
            max_limit,
        );
        assert!(
            (min_limit..=max_limit).contains(&initial_limit),
            "initial_limit ({}) must be in [min_limit({}), max_limit({})]",
            initial_limit,
            min_limit,
            max_limit,
        );

        Aimd {
            estimated_limit: initial_limit as f64,
            min_limit,
            max_limit,
            backoff_ratio,
            timeout,
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Round-trip time well below every timeout configured in these tests.
    fn fast_rtt() -> Duration {
        Duration::from_millis(50)
    }

    /// Builder preset shared by most tests: initial limit 10, minimum 1, other
    /// settings left at their defaults.
    fn preset() -> AimdBuilder {
        Aimd::builder().initial_limit(10).min_limit(1)
    }

    // --- update / max_concurrency behaviour ---

    #[test]
    fn increase_limit_on_success_when_loaded() {
        let mut limiter = preset().build();
        limiter.update(fast_rtt(), 10, false, false);
        assert_eq!(limiter.max_concurrency(), 11);
    }

    #[test]
    fn no_increase_when_lightly_loaded() {
        let mut limiter = preset().build();
        limiter.update(fast_rtt(), 2, false, false);
        assert_eq!(limiter.max_concurrency(), 10);
    }

    #[test]
    fn decrease_limit_on_error() {
        let mut limiter = preset().build();
        limiter.update(fast_rtt(), 10, true, false);
        assert_eq!(limiter.max_concurrency(), 9);
    }

    #[test]
    fn decrease_limit_on_timeout() {
        let mut limiter = preset().timeout(Duration::from_secs(1)).build();
        // A 2s RTT exceeds the 1s timeout, so a successful sample still backs off.
        limiter.update(Duration::from_secs(2), 10, false, false);
        assert_eq!(limiter.max_concurrency(), 9);
    }

    #[test]
    fn canceled_requests_are_ignored() {
        let mut limiter = preset().build();
        // Cancellation wins over the error flag.
        limiter.update(fast_rtt(), 10, true, true);
        assert_eq!(limiter.max_concurrency(), 10);
    }

    #[test]
    fn limit_does_not_drop_below_min() {
        let mut limiter = Aimd::builder().initial_limit(5).min_limit(5).build();
        for _ in 0..100 {
            limiter.update(fast_rtt(), 10, true, false);
        }
        assert_eq!(limiter.max_concurrency(), 5);
    }

    #[test]
    fn limit_does_not_exceed_max() {
        let mut limiter = preset().max_limit(12).build();
        for _ in 0..100 {
            limiter.update(fast_rtt(), 10, false, false);
        }
        assert_eq!(limiter.max_concurrency(), 12);
    }

    #[test]
    fn custom_backoff_ratio() {
        let mut limiter = Aimd::builder()
            .initial_limit(100)
            .min_limit(1)
            .backoff_ratio(0.5)
            .build();
        limiter.update(fast_rtt(), 100, true, false);
        assert_eq!(limiter.max_concurrency(), 50);
    }

    // --- builder validation ---

    #[test]
    #[should_panic(expected = "backoff_ratio must be in [0.5, 1.0)")]
    fn rejects_backoff_ratio_too_low() {
        Aimd::builder().backoff_ratio(0.3).build();
    }

    #[test]
    #[should_panic(expected = "backoff_ratio must be in [0.5, 1.0)")]
    fn rejects_backoff_ratio_ge_one() {
        Aimd::builder().backoff_ratio(1.0).build();
    }

    #[test]
    #[should_panic(expected = "min_limit must be > 0")]
    fn rejects_zero_min_limit() {
        Aimd::builder().min_limit(0).build();
    }

    #[test]
    #[should_panic(expected = "min_limit (50) must be <= max_limit (10)")]
    fn rejects_min_exceeds_max() {
        Aimd::builder()
            .min_limit(50)
            .max_limit(10)
            .initial_limit(50)
            .build();
    }

    #[test]
    #[should_panic(expected = "initial_limit (5) must be in")]
    fn rejects_initial_below_min() {
        Aimd::builder()
            .initial_limit(5)
            .min_limit(10)
            .max_limit(100)
            .build();
    }
}