1use std::{
2 cmp,
3 collections::VecDeque,
4 convert::TryFrom,
5 fmt, io, mem,
6 net::{IpAddr, SocketAddr},
7 sync::Arc,
8};
9
10use bytes::{Bytes, BytesMut};
11use frame::StreamMetaVec;
12use rand::{Rng, SeedableRng, rngs::StdRng};
15use thiserror::Error;
16use tracing::{debug, error, info, trace, trace_span, warn};
17
18use crate::{
19 Dir, Duration, EndpointConfig, Frame, INITIAL_MTU, Instant, MAX_CID_SIZE, MAX_STREAM_COUNT,
20 MIN_INITIAL_SIZE, Side, StreamId, TIMER_GRANULARITY, TokenStore, Transmit, TransportError,
21 TransportErrorCode, VarInt,
22 cid_generator::ConnectionIdGenerator,
23 cid_queue::CidQueue,
24 coding::BufMutExt,
25 config::{ServerConfig, TransportConfig},
26 crypto::{self, KeyPair, Keys, PacketKey},
27 endpoint::AddressDiscoveryStats,
28 frame::{self, Close, Datagram, FrameStruct, NewToken},
29 packet::{
30 FixedLengthConnectionIdParser, Header, InitialHeader, InitialPacket, LongType, Packet,
31 PacketNumber, PartialDecode, SpaceId,
32 },
33 range_set::ArrayRangeSet,
34 shared::{
35 ConnectionEvent, ConnectionEventInner, ConnectionId, DatagramConnectionEvent, EcnCodepoint,
36 EndpointEvent, EndpointEventInner,
37 },
38 token::{ResetToken, Token, TokenPayload},
39 transport_parameters::TransportParameters,
40};
41
42mod ack_frequency;
43use ack_frequency::AckFrequencyState;
44
45pub(crate) mod nat_traversal;
46use nat_traversal::NatTraversalState;
47pub(crate) use nat_traversal::{CoordinationPhase, NatTraversalError, NatTraversalRole};
48
49mod assembler;
50pub use assembler::Chunk;
51
52mod cid_state;
53use cid_state::CidState;
54
55mod datagrams;
56use datagrams::DatagramState;
57pub use datagrams::{Datagrams, SendDatagramError};
58
59mod mtud;
60mod pacing;
61
62mod packet_builder;
63use packet_builder::PacketBuilder;
64
65mod packet_crypto;
66use packet_crypto::{PrevCrypto, ZeroRttCrypto};
67
68mod paths;
69pub use paths::RttEstimator;
70use paths::{NatTraversalChallenges, PathData, PathResponses};
71
72mod send_buffer;
73
74mod spaces;
75#[cfg(fuzzing)]
76pub use spaces::Retransmits;
77#[cfg(not(fuzzing))]
78use spaces::Retransmits;
79use spaces::{PacketNumberFilter, PacketSpace, SendableFrames, SentPacket, ThinRetransmits};
80
81mod stats;
82pub use stats::{ConnectionStats, FrameStats, PathStats, UdpStats};
83
84mod streams;
85#[cfg(fuzzing)]
86pub use streams::StreamsState;
87#[cfg(not(fuzzing))]
88use streams::StreamsState;
89pub use streams::{
90 Chunks, ClosedStream, FinishError, ReadError, ReadableError, RecvStream, SendStream,
91 ShouldTransmit, StreamEvent, Streams, WriteError, Written,
92};
93
94mod timer;
95use crate::congestion::Controller;
96use timer::{Timer, TimerTable};
97
98pub struct Connection {
138 endpoint_config: Arc<EndpointConfig>,
139 config: Arc<TransportConfig>,
140 rng: StdRng,
141 crypto: Box<dyn crypto::Session>,
142 handshake_cid: ConnectionId,
144 rem_handshake_cid: ConnectionId,
146 local_ip: Option<IpAddr>,
149 path: PathData,
150 allow_mtud: bool,
152 prev_path: Option<(ConnectionId, PathData)>,
153 state: State,
154 side: ConnectionSide,
155 zero_rtt_enabled: bool,
157 zero_rtt_crypto: Option<ZeroRttCrypto>,
159 key_phase: bool,
160 key_phase_size: u64,
162 peer_params: TransportParameters,
164 orig_rem_cid: ConnectionId,
166 initial_dst_cid: ConnectionId,
168 retry_src_cid: Option<ConnectionId>,
171 lost_packets: u64,
173 events: VecDeque<Event>,
174 endpoint_events: VecDeque<EndpointEventInner>,
175 spin_enabled: bool,
177 spin: bool,
179 spaces: [PacketSpace; 3],
181 highest_space: SpaceId,
183 prev_crypto: Option<PrevCrypto>,
185 next_crypto: Option<KeyPair<Box<dyn PacketKey>>>,
190 accepted_0rtt: bool,
191 permit_idle_reset: bool,
193 idle_timeout: Option<Duration>,
195 timers: TimerTable,
196 authentication_failures: u64,
198 error: Option<ConnectionError>,
200 packet_number_filter: PacketNumberFilter,
202
203 path_responses: PathResponses,
208 nat_traversal_challenges: NatTraversalChallenges,
210 close: bool,
211
212 ack_frequency: AckFrequencyState,
216
217 pto_count: u32,
222
223 receiving_ecn: bool,
228 total_authed_packets: u64,
230 app_limited: bool,
233
234 streams: StreamsState,
235 rem_cids: CidQueue,
237 local_cid_state: CidState,
239 datagrams: DatagramState,
241 stats: ConnectionStats,
243 version: u32,
245
246 nat_traversal: Option<NatTraversalState>,
248
249 address_discovery_state: Option<AddressDiscoveryState>,
251
252 #[cfg(feature = "trace")]
254 trace_context: crate::tracing::TraceContext,
255
256 #[cfg(feature = "trace")]
258 event_log: Arc<crate::tracing::EventLog>,
259
260 #[cfg(feature = "__qlog")]
262 qlog_streamer: Option<Box<dyn std::io::Write + Send + Sync>>,
263}
264
265impl Connection {
266 pub(crate) fn new(
267 endpoint_config: Arc<EndpointConfig>,
268 config: Arc<TransportConfig>,
269 init_cid: ConnectionId,
270 loc_cid: ConnectionId,
271 rem_cid: ConnectionId,
272 remote: SocketAddr,
273 local_ip: Option<IpAddr>,
274 crypto: Box<dyn crypto::Session>,
275 cid_gen: &dyn ConnectionIdGenerator,
276 now: Instant,
277 version: u32,
278 allow_mtud: bool,
279 rng_seed: [u8; 32],
280 side_args: SideArgs,
281 ) -> Self {
282 let pref_addr_cid = side_args.pref_addr_cid();
283 let path_validated = side_args.path_validated();
284 let connection_side = ConnectionSide::from(side_args);
285 let side = connection_side.side();
286 let initial_space = PacketSpace {
287 crypto: Some(crypto.initial_keys(&init_cid, side)),
288 ..PacketSpace::new(now)
289 };
290 let state = State::Handshake(state::Handshake {
291 rem_cid_set: side.is_server(),
292 expected_token: Bytes::new(),
293 client_hello: None,
294 });
295 let mut rng = StdRng::from_seed(rng_seed);
296 let mut this = Self {
297 endpoint_config,
298 crypto,
299 handshake_cid: loc_cid,
300 rem_handshake_cid: rem_cid,
301 local_cid_state: CidState::new(
302 cid_gen.cid_len(),
303 cid_gen.cid_lifetime(),
304 now,
305 if pref_addr_cid.is_some() { 2 } else { 1 },
306 ),
307 path: PathData::new(remote, allow_mtud, None, now, &config),
308 allow_mtud,
309 local_ip,
310 prev_path: None,
311 state,
312 side: connection_side,
313 zero_rtt_enabled: false,
314 zero_rtt_crypto: None,
315 key_phase: false,
316 key_phase_size: rng.gen_range(10..1000),
323 peer_params: TransportParameters::default(),
324 orig_rem_cid: rem_cid,
325 initial_dst_cid: init_cid,
326 retry_src_cid: None,
327 lost_packets: 0,
328 events: VecDeque::new(),
329 endpoint_events: VecDeque::new(),
330 spin_enabled: config.allow_spin && rng.gen_ratio(7, 8),
331 spin: false,
332 spaces: [initial_space, PacketSpace::new(now), PacketSpace::new(now)],
333 highest_space: SpaceId::Initial,
334 prev_crypto: None,
335 next_crypto: None,
336 accepted_0rtt: false,
337 permit_idle_reset: true,
338 idle_timeout: match config.max_idle_timeout {
339 None | Some(VarInt(0)) => None,
340 Some(dur) => Some(Duration::from_millis(dur.0)),
341 },
342 timers: TimerTable::default(),
343 authentication_failures: 0,
344 error: None,
345 #[cfg(test)]
346 packet_number_filter: match config.deterministic_packet_numbers {
347 false => PacketNumberFilter::new(&mut rng),
348 true => PacketNumberFilter::disabled(),
349 },
350 #[cfg(not(test))]
351 packet_number_filter: PacketNumberFilter::new(&mut rng),
352
353 path_responses: PathResponses::default(),
354 nat_traversal_challenges: NatTraversalChallenges::default(),
355 close: false,
356
357 ack_frequency: AckFrequencyState::new(get_max_ack_delay(
358 &TransportParameters::default(),
359 )),
360
361 pto_count: 0,
362
363 app_limited: false,
364 receiving_ecn: false,
365 total_authed_packets: 0,
366
367 streams: StreamsState::new(
368 side,
369 config.max_concurrent_uni_streams,
370 config.max_concurrent_bidi_streams,
371 config.send_window,
372 config.receive_window,
373 config.stream_receive_window,
374 ),
375 datagrams: DatagramState::default(),
376 config,
377 rem_cids: CidQueue::new(rem_cid),
378 rng,
379 stats: ConnectionStats::default(),
380 version,
381 nat_traversal: None, address_discovery_state: {
383 Some(AddressDiscoveryState::new(
386 &crate::transport_parameters::AddressDiscoveryConfig::default(),
387 now
388 ))
389 },
390
391 #[cfg(feature = "trace")]
392 trace_context: crate::tracing::TraceContext::new(crate::tracing::TraceId::new()),
393
394 #[cfg(feature = "trace")]
395 event_log: crate::tracing::global_log(),
396
397 #[cfg(feature = "__qlog")]
398 qlog_streamer: None,
399 };
400
401 #[cfg(feature = "trace")]
403 {
404 use crate::tracing::*;
405 use crate::trace_event;
406 let _peer_id = {
407 let mut id = [0u8; 32];
408 let addr_bytes = match remote {
409 SocketAddr::V4(addr) => addr.ip().octets().to_vec(),
410 SocketAddr::V6(addr) => addr.ip().octets().to_vec(),
411 };
412 id[..addr_bytes.len().min(32)].copy_from_slice(&addr_bytes[..addr_bytes.len().min(32)]);
413 id
414 };
415
416 let (addr_bytes, addr_type) = socket_addr_to_bytes(remote);
417 trace_event!(&this.event_log, Event {
418 timestamp: timestamp_now(),
419 trace_id: this.trace_context.trace_id(),
420 sequence: 0,
421 _padding: 0,
422 node_id: [0u8; 32], event_data: EventData::ConnInit {
424 endpoint_bytes: addr_bytes,
425 addr_type,
426 _padding: [0u8; 45],
427 },
428 });
429 }
430
431 if path_validated {
432 this.on_path_validated();
433 }
434 if side.is_client() {
435 this.write_crypto();
437 this.init_0rtt();
438 }
439 this
440 }
441
442 #[cfg(feature = "__qlog")]
444 pub fn set_qlog(
445 &mut self,
446 writer: Box<dyn std::io::Write + Send + Sync>,
447 _title: Option<String>,
448 _description: Option<String>,
449 _now: Instant,
450 ) {
451 self.qlog_streamer = Some(writer);
452 }
453
454 #[cfg(feature = "__qlog")]
456 fn emit_qlog_recovery_metrics(&mut self, _now: Instant) {
457 }
460
461 #[must_use]
469 pub fn poll_timeout(&mut self) -> Option<Instant> {
470 let mut next_timeout = self.timers.next_timeout();
471
472 if let Some(nat_state) = &self.nat_traversal {
474 if let Some(nat_timeout) = nat_state.get_next_timeout(Instant::now()) {
475 self.timers.set(Timer::NatTraversal, nat_timeout);
477 next_timeout = Some(next_timeout.map_or(nat_timeout, |t| t.min(nat_timeout)));
478 }
479 }
480
481 next_timeout
482 }
483
484 #[must_use]
490 pub fn poll(&mut self) -> Option<Event> {
491 if let Some(x) = self.events.pop_front() {
492 return Some(x);
493 }
494
495 if let Some(event) = self.streams.poll() {
496 return Some(Event::Stream(event));
497 }
498
499 if let Some(err) = self.error.take() {
500 return Some(Event::ConnectionLost { reason: err });
501 }
502
503 None
504 }
505
506 #[must_use]
508 pub fn poll_endpoint_events(&mut self) -> Option<EndpointEvent> {
509 self.endpoint_events.pop_front().map(EndpointEvent)
510 }
511
512 #[must_use]
514 pub fn streams(&mut self) -> Streams<'_> {
515 Streams {
516 state: &mut self.streams,
517 conn_state: &self.state,
518 }
519 }
520
521 #[cfg(feature = "trace")]
523 pub(crate) fn trace_context(&self) -> &crate::tracing::TraceContext {
524 &self.trace_context
525 }
526
527 #[cfg(feature = "trace")]
529 pub(crate) fn event_log(&self) -> &Arc<crate::tracing::EventLog> {
530 &self.event_log
531 }
532
533 #[must_use]
535 pub fn recv_stream(&mut self, id: StreamId) -> RecvStream<'_> {
536 assert!(id.dir() == Dir::Bi || id.initiator() != self.side.side());
537 RecvStream {
538 id,
539 state: &mut self.streams,
540 pending: &mut self.spaces[SpaceId::Data].pending,
541 }
542 }
543
544 #[must_use]
546 pub fn send_stream(&mut self, id: StreamId) -> SendStream<'_> {
547 assert!(id.dir() == Dir::Bi || id.initiator() == self.side.side());
548 SendStream {
549 id,
550 state: &mut self.streams,
551 pending: &mut self.spaces[SpaceId::Data].pending,
552 conn_state: &self.state,
553 }
554 }
555
556 #[must_use]
566 pub fn poll_transmit(
567 &mut self,
568 now: Instant,
569 max_datagrams: usize,
570 buf: &mut Vec<u8>,
571 ) -> Option<Transmit> {
572 assert!(max_datagrams != 0);
573 let max_datagrams = match self.config.enable_segmentation_offload {
574 false => 1,
575 true => max_datagrams,
576 };
577
578 let mut num_datagrams = 0;
579 let mut datagram_start = 0;
582 let mut segment_size = usize::from(self.path.current_mtu());
583
584 if let Some(nat_traversal) = &mut self.nat_traversal {
586 if nat_traversal.check_coordination_timeout(now) {
587 trace!("NAT traversal coordination timed out, may retry");
588 }
589 }
590
591 if let Some(challenge) = self.send_nat_traversal_challenge(now, buf) {
593 return Some(challenge);
594 }
595
596 if let Some(challenge) = self.send_path_challenge(now, buf) {
597 return Some(challenge);
598 }
599
600 for space in SpaceId::iter() {
602 let request_immediate_ack =
603 space == SpaceId::Data && self.peer_supports_ack_frequency();
604 self.spaces[space].maybe_queue_probe(request_immediate_ack, &self.streams);
605 }
606
607 let close = match self.state {
609 State::Drained => {
610 self.app_limited = true;
611 return None;
612 }
613 State::Draining | State::Closed(_) => {
614 if !self.close {
617 self.app_limited = true;
618 return None;
619 }
620 true
621 }
622 _ => false,
623 };
624
625 if let Some(config) = &self.config.ack_frequency_config {
627 self.spaces[SpaceId::Data].pending.ack_frequency = self
628 .ack_frequency
629 .should_send_ack_frequency(self.path.rtt.get(), config, &self.peer_params)
630 && self.highest_space == SpaceId::Data
631 && self.peer_supports_ack_frequency();
632 }
633
634 let mut buf_capacity = 0;
638
639 let mut coalesce = true;
640 let mut builder_storage: Option<PacketBuilder> = None;
641 let mut sent_frames = None;
642 let mut pad_datagram = false;
643 let mut pad_datagram_to_mtu = false;
644 let mut congestion_blocked = false;
645
646 let mut space_idx = 0;
648 let spaces = [SpaceId::Initial, SpaceId::Handshake, SpaceId::Data];
649 while space_idx < spaces.len() {
652 let space_id = spaces[space_idx];
653 let pn = self.packet_number_filter.peek(&self.spaces[SpaceId::Data]);
660 let frame_space_1rtt =
661 segment_size.saturating_sub(self.predict_1rtt_overhead(Some(pn)));
662
663 let can_send = self.space_can_send(space_id, frame_space_1rtt);
665 if can_send.is_empty() && (!close || self.spaces[space_id].crypto.is_none()) {
666 space_idx += 1;
667 continue;
668 }
669
670 let mut ack_eliciting = !self.spaces[space_id].pending.is_empty(&self.streams)
671 || self.spaces[space_id].ping_pending
672 || self.spaces[space_id].immediate_ack_pending;
673 if space_id == SpaceId::Data {
674 ack_eliciting |= self.can_send_1rtt(frame_space_1rtt);
675 }
676
677 pad_datagram_to_mtu |= space_id == SpaceId::Data && self.config.pad_to_mtu;
678
679 let buf_end = if let Some(builder) = &builder_storage {
683 buf.len().max(builder.min_size) + builder.tag_len
684 } else {
685 buf.len()
686 };
687
688 let tag_len = if let Some(ref crypto) = self.spaces[space_id].crypto {
689 crypto.packet.local.tag_len()
690 } else if space_id == SpaceId::Data {
691 self.zero_rtt_crypto.as_ref().expect(
692 "sending packets in the application data space requires known 0-RTT or 1-RTT keys",
693 ).packet.tag_len()
694 } else {
695 unreachable!("tried to send {:?} packet without keys", space_id)
696 };
697 if !coalesce || buf_capacity - buf_end < MIN_PACKET_SPACE + tag_len {
698 if num_datagrams >= max_datagrams {
702 break;
704 }
705
706 if self
713 .path
714 .anti_amplification_blocked(segment_size as u64 * (num_datagrams as u64) + 1)
715 {
716 trace!("blocked by anti-amplification");
717 break;
718 }
719
720 if ack_eliciting && self.spaces[space_id].loss_probes == 0 {
723 let untracked_bytes = if let Some(builder) = &builder_storage {
725 buf_capacity - builder.partial_encode.start
726 } else {
727 0
728 } as u64;
729 debug_assert!(untracked_bytes <= segment_size as u64);
730
731 let bytes_to_send = segment_size as u64 + untracked_bytes;
732 if self.path.in_flight.bytes + bytes_to_send >= self.path.congestion.window() {
733 space_idx += 1;
734 congestion_blocked = true;
735 trace!("blocked by congestion control");
738 continue;
739 }
740
741 let smoothed_rtt = self.path.rtt.get();
743 if let Some(delay) = self.path.pacing.delay(
744 smoothed_rtt,
745 bytes_to_send,
746 self.path.current_mtu(),
747 self.path.congestion.window(),
748 now,
749 ) {
750 self.timers.set(Timer::Pacing, delay);
751 congestion_blocked = true;
752 trace!("blocked by pacing");
755 break;
756 }
757 }
758
759 if let Some(mut builder) = builder_storage.take() {
761 if pad_datagram {
762 builder.pad_to(MIN_INITIAL_SIZE);
763 }
764
765 if num_datagrams > 1 || pad_datagram_to_mtu {
766 const MAX_PADDING: usize = 16;
779 let packet_len_unpadded = cmp::max(builder.min_size, buf.len())
780 - datagram_start
781 + builder.tag_len;
782 if (packet_len_unpadded + MAX_PADDING < segment_size
783 && !pad_datagram_to_mtu)
784 || datagram_start + segment_size > buf_capacity
785 {
786 trace!(
787 "GSO truncated by demand for {} padding bytes or loss probe",
788 segment_size - packet_len_unpadded
789 );
790 builder_storage = Some(builder);
791 break;
792 }
793
794 builder.pad_to(segment_size as u16);
797 }
798
799 builder.finish_and_track(now, self, sent_frames.take(), buf);
800
801 if num_datagrams == 1 {
802 segment_size = buf.len();
809 buf_capacity = buf.len();
812
813 if space_id == SpaceId::Data {
820 let frame_space_1rtt =
821 segment_size.saturating_sub(self.predict_1rtt_overhead(Some(pn)));
822 if self.space_can_send(space_id, frame_space_1rtt).is_empty() {
823 break;
824 }
825 }
826 }
827 }
828
829 let next_datagram_size_limit = match self.spaces[space_id].loss_probes {
831 0 => segment_size,
832 _ => {
833 self.spaces[space_id].loss_probes -= 1;
834 std::cmp::min(segment_size, usize::from(INITIAL_MTU))
838 }
839 };
840 buf_capacity += next_datagram_size_limit;
841 if buf.capacity() < buf_capacity {
842 buf.reserve(max_datagrams * segment_size);
851 }
852 num_datagrams += 1;
853 coalesce = true;
854 pad_datagram = false;
855 datagram_start = buf.len();
856
857 debug_assert_eq!(
858 datagram_start % segment_size,
859 0,
860 "datagrams in a GSO batch must be aligned to the segment size"
861 );
862 } else {
863 if let Some(builder) = builder_storage.take() {
867 builder.finish_and_track(now, self, sent_frames.take(), buf);
868 }
869 }
870
871 debug_assert!(buf_capacity - buf.len() >= MIN_PACKET_SPACE);
872
873 if self.spaces[SpaceId::Initial].crypto.is_some()
878 && space_id == SpaceId::Handshake
879 && self.side.is_client()
880 {
881 self.discard_space(now, SpaceId::Initial);
884 }
885 if let Some(ref mut prev) = self.prev_crypto {
886 prev.update_unacked = false;
887 }
888
889 debug_assert!(
890 builder_storage.is_none() && sent_frames.is_none(),
891 "Previous packet must have been finished"
892 );
893
894 let builder = builder_storage.insert(PacketBuilder::new(
895 now,
896 space_id,
897 self.rem_cids.active(),
898 buf,
899 buf_capacity,
900 datagram_start,
901 ack_eliciting,
902 self,
903 )?);
904 coalesce = coalesce && !builder.short_header;
905
906 pad_datagram |=
908 space_id == SpaceId::Initial && (self.side.is_client() || ack_eliciting);
909
910 if close {
911 trace!("sending CONNECTION_CLOSE");
912 if !self.spaces[space_id].pending_acks.ranges().is_empty() {
917 Self::populate_acks(
918 now,
919 self.receiving_ecn,
920 &mut SentFrames::default(),
921 &mut self.spaces[space_id],
922 buf,
923 &mut self.stats,
924 );
925 }
926
927 debug_assert!(
931 buf.len() + frame::ConnectionClose::SIZE_BOUND < builder.max_size,
932 "ACKs should leave space for ConnectionClose"
933 );
934 if buf.len() + frame::ConnectionClose::SIZE_BOUND < builder.max_size {
935 let max_frame_size = builder.max_size - buf.len();
936 match self.state {
937 State::Closed(state::Closed { ref reason }) => {
938 if space_id == SpaceId::Data || reason.is_transport_layer() {
939 reason.encode(buf, max_frame_size)
940 } else {
941 frame::ConnectionClose {
942 error_code: TransportErrorCode::APPLICATION_ERROR,
943 frame_type: None,
944 reason: Bytes::new(),
945 }
946 .encode(buf, max_frame_size)
947 }
948 }
949 State::Draining => frame::ConnectionClose {
950 error_code: TransportErrorCode::NO_ERROR,
951 frame_type: None,
952 reason: Bytes::new(),
953 }
954 .encode(buf, max_frame_size),
955 _ => unreachable!(
956 "tried to make a close packet when the connection wasn't closed"
957 ),
958 }
959 }
960 if space_id == self.highest_space {
961 self.close = false;
963 break;
965 } else {
966 space_idx += 1;
970 continue;
971 }
972 }
973
974 if space_id == SpaceId::Data && num_datagrams == 1 {
977 if let Some((token, remote)) = self.path_responses.pop_off_path(self.path.remote) {
978 let mut builder = builder_storage.take().unwrap();
981 trace!("PATH_RESPONSE {:08x} (off-path)", token);
982 buf.write(frame::FrameType::PATH_RESPONSE);
983 buf.write(token);
984 self.stats.frame_tx.path_response += 1;
985 builder.pad_to(MIN_INITIAL_SIZE);
986 builder.finish_and_track(
987 now,
988 self,
989 Some(SentFrames {
990 non_retransmits: true,
991 ..SentFrames::default()
992 }),
993 buf,
994 );
995 self.stats.udp_tx.on_sent(1, buf.len());
996
997 #[cfg(feature = "trace")]
999 {
1000 use crate::tracing::*;
1001 use crate::trace_packet_sent;
1002 trace_packet_sent!(
1003 &self.event_log,
1004 self.trace_context.trace_id(),
1005 buf.len() as u32,
1006 0 );
1008 }
1009
1010 return Some(Transmit {
1011 destination: remote,
1012 size: buf.len(),
1013 ecn: None,
1014 segment_size: None,
1015 src_ip: self.local_ip,
1016 });
1017 }
1018 }
1019
1020 if space_id == SpaceId::Data && self.address_discovery_state.is_some() {
1022 let peer_supports = self.peer_params.address_discovery.is_some() &&
1023 self.peer_params.address_discovery.as_ref().unwrap().enabled;
1024
1025 if let Some(state) = &mut self.address_discovery_state {
1026 let frames = state.check_for_address_observations(0, peer_supports, now);
1027 self.spaces[space_id].pending.observed_addresses.extend(frames);
1028 }
1029 }
1030
1031 let sent =
1032 self.populate_packet(now, space_id, buf, builder.max_size, builder.exact_number);
1033
1034 debug_assert!(
1041 !(sent.is_ack_only(&self.streams)
1042 && !can_send.acks
1043 && can_send.other
1044 && (buf_capacity - builder.datagram_start) == self.path.current_mtu() as usize
1045 && self.datagrams.outgoing.is_empty()),
1046 "SendableFrames was {can_send:?}, but only ACKs have been written"
1047 );
1048 pad_datagram |= sent.requires_padding;
1049
1050 if sent.largest_acked.is_some() {
1051 self.spaces[space_id].pending_acks.acks_sent();
1052 self.timers.stop(Timer::MaxAckDelay);
1053 }
1054
1055 sent_frames = Some(sent);
1057
1058 }
1061
1062 if let Some(mut builder) = builder_storage {
1064 if pad_datagram {
1065 builder.pad_to(MIN_INITIAL_SIZE);
1066 }
1067
1068 if pad_datagram_to_mtu && buf_capacity >= datagram_start + segment_size {
1074 builder.pad_to(segment_size as u16);
1075 }
1076
1077 let last_packet_number = builder.exact_number;
1078 builder.finish_and_track(now, self, sent_frames, buf);
1079 self.path
1080 .congestion
1081 .on_sent(now, buf.len() as u64, last_packet_number);
1082
1083 #[cfg(feature = "__qlog")]
1084 self.emit_qlog_recovery_metrics(now);
1085 }
1086
1087 self.app_limited = buf.is_empty() && !congestion_blocked;
1088
1089 if buf.is_empty() && self.state.is_established() {
1091 let space_id = SpaceId::Data;
1092 let probe_size = self
1093 .path
1094 .mtud
1095 .poll_transmit(now, self.packet_number_filter.peek(&self.spaces[space_id]))?;
1096
1097 let buf_capacity = probe_size as usize;
1098 buf.reserve(buf_capacity);
1099
1100 let mut builder = PacketBuilder::new(
1101 now,
1102 space_id,
1103 self.rem_cids.active(),
1104 buf,
1105 buf_capacity,
1106 0,
1107 true,
1108 self,
1109 )?;
1110
1111 buf.write(frame::FrameType::PING);
1113 self.stats.frame_tx.ping += 1;
1114
1115 if self.peer_supports_ack_frequency() {
1117 buf.write(frame::FrameType::IMMEDIATE_ACK);
1118 self.stats.frame_tx.immediate_ack += 1;
1119 }
1120
1121 builder.pad_to(probe_size);
1122 let sent_frames = SentFrames {
1123 non_retransmits: true,
1124 ..Default::default()
1125 };
1126 builder.finish_and_track(now, self, Some(sent_frames), buf);
1127
1128 self.stats.path.sent_plpmtud_probes += 1;
1129 num_datagrams = 1;
1130
1131 trace!(?probe_size, "writing MTUD probe");
1132 }
1133
1134 if buf.is_empty() {
1135 return None;
1136 }
1137
1138 trace!("sending {} bytes in {} datagrams", buf.len(), num_datagrams);
1139 self.path.total_sent = self.path.total_sent.saturating_add(buf.len() as u64);
1140
1141 self.stats.udp_tx.on_sent(num_datagrams as u64, buf.len());
1142
1143 #[cfg(feature = "trace")]
1145 {
1146 use crate::tracing::*;
1147 use crate::trace_packet_sent;
1148 let packet_num = self.spaces[SpaceId::Data].next_packet_number.saturating_sub(1);
1150 trace_packet_sent!(
1151 &self.event_log,
1152 self.trace_context.trace_id(),
1153 buf.len() as u32,
1154 packet_num
1155 );
1156 }
1157
1158 Some(Transmit {
1159 destination: self.path.remote,
1160 size: buf.len(),
1161 ecn: if self.path.sending_ecn {
1162 Some(EcnCodepoint::Ect0)
1163 } else {
1164 None
1165 },
1166 segment_size: match num_datagrams {
1167 1 => None,
1168 _ => Some(segment_size),
1169 },
1170 src_ip: self.local_ip,
1171 })
1172 }
1173
1174 fn send_coordination_request(&mut self, now: Instant, buf: &mut Vec<u8>) -> Option<Transmit> {
1176 let should_send = self.nat_traversal.as_ref()?.should_send_punch_request();
1178 if !should_send {
1179 return None;
1180 }
1181
1182 let (round, target_addrs, coordinator_addr) = {
1183 let nat_traversal = self.nat_traversal.as_ref()?;
1184 let coord = nat_traversal.coordination.as_ref()?;
1185 let addrs: Vec<_> = coord.punch_targets.iter().map(|t| t.remote_addr).collect();
1186 (coord.round, addrs, self.path.remote) };
1188
1189 if target_addrs.is_empty() {
1190 return None;
1191 }
1192
1193 debug_assert_eq!(
1194 self.highest_space,
1195 SpaceId::Data,
1196 "PUNCH_ME_NOW queued without 1-RTT keys"
1197 );
1198
1199 buf.reserve(MIN_INITIAL_SIZE as usize);
1200 let buf_capacity = buf.capacity();
1201
1202 let mut builder = PacketBuilder::new(
1203 now,
1204 SpaceId::Data,
1205 self.rem_cids.active(),
1206 buf,
1207 buf_capacity,
1208 0,
1209 false,
1210 self,
1211 )?;
1212
1213 trace!(
1214 "sending PUNCH_ME_NOW round {} with {} targets",
1215 round,
1216 target_addrs.len()
1217 );
1218
1219 buf.write(frame::FrameType::PUNCH_ME_NOW);
1221 buf.write(round);
1222 buf.write(target_addrs.len() as u8);
1223 for addr in target_addrs {
1224 match addr {
1225 SocketAddr::V4(v4) => {
1226 buf.write(4u8); buf.write(u32::from(*v4.ip()));
1228 buf.write(v4.port());
1229 }
1230 SocketAddr::V6(v6) => {
1231 buf.write(6u8); buf.write(*v6.ip());
1233 buf.write(v6.port());
1234 }
1235 }
1236 }
1237
1238 self.stats.frame_tx.ping += 1; builder.pad_to(MIN_INITIAL_SIZE);
1241 builder.finish_and_track(now, self, None, buf);
1242
1243 if let Some(nat_traversal) = &mut self.nat_traversal {
1245 nat_traversal.mark_punch_request_sent();
1246 }
1247
1248 Some(Transmit {
1249 destination: coordinator_addr,
1250 size: buf.len(),
1251 ecn: if self.path.sending_ecn {
1252 Some(EcnCodepoint::Ect0)
1253 } else {
1254 None
1255 },
1256 segment_size: None,
1257 src_ip: self.local_ip,
1258 })
1259 }
1260
1261 fn send_coordinated_path_challenge(
1263 &mut self,
1264 now: Instant,
1265 buf: &mut Vec<u8>,
1266 ) -> Option<Transmit> {
1267 if let Some(nat_traversal) = &mut self.nat_traversal {
1269 if nat_traversal.should_start_punching(now) {
1270 nat_traversal.start_punching_phase(now);
1271 }
1272 }
1273
1274 let (target_addr, challenge) = {
1276 let nat_traversal = self.nat_traversal.as_ref()?;
1277 match nat_traversal.get_coordination_phase() {
1278 Some(CoordinationPhase::Punching) => {
1279 let targets = nat_traversal.get_punch_targets_from_coordination()?;
1280 if targets.is_empty() {
1281 return None;
1282 }
1283 let target = &targets[0];
1285 (target.remote_addr, target.challenge)
1286 }
1287 _ => return None,
1288 }
1289 };
1290
1291 debug_assert_eq!(
1292 self.highest_space,
1293 SpaceId::Data,
1294 "PATH_CHALLENGE queued without 1-RTT keys"
1295 );
1296
1297 buf.reserve(MIN_INITIAL_SIZE as usize);
1298 let buf_capacity = buf.capacity();
1299
1300 let mut builder = PacketBuilder::new(
1301 now,
1302 SpaceId::Data,
1303 self.rem_cids.active(),
1304 buf,
1305 buf_capacity,
1306 0,
1307 false,
1308 self,
1309 )?;
1310
1311 trace!(
1312 "sending coordinated PATH_CHALLENGE {:08x} to {}",
1313 challenge, target_addr
1314 );
1315 buf.write(frame::FrameType::PATH_CHALLENGE);
1316 buf.write(challenge);
1317 self.stats.frame_tx.path_challenge += 1;
1318
1319 builder.pad_to(MIN_INITIAL_SIZE);
1320 builder.finish_and_track(now, self, None, buf);
1321
1322 if let Some(nat_traversal) = &mut self.nat_traversal {
1324 nat_traversal.mark_coordination_validating();
1325 }
1326
1327 Some(Transmit {
1328 destination: target_addr,
1329 size: buf.len(),
1330 ecn: if self.path.sending_ecn {
1331 Some(EcnCodepoint::Ect0)
1332 } else {
1333 None
1334 },
1335 segment_size: None,
1336 src_ip: self.local_ip,
1337 })
1338 }
1339
1340 fn send_nat_traversal_challenge(
1342 &mut self,
1343 now: Instant,
1344 buf: &mut Vec<u8>,
1345 ) -> Option<Transmit> {
1346 if let Some(request) = self.send_coordination_request(now, buf) {
1348 return Some(request);
1349 }
1350
1351 if let Some(punch) = self.send_coordinated_path_challenge(now, buf) {
1353 return Some(punch);
1354 }
1355
1356 let (remote_addr, remote_sequence) = {
1358 let nat_traversal = self.nat_traversal.as_ref()?;
1359 let candidates = nat_traversal.get_validation_candidates();
1360 if candidates.is_empty() {
1361 return None;
1362 }
1363 let (sequence, candidate) = candidates[0];
1365 (candidate.address, sequence)
1366 };
1367
1368 let challenge = self.rng.gen::<u64>();
1369
1370 if let Err(e) =
1372 self.nat_traversal
1373 .as_mut()?
1374 .start_validation(remote_sequence, challenge, now)
1375 {
1376 warn!("Failed to start NAT traversal validation: {}", e);
1377 return None;
1378 }
1379
1380 debug_assert_eq!(
1381 self.highest_space,
1382 SpaceId::Data,
1383 "PATH_CHALLENGE queued without 1-RTT keys"
1384 );
1385
1386 buf.reserve(MIN_INITIAL_SIZE as usize);
1387 let buf_capacity = buf.capacity();
1388
1389 let mut builder = PacketBuilder::new(
1391 now,
1392 SpaceId::Data,
1393 self.rem_cids.active(),
1394 buf,
1395 buf_capacity,
1396 0,
1397 false,
1398 self,
1399 )?;
1400
1401 trace!(
1402 "sending PATH_CHALLENGE {:08x} to NAT candidate {}",
1403 challenge, remote_addr
1404 );
1405 buf.write(frame::FrameType::PATH_CHALLENGE);
1406 buf.write(challenge);
1407 self.stats.frame_tx.path_challenge += 1;
1408
1409 builder.pad_to(MIN_INITIAL_SIZE);
1411
1412 builder.finish_and_track(now, self, None, buf);
1413
1414 Some(Transmit {
1415 destination: remote_addr,
1416 size: buf.len(),
1417 ecn: if self.path.sending_ecn {
1418 Some(EcnCodepoint::Ect0)
1419 } else {
1420 None
1421 },
1422 segment_size: None,
1423 src_ip: self.local_ip,
1424 })
1425 }
1426
1427 fn send_path_challenge(&mut self, now: Instant, buf: &mut Vec<u8>) -> Option<Transmit> {
1429 let (prev_cid, prev_path) = self.prev_path.as_mut()?;
1430 if !prev_path.challenge_pending {
1431 return None;
1432 }
1433 prev_path.challenge_pending = false;
1434 let token = prev_path
1435 .challenge
1436 .expect("previous path challenge pending without token");
1437 let destination = prev_path.remote;
1438 debug_assert_eq!(
1439 self.highest_space,
1440 SpaceId::Data,
1441 "PATH_CHALLENGE queued without 1-RTT keys"
1442 );
1443 buf.reserve(MIN_INITIAL_SIZE as usize);
1444
1445 let buf_capacity = buf.capacity();
1446
1447 let mut builder = PacketBuilder::new(
1453 now,
1454 SpaceId::Data,
1455 *prev_cid,
1456 buf,
1457 buf_capacity,
1458 0,
1459 false,
1460 self,
1461 )?;
1462 trace!("validating previous path with PATH_CHALLENGE {:08x}", token);
1463 buf.write(frame::FrameType::PATH_CHALLENGE);
1464 buf.write(token);
1465 self.stats.frame_tx.path_challenge += 1;
1466
1467 builder.pad_to(MIN_INITIAL_SIZE);
1472
1473 builder.finish(self, buf);
1474 self.stats.udp_tx.on_sent(1, buf.len());
1475
1476 Some(Transmit {
1477 destination,
1478 size: buf.len(),
1479 ecn: None,
1480 segment_size: None,
1481 src_ip: self.local_ip,
1482 })
1483 }
1484
1485 fn space_can_send(&self, space_id: SpaceId, frame_space_1rtt: usize) -> SendableFrames {
1487 if self.spaces[space_id].crypto.is_none()
1488 && (space_id != SpaceId::Data
1489 || self.zero_rtt_crypto.is_none()
1490 || self.side.is_server())
1491 {
1492 return SendableFrames::empty();
1494 }
1495 let mut can_send = self.spaces[space_id].can_send(&self.streams);
1496 if space_id == SpaceId::Data {
1497 can_send.other |= self.can_send_1rtt(frame_space_1rtt);
1498 }
1499 can_send
1500 }
1501
1502 pub fn handle_event(&mut self, event: ConnectionEvent) {
1508 use ConnectionEventInner::*;
1509 match event.0 {
1510 Datagram(DatagramConnectionEvent {
1511 now,
1512 remote,
1513 ecn,
1514 first_decode,
1515 remaining,
1516 }) => {
1517 if remote != self.path.remote && !self.side.remote_may_migrate() {
1521 trace!("discarding packet from unrecognized peer {}", remote);
1522 return;
1523 }
1524
1525 let was_anti_amplification_blocked = self.path.anti_amplification_blocked(1);
1526
1527 self.stats.udp_rx.datagrams += 1;
1528 self.stats.udp_rx.bytes += first_decode.len() as u64;
1529 let data_len = first_decode.len();
1530
1531 self.handle_decode(now, remote, ecn, first_decode);
1532 self.path.total_recvd = self.path.total_recvd.saturating_add(data_len as u64);
1537
1538 if let Some(data) = remaining {
1539 self.stats.udp_rx.bytes += data.len() as u64;
1540 self.handle_coalesced(now, remote, ecn, data);
1541 }
1542
1543 #[cfg(feature = "__qlog")]
1544 self.emit_qlog_recovery_metrics(now);
1545
1546 if was_anti_amplification_blocked {
1547 self.set_loss_detection_timer(now);
1551 }
1552 }
1553 NewIdentifiers(ids, now) => {
1554 self.local_cid_state.new_cids(&ids, now);
1555 ids.into_iter().rev().for_each(|frame| {
1556 self.spaces[SpaceId::Data].pending.new_cids.push(frame);
1557 });
1558 if self
1560 .timers
1561 .get(Timer::PushNewCid)
1562 .map_or(true, |x| x <= now)
1563 {
1564 self.reset_cid_retirement();
1565 }
1566 }
1567 }
1568 }
1569
1570 pub fn handle_timeout(&mut self, now: Instant) {
1580 for &timer in &Timer::VALUES {
1581 if !self.timers.is_expired(timer, now) {
1582 continue;
1583 }
1584 self.timers.stop(timer);
1585 trace!(timer = ?timer, "timeout");
1586 match timer {
1587 Timer::Close => {
1588 self.state = State::Drained;
1589 self.endpoint_events.push_back(EndpointEventInner::Drained);
1590 }
1591 Timer::Idle => {
1592 self.kill(ConnectionError::TimedOut);
1593 }
1594 Timer::KeepAlive => {
1595 trace!("sending keep-alive");
1596 self.ping();
1597 }
1598 Timer::LossDetection => {
1599 self.on_loss_detection_timeout(now);
1600
1601 #[cfg(feature = "__qlog")]
1602 self.emit_qlog_recovery_metrics(now);
1603 }
1604 Timer::KeyDiscard => {
1605 self.zero_rtt_crypto = None;
1606 self.prev_crypto = None;
1607 }
1608 Timer::PathValidation => {
1609 debug!("path validation failed");
1610 if let Some((_, prev)) = self.prev_path.take() {
1611 self.path = prev;
1612 }
1613 self.path.challenge = None;
1614 self.path.challenge_pending = false;
1615 }
1616 Timer::Pacing => trace!("pacing timer expired"),
1617 Timer::NatTraversal => {
1618 self.handle_nat_traversal_timeout(now);
1619 }
1620 Timer::PushNewCid => {
1621 let num_new_cid = self.local_cid_state.on_cid_timeout().into();
1623 if !self.state.is_closed() {
1624 trace!(
1625 "push a new cid to peer RETIRE_PRIOR_TO field {}",
1626 self.local_cid_state.retire_prior_to()
1627 );
1628 self.endpoint_events
1629 .push_back(EndpointEventInner::NeedIdentifiers(now, num_new_cid));
1630 }
1631 }
1632 Timer::MaxAckDelay => {
1633 trace!("max ack delay reached");
1634 self.spaces[SpaceId::Data]
1636 .pending_acks
1637 .on_max_ack_delay_timeout()
1638 }
1639 }
1640 }
1641 }
1642
1643 pub fn close(&mut self, now: Instant, error_code: VarInt, reason: Bytes) {
1655 self.close_inner(
1656 now,
1657 Close::Application(frame::ApplicationClose { error_code, reason }),
1658 )
1659 }
1660
1661 fn close_inner(&mut self, now: Instant, reason: Close) {
1662 let was_closed = self.state.is_closed();
1663 if !was_closed {
1664 self.close_common();
1665 self.set_close_timer(now);
1666 self.close = true;
1667 self.state = State::Closed(state::Closed { reason });
1668 }
1669 }
1670
1671 pub fn datagrams(&mut self) -> Datagrams<'_> {
1673 Datagrams { conn: self }
1674 }
1675
1676 pub fn stats(&self) -> ConnectionStats {
1678 let mut stats = self.stats;
1679 stats.path.rtt = self.path.rtt.get();
1680 stats.path.cwnd = self.path.congestion.window();
1681 stats.path.current_mtu = self.path.mtud.current_mtu();
1682
1683 stats
1684 }
1685
1686 pub fn ping(&mut self) {
1690 self.spaces[self.highest_space].ping_pending = true;
1691 }
1692
1693 pub fn force_key_update(&mut self) {
1697 if !self.state.is_established() {
1698 debug!("ignoring forced key update in illegal state");
1699 return;
1700 }
1701 if self.prev_crypto.is_some() {
1702 debug!("ignoring redundant forced key update");
1705 return;
1706 }
1707 self.update_keys(None, false);
1708 }
1709
1710 #[doc(hidden)]
1712 #[deprecated]
1713 pub fn initiate_key_update(&mut self) {
1714 self.force_key_update();
1715 }
1716
1717 pub fn crypto_session(&self) -> &dyn crypto::Session {
1719 &*self.crypto
1720 }
1721
1722 pub fn is_handshaking(&self) -> bool {
1727 self.state.is_handshake()
1728 }
1729
1730 pub fn is_closed(&self) -> bool {
1738 self.state.is_closed()
1739 }
1740
1741 pub fn is_drained(&self) -> bool {
1746 self.state.is_drained()
1747 }
1748
1749 pub fn accepted_0rtt(&self) -> bool {
1753 self.accepted_0rtt
1754 }
1755
1756 pub fn has_0rtt(&self) -> bool {
1758 self.zero_rtt_enabled
1759 }
1760
1761 pub fn has_pending_retransmits(&self) -> bool {
1763 !self.spaces[SpaceId::Data].pending.is_empty(&self.streams)
1764 }
1765
1766 pub fn side(&self) -> Side {
1768 self.side.side()
1769 }
1770
1771 pub fn remote_address(&self) -> SocketAddr {
1773 self.path.remote
1774 }
1775
1776 pub fn local_ip(&self) -> Option<IpAddr> {
1786 self.local_ip
1787 }
1788
1789 pub fn rtt(&self) -> Duration {
1791 self.path.rtt.get()
1792 }
1793
1794 pub fn congestion_state(&self) -> &dyn Controller {
1796 self.path.congestion.as_ref()
1797 }
1798
1799 pub fn path_changed(&mut self, now: Instant) {
1810 self.path.reset(now, &self.config);
1811 }
1812
1813 pub fn set_max_concurrent_streams(&mut self, dir: Dir, count: VarInt) {
1818 self.streams.set_max_concurrent(dir, count);
1819 let pending = &mut self.spaces[SpaceId::Data].pending;
1822 self.streams.queue_max_stream_id(pending);
1823 }
1824
1825 pub fn max_concurrent_streams(&self, dir: Dir) -> u64 {
1831 self.streams.max_concurrent(dir)
1832 }
1833
1834 pub fn set_receive_window(&mut self, receive_window: VarInt) {
1836 if self.streams.set_receive_window(receive_window) {
1837 self.spaces[SpaceId::Data].pending.max_data = true;
1838 }
1839 }
1840
1841 pub fn set_address_discovery_enabled(&mut self, enabled: bool) {
1843 if let Some(ref mut state) = self.address_discovery_state {
1844 state.enabled = enabled;
1845 }
1846 }
1847
1848 pub fn address_discovery_enabled(&self) -> bool {
1850 self.address_discovery_state
1851 .as_ref()
1852 .map_or(false, |state| state.enabled)
1853 }
1854
1855 pub fn observed_address(&self) -> Option<SocketAddr> {
1860 self.address_discovery_state
1861 .as_ref()
1862 .and_then(|state| state.get_observed_address(0)) }
1864
1865 pub(crate) fn address_discovery_state(&self) -> Option<&AddressDiscoveryState> {
1867 self.address_discovery_state.as_ref()
1868 }
1869
1870 fn on_ack_received(
1871 &mut self,
1872 now: Instant,
1873 space: SpaceId,
1874 ack: frame::Ack,
1875 ) -> Result<(), TransportError> {
1876 if ack.largest >= self.spaces[space].next_packet_number {
1877 return Err(TransportError::PROTOCOL_VIOLATION("unsent packet acked"));
1878 }
1879 let new_largest = {
1880 let space = &mut self.spaces[space];
1881 if space
1882 .largest_acked_packet
1883 .map_or(true, |pn| ack.largest > pn)
1884 {
1885 space.largest_acked_packet = Some(ack.largest);
1886 if let Some(info) = space.sent_packets.get(&ack.largest) {
1887 space.largest_acked_packet_sent = info.time_sent;
1891 }
1892 true
1893 } else {
1894 false
1895 }
1896 };
1897
1898 let mut newly_acked = ArrayRangeSet::new();
1900 for range in ack.iter() {
1901 self.packet_number_filter.check_ack(space, range.clone())?;
1902 for (&pn, _) in self.spaces[space].sent_packets.range(range) {
1903 newly_acked.insert_one(pn);
1904 }
1905 }
1906
1907 if newly_acked.is_empty() {
1908 return Ok(());
1909 }
1910
1911 let mut ack_eliciting_acked = false;
1912 for packet in newly_acked.elts() {
1913 if let Some(info) = self.spaces[space].take(packet) {
1914 if let Some(acked) = info.largest_acked {
1915 self.spaces[space].pending_acks.subtract_below(acked);
1921 }
1922 ack_eliciting_acked |= info.ack_eliciting;
1923
1924 let mtu_updated = self.path.mtud.on_acked(space, packet, info.size);
1926 if mtu_updated {
1927 self.path
1928 .congestion
1929 .on_mtu_update(self.path.mtud.current_mtu());
1930 }
1931
1932 self.ack_frequency.on_acked(packet);
1934
1935 self.on_packet_acked(now, packet, info);
1936 }
1937 }
1938
1939 self.path.congestion.on_end_acks(
1940 now,
1941 self.path.in_flight.bytes,
1942 self.app_limited,
1943 self.spaces[space].largest_acked_packet,
1944 );
1945
1946 if new_largest && ack_eliciting_acked {
1947 let ack_delay = if space != SpaceId::Data {
1948 Duration::from_micros(0)
1949 } else {
1950 cmp::min(
1951 self.ack_frequency.peer_max_ack_delay,
1952 Duration::from_micros(ack.delay << self.peer_params.ack_delay_exponent.0),
1953 )
1954 };
1955 let rtt = instant_saturating_sub(now, self.spaces[space].largest_acked_packet_sent);
1956 self.path.rtt.update(ack_delay, rtt);
1957 if self.path.first_packet_after_rtt_sample.is_none() {
1958 self.path.first_packet_after_rtt_sample =
1959 Some((space, self.spaces[space].next_packet_number));
1960 }
1961 }
1962
1963 self.detect_lost_packets(now, space, true);
1965
1966 if self.peer_completed_address_validation() {
1967 self.pto_count = 0;
1968 }
1969
1970 if self.path.sending_ecn {
1972 if let Some(ecn) = ack.ecn {
1973 if new_largest {
1978 let sent = self.spaces[space].largest_acked_packet_sent;
1979 self.process_ecn(now, space, newly_acked.len() as u64, ecn, sent);
1980 }
1981 } else {
1982 debug!("ECN not acknowledged by peer");
1984 self.path.sending_ecn = false;
1985 }
1986 }
1987
1988 self.set_loss_detection_timer(now);
1989 Ok(())
1990 }
1991
1992 fn process_ecn(
1994 &mut self,
1995 now: Instant,
1996 space: SpaceId,
1997 newly_acked: u64,
1998 ecn: frame::EcnCounts,
1999 largest_sent_time: Instant,
2000 ) {
2001 match self.spaces[space].detect_ecn(newly_acked, ecn) {
2002 Err(e) => {
2003 debug!("halting ECN due to verification failure: {}", e);
2004 self.path.sending_ecn = false;
2005 self.spaces[space].ecn_feedback = frame::EcnCounts::ZERO;
2008 }
2009 Ok(false) => {}
2010 Ok(true) => {
2011 self.stats.path.congestion_events += 1;
2012 self.path
2013 .congestion
2014 .on_congestion_event(now, largest_sent_time, false, 0);
2015 }
2016 }
2017 }
2018
2019 fn on_packet_acked(&mut self, now: Instant, pn: u64, info: SentPacket) {
2022 self.remove_in_flight(pn, &info);
2023 if info.ack_eliciting && self.path.challenge.is_none() {
2024 self.path.congestion.on_ack(
2027 now,
2028 info.time_sent,
2029 info.size.into(),
2030 self.app_limited,
2031 &self.path.rtt,
2032 );
2033 }
2034
2035 if let Some(retransmits) = info.retransmits.get() {
2037 for (id, _) in retransmits.reset_stream.iter() {
2038 self.streams.reset_acked(*id);
2039 }
2040 }
2041
2042 for frame in info.stream_frames {
2043 self.streams.received_ack_of(frame);
2044 }
2045 }
2046
2047 fn set_key_discard_timer(&mut self, now: Instant, space: SpaceId) {
2048 let start = if self.zero_rtt_crypto.is_some() {
2049 now
2050 } else {
2051 self.prev_crypto
2052 .as_ref()
2053 .expect("no previous keys")
2054 .end_packet
2055 .as_ref()
2056 .expect("update not acknowledged yet")
2057 .1
2058 };
2059 self.timers
2060 .set(Timer::KeyDiscard, start + self.pto(space) * 3);
2061 }
2062
2063 fn on_loss_detection_timeout(&mut self, now: Instant) {
2064 if let Some((_, pn_space)) = self.loss_time_and_space() {
2065 self.detect_lost_packets(now, pn_space, false);
2067 self.set_loss_detection_timer(now);
2068 return;
2069 }
2070
2071 let (_, space) = match self.pto_time_and_space(now) {
2072 Some(x) => x,
2073 None => {
2074 error!("PTO expired while unset");
2075 return;
2076 }
2077 };
2078 trace!(
2079 in_flight = self.path.in_flight.bytes,
2080 count = self.pto_count,
2081 ?space,
2082 "PTO fired"
2083 );
2084
2085 let count = match self.path.in_flight.ack_eliciting {
2086 0 => {
2089 debug_assert!(!self.peer_completed_address_validation());
2090 1
2091 }
2092 _ => 2,
2094 };
2095 self.spaces[space].loss_probes = self.spaces[space].loss_probes.saturating_add(count);
2096 self.pto_count = self.pto_count.saturating_add(1);
2097 self.set_loss_detection_timer(now);
2098 }
2099
2100 fn detect_lost_packets(&mut self, now: Instant, pn_space: SpaceId, due_to_ack: bool) {
2101 let mut lost_packets = Vec::<u64>::new();
2102 let mut lost_mtu_probe = None;
2103 let in_flight_mtu_probe = self.path.mtud.in_flight_mtu_probe();
2104 let rtt = self.path.rtt.conservative();
2105 let loss_delay = cmp::max(rtt.mul_f32(self.config.time_threshold), TIMER_GRANULARITY);
2106
2107 let lost_send_time = now.checked_sub(loss_delay).unwrap();
2109 let largest_acked_packet = self.spaces[pn_space].largest_acked_packet.unwrap();
2110 let packet_threshold = self.config.packet_threshold as u64;
2111 let mut size_of_lost_packets = 0u64;
2112
2113 let congestion_period =
2117 self.pto(SpaceId::Data) * self.config.persistent_congestion_threshold;
2118 let mut persistent_congestion_start: Option<Instant> = None;
2119 let mut prev_packet = None;
2120 let mut in_persistent_congestion = false;
2121
2122 let space = &mut self.spaces[pn_space];
2123 space.loss_time = None;
2124
2125 for (&packet, info) in space.sent_packets.range(0..largest_acked_packet) {
2126 if prev_packet != Some(packet.wrapping_sub(1)) {
2127 persistent_congestion_start = None;
2129 }
2130
2131 if info.time_sent <= lost_send_time || largest_acked_packet >= packet + packet_threshold
2132 {
2133 if Some(packet) == in_flight_mtu_probe {
2134 lost_mtu_probe = in_flight_mtu_probe;
2137 } else {
2138 lost_packets.push(packet);
2139 size_of_lost_packets += info.size as u64;
2140 if info.ack_eliciting && due_to_ack {
2141 match persistent_congestion_start {
2142 Some(start) if info.time_sent - start > congestion_period => {
2145 in_persistent_congestion = true;
2146 }
2147 None if self
2149 .path
2150 .first_packet_after_rtt_sample
2151 .is_some_and(|x| x < (pn_space, packet)) =>
2152 {
2153 persistent_congestion_start = Some(info.time_sent);
2154 }
2155 _ => {}
2156 }
2157 }
2158 }
2159 } else {
2160 let next_loss_time = info.time_sent + loss_delay;
2161 space.loss_time = Some(
2162 space
2163 .loss_time
2164 .map_or(next_loss_time, |x| cmp::min(x, next_loss_time)),
2165 );
2166 persistent_congestion_start = None;
2167 }
2168
2169 prev_packet = Some(packet);
2170 }
2171
2172 if let Some(largest_lost) = lost_packets.last().cloned() {
2174 let old_bytes_in_flight = self.path.in_flight.bytes;
2175 let largest_lost_sent = self.spaces[pn_space].sent_packets[&largest_lost].time_sent;
2176 self.lost_packets += lost_packets.len() as u64;
2177 self.stats.path.lost_packets += lost_packets.len() as u64;
2178 self.stats.path.lost_bytes += size_of_lost_packets;
2179 trace!(
2180 "packets lost: {:?}, bytes lost: {}",
2181 lost_packets, size_of_lost_packets
2182 );
2183
2184 for &packet in &lost_packets {
2185 let info = self.spaces[pn_space].take(packet).unwrap(); self.remove_in_flight(packet, &info);
2187 for frame in info.stream_frames {
2188 self.streams.retransmit(frame);
2189 }
2190 self.spaces[pn_space].pending |= info.retransmits;
2191 self.path.mtud.on_non_probe_lost(packet, info.size);
2192 }
2193
2194 if self.path.mtud.black_hole_detected(now) {
2195 self.stats.path.black_holes_detected += 1;
2196 self.path
2197 .congestion
2198 .on_mtu_update(self.path.mtud.current_mtu());
2199 if let Some(max_datagram_size) = self.datagrams().max_size() {
2200 self.datagrams.drop_oversized(max_datagram_size);
2201 }
2202 }
2203
2204 let lost_ack_eliciting = old_bytes_in_flight != self.path.in_flight.bytes;
2206
2207 if lost_ack_eliciting {
2208 self.stats.path.congestion_events += 1;
2209 self.path.congestion.on_congestion_event(
2210 now,
2211 largest_lost_sent,
2212 in_persistent_congestion,
2213 size_of_lost_packets,
2214 );
2215 }
2216 }
2217
2218 if let Some(packet) = lost_mtu_probe {
2220 let info = self.spaces[SpaceId::Data].take(packet).unwrap(); self.remove_in_flight(packet, &info);
2222 self.path.mtud.on_probe_lost();
2223 self.stats.path.lost_plpmtud_probes += 1;
2224 }
2225 }
2226
2227 fn loss_time_and_space(&self) -> Option<(Instant, SpaceId)> {
2228 SpaceId::iter()
2229 .filter_map(|id| Some((self.spaces[id].loss_time?, id)))
2230 .min_by_key(|&(time, _)| time)
2231 }
2232
2233 fn pto_time_and_space(&self, now: Instant) -> Option<(Instant, SpaceId)> {
2234 let backoff = 2u32.pow(self.pto_count.min(MAX_BACKOFF_EXPONENT));
2235 let mut duration = self.path.rtt.pto_base() * backoff;
2236
2237 if self.path.in_flight.ack_eliciting == 0 {
2238 debug_assert!(!self.peer_completed_address_validation());
2239 let space = match self.highest_space {
2240 SpaceId::Handshake => SpaceId::Handshake,
2241 _ => SpaceId::Initial,
2242 };
2243 return Some((now + duration, space));
2244 }
2245
2246 let mut result = None;
2247 for space in SpaceId::iter() {
2248 if self.spaces[space].in_flight == 0 {
2249 continue;
2250 }
2251 if space == SpaceId::Data {
2252 if self.is_handshaking() {
2254 return result;
2255 }
2256 duration += self.ack_frequency.max_ack_delay_for_pto() * backoff;
2258 }
2259 let last_ack_eliciting = match self.spaces[space].time_of_last_ack_eliciting_packet {
2260 Some(time) => time,
2261 None => continue,
2262 };
2263 let pto = last_ack_eliciting + duration;
2264 if result.map_or(true, |(earliest_pto, _)| pto < earliest_pto) {
2265 result = Some((pto, space));
2266 }
2267 }
2268 result
2269 }
2270
2271 fn peer_completed_address_validation(&self) -> bool {
2272 if self.side.is_server() || self.state.is_closed() {
2273 return true;
2274 }
2275 self.spaces[SpaceId::Handshake]
2278 .largest_acked_packet
2279 .is_some()
2280 || self.spaces[SpaceId::Data].largest_acked_packet.is_some()
2281 || (self.spaces[SpaceId::Data].crypto.is_some()
2282 && self.spaces[SpaceId::Handshake].crypto.is_none())
2283 }
2284
2285 fn set_loss_detection_timer(&mut self, now: Instant) {
2286 if self.state.is_closed() {
2287 return;
2291 }
2292
2293 if let Some((loss_time, _)) = self.loss_time_and_space() {
2294 self.timers.set(Timer::LossDetection, loss_time);
2296 return;
2297 }
2298
2299 if self.path.anti_amplification_blocked(1) {
2300 self.timers.stop(Timer::LossDetection);
2302 return;
2303 }
2304
2305 if self.path.in_flight.ack_eliciting == 0 && self.peer_completed_address_validation() {
2306 self.timers.stop(Timer::LossDetection);
2309 return;
2310 }
2311
2312 if let Some((timeout, _)) = self.pto_time_and_space(now) {
2315 self.timers.set(Timer::LossDetection, timeout);
2316 } else {
2317 self.timers.stop(Timer::LossDetection);
2318 }
2319 }
2320
2321 fn pto(&self, space: SpaceId) -> Duration {
2323 let max_ack_delay = match space {
2324 SpaceId::Initial | SpaceId::Handshake => Duration::ZERO,
2325 SpaceId::Data => self.ack_frequency.max_ack_delay_for_pto(),
2326 };
2327 self.path.rtt.pto_base() + max_ack_delay
2328 }
2329
2330 fn on_packet_authenticated(
2331 &mut self,
2332 now: Instant,
2333 space_id: SpaceId,
2334 ecn: Option<EcnCodepoint>,
2335 packet: Option<u64>,
2336 spin: bool,
2337 is_1rtt: bool,
2338 ) {
2339 self.total_authed_packets += 1;
2340 self.reset_keep_alive(now);
2341 self.reset_idle_timeout(now, space_id);
2342 self.permit_idle_reset = true;
2343 self.receiving_ecn |= ecn.is_some();
2344 if let Some(x) = ecn {
2345 let space = &mut self.spaces[space_id];
2346 space.ecn_counters += x;
2347
2348 if x.is_ce() {
2349 space.pending_acks.set_immediate_ack_required();
2350 }
2351 }
2352
2353 let packet = match packet {
2354 Some(x) => x,
2355 None => return,
2356 };
2357 if self.side.is_server() {
2358 if self.spaces[SpaceId::Initial].crypto.is_some() && space_id == SpaceId::Handshake {
2359 self.discard_space(now, SpaceId::Initial);
2361 }
2362 if self.zero_rtt_crypto.is_some() && is_1rtt {
2363 self.set_key_discard_timer(now, space_id)
2365 }
2366 }
2367 let space = &mut self.spaces[space_id];
2368 space.pending_acks.insert_one(packet, now);
2369 if packet >= space.rx_packet {
2370 space.rx_packet = packet;
2371 self.spin = self.side.is_client() ^ spin;
2373 }
2374 }
2375
2376 fn reset_idle_timeout(&mut self, now: Instant, space: SpaceId) {
2377 let timeout = match self.idle_timeout {
2378 None => return,
2379 Some(dur) => dur,
2380 };
2381 if self.state.is_closed() {
2382 self.timers.stop(Timer::Idle);
2383 return;
2384 }
2385 let dt = cmp::max(timeout, 3 * self.pto(space));
2386 self.timers.set(Timer::Idle, now + dt);
2387 }
2388
2389 fn reset_keep_alive(&mut self, now: Instant) {
2390 let interval = match self.config.keep_alive_interval {
2391 Some(x) if self.state.is_established() => x,
2392 _ => return,
2393 };
2394 self.timers.set(Timer::KeepAlive, now + interval);
2395 }
2396
2397 fn reset_cid_retirement(&mut self) {
2398 if let Some(t) = self.local_cid_state.next_timeout() {
2399 self.timers.set(Timer::PushNewCid, t);
2400 }
2401 }
2402
2403 pub(crate) fn handle_first_packet(
2408 &mut self,
2409 now: Instant,
2410 remote: SocketAddr,
2411 ecn: Option<EcnCodepoint>,
2412 packet_number: u64,
2413 packet: InitialPacket,
2414 remaining: Option<BytesMut>,
2415 ) -> Result<(), ConnectionError> {
2416 let span = trace_span!("first recv");
2417 let _guard = span.enter();
2418 debug_assert!(self.side.is_server());
2419 let len = packet.header_data.len() + packet.payload.len();
2420 self.path.total_recvd = len as u64;
2421
2422 match self.state {
2423 State::Handshake(ref mut state) => {
2424 state.expected_token = packet.header.token.clone();
2425 }
2426 _ => unreachable!("first packet must be delivered in Handshake state"),
2427 }
2428
2429 self.on_packet_authenticated(
2430 now,
2431 SpaceId::Initial,
2432 ecn,
2433 Some(packet_number),
2434 false,
2435 false,
2436 );
2437
2438 self.process_decrypted_packet(now, remote, Some(packet_number), packet.into())?;
2439 if let Some(data) = remaining {
2440 self.handle_coalesced(now, remote, ecn, data);
2441 }
2442
2443 #[cfg(feature = "__qlog")]
2444 self.emit_qlog_recovery_metrics(now);
2445
2446 Ok(())
2447 }
2448
2449 fn init_0rtt(&mut self) {
2450 let (header, packet) = match self.crypto.early_crypto() {
2451 Some(x) => x,
2452 None => return,
2453 };
2454 if self.side.is_client() {
2455 match self.crypto.transport_parameters() {
2456 Ok(params) => {
2457 let params = params
2458 .expect("crypto layer didn't supply transport parameters with ticket");
2459 let params = TransportParameters {
2461 initial_src_cid: None,
2462 original_dst_cid: None,
2463 preferred_address: None,
2464 retry_src_cid: None,
2465 stateless_reset_token: None,
2466 min_ack_delay: None,
2467 ack_delay_exponent: TransportParameters::default().ack_delay_exponent,
2468 max_ack_delay: TransportParameters::default().max_ack_delay,
2469 ..params
2470 };
2471 self.set_peer_params(params);
2472 }
2473 Err(e) => {
2474 error!("session ticket has malformed transport parameters: {}", e);
2475 return;
2476 }
2477 }
2478 }
2479 trace!("0-RTT enabled");
2480 self.zero_rtt_enabled = true;
2481 self.zero_rtt_crypto = Some(ZeroRttCrypto { header, packet });
2482 }
2483
2484 fn read_crypto(
2485 &mut self,
2486 space: SpaceId,
2487 crypto: &frame::Crypto,
2488 payload_len: usize,
2489 ) -> Result<(), TransportError> {
2490 let expected = if !self.state.is_handshake() {
2491 SpaceId::Data
2492 } else if self.highest_space == SpaceId::Initial {
2493 SpaceId::Initial
2494 } else {
2495 SpaceId::Handshake
2498 };
2499 debug_assert!(space <= expected, "received out-of-order CRYPTO data");
2503
2504 let end = crypto.offset + crypto.data.len() as u64;
2505 if space < expected && end > self.spaces[space].crypto_stream.bytes_read() {
2506 warn!(
2507 "received new {:?} CRYPTO data when expecting {:?}",
2508 space, expected
2509 );
2510 return Err(TransportError::PROTOCOL_VIOLATION(
2511 "new data at unexpected encryption level",
2512 ));
2513 }
2514
2515 let space = &mut self.spaces[space];
2516 let max = end.saturating_sub(space.crypto_stream.bytes_read());
2517 if max > self.config.crypto_buffer_size as u64 {
2518 return Err(TransportError::CRYPTO_BUFFER_EXCEEDED(""));
2519 }
2520
2521 space
2522 .crypto_stream
2523 .insert(crypto.offset, crypto.data.clone(), payload_len);
2524 while let Some(chunk) = space.crypto_stream.read(usize::MAX, true) {
2525 trace!("consumed {} CRYPTO bytes", chunk.bytes.len());
2526 if self.crypto.read_handshake(&chunk.bytes)? {
2527 self.events.push_back(Event::HandshakeDataReady);
2528 }
2529 }
2530
2531 Ok(())
2532 }
2533
2534 fn write_crypto(&mut self) {
2535 loop {
2536 let space = self.highest_space;
2537 let mut outgoing = Vec::new();
2538 if let Some(crypto) = self.crypto.write_handshake(&mut outgoing) {
2539 match space {
2540 SpaceId::Initial => {
2541 self.upgrade_crypto(SpaceId::Handshake, crypto);
2542 }
2543 SpaceId::Handshake => {
2544 self.upgrade_crypto(SpaceId::Data, crypto);
2545 }
2546 _ => unreachable!("got updated secrets during 1-RTT"),
2547 }
2548 }
2549 if outgoing.is_empty() {
2550 if space == self.highest_space {
2551 break;
2552 } else {
2553 continue;
2555 }
2556 }
2557 let offset = self.spaces[space].crypto_offset;
2558 let outgoing = Bytes::from(outgoing);
2559 if let State::Handshake(ref mut state) = self.state {
2560 if space == SpaceId::Initial && offset == 0 && self.side.is_client() {
2561 state.client_hello = Some(outgoing.clone());
2562 }
2563 }
2564 self.spaces[space].crypto_offset += outgoing.len() as u64;
2565 trace!("wrote {} {:?} CRYPTO bytes", outgoing.len(), space);
2566 self.spaces[space].pending.crypto.push_back(frame::Crypto {
2567 offset,
2568 data: outgoing,
2569 });
2570 }
2571 }
2572
2573 fn upgrade_crypto(&mut self, space: SpaceId, crypto: Keys) {
2575 debug_assert!(
2576 self.spaces[space].crypto.is_none(),
2577 "already reached packet space {space:?}"
2578 );
2579 trace!("{:?} keys ready", space);
2580 if space == SpaceId::Data {
2581 self.next_crypto = Some(
2583 self.crypto
2584 .next_1rtt_keys()
2585 .expect("handshake should be complete"),
2586 );
2587 }
2588
2589 self.spaces[space].crypto = Some(crypto);
2590 debug_assert!(space as usize > self.highest_space as usize);
2591 self.highest_space = space;
2592 if space == SpaceId::Data && self.side.is_client() {
2593 self.zero_rtt_crypto = None;
2595 }
2596 }
2597
2598 fn discard_space(&mut self, now: Instant, space_id: SpaceId) {
2599 debug_assert!(space_id != SpaceId::Data);
2600 trace!("discarding {:?} keys", space_id);
2601 if space_id == SpaceId::Initial {
2602 if let ConnectionSide::Client { token, .. } = &mut self.side {
2604 *token = Bytes::new();
2605 }
2606 }
2607 let space = &mut self.spaces[space_id];
2608 space.crypto = None;
2609 space.time_of_last_ack_eliciting_packet = None;
2610 space.loss_time = None;
2611 space.in_flight = 0;
2612 let sent_packets = mem::take(&mut space.sent_packets);
2613 for (pn, packet) in sent_packets.into_iter() {
2614 self.remove_in_flight(pn, &packet);
2615 }
2616 self.set_loss_detection_timer(now)
2617 }
2618
2619 fn handle_coalesced(
2620 &mut self,
2621 now: Instant,
2622 remote: SocketAddr,
2623 ecn: Option<EcnCodepoint>,
2624 data: BytesMut,
2625 ) {
2626 self.path.total_recvd = self.path.total_recvd.saturating_add(data.len() as u64);
2627 let mut remaining = Some(data);
2628 while let Some(data) = remaining {
2629 match PartialDecode::new(
2630 data,
2631 &FixedLengthConnectionIdParser::new(self.local_cid_state.cid_len()),
2632 &[self.version],
2633 self.endpoint_config.grease_quic_bit,
2634 ) {
2635 Ok((partial_decode, rest)) => {
2636 remaining = rest;
2637 self.handle_decode(now, remote, ecn, partial_decode);
2638 }
2639 Err(e) => {
2640 trace!("malformed header: {}", e);
2641 return;
2642 }
2643 }
2644 }
2645 }
2646
2647 fn handle_decode(
2648 &mut self,
2649 now: Instant,
2650 remote: SocketAddr,
2651 ecn: Option<EcnCodepoint>,
2652 partial_decode: PartialDecode,
2653 ) {
2654 if let Some(decoded) = packet_crypto::unprotect_header(
2655 partial_decode,
2656 &self.spaces,
2657 self.zero_rtt_crypto.as_ref(),
2658 self.peer_params.stateless_reset_token,
2659 ) {
2660 self.handle_packet(now, remote, ecn, decoded.packet, decoded.stateless_reset);
2661 }
2662 }
2663
2664 fn handle_packet(
2665 &mut self,
2666 now: Instant,
2667 remote: SocketAddr,
2668 ecn: Option<EcnCodepoint>,
2669 packet: Option<Packet>,
2670 stateless_reset: bool,
2671 ) {
2672 self.stats.udp_rx.ios += 1;
2673 if let Some(ref packet) = packet {
2674 trace!(
2675 "got {:?} packet ({} bytes) from {} using id {}",
2676 packet.header.space(),
2677 packet.payload.len() + packet.header_data.len(),
2678 remote,
2679 packet.header.dst_cid(),
2680 );
2681
2682 #[cfg(feature = "trace")]
2684 {
2685 use crate::tracing::*;
2686 use crate::trace_packet_received;
2687 let packet_size = packet.payload.len() + packet.header_data.len();
2688 trace_packet_received!(
2689 &self.event_log,
2690 self.trace_context.trace_id(),
2691 packet_size as u32,
2692 0 );
2694 }
2695 }
2696
2697 if self.is_handshaking() && remote != self.path.remote {
2698 debug!("discarding packet with unexpected remote during handshake");
2699 return;
2700 }
2701
2702 let was_closed = self.state.is_closed();
2703 let was_drained = self.state.is_drained();
2704
2705 let decrypted = match packet {
2706 None => Err(None),
2707 Some(mut packet) => self
2708 .decrypt_packet(now, &mut packet)
2709 .map(move |number| (packet, number)),
2710 };
2711 let result = match decrypted {
2712 _ if stateless_reset => {
2713 debug!("got stateless reset");
2714 Err(ConnectionError::Reset)
2715 }
2716 Err(Some(e)) => {
2717 warn!("illegal packet: {}", e);
2718 Err(e.into())
2719 }
2720 Err(None) => {
2721 debug!("failed to authenticate packet");
2722 self.authentication_failures += 1;
2723 let integrity_limit = self.spaces[self.highest_space]
2724 .crypto
2725 .as_ref()
2726 .unwrap()
2727 .packet
2728 .local
2729 .integrity_limit();
2730 if self.authentication_failures > integrity_limit {
2731 Err(TransportError::AEAD_LIMIT_REACHED("integrity limit violated").into())
2732 } else {
2733 return;
2734 }
2735 }
2736 Ok((packet, number)) => {
2737 let span = match number {
2738 Some(pn) => trace_span!("recv", space = ?packet.header.space(), pn),
2739 None => trace_span!("recv", space = ?packet.header.space()),
2740 };
2741 let _guard = span.enter();
2742
2743 let is_duplicate = |n| self.spaces[packet.header.space()].dedup.insert(n);
2744 if number.is_some_and(is_duplicate) {
2745 debug!("discarding possible duplicate packet");
2746 return;
2747 } else if self.state.is_handshake() && packet.header.is_short() {
2748 trace!("dropping short packet during handshake");
2750 return;
2751 } else {
2752 if let Header::Initial(InitialHeader { ref token, .. }) = packet.header {
2753 if let State::Handshake(ref hs) = self.state {
2754 if self.side.is_server() && token != &hs.expected_token {
2755 warn!("discarding Initial with invalid retry token");
2759 return;
2760 }
2761 }
2762 }
2763
2764 if !self.state.is_closed() {
2765 let spin = match packet.header {
2766 Header::Short { spin, .. } => spin,
2767 _ => false,
2768 };
2769 self.on_packet_authenticated(
2770 now,
2771 packet.header.space(),
2772 ecn,
2773 number,
2774 spin,
2775 packet.header.is_1rtt(),
2776 );
2777 }
2778
2779 self.process_decrypted_packet(now, remote, number, packet)
2780 }
2781 }
2782 };
2783
2784 if let Err(conn_err) = result {
2786 self.error = Some(conn_err.clone());
2787 self.state = match conn_err {
2788 ConnectionError::ApplicationClosed(reason) => State::closed(reason),
2789 ConnectionError::ConnectionClosed(reason) => State::closed(reason),
2790 ConnectionError::Reset
2791 | ConnectionError::TransportError(TransportError {
2792 code: TransportErrorCode::AEAD_LIMIT_REACHED,
2793 ..
2794 }) => State::Drained,
2795 ConnectionError::TimedOut => {
2796 unreachable!("timeouts aren't generated by packet processing");
2797 }
2798 ConnectionError::TransportError(err) => {
2799 debug!("closing connection due to transport error: {}", err);
2800 State::closed(err)
2801 }
2802 ConnectionError::VersionMismatch => State::Draining,
2803 ConnectionError::LocallyClosed => {
2804 unreachable!("LocallyClosed isn't generated by packet processing");
2805 }
2806 ConnectionError::CidsExhausted => {
2807 unreachable!("CidsExhausted isn't generated by packet processing");
2808 }
2809 };
2810 }
2811
2812 if !was_closed && self.state.is_closed() {
2813 self.close_common();
2814 if !self.state.is_drained() {
2815 self.set_close_timer(now);
2816 }
2817 }
2818 if !was_drained && self.state.is_drained() {
2819 self.endpoint_events.push_back(EndpointEventInner::Drained);
2820 self.timers.stop(Timer::Close);
2823 }
2824
2825 if let State::Closed(_) = self.state {
2827 self.close = remote == self.path.remote;
2828 }
2829 }
2830
2831 fn process_decrypted_packet(
2832 &mut self,
2833 now: Instant,
2834 remote: SocketAddr,
2835 number: Option<u64>,
2836 packet: Packet,
2837 ) -> Result<(), ConnectionError> {
2838 let state = match self.state {
2839 State::Established => {
2840 match packet.header.space() {
2841 SpaceId::Data => self.process_payload(now, remote, number.unwrap(), packet)?,
2842 _ if packet.header.has_frames() => self.process_early_payload(now, packet)?,
2843 _ => {
2844 trace!("discarding unexpected pre-handshake packet");
2845 }
2846 }
2847 return Ok(());
2848 }
2849 State::Closed(_) => {
2850 for result in frame::Iter::new(packet.payload.freeze())? {
2851 let frame = match result {
2852 Ok(frame) => frame,
2853 Err(err) => {
2854 debug!("frame decoding error: {err:?}");
2855 continue;
2856 }
2857 };
2858
2859 if let Frame::Padding = frame {
2860 continue;
2861 };
2862
2863 self.stats.frame_rx.record(&frame);
2864
2865 if let Frame::Close(_) = frame {
2866 trace!("draining");
2867 self.state = State::Draining;
2868 break;
2869 }
2870 }
2871 return Ok(());
2872 }
2873 State::Draining | State::Drained => return Ok(()),
2874 State::Handshake(ref mut state) => state,
2875 };
2876
2877 match packet.header {
2878 Header::Retry {
2879 src_cid: rem_cid, ..
2880 } => {
2881 if self.side.is_server() {
2882 return Err(TransportError::PROTOCOL_VIOLATION("client sent Retry").into());
2883 }
2884
2885 if self.total_authed_packets > 1
2886 || packet.payload.len() <= 16 || !self.crypto.is_valid_retry(
2888 &self.rem_cids.active(),
2889 &packet.header_data,
2890 &packet.payload,
2891 )
2892 {
2893 trace!("discarding invalid Retry");
2894 return Ok(());
2902 }
2903
2904 trace!("retrying with CID {}", rem_cid);
2905 let client_hello = state.client_hello.take().unwrap();
2906 self.retry_src_cid = Some(rem_cid);
2907 self.rem_cids.update_initial_cid(rem_cid);
2908 self.rem_handshake_cid = rem_cid;
2909
2910 let space = &mut self.spaces[SpaceId::Initial];
2911 if let Some(info) = space.take(0) {
2912 self.on_packet_acked(now, 0, info);
2913 };
2914
2915 self.discard_space(now, SpaceId::Initial); self.spaces[SpaceId::Initial] = PacketSpace {
2917 crypto: Some(self.crypto.initial_keys(&rem_cid, self.side.side())),
2918 next_packet_number: self.spaces[SpaceId::Initial].next_packet_number,
2919 crypto_offset: client_hello.len() as u64,
2920 ..PacketSpace::new(now)
2921 };
2922 self.spaces[SpaceId::Initial]
2923 .pending
2924 .crypto
2925 .push_back(frame::Crypto {
2926 offset: 0,
2927 data: client_hello,
2928 });
2929
2930 let zero_rtt = mem::take(&mut self.spaces[SpaceId::Data].sent_packets);
2932 for (pn, info) in zero_rtt {
2933 self.remove_in_flight(pn, &info);
2934 self.spaces[SpaceId::Data].pending |= info.retransmits;
2935 }
2936 self.streams.retransmit_all_for_0rtt();
2937
2938 let token_len = packet.payload.len() - 16;
2939 let ConnectionSide::Client { ref mut token, .. } = self.side else {
2940 unreachable!("we already short-circuited if we're server");
2941 };
2942 *token = packet.payload.freeze().split_to(token_len);
2943 self.state = State::Handshake(state::Handshake {
2944 expected_token: Bytes::new(),
2945 rem_cid_set: false,
2946 client_hello: None,
2947 });
2948 Ok(())
2949 }
2950 Header::Long {
2951 ty: LongType::Handshake,
2952 src_cid: rem_cid,
2953 ..
2954 } => {
2955 if rem_cid != self.rem_handshake_cid {
2956 debug!(
2957 "discarding packet with mismatched remote CID: {} != {}",
2958 self.rem_handshake_cid, rem_cid
2959 );
2960 return Ok(());
2961 }
2962 self.on_path_validated();
2963
2964 self.process_early_payload(now, packet)?;
2965 if self.state.is_closed() {
2966 return Ok(());
2967 }
2968
2969 if self.crypto.is_handshaking() {
2970 trace!("handshake ongoing");
2971 return Ok(());
2972 }
2973
2974 if self.side.is_client() {
2975 let params =
2977 self.crypto
2978 .transport_parameters()?
2979 .ok_or_else(|| TransportError {
2980 code: TransportErrorCode::crypto(0x6d),
2981 frame: None,
2982 reason: "transport parameters missing".into(),
2983 })?;
2984
2985 if self.has_0rtt() {
2986 if !self.crypto.early_data_accepted().unwrap() {
2987 debug_assert!(self.side.is_client());
2988 debug!("0-RTT rejected");
2989 self.accepted_0rtt = false;
2990 self.streams.zero_rtt_rejected();
2991
2992 self.spaces[SpaceId::Data].pending = Retransmits::default();
2994
2995 let sent_packets =
2997 mem::take(&mut self.spaces[SpaceId::Data].sent_packets);
2998 for (pn, packet) in sent_packets {
2999 self.remove_in_flight(pn, &packet);
3000 }
3001 } else {
3002 self.accepted_0rtt = true;
3003 params.validate_resumption_from(&self.peer_params)?;
3004 }
3005 }
3006 if let Some(token) = params.stateless_reset_token {
3007 self.endpoint_events
3008 .push_back(EndpointEventInner::ResetToken(self.path.remote, token));
3009 }
3010 self.handle_peer_params(params)?;
3011 self.issue_first_cids(now);
3012 } else {
3013 self.spaces[SpaceId::Data].pending.handshake_done = true;
3015 self.discard_space(now, SpaceId::Handshake);
3016 }
3017
3018 self.events.push_back(Event::Connected);
3019 self.state = State::Established;
3020 trace!("established");
3021 Ok(())
3022 }
3023 Header::Initial(InitialHeader {
3024 src_cid: rem_cid, ..
3025 }) => {
3026 if !state.rem_cid_set {
3027 trace!("switching remote CID to {}", rem_cid);
3028 let mut state = state.clone();
3029 self.rem_cids.update_initial_cid(rem_cid);
3030 self.rem_handshake_cid = rem_cid;
3031 self.orig_rem_cid = rem_cid;
3032 state.rem_cid_set = true;
3033 self.state = State::Handshake(state);
3034 } else if rem_cid != self.rem_handshake_cid {
3035 debug!(
3036 "discarding packet with mismatched remote CID: {} != {}",
3037 self.rem_handshake_cid, rem_cid
3038 );
3039 return Ok(());
3040 }
3041
3042 let starting_space = self.highest_space;
3043 self.process_early_payload(now, packet)?;
3044
3045 if self.side.is_server()
3046 && starting_space == SpaceId::Initial
3047 && self.highest_space != SpaceId::Initial
3048 {
3049 let params =
3050 self.crypto
3051 .transport_parameters()?
3052 .ok_or_else(|| TransportError {
3053 code: TransportErrorCode::crypto(0x6d),
3054 frame: None,
3055 reason: "transport parameters missing".into(),
3056 })?;
3057 self.handle_peer_params(params)?;
3058 self.issue_first_cids(now);
3059 self.init_0rtt();
3060 }
3061 Ok(())
3062 }
3063 Header::Long {
3064 ty: LongType::ZeroRtt,
3065 ..
3066 } => {
3067 self.process_payload(now, remote, number.unwrap(), packet)?;
3068 Ok(())
3069 }
3070 Header::VersionNegotiate { .. } => {
3071 if self.total_authed_packets > 1 {
3072 return Ok(());
3073 }
3074 let supported = packet
3075 .payload
3076 .chunks(4)
3077 .any(|x| match <[u8; 4]>::try_from(x) {
3078 Ok(version) => self.version == u32::from_be_bytes(version),
3079 Err(_) => false,
3080 });
3081 if supported {
3082 return Ok(());
3083 }
3084 debug!("remote doesn't support our version");
3085 Err(ConnectionError::VersionMismatch)
3086 }
3087 Header::Short { .. } => unreachable!(
3088 "short packets received during handshake are discarded in handle_packet"
3089 ),
3090 }
3091 }
3092
3093 fn process_early_payload(
3095 &mut self,
3096 now: Instant,
3097 packet: Packet,
3098 ) -> Result<(), TransportError> {
3099 debug_assert_ne!(packet.header.space(), SpaceId::Data);
3100 let payload_len = packet.payload.len();
3101 let mut ack_eliciting = false;
3102 for result in frame::Iter::new(packet.payload.freeze())? {
3103 let frame = result?;
3104 let span = match frame {
3105 Frame::Padding => continue,
3106 _ => Some(trace_span!("frame", ty = %frame.ty())),
3107 };
3108
3109 self.stats.frame_rx.record(&frame);
3110
3111 let _guard = span.as_ref().map(|x| x.enter());
3112 ack_eliciting |= frame.is_ack_eliciting();
3113
3114 match frame {
3116 Frame::Padding | Frame::Ping => {}
3117 Frame::Crypto(frame) => {
3118 self.read_crypto(packet.header.space(), &frame, payload_len)?;
3119 }
3120 Frame::Ack(ack) => {
3121 self.on_ack_received(now, packet.header.space(), ack)?;
3122 }
3123 Frame::Close(reason) => {
3124 self.error = Some(reason.into());
3125 self.state = State::Draining;
3126 return Ok(());
3127 }
3128 _ => {
3129 let mut err =
3130 TransportError::PROTOCOL_VIOLATION("illegal frame type in handshake");
3131 err.frame = Some(frame.ty());
3132 return Err(err);
3133 }
3134 }
3135 }
3136
3137 if ack_eliciting {
3138 self.spaces[packet.header.space()]
3140 .pending_acks
3141 .set_immediate_ack_required();
3142 }
3143
3144 self.write_crypto();
3145 Ok(())
3146 }
3147
3148 fn process_payload(
3149 &mut self,
3150 now: Instant,
3151 remote: SocketAddr,
3152 number: u64,
3153 packet: Packet,
3154 ) -> Result<(), TransportError> {
3155 let payload = packet.payload.freeze();
3156 let mut is_probing_packet = true;
3157 let mut close = None;
3158 let payload_len = payload.len();
3159 let mut ack_eliciting = false;
3160 for result in frame::Iter::new(payload)? {
3161 let frame = result?;
3162 let span = match frame {
3163 Frame::Padding => continue,
3164 _ => Some(trace_span!("frame", ty = %frame.ty())),
3165 };
3166
3167 self.stats.frame_rx.record(&frame);
3168 match &frame {
3171 Frame::Crypto(f) => {
3172 trace!(offset = f.offset, len = f.data.len(), "got crypto frame");
3173 }
3174 Frame::Stream(f) => {
3175 trace!(id = %f.id, offset = f.offset, len = f.data.len(), fin = f.fin, "got stream frame");
3176 }
3177 Frame::Datagram(f) => {
3178 trace!(len = f.data.len(), "got datagram frame");
3179 }
3180 f => {
3181 trace!("got frame {:?}", f);
3182 }
3183 }
3184
3185 let _guard = span.as_ref().map(|x| x.enter());
3186 if packet.header.is_0rtt() {
3187 match frame {
3188 Frame::Crypto(_) | Frame::Close(Close::Application(_)) => {
3189 return Err(TransportError::PROTOCOL_VIOLATION(
3190 "illegal frame type in 0-RTT",
3191 ));
3192 }
3193 _ => {}
3194 }
3195 }
3196 ack_eliciting |= frame.is_ack_eliciting();
3197
3198 match frame {
3200 Frame::Padding
3201 | Frame::PathChallenge(_)
3202 | Frame::PathResponse(_)
3203 | Frame::NewConnectionId(_) => {}
3204 _ => {
3205 is_probing_packet = false;
3206 }
3207 }
3208 match frame {
3209 Frame::Crypto(frame) => {
3210 self.read_crypto(SpaceId::Data, &frame, payload_len)?;
3211 }
3212 Frame::Stream(frame) => {
3213 if self.streams.received(frame, payload_len)?.should_transmit() {
3214 self.spaces[SpaceId::Data].pending.max_data = true;
3215 }
3216 }
3217 Frame::Ack(ack) => {
3218 self.on_ack_received(now, SpaceId::Data, ack)?;
3219 }
3220 Frame::Padding | Frame::Ping => {}
3221 Frame::Close(reason) => {
3222 close = Some(reason);
3223 }
3224 Frame::PathChallenge(token) => {
3225 self.path_responses.push(number, token, remote);
3226 if remote == self.path.remote {
3227 match self.peer_supports_ack_frequency() {
3230 true => self.immediate_ack(),
3231 false => self.ping(),
3232 }
3233 }
3234 }
3235 Frame::PathResponse(token) => {
3236 if self.path.challenge == Some(token) && remote == self.path.remote {
3237 trace!("new path validated");
3238 self.timers.stop(Timer::PathValidation);
3239 self.path.challenge = None;
3240 self.path.validated = true;
3241 if let Some((_, ref mut prev_path)) = self.prev_path {
3242 prev_path.challenge = None;
3243 prev_path.challenge_pending = false;
3244 }
3245 self.on_path_validated();
3246 } else if let Some(nat_traversal) = &mut self.nat_traversal {
3247 match nat_traversal.handle_validation_success(remote, token, now) {
3249 Ok(sequence) => {
3250 trace!(
3251 "NAT traversal candidate {} validated for sequence {}",
3252 remote, sequence
3253 );
3254
3255 if nat_traversal.handle_coordination_success(remote, now) {
3257 trace!("Coordination succeeded via {}", remote);
3258
3259 let can_migrate = match &self.side {
3261 ConnectionSide::Client { .. } => true, ConnectionSide::Server { server_config } => {
3263 server_config.migration
3264 }
3265 };
3266
3267 if can_migrate {
3268 let best_pairs = nat_traversal.get_best_succeeded_pairs();
3270 if let Some(best) = best_pairs.first() {
3271 if best.remote_addr == remote
3272 && best.remote_addr != self.path.remote
3273 {
3274 debug!(
3275 "NAT traversal found better path, initiating migration"
3276 );
3277 if let Err(e) =
3279 self.migrate_to_nat_traversal_path(now)
3280 {
3281 warn!(
3282 "Failed to migrate to NAT traversal path: {:?}",
3283 e
3284 );
3285 }
3286 }
3287 }
3288 }
3289 } else {
3290 if nat_traversal.mark_pair_succeeded(remote) {
3292 trace!("NAT traversal pair succeeded for {}", remote);
3293 }
3294 }
3295 }
3296 Err(NatTraversalError::ChallengeMismatch) => {
3297 debug!(
3298 "PATH_RESPONSE challenge mismatch for NAT candidate {}",
3299 remote
3300 );
3301 }
3302 Err(e) => {
3303 debug!("NAT traversal validation error: {}", e);
3304 }
3305 }
3306 } else {
3307 debug!(token, "ignoring invalid PATH_RESPONSE");
3308 }
3309 }
3310 Frame::MaxData(bytes) => {
3311 self.streams.received_max_data(bytes);
3312 }
3313 Frame::MaxStreamData { id, offset } => {
3314 self.streams.received_max_stream_data(id, offset)?;
3315 }
3316 Frame::MaxStreams { dir, count } => {
3317 self.streams.received_max_streams(dir, count)?;
3318 }
3319 Frame::ResetStream(frame) => {
3320 if self.streams.received_reset(frame)?.should_transmit() {
3321 self.spaces[SpaceId::Data].pending.max_data = true;
3322 }
3323 }
3324 Frame::DataBlocked { offset } => {
3325 debug!(offset, "peer claims to be blocked at connection level");
3326 }
3327 Frame::StreamDataBlocked { id, offset } => {
3328 if id.initiator() == self.side.side() && id.dir() == Dir::Uni {
3329 debug!("got STREAM_DATA_BLOCKED on send-only {}", id);
3330 return Err(TransportError::STREAM_STATE_ERROR(
3331 "STREAM_DATA_BLOCKED on send-only stream",
3332 ));
3333 }
3334 debug!(
3335 stream = %id,
3336 offset, "peer claims to be blocked at stream level"
3337 );
3338 }
3339 Frame::StreamsBlocked { dir, limit } => {
3340 if limit > MAX_STREAM_COUNT {
3341 return Err(TransportError::FRAME_ENCODING_ERROR(
3342 "unrepresentable stream limit",
3343 ));
3344 }
3345 debug!(
3346 "peer claims to be blocked opening more than {} {} streams",
3347 limit, dir
3348 );
3349 }
3350 Frame::StopSending(frame::StopSending { id, error_code }) => {
3351 if id.initiator() != self.side.side() {
3352 if id.dir() == Dir::Uni {
3353 debug!("got STOP_SENDING on recv-only {}", id);
3354 return Err(TransportError::STREAM_STATE_ERROR(
3355 "STOP_SENDING on recv-only stream",
3356 ));
3357 }
3358 } else if self.streams.is_local_unopened(id) {
3359 return Err(TransportError::STREAM_STATE_ERROR(
3360 "STOP_SENDING on unopened stream",
3361 ));
3362 }
3363 self.streams.received_stop_sending(id, error_code);
3364 }
3365 Frame::RetireConnectionId { sequence } => {
3366 let allow_more_cids = self
3367 .local_cid_state
3368 .on_cid_retirement(sequence, self.peer_params.issue_cids_limit())?;
3369 self.endpoint_events
3370 .push_back(EndpointEventInner::RetireConnectionId(
3371 now,
3372 sequence,
3373 allow_more_cids,
3374 ));
3375 }
3376 Frame::NewConnectionId(frame) => {
3377 trace!(
3378 sequence = frame.sequence,
3379 id = %frame.id,
3380 retire_prior_to = frame.retire_prior_to,
3381 );
3382 if self.rem_cids.active().is_empty() {
3383 return Err(TransportError::PROTOCOL_VIOLATION(
3384 "NEW_CONNECTION_ID when CIDs aren't in use",
3385 ));
3386 }
3387 if frame.retire_prior_to > frame.sequence {
3388 return Err(TransportError::PROTOCOL_VIOLATION(
3389 "NEW_CONNECTION_ID retiring unissued CIDs",
3390 ));
3391 }
3392
3393 use crate::cid_queue::InsertError;
3394 match self.rem_cids.insert(frame) {
3395 Ok(None) => {}
3396 Ok(Some((retired, reset_token))) => {
3397 let pending_retired =
3398 &mut self.spaces[SpaceId::Data].pending.retire_cids;
3399 const MAX_PENDING_RETIRED_CIDS: u64 = CidQueue::LEN as u64 * 10;
3402 if (pending_retired.len() as u64)
3405 .saturating_add(retired.end.saturating_sub(retired.start))
3406 > MAX_PENDING_RETIRED_CIDS
3407 {
3408 return Err(TransportError::CONNECTION_ID_LIMIT_ERROR(
3409 "queued too many retired CIDs",
3410 ));
3411 }
3412 pending_retired.extend(retired);
3413 self.set_reset_token(reset_token);
3414 }
3415 Err(InsertError::ExceedsLimit) => {
3416 return Err(TransportError::CONNECTION_ID_LIMIT_ERROR(""));
3417 }
3418 Err(InsertError::Retired) => {
3419 trace!("discarding already-retired");
3420 self.spaces[SpaceId::Data]
3424 .pending
3425 .retire_cids
3426 .push(frame.sequence);
3427 continue;
3428 }
3429 };
3430
3431 if self.side.is_server() && self.rem_cids.active_seq() == 0 {
3432 self.update_rem_cid();
3435 }
3436 }
3437 Frame::NewToken(NewToken { token }) => {
3438 let ConnectionSide::Client {
3439 token_store,
3440 server_name,
3441 ..
3442 } = &self.side
3443 else {
3444 return Err(TransportError::PROTOCOL_VIOLATION("client sent NEW_TOKEN"));
3445 };
3446 if token.is_empty() {
3447 return Err(TransportError::FRAME_ENCODING_ERROR("empty token"));
3448 }
3449 trace!("got new token");
3450 token_store.insert(server_name, token);
3451 }
3452 Frame::Datagram(datagram) => {
3453 if self
3454 .datagrams
3455 .received(datagram, &self.config.datagram_receive_buffer_size)?
3456 {
3457 self.events.push_back(Event::DatagramReceived);
3458 }
3459 }
3460 Frame::AckFrequency(ack_frequency) => {
3461 let space = &mut self.spaces[SpaceId::Data];
3463
3464 if !self
3465 .ack_frequency
3466 .ack_frequency_received(&ack_frequency, &mut space.pending_acks)?
3467 {
3468 continue;
3470 }
3471
3472 if let Some(timeout) = space
3475 .pending_acks
3476 .max_ack_delay_timeout(self.ack_frequency.max_ack_delay)
3477 {
3478 self.timers.set(Timer::MaxAckDelay, timeout);
3479 }
3480 }
3481 Frame::ImmediateAck => {
3482 self.spaces[SpaceId::Data]
3484 .pending_acks
3485 .set_immediate_ack_required();
3486 }
3487 Frame::HandshakeDone => {
3488 if self.side.is_server() {
3489 return Err(TransportError::PROTOCOL_VIOLATION(
3490 "client sent HANDSHAKE_DONE",
3491 ));
3492 }
3493 if self.spaces[SpaceId::Handshake].crypto.is_some() {
3494 self.discard_space(now, SpaceId::Handshake);
3495 }
3496 }
3497 Frame::AddAddress(add_address) => {
3498 self.handle_add_address(&add_address, now)?;
3499 }
3500 Frame::PunchMeNow(punch_me_now) => {
3501 self.handle_punch_me_now(&punch_me_now, now)?;
3502 }
3503 Frame::RemoveAddress(remove_address) => {
3504 self.handle_remove_address(&remove_address)?;
3505 }
3506 Frame::ObservedAddress(observed_address) => {
3507 self.handle_observed_address_frame(&observed_address, now)?;
3508 }
3509 }
3510 }
3511
3512 let space = &mut self.spaces[SpaceId::Data];
3513 if space
3514 .pending_acks
3515 .packet_received(now, number, ack_eliciting, &space.dedup)
3516 {
3517 self.timers
3518 .set(Timer::MaxAckDelay, now + self.ack_frequency.max_ack_delay);
3519 }
3520
3521 let pending = &mut self.spaces[SpaceId::Data].pending;
3526 self.streams.queue_max_stream_id(pending);
3527
3528 if let Some(reason) = close {
3529 self.error = Some(reason.into());
3530 self.state = State::Draining;
3531 self.close = true;
3532 }
3533
3534 if remote != self.path.remote
3535 && !is_probing_packet
3536 && number == self.spaces[SpaceId::Data].rx_packet
3537 {
3538 let ConnectionSide::Server { ref server_config } = self.side else {
3539 panic!("packets from unknown remote should be dropped by clients");
3540 };
3541 debug_assert!(
3542 server_config.migration,
3543 "migration-initiating packets should have been dropped immediately"
3544 );
3545 self.migrate(now, remote);
3546 self.update_rem_cid();
3548 self.spin = false;
3549 }
3550
3551 Ok(())
3552 }
3553
3554 fn migrate(&mut self, now: Instant, remote: SocketAddr) {
3555 trace!(%remote, "migration initiated");
3556 let mut new_path = if remote.is_ipv4() && remote.ip() == self.path.remote.ip() {
3560 PathData::from_previous(remote, &self.path, now)
3561 } else {
3562 let peer_max_udp_payload_size =
3563 u16::try_from(self.peer_params.max_udp_payload_size.into_inner())
3564 .unwrap_or(u16::MAX);
3565 PathData::new(
3566 remote,
3567 self.allow_mtud,
3568 Some(peer_max_udp_payload_size),
3569 now,
3570 &self.config,
3571 )
3572 };
3573 new_path.challenge = Some(self.rng.gen());
3574 new_path.challenge_pending = true;
3575 let prev_pto = self.pto(SpaceId::Data);
3576
3577 let mut prev = mem::replace(&mut self.path, new_path);
3578 if prev.challenge.is_none() {
3580 prev.challenge = Some(self.rng.gen());
3581 prev.challenge_pending = true;
3582 self.prev_path = Some((self.rem_cids.active(), prev));
3585 }
3586
3587 self.timers.set(
3588 Timer::PathValidation,
3589 now + 3 * cmp::max(self.pto(SpaceId::Data), prev_pto),
3590 );
3591 }
3592
3593 pub fn local_address_changed(&mut self) {
3595 self.update_rem_cid();
3596 self.ping();
3597 }
3598
3599 pub fn migrate_to_nat_traversal_path(&mut self, now: Instant) -> Result<(), TransportError> {
3601 let (remote_addr, local_addr) = {
3603 let nat_state = self
3604 .nat_traversal
3605 .as_ref()
3606 .ok_or_else(|| TransportError::PROTOCOL_VIOLATION("NAT traversal not enabled"))?;
3607
3608 let best_pairs = nat_state.get_best_succeeded_pairs();
3610 if best_pairs.is_empty() {
3611 return Err(TransportError::PROTOCOL_VIOLATION(
3612 "No validated NAT traversal paths",
3613 ));
3614 }
3615
3616 let best_path = best_pairs
3618 .iter()
3619 .find(|pair| pair.remote_addr != self.path.remote)
3620 .or_else(|| best_pairs.first());
3621
3622 let best_path = best_path.ok_or_else(|| {
3623 TransportError::PROTOCOL_VIOLATION("No suitable NAT traversal path")
3624 })?;
3625
3626 debug!(
3627 "Migrating to NAT traversal path: {} -> {} (priority: {})",
3628 self.path.remote, best_path.remote_addr, best_path.priority
3629 );
3630
3631 (best_path.remote_addr, best_path.local_addr)
3632 };
3633
3634 self.migrate(now, remote_addr);
3636
3637 if local_addr != SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED), 0) {
3639 self.local_ip = Some(local_addr.ip());
3640 }
3641
3642 self.path.challenge_pending = true;
3644
3645 Ok(())
3646 }
3647
3648 fn update_rem_cid(&mut self) {
3650 let (reset_token, retired) = match self.rem_cids.next() {
3651 Some(x) => x,
3652 None => return,
3653 };
3654
3655 self.spaces[SpaceId::Data]
3657 .pending
3658 .retire_cids
3659 .extend(retired);
3660 self.set_reset_token(reset_token);
3661 }
3662
3663 fn set_reset_token(&mut self, reset_token: ResetToken) {
3664 self.endpoint_events
3665 .push_back(EndpointEventInner::ResetToken(
3666 self.path.remote,
3667 reset_token,
3668 ));
3669 self.peer_params.stateless_reset_token = Some(reset_token);
3670 }
3671
3672 fn issue_first_cids(&mut self, now: Instant) {
3674 if self.local_cid_state.cid_len() == 0 {
3675 return;
3676 }
3677
3678 let mut n = self.peer_params.issue_cids_limit() - 1;
3680 if let ConnectionSide::Server { server_config } = &self.side {
3681 if server_config.has_preferred_address() {
3682 n -= 1;
3684 }
3685 }
3686 self.endpoint_events
3687 .push_back(EndpointEventInner::NeedIdentifiers(now, n));
3688 }
3689
3690 fn populate_packet(
3691 &mut self,
3692 now: Instant,
3693 space_id: SpaceId,
3694 buf: &mut Vec<u8>,
3695 max_size: usize,
3696 pn: u64,
3697 ) -> SentFrames {
3698 let mut sent = SentFrames::default();
3699 let space = &mut self.spaces[space_id];
3700 let is_0rtt = space_id == SpaceId::Data && space.crypto.is_none();
3701 space.pending_acks.maybe_ack_non_eliciting();
3702
3703 if !is_0rtt && mem::replace(&mut space.pending.handshake_done, false) {
3705 buf.write(frame::FrameType::HANDSHAKE_DONE);
3706 sent.retransmits.get_or_create().handshake_done = true;
3707 self.stats.frame_tx.handshake_done =
3709 self.stats.frame_tx.handshake_done.saturating_add(1);
3710 }
3711
3712 if mem::replace(&mut space.ping_pending, false) {
3714 trace!("PING");
3715 buf.write(frame::FrameType::PING);
3716 sent.non_retransmits = true;
3717 self.stats.frame_tx.ping += 1;
3718 }
3719
3720 if mem::replace(&mut space.immediate_ack_pending, false) {
3722 trace!("IMMEDIATE_ACK");
3723 buf.write(frame::FrameType::IMMEDIATE_ACK);
3724 sent.non_retransmits = true;
3725 self.stats.frame_tx.immediate_ack += 1;
3726 }
3727
3728 if space.pending_acks.can_send() {
3730 Self::populate_acks(
3731 now,
3732 self.receiving_ecn,
3733 &mut sent,
3734 space,
3735 buf,
3736 &mut self.stats,
3737 );
3738 }
3739
3740 if mem::replace(&mut space.pending.ack_frequency, false) {
3742 let sequence_number = self.ack_frequency.next_sequence_number();
3743
3744 let config = self.config.ack_frequency_config.as_ref().unwrap();
3746
3747 let max_ack_delay = self.ack_frequency.candidate_max_ack_delay(
3749 self.path.rtt.get(),
3750 config,
3751 &self.peer_params,
3752 );
3753
3754 trace!(?max_ack_delay, "ACK_FREQUENCY");
3755
3756 frame::AckFrequency {
3757 sequence: sequence_number,
3758 ack_eliciting_threshold: config.ack_eliciting_threshold,
3759 request_max_ack_delay: max_ack_delay.as_micros().try_into().unwrap_or(VarInt::MAX),
3760 reordering_threshold: config.reordering_threshold,
3761 }
3762 .encode(buf);
3763
3764 sent.retransmits.get_or_create().ack_frequency = true;
3765
3766 self.ack_frequency.ack_frequency_sent(pn, max_ack_delay);
3767 self.stats.frame_tx.ack_frequency += 1;
3768 }
3769
3770 if buf.len() + 9 < max_size && space_id == SpaceId::Data {
3772 if let Some(token) = self.path.challenge {
3774 self.path.challenge_pending = false;
3776 sent.non_retransmits = true;
3777 sent.requires_padding = true;
3778 trace!("PATH_CHALLENGE {:08x}", token);
3779 buf.write(frame::FrameType::PATH_CHALLENGE);
3780 buf.write(token);
3781 self.stats.frame_tx.path_challenge += 1;
3782 }
3783
3784 }
3793
3794 if buf.len() + 9 < max_size && space_id == SpaceId::Data {
3796 if let Some(token) = self.path_responses.pop_on_path(self.path.remote) {
3797 sent.non_retransmits = true;
3798 sent.requires_padding = true;
3799 trace!("PATH_RESPONSE {:08x}", token);
3800 buf.write(frame::FrameType::PATH_RESPONSE);
3801 buf.write(token);
3802 self.stats.frame_tx.path_response += 1;
3803 }
3804 }
3805
3806 while buf.len() + frame::Crypto::SIZE_BOUND < max_size && !is_0rtt {
3808 let mut frame = match space.pending.crypto.pop_front() {
3809 Some(x) => x,
3810 None => break,
3811 };
3812
3813 let max_crypto_data_size = max_size
3818 - buf.len()
3819 - 1 - VarInt::size(unsafe { VarInt::from_u64_unchecked(frame.offset) })
3821 - 2; let len = frame
3824 .data
3825 .len()
3826 .min(2usize.pow(14) - 1)
3827 .min(max_crypto_data_size);
3828
3829 let data = frame.data.split_to(len);
3830 let truncated = frame::Crypto {
3831 offset: frame.offset,
3832 data,
3833 };
3834 trace!(
3835 "CRYPTO: off {} len {}",
3836 truncated.offset,
3837 truncated.data.len()
3838 );
3839 truncated.encode(buf);
3840 self.stats.frame_tx.crypto += 1;
3841 sent.retransmits.get_or_create().crypto.push_back(truncated);
3842 if !frame.data.is_empty() {
3843 frame.offset += len as u64;
3844 space.pending.crypto.push_front(frame);
3845 }
3846 }
3847
3848 if space_id == SpaceId::Data {
3849 self.streams.write_control_frames(
3850 buf,
3851 &mut space.pending,
3852 &mut sent.retransmits,
3853 &mut self.stats.frame_tx,
3854 max_size,
3855 );
3856 }
3857
3858 while buf.len() + 44 < max_size {
3860 let issued = match space.pending.new_cids.pop() {
3861 Some(x) => x,
3862 None => break,
3863 };
3864 trace!(
3865 sequence = issued.sequence,
3866 id = %issued.id,
3867 "NEW_CONNECTION_ID"
3868 );
3869 frame::NewConnectionId {
3870 sequence: issued.sequence,
3871 retire_prior_to: self.local_cid_state.retire_prior_to(),
3872 id: issued.id,
3873 reset_token: issued.reset_token,
3874 }
3875 .encode(buf);
3876 sent.retransmits.get_or_create().new_cids.push(issued);
3877 self.stats.frame_tx.new_connection_id += 1;
3878 }
3879
3880 while buf.len() + frame::RETIRE_CONNECTION_ID_SIZE_BOUND < max_size {
3882 let seq = match space.pending.retire_cids.pop() {
3883 Some(x) => x,
3884 None => break,
3885 };
3886 trace!(sequence = seq, "RETIRE_CONNECTION_ID");
3887 buf.write(frame::FrameType::RETIRE_CONNECTION_ID);
3888 buf.write_var(seq);
3889 sent.retransmits.get_or_create().retire_cids.push(seq);
3890 self.stats.frame_tx.retire_connection_id += 1;
3891 }
3892
3893 let mut sent_datagrams = false;
3895 while buf.len() + Datagram::SIZE_BOUND < max_size && space_id == SpaceId::Data {
3896 match self.datagrams.write(buf, max_size) {
3897 true => {
3898 sent_datagrams = true;
3899 sent.non_retransmits = true;
3900 self.stats.frame_tx.datagram += 1;
3901 }
3902 false => break,
3903 }
3904 }
3905 if self.datagrams.send_blocked && sent_datagrams {
3906 self.events.push_back(Event::DatagramsUnblocked);
3907 self.datagrams.send_blocked = false;
3908 }
3909
3910 while let Some(remote_addr) = space.pending.new_tokens.pop() {
3912 debug_assert_eq!(space_id, SpaceId::Data);
3913 let ConnectionSide::Server { server_config } = &self.side else {
3914 panic!("NEW_TOKEN frames should not be enqueued by clients");
3915 };
3916
3917 if remote_addr != self.path.remote {
3918 continue;
3923 }
3924
3925 let token = Token::new(
3926 TokenPayload::Validation {
3927 ip: remote_addr.ip(),
3928 issued: server_config.time_source.now(),
3929 },
3930 &mut self.rng,
3931 );
3932 let new_token = NewToken {
3933 token: token.encode(&*server_config.token_key).into(),
3934 };
3935
3936 if buf.len() + new_token.size() >= max_size {
3937 space.pending.new_tokens.push(remote_addr);
3938 break;
3939 }
3940
3941 new_token.encode(buf);
3942 sent.retransmits
3943 .get_or_create()
3944 .new_tokens
3945 .push(remote_addr);
3946 self.stats.frame_tx.new_token += 1;
3947 }
3948
3949 while buf.len() + frame::AddAddress::SIZE_BOUND < max_size && space_id == SpaceId::Data {
3951 let add_address = match space.pending.add_addresses.pop() {
3952 Some(x) => x,
3953 None => break,
3954 };
3955 trace!(
3956 sequence = %add_address.sequence,
3957 address = %add_address.address,
3958 "ADD_ADDRESS"
3959 );
3960 add_address.encode(buf);
3961 sent.retransmits
3962 .get_or_create()
3963 .add_addresses
3964 .push(add_address);
3965 self.stats.frame_tx.add_address += 1;
3966 }
3967
3968 while buf.len() + frame::PunchMeNow::SIZE_BOUND < max_size && space_id == SpaceId::Data {
3970 let punch_me_now = match space.pending.punch_me_now.pop() {
3971 Some(x) => x,
3972 None => break,
3973 };
3974 trace!(
3975 round = %punch_me_now.round,
3976 target_sequence = %punch_me_now.target_sequence,
3977 "PUNCH_ME_NOW"
3978 );
3979 punch_me_now.encode(buf);
3980 sent.retransmits
3981 .get_or_create()
3982 .punch_me_now
3983 .push(punch_me_now);
3984 self.stats.frame_tx.punch_me_now += 1;
3985 }
3986
3987 while buf.len() + frame::RemoveAddress::SIZE_BOUND < max_size && space_id == SpaceId::Data {
3989 let remove_address = match space.pending.remove_addresses.pop() {
3990 Some(x) => x,
3991 None => break,
3992 };
3993 trace!(
3994 sequence = %remove_address.sequence,
3995 "REMOVE_ADDRESS"
3996 );
3997 remove_address.encode(buf);
3998 sent.retransmits
3999 .get_or_create()
4000 .remove_addresses
4001 .push(remove_address);
4002 self.stats.frame_tx.remove_address += 1;
4003 }
4004
4005 while buf.len() + frame::ObservedAddress::SIZE_BOUND < max_size && space_id == SpaceId::Data {
4007 let observed_address = match space.pending.observed_addresses.pop() {
4008 Some(x) => x,
4009 None => break,
4010 };
4011 trace!(
4012 address = %observed_address.address,
4013 "OBSERVED_ADDRESS"
4014 );
4015 observed_address.encode(buf);
4016 sent.retransmits
4017 .get_or_create()
4018 .observed_addresses
4019 .push(observed_address);
4020 self.stats.frame_tx.observed_address += 1;
4021 }
4022
4023 if space_id == SpaceId::Data {
4025 sent.stream_frames =
4026 self.streams
4027 .write_stream_frames(buf, max_size, self.config.send_fairness);
4028 self.stats.frame_tx.stream += sent.stream_frames.len() as u64;
4029 }
4030
4031 sent
4032 }
4033
4034 fn populate_acks(
4039 now: Instant,
4040 receiving_ecn: bool,
4041 sent: &mut SentFrames,
4042 space: &mut PacketSpace,
4043 buf: &mut Vec<u8>,
4044 stats: &mut ConnectionStats,
4045 ) {
4046 debug_assert!(!space.pending_acks.ranges().is_empty());
4047
4048 debug_assert!(space.crypto.is_some(), "tried to send ACK in 0-RTT");
4050 let ecn = if receiving_ecn {
4051 Some(&space.ecn_counters)
4052 } else {
4053 None
4054 };
4055 sent.largest_acked = space.pending_acks.ranges().max();
4056
4057 let delay_micros = space.pending_acks.ack_delay(now).as_micros() as u64;
4058
4059 let ack_delay_exp = TransportParameters::default().ack_delay_exponent;
4061 let delay = delay_micros >> ack_delay_exp.into_inner();
4062
4063 trace!(
4064 "ACK {:?}, Delay = {}us",
4065 space.pending_acks.ranges(),
4066 delay_micros
4067 );
4068
4069 frame::Ack::encode(delay as _, space.pending_acks.ranges(), ecn, buf);
4070 stats.frame_tx.acks += 1;
4071 }
4072
4073 fn close_common(&mut self) {
4074 trace!("connection closed");
4075 for &timer in &Timer::VALUES {
4076 self.timers.stop(timer);
4077 }
4078 }
4079
4080 fn set_close_timer(&mut self, now: Instant) {
4081 self.timers
4082 .set(Timer::Close, now + 3 * self.pto(self.highest_space));
4083 }
4084
4085 fn handle_peer_params(&mut self, params: TransportParameters) -> Result<(), TransportError> {
4087 if Some(self.orig_rem_cid) != params.initial_src_cid
4088 || (self.side.is_client()
4089 && (Some(self.initial_dst_cid) != params.original_dst_cid
4090 || self.retry_src_cid != params.retry_src_cid))
4091 {
4092 return Err(TransportError::TRANSPORT_PARAMETER_ERROR(
4093 "CID authentication failure",
4094 ));
4095 }
4096
4097 self.set_peer_params(params);
4098
4099 Ok(())
4100 }
4101
4102 fn set_peer_params(&mut self, params: TransportParameters) {
4103 self.streams.set_params(¶ms);
4104 self.idle_timeout =
4105 negotiate_max_idle_timeout(self.config.max_idle_timeout, Some(params.max_idle_timeout));
4106 trace!("negotiated max idle timeout {:?}", self.idle_timeout);
4107 if let Some(ref info) = params.preferred_address {
4108 self.rem_cids.insert(frame::NewConnectionId {
4109 sequence: 1,
4110 id: info.connection_id,
4111 reset_token: info.stateless_reset_token,
4112 retire_prior_to: 0,
4113 }).expect("preferred address CID is the first received, and hence is guaranteed to be legal");
4114 }
4115 self.ack_frequency.peer_max_ack_delay = get_max_ack_delay(¶ms);
4116
4117 self.negotiate_nat_traversal_capability(¶ms);
4119
4120 self.negotiate_address_discovery(¶ms);
4122
4123 self.peer_params = params;
4124 self.path.mtud.on_peer_max_udp_payload_size_received(
4125 u16::try_from(self.peer_params.max_udp_payload_size.into_inner()).unwrap_or(u16::MAX),
4126 );
4127 }
4128
4129 fn negotiate_nat_traversal_capability(&mut self, params: &TransportParameters) {
4131 let peer_nat_config = match ¶ms.nat_traversal {
4133 Some(config) => config,
4134 None => {
4135 if self.config.nat_traversal_config.is_some() {
4137 debug!(
4138 "Peer does not support NAT traversal, maintaining backward compatibility"
4139 );
4140 self.emit_nat_traversal_capability_event(false);
4141
4142 self.set_nat_traversal_compatibility_mode(false);
4144 }
4145 return;
4146 }
4147 };
4148
4149 let local_nat_config = match &self.config.nat_traversal_config {
4151 Some(config) => config,
4152 None => {
4153 debug!("NAT traversal not enabled locally, ignoring peer support");
4154 self.emit_nat_traversal_capability_event(false);
4155 self.set_nat_traversal_compatibility_mode(false);
4156 return;
4157 }
4158 };
4159
4160 info!("Both peers support NAT traversal, negotiating capabilities");
4162
4163 match self.negotiate_nat_traversal_parameters(local_nat_config, peer_nat_config) {
4165 Ok(negotiated_config) => {
4166 info!("NAT traversal capability negotiated successfully");
4167 self.emit_nat_traversal_capability_event(true);
4168
4169 self.init_nat_traversal_with_negotiated_config(&negotiated_config);
4171
4172 self.set_nat_traversal_compatibility_mode(true);
4174
4175 if matches!(
4177 negotiated_config.role,
4178 crate::transport_parameters::NatTraversalRole::Client
4179 ) {
4180 self.initiate_nat_traversal_process();
4181 }
4182 }
4183 Err(e) => {
4184 warn!("NAT traversal capability negotiation failed: {}", e);
4185 self.emit_nat_traversal_capability_event(false);
4186 self.set_nat_traversal_compatibility_mode(false);
4187 }
4188 }
4189 }
4190
4191 fn validate_nat_traversal_roles(
4193 &self,
4194 local_config: &crate::transport_parameters::NatTraversalConfig,
4195 peer_config: &crate::transport_parameters::NatTraversalConfig,
4196 ) -> Result<(), String> {
4197 match (&local_config.role, &peer_config.role) {
4199 (
4201 crate::transport_parameters::NatTraversalRole::Bootstrap,
4202 crate::transport_parameters::NatTraversalRole::Bootstrap,
4203 ) => {
4204 debug!("Both endpoints are bootstrap nodes - unusual but allowed");
4205 }
4206 (
4208 crate::transport_parameters::NatTraversalRole::Client,
4209 crate::transport_parameters::NatTraversalRole::Server { .. },
4210 )
4211 | (
4212 crate::transport_parameters::NatTraversalRole::Server { .. },
4213 crate::transport_parameters::NatTraversalRole::Client,
4214 ) => {
4215 debug!("Client-Server NAT traversal role combination");
4216 }
4217 (crate::transport_parameters::NatTraversalRole::Bootstrap, _)
4219 | (_, crate::transport_parameters::NatTraversalRole::Bootstrap) => {
4220 debug!("Bootstrap node coordination");
4221 }
4222 (
4224 crate::transport_parameters::NatTraversalRole::Client,
4225 crate::transport_parameters::NatTraversalRole::Client,
4226 ) => {
4227 debug!("Client-Client connection requires bootstrap coordination");
4228 }
4229 (
4231 crate::transport_parameters::NatTraversalRole::Server { .. },
4232 crate::transport_parameters::NatTraversalRole::Server { .. },
4233 ) => {
4234 debug!("Server-Server connection");
4235 }
4236 }
4237
4238 Ok(())
4239 }
4240
4241 fn emit_nat_traversal_capability_event(&mut self, negotiated: bool) {
4243 if negotiated {
4246 info!("NAT traversal capability successfully negotiated");
4247 } else {
4248 info!("NAT traversal capability not available (peer or local support missing)");
4249 }
4250
4251 }
4254
4255 fn set_nat_traversal_compatibility_mode(&mut self, enabled: bool) {
4257 if enabled {
4258 debug!("NAT traversal enabled for this connection");
4259 } else {
4261 debug!("NAT traversal disabled for this connection (backward compatibility mode)");
4262 if self.nat_traversal.is_some() {
4264 warn!("Clearing NAT traversal state due to compatibility mode");
4265 self.nat_traversal = None;
4266 }
4267 }
4268 }
4269
4270 fn negotiate_nat_traversal_parameters(
4272 &self,
4273 local_config: &crate::transport_parameters::NatTraversalConfig,
4274 peer_config: &crate::transport_parameters::NatTraversalConfig,
4275 ) -> Result<crate::transport_parameters::NatTraversalConfig, String> {
4276 self.validate_nat_traversal_roles(local_config, peer_config)?;
4278
4279 let negotiated_role = local_config.role;
4281
4282 let max_candidates = local_config
4284 .max_candidates
4285 .into_inner()
4286 .min(peer_config.max_candidates.into_inner())
4287 .min(50); let coordination_timeout = local_config
4290 .coordination_timeout
4291 .into_inner()
4292 .min(peer_config.coordination_timeout.into_inner())
4293 .min(30000); let max_concurrent_attempts = local_config
4296 .max_concurrent_attempts
4297 .into_inner()
4298 .min(peer_config.max_concurrent_attempts.into_inner())
4299 .min(10); let peer_id = local_config.peer_id.or(peer_config.peer_id);
4303
4304 let negotiated_config = crate::transport_parameters::NatTraversalConfig {
4305 role: negotiated_role,
4306 max_candidates: VarInt::from_u64(max_candidates).unwrap(),
4307 coordination_timeout: VarInt::from_u64(coordination_timeout).unwrap(),
4308 max_concurrent_attempts: VarInt::from_u64(max_concurrent_attempts).unwrap(),
4309 peer_id,
4310 };
4311
4312 negotiated_config
4314 .validate()
4315 .map_err(|e| format!("Negotiated configuration validation failed: {:?}", e))?;
4316
4317 debug!(
4318 "NAT traversal parameters negotiated: role={:?}, max_candidates={}, timeout={}ms, max_attempts={}",
4319 negotiated_role, max_candidates, coordination_timeout, max_concurrent_attempts
4320 );
4321
4322 Ok(negotiated_config)
4323 }
4324
4325 fn init_nat_traversal_with_negotiated_config(
4327 &mut self,
4328 config: &crate::transport_parameters::NatTraversalConfig,
4329 ) {
4330 let role = match config.role {
4332 crate::transport_parameters::NatTraversalRole::Client => NatTraversalRole::Client,
4333 crate::transport_parameters::NatTraversalRole::Server { can_relay } => {
4334 NatTraversalRole::Server { can_relay }
4335 }
4336 crate::transport_parameters::NatTraversalRole::Bootstrap => NatTraversalRole::Bootstrap,
4337 };
4338
4339 let max_candidates = config.max_candidates.into_inner().min(50) as u32;
4340 let coordination_timeout =
4341 Duration::from_millis(config.coordination_timeout.into_inner().min(30000));
4342
4343 self.nat_traversal = Some(NatTraversalState::new(
4345 role,
4346 max_candidates,
4347 coordination_timeout,
4348 ));
4349
4350 trace!(
4351 "NAT traversal initialized with negotiated config: role={:?}",
4352 role
4353 );
4354
4355 match role {
4357 NatTraversalRole::Bootstrap => {
4358 self.prepare_address_observation();
4360 }
4361 NatTraversalRole::Client => {
4362 self.schedule_candidate_discovery();
4364 }
4365 NatTraversalRole::Server { .. } => {
4366 self.prepare_coordination_handling();
4368 }
4369 }
4370 }
4371
4372 fn initiate_nat_traversal_process(&mut self) {
4374 if let Some(nat_state) = &mut self.nat_traversal {
4375 match nat_state.start_candidate_discovery() {
4376 Ok(()) => {
4377 debug!("NAT traversal process initiated - candidate discovery started");
4378 self.timers.set(
4380 Timer::NatTraversal,
4381 Instant::now() + Duration::from_millis(100),
4382 );
4383 }
4384 Err(e) => {
4385 warn!("Failed to initiate NAT traversal process: {}", e);
4386 }
4387 }
4388 }
4389 }
4390
4391 fn prepare_address_observation(&mut self) {
4393 debug!("Preparing for address observation as bootstrap node");
4394 }
4397
4398 fn schedule_candidate_discovery(&mut self) {
4400 debug!("Scheduling candidate discovery for client endpoint");
4401 self.timers.set(
4403 Timer::NatTraversal,
4404 Instant::now() + Duration::from_millis(50),
4405 );
4406 }
4407
4408 fn prepare_coordination_handling(&mut self) {
4410 debug!("Preparing to handle coordination requests as server endpoint");
4411 }
4414
4415 fn handle_nat_traversal_timeout(&mut self, now: Instant) {
4417 let timeout_result = if let Some(nat_state) = &mut self.nat_traversal {
4419 nat_state.handle_timeout(now)
4420 } else {
4421 return;
4422 };
4423
4424 match timeout_result {
4426 Ok(actions) => {
4427 for action in actions {
4428 match action {
4429 nat_traversal::TimeoutAction::RetryDiscovery => {
4430 debug!("NAT traversal timeout: retrying candidate discovery");
4431 if let Some(nat_state) = &mut self.nat_traversal {
4432 if let Err(e) = nat_state.start_candidate_discovery() {
4433 warn!("Failed to retry candidate discovery: {}", e);
4434 }
4435 }
4436 }
4437 nat_traversal::TimeoutAction::RetryCoordination => {
4438 debug!("NAT traversal timeout: retrying coordination");
4439 self.timers
4441 .set(Timer::NatTraversal, now + Duration::from_secs(2));
4442 }
4443 nat_traversal::TimeoutAction::StartValidation => {
4444 debug!("NAT traversal timeout: starting path validation");
4445 self.start_nat_traversal_validation(now);
4446 }
4447 nat_traversal::TimeoutAction::Complete => {
4448 debug!("NAT traversal completed successfully");
4449 self.timers.stop(Timer::NatTraversal);
4451 }
4452 nat_traversal::TimeoutAction::Failed => {
4453 warn!("NAT traversal failed after timeout");
4454 self.handle_nat_traversal_failure();
4456 }
4457 }
4458 }
4459 }
4460 Err(e) => {
4461 warn!("NAT traversal timeout handling failed: {}", e);
4462 self.handle_nat_traversal_failure();
4463 }
4464 }
4465 }
4466
4467 fn start_nat_traversal_validation(&mut self, now: Instant) {
4469 if let Some(nat_state) = &mut self.nat_traversal {
4470 let pairs = nat_state.get_next_validation_pairs(3);
4472
4473 for pair in pairs {
4474 let challenge = self.rng.gen();
4476 self.path.challenge = Some(challenge);
4477 self.path.challenge_pending = true;
4478
4479 debug!(
4480 "Starting path validation for NAT traversal candidate: {}",
4481 pair.remote_addr
4482 );
4483 }
4484
4485 self.timers
4487 .set(Timer::PathValidation, now + Duration::from_secs(3));
4488 }
4489 }
4490
4491 fn handle_nat_traversal_failure(&mut self) {
4493 warn!("NAT traversal failed, considering fallback options");
4494
4495 self.nat_traversal = None;
4497 self.timers.stop(Timer::NatTraversal);
4498
4499 debug!("NAT traversal disabled for this connection due to failure");
4506 }
4507
4508 pub fn nat_traversal_supported(&self) -> bool {
4510 self.nat_traversal.is_some()
4511 && self.config.nat_traversal_config.is_some()
4512 && self.peer_params.nat_traversal.is_some()
4513 }
4514
4515 pub fn nat_traversal_config(&self) -> Option<&crate::transport_parameters::NatTraversalConfig> {
4517 self.peer_params.nat_traversal.as_ref()
4518 }
4519
4520 pub fn nat_traversal_ready(&self) -> bool {
4522 self.nat_traversal_supported() && matches!(self.state, State::Established)
4523 }
4524
4525 #[allow(dead_code)]
4530 pub(crate) fn nat_traversal_stats(&self) -> Option<nat_traversal::NatTraversalStats> {
4531 self.nat_traversal.as_ref().map(|state| state.stats.clone())
4532 }
4533
4534 #[cfg(test)]
4536 pub(crate) fn force_enable_nat_traversal(&mut self, role: NatTraversalRole) {
4537 use crate::transport_parameters::{NatTraversalConfig, NatTraversalRole as TPRole};
4538
4539 let tp_role = match role {
4540 NatTraversalRole::Client => TPRole::Client,
4541 NatTraversalRole::Server { can_relay } => TPRole::Server { can_relay },
4542 NatTraversalRole::Bootstrap => TPRole::Bootstrap,
4543 };
4544
4545 let config = NatTraversalConfig {
4546 role: tp_role,
4547 max_candidates: VarInt::from_u32(8),
4548 coordination_timeout: VarInt::from_u32(10000),
4549 max_concurrent_attempts: VarInt::from_u32(3),
4550 peer_id: None,
4551 };
4552
4553 self.peer_params.nat_traversal = Some(config.clone());
4554 self.config = Arc::new({
4555 let mut transport_config = (*self.config).clone();
4556 transport_config.nat_traversal_config = Some(config);
4557 transport_config
4558 });
4559
4560 self.nat_traversal = Some(NatTraversalState::new(role, 8, Duration::from_secs(10)));
4561 }
4562
4563 fn derive_peer_id_from_connection(&self) -> [u8; 32] {
4566 let mut hasher = std::collections::hash_map::DefaultHasher::new();
4568 use std::hash::Hasher;
4569 hasher.write(&self.rem_handshake_cid);
4570 hasher.write(&self.handshake_cid);
4571 hasher.write(&self.path.remote.to_string().into_bytes());
4572 let hash = hasher.finish();
4573 let mut peer_id = [0u8; 32];
4574 peer_id[..8].copy_from_slice(&hash.to_be_bytes());
4575 let cid_bytes = self.rem_handshake_cid.as_ref();
4577 let copy_len = (cid_bytes.len()).min(24);
4578 peer_id[8..8 + copy_len].copy_from_slice(&cid_bytes[..copy_len]);
4579 peer_id
4580 }
4581
4582 fn handle_add_address(
4584 &mut self,
4585 add_address: &crate::frame::AddAddress,
4586 now: Instant,
4587 ) -> Result<(), TransportError> {
4588 let nat_state = self.nat_traversal.as_mut().ok_or_else(|| {
4589 TransportError::PROTOCOL_VIOLATION("AddAddress frame without NAT traversal negotiation")
4590 })?;
4591
4592 match nat_state.add_remote_candidate(
4593 add_address.sequence,
4594 add_address.address,
4595 add_address.priority,
4596 now,
4597 ) {
4598 Ok(()) => {
4599 trace!(
4600 "Added remote candidate: {} (seq={}, priority={})",
4601 add_address.address, add_address.sequence, add_address.priority
4602 );
4603
4604 self.trigger_candidate_validation(add_address.address, now)?;
4606 Ok(())
4607 }
4608 Err(NatTraversalError::TooManyCandidates) => Err(TransportError::PROTOCOL_VIOLATION(
4609 "too many NAT traversal candidates",
4610 )),
4611 Err(NatTraversalError::DuplicateAddress) => {
4612 Ok(())
4614 }
4615 Err(e) => {
4616 warn!("Failed to add remote candidate: {}", e);
4617 Ok(()) }
4619 }
4620 }
4621
4622 fn handle_punch_me_now(
4624 &mut self,
4625 punch_me_now: &crate::frame::PunchMeNow,
4626 now: Instant,
4627 ) -> Result<(), TransportError> {
4628 trace!(
4629 "Received PunchMeNow: round={}, target_seq={}, local_addr={}",
4630 punch_me_now.round, punch_me_now.target_sequence, punch_me_now.local_address
4631 );
4632
4633 if let Some(nat_state) = &self.nat_traversal {
4635 if matches!(nat_state.role, NatTraversalRole::Bootstrap) {
4636 let from_peer_id = self.derive_peer_id_from_connection();
4638
4639 let punch_me_now_clone = punch_me_now.clone();
4641 drop(nat_state); match self
4644 .nat_traversal
4645 .as_mut()
4646 .unwrap()
4647 .handle_punch_me_now_frame(
4648 from_peer_id,
4649 self.path.remote,
4650 &punch_me_now_clone,
4651 now,
4652 ) {
4653 Ok(Some(coordination_frame)) => {
4654 trace!("Bootstrap node coordinating PUNCH_ME_NOW between peers");
4655
4656 if let Some(target_peer_id) = punch_me_now.target_peer_id {
4658 self.endpoint_events.push_back(
4659 crate::shared::EndpointEventInner::RelayPunchMeNow(
4660 target_peer_id,
4661 coordination_frame,
4662 ),
4663 );
4664 }
4665
4666 return Ok(());
4667 }
4668 Ok(None) => {
4669 trace!("Bootstrap coordination completed or no action needed");
4670 return Ok(());
4671 }
4672 Err(e) => {
4673 warn!("Bootstrap coordination failed: {}", e);
4674 return Ok(());
4675 }
4676 }
4677 }
4678 }
4679
4680 let nat_state = self.nat_traversal.as_mut().ok_or_else(|| {
4682 TransportError::PROTOCOL_VIOLATION("PunchMeNow frame without NAT traversal negotiation")
4683 })?;
4684
4685 if nat_state
4687 .handle_peer_punch_request(punch_me_now.round, now)
4688 .map_err(|_e| {
4689 TransportError::PROTOCOL_VIOLATION("Failed to handle peer punch request")
4690 })?
4691 {
4692 trace!("Coordination synchronized for round {}", punch_me_now.round);
4693
4694 let _local_addr = self
4697 .local_ip
4698 .map(|ip| SocketAddr::new(ip, 0))
4699 .unwrap_or_else(|| {
4700 SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED), 0)
4701 });
4702
4703 let target = nat_traversal::PunchTarget {
4704 remote_addr: punch_me_now.local_address,
4705 remote_sequence: punch_me_now.target_sequence,
4706 challenge: self.rng.gen(),
4707 };
4708
4709 nat_state.start_coordination_round(vec![target], now);
4711 } else {
4712 debug!(
4713 "Failed to synchronize coordination for round {}",
4714 punch_me_now.round
4715 );
4716 }
4717
4718 Ok(())
4719 }
4720
4721 fn handle_remove_address(
4723 &mut self,
4724 remove_address: &crate::frame::RemoveAddress,
4725 ) -> Result<(), TransportError> {
4726 let nat_state = self.nat_traversal.as_mut().ok_or_else(|| {
4727 TransportError::PROTOCOL_VIOLATION(
4728 "RemoveAddress frame without NAT traversal negotiation",
4729 )
4730 })?;
4731
4732 if nat_state.remove_candidate(remove_address.sequence) {
4733 trace!(
4734 "Removed candidate with sequence {}",
4735 remove_address.sequence
4736 );
4737 } else {
4738 trace!(
4739 "Attempted to remove unknown candidate sequence {}",
4740 remove_address.sequence
4741 );
4742 }
4743
4744 Ok(())
4745 }
4746
4747 fn handle_observed_address_frame(
4749 &mut self,
4750 observed_address: &crate::frame::ObservedAddress,
4751 now: Instant,
4752 ) -> Result<(), TransportError> {
4753 let state = self.address_discovery_state.as_mut().ok_or_else(|| {
4755 TransportError::PROTOCOL_VIOLATION(
4756 "ObservedAddress frame without address discovery negotiation",
4757 )
4758 })?;
4759
4760 if !state.enabled {
4762 return Err(TransportError::PROTOCOL_VIOLATION(
4763 "ObservedAddress frame received when address discovery is disabled",
4764 ));
4765 }
4766
4767 #[cfg(feature = "trace")]
4769 {
4770 use crate::tracing::*;
4771 use crate::trace_observed_address_received;
4772 trace_observed_address_received!(
4773 &self.event_log,
4774 self.trace_context.trace_id(),
4775 observed_address.address,
4776 0u64 );
4778 }
4779
4780 let path_id = 0u64; state.handle_observed_address(observed_address.address, path_id, now);
4785
4786 self.path.update_observed_address(observed_address.address, now);
4788
4789 trace!(
4791 "Received ObservedAddress frame: address={} for path={}",
4792 observed_address.address,
4793 path_id
4794 );
4795
4796 Ok(())
4797 }
4798
4799 pub fn queue_add_address(&mut self, sequence: VarInt, address: SocketAddr, priority: VarInt) {
4801 let add_address = frame::AddAddress {
4803 sequence,
4804 address,
4805 priority,
4806 };
4807
4808 self.spaces[SpaceId::Data]
4809 .pending
4810 .add_addresses
4811 .push(add_address);
4812 trace!(
4813 "Queued AddAddress frame: seq={}, addr={}, priority={}",
4814 sequence, address, priority
4815 );
4816 }
4817
4818 pub fn queue_punch_me_now(
4820 &mut self,
4821 round: VarInt,
4822 target_sequence: VarInt,
4823 local_address: SocketAddr,
4824 ) {
4825 let punch_me_now = frame::PunchMeNow {
4826 round,
4827 target_sequence,
4828 local_address,
4829 target_peer_id: None, };
4831
4832 self.spaces[SpaceId::Data]
4833 .pending
4834 .punch_me_now
4835 .push(punch_me_now);
4836 trace!(
4837 "Queued PunchMeNow frame: round={}, target={}",
4838 round, target_sequence
4839 );
4840 }
4841
4842 pub fn queue_remove_address(&mut self, sequence: VarInt) {
4844 let remove_address = frame::RemoveAddress { sequence };
4845
4846 self.spaces[SpaceId::Data]
4847 .pending
4848 .remove_addresses
4849 .push(remove_address);
4850 trace!("Queued RemoveAddress frame: seq={}", sequence);
4851 }
4852
4853 pub fn queue_observed_address(&mut self, address: SocketAddr) {
4855 let observed_address = frame::ObservedAddress { address };
4856 self.spaces[SpaceId::Data]
4857 .pending
4858 .observed_addresses
4859 .push(observed_address);
4860 trace!("Queued ObservedAddress frame: addr={}", address);
4861 }
4862
4863 pub fn check_for_address_observations(&mut self, now: Instant) {
4865 let Some(state) = &mut self.address_discovery_state else {
4867 return;
4868 };
4869
4870 if !state.enabled {
4872 return;
4873 }
4874
4875 let path_id = 0u64; let remote_address = self.path.remote;
4880
4881 if state.should_send_observation(path_id, now) {
4883 if let Some(frame) = state.queue_observed_address_frame(path_id, remote_address) {
4885 self.spaces[SpaceId::Data]
4887 .pending
4888 .observed_addresses
4889 .push(frame);
4890
4891 state.record_observation_sent(path_id);
4893
4894 #[cfg(feature = "trace")]
4896 {
4897 use crate::tracing::*;
4898 use crate::trace_observed_address_sent;
4899 trace_observed_address_sent!(
4900 &self.event_log,
4901 self.trace_context.trace_id(),
4902 remote_address,
4903 path_id
4904 );
4905 }
4906
4907 trace!("Queued OBSERVED_ADDRESS frame for path {} with address {}", path_id, remote_address);
4908 }
4909 }
4910 }
4911
4912 fn trigger_candidate_validation(
4914 &mut self,
4915 candidate_address: SocketAddr,
4916 now: Instant,
4917 ) -> Result<(), TransportError> {
4918 let nat_state = self
4919 .nat_traversal
4920 .as_mut()
4921 .ok_or_else(|| TransportError::PROTOCOL_VIOLATION("NAT traversal not enabled"))?;
4922
4923 if nat_state
4925 .active_validations
4926 .contains_key(&candidate_address)
4927 {
4928 trace!("Validation already in progress for {}", candidate_address);
4929 return Ok(());
4930 }
4931
4932 let challenge = self.rng.gen::<u64>();
4934
4935 let validation_state = nat_traversal::PathValidationState {
4937 challenge,
4938 sent_at: now,
4939 retry_count: 0,
4940 max_retries: 3,
4941 coordination_round: None,
4942 timeout_state: nat_traversal::AdaptiveTimeoutState::new(),
4943 last_retry_at: None,
4944 };
4945
4946 nat_state
4948 .active_validations
4949 .insert(candidate_address, validation_state);
4950
4951 self.nat_traversal_challenges
4953 .push(candidate_address, challenge);
4954
4955 nat_state.stats.validations_succeeded += 1; trace!(
4959 "Triggered PATH_CHALLENGE validation for {} with challenge {:016x}",
4960 candidate_address, challenge
4961 );
4962
4963 Ok(())
4964 }
4965
4966 pub fn nat_traversal_state(&self) -> Option<(NatTraversalRole, usize, usize)> {
4968 self.nat_traversal.as_ref().map(|state| {
4969 (
4970 state.role,
4971 state.local_candidates.len(),
4972 state.remote_candidates.len(),
4973 )
4974 })
4975 }
4976
4977 pub fn initiate_nat_traversal_coordination(
4979 &mut self,
4980 now: Instant,
4981 ) -> Result<(), TransportError> {
4982 let nat_state = self
4983 .nat_traversal
4984 .as_mut()
4985 .ok_or_else(|| TransportError::PROTOCOL_VIOLATION("NAT traversal not enabled"))?;
4986
4987 if nat_state.should_send_punch_request() {
4989 nat_state.generate_candidate_pairs(now);
4991
4992 let pairs = nat_state.get_next_validation_pairs(3);
4994 if pairs.is_empty() {
4995 return Err(TransportError::PROTOCOL_VIOLATION(
4996 "No candidate pairs for coordination",
4997 ));
4998 }
4999
5000 let targets: Vec<_> = pairs
5002 .into_iter()
5003 .map(|pair| nat_traversal::PunchTarget {
5004 remote_addr: pair.remote_addr,
5005 remote_sequence: pair.remote_sequence,
5006 challenge: self.rng.gen(),
5007 })
5008 .collect();
5009
5010 let round = nat_state
5012 .start_coordination_round(targets, now)
5013 .map_err(|_e| {
5014 TransportError::PROTOCOL_VIOLATION("Failed to start coordination round")
5015 })?;
5016
5017 let local_addr = self
5020 .local_ip
5021 .map(|ip| SocketAddr::new(ip, self.local_ip.map(|_| 0).unwrap_or(0)))
5022 .unwrap_or_else(|| {
5023 SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED), 0)
5024 });
5025
5026 let punch_me_now = frame::PunchMeNow {
5027 round,
5028 target_sequence: VarInt::from_u32(0), local_address: local_addr,
5030 target_peer_id: None, };
5032
5033 self.spaces[SpaceId::Data]
5034 .pending
5035 .punch_me_now
5036 .push(punch_me_now);
5037 nat_state.mark_punch_request_sent();
5038
5039 trace!("Initiated NAT traversal coordination round {}", round);
5040 }
5041
5042 Ok(())
5043 }
5044
5045 pub fn validate_nat_candidates(&mut self, now: Instant) {
5047 self.generate_nat_traversal_challenges(now);
5048 }
5049
5050 pub fn send_nat_address_advertisement(
5065 &mut self,
5066 address: SocketAddr,
5067 priority: u32,
5068 ) -> Result<u64, ConnectionError> {
5069 let nat_state = self.nat_traversal.as_mut().ok_or_else(|| {
5071 ConnectionError::TransportError(TransportError::PROTOCOL_VIOLATION(
5072 "NAT traversal not enabled on this connection",
5073 ))
5074 })?;
5075
5076 let sequence = nat_state.next_sequence;
5078 nat_state.next_sequence =
5079 VarInt::from_u64(nat_state.next_sequence.into_inner() + 1).unwrap();
5080
5081 let now = Instant::now();
5083 nat_state.local_candidates.insert(
5084 sequence,
5085 nat_traversal::AddressCandidate {
5086 address,
5087 priority,
5088 source: nat_traversal::CandidateSource::Local,
5089 discovered_at: now,
5090 state: nat_traversal::CandidateState::New,
5091 attempt_count: 0,
5092 last_attempt: None,
5093 },
5094 );
5095
5096 nat_state.stats.local_candidates_sent += 1;
5098
5099 self.queue_add_address(sequence, address, VarInt::from_u32(priority));
5101
5102 debug!(
5103 "Queued ADD_ADDRESS frame: addr={}, priority={}, seq={}",
5104 address, priority, sequence
5105 );
5106 Ok(sequence.into_inner())
5107 }
5108
5109 pub fn send_nat_punch_coordination(
5122 &mut self,
5123 target_sequence: u64,
5124 local_address: SocketAddr,
5125 round: u32,
5126 ) -> Result<(), ConnectionError> {
5127 let _nat_state = self.nat_traversal.as_ref().ok_or_else(|| {
5129 ConnectionError::TransportError(TransportError::PROTOCOL_VIOLATION(
5130 "NAT traversal not enabled on this connection",
5131 ))
5132 })?;
5133
5134 self.queue_punch_me_now(
5136 VarInt::from_u32(round),
5137 VarInt::from_u64(target_sequence).map_err(|_| {
5138 ConnectionError::TransportError(TransportError::PROTOCOL_VIOLATION(
5139 "Invalid target sequence number",
5140 ))
5141 })?,
5142 local_address,
5143 );
5144
5145 debug!(
5146 "Queued PUNCH_ME_NOW frame: target_seq={}, local_addr={}, round={}",
5147 target_sequence, local_address, round
5148 );
5149 Ok(())
5150 }
5151
5152 pub fn send_nat_address_removal(&mut self, sequence: u64) -> Result<(), ConnectionError> {
5163 let nat_state = self.nat_traversal.as_mut().ok_or_else(|| {
5165 ConnectionError::TransportError(TransportError::PROTOCOL_VIOLATION(
5166 "NAT traversal not enabled on this connection",
5167 ))
5168 })?;
5169
5170 let sequence_varint = VarInt::from_u64(sequence).map_err(|_| {
5171 ConnectionError::TransportError(TransportError::PROTOCOL_VIOLATION(
5172 "Invalid sequence number",
5173 ))
5174 })?;
5175
5176 nat_state.local_candidates.remove(&sequence_varint);
5178
5179 self.queue_remove_address(sequence_varint);
5181
5182 debug!("Queued REMOVE_ADDRESS frame: seq={}", sequence);
5183 Ok(())
5184 }
5185
5186 #[allow(dead_code)]
5195 pub(crate) fn get_nat_traversal_stats(&self) -> Option<&nat_traversal::NatTraversalStats> {
5196 self.nat_traversal.as_ref().map(|state| &state.stats)
5197 }
5198
5199 pub fn is_nat_traversal_enabled(&self) -> bool {
5201 self.nat_traversal.is_some()
5202 }
5203
5204 pub fn get_nat_traversal_role(&self) -> Option<NatTraversalRole> {
5206 self.nat_traversal.as_ref().map(|state| state.role)
5207 }
5208
5209 fn negotiate_address_discovery(&mut self, peer_params: &TransportParameters) {
5211 let now = Instant::now();
5212
5213 match &peer_params.address_discovery {
5215 Some(peer_config) if peer_config.enabled => {
5216 if let Some(state) = &mut self.address_discovery_state {
5218 if state.enabled {
5219 let negotiated_rate = state.max_observation_rate.min(peer_config.max_observation_rate);
5222 state.update_rate_limit(negotiated_rate as f64);
5223
5224 state.observe_all_paths = state.observe_all_paths && peer_config.observe_all_paths;
5227
5228 debug!(
5229 "Address discovery negotiated: rate={}, all_paths={}",
5230 negotiated_rate,
5231 state.observe_all_paths
5232 );
5233 } else {
5234 debug!("Address discovery disabled locally, ignoring peer support");
5236 }
5237 } else {
5238 self.address_discovery_state = Some(AddressDiscoveryState::new(peer_config, now));
5240 debug!("Address discovery initialized from peer config");
5241 }
5242 }
5243 _ => {
5244 if let Some(state) = &mut self.address_discovery_state {
5246 state.enabled = false;
5247 debug!("Address discovery disabled - peer doesn't support it");
5248 }
5249 }
5250 }
5251
5252 if let Some(state) = &self.address_discovery_state {
5254 if state.enabled {
5255 self.path.set_observation_rate(state.max_observation_rate);
5256 }
5257 }
5258 }
5259
5260 fn decrypt_packet(
5261 &mut self,
5262 now: Instant,
5263 packet: &mut Packet,
5264 ) -> Result<Option<u64>, Option<TransportError>> {
5265 let result = packet_crypto::decrypt_packet_body(
5266 packet,
5267 &self.spaces,
5268 self.zero_rtt_crypto.as_ref(),
5269 self.key_phase,
5270 self.prev_crypto.as_ref(),
5271 self.next_crypto.as_ref(),
5272 )?;
5273
5274 let result = match result {
5275 Some(r) => r,
5276 None => return Ok(None),
5277 };
5278
5279 if result.outgoing_key_update_acked {
5280 if let Some(prev) = self.prev_crypto.as_mut() {
5281 prev.end_packet = Some((result.number, now));
5282 self.set_key_discard_timer(now, packet.header.space());
5283 }
5284 }
5285
5286 if result.incoming_key_update {
5287 trace!("key update authenticated");
5288 self.update_keys(Some((result.number, now)), true);
5289 self.set_key_discard_timer(now, packet.header.space());
5290 }
5291
5292 Ok(Some(result.number))
5293 }
5294
5295 fn update_keys(&mut self, end_packet: Option<(u64, Instant)>, remote: bool) {
5296 trace!("executing key update");
5297 let new = self
5301 .crypto
5302 .next_1rtt_keys()
5303 .expect("only called for `Data` packets");
5304 self.key_phase_size = new
5305 .local
5306 .confidentiality_limit()
5307 .saturating_sub(KEY_UPDATE_MARGIN);
5308 let old = mem::replace(
5309 &mut self.spaces[SpaceId::Data]
5310 .crypto
5311 .as_mut()
5312 .unwrap() .packet,
5314 mem::replace(self.next_crypto.as_mut().unwrap(), new),
5315 );
5316 self.spaces[SpaceId::Data].sent_with_keys = 0;
5317 self.prev_crypto = Some(PrevCrypto {
5318 crypto: old,
5319 end_packet,
5320 update_unacked: remote,
5321 });
5322 self.key_phase = !self.key_phase;
5323 }
5324
5325 fn peer_supports_ack_frequency(&self) -> bool {
5326 self.peer_params.min_ack_delay.is_some()
5327 }
5328
5329 pub(crate) fn immediate_ack(&mut self) {
5334 self.spaces[self.highest_space].immediate_ack_pending = true;
5335 }
5336
5337 #[cfg(test)]
5339 pub(crate) fn decode_packet(&self, event: &ConnectionEvent) -> Option<Vec<u8>> {
5340 let (first_decode, remaining) = match &event.0 {
5341 ConnectionEventInner::Datagram(DatagramConnectionEvent {
5342 first_decode,
5343 remaining,
5344 ..
5345 }) => (first_decode, remaining),
5346 _ => return None,
5347 };
5348
5349 if remaining.is_some() {
5350 panic!("Packets should never be coalesced in tests");
5351 }
5352
5353 let decrypted_header = packet_crypto::unprotect_header(
5354 first_decode.clone(),
5355 &self.spaces,
5356 self.zero_rtt_crypto.as_ref(),
5357 self.peer_params.stateless_reset_token,
5358 )?;
5359
5360 let mut packet = decrypted_header.packet?;
5361 packet_crypto::decrypt_packet_body(
5362 &mut packet,
5363 &self.spaces,
5364 self.zero_rtt_crypto.as_ref(),
5365 self.key_phase,
5366 self.prev_crypto.as_ref(),
5367 self.next_crypto.as_ref(),
5368 )
5369 .ok()?;
5370
5371 Some(packet.payload.to_vec())
5372 }
5373
5374 #[cfg(test)]
5377 pub(crate) fn bytes_in_flight(&self) -> u64 {
5378 self.path.in_flight.bytes
5379 }
5380
5381 #[cfg(test)]
5383 pub(crate) fn congestion_window(&self) -> u64 {
5384 self.path
5385 .congestion
5386 .window()
5387 .saturating_sub(self.path.in_flight.bytes)
5388 }
5389
5390 #[cfg(test)]
5392 pub(crate) fn is_idle(&self) -> bool {
5393 Timer::VALUES
5394 .iter()
5395 .filter(|&&t| !matches!(t, Timer::KeepAlive | Timer::PushNewCid | Timer::KeyDiscard))
5396 .filter_map(|&t| Some((t, self.timers.get(t)?)))
5397 .min_by_key(|&(_, time)| time)
5398 .map_or(true, |(timer, _)| timer == Timer::Idle)
5399 }
5400
5401 #[cfg(test)]
5403 pub(crate) fn lost_packets(&self) -> u64 {
5404 self.lost_packets
5405 }
5406
5407 #[cfg(test)]
5409 pub(crate) fn using_ecn(&self) -> bool {
5410 self.path.sending_ecn
5411 }
5412
5413 #[cfg(test)]
5415 pub(crate) fn total_recvd(&self) -> u64 {
5416 self.path.total_recvd
5417 }
5418
5419 #[cfg(test)]
5420 pub(crate) fn active_local_cid_seq(&self) -> (u64, u64) {
5421 self.local_cid_state.active_seq()
5422 }
5423
5424 #[cfg(test)]
5427 pub(crate) fn rotate_local_cid(&mut self, v: u64, now: Instant) {
5428 let n = self.local_cid_state.assign_retire_seq(v);
5429 self.endpoint_events
5430 .push_back(EndpointEventInner::NeedIdentifiers(now, n));
5431 }
5432
5433 #[cfg(test)]
5435 pub(crate) fn active_rem_cid_seq(&self) -> u64 {
5436 self.rem_cids.active_seq()
5437 }
5438
5439 #[cfg(test)]
5441 pub(crate) fn path_mtu(&self) -> u16 {
5442 self.path.current_mtu()
5443 }
5444
5445 fn can_send_1rtt(&self, max_size: usize) -> bool {
5449 self.streams.can_send_stream_data()
5450 || self.path.challenge_pending
5451 || self
5452 .prev_path
5453 .as_ref()
5454 .is_some_and(|(_, x)| x.challenge_pending)
5455 || !self.path_responses.is_empty()
5456 || !self.nat_traversal_challenges.is_empty()
5457 || self
5458 .datagrams
5459 .outgoing
5460 .front()
5461 .is_some_and(|x| x.size(true) <= max_size)
5462 }
5463
5464 fn remove_in_flight(&mut self, pn: u64, packet: &SentPacket) {
5466 for path in [&mut self.path]
5468 .into_iter()
5469 .chain(self.prev_path.as_mut().map(|(_, data)| data))
5470 {
5471 if path.remove_in_flight(pn, packet) {
5472 return;
5473 }
5474 }
5475 }
5476
5477 fn kill(&mut self, reason: ConnectionError) {
5479 self.close_common();
5480 self.error = Some(reason);
5481 self.state = State::Drained;
5482 self.endpoint_events.push_back(EndpointEventInner::Drained);
5483 }
5484
5485 fn generate_nat_traversal_challenges(&mut self, now: Instant) {
5487 let candidates: Vec<(VarInt, SocketAddr)> = if let Some(nat_state) = &self.nat_traversal {
5489 nat_state
5490 .get_validation_candidates()
5491 .into_iter()
5492 .take(3) .map(|(seq, candidate)| (seq, candidate.address))
5494 .collect()
5495 } else {
5496 return;
5497 };
5498
5499 if candidates.is_empty() {
5500 return;
5501 }
5502
5503 if let Some(nat_state) = &mut self.nat_traversal {
5505 for (seq, address) in candidates {
5506 let challenge: u64 = self.rng.gen();
5508
5509 if let Err(e) = nat_state.start_validation(seq, challenge, now) {
5511 debug!("Failed to start validation for candidate {}: {}", seq, e);
5512 continue;
5513 }
5514
5515 self.nat_traversal_challenges.push(address, challenge);
5517 trace!(
5518 "Queuing NAT validation PATH_CHALLENGE for {} with token {:08x}",
5519 address, challenge
5520 );
5521 }
5522 }
5523 }
5524
5525 pub fn current_mtu(&self) -> u16 {
5529 self.path.current_mtu()
5530 }
5531
5532 fn predict_1rtt_overhead(&self, pn: Option<u64>) -> usize {
5539 let pn_len = match pn {
5540 Some(pn) => PacketNumber::new(
5541 pn,
5542 self.spaces[SpaceId::Data].largest_acked_packet.unwrap_or(0),
5543 )
5544 .len(),
5545 None => 4,
5547 };
5548
5549 1 + self.rem_cids.active().len() + pn_len + self.tag_len_1rtt()
5551 }
5552
5553 fn tag_len_1rtt(&self) -> usize {
5554 let key = match self.spaces[SpaceId::Data].crypto.as_ref() {
5555 Some(crypto) => Some(&*crypto.packet.local),
5556 None => self.zero_rtt_crypto.as_ref().map(|x| &*x.packet),
5557 };
5558 key.map_or(16, |x| x.tag_len())
5562 }
5563
5564 fn on_path_validated(&mut self) {
5566 self.path.validated = true;
5567 let ConnectionSide::Server { server_config } = &self.side else {
5568 return;
5569 };
5570 let new_tokens = &mut self.spaces[SpaceId::Data as usize].pending.new_tokens;
5571 new_tokens.clear();
5572 for _ in 0..server_config.validation_token.sent {
5573 new_tokens.push(self.path.remote);
5574 }
5575 }
5576}
5577
5578impl fmt::Debug for Connection {
5579 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
5580 f.debug_struct("Connection")
5581 .field("handshake_cid", &self.handshake_cid)
5582 .finish()
5583 }
5584}
5585
5586enum ConnectionSide {
5588 Client {
5589 token: Bytes,
5591 token_store: Arc<dyn TokenStore>,
5592 server_name: String,
5593 },
5594 Server {
5595 server_config: Arc<ServerConfig>,
5596 },
5597}
5598
5599impl ConnectionSide {
5600 fn remote_may_migrate(&self) -> bool {
5601 match self {
5602 Self::Server { server_config } => server_config.migration,
5603 Self::Client { .. } => false,
5604 }
5605 }
5606
5607 fn is_client(&self) -> bool {
5608 self.side().is_client()
5609 }
5610
5611 fn is_server(&self) -> bool {
5612 self.side().is_server()
5613 }
5614
5615 fn side(&self) -> Side {
5616 match *self {
5617 Self::Client { .. } => Side::Client,
5618 Self::Server { .. } => Side::Server,
5619 }
5620 }
5621}
5622
5623impl From<SideArgs> for ConnectionSide {
5624 fn from(side: SideArgs) -> Self {
5625 match side {
5626 SideArgs::Client {
5627 token_store,
5628 server_name,
5629 } => Self::Client {
5630 token: token_store.take(&server_name).unwrap_or_default(),
5631 token_store,
5632 server_name,
5633 },
5634 SideArgs::Server {
5635 server_config,
5636 pref_addr_cid: _,
5637 path_validated: _,
5638 } => Self::Server { server_config },
5639 }
5640 }
5641}
5642
5643pub(crate) enum SideArgs {
5645 Client {
5646 token_store: Arc<dyn TokenStore>,
5647 server_name: String,
5648 },
5649 Server {
5650 server_config: Arc<ServerConfig>,
5651 pref_addr_cid: Option<ConnectionId>,
5652 path_validated: bool,
5653 },
5654}
5655
5656impl SideArgs {
5657 pub(crate) fn pref_addr_cid(&self) -> Option<ConnectionId> {
5658 match *self {
5659 Self::Client { .. } => None,
5660 Self::Server { pref_addr_cid, .. } => pref_addr_cid,
5661 }
5662 }
5663
5664 pub(crate) fn path_validated(&self) -> bool {
5665 match *self {
5666 Self::Client { .. } => true,
5667 Self::Server { path_validated, .. } => path_validated,
5668 }
5669 }
5670
5671 pub(crate) fn side(&self) -> Side {
5672 match *self {
5673 Self::Client { .. } => Side::Client,
5674 Self::Server { .. } => Side::Server,
5675 }
5676 }
5677}
5678
5679#[derive(Debug, Error, Clone, PartialEq, Eq)]
5681pub enum ConnectionError {
5682 #[error("peer doesn't implement any supported version")]
5684 VersionMismatch,
5685 #[error(transparent)]
5687 TransportError(#[from] TransportError),
5688 #[error("aborted by peer: {0}")]
5690 ConnectionClosed(frame::ConnectionClose),
5691 #[error("closed by peer: {0}")]
5693 ApplicationClosed(frame::ApplicationClose),
5694 #[error("reset by peer")]
5696 Reset,
5697 #[error("timed out")]
5703 TimedOut,
5704 #[error("closed")]
5706 LocallyClosed,
5707 #[error("CIDs exhausted")]
5711 CidsExhausted,
5712}
5713
5714impl From<Close> for ConnectionError {
5715 fn from(x: Close) -> Self {
5716 match x {
5717 Close::Connection(reason) => Self::ConnectionClosed(reason),
5718 Close::Application(reason) => Self::ApplicationClosed(reason),
5719 }
5720 }
5721}
5722
5723impl From<ConnectionError> for io::Error {
5725 fn from(x: ConnectionError) -> Self {
5726 use ConnectionError::*;
5727 let kind = match x {
5728 TimedOut => io::ErrorKind::TimedOut,
5729 Reset => io::ErrorKind::ConnectionReset,
5730 ApplicationClosed(_) | ConnectionClosed(_) => io::ErrorKind::ConnectionAborted,
5731 TransportError(_) | VersionMismatch | LocallyClosed | CidsExhausted => {
5732 io::ErrorKind::Other
5733 }
5734 };
5735 Self::new(kind, x)
5736 }
5737}
5738
5739#[allow(unreachable_pub)] #[derive(Clone)]
5741pub enum State {
5742 Handshake(state::Handshake),
5743 Established,
5744 Closed(state::Closed),
5745 Draining,
5746 Drained,
5748}
5749
5750impl State {
5751 fn closed<R: Into<Close>>(reason: R) -> Self {
5752 Self::Closed(state::Closed {
5753 reason: reason.into(),
5754 })
5755 }
5756
5757 fn is_handshake(&self) -> bool {
5758 matches!(*self, Self::Handshake(_))
5759 }
5760
5761 fn is_established(&self) -> bool {
5762 matches!(*self, Self::Established)
5763 }
5764
5765 fn is_closed(&self) -> bool {
5766 matches!(*self, Self::Closed(_) | Self::Draining | Self::Drained)
5767 }
5768
5769 fn is_drained(&self) -> bool {
5770 matches!(*self, Self::Drained)
5771 }
5772}
5773
5774mod state {
5775 use super::*;
5776
5777 #[allow(unreachable_pub)] #[derive(Clone)]
5779 pub struct Handshake {
5780 pub(super) rem_cid_set: bool,
5784 pub(super) expected_token: Bytes,
5788 pub(super) client_hello: Option<Bytes>,
5792 }
5793
5794 #[allow(unreachable_pub)] #[derive(Clone)]
5796 pub struct Closed {
5797 pub(super) reason: Close,
5798 }
5799}
5800
5801#[derive(Debug)]
5803pub enum Event {
5804 HandshakeDataReady,
5806 Connected,
5808 ConnectionLost {
5812 reason: ConnectionError,
5814 },
5815 Stream(StreamEvent),
5817 DatagramReceived,
5819 DatagramsUnblocked,
5821}
5822
5823fn instant_saturating_sub(x: Instant, y: Instant) -> Duration {
5824 if x > y { x - y } else { Duration::ZERO }
5825}
5826
5827fn get_max_ack_delay(params: &TransportParameters) -> Duration {
5828 Duration::from_micros(params.max_ack_delay.0 * 1000)
5829}
5830
5831const MAX_BACKOFF_EXPONENT: u32 = 16;
5833
5834const MIN_PACKET_SPACE: usize = MAX_HANDSHAKE_OR_0RTT_HEADER_SIZE + 32;
5842
5843const MAX_HANDSHAKE_OR_0RTT_HEADER_SIZE: usize =
5849 1 + 4 + 1 + MAX_CID_SIZE + 1 + MAX_CID_SIZE + VarInt::from_u32(u16::MAX as u32).size() + 4;
5850
5851const KEY_UPDATE_MARGIN: u64 = 10_000;
5855
5856#[derive(Default)]
5857struct SentFrames {
5858 retransmits: ThinRetransmits,
5859 largest_acked: Option<u64>,
5860 stream_frames: StreamMetaVec,
5861 non_retransmits: bool,
5863 requires_padding: bool,
5864}
5865
5866impl SentFrames {
5867 fn is_ack_only(&self, streams: &StreamsState) -> bool {
5869 self.largest_acked.is_some()
5870 && !self.non_retransmits
5871 && self.stream_frames.is_empty()
5872 && self.retransmits.is_empty(streams)
5873 }
5874}
5875
5876fn negotiate_max_idle_timeout(x: Option<VarInt>, y: Option<VarInt>) -> Option<Duration> {
5884 match (x, y) {
5885 (Some(VarInt(0)) | None, Some(VarInt(0)) | None) => None,
5886 (Some(VarInt(0)) | None, Some(y)) => Some(Duration::from_millis(y.0)),
5887 (Some(x), Some(VarInt(0)) | None) => Some(Duration::from_millis(x.0)),
5888 (Some(x), Some(y)) => Some(Duration::from_millis(cmp::min(x, y).0)),
5889 }
5890}
5891
5892#[derive(Debug, Clone)]
5894struct AddressDiscoveryState {
5895 enabled: bool,
5897 max_observation_rate: u8,
5899 observe_all_paths: bool,
5901 path_addresses: std::collections::HashMap<u64, paths::PathAddressInfo>,
5903 rate_limiter: AddressObservationRateLimiter,
5905 observed_addresses: Vec<ObservedAddressEvent>,
5907 bootstrap_mode: bool,
5909}
5910
5911#[derive(Debug, Clone, PartialEq, Eq)]
5913struct ObservedAddressEvent {
5914 address: SocketAddr,
5916 received_at: Instant,
5918 path_id: u64,
5920}
5921
5922#[derive(Debug, Clone)]
5924struct AddressObservationRateLimiter {
5925 tokens: f64,
5927 max_tokens: f64,
5929 rate: f64,
5931 last_update: Instant,
5933}
5934
5935impl AddressDiscoveryState {
5936 fn new(config: &crate::transport_parameters::AddressDiscoveryConfig, now: Instant) -> Self {
5938 Self {
5939 enabled: config.enabled,
5940 max_observation_rate: config.max_observation_rate,
5941 observe_all_paths: config.observe_all_paths,
5942 path_addresses: std::collections::HashMap::new(),
5943 rate_limiter: AddressObservationRateLimiter::new(config.max_observation_rate, now),
5944 observed_addresses: Vec::new(),
5945 bootstrap_mode: false,
5946 }
5947 }
5948
5949 fn should_send_observation(&mut self, path_id: u64, now: Instant) -> bool {
5951 if !self.should_observe_path(path_id) {
5953 return false;
5954 }
5955
5956 let needs_observation = match self.path_addresses.get(&path_id) {
5958 Some(info) => info.observed_address.is_none() || !info.notified,
5959 None => true,
5960 };
5961
5962 if !needs_observation {
5963 return false;
5964 }
5965
5966 self.rate_limiter.try_consume(1.0, now)
5968 }
5969
5970 fn record_observation_sent(&mut self, path_id: u64) {
5972 if let Some(info) = self.path_addresses.get_mut(&path_id) {
5973 info.mark_notified();
5974 }
5975 }
5976
5977 fn handle_observed_address(&mut self, address: SocketAddr, path_id: u64, now: Instant) {
5979 if !self.enabled {
5980 return;
5981 }
5982
5983 self.observed_addresses.push(ObservedAddressEvent {
5984 address,
5985 received_at: now,
5986 path_id,
5987 });
5988
5989 let info = self.path_addresses.entry(path_id).or_insert_with(|| {
5991 paths::PathAddressInfo::new()
5992 });
5993 info.update_observed_address(address, now);
5994 }
5995
5996 pub(crate) fn get_observed_address(&self, path_id: u64) -> Option<SocketAddr> {
5998 self.path_addresses.get(&path_id)
5999 .and_then(|info| info.observed_address)
6000 }
6001
6002 pub(crate) fn get_all_observed_addresses(&self) -> Vec<SocketAddr> {
6004 self.path_addresses
6005 .values()
6006 .filter_map(|info| info.observed_address)
6007 .collect()
6008 }
6009
6010 pub(crate) fn stats(&self) -> AddressDiscoveryStats {
6012 AddressDiscoveryStats {
6013 frames_sent: self.observed_addresses.len() as u64, frames_received: self.observed_addresses.len() as u64,
6015 addresses_discovered: self.path_addresses
6016 .values()
6017 .filter(|info| info.observed_address.is_some())
6018 .count() as u64,
6019 address_changes_detected: 0, }
6021 }
6022
6023 fn has_unnotified_changes(&self) -> bool {
6025 self.path_addresses.values()
6026 .any(|info| info.observed_address.is_some() && !info.notified)
6027 }
6028
6029 fn queue_observed_address_frame(&mut self, path_id: u64, address: SocketAddr) -> Option<frame::ObservedAddress> {
6031 if !self.enabled {
6033 return None;
6034 }
6035
6036 if !self.observe_all_paths && path_id != 0 {
6038 return None;
6039 }
6040
6041 if let Some(info) = self.path_addresses.get(&path_id) {
6043 if info.notified {
6044 return None;
6045 }
6046 }
6047
6048 if self.rate_limiter.tokens < 1.0 {
6050 return None;
6051 }
6052
6053 self.rate_limiter.tokens -= 1.0;
6055
6056 let info = self.path_addresses.entry(path_id).or_insert_with(|| {
6058 paths::PathAddressInfo::new()
6059 });
6060 info.observed_address = Some(address);
6061 info.notified = true;
6062
6063 Some(frame::ObservedAddress { address })
6065 }
6066
6067 fn check_for_address_observations(
6069 &mut self,
6070 _current_path: u64,
6071 peer_supports_address_discovery: bool,
6072 now: Instant
6073 ) -> Vec<frame::ObservedAddress> {
6074 let mut frames = Vec::new();
6075
6076 if !self.enabled || !peer_supports_address_discovery {
6078 return frames;
6079 }
6080
6081 self.rate_limiter.update_tokens(now);
6083
6084 let paths_to_notify: Vec<u64> = self.path_addresses.iter()
6086 .filter_map(|(&path_id, info)| {
6087 if info.observed_address.is_some() && !info.notified {
6088 Some(path_id)
6089 } else {
6090 None
6091 }
6092 })
6093 .collect();
6094
6095 for path_id in paths_to_notify {
6097 if !self.should_observe_path(path_id) {
6099 continue;
6100 }
6101
6102 if !self.bootstrap_mode && self.rate_limiter.tokens < 1.0 {
6104 break; }
6106
6107 if let Some(info) = self.path_addresses.get_mut(&path_id) {
6109 if let Some(address) = info.observed_address {
6110 if self.bootstrap_mode {
6112 self.rate_limiter.tokens -= 0.2; } else {
6114 self.rate_limiter.tokens -= 1.0;
6115 }
6116
6117 info.notified = true;
6119
6120 frames.push(frame::ObservedAddress { address });
6122 }
6123 }
6124 }
6125
6126 frames
6127 }
6128
6129 fn update_rate_limit(&mut self, new_rate: f64) {
6131 self.max_observation_rate = new_rate as u8;
6132 self.rate_limiter.set_rate(new_rate as u8);
6133 }
6134
6135 fn from_transport_params(params: &TransportParameters) -> Option<Self> {
6137 params.address_discovery.as_ref().map(|config| {
6138 Self::new(config, Instant::now())
6139 })
6140 }
6141
6142 #[cfg(test)]
6144 fn new_with_params(enabled: bool, max_rate: f64, observe_all_paths: bool) -> Self {
6145 let config = crate::transport_parameters::AddressDiscoveryConfig {
6146 enabled,
6147 max_observation_rate: max_rate as u8,
6148 observe_all_paths,
6149 };
6150 Self::new(&config, Instant::now())
6151 }
6152
6153 fn set_bootstrap_mode(&mut self, enabled: bool) {
6155 self.bootstrap_mode = enabled;
6156 if enabled {
6158 let bootstrap_rate = self.get_effective_rate_limit();
6159 self.rate_limiter.rate = bootstrap_rate;
6160 self.rate_limiter.max_tokens = bootstrap_rate * 2.0; self.rate_limiter.tokens = self.rate_limiter.max_tokens;
6163 }
6164 }
6165
6166 fn is_bootstrap_mode(&self) -> bool {
6168 self.bootstrap_mode
6169 }
6170
6171 fn get_effective_rate_limit(&self) -> f64 {
6173 if self.bootstrap_mode {
6174 (self.max_observation_rate as f64) * 5.0
6176 } else {
6177 self.max_observation_rate as f64
6178 }
6179 }
6180
6181 fn should_observe_path(&self, path_id: u64) -> bool {
6183 if !self.enabled {
6184 return false;
6185 }
6186
6187 if self.bootstrap_mode {
6189 return true;
6190 }
6191
6192 self.observe_all_paths || path_id == 0
6194 }
6195
6196 fn should_send_observation_immediately(&self, is_new_connection: bool) -> bool {
6198 self.bootstrap_mode && is_new_connection
6199 }
6200}
6201
6202impl AddressObservationRateLimiter {
6203 fn new(rate: u8, now: Instant) -> Self {
6205 let rate_f64 = rate as f64;
6206 Self {
6207 tokens: rate_f64,
6208 max_tokens: rate_f64,
6209 rate: rate_f64,
6210 last_update: now,
6211 }
6212 }
6213
6214 fn try_consume(&mut self, tokens: f64, now: Instant) -> bool {
6216 self.update_tokens(now);
6217
6218 if self.tokens >= tokens {
6219 self.tokens -= tokens;
6220 true
6221 } else {
6222 false
6223 }
6224 }
6225
6226 fn update_tokens(&mut self, now: Instant) {
6228 let elapsed = now.saturating_duration_since(self.last_update);
6229 let new_tokens = elapsed.as_secs_f64() * self.rate;
6230 self.tokens = (self.tokens + new_tokens).min(self.max_tokens);
6231 self.last_update = now;
6232 }
6233
6234 fn set_rate(&mut self, rate: u8) {
6236 let rate_f64 = rate as f64;
6237 self.rate = rate_f64;
6238 self.max_tokens = rate_f64;
6239 if self.tokens > self.max_tokens {
6241 self.tokens = self.max_tokens;
6242 }
6243 }
6244}
6245
6246#[cfg(test)]
6247mod tests {
6248 use super::*;
6249 use crate::transport_parameters::AddressDiscoveryConfig;
6250 use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr};
6251
6252 #[test]
6253 fn address_discovery_state_new() {
6254 let config = crate::transport_parameters::AddressDiscoveryConfig {
6255 enabled: true,
6256 max_observation_rate: 10,
6257 observe_all_paths: false,
6258 };
6259 let now = Instant::now();
6260 let state = AddressDiscoveryState::new(&config, now);
6261
6262 assert!(state.enabled);
6263 assert_eq!(state.max_observation_rate, 10);
6264 assert!(!state.observe_all_paths);
6265 assert!(state.path_addresses.is_empty());
6266 assert!(state.observed_addresses.is_empty());
6267 assert_eq!(state.rate_limiter.tokens, 10.0);
6268 }
6269
6270 #[test]
6271 fn address_discovery_state_disabled() {
6272 let config = crate::transport_parameters::AddressDiscoveryConfig {
6273 enabled: false,
6274 max_observation_rate: 10,
6275 observe_all_paths: false,
6276 };
6277 let now = Instant::now();
6278 let mut state = AddressDiscoveryState::new(&config, now);
6279
6280 assert!(!state.should_send_observation(0, now));
6282 }
6283
6284 #[test]
6285 fn address_discovery_state_should_send_observation() {
6286 let config = crate::transport_parameters::AddressDiscoveryConfig {
6287 enabled: true,
6288 max_observation_rate: 10,
6289 observe_all_paths: true,
6290 };
6291 let now = Instant::now();
6292 let mut state = AddressDiscoveryState::new(&config, now);
6293
6294 assert!(state.should_send_observation(0, now));
6296
6297 let mut path_info = paths::PathAddressInfo::new();
6299 path_info.update_observed_address(
6300 SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)), 8080),
6301 now
6302 );
6303 path_info.mark_notified();
6304 state.path_addresses.insert(0, path_info);
6305
6306 assert!(!state.should_send_observation(0, now));
6308
6309 assert!(state.should_send_observation(1, now));
6311 }
6312
6313 #[test]
6314 fn address_discovery_state_rate_limiting() {
6315 let config = crate::transport_parameters::AddressDiscoveryConfig {
6316 enabled: true,
6317 max_observation_rate: 2, observe_all_paths: true,
6319 };
6320 let now = Instant::now();
6321 let mut state = AddressDiscoveryState::new(&config, now);
6322
6323 assert!(state.should_send_observation(0, now));
6325 assert!(state.should_send_observation(1, now));
6326
6327 assert!(!state.should_send_observation(2, now));
6329
6330 let later = now + Duration::from_secs(1);
6332 assert!(state.should_send_observation(2, later));
6333 assert!(state.should_send_observation(3, later));
6334 assert!(!state.should_send_observation(4, later));
6335 }
6336
6337 #[test]
6338 fn address_discovery_state_handle_observed_address() {
6339 let config = crate::transport_parameters::AddressDiscoveryConfig {
6340 enabled: true,
6341 max_observation_rate: 10,
6342 observe_all_paths: true,
6343 };
6344 let now = Instant::now();
6345 let mut state = AddressDiscoveryState::new(&config, now);
6346
6347 let addr1 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(10, 0, 0, 1)), 443);
6348 let addr2 = SocketAddr::new(IpAddr::V6(Ipv6Addr::new(0x2001, 0xdb8, 0, 0, 0, 0, 0, 1)), 8080);
6349
6350 state.handle_observed_address(addr1, 0, now);
6352 assert_eq!(state.observed_addresses.len(), 1);
6353 assert_eq!(state.observed_addresses[0].address, addr1);
6354 assert_eq!(state.observed_addresses[0].path_id, 0);
6355
6356 let later = now + Duration::from_millis(100);
6358 state.handle_observed_address(addr2, 1, later);
6359 assert_eq!(state.observed_addresses.len(), 2);
6360 assert_eq!(state.observed_addresses[1].address, addr2);
6361 assert_eq!(state.observed_addresses[1].path_id, 1);
6362 }
6363
6364 #[test]
6365 fn address_discovery_state_get_observed_address() {
6366 let config = crate::transport_parameters::AddressDiscoveryConfig {
6367 enabled: true,
6368 max_observation_rate: 10,
6369 observe_all_paths: true,
6370 };
6371 let now = Instant::now();
6372 let mut state = AddressDiscoveryState::new(&config, now);
6373
6374 assert_eq!(state.get_observed_address(0), None);
6376
6377 let mut path_info = paths::PathAddressInfo::new();
6379 let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 80);
6380 path_info.update_observed_address(addr, now);
6381 state.path_addresses.insert(0, path_info);
6382
6383 assert_eq!(state.get_observed_address(0), Some(addr));
6385 assert_eq!(state.get_observed_address(1), None);
6386 }
6387
6388 #[test]
6389 fn address_discovery_state_unnotified_changes() {
6390 let config = crate::transport_parameters::AddressDiscoveryConfig {
6391 enabled: true,
6392 max_observation_rate: 10,
6393 observe_all_paths: true,
6394 };
6395 let now = Instant::now();
6396 let mut state = AddressDiscoveryState::new(&config, now);
6397
6398 assert!(!state.has_unnotified_changes());
6400
6401 let mut path_info = paths::PathAddressInfo::new();
6403 path_info.update_observed_address(
6404 SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)), 8080),
6405 now
6406 );
6407 state.path_addresses.insert(0, path_info);
6408
6409 assert!(state.has_unnotified_changes());
6411
6412 state.record_observation_sent(0);
6414 assert!(!state.has_unnotified_changes());
6415 }
6416
6417 #[test]
6418 fn address_observation_rate_limiter_token_bucket() {
6419 let now = Instant::now();
6420 let mut limiter = AddressObservationRateLimiter::new(5, now); assert_eq!(limiter.tokens, 5.0);
6424 assert_eq!(limiter.max_tokens, 5.0);
6425 assert_eq!(limiter.rate, 5.0);
6426
6427 assert!(limiter.try_consume(3.0, now));
6429 assert_eq!(limiter.tokens, 2.0);
6430
6431 assert!(!limiter.try_consume(3.0, now));
6433 assert_eq!(limiter.tokens, 2.0);
6434
6435 let later = now + Duration::from_secs(1);
6437 limiter.update_tokens(later);
6438 assert_eq!(limiter.tokens, 5.0); let half_sec = now + Duration::from_millis(500);
6442 let mut limiter2 = AddressObservationRateLimiter::new(5, now);
6443 limiter2.try_consume(3.0, now);
6444 limiter2.update_tokens(half_sec);
6445 assert_eq!(limiter2.tokens, 4.5); }
6447
6448 #[test]
6450 fn connection_initializes_address_discovery_state_default() {
6451 let config = crate::transport_parameters::AddressDiscoveryConfig::default();
6454 let state = AddressDiscoveryState::new(&config, Instant::now());
6455 assert!(state.enabled); assert_eq!(state.max_observation_rate, 10); assert!(!state.observe_all_paths);
6458 }
6459
6460 #[test]
6461 fn connection_initializes_with_address_discovery_enabled() {
6462 let config = crate::transport_parameters::AddressDiscoveryConfig {
6464 enabled: true,
6465 max_observation_rate: 10,
6466 observe_all_paths: false,
6467 };
6468 let state = AddressDiscoveryState::new(&config, Instant::now());
6469 assert!(state.enabled);
6470 assert_eq!(state.max_observation_rate, 10);
6471 assert!(!state.observe_all_paths);
6472 }
6473
6474 #[test]
6475 fn connection_address_discovery_enabled_by_default() {
6476 let config = crate::transport_parameters::AddressDiscoveryConfig::default();
6478 let state = AddressDiscoveryState::new(&config, Instant::now());
6479 assert!(state.enabled); }
6481
6482 #[test]
6483 fn negotiate_max_idle_timeout_commutative() {
6484 let test_params = [
6485 (None, None, None),
6486 (None, Some(VarInt(0)), None),
6487 (None, Some(VarInt(2)), Some(Duration::from_millis(2))),
6488 (Some(VarInt(0)), Some(VarInt(0)), None),
6489 (
6490 Some(VarInt(2)),
6491 Some(VarInt(0)),
6492 Some(Duration::from_millis(2)),
6493 ),
6494 (
6495 Some(VarInt(1)),
6496 Some(VarInt(4)),
6497 Some(Duration::from_millis(1)),
6498 ),
6499 ];
6500
6501 for (left, right, result) in test_params {
6502 assert_eq!(negotiate_max_idle_timeout(left, right), result);
6503 assert_eq!(negotiate_max_idle_timeout(right, left), result);
6504 }
6505 }
6506
6507 #[test]
6508 fn path_creation_initializes_address_discovery() {
6509 let config = TransportConfig::default();
6510 let remote = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)), 8080);
6511 let now = Instant::now();
6512
6513 let path = paths::PathData::new(remote, false, None, now, &config);
6515
6516 assert!(path.address_info.observed_address.is_none());
6518 assert!(path.address_info.last_observed.is_none());
6519 assert_eq!(path.address_info.observation_count, 0);
6520 assert!(!path.address_info.notified);
6521
6522 assert_eq!(path.observation_rate_limiter.rate, 10.0);
6524 assert_eq!(path.observation_rate_limiter.max_tokens, 10.0);
6525 assert_eq!(path.observation_rate_limiter.tokens, 10.0);
6526 }
6527
6528 #[test]
6529 fn path_migration_resets_address_discovery() {
6530 let config = TransportConfig::default();
6531 let remote1 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)), 8080);
6532 let remote2 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(10, 0, 0, 1)), 443);
6533 let now = Instant::now();
6534
6535 let mut path1 = paths::PathData::new(remote1, false, None, now, &config);
6537 path1.update_observed_address(remote1, now);
6538 path1.mark_address_notified();
6539 path1.consume_observation_token(now);
6540 path1.set_observation_rate(20);
6541
6542 let path2 = paths::PathData::from_previous(remote2, &path1, now);
6544
6545 assert!(path2.address_info.observed_address.is_none());
6547 assert!(path2.address_info.last_observed.is_none());
6548 assert_eq!(path2.address_info.observation_count, 0);
6549 assert!(!path2.address_info.notified);
6550
6551 assert_eq!(path2.observation_rate_limiter.rate, 20.0);
6553 assert_eq!(path2.observation_rate_limiter.tokens, 20.0);
6554 }
6555
6556 #[test]
6557 fn connection_path_updates_observation_rate() {
6558 let config = TransportConfig::default();
6559 let remote = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 42);
6560 let now = Instant::now();
6561
6562 let mut path = paths::PathData::new(remote, false, None, now, &config);
6563
6564 assert_eq!(path.observation_rate_limiter.rate, 10.0);
6566
6567 path.set_observation_rate(25);
6569 assert_eq!(path.observation_rate_limiter.rate, 25.0);
6570 assert_eq!(path.observation_rate_limiter.max_tokens, 25.0);
6571
6572 path.observation_rate_limiter.tokens = 30.0; path.set_observation_rate(20);
6575 assert_eq!(path.observation_rate_limiter.tokens, 20.0); }
6577
6578 #[test]
6579 fn path_validation_preserves_discovery_state() {
6580 let config = TransportConfig::default();
6581 let remote = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)), 8080);
6582 let now = Instant::now();
6583
6584 let mut path = paths::PathData::new(remote, false, None, now, &config);
6585
6586 let observed = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(1, 2, 3, 4)), 5678);
6588 path.update_observed_address(observed, now);
6589 path.set_observation_rate(15);
6590
6591 path.validated = true;
6593
6594 assert_eq!(path.address_info.observed_address, Some(observed));
6596 assert_eq!(path.observation_rate_limiter.rate, 15.0);
6597 }
6598
6599 #[test]
6600 fn address_discovery_state_initialization() {
6601 let mut config = AddressDiscoveryConfig::default();
6602 config.enabled = true;
6603 config.max_observation_rate = 30;
6604 config.observe_all_paths = true;
6605
6606 let now = Instant::now();
6607 let state = AddressDiscoveryState::new(&config, now);
6608
6609 assert!(state.enabled);
6610 assert_eq!(state.max_observation_rate, 30);
6611 assert!(state.observe_all_paths);
6612 assert!(state.path_addresses.is_empty());
6613 assert!(state.observed_addresses.is_empty());
6614 }
6615
6616 #[test]
6618 fn handle_observed_address_frame_basic() {
6619 let config = AddressDiscoveryConfig {
6620 enabled: true,
6621 max_observation_rate: 10,
6622 observe_all_paths: false,
6623 };
6624 let mut state = AddressDiscoveryState::new(&config, Instant::now());
6625 let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)), 8080);
6626 let now = Instant::now();
6627 let path_id = 0;
6628
6629 state.handle_observed_address(addr, path_id, now);
6631
6632 assert_eq!(state.observed_addresses.len(), 1);
6634 assert_eq!(state.observed_addresses[0].address, addr);
6635 assert_eq!(state.observed_addresses[0].path_id, path_id);
6636 assert_eq!(state.observed_addresses[0].received_at, now);
6637
6638 assert!(state.path_addresses.contains_key(&path_id));
6640 let path_info = &state.path_addresses[&path_id];
6641 assert_eq!(path_info.observed_address, Some(addr));
6642 assert_eq!(path_info.last_observed, Some(now));
6643 assert_eq!(path_info.observation_count, 1);
6644 }
6645
6646 #[test]
6647 fn handle_observed_address_frame_multiple_observations() {
6648 let config = AddressDiscoveryConfig {
6649 enabled: true,
6650 max_observation_rate: 10,
6651 observe_all_paths: false,
6652 };
6653 let mut state = AddressDiscoveryState::new(&config, Instant::now());
6654 let addr1 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)), 8080);
6655 let addr2 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(10, 0, 0, 1)), 443);
6656 let now = Instant::now();
6657 let path_id = 0;
6658
6659 state.handle_observed_address(addr1, path_id, now);
6661 state.handle_observed_address(addr1, path_id, now + Duration::from_secs(1));
6662 state.handle_observed_address(addr2, path_id, now + Duration::from_secs(2));
6663
6664 assert_eq!(state.observed_addresses.len(), 3);
6666
6667 let path_info = &state.path_addresses[&path_id];
6669 assert_eq!(path_info.observed_address, Some(addr2));
6670 assert_eq!(path_info.observation_count, 1); }
6672
6673 #[test]
6674 fn handle_observed_address_frame_disabled() {
6675 let mut config = AddressDiscoveryConfig::default();
6676 config.enabled = false;
6677 let mut state = AddressDiscoveryState::new(&config, Instant::now());
6678 let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)), 8080);
6679 let now = Instant::now();
6680
6681 state.handle_observed_address(addr, 0, now);
6683
6684 assert!(state.observed_addresses.is_empty());
6686 assert!(state.path_addresses.is_empty());
6687 }
6688
6689 #[test]
6690 fn should_send_observation_basic() {
6691 let mut config = AddressDiscoveryConfig::default();
6692 config.enabled = true;
6693 config.max_observation_rate = 10;
6694 let mut state = AddressDiscoveryState::new(&config, Instant::now());
6695 let now = Instant::now();
6696 let path_id = 0;
6697
6698 assert!(state.should_send_observation(path_id, now));
6700
6701 state.record_observation_sent(path_id);
6703
6704 assert!(state.should_send_observation(path_id, now));
6706 }
6707
6708 #[test]
6709 fn should_send_observation_rate_limiting() {
6710 let mut config = AddressDiscoveryConfig::default();
6711 config.enabled = true;
6712 config.max_observation_rate = 2; let now = Instant::now();
6714 let mut state = AddressDiscoveryState::new(&config, now);
6715 let path_id = 0;
6716
6717 assert!(state.should_send_observation(path_id, now));
6719 state.record_observation_sent(path_id);
6720 assert!(state.should_send_observation(path_id, now));
6721 state.record_observation_sent(path_id);
6722
6723 assert!(!state.should_send_observation(path_id, now));
6725
6726 let later = now + Duration::from_secs(1);
6728 assert!(state.should_send_observation(path_id, later));
6729 }
6730
6731 #[test]
6732 fn should_send_observation_disabled() {
6733 let mut config = AddressDiscoveryConfig::default();
6734 config.enabled = false;
6735 let mut state = AddressDiscoveryState::new(&config, Instant::now());
6736
6737 assert!(!state.should_send_observation(0, Instant::now()));
6739 }
6740
6741 #[test]
6742 fn should_send_observation_per_path() {
6743 let mut config = AddressDiscoveryConfig::default();
6744 config.enabled = true;
6745 config.max_observation_rate = 2; config.observe_all_paths = true;
6747 let now = Instant::now();
6748 let mut state = AddressDiscoveryState::new(&config, now);
6749
6750 assert!(state.should_send_observation(0, now));
6752 state.record_observation_sent(0);
6753
6754 assert!(state.should_send_observation(1, now));
6756 state.record_observation_sent(1);
6757
6758 assert!(!state.should_send_observation(0, now));
6760 assert!(!state.should_send_observation(1, now));
6761
6762 let later = now + Duration::from_secs(1);
6764 assert!(state.should_send_observation(0, later));
6765 }
6766
6767 #[test]
6768 fn has_unnotified_changes_test() {
6769 let config = AddressDiscoveryConfig {
6770 enabled: true,
6771 max_observation_rate: 10,
6772 observe_all_paths: false,
6773 };
6774 let mut state = AddressDiscoveryState::new(&config, Instant::now());
6775 let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)), 8080);
6776 let now = Instant::now();
6777
6778 assert!(!state.has_unnotified_changes());
6780
6781 state.handle_observed_address(addr, 0, now);
6783 assert!(state.has_unnotified_changes());
6784
6785 state.path_addresses.get_mut(&0).unwrap().notified = true;
6787 assert!(!state.has_unnotified_changes());
6788 }
6789
6790 #[test]
6791 fn get_observed_address_test() {
6792 let config = AddressDiscoveryConfig {
6793 enabled: true,
6794 max_observation_rate: 10,
6795 observe_all_paths: false,
6796 };
6797 let mut state = AddressDiscoveryState::new(&config, Instant::now());
6798 let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)), 8080);
6799 let now = Instant::now();
6800 let path_id = 0;
6801
6802 assert_eq!(state.get_observed_address(path_id), None);
6804
6805 state.handle_observed_address(addr, path_id, now);
6807 assert_eq!(state.get_observed_address(path_id), Some(addr));
6808
6809 assert_eq!(state.get_observed_address(999), None);
6811 }
6812
6813 #[test]
6815 fn rate_limiter_token_bucket_basic() {
6816 let now = Instant::now();
6817 let mut limiter = AddressObservationRateLimiter::new(10, now); assert!(limiter.try_consume(5.0, now));
6821 assert!(limiter.try_consume(5.0, now));
6822
6823 assert!(!limiter.try_consume(1.0, now));
6825 }
6826
6827 #[test]
6828 fn rate_limiter_token_replenishment() {
6829 let now = Instant::now();
6830 let mut limiter = AddressObservationRateLimiter::new(10, now); assert!(limiter.try_consume(10.0, now));
6834 assert!(!limiter.try_consume(0.1, now)); let later = now + Duration::from_secs(1);
6838 assert!(limiter.try_consume(10.0, later)); assert!(!limiter.try_consume(0.1, later)); let later = later + Duration::from_millis(500);
6843 assert!(limiter.try_consume(5.0, later)); assert!(!limiter.try_consume(0.1, later)); }
6846
6847 #[test]
6848 fn rate_limiter_max_tokens_cap() {
6849 let now = Instant::now();
6850 let mut limiter = AddressObservationRateLimiter::new(10, now);
6851
6852 let later = now + Duration::from_secs(2);
6854 assert!(limiter.try_consume(10.0, later));
6856 assert!(!limiter.try_consume(10.1, later)); let later2 = later + Duration::from_secs(1);
6860 assert!(limiter.try_consume(3.0, later2));
6861
6862 let much_later = later2 + Duration::from_secs(2);
6864 assert!(limiter.try_consume(10.0, much_later)); assert!(!limiter.try_consume(0.1, much_later)); }
6867
6868 #[test]
6869 fn rate_limiter_fractional_consumption() {
6870 let now = Instant::now();
6871 let mut limiter = AddressObservationRateLimiter::new(10, now);
6872
6873 assert!(limiter.try_consume(0.5, now));
6875 assert!(limiter.try_consume(2.3, now));
6876 assert!(limiter.try_consume(7.2, now)); assert!(!limiter.try_consume(0.1, now)); let later = now + Duration::from_millis(100); assert!(limiter.try_consume(1.0, later));
6882 assert!(!limiter.try_consume(0.1, later));
6883 }
6884
6885 #[test]
6886 fn rate_limiter_zero_rate() {
6887 let now = Instant::now();
6888 let mut limiter = AddressObservationRateLimiter::new(0, now); assert!(!limiter.try_consume(1.0, now));
6892 assert!(!limiter.try_consume(0.1, now));
6893 assert!(!limiter.try_consume(0.001, now));
6894
6895 let later = now + Duration::from_secs(10);
6897 assert!(!limiter.try_consume(0.001, later));
6898 }
6899
6900 #[test]
6901 fn rate_limiter_high_rate() {
6902 let now = Instant::now();
6903 let mut limiter = AddressObservationRateLimiter::new(63, now); assert!(limiter.try_consume(60.0, now));
6907 assert!(limiter.try_consume(3.0, now));
6908 assert!(!limiter.try_consume(0.1, now)); let later = now + Duration::from_secs(1);
6912 assert!(limiter.try_consume(63.0, later)); assert!(!limiter.try_consume(0.1, later)); }
6915
6916 #[test]
6917 fn rate_limiter_time_precision() {
6918 let now = Instant::now();
6919 let mut limiter = AddressObservationRateLimiter::new(100, now); assert!(limiter.try_consume(100.0, now));
6923 assert!(!limiter.try_consume(0.1, now));
6924
6925 let later = now + Duration::from_millis(10);
6927 assert!(limiter.try_consume(0.8, later)); assert!(!limiter.try_consume(0.5, later)); let much_later = later + Duration::from_millis(100); assert!(limiter.try_consume(5.0, much_later)); limiter.tokens = 0.0; let final_time = much_later + Duration::from_millis(1);
6939 limiter.update_tokens(final_time); assert!(limiter.tokens >= 0.09 && limiter.tokens <= 0.11);
6944 }
6945
6946 #[test]
6947 fn per_path_rate_limiting_independent() {
6948 let config = crate::transport_parameters::AddressDiscoveryConfig {
6949 enabled: true,
6950 max_observation_rate: 5, observe_all_paths: true,
6952 };
6953 let now = Instant::now();
6954 let mut state = AddressDiscoveryState::new(&config, now);
6955
6956 let path1_id = 0;
6958 let path2_id = 1;
6959 let path3_id = 2;
6960
6961 for _ in 0..3 {
6964 assert!(state.should_send_observation(path1_id, now));
6965 state.record_observation_sent(path1_id);
6966 }
6967
6968 for _ in 0..2 {
6970 assert!(state.should_send_observation(path2_id, now));
6971 state.record_observation_sent(path2_id);
6972 }
6973
6974 assert!(!state.should_send_observation(path3_id, now));
6976
6977 let later = now + Duration::from_secs(1);
6979
6980 assert!(state.should_send_observation(path1_id, later));
6982 assert!(state.should_send_observation(path2_id, later));
6983 assert!(state.should_send_observation(path3_id, later));
6984 }
6985
6986 #[test]
6987 fn per_path_rate_limiting_with_path_specific_limits() {
6988 let now = Instant::now();
6989 let remote1 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)), 8080);
6990 let remote2 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 2)), 8081);
6991 let config = TransportConfig::default();
6992
6993 let mut path1 = paths::PathData::new(remote1, false, None, now, &config);
6995 let mut path2 = paths::PathData::new(remote2, false, None, now, &config);
6996
6997 path1.observation_rate_limiter = paths::PathObservationRateLimiter::new(10, now); path2.observation_rate_limiter = paths::PathObservationRateLimiter::new(5, now); for _ in 0..10 {
7003 assert!(path1.observation_rate_limiter.can_send(now));
7004 path1.observation_rate_limiter.consume_token(now);
7005 }
7006 assert!(!path1.observation_rate_limiter.can_send(now));
7007
7008 for _ in 0..5 {
7010 assert!(path2.observation_rate_limiter.can_send(now));
7011 path2.observation_rate_limiter.consume_token(now);
7012 }
7013 assert!(!path2.observation_rate_limiter.can_send(now));
7014 }
7015
7016 #[test]
7017 fn per_path_rate_limiting_address_change_detection() {
7018 let config = crate::transport_parameters::AddressDiscoveryConfig {
7019 enabled: true,
7020 max_observation_rate: 10,
7021 observe_all_paths: true,
7022 };
7023 let now = Instant::now();
7024 let mut state = AddressDiscoveryState::new(&config, now);
7025
7026 let path_id = 0;
7028 let addr1 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)), 8080);
7029 let addr2 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 2)), 8080);
7030
7031 assert!(state.should_send_observation(path_id, now));
7033 state.handle_observed_address(addr1, path_id, now);
7034 state.record_observation_sent(path_id);
7035
7036 assert!(!state.should_send_observation(path_id, now));
7038
7039 state.handle_observed_address(addr2, path_id, now);
7041 if let Some(info) = state.path_addresses.get_mut(&path_id) {
7042 info.notified = false; }
7044
7045 assert!(state.should_send_observation(path_id, now));
7047 }
7048
7049 #[test]
7050 fn per_path_rate_limiting_migration() {
7051 let now = Instant::now();
7052 let remote1 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)), 8080);
7053 let remote2 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 2)), 8081);
7054 let config = TransportConfig::default();
7055
7056 let mut path = paths::PathData::new(remote1, false, None, now, &config);
7058 path.observation_rate_limiter = paths::PathObservationRateLimiter::new(10, now);
7059
7060 for _ in 0..5 {
7062 assert!(path.observation_rate_limiter.can_send(now));
7063 path.observation_rate_limiter.consume_token(now);
7064 }
7065
7066 let mut new_path = paths::PathData::new(remote2, false, None, now, &config);
7068
7069 new_path.observation_rate_limiter = paths::PathObservationRateLimiter::new(10, now);
7072
7073 for _ in 0..10 {
7075 assert!(new_path.observation_rate_limiter.can_send(now));
7076 new_path.observation_rate_limiter.consume_token(now);
7077 }
7078 assert!(!new_path.observation_rate_limiter.can_send(now));
7079 }
7080
7081 #[test]
7082 fn per_path_rate_limiting_disabled_paths() {
7083 let config = crate::transport_parameters::AddressDiscoveryConfig {
7084 enabled: true,
7085 max_observation_rate: 10,
7086 observe_all_paths: false, };
7088 let now = Instant::now();
7089 let mut state = AddressDiscoveryState::new(&config, now);
7090
7091 assert!(state.should_send_observation(0, now));
7093
7094 assert!(!state.should_send_observation(1, now));
7096 assert!(!state.should_send_observation(2, now));
7097
7098 let later = now + Duration::from_secs(1);
7100 assert!(!state.should_send_observation(1, later));
7101 }
7102
7103 #[test]
7104 fn respecting_negotiated_max_observation_rate_basic() {
7105 let config = crate::transport_parameters::AddressDiscoveryConfig {
7106 enabled: true,
7107 max_observation_rate: 20, observe_all_paths: true,
7109 };
7110 let now = Instant::now();
7111 let mut state = AddressDiscoveryState::new(&config, now);
7112
7113 state.max_observation_rate = 10; state.rate_limiter = AddressObservationRateLimiter::new(10, now);
7116
7117 for _ in 0..10 {
7119 assert!(state.should_send_observation(0, now));
7120 }
7121 assert!(!state.should_send_observation(0, now));
7123 }
7124
7125 #[test]
7126 fn respecting_negotiated_max_observation_rate_zero() {
7127 let config = crate::transport_parameters::AddressDiscoveryConfig {
7128 enabled: true,
7129 max_observation_rate: 10,
7130 observe_all_paths: true,
7131 };
7132 let now = Instant::now();
7133 let mut state = AddressDiscoveryState::new(&config, now);
7134
7135 state.max_observation_rate = 0;
7137 state.rate_limiter = AddressObservationRateLimiter::new(0, now);
7138
7139 assert!(!state.should_send_observation(0, now));
7141 assert!(!state.should_send_observation(1, now));
7142
7143 let later = now + Duration::from_secs(10);
7145 assert!(!state.should_send_observation(0, later));
7146 }
7147
7148 #[test]
7149 fn respecting_negotiated_max_observation_rate_higher() {
7150 let config = crate::transport_parameters::AddressDiscoveryConfig {
7151 enabled: true,
7152 max_observation_rate: 5, observe_all_paths: true,
7154 };
7155 let now = Instant::now();
7156 let mut state = AddressDiscoveryState::new(&config, now);
7157
7158 state.max_observation_rate = 10; for _ in 0..5 {
7165 assert!(state.should_send_observation(0, now));
7166 }
7167 assert!(!state.should_send_observation(0, now));
7169 }
7170
7171 #[test]
7172 fn respecting_negotiated_max_observation_rate_dynamic_update() {
7173 let config = crate::transport_parameters::AddressDiscoveryConfig {
7174 enabled: true,
7175 max_observation_rate: 10,
7176 observe_all_paths: true,
7177 };
7178 let now = Instant::now();
7179 let mut state = AddressDiscoveryState::new(&config, now);
7180
7181 for _i in 0..5 {
7183 assert!(state.should_send_observation(0, now)); state.record_observation_sent(0);
7185 }
7186
7187 state.max_observation_rate = 3;
7191 state.rate_limiter.set_rate(3);
7192
7193 for i in 0..3 {
7196 assert!(state.should_send_observation(i+1, now)); state.record_observation_sent(i+1);
7198 }
7199 assert!(!state.should_send_observation(10, now));
7200
7201 let later = now + Duration::from_secs(1);
7203 for i in 0..3 {
7204 assert!(state.should_send_observation(i+20, later)); state.record_observation_sent(i+20);
7206 }
7207 assert!(!state.should_send_observation(30, later));
7208 }
7209
7210 #[test]
7211 fn respecting_negotiated_max_observation_rate_with_paths() {
7212 let config = crate::transport_parameters::AddressDiscoveryConfig {
7213 enabled: true,
7214 max_observation_rate: 6, observe_all_paths: true,
7216 };
7217 let now = Instant::now();
7218 let mut state = AddressDiscoveryState::new(&config, now);
7219
7220 assert!(state.should_send_observation(0, now));
7222 assert!(state.should_send_observation(1, now));
7223 assert!(state.should_send_observation(2, now));
7224 assert!(state.should_send_observation(0, now));
7225 assert!(state.should_send_observation(1, now));
7226 assert!(state.should_send_observation(2, now));
7227
7228 assert!(!state.should_send_observation(0, now));
7230 assert!(!state.should_send_observation(1, now));
7231 assert!(!state.should_send_observation(2, now));
7232 }
7233
7234 #[test]
7235 fn queue_observed_address_frame_basic() {
7236 let config = crate::transport_parameters::AddressDiscoveryConfig {
7237 enabled: true,
7238 max_observation_rate: 10,
7239 observe_all_paths: true,
7240 };
7241 let now = Instant::now();
7242 let mut state = AddressDiscoveryState::new(&config, now);
7243
7244 let address = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 100)), 5000);
7246 let frame = state.queue_observed_address_frame(0, address);
7247
7248 assert!(frame.is_some());
7250 let frame = frame.unwrap();
7251 assert_eq!(frame.address, address);
7252
7253 assert!(state.path_addresses.contains_key(&0));
7255 assert!(state.path_addresses.get(&0).unwrap().notified);
7256 }
7257
7258 #[test]
7259 fn queue_observed_address_frame_rate_limited() {
7260 let config = crate::transport_parameters::AddressDiscoveryConfig {
7261 enabled: true,
7262 max_observation_rate: 2, observe_all_paths: true,
7264 };
7265 let now = Instant::now();
7266 let mut state = AddressDiscoveryState::new(&config, now);
7267
7268 let addr1 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)), 5000);
7270 let addr2 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 2)), 5001);
7271 let addr3 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 3)), 5002);
7272
7273 assert!(state.queue_observed_address_frame(0, addr1).is_some());
7274 assert!(state.queue_observed_address_frame(1, addr2).is_some());
7275
7276 assert!(state.queue_observed_address_frame(2, addr3).is_none());
7278 }
7279
7280 #[test]
7281 fn queue_observed_address_frame_disabled() {
7282 let config = crate::transport_parameters::AddressDiscoveryConfig {
7283 enabled: false, max_observation_rate: 10,
7285 observe_all_paths: true,
7286 };
7287 let now = Instant::now();
7288 let mut state = AddressDiscoveryState::new(&config, now);
7289
7290 let address = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 100)), 5000);
7291
7292 assert!(state.queue_observed_address_frame(0, address).is_none());
7294 }
7295
7296 #[test]
7297 fn queue_observed_address_frame_already_notified() {
7298 let config = crate::transport_parameters::AddressDiscoveryConfig {
7299 enabled: true,
7300 max_observation_rate: 10,
7301 observe_all_paths: true,
7302 };
7303 let now = Instant::now();
7304 let mut state = AddressDiscoveryState::new(&config, now);
7305
7306 let address = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 100)), 5000);
7307
7308 assert!(state.queue_observed_address_frame(0, address).is_some());
7310
7311 assert!(state.queue_observed_address_frame(0, address).is_none());
7313
7314 let new_address = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 101)), 5001);
7316 assert!(state.queue_observed_address_frame(0, new_address).is_none());
7317 }
7318
7319 #[test]
7320 fn queue_observed_address_frame_primary_path_only() {
7321 let config = crate::transport_parameters::AddressDiscoveryConfig {
7322 enabled: true,
7323 max_observation_rate: 10,
7324 observe_all_paths: false, };
7326 let now = Instant::now();
7327 let mut state = AddressDiscoveryState::new(&config, now);
7328
7329 let address = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 100)), 5000);
7330
7331 assert!(state.queue_observed_address_frame(0, address).is_some());
7333
7334 assert!(state.queue_observed_address_frame(1, address).is_none());
7336 assert!(state.queue_observed_address_frame(2, address).is_none());
7337 }
7338
7339 #[test]
7340 fn queue_observed_address_frame_updates_path_info() {
7341 let config = crate::transport_parameters::AddressDiscoveryConfig {
7342 enabled: true,
7343 max_observation_rate: 10,
7344 observe_all_paths: true,
7345 };
7346 let now = Instant::now();
7347 let mut state = AddressDiscoveryState::new(&config, now);
7348
7349 let address = SocketAddr::new(IpAddr::V6(Ipv6Addr::new(0x2001, 0xdb8, 0, 0, 0, 0, 0, 1)), 5000);
7350
7351 let frame = state.queue_observed_address_frame(0, address);
7353 assert!(frame.is_some());
7354
7355 let path_info = state.path_addresses.get(&0).unwrap();
7357 assert_eq!(path_info.observed_address, Some(address));
7358 assert!(path_info.notified);
7359
7360 assert_eq!(state.observed_addresses.len(), 0);
7363 }
7364
7365 #[test]
7366 fn retransmits_includes_observed_addresses() {
7367 use crate::connection::spaces::Retransmits;
7368
7369 let mut retransmits = Retransmits::default();
7371
7372 assert!(retransmits.observed_addresses.is_empty());
7374
7375 let address = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 100)), 5000);
7377 let frame = frame::ObservedAddress { address };
7378 retransmits.observed_addresses.push(frame);
7379
7380 assert_eq!(retransmits.observed_addresses.len(), 1);
7382 assert_eq!(retransmits.observed_addresses[0].address, address);
7383 }
7384
7385 #[test]
7386 fn check_for_address_observations_no_peer_support() {
7387 let config = crate::transport_parameters::AddressDiscoveryConfig {
7388 enabled: true,
7389 max_observation_rate: 10,
7390 observe_all_paths: false,
7391 };
7392 let now = Instant::now();
7393 let mut state = AddressDiscoveryState::new(&config, now);
7394
7395 state.path_addresses.insert(0, paths::PathAddressInfo::new());
7397 state.path_addresses.get_mut(&0).unwrap().observed_address =
7398 Some(SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 100)), 5000));
7399
7400 let frames = state.check_for_address_observations(0, false, now);
7402
7403 assert!(frames.is_empty());
7405 }
7406
7407 #[test]
7408 fn check_for_address_observations_with_peer_support() {
7409 let config = crate::transport_parameters::AddressDiscoveryConfig {
7410 enabled: true,
7411 max_observation_rate: 10,
7412 observe_all_paths: false,
7413 };
7414 let now = Instant::now();
7415 let mut state = AddressDiscoveryState::new(&config, now);
7416
7417 let address = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 100)), 5000);
7419 state.path_addresses.insert(0, paths::PathAddressInfo::new());
7420 state.path_addresses.get_mut(&0).unwrap().observed_address = Some(address);
7421
7422 let frames = state.check_for_address_observations(0, true, now);
7424
7425 assert_eq!(frames.len(), 1);
7427 assert_eq!(frames[0].address, address);
7428
7429 assert!(state.path_addresses.get(&0).unwrap().notified);
7431 }
7432
7433 #[test]
7434 fn check_for_address_observations_rate_limited() {
7435 let config = crate::transport_parameters::AddressDiscoveryConfig {
7436 enabled: true,
7437 max_observation_rate: 1, observe_all_paths: true,
7439 };
7440 let now = Instant::now();
7441 let mut state = AddressDiscoveryState::new(&config, now);
7442
7443 for i in 0..3 {
7445 let address = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 100 + i)), 5000);
7446 state.path_addresses.insert(i as u64, paths::PathAddressInfo::new());
7447 state.path_addresses.get_mut(&(i as u64)).unwrap().observed_address = Some(address);
7448 }
7449
7450 let frames1 = state.check_for_address_observations(0, true, now);
7452 assert_eq!(frames1.len(), 1);
7453
7454 let frames2 = state.check_for_address_observations(1, true, now);
7456 assert_eq!(frames2.len(), 0);
7457
7458 let later = now + Duration::from_secs(2);
7460 let frames3 = state.check_for_address_observations(1, true, later);
7461 assert_eq!(frames3.len(), 1);
7462 }
7463
7464 #[test]
7465 fn check_for_address_observations_multiple_paths() {
7466 let config = crate::transport_parameters::AddressDiscoveryConfig {
7467 enabled: true,
7468 max_observation_rate: 10,
7469 observe_all_paths: true,
7470 };
7471 let now = Instant::now();
7472 let mut state = AddressDiscoveryState::new(&config, now);
7473
7474 let addr1 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 100)), 5000);
7476 let addr2 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 101)), 5001);
7477
7478 state.path_addresses.insert(0, paths::PathAddressInfo::new());
7479 state.path_addresses.get_mut(&0).unwrap().observed_address = Some(addr1);
7480
7481 state.path_addresses.insert(1, paths::PathAddressInfo::new());
7482 state.path_addresses.get_mut(&1).unwrap().observed_address = Some(addr2);
7483
7484 let frames = state.check_for_address_observations(0, true, now);
7486
7487 assert_eq!(frames.len(), 2);
7489
7490 let addresses: Vec<_> = frames.iter().map(|f| f.address).collect();
7492 assert!(addresses.contains(&addr1));
7493 assert!(addresses.contains(&addr2));
7494
7495 assert!(state.path_addresses.get(&0).unwrap().notified);
7497 assert!(state.path_addresses.get(&1).unwrap().notified);
7498 }
7499
7500 #[test]
7502 fn test_rate_limiter_configuration() {
7503 let state = AddressDiscoveryState::new_with_params(true, 10.0, false);
7505 assert_eq!(state.rate_limiter.rate, 10.0);
7506 assert_eq!(state.rate_limiter.max_tokens, 10.0);
7507 assert_eq!(state.rate_limiter.tokens, 10.0);
7508
7509 let state = AddressDiscoveryState::new_with_params(true, 63.0, false);
7510 assert_eq!(state.rate_limiter.rate, 63.0);
7511 assert_eq!(state.rate_limiter.max_tokens, 63.0);
7512 }
7513
7514 #[test]
7515 fn test_rate_limiter_update_configuration() {
7516 let mut state = AddressDiscoveryState::new_with_params(true, 5.0, false);
7517
7518 assert_eq!(state.rate_limiter.rate, 5.0);
7520
7521 state.update_rate_limit(10.0);
7523 assert_eq!(state.rate_limiter.rate, 10.0);
7524 assert_eq!(state.rate_limiter.max_tokens, 10.0);
7525
7526 state.rate_limiter.tokens = 15.0;
7528 state.update_rate_limit(8.0);
7529 assert_eq!(state.rate_limiter.tokens, 8.0);
7530 }
7531
7532 #[test]
7533 fn test_rate_limiter_from_transport_params() {
7534 let mut params = TransportParameters::default();
7535 params.address_discovery = Some(AddressDiscoveryConfig {
7536 enabled: true,
7537 max_observation_rate: 25,
7538 observe_all_paths: true,
7539 });
7540
7541 let state = AddressDiscoveryState::from_transport_params(¶ms);
7542 assert!(state.is_some());
7543 let state = state.unwrap();
7544 assert_eq!(state.rate_limiter.rate, 25.0);
7545 assert!(state.observe_all_paths);
7546 }
7547
7548 #[test]
7549 fn test_rate_limiter_zero_rate() {
7550 let state = AddressDiscoveryState::new_with_params(true, 0.0, false);
7551 assert_eq!(state.rate_limiter.rate, 0.0);
7552 assert_eq!(state.rate_limiter.tokens, 0.0);
7553
7554 let address = "192.168.1.1:443".parse().unwrap();
7556 let mut state = AddressDiscoveryState::new_with_params(true, 0.0, false);
7557 let frame = state.queue_observed_address_frame(0, address);
7558 assert!(frame.is_none());
7559 }
7560
7561 #[test]
7562 fn test_rate_limiter_configuration_edge_cases() {
7563 let state = AddressDiscoveryState::new_with_params(true, 63.0, false);
7565 assert_eq!(state.rate_limiter.rate, 63.0);
7566
7567 let state = AddressDiscoveryState::new_with_params(true, 100.0, false);
7569 assert_eq!(state.rate_limiter.rate, 100.0);
7571
7572 let state = AddressDiscoveryState::new_with_params(true, 2.5, false);
7574 assert_eq!(state.rate_limiter.rate, 2.0);
7576 }
7577
7578 #[test]
7579 fn test_rate_limiter_runtime_update() {
7580 let mut state = AddressDiscoveryState::new_with_params(true, 10.0, false);
7581 let now = Instant::now();
7582
7583 state.rate_limiter.tokens = 5.0;
7585
7586 state.update_rate_limit(3.0);
7588
7589 assert_eq!(state.rate_limiter.tokens, 3.0);
7591 assert_eq!(state.rate_limiter.rate, 3.0);
7592 assert_eq!(state.rate_limiter.max_tokens, 3.0);
7593
7594 let later = now + Duration::from_secs(1);
7596 state.rate_limiter.update_tokens(later);
7597
7598 assert_eq!(state.rate_limiter.tokens, 3.0);
7600 }
7601
7602 #[test]
7604 fn test_address_discovery_state_initialization_default() {
7605 let now = Instant::now();
7607 let default_config = crate::transport_parameters::AddressDiscoveryConfig::default();
7608
7609 let address_discovery_state = Some(AddressDiscoveryState::new(&default_config, now));
7612
7613 assert!(address_discovery_state.is_some());
7614 let state = address_discovery_state.unwrap();
7615
7616 assert!(state.enabled); assert_eq!(state.max_observation_rate, 10); assert!(!state.observe_all_paths);
7620 }
7621
7622 #[test]
7623 fn test_address_discovery_state_initialization_on_handshake() {
7624 let now = Instant::now();
7626
7627 let mut address_discovery_state = Some(AddressDiscoveryState::new(
7629 &crate::transport_parameters::AddressDiscoveryConfig::default(),
7630 now
7631 ));
7632
7633 let peer_params = TransportParameters {
7635 address_discovery: Some(AddressDiscoveryConfig {
7636 enabled: true,
7637 max_observation_rate: 20,
7638 observe_all_paths: true,
7639 }),
7640 ..TransportParameters::default()
7641 };
7642
7643 if let Some(peer_config) = &peer_params.address_discovery {
7645 if peer_config.enabled {
7646 address_discovery_state = Some(AddressDiscoveryState::new(peer_config, now));
7647 }
7648 }
7649
7650 assert!(address_discovery_state.is_some());
7652 let state = address_discovery_state.unwrap();
7653 assert!(state.enabled);
7654 assert_eq!(state.max_observation_rate, 20);
7655 assert!(state.observe_all_paths);
7656 }
7657
7658 #[test]
7659 fn test_address_discovery_negotiation_disabled_peer() {
7660 let now = Instant::now();
7662
7663 let our_config = AddressDiscoveryConfig {
7665 enabled: true,
7666 max_observation_rate: 30,
7667 observe_all_paths: false,
7668 };
7669 let mut address_discovery_state = Some(AddressDiscoveryState::new(&our_config, now));
7670
7671 let peer_params = TransportParameters {
7673 address_discovery: None,
7674 ..TransportParameters::default()
7675 };
7676
7677 if peer_params.address_discovery.is_none() {
7679 if let Some(state) = &mut address_discovery_state {
7680 state.enabled = false;
7681 }
7682 }
7683
7684 let state = address_discovery_state.unwrap();
7686 assert!(!state.enabled); }
7688
7689 #[test]
7690 fn test_address_discovery_negotiation_rate_limiting() {
7691 let now = Instant::now();
7693
7694 let our_config = AddressDiscoveryConfig {
7696 enabled: true,
7697 max_observation_rate: 30,
7698 observe_all_paths: true,
7699 };
7700 let mut address_discovery_state = Some(AddressDiscoveryState::new(&our_config, now));
7701
7702 let peer_params = TransportParameters {
7704 address_discovery: Some(AddressDiscoveryConfig {
7705 enabled: true,
7706 max_observation_rate: 15,
7707 observe_all_paths: true,
7708 }),
7709 ..TransportParameters::default()
7710 };
7711
7712 if let (Some(state), Some(peer_config)) = (&mut address_discovery_state, &peer_params.address_discovery) {
7714 if peer_config.enabled && state.enabled {
7715 let negotiated_rate = state.max_observation_rate.min(peer_config.max_observation_rate);
7717 state.update_rate_limit(negotiated_rate as f64);
7718 }
7719 }
7720
7721 let state = address_discovery_state.unwrap();
7723 assert_eq!(state.rate_limiter.rate, 15.0); }
7725
7726 #[test]
7727 fn test_address_discovery_path_initialization() {
7728 let now = Instant::now();
7730 let config = AddressDiscoveryConfig {
7731 enabled: true,
7732 max_observation_rate: 10,
7733 observe_all_paths: false,
7734 };
7735 let mut state = AddressDiscoveryState::new(&config, now);
7736
7737 assert!(state.path_addresses.is_empty());
7739
7740 let should_send = state.should_send_observation(0, now);
7742 assert!(should_send); }
7747
7748 #[test]
7749 fn test_address_discovery_multiple_path_initialization() {
7750 let now = Instant::now();
7752 let config = AddressDiscoveryConfig {
7753 enabled: true,
7754 max_observation_rate: 10,
7755 observe_all_paths: true, };
7757 let mut state = AddressDiscoveryState::new(&config, now);
7758
7759 assert!(state.should_send_observation(0, now)); assert!(state.should_send_observation(1, now)); assert!(state.should_send_observation(2, now)); let config_primary_only = AddressDiscoveryConfig {
7766 enabled: true,
7767 max_observation_rate: 10,
7768 observe_all_paths: false,
7769 };
7770 let mut state_primary = AddressDiscoveryState::new(&config_primary_only, now);
7771
7772 assert!(state_primary.should_send_observation(0, now)); assert!(!state_primary.should_send_observation(1, now)); }
7775
7776 #[test]
7777 fn test_handle_observed_address_frame_valid() {
7778 let now = Instant::now();
7780 let config = AddressDiscoveryConfig {
7781 enabled: true,
7782 max_observation_rate: 10,
7783 observe_all_paths: true,
7784 };
7785 let mut state = AddressDiscoveryState::new(&config, now);
7786
7787 let observed_addr = SocketAddr::from(([192, 168, 1, 100], 5000));
7789 state.handle_observed_address(observed_addr, 0, now);
7790
7791 assert_eq!(state.observed_addresses.len(), 1);
7793 assert_eq!(state.observed_addresses[0].address, observed_addr);
7794 assert_eq!(state.observed_addresses[0].path_id, 0);
7795 assert_eq!(state.observed_addresses[0].received_at, now);
7796
7797 let path_info = state.path_addresses.get(&0).unwrap();
7799 assert_eq!(path_info.observed_address, Some(observed_addr));
7800 assert_eq!(path_info.last_observed, Some(now));
7801 assert_eq!(path_info.observation_count, 1);
7802 }
7803
7804 #[test]
7805 fn test_handle_multiple_observed_addresses() {
7806 let now = Instant::now();
7808 let config = AddressDiscoveryConfig {
7809 enabled: true,
7810 max_observation_rate: 10,
7811 observe_all_paths: true,
7812 };
7813 let mut state = AddressDiscoveryState::new(&config, now);
7814
7815 let addr1 = SocketAddr::from(([192, 168, 1, 100], 5000));
7817 let addr2 = SocketAddr::from(([10, 0, 0, 50], 6000));
7818 let addr3 = SocketAddr::from(([192, 168, 1, 100], 7000)); state.handle_observed_address(addr1, 0, now);
7821 state.handle_observed_address(addr2, 1, now);
7822 state.handle_observed_address(addr3, 0, now + Duration::from_millis(100));
7823
7824 assert_eq!(state.observed_addresses.len(), 3);
7826
7827 let path0_info = state.path_addresses.get(&0).unwrap();
7829 assert_eq!(path0_info.observed_address, Some(addr3));
7830 assert_eq!(path0_info.observation_count, 1); let path1_info = state.path_addresses.get(&1).unwrap();
7834 assert_eq!(path1_info.observed_address, Some(addr2));
7835 assert_eq!(path1_info.observation_count, 1);
7836 }
7837
7838 #[test]
7839 fn test_get_observed_address() {
7840 let now = Instant::now();
7842 let config = AddressDiscoveryConfig {
7843 enabled: true,
7844 max_observation_rate: 10,
7845 observe_all_paths: true,
7846 };
7847 let mut state = AddressDiscoveryState::new(&config, now);
7848
7849 assert_eq!(state.get_observed_address(0), None);
7851
7852 let addr = SocketAddr::from(([192, 168, 1, 100], 5000));
7854 state.handle_observed_address(addr, 0, now);
7855
7856 assert_eq!(state.get_observed_address(0), Some(addr));
7858
7859 assert_eq!(state.get_observed_address(999), None);
7861 }
7862
7863 #[test]
7864 fn test_has_unnotified_changes() {
7865 let now = Instant::now();
7867 let config = AddressDiscoveryConfig {
7868 enabled: true,
7869 max_observation_rate: 10,
7870 observe_all_paths: true,
7871 };
7872 let mut state = AddressDiscoveryState::new(&config, now);
7873
7874 assert!(!state.has_unnotified_changes());
7876
7877 let addr = SocketAddr::from(([192, 168, 1, 100], 5000));
7879 state.handle_observed_address(addr, 0, now);
7880 assert!(state.has_unnotified_changes());
7881
7882 if let Some(path_info) = state.path_addresses.get_mut(&0) {
7884 path_info.notified = true;
7885 }
7886 assert!(!state.has_unnotified_changes());
7887
7888 let addr2 = SocketAddr::from(([192, 168, 1, 100], 6000));
7890 state.handle_observed_address(addr2, 0, now + Duration::from_secs(1));
7891 assert!(state.has_unnotified_changes());
7892 }
7893
7894 #[test]
7895 fn test_address_discovery_disabled() {
7896 let now = Instant::now();
7898 let config = AddressDiscoveryConfig {
7899 enabled: false, max_observation_rate: 10,
7901 observe_all_paths: true,
7902 };
7903 let mut state = AddressDiscoveryState::new(&config, now);
7904
7905 let addr = SocketAddr::from(([192, 168, 1, 100], 5000));
7907 state.handle_observed_address(addr, 0, now);
7908
7909 assert_eq!(state.observed_addresses.len(), 0);
7911 assert_eq!(state.path_addresses.len(), 0);
7912
7913 assert!(!state.should_send_observation(0, now));
7915 }
7916
7917 #[test]
7918 fn test_rate_limiting_basic() {
7919 let now = Instant::now();
7921 let config = AddressDiscoveryConfig {
7922 enabled: true,
7923 max_observation_rate: 2, observe_all_paths: true,
7925 };
7926 let mut state = AddressDiscoveryState::new(&config, now);
7927
7928 assert!(state.should_send_observation(0, now));
7930 state.record_observation_sent(0);
7932
7933 assert!(state.should_send_observation(1, now));
7935 state.record_observation_sent(1);
7936
7937 assert!(!state.should_send_observation(2, now));
7939
7940 let later = now + Duration::from_millis(500);
7942 assert!(state.should_send_observation(3, later));
7943 state.record_observation_sent(3);
7944
7945 assert!(!state.should_send_observation(4, later));
7947
7948 let _one_sec_later = now + Duration::from_secs(1);
7952 let two_sec_later = now + Duration::from_secs(2);
7956 assert!(state.should_send_observation(5, two_sec_later));
7957 state.record_observation_sent(5);
7958
7959 assert!(state.should_send_observation(6, two_sec_later));
7970 state.record_observation_sent(6);
7971
7972 assert!(!state.should_send_observation(7, two_sec_later), "Expected no tokens available");
7974 }
7975
7976 #[test]
7977 fn test_rate_limiting_per_path() {
7978 let now = Instant::now();
7980 let config = AddressDiscoveryConfig {
7981 enabled: true,
7982 max_observation_rate: 1, observe_all_paths: true,
7984 };
7985 let mut state = AddressDiscoveryState::new(&config, now);
7986
7987 assert!(state.should_send_observation(0, now));
7989 state.record_observation_sent(0);
7990
7991 assert!(!state.should_send_observation(0, now));
7993
7994 assert!(!state.should_send_observation(1, now));
7996
7997 let later = now + Duration::from_secs(1);
7999 assert!(state.should_send_observation(1, later));
8000 state.record_observation_sent(1);
8001
8002 assert!(!state.should_send_observation(2, later));
8004 }
8005
8006 #[test]
8007 fn test_rate_limiting_zero_rate() {
8008 let now = Instant::now();
8010 let config = AddressDiscoveryConfig {
8011 enabled: true,
8012 max_observation_rate: 0, observe_all_paths: true,
8014 };
8015 let mut state = AddressDiscoveryState::new(&config, now);
8016
8017 assert!(!state.should_send_observation(0, now));
8019 assert!(!state.should_send_observation(0, now + Duration::from_secs(10)));
8020 assert!(!state.should_send_observation(0, now + Duration::from_secs(100)));
8021 }
8022
8023 #[test]
8024 fn test_rate_limiting_update() {
8025 let now = Instant::now();
8027 let config = AddressDiscoveryConfig {
8028 enabled: true,
8029 max_observation_rate: 1, observe_all_paths: true,
8031 };
8032 let mut state = AddressDiscoveryState::new(&config, now);
8033
8034 assert!(state.should_send_observation(0, now));
8036 state.record_observation_sent(0);
8037 assert!(!state.should_send_observation(1, now));
8039
8040 state.update_rate_limit(10.0);
8042
8043 let later = now + Duration::from_millis(100);
8046 assert!(state.should_send_observation(1, later));
8047 state.record_observation_sent(1);
8048
8049 let later2 = now + Duration::from_millis(200);
8051 assert!(state.should_send_observation(2, later2));
8052 }
8053
8054 #[test]
8055 fn test_rate_limiting_burst() {
8056 let now = Instant::now();
8058 let config = AddressDiscoveryConfig {
8059 enabled: true,
8060 max_observation_rate: 10, observe_all_paths: true,
8062 };
8063 let mut state = AddressDiscoveryState::new(&config, now);
8064
8065 for _ in 0..10 {
8067 assert!(state.should_send_observation(0, now));
8068 state.record_observation_sent(0);
8069 }
8070
8071 assert!(!state.should_send_observation(0, now));
8073
8074 let later = now + Duration::from_millis(100);
8076 assert!(state.should_send_observation(0, later));
8077 state.record_observation_sent(0);
8078 assert!(!state.should_send_observation(0, later));
8079 }
8080
8081 #[test]
8082 fn test_connection_rate_limiting_with_check_observations() {
8083 let now = Instant::now();
8085 let config = AddressDiscoveryConfig {
8086 enabled: true,
8087 max_observation_rate: 2, observe_all_paths: false,
8089 };
8090 let mut state = AddressDiscoveryState::new(&config, now);
8091
8092 let mut path_info = paths::PathAddressInfo::new();
8094 path_info.update_observed_address(
8095 SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)), 8080),
8096 now
8097 );
8098 state.path_addresses.insert(0, path_info);
8099
8100 let frame1 = state.queue_observed_address_frame(0, SocketAddr::from(([192, 168, 1, 1], 8080)));
8102 assert!(frame1.is_some());
8103 state.record_observation_sent(0);
8104
8105 if let Some(info) = state.path_addresses.get_mut(&0) {
8107 info.notified = false;
8108 }
8109
8110 let frame2 = state.queue_observed_address_frame(0, SocketAddr::from(([192, 168, 1, 1], 8080)));
8112 assert!(frame2.is_some());
8113 state.record_observation_sent(0);
8114
8115 if let Some(info) = state.path_addresses.get_mut(&0) {
8117 info.notified = false;
8118 }
8119
8120 let frame3 = state.queue_observed_address_frame(0, SocketAddr::from(([192, 168, 1, 1], 8080)));
8122 assert!(frame3.is_none()); let later = now + Duration::from_millis(500);
8126 state.rate_limiter.update_tokens(later); if let Some(info) = state.path_addresses.get_mut(&0) {
8130 info.notified = false;
8131 }
8132
8133 let frame4 = state.queue_observed_address_frame(0, SocketAddr::from(([192, 168, 1, 1], 8080)));
8134 assert!(frame4.is_some()); }
8136
8137 #[test]
8138 fn test_queue_observed_address_frame() {
8139 let now = Instant::now();
8141 let config = AddressDiscoveryConfig {
8142 enabled: true,
8143 max_observation_rate: 5,
8144 observe_all_paths: true,
8145 };
8146 let mut state = AddressDiscoveryState::new(&config, now);
8147
8148 let addr = SocketAddr::from(([192, 168, 1, 100], 5000));
8149
8150 let frame = state.queue_observed_address_frame(0, addr);
8152 assert!(frame.is_some());
8153 assert_eq!(frame.unwrap().address, addr);
8154
8155 state.record_observation_sent(0);
8157
8158 for i in 0..4 {
8160 if let Some(info) = state.path_addresses.get_mut(&0) {
8162 info.notified = false;
8163 }
8164
8165 let frame = state.queue_observed_address_frame(0, addr);
8166 assert!(frame.is_some(), "Frame {} should be allowed", i + 2);
8167 state.record_observation_sent(0);
8168 }
8169
8170 if let Some(info) = state.path_addresses.get_mut(&0) {
8172 info.notified = false;
8173 }
8174
8175 let frame = state.queue_observed_address_frame(0, addr);
8177 assert!(frame.is_none(), "6th frame should be rate limited");
8178 }
8179
8180 #[test]
8181 fn test_multi_path_basic() {
8182 let now = Instant::now();
8184 let config = AddressDiscoveryConfig {
8185 enabled: true,
8186 max_observation_rate: 10,
8187 observe_all_paths: true,
8188 };
8189 let mut state = AddressDiscoveryState::new(&config, now);
8190
8191 let addr1 = SocketAddr::from(([192, 168, 1, 1], 5000));
8192 let addr2 = SocketAddr::from(([10, 0, 0, 1], 6000));
8193 let addr3 = SocketAddr::from(([172, 16, 0, 1], 7000));
8194
8195 state.handle_observed_address(addr1, 0, now);
8197 state.handle_observed_address(addr2, 1, now);
8198 state.handle_observed_address(addr3, 2, now);
8199
8200 assert_eq!(state.get_observed_address(0), Some(addr1));
8202 assert_eq!(state.get_observed_address(1), Some(addr2));
8203 assert_eq!(state.get_observed_address(2), Some(addr3));
8204
8205 assert!(state.has_unnotified_changes());
8207
8208 assert_eq!(state.observed_addresses.len(), 3);
8210 }
8211
8212 #[test]
8213 fn test_multi_path_observe_primary_only() {
8214 let now = Instant::now();
8216 let config = AddressDiscoveryConfig {
8217 enabled: true,
8218 max_observation_rate: 10,
8219 observe_all_paths: false, };
8221 let mut state = AddressDiscoveryState::new(&config, now);
8222
8223 assert!(state.should_send_observation(0, now));
8225 state.record_observation_sent(0);
8226
8227 assert!(!state.should_send_observation(1, now));
8229 assert!(!state.should_send_observation(2, now));
8230
8231 let addr = SocketAddr::from(([192, 168, 1, 1], 5000));
8233 assert!(state.queue_observed_address_frame(0, addr).is_some());
8234 assert!(state.queue_observed_address_frame(1, addr).is_none());
8235 assert!(state.queue_observed_address_frame(2, addr).is_none());
8236 }
8237
8238 #[test]
8239 fn test_multi_path_rate_limiting() {
8240 let now = Instant::now();
8242 let config = AddressDiscoveryConfig {
8243 enabled: true,
8244 max_observation_rate: 3, observe_all_paths: true,
8246 };
8247 let mut state = AddressDiscoveryState::new(&config, now);
8248
8249 assert!(state.should_send_observation(0, now));
8251 state.record_observation_sent(0);
8252
8253 assert!(state.should_send_observation(1, now));
8254 state.record_observation_sent(1);
8255
8256 assert!(state.should_send_observation(2, now));
8257 state.record_observation_sent(2);
8258
8259 assert!(!state.should_send_observation(3, now));
8261 assert!(!state.should_send_observation(0, now)); let later = now + Duration::from_secs(1);
8265 assert!(state.should_send_observation(4, later));
8266 assert!(state.should_send_observation(5, later));
8267 assert!(state.should_send_observation(6, later));
8268 }
8269
8270 #[test]
8271 fn test_multi_path_address_changes() {
8272 let now = Instant::now();
8274 let config = AddressDiscoveryConfig {
8275 enabled: true,
8276 max_observation_rate: 10,
8277 observe_all_paths: true,
8278 };
8279 let mut state = AddressDiscoveryState::new(&config, now);
8280
8281 let addr1a = SocketAddr::from(([192, 168, 1, 1], 5000));
8282 let addr1b = SocketAddr::from(([192, 168, 1, 2], 5000));
8283 let addr2a = SocketAddr::from(([10, 0, 0, 1], 6000));
8284 let addr2b = SocketAddr::from(([10, 0, 0, 2], 6000));
8285
8286 state.handle_observed_address(addr1a, 0, now);
8288 state.handle_observed_address(addr2a, 1, now);
8289
8290 state.record_observation_sent(0);
8292 state.record_observation_sent(1);
8293 assert!(!state.has_unnotified_changes());
8294
8295 state.handle_observed_address(addr1b, 0, now + Duration::from_secs(1));
8297 assert!(state.has_unnotified_changes());
8298
8299 assert_eq!(state.get_observed_address(0), Some(addr1b));
8301 assert_eq!(state.get_observed_address(1), Some(addr2a));
8302
8303 state.record_observation_sent(0);
8305 assert!(!state.has_unnotified_changes());
8306
8307 state.handle_observed_address(addr2b, 1, now + Duration::from_secs(2));
8309 assert!(state.has_unnotified_changes());
8310 }
8311
8312 #[test]
8313 fn test_multi_path_migration() {
8314 let now = Instant::now();
8316 let config = AddressDiscoveryConfig {
8317 enabled: true,
8318 max_observation_rate: 10,
8319 observe_all_paths: true,
8320 };
8321 let mut state = AddressDiscoveryState::new(&config, now);
8322
8323 let addr_old = SocketAddr::from(([192, 168, 1, 1], 5000));
8324 let addr_new = SocketAddr::from(([10, 0, 0, 1], 6000));
8325
8326 state.handle_observed_address(addr_old, 0, now);
8328 assert_eq!(state.get_observed_address(0), Some(addr_old));
8329
8330 state.handle_observed_address(addr_new, 1, now + Duration::from_secs(1));
8332
8333 assert_eq!(state.get_observed_address(0), Some(addr_old));
8335 assert_eq!(state.get_observed_address(1), Some(addr_new));
8336
8337 assert_eq!(state.path_addresses.len(), 2);
8340 }
8341
8342 #[test]
8343 fn test_check_for_address_observations_multi_path() {
8344 let now = Instant::now();
8346 let config = AddressDiscoveryConfig {
8347 enabled: true,
8348 max_observation_rate: 10,
8349 observe_all_paths: true,
8350 };
8351 let mut state = AddressDiscoveryState::new(&config, now);
8352
8353 let addr1 = SocketAddr::from(([192, 168, 1, 1], 5000));
8355 let addr2 = SocketAddr::from(([10, 0, 0, 1], 6000));
8356 let addr3 = SocketAddr::from(([172, 16, 0, 1], 7000));
8357
8358 state.handle_observed_address(addr1, 0, now);
8359 state.handle_observed_address(addr2, 1, now);
8360 state.handle_observed_address(addr3, 2, now);
8361
8362 let frames = state.check_for_address_observations(0, true, now);
8364
8365 assert_eq!(frames.len(), 3);
8367
8368 assert_eq!(frames[0].address, addr1);
8371 assert_eq!(frames[1].address, addr2);
8372 assert_eq!(frames[2].address, addr3);
8373
8374 let frame_addrs: Vec<_> = frames.iter().map(|f| f.address).collect();
8376 assert!(frame_addrs.contains(&addr1));
8377 assert!(frame_addrs.contains(&addr2));
8378 assert!(frame_addrs.contains(&addr3));
8379
8380 assert!(!state.has_unnotified_changes());
8382 }
8383
8384 #[test]
8385 fn test_multi_path_with_peer_not_supporting() {
8386 let now = Instant::now();
8388 let config = AddressDiscoveryConfig {
8389 enabled: true,
8390 max_observation_rate: 10,
8391 observe_all_paths: true,
8392 };
8393 let mut state = AddressDiscoveryState::new(&config, now);
8394
8395 state.handle_observed_address(SocketAddr::from(([192, 168, 1, 1], 5000)), 0, now);
8397 state.handle_observed_address(SocketAddr::from(([10, 0, 0, 1], 6000)), 1, now);
8398
8399 let frames = state.check_for_address_observations(0, false, now);
8401 assert_eq!(frames.len(), 0);
8402
8403 assert!(state.has_unnotified_changes());
8405 }
8406
8407 #[test]
8409 fn test_bootstrap_node_aggressive_observation_mode() {
8410 let config = AddressDiscoveryConfig {
8412 enabled: true,
8413 max_observation_rate: 10,
8414 observe_all_paths: false,
8415 };
8416 let now = Instant::now();
8417 let mut state = AddressDiscoveryState::new(&config, now);
8418
8419 assert!(!state.is_bootstrap_mode());
8421
8422 state.set_bootstrap_mode(true);
8424 assert!(state.is_bootstrap_mode());
8425
8426 assert!(state.should_observe_path(0)); assert!(state.should_observe_path(1)); assert!(state.should_observe_path(2));
8430
8431 let bootstrap_rate = state.get_effective_rate_limit();
8433 assert!(bootstrap_rate > 10.0); }
8435
8436 #[test]
8437 fn test_bootstrap_node_immediate_observation() {
8438 let config = AddressDiscoveryConfig {
8440 enabled: true,
8441 max_observation_rate: 10,
8442 observe_all_paths: false,
8443 };
8444 let now = Instant::now();
8445 let mut state = AddressDiscoveryState::new(&config, now);
8446 state.set_bootstrap_mode(true);
8447
8448 let addr = SocketAddr::from(([192, 168, 1, 100], 5000));
8450 state.handle_observed_address(addr, 0, now);
8451
8452 assert!(state.should_send_observation_immediately(true));
8454
8455 assert!(state.should_send_observation(0, now));
8457
8458 let frame = state.queue_observed_address_frame(0, addr);
8460 assert!(frame.is_some());
8461 }
8462
8463 #[test]
8464 fn test_bootstrap_node_multiple_path_observations() {
8465 let config = AddressDiscoveryConfig {
8467 enabled: true,
8468 max_observation_rate: 5, observe_all_paths: false, };
8471 let now = Instant::now();
8472 let mut state = AddressDiscoveryState::new(&config, now);
8473 state.set_bootstrap_mode(true);
8474
8475 let addrs = vec![
8477 (0, SocketAddr::from(([192, 168, 1, 1], 5000))),
8478 (1, SocketAddr::from(([10, 0, 0, 1], 6000))),
8479 (2, SocketAddr::from(([172, 16, 0, 1], 7000))),
8480 ];
8481
8482 for (path_id, addr) in &addrs {
8483 state.handle_observed_address(*addr, *path_id, now);
8484 }
8485
8486 let frames = state.check_for_address_observations(0, true, now);
8488 assert_eq!(frames.len(), 3);
8489
8490 for (_, addr) in &addrs {
8492 assert!(frames.iter().any(|f| f.address == *addr));
8493 }
8494 }
8495
8496 #[test]
8497 fn test_bootstrap_node_rate_limit_override() {
8498 let config = AddressDiscoveryConfig {
8500 enabled: true,
8501 max_observation_rate: 1, observe_all_paths: false,
8503 };
8504 let now = Instant::now();
8505 let mut state = AddressDiscoveryState::new(&config, now);
8506 state.set_bootstrap_mode(true);
8507
8508 let addr = SocketAddr::from(([192, 168, 1, 1], 5000));
8510
8511 for i in 0..10 {
8513 state.handle_observed_address(addr, i, now);
8514 let can_send = state.should_send_observation(i, now);
8515 assert!(can_send, "Bootstrap node should send observation {}", i);
8516 state.record_observation_sent(i);
8517 }
8518 }
8519
8520 #[test]
8521 fn test_bootstrap_node_configuration() {
8522 let mut config = AddressDiscoveryConfig {
8524 enabled: true,
8525 max_observation_rate: 10,
8526 observe_all_paths: false,
8527 };
8528
8529 config.apply_bootstrap_settings();
8531
8532 assert_eq!(config.max_observation_rate, 63); assert!(config.observe_all_paths);
8535 assert!(config.enabled);
8536 }
8537
8538 #[test]
8539 fn test_bootstrap_role_detection() {
8540 use crate::transport_parameters::{NatTraversalRole, NatTraversalConfig};
8541 use crate::VarInt;
8542
8543 let nat_config = NatTraversalConfig::new(
8545 NatTraversalRole::Bootstrap,
8546 VarInt::from_u32(8),
8547 VarInt::from_u32(10000),
8548 VarInt::from_u32(3),
8549 None
8550 );
8551
8552 let addr_discovery_config = AddressDiscoveryConfig {
8555 enabled: true,
8556 max_observation_rate: 10,
8557 observe_all_paths: false,
8558 };
8559
8560 let mut state = AddressDiscoveryState::new(&addr_discovery_config, Instant::now());
8561
8562 if matches!(nat_config.role(), NatTraversalRole::Bootstrap) {
8564 state.set_bootstrap_mode(true);
8565 }
8566
8567 assert!(state.is_bootstrap_mode());
8568 }
8569
8570 #[test]
8571 fn test_bootstrap_node_persistent_observation() {
8572 let config = AddressDiscoveryConfig {
8574 enabled: true,
8575 max_observation_rate: 10,
8576 observe_all_paths: false,
8577 };
8578 let mut now = Instant::now();
8579 let mut state = AddressDiscoveryState::new(&config, now);
8580 state.set_bootstrap_mode(true);
8581
8582 let addr1 = SocketAddr::from(([192, 168, 1, 1], 5000));
8583 let addr2 = SocketAddr::from(([192, 168, 1, 2], 5000));
8584
8585 state.handle_observed_address(addr1, 0, now);
8587 assert!(state.should_send_observation(0, now));
8588 state.record_observation_sent(0);
8589
8590 now += Duration::from_secs(60);
8592 state.handle_observed_address(addr2, 0, now);
8593
8594 assert!(state.should_send_observation(0, now));
8596 }
8597
8598 #[test]
8599 fn test_bootstrap_node_multi_peer_support() {
8600 let config = AddressDiscoveryConfig {
8603 enabled: true,
8604 max_observation_rate: 50, observe_all_paths: true,
8606 };
8607 let now = Instant::now();
8608 let mut state = AddressDiscoveryState::new(&config, now);
8609 state.set_bootstrap_mode(true);
8610
8611 let peer_addresses = vec![
8613 (0, SocketAddr::from(([192, 168, 1, 1], 5000))), (1, SocketAddr::from(([10, 0, 0, 1], 6000))), (2, SocketAddr::from(([172, 16, 0, 1], 7000))), (3, SocketAddr::from(([192, 168, 2, 1], 8000))), ];
8618
8619 for (path_id, addr) in &peer_addresses {
8621 state.handle_observed_address(*addr, *path_id, now);
8622 }
8623
8624 let frames = state.check_for_address_observations(0, true, now);
8626 assert_eq!(frames.len(), peer_addresses.len());
8627
8628 for (_, addr) in &peer_addresses {
8630 assert!(frames.iter().any(|f| f.address == *addr));
8631 }
8632 }
8633
8634 mod address_discovery_tests {
8636 include!("address_discovery_tests.rs");
8637 }
8638}