use std::{cmp, net::SocketAddr};
use identity_hash::IntMap;
use thiserror::Error;
use tracing::{debug, trace};
use super::{
PathStats, SpaceKind,
mtud::MtuDiscovery,
pacing::Pacer,
spaces::{PacketNumberSpace, SentPacket},
};
use crate::{
ConnectionId, Duration, FourTuple, Instant, TIMER_GRANULARITY, TransportConfig,
TransportErrorCode, VarInt,
coding::{self, Decodable, Encodable},
congestion,
frame::ObservedAddr,
};
#[cfg(feature = "qlog")]
use qlog::events::quic::RecoveryMetricsUpdated;
/// Identifier of a path, wrapping a `u32`.
///
/// The `Decodable`/`Encodable` impls in this file carry it on the wire as a [`VarInt`];
/// decoding rejects values that do not fit in a `u32`.
#[cfg_attr(test, derive(test_strategy::Arbitrary))]
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Copy, Default)]
pub struct PathId(pub(crate) u32);
impl std::hash::Hash for PathId {
fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
state.write_u32(self.0);
}
}
// Marker impl. The `Hash` impl for `PathId` feeds exactly one `u32` to the hasher.
// NOTE(review): presumably that single-integer write is what `identity_hash` requires of
// its keys — confirm against the crate documentation.
impl identity_hash::IdentityHashable for PathId {}
impl Decodable for PathId {
fn decode<B: bytes::Buf>(r: &mut B) -> coding::Result<Self> {
let v = VarInt::decode(r)?;
let v = u32::try_from(v.0).map_err(|_| coding::UnexpectedEnd)?;
Ok(Self(v))
}
}
impl Encodable for PathId {
    /// Writes the path ID to `w` as a [`VarInt`].
    fn encode<B: bytes::BufMut>(&self, w: &mut B) {
        let wire = VarInt(u64::from(self.0));
        wire.encode(w)
    }
}
impl PathId {
pub const MAX: Self = Self(u32::MAX);
pub const ZERO: Self = Self(0);
pub(crate) const fn size(&self) -> usize {
VarInt(self.0 as u64).size()
}
pub fn saturating_add(self, rhs: impl Into<Self>) -> Self {
let rhs = rhs.into();
let inner = self.0.saturating_add(rhs.0);
Self(inner)
}
pub fn saturating_sub(self, rhs: impl Into<Self>) -> Self {
let rhs = rhs.into();
let inner = self.0.saturating_sub(rhs.0);
Self(inner)
}
pub(crate) fn next(&self) -> Self {
self.saturating_add(Self(1))
}
pub(crate) fn as_u32(&self) -> u32 {
self.0
}
}
impl std::fmt::Display for PathId {
    /// Formats the ID exactly like its inner `u32`, honouring the formatter's flags.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        std::fmt::Display::fmt(&self.0, f)
    }
}
impl<T: Into<u32>> From<T> for PathId {
    /// Builds a path ID from any value that converts losslessly into a `u32`.
    fn from(source: T) -> Self {
        let raw: u32 = source.into();
        Self(raw)
    }
}
/// State kept for one path: the current [`PathData`] and, optionally, an earlier one.
#[derive(Debug)]
pub(super) struct PathState {
/// State of the path as currently in use.
pub(super) data: PathData,
/// An earlier [`PathData`] kept together with a connection ID, if any.
///
/// NOTE(review): presumably the state from before a migration plus the CID used on it —
/// confirm against the code that populates this field.
pub(super) prev: Option<(ConnectionId, PathData)>,
}
impl PathState {
    /// Removes `packet` from the in-flight accounting of whichever path data it belongs to.
    ///
    /// The current path data is tried first and the previous one (if any) second; the search
    /// stops at the first [`PathData`] that accepts the packet.
    pub(super) fn remove_in_flight(&mut self, packet: &SentPacket) {
        if self.data.remove_in_flight(packet) {
            return;
        }
        if let Some((_, previous)) = self.prev.as_mut() {
            previous.remove_in_flight(packet);
        }
    }
}
/// Bookkeeping for a path challenge that has been sent and not yet answered.
#[derive(Debug)]
pub(super) struct SentChallengeInfo {
/// When the challenge was sent.
pub(super) sent_instant: Instant,
/// The four-tuple the challenge was sent on.
pub(super) network_path: FourTuple,
}
/// Per-path transport state: RTT, congestion control, pacing, validation and accounting.
#[derive(Debug)]
pub(super) struct PathData {
/// Four-tuple of this path; challenges sent on an equal four-tuple count as on-path.
pub(super) network_path: FourTuple,
/// RTT estimator for this path.
pub(super) rtt: RttEstimator,
/// Initialised to `true` by both constructors.
/// NOTE(review): presumably whether outgoing packets are ECN-marked — confirm at use sites.
pub(super) sending_ecn: bool,
/// Congestion controller for this path.
pub(super) congestion: Box<dyn congestion::Controller>,
/// Pacer consulted by `pacing_delay`.
pub(super) pacing: Pacer,
/// Initialised to `false`; maintained outside this file's visible code.
pub(super) app_limited: bool,
/// Unanswered challenges sent on `network_path`, keyed by challenge token.
on_path_challenges_unconfirmed: IntMap<u64, SentChallengeInfo>,
/// Unanswered challenges sent on any other four-tuple, keyed by challenge token.
off_path_challenges_unconfirmed: IntMap<u64, SentChallengeInfo>,
/// Set when another on-path challenge is wanted; cleared once the path validates or the
/// on-path challenges are reset.
pub(super) pending_on_path_challenge: bool,
/// Queue of path responses waiting to be sent.
pub(super) path_responses: PathResponses,
/// Whether the path has been validated; an unvalidated path is subject to the
/// anti-amplification check.
pub(super) validated: bool,
/// Running total of bytes sent, used by the anti-amplification check.
pub(super) total_sent: u64,
/// Running total of bytes received, used by the anti-amplification check.
pub(super) total_recvd: u64,
/// MTU discovery state; `current_mtu` reads from it.
pub(super) mtud: MtuDiscovery,
/// A `(space, packet number)` pair carried over by `from_previous`.
/// NOTE(review): per the name, the first packet sent after the latest RTT sample — confirm.
pub(super) first_packet_after_rtt_sample: Option<(SpaceKind, u64)>,
/// Byte and ack-eliciting packet counters for packets in flight on this path.
pub(super) in_flight: InFlight,
/// Initialised to `false`; maintained outside this file's visible code.
pub(super) observed_addr_sent: bool,
/// The most recent observed-address report accepted by `update_observed_addr_report`.
pub(super) last_observed_addr_report: Option<ObservedAddr>,
/// Local and remote path status.
pub(super) status: PathStatusState,
/// Packet number of the first packet recorded through `sent`.
first_packet: Option<u64>,
/// PTO counter; starts at zero and is reported in qlog recovery metrics.
pub(super) pto_count: u32,
/// Idle timeout for this path; defaults to `config.default_path_max_idle_timeout`.
pub(super) idle_timeout: Option<Duration>,
/// Keep-alive interval; defaults to `config.default_path_keep_alive_interval`.
pub(super) keep_alive: Option<Duration>,
/// Initialised to `true`; maintained outside this file's visible code.
pub(super) permit_idle_reset: bool,
/// Progress of opening this path; set to `Informed` when an on-path response validates it.
pub(super) open_status: OpenStatus,
/// Initialised to `false`; maintained outside this file's visible code.
pub(super) draining: bool,
/// Last metrics snapshot, used to emit only changed values to qlog.
#[cfg(feature = "qlog")]
recovery_metrics: RecoveryMetrics,
/// Generation tag; only packets whose `path_generation` matches are removed from
/// `in_flight`.
generation: u64,
}
impl PathData {
/// Creates the state for a fresh, not yet validated path.
///
/// The congestion controller is built from `config`'s factory, RTT and pacing start from
/// `config.initial_rtt`, and MTU discovery is enabled only when `allow_mtud` is set and the
/// config carries an MTU discovery configuration. All counters and challenge state start
/// empty; `generation` tags the packets that belong to this path data.
pub(super) fn new(
    network_path: FourTuple,
    allow_mtud: bool,
    peer_max_udp_payload_size: Option<u16>,
    generation: u64,
    now: Instant,
    config: &TransportConfig,
) -> Self {
    let congestion = config
        .congestion_controller_factory
        .clone()
        .build(now, config.get_initial_mtu());
    let rtt = RttEstimator::new(config.initial_rtt);
    let pacing = Pacer::new(
        config.initial_rtt,
        congestion.initial_window(),
        config.get_initial_mtu(),
        now,
    );
    let mtud = match config.mtu_discovery_config.as_ref() {
        Some(mtud_config) if allow_mtud => MtuDiscovery::new(
            config.get_initial_mtu(),
            config.min_mtu,
            peer_max_udp_payload_size,
            mtud_config.clone(),
        ),
        // Either discovery is not allowed on this path or it is not configured at all.
        _ => MtuDiscovery::disabled(config.get_initial_mtu(), config.min_mtu),
    };

    Self {
        network_path,
        rtt,
        sending_ecn: true,
        pacing,
        congestion,
        app_limited: false,
        on_path_challenges_unconfirmed: Default::default(),
        off_path_challenges_unconfirmed: Default::default(),
        pending_on_path_challenge: false,
        path_responses: PathResponses::default(),
        validated: false,
        total_sent: 0,
        total_recvd: 0,
        mtud,
        first_packet_after_rtt_sample: None,
        in_flight: InFlight::new(),
        observed_addr_sent: false,
        last_observed_addr_report: None,
        status: Default::default(),
        first_packet: None,
        pto_count: 0,
        idle_timeout: config.default_path_max_idle_timeout,
        keep_alive: config.default_path_keep_alive_interval,
        permit_idle_reset: true,
        open_status: OpenStatus::default(),
        draining: false,
        #[cfg(feature = "qlog")]
        recovery_metrics: RecoveryMetrics::default(),
        generation,
    }
}
/// Creates path data for `network_path`, seeded from the existing path data `prev`.
///
/// Carried over from `prev`: the RTT estimator, a clone of the congestion controller, the
/// MTU discovery state, the path status, the idle/keep-alive settings, the
/// `first_packet_after_rtt_sample` marker and (with qlog) the last metrics snapshot.
/// Reset: validation, challenge state, pending responses, byte counters, in-flight
/// accounting, observed-address state and the PTO count.
pub(super) fn from_previous(
    network_path: FourTuple,
    prev: &Self,
    generation: u64,
    now: Instant,
) -> Self {
    let congestion = prev.congestion.clone_box();
    let pacing = Pacer::new(
        prev.rtt.get(),
        congestion.window(),
        prev.current_mtu(),
        now,
    );

    Self {
        network_path,
        rtt: prev.rtt,
        pacing,
        sending_ecn: true,
        congestion,
        app_limited: false,
        on_path_challenges_unconfirmed: Default::default(),
        off_path_challenges_unconfirmed: Default::default(),
        pending_on_path_challenge: false,
        path_responses: PathResponses::default(),
        validated: false,
        total_sent: 0,
        total_recvd: 0,
        mtud: prev.mtud.clone(),
        first_packet_after_rtt_sample: prev.first_packet_after_rtt_sample,
        in_flight: InFlight::new(),
        observed_addr_sent: false,
        last_observed_addr_report: None,
        status: prev.status.clone(),
        first_packet: None,
        pto_count: 0,
        idle_timeout: prev.idle_timeout,
        keep_alive: prev.keep_alive,
        permit_idle_reset: true,
        open_status: OpenStatus::default(),
        draining: false,
        #[cfg(feature = "qlog")]
        recovery_metrics: prev.recovery_metrics.clone(),
        generation,
    }
}
/// Returns `true` while on-path validation is in progress: either an on-path challenge is
/// still waiting to be sent or at least one sent challenge is unanswered.
pub(super) fn is_validating_path(&self) -> bool {
    let awaiting_response = !self.on_path_challenges_unconfirmed.is_empty();
    self.pending_on_path_challenge || awaiting_response
}
/// Returns whether sending `bytes_to_send` more bytes would exceed the anti-amplification
/// limit.
///
/// A validated path is never blocked. An unvalidated path is blocked once the bytes sent
/// (including `bytes_to_send`) would exceed three times the bytes received.
///
/// The arithmetic saturates: `total_sent` and `total_recvd` are themselves maintained with
/// `saturating_add`, so an unchecked `* 3` or `+` here could panic in debug builds or wrap in
/// release builds and flip the result.
pub(super) fn anti_amplification_blocked(&self, bytes_to_send: u64) -> bool {
    if self.validated {
        return false;
    }
    let budget = self.total_recvd.saturating_mul(3);
    let wanted = self.total_sent.saturating_add(bytes_to_send);
    budget < wanted
}
/// Returns the MTU currently reported by this path's MTU discovery state.
pub(super) fn current_mtu(&self) -> u16 {
    let discovery = &self.mtud;
    discovery.current_mtu()
}
/// Records that `packet` with packet number `pn` was sent on this path.
///
/// The packet is added to the in-flight accounting, `pn` is remembered as the first packet
/// of this path if none was recorded yet, and the packet is handed to `space`. If `space`
/// returns another packet in exchange, that packet is removed from the in-flight accounting.
pub(super) fn sent(&mut self, pn: u64, packet: SentPacket, space: &mut PacketNumberSpace) {
    self.in_flight.insert(&packet);
    // Only the very first packet number is kept; later calls leave it untouched.
    let _ = self.first_packet.get_or_insert(pn);
    match space.sent(pn, packet) {
        Some(forgotten) => {
            self.remove_in_flight(&forgotten);
        }
        None => {}
    }
}
/// Remembers a path challenge with `token` sent at `now` on `network_path`.
///
/// The challenge is filed as on-path when `network_path` equals this path's four-tuple and
/// as off-path otherwise. A challenge already stored under the same token in that map is
/// replaced.
pub(super) fn record_path_challenge_sent(
    &mut self,
    now: Instant,
    token: u64,
    network_path: FourTuple,
) {
    let unconfirmed = if network_path == self.network_path {
        &mut self.on_path_challenges_unconfirmed
    } else {
        &mut self.off_path_challenges_unconfirmed
    };
    unconfirmed.insert(
        token,
        SentChallengeInfo {
            sent_instant: now,
            network_path,
        },
    );
}
/// Removes `packet` from the in-flight accounting if it was sent on this path generation.
///
/// Returns `true` when the packet's `path_generation` matches and it was removed, `false`
/// when the packet belongs to a different generation and nothing was changed.
pub(super) fn remove_in_flight(&mut self, packet: &SentPacket) -> bool {
    let belongs_here = packet.path_generation == self.generation;
    if belongs_here {
        self.in_flight.remove(packet);
    }
    belongs_here
}
/// Adds `inc` to the sent-bytes counter, saturating at `u64::MAX`.
///
/// While the path is unvalidated, the remaining anti-amplification budget (three times the
/// bytes received, minus the bytes sent) is traced. The budget is computed with saturating
/// arithmetic so that logging cannot overflow next to the saturating counter.
pub(super) fn inc_total_sent(&mut self, inc: u64) {
    self.total_sent = self.total_sent.saturating_add(inc);
    if !self.validated {
        let budget = self
            .total_recvd
            .saturating_mul(3)
            .saturating_sub(self.total_sent);
        trace!(
            network_path = %self.network_path,
            anti_amplification_budget = %budget,
            "anti amplification budget decreased"
        );
    }
}
/// Adds `inc` to the received-bytes counter, saturating at `u64::MAX`.
///
/// While the path is unvalidated, the remaining anti-amplification budget (three times the
/// bytes received, minus the bytes sent) is traced. The budget is computed with saturating
/// arithmetic so that logging cannot overflow next to the saturating counter.
pub(super) fn inc_total_recvd(&mut self, inc: u64) {
    self.total_recvd = self.total_recvd.saturating_add(inc);
    if !self.validated {
        let budget = self
            .total_recvd
            .saturating_mul(3)
            .saturating_sub(self.total_sent);
        trace!(
            network_path = %self.network_path,
            anti_amplification_budget = %budget,
            "anti amplification budget increased"
        );
    }
}
/// Returns the earliest instant at which an unanswered on-path challenge expires.
///
/// Each challenge expires one PTO base interval (`rtt.pto_base()`) after it was sent.
/// Returns `None` when no on-path challenge is outstanding.
pub(super) fn earliest_on_path_expiring_challenge(&self) -> Option<Instant> {
    let challenges = &self.on_path_challenges_unconfirmed;
    if challenges.is_empty() {
        return None;
    }
    let timeout = self.rtt.pto_base();
    let mut earliest: Option<Instant> = None;
    for info in challenges.values() {
        let expiry = info.sent_instant + timeout;
        earliest = Some(match earliest {
            Some(current) if current <= expiry => current,
            _ => expiry,
        });
    }
    earliest
}
/// Handles a received path response carrying `token`, which arrived on `network_path`.
///
/// The token is looked up first among the on-path challenges and then among the off-path
/// ones. The returned [`OnPathResponseReceived`] says which case applied:
///
/// - `OnPath`: the token matched an on-path challenge sent on (probably) the current path.
///   The path becomes validated, all on-path challenge state is cleared and the measured
///   round trip is offered to the RTT estimator as an initial RTT.
/// - `Ignored`: the token matched an on-path challenge that was sent on a different
///   four-tuple than the current one.
/// - `OffPath`: the token matched an off-path challenge.
/// - `Unknown`: the token matched nothing.
pub(super) fn on_path_response_received(
&mut self,
now: Instant,
token: u64,
network_path: FourTuple,
) -> OnPathResponseReceived {
match self.on_path_challenges_unconfirmed.remove(&token) {
// The answered challenge was sent on what is probably still the current path.
Some(info) if info.network_path.is_probably_same_path(&self.network_path) => {
self.network_path.update_local_if_same_remote(&network_path);
let sent_instant = info.sent_instant;
// Only trace on the transition from unvalidated to validated.
if !std::mem::replace(&mut self.validated, true) {
trace!("new path validated");
}
// Validation succeeded: every other outstanding on-path challenge is moot.
self.on_path_challenges_unconfirmed.clear();
self.pending_on_path_challenge = false;
// `reset_initial_rtt` only takes effect while no RTT sample has been recorded yet.
let rtt = now.saturating_duration_since(sent_instant);
self.rtt.reset_initial_rtt(rtt);
let prev_status = std::mem::replace(&mut self.open_status, OpenStatus::Informed);
OnPathResponseReceived::OnPath {
was_open: matches!(
prev_status,
OpenStatus::Informed | OpenStatus::Revalidating
),
}
}
// The answered challenge was sent on a four-tuple that is no longer the current path.
Some(info) => {
// Keep only the challenges that were sent on the current four-tuple.
self.on_path_challenges_unconfirmed
.retain(|_token, i| i.network_path == self.network_path);
// NOTE(review): this requests another challenge only when challenges for the
// current path are STILL outstanding. If the intent is to schedule one when none
// remain, the condition is inverted — confirm against the sender logic.
if !self.on_path_challenges_unconfirmed.is_empty() {
self.pending_on_path_challenge = true;
}
OnPathResponseReceived::Ignored {
sent_on: info.network_path,
current_path: self.network_path,
}
}
// Not an on-path token: try the off-path challenges.
None => match self.off_path_challenges_unconfirmed.remove(&token) {
Some(info) => {
// Drop every other off-path challenge that targets the same remote.
self.off_path_challenges_unconfirmed
.retain(|_token, i| i.network_path.remote != info.network_path.remote);
OnPathResponseReceived::OffPath
}
None => OnPathResponseReceived::Unknown,
},
}
}
/// Abandons on-path validation: withdraws any pending challenge request and forgets all
/// unanswered on-path challenges.
pub(super) fn reset_on_path_challenges(&mut self) {
    self.pending_on_path_challenge = false;
    self.on_path_challenges_unconfirmed.clear();
}
/// Returns `true` if at least one off-path challenge is still unanswered.
pub(super) fn has_off_path_challenges(&self) -> bool {
    let none_outstanding = self.off_path_challenges_unconfirmed.is_empty();
    !none_outstanding
}
/// Forgets every unanswered off-path challenge.
pub(super) fn clear_off_path_challenges(&mut self) {
    let outstanding = &mut self.off_path_challenges_unconfirmed;
    outstanding.clear();
}
/// Takes a snapshot of the recovery metrics and returns a qlog event for what changed.
///
/// The snapshot always replaces the stored one. The returned event carries only the values
/// that differ from the previously stored snapshot, and is `None` when nothing changed.
#[cfg(feature = "qlog")]
pub(super) fn qlog_recovery_metrics(
    &mut self,
    path_id: PathId,
) -> Option<RecoveryMetricsUpdated> {
    let controller_metrics = self.congestion.metrics();
    let snapshot = RecoveryMetrics {
        min_rtt: Some(self.rtt.min),
        smoothed_rtt: Some(self.rtt.get()),
        latest_rtt: Some(self.rtt.latest),
        rtt_variance: Some(self.rtt.var),
        pto_count: Some(self.pto_count),
        bytes_in_flight: Some(self.in_flight.bytes),
        packets_in_flight: Some(self.in_flight.ack_eliciting),
        congestion_window: Some(controller_metrics.congestion_window),
        ssthresh: controller_metrics.ssthresh,
        pacing_rate: controller_metrics.pacing_rate,
    };
    // Store the new snapshot first, then diff it against the one it replaced.
    let previous = std::mem::replace(&mut self.recovery_metrics, snapshot);
    self.recovery_metrics.to_qlog_event(path_id, &previous)
}
/// Asks the pacer for the delay to apply before sending `bytes_to_send` bytes at `now`.
///
/// The pacer is given the current RTT estimate, the current MTU and the congestion window;
/// its answer is returned unchanged.
pub(super) fn pacing_delay(&mut self, bytes_to_send: u64, now: Instant) -> Option<Duration> {
    let rtt_estimate = self.rtt.get();
    let mtu = self.current_mtu();
    let window = self.congestion.window();
    self.pacing
        .delay(rtt_estimate, bytes_to_send, mtu, window, now)
}
/// Processes an observed-address report and returns the address if it is new.
///
/// - A report whose sequence number does not exceed the stored one is ignored.
/// - A newer report with the same IP and port only advances the stored sequence number.
/// - Otherwise (first report, or a newer report with a different address) the report is
///   stored and its socket address is returned.
#[must_use = "updated observed address must be reported to the application"]
pub(super) fn update_observed_addr_report(
    &mut self,
    observed: ObservedAddr,
) -> Option<SocketAddr> {
    if let Some(prev) = self.last_observed_addr_report.as_mut() {
        if prev.seq_no >= observed.seq_no {
            return None;
        }
        if prev.ip == observed.ip && prev.port == observed.port {
            prev.seq_no = observed.seq_no;
            return None;
        }
    }
    let addr = observed.socket_addr();
    self.last_observed_addr_report = Some(observed);
    Some(addr)
}
/// Returns the path status last recorded for the remote, or `None` if none was recorded.
pub(crate) fn remote_status(&self) -> Option<PathStatus> {
    let (_seq, status) = self.status.remote_status?;
    Some(status)
}
/// Returns the locally set path status.
pub(crate) fn local_status(&self) -> PathStatus {
    let state = &self.status;
    state.local_status
}
pub(super) fn generation(&self) -> u64 {
self.generation
}
}
/// Outcome of `PathData::on_path_response_received`.
pub(super) enum OnPathResponseReceived {
/// The response answered an on-path challenge and validated the path.
///
/// `was_open` is `true` when the open status was already `Informed` or `Revalidating`
/// before this response.
OnPath { was_open: bool },
/// The response answered an off-path challenge.
OffPath,
/// The token matched no outstanding challenge.
Unknown,
/// The response answered an on-path challenge that was sent on a four-tuple other than the
/// current one.
Ignored {
/// Four-tuple the answered challenge was sent on.
sent_on: FourTuple,
/// Four-tuple of the path at the time the response was processed.
current_path: FourTuple,
},
}
/// Progress of opening a path.
///
/// NOTE(review): only `Pending` (the default) and `Informed` (stored when an on-path response
/// validates the path) are assigned in this file; the meaning given for the other variants
/// is inferred from their names and should be confirmed at their use sites.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub(super) enum OpenStatus {
/// Initial state of every newly created path data.
#[default]
Pending,
/// Presumably: the opening challenge has been sent — confirm.
Sent,
/// Stored once an on-path response validates the path.
Informed,
/// Presumably: an already opened path is being validated again — confirm.
Revalidating,
}
/// Snapshot of recovery-related metrics used to build qlog `RecoveryMetricsUpdated` events.
///
/// Every field is optional so the same type can also represent a diff: `retain_updated`
/// keeps only the values that changed relative to an earlier snapshot.
#[cfg(feature = "qlog")]
#[derive(Default, Clone, PartialEq, Debug)]
#[non_exhaustive]
struct RecoveryMetrics {
/// Minimum RTT of the path's RTT estimator.
pub min_rtt: Option<Duration>,
/// Current RTT estimate (`RttEstimator::get`).
pub smoothed_rtt: Option<Duration>,
/// Most recent RTT sample.
pub latest_rtt: Option<Duration>,
/// RTT variance of the estimator.
pub rtt_variance: Option<Duration>,
/// The path's PTO count.
pub pto_count: Option<u32>,
/// Bytes currently in flight on the path.
pub bytes_in_flight: Option<u64>,
/// Number of ack-eliciting packets currently in flight on the path.
pub packets_in_flight: Option<u64>,
/// Congestion window reported by the congestion controller's metrics.
pub congestion_window: Option<u64>,
/// Slow-start threshold reported by the congestion controller's metrics, if any.
pub ssthresh: Option<u64>,
/// Pacing rate reported by the congestion controller's metrics, if any.
pub pacing_rate: Option<u64>,
}
#[cfg(feature = "qlog")]
impl RecoveryMetrics {
    /// Returns a copy of `self` in which every value equal to its counterpart in `previous`
    /// is replaced by `None`.
    fn retain_updated(&self, previous: &Self) -> Self {
        /// Keeps `current` only if it differs from `before`.
        fn changed<T: Copy + PartialEq>(current: Option<T>, before: Option<T>) -> Option<T> {
            if before == current { None } else { current }
        }

        Self {
            min_rtt: changed(self.min_rtt, previous.min_rtt),
            smoothed_rtt: changed(self.smoothed_rtt, previous.smoothed_rtt),
            latest_rtt: changed(self.latest_rtt, previous.latest_rtt),
            rtt_variance: changed(self.rtt_variance, previous.rtt_variance),
            pto_count: changed(self.pto_count, previous.pto_count),
            bytes_in_flight: changed(self.bytes_in_flight, previous.bytes_in_flight),
            packets_in_flight: changed(self.packets_in_flight, previous.packets_in_flight),
            congestion_window: changed(self.congestion_window, previous.congestion_window),
            ssthresh: changed(self.ssthresh, previous.ssthresh),
            pacing_rate: changed(self.pacing_rate, previous.pacing_rate),
        }
    }

    /// Builds the qlog event describing what changed since `previous`.
    ///
    /// Returns `None` when no value changed. Durations are converted with `as_secs_f32`, and
    /// a PTO count that does not fit in a `u16` is reported as `u16::MAX`.
    fn to_qlog_event(&self, path_id: PathId, previous: &Self) -> Option<RecoveryMetricsUpdated> {
        /// Converts a duration to the `f32` representation used in the event.
        fn secs(duration: Duration) -> f32 {
            duration.as_secs_f32()
        }

        let updated = self.retain_updated(previous);
        let nothing_changed = updated == Self::default();
        if nothing_changed {
            return None;
        }

        let Self {
            min_rtt,
            smoothed_rtt,
            latest_rtt,
            rtt_variance,
            pto_count,
            bytes_in_flight,
            packets_in_flight,
            congestion_window,
            ssthresh,
            pacing_rate,
        } = updated;

        Some(RecoveryMetricsUpdated {
            min_rtt: min_rtt.map(secs),
            smoothed_rtt: smoothed_rtt.map(secs),
            latest_rtt: latest_rtt.map(secs),
            rtt_variance: rtt_variance.map(secs),
            pto_count: pto_count.map(|count| u16::try_from(count).unwrap_or(u16::MAX)),
            bytes_in_flight,
            packets_in_flight,
            congestion_window,
            ssthresh,
            pacing_rate,
            path_id: Some(u64::from(path_id.as_u32())),
        })
    }
}
/// Round-trip-time estimator for a path.
#[derive(Copy, Clone, Debug)]
pub struct RttEstimator {
/// The most recent RTT sample; holds the initial RTT until a sample arrives.
latest: Duration,
/// The smoothed RTT, `None` until the first sample has been recorded.
smoothed: Option<Duration>,
/// The RTT variance estimate.
var: Duration,
/// The smallest RTT seen so far; holds the initial RTT until a sample arrives.
min: Duration,
}
impl RttEstimator {
pub(super) fn new(initial_rtt: Duration) -> Self {
Self {
latest: initial_rtt,
smoothed: None,
var: initial_rtt / 2,
min: initial_rtt,
}
}
pub(crate) fn reset_initial_rtt(&mut self, initial_rtt: Duration) {
if self.smoothed.is_none() {
self.latest = initial_rtt;
self.var = initial_rtt / 2;
self.min = initial_rtt;
}
}
pub fn get(&self) -> Duration {
self.smoothed.unwrap_or(self.latest)
}
pub fn conservative(&self) -> Duration {
self.get().max(self.latest)
}
pub fn min(&self) -> Duration {
self.min
}
pub(crate) fn pto_base(&self) -> Duration {
self.get() + cmp::max(4 * self.var, TIMER_GRANULARITY)
}
pub(crate) fn update(&mut self, ack_delay: Duration, rtt: Duration) {
self.latest = rtt;
self.min = cmp::min(self.min, self.latest);
if let Some(smoothed) = self.smoothed {
let adjusted_rtt = if self.min + ack_delay <= self.latest {
self.latest - ack_delay
} else {
self.latest
};
let var_sample = smoothed.abs_diff(adjusted_rtt);
self.var = (3 * self.var + var_sample) / 4;
self.smoothed = Some((7 * smoothed + adjusted_rtt) / 8);
} else {
self.smoothed = Some(self.latest);
self.var = self.latest / 2;
self.min = self.latest;
}
}
}
/// Queue of path responses that still have to be sent.
#[derive(Default, Debug)]
pub(crate) struct PathResponses {
/// Pending responses; `push` keeps at most one entry per remote and the `pop_*` methods
/// take entries from the back.
pending: Vec<PathResponse>,
}
impl PathResponses {
    /// Queues a response to the challenge `token` that arrived in `packet` on `network_path`.
    ///
    /// At most one response is kept per remote: an entry for the same remote is overwritten
    /// unless it stems from a packet with a higher number. New remotes are accepted until the
    /// queue holds `MAX_PATH_RESPONSES` entries; beyond that the challenge is ignored.
    pub(crate) fn push(&mut self, packet: u64, token: u64, network_path: FourTuple) {
        const MAX_PATH_RESPONSES: usize = 16;

        let incoming = PathResponse {
            packet,
            token,
            network_path,
        };

        for queued in self.pending.iter_mut() {
            if queued.network_path.remote == network_path.remote {
                if queued.packet <= packet {
                    *queued = incoming;
                }
                return;
            }
        }

        if self.pending.len() >= MAX_PATH_RESPONSES {
            trace!("ignoring excessive PATH_CHALLENGE");
            return;
        }
        self.pending.push(incoming);
    }

    /// Takes the most recently queued response if it is NOT for `network_path`.
    ///
    /// Returns its token and four-tuple. Only the last entry is inspected; `None` is returned
    /// when the queue is empty or that entry is for `network_path`.
    pub(crate) fn pop_off_path(&mut self, network_path: FourTuple) -> Option<(u64, FourTuple)> {
        let &PathResponse {
            token,
            network_path: queued_path,
            ..
        } = self.pending.last()?;
        if queued_path == network_path {
            return None;
        }
        self.pending.pop();
        Some((token, queued_path))
    }

    /// Takes the most recently queued response if it IS for `network_path`.
    ///
    /// Returns its token. Only the last entry is inspected; `None` is returned when the queue
    /// is empty or that entry is for a different four-tuple.
    pub(crate) fn pop_on_path(&mut self, network_path: FourTuple) -> Option<u64> {
        let &PathResponse {
            token,
            network_path: queued_path,
            ..
        } = self.pending.last()?;
        if queued_path != network_path {
            return None;
        }
        self.pending.pop();
        Some(token)
    }

    /// Returns `true` when no response is queued.
    pub(crate) fn is_empty(&self) -> bool {
        self.pending.is_empty()
    }
}
/// A queued path response.
#[derive(Copy, Clone, Debug)]
struct PathResponse {
/// Number of the packet that carried the challenge; used to keep the newest challenge per
/// remote.
packet: u64,
/// Token of the challenge being answered.
token: u64,
/// Four-tuple the challenge arrived on.
network_path: FourTuple,
}
/// Running totals for packets that are in flight on a path.
#[derive(Debug)]
pub(super) struct InFlight {
/// Sum of the sizes of the tracked packets.
pub(super) bytes: u64,
/// Number of tracked packets that are ack-eliciting.
pub(super) ack_eliciting: u64,
}
impl InFlight {
    /// Creates empty in-flight accounting.
    fn new() -> Self {
        Self {
            ack_eliciting: 0,
            bytes: 0,
        }
    }

    /// Accounts for `packet` being sent: adds its size and, if it is ack-eliciting, counts it.
    fn insert(&mut self, packet: &SentPacket) {
        let size = u64::from(packet.size);
        let eliciting = u64::from(packet.ack_eliciting);
        self.bytes += size;
        self.ack_eliciting += eliciting;
    }

    /// Reverses a previous `insert` of `packet`.
    ///
    /// The subtraction is unchecked, so removing a packet that was never inserted underflows.
    fn remove(&mut self, packet: &SentPacket) {
        let size = u64::from(packet.size);
        let eliciting = u64::from(packet.ack_eliciting);
        self.bytes -= size;
        self.ack_eliciting -= eliciting;
    }
}
/// Local and remote [`PathStatus`] of a path, with the sequence numbers that order updates.
#[derive(Debug, Clone, Default)]
pub(super) struct PathStatusState {
/// The status set locally.
local_status: PathStatus,
/// Sequence number of the local status; incremented on every local change.
local_seq: VarInt,
/// The remote's latest status with the sequence number it arrived with, if any was
/// recorded.
remote_status: Option<(VarInt, PathStatus)>,
}
impl PathStatusState {
    /// Records a status update received from the remote.
    ///
    /// Updates whose sequence number does not exceed the stored one are ignored. An accepted
    /// update is always stored; it is logged only when the status value actually changed.
    pub(super) fn remote_update(&mut self, status: PathStatus, seq: VarInt) {
        if let Some((current_seq, _)) = self.remote_status {
            if current_seq >= seq {
                trace!(%seq, "ignoring path status update");
                return;
            }
        }

        let previous = self.remote_status.replace((seq, status));
        let unchanged = matches!(previous, Some((_, old)) if old == status);
        if !unchanged {
            debug!(?status, ?seq, "remote changed path status");
        }
    }

    /// Sets the local status.
    ///
    /// Returns the previous status when it changed (bumping the local sequence number), or
    /// `None` when `status` equals the current one.
    pub(super) fn local_update(&mut self, status: PathStatus) -> Option<PathStatus> {
        if status == self.local_status {
            return None;
        }
        let previous = std::mem::replace(&mut self.local_status, status);
        self.local_seq = self.local_seq.saturating_add(1u8);
        Some(previous)
    }

    /// Returns the sequence number of the local status.
    pub(crate) fn seq(&self) -> VarInt {
        self.local_seq
    }
}
/// Status a side can assign to a path.
#[cfg_attr(test, derive(test_strategy::Arbitrary))]
#[derive(Debug, Copy, Clone, Default, PartialEq, Eq)]
pub enum PathStatus {
/// The default status.
/// NOTE(review): presumably "usable for sending without restriction" — confirm.
#[default]
Available,
/// NOTE(review): presumably "use only when no available path exists" — confirm.
Backup,
}
/// Path-related events; each variant names the path it concerns through `id`.
///
/// NOTE(review): this file does not show where the events are emitted, so the variant
/// descriptions below follow the variant and field names only.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PathEvent {
/// A path was opened.
Opened {
/// The path that was opened.
id: PathId,
},
/// A path was abandoned.
Abandoned {
/// The path that was abandoned.
id: PathId,
/// Why the path was abandoned.
reason: PathAbandonReason,
},
/// A path was discarded.
Discarded {
/// The path that was discarded.
id: PathId,
/// Statistics of the discarded path (boxed).
path_stats: Box<PathStats>,
},
/// The remote's status for a path.
RemoteStatus {
/// The path the status applies to.
id: PathId,
/// The status.
status: PathStatus,
},
/// An observed address for a path.
ObservedAddr {
/// The path the address was observed on.
id: PathId,
/// The observed address.
addr: SocketAddr,
},
}
/// Reason a path was abandoned.
///
/// `error_code` maps each reason to the transport error code associated with it.
#[derive(Debug, Clone, Eq, PartialEq)]
pub enum PathAbandonReason {
/// Abandoned with an explicit error code (presumably supplied by the application).
ApplicationClosed {
/// The error code, passed through unchanged by `error_code`.
error_code: VarInt,
},
/// Maps to `PATH_UNSTABLE_OR_POOR`.
ValidationFailed,
/// Maps to `PATH_UNSTABLE_OR_POOR`.
TimedOut,
/// Maps to `PATH_UNSTABLE_OR_POOR`.
UnusableAfterNetworkChange,
/// Maps to `APPLICATION_ABANDON_PATH`.
NatTraversalRoundEnded,
/// The only reason for which `is_remote` returns `true`.
RemoteAbandoned {
/// The error code, passed through unchanged by `error_code`.
error_code: VarInt,
},
}
impl PathAbandonReason {
    /// Returns `true` only for [`PathAbandonReason::RemoteAbandoned`].
    pub(crate) fn is_remote(&self) -> bool {
        match self {
            Self::RemoteAbandoned { .. } => true,
            Self::ApplicationClosed { .. }
            | Self::ValidationFailed
            | Self::TimedOut
            | Self::UnusableAfterNetworkChange
            | Self::NatTraversalRoundEnded => false,
        }
    }

    /// Returns the transport error code associated with this reason.
    ///
    /// Reasons that carry an explicit code pass it through; the others map to a fixed code.
    pub(crate) fn error_code(&self) -> TransportErrorCode {
        match self {
            Self::ApplicationClosed { error_code } | Self::RemoteAbandoned { error_code } => {
                (*error_code).into()
            }
            Self::ValidationFailed | Self::TimedOut | Self::UnusableAfterNetworkChange => {
                TransportErrorCode::PATH_UNSTABLE_OR_POOR
            }
            Self::NatTraversalRoundEnded => TransportErrorCode::APPLICATION_ABANDON_PATH,
        }
    }
}
/// Error type for setting a path's status (per the type name).
#[derive(Debug, Error, Clone, PartialEq, Eq)]
pub enum SetPathStatusError {
/// Displayed as "closed path".
#[error("closed path")]
ClosedPath,
/// Displayed as "multipath not negotiated".
#[error("multipath not negotiated")]
MultipathNotNegotiated,
}
/// Error displayed as "closed path".
#[derive(Debug, Default, Error, Clone, PartialEq, Eq)]
#[error("closed path")]
pub struct ClosedPath {
// The only field is restricted to the parent module, so code outside it cannot build this
// type with a struct literal (it can still use `Default`).
pub(super) _private: (),
}
#[cfg(test)]
mod tests {
    use super::*;

    /// `saturating_add` grows normally past `u16::MAX` and clamps at `PathId::MAX`.
    #[test]
    fn test_path_id_saturating_add() {
        let one = 1u8;
        let start = PathId::from(u16::MAX);
        let expected = PathId::from(u32::from(u16::MAX) + 1);
        assert_eq!(start.saturating_add(one), expected);
        assert_eq!(PathId::MAX.saturating_add(one), PathId::MAX);
    }
}