use std::time::Duration;
use std::time::Instant;
/// Tracks the number of bytes currently in flight together with the total
/// time during which that number was non-zero.
///
/// Time is accounted in intervals: an interval opens when the counter leaves
/// zero and closes when the counter returns to zero.
#[derive(Default)]
pub struct BytesInFlight {
    /// Current number of bytes in flight.
    bytes_in_flight: usize,
    /// Start of the currently open interval; `None` while no interval is open.
    bytes_in_flight_interval_start: Option<Instant>,
    /// Length of the open interval as of the most recent update; zero while
    /// no interval is open.
    open_interval_duration: Duration,
    /// Sum of the lengths of all intervals that have already closed.
    closed_interval_duration: Duration,
}

impl BytesInFlight {
    /// Increases the counter by `delta` at time `now`.
    ///
    /// A `delta` of zero is a no-op. The counter saturates at `usize::MAX`
    /// rather than overflowing. If no interval is open, one is opened at
    /// `now`; otherwise the open interval's duration is refreshed.
    pub(crate) fn add(&mut self, delta: usize, now: Instant) {
        if delta == 0 {
            return;
        }
        // Saturate so that an overflow neither panics (debug builds) nor wraps
        // around to a small value (release builds).
        self.bytes_in_flight = self.bytes_in_flight.saturating_add(delta);
        if self.bytes_in_flight_interval_start.is_some() {
            self.update_in_flight_duration(now);
        } else {
            self.bytes_in_flight_interval_start = Some(now);
        }
    }

    /// Decreases the counter by `delta` at time `now`, clamping at zero.
    ///
    /// If the counter reaches zero, the open interval (if any) is closed and
    /// its length is added to the accumulated total.
    pub(crate) fn saturating_subtract(&mut self, delta: usize, now: Instant) {
        self.bytes_in_flight = self.bytes_in_flight.saturating_sub(delta);
        self.update_in_flight_duration(now);
    }

    /// Returns the current number of bytes in flight.
    pub(crate) fn get(&self) -> usize {
        self.bytes_in_flight
    }

    /// Returns `true` when no bytes are in flight.
    pub(crate) fn is_zero(&self) -> bool {
        self.bytes_in_flight == 0
    }

    /// Returns the total time accounted so far: all closed intervals plus the
    /// open interval as of the last call to `add` or `saturating_subtract`.
    pub(crate) fn get_duration(&self) -> Duration {
        self.closed_interval_duration + self.open_interval_duration
    }

    /// Refreshes the interval bookkeeping for time `now`.
    ///
    /// Does nothing when no interval is open. Otherwise either closes the
    /// interval (counter is zero) or records its current length.
    fn update_in_flight_duration(&mut self, now: Instant) {
        if let Some(start) = self.bytes_in_flight_interval_start {
            // Clamp to zero if `now` precedes `start` instead of relying on
            // `Instant - Instant`, whose handling of that case has differed
            // between Rust versions.
            let elapsed = now.saturating_duration_since(start);
            if self.bytes_in_flight == 0 {
                self.open_interval_duration = Duration::ZERO;
                self.closed_interval_duration += elapsed;
                self.bytes_in_flight_interval_start = None;
            } else {
                self.open_interval_duration = elapsed;
            }
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Shorthand for a whole number of seconds.
    fn secs(n: u64) -> Duration {
        Duration::from_secs(n)
    }

    /// Walks through two separate in-flight intervals and checks both the
    /// byte count and the accumulated duration after every step.
    #[test]
    fn bytes_in_flight_basic() {
        let t0 = Instant::now();
        let mut tracker = BytesInFlight::default();

        // A fresh tracker holds nothing and has accumulated no time.
        assert_eq!(tracker.get(), 0);
        assert_eq!(tracker.get_duration(), Duration::ZERO);

        // The first bytes open an interval; no time has elapsed yet.
        tracker.add(1, t0);
        assert_eq!(tracker.get(), 1);
        assert_eq!(tracker.get_duration(), Duration::ZERO);

        // Two seconds in: further additions refresh the open interval.
        let t2 = t0 + secs(2);
        tracker.add(2, t2);
        tracker.add(3, t2);
        assert_eq!(tracker.get(), 6);
        assert_eq!(tracker.get_duration(), secs(2));

        // Seven seconds in: a partial drain keeps the interval open.
        let t7 = t2 + secs(5);
        tracker.saturating_subtract(3, t7);
        assert_eq!(tracker.get(), 3);
        assert_eq!(tracker.get_duration(), secs(7));

        // Draining the rest at the same instant closes the interval.
        tracker.saturating_subtract(3, t7);
        assert_eq!(tracker.get(), 0);
        assert_eq!(tracker.get_duration(), secs(7));

        // Thirty idle seconds must not count towards the total.
        let t37 = t7 + secs(30);
        tracker.add(10, t37);
        assert_eq!(tracker.get(), 10);
        assert_eq!(tracker.get_duration(), secs(7));

        // The second interval lasts five seconds.
        let t42 = t37 + secs(5);
        tracker.saturating_subtract(10, t42);
        assert_eq!(tracker.get(), 0);
        assert_eq!(tracker.get_duration(), secs(12));
    }

    /// Subtracting more than is in flight clamps the counter at zero.
    #[test]
    fn bytes_in_flight_saturating_sub() {
        let t0 = Instant::now();
        let mut tracker = BytesInFlight::default();

        tracker.add(10, t0);
        assert_eq!(tracker.get(), 10);

        // (bytes to remove, seconds since t0, bytes expected to remain);
        // the accumulated duration equals the offset at every step.
        let steps: [(usize, u64, usize); 3] = [(7, 3, 3), (1, 20, 2), (7, 25, 0)];
        for &(delta, offset, remaining) in &steps {
            tracker.saturating_subtract(delta, t0 + secs(offset));
            assert_eq!(tracker.get(), remaining);
            assert_eq!(tracker.get_duration(), secs(offset));
        }
    }
}