use std::time::Duration;
use std::time::Instant;
/// Factor by which the window is multiplied each time autotuning grows it.
const WINDOW_INCREASE_FACTOR: u64 = 2;

/// Autotuning grows the window when the time since the last limit update is
/// shorter than the RTT multiplied by this factor.
const WINDOW_TRIGGER_FACTOR: u32 = 2;

/// Flow-control accounting: tracks how much data has been consumed against
/// the current limit and decides when, and by how much, to raise that limit.
#[derive(Default, Debug)]
pub struct FlowControl {
    /// Running total of everything reported through `add_consumed`.
    consumed: u64,

    /// The current flow-control limit.
    max_data: u64,

    /// Window size; the next limit is placed this far beyond `consumed`.
    window: u64,

    /// Upper bound that `window` is never allowed to exceed.
    max_window: u64,

    /// Time of the most recent `update_max_data` call, or `None` if the
    /// limit has never been updated.
    last_update: Option<Instant>,
}

impl FlowControl {
    /// Creates a flow controller with the given initial limit and window.
    ///
    /// The initial `window` is clamped to `max_window`. Nothing has been
    /// consumed yet and no limit update has been recorded.
    pub fn new(max_data: u64, window: u64, max_window: u64) -> Self {
        Self {
            max_data,
            window: window.min(max_window),
            max_window,
            ..Self::default()
        }
    }

    /// Returns the current window size.
    pub fn window(&self) -> u64 {
        self.window
    }

    /// Returns the current flow-control limit.
    pub fn max_data(&self) -> u64 {
        self.max_data
    }

    /// Returns the running total of consumed data (test builds only).
    #[cfg(test)]
    pub fn consumed(&self) -> u64 {
        self.consumed
    }

    /// Adds `consumed` to the running total.
    ///
    /// The total saturates at `u64::MAX` rather than overflowing.
    pub fn add_consumed(&mut self, consumed: u64) {
        self.consumed = self.consumed.saturating_add(consumed);
    }

    /// Returns `true` when the remaining allowance (`max_data - consumed`)
    /// has dropped below half of the window, i.e. the limit should be raised.
    ///
    /// If more has been consumed than the limit allows, the remaining
    /// allowance is treated as zero instead of underflowing.
    pub fn should_update_max_data(&self) -> bool {
        let available_window = self.max_data.saturating_sub(self.consumed);

        available_window < self.window / 2
    }

    /// Returns the limit that `update_max_data` would install right now:
    /// `consumed + window`, saturating at `u64::MAX`.
    pub fn max_data_next(&self) -> u64 {
        self.consumed.saturating_add(self.window)
    }

    /// Raises the limit to `max_data_next()` and records `now` as the time
    /// of the update.
    pub fn update_max_data(&mut self, now: Instant) {
        self.max_data = self.max_data_next();
        self.last_update = Some(now);
    }

    /// Grows the window by `WINDOW_INCREASE_FACTOR` (capped at the maximum
    /// window) when less than `rtt * WINDOW_TRIGGER_FACTOR` has elapsed since
    /// the last limit update.
    ///
    /// Does nothing if the limit has never been updated. All arithmetic
    /// saturates, so extreme `rtt` or window values cannot cause a panic.
    pub fn autotune_window(&mut self, now: Instant, rtt: Duration) {
        if let Some(last_update) = self.last_update {
            // A `now` earlier than `last_update` counts as zero elapsed time.
            let elapsed = now.saturating_duration_since(last_update);
            let threshold = rtt.saturating_mul(WINDOW_TRIGGER_FACTOR);

            if elapsed < threshold {
                // Saturate before clamping: a window above `u64::MAX / 2`
                // would otherwise overflow even though the result is capped.
                self.set_window(self.window.saturating_mul(WINDOW_INCREASE_FACTOR));
            }
        }
    }

    /// Sets the window to `window`, clamped to the maximum window.
    fn set_window(&mut self, window: u64) {
        self.window = window.min(self.max_window);
    }

    /// Sets the window (clamped to the maximum window), but only while no
    /// limit update has been recorded yet; afterwards this is a no-op.
    pub fn set_window_if_not_tuned_yet(&mut self, window: u64) {
        if self.last_update.is_none() {
            self.set_window(window);
        }
    }

    /// Raises the window to `min_window` (clamped to the maximum window) if
    /// it is currently smaller; never shrinks the window.
    pub fn ensure_window_lower_bound(&mut self, min_window: u64) {
        if min_window > self.window {
            self.set_window(min_window);
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// `new` clamps the initial window to the maximum window.
    #[test]
    fn max_window_in_new() {
        let fc = FlowControl::new(100, 100, 50);

        assert_eq!(fc.max_data(), 100);
        assert_eq!(fc.window(), 50);
    }

    /// The initial limit is reported unchanged.
    #[test]
    fn max_data() {
        let fc = FlowControl::new(100, 20, 100);

        assert_eq!(fc.max_data(), 100);
    }

    /// An update is requested only once less than half a window remains.
    #[test]
    fn should_update_max_data() {
        let mut fc = FlowControl::new(100, 20, 100);

        // 15 remaining, half window is 10: no update needed yet.
        fc.add_consumed(85);
        assert!(!fc.should_update_max_data());

        // 5 remaining, below half the window: update needed.
        fc.add_consumed(10);
        assert!(fc.should_update_max_data());
    }

    /// The next limit is the consumed total plus the window.
    #[test]
    fn max_data_next() {
        let mut fc = FlowControl::new(100, 20, 100);

        let consumed = 95;

        fc.add_consumed(consumed);
        assert!(fc.should_update_max_data());
        assert_eq!(fc.max_data_next(), consumed + 20);
    }

    /// Updating installs exactly the value `max_data_next` reported.
    #[test]
    fn update_max_data() {
        let mut fc = FlowControl::new(100, 20, 100);

        let consumed = 95;

        fc.add_consumed(consumed);
        assert!(fc.should_update_max_data());

        let max_data_next = fc.max_data_next();
        assert_eq!(max_data_next, consumed + 20);

        fc.update_max_data(Instant::now());
        assert_eq!(fc.max_data(), max_data_next);
    }

    /// A quick follow-up after a limit update doubles the window.
    #[test]
    fn autotune_window() {
        let w = 20;
        let mut fc = FlowControl::new(100, w, 100);

        let consumed = 95;

        fc.add_consumed(consumed);
        assert!(fc.should_update_max_data());

        let max_data_next = fc.max_data_next();
        assert_eq!(max_data_next, consumed + w);

        // Use one captured timestamp so the outcome does not depend on how
        // much wall-clock time passes between two `Instant::now()` calls.
        let now = Instant::now();

        fc.update_max_data(now);
        assert_eq!(fc.max_data(), max_data_next);

        // Zero time elapsed is below 2 * RTT, so the window doubles.
        fc.autotune_window(now, Duration::from_millis(100));

        let w = w * 2;
        assert_eq!(fc.window(), w);

        let consumed_inc = 15;

        fc.add_consumed(consumed_inc);
        assert!(fc.should_update_max_data());

        let max_data_next = fc.max_data_next();
        assert_eq!(max_data_next, consumed + consumed_inc + w);
    }

    /// The window is left alone when no update has happened yet or when the
    /// last update is at least 2 * RTT in the past.
    #[test]
    fn autotune_window_not_triggered() {
        let w = 20;
        let rtt = Duration::from_millis(100);
        let mut fc = FlowControl::new(100, w, 100);

        let now = Instant::now();

        // No limit update recorded yet.
        fc.autotune_window(now, rtt);
        assert_eq!(fc.window(), w);

        // Exactly 2 * RTT elapsed is not "less than" the trigger threshold.
        fc.update_max_data(now);
        fc.autotune_window(now + rtt * 2, rtt);
        assert_eq!(fc.window(), w);
    }

    /// The window can be set freely (up to the maximum) until the first
    /// limit update, and not afterwards.
    #[test]
    fn set_window_if_not_tuned_yet() {
        let mut fc = FlowControl::new(100, 20, 100);

        fc.set_window_if_not_tuned_yet(40);
        assert_eq!(fc.window(), 40);

        fc.set_window_if_not_tuned_yet(200);
        assert_eq!(fc.window(), 100);

        fc.update_max_data(Instant::now());

        fc.set_window_if_not_tuned_yet(30);
        assert_eq!(fc.window(), 100);
    }

    /// The lower bound only ever raises the window and respects the maximum.
    #[test]
    fn ensure_window_lower_bound() {
        let w = 20;
        let mut fc = FlowControl::new(100, w, 100);

        // Window doesn't change.
        fc.ensure_window_lower_bound(w);
        assert_eq!(fc.window(), 20);

        // Window changes.
        fc.ensure_window_lower_bound(w * 2);
        assert_eq!(fc.window(), 40);

        // Window is clamped to the maximum.
        fc.ensure_window_lower_bound(101);
        assert_eq!(fc.window(), 100);
    }
}