use std::time::Instant;
use crate::recovery::gcongestion::bbr2::BBRv2;
use crate::recovery::gcongestion::Bandwidth;
use crate::recovery::gcongestion::CongestionControl;
use crate::recovery::rtt::RttStats;
use crate::recovery::RecoveryStats;
use crate::recovery::ReleaseDecision;
use crate::recovery::ReleaseTime;
use super::Acked;
use super::Lost;
const LUMPY_PACING_CWND_FRACTION: f64 = 0.25;
const LUMPY_PACING_SIZE: usize = 2;
const LUMPY_PACING_MIN_BANDWIDTH_KBPS: Bandwidth =
Bandwidth::from_kbits_per_second(1_200);
const INITIAL_UNPACED_BURST: usize = 10;
#[derive(Debug)]
pub struct Pacer {
enabled: bool,
sender: BBRv2,
max_pacing_rate: Option<Bandwidth>,
burst_tokens: usize,
ideal_next_packet_send_time: ReleaseTime,
initial_burst_size: usize,
lumpy_tokens: usize,
pacing_limited: bool,
}
impl Pacer {
pub(crate) fn new(
enabled: bool, congestion: BBRv2, max_pacing_rate: Option<Bandwidth>,
) -> Self {
Pacer {
enabled,
sender: congestion,
max_pacing_rate,
burst_tokens: INITIAL_UNPACED_BURST,
ideal_next_packet_send_time: ReleaseTime::Immediate,
initial_burst_size: INITIAL_UNPACED_BURST,
lumpy_tokens: 0,
pacing_limited: false,
}
}
pub fn get_next_release_time(&self) -> ReleaseDecision {
if !self.enabled {
return ReleaseDecision {
time: ReleaseTime::Immediate,
allow_burst: true,
};
}
let allow_burst = self.burst_tokens > 0 || self.lumpy_tokens > 0;
ReleaseDecision {
time: self.ideal_next_packet_send_time,
allow_burst,
}
}
#[cfg(feature = "qlog")]
pub fn state_str(&self) -> &'static str {
self.sender.state_str()
}
pub fn get_congestion_window(&self) -> usize {
self.sender.get_congestion_window()
}
pub fn on_packet_sent(
&mut self, sent_time: Instant, bytes_in_flight: usize,
packet_number: u64, bytes: usize, is_retransmissible: bool,
rtt_stats: &RttStats,
) {
self.sender.on_packet_sent(
sent_time,
bytes_in_flight,
packet_number,
bytes,
is_retransmissible,
);
if !self.enabled || !is_retransmissible {
return;
}
if bytes_in_flight == 0 && !self.sender.is_in_recovery() {
self.burst_tokens = self
.initial_burst_size
.min(self.sender.get_congestion_window_in_packets());
}
if self.burst_tokens > 0 {
self.burst_tokens -= 1;
self.ideal_next_packet_send_time = ReleaseTime::Immediate;
self.pacing_limited = false;
return;
}
let delay = self
.pacing_rate(bytes_in_flight + bytes, rtt_stats)
.transfer_time(bytes);
if !self.pacing_limited || self.lumpy_tokens == 0 {
self.lumpy_tokens = 1.max(LUMPY_PACING_SIZE.min(
(self.sender.get_congestion_window_in_packets() as f64 *
LUMPY_PACING_CWND_FRACTION) as usize,
));
if self.sender.bandwidth_estimate(rtt_stats) <
LUMPY_PACING_MIN_BANDWIDTH_KBPS
{
self.lumpy_tokens = 1;
}
if bytes_in_flight + bytes >= self.sender.get_congestion_window() {
self.lumpy_tokens = 1;
}
}
self.lumpy_tokens -= 1;
self.ideal_next_packet_send_time.set_max(sent_time);
self.ideal_next_packet_send_time.inc(delay);
self.pacing_limited = self.sender.can_send(bytes_in_flight + bytes);
}
#[allow(clippy::too_many_arguments)]
#[inline]
pub fn on_congestion_event(
&mut self, rtt_updated: bool, prior_in_flight: usize,
bytes_in_flight: usize, event_time: Instant, acked_packets: &[Acked],
lost_packets: &[Lost], least_unacked: u64, rtt_stats: &RttStats,
recovery_stats: &mut RecoveryStats,
) {
self.sender.on_congestion_event(
rtt_updated,
prior_in_flight,
bytes_in_flight,
event_time,
acked_packets,
lost_packets,
least_unacked,
rtt_stats,
recovery_stats,
);
if !self.enabled {
return;
}
if !lost_packets.is_empty() {
self.burst_tokens = 0;
}
if let Some(max_pacing_rate) = self.max_pacing_rate {
if rtt_updated {
let max_rate = max_pacing_rate * 1.25f32;
let max_cwnd =
max_rate.to_bytes_per_period(rtt_stats.smoothed_rtt);
self.sender.limit_cwnd(max_cwnd as usize);
}
}
}
pub fn on_packet_neutered(&mut self, packet_number: u64) {
self.sender.on_packet_neutered(packet_number);
}
pub fn on_retransmission_timeout(&mut self, packets_retransmitted: bool) {
self.sender.on_retransmission_timeout(packets_retransmitted)
}
pub fn pacing_rate(
&self, bytes_in_flight: usize, rtt_stats: &RttStats,
) -> Bandwidth {
let sender_rate = self.sender.pacing_rate(bytes_in_flight, rtt_stats);
match self.max_pacing_rate {
Some(rate) if self.enabled => rate.min(sender_rate),
_ => sender_rate,
}
}
pub fn bandwidth_estimate(&self, rtt_stats: &RttStats) -> Bandwidth {
self.sender.bandwidth_estimate(rtt_stats)
}
pub fn max_bandwidth(&self) -> Bandwidth {
self.sender.max_bandwidth()
}
pub fn on_app_limited(&mut self, bytes_in_flight: usize) {
self.pacing_limited = false;
self.sender.on_app_limited(bytes_in_flight);
}
pub fn update_mss(&mut self, new_mss: usize) {
self.sender.update_mss(new_mss)
}
#[cfg(feature = "qlog")]
pub fn ssthresh(&self) -> Option<u64> {
self.sender.ssthresh()
}
#[cfg(test)]
pub fn is_app_limited(&self, bytes_in_flight: usize) -> bool {
!self.is_cwnd_limited(bytes_in_flight)
}
#[cfg(test)]
fn is_cwnd_limited(&self, bytes_in_flight: usize) -> bool {
!self.pacing_limited && self.sender.is_cwnd_limited(bytes_in_flight)
}
}