pub(super) const INFLOW_MIN_REFRESH: i64 = 4 << 10;
#[derive(Clone, Copy, Debug)]
pub(super) struct Inflow {
avail: i64,
unsent: i64,
target: i64,
}
impl Inflow {
pub(super) const fn new(target: i64) -> Self {
Self {
avail: target,
unsent: 0,
target,
}
}
pub(super) fn take(&mut self, n: i64) -> bool {
if n > self.avail {
return false;
}
self.avail -= n;
true
}
pub(super) fn add(&mut self, n: i64) -> i64 {
debug_assert!(n >= 0, "flow-control credit must be non-negative");
self.unsent += n;
if self.unsent < INFLOW_MIN_REFRESH && self.unsent < self.avail {
return 0;
}
let increment = self.unsent;
self.avail += increment;
self.unsent = 0;
increment
}
pub(super) fn raise_target(&mut self, new_target: i64) -> i64 {
if new_target <= self.target {
return 0;
}
let delta = new_target - self.target;
self.target = new_target;
self.avail += delta;
delta
}
}
#[cfg(test)]
mod tests {
use super::{INFLOW_MIN_REFRESH, Inflow};
#[test]
fn take_spends_window_and_reports_overrun() {
let mut f = Inflow::new(100);
assert!(f.take(60));
assert_eq!(f.avail, 40);
assert!(f.take(40));
assert_eq!(f.avail, 0);
assert!(!f.take(1));
assert_eq!(f.avail, 0, "a rejected take leaves the window unchanged");
}
#[test]
fn add_buffers_below_threshold() {
let mut f = Inflow::new(1 << 20);
f.take(1 << 20); f.raise_target(2 << 20); let small = INFLOW_MIN_REFRESH - 1;
assert_eq!(f.add(small), 0, "sub-threshold credit is buffered");
assert_eq!(
f.add(1),
INFLOW_MIN_REFRESH,
"crossing the threshold flushes all buffered credit"
);
}
#[test]
fn add_flushes_when_it_would_double_the_window() {
let mut f = Inflow::new(100);
f.take(90); assert_eq!(f.add(10), 10);
assert_eq!(f.avail, 20);
}
#[test]
fn add_flushes_immediately_when_peer_is_blocked() {
let mut f = Inflow::new(100);
f.take(100); assert_eq!(f.add(1), 1);
assert_eq!(f.avail, 1);
}
#[test]
fn raise_target_grows_window_by_the_delta() {
let mut f = Inflow::new(256);
f.take(200); let inc = f.raise_target(1024);
assert_eq!(inc, 768, "increment is the target delta");
assert_eq!(f.avail, 56 + 768);
assert_eq!(f.target, 1024);
}
#[test]
fn raise_target_never_lowers() {
let mut f = Inflow::new(1024);
assert_eq!(f.raise_target(512), 0);
assert_eq!(f.raise_target(1024), 0);
assert_eq!(f.target, 1024);
assert_eq!(f.avail, 1024);
}
#[test]
fn invariant_avail_plus_buffered_stays_within_target() {
let mut f = Inflow::new(256);
let mut buffered: i64 = 0;
assert!(f.take(256));
buffered += 256;
assert!(f.avail + buffered <= f.target);
f.add(100);
buffered -= 100;
assert!(f.avail + buffered <= f.target, "after refill");
f.raise_target(1024);
let room = f.avail;
assert!(f.take(room));
buffered += room;
assert!(f.avail + buffered <= f.target, "after promotion + fill");
assert_eq!(f.avail, 0);
}
}