use crate::{event, event::IntoEvent, recovery::congestion_controller::Publisher, time::Timestamp};
use core::{
cmp::{max, Ordering},
time::Duration,
};
use num_rational::Ratio;
use num_traits::Inv;
/// Snapshot of the [`Estimator`] state captured when a packet is sent.
///
/// `Estimator::on_packet_sent` produces this value, and `Estimator::on_ack` consumes the one
/// belonging to the newest acknowledged packet in order to update the current [`RateSample`].
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub struct PacketInfo {
/// Total bytes the estimator had counted as delivered when the packet was sent
pub delivered_bytes: u64,
/// The estimator's `delivered_time` when the packet was sent: the time of the latest ack, or
/// the send time itself if nothing was in flight
pub delivered_time: Timestamp,
/// Total bytes the estimator had counted as lost when the packet was sent
pub lost_bytes: u64,
/// Total ECN CE count the estimator had recorded when the packet was sent
pub ecn_ce_count: u64,
/// The estimator's `first_sent_time` when the packet was sent: the send time of the newest
/// acknowledged packet that started a sample, or the send time itself if nothing was in flight
pub first_sent_time: Timestamp,
/// Bytes in flight including this packet (prior bytes in flight plus the packet's size)
pub bytes_in_flight: u32,
/// Whether the estimator was tracking an app-limited period when the packet was sent
pub is_app_limited: bool,
}
/// A rate of data transfer, stored as the number of nanoseconds needed to transfer one kibibyte.
///
/// Because the inverse of the rate is stored, a *smaller* `nanos_per_kibibyte` means a *higher*
/// bandwidth: `0` represents [`Bandwidth::INFINITY`] and `u64::MAX` represents
/// [`Bandwidth::ZERO`].
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
pub struct Bandwidth {
    nanos_per_kibibyte: u64,
}

/// Shifting left by this amount multiplies by 1024, the number of bytes in a kibibyte.
const KIBIBYTE_SHIFT: u8 = 10;

impl Bandwidth {
    /// The lowest representable bandwidth (a kibibyte takes `u64::MAX` nanoseconds)
    pub const ZERO: Bandwidth = Bandwidth {
        nanos_per_kibibyte: u64::MAX,
    };

    /// The highest representable bandwidth (a kibibyte takes no time at all)
    pub const INFINITY: Bandwidth = Bandwidth {
        nanos_per_kibibyte: 0,
    };

    /// Constructs a new `Bandwidth` from `bytes` transferred over `interval`.
    ///
    /// Returns [`Bandwidth::ZERO`] if either `bytes` or `interval` is zero, or if the rate is so
    /// low that the nanoseconds-per-kibibyte value does not fit in a `u64`.
    pub const fn new(bytes: u64, interval: Duration) -> Self {
        let nanos = interval.as_nanos();
        if nanos == 0 || bytes == 0 {
            return Bandwidth::ZERO;
        }

        // The computation is carried out in 128 bits: casting the nanoseconds to `u64` and
        // shifting there silently drops the high bits for intervals of 2^54ns or more, which
        // would turn a very low rate into an arbitrary (possibly very high) one. A `Duration`
        // holds fewer than 2^94 nanoseconds, so the shifted value always fits in a `u128`.
        //
        // The interval is multiplied by 2^10 (instead of dividing `bytes` by 2^10) so that no
        // precision is lost to rounding.
        let nanos_per_kibibyte = (nanos << KIBIBYTE_SHIFT) / bytes as u128;

        if nanos_per_kibibyte > u64::MAX as u128 {
            // Slower than anything representable; saturate to the lowest bandwidth
            Bandwidth::ZERO
        } else {
            Self {
                nanos_per_kibibyte: nanos_per_kibibyte as u64,
            }
        }
    }

    /// Returns the raw nanoseconds-per-kibibyte value backing this `Bandwidth`.
    ///
    /// The returned value can be turned back into a `Bandwidth` with [`Bandwidth::deserialize`].
    #[inline]
    pub fn serialize(self) -> u64 {
        self.nanos_per_kibibyte
    }

    /// Reconstructs a `Bandwidth` from a raw nanoseconds-per-kibibyte value, as produced by
    /// [`Bandwidth::serialize`].
    #[inline]
    pub fn deserialize(value: u64) -> Self {
        Self {
            nanos_per_kibibyte: value,
        }
    }

    /// Represents the bandwidth as bytes per second.
    ///
    /// [`Bandwidth::INFINITY`] is reported as `u64::MAX`.
    pub fn as_bytes_per_second(&self) -> u64 {
        const ONE_SECOND_IN_NANOS: u64 = Duration::from_secs(1).as_nanos() as u64;

        // Guards the division below against a zero divisor
        if *self == Bandwidth::INFINITY {
            return u64::MAX;
        }

        // One second is scaled by 2^10 to convert from "per kibibyte" to "per byte"
        (ONE_SECOND_IN_NANOS << KIBIBYTE_SHIFT) / self.nanos_per_kibibyte
    }
}
impl Default for Bandwidth {
fn default() -> Self {
Bandwidth::ZERO
}
}
impl core::cmp::PartialOrd for Bandwidth {
    /// Bandwidths are totally ordered, so this always defers to [`Ord::cmp`]
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(Ord::cmp(self, other))
    }
}
impl core::cmp::Ord for Bandwidth {
    /// Orders by rate: the bandwidth needing fewer nanoseconds per kibibyte is the greater one
    fn cmp(&self, other: &Self) -> Ordering {
        // The operands are swapped on purpose, since the stored value is the inverse of the rate
        other.nanos_per_kibibyte.cmp(&self.nanos_per_kibibyte)
    }
}
impl core::ops::Mul<Ratio<u64>> for Bandwidth {
    type Output = Bandwidth;

    /// Scales the bandwidth by the given ratio.
    ///
    /// [`Bandwidth::ZERO`] is returned unchanged regardless of the ratio.
    fn mul(self, rhs: Ratio<u64>) -> Self::Output {
        if self != Bandwidth::ZERO {
            // The stored value is the inverse of the rate (time per kibibyte), so scaling the
            // rate by `rhs` means scaling the stored value by the inverse of `rhs`.
            // NOTE(review): `rhs.inv()` presumably cannot handle a zero ratio - confirm that
            // callers never pass one.
            let scaled_nanos = rhs.inv() * self.nanos_per_kibibyte;

            Bandwidth {
                nanos_per_kibibyte: scaled_nanos.to_integer(),
            }
        } else {
            Bandwidth::ZERO
        }
    }
}
impl core::ops::Mul<Duration> for Bandwidth {
    type Output = u64;

    /// Returns the number of bytes that can be transferred over `rhs` at this bandwidth.
    ///
    /// [`Bandwidth::INFINITY`] always yields `u64::MAX`, and a zero duration otherwise yields
    /// `0`. Results that do not fit in a `u64` saturate to `u64::MAX`.
    fn mul(self, rhs: Duration) -> Self::Output {
        // Checked first: this also guards the division below against a zero divisor
        if self == Bandwidth::INFINITY {
            return u64::MAX;
        }

        if rhs.is_zero() {
            return 0;
        }

        // The computation is carried out in 128 bits: casting the nanoseconds to `u64` and
        // shifting there silently drops the high bits for durations of 2^54ns or more, which
        // produced byte counts that were far too small. A `Duration` holds fewer than 2^94
        // nanoseconds, so the shifted value always fits in a `u128`.
        let interval = rhs.as_nanos() << KIBIBYTE_SHIFT;
        let bytes = interval / u128::from(self.nanos_per_kibibyte);

        // Saturate instead of truncating when the result exceeds the output type
        bytes.min(u128::from(u64::MAX)) as u64
    }
}
impl core::ops::Div<Bandwidth> for u64 {
    type Output = Duration;

    /// Returns the time needed to transfer `self` bytes at the bandwidth `rhs`.
    ///
    /// The intermediate product saturates at `u64::MAX` instead of overflowing.
    fn div(self, rhs: Bandwidth) -> Self::Output {
        let nanos_times_kibibyte = rhs.nanos_per_kibibyte.saturating_mul(self);

        // The stored rate is per kibibyte, so the product is scaled back down to bytes. Using
        // a shift inside a `Div` implementation is intentional here.
        #[allow(clippy::suspicious_arithmetic_impl)]
        let nanos = nanos_times_kibibyte >> KIBIBYTE_SHIFT;

        Duration::from_nanos(nanos)
    }
}
/// A delivery rate sample, maintained by the [`Estimator`] as acknowledgements, losses and ECN
/// markings are reported to it.
#[derive(Clone, Copy, Debug, Default)]
pub struct RateSample {
/// The length of the sampling interval: the larger of the send-side and ack-side elapsed times
pub interval: Duration,
/// Bytes delivered since the `prior_delivered_bytes` snapshot
pub delivered_bytes: u64,
/// Bytes lost since the `prior_lost_bytes` snapshot
pub lost_bytes: u64,
/// ECN CE count accumulated since the `prior_ecn_ce_count` snapshot
pub ecn_ce_count: u64,
/// The `is_app_limited` flag of the packet that started this sample
pub is_app_limited: bool,
/// The `delivered_bytes` snapshot of the packet that started this sample
pub prior_delivered_bytes: u64,
/// The `bytes_in_flight` value of the packet that started this sample
pub bytes_in_flight: u32,
/// The `lost_bytes` snapshot of the packet that started this sample
pub prior_lost_bytes: u64,
/// The `ecn_ce_count` snapshot of the packet that started this sample
pub prior_ecn_ce_count: u64,
}
impl RateSample {
fn on_ack(&mut self, packet_info: PacketInfo) {
debug_assert!(
packet_info.delivered_bytes >= self.prior_delivered_bytes,
"on_ack should only be called with the newest acknowledged packet"
);
self.is_app_limited = packet_info.is_app_limited;
self.prior_delivered_bytes = packet_info.delivered_bytes;
self.prior_lost_bytes = packet_info.lost_bytes;
self.prior_ecn_ce_count = packet_info.ecn_ce_count;
self.bytes_in_flight = packet_info.bytes_in_flight;
}
pub fn delivery_rate(&self) -> Bandwidth {
Bandwidth::new(self.delivered_bytes, self.interval)
}
}
impl IntoEvent<event::builder::RateSample> for RateSample {
    /// Converts the sample into its event representation, adding the derived delivery rate in
    /// bytes per second.
    #[inline]
    fn into_event(self) -> event::builder::RateSample {
        // Derived from `delivered_bytes` and `interval`, so it has no field of its own
        let delivery_rate_bytes_per_second = self.delivery_rate().as_bytes_per_second();

        let Self {
            interval,
            delivered_bytes,
            lost_bytes,
            ecn_ce_count,
            is_app_limited,
            prior_delivered_bytes,
            bytes_in_flight,
            prior_lost_bytes,
            prior_ecn_ce_count,
        } = self;

        event::builder::RateSample {
            interval: interval.into_event(),
            delivered_bytes: delivered_bytes.into_event(),
            lost_bytes: lost_bytes.into_event(),
            ecn_ce_count: ecn_ce_count.into_event(),
            is_app_limited: is_app_limited.into_event(),
            prior_delivered_bytes: prior_delivered_bytes.into_event(),
            bytes_in_flight: bytes_in_flight.into_event(),
            prior_lost_bytes: prior_lost_bytes.into_event(),
            prior_ecn_ce_count: prior_ecn_ce_count.into_event(),
            delivery_rate_bytes_per_second: delivery_rate_bytes_per_second.into_event(),
        }
    }
}
/// Tracks delivered, lost and ECN-marked data in order to produce [`RateSample`]s.
#[derive(Clone, Debug, Default)]
pub struct Estimator {
/// Running total of acknowledged bytes
delivered_bytes: u64,
/// The time of the most recent ack, or of the most recent send that found nothing in flight;
/// `None` until one of those has happened
delivered_time: Option<Timestamp>,
/// Running total of bytes reported lost
lost_bytes: u64,
/// Running total of the reported ECN CE count
ecn_ce_count: u64,
/// The send time of the newest acknowledged packet that started a sample, or the time of the
/// most recent send that found nothing in flight; `None` until one of those has happened
first_sent_time: Option<Timestamp>,
/// While `Some`, an app-limited period is in progress; it ends once `delivered_bytes`
/// exceeds the contained value
app_limited_delivered_bytes: Option<u64>,
/// The sample currently being maintained
rate_sample: RateSample,
}
impl Estimator {
    /// Returns the total number of bytes acknowledged so far
    pub fn delivered_bytes(&self) -> u64 {
        self.delivered_bytes
    }

    /// Returns the total number of bytes reported lost so far
    pub fn lost_bytes(&self) -> u64 {
        self.lost_bytes
    }

    /// Returns a copy of the current rate sample
    pub fn rate_sample(&self) -> RateSample {
        self.rate_sample
    }

    /// Returns `true` while an app-limited period is in progress
    pub fn is_app_limited(&self) -> bool {
        self.app_limited_delivered_bytes.is_some()
    }

    /// Called for each packet that is sent; returns the snapshot to be stored with the packet.
    ///
    /// A missing `app_limited` hint (`None`) is treated the same as `Some(true)`.
    ///
    /// # Panics
    ///
    /// Panics if neither a send with `prior_bytes_in_flight == 0` nor an ack has initialized
    /// the estimator's timestamps yet.
    pub fn on_packet_sent(
        &mut self,
        prior_bytes_in_flight: u32,
        sent_bytes: usize,
        app_limited: Option<bool>,
        now: Timestamp,
    ) -> PacketInfo {
        // Nothing was in flight, so both timestamps restart from this transmission
        if prior_bytes_in_flight == 0 {
            self.first_sent_time = Some(now);
            self.delivered_time = Some(now);
        }

        let bytes_in_flight = prior_bytes_in_flight.saturating_add(sent_bytes as u32);

        // Only an explicit `Some(false)` skips the app-limited marking
        if app_limited != Some(false) {
            self.on_app_limited(bytes_in_flight);
        }

        let delivered_time = self
            .delivered_time
            .expect("initialized on first sent packet");
        let first_sent_time = self
            .first_sent_time
            .expect("initialized on first sent packet");

        PacketInfo {
            delivered_bytes: self.delivered_bytes,
            delivered_time,
            lost_bytes: self.lost_bytes,
            ecn_ce_count: self.ecn_ce_count,
            first_sent_time,
            bytes_in_flight,
            is_app_limited: self.is_app_limited(),
        }
    }

    /// Called for each acknowledgement; updates the rate sample and publishes it.
    ///
    /// `newest_acked_time_sent` and `newest_acked_packet_info` describe the newest packet
    /// covered by the acknowledgement, and `bytes_acknowledged` is the number of newly
    /// acknowledged bytes.
    pub fn on_ack<Pub: Publisher>(
        &mut self,
        bytes_acknowledged: usize,
        newest_acked_time_sent: Timestamp,
        newest_acked_packet_info: PacketInfo,
        now: Timestamp,
        publisher: &mut Pub,
    ) {
        self.delivered_bytes += bytes_acknowledged as u64;
        self.delivered_time = Some(now);

        // The app-limited period ends once more than the marked amount has been delivered
        if matches!(
            self.app_limited_delivered_bytes,
            Some(marker) if self.delivered_bytes > marker
        ) {
            self.app_limited_delivered_bytes = None;
        }

        // A new sample is started for the very first ack, or whenever the acknowledged packet
        // was sent after more data had been delivered than the current sample started from
        let prior_delivered_bytes = self.rate_sample.prior_delivered_bytes;
        let starts_new_sample = prior_delivered_bytes == 0
            || newest_acked_packet_info.delivered_bytes > prior_delivered_bytes;

        if starts_new_sample {
            self.rate_sample.on_ack(newest_acked_packet_info);
            self.first_sent_time = Some(newest_acked_time_sent);

            debug_assert!(
                newest_acked_time_sent >= newest_acked_packet_info.first_sent_time,
                "newest_acked_time_sent={newest_acked_time_sent}, first_sent_time={}",
                newest_acked_packet_info.first_sent_time
            );
            let send_elapsed = newest_acked_time_sent - newest_acked_packet_info.first_sent_time;

            debug_assert!(
                now >= newest_acked_packet_info.delivered_time,
                "now={now}, delivered_time={}",
                newest_acked_packet_info.delivered_time
            );
            let ack_elapsed = now - newest_acked_packet_info.delivered_time;

            // The longer of the two phases is used as the sampling interval
            self.rate_sample.interval = max(send_elapsed, ack_elapsed);
        }

        // The deltas are refreshed on every ack, whether or not a new sample was started
        let sample = &mut self.rate_sample;
        sample.delivered_bytes = self.delivered_bytes - sample.prior_delivered_bytes;
        sample.lost_bytes = self.lost_bytes - sample.prior_lost_bytes;
        sample.ecn_ce_count = self.ecn_ce_count - sample.prior_ecn_ce_count;

        publisher.on_delivery_rate_sampled(self.rate_sample);
    }

    /// Called when packets are declared lost, with the number of bytes lost
    #[inline]
    pub fn on_loss(&mut self, lost_bytes: usize) {
        let newly_lost = lost_bytes as u64;

        self.lost_bytes += newly_lost;
        self.rate_sample.lost_bytes = self.lost_bytes - self.rate_sample.prior_lost_bytes;

        // Lost bytes no longer count towards reaching the end of the app-limited period
        self.lower_app_limited_marker(newly_lost);
    }

    /// Called when a sent packet is discarded, with the number of bytes it contained
    #[inline]
    pub fn on_packet_discarded(&mut self, bytes_sent: usize) {
        // Discarded bytes no longer count towards reaching the end of the app-limited period
        self.lower_app_limited_marker(bytes_sent as u64);
    }

    /// Called when ECN CE markings are reported, with the increase in the CE count
    #[inline]
    pub fn on_explicit_congestion(&mut self, ce_count: u64) {
        self.ecn_ce_count += ce_count;
        self.rate_sample.ecn_ce_count = self.ecn_ce_count - self.rate_sample.prior_ecn_ce_count;
    }

    /// Starts (or restarts) an app-limited period that lasts until everything currently in
    /// flight has been delivered
    pub fn on_app_limited(&mut self, bytes_in_flight: u32) {
        let marker = self.delivered_bytes + u64::from(bytes_in_flight);
        self.app_limited_delivered_bytes = Some(marker);
    }

    /// Lowers the end of the current app-limited period (if any) by `bytes`, stopping at zero
    #[inline]
    fn lower_app_limited_marker(&mut self, bytes: u64) {
        if let Some(marker) = self.app_limited_delivered_bytes.as_mut() {
            *marker = marker.saturating_sub(bytes);
        }
    }

    /// Overwrites the current rate sample (test builds only)
    #[cfg(test)]
    pub fn set_rate_sample_for_test(&mut self, rate_sample: RateSample) {
        self.rate_sample = rate_sample;
    }

    /// Overwrites the delivered byte total (test builds only)
    #[cfg(test)]
    pub fn set_delivered_bytes_for_test(&mut self, delivered_bytes: u64) {
        self.delivered_bytes = delivered_bytes;
    }
}
// The unit tests for this module live in the separate `tests` submodule file and are only
// compiled for test builds.
#[cfg(test)]
mod tests;