use crate::error::{Error, ErrorCode};
use crate::settings::DEFAULT_INITIAL_WINDOW_SIZE;
pub const MAX_WINDOW_SIZE: u32 = 2_147_483_647;
#[derive(Debug, Clone)]
pub struct FlowControl {
send_window: i64,
recv_window: i64,
initial_window_size: u32,
}
impl FlowControl {
#[must_use]
pub fn new(initial_window_size: u32) -> Self {
Self {
send_window: i64::from(initial_window_size),
recv_window: i64::from(initial_window_size),
initial_window_size,
}
}
#[must_use]
pub fn with_separate_windows(send_initial: u32, recv_initial: u32) -> Self {
Self {
send_window: i64::from(send_initial),
recv_window: i64::from(recv_initial),
initial_window_size: send_initial,
}
}
#[must_use]
pub const fn send_window(&self) -> i64 {
self.send_window
}
#[must_use]
pub const fn recv_window(&self) -> i64 {
self.recv_window
}
#[must_use]
pub const fn initial_window_size(&self) -> u32 {
self.initial_window_size
}
#[must_use]
pub fn send_available(&self) -> usize {
if self.send_window > 0 {
self.send_window as usize
} else {
0
}
}
pub fn consume_send(&mut self, size: usize) -> Result<(), Error> {
let size = i64::try_from(size).map_err(|_| {
Error::connection_error(ErrorCode::FlowControlError, "data size too large")
})?;
if size > self.send_window {
return Err(Error::connection_error(
ErrorCode::FlowControlError,
"send window exhausted",
));
}
self.send_window -= size;
Ok(())
}
pub fn consume_recv(&mut self, size: usize) -> Result<(), Error> {
let size = i64::try_from(size).map_err(|_| {
Error::connection_error(ErrorCode::FlowControlError, "data size too large")
})?;
if size > self.recv_window {
return Err(Error::connection_error(
ErrorCode::FlowControlError,
"recv window exceeded",
));
}
self.recv_window -= size;
Ok(())
}
pub fn recv_window_update(&mut self, increment: u32) -> Result<(), Error> {
if increment == 0 {
return Err(Error::connection_error(
ErrorCode::ProtocolError,
"WINDOW_UPDATE with zero increment",
));
}
let new_window = self.send_window + i64::from(increment);
if new_window > i64::from(MAX_WINDOW_SIZE) {
return Err(Error::connection_error(
ErrorCode::FlowControlError,
"window size overflow",
));
}
self.send_window = new_window;
Ok(())
}
pub fn add_recv_window(&mut self, increment: u32) -> Result<(), Error> {
if increment == 0 {
return Err(Error::connection_error(
ErrorCode::ProtocolError,
"window increment must be non-zero",
));
}
let new_window = self.recv_window + i64::from(increment);
if new_window > i64::from(MAX_WINDOW_SIZE) {
return Err(Error::connection_error(
ErrorCode::FlowControlError,
"window size overflow",
));
}
self.recv_window = new_window;
Ok(())
}
pub fn update_initial_window_size(&mut self, new_size: u32) -> Result<(), Error> {
let delta = i64::from(new_size) - i64::from(self.initial_window_size);
let new_window = self.send_window + delta;
if new_window > i64::from(MAX_WINDOW_SIZE) {
return Err(Error::connection_error(
ErrorCode::FlowControlError,
"window size overflow after SETTINGS update",
));
}
self.send_window = new_window;
self.initial_window_size = new_size;
Ok(())
}
#[must_use]
pub fn should_send_window_update(&self) -> bool {
self.recv_window < i64::from(self.initial_window_size / 2)
}
#[must_use]
pub fn window_update_increment(&self) -> u32 {
let target = i64::from(self.initial_window_size);
let increment = target - self.recv_window;
if increment > 0 && increment <= i64::from(MAX_WINDOW_SIZE) {
increment as u32
} else {
0
}
}
}
impl Default for FlowControl {
fn default() -> Self {
Self::new(DEFAULT_INITIAL_WINDOW_SIZE)
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_new_flow_control() {
let fc = FlowControl::new(65535);
assert_eq!(fc.send_window(), 65535);
assert_eq!(fc.recv_window(), 65535);
}
#[test]
fn test_consume_send() {
let mut fc = FlowControl::new(65535);
fc.consume_send(1000).unwrap();
assert_eq!(fc.send_window(), 64535);
}
#[test]
fn test_consume_send_exhausted() {
let mut fc = FlowControl::new(100);
assert!(fc.consume_send(101).is_err());
}
#[test]
fn test_recv_window_update() {
let mut fc = FlowControl::new(65535);
fc.consume_send(10000).unwrap();
assert_eq!(fc.send_window(), 55535);
fc.recv_window_update(5000).unwrap();
assert_eq!(fc.send_window(), 60535);
}
#[test]
fn test_window_update_overflow() {
let mut fc = FlowControl::new(MAX_WINDOW_SIZE);
assert!(fc.recv_window_update(1).is_err());
}
#[test]
fn test_should_send_window_update() {
let mut fc = FlowControl::new(65535);
assert!(!fc.should_send_window_update());
fc.consume_recv(40000).unwrap();
assert!(fc.should_send_window_update());
}
#[test]
fn test_update_initial_window_size() {
let mut fc = FlowControl::new(65535);
fc.consume_send(10000).unwrap();
assert_eq!(fc.send_window(), 55535);
fc.update_initial_window_size(100000).unwrap();
assert_eq!(fc.send_window(), 90000);
}
}