ant_quic/connection/
paths.rs

1use std::{cmp, net::SocketAddr};
2
3use tracing::trace;
4
5use super::{
6    mtud::MtuDiscovery,
7    pacing::Pacer,
8    spaces::{PacketSpace, SentPacket},
9};
10use crate::{Duration, Instant, TIMER_GRANULARITY, TransportConfig, congestion, packet::SpaceId};
11
12#[cfg(feature = "__qlog")]
13use qlog::events::quic::MetricsUpdated;
14
15/// Description of a particular network path
16pub(super) struct PathData {
17    pub(super) remote: SocketAddr,
18    pub(super) rtt: RttEstimator,
19    /// Whether we're enabling ECN on outgoing packets
20    pub(super) sending_ecn: bool,
21    /// Congestion controller state
22    pub(super) congestion: Box<dyn congestion::Controller>,
23    /// Pacing state
24    pub(super) pacing: Pacer,
25    pub(super) challenge: Option<u64>,
26    pub(super) challenge_pending: bool,
27    /// Whether we're certain the peer can both send and receive on this address
28    ///
29    /// Initially equal to `use_stateless_retry` for servers, and becomes false again on every
30    /// migration. Always true for clients.
31    pub(super) validated: bool,
32    /// Total size of all UDP datagrams sent on this path
33    pub(super) total_sent: u64,
34    /// Total size of all UDP datagrams received on this path
35    pub(super) total_recvd: u64,
36    /// The state of the MTU discovery process
37    pub(super) mtud: MtuDiscovery,
38    /// Packet number of the first packet sent after an RTT sample was collected on this path
39    ///
40    /// Used in persistent congestion determination.
41    pub(super) first_packet_after_rtt_sample: Option<(SpaceId, u64)>,
42    pub(super) in_flight: InFlight,
43    /// Number of the first packet sent on this path
44    ///
45    /// Used to determine whether a packet was sent on an earlier path. Insufficient to determine if
46    /// a packet was sent on a later path.
47    first_packet: Option<u64>,
48
49    /// Snapshot of the qlog recovery metrics
50    #[cfg(feature = "__qlog")]
51    congestion_metrics: CongestionMetrics,
52}
53
54impl PathData {
55    pub(super) fn new(
56        remote: SocketAddr,
57        allow_mtud: bool,
58        peer_max_udp_payload_size: Option<u16>,
59        now: Instant,
60        config: &TransportConfig,
61    ) -> Self {
62        let congestion = config
63            .congestion_controller_factory
64            .new_controller(config.get_initial_mtu() as u64, 16 * 1024 * 1024, now);
65        Self {
66            remote,
67            rtt: RttEstimator::new(config.initial_rtt),
68            sending_ecn: true,
69            pacing: Pacer::new(
70                config.initial_rtt,
71                congestion.initial_window(),
72                config.get_initial_mtu(),
73                now,
74            ),
75            congestion,
76            challenge: None,
77            challenge_pending: false,
78            validated: false,
79            total_sent: 0,
80            total_recvd: 0,
81            mtud: config
82                .mtu_discovery_config
83                .as_ref()
84                .filter(|_| allow_mtud)
85                .map_or(
86                    MtuDiscovery::disabled(config.get_initial_mtu(), config.min_mtu),
87                    |mtud_config| {
88                        MtuDiscovery::new(
89                            config.get_initial_mtu(),
90                            config.min_mtu,
91                            peer_max_udp_payload_size,
92                            mtud_config.clone(),
93                        )
94                    },
95                ),
96            first_packet_after_rtt_sample: None,
97            in_flight: InFlight::new(),
98            first_packet: None,
99            #[cfg(feature = "__qlog")]
100            congestion_metrics: CongestionMetrics::default(),
101        }
102    }
103
104    pub(super) fn from_previous(remote: SocketAddr, prev: &Self, now: Instant) -> Self {
105        let congestion = prev.congestion.clone_box();
106        let smoothed_rtt = prev.rtt.get();
107        Self {
108            remote,
109            rtt: prev.rtt,
110            pacing: Pacer::new(smoothed_rtt, congestion.window(), prev.current_mtu(), now),
111            sending_ecn: true,
112            congestion,
113            challenge: None,
114            challenge_pending: false,
115            validated: false,
116            total_sent: 0,
117            total_recvd: 0,
118            mtud: prev.mtud.clone(),
119            first_packet_after_rtt_sample: prev.first_packet_after_rtt_sample,
120            in_flight: InFlight::new(),
121            first_packet: None,
122            #[cfg(feature = "__qlog")]
123            congestion_metrics: prev.congestion_metrics.clone(),
124        }
125    }
126
127    /// Resets RTT, congestion control and MTU states.
128    ///
129    /// This is useful when it is known the underlying path has changed.
130    pub(super) fn reset(&mut self, now: Instant, config: &TransportConfig) {
131        self.rtt = RttEstimator::new(config.initial_rtt);
132        self.congestion = config
133            .congestion_controller_factory
134            .new_controller(config.get_initial_mtu() as u64, 16 * 1024 * 1024, now);
135        self.mtud.reset(config.get_initial_mtu(), config.min_mtu);
136    }
137
138    /// Indicates whether we're a server that hasn't validated the peer's address and hasn't
139    /// received enough data from the peer to permit sending `bytes_to_send` additional bytes
140    pub(super) fn anti_amplification_blocked(&self, bytes_to_send: u64) -> bool {
141        !self.validated && self.total_recvd * 3 < self.total_sent + bytes_to_send
142    }
143
144    /// Returns the path's current MTU
145    pub(super) fn current_mtu(&self) -> u16 {
146        self.mtud.current_mtu()
147    }
148
149    /// Account for transmission of `packet` with number `pn` in `space`
150    pub(super) fn sent(&mut self, pn: u64, packet: SentPacket, space: &mut PacketSpace) {
151        self.in_flight.insert(&packet);
152        if self.first_packet.is_none() {
153            self.first_packet = Some(pn);
154        }
155        self.in_flight.bytes -= space.sent(pn, packet);
156    }
157
158    /// Remove `packet` with number `pn` from this path's congestion control counters, or return
159    /// `false` if `pn` was sent before this path was established.
160    pub(super) fn remove_in_flight(&mut self, pn: u64, packet: &SentPacket) -> bool {
161        if self.first_packet.map_or(true, |first| first > pn) {
162            return false;
163        }
164        self.in_flight.remove(packet);
165        true
166    }
167
168    #[cfg(feature = "__qlog")]
169    pub(super) fn qlog_congestion_metrics(&mut self, pto_count: u32) -> Option<MetricsUpdated> {
170        let controller_metrics = self.congestion.metrics();
171
172        let metrics = CongestionMetrics {
173            min_rtt: Some(self.rtt.min),
174            smoothed_rtt: Some(self.rtt.get()),
175            latest_rtt: Some(self.rtt.latest),
176            rtt_variance: Some(self.rtt.var),
177            pto_count: Some(pto_count),
178            bytes_in_flight: Some(self.in_flight.bytes),
179            packets_in_flight: Some(self.in_flight.ack_eliciting),
180
181            congestion_window: Some(controller_metrics.congestion_window),
182            ssthresh: controller_metrics.ssthresh,
183            pacing_rate: controller_metrics.pacing_rate,
184        };
185
186        let event = metrics.to_qlog_event(&self.congestion_metrics);
187        self.congestion_metrics = metrics;
188        event
189    }
190}
191
192/// Congestion metrics as described in [`recovery_metrics_updated`].
193///
194/// [`recovery_metrics_updated`]: https://datatracker.ietf.org/doc/html/draft-ietf-quic-qlog-quic-events.html#name-recovery_metrics_updated
195#[cfg(feature = "__qlog")]
196#[derive(Default, Clone, PartialEq)]
197#[non_exhaustive]
198struct CongestionMetrics {
199    pub min_rtt: Option<Duration>,
200    pub smoothed_rtt: Option<Duration>,
201    pub latest_rtt: Option<Duration>,
202    pub rtt_variance: Option<Duration>,
203    pub pto_count: Option<u32>,
204    pub bytes_in_flight: Option<u64>,
205    pub packets_in_flight: Option<u64>,
206    pub congestion_window: Option<u64>,
207    pub ssthresh: Option<u64>,
208    pub pacing_rate: Option<u64>,
209}
210
211#[cfg(feature = "__qlog")]
212impl CongestionMetrics {
213    /// Retain only values that have been updated since the last snapshot.
214    fn retain_updated(&self, previous: &Self) -> Self {
215        macro_rules! keep_if_changed {
216            ($name:ident) => {
217                if previous.$name == self.$name {
218                    None
219                } else {
220                    self.$name
221                }
222            };
223        }
224
225        Self {
226            min_rtt: keep_if_changed!(min_rtt),
227            smoothed_rtt: keep_if_changed!(smoothed_rtt),
228            latest_rtt: keep_if_changed!(latest_rtt),
229            rtt_variance: keep_if_changed!(rtt_variance),
230            pto_count: keep_if_changed!(pto_count),
231            bytes_in_flight: keep_if_changed!(bytes_in_flight),
232            packets_in_flight: keep_if_changed!(packets_in_flight),
233            congestion_window: keep_if_changed!(congestion_window),
234            ssthresh: keep_if_changed!(ssthresh),
235            pacing_rate: keep_if_changed!(pacing_rate),
236        }
237    }
238
239    /// Emit a `MetricsUpdated` event containing only updated values
240    fn to_qlog_event(&self, previous: &Self) -> Option<MetricsUpdated> {
241        let updated = self.retain_updated(previous);
242
243        if updated == Self::default() {
244            return None;
245        }
246
247        Some(MetricsUpdated {
248            min_rtt: updated.min_rtt.map(|rtt| rtt.as_secs_f32()),
249            smoothed_rtt: updated.smoothed_rtt.map(|rtt| rtt.as_secs_f32()),
250            latest_rtt: updated.latest_rtt.map(|rtt| rtt.as_secs_f32()),
251            rtt_variance: updated.rtt_variance.map(|rtt| rtt.as_secs_f32()),
252            pto_count: updated
253                .pto_count
254                .map(|count| count.try_into().unwrap_or(u16::MAX)),
255            bytes_in_flight: updated.bytes_in_flight,
256            packets_in_flight: updated.packets_in_flight,
257            congestion_window: updated.congestion_window,
258            ssthresh: updated.ssthresh,
259            pacing_rate: updated.pacing_rate,
260        })
261    }
262}
263
264/// RTT estimation for a particular network path
265#[derive(Copy, Clone)]
266pub struct RttEstimator {
267    /// The most recent RTT measurement made when receiving an ack for a previously unacked packet
268    latest: Duration,
269    /// The smoothed RTT of the connection, computed as described in RFC6298
270    smoothed: Option<Duration>,
271    /// The RTT variance, computed as described in RFC6298
272    var: Duration,
273    /// The minimum RTT seen in the connection, ignoring ack delay.
274    min: Duration,
275}
276
277impl RttEstimator {
278    fn new(initial_rtt: Duration) -> Self {
279        Self {
280            latest: initial_rtt,
281            smoothed: None,
282            var: initial_rtt / 2,
283            min: initial_rtt,
284        }
285    }
286
287    /// The current best RTT estimation.
288    pub fn get(&self) -> Duration {
289        self.smoothed.unwrap_or(self.latest)
290    }
291
292    /// Conservative estimate of RTT
293    ///
294    /// Takes the maximum of smoothed and latest RTT, as recommended
295    /// in 6.1.2 of the recovery spec (draft 29).
296    pub fn conservative(&self) -> Duration {
297        self.get().max(self.latest)
298    }
299
300    /// Minimum RTT registered so far for this estimator.
301    pub fn min(&self) -> Duration {
302        self.min
303    }
304
305    // PTO computed as described in RFC9002#6.2.1
306    pub(crate) fn pto_base(&self) -> Duration {
307        self.get() + cmp::max(4 * self.var, TIMER_GRANULARITY)
308    }
309
310    pub(crate) fn update(&mut self, ack_delay: Duration, rtt: Duration) {
311        self.latest = rtt;
312        // min_rtt ignores ack delay.
313        self.min = cmp::min(self.min, self.latest);
314        // Based on RFC6298.
315        if let Some(smoothed) = self.smoothed {
316            let adjusted_rtt = if self.min + ack_delay <= self.latest {
317                self.latest - ack_delay
318            } else {
319                self.latest
320            };
321            let var_sample = if smoothed > adjusted_rtt {
322                smoothed - adjusted_rtt
323            } else {
324                adjusted_rtt - smoothed
325            };
326            self.var = (3 * self.var + var_sample) / 4;
327            self.smoothed = Some((7 * smoothed + adjusted_rtt) / 8);
328        } else {
329            self.smoothed = Some(self.latest);
330            self.var = self.latest / 2;
331            self.min = self.latest;
332        }
333    }
334}
335
336#[derive(Default)]
337pub(crate) struct PathResponses {
338    pending: Vec<PathResponse>,
339}
340
341impl PathResponses {
342    pub(crate) fn push(&mut self, packet: u64, token: u64, remote: SocketAddr) {
343        /// Arbitrary permissive limit to prevent abuse
344        const MAX_PATH_RESPONSES: usize = 16;
345        let response = PathResponse {
346            packet,
347            token,
348            remote,
349        };
350        let existing = self.pending.iter_mut().find(|x| x.remote == remote);
351        if let Some(existing) = existing {
352            // Update a queued response
353            if existing.packet <= packet {
354                *existing = response;
355            }
356            return;
357        }
358        if self.pending.len() < MAX_PATH_RESPONSES {
359            self.pending.push(response);
360        } else {
361            // We don't expect to ever hit this with well-behaved peers, so we don't bother dropping
362            // older challenges.
363            trace!("ignoring excessive PATH_CHALLENGE");
364        }
365    }
366
367    pub(crate) fn pop_off_path(&mut self, remote: SocketAddr) -> Option<(u64, SocketAddr)> {
368        let response = *self.pending.last()?;
369        if response.remote == remote {
370            // We don't bother searching further because we expect that the on-path response will
371            // get drained in the immediate future by a call to `pop_on_path`
372            return None;
373        }
374        self.pending.pop();
375        Some((response.token, response.remote))
376    }
377
378    pub(crate) fn pop_on_path(&mut self, remote: SocketAddr) -> Option<u64> {
379        let response = *self.pending.last()?;
380        if response.remote != remote {
381            // We don't bother searching further because we expect that the off-path response will
382            // get drained in the immediate future by a call to `pop_off_path`
383            return None;
384        }
385        self.pending.pop();
386        Some(response.token)
387    }
388
389    pub(crate) fn is_empty(&self) -> bool {
390        self.pending.is_empty()
391    }
392}
393
394#[derive(Copy, Clone)]
395struct PathResponse {
396    /// The packet number the corresponding PATH_CHALLENGE was received in
397    packet: u64,
398    token: u64,
399    /// The address the corresponding PATH_CHALLENGE was received from
400    remote: SocketAddr,
401}
402
403/// Tracks PATH_CHALLENGE tokens for NAT traversal candidate validation
404#[derive(Default)]
405pub(crate) struct NatTraversalChallenges {
406    pending: Vec<NatTraversalChallenge>,
407}
408
409impl NatTraversalChallenges {
410    pub(crate) fn push(&mut self, remote: SocketAddr, token: u64) {
411        /// Arbitrary permissive limit to prevent abuse
412        const MAX_NAT_CHALLENGES: usize = 10;
413        
414        // Check if we already have a challenge for this address
415        if let Some(existing) = self.pending.iter_mut().find(|x| x.remote == remote) {
416            existing.token = token;
417            return;
418        }
419        
420        if self.pending.len() < MAX_NAT_CHALLENGES {
421            self.pending.push(NatTraversalChallenge { remote, token });
422        } else {
423            // Replace the oldest challenge
424            self.pending[0] = NatTraversalChallenge { remote, token };
425        }
426    }
427    
428    
429    pub(crate) fn is_empty(&self) -> bool {
430        self.pending.is_empty()
431    }
432}
433
434#[derive(Copy, Clone)]
435struct NatTraversalChallenge {
436    /// The address to send the PATH_CHALLENGE to
437    remote: SocketAddr,
438    /// The challenge token
439    token: u64,
440}
441
442/// Summary statistics of packets that have been sent on a particular path, but which have not yet
443/// been acked or deemed lost
444pub(super) struct InFlight {
445    /// Sum of the sizes of all sent packets considered "in flight" by congestion control
446    ///
447    /// The size does not include IP or UDP overhead. Packets only containing ACK frames do not
448    /// count towards this to ensure congestion control does not impede congestion feedback.
449    pub(super) bytes: u64,
450    /// Number of packets in flight containing frames other than ACK and PADDING
451    ///
452    /// This can be 0 even when bytes is not 0 because PADDING frames cause a packet to be
453    /// considered "in flight" by congestion control. However, if this is nonzero, bytes will always
454    /// also be nonzero.
455    pub(super) ack_eliciting: u64,
456}
457
458impl InFlight {
459    fn new() -> Self {
460        Self {
461            bytes: 0,
462            ack_eliciting: 0,
463        }
464    }
465
466    fn insert(&mut self, packet: &SentPacket) {
467        self.bytes += u64::from(packet.size);
468        self.ack_eliciting += u64::from(packet.ack_eliciting);
469    }
470
471    /// Update counters to account for a packet becoming acknowledged, lost, or abandoned
472    fn remove(&mut self, packet: &SentPacket) {
473        self.bytes -= u64::from(packet.size);
474        self.ack_eliciting -= u64::from(packet.ack_eliciting);
475    }
476}