use std::cmp;
use std::collections::VecDeque;
use std::ops::Range;
use std::time;
use std::time::Duration;
use std::time::Instant;
use log::*;
use super::rtt::RttEstimator;
use super::space::AckedPacket;
use super::space::PacketNumSpace;
use super::space::PacketNumSpaceMap;
use super::space::SentPacket;
use super::space::SpaceId;
use super::space::SpaceId::*;
use super::Connection;
use super::HandshakeStatus;
use crate::congestion_control;
use crate::congestion_control::CongestionController;
use crate::congestion_control::Pacer;
use crate::connection::Timer;
use crate::frame;
#[cfg(feature = "qlog")]
use crate::qlog;
#[cfg(feature = "qlog")]
use crate::qlog::events::EventData;
use crate::ranges::RangeSet;
use crate::Error;
use crate::PathStats;
use crate::RecoveryConfig;
use crate::Result;
use crate::TIMER_GRANULARITY;
const INITIAL_PACKET_THRESHOLD: u64 = 3;
const INITIAL_TIME_THRESHOLD: f64 = 9.0 / 8.0;
const MAX_PTO_PROBES_COUNT: usize = 2;
pub struct Recovery {
pub max_ack_delay: Duration,
pub max_datagram_size: usize,
pto_linear_factor: u64,
max_pto: Duration,
pto_count: usize,
loss_detection_timer: Option<Instant>,
pub pkt_thresh: u64,
pub time_thresh: f64,
pub bytes_in_flight: usize,
pub ack_eliciting_in_flight: u64,
pub rtt: RttEstimator,
pub congestion: Box<dyn CongestionController>,
pub pacer: Pacer,
pub pacer_timer: Option<Instant>,
pub cache_pkt_size: usize,
last_cwnd_limited_time: Option<Instant>,
pub stats: PathStats,
#[cfg(feature = "qlog")]
last_metrics: RecoveryMetrics,
trace_id: String,
}
impl Recovery {
pub(super) fn new(conf: &RecoveryConfig) -> Self {
Recovery {
max_ack_delay: conf.max_ack_delay,
max_datagram_size: crate::DEFAULT_SEND_UDP_PAYLOAD_SIZE,
pto_linear_factor: conf.pto_linear_factor,
max_pto: conf.max_pto,
pto_count: 0,
loss_detection_timer: None,
pkt_thresh: INITIAL_PACKET_THRESHOLD,
time_thresh: INITIAL_TIME_THRESHOLD,
bytes_in_flight: 0,
ack_eliciting_in_flight: 0,
rtt: RttEstimator::new(conf.initial_rtt),
congestion: congestion_control::build_congestion_controller(conf),
pacer: Pacer::build_pacer_controller(conf),
pacer_timer: None,
cache_pkt_size: conf.max_datagram_size,
last_cwnd_limited_time: None,
stats: PathStats::default(),
#[cfg(feature = "qlog")]
last_metrics: RecoveryMetrics::default(),
trace_id: String::from(""),
}
}
pub fn set_trace_id(&mut self, trace_id: &str) {
self.trace_id = trace_id.to_string();
}
pub(super) fn on_packet_sent(
&mut self,
mut pkt: SentPacket,
space_id: SpaceId,
spaces: &mut PacketNumSpaceMap,
handshake_status: HandshakeStatus,
now: Instant,
) {
let in_flight = pkt.in_flight;
let ack_eliciting = pkt.ack_eliciting;
let pacing = pkt.pacing;
let sent_size = pkt.sent_size;
pkt.time_sent = now;
let space = match spaces.get_mut(space_id) {
Some(space) => space,
None => return,
};
if in_flight {
if space_id != SpaceId::Initial && space_id != SpaceId::Handshake {
self.congestion
.on_sent(now, &mut pkt, self.bytes_in_flight as u64);
trace!(
"now={:?} {} {} ON_SENT {:?} inflight={} cwnd={}",
now,
self.trace_id,
self.congestion.name(),
pkt,
self.bytes_in_flight,
self.congestion.congestion_window()
);
}
}
space.sent.push_back(pkt);
if ack_eliciting {
space.consecutive_non_ack_eliciting_sent = 0;
} else {
space.consecutive_non_ack_eliciting_sent += 1;
}
if in_flight {
if ack_eliciting {
space.time_of_last_sent_ack_eliciting_pkt = Some(now);
space.loss_probes = space.loss_probes.saturating_sub(1);
space.ack_eliciting_in_flight += 1;
self.ack_eliciting_in_flight += 1;
}
space.bytes_in_flight += sent_size;
self.bytes_in_flight += sent_size;
self.cache_pkt_size = sent_size;
self.set_loss_detection_timer(space_id, spaces, handshake_status, now);
}
if pacing {
self.pacer.on_sent(sent_size as u64);
}
}
#[allow(clippy::too_many_arguments)]
pub(super) fn on_ack_received(
&mut self,
ranges: &RangeSet,
ack_delay: u64,
space_id: SpaceId,
spaces: &mut PacketNumSpaceMap,
handshake_status: HandshakeStatus,
#[cfg(feature = "qlog")] qlog: Option<&mut qlog::QlogWriter>,
now: Instant,
) -> Result<(u64, u64)> {
let space = spaces.get_mut(space_id).ok_or(Error::InternalError)?;
let largest_acked_pkt = ranges.max().unwrap();
if space.largest_acked_pkt == u64::MAX {
space.largest_acked_pkt = largest_acked_pkt;
} else {
space.largest_acked_pkt = cmp::max(space.largest_acked_pkt, largest_acked_pkt);
}
if space_id != SpaceId::Initial && space_id != SpaceId::Handshake {
self.congestion.begin_ack(now, self.bytes_in_flight as u64);
}
trace!(
"now={:?} {} {} BEGIN_ACK inflight={} cwnd={}",
now,
self.trace_id,
self.congestion.name(),
self.bytes_in_flight,
self.congestion.congestion_window()
);
let mut newly_acked_pkts = Vec::<AckedPacket>::new();
let rtt_sample = self.detect_acked_packets(ranges, space, &mut newly_acked_pkts, now);
if newly_acked_pkts.is_empty() {
return Ok((0, 0));
}
if let Some(rtt) = rtt_sample {
let ack_delay = Duration::from_micros(ack_delay);
if !rtt.is_zero() {
self.rtt.update(ack_delay, rtt);
}
}
let (lost_packets, lost_bytes) = self.detect_lost_packets(
space,
#[cfg(feature = "qlog")]
qlog,
now,
);
self.drain_sent_packets(space, now, self.rtt.smoothed_rtt());
if space_id != SpaceId::Initial && space_id != SpaceId::Handshake {
self.congestion.end_ack();
}
self.pto_count = 0;
self.set_loss_detection_timer(space_id, spaces, handshake_status, now);
Ok((lost_packets, lost_bytes))
}
fn detect_acked_packets(
&mut self,
ranges: &RangeSet,
space: &mut PacketNumSpace,
newly_acked: &mut Vec<AckedPacket>,
now: Instant,
) -> Option<Duration> {
let mut largest_newly_acked_pkt_num = 0;
let mut largest_newly_acked_sent_time = now;
let mut newly_ack_eliciting_pkt_acked = false;
let mut idx = 0;
'ranges_loop: for r in ranges.iter() {
'sent_pkt_loop: while idx < space.sent.len() {
let sent_pkt = space.sent.get_mut(idx).unwrap();
if sent_pkt.pkt_num < r.start || sent_pkt.time_acked.is_some() {
idx += 1;
continue 'sent_pkt_loop;
}
if sent_pkt.pkt_num >= r.end {
continue 'ranges_loop;
}
sent_pkt.time_acked = Some(now);
largest_newly_acked_pkt_num = sent_pkt.pkt_num;
largest_newly_acked_sent_time = sent_pkt.time_sent;
if sent_pkt.ack_eliciting {
newly_ack_eliciting_pkt_acked = true
}
if sent_pkt.in_flight {
space.bytes_in_flight =
space.bytes_in_flight.saturating_sub(sent_pkt.sent_size);
self.bytes_in_flight = self.bytes_in_flight.saturating_sub(sent_pkt.sent_size);
if sent_pkt.ack_eliciting {
space.ack_eliciting_in_flight =
space.ack_eliciting_in_flight.saturating_sub(1);
self.ack_eliciting_in_flight =
self.ack_eliciting_in_flight.saturating_sub(1);
}
}
if space.id != SpaceId::Initial && space.id != SpaceId::Handshake {
self.congestion.on_ack(
sent_pkt,
now,
false,
&self.rtt,
self.bytes_in_flight as u64,
);
}
trace!(
"now={:?} {} {} ON_ACK {:?} inflight={} cwnd={}",
now,
self.trace_id,
self.congestion.name(),
sent_pkt,
self.bytes_in_flight,
self.congestion.congestion_window()
);
self.stat_acked_event(1, sent_pkt.sent_size as u64);
space.acked.append(&mut sent_pkt.frames);
newly_acked.push(AckedPacket {
pkt_num: sent_pkt.pkt_num,
time_sent: sent_pkt.time_sent,
rtt: now.saturating_duration_since(sent_pkt.time_sent),
});
idx += 1;
if idx == space.sent.len() {
break 'ranges_loop;
}
}
}
if largest_newly_acked_pkt_num == space.largest_acked_pkt && newly_ack_eliciting_pkt_acked {
let latest_rtt = now.saturating_duration_since(largest_newly_acked_sent_time);
Some(latest_rtt)
} else {
None
}
}
fn in_persistent_congestion(&self) -> bool {
false
}
fn detect_lost_packets(
&mut self,
space: &mut PacketNumSpace,
#[cfg(feature = "qlog")] mut qlog: Option<&mut qlog::QlogWriter>,
now: Instant,
) -> (u64, u64) {
space.loss_time = None;
let mut lost_packets = 0;
let mut lost_bytes = 0;
let mut latest_lost_packet = None;
let loss_delay =
cmp::max(self.rtt.latest_rtt(), self.rtt.smoothed_rtt()).mul_f64(self.time_thresh);
let loss_delay = cmp::max(loss_delay, TIMER_GRANULARITY);
let lost_send_time = now - loss_delay;
let unacked_iter = space
.sent
.iter_mut()
.take_while(|p| p.pkt_num <= space.largest_acked_pkt)
.filter(|p| p.time_acked.is_none() && p.time_lost.is_none());
for unacked in unacked_iter {
if unacked.time_sent <= lost_send_time
|| unacked.pkt_num + self.pkt_thresh <= space.largest_acked_pkt
{
space.lost.append(&mut unacked.frames);
unacked.time_lost = Some(now);
lost_packets += 1;
if unacked.in_flight {
lost_bytes += unacked.sent_size as u64;
space.bytes_in_flight = space.bytes_in_flight.saturating_sub(unacked.sent_size);
self.bytes_in_flight = self.bytes_in_flight.saturating_sub(unacked.sent_size);
if unacked.ack_eliciting {
space.ack_eliciting_in_flight =
space.ack_eliciting_in_flight.saturating_sub(1);
self.ack_eliciting_in_flight =
self.ack_eliciting_in_flight.saturating_sub(1);
}
}
if !unacked.pmtu_probe {
latest_lost_packet = Some(unacked.clone());
}
#[cfg(feature = "qlog")]
if let Some(qlog) = qlog.as_mut() {
self.qlog_recovery_packet_lost(qlog, unacked);
}
trace!(
"now={:?} {} {} ON_LOST {:?} inflight={} cwnd={}",
now,
self.trace_id,
self.congestion.name(),
unacked,
self.bytes_in_flight,
self.congestion.congestion_window()
);
} else {
let loss_time = match space.loss_time {
None => unacked.time_sent + loss_delay,
Some(loss_time) => cmp::min(loss_time, unacked.time_sent + loss_delay),
};
space.loss_time = Some(loss_time);
}
}
if let Some(lost_packet) = latest_lost_packet {
if space.id != SpaceId::Initial && space.id != SpaceId::Handshake {
self.congestion.on_congestion_event(
now,
&lost_packet,
self.in_persistent_congestion(),
lost_bytes,
self.bytes_in_flight as u64,
);
trace!(
"now={:?} {} {} ON_CONGESTION_EVENT lost_size={} inflight={} cwnd={}",
now,
self.trace_id,
self.congestion.name(),
lost_bytes,
self.bytes_in_flight,
self.congestion.congestion_window()
);
}
}
self.stat_lost_event(lost_packets, lost_bytes);
(lost_packets, lost_bytes)
}
fn drain_sent_packets(&mut self, space: &mut PacketNumSpace, now: Instant, rtt: Duration) {
let mut lowest_non_expired_pkt_index = space.sent.len();
for (i, pkt) in space.sent.iter().enumerate() {
if pkt.time_acked.is_none() && pkt.time_lost.is_none() {
lowest_non_expired_pkt_index = i;
break;
}
if let Some(time_lost) = pkt.time_lost {
if time_lost + rtt > now {
lowest_non_expired_pkt_index = i;
break;
}
}
}
space.sent.drain(..lowest_non_expired_pkt_index);
}
fn set_loss_detection_timer(
&mut self,
space_id: SpaceId,
spaces: &mut PacketNumSpaceMap,
handshake_status: HandshakeStatus,
now: Instant,
) {
let (earliest_loss_time, _) = self.get_loss_time_and_space(space_id, spaces);
if earliest_loss_time.is_some() {
self.loss_detection_timer = earliest_loss_time;
return;
}
if self.ack_eliciting_in_flight == 0 && handshake_status.peer_verified_address {
self.loss_detection_timer = None;
return;
}
let (timeout, _) = self.get_pto_time_and_space(space_id, spaces, handshake_status, now);
self.loss_detection_timer = timeout;
}
pub(super) fn loss_detection_timer(&self) -> Option<Instant> {
self.loss_detection_timer
}
pub(super) fn on_loss_detection_timeout(
&mut self,
space_id: SpaceId,
spaces: &mut PacketNumSpaceMap,
handshake_status: HandshakeStatus,
#[cfg(feature = "qlog")] qlog: Option<&mut qlog::QlogWriter>,
now: Instant,
) -> (u64, u64) {
let (earliest_loss_time, sid) = self.get_loss_time_and_space(space_id, spaces);
let space = match spaces.get_mut(sid) {
Some(space) => space,
None => return (0, 0),
};
if earliest_loss_time.is_some() {
let (lost_packets, lost_bytes) = self.detect_lost_packets(
space,
#[cfg(feature = "qlog")]
qlog,
now,
);
self.drain_sent_packets(space, now, self.rtt.smoothed_rtt());
self.set_loss_detection_timer(space_id, spaces, handshake_status, now);
return (lost_packets, lost_bytes);
}
let sid = if self.ack_eliciting_in_flight > 0 {
let (_, e) = self.get_pto_time_and_space(space_id, spaces, handshake_status, now);
e
} else {
if handshake_status.derived_handshake_keys {
Handshake
} else {
Initial
}
};
let space = match spaces.get_mut(sid) {
Some(space) => space,
None => return (0, 0),
};
self.pto_count += 1;
space.loss_probes = match sid {
Initial | Handshake => 1,
_ => cmp::min(self.pto_count, MAX_PTO_PROBES_COUNT),
};
let unacked_iter = space
.sent
.iter_mut()
.filter(|p| p.has_data && p.time_acked.is_none() && p.time_lost.is_none())
.take(space.loss_probes);
for unacked in unacked_iter {
space.lost.extend_from_slice(&unacked.frames);
}
self.set_loss_detection_timer(space_id, spaces, handshake_status, now);
(0, 0)
}
fn get_loss_time_and_space(
&mut self,
space_id: SpaceId,
spaces: &PacketNumSpaceMap,
) -> (Option<Instant>, SpaceId) {
let candidates = if space_id != Initial && space_id != Handshake {
[Initial, Handshake, space_id]
} else {
[Initial, Handshake, Data]
};
let mut sid = Initial;
let mut time = None;
for s in candidates {
let space = match spaces.get(s) {
Some(space) => space,
None => continue,
};
let new_time = space.loss_time;
if time.is_none() || (new_time.is_some() && new_time < time) {
time = new_time;
sid = s;
}
}
(time, sid)
}
fn calculate_pto(&self) -> Duration {
let backoff_factor = self
.pto_count
.saturating_sub(self.pto_linear_factor as usize);
cmp::min(
self.rtt.pto_base() * 2_u32.saturating_pow(backoff_factor as u32),
self.max_pto,
)
}
fn pto_with_ack_delay(&self, duration: Duration) -> Duration {
let backoff_factor = self
.pto_count
.saturating_sub(self.pto_linear_factor as usize);
cmp::min(
duration + self.max_ack_delay * 2_u32.saturating_pow(backoff_factor as u32),
self.max_pto,
)
}
fn get_pto_time_and_space(
&self,
space_id: SpaceId,
spaces: &mut PacketNumSpaceMap,
handshake_status: HandshakeStatus,
now: Instant,
) -> (Option<Instant>, SpaceId) {
let mut duration = self.calculate_pto();
if self.ack_eliciting_in_flight == 0 {
if handshake_status.derived_handshake_keys {
return (Some(now + duration), SpaceId::Handshake);
} else {
return (Some(now + duration), SpaceId::Initial);
}
}
let candidates = if space_id != Initial && space_id != Handshake {
[Initial, Handshake, space_id]
} else {
[Initial, Handshake, Data]
};
let mut pto_timeout = None;
let mut pto_space = Initial;
for sid in candidates {
let space = match spaces.get_mut(sid) {
Some(space) => space,
None => continue,
};
if space.ack_eliciting_in_flight == 0 {
continue;
}
if sid == Data {
if !handshake_status.completed {
return (pto_timeout, pto_space);
}
duration = self.pto_with_ack_delay(duration);
}
let new_time = space
.time_of_last_sent_ack_eliciting_pkt
.map(|t| t + duration);
if pto_timeout.is_none() || new_time < pto_timeout {
pto_timeout = new_time;
pto_space = sid;
}
}
(pto_timeout, pto_space)
}
pub(super) fn on_pkt_num_space_discarded(
&mut self,
space_id: SpaceId,
spaces: &mut PacketNumSpaceMap,
handshake_status: HandshakeStatus,
now: Instant,
) {
let space = match spaces.get_mut(space_id) {
Some(space) => space,
None => return,
};
self.remove_from_bytes_in_flight(space);
space.sent.clear();
space.lost.clear();
space.acked.clear();
space.time_of_last_sent_ack_eliciting_pkt = None;
space.loss_time = None;
space.loss_probes = 0;
space.bytes_in_flight = 0;
space.ack_eliciting_in_flight = 0;
self.set_loss_detection_timer(space_id, spaces, handshake_status, now);
}
fn remove_from_bytes_in_flight(&mut self, space: &PacketNumSpace) {
for pkt in &space.sent {
if !pkt.in_flight || pkt.time_acked.is_some() || pkt.time_lost.is_some() {
continue;
}
self.bytes_in_flight = self.bytes_in_flight.saturating_sub(pkt.sent_size);
if pkt.ack_eliciting {
self.ack_eliciting_in_flight = self.ack_eliciting_in_flight.saturating_sub(1);
}
}
}
pub(super) fn update_max_datagram_size(
&mut self,
mut max_datagram_size: usize,
is_upper: bool,
) {
if is_upper {
max_datagram_size = cmp::min(self.max_datagram_size, max_datagram_size);
}
self.max_datagram_size = max_datagram_size;
}
pub(crate) fn can_send(&mut self) -> bool {
if self.bytes_in_flight >= self.congestion.congestion_window() as usize {
trace!(
"{} sending is limited by congestion controller, inflight {}, window {}",
self.trace_id,
self.bytes_in_flight,
self.congestion.congestion_window()
);
return false;
}
if self.pacer.enabled() && !self.can_pacing() {
trace!(
"{} sending is limited by pacer, pacing timer {:?}",
self.trace_id,
self.pacer_timer
.map(|t| t.saturating_duration_since(Instant::now()))
);
return false;
}
true
}
fn can_pacing(&mut self) -> bool {
let now = time::Instant::now();
let cwnd = self.congestion.congestion_window();
let srtt = self.rtt.smoothed_rtt() as Duration;
if let Some(pr) = self.congestion.pacing_rate() {
self.pacer_timer = self.pacer.schedule(
self.cache_pkt_size as u64,
pr,
srtt,
cwnd,
self.max_datagram_size as u64,
now,
);
}
self.pacer_timer.is_none()
}
pub(crate) fn stat_sent_event(&mut self, sent_pkts: u64, sent_bytes: u64) {
self.stats.sent_count = self.stats.sent_count.saturating_add(sent_pkts);
self.stats.sent_bytes = self.stats.sent_bytes.saturating_add(sent_bytes);
self.stat_cwnd_updated();
}
pub(crate) fn stat_recv_event(&mut self, recv_pkts: u64, recv_bytes: u64) {
self.stats.recv_count = self.stats.recv_count.saturating_add(recv_pkts);
self.stats.recv_bytes = self.stats.recv_bytes.saturating_add(recv_bytes);
}
pub(crate) fn stat_acked_event(&mut self, acked_pkts: u64, acked_bytes: u64) {
self.stats.acked_count = self.stats.acked_count.saturating_add(acked_pkts);
self.stats.acked_bytes = self.stats.acked_bytes.saturating_add(acked_bytes);
}
pub(crate) fn stat_lost_event(&mut self, lost_pkts: u64, lost_bytes: u64) {
self.stats.lost_count = self.stats.lost_count.saturating_add(lost_pkts);
self.stats.lost_bytes = self.stats.lost_bytes.saturating_add(lost_bytes);
}
pub(crate) fn stat_cwnd_updated(&mut self) {
let cwnd = self.congestion.congestion_window();
if self.stats.init_cwnd == 0 {
self.stats.init_cwnd = cwnd;
self.stats.min_cwnd = cwnd;
self.stats.max_cwnd = cwnd;
}
self.stats.final_cwnd = cwnd;
if self.stats.max_cwnd < cwnd {
self.stats.max_cwnd = cwnd;
}
if self.stats.min_cwnd > cwnd {
self.stats.min_cwnd = cwnd;
}
let bytes_in_flight = self.bytes_in_flight as u64;
if self.stats.max_inflight < bytes_in_flight {
self.stats.max_inflight = bytes_in_flight;
}
}
pub(crate) fn stat_cwnd_limited(&mut self) {
let is_cwnd_limited = self.bytes_in_flight >= self.congestion.congestion_window() as usize;
let now = Instant::now();
if let Some(last_cwnd_limited_time) = self.last_cwnd_limited_time {
let duration = now.saturating_duration_since(last_cwnd_limited_time);
let duration = duration.as_millis() as u64;
self.stats.cwnd_limited_duration =
self.stats.cwnd_limited_duration.saturating_add(duration);
if is_cwnd_limited {
self.last_cwnd_limited_time = Some(now);
} else {
self.last_cwnd_limited_time = None;
}
} else if is_cwnd_limited {
self.stats.cwnd_limited_count = self.stats.cwnd_limited_count.saturating_add(1);
self.last_cwnd_limited_time = Some(now);
}
}
pub(crate) fn stat_lazy_update(&mut self) {
self.stats.min_rtt = self.rtt.min_rtt().as_micros() as u64;
self.stats.max_rtt = self.rtt.max_rtt().as_micros() as u64;
self.stats.srtt = self.rtt.smoothed_rtt().as_micros() as u64;
self.stats.rttvar = self.rtt.rttvar().as_micros() as u64;
self.stats.in_slow_start = self.congestion.in_slow_start();
self.stats.pacing_rate = self.congestion.pacing_rate().unwrap_or_default();
}
#[cfg(feature = "qlog")]
pub(crate) fn qlog_recovery_metrics_updated(&mut self, qlog: &mut qlog::QlogWriter) {
let mut updated = false;
let mut min_rtt = None;
if self.last_metrics.min_rtt != self.rtt.min_rtt() {
self.last_metrics.min_rtt = self.rtt.min_rtt();
min_rtt = Some(self.last_metrics.min_rtt.as_secs_f32() * 1000.0);
updated = true;
}
let mut smoothed_rtt = None;
if self.last_metrics.smoothed_rtt != self.rtt.smoothed_rtt() {
self.last_metrics.smoothed_rtt = self.rtt.smoothed_rtt();
smoothed_rtt = Some(self.last_metrics.smoothed_rtt.as_secs_f32() * 1000.0);
updated = true;
}
let mut latest_rtt = None;
if self.last_metrics.latest_rtt != self.rtt.latest_rtt() {
self.last_metrics.latest_rtt = self.rtt.latest_rtt();
latest_rtt = Some(self.last_metrics.latest_rtt.as_secs_f32() * 1000.0);
updated = true;
}
let mut rtt_variance = None;
if self.last_metrics.rttvar != self.rtt.rttvar() {
self.last_metrics.rttvar = self.rtt.rttvar();
rtt_variance = Some(self.last_metrics.rttvar.as_secs_f32() * 1000.0);
updated = true;
}
let mut congestion_window = None;
if self.last_metrics.cwnd != self.congestion.congestion_window() {
self.last_metrics.cwnd = self.congestion.congestion_window();
congestion_window = Some(self.last_metrics.cwnd);
updated = true;
}
let mut bytes_in_flight = None;
if self.last_metrics.bytes_in_flight != self.bytes_in_flight as u64 {
self.last_metrics.bytes_in_flight = self.bytes_in_flight as u64;
bytes_in_flight = Some(self.last_metrics.bytes_in_flight);
updated = true;
}
let mut pacing_rate = None;
if self.last_metrics.pacing_rate != self.congestion.pacing_rate() {
self.last_metrics.pacing_rate = self.congestion.pacing_rate();
pacing_rate = self.last_metrics.pacing_rate.map(|v| v * 8); updated = true;
}
if !updated {
return;
}
let ev_data = EventData::RecoveryMetricsUpdated {
min_rtt,
smoothed_rtt,
latest_rtt,
rtt_variance,
pto_count: None,
congestion_window,
bytes_in_flight,
ssthresh: None,
packets_in_flight: None,
pacing_rate,
};
qlog.add_event_data(Instant::now(), ev_data).ok();
}
#[cfg(feature = "qlog")]
pub(crate) fn qlog_recovery_packet_lost(
&mut self,
qlog: &mut qlog::QlogWriter,
pkt: &SentPacket,
) {
let ev_data = EventData::RecoveryPacketLost {
header: Some(qlog::events::PacketHeader {
packet_type: pkt.pkt_type.to_qlog(),
packet_number: pkt.pkt_num,
..qlog::events::PacketHeader::default()
}),
frames: None,
is_mtu_probe_packet: None,
trigger: None,
};
qlog.add_event_data(Instant::now(), ev_data).ok();
}
}
#[cfg(feature = "qlog")]
#[derive(Default)]
struct RecoveryMetrics {
min_rtt: Duration,
smoothed_rtt: Duration,
latest_rtt: Duration,
rttvar: Duration,
cwnd: u64,
bytes_in_flight: u64,
pacing_rate: Option<u64>,
}
#[cfg(test)]
mod tests {
use super::*;
use crate::congestion_control::CongestionControlAlgorithm;
use crate::connection::space::RateSamplePacketState;
use crate::ranges::RangeSet;
use std::time::Duration;
use std::time::Instant;
fn new_test_sent_packet(pkt_num: u64, sent_size: usize, now: Instant) -> SentPacket {
SentPacket {
pkt_num,
frames: vec![],
time_sent: now,
time_acked: None,
time_lost: None,
sent_size,
ack_eliciting: true,
in_flight: true,
has_data: true,
rate_sample_state: Default::default(),
..SentPacket::default()
}
}
fn new_test_recovery_config() -> RecoveryConfig {
RecoveryConfig {
max_datagram_size: 1200,
max_ack_delay: Duration::from_millis(100),
congestion_control_algorithm: CongestionControlAlgorithm::Bbr,
min_congestion_window: 2_u64,
initial_congestion_window: 10_u64,
initial_rtt: crate::INITIAL_RTT,
pto_linear_factor: crate::DEFAULT_PTO_LINEAR_FACTOR,
max_pto: crate::MAX_PTO,
..RecoveryConfig::default()
}
}
#[test]
fn loss_on_timeout() -> Result<()> {
let conf = new_test_recovery_config();
let mut recovery = Recovery::new(&conf);
let mut spaces = PacketNumSpaceMap::new();
let space_id = SpaceId::Handshake;
let status = HandshakeStatus {
derived_handshake_keys: true,
peer_verified_address: true,
completed: false,
};
let mut now = Instant::now();
let sent_pkt0 = new_test_sent_packet(0, 1000, now);
recovery.on_packet_sent(sent_pkt0, space_id, &mut spaces, status, now);
assert_eq!(spaces.get(space_id).unwrap().sent.len(), 1);
assert_eq!(spaces.get(space_id).unwrap().bytes_in_flight, 1000);
assert_eq!(recovery.bytes_in_flight, 1000);
let sent_pkt1 = new_test_sent_packet(1, 1001, now);
recovery.on_packet_sent(sent_pkt1, space_id, &mut spaces, status, now);
assert_eq!(spaces.get(space_id).unwrap().sent.len(), 2);
assert_eq!(spaces.get(space_id).unwrap().bytes_in_flight, 2001);
assert_eq!(recovery.bytes_in_flight, 2001);
let sent_pkt2 = new_test_sent_packet(2, 1002, now);
recovery.on_packet_sent(sent_pkt2, space_id, &mut spaces, status, now);
assert_eq!(spaces.get(space_id).unwrap().sent.len(), 3);
assert_eq!(spaces.get(space_id).unwrap().bytes_in_flight, 3003);
assert_eq!(spaces.get(space_id).unwrap().ack_eliciting_in_flight, 3);
assert_eq!(recovery.bytes_in_flight, 3003);
assert_eq!(recovery.ack_eliciting_in_flight, 3);
now += Duration::from_millis(100);
let mut acked = RangeSet::default();
acked.insert(0..1);
acked.insert(2..3);
recovery.on_ack_received(
&acked,
0,
SpaceId::Handshake,
&mut spaces,
status,
#[cfg(feature = "qlog")]
None,
now,
)?;
assert_eq!(spaces.get(space_id).unwrap().sent.len(), 2);
assert_eq!(spaces.get(space_id).unwrap().ack_eliciting_in_flight, 1);
assert_eq!(recovery.ack_eliciting_in_flight, 1);
now = recovery.loss_detection_timer().unwrap();
let (lost_pkts, lost_bytes) = recovery.on_loss_detection_timeout(
SpaceId::Handshake,
&mut spaces,
status,
#[cfg(feature = "qlog")]
None,
now,
);
assert_eq!(lost_pkts, 1);
assert_eq!(lost_bytes, 1001);
assert_eq!(spaces.get(space_id).unwrap().ack_eliciting_in_flight, 0);
assert_eq!(recovery.ack_eliciting_in_flight, 0);
Ok(())
}
#[test]
fn loss_on_reordering() -> Result<()> {
let conf = new_test_recovery_config();
let mut recovery = Recovery::new(&conf);
let mut spaces = PacketNumSpaceMap::new();
let space_id = SpaceId::Handshake;
let status = HandshakeStatus {
derived_handshake_keys: true,
peer_verified_address: true,
completed: false,
};
let mut now = Instant::now();
let sent_pkt0 = new_test_sent_packet(0, 1000, now);
recovery.on_packet_sent(sent_pkt0, space_id, &mut spaces, status, now);
assert_eq!(spaces.get(space_id).unwrap().sent.len(), 1);
assert_eq!(spaces.get(space_id).unwrap().bytes_in_flight, 1000);
assert_eq!(recovery.bytes_in_flight, 1000);
let sent_pkt1 = new_test_sent_packet(1, 1001, now);
recovery.on_packet_sent(sent_pkt1, space_id, &mut spaces, status, now);
assert_eq!(spaces.get(space_id).unwrap().sent.len(), 2);
assert_eq!(spaces.get(space_id).unwrap().bytes_in_flight, 2001);
assert_eq!(recovery.bytes_in_flight, 2001);
let sent_pkt2 = new_test_sent_packet(2, 1002, now);
recovery.on_packet_sent(sent_pkt2, space_id, &mut spaces, status, now);
assert_eq!(spaces.get(space_id).unwrap().sent.len(), 3);
assert_eq!(spaces.get(space_id).unwrap().bytes_in_flight, 3003);
assert_eq!(recovery.bytes_in_flight, 3003);
let sent_pkt2 = new_test_sent_packet(3, 1003, now);
recovery.on_packet_sent(sent_pkt2, space_id, &mut spaces, status, now);
assert_eq!(spaces.get(space_id).unwrap().sent.len(), 4);
assert_eq!(spaces.get(space_id).unwrap().bytes_in_flight, 4006);
assert_eq!(recovery.bytes_in_flight, 4006);
now += Duration::from_millis(100);
let mut acked = RangeSet::default();
acked.insert(1..4);
let (lost_pkts, lost_bytes) = recovery.on_ack_received(
&acked,
0,
SpaceId::Handshake,
&mut spaces,
status,
#[cfg(feature = "qlog")]
None,
now,
)?;
assert_eq!(spaces.get(space_id).unwrap().sent.len(), 4);
assert_eq!(lost_pkts, 1);
assert_eq!(lost_bytes, 1000);
now += recovery.rtt.smoothed_rtt();
let (lost_pkts, lost_bytes) = recovery.on_ack_received(
&acked,
0,
SpaceId::Handshake,
&mut spaces,
status,
#[cfg(feature = "qlog")]
None,
now,
)?;
assert_eq!(lost_pkts, 0);
assert_eq!(lost_bytes, 0);
recovery.drain_sent_packets(
spaces.get_mut(space_id).unwrap(),
now,
recovery.rtt.smoothed_rtt(),
);
assert_eq!(spaces.get(space_id).unwrap().sent.len(), 0);
Ok(())
}
#[test]
fn pto() -> Result<()> {
let conf = new_test_recovery_config();
let mut recovery = Recovery::new(&conf);
let mut spaces = PacketNumSpaceMap::new();
let space_id = SpaceId::Handshake;
let status = HandshakeStatus {
derived_handshake_keys: true,
peer_verified_address: true,
completed: false,
};
let mut now = Instant::now();
let sent_pkt0 = new_test_sent_packet(0, 1000, now);
recovery.on_packet_sent(sent_pkt0, space_id, &mut spaces, status, now);
assert_eq!(spaces.get(space_id).unwrap().sent.len(), 1);
assert_eq!(spaces.get(space_id).unwrap().bytes_in_flight, 1000);
assert_eq!(recovery.bytes_in_flight, 1000);
let sent_pkt1 = new_test_sent_packet(1, 1001, now);
recovery.on_packet_sent(sent_pkt1, space_id, &mut spaces, status, now);
assert_eq!(spaces.get(space_id).unwrap().sent.len(), 2);
assert_eq!(spaces.get(space_id).unwrap().bytes_in_flight, 2001);
assert_eq!(recovery.bytes_in_flight, 2001);
now += Duration::from_millis(100);
let mut acked = RangeSet::default();
acked.insert(0..1);
let (lost_pkts, lost_bytes) = recovery.on_ack_received(
&acked,
0,
SpaceId::Handshake,
&mut spaces,
status,
#[cfg(feature = "qlog")]
None,
now,
)?;
assert_eq!(spaces.get(space_id).unwrap().sent.len(), 1);
assert_eq!(lost_pkts, 0);
assert_eq!(lost_bytes, 0);
assert!(recovery.loss_detection_timer().is_some());
now = recovery.loss_detection_timer().unwrap();
let (lost_pkts, lost_bytes) = recovery.on_loss_detection_timeout(
SpaceId::Handshake,
&mut spaces,
status,
#[cfg(feature = "qlog")]
None,
now,
);
assert_eq!(recovery.pto_count, 1);
assert_eq!(lost_pkts, 0);
assert_eq!(lost_bytes, 0);
Ok(())
}
#[test]
fn discard_pkt_num_space() -> Result<()> {
let conf = new_test_recovery_config();
let mut recovery = Recovery::new(&conf);
let mut spaces = PacketNumSpaceMap::new();
let space_id = SpaceId::Handshake;
let status = HandshakeStatus {
derived_handshake_keys: true,
peer_verified_address: true,
completed: false,
};
let mut now = Instant::now();
let sent_pkt0 = new_test_sent_packet(0, 1000, now);
recovery.on_packet_sent(sent_pkt0, space_id, &mut spaces, status, now);
assert_eq!(spaces.get(space_id).unwrap().sent.len(), 1);
assert_eq!(spaces.get(space_id).unwrap().bytes_in_flight, 1000);
assert_eq!(recovery.bytes_in_flight, 1000);
let sent_pkt1 = new_test_sent_packet(1, 1001, now);
recovery.on_packet_sent(sent_pkt1, space_id, &mut spaces, status, now);
assert_eq!(spaces.get(space_id).unwrap().sent.len(), 2);
assert_eq!(spaces.get(space_id).unwrap().bytes_in_flight, 2001);
assert_eq!(recovery.bytes_in_flight, 2001);
let sent_pkt2 = new_test_sent_packet(2, 1002, now);
recovery.on_packet_sent(sent_pkt2, space_id, &mut spaces, status, now);
assert_eq!(spaces.get(space_id).unwrap().sent.len(), 3);
assert_eq!(spaces.get(space_id).unwrap().bytes_in_flight, 3003);
assert_eq!(recovery.bytes_in_flight, 3003);
let sent_pkt3 = new_test_sent_packet(0, 1003, now);
recovery.on_packet_sent(sent_pkt3, SpaceId::Data, &mut spaces, status, now);
assert_eq!(spaces.get(SpaceId::Data).unwrap().sent.len(), 1);
assert_eq!(spaces.get(SpaceId::Data).unwrap().bytes_in_flight, 1003);
assert_eq!(recovery.bytes_in_flight, 4006);
now += Duration::from_millis(100);
let mut acked = RangeSet::default();
acked.insert(0..2);
recovery.on_ack_received(
&acked,
0,
SpaceId::Handshake,
&mut spaces,
status,
#[cfg(feature = "qlog")]
None,
now,
)?;
assert_eq!(spaces.get(SpaceId::Handshake).unwrap().sent.len(), 1);
assert_eq!(
spaces.get(SpaceId::Handshake).unwrap().bytes_in_flight,
1002
);
assert_eq!(recovery.bytes_in_flight, 2005);
recovery.on_pkt_num_space_discarded(space_id, &mut spaces, status, now);
assert_eq!(spaces.get(space_id).unwrap().sent.len(), 0);
assert_eq!(spaces.get(space_id).unwrap().bytes_in_flight, 0);
assert_eq!(spaces.get(space_id).unwrap().ack_eliciting_in_flight, 0);
assert_eq!(recovery.bytes_in_flight, 1003);
assert_eq!(recovery.ack_eliciting_in_flight, 1);
Ok(())
}
fn check_acked_packets(sent: &VecDeque<SentPacket>, acked_ranges: Vec<Range<u64>>) -> bool {
let ranges_contain = |pkt_num: u64, ranges: Vec<Range<u64>>| {
for range in &ranges {
if range.contains(&pkt_num) {
return true;
}
}
return false;
};
for sent_pkt in sent {
match sent_pkt.pkt_num {
pkt_num if ranges_contain(pkt_num, acked_ranges.clone()) => {
if sent_pkt.time_acked.is_none() {
return false;
}
}
_ => {
if sent_pkt.time_acked.is_some() {
return false;
}
}
}
}
true
}
fn generate_ack(ranges: Vec<Range<u64>>) -> RangeSet {
let mut acked = RangeSet::default();
for range in ranges {
acked.insert(range);
}
acked
}
#[test]
fn detect_acked_packets() -> Result<()> {
let conf = new_test_recovery_config();
let mut recovery = Recovery::new(&conf);
let mut spaces = PacketNumSpaceMap::new();
let status = HandshakeStatus {
derived_handshake_keys: true,
peer_verified_address: true,
completed: false,
};
let mut now = Instant::now();
for pkt_num in 100..1000 as u64 {
let sent_pkt = new_test_sent_packet(pkt_num, 1000, now);
recovery.on_packet_sent(sent_pkt, SpaceId::Data, &mut spaces, status, now);
}
now += Duration::from_millis(100);
let ack = generate_ack(vec![500..550, 600..650, 700..750, 800..850, 900..950]);
let mut newly_acked_pkts = Vec::<AckedPacket>::new();
_ = recovery.detect_acked_packets(
&ack,
spaces.get_mut(SpaceId::Data).unwrap(),
&mut newly_acked_pkts,
now,
);
assert!(check_acked_packets(
&spaces.get(SpaceId::Data).unwrap().sent,
vec![500..550, 600..650, 700..750, 800..850, 900..950],
));
let ack = generate_ack(vec![550..600, 650..700, 750..800, 850..900]);
_ = recovery.detect_acked_packets(
&ack,
spaces.get_mut(SpaceId::Data).unwrap(),
&mut newly_acked_pkts,
now,
);
assert!(check_acked_packets(
&spaces.get(SpaceId::Data).unwrap().sent,
vec![500..950],
));
recovery.on_ack_received(
&ack,
0,
SpaceId::Data,
&mut spaces,
status,
#[cfg(feature = "qlog")]
None,
now,
)?;
assert!(check_acked_packets(
&spaces.get(SpaceId::Data).unwrap().sent,
vec![500..950],
));
let ack = generate_ack(vec![0..100, 1000..1100]);
_ = recovery.detect_acked_packets(
&ack,
spaces.get_mut(SpaceId::Data).unwrap(),
&mut newly_acked_pkts,
now,
);
assert!(check_acked_packets(
&spaces.get(SpaceId::Data).unwrap().sent,
vec![500..950],
));
let ack = generate_ack(vec![50..150, 950..1050]);
_ = recovery.detect_acked_packets(
&ack,
spaces.get_mut(SpaceId::Data).unwrap(),
&mut newly_acked_pkts,
now,
);
assert!(check_acked_packets(
&spaces.get(SpaceId::Data).unwrap().sent,
vec![100..150, 500..1000],
));
let ack = generate_ack(vec![100..200, 300..400]);
_ = recovery.detect_acked_packets(
&ack,
spaces.get_mut(SpaceId::Data).unwrap(),
&mut newly_acked_pkts,
now,
);
assert!(check_acked_packets(
&spaces.get(SpaceId::Data).unwrap().sent,
vec![100..200, 300..400, 500..1000],
));
Ok(())
}
#[test]
fn check_cwnd_for_non_app_data_ack() -> Result<()> {
let conf = new_test_recovery_config();
let mut recovery = Recovery::new(&conf);
let mut spaces = PacketNumSpaceMap::new();
let space_id = SpaceId::Handshake;
let status = HandshakeStatus {
derived_handshake_keys: true,
peer_verified_address: true,
completed: false,
};
let mut now = Instant::now();
let cwnd_before_ack = recovery.congestion.congestion_window();
let sent_pkt0 = new_test_sent_packet(0, 1000, now);
recovery.on_packet_sent(sent_pkt0, space_id, &mut spaces, status, now);
let sent_pkt1 = new_test_sent_packet(1, 2000, now);
recovery.on_packet_sent(sent_pkt1, space_id, &mut spaces, status, now);
now += Duration::from_millis(100);
let mut acked = RangeSet::default();
acked.insert(0..2);
let (lost_pkts, lost_bytes) = recovery.on_ack_received(
&acked,
0,
SpaceId::Handshake,
&mut spaces,
status,
#[cfg(feature = "qlog")]
None,
now,
)?;
assert_eq!(cwnd_before_ack, recovery.congestion.congestion_window());
Ok(())
}
const MAX_PTO_UT: Duration = Duration::from_secs(30);
fn calculate_pto_with_count(count: usize) -> (Duration, Duration) {
let mut conf = new_test_recovery_config();
conf.pto_linear_factor = 2;
conf.max_pto = MAX_PTO_UT;
let mut recovery = Recovery::new(&conf);
recovery.pto_count = count;
let duration = recovery.calculate_pto();
(duration, recovery.pto_with_ack_delay(duration))
}
#[test]
fn calculate_pto() -> Result<()> {
assert_eq!(
calculate_pto_with_count(0),
(
Duration::from_millis(999), Duration::from_millis(1099) )
);
assert_eq!(
calculate_pto_with_count(2),
(
Duration::from_millis(999), Duration::from_millis(1099) )
);
assert_eq!(
calculate_pto_with_count(3),
(
Duration::from_millis(1998), Duration::from_millis(2198) )
);
assert_eq!(calculate_pto_with_count(100), (MAX_PTO_UT, MAX_PTO_UT));
Ok(())
}
}