1#![allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
9use std::{
10 cmp,
11 collections::VecDeque,
12 convert::TryFrom,
13 fmt, io, mem,
14 net::{IpAddr, SocketAddr},
15 sync::Arc,
16};
17
18use bytes::{Bytes, BytesMut};
19use frame::StreamMetaVec;
20use rand::{Rng, SeedableRng, rngs::StdRng};
23use thiserror::Error;
24use tracing::{debug, error, info, trace, trace_span, warn};
25
26use crate::{
27 Dir, Duration, EndpointConfig, Frame, INITIAL_MTU, Instant, MAX_CID_SIZE, MAX_STREAM_COUNT,
28 MIN_INITIAL_SIZE, MtuDiscoveryConfig, Side, StreamId, TIMER_GRANULARITY, TokenStore, Transmit,
29 TransportError, TransportErrorCode, VarInt, VarIntBoundsExceeded,
30 cid_generator::ConnectionIdGenerator,
31 cid_queue::CidQueue,
32 coding::BufMutExt,
33 config::{ServerConfig, TransportConfig},
34 crypto::{self, KeyPair, Keys, PacketKey},
35 endpoint::AddressDiscoveryStats,
36 frame::{self, Close, Datagram, FrameStruct, NewToken},
37 nat_traversal_api::PeerId,
38 packet::{
39 FixedLengthConnectionIdParser, Header, InitialHeader, InitialPacket, LongType, Packet,
40 PacketNumber, PartialDecode, SpaceId,
41 },
42 range_set::ArrayRangeSet,
43 shared::{
44 ConnectionEvent, ConnectionEventInner, ConnectionId, DatagramConnectionEvent, EcnCodepoint,
45 EndpointEvent, EndpointEventInner,
46 },
47 token::ResetToken,
48 transport_parameters::TransportParameters,
49};
50
51fn allow_loopback_from_env() -> bool {
52 matches!(
53 std::env::var("ANT_QUIC_ALLOW_LOOPBACK")
54 .unwrap_or_default()
55 .trim()
56 .to_ascii_lowercase()
57 .as_str(),
58 "1" | "true" | "yes"
59 )
60}
61
62mod ack_frequency;
63use ack_frequency::AckFrequencyState;
64
65pub mod port_prediction;
66pub use self::port_prediction::{PortPredictor, PortPredictorConfig};
67
68pub(crate) mod nat_traversal;
69use nat_traversal::NatTraversalState;
70pub(crate) use nat_traversal::{CoordinationPhase, NatTraversalError};
72
73mod assembler;
74pub use assembler::Chunk;
75
76mod cid_state;
77use cid_state::CidState;
78
79mod datagrams;
80use datagrams::DatagramState;
81pub use datagrams::{Datagrams, SendDatagramError};
82
83mod mtud;
84
85mod pacing;
86
87mod packet_builder;
88use packet_builder::PacketBuilder;
89
90mod packet_crypto;
91use packet_crypto::{PrevCrypto, ZeroRttCrypto};
92
93mod paths;
94pub use paths::RttEstimator;
95use paths::{PathData, PathResponses};
96
97mod send_buffer;
98
99mod spaces;
100#[cfg(fuzzing)]
101pub use spaces::Retransmits;
102#[cfg(not(fuzzing))]
103use spaces::Retransmits;
104use spaces::{PacketNumberFilter, PacketSpace, SendableFrames, SentPacket, ThinRetransmits};
105
106mod stats;
107pub use stats::{ConnectionStats, DatagramDropStats, FrameStats, PathStats, UdpStats};
108
109mod streams;
110#[cfg(fuzzing)]
111pub use streams::StreamsState;
112#[cfg(not(fuzzing))]
113use streams::StreamsState;
114pub use streams::{
115 Chunks, ClosedStream, FinishError, ReadError, ReadableError, RecvStream, SendStream,
116 ShouldTransmit, StreamEvent, Streams, WriteError, Written,
117};
118
119mod timer;
120use crate::congestion::Controller;
121use timer::{Timer, TimerTable};
122
123pub struct Connection {
163 endpoint_config: Arc<EndpointConfig>,
164 config: Arc<TransportConfig>,
165 rng: StdRng,
166 crypto: Box<dyn crypto::Session>,
167 handshake_cid: ConnectionId,
169 rem_handshake_cid: ConnectionId,
171 local_ip: Option<IpAddr>,
174 path: PathData,
175 allow_mtud: bool,
177 prev_path: Option<(ConnectionId, PathData)>,
178 state: State,
179 side: ConnectionSide,
180 zero_rtt_enabled: bool,
182 zero_rtt_crypto: Option<ZeroRttCrypto>,
184 key_phase: bool,
185 key_phase_size: u64,
187 peer_params: TransportParameters,
189 orig_rem_cid: ConnectionId,
191 initial_dst_cid: ConnectionId,
193 retry_src_cid: Option<ConnectionId>,
196 lost_packets: u64,
198 events: VecDeque<Event>,
199 endpoint_events: VecDeque<EndpointEventInner>,
200 spin_enabled: bool,
202 spin: bool,
204 spaces: [PacketSpace; 3],
206 highest_space: SpaceId,
208 prev_crypto: Option<PrevCrypto>,
210 next_crypto: Option<KeyPair<Box<dyn PacketKey>>>,
215 accepted_0rtt: bool,
216 permit_idle_reset: bool,
218 idle_timeout: Option<Duration>,
220 timers: TimerTable,
221 authentication_failures: u64,
223 error: Option<ConnectionError>,
225 packet_number_filter: PacketNumberFilter,
227
228 path_responses: PathResponses,
233 close: bool,
234
235 ack_frequency: AckFrequencyState,
239
240 pto_count: u32,
245
246 receiving_ecn: bool,
251 total_authed_packets: u64,
253 app_limited: bool,
256
257 streams: StreamsState,
258 rem_cids: CidQueue,
260 local_cid_state: CidState,
262 datagrams: DatagramState,
264 stats: ConnectionStats,
266 version: u32,
268
269 nat_traversal: Option<NatTraversalState>,
271
272 nat_traversal_frame_config: frame::nat_traversal_unified::NatTraversalFrameConfig,
274
275 address_discovery_state: Option<AddressDiscoveryState>,
277
278 pqc_state: PqcState,
280
281 #[cfg(feature = "trace")]
283 trace_context: crate::tracing::TraceContext,
284
285 #[cfg(feature = "trace")]
287 event_log: Arc<crate::tracing::EventLog>,
288
289 #[cfg(feature = "__qlog")]
291 qlog_streamer: Option<Box<dyn std::io::Write + Send + Sync>>,
292
293 peer_id_for_tokens: Option<PeerId>,
295 delay_new_token_until_binding: bool,
297}
298
299impl Connection {
300 pub(crate) fn new(
301 endpoint_config: Arc<EndpointConfig>,
302 config: Arc<TransportConfig>,
303 init_cid: ConnectionId,
304 loc_cid: ConnectionId,
305 rem_cid: ConnectionId,
306 remote: SocketAddr,
307 local_ip: Option<IpAddr>,
308 crypto: Box<dyn crypto::Session>,
309 cid_gen: &dyn ConnectionIdGenerator,
310 now: Instant,
311 version: u32,
312 allow_mtud: bool,
313 rng_seed: [u8; 32],
314 side_args: SideArgs,
315 ) -> Self {
316 let pref_addr_cid = side_args.pref_addr_cid();
317 let path_validated = side_args.path_validated();
318 let connection_side = ConnectionSide::from(side_args);
319 let side = connection_side.side();
320 let initial_space = PacketSpace {
321 crypto: Some(crypto.initial_keys(&init_cid, side)),
322 ..PacketSpace::new(now)
323 };
324 let state = State::Handshake(state::Handshake {
325 rem_cid_set: side.is_server(),
326 expected_token: Bytes::new(),
327 client_hello: None,
328 });
329 let mut rng = StdRng::from_seed(rng_seed);
330 let mut this = Self {
331 endpoint_config,
332 crypto,
333 handshake_cid: loc_cid,
334 rem_handshake_cid: rem_cid,
335 local_cid_state: CidState::new(
336 cid_gen.cid_len(),
337 cid_gen.cid_lifetime(),
338 now,
339 if pref_addr_cid.is_some() { 2 } else { 1 },
340 ),
341 path: PathData::new(remote, allow_mtud, None, now, &config),
342 allow_mtud,
343 local_ip,
344 prev_path: None,
345 state,
346 side: connection_side,
347 zero_rtt_enabled: false,
348 zero_rtt_crypto: None,
349 key_phase: false,
350 key_phase_size: rng.gen_range(10..1000),
357 peer_params: TransportParameters::default(),
358 orig_rem_cid: rem_cid,
359 initial_dst_cid: init_cid,
360 retry_src_cid: None,
361 lost_packets: 0,
362 events: VecDeque::new(),
363 endpoint_events: VecDeque::new(),
364 spin_enabled: config.allow_spin && rng.gen_ratio(7, 8),
365 spin: false,
366 spaces: [initial_space, PacketSpace::new(now), PacketSpace::new(now)],
367 highest_space: SpaceId::Initial,
368 prev_crypto: None,
369 next_crypto: None,
370 accepted_0rtt: false,
371 permit_idle_reset: true,
372 idle_timeout: match config.max_idle_timeout {
373 None | Some(VarInt(0)) => None,
374 Some(dur) => Some(Duration::from_millis(dur.0)),
375 },
376 timers: TimerTable::default(),
377 authentication_failures: 0,
378 error: None,
379 #[cfg(test)]
380 packet_number_filter: match config.deterministic_packet_numbers {
381 false => PacketNumberFilter::new(&mut rng),
382 true => PacketNumberFilter::disabled(),
383 },
384 #[cfg(not(test))]
385 packet_number_filter: PacketNumberFilter::new(&mut rng),
386
387 path_responses: PathResponses::default(),
388 close: false,
389
390 ack_frequency: AckFrequencyState::new(get_max_ack_delay(
391 &TransportParameters::default(),
392 )),
393
394 pto_count: 0,
395
396 app_limited: false,
397 receiving_ecn: false,
398 total_authed_packets: 0,
399
400 streams: StreamsState::new(
401 side,
402 config.max_concurrent_uni_streams,
403 config.max_concurrent_bidi_streams,
404 config.send_window,
405 config.receive_window,
406 config.stream_receive_window,
407 ),
408 datagrams: DatagramState::default(),
409 config,
410 rem_cids: CidQueue::new(rem_cid),
411 rng,
412 stats: ConnectionStats::default(),
413 version,
414 nat_traversal: None, nat_traversal_frame_config:
416 frame::nat_traversal_unified::NatTraversalFrameConfig::default(),
417 address_discovery_state: {
418 Some(AddressDiscoveryState::new(
421 &crate::transport_parameters::AddressDiscoveryConfig::default(),
422 now,
423 ))
424 },
425 pqc_state: PqcState::new(),
426
427 #[cfg(feature = "trace")]
428 trace_context: crate::tracing::TraceContext::new(crate::tracing::TraceId::new()),
429
430 #[cfg(feature = "trace")]
431 event_log: crate::tracing::global_log(),
432
433 #[cfg(feature = "__qlog")]
434 qlog_streamer: None,
435
436 peer_id_for_tokens: None,
437 delay_new_token_until_binding: false,
438 };
439
440 #[cfg(feature = "trace")]
442 {
443 use crate::trace_event;
444 use crate::tracing::{Event, EventData, socket_addr_to_bytes, timestamp_now};
445 let _peer_id = {
447 let mut id = [0u8; 32];
448 let addr_bytes = match remote {
449 SocketAddr::V4(addr) => addr.ip().octets().to_vec(),
450 SocketAddr::V6(addr) => addr.ip().octets().to_vec(),
451 };
452 id[..addr_bytes.len().min(32)]
453 .copy_from_slice(&addr_bytes[..addr_bytes.len().min(32)]);
454 id
455 };
456
457 let (addr_bytes, addr_type) = socket_addr_to_bytes(remote);
458 trace_event!(
459 &this.event_log,
460 Event {
461 timestamp: timestamp_now(),
462 trace_id: this.trace_context.trace_id(),
463 sequence: 0,
464 _padding: 0,
465 node_id: [0u8; 32], event_data: EventData::ConnInit {
467 endpoint_bytes: addr_bytes,
468 addr_type,
469 _padding: [0u8; 45],
470 },
471 }
472 );
473 }
474
475 if path_validated {
476 this.on_path_validated();
477 }
478 if side.is_client() {
479 this.write_crypto();
481 this.init_0rtt();
482 }
483 this
484 }
485
486 #[cfg(feature = "__qlog")]
488 pub fn set_qlog(
489 &mut self,
490 writer: Box<dyn std::io::Write + Send + Sync>,
491 _title: Option<String>,
492 _description: Option<String>,
493 _now: Instant,
494 ) {
495 self.qlog_streamer = Some(writer);
496 }
497
498 #[cfg(feature = "__qlog")]
500 fn emit_qlog_recovery_metrics(&mut self, _now: Instant) {
501 }
504
505 #[must_use]
513 pub fn poll_timeout(&mut self) -> Option<Instant> {
514 let mut next_timeout = self.timers.next_timeout();
515
516 if let Some(nat_state) = &self.nat_traversal {
518 if let Some(nat_timeout) = nat_state.get_next_timeout(Instant::now()) {
519 self.timers.set(Timer::NatTraversal, nat_timeout);
521 next_timeout = Some(next_timeout.map_or(nat_timeout, |t| t.min(nat_timeout)));
522 }
523 }
524
525 next_timeout
526 }
527
528 #[must_use]
534 pub fn poll(&mut self) -> Option<Event> {
535 if let Some(x) = self.events.pop_front() {
536 return Some(x);
537 }
538
539 if let Some(event) = self.streams.poll() {
540 return Some(Event::Stream(event));
541 }
542
543 if let Some(err) = self.error.take() {
544 return Some(Event::ConnectionLost { reason: err });
545 }
546
547 None
548 }
549
550 #[must_use]
552 pub fn poll_endpoint_events(&mut self) -> Option<EndpointEvent> {
553 self.endpoint_events.pop_front().map(EndpointEvent)
554 }
555
556 #[must_use]
558 pub fn streams(&mut self) -> Streams<'_> {
559 Streams {
560 state: &mut self.streams,
561 conn_state: &self.state,
562 }
563 }
564
565 #[must_use]
569 pub fn recv_stream(&mut self, id: StreamId) -> RecvStream<'_> {
570 assert!(id.dir() == Dir::Bi || id.initiator() != self.side.side());
571 RecvStream {
572 id,
573 state: &mut self.streams,
574 pending: &mut self.spaces[SpaceId::Data].pending,
575 }
576 }
577
578 #[must_use]
580 pub fn send_stream(&mut self, id: StreamId) -> SendStream<'_> {
581 assert!(id.dir() == Dir::Bi || id.initiator() == self.side.side());
582 SendStream {
583 id,
584 state: &mut self.streams,
585 pending: &mut self.spaces[SpaceId::Data].pending,
586 conn_state: &self.state,
587 }
588 }
589
590 #[must_use]
600 pub fn poll_transmit(
601 &mut self,
602 now: Instant,
603 max_datagrams: usize,
604 buf: &mut Vec<u8>,
605 ) -> Option<Transmit> {
606 assert!(max_datagrams != 0);
607 let max_datagrams = match self.config.enable_segmentation_offload {
608 false => 1,
609 true => max_datagrams,
610 };
611
612 let mut num_datagrams = 0;
613 let mut datagram_start = 0;
616 let mut segment_size = usize::from(self.path.current_mtu());
617
618 if let Some(nat_traversal) = &mut self.nat_traversal {
620 if nat_traversal.check_coordination_timeout(now) {
621 trace!("NAT traversal coordination timed out, may retry");
622 }
623 let expired = nat_traversal.check_validation_timeouts(now);
625 if !expired.is_empty() {
626 debug!(
627 "Cleaned up {} expired NAT traversal validations",
628 expired.len()
629 );
630 }
631 }
632
633 self.check_for_address_observations(now);
635
636 if let Some(challenge) = self.send_nat_traversal_challenge(now, buf) {
638 return Some(challenge);
639 }
640
641 if let Some(challenge) = self.send_path_challenge(now, buf) {
642 return Some(challenge);
643 }
644
645 for space in SpaceId::iter() {
647 let request_immediate_ack =
648 space == SpaceId::Data && self.peer_supports_ack_frequency();
649 self.spaces[space].maybe_queue_probe(request_immediate_ack, &self.streams);
650 }
651
652 let close = match self.state {
654 State::Drained => {
655 self.app_limited = true;
656 return None;
657 }
658 State::Draining | State::Closed(_) => {
659 if !self.close {
662 self.app_limited = true;
663 return None;
664 }
665 true
666 }
667 _ => false,
668 };
669
670 if let Some(config) = &self.config.ack_frequency_config {
672 self.spaces[SpaceId::Data].pending.ack_frequency = self
673 .ack_frequency
674 .should_send_ack_frequency(self.path.rtt.get(), config, &self.peer_params)
675 && self.highest_space == SpaceId::Data
676 && self.peer_supports_ack_frequency();
677 }
678
679 let mut buf_capacity = 0;
683
684 let mut coalesce = true;
685 let mut builder_storage: Option<PacketBuilder> = None;
686 let mut sent_frames = None;
687 let mut pad_datagram = false;
688 let mut pad_datagram_to_mtu = false;
689 let mut congestion_blocked = false;
690
691 let mut space_idx = 0;
693 let spaces = [SpaceId::Initial, SpaceId::Handshake, SpaceId::Data];
694 while space_idx < spaces.len() {
697 let space_id = spaces[space_idx];
698 let pn = self.packet_number_filter.peek(&self.spaces[SpaceId::Data]);
705 let frame_space_1rtt =
706 segment_size.saturating_sub(self.predict_1rtt_overhead(Some(pn)));
707
708 let can_send = self.space_can_send(space_id, frame_space_1rtt);
710 if can_send.is_empty() && (!close || self.spaces[space_id].crypto.is_none()) {
711 space_idx += 1;
712 continue;
713 }
714
715 let mut ack_eliciting = !self.spaces[space_id].pending.is_empty(&self.streams)
716 || self.spaces[space_id].ping_pending
717 || self.spaces[space_id].immediate_ack_pending;
718 if space_id == SpaceId::Data {
719 ack_eliciting |= self.can_send_1rtt(frame_space_1rtt);
720 }
721
722 pad_datagram_to_mtu |= space_id == SpaceId::Data && self.config.pad_to_mtu;
723
724 let buf_end = if let Some(builder) = &builder_storage {
728 buf.len().max(builder.min_size) + builder.tag_len
729 } else {
730 buf.len()
731 };
732
733 let tag_len = if let Some(ref crypto) = self.spaces[space_id].crypto {
734 crypto.packet.local.tag_len()
735 } else if space_id == SpaceId::Data {
736 match self.zero_rtt_crypto.as_ref() {
737 Some(crypto) => crypto.packet.tag_len(),
738 None => {
739 error!(
741 "sending packets in the application data space requires known 0-RTT or 1-RTT keys"
742 );
743 return None;
744 }
745 }
746 } else {
747 unreachable!("tried to send {:?} packet without keys", space_id)
748 };
749 if !coalesce || buf_capacity - buf_end < MIN_PACKET_SPACE + tag_len {
750 if num_datagrams >= max_datagrams {
754 break;
756 }
757
758 if self
765 .path
766 .anti_amplification_blocked(segment_size as u64 * (num_datagrams as u64) + 1)
767 {
768 trace!("blocked by anti-amplification");
769 break;
770 }
771
772 if ack_eliciting && self.spaces[space_id].loss_probes == 0 {
775 let untracked_bytes = if let Some(builder) = &builder_storage {
777 buf_capacity - builder.partial_encode.start
778 } else {
779 0
780 } as u64;
781 debug_assert!(untracked_bytes <= segment_size as u64);
782
783 let bytes_to_send = segment_size as u64 + untracked_bytes;
784 if self.path.in_flight.bytes + bytes_to_send >= self.path.congestion.window() {
785 space_idx += 1;
786 congestion_blocked = true;
787 trace!("blocked by congestion control");
790 continue;
791 }
792
793 let smoothed_rtt = self.path.rtt.get();
795 if let Some(delay) = self.path.pacing.delay(
796 smoothed_rtt,
797 bytes_to_send,
798 self.path.current_mtu(),
799 self.path.congestion.window(),
800 now,
801 ) {
802 self.timers.set(Timer::Pacing, delay);
803 congestion_blocked = true;
804 trace!("blocked by pacing");
807 break;
808 }
809 }
810
811 if let Some(mut builder) = builder_storage.take() {
813 if pad_datagram {
814 let min_size = self.pqc_state.min_initial_size();
815 builder.pad_to(min_size);
816 }
817
818 if num_datagrams > 1 || pad_datagram_to_mtu {
819 const MAX_PADDING: usize = 16;
832 let packet_len_unpadded = cmp::max(builder.min_size, buf.len())
833 - datagram_start
834 + builder.tag_len;
835 if (packet_len_unpadded + MAX_PADDING < segment_size
836 && !pad_datagram_to_mtu)
837 || datagram_start + segment_size > buf_capacity
838 {
839 trace!(
840 "GSO truncated by demand for {} padding bytes or loss probe",
841 segment_size - packet_len_unpadded
842 );
843 builder_storage = Some(builder);
844 break;
845 }
846
847 builder.pad_to(segment_size as u16);
850 }
851
852 builder.finish_and_track(now, self, sent_frames.take(), buf);
853
854 if num_datagrams == 1 {
855 segment_size = buf.len();
862 buf_capacity = buf.len();
865
866 if space_id == SpaceId::Data {
873 let frame_space_1rtt =
874 segment_size.saturating_sub(self.predict_1rtt_overhead(Some(pn)));
875 if self.space_can_send(space_id, frame_space_1rtt).is_empty() {
876 break;
877 }
878 }
879 }
880 }
881
882 let next_datagram_size_limit = match self.spaces[space_id].loss_probes {
884 0 => segment_size,
885 _ => {
886 self.spaces[space_id].loss_probes -= 1;
887 std::cmp::min(segment_size, usize::from(INITIAL_MTU))
891 }
892 };
893 buf_capacity += next_datagram_size_limit;
894 if buf.capacity() < buf_capacity {
895 buf.reserve(max_datagrams * segment_size);
904 }
905 num_datagrams += 1;
906 coalesce = true;
907 pad_datagram = false;
908 datagram_start = buf.len();
909
910 debug_assert_eq!(
911 datagram_start % segment_size,
912 0,
913 "datagrams in a GSO batch must be aligned to the segment size"
914 );
915 } else {
916 if let Some(builder) = builder_storage.take() {
920 builder.finish_and_track(now, self, sent_frames.take(), buf);
921 }
922 }
923
924 debug_assert!(buf_capacity - buf.len() >= MIN_PACKET_SPACE);
925
926 if self.spaces[SpaceId::Initial].crypto.is_some()
931 && space_id == SpaceId::Handshake
932 && self.side.is_client()
933 {
934 self.discard_space(now, SpaceId::Initial);
937 }
938 if let Some(ref mut prev) = self.prev_crypto {
939 prev.update_unacked = false;
940 }
941
942 debug_assert!(
943 builder_storage.is_none() && sent_frames.is_none(),
944 "Previous packet must have been finished"
945 );
946
947 let builder = builder_storage.insert(PacketBuilder::new(
948 now,
949 space_id,
950 self.rem_cids.active(),
951 buf,
952 buf_capacity,
953 datagram_start,
954 ack_eliciting,
955 self,
956 )?);
957 coalesce = coalesce && !builder.short_header;
958
959 let should_adjust_coalescing = self
961 .pqc_state
962 .should_adjust_coalescing(buf.len() - datagram_start, space_id);
963
964 if should_adjust_coalescing {
965 coalesce = false;
966 trace!("Disabling coalescing for PQC handshake in {:?}", space_id);
967 }
968
969 pad_datagram |=
971 space_id == SpaceId::Initial && (self.side.is_client() || ack_eliciting);
972
973 if close {
974 trace!("sending CONNECTION_CLOSE");
975 if !self.spaces[space_id].pending_acks.ranges().is_empty() {
980 if Self::populate_acks(
981 now,
982 self.receiving_ecn,
983 &mut SentFrames::default(),
984 &mut self.spaces[space_id],
985 buf,
986 &mut self.stats,
987 )
988 .is_err()
989 {
990 self.handle_encode_error(now, "ACK (close)");
991 return None;
992 }
993 }
994
995 debug_assert!(
999 buf.len() + frame::ConnectionClose::SIZE_BOUND < builder.max_size,
1000 "ACKs should leave space for ConnectionClose"
1001 );
1002 if buf.len() + frame::ConnectionClose::SIZE_BOUND < builder.max_size {
1003 let max_frame_size = builder.max_size - buf.len();
1004 match self.state {
1005 State::Closed(state::Closed { ref reason }) => {
1006 let result = if space_id == SpaceId::Data || reason.is_transport_layer()
1007 {
1008 reason.try_encode(buf, max_frame_size)
1009 } else {
1010 frame::ConnectionClose {
1011 error_code: TransportErrorCode::APPLICATION_ERROR,
1012 frame_type: None,
1013 reason: Bytes::new(),
1014 }
1015 .try_encode(buf, max_frame_size)
1016 };
1017 if result.is_err() {
1018 self.handle_encode_error(now, "ConnectionClose");
1019 return None;
1020 }
1021 }
1022 State::Draining => {
1023 if (frame::ConnectionClose {
1024 error_code: TransportErrorCode::NO_ERROR,
1025 frame_type: None,
1026 reason: Bytes::new(),
1027 })
1028 .try_encode(buf, max_frame_size)
1029 .is_err()
1030 {
1031 self.handle_encode_error(now, "ConnectionClose (draining)");
1032 return None;
1033 }
1034 }
1035 _ => unreachable!(
1036 "tried to make a close packet when the connection wasn't closed"
1037 ),
1038 }
1039 }
1040 if space_id == self.highest_space {
1041 self.close = false;
1043 break;
1045 } else {
1046 space_idx += 1;
1050 continue;
1051 }
1052 }
1053
1054 if space_id == SpaceId::Data && num_datagrams == 1 {
1057 if let Some((token, remote)) = self.path_responses.pop_off_path(self.path.remote) {
1058 let mut builder = builder_storage.take().unwrap();
1061 trace!("PATH_RESPONSE {:08x} (off-path)", token);
1062 if !self.encode_or_close(
1063 now,
1064 frame::FrameType::PATH_RESPONSE.try_encode(buf),
1065 "PATH_RESPONSE (off-path)",
1066 ) {
1067 return None;
1068 }
1069 buf.write(token);
1070 self.stats.frame_tx.path_response += 1;
1071 let min_size = self.pqc_state.min_initial_size();
1072 builder.pad_to(min_size);
1073 builder.finish_and_track(
1074 now,
1075 self,
1076 Some(SentFrames {
1077 non_retransmits: true,
1078 ..SentFrames::default()
1079 }),
1080 buf,
1081 );
1082 self.stats.udp_tx.on_sent(1, buf.len());
1083
1084 #[cfg(feature = "trace")]
1086 {
1087 use crate::trace_packet_sent;
1088 trace_packet_sent!(
1090 &self.event_log,
1091 self.trace_context.trace_id(),
1092 buf.len() as u32,
1093 0 );
1095 }
1096
1097 return Some(Transmit {
1098 destination: remote,
1099 size: buf.len(),
1100 ecn: None,
1101 segment_size: None,
1102 src_ip: self.local_ip,
1103 });
1104 }
1105 }
1106
1107 if space_id == SpaceId::Data && self.address_discovery_state.is_some() {
1109 let peer_supports = self.peer_params.address_discovery.is_some();
1110
1111 if let Some(state) = &mut self.address_discovery_state {
1112 if peer_supports {
1113 if let Some(frame) = state.queue_observed_address_frame(0, self.path.remote)
1114 {
1115 self.spaces[space_id]
1116 .pending
1117 .outbound_observations
1118 .push(frame);
1119 }
1120 }
1121 }
1122 }
1123
1124 let sent =
1125 self.populate_packet(now, space_id, buf, builder.max_size, builder.exact_number);
1126
1127 debug_assert!(
1134 !(sent.is_ack_only(&self.streams)
1135 && !can_send.acks
1136 && can_send.other
1137 && (buf_capacity - builder.datagram_start) == self.path.current_mtu() as usize
1138 && self.datagrams.outgoing.is_empty()),
1139 "SendableFrames was {can_send:?}, but only ACKs have been written"
1140 );
1141 pad_datagram |= sent.requires_padding;
1142
1143 if sent.largest_acked.is_some() {
1144 self.spaces[space_id].pending_acks.acks_sent();
1145 self.timers.stop(Timer::MaxAckDelay);
1146 }
1147
1148 sent_frames = Some(sent);
1150
1151 }
1154
1155 if let Some(mut builder) = builder_storage {
1157 if pad_datagram {
1158 let min_size = self.pqc_state.min_initial_size();
1159 builder.pad_to(min_size);
1160 }
1161
1162 if pad_datagram_to_mtu && buf_capacity >= datagram_start + segment_size {
1168 builder.pad_to(segment_size as u16);
1169 }
1170
1171 let last_packet_number = builder.exact_number;
1172 builder.finish_and_track(now, self, sent_frames, buf);
1173 self.path
1174 .congestion
1175 .on_sent(now, buf.len() as u64, last_packet_number);
1176
1177 #[cfg(feature = "__qlog")]
1178 self.emit_qlog_recovery_metrics(now);
1179 }
1180
1181 self.app_limited = buf.is_empty() && !congestion_blocked;
1182
1183 if buf.is_empty() && self.state.is_established() {
1185 let space_id = SpaceId::Data;
1186 let probe_size = self
1187 .path
1188 .mtud
1189 .poll_transmit(now, self.packet_number_filter.peek(&self.spaces[space_id]))?;
1190
1191 let buf_capacity = probe_size as usize;
1192 buf.reserve(buf_capacity);
1193
1194 let mut builder = PacketBuilder::new(
1195 now,
1196 space_id,
1197 self.rem_cids.active(),
1198 buf,
1199 buf_capacity,
1200 0,
1201 true,
1202 self,
1203 )?;
1204
1205 if !self.encode_or_close(now, frame::FrameType::PING.try_encode(buf), "PING (MTU)") {
1207 return None;
1208 }
1209 self.stats.frame_tx.ping += 1;
1210
1211 if self.peer_supports_ack_frequency() {
1213 if !self.encode_or_close(
1214 now,
1215 frame::FrameType::IMMEDIATE_ACK.try_encode(buf),
1216 "IMMEDIATE_ACK (MTU)",
1217 ) {
1218 return None;
1219 }
1220 self.stats.frame_tx.immediate_ack += 1;
1221 }
1222
1223 builder.pad_to(probe_size);
1224 let sent_frames = SentFrames {
1225 non_retransmits: true,
1226 ..Default::default()
1227 };
1228 builder.finish_and_track(now, self, Some(sent_frames), buf);
1229
1230 self.stats.path.sent_plpmtud_probes += 1;
1231 num_datagrams = 1;
1232
1233 trace!(?probe_size, "writing MTUD probe");
1234 }
1235
1236 if buf.is_empty() {
1237 return None;
1238 }
1239
1240 trace!("sending {} bytes in {} datagrams", buf.len(), num_datagrams);
1241 self.path.total_sent = self.path.total_sent.saturating_add(buf.len() as u64);
1242
1243 self.stats.udp_tx.on_sent(num_datagrams as u64, buf.len());
1244
1245 #[cfg(feature = "trace")]
1247 {
1248 use crate::trace_packet_sent;
1249 let packet_num = self.spaces[SpaceId::Data]
1252 .next_packet_number
1253 .saturating_sub(1);
1254 trace_packet_sent!(
1255 &self.event_log,
1256 self.trace_context.trace_id(),
1257 buf.len() as u32,
1258 packet_num
1259 );
1260 }
1261
1262 Some(Transmit {
1263 destination: self.path.remote,
1264 size: buf.len(),
1265 ecn: if self.path.sending_ecn {
1266 Some(EcnCodepoint::Ect0)
1267 } else {
1268 None
1269 },
1270 segment_size: match num_datagrams {
1271 1 => None,
1272 _ => Some(segment_size),
1273 },
1274 src_ip: self.local_ip,
1275 })
1276 }
1277
1278 fn send_coordination_request(&mut self, _now: Instant, _buf: &mut Vec<u8>) -> Option<Transmit> {
1280 let nat = self.nat_traversal.as_mut()?;
1282 if !nat.should_send_punch_request() {
1283 return None;
1284 }
1285
1286 let coord = nat.coordination.as_ref()?;
1287 let round = coord.round;
1288 if coord.punch_targets.is_empty() {
1289 return None;
1290 }
1291
1292 trace!(
1293 "queuing PUNCH_ME_NOW round {} with {} targets",
1294 round,
1295 coord.punch_targets.len()
1296 );
1297
1298 for target in &coord.punch_targets {
1300 let punch = frame::PunchMeNow {
1301 round,
1302 paired_with_sequence_number: target.remote_sequence,
1303 address: target.remote_addr,
1304 target_peer_id: None,
1305 };
1306 self.spaces[SpaceId::Data].pending.punch_me_now.push(punch);
1307 }
1308
1309 nat.mark_punch_request_sent();
1311
1312 None
1314 }
1315
1316 fn send_coordinated_path_challenge(
1318 &mut self,
1319 now: Instant,
1320 buf: &mut Vec<u8>,
1321 ) -> Option<Transmit> {
1322 if let Some(nat_traversal) = &mut self.nat_traversal {
1324 if nat_traversal.should_start_punching(now) {
1325 nat_traversal.start_punching_phase(now);
1326 }
1327 }
1328
1329 let (target_addr, challenge) = {
1331 let nat_traversal = self.nat_traversal.as_ref()?;
1332 match nat_traversal.get_coordination_phase() {
1333 Some(CoordinationPhase::Punching) => {
1334 let targets = nat_traversal.get_punch_targets_from_coordination()?;
1335 if targets.is_empty() {
1336 return None;
1337 }
1338 let target = &targets[0];
1340 (target.remote_addr, target.challenge)
1341 }
1342 _ => return None,
1343 }
1344 };
1345
1346 debug_assert_eq!(
1347 self.highest_space,
1348 SpaceId::Data,
1349 "PATH_CHALLENGE queued without 1-RTT keys"
1350 );
1351
1352 buf.reserve(self.pqc_state.min_initial_size() as usize);
1353 let buf_capacity = buf.capacity();
1354
1355 let mut builder = PacketBuilder::new(
1356 now,
1357 SpaceId::Data,
1358 self.rem_cids.active(),
1359 buf,
1360 buf_capacity,
1361 0,
1362 false,
1363 self,
1364 )?;
1365
1366 trace!(
1367 "sending coordinated PATH_CHALLENGE {:08x} to {}",
1368 challenge, target_addr
1369 );
1370 if !self.encode_or_close(
1371 now,
1372 frame::FrameType::PATH_CHALLENGE.try_encode(buf),
1373 "PATH_CHALLENGE (coordination)",
1374 ) {
1375 return None;
1376 }
1377 buf.write(challenge);
1378 self.stats.frame_tx.path_challenge += 1;
1379
1380 let min_size = self.pqc_state.min_initial_size();
1381 builder.pad_to(min_size);
1382 builder.finish_and_track(now, self, None, buf);
1383
1384 if let Some(nat_traversal) = &mut self.nat_traversal {
1386 nat_traversal.mark_coordination_validating();
1387 }
1388
1389 Some(Transmit {
1390 destination: target_addr,
1391 size: buf.len(),
1392 ecn: if self.path.sending_ecn {
1393 Some(EcnCodepoint::Ect0)
1394 } else {
1395 None
1396 },
1397 segment_size: None,
1398 src_ip: self.local_ip,
1399 })
1400 }
1401
1402 fn send_nat_traversal_challenge(
1404 &mut self,
1405 now: Instant,
1406 buf: &mut Vec<u8>,
1407 ) -> Option<Transmit> {
1408 if let Some(request) = self.send_coordination_request(now, buf) {
1410 return Some(request);
1411 }
1412
1413 if let Some(punch) = self.send_coordinated_path_challenge(now, buf) {
1415 return Some(punch);
1416 }
1417
1418 let (remote_addr, remote_sequence) = {
1420 let nat_traversal = self.nat_traversal.as_ref()?;
1421 let candidates = nat_traversal.get_validation_candidates();
1422 if candidates.is_empty() {
1423 return None;
1424 }
1425 let (sequence, candidate) = candidates[0];
1427 (candidate.address, sequence)
1428 };
1429
1430 let challenge = self.rng.r#gen::<u64>();
1431
1432 if let Err(e) =
1434 self.nat_traversal
1435 .as_mut()?
1436 .start_validation(remote_sequence, challenge, now)
1437 {
1438 warn!("Failed to start NAT traversal validation: {}", e);
1439 return None;
1440 }
1441
1442 debug_assert_eq!(
1443 self.highest_space,
1444 SpaceId::Data,
1445 "PATH_CHALLENGE queued without 1-RTT keys"
1446 );
1447
1448 buf.reserve(self.pqc_state.min_initial_size() as usize);
1449 let buf_capacity = buf.capacity();
1450
1451 let mut builder = PacketBuilder::new(
1453 now,
1454 SpaceId::Data,
1455 self.rem_cids.active(),
1456 buf,
1457 buf_capacity,
1458 0,
1459 false,
1460 self,
1461 )?;
1462
1463 trace!(
1464 "sending PATH_CHALLENGE {:08x} to NAT candidate {}",
1465 challenge, remote_addr
1466 );
1467 if !self.encode_or_close(
1468 now,
1469 frame::FrameType::PATH_CHALLENGE.try_encode(buf),
1470 "PATH_CHALLENGE (nat)",
1471 ) {
1472 return None;
1473 }
1474 buf.write(challenge);
1475 self.stats.frame_tx.path_challenge += 1;
1476
1477 let min_size = self.pqc_state.min_initial_size();
1479 builder.pad_to(min_size);
1480
1481 builder.finish_and_track(now, self, None, buf);
1482
1483 Some(Transmit {
1484 destination: remote_addr,
1485 size: buf.len(),
1486 ecn: if self.path.sending_ecn {
1487 Some(EcnCodepoint::Ect0)
1488 } else {
1489 None
1490 },
1491 segment_size: None,
1492 src_ip: self.local_ip,
1493 })
1494 }
1495
1496 fn send_path_challenge(&mut self, now: Instant, buf: &mut Vec<u8>) -> Option<Transmit> {
1498 let (prev_cid, prev_path) = self.prev_path.as_mut()?;
1499 if !prev_path.challenge_pending {
1500 return None;
1501 }
1502 prev_path.challenge_pending = false;
1503 let token = prev_path
1504 .challenge
1505 .expect("previous path challenge pending without token");
1506 let destination = prev_path.remote;
1507 debug_assert_eq!(
1508 self.highest_space,
1509 SpaceId::Data,
1510 "PATH_CHALLENGE queued without 1-RTT keys"
1511 );
1512 buf.reserve(self.pqc_state.min_initial_size() as usize);
1513
1514 let buf_capacity = buf.capacity();
1515
1516 let mut builder = PacketBuilder::new(
1522 now,
1523 SpaceId::Data,
1524 *prev_cid,
1525 buf,
1526 buf_capacity,
1527 0,
1528 false,
1529 self,
1530 )?;
1531 trace!("validating previous path with PATH_CHALLENGE {:08x}", token);
1532 if !self.encode_or_close(
1533 now,
1534 frame::FrameType::PATH_CHALLENGE.try_encode(buf),
1535 "PATH_CHALLENGE (prev path)",
1536 ) {
1537 return None;
1538 }
1539 buf.write(token);
1540 self.stats.frame_tx.path_challenge += 1;
1541
1542 let min_size = self.pqc_state.min_initial_size();
1547 builder.pad_to(min_size);
1548
1549 builder.finish(self, buf);
1550 self.stats.udp_tx.on_sent(1, buf.len());
1551
1552 Some(Transmit {
1553 destination,
1554 size: buf.len(),
1555 ecn: None,
1556 segment_size: None,
1557 src_ip: self.local_ip,
1558 })
1559 }
1560
1561 fn space_can_send(&self, space_id: SpaceId, frame_space_1rtt: usize) -> SendableFrames {
1563 if self.spaces[space_id].crypto.is_none()
1564 && (space_id != SpaceId::Data
1565 || self.zero_rtt_crypto.is_none()
1566 || self.side.is_server())
1567 {
1568 return SendableFrames::empty();
1570 }
1571 let mut can_send = self.spaces[space_id].can_send(&self.streams);
1572 if space_id == SpaceId::Data {
1573 can_send.other |= self.can_send_1rtt(frame_space_1rtt);
1574 }
1575 can_send
1576 }
1577
1578 pub fn handle_event(&mut self, event: ConnectionEvent) {
1584 use ConnectionEventInner::*;
1585 match event.0 {
1586 Datagram(DatagramConnectionEvent {
1587 now,
1588 remote,
1589 ecn,
1590 first_decode,
1591 remaining,
1592 }) => {
1593 if remote != self.path.remote && !self.side.remote_may_migrate() {
1597 trace!("discarding packet from unrecognized peer {}", remote);
1598 return;
1599 }
1600
1601 let was_anti_amplification_blocked = self.path.anti_amplification_blocked(1);
1602
1603 self.stats.udp_rx.datagrams += 1;
1604 self.stats.udp_rx.bytes += first_decode.len() as u64;
1605 let data_len = first_decode.len();
1606
1607 self.handle_decode(now, remote, ecn, first_decode);
1608 self.path.total_recvd = self.path.total_recvd.saturating_add(data_len as u64);
1613
1614 if let Some(data) = remaining {
1615 self.stats.udp_rx.bytes += data.len() as u64;
1616 self.handle_coalesced(now, remote, ecn, data);
1617 }
1618
1619 #[cfg(feature = "__qlog")]
1620 self.emit_qlog_recovery_metrics(now);
1621
1622 if was_anti_amplification_blocked {
1623 self.set_loss_detection_timer(now);
1627 }
1628 }
1629 NewIdentifiers(ids, now) => {
1630 self.local_cid_state.new_cids(&ids, now);
1631 ids.into_iter().rev().for_each(|frame| {
1632 self.spaces[SpaceId::Data].pending.new_cids.push(frame);
1633 });
1634 if self.timers.get(Timer::PushNewCid).is_none_or(|x| x <= now) {
1636 self.reset_cid_retirement();
1637 }
1638 }
1639 QueueAddAddress(add) => {
1640 self.spaces[SpaceId::Data].pending.add_addresses.push(add);
1642 }
1643 QueuePunchMeNow(punch) => {
1644 self.spaces[SpaceId::Data].pending.punch_me_now.push(punch);
1646 }
1647 }
1648 }
1649
1650 pub fn handle_timeout(&mut self, now: Instant) {
1660 for &timer in &Timer::VALUES {
1661 if !self.timers.is_expired(timer, now) {
1662 continue;
1663 }
1664 self.timers.stop(timer);
1665 trace!(timer = ?timer, "timeout");
1666 match timer {
1667 Timer::Close => {
1668 self.state = State::Drained;
1669 self.endpoint_events.push_back(EndpointEventInner::Drained);
1670 }
1671 Timer::Idle => {
1672 self.kill(ConnectionError::TimedOut);
1673 }
1674 Timer::KeepAlive => {
1675 trace!("sending keep-alive");
1676 self.ping();
1677 }
1678 Timer::LossDetection => {
1679 self.on_loss_detection_timeout(now);
1680
1681 #[cfg(feature = "__qlog")]
1682 self.emit_qlog_recovery_metrics(now);
1683 }
1684 Timer::KeyDiscard => {
1685 self.zero_rtt_crypto = None;
1686 self.prev_crypto = None;
1687 }
1688 Timer::PathValidation => {
1689 debug!("path validation failed");
1690 if let Some((_, prev)) = self.prev_path.take() {
1691 self.path = prev;
1692 }
1693 self.path.challenge = None;
1694 self.path.challenge_pending = false;
1695 }
1696 Timer::Pacing => trace!("pacing timer expired"),
1697 Timer::NatTraversal => {
1698 self.handle_nat_traversal_timeout(now);
1699 }
1700 Timer::PushNewCid => {
1701 let num_new_cid = self.local_cid_state.on_cid_timeout().into();
1703 if !self.state.is_closed() {
1704 trace!(
1705 "push a new cid to peer RETIRE_PRIOR_TO field {}",
1706 self.local_cid_state.retire_prior_to()
1707 );
1708 self.endpoint_events
1709 .push_back(EndpointEventInner::NeedIdentifiers(now, num_new_cid));
1710 }
1711 }
1712 Timer::MaxAckDelay => {
1713 trace!("max ack delay reached");
1714 self.spaces[SpaceId::Data]
1716 .pending_acks
1717 .on_max_ack_delay_timeout()
1718 }
1719 }
1720 }
1721 }
1722
1723 pub fn close(&mut self, now: Instant, error_code: VarInt, reason: Bytes) {
1735 self.close_inner(
1736 now,
1737 Close::Application(frame::ApplicationClose { error_code, reason }),
1738 )
1739 }
1740
1741 fn close_inner(&mut self, now: Instant, reason: Close) {
1742 let was_closed = self.state.is_closed();
1743 if !was_closed {
1744 self.close_common();
1745 self.set_close_timer(now);
1746 self.close = true;
1747 self.state = State::Closed(state::Closed { reason });
1748 }
1749 }
1750
1751 pub fn datagrams(&mut self) -> Datagrams<'_> {
1753 Datagrams { conn: self }
1754 }
1755
1756 pub fn stats(&self) -> ConnectionStats {
1758 let mut stats = self.stats;
1759 stats.path.rtt = self.path.rtt.get();
1760 stats.path.cwnd = self.path.congestion.window();
1761 stats.path.current_mtu = self.path.mtud.current_mtu();
1762
1763 stats
1764 }
1765
1766 pub fn nat_traversal_uses_rfc_frame_format(&self) -> bool {
1768 self.nat_traversal_frame_config.use_rfc_format
1769 }
1770
1771 pub fn nat_traversal_accepts_legacy_frame_format(&self) -> bool {
1773 self.nat_traversal_frame_config.accept_legacy
1774 }
1775
1776 pub fn set_token_binding_peer_id(&mut self, pid: PeerId) {
1778 self.peer_id_for_tokens = Some(pid);
1779 }
1780
1781 pub fn set_delay_new_token_until_binding(&mut self, v: bool) {
1783 self.delay_new_token_until_binding = v;
1784 }
1785
1786 pub fn ping(&mut self) {
1790 self.spaces[self.highest_space].ping_pending = true;
1791 }
1792
1793 pub(crate) fn is_pqc(&self) -> bool {
1795 self.pqc_state.using_pqc
1796 }
1797
1798 pub fn force_key_update(&mut self) {
1802 if !self.state.is_established() {
1803 debug!("ignoring forced key update in illegal state");
1804 return;
1805 }
1806 if self.prev_crypto.is_some() {
1807 debug!("ignoring redundant forced key update");
1810 return;
1811 }
1812 self.update_keys(None, false);
1813 }
1814
1815 pub fn crypto_session(&self) -> &dyn crypto::Session {
1817 &*self.crypto
1818 }
1819
1820 pub fn is_handshaking(&self) -> bool {
1825 self.state.is_handshake()
1826 }
1827
1828 pub fn is_closed(&self) -> bool {
1836 self.state.is_closed()
1837 }
1838
1839 pub fn is_drained(&self) -> bool {
1844 self.state.is_drained()
1845 }
1846
1847 pub fn accepted_0rtt(&self) -> bool {
1851 self.accepted_0rtt
1852 }
1853
1854 pub fn has_0rtt(&self) -> bool {
1856 self.zero_rtt_enabled
1857 }
1858
1859 pub fn has_pending_retransmits(&self) -> bool {
1861 !self.spaces[SpaceId::Data].pending.is_empty(&self.streams)
1862 }
1863
1864 pub fn side(&self) -> Side {
1866 self.side.side()
1867 }
1868
1869 pub fn remote_address(&self) -> SocketAddr {
1871 self.path.remote
1872 }
1873
1874 pub fn local_ip(&self) -> Option<IpAddr> {
1884 self.local_ip
1885 }
1886
1887 pub fn rtt(&self) -> Duration {
1889 self.path.rtt.get()
1890 }
1891
1892 pub fn congestion_state(&self) -> &dyn Controller {
1894 self.path.congestion.as_ref()
1895 }
1896
1897 pub fn path_changed(&mut self, now: Instant) {
1908 self.path.reset(now, &self.config);
1909 }
1910
1911 pub fn set_max_concurrent_streams(&mut self, dir: Dir, count: VarInt) {
1916 self.streams.set_max_concurrent(dir, count);
1917 let pending = &mut self.spaces[SpaceId::Data].pending;
1920 self.streams.queue_max_stream_id(pending);
1921 }
1922
1923 pub fn max_concurrent_streams(&self, dir: Dir) -> u64 {
1929 self.streams.max_concurrent(dir)
1930 }
1931
1932 pub fn set_receive_window(&mut self, receive_window: VarInt) {
1934 if self.streams.set_receive_window(receive_window) {
1935 self.spaces[SpaceId::Data].pending.max_data = true;
1936 }
1937 }
1938
1939 pub fn set_address_discovery_enabled(&mut self, enabled: bool) {
1941 if let Some(ref mut state) = self.address_discovery_state {
1942 state.enabled = enabled;
1943 }
1944 }
1945
1946 pub fn address_discovery_enabled(&self) -> bool {
1948 self.address_discovery_state
1949 .as_ref()
1950 .is_some_and(|state| state.enabled)
1951 }
1952
1953 pub fn observed_address(&self) -> Option<SocketAddr> {
1958 self.address_discovery_state
1959 .as_ref()
1960 .and_then(|state| state.get_observed_address(0)) }
1962
1963 pub fn all_observed_addresses(&self) -> Vec<SocketAddr> {
1968 self.address_discovery_state
1969 .as_ref()
1970 .map(|state| state.get_all_received_history())
1971 .unwrap_or_default()
1972 }
1973
1974 #[allow(dead_code)]
1976 pub(crate) fn address_discovery_state(&self) -> Option<&AddressDiscoveryState> {
1977 self.address_discovery_state.as_ref()
1978 }
1979
1980 fn on_ack_received(
1981 &mut self,
1982 now: Instant,
1983 space: SpaceId,
1984 ack: frame::Ack,
1985 ) -> Result<(), TransportError> {
1986 if ack.largest >= self.spaces[space].next_packet_number {
1987 return Err(TransportError::PROTOCOL_VIOLATION("unsent packet acked"));
1988 }
1989 let new_largest = {
1990 let space = &mut self.spaces[space];
1991 if space.largest_acked_packet.is_none_or(|pn| ack.largest > pn) {
1992 space.largest_acked_packet = Some(ack.largest);
1993 if let Some(info) = space.sent_packets.get(&ack.largest) {
1994 space.largest_acked_packet_sent = info.time_sent;
1998 }
1999 true
2000 } else {
2001 false
2002 }
2003 };
2004
2005 let mut newly_acked = ArrayRangeSet::new();
2007 for range in ack.iter() {
2008 self.packet_number_filter.check_ack(space, range.clone())?;
2009 for (&pn, _) in self.spaces[space].sent_packets.range(range) {
2010 newly_acked.insert_one(pn);
2011 }
2012 }
2013
2014 if newly_acked.is_empty() {
2015 return Ok(());
2016 }
2017
2018 let mut ack_eliciting_acked = false;
2019 for packet in newly_acked.elts() {
2020 if let Some(info) = self.spaces[space].take(packet) {
2021 if let Some(acked) = info.largest_acked {
2022 self.spaces[space].pending_acks.subtract_below(acked);
2028 }
2029 ack_eliciting_acked |= info.ack_eliciting;
2030
2031 let mtu_updated = self.path.mtud.on_acked(space, packet, info.size);
2033 if mtu_updated {
2034 self.path
2035 .congestion
2036 .on_mtu_update(self.path.mtud.current_mtu());
2037 }
2038
2039 self.ack_frequency.on_acked(packet);
2041
2042 self.on_packet_acked(now, packet, info);
2043 }
2044 }
2045
2046 self.path.congestion.on_end_acks(
2047 now,
2048 self.path.in_flight.bytes,
2049 self.app_limited,
2050 self.spaces[space].largest_acked_packet,
2051 );
2052
2053 if new_largest && ack_eliciting_acked {
2054 let ack_delay = if space != SpaceId::Data {
2055 Duration::from_micros(0)
2056 } else {
2057 cmp::min(
2058 self.ack_frequency.peer_max_ack_delay,
2059 Duration::from_micros(ack.delay << self.peer_params.ack_delay_exponent.0),
2060 )
2061 };
2062 let rtt = instant_saturating_sub(now, self.spaces[space].largest_acked_packet_sent);
2063 self.path.rtt.update(ack_delay, rtt);
2064 if self.path.first_packet_after_rtt_sample.is_none() {
2065 self.path.first_packet_after_rtt_sample =
2066 Some((space, self.spaces[space].next_packet_number));
2067 }
2068 }
2069
2070 self.detect_lost_packets(now, space, true);
2072
2073 if self.peer_completed_address_validation() {
2074 self.pto_count = 0;
2075 }
2076
2077 if self.path.sending_ecn {
2079 if let Some(ecn) = ack.ecn {
2080 if new_largest {
2085 let sent = self.spaces[space].largest_acked_packet_sent;
2086 self.process_ecn(now, space, newly_acked.len() as u64, ecn, sent);
2087 }
2088 } else {
2089 debug!("ECN not acknowledged by peer");
2091 self.path.sending_ecn = false;
2092 }
2093 }
2094
2095 self.set_loss_detection_timer(now);
2096 Ok(())
2097 }
2098
2099 fn process_ecn(
2101 &mut self,
2102 now: Instant,
2103 space: SpaceId,
2104 newly_acked: u64,
2105 ecn: frame::EcnCounts,
2106 largest_sent_time: Instant,
2107 ) {
2108 match self.spaces[space].detect_ecn(newly_acked, ecn) {
2109 Err(e) => {
2110 debug!("halting ECN due to verification failure: {}", e);
2111 self.path.sending_ecn = false;
2112 self.spaces[space].ecn_feedback = frame::EcnCounts::ZERO;
2115 }
2116 Ok(false) => {}
2117 Ok(true) => {
2118 self.stats.path.congestion_events += 1;
2119 self.path
2120 .congestion
2121 .on_congestion_event(now, largest_sent_time, false, 0);
2122 }
2123 }
2124 }
2125
2126 fn on_packet_acked(&mut self, now: Instant, pn: u64, info: SentPacket) {
2129 self.remove_in_flight(pn, &info);
2130 if info.ack_eliciting && self.path.challenge.is_none() {
2131 self.path.congestion.on_ack(
2134 now,
2135 info.time_sent,
2136 info.size.into(),
2137 self.app_limited,
2138 &self.path.rtt,
2139 );
2140 }
2141
2142 if let Some(retransmits) = info.retransmits.get() {
2144 for (id, _) in retransmits.reset_stream.iter() {
2145 self.streams.reset_acked(*id);
2146 }
2147 }
2148
2149 for frame in info.stream_frames {
2150 self.streams.received_ack_of(frame);
2151 }
2152 }
2153
2154 fn set_key_discard_timer(&mut self, now: Instant, space: SpaceId) {
2155 let start = if self.zero_rtt_crypto.is_some() {
2156 now
2157 } else {
2158 self.prev_crypto
2159 .as_ref()
2160 .expect("no previous keys")
2161 .end_packet
2162 .as_ref()
2163 .expect("update not acknowledged yet")
2164 .1
2165 };
2166 self.timers
2167 .set(Timer::KeyDiscard, start + self.pto(space) * 3);
2168 }
2169
2170 fn on_loss_detection_timeout(&mut self, now: Instant) {
2171 if let Some((_, pn_space)) = self.loss_time_and_space() {
2172 self.detect_lost_packets(now, pn_space, false);
2174 self.set_loss_detection_timer(now);
2175 return;
2176 }
2177
2178 let (_, space) = match self.pto_time_and_space(now) {
2179 Some(x) => x,
2180 None => {
2181 error!("PTO expired while unset");
2182 return;
2183 }
2184 };
2185 trace!(
2186 in_flight = self.path.in_flight.bytes,
2187 count = self.pto_count,
2188 ?space,
2189 "PTO fired"
2190 );
2191
2192 let count = match self.path.in_flight.ack_eliciting {
2193 0 => {
2196 debug_assert!(!self.peer_completed_address_validation());
2197 1
2198 }
2199 _ => 2,
2201 };
2202 self.spaces[space].loss_probes = self.spaces[space].loss_probes.saturating_add(count);
2203 self.pto_count = self.pto_count.saturating_add(1);
2204 self.set_loss_detection_timer(now);
2205 }
2206
2207 fn detect_lost_packets(&mut self, now: Instant, pn_space: SpaceId, due_to_ack: bool) {
2208 let mut lost_packets = Vec::<u64>::new();
2209 let mut lost_mtu_probe = None;
2210 let in_flight_mtu_probe = self.path.mtud.in_flight_mtu_probe();
2211 let rtt = self.path.rtt.conservative();
2212 let loss_delay = cmp::max(rtt.mul_f32(self.config.time_threshold), TIMER_GRANULARITY);
2213
2214 let lost_send_time = now.checked_sub(loss_delay).unwrap();
2216 let largest_acked_packet = self.spaces[pn_space].largest_acked_packet.unwrap();
2217 let packet_threshold = self.config.packet_threshold as u64;
2218 let mut size_of_lost_packets = 0u64;
2219
2220 let congestion_period =
2224 self.pto(SpaceId::Data) * self.config.persistent_congestion_threshold;
2225 let mut persistent_congestion_start: Option<Instant> = None;
2226 let mut prev_packet = None;
2227 let mut in_persistent_congestion = false;
2228
2229 let space = &mut self.spaces[pn_space];
2230 space.loss_time = None;
2231
2232 for (&packet, info) in space.sent_packets.range(0..largest_acked_packet) {
2233 if prev_packet != Some(packet.wrapping_sub(1)) {
2234 persistent_congestion_start = None;
2236 }
2237
2238 if info.time_sent <= lost_send_time || largest_acked_packet >= packet + packet_threshold
2239 {
2240 if Some(packet) == in_flight_mtu_probe {
2241 lost_mtu_probe = in_flight_mtu_probe;
2244 } else {
2245 lost_packets.push(packet);
2246 size_of_lost_packets += info.size as u64;
2247 if info.ack_eliciting && due_to_ack {
2248 match persistent_congestion_start {
2249 Some(start) if info.time_sent - start > congestion_period => {
2252 in_persistent_congestion = true;
2253 }
2254 None if self
2256 .path
2257 .first_packet_after_rtt_sample
2258 .is_some_and(|x| x < (pn_space, packet)) =>
2259 {
2260 persistent_congestion_start = Some(info.time_sent);
2261 }
2262 _ => {}
2263 }
2264 }
2265 }
2266 } else {
2267 let next_loss_time = info.time_sent + loss_delay;
2268 space.loss_time = Some(
2269 space
2270 .loss_time
2271 .map_or(next_loss_time, |x| cmp::min(x, next_loss_time)),
2272 );
2273 persistent_congestion_start = None;
2274 }
2275
2276 prev_packet = Some(packet);
2277 }
2278
2279 if let Some(largest_lost) = lost_packets.last().cloned() {
2281 let old_bytes_in_flight = self.path.in_flight.bytes;
2282 let largest_lost_sent = self.spaces[pn_space].sent_packets[&largest_lost].time_sent;
2283 self.lost_packets += lost_packets.len() as u64;
2284 self.stats.path.lost_packets += lost_packets.len() as u64;
2285 self.stats.path.lost_bytes += size_of_lost_packets;
2286 trace!(
2287 "packets lost: {:?}, bytes lost: {}",
2288 lost_packets, size_of_lost_packets
2289 );
2290
2291 for &packet in &lost_packets {
2292 let info = self.spaces[pn_space].take(packet).unwrap(); self.remove_in_flight(packet, &info);
2294 for frame in info.stream_frames {
2295 self.streams.retransmit(frame);
2296 }
2297 self.spaces[pn_space].pending |= info.retransmits;
2298 self.path.mtud.on_non_probe_lost(packet, info.size);
2299 }
2300
2301 if self.path.mtud.black_hole_detected(now) {
2302 self.stats.path.black_holes_detected += 1;
2303 self.path
2304 .congestion
2305 .on_mtu_update(self.path.mtud.current_mtu());
2306 if let Some(max_datagram_size) = self.datagrams().max_size() {
2307 self.datagrams.drop_oversized(max_datagram_size);
2308 }
2309 }
2310
2311 let lost_ack_eliciting = old_bytes_in_flight != self.path.in_flight.bytes;
2313
2314 if lost_ack_eliciting {
2315 self.stats.path.congestion_events += 1;
2316 self.path.congestion.on_congestion_event(
2317 now,
2318 largest_lost_sent,
2319 in_persistent_congestion,
2320 size_of_lost_packets,
2321 );
2322 }
2323 }
2324
2325 if let Some(packet) = lost_mtu_probe {
2327 let info = self.spaces[SpaceId::Data].take(packet).unwrap(); self.remove_in_flight(packet, &info);
2329 self.path.mtud.on_probe_lost();
2330 self.stats.path.lost_plpmtud_probes += 1;
2331 }
2332 }
2333
2334 fn loss_time_and_space(&self) -> Option<(Instant, SpaceId)> {
2335 SpaceId::iter()
2336 .filter_map(|id| Some((self.spaces[id].loss_time?, id)))
2337 .min_by_key(|&(time, _)| time)
2338 }
2339
2340 fn pto_time_and_space(&self, now: Instant) -> Option<(Instant, SpaceId)> {
2341 let backoff = 2u32.pow(self.pto_count.min(MAX_BACKOFF_EXPONENT));
2342 let mut duration = self.path.rtt.pto_base() * backoff;
2343
2344 if self.path.in_flight.ack_eliciting == 0 {
2345 debug_assert!(!self.peer_completed_address_validation());
2346 let space = match self.highest_space {
2347 SpaceId::Handshake => SpaceId::Handshake,
2348 _ => SpaceId::Initial,
2349 };
2350 return Some((now + duration, space));
2351 }
2352
2353 let mut result = None;
2354 for space in SpaceId::iter() {
2355 if self.spaces[space].in_flight == 0 {
2356 continue;
2357 }
2358 if space == SpaceId::Data {
2359 if self.is_handshaking() {
2361 return result;
2362 }
2363 duration += self.ack_frequency.max_ack_delay_for_pto() * backoff;
2365 }
2366 let last_ack_eliciting = match self.spaces[space].time_of_last_ack_eliciting_packet {
2367 Some(time) => time,
2368 None => continue,
2369 };
2370 let pto = last_ack_eliciting + duration;
2371 if result.is_none_or(|(earliest_pto, _)| pto < earliest_pto) {
2372 result = Some((pto, space));
2373 }
2374 }
2375 result
2376 }
2377
2378 fn peer_completed_address_validation(&self) -> bool {
2379 if self.side.is_server() || self.state.is_closed() {
2380 return true;
2381 }
2382 self.spaces[SpaceId::Handshake]
2385 .largest_acked_packet
2386 .is_some()
2387 || self.spaces[SpaceId::Data].largest_acked_packet.is_some()
2388 || (self.spaces[SpaceId::Data].crypto.is_some()
2389 && self.spaces[SpaceId::Handshake].crypto.is_none())
2390 }
2391
2392 fn set_loss_detection_timer(&mut self, now: Instant) {
2393 if self.state.is_closed() {
2394 return;
2398 }
2399
2400 if let Some((loss_time, _)) = self.loss_time_and_space() {
2401 self.timers.set(Timer::LossDetection, loss_time);
2403 return;
2404 }
2405
2406 if self.path.anti_amplification_blocked(1) {
2407 self.timers.stop(Timer::LossDetection);
2409 return;
2410 }
2411
2412 if self.path.in_flight.ack_eliciting == 0 && self.peer_completed_address_validation() {
2413 self.timers.stop(Timer::LossDetection);
2416 return;
2417 }
2418
2419 if let Some((timeout, _)) = self.pto_time_and_space(now) {
2422 self.timers.set(Timer::LossDetection, timeout);
2423 } else {
2424 self.timers.stop(Timer::LossDetection);
2425 }
2426 }
2427
2428 fn pto(&self, space: SpaceId) -> Duration {
2430 let max_ack_delay = match space {
2431 SpaceId::Initial | SpaceId::Handshake => Duration::ZERO,
2432 SpaceId::Data => self.ack_frequency.max_ack_delay_for_pto(),
2433 };
2434 self.path.rtt.pto_base() + max_ack_delay
2435 }
2436
2437 fn on_packet_authenticated(
2438 &mut self,
2439 now: Instant,
2440 space_id: SpaceId,
2441 ecn: Option<EcnCodepoint>,
2442 packet: Option<u64>,
2443 spin: bool,
2444 is_1rtt: bool,
2445 ) {
2446 self.total_authed_packets += 1;
2447 self.reset_keep_alive(now);
2448 self.reset_idle_timeout(now, space_id);
2449 self.permit_idle_reset = true;
2450 self.receiving_ecn |= ecn.is_some();
2451 if let Some(x) = ecn {
2452 let space = &mut self.spaces[space_id];
2453 space.ecn_counters += x;
2454
2455 if x.is_ce() {
2456 space.pending_acks.set_immediate_ack_required();
2457 }
2458 }
2459
2460 let packet = match packet {
2461 Some(x) => x,
2462 None => return,
2463 };
2464 if self.side.is_server() {
2465 if self.spaces[SpaceId::Initial].crypto.is_some() && space_id == SpaceId::Handshake {
2466 self.discard_space(now, SpaceId::Initial);
2468 }
2469 if self.zero_rtt_crypto.is_some() && is_1rtt {
2470 self.set_key_discard_timer(now, space_id)
2472 }
2473 }
2474 let space = &mut self.spaces[space_id];
2475 space.pending_acks.insert_one(packet, now);
2476 if packet >= space.rx_packet {
2477 space.rx_packet = packet;
2478 self.spin = self.side.is_client() ^ spin;
2480 }
2481 }
2482
2483 fn reset_idle_timeout(&mut self, now: Instant, space: SpaceId) {
2484 let timeout = match self.idle_timeout {
2485 None => return,
2486 Some(dur) => dur,
2487 };
2488 if self.state.is_closed() {
2489 self.timers.stop(Timer::Idle);
2490 return;
2491 }
2492 let dt = cmp::max(timeout, 3 * self.pto(space));
2493 self.timers.set(Timer::Idle, now + dt);
2494 }
2495
2496 fn reset_keep_alive(&mut self, now: Instant) {
2497 let interval = match self.config.keep_alive_interval {
2498 Some(x) if self.state.is_established() => x,
2499 _ => return,
2500 };
2501 self.timers.set(Timer::KeepAlive, now + interval);
2502 }
2503
2504 fn reset_cid_retirement(&mut self) {
2505 if let Some(t) = self.local_cid_state.next_timeout() {
2506 self.timers.set(Timer::PushNewCid, t);
2507 }
2508 }
2509
2510 pub(crate) fn handle_first_packet(
2515 &mut self,
2516 now: Instant,
2517 remote: SocketAddr,
2518 ecn: Option<EcnCodepoint>,
2519 packet_number: u64,
2520 packet: InitialPacket,
2521 remaining: Option<BytesMut>,
2522 ) -> Result<(), ConnectionError> {
2523 let span = trace_span!("first recv");
2524 let _guard = span.enter();
2525 debug_assert!(self.side.is_server());
2526 let len = packet.header_data.len() + packet.payload.len();
2527 self.path.total_recvd = len as u64;
2528
2529 match self.state {
2530 State::Handshake(ref mut state) => {
2531 state.expected_token = packet.header.token.clone();
2532 }
2533 _ => unreachable!("first packet must be delivered in Handshake state"),
2534 }
2535
2536 self.on_packet_authenticated(
2537 now,
2538 SpaceId::Initial,
2539 ecn,
2540 Some(packet_number),
2541 false,
2542 false,
2543 );
2544
2545 self.process_decrypted_packet(now, remote, Some(packet_number), packet.into())?;
2546 if let Some(data) = remaining {
2547 self.handle_coalesced(now, remote, ecn, data);
2548 }
2549
2550 #[cfg(feature = "__qlog")]
2551 self.emit_qlog_recovery_metrics(now);
2552
2553 Ok(())
2554 }
2555
2556 fn init_0rtt(&mut self) {
2557 let (header, packet) = match self.crypto.early_crypto() {
2558 Some(x) => x,
2559 None => return,
2560 };
2561 if self.side.is_client() {
2562 match self.crypto.transport_parameters() {
2563 Ok(params) => {
2564 let params = params
2565 .expect("crypto layer didn't supply transport parameters with ticket");
2566 let params = TransportParameters {
2568 initial_src_cid: None,
2569 original_dst_cid: None,
2570 preferred_address: None,
2571 retry_src_cid: None,
2572 stateless_reset_token: None,
2573 min_ack_delay: None,
2574 ack_delay_exponent: TransportParameters::default().ack_delay_exponent,
2575 max_ack_delay: TransportParameters::default().max_ack_delay,
2576 ..params
2577 };
2578 self.set_peer_params(params);
2579 }
2580 Err(e) => {
2581 error!("session ticket has malformed transport parameters: {}", e);
2582 return;
2583 }
2584 }
2585 }
2586 trace!("0-RTT enabled");
2587 self.zero_rtt_enabled = true;
2588 self.zero_rtt_crypto = Some(ZeroRttCrypto { header, packet });
2589 }
2590
2591 fn read_crypto(
2592 &mut self,
2593 space: SpaceId,
2594 crypto: &frame::Crypto,
2595 payload_len: usize,
2596 ) -> Result<(), TransportError> {
2597 let expected = if !self.state.is_handshake() {
2598 SpaceId::Data
2599 } else if self.highest_space == SpaceId::Initial {
2600 SpaceId::Initial
2601 } else {
2602 SpaceId::Handshake
2605 };
2606 debug_assert!(space <= expected, "received out-of-order CRYPTO data");
2610
2611 let end = crypto.offset + crypto.data.len() as u64;
2612 if space < expected && end > self.spaces[space].crypto_stream.bytes_read() {
2613 warn!(
2614 "received new {:?} CRYPTO data when expecting {:?}",
2615 space, expected
2616 );
2617 return Err(TransportError::PROTOCOL_VIOLATION(
2618 "new data at unexpected encryption level",
2619 ));
2620 }
2621
2622 self.pqc_state.detect_pqc_from_crypto(&crypto.data, space);
2624
2625 if self.pqc_state.should_trigger_mtu_discovery() {
2627 self.path
2629 .mtud
2630 .reset(self.pqc_state.min_initial_size(), self.config.min_mtu);
2631 trace!("Triggered MTU discovery for PQC handshake");
2632 }
2633
2634 let space = &mut self.spaces[space];
2635 let max = end.saturating_sub(space.crypto_stream.bytes_read());
2636 if max > self.config.crypto_buffer_size as u64 {
2637 return Err(TransportError::CRYPTO_BUFFER_EXCEEDED(""));
2638 }
2639
2640 space
2641 .crypto_stream
2642 .insert(crypto.offset, crypto.data.clone(), payload_len);
2643 while let Some(chunk) = space.crypto_stream.read(usize::MAX, true) {
2644 trace!("consumed {} CRYPTO bytes", chunk.bytes.len());
2645 if self.crypto.read_handshake(&chunk.bytes)? {
2646 self.events.push_back(Event::HandshakeDataReady);
2647 }
2648 }
2649
2650 Ok(())
2651 }
2652
2653 fn write_crypto(&mut self) {
2654 loop {
2655 let space = self.highest_space;
2656 let mut outgoing = Vec::new();
2657 if let Some(crypto) = self.crypto.write_handshake(&mut outgoing) {
2658 match space {
2659 SpaceId::Initial => {
2660 self.upgrade_crypto(SpaceId::Handshake, crypto);
2661 }
2662 SpaceId::Handshake => {
2663 self.upgrade_crypto(SpaceId::Data, crypto);
2664 }
2665 _ => unreachable!("got updated secrets during 1-RTT"),
2666 }
2667 }
2668 if outgoing.is_empty() {
2669 if space == self.highest_space {
2670 break;
2671 } else {
2672 continue;
2674 }
2675 }
2676 let offset = self.spaces[space].crypto_offset;
2677 let outgoing = Bytes::from(outgoing);
2678 if let State::Handshake(ref mut state) = self.state {
2679 if space == SpaceId::Initial && offset == 0 && self.side.is_client() {
2680 state.client_hello = Some(outgoing.clone());
2681 }
2682 }
2683 self.spaces[space].crypto_offset += outgoing.len() as u64;
2684 trace!("wrote {} {:?} CRYPTO bytes", outgoing.len(), space);
2685
2686 let use_pqc_fragmentation = self.pqc_state.using_pqc && outgoing.len() > 1200;
2688
2689 if use_pqc_fragmentation {
2690 let frames = self.pqc_state.packet_handler.fragment_crypto_data(
2692 &outgoing,
2693 offset,
2694 self.pqc_state.min_initial_size() as usize,
2695 );
2696 for frame in frames {
2697 self.spaces[space].pending.crypto.push_back(frame);
2698 }
2699 } else {
2700 self.spaces[space].pending.crypto.push_back(frame::Crypto {
2702 offset,
2703 data: outgoing,
2704 });
2705 }
2706 }
2707 }
2708
2709 fn upgrade_crypto(&mut self, space: SpaceId, crypto: Keys) {
2711 debug_assert!(
2712 self.spaces[space].crypto.is_none(),
2713 "already reached packet space {space:?}"
2714 );
2715 trace!("{:?} keys ready", space);
2716 if space == SpaceId::Data {
2717 self.next_crypto = Some(
2719 self.crypto
2720 .next_1rtt_keys()
2721 .expect("handshake should be complete"),
2722 );
2723 }
2724
2725 self.spaces[space].crypto = Some(crypto);
2726 debug_assert!(space as usize > self.highest_space as usize);
2727 self.highest_space = space;
2728 if space == SpaceId::Data && self.side.is_client() {
2729 self.zero_rtt_crypto = None;
2731 }
2732 }
2733
2734 fn discard_space(&mut self, now: Instant, space_id: SpaceId) {
2735 debug_assert!(space_id != SpaceId::Data);
2736 trace!("discarding {:?} keys", space_id);
2737 if space_id == SpaceId::Initial {
2738 if let ConnectionSide::Client { token, .. } = &mut self.side {
2740 *token = Bytes::new();
2741 }
2742 }
2743 let space = &mut self.spaces[space_id];
2744 space.crypto = None;
2745 space.time_of_last_ack_eliciting_packet = None;
2746 space.loss_time = None;
2747 space.in_flight = 0;
2748 let sent_packets = mem::take(&mut space.sent_packets);
2749 for (pn, packet) in sent_packets.into_iter() {
2750 self.remove_in_flight(pn, &packet);
2751 }
2752 self.set_loss_detection_timer(now)
2753 }
2754
2755 fn handle_coalesced(
2756 &mut self,
2757 now: Instant,
2758 remote: SocketAddr,
2759 ecn: Option<EcnCodepoint>,
2760 data: BytesMut,
2761 ) {
2762 self.path.total_recvd = self.path.total_recvd.saturating_add(data.len() as u64);
2763 let mut remaining = Some(data);
2764 while let Some(data) = remaining {
2765 match PartialDecode::new(
2766 data,
2767 &FixedLengthConnectionIdParser::new(self.local_cid_state.cid_len()),
2768 &[self.version],
2769 self.endpoint_config.grease_quic_bit,
2770 ) {
2771 Ok((partial_decode, rest)) => {
2772 remaining = rest;
2773 self.handle_decode(now, remote, ecn, partial_decode);
2774 }
2775 Err(e) => {
2776 trace!("malformed header: {}", e);
2777 return;
2778 }
2779 }
2780 }
2781 }
2782
2783 fn handle_decode(
2784 &mut self,
2785 now: Instant,
2786 remote: SocketAddr,
2787 ecn: Option<EcnCodepoint>,
2788 partial_decode: PartialDecode,
2789 ) {
2790 if let Some(decoded) = packet_crypto::unprotect_header(
2791 partial_decode,
2792 &self.spaces,
2793 self.zero_rtt_crypto.as_ref(),
2794 self.peer_params.stateless_reset_token,
2795 ) {
2796 self.handle_packet(now, remote, ecn, decoded.packet, decoded.stateless_reset);
2797 }
2798 }
2799
2800 fn handle_packet(
2801 &mut self,
2802 now: Instant,
2803 remote: SocketAddr,
2804 ecn: Option<EcnCodepoint>,
2805 packet: Option<Packet>,
2806 stateless_reset: bool,
2807 ) {
2808 self.stats.udp_rx.ios += 1;
2809 if let Some(ref packet) = packet {
2810 trace!(
2811 "got {:?} packet ({} bytes) from {} using id {}",
2812 packet.header.space(),
2813 packet.payload.len() + packet.header_data.len(),
2814 remote,
2815 packet.header.dst_cid(),
2816 );
2817
2818 #[cfg(feature = "trace")]
2820 {
2821 use crate::trace_packet_received;
2822 let packet_size = packet.payload.len() + packet.header_data.len();
2824 trace_packet_received!(
2825 &self.event_log,
2826 self.trace_context.trace_id(),
2827 packet_size as u32,
2828 0 );
2830 }
2831 }
2832
2833 if self.is_handshaking() && remote != self.path.remote {
2834 debug!("discarding packet with unexpected remote during handshake");
2835 return;
2836 }
2837
2838 let was_closed = self.state.is_closed();
2839 let was_drained = self.state.is_drained();
2840
2841 let decrypted = match packet {
2842 None => Err(None),
2843 Some(mut packet) => self
2844 .decrypt_packet(now, &mut packet)
2845 .map(move |number| (packet, number)),
2846 };
2847 let result = match decrypted {
2848 _ if stateless_reset => {
2849 debug!("got stateless reset");
2850 Err(ConnectionError::Reset)
2851 }
2852 Err(Some(e)) => {
2853 warn!("illegal packet: {}", e);
2854 Err(e.into())
2855 }
2856 Err(None) => {
2857 debug!("failed to authenticate packet");
2858 self.authentication_failures += 1;
2859 let integrity_limit = self.spaces[self.highest_space]
2860 .crypto
2861 .as_ref()
2862 .unwrap()
2863 .packet
2864 .local
2865 .integrity_limit();
2866 if self.authentication_failures > integrity_limit {
2867 Err(TransportError::AEAD_LIMIT_REACHED("integrity limit violated").into())
2868 } else {
2869 return;
2870 }
2871 }
2872 Ok((packet, number)) => {
2873 let span = match number {
2874 Some(pn) => trace_span!("recv", space = ?packet.header.space(), pn),
2875 None => trace_span!("recv", space = ?packet.header.space()),
2876 };
2877 let _guard = span.enter();
2878
2879 let is_duplicate = |n| self.spaces[packet.header.space()].dedup.insert(n);
2880 if number.is_some_and(is_duplicate) {
2881 debug!("discarding possible duplicate packet");
2882 return;
2883 } else if self.state.is_handshake() && packet.header.is_short() {
2884 trace!("dropping short packet during handshake");
2886 return;
2887 } else {
2888 if let Header::Initial(InitialHeader { ref token, .. }) = packet.header {
2889 if let State::Handshake(ref hs) = self.state {
2890 if self.side.is_server() && token != &hs.expected_token {
2891 warn!("discarding Initial with invalid retry token");
2895 return;
2896 }
2897 }
2898 }
2899
2900 if !self.state.is_closed() {
2901 let spin = match packet.header {
2902 Header::Short { spin, .. } => spin,
2903 _ => false,
2904 };
2905 self.on_packet_authenticated(
2906 now,
2907 packet.header.space(),
2908 ecn,
2909 number,
2910 spin,
2911 packet.header.is_1rtt(),
2912 );
2913 }
2914
2915 self.process_decrypted_packet(now, remote, number, packet)
2916 }
2917 }
2918 };
2919
2920 if let Err(conn_err) = result {
2922 self.error = Some(conn_err.clone());
2923 self.state = match conn_err {
2924 ConnectionError::ApplicationClosed(reason) => State::closed(reason),
2925 ConnectionError::ConnectionClosed(reason) => State::closed(reason),
2926 ConnectionError::Reset
2927 | ConnectionError::TransportError(TransportError {
2928 code: TransportErrorCode::AEAD_LIMIT_REACHED,
2929 ..
2930 }) => State::Drained,
2931 ConnectionError::TimedOut => {
2932 unreachable!("timeouts aren't generated by packet processing");
2933 }
2934 ConnectionError::TransportError(err) => {
2935 debug!("closing connection due to transport error: {}", err);
2936 State::closed(err)
2937 }
2938 ConnectionError::VersionMismatch => State::Draining,
2939 ConnectionError::LocallyClosed => {
2940 unreachable!("LocallyClosed isn't generated by packet processing");
2941 }
2942 ConnectionError::CidsExhausted => {
2943 unreachable!("CidsExhausted isn't generated by packet processing");
2944 }
2945 };
2946 }
2947
2948 if !was_closed && self.state.is_closed() {
2949 self.close_common();
2950 if !self.state.is_drained() {
2951 self.set_close_timer(now);
2952 }
2953 }
2954 if !was_drained && self.state.is_drained() {
2955 self.endpoint_events.push_back(EndpointEventInner::Drained);
2956 self.timers.stop(Timer::Close);
2959 }
2960
2961 if let State::Closed(_) = self.state {
2963 self.close = remote == self.path.remote;
2964 }
2965 }
2966
2967 fn process_decrypted_packet(
2968 &mut self,
2969 now: Instant,
2970 remote: SocketAddr,
2971 number: Option<u64>,
2972 packet: Packet,
2973 ) -> Result<(), ConnectionError> {
2974 let state = match self.state {
2975 State::Established => {
2976 match packet.header.space() {
2977 SpaceId::Data => self.process_payload(now, remote, number.unwrap(), packet)?,
2978 _ if packet.header.has_frames() => self.process_early_payload(now, packet)?,
2979 _ => {
2980 trace!("discarding unexpected pre-handshake packet");
2981 }
2982 }
2983 return Ok(());
2984 }
2985 State::Closed(_) => {
2986 for result in frame::Iter::new(packet.payload.freeze())? {
2987 let frame = match result {
2988 Ok(frame) => frame,
2989 Err(err) => {
2990 debug!("frame decoding error: {err:?}");
2991 continue;
2992 }
2993 };
2994
2995 if let Frame::Padding = frame {
2996 continue;
2997 };
2998
2999 self.stats.frame_rx.record(&frame);
3000
3001 if let Frame::Close(_) = frame {
3002 trace!("draining");
3003 self.state = State::Draining;
3004 break;
3005 }
3006 }
3007 return Ok(());
3008 }
3009 State::Draining | State::Drained => return Ok(()),
3010 State::Handshake(ref mut state) => state,
3011 };
3012
3013 match packet.header {
3014 Header::Retry {
3015 src_cid: rem_cid, ..
3016 } => {
3017 if self.side.is_server() {
3018 return Err(TransportError::PROTOCOL_VIOLATION("client sent Retry").into());
3019 }
3020
3021 if self.total_authed_packets > 1
3022 || packet.payload.len() <= 16 || !self.crypto.is_valid_retry(
3024 &self.rem_cids.active(),
3025 &packet.header_data,
3026 &packet.payload,
3027 )
3028 {
3029 trace!("discarding invalid Retry");
3030 return Ok(());
3038 }
3039
3040 trace!("retrying with CID {}", rem_cid);
3041 let client_hello = state.client_hello.take().unwrap();
3042 self.retry_src_cid = Some(rem_cid);
3043 self.rem_cids.update_initial_cid(rem_cid);
3044 self.rem_handshake_cid = rem_cid;
3045
3046 let space = &mut self.spaces[SpaceId::Initial];
3047 if let Some(info) = space.take(0) {
3048 self.on_packet_acked(now, 0, info);
3049 };
3050
3051 self.discard_space(now, SpaceId::Initial); self.spaces[SpaceId::Initial] = PacketSpace {
3053 crypto: Some(self.crypto.initial_keys(&rem_cid, self.side.side())),
3054 next_packet_number: self.spaces[SpaceId::Initial].next_packet_number,
3055 crypto_offset: client_hello.len() as u64,
3056 ..PacketSpace::new(now)
3057 };
3058 self.spaces[SpaceId::Initial]
3059 .pending
3060 .crypto
3061 .push_back(frame::Crypto {
3062 offset: 0,
3063 data: client_hello,
3064 });
3065
3066 let zero_rtt = mem::take(&mut self.spaces[SpaceId::Data].sent_packets);
3068 for (pn, info) in zero_rtt {
3069 self.remove_in_flight(pn, &info);
3070 self.spaces[SpaceId::Data].pending |= info.retransmits;
3071 }
3072 self.streams.retransmit_all_for_0rtt();
3073
3074 let token_len = packet.payload.len() - 16;
3075 let ConnectionSide::Client { ref mut token, .. } = self.side else {
3076 unreachable!("we already short-circuited if we're server");
3077 };
3078 *token = packet.payload.freeze().split_to(token_len);
3079 self.state = State::Handshake(state::Handshake {
3080 expected_token: Bytes::new(),
3081 rem_cid_set: false,
3082 client_hello: None,
3083 });
3084 Ok(())
3085 }
3086 Header::Long {
3087 ty: LongType::Handshake,
3088 src_cid: rem_cid,
3089 ..
3090 } => {
3091 if rem_cid != self.rem_handshake_cid {
3092 debug!(
3093 "discarding packet with mismatched remote CID: {} != {}",
3094 self.rem_handshake_cid, rem_cid
3095 );
3096 return Ok(());
3097 }
3098 self.on_path_validated();
3099
3100 self.process_early_payload(now, packet)?;
3101 if self.state.is_closed() {
3102 return Ok(());
3103 }
3104
3105 if self.crypto.is_handshaking() {
3106 trace!("handshake ongoing");
3107 return Ok(());
3108 }
3109
3110 if self.side.is_client() {
3111 let params =
3113 self.crypto
3114 .transport_parameters()?
3115 .ok_or_else(|| TransportError {
3116 code: TransportErrorCode::crypto(0x6d),
3117 frame: None,
3118 reason: "transport parameters missing".into(),
3119 })?;
3120
3121 if self.has_0rtt() {
3122 if !self.crypto.early_data_accepted().unwrap() {
3123 debug_assert!(self.side.is_client());
3124 debug!("0-RTT rejected");
3125 self.accepted_0rtt = false;
3126 self.streams.zero_rtt_rejected();
3127
3128 self.spaces[SpaceId::Data].pending = Retransmits::default();
3130
3131 let sent_packets =
3133 mem::take(&mut self.spaces[SpaceId::Data].sent_packets);
3134 for (pn, packet) in sent_packets {
3135 self.remove_in_flight(pn, &packet);
3136 }
3137 } else {
3138 self.accepted_0rtt = true;
3139 params.validate_resumption_from(&self.peer_params)?;
3140 }
3141 }
3142 if let Some(token) = params.stateless_reset_token {
3143 self.endpoint_events
3144 .push_back(EndpointEventInner::ResetToken(self.path.remote, token));
3145 }
3146 self.handle_peer_params(params)?;
3147 self.issue_first_cids(now);
3148 } else {
3149 self.spaces[SpaceId::Data].pending.handshake_done = true;
3151 self.discard_space(now, SpaceId::Handshake);
3152 }
3153
3154 self.events.push_back(Event::Connected);
3155 self.state = State::Established;
3156 trace!("established");
3157 Ok(())
3158 }
3159 Header::Initial(InitialHeader {
3160 src_cid: rem_cid, ..
3161 }) => {
3162 if !state.rem_cid_set {
3163 trace!("switching remote CID to {}", rem_cid);
3164 let mut state = state.clone();
3165 self.rem_cids.update_initial_cid(rem_cid);
3166 self.rem_handshake_cid = rem_cid;
3167 self.orig_rem_cid = rem_cid;
3168 state.rem_cid_set = true;
3169 self.state = State::Handshake(state);
3170 } else if rem_cid != self.rem_handshake_cid {
3171 debug!(
3172 "discarding packet with mismatched remote CID: {} != {}",
3173 self.rem_handshake_cid, rem_cid
3174 );
3175 return Ok(());
3176 }
3177
3178 let starting_space = self.highest_space;
3179 self.process_early_payload(now, packet)?;
3180
3181 if self.side.is_server()
3182 && starting_space == SpaceId::Initial
3183 && self.highest_space != SpaceId::Initial
3184 {
3185 let params =
3186 self.crypto
3187 .transport_parameters()?
3188 .ok_or_else(|| TransportError {
3189 code: TransportErrorCode::crypto(0x6d),
3190 frame: None,
3191 reason: "transport parameters missing".into(),
3192 })?;
3193 self.handle_peer_params(params)?;
3194 self.issue_first_cids(now);
3195 self.init_0rtt();
3196 }
3197 Ok(())
3198 }
3199 Header::Long {
3200 ty: LongType::ZeroRtt,
3201 ..
3202 } => {
3203 self.process_payload(now, remote, number.unwrap(), packet)?;
3204 Ok(())
3205 }
3206 Header::VersionNegotiate { .. } => {
3207 if self.total_authed_packets > 1 {
3208 return Ok(());
3209 }
3210 let supported = packet
3211 .payload
3212 .chunks(4)
3213 .any(|x| match <[u8; 4]>::try_from(x) {
3214 Ok(version) => self.version == u32::from_be_bytes(version),
3215 Err(_) => false,
3216 });
3217 if supported {
3218 return Ok(());
3219 }
3220 debug!("remote doesn't support our version");
3221 Err(ConnectionError::VersionMismatch)
3222 }
3223 Header::Short { .. } => unreachable!(
3224 "short packets received during handshake are discarded in handle_packet"
3225 ),
3226 }
3227 }
3228
3229 fn process_early_payload(
3231 &mut self,
3232 now: Instant,
3233 packet: Packet,
3234 ) -> Result<(), TransportError> {
3235 debug_assert_ne!(packet.header.space(), SpaceId::Data);
3236 let payload_len = packet.payload.len();
3237 let mut ack_eliciting = false;
3238 for result in frame::Iter::new(packet.payload.freeze())? {
3239 let frame = result?;
3240 let span = match frame {
3241 Frame::Padding => continue,
3242 _ => Some(trace_span!("frame", ty = %frame.ty())),
3243 };
3244
3245 self.stats.frame_rx.record(&frame);
3246
3247 let _guard = span.as_ref().map(|x| x.enter());
3248 ack_eliciting |= frame.is_ack_eliciting();
3249
3250 match frame {
3252 Frame::Padding | Frame::Ping => {}
3253 Frame::Crypto(frame) => {
3254 self.read_crypto(packet.header.space(), &frame, payload_len)?;
3255 }
3256 Frame::Ack(ack) => {
3257 self.on_ack_received(now, packet.header.space(), ack)?;
3258 }
3259 Frame::Close(reason) => {
3260 self.error = Some(reason.into());
3261 self.state = State::Draining;
3262 return Ok(());
3263 }
3264 _ => {
3265 let mut err =
3266 TransportError::PROTOCOL_VIOLATION("illegal frame type in handshake");
3267 err.frame = Some(frame.ty());
3268 return Err(err);
3269 }
3270 }
3271 }
3272
3273 if ack_eliciting {
3274 self.spaces[packet.header.space()]
3276 .pending_acks
3277 .set_immediate_ack_required();
3278 }
3279
3280 self.write_crypto();
3281 Ok(())
3282 }
3283
3284 fn process_payload(
3285 &mut self,
3286 now: Instant,
3287 remote: SocketAddr,
3288 number: u64,
3289 packet: Packet,
3290 ) -> Result<(), TransportError> {
3291 let payload = packet.payload.freeze();
3292 let mut is_probing_packet = true;
3293 let mut close = None;
3294 let payload_len = payload.len();
3295 let mut ack_eliciting = false;
3296 for result in frame::Iter::new(payload)? {
3297 let frame = result?;
3298 let span = match frame {
3299 Frame::Padding => continue,
3300 _ => Some(trace_span!("frame", ty = %frame.ty())),
3301 };
3302
3303 self.stats.frame_rx.record(&frame);
3304 match &frame {
3307 Frame::Crypto(f) => {
3308 trace!(offset = f.offset, len = f.data.len(), "got crypto frame");
3309 }
3310 Frame::Stream(f) => {
3311 trace!(id = %f.id, offset = f.offset, len = f.data.len(), fin = f.fin, "got stream frame");
3312 }
3313 Frame::Datagram(f) => {
3314 trace!(len = f.data.len(), "got datagram frame");
3315 }
3316 f => {
3317 trace!("got frame {:?}", f);
3318 }
3319 }
3320
3321 let _guard = span.as_ref().map(|x| x.enter());
3322 if packet.header.is_0rtt() {
3323 match frame {
3324 Frame::Crypto(_) | Frame::Close(Close::Application(_)) => {
3325 return Err(TransportError::PROTOCOL_VIOLATION(
3326 "illegal frame type in 0-RTT",
3327 ));
3328 }
3329 _ => {}
3330 }
3331 }
3332 ack_eliciting |= frame.is_ack_eliciting();
3333
3334 match frame {
3336 Frame::Padding
3337 | Frame::PathChallenge(_)
3338 | Frame::PathResponse(_)
3339 | Frame::NewConnectionId(_) => {}
3340 _ => {
3341 is_probing_packet = false;
3342 }
3343 }
3344 match frame {
3345 Frame::Crypto(frame) => {
3346 self.read_crypto(SpaceId::Data, &frame, payload_len)?;
3347 }
3348 Frame::Stream(frame) => {
3349 if self.streams.received(frame, payload_len)?.should_transmit() {
3350 self.spaces[SpaceId::Data].pending.max_data = true;
3351 }
3352 }
3353 Frame::Ack(ack) => {
3354 self.on_ack_received(now, SpaceId::Data, ack)?;
3355 }
3356 Frame::Padding | Frame::Ping => {}
3357 Frame::Close(reason) => {
3358 close = Some(reason);
3359 }
3360 Frame::PathChallenge(token) => {
3361 self.path_responses.push(number, token, remote);
3362 if remote == self.path.remote {
3363 match self.peer_supports_ack_frequency() {
3366 true => self.immediate_ack(),
3367 false => self.ping(),
3368 }
3369 }
3370 }
3371 Frame::PathResponse(token) => {
3372 if self.path.challenge == Some(token) && remote == self.path.remote {
3373 trace!("new path validated");
3374 self.timers.stop(Timer::PathValidation);
3375 self.path.challenge = None;
3376 self.path.validated = true;
3377 if let Some((_, ref mut prev_path)) = self.prev_path {
3378 prev_path.challenge = None;
3379 prev_path.challenge_pending = false;
3380 }
3381 self.on_path_validated();
3382 } else if let Some(nat_traversal) = &mut self.nat_traversal {
3383 match nat_traversal.handle_validation_success(remote, token, now) {
3385 Ok(sequence) => {
3386 trace!(
3387 "NAT traversal candidate {} validated for sequence {}",
3388 remote, sequence
3389 );
3390
3391 if nat_traversal.handle_coordination_success(remote, now) {
3393 trace!("Coordination succeeded via {}", remote);
3394
3395 let can_migrate = match &self.side {
3397 ConnectionSide::Client { .. } => true, ConnectionSide::Server { server_config } => {
3399 server_config.migration
3400 }
3401 };
3402
3403 if can_migrate {
3404 let best_pairs = nat_traversal.get_best_succeeded_pairs();
3406 if let Some(best) = best_pairs.first() {
3407 if best.remote_addr == remote
3408 && best.remote_addr != self.path.remote
3409 {
3410 debug!(
3411 "NAT traversal found better path, initiating migration"
3412 );
3413 if let Err(e) =
3415 self.migrate_to_nat_traversal_path(now)
3416 {
3417 warn!(
3418 "Failed to migrate to NAT traversal path: {:?}",
3419 e
3420 );
3421 }
3422 }
3423 }
3424 }
3425 } else {
3426 if nat_traversal.mark_pair_succeeded(remote) {
3428 trace!("NAT traversal pair succeeded for {}", remote);
3429 }
3430 }
3431 }
3432 Err(NatTraversalError::ChallengeMismatch) => {
3433 debug!(
3434 "PATH_RESPONSE challenge mismatch for NAT candidate {}",
3435 remote
3436 );
3437 }
3438 Err(e) => {
3439 debug!("NAT traversal validation error: {}", e);
3440 }
3441 }
3442 } else {
3443 debug!(token, "ignoring invalid PATH_RESPONSE");
3444 }
3445 }
3446 Frame::MaxData(bytes) => {
3447 self.streams.received_max_data(bytes);
3448 }
3449 Frame::MaxStreamData { id, offset } => {
3450 self.streams.received_max_stream_data(id, offset)?;
3451 }
3452 Frame::MaxStreams { dir, count } => {
3453 self.streams.received_max_streams(dir, count)?;
3454 }
3455 Frame::ResetStream(frame) => {
3456 if self.streams.received_reset(frame)?.should_transmit() {
3457 self.spaces[SpaceId::Data].pending.max_data = true;
3458 }
3459 }
3460 Frame::DataBlocked { offset } => {
3461 debug!(offset, "peer claims to be blocked at connection level");
3462 }
3463 Frame::StreamDataBlocked { id, offset } => {
3464 if id.initiator() == self.side.side() && id.dir() == Dir::Uni {
3465 debug!("got STREAM_DATA_BLOCKED on send-only {}", id);
3466 return Err(TransportError::STREAM_STATE_ERROR(
3467 "STREAM_DATA_BLOCKED on send-only stream",
3468 ));
3469 }
3470 debug!(
3471 stream = %id,
3472 offset, "peer claims to be blocked at stream level"
3473 );
3474 }
3475 Frame::StreamsBlocked { dir, limit } => {
3476 if limit > MAX_STREAM_COUNT {
3477 return Err(TransportError::FRAME_ENCODING_ERROR(
3478 "unrepresentable stream limit",
3479 ));
3480 }
3481 debug!(
3482 "peer claims to be blocked opening more than {} {} streams",
3483 limit, dir
3484 );
3485 }
3486 Frame::StopSending(frame::StopSending { id, error_code }) => {
3487 if id.initiator() != self.side.side() {
3488 if id.dir() == Dir::Uni {
3489 debug!("got STOP_SENDING on recv-only {}", id);
3490 return Err(TransportError::STREAM_STATE_ERROR(
3491 "STOP_SENDING on recv-only stream",
3492 ));
3493 }
3494 } else if self.streams.is_local_unopened(id) {
3495 return Err(TransportError::STREAM_STATE_ERROR(
3496 "STOP_SENDING on unopened stream",
3497 ));
3498 }
3499 self.streams.received_stop_sending(id, error_code);
3500 }
3501 Frame::RetireConnectionId { sequence } => {
3502 let allow_more_cids = self
3503 .local_cid_state
3504 .on_cid_retirement(sequence, self.peer_params.issue_cids_limit())?;
3505 self.endpoint_events
3506 .push_back(EndpointEventInner::RetireConnectionId(
3507 now,
3508 sequence,
3509 allow_more_cids,
3510 ));
3511 }
3512 Frame::NewConnectionId(frame) => {
3513 trace!(
3514 sequence = frame.sequence,
3515 id = %frame.id,
3516 retire_prior_to = frame.retire_prior_to,
3517 );
3518 if self.rem_cids.active().is_empty() {
3519 return Err(TransportError::PROTOCOL_VIOLATION(
3520 "NEW_CONNECTION_ID when CIDs aren't in use",
3521 ));
3522 }
3523 if frame.retire_prior_to > frame.sequence {
3524 return Err(TransportError::PROTOCOL_VIOLATION(
3525 "NEW_CONNECTION_ID retiring unissued CIDs",
3526 ));
3527 }
3528
3529 use crate::cid_queue::InsertError;
3530 match self.rem_cids.insert(frame) {
3531 Ok(None) => {}
3532 Ok(Some((retired, reset_token))) => {
3533 let pending_retired =
3534 &mut self.spaces[SpaceId::Data].pending.retire_cids;
3535 const MAX_PENDING_RETIRED_CIDS: u64 = CidQueue::LEN as u64 * 10;
3538 if (pending_retired.len() as u64)
3541 .saturating_add(retired.end.saturating_sub(retired.start))
3542 > MAX_PENDING_RETIRED_CIDS
3543 {
3544 return Err(TransportError::CONNECTION_ID_LIMIT_ERROR(
3545 "queued too many retired CIDs",
3546 ));
3547 }
3548 pending_retired.extend(retired);
3549 self.set_reset_token(reset_token);
3550 }
3551 Err(InsertError::ExceedsLimit) => {
3552 return Err(TransportError::CONNECTION_ID_LIMIT_ERROR(""));
3553 }
3554 Err(InsertError::Retired) => {
3555 trace!("discarding already-retired");
3556 self.spaces[SpaceId::Data]
3560 .pending
3561 .retire_cids
3562 .push(frame.sequence);
3563 continue;
3564 }
3565 };
3566
3567 if self.side.is_server() && self.rem_cids.active_seq() == 0 {
3568 self.update_rem_cid();
3571 }
3572 }
3573 Frame::NewToken(NewToken { token }) => {
3574 let ConnectionSide::Client {
3575 token_store,
3576 server_name,
3577 ..
3578 } = &self.side
3579 else {
3580 return Err(TransportError::PROTOCOL_VIOLATION("client sent NEW_TOKEN"));
3581 };
3582 if token.is_empty() {
3583 return Err(TransportError::FRAME_ENCODING_ERROR("empty token"));
3584 }
3585 trace!("got new token");
3586 token_store.insert(server_name, token);
3587 }
3588 Frame::Datagram(datagram) => {
3589 let result = self
3590 .datagrams
3591 .received(datagram, &self.config.datagram_receive_buffer_size)?;
3592 if result.was_empty {
3593 self.events.push_back(Event::DatagramReceived);
3594 }
3595 if result.dropped_count > 0 {
3596 let drop_counts = DatagramDropStats {
3597 datagrams: result.dropped_count as u64,
3598 bytes: result.dropped_bytes as u64,
3599 };
3600 self.stats
3601 .datagram_drops
3602 .record(drop_counts.datagrams, drop_counts.bytes);
3603 self.events.push_back(Event::DatagramDropped(drop_counts));
3604 }
3605 }
3606 Frame::AckFrequency(ack_frequency) => {
3607 let space = &mut self.spaces[SpaceId::Data];
3609
3610 if !self
3611 .ack_frequency
3612 .ack_frequency_received(&ack_frequency, &mut space.pending_acks)?
3613 {
3614 continue;
3616 }
3617
3618 if let Some(timeout) = space
3621 .pending_acks
3622 .max_ack_delay_timeout(self.ack_frequency.max_ack_delay)
3623 {
3624 self.timers.set(Timer::MaxAckDelay, timeout);
3625 }
3626 }
3627 Frame::ImmediateAck => {
3628 self.spaces[SpaceId::Data]
3630 .pending_acks
3631 .set_immediate_ack_required();
3632 }
3633 Frame::HandshakeDone => {
3634 if self.side.is_server() {
3635 return Err(TransportError::PROTOCOL_VIOLATION(
3636 "client sent HANDSHAKE_DONE",
3637 ));
3638 }
3639 if self.spaces[SpaceId::Handshake].crypto.is_some() {
3640 self.discard_space(now, SpaceId::Handshake);
3641 }
3642 }
3643 Frame::AddAddress(add_address) => {
3644 self.handle_add_address(&add_address, now)?;
3645 }
3646 Frame::PunchMeNow(punch_me_now) => {
3647 self.handle_punch_me_now(&punch_me_now, now)?;
3648 }
3649 Frame::RemoveAddress(remove_address) => {
3650 self.handle_remove_address(&remove_address)?;
3651 }
3652 Frame::ObservedAddress(observed_address) => {
3653 self.handle_observed_address_frame(&observed_address, now)?;
3654 }
3655 Frame::TryConnectTo(try_connect_to) => {
3656 self.handle_try_connect_to(&try_connect_to, now)?;
3657 }
3658 Frame::TryConnectToResponse(response) => {
3659 self.handle_try_connect_to_response(&response)?;
3660 }
3661 }
3662 }
3663
3664 let space = &mut self.spaces[SpaceId::Data];
3665 if space
3666 .pending_acks
3667 .packet_received(now, number, ack_eliciting, &space.dedup)
3668 {
3669 self.timers
3670 .set(Timer::MaxAckDelay, now + self.ack_frequency.max_ack_delay);
3671 }
3672
3673 let pending = &mut self.spaces[SpaceId::Data].pending;
3678 self.streams.queue_max_stream_id(pending);
3679
3680 if let Some(reason) = close {
3681 self.error = Some(reason.into());
3682 self.state = State::Draining;
3683 self.close = true;
3684 }
3685
3686 if remote != self.path.remote
3687 && !is_probing_packet
3688 && number == self.spaces[SpaceId::Data].rx_packet
3689 {
3690 let ConnectionSide::Server { ref server_config } = self.side else {
3691 return Err(TransportError::PROTOCOL_VIOLATION(
3692 "packets from unknown remote should be dropped by clients",
3693 ));
3694 };
3695 debug_assert!(
3696 server_config.migration,
3697 "migration-initiating packets should have been dropped immediately"
3698 );
3699 self.migrate(now, remote);
3700 self.update_rem_cid();
3702 self.spin = false;
3703 }
3704
3705 Ok(())
3706 }
3707
3708 fn migrate(&mut self, now: Instant, remote: SocketAddr) {
3709 trace!(%remote, "migration initiated");
3710 let mut new_path = if remote.is_ipv4() && remote.ip() == self.path.remote.ip() {
3714 PathData::from_previous(remote, &self.path, now)
3715 } else {
3716 let peer_max_udp_payload_size =
3717 u16::try_from(self.peer_params.max_udp_payload_size.into_inner())
3718 .unwrap_or(u16::MAX);
3719 PathData::new(
3720 remote,
3721 self.allow_mtud,
3722 Some(peer_max_udp_payload_size),
3723 now,
3724 &self.config,
3725 )
3726 };
3727 new_path.challenge = Some(self.rng.r#gen());
3728 new_path.challenge_pending = true;
3729 let prev_pto = self.pto(SpaceId::Data);
3730
3731 let mut prev = mem::replace(&mut self.path, new_path);
3732 if prev.challenge.is_none() {
3734 prev.challenge = Some(self.rng.r#gen());
3735 prev.challenge_pending = true;
3736 self.prev_path = Some((self.rem_cids.active(), prev));
3739 }
3740
3741 self.timers.set(
3742 Timer::PathValidation,
3743 now + 3 * cmp::max(self.pto(SpaceId::Data), prev_pto),
3744 );
3745 }
3746
3747 pub fn local_address_changed(&mut self) {
3749 self.update_rem_cid();
3750 self.ping();
3751 }
3752
3753 pub fn migrate_to_nat_traversal_path(&mut self, now: Instant) -> Result<(), TransportError> {
3755 let (remote_addr, local_addr) = {
3757 let nat_state = self
3758 .nat_traversal
3759 .as_ref()
3760 .ok_or_else(|| TransportError::PROTOCOL_VIOLATION("NAT traversal not enabled"))?;
3761
3762 let best_pairs = nat_state.get_best_succeeded_pairs();
3764 if best_pairs.is_empty() {
3765 return Err(TransportError::PROTOCOL_VIOLATION(
3766 "No validated NAT traversal paths",
3767 ));
3768 }
3769
3770 let best_path = best_pairs
3772 .iter()
3773 .find(|pair| pair.remote_addr != self.path.remote)
3774 .or_else(|| best_pairs.first());
3775
3776 let best_path = best_path.ok_or_else(|| {
3777 TransportError::PROTOCOL_VIOLATION("No suitable NAT traversal path")
3778 })?;
3779
3780 debug!(
3781 "Migrating to NAT traversal path: {} -> {} (priority: {})",
3782 self.path.remote, best_path.remote_addr, best_path.priority
3783 );
3784
3785 (best_path.remote_addr, best_path.local_addr)
3786 };
3787
3788 self.migrate(now, remote_addr);
3790
3791 if local_addr != SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED), 0) {
3793 self.local_ip = Some(local_addr.ip());
3794 }
3795
3796 self.path.challenge_pending = true;
3798
3799 Ok(())
3800 }
3801
3802 fn update_rem_cid(&mut self) {
3804 let (reset_token, retired) = match self.rem_cids.next() {
3805 Some(x) => x,
3806 None => return,
3807 };
3808
3809 self.spaces[SpaceId::Data]
3811 .pending
3812 .retire_cids
3813 .extend(retired);
3814 self.set_reset_token(reset_token);
3815 }
3816
3817 fn set_reset_token(&mut self, reset_token: ResetToken) {
3818 self.endpoint_events
3819 .push_back(EndpointEventInner::ResetToken(
3820 self.path.remote,
3821 reset_token,
3822 ));
3823 self.peer_params.stateless_reset_token = Some(reset_token);
3824 }
3825
3826 fn handle_encode_error(&mut self, now: Instant, context: &'static str) {
3827 tracing::error!("VarInt overflow while encoding {context}");
3828 self.close_inner(
3829 now,
3830 Close::from(TransportError::INTERNAL_ERROR(
3831 "varint overflow during encoding",
3832 )),
3833 );
3834 }
3835
3836 fn encode_or_close(
3837 &mut self,
3838 now: Instant,
3839 result: Result<(), VarIntBoundsExceeded>,
3840 context: &'static str,
3841 ) -> bool {
3842 if result.is_err() {
3843 self.handle_encode_error(now, context);
3844 return false;
3845 }
3846 true
3847 }
3848
3849 fn issue_first_cids(&mut self, now: Instant) {
3851 if self.local_cid_state.cid_len() == 0 {
3852 return;
3853 }
3854
3855 let mut n = self.peer_params.issue_cids_limit() - 1;
3857 if let ConnectionSide::Server { server_config } = &self.side {
3858 if server_config.has_preferred_address() {
3859 n -= 1;
3861 }
3862 }
3863 self.endpoint_events
3864 .push_back(EndpointEventInner::NeedIdentifiers(now, n));
3865 }
3866
3867 fn populate_packet(
3868 &mut self,
3869 now: Instant,
3870 space_id: SpaceId,
3871 buf: &mut Vec<u8>,
3872 max_size: usize,
3873 pn: u64,
3874 ) -> SentFrames {
3875 let mut sent = SentFrames::default();
3876 let space = &mut self.spaces[space_id];
3877 let is_0rtt = space_id == SpaceId::Data && space.crypto.is_none();
3878 space.pending_acks.maybe_ack_non_eliciting();
3879 macro_rules! encode_or_close {
3880 ($result:expr, $context:expr) => {{
3881 if $result.is_err() {
3882 drop(space);
3883 self.handle_encode_error(now, $context);
3884 return sent;
3885 }
3886 }};
3887 }
3888
3889 if !is_0rtt && mem::replace(&mut space.pending.handshake_done, false) {
3891 encode_or_close!(
3892 frame::FrameType::HANDSHAKE_DONE.try_encode(buf),
3893 "HANDSHAKE_DONE"
3894 );
3895 sent.retransmits.get_or_create().handshake_done = true;
3896 self.stats.frame_tx.handshake_done =
3898 self.stats.frame_tx.handshake_done.saturating_add(1);
3899 }
3900
3901 if mem::replace(&mut space.ping_pending, false) {
3903 trace!("PING");
3904 encode_or_close!(frame::FrameType::PING.try_encode(buf), "PING");
3905 sent.non_retransmits = true;
3906 self.stats.frame_tx.ping += 1;
3907 }
3908
3909 if mem::replace(&mut space.immediate_ack_pending, false) {
3911 trace!("IMMEDIATE_ACK");
3912 encode_or_close!(
3913 frame::FrameType::IMMEDIATE_ACK.try_encode(buf),
3914 "IMMEDIATE_ACK"
3915 );
3916 sent.non_retransmits = true;
3917 self.stats.frame_tx.immediate_ack += 1;
3918 }
3919
3920 if space.pending_acks.can_send() {
3922 let ack_result = Self::populate_acks(
3923 now,
3924 self.receiving_ecn,
3925 &mut sent,
3926 space,
3927 buf,
3928 &mut self.stats,
3929 );
3930 encode_or_close!(ack_result, "ACK");
3931 }
3932
3933 if mem::replace(&mut space.pending.ack_frequency, false) {
3935 let sequence_number = self.ack_frequency.next_sequence_number();
3936
3937 let config = self.config.ack_frequency_config.as_ref().unwrap();
3939
3940 let max_ack_delay = self.ack_frequency.candidate_max_ack_delay(
3942 self.path.rtt.get(),
3943 config,
3944 &self.peer_params,
3945 );
3946
3947 trace!(?max_ack_delay, "ACK_FREQUENCY");
3948
3949 encode_or_close!(
3950 (frame::AckFrequency {
3951 sequence: sequence_number,
3952 ack_eliciting_threshold: config.ack_eliciting_threshold,
3953 request_max_ack_delay: max_ack_delay
3954 .as_micros()
3955 .try_into()
3956 .unwrap_or(VarInt::MAX),
3957 reordering_threshold: config.reordering_threshold,
3958 })
3959 .try_encode(buf),
3960 "ACK_FREQUENCY"
3961 );
3962
3963 sent.retransmits.get_or_create().ack_frequency = true;
3964
3965 self.ack_frequency.ack_frequency_sent(pn, max_ack_delay);
3966 self.stats.frame_tx.ack_frequency += 1;
3967 }
3968
3969 if buf.len() + 9 < max_size && space_id == SpaceId::Data {
3971 if let Some(token) = self.path.challenge {
3973 self.path.challenge_pending = false;
3975 sent.non_retransmits = true;
3976 sent.requires_padding = true;
3977 trace!("PATH_CHALLENGE {:08x}", token);
3978 encode_or_close!(
3979 frame::FrameType::PATH_CHALLENGE.try_encode(buf),
3980 "PATH_CHALLENGE"
3981 );
3982 buf.write(token);
3983 self.stats.frame_tx.path_challenge += 1;
3984 }
3985
3986 }
3989
3990 if buf.len() + 9 < max_size && space_id == SpaceId::Data {
3992 if let Some(token) = self.path_responses.pop_on_path(self.path.remote) {
3993 sent.non_retransmits = true;
3994 sent.requires_padding = true;
3995 trace!("PATH_RESPONSE {:08x}", token);
3996 encode_or_close!(
3997 frame::FrameType::PATH_RESPONSE.try_encode(buf),
3998 "PATH_RESPONSE"
3999 );
4000 buf.write(token);
4001 self.stats.frame_tx.path_response += 1;
4002 }
4003 }
4004
4005 while buf.len() + frame::Crypto::SIZE_BOUND < max_size && !is_0rtt {
4007 let mut frame = match space.pending.crypto.pop_front() {
4008 Some(x) => x,
4009 None => break,
4010 };
4011
4012 let max_crypto_data_size = max_size
4017 - buf.len()
4018 - 1 - VarInt::size(unsafe { VarInt::from_u64_unchecked(frame.offset) })
4020 - 2; let available_space = max_size - buf.len();
4024 let remaining_data = frame.data.len();
4025 let optimal_size = self
4026 .pqc_state
4027 .calculate_crypto_frame_size(available_space, remaining_data);
4028
4029 let len = frame
4030 .data
4031 .len()
4032 .min(2usize.pow(14) - 1)
4033 .min(max_crypto_data_size)
4034 .min(optimal_size);
4035
4036 let data = frame.data.split_to(len);
4037 let truncated = frame::Crypto {
4038 offset: frame.offset,
4039 data,
4040 };
4041 trace!(
4042 "CRYPTO: off {} len {}",
4043 truncated.offset,
4044 truncated.data.len()
4045 );
4046 encode_or_close!(truncated.try_encode(buf), "CRYPTO");
4047 self.stats.frame_tx.crypto += 1;
4048 sent.retransmits.get_or_create().crypto.push_back(truncated);
4049 if !frame.data.is_empty() {
4050 frame.offset += len as u64;
4051 space.pending.crypto.push_front(frame);
4052 }
4053 }
4054
4055 if space_id == SpaceId::Data {
4056 let control_result = self.streams.write_control_frames(
4057 buf,
4058 &mut space.pending,
4059 &mut sent.retransmits,
4060 &mut self.stats.frame_tx,
4061 max_size,
4062 );
4063 encode_or_close!(control_result, "control frames");
4064 }
4065
4066 while buf.len() + 44 < max_size {
4068 let issued = match space.pending.new_cids.pop() {
4069 Some(x) => x,
4070 None => break,
4071 };
4072 trace!(
4073 sequence = issued.sequence,
4074 id = %issued.id,
4075 "NEW_CONNECTION_ID"
4076 );
4077 encode_or_close!(
4078 (frame::NewConnectionId {
4079 sequence: issued.sequence,
4080 retire_prior_to: self.local_cid_state.retire_prior_to(),
4081 id: issued.id,
4082 reset_token: issued.reset_token,
4083 })
4084 .try_encode(buf),
4085 "NEW_CONNECTION_ID"
4086 );
4087 sent.retransmits.get_or_create().new_cids.push(issued);
4088 self.stats.frame_tx.new_connection_id += 1;
4089 }
4090
4091 while buf.len() + frame::RETIRE_CONNECTION_ID_SIZE_BOUND < max_size {
4093 let seq = match space.pending.retire_cids.pop() {
4094 Some(x) => x,
4095 None => break,
4096 };
4097 trace!(sequence = seq, "RETIRE_CONNECTION_ID");
4098 encode_or_close!(
4099 frame::FrameType::RETIRE_CONNECTION_ID.try_encode(buf),
4100 "RETIRE_CONNECTION_ID"
4101 );
4102 encode_or_close!(buf.write_var(seq), "RETIRE_CONNECTION_ID seq");
4103 sent.retransmits.get_or_create().retire_cids.push(seq);
4104 self.stats.frame_tx.retire_connection_id += 1;
4105 }
4106
4107 let mut sent_datagrams = false;
4109 while buf.len() + Datagram::SIZE_BOUND < max_size && space_id == SpaceId::Data {
4110 match self.datagrams.write(buf, max_size) {
4111 true => {
4112 sent_datagrams = true;
4113 sent.non_retransmits = true;
4114 self.stats.frame_tx.datagram += 1;
4115 }
4116 false => break,
4117 }
4118 }
4119 if self.datagrams.send_blocked && sent_datagrams {
4120 self.events.push_back(Event::DatagramsUnblocked);
4121 self.datagrams.send_blocked = false;
4122 }
4123
4124 while let Some(remote_addr) = space.pending.new_tokens.pop() {
4126 debug_assert_eq!(space_id, SpaceId::Data);
4127 let ConnectionSide::Server { server_config } = &self.side else {
4128 debug_assert!(false, "NEW_TOKEN frames should not be enqueued by clients");
4130 continue;
4131 };
4132
4133 if remote_addr != self.path.remote {
4134 continue;
4139 }
4140
4141 if self.delay_new_token_until_binding && self.peer_id_for_tokens.is_none() {
4144 space.pending.new_tokens.push(remote_addr);
4146 break;
4147 }
4148
4149 let token = match crate::token_v2::encode_validation_token_with_rng(
4150 &server_config.token_key,
4151 remote_addr.ip(),
4152 server_config.time_source.now(),
4153 &mut self.rng,
4154 ) {
4155 Ok(token) => token,
4156 Err(err) => {
4157 error!(?err, "failed to encode validation token");
4158 continue;
4159 }
4160 };
4161 let new_token = NewToken {
4162 token: token.into(),
4163 };
4164
4165 if buf.len() + new_token.size() >= max_size {
4166 space.pending.new_tokens.push(remote_addr);
4167 break;
4168 }
4169
4170 encode_or_close!(new_token.try_encode(buf), "NEW_TOKEN");
4171 sent.retransmits
4172 .get_or_create()
4173 .new_tokens
4174 .push(remote_addr);
4175 self.stats.frame_tx.new_token += 1;
4176 }
4177
4178 while buf.len() + frame::AddAddress::SIZE_BOUND < max_size && space_id == SpaceId::Data {
4180 let add_address = match space.pending.add_addresses.pop() {
4181 Some(x) => x,
4182 None => break,
4183 };
4184 trace!(
4185 sequence = %add_address.sequence,
4186 address = %add_address.address,
4187 "ADD_ADDRESS"
4188 );
4189 if self.nat_traversal_frame_config.use_rfc_format {
4191 encode_or_close!(add_address.try_encode_rfc(buf), "ADD_ADDRESS (rfc)");
4192 } else {
4193 encode_or_close!(add_address.try_encode_legacy(buf), "ADD_ADDRESS (legacy)");
4194 }
4195 sent.retransmits
4196 .get_or_create()
4197 .add_addresses
4198 .push(add_address);
4199 self.stats.frame_tx.add_address += 1;
4200 }
4201
4202 while buf.len() + frame::PunchMeNow::SIZE_BOUND < max_size && space_id == SpaceId::Data {
4204 let punch_me_now = match space.pending.punch_me_now.pop() {
4205 Some(x) => x,
4206 None => break,
4207 };
4208 trace!(
4209 round = %punch_me_now.round,
4210 paired_with_sequence_number = %punch_me_now.paired_with_sequence_number,
4211 "PUNCH_ME_NOW"
4212 );
4213 if self.nat_traversal_frame_config.use_rfc_format {
4215 encode_or_close!(punch_me_now.try_encode_rfc(buf), "PUNCH_ME_NOW (rfc)");
4216 } else {
4217 encode_or_close!(punch_me_now.try_encode_legacy(buf), "PUNCH_ME_NOW (legacy)");
4218 }
4219 sent.retransmits
4220 .get_or_create()
4221 .punch_me_now
4222 .push(punch_me_now);
4223 self.stats.frame_tx.punch_me_now += 1;
4224 }
4225
4226 while buf.len() + frame::RemoveAddress::SIZE_BOUND < max_size && space_id == SpaceId::Data {
4228 let remove_address = match space.pending.remove_addresses.pop() {
4229 Some(x) => x,
4230 None => break,
4231 };
4232 trace!(
4233 sequence = %remove_address.sequence,
4234 "REMOVE_ADDRESS"
4235 );
4236 encode_or_close!(remove_address.try_encode(buf), "REMOVE_ADDRESS");
4238 sent.retransmits
4239 .get_or_create()
4240 .remove_addresses
4241 .push(remove_address);
4242 self.stats.frame_tx.remove_address += 1;
4243 }
4244
4245 while buf.len() + frame::ObservedAddress::SIZE_BOUND < max_size && space_id == SpaceId::Data
4247 {
4248 let observed_address = match space.pending.outbound_observations.pop() {
4249 Some(x) => x,
4250 None => break,
4251 };
4252 info!(
4253 address = %observed_address.address,
4254 sequence = %observed_address.sequence_number,
4255 "populate_packet: ENCODING OBSERVED_ADDRESS into packet"
4256 );
4257 encode_or_close!(observed_address.try_encode(buf), "OBSERVED_ADDRESS");
4258 sent.retransmits
4259 .get_or_create()
4260 .outbound_observations
4261 .push(observed_address);
4262 self.stats.frame_tx.observed_address += 1;
4263 }
4264
4265 if space_id == SpaceId::Data {
4267 sent.stream_frames =
4268 self.streams
4269 .write_stream_frames(buf, max_size, self.config.send_fairness);
4270 self.stats.frame_tx.stream += sent.stream_frames.len() as u64;
4271 }
4272
4273 sent
4274 }
4275
4276 fn populate_acks(
4281 now: Instant,
4282 receiving_ecn: bool,
4283 sent: &mut SentFrames,
4284 space: &mut PacketSpace,
4285 buf: &mut Vec<u8>,
4286 stats: &mut ConnectionStats,
4287 ) -> Result<(), VarIntBoundsExceeded> {
4288 debug_assert!(!space.pending_acks.ranges().is_empty());
4289
4290 debug_assert!(space.crypto.is_some(), "tried to send ACK in 0-RTT");
4292 let ecn = if receiving_ecn {
4293 Some(&space.ecn_counters)
4294 } else {
4295 None
4296 };
4297 sent.largest_acked = space.pending_acks.ranges().max();
4298
4299 let delay_micros = space.pending_acks.ack_delay(now).as_micros() as u64;
4300
4301 let ack_delay_exp = TransportParameters::default().ack_delay_exponent;
4303 let delay = delay_micros >> ack_delay_exp.into_inner();
4304
4305 trace!(
4306 "ACK {:?}, Delay = {}us",
4307 space.pending_acks.ranges(),
4308 delay_micros
4309 );
4310
4311 frame::Ack::try_encode(delay as _, space.pending_acks.ranges(), ecn, buf)?;
4312 stats.frame_tx.acks += 1;
4313 Ok(())
4314 }
4315
4316 fn close_common(&mut self) {
4317 trace!("connection closed");
4318 for &timer in &Timer::VALUES {
4319 self.timers.stop(timer);
4320 }
4321 }
4322
4323 fn set_close_timer(&mut self, now: Instant) {
4324 self.timers
4325 .set(Timer::Close, now + 3 * self.pto(self.highest_space));
4326 }
4327
4328 fn handle_peer_params(&mut self, params: TransportParameters) -> Result<(), TransportError> {
4330 if Some(self.orig_rem_cid) != params.initial_src_cid
4331 || (self.side.is_client()
4332 && (Some(self.initial_dst_cid) != params.original_dst_cid
4333 || self.retry_src_cid != params.retry_src_cid))
4334 {
4335 return Err(TransportError::TRANSPORT_PARAMETER_ERROR(
4336 "CID authentication failure",
4337 ));
4338 }
4339
4340 self.set_peer_params(params);
4341
4342 Ok(())
4343 }
4344
4345 fn set_peer_params(&mut self, params: TransportParameters) {
4346 self.streams.set_params(¶ms);
4347 self.idle_timeout =
4348 negotiate_max_idle_timeout(self.config.max_idle_timeout, Some(params.max_idle_timeout));
4349 trace!("negotiated max idle timeout {:?}", self.idle_timeout);
4350 if let Some(ref info) = params.preferred_address {
4351 self.rem_cids.insert(frame::NewConnectionId {
4352 sequence: 1,
4353 id: info.connection_id,
4354 reset_token: info.stateless_reset_token,
4355 retire_prior_to: 0,
4356 }).expect("preferred address CID is the first received, and hence is guaranteed to be legal");
4357 }
4358 self.ack_frequency.peer_max_ack_delay = get_max_ack_delay(¶ms);
4359
4360 self.negotiate_nat_traversal_capability(¶ms);
4362
4363 let local_has_nat_traversal = self.config.nat_traversal_config.is_some();
4366 let local_supports_rfc = local_has_nat_traversal;
4369 self.nat_traversal_frame_config = frame::nat_traversal_unified::NatTraversalFrameConfig {
4370 use_rfc_format: local_supports_rfc && params.supports_rfc_nat_traversal(),
4372 accept_legacy: true,
4374 };
4375
4376 self.negotiate_address_discovery(¶ms);
4378
4379 self.pqc_state.update_from_peer_params(¶ms);
4381
4382 if self.pqc_state.enabled && self.pqc_state.using_pqc {
4384 trace!("PQC enabled, adjusting MTU discovery for larger handshake packets");
4385 let current_mtu = self.path.mtud.current_mtu();
4389 if current_mtu < self.pqc_state.handshake_mtu {
4390 trace!(
4391 "Current MTU {} is less than PQC handshake MTU {}, will rely on MTU discovery",
4392 current_mtu, self.pqc_state.handshake_mtu
4393 );
4394 }
4395 }
4396
4397 self.peer_params = params;
4398 self.path.mtud.on_peer_max_udp_payload_size_received(
4399 u16::try_from(self.peer_params.max_udp_payload_size.into_inner()).unwrap_or(u16::MAX),
4400 );
4401 }
4402
4403 fn negotiate_nat_traversal_capability(&mut self, params: &TransportParameters) {
4405 let peer_nat_config = match ¶ms.nat_traversal {
4407 Some(config) => config,
4408 None => {
4409 if self.config.nat_traversal_config.is_some() {
4411 debug!(
4412 "Peer does not support NAT traversal, maintaining backward compatibility"
4413 );
4414 self.emit_nat_traversal_capability_event(false);
4415
4416 self.set_nat_traversal_compatibility_mode(false);
4418 }
4419 return;
4420 }
4421 };
4422
4423 let local_nat_config = match &self.config.nat_traversal_config {
4425 Some(config) => config,
4426 None => {
4427 debug!("NAT traversal not enabled locally, ignoring peer support");
4428 self.emit_nat_traversal_capability_event(false);
4429 self.set_nat_traversal_compatibility_mode(false);
4430 return;
4431 }
4432 };
4433
4434 info!("Both peers support NAT traversal, negotiating capabilities");
4436
4437 match self.negotiate_nat_traversal_parameters(local_nat_config, peer_nat_config) {
4439 Ok(negotiated_config) => {
4440 info!("NAT traversal capability negotiated successfully");
4441 self.emit_nat_traversal_capability_event(true);
4442
4443 self.init_nat_traversal_with_negotiated_config(&negotiated_config);
4445
4446 self.set_nat_traversal_compatibility_mode(true);
4448
4449 if matches!(
4451 negotiated_config,
4452 crate::transport_parameters::NatTraversalConfig::ClientSupport
4453 ) {
4454 self.initiate_nat_traversal_process();
4455 }
4456 }
4457 Err(e) => {
4458 warn!("NAT traversal capability negotiation failed: {}", e);
4459 self.emit_nat_traversal_capability_event(false);
4460 self.set_nat_traversal_compatibility_mode(false);
4461 }
4462 }
4463 }
4464
4465 fn emit_nat_traversal_capability_event(&mut self, negotiated: bool) {
4467 if negotiated {
4470 info!("NAT traversal capability successfully negotiated");
4471 } else {
4472 info!("NAT traversal capability not available (peer or local support missing)");
4473 }
4474
4475 }
4478
4479 fn set_nat_traversal_compatibility_mode(&mut self, enabled: bool) {
4481 if enabled {
4482 debug!("NAT traversal enabled for this connection");
4483 } else {
4485 debug!("NAT traversal disabled for this connection (backward compatibility mode)");
4486 if self.nat_traversal.is_some() {
4488 warn!("Clearing NAT traversal state due to compatibility mode");
4489 self.nat_traversal = None;
4490 }
4491 }
4492 }
4493
4494 fn negotiate_nat_traversal_parameters(
4496 &self,
4497 local_config: &crate::transport_parameters::NatTraversalConfig,
4498 peer_config: &crate::transport_parameters::NatTraversalConfig,
4499 ) -> Result<crate::transport_parameters::NatTraversalConfig, String> {
4500 match (local_config, peer_config) {
4505 (
4507 crate::transport_parameters::NatTraversalConfig::ClientSupport,
4508 crate::transport_parameters::NatTraversalConfig::ServerSupport {
4509 concurrency_limit,
4510 },
4511 ) => Ok(
4512 crate::transport_parameters::NatTraversalConfig::ServerSupport {
4513 concurrency_limit: *concurrency_limit,
4514 },
4515 ),
4516 (
4518 crate::transport_parameters::NatTraversalConfig::ServerSupport {
4519 concurrency_limit,
4520 },
4521 crate::transport_parameters::NatTraversalConfig::ClientSupport,
4522 ) => Ok(
4523 crate::transport_parameters::NatTraversalConfig::ServerSupport {
4524 concurrency_limit: *concurrency_limit,
4525 },
4526 ),
4527 (
4529 crate::transport_parameters::NatTraversalConfig::ServerSupport {
4530 concurrency_limit: limit1,
4531 },
4532 crate::transport_parameters::NatTraversalConfig::ServerSupport {
4533 concurrency_limit: limit2,
4534 },
4535 ) => Ok(
4536 crate::transport_parameters::NatTraversalConfig::ServerSupport {
4537 concurrency_limit: (*limit1).min(*limit2),
4538 },
4539 ),
4540 (
4542 crate::transport_parameters::NatTraversalConfig::ClientSupport,
4543 crate::transport_parameters::NatTraversalConfig::ClientSupport,
4544 ) => Err("Both endpoints claim to be NAT traversal clients".to_string()),
4545 }
4546 }
4547
4548 fn init_nat_traversal_with_negotiated_config(
4553 &mut self,
4554 _config: &crate::transport_parameters::NatTraversalConfig,
4555 ) {
4556 let max_candidates = 50; let coordination_timeout = Duration::from_secs(10); self.nat_traversal = Some(NatTraversalState::new(max_candidates, coordination_timeout));
4563
4564 trace!("NAT traversal initialized for symmetric P2P node");
4565
4566 self.prepare_address_observation();
4569 self.schedule_candidate_discovery();
4570 self.prepare_coordination_handling();
4571 }
4572
4573 fn initiate_nat_traversal_process(&mut self) {
4575 if let Some(nat_state) = &mut self.nat_traversal {
4576 match nat_state.start_candidate_discovery() {
4577 Ok(()) => {
4578 debug!("NAT traversal process initiated - candidate discovery started");
4579 self.timers.set(
4581 Timer::NatTraversal,
4582 Instant::now() + Duration::from_millis(100),
4583 );
4584 }
4585 Err(e) => {
4586 warn!("Failed to initiate NAT traversal process: {}", e);
4587 }
4588 }
4589 }
4590 }
4591
4592 fn prepare_address_observation(&mut self) {
4594 debug!("Preparing for address observation as bootstrap node");
4595 }
4598
4599 fn schedule_candidate_discovery(&mut self) {
4601 debug!("Scheduling candidate discovery for client endpoint");
4602 self.timers.set(
4604 Timer::NatTraversal,
4605 Instant::now() + Duration::from_millis(50),
4606 );
4607 }
4608
4609 fn prepare_coordination_handling(&mut self) {
4611 debug!("Preparing to handle coordination requests as server endpoint");
4612 }
4615
4616 fn handle_nat_traversal_timeout(&mut self, now: Instant) {
4618 let timeout_result = if let Some(nat_state) = &mut self.nat_traversal {
4620 nat_state.handle_timeout(now)
4621 } else {
4622 return;
4623 };
4624
4625 match timeout_result {
4627 Ok(actions) => {
4628 for action in actions {
4629 match action {
4630 nat_traversal::TimeoutAction::RetryDiscovery => {
4631 debug!("NAT traversal timeout: retrying candidate discovery");
4632 if let Some(nat_state) = &mut self.nat_traversal {
4633 if let Err(e) = nat_state.start_candidate_discovery() {
4634 warn!("Failed to retry candidate discovery: {}", e);
4635 }
4636 }
4637 }
4638 nat_traversal::TimeoutAction::RetryCoordination => {
4639 debug!("NAT traversal timeout: retrying coordination");
4640 self.timers
4642 .set(Timer::NatTraversal, now + Duration::from_secs(2));
4643 }
4644 nat_traversal::TimeoutAction::StartValidation => {
4645 debug!("NAT traversal timeout: starting path validation");
4646 self.start_nat_traversal_validation(now);
4647 }
4648 nat_traversal::TimeoutAction::Complete => {
4649 debug!("NAT traversal completed successfully");
4650 self.timers.stop(Timer::NatTraversal);
4652 }
4653 nat_traversal::TimeoutAction::Failed => {
4654 warn!("NAT traversal failed after timeout");
4655 self.handle_nat_traversal_failure();
4657 }
4658 }
4659 }
4660 }
4661 Err(e) => {
4662 warn!("NAT traversal timeout handling failed: {}", e);
4663 self.handle_nat_traversal_failure();
4664 }
4665 }
4666 }
4667
4668 fn start_nat_traversal_validation(&mut self, now: Instant) {
4670 if let Some(nat_state) = &mut self.nat_traversal {
4671 let pairs = nat_state.get_next_validation_pairs(3);
4673
4674 for pair in pairs {
4675 let challenge = self.rng.r#gen();
4677 self.path.challenge = Some(challenge);
4678 self.path.challenge_pending = true;
4679
4680 debug!(
4681 "Starting path validation for NAT traversal candidate: {}",
4682 pair.remote_addr
4683 );
4684 }
4685
4686 self.timers
4688 .set(Timer::PathValidation, now + Duration::from_secs(3));
4689 }
4690 }
4691
4692 fn handle_nat_traversal_failure(&mut self) {
4694 warn!("NAT traversal failed, considering fallback options");
4695
4696 self.nat_traversal = None;
4698 self.timers.stop(Timer::NatTraversal);
4699
4700 debug!("NAT traversal disabled for this connection due to failure");
4707 }
4708
4709 pub fn nat_traversal_supported(&self) -> bool {
4711 self.nat_traversal.is_some()
4712 && self.config.nat_traversal_config.is_some()
4713 && self.peer_params.nat_traversal.is_some()
4714 }
4715
4716 pub fn nat_traversal_config(&self) -> Option<&crate::transport_parameters::NatTraversalConfig> {
4718 self.peer_params.nat_traversal.as_ref()
4719 }
4720
4721 pub fn nat_traversal_ready(&self) -> bool {
4723 self.nat_traversal_supported() && matches!(self.state, State::Established)
4724 }
4725
4726 #[allow(dead_code)]
4731 pub(crate) fn nat_traversal_stats(&self) -> Option<nat_traversal::NatTraversalStats> {
4732 self.nat_traversal.as_ref().map(|state| state.stats.clone())
4733 }
4734
4735 #[cfg(test)]
4739 #[allow(dead_code)]
4740 pub(crate) fn force_enable_nat_traversal(&mut self) {
4741 use crate::transport_parameters::NatTraversalConfig;
4742
4743 let config = NatTraversalConfig::ServerSupport {
4745 concurrency_limit: VarInt::from_u32(5),
4746 };
4747
4748 self.peer_params.nat_traversal = Some(config.clone());
4749 self.config = Arc::new({
4750 let mut transport_config = (*self.config).clone();
4751 transport_config.nat_traversal_config = Some(config);
4752 transport_config
4753 });
4754
4755 self.nat_traversal = Some(NatTraversalState::new(8, Duration::from_secs(10)));
4757 }
4758
4759 fn handle_add_address(
4761 &mut self,
4762 add_address: &crate::frame::AddAddress,
4763 now: Instant,
4764 ) -> Result<(), TransportError> {
4765 let nat_state = self.nat_traversal.as_mut().ok_or_else(|| {
4766 TransportError::PROTOCOL_VIOLATION("AddAddress frame without NAT traversal negotiation")
4767 })?;
4768
4769 let normalized_addr = crate::shared::normalize_socket_addr(add_address.address);
4772
4773 info!(
4774 "handle_add_address: RECEIVED ADD_ADDRESS from peer addr={} (normalized={}) seq={} priority={}",
4775 add_address.address, normalized_addr, add_address.sequence, add_address.priority
4776 );
4777
4778 match nat_state.add_remote_candidate(
4779 add_address.sequence,
4780 normalized_addr,
4781 add_address.priority,
4782 now,
4783 ) {
4784 Ok(()) => {
4785 info!(
4786 "Added remote candidate: {} (seq={}, priority={})",
4787 normalized_addr, add_address.sequence, add_address.priority
4788 );
4789
4790 self.endpoint_events.push_back(
4792 crate::shared::EndpointEventInner::PeerAddressAdvertised {
4793 peer_addr: self.path.remote,
4794 advertised_addr: normalized_addr,
4795 },
4796 );
4797
4798 self.trigger_candidate_validation(normalized_addr, now)?;
4800 Ok(())
4801 }
4802 Err(NatTraversalError::TooManyCandidates) => Err(TransportError::PROTOCOL_VIOLATION(
4803 "too many NAT traversal candidates",
4804 )),
4805 Err(NatTraversalError::DuplicateAddress) => {
4806 Ok(())
4808 }
4809 Err(e) => {
4810 warn!("Failed to add remote candidate: {}", e);
4811 Ok(()) }
4813 }
4814 }
4815
4816 fn handle_punch_me_now(
4820 &mut self,
4821 punch_me_now: &crate::frame::PunchMeNow,
4822 now: Instant,
4823 ) -> Result<(), TransportError> {
4824 trace!(
4825 "Received PunchMeNow: round={}, target_seq={}, local_addr={}",
4826 punch_me_now.round, punch_me_now.paired_with_sequence_number, punch_me_now.address
4827 );
4828
4829 let nat_state = self.nat_traversal.as_mut().ok_or_else(|| {
4831 TransportError::PROTOCOL_VIOLATION("PunchMeNow frame without NAT traversal negotiation")
4832 })?;
4833
4834 let target = nat_traversal::PunchTarget {
4837 remote_addr: punch_me_now.address,
4838 remote_sequence: punch_me_now.paired_with_sequence_number,
4839 challenge: self.rng.r#gen(),
4840 };
4841
4842 if let Err(_e) =
4843 nat_state.prime_passive_coordination_target(punch_me_now.round, target, now)
4844 {
4845 debug!(
4846 "Failed to prime passive coordination for round {}",
4847 punch_me_now.round
4848 );
4849 } else {
4850 trace!(
4851 "Passive coordination primed for round {}",
4852 punch_me_now.round
4853 );
4854 }
4855
4856 Ok(())
4857 }
4858
4859 fn handle_remove_address(
4861 &mut self,
4862 remove_address: &crate::frame::RemoveAddress,
4863 ) -> Result<(), TransportError> {
4864 let nat_state = self.nat_traversal.as_mut().ok_or_else(|| {
4865 TransportError::PROTOCOL_VIOLATION(
4866 "RemoveAddress frame without NAT traversal negotiation",
4867 )
4868 })?;
4869
4870 if nat_state.remove_candidate(remove_address.sequence) {
4871 trace!(
4872 "Removed candidate with sequence {}",
4873 remove_address.sequence
4874 );
4875 } else {
4876 trace!(
4877 "Attempted to remove unknown candidate sequence {}",
4878 remove_address.sequence
4879 );
4880 }
4881
4882 Ok(())
4883 }
4884
4885 fn handle_observed_address_frame(
4887 &mut self,
4888 observed_address: &crate::frame::ObservedAddress,
4889 now: Instant,
4890 ) -> Result<(), TransportError> {
4891 tracing::info!(
4892 address = %observed_address.address,
4893 sequence = %observed_address.sequence_number,
4894 from_peer = %self.peer_id_for_tokens.map(|pid| format!("{pid}")).unwrap_or_else(|| "unknown".to_string()),
4895 "handle_observed_address_frame: RECEIVED OBSERVED_ADDRESS from peer"
4896 );
4897 let state = self.address_discovery_state.as_mut().ok_or_else(|| {
4899 TransportError::PROTOCOL_VIOLATION(
4900 "ObservedAddress frame without address discovery negotiation",
4901 )
4902 })?;
4903
4904 if !state.enabled {
4906 return Err(TransportError::PROTOCOL_VIOLATION(
4907 "ObservedAddress frame received when address discovery is disabled",
4908 ));
4909 }
4910
4911 #[cfg(feature = "trace")]
4913 {
4914 use crate::trace_observed_address_received;
4915 let peer_bytes = self
4916 .peer_id_for_tokens
4917 .as_ref()
4918 .map(|pid| pid.0)
4919 .unwrap_or([0u8; 32]);
4920 trace_observed_address_received!(
4921 &self.event_log,
4922 self.trace_context.trace_id(),
4923 observed_address.address,
4924 0u64, peer_bytes
4926 );
4927 }
4928
4929 let path_id = 0u64; if let Some(&last_seq) = state.last_received_sequence.get(&path_id) {
4937 if observed_address.sequence_number <= last_seq {
4938 trace!(
4939 "Ignoring OBSERVED_ADDRESS frame with stale sequence number {} (last was {})",
4940 observed_address.sequence_number, last_seq
4941 );
4942 return Ok(());
4943 }
4944 }
4945
4946 state
4948 .last_received_sequence
4949 .insert(path_id, observed_address.sequence_number);
4950
4951 let normalized_addr = crate::shared::normalize_socket_addr(observed_address.address);
4954
4955 state.handle_observed_address(normalized_addr, path_id, now);
4957
4958 self.path.update_observed_address(normalized_addr, now);
4960
4961 trace!(
4963 "Received ObservedAddress frame: address={} for path={}",
4964 observed_address.address, path_id
4965 );
4966
4967 Ok(())
4968 }
4969
4970 fn handle_try_connect_to(
4975 &mut self,
4976 try_connect_to: &crate::frame::TryConnectTo,
4977 now: Instant,
4978 ) -> Result<(), TransportError> {
4979 trace!(
4980 "Received TryConnectTo: request_id={}, target={}, timeout_ms={}",
4981 try_connect_to.request_id, try_connect_to.target_address, try_connect_to.timeout_ms
4982 );
4983
4984 let target = try_connect_to.target_address;
4986
4987 let allow_loopback = allow_loopback_from_env();
4989 if target.ip().is_loopback() && !allow_loopback {
4990 warn!(
4991 "Rejecting TryConnectTo request to loopback address: {}",
4992 target
4993 );
4994 let response = crate::frame::TryConnectToResponse {
4996 request_id: try_connect_to.request_id,
4997 success: false,
4998 error_code: Some(crate::frame::TryConnectError::InvalidAddress),
4999 source_address: self.path.remote,
5000 };
5001 self.spaces[SpaceId::Data]
5002 .pending
5003 .try_connect_to_responses
5004 .push(response);
5005 return Ok(());
5006 }
5007
5008 if target.ip().is_unspecified() {
5010 warn!(
5011 "Rejecting TryConnectTo request to unspecified address: {}",
5012 target
5013 );
5014 let response = crate::frame::TryConnectToResponse {
5015 request_id: try_connect_to.request_id,
5016 success: false,
5017 error_code: Some(crate::frame::TryConnectError::InvalidAddress),
5018 source_address: self.path.remote,
5019 };
5020 self.spaces[SpaceId::Data]
5021 .pending
5022 .try_connect_to_responses
5023 .push(response);
5024 return Ok(());
5025 }
5026
5027 self.endpoint_events
5030 .push_back(EndpointEventInner::TryConnectTo {
5031 request_id: try_connect_to.request_id,
5032 target_address: try_connect_to.target_address,
5033 timeout_ms: try_connect_to.timeout_ms,
5034 requester_connection: self.path.remote,
5035 requested_at: now,
5036 });
5037
5038 trace!(
5039 "Queued TryConnectTo attempt for request_id={}",
5040 try_connect_to.request_id
5041 );
5042
5043 Ok(())
5044 }
5045
5046 fn handle_try_connect_to_response(
5048 &mut self,
5049 response: &crate::frame::TryConnectToResponse,
5050 ) -> Result<(), TransportError> {
5051 trace!(
5052 "Received TryConnectToResponse: request_id={}, success={}, error={:?}, source={}",
5053 response.request_id, response.success, response.error_code, response.source_address
5054 );
5055
5056 if response.success {
5059 debug!(
5060 "TryConnectTo succeeded: target can receive connections from {}",
5061 response.source_address
5062 );
5063
5064 if let Some(nat_state) = &mut self.nat_traversal {
5066 nat_state
5067 .record_successful_callback_probe(response.request_id, response.source_address);
5068 }
5069 } else {
5070 debug!("TryConnectTo failed with error: {:?}", response.error_code);
5071
5072 if let Some(nat_state) = &mut self.nat_traversal {
5074 nat_state.record_failed_callback_probe(response.request_id, response.error_code);
5075 }
5076 }
5077
5078 Ok(())
5079 }
5080
5081 pub fn queue_add_address(&mut self, sequence: VarInt, address: SocketAddr, priority: VarInt) {
5083 let add_address = frame::AddAddress {
5085 sequence,
5086 address,
5087 priority,
5088 };
5089
5090 self.spaces[SpaceId::Data]
5091 .pending
5092 .add_addresses
5093 .push(add_address);
5094 trace!(
5095 "Queued AddAddress frame: seq={}, addr={}, priority={}",
5096 sequence, address, priority
5097 );
5098 }
5099
5100 pub fn queue_punch_me_now(
5102 &mut self,
5103 round: VarInt,
5104 paired_with_sequence_number: VarInt,
5105 address: SocketAddr,
5106 ) {
5107 self.queue_punch_me_now_with_target(round, paired_with_sequence_number, address, None);
5108 }
5109
5110 pub fn queue_punch_me_now_with_target(
5122 &mut self,
5123 round: VarInt,
5124 paired_with_sequence_number: VarInt,
5125 address: SocketAddr,
5126 target_peer_id: Option<[u8; 32]>,
5127 ) {
5128 let punch_me_now = frame::PunchMeNow {
5129 round,
5130 paired_with_sequence_number,
5131 address,
5132 target_peer_id,
5133 };
5134
5135 self.spaces[SpaceId::Data]
5136 .pending
5137 .punch_me_now
5138 .push(punch_me_now);
5139
5140 if target_peer_id.is_some() {
5141 trace!(
5142 "Queued PunchMeNow frame for relay: round={}, target_seq={}, target_peer={:?}",
5143 round,
5144 paired_with_sequence_number,
5145 target_peer_id.map(|p| hex::encode(&p[..8]))
5146 );
5147 } else {
5148 trace!(
5149 "Queued PunchMeNow frame: round={}, target={}",
5150 round, paired_with_sequence_number
5151 );
5152 }
5153 }
5154
5155 pub fn queue_remove_address(&mut self, sequence: VarInt) {
5157 let remove_address = frame::RemoveAddress { sequence };
5158
5159 self.spaces[SpaceId::Data]
5160 .pending
5161 .remove_addresses
5162 .push(remove_address);
5163 trace!("Queued RemoveAddress frame: seq={}", sequence);
5164 }
5165
5166 pub fn queue_observed_address(&mut self, address: SocketAddr) {
5168 let sequence_number = if let Some(state) = &mut self.address_discovery_state {
5170 let seq = state.next_sequence_number;
5171 state.next_sequence_number =
5172 VarInt::from_u64(state.next_sequence_number.into_inner() + 1)
5173 .expect("sequence number overflow");
5174 seq
5175 } else {
5176 VarInt::from_u32(0)
5178 };
5179
5180 let observed_address = frame::ObservedAddress {
5181 sequence_number,
5182 address,
5183 };
5184 self.spaces[SpaceId::Data]
5185 .pending
5186 .outbound_observations
5187 .push(observed_address);
5188 trace!("Queued ObservedAddress frame: addr={}", address);
5189 }
5190
5191 pub fn check_for_address_observations(&mut self, now: Instant) {
5193 let Some(state) = &mut self.address_discovery_state else {
5195 return;
5196 };
5197
5198 if !state.enabled {
5200 return;
5201 }
5202
5203 if self.peer_params.address_discovery.is_none() {
5206 return;
5207 }
5208
5209 let path_id = 0u64; let remote_address = self.path.remote;
5214
5215 if state.should_send_observation(path_id, now) {
5217 if let Some(frame) = state.queue_observed_address_frame(path_id, remote_address) {
5219 self.spaces[SpaceId::Data]
5221 .pending
5222 .outbound_observations
5223 .push(frame);
5224
5225 state.record_observation_sent(path_id);
5227
5228 #[cfg(feature = "trace")]
5230 {
5231 use crate::trace_observed_address_sent;
5232 trace_observed_address_sent!(
5234 &self.event_log,
5235 self.trace_context.trace_id(),
5236 remote_address,
5237 path_id
5238 );
5239 }
5240
5241 trace!(
5242 "Queued OBSERVED_ADDRESS frame for path {} with address {}",
5243 path_id, remote_address
5244 );
5245 }
5246 }
5247 }
5248
5249 fn trigger_candidate_validation(
5251 &mut self,
5252 candidate_address: SocketAddr,
5253 now: Instant,
5254 ) -> Result<(), TransportError> {
5255 let nat_state = self
5256 .nat_traversal
5257 .as_mut()
5258 .ok_or_else(|| TransportError::PROTOCOL_VIOLATION("NAT traversal not enabled"))?;
5259
5260 let already_validating = nat_state.active_validations.values().any(|v| {
5263 crate::shared::normalize_socket_addr(v.target_addr)
5264 == crate::shared::normalize_socket_addr(candidate_address)
5265 });
5266 if already_validating {
5267 trace!("Validation already in progress for {}", candidate_address);
5268 return Ok(());
5269 }
5270
5271 let sequence = nat_state
5273 .remote_candidates
5274 .iter()
5275 .find(|(_, c)| {
5276 crate::shared::normalize_socket_addr(c.address)
5277 == crate::shared::normalize_socket_addr(candidate_address)
5278 })
5279 .map(|(seq, _)| *seq)
5280 .unwrap_or(crate::VarInt::from_u32(0));
5281
5282 let challenge = self.rng.r#gen::<u64>();
5284
5285 let validation_state = nat_traversal::PathValidationState {
5287 challenge,
5288 sequence,
5289 target_addr: candidate_address,
5290 sent_at: now,
5291 retry_count: 0,
5292 max_retries: 3,
5293 coordination_round: None,
5294 timeout_state: nat_traversal::AdaptiveTimeoutState::new(),
5295 last_retry_at: None,
5296 };
5297
5298 nat_state
5300 .active_validations
5301 .insert(challenge, validation_state);
5302
5303 nat_state.stats.validations_succeeded += 1; trace!(
5309 "Triggered PATH_CHALLENGE validation for {} with challenge {:016x}",
5310 candidate_address, challenge
5311 );
5312
5313 Ok(())
5314 }
5315
5316 pub fn nat_traversal_state(&self) -> Option<(usize, usize)> {
5321 self.nat_traversal
5322 .as_ref()
5323 .map(|state| (state.local_candidates.len(), state.remote_candidates.len()))
5324 }
5325
5326 pub fn initiate_nat_traversal_coordination(
5328 &mut self,
5329 now: Instant,
5330 ) -> Result<(), TransportError> {
5331 let nat_state = self
5332 .nat_traversal
5333 .as_mut()
5334 .ok_or_else(|| TransportError::PROTOCOL_VIOLATION("NAT traversal not enabled"))?;
5335
5336 if nat_state.should_send_punch_request() {
5338 nat_state.generate_candidate_pairs(now);
5340
5341 let pairs = nat_state.get_next_validation_pairs(3);
5343 if pairs.is_empty() {
5344 return Err(TransportError::PROTOCOL_VIOLATION(
5345 "No candidate pairs for coordination",
5346 ));
5347 }
5348
5349 let targets: Vec<_> = pairs
5351 .into_iter()
5352 .map(|pair| nat_traversal::PunchTarget {
5353 remote_addr: pair.remote_addr,
5354 remote_sequence: pair.remote_sequence,
5355 challenge: self.rng.r#gen(),
5356 })
5357 .collect();
5358
5359 let round = nat_state
5361 .start_coordination_round(targets, now)
5362 .map_err(|_e| {
5363 TransportError::PROTOCOL_VIOLATION("Failed to start coordination round")
5364 })?;
5365
5366 let local_addr = self
5369 .local_ip
5370 .map(|ip| SocketAddr::new(ip, self.local_ip.map(|_| 0).unwrap_or(0)))
5371 .unwrap_or_else(|| {
5372 SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED), 0)
5373 });
5374
5375 let punch_me_now = frame::PunchMeNow {
5376 round,
5377 paired_with_sequence_number: VarInt::from_u32(0), address: local_addr,
5379 target_peer_id: None, };
5381
5382 self.spaces[SpaceId::Data]
5383 .pending
5384 .punch_me_now
5385 .push(punch_me_now);
5386 nat_state.mark_punch_request_sent();
5387
5388 trace!("Initiated NAT traversal coordination round {}", round);
5389 }
5390
5391 Ok(())
5392 }
5393
5394 pub fn validate_nat_candidates(&mut self, now: Instant) {
5396 self.generate_nat_traversal_challenges(now);
5397 }
5398
5399 pub fn send_nat_address_advertisement(
5414 &mut self,
5415 address: SocketAddr,
5416 priority: u32,
5417 ) -> Result<u64, ConnectionError> {
5418 let normalized_addr = crate::shared::normalize_socket_addr(address);
5421
5422 if !is_valid_nat_advertisement_address(normalized_addr) {
5423 debug!(
5424 "Skipping NAT address advertisement for invalid candidate {}",
5425 normalized_addr
5426 );
5427 return Err(ConnectionError::TransportError(
5428 TransportError::PROTOCOL_VIOLATION("invalid NAT candidate address"),
5429 ));
5430 }
5431
5432 let nat_state = self.nat_traversal.as_mut().ok_or_else(|| {
5434 ConnectionError::TransportError(TransportError::PROTOCOL_VIOLATION(
5435 "NAT traversal not enabled on this connection",
5436 ))
5437 })?;
5438
5439 let sequence = nat_state.next_sequence;
5441 nat_state.next_sequence =
5442 VarInt::from_u64(nat_state.next_sequence.into_inner() + 1).unwrap();
5443
5444 let now = Instant::now();
5446 nat_state.local_candidates.insert(
5447 sequence,
5448 nat_traversal::AddressCandidate {
5449 address: normalized_addr,
5450 priority,
5451 source: nat_traversal::CandidateSource::Local,
5452 discovered_at: now,
5453 state: nat_traversal::CandidateState::New,
5454 attempt_count: 0,
5455 last_attempt: None,
5456 },
5457 );
5458
5459 nat_state.stats.local_candidates_sent += 1;
5461
5462 self.queue_add_address(sequence, normalized_addr, VarInt::from_u32(priority));
5464
5465 debug!(
5466 "Queued ADD_ADDRESS frame: addr={} (normalized from {}), priority={}, seq={}",
5467 normalized_addr, address, priority, sequence
5468 );
5469 Ok(sequence.into_inner())
5470 }
5471
5472 pub fn send_nat_punch_coordination(
5485 &mut self,
5486 paired_with_sequence_number: u64,
5487 address: SocketAddr,
5488 round: u32,
5489 ) -> Result<(), ConnectionError> {
5490 let _nat_state = self.nat_traversal.as_ref().ok_or_else(|| {
5492 ConnectionError::TransportError(TransportError::PROTOCOL_VIOLATION(
5493 "NAT traversal not enabled on this connection",
5494 ))
5495 })?;
5496
5497 self.queue_punch_me_now(
5499 VarInt::from_u32(round),
5500 VarInt::from_u64(paired_with_sequence_number).map_err(|_| {
5501 ConnectionError::TransportError(TransportError::PROTOCOL_VIOLATION(
5502 "Invalid target sequence number",
5503 ))
5504 })?,
5505 address,
5506 );
5507
5508 debug!(
5509 "Queued PUNCH_ME_NOW frame: paired_with_seq={}, addr={}, round={}",
5510 paired_with_sequence_number, address, round
5511 );
5512 Ok(())
5513 }
5514
5515 pub fn send_nat_punch_via_relay(
5529 &mut self,
5530 target_peer_id: [u8; 32],
5531 our_address: SocketAddr,
5532 round: u32,
5533 ) -> Result<(), ConnectionError> {
5534 let _nat_state = self.nat_traversal.as_ref().ok_or_else(|| {
5536 ConnectionError::TransportError(TransportError::PROTOCOL_VIOLATION(
5537 "NAT traversal not enabled on this connection",
5538 ))
5539 })?;
5540
5541 self.queue_punch_me_now_with_target(
5543 VarInt::from_u32(round),
5544 VarInt::from_u32(0), our_address,
5546 Some(target_peer_id),
5547 );
5548
5549 info!(
5550 "Queued PUNCH_ME_NOW for relay: target_peer={}, our_addr={}, round={}",
5551 hex::encode(&target_peer_id[..8]),
5552 our_address,
5553 round
5554 );
5555 Ok(())
5556 }
5557
5558 pub fn send_nat_address_removal(&mut self, sequence: u64) -> Result<(), ConnectionError> {
5569 let nat_state = self.nat_traversal.as_mut().ok_or_else(|| {
5571 ConnectionError::TransportError(TransportError::PROTOCOL_VIOLATION(
5572 "NAT traversal not enabled on this connection",
5573 ))
5574 })?;
5575
5576 let sequence_varint = VarInt::from_u64(sequence).map_err(|_| {
5577 ConnectionError::TransportError(TransportError::PROTOCOL_VIOLATION(
5578 "Invalid sequence number",
5579 ))
5580 })?;
5581
5582 nat_state.local_candidates.remove(&sequence_varint);
5584
5585 self.queue_remove_address(sequence_varint);
5587
5588 debug!("Queued REMOVE_ADDRESS frame: seq={}", sequence);
5589 Ok(())
5590 }
5591
5592 #[allow(dead_code)]
5601 pub(crate) fn get_nat_traversal_stats(&self) -> Option<&nat_traversal::NatTraversalStats> {
5602 self.nat_traversal.as_ref().map(|state| &state.stats)
5603 }
5604
5605 pub fn is_nat_traversal_enabled(&self) -> bool {
5607 self.nat_traversal.is_some()
5608 }
5609
5610 fn negotiate_address_discovery(&mut self, peer_params: &TransportParameters) {
5614 let now = Instant::now();
5615
5616 info!(
5617 "negotiate_address_discovery: peer_params.address_discovery = {:?}",
5618 peer_params.address_discovery
5619 );
5620
5621 match &peer_params.address_discovery {
5623 Some(peer_config) => {
5624 info!("Peer supports address discovery: {:?}", peer_config);
5626 if let Some(state) = &mut self.address_discovery_state {
5627 if state.enabled {
5628 info!(
5631 "Address discovery negotiated successfully: rate={}, all_paths={}",
5632 state.max_observation_rate, state.observe_all_paths
5633 );
5634 } else {
5635 info!("Address discovery disabled locally, ignoring peer support");
5637 }
5638 } else {
5639 self.address_discovery_state =
5641 Some(AddressDiscoveryState::new(peer_config, now));
5642 info!("Address discovery initialized from peer config");
5643 }
5644 }
5645 _ => {
5646 warn!("Peer does NOT support address discovery (transport parameter not present)");
5648 if let Some(state) = &mut self.address_discovery_state {
5649 state.enabled = false;
5650 }
5651 }
5652 }
5653
5654 if let Some(state) = &self.address_discovery_state {
5656 if state.enabled {
5657 self.path.set_observation_rate(state.max_observation_rate);
5658 }
5659 }
5660 }
5661
5662 fn decrypt_packet(
5663 &mut self,
5664 now: Instant,
5665 packet: &mut Packet,
5666 ) -> Result<Option<u64>, Option<TransportError>> {
5667 let result = packet_crypto::decrypt_packet_body(
5668 packet,
5669 &self.spaces,
5670 self.zero_rtt_crypto.as_ref(),
5671 self.key_phase,
5672 self.prev_crypto.as_ref(),
5673 self.next_crypto.as_ref(),
5674 )?;
5675
5676 let result = match result {
5677 Some(r) => r,
5678 None => return Ok(None),
5679 };
5680
5681 if result.outgoing_key_update_acked {
5682 if let Some(prev) = self.prev_crypto.as_mut() {
5683 prev.end_packet = Some((result.number, now));
5684 self.set_key_discard_timer(now, packet.header.space());
5685 }
5686 }
5687
5688 if result.incoming_key_update {
5689 trace!("key update authenticated");
5690 self.update_keys(Some((result.number, now)), true);
5691 self.set_key_discard_timer(now, packet.header.space());
5692 }
5693
5694 Ok(Some(result.number))
5695 }
5696
5697 fn update_keys(&mut self, end_packet: Option<(u64, Instant)>, remote: bool) {
5698 trace!("executing key update");
5699 let new = self
5703 .crypto
5704 .next_1rtt_keys()
5705 .expect("only called for `Data` packets");
5706 self.key_phase_size = new
5707 .local
5708 .confidentiality_limit()
5709 .saturating_sub(KEY_UPDATE_MARGIN);
5710 let old = mem::replace(
5711 &mut self.spaces[SpaceId::Data]
5712 .crypto
5713 .as_mut()
5714 .unwrap() .packet,
5716 mem::replace(self.next_crypto.as_mut().unwrap(), new),
5717 );
5718 self.spaces[SpaceId::Data].sent_with_keys = 0;
5719 self.prev_crypto = Some(PrevCrypto {
5720 crypto: old,
5721 end_packet,
5722 update_unacked: remote,
5723 });
5724 self.key_phase = !self.key_phase;
5725 }
5726
5727 fn peer_supports_ack_frequency(&self) -> bool {
5728 self.peer_params.min_ack_delay.is_some()
5729 }
5730
5731 pub(crate) fn immediate_ack(&mut self) {
5736 self.spaces[self.highest_space].immediate_ack_pending = true;
5737 }
5738
5739 #[cfg(test)]
5741 #[allow(dead_code)]
5742 pub(crate) fn decode_packet(&self, event: &ConnectionEvent) -> Option<Vec<u8>> {
5743 let (first_decode, remaining) = match &event.0 {
5744 ConnectionEventInner::Datagram(DatagramConnectionEvent {
5745 first_decode,
5746 remaining,
5747 ..
5748 }) => (first_decode, remaining),
5749 _ => return None,
5750 };
5751
5752 if remaining.is_some() {
5753 panic!("Packets should never be coalesced in tests");
5754 }
5755
5756 let decrypted_header = packet_crypto::unprotect_header(
5757 first_decode.clone(),
5758 &self.spaces,
5759 self.zero_rtt_crypto.as_ref(),
5760 self.peer_params.stateless_reset_token,
5761 )?;
5762
5763 let mut packet = decrypted_header.packet?;
5764 packet_crypto::decrypt_packet_body(
5765 &mut packet,
5766 &self.spaces,
5767 self.zero_rtt_crypto.as_ref(),
5768 self.key_phase,
5769 self.prev_crypto.as_ref(),
5770 self.next_crypto.as_ref(),
5771 )
5772 .ok()?;
5773
5774 Some(packet.payload.to_vec())
5775 }
5776
5777 #[cfg(test)]
5780 #[allow(dead_code)]
5781 pub(crate) fn bytes_in_flight(&self) -> u64 {
5782 self.path.in_flight.bytes
5783 }
5784
5785 #[cfg(test)]
5787 #[allow(dead_code)]
5788 pub(crate) fn congestion_window(&self) -> u64 {
5789 self.path
5790 .congestion
5791 .window()
5792 .saturating_sub(self.path.in_flight.bytes)
5793 }
5794
5795 #[cfg(test)]
5797 #[allow(dead_code)]
5798 pub(crate) fn is_idle(&self) -> bool {
5799 Timer::VALUES
5800 .iter()
5801 .filter(|&&t| !matches!(t, Timer::KeepAlive | Timer::PushNewCid | Timer::KeyDiscard))
5802 .filter_map(|&t| Some((t, self.timers.get(t)?)))
5803 .min_by_key(|&(_, time)| time)
5804 .is_none_or(|(timer, _)| timer == Timer::Idle)
5805 }
5806
5807 #[cfg(test)]
5809 #[allow(dead_code)]
5810 pub(crate) fn lost_packets(&self) -> u64 {
5811 self.lost_packets
5812 }
5813
5814 #[cfg(test)]
5816 #[allow(dead_code)]
5817 pub(crate) fn using_ecn(&self) -> bool {
5818 self.path.sending_ecn
5819 }
5820
5821 #[cfg(test)]
5823 #[allow(dead_code)]
5824 pub(crate) fn total_recvd(&self) -> u64 {
5825 self.path.total_recvd
5826 }
5827
5828 #[cfg(test)]
5829 #[allow(dead_code)]
5830 pub(crate) fn active_local_cid_seq(&self) -> (u64, u64) {
5831 self.local_cid_state.active_seq()
5832 }
5833
5834 #[cfg(test)]
5837 #[allow(dead_code)]
5838 pub(crate) fn rotate_local_cid(&mut self, v: u64, now: Instant) {
5839 let n = self.local_cid_state.assign_retire_seq(v);
5840 self.endpoint_events
5841 .push_back(EndpointEventInner::NeedIdentifiers(now, n));
5842 }
5843
5844 #[cfg(test)]
5846 #[allow(dead_code)]
5847 pub(crate) fn active_rem_cid_seq(&self) -> u64 {
5848 self.rem_cids.active_seq()
5849 }
5850
5851 #[cfg(test)]
5853 #[cfg(test)]
5854 #[allow(dead_code)]
5855 pub(crate) fn path_mtu(&self) -> u16 {
5856 self.path.current_mtu()
5857 }
5858
5859 fn can_send_1rtt(&self, max_size: usize) -> bool {
5863 self.streams.can_send_stream_data()
5864 || self.path.challenge_pending
5865 || self
5866 .prev_path
5867 .as_ref()
5868 .is_some_and(|(_, x)| x.challenge_pending)
5869 || !self.path_responses.is_empty()
5870 || self
5871 .datagrams
5872 .outgoing
5873 .front()
5874 .is_some_and(|x| x.size(true) <= max_size)
5875 }
5876
5877 fn remove_in_flight(&mut self, pn: u64, packet: &SentPacket) {
5879 for path in [&mut self.path]
5881 .into_iter()
5882 .chain(self.prev_path.as_mut().map(|(_, data)| data))
5883 {
5884 if path.remove_in_flight(pn, packet) {
5885 return;
5886 }
5887 }
5888 }
5889
5890 fn kill(&mut self, reason: ConnectionError) {
5892 self.close_common();
5893 self.error = Some(reason);
5894 self.state = State::Drained;
5895 self.endpoint_events.push_back(EndpointEventInner::Drained);
5896 }
5897
5898 fn generate_nat_traversal_challenges(&mut self, now: Instant) {
5900 let candidates: Vec<(VarInt, SocketAddr)> = if let Some(nat_state) = &self.nat_traversal {
5902 nat_state
5903 .get_validation_candidates()
5904 .into_iter()
5905 .take(3) .map(|(seq, candidate)| (seq, candidate.address))
5907 .collect()
5908 } else {
5909 return;
5910 };
5911
5912 if candidates.is_empty() {
5913 return;
5914 }
5915
5916 if let Some(nat_state) = &mut self.nat_traversal {
5918 for (seq, address) in candidates {
5919 let challenge: u64 = self.rng.r#gen();
5921
5922 if let Err(e) = nat_state.start_validation(seq, challenge, now) {
5924 debug!("Failed to start validation for candidate {}: {}", seq, e);
5925 continue;
5926 }
5927
5928 trace!(
5930 "Started NAT validation for {} with token {:08x}",
5931 address, challenge
5932 );
5933 }
5934 }
5935 }
5936
5937 pub fn current_mtu(&self) -> u16 {
5941 self.path.current_mtu()
5942 }
5943
5944 fn predict_1rtt_overhead(&self, pn: Option<u64>) -> usize {
5951 let pn_len = match pn {
5952 Some(pn) => PacketNumber::new(
5953 pn,
5954 self.spaces[SpaceId::Data].largest_acked_packet.unwrap_or(0),
5955 )
5956 .len(),
5957 None => 4,
5959 };
5960
5961 1 + self.rem_cids.active().len() + pn_len + self.tag_len_1rtt()
5963 }
5964
5965 fn tag_len_1rtt(&self) -> usize {
5966 let key = match self.spaces[SpaceId::Data].crypto.as_ref() {
5967 Some(crypto) => Some(&*crypto.packet.local),
5968 None => self.zero_rtt_crypto.as_ref().map(|x| &*x.packet),
5969 };
5970 key.map_or(16, |x| x.tag_len())
5974 }
5975
5976 fn on_path_validated(&mut self) {
5978 self.path.validated = true;
5979 let ConnectionSide::Server { server_config } = &self.side else {
5980 return;
5981 };
5982 let new_tokens = &mut self.spaces[SpaceId::Data as usize].pending.new_tokens;
5983 new_tokens.clear();
5984 for _ in 0..server_config.validation_token.sent {
5985 new_tokens.push(self.path.remote);
5986 }
5987 }
5988}
5989
5990impl fmt::Debug for Connection {
5991 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
5992 f.debug_struct("Connection")
5993 .field("handshake_cid", &self.handshake_cid)
5994 .finish()
5995 }
5996}
5997
5998enum ConnectionSide {
6000 Client {
6001 token: Bytes,
6003 token_store: Arc<dyn TokenStore>,
6004 server_name: String,
6005 },
6006 Server {
6007 server_config: Arc<ServerConfig>,
6008 },
6009}
6010
6011impl ConnectionSide {
6012 fn remote_may_migrate(&self) -> bool {
6013 match self {
6014 Self::Server { server_config } => server_config.migration,
6015 Self::Client { .. } => false,
6016 }
6017 }
6018
6019 fn is_client(&self) -> bool {
6020 self.side().is_client()
6021 }
6022
6023 fn is_server(&self) -> bool {
6024 self.side().is_server()
6025 }
6026
6027 fn side(&self) -> Side {
6028 match *self {
6029 Self::Client { .. } => Side::Client,
6030 Self::Server { .. } => Side::Server,
6031 }
6032 }
6033}
6034
6035impl From<SideArgs> for ConnectionSide {
6036 fn from(side: SideArgs) -> Self {
6037 match side {
6038 SideArgs::Client {
6039 token_store,
6040 server_name,
6041 } => Self::Client {
6042 token: token_store.take(&server_name).unwrap_or_default(),
6043 token_store,
6044 server_name,
6045 },
6046 SideArgs::Server {
6047 server_config,
6048 pref_addr_cid: _,
6049 path_validated: _,
6050 } => Self::Server { server_config },
6051 }
6052 }
6053}
6054
6055pub(crate) enum SideArgs {
6057 Client {
6058 token_store: Arc<dyn TokenStore>,
6059 server_name: String,
6060 },
6061 Server {
6062 server_config: Arc<ServerConfig>,
6063 pref_addr_cid: Option<ConnectionId>,
6064 path_validated: bool,
6065 },
6066}
6067
6068impl SideArgs {
6069 pub(crate) fn pref_addr_cid(&self) -> Option<ConnectionId> {
6070 match *self {
6071 Self::Client { .. } => None,
6072 Self::Server { pref_addr_cid, .. } => pref_addr_cid,
6073 }
6074 }
6075
6076 pub(crate) fn path_validated(&self) -> bool {
6077 match *self {
6078 Self::Client { .. } => true,
6079 Self::Server { path_validated, .. } => path_validated,
6080 }
6081 }
6082
6083 pub(crate) fn side(&self) -> Side {
6084 match *self {
6085 Self::Client { .. } => Side::Client,
6086 Self::Server { .. } => Side::Server,
6087 }
6088 }
6089}
6090
6091fn is_valid_nat_advertisement_address(address: SocketAddr) -> bool {
6092 if address.port() == 0 {
6093 return false;
6094 }
6095
6096 match address.ip() {
6097 IpAddr::V4(ipv4) => !ipv4.is_unspecified() && !ipv4.is_broadcast() && !ipv4.is_multicast(),
6098 IpAddr::V6(ipv6) => !ipv6.is_unspecified() && !ipv6.is_multicast(),
6099 }
6100}
6101
6102#[derive(Debug, Error, Clone, PartialEq, Eq)]
6104pub enum ConnectionError {
6105 #[error("peer doesn't implement any supported version")]
6107 VersionMismatch,
6108 #[error(transparent)]
6110 TransportError(#[from] TransportError),
6111 #[error("aborted by peer: {0}")]
6113 ConnectionClosed(frame::ConnectionClose),
6114 #[error("closed by peer: {0}")]
6116 ApplicationClosed(frame::ApplicationClose),
6117 #[error("reset by peer")]
6119 Reset,
6120 #[error("timed out")]
6126 TimedOut,
6127 #[error("closed")]
6129 LocallyClosed,
6130 #[error("CIDs exhausted")]
6134 CidsExhausted,
6135}
6136
6137impl From<Close> for ConnectionError {
6138 fn from(x: Close) -> Self {
6139 match x {
6140 Close::Connection(reason) => Self::ConnectionClosed(reason),
6141 Close::Application(reason) => Self::ApplicationClosed(reason),
6142 }
6143 }
6144}
6145
6146impl From<ConnectionError> for io::Error {
6148 fn from(x: ConnectionError) -> Self {
6149 use ConnectionError::*;
6150 let kind = match x {
6151 TimedOut => io::ErrorKind::TimedOut,
6152 Reset => io::ErrorKind::ConnectionReset,
6153 ApplicationClosed(_) | ConnectionClosed(_) => io::ErrorKind::ConnectionAborted,
6154 TransportError(_) | VersionMismatch | LocallyClosed | CidsExhausted => {
6155 io::ErrorKind::Other
6156 }
6157 };
6158 Self::new(kind, x)
6159 }
6160}
6161
6162#[derive(Clone, Debug)]
6163pub enum State {
6165 Handshake(state::Handshake),
6167 Established,
6169 Closed(state::Closed),
6171 Draining,
6173 Drained,
6175}
6176
6177impl State {
6178 fn closed<R: Into<Close>>(reason: R) -> Self {
6179 Self::Closed(state::Closed {
6180 reason: reason.into(),
6181 })
6182 }
6183
6184 fn is_handshake(&self) -> bool {
6185 matches!(*self, Self::Handshake(_))
6186 }
6187
6188 fn is_established(&self) -> bool {
6189 matches!(*self, Self::Established)
6190 }
6191
6192 fn is_closed(&self) -> bool {
6193 matches!(*self, Self::Closed(_) | Self::Draining | Self::Drained)
6194 }
6195
6196 fn is_drained(&self) -> bool {
6197 matches!(*self, Self::Drained)
6198 }
6199}
6200
6201mod state {
6202 use super::*;
6203
6204 #[derive(Clone, Debug)]
6205 pub struct Handshake {
6206 pub(super) rem_cid_set: bool,
6210 pub(super) expected_token: Bytes,
6214 pub(super) client_hello: Option<Bytes>,
6218 }
6219
6220 #[derive(Clone, Debug)]
6221 pub struct Closed {
6222 pub(super) reason: Close,
6223 }
6224}
6225
6226#[derive(Debug)]
6228pub enum Event {
6229 HandshakeDataReady,
6231 Connected,
6233 ConnectionLost {
6237 reason: ConnectionError,
6239 },
6240 Stream(StreamEvent),
6242 DatagramReceived,
6244 DatagramsUnblocked,
6246 DatagramDropped(DatagramDropStats),
6252}
6253
6254fn instant_saturating_sub(x: Instant, y: Instant) -> Duration {
6255 if x > y { x - y } else { Duration::ZERO }
6256}
6257
6258fn get_max_ack_delay(params: &TransportParameters) -> Duration {
6259 Duration::from_micros(params.max_ack_delay.0 * 1000)
6260}
6261
6262const MAX_BACKOFF_EXPONENT: u32 = 16;
6264
6265const MIN_PACKET_SPACE: usize = MAX_HANDSHAKE_OR_0RTT_HEADER_SIZE + 32;
6273
6274const MAX_HANDSHAKE_OR_0RTT_HEADER_SIZE: usize =
6280 1 + 4 + 1 + MAX_CID_SIZE + 1 + MAX_CID_SIZE + VarInt::from_u32(u16::MAX as u32).size() + 4;
6281
6282const KEY_UPDATE_MARGIN: u64 = 10_000;
6286
6287#[derive(Default)]
6288struct SentFrames {
6289 retransmits: ThinRetransmits,
6290 largest_acked: Option<u64>,
6291 stream_frames: StreamMetaVec,
6292 non_retransmits: bool,
6294 requires_padding: bool,
6295}
6296
6297impl SentFrames {
6298 fn is_ack_only(&self, streams: &StreamsState) -> bool {
6300 self.largest_acked.is_some()
6301 && !self.non_retransmits
6302 && self.stream_frames.is_empty()
6303 && self.retransmits.is_empty(streams)
6304 }
6305}
6306
6307fn negotiate_max_idle_timeout(x: Option<VarInt>, y: Option<VarInt>) -> Option<Duration> {
6315 match (x, y) {
6316 (Some(VarInt(0)) | None, Some(VarInt(0)) | None) => None,
6317 (Some(VarInt(0)) | None, Some(y)) => Some(Duration::from_millis(y.0)),
6318 (Some(x), Some(VarInt(0)) | None) => Some(Duration::from_millis(x.0)),
6319 (Some(x), Some(y)) => Some(Duration::from_millis(cmp::min(x, y).0)),
6320 }
6321}
6322
6323#[derive(Debug, Clone)]
6325pub(crate) struct PqcState {
6326 enabled: bool,
6328 #[allow(dead_code)]
6330 algorithms: Option<crate::transport_parameters::PqcAlgorithms>,
6331 handshake_mtu: u16,
6333 using_pqc: bool,
6335 packet_handler: crate::crypto::pqc::packet_handler::PqcPacketHandler,
6337}
6338
6339#[allow(dead_code)]
6340impl PqcState {
6341 fn new() -> Self {
6342 Self {
6343 enabled: false,
6344 algorithms: None,
6345 handshake_mtu: MIN_INITIAL_SIZE,
6346 using_pqc: false,
6347 packet_handler: crate::crypto::pqc::packet_handler::PqcPacketHandler::new(),
6348 }
6349 }
6350
6351 fn min_initial_size(&self) -> u16 {
6353 if self.enabled && self.using_pqc {
6354 std::cmp::max(self.handshake_mtu, 4096)
6356 } else {
6357 MIN_INITIAL_SIZE
6358 }
6359 }
6360
6361 fn update_from_peer_params(&mut self, params: &TransportParameters) {
6363 if let Some(ref algorithms) = params.pqc_algorithms {
6364 self.enabled = true;
6365 self.algorithms = Some(algorithms.clone());
6366 if algorithms.ml_kem_768 || algorithms.ml_dsa_65 {
6368 self.using_pqc = true;
6369 self.handshake_mtu = 4096; }
6371 }
6372 }
6373
6374 fn detect_pqc_from_crypto(&mut self, crypto_data: &[u8], space: SpaceId) {
6376 if !self.enabled {
6377 return;
6378 }
6379 if self.packet_handler.detect_pqc_handshake(crypto_data, space) {
6380 self.using_pqc = true;
6381 self.handshake_mtu = self.packet_handler.get_min_packet_size(space);
6383 }
6384 }
6385
6386 fn should_trigger_mtu_discovery(&mut self) -> bool {
6388 self.packet_handler.should_trigger_mtu_discovery()
6389 }
6390
6391 fn get_mtu_config(&self) -> MtuDiscoveryConfig {
6393 self.packet_handler.get_pqc_mtu_config()
6394 }
6395
6396 fn calculate_crypto_frame_size(&self, available_space: usize, remaining_data: usize) -> usize {
6398 self.packet_handler
6399 .calculate_crypto_frame_size(available_space, remaining_data)
6400 }
6401
6402 fn should_adjust_coalescing(&self, current_size: usize, space: SpaceId) -> bool {
6404 self.packet_handler
6405 .adjust_coalescing_for_pqc(current_size, space)
6406 }
6407
6408 fn on_packet_sent(&mut self, space: SpaceId, size: u16) {
6410 self.packet_handler.on_packet_sent(space, size);
6411 }
6412
6413 fn reset(&mut self) {
6415 self.enabled = false;
6416 self.algorithms = None;
6417 self.handshake_mtu = MIN_INITIAL_SIZE;
6418 self.using_pqc = false;
6419 self.packet_handler.reset();
6420 }
6421}
6422
6423impl Default for PqcState {
6424 fn default() -> Self {
6425 Self::new()
6426 }
6427}
6428
6429#[derive(Debug, Clone)]
6431pub(crate) struct AddressDiscoveryState {
6432 enabled: bool,
6434 max_observation_rate: u8,
6436 observe_all_paths: bool,
6438 sent_observations: std::collections::HashMap<u64, paths::PathAddressInfo>,
6440 received_observations: std::collections::HashMap<u64, paths::PathAddressInfo>,
6442 rate_limiter: AddressObservationRateLimiter,
6444 received_history: Vec<ObservedAddressEvent>,
6446 bootstrap_mode: bool,
6448 next_sequence_number: VarInt,
6450 last_received_sequence: std::collections::HashMap<u64, VarInt>,
6452 frames_sent: u64,
6454}
6455
6456#[derive(Debug, Clone, PartialEq, Eq)]
6458struct ObservedAddressEvent {
6459 address: SocketAddr,
6461 received_at: Instant,
6463 path_id: u64,
6465}
6466
6467#[derive(Debug, Clone)]
6469struct AddressObservationRateLimiter {
6470 tokens: f64,
6472 max_tokens: f64,
6474 rate: f64,
6476 last_update: Instant,
6478}
6479
6480#[allow(dead_code)]
6481impl AddressDiscoveryState {
6482 fn new(config: &crate::transport_parameters::AddressDiscoveryConfig, now: Instant) -> Self {
6484 use crate::transport_parameters::AddressDiscoveryConfig::*;
6485
6486 let (enabled, _can_send, _can_receive) = match config {
6488 SendOnly => (true, true, false),
6489 ReceiveOnly => (true, false, true),
6490 SendAndReceive => (true, true, true),
6491 };
6492
6493 let max_observation_rate = 10u8; let observe_all_paths = false; Self {
6499 enabled,
6500 max_observation_rate,
6501 observe_all_paths,
6502 sent_observations: std::collections::HashMap::new(),
6503 received_observations: std::collections::HashMap::new(),
6504 rate_limiter: AddressObservationRateLimiter::new(max_observation_rate, now),
6505 received_history: Vec::new(),
6506 bootstrap_mode: false,
6507 next_sequence_number: VarInt::from_u32(0),
6508 last_received_sequence: std::collections::HashMap::new(),
6509 frames_sent: 0,
6510 }
6511 }
6512
6513 fn should_send_observation(&mut self, path_id: u64, now: Instant) -> bool {
6515 if !self.should_observe_path(path_id) {
6517 return false;
6518 }
6519
6520 let needs_observation = match self.sent_observations.get(&path_id) {
6522 Some(info) => info.observed_address.is_none() || !info.notified,
6523 None => true,
6524 };
6525
6526 if !needs_observation {
6527 return false;
6528 }
6529
6530 self.rate_limiter.try_consume(1.0, now)
6532 }
6533
6534 fn record_observation_sent(&mut self, path_id: u64) {
6536 if let Some(info) = self.sent_observations.get_mut(&path_id) {
6537 info.mark_notified();
6538 }
6539 }
6540
6541 fn handle_observed_address(&mut self, address: SocketAddr, path_id: u64, now: Instant) {
6543 if !self.enabled {
6544 return;
6545 }
6546
6547 self.received_history.push(ObservedAddressEvent {
6548 address,
6549 received_at: now,
6550 path_id,
6551 });
6552
6553 let info = self
6555 .received_observations
6556 .entry(path_id)
6557 .or_insert_with(paths::PathAddressInfo::new);
6558 info.update_observed_address(address, now);
6559 }
6560
6561 pub(crate) fn get_observed_address(&self, path_id: u64) -> Option<SocketAddr> {
6563 self.received_observations
6564 .get(&path_id)
6565 .and_then(|info| info.observed_address)
6566 }
6567
6568 pub(crate) fn get_all_received_history(&self) -> Vec<SocketAddr> {
6570 self.received_observations
6571 .values()
6572 .filter_map(|info| info.observed_address)
6573 .collect()
6574 }
6575
6576 pub(crate) fn stats(&self) -> AddressDiscoveryStats {
6578 AddressDiscoveryStats {
6579 frames_sent: self.frames_sent,
6580 frames_received: self.received_history.len() as u64,
6581 addresses_discovered: self
6582 .received_observations
6583 .values()
6584 .filter(|info| info.observed_address.is_some())
6585 .count() as u64,
6586 address_changes_detected: 0, }
6588 }
6589
6590 fn has_unnotified_changes(&self) -> bool {
6596 let has_unsent = self
6598 .sent_observations
6599 .values()
6600 .any(|info| info.observed_address.is_some() && !info.notified);
6601
6602 let has_unreceived = self
6604 .received_observations
6605 .values()
6606 .any(|info| info.observed_address.is_some() && !info.notified);
6607
6608 has_unsent || has_unreceived
6609 }
6610
6611 fn queue_observed_address_frame(
6613 &mut self,
6614 path_id: u64,
6615 address: SocketAddr,
6616 ) -> Option<frame::ObservedAddress> {
6617 if !self.enabled {
6619 tracing::debug!("queue_observed_address_frame: BLOCKED - address discovery disabled");
6620 return None;
6621 }
6622
6623 if !self.observe_all_paths && path_id != 0 {
6625 tracing::debug!(
6626 "queue_observed_address_frame: BLOCKED - path {} not allowed (observe_all_paths={})",
6627 path_id,
6628 self.observe_all_paths
6629 );
6630 return None;
6631 }
6632
6633 if let Some(info) = self.sent_observations.get(&path_id) {
6635 if info.notified {
6636 tracing::trace!(
6637 "queue_observed_address_frame: BLOCKED - path {} already notified",
6638 path_id
6639 );
6640 return None;
6641 }
6642 }
6643
6644 if self.rate_limiter.tokens < 1.0 {
6646 tracing::debug!(
6647 "queue_observed_address_frame: BLOCKED - rate limited (tokens={})",
6648 self.rate_limiter.tokens
6649 );
6650 return None;
6651 }
6652
6653 tracing::info!(
6654 "queue_observed_address_frame: SENDING OBSERVED_ADDRESS to {} for path {}",
6655 address,
6656 path_id
6657 );
6658
6659 self.rate_limiter.tokens -= 1.0;
6661
6662 let info = self
6664 .sent_observations
6665 .entry(path_id)
6666 .or_insert_with(paths::PathAddressInfo::new);
6667 info.observed_address = Some(address);
6668 info.notified = true;
6669
6670 tracing::trace!(
6671 path_id = ?path_id,
6672 address = %address,
6673 "queue_observed_address_frame: queuing frame"
6674 );
6675
6676 let sequence_number = self.next_sequence_number;
6678 self.next_sequence_number = VarInt::from_u64(self.next_sequence_number.into_inner() + 1)
6679 .expect("sequence number overflow");
6680
6681 Some(frame::ObservedAddress {
6682 sequence_number,
6683 address,
6684 })
6685 }
6686
6687 fn check_for_address_observations(
6689 &mut self,
6690 _current_path: u64,
6691 peer_supports_address_discovery: bool,
6692 now: Instant,
6693 ) -> Vec<frame::ObservedAddress> {
6694 let mut frames = Vec::new();
6695
6696 if !self.enabled || !peer_supports_address_discovery {
6698 return frames;
6699 }
6700
6701 self.rate_limiter.update_tokens(now);
6703
6704 let paths_to_notify: Vec<u64> = self
6706 .sent_observations
6707 .iter()
6708 .filter_map(|(&path_id, info)| {
6709 if info.observed_address.is_some() && !info.notified {
6710 Some(path_id)
6711 } else {
6712 None
6713 }
6714 })
6715 .collect();
6716
6717 for path_id in paths_to_notify {
6719 if !self.should_observe_path(path_id) {
6721 continue;
6722 }
6723
6724 if !self.bootstrap_mode && self.rate_limiter.tokens < 1.0 {
6726 break; }
6728
6729 if let Some(info) = self.sent_observations.get_mut(&path_id) {
6731 if let Some(address) = info.observed_address {
6732 if self.bootstrap_mode {
6734 self.rate_limiter.tokens -= 0.2; } else {
6736 self.rate_limiter.tokens -= 1.0;
6737 }
6738
6739 info.notified = true;
6741
6742 let sequence_number = self.next_sequence_number;
6744 self.next_sequence_number =
6745 VarInt::from_u64(self.next_sequence_number.into_inner() + 1)
6746 .expect("sequence number overflow");
6747
6748 self.frames_sent += 1;
6749
6750 frames.push(frame::ObservedAddress {
6751 sequence_number,
6752 address,
6753 });
6754 }
6755 }
6756 }
6757
6758 frames
6759 }
6760
6761 fn update_rate_limit(&mut self, new_rate: f64) {
6763 self.max_observation_rate = new_rate as u8;
6764 self.rate_limiter.set_rate(new_rate as u8);
6765 }
6766
6767 fn from_transport_params(params: &TransportParameters) -> Option<Self> {
6769 params
6770 .address_discovery
6771 .as_ref()
6772 .map(|config| Self::new(config, Instant::now()))
6773 }
6774
6775 #[cfg(test)]
6777 fn new_with_params(enabled: bool, max_rate: f64, observe_all_paths: bool) -> Self {
6778 if !enabled {
6780 return Self {
6782 enabled: false,
6783 max_observation_rate: max_rate as u8,
6784 observe_all_paths,
6785 sent_observations: std::collections::HashMap::new(),
6786 received_observations: std::collections::HashMap::new(),
6787 rate_limiter: AddressObservationRateLimiter::new(max_rate as u8, Instant::now()),
6788 received_history: Vec::new(),
6789 bootstrap_mode: false,
6790 next_sequence_number: VarInt::from_u32(0),
6791 last_received_sequence: std::collections::HashMap::new(),
6792 frames_sent: 0,
6793 };
6794 }
6795
6796 let config = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
6798 let mut state = Self::new(&config, Instant::now());
6799 state.max_observation_rate = max_rate as u8;
6800 state.observe_all_paths = observe_all_paths;
6801 state.rate_limiter = AddressObservationRateLimiter::new(max_rate as u8, Instant::now());
6802 state
6803 }
6804
6805 fn set_bootstrap_mode(&mut self, enabled: bool) {
6807 self.bootstrap_mode = enabled;
6808 if enabled {
6810 let bootstrap_rate = self.get_effective_rate_limit();
6811 self.rate_limiter.rate = bootstrap_rate;
6812 self.rate_limiter.max_tokens = bootstrap_rate * 2.0; self.rate_limiter.tokens = self.rate_limiter.max_tokens;
6815 }
6816 }
6817
6818 fn is_bootstrap_mode(&self) -> bool {
6820 self.bootstrap_mode
6821 }
6822
6823 fn get_effective_rate_limit(&self) -> f64 {
6825 if self.bootstrap_mode {
6826 (self.max_observation_rate as f64) * 5.0
6828 } else {
6829 self.max_observation_rate as f64
6830 }
6831 }
6832
6833 fn should_observe_path(&self, path_id: u64) -> bool {
6835 if !self.enabled {
6836 return false;
6837 }
6838
6839 if self.bootstrap_mode {
6841 return true;
6842 }
6843
6844 self.observe_all_paths || path_id == 0
6846 }
6847
6848 fn should_send_observation_immediately(&self, is_new_connection: bool) -> bool {
6850 self.bootstrap_mode && is_new_connection
6851 }
6852}
6853
6854#[allow(dead_code)]
6855impl AddressObservationRateLimiter {
6856 fn new(rate: u8, now: Instant) -> Self {
6858 let rate_f64 = rate as f64;
6859 Self {
6860 tokens: rate_f64,
6861 max_tokens: rate_f64,
6862 rate: rate_f64,
6863 last_update: now,
6864 }
6865 }
6866
6867 fn try_consume(&mut self, tokens: f64, now: Instant) -> bool {
6869 self.update_tokens(now);
6870
6871 if self.tokens >= tokens {
6872 self.tokens -= tokens;
6873 true
6874 } else {
6875 false
6876 }
6877 }
6878
6879 fn update_tokens(&mut self, now: Instant) {
6881 let elapsed = now.saturating_duration_since(self.last_update);
6882 let new_tokens = elapsed.as_secs_f64() * self.rate;
6883 self.tokens = (self.tokens + new_tokens).min(self.max_tokens);
6884 self.last_update = now;
6885 }
6886
6887 fn set_rate(&mut self, rate: u8) {
6889 let rate_f64 = rate as f64;
6890 self.rate = rate_f64;
6891 self.max_tokens = rate_f64;
6892 if self.tokens > self.max_tokens {
6894 self.tokens = self.max_tokens;
6895 }
6896 }
6897}
6898
6899#[doc(hidden)]
6901pub fn address_discovery_burst_admissions_for_test(attempts: usize) -> usize {
6902 let now = Instant::now();
6903 let config = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
6904 let mut state = AddressDiscoveryState::new(&config, now);
6905 state.observe_all_paths = true;
6906
6907 (0..attempts)
6908 .filter(|attempt| {
6909 let octet = ((*attempt % 200) + 1) as u8;
6910 let port = 40000u16 + (*attempt % 1000) as u16;
6911 let address = SocketAddr::from(([93, 184, 216, octet], port));
6912 state
6913 .queue_observed_address_frame(*attempt as u64, address)
6914 .is_some()
6915 })
6916 .count()
6917}
6918
6919impl Connection {
6920 pub(crate) fn supports_ack_receive_v2(&self) -> bool {
6921 self.peer_params.ack_receive_v2
6922 }
6923}
6924
6925#[cfg(test)]
6926mod tests {
6927 use super::*;
6928 use crate::transport_parameters::AddressDiscoveryConfig;
6929 use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr};
6930
6931 #[test]
6932 fn nat_advertisement_address_validation_rejects_unspecified_and_zero_port() {
6933 assert!(!is_valid_nat_advertisement_address(SocketAddr::new(
6934 IpAddr::V6(Ipv6Addr::UNSPECIFIED),
6935 5000,
6936 )));
6937 assert!(!is_valid_nat_advertisement_address(SocketAddr::new(
6938 IpAddr::V4(Ipv4Addr::new(192, 168, 1, 10)),
6939 0,
6940 )));
6941 assert!(is_valid_nat_advertisement_address(SocketAddr::new(
6942 IpAddr::V4(Ipv4Addr::new(192, 168, 1, 10)),
6943 5000,
6944 )));
6945 }
6946
6947 #[test]
6948 fn address_discovery_state_new() {
6949 let config = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
6950 let now = Instant::now();
6951 let state = AddressDiscoveryState::new(&config, now);
6952
6953 assert!(state.enabled);
6954 assert_eq!(state.max_observation_rate, 10);
6955 assert!(!state.observe_all_paths);
6956 assert!(state.sent_observations.is_empty());
6957 assert!(state.received_observations.is_empty());
6958 assert!(state.received_history.is_empty());
6959 assert_eq!(state.rate_limiter.tokens, 10.0);
6960 }
6961
6962 #[test]
6963 fn address_discovery_state_disabled() {
6964 let config = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
6965 let now = Instant::now();
6966 let mut state = AddressDiscoveryState::new(&config, now);
6967
6968 state.enabled = false;
6970
6971 assert!(!state.should_send_observation(0, now));
6973 }
6974
6975 #[test]
6976 fn address_discovery_state_should_send_observation() {
6977 let config = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
6978 let now = Instant::now();
6979 let mut state = AddressDiscoveryState::new(&config, now);
6980
6981 assert!(state.should_send_observation(0, now));
6983
6984 let mut path_info = paths::PathAddressInfo::new();
6986 path_info.update_observed_address(
6987 SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)), 8080),
6988 now,
6989 );
6990 path_info.mark_notified();
6991 state.sent_observations.insert(0, path_info);
6992
6993 assert!(!state.should_send_observation(0, now));
6995
6996 assert!(!state.should_send_observation(1, now));
6998 }
6999
7000 #[test]
7001 fn address_discovery_state_rate_limiting() {
7002 let config = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
7003 let now = Instant::now();
7004 let mut state = AddressDiscoveryState::new(&config, now);
7005
7006 state.observe_all_paths = true;
7008
7009 assert!(state.should_send_observation(0, now));
7011
7012 state.rate_limiter.try_consume(9.0, now); assert!(!state.should_send_observation(0, now));
7017
7018 let later = now + Duration::from_secs(1);
7020 state.rate_limiter.update_tokens(later);
7021 assert!(state.should_send_observation(0, later));
7022 }
7023
7024 #[test]
7025 fn address_discovery_state_handle_observed_address() {
7026 let config = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
7027 let now = Instant::now();
7028 let mut state = AddressDiscoveryState::new(&config, now);
7029
7030 let addr1 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(10, 0, 0, 1)), 443);
7031 let addr2 = SocketAddr::new(
7032 IpAddr::V6(Ipv6Addr::new(0x2001, 0xdb8, 0, 0, 0, 0, 0, 1)),
7033 8080,
7034 );
7035
7036 state.handle_observed_address(addr1, 0, now);
7038 assert_eq!(state.received_history.len(), 1);
7039 assert_eq!(state.received_history[0].address, addr1);
7040 assert_eq!(state.received_history[0].path_id, 0);
7041
7042 let later = now + Duration::from_millis(100);
7044 state.handle_observed_address(addr2, 1, later);
7045 assert_eq!(state.received_history.len(), 2);
7046 assert_eq!(state.received_history[1].address, addr2);
7047 assert_eq!(state.received_history[1].path_id, 1);
7048 }
7049
7050 #[test]
7051 fn address_discovery_state_get_observed_address() {
7052 let config = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
7053 let now = Instant::now();
7054 let mut state = AddressDiscoveryState::new(&config, now);
7055
7056 assert_eq!(state.get_observed_address(0), None);
7058
7059 let mut path_info = paths::PathAddressInfo::new();
7061 let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 80);
7062 path_info.update_observed_address(addr, now);
7063 state.received_observations.insert(0, path_info);
7064
7065 assert_eq!(state.get_observed_address(0), Some(addr));
7067 assert_eq!(state.get_observed_address(1), None);
7068 }
7069
7070 #[test]
7071 fn address_discovery_state_unnotified_changes() {
7072 let config = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
7073 let now = Instant::now();
7074 let mut state = AddressDiscoveryState::new(&config, now);
7075
7076 assert!(!state.has_unnotified_changes());
7078
7079 let mut path_info = paths::PathAddressInfo::new();
7081 path_info.update_observed_address(
7082 SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)), 8080),
7083 now,
7084 );
7085 state.sent_observations.insert(0, path_info);
7086
7087 assert!(state.has_unnotified_changes());
7089
7090 state.record_observation_sent(0);
7092 assert!(!state.has_unnotified_changes());
7093 }
7094
7095 #[test]
7096 fn address_observation_rate_limiter_token_bucket() {
7097 let now = Instant::now();
7098 let mut limiter = AddressObservationRateLimiter::new(5, now); assert_eq!(limiter.tokens, 5.0);
7102 assert_eq!(limiter.max_tokens, 5.0);
7103 assert_eq!(limiter.rate, 5.0);
7104
7105 assert!(limiter.try_consume(3.0, now));
7107 assert_eq!(limiter.tokens, 2.0);
7108
7109 assert!(!limiter.try_consume(3.0, now));
7111 assert_eq!(limiter.tokens, 2.0);
7112
7113 let later = now + Duration::from_secs(1);
7115 limiter.update_tokens(later);
7116 assert_eq!(limiter.tokens, 5.0); let half_sec = now + Duration::from_millis(500);
7120 let mut limiter2 = AddressObservationRateLimiter::new(5, now);
7121 limiter2.try_consume(3.0, now);
7122 limiter2.update_tokens(half_sec);
7123 assert_eq!(limiter2.tokens, 4.5); }
7125
7126 #[test]
7128 fn connection_initializes_address_discovery_state_default() {
7129 let config = crate::transport_parameters::AddressDiscoveryConfig::default();
7132 let state = AddressDiscoveryState::new(&config, Instant::now());
7133 assert!(state.enabled); assert_eq!(state.max_observation_rate, 10); assert!(!state.observe_all_paths);
7136 }
7137
7138 #[test]
7139 fn connection_initializes_with_address_discovery_enabled() {
7140 let config = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
7142 let state = AddressDiscoveryState::new(&config, Instant::now());
7143 assert!(state.enabled);
7144 assert_eq!(state.max_observation_rate, 10);
7145 assert!(!state.observe_all_paths);
7146 }
7147
7148 #[test]
7149 fn connection_address_discovery_enabled_by_default() {
7150 let config = crate::transport_parameters::AddressDiscoveryConfig::default();
7152 let state = AddressDiscoveryState::new(&config, Instant::now());
7153 assert!(state.enabled); }
7155
7156 #[test]
7157 fn negotiate_max_idle_timeout_commutative() {
7158 let test_params = [
7159 (None, None, None),
7160 (None, Some(VarInt(0)), None),
7161 (None, Some(VarInt(2)), Some(Duration::from_millis(2))),
7162 (Some(VarInt(0)), Some(VarInt(0)), None),
7163 (
7164 Some(VarInt(2)),
7165 Some(VarInt(0)),
7166 Some(Duration::from_millis(2)),
7167 ),
7168 (
7169 Some(VarInt(1)),
7170 Some(VarInt(4)),
7171 Some(Duration::from_millis(1)),
7172 ),
7173 ];
7174
7175 for (left, right, result) in test_params {
7176 assert_eq!(negotiate_max_idle_timeout(left, right), result);
7177 assert_eq!(negotiate_max_idle_timeout(right, left), result);
7178 }
7179 }
7180
7181 #[test]
7182 fn path_creation_initializes_address_discovery() {
7183 let config = TransportConfig::default();
7184 let remote = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)), 8080);
7185 let now = Instant::now();
7186
7187 let path = paths::PathData::new(remote, false, None, now, &config);
7189
7190 assert!(path.address_info.observed_address.is_none());
7192 assert!(path.address_info.last_observed.is_none());
7193 assert_eq!(path.address_info.observation_count, 0);
7194 assert!(!path.address_info.notified);
7195
7196 assert_eq!(path.observation_rate_limiter.rate, 10.0);
7198 assert_eq!(path.observation_rate_limiter.max_tokens, 10.0);
7199 assert_eq!(path.observation_rate_limiter.tokens, 10.0);
7200 }
7201
7202 #[test]
7203 fn path_migration_resets_address_discovery() {
7204 let config = TransportConfig::default();
7205 let remote1 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)), 8080);
7206 let remote2 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(10, 0, 0, 1)), 443);
7207 let now = Instant::now();
7208
7209 let mut path1 = paths::PathData::new(remote1, false, None, now, &config);
7211 path1.update_observed_address(remote1, now);
7212 path1.mark_address_notified();
7213 path1.consume_observation_token(now);
7214 path1.set_observation_rate(20);
7215
7216 let path2 = paths::PathData::from_previous(remote2, &path1, now);
7218
7219 assert!(path2.address_info.observed_address.is_none());
7221 assert!(path2.address_info.last_observed.is_none());
7222 assert_eq!(path2.address_info.observation_count, 0);
7223 assert!(!path2.address_info.notified);
7224
7225 assert_eq!(path2.observation_rate_limiter.rate, 20.0);
7227 assert_eq!(path2.observation_rate_limiter.tokens, 20.0);
7228 }
7229
7230 #[test]
7231 fn connection_path_updates_observation_rate() {
7232 let config = TransportConfig::default();
7233 let remote = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 42);
7234 let now = Instant::now();
7235
7236 let mut path = paths::PathData::new(remote, false, None, now, &config);
7237
7238 assert_eq!(path.observation_rate_limiter.rate, 10.0);
7240
7241 path.set_observation_rate(25);
7243 assert_eq!(path.observation_rate_limiter.rate, 25.0);
7244 assert_eq!(path.observation_rate_limiter.max_tokens, 25.0);
7245
7246 path.observation_rate_limiter.tokens = 30.0; path.set_observation_rate(20);
7249 assert_eq!(path.observation_rate_limiter.tokens, 20.0); }
7251
7252 #[test]
7253 fn path_validation_preserves_discovery_state() {
7254 let config = TransportConfig::default();
7255 let remote = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)), 8080);
7256 let now = Instant::now();
7257
7258 let mut path = paths::PathData::new(remote, false, None, now, &config);
7259
7260 let observed = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(1, 2, 3, 4)), 5678);
7262 path.update_observed_address(observed, now);
7263 path.set_observation_rate(15);
7264
7265 path.validated = true;
7267
7268 assert_eq!(path.address_info.observed_address, Some(observed));
7270 assert_eq!(path.observation_rate_limiter.rate, 15.0);
7271 }
7272
7273 #[test]
7274 fn address_discovery_state_initialization() {
7275 let state = AddressDiscoveryState::new_with_params(true, 30.0, true);
7277
7278 assert!(state.enabled);
7279 assert_eq!(state.max_observation_rate, 30);
7280 assert!(state.observe_all_paths);
7281 assert!(state.sent_observations.is_empty());
7282 assert!(state.received_observations.is_empty());
7283 assert!(state.received_history.is_empty());
7284 }
7285
7286 #[test]
7288 fn handle_observed_address_frame_basic() {
7289 let config = AddressDiscoveryConfig::SendAndReceive;
7290 let mut state = AddressDiscoveryState::new(&config, Instant::now());
7291 let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)), 8080);
7292 let now = Instant::now();
7293 let path_id = 0;
7294
7295 state.handle_observed_address(addr, path_id, now);
7297
7298 assert_eq!(state.received_history.len(), 1);
7300 assert_eq!(state.received_history[0].address, addr);
7301 assert_eq!(state.received_history[0].path_id, path_id);
7302 assert_eq!(state.received_history[0].received_at, now);
7303
7304 assert!(state.received_observations.contains_key(&path_id));
7306 let path_info = &state.received_observations[&path_id];
7307 assert_eq!(path_info.observed_address, Some(addr));
7308 assert_eq!(path_info.last_observed, Some(now));
7309 assert_eq!(path_info.observation_count, 1);
7310 }
7311
7312 #[test]
7313 fn handle_observed_address_frame_multiple_observations() {
7314 let config = AddressDiscoveryConfig::SendAndReceive;
7315 let mut state = AddressDiscoveryState::new(&config, Instant::now());
7316 let addr1 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)), 8080);
7317 let addr2 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(10, 0, 0, 1)), 443);
7318 let now = Instant::now();
7319 let path_id = 0;
7320
7321 state.handle_observed_address(addr1, path_id, now);
7323 state.handle_observed_address(addr1, path_id, now + Duration::from_secs(1));
7324 state.handle_observed_address(addr2, path_id, now + Duration::from_secs(2));
7325
7326 assert_eq!(state.received_history.len(), 3);
7328
7329 let path_info = &state.received_observations[&path_id];
7331 assert_eq!(path_info.observed_address, Some(addr2));
7332 assert_eq!(path_info.observation_count, 1); }
7334
7335 #[test]
7336 fn handle_observed_address_frame_disabled() {
7337 let config = AddressDiscoveryConfig::SendAndReceive;
7338 let mut state = AddressDiscoveryState::new(&config, Instant::now());
7339 state.enabled = false; let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)), 8080);
7341 let now = Instant::now();
7342
7343 state.handle_observed_address(addr, 0, now);
7345
7346 assert!(state.received_history.is_empty());
7348 assert!(state.sent_observations.is_empty());
7349 assert!(state.received_observations.is_empty());
7350 }
7351
7352 #[test]
7353 fn should_send_observation_basic() {
7354 let config = AddressDiscoveryConfig::SendAndReceive;
7355 let mut state = AddressDiscoveryState::new(&config, Instant::now());
7356 state.max_observation_rate = 10;
7357 let now = Instant::now();
7358 let path_id = 0;
7359
7360 assert!(state.should_send_observation(path_id, now));
7362
7363 state.record_observation_sent(path_id);
7365
7366 assert!(state.should_send_observation(path_id, now));
7368 }
7369
7370 #[test]
7371 fn should_send_observation_rate_limiting() {
7372 let config = AddressDiscoveryConfig::SendAndReceive;
7373 let now = Instant::now();
7374 let mut state = AddressDiscoveryState::new(&config, now);
7375 state.max_observation_rate = 2; state.update_rate_limit(2.0);
7377 let path_id = 0;
7378
7379 assert!(state.should_send_observation(path_id, now));
7381 state.record_observation_sent(path_id);
7382 assert!(state.should_send_observation(path_id, now));
7383 state.record_observation_sent(path_id);
7384
7385 assert!(!state.should_send_observation(path_id, now));
7387
7388 let later = now + Duration::from_secs(1);
7390 assert!(state.should_send_observation(path_id, later));
7391 }
7392
7393 #[test]
7394 fn should_send_observation_disabled() {
7395 let config = AddressDiscoveryConfig::SendAndReceive;
7396 let mut state = AddressDiscoveryState::new(&config, Instant::now());
7397 state.enabled = false;
7398
7399 assert!(!state.should_send_observation(0, Instant::now()));
7401 }
7402
7403 #[test]
7404 fn should_send_observation_per_path() {
7405 let config = AddressDiscoveryConfig::SendAndReceive;
7406 let now = Instant::now();
7407 let mut state = AddressDiscoveryState::new(&config, now);
7408 state.max_observation_rate = 2; state.observe_all_paths = true;
7410 state.update_rate_limit(2.0);
7411
7412 assert!(state.should_send_observation(0, now));
7414 state.record_observation_sent(0);
7415
7416 assert!(state.should_send_observation(1, now));
7418 state.record_observation_sent(1);
7419
7420 assert!(!state.should_send_observation(0, now));
7422 assert!(!state.should_send_observation(1, now));
7423
7424 let later = now + Duration::from_secs(1);
7426 assert!(state.should_send_observation(0, later));
7427 }
7428
7429 #[test]
7430 fn has_unnotified_changes_test() {
7431 let config = AddressDiscoveryConfig::SendAndReceive;
7432 let mut state = AddressDiscoveryState::new(&config, Instant::now());
7433 let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)), 8080);
7434 let now = Instant::now();
7435
7436 assert!(!state.has_unnotified_changes());
7438
7439 state.handle_observed_address(addr, 0, now);
7441 assert!(state.has_unnotified_changes());
7442
7443 state.received_observations.get_mut(&0).unwrap().notified = true;
7445 assert!(!state.has_unnotified_changes());
7446 }
7447
7448 #[test]
7449 fn get_observed_address_test() {
7450 let config = AddressDiscoveryConfig::SendAndReceive;
7451 let mut state = AddressDiscoveryState::new(&config, Instant::now());
7452 let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)), 8080);
7453 let now = Instant::now();
7454 let path_id = 0;
7455
7456 assert_eq!(state.get_observed_address(path_id), None);
7458
7459 state.handle_observed_address(addr, path_id, now);
7461 assert_eq!(state.get_observed_address(path_id), Some(addr));
7462
7463 assert_eq!(state.get_observed_address(999), None);
7465 }
7466
7467 #[test]
7469 fn rate_limiter_token_bucket_basic() {
7470 let now = Instant::now();
7471 let mut limiter = AddressObservationRateLimiter::new(10, now); assert!(limiter.try_consume(5.0, now));
7475 assert!(limiter.try_consume(5.0, now));
7476
7477 assert!(!limiter.try_consume(1.0, now));
7479 }
7480
7481 #[test]
7482 fn rate_limiter_token_replenishment() {
7483 let now = Instant::now();
7484 let mut limiter = AddressObservationRateLimiter::new(10, now); assert!(limiter.try_consume(10.0, now));
7488 assert!(!limiter.try_consume(0.1, now)); let later = now + Duration::from_secs(1);
7492 assert!(limiter.try_consume(10.0, later)); assert!(!limiter.try_consume(0.1, later)); let later = later + Duration::from_millis(500);
7497 assert!(limiter.try_consume(5.0, later)); assert!(!limiter.try_consume(0.1, later)); }
7500
7501 #[test]
7502 fn rate_limiter_max_tokens_cap() {
7503 let now = Instant::now();
7504 let mut limiter = AddressObservationRateLimiter::new(10, now);
7505
7506 let later = now + Duration::from_secs(2);
7508 assert!(limiter.try_consume(10.0, later));
7510 assert!(!limiter.try_consume(10.1, later)); let later2 = later + Duration::from_secs(1);
7514 assert!(limiter.try_consume(3.0, later2));
7515
7516 let much_later = later2 + Duration::from_secs(2);
7518 assert!(limiter.try_consume(10.0, much_later)); assert!(!limiter.try_consume(0.1, much_later)); }
7521
7522 #[test]
7523 fn rate_limiter_fractional_consumption() {
7524 let now = Instant::now();
7525 let mut limiter = AddressObservationRateLimiter::new(10, now);
7526
7527 assert!(limiter.try_consume(0.5, now));
7529 assert!(limiter.try_consume(2.3, now));
7530 assert!(limiter.try_consume(7.2, now)); assert!(!limiter.try_consume(0.1, now)); let later = now + Duration::from_millis(100); assert!(limiter.try_consume(1.0, later));
7536 assert!(!limiter.try_consume(0.1, later));
7537 }
7538
7539 #[test]
7540 fn rate_limiter_zero_rate() {
7541 let now = Instant::now();
7542 let mut limiter = AddressObservationRateLimiter::new(0, now); assert!(!limiter.try_consume(1.0, now));
7546 assert!(!limiter.try_consume(0.1, now));
7547 assert!(!limiter.try_consume(0.001, now));
7548
7549 let later = now + Duration::from_secs(10);
7551 assert!(!limiter.try_consume(0.001, later));
7552 }
7553
7554 #[test]
7555 fn rate_limiter_high_rate() {
7556 let now = Instant::now();
7557 let mut limiter = AddressObservationRateLimiter::new(63, now); assert!(limiter.try_consume(60.0, now));
7561 assert!(limiter.try_consume(3.0, now));
7562 assert!(!limiter.try_consume(0.1, now)); let later = now + Duration::from_secs(1);
7566 assert!(limiter.try_consume(63.0, later)); assert!(!limiter.try_consume(0.1, later)); }
7569
7570 #[test]
7571 fn rate_limiter_time_precision() {
7572 let now = Instant::now();
7573 let mut limiter = AddressObservationRateLimiter::new(100, now); assert!(limiter.try_consume(100.0, now));
7577 assert!(!limiter.try_consume(0.1, now));
7578
7579 let later = now + Duration::from_millis(10);
7581 assert!(limiter.try_consume(0.8, later)); assert!(!limiter.try_consume(0.5, later)); let much_later = later + Duration::from_millis(100); assert!(limiter.try_consume(5.0, much_later)); limiter.tokens = 0.0; let final_time = much_later + Duration::from_millis(1);
7593 limiter.update_tokens(final_time); assert!(limiter.tokens >= 0.09 && limiter.tokens <= 0.11);
7598 }
7599
7600 #[test]
7601 fn per_path_rate_limiting_independent() {
7602 let config = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
7603 let now = Instant::now();
7604 let mut state = AddressDiscoveryState::new(&config, now);
7605
7606 state.observe_all_paths = true;
7608
7609 state.update_rate_limit(5.0);
7611
7612 state
7614 .sent_observations
7615 .insert(0, paths::PathAddressInfo::new());
7616 state
7617 .sent_observations
7618 .insert(1, paths::PathAddressInfo::new());
7619 state
7620 .sent_observations
7621 .insert(2, paths::PathAddressInfo::new());
7622
7623 state
7625 .sent_observations
7626 .get_mut(&0)
7627 .unwrap()
7628 .observed_address = Some(SocketAddr::new(
7629 IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)),
7630 8080,
7631 ));
7632 state
7633 .sent_observations
7634 .get_mut(&1)
7635 .unwrap()
7636 .observed_address = Some(SocketAddr::new(
7637 IpAddr::V4(Ipv4Addr::new(192, 168, 1, 2)),
7638 8081,
7639 ));
7640 state
7641 .sent_observations
7642 .get_mut(&2)
7643 .unwrap()
7644 .observed_address = Some(SocketAddr::new(
7645 IpAddr::V4(Ipv4Addr::new(192, 168, 1, 3)),
7646 8082,
7647 ));
7648
7649 for _ in 0..3 {
7651 assert!(state.should_send_observation(0, now));
7652 state.record_observation_sent(0);
7653 state.sent_observations.get_mut(&0).unwrap().notified = false;
7655 }
7656
7657 for _ in 0..2 {
7659 assert!(state.should_send_observation(1, now));
7660 state.record_observation_sent(1);
7661 state.sent_observations.get_mut(&1).unwrap().notified = false;
7663 }
7664
7665 assert!(!state.should_send_observation(2, now));
7667
7668 let later = now + Duration::from_secs(1);
7670
7671 assert!(state.should_send_observation(0, later));
7673 assert!(state.should_send_observation(1, later));
7674 assert!(state.should_send_observation(2, later));
7675 }
7676
7677 #[test]
7678 fn per_path_rate_limiting_with_path_specific_limits() {
7679 let now = Instant::now();
7680 let remote1 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)), 8080);
7681 let remote2 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 2)), 8081);
7682 let config = TransportConfig::default();
7683
7684 let mut path1 = paths::PathData::new(remote1, false, None, now, &config);
7686 let mut path2 = paths::PathData::new(remote2, false, None, now, &config);
7687
7688 path1.observation_rate_limiter = paths::PathObservationRateLimiter::new(10, now); path2.observation_rate_limiter = paths::PathObservationRateLimiter::new(5, now); for _ in 0..10 {
7694 assert!(path1.observation_rate_limiter.can_send(now));
7695 path1.observation_rate_limiter.consume_token(now);
7696 }
7697 assert!(!path1.observation_rate_limiter.can_send(now));
7698
7699 for _ in 0..5 {
7701 assert!(path2.observation_rate_limiter.can_send(now));
7702 path2.observation_rate_limiter.consume_token(now);
7703 }
7704 assert!(!path2.observation_rate_limiter.can_send(now));
7705 }
7706
7707 #[test]
7708 fn per_path_rate_limiting_address_change_detection() {
7709 let config = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
7710 let now = Instant::now();
7711 let mut state = AddressDiscoveryState::new(&config, now);
7712
7713 let path_id = 0;
7715 let addr1 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)), 8080);
7716 let addr2 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 2)), 8080);
7717
7718 assert!(state.should_send_observation(path_id, now));
7720
7721 let frame = state.queue_observed_address_frame(path_id, addr1);
7723 assert!(frame.is_some());
7724
7725 assert!(!state.should_send_observation(path_id, now));
7727
7728 if let Some(info) = state.sent_observations.get_mut(&path_id) {
7730 info.notified = false;
7731 info.observed_address = Some(addr2);
7732 }
7733
7734 assert!(state.should_send_observation(path_id, now));
7736 }
7737
7738 #[test]
7739 fn per_path_rate_limiting_migration() {
7740 let now = Instant::now();
7741 let remote1 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)), 8080);
7742 let remote2 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 2)), 8081);
7743 let config = TransportConfig::default();
7744
7745 let mut path = paths::PathData::new(remote1, false, None, now, &config);
7747 path.observation_rate_limiter = paths::PathObservationRateLimiter::new(10, now);
7748
7749 for _ in 0..5 {
7751 assert!(path.observation_rate_limiter.can_send(now));
7752 path.observation_rate_limiter.consume_token(now);
7753 }
7754
7755 let mut new_path = paths::PathData::new(remote2, false, None, now, &config);
7757
7758 new_path.observation_rate_limiter = paths::PathObservationRateLimiter::new(10, now);
7761
7762 for _ in 0..10 {
7764 assert!(new_path.observation_rate_limiter.can_send(now));
7765 new_path.observation_rate_limiter.consume_token(now);
7766 }
7767 assert!(!new_path.observation_rate_limiter.can_send(now));
7768 }
7769
7770 #[test]
7771 fn per_path_rate_limiting_disabled_paths() {
7772 let config = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
7773 let now = Instant::now();
7774 let mut state = AddressDiscoveryState::new(&config, now);
7775
7776 assert!(state.should_send_observation(0, now));
7778
7779 assert!(!state.should_send_observation(1, now));
7781 assert!(!state.should_send_observation(2, now));
7782
7783 let later = now + Duration::from_secs(1);
7785 assert!(!state.should_send_observation(1, later));
7786 }
7787
7788 #[test]
7789 fn respecting_negotiated_max_observation_rate_basic() {
7790 let config = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
7791 let now = Instant::now();
7792 let mut state = AddressDiscoveryState::new(&config, now);
7793
7794 state.max_observation_rate = 10; state.rate_limiter = AddressObservationRateLimiter::new(10, now);
7797
7798 for _ in 0..10 {
7800 assert!(state.should_send_observation(0, now));
7801 }
7802 assert!(!state.should_send_observation(0, now));
7804 }
7805
7806 #[test]
7807 fn respecting_negotiated_max_observation_rate_zero() {
7808 let config = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
7809 let now = Instant::now();
7810 let mut state = AddressDiscoveryState::new(&config, now);
7811
7812 state.max_observation_rate = 0;
7814 state.rate_limiter = AddressObservationRateLimiter::new(0, now);
7815
7816 assert!(!state.should_send_observation(0, now));
7818 assert!(!state.should_send_observation(1, now));
7819
7820 let later = now + Duration::from_secs(10);
7822 assert!(!state.should_send_observation(0, later));
7823 }
7824
7825 #[test]
7826 fn respecting_negotiated_max_observation_rate_higher() {
7827 let config = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
7828 let now = Instant::now();
7829 let mut state = AddressDiscoveryState::new(&config, now);
7830
7831 state
7833 .sent_observations
7834 .insert(0, paths::PathAddressInfo::new());
7835 state
7836 .sent_observations
7837 .get_mut(&0)
7838 .unwrap()
7839 .observed_address = Some(SocketAddr::new(
7840 IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)),
7841 8080,
7842 ));
7843
7844 state.update_rate_limit(5.0);
7846
7847 state.max_observation_rate = 20; for _ in 0..5 {
7852 assert!(state.should_send_observation(0, now));
7853 state.record_observation_sent(0);
7854 state.sent_observations.get_mut(&0).unwrap().notified = false;
7856 }
7857 assert!(!state.should_send_observation(0, now));
7859 }
7860
7861 #[test]
7862 fn respecting_negotiated_max_observation_rate_dynamic_update() {
7863 let config = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
7864 let now = Instant::now();
7865 let mut state = AddressDiscoveryState::new(&config, now);
7866
7867 state
7869 .sent_observations
7870 .insert(0, paths::PathAddressInfo::new());
7871 state
7872 .sent_observations
7873 .get_mut(&0)
7874 .unwrap()
7875 .observed_address = Some(SocketAddr::new(
7876 IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)),
7877 8080,
7878 ));
7879
7880 for _ in 0..5 {
7882 assert!(state.should_send_observation(0, now));
7883 state.record_observation_sent(0);
7884 state.sent_observations.get_mut(&0).unwrap().notified = false;
7886 }
7887
7888 state.max_observation_rate = 3;
7892 state.rate_limiter.set_rate(3);
7893
7894 for _ in 0..3 {
7897 assert!(state.should_send_observation(0, now));
7898 state.record_observation_sent(0);
7899 state.sent_observations.get_mut(&0).unwrap().notified = false;
7901 }
7902
7903 assert!(!state.should_send_observation(0, now));
7905
7906 let later = now + Duration::from_secs(1);
7908 for _ in 0..3 {
7909 assert!(state.should_send_observation(0, later));
7910 state.record_observation_sent(0);
7911 state.sent_observations.get_mut(&0).unwrap().notified = false;
7913 }
7914
7915 assert!(!state.should_send_observation(0, later));
7917 }
7918
7919 #[test]
7920 fn respecting_negotiated_max_observation_rate_with_paths() {
7921 let config = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
7922 let now = Instant::now();
7923 let mut state = AddressDiscoveryState::new(&config, now);
7924
7925 state.observe_all_paths = true;
7927
7928 for i in 0..3 {
7930 state
7931 .sent_observations
7932 .insert(i, paths::PathAddressInfo::new());
7933 state
7934 .sent_observations
7935 .get_mut(&i)
7936 .unwrap()
7937 .observed_address = Some(SocketAddr::new(
7938 IpAddr::V4(Ipv4Addr::new(192, 168, 1, 100 + i as u8)),
7939 5000,
7940 ));
7941 }
7942
7943 for _ in 0..3 {
7946 for i in 0..3 {
7948 if state.should_send_observation(i, now) {
7949 state.record_observation_sent(i);
7950 state.sent_observations.get_mut(&i).unwrap().notified = false;
7952 }
7953 }
7954 }
7955
7956 assert!(state.should_send_observation(0, now));
7959 state.record_observation_sent(0);
7960
7961 assert!(!state.should_send_observation(0, now));
7963 assert!(!state.should_send_observation(1, now));
7964 assert!(!state.should_send_observation(2, now));
7965 }
7966
7967 #[test]
7968 fn queue_observed_address_frame_basic() {
7969 let config = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
7970 let now = Instant::now();
7971 let mut state = AddressDiscoveryState::new(&config, now);
7972
7973 let address = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 100)), 5000);
7975 let frame = state.queue_observed_address_frame(0, address);
7976
7977 assert!(frame.is_some());
7979 let frame = frame.unwrap();
7980 assert_eq!(frame.address, address);
7981
7982 assert!(state.sent_observations.contains_key(&0));
7984 assert!(state.sent_observations.get(&0).unwrap().notified);
7985 }
7986
7987 #[test]
7988 fn queue_observed_address_frame_rate_limited() {
7989 let config = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
7990 let now = Instant::now();
7991 let mut state = AddressDiscoveryState::new(&config, now);
7992
7993 state.observe_all_paths = true;
7995
7996 let mut addresses = Vec::new();
7998 for i in 0..10 {
7999 let addr = SocketAddr::new(
8000 IpAddr::V4(Ipv4Addr::new(192, 168, 1, i as u8)),
8001 5000 + i as u16,
8002 );
8003 addresses.push(addr);
8004 assert!(
8005 state.queue_observed_address_frame(i as u64, addr).is_some(),
8006 "Frame {} should be allowed",
8007 i + 1
8008 );
8009 }
8010
8011 let addr11 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 11)), 5011);
8013 assert!(
8014 state.queue_observed_address_frame(10, addr11).is_none(),
8015 "11th frame should be rate limited"
8016 );
8017 }
8018
8019 #[test]
8020 fn queue_observed_address_frame_disabled() {
8021 let config = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
8022 let now = Instant::now();
8023 let mut state = AddressDiscoveryState::new(&config, now);
8024
8025 state.enabled = false;
8027
8028 let address = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 100)), 5000);
8029
8030 assert!(state.queue_observed_address_frame(0, address).is_none());
8032 }
8033
8034 #[test]
8035 fn queue_observed_address_frame_already_notified() {
8036 let config = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
8037 let now = Instant::now();
8038 let mut state = AddressDiscoveryState::new(&config, now);
8039
8040 let address = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 100)), 5000);
8041
8042 assert!(state.queue_observed_address_frame(0, address).is_some());
8044
8045 assert!(state.queue_observed_address_frame(0, address).is_none());
8047
8048 let new_address = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 101)), 5001);
8050 assert!(state.queue_observed_address_frame(0, new_address).is_none());
8051 }
8052
8053 #[test]
8054 fn queue_observed_address_frame_primary_path_only() {
8055 let config = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
8056 let now = Instant::now();
8057 let mut state = AddressDiscoveryState::new(&config, now);
8058
8059 let address = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 100)), 5000);
8060
8061 assert!(state.queue_observed_address_frame(0, address).is_some());
8063
8064 assert!(state.queue_observed_address_frame(1, address).is_none());
8066 assert!(state.queue_observed_address_frame(2, address).is_none());
8067 }
8068
8069 #[test]
8070 fn queue_observed_address_frame_updates_path_info() {
8071 let config = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
8072 let now = Instant::now();
8073 let mut state = AddressDiscoveryState::new(&config, now);
8074
8075 let address = SocketAddr::new(
8076 IpAddr::V6(Ipv6Addr::new(0x2001, 0xdb8, 0, 0, 0, 0, 0, 1)),
8077 5000,
8078 );
8079
8080 let frame = state.queue_observed_address_frame(0, address);
8082 assert!(frame.is_some());
8083
8084 let path_info = state.sent_observations.get(&0).unwrap();
8086 assert_eq!(path_info.observed_address, Some(address));
8087 assert!(path_info.notified);
8088
8089 assert_eq!(state.received_history.len(), 0);
8092 }
8093
8094 #[test]
8095 fn retransmits_includes_outbound_observations() {
8096 use crate::connection::spaces::Retransmits;
8097
8098 let mut retransmits = Retransmits::default();
8100
8101 assert!(retransmits.outbound_observations.is_empty());
8103
8104 let address = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 100)), 5000);
8106 let frame = frame::ObservedAddress {
8107 sequence_number: VarInt::from_u32(1),
8108 address,
8109 };
8110 retransmits.outbound_observations.push(frame);
8111
8112 assert_eq!(retransmits.outbound_observations.len(), 1);
8114 assert_eq!(retransmits.outbound_observations[0].address, address);
8115 }
8116
8117 #[test]
8118 fn check_for_address_observations_no_peer_support() {
8119 let config = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
8120 let now = Instant::now();
8121 let mut state = AddressDiscoveryState::new(&config, now);
8122
8123 state
8125 .sent_observations
8126 .insert(0, paths::PathAddressInfo::new());
8127 state
8128 .sent_observations
8129 .get_mut(&0)
8130 .unwrap()
8131 .observed_address = Some(SocketAddr::new(
8132 IpAddr::V4(Ipv4Addr::new(192, 168, 1, 100)),
8133 5000,
8134 ));
8135
8136 let frames = state.check_for_address_observations(0, false, now);
8138
8139 assert!(frames.is_empty());
8141 }
8142
8143 #[test]
8144 fn check_for_address_observations_with_peer_support() {
8145 let config = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
8146 let now = Instant::now();
8147 let mut state = AddressDiscoveryState::new(&config, now);
8148
8149 let address = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 100)), 5000);
8151 state
8152 .sent_observations
8153 .insert(0, paths::PathAddressInfo::new());
8154 state
8155 .sent_observations
8156 .get_mut(&0)
8157 .unwrap()
8158 .observed_address = Some(address);
8159
8160 let frames = state.check_for_address_observations(0, true, now);
8162
8163 assert_eq!(frames.len(), 1);
8165 assert_eq!(frames[0].address, address);
8166
8167 assert!(state.sent_observations.get(&0).unwrap().notified);
8169 }
8170
8171 #[test]
8172 fn check_for_address_observations_rate_limited() {
8173 let config = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
8174 let now = Instant::now();
8175 let mut state = AddressDiscoveryState::new(&config, now);
8176
8177 let address = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 100)), 5000);
8179 state
8180 .sent_observations
8181 .insert(0, paths::PathAddressInfo::new());
8182 state
8183 .sent_observations
8184 .get_mut(&0)
8185 .unwrap()
8186 .observed_address = Some(address);
8187
8188 for _ in 0..10 {
8190 let frames = state.check_for_address_observations(0, true, now);
8191 if frames.is_empty() {
8192 break;
8193 }
8194 state.sent_observations.get_mut(&0).unwrap().notified = false;
8196 }
8197
8198 assert_eq!(state.rate_limiter.tokens, 0.0);
8200
8201 state.sent_observations.get_mut(&0).unwrap().notified = false;
8203
8204 let frames2 = state.check_for_address_observations(0, true, now);
8206 assert_eq!(frames2.len(), 0);
8207
8208 state.sent_observations.get_mut(&0).unwrap().notified = false;
8210
8211 let later = now + Duration::from_millis(200); let frames3 = state.check_for_address_observations(0, true, later);
8214 assert_eq!(frames3.len(), 1);
8215 }
8216
8217 #[test]
8218 fn check_for_address_observations_multiple_paths() {
8219 let config = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
8220 let now = Instant::now();
8221 let mut state = AddressDiscoveryState::new(&config, now);
8222
8223 state.observe_all_paths = true;
8225
8226 let addr1 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 100)), 5000);
8228 let addr2 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 101)), 5001);
8229
8230 state
8231 .sent_observations
8232 .insert(0, paths::PathAddressInfo::new());
8233 state
8234 .sent_observations
8235 .get_mut(&0)
8236 .unwrap()
8237 .observed_address = Some(addr1);
8238
8239 state
8240 .sent_observations
8241 .insert(1, paths::PathAddressInfo::new());
8242 state
8243 .sent_observations
8244 .get_mut(&1)
8245 .unwrap()
8246 .observed_address = Some(addr2);
8247
8248 let frames = state.check_for_address_observations(0, true, now);
8250
8251 assert_eq!(frames.len(), 2);
8253
8254 let addresses: Vec<_> = frames.iter().map(|f| f.address).collect();
8256 assert!(addresses.contains(&addr1));
8257 assert!(addresses.contains(&addr2));
8258
8259 assert!(state.sent_observations.get(&0).unwrap().notified);
8261 assert!(state.sent_observations.get(&1).unwrap().notified);
8262 }
8263
8264 #[test]
8266 fn test_rate_limiter_configuration() {
8267 let state = AddressDiscoveryState::new_with_params(true, 10.0, false);
8269 assert_eq!(state.rate_limiter.rate, 10.0);
8270 assert_eq!(state.rate_limiter.max_tokens, 10.0);
8271 assert_eq!(state.rate_limiter.tokens, 10.0);
8272
8273 let state = AddressDiscoveryState::new_with_params(true, 63.0, false);
8274 assert_eq!(state.rate_limiter.rate, 63.0);
8275 assert_eq!(state.rate_limiter.max_tokens, 63.0);
8276 }
8277
8278 #[test]
8279 fn test_rate_limiter_update_configuration() {
8280 let mut state = AddressDiscoveryState::new_with_params(true, 5.0, false);
8281
8282 assert_eq!(state.rate_limiter.rate, 5.0);
8284
8285 state.update_rate_limit(10.0);
8287 assert_eq!(state.rate_limiter.rate, 10.0);
8288 assert_eq!(state.rate_limiter.max_tokens, 10.0);
8289
8290 state.rate_limiter.tokens = 15.0;
8292 state.update_rate_limit(8.0);
8293 assert_eq!(state.rate_limiter.tokens, 8.0);
8294 }
8295
8296 #[test]
8297 fn test_rate_limiter_from_transport_params() {
8298 let mut params = TransportParameters::default();
8299 params.address_discovery = Some(AddressDiscoveryConfig::SendAndReceive);
8300
8301 let state = AddressDiscoveryState::from_transport_params(¶ms);
8302 assert!(state.is_some());
8303 let state = state.unwrap();
8304 assert_eq!(state.rate_limiter.rate, 10.0); assert!(!state.observe_all_paths); }
8307
8308 #[test]
8309 fn test_rate_limiter_zero_rate() {
8310 let state = AddressDiscoveryState::new_with_params(true, 0.0, false);
8311 assert_eq!(state.rate_limiter.rate, 0.0);
8312 assert_eq!(state.rate_limiter.tokens, 0.0);
8313
8314 let address = "192.168.1.1:443".parse().unwrap();
8316 let mut state = AddressDiscoveryState::new_with_params(true, 0.0, false);
8317 let frame = state.queue_observed_address_frame(0, address);
8318 assert!(frame.is_none());
8319 }
8320
8321 #[test]
8322 fn test_rate_limiter_configuration_edge_cases() {
8323 let state = AddressDiscoveryState::new_with_params(true, 63.0, false);
8325 assert_eq!(state.rate_limiter.rate, 63.0);
8326
8327 let state = AddressDiscoveryState::new_with_params(true, 100.0, false);
8329 assert_eq!(state.rate_limiter.rate, 100.0);
8331
8332 let state = AddressDiscoveryState::new_with_params(true, 2.5, false);
8334 assert_eq!(state.rate_limiter.rate, 2.0);
8336 }
8337
8338 #[test]
8339 fn test_rate_limiter_runtime_update() {
8340 let mut state = AddressDiscoveryState::new_with_params(true, 10.0, false);
8341 let now = Instant::now();
8342
8343 state.rate_limiter.tokens = 5.0;
8345
8346 state.update_rate_limit(3.0);
8348
8349 assert_eq!(state.rate_limiter.tokens, 3.0);
8351 assert_eq!(state.rate_limiter.rate, 3.0);
8352 assert_eq!(state.rate_limiter.max_tokens, 3.0);
8353
8354 let later = now + Duration::from_secs(1);
8356 state.rate_limiter.update_tokens(later);
8357
8358 assert_eq!(state.rate_limiter.tokens, 3.0);
8360 }
8361
8362 #[test]
8364 fn test_address_discovery_state_initialization_default() {
8365 let now = Instant::now();
8367 let default_config = crate::transport_parameters::AddressDiscoveryConfig::default();
8368
8369 let address_discovery_state = Some(AddressDiscoveryState::new(&default_config, now));
8372
8373 assert!(address_discovery_state.is_some());
8374 let state = address_discovery_state.unwrap();
8375
8376 assert!(state.enabled); assert_eq!(state.max_observation_rate, 10); assert!(!state.observe_all_paths);
8380 }
8381
8382 #[test]
8383 fn test_address_discovery_state_initialization_on_handshake() {
8384 let now = Instant::now();
8386
8387 let mut address_discovery_state = Some(AddressDiscoveryState::new(
8389 &crate::transport_parameters::AddressDiscoveryConfig::default(),
8390 now,
8391 ));
8392
8393 let peer_params = TransportParameters {
8395 address_discovery: Some(AddressDiscoveryConfig::SendAndReceive),
8396 ..TransportParameters::default()
8397 };
8398
8399 if let Some(peer_config) = &peer_params.address_discovery {
8401 address_discovery_state = Some(AddressDiscoveryState::new(peer_config, now));
8403 }
8404
8405 assert!(address_discovery_state.is_some());
8407 let state = address_discovery_state.unwrap();
8408 assert!(state.enabled);
8409 assert_eq!(state.max_observation_rate, 10); assert!(!state.observe_all_paths); }
8413
8414 #[test]
8415 fn test_address_discovery_negotiation_disabled_peer() {
8416 let now = Instant::now();
8418
8419 let our_config = AddressDiscoveryConfig::SendAndReceive;
8421 let mut address_discovery_state = Some(AddressDiscoveryState::new(&our_config, now));
8422
8423 let peer_params = TransportParameters {
8425 address_discovery: None,
8426 ..TransportParameters::default()
8427 };
8428
8429 if peer_params.address_discovery.is_none() {
8431 if let Some(state) = &mut address_discovery_state {
8432 state.enabled = false;
8433 }
8434 }
8435
8436 let state = address_discovery_state.unwrap();
8438 assert!(!state.enabled); }
8440
8441 #[test]
8442 fn test_address_discovery_negotiation_rate_limiting() {
8443 let now = Instant::now();
8445
8446 let our_config = AddressDiscoveryConfig::SendAndReceive;
8448 let mut address_discovery_state = Some(AddressDiscoveryState::new(&our_config, now));
8449
8450 if let Some(state) = &mut address_discovery_state {
8452 state.max_observation_rate = 30;
8453 state.update_rate_limit(30.0);
8454 }
8455
8456 let peer_params = TransportParameters {
8458 address_discovery: Some(AddressDiscoveryConfig::SendAndReceive),
8459 ..TransportParameters::default()
8460 };
8461
8462 if let (Some(state), Some(_peer_config)) =
8465 (&mut address_discovery_state, &peer_params.address_discovery)
8466 {
8467 let peer_rate = 15u8;
8470 let negotiated_rate = state.max_observation_rate.min(peer_rate);
8471 state.update_rate_limit(negotiated_rate as f64);
8472 }
8473
8474 let state = address_discovery_state.unwrap();
8476 assert_eq!(state.rate_limiter.rate, 15.0); }
8478
8479 #[test]
8480 fn test_address_discovery_path_initialization() {
8481 let now = Instant::now();
8483 let config = AddressDiscoveryConfig::SendAndReceive;
8484 let mut state = AddressDiscoveryState::new(&config, now);
8485
8486 assert!(state.sent_observations.is_empty());
8488 assert!(state.received_observations.is_empty());
8489
8490 let should_send = state.should_send_observation(0, now);
8492 assert!(should_send); }
8497
8498 #[test]
8499 fn test_address_discovery_multiple_path_initialization() {
8500 let now = Instant::now();
8502 let config = AddressDiscoveryConfig::SendAndReceive;
8503 let mut state = AddressDiscoveryState::new(&config, now);
8504
8505 assert!(state.should_send_observation(0, now)); assert!(!state.should_send_observation(1, now)); assert!(!state.should_send_observation(2, now)); state.observe_all_paths = true;
8512 assert!(state.should_send_observation(1, now)); assert!(state.should_send_observation(2, now)); let config_primary_only = AddressDiscoveryConfig::SendAndReceive;
8517 let mut state_primary = AddressDiscoveryState::new(&config_primary_only, now);
8518
8519 assert!(state_primary.should_send_observation(0, now)); assert!(!state_primary.should_send_observation(1, now)); }
8522
8523 #[test]
8524 fn test_handle_observed_address_frame_valid() {
8525 let now = Instant::now();
8527 let config = AddressDiscoveryConfig::SendAndReceive;
8528 let mut state = AddressDiscoveryState::new(&config, now);
8529
8530 let observed_addr = SocketAddr::from(([192, 168, 1, 100], 5000));
8532 state.handle_observed_address(observed_addr, 0, now);
8533
8534 assert_eq!(state.received_history.len(), 1);
8536 assert_eq!(state.received_history[0].address, observed_addr);
8537 assert_eq!(state.received_history[0].path_id, 0);
8538 assert_eq!(state.received_history[0].received_at, now);
8539
8540 let path_info = state.received_observations.get(&0).unwrap();
8542 assert_eq!(path_info.observed_address, Some(observed_addr));
8543 assert_eq!(path_info.last_observed, Some(now));
8544 assert_eq!(path_info.observation_count, 1);
8545 }
8546
8547 #[test]
8548 fn test_handle_multiple_received_history() {
8549 let now = Instant::now();
8551 let config = AddressDiscoveryConfig::SendAndReceive;
8552 let mut state = AddressDiscoveryState::new(&config, now);
8553
8554 let addr1 = SocketAddr::from(([192, 168, 1, 100], 5000));
8556 let addr2 = SocketAddr::from(([10, 0, 0, 50], 6000));
8557 let addr3 = SocketAddr::from(([192, 168, 1, 100], 7000)); state.handle_observed_address(addr1, 0, now);
8560 state.handle_observed_address(addr2, 1, now);
8561 state.handle_observed_address(addr3, 0, now + Duration::from_millis(100));
8562
8563 assert_eq!(state.received_history.len(), 3);
8565
8566 let path0_info = state.received_observations.get(&0).unwrap();
8568 assert_eq!(path0_info.observed_address, Some(addr3));
8569 assert_eq!(path0_info.observation_count, 1); let path1_info = state.received_observations.get(&1).unwrap();
8573 assert_eq!(path1_info.observed_address, Some(addr2));
8574 assert_eq!(path1_info.observation_count, 1);
8575 }
8576
8577 #[test]
8578 fn test_get_observed_address() {
8579 let now = Instant::now();
8581 let config = AddressDiscoveryConfig::SendAndReceive;
8582 let mut state = AddressDiscoveryState::new(&config, now);
8583
8584 assert_eq!(state.get_observed_address(0), None);
8586
8587 let addr = SocketAddr::from(([192, 168, 1, 100], 5000));
8589 state.handle_observed_address(addr, 0, now);
8590
8591 assert_eq!(state.get_observed_address(0), Some(addr));
8593
8594 assert_eq!(state.get_observed_address(999), None);
8596 }
8597
8598 #[test]
8599 fn test_has_unnotified_changes() {
8600 let now = Instant::now();
8602 let config = AddressDiscoveryConfig::SendAndReceive;
8603 let mut state = AddressDiscoveryState::new(&config, now);
8604
8605 assert!(!state.has_unnotified_changes());
8607
8608 let addr = SocketAddr::from(([192, 168, 1, 100], 5000));
8610 state.handle_observed_address(addr, 0, now);
8611 assert!(state.has_unnotified_changes());
8612
8613 if let Some(path_info) = state.received_observations.get_mut(&0) {
8615 path_info.notified = true;
8616 }
8617 assert!(!state.has_unnotified_changes());
8618
8619 let addr2 = SocketAddr::from(([192, 168, 1, 100], 6000));
8621 state.handle_observed_address(addr2, 0, now + Duration::from_secs(1));
8622 assert!(state.has_unnotified_changes());
8623 }
8624
8625 #[test]
8626 fn test_address_discovery_disabled() {
8627 let now = Instant::now();
8629 let config = AddressDiscoveryConfig::SendAndReceive;
8630 let mut state = AddressDiscoveryState::new(&config, now);
8631
8632 state.enabled = false;
8634
8635 let addr = SocketAddr::from(([192, 168, 1, 100], 5000));
8637 state.handle_observed_address(addr, 0, now);
8638
8639 assert_eq!(state.received_history.len(), 0);
8641
8642 assert!(!state.should_send_observation(0, now));
8644 }
8645
8646 #[test]
8647 fn test_rate_limiting_basic() {
8648 let now = Instant::now();
8650 let config = AddressDiscoveryConfig::SendAndReceive;
8651 let mut state = AddressDiscoveryState::new(&config, now);
8652
8653 state.observe_all_paths = true;
8655 state.rate_limiter.set_rate(2); assert!(state.should_send_observation(0, now));
8659 state.record_observation_sent(0);
8661
8662 assert!(state.should_send_observation(1, now));
8664 state.record_observation_sent(1);
8665
8666 assert!(!state.should_send_observation(2, now));
8668
8669 let later = now + Duration::from_millis(500);
8671 assert!(state.should_send_observation(3, later));
8672 state.record_observation_sent(3);
8673
8674 assert!(!state.should_send_observation(4, later));
8676
8677 let _one_sec_later = now + Duration::from_secs(1);
8681 let two_sec_later = now + Duration::from_secs(2);
8685 assert!(state.should_send_observation(5, two_sec_later));
8686 state.record_observation_sent(5);
8687
8688 assert!(state.should_send_observation(6, two_sec_later));
8699 state.record_observation_sent(6);
8700
8701 assert!(
8703 !state.should_send_observation(7, two_sec_later),
8704 "Expected no tokens available"
8705 );
8706 }
8707
8708 #[test]
8709 fn test_rate_limiting_per_path() {
8710 let now = Instant::now();
8712 let config = AddressDiscoveryConfig::SendAndReceive;
8713 let mut state = AddressDiscoveryState::new(&config, now);
8714
8715 state
8717 .sent_observations
8718 .insert(0, paths::PathAddressInfo::new());
8719 state
8720 .sent_observations
8721 .get_mut(&0)
8722 .unwrap()
8723 .observed_address = Some(SocketAddr::new(
8724 IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)),
8725 8080,
8726 ));
8727
8728 for _ in 0..10 {
8730 assert!(state.should_send_observation(0, now));
8731 state.record_observation_sent(0);
8732 state.sent_observations.get_mut(&0).unwrap().notified = false;
8734 }
8735
8736 assert!(!state.should_send_observation(0, now));
8738
8739 let later = now + Duration::from_millis(100);
8741 assert!(state.should_send_observation(0, later));
8742 state.record_observation_sent(0);
8743
8744 state.sent_observations.get_mut(&0).unwrap().notified = false;
8746
8747 assert!(!state.should_send_observation(0, later));
8749 }
8750
8751 #[test]
8752 fn test_rate_limiting_zero_rate() {
8753 let now = Instant::now();
8755 let config = AddressDiscoveryConfig::SendAndReceive;
8756 let mut state = AddressDiscoveryState::new(&config, now);
8757
8758 state.rate_limiter.set_rate(0);
8760 state.rate_limiter.tokens = 0.0;
8761 state.rate_limiter.max_tokens = 0.0;
8762
8763 assert!(!state.should_send_observation(0, now));
8765 assert!(!state.should_send_observation(0, now + Duration::from_secs(10)));
8766 assert!(!state.should_send_observation(0, now + Duration::from_secs(100)));
8767 }
8768
8769 #[test]
8770 fn test_rate_limiting_update() {
8771 let now = Instant::now();
8773 let config = AddressDiscoveryConfig::SendAndReceive;
8774 let mut state = AddressDiscoveryState::new(&config, now);
8775
8776 state.observe_all_paths = true;
8778
8779 for i in 0..12 {
8781 state
8782 .sent_observations
8783 .insert(i, paths::PathAddressInfo::new());
8784 state
8785 .sent_observations
8786 .get_mut(&i)
8787 .unwrap()
8788 .observed_address = Some(SocketAddr::new(
8789 IpAddr::V4(Ipv4Addr::new(192, 168, 1, (i + 1) as u8)),
8790 8080,
8791 ));
8792 }
8793
8794 for i in 0..10 {
8797 assert!(state.should_send_observation(i, now));
8798 state.record_observation_sent(i);
8799 }
8800 assert!(!state.should_send_observation(10, now));
8802
8803 state.update_rate_limit(20.0);
8805
8806 let later = now + Duration::from_millis(50);
8809 assert!(state.should_send_observation(10, later));
8810 state.record_observation_sent(10);
8811
8812 let later2 = now + Duration::from_millis(100);
8814 assert!(state.should_send_observation(11, later2));
8815 }
8816
8817 #[test]
8818 fn test_rate_limiting_burst() {
8819 let now = Instant::now();
8821 let config = AddressDiscoveryConfig::SendAndReceive;
8822 let mut state = AddressDiscoveryState::new(&config, now);
8823
8824 for _ in 0..10 {
8826 assert!(state.should_send_observation(0, now));
8827 state.record_observation_sent(0);
8828 }
8829
8830 assert!(!state.should_send_observation(0, now));
8832
8833 let later = now + Duration::from_millis(100);
8835 assert!(state.should_send_observation(0, later));
8836 state.record_observation_sent(0);
8837 assert!(!state.should_send_observation(0, later));
8838 }
8839
8840 #[test]
8841 fn test_connection_rate_limiting_with_check_observations() {
8842 let now = Instant::now();
8844 let config = AddressDiscoveryConfig::SendAndReceive;
8845 let mut state = AddressDiscoveryState::new(&config, now);
8846
8847 let mut path_info = paths::PathAddressInfo::new();
8849 path_info.update_observed_address(
8850 SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)), 8080),
8851 now,
8852 );
8853 state.sent_observations.insert(0, path_info);
8854
8855 let frame1 =
8857 state.queue_observed_address_frame(0, SocketAddr::from(([192, 168, 1, 1], 8080)));
8858 assert!(frame1.is_some());
8859 state.record_observation_sent(0);
8860
8861 if let Some(info) = state.sent_observations.get_mut(&0) {
8863 info.notified = false;
8864 }
8865
8866 for _ in 1..10 {
8868 if let Some(info) = state.sent_observations.get_mut(&0) {
8870 info.notified = false;
8871 }
8872 let frame =
8873 state.queue_observed_address_frame(0, SocketAddr::from(([192, 168, 1, 1], 8080)));
8874 assert!(frame.is_some());
8875 state.record_observation_sent(0);
8876 }
8877
8878 if let Some(info) = state.sent_observations.get_mut(&0) {
8880 info.notified = false;
8881 }
8882 let frame3 =
8883 state.queue_observed_address_frame(0, SocketAddr::from(([192, 168, 1, 1], 8080)));
8884 assert!(frame3.is_none()); let later = now + Duration::from_millis(100);
8888 state.rate_limiter.update_tokens(later); if let Some(info) = state.sent_observations.get_mut(&0) {
8892 info.notified = false;
8893 }
8894
8895 let frame4 =
8896 state.queue_observed_address_frame(0, SocketAddr::from(([192, 168, 1, 1], 8080)));
8897 assert!(frame4.is_some()); }
8899
8900 #[test]
8901 fn test_queue_observed_address_frame() {
8902 let now = Instant::now();
8904 let config = AddressDiscoveryConfig::SendAndReceive;
8905 let mut state = AddressDiscoveryState::new(&config, now);
8906
8907 let addr = SocketAddr::from(([192, 168, 1, 100], 5000));
8908
8909 let frame = state.queue_observed_address_frame(0, addr);
8911 assert!(frame.is_some());
8912 assert_eq!(frame.unwrap().address, addr);
8913
8914 state.record_observation_sent(0);
8916
8917 for i in 0..9 {
8919 if let Some(info) = state.sent_observations.get_mut(&0) {
8921 info.notified = false;
8922 }
8923
8924 let frame = state.queue_observed_address_frame(0, addr);
8925 assert!(frame.is_some(), "Frame {} should be allowed", i + 2);
8926 state.record_observation_sent(0);
8927 }
8928
8929 if let Some(info) = state.sent_observations.get_mut(&0) {
8931 info.notified = false;
8932 }
8933
8934 let frame = state.queue_observed_address_frame(0, addr);
8936 assert!(frame.is_none(), "11th frame should be rate limited");
8937 }
8938
8939 #[test]
8940 fn test_multi_path_basic() {
8941 let now = Instant::now();
8943 let config = AddressDiscoveryConfig::SendAndReceive;
8944 let mut state = AddressDiscoveryState::new(&config, now);
8945
8946 let addr1 = SocketAddr::from(([192, 168, 1, 1], 5000));
8947 let addr2 = SocketAddr::from(([10, 0, 0, 1], 6000));
8948 let addr3 = SocketAddr::from(([172, 16, 0, 1], 7000));
8949
8950 state.handle_observed_address(addr1, 0, now);
8952 state.handle_observed_address(addr2, 1, now);
8953 state.handle_observed_address(addr3, 2, now);
8954
8955 assert_eq!(state.get_observed_address(0), Some(addr1));
8957 assert_eq!(state.get_observed_address(1), Some(addr2));
8958 assert_eq!(state.get_observed_address(2), Some(addr3));
8959
8960 assert!(state.has_unnotified_changes());
8962
8963 assert_eq!(state.received_history.len(), 3);
8965 }
8966
8967 #[test]
8968 fn test_multi_path_observe_primary_only() {
8969 let now = Instant::now();
8971 let config = AddressDiscoveryConfig::SendAndReceive;
8972 let mut state = AddressDiscoveryState::new(&config, now);
8973
8974 assert!(state.should_send_observation(0, now));
8976 state.record_observation_sent(0);
8977
8978 assert!(!state.should_send_observation(1, now));
8980 assert!(!state.should_send_observation(2, now));
8981
8982 let addr = SocketAddr::from(([192, 168, 1, 1], 5000));
8984 assert!(state.queue_observed_address_frame(0, addr).is_some());
8985 assert!(state.queue_observed_address_frame(1, addr).is_none());
8986 assert!(state.queue_observed_address_frame(2, addr).is_none());
8987 }
8988
8989 #[test]
8990 fn test_multi_path_rate_limiting() {
8991 let now = Instant::now();
8993 let config = AddressDiscoveryConfig::SendAndReceive;
8994 let mut state = AddressDiscoveryState::new(&config, now);
8995
8996 state.observe_all_paths = true;
8998
8999 for i in 0..21 {
9001 state
9002 .sent_observations
9003 .insert(i, paths::PathAddressInfo::new());
9004 state
9005 .sent_observations
9006 .get_mut(&i)
9007 .unwrap()
9008 .observed_address = Some(SocketAddr::new(
9009 IpAddr::V4(Ipv4Addr::new(192, 168, 1, (i + 1) as u8)),
9010 8080,
9011 ));
9012 }
9013
9014 for i in 0..10 {
9016 assert!(state.should_send_observation(i, now));
9017 state.record_observation_sent(i);
9018 }
9019
9020 assert!(!state.should_send_observation(10, now));
9022
9023 state.sent_observations.get_mut(&0).unwrap().notified = false;
9025 assert!(!state.should_send_observation(0, now)); let later = now + Duration::from_secs(1);
9029 for i in 10..20 {
9030 assert!(state.should_send_observation(i, later));
9031 state.record_observation_sent(i);
9032 }
9033 assert!(!state.should_send_observation(20, later));
9035 }
9036
9037 #[test]
9038 fn test_multi_path_address_changes() {
9039 let now = Instant::now();
9041 let config = AddressDiscoveryConfig::SendAndReceive;
9042 let mut state = AddressDiscoveryState::new(&config, now);
9043
9044 let addr1a = SocketAddr::from(([192, 168, 1, 1], 5000));
9045 let addr1b = SocketAddr::from(([192, 168, 1, 2], 5000));
9046 let addr2a = SocketAddr::from(([10, 0, 0, 1], 6000));
9047 let addr2b = SocketAddr::from(([10, 0, 0, 2], 6000));
9048
9049 state.handle_observed_address(addr1a, 0, now);
9051 state.handle_observed_address(addr2a, 1, now);
9052
9053 if let Some(info) = state.received_observations.get_mut(&0) {
9055 info.notified = true;
9056 }
9057 if let Some(info) = state.received_observations.get_mut(&1) {
9058 info.notified = true;
9059 }
9060 assert!(!state.has_unnotified_changes());
9061
9062 state.handle_observed_address(addr1b, 0, now + Duration::from_secs(1));
9064 assert!(state.has_unnotified_changes());
9065
9066 assert_eq!(state.get_observed_address(0), Some(addr1b));
9068 assert_eq!(state.get_observed_address(1), Some(addr2a));
9069
9070 if let Some(info) = state.received_observations.get_mut(&0) {
9072 info.notified = true;
9073 }
9074 assert!(!state.has_unnotified_changes());
9075
9076 state.handle_observed_address(addr2b, 1, now + Duration::from_secs(2));
9078 assert!(state.has_unnotified_changes());
9079 }
9080
9081 #[test]
9082 fn test_multi_path_migration() {
9083 let now = Instant::now();
9085 let config = AddressDiscoveryConfig::SendAndReceive;
9086 let mut state = AddressDiscoveryState::new(&config, now);
9087
9088 let addr_old = SocketAddr::from(([192, 168, 1, 1], 5000));
9089 let addr_new = SocketAddr::from(([10, 0, 0, 1], 6000));
9090
9091 state.handle_observed_address(addr_old, 0, now);
9093 assert_eq!(state.get_observed_address(0), Some(addr_old));
9094
9095 state.handle_observed_address(addr_new, 1, now + Duration::from_secs(1));
9097
9098 assert_eq!(state.get_observed_address(0), Some(addr_old));
9100 assert_eq!(state.get_observed_address(1), Some(addr_new));
9101
9102 assert_eq!(state.received_observations.len(), 2);
9105 }
9106
9107 #[test]
9108 fn test_check_for_address_observations_multi_path() {
9109 let now = Instant::now();
9111 let config = AddressDiscoveryConfig::SendAndReceive;
9112 let mut state = AddressDiscoveryState::new(&config, now);
9113
9114 state.observe_all_paths = true;
9116
9117 let addr1 = SocketAddr::from(([192, 168, 1, 1], 5000));
9119 let addr2 = SocketAddr::from(([10, 0, 0, 1], 6000));
9120 let addr3 = SocketAddr::from(([172, 16, 0, 1], 7000));
9121
9122 state
9124 .sent_observations
9125 .insert(0, paths::PathAddressInfo::new());
9126 state
9127 .sent_observations
9128 .get_mut(&0)
9129 .unwrap()
9130 .observed_address = Some(addr1);
9131 state
9132 .sent_observations
9133 .insert(1, paths::PathAddressInfo::new());
9134 state
9135 .sent_observations
9136 .get_mut(&1)
9137 .unwrap()
9138 .observed_address = Some(addr2);
9139 state
9140 .sent_observations
9141 .insert(2, paths::PathAddressInfo::new());
9142 state
9143 .sent_observations
9144 .get_mut(&2)
9145 .unwrap()
9146 .observed_address = Some(addr3);
9147
9148 let frames = state.check_for_address_observations(0, true, now);
9150
9151 assert_eq!(frames.len(), 3);
9153
9154 let frame_addrs: Vec<_> = frames.iter().map(|f| f.address).collect();
9156 assert!(frame_addrs.contains(&addr1), "addr1 should be in frames");
9157 assert!(frame_addrs.contains(&addr2), "addr2 should be in frames");
9158 assert!(frame_addrs.contains(&addr3), "addr3 should be in frames");
9159
9160 assert!(!state.has_unnotified_changes());
9162 }
9163
9164 #[test]
9165 fn test_multi_path_with_peer_not_supporting() {
9166 let now = Instant::now();
9168 let config = AddressDiscoveryConfig::SendAndReceive;
9169 let mut state = AddressDiscoveryState::new(&config, now);
9170
9171 state.handle_observed_address(SocketAddr::from(([192, 168, 1, 1], 5000)), 0, now);
9173 state.handle_observed_address(SocketAddr::from(([10, 0, 0, 1], 6000)), 1, now);
9174
9175 let frames = state.check_for_address_observations(0, false, now);
9177 assert_eq!(frames.len(), 0);
9178
9179 assert!(state.has_unnotified_changes());
9181 }
9182
9183 #[test]
9185 fn test_bootstrap_node_aggressive_observation_mode() {
9186 let config = AddressDiscoveryConfig::SendAndReceive;
9188 let now = Instant::now();
9189 let mut state = AddressDiscoveryState::new(&config, now);
9190
9191 assert!(!state.is_bootstrap_mode());
9193
9194 state.set_bootstrap_mode(true);
9196 assert!(state.is_bootstrap_mode());
9197
9198 assert!(state.should_observe_path(0)); assert!(state.should_observe_path(1)); assert!(state.should_observe_path(2));
9202
9203 let bootstrap_rate = state.get_effective_rate_limit();
9205 assert!(bootstrap_rate > 10.0); }
9207
9208 #[test]
9209 fn test_bootstrap_node_immediate_observation() {
9210 let config = AddressDiscoveryConfig::SendAndReceive;
9212 let now = Instant::now();
9213 let mut state = AddressDiscoveryState::new(&config, now);
9214 state.set_bootstrap_mode(true);
9215
9216 let addr = SocketAddr::from(([192, 168, 1, 100], 5000));
9218 state.handle_observed_address(addr, 0, now);
9219
9220 assert!(state.should_send_observation_immediately(true));
9222
9223 assert!(state.should_send_observation(0, now));
9225
9226 let frame = state.queue_observed_address_frame(0, addr);
9228 assert!(frame.is_some());
9229 }
9230
9231 #[test]
9232 fn test_bootstrap_node_multiple_path_observations() {
9233 let config = AddressDiscoveryConfig::SendAndReceive;
9235 let now = Instant::now();
9236 let mut state = AddressDiscoveryState::new(&config, now);
9237 state.set_bootstrap_mode(true);
9238
9239 let addrs = vec![
9241 (0u64, SocketAddr::from(([192, 168, 1, 1], 5000))),
9242 (1u64, SocketAddr::from(([10, 0, 0, 1], 6000))),
9243 (2u64, SocketAddr::from(([172, 16, 0, 1], 7000))),
9244 ];
9245
9246 for (path_id, addr) in &addrs {
9247 state
9248 .sent_observations
9249 .insert(*path_id, paths::PathAddressInfo::new());
9250 state
9251 .sent_observations
9252 .get_mut(path_id)
9253 .unwrap()
9254 .observed_address = Some(*addr);
9255 }
9256
9257 let frames = state.check_for_address_observations(0, true, now);
9259 assert_eq!(frames.len(), 3);
9260
9261 for (_, addr) in &addrs {
9263 assert!(frames.iter().any(|f| f.address == *addr));
9264 }
9265 }
9266
9267 #[test]
9268 fn test_bootstrap_node_rate_limit_override() {
9269 let config = AddressDiscoveryConfig::SendAndReceive;
9271 let now = Instant::now();
9272 let mut state = AddressDiscoveryState::new(&config, now);
9273 state.set_bootstrap_mode(true);
9274
9275 let addr = SocketAddr::from(([192, 168, 1, 1], 5000));
9277
9278 for i in 0..10 {
9280 state.handle_observed_address(addr, i, now);
9281 let can_send = state.should_send_observation(i, now);
9282 assert!(can_send, "Bootstrap node should send observation {i}");
9283 state.record_observation_sent(i);
9284 }
9285 }
9286
9287 #[test]
9288 fn test_bootstrap_node_configuration() {
9289 let config = AddressDiscoveryConfig::SendAndReceive;
9291 let mut state = AddressDiscoveryState::new(&config, Instant::now());
9292
9293 state.set_bootstrap_mode(true);
9295
9296 assert!(state.bootstrap_mode);
9298 assert!(state.enabled);
9299
9300 let effective_rate = state.get_effective_rate_limit();
9302 assert!(effective_rate > state.max_observation_rate as f64);
9303 }
9304
9305 #[test]
9306 fn test_bootstrap_node_persistent_observation() {
9307 let config = AddressDiscoveryConfig::SendAndReceive;
9309 let mut now = Instant::now();
9310 let mut state = AddressDiscoveryState::new(&config, now);
9311 state.set_bootstrap_mode(true);
9312
9313 let addr1 = SocketAddr::from(([192, 168, 1, 1], 5000));
9314 let addr2 = SocketAddr::from(([192, 168, 1, 2], 5000));
9315
9316 state.handle_observed_address(addr1, 0, now);
9318 assert!(state.should_send_observation(0, now));
9319 state.record_observation_sent(0);
9320
9321 now += Duration::from_secs(60);
9323 state.handle_observed_address(addr2, 0, now);
9324
9325 assert!(state.should_send_observation(0, now));
9327 }
9328
9329 #[test]
9330 fn test_bootstrap_node_multi_peer_support() {
9331 let config = AddressDiscoveryConfig::SendAndReceive;
9334 let now = Instant::now();
9335 let mut state = AddressDiscoveryState::new(&config, now);
9336 state.set_bootstrap_mode(true);
9337
9338 let peer_addresses: Vec<(u64, SocketAddr)> = vec![
9340 (0, SocketAddr::from(([192, 168, 1, 1], 5000))), (1, SocketAddr::from(([10, 0, 0, 1], 6000))), (2, SocketAddr::from(([172, 16, 0, 1], 7000))), (3, SocketAddr::from(([192, 168, 2, 1], 8000))), ];
9345
9346 for (path_id, addr) in &peer_addresses {
9348 state
9349 .sent_observations
9350 .insert(*path_id, paths::PathAddressInfo::new());
9351 state
9352 .sent_observations
9353 .get_mut(path_id)
9354 .unwrap()
9355 .observed_address = Some(*addr);
9356 }
9357
9358 let frames = state.check_for_address_observations(0, true, now);
9360 assert_eq!(frames.len(), peer_addresses.len());
9361
9362 for (_, addr) in &peer_addresses {
9364 assert!(frames.iter().any(|f| f.address == *addr));
9365 }
9366 }
9367
9368 mod address_discovery_tests {
9370 include!("address_discovery_tests.rs");
9371 }
9372}