1use std::{cmp, net::SocketAddr};
2
3use tracing::trace;
4
5use super::{
6 mtud::MtuDiscovery,
7 pacing::Pacer,
8 spaces::{PacketSpace, SentPacket},
9};
10use crate::{Duration, Instant, TIMER_GRANULARITY, TransportConfig, congestion, packet::SpaceId};
11
12#[cfg(feature = "__qlog")]
13use qlog::events::quic::MetricsUpdated;
14
15pub(super) struct PathData {
17 pub(super) remote: SocketAddr,
18 pub(super) rtt: RttEstimator,
19 pub(super) sending_ecn: bool,
21 pub(super) congestion: Box<dyn congestion::Controller>,
23 pub(super) pacing: Pacer,
25 pub(super) challenge: Option<u64>,
26 pub(super) challenge_pending: bool,
27 pub(super) validated: bool,
32 pub(super) total_sent: u64,
34 pub(super) total_recvd: u64,
36 pub(super) mtud: MtuDiscovery,
38 pub(super) first_packet_after_rtt_sample: Option<(SpaceId, u64)>,
42 pub(super) in_flight: InFlight,
43 first_packet: Option<u64>,
48
49 #[cfg(feature = "__qlog")]
51 congestion_metrics: CongestionMetrics,
52}
53
54impl PathData {
55 pub(super) fn new(
56 remote: SocketAddr,
57 allow_mtud: bool,
58 peer_max_udp_payload_size: Option<u16>,
59 now: Instant,
60 config: &TransportConfig,
61 ) -> Self {
62 let congestion = config
63 .congestion_controller_factory
64 .clone()
65 .build(now, config.get_initial_mtu());
66 Self {
67 remote,
68 rtt: RttEstimator::new(config.initial_rtt),
69 sending_ecn: true,
70 pacing: Pacer::new(
71 config.initial_rtt,
72 congestion.initial_window(),
73 config.get_initial_mtu(),
74 now,
75 ),
76 congestion,
77 challenge: None,
78 challenge_pending: false,
79 validated: false,
80 total_sent: 0,
81 total_recvd: 0,
82 mtud: config
83 .mtu_discovery_config
84 .as_ref()
85 .filter(|_| allow_mtud)
86 .map_or(
87 MtuDiscovery::disabled(config.get_initial_mtu(), config.min_mtu),
88 |mtud_config| {
89 MtuDiscovery::new(
90 config.get_initial_mtu(),
91 config.min_mtu,
92 peer_max_udp_payload_size,
93 mtud_config.clone(),
94 )
95 },
96 ),
97 first_packet_after_rtt_sample: None,
98 in_flight: InFlight::new(),
99 first_packet: None,
100 #[cfg(feature = "__qlog")]
101 congestion_metrics: CongestionMetrics::default(),
102 }
103 }
104
105 pub(super) fn from_previous(remote: SocketAddr, prev: &Self, now: Instant) -> Self {
106 let congestion = prev.congestion.clone_box();
107 let smoothed_rtt = prev.rtt.get();
108 Self {
109 remote,
110 rtt: prev.rtt,
111 pacing: Pacer::new(smoothed_rtt, congestion.window(), prev.current_mtu(), now),
112 sending_ecn: true,
113 congestion,
114 challenge: None,
115 challenge_pending: false,
116 validated: false,
117 total_sent: 0,
118 total_recvd: 0,
119 mtud: prev.mtud.clone(),
120 first_packet_after_rtt_sample: prev.first_packet_after_rtt_sample,
121 in_flight: InFlight::new(),
122 first_packet: None,
123 #[cfg(feature = "__qlog")]
124 congestion_metrics: prev.congestion_metrics.clone(),
125 }
126 }
127
128 pub(super) fn reset(&mut self, now: Instant, config: &TransportConfig) {
132 self.rtt = RttEstimator::new(config.initial_rtt);
133 self.congestion = config
134 .congestion_controller_factory
135 .clone()
136 .build(now, config.get_initial_mtu());
137 self.mtud.reset(config.get_initial_mtu(), config.min_mtu);
138 }
139
140 pub(super) fn anti_amplification_blocked(&self, bytes_to_send: u64) -> bool {
143 !self.validated && self.total_recvd * 3 < self.total_sent + bytes_to_send
144 }
145
146 pub(super) fn current_mtu(&self) -> u16 {
148 self.mtud.current_mtu()
149 }
150
151 pub(super) fn sent(&mut self, pn: u64, packet: SentPacket, space: &mut PacketSpace) {
153 self.in_flight.insert(&packet);
154 if self.first_packet.is_none() {
155 self.first_packet = Some(pn);
156 }
157 self.in_flight.bytes -= space.sent(pn, packet);
158 }
159
160 pub(super) fn remove_in_flight(&mut self, pn: u64, packet: &SentPacket) -> bool {
163 if self.first_packet.map_or(true, |first| first > pn) {
164 return false;
165 }
166 self.in_flight.remove(packet);
167 true
168 }
169
170 #[cfg(feature = "__qlog")]
171 pub(super) fn qlog_congestion_metrics(&mut self, pto_count: u32) -> Option<MetricsUpdated> {
172 let controller_metrics = self.congestion.metrics();
173
174 let metrics = CongestionMetrics {
175 min_rtt: Some(self.rtt.min),
176 smoothed_rtt: Some(self.rtt.get()),
177 latest_rtt: Some(self.rtt.latest),
178 rtt_variance: Some(self.rtt.var),
179 pto_count: Some(pto_count),
180 bytes_in_flight: Some(self.in_flight.bytes),
181 packets_in_flight: Some(self.in_flight.ack_eliciting),
182
183 congestion_window: Some(controller_metrics.congestion_window),
184 ssthresh: controller_metrics.ssthresh,
185 pacing_rate: controller_metrics.pacing_rate,
186 };
187
188 let event = metrics.to_qlog_event(&self.congestion_metrics);
189 self.congestion_metrics = metrics;
190 event
191 }
192}
193
194#[cfg(feature = "__qlog")]
198#[derive(Default, Clone, PartialEq)]
199#[non_exhaustive]
200struct CongestionMetrics {
201 pub min_rtt: Option<Duration>,
202 pub smoothed_rtt: Option<Duration>,
203 pub latest_rtt: Option<Duration>,
204 pub rtt_variance: Option<Duration>,
205 pub pto_count: Option<u32>,
206 pub bytes_in_flight: Option<u64>,
207 pub packets_in_flight: Option<u64>,
208 pub congestion_window: Option<u64>,
209 pub ssthresh: Option<u64>,
210 pub pacing_rate: Option<u64>,
211}
212
213#[cfg(feature = "__qlog")]
214impl CongestionMetrics {
215 fn retain_updated(&self, previous: &Self) -> Self {
217 macro_rules! keep_if_changed {
218 ($name:ident) => {
219 if previous.$name == self.$name {
220 None
221 } else {
222 self.$name
223 }
224 };
225 }
226
227 Self {
228 min_rtt: keep_if_changed!(min_rtt),
229 smoothed_rtt: keep_if_changed!(smoothed_rtt),
230 latest_rtt: keep_if_changed!(latest_rtt),
231 rtt_variance: keep_if_changed!(rtt_variance),
232 pto_count: keep_if_changed!(pto_count),
233 bytes_in_flight: keep_if_changed!(bytes_in_flight),
234 packets_in_flight: keep_if_changed!(packets_in_flight),
235 congestion_window: keep_if_changed!(congestion_window),
236 ssthresh: keep_if_changed!(ssthresh),
237 pacing_rate: keep_if_changed!(pacing_rate),
238 }
239 }
240
241 fn to_qlog_event(&self, previous: &Self) -> Option<MetricsUpdated> {
243 let updated = self.retain_updated(previous);
244
245 if updated == Self::default() {
246 return None;
247 }
248
249 Some(MetricsUpdated {
250 min_rtt: updated.min_rtt.map(|rtt| rtt.as_secs_f32()),
251 smoothed_rtt: updated.smoothed_rtt.map(|rtt| rtt.as_secs_f32()),
252 latest_rtt: updated.latest_rtt.map(|rtt| rtt.as_secs_f32()),
253 rtt_variance: updated.rtt_variance.map(|rtt| rtt.as_secs_f32()),
254 pto_count: updated
255 .pto_count
256 .map(|count| count.try_into().unwrap_or(u16::MAX)),
257 bytes_in_flight: updated.bytes_in_flight,
258 packets_in_flight: updated.packets_in_flight,
259 congestion_window: updated.congestion_window,
260 ssthresh: updated.ssthresh,
261 pacing_rate: updated.pacing_rate,
262 })
263 }
264}
265
266#[derive(Copy, Clone)]
268pub struct RttEstimator {
269 latest: Duration,
271 smoothed: Option<Duration>,
273 var: Duration,
275 min: Duration,
277}
278
279impl RttEstimator {
280 fn new(initial_rtt: Duration) -> Self {
281 Self {
282 latest: initial_rtt,
283 smoothed: None,
284 var: initial_rtt / 2,
285 min: initial_rtt,
286 }
287 }
288
289 pub fn get(&self) -> Duration {
291 self.smoothed.unwrap_or(self.latest)
292 }
293
294 pub fn conservative(&self) -> Duration {
299 self.get().max(self.latest)
300 }
301
302 pub fn min(&self) -> Duration {
304 self.min
305 }
306
307 pub(crate) fn pto_base(&self) -> Duration {
309 self.get() + cmp::max(4 * self.var, TIMER_GRANULARITY)
310 }
311
312 pub(crate) fn update(&mut self, ack_delay: Duration, rtt: Duration) {
313 self.latest = rtt;
314 self.min = cmp::min(self.min, self.latest);
316 if let Some(smoothed) = self.smoothed {
318 let adjusted_rtt = if self.min + ack_delay <= self.latest {
319 self.latest - ack_delay
320 } else {
321 self.latest
322 };
323 let var_sample = if smoothed > adjusted_rtt {
324 smoothed - adjusted_rtt
325 } else {
326 adjusted_rtt - smoothed
327 };
328 self.var = (3 * self.var + var_sample) / 4;
329 self.smoothed = Some((7 * smoothed + adjusted_rtt) / 8);
330 } else {
331 self.smoothed = Some(self.latest);
332 self.var = self.latest / 2;
333 self.min = self.latest;
334 }
335 }
336}
337
338#[derive(Default)]
339pub(crate) struct PathResponses {
340 pending: Vec<PathResponse>,
341}
342
343impl PathResponses {
344 pub(crate) fn push(&mut self, packet: u64, token: u64, remote: SocketAddr) {
345 const MAX_PATH_RESPONSES: usize = 16;
347 let response = PathResponse {
348 packet,
349 token,
350 remote,
351 };
352 let existing = self.pending.iter_mut().find(|x| x.remote == remote);
353 if let Some(existing) = existing {
354 if existing.packet <= packet {
356 *existing = response;
357 }
358 return;
359 }
360 if self.pending.len() < MAX_PATH_RESPONSES {
361 self.pending.push(response);
362 } else {
363 trace!("ignoring excessive PATH_CHALLENGE");
366 }
367 }
368
369 pub(crate) fn pop_off_path(&mut self, remote: SocketAddr) -> Option<(u64, SocketAddr)> {
370 let response = *self.pending.last()?;
371 if response.remote == remote {
372 return None;
375 }
376 self.pending.pop();
377 Some((response.token, response.remote))
378 }
379
380 pub(crate) fn pop_on_path(&mut self, remote: SocketAddr) -> Option<u64> {
381 let response = *self.pending.last()?;
382 if response.remote != remote {
383 return None;
386 }
387 self.pending.pop();
388 Some(response.token)
389 }
390
391 pub(crate) fn is_empty(&self) -> bool {
392 self.pending.is_empty()
393 }
394}
395
396#[derive(Copy, Clone)]
397struct PathResponse {
398 packet: u64,
400 token: u64,
401 remote: SocketAddr,
403}
404
405pub(super) struct InFlight {
408 pub(super) bytes: u64,
413 pub(super) ack_eliciting: u64,
419}
420
421impl InFlight {
422 fn new() -> Self {
423 Self {
424 bytes: 0,
425 ack_eliciting: 0,
426 }
427 }
428
429 fn insert(&mut self, packet: &SentPacket) {
430 self.bytes += u64::from(packet.size);
431 self.ack_eliciting += u64::from(packet.ack_eliciting);
432 }
433
434 fn remove(&mut self, packet: &SentPacket) {
436 self.bytes -= u64::from(packet.size);
437 self.ack_eliciting -= u64::from(packet.ack_eliciting);
438 }
439}