use std::sync::atomic::{AtomicI32, Ordering};
/// Numerator of the fraction of `tx_window` that `RxFlowControl::release`
/// uses as the minimum gap worth reporting.
const UNCLAIMED_NUMERATOR: i32 = 1;
/// Denominator of that same fraction; `RxFlowControl::release` divides by it.
const UNCLAIMED_DENOMINATOR: i32 = 2;
// Checks that the fraction lies in [0, 1). This also implies the denominator
// is at least 1, so the division in `RxFlowControl::release` cannot divide by
// zero. NOTE(review): `const_assert!` is defined outside this view; presumably
// it is evaluated at compile time — confirm.
const_assert!(0 <= UNCLAIMED_NUMERATOR && UNCLAIMED_NUMERATOR < UNCLAIMED_DENOMINATOR);
/// Flow-control window kept in a single atomic counter.
///
/// The counter holds the number of acquisitions currently permitted. It may
/// become negative because `do_acquire` decrements unconditionally. Every
/// operation uses `Ordering::Relaxed`, so only the counter itself is kept
/// consistent; no other memory is synchronised through it.
#[derive(Debug)]
pub(super) struct TxFlowControl(AtomicI32);

impl TxFlowControl {
    /// Creates a window that initially permits `initial` acquisitions.
    ///
    /// A zero window is valid: `try_acquire` keeps returning `false` until
    /// `release` makes the counter positive.
    ///
    /// # Panics
    ///
    /// Panics if `initial` is negative.
    pub(super) fn new(initial: i32) -> Self {
        // Zero must be accepted: only negative values contradict the message,
        // and `RxFlowControl::new` accepts a zero window as well.
        assert!(initial >= 0, "negative initial window");
        Self(AtomicI32::new(initial))
    }

    /// Tries to take one unit from the window.
    ///
    /// Returns `true` if the counter was positive before the decrement.
    /// Otherwise the decrement is undone and `false` is returned.
    ///
    /// The decrement and its rollback are two separate atomic operations, so
    /// other threads can briefly observe the counter one lower than it ends
    /// up being.
    ///
    /// # Panics
    ///
    /// Panics if the counter was `i32::MIN` before the decrement.
    pub(super) fn try_acquire(&self) -> bool {
        let prev = self.0.fetch_sub(1, Ordering::Relaxed);
        assert_ne!(prev, i32::MIN, "window underflow");
        if prev > 0 {
            return true;
        }
        // Nothing was available: give back the unit taken above.
        self.0.fetch_add(1, Ordering::Relaxed);
        false
    }

    /// Takes one unit from the window unconditionally.
    ///
    /// Unlike `try_acquire` this never refuses, so the counter can drop to
    /// zero or below.
    ///
    /// # Panics
    ///
    /// Panics if the counter was `i32::MIN` before the decrement.
    pub(super) fn do_acquire(&self) {
        let prev = self.0.fetch_sub(1, Ordering::Relaxed);
        assert_ne!(prev, i32::MIN, "window underflow");
    }

    /// Adds `delta` units to the window.
    ///
    /// Returns `true` if the counter is positive after the addition, as
    /// computed from the value this call observed.
    ///
    /// # Panics
    ///
    /// Panics if the observed value plus `delta` overflows `i32`. The atomic
    /// addition itself wraps, so the stored counter has already wrapped by
    /// the time the panic is raised.
    pub(super) fn release(&self, delta: i32) -> bool {
        let prev = self.0.fetch_add(delta, Ordering::Relaxed);
        prev.checked_add(delta).expect("window overflow") > 0
    }
}
/// Flow-control bookkeeping built from two counters.
///
/// * `rx_window` is decremented by every `do_acquire` and incremented by
///   `release`.
/// * `tx_window` is decremented only by `do_acquire(true)` and raised by the
///   amount `release` returns. Presumably it mirrors the window as the
///   sending side currently sees it — confirm against callers.
#[derive(Debug)]
pub(super) struct RxFlowControl {
    tx_window: i32,
    rx_window: i32,
}

impl RxFlowControl {
    /// Creates the bookkeeping with both counters set to `initial`.
    ///
    /// # Panics
    ///
    /// Panics if `initial` is negative.
    pub(super) fn new(initial: i32) -> Self {
        assert!(initial >= 0, "initial window must be non-negative");
        let (tx_window, rx_window) = (initial, initial);
        Self {
            tx_window,
            rx_window,
        }
    }

    /// Takes one unit from `rx_window`, and from `tx_window` too when
    /// `tx_knows` is `true`.
    ///
    /// # Panics
    ///
    /// Panics if a counter that is decremented is already `i32::MIN`.
    pub(super) fn do_acquire(&mut self, tx_knows: bool) {
        // `tx_window` is handled first so that its underflow is reported
        // before `rx_window` is touched.
        if tx_knows {
            self.tx_window = Self::decrement(self.tx_window);
        }
        self.rx_window = Self::decrement(self.rx_window);
    }

    /// Adds `delta` to `rx_window` and decides whether the gap between the two
    /// counters is worth reporting.
    ///
    /// Returns `Some(n)`, where `n` is `rx_window - tx_window` (saturating),
    /// when all of the following hold after the addition:
    ///
    /// * `rx_window` is positive,
    /// * `rx_window` is greater than `tx_window`,
    /// * `n` is at least
    ///   `tx_window / UNCLAIMED_DENOMINATOR * UNCLAIMED_NUMERATOR`.
    ///
    /// In that case `tx_window` is raised by `n`. Otherwise `None` is returned
    /// and `tx_window` is left untouched.
    ///
    /// # Panics
    ///
    /// Panics if adding `delta` overflows `rx_window`.
    pub(super) fn release(&mut self, delta: i32) -> Option<i32> {
        let rx = self.rx_window.checked_add(delta).expect("window overflow");
        self.rx_window = rx;

        let tx = self.tx_window;
        if rx <= 0 || tx >= rx {
            return None;
        }

        let pending = rx.saturating_sub(tx);
        let threshold = tx / UNCLAIMED_DENOMINATOR * UNCLAIMED_NUMERATOR;
        if pending < threshold {
            // Gap too small: keep accumulating.
            return None;
        }

        self.tx_window += pending;
        Some(pending)
    }

    /// Returns `window - 1`, panicking on underflow.
    fn decrement(window: i32) -> i32 {
        window.checked_sub(1).expect("window underflow")
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Walks a `TxFlowControl` through exhaustion, refill, forced
    /// acquisition below zero, and recovery.
    #[test]
    fn tx_flow_control() {
        let fc = TxFlowControl::new(2);

        // Two units are available; further attempts are refused.
        for granted in [true, true, false, false] {
            assert_eq!(fc.try_acquire(), granted);
        }

        // One released unit allows exactly one more acquisition.
        assert!(fc.release(1));
        for granted in [true, false] {
            assert_eq!(fc.try_acquire(), granted);
        }

        // Counter goes 0 -> 2 -> 3, positive after each release.
        for delta in [2, 1] {
            assert!(fc.release(delta));
        }

        // Five forced acquisitions take the counter from 3 to -2.
        (0..5).for_each(|_| fc.do_acquire());

        // -1 and 0 are not positive; 1 is.
        for positive in [false, false, true] {
            assert_eq!(fc.release(1), positive);
        }
    }

    /// Checks that `RxFlowControl::release` reports a value on fewer than 1%
    /// of calls in two scenarios.
    #[test]
    fn rx_flow_control() {
        // Scenario 1: every acquisition is immediately released again.
        let mut fc = RxFlowControl::new(1000);
        let total = 1000;
        let sent = (0..total)
            .filter(|_| {
                fc.do_acquire(true);
                fc.release(1).is_some()
            })
            .count();
        let ratio = sent as f64 / f64::from(total);
        assert!(ratio < 0.01, "{}", ratio);

        // Scenario 2: the window grows from zero one unit at a time.
        let mut fc = RxFlowControl::new(0);
        let total = 2000;
        let sent = (0..total).filter(|_| fc.release(1).is_some()).count();
        let ratio = sent as f64 / f64::from(total);
        assert!(ratio < 0.01, "{}", ratio);
    }
}