1use std::{cmp, net::SocketAddr};
2
3use tracing::trace;
4
5use super::{
6 mtud::MtuDiscovery,
7 pacing::Pacer,
8 spaces::{PacketSpace, SentPacket},
9};
10use crate::{Duration, Instant, TIMER_GRANULARITY, TransportConfig, congestion, packet::SpaceId};
11
12#[cfg(feature = "__qlog")]
13use qlog::events::quic::MetricsUpdated;
14
15pub(super) struct PathData {
17 pub(super) remote: SocketAddr,
18 pub(super) rtt: RttEstimator,
19 pub(super) sending_ecn: bool,
21 pub(super) congestion: Box<dyn congestion::Controller>,
23 pub(super) pacing: Pacer,
25 pub(super) challenge: Option<u64>,
26 pub(super) challenge_pending: bool,
27 pub(super) validated: bool,
32 pub(super) total_sent: u64,
34 pub(super) total_recvd: u64,
36 pub(super) mtud: MtuDiscovery,
38 pub(super) first_packet_after_rtt_sample: Option<(SpaceId, u64)>,
42 pub(super) in_flight: InFlight,
43 first_packet: Option<u64>,
48
49 #[cfg(feature = "__qlog")]
51 congestion_metrics: CongestionMetrics,
52}
53
54impl PathData {
55 pub(super) fn new(
56 remote: SocketAddr,
57 allow_mtud: bool,
58 peer_max_udp_payload_size: Option<u16>,
59 now: Instant,
60 config: &TransportConfig,
61 ) -> Self {
62 let congestion = config
63 .congestion_controller_factory
64 .new_controller(config.get_initial_mtu() as u64, 16 * 1024 * 1024, now);
65 Self {
66 remote,
67 rtt: RttEstimator::new(config.initial_rtt),
68 sending_ecn: true,
69 pacing: Pacer::new(
70 config.initial_rtt,
71 congestion.initial_window(),
72 config.get_initial_mtu(),
73 now,
74 ),
75 congestion,
76 challenge: None,
77 challenge_pending: false,
78 validated: false,
79 total_sent: 0,
80 total_recvd: 0,
81 mtud: config
82 .mtu_discovery_config
83 .as_ref()
84 .filter(|_| allow_mtud)
85 .map_or(
86 MtuDiscovery::disabled(config.get_initial_mtu(), config.min_mtu),
87 |mtud_config| {
88 MtuDiscovery::new(
89 config.get_initial_mtu(),
90 config.min_mtu,
91 peer_max_udp_payload_size,
92 mtud_config.clone(),
93 )
94 },
95 ),
96 first_packet_after_rtt_sample: None,
97 in_flight: InFlight::new(),
98 first_packet: None,
99 #[cfg(feature = "__qlog")]
100 congestion_metrics: CongestionMetrics::default(),
101 }
102 }
103
104 pub(super) fn from_previous(remote: SocketAddr, prev: &Self, now: Instant) -> Self {
105 let congestion = prev.congestion.clone_box();
106 let smoothed_rtt = prev.rtt.get();
107 Self {
108 remote,
109 rtt: prev.rtt,
110 pacing: Pacer::new(smoothed_rtt, congestion.window(), prev.current_mtu(), now),
111 sending_ecn: true,
112 congestion,
113 challenge: None,
114 challenge_pending: false,
115 validated: false,
116 total_sent: 0,
117 total_recvd: 0,
118 mtud: prev.mtud.clone(),
119 first_packet_after_rtt_sample: prev.first_packet_after_rtt_sample,
120 in_flight: InFlight::new(),
121 first_packet: None,
122 #[cfg(feature = "__qlog")]
123 congestion_metrics: prev.congestion_metrics.clone(),
124 }
125 }
126
127 pub(super) fn reset(&mut self, now: Instant, config: &TransportConfig) {
131 self.rtt = RttEstimator::new(config.initial_rtt);
132 self.congestion = config
133 .congestion_controller_factory
134 .new_controller(config.get_initial_mtu() as u64, 16 * 1024 * 1024, now);
135 self.mtud.reset(config.get_initial_mtu(), config.min_mtu);
136 }
137
138 pub(super) fn anti_amplification_blocked(&self, bytes_to_send: u64) -> bool {
141 !self.validated && self.total_recvd * 3 < self.total_sent + bytes_to_send
142 }
143
144 pub(super) fn current_mtu(&self) -> u16 {
146 self.mtud.current_mtu()
147 }
148
149 pub(super) fn sent(&mut self, pn: u64, packet: SentPacket, space: &mut PacketSpace) {
151 self.in_flight.insert(&packet);
152 if self.first_packet.is_none() {
153 self.first_packet = Some(pn);
154 }
155 self.in_flight.bytes -= space.sent(pn, packet);
156 }
157
158 pub(super) fn remove_in_flight(&mut self, pn: u64, packet: &SentPacket) -> bool {
161 if self.first_packet.map_or(true, |first| first > pn) {
162 return false;
163 }
164 self.in_flight.remove(packet);
165 true
166 }
167
168 #[cfg(feature = "__qlog")]
169 pub(super) fn qlog_congestion_metrics(&mut self, pto_count: u32) -> Option<MetricsUpdated> {
170 let controller_metrics = self.congestion.metrics();
171
172 let metrics = CongestionMetrics {
173 min_rtt: Some(self.rtt.min),
174 smoothed_rtt: Some(self.rtt.get()),
175 latest_rtt: Some(self.rtt.latest),
176 rtt_variance: Some(self.rtt.var),
177 pto_count: Some(pto_count),
178 bytes_in_flight: Some(self.in_flight.bytes),
179 packets_in_flight: Some(self.in_flight.ack_eliciting),
180
181 congestion_window: Some(controller_metrics.congestion_window),
182 ssthresh: controller_metrics.ssthresh,
183 pacing_rate: controller_metrics.pacing_rate,
184 };
185
186 let event = metrics.to_qlog_event(&self.congestion_metrics);
187 self.congestion_metrics = metrics;
188 event
189 }
190}
191
/// Snapshot of the recovery metrics reported through qlog.
///
/// Every field is optional so that the same type can describe either a full snapshot or only the
/// subset of values that changed between two snapshots.
#[cfg(feature = "__qlog")]
#[derive(Default, Clone, PartialEq)]
#[non_exhaustive]
struct CongestionMetrics {
    pub min_rtt: Option<Duration>,
    pub smoothed_rtt: Option<Duration>,
    pub latest_rtt: Option<Duration>,
    pub rtt_variance: Option<Duration>,
    pub pto_count: Option<u32>,
    pub bytes_in_flight: Option<u64>,
    pub packets_in_flight: Option<u64>,
    pub congestion_window: Option<u64>,
    pub ssthresh: Option<u64>,
    pub pacing_rate: Option<u64>,
}

#[cfg(feature = "__qlog")]
impl CongestionMetrics {
    /// Returns a copy of `self` in which every field equal to its counterpart in `previous` has
    /// been blanked out to `None`.
    fn retain_updated(&self, previous: &Self) -> Self {
        /// Yields `current` when it differs from `previous`, `None` otherwise.
        fn changed<T: Copy + PartialEq>(current: Option<T>, previous: Option<T>) -> Option<T> {
            if current == previous {
                None
            } else {
                current
            }
        }

        Self {
            min_rtt: changed(self.min_rtt, previous.min_rtt),
            smoothed_rtt: changed(self.smoothed_rtt, previous.smoothed_rtt),
            latest_rtt: changed(self.latest_rtt, previous.latest_rtt),
            rtt_variance: changed(self.rtt_variance, previous.rtt_variance),
            pto_count: changed(self.pto_count, previous.pto_count),
            bytes_in_flight: changed(self.bytes_in_flight, previous.bytes_in_flight),
            packets_in_flight: changed(self.packets_in_flight, previous.packets_in_flight),
            congestion_window: changed(self.congestion_window, previous.congestion_window),
            ssthresh: changed(self.ssthresh, previous.ssthresh),
            pacing_rate: changed(self.pacing_rate, previous.pacing_rate),
        }
    }

    /// Converts the difference between `self` and `previous` into a qlog `MetricsUpdated` event.
    ///
    /// Returns `None` when no field changed. RTT values are converted to seconds as `f32`, and a
    /// PTO count that does not fit in `u16` is clamped to `u16::MAX`.
    fn to_qlog_event(&self, previous: &Self) -> Option<MetricsUpdated> {
        let delta = self.retain_updated(previous);
        let secs = |rtt: Duration| rtt.as_secs_f32();

        (delta != Self::default()).then(|| MetricsUpdated {
            min_rtt: delta.min_rtt.map(secs),
            smoothed_rtt: delta.smoothed_rtt.map(secs),
            latest_rtt: delta.latest_rtt.map(secs),
            rtt_variance: delta.rtt_variance.map(secs),
            pto_count: delta
                .pto_count
                .map(|count| count.try_into().unwrap_or(u16::MAX)),
            bytes_in_flight: delta.bytes_in_flight,
            packets_in_flight: delta.packets_in_flight,
            congestion_window: delta.congestion_window,
            ssthresh: delta.ssthresh,
            pacing_rate: delta.pacing_rate,
        })
    }
}
263
264#[derive(Copy, Clone)]
266pub struct RttEstimator {
267 latest: Duration,
269 smoothed: Option<Duration>,
271 var: Duration,
273 min: Duration,
275}
276
277impl RttEstimator {
278 fn new(initial_rtt: Duration) -> Self {
279 Self {
280 latest: initial_rtt,
281 smoothed: None,
282 var: initial_rtt / 2,
283 min: initial_rtt,
284 }
285 }
286
287 pub fn get(&self) -> Duration {
289 self.smoothed.unwrap_or(self.latest)
290 }
291
292 pub fn conservative(&self) -> Duration {
297 self.get().max(self.latest)
298 }
299
300 pub fn min(&self) -> Duration {
302 self.min
303 }
304
305 pub(crate) fn pto_base(&self) -> Duration {
307 self.get() + cmp::max(4 * self.var, TIMER_GRANULARITY)
308 }
309
310 pub(crate) fn update(&mut self, ack_delay: Duration, rtt: Duration) {
311 self.latest = rtt;
312 self.min = cmp::min(self.min, self.latest);
314 if let Some(smoothed) = self.smoothed {
316 let adjusted_rtt = if self.min + ack_delay <= self.latest {
317 self.latest - ack_delay
318 } else {
319 self.latest
320 };
321 let var_sample = if smoothed > adjusted_rtt {
322 smoothed - adjusted_rtt
323 } else {
324 adjusted_rtt - smoothed
325 };
326 self.var = (3 * self.var + var_sample) / 4;
327 self.smoothed = Some((7 * smoothed + adjusted_rtt) / 8);
328 } else {
329 self.smoothed = Some(self.latest);
330 self.var = self.latest / 2;
331 self.min = self.latest;
332 }
333 }
334}
335
336#[derive(Default)]
337pub(crate) struct PathResponses {
338 pending: Vec<PathResponse>,
339}
340
341impl PathResponses {
342 pub(crate) fn push(&mut self, packet: u64, token: u64, remote: SocketAddr) {
343 const MAX_PATH_RESPONSES: usize = 16;
345 let response = PathResponse {
346 packet,
347 token,
348 remote,
349 };
350 let existing = self.pending.iter_mut().find(|x| x.remote == remote);
351 if let Some(existing) = existing {
352 if existing.packet <= packet {
354 *existing = response;
355 }
356 return;
357 }
358 if self.pending.len() < MAX_PATH_RESPONSES {
359 self.pending.push(response);
360 } else {
361 trace!("ignoring excessive PATH_CHALLENGE");
364 }
365 }
366
367 pub(crate) fn pop_off_path(&mut self, remote: SocketAddr) -> Option<(u64, SocketAddr)> {
368 let response = *self.pending.last()?;
369 if response.remote == remote {
370 return None;
373 }
374 self.pending.pop();
375 Some((response.token, response.remote))
376 }
377
378 pub(crate) fn pop_on_path(&mut self, remote: SocketAddr) -> Option<u64> {
379 let response = *self.pending.last()?;
380 if response.remote != remote {
381 return None;
384 }
385 self.pending.pop();
386 Some(response.token)
387 }
388
389 pub(crate) fn is_empty(&self) -> bool {
390 self.pending.is_empty()
391 }
392}
393
394#[derive(Copy, Clone)]
395struct PathResponse {
396 packet: u64,
398 token: u64,
399 remote: SocketAddr,
401}
402
/// Bounded queue of pending NAT traversal challenges, holding at most one entry per remote
/// address.
#[derive(Default)]
pub(crate) struct NatTraversalChallenges {
    /// Pending challenges in insertion order, oldest first.
    pending: Vec<NatTraversalChallenge>,
}

impl NatTraversalChallenges {
    /// Queues a challenge carrying `token` for `remote`.
    ///
    /// If a challenge for `remote` is already queued, only its token is replaced and its position
    /// in the queue is kept. Otherwise the challenge is appended; when the queue already holds
    /// `MAX_NAT_CHALLENGES` entries, the oldest one is evicted to make room.
    pub(crate) fn push(&mut self, remote: SocketAddr, token: u64) {
        // Upper bound on the number of distinct remotes tracked at once.
        const MAX_NAT_CHALLENGES: usize = 10;

        if let Some(existing) = self.pending.iter_mut().find(|x| x.remote == remote) {
            existing.token = token;
            return;
        }

        if self.pending.len() >= MAX_NAT_CHALLENGES {
            // Evict from the front and append at the back so the queue stays ordered by age.
            // Overwriting slot 0 in place would leave the newest entry there, so the next
            // overflow would evict it and the entries behind it would never be replaced.
            self.pending.remove(0);
        }
        self.pending.push(NatTraversalChallenge { remote, token });
    }

    /// Whether no challenges are queued.
    pub(crate) fn is_empty(&self) -> bool {
        self.pending.is_empty()
    }
}

/// A single queued NAT traversal challenge.
#[derive(Copy, Clone)]
struct NatTraversalChallenge {
    /// Address the challenge is destined for.
    remote: SocketAddr,
    /// Token carried by the challenge.
    token: u64,
}
441
442pub(super) struct InFlight {
445 pub(super) bytes: u64,
450 pub(super) ack_eliciting: u64,
456}
457
458impl InFlight {
459 fn new() -> Self {
460 Self {
461 bytes: 0,
462 ack_eliciting: 0,
463 }
464 }
465
466 fn insert(&mut self, packet: &SentPacket) {
467 self.bytes += u64::from(packet.size);
468 self.ack_eliciting += u64::from(packet.ack_eliciting);
469 }
470
471 fn remove(&mut self, packet: &SentPacket) {
473 self.bytes -= u64::from(packet.size);
474 self.ack_eliciting -= u64::from(packet.ack_eliciting);
475 }
476}