ant_quic/connection/
paths.rs

1// Copyright 2024 Saorsa Labs Ltd.
2//
3// This Saorsa Network Software is licensed under the General Public License (GPL), version 3.
4// Please see the file LICENSE-GPL, or visit <http://www.gnu.org/licenses/> for the full text.
5//
6// Full details available at https://saorsalabs.com/licenses
7
8use std::{cmp, net::SocketAddr};
9
10use tracing::trace;
11
12use super::{
13    mtud::MtuDiscovery,
14    pacing::Pacer,
15    spaces::{PacketSpace, SentPacket},
16};
17use crate::{Duration, Instant, TIMER_GRANULARITY, TransportConfig, congestion, packet::SpaceId};
18
19#[cfg(feature = "__qlog")]
20use qlog::events::quic::MetricsUpdated;
21
22/// Description of a particular network path
23pub(super) struct PathData {
24    pub(super) remote: SocketAddr,
25    pub(super) rtt: RttEstimator,
26    /// Whether we're enabling ECN on outgoing packets
27    pub(super) sending_ecn: bool,
28    /// Congestion controller state
29    pub(super) congestion: Box<dyn congestion::Controller>,
30    /// Pacing state
31    pub(super) pacing: Pacer,
32    pub(super) challenge: Option<u64>,
33    pub(super) challenge_pending: bool,
34    /// Whether we're certain the peer can both send and receive on this address
35    ///
36    /// Initially equal to `use_stateless_retry` for servers, and becomes false again on every
37    /// migration. Always true for clients.
38    pub(super) validated: bool,
39    /// Total size of all UDP datagrams sent on this path
40    pub(super) total_sent: u64,
41    /// Total size of all UDP datagrams received on this path
42    pub(super) total_recvd: u64,
43    /// The state of the MTU discovery process
44    pub(super) mtud: MtuDiscovery,
45    /// Packet number of the first packet sent after an RTT sample was collected on this path
46    ///
47    /// Used in persistent congestion determination.
48    pub(super) first_packet_after_rtt_sample: Option<(SpaceId, u64)>,
49    pub(super) in_flight: InFlight,
50    /// Number of the first packet sent on this path
51    ///
52    /// Used to determine whether a packet was sent on an earlier path. Insufficient to determine if
53    /// a packet was sent on a later path.
54    first_packet: Option<u64>,
55
56    /// Snapshot of the qlog recovery metrics
57    #[cfg(feature = "__qlog")]
58    congestion_metrics: CongestionMetrics,
59
60    /// Address discovery information for this path
61    pub(super) address_info: PathAddressInfo,
62    /// Rate limiter for OBSERVED_ADDRESS frames on this path
63    pub(super) observation_rate_limiter: PathObservationRateLimiter,
64}
65
66impl PathData {
67    pub(super) fn new(
68        remote: SocketAddr,
69        allow_mtud: bool,
70        peer_max_udp_payload_size: Option<u16>,
71        now: Instant,
72        config: &TransportConfig,
73    ) -> Self {
74        let congestion = config.congestion_controller_factory.new_controller(
75            config.get_initial_mtu() as u64,
76            16 * 1024 * 1024,
77            now,
78        );
79        Self {
80            remote,
81            rtt: RttEstimator::new(config.initial_rtt),
82            sending_ecn: true,
83            pacing: Pacer::new(
84                config.initial_rtt,
85                congestion.initial_window(),
86                config.get_initial_mtu(),
87                now,
88            ),
89            congestion,
90            challenge: None,
91            challenge_pending: false,
92            validated: false,
93            total_sent: 0,
94            total_recvd: 0,
95            mtud: config
96                .mtu_discovery_config
97                .as_ref()
98                .filter(|_| allow_mtud)
99                .map_or(
100                    MtuDiscovery::disabled(config.get_initial_mtu(), config.min_mtu),
101                    |mtud_config| {
102                        MtuDiscovery::new(
103                            config.get_initial_mtu(),
104                            config.min_mtu,
105                            peer_max_udp_payload_size,
106                            mtud_config.clone(),
107                        )
108                    },
109                ),
110            first_packet_after_rtt_sample: None,
111            in_flight: InFlight::new(),
112            first_packet: None,
113            #[cfg(feature = "__qlog")]
114            congestion_metrics: CongestionMetrics::default(),
115            address_info: PathAddressInfo::new(),
116            observation_rate_limiter: PathObservationRateLimiter::new(10, now), // Default rate of 10
117        }
118    }
119
120    pub(super) fn from_previous(remote: SocketAddr, prev: &Self, now: Instant) -> Self {
121        let congestion = prev.congestion.clone_box();
122        let smoothed_rtt = prev.rtt.get();
123        Self {
124            remote,
125            rtt: prev.rtt,
126            pacing: Pacer::new(smoothed_rtt, congestion.window(), prev.current_mtu(), now),
127            sending_ecn: true,
128            congestion,
129            challenge: None,
130            challenge_pending: false,
131            validated: false,
132            total_sent: 0,
133            total_recvd: 0,
134            mtud: prev.mtud.clone(),
135            first_packet_after_rtt_sample: prev.first_packet_after_rtt_sample,
136            in_flight: InFlight::new(),
137            first_packet: None,
138            #[cfg(feature = "__qlog")]
139            congestion_metrics: prev.congestion_metrics.clone(),
140            address_info: PathAddressInfo::new(), // Reset for new path
141            observation_rate_limiter: PathObservationRateLimiter::new(
142                prev.observation_rate_limiter.rate as u8,
143                now,
144            ), // Fresh limiter with same rate
145        }
146    }
147
148    /// Resets RTT, congestion control and MTU states.
149    ///
150    /// This is useful when it is known the underlying path has changed.
151    pub(super) fn reset(&mut self, now: Instant, config: &TransportConfig) {
152        self.rtt = RttEstimator::new(config.initial_rtt);
153        self.congestion = config.congestion_controller_factory.new_controller(
154            config.get_initial_mtu() as u64,
155            16 * 1024 * 1024,
156            now,
157        );
158        self.mtud.reset(config.get_initial_mtu(), config.min_mtu);
159        self.address_info = PathAddressInfo::new(); // Reset address info
160        // Reset tokens but preserve rate
161        let rate = self.observation_rate_limiter.rate as u8;
162        self.observation_rate_limiter = PathObservationRateLimiter::new(rate, now);
163    }
164
165    /// Update the observed address for this path
166    pub(super) fn update_observed_address(&mut self, address: SocketAddr, now: Instant) {
167        self.address_info.update_observed_address(address, now);
168    }
169
170    /// Check if the observed address has changed from the expected remote address
171    #[allow(dead_code)]
172    pub(super) fn has_address_changed(&self) -> bool {
173        self.address_info.has_address_changed(&self.remote)
174    }
175
176    /// Mark that we've notified the application about the current address
177    #[allow(dead_code)]
178    pub(super) fn mark_address_notified(&mut self) {
179        self.address_info.mark_notified();
180    }
181
182    /// Check if we can send an observation on this path
183    #[allow(dead_code)]
184    pub(super) fn can_send_observation(&mut self, now: Instant) -> bool {
185        self.observation_rate_limiter.can_send(now)
186    }
187
188    /// Consume a token for sending an observation
189    #[allow(dead_code)]
190    pub(super) fn consume_observation_token(&mut self, now: Instant) {
191        self.observation_rate_limiter.consume_token(now)
192    }
193
194    /// Update observation tokens based on elapsed time
195    #[allow(dead_code)]
196    pub(super) fn update_observation_tokens(&mut self, now: Instant) {
197        self.observation_rate_limiter.update_tokens(now)
198    }
199
200    /// Set the observation rate for this path
201    pub(super) fn set_observation_rate(&mut self, rate: u8) {
202        self.observation_rate_limiter.set_rate(rate)
203    }
204
205    /// Indicates whether we're a server that hasn't validated the peer's address and hasn't
206    /// received enough data from the peer to permit sending `bytes_to_send` additional bytes
207    pub(super) fn anti_amplification_blocked(&self, bytes_to_send: u64) -> bool {
208        !self.validated && self.total_recvd * 3 < self.total_sent + bytes_to_send
209    }
210
211    /// Returns the path's current MTU
212    pub(super) fn current_mtu(&self) -> u16 {
213        self.mtud.current_mtu()
214    }
215
216    /// Account for transmission of `packet` with number `pn` in `space`
217    pub(super) fn sent(&mut self, pn: u64, packet: SentPacket, space: &mut PacketSpace) {
218        self.in_flight.insert(&packet);
219        if self.first_packet.is_none() {
220            self.first_packet = Some(pn);
221        }
222        self.in_flight.bytes -= space.sent(pn, packet);
223    }
224
225    /// Remove `packet` with number `pn` from this path's congestion control counters, or return
226    /// `false` if `pn` was sent before this path was established.
227    pub(super) fn remove_in_flight(&mut self, pn: u64, packet: &SentPacket) -> bool {
228        if self.first_packet.is_none_or(|first| first > pn) {
229            return false;
230        }
231        self.in_flight.remove(packet);
232        true
233    }
234
235    #[cfg(feature = "__qlog")]
236    #[allow(dead_code)]
237    pub(super) fn qlog_congestion_metrics(&mut self, pto_count: u32) -> Option<MetricsUpdated> {
238        let controller_metrics = self.congestion.metrics();
239
240        let metrics = CongestionMetrics {
241            min_rtt: Some(self.rtt.min),
242            smoothed_rtt: Some(self.rtt.get()),
243            latest_rtt: Some(self.rtt.latest),
244            rtt_variance: Some(self.rtt.var),
245            pto_count: Some(pto_count),
246            bytes_in_flight: Some(self.in_flight.bytes),
247            packets_in_flight: Some(self.in_flight.ack_eliciting),
248
249            congestion_window: Some(controller_metrics.congestion_window),
250            ssthresh: controller_metrics.ssthresh,
251            pacing_rate: controller_metrics.pacing_rate,
252        };
253
254        let event = metrics.to_qlog_event(&self.congestion_metrics);
255        self.congestion_metrics = metrics;
256        event
257    }
258}
259
/// Recovery metrics in the shape of qlog's [`recovery_metrics_updated`] event.
///
/// [`recovery_metrics_updated`]: https://datatracker.ietf.org/doc/html/draft-ietf-quic-qlog-quic-events.html#name-recovery_metrics_updated
#[cfg(feature = "__qlog")]
#[derive(Default, Clone, PartialEq)]
#[non_exhaustive]
struct CongestionMetrics {
    pub min_rtt: Option<Duration>,
    pub smoothed_rtt: Option<Duration>,
    pub latest_rtt: Option<Duration>,
    pub rtt_variance: Option<Duration>,
    pub pto_count: Option<u32>,
    pub bytes_in_flight: Option<u64>,
    pub packets_in_flight: Option<u64>,
    pub congestion_window: Option<u64>,
    pub ssthresh: Option<u64>,
    pub pacing_rate: Option<u64>,
}

#[cfg(feature = "__qlog")]
impl CongestionMetrics {
    /// Builds a copy of `self` in which every field equal to its counterpart in `previous` is
    /// cleared to `None`, so only updated values remain.
    #[allow(dead_code)]
    fn retain_updated(&self, previous: &Self) -> Self {
        /// Keeps `current` only when it differs from `earlier`.
        fn changed<T>(current: Option<T>, earlier: Option<T>) -> Option<T>
        where
            T: Copy + PartialEq,
        {
            if earlier == current { None } else { current }
        }

        Self {
            min_rtt: changed(self.min_rtt, previous.min_rtt),
            smoothed_rtt: changed(self.smoothed_rtt, previous.smoothed_rtt),
            latest_rtt: changed(self.latest_rtt, previous.latest_rtt),
            rtt_variance: changed(self.rtt_variance, previous.rtt_variance),
            pto_count: changed(self.pto_count, previous.pto_count),
            bytes_in_flight: changed(self.bytes_in_flight, previous.bytes_in_flight),
            packets_in_flight: changed(self.packets_in_flight, previous.packets_in_flight),
            congestion_window: changed(self.congestion_window, previous.congestion_window),
            ssthresh: changed(self.ssthresh, previous.ssthresh),
            pacing_rate: changed(self.pacing_rate, previous.pacing_rate),
        }
    }

    /// Produces a `MetricsUpdated` event carrying only the values that differ from `previous`.
    ///
    /// Returns `None` when no value was retained. Durations are reported as fractional seconds
    /// and the PTO count saturates at `u16::MAX`.
    #[allow(dead_code)]
    fn to_qlog_event(&self, previous: &Self) -> Option<MetricsUpdated> {
        let delta = self.retain_updated(previous);
        if delta == Self::default() {
            return None;
        }

        let as_seconds = |rtt: Duration| rtt.as_secs_f32();
        let event = MetricsUpdated {
            min_rtt: delta.min_rtt.map(as_seconds),
            smoothed_rtt: delta.smoothed_rtt.map(as_seconds),
            latest_rtt: delta.latest_rtt.map(as_seconds),
            rtt_variance: delta.rtt_variance.map(as_seconds),
            pto_count: delta
                .pto_count
                .map(|count| u16::try_from(count).unwrap_or(u16::MAX)),
            bytes_in_flight: delta.bytes_in_flight,
            packets_in_flight: delta.packets_in_flight,
            congestion_window: delta.congestion_window,
            ssthresh: delta.ssthresh,
            pacing_rate: delta.pacing_rate,
        };
        Some(event)
    }
}
333
334/// RTT estimation for a particular network path
335#[derive(Copy, Clone)]
336pub struct RttEstimator {
337    /// The most recent RTT measurement made when receiving an ack for a previously unacked packet
338    latest: Duration,
339    /// The smoothed RTT of the connection, computed as described in RFC6298
340    smoothed: Option<Duration>,
341    /// The RTT variance, computed as described in RFC6298
342    var: Duration,
343    /// The minimum RTT seen in the connection, ignoring ack delay.
344    min: Duration,
345}
346
347impl RttEstimator {
348    fn new(initial_rtt: Duration) -> Self {
349        Self {
350            latest: initial_rtt,
351            smoothed: None,
352            var: initial_rtt / 2,
353            min: initial_rtt,
354        }
355    }
356
357    /// The current best RTT estimation.
358    pub fn get(&self) -> Duration {
359        self.smoothed.unwrap_or(self.latest)
360    }
361
362    /// Conservative estimate of RTT
363    ///
364    /// Takes the maximum of smoothed and latest RTT, as recommended
365    /// in 6.1.2 of the recovery spec (draft 29).
366    pub fn conservative(&self) -> Duration {
367        self.get().max(self.latest)
368    }
369
370    /// Minimum RTT registered so far for this estimator.
371    pub fn min(&self) -> Duration {
372        self.min
373    }
374
375    // PTO computed as described in RFC9002#6.2.1
376    pub(crate) fn pto_base(&self) -> Duration {
377        self.get() + cmp::max(4 * self.var, TIMER_GRANULARITY)
378    }
379
380    pub(crate) fn update(&mut self, ack_delay: Duration, rtt: Duration) {
381        self.latest = rtt;
382        // min_rtt ignores ack delay.
383        self.min = cmp::min(self.min, self.latest);
384        // Based on RFC6298.
385        if let Some(smoothed) = self.smoothed {
386            let adjusted_rtt = if self.min + ack_delay <= self.latest {
387                self.latest - ack_delay
388            } else {
389                self.latest
390            };
391            let var_sample = smoothed.abs_diff(adjusted_rtt);
392            self.var = (3 * self.var + var_sample) / 4;
393            self.smoothed = Some((7 * smoothed + adjusted_rtt) / 8);
394        } else {
395            self.smoothed = Some(self.latest);
396            self.var = self.latest / 2;
397            self.min = self.latest;
398        }
399    }
400}
401
402#[derive(Default)]
403pub(crate) struct PathResponses {
404    pending: Vec<PathResponse>,
405}
406
407impl PathResponses {
408    pub(crate) fn push(&mut self, packet: u64, token: u64, remote: SocketAddr) {
409        /// Arbitrary permissive limit to prevent abuse
410        const MAX_PATH_RESPONSES: usize = 16;
411        let response = PathResponse {
412            packet,
413            token,
414            remote,
415        };
416        let existing = self.pending.iter_mut().find(|x| x.remote == remote);
417        if let Some(existing) = existing {
418            // Update a queued response
419            if existing.packet <= packet {
420                *existing = response;
421            }
422            return;
423        }
424        if self.pending.len() < MAX_PATH_RESPONSES {
425            self.pending.push(response);
426        } else {
427            // We don't expect to ever hit this with well-behaved peers, so we don't bother dropping
428            // older challenges.
429            trace!("ignoring excessive PATH_CHALLENGE");
430        }
431    }
432
433    pub(crate) fn pop_off_path(&mut self, remote: SocketAddr) -> Option<(u64, SocketAddr)> {
434        let response = *self.pending.last()?;
435        if response.remote == remote {
436            // We don't bother searching further because we expect that the on-path response will
437            // get drained in the immediate future by a call to `pop_on_path`
438            return None;
439        }
440        self.pending.pop();
441        Some((response.token, response.remote))
442    }
443
444    pub(crate) fn pop_on_path(&mut self, remote: SocketAddr) -> Option<u64> {
445        let response = *self.pending.last()?;
446        if response.remote != remote {
447            // We don't bother searching further because we expect that the off-path response will
448            // get drained in the immediate future by a call to `pop_off_path`
449            return None;
450        }
451        self.pending.pop();
452        Some(response.token)
453    }
454
455    pub(crate) fn is_empty(&self) -> bool {
456        self.pending.is_empty()
457    }
458}
459
460#[derive(Copy, Clone)]
461struct PathResponse {
462    /// The packet number the corresponding PATH_CHALLENGE was received in
463    packet: u64,
464    token: u64,
465    /// The address the corresponding PATH_CHALLENGE was received from
466    remote: SocketAddr,
467}
468
/// Tracks PATH_CHALLENGE tokens for NAT traversal candidate validation
///
/// At most one challenge is kept per remote address and the total number of challenges is
/// bounded; once the bound is reached, the oldest entry makes room for a new one.
#[derive(Default)]
pub(crate) struct NatTraversalChallenges {
    /// Outstanding challenges, ordered from oldest (front) to newest (back)
    pending: Vec<NatTraversalChallenge>,
}

impl NatTraversalChallenges {
    /// Registers `token` as the challenge for `remote`.
    ///
    /// If a challenge for `remote` already exists, only its token is replaced and its position
    /// in the queue is kept. Otherwise the challenge is appended, evicting the oldest entry
    /// first when the queue is full.
    pub(crate) fn push(&mut self, remote: SocketAddr, token: u64) {
        /// Arbitrary permissive limit to prevent abuse
        const MAX_NAT_CHALLENGES: usize = 10;

        // Check if we already have a challenge for this address
        if let Some(existing) = self.pending.iter_mut().find(|x| x.remote == remote) {
            existing.token = token;
            return;
        }

        if self.pending.len() >= MAX_NAT_CHALLENGES {
            // Evict the oldest challenge. Removing from the front (rather than overwriting slot
            // 0 in place) keeps the queue ordered by age, so repeated overflows keep evicting
            // the oldest entry instead of the one that was inserted last. The queue is tiny, so
            // the shift is negligible.
            self.pending.remove(0);
        }
        self.pending.push(NatTraversalChallenge { remote, token });
    }

    /// Whether no challenge is currently tracked
    pub(crate) fn is_empty(&self) -> bool {
        self.pending.is_empty()
    }
}

/// A single outstanding NAT traversal challenge
#[derive(Copy, Clone)]
struct NatTraversalChallenge {
    /// The address to send the PATH_CHALLENGE to
    remote: SocketAddr,
    /// The challenge token
    token: u64,
}
506
507/// Summary statistics of packets that have been sent on a particular path, but which have not yet
508/// been acked or deemed lost
509pub(super) struct InFlight {
510    /// Sum of the sizes of all sent packets considered "in flight" by congestion control
511    ///
512    /// The size does not include IP or UDP overhead. Packets only containing ACK frames do not
513    /// count towards this to ensure congestion control does not impede congestion feedback.
514    pub(super) bytes: u64,
515    /// Number of packets in flight containing frames other than ACK and PADDING
516    ///
517    /// This can be 0 even when bytes is not 0 because PADDING frames cause a packet to be
518    /// considered "in flight" by congestion control. However, if this is nonzero, bytes will always
519    /// also be nonzero.
520    pub(super) ack_eliciting: u64,
521}
522
523impl InFlight {
524    fn new() -> Self {
525        Self {
526            bytes: 0,
527            ack_eliciting: 0,
528        }
529    }
530
531    fn insert(&mut self, packet: &SentPacket) {
532        self.bytes += u64::from(packet.size);
533        self.ack_eliciting += u64::from(packet.ack_eliciting);
534    }
535
536    /// Update counters to account for a packet becoming acknowledged, lost, or abandoned
537    fn remove(&mut self, packet: &SentPacket) {
538        self.bytes -= u64::from(packet.size);
539        self.ack_eliciting -= u64::from(packet.ack_eliciting);
540    }
541}
542
543/// Information about addresses observed for a specific path
544#[derive(Debug, Clone, PartialEq, Eq)]
545pub(super) struct PathAddressInfo {
546    /// The most recently observed address for this path
547    pub(super) observed_address: Option<SocketAddr>,
548    /// When the address was last observed
549    pub(super) last_observed: Option<Instant>,
550    /// Number of times the address has been observed
551    pub(super) observation_count: u64,
552    /// Whether we've notified the application about this address
553    pub(super) notified: bool,
554}
555
556/// Rate limiter for OBSERVED_ADDRESS frames per path
557#[derive(Debug, Clone)]
558pub(super) struct PathObservationRateLimiter {
559    /// Tokens available for sending observations
560    pub(super) tokens: f64,
561    /// Maximum tokens (burst capacity)
562    pub(super) max_tokens: f64,
563    /// Rate of token replenishment (tokens per second)
564    pub(super) rate: f64,
565    /// Last time tokens were updated
566    pub(super) last_update: Instant,
567}
568
569impl PathObservationRateLimiter {
570    /// Create a new rate limiter with the given rate
571    pub(super) fn new(rate: u8, now: Instant) -> Self {
572        let rate_f64 = rate as f64;
573        Self {
574            tokens: rate_f64,
575            max_tokens: rate_f64,
576            rate: rate_f64,
577            last_update: now,
578        }
579    }
580
581    /// Update tokens based on elapsed time
582    pub(super) fn update_tokens(&mut self, now: Instant) {
583        let elapsed = now
584            .saturating_duration_since(self.last_update)
585            .as_secs_f64();
586        self.tokens = (self.tokens + elapsed * self.rate).min(self.max_tokens);
587        self.last_update = now;
588    }
589
590    /// Check if we can send an observation
591    pub(super) fn can_send(&mut self, now: Instant) -> bool {
592        self.update_tokens(now);
593        self.tokens >= 1.0
594    }
595
596    /// Consume a token for sending an observation
597    pub(super) fn consume_token(&mut self, now: Instant) {
598        self.update_tokens(now);
599        if self.tokens >= 1.0 {
600            self.tokens -= 1.0;
601        }
602    }
603
604    /// Update the rate
605    pub(super) fn set_rate(&mut self, rate: u8) {
606        let rate_f64 = rate as f64;
607        self.rate = rate_f64;
608        self.max_tokens = rate_f64;
609        // Don't change current tokens, just cap at new max
610        self.tokens = self.tokens.min(self.max_tokens);
611    }
612}
613
614impl PathAddressInfo {
615    pub(super) fn new() -> Self {
616        Self {
617            observed_address: None,
618            last_observed: None,
619            observation_count: 0,
620            notified: false,
621        }
622    }
623
624    /// Update with a newly observed address
625    pub(super) fn update_observed_address(&mut self, address: SocketAddr, now: Instant) {
626        if self.observed_address == Some(address) {
627            // Same address observed again - preserve notification status
628            self.observation_count += 1;
629        } else {
630            // New address observed
631            self.observed_address = Some(address);
632            self.observation_count = 1;
633            self.notified = false; // Reset notification flag for new address
634        }
635        self.last_observed = Some(now);
636    }
637
638    /// Check if the observed address has changed from the expected address
639    pub(super) fn has_address_changed(&self, expected: &SocketAddr) -> bool {
640        match self.observed_address {
641            Some(observed) => observed != *expected,
642            None => false,
643        }
644    }
645
646    /// Mark that we've notified the application about this address
647    pub(super) fn mark_notified(&mut self) {
648        self.notified = true;
649    }
650}
651
652#[cfg(test)]
653mod tests {
654    use super::*;
655    use std::net::{IpAddr, Ipv4Addr, Ipv6Addr};
656
657    #[test]
658    fn path_address_info_new() {
659        let info = PathAddressInfo::new();
660        assert_eq!(info.observed_address, None);
661        assert_eq!(info.last_observed, None);
662        assert_eq!(info.observation_count, 0);
663        assert!(!info.notified);
664    }
665
666    #[test]
667    fn path_address_info_update_new_address() {
668        let mut info = PathAddressInfo::new();
669        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)), 8080);
670        let now = Instant::now();
671
672        info.update_observed_address(addr, now);
673
674        assert_eq!(info.observed_address, Some(addr));
675        assert_eq!(info.last_observed, Some(now));
676        assert_eq!(info.observation_count, 1);
677        assert!(!info.notified);
678    }
679
680    #[test]
681    fn path_address_info_update_same_address() {
682        let mut info = PathAddressInfo::new();
683        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(10, 0, 0, 1)), 443);
684        let now1 = Instant::now();
685
686        info.update_observed_address(addr, now1);
687        assert_eq!(info.observation_count, 1);
688
689        let now2 = now1 + Duration::from_secs(1);
690        info.update_observed_address(addr, now2);
691
692        assert_eq!(info.observed_address, Some(addr));
693        assert_eq!(info.last_observed, Some(now2));
694        assert_eq!(info.observation_count, 2);
695    }
696
697    #[test]
698    fn path_address_info_update_different_address() {
699        let mut info = PathAddressInfo::new();
700        let addr1 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)), 8080);
701        let addr2 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(10, 0, 0, 1)), 443);
702        let now1 = Instant::now();
703
704        info.update_observed_address(addr1, now1);
705        info.mark_notified();
706        assert!(info.notified);
707
708        let now2 = now1 + Duration::from_secs(1);
709        info.update_observed_address(addr2, now2);
710
711        assert_eq!(info.observed_address, Some(addr2));
712        assert_eq!(info.last_observed, Some(now2));
713        assert_eq!(info.observation_count, 1);
714        assert!(!info.notified); // Reset when address changes
715    }
716
717    #[test]
718    fn path_address_info_has_address_changed() {
719        let mut info = PathAddressInfo::new();
720        let expected = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)), 8080);
721        let observed = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(10, 0, 0, 1)), 443);
722
723        // No observed address yet
724        assert!(!info.has_address_changed(&expected));
725
726        // Same as expected
727        info.update_observed_address(expected, Instant::now());
728        assert!(!info.has_address_changed(&expected));
729
730        // Different from expected
731        info.update_observed_address(observed, Instant::now());
732        assert!(info.has_address_changed(&expected));
733    }
734
735    #[test]
736    fn path_address_info_ipv6() {
737        let mut info = PathAddressInfo::new();
738        let addr = SocketAddr::new(
739            IpAddr::V6(Ipv6Addr::new(0x2001, 0xdb8, 0, 0, 0, 0, 0, 1)),
740            8080,
741        );
742        let now = Instant::now();
743
744        info.update_observed_address(addr, now);
745
746        assert_eq!(info.observed_address, Some(addr));
747        assert_eq!(info.observation_count, 1);
748    }
749
750    #[test]
751    fn path_address_info_notification_tracking() {
752        let mut info = PathAddressInfo::new();
753        assert!(!info.notified);
754
755        // First observe an address
756        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 80);
757        info.update_observed_address(addr, Instant::now());
758        assert!(!info.notified);
759
760        // Mark as notified
761        info.mark_notified();
762        assert!(info.notified);
763
764        // Notification flag persists when observing same address
765        info.update_observed_address(addr, Instant::now());
766        assert!(info.notified); // Still true for same address
767
768        // But resets on address change
769        let new_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 2)), 80);
770        info.update_observed_address(new_addr, Instant::now());
771        assert!(!info.notified);
772    }
773
774    // Tests for PathData with address discovery integration
775    #[test]
776    fn path_data_with_address_info() {
777        let remote = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)), 8080);
778        let config = TransportConfig::default();
779        let now = Instant::now();
780
781        let path = PathData::new(remote, false, None, now, &config);
782
783        // Should have address_info field
784        assert!(path.address_info.observed_address.is_none());
785        assert!(path.address_info.last_observed.is_none());
786        assert_eq!(path.address_info.observation_count, 0);
787        assert!(!path.address_info.notified);
788    }
789
790    #[test]
791    fn path_data_update_observed_address() {
792        let remote = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)), 8080);
793        let config = TransportConfig::default();
794        let now = Instant::now();
795
796        let mut path = PathData::new(remote, false, None, now, &config);
797
798        // Update observed address
799        let observed = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(10, 0, 0, 1)), 443);
800        path.update_observed_address(observed, now);
801
802        assert_eq!(path.address_info.observed_address, Some(observed));
803        assert_eq!(path.address_info.last_observed, Some(now));
804        assert_eq!(path.address_info.observation_count, 1);
805        assert!(!path.address_info.notified);
806    }
807
808    #[test]
809    fn path_data_has_address_changed() {
810        let remote = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)), 8080);
811        let config = TransportConfig::default();
812        let now = Instant::now();
813
814        let mut path = PathData::new(remote, false, None, now, &config);
815
816        // No change when no observed address
817        assert!(!path.has_address_changed());
818
819        // Update with same as remote
820        path.update_observed_address(remote, now);
821        assert!(!path.has_address_changed());
822
823        // Update with different address
824        let different = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(10, 0, 0, 1)), 443);
825        path.update_observed_address(different, now);
826        assert!(path.has_address_changed());
827    }
828
829    #[test]
830    fn path_data_mark_address_notified() {
831        let remote = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)), 8080);
832        let config = TransportConfig::default();
833        let now = Instant::now();
834
835        let mut path = PathData::new(remote, false, None, now, &config);
836
837        // Update and mark as notified
838        let observed = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(10, 0, 0, 1)), 443);
839        path.update_observed_address(observed, now);
840        assert!(!path.address_info.notified);
841
842        path.mark_address_notified();
843        assert!(path.address_info.notified);
844    }
845
846    #[test]
847    fn path_data_from_previous_preserves_address_info() {
848        let remote1 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)), 8080);
849        let remote2 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(10, 0, 0, 1)), 443);
850        let config = TransportConfig::default();
851        let now = Instant::now();
852
853        let mut path1 = PathData::new(remote1, false, None, now, &config);
854
855        // Set up address info
856        let observed = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(1, 2, 3, 4)), 5678);
857        path1.update_observed_address(observed, now);
858        path1.mark_address_notified();
859
860        // Create new path from previous
861        let path2 = PathData::from_previous(remote2, &path1, now);
862
863        // Address info should be reset for new path
864        assert!(path2.address_info.observed_address.is_none());
865        assert!(path2.address_info.last_observed.is_none());
866        assert_eq!(path2.address_info.observation_count, 0);
867        assert!(!path2.address_info.notified);
868    }
869
870    #[test]
871    fn path_data_reset_clears_address_info() {
872        let remote = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)), 8080);
873        let config = TransportConfig::default();
874        let now = Instant::now();
875
876        let mut path = PathData::new(remote, false, None, now, &config);
877
878        // Set up address info
879        let observed = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(10, 0, 0, 1)), 443);
880        path.update_observed_address(observed, now);
881        path.mark_address_notified();
882
883        // Reset should clear address info
884        path.reset(now, &config);
885
886        assert!(path.address_info.observed_address.is_none());
887        assert!(path.address_info.last_observed.is_none());
888        assert_eq!(path.address_info.observation_count, 0);
889        assert!(!path.address_info.notified);
890    }
891
892    // Tests for per-path rate limiting
893    #[test]
894    fn path_data_with_rate_limiter() {
895        let remote = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)), 8080);
896        let config = TransportConfig::default();
897        let now = Instant::now();
898
899        let path = PathData::new(remote, false, None, now, &config);
900
901        // Should have a rate limiter
902        assert!(path.observation_rate_limiter.tokens > 0.0);
903        assert_eq!(path.observation_rate_limiter.rate, 10.0); // Default rate
904        assert_eq!(path.observation_rate_limiter.max_tokens, 10.0);
905        assert_eq!(path.observation_rate_limiter.last_update, now);
906    }
907
908    #[test]
909    fn path_data_can_send_observation() {
910        let remote = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)), 8080);
911        let config = TransportConfig::default();
912        let now = Instant::now();
913
914        let mut path = PathData::new(remote, false, None, now, &config);
915
916        // Should be able to send initially (has tokens)
917        assert!(path.can_send_observation(now));
918
919        // Consume a token
920        path.consume_observation_token(now);
921
922        // Should still have tokens available
923        assert!(path.can_send_observation(now));
924
925        // Consume all tokens
926        for _ in 0..9 {
927            path.consume_observation_token(now);
928        }
929
930        // Should be out of tokens
931        assert!(!path.can_send_observation(now));
932
933        // Wait for token replenishment
934        let later = now + Duration::from_millis(200); // 0.2 seconds = 2 tokens
935        assert!(path.can_send_observation(later));
936    }
937
938    #[test]
939    fn path_data_rate_limiter_replenishment() {
940        let remote = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)), 8080);
941        let config = TransportConfig::default();
942        let now = Instant::now();
943
944        let mut path = PathData::new(remote, false, None, now, &config);
945
946        // Consume all tokens
947        for _ in 0..10 {
948            path.consume_observation_token(now);
949        }
950        assert_eq!(path.observation_rate_limiter.tokens, 0.0);
951
952        // Check replenishment after 1 second
953        let later = now + Duration::from_secs(1);
954        path.update_observation_tokens(later);
955
956        // Should have replenished to max (rate = 10/sec)
957        assert_eq!(path.observation_rate_limiter.tokens, 10.0);
958    }
959
960    #[test]
961    fn path_data_rate_limiter_custom_rate() {
962        let remote = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)), 8080);
963        let config = TransportConfig::default();
964        let now = Instant::now();
965
966        let mut path = PathData::new(remote, false, None, now, &config);
967
968        // Update with custom rate
969        path.set_observation_rate(5); // 5 per second
970        assert_eq!(path.observation_rate_limiter.rate, 5.0);
971        assert_eq!(path.observation_rate_limiter.max_tokens, 5.0);
972
973        // Consume all tokens
974        for _ in 0..5 {
975            path.consume_observation_token(now);
976        }
977        assert!(!path.can_send_observation(now));
978
979        // Check replenishment with new rate
980        let later = now + Duration::from_millis(400); // 0.4 seconds = 2 tokens at rate 5
981        path.update_observation_tokens(later);
982        assert_eq!(path.observation_rate_limiter.tokens, 2.0);
983    }
984
985    #[test]
986    fn path_data_rate_limiter_from_previous() {
987        let remote1 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)), 8080);
988        let remote2 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(10, 0, 0, 1)), 443);
989        let config = TransportConfig::default();
990        let now = Instant::now();
991
992        let mut path1 = PathData::new(remote1, false, None, now, &config);
993
994        // Set custom rate and consume some tokens
995        path1.set_observation_rate(20);
996        for _ in 0..5 {
997            path1.consume_observation_token(now);
998        }
999
1000        // Create new path from previous
1001        let path2 = PathData::from_previous(remote2, &path1, now);
1002
1003        // New path should have fresh rate limiter with same rate
1004        assert_eq!(path2.observation_rate_limiter.rate, 20.0);
1005        assert_eq!(path2.observation_rate_limiter.max_tokens, 20.0);
1006        assert_eq!(path2.observation_rate_limiter.tokens, 20.0); // Full tokens
1007    }
1008
1009    #[test]
1010    fn path_data_reset_preserves_rate() {
1011        let remote = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)), 8080);
1012        let config = TransportConfig::default();
1013        let now = Instant::now();
1014
1015        let mut path = PathData::new(remote, false, None, now, &config);
1016
1017        // Set custom rate
1018        path.set_observation_rate(15);
1019
1020        // Consume some tokens
1021        for _ in 0..3 {
1022            path.consume_observation_token(now);
1023        }
1024
1025        // Reset the path
1026        path.reset(now, &config);
1027
1028        // Rate should be preserved, tokens should be reset
1029        assert_eq!(path.observation_rate_limiter.rate, 15.0);
1030        assert_eq!(path.observation_rate_limiter.tokens, 15.0); // Full tokens after reset
1031    }
1032}