ant_quic/connection/
paths.rs

1use std::{cmp, net::SocketAddr};
2
3use tracing::trace;
4
5use super::{
6    mtud::MtuDiscovery,
7    pacing::Pacer,
8    spaces::{PacketSpace, SentPacket},
9};
10use crate::{Duration, Instant, TIMER_GRANULARITY, TransportConfig, congestion, packet::SpaceId};
11
12#[cfg(feature = "__qlog")]
13use qlog::events::quic::MetricsUpdated;
14
15/// Description of a particular network path
16pub(super) struct PathData {
17    pub(super) remote: SocketAddr,
18    pub(super) rtt: RttEstimator,
19    /// Whether we're enabling ECN on outgoing packets
20    pub(super) sending_ecn: bool,
21    /// Congestion controller state
22    pub(super) congestion: Box<dyn congestion::Controller>,
23    /// Pacing state
24    pub(super) pacing: Pacer,
25    pub(super) challenge: Option<u64>,
26    pub(super) challenge_pending: bool,
27    /// Whether we're certain the peer can both send and receive on this address
28    ///
29    /// Initially equal to `use_stateless_retry` for servers, and becomes false again on every
30    /// migration. Always true for clients.
31    pub(super) validated: bool,
32    /// Total size of all UDP datagrams sent on this path
33    pub(super) total_sent: u64,
34    /// Total size of all UDP datagrams received on this path
35    pub(super) total_recvd: u64,
36    /// The state of the MTU discovery process
37    pub(super) mtud: MtuDiscovery,
38    /// Packet number of the first packet sent after an RTT sample was collected on this path
39    ///
40    /// Used in persistent congestion determination.
41    pub(super) first_packet_after_rtt_sample: Option<(SpaceId, u64)>,
42    pub(super) in_flight: InFlight,
43    /// Number of the first packet sent on this path
44    ///
45    /// Used to determine whether a packet was sent on an earlier path. Insufficient to determine if
46    /// a packet was sent on a later path.
47    first_packet: Option<u64>,
48
49    /// Snapshot of the qlog recovery metrics
50    #[cfg(feature = "__qlog")]
51    congestion_metrics: CongestionMetrics,
52
53    /// Address discovery information for this path
54    pub(super) address_info: PathAddressInfo,
55    /// Rate limiter for OBSERVED_ADDRESS frames on this path
56    pub(super) observation_rate_limiter: PathObservationRateLimiter,
57}
58
59impl PathData {
60    pub(super) fn new(
61        remote: SocketAddr,
62        allow_mtud: bool,
63        peer_max_udp_payload_size: Option<u16>,
64        now: Instant,
65        config: &TransportConfig,
66    ) -> Self {
67        let congestion = config.congestion_controller_factory.new_controller(
68            config.get_initial_mtu() as u64,
69            16 * 1024 * 1024,
70            now,
71        );
72        Self {
73            remote,
74            rtt: RttEstimator::new(config.initial_rtt),
75            sending_ecn: true,
76            pacing: Pacer::new(
77                config.initial_rtt,
78                congestion.initial_window(),
79                config.get_initial_mtu(),
80                now,
81            ),
82            congestion,
83            challenge: None,
84            challenge_pending: false,
85            validated: false,
86            total_sent: 0,
87            total_recvd: 0,
88            mtud: config
89                .mtu_discovery_config
90                .as_ref()
91                .filter(|_| allow_mtud)
92                .map_or(
93                    MtuDiscovery::disabled(config.get_initial_mtu(), config.min_mtu),
94                    |mtud_config| {
95                        MtuDiscovery::new(
96                            config.get_initial_mtu(),
97                            config.min_mtu,
98                            peer_max_udp_payload_size,
99                            mtud_config.clone(),
100                        )
101                    },
102                ),
103            first_packet_after_rtt_sample: None,
104            in_flight: InFlight::new(),
105            first_packet: None,
106            #[cfg(feature = "__qlog")]
107            congestion_metrics: CongestionMetrics::default(),
108            address_info: PathAddressInfo::new(),
109            observation_rate_limiter: PathObservationRateLimiter::new(10, now), // Default rate of 10
110        }
111    }
112
113    pub(super) fn from_previous(remote: SocketAddr, prev: &Self, now: Instant) -> Self {
114        let congestion = prev.congestion.clone_box();
115        let smoothed_rtt = prev.rtt.get();
116        Self {
117            remote,
118            rtt: prev.rtt,
119            pacing: Pacer::new(smoothed_rtt, congestion.window(), prev.current_mtu(), now),
120            sending_ecn: true,
121            congestion,
122            challenge: None,
123            challenge_pending: false,
124            validated: false,
125            total_sent: 0,
126            total_recvd: 0,
127            mtud: prev.mtud.clone(),
128            first_packet_after_rtt_sample: prev.first_packet_after_rtt_sample,
129            in_flight: InFlight::new(),
130            first_packet: None,
131            #[cfg(feature = "__qlog")]
132            congestion_metrics: prev.congestion_metrics.clone(),
133            address_info: PathAddressInfo::new(), // Reset for new path
134            observation_rate_limiter: PathObservationRateLimiter::new(
135                prev.observation_rate_limiter.rate as u8,
136                now,
137            ), // Fresh limiter with same rate
138        }
139    }
140
141    /// Resets RTT, congestion control and MTU states.
142    ///
143    /// This is useful when it is known the underlying path has changed.
144    pub(super) fn reset(&mut self, now: Instant, config: &TransportConfig) {
145        self.rtt = RttEstimator::new(config.initial_rtt);
146        self.congestion = config.congestion_controller_factory.new_controller(
147            config.get_initial_mtu() as u64,
148            16 * 1024 * 1024,
149            now,
150        );
151        self.mtud.reset(config.get_initial_mtu(), config.min_mtu);
152        self.address_info = PathAddressInfo::new(); // Reset address info
153        // Reset tokens but preserve rate
154        let rate = self.observation_rate_limiter.rate as u8;
155        self.observation_rate_limiter = PathObservationRateLimiter::new(rate, now);
156    }
157
158    /// Update the observed address for this path
159    pub(super) fn update_observed_address(&mut self, address: SocketAddr, now: Instant) {
160        self.address_info.update_observed_address(address, now);
161    }
162
163    /// Check if the observed address has changed from the expected remote address
164    pub(super) fn has_address_changed(&self) -> bool {
165        self.address_info.has_address_changed(&self.remote)
166    }
167
168    /// Mark that we've notified the application about the current address
169    pub(super) fn mark_address_notified(&mut self) {
170        self.address_info.mark_notified();
171    }
172
173    /// Check if we can send an observation on this path
174    pub(super) fn can_send_observation(&mut self, now: Instant) -> bool {
175        self.observation_rate_limiter.can_send(now)
176    }
177
178    /// Consume a token for sending an observation
179    pub(super) fn consume_observation_token(&mut self, now: Instant) {
180        self.observation_rate_limiter.consume_token(now)
181    }
182
183    /// Update observation tokens based on elapsed time
184    pub(super) fn update_observation_tokens(&mut self, now: Instant) {
185        self.observation_rate_limiter.update_tokens(now)
186    }
187
188    /// Set the observation rate for this path
189    pub(super) fn set_observation_rate(&mut self, rate: u8) {
190        self.observation_rate_limiter.set_rate(rate)
191    }
192
193    /// Indicates whether we're a server that hasn't validated the peer's address and hasn't
194    /// received enough data from the peer to permit sending `bytes_to_send` additional bytes
195    pub(super) fn anti_amplification_blocked(&self, bytes_to_send: u64) -> bool {
196        !self.validated && self.total_recvd * 3 < self.total_sent + bytes_to_send
197    }
198
199    /// Returns the path's current MTU
200    pub(super) fn current_mtu(&self) -> u16 {
201        self.mtud.current_mtu()
202    }
203
204    /// Account for transmission of `packet` with number `pn` in `space`
205    pub(super) fn sent(&mut self, pn: u64, packet: SentPacket, space: &mut PacketSpace) {
206        self.in_flight.insert(&packet);
207        if self.first_packet.is_none() {
208            self.first_packet = Some(pn);
209        }
210        self.in_flight.bytes -= space.sent(pn, packet);
211    }
212
213    /// Remove `packet` with number `pn` from this path's congestion control counters, or return
214    /// `false` if `pn` was sent before this path was established.
215    pub(super) fn remove_in_flight(&mut self, pn: u64, packet: &SentPacket) -> bool {
216        if self.first_packet.is_none_or(|first| first > pn) {
217            return false;
218        }
219        self.in_flight.remove(packet);
220        true
221    }
222
223    #[cfg(feature = "__qlog")]
224    pub(super) fn qlog_congestion_metrics(&mut self, pto_count: u32) -> Option<MetricsUpdated> {
225        let controller_metrics = self.congestion.metrics();
226
227        let metrics = CongestionMetrics {
228            min_rtt: Some(self.rtt.min),
229            smoothed_rtt: Some(self.rtt.get()),
230            latest_rtt: Some(self.rtt.latest),
231            rtt_variance: Some(self.rtt.var),
232            pto_count: Some(pto_count),
233            bytes_in_flight: Some(self.in_flight.bytes),
234            packets_in_flight: Some(self.in_flight.ack_eliciting),
235
236            congestion_window: Some(controller_metrics.congestion_window),
237            ssthresh: controller_metrics.ssthresh,
238            pacing_rate: controller_metrics.pacing_rate,
239        };
240
241        let event = metrics.to_qlog_event(&self.congestion_metrics);
242        self.congestion_metrics = metrics;
243        event
244    }
245}
246
/// Recovery metrics in the shape of qlog's [`recovery_metrics_updated`] event.
///
/// [`recovery_metrics_updated`]: https://datatracker.ietf.org/doc/html/draft-ietf-quic-qlog-quic-events.html#name-recovery_metrics_updated
#[cfg(feature = "__qlog")]
#[derive(Default, Clone, PartialEq)]
#[non_exhaustive]
struct CongestionMetrics {
    pub min_rtt: Option<Duration>,
    pub smoothed_rtt: Option<Duration>,
    pub latest_rtt: Option<Duration>,
    pub rtt_variance: Option<Duration>,
    pub pto_count: Option<u32>,
    pub bytes_in_flight: Option<u64>,
    pub packets_in_flight: Option<u64>,
    pub congestion_window: Option<u64>,
    pub ssthresh: Option<u64>,
    pub pacing_rate: Option<u64>,
}

#[cfg(feature = "__qlog")]
impl CongestionMetrics {
    /// Returns a copy in which every field equal to its counterpart in `previous` is blanked
    /// out to `None`, leaving only the values that changed.
    fn retain_updated(&self, previous: &Self) -> Self {
        /// `current` when it differs from `before`, otherwise `None`
        fn changed<T: Copy + PartialEq>(before: Option<T>, current: Option<T>) -> Option<T> {
            if before == current { None } else { current }
        }

        Self {
            min_rtt: changed(previous.min_rtt, self.min_rtt),
            smoothed_rtt: changed(previous.smoothed_rtt, self.smoothed_rtt),
            latest_rtt: changed(previous.latest_rtt, self.latest_rtt),
            rtt_variance: changed(previous.rtt_variance, self.rtt_variance),
            pto_count: changed(previous.pto_count, self.pto_count),
            bytes_in_flight: changed(previous.bytes_in_flight, self.bytes_in_flight),
            packets_in_flight: changed(previous.packets_in_flight, self.packets_in_flight),
            congestion_window: changed(previous.congestion_window, self.congestion_window),
            ssthresh: changed(previous.ssthresh, self.ssthresh),
            pacing_rate: changed(previous.pacing_rate, self.pacing_rate),
        }
    }

    /// Builds a `MetricsUpdated` event holding only the values that differ from `previous`.
    ///
    /// Returns `None` when no value changed. Durations are reported as `f32` seconds and a
    /// `pto_count` that does not fit in `u16` is reported as `u16::MAX`.
    fn to_qlog_event(&self, previous: &Self) -> Option<MetricsUpdated> {
        let delta = self.retain_updated(previous);

        // A delta with every field blank means there is nothing to report
        if delta == Self::default() {
            return None;
        }

        let seconds = |rtt: Duration| rtt.as_secs_f32();
        let event = MetricsUpdated {
            min_rtt: delta.min_rtt.map(seconds),
            smoothed_rtt: delta.smoothed_rtt.map(seconds),
            latest_rtt: delta.latest_rtt.map(seconds),
            rtt_variance: delta.rtt_variance.map(seconds),
            pto_count: delta
                .pto_count
                .map(|count| u16::try_from(count).unwrap_or(u16::MAX)),
            bytes_in_flight: delta.bytes_in_flight,
            packets_in_flight: delta.packets_in_flight,
            congestion_window: delta.congestion_window,
            ssthresh: delta.ssthresh,
            pacing_rate: delta.pacing_rate,
        };
        Some(event)
    }
}
318
319/// RTT estimation for a particular network path
320#[derive(Copy, Clone)]
321pub struct RttEstimator {
322    /// The most recent RTT measurement made when receiving an ack for a previously unacked packet
323    latest: Duration,
324    /// The smoothed RTT of the connection, computed as described in RFC6298
325    smoothed: Option<Duration>,
326    /// The RTT variance, computed as described in RFC6298
327    var: Duration,
328    /// The minimum RTT seen in the connection, ignoring ack delay.
329    min: Duration,
330}
331
332impl RttEstimator {
333    fn new(initial_rtt: Duration) -> Self {
334        Self {
335            latest: initial_rtt,
336            smoothed: None,
337            var: initial_rtt / 2,
338            min: initial_rtt,
339        }
340    }
341
342    /// The current best RTT estimation.
343    pub fn get(&self) -> Duration {
344        self.smoothed.unwrap_or(self.latest)
345    }
346
347    /// Conservative estimate of RTT
348    ///
349    /// Takes the maximum of smoothed and latest RTT, as recommended
350    /// in 6.1.2 of the recovery spec (draft 29).
351    pub fn conservative(&self) -> Duration {
352        self.get().max(self.latest)
353    }
354
355    /// Minimum RTT registered so far for this estimator.
356    pub fn min(&self) -> Duration {
357        self.min
358    }
359
360    // PTO computed as described in RFC9002#6.2.1
361    pub(crate) fn pto_base(&self) -> Duration {
362        self.get() + cmp::max(4 * self.var, TIMER_GRANULARITY)
363    }
364
365    pub(crate) fn update(&mut self, ack_delay: Duration, rtt: Duration) {
366        self.latest = rtt;
367        // min_rtt ignores ack delay.
368        self.min = cmp::min(self.min, self.latest);
369        // Based on RFC6298.
370        if let Some(smoothed) = self.smoothed {
371            let adjusted_rtt = if self.min + ack_delay <= self.latest {
372                self.latest - ack_delay
373            } else {
374                self.latest
375            };
376            let var_sample = smoothed.abs_diff(adjusted_rtt);
377            self.var = (3 * self.var + var_sample) / 4;
378            self.smoothed = Some((7 * smoothed + adjusted_rtt) / 8);
379        } else {
380            self.smoothed = Some(self.latest);
381            self.var = self.latest / 2;
382            self.min = self.latest;
383        }
384    }
385}
386
387#[derive(Default)]
388pub(crate) struct PathResponses {
389    pending: Vec<PathResponse>,
390}
391
392impl PathResponses {
393    pub(crate) fn push(&mut self, packet: u64, token: u64, remote: SocketAddr) {
394        /// Arbitrary permissive limit to prevent abuse
395        const MAX_PATH_RESPONSES: usize = 16;
396        let response = PathResponse {
397            packet,
398            token,
399            remote,
400        };
401        let existing = self.pending.iter_mut().find(|x| x.remote == remote);
402        if let Some(existing) = existing {
403            // Update a queued response
404            if existing.packet <= packet {
405                *existing = response;
406            }
407            return;
408        }
409        if self.pending.len() < MAX_PATH_RESPONSES {
410            self.pending.push(response);
411        } else {
412            // We don't expect to ever hit this with well-behaved peers, so we don't bother dropping
413            // older challenges.
414            trace!("ignoring excessive PATH_CHALLENGE");
415        }
416    }
417
418    pub(crate) fn pop_off_path(&mut self, remote: SocketAddr) -> Option<(u64, SocketAddr)> {
419        let response = *self.pending.last()?;
420        if response.remote == remote {
421            // We don't bother searching further because we expect that the on-path response will
422            // get drained in the immediate future by a call to `pop_on_path`
423            return None;
424        }
425        self.pending.pop();
426        Some((response.token, response.remote))
427    }
428
429    pub(crate) fn pop_on_path(&mut self, remote: SocketAddr) -> Option<u64> {
430        let response = *self.pending.last()?;
431        if response.remote != remote {
432            // We don't bother searching further because we expect that the off-path response will
433            // get drained in the immediate future by a call to `pop_off_path`
434            return None;
435        }
436        self.pending.pop();
437        Some(response.token)
438    }
439
440    pub(crate) fn is_empty(&self) -> bool {
441        self.pending.is_empty()
442    }
443}
444
445#[derive(Copy, Clone)]
446struct PathResponse {
447    /// The packet number the corresponding PATH_CHALLENGE was received in
448    packet: u64,
449    token: u64,
450    /// The address the corresponding PATH_CHALLENGE was received from
451    remote: SocketAddr,
452}
453
/// Tracks PATH_CHALLENGE tokens for NAT traversal candidate validation
#[derive(Default)]
pub(crate) struct NatTraversalChallenges {
    /// Tracked challenges in insertion order, oldest first
    pending: Vec<NatTraversalChallenge>,
}

impl NatTraversalChallenges {
    /// Records `token` as the challenge for `remote`.
    ///
    /// An address that is already tracked only has its token replaced and keeps its position.
    /// When the tracker is full, the oldest entry is evicted to make room for the new one.
    pub(crate) fn push(&mut self, remote: SocketAddr, token: u64) {
        /// Arbitrary permissive limit to prevent abuse
        const MAX_NAT_CHALLENGES: usize = 10;

        // One entry per address: refresh the token of a known address in place
        if let Some(existing) = self.pending.iter_mut().find(|x| x.remote == remote) {
            existing.token = token;
            return;
        }

        if self.pending.len() >= MAX_NAT_CHALLENGES {
            // Evict the oldest entry. Overwriting slot 0 in place would make slot 0 hold the
            // newest entry after the first overflow, so every later overflow would throw away
            // the most recent challenge instead of the oldest one. The list is tiny, so the
            // shift done by `remove(0)` is negligible.
            self.pending.remove(0);
        }
        self.pending.push(NatTraversalChallenge { remote, token });
    }

    /// Whether no challenge is being tracked
    pub(crate) fn is_empty(&self) -> bool {
        self.pending.is_empty()
    }
}

/// A PATH_CHALLENGE destined for a NAT traversal candidate address
#[derive(Copy, Clone)]
struct NatTraversalChallenge {
    /// The address to send the PATH_CHALLENGE to
    remote: SocketAddr,
    /// The challenge token
    token: u64,
}
491
492/// Summary statistics of packets that have been sent on a particular path, but which have not yet
493/// been acked or deemed lost
494pub(super) struct InFlight {
495    /// Sum of the sizes of all sent packets considered "in flight" by congestion control
496    ///
497    /// The size does not include IP or UDP overhead. Packets only containing ACK frames do not
498    /// count towards this to ensure congestion control does not impede congestion feedback.
499    pub(super) bytes: u64,
500    /// Number of packets in flight containing frames other than ACK and PADDING
501    ///
502    /// This can be 0 even when bytes is not 0 because PADDING frames cause a packet to be
503    /// considered "in flight" by congestion control. However, if this is nonzero, bytes will always
504    /// also be nonzero.
505    pub(super) ack_eliciting: u64,
506}
507
508impl InFlight {
509    fn new() -> Self {
510        Self {
511            bytes: 0,
512            ack_eliciting: 0,
513        }
514    }
515
516    fn insert(&mut self, packet: &SentPacket) {
517        self.bytes += u64::from(packet.size);
518        self.ack_eliciting += u64::from(packet.ack_eliciting);
519    }
520
521    /// Update counters to account for a packet becoming acknowledged, lost, or abandoned
522    fn remove(&mut self, packet: &SentPacket) {
523        self.bytes -= u64::from(packet.size);
524        self.ack_eliciting -= u64::from(packet.ack_eliciting);
525    }
526}
527
528/// Information about addresses observed for a specific path
529#[derive(Debug, Clone, PartialEq, Eq)]
530pub(super) struct PathAddressInfo {
531    /// The most recently observed address for this path
532    pub(super) observed_address: Option<SocketAddr>,
533    /// When the address was last observed
534    pub(super) last_observed: Option<Instant>,
535    /// Number of times the address has been observed
536    pub(super) observation_count: u64,
537    /// Whether we've notified the application about this address
538    pub(super) notified: bool,
539}
540
541/// Rate limiter for OBSERVED_ADDRESS frames per path
542#[derive(Debug, Clone)]
543pub(super) struct PathObservationRateLimiter {
544    /// Tokens available for sending observations
545    pub(super) tokens: f64,
546    /// Maximum tokens (burst capacity)
547    pub(super) max_tokens: f64,
548    /// Rate of token replenishment (tokens per second)
549    pub(super) rate: f64,
550    /// Last time tokens were updated
551    pub(super) last_update: Instant,
552}
553
554impl PathObservationRateLimiter {
555    /// Create a new rate limiter with the given rate
556    pub(super) fn new(rate: u8, now: Instant) -> Self {
557        let rate_f64 = rate as f64;
558        Self {
559            tokens: rate_f64,
560            max_tokens: rate_f64,
561            rate: rate_f64,
562            last_update: now,
563        }
564    }
565
566    /// Update tokens based on elapsed time
567    pub(super) fn update_tokens(&mut self, now: Instant) {
568        let elapsed = now
569            .saturating_duration_since(self.last_update)
570            .as_secs_f64();
571        self.tokens = (self.tokens + elapsed * self.rate).min(self.max_tokens);
572        self.last_update = now;
573    }
574
575    /// Check if we can send an observation
576    pub(super) fn can_send(&mut self, now: Instant) -> bool {
577        self.update_tokens(now);
578        self.tokens >= 1.0
579    }
580
581    /// Consume a token for sending an observation
582    pub(super) fn consume_token(&mut self, now: Instant) {
583        self.update_tokens(now);
584        if self.tokens >= 1.0 {
585            self.tokens -= 1.0;
586        }
587    }
588
589    /// Update the rate
590    pub(super) fn set_rate(&mut self, rate: u8) {
591        let rate_f64 = rate as f64;
592        self.rate = rate_f64;
593        self.max_tokens = rate_f64;
594        // Don't change current tokens, just cap at new max
595        self.tokens = self.tokens.min(self.max_tokens);
596    }
597}
598
599impl PathAddressInfo {
600    pub(super) fn new() -> Self {
601        Self {
602            observed_address: None,
603            last_observed: None,
604            observation_count: 0,
605            notified: false,
606        }
607    }
608
609    /// Update with a newly observed address
610    pub(super) fn update_observed_address(&mut self, address: SocketAddr, now: Instant) {
611        if self.observed_address == Some(address) {
612            // Same address observed again - preserve notification status
613            self.observation_count += 1;
614        } else {
615            // New address observed
616            self.observed_address = Some(address);
617            self.observation_count = 1;
618            self.notified = false; // Reset notification flag for new address
619        }
620        self.last_observed = Some(now);
621    }
622
623    /// Check if the observed address has changed from the expected address
624    pub(super) fn has_address_changed(&self, expected: &SocketAddr) -> bool {
625        match self.observed_address {
626            Some(observed) => observed != *expected,
627            None => false,
628        }
629    }
630
631    /// Mark that we've notified the application about this address
632    pub(super) fn mark_notified(&mut self) {
633        self.notified = true;
634    }
635}
636
637#[cfg(test)]
638mod tests {
639    use super::*;
640    use std::net::{IpAddr, Ipv4Addr, Ipv6Addr};
641
642    #[test]
643    fn path_address_info_new() {
644        let info = PathAddressInfo::new();
645        assert_eq!(info.observed_address, None);
646        assert_eq!(info.last_observed, None);
647        assert_eq!(info.observation_count, 0);
648        assert!(!info.notified);
649    }
650
651    #[test]
652    fn path_address_info_update_new_address() {
653        let mut info = PathAddressInfo::new();
654        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)), 8080);
655        let now = Instant::now();
656
657        info.update_observed_address(addr, now);
658
659        assert_eq!(info.observed_address, Some(addr));
660        assert_eq!(info.last_observed, Some(now));
661        assert_eq!(info.observation_count, 1);
662        assert!(!info.notified);
663    }
664
665    #[test]
666    fn path_address_info_update_same_address() {
667        let mut info = PathAddressInfo::new();
668        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(10, 0, 0, 1)), 443);
669        let now1 = Instant::now();
670
671        info.update_observed_address(addr, now1);
672        assert_eq!(info.observation_count, 1);
673
674        let now2 = now1 + Duration::from_secs(1);
675        info.update_observed_address(addr, now2);
676
677        assert_eq!(info.observed_address, Some(addr));
678        assert_eq!(info.last_observed, Some(now2));
679        assert_eq!(info.observation_count, 2);
680    }
681
682    #[test]
683    fn path_address_info_update_different_address() {
684        let mut info = PathAddressInfo::new();
685        let addr1 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)), 8080);
686        let addr2 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(10, 0, 0, 1)), 443);
687        let now1 = Instant::now();
688
689        info.update_observed_address(addr1, now1);
690        info.mark_notified();
691        assert!(info.notified);
692
693        let now2 = now1 + Duration::from_secs(1);
694        info.update_observed_address(addr2, now2);
695
696        assert_eq!(info.observed_address, Some(addr2));
697        assert_eq!(info.last_observed, Some(now2));
698        assert_eq!(info.observation_count, 1);
699        assert!(!info.notified); // Reset when address changes
700    }
701
702    #[test]
703    fn path_address_info_has_address_changed() {
704        let mut info = PathAddressInfo::new();
705        let expected = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)), 8080);
706        let observed = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(10, 0, 0, 1)), 443);
707
708        // No observed address yet
709        assert!(!info.has_address_changed(&expected));
710
711        // Same as expected
712        info.update_observed_address(expected, Instant::now());
713        assert!(!info.has_address_changed(&expected));
714
715        // Different from expected
716        info.update_observed_address(observed, Instant::now());
717        assert!(info.has_address_changed(&expected));
718    }
719
720    #[test]
721    fn path_address_info_ipv6() {
722        let mut info = PathAddressInfo::new();
723        let addr = SocketAddr::new(
724            IpAddr::V6(Ipv6Addr::new(0x2001, 0xdb8, 0, 0, 0, 0, 0, 1)),
725            8080,
726        );
727        let now = Instant::now();
728
729        info.update_observed_address(addr, now);
730
731        assert_eq!(info.observed_address, Some(addr));
732        assert_eq!(info.observation_count, 1);
733    }
734
735    #[test]
736    fn path_address_info_notification_tracking() {
737        let mut info = PathAddressInfo::new();
738        assert!(!info.notified);
739
740        // First observe an address
741        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 80);
742        info.update_observed_address(addr, Instant::now());
743        assert!(!info.notified);
744
745        // Mark as notified
746        info.mark_notified();
747        assert!(info.notified);
748
749        // Notification flag persists when observing same address
750        info.update_observed_address(addr, Instant::now());
751        assert!(info.notified); // Still true for same address
752
753        // But resets on address change
754        let new_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 2)), 80);
755        info.update_observed_address(new_addr, Instant::now());
756        assert!(!info.notified);
757    }
758
759    // Tests for PathData with address discovery integration
760    #[test]
761    fn path_data_with_address_info() {
762        let remote = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)), 8080);
763        let config = TransportConfig::default();
764        let now = Instant::now();
765
766        let path = PathData::new(remote, false, None, now, &config);
767
768        // Should have address_info field
769        assert!(path.address_info.observed_address.is_none());
770        assert!(path.address_info.last_observed.is_none());
771        assert_eq!(path.address_info.observation_count, 0);
772        assert!(!path.address_info.notified);
773    }
774
775    #[test]
776    fn path_data_update_observed_address() {
777        let remote = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)), 8080);
778        let config = TransportConfig::default();
779        let now = Instant::now();
780
781        let mut path = PathData::new(remote, false, None, now, &config);
782
783        // Update observed address
784        let observed = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(10, 0, 0, 1)), 443);
785        path.update_observed_address(observed, now);
786
787        assert_eq!(path.address_info.observed_address, Some(observed));
788        assert_eq!(path.address_info.last_observed, Some(now));
789        assert_eq!(path.address_info.observation_count, 1);
790        assert!(!path.address_info.notified);
791    }
792
793    #[test]
794    fn path_data_has_address_changed() {
795        let remote = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)), 8080);
796        let config = TransportConfig::default();
797        let now = Instant::now();
798
799        let mut path = PathData::new(remote, false, None, now, &config);
800
801        // No change when no observed address
802        assert!(!path.has_address_changed());
803
804        // Update with same as remote
805        path.update_observed_address(remote, now);
806        assert!(!path.has_address_changed());
807
808        // Update with different address
809        let different = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(10, 0, 0, 1)), 443);
810        path.update_observed_address(different, now);
811        assert!(path.has_address_changed());
812    }
813
814    #[test]
815    fn path_data_mark_address_notified() {
816        let remote = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)), 8080);
817        let config = TransportConfig::default();
818        let now = Instant::now();
819
820        let mut path = PathData::new(remote, false, None, now, &config);
821
822        // Update and mark as notified
823        let observed = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(10, 0, 0, 1)), 443);
824        path.update_observed_address(observed, now);
825        assert!(!path.address_info.notified);
826
827        path.mark_address_notified();
828        assert!(path.address_info.notified);
829    }
830
831    #[test]
832    fn path_data_from_previous_preserves_address_info() {
833        let remote1 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)), 8080);
834        let remote2 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(10, 0, 0, 1)), 443);
835        let config = TransportConfig::default();
836        let now = Instant::now();
837
838        let mut path1 = PathData::new(remote1, false, None, now, &config);
839
840        // Set up address info
841        let observed = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(1, 2, 3, 4)), 5678);
842        path1.update_observed_address(observed, now);
843        path1.mark_address_notified();
844
845        // Create new path from previous
846        let path2 = PathData::from_previous(remote2, &path1, now);
847
848        // Address info should be reset for new path
849        assert!(path2.address_info.observed_address.is_none());
850        assert!(path2.address_info.last_observed.is_none());
851        assert_eq!(path2.address_info.observation_count, 0);
852        assert!(!path2.address_info.notified);
853    }
854
855    #[test]
856    fn path_data_reset_clears_address_info() {
857        let remote = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)), 8080);
858        let config = TransportConfig::default();
859        let now = Instant::now();
860
861        let mut path = PathData::new(remote, false, None, now, &config);
862
863        // Set up address info
864        let observed = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(10, 0, 0, 1)), 443);
865        path.update_observed_address(observed, now);
866        path.mark_address_notified();
867
868        // Reset should clear address info
869        path.reset(now, &config);
870
871        assert!(path.address_info.observed_address.is_none());
872        assert!(path.address_info.last_observed.is_none());
873        assert_eq!(path.address_info.observation_count, 0);
874        assert!(!path.address_info.notified);
875    }
876
877    // Tests for per-path rate limiting
878    #[test]
879    fn path_data_with_rate_limiter() {
880        let remote = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)), 8080);
881        let config = TransportConfig::default();
882        let now = Instant::now();
883
884        let path = PathData::new(remote, false, None, now, &config);
885
886        // Should have a rate limiter
887        assert!(path.observation_rate_limiter.tokens > 0.0);
888        assert_eq!(path.observation_rate_limiter.rate, 10.0); // Default rate
889        assert_eq!(path.observation_rate_limiter.max_tokens, 10.0);
890        assert_eq!(path.observation_rate_limiter.last_update, now);
891    }
892
893    #[test]
894    fn path_data_can_send_observation() {
895        let remote = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)), 8080);
896        let config = TransportConfig::default();
897        let now = Instant::now();
898
899        let mut path = PathData::new(remote, false, None, now, &config);
900
901        // Should be able to send initially (has tokens)
902        assert!(path.can_send_observation(now));
903
904        // Consume a token
905        path.consume_observation_token(now);
906
907        // Should still have tokens available
908        assert!(path.can_send_observation(now));
909
910        // Consume all tokens
911        for _ in 0..9 {
912            path.consume_observation_token(now);
913        }
914
915        // Should be out of tokens
916        assert!(!path.can_send_observation(now));
917
918        // Wait for token replenishment
919        let later = now + Duration::from_millis(200); // 0.2 seconds = 2 tokens
920        assert!(path.can_send_observation(later));
921    }
922
923    #[test]
924    fn path_data_rate_limiter_replenishment() {
925        let remote = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)), 8080);
926        let config = TransportConfig::default();
927        let now = Instant::now();
928
929        let mut path = PathData::new(remote, false, None, now, &config);
930
931        // Consume all tokens
932        for _ in 0..10 {
933            path.consume_observation_token(now);
934        }
935        assert_eq!(path.observation_rate_limiter.tokens, 0.0);
936
937        // Check replenishment after 1 second
938        let later = now + Duration::from_secs(1);
939        path.update_observation_tokens(later);
940
941        // Should have replenished to max (rate = 10/sec)
942        assert_eq!(path.observation_rate_limiter.tokens, 10.0);
943    }
944
945    #[test]
946    fn path_data_rate_limiter_custom_rate() {
947        let remote = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)), 8080);
948        let config = TransportConfig::default();
949        let now = Instant::now();
950
951        let mut path = PathData::new(remote, false, None, now, &config);
952
953        // Update with custom rate
954        path.set_observation_rate(5); // 5 per second
955        assert_eq!(path.observation_rate_limiter.rate, 5.0);
956        assert_eq!(path.observation_rate_limiter.max_tokens, 5.0);
957
958        // Consume all tokens
959        for _ in 0..5 {
960            path.consume_observation_token(now);
961        }
962        assert!(!path.can_send_observation(now));
963
964        // Check replenishment with new rate
965        let later = now + Duration::from_millis(400); // 0.4 seconds = 2 tokens at rate 5
966        path.update_observation_tokens(later);
967        assert_eq!(path.observation_rate_limiter.tokens, 2.0);
968    }
969
970    #[test]
971    fn path_data_rate_limiter_from_previous() {
972        let remote1 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)), 8080);
973        let remote2 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(10, 0, 0, 1)), 443);
974        let config = TransportConfig::default();
975        let now = Instant::now();
976
977        let mut path1 = PathData::new(remote1, false, None, now, &config);
978
979        // Set custom rate and consume some tokens
980        path1.set_observation_rate(20);
981        for _ in 0..5 {
982            path1.consume_observation_token(now);
983        }
984
985        // Create new path from previous
986        let path2 = PathData::from_previous(remote2, &path1, now);
987
988        // New path should have fresh rate limiter with same rate
989        assert_eq!(path2.observation_rate_limiter.rate, 20.0);
990        assert_eq!(path2.observation_rate_limiter.max_tokens, 20.0);
991        assert_eq!(path2.observation_rate_limiter.tokens, 20.0); // Full tokens
992    }
993
994    #[test]
995    fn path_data_reset_preserves_rate() {
996        let remote = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)), 8080);
997        let config = TransportConfig::default();
998        let now = Instant::now();
999
1000        let mut path = PathData::new(remote, false, None, now, &config);
1001
1002        // Set custom rate
1003        path.set_observation_rate(15);
1004
1005        // Consume some tokens
1006        for _ in 0..3 {
1007            path.consume_observation_token(now);
1008        }
1009
1010        // Reset the path
1011        path.reset(now, &config);
1012
1013        // Rate should be preserved, tokens should be reset
1014        assert_eq!(path.observation_rate_limiter.rate, 15.0);
1015        assert_eq!(path.observation_rate_limiter.tokens, 15.0); // Full tokens after reset
1016    }
1017}