1#![allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
9use std::{
10 cmp,
11 collections::VecDeque,
12 convert::TryFrom,
13 fmt, io, mem,
14 net::{IpAddr, SocketAddr},
15 sync::Arc,
16};
17
18use bytes::{Bytes, BytesMut};
19use frame::StreamMetaVec;
20use rand::{Rng, SeedableRng, rngs::StdRng};
23use thiserror::Error;
24use tracing::{debug, error, info, trace, trace_span, warn};
25
26use crate::{
27 Dir, Duration, EndpointConfig, Frame, INITIAL_MTU, Instant, MAX_CID_SIZE, MAX_STREAM_COUNT,
28 MIN_INITIAL_SIZE, MtuDiscoveryConfig, Side, StreamId, TIMER_GRANULARITY, TokenStore, Transmit,
29 TransportError, TransportErrorCode, VarInt,
30 cid_generator::ConnectionIdGenerator,
31 cid_queue::CidQueue,
32 coding::BufMutExt,
33 config::{ServerConfig, TransportConfig},
34 crypto::{self, KeyPair, Keys, PacketKey},
35 endpoint::AddressDiscoveryStats,
36 frame::{self, Close, Datagram, FrameStruct, NewToken},
37 nat_traversal_api::PeerId,
38 packet::{
39 FixedLengthConnectionIdParser, Header, InitialHeader, InitialPacket, LongType, Packet,
40 PacketNumber, PartialDecode, SpaceId,
41 },
42 range_set::ArrayRangeSet,
43 shared::{
44 ConnectionEvent, ConnectionEventInner, ConnectionId, DatagramConnectionEvent, EcnCodepoint,
45 EndpointEvent, EndpointEventInner,
46 },
47 token::{ResetToken, Token, TokenPayload},
48 transport_parameters::TransportParameters,
49};
50
51mod ack_frequency;
52use ack_frequency::AckFrequencyState;
53
54pub(crate) mod nat_traversal;
55use nat_traversal::NatTraversalState;
56pub(crate) use nat_traversal::{CoordinationPhase, NatTraversalError};
58
59mod assembler;
60pub use assembler::Chunk;
61
62mod cid_state;
63use cid_state::CidState;
64
65mod datagrams;
66use datagrams::DatagramState;
67pub use datagrams::{Datagrams, SendDatagramError};
68
69mod mtud;
70use mtud::MtuDiscovery;
71
72mod pacing;
73
74mod packet_builder;
75use packet_builder::PacketBuilder;
76
77mod packet_crypto;
78use packet_crypto::{PrevCrypto, ZeroRttCrypto};
79
80mod paths;
81pub use paths::RttEstimator;
82use paths::{NatTraversalChallenges, PathData, PathResponses};
83
84mod send_buffer;
85
86mod spaces;
87#[cfg(fuzzing)]
88pub use spaces::Retransmits;
89#[cfg(not(fuzzing))]
90use spaces::Retransmits;
91use spaces::{PacketNumberFilter, PacketSpace, SendableFrames, SentPacket, ThinRetransmits};
92
93mod stats;
94pub use stats::{ConnectionStats, FrameStats, PathStats, UdpStats};
95
96mod streams;
97#[cfg(fuzzing)]
98pub use streams::StreamsState;
99#[cfg(not(fuzzing))]
100use streams::StreamsState;
101pub use streams::{
102 Chunks, ClosedStream, FinishError, ReadError, ReadableError, RecvStream, SendStream,
103 ShouldTransmit, StreamEvent, Streams, WriteError, Written,
104};
105
106mod timer;
107use crate::congestion::Controller;
108use timer::{Timer, TimerTable};
109
110pub struct Connection {
150 endpoint_config: Arc<EndpointConfig>,
151 config: Arc<TransportConfig>,
152 rng: StdRng,
153 crypto: Box<dyn crypto::Session>,
154 handshake_cid: ConnectionId,
156 rem_handshake_cid: ConnectionId,
158 local_ip: Option<IpAddr>,
161 path: PathData,
162 allow_mtud: bool,
164 prev_path: Option<(ConnectionId, PathData)>,
165 state: State,
166 side: ConnectionSide,
167 zero_rtt_enabled: bool,
169 zero_rtt_crypto: Option<ZeroRttCrypto>,
171 key_phase: bool,
172 key_phase_size: u64,
174 peer_params: TransportParameters,
176 orig_rem_cid: ConnectionId,
178 initial_dst_cid: ConnectionId,
180 retry_src_cid: Option<ConnectionId>,
183 lost_packets: u64,
185 events: VecDeque<Event>,
186 endpoint_events: VecDeque<EndpointEventInner>,
187 spin_enabled: bool,
189 spin: bool,
191 spaces: [PacketSpace; 3],
193 highest_space: SpaceId,
195 prev_crypto: Option<PrevCrypto>,
197 next_crypto: Option<KeyPair<Box<dyn PacketKey>>>,
202 accepted_0rtt: bool,
203 permit_idle_reset: bool,
205 idle_timeout: Option<Duration>,
207 timers: TimerTable,
208 authentication_failures: u64,
210 error: Option<ConnectionError>,
212 packet_number_filter: PacketNumberFilter,
214
215 path_responses: PathResponses,
220 nat_traversal_challenges: NatTraversalChallenges,
222 close: bool,
223
224 ack_frequency: AckFrequencyState,
228
229 pto_count: u32,
234
235 receiving_ecn: bool,
240 total_authed_packets: u64,
242 app_limited: bool,
245
246 streams: StreamsState,
247 rem_cids: CidQueue,
249 local_cid_state: CidState,
251 datagrams: DatagramState,
253 stats: ConnectionStats,
255 version: u32,
257
258 nat_traversal: Option<NatTraversalState>,
260
261 nat_traversal_frame_config: frame::nat_traversal_unified::NatTraversalFrameConfig,
263
264 address_discovery_state: Option<AddressDiscoveryState>,
266
267 pqc_state: PqcState,
269
270 #[cfg(feature = "trace")]
272 trace_context: crate::tracing::TraceContext,
273
274 #[cfg(feature = "trace")]
276 event_log: Arc<crate::tracing::EventLog>,
277
278 #[cfg(feature = "__qlog")]
280 qlog_streamer: Option<Box<dyn std::io::Write + Send + Sync>>,
281
282 peer_id_for_tokens: Option<PeerId>,
284 delay_new_token_until_binding: bool,
287}
288
289impl Connection {
290 pub(crate) fn new(
291 endpoint_config: Arc<EndpointConfig>,
292 config: Arc<TransportConfig>,
293 init_cid: ConnectionId,
294 loc_cid: ConnectionId,
295 rem_cid: ConnectionId,
296 remote: SocketAddr,
297 local_ip: Option<IpAddr>,
298 crypto: Box<dyn crypto::Session>,
299 cid_gen: &dyn ConnectionIdGenerator,
300 now: Instant,
301 version: u32,
302 allow_mtud: bool,
303 rng_seed: [u8; 32],
304 side_args: SideArgs,
305 ) -> Self {
306 let pref_addr_cid = side_args.pref_addr_cid();
307 let path_validated = side_args.path_validated();
308 let connection_side = ConnectionSide::from(side_args);
309 let side = connection_side.side();
310 let initial_space = PacketSpace {
311 crypto: Some(crypto.initial_keys(&init_cid, side)),
312 ..PacketSpace::new(now)
313 };
314 let state = State::Handshake(state::Handshake {
315 rem_cid_set: side.is_server(),
316 expected_token: Bytes::new(),
317 client_hello: None,
318 });
319 let mut rng = StdRng::from_seed(rng_seed);
320 let mut this = Self {
321 endpoint_config,
322 crypto,
323 handshake_cid: loc_cid,
324 rem_handshake_cid: rem_cid,
325 local_cid_state: CidState::new(
326 cid_gen.cid_len(),
327 cid_gen.cid_lifetime(),
328 now,
329 if pref_addr_cid.is_some() { 2 } else { 1 },
330 ),
331 path: PathData::new(remote, allow_mtud, None, now, &config),
332 allow_mtud,
333 local_ip,
334 prev_path: None,
335 state,
336 side: connection_side,
337 zero_rtt_enabled: false,
338 zero_rtt_crypto: None,
339 key_phase: false,
340 key_phase_size: rng.gen_range(10..1000),
347 peer_params: TransportParameters::default(),
348 orig_rem_cid: rem_cid,
349 initial_dst_cid: init_cid,
350 retry_src_cid: None,
351 lost_packets: 0,
352 events: VecDeque::new(),
353 endpoint_events: VecDeque::new(),
354 spin_enabled: config.allow_spin && rng.gen_ratio(7, 8),
355 spin: false,
356 spaces: [initial_space, PacketSpace::new(now), PacketSpace::new(now)],
357 highest_space: SpaceId::Initial,
358 prev_crypto: None,
359 next_crypto: None,
360 accepted_0rtt: false,
361 permit_idle_reset: true,
362 idle_timeout: match config.max_idle_timeout {
363 None | Some(VarInt(0)) => None,
364 Some(dur) => Some(Duration::from_millis(dur.0)),
365 },
366 timers: TimerTable::default(),
367 authentication_failures: 0,
368 error: None,
369 #[cfg(test)]
370 packet_number_filter: match config.deterministic_packet_numbers {
371 false => PacketNumberFilter::new(&mut rng),
372 true => PacketNumberFilter::disabled(),
373 },
374 #[cfg(not(test))]
375 packet_number_filter: PacketNumberFilter::new(&mut rng),
376
377 path_responses: PathResponses::default(),
378 nat_traversal_challenges: NatTraversalChallenges::default(),
379 close: false,
380
381 ack_frequency: AckFrequencyState::new(get_max_ack_delay(
382 &TransportParameters::default(),
383 )),
384
385 pto_count: 0,
386
387 app_limited: false,
388 receiving_ecn: false,
389 total_authed_packets: 0,
390
391 streams: StreamsState::new(
392 side,
393 config.max_concurrent_uni_streams,
394 config.max_concurrent_bidi_streams,
395 config.send_window,
396 config.receive_window,
397 config.stream_receive_window,
398 ),
399 datagrams: DatagramState::default(),
400 config,
401 rem_cids: CidQueue::new(rem_cid),
402 rng,
403 stats: ConnectionStats::default(),
404 version,
405 nat_traversal: None, nat_traversal_frame_config:
407 frame::nat_traversal_unified::NatTraversalFrameConfig::default(),
408 address_discovery_state: {
409 Some(AddressDiscoveryState::new(
412 &crate::transport_parameters::AddressDiscoveryConfig::default(),
413 now,
414 ))
415 },
416 pqc_state: PqcState::new(),
417
418 #[cfg(feature = "trace")]
419 trace_context: crate::tracing::TraceContext::new(crate::tracing::TraceId::new()),
420
421 #[cfg(feature = "trace")]
422 event_log: crate::tracing::global_log(),
423
424 #[cfg(feature = "__qlog")]
425 qlog_streamer: None,
426
427 peer_id_for_tokens: None,
428 delay_new_token_until_binding: false,
429 };
430
431 #[cfg(feature = "trace")]
433 {
434 use crate::trace_event;
435 use crate::tracing::{Event, EventData, socket_addr_to_bytes, timestamp_now};
436 let _peer_id = {
438 let mut id = [0u8; 32];
439 let addr_bytes = match remote {
440 SocketAddr::V4(addr) => addr.ip().octets().to_vec(),
441 SocketAddr::V6(addr) => addr.ip().octets().to_vec(),
442 };
443 id[..addr_bytes.len().min(32)]
444 .copy_from_slice(&addr_bytes[..addr_bytes.len().min(32)]);
445 id
446 };
447
448 let (addr_bytes, addr_type) = socket_addr_to_bytes(remote);
449 trace_event!(
450 &this.event_log,
451 Event {
452 timestamp: timestamp_now(),
453 trace_id: this.trace_context.trace_id(),
454 sequence: 0,
455 _padding: 0,
456 node_id: [0u8; 32], event_data: EventData::ConnInit {
458 endpoint_bytes: addr_bytes,
459 addr_type,
460 _padding: [0u8; 45],
461 },
462 }
463 );
464 }
465
466 if path_validated {
467 this.on_path_validated();
468 }
469 if side.is_client() {
470 this.write_crypto();
472 this.init_0rtt();
473 }
474 this
475 }
476
477 #[cfg(feature = "__qlog")]
479 pub fn set_qlog(
480 &mut self,
481 writer: Box<dyn std::io::Write + Send + Sync>,
482 _title: Option<String>,
483 _description: Option<String>,
484 _now: Instant,
485 ) {
486 self.qlog_streamer = Some(writer);
487 }
488
489 #[cfg(feature = "__qlog")]
491 fn emit_qlog_recovery_metrics(&mut self, _now: Instant) {
492 }
495
496 #[must_use]
504 pub fn poll_timeout(&mut self) -> Option<Instant> {
505 let mut next_timeout = self.timers.next_timeout();
506
507 if let Some(nat_state) = &self.nat_traversal {
509 if let Some(nat_timeout) = nat_state.get_next_timeout(Instant::now()) {
510 self.timers.set(Timer::NatTraversal, nat_timeout);
512 next_timeout = Some(next_timeout.map_or(nat_timeout, |t| t.min(nat_timeout)));
513 }
514 }
515
516 next_timeout
517 }
518
519 #[must_use]
525 pub fn poll(&mut self) -> Option<Event> {
526 if let Some(x) = self.events.pop_front() {
527 return Some(x);
528 }
529
530 if let Some(event) = self.streams.poll() {
531 return Some(Event::Stream(event));
532 }
533
534 if let Some(err) = self.error.take() {
535 return Some(Event::ConnectionLost { reason: err });
536 }
537
538 None
539 }
540
541 #[must_use]
543 pub fn poll_endpoint_events(&mut self) -> Option<EndpointEvent> {
544 self.endpoint_events.pop_front().map(EndpointEvent)
545 }
546
547 #[must_use]
549 pub fn streams(&mut self) -> Streams<'_> {
550 Streams {
551 state: &mut self.streams,
552 conn_state: &self.state,
553 }
554 }
555
556 #[must_use]
560 pub fn recv_stream(&mut self, id: StreamId) -> RecvStream<'_> {
561 assert!(id.dir() == Dir::Bi || id.initiator() != self.side.side());
562 RecvStream {
563 id,
564 state: &mut self.streams,
565 pending: &mut self.spaces[SpaceId::Data].pending,
566 }
567 }
568
569 #[must_use]
571 pub fn send_stream(&mut self, id: StreamId) -> SendStream<'_> {
572 assert!(id.dir() == Dir::Bi || id.initiator() == self.side.side());
573 SendStream {
574 id,
575 state: &mut self.streams,
576 pending: &mut self.spaces[SpaceId::Data].pending,
577 conn_state: &self.state,
578 }
579 }
580
581 #[must_use]
591 pub fn poll_transmit(
592 &mut self,
593 now: Instant,
594 max_datagrams: usize,
595 buf: &mut Vec<u8>,
596 ) -> Option<Transmit> {
597 assert!(max_datagrams != 0);
598 let max_datagrams = match self.config.enable_segmentation_offload {
599 false => 1,
600 true => max_datagrams,
601 };
602
603 let mut num_datagrams = 0;
604 let mut datagram_start = 0;
607 let mut segment_size = usize::from(self.path.current_mtu());
608
609 if let Some(nat_traversal) = &mut self.nat_traversal {
611 if nat_traversal.check_coordination_timeout(now) {
612 trace!("NAT traversal coordination timed out, may retry");
613 }
614 }
615
616 if let Some(challenge) = self.send_nat_traversal_challenge(now, buf) {
618 return Some(challenge);
619 }
620
621 if let Some(challenge) = self.send_path_challenge(now, buf) {
622 return Some(challenge);
623 }
624
625 for space in SpaceId::iter() {
627 let request_immediate_ack =
628 space == SpaceId::Data && self.peer_supports_ack_frequency();
629 self.spaces[space].maybe_queue_probe(request_immediate_ack, &self.streams);
630 }
631
632 let close = match self.state {
634 State::Drained => {
635 self.app_limited = true;
636 return None;
637 }
638 State::Draining | State::Closed(_) => {
639 if !self.close {
642 self.app_limited = true;
643 return None;
644 }
645 true
646 }
647 _ => false,
648 };
649
650 if let Some(config) = &self.config.ack_frequency_config {
652 self.spaces[SpaceId::Data].pending.ack_frequency = self
653 .ack_frequency
654 .should_send_ack_frequency(self.path.rtt.get(), config, &self.peer_params)
655 && self.highest_space == SpaceId::Data
656 && self.peer_supports_ack_frequency();
657 }
658
659 let mut buf_capacity = 0;
663
664 let mut coalesce = true;
665 let mut builder_storage: Option<PacketBuilder> = None;
666 let mut sent_frames = None;
667 let mut pad_datagram = false;
668 let mut pad_datagram_to_mtu = false;
669 let mut congestion_blocked = false;
670
671 let mut space_idx = 0;
673 let spaces = [SpaceId::Initial, SpaceId::Handshake, SpaceId::Data];
674 while space_idx < spaces.len() {
677 let space_id = spaces[space_idx];
678 let pn = self.packet_number_filter.peek(&self.spaces[SpaceId::Data]);
685 let frame_space_1rtt =
686 segment_size.saturating_sub(self.predict_1rtt_overhead(Some(pn)));
687
688 let can_send = self.space_can_send(space_id, frame_space_1rtt);
690 if can_send.is_empty() && (!close || self.spaces[space_id].crypto.is_none()) {
691 space_idx += 1;
692 continue;
693 }
694
695 let mut ack_eliciting = !self.spaces[space_id].pending.is_empty(&self.streams)
696 || self.spaces[space_id].ping_pending
697 || self.spaces[space_id].immediate_ack_pending;
698 if space_id == SpaceId::Data {
699 ack_eliciting |= self.can_send_1rtt(frame_space_1rtt);
700 }
701
702 pad_datagram_to_mtu |= space_id == SpaceId::Data && self.config.pad_to_mtu;
703
704 let buf_end = if let Some(builder) = &builder_storage {
708 buf.len().max(builder.min_size) + builder.tag_len
709 } else {
710 buf.len()
711 };
712
713 let tag_len = if let Some(ref crypto) = self.spaces[space_id].crypto {
714 crypto.packet.local.tag_len()
715 } else if space_id == SpaceId::Data {
716 match self.zero_rtt_crypto.as_ref() {
717 Some(crypto) => crypto.packet.tag_len(),
718 None => {
719 error!(
721 "sending packets in the application data space requires known 0-RTT or 1-RTT keys"
722 );
723 return None;
724 }
725 }
726 } else {
727 unreachable!("tried to send {:?} packet without keys", space_id)
728 };
729 if !coalesce || buf_capacity - buf_end < MIN_PACKET_SPACE + tag_len {
730 if num_datagrams >= max_datagrams {
734 break;
736 }
737
738 if self
745 .path
746 .anti_amplification_blocked(segment_size as u64 * (num_datagrams as u64) + 1)
747 {
748 trace!("blocked by anti-amplification");
749 break;
750 }
751
752 if ack_eliciting && self.spaces[space_id].loss_probes == 0 {
755 let untracked_bytes = if let Some(builder) = &builder_storage {
757 buf_capacity - builder.partial_encode.start
758 } else {
759 0
760 } as u64;
761 debug_assert!(untracked_bytes <= segment_size as u64);
762
763 let bytes_to_send = segment_size as u64 + untracked_bytes;
764 if self.path.in_flight.bytes + bytes_to_send >= self.path.congestion.window() {
765 space_idx += 1;
766 congestion_blocked = true;
767 trace!("blocked by congestion control");
770 continue;
771 }
772
773 let smoothed_rtt = self.path.rtt.get();
775 if let Some(delay) = self.path.pacing.delay(
776 smoothed_rtt,
777 bytes_to_send,
778 self.path.current_mtu(),
779 self.path.congestion.window(),
780 now,
781 ) {
782 self.timers.set(Timer::Pacing, delay);
783 congestion_blocked = true;
784 trace!("blocked by pacing");
787 break;
788 }
789 }
790
791 if let Some(mut builder) = builder_storage.take() {
793 if pad_datagram {
794 let min_size = self.pqc_state.min_initial_size();
795 builder.pad_to(min_size);
796 }
797
798 if num_datagrams > 1 || pad_datagram_to_mtu {
799 const MAX_PADDING: usize = 16;
812 let packet_len_unpadded = cmp::max(builder.min_size, buf.len())
813 - datagram_start
814 + builder.tag_len;
815 if (packet_len_unpadded + MAX_PADDING < segment_size
816 && !pad_datagram_to_mtu)
817 || datagram_start + segment_size > buf_capacity
818 {
819 trace!(
820 "GSO truncated by demand for {} padding bytes or loss probe",
821 segment_size - packet_len_unpadded
822 );
823 builder_storage = Some(builder);
824 break;
825 }
826
827 builder.pad_to(segment_size as u16);
830 }
831
832 builder.finish_and_track(now, self, sent_frames.take(), buf);
833
834 if num_datagrams == 1 {
835 segment_size = buf.len();
842 buf_capacity = buf.len();
845
846 if space_id == SpaceId::Data {
853 let frame_space_1rtt =
854 segment_size.saturating_sub(self.predict_1rtt_overhead(Some(pn)));
855 if self.space_can_send(space_id, frame_space_1rtt).is_empty() {
856 break;
857 }
858 }
859 }
860 }
861
862 let next_datagram_size_limit = match self.spaces[space_id].loss_probes {
864 0 => segment_size,
865 _ => {
866 self.spaces[space_id].loss_probes -= 1;
867 std::cmp::min(segment_size, usize::from(INITIAL_MTU))
871 }
872 };
873 buf_capacity += next_datagram_size_limit;
874 if buf.capacity() < buf_capacity {
875 buf.reserve(max_datagrams * segment_size);
884 }
885 num_datagrams += 1;
886 coalesce = true;
887 pad_datagram = false;
888 datagram_start = buf.len();
889
890 debug_assert_eq!(
891 datagram_start % segment_size,
892 0,
893 "datagrams in a GSO batch must be aligned to the segment size"
894 );
895 } else {
896 if let Some(builder) = builder_storage.take() {
900 builder.finish_and_track(now, self, sent_frames.take(), buf);
901 }
902 }
903
904 debug_assert!(buf_capacity - buf.len() >= MIN_PACKET_SPACE);
905
906 if self.spaces[SpaceId::Initial].crypto.is_some()
911 && space_id == SpaceId::Handshake
912 && self.side.is_client()
913 {
914 self.discard_space(now, SpaceId::Initial);
917 }
918 if let Some(ref mut prev) = self.prev_crypto {
919 prev.update_unacked = false;
920 }
921
922 debug_assert!(
923 builder_storage.is_none() && sent_frames.is_none(),
924 "Previous packet must have been finished"
925 );
926
927 let builder = builder_storage.insert(PacketBuilder::new(
928 now,
929 space_id,
930 self.rem_cids.active(),
931 buf,
932 buf_capacity,
933 datagram_start,
934 ack_eliciting,
935 self,
936 )?);
937 coalesce = coalesce && !builder.short_header;
938
939 let should_adjust_coalescing = self
941 .pqc_state
942 .should_adjust_coalescing(buf.len() - datagram_start, space_id);
943
944 if should_adjust_coalescing {
945 coalesce = false;
946 trace!("Disabling coalescing for PQC handshake in {:?}", space_id);
947 }
948
949 pad_datagram |=
951 space_id == SpaceId::Initial && (self.side.is_client() || ack_eliciting);
952
953 if close {
954 trace!("sending CONNECTION_CLOSE");
955 if !self.spaces[space_id].pending_acks.ranges().is_empty() {
960 Self::populate_acks(
961 now,
962 self.receiving_ecn,
963 &mut SentFrames::default(),
964 &mut self.spaces[space_id],
965 buf,
966 &mut self.stats,
967 );
968 }
969
970 debug_assert!(
974 buf.len() + frame::ConnectionClose::SIZE_BOUND < builder.max_size,
975 "ACKs should leave space for ConnectionClose"
976 );
977 if buf.len() + frame::ConnectionClose::SIZE_BOUND < builder.max_size {
978 let max_frame_size = builder.max_size - buf.len();
979 match self.state {
980 State::Closed(state::Closed { ref reason }) => {
981 if space_id == SpaceId::Data || reason.is_transport_layer() {
982 reason.encode(buf, max_frame_size)
983 } else {
984 frame::ConnectionClose {
985 error_code: TransportErrorCode::APPLICATION_ERROR,
986 frame_type: None,
987 reason: Bytes::new(),
988 }
989 .encode(buf, max_frame_size)
990 }
991 }
992 State::Draining => frame::ConnectionClose {
993 error_code: TransportErrorCode::NO_ERROR,
994 frame_type: None,
995 reason: Bytes::new(),
996 }
997 .encode(buf, max_frame_size),
998 _ => unreachable!(
999 "tried to make a close packet when the connection wasn't closed"
1000 ),
1001 }
1002 }
1003 if space_id == self.highest_space {
1004 self.close = false;
1006 break;
1008 } else {
1009 space_idx += 1;
1013 continue;
1014 }
1015 }
1016
1017 if space_id == SpaceId::Data && num_datagrams == 1 {
1020 if let Some((token, remote)) = self.path_responses.pop_off_path(self.path.remote) {
1021 let mut builder = builder_storage.take().unwrap();
1024 trace!("PATH_RESPONSE {:08x} (off-path)", token);
1025 buf.write(frame::FrameType::PATH_RESPONSE);
1026 buf.write(token);
1027 self.stats.frame_tx.path_response += 1;
1028 let min_size = self.pqc_state.min_initial_size();
1029 builder.pad_to(min_size);
1030 builder.finish_and_track(
1031 now,
1032 self,
1033 Some(SentFrames {
1034 non_retransmits: true,
1035 ..SentFrames::default()
1036 }),
1037 buf,
1038 );
1039 self.stats.udp_tx.on_sent(1, buf.len());
1040
1041 #[cfg(feature = "trace")]
1043 {
1044 use crate::trace_packet_sent;
1045 trace_packet_sent!(
1047 &self.event_log,
1048 self.trace_context.trace_id(),
1049 buf.len() as u32,
1050 0 );
1052 }
1053
1054 return Some(Transmit {
1055 destination: remote,
1056 size: buf.len(),
1057 ecn: None,
1058 segment_size: None,
1059 src_ip: self.local_ip,
1060 });
1061 }
1062 }
1063
1064 if space_id == SpaceId::Data && self.address_discovery_state.is_some() {
1066 let peer_supports = self.peer_params.address_discovery.is_some();
1067
1068 if let Some(state) = &mut self.address_discovery_state {
1069 if peer_supports {
1070 if let Some(frame) = state.queue_observed_address_frame(0, self.path.remote)
1071 {
1072 self.spaces[space_id]
1073 .pending
1074 .outbound_observations
1075 .push(frame);
1076 }
1077 }
1078 }
1079 }
1080
1081 let sent =
1082 self.populate_packet(now, space_id, buf, builder.max_size, builder.exact_number);
1083
1084 debug_assert!(
1091 !(sent.is_ack_only(&self.streams)
1092 && !can_send.acks
1093 && can_send.other
1094 && (buf_capacity - builder.datagram_start) == self.path.current_mtu() as usize
1095 && self.datagrams.outgoing.is_empty()),
1096 "SendableFrames was {can_send:?}, but only ACKs have been written"
1097 );
1098 pad_datagram |= sent.requires_padding;
1099
1100 if sent.largest_acked.is_some() {
1101 self.spaces[space_id].pending_acks.acks_sent();
1102 self.timers.stop(Timer::MaxAckDelay);
1103 }
1104
1105 sent_frames = Some(sent);
1107
1108 }
1111
1112 if let Some(mut builder) = builder_storage {
1114 if pad_datagram {
1115 let min_size = self.pqc_state.min_initial_size();
1116 builder.pad_to(min_size);
1117 }
1118
1119 if pad_datagram_to_mtu && buf_capacity >= datagram_start + segment_size {
1125 builder.pad_to(segment_size as u16);
1126 }
1127
1128 let last_packet_number = builder.exact_number;
1129 builder.finish_and_track(now, self, sent_frames, buf);
1130 self.path
1131 .congestion
1132 .on_sent(now, buf.len() as u64, last_packet_number);
1133
1134 #[cfg(feature = "__qlog")]
1135 self.emit_qlog_recovery_metrics(now);
1136 }
1137
1138 self.app_limited = buf.is_empty() && !congestion_blocked;
1139
1140 if buf.is_empty() && self.state.is_established() {
1142 let space_id = SpaceId::Data;
1143 let probe_size = self
1144 .path
1145 .mtud
1146 .poll_transmit(now, self.packet_number_filter.peek(&self.spaces[space_id]))?;
1147
1148 let buf_capacity = probe_size as usize;
1149 buf.reserve(buf_capacity);
1150
1151 let mut builder = PacketBuilder::new(
1152 now,
1153 space_id,
1154 self.rem_cids.active(),
1155 buf,
1156 buf_capacity,
1157 0,
1158 true,
1159 self,
1160 )?;
1161
1162 buf.write(frame::FrameType::PING);
1164 self.stats.frame_tx.ping += 1;
1165
1166 if self.peer_supports_ack_frequency() {
1168 buf.write(frame::FrameType::IMMEDIATE_ACK);
1169 self.stats.frame_tx.immediate_ack += 1;
1170 }
1171
1172 builder.pad_to(probe_size);
1173 let sent_frames = SentFrames {
1174 non_retransmits: true,
1175 ..Default::default()
1176 };
1177 builder.finish_and_track(now, self, Some(sent_frames), buf);
1178
1179 self.stats.path.sent_plpmtud_probes += 1;
1180 num_datagrams = 1;
1181
1182 trace!(?probe_size, "writing MTUD probe");
1183 }
1184
1185 if buf.is_empty() {
1186 return None;
1187 }
1188
1189 trace!("sending {} bytes in {} datagrams", buf.len(), num_datagrams);
1190 self.path.total_sent = self.path.total_sent.saturating_add(buf.len() as u64);
1191
1192 self.stats.udp_tx.on_sent(num_datagrams as u64, buf.len());
1193
1194 #[cfg(feature = "trace")]
1196 {
1197 use crate::trace_packet_sent;
1198 let packet_num = self.spaces[SpaceId::Data]
1201 .next_packet_number
1202 .saturating_sub(1);
1203 trace_packet_sent!(
1204 &self.event_log,
1205 self.trace_context.trace_id(),
1206 buf.len() as u32,
1207 packet_num
1208 );
1209 }
1210
1211 Some(Transmit {
1212 destination: self.path.remote,
1213 size: buf.len(),
1214 ecn: if self.path.sending_ecn {
1215 Some(EcnCodepoint::Ect0)
1216 } else {
1217 None
1218 },
1219 segment_size: match num_datagrams {
1220 1 => None,
1221 _ => Some(segment_size),
1222 },
1223 src_ip: self.local_ip,
1224 })
1225 }
1226
1227 fn send_coordination_request(&mut self, _now: Instant, _buf: &mut Vec<u8>) -> Option<Transmit> {
1229 let nat = self.nat_traversal.as_mut()?;
1231 if !nat.should_send_punch_request() {
1232 return None;
1233 }
1234
1235 let coord = nat.coordination.as_ref()?;
1236 let round = coord.round;
1237 if coord.punch_targets.is_empty() {
1238 return None;
1239 }
1240
1241 trace!(
1242 "queuing PUNCH_ME_NOW round {} with {} targets",
1243 round,
1244 coord.punch_targets.len()
1245 );
1246
1247 for target in &coord.punch_targets {
1249 let punch = frame::PunchMeNow {
1250 round,
1251 paired_with_sequence_number: target.remote_sequence,
1252 address: target.remote_addr,
1253 target_peer_id: None,
1254 };
1255 self.spaces[SpaceId::Data].pending.punch_me_now.push(punch);
1256 }
1257
1258 nat.mark_punch_request_sent();
1260
1261 None
1263 }
1264
1265 fn send_coordinated_path_challenge(
1267 &mut self,
1268 now: Instant,
1269 buf: &mut Vec<u8>,
1270 ) -> Option<Transmit> {
1271 if let Some(nat_traversal) = &mut self.nat_traversal {
1273 if nat_traversal.should_start_punching(now) {
1274 nat_traversal.start_punching_phase(now);
1275 }
1276 }
1277
1278 let (target_addr, challenge) = {
1280 let nat_traversal = self.nat_traversal.as_ref()?;
1281 match nat_traversal.get_coordination_phase() {
1282 Some(CoordinationPhase::Punching) => {
1283 let targets = nat_traversal.get_punch_targets_from_coordination()?;
1284 if targets.is_empty() {
1285 return None;
1286 }
1287 let target = &targets[0];
1289 (target.remote_addr, target.challenge)
1290 }
1291 _ => return None,
1292 }
1293 };
1294
1295 debug_assert_eq!(
1296 self.highest_space,
1297 SpaceId::Data,
1298 "PATH_CHALLENGE queued without 1-RTT keys"
1299 );
1300
1301 buf.reserve(self.pqc_state.min_initial_size() as usize);
1302 let buf_capacity = buf.capacity();
1303
1304 let mut builder = PacketBuilder::new(
1305 now,
1306 SpaceId::Data,
1307 self.rem_cids.active(),
1308 buf,
1309 buf_capacity,
1310 0,
1311 false,
1312 self,
1313 )?;
1314
1315 trace!(
1316 "sending coordinated PATH_CHALLENGE {:08x} to {}",
1317 challenge, target_addr
1318 );
1319 buf.write(frame::FrameType::PATH_CHALLENGE);
1320 buf.write(challenge);
1321 self.stats.frame_tx.path_challenge += 1;
1322
1323 let min_size = self.pqc_state.min_initial_size();
1324 builder.pad_to(min_size);
1325 builder.finish_and_track(now, self, None, buf);
1326
1327 if let Some(nat_traversal) = &mut self.nat_traversal {
1329 nat_traversal.mark_coordination_validating();
1330 }
1331
1332 Some(Transmit {
1333 destination: target_addr,
1334 size: buf.len(),
1335 ecn: if self.path.sending_ecn {
1336 Some(EcnCodepoint::Ect0)
1337 } else {
1338 None
1339 },
1340 segment_size: None,
1341 src_ip: self.local_ip,
1342 })
1343 }
1344
1345 fn send_nat_traversal_challenge(
1347 &mut self,
1348 now: Instant,
1349 buf: &mut Vec<u8>,
1350 ) -> Option<Transmit> {
1351 if let Some(request) = self.send_coordination_request(now, buf) {
1353 return Some(request);
1354 }
1355
1356 if let Some(punch) = self.send_coordinated_path_challenge(now, buf) {
1358 return Some(punch);
1359 }
1360
1361 let (remote_addr, remote_sequence) = {
1363 let nat_traversal = self.nat_traversal.as_ref()?;
1364 let candidates = nat_traversal.get_validation_candidates();
1365 if candidates.is_empty() {
1366 return None;
1367 }
1368 let (sequence, candidate) = candidates[0];
1370 (candidate.address, sequence)
1371 };
1372
1373 let challenge = self.rng.r#gen::<u64>();
1374
1375 if let Err(e) =
1377 self.nat_traversal
1378 .as_mut()?
1379 .start_validation(remote_sequence, challenge, now)
1380 {
1381 warn!("Failed to start NAT traversal validation: {}", e);
1382 return None;
1383 }
1384
1385 debug_assert_eq!(
1386 self.highest_space,
1387 SpaceId::Data,
1388 "PATH_CHALLENGE queued without 1-RTT keys"
1389 );
1390
1391 buf.reserve(self.pqc_state.min_initial_size() as usize);
1392 let buf_capacity = buf.capacity();
1393
1394 let mut builder = PacketBuilder::new(
1396 now,
1397 SpaceId::Data,
1398 self.rem_cids.active(),
1399 buf,
1400 buf_capacity,
1401 0,
1402 false,
1403 self,
1404 )?;
1405
1406 trace!(
1407 "sending PATH_CHALLENGE {:08x} to NAT candidate {}",
1408 challenge, remote_addr
1409 );
1410 buf.write(frame::FrameType::PATH_CHALLENGE);
1411 buf.write(challenge);
1412 self.stats.frame_tx.path_challenge += 1;
1413
1414 let min_size = self.pqc_state.min_initial_size();
1416 builder.pad_to(min_size);
1417
1418 builder.finish_and_track(now, self, None, buf);
1419
1420 Some(Transmit {
1421 destination: remote_addr,
1422 size: buf.len(),
1423 ecn: if self.path.sending_ecn {
1424 Some(EcnCodepoint::Ect0)
1425 } else {
1426 None
1427 },
1428 segment_size: None,
1429 src_ip: self.local_ip,
1430 })
1431 }
1432
1433 fn send_path_challenge(&mut self, now: Instant, buf: &mut Vec<u8>) -> Option<Transmit> {
1435 let (prev_cid, prev_path) = self.prev_path.as_mut()?;
1436 if !prev_path.challenge_pending {
1437 return None;
1438 }
1439 prev_path.challenge_pending = false;
1440 let token = prev_path
1441 .challenge
1442 .expect("previous path challenge pending without token");
1443 let destination = prev_path.remote;
1444 debug_assert_eq!(
1445 self.highest_space,
1446 SpaceId::Data,
1447 "PATH_CHALLENGE queued without 1-RTT keys"
1448 );
1449 buf.reserve(self.pqc_state.min_initial_size() as usize);
1450
1451 let buf_capacity = buf.capacity();
1452
1453 let mut builder = PacketBuilder::new(
1459 now,
1460 SpaceId::Data,
1461 *prev_cid,
1462 buf,
1463 buf_capacity,
1464 0,
1465 false,
1466 self,
1467 )?;
1468 trace!("validating previous path with PATH_CHALLENGE {:08x}", token);
1469 buf.write(frame::FrameType::PATH_CHALLENGE);
1470 buf.write(token);
1471 self.stats.frame_tx.path_challenge += 1;
1472
1473 let min_size = self.pqc_state.min_initial_size();
1478 builder.pad_to(min_size);
1479
1480 builder.finish(self, buf);
1481 self.stats.udp_tx.on_sent(1, buf.len());
1482
1483 Some(Transmit {
1484 destination,
1485 size: buf.len(),
1486 ecn: None,
1487 segment_size: None,
1488 src_ip: self.local_ip,
1489 })
1490 }
1491
1492 fn space_can_send(&self, space_id: SpaceId, frame_space_1rtt: usize) -> SendableFrames {
1494 if self.spaces[space_id].crypto.is_none()
1495 && (space_id != SpaceId::Data
1496 || self.zero_rtt_crypto.is_none()
1497 || self.side.is_server())
1498 {
1499 return SendableFrames::empty();
1501 }
1502 let mut can_send = self.spaces[space_id].can_send(&self.streams);
1503 if space_id == SpaceId::Data {
1504 can_send.other |= self.can_send_1rtt(frame_space_1rtt);
1505 }
1506 can_send
1507 }
1508
1509 pub fn handle_event(&mut self, event: ConnectionEvent) {
1515 use ConnectionEventInner::*;
1516 match event.0 {
1517 Datagram(DatagramConnectionEvent {
1518 now,
1519 remote,
1520 ecn,
1521 first_decode,
1522 remaining,
1523 }) => {
1524 if remote != self.path.remote && !self.side.remote_may_migrate() {
1528 trace!("discarding packet from unrecognized peer {}", remote);
1529 return;
1530 }
1531
1532 let was_anti_amplification_blocked = self.path.anti_amplification_blocked(1);
1533
1534 self.stats.udp_rx.datagrams += 1;
1535 self.stats.udp_rx.bytes += first_decode.len() as u64;
1536 let data_len = first_decode.len();
1537
1538 self.handle_decode(now, remote, ecn, first_decode);
1539 self.path.total_recvd = self.path.total_recvd.saturating_add(data_len as u64);
1544
1545 if let Some(data) = remaining {
1546 self.stats.udp_rx.bytes += data.len() as u64;
1547 self.handle_coalesced(now, remote, ecn, data);
1548 }
1549
1550 #[cfg(feature = "__qlog")]
1551 self.emit_qlog_recovery_metrics(now);
1552
1553 if was_anti_amplification_blocked {
1554 self.set_loss_detection_timer(now);
1558 }
1559 }
1560 NewIdentifiers(ids, now) => {
1561 self.local_cid_state.new_cids(&ids, now);
1562 ids.into_iter().rev().for_each(|frame| {
1563 self.spaces[SpaceId::Data].pending.new_cids.push(frame);
1564 });
1565 if self.timers.get(Timer::PushNewCid).is_none_or(|x| x <= now) {
1567 self.reset_cid_retirement();
1568 }
1569 }
1570 QueueAddAddress(add) => {
1571 self.spaces[SpaceId::Data].pending.add_addresses.push(add);
1573 }
1574 QueuePunchMeNow(punch) => {
1575 self.spaces[SpaceId::Data].pending.punch_me_now.push(punch);
1577 }
1578 }
1579 }
1580
1581 pub fn handle_timeout(&mut self, now: Instant) {
1591 for &timer in &Timer::VALUES {
1592 if !self.timers.is_expired(timer, now) {
1593 continue;
1594 }
1595 self.timers.stop(timer);
1596 trace!(timer = ?timer, "timeout");
1597 match timer {
1598 Timer::Close => {
1599 self.state = State::Drained;
1600 self.endpoint_events.push_back(EndpointEventInner::Drained);
1601 }
1602 Timer::Idle => {
1603 self.kill(ConnectionError::TimedOut);
1604 }
1605 Timer::KeepAlive => {
1606 trace!("sending keep-alive");
1607 self.ping();
1608 }
1609 Timer::LossDetection => {
1610 self.on_loss_detection_timeout(now);
1611
1612 #[cfg(feature = "__qlog")]
1613 self.emit_qlog_recovery_metrics(now);
1614 }
1615 Timer::KeyDiscard => {
1616 self.zero_rtt_crypto = None;
1617 self.prev_crypto = None;
1618 }
1619 Timer::PathValidation => {
1620 debug!("path validation failed");
1621 if let Some((_, prev)) = self.prev_path.take() {
1622 self.path = prev;
1623 }
1624 self.path.challenge = None;
1625 self.path.challenge_pending = false;
1626 }
1627 Timer::Pacing => trace!("pacing timer expired"),
1628 Timer::NatTraversal => {
1629 self.handle_nat_traversal_timeout(now);
1630 }
1631 Timer::PushNewCid => {
1632 let num_new_cid = self.local_cid_state.on_cid_timeout().into();
1634 if !self.state.is_closed() {
1635 trace!(
1636 "push a new cid to peer RETIRE_PRIOR_TO field {}",
1637 self.local_cid_state.retire_prior_to()
1638 );
1639 self.endpoint_events
1640 .push_back(EndpointEventInner::NeedIdentifiers(now, num_new_cid));
1641 }
1642 }
1643 Timer::MaxAckDelay => {
1644 trace!("max ack delay reached");
1645 self.spaces[SpaceId::Data]
1647 .pending_acks
1648 .on_max_ack_delay_timeout()
1649 }
1650 }
1651 }
1652 }
1653
1654 pub fn close(&mut self, now: Instant, error_code: VarInt, reason: Bytes) {
1666 self.close_inner(
1667 now,
1668 Close::Application(frame::ApplicationClose { error_code, reason }),
1669 )
1670 }
1671
1672 fn close_inner(&mut self, now: Instant, reason: Close) {
1673 let was_closed = self.state.is_closed();
1674 if !was_closed {
1675 self.close_common();
1676 self.set_close_timer(now);
1677 self.close = true;
1678 self.state = State::Closed(state::Closed { reason });
1679 }
1680 }
1681
1682 pub fn datagrams(&mut self) -> Datagrams<'_> {
1684 Datagrams { conn: self }
1685 }
1686
1687 pub fn stats(&self) -> ConnectionStats {
1689 let mut stats = self.stats;
1690 stats.path.rtt = self.path.rtt.get();
1691 stats.path.cwnd = self.path.congestion.window();
1692 stats.path.current_mtu = self.path.mtud.current_mtu();
1693
1694 stats
1695 }
1696
1697 pub fn set_token_binding_peer_id(&mut self, pid: PeerId) {
1699 self.peer_id_for_tokens = Some(pid);
1700 }
1701
1702 pub fn set_delay_new_token_until_binding(&mut self, v: bool) {
1704 self.delay_new_token_until_binding = v;
1705 }
1706
1707 pub fn ping(&mut self) {
1711 self.spaces[self.highest_space].ping_pending = true;
1712 }
1713
1714 pub(crate) fn is_pqc(&self) -> bool {
1716 self.pqc_state.using_pqc
1717 }
1718
1719 pub fn force_key_update(&mut self) {
1723 if !self.state.is_established() {
1724 debug!("ignoring forced key update in illegal state");
1725 return;
1726 }
1727 if self.prev_crypto.is_some() {
1728 debug!("ignoring redundant forced key update");
1731 return;
1732 }
1733 self.update_keys(None, false);
1734 }
1735
1736 pub fn crypto_session(&self) -> &dyn crypto::Session {
1738 &*self.crypto
1739 }
1740
1741 pub fn is_handshaking(&self) -> bool {
1746 self.state.is_handshake()
1747 }
1748
1749 pub fn is_closed(&self) -> bool {
1757 self.state.is_closed()
1758 }
1759
1760 pub fn is_drained(&self) -> bool {
1765 self.state.is_drained()
1766 }
1767
1768 pub fn accepted_0rtt(&self) -> bool {
1772 self.accepted_0rtt
1773 }
1774
1775 pub fn has_0rtt(&self) -> bool {
1777 self.zero_rtt_enabled
1778 }
1779
1780 pub fn has_pending_retransmits(&self) -> bool {
1782 !self.spaces[SpaceId::Data].pending.is_empty(&self.streams)
1783 }
1784
1785 pub fn side(&self) -> Side {
1787 self.side.side()
1788 }
1789
1790 pub fn remote_address(&self) -> SocketAddr {
1792 self.path.remote
1793 }
1794
1795 pub fn local_ip(&self) -> Option<IpAddr> {
1805 self.local_ip
1806 }
1807
1808 pub fn rtt(&self) -> Duration {
1810 self.path.rtt.get()
1811 }
1812
1813 pub fn congestion_state(&self) -> &dyn Controller {
1815 self.path.congestion.as_ref()
1816 }
1817
1818 pub fn path_changed(&mut self, now: Instant) {
1829 self.path.reset(now, &self.config);
1830 }
1831
1832 pub fn set_max_concurrent_streams(&mut self, dir: Dir, count: VarInt) {
1837 self.streams.set_max_concurrent(dir, count);
1838 let pending = &mut self.spaces[SpaceId::Data].pending;
1841 self.streams.queue_max_stream_id(pending);
1842 }
1843
1844 pub fn max_concurrent_streams(&self, dir: Dir) -> u64 {
1850 self.streams.max_concurrent(dir)
1851 }
1852
1853 pub fn set_receive_window(&mut self, receive_window: VarInt) {
1855 if self.streams.set_receive_window(receive_window) {
1856 self.spaces[SpaceId::Data].pending.max_data = true;
1857 }
1858 }
1859
1860 pub fn set_address_discovery_enabled(&mut self, enabled: bool) {
1862 if let Some(ref mut state) = self.address_discovery_state {
1863 state.enabled = enabled;
1864 }
1865 }
1866
1867 pub fn address_discovery_enabled(&self) -> bool {
1869 self.address_discovery_state
1870 .as_ref()
1871 .is_some_and(|state| state.enabled)
1872 }
1873
1874 pub fn observed_address(&self) -> Option<SocketAddr> {
1879 self.address_discovery_state
1880 .as_ref()
1881 .and_then(|state| state.get_observed_address(0)) }
1883
1884 #[allow(dead_code)]
1886 pub(crate) fn address_discovery_state(&self) -> Option<&AddressDiscoveryState> {
1887 self.address_discovery_state.as_ref()
1888 }
1889
1890 fn on_ack_received(
1891 &mut self,
1892 now: Instant,
1893 space: SpaceId,
1894 ack: frame::Ack,
1895 ) -> Result<(), TransportError> {
1896 if ack.largest >= self.spaces[space].next_packet_number {
1897 return Err(TransportError::PROTOCOL_VIOLATION("unsent packet acked"));
1898 }
1899 let new_largest = {
1900 let space = &mut self.spaces[space];
1901 if space.largest_acked_packet.is_none_or(|pn| ack.largest > pn) {
1902 space.largest_acked_packet = Some(ack.largest);
1903 if let Some(info) = space.sent_packets.get(&ack.largest) {
1904 space.largest_acked_packet_sent = info.time_sent;
1908 }
1909 true
1910 } else {
1911 false
1912 }
1913 };
1914
1915 let mut newly_acked = ArrayRangeSet::new();
1917 for range in ack.iter() {
1918 self.packet_number_filter.check_ack(space, range.clone())?;
1919 for (&pn, _) in self.spaces[space].sent_packets.range(range) {
1920 newly_acked.insert_one(pn);
1921 }
1922 }
1923
1924 if newly_acked.is_empty() {
1925 return Ok(());
1926 }
1927
1928 let mut ack_eliciting_acked = false;
1929 for packet in newly_acked.elts() {
1930 if let Some(info) = self.spaces[space].take(packet) {
1931 if let Some(acked) = info.largest_acked {
1932 self.spaces[space].pending_acks.subtract_below(acked);
1938 }
1939 ack_eliciting_acked |= info.ack_eliciting;
1940
1941 let mtu_updated = self.path.mtud.on_acked(space, packet, info.size);
1943 if mtu_updated {
1944 self.path
1945 .congestion
1946 .on_mtu_update(self.path.mtud.current_mtu());
1947 }
1948
1949 self.ack_frequency.on_acked(packet);
1951
1952 self.on_packet_acked(now, packet, info);
1953 }
1954 }
1955
1956 self.path.congestion.on_end_acks(
1957 now,
1958 self.path.in_flight.bytes,
1959 self.app_limited,
1960 self.spaces[space].largest_acked_packet,
1961 );
1962
1963 if new_largest && ack_eliciting_acked {
1964 let ack_delay = if space != SpaceId::Data {
1965 Duration::from_micros(0)
1966 } else {
1967 cmp::min(
1968 self.ack_frequency.peer_max_ack_delay,
1969 Duration::from_micros(ack.delay << self.peer_params.ack_delay_exponent.0),
1970 )
1971 };
1972 let rtt = instant_saturating_sub(now, self.spaces[space].largest_acked_packet_sent);
1973 self.path.rtt.update(ack_delay, rtt);
1974 if self.path.first_packet_after_rtt_sample.is_none() {
1975 self.path.first_packet_after_rtt_sample =
1976 Some((space, self.spaces[space].next_packet_number));
1977 }
1978 }
1979
1980 self.detect_lost_packets(now, space, true);
1982
1983 if self.peer_completed_address_validation() {
1984 self.pto_count = 0;
1985 }
1986
1987 if self.path.sending_ecn {
1989 if let Some(ecn) = ack.ecn {
1990 if new_largest {
1995 let sent = self.spaces[space].largest_acked_packet_sent;
1996 self.process_ecn(now, space, newly_acked.len() as u64, ecn, sent);
1997 }
1998 } else {
1999 debug!("ECN not acknowledged by peer");
2001 self.path.sending_ecn = false;
2002 }
2003 }
2004
2005 self.set_loss_detection_timer(now);
2006 Ok(())
2007 }
2008
2009 fn process_ecn(
2011 &mut self,
2012 now: Instant,
2013 space: SpaceId,
2014 newly_acked: u64,
2015 ecn: frame::EcnCounts,
2016 largest_sent_time: Instant,
2017 ) {
2018 match self.spaces[space].detect_ecn(newly_acked, ecn) {
2019 Err(e) => {
2020 debug!("halting ECN due to verification failure: {}", e);
2021 self.path.sending_ecn = false;
2022 self.spaces[space].ecn_feedback = frame::EcnCounts::ZERO;
2025 }
2026 Ok(false) => {}
2027 Ok(true) => {
2028 self.stats.path.congestion_events += 1;
2029 self.path
2030 .congestion
2031 .on_congestion_event(now, largest_sent_time, false, 0);
2032 }
2033 }
2034 }
2035
2036 fn on_packet_acked(&mut self, now: Instant, pn: u64, info: SentPacket) {
2039 self.remove_in_flight(pn, &info);
2040 if info.ack_eliciting && self.path.challenge.is_none() {
2041 self.path.congestion.on_ack(
2044 now,
2045 info.time_sent,
2046 info.size.into(),
2047 self.app_limited,
2048 &self.path.rtt,
2049 );
2050 }
2051
2052 if let Some(retransmits) = info.retransmits.get() {
2054 for (id, _) in retransmits.reset_stream.iter() {
2055 self.streams.reset_acked(*id);
2056 }
2057 }
2058
2059 for frame in info.stream_frames {
2060 self.streams.received_ack_of(frame);
2061 }
2062 }
2063
2064 fn set_key_discard_timer(&mut self, now: Instant, space: SpaceId) {
2065 let start = if self.zero_rtt_crypto.is_some() {
2066 now
2067 } else {
2068 self.prev_crypto
2069 .as_ref()
2070 .expect("no previous keys")
2071 .end_packet
2072 .as_ref()
2073 .expect("update not acknowledged yet")
2074 .1
2075 };
2076 self.timers
2077 .set(Timer::KeyDiscard, start + self.pto(space) * 3);
2078 }
2079
2080 fn on_loss_detection_timeout(&mut self, now: Instant) {
2081 if let Some((_, pn_space)) = self.loss_time_and_space() {
2082 self.detect_lost_packets(now, pn_space, false);
2084 self.set_loss_detection_timer(now);
2085 return;
2086 }
2087
2088 let (_, space) = match self.pto_time_and_space(now) {
2089 Some(x) => x,
2090 None => {
2091 error!("PTO expired while unset");
2092 return;
2093 }
2094 };
2095 trace!(
2096 in_flight = self.path.in_flight.bytes,
2097 count = self.pto_count,
2098 ?space,
2099 "PTO fired"
2100 );
2101
2102 let count = match self.path.in_flight.ack_eliciting {
2103 0 => {
2106 debug_assert!(!self.peer_completed_address_validation());
2107 1
2108 }
2109 _ => 2,
2111 };
2112 self.spaces[space].loss_probes = self.spaces[space].loss_probes.saturating_add(count);
2113 self.pto_count = self.pto_count.saturating_add(1);
2114 self.set_loss_detection_timer(now);
2115 }
2116
2117 fn detect_lost_packets(&mut self, now: Instant, pn_space: SpaceId, due_to_ack: bool) {
2118 let mut lost_packets = Vec::<u64>::new();
2119 let mut lost_mtu_probe = None;
2120 let in_flight_mtu_probe = self.path.mtud.in_flight_mtu_probe();
2121 let rtt = self.path.rtt.conservative();
2122 let loss_delay = cmp::max(rtt.mul_f32(self.config.time_threshold), TIMER_GRANULARITY);
2123
2124 let lost_send_time = now.checked_sub(loss_delay).unwrap();
2126 let largest_acked_packet = self.spaces[pn_space].largest_acked_packet.unwrap();
2127 let packet_threshold = self.config.packet_threshold as u64;
2128 let mut size_of_lost_packets = 0u64;
2129
2130 let congestion_period =
2134 self.pto(SpaceId::Data) * self.config.persistent_congestion_threshold;
2135 let mut persistent_congestion_start: Option<Instant> = None;
2136 let mut prev_packet = None;
2137 let mut in_persistent_congestion = false;
2138
2139 let space = &mut self.spaces[pn_space];
2140 space.loss_time = None;
2141
2142 for (&packet, info) in space.sent_packets.range(0..largest_acked_packet) {
2143 if prev_packet != Some(packet.wrapping_sub(1)) {
2144 persistent_congestion_start = None;
2146 }
2147
2148 if info.time_sent <= lost_send_time || largest_acked_packet >= packet + packet_threshold
2149 {
2150 if Some(packet) == in_flight_mtu_probe {
2151 lost_mtu_probe = in_flight_mtu_probe;
2154 } else {
2155 lost_packets.push(packet);
2156 size_of_lost_packets += info.size as u64;
2157 if info.ack_eliciting && due_to_ack {
2158 match persistent_congestion_start {
2159 Some(start) if info.time_sent - start > congestion_period => {
2162 in_persistent_congestion = true;
2163 }
2164 None if self
2166 .path
2167 .first_packet_after_rtt_sample
2168 .is_some_and(|x| x < (pn_space, packet)) =>
2169 {
2170 persistent_congestion_start = Some(info.time_sent);
2171 }
2172 _ => {}
2173 }
2174 }
2175 }
2176 } else {
2177 let next_loss_time = info.time_sent + loss_delay;
2178 space.loss_time = Some(
2179 space
2180 .loss_time
2181 .map_or(next_loss_time, |x| cmp::min(x, next_loss_time)),
2182 );
2183 persistent_congestion_start = None;
2184 }
2185
2186 prev_packet = Some(packet);
2187 }
2188
2189 if let Some(largest_lost) = lost_packets.last().cloned() {
2191 let old_bytes_in_flight = self.path.in_flight.bytes;
2192 let largest_lost_sent = self.spaces[pn_space].sent_packets[&largest_lost].time_sent;
2193 self.lost_packets += lost_packets.len() as u64;
2194 self.stats.path.lost_packets += lost_packets.len() as u64;
2195 self.stats.path.lost_bytes += size_of_lost_packets;
2196 trace!(
2197 "packets lost: {:?}, bytes lost: {}",
2198 lost_packets, size_of_lost_packets
2199 );
2200
2201 for &packet in &lost_packets {
2202 let info = self.spaces[pn_space].take(packet).unwrap(); self.remove_in_flight(packet, &info);
2204 for frame in info.stream_frames {
2205 self.streams.retransmit(frame);
2206 }
2207 self.spaces[pn_space].pending |= info.retransmits;
2208 self.path.mtud.on_non_probe_lost(packet, info.size);
2209 }
2210
2211 if self.path.mtud.black_hole_detected(now) {
2212 self.stats.path.black_holes_detected += 1;
2213 self.path
2214 .congestion
2215 .on_mtu_update(self.path.mtud.current_mtu());
2216 if let Some(max_datagram_size) = self.datagrams().max_size() {
2217 self.datagrams.drop_oversized(max_datagram_size);
2218 }
2219 }
2220
2221 let lost_ack_eliciting = old_bytes_in_flight != self.path.in_flight.bytes;
2223
2224 if lost_ack_eliciting {
2225 self.stats.path.congestion_events += 1;
2226 self.path.congestion.on_congestion_event(
2227 now,
2228 largest_lost_sent,
2229 in_persistent_congestion,
2230 size_of_lost_packets,
2231 );
2232 }
2233 }
2234
2235 if let Some(packet) = lost_mtu_probe {
2237 let info = self.spaces[SpaceId::Data].take(packet).unwrap(); self.remove_in_flight(packet, &info);
2239 self.path.mtud.on_probe_lost();
2240 self.stats.path.lost_plpmtud_probes += 1;
2241 }
2242 }
2243
2244 fn loss_time_and_space(&self) -> Option<(Instant, SpaceId)> {
2245 SpaceId::iter()
2246 .filter_map(|id| Some((self.spaces[id].loss_time?, id)))
2247 .min_by_key(|&(time, _)| time)
2248 }
2249
2250 fn pto_time_and_space(&self, now: Instant) -> Option<(Instant, SpaceId)> {
2251 let backoff = 2u32.pow(self.pto_count.min(MAX_BACKOFF_EXPONENT));
2252 let mut duration = self.path.rtt.pto_base() * backoff;
2253
2254 if self.path.in_flight.ack_eliciting == 0 {
2255 debug_assert!(!self.peer_completed_address_validation());
2256 let space = match self.highest_space {
2257 SpaceId::Handshake => SpaceId::Handshake,
2258 _ => SpaceId::Initial,
2259 };
2260 return Some((now + duration, space));
2261 }
2262
2263 let mut result = None;
2264 for space in SpaceId::iter() {
2265 if self.spaces[space].in_flight == 0 {
2266 continue;
2267 }
2268 if space == SpaceId::Data {
2269 if self.is_handshaking() {
2271 return result;
2272 }
2273 duration += self.ack_frequency.max_ack_delay_for_pto() * backoff;
2275 }
2276 let last_ack_eliciting = match self.spaces[space].time_of_last_ack_eliciting_packet {
2277 Some(time) => time,
2278 None => continue,
2279 };
2280 let pto = last_ack_eliciting + duration;
2281 if result.is_none_or(|(earliest_pto, _)| pto < earliest_pto) {
2282 result = Some((pto, space));
2283 }
2284 }
2285 result
2286 }
2287
2288 fn peer_completed_address_validation(&self) -> bool {
2289 if self.side.is_server() || self.state.is_closed() {
2290 return true;
2291 }
2292 self.spaces[SpaceId::Handshake]
2295 .largest_acked_packet
2296 .is_some()
2297 || self.spaces[SpaceId::Data].largest_acked_packet.is_some()
2298 || (self.spaces[SpaceId::Data].crypto.is_some()
2299 && self.spaces[SpaceId::Handshake].crypto.is_none())
2300 }
2301
2302 fn set_loss_detection_timer(&mut self, now: Instant) {
2303 if self.state.is_closed() {
2304 return;
2308 }
2309
2310 if let Some((loss_time, _)) = self.loss_time_and_space() {
2311 self.timers.set(Timer::LossDetection, loss_time);
2313 return;
2314 }
2315
2316 if self.path.anti_amplification_blocked(1) {
2317 self.timers.stop(Timer::LossDetection);
2319 return;
2320 }
2321
2322 if self.path.in_flight.ack_eliciting == 0 && self.peer_completed_address_validation() {
2323 self.timers.stop(Timer::LossDetection);
2326 return;
2327 }
2328
2329 if let Some((timeout, _)) = self.pto_time_and_space(now) {
2332 self.timers.set(Timer::LossDetection, timeout);
2333 } else {
2334 self.timers.stop(Timer::LossDetection);
2335 }
2336 }
2337
2338 fn pto(&self, space: SpaceId) -> Duration {
2340 let max_ack_delay = match space {
2341 SpaceId::Initial | SpaceId::Handshake => Duration::ZERO,
2342 SpaceId::Data => self.ack_frequency.max_ack_delay_for_pto(),
2343 };
2344 self.path.rtt.pto_base() + max_ack_delay
2345 }
2346
2347 fn on_packet_authenticated(
2348 &mut self,
2349 now: Instant,
2350 space_id: SpaceId,
2351 ecn: Option<EcnCodepoint>,
2352 packet: Option<u64>,
2353 spin: bool,
2354 is_1rtt: bool,
2355 ) {
2356 self.total_authed_packets += 1;
2357 self.reset_keep_alive(now);
2358 self.reset_idle_timeout(now, space_id);
2359 self.permit_idle_reset = true;
2360 self.receiving_ecn |= ecn.is_some();
2361 if let Some(x) = ecn {
2362 let space = &mut self.spaces[space_id];
2363 space.ecn_counters += x;
2364
2365 if x.is_ce() {
2366 space.pending_acks.set_immediate_ack_required();
2367 }
2368 }
2369
2370 let packet = match packet {
2371 Some(x) => x,
2372 None => return,
2373 };
2374 if self.side.is_server() {
2375 if self.spaces[SpaceId::Initial].crypto.is_some() && space_id == SpaceId::Handshake {
2376 self.discard_space(now, SpaceId::Initial);
2378 }
2379 if self.zero_rtt_crypto.is_some() && is_1rtt {
2380 self.set_key_discard_timer(now, space_id)
2382 }
2383 }
2384 let space = &mut self.spaces[space_id];
2385 space.pending_acks.insert_one(packet, now);
2386 if packet >= space.rx_packet {
2387 space.rx_packet = packet;
2388 self.spin = self.side.is_client() ^ spin;
2390 }
2391 }
2392
2393 fn reset_idle_timeout(&mut self, now: Instant, space: SpaceId) {
2394 let timeout = match self.idle_timeout {
2395 None => return,
2396 Some(dur) => dur,
2397 };
2398 if self.state.is_closed() {
2399 self.timers.stop(Timer::Idle);
2400 return;
2401 }
2402 let dt = cmp::max(timeout, 3 * self.pto(space));
2403 self.timers.set(Timer::Idle, now + dt);
2404 }
2405
2406 fn reset_keep_alive(&mut self, now: Instant) {
2407 let interval = match self.config.keep_alive_interval {
2408 Some(x) if self.state.is_established() => x,
2409 _ => return,
2410 };
2411 self.timers.set(Timer::KeepAlive, now + interval);
2412 }
2413
2414 fn reset_cid_retirement(&mut self) {
2415 if let Some(t) = self.local_cid_state.next_timeout() {
2416 self.timers.set(Timer::PushNewCid, t);
2417 }
2418 }
2419
2420 pub(crate) fn handle_first_packet(
2425 &mut self,
2426 now: Instant,
2427 remote: SocketAddr,
2428 ecn: Option<EcnCodepoint>,
2429 packet_number: u64,
2430 packet: InitialPacket,
2431 remaining: Option<BytesMut>,
2432 ) -> Result<(), ConnectionError> {
2433 let span = trace_span!("first recv");
2434 let _guard = span.enter();
2435 debug_assert!(self.side.is_server());
2436 let len = packet.header_data.len() + packet.payload.len();
2437 self.path.total_recvd = len as u64;
2438
2439 match self.state {
2440 State::Handshake(ref mut state) => {
2441 state.expected_token = packet.header.token.clone();
2442 }
2443 _ => unreachable!("first packet must be delivered in Handshake state"),
2444 }
2445
2446 self.on_packet_authenticated(
2447 now,
2448 SpaceId::Initial,
2449 ecn,
2450 Some(packet_number),
2451 false,
2452 false,
2453 );
2454
2455 self.process_decrypted_packet(now, remote, Some(packet_number), packet.into())?;
2456 if let Some(data) = remaining {
2457 self.handle_coalesced(now, remote, ecn, data);
2458 }
2459
2460 #[cfg(feature = "__qlog")]
2461 self.emit_qlog_recovery_metrics(now);
2462
2463 Ok(())
2464 }
2465
2466 fn init_0rtt(&mut self) {
2467 let (header, packet) = match self.crypto.early_crypto() {
2468 Some(x) => x,
2469 None => return,
2470 };
2471 if self.side.is_client() {
2472 match self.crypto.transport_parameters() {
2473 Ok(params) => {
2474 let params = params
2475 .expect("crypto layer didn't supply transport parameters with ticket");
2476 let params = TransportParameters {
2478 initial_src_cid: None,
2479 original_dst_cid: None,
2480 preferred_address: None,
2481 retry_src_cid: None,
2482 stateless_reset_token: None,
2483 min_ack_delay: None,
2484 ack_delay_exponent: TransportParameters::default().ack_delay_exponent,
2485 max_ack_delay: TransportParameters::default().max_ack_delay,
2486 ..params
2487 };
2488 self.set_peer_params(params);
2489 }
2490 Err(e) => {
2491 error!("session ticket has malformed transport parameters: {}", e);
2492 return;
2493 }
2494 }
2495 }
2496 trace!("0-RTT enabled");
2497 self.zero_rtt_enabled = true;
2498 self.zero_rtt_crypto = Some(ZeroRttCrypto { header, packet });
2499 }
2500
2501 fn read_crypto(
2502 &mut self,
2503 space: SpaceId,
2504 crypto: &frame::Crypto,
2505 payload_len: usize,
2506 ) -> Result<(), TransportError> {
2507 let expected = if !self.state.is_handshake() {
2508 SpaceId::Data
2509 } else if self.highest_space == SpaceId::Initial {
2510 SpaceId::Initial
2511 } else {
2512 SpaceId::Handshake
2515 };
2516 debug_assert!(space <= expected, "received out-of-order CRYPTO data");
2520
2521 let end = crypto.offset + crypto.data.len() as u64;
2522 if space < expected && end > self.spaces[space].crypto_stream.bytes_read() {
2523 warn!(
2524 "received new {:?} CRYPTO data when expecting {:?}",
2525 space, expected
2526 );
2527 return Err(TransportError::PROTOCOL_VIOLATION(
2528 "new data at unexpected encryption level",
2529 ));
2530 }
2531
2532 self.pqc_state.detect_pqc_from_crypto(&crypto.data, space);
2534
2535 if self.pqc_state.should_trigger_mtu_discovery() {
2537 self.path
2539 .mtud
2540 .reset(self.pqc_state.min_initial_size(), self.config.min_mtu);
2541 trace!("Triggered MTU discovery for PQC handshake");
2542 }
2543
2544 let space = &mut self.spaces[space];
2545 let max = end.saturating_sub(space.crypto_stream.bytes_read());
2546 if max > self.config.crypto_buffer_size as u64 {
2547 return Err(TransportError::CRYPTO_BUFFER_EXCEEDED(""));
2548 }
2549
2550 space
2551 .crypto_stream
2552 .insert(crypto.offset, crypto.data.clone(), payload_len);
2553 while let Some(chunk) = space.crypto_stream.read(usize::MAX, true) {
2554 trace!("consumed {} CRYPTO bytes", chunk.bytes.len());
2555 if self.crypto.read_handshake(&chunk.bytes)? {
2556 self.events.push_back(Event::HandshakeDataReady);
2557 }
2558 }
2559
2560 Ok(())
2561 }
2562
2563 fn write_crypto(&mut self) {
2564 loop {
2565 let space = self.highest_space;
2566 let mut outgoing = Vec::new();
2567 if let Some(crypto) = self.crypto.write_handshake(&mut outgoing) {
2568 match space {
2569 SpaceId::Initial => {
2570 self.upgrade_crypto(SpaceId::Handshake, crypto);
2571 }
2572 SpaceId::Handshake => {
2573 self.upgrade_crypto(SpaceId::Data, crypto);
2574 }
2575 _ => unreachable!("got updated secrets during 1-RTT"),
2576 }
2577 }
2578 if outgoing.is_empty() {
2579 if space == self.highest_space {
2580 break;
2581 } else {
2582 continue;
2584 }
2585 }
2586 let offset = self.spaces[space].crypto_offset;
2587 let outgoing = Bytes::from(outgoing);
2588 if let State::Handshake(ref mut state) = self.state {
2589 if space == SpaceId::Initial && offset == 0 && self.side.is_client() {
2590 state.client_hello = Some(outgoing.clone());
2591 }
2592 }
2593 self.spaces[space].crypto_offset += outgoing.len() as u64;
2594 trace!("wrote {} {:?} CRYPTO bytes", outgoing.len(), space);
2595
2596 let use_pqc_fragmentation = self.pqc_state.using_pqc && outgoing.len() > 1200;
2598
2599 if use_pqc_fragmentation {
2600 let frames = self.pqc_state.packet_handler.fragment_crypto_data(
2602 &outgoing,
2603 offset,
2604 self.pqc_state.min_initial_size() as usize,
2605 );
2606 for frame in frames {
2607 self.spaces[space].pending.crypto.push_back(frame);
2608 }
2609 } else {
2610 self.spaces[space].pending.crypto.push_back(frame::Crypto {
2612 offset,
2613 data: outgoing,
2614 });
2615 }
2616 }
2617 }
2618
2619 fn upgrade_crypto(&mut self, space: SpaceId, crypto: Keys) {
2621 debug_assert!(
2622 self.spaces[space].crypto.is_none(),
2623 "already reached packet space {space:?}"
2624 );
2625 trace!("{:?} keys ready", space);
2626 if space == SpaceId::Data {
2627 self.next_crypto = Some(
2629 self.crypto
2630 .next_1rtt_keys()
2631 .expect("handshake should be complete"),
2632 );
2633 }
2634
2635 self.spaces[space].crypto = Some(crypto);
2636 debug_assert!(space as usize > self.highest_space as usize);
2637 self.highest_space = space;
2638 if space == SpaceId::Data && self.side.is_client() {
2639 self.zero_rtt_crypto = None;
2641 }
2642 }
2643
2644 fn discard_space(&mut self, now: Instant, space_id: SpaceId) {
2645 debug_assert!(space_id != SpaceId::Data);
2646 trace!("discarding {:?} keys", space_id);
2647 if space_id == SpaceId::Initial {
2648 if let ConnectionSide::Client { token, .. } = &mut self.side {
2650 *token = Bytes::new();
2651 }
2652 }
2653 let space = &mut self.spaces[space_id];
2654 space.crypto = None;
2655 space.time_of_last_ack_eliciting_packet = None;
2656 space.loss_time = None;
2657 space.in_flight = 0;
2658 let sent_packets = mem::take(&mut space.sent_packets);
2659 for (pn, packet) in sent_packets.into_iter() {
2660 self.remove_in_flight(pn, &packet);
2661 }
2662 self.set_loss_detection_timer(now)
2663 }
2664
2665 fn handle_coalesced(
2666 &mut self,
2667 now: Instant,
2668 remote: SocketAddr,
2669 ecn: Option<EcnCodepoint>,
2670 data: BytesMut,
2671 ) {
2672 self.path.total_recvd = self.path.total_recvd.saturating_add(data.len() as u64);
2673 let mut remaining = Some(data);
2674 while let Some(data) = remaining {
2675 match PartialDecode::new(
2676 data,
2677 &FixedLengthConnectionIdParser::new(self.local_cid_state.cid_len()),
2678 &[self.version],
2679 self.endpoint_config.grease_quic_bit,
2680 ) {
2681 Ok((partial_decode, rest)) => {
2682 remaining = rest;
2683 self.handle_decode(now, remote, ecn, partial_decode);
2684 }
2685 Err(e) => {
2686 trace!("malformed header: {}", e);
2687 return;
2688 }
2689 }
2690 }
2691 }
2692
2693 fn handle_decode(
2694 &mut self,
2695 now: Instant,
2696 remote: SocketAddr,
2697 ecn: Option<EcnCodepoint>,
2698 partial_decode: PartialDecode,
2699 ) {
2700 if let Some(decoded) = packet_crypto::unprotect_header(
2701 partial_decode,
2702 &self.spaces,
2703 self.zero_rtt_crypto.as_ref(),
2704 self.peer_params.stateless_reset_token,
2705 ) {
2706 self.handle_packet(now, remote, ecn, decoded.packet, decoded.stateless_reset);
2707 }
2708 }
2709
2710 fn handle_packet(
2711 &mut self,
2712 now: Instant,
2713 remote: SocketAddr,
2714 ecn: Option<EcnCodepoint>,
2715 packet: Option<Packet>,
2716 stateless_reset: bool,
2717 ) {
2718 self.stats.udp_rx.ios += 1;
2719 if let Some(ref packet) = packet {
2720 trace!(
2721 "got {:?} packet ({} bytes) from {} using id {}",
2722 packet.header.space(),
2723 packet.payload.len() + packet.header_data.len(),
2724 remote,
2725 packet.header.dst_cid(),
2726 );
2727
2728 #[cfg(feature = "trace")]
2730 {
2731 use crate::trace_packet_received;
2732 let packet_size = packet.payload.len() + packet.header_data.len();
2734 trace_packet_received!(
2735 &self.event_log,
2736 self.trace_context.trace_id(),
2737 packet_size as u32,
2738 0 );
2740 }
2741 }
2742
2743 if self.is_handshaking() && remote != self.path.remote {
2744 debug!("discarding packet with unexpected remote during handshake");
2745 return;
2746 }
2747
2748 let was_closed = self.state.is_closed();
2749 let was_drained = self.state.is_drained();
2750
2751 let decrypted = match packet {
2752 None => Err(None),
2753 Some(mut packet) => self
2754 .decrypt_packet(now, &mut packet)
2755 .map(move |number| (packet, number)),
2756 };
2757 let result = match decrypted {
2758 _ if stateless_reset => {
2759 debug!("got stateless reset");
2760 Err(ConnectionError::Reset)
2761 }
2762 Err(Some(e)) => {
2763 warn!("illegal packet: {}", e);
2764 Err(e.into())
2765 }
2766 Err(None) => {
2767 debug!("failed to authenticate packet");
2768 self.authentication_failures += 1;
2769 let integrity_limit = self.spaces[self.highest_space]
2770 .crypto
2771 .as_ref()
2772 .unwrap()
2773 .packet
2774 .local
2775 .integrity_limit();
2776 if self.authentication_failures > integrity_limit {
2777 Err(TransportError::AEAD_LIMIT_REACHED("integrity limit violated").into())
2778 } else {
2779 return;
2780 }
2781 }
2782 Ok((packet, number)) => {
2783 let span = match number {
2784 Some(pn) => trace_span!("recv", space = ?packet.header.space(), pn),
2785 None => trace_span!("recv", space = ?packet.header.space()),
2786 };
2787 let _guard = span.enter();
2788
2789 let is_duplicate = |n| self.spaces[packet.header.space()].dedup.insert(n);
2790 if number.is_some_and(is_duplicate) {
2791 debug!("discarding possible duplicate packet");
2792 return;
2793 } else if self.state.is_handshake() && packet.header.is_short() {
2794 trace!("dropping short packet during handshake");
2796 return;
2797 } else {
2798 if let Header::Initial(InitialHeader { ref token, .. }) = packet.header {
2799 if let State::Handshake(ref hs) = self.state {
2800 if self.side.is_server() && token != &hs.expected_token {
2801 warn!("discarding Initial with invalid retry token");
2805 return;
2806 }
2807 }
2808 }
2809
2810 if !self.state.is_closed() {
2811 let spin = match packet.header {
2812 Header::Short { spin, .. } => spin,
2813 _ => false,
2814 };
2815 self.on_packet_authenticated(
2816 now,
2817 packet.header.space(),
2818 ecn,
2819 number,
2820 spin,
2821 packet.header.is_1rtt(),
2822 );
2823 }
2824
2825 self.process_decrypted_packet(now, remote, number, packet)
2826 }
2827 }
2828 };
2829
2830 if let Err(conn_err) = result {
2832 self.error = Some(conn_err.clone());
2833 self.state = match conn_err {
2834 ConnectionError::ApplicationClosed(reason) => State::closed(reason),
2835 ConnectionError::ConnectionClosed(reason) => State::closed(reason),
2836 ConnectionError::Reset
2837 | ConnectionError::TransportError(TransportError {
2838 code: TransportErrorCode::AEAD_LIMIT_REACHED,
2839 ..
2840 }) => State::Drained,
2841 ConnectionError::TimedOut => {
2842 unreachable!("timeouts aren't generated by packet processing");
2843 }
2844 ConnectionError::TransportError(err) => {
2845 debug!("closing connection due to transport error: {}", err);
2846 State::closed(err)
2847 }
2848 ConnectionError::VersionMismatch => State::Draining,
2849 ConnectionError::LocallyClosed => {
2850 unreachable!("LocallyClosed isn't generated by packet processing");
2851 }
2852 ConnectionError::CidsExhausted => {
2853 unreachable!("CidsExhausted isn't generated by packet processing");
2854 }
2855 };
2856 }
2857
2858 if !was_closed && self.state.is_closed() {
2859 self.close_common();
2860 if !self.state.is_drained() {
2861 self.set_close_timer(now);
2862 }
2863 }
2864 if !was_drained && self.state.is_drained() {
2865 self.endpoint_events.push_back(EndpointEventInner::Drained);
2866 self.timers.stop(Timer::Close);
2869 }
2870
2871 if let State::Closed(_) = self.state {
2873 self.close = remote == self.path.remote;
2874 }
2875 }
2876
2877 fn process_decrypted_packet(
2878 &mut self,
2879 now: Instant,
2880 remote: SocketAddr,
2881 number: Option<u64>,
2882 packet: Packet,
2883 ) -> Result<(), ConnectionError> {
2884 let state = match self.state {
2885 State::Established => {
2886 match packet.header.space() {
2887 SpaceId::Data => self.process_payload(now, remote, number.unwrap(), packet)?,
2888 _ if packet.header.has_frames() => self.process_early_payload(now, packet)?,
2889 _ => {
2890 trace!("discarding unexpected pre-handshake packet");
2891 }
2892 }
2893 return Ok(());
2894 }
2895 State::Closed(_) => {
2896 for result in frame::Iter::new(packet.payload.freeze())? {
2897 let frame = match result {
2898 Ok(frame) => frame,
2899 Err(err) => {
2900 debug!("frame decoding error: {err:?}");
2901 continue;
2902 }
2903 };
2904
2905 if let Frame::Padding = frame {
2906 continue;
2907 };
2908
2909 self.stats.frame_rx.record(&frame);
2910
2911 if let Frame::Close(_) = frame {
2912 trace!("draining");
2913 self.state = State::Draining;
2914 break;
2915 }
2916 }
2917 return Ok(());
2918 }
2919 State::Draining | State::Drained => return Ok(()),
2920 State::Handshake(ref mut state) => state,
2921 };
2922
2923 match packet.header {
2924 Header::Retry {
2925 src_cid: rem_cid, ..
2926 } => {
2927 if self.side.is_server() {
2928 return Err(TransportError::PROTOCOL_VIOLATION("client sent Retry").into());
2929 }
2930
2931 if self.total_authed_packets > 1
2932 || packet.payload.len() <= 16 || !self.crypto.is_valid_retry(
2934 &self.rem_cids.active(),
2935 &packet.header_data,
2936 &packet.payload,
2937 )
2938 {
2939 trace!("discarding invalid Retry");
2940 return Ok(());
2948 }
2949
2950 trace!("retrying with CID {}", rem_cid);
2951 let client_hello = state.client_hello.take().unwrap();
2952 self.retry_src_cid = Some(rem_cid);
2953 self.rem_cids.update_initial_cid(rem_cid);
2954 self.rem_handshake_cid = rem_cid;
2955
2956 let space = &mut self.spaces[SpaceId::Initial];
2957 if let Some(info) = space.take(0) {
2958 self.on_packet_acked(now, 0, info);
2959 };
2960
2961 self.discard_space(now, SpaceId::Initial); self.spaces[SpaceId::Initial] = PacketSpace {
2963 crypto: Some(self.crypto.initial_keys(&rem_cid, self.side.side())),
2964 next_packet_number: self.spaces[SpaceId::Initial].next_packet_number,
2965 crypto_offset: client_hello.len() as u64,
2966 ..PacketSpace::new(now)
2967 };
2968 self.spaces[SpaceId::Initial]
2969 .pending
2970 .crypto
2971 .push_back(frame::Crypto {
2972 offset: 0,
2973 data: client_hello,
2974 });
2975
2976 let zero_rtt = mem::take(&mut self.spaces[SpaceId::Data].sent_packets);
2978 for (pn, info) in zero_rtt {
2979 self.remove_in_flight(pn, &info);
2980 self.spaces[SpaceId::Data].pending |= info.retransmits;
2981 }
2982 self.streams.retransmit_all_for_0rtt();
2983
2984 let token_len = packet.payload.len() - 16;
2985 let ConnectionSide::Client { ref mut token, .. } = self.side else {
2986 unreachable!("we already short-circuited if we're server");
2987 };
2988 *token = packet.payload.freeze().split_to(token_len);
2989 self.state = State::Handshake(state::Handshake {
2990 expected_token: Bytes::new(),
2991 rem_cid_set: false,
2992 client_hello: None,
2993 });
2994 Ok(())
2995 }
2996 Header::Long {
2997 ty: LongType::Handshake,
2998 src_cid: rem_cid,
2999 ..
3000 } => {
3001 if rem_cid != self.rem_handshake_cid {
3002 debug!(
3003 "discarding packet with mismatched remote CID: {} != {}",
3004 self.rem_handshake_cid, rem_cid
3005 );
3006 return Ok(());
3007 }
3008 self.on_path_validated();
3009
3010 self.process_early_payload(now, packet)?;
3011 if self.state.is_closed() {
3012 return Ok(());
3013 }
3014
3015 if self.crypto.is_handshaking() {
3016 trace!("handshake ongoing");
3017 return Ok(());
3018 }
3019
3020 if self.side.is_client() {
3021 let params =
3023 self.crypto
3024 .transport_parameters()?
3025 .ok_or_else(|| TransportError {
3026 code: TransportErrorCode::crypto(0x6d),
3027 frame: None,
3028 reason: "transport parameters missing".into(),
3029 })?;
3030
3031 if self.has_0rtt() {
3032 if !self.crypto.early_data_accepted().unwrap() {
3033 debug_assert!(self.side.is_client());
3034 debug!("0-RTT rejected");
3035 self.accepted_0rtt = false;
3036 self.streams.zero_rtt_rejected();
3037
3038 self.spaces[SpaceId::Data].pending = Retransmits::default();
3040
3041 let sent_packets =
3043 mem::take(&mut self.spaces[SpaceId::Data].sent_packets);
3044 for (pn, packet) in sent_packets {
3045 self.remove_in_flight(pn, &packet);
3046 }
3047 } else {
3048 self.accepted_0rtt = true;
3049 params.validate_resumption_from(&self.peer_params)?;
3050 }
3051 }
3052 if let Some(token) = params.stateless_reset_token {
3053 self.endpoint_events
3054 .push_back(EndpointEventInner::ResetToken(self.path.remote, token));
3055 }
3056 self.handle_peer_params(params)?;
3057 self.issue_first_cids(now);
3058 } else {
3059 self.spaces[SpaceId::Data].pending.handshake_done = true;
3061 self.discard_space(now, SpaceId::Handshake);
3062 }
3063
3064 self.events.push_back(Event::Connected);
3065 self.state = State::Established;
3066 trace!("established");
3067 Ok(())
3068 }
3069 Header::Initial(InitialHeader {
3070 src_cid: rem_cid, ..
3071 }) => {
3072 if !state.rem_cid_set {
3073 trace!("switching remote CID to {}", rem_cid);
3074 let mut state = state.clone();
3075 self.rem_cids.update_initial_cid(rem_cid);
3076 self.rem_handshake_cid = rem_cid;
3077 self.orig_rem_cid = rem_cid;
3078 state.rem_cid_set = true;
3079 self.state = State::Handshake(state);
3080 } else if rem_cid != self.rem_handshake_cid {
3081 debug!(
3082 "discarding packet with mismatched remote CID: {} != {}",
3083 self.rem_handshake_cid, rem_cid
3084 );
3085 return Ok(());
3086 }
3087
3088 let starting_space = self.highest_space;
3089 self.process_early_payload(now, packet)?;
3090
3091 if self.side.is_server()
3092 && starting_space == SpaceId::Initial
3093 && self.highest_space != SpaceId::Initial
3094 {
3095 let params =
3096 self.crypto
3097 .transport_parameters()?
3098 .ok_or_else(|| TransportError {
3099 code: TransportErrorCode::crypto(0x6d),
3100 frame: None,
3101 reason: "transport parameters missing".into(),
3102 })?;
3103 self.handle_peer_params(params)?;
3104 self.issue_first_cids(now);
3105 self.init_0rtt();
3106 }
3107 Ok(())
3108 }
3109 Header::Long {
3110 ty: LongType::ZeroRtt,
3111 ..
3112 } => {
3113 self.process_payload(now, remote, number.unwrap(), packet)?;
3114 Ok(())
3115 }
3116 Header::VersionNegotiate { .. } => {
3117 if self.total_authed_packets > 1 {
3118 return Ok(());
3119 }
3120 let supported = packet
3121 .payload
3122 .chunks(4)
3123 .any(|x| match <[u8; 4]>::try_from(x) {
3124 Ok(version) => self.version == u32::from_be_bytes(version),
3125 Err(_) => false,
3126 });
3127 if supported {
3128 return Ok(());
3129 }
3130 debug!("remote doesn't support our version");
3131 Err(ConnectionError::VersionMismatch)
3132 }
3133 Header::Short { .. } => unreachable!(
3134 "short packets received during handshake are discarded in handle_packet"
3135 ),
3136 }
3137 }
3138
3139 fn process_early_payload(
3141 &mut self,
3142 now: Instant,
3143 packet: Packet,
3144 ) -> Result<(), TransportError> {
3145 debug_assert_ne!(packet.header.space(), SpaceId::Data);
3146 let payload_len = packet.payload.len();
3147 let mut ack_eliciting = false;
3148 for result in frame::Iter::new(packet.payload.freeze())? {
3149 let frame = result?;
3150 let span = match frame {
3151 Frame::Padding => continue,
3152 _ => Some(trace_span!("frame", ty = %frame.ty())),
3153 };
3154
3155 self.stats.frame_rx.record(&frame);
3156
3157 let _guard = span.as_ref().map(|x| x.enter());
3158 ack_eliciting |= frame.is_ack_eliciting();
3159
3160 match frame {
3162 Frame::Padding | Frame::Ping => {}
3163 Frame::Crypto(frame) => {
3164 self.read_crypto(packet.header.space(), &frame, payload_len)?;
3165 }
3166 Frame::Ack(ack) => {
3167 self.on_ack_received(now, packet.header.space(), ack)?;
3168 }
3169 Frame::Close(reason) => {
3170 self.error = Some(reason.into());
3171 self.state = State::Draining;
3172 return Ok(());
3173 }
3174 _ => {
3175 let mut err =
3176 TransportError::PROTOCOL_VIOLATION("illegal frame type in handshake");
3177 err.frame = Some(frame.ty());
3178 return Err(err);
3179 }
3180 }
3181 }
3182
3183 if ack_eliciting {
3184 self.spaces[packet.header.space()]
3186 .pending_acks
3187 .set_immediate_ack_required();
3188 }
3189
3190 self.write_crypto();
3191 Ok(())
3192 }
3193
3194 fn process_payload(
3195 &mut self,
3196 now: Instant,
3197 remote: SocketAddr,
3198 number: u64,
3199 packet: Packet,
3200 ) -> Result<(), TransportError> {
3201 let payload = packet.payload.freeze();
3202 let mut is_probing_packet = true;
3203 let mut close = None;
3204 let payload_len = payload.len();
3205 let mut ack_eliciting = false;
3206 for result in frame::Iter::new(payload)? {
3207 let frame = result?;
3208 let span = match frame {
3209 Frame::Padding => continue,
3210 _ => Some(trace_span!("frame", ty = %frame.ty())),
3211 };
3212
3213 self.stats.frame_rx.record(&frame);
3214 match &frame {
3217 Frame::Crypto(f) => {
3218 trace!(offset = f.offset, len = f.data.len(), "got crypto frame");
3219 }
3220 Frame::Stream(f) => {
3221 trace!(id = %f.id, offset = f.offset, len = f.data.len(), fin = f.fin, "got stream frame");
3222 }
3223 Frame::Datagram(f) => {
3224 trace!(len = f.data.len(), "got datagram frame");
3225 }
3226 f => {
3227 trace!("got frame {:?}", f);
3228 }
3229 }
3230
3231 let _guard = span.as_ref().map(|x| x.enter());
3232 if packet.header.is_0rtt() {
3233 match frame {
3234 Frame::Crypto(_) | Frame::Close(Close::Application(_)) => {
3235 return Err(TransportError::PROTOCOL_VIOLATION(
3236 "illegal frame type in 0-RTT",
3237 ));
3238 }
3239 _ => {}
3240 }
3241 }
3242 ack_eliciting |= frame.is_ack_eliciting();
3243
3244 match frame {
3246 Frame::Padding
3247 | Frame::PathChallenge(_)
3248 | Frame::PathResponse(_)
3249 | Frame::NewConnectionId(_) => {}
3250 _ => {
3251 is_probing_packet = false;
3252 }
3253 }
3254 match frame {
3255 Frame::Crypto(frame) => {
3256 self.read_crypto(SpaceId::Data, &frame, payload_len)?;
3257 }
3258 Frame::Stream(frame) => {
3259 if self.streams.received(frame, payload_len)?.should_transmit() {
3260 self.spaces[SpaceId::Data].pending.max_data = true;
3261 }
3262 }
3263 Frame::Ack(ack) => {
3264 self.on_ack_received(now, SpaceId::Data, ack)?;
3265 }
3266 Frame::Padding | Frame::Ping => {}
3267 Frame::Close(reason) => {
3268 close = Some(reason);
3269 }
3270 Frame::PathChallenge(token) => {
3271 self.path_responses.push(number, token, remote);
3272 if remote == self.path.remote {
3273 match self.peer_supports_ack_frequency() {
3276 true => self.immediate_ack(),
3277 false => self.ping(),
3278 }
3279 }
3280 }
3281 Frame::PathResponse(token) => {
3282 if self.path.challenge == Some(token) && remote == self.path.remote {
3283 trace!("new path validated");
3284 self.timers.stop(Timer::PathValidation);
3285 self.path.challenge = None;
3286 self.path.validated = true;
3287 if let Some((_, ref mut prev_path)) = self.prev_path {
3288 prev_path.challenge = None;
3289 prev_path.challenge_pending = false;
3290 }
3291 self.on_path_validated();
3292 } else if let Some(nat_traversal) = &mut self.nat_traversal {
3293 match nat_traversal.handle_validation_success(remote, token, now) {
3295 Ok(sequence) => {
3296 trace!(
3297 "NAT traversal candidate {} validated for sequence {}",
3298 remote, sequence
3299 );
3300
3301 if nat_traversal.handle_coordination_success(remote, now) {
3303 trace!("Coordination succeeded via {}", remote);
3304
3305 let can_migrate = match &self.side {
3307 ConnectionSide::Client { .. } => true, ConnectionSide::Server { server_config } => {
3309 server_config.migration
3310 }
3311 };
3312
3313 if can_migrate {
3314 let best_pairs = nat_traversal.get_best_succeeded_pairs();
3316 if let Some(best) = best_pairs.first() {
3317 if best.remote_addr == remote
3318 && best.remote_addr != self.path.remote
3319 {
3320 debug!(
3321 "NAT traversal found better path, initiating migration"
3322 );
3323 if let Err(e) =
3325 self.migrate_to_nat_traversal_path(now)
3326 {
3327 warn!(
3328 "Failed to migrate to NAT traversal path: {:?}",
3329 e
3330 );
3331 }
3332 }
3333 }
3334 }
3335 } else {
3336 if nat_traversal.mark_pair_succeeded(remote) {
3338 trace!("NAT traversal pair succeeded for {}", remote);
3339 }
3340 }
3341 }
3342 Err(NatTraversalError::ChallengeMismatch) => {
3343 debug!(
3344 "PATH_RESPONSE challenge mismatch for NAT candidate {}",
3345 remote
3346 );
3347 }
3348 Err(e) => {
3349 debug!("NAT traversal validation error: {}", e);
3350 }
3351 }
3352 } else {
3353 debug!(token, "ignoring invalid PATH_RESPONSE");
3354 }
3355 }
3356 Frame::MaxData(bytes) => {
3357 self.streams.received_max_data(bytes);
3358 }
3359 Frame::MaxStreamData { id, offset } => {
3360 self.streams.received_max_stream_data(id, offset)?;
3361 }
3362 Frame::MaxStreams { dir, count } => {
3363 self.streams.received_max_streams(dir, count)?;
3364 }
3365 Frame::ResetStream(frame) => {
3366 if self.streams.received_reset(frame)?.should_transmit() {
3367 self.spaces[SpaceId::Data].pending.max_data = true;
3368 }
3369 }
3370 Frame::DataBlocked { offset } => {
3371 debug!(offset, "peer claims to be blocked at connection level");
3372 }
3373 Frame::StreamDataBlocked { id, offset } => {
3374 if id.initiator() == self.side.side() && id.dir() == Dir::Uni {
3375 debug!("got STREAM_DATA_BLOCKED on send-only {}", id);
3376 return Err(TransportError::STREAM_STATE_ERROR(
3377 "STREAM_DATA_BLOCKED on send-only stream",
3378 ));
3379 }
3380 debug!(
3381 stream = %id,
3382 offset, "peer claims to be blocked at stream level"
3383 );
3384 }
3385 Frame::StreamsBlocked { dir, limit } => {
3386 if limit > MAX_STREAM_COUNT {
3387 return Err(TransportError::FRAME_ENCODING_ERROR(
3388 "unrepresentable stream limit",
3389 ));
3390 }
3391 debug!(
3392 "peer claims to be blocked opening more than {} {} streams",
3393 limit, dir
3394 );
3395 }
3396 Frame::StopSending(frame::StopSending { id, error_code }) => {
3397 if id.initiator() != self.side.side() {
3398 if id.dir() == Dir::Uni {
3399 debug!("got STOP_SENDING on recv-only {}", id);
3400 return Err(TransportError::STREAM_STATE_ERROR(
3401 "STOP_SENDING on recv-only stream",
3402 ));
3403 }
3404 } else if self.streams.is_local_unopened(id) {
3405 return Err(TransportError::STREAM_STATE_ERROR(
3406 "STOP_SENDING on unopened stream",
3407 ));
3408 }
3409 self.streams.received_stop_sending(id, error_code);
3410 }
3411 Frame::RetireConnectionId { sequence } => {
3412 let allow_more_cids = self
3413 .local_cid_state
3414 .on_cid_retirement(sequence, self.peer_params.issue_cids_limit())?;
3415 self.endpoint_events
3416 .push_back(EndpointEventInner::RetireConnectionId(
3417 now,
3418 sequence,
3419 allow_more_cids,
3420 ));
3421 }
3422 Frame::NewConnectionId(frame) => {
3423 trace!(
3424 sequence = frame.sequence,
3425 id = %frame.id,
3426 retire_prior_to = frame.retire_prior_to,
3427 );
3428 if self.rem_cids.active().is_empty() {
3429 return Err(TransportError::PROTOCOL_VIOLATION(
3430 "NEW_CONNECTION_ID when CIDs aren't in use",
3431 ));
3432 }
3433 if frame.retire_prior_to > frame.sequence {
3434 return Err(TransportError::PROTOCOL_VIOLATION(
3435 "NEW_CONNECTION_ID retiring unissued CIDs",
3436 ));
3437 }
3438
3439 use crate::cid_queue::InsertError;
3440 match self.rem_cids.insert(frame) {
3441 Ok(None) => {}
3442 Ok(Some((retired, reset_token))) => {
3443 let pending_retired =
3444 &mut self.spaces[SpaceId::Data].pending.retire_cids;
3445 const MAX_PENDING_RETIRED_CIDS: u64 = CidQueue::LEN as u64 * 10;
3448 if (pending_retired.len() as u64)
3451 .saturating_add(retired.end.saturating_sub(retired.start))
3452 > MAX_PENDING_RETIRED_CIDS
3453 {
3454 return Err(TransportError::CONNECTION_ID_LIMIT_ERROR(
3455 "queued too many retired CIDs",
3456 ));
3457 }
3458 pending_retired.extend(retired);
3459 self.set_reset_token(reset_token);
3460 }
3461 Err(InsertError::ExceedsLimit) => {
3462 return Err(TransportError::CONNECTION_ID_LIMIT_ERROR(""));
3463 }
3464 Err(InsertError::Retired) => {
3465 trace!("discarding already-retired");
3466 self.spaces[SpaceId::Data]
3470 .pending
3471 .retire_cids
3472 .push(frame.sequence);
3473 continue;
3474 }
3475 };
3476
3477 if self.side.is_server() && self.rem_cids.active_seq() == 0 {
3478 self.update_rem_cid();
3481 }
3482 }
3483 Frame::NewToken(NewToken { token }) => {
3484 let ConnectionSide::Client {
3485 token_store,
3486 server_name,
3487 ..
3488 } = &self.side
3489 else {
3490 return Err(TransportError::PROTOCOL_VIOLATION("client sent NEW_TOKEN"));
3491 };
3492 if token.is_empty() {
3493 return Err(TransportError::FRAME_ENCODING_ERROR("empty token"));
3494 }
3495 trace!("got new token");
3496 token_store.insert(server_name, token);
3497 }
3498 Frame::Datagram(datagram) => {
3499 if self
3500 .datagrams
3501 .received(datagram, &self.config.datagram_receive_buffer_size)?
3502 {
3503 self.events.push_back(Event::DatagramReceived);
3504 }
3505 }
3506 Frame::AckFrequency(ack_frequency) => {
3507 let space = &mut self.spaces[SpaceId::Data];
3509
3510 if !self
3511 .ack_frequency
3512 .ack_frequency_received(&ack_frequency, &mut space.pending_acks)?
3513 {
3514 continue;
3516 }
3517
3518 if let Some(timeout) = space
3521 .pending_acks
3522 .max_ack_delay_timeout(self.ack_frequency.max_ack_delay)
3523 {
3524 self.timers.set(Timer::MaxAckDelay, timeout);
3525 }
3526 }
3527 Frame::ImmediateAck => {
3528 self.spaces[SpaceId::Data]
3530 .pending_acks
3531 .set_immediate_ack_required();
3532 }
3533 Frame::HandshakeDone => {
3534 if self.side.is_server() {
3535 return Err(TransportError::PROTOCOL_VIOLATION(
3536 "client sent HANDSHAKE_DONE",
3537 ));
3538 }
3539 if self.spaces[SpaceId::Handshake].crypto.is_some() {
3540 self.discard_space(now, SpaceId::Handshake);
3541 }
3542 }
3543 Frame::AddAddress(add_address) => {
3544 self.handle_add_address(&add_address, now)?;
3545 }
3546 Frame::PunchMeNow(punch_me_now) => {
3547 self.handle_punch_me_now(&punch_me_now, now)?;
3548 }
3549 Frame::RemoveAddress(remove_address) => {
3550 self.handle_remove_address(&remove_address)?;
3551 }
3552 Frame::ObservedAddress(observed_address) => {
3553 self.handle_observed_address_frame(&observed_address, now)?;
3554 }
3555 }
3556 }
3557
3558 let space = &mut self.spaces[SpaceId::Data];
3559 if space
3560 .pending_acks
3561 .packet_received(now, number, ack_eliciting, &space.dedup)
3562 {
3563 self.timers
3564 .set(Timer::MaxAckDelay, now + self.ack_frequency.max_ack_delay);
3565 }
3566
3567 let pending = &mut self.spaces[SpaceId::Data].pending;
3572 self.streams.queue_max_stream_id(pending);
3573
3574 if let Some(reason) = close {
3575 self.error = Some(reason.into());
3576 self.state = State::Draining;
3577 self.close = true;
3578 }
3579
3580 if remote != self.path.remote
3581 && !is_probing_packet
3582 && number == self.spaces[SpaceId::Data].rx_packet
3583 {
3584 let ConnectionSide::Server { ref server_config } = self.side else {
3585 return Err(TransportError::PROTOCOL_VIOLATION(
3586 "packets from unknown remote should be dropped by clients",
3587 ));
3588 };
3589 debug_assert!(
3590 server_config.migration,
3591 "migration-initiating packets should have been dropped immediately"
3592 );
3593 self.migrate(now, remote);
3594 self.update_rem_cid();
3596 self.spin = false;
3597 }
3598
3599 Ok(())
3600 }
3601
3602 fn migrate(&mut self, now: Instant, remote: SocketAddr) {
3603 trace!(%remote, "migration initiated");
3604 let mut new_path = if remote.is_ipv4() && remote.ip() == self.path.remote.ip() {
3608 PathData::from_previous(remote, &self.path, now)
3609 } else {
3610 let peer_max_udp_payload_size =
3611 u16::try_from(self.peer_params.max_udp_payload_size.into_inner())
3612 .unwrap_or(u16::MAX);
3613 PathData::new(
3614 remote,
3615 self.allow_mtud,
3616 Some(peer_max_udp_payload_size),
3617 now,
3618 &self.config,
3619 )
3620 };
3621 new_path.challenge = Some(self.rng.r#gen());
3622 new_path.challenge_pending = true;
3623 let prev_pto = self.pto(SpaceId::Data);
3624
3625 let mut prev = mem::replace(&mut self.path, new_path);
3626 if prev.challenge.is_none() {
3628 prev.challenge = Some(self.rng.r#gen());
3629 prev.challenge_pending = true;
3630 self.prev_path = Some((self.rem_cids.active(), prev));
3633 }
3634
3635 self.timers.set(
3636 Timer::PathValidation,
3637 now + 3 * cmp::max(self.pto(SpaceId::Data), prev_pto),
3638 );
3639 }
3640
3641 pub fn local_address_changed(&mut self) {
3643 self.update_rem_cid();
3644 self.ping();
3645 }
3646
3647 pub fn migrate_to_nat_traversal_path(&mut self, now: Instant) -> Result<(), TransportError> {
3649 let (remote_addr, local_addr) = {
3651 let nat_state = self
3652 .nat_traversal
3653 .as_ref()
3654 .ok_or_else(|| TransportError::PROTOCOL_VIOLATION("NAT traversal not enabled"))?;
3655
3656 let best_pairs = nat_state.get_best_succeeded_pairs();
3658 if best_pairs.is_empty() {
3659 return Err(TransportError::PROTOCOL_VIOLATION(
3660 "No validated NAT traversal paths",
3661 ));
3662 }
3663
3664 let best_path = best_pairs
3666 .iter()
3667 .find(|pair| pair.remote_addr != self.path.remote)
3668 .or_else(|| best_pairs.first());
3669
3670 let best_path = best_path.ok_or_else(|| {
3671 TransportError::PROTOCOL_VIOLATION("No suitable NAT traversal path")
3672 })?;
3673
3674 debug!(
3675 "Migrating to NAT traversal path: {} -> {} (priority: {})",
3676 self.path.remote, best_path.remote_addr, best_path.priority
3677 );
3678
3679 (best_path.remote_addr, best_path.local_addr)
3680 };
3681
3682 self.migrate(now, remote_addr);
3684
3685 if local_addr != SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED), 0) {
3687 self.local_ip = Some(local_addr.ip());
3688 }
3689
3690 self.path.challenge_pending = true;
3692
3693 Ok(())
3694 }
3695
3696 fn update_rem_cid(&mut self) {
3698 let (reset_token, retired) = match self.rem_cids.next() {
3699 Some(x) => x,
3700 None => return,
3701 };
3702
3703 self.spaces[SpaceId::Data]
3705 .pending
3706 .retire_cids
3707 .extend(retired);
3708 self.set_reset_token(reset_token);
3709 }
3710
3711 fn set_reset_token(&mut self, reset_token: ResetToken) {
3712 self.endpoint_events
3713 .push_back(EndpointEventInner::ResetToken(
3714 self.path.remote,
3715 reset_token,
3716 ));
3717 self.peer_params.stateless_reset_token = Some(reset_token);
3718 }
3719
3720 fn issue_first_cids(&mut self, now: Instant) {
3722 if self.local_cid_state.cid_len() == 0 {
3723 return;
3724 }
3725
3726 let mut n = self.peer_params.issue_cids_limit() - 1;
3728 if let ConnectionSide::Server { server_config } = &self.side {
3729 if server_config.has_preferred_address() {
3730 n -= 1;
3732 }
3733 }
3734 self.endpoint_events
3735 .push_back(EndpointEventInner::NeedIdentifiers(now, n));
3736 }
3737
3738 fn populate_packet(
3739 &mut self,
3740 now: Instant,
3741 space_id: SpaceId,
3742 buf: &mut Vec<u8>,
3743 max_size: usize,
3744 pn: u64,
3745 ) -> SentFrames {
3746 let mut sent = SentFrames::default();
3747 let space = &mut self.spaces[space_id];
3748 let is_0rtt = space_id == SpaceId::Data && space.crypto.is_none();
3749 space.pending_acks.maybe_ack_non_eliciting();
3750
3751 if !is_0rtt && mem::replace(&mut space.pending.handshake_done, false) {
3753 buf.write(frame::FrameType::HANDSHAKE_DONE);
3754 sent.retransmits.get_or_create().handshake_done = true;
3755 self.stats.frame_tx.handshake_done =
3757 self.stats.frame_tx.handshake_done.saturating_add(1);
3758 }
3759
3760 if mem::replace(&mut space.ping_pending, false) {
3762 trace!("PING");
3763 buf.write(frame::FrameType::PING);
3764 sent.non_retransmits = true;
3765 self.stats.frame_tx.ping += 1;
3766 }
3767
3768 if mem::replace(&mut space.immediate_ack_pending, false) {
3770 trace!("IMMEDIATE_ACK");
3771 buf.write(frame::FrameType::IMMEDIATE_ACK);
3772 sent.non_retransmits = true;
3773 self.stats.frame_tx.immediate_ack += 1;
3774 }
3775
3776 if space.pending_acks.can_send() {
3778 Self::populate_acks(
3779 now,
3780 self.receiving_ecn,
3781 &mut sent,
3782 space,
3783 buf,
3784 &mut self.stats,
3785 );
3786 }
3787
3788 if mem::replace(&mut space.pending.ack_frequency, false) {
3790 let sequence_number = self.ack_frequency.next_sequence_number();
3791
3792 let config = self.config.ack_frequency_config.as_ref().unwrap();
3794
3795 let max_ack_delay = self.ack_frequency.candidate_max_ack_delay(
3797 self.path.rtt.get(),
3798 config,
3799 &self.peer_params,
3800 );
3801
3802 trace!(?max_ack_delay, "ACK_FREQUENCY");
3803
3804 frame::AckFrequency {
3805 sequence: sequence_number,
3806 ack_eliciting_threshold: config.ack_eliciting_threshold,
3807 request_max_ack_delay: max_ack_delay.as_micros().try_into().unwrap_or(VarInt::MAX),
3808 reordering_threshold: config.reordering_threshold,
3809 }
3810 .encode(buf);
3811
3812 sent.retransmits.get_or_create().ack_frequency = true;
3813
3814 self.ack_frequency.ack_frequency_sent(pn, max_ack_delay);
3815 self.stats.frame_tx.ack_frequency += 1;
3816 }
3817
3818 if buf.len() + 9 < max_size && space_id == SpaceId::Data {
3820 if let Some(token) = self.path.challenge {
3822 self.path.challenge_pending = false;
3824 sent.non_retransmits = true;
3825 sent.requires_padding = true;
3826 trace!("PATH_CHALLENGE {:08x}", token);
3827 buf.write(frame::FrameType::PATH_CHALLENGE);
3828 buf.write(token);
3829 self.stats.frame_tx.path_challenge += 1;
3830 }
3831
3832 }
3841
3842 if buf.len() + 9 < max_size && space_id == SpaceId::Data {
3844 if let Some(token) = self.path_responses.pop_on_path(self.path.remote) {
3845 sent.non_retransmits = true;
3846 sent.requires_padding = true;
3847 trace!("PATH_RESPONSE {:08x}", token);
3848 buf.write(frame::FrameType::PATH_RESPONSE);
3849 buf.write(token);
3850 self.stats.frame_tx.path_response += 1;
3851 }
3852 }
3853
3854 while buf.len() + frame::Crypto::SIZE_BOUND < max_size && !is_0rtt {
3856 let mut frame = match space.pending.crypto.pop_front() {
3857 Some(x) => x,
3858 None => break,
3859 };
3860
3861 let max_crypto_data_size = max_size
3866 - buf.len()
3867 - 1 - VarInt::size(unsafe { VarInt::from_u64_unchecked(frame.offset) })
3869 - 2; let available_space = max_size - buf.len();
3873 let remaining_data = frame.data.len();
3874 let optimal_size = self
3875 .pqc_state
3876 .calculate_crypto_frame_size(available_space, remaining_data);
3877
3878 let len = frame
3879 .data
3880 .len()
3881 .min(2usize.pow(14) - 1)
3882 .min(max_crypto_data_size)
3883 .min(optimal_size);
3884
3885 let data = frame.data.split_to(len);
3886 let truncated = frame::Crypto {
3887 offset: frame.offset,
3888 data,
3889 };
3890 trace!(
3891 "CRYPTO: off {} len {}",
3892 truncated.offset,
3893 truncated.data.len()
3894 );
3895 truncated.encode(buf);
3896 self.stats.frame_tx.crypto += 1;
3897 sent.retransmits.get_or_create().crypto.push_back(truncated);
3898 if !frame.data.is_empty() {
3899 frame.offset += len as u64;
3900 space.pending.crypto.push_front(frame);
3901 }
3902 }
3903
3904 if space_id == SpaceId::Data {
3905 self.streams.write_control_frames(
3906 buf,
3907 &mut space.pending,
3908 &mut sent.retransmits,
3909 &mut self.stats.frame_tx,
3910 max_size,
3911 );
3912 }
3913
3914 while buf.len() + 44 < max_size {
3916 let issued = match space.pending.new_cids.pop() {
3917 Some(x) => x,
3918 None => break,
3919 };
3920 trace!(
3921 sequence = issued.sequence,
3922 id = %issued.id,
3923 "NEW_CONNECTION_ID"
3924 );
3925 frame::NewConnectionId {
3926 sequence: issued.sequence,
3927 retire_prior_to: self.local_cid_state.retire_prior_to(),
3928 id: issued.id,
3929 reset_token: issued.reset_token,
3930 }
3931 .encode(buf);
3932 sent.retransmits.get_or_create().new_cids.push(issued);
3933 self.stats.frame_tx.new_connection_id += 1;
3934 }
3935
3936 while buf.len() + frame::RETIRE_CONNECTION_ID_SIZE_BOUND < max_size {
3938 let seq = match space.pending.retire_cids.pop() {
3939 Some(x) => x,
3940 None => break,
3941 };
3942 trace!(sequence = seq, "RETIRE_CONNECTION_ID");
3943 buf.write(frame::FrameType::RETIRE_CONNECTION_ID);
3944 buf.write_var(seq);
3945 sent.retransmits.get_or_create().retire_cids.push(seq);
3946 self.stats.frame_tx.retire_connection_id += 1;
3947 }
3948
3949 let mut sent_datagrams = false;
3951 while buf.len() + Datagram::SIZE_BOUND < max_size && space_id == SpaceId::Data {
3952 match self.datagrams.write(buf, max_size) {
3953 true => {
3954 sent_datagrams = true;
3955 sent.non_retransmits = true;
3956 self.stats.frame_tx.datagram += 1;
3957 }
3958 false => break,
3959 }
3960 }
3961 if self.datagrams.send_blocked && sent_datagrams {
3962 self.events.push_back(Event::DatagramsUnblocked);
3963 self.datagrams.send_blocked = false;
3964 }
3965
3966 while let Some(remote_addr) = space.pending.new_tokens.pop() {
3968 debug_assert_eq!(space_id, SpaceId::Data);
3969 let ConnectionSide::Server { server_config } = &self.side else {
3970 debug_assert!(false, "NEW_TOKEN frames should not be enqueued by clients");
3972 continue;
3973 };
3974
3975 if remote_addr != self.path.remote {
3976 continue;
3981 }
3982
3983 if self.delay_new_token_until_binding && self.peer_id_for_tokens.is_none() {
3986 space.pending.new_tokens.push(remote_addr);
3988 break;
3989 }
3990
3991 let new_token = if let Some(pid) = self.peer_id_for_tokens {
3993 let nonce_u128: u128 = self.rng.r#gen();
3996 let nonce = nonce_u128.to_le_bytes();
3997 let cid = self.rem_cids.active();
3998 let mut pt = Vec::with_capacity(32 + 1 + cid.len() + 16);
3999 pt.extend_from_slice(&pid.0);
4000 pt.push(cid.len() as u8);
4001 pt.extend_from_slice(&cid[..]);
4002 pt.extend_from_slice(&nonce);
4003 let mut tok = pt;
4004 tok.extend_from_slice(&nonce[..12]);
4005 NewToken { token: tok.into() }
4006 } else {
4007 let token = Token::new(
4008 TokenPayload::Validation {
4009 ip: remote_addr.ip(),
4010 issued: server_config.time_source.now(),
4011 },
4012 &mut self.rng,
4013 );
4014 NewToken {
4015 token: token.encode(&*server_config.token_key).into(),
4016 }
4017 };
4018
4019 if buf.len() + new_token.size() >= max_size {
4020 space.pending.new_tokens.push(remote_addr);
4021 break;
4022 }
4023
4024 new_token.encode(buf);
4025 sent.retransmits
4026 .get_or_create()
4027 .new_tokens
4028 .push(remote_addr);
4029 self.stats.frame_tx.new_token += 1;
4030 }
4031
4032 while buf.len() + frame::AddAddress::SIZE_BOUND < max_size && space_id == SpaceId::Data {
4034 let add_address = match space.pending.add_addresses.pop() {
4035 Some(x) => x,
4036 None => break,
4037 };
4038 trace!(
4039 sequence = %add_address.sequence,
4040 address = %add_address.address,
4041 "ADD_ADDRESS"
4042 );
4043 if self.nat_traversal_frame_config.use_rfc_format {
4045 add_address.encode_rfc(buf);
4046 } else {
4047 add_address.encode_legacy(buf);
4048 }
4049 sent.retransmits
4050 .get_or_create()
4051 .add_addresses
4052 .push(add_address);
4053 self.stats.frame_tx.add_address += 1;
4054 }
4055
4056 while buf.len() + frame::PunchMeNow::SIZE_BOUND < max_size && space_id == SpaceId::Data {
4058 let punch_me_now = match space.pending.punch_me_now.pop() {
4059 Some(x) => x,
4060 None => break,
4061 };
4062 trace!(
4063 round = %punch_me_now.round,
4064 paired_with_sequence_number = %punch_me_now.paired_with_sequence_number,
4065 "PUNCH_ME_NOW"
4066 );
4067 if self.nat_traversal_frame_config.use_rfc_format {
4069 punch_me_now.encode_rfc(buf);
4070 } else {
4071 punch_me_now.encode_legacy(buf);
4072 }
4073 sent.retransmits
4074 .get_or_create()
4075 .punch_me_now
4076 .push(punch_me_now);
4077 self.stats.frame_tx.punch_me_now += 1;
4078 }
4079
4080 while buf.len() + frame::RemoveAddress::SIZE_BOUND < max_size && space_id == SpaceId::Data {
4082 let remove_address = match space.pending.remove_addresses.pop() {
4083 Some(x) => x,
4084 None => break,
4085 };
4086 trace!(
4087 sequence = %remove_address.sequence,
4088 "REMOVE_ADDRESS"
4089 );
4090 remove_address.encode(buf);
4092 sent.retransmits
4093 .get_or_create()
4094 .remove_addresses
4095 .push(remove_address);
4096 self.stats.frame_tx.remove_address += 1;
4097 }
4098
4099 while buf.len() + frame::ObservedAddress::SIZE_BOUND < max_size && space_id == SpaceId::Data
4101 {
4102 let observed_address = match space.pending.outbound_observations.pop() {
4103 Some(x) => x,
4104 None => break,
4105 };
4106 trace!(
4107 address = %observed_address.address,
4108 "OBSERVED_ADDRESS"
4109 );
4110 observed_address.encode(buf);
4111 sent.retransmits
4112 .get_or_create()
4113 .outbound_observations
4114 .push(observed_address);
4115 self.stats.frame_tx.observed_address += 1;
4116 }
4117
4118 if space_id == SpaceId::Data {
4120 sent.stream_frames =
4121 self.streams
4122 .write_stream_frames(buf, max_size, self.config.send_fairness);
4123 self.stats.frame_tx.stream += sent.stream_frames.len() as u64;
4124 }
4125
4126 sent
4127 }
4128
4129 fn populate_acks(
4134 now: Instant,
4135 receiving_ecn: bool,
4136 sent: &mut SentFrames,
4137 space: &mut PacketSpace,
4138 buf: &mut Vec<u8>,
4139 stats: &mut ConnectionStats,
4140 ) {
4141 debug_assert!(!space.pending_acks.ranges().is_empty());
4142
4143 debug_assert!(space.crypto.is_some(), "tried to send ACK in 0-RTT");
4145 let ecn = if receiving_ecn {
4146 Some(&space.ecn_counters)
4147 } else {
4148 None
4149 };
4150 sent.largest_acked = space.pending_acks.ranges().max();
4151
4152 let delay_micros = space.pending_acks.ack_delay(now).as_micros() as u64;
4153
4154 let ack_delay_exp = TransportParameters::default().ack_delay_exponent;
4156 let delay = delay_micros >> ack_delay_exp.into_inner();
4157
4158 trace!(
4159 "ACK {:?}, Delay = {}us",
4160 space.pending_acks.ranges(),
4161 delay_micros
4162 );
4163
4164 frame::Ack::encode(delay as _, space.pending_acks.ranges(), ecn, buf);
4165 stats.frame_tx.acks += 1;
4166 }
4167
4168 fn close_common(&mut self) {
4169 trace!("connection closed");
4170 for &timer in &Timer::VALUES {
4171 self.timers.stop(timer);
4172 }
4173 }
4174
4175 fn set_close_timer(&mut self, now: Instant) {
4176 self.timers
4177 .set(Timer::Close, now + 3 * self.pto(self.highest_space));
4178 }
4179
4180 fn handle_peer_params(&mut self, params: TransportParameters) -> Result<(), TransportError> {
4182 if Some(self.orig_rem_cid) != params.initial_src_cid
4183 || (self.side.is_client()
4184 && (Some(self.initial_dst_cid) != params.original_dst_cid
4185 || self.retry_src_cid != params.retry_src_cid))
4186 {
4187 return Err(TransportError::TRANSPORT_PARAMETER_ERROR(
4188 "CID authentication failure",
4189 ));
4190 }
4191
4192 self.set_peer_params(params);
4193
4194 Ok(())
4195 }
4196
4197 fn set_peer_params(&mut self, params: TransportParameters) {
4198 self.streams.set_params(¶ms);
4199 self.idle_timeout =
4200 negotiate_max_idle_timeout(self.config.max_idle_timeout, Some(params.max_idle_timeout));
4201 trace!("negotiated max idle timeout {:?}", self.idle_timeout);
4202 if let Some(ref info) = params.preferred_address {
4203 self.rem_cids.insert(frame::NewConnectionId {
4204 sequence: 1,
4205 id: info.connection_id,
4206 reset_token: info.stateless_reset_token,
4207 retire_prior_to: 0,
4208 }).expect("preferred address CID is the first received, and hence is guaranteed to be legal");
4209 }
4210 self.ack_frequency.peer_max_ack_delay = get_max_ack_delay(¶ms);
4211
4212 self.negotiate_nat_traversal_capability(¶ms);
4214
4215 let local_has_nat_traversal = self.config.nat_traversal_config.is_some();
4218 let local_supports_rfc = local_has_nat_traversal;
4221 self.nat_traversal_frame_config = frame::nat_traversal_unified::NatTraversalFrameConfig {
4222 use_rfc_format: local_supports_rfc && params.supports_rfc_nat_traversal(),
4224 accept_legacy: true,
4226 };
4227
4228 self.negotiate_address_discovery(¶ms);
4230
4231 self.pqc_state.update_from_peer_params(¶ms);
4233
4234 if self.pqc_state.enabled && self.pqc_state.using_pqc {
4236 trace!("PQC enabled, adjusting MTU discovery for larger handshake packets");
4237 let current_mtu = self.path.mtud.current_mtu();
4241 if current_mtu < self.pqc_state.handshake_mtu {
4242 trace!(
4243 "Current MTU {} is less than PQC handshake MTU {}, will rely on MTU discovery",
4244 current_mtu, self.pqc_state.handshake_mtu
4245 );
4246 }
4247 }
4248
4249 self.peer_params = params;
4250 self.path.mtud.on_peer_max_udp_payload_size_received(
4251 u16::try_from(self.peer_params.max_udp_payload_size.into_inner()).unwrap_or(u16::MAX),
4252 );
4253 }
4254
4255 fn negotiate_nat_traversal_capability(&mut self, params: &TransportParameters) {
4257 let peer_nat_config = match ¶ms.nat_traversal {
4259 Some(config) => config,
4260 None => {
4261 if self.config.nat_traversal_config.is_some() {
4263 debug!(
4264 "Peer does not support NAT traversal, maintaining backward compatibility"
4265 );
4266 self.emit_nat_traversal_capability_event(false);
4267
4268 self.set_nat_traversal_compatibility_mode(false);
4270 }
4271 return;
4272 }
4273 };
4274
4275 let local_nat_config = match &self.config.nat_traversal_config {
4277 Some(config) => config,
4278 None => {
4279 debug!("NAT traversal not enabled locally, ignoring peer support");
4280 self.emit_nat_traversal_capability_event(false);
4281 self.set_nat_traversal_compatibility_mode(false);
4282 return;
4283 }
4284 };
4285
4286 info!("Both peers support NAT traversal, negotiating capabilities");
4288
4289 match self.negotiate_nat_traversal_parameters(local_nat_config, peer_nat_config) {
4291 Ok(negotiated_config) => {
4292 info!("NAT traversal capability negotiated successfully");
4293 self.emit_nat_traversal_capability_event(true);
4294
4295 self.init_nat_traversal_with_negotiated_config(&negotiated_config);
4297
4298 self.set_nat_traversal_compatibility_mode(true);
4300
4301 if matches!(
4303 negotiated_config,
4304 crate::transport_parameters::NatTraversalConfig::ClientSupport
4305 ) {
4306 self.initiate_nat_traversal_process();
4307 }
4308 }
4309 Err(e) => {
4310 warn!("NAT traversal capability negotiation failed: {}", e);
4311 self.emit_nat_traversal_capability_event(false);
4312 self.set_nat_traversal_compatibility_mode(false);
4313 }
4314 }
4315 }
4316
4317 fn emit_nat_traversal_capability_event(&mut self, negotiated: bool) {
4371 if negotiated {
4374 info!("NAT traversal capability successfully negotiated");
4375 } else {
4376 info!("NAT traversal capability not available (peer or local support missing)");
4377 }
4378
4379 }
4382
4383 fn set_nat_traversal_compatibility_mode(&mut self, enabled: bool) {
4385 if enabled {
4386 debug!("NAT traversal enabled for this connection");
4387 } else {
4389 debug!("NAT traversal disabled for this connection (backward compatibility mode)");
4390 if self.nat_traversal.is_some() {
4392 warn!("Clearing NAT traversal state due to compatibility mode");
4393 self.nat_traversal = None;
4394 }
4395 }
4396 }
4397
4398 fn negotiate_nat_traversal_parameters(
4400 &self,
4401 local_config: &crate::transport_parameters::NatTraversalConfig,
4402 peer_config: &crate::transport_parameters::NatTraversalConfig,
4403 ) -> Result<crate::transport_parameters::NatTraversalConfig, String> {
4404 match (local_config, peer_config) {
4409 (
4411 crate::transport_parameters::NatTraversalConfig::ClientSupport,
4412 crate::transport_parameters::NatTraversalConfig::ServerSupport {
4413 concurrency_limit,
4414 },
4415 ) => Ok(
4416 crate::transport_parameters::NatTraversalConfig::ServerSupport {
4417 concurrency_limit: *concurrency_limit,
4418 },
4419 ),
4420 (
4422 crate::transport_parameters::NatTraversalConfig::ServerSupport {
4423 concurrency_limit,
4424 },
4425 crate::transport_parameters::NatTraversalConfig::ClientSupport,
4426 ) => Ok(
4427 crate::transport_parameters::NatTraversalConfig::ServerSupport {
4428 concurrency_limit: *concurrency_limit,
4429 },
4430 ),
4431 (
4433 crate::transport_parameters::NatTraversalConfig::ServerSupport {
4434 concurrency_limit: limit1,
4435 },
4436 crate::transport_parameters::NatTraversalConfig::ServerSupport {
4437 concurrency_limit: limit2,
4438 },
4439 ) => Ok(
4440 crate::transport_parameters::NatTraversalConfig::ServerSupport {
4441 concurrency_limit: (*limit1).min(*limit2),
4442 },
4443 ),
4444 (
4446 crate::transport_parameters::NatTraversalConfig::ClientSupport,
4447 crate::transport_parameters::NatTraversalConfig::ClientSupport,
4448 ) => Err("Both endpoints claim to be NAT traversal clients".to_string()),
4449 }
4450 }
4451
4452 fn init_nat_traversal_with_negotiated_config(
4457 &mut self,
4458 _config: &crate::transport_parameters::NatTraversalConfig,
4459 ) {
4460 let max_candidates = 50; let coordination_timeout = Duration::from_secs(10); self.nat_traversal = Some(NatTraversalState::new(max_candidates, coordination_timeout));
4467
4468 trace!("NAT traversal initialized for symmetric P2P node");
4469
4470 self.prepare_address_observation();
4473 self.schedule_candidate_discovery();
4474 self.prepare_coordination_handling();
4475 }
4476
4477 fn initiate_nat_traversal_process(&mut self) {
4479 if let Some(nat_state) = &mut self.nat_traversal {
4480 match nat_state.start_candidate_discovery() {
4481 Ok(()) => {
4482 debug!("NAT traversal process initiated - candidate discovery started");
4483 self.timers.set(
4485 Timer::NatTraversal,
4486 Instant::now() + Duration::from_millis(100),
4487 );
4488 }
4489 Err(e) => {
4490 warn!("Failed to initiate NAT traversal process: {}", e);
4491 }
4492 }
4493 }
4494 }
4495
4496 fn prepare_address_observation(&mut self) {
4498 debug!("Preparing for address observation as bootstrap node");
4499 }
4502
4503 fn schedule_candidate_discovery(&mut self) {
4505 debug!("Scheduling candidate discovery for client endpoint");
4506 self.timers.set(
4508 Timer::NatTraversal,
4509 Instant::now() + Duration::from_millis(50),
4510 );
4511 }
4512
4513 fn prepare_coordination_handling(&mut self) {
4515 debug!("Preparing to handle coordination requests as server endpoint");
4516 }
4519
4520 fn handle_nat_traversal_timeout(&mut self, now: Instant) {
4522 let timeout_result = if let Some(nat_state) = &mut self.nat_traversal {
4524 nat_state.handle_timeout(now)
4525 } else {
4526 return;
4527 };
4528
4529 match timeout_result {
4531 Ok(actions) => {
4532 for action in actions {
4533 match action {
4534 nat_traversal::TimeoutAction::RetryDiscovery => {
4535 debug!("NAT traversal timeout: retrying candidate discovery");
4536 if let Some(nat_state) = &mut self.nat_traversal {
4537 if let Err(e) = nat_state.start_candidate_discovery() {
4538 warn!("Failed to retry candidate discovery: {}", e);
4539 }
4540 }
4541 }
4542 nat_traversal::TimeoutAction::RetryCoordination => {
4543 debug!("NAT traversal timeout: retrying coordination");
4544 self.timers
4546 .set(Timer::NatTraversal, now + Duration::from_secs(2));
4547 }
4548 nat_traversal::TimeoutAction::StartValidation => {
4549 debug!("NAT traversal timeout: starting path validation");
4550 self.start_nat_traversal_validation(now);
4551 }
4552 nat_traversal::TimeoutAction::Complete => {
4553 debug!("NAT traversal completed successfully");
4554 self.timers.stop(Timer::NatTraversal);
4556 }
4557 nat_traversal::TimeoutAction::Failed => {
4558 warn!("NAT traversal failed after timeout");
4559 self.handle_nat_traversal_failure();
4561 }
4562 }
4563 }
4564 }
4565 Err(e) => {
4566 warn!("NAT traversal timeout handling failed: {}", e);
4567 self.handle_nat_traversal_failure();
4568 }
4569 }
4570 }
4571
4572 fn start_nat_traversal_validation(&mut self, now: Instant) {
4574 if let Some(nat_state) = &mut self.nat_traversal {
4575 let pairs = nat_state.get_next_validation_pairs(3);
4577
4578 for pair in pairs {
4579 let challenge = self.rng.r#gen();
4581 self.path.challenge = Some(challenge);
4582 self.path.challenge_pending = true;
4583
4584 debug!(
4585 "Starting path validation for NAT traversal candidate: {}",
4586 pair.remote_addr
4587 );
4588 }
4589
4590 self.timers
4592 .set(Timer::PathValidation, now + Duration::from_secs(3));
4593 }
4594 }
4595
4596 fn handle_nat_traversal_failure(&mut self) {
4598 warn!("NAT traversal failed, considering fallback options");
4599
4600 self.nat_traversal = None;
4602 self.timers.stop(Timer::NatTraversal);
4603
4604 debug!("NAT traversal disabled for this connection due to failure");
4611 }
4612
4613 pub fn nat_traversal_supported(&self) -> bool {
4615 self.nat_traversal.is_some()
4616 && self.config.nat_traversal_config.is_some()
4617 && self.peer_params.nat_traversal.is_some()
4618 }
4619
4620 pub fn nat_traversal_config(&self) -> Option<&crate::transport_parameters::NatTraversalConfig> {
4622 self.peer_params.nat_traversal.as_ref()
4623 }
4624
4625 pub fn nat_traversal_ready(&self) -> bool {
4627 self.nat_traversal_supported() && matches!(self.state, State::Established)
4628 }
4629
4630 #[allow(dead_code)]
4635 pub(crate) fn nat_traversal_stats(&self) -> Option<nat_traversal::NatTraversalStats> {
4636 self.nat_traversal.as_ref().map(|state| state.stats.clone())
4637 }
4638
4639 #[cfg(test)]
4643 #[allow(dead_code)]
4644 pub(crate) fn force_enable_nat_traversal(&mut self) {
4645 use crate::transport_parameters::NatTraversalConfig;
4646
4647 let config = NatTraversalConfig::ServerSupport {
4649 concurrency_limit: VarInt::from_u32(5),
4650 };
4651
4652 self.peer_params.nat_traversal = Some(config.clone());
4653 self.config = Arc::new({
4654 let mut transport_config = (*self.config).clone();
4655 transport_config.nat_traversal_config = Some(config);
4656 transport_config
4657 });
4658
4659 self.nat_traversal = Some(NatTraversalState::new(8, Duration::from_secs(10)));
4661 }
4662
4663 fn derive_peer_id_from_connection(&self) -> [u8; 32] {
4666 let mut hasher = std::collections::hash_map::DefaultHasher::new();
4668 use std::hash::Hasher;
4669 hasher.write(&self.rem_handshake_cid);
4670 hasher.write(&self.handshake_cid);
4671 hasher.write(&self.path.remote.to_string().into_bytes());
4672 let hash = hasher.finish();
4673 let mut peer_id = [0u8; 32];
4674 peer_id[..8].copy_from_slice(&hash.to_be_bytes());
4675 let cid_bytes = self.rem_handshake_cid.as_ref();
4677 let copy_len = (cid_bytes.len()).min(24);
4678 peer_id[8..8 + copy_len].copy_from_slice(&cid_bytes[..copy_len]);
4679 peer_id
4680 }
4681
4682 fn handle_add_address(
4684 &mut self,
4685 add_address: &crate::frame::AddAddress,
4686 now: Instant,
4687 ) -> Result<(), TransportError> {
4688 let nat_state = self.nat_traversal.as_mut().ok_or_else(|| {
4689 TransportError::PROTOCOL_VIOLATION("AddAddress frame without NAT traversal negotiation")
4690 })?;
4691
4692 match nat_state.add_remote_candidate(
4693 add_address.sequence,
4694 add_address.address,
4695 add_address.priority,
4696 now,
4697 ) {
4698 Ok(()) => {
4699 trace!(
4700 "Added remote candidate: {} (seq={}, priority={})",
4701 add_address.address, add_address.sequence, add_address.priority
4702 );
4703
4704 self.trigger_candidate_validation(add_address.address, now)?;
4706 Ok(())
4707 }
4708 Err(NatTraversalError::TooManyCandidates) => Err(TransportError::PROTOCOL_VIOLATION(
4709 "too many NAT traversal candidates",
4710 )),
4711 Err(NatTraversalError::DuplicateAddress) => {
4712 Ok(())
4714 }
4715 Err(e) => {
4716 warn!("Failed to add remote candidate: {}", e);
4717 Ok(()) }
4719 }
4720 }
4721
4722 fn handle_punch_me_now(
4726 &mut self,
4727 punch_me_now: &crate::frame::PunchMeNow,
4728 now: Instant,
4729 ) -> Result<(), TransportError> {
4730 trace!(
4731 "Received PunchMeNow: round={}, target_seq={}, local_addr={}",
4732 punch_me_now.round, punch_me_now.paired_with_sequence_number, punch_me_now.address
4733 );
4734
4735 if let Some(nat_state) = &self.nat_traversal {
4737 if nat_state.bootstrap_coordinator.is_some() {
4739 let from_peer_id = self.derive_peer_id_from_connection();
4741
4742 let punch_me_now_clone = punch_me_now.clone();
4744 drop(nat_state); match self
4747 .nat_traversal
4748 .as_mut()
4749 .unwrap()
4750 .handle_punch_me_now_frame(
4751 from_peer_id,
4752 self.path.remote,
4753 &punch_me_now_clone,
4754 now,
4755 ) {
4756 Ok(Some(coordination_frame)) => {
4757 trace!("Node coordinating PUNCH_ME_NOW between peers");
4758
4759 if let Some(target_peer_id) = punch_me_now.target_peer_id {
4761 self.endpoint_events.push_back(
4762 crate::shared::EndpointEventInner::RelayPunchMeNow(
4763 target_peer_id,
4764 coordination_frame,
4765 ),
4766 );
4767 }
4768
4769 return Ok(());
4770 }
4771 Ok(None) => {
4772 trace!("Coordination completed or no action needed");
4773 return Ok(());
4774 }
4775 Err(e) => {
4776 warn!("Coordination failed: {}", e);
4777 return Ok(());
4778 }
4779 }
4780 }
4781 }
4782
4783 let nat_state = self.nat_traversal.as_mut().ok_or_else(|| {
4785 TransportError::PROTOCOL_VIOLATION("PunchMeNow frame without NAT traversal negotiation")
4786 })?;
4787
4788 if nat_state
4790 .handle_peer_punch_request(punch_me_now.round, now)
4791 .map_err(|_e| {
4792 TransportError::PROTOCOL_VIOLATION("Failed to handle peer punch request")
4793 })?
4794 {
4795 trace!("Coordination synchronized for round {}", punch_me_now.round);
4796
4797 let _local_addr = self
4800 .local_ip
4801 .map(|ip| SocketAddr::new(ip, 0))
4802 .unwrap_or_else(|| {
4803 SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED), 0)
4804 });
4805
4806 let target = nat_traversal::PunchTarget {
4807 remote_addr: punch_me_now.address,
4808 remote_sequence: punch_me_now.paired_with_sequence_number,
4809 challenge: self.rng.r#gen(),
4810 };
4811
4812 let _ = nat_state.start_coordination_round(vec![target], now);
4814 } else {
4815 debug!(
4816 "Failed to synchronize coordination for round {}",
4817 punch_me_now.round
4818 );
4819 }
4820
4821 Ok(())
4822 }
4823
4824 fn handle_remove_address(
4826 &mut self,
4827 remove_address: &crate::frame::RemoveAddress,
4828 ) -> Result<(), TransportError> {
4829 let nat_state = self.nat_traversal.as_mut().ok_or_else(|| {
4830 TransportError::PROTOCOL_VIOLATION(
4831 "RemoveAddress frame without NAT traversal negotiation",
4832 )
4833 })?;
4834
4835 if nat_state.remove_candidate(remove_address.sequence) {
4836 trace!(
4837 "Removed candidate with sequence {}",
4838 remove_address.sequence
4839 );
4840 } else {
4841 trace!(
4842 "Attempted to remove unknown candidate sequence {}",
4843 remove_address.sequence
4844 );
4845 }
4846
4847 Ok(())
4848 }
4849
4850 fn handle_observed_address_frame(
4852 &mut self,
4853 observed_address: &crate::frame::ObservedAddress,
4854 now: Instant,
4855 ) -> Result<(), TransportError> {
4856 println!(
4857 "DEBUG: handle_observed_address_frame: received address {}",
4858 observed_address.address
4859 );
4860 let state = self.address_discovery_state.as_mut().ok_or_else(|| {
4862 TransportError::PROTOCOL_VIOLATION(
4863 "ObservedAddress frame without address discovery negotiation",
4864 )
4865 })?;
4866
4867 if !state.enabled {
4869 return Err(TransportError::PROTOCOL_VIOLATION(
4870 "ObservedAddress frame received when address discovery is disabled",
4871 ));
4872 }
4873
4874 #[cfg(feature = "trace")]
4876 {
4877 use crate::trace_observed_address_received;
4878 trace_observed_address_received!(
4880 &self.event_log,
4881 self.trace_context.trace_id(),
4882 observed_address.address,
4883 0u64 );
4885 }
4886
4887 let path_id = 0u64; if let Some(&last_seq) = state.last_received_sequence.get(&path_id) {
4895 if observed_address.sequence_number <= last_seq {
4896 trace!(
4897 "Ignoring OBSERVED_ADDRESS frame with stale sequence number {} (last was {})",
4898 observed_address.sequence_number, last_seq
4899 );
4900 return Ok(());
4901 }
4902 }
4903
4904 state
4906 .last_received_sequence
4907 .insert(path_id, observed_address.sequence_number);
4908
4909 state.handle_observed_address(observed_address.address, path_id, now);
4911
4912 self.path
4914 .update_observed_address(observed_address.address, now);
4915
4916 trace!(
4918 "Received ObservedAddress frame: address={} for path={}",
4919 observed_address.address, path_id
4920 );
4921
4922 Ok(())
4923 }
4924
4925 pub fn queue_add_address(&mut self, sequence: VarInt, address: SocketAddr, priority: VarInt) {
4927 let add_address = frame::AddAddress {
4929 sequence,
4930 address,
4931 priority,
4932 };
4933
4934 self.spaces[SpaceId::Data]
4935 .pending
4936 .add_addresses
4937 .push(add_address);
4938 trace!(
4939 "Queued AddAddress frame: seq={}, addr={}, priority={}",
4940 sequence, address, priority
4941 );
4942 }
4943
4944 pub fn queue_punch_me_now(
4946 &mut self,
4947 round: VarInt,
4948 paired_with_sequence_number: VarInt,
4949 address: SocketAddr,
4950 ) {
4951 let punch_me_now = frame::PunchMeNow {
4952 round,
4953 paired_with_sequence_number,
4954 address,
4955 target_peer_id: None, };
4957
4958 self.spaces[SpaceId::Data]
4959 .pending
4960 .punch_me_now
4961 .push(punch_me_now);
4962 trace!(
4963 "Queued PunchMeNow frame: round={}, target={}",
4964 round, paired_with_sequence_number
4965 );
4966 }
4967
4968 pub fn queue_remove_address(&mut self, sequence: VarInt) {
4970 let remove_address = frame::RemoveAddress { sequence };
4971
4972 self.spaces[SpaceId::Data]
4973 .pending
4974 .remove_addresses
4975 .push(remove_address);
4976 trace!("Queued RemoveAddress frame: seq={}", sequence);
4977 }
4978
4979 pub fn queue_observed_address(&mut self, address: SocketAddr) {
4981 let sequence_number = if let Some(state) = &mut self.address_discovery_state {
4983 let seq = state.next_sequence_number;
4984 state.next_sequence_number =
4985 VarInt::from_u64(state.next_sequence_number.into_inner() + 1)
4986 .expect("sequence number overflow");
4987 seq
4988 } else {
4989 VarInt::from_u32(0)
4991 };
4992
4993 let observed_address = frame::ObservedAddress {
4994 sequence_number,
4995 address,
4996 };
4997 self.spaces[SpaceId::Data]
4998 .pending
4999 .outbound_observations
5000 .push(observed_address);
5001 trace!("Queued ObservedAddress frame: addr={}", address);
5002 }
5003
5004 pub fn check_for_address_observations(&mut self, now: Instant) {
5006 let Some(state) = &mut self.address_discovery_state else {
5008 return;
5009 };
5010
5011 if !state.enabled {
5013 return;
5014 }
5015
5016 let path_id = 0u64; let remote_address = self.path.remote;
5021
5022 if state.should_send_observation(path_id, now) {
5024 if let Some(frame) = state.queue_observed_address_frame(path_id, remote_address) {
5026 self.spaces[SpaceId::Data]
5028 .pending
5029 .outbound_observations
5030 .push(frame);
5031
5032 state.record_observation_sent(path_id);
5034
5035 #[cfg(feature = "trace")]
5037 {
5038 use crate::trace_observed_address_sent;
5039 trace_observed_address_sent!(
5041 &self.event_log,
5042 self.trace_context.trace_id(),
5043 remote_address,
5044 path_id
5045 );
5046 }
5047
5048 trace!(
5049 "Queued OBSERVED_ADDRESS frame for path {} with address {}",
5050 path_id, remote_address
5051 );
5052 }
5053 }
5054 }
5055
5056 fn trigger_candidate_validation(
5058 &mut self,
5059 candidate_address: SocketAddr,
5060 now: Instant,
5061 ) -> Result<(), TransportError> {
5062 let nat_state = self
5063 .nat_traversal
5064 .as_mut()
5065 .ok_or_else(|| TransportError::PROTOCOL_VIOLATION("NAT traversal not enabled"))?;
5066
5067 if nat_state
5069 .active_validations
5070 .contains_key(&candidate_address)
5071 {
5072 trace!("Validation already in progress for {}", candidate_address);
5073 return Ok(());
5074 }
5075
5076 let challenge = self.rng.r#gen::<u64>();
5078
5079 let validation_state = nat_traversal::PathValidationState {
5081 challenge,
5082 sent_at: now,
5083 retry_count: 0,
5084 max_retries: 3,
5085 coordination_round: None,
5086 timeout_state: nat_traversal::AdaptiveTimeoutState::new(),
5087 last_retry_at: None,
5088 };
5089
5090 nat_state
5092 .active_validations
5093 .insert(candidate_address, validation_state);
5094
5095 self.nat_traversal_challenges
5097 .push(candidate_address, challenge);
5098
5099 nat_state.stats.validations_succeeded += 1; trace!(
5103 "Triggered PATH_CHALLENGE validation for {} with challenge {:016x}",
5104 candidate_address, challenge
5105 );
5106
5107 Ok(())
5108 }
5109
5110 pub fn nat_traversal_state(&self) -> Option<(usize, usize)> {
5115 self.nat_traversal
5116 .as_ref()
5117 .map(|state| (state.local_candidates.len(), state.remote_candidates.len()))
5118 }
5119
5120 pub fn initiate_nat_traversal_coordination(
5122 &mut self,
5123 now: Instant,
5124 ) -> Result<(), TransportError> {
5125 let nat_state = self
5126 .nat_traversal
5127 .as_mut()
5128 .ok_or_else(|| TransportError::PROTOCOL_VIOLATION("NAT traversal not enabled"))?;
5129
5130 if nat_state.should_send_punch_request() {
5132 nat_state.generate_candidate_pairs(now);
5134
5135 let pairs = nat_state.get_next_validation_pairs(3);
5137 if pairs.is_empty() {
5138 return Err(TransportError::PROTOCOL_VIOLATION(
5139 "No candidate pairs for coordination",
5140 ));
5141 }
5142
5143 let targets: Vec<_> = pairs
5145 .into_iter()
5146 .map(|pair| nat_traversal::PunchTarget {
5147 remote_addr: pair.remote_addr,
5148 remote_sequence: pair.remote_sequence,
5149 challenge: self.rng.r#gen(),
5150 })
5151 .collect();
5152
5153 let round = nat_state
5155 .start_coordination_round(targets, now)
5156 .map_err(|_e| {
5157 TransportError::PROTOCOL_VIOLATION("Failed to start coordination round")
5158 })?;
5159
5160 let local_addr = self
5163 .local_ip
5164 .map(|ip| SocketAddr::new(ip, self.local_ip.map(|_| 0).unwrap_or(0)))
5165 .unwrap_or_else(|| {
5166 SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED), 0)
5167 });
5168
5169 let punch_me_now = frame::PunchMeNow {
5170 round,
5171 paired_with_sequence_number: VarInt::from_u32(0), address: local_addr,
5173 target_peer_id: None, };
5175
5176 self.spaces[SpaceId::Data]
5177 .pending
5178 .punch_me_now
5179 .push(punch_me_now);
5180 nat_state.mark_punch_request_sent();
5181
5182 trace!("Initiated NAT traversal coordination round {}", round);
5183 }
5184
5185 Ok(())
5186 }
5187
5188 pub fn validate_nat_candidates(&mut self, now: Instant) {
5190 self.generate_nat_traversal_challenges(now);
5191 }
5192
5193 pub fn send_nat_address_advertisement(
5208 &mut self,
5209 address: SocketAddr,
5210 priority: u32,
5211 ) -> Result<u64, ConnectionError> {
5212 let nat_state = self.nat_traversal.as_mut().ok_or_else(|| {
5214 ConnectionError::TransportError(TransportError::PROTOCOL_VIOLATION(
5215 "NAT traversal not enabled on this connection",
5216 ))
5217 })?;
5218
5219 let sequence = nat_state.next_sequence;
5221 nat_state.next_sequence =
5222 VarInt::from_u64(nat_state.next_sequence.into_inner() + 1).unwrap();
5223
5224 let now = Instant::now();
5226 nat_state.local_candidates.insert(
5227 sequence,
5228 nat_traversal::AddressCandidate {
5229 address,
5230 priority,
5231 source: nat_traversal::CandidateSource::Local,
5232 discovered_at: now,
5233 state: nat_traversal::CandidateState::New,
5234 attempt_count: 0,
5235 last_attempt: None,
5236 },
5237 );
5238
5239 nat_state.stats.local_candidates_sent += 1;
5241
5242 self.queue_add_address(sequence, address, VarInt::from_u32(priority));
5244
5245 debug!(
5246 "Queued ADD_ADDRESS frame: addr={}, priority={}, seq={}",
5247 address, priority, sequence
5248 );
5249 Ok(sequence.into_inner())
5250 }
5251
5252 pub fn send_nat_punch_coordination(
5265 &mut self,
5266 paired_with_sequence_number: u64,
5267 address: SocketAddr,
5268 round: u32,
5269 ) -> Result<(), ConnectionError> {
5270 let _nat_state = self.nat_traversal.as_ref().ok_or_else(|| {
5272 ConnectionError::TransportError(TransportError::PROTOCOL_VIOLATION(
5273 "NAT traversal not enabled on this connection",
5274 ))
5275 })?;
5276
5277 self.queue_punch_me_now(
5279 VarInt::from_u32(round),
5280 VarInt::from_u64(paired_with_sequence_number).map_err(|_| {
5281 ConnectionError::TransportError(TransportError::PROTOCOL_VIOLATION(
5282 "Invalid target sequence number",
5283 ))
5284 })?,
5285 address,
5286 );
5287
5288 debug!(
5289 "Queued PUNCH_ME_NOW frame: paired_with_seq={}, addr={}, round={}",
5290 paired_with_sequence_number, address, round
5291 );
5292 Ok(())
5293 }
5294
5295 pub fn send_nat_address_removal(&mut self, sequence: u64) -> Result<(), ConnectionError> {
5306 let nat_state = self.nat_traversal.as_mut().ok_or_else(|| {
5308 ConnectionError::TransportError(TransportError::PROTOCOL_VIOLATION(
5309 "NAT traversal not enabled on this connection",
5310 ))
5311 })?;
5312
5313 let sequence_varint = VarInt::from_u64(sequence).map_err(|_| {
5314 ConnectionError::TransportError(TransportError::PROTOCOL_VIOLATION(
5315 "Invalid sequence number",
5316 ))
5317 })?;
5318
5319 nat_state.local_candidates.remove(&sequence_varint);
5321
5322 self.queue_remove_address(sequence_varint);
5324
5325 debug!("Queued REMOVE_ADDRESS frame: seq={}", sequence);
5326 Ok(())
5327 }
5328
5329 #[allow(dead_code)]
5338 pub(crate) fn get_nat_traversal_stats(&self) -> Option<&nat_traversal::NatTraversalStats> {
5339 self.nat_traversal.as_ref().map(|state| &state.stats)
5340 }
5341
5342 pub fn is_nat_traversal_enabled(&self) -> bool {
5344 self.nat_traversal.is_some()
5345 }
5346
5347 fn negotiate_address_discovery(&mut self, peer_params: &TransportParameters) {
5351 let now = Instant::now();
5352
5353 match &peer_params.address_discovery {
5355 Some(peer_config) => {
5356 if let Some(state) = &mut self.address_discovery_state {
5358 if state.enabled {
5359 debug!(
5362 "Address discovery negotiated: rate={}, all_paths={}",
5363 state.max_observation_rate, state.observe_all_paths
5364 );
5365 } else {
5366 debug!("Address discovery disabled locally, ignoring peer support");
5368 }
5369 } else {
5370 self.address_discovery_state =
5372 Some(AddressDiscoveryState::new(peer_config, now));
5373 debug!("Address discovery initialized from peer config");
5374 }
5375 }
5376 _ => {
5377 if let Some(state) = &mut self.address_discovery_state {
5379 state.enabled = false;
5380 debug!("Address discovery disabled - peer doesn't support it");
5381 }
5382 }
5383 }
5384
5385 if let Some(state) = &self.address_discovery_state {
5387 if state.enabled {
5388 self.path.set_observation_rate(state.max_observation_rate);
5389 }
5390 }
5391 }
5392
5393 fn decrypt_packet(
5394 &mut self,
5395 now: Instant,
5396 packet: &mut Packet,
5397 ) -> Result<Option<u64>, Option<TransportError>> {
5398 let result = packet_crypto::decrypt_packet_body(
5399 packet,
5400 &self.spaces,
5401 self.zero_rtt_crypto.as_ref(),
5402 self.key_phase,
5403 self.prev_crypto.as_ref(),
5404 self.next_crypto.as_ref(),
5405 )?;
5406
5407 let result = match result {
5408 Some(r) => r,
5409 None => return Ok(None),
5410 };
5411
5412 if result.outgoing_key_update_acked {
5413 if let Some(prev) = self.prev_crypto.as_mut() {
5414 prev.end_packet = Some((result.number, now));
5415 self.set_key_discard_timer(now, packet.header.space());
5416 }
5417 }
5418
5419 if result.incoming_key_update {
5420 trace!("key update authenticated");
5421 self.update_keys(Some((result.number, now)), true);
5422 self.set_key_discard_timer(now, packet.header.space());
5423 }
5424
5425 Ok(Some(result.number))
5426 }
5427
5428 fn update_keys(&mut self, end_packet: Option<(u64, Instant)>, remote: bool) {
5429 trace!("executing key update");
5430 let new = self
5434 .crypto
5435 .next_1rtt_keys()
5436 .expect("only called for `Data` packets");
5437 self.key_phase_size = new
5438 .local
5439 .confidentiality_limit()
5440 .saturating_sub(KEY_UPDATE_MARGIN);
5441 let old = mem::replace(
5442 &mut self.spaces[SpaceId::Data]
5443 .crypto
5444 .as_mut()
5445 .unwrap() .packet,
5447 mem::replace(self.next_crypto.as_mut().unwrap(), new),
5448 );
5449 self.spaces[SpaceId::Data].sent_with_keys = 0;
5450 self.prev_crypto = Some(PrevCrypto {
5451 crypto: old,
5452 end_packet,
5453 update_unacked: remote,
5454 });
5455 self.key_phase = !self.key_phase;
5456 }
5457
5458 fn peer_supports_ack_frequency(&self) -> bool {
5459 self.peer_params.min_ack_delay.is_some()
5460 }
5461
5462 pub(crate) fn immediate_ack(&mut self) {
5467 self.spaces[self.highest_space].immediate_ack_pending = true;
5468 }
5469
5470 #[cfg(test)]
5472 #[allow(dead_code)]
5473 pub(crate) fn decode_packet(&self, event: &ConnectionEvent) -> Option<Vec<u8>> {
5474 let (first_decode, remaining) = match &event.0 {
5475 ConnectionEventInner::Datagram(DatagramConnectionEvent {
5476 first_decode,
5477 remaining,
5478 ..
5479 }) => (first_decode, remaining),
5480 _ => return None,
5481 };
5482
5483 if remaining.is_some() {
5484 panic!("Packets should never be coalesced in tests");
5485 }
5486
5487 let decrypted_header = packet_crypto::unprotect_header(
5488 first_decode.clone(),
5489 &self.spaces,
5490 self.zero_rtt_crypto.as_ref(),
5491 self.peer_params.stateless_reset_token,
5492 )?;
5493
5494 let mut packet = decrypted_header.packet?;
5495 packet_crypto::decrypt_packet_body(
5496 &mut packet,
5497 &self.spaces,
5498 self.zero_rtt_crypto.as_ref(),
5499 self.key_phase,
5500 self.prev_crypto.as_ref(),
5501 self.next_crypto.as_ref(),
5502 )
5503 .ok()?;
5504
5505 Some(packet.payload.to_vec())
5506 }
5507
5508 #[cfg(test)]
5511 #[allow(dead_code)]
5512 pub(crate) fn bytes_in_flight(&self) -> u64 {
5513 self.path.in_flight.bytes
5514 }
5515
5516 #[cfg(test)]
5518 #[allow(dead_code)]
5519 pub(crate) fn congestion_window(&self) -> u64 {
5520 self.path
5521 .congestion
5522 .window()
5523 .saturating_sub(self.path.in_flight.bytes)
5524 }
5525
5526 #[cfg(test)]
5528 #[allow(dead_code)]
5529 pub(crate) fn is_idle(&self) -> bool {
5530 Timer::VALUES
5531 .iter()
5532 .filter(|&&t| !matches!(t, Timer::KeepAlive | Timer::PushNewCid | Timer::KeyDiscard))
5533 .filter_map(|&t| Some((t, self.timers.get(t)?)))
5534 .min_by_key(|&(_, time)| time)
5535 .is_none_or(|(timer, _)| timer == Timer::Idle)
5536 }
5537
5538 #[cfg(test)]
5540 #[allow(dead_code)]
5541 pub(crate) fn lost_packets(&self) -> u64 {
5542 self.lost_packets
5543 }
5544
5545 #[cfg(test)]
5547 #[allow(dead_code)]
5548 pub(crate) fn using_ecn(&self) -> bool {
5549 self.path.sending_ecn
5550 }
5551
5552 #[cfg(test)]
5554 #[allow(dead_code)]
5555 pub(crate) fn total_recvd(&self) -> u64 {
5556 self.path.total_recvd
5557 }
5558
5559 #[cfg(test)]
5560 #[allow(dead_code)]
5561 pub(crate) fn active_local_cid_seq(&self) -> (u64, u64) {
5562 self.local_cid_state.active_seq()
5563 }
5564
5565 #[cfg(test)]
5568 #[allow(dead_code)]
5569 pub(crate) fn rotate_local_cid(&mut self, v: u64, now: Instant) {
5570 let n = self.local_cid_state.assign_retire_seq(v);
5571 self.endpoint_events
5572 .push_back(EndpointEventInner::NeedIdentifiers(now, n));
5573 }
5574
5575 #[cfg(test)]
5577 #[allow(dead_code)]
5578 pub(crate) fn active_rem_cid_seq(&self) -> u64 {
5579 self.rem_cids.active_seq()
5580 }
5581
5582 #[cfg(test)]
5584 #[cfg(test)]
5585 #[allow(dead_code)]
5586 pub(crate) fn path_mtu(&self) -> u16 {
5587 self.path.current_mtu()
5588 }
5589
5590 fn can_send_1rtt(&self, max_size: usize) -> bool {
5594 self.streams.can_send_stream_data()
5595 || self.path.challenge_pending
5596 || self
5597 .prev_path
5598 .as_ref()
5599 .is_some_and(|(_, x)| x.challenge_pending)
5600 || !self.path_responses.is_empty()
5601 || !self.nat_traversal_challenges.is_empty()
5602 || self
5603 .datagrams
5604 .outgoing
5605 .front()
5606 .is_some_and(|x| x.size(true) <= max_size)
5607 }
5608
5609 fn remove_in_flight(&mut self, pn: u64, packet: &SentPacket) {
5611 for path in [&mut self.path]
5613 .into_iter()
5614 .chain(self.prev_path.as_mut().map(|(_, data)| data))
5615 {
5616 if path.remove_in_flight(pn, packet) {
5617 return;
5618 }
5619 }
5620 }
5621
5622 fn kill(&mut self, reason: ConnectionError) {
5624 self.close_common();
5625 self.error = Some(reason);
5626 self.state = State::Drained;
5627 self.endpoint_events.push_back(EndpointEventInner::Drained);
5628 }
5629
5630 fn generate_nat_traversal_challenges(&mut self, now: Instant) {
5632 let candidates: Vec<(VarInt, SocketAddr)> = if let Some(nat_state) = &self.nat_traversal {
5634 nat_state
5635 .get_validation_candidates()
5636 .into_iter()
5637 .take(3) .map(|(seq, candidate)| (seq, candidate.address))
5639 .collect()
5640 } else {
5641 return;
5642 };
5643
5644 if candidates.is_empty() {
5645 return;
5646 }
5647
5648 if let Some(nat_state) = &mut self.nat_traversal {
5650 for (seq, address) in candidates {
5651 let challenge: u64 = self.rng.r#gen();
5653
5654 if let Err(e) = nat_state.start_validation(seq, challenge, now) {
5656 debug!("Failed to start validation for candidate {}: {}", seq, e);
5657 continue;
5658 }
5659
5660 self.nat_traversal_challenges.push(address, challenge);
5662 trace!(
5663 "Queuing NAT validation PATH_CHALLENGE for {} with token {:08x}",
5664 address, challenge
5665 );
5666 }
5667 }
5668 }
5669
5670 pub fn current_mtu(&self) -> u16 {
5674 self.path.current_mtu()
5675 }
5676
5677 fn predict_1rtt_overhead(&self, pn: Option<u64>) -> usize {
5684 let pn_len = match pn {
5685 Some(pn) => PacketNumber::new(
5686 pn,
5687 self.spaces[SpaceId::Data].largest_acked_packet.unwrap_or(0),
5688 )
5689 .len(),
5690 None => 4,
5692 };
5693
5694 1 + self.rem_cids.active().len() + pn_len + self.tag_len_1rtt()
5696 }
5697
5698 fn tag_len_1rtt(&self) -> usize {
5699 let key = match self.spaces[SpaceId::Data].crypto.as_ref() {
5700 Some(crypto) => Some(&*crypto.packet.local),
5701 None => self.zero_rtt_crypto.as_ref().map(|x| &*x.packet),
5702 };
5703 key.map_or(16, |x| x.tag_len())
5707 }
5708
5709 fn on_path_validated(&mut self) {
5711 self.path.validated = true;
5712 let ConnectionSide::Server { server_config } = &self.side else {
5713 return;
5714 };
5715 let new_tokens = &mut self.spaces[SpaceId::Data as usize].pending.new_tokens;
5716 new_tokens.clear();
5717 for _ in 0..server_config.validation_token.sent {
5718 new_tokens.push(self.path.remote);
5719 }
5720 }
5721}
5722
5723impl fmt::Debug for Connection {
5724 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
5725 f.debug_struct("Connection")
5726 .field("handshake_cid", &self.handshake_cid)
5727 .finish()
5728 }
5729}
5730
5731enum ConnectionSide {
5733 Client {
5734 token: Bytes,
5736 token_store: Arc<dyn TokenStore>,
5737 server_name: String,
5738 },
5739 Server {
5740 server_config: Arc<ServerConfig>,
5741 },
5742}
5743
5744impl ConnectionSide {
5745 fn remote_may_migrate(&self) -> bool {
5746 match self {
5747 Self::Server { server_config } => server_config.migration,
5748 Self::Client { .. } => false,
5749 }
5750 }
5751
5752 fn is_client(&self) -> bool {
5753 self.side().is_client()
5754 }
5755
5756 fn is_server(&self) -> bool {
5757 self.side().is_server()
5758 }
5759
5760 fn side(&self) -> Side {
5761 match *self {
5762 Self::Client { .. } => Side::Client,
5763 Self::Server { .. } => Side::Server,
5764 }
5765 }
5766}
5767
5768impl From<SideArgs> for ConnectionSide {
5769 fn from(side: SideArgs) -> Self {
5770 match side {
5771 SideArgs::Client {
5772 token_store,
5773 server_name,
5774 } => Self::Client {
5775 token: token_store.take(&server_name).unwrap_or_default(),
5776 token_store,
5777 server_name,
5778 },
5779 SideArgs::Server {
5780 server_config,
5781 pref_addr_cid: _,
5782 path_validated: _,
5783 } => Self::Server { server_config },
5784 }
5785 }
5786}
5787
5788pub(crate) enum SideArgs {
5790 Client {
5791 token_store: Arc<dyn TokenStore>,
5792 server_name: String,
5793 },
5794 Server {
5795 server_config: Arc<ServerConfig>,
5796 pref_addr_cid: Option<ConnectionId>,
5797 path_validated: bool,
5798 },
5799}
5800
5801impl SideArgs {
5802 pub(crate) fn pref_addr_cid(&self) -> Option<ConnectionId> {
5803 match *self {
5804 Self::Client { .. } => None,
5805 Self::Server { pref_addr_cid, .. } => pref_addr_cid,
5806 }
5807 }
5808
5809 pub(crate) fn path_validated(&self) -> bool {
5810 match *self {
5811 Self::Client { .. } => true,
5812 Self::Server { path_validated, .. } => path_validated,
5813 }
5814 }
5815
5816 pub(crate) fn side(&self) -> Side {
5817 match *self {
5818 Self::Client { .. } => Side::Client,
5819 Self::Server { .. } => Side::Server,
5820 }
5821 }
5822}
5823
5824#[derive(Debug, Error, Clone, PartialEq, Eq)]
5826pub enum ConnectionError {
5827 #[error("peer doesn't implement any supported version")]
5829 VersionMismatch,
5830 #[error(transparent)]
5832 TransportError(#[from] TransportError),
5833 #[error("aborted by peer: {0}")]
5835 ConnectionClosed(frame::ConnectionClose),
5836 #[error("closed by peer: {0}")]
5838 ApplicationClosed(frame::ApplicationClose),
5839 #[error("reset by peer")]
5841 Reset,
5842 #[error("timed out")]
5848 TimedOut,
5849 #[error("closed")]
5851 LocallyClosed,
5852 #[error("CIDs exhausted")]
5856 CidsExhausted,
5857}
5858
5859impl From<Close> for ConnectionError {
5860 fn from(x: Close) -> Self {
5861 match x {
5862 Close::Connection(reason) => Self::ConnectionClosed(reason),
5863 Close::Application(reason) => Self::ApplicationClosed(reason),
5864 }
5865 }
5866}
5867
5868impl From<ConnectionError> for io::Error {
5870 fn from(x: ConnectionError) -> Self {
5871 use ConnectionError::*;
5872 let kind = match x {
5873 TimedOut => io::ErrorKind::TimedOut,
5874 Reset => io::ErrorKind::ConnectionReset,
5875 ApplicationClosed(_) | ConnectionClosed(_) => io::ErrorKind::ConnectionAborted,
5876 TransportError(_) | VersionMismatch | LocallyClosed | CidsExhausted => {
5877 io::ErrorKind::Other
5878 }
5879 };
5880 Self::new(kind, x)
5881 }
5882}
5883
5884#[derive(Clone, Debug)]
5885pub enum State {
5887 Handshake(state::Handshake),
5889 Established,
5891 Closed(state::Closed),
5893 Draining,
5895 Drained,
5897}
5898
5899impl State {
5900 fn closed<R: Into<Close>>(reason: R) -> Self {
5901 Self::Closed(state::Closed {
5902 reason: reason.into(),
5903 })
5904 }
5905
5906 fn is_handshake(&self) -> bool {
5907 matches!(*self, Self::Handshake(_))
5908 }
5909
5910 fn is_established(&self) -> bool {
5911 matches!(*self, Self::Established)
5912 }
5913
5914 fn is_closed(&self) -> bool {
5915 matches!(*self, Self::Closed(_) | Self::Draining | Self::Drained)
5916 }
5917
5918 fn is_drained(&self) -> bool {
5919 matches!(*self, Self::Drained)
5920 }
5921}
5922
5923mod state {
5924 use super::*;
5925
5926 #[derive(Clone, Debug)]
5927 pub struct Handshake {
5928 pub(super) rem_cid_set: bool,
5932 pub(super) expected_token: Bytes,
5936 pub(super) client_hello: Option<Bytes>,
5940 }
5941
5942 #[derive(Clone, Debug)]
5943 pub struct Closed {
5944 pub(super) reason: Close,
5945 }
5946}
5947
5948#[derive(Debug)]
5950pub enum Event {
5951 HandshakeDataReady,
5953 Connected,
5955 ConnectionLost {
5959 reason: ConnectionError,
5961 },
5962 Stream(StreamEvent),
5964 DatagramReceived,
5966 DatagramsUnblocked,
5968}
5969
5970fn instant_saturating_sub(x: Instant, y: Instant) -> Duration {
5971 if x > y { x - y } else { Duration::ZERO }
5972}
5973
5974fn get_max_ack_delay(params: &TransportParameters) -> Duration {
5975 Duration::from_micros(params.max_ack_delay.0 * 1000)
5976}
5977
5978const MAX_BACKOFF_EXPONENT: u32 = 16;
5980
5981const MIN_PACKET_SPACE: usize = MAX_HANDSHAKE_OR_0RTT_HEADER_SIZE + 32;
5989
5990const MAX_HANDSHAKE_OR_0RTT_HEADER_SIZE: usize =
5996 1 + 4 + 1 + MAX_CID_SIZE + 1 + MAX_CID_SIZE + VarInt::from_u32(u16::MAX as u32).size() + 4;
5997
5998const KEY_UPDATE_MARGIN: u64 = 10_000;
6002
6003#[derive(Default)]
6004struct SentFrames {
6005 retransmits: ThinRetransmits,
6006 largest_acked: Option<u64>,
6007 stream_frames: StreamMetaVec,
6008 non_retransmits: bool,
6010 requires_padding: bool,
6011}
6012
6013impl SentFrames {
6014 fn is_ack_only(&self, streams: &StreamsState) -> bool {
6016 self.largest_acked.is_some()
6017 && !self.non_retransmits
6018 && self.stream_frames.is_empty()
6019 && self.retransmits.is_empty(streams)
6020 }
6021}
6022
6023fn negotiate_max_idle_timeout(x: Option<VarInt>, y: Option<VarInt>) -> Option<Duration> {
6031 match (x, y) {
6032 (Some(VarInt(0)) | None, Some(VarInt(0)) | None) => None,
6033 (Some(VarInt(0)) | None, Some(y)) => Some(Duration::from_millis(y.0)),
6034 (Some(x), Some(VarInt(0)) | None) => Some(Duration::from_millis(x.0)),
6035 (Some(x), Some(y)) => Some(Duration::from_millis(cmp::min(x, y).0)),
6036 }
6037}
6038
6039#[derive(Debug, Clone)]
6041pub(crate) struct PqcState {
6042 enabled: bool,
6044 #[allow(dead_code)]
6046 algorithms: Option<crate::transport_parameters::PqcAlgorithms>,
6047 handshake_mtu: u16,
6049 using_pqc: bool,
6051 packet_handler: crate::crypto::pqc::packet_handler::PqcPacketHandler,
6053}
6054
6055#[allow(dead_code)]
6056impl PqcState {
6057 fn new() -> Self {
6058 Self {
6059 enabled: false,
6060 algorithms: None,
6061 handshake_mtu: MIN_INITIAL_SIZE,
6062 using_pqc: false,
6063 packet_handler: crate::crypto::pqc::packet_handler::PqcPacketHandler::new(),
6064 }
6065 }
6066
6067 fn min_initial_size(&self) -> u16 {
6069 if self.enabled && self.using_pqc {
6070 std::cmp::max(self.handshake_mtu, 4096)
6072 } else {
6073 MIN_INITIAL_SIZE
6074 }
6075 }
6076
6077 fn update_from_peer_params(&mut self, params: &TransportParameters) {
6079 if let Some(ref algorithms) = params.pqc_algorithms {
6080 self.enabled = true;
6081 self.algorithms = Some(algorithms.clone());
6082 if algorithms.ml_kem_768
6084 || algorithms.ml_dsa_65
6085 || algorithms.hybrid_x25519_ml_kem
6086 || algorithms.hybrid_ed25519_ml_dsa
6087 {
6088 self.using_pqc = true;
6089 self.handshake_mtu = 4096; }
6091 }
6092 }
6093
6094 fn detect_pqc_from_crypto(&mut self, crypto_data: &[u8], space: SpaceId) {
6096 if !self.enabled {
6097 return;
6098 }
6099 if self.packet_handler.detect_pqc_handshake(crypto_data, space) {
6100 self.using_pqc = true;
6101 self.handshake_mtu = self.packet_handler.get_min_packet_size(space);
6103 }
6104 }
6105
6106 fn should_trigger_mtu_discovery(&mut self) -> bool {
6108 self.packet_handler.should_trigger_mtu_discovery()
6109 }
6110
6111 fn get_mtu_config(&self) -> MtuDiscoveryConfig {
6113 self.packet_handler.get_pqc_mtu_config()
6114 }
6115
6116 fn calculate_crypto_frame_size(&self, available_space: usize, remaining_data: usize) -> usize {
6118 self.packet_handler
6119 .calculate_crypto_frame_size(available_space, remaining_data)
6120 }
6121
6122 fn should_adjust_coalescing(&self, current_size: usize, space: SpaceId) -> bool {
6124 self.packet_handler
6125 .adjust_coalescing_for_pqc(current_size, space)
6126 }
6127
6128 fn on_packet_sent(&mut self, space: SpaceId, size: u16) {
6130 self.packet_handler.on_packet_sent(space, size);
6131 }
6132
6133 fn reset(&mut self) {
6135 self.enabled = false;
6136 self.algorithms = None;
6137 self.handshake_mtu = MIN_INITIAL_SIZE;
6138 self.using_pqc = false;
6139 self.packet_handler.reset();
6140 }
6141}
6142
6143impl Default for PqcState {
6144 fn default() -> Self {
6145 Self::new()
6146 }
6147}
6148
6149#[derive(Debug, Clone)]
6151pub(crate) struct AddressDiscoveryState {
6152 enabled: bool,
6154 max_observation_rate: u8,
6156 observe_all_paths: bool,
6158 sent_observations: std::collections::HashMap<u64, paths::PathAddressInfo>,
6160 received_observations: std::collections::HashMap<u64, paths::PathAddressInfo>,
6162 rate_limiter: AddressObservationRateLimiter,
6164 received_history: Vec<ObservedAddressEvent>,
6166 bootstrap_mode: bool,
6168 next_sequence_number: VarInt,
6170 last_received_sequence: std::collections::HashMap<u64, VarInt>,
6172 frames_sent: u64,
6174}
6175
6176#[derive(Debug, Clone, PartialEq, Eq)]
6178struct ObservedAddressEvent {
6179 address: SocketAddr,
6181 received_at: Instant,
6183 path_id: u64,
6185}
6186
6187#[derive(Debug, Clone)]
6189struct AddressObservationRateLimiter {
6190 tokens: f64,
6192 max_tokens: f64,
6194 rate: f64,
6196 last_update: Instant,
6198}
6199
6200#[allow(dead_code)]
6201impl AddressDiscoveryState {
6202 fn new(config: &crate::transport_parameters::AddressDiscoveryConfig, now: Instant) -> Self {
6204 use crate::transport_parameters::AddressDiscoveryConfig::*;
6205
6206 let (enabled, _can_send, _can_receive) = match config {
6208 SendOnly => (true, true, false),
6209 ReceiveOnly => (true, false, true),
6210 SendAndReceive => (true, true, true),
6211 };
6212
6213 let max_observation_rate = 10u8; let observe_all_paths = false; Self {
6219 enabled,
6220 max_observation_rate,
6221 observe_all_paths,
6222 sent_observations: std::collections::HashMap::new(),
6223 received_observations: std::collections::HashMap::new(),
6224 rate_limiter: AddressObservationRateLimiter::new(max_observation_rate, now),
6225 received_history: Vec::new(),
6226 bootstrap_mode: false,
6227 next_sequence_number: VarInt::from_u32(0),
6228 last_received_sequence: std::collections::HashMap::new(),
6229 frames_sent: 0,
6230 }
6231 }
6232
6233 fn should_send_observation(&mut self, path_id: u64, now: Instant) -> bool {
6235 if !self.should_observe_path(path_id) {
6237 return false;
6238 }
6239
6240 let needs_observation = match self.sent_observations.get(&path_id) {
6242 Some(info) => info.observed_address.is_none() || !info.notified,
6243 None => true,
6244 };
6245
6246 if !needs_observation {
6247 return false;
6248 }
6249
6250 self.rate_limiter.try_consume(1.0, now)
6252 }
6253
6254 fn record_observation_sent(&mut self, path_id: u64) {
6256 if let Some(info) = self.sent_observations.get_mut(&path_id) {
6257 info.mark_notified();
6258 }
6259 }
6260
6261 fn handle_observed_address(&mut self, address: SocketAddr, path_id: u64, now: Instant) {
6263 if !self.enabled {
6264 return;
6265 }
6266
6267 self.received_history.push(ObservedAddressEvent {
6268 address,
6269 received_at: now,
6270 path_id,
6271 });
6272
6273 let info = self
6275 .received_observations
6276 .entry(path_id)
6277 .or_insert_with(paths::PathAddressInfo::new);
6278 info.update_observed_address(address, now);
6279 }
6280
6281 pub(crate) fn get_observed_address(&self, path_id: u64) -> Option<SocketAddr> {
6283 self.received_observations
6284 .get(&path_id)
6285 .and_then(|info| info.observed_address)
6286 }
6287
6288 pub(crate) fn get_all_received_history(&self) -> Vec<SocketAddr> {
6290 self.received_observations
6291 .values()
6292 .filter_map(|info| info.observed_address)
6293 .collect()
6294 }
6295
6296 pub(crate) fn stats(&self) -> AddressDiscoveryStats {
6298 AddressDiscoveryStats {
6299 frames_sent: self.frames_sent,
6300 frames_received: self.received_history.len() as u64,
6301 addresses_discovered: self
6302 .received_observations
6303 .values()
6304 .filter(|info| info.observed_address.is_some())
6305 .count() as u64,
6306 address_changes_detected: 0, }
6308 }
6309
6310 fn has_unnotified_changes(&self) -> bool {
6316 let has_unsent = self
6318 .sent_observations
6319 .values()
6320 .any(|info| info.observed_address.is_some() && !info.notified);
6321
6322 let has_unreceived = self
6324 .received_observations
6325 .values()
6326 .any(|info| info.observed_address.is_some() && !info.notified);
6327
6328 has_unsent || has_unreceived
6329 }
6330
6331 fn queue_observed_address_frame(
6333 &mut self,
6334 path_id: u64,
6335 address: SocketAddr,
6336 ) -> Option<frame::ObservedAddress> {
6337 if !self.enabled {
6339 return None;
6340 }
6341
6342 if !self.observe_all_paths && path_id != 0 {
6344 return None;
6345 }
6346
6347 if let Some(info) = self.sent_observations.get(&path_id) {
6349 if info.notified {
6350 return None;
6351 }
6352 }
6353
6354 if self.rate_limiter.tokens < 1.0 {
6356 return None;
6357 }
6358
6359 self.rate_limiter.tokens -= 1.0;
6361
6362 let info = self
6364 .sent_observations
6365 .entry(path_id)
6366 .or_insert_with(paths::PathAddressInfo::new);
6367 info.observed_address = Some(address);
6368 info.notified = true;
6369
6370 println!(
6371 "DEBUG: queue_observed_address_frame: ACTUALLY QUEUING frame for path {} with address {}",
6372 path_id, address
6373 );
6374
6375 let sequence_number = self.next_sequence_number;
6377 self.next_sequence_number = VarInt::from_u64(self.next_sequence_number.into_inner() + 1)
6378 .expect("sequence number overflow");
6379
6380 Some(frame::ObservedAddress {
6381 sequence_number,
6382 address,
6383 })
6384 }
6385
6386 fn check_for_address_observations(
6388 &mut self,
6389 _current_path: u64,
6390 peer_supports_address_discovery: bool,
6391 now: Instant,
6392 ) -> Vec<frame::ObservedAddress> {
6393 let mut frames = Vec::new();
6394
6395 if !self.enabled || !peer_supports_address_discovery {
6397 return frames;
6398 }
6399
6400 self.rate_limiter.update_tokens(now);
6402
6403 let paths_to_notify: Vec<u64> = self
6405 .sent_observations
6406 .iter()
6407 .filter_map(|(&path_id, info)| {
6408 if info.observed_address.is_some() && !info.notified {
6409 Some(path_id)
6410 } else {
6411 None
6412 }
6413 })
6414 .collect();
6415
6416 for path_id in paths_to_notify {
6418 if !self.should_observe_path(path_id) {
6420 continue;
6421 }
6422
6423 if !self.bootstrap_mode && self.rate_limiter.tokens < 1.0 {
6425 break; }
6427
6428 if let Some(info) = self.sent_observations.get_mut(&path_id) {
6430 if let Some(address) = info.observed_address {
6431 if self.bootstrap_mode {
6433 self.rate_limiter.tokens -= 0.2; } else {
6435 self.rate_limiter.tokens -= 1.0;
6436 }
6437
6438 info.notified = true;
6440
6441 let sequence_number = self.next_sequence_number;
6443 self.next_sequence_number =
6444 VarInt::from_u64(self.next_sequence_number.into_inner() + 1)
6445 .expect("sequence number overflow");
6446
6447 self.frames_sent += 1;
6448
6449 frames.push(frame::ObservedAddress {
6450 sequence_number,
6451 address,
6452 });
6453 }
6454 }
6455 }
6456
6457 frames
6458 }
6459
6460 fn update_rate_limit(&mut self, new_rate: f64) {
6462 self.max_observation_rate = new_rate as u8;
6463 self.rate_limiter.set_rate(new_rate as u8);
6464 }
6465
6466 fn from_transport_params(params: &TransportParameters) -> Option<Self> {
6468 params
6469 .address_discovery
6470 .as_ref()
6471 .map(|config| Self::new(config, Instant::now()))
6472 }
6473
6474 #[cfg(test)]
6476 fn new_with_params(enabled: bool, max_rate: f64, observe_all_paths: bool) -> Self {
6477 if !enabled {
6479 return Self {
6481 enabled: false,
6482 max_observation_rate: max_rate as u8,
6483 observe_all_paths,
6484 sent_observations: std::collections::HashMap::new(),
6485 received_observations: std::collections::HashMap::new(),
6486 rate_limiter: AddressObservationRateLimiter::new(max_rate as u8, Instant::now()),
6487 received_history: Vec::new(),
6488 bootstrap_mode: false,
6489 next_sequence_number: VarInt::from_u32(0),
6490 last_received_sequence: std::collections::HashMap::new(),
6491 frames_sent: 0,
6492 };
6493 }
6494
6495 let config = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
6497 let mut state = Self::new(&config, Instant::now());
6498 state.max_observation_rate = max_rate as u8;
6499 state.observe_all_paths = observe_all_paths;
6500 state.rate_limiter = AddressObservationRateLimiter::new(max_rate as u8, Instant::now());
6501 state
6502 }
6503
6504 fn set_bootstrap_mode(&mut self, enabled: bool) {
6506 self.bootstrap_mode = enabled;
6507 if enabled {
6509 let bootstrap_rate = self.get_effective_rate_limit();
6510 self.rate_limiter.rate = bootstrap_rate;
6511 self.rate_limiter.max_tokens = bootstrap_rate * 2.0; self.rate_limiter.tokens = self.rate_limiter.max_tokens;
6514 }
6515 }
6516
6517 fn is_bootstrap_mode(&self) -> bool {
6519 self.bootstrap_mode
6520 }
6521
6522 fn get_effective_rate_limit(&self) -> f64 {
6524 if self.bootstrap_mode {
6525 (self.max_observation_rate as f64) * 5.0
6527 } else {
6528 self.max_observation_rate as f64
6529 }
6530 }
6531
6532 fn should_observe_path(&self, path_id: u64) -> bool {
6534 if !self.enabled {
6535 return false;
6536 }
6537
6538 if self.bootstrap_mode {
6540 return true;
6541 }
6542
6543 self.observe_all_paths || path_id == 0
6545 }
6546
6547 fn should_send_observation_immediately(&self, is_new_connection: bool) -> bool {
6549 self.bootstrap_mode && is_new_connection
6550 }
6551}
6552
6553#[allow(dead_code)]
6554impl AddressObservationRateLimiter {
6555 fn new(rate: u8, now: Instant) -> Self {
6557 let rate_f64 = rate as f64;
6558 Self {
6559 tokens: rate_f64,
6560 max_tokens: rate_f64,
6561 rate: rate_f64,
6562 last_update: now,
6563 }
6564 }
6565
6566 fn try_consume(&mut self, tokens: f64, now: Instant) -> bool {
6568 self.update_tokens(now);
6569
6570 if self.tokens >= tokens {
6571 self.tokens -= tokens;
6572 true
6573 } else {
6574 false
6575 }
6576 }
6577
6578 fn update_tokens(&mut self, now: Instant) {
6580 let elapsed = now.saturating_duration_since(self.last_update);
6581 let new_tokens = elapsed.as_secs_f64() * self.rate;
6582 self.tokens = (self.tokens + new_tokens).min(self.max_tokens);
6583 self.last_update = now;
6584 }
6585
6586 fn set_rate(&mut self, rate: u8) {
6588 let rate_f64 = rate as f64;
6589 self.rate = rate_f64;
6590 self.max_tokens = rate_f64;
6591 if self.tokens > self.max_tokens {
6593 self.tokens = self.max_tokens;
6594 }
6595 }
6596}
6597
6598#[cfg(test)]
6599mod tests {
6600 use super::*;
6601 use crate::transport_parameters::AddressDiscoveryConfig;
6602 use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr};
6603
6604 #[test]
6605 fn address_discovery_state_new() {
6606 let config = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
6607 let now = Instant::now();
6608 let state = AddressDiscoveryState::new(&config, now);
6609
6610 assert!(state.enabled);
6611 assert_eq!(state.max_observation_rate, 10);
6612 assert!(!state.observe_all_paths);
6613 assert!(state.sent_observations.is_empty());
6614 assert!(state.received_observations.is_empty());
6615 assert!(state.received_history.is_empty());
6616 assert_eq!(state.rate_limiter.tokens, 10.0);
6617 }
6618
6619 #[test]
6620 fn address_discovery_state_disabled() {
6621 let config = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
6622 let now = Instant::now();
6623 let mut state = AddressDiscoveryState::new(&config, now);
6624
6625 state.enabled = false;
6627
6628 assert!(!state.should_send_observation(0, now));
6630 }
6631
6632 #[test]
6633 fn address_discovery_state_should_send_observation() {
6634 let config = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
6635 let now = Instant::now();
6636 let mut state = AddressDiscoveryState::new(&config, now);
6637
6638 assert!(state.should_send_observation(0, now));
6640
6641 let mut path_info = paths::PathAddressInfo::new();
6643 path_info.update_observed_address(
6644 SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)), 8080),
6645 now,
6646 );
6647 path_info.mark_notified();
6648 state.sent_observations.insert(0, path_info);
6649
6650 assert!(!state.should_send_observation(0, now));
6652
6653 assert!(!state.should_send_observation(1, now));
6655 }
6656
6657 #[test]
6658 fn address_discovery_state_rate_limiting() {
6659 let config = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
6660 let now = Instant::now();
6661 let mut state = AddressDiscoveryState::new(&config, now);
6662
6663 state.observe_all_paths = true;
6665
6666 assert!(state.should_send_observation(0, now));
6668
6669 state.rate_limiter.try_consume(9.0, now); assert!(!state.should_send_observation(0, now));
6674
6675 let later = now + Duration::from_secs(1);
6677 state.rate_limiter.update_tokens(later);
6678 assert!(state.should_send_observation(0, later));
6679 }
6680
6681 #[test]
6682 fn address_discovery_state_handle_observed_address() {
6683 let config = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
6684 let now = Instant::now();
6685 let mut state = AddressDiscoveryState::new(&config, now);
6686
6687 let addr1 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(10, 0, 0, 1)), 443);
6688 let addr2 = SocketAddr::new(
6689 IpAddr::V6(Ipv6Addr::new(0x2001, 0xdb8, 0, 0, 0, 0, 0, 1)),
6690 8080,
6691 );
6692
6693 state.handle_observed_address(addr1, 0, now);
6695 assert_eq!(state.received_history.len(), 1);
6696 assert_eq!(state.received_history[0].address, addr1);
6697 assert_eq!(state.received_history[0].path_id, 0);
6698
6699 let later = now + Duration::from_millis(100);
6701 state.handle_observed_address(addr2, 1, later);
6702 assert_eq!(state.received_history.len(), 2);
6703 assert_eq!(state.received_history[1].address, addr2);
6704 assert_eq!(state.received_history[1].path_id, 1);
6705 }
6706
6707 #[test]
6708 fn address_discovery_state_get_observed_address() {
6709 let config = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
6710 let now = Instant::now();
6711 let mut state = AddressDiscoveryState::new(&config, now);
6712
6713 assert_eq!(state.get_observed_address(0), None);
6715
6716 let mut path_info = paths::PathAddressInfo::new();
6718 let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 80);
6719 path_info.update_observed_address(addr, now);
6720 state.received_observations.insert(0, path_info);
6721
6722 assert_eq!(state.get_observed_address(0), Some(addr));
6724 assert_eq!(state.get_observed_address(1), None);
6725 }
6726
6727 #[test]
6728 fn address_discovery_state_unnotified_changes() {
6729 let config = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
6730 let now = Instant::now();
6731 let mut state = AddressDiscoveryState::new(&config, now);
6732
6733 assert!(!state.has_unnotified_changes());
6735
6736 let mut path_info = paths::PathAddressInfo::new();
6738 path_info.update_observed_address(
6739 SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)), 8080),
6740 now,
6741 );
6742 state.sent_observations.insert(0, path_info);
6743
6744 assert!(state.has_unnotified_changes());
6746
6747 state.record_observation_sent(0);
6749 assert!(!state.has_unnotified_changes());
6750 }
6751
6752 #[test]
6753 fn address_observation_rate_limiter_token_bucket() {
6754 let now = Instant::now();
6755 let mut limiter = AddressObservationRateLimiter::new(5, now); assert_eq!(limiter.tokens, 5.0);
6759 assert_eq!(limiter.max_tokens, 5.0);
6760 assert_eq!(limiter.rate, 5.0);
6761
6762 assert!(limiter.try_consume(3.0, now));
6764 assert_eq!(limiter.tokens, 2.0);
6765
6766 assert!(!limiter.try_consume(3.0, now));
6768 assert_eq!(limiter.tokens, 2.0);
6769
6770 let later = now + Duration::from_secs(1);
6772 limiter.update_tokens(later);
6773 assert_eq!(limiter.tokens, 5.0); let half_sec = now + Duration::from_millis(500);
6777 let mut limiter2 = AddressObservationRateLimiter::new(5, now);
6778 limiter2.try_consume(3.0, now);
6779 limiter2.update_tokens(half_sec);
6780 assert_eq!(limiter2.tokens, 4.5); }
6782
6783 #[test]
6785 fn connection_initializes_address_discovery_state_default() {
6786 let config = crate::transport_parameters::AddressDiscoveryConfig::default();
6789 let state = AddressDiscoveryState::new(&config, Instant::now());
6790 assert!(state.enabled); assert_eq!(state.max_observation_rate, 10); assert!(!state.observe_all_paths);
6793 }
6794
6795 #[test]
6796 fn connection_initializes_with_address_discovery_enabled() {
6797 let config = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
6799 let state = AddressDiscoveryState::new(&config, Instant::now());
6800 assert!(state.enabled);
6801 assert_eq!(state.max_observation_rate, 10);
6802 assert!(!state.observe_all_paths);
6803 }
6804
6805 #[test]
6806 fn connection_address_discovery_enabled_by_default() {
6807 let config = crate::transport_parameters::AddressDiscoveryConfig::default();
6809 let state = AddressDiscoveryState::new(&config, Instant::now());
6810 assert!(state.enabled); }
6812
6813 #[test]
6814 fn negotiate_max_idle_timeout_commutative() {
6815 let test_params = [
6816 (None, None, None),
6817 (None, Some(VarInt(0)), None),
6818 (None, Some(VarInt(2)), Some(Duration::from_millis(2))),
6819 (Some(VarInt(0)), Some(VarInt(0)), None),
6820 (
6821 Some(VarInt(2)),
6822 Some(VarInt(0)),
6823 Some(Duration::from_millis(2)),
6824 ),
6825 (
6826 Some(VarInt(1)),
6827 Some(VarInt(4)),
6828 Some(Duration::from_millis(1)),
6829 ),
6830 ];
6831
6832 for (left, right, result) in test_params {
6833 assert_eq!(negotiate_max_idle_timeout(left, right), result);
6834 assert_eq!(negotiate_max_idle_timeout(right, left), result);
6835 }
6836 }
6837
6838 #[test]
6839 fn path_creation_initializes_address_discovery() {
6840 let config = TransportConfig::default();
6841 let remote = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)), 8080);
6842 let now = Instant::now();
6843
6844 let path = paths::PathData::new(remote, false, None, now, &config);
6846
6847 assert!(path.address_info.observed_address.is_none());
6849 assert!(path.address_info.last_observed.is_none());
6850 assert_eq!(path.address_info.observation_count, 0);
6851 assert!(!path.address_info.notified);
6852
6853 assert_eq!(path.observation_rate_limiter.rate, 10.0);
6855 assert_eq!(path.observation_rate_limiter.max_tokens, 10.0);
6856 assert_eq!(path.observation_rate_limiter.tokens, 10.0);
6857 }
6858
6859 #[test]
6860 fn path_migration_resets_address_discovery() {
6861 let config = TransportConfig::default();
6862 let remote1 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)), 8080);
6863 let remote2 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(10, 0, 0, 1)), 443);
6864 let now = Instant::now();
6865
6866 let mut path1 = paths::PathData::new(remote1, false, None, now, &config);
6868 path1.update_observed_address(remote1, now);
6869 path1.mark_address_notified();
6870 path1.consume_observation_token(now);
6871 path1.set_observation_rate(20);
6872
6873 let path2 = paths::PathData::from_previous(remote2, &path1, now);
6875
6876 assert!(path2.address_info.observed_address.is_none());
6878 assert!(path2.address_info.last_observed.is_none());
6879 assert_eq!(path2.address_info.observation_count, 0);
6880 assert!(!path2.address_info.notified);
6881
6882 assert_eq!(path2.observation_rate_limiter.rate, 20.0);
6884 assert_eq!(path2.observation_rate_limiter.tokens, 20.0);
6885 }
6886
6887 #[test]
6888 fn connection_path_updates_observation_rate() {
6889 let config = TransportConfig::default();
6890 let remote = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 42);
6891 let now = Instant::now();
6892
6893 let mut path = paths::PathData::new(remote, false, None, now, &config);
6894
6895 assert_eq!(path.observation_rate_limiter.rate, 10.0);
6897
6898 path.set_observation_rate(25);
6900 assert_eq!(path.observation_rate_limiter.rate, 25.0);
6901 assert_eq!(path.observation_rate_limiter.max_tokens, 25.0);
6902
6903 path.observation_rate_limiter.tokens = 30.0; path.set_observation_rate(20);
6906 assert_eq!(path.observation_rate_limiter.tokens, 20.0); }
6908
6909 #[test]
6910 fn path_validation_preserves_discovery_state() {
6911 let config = TransportConfig::default();
6912 let remote = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)), 8080);
6913 let now = Instant::now();
6914
6915 let mut path = paths::PathData::new(remote, false, None, now, &config);
6916
6917 let observed = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(1, 2, 3, 4)), 5678);
6919 path.update_observed_address(observed, now);
6920 path.set_observation_rate(15);
6921
6922 path.validated = true;
6924
6925 assert_eq!(path.address_info.observed_address, Some(observed));
6927 assert_eq!(path.observation_rate_limiter.rate, 15.0);
6928 }
6929
6930 #[test]
6931 fn address_discovery_state_initialization() {
6932 let state = AddressDiscoveryState::new_with_params(true, 30.0, true);
6934
6935 assert!(state.enabled);
6936 assert_eq!(state.max_observation_rate, 30);
6937 assert!(state.observe_all_paths);
6938 assert!(state.sent_observations.is_empty());
6939 assert!(state.received_observations.is_empty());
6940 assert!(state.received_history.is_empty());
6941 }
6942
6943 #[test]
6945 fn handle_observed_address_frame_basic() {
6946 let config = AddressDiscoveryConfig::SendAndReceive;
6947 let mut state = AddressDiscoveryState::new(&config, Instant::now());
6948 let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)), 8080);
6949 let now = Instant::now();
6950 let path_id = 0;
6951
6952 state.handle_observed_address(addr, path_id, now);
6954
6955 assert_eq!(state.received_history.len(), 1);
6957 assert_eq!(state.received_history[0].address, addr);
6958 assert_eq!(state.received_history[0].path_id, path_id);
6959 assert_eq!(state.received_history[0].received_at, now);
6960
6961 assert!(state.received_observations.contains_key(&path_id));
6963 let path_info = &state.received_observations[&path_id];
6964 assert_eq!(path_info.observed_address, Some(addr));
6965 assert_eq!(path_info.last_observed, Some(now));
6966 assert_eq!(path_info.observation_count, 1);
6967 }
6968
6969 #[test]
6970 fn handle_observed_address_frame_multiple_observations() {
6971 let config = AddressDiscoveryConfig::SendAndReceive;
6972 let mut state = AddressDiscoveryState::new(&config, Instant::now());
6973 let addr1 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)), 8080);
6974 let addr2 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(10, 0, 0, 1)), 443);
6975 let now = Instant::now();
6976 let path_id = 0;
6977
6978 state.handle_observed_address(addr1, path_id, now);
6980 state.handle_observed_address(addr1, path_id, now + Duration::from_secs(1));
6981 state.handle_observed_address(addr2, path_id, now + Duration::from_secs(2));
6982
6983 assert_eq!(state.received_history.len(), 3);
6985
6986 let path_info = &state.received_observations[&path_id];
6988 assert_eq!(path_info.observed_address, Some(addr2));
6989 assert_eq!(path_info.observation_count, 1); }
6991
6992 #[test]
6993 fn handle_observed_address_frame_disabled() {
6994 let config = AddressDiscoveryConfig::SendAndReceive;
6995 let mut state = AddressDiscoveryState::new(&config, Instant::now());
6996 state.enabled = false; let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)), 8080);
6998 let now = Instant::now();
6999
7000 state.handle_observed_address(addr, 0, now);
7002
7003 assert!(state.received_history.is_empty());
7005 assert!(state.sent_observations.is_empty());
7006 assert!(state.received_observations.is_empty());
7007 }
7008
7009 #[test]
7010 fn should_send_observation_basic() {
7011 let config = AddressDiscoveryConfig::SendAndReceive;
7012 let mut state = AddressDiscoveryState::new(&config, Instant::now());
7013 state.max_observation_rate = 10;
7014 let now = Instant::now();
7015 let path_id = 0;
7016
7017 assert!(state.should_send_observation(path_id, now));
7019
7020 state.record_observation_sent(path_id);
7022
7023 assert!(state.should_send_observation(path_id, now));
7025 }
7026
7027 #[test]
7028 fn should_send_observation_rate_limiting() {
7029 let config = AddressDiscoveryConfig::SendAndReceive;
7030 let now = Instant::now();
7031 let mut state = AddressDiscoveryState::new(&config, now);
7032 state.max_observation_rate = 2; state.update_rate_limit(2.0);
7034 let path_id = 0;
7035
7036 assert!(state.should_send_observation(path_id, now));
7038 state.record_observation_sent(path_id);
7039 assert!(state.should_send_observation(path_id, now));
7040 state.record_observation_sent(path_id);
7041
7042 assert!(!state.should_send_observation(path_id, now));
7044
7045 let later = now + Duration::from_secs(1);
7047 assert!(state.should_send_observation(path_id, later));
7048 }
7049
7050 #[test]
7051 fn should_send_observation_disabled() {
7052 let config = AddressDiscoveryConfig::SendAndReceive;
7053 let mut state = AddressDiscoveryState::new(&config, Instant::now());
7054 state.enabled = false;
7055
7056 assert!(!state.should_send_observation(0, Instant::now()));
7058 }
7059
7060 #[test]
7061 fn should_send_observation_per_path() {
7062 let config = AddressDiscoveryConfig::SendAndReceive;
7063 let now = Instant::now();
7064 let mut state = AddressDiscoveryState::new(&config, now);
7065 state.max_observation_rate = 2; state.observe_all_paths = true;
7067 state.update_rate_limit(2.0);
7068
7069 assert!(state.should_send_observation(0, now));
7071 state.record_observation_sent(0);
7072
7073 assert!(state.should_send_observation(1, now));
7075 state.record_observation_sent(1);
7076
7077 assert!(!state.should_send_observation(0, now));
7079 assert!(!state.should_send_observation(1, now));
7080
7081 let later = now + Duration::from_secs(1);
7083 assert!(state.should_send_observation(0, later));
7084 }
7085
7086 #[test]
7087 fn has_unnotified_changes_test() {
7088 let config = AddressDiscoveryConfig::SendAndReceive;
7089 let mut state = AddressDiscoveryState::new(&config, Instant::now());
7090 let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)), 8080);
7091 let now = Instant::now();
7092
7093 assert!(!state.has_unnotified_changes());
7095
7096 state.handle_observed_address(addr, 0, now);
7098 assert!(state.has_unnotified_changes());
7099
7100 state.received_observations.get_mut(&0).unwrap().notified = true;
7102 assert!(!state.has_unnotified_changes());
7103 }
7104
7105 #[test]
7106 fn get_observed_address_test() {
7107 let config = AddressDiscoveryConfig::SendAndReceive;
7108 let mut state = AddressDiscoveryState::new(&config, Instant::now());
7109 let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)), 8080);
7110 let now = Instant::now();
7111 let path_id = 0;
7112
7113 assert_eq!(state.get_observed_address(path_id), None);
7115
7116 state.handle_observed_address(addr, path_id, now);
7118 assert_eq!(state.get_observed_address(path_id), Some(addr));
7119
7120 assert_eq!(state.get_observed_address(999), None);
7122 }
7123
7124 #[test]
7126 fn rate_limiter_token_bucket_basic() {
7127 let now = Instant::now();
7128 let mut limiter = AddressObservationRateLimiter::new(10, now); assert!(limiter.try_consume(5.0, now));
7132 assert!(limiter.try_consume(5.0, now));
7133
7134 assert!(!limiter.try_consume(1.0, now));
7136 }
7137
7138 #[test]
7139 fn rate_limiter_token_replenishment() {
7140 let now = Instant::now();
7141 let mut limiter = AddressObservationRateLimiter::new(10, now); assert!(limiter.try_consume(10.0, now));
7145 assert!(!limiter.try_consume(0.1, now)); let later = now + Duration::from_secs(1);
7149 assert!(limiter.try_consume(10.0, later)); assert!(!limiter.try_consume(0.1, later)); let later = later + Duration::from_millis(500);
7154 assert!(limiter.try_consume(5.0, later)); assert!(!limiter.try_consume(0.1, later)); }
7157
7158 #[test]
7159 fn rate_limiter_max_tokens_cap() {
7160 let now = Instant::now();
7161 let mut limiter = AddressObservationRateLimiter::new(10, now);
7162
7163 let later = now + Duration::from_secs(2);
7165 assert!(limiter.try_consume(10.0, later));
7167 assert!(!limiter.try_consume(10.1, later)); let later2 = later + Duration::from_secs(1);
7171 assert!(limiter.try_consume(3.0, later2));
7172
7173 let much_later = later2 + Duration::from_secs(2);
7175 assert!(limiter.try_consume(10.0, much_later)); assert!(!limiter.try_consume(0.1, much_later)); }
7178
7179 #[test]
7180 fn rate_limiter_fractional_consumption() {
7181 let now = Instant::now();
7182 let mut limiter = AddressObservationRateLimiter::new(10, now);
7183
7184 assert!(limiter.try_consume(0.5, now));
7186 assert!(limiter.try_consume(2.3, now));
7187 assert!(limiter.try_consume(7.2, now)); assert!(!limiter.try_consume(0.1, now)); let later = now + Duration::from_millis(100); assert!(limiter.try_consume(1.0, later));
7193 assert!(!limiter.try_consume(0.1, later));
7194 }
7195
7196 #[test]
7197 fn rate_limiter_zero_rate() {
7198 let now = Instant::now();
7199 let mut limiter = AddressObservationRateLimiter::new(0, now); assert!(!limiter.try_consume(1.0, now));
7203 assert!(!limiter.try_consume(0.1, now));
7204 assert!(!limiter.try_consume(0.001, now));
7205
7206 let later = now + Duration::from_secs(10);
7208 assert!(!limiter.try_consume(0.001, later));
7209 }
7210
7211 #[test]
7212 fn rate_limiter_high_rate() {
7213 let now = Instant::now();
7214 let mut limiter = AddressObservationRateLimiter::new(63, now); assert!(limiter.try_consume(60.0, now));
7218 assert!(limiter.try_consume(3.0, now));
7219 assert!(!limiter.try_consume(0.1, now)); let later = now + Duration::from_secs(1);
7223 assert!(limiter.try_consume(63.0, later)); assert!(!limiter.try_consume(0.1, later)); }
7226
7227 #[test]
7228 fn rate_limiter_time_precision() {
7229 let now = Instant::now();
7230 let mut limiter = AddressObservationRateLimiter::new(100, now); assert!(limiter.try_consume(100.0, now));
7234 assert!(!limiter.try_consume(0.1, now));
7235
7236 let later = now + Duration::from_millis(10);
7238 assert!(limiter.try_consume(0.8, later)); assert!(!limiter.try_consume(0.5, later)); let much_later = later + Duration::from_millis(100); assert!(limiter.try_consume(5.0, much_later)); limiter.tokens = 0.0; let final_time = much_later + Duration::from_millis(1);
7250 limiter.update_tokens(final_time); assert!(limiter.tokens >= 0.09 && limiter.tokens <= 0.11);
7255 }
7256
7257 #[test]
7258 fn per_path_rate_limiting_independent() {
7259 let config = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
7260 let now = Instant::now();
7261 let mut state = AddressDiscoveryState::new(&config, now);
7262
7263 state.observe_all_paths = true;
7265
7266 state.update_rate_limit(5.0);
7268
7269 state
7271 .sent_observations
7272 .insert(0, paths::PathAddressInfo::new());
7273 state
7274 .sent_observations
7275 .insert(1, paths::PathAddressInfo::new());
7276 state
7277 .sent_observations
7278 .insert(2, paths::PathAddressInfo::new());
7279
7280 state
7282 .sent_observations
7283 .get_mut(&0)
7284 .unwrap()
7285 .observed_address = Some(SocketAddr::new(
7286 IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)),
7287 8080,
7288 ));
7289 state
7290 .sent_observations
7291 .get_mut(&1)
7292 .unwrap()
7293 .observed_address = Some(SocketAddr::new(
7294 IpAddr::V4(Ipv4Addr::new(192, 168, 1, 2)),
7295 8081,
7296 ));
7297 state
7298 .sent_observations
7299 .get_mut(&2)
7300 .unwrap()
7301 .observed_address = Some(SocketAddr::new(
7302 IpAddr::V4(Ipv4Addr::new(192, 168, 1, 3)),
7303 8082,
7304 ));
7305
7306 for _ in 0..3 {
7308 assert!(state.should_send_observation(0, now));
7309 state.record_observation_sent(0);
7310 state.sent_observations.get_mut(&0).unwrap().notified = false;
7312 }
7313
7314 for _ in 0..2 {
7316 assert!(state.should_send_observation(1, now));
7317 state.record_observation_sent(1);
7318 state.sent_observations.get_mut(&1).unwrap().notified = false;
7320 }
7321
7322 assert!(!state.should_send_observation(2, now));
7324
7325 let later = now + Duration::from_secs(1);
7327
7328 assert!(state.should_send_observation(0, later));
7330 assert!(state.should_send_observation(1, later));
7331 assert!(state.should_send_observation(2, later));
7332 }
7333
7334 #[test]
7335 fn per_path_rate_limiting_with_path_specific_limits() {
7336 let now = Instant::now();
7337 let remote1 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)), 8080);
7338 let remote2 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 2)), 8081);
7339 let config = TransportConfig::default();
7340
7341 let mut path1 = paths::PathData::new(remote1, false, None, now, &config);
7343 let mut path2 = paths::PathData::new(remote2, false, None, now, &config);
7344
7345 path1.observation_rate_limiter = paths::PathObservationRateLimiter::new(10, now); path2.observation_rate_limiter = paths::PathObservationRateLimiter::new(5, now); for _ in 0..10 {
7351 assert!(path1.observation_rate_limiter.can_send(now));
7352 path1.observation_rate_limiter.consume_token(now);
7353 }
7354 assert!(!path1.observation_rate_limiter.can_send(now));
7355
7356 for _ in 0..5 {
7358 assert!(path2.observation_rate_limiter.can_send(now));
7359 path2.observation_rate_limiter.consume_token(now);
7360 }
7361 assert!(!path2.observation_rate_limiter.can_send(now));
7362 }
7363
7364 #[test]
7365 fn per_path_rate_limiting_address_change_detection() {
7366 let config = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
7367 let now = Instant::now();
7368 let mut state = AddressDiscoveryState::new(&config, now);
7369
7370 let path_id = 0;
7372 let addr1 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)), 8080);
7373 let addr2 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 2)), 8080);
7374
7375 assert!(state.should_send_observation(path_id, now));
7377
7378 let frame = state.queue_observed_address_frame(path_id, addr1);
7380 assert!(frame.is_some());
7381
7382 assert!(!state.should_send_observation(path_id, now));
7384
7385 if let Some(info) = state.sent_observations.get_mut(&path_id) {
7387 info.notified = false;
7388 info.observed_address = Some(addr2);
7389 }
7390
7391 assert!(state.should_send_observation(path_id, now));
7393 }
7394
7395 #[test]
7396 fn per_path_rate_limiting_migration() {
7397 let now = Instant::now();
7398 let remote1 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)), 8080);
7399 let remote2 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 2)), 8081);
7400 let config = TransportConfig::default();
7401
7402 let mut path = paths::PathData::new(remote1, false, None, now, &config);
7404 path.observation_rate_limiter = paths::PathObservationRateLimiter::new(10, now);
7405
7406 for _ in 0..5 {
7408 assert!(path.observation_rate_limiter.can_send(now));
7409 path.observation_rate_limiter.consume_token(now);
7410 }
7411
7412 let mut new_path = paths::PathData::new(remote2, false, None, now, &config);
7414
7415 new_path.observation_rate_limiter = paths::PathObservationRateLimiter::new(10, now);
7418
7419 for _ in 0..10 {
7421 assert!(new_path.observation_rate_limiter.can_send(now));
7422 new_path.observation_rate_limiter.consume_token(now);
7423 }
7424 assert!(!new_path.observation_rate_limiter.can_send(now));
7425 }
7426
7427 #[test]
7428 fn per_path_rate_limiting_disabled_paths() {
7429 let config = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
7430 let now = Instant::now();
7431 let mut state = AddressDiscoveryState::new(&config, now);
7432
7433 assert!(state.should_send_observation(0, now));
7435
7436 assert!(!state.should_send_observation(1, now));
7438 assert!(!state.should_send_observation(2, now));
7439
7440 let later = now + Duration::from_secs(1);
7442 assert!(!state.should_send_observation(1, later));
7443 }
7444
7445 #[test]
7446 fn respecting_negotiated_max_observation_rate_basic() {
7447 let config = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
7448 let now = Instant::now();
7449 let mut state = AddressDiscoveryState::new(&config, now);
7450
7451 state.max_observation_rate = 10; state.rate_limiter = AddressObservationRateLimiter::new(10, now);
7454
7455 for _ in 0..10 {
7457 assert!(state.should_send_observation(0, now));
7458 }
7459 assert!(!state.should_send_observation(0, now));
7461 }
7462
7463 #[test]
7464 fn respecting_negotiated_max_observation_rate_zero() {
7465 let config = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
7466 let now = Instant::now();
7467 let mut state = AddressDiscoveryState::new(&config, now);
7468
7469 state.max_observation_rate = 0;
7471 state.rate_limiter = AddressObservationRateLimiter::new(0, now);
7472
7473 assert!(!state.should_send_observation(0, now));
7475 assert!(!state.should_send_observation(1, now));
7476
7477 let later = now + Duration::from_secs(10);
7479 assert!(!state.should_send_observation(0, later));
7480 }
7481
7482 #[test]
7483 fn respecting_negotiated_max_observation_rate_higher() {
7484 let config = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
7485 let now = Instant::now();
7486 let mut state = AddressDiscoveryState::new(&config, now);
7487
7488 state
7490 .sent_observations
7491 .insert(0, paths::PathAddressInfo::new());
7492 state
7493 .sent_observations
7494 .get_mut(&0)
7495 .unwrap()
7496 .observed_address = Some(SocketAddr::new(
7497 IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)),
7498 8080,
7499 ));
7500
7501 state.update_rate_limit(5.0);
7503
7504 state.max_observation_rate = 20; for _ in 0..5 {
7509 assert!(state.should_send_observation(0, now));
7510 state.record_observation_sent(0);
7511 state.sent_observations.get_mut(&0).unwrap().notified = false;
7513 }
7514 assert!(!state.should_send_observation(0, now));
7516 }
7517
7518 #[test]
7519 fn respecting_negotiated_max_observation_rate_dynamic_update() {
7520 let config = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
7521 let now = Instant::now();
7522 let mut state = AddressDiscoveryState::new(&config, now);
7523
7524 state
7526 .sent_observations
7527 .insert(0, paths::PathAddressInfo::new());
7528 state
7529 .sent_observations
7530 .get_mut(&0)
7531 .unwrap()
7532 .observed_address = Some(SocketAddr::new(
7533 IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)),
7534 8080,
7535 ));
7536
7537 for _ in 0..5 {
7539 assert!(state.should_send_observation(0, now));
7540 state.record_observation_sent(0);
7541 state.sent_observations.get_mut(&0).unwrap().notified = false;
7543 }
7544
7545 state.max_observation_rate = 3;
7549 state.rate_limiter.set_rate(3);
7550
7551 for _ in 0..3 {
7554 assert!(state.should_send_observation(0, now));
7555 state.record_observation_sent(0);
7556 state.sent_observations.get_mut(&0).unwrap().notified = false;
7558 }
7559
7560 assert!(!state.should_send_observation(0, now));
7562
7563 let later = now + Duration::from_secs(1);
7565 for _ in 0..3 {
7566 assert!(state.should_send_observation(0, later));
7567 state.record_observation_sent(0);
7568 state.sent_observations.get_mut(&0).unwrap().notified = false;
7570 }
7571
7572 assert!(!state.should_send_observation(0, later));
7574 }
7575
7576 #[test]
7577 fn respecting_negotiated_max_observation_rate_with_paths() {
7578 let config = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
7579 let now = Instant::now();
7580 let mut state = AddressDiscoveryState::new(&config, now);
7581
7582 state.observe_all_paths = true;
7584
7585 for i in 0..3 {
7587 state
7588 .sent_observations
7589 .insert(i, paths::PathAddressInfo::new());
7590 state
7591 .sent_observations
7592 .get_mut(&i)
7593 .unwrap()
7594 .observed_address = Some(SocketAddr::new(
7595 IpAddr::V4(Ipv4Addr::new(192, 168, 1, 100 + i as u8)),
7596 5000,
7597 ));
7598 }
7599
7600 for _ in 0..3 {
7603 for i in 0..3 {
7605 if state.should_send_observation(i, now) {
7606 state.record_observation_sent(i);
7607 state.sent_observations.get_mut(&i).unwrap().notified = false;
7609 }
7610 }
7611 }
7612
7613 assert!(state.should_send_observation(0, now));
7616 state.record_observation_sent(0);
7617
7618 assert!(!state.should_send_observation(0, now));
7620 assert!(!state.should_send_observation(1, now));
7621 assert!(!state.should_send_observation(2, now));
7622 }
7623
7624 #[test]
7625 fn queue_observed_address_frame_basic() {
7626 let config = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
7627 let now = Instant::now();
7628 let mut state = AddressDiscoveryState::new(&config, now);
7629
7630 let address = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 100)), 5000);
7632 let frame = state.queue_observed_address_frame(0, address);
7633
7634 assert!(frame.is_some());
7636 let frame = frame.unwrap();
7637 assert_eq!(frame.address, address);
7638
7639 assert!(state.sent_observations.contains_key(&0));
7641 assert!(state.sent_observations.get(&0).unwrap().notified);
7642 }
7643
7644 #[test]
7645 fn queue_observed_address_frame_rate_limited() {
7646 let config = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
7647 let now = Instant::now();
7648 let mut state = AddressDiscoveryState::new(&config, now);
7649
7650 state.observe_all_paths = true;
7652
7653 let mut addresses = Vec::new();
7655 for i in 0..10 {
7656 let addr = SocketAddr::new(
7657 IpAddr::V4(Ipv4Addr::new(192, 168, 1, i as u8)),
7658 5000 + i as u16,
7659 );
7660 addresses.push(addr);
7661 assert!(
7662 state.queue_observed_address_frame(i as u64, addr).is_some(),
7663 "Frame {} should be allowed",
7664 i + 1
7665 );
7666 }
7667
7668 let addr11 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 11)), 5011);
7670 assert!(
7671 state.queue_observed_address_frame(10, addr11).is_none(),
7672 "11th frame should be rate limited"
7673 );
7674 }
7675
7676 #[test]
7677 fn queue_observed_address_frame_disabled() {
7678 let config = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
7679 let now = Instant::now();
7680 let mut state = AddressDiscoveryState::new(&config, now);
7681
7682 state.enabled = false;
7684
7685 let address = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 100)), 5000);
7686
7687 assert!(state.queue_observed_address_frame(0, address).is_none());
7689 }
7690
7691 #[test]
7692 fn queue_observed_address_frame_already_notified() {
7693 let config = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
7694 let now = Instant::now();
7695 let mut state = AddressDiscoveryState::new(&config, now);
7696
7697 let address = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 100)), 5000);
7698
7699 assert!(state.queue_observed_address_frame(0, address).is_some());
7701
7702 assert!(state.queue_observed_address_frame(0, address).is_none());
7704
7705 let new_address = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 101)), 5001);
7707 assert!(state.queue_observed_address_frame(0, new_address).is_none());
7708 }
7709
7710 #[test]
7711 fn queue_observed_address_frame_primary_path_only() {
7712 let config = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
7713 let now = Instant::now();
7714 let mut state = AddressDiscoveryState::new(&config, now);
7715
7716 let address = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 100)), 5000);
7717
7718 assert!(state.queue_observed_address_frame(0, address).is_some());
7720
7721 assert!(state.queue_observed_address_frame(1, address).is_none());
7723 assert!(state.queue_observed_address_frame(2, address).is_none());
7724 }
7725
7726 #[test]
7727 fn queue_observed_address_frame_updates_path_info() {
7728 let config = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
7729 let now = Instant::now();
7730 let mut state = AddressDiscoveryState::new(&config, now);
7731
7732 let address = SocketAddr::new(
7733 IpAddr::V6(Ipv6Addr::new(0x2001, 0xdb8, 0, 0, 0, 0, 0, 1)),
7734 5000,
7735 );
7736
7737 let frame = state.queue_observed_address_frame(0, address);
7739 assert!(frame.is_some());
7740
7741 let path_info = state.sent_observations.get(&0).unwrap();
7743 assert_eq!(path_info.observed_address, Some(address));
7744 assert!(path_info.notified);
7745
7746 assert_eq!(state.received_history.len(), 0);
7749 }
7750
7751 #[test]
7752 fn retransmits_includes_outbound_observations() {
7753 use crate::connection::spaces::Retransmits;
7754
7755 let mut retransmits = Retransmits::default();
7757
7758 assert!(retransmits.outbound_observations.is_empty());
7760
7761 let address = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 100)), 5000);
7763 let frame = frame::ObservedAddress {
7764 sequence_number: VarInt::from_u32(1),
7765 address,
7766 };
7767 retransmits.outbound_observations.push(frame);
7768
7769 assert_eq!(retransmits.outbound_observations.len(), 1);
7771 assert_eq!(retransmits.outbound_observations[0].address, address);
7772 }
7773
7774 #[test]
7775 fn check_for_address_observations_no_peer_support() {
7776 let config = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
7777 let now = Instant::now();
7778 let mut state = AddressDiscoveryState::new(&config, now);
7779
7780 state
7782 .sent_observations
7783 .insert(0, paths::PathAddressInfo::new());
7784 state
7785 .sent_observations
7786 .get_mut(&0)
7787 .unwrap()
7788 .observed_address = Some(SocketAddr::new(
7789 IpAddr::V4(Ipv4Addr::new(192, 168, 1, 100)),
7790 5000,
7791 ));
7792
7793 let frames = state.check_for_address_observations(0, false, now);
7795
7796 assert!(frames.is_empty());
7798 }
7799
7800 #[test]
7801 fn check_for_address_observations_with_peer_support() {
7802 let config = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
7803 let now = Instant::now();
7804 let mut state = AddressDiscoveryState::new(&config, now);
7805
7806 let address = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 100)), 5000);
7808 state
7809 .sent_observations
7810 .insert(0, paths::PathAddressInfo::new());
7811 state
7812 .sent_observations
7813 .get_mut(&0)
7814 .unwrap()
7815 .observed_address = Some(address);
7816
7817 let frames = state.check_for_address_observations(0, true, now);
7819
7820 assert_eq!(frames.len(), 1);
7822 assert_eq!(frames[0].address, address);
7823
7824 assert!(state.sent_observations.get(&0).unwrap().notified);
7826 }
7827
7828 #[test]
7829 fn check_for_address_observations_rate_limited() {
7830 let config = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
7831 let now = Instant::now();
7832 let mut state = AddressDiscoveryState::new(&config, now);
7833
7834 let address = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 100)), 5000);
7836 state
7837 .sent_observations
7838 .insert(0, paths::PathAddressInfo::new());
7839 state
7840 .sent_observations
7841 .get_mut(&0)
7842 .unwrap()
7843 .observed_address = Some(address);
7844
7845 for _ in 0..10 {
7847 let frames = state.check_for_address_observations(0, true, now);
7848 if frames.is_empty() {
7849 break;
7850 }
7851 state.sent_observations.get_mut(&0).unwrap().notified = false;
7853 }
7854
7855 assert_eq!(state.rate_limiter.tokens, 0.0);
7857
7858 state.sent_observations.get_mut(&0).unwrap().notified = false;
7860
7861 let frames2 = state.check_for_address_observations(0, true, now);
7863 assert_eq!(frames2.len(), 0);
7864
7865 state.sent_observations.get_mut(&0).unwrap().notified = false;
7867
7868 let later = now + Duration::from_millis(200); let frames3 = state.check_for_address_observations(0, true, later);
7871 assert_eq!(frames3.len(), 1);
7872 }
7873
7874 #[test]
7875 fn check_for_address_observations_multiple_paths() {
7876 let config = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
7877 let now = Instant::now();
7878 let mut state = AddressDiscoveryState::new(&config, now);
7879
7880 state.observe_all_paths = true;
7882
7883 let addr1 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 100)), 5000);
7885 let addr2 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 101)), 5001);
7886
7887 state
7888 .sent_observations
7889 .insert(0, paths::PathAddressInfo::new());
7890 state
7891 .sent_observations
7892 .get_mut(&0)
7893 .unwrap()
7894 .observed_address = Some(addr1);
7895
7896 state
7897 .sent_observations
7898 .insert(1, paths::PathAddressInfo::new());
7899 state
7900 .sent_observations
7901 .get_mut(&1)
7902 .unwrap()
7903 .observed_address = Some(addr2);
7904
7905 let frames = state.check_for_address_observations(0, true, now);
7907
7908 assert_eq!(frames.len(), 2);
7910
7911 let addresses: Vec<_> = frames.iter().map(|f| f.address).collect();
7913 assert!(addresses.contains(&addr1));
7914 assert!(addresses.contains(&addr2));
7915
7916 assert!(state.sent_observations.get(&0).unwrap().notified);
7918 assert!(state.sent_observations.get(&1).unwrap().notified);
7919 }
7920
7921 #[test]
7923 fn test_rate_limiter_configuration() {
7924 let state = AddressDiscoveryState::new_with_params(true, 10.0, false);
7926 assert_eq!(state.rate_limiter.rate, 10.0);
7927 assert_eq!(state.rate_limiter.max_tokens, 10.0);
7928 assert_eq!(state.rate_limiter.tokens, 10.0);
7929
7930 let state = AddressDiscoveryState::new_with_params(true, 63.0, false);
7931 assert_eq!(state.rate_limiter.rate, 63.0);
7932 assert_eq!(state.rate_limiter.max_tokens, 63.0);
7933 }
7934
7935 #[test]
7936 fn test_rate_limiter_update_configuration() {
7937 let mut state = AddressDiscoveryState::new_with_params(true, 5.0, false);
7938
7939 assert_eq!(state.rate_limiter.rate, 5.0);
7941
7942 state.update_rate_limit(10.0);
7944 assert_eq!(state.rate_limiter.rate, 10.0);
7945 assert_eq!(state.rate_limiter.max_tokens, 10.0);
7946
7947 state.rate_limiter.tokens = 15.0;
7949 state.update_rate_limit(8.0);
7950 assert_eq!(state.rate_limiter.tokens, 8.0);
7951 }
7952
7953 #[test]
7954 fn test_rate_limiter_from_transport_params() {
7955 let mut params = TransportParameters::default();
7956 params.address_discovery = Some(AddressDiscoveryConfig::SendAndReceive);
7957
7958 let state = AddressDiscoveryState::from_transport_params(¶ms);
7959 assert!(state.is_some());
7960 let state = state.unwrap();
7961 assert_eq!(state.rate_limiter.rate, 10.0); assert!(!state.observe_all_paths); }
7964
7965 #[test]
7966 fn test_rate_limiter_zero_rate() {
7967 let state = AddressDiscoveryState::new_with_params(true, 0.0, false);
7968 assert_eq!(state.rate_limiter.rate, 0.0);
7969 assert_eq!(state.rate_limiter.tokens, 0.0);
7970
7971 let address = "192.168.1.1:443".parse().unwrap();
7973 let mut state = AddressDiscoveryState::new_with_params(true, 0.0, false);
7974 let frame = state.queue_observed_address_frame(0, address);
7975 assert!(frame.is_none());
7976 }
7977
7978 #[test]
7979 fn test_rate_limiter_configuration_edge_cases() {
7980 let state = AddressDiscoveryState::new_with_params(true, 63.0, false);
7982 assert_eq!(state.rate_limiter.rate, 63.0);
7983
7984 let state = AddressDiscoveryState::new_with_params(true, 100.0, false);
7986 assert_eq!(state.rate_limiter.rate, 100.0);
7988
7989 let state = AddressDiscoveryState::new_with_params(true, 2.5, false);
7991 assert_eq!(state.rate_limiter.rate, 2.0);
7993 }
7994
7995 #[test]
7996 fn test_rate_limiter_runtime_update() {
7997 let mut state = AddressDiscoveryState::new_with_params(true, 10.0, false);
7998 let now = Instant::now();
7999
8000 state.rate_limiter.tokens = 5.0;
8002
8003 state.update_rate_limit(3.0);
8005
8006 assert_eq!(state.rate_limiter.tokens, 3.0);
8008 assert_eq!(state.rate_limiter.rate, 3.0);
8009 assert_eq!(state.rate_limiter.max_tokens, 3.0);
8010
8011 let later = now + Duration::from_secs(1);
8013 state.rate_limiter.update_tokens(later);
8014
8015 assert_eq!(state.rate_limiter.tokens, 3.0);
8017 }
8018
8019 #[test]
8021 fn test_address_discovery_state_initialization_default() {
8022 let now = Instant::now();
8024 let default_config = crate::transport_parameters::AddressDiscoveryConfig::default();
8025
8026 let address_discovery_state = Some(AddressDiscoveryState::new(&default_config, now));
8029
8030 assert!(address_discovery_state.is_some());
8031 let state = address_discovery_state.unwrap();
8032
8033 assert!(state.enabled); assert_eq!(state.max_observation_rate, 10); assert!(!state.observe_all_paths);
8037 }
8038
8039 #[test]
8040 fn test_address_discovery_state_initialization_on_handshake() {
8041 let now = Instant::now();
8043
8044 let mut address_discovery_state = Some(AddressDiscoveryState::new(
8046 &crate::transport_parameters::AddressDiscoveryConfig::default(),
8047 now,
8048 ));
8049
8050 let peer_params = TransportParameters {
8052 address_discovery: Some(AddressDiscoveryConfig::SendAndReceive),
8053 ..TransportParameters::default()
8054 };
8055
8056 if let Some(peer_config) = &peer_params.address_discovery {
8058 address_discovery_state = Some(AddressDiscoveryState::new(peer_config, now));
8060 }
8061
8062 assert!(address_discovery_state.is_some());
8064 let state = address_discovery_state.unwrap();
8065 assert!(state.enabled);
8066 assert_eq!(state.max_observation_rate, 10); assert!(!state.observe_all_paths); }
8070
8071 #[test]
8072 fn test_address_discovery_negotiation_disabled_peer() {
8073 let now = Instant::now();
8075
8076 let our_config = AddressDiscoveryConfig::SendAndReceive;
8078 let mut address_discovery_state = Some(AddressDiscoveryState::new(&our_config, now));
8079
8080 let peer_params = TransportParameters {
8082 address_discovery: None,
8083 ..TransportParameters::default()
8084 };
8085
8086 if peer_params.address_discovery.is_none() {
8088 if let Some(state) = &mut address_discovery_state {
8089 state.enabled = false;
8090 }
8091 }
8092
8093 let state = address_discovery_state.unwrap();
8095 assert!(!state.enabled); }
8097
8098 #[test]
8099 fn test_address_discovery_negotiation_rate_limiting() {
8100 let now = Instant::now();
8102
8103 let our_config = AddressDiscoveryConfig::SendAndReceive;
8105 let mut address_discovery_state = Some(AddressDiscoveryState::new(&our_config, now));
8106
8107 if let Some(state) = &mut address_discovery_state {
8109 state.max_observation_rate = 30;
8110 state.update_rate_limit(30.0);
8111 }
8112
8113 let peer_params = TransportParameters {
8115 address_discovery: Some(AddressDiscoveryConfig::SendAndReceive),
8116 ..TransportParameters::default()
8117 };
8118
8119 if let (Some(state), Some(_peer_config)) =
8122 (&mut address_discovery_state, &peer_params.address_discovery)
8123 {
8124 let peer_rate = 15u8;
8127 let negotiated_rate = state.max_observation_rate.min(peer_rate);
8128 state.update_rate_limit(negotiated_rate as f64);
8129 }
8130
8131 let state = address_discovery_state.unwrap();
8133 assert_eq!(state.rate_limiter.rate, 15.0); }
8135
8136 #[test]
8137 fn test_address_discovery_path_initialization() {
8138 let now = Instant::now();
8140 let config = AddressDiscoveryConfig::SendAndReceive;
8141 let mut state = AddressDiscoveryState::new(&config, now);
8142
8143 assert!(state.sent_observations.is_empty());
8145 assert!(state.received_observations.is_empty());
8146
8147 let should_send = state.should_send_observation(0, now);
8149 assert!(should_send); }
8154
8155 #[test]
8156 fn test_address_discovery_multiple_path_initialization() {
8157 let now = Instant::now();
8159 let config = AddressDiscoveryConfig::SendAndReceive;
8160 let mut state = AddressDiscoveryState::new(&config, now);
8161
8162 assert!(state.should_send_observation(0, now)); assert!(!state.should_send_observation(1, now)); assert!(!state.should_send_observation(2, now)); state.observe_all_paths = true;
8169 assert!(state.should_send_observation(1, now)); assert!(state.should_send_observation(2, now)); let config_primary_only = AddressDiscoveryConfig::SendAndReceive;
8174 let mut state_primary = AddressDiscoveryState::new(&config_primary_only, now);
8175
8176 assert!(state_primary.should_send_observation(0, now)); assert!(!state_primary.should_send_observation(1, now)); }
8179
8180 #[test]
8181 fn test_handle_observed_address_frame_valid() {
8182 let now = Instant::now();
8184 let config = AddressDiscoveryConfig::SendAndReceive;
8185 let mut state = AddressDiscoveryState::new(&config, now);
8186
8187 let observed_addr = SocketAddr::from(([192, 168, 1, 100], 5000));
8189 state.handle_observed_address(observed_addr, 0, now);
8190
8191 assert_eq!(state.received_history.len(), 1);
8193 assert_eq!(state.received_history[0].address, observed_addr);
8194 assert_eq!(state.received_history[0].path_id, 0);
8195 assert_eq!(state.received_history[0].received_at, now);
8196
8197 let path_info = state.received_observations.get(&0).unwrap();
8199 assert_eq!(path_info.observed_address, Some(observed_addr));
8200 assert_eq!(path_info.last_observed, Some(now));
8201 assert_eq!(path_info.observation_count, 1);
8202 }
8203
8204 #[test]
8205 fn test_handle_multiple_received_history() {
8206 let now = Instant::now();
8208 let config = AddressDiscoveryConfig::SendAndReceive;
8209 let mut state = AddressDiscoveryState::new(&config, now);
8210
8211 let addr1 = SocketAddr::from(([192, 168, 1, 100], 5000));
8213 let addr2 = SocketAddr::from(([10, 0, 0, 50], 6000));
8214 let addr3 = SocketAddr::from(([192, 168, 1, 100], 7000)); state.handle_observed_address(addr1, 0, now);
8217 state.handle_observed_address(addr2, 1, now);
8218 state.handle_observed_address(addr3, 0, now + Duration::from_millis(100));
8219
8220 assert_eq!(state.received_history.len(), 3);
8222
8223 let path0_info = state.received_observations.get(&0).unwrap();
8225 assert_eq!(path0_info.observed_address, Some(addr3));
8226 assert_eq!(path0_info.observation_count, 1); let path1_info = state.received_observations.get(&1).unwrap();
8230 assert_eq!(path1_info.observed_address, Some(addr2));
8231 assert_eq!(path1_info.observation_count, 1);
8232 }
8233
8234 #[test]
8235 fn test_get_observed_address() {
8236 let now = Instant::now();
8238 let config = AddressDiscoveryConfig::SendAndReceive;
8239 let mut state = AddressDiscoveryState::new(&config, now);
8240
8241 assert_eq!(state.get_observed_address(0), None);
8243
8244 let addr = SocketAddr::from(([192, 168, 1, 100], 5000));
8246 state.handle_observed_address(addr, 0, now);
8247
8248 assert_eq!(state.get_observed_address(0), Some(addr));
8250
8251 assert_eq!(state.get_observed_address(999), None);
8253 }
8254
8255 #[test]
8256 fn test_has_unnotified_changes() {
8257 let now = Instant::now();
8259 let config = AddressDiscoveryConfig::SendAndReceive;
8260 let mut state = AddressDiscoveryState::new(&config, now);
8261
8262 assert!(!state.has_unnotified_changes());
8264
8265 let addr = SocketAddr::from(([192, 168, 1, 100], 5000));
8267 state.handle_observed_address(addr, 0, now);
8268 assert!(state.has_unnotified_changes());
8269
8270 if let Some(path_info) = state.received_observations.get_mut(&0) {
8272 path_info.notified = true;
8273 }
8274 assert!(!state.has_unnotified_changes());
8275
8276 let addr2 = SocketAddr::from(([192, 168, 1, 100], 6000));
8278 state.handle_observed_address(addr2, 0, now + Duration::from_secs(1));
8279 assert!(state.has_unnotified_changes());
8280 }
8281
8282 #[test]
8283 fn test_address_discovery_disabled() {
8284 let now = Instant::now();
8286 let config = AddressDiscoveryConfig::SendAndReceive;
8287 let mut state = AddressDiscoveryState::new(&config, now);
8288
8289 state.enabled = false;
8291
8292 let addr = SocketAddr::from(([192, 168, 1, 100], 5000));
8294 state.handle_observed_address(addr, 0, now);
8295
8296 assert_eq!(state.received_history.len(), 0);
8298
8299 assert!(!state.should_send_observation(0, now));
8301 }
8302
8303 #[test]
8304 fn test_rate_limiting_basic() {
8305 let now = Instant::now();
8307 let config = AddressDiscoveryConfig::SendAndReceive;
8308 let mut state = AddressDiscoveryState::new(&config, now);
8309
8310 state.observe_all_paths = true;
8312 state.rate_limiter.set_rate(2); assert!(state.should_send_observation(0, now));
8316 state.record_observation_sent(0);
8318
8319 assert!(state.should_send_observation(1, now));
8321 state.record_observation_sent(1);
8322
8323 assert!(!state.should_send_observation(2, now));
8325
8326 let later = now + Duration::from_millis(500);
8328 assert!(state.should_send_observation(3, later));
8329 state.record_observation_sent(3);
8330
8331 assert!(!state.should_send_observation(4, later));
8333
8334 let _one_sec_later = now + Duration::from_secs(1);
8338 let two_sec_later = now + Duration::from_secs(2);
8342 assert!(state.should_send_observation(5, two_sec_later));
8343 state.record_observation_sent(5);
8344
8345 assert!(state.should_send_observation(6, two_sec_later));
8356 state.record_observation_sent(6);
8357
8358 assert!(
8360 !state.should_send_observation(7, two_sec_later),
8361 "Expected no tokens available"
8362 );
8363 }
8364
8365 #[test]
8366 fn test_rate_limiting_per_path() {
8367 let now = Instant::now();
8369 let config = AddressDiscoveryConfig::SendAndReceive;
8370 let mut state = AddressDiscoveryState::new(&config, now);
8371
8372 state
8374 .sent_observations
8375 .insert(0, paths::PathAddressInfo::new());
8376 state
8377 .sent_observations
8378 .get_mut(&0)
8379 .unwrap()
8380 .observed_address = Some(SocketAddr::new(
8381 IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)),
8382 8080,
8383 ));
8384
8385 for _ in 0..10 {
8387 assert!(state.should_send_observation(0, now));
8388 state.record_observation_sent(0);
8389 state.sent_observations.get_mut(&0).unwrap().notified = false;
8391 }
8392
8393 assert!(!state.should_send_observation(0, now));
8395
8396 let later = now + Duration::from_millis(100);
8398 assert!(state.should_send_observation(0, later));
8399 state.record_observation_sent(0);
8400
8401 state.sent_observations.get_mut(&0).unwrap().notified = false;
8403
8404 assert!(!state.should_send_observation(0, later));
8406 }
8407
8408 #[test]
8409 fn test_rate_limiting_zero_rate() {
8410 let now = Instant::now();
8412 let config = AddressDiscoveryConfig::SendAndReceive;
8413 let mut state = AddressDiscoveryState::new(&config, now);
8414
8415 state.rate_limiter.set_rate(0);
8417 state.rate_limiter.tokens = 0.0;
8418 state.rate_limiter.max_tokens = 0.0;
8419
8420 assert!(!state.should_send_observation(0, now));
8422 assert!(!state.should_send_observation(0, now + Duration::from_secs(10)));
8423 assert!(!state.should_send_observation(0, now + Duration::from_secs(100)));
8424 }
8425
8426 #[test]
8427 fn test_rate_limiting_update() {
8428 let now = Instant::now();
8430 let config = AddressDiscoveryConfig::SendAndReceive;
8431 let mut state = AddressDiscoveryState::new(&config, now);
8432
8433 state.observe_all_paths = true;
8435
8436 for i in 0..12 {
8438 state
8439 .sent_observations
8440 .insert(i, paths::PathAddressInfo::new());
8441 state
8442 .sent_observations
8443 .get_mut(&i)
8444 .unwrap()
8445 .observed_address = Some(SocketAddr::new(
8446 IpAddr::V4(Ipv4Addr::new(192, 168, 1, (i + 1) as u8)),
8447 8080,
8448 ));
8449 }
8450
8451 for i in 0..10 {
8454 assert!(state.should_send_observation(i, now));
8455 state.record_observation_sent(i);
8456 }
8457 assert!(!state.should_send_observation(10, now));
8459
8460 state.update_rate_limit(20.0);
8462
8463 let later = now + Duration::from_millis(50);
8466 assert!(state.should_send_observation(10, later));
8467 state.record_observation_sent(10);
8468
8469 let later2 = now + Duration::from_millis(100);
8471 assert!(state.should_send_observation(11, later2));
8472 }
8473
8474 #[test]
8475 fn test_rate_limiting_burst() {
8476 let now = Instant::now();
8478 let config = AddressDiscoveryConfig::SendAndReceive;
8479 let mut state = AddressDiscoveryState::new(&config, now);
8480
8481 for _ in 0..10 {
8483 assert!(state.should_send_observation(0, now));
8484 state.record_observation_sent(0);
8485 }
8486
8487 assert!(!state.should_send_observation(0, now));
8489
8490 let later = now + Duration::from_millis(100);
8492 assert!(state.should_send_observation(0, later));
8493 state.record_observation_sent(0);
8494 assert!(!state.should_send_observation(0, later));
8495 }
8496
8497 #[test]
8498 fn test_connection_rate_limiting_with_check_observations() {
8499 let now = Instant::now();
8501 let config = AddressDiscoveryConfig::SendAndReceive;
8502 let mut state = AddressDiscoveryState::new(&config, now);
8503
8504 let mut path_info = paths::PathAddressInfo::new();
8506 path_info.update_observed_address(
8507 SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)), 8080),
8508 now,
8509 );
8510 state.sent_observations.insert(0, path_info);
8511
8512 let frame1 =
8514 state.queue_observed_address_frame(0, SocketAddr::from(([192, 168, 1, 1], 8080)));
8515 assert!(frame1.is_some());
8516 state.record_observation_sent(0);
8517
8518 if let Some(info) = state.sent_observations.get_mut(&0) {
8520 info.notified = false;
8521 }
8522
8523 for _ in 1..10 {
8525 if let Some(info) = state.sent_observations.get_mut(&0) {
8527 info.notified = false;
8528 }
8529 let frame =
8530 state.queue_observed_address_frame(0, SocketAddr::from(([192, 168, 1, 1], 8080)));
8531 assert!(frame.is_some());
8532 state.record_observation_sent(0);
8533 }
8534
8535 if let Some(info) = state.sent_observations.get_mut(&0) {
8537 info.notified = false;
8538 }
8539 let frame3 =
8540 state.queue_observed_address_frame(0, SocketAddr::from(([192, 168, 1, 1], 8080)));
8541 assert!(frame3.is_none()); let later = now + Duration::from_millis(100);
8545 state.rate_limiter.update_tokens(later); if let Some(info) = state.sent_observations.get_mut(&0) {
8549 info.notified = false;
8550 }
8551
8552 let frame4 =
8553 state.queue_observed_address_frame(0, SocketAddr::from(([192, 168, 1, 1], 8080)));
8554 assert!(frame4.is_some()); }
8556
8557 #[test]
8558 fn test_queue_observed_address_frame() {
8559 let now = Instant::now();
8561 let config = AddressDiscoveryConfig::SendAndReceive;
8562 let mut state = AddressDiscoveryState::new(&config, now);
8563
8564 let addr = SocketAddr::from(([192, 168, 1, 100], 5000));
8565
8566 let frame = state.queue_observed_address_frame(0, addr);
8568 assert!(frame.is_some());
8569 assert_eq!(frame.unwrap().address, addr);
8570
8571 state.record_observation_sent(0);
8573
8574 for i in 0..9 {
8576 if let Some(info) = state.sent_observations.get_mut(&0) {
8578 info.notified = false;
8579 }
8580
8581 let frame = state.queue_observed_address_frame(0, addr);
8582 assert!(frame.is_some(), "Frame {} should be allowed", i + 2);
8583 state.record_observation_sent(0);
8584 }
8585
8586 if let Some(info) = state.sent_observations.get_mut(&0) {
8588 info.notified = false;
8589 }
8590
8591 let frame = state.queue_observed_address_frame(0, addr);
8593 assert!(frame.is_none(), "11th frame should be rate limited");
8594 }
8595
8596 #[test]
8597 fn test_multi_path_basic() {
8598 let now = Instant::now();
8600 let config = AddressDiscoveryConfig::SendAndReceive;
8601 let mut state = AddressDiscoveryState::new(&config, now);
8602
8603 let addr1 = SocketAddr::from(([192, 168, 1, 1], 5000));
8604 let addr2 = SocketAddr::from(([10, 0, 0, 1], 6000));
8605 let addr3 = SocketAddr::from(([172, 16, 0, 1], 7000));
8606
8607 state.handle_observed_address(addr1, 0, now);
8609 state.handle_observed_address(addr2, 1, now);
8610 state.handle_observed_address(addr3, 2, now);
8611
8612 assert_eq!(state.get_observed_address(0), Some(addr1));
8614 assert_eq!(state.get_observed_address(1), Some(addr2));
8615 assert_eq!(state.get_observed_address(2), Some(addr3));
8616
8617 assert!(state.has_unnotified_changes());
8619
8620 assert_eq!(state.received_history.len(), 3);
8622 }
8623
8624 #[test]
8625 fn test_multi_path_observe_primary_only() {
8626 let now = Instant::now();
8628 let config = AddressDiscoveryConfig::SendAndReceive;
8629 let mut state = AddressDiscoveryState::new(&config, now);
8630
8631 assert!(state.should_send_observation(0, now));
8633 state.record_observation_sent(0);
8634
8635 assert!(!state.should_send_observation(1, now));
8637 assert!(!state.should_send_observation(2, now));
8638
8639 let addr = SocketAddr::from(([192, 168, 1, 1], 5000));
8641 assert!(state.queue_observed_address_frame(0, addr).is_some());
8642 assert!(state.queue_observed_address_frame(1, addr).is_none());
8643 assert!(state.queue_observed_address_frame(2, addr).is_none());
8644 }
8645
8646 #[test]
8647 fn test_multi_path_rate_limiting() {
8648 let now = Instant::now();
8650 let config = AddressDiscoveryConfig::SendAndReceive;
8651 let mut state = AddressDiscoveryState::new(&config, now);
8652
8653 state.observe_all_paths = true;
8655
8656 for i in 0..21 {
8658 state
8659 .sent_observations
8660 .insert(i, paths::PathAddressInfo::new());
8661 state
8662 .sent_observations
8663 .get_mut(&i)
8664 .unwrap()
8665 .observed_address = Some(SocketAddr::new(
8666 IpAddr::V4(Ipv4Addr::new(192, 168, 1, (i + 1) as u8)),
8667 8080,
8668 ));
8669 }
8670
8671 for i in 0..10 {
8673 assert!(state.should_send_observation(i, now));
8674 state.record_observation_sent(i);
8675 }
8676
8677 assert!(!state.should_send_observation(10, now));
8679
8680 state.sent_observations.get_mut(&0).unwrap().notified = false;
8682 assert!(!state.should_send_observation(0, now)); let later = now + Duration::from_secs(1);
8686 for i in 10..20 {
8687 assert!(state.should_send_observation(i, later));
8688 state.record_observation_sent(i);
8689 }
8690 assert!(!state.should_send_observation(20, later));
8692 }
8693
8694 #[test]
8695 fn test_multi_path_address_changes() {
8696 let now = Instant::now();
8698 let config = AddressDiscoveryConfig::SendAndReceive;
8699 let mut state = AddressDiscoveryState::new(&config, now);
8700
8701 let addr1a = SocketAddr::from(([192, 168, 1, 1], 5000));
8702 let addr1b = SocketAddr::from(([192, 168, 1, 2], 5000));
8703 let addr2a = SocketAddr::from(([10, 0, 0, 1], 6000));
8704 let addr2b = SocketAddr::from(([10, 0, 0, 2], 6000));
8705
8706 state.handle_observed_address(addr1a, 0, now);
8708 state.handle_observed_address(addr2a, 1, now);
8709
8710 if let Some(info) = state.received_observations.get_mut(&0) {
8712 info.notified = true;
8713 }
8714 if let Some(info) = state.received_observations.get_mut(&1) {
8715 info.notified = true;
8716 }
8717 assert!(!state.has_unnotified_changes());
8718
8719 state.handle_observed_address(addr1b, 0, now + Duration::from_secs(1));
8721 assert!(state.has_unnotified_changes());
8722
8723 assert_eq!(state.get_observed_address(0), Some(addr1b));
8725 assert_eq!(state.get_observed_address(1), Some(addr2a));
8726
8727 if let Some(info) = state.received_observations.get_mut(&0) {
8729 info.notified = true;
8730 }
8731 assert!(!state.has_unnotified_changes());
8732
8733 state.handle_observed_address(addr2b, 1, now + Duration::from_secs(2));
8735 assert!(state.has_unnotified_changes());
8736 }
8737
8738 #[test]
8739 fn test_multi_path_migration() {
8740 let now = Instant::now();
8742 let config = AddressDiscoveryConfig::SendAndReceive;
8743 let mut state = AddressDiscoveryState::new(&config, now);
8744
8745 let addr_old = SocketAddr::from(([192, 168, 1, 1], 5000));
8746 let addr_new = SocketAddr::from(([10, 0, 0, 1], 6000));
8747
8748 state.handle_observed_address(addr_old, 0, now);
8750 assert_eq!(state.get_observed_address(0), Some(addr_old));
8751
8752 state.handle_observed_address(addr_new, 1, now + Duration::from_secs(1));
8754
8755 assert_eq!(state.get_observed_address(0), Some(addr_old));
8757 assert_eq!(state.get_observed_address(1), Some(addr_new));
8758
8759 assert_eq!(state.received_observations.len(), 2);
8762 }
8763
8764 #[test]
8765 fn test_check_for_address_observations_multi_path() {
8766 let now = Instant::now();
8768 let config = AddressDiscoveryConfig::SendAndReceive;
8769 let mut state = AddressDiscoveryState::new(&config, now);
8770
8771 state.observe_all_paths = true;
8773
8774 let addr1 = SocketAddr::from(([192, 168, 1, 1], 5000));
8776 let addr2 = SocketAddr::from(([10, 0, 0, 1], 6000));
8777 let addr3 = SocketAddr::from(([172, 16, 0, 1], 7000));
8778
8779 state
8781 .sent_observations
8782 .insert(0, paths::PathAddressInfo::new());
8783 state
8784 .sent_observations
8785 .get_mut(&0)
8786 .unwrap()
8787 .observed_address = Some(addr1);
8788 state
8789 .sent_observations
8790 .insert(1, paths::PathAddressInfo::new());
8791 state
8792 .sent_observations
8793 .get_mut(&1)
8794 .unwrap()
8795 .observed_address = Some(addr2);
8796 state
8797 .sent_observations
8798 .insert(2, paths::PathAddressInfo::new());
8799 state
8800 .sent_observations
8801 .get_mut(&2)
8802 .unwrap()
8803 .observed_address = Some(addr3);
8804
8805 let frames = state.check_for_address_observations(0, true, now);
8807
8808 assert_eq!(frames.len(), 3);
8810
8811 let frame_addrs: Vec<_> = frames.iter().map(|f| f.address).collect();
8813 assert!(frame_addrs.contains(&addr1), "addr1 should be in frames");
8814 assert!(frame_addrs.contains(&addr2), "addr2 should be in frames");
8815 assert!(frame_addrs.contains(&addr3), "addr3 should be in frames");
8816
8817 assert!(!state.has_unnotified_changes());
8819 }
8820
8821 #[test]
8822 fn test_multi_path_with_peer_not_supporting() {
8823 let now = Instant::now();
8825 let config = AddressDiscoveryConfig::SendAndReceive;
8826 let mut state = AddressDiscoveryState::new(&config, now);
8827
8828 state.handle_observed_address(SocketAddr::from(([192, 168, 1, 1], 5000)), 0, now);
8830 state.handle_observed_address(SocketAddr::from(([10, 0, 0, 1], 6000)), 1, now);
8831
8832 let frames = state.check_for_address_observations(0, false, now);
8834 assert_eq!(frames.len(), 0);
8835
8836 assert!(state.has_unnotified_changes());
8838 }
8839
8840 #[test]
8842 fn test_bootstrap_node_aggressive_observation_mode() {
8843 let config = AddressDiscoveryConfig::SendAndReceive;
8845 let now = Instant::now();
8846 let mut state = AddressDiscoveryState::new(&config, now);
8847
8848 assert!(!state.is_bootstrap_mode());
8850
8851 state.set_bootstrap_mode(true);
8853 assert!(state.is_bootstrap_mode());
8854
8855 assert!(state.should_observe_path(0)); assert!(state.should_observe_path(1)); assert!(state.should_observe_path(2));
8859
8860 let bootstrap_rate = state.get_effective_rate_limit();
8862 assert!(bootstrap_rate > 10.0); }
8864
8865 #[test]
8866 fn test_bootstrap_node_immediate_observation() {
8867 let config = AddressDiscoveryConfig::SendAndReceive;
8869 let now = Instant::now();
8870 let mut state = AddressDiscoveryState::new(&config, now);
8871 state.set_bootstrap_mode(true);
8872
8873 let addr = SocketAddr::from(([192, 168, 1, 100], 5000));
8875 state.handle_observed_address(addr, 0, now);
8876
8877 assert!(state.should_send_observation_immediately(true));
8879
8880 assert!(state.should_send_observation(0, now));
8882
8883 let frame = state.queue_observed_address_frame(0, addr);
8885 assert!(frame.is_some());
8886 }
8887
8888 #[test]
8889 fn test_bootstrap_node_multiple_path_observations() {
8890 let config = AddressDiscoveryConfig::SendAndReceive;
8892 let now = Instant::now();
8893 let mut state = AddressDiscoveryState::new(&config, now);
8894 state.set_bootstrap_mode(true);
8895
8896 let addrs = vec![
8898 (0u64, SocketAddr::from(([192, 168, 1, 1], 5000))),
8899 (1u64, SocketAddr::from(([10, 0, 0, 1], 6000))),
8900 (2u64, SocketAddr::from(([172, 16, 0, 1], 7000))),
8901 ];
8902
8903 for (path_id, addr) in &addrs {
8904 state
8905 .sent_observations
8906 .insert(*path_id, paths::PathAddressInfo::new());
8907 state
8908 .sent_observations
8909 .get_mut(path_id)
8910 .unwrap()
8911 .observed_address = Some(*addr);
8912 }
8913
8914 let frames = state.check_for_address_observations(0, true, now);
8916 assert_eq!(frames.len(), 3);
8917
8918 for (_, addr) in &addrs {
8920 assert!(frames.iter().any(|f| f.address == *addr));
8921 }
8922 }
8923
8924 #[test]
8925 fn test_bootstrap_node_rate_limit_override() {
8926 let config = AddressDiscoveryConfig::SendAndReceive;
8928 let now = Instant::now();
8929 let mut state = AddressDiscoveryState::new(&config, now);
8930 state.set_bootstrap_mode(true);
8931
8932 let addr = SocketAddr::from(([192, 168, 1, 1], 5000));
8934
8935 for i in 0..10 {
8937 state.handle_observed_address(addr, i, now);
8938 let can_send = state.should_send_observation(i, now);
8939 assert!(can_send, "Bootstrap node should send observation {i}");
8940 state.record_observation_sent(i);
8941 }
8942 }
8943
8944 #[test]
8945 fn test_bootstrap_node_configuration() {
8946 let config = AddressDiscoveryConfig::SendAndReceive;
8948 let mut state = AddressDiscoveryState::new(&config, Instant::now());
8949
8950 state.set_bootstrap_mode(true);
8952
8953 assert!(state.bootstrap_mode);
8955 assert!(state.enabled);
8956
8957 let effective_rate = state.get_effective_rate_limit();
8959 assert!(effective_rate > state.max_observation_rate as f64);
8960 }
8961
8962 #[test]
8963 fn test_bootstrap_node_persistent_observation() {
8964 let config = AddressDiscoveryConfig::SendAndReceive;
8966 let mut now = Instant::now();
8967 let mut state = AddressDiscoveryState::new(&config, now);
8968 state.set_bootstrap_mode(true);
8969
8970 let addr1 = SocketAddr::from(([192, 168, 1, 1], 5000));
8971 let addr2 = SocketAddr::from(([192, 168, 1, 2], 5000));
8972
8973 state.handle_observed_address(addr1, 0, now);
8975 assert!(state.should_send_observation(0, now));
8976 state.record_observation_sent(0);
8977
8978 now += Duration::from_secs(60);
8980 state.handle_observed_address(addr2, 0, now);
8981
8982 assert!(state.should_send_observation(0, now));
8984 }
8985
8986 #[test]
8987 fn test_bootstrap_node_multi_peer_support() {
8988 let config = AddressDiscoveryConfig::SendAndReceive;
8991 let now = Instant::now();
8992 let mut state = AddressDiscoveryState::new(&config, now);
8993 state.set_bootstrap_mode(true);
8994
8995 let peer_addresses: Vec<(u64, SocketAddr)> = vec![
8997 (0, SocketAddr::from(([192, 168, 1, 1], 5000))), (1, SocketAddr::from(([10, 0, 0, 1], 6000))), (2, SocketAddr::from(([172, 16, 0, 1], 7000))), (3, SocketAddr::from(([192, 168, 2, 1], 8000))), ];
9002
9003 for (path_id, addr) in &peer_addresses {
9005 state
9006 .sent_observations
9007 .insert(*path_id, paths::PathAddressInfo::new());
9008 state
9009 .sent_observations
9010 .get_mut(path_id)
9011 .unwrap()
9012 .observed_address = Some(*addr);
9013 }
9014
9015 let frames = state.check_for_address_observations(0, true, now);
9017 assert_eq!(frames.len(), peer_addresses.len());
9018
9019 for (_, addr) in &peer_addresses {
9021 assert!(frames.iter().any(|f| f.address == *addr));
9022 }
9023 }
9024
9025 mod address_discovery_tests {
9027 include!("address_discovery_tests.rs");
9028 }
9029}