use std::time::Duration;
use std::time::Instant;
use crate::recovery::bandwidth::Bandwidth;
use super::Acked;
use super::Sent;
#[derive(Debug)]
pub struct Rate {
delivered: usize,
delivered_time: Instant,
first_sent_time: Instant,
end_of_app_limited: u64,
last_sent_packet: u64,
largest_acked: u64,
rate_sample: RateSample,
}
impl Default for Rate {
fn default() -> Self {
let now = Instant::now();
Rate {
delivered: 0,
delivered_time: now,
first_sent_time: now,
end_of_app_limited: 0,
last_sent_packet: 0,
largest_acked: 0,
rate_sample: RateSample::new(),
}
}
}
impl Rate {
pub fn on_packet_sent(
&mut self, pkt: &mut Sent, bytes_in_flight: usize, bytes_lost: u64,
) {
if bytes_in_flight == 0 {
self.first_sent_time = pkt.time_sent;
self.delivered_time = pkt.time_sent;
}
pkt.first_sent_time = self.first_sent_time;
pkt.delivered_time = self.delivered_time;
pkt.delivered = self.delivered;
pkt.is_app_limited = self.app_limited();
pkt.tx_in_flight = bytes_in_flight;
pkt.lost = bytes_lost;
self.last_sent_packet = pkt.pkt_num;
}
pub fn update_rate_sample(&mut self, pkt: &Acked, now: Instant) {
self.delivered += pkt.size;
self.delivered_time = now;
if self.rate_sample.prior_time.is_none() ||
pkt.delivered >= self.rate_sample.prior_delivered
{
self.rate_sample.prior_delivered = pkt.delivered;
self.rate_sample.prior_time = Some(pkt.delivered_time);
self.rate_sample.is_app_limited = pkt.is_app_limited;
self.rate_sample.send_elapsed =
pkt.time_sent.saturating_duration_since(pkt.first_sent_time);
self.rate_sample.rtt = pkt.rtt;
self.rate_sample.ack_elapsed = self
.delivered_time
.saturating_duration_since(pkt.delivered_time);
self.first_sent_time = pkt.time_sent;
}
self.largest_acked = self.largest_acked.max(pkt.pkt_num);
}
pub fn generate_rate_sample(&mut self, min_rtt: Duration) {
if self.app_limited() && self.largest_acked > self.end_of_app_limited {
self.update_app_limited(false);
}
if self.rate_sample.prior_time.is_some() {
let interval = self
.rate_sample
.send_elapsed
.max(self.rate_sample.ack_elapsed);
self.rate_sample.delivered =
self.delivered - self.rate_sample.prior_delivered;
self.rate_sample.interval = interval;
if interval < min_rtt {
self.rate_sample.interval = Duration::ZERO;
return;
}
if !interval.is_zero() {
let rate_sample_bandwidth = {
let rate_sample_bytes_per_second = (self.rate_sample.delivered
as f64 /
interval.as_secs_f64())
as u64;
Bandwidth::from_bytes_per_second(rate_sample_bytes_per_second)
};
if !self.rate_sample.is_app_limited ||
rate_sample_bandwidth > self.rate_sample.bandwidth
{
self.update_delivery_rate(rate_sample_bandwidth);
}
}
}
}
fn update_delivery_rate(&mut self, bandwidth: Bandwidth) {
self.rate_sample.bandwidth = bandwidth;
}
pub fn update_app_limited(&mut self, v: bool) {
self.end_of_app_limited =
if v { self.last_sent_packet.max(1) } else { 0 };
}
pub fn app_limited(&mut self) -> bool {
self.end_of_app_limited != 0
}
#[cfg(test)]
pub fn delivered(&self) -> usize {
self.delivered
}
pub fn sample_delivery_rate(&self) -> Bandwidth {
self.rate_sample.bandwidth
}
#[cfg(test)]
pub fn sample_is_app_limited(&self) -> bool {
self.rate_sample.is_app_limited
}
}
#[derive(Debug)]
struct RateSample {
bandwidth: Bandwidth,
is_app_limited: bool,
interval: Duration,
delivered: usize,
prior_delivered: usize,
prior_time: Option<Instant>,
send_elapsed: Duration,
ack_elapsed: Duration,
rtt: Duration,
}
impl RateSample {
const fn new() -> Self {
RateSample {
bandwidth: Bandwidth::zero(),
is_app_limited: false,
interval: Duration::ZERO,
delivered: 0,
prior_delivered: 0,
prior_time: None,
send_elapsed: Duration::ZERO,
ack_elapsed: Duration::ZERO,
rtt: Duration::ZERO,
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::packet;
use crate::ranges;
use crate::recovery::congestion::recovery::LegacyRecovery;
use crate::recovery::HandshakeStatus;
use crate::recovery::RecoveryOps;
use crate::test_utils;
use crate::Config;
use crate::OnAckReceivedOutcome;
use std::ops::Range;
#[test]
fn sample_is_app_limited() {
let config = Config::new(0xbabababa).unwrap();
let mut r = LegacyRecovery::new(&config);
let mut now = Instant::now();
let mss = r.max_datagram_size();
assert!(!r.congestion.delivery_rate.app_limited());
assert_eq!(r.congestion.delivery_rate.end_of_app_limited, 0);
assert!(!r.congestion.delivery_rate.sample_is_app_limited());
let rtt = Duration::from_secs(2);
helper_send_and_ack_packets(&mut r, 0..4, now, rtt, mss);
r.delivery_rate_update_app_limited(true);
assert!(r.congestion.delivery_rate.app_limited());
assert_eq!(r.congestion.delivery_rate.end_of_app_limited, 3);
assert!(r.congestion.delivery_rate.app_limited());
assert!(!r.congestion.delivery_rate.sample_is_app_limited());
now += rtt;
helper_send_and_ack_packets(&mut r, 4..8, now, rtt, mss);
assert!(!r.congestion.delivery_rate.app_limited());
assert_eq!(r.congestion.delivery_rate.end_of_app_limited, 0);
assert!(r.congestion.delivery_rate.sample_is_app_limited());
}
#[test]
fn app_limited_delivery_rate() {
let config = Config::new(0xbabababa).unwrap();
let mut r = LegacyRecovery::new(&config);
let mut now = Instant::now();
let mss = r.max_datagram_size();
assert!(!r.congestion.delivery_rate.app_limited());
assert_eq!(r.congestion.delivery_rate.end_of_app_limited, 0);
assert!(!r.congestion.delivery_rate.sample_is_app_limited());
let mut rtt = Duration::from_secs(2);
helper_send_and_ack_packets(&mut r, 0..2, now, rtt, mss);
r.delivery_rate_update_app_limited(true);
assert!(r.congestion.delivery_rate.app_limited());
assert_eq!(r.congestion.delivery_rate.end_of_app_limited, 1);
assert!(!r.congestion.delivery_rate.sample_is_app_limited());
let first_delivery_rate = r.delivery_rate().to_bytes_per_second();
let expected_delivery_rate = (mss * 2) as u64 / rtt.as_secs();
assert_eq!(expected_delivery_rate, 1200);
assert_eq!(first_delivery_rate, expected_delivery_rate);
now += rtt;
rtt = Duration::from_secs(4);
helper_send_and_ack_packets(&mut r, 2..4, now, rtt, mss);
assert!(!r.congestion.delivery_rate.app_limited());
assert_eq!(r.congestion.delivery_rate.end_of_app_limited, 0);
assert!(r.congestion.delivery_rate.sample_is_app_limited());
let expected_delivery_rate = (mss * 2) as u64 / rtt.as_secs();
assert_eq!(expected_delivery_rate, 600);
let app_limited_delivery_rate = r.delivery_rate().to_bytes_per_second();
assert_eq!(app_limited_delivery_rate, first_delivery_rate);
r.delivery_rate_update_app_limited(true);
assert!(r.congestion.delivery_rate.app_limited());
assert_eq!(r.congestion.delivery_rate.end_of_app_limited, 3);
assert!(r.congestion.delivery_rate.sample_is_app_limited());
now += rtt;
rtt = Duration::from_secs(1);
helper_send_and_ack_packets(&mut r, 4..6, now, rtt, mss);
assert!(!r.congestion.delivery_rate.app_limited());
assert_eq!(r.congestion.delivery_rate.end_of_app_limited, 0);
assert!(r.congestion.delivery_rate.sample_is_app_limited());
let expected_delivery_rate = (mss * 2) as u64 / rtt.as_secs();
assert_eq!(expected_delivery_rate, 2400);
let app_limited_delivery_rate = r.delivery_rate().to_bytes_per_second();
assert_eq!(app_limited_delivery_rate, expected_delivery_rate);
}
#[test]
fn rate_check() {
let config = Config::new(0xbabababa).unwrap();
let mut r = LegacyRecovery::new(&config);
let now = Instant::now();
let mss = r.max_datagram_size();
for pn in 0..2 {
let pkt = test_utils::helper_packet_sent(pn, now, mss);
r.on_packet_sent(
pkt,
packet::Epoch::Application,
HandshakeStatus::default(),
now,
"",
);
}
let rtt = Duration::from_millis(50);
let now = now + rtt;
for pn in 0..2 {
let acked = Acked {
pkt_num: pn,
time_sent: now,
size: mss,
rtt,
delivered: 0,
delivered_time: now,
first_sent_time: now.checked_sub(rtt).unwrap(),
is_app_limited: false,
};
r.congestion.delivery_rate.update_rate_sample(&acked, now);
}
r.congestion.delivery_rate.generate_rate_sample(rtt);
assert_eq!(r.congestion.delivery_rate.delivered(), 2400);
assert_eq!(r.delivery_rate().to_bytes_per_second(), 48000);
}
#[test]
fn app_limited_cwnd_full() {
let config = Config::new(0xbabababa).unwrap();
let mut r = LegacyRecovery::new(&config);
let now = Instant::now();
let mss = r.max_datagram_size();
assert!(!r.app_limited());
assert!(!r.congestion.delivery_rate.sample_is_app_limited());
for pn in 0..5 {
let pkt = test_utils::helper_packet_sent(pn, now, mss);
r.on_packet_sent(
pkt,
packet::Epoch::Application,
HandshakeStatus::default(),
now,
"",
);
}
assert!(r.app_limited());
assert!(!r.congestion.delivery_rate.sample_is_app_limited());
for pn in 5..10 {
let pkt = test_utils::helper_packet_sent(pn, now, mss);
r.on_packet_sent(
pkt,
packet::Epoch::Application,
HandshakeStatus::default(),
now,
"",
);
}
assert!(!r.app_limited());
assert!(!r.congestion.delivery_rate.sample_is_app_limited());
}
fn helper_send_and_ack_packets(
recovery: &mut LegacyRecovery, range: Range<u64>, now: Instant,
rtt: Duration, mss: usize,
) {
for pn in range.clone() {
let pkt = test_utils::helper_packet_sent(pn, now, mss);
recovery.on_packet_sent(
pkt,
packet::Epoch::Application,
HandshakeStatus::default(),
now,
"",
);
}
let packet_count = range.clone().count();
let mut acked = ranges::RangeSet::default();
acked.insert(range);
let ack_outcome = recovery
.on_ack_received(
&acked,
25,
packet::Epoch::Application,
HandshakeStatus::default(),
now + rtt,
None,
"",
)
.unwrap();
assert_eq!(ack_outcome, OnAckReceivedOutcome {
lost_packets: 0,
lost_bytes: 0,
acked_bytes: mss * packet_count,
spurious_losses: 0,
});
}
}