#![allow(dead_code)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FlowControlState {
Open,
Throttled,
Blocked,
SlowStart,
}
impl FlowControlState {
#[must_use]
pub fn is_throttled(self) -> bool {
matches!(self, Self::Throttled | Self::Blocked)
}
#[must_use]
pub fn is_blocked(self) -> bool {
self == Self::Blocked
}
}
#[derive(Debug, Clone)]
pub struct CongestionWindow {
max_window: u32,
window: u32,
in_flight: u32,
ssthresh: u32,
}
impl CongestionWindow {
#[must_use]
pub fn new(initial: u32, max_window: u32) -> Self {
Self {
max_window,
window: initial,
in_flight: 0,
ssthresh: max_window / 2,
}
}
#[must_use]
pub fn window_size(&self) -> u32 {
self.window
}
#[must_use]
pub fn in_flight(&self) -> u32 {
self.in_flight
}
#[must_use]
pub fn available(&self) -> u32 {
self.window.saturating_sub(self.in_flight)
}
#[must_use]
pub fn can_send(&self) -> bool {
self.available() > 0
}
pub fn on_send(&mut self, n: u32) {
self.in_flight = self.in_flight.saturating_add(n);
}
pub fn on_ack(&mut self, n: u32) {
self.in_flight = self.in_flight.saturating_sub(n);
if self.window < self.ssthresh {
self.window = (self.window + n).min(self.max_window);
} else {
self.window = (self.window + 1).min(self.max_window);
}
}
pub fn on_loss(&mut self) {
self.ssthresh = (self.window / 2).max(2);
self.window = self.ssthresh;
self.in_flight = self.in_flight.min(self.window);
}
}
#[derive(Debug, Clone)]
pub struct FlowController {
state: FlowControlState,
cwindow: CongestionWindow,
nack_count: u32,
nack_limit: u32,
}
impl FlowController {
#[must_use]
pub fn new(initial_window: u32, max_window: u32, nack_limit: u32) -> Self {
Self {
state: FlowControlState::SlowStart,
cwindow: CongestionWindow::new(initial_window, max_window),
nack_count: 0,
nack_limit,
}
}
#[must_use]
pub fn state(&self) -> FlowControlState {
self.state
}
#[must_use]
pub fn current_window_size(&self) -> u32 {
self.cwindow.window_size()
}
#[must_use]
pub fn can_send(&self) -> bool {
!self.state.is_blocked() && self.cwindow.can_send()
}
pub fn ack(&mut self, n: u32) {
self.cwindow.on_ack(n);
self.nack_count = self.nack_count.saturating_sub(1);
if self.state == FlowControlState::Blocked && self.cwindow.can_send() {
self.state = FlowControlState::Throttled;
} else if self.state == FlowControlState::Throttled && self.nack_count == 0 {
self.state = FlowControlState::Open;
} else if self.state == FlowControlState::SlowStart && self.cwindow.can_send() {
self.state = FlowControlState::Open;
}
}
pub fn nack(&mut self) {
self.cwindow.on_loss();
self.nack_count += 1;
if self.nack_count >= self.nack_limit {
self.state = FlowControlState::Blocked;
} else {
self.state = FlowControlState::Throttled;
}
}
pub fn reset(&mut self) {
self.nack_count = 0;
self.state = FlowControlState::SlowStart;
self.cwindow.in_flight = 0;
self.cwindow.window = 1;
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_state_open_not_throttled() {
assert!(!FlowControlState::Open.is_throttled());
}
#[test]
fn test_state_throttled_is_throttled() {
assert!(FlowControlState::Throttled.is_throttled());
}
#[test]
fn test_state_blocked_is_throttled() {
assert!(FlowControlState::Blocked.is_throttled());
}
#[test]
fn test_state_blocked_is_blocked() {
assert!(FlowControlState::Blocked.is_blocked());
}
#[test]
fn test_state_open_not_blocked() {
assert!(!FlowControlState::Open.is_blocked());
}
#[test]
fn test_window_can_send_initial() {
let w = CongestionWindow::new(4, 64);
assert!(w.can_send());
}
#[test]
fn test_window_available_after_send() {
let mut w = CongestionWindow::new(4, 64);
w.on_send(4);
assert_eq!(w.available(), 0);
assert!(!w.can_send());
}
#[test]
fn test_window_grows_on_ack() {
let mut w = CongestionWindow::new(1, 64);
w.on_send(1);
w.on_ack(1);
assert!(w.window_size() >= 2);
}
#[test]
fn test_window_shrinks_on_loss() {
let mut w = CongestionWindow::new(16, 64);
w.on_loss();
assert!(w.window_size() < 16);
}
#[test]
fn test_controller_initial_state_slow_start() {
let c = FlowController::new(4, 64, 3);
assert_eq!(c.state(), FlowControlState::SlowStart);
}
#[test]
fn test_controller_ack_transitions_to_open() {
let mut c = FlowController::new(4, 64, 3);
c.ack(1);
assert_eq!(c.state(), FlowControlState::Open);
}
#[test]
fn test_controller_nack_throttles() {
let mut c = FlowController::new(4, 64, 5);
c.ack(1); c.nack();
assert_eq!(c.state(), FlowControlState::Throttled);
}
#[test]
fn test_controller_nack_limit_blocks() {
let mut c = FlowController::new(4, 64, 2);
c.ack(1);
c.nack();
c.nack();
assert_eq!(c.state(), FlowControlState::Blocked);
}
#[test]
fn test_controller_reset() {
let mut c = FlowController::new(4, 64, 2);
c.nack();
c.reset();
assert_eq!(c.state(), FlowControlState::SlowStart);
assert_eq!(c.nack_count, 0);
}
#[test]
fn test_controller_current_window_size() {
let c = FlowController::new(8, 64, 3);
assert_eq!(c.current_window_size(), 8);
}
#[test]
fn test_controller_can_send_in_open() {
let mut c = FlowController::new(4, 64, 3);
c.ack(1);
assert!(c.can_send());
}
}