1#![allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
9use std::{
10 cmp,
11 collections::VecDeque,
12 convert::TryFrom,
13 fmt, io, mem,
14 net::{IpAddr, SocketAddr},
15 sync::Arc,
16};
17
18use bytes::{Bytes, BytesMut};
19use frame::StreamMetaVec;
20use rand::{Rng, SeedableRng, rngs::StdRng};
23use thiserror::Error;
24use tracing::{debug, error, info, trace, trace_span, warn};
25
26use crate::{
27 Dir, Duration, EndpointConfig, Frame, INITIAL_MTU, Instant, MAX_CID_SIZE, MAX_STREAM_COUNT,
28 MIN_INITIAL_SIZE, MtuDiscoveryConfig, Side, StreamId, TIMER_GRANULARITY, TokenStore, Transmit,
29 TransportError, TransportErrorCode, VarInt,
30 cid_generator::ConnectionIdGenerator,
31 cid_queue::CidQueue,
32 coding::BufMutExt,
33 config::{ServerConfig, TransportConfig},
34 crypto::{self, KeyPair, Keys, PacketKey},
35 endpoint::AddressDiscoveryStats,
36 frame::{self, Close, Datagram, FrameStruct, NewToken},
37 nat_traversal_api::PeerId,
38 packet::{
39 FixedLengthConnectionIdParser, Header, InitialHeader, InitialPacket, LongType, Packet,
40 PacketNumber, PartialDecode, SpaceId,
41 },
42 range_set::ArrayRangeSet,
43 shared::{
44 ConnectionEvent, ConnectionEventInner, ConnectionId, DatagramConnectionEvent, EcnCodepoint,
45 EndpointEvent, EndpointEventInner,
46 },
47 token::{ResetToken, Token, TokenPayload},
48 transport_parameters::TransportParameters,
49};
50
51mod ack_frequency;
52use ack_frequency::AckFrequencyState;
53
54pub(crate) mod nat_traversal;
55use nat_traversal::NatTraversalState;
56pub(crate) use nat_traversal::{CoordinationPhase, NatTraversalError, NatTraversalRole};
57
58mod assembler;
59pub use assembler::Chunk;
60
61mod cid_state;
62use cid_state::CidState;
63
64mod datagrams;
65use datagrams::DatagramState;
66pub use datagrams::{Datagrams, SendDatagramError};
67
68mod mtud;
69use mtud::MtuDiscovery;
70
71mod pacing;
72
73mod packet_builder;
74use packet_builder::PacketBuilder;
75
76mod packet_crypto;
77use packet_crypto::{PrevCrypto, ZeroRttCrypto};
78
79mod paths;
80pub use paths::RttEstimator;
81use paths::{NatTraversalChallenges, PathData, PathResponses};
82
83mod send_buffer;
84
85mod spaces;
86#[cfg(fuzzing)]
87pub use spaces::Retransmits;
88#[cfg(not(fuzzing))]
89use spaces::Retransmits;
90use spaces::{PacketNumberFilter, PacketSpace, SendableFrames, SentPacket, ThinRetransmits};
91
92mod stats;
93pub use stats::{ConnectionStats, FrameStats, PathStats, UdpStats};
94
95mod streams;
96#[cfg(fuzzing)]
97pub use streams::StreamsState;
98#[cfg(not(fuzzing))]
99use streams::StreamsState;
100pub use streams::{
101 Chunks, ClosedStream, FinishError, ReadError, ReadableError, RecvStream, SendStream,
102 ShouldTransmit, StreamEvent, Streams, WriteError, Written,
103};
104
105mod timer;
106use crate::congestion::Controller;
107use timer::{Timer, TimerTable};
108
/// Protocol state for a single connection.
///
/// The methods visible on this type drive it without performing I/O themselves:
/// `handle_event` feeds it input, `poll_transmit` writes outgoing datagrams into a
/// caller-supplied buffer, `poll_timeout` reports the next timer deadline, and
/// `poll` / `poll_endpoint_events` drain queued events.
pub struct Connection {
    /// Endpoint-wide configuration, shared through an `Arc`.
    endpoint_config: Arc<EndpointConfig>,
    /// Transport configuration for this connection.
    config: Arc<TransportConfig>,
    /// Random number generator, seeded from the `rng_seed` passed to `new`.
    rng: StdRng,
    /// Cryptographic session; supplies the initial keys in `new`.
    crypto: Box<dyn crypto::Session>,
    /// Local connection ID the connection was created with (`loc_cid` in `new`).
    handshake_cid: ConnectionId,
    /// Remote connection ID the connection was created with (`rem_cid` in `new`).
    rem_handshake_cid: ConnectionId,
    /// Local IP address, reported as `src_ip` on every outgoing `Transmit`.
    local_ip: Option<IpAddr>,
    /// State of the current network path (remote address, MTU, RTT, congestion
    /// control, pacing and in-flight accounting are all read from here).
    path: PathData,
    /// Whether MTU discovery is allowed; also forwarded to `PathData::new`.
    allow_mtud: bool,
    /// A previous path together with the connection ID used on it;
    /// `send_path_challenge` sends PATH_CHALLENGE frames to it while a challenge is pending.
    prev_path: Option<(ConnectionId, PathData)>,
    /// Connection state machine (handshake, closed, draining, drained, ...).
    state: State,
    /// Side-specific state; `side.side()` yields whether this is the client or server.
    side: ConnectionSide,
    /// NOTE(review): presumably whether 0-RTT was enabled for this connection — confirm.
    zero_rtt_enabled: bool,
    /// 0-RTT keys, used for the data space while its regular keys are not yet available.
    zero_rtt_crypto: Option<ZeroRttCrypto>,
    /// NOTE(review): presumably the current key phase bit — confirm.
    key_phase: bool,
    /// Initialised to a random value in `10..1000`.
    /// NOTE(review): presumably the number of packets sent before a key update — confirm.
    key_phase_size: u64,
    /// Transport parameters of the peer; starts out as the default set.
    peer_params: TransportParameters,
    /// Remote connection ID originally supplied to `new`.
    orig_rem_cid: ConnectionId,
    /// Connection ID the initial keys were derived from (`init_cid` in `new`).
    initial_dst_cid: ConnectionId,
    /// NOTE(review): presumably the source CID of a received Retry packet — confirm.
    retry_src_cid: Option<ConnectionId>,
    /// Counter of lost packets; starts at zero.
    lost_packets: u64,
    /// Application-facing events, drained front-first by `poll`.
    events: VecDeque<Event>,
    /// Events destined for the endpoint, drained by `poll_endpoint_events`.
    endpoint_events: VecDeque<EndpointEventInner>,
    /// Set at construction to `config.allow_spin` combined with a 7-in-8 random draw.
    spin_enabled: bool,
    /// NOTE(review): presumably the current spin bit value — confirm.
    spin: bool,
    /// Packet number spaces, indexed by `SpaceId`; only the first one has keys at construction.
    spaces: [PacketSpace; 3],
    /// Highest packet number space reached so far; starts at `SpaceId::Initial`.
    highest_space: SpaceId,
    /// NOTE(review): presumably the keys of the previous key phase — confirm.
    prev_crypto: Option<PrevCrypto>,
    /// NOTE(review): presumably pre-computed keys for the next key phase — confirm.
    next_crypto: Option<KeyPair<Box<dyn PacketKey>>>,
    /// NOTE(review): presumably whether 0-RTT data was accepted — confirm.
    accepted_0rtt: bool,
    /// NOTE(review): presumably whether activity may currently reset the idle timer — confirm.
    permit_idle_reset: bool,
    /// Idle timeout derived from `config.max_idle_timeout` (milliseconds);
    /// `None` when the setting is absent or zero.
    idle_timeout: Option<Duration>,
    /// Table of pending timers; `poll_timeout` reports its earliest deadline.
    timers: TimerTable,
    /// Counter of authentication failures; starts at zero.
    authentication_failures: u64,
    /// Pending fatal error; `poll` takes it and reports `Event::ConnectionLost`.
    error: Option<ConnectionError>,
    /// Filter consulted (via `peek`) for the next packet number to use.
    packet_number_filter: PacketNumberFilter,

    /// Queued PATH_RESPONSE data; `poll_transmit` pops off-path responses from it.
    path_responses: PathResponses,
    /// NAT-traversal-specific challenge bookkeeping.
    nat_traversal_challenges: NatTraversalChallenges,
    /// Whether a CONNECTION_CLOSE still has to be sent; cleared by `poll_transmit`
    /// once the close has been written in the highest space.
    close: bool,

    /// State of the ACK frequency extension.
    ack_frequency: AckFrequencyState,

    /// NOTE(review): presumably the number of consecutive probe timeouts — confirm.
    pto_count: u32,

    /// NOTE(review): presumably whether incoming packets carry ECN marks — confirm.
    receiving_ecn: bool,
    /// Counter of authenticated packets; starts at zero.
    total_authed_packets: u64,
    /// Set by `poll_transmit` to `true` when it produced no data and was not
    /// blocked by congestion control or pacing.
    app_limited: bool,

    /// Stream state shared with the `Streams`, `RecvStream` and `SendStream` handles.
    streams: StreamsState,
    /// Remote connection IDs; `active()` supplies the destination CID for new packets.
    rem_cids: CidQueue,
    /// State of locally issued connection IDs.
    local_cid_state: CidState,
    /// Datagram extension state (including the outgoing queue).
    datagrams: DatagramState,
    /// Connection statistics (frame, UDP and path counters).
    stats: ConnectionStats,
    /// QUIC version supplied to `new`.
    version: u32,

    /// NAT traversal state; `None` until NAT traversal is set up.
    nat_traversal: Option<NatTraversalState>,

    /// Configuration for NAT traversal frames; defaults at construction.
    nat_traversal_frame_config: frame::nat_traversal_unified::NatTraversalFrameConfig,

    /// Address discovery state; created with the default configuration in `new`.
    address_discovery_state: Option<AddressDiscoveryState>,

    /// Post-quantum crypto state; supplies the minimum initial packet size used for padding.
    pqc_state: PqcState,

    /// Trace context carrying this connection's trace ID.
    #[cfg(feature = "trace")]
    trace_context: crate::tracing::TraceContext,

    /// Event log that trace events are written to.
    #[cfg(feature = "trace")]
    event_log: Arc<crate::tracing::EventLog>,

    /// Writer installed by `set_qlog`.
    #[cfg(feature = "__qlog")]
    qlog_streamer: Option<Box<dyn std::io::Write + Send + Sync>>,

    /// NOTE(review): presumably the peer identity bound into issued tokens — confirm.
    peer_id_for_tokens: Option<PeerId>,
    /// NOTE(review): presumably defers NEW_TOKEN frames until a peer binding exists — confirm.
    delay_new_token_until_binding: bool,
}
287
288impl Connection {
289 pub(crate) fn new(
290 endpoint_config: Arc<EndpointConfig>,
291 config: Arc<TransportConfig>,
292 init_cid: ConnectionId,
293 loc_cid: ConnectionId,
294 rem_cid: ConnectionId,
295 remote: SocketAddr,
296 local_ip: Option<IpAddr>,
297 crypto: Box<dyn crypto::Session>,
298 cid_gen: &dyn ConnectionIdGenerator,
299 now: Instant,
300 version: u32,
301 allow_mtud: bool,
302 rng_seed: [u8; 32],
303 side_args: SideArgs,
304 ) -> Self {
305 let pref_addr_cid = side_args.pref_addr_cid();
306 let path_validated = side_args.path_validated();
307 let connection_side = ConnectionSide::from(side_args);
308 let side = connection_side.side();
309 let initial_space = PacketSpace {
310 crypto: Some(crypto.initial_keys(&init_cid, side)),
311 ..PacketSpace::new(now)
312 };
313 let state = State::Handshake(state::Handshake {
314 rem_cid_set: side.is_server(),
315 expected_token: Bytes::new(),
316 client_hello: None,
317 });
318 let mut rng = StdRng::from_seed(rng_seed);
319 let mut this = Self {
320 endpoint_config,
321 crypto,
322 handshake_cid: loc_cid,
323 rem_handshake_cid: rem_cid,
324 local_cid_state: CidState::new(
325 cid_gen.cid_len(),
326 cid_gen.cid_lifetime(),
327 now,
328 if pref_addr_cid.is_some() { 2 } else { 1 },
329 ),
330 path: PathData::new(remote, allow_mtud, None, now, &config),
331 allow_mtud,
332 local_ip,
333 prev_path: None,
334 state,
335 side: connection_side,
336 zero_rtt_enabled: false,
337 zero_rtt_crypto: None,
338 key_phase: false,
339 key_phase_size: rng.gen_range(10..1000),
346 peer_params: TransportParameters::default(),
347 orig_rem_cid: rem_cid,
348 initial_dst_cid: init_cid,
349 retry_src_cid: None,
350 lost_packets: 0,
351 events: VecDeque::new(),
352 endpoint_events: VecDeque::new(),
353 spin_enabled: config.allow_spin && rng.gen_ratio(7, 8),
354 spin: false,
355 spaces: [initial_space, PacketSpace::new(now), PacketSpace::new(now)],
356 highest_space: SpaceId::Initial,
357 prev_crypto: None,
358 next_crypto: None,
359 accepted_0rtt: false,
360 permit_idle_reset: true,
361 idle_timeout: match config.max_idle_timeout {
362 None | Some(VarInt(0)) => None,
363 Some(dur) => Some(Duration::from_millis(dur.0)),
364 },
365 timers: TimerTable::default(),
366 authentication_failures: 0,
367 error: None,
368 #[cfg(test)]
369 packet_number_filter: match config.deterministic_packet_numbers {
370 false => PacketNumberFilter::new(&mut rng),
371 true => PacketNumberFilter::disabled(),
372 },
373 #[cfg(not(test))]
374 packet_number_filter: PacketNumberFilter::new(&mut rng),
375
376 path_responses: PathResponses::default(),
377 nat_traversal_challenges: NatTraversalChallenges::default(),
378 close: false,
379
380 ack_frequency: AckFrequencyState::new(get_max_ack_delay(
381 &TransportParameters::default(),
382 )),
383
384 pto_count: 0,
385
386 app_limited: false,
387 receiving_ecn: false,
388 total_authed_packets: 0,
389
390 streams: StreamsState::new(
391 side,
392 config.max_concurrent_uni_streams,
393 config.max_concurrent_bidi_streams,
394 config.send_window,
395 config.receive_window,
396 config.stream_receive_window,
397 ),
398 datagrams: DatagramState::default(),
399 config,
400 rem_cids: CidQueue::new(rem_cid),
401 rng,
402 stats: ConnectionStats::default(),
403 version,
404 nat_traversal: None, nat_traversal_frame_config:
406 frame::nat_traversal_unified::NatTraversalFrameConfig::default(),
407 address_discovery_state: {
408 Some(AddressDiscoveryState::new(
411 &crate::transport_parameters::AddressDiscoveryConfig::default(),
412 now,
413 ))
414 },
415 pqc_state: PqcState::new(),
416
417 #[cfg(feature = "trace")]
418 trace_context: crate::tracing::TraceContext::new(crate::tracing::TraceId::new()),
419
420 #[cfg(feature = "trace")]
421 event_log: crate::tracing::global_log(),
422
423 #[cfg(feature = "__qlog")]
424 qlog_streamer: None,
425
426 peer_id_for_tokens: None,
427 delay_new_token_until_binding: false,
428 };
429
430 #[cfg(feature = "trace")]
432 {
433 use crate::trace_event;
434 use crate::tracing::{Event, EventData, socket_addr_to_bytes, timestamp_now};
435 let _peer_id = {
437 let mut id = [0u8; 32];
438 let addr_bytes = match remote {
439 SocketAddr::V4(addr) => addr.ip().octets().to_vec(),
440 SocketAddr::V6(addr) => addr.ip().octets().to_vec(),
441 };
442 id[..addr_bytes.len().min(32)]
443 .copy_from_slice(&addr_bytes[..addr_bytes.len().min(32)]);
444 id
445 };
446
447 let (addr_bytes, addr_type) = socket_addr_to_bytes(remote);
448 trace_event!(
449 &this.event_log,
450 Event {
451 timestamp: timestamp_now(),
452 trace_id: this.trace_context.trace_id(),
453 sequence: 0,
454 _padding: 0,
455 node_id: [0u8; 32], event_data: EventData::ConnInit {
457 endpoint_bytes: addr_bytes,
458 addr_type,
459 _padding: [0u8; 45],
460 },
461 }
462 );
463 }
464
465 if path_validated {
466 this.on_path_validated();
467 }
468 if side.is_client() {
469 this.write_crypto();
471 this.init_0rtt();
472 }
473 this
474 }
475
476 #[cfg(feature = "__qlog")]
478 pub fn set_qlog(
479 &mut self,
480 writer: Box<dyn std::io::Write + Send + Sync>,
481 _title: Option<String>,
482 _description: Option<String>,
483 _now: Instant,
484 ) {
485 self.qlog_streamer = Some(writer);
486 }
487
    /// Hook for emitting recovery metrics to qlog.
    ///
    /// The body is currently empty, so calling it has no effect; `poll_transmit`
    /// and `handle_event` invoke it when the `__qlog` feature is enabled.
    #[cfg(feature = "__qlog")]
    fn emit_qlog_recovery_metrics(&mut self, _now: Instant) {
    }
494
495 #[must_use]
503 pub fn poll_timeout(&mut self) -> Option<Instant> {
504 let mut next_timeout = self.timers.next_timeout();
505
506 if let Some(nat_state) = &self.nat_traversal {
508 if let Some(nat_timeout) = nat_state.get_next_timeout(Instant::now()) {
509 self.timers.set(Timer::NatTraversal, nat_timeout);
511 next_timeout = Some(next_timeout.map_or(nat_timeout, |t| t.min(nat_timeout)));
512 }
513 }
514
515 next_timeout
516 }
517
518 #[must_use]
524 pub fn poll(&mut self) -> Option<Event> {
525 if let Some(x) = self.events.pop_front() {
526 return Some(x);
527 }
528
529 if let Some(event) = self.streams.poll() {
530 return Some(Event::Stream(event));
531 }
532
533 if let Some(err) = self.error.take() {
534 return Some(Event::ConnectionLost { reason: err });
535 }
536
537 None
538 }
539
540 #[must_use]
542 pub fn poll_endpoint_events(&mut self) -> Option<EndpointEvent> {
543 self.endpoint_events.pop_front().map(EndpointEvent)
544 }
545
546 #[must_use]
548 pub fn streams(&mut self) -> Streams<'_> {
549 Streams {
550 state: &mut self.streams,
551 conn_state: &self.state,
552 }
553 }
554
555 #[must_use]
559 pub fn recv_stream(&mut self, id: StreamId) -> RecvStream<'_> {
560 assert!(id.dir() == Dir::Bi || id.initiator() != self.side.side());
561 RecvStream {
562 id,
563 state: &mut self.streams,
564 pending: &mut self.spaces[SpaceId::Data].pending,
565 }
566 }
567
568 #[must_use]
570 pub fn send_stream(&mut self, id: StreamId) -> SendStream<'_> {
571 assert!(id.dir() == Dir::Bi || id.initiator() == self.side.side());
572 SendStream {
573 id,
574 state: &mut self.streams,
575 pending: &mut self.spaces[SpaceId::Data].pending,
576 conn_state: &self.state,
577 }
578 }
579
    /// Writes the next batch of outgoing data into `buf` and describes it.
    ///
    /// * `now` - current time, used for timers, pacing and packet tracking.
    /// * `max_datagrams` - upper bound on the number of datagrams written in one
    ///   call; forced to 1 when `config.enable_segmentation_offload` is off.
    /// * `buf` - output buffer the packets are appended to.
    ///
    /// Returns `None` when nothing was written, otherwise a `Transmit` with the
    /// destination, total size, ECN codepoint, segment size (only when more than
    /// one datagram was written) and source IP.
    ///
    /// NOTE(review): the offset arithmetic (`datagram_start` starting at 0, sizes
    /// taken from `buf.len()`) appears to assume `buf` is empty on entry — confirm
    /// against callers.
    ///
    /// # Panics
    ///
    /// Panics if `max_datagrams` is zero.
    #[must_use]
    pub fn poll_transmit(
        &mut self,
        now: Instant,
        max_datagrams: usize,
        buf: &mut Vec<u8>,
    ) -> Option<Transmit> {
        assert!(max_datagrams != 0);
        // Without segmentation offload only a single datagram may be produced per call.
        let max_datagrams = match self.config.enable_segmentation_offload {
            false => 1,
            true => max_datagrams,
        };

        let mut num_datagrams = 0;
        // Offset in `buf` at which the datagram currently being built starts.
        let mut datagram_start = 0;
        // Starts at the path MTU; shrinks to the first datagram's actual size once
        // that datagram is finished (see below).
        let mut segment_size = usize::from(self.path.current_mtu());

        // The timeout check's result is only logged here.
        if let Some(nat_traversal) = &mut self.nat_traversal {
            if nat_traversal.check_coordination_timeout(now) {
                trace!("NAT traversal coordination timed out, may retry");
            }
        }

        // NAT traversal challenges and previous-path challenges are sent as
        // stand-alone datagrams and take priority over regular traffic.
        if let Some(challenge) = self.send_nat_traversal_challenge(now, buf) {
            return Some(challenge);
        }

        if let Some(challenge) = self.send_path_challenge(now, buf) {
            return Some(challenge);
        }

        // Give every space the chance to queue a probe; an immediate ACK is only
        // requested in the data space and only if the peer supports ACK frequency.
        for space in SpaceId::iter() {
            let request_immediate_ack =
                space == SpaceId::Data && self.peer_supports_ack_frequency();
            self.spaces[space].maybe_queue_probe(request_immediate_ack, &self.streams);
        }

        // `close` is true when this call must emit a CONNECTION_CLOSE.
        let close = match self.state {
            State::Drained => {
                self.app_limited = true;
                return None;
            }
            State::Draining | State::Closed(_) => {
                // Nothing to send unless a close is still pending.
                if !self.close {
                    self.app_limited = true;
                    return None;
                }
                true
            }
            _ => false,
        };

        // Decide whether an ACK_FREQUENCY frame should be queued in the data space.
        if let Some(config) = &self.config.ack_frequency_config {
            self.spaces[SpaceId::Data].pending.ack_frequency = self
                .ack_frequency
                .should_send_ack_frequency(self.path.rtt.get(), config, &self.peer_params)
                && self.highest_space == SpaceId::Data
                && self.peer_supports_ack_frequency();
        }

        // Number of bytes `buf` may hold given the datagrams started so far.
        let mut buf_capacity = 0;

        // Whether another packet may be appended to the current datagram.
        let mut coalesce = true;
        // Builder of the packet currently under construction, if any.
        let mut builder_storage: Option<PacketBuilder> = None;
        // Frames written into the packet held by `builder_storage`.
        let mut sent_frames = None;
        // Whether the current datagram must be padded to the minimum initial size.
        let mut pad_datagram = false;
        // Whether datagrams must be padded up to the segment size.
        let mut pad_datagram_to_mtu = false;
        // Whether sending stopped because of congestion control or pacing.
        let mut congestion_blocked = false;

        // Iterate the spaces by index so that a space can be revisited (no increment)
        // to write several packets in the same space.
        let mut space_idx = 0;
        let spaces = [SpaceId::Initial, SpaceId::Handshake, SpaceId::Data];
        while space_idx < spaces.len() {
            let space_id = spaces[space_idx];
            // Frame space available in a 1-RTT packet, predicted from the next
            // data-space packet number.
            let pn = self.packet_number_filter.peek(&self.spaces[SpaceId::Data]);
            let frame_space_1rtt =
                segment_size.saturating_sub(self.predict_1rtt_overhead(Some(pn)));

            let can_send = self.space_can_send(space_id, frame_space_1rtt);
            // Skip spaces with nothing to send, unless a close must be written and
            // this space has keys to carry it.
            if can_send.is_empty() && (!close || self.spaces[space_id].crypto.is_none()) {
                space_idx += 1;
                continue;
            }

            let mut ack_eliciting = !self.spaces[space_id].pending.is_empty(&self.streams)
                || self.spaces[space_id].ping_pending
                || self.spaces[space_id].immediate_ack_pending;
            if space_id == SpaceId::Data {
                ack_eliciting |= self.can_send_1rtt(frame_space_1rtt);
            }

            pad_datagram_to_mtu |= space_id == SpaceId::Data && self.config.pad_to_mtu;

            // Where the unfinished packet (if any) will end once its minimum size
            // and authentication tag are accounted for.
            let buf_end = if let Some(builder) = &builder_storage {
                buf.len().max(builder.min_size) + builder.tag_len
            } else {
                buf.len()
            };

            // Tag length of the keys this space will use: the space's own keys, or
            // the 0-RTT keys for the data space.
            let tag_len = if let Some(ref crypto) = self.spaces[space_id].crypto {
                crypto.packet.local.tag_len()
            } else if space_id == SpaceId::Data {
                match self.zero_rtt_crypto.as_ref() {
                    Some(crypto) => crypto.packet.tag_len(),
                    None => {
                        error!(
                            "sending packets in the application data space requires known 0-RTT or 1-RTT keys"
                        );
                        return None;
                    }
                }
            } else {
                unreachable!("tried to send {:?} packet without keys", space_id)
            };
            // Start a new datagram when coalescing is disallowed or the current
            // datagram lacks room for another minimal packet.
            if !coalesce || buf_capacity - buf_end < MIN_PACKET_SPACE + tag_len {
                if num_datagrams >= max_datagrams {
                    break;
                }

                if self
                    .path
                    .anti_amplification_blocked(segment_size as u64 * (num_datagrams as u64) + 1)
                {
                    trace!("blocked by anti-amplification");
                    break;
                }

                // Congestion control and pacing apply only to ack-eliciting packets
                // that are not loss probes.
                if ack_eliciting && self.spaces[space_id].loss_probes == 0 {
                    // Bytes of the unfinished packet, which are not yet counted as in flight.
                    let untracked_bytes = if let Some(builder) = &builder_storage {
                        buf_capacity - builder.partial_encode.start
                    } else {
                        0
                    } as u64;
                    debug_assert!(untracked_bytes <= segment_size as u64);

                    let bytes_to_send = segment_size as u64 + untracked_bytes;
                    if self.path.in_flight.bytes + bytes_to_send >= self.path.congestion.window() {
                        // Move on to the next space rather than stopping outright.
                        space_idx += 1;
                        congestion_blocked = true;
                        trace!("blocked by congestion control");
                        continue;
                    }

                    let smoothed_rtt = self.path.rtt.get();
                    if let Some(delay) = self.path.pacing.delay(
                        smoothed_rtt,
                        bytes_to_send,
                        self.path.current_mtu(),
                        self.path.congestion.window(),
                        now,
                    ) {
                        // Arm the pacing timer with the returned value and stop for now.
                        self.timers.set(Timer::Pacing, delay);
                        congestion_blocked = true;
                        trace!("blocked by pacing");
                        break;
                    }
                }

                // Finish the packet of the previous datagram before starting a new one.
                if let Some(mut builder) = builder_storage.take() {
                    if pad_datagram {
                        let min_size = self.pqc_state.min_initial_size();
                        builder.pad_to(min_size);
                    }

                    if num_datagrams > 1 || pad_datagram_to_mtu {
                        // Largest amount of padding accepted to fill up a segment.
                        const MAX_PADDING: usize = 16;
                        let packet_len_unpadded = cmp::max(builder.min_size, buf.len())
                            - datagram_start
                            + builder.tag_len;
                        // Stop the batch if filling the segment would need too much
                        // padding, or if the segment does not fit the capacity.
                        if (packet_len_unpadded + MAX_PADDING < segment_size
                            && !pad_datagram_to_mtu)
                            || datagram_start + segment_size > buf_capacity
                        {
                            trace!(
                                "GSO truncated by demand for {} padding bytes or loss probe",
                                segment_size - packet_len_unpadded
                            );
                            // Hand the builder back so it is finished after the loop.
                            builder_storage = Some(builder);
                            break;
                        }

                        builder.pad_to(segment_size as u16);
                    }

                    builder.finish_and_track(now, self, sent_frames.take(), buf);

                    if num_datagrams == 1 {
                        // The first datagram's final length defines the segment size
                        // of the rest of the batch.
                        segment_size = buf.len();
                        buf_capacity = buf.len();

                        // Re-check with the new segment size whether the data space
                        // still has something that fits.
                        if space_id == SpaceId::Data {
                            let frame_space_1rtt =
                                segment_size.saturating_sub(self.predict_1rtt_overhead(Some(pn)));
                            if self.space_can_send(space_id, frame_space_1rtt).is_empty() {
                                break;
                            }
                        }
                    }
                }

                // Loss probes consume one probe credit and are capped at `INITIAL_MTU`.
                let next_datagram_size_limit = match self.spaces[space_id].loss_probes {
                    0 => segment_size,
                    _ => {
                        self.spaces[space_id].loss_probes -= 1;
                        std::cmp::min(segment_size, usize::from(INITIAL_MTU))
                    }
                };
                buf_capacity += next_datagram_size_limit;
                if buf.capacity() < buf_capacity {
                    buf.reserve(max_datagrams * segment_size);
                }
                num_datagrams += 1;
                coalesce = true;
                pad_datagram = false;
                datagram_start = buf.len();

                debug_assert_eq!(
                    datagram_start % segment_size,
                    0,
                    "datagrams in a GSO batch must be aligned to the segment size"
                );
            } else {
                // Coalescing into the current datagram: finish the previous packet first.
                if let Some(builder) = builder_storage.take() {
                    builder.finish_and_track(now, self, sent_frames.take(), buf);
                }
            }

            debug_assert!(buf_capacity - buf.len() >= MIN_PACKET_SPACE);

            // A client about to send a Handshake packet discards its Initial space.
            if self.spaces[SpaceId::Initial].crypto.is_some()
                && space_id == SpaceId::Handshake
                && self.side.is_client()
            {
                self.discard_space(now, SpaceId::Initial);
            }
            if let Some(ref mut prev) = self.prev_crypto {
                prev.update_unacked = false;
            }

            debug_assert!(
                builder_storage.is_none() && sent_frames.is_none(),
                "Previous packet must have been finished"
            );

            // Begin the next packet; `?` returns `None` if no builder can be created.
            let builder = builder_storage.insert(PacketBuilder::new(
                now,
                space_id,
                self.rem_cids.active(),
                buf,
                buf_capacity,
                datagram_start,
                ack_eliciting,
                self,
            )?);
            // Nothing is coalesced after a short-header packet.
            coalesce = coalesce && !builder.short_header;

            let should_adjust_coalescing = self
                .pqc_state
                .should_adjust_coalescing(buf.len() - datagram_start, space_id);

            if should_adjust_coalescing {
                coalesce = false;
                trace!("Disabling coalescing for PQC handshake in {:?}", space_id);
            }

            // Datagrams with Initial packets are padded when sent by a client or
            // when the packet is ack-eliciting.
            pad_datagram |=
                space_id == SpaceId::Initial && (self.side.is_client() || ack_eliciting);

            if close {
                trace!("sending CONNECTION_CLOSE");
                // Include pending ACKs ahead of the close frame; the frames written
                // here are recorded in a throw-away `SentFrames`.
                if !self.spaces[space_id].pending_acks.ranges().is_empty() {
                    Self::populate_acks(
                        now,
                        self.receiving_ecn,
                        &mut SentFrames::default(),
                        &mut self.spaces[space_id],
                        buf,
                        &mut self.stats,
                    );
                }

                debug_assert!(
                    buf.len() + frame::ConnectionClose::SIZE_BOUND < builder.max_size,
                    "ACKs should leave space for ConnectionClose"
                );
                if buf.len() + frame::ConnectionClose::SIZE_BOUND < builder.max_size {
                    let max_frame_size = builder.max_size - buf.len();
                    match self.state {
                        State::Closed(state::Closed { ref reason }) => {
                            // Outside the data space a non-transport-layer reason is
                            // replaced by a generic APPLICATION_ERROR with no text.
                            if space_id == SpaceId::Data || reason.is_transport_layer() {
                                reason.encode(buf, max_frame_size)
                            } else {
                                frame::ConnectionClose {
                                    error_code: TransportErrorCode::APPLICATION_ERROR,
                                    frame_type: None,
                                    reason: Bytes::new(),
                                }
                                .encode(buf, max_frame_size)
                            }
                        }
                        State::Draining => frame::ConnectionClose {
                            error_code: TransportErrorCode::NO_ERROR,
                            frame_type: None,
                            reason: Bytes::new(),
                        }
                        .encode(buf, max_frame_size),
                        _ => unreachable!(
                            "tried to make a close packet when the connection wasn't closed"
                        ),
                    }
                }
                if space_id == self.highest_space {
                    // The close has been written in the highest space; nothing more to send.
                    self.close = false;
                    break;
                } else {
                    // Also write the close in the next space.
                    space_idx += 1;
                    continue;
                }
            }

            // An off-path PATH_RESPONSE is sent alone in the first datagram, to the
            // address it was requested from rather than the current path.
            if space_id == SpaceId::Data && num_datagrams == 1 {
                if let Some((token, remote)) = self.path_responses.pop_off_path(self.path.remote) {
                    // A builder was stored just above, so this cannot be `None`.
                    let mut builder = builder_storage.take().unwrap();
                    trace!("PATH_RESPONSE {:08x} (off-path)", token);
                    buf.write(frame::FrameType::PATH_RESPONSE);
                    buf.write(token);
                    self.stats.frame_tx.path_response += 1;
                    let min_size = self.pqc_state.min_initial_size();
                    builder.pad_to(min_size);
                    builder.finish_and_track(
                        now,
                        self,
                        Some(SentFrames {
                            non_retransmits: true,
                            ..SentFrames::default()
                        }),
                        buf,
                    );
                    self.stats.udp_tx.on_sent(1, buf.len());

                    #[cfg(feature = "trace")]
                    {
                        use crate::trace_packet_sent;
                        trace_packet_sent!(
                            &self.event_log,
                            self.trace_context.trace_id(),
                            buf.len() as u32,
                            0
                        );
                    }

                    return Some(Transmit {
                        destination: remote,
                        size: buf.len(),
                        ecn: None,
                        segment_size: None,
                        src_ip: self.local_ip,
                    });
                }
            }

            // Queue any address observation frames that are due for the data space.
            if space_id == SpaceId::Data && self.address_discovery_state.is_some() {
                let peer_supports = self.peer_params.address_discovery.is_some();

                if let Some(state) = &mut self.address_discovery_state {
                    let frames = state.check_for_address_observations(0, peer_supports, now);
                    self.spaces[space_id]
                        .pending
                        .observed_addresses
                        .extend(frames);
                }
            }

            let sent =
                self.populate_packet(now, space_id, buf, builder.max_size, builder.exact_number);

            // Sanity check: if non-ACK frames were announced as sendable and a full
            // MTU was available, the packet must not consist of ACKs only.
            debug_assert!(
                !(sent.is_ack_only(&self.streams)
                    && !can_send.acks
                    && can_send.other
                    && (buf_capacity - builder.datagram_start) == self.path.current_mtu() as usize
                    && self.datagrams.outgoing.is_empty()),
                "SendableFrames was {can_send:?}, but only ACKs have been written"
            );
            pad_datagram |= sent.requires_padding;

            // ACKs were included: record that and stop the ACK delay timer.
            if sent.largest_acked.is_some() {
                self.spaces[space_id].pending_acks.acks_sent();
                self.timers.stop(Timer::MaxAckDelay);
            }

            // Keep the frames until the packet is finished. `space_idx` is not advanced,
            // so the same space is examined again on the next iteration.
            sent_frames = Some(sent);

        }

        // Finish the last packet left unfinished by the loop.
        if let Some(mut builder) = builder_storage {
            if pad_datagram {
                let min_size = self.pqc_state.min_initial_size();
                builder.pad_to(min_size);
            }

            // Pad to the full segment only when it fits in the accounted capacity.
            if pad_datagram_to_mtu && buf_capacity >= datagram_start + segment_size {
                builder.pad_to(segment_size as u16);
            }

            let last_packet_number = builder.exact_number;
            builder.finish_and_track(now, self, sent_frames, buf);
            self.path
                .congestion
                .on_sent(now, buf.len() as u64, last_packet_number);

            #[cfg(feature = "__qlog")]
            self.emit_qlog_recovery_metrics(now);
        }

        self.app_limited = buf.is_empty() && !congestion_blocked;

        // With nothing else to send on an established connection, try an MTU
        // discovery probe; `?` returns `None` when no probe is due.
        if buf.is_empty() && self.state.is_established() {
            let space_id = SpaceId::Data;
            let probe_size = self
                .path
                .mtud
                .poll_transmit(now, self.packet_number_filter.peek(&self.spaces[space_id]))?;

            let buf_capacity = probe_size as usize;
            buf.reserve(buf_capacity);

            let mut builder = PacketBuilder::new(
                now,
                space_id,
                self.rem_cids.active(),
                buf,
                buf_capacity,
                0,
                true,
                self,
            )?;

            // The probe carries a PING, plus IMMEDIATE_ACK when the peer supports
            // ACK frequency, and is padded to the probed size.
            buf.write(frame::FrameType::PING);
            self.stats.frame_tx.ping += 1;

            if self.peer_supports_ack_frequency() {
                buf.write(frame::FrameType::IMMEDIATE_ACK);
                self.stats.frame_tx.immediate_ack += 1;
            }

            builder.pad_to(probe_size);
            let sent_frames = SentFrames {
                non_retransmits: true,
                ..Default::default()
            };
            builder.finish_and_track(now, self, Some(sent_frames), buf);

            self.stats.path.sent_plpmtud_probes += 1;
            num_datagrams = 1;

            trace!(?probe_size, "writing MTUD probe");
        }

        if buf.is_empty() {
            return None;
        }

        trace!("sending {} bytes in {} datagrams", buf.len(), num_datagrams);
        self.path.total_sent = self.path.total_sent.saturating_add(buf.len() as u64);

        self.stats.udp_tx.on_sent(num_datagrams as u64, buf.len());

        #[cfg(feature = "trace")]
        {
            use crate::trace_packet_sent;
            // Reports the data space's most recently assigned packet number.
            let packet_num = self.spaces[SpaceId::Data]
                .next_packet_number
                .saturating_sub(1);
            trace_packet_sent!(
                &self.event_log,
                self.trace_context.trace_id(),
                buf.len() as u32,
                packet_num
            );
        }

        Some(Transmit {
            destination: self.path.remote,
            size: buf.len(),
            ecn: if self.path.sending_ecn {
                Some(EcnCodepoint::Ect0)
            } else {
                None
            },
            // A segment size is only reported for multi-datagram batches.
            segment_size: match num_datagrams {
                1 => None,
                _ => Some(segment_size),
            },
            src_ip: self.local_ip,
        })
    }
1221
1222 fn send_coordination_request(&mut self, _now: Instant, _buf: &mut Vec<u8>) -> Option<Transmit> {
1224 let nat = self.nat_traversal.as_mut()?;
1226 if !nat.should_send_punch_request() {
1227 return None;
1228 }
1229
1230 let coord = nat.coordination.as_ref()?;
1231 let round = coord.round;
1232 if coord.punch_targets.is_empty() {
1233 return None;
1234 }
1235
1236 trace!(
1237 "queuing PUNCH_ME_NOW round {} with {} targets",
1238 round,
1239 coord.punch_targets.len()
1240 );
1241
1242 for target in &coord.punch_targets {
1244 let punch = frame::PunchMeNow {
1245 round,
1246 paired_with_sequence_number: target.remote_sequence,
1247 address: target.remote_addr,
1248 target_peer_id: None,
1249 };
1250 self.spaces[SpaceId::Data].pending.punch_me_now.push(punch);
1251 }
1252
1253 nat.mark_punch_request_sent();
1255
1256 None
1258 }
1259
1260 fn send_coordinated_path_challenge(
1262 &mut self,
1263 now: Instant,
1264 buf: &mut Vec<u8>,
1265 ) -> Option<Transmit> {
1266 if let Some(nat_traversal) = &mut self.nat_traversal {
1268 if nat_traversal.should_start_punching(now) {
1269 nat_traversal.start_punching_phase(now);
1270 }
1271 }
1272
1273 let (target_addr, challenge) = {
1275 let nat_traversal = self.nat_traversal.as_ref()?;
1276 match nat_traversal.get_coordination_phase() {
1277 Some(CoordinationPhase::Punching) => {
1278 let targets = nat_traversal.get_punch_targets_from_coordination()?;
1279 if targets.is_empty() {
1280 return None;
1281 }
1282 let target = &targets[0];
1284 (target.remote_addr, target.challenge)
1285 }
1286 _ => return None,
1287 }
1288 };
1289
1290 debug_assert_eq!(
1291 self.highest_space,
1292 SpaceId::Data,
1293 "PATH_CHALLENGE queued without 1-RTT keys"
1294 );
1295
1296 #[cfg(feature = "pqc")]
1297 buf.reserve(self.pqc_state.min_initial_size() as usize);
1298 #[cfg(not(feature = "pqc"))]
1299 buf.reserve(MIN_INITIAL_SIZE as usize);
1300 let buf_capacity = buf.capacity();
1301
1302 let mut builder = PacketBuilder::new(
1303 now,
1304 SpaceId::Data,
1305 self.rem_cids.active(),
1306 buf,
1307 buf_capacity,
1308 0,
1309 false,
1310 self,
1311 )?;
1312
1313 trace!(
1314 "sending coordinated PATH_CHALLENGE {:08x} to {}",
1315 challenge, target_addr
1316 );
1317 buf.write(frame::FrameType::PATH_CHALLENGE);
1318 buf.write(challenge);
1319 self.stats.frame_tx.path_challenge += 1;
1320
1321 #[cfg(feature = "pqc")]
1322 let min_size = self.pqc_state.min_initial_size();
1323 #[cfg(not(feature = "pqc"))]
1324 let min_size = MIN_INITIAL_SIZE;
1325 builder.pad_to(min_size);
1326 builder.finish_and_track(now, self, None, buf);
1327
1328 if let Some(nat_traversal) = &mut self.nat_traversal {
1330 nat_traversal.mark_coordination_validating();
1331 }
1332
1333 Some(Transmit {
1334 destination: target_addr,
1335 size: buf.len(),
1336 ecn: if self.path.sending_ecn {
1337 Some(EcnCodepoint::Ect0)
1338 } else {
1339 None
1340 },
1341 segment_size: None,
1342 src_ip: self.local_ip,
1343 })
1344 }
1345
1346 fn send_nat_traversal_challenge(
1348 &mut self,
1349 now: Instant,
1350 buf: &mut Vec<u8>,
1351 ) -> Option<Transmit> {
1352 if let Some(request) = self.send_coordination_request(now, buf) {
1354 return Some(request);
1355 }
1356
1357 if let Some(punch) = self.send_coordinated_path_challenge(now, buf) {
1359 return Some(punch);
1360 }
1361
1362 let (remote_addr, remote_sequence) = {
1364 let nat_traversal = self.nat_traversal.as_ref()?;
1365 let candidates = nat_traversal.get_validation_candidates();
1366 if candidates.is_empty() {
1367 return None;
1368 }
1369 let (sequence, candidate) = candidates[0];
1371 (candidate.address, sequence)
1372 };
1373
1374 let challenge = self.rng.r#gen::<u64>();
1375
1376 if let Err(e) =
1378 self.nat_traversal
1379 .as_mut()?
1380 .start_validation(remote_sequence, challenge, now)
1381 {
1382 warn!("Failed to start NAT traversal validation: {}", e);
1383 return None;
1384 }
1385
1386 debug_assert_eq!(
1387 self.highest_space,
1388 SpaceId::Data,
1389 "PATH_CHALLENGE queued without 1-RTT keys"
1390 );
1391
1392 #[cfg(feature = "pqc")]
1393 buf.reserve(self.pqc_state.min_initial_size() as usize);
1394 #[cfg(not(feature = "pqc"))]
1395 buf.reserve(MIN_INITIAL_SIZE as usize);
1396 let buf_capacity = buf.capacity();
1397
1398 let mut builder = PacketBuilder::new(
1400 now,
1401 SpaceId::Data,
1402 self.rem_cids.active(),
1403 buf,
1404 buf_capacity,
1405 0,
1406 false,
1407 self,
1408 )?;
1409
1410 trace!(
1411 "sending PATH_CHALLENGE {:08x} to NAT candidate {}",
1412 challenge, remote_addr
1413 );
1414 buf.write(frame::FrameType::PATH_CHALLENGE);
1415 buf.write(challenge);
1416 self.stats.frame_tx.path_challenge += 1;
1417
1418 #[cfg(feature = "pqc")]
1420 let min_size = self.pqc_state.min_initial_size();
1421 #[cfg(not(feature = "pqc"))]
1422 let min_size = MIN_INITIAL_SIZE;
1423 builder.pad_to(min_size);
1424
1425 builder.finish_and_track(now, self, None, buf);
1426
1427 Some(Transmit {
1428 destination: remote_addr,
1429 size: buf.len(),
1430 ecn: if self.path.sending_ecn {
1431 Some(EcnCodepoint::Ect0)
1432 } else {
1433 None
1434 },
1435 segment_size: None,
1436 src_ip: self.local_ip,
1437 })
1438 }
1439
/// Builds a padded Data-space packet carrying a PATH_CHALLENGE for the previous path.
///
/// Returns `None` when there is no previous path, when that path has no challenge
/// pending, or when `PacketBuilder::new` yields `None`. On success `buf` contains the
/// finished packet and the returned `Transmit` is addressed to the previous path's remote.
fn send_path_challenge(&mut self, now: Instant, buf: &mut Vec<u8>) -> Option<Transmit> {
    let (prev_cid, prev_path) = self.prev_path.as_mut()?;
    if !prev_path.challenge_pending {
        return None;
    }
    // The pending flag is cleared before the packet is built, so a failed build below does
    // not leave the challenge queued.
    prev_path.challenge_pending = false;
    let token = prev_path
        .challenge
        .expect("previous path challenge pending without token");
    let destination = prev_path.remote;
    debug_assert_eq!(
        self.highest_space,
        SpaceId::Data,
        "PATH_CHALLENGE queued without 1-RTT keys"
    );
    // Make room for the padded datagram; the minimum size depends on the `pqc` feature.
    #[cfg(feature = "pqc")]
    buf.reserve(self.pqc_state.min_initial_size() as usize);
    #[cfg(not(feature = "pqc"))]
    buf.reserve(MIN_INITIAL_SIZE as usize);

    let buf_capacity = buf.capacity();

    // The packet is addressed with the connection ID stored alongside the previous path.
    let mut builder = PacketBuilder::new(
        now,
        SpaceId::Data,
        *prev_cid,
        buf,
        buf_capacity,
        0,
        false,
        self,
    )?;
    trace!("validating previous path with PATH_CHALLENGE {:08x}", token);
    buf.write(frame::FrameType::PATH_CHALLENGE);
    buf.write(token);
    self.stats.frame_tx.path_challenge += 1;

    // Pad the datagram up to the minimum initial size before finishing it.
    #[cfg(feature = "pqc")]
    let min_size = self.pqc_state.min_initial_size();
    #[cfg(not(feature = "pqc"))]
    let min_size = MIN_INITIAL_SIZE;
    builder.pad_to(min_size);

    // NOTE(review): `finish` is used here rather than `finish_and_track`, so this packet
    // presumably is not entered into loss tracking — confirm against `PacketBuilder`.
    builder.finish(self, buf);
    self.stats.udp_tx.on_sent(1, buf.len());

    Some(Transmit {
        destination,
        size: buf.len(),
        ecn: None,
        segment_size: None,
        src_ip: self.local_ip,
    })
}
1504
1505 fn space_can_send(&self, space_id: SpaceId, frame_space_1rtt: usize) -> SendableFrames {
1507 if self.spaces[space_id].crypto.is_none()
1508 && (space_id != SpaceId::Data
1509 || self.zero_rtt_crypto.is_none()
1510 || self.side.is_server())
1511 {
1512 return SendableFrames::empty();
1514 }
1515 let mut can_send = self.spaces[space_id].can_send(&self.streams);
1516 if space_id == SpaceId::Data {
1517 can_send.other |= self.can_send_1rtt(frame_space_1rtt);
1518 }
1519 can_send
1520 }
1521
1522 pub fn handle_event(&mut self, event: ConnectionEvent) {
1528 use ConnectionEventInner::*;
1529 match event.0 {
1530 Datagram(DatagramConnectionEvent {
1531 now,
1532 remote,
1533 ecn,
1534 first_decode,
1535 remaining,
1536 }) => {
1537 if remote != self.path.remote && !self.side.remote_may_migrate() {
1541 trace!("discarding packet from unrecognized peer {}", remote);
1542 return;
1543 }
1544
1545 let was_anti_amplification_blocked = self.path.anti_amplification_blocked(1);
1546
1547 self.stats.udp_rx.datagrams += 1;
1548 self.stats.udp_rx.bytes += first_decode.len() as u64;
1549 let data_len = first_decode.len();
1550
1551 self.handle_decode(now, remote, ecn, first_decode);
1552 self.path.total_recvd = self.path.total_recvd.saturating_add(data_len as u64);
1557
1558 if let Some(data) = remaining {
1559 self.stats.udp_rx.bytes += data.len() as u64;
1560 self.handle_coalesced(now, remote, ecn, data);
1561 }
1562
1563 #[cfg(feature = "__qlog")]
1564 self.emit_qlog_recovery_metrics(now);
1565
1566 if was_anti_amplification_blocked {
1567 self.set_loss_detection_timer(now);
1571 }
1572 }
1573 NewIdentifiers(ids, now) => {
1574 self.local_cid_state.new_cids(&ids, now);
1575 ids.into_iter().rev().for_each(|frame| {
1576 self.spaces[SpaceId::Data].pending.new_cids.push(frame);
1577 });
1578 if self.timers.get(Timer::PushNewCid).is_none_or(|x| x <= now) {
1580 self.reset_cid_retirement();
1581 }
1582 }
1583 QueueAddAddress(add) => {
1584 self.spaces[SpaceId::Data].pending.add_addresses.push(add);
1586 }
1587 QueuePunchMeNow(punch) => {
1588 self.spaces[SpaceId::Data].pending.punch_me_now.push(punch);
1590 }
1591 }
1592 }
1593
/// Processes every timer that has expired as of `now`.
///
/// Each expired timer is stopped first and then dispatched to its handler below; timers
/// that have not expired are left untouched.
pub fn handle_timeout(&mut self, now: Instant) {
    for &timer in &Timer::VALUES {
        if !self.timers.is_expired(timer, now) {
            continue;
        }
        self.timers.stop(timer);
        trace!(timer = ?timer, "timeout");
        match timer {
            Timer::Close => {
                // The close timer elapsed: enter `Drained` and notify the endpoint.
                self.state = State::Drained;
                self.endpoint_events.push_back(EndpointEventInner::Drained);
            }
            Timer::Idle => {
                self.kill(ConnectionError::TimedOut);
            }
            Timer::KeepAlive => {
                trace!("sending keep-alive");
                self.ping();
            }
            Timer::LossDetection => {
                self.on_loss_detection_timeout(now);

                #[cfg(feature = "__qlog")]
                self.emit_qlog_recovery_metrics(now);
            }
            Timer::KeyDiscard => {
                // Drop both the 0-RTT keys and the previous 1-RTT keys.
                self.zero_rtt_crypto = None;
                self.prev_crypto = None;
            }
            Timer::PathValidation => {
                debug!("path validation failed");
                // Fall back to the previous path if there is one, then clear any
                // outstanding challenge on the resulting current path.
                if let Some((_, prev)) = self.prev_path.take() {
                    self.path = prev;
                }
                self.path.challenge = None;
                self.path.challenge_pending = false;
            }
            Timer::Pacing => trace!("pacing timer expired"),
            Timer::NatTraversal => {
                self.handle_nat_traversal_timeout(now);
            }
            Timer::PushNewCid => {
                // `on_cid_timeout` is invoked even when closed; the request for new
                // identifiers is only forwarded to the endpoint while not closed.
                let num_new_cid = self.local_cid_state.on_cid_timeout().into();
                if !self.state.is_closed() {
                    trace!(
                        "push a new cid to peer RETIRE_PRIOR_TO field {}",
                        self.local_cid_state.retire_prior_to()
                    );
                    self.endpoint_events
                        .push_back(EndpointEventInner::NeedIdentifiers(now, num_new_cid));
                }
            }
            Timer::MaxAckDelay => {
                trace!("max ack delay reached");
                self.spaces[SpaceId::Data]
                    .pending_acks
                    .on_max_ack_delay_timeout()
            }
        }
    }
}
1666
1667 pub fn close(&mut self, now: Instant, error_code: VarInt, reason: Bytes) {
1679 self.close_inner(
1680 now,
1681 Close::Application(frame::ApplicationClose { error_code, reason }),
1682 )
1683 }
1684
1685 fn close_inner(&mut self, now: Instant, reason: Close) {
1686 let was_closed = self.state.is_closed();
1687 if !was_closed {
1688 self.close_common();
1689 self.set_close_timer(now);
1690 self.close = true;
1691 self.state = State::Closed(state::Closed { reason });
1692 }
1693 }
1694
/// Returns a `Datagrams` handle that mutably borrows this connection.
pub fn datagrams(&mut self) -> Datagrams<'_> {
    Datagrams { conn: self }
}
1699
1700 pub fn stats(&self) -> ConnectionStats {
1702 let mut stats = self.stats;
1703 stats.path.rtt = self.path.rtt.get();
1704 stats.path.cwnd = self.path.congestion.window();
1705 stats.path.current_mtu = self.path.mtud.current_mtu();
1706
1707 stats
1708 }
1709
/// Records `pid` as the peer ID to use for tokens (stored in `peer_id_for_tokens`).
///
/// NOTE(review): presumably consumed when issuing NEW_TOKEN frames — confirm at the use site.
pub fn set_token_binding_peer_id(&mut self, pid: PeerId) {
    self.peer_id_for_tokens = Some(pid);
}
1714
/// Sets the `delay_new_token_until_binding` flag to `v`.
///
/// NOTE(review): the name suggests NEW_TOKEN issuance is held back until a binding peer ID
/// is set — confirm at the use site.
pub fn set_delay_new_token_until_binding(&mut self, v: bool) {
    self.delay_new_token_until_binding = v;
}
1719
/// Marks a ping as pending in the highest packet space reached so far.
pub fn ping(&mut self) {
    self.spaces[self.highest_space].ping_pending = true;
}
1726
/// Returns the `using_pqc` flag of this connection's PQC state.
pub(crate) fn is_pqc(&self) -> bool {
    self.pqc_state.using_pqc
}
1731
1732 pub fn force_key_update(&mut self) {
1736 if !self.state.is_established() {
1737 debug!("ignoring forced key update in illegal state");
1738 return;
1739 }
1740 if self.prev_crypto.is_some() {
1741 debug!("ignoring redundant forced key update");
1744 return;
1745 }
1746 self.update_keys(None, false);
1747 }
1748
/// Deprecated alias that simply forwards to `force_key_update`.
#[doc(hidden)]
#[deprecated]
pub fn initiate_key_update(&mut self) {
    self.force_key_update();
}
1755
/// Borrows the connection's cryptographic session as a trait object.
pub fn crypto_session(&self) -> &dyn crypto::Session {
    &*self.crypto
}
1760
/// Whether the connection state is currently the handshake state.
pub fn is_handshaking(&self) -> bool {
    self.state.is_handshake()
}
1768
/// Whether the connection state reports itself as closed.
pub fn is_closed(&self) -> bool {
    self.state.is_closed()
}
1779
/// Whether the connection state reports itself as drained.
pub fn is_drained(&self) -> bool {
    self.state.is_drained()
}
1787
/// Returns the stored `accepted_0rtt` flag.
///
/// NOTE(review): presumably only meaningful once the handshake has progressed far enough
/// to know whether 0-RTT was accepted — confirm where the flag is set.
pub fn accepted_0rtt(&self) -> bool {
    self.accepted_0rtt
}
1794
/// Returns the `zero_rtt_enabled` flag, which `init_0rtt` sets once 0-RTT keys are installed.
pub fn has_0rtt(&self) -> bool {
    self.zero_rtt_enabled
}
1799
/// Whether the data space's pending set is non-empty (evaluated against the stream state).
pub fn has_pending_retransmits(&self) -> bool {
    !self.spaces[SpaceId::Data].pending.is_empty(&self.streams)
}
1804
/// Returns which `Side` of the connection this endpoint is.
pub fn side(&self) -> Side {
    self.side.side()
}
1809
/// Returns the remote socket address of the current path.
pub fn remote_address(&self) -> SocketAddr {
    self.path.remote
}
1814
/// Returns the stored local IP address, if one is known.
///
/// The same value is used as `src_ip` for the transmits built in this file.
pub fn local_ip(&self) -> Option<IpAddr> {
    self.local_ip
}
1827
/// Returns the current path's RTT estimate as reported by its RTT estimator.
pub fn rtt(&self) -> Duration {
    self.path.rtt.get()
}
1832
/// Borrows the current path's congestion controller as a trait object.
pub fn congestion_state(&self) -> &dyn Controller {
    self.path.congestion.as_ref()
}
1837
/// Resets the current path's state using the connection's configuration.
///
/// NOTE(review): presumably meant to be called when the application learns that the
/// network path changed — confirm against the public API documentation.
pub fn path_changed(&mut self, now: Instant) {
    self.path.reset(now, &self.config);
}
1851
1852 pub fn set_max_concurrent_streams(&mut self, dir: Dir, count: VarInt) {
1857 self.streams.set_max_concurrent(dir, count);
1858 let pending = &mut self.spaces[SpaceId::Data].pending;
1861 self.streams.queue_max_stream_id(pending);
1862 }
1863
/// Returns the concurrent-stream limit currently configured for `dir`.
pub fn max_concurrent_streams(&self, dir: Dir) -> u64 {
    self.streams.max_concurrent(dir)
}
1872
1873 pub fn set_receive_window(&mut self, receive_window: VarInt) {
1875 if self.streams.set_receive_window(receive_window) {
1876 self.spaces[SpaceId::Data].pending.max_data = true;
1877 }
1878 }
1879
1880 pub fn set_address_discovery_enabled(&mut self, enabled: bool) {
1882 if let Some(ref mut state) = self.address_discovery_state {
1883 state.enabled = enabled;
1884 }
1885 }
1886
1887 pub fn address_discovery_enabled(&self) -> bool {
1889 self.address_discovery_state
1890 .as_ref()
1891 .is_some_and(|state| state.enabled)
1892 }
1893
1894 pub fn observed_address(&self) -> Option<SocketAddr> {
1899 self.address_discovery_state
1900 .as_ref()
1901 .and_then(|state| state.get_observed_address(0)) }
1903
/// Borrows the address discovery state, if the connection has one.
#[allow(dead_code)]
pub(crate) fn address_discovery_state(&self) -> Option<&AddressDiscoveryState> {
    self.address_discovery_state.as_ref()
}
1909
/// Processes an ACK frame received in packet number space `space`.
///
/// Updates the largest-acked bookkeeping, removes newly acknowledged packets from the
/// sent set, feeds the congestion controller and RTT estimator, runs loss detection,
/// validates ECN feedback and finally re-arms the loss detection timer.
///
/// # Errors
///
/// Returns a `PROTOCOL_VIOLATION` transport error when the ACK covers a packet number that
/// has not been sent yet, and propagates any error from the packet number filter.
fn on_ack_received(
    &mut self,
    now: Instant,
    space: SpaceId,
    ack: frame::Ack,
) -> Result<(), TransportError> {
    if ack.largest >= self.spaces[space].next_packet_number {
        return Err(TransportError::PROTOCOL_VIOLATION("unsent packet acked"));
    }
    // Track whether this ACK advances the largest acknowledged packet number.
    let new_largest = {
        let space = &mut self.spaces[space];
        if space.largest_acked_packet.is_none_or(|pn| ack.largest > pn) {
            space.largest_acked_packet = Some(ack.largest);
            // The send time is only refreshed when the packet is still in the sent set.
            if let Some(info) = space.sent_packets.get(&ack.largest) {
                space.largest_acked_packet_sent = info.time_sent;
            }
            true
        } else {
            false
        }
    };

    // Collect only packet numbers that are still tracked as sent, so the work below is
    // bounded by what we actually sent rather than by the size of the ACK ranges.
    let mut newly_acked = ArrayRangeSet::new();
    for range in ack.iter() {
        self.packet_number_filter.check_ack(space, range.clone())?;
        for (&pn, _) in self.spaces[space].sent_packets.range(range) {
            newly_acked.insert_one(pn);
        }
    }

    if newly_acked.is_empty() {
        return Ok(());
    }

    let mut ack_eliciting_acked = false;
    for packet in newly_acked.elts() {
        if let Some(info) = self.spaces[space].take(packet) {
            if let Some(acked) = info.largest_acked {
                // The peer has seen our ACK up to `acked`; stop tracking those ranges.
                self.spaces[space].pending_acks.subtract_below(acked);
            }
            ack_eliciting_acked |= info.ack_eliciting;

            // Let MTU discovery see the ack; forward an MTU change to congestion control.
            let mtu_updated = self.path.mtud.on_acked(space, packet, info.size);
            if mtu_updated {
                self.path
                    .congestion
                    .on_mtu_update(self.path.mtud.current_mtu());
            }

            self.ack_frequency.on_acked(packet);

            self.on_packet_acked(now, packet, info);
        }
    }

    self.path.congestion.on_end_acks(
        now,
        self.path.in_flight.bytes,
        self.app_limited,
        self.spaces[space].largest_acked_packet,
    );

    // Take an RTT sample only when the largest acked advanced and at least one newly
    // acknowledged packet was ack-eliciting.
    if new_largest && ack_eliciting_acked {
        // Outside the data space the peer's ack delay is treated as zero; inside it the
        // reported delay is capped at the peer's maximum ack delay.
        let ack_delay = if space != SpaceId::Data {
            Duration::from_micros(0)
        } else {
            cmp::min(
                self.ack_frequency.peer_max_ack_delay,
                Duration::from_micros(ack.delay << self.peer_params.ack_delay_exponent.0),
            )
        };
        let rtt = instant_saturating_sub(now, self.spaces[space].largest_acked_packet_sent);
        self.path.rtt.update(ack_delay, rtt);
        if self.path.first_packet_after_rtt_sample.is_none() {
            self.path.first_packet_after_rtt_sample =
                Some((space, self.spaces[space].next_packet_number));
        }
    }

    // Loss detection runs before `pto_count` is reset below.
    self.detect_lost_packets(now, space, true);

    if self.peer_completed_address_validation() {
        self.pto_count = 0;
    }

    if self.path.sending_ecn {
        if let Some(ecn) = ack.ecn {
            // ECN counts are only examined for ACKs that advanced the largest acked.
            if new_largest {
                let sent = self.spaces[space].largest_acked_packet_sent;
                self.process_ecn(now, space, newly_acked.len() as u64, ecn, sent);
            }
        } else {
            // An ACK without ECN counts while we are sending ECN turns ECN off.
            debug!("ECN not acknowledged by peer");
            self.path.sending_ecn = false;
        }
    }

    self.set_loss_detection_timer(now);
    Ok(())
}
2028
2029 fn process_ecn(
2031 &mut self,
2032 now: Instant,
2033 space: SpaceId,
2034 newly_acked: u64,
2035 ecn: frame::EcnCounts,
2036 largest_sent_time: Instant,
2037 ) {
2038 match self.spaces[space].detect_ecn(newly_acked, ecn) {
2039 Err(e) => {
2040 debug!("halting ECN due to verification failure: {}", e);
2041 self.path.sending_ecn = false;
2042 self.spaces[space].ecn_feedback = frame::EcnCounts::ZERO;
2045 }
2046 Ok(false) => {}
2047 Ok(true) => {
2048 self.stats.path.congestion_events += 1;
2049 self.path
2050 .congestion
2051 .on_congestion_event(now, largest_sent_time, false, 0);
2052 }
2053 }
2054 }
2055
2056 fn on_packet_acked(&mut self, now: Instant, pn: u64, info: SentPacket) {
2059 self.remove_in_flight(pn, &info);
2060 if info.ack_eliciting && self.path.challenge.is_none() {
2061 self.path.congestion.on_ack(
2064 now,
2065 info.time_sent,
2066 info.size.into(),
2067 self.app_limited,
2068 &self.path.rtt,
2069 );
2070 }
2071
2072 if let Some(retransmits) = info.retransmits.get() {
2074 for (id, _) in retransmits.reset_stream.iter() {
2075 self.streams.reset_acked(*id);
2076 }
2077 }
2078
2079 for frame in info.stream_frames {
2080 self.streams.received_ack_of(frame);
2081 }
2082 }
2083
2084 fn set_key_discard_timer(&mut self, now: Instant, space: SpaceId) {
2085 let start = if self.zero_rtt_crypto.is_some() {
2086 now
2087 } else {
2088 self.prev_crypto
2089 .as_ref()
2090 .expect("no previous keys")
2091 .end_packet
2092 .as_ref()
2093 .expect("update not acknowledged yet")
2094 .1
2095 };
2096 self.timers
2097 .set(Timer::KeyDiscard, start + self.pto(space) * 3);
2098 }
2099
2100 fn on_loss_detection_timeout(&mut self, now: Instant) {
2101 if let Some((_, pn_space)) = self.loss_time_and_space() {
2102 self.detect_lost_packets(now, pn_space, false);
2104 self.set_loss_detection_timer(now);
2105 return;
2106 }
2107
2108 let (_, space) = match self.pto_time_and_space(now) {
2109 Some(x) => x,
2110 None => {
2111 error!("PTO expired while unset");
2112 return;
2113 }
2114 };
2115 trace!(
2116 in_flight = self.path.in_flight.bytes,
2117 count = self.pto_count,
2118 ?space,
2119 "PTO fired"
2120 );
2121
2122 let count = match self.path.in_flight.ack_eliciting {
2123 0 => {
2126 debug_assert!(!self.peer_completed_address_validation());
2127 1
2128 }
2129 _ => 2,
2131 };
2132 self.spaces[space].loss_probes = self.spaces[space].loss_probes.saturating_add(count);
2133 self.pto_count = self.pto_count.saturating_add(1);
2134 self.set_loss_detection_timer(now);
2135 }
2136
2137 fn detect_lost_packets(&mut self, now: Instant, pn_space: SpaceId, due_to_ack: bool) {
2138 let mut lost_packets = Vec::<u64>::new();
2139 let mut lost_mtu_probe = None;
2140 let in_flight_mtu_probe = self.path.mtud.in_flight_mtu_probe();
2141 let rtt = self.path.rtt.conservative();
2142 let loss_delay = cmp::max(rtt.mul_f32(self.config.time_threshold), TIMER_GRANULARITY);
2143
2144 let lost_send_time = now.checked_sub(loss_delay).unwrap();
2146 let largest_acked_packet = self.spaces[pn_space].largest_acked_packet.unwrap();
2147 let packet_threshold = self.config.packet_threshold as u64;
2148 let mut size_of_lost_packets = 0u64;
2149
2150 let congestion_period =
2154 self.pto(SpaceId::Data) * self.config.persistent_congestion_threshold;
2155 let mut persistent_congestion_start: Option<Instant> = None;
2156 let mut prev_packet = None;
2157 let mut in_persistent_congestion = false;
2158
2159 let space = &mut self.spaces[pn_space];
2160 space.loss_time = None;
2161
2162 for (&packet, info) in space.sent_packets.range(0..largest_acked_packet) {
2163 if prev_packet != Some(packet.wrapping_sub(1)) {
2164 persistent_congestion_start = None;
2166 }
2167
2168 if info.time_sent <= lost_send_time || largest_acked_packet >= packet + packet_threshold
2169 {
2170 if Some(packet) == in_flight_mtu_probe {
2171 lost_mtu_probe = in_flight_mtu_probe;
2174 } else {
2175 lost_packets.push(packet);
2176 size_of_lost_packets += info.size as u64;
2177 if info.ack_eliciting && due_to_ack {
2178 match persistent_congestion_start {
2179 Some(start) if info.time_sent - start > congestion_period => {
2182 in_persistent_congestion = true;
2183 }
2184 None if self
2186 .path
2187 .first_packet_after_rtt_sample
2188 .is_some_and(|x| x < (pn_space, packet)) =>
2189 {
2190 persistent_congestion_start = Some(info.time_sent);
2191 }
2192 _ => {}
2193 }
2194 }
2195 }
2196 } else {
2197 let next_loss_time = info.time_sent + loss_delay;
2198 space.loss_time = Some(
2199 space
2200 .loss_time
2201 .map_or(next_loss_time, |x| cmp::min(x, next_loss_time)),
2202 );
2203 persistent_congestion_start = None;
2204 }
2205
2206 prev_packet = Some(packet);
2207 }
2208
2209 if let Some(largest_lost) = lost_packets.last().cloned() {
2211 let old_bytes_in_flight = self.path.in_flight.bytes;
2212 let largest_lost_sent = self.spaces[pn_space].sent_packets[&largest_lost].time_sent;
2213 self.lost_packets += lost_packets.len() as u64;
2214 self.stats.path.lost_packets += lost_packets.len() as u64;
2215 self.stats.path.lost_bytes += size_of_lost_packets;
2216 trace!(
2217 "packets lost: {:?}, bytes lost: {}",
2218 lost_packets, size_of_lost_packets
2219 );
2220
2221 for &packet in &lost_packets {
2222 let info = self.spaces[pn_space].take(packet).unwrap(); self.remove_in_flight(packet, &info);
2224 for frame in info.stream_frames {
2225 self.streams.retransmit(frame);
2226 }
2227 self.spaces[pn_space].pending |= info.retransmits;
2228 self.path.mtud.on_non_probe_lost(packet, info.size);
2229 }
2230
2231 if self.path.mtud.black_hole_detected(now) {
2232 self.stats.path.black_holes_detected += 1;
2233 self.path
2234 .congestion
2235 .on_mtu_update(self.path.mtud.current_mtu());
2236 if let Some(max_datagram_size) = self.datagrams().max_size() {
2237 self.datagrams.drop_oversized(max_datagram_size);
2238 }
2239 }
2240
2241 let lost_ack_eliciting = old_bytes_in_flight != self.path.in_flight.bytes;
2243
2244 if lost_ack_eliciting {
2245 self.stats.path.congestion_events += 1;
2246 self.path.congestion.on_congestion_event(
2247 now,
2248 largest_lost_sent,
2249 in_persistent_congestion,
2250 size_of_lost_packets,
2251 );
2252 }
2253 }
2254
2255 if let Some(packet) = lost_mtu_probe {
2257 let info = self.spaces[SpaceId::Data].take(packet).unwrap(); self.remove_in_flight(packet, &info);
2259 self.path.mtud.on_probe_lost();
2260 self.stats.path.lost_plpmtud_probes += 1;
2261 }
2262 }
2263
2264 fn loss_time_and_space(&self) -> Option<(Instant, SpaceId)> {
2265 SpaceId::iter()
2266 .filter_map(|id| Some((self.spaces[id].loss_time?, id)))
2267 .min_by_key(|&(time, _)| time)
2268 }
2269
2270 fn pto_time_and_space(&self, now: Instant) -> Option<(Instant, SpaceId)> {
2271 let backoff = 2u32.pow(self.pto_count.min(MAX_BACKOFF_EXPONENT));
2272 let mut duration = self.path.rtt.pto_base() * backoff;
2273
2274 if self.path.in_flight.ack_eliciting == 0 {
2275 debug_assert!(!self.peer_completed_address_validation());
2276 let space = match self.highest_space {
2277 SpaceId::Handshake => SpaceId::Handshake,
2278 _ => SpaceId::Initial,
2279 };
2280 return Some((now + duration, space));
2281 }
2282
2283 let mut result = None;
2284 for space in SpaceId::iter() {
2285 if self.spaces[space].in_flight == 0 {
2286 continue;
2287 }
2288 if space == SpaceId::Data {
2289 if self.is_handshaking() {
2291 return result;
2292 }
2293 duration += self.ack_frequency.max_ack_delay_for_pto() * backoff;
2295 }
2296 let last_ack_eliciting = match self.spaces[space].time_of_last_ack_eliciting_packet {
2297 Some(time) => time,
2298 None => continue,
2299 };
2300 let pto = last_ack_eliciting + duration;
2301 if result.is_none_or(|(earliest_pto, _)| pto < earliest_pto) {
2302 result = Some((pto, space));
2303 }
2304 }
2305 result
2306 }
2307
2308 fn peer_completed_address_validation(&self) -> bool {
2309 if self.side.is_server() || self.state.is_closed() {
2310 return true;
2311 }
2312 self.spaces[SpaceId::Handshake]
2315 .largest_acked_packet
2316 .is_some()
2317 || self.spaces[SpaceId::Data].largest_acked_packet.is_some()
2318 || (self.spaces[SpaceId::Data].crypto.is_some()
2319 && self.spaces[SpaceId::Handshake].crypto.is_none())
2320 }
2321
2322 fn set_loss_detection_timer(&mut self, now: Instant) {
2323 if self.state.is_closed() {
2324 return;
2328 }
2329
2330 if let Some((loss_time, _)) = self.loss_time_and_space() {
2331 self.timers.set(Timer::LossDetection, loss_time);
2333 return;
2334 }
2335
2336 if self.path.anti_amplification_blocked(1) {
2337 self.timers.stop(Timer::LossDetection);
2339 return;
2340 }
2341
2342 if self.path.in_flight.ack_eliciting == 0 && self.peer_completed_address_validation() {
2343 self.timers.stop(Timer::LossDetection);
2346 return;
2347 }
2348
2349 if let Some((timeout, _)) = self.pto_time_and_space(now) {
2352 self.timers.set(Timer::LossDetection, timeout);
2353 } else {
2354 self.timers.stop(Timer::LossDetection);
2355 }
2356 }
2357
2358 fn pto(&self, space: SpaceId) -> Duration {
2360 let max_ack_delay = match space {
2361 SpaceId::Initial | SpaceId::Handshake => Duration::ZERO,
2362 SpaceId::Data => self.ack_frequency.max_ack_delay_for_pto(),
2363 };
2364 self.path.rtt.pto_base() + max_ack_delay
2365 }
2366
2367 fn on_packet_authenticated(
2368 &mut self,
2369 now: Instant,
2370 space_id: SpaceId,
2371 ecn: Option<EcnCodepoint>,
2372 packet: Option<u64>,
2373 spin: bool,
2374 is_1rtt: bool,
2375 ) {
2376 self.total_authed_packets += 1;
2377 self.reset_keep_alive(now);
2378 self.reset_idle_timeout(now, space_id);
2379 self.permit_idle_reset = true;
2380 self.receiving_ecn |= ecn.is_some();
2381 if let Some(x) = ecn {
2382 let space = &mut self.spaces[space_id];
2383 space.ecn_counters += x;
2384
2385 if x.is_ce() {
2386 space.pending_acks.set_immediate_ack_required();
2387 }
2388 }
2389
2390 let packet = match packet {
2391 Some(x) => x,
2392 None => return,
2393 };
2394 if self.side.is_server() {
2395 if self.spaces[SpaceId::Initial].crypto.is_some() && space_id == SpaceId::Handshake {
2396 self.discard_space(now, SpaceId::Initial);
2398 }
2399 if self.zero_rtt_crypto.is_some() && is_1rtt {
2400 self.set_key_discard_timer(now, space_id)
2402 }
2403 }
2404 let space = &mut self.spaces[space_id];
2405 space.pending_acks.insert_one(packet, now);
2406 if packet >= space.rx_packet {
2407 space.rx_packet = packet;
2408 self.spin = self.side.is_client() ^ spin;
2410 }
2411 }
2412
2413 fn reset_idle_timeout(&mut self, now: Instant, space: SpaceId) {
2414 let timeout = match self.idle_timeout {
2415 None => return,
2416 Some(dur) => dur,
2417 };
2418 if self.state.is_closed() {
2419 self.timers.stop(Timer::Idle);
2420 return;
2421 }
2422 let dt = cmp::max(timeout, 3 * self.pto(space));
2423 self.timers.set(Timer::Idle, now + dt);
2424 }
2425
2426 fn reset_keep_alive(&mut self, now: Instant) {
2427 let interval = match self.config.keep_alive_interval {
2428 Some(x) if self.state.is_established() => x,
2429 _ => return,
2430 };
2431 self.timers.set(Timer::KeepAlive, now + interval);
2432 }
2433
/// Arms the `PushNewCid` timer at the local CID state's next timeout, if there is one.
///
/// When there is no next timeout the timer is left as it is.
fn reset_cid_retirement(&mut self) {
    if let Some(t) = self.local_cid_state.next_timeout() {
        self.timers.set(Timer::PushNewCid, t);
    }
}
2439
2440 pub(crate) fn handle_first_packet(
2445 &mut self,
2446 now: Instant,
2447 remote: SocketAddr,
2448 ecn: Option<EcnCodepoint>,
2449 packet_number: u64,
2450 packet: InitialPacket,
2451 remaining: Option<BytesMut>,
2452 ) -> Result<(), ConnectionError> {
2453 let span = trace_span!("first recv");
2454 let _guard = span.enter();
2455 debug_assert!(self.side.is_server());
2456 let len = packet.header_data.len() + packet.payload.len();
2457 self.path.total_recvd = len as u64;
2458
2459 match self.state {
2460 State::Handshake(ref mut state) => {
2461 state.expected_token = packet.header.token.clone();
2462 }
2463 _ => unreachable!("first packet must be delivered in Handshake state"),
2464 }
2465
2466 self.on_packet_authenticated(
2467 now,
2468 SpaceId::Initial,
2469 ecn,
2470 Some(packet_number),
2471 false,
2472 false,
2473 );
2474
2475 self.process_decrypted_packet(now, remote, Some(packet_number), packet.into())?;
2476 if let Some(data) = remaining {
2477 self.handle_coalesced(now, remote, ecn, data);
2478 }
2479
2480 #[cfg(feature = "__qlog")]
2481 self.emit_qlog_recovery_metrics(now);
2482
2483 Ok(())
2484 }
2485
2486 fn init_0rtt(&mut self) {
2487 let (header, packet) = match self.crypto.early_crypto() {
2488 Some(x) => x,
2489 None => return,
2490 };
2491 if self.side.is_client() {
2492 match self.crypto.transport_parameters() {
2493 Ok(params) => {
2494 let params = params
2495 .expect("crypto layer didn't supply transport parameters with ticket");
2496 let params = TransportParameters {
2498 initial_src_cid: None,
2499 original_dst_cid: None,
2500 preferred_address: None,
2501 retry_src_cid: None,
2502 stateless_reset_token: None,
2503 min_ack_delay: None,
2504 ack_delay_exponent: TransportParameters::default().ack_delay_exponent,
2505 max_ack_delay: TransportParameters::default().max_ack_delay,
2506 ..params
2507 };
2508 self.set_peer_params(params);
2509 }
2510 Err(e) => {
2511 error!("session ticket has malformed transport parameters: {}", e);
2512 return;
2513 }
2514 }
2515 }
2516 trace!("0-RTT enabled");
2517 self.zero_rtt_enabled = true;
2518 self.zero_rtt_crypto = Some(ZeroRttCrypto { header, packet });
2519 }
2520
/// Feeds a received CRYPTO frame into the handshake.
///
/// The frame is inserted into the crypto stream of `space`, and every contiguous chunk
/// that becomes readable is passed to the crypto session. A `HandshakeDataReady` event is
/// queued whenever the session reports it.
///
/// # Errors
///
/// Returns `PROTOCOL_VIOLATION` when new data arrives at an encryption level below the
/// expected one, `CRYPTO_BUFFER_EXCEEDED` when the frame would require buffering more
/// than `crypto_buffer_size` bytes, and propagates errors from the crypto session.
fn read_crypto(
    &mut self,
    space: SpaceId,
    crypto: &frame::Crypto,
    payload_len: usize,
) -> Result<(), TransportError> {
    // The encryption level at which new handshake data is currently expected.
    let expected = if !self.state.is_handshake() {
        SpaceId::Data
    } else if self.highest_space == SpaceId::Initial {
        SpaceId::Initial
    } else {
        SpaceId::Handshake
    };
    debug_assert!(space <= expected, "received out-of-order CRYPTO data");

    // Data at a lower level is tolerated only if it lies entirely within what was already
    // read; anything extending past that point is new data and is rejected.
    let end = crypto.offset + crypto.data.len() as u64;
    if space < expected && end > self.spaces[space].crypto_stream.bytes_read() {
        warn!(
            "received new {:?} CRYPTO data when expecting {:?}",
            space, expected
        );
        return Err(TransportError::PROTOCOL_VIOLATION(
            "new data at unexpected encryption level",
        ));
    }

    #[cfg(feature = "pqc")]
    {
        // Let the PQC state inspect the handshake bytes; it may ask for MTU discovery to
        // be restarted from its minimum initial size.
        self.pqc_state.detect_pqc_from_crypto(&crypto.data, space);

        if self.pqc_state.should_trigger_mtu_discovery() {
            self.path
                .mtud
                .reset(self.pqc_state.min_initial_size(), self.config.min_mtu);
            trace!("Triggered MTU discovery for PQC handshake");
        }
    }

    // From here on `space` refers to the packet space itself rather than its ID.
    let space = &mut self.spaces[space];
    // Bound how far beyond the read position the peer can make us buffer.
    let max = end.saturating_sub(space.crypto_stream.bytes_read());
    if max > self.config.crypto_buffer_size as u64 {
        return Err(TransportError::CRYPTO_BUFFER_EXCEEDED(""));
    }

    space
        .crypto_stream
        .insert(crypto.offset, crypto.data.clone(), payload_len);
    while let Some(chunk) = space.crypto_stream.read(usize::MAX, true) {
        trace!("consumed {} CRYPTO bytes", chunk.bytes.len());
        if self.crypto.read_handshake(&chunk.bytes)? {
            self.events.push_back(Event::HandshakeDataReady);
        }
    }

    Ok(())
}
2585
/// Drains outgoing handshake data from the crypto session into pending CRYPTO frames.
///
/// Each loop iteration asks the session for data at the current highest space. New keys
/// returned by the session upgrade the connection to the next space. The loop ends once
/// the session produced no data and no key upgrade happened in the same iteration.
fn write_crypto(&mut self) {
    loop {
        // Remember the space the data belongs to before any key upgrade changes it.
        let space = self.highest_space;
        let mut outgoing = Vec::new();
        if let Some(crypto) = self.crypto.write_handshake(&mut outgoing) {
            match space {
                SpaceId::Initial => {
                    self.upgrade_crypto(SpaceId::Handshake, crypto);
                }
                SpaceId::Handshake => {
                    self.upgrade_crypto(SpaceId::Data, crypto);
                }
                _ => unreachable!("got updated secrets during 1-RTT"),
            }
        }
        if outgoing.is_empty() {
            if space == self.highest_space {
                // No data and no upgrade: the session has nothing more to send.
                break;
            } else {
                // Keys were upgraded; poll again at the new level.
                continue;
            }
        }
        let offset = self.spaces[space].crypto_offset;
        let outgoing = Bytes::from(outgoing);
        if let State::Handshake(ref mut state) = self.state {
            // On the client, the first Initial-level write at offset 0 is kept as the
            // client hello.
            if space == SpaceId::Initial && offset == 0 && self.side.is_client() {
                state.client_hello = Some(outgoing.clone());
            }
        }
        self.spaces[space].crypto_offset += outgoing.len() as u64;
        trace!("wrote {} {:?} CRYPTO bytes", outgoing.len(), space);

        // With PQC in use, payloads larger than 1200 bytes are split by the PQC packet
        // handler instead of being queued as a single frame.
        #[cfg(feature = "pqc")]
        let use_pqc_fragmentation = self.pqc_state.using_pqc && outgoing.len() > 1200;
        #[cfg(not(feature = "pqc"))]
        let use_pqc_fragmentation = false;

        if use_pqc_fragmentation {
            #[cfg(feature = "pqc")]
            {
                let frames = self.pqc_state.packet_handler.fragment_crypto_data(
                    &outgoing,
                    offset,
                    self.pqc_state.min_initial_size() as usize,
                );
                for frame in frames {
                    self.spaces[space].pending.crypto.push_back(frame);
                }
            }
        } else {
            self.spaces[space].pending.crypto.push_back(frame::Crypto {
                offset,
                data: outgoing,
            });
        }
    }
}
2647
2648 fn upgrade_crypto(&mut self, space: SpaceId, crypto: Keys) {
2650 debug_assert!(
2651 self.spaces[space].crypto.is_none(),
2652 "already reached packet space {space:?}"
2653 );
2654 trace!("{:?} keys ready", space);
2655 if space == SpaceId::Data {
2656 self.next_crypto = Some(
2658 self.crypto
2659 .next_1rtt_keys()
2660 .expect("handshake should be complete"),
2661 );
2662 }
2663
2664 self.spaces[space].crypto = Some(crypto);
2665 debug_assert!(space as usize > self.highest_space as usize);
2666 self.highest_space = space;
2667 if space == SpaceId::Data && self.side.is_client() {
2668 self.zero_rtt_crypto = None;
2670 }
2671 }
2672
2673 fn discard_space(&mut self, now: Instant, space_id: SpaceId) {
2674 debug_assert!(space_id != SpaceId::Data);
2675 trace!("discarding {:?} keys", space_id);
2676 if space_id == SpaceId::Initial {
2677 if let ConnectionSide::Client { token, .. } = &mut self.side {
2679 *token = Bytes::new();
2680 }
2681 }
2682 let space = &mut self.spaces[space_id];
2683 space.crypto = None;
2684 space.time_of_last_ack_eliciting_packet = None;
2685 space.loss_time = None;
2686 space.in_flight = 0;
2687 let sent_packets = mem::take(&mut space.sent_packets);
2688 for (pn, packet) in sent_packets.into_iter() {
2689 self.remove_in_flight(pn, &packet);
2690 }
2691 self.set_loss_detection_timer(now)
2692 }
2693
2694 fn handle_coalesced(
2695 &mut self,
2696 now: Instant,
2697 remote: SocketAddr,
2698 ecn: Option<EcnCodepoint>,
2699 data: BytesMut,
2700 ) {
2701 self.path.total_recvd = self.path.total_recvd.saturating_add(data.len() as u64);
2702 let mut remaining = Some(data);
2703 while let Some(data) = remaining {
2704 match PartialDecode::new(
2705 data,
2706 &FixedLengthConnectionIdParser::new(self.local_cid_state.cid_len()),
2707 &[self.version],
2708 self.endpoint_config.grease_quic_bit,
2709 ) {
2710 Ok((partial_decode, rest)) => {
2711 remaining = rest;
2712 self.handle_decode(now, remote, ecn, partial_decode);
2713 }
2714 Err(e) => {
2715 trace!("malformed header: {}", e);
2716 return;
2717 }
2718 }
2719 }
2720 }
2721
2722 fn handle_decode(
2723 &mut self,
2724 now: Instant,
2725 remote: SocketAddr,
2726 ecn: Option<EcnCodepoint>,
2727 partial_decode: PartialDecode,
2728 ) {
2729 if let Some(decoded) = packet_crypto::unprotect_header(
2730 partial_decode,
2731 &self.spaces,
2732 self.zero_rtt_crypto.as_ref(),
2733 self.peer_params.stateless_reset_token,
2734 ) {
2735 self.handle_packet(now, remote, ecn, decoded.packet, decoded.stateless_reset);
2736 }
2737 }
2738
/// Handles one packet whose header protection has already been removed.
///
/// The packet is decrypted, filtered (duplicates, short packets during the
/// handshake, Initials carrying an unexpected token), and then passed to
/// `process_decrypted_packet`. Any resulting `ConnectionError` is stored in
/// `self.error` and mapped onto a new connection state; transitions into the
/// closed and drained states trigger the matching timer and endpoint-event
/// bookkeeping at the end of the function.
///
/// * `packet` - the unprotected packet; `None` is handled like a packet that
///   failed authentication (unless `stateless_reset` is set).
/// * `stateless_reset` - when `true`, the datagram is treated as a stateless
///   reset regardless of the decryption outcome.
fn handle_packet(
    &mut self,
    now: Instant,
    remote: SocketAddr,
    ecn: Option<EcnCodepoint>,
    packet: Option<Packet>,
    stateless_reset: bool,
) {
    // Counted for every call, whether or not the packet turns out to be valid.
    self.stats.udp_rx.ios += 1;
    if let Some(ref packet) = packet {
        trace!(
            "got {:?} packet ({} bytes) from {} using id {}",
            packet.header.space(),
            packet.payload.len() + packet.header_data.len(),
            remote,
            packet.header.dst_cid(),
        );

        #[cfg(feature = "trace")]
        {
            use crate::trace_packet_received;
            let packet_size = packet.payload.len() + packet.header_data.len();
            trace_packet_received!(
                &self.event_log,
                self.trace_context.trace_id(),
                packet_size as u32,
                0 );
        }
    }

    // While handshaking, only the current path's remote address is accepted.
    if self.is_handshaking() && remote != self.path.remote {
        debug!("discarding packet with unexpected remote during handshake");
        return;
    }

    // Snapshot the state so that transitions caused by this packet can be
    // detected after processing.
    let was_closed = self.state.is_closed();
    let was_drained = self.state.is_drained();

    // `Err(None)` stands for "could not authenticate"; a missing packet is
    // folded into that same case.
    let decrypted = match packet {
        None => Err(None),
        Some(mut packet) => self
            .decrypt_packet(now, &mut packet)
            .map(move |number| (packet, number)),
    };
    let result = match decrypted {
        // A stateless reset takes precedence over every other outcome.
        _ if stateless_reset => {
            debug!("got stateless reset");
            Err(ConnectionError::Reset)
        }
        Err(Some(e)) => {
            warn!("illegal packet: {}", e);
            Err(e.into())
        }
        Err(None) => {
            debug!("failed to authenticate packet");
            self.authentication_failures += 1;
            // The limit is taken from the local packet key of the highest
            // space; the `unwrap` panics if that space has no crypto state.
            let integrity_limit = self.spaces[self.highest_space]
                .crypto
                .as_ref()
                .unwrap()
                .packet
                .local
                .integrity_limit();
            if self.authentication_failures > integrity_limit {
                Err(TransportError::AEAD_LIMIT_REACHED("integrity limit violated").into())
            } else {
                // Below the limit: drop the packet without changing state.
                return;
            }
        }
        Ok((packet, number)) => {
            let span = match number {
                Some(pn) => trace_span!("recv", space = ?packet.header.space(), pn),
                None => trace_span!("recv", space = ?packet.header.space()),
            };
            let _guard = span.enter();

            // A `true` result from `dedup.insert` is treated as "already seen".
            let is_duplicate = |n| self.spaces[packet.header.space()].dedup.insert(n);
            if number.is_some_and(is_duplicate) {
                debug!("discarding possible duplicate packet");
                return;
            } else if self.state.is_handshake() && packet.header.is_short() {
                trace!("dropping short packet during handshake");
                return;
            } else {
                // Servers in the handshake state only accept Initial packets
                // whose token equals the one recorded in the handshake state.
                if let Header::Initial(InitialHeader { ref token, .. }) = packet.header {
                    if let State::Handshake(ref hs) = self.state {
                        if self.side.is_server() && token != &hs.expected_token {
                            warn!("discarding Initial with invalid retry token");
                            return;
                        }
                    }
                }

                // Authentication bookkeeping is skipped once the connection
                // is closed.
                if !self.state.is_closed() {
                    // Only short headers carry a spin value; everything else
                    // reports `false`.
                    let spin = match packet.header {
                        Header::Short { spin, .. } => spin,
                        _ => false,
                    };
                    self.on_packet_authenticated(
                        now,
                        packet.header.space(),
                        ecn,
                        number,
                        spin,
                        packet.header.is_1rtt(),
                    );
                }

                self.process_decrypted_packet(now, remote, number, packet)
            }
        }
    };

    // Record the error and translate it into the next connection state.
    if let Err(conn_err) = result {
        self.error = Some(conn_err.clone());
        self.state = match conn_err {
            ConnectionError::ApplicationClosed(reason) => State::closed(reason),
            ConnectionError::ConnectionClosed(reason) => State::closed(reason),
            // Resets and AEAD limit violations go straight to `Drained`.
            ConnectionError::Reset
            | ConnectionError::TransportError(TransportError {
                code: TransportErrorCode::AEAD_LIMIT_REACHED,
                ..
            }) => State::Drained,
            ConnectionError::TimedOut => {
                unreachable!("timeouts aren't generated by packet processing");
            }
            ConnectionError::TransportError(err) => {
                debug!("closing connection due to transport error: {}", err);
                State::closed(err)
            }
            ConnectionError::VersionMismatch => State::Draining,
            ConnectionError::LocallyClosed => {
                unreachable!("LocallyClosed isn't generated by packet processing");
            }
            ConnectionError::CidsExhausted => {
                unreachable!("CidsExhausted isn't generated by packet processing");
            }
        };
    }

    // First transition into a closed state: run the common close handling and
    // arm the close timer unless the connection is already drained.
    if !was_closed && self.state.is_closed() {
        self.close_common();
        if !self.state.is_drained() {
            self.set_close_timer(now);
        }
    }
    // First transition into the drained state: notify the endpoint and stop
    // the close timer.
    if !was_drained && self.state.is_drained() {
        self.endpoint_events.push_back(EndpointEventInner::Drained);
        self.timers.stop(Timer::Close);
    }

    // In the `Closed` state the `close` flag is set only for packets that
    // arrived from the current path's remote.
    // NOTE(review): presumably this flag requests (re)sending the close
    // packet; confirm against the transmit path.
    if let State::Closed(_) = self.state {
        self.close = remote == self.path.remote;
    }
}
2905
/// Dispatches an authenticated, decrypted packet according to the current
/// connection state.
///
/// * `Established`: Data-space packets go to `process_payload`; other packets
///   that have frames go to `process_early_payload`; the rest are discarded.
/// * `Closed`: frames are only scanned for a close frame, which moves the
///   connection to `Draining`.
/// * `Draining` / `Drained`: the packet is ignored.
/// * `Handshake`: the packet is handled by header type (Retry, Handshake,
///   Initial, 0-RTT, Version Negotiation).
///
/// `number` is unwrapped for packets routed to `process_payload`, so it must
/// be `Some` for Data-space and 0-RTT packets.
///
/// Returns an error when the packet violates the protocol or when a callee
/// reports one; the caller maps that error onto a connection state.
fn process_decrypted_packet(
    &mut self,
    now: Instant,
    remote: SocketAddr,
    number: Option<u64>,
    packet: Packet,
) -> Result<(), ConnectionError> {
    // Every state except `Handshake` returns from inside this match; only the
    // handshake state is carried forward into the header match below.
    let state = match self.state {
        State::Established => {
            match packet.header.space() {
                SpaceId::Data => self.process_payload(now, remote, number.unwrap(), packet)?,
                _ if packet.header.has_frames() => self.process_early_payload(now, packet)?,
                _ => {
                    trace!("discarding unexpected pre-handshake packet");
                }
            }
            return Ok(());
        }
        State::Closed(_) => {
            for result in frame::Iter::new(packet.payload.freeze())? {
                // Undecodable frames are logged and skipped rather than
                // treated as errors while closed.
                let frame = match result {
                    Ok(frame) => frame,
                    Err(err) => {
                        debug!("frame decoding error: {err:?}");
                        continue;
                    }
                };

                if let Frame::Padding = frame {
                    continue;
                };

                self.stats.frame_rx.record(&frame);

                // A close frame from the peer moves the connection to
                // `Draining`.
                if let Frame::Close(_) = frame {
                    trace!("draining");
                    self.state = State::Draining;
                    break;
                }
            }
            return Ok(());
        }
        State::Draining | State::Drained => return Ok(()),
        State::Handshake(ref mut state) => state,
    };

    match packet.header {
        Header::Retry {
            src_cid: rem_cid, ..
        } => {
            if self.side.is_server() {
                return Err(TransportError::PROTOCOL_VIOLATION("client sent Retry").into());
            }

            // The Retry is ignored if more than one packet has already been
            // authenticated, if the payload is not longer than 16 bytes, or
            // if `is_valid_retry` rejects it.
            if self.total_authed_packets > 1
                || packet.payload.len() <= 16 || !self.crypto.is_valid_retry(
                &self.rem_cids.active(),
                &packet.header_data,
                &packet.payload,
            )
            {
                trace!("discarding invalid Retry");
                return Ok(());
            }

            trace!("retrying with CID {}", rem_cid);
            // Panics if the handshake state no longer holds the client hello.
            let client_hello = state.client_hello.take().unwrap();
            self.retry_src_cid = Some(rem_cid);
            self.rem_cids.update_initial_cid(rem_cid);
            self.rem_handshake_cid = rem_cid;

            // Packet number 0 of the Initial space, if still tracked, is
            // processed as acknowledged.
            let space = &mut self.spaces[SpaceId::Initial];
            if let Some(info) = space.take(0) {
                self.on_packet_acked(now, 0, info);
            };

            // Rebuild the Initial space with keys derived from the new remote
            // CID, keeping the packet number sequence and re-queueing the
            // client hello from offset 0.
            self.discard_space(now, SpaceId::Initial); self.spaces[SpaceId::Initial] = PacketSpace {
                crypto: Some(self.crypto.initial_keys(&rem_cid, self.side.side())),
                next_packet_number: self.spaces[SpaceId::Initial].next_packet_number,
                crypto_offset: client_hello.len() as u64,
                ..PacketSpace::new(now)
            };
            self.spaces[SpaceId::Initial]
                .pending
                .crypto
                .push_back(frame::Crypto {
                    offset: 0,
                    data: client_hello,
                });

            // Packets already sent in the Data space are removed from flight
            // and their retransmittable contents are queued again.
            let zero_rtt = mem::take(&mut self.spaces[SpaceId::Data].sent_packets);
            for (pn, info) in zero_rtt {
                self.remove_in_flight(pn, &info);
                self.spaces[SpaceId::Data].pending |= info.retransmits;
            }
            self.streams.retransmit_all_for_0rtt();

            // Everything except the final 16 bytes of the payload is stored
            // as the token.
            // NOTE(review): the trailing 16 bytes are presumably the Retry
            // integrity tag; confirm against the packet format.
            let token_len = packet.payload.len() - 16;
            let ConnectionSide::Client { ref mut token, .. } = self.side else {
                unreachable!("we already short-circuited if we're server");
            };
            *token = packet.payload.freeze().split_to(token_len);
            // Restart the handshake state from scratch.
            self.state = State::Handshake(state::Handshake {
                expected_token: Bytes::new(),
                rem_cid_set: false,
                client_hello: None,
            });
            Ok(())
        }
        Header::Long {
            ty: LongType::Handshake,
            src_cid: rem_cid,
            ..
        } => {
            if rem_cid != self.rem_handshake_cid {
                debug!(
                    "discarding packet with mismatched remote CID: {} != {}",
                    self.rem_handshake_cid, rem_cid
                );
                return Ok(());
            }
            self.on_path_validated();

            self.process_early_payload(now, packet)?;
            // The payload may have closed the connection; stop here if so.
            if self.state.is_closed() {
                return Ok(());
            }

            if self.crypto.is_handshaking() {
                trace!("handshake ongoing");
                return Ok(());
            }

            // The crypto handshake is complete from here on.
            if self.side.is_client() {
                // NOTE(review): 0x6d is presumably the TLS
                // `missing_extension` alert; confirm.
                let params =
                    self.crypto
                        .transport_parameters()?
                        .ok_or_else(|| TransportError {
                            code: TransportErrorCode::crypto(0x6d),
                            frame: None,
                            reason: "transport parameters missing".into(),
                        })?;

                if self.has_0rtt() {
                    if !self.crypto.early_data_accepted().unwrap() {
                        debug_assert!(self.side.is_client());
                        debug!("0-RTT rejected");
                        self.accepted_0rtt = false;
                        self.streams.zero_rtt_rejected();

                        // Drop everything queued for the Data space.
                        self.spaces[SpaceId::Data].pending = Retransmits::default();

                        // Forget the packets already sent in the Data space.
                        let sent_packets =
                            mem::take(&mut self.spaces[SpaceId::Data].sent_packets);
                        for (pn, packet) in sent_packets {
                            self.remove_in_flight(pn, &packet);
                        }
                    } else {
                        self.accepted_0rtt = true;
                        params.validate_resumption_from(&self.peer_params)?;
                    }
                }
                if let Some(token) = params.stateless_reset_token {
                    self.endpoint_events
                        .push_back(EndpointEventInner::ResetToken(self.path.remote, token));
                }
                self.handle_peer_params(params)?;
                self.issue_first_cids(now);
            } else {
                // Server side: queue HANDSHAKE_DONE and drop the Handshake
                // space.
                self.spaces[SpaceId::Data].pending.handshake_done = true;
                self.discard_space(now, SpaceId::Handshake);
            }

            self.events.push_back(Event::Connected);
            self.state = State::Established;
            trace!("established");
            Ok(())
        }
        Header::Initial(InitialHeader {
            src_cid: rem_cid, ..
        }) => {
            // The first Initial fixes the remote CID; later ones must match
            // it.
            if !state.rem_cid_set {
                trace!("switching remote CID to {}", rem_cid);
                let mut state = state.clone();
                self.rem_cids.update_initial_cid(rem_cid);
                self.rem_handshake_cid = rem_cid;
                self.orig_rem_cid = rem_cid;
                state.rem_cid_set = true;
                self.state = State::Handshake(state);
            } else if rem_cid != self.rem_handshake_cid {
                debug!(
                    "discarding packet with mismatched remote CID: {} != {}",
                    self.rem_handshake_cid, rem_cid
                );
                return Ok(());
            }

            let starting_space = self.highest_space;
            self.process_early_payload(now, packet)?;

            // On the server, the packet that moves `highest_space` past
            // Initial is the point at which the peer's transport parameters
            // are read and applied.
            if self.side.is_server()
                && starting_space == SpaceId::Initial
                && self.highest_space != SpaceId::Initial
            {
                let params =
                    self.crypto
                        .transport_parameters()?
                        .ok_or_else(|| TransportError {
                            code: TransportErrorCode::crypto(0x6d),
                            frame: None,
                            reason: "transport parameters missing".into(),
                        })?;
                self.handle_peer_params(params)?;
                self.issue_first_cids(now);
                self.init_0rtt();
            }
            Ok(())
        }
        Header::Long {
            ty: LongType::ZeroRtt,
            ..
        } => {
            // 0-RTT packets carry application data and use the regular
            // payload path.
            self.process_payload(now, remote, number.unwrap(), packet)?;
            Ok(())
        }
        Header::VersionNegotiate { .. } => {
            // Ignored once more than one packet has been authenticated.
            if self.total_authed_packets > 1 {
                return Ok(());
            }
            // The payload is read as a list of 4-byte big-endian versions; a
            // trailing partial chunk never matches.
            let supported = packet
                .payload
                .chunks(4)
                .any(|x| match <[u8; 4]>::try_from(x) {
                    Ok(version) => self.version == u32::from_be_bytes(version),
                    Err(_) => false,
                });
            // A list that contains our own version is ignored.
            if supported {
                return Ok(());
            }
            debug!("remote doesn't support our version");
            Err(ConnectionError::VersionMismatch)
        }
        Header::Short { .. } => unreachable!(
            "short packets received during handshake are discarded in handle_packet"
        ),
    }
}
3167
3168 fn process_early_payload(
3170 &mut self,
3171 now: Instant,
3172 packet: Packet,
3173 ) -> Result<(), TransportError> {
3174 debug_assert_ne!(packet.header.space(), SpaceId::Data);
3175 let payload_len = packet.payload.len();
3176 let mut ack_eliciting = false;
3177 for result in frame::Iter::new(packet.payload.freeze())? {
3178 let frame = result?;
3179 let span = match frame {
3180 Frame::Padding => continue,
3181 _ => Some(trace_span!("frame", ty = %frame.ty())),
3182 };
3183
3184 self.stats.frame_rx.record(&frame);
3185
3186 let _guard = span.as_ref().map(|x| x.enter());
3187 ack_eliciting |= frame.is_ack_eliciting();
3188
3189 match frame {
3191 Frame::Padding | Frame::Ping => {}
3192 Frame::Crypto(frame) => {
3193 self.read_crypto(packet.header.space(), &frame, payload_len)?;
3194 }
3195 Frame::Ack(ack) => {
3196 self.on_ack_received(now, packet.header.space(), ack)?;
3197 }
3198 Frame::Close(reason) => {
3199 self.error = Some(reason.into());
3200 self.state = State::Draining;
3201 return Ok(());
3202 }
3203 _ => {
3204 let mut err =
3205 TransportError::PROTOCOL_VIOLATION("illegal frame type in handshake");
3206 err.frame = Some(frame.ty());
3207 return Err(err);
3208 }
3209 }
3210 }
3211
3212 if ack_eliciting {
3213 self.spaces[packet.header.space()]
3215 .pending_acks
3216 .set_immediate_ack_required();
3217 }
3218
3219 self.write_crypto();
3220 Ok(())
3221 }
3222
/// Processes every frame of a Data-space (1-RTT or 0-RTT) packet.
///
/// Frames are decoded one by one, recorded in the receive statistics, and
/// dispatched to the matching handler. After the loop the packet is registered
/// with the ACK machinery, stream-limit updates are queued, a received close
/// frame is applied, and a migration is started if the packet qualifies as
/// migration-initiating.
///
/// * `remote` - address the packet was received from.
/// * `number` - packet number of this packet.
///
/// Returns the first `TransportError` raised by frame decoding or by any frame
/// handler.
fn process_payload(
    &mut self,
    now: Instant,
    remote: SocketAddr,
    number: u64,
    packet: Packet,
) -> Result<(), TransportError> {
    let payload = packet.payload.freeze();
    // Stays `true` only if every non-padding frame is PATH_CHALLENGE,
    // PATH_RESPONSE or NEW_CONNECTION_ID.
    let mut is_probing_packet = true;
    // A close frame is remembered here and applied after the loop.
    let mut close = None;
    let payload_len = payload.len();
    let mut ack_eliciting = false;
    for result in frame::Iter::new(payload)? {
        let frame = result?;
        // Padding is skipped before statistics and tracing.
        let span = match frame {
            Frame::Padding => continue,
            _ => Some(trace_span!("frame", ty = %frame.ty())),
        };

        self.stats.frame_rx.record(&frame);
        // Data-carrying frames are logged by metadata only, not by content.
        match &frame {
            Frame::Crypto(f) => {
                trace!(offset = f.offset, len = f.data.len(), "got crypto frame");
            }
            Frame::Stream(f) => {
                trace!(id = %f.id, offset = f.offset, len = f.data.len(), fin = f.fin, "got stream frame");
            }
            Frame::Datagram(f) => {
                trace!(len = f.data.len(), "got datagram frame");
            }
            f => {
                trace!("got frame {:?}", f);
            }
        }

        let _guard = span.as_ref().map(|x| x.enter());
        // CRYPTO and application close frames are rejected in 0-RTT packets.
        if packet.header.is_0rtt() {
            match frame {
                Frame::Crypto(_) | Frame::Close(Close::Application(_)) => {
                    return Err(TransportError::PROTOCOL_VIOLATION(
                        "illegal frame type in 0-RTT",
                    ));
                }
                _ => {}
            }
        }
        ack_eliciting |= frame.is_ack_eliciting();

        // Any frame outside this set makes the packet non-probing.
        match frame {
            Frame::Padding
            | Frame::PathChallenge(_)
            | Frame::PathResponse(_)
            | Frame::NewConnectionId(_) => {}
            _ => {
                is_probing_packet = false;
            }
        }
        match frame {
            Frame::Crypto(frame) => {
                self.read_crypto(SpaceId::Data, &frame, payload_len)?;
            }
            Frame::Stream(frame) => {
                if self.streams.received(frame, payload_len)?.should_transmit() {
                    self.spaces[SpaceId::Data].pending.max_data = true;
                }
            }
            Frame::Ack(ack) => {
                self.on_ack_received(now, SpaceId::Data, ack)?;
            }
            Frame::Padding | Frame::Ping => {}
            Frame::Close(reason) => {
                close = Some(reason);
            }
            Frame::PathChallenge(token) => {
                // Queue a response for the address the challenge came from.
                self.path_responses.push(number, token, remote);
                // For challenges on the current path, also trigger an
                // immediate ack when the peer supports ACK frequency, or a
                // ping otherwise.
                if remote == self.path.remote {
                    match self.peer_supports_ack_frequency() {
                        true => self.immediate_ack(),
                        false => self.ping(),
                    }
                }
            }
            Frame::PathResponse(token) => {
                // A response matching the outstanding challenge on the
                // current path validates that path.
                if self.path.challenge == Some(token) && remote == self.path.remote {
                    trace!("new path validated");
                    self.timers.stop(Timer::PathValidation);
                    self.path.challenge = None;
                    self.path.validated = true;
                    if let Some((_, ref mut prev_path)) = self.prev_path {
                        prev_path.challenge = None;
                        prev_path.challenge_pending = false;
                    }
                    self.on_path_validated();
                } else if let Some(nat_traversal) = &mut self.nat_traversal {
                    // Otherwise the response may answer a NAT traversal
                    // candidate probe.
                    match nat_traversal.handle_validation_success(remote, token, now) {
                        Ok(sequence) => {
                            trace!(
                                "NAT traversal candidate {} validated for sequence {}",
                                remote, sequence
                            );

                            if nat_traversal.handle_coordination_success(remote, now) {
                                trace!("Coordination succeeded via {}", remote);

                                // Clients may always migrate; servers only
                                // when their configuration enables migration.
                                let can_migrate = match &self.side {
                                    ConnectionSide::Client { .. } => true, ConnectionSide::Server { server_config } => {
                                        server_config.migration
                                    }
                                };

                                if can_migrate {
                                    let best_pairs = nat_traversal.get_best_succeeded_pairs();
                                    // Migrate only if the first succeeded pair
                                    // is this remote and it differs from the
                                    // current path.
                                    if let Some(best) = best_pairs.first() {
                                        if best.remote_addr == remote
                                            && best.remote_addr != self.path.remote
                                        {
                                            debug!(
                                                "NAT traversal found better path, initiating migration"
                                            );
                                            // A failed migration is logged and
                                            // otherwise ignored.
                                            if let Err(e) =
                                                self.migrate_to_nat_traversal_path(now)
                                            {
                                                warn!(
                                                    "Failed to migrate to NAT traversal path: {:?}",
                                                    e
                                                );
                                            }
                                        }
                                    }
                                }
                            } else {
                                if nat_traversal.mark_pair_succeeded(remote) {
                                    trace!("NAT traversal pair succeeded for {}", remote);
                                }
                            }
                        }
                        Err(NatTraversalError::ChallengeMismatch) => {
                            debug!(
                                "PATH_RESPONSE challenge mismatch for NAT candidate {}",
                                remote
                            );
                        }
                        Err(e) => {
                            debug!("NAT traversal validation error: {}", e);
                        }
                    }
                } else {
                    debug!(token, "ignoring invalid PATH_RESPONSE");
                }
            }
            Frame::MaxData(bytes) => {
                self.streams.received_max_data(bytes);
            }
            Frame::MaxStreamData { id, offset } => {
                self.streams.received_max_stream_data(id, offset)?;
            }
            Frame::MaxStreams { dir, count } => {
                self.streams.received_max_streams(dir, count)?;
            }
            Frame::ResetStream(frame) => {
                if self.streams.received_reset(frame)?.should_transmit() {
                    self.spaces[SpaceId::Data].pending.max_data = true;
                }
            }
            Frame::DataBlocked { offset } => {
                // Informational only: logged, no state change.
                debug!(offset, "peer claims to be blocked at connection level");
            }
            Frame::StreamDataBlocked { id, offset } => {
                // A unidirectional stream we initiated is send-only for us,
                // so the peer cannot be blocked sending on it.
                if id.initiator() == self.side.side() && id.dir() == Dir::Uni {
                    debug!("got STREAM_DATA_BLOCKED on send-only {}", id);
                    return Err(TransportError::STREAM_STATE_ERROR(
                        "STREAM_DATA_BLOCKED on send-only stream",
                    ));
                }
                debug!(
                    stream = %id,
                    offset, "peer claims to be blocked at stream level"
                );
            }
            Frame::StreamsBlocked { dir, limit } => {
                if limit > MAX_STREAM_COUNT {
                    return Err(TransportError::FRAME_ENCODING_ERROR(
                        "unrepresentable stream limit",
                    ));
                }
                debug!(
                    "peer claims to be blocked opening more than {} {} streams",
                    limit, dir
                );
            }
            Frame::StopSending(frame::StopSending { id, error_code }) => {
                if id.initiator() != self.side.side() {
                    // A peer-initiated unidirectional stream is receive-only
                    // for us.
                    if id.dir() == Dir::Uni {
                        debug!("got STOP_SENDING on recv-only {}", id);
                        return Err(TransportError::STREAM_STATE_ERROR(
                            "STOP_SENDING on recv-only stream",
                        ));
                    }
                } else if self.streams.is_local_unopened(id) {
                    return Err(TransportError::STREAM_STATE_ERROR(
                        "STOP_SENDING on unopened stream",
                    ));
                }
                self.streams.received_stop_sending(id, error_code);
            }
            Frame::RetireConnectionId { sequence } => {
                let allow_more_cids = self
                    .local_cid_state
                    .on_cid_retirement(sequence, self.peer_params.issue_cids_limit())?;
                // The endpoint is told about the retirement and whether more
                // CIDs may be issued.
                self.endpoint_events
                    .push_back(EndpointEventInner::RetireConnectionId(
                        now,
                        sequence,
                        allow_more_cids,
                    ));
            }
            Frame::NewConnectionId(frame) => {
                trace!(
                    sequence = frame.sequence,
                    id = %frame.id,
                    retire_prior_to = frame.retire_prior_to,
                );
                if self.rem_cids.active().is_empty() {
                    return Err(TransportError::PROTOCOL_VIOLATION(
                        "NEW_CONNECTION_ID when CIDs aren't in use",
                    ));
                }
                if frame.retire_prior_to > frame.sequence {
                    return Err(TransportError::PROTOCOL_VIOLATION(
                        "NEW_CONNECTION_ID retiring unissued CIDs",
                    ));
                }

                use crate::cid_queue::InsertError;
                match self.rem_cids.insert(frame) {
                    Ok(None) => {}
                    Ok(Some((retired, reset_token))) => {
                        let pending_retired =
                            &mut self.spaces[SpaceId::Data].pending.retire_cids;
                        // Cap on queued RETIRE_CONNECTION_ID sequence numbers;
                        // exceeding it closes the connection with an error.
                        const MAX_PENDING_RETIRED_CIDS: u64 = CidQueue::LEN as u64 * 10;
                        if (pending_retired.len() as u64)
                            .saturating_add(retired.end.saturating_sub(retired.start))
                            > MAX_PENDING_RETIRED_CIDS
                        {
                            return Err(TransportError::CONNECTION_ID_LIMIT_ERROR(
                                "queued too many retired CIDs",
                            ));
                        }
                        pending_retired.extend(retired);
                        self.set_reset_token(reset_token);
                    }
                    Err(InsertError::ExceedsLimit) => {
                        return Err(TransportError::CONNECTION_ID_LIMIT_ERROR(""));
                    }
                    Err(InsertError::Retired) => {
                        trace!("discarding already-retired");
                        // The sequence number is still queued for retirement;
                        // `continue` moves on to the next frame.
                        self.spaces[SpaceId::Data]
                            .pending
                            .retire_cids
                            .push(frame.sequence);
                        continue;
                    }
                };

                // A server still using the remote CID with sequence 0
                // switches to a newer one.
                if self.side.is_server() && self.rem_cids.active_seq() == 0 {
                    self.update_rem_cid();
                }
            }
            Frame::NewToken(NewToken { token }) => {
                // Only the client side has a token store; on the server this
                // frame is a protocol violation.
                let ConnectionSide::Client {
                    token_store,
                    server_name,
                    ..
                } = &self.side
                else {
                    return Err(TransportError::PROTOCOL_VIOLATION("client sent NEW_TOKEN"));
                };
                if token.is_empty() {
                    return Err(TransportError::FRAME_ENCODING_ERROR("empty token"));
                }
                trace!("got new token");
                token_store.insert(server_name, token);
            }
            Frame::Datagram(datagram) => {
                // An event is emitted only when `received` returns `true`.
                if self
                    .datagrams
                    .received(datagram, &self.config.datagram_receive_buffer_size)?
                {
                    self.events.push_back(Event::DatagramReceived);
                }
            }
            Frame::AckFrequency(ack_frequency) => {
                let space = &mut self.spaces[SpaceId::Data];

                // A `false` result means there is nothing further to do for
                // this frame.
                if !self
                    .ack_frequency
                    .ack_frequency_received(&ack_frequency, &mut space.pending_acks)?
                {
                    continue;
                }

                // Re-arm the max-ack-delay timer using the updated delay.
                if let Some(timeout) = space
                    .pending_acks
                    .max_ack_delay_timeout(self.ack_frequency.max_ack_delay)
                {
                    self.timers.set(Timer::MaxAckDelay, timeout);
                }
            }
            Frame::ImmediateAck => {
                self.spaces[SpaceId::Data]
                    .pending_acks
                    .set_immediate_ack_required();
            }
            Frame::HandshakeDone => {
                if self.side.is_server() {
                    return Err(TransportError::PROTOCOL_VIOLATION(
                        "client sent HANDSHAKE_DONE",
                    ));
                }
                // Drop the Handshake space if it still has crypto state.
                if self.spaces[SpaceId::Handshake].crypto.is_some() {
                    self.discard_space(now, SpaceId::Handshake);
                }
            }
            Frame::AddAddress(add_address) => {
                self.handle_add_address(&add_address, now)?;
            }
            Frame::PunchMeNow(punch_me_now) => {
                self.handle_punch_me_now(&punch_me_now, now)?;
            }
            Frame::RemoveAddress(remove_address) => {
                self.handle_remove_address(&remove_address)?;
            }
            Frame::ObservedAddress(observed_address) => {
                self.handle_observed_address_frame(&observed_address, now)?;
            }
        }
    }

    // Register the packet with the ACK state; a `true` result arms the
    // max-ack-delay timer.
    let space = &mut self.spaces[SpaceId::Data];
    if space
        .pending_acks
        .packet_received(now, number, ack_eliciting, &space.dedup)
    {
        self.timers
            .set(Timer::MaxAckDelay, now + self.ack_frequency.max_ack_delay);
    }

    let pending = &mut self.spaces[SpaceId::Data].pending;
    self.streams.queue_max_stream_id(pending);

    // Apply a close frame seen in the loop: record the reason and start
    // draining.
    if let Some(reason) = close {
        self.error = Some(reason.into());
        self.state = State::Draining;
        self.close = true;
    }

    // A non-probing packet from a different remote whose number equals the
    // space's `rx_packet` initiates a migration.
    // NOTE(review): `rx_packet` is presumably the highest packet number
    // received so far; confirm against its definition.
    if remote != self.path.remote
        && !is_probing_packet
        && number == self.spaces[SpaceId::Data].rx_packet
    {
        let ConnectionSide::Server { ref server_config } = self.side else {
            return Err(TransportError::PROTOCOL_VIOLATION(
                "packets from unknown remote should be dropped by clients",
            ));
        };
        debug_assert!(
            server_config.migration,
            "migration-initiating packets should have been dropped immediately"
        );
        self.migrate(now, remote);
        // Switch to a different remote CID for the new path and reset the
        // spin flag.
        self.update_rem_cid();
        self.spin = false;
    }

    Ok(())
}
3630
3631 fn migrate(&mut self, now: Instant, remote: SocketAddr) {
3632 trace!(%remote, "migration initiated");
3633 let mut new_path = if remote.is_ipv4() && remote.ip() == self.path.remote.ip() {
3637 PathData::from_previous(remote, &self.path, now)
3638 } else {
3639 let peer_max_udp_payload_size =
3640 u16::try_from(self.peer_params.max_udp_payload_size.into_inner())
3641 .unwrap_or(u16::MAX);
3642 PathData::new(
3643 remote,
3644 self.allow_mtud,
3645 Some(peer_max_udp_payload_size),
3646 now,
3647 &self.config,
3648 )
3649 };
3650 new_path.challenge = Some(self.rng.r#gen());
3651 new_path.challenge_pending = true;
3652 let prev_pto = self.pto(SpaceId::Data);
3653
3654 let mut prev = mem::replace(&mut self.path, new_path);
3655 if prev.challenge.is_none() {
3657 prev.challenge = Some(self.rng.r#gen());
3658 prev.challenge_pending = true;
3659 self.prev_path = Some((self.rem_cids.active(), prev));
3662 }
3663
3664 self.timers.set(
3665 Timer::PathValidation,
3666 now + 3 * cmp::max(self.pto(SpaceId::Data), prev_pto),
3667 );
3668 }
3669
/// Reacts to a change of the local address.
///
/// Switches to the next remote connection ID if one is available (see
/// `update_rem_cid`) and then calls `ping`.
pub fn local_address_changed(&mut self) {
    self.update_rem_cid();
    self.ping();
}
3675
3676 pub fn migrate_to_nat_traversal_path(&mut self, now: Instant) -> Result<(), TransportError> {
3678 let (remote_addr, local_addr) = {
3680 let nat_state = self
3681 .nat_traversal
3682 .as_ref()
3683 .ok_or_else(|| TransportError::PROTOCOL_VIOLATION("NAT traversal not enabled"))?;
3684
3685 let best_pairs = nat_state.get_best_succeeded_pairs();
3687 if best_pairs.is_empty() {
3688 return Err(TransportError::PROTOCOL_VIOLATION(
3689 "No validated NAT traversal paths",
3690 ));
3691 }
3692
3693 let best_path = best_pairs
3695 .iter()
3696 .find(|pair| pair.remote_addr != self.path.remote)
3697 .or_else(|| best_pairs.first());
3698
3699 let best_path = best_path.ok_or_else(|| {
3700 TransportError::PROTOCOL_VIOLATION("No suitable NAT traversal path")
3701 })?;
3702
3703 debug!(
3704 "Migrating to NAT traversal path: {} -> {} (priority: {})",
3705 self.path.remote, best_path.remote_addr, best_path.priority
3706 );
3707
3708 (best_path.remote_addr, best_path.local_addr)
3709 };
3710
3711 self.migrate(now, remote_addr);
3713
3714 if local_addr != SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED), 0) {
3716 self.local_ip = Some(local_addr.ip());
3717 }
3718
3719 self.path.challenge_pending = true;
3721
3722 Ok(())
3723 }
3724
3725 fn update_rem_cid(&mut self) {
3727 let (reset_token, retired) = match self.rem_cids.next() {
3728 Some(x) => x,
3729 None => return,
3730 };
3731
3732 self.spaces[SpaceId::Data]
3734 .pending
3735 .retire_cids
3736 .extend(retired);
3737 self.set_reset_token(reset_token);
3738 }
3739
3740 fn set_reset_token(&mut self, reset_token: ResetToken) {
3741 self.endpoint_events
3742 .push_back(EndpointEventInner::ResetToken(
3743 self.path.remote,
3744 reset_token,
3745 ));
3746 self.peer_params.stateless_reset_token = Some(reset_token);
3747 }
3748
3749 fn issue_first_cids(&mut self, now: Instant) {
3751 if self.local_cid_state.cid_len() == 0 {
3752 return;
3753 }
3754
3755 let mut n = self.peer_params.issue_cids_limit() - 1;
3757 if let ConnectionSide::Server { server_config } = &self.side {
3758 if server_config.has_preferred_address() {
3759 n -= 1;
3761 }
3762 }
3763 self.endpoint_events
3764 .push_back(EndpointEventInner::NeedIdentifiers(now, n));
3765 }
3766
3767 fn populate_packet(
3768 &mut self,
3769 now: Instant,
3770 space_id: SpaceId,
3771 buf: &mut Vec<u8>,
3772 max_size: usize,
3773 pn: u64,
3774 ) -> SentFrames {
3775 let mut sent = SentFrames::default();
3776 let space = &mut self.spaces[space_id];
3777 let is_0rtt = space_id == SpaceId::Data && space.crypto.is_none();
3778 space.pending_acks.maybe_ack_non_eliciting();
3779
3780 if !is_0rtt && mem::replace(&mut space.pending.handshake_done, false) {
3782 buf.write(frame::FrameType::HANDSHAKE_DONE);
3783 sent.retransmits.get_or_create().handshake_done = true;
3784 self.stats.frame_tx.handshake_done =
3786 self.stats.frame_tx.handshake_done.saturating_add(1);
3787 }
3788
3789 if mem::replace(&mut space.ping_pending, false) {
3791 trace!("PING");
3792 buf.write(frame::FrameType::PING);
3793 sent.non_retransmits = true;
3794 self.stats.frame_tx.ping += 1;
3795 }
3796
3797 if mem::replace(&mut space.immediate_ack_pending, false) {
3799 trace!("IMMEDIATE_ACK");
3800 buf.write(frame::FrameType::IMMEDIATE_ACK);
3801 sent.non_retransmits = true;
3802 self.stats.frame_tx.immediate_ack += 1;
3803 }
3804
3805 if space.pending_acks.can_send() {
3807 Self::populate_acks(
3808 now,
3809 self.receiving_ecn,
3810 &mut sent,
3811 space,
3812 buf,
3813 &mut self.stats,
3814 );
3815 }
3816
3817 if mem::replace(&mut space.pending.ack_frequency, false) {
3819 let sequence_number = self.ack_frequency.next_sequence_number();
3820
3821 let config = self.config.ack_frequency_config.as_ref().unwrap();
3823
3824 let max_ack_delay = self.ack_frequency.candidate_max_ack_delay(
3826 self.path.rtt.get(),
3827 config,
3828 &self.peer_params,
3829 );
3830
3831 trace!(?max_ack_delay, "ACK_FREQUENCY");
3832
3833 frame::AckFrequency {
3834 sequence: sequence_number,
3835 ack_eliciting_threshold: config.ack_eliciting_threshold,
3836 request_max_ack_delay: max_ack_delay.as_micros().try_into().unwrap_or(VarInt::MAX),
3837 reordering_threshold: config.reordering_threshold,
3838 }
3839 .encode(buf);
3840
3841 sent.retransmits.get_or_create().ack_frequency = true;
3842
3843 self.ack_frequency.ack_frequency_sent(pn, max_ack_delay);
3844 self.stats.frame_tx.ack_frequency += 1;
3845 }
3846
3847 if buf.len() + 9 < max_size && space_id == SpaceId::Data {
3849 if let Some(token) = self.path.challenge {
3851 self.path.challenge_pending = false;
3853 sent.non_retransmits = true;
3854 sent.requires_padding = true;
3855 trace!("PATH_CHALLENGE {:08x}", token);
3856 buf.write(frame::FrameType::PATH_CHALLENGE);
3857 buf.write(token);
3858 self.stats.frame_tx.path_challenge += 1;
3859 }
3860
3861 }
3870
3871 if buf.len() + 9 < max_size && space_id == SpaceId::Data {
3873 if let Some(token) = self.path_responses.pop_on_path(self.path.remote) {
3874 sent.non_retransmits = true;
3875 sent.requires_padding = true;
3876 trace!("PATH_RESPONSE {:08x}", token);
3877 buf.write(frame::FrameType::PATH_RESPONSE);
3878 buf.write(token);
3879 self.stats.frame_tx.path_response += 1;
3880 }
3881 }
3882
3883 while buf.len() + frame::Crypto::SIZE_BOUND < max_size && !is_0rtt {
3885 let mut frame = match space.pending.crypto.pop_front() {
3886 Some(x) => x,
3887 None => break,
3888 };
3889
3890 let max_crypto_data_size = max_size
3895 - buf.len()
3896 - 1 - VarInt::size(unsafe { VarInt::from_u64_unchecked(frame.offset) })
3898 - 2; let available_space = max_size - buf.len();
3902 let remaining_data = frame.data.len();
3903 #[cfg(feature = "pqc")]
3904 let optimal_size = self
3905 .pqc_state
3906 .calculate_crypto_frame_size(available_space, remaining_data);
3907 #[cfg(not(feature = "pqc"))]
3908 let optimal_size = available_space.min(remaining_data);
3909
3910 let len = frame
3911 .data
3912 .len()
3913 .min(2usize.pow(14) - 1)
3914 .min(max_crypto_data_size)
3915 .min(optimal_size);
3916
3917 let data = frame.data.split_to(len);
3918 let truncated = frame::Crypto {
3919 offset: frame.offset,
3920 data,
3921 };
3922 trace!(
3923 "CRYPTO: off {} len {}",
3924 truncated.offset,
3925 truncated.data.len()
3926 );
3927 truncated.encode(buf);
3928 self.stats.frame_tx.crypto += 1;
3929 sent.retransmits.get_or_create().crypto.push_back(truncated);
3930 if !frame.data.is_empty() {
3931 frame.offset += len as u64;
3932 space.pending.crypto.push_front(frame);
3933 }
3934 }
3935
3936 if space_id == SpaceId::Data {
3937 self.streams.write_control_frames(
3938 buf,
3939 &mut space.pending,
3940 &mut sent.retransmits,
3941 &mut self.stats.frame_tx,
3942 max_size,
3943 );
3944 }
3945
3946 while buf.len() + 44 < max_size {
3948 let issued = match space.pending.new_cids.pop() {
3949 Some(x) => x,
3950 None => break,
3951 };
3952 trace!(
3953 sequence = issued.sequence,
3954 id = %issued.id,
3955 "NEW_CONNECTION_ID"
3956 );
3957 frame::NewConnectionId {
3958 sequence: issued.sequence,
3959 retire_prior_to: self.local_cid_state.retire_prior_to(),
3960 id: issued.id,
3961 reset_token: issued.reset_token,
3962 }
3963 .encode(buf);
3964 sent.retransmits.get_or_create().new_cids.push(issued);
3965 self.stats.frame_tx.new_connection_id += 1;
3966 }
3967
3968 while buf.len() + frame::RETIRE_CONNECTION_ID_SIZE_BOUND < max_size {
3970 let seq = match space.pending.retire_cids.pop() {
3971 Some(x) => x,
3972 None => break,
3973 };
3974 trace!(sequence = seq, "RETIRE_CONNECTION_ID");
3975 buf.write(frame::FrameType::RETIRE_CONNECTION_ID);
3976 buf.write_var(seq);
3977 sent.retransmits.get_or_create().retire_cids.push(seq);
3978 self.stats.frame_tx.retire_connection_id += 1;
3979 }
3980
3981 let mut sent_datagrams = false;
3983 while buf.len() + Datagram::SIZE_BOUND < max_size && space_id == SpaceId::Data {
3984 match self.datagrams.write(buf, max_size) {
3985 true => {
3986 sent_datagrams = true;
3987 sent.non_retransmits = true;
3988 self.stats.frame_tx.datagram += 1;
3989 }
3990 false => break,
3991 }
3992 }
3993 if self.datagrams.send_blocked && sent_datagrams {
3994 self.events.push_back(Event::DatagramsUnblocked);
3995 self.datagrams.send_blocked = false;
3996 }
3997
3998 while let Some(remote_addr) = space.pending.new_tokens.pop() {
4000 debug_assert_eq!(space_id, SpaceId::Data);
4001 let ConnectionSide::Server { server_config } = &self.side else {
4002 debug_assert!(false, "NEW_TOKEN frames should not be enqueued by clients");
4004 continue;
4005 };
4006
4007 if remote_addr != self.path.remote {
4008 continue;
4013 }
4014
4015 if self.delay_new_token_until_binding && self.peer_id_for_tokens.is_none() {
4018 space.pending.new_tokens.push(remote_addr);
4020 break;
4021 }
4022
4023 let new_token = if let Some(pid) = self.peer_id_for_tokens {
4025 let nonce_u128: u128 = self.rng.r#gen();
4028 let nonce = nonce_u128.to_le_bytes();
4029 let cid = self.rem_cids.active();
4030 let mut pt = Vec::with_capacity(32 + 1 + cid.len() + 16);
4031 pt.extend_from_slice(&pid.0);
4032 pt.push(cid.len() as u8);
4033 pt.extend_from_slice(&cid[..]);
4034 pt.extend_from_slice(&nonce);
4035 let mut tok = pt;
4036 tok.extend_from_slice(&nonce[..12]);
4037 NewToken { token: tok.into() }
4038 } else {
4039 let token = Token::new(
4040 TokenPayload::Validation {
4041 ip: remote_addr.ip(),
4042 issued: server_config.time_source.now(),
4043 },
4044 &mut self.rng,
4045 );
4046 NewToken {
4047 token: token.encode(&*server_config.token_key).into(),
4048 }
4049 };
4050
4051 if buf.len() + new_token.size() >= max_size {
4052 space.pending.new_tokens.push(remote_addr);
4053 break;
4054 }
4055
4056 new_token.encode(buf);
4057 sent.retransmits
4058 .get_or_create()
4059 .new_tokens
4060 .push(remote_addr);
4061 self.stats.frame_tx.new_token += 1;
4062 }
4063
4064 while buf.len() + frame::AddAddress::SIZE_BOUND < max_size && space_id == SpaceId::Data {
4066 let add_address = match space.pending.add_addresses.pop() {
4067 Some(x) => x,
4068 None => break,
4069 };
4070 trace!(
4071 sequence = %add_address.sequence,
4072 address = %add_address.address,
4073 "ADD_ADDRESS"
4074 );
4075 if self.nat_traversal_frame_config.use_rfc_format {
4077 add_address.encode_rfc(buf);
4078 } else {
4079 add_address.encode_legacy(buf);
4080 }
4081 sent.retransmits
4082 .get_or_create()
4083 .add_addresses
4084 .push(add_address);
4085 self.stats.frame_tx.add_address += 1;
4086 }
4087
4088 while buf.len() + frame::PunchMeNow::SIZE_BOUND < max_size && space_id == SpaceId::Data {
4090 let punch_me_now = match space.pending.punch_me_now.pop() {
4091 Some(x) => x,
4092 None => break,
4093 };
4094 trace!(
4095 round = %punch_me_now.round,
4096 paired_with_sequence_number = %punch_me_now.paired_with_sequence_number,
4097 "PUNCH_ME_NOW"
4098 );
4099 if self.nat_traversal_frame_config.use_rfc_format {
4101 punch_me_now.encode_rfc(buf);
4102 } else {
4103 punch_me_now.encode_legacy(buf);
4104 }
4105 sent.retransmits
4106 .get_or_create()
4107 .punch_me_now
4108 .push(punch_me_now);
4109 self.stats.frame_tx.punch_me_now += 1;
4110 }
4111
4112 while buf.len() + frame::RemoveAddress::SIZE_BOUND < max_size && space_id == SpaceId::Data {
4114 let remove_address = match space.pending.remove_addresses.pop() {
4115 Some(x) => x,
4116 None => break,
4117 };
4118 trace!(
4119 sequence = %remove_address.sequence,
4120 "REMOVE_ADDRESS"
4121 );
4122 remove_address.encode(buf);
4124 sent.retransmits
4125 .get_or_create()
4126 .remove_addresses
4127 .push(remove_address);
4128 self.stats.frame_tx.remove_address += 1;
4129 }
4130
4131 while buf.len() + frame::ObservedAddress::SIZE_BOUND < max_size && space_id == SpaceId::Data
4133 {
4134 let observed_address = match space.pending.observed_addresses.pop() {
4135 Some(x) => x,
4136 None => break,
4137 };
4138 trace!(
4139 address = %observed_address.address,
4140 "OBSERVED_ADDRESS"
4141 );
4142 observed_address.encode(buf);
4143 sent.retransmits
4144 .get_or_create()
4145 .observed_addresses
4146 .push(observed_address);
4147 self.stats.frame_tx.observed_address += 1;
4148 }
4149
4150 if space_id == SpaceId::Data {
4152 sent.stream_frames =
4153 self.streams
4154 .write_stream_frames(buf, max_size, self.config.send_fairness);
4155 self.stats.frame_tx.stream += sent.stream_frames.len() as u64;
4156 }
4157
4158 sent
4159 }
4160
4161 fn populate_acks(
4166 now: Instant,
4167 receiving_ecn: bool,
4168 sent: &mut SentFrames,
4169 space: &mut PacketSpace,
4170 buf: &mut Vec<u8>,
4171 stats: &mut ConnectionStats,
4172 ) {
4173 debug_assert!(!space.pending_acks.ranges().is_empty());
4174
4175 debug_assert!(space.crypto.is_some(), "tried to send ACK in 0-RTT");
4177 let ecn = if receiving_ecn {
4178 Some(&space.ecn_counters)
4179 } else {
4180 None
4181 };
4182 sent.largest_acked = space.pending_acks.ranges().max();
4183
4184 let delay_micros = space.pending_acks.ack_delay(now).as_micros() as u64;
4185
4186 let ack_delay_exp = TransportParameters::default().ack_delay_exponent;
4188 let delay = delay_micros >> ack_delay_exp.into_inner();
4189
4190 trace!(
4191 "ACK {:?}, Delay = {}us",
4192 space.pending_acks.ranges(),
4193 delay_micros
4194 );
4195
4196 frame::Ack::encode(delay as _, space.pending_acks.ranges(), ecn, buf);
4197 stats.frame_tx.acks += 1;
4198 }
4199
4200 fn close_common(&mut self) {
4201 trace!("connection closed");
4202 for &timer in &Timer::VALUES {
4203 self.timers.stop(timer);
4204 }
4205 }
4206
4207 fn set_close_timer(&mut self, now: Instant) {
4208 self.timers
4209 .set(Timer::Close, now + 3 * self.pto(self.highest_space));
4210 }
4211
4212 fn handle_peer_params(&mut self, params: TransportParameters) -> Result<(), TransportError> {
4214 if Some(self.orig_rem_cid) != params.initial_src_cid
4215 || (self.side.is_client()
4216 && (Some(self.initial_dst_cid) != params.original_dst_cid
4217 || self.retry_src_cid != params.retry_src_cid))
4218 {
4219 return Err(TransportError::TRANSPORT_PARAMETER_ERROR(
4220 "CID authentication failure",
4221 ));
4222 }
4223
4224 self.set_peer_params(params);
4225
4226 Ok(())
4227 }
4228
4229 fn set_peer_params(&mut self, params: TransportParameters) {
4230 self.streams.set_params(¶ms);
4231 self.idle_timeout =
4232 negotiate_max_idle_timeout(self.config.max_idle_timeout, Some(params.max_idle_timeout));
4233 trace!("negotiated max idle timeout {:?}", self.idle_timeout);
4234 if let Some(ref info) = params.preferred_address {
4235 self.rem_cids.insert(frame::NewConnectionId {
4236 sequence: 1,
4237 id: info.connection_id,
4238 reset_token: info.stateless_reset_token,
4239 retire_prior_to: 0,
4240 }).expect("preferred address CID is the first received, and hence is guaranteed to be legal");
4241 }
4242 self.ack_frequency.peer_max_ack_delay = get_max_ack_delay(¶ms);
4243
4244 self.negotiate_nat_traversal_capability(¶ms);
4246
4247 let local_has_nat_traversal = self.config.nat_traversal_config.is_some();
4250 let local_supports_rfc = local_has_nat_traversal;
4253 self.nat_traversal_frame_config = frame::nat_traversal_unified::NatTraversalFrameConfig {
4254 use_rfc_format: local_supports_rfc && params.supports_rfc_nat_traversal(),
4256 accept_legacy: true,
4258 };
4259
4260 self.negotiate_address_discovery(¶ms);
4262
4263 #[cfg(feature = "pqc")]
4265 {
4266 self.pqc_state.update_from_peer_params(¶ms);
4267
4268 if self.pqc_state.enabled && self.pqc_state.using_pqc {
4270 trace!("PQC enabled, adjusting MTU discovery for larger handshake packets");
4271 let current_mtu = self.path.mtud.current_mtu();
4275 if current_mtu < self.pqc_state.handshake_mtu {
4276 trace!(
4277 "Current MTU {} is less than PQC handshake MTU {}, will rely on MTU discovery",
4278 current_mtu, self.pqc_state.handshake_mtu
4279 );
4280 }
4281 }
4282 }
4283
4284 self.peer_params = params;
4285 self.path.mtud.on_peer_max_udp_payload_size_received(
4286 u16::try_from(self.peer_params.max_udp_payload_size.into_inner()).unwrap_or(u16::MAX),
4287 );
4288 }
4289
4290 fn negotiate_nat_traversal_capability(&mut self, params: &TransportParameters) {
4292 let peer_nat_config = match ¶ms.nat_traversal {
4294 Some(config) => config,
4295 None => {
4296 if self.config.nat_traversal_config.is_some() {
4298 debug!(
4299 "Peer does not support NAT traversal, maintaining backward compatibility"
4300 );
4301 self.emit_nat_traversal_capability_event(false);
4302
4303 self.set_nat_traversal_compatibility_mode(false);
4305 }
4306 return;
4307 }
4308 };
4309
4310 let local_nat_config = match &self.config.nat_traversal_config {
4312 Some(config) => config,
4313 None => {
4314 debug!("NAT traversal not enabled locally, ignoring peer support");
4315 self.emit_nat_traversal_capability_event(false);
4316 self.set_nat_traversal_compatibility_mode(false);
4317 return;
4318 }
4319 };
4320
4321 info!("Both peers support NAT traversal, negotiating capabilities");
4323
4324 match self.negotiate_nat_traversal_parameters(local_nat_config, peer_nat_config) {
4326 Ok(negotiated_config) => {
4327 info!("NAT traversal capability negotiated successfully");
4328 self.emit_nat_traversal_capability_event(true);
4329
4330 self.init_nat_traversal_with_negotiated_config(&negotiated_config);
4332
4333 self.set_nat_traversal_compatibility_mode(true);
4335
4336 if matches!(
4338 negotiated_config,
4339 crate::transport_parameters::NatTraversalConfig::ClientSupport
4340 ) {
4341 self.initiate_nat_traversal_process();
4342 }
4343 }
4344 Err(e) => {
4345 warn!("NAT traversal capability negotiation failed: {}", e);
4346 self.emit_nat_traversal_capability_event(false);
4347 self.set_nat_traversal_compatibility_mode(false);
4348 }
4349 }
4350 }
4351
4352 fn emit_nat_traversal_capability_event(&mut self, negotiated: bool) {
4406 if negotiated {
4409 info!("NAT traversal capability successfully negotiated");
4410 } else {
4411 info!("NAT traversal capability not available (peer or local support missing)");
4412 }
4413
4414 }
4417
4418 fn set_nat_traversal_compatibility_mode(&mut self, enabled: bool) {
4420 if enabled {
4421 debug!("NAT traversal enabled for this connection");
4422 } else {
4424 debug!("NAT traversal disabled for this connection (backward compatibility mode)");
4425 if self.nat_traversal.is_some() {
4427 warn!("Clearing NAT traversal state due to compatibility mode");
4428 self.nat_traversal = None;
4429 }
4430 }
4431 }
4432
4433 fn negotiate_nat_traversal_parameters(
4435 &self,
4436 local_config: &crate::transport_parameters::NatTraversalConfig,
4437 peer_config: &crate::transport_parameters::NatTraversalConfig,
4438 ) -> Result<crate::transport_parameters::NatTraversalConfig, String> {
4439 match (local_config, peer_config) {
4444 (
4446 crate::transport_parameters::NatTraversalConfig::ClientSupport,
4447 crate::transport_parameters::NatTraversalConfig::ServerSupport {
4448 concurrency_limit,
4449 },
4450 ) => Ok(
4451 crate::transport_parameters::NatTraversalConfig::ServerSupport {
4452 concurrency_limit: *concurrency_limit,
4453 },
4454 ),
4455 (
4457 crate::transport_parameters::NatTraversalConfig::ServerSupport {
4458 concurrency_limit,
4459 },
4460 crate::transport_parameters::NatTraversalConfig::ClientSupport,
4461 ) => Ok(
4462 crate::transport_parameters::NatTraversalConfig::ServerSupport {
4463 concurrency_limit: *concurrency_limit,
4464 },
4465 ),
4466 (
4468 crate::transport_parameters::NatTraversalConfig::ServerSupport {
4469 concurrency_limit: limit1,
4470 },
4471 crate::transport_parameters::NatTraversalConfig::ServerSupport {
4472 concurrency_limit: limit2,
4473 },
4474 ) => Ok(
4475 crate::transport_parameters::NatTraversalConfig::ServerSupport {
4476 concurrency_limit: (*limit1).min(*limit2),
4477 },
4478 ),
4479 (
4481 crate::transport_parameters::NatTraversalConfig::ClientSupport,
4482 crate::transport_parameters::NatTraversalConfig::ClientSupport,
4483 ) => Err("Both endpoints claim to be NAT traversal clients".to_string()),
4484 }
4485 }
4486
4487 fn init_nat_traversal_with_negotiated_config(
4489 &mut self,
4490 config: &crate::transport_parameters::NatTraversalConfig,
4491 ) {
4492 let (role, _concurrency_limit) = match config {
4495 crate::transport_parameters::NatTraversalConfig::ClientSupport => {
4496 (NatTraversalRole::Client, 10) }
4499 crate::transport_parameters::NatTraversalConfig::ServerSupport {
4500 concurrency_limit,
4501 } => {
4502 (
4504 NatTraversalRole::Server { can_relay: false },
4505 concurrency_limit.into_inner() as u32,
4506 )
4507 }
4508 };
4509
4510 let max_candidates = 50; let coordination_timeout = Duration::from_secs(10); self.nat_traversal = Some(NatTraversalState::new(
4516 role,
4517 max_candidates,
4518 coordination_timeout,
4519 ));
4520
4521 trace!(
4522 "NAT traversal initialized with negotiated config: role={:?}",
4523 role
4524 );
4525
4526 match role {
4528 NatTraversalRole::Bootstrap => {
4529 self.prepare_address_observation();
4531 }
4532 NatTraversalRole::Client => {
4533 self.schedule_candidate_discovery();
4535 }
4536 NatTraversalRole::Server { .. } => {
4537 self.prepare_coordination_handling();
4539 }
4540 }
4541 }
4542
4543 fn initiate_nat_traversal_process(&mut self) {
4545 if let Some(nat_state) = &mut self.nat_traversal {
4546 match nat_state.start_candidate_discovery() {
4547 Ok(()) => {
4548 debug!("NAT traversal process initiated - candidate discovery started");
4549 self.timers.set(
4551 Timer::NatTraversal,
4552 Instant::now() + Duration::from_millis(100),
4553 );
4554 }
4555 Err(e) => {
4556 warn!("Failed to initiate NAT traversal process: {}", e);
4557 }
4558 }
4559 }
4560 }
4561
/// Preparation hook for the bootstrap role.
///
/// Currently only emits a debug log line; no state is changed.
fn prepare_address_observation(&mut self) {
    debug!("Preparing for address observation as bootstrap node");
}
4568
4569 fn schedule_candidate_discovery(&mut self) {
4571 debug!("Scheduling candidate discovery for client endpoint");
4572 self.timers.set(
4574 Timer::NatTraversal,
4575 Instant::now() + Duration::from_millis(50),
4576 );
4577 }
4578
/// Preparation hook for the server role.
///
/// Currently only emits a debug log line; no state is changed.
fn prepare_coordination_handling(&mut self) {
    debug!("Preparing to handle coordination requests as server endpoint");
}
4585
4586 fn handle_nat_traversal_timeout(&mut self, now: Instant) {
4588 let timeout_result = if let Some(nat_state) = &mut self.nat_traversal {
4590 nat_state.handle_timeout(now)
4591 } else {
4592 return;
4593 };
4594
4595 match timeout_result {
4597 Ok(actions) => {
4598 for action in actions {
4599 match action {
4600 nat_traversal::TimeoutAction::RetryDiscovery => {
4601 debug!("NAT traversal timeout: retrying candidate discovery");
4602 if let Some(nat_state) = &mut self.nat_traversal {
4603 if let Err(e) = nat_state.start_candidate_discovery() {
4604 warn!("Failed to retry candidate discovery: {}", e);
4605 }
4606 }
4607 }
4608 nat_traversal::TimeoutAction::RetryCoordination => {
4609 debug!("NAT traversal timeout: retrying coordination");
4610 self.timers
4612 .set(Timer::NatTraversal, now + Duration::from_secs(2));
4613 }
4614 nat_traversal::TimeoutAction::StartValidation => {
4615 debug!("NAT traversal timeout: starting path validation");
4616 self.start_nat_traversal_validation(now);
4617 }
4618 nat_traversal::TimeoutAction::Complete => {
4619 debug!("NAT traversal completed successfully");
4620 self.timers.stop(Timer::NatTraversal);
4622 }
4623 nat_traversal::TimeoutAction::Failed => {
4624 warn!("NAT traversal failed after timeout");
4625 self.handle_nat_traversal_failure();
4627 }
4628 }
4629 }
4630 }
4631 Err(e) => {
4632 warn!("NAT traversal timeout handling failed: {}", e);
4633 self.handle_nat_traversal_failure();
4634 }
4635 }
4636 }
4637
4638 fn start_nat_traversal_validation(&mut self, now: Instant) {
4640 if let Some(nat_state) = &mut self.nat_traversal {
4641 let pairs = nat_state.get_next_validation_pairs(3);
4643
4644 for pair in pairs {
4645 let challenge = self.rng.r#gen();
4647 self.path.challenge = Some(challenge);
4648 self.path.challenge_pending = true;
4649
4650 debug!(
4651 "Starting path validation for NAT traversal candidate: {}",
4652 pair.remote_addr
4653 );
4654 }
4655
4656 self.timers
4658 .set(Timer::PathValidation, now + Duration::from_secs(3));
4659 }
4660 }
4661
4662 fn handle_nat_traversal_failure(&mut self) {
4664 warn!("NAT traversal failed, considering fallback options");
4665
4666 self.nat_traversal = None;
4668 self.timers.stop(Timer::NatTraversal);
4669
4670 debug!("NAT traversal disabled for this connection due to failure");
4677 }
4678
4679 pub fn nat_traversal_supported(&self) -> bool {
4681 self.nat_traversal.is_some()
4682 && self.config.nat_traversal_config.is_some()
4683 && self.peer_params.nat_traversal.is_some()
4684 }
4685
/// Returns the NAT traversal configuration advertised in the peer's
/// transport parameters, or `None` if the peer did not advertise one.
pub fn nat_traversal_config(&self) -> Option<&crate::transport_parameters::NatTraversalConfig> {
    self.peer_params.nat_traversal.as_ref()
}
4690
4691 pub fn nat_traversal_ready(&self) -> bool {
4693 self.nat_traversal_supported() && matches!(self.state, State::Established)
4694 }
4695
4696 #[allow(dead_code)]
4701 pub(crate) fn nat_traversal_stats(&self) -> Option<nat_traversal::NatTraversalStats> {
4702 self.nat_traversal.as_ref().map(|state| state.stats.clone())
4703 }
4704
/// Test-only helper that forces NAT traversal on for the given `role`.
///
/// Installs a matching NAT traversal config both in the stored peer
/// parameters and in a fresh copy of the transport config, then creates NAT
/// traversal state with a candidate limit of 8 and a 10 second coordination
/// timeout. Server and bootstrap roles use a concurrency limit of 5.
#[cfg(test)]
#[allow(dead_code)]
pub(crate) fn force_enable_nat_traversal(&mut self, role: NatTraversalRole) {
    use crate::transport_parameters::NatTraversalConfig;

    let config = match role {
        NatTraversalRole::Client => NatTraversalConfig::ClientSupport,
        NatTraversalRole::Server { .. } | NatTraversalRole::Bootstrap => {
            NatTraversalConfig::ServerSupport {
                concurrency_limit: VarInt::from_u32(5),
            }
        }
    };

    // The transport config is shared behind an `Arc`, so patch a private copy.
    let mut patched_config = (*self.config).clone();
    patched_config.nat_traversal_config = Some(config.clone());
    self.config = Arc::new(patched_config);
    self.peer_params.nat_traversal = Some(config);

    let state = NatTraversalState::new(role, 8, Duration::from_secs(10));
    self.nat_traversal = Some(state);
}
4730
4731 fn derive_peer_id_from_connection(&self) -> [u8; 32] {
4734 let mut hasher = std::collections::hash_map::DefaultHasher::new();
4736 use std::hash::Hasher;
4737 hasher.write(&self.rem_handshake_cid);
4738 hasher.write(&self.handshake_cid);
4739 hasher.write(&self.path.remote.to_string().into_bytes());
4740 let hash = hasher.finish();
4741 let mut peer_id = [0u8; 32];
4742 peer_id[..8].copy_from_slice(&hash.to_be_bytes());
4743 let cid_bytes = self.rem_handshake_cid.as_ref();
4745 let copy_len = (cid_bytes.len()).min(24);
4746 peer_id[8..8 + copy_len].copy_from_slice(&cid_bytes[..copy_len]);
4747 peer_id
4748 }
4749
4750 fn handle_add_address(
4752 &mut self,
4753 add_address: &crate::frame::AddAddress,
4754 now: Instant,
4755 ) -> Result<(), TransportError> {
4756 let nat_state = self.nat_traversal.as_mut().ok_or_else(|| {
4757 TransportError::PROTOCOL_VIOLATION("AddAddress frame without NAT traversal negotiation")
4758 })?;
4759
4760 match nat_state.add_remote_candidate(
4761 add_address.sequence,
4762 add_address.address,
4763 add_address.priority,
4764 now,
4765 ) {
4766 Ok(()) => {
4767 trace!(
4768 "Added remote candidate: {} (seq={}, priority={})",
4769 add_address.address, add_address.sequence, add_address.priority
4770 );
4771
4772 self.trigger_candidate_validation(add_address.address, now)?;
4774 Ok(())
4775 }
4776 Err(NatTraversalError::TooManyCandidates) => Err(TransportError::PROTOCOL_VIOLATION(
4777 "too many NAT traversal candidates",
4778 )),
4779 Err(NatTraversalError::DuplicateAddress) => {
4780 Ok(())
4782 }
4783 Err(e) => {
4784 warn!("Failed to add remote candidate: {}", e);
4785 Ok(()) }
4787 }
4788 }
4789
4790 fn handle_punch_me_now(
4792 &mut self,
4793 punch_me_now: &crate::frame::PunchMeNow,
4794 now: Instant,
4795 ) -> Result<(), TransportError> {
4796 trace!(
4797 "Received PunchMeNow: round={}, target_seq={}, local_addr={}",
4798 punch_me_now.round, punch_me_now.paired_with_sequence_number, punch_me_now.address
4799 );
4800
4801 if let Some(nat_state) = &self.nat_traversal {
4803 if matches!(nat_state.role, NatTraversalRole::Bootstrap) {
4804 let from_peer_id = self.derive_peer_id_from_connection();
4806
4807 let punch_me_now_clone = punch_me_now.clone();
4809 drop(nat_state); match self
4812 .nat_traversal
4813 .as_mut()
4814 .unwrap()
4815 .handle_punch_me_now_frame(
4816 from_peer_id,
4817 self.path.remote,
4818 &punch_me_now_clone,
4819 now,
4820 ) {
4821 Ok(Some(coordination_frame)) => {
4822 trace!("Bootstrap node coordinating PUNCH_ME_NOW between peers");
4823
4824 if let Some(target_peer_id) = punch_me_now.target_peer_id {
4826 self.endpoint_events.push_back(
4827 crate::shared::EndpointEventInner::RelayPunchMeNow(
4828 target_peer_id,
4829 coordination_frame,
4830 ),
4831 );
4832 }
4833
4834 return Ok(());
4835 }
4836 Ok(None) => {
4837 trace!("Bootstrap coordination completed or no action needed");
4838 return Ok(());
4839 }
4840 Err(e) => {
4841 warn!("Bootstrap coordination failed: {}", e);
4842 return Ok(());
4843 }
4844 }
4845 }
4846 }
4847
4848 let nat_state = self.nat_traversal.as_mut().ok_or_else(|| {
4850 TransportError::PROTOCOL_VIOLATION("PunchMeNow frame without NAT traversal negotiation")
4851 })?;
4852
4853 if nat_state
4855 .handle_peer_punch_request(punch_me_now.round, now)
4856 .map_err(|_e| {
4857 TransportError::PROTOCOL_VIOLATION("Failed to handle peer punch request")
4858 })?
4859 {
4860 trace!("Coordination synchronized for round {}", punch_me_now.round);
4861
4862 let _local_addr = self
4865 .local_ip
4866 .map(|ip| SocketAddr::new(ip, 0))
4867 .unwrap_or_else(|| {
4868 SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED), 0)
4869 });
4870
4871 let target = nat_traversal::PunchTarget {
4872 remote_addr: punch_me_now.address,
4873 remote_sequence: punch_me_now.paired_with_sequence_number,
4874 challenge: self.rng.r#gen(),
4875 };
4876
4877 let _ = nat_state.start_coordination_round(vec![target], now);
4879 } else {
4880 debug!(
4881 "Failed to synchronize coordination for round {}",
4882 punch_me_now.round
4883 );
4884 }
4885
4886 Ok(())
4887 }
4888
4889 fn handle_remove_address(
4891 &mut self,
4892 remove_address: &crate::frame::RemoveAddress,
4893 ) -> Result<(), TransportError> {
4894 let nat_state = self.nat_traversal.as_mut().ok_or_else(|| {
4895 TransportError::PROTOCOL_VIOLATION(
4896 "RemoveAddress frame without NAT traversal negotiation",
4897 )
4898 })?;
4899
4900 if nat_state.remove_candidate(remove_address.sequence) {
4901 trace!(
4902 "Removed candidate with sequence {}",
4903 remove_address.sequence
4904 );
4905 } else {
4906 trace!(
4907 "Attempted to remove unknown candidate sequence {}",
4908 remove_address.sequence
4909 );
4910 }
4911
4912 Ok(())
4913 }
4914
4915 fn handle_observed_address_frame(
4917 &mut self,
4918 observed_address: &crate::frame::ObservedAddress,
4919 now: Instant,
4920 ) -> Result<(), TransportError> {
4921 let state = self.address_discovery_state.as_mut().ok_or_else(|| {
4923 TransportError::PROTOCOL_VIOLATION(
4924 "ObservedAddress frame without address discovery negotiation",
4925 )
4926 })?;
4927
4928 if !state.enabled {
4930 return Err(TransportError::PROTOCOL_VIOLATION(
4931 "ObservedAddress frame received when address discovery is disabled",
4932 ));
4933 }
4934
4935 #[cfg(feature = "trace")]
4937 {
4938 use crate::trace_observed_address_received;
4939 trace_observed_address_received!(
4941 &self.event_log,
4942 self.trace_context.trace_id(),
4943 observed_address.address,
4944 0u64 );
4946 }
4947
4948 let path_id = 0u64; if let Some(&last_seq) = state.last_received_sequence.get(&path_id) {
4956 if observed_address.sequence_number <= last_seq {
4957 trace!(
4958 "Ignoring OBSERVED_ADDRESS frame with stale sequence number {} (last was {})",
4959 observed_address.sequence_number, last_seq
4960 );
4961 return Ok(());
4962 }
4963 }
4964
4965 state
4967 .last_received_sequence
4968 .insert(path_id, observed_address.sequence_number);
4969
4970 state.handle_observed_address(observed_address.address, path_id, now);
4972
4973 self.path
4975 .update_observed_address(observed_address.address, now);
4976
4977 trace!(
4979 "Received ObservedAddress frame: address={} for path={}",
4980 observed_address.address, path_id
4981 );
4982
4983 Ok(())
4984 }
4985
4986 pub fn queue_add_address(&mut self, sequence: VarInt, address: SocketAddr, priority: VarInt) {
4988 let add_address = frame::AddAddress {
4990 sequence,
4991 address,
4992 priority,
4993 };
4994
4995 self.spaces[SpaceId::Data]
4996 .pending
4997 .add_addresses
4998 .push(add_address);
4999 trace!(
5000 "Queued AddAddress frame: seq={}, addr={}, priority={}",
5001 sequence, address, priority
5002 );
5003 }
5004
5005 pub fn queue_punch_me_now(
5007 &mut self,
5008 round: VarInt,
5009 paired_with_sequence_number: VarInt,
5010 address: SocketAddr,
5011 ) {
5012 let punch_me_now = frame::PunchMeNow {
5013 round,
5014 paired_with_sequence_number,
5015 address,
5016 target_peer_id: None, };
5018
5019 self.spaces[SpaceId::Data]
5020 .pending
5021 .punch_me_now
5022 .push(punch_me_now);
5023 trace!(
5024 "Queued PunchMeNow frame: round={}, target={}",
5025 round, paired_with_sequence_number
5026 );
5027 }
5028
5029 pub fn queue_remove_address(&mut self, sequence: VarInt) {
5031 let remove_address = frame::RemoveAddress { sequence };
5032
5033 self.spaces[SpaceId::Data]
5034 .pending
5035 .remove_addresses
5036 .push(remove_address);
5037 trace!("Queued RemoveAddress frame: seq={}", sequence);
5038 }
5039
5040 pub fn queue_observed_address(&mut self, address: SocketAddr) {
5042 let sequence_number = if let Some(state) = &mut self.address_discovery_state {
5044 let seq = state.next_sequence_number;
5045 state.next_sequence_number =
5046 VarInt::from_u64(state.next_sequence_number.into_inner() + 1)
5047 .expect("sequence number overflow");
5048 seq
5049 } else {
5050 VarInt::from_u32(0)
5052 };
5053
5054 let observed_address = frame::ObservedAddress {
5055 sequence_number,
5056 address,
5057 };
5058 self.spaces[SpaceId::Data]
5059 .pending
5060 .observed_addresses
5061 .push(observed_address);
5062 trace!("Queued ObservedAddress frame: addr={}", address);
5063 }
5064
5065 pub fn check_for_address_observations(&mut self, now: Instant) {
5067 let Some(state) = &mut self.address_discovery_state else {
5069 return;
5070 };
5071
5072 if !state.enabled {
5074 return;
5075 }
5076
5077 let path_id = 0u64; let remote_address = self.path.remote;
5082
5083 if state.should_send_observation(path_id, now) {
5085 if let Some(frame) = state.queue_observed_address_frame(path_id, remote_address) {
5087 self.spaces[SpaceId::Data]
5089 .pending
5090 .observed_addresses
5091 .push(frame);
5092
5093 state.record_observation_sent(path_id);
5095
5096 #[cfg(feature = "trace")]
5098 {
5099 use crate::trace_observed_address_sent;
5100 trace_observed_address_sent!(
5102 &self.event_log,
5103 self.trace_context.trace_id(),
5104 remote_address,
5105 path_id
5106 );
5107 }
5108
5109 trace!(
5110 "Queued OBSERVED_ADDRESS frame for path {} with address {}",
5111 path_id, remote_address
5112 );
5113 }
5114 }
5115 }
5116
5117 fn trigger_candidate_validation(
5119 &mut self,
5120 candidate_address: SocketAddr,
5121 now: Instant,
5122 ) -> Result<(), TransportError> {
5123 let nat_state = self
5124 .nat_traversal
5125 .as_mut()
5126 .ok_or_else(|| TransportError::PROTOCOL_VIOLATION("NAT traversal not enabled"))?;
5127
5128 if nat_state
5130 .active_validations
5131 .contains_key(&candidate_address)
5132 {
5133 trace!("Validation already in progress for {}", candidate_address);
5134 return Ok(());
5135 }
5136
5137 let challenge = self.rng.r#gen::<u64>();
5139
5140 let validation_state = nat_traversal::PathValidationState {
5142 challenge,
5143 sent_at: now,
5144 retry_count: 0,
5145 max_retries: 3,
5146 coordination_round: None,
5147 timeout_state: nat_traversal::AdaptiveTimeoutState::new(),
5148 last_retry_at: None,
5149 };
5150
5151 nat_state
5153 .active_validations
5154 .insert(candidate_address, validation_state);
5155
5156 self.nat_traversal_challenges
5158 .push(candidate_address, challenge);
5159
5160 nat_state.stats.validations_succeeded += 1; trace!(
5164 "Triggered PATH_CHALLENGE validation for {} with challenge {:016x}",
5165 candidate_address, challenge
5166 );
5167
5168 Ok(())
5169 }
5170
5171 pub fn nat_traversal_state(&self) -> Option<(NatTraversalRole, usize, usize)> {
5173 self.nat_traversal.as_ref().map(|state| {
5174 (
5175 state.role,
5176 state.local_candidates.len(),
5177 state.remote_candidates.len(),
5178 )
5179 })
5180 }
5181
5182 pub fn initiate_nat_traversal_coordination(
5184 &mut self,
5185 now: Instant,
5186 ) -> Result<(), TransportError> {
5187 let nat_state = self
5188 .nat_traversal
5189 .as_mut()
5190 .ok_or_else(|| TransportError::PROTOCOL_VIOLATION("NAT traversal not enabled"))?;
5191
5192 if nat_state.should_send_punch_request() {
5194 nat_state.generate_candidate_pairs(now);
5196
5197 let pairs = nat_state.get_next_validation_pairs(3);
5199 if pairs.is_empty() {
5200 return Err(TransportError::PROTOCOL_VIOLATION(
5201 "No candidate pairs for coordination",
5202 ));
5203 }
5204
5205 let targets: Vec<_> = pairs
5207 .into_iter()
5208 .map(|pair| nat_traversal::PunchTarget {
5209 remote_addr: pair.remote_addr,
5210 remote_sequence: pair.remote_sequence,
5211 challenge: self.rng.r#gen(),
5212 })
5213 .collect();
5214
5215 let round = nat_state
5217 .start_coordination_round(targets, now)
5218 .map_err(|_e| {
5219 TransportError::PROTOCOL_VIOLATION("Failed to start coordination round")
5220 })?;
5221
5222 let local_addr = self
5225 .local_ip
5226 .map(|ip| SocketAddr::new(ip, self.local_ip.map(|_| 0).unwrap_or(0)))
5227 .unwrap_or_else(|| {
5228 SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED), 0)
5229 });
5230
5231 let punch_me_now = frame::PunchMeNow {
5232 round,
5233 paired_with_sequence_number: VarInt::from_u32(0), address: local_addr,
5235 target_peer_id: None, };
5237
5238 self.spaces[SpaceId::Data]
5239 .pending
5240 .punch_me_now
5241 .push(punch_me_now);
5242 nat_state.mark_punch_request_sent();
5243
5244 trace!("Initiated NAT traversal coordination round {}", round);
5245 }
5246
5247 Ok(())
5248 }
5249
/// Public entry point that delegates to `generate_nat_traversal_challenges`
/// with the given timestamp.
pub fn validate_nat_candidates(&mut self, now: Instant) {
    self.generate_nat_traversal_challenges(now);
}
5254
5255 pub fn send_nat_address_advertisement(
5270 &mut self,
5271 address: SocketAddr,
5272 priority: u32,
5273 ) -> Result<u64, ConnectionError> {
5274 let nat_state = self.nat_traversal.as_mut().ok_or_else(|| {
5276 ConnectionError::TransportError(TransportError::PROTOCOL_VIOLATION(
5277 "NAT traversal not enabled on this connection",
5278 ))
5279 })?;
5280
5281 let sequence = nat_state.next_sequence;
5283 nat_state.next_sequence =
5284 VarInt::from_u64(nat_state.next_sequence.into_inner() + 1).unwrap();
5285
5286 let now = Instant::now();
5288 nat_state.local_candidates.insert(
5289 sequence,
5290 nat_traversal::AddressCandidate {
5291 address,
5292 priority,
5293 source: nat_traversal::CandidateSource::Local,
5294 discovered_at: now,
5295 state: nat_traversal::CandidateState::New,
5296 attempt_count: 0,
5297 last_attempt: None,
5298 },
5299 );
5300
5301 nat_state.stats.local_candidates_sent += 1;
5303
5304 self.queue_add_address(sequence, address, VarInt::from_u32(priority));
5306
5307 debug!(
5308 "Queued ADD_ADDRESS frame: addr={}, priority={}, seq={}",
5309 address, priority, sequence
5310 );
5311 Ok(sequence.into_inner())
5312 }
5313
5314 pub fn send_nat_punch_coordination(
5327 &mut self,
5328 paired_with_sequence_number: u64,
5329 address: SocketAddr,
5330 round: u32,
5331 ) -> Result<(), ConnectionError> {
5332 let _nat_state = self.nat_traversal.as_ref().ok_or_else(|| {
5334 ConnectionError::TransportError(TransportError::PROTOCOL_VIOLATION(
5335 "NAT traversal not enabled on this connection",
5336 ))
5337 })?;
5338
5339 self.queue_punch_me_now(
5341 VarInt::from_u32(round),
5342 VarInt::from_u64(paired_with_sequence_number).map_err(|_| {
5343 ConnectionError::TransportError(TransportError::PROTOCOL_VIOLATION(
5344 "Invalid target sequence number",
5345 ))
5346 })?,
5347 address,
5348 );
5349
5350 debug!(
5351 "Queued PUNCH_ME_NOW frame: paired_with_seq={}, addr={}, round={}",
5352 paired_with_sequence_number, address, round
5353 );
5354 Ok(())
5355 }
5356
5357 pub fn send_nat_address_removal(&mut self, sequence: u64) -> Result<(), ConnectionError> {
5368 let nat_state = self.nat_traversal.as_mut().ok_or_else(|| {
5370 ConnectionError::TransportError(TransportError::PROTOCOL_VIOLATION(
5371 "NAT traversal not enabled on this connection",
5372 ))
5373 })?;
5374
5375 let sequence_varint = VarInt::from_u64(sequence).map_err(|_| {
5376 ConnectionError::TransportError(TransportError::PROTOCOL_VIOLATION(
5377 "Invalid sequence number",
5378 ))
5379 })?;
5380
5381 nat_state.local_candidates.remove(&sequence_varint);
5383
5384 self.queue_remove_address(sequence_varint);
5386
5387 debug!("Queued REMOVE_ADDRESS frame: seq={}", sequence);
5388 Ok(())
5389 }
5390
5391 #[allow(dead_code)]
5400 pub(crate) fn get_nat_traversal_stats(&self) -> Option<&nat_traversal::NatTraversalStats> {
5401 self.nat_traversal.as_ref().map(|state| &state.stats)
5402 }
5403
5404 pub fn is_nat_traversal_enabled(&self) -> bool {
5406 self.nat_traversal.is_some()
5407 }
5408
5409 pub fn get_nat_traversal_role(&self) -> Option<NatTraversalRole> {
5411 self.nat_traversal.as_ref().map(|state| state.role)
5412 }
5413
5414 fn negotiate_address_discovery(&mut self, peer_params: &TransportParameters) {
5416 let now = Instant::now();
5417
5418 match &peer_params.address_discovery {
5420 Some(peer_config) => {
5421 if let Some(state) = &mut self.address_discovery_state {
5423 if state.enabled {
5424 debug!(
5427 "Address discovery negotiated: rate={}, all_paths={}",
5428 state.max_observation_rate, state.observe_all_paths
5429 );
5430 } else {
5431 debug!("Address discovery disabled locally, ignoring peer support");
5433 }
5434 } else {
5435 self.address_discovery_state =
5437 Some(AddressDiscoveryState::new(peer_config, now));
5438 debug!("Address discovery initialized from peer config");
5439 }
5440 }
5441 _ => {
5442 if let Some(state) = &mut self.address_discovery_state {
5444 state.enabled = false;
5445 debug!("Address discovery disabled - peer doesn't support it");
5446 }
5447 }
5448 }
5449
5450 if let Some(state) = &self.address_discovery_state {
5452 if state.enabled {
5453 self.path.set_observation_rate(state.max_observation_rate);
5454 }
5455 }
5456 }
5457
5458 fn decrypt_packet(
5459 &mut self,
5460 now: Instant,
5461 packet: &mut Packet,
5462 ) -> Result<Option<u64>, Option<TransportError>> {
5463 let result = packet_crypto::decrypt_packet_body(
5464 packet,
5465 &self.spaces,
5466 self.zero_rtt_crypto.as_ref(),
5467 self.key_phase,
5468 self.prev_crypto.as_ref(),
5469 self.next_crypto.as_ref(),
5470 )?;
5471
5472 let result = match result {
5473 Some(r) => r,
5474 None => return Ok(None),
5475 };
5476
5477 if result.outgoing_key_update_acked {
5478 if let Some(prev) = self.prev_crypto.as_mut() {
5479 prev.end_packet = Some((result.number, now));
5480 self.set_key_discard_timer(now, packet.header.space());
5481 }
5482 }
5483
5484 if result.incoming_key_update {
5485 trace!("key update authenticated");
5486 self.update_keys(Some((result.number, now)), true);
5487 self.set_key_discard_timer(now, packet.header.space());
5488 }
5489
5490 Ok(Some(result.number))
5491 }
5492
    /// Rotate the 1-RTT packet keys by one generation.
    ///
    /// Freshly derived keys become `next_crypto`, the former `next_crypto` becomes the
    /// Data-space packet keys, and the former Data-space packet keys are kept in
    /// `prev_crypto`. `end_packet` is stored alongside the retired keys, and `remote` is
    /// recorded as `update_unacked`.
    ///
    /// # Panics
    ///
    /// Panics if no further 1-RTT keys are available, or if the Data-space crypto state or
    /// `next_crypto` is absent.
    fn update_keys(&mut self, end_packet: Option<(u64, Instant)>, remote: bool) {
        trace!("executing key update");
        let new = self
            .crypto
            .next_1rtt_keys()
            .expect("only called for `Data` packets");
        // Leave KEY_UPDATE_MARGIN of headroom below the confidentiality limit of the new keys.
        self.key_phase_size = new
            .local
            .confidentiality_limit()
            .saturating_sub(KEY_UPDATE_MARGIN);
        // The inner replace installs `new` as `next_crypto` and yields the old `next_crypto`;
        // the outer replace installs that as the current packet keys and yields the keys
        // that were current until now.
        let old = mem::replace(
            &mut self.spaces[SpaceId::Data]
                .crypto
                .as_mut()
                .unwrap()
                .packet,
            mem::replace(self.next_crypto.as_mut().unwrap(), new),
        );
        // Nothing has been sent with the newly current keys yet.
        self.spaces[SpaceId::Data].sent_with_keys = 0;
        self.prev_crypto = Some(PrevCrypto {
            crypto: old,
            end_packet,
            update_unacked: remote,
        });
        self.key_phase = !self.key_phase;
    }
5522
5523 fn peer_supports_ack_frequency(&self) -> bool {
5524 self.peer_params.min_ack_delay.is_some()
5525 }
5526
5527 pub(crate) fn immediate_ack(&mut self) {
5532 self.spaces[self.highest_space].immediate_ack_pending = true;
5533 }
5534
5535 #[cfg(test)]
5537 #[allow(dead_code)]
5538 pub(crate) fn decode_packet(&self, event: &ConnectionEvent) -> Option<Vec<u8>> {
5539 let (first_decode, remaining) = match &event.0 {
5540 ConnectionEventInner::Datagram(DatagramConnectionEvent {
5541 first_decode,
5542 remaining,
5543 ..
5544 }) => (first_decode, remaining),
5545 _ => return None,
5546 };
5547
5548 if remaining.is_some() {
5549 panic!("Packets should never be coalesced in tests");
5550 }
5551
5552 let decrypted_header = packet_crypto::unprotect_header(
5553 first_decode.clone(),
5554 &self.spaces,
5555 self.zero_rtt_crypto.as_ref(),
5556 self.peer_params.stateless_reset_token,
5557 )?;
5558
5559 let mut packet = decrypted_header.packet?;
5560 packet_crypto::decrypt_packet_body(
5561 &mut packet,
5562 &self.spaces,
5563 self.zero_rtt_crypto.as_ref(),
5564 self.key_phase,
5565 self.prev_crypto.as_ref(),
5566 self.next_crypto.as_ref(),
5567 )
5568 .ok()?;
5569
5570 Some(packet.payload.to_vec())
5571 }
5572
5573 #[cfg(test)]
5576 #[allow(dead_code)]
5577 pub(crate) fn bytes_in_flight(&self) -> u64 {
5578 self.path.in_flight.bytes
5579 }
5580
5581 #[cfg(test)]
5583 #[allow(dead_code)]
5584 pub(crate) fn congestion_window(&self) -> u64 {
5585 self.path
5586 .congestion
5587 .window()
5588 .saturating_sub(self.path.in_flight.bytes)
5589 }
5590
5591 #[cfg(test)]
5593 #[allow(dead_code)]
5594 pub(crate) fn is_idle(&self) -> bool {
5595 Timer::VALUES
5596 .iter()
5597 .filter(|&&t| !matches!(t, Timer::KeepAlive | Timer::PushNewCid | Timer::KeyDiscard))
5598 .filter_map(|&t| Some((t, self.timers.get(t)?)))
5599 .min_by_key(|&(_, time)| time)
5600 .is_none_or(|(timer, _)| timer == Timer::Idle)
5601 }
5602
5603 #[cfg(test)]
5605 #[allow(dead_code)]
5606 pub(crate) fn lost_packets(&self) -> u64 {
5607 self.lost_packets
5608 }
5609
5610 #[cfg(test)]
5612 #[allow(dead_code)]
5613 pub(crate) fn using_ecn(&self) -> bool {
5614 self.path.sending_ecn
5615 }
5616
5617 #[cfg(test)]
5619 #[allow(dead_code)]
5620 pub(crate) fn total_recvd(&self) -> u64 {
5621 self.path.total_recvd
5622 }
5623
5624 #[cfg(test)]
5625 #[allow(dead_code)]
5626 pub(crate) fn active_local_cid_seq(&self) -> (u64, u64) {
5627 self.local_cid_state.active_seq()
5628 }
5629
5630 #[cfg(test)]
5633 #[allow(dead_code)]
5634 pub(crate) fn rotate_local_cid(&mut self, v: u64, now: Instant) {
5635 let n = self.local_cid_state.assign_retire_seq(v);
5636 self.endpoint_events
5637 .push_back(EndpointEventInner::NeedIdentifiers(now, n));
5638 }
5639
5640 #[cfg(test)]
5642 #[allow(dead_code)]
5643 pub(crate) fn active_rem_cid_seq(&self) -> u64 {
5644 self.rem_cids.active_seq()
5645 }
5646
5647 #[cfg(test)]
5649 #[cfg(test)]
5650 #[allow(dead_code)]
5651 pub(crate) fn path_mtu(&self) -> u16 {
5652 self.path.current_mtu()
5653 }
5654
5655 fn can_send_1rtt(&self, max_size: usize) -> bool {
5659 self.streams.can_send_stream_data()
5660 || self.path.challenge_pending
5661 || self
5662 .prev_path
5663 .as_ref()
5664 .is_some_and(|(_, x)| x.challenge_pending)
5665 || !self.path_responses.is_empty()
5666 || !self.nat_traversal_challenges.is_empty()
5667 || self
5668 .datagrams
5669 .outgoing
5670 .front()
5671 .is_some_and(|x| x.size(true) <= max_size)
5672 }
5673
5674 fn remove_in_flight(&mut self, pn: u64, packet: &SentPacket) {
5676 for path in [&mut self.path]
5678 .into_iter()
5679 .chain(self.prev_path.as_mut().map(|(_, data)| data))
5680 {
5681 if path.remove_in_flight(pn, packet) {
5682 return;
5683 }
5684 }
5685 }
5686
    /// Terminate the connection immediately, recording `reason` as its error.
    ///
    /// After the shared close handling in `close_common`, the connection moves directly
    /// to `State::Drained` and the endpoint is notified with a `Drained` event.
    fn kill(&mut self, reason: ConnectionError) {
        self.close_common();
        self.error = Some(reason);
        self.state = State::Drained;
        self.endpoint_events.push_back(EndpointEventInner::Drained);
    }
5694
5695 fn generate_nat_traversal_challenges(&mut self, now: Instant) {
5697 let candidates: Vec<(VarInt, SocketAddr)> = if let Some(nat_state) = &self.nat_traversal {
5699 nat_state
5700 .get_validation_candidates()
5701 .into_iter()
5702 .take(3) .map(|(seq, candidate)| (seq, candidate.address))
5704 .collect()
5705 } else {
5706 return;
5707 };
5708
5709 if candidates.is_empty() {
5710 return;
5711 }
5712
5713 if let Some(nat_state) = &mut self.nat_traversal {
5715 for (seq, address) in candidates {
5716 let challenge: u64 = self.rng.r#gen();
5718
5719 if let Err(e) = nat_state.start_validation(seq, challenge, now) {
5721 debug!("Failed to start validation for candidate {}: {}", seq, e);
5722 continue;
5723 }
5724
5725 self.nat_traversal_challenges.push(address, challenge);
5727 trace!(
5728 "Queuing NAT validation PATH_CHALLENGE for {} with token {:08x}",
5729 address, challenge
5730 );
5731 }
5732 }
5733 }
5734
5735 pub fn current_mtu(&self) -> u16 {
5739 self.path.current_mtu()
5740 }
5741
5742 fn predict_1rtt_overhead(&self, pn: Option<u64>) -> usize {
5749 let pn_len = match pn {
5750 Some(pn) => PacketNumber::new(
5751 pn,
5752 self.spaces[SpaceId::Data].largest_acked_packet.unwrap_or(0),
5753 )
5754 .len(),
5755 None => 4,
5757 };
5758
5759 1 + self.rem_cids.active().len() + pn_len + self.tag_len_1rtt()
5761 }
5762
5763 fn tag_len_1rtt(&self) -> usize {
5764 let key = match self.spaces[SpaceId::Data].crypto.as_ref() {
5765 Some(crypto) => Some(&*crypto.packet.local),
5766 None => self.zero_rtt_crypto.as_ref().map(|x| &*x.packet),
5767 };
5768 key.map_or(16, |x| x.tag_len())
5772 }
5773
5774 fn on_path_validated(&mut self) {
5776 self.path.validated = true;
5777 let ConnectionSide::Server { server_config } = &self.side else {
5778 return;
5779 };
5780 let new_tokens = &mut self.spaces[SpaceId::Data as usize].pending.new_tokens;
5781 new_tokens.clear();
5782 for _ in 0..server_config.validation_token.sent {
5783 new_tokens.push(self.path.remote);
5784 }
5785 }
5786}
5787
5788impl fmt::Debug for Connection {
5789 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
5790 f.debug_struct("Connection")
5791 .field("handshake_cid", &self.handshake_cid)
5792 .finish()
5793 }
5794}
5795
/// Role-specific state of a connection.
enum ConnectionSide {
    /// State kept when this endpoint is the client.
    Client {
        /// Token taken from `token_store` for `server_name` when this value is built from
        /// `SideArgs`; empty when the store had none.
        token: Bytes,
        /// Store from which the token was taken.
        token_store: Arc<dyn TokenStore>,
        /// Server name used as the lookup key in `token_store`.
        server_name: String,
    },
    /// State kept when this endpoint is the server.
    Server {
        /// Server configuration; its `migration` flag decides whether the remote may migrate.
        server_config: Arc<ServerConfig>,
    },
}
5808
5809impl ConnectionSide {
5810 fn remote_may_migrate(&self) -> bool {
5811 match self {
5812 Self::Server { server_config } => server_config.migration,
5813 Self::Client { .. } => false,
5814 }
5815 }
5816
5817 fn is_client(&self) -> bool {
5818 self.side().is_client()
5819 }
5820
5821 fn is_server(&self) -> bool {
5822 self.side().is_server()
5823 }
5824
5825 fn side(&self) -> Side {
5826 match *self {
5827 Self::Client { .. } => Side::Client,
5828 Self::Server { .. } => Side::Server,
5829 }
5830 }
5831}
5832
5833impl From<SideArgs> for ConnectionSide {
5834 fn from(side: SideArgs) -> Self {
5835 match side {
5836 SideArgs::Client {
5837 token_store,
5838 server_name,
5839 } => Self::Client {
5840 token: token_store.take(&server_name).unwrap_or_default(),
5841 token_store,
5842 server_name,
5843 },
5844 SideArgs::Server {
5845 server_config,
5846 pref_addr_cid: _,
5847 path_validated: _,
5848 } => Self::Server { server_config },
5849 }
5850 }
5851}
5852
/// Side-specific arguments from which a `ConnectionSide` is built.
pub(crate) enum SideArgs {
    /// Arguments for a client-side connection.
    Client {
        /// Store queried for a token for `server_name`.
        token_store: Arc<dyn TokenStore>,
        /// Name of the server being connected to.
        server_name: String,
    },
    /// Arguments for a server-side connection.
    Server {
        /// Server configuration shared with the connection.
        server_config: Arc<ServerConfig>,
        /// Value returned by `SideArgs::pref_addr_cid` (a client reports `None`).
        pref_addr_cid: Option<ConnectionId>,
        /// Value returned by `SideArgs::path_validated` (a client reports `true`).
        path_validated: bool,
    },
}
5865
5866impl SideArgs {
5867 pub(crate) fn pref_addr_cid(&self) -> Option<ConnectionId> {
5868 match *self {
5869 Self::Client { .. } => None,
5870 Self::Server { pref_addr_cid, .. } => pref_addr_cid,
5871 }
5872 }
5873
5874 pub(crate) fn path_validated(&self) -> bool {
5875 match *self {
5876 Self::Client { .. } => true,
5877 Self::Server { path_validated, .. } => path_validated,
5878 }
5879 }
5880
5881 pub(crate) fn side(&self) -> Side {
5882 match *self {
5883 Self::Client { .. } => Side::Client,
5884 Self::Server { .. } => Side::Server,
5885 }
5886 }
5887}
5888
/// Reasons why a connection might be lost.
///
/// Converts into `io::Error`; see the `From<ConnectionError> for io::Error` impl for the
/// `io::ErrorKind` each variant maps to.
#[derive(Debug, Error, Clone, PartialEq, Eq)]
pub enum ConnectionError {
    /// The peer doesn't implement any supported version.
    #[error("peer doesn't implement any supported version")]
    VersionMismatch,
    /// A transport-level error; displayed transparently as the wrapped `TransportError`.
    #[error(transparent)]
    TransportError(#[from] TransportError),
    /// The peer aborted the connection with a connection-level close frame.
    #[error("aborted by peer: {0}")]
    ConnectionClosed(frame::ConnectionClose),
    /// The peer closed the connection with an application-level close frame.
    #[error("closed by peer: {0}")]
    ApplicationClosed(frame::ApplicationClose),
    /// The peer reset the connection.
    #[error("reset by peer")]
    Reset,
    /// The connection timed out.
    #[error("timed out")]
    TimedOut,
    /// The connection was closed locally.
    #[error("closed")]
    LocallyClosed,
    /// Connection IDs were exhausted.
    #[error("CIDs exhausted")]
    CidsExhausted,
}
5923
5924impl From<Close> for ConnectionError {
5925 fn from(x: Close) -> Self {
5926 match x {
5927 Close::Connection(reason) => Self::ConnectionClosed(reason),
5928 Close::Application(reason) => Self::ApplicationClosed(reason),
5929 }
5930 }
5931}
5932
5933impl From<ConnectionError> for io::Error {
5935 fn from(x: ConnectionError) -> Self {
5936 use ConnectionError::*;
5937 let kind = match x {
5938 TimedOut => io::ErrorKind::TimedOut,
5939 Reset => io::ErrorKind::ConnectionReset,
5940 ApplicationClosed(_) | ConnectionClosed(_) => io::ErrorKind::ConnectionAborted,
5941 TransportError(_) | VersionMismatch | LocallyClosed | CidsExhausted => {
5942 io::ErrorKind::Other
5943 }
5944 };
5945 Self::new(kind, x)
5946 }
5947}
5948
/// Lifecycle state of a connection.
///
/// `Closed`, `Draining` and `Drained` all count as closed (see `State::is_closed`).
#[derive(Clone, Debug)]
pub enum State {
    /// The handshake is in progress; carries handshake-specific data.
    Handshake(state::Handshake),
    /// The connection is established.
    Established,
    /// The connection is closed; carries the close reason.
    Closed(state::Closed),
    /// The connection is draining.
    Draining,
    /// The connection is fully drained.
    Drained,
}
5963
5964impl State {
5965 fn closed<R: Into<Close>>(reason: R) -> Self {
5966 Self::Closed(state::Closed {
5967 reason: reason.into(),
5968 })
5969 }
5970
5971 fn is_handshake(&self) -> bool {
5972 matches!(*self, Self::Handshake(_))
5973 }
5974
5975 fn is_established(&self) -> bool {
5976 matches!(*self, Self::Established)
5977 }
5978
5979 fn is_closed(&self) -> bool {
5980 matches!(*self, Self::Closed(_) | Self::Draining | Self::Drained)
5981 }
5982
5983 fn is_drained(&self) -> bool {
5984 matches!(*self, Self::Drained)
5985 }
5986}
5987
/// Payload types carried by the data-bearing `State` variants.
mod state {
    use super::*;

    /// Data carried by `State::Handshake`.
    #[derive(Clone, Debug)]
    pub struct Handshake {
        // NOTE(review): presumably records whether the peer has set the remote CID yet —
        // confirm against the handshake packet handling.
        pub(super) rem_cid_set: bool,
        // NOTE(review): presumably the address-validation token expected from the peer —
        // confirm against the Initial/Retry handling.
        pub(super) expected_token: Bytes,
        // NOTE(review): presumably the first handshake message, kept while it is still
        // needed — confirm against the users of this field.
        pub(super) client_hello: Option<Bytes>,
    }

    /// Data carried by `State::Closed`.
    #[derive(Clone, Debug)]
    pub struct Closed {
        /// The close reason, as passed to `State::closed`.
        pub(super) reason: Close,
    }
}
6012
/// Events of interest to the application.
#[derive(Debug)]
pub enum Event {
    /// The connection's handshake data is ready.
    HandshakeDataReady,
    /// The connection was successfully established.
    Connected,
    /// The connection was lost.
    ConnectionLost {
        /// Why the connection was lost.
        reason: ConnectionError,
    },
    /// A stream-related event; see `StreamEvent`.
    Stream(StreamEvent),
    /// One or more application datagrams have been received.
    DatagramReceived,
    /// Datagram sending is no longer blocked.
    // NOTE(review): the exact unblocking condition lives in the datagram state — confirm there.
    DatagramsUnblocked,
}
6034
/// Time elapsed from `y` to `x`, or `Duration::ZERO` when `x` is not later than `y`.
fn instant_saturating_sub(x: Instant, y: Instant) -> Duration {
    x.saturating_duration_since(y)
}
6038
6039fn get_max_ack_delay(params: &TransportParameters) -> Duration {
6040 Duration::from_micros(params.max_ack_delay.0 * 1000)
6041}
6042
// NOTE(review): presumably the cap on the exponent used for exponential timer backoff;
// the use site is outside this part of the file — confirm there.
const MAX_BACKOFF_EXPONENT: u32 = 16;

/// Minimum packet space: the largest Handshake/0-RTT header plus 32 further bytes.
// NOTE(review): the 32 extra bytes presumably leave room for a frame and the AEAD tag — confirm.
const MIN_PACKET_SPACE: usize = MAX_HANDSHAKE_OR_0RTT_HEADER_SIZE + 32;

/// Upper bound on the size of a Handshake or 0-RTT packet header.
///
/// Sum of: 1 + 4 + 1 + `MAX_CID_SIZE` + 1 + `MAX_CID_SIZE` + the encoded size of a
/// `VarInt` holding `u16::MAX` + 4.
// NOTE(review): the terms appear to be flags, version, DCID length, DCID, SCID length,
// SCID, length field and packet number — confirm against the long-header encoder.
const MAX_HANDSHAKE_OR_0RTT_HEADER_SIZE: usize =
    1 + 4 + 1 + MAX_CID_SIZE + 1 + MAX_CID_SIZE + VarInt::from_u32(u16::MAX as u32).size() + 4;

/// Margin subtracted from the key confidentiality limit in `update_keys` when computing
/// `key_phase_size`.
const KEY_UPDATE_MARGIN: u64 = 10_000;
6067
/// Summary of the frames written into a packet.
#[derive(Default)]
struct SentFrames {
    /// Retransmittable data included in the packet.
    retransmits: ThinRetransmits,
    /// Largest acknowledged packet number, set when an ACK was included
    /// (this is what `is_ack_only` checks).
    largest_acked: Option<u64>,
    /// Metadata of the STREAM frames included in the packet.
    stream_frames: StreamMetaVec,
    /// Set when frames other than ACKs and retransmittable data were included
    /// (must be false for `is_ack_only`).
    non_retransmits: bool,
    // NOTE(review): presumably set when the packet must be padded — confirm at the use site.
    requires_padding: bool,
}
6077
6078impl SentFrames {
6079 fn is_ack_only(&self, streams: &StreamsState) -> bool {
6081 self.largest_acked.is_some()
6082 && !self.non_retransmits
6083 && self.stream_frames.is_empty()
6084 && self.retransmits.is_empty(streams)
6085 }
6086}
6087
6088fn negotiate_max_idle_timeout(x: Option<VarInt>, y: Option<VarInt>) -> Option<Duration> {
6096 match (x, y) {
6097 (Some(VarInt(0)) | None, Some(VarInt(0)) | None) => None,
6098 (Some(VarInt(0)) | None, Some(y)) => Some(Duration::from_millis(y.0)),
6099 (Some(x), Some(VarInt(0)) | None) => Some(Duration::from_millis(x.0)),
6100 (Some(x), Some(y)) => Some(Duration::from_millis(cmp::min(x, y).0)),
6101 }
6102}
6103
/// Per-connection state for post-quantum cryptography (PQC) support.
#[derive(Debug, Clone)]
pub(crate) struct PqcState {
    /// Set once the peer's transport parameters advertise PQC algorithms.
    enabled: bool,
    /// The PQC algorithms advertised by the peer, if any.
    #[allow(dead_code)]
    algorithms: Option<crate::transport_parameters::PqcAlgorithms>,
    /// Handshake MTU; starts at `MIN_INITIAL_SIZE` and is raised once PQC is in use.
    handshake_mtu: u16,
    /// Set when a PQC algorithm is advertised by the peer or a PQC handshake is detected.
    using_pqc: bool,
    /// Handler to which PQC-specific packet sizing and detection is delegated.
    packet_handler: crate::crypto::pqc::packet_handler::PqcPacketHandler,
}
6119
6120#[allow(dead_code)]
6121impl PqcState {
6122 fn new() -> Self {
6123 Self {
6124 enabled: false,
6125 algorithms: None,
6126 handshake_mtu: MIN_INITIAL_SIZE,
6127 using_pqc: false,
6128 packet_handler: crate::crypto::pqc::packet_handler::PqcPacketHandler::new(),
6129 }
6130 }
6131
6132 fn min_initial_size(&self) -> u16 {
6134 if self.enabled && self.using_pqc {
6135 std::cmp::max(self.handshake_mtu, 4096)
6137 } else {
6138 MIN_INITIAL_SIZE
6139 }
6140 }
6141
6142 fn update_from_peer_params(&mut self, params: &TransportParameters) {
6144 if let Some(ref algorithms) = params.pqc_algorithms {
6145 self.enabled = true;
6146 self.algorithms = Some(algorithms.clone());
6147 if algorithms.ml_kem_768
6149 || algorithms.ml_dsa_65
6150 || algorithms.hybrid_x25519_ml_kem
6151 || algorithms.hybrid_ed25519_ml_dsa
6152 {
6153 self.using_pqc = true;
6154 self.handshake_mtu = 4096; }
6156 }
6157 }
6158
6159 fn detect_pqc_from_crypto(&mut self, crypto_data: &[u8], space: SpaceId) {
6161 if self.packet_handler.detect_pqc_handshake(crypto_data, space) {
6162 self.using_pqc = true;
6163 self.handshake_mtu = self.packet_handler.get_min_packet_size(space);
6165 }
6166 }
6167
6168 fn should_trigger_mtu_discovery(&mut self) -> bool {
6170 self.packet_handler.should_trigger_mtu_discovery()
6171 }
6172
6173 fn get_mtu_config(&self) -> MtuDiscoveryConfig {
6175 self.packet_handler.get_pqc_mtu_config()
6176 }
6177
6178 fn calculate_crypto_frame_size(&self, available_space: usize, remaining_data: usize) -> usize {
6180 self.packet_handler
6181 .calculate_crypto_frame_size(available_space, remaining_data)
6182 }
6183
6184 fn should_adjust_coalescing(&self, current_size: usize, space: SpaceId) -> bool {
6186 self.packet_handler
6187 .adjust_coalescing_for_pqc(current_size, space)
6188 }
6189
6190 fn on_packet_sent(&mut self, space: SpaceId, size: u16) {
6192 self.packet_handler.on_packet_sent(space, size);
6193 }
6194
6195 fn reset(&mut self) {
6197 self.enabled = false;
6198 self.algorithms = None;
6199 self.handshake_mtu = MIN_INITIAL_SIZE;
6200 self.using_pqc = false;
6201 self.packet_handler.reset();
6202 }
6203}
6204
/// The default state is the same as `PqcState::new()`: PQC disabled.
#[cfg(feature = "pqc")]
impl Default for PqcState {
    fn default() -> Self {
        PqcState::new()
    }
}
6211
/// Per-connection state for address discovery (OBSERVED_ADDRESS frames).
#[derive(Debug, Clone)]
pub(crate) struct AddressDiscoveryState {
    /// Whether address discovery is active on this connection.
    enabled: bool,
    /// Observation rate used to size the token bucket in `rate_limiter`.
    max_observation_rate: u8,
    /// When false, only path 0 is observed (unless bootstrap mode is on).
    observe_all_paths: bool,
    /// Observed-address bookkeeping, keyed by path ID.
    path_addresses: std::collections::HashMap<u64, paths::PathAddressInfo>,
    /// Token bucket limiting how often observations are sent.
    rate_limiter: AddressObservationRateLimiter,
    /// Log of every observed address passed to `handle_observed_address`.
    observed_addresses: Vec<ObservedAddressEvent>,
    /// Bootstrap mode: all paths are observed and the rate limit is raised fivefold.
    bootstrap_mode: bool,
    /// Sequence number assigned to the next OBSERVED_ADDRESS frame this side creates.
    next_sequence_number: VarInt,
    // NOTE(review): presumably the last sequence number received per path; it is only
    // initialised in the code visible here — confirm at the receive path.
    last_received_sequence: std::collections::HashMap<u64, VarInt>,
}
6234
/// One observed-address record, as appended by `handle_observed_address`.
#[derive(Debug, Clone, PartialEq, Eq)]
struct ObservedAddressEvent {
    /// The observed address.
    address: SocketAddr,
    /// Time passed to `handle_observed_address` when the record was created.
    received_at: Instant,
    /// ID of the path the observation belongs to.
    path_id: u64,
}
6245
/// Token-bucket rate limiter for address observations.
#[derive(Debug, Clone)]
struct AddressObservationRateLimiter {
    /// Tokens currently available.
    tokens: f64,
    /// Bucket capacity; refills never exceed it.
    max_tokens: f64,
    /// Refill rate in tokens per second.
    rate: f64,
    /// Time of the last refill.
    last_update: Instant,
}
6258
6259#[allow(dead_code)]
6260impl AddressDiscoveryState {
6261 fn new(config: &crate::transport_parameters::AddressDiscoveryConfig, now: Instant) -> Self {
6263 use crate::transport_parameters::AddressDiscoveryConfig::*;
6264
6265 let (enabled, _can_send, _can_receive) = match config {
6267 SendOnly => (true, true, false),
6268 ReceiveOnly => (true, false, true),
6269 SendAndReceive => (true, true, true),
6270 };
6271
6272 let max_observation_rate = 10u8; let observe_all_paths = false; Self {
6278 enabled,
6279 max_observation_rate,
6280 observe_all_paths,
6281 path_addresses: std::collections::HashMap::new(),
6282 rate_limiter: AddressObservationRateLimiter::new(max_observation_rate, now),
6283 observed_addresses: Vec::new(),
6284 bootstrap_mode: false,
6285 next_sequence_number: VarInt::from_u32(0),
6286 last_received_sequence: std::collections::HashMap::new(),
6287 }
6288 }
6289
6290 fn should_send_observation(&mut self, path_id: u64, now: Instant) -> bool {
6292 if !self.should_observe_path(path_id) {
6294 return false;
6295 }
6296
6297 let needs_observation = match self.path_addresses.get(&path_id) {
6299 Some(info) => info.observed_address.is_none() || !info.notified,
6300 None => true,
6301 };
6302
6303 if !needs_observation {
6304 return false;
6305 }
6306
6307 self.rate_limiter.try_consume(1.0, now)
6309 }
6310
6311 fn record_observation_sent(&mut self, path_id: u64) {
6313 if let Some(info) = self.path_addresses.get_mut(&path_id) {
6314 info.mark_notified();
6315 }
6316 }
6317
6318 fn handle_observed_address(&mut self, address: SocketAddr, path_id: u64, now: Instant) {
6320 if !self.enabled {
6321 return;
6322 }
6323
6324 self.observed_addresses.push(ObservedAddressEvent {
6325 address,
6326 received_at: now,
6327 path_id,
6328 });
6329
6330 let info = self
6332 .path_addresses
6333 .entry(path_id)
6334 .or_insert_with(paths::PathAddressInfo::new);
6335 info.update_observed_address(address, now);
6336 }
6337
6338 pub(crate) fn get_observed_address(&self, path_id: u64) -> Option<SocketAddr> {
6340 self.path_addresses
6341 .get(&path_id)
6342 .and_then(|info| info.observed_address)
6343 }
6344
6345 pub(crate) fn get_all_observed_addresses(&self) -> Vec<SocketAddr> {
6347 self.path_addresses
6348 .values()
6349 .filter_map(|info| info.observed_address)
6350 .collect()
6351 }
6352
6353 pub(crate) fn stats(&self) -> AddressDiscoveryStats {
6355 AddressDiscoveryStats {
6356 frames_sent: self.observed_addresses.len() as u64, frames_received: self.observed_addresses.len() as u64,
6358 addresses_discovered: self
6359 .path_addresses
6360 .values()
6361 .filter(|info| info.observed_address.is_some())
6362 .count() as u64,
6363 address_changes_detected: 0, }
6365 }
6366
6367 fn has_unnotified_changes(&self) -> bool {
6369 self.path_addresses
6370 .values()
6371 .any(|info| info.observed_address.is_some() && !info.notified)
6372 }
6373
6374 fn queue_observed_address_frame(
6376 &mut self,
6377 path_id: u64,
6378 address: SocketAddr,
6379 ) -> Option<frame::ObservedAddress> {
6380 if !self.enabled {
6382 return None;
6383 }
6384
6385 if !self.observe_all_paths && path_id != 0 {
6387 return None;
6388 }
6389
6390 if let Some(info) = self.path_addresses.get(&path_id) {
6392 if info.notified {
6393 return None;
6394 }
6395 }
6396
6397 if self.rate_limiter.tokens < 1.0 {
6399 return None;
6400 }
6401
6402 self.rate_limiter.tokens -= 1.0;
6404
6405 let info = self
6407 .path_addresses
6408 .entry(path_id)
6409 .or_insert_with(paths::PathAddressInfo::new);
6410 info.observed_address = Some(address);
6411 info.notified = true;
6412
6413 let sequence_number = self.next_sequence_number;
6415 self.next_sequence_number = VarInt::from_u64(self.next_sequence_number.into_inner() + 1)
6416 .expect("sequence number overflow");
6417
6418 Some(frame::ObservedAddress {
6419 sequence_number,
6420 address,
6421 })
6422 }
6423
6424 fn check_for_address_observations(
6426 &mut self,
6427 _current_path: u64,
6428 peer_supports_address_discovery: bool,
6429 now: Instant,
6430 ) -> Vec<frame::ObservedAddress> {
6431 let mut frames = Vec::new();
6432
6433 if !self.enabled || !peer_supports_address_discovery {
6435 return frames;
6436 }
6437
6438 self.rate_limiter.update_tokens(now);
6440
6441 let paths_to_notify: Vec<u64> = self
6443 .path_addresses
6444 .iter()
6445 .filter_map(|(&path_id, info)| {
6446 if info.observed_address.is_some() && !info.notified {
6447 Some(path_id)
6448 } else {
6449 None
6450 }
6451 })
6452 .collect();
6453
6454 for path_id in paths_to_notify {
6456 if !self.should_observe_path(path_id) {
6458 continue;
6459 }
6460
6461 if !self.bootstrap_mode && self.rate_limiter.tokens < 1.0 {
6463 break; }
6465
6466 if let Some(info) = self.path_addresses.get_mut(&path_id) {
6468 if let Some(address) = info.observed_address {
6469 if self.bootstrap_mode {
6471 self.rate_limiter.tokens -= 0.2; } else {
6473 self.rate_limiter.tokens -= 1.0;
6474 }
6475
6476 info.notified = true;
6478
6479 let sequence_number = self.next_sequence_number;
6481 self.next_sequence_number =
6482 VarInt::from_u64(self.next_sequence_number.into_inner() + 1)
6483 .expect("sequence number overflow");
6484
6485 frames.push(frame::ObservedAddress {
6486 sequence_number,
6487 address,
6488 });
6489 }
6490 }
6491 }
6492
6493 frames
6494 }
6495
6496 fn update_rate_limit(&mut self, new_rate: f64) {
6498 self.max_observation_rate = new_rate as u8;
6499 self.rate_limiter.set_rate(new_rate as u8);
6500 }
6501
6502 fn from_transport_params(params: &TransportParameters) -> Option<Self> {
6504 params
6505 .address_discovery
6506 .as_ref()
6507 .map(|config| Self::new(config, Instant::now()))
6508 }
6509
6510 #[cfg(test)]
6512 fn new_with_params(enabled: bool, max_rate: f64, observe_all_paths: bool) -> Self {
6513 if !enabled {
6515 return Self {
6517 enabled: false,
6518 max_observation_rate: max_rate as u8,
6519 observe_all_paths,
6520 path_addresses: std::collections::HashMap::new(),
6521 rate_limiter: AddressObservationRateLimiter::new(max_rate as u8, Instant::now()),
6522 observed_addresses: Vec::new(),
6523 bootstrap_mode: false,
6524 next_sequence_number: VarInt::from_u32(0),
6525 last_received_sequence: std::collections::HashMap::new(),
6526 };
6527 }
6528
6529 let config = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
6531 let mut state = Self::new(&config, Instant::now());
6532 state.max_observation_rate = max_rate as u8;
6533 state.observe_all_paths = observe_all_paths;
6534 state.rate_limiter = AddressObservationRateLimiter::new(max_rate as u8, Instant::now());
6535 state
6536 }
6537
6538 fn set_bootstrap_mode(&mut self, enabled: bool) {
6540 self.bootstrap_mode = enabled;
6541 if enabled {
6543 let bootstrap_rate = self.get_effective_rate_limit();
6544 self.rate_limiter.rate = bootstrap_rate;
6545 self.rate_limiter.max_tokens = bootstrap_rate * 2.0; self.rate_limiter.tokens = self.rate_limiter.max_tokens;
6548 }
6549 }
6550
6551 fn is_bootstrap_mode(&self) -> bool {
6553 self.bootstrap_mode
6554 }
6555
6556 fn get_effective_rate_limit(&self) -> f64 {
6558 if self.bootstrap_mode {
6559 (self.max_observation_rate as f64) * 5.0
6561 } else {
6562 self.max_observation_rate as f64
6563 }
6564 }
6565
6566 fn should_observe_path(&self, path_id: u64) -> bool {
6568 if !self.enabled {
6569 return false;
6570 }
6571
6572 if self.bootstrap_mode {
6574 return true;
6575 }
6576
6577 self.observe_all_paths || path_id == 0
6579 }
6580
6581 fn should_send_observation_immediately(&self, is_new_connection: bool) -> bool {
6583 self.bootstrap_mode && is_new_connection
6584 }
6585}
6586
6587#[allow(dead_code)]
6588impl AddressObservationRateLimiter {
6589 fn new(rate: u8, now: Instant) -> Self {
6591 let rate_f64 = rate as f64;
6592 Self {
6593 tokens: rate_f64,
6594 max_tokens: rate_f64,
6595 rate: rate_f64,
6596 last_update: now,
6597 }
6598 }
6599
6600 fn try_consume(&mut self, tokens: f64, now: Instant) -> bool {
6602 self.update_tokens(now);
6603
6604 if self.tokens >= tokens {
6605 self.tokens -= tokens;
6606 true
6607 } else {
6608 false
6609 }
6610 }
6611
6612 fn update_tokens(&mut self, now: Instant) {
6614 let elapsed = now.saturating_duration_since(self.last_update);
6615 let new_tokens = elapsed.as_secs_f64() * self.rate;
6616 self.tokens = (self.tokens + new_tokens).min(self.max_tokens);
6617 self.last_update = now;
6618 }
6619
6620 fn set_rate(&mut self, rate: u8) {
6622 let rate_f64 = rate as f64;
6623 self.rate = rate_f64;
6624 self.max_tokens = rate_f64;
6625 if self.tokens > self.max_tokens {
6627 self.tokens = self.max_tokens;
6628 }
6629 }
6630}
6631
6632#[cfg(test)]
6633mod tests {
6634 use super::*;
6635 use crate::transport_parameters::AddressDiscoveryConfig;
6636 use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr};
6637
6638 #[test]
6639 fn address_discovery_state_new() {
6640 let config = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
6641 let now = Instant::now();
6642 let state = AddressDiscoveryState::new(&config, now);
6643
6644 assert!(state.enabled);
6645 assert_eq!(state.max_observation_rate, 10);
6646 assert!(!state.observe_all_paths);
6647 assert!(state.path_addresses.is_empty());
6648 assert!(state.observed_addresses.is_empty());
6649 assert_eq!(state.rate_limiter.tokens, 10.0);
6650 }
6651
6652 #[test]
6653 fn address_discovery_state_disabled() {
6654 let config = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
6655 let now = Instant::now();
6656 let mut state = AddressDiscoveryState::new(&config, now);
6657
6658 state.enabled = false;
6660
6661 assert!(!state.should_send_observation(0, now));
6663 }
6664
6665 #[test]
6666 fn address_discovery_state_should_send_observation() {
6667 let config = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
6668 let now = Instant::now();
6669 let mut state = AddressDiscoveryState::new(&config, now);
6670
6671 assert!(state.should_send_observation(0, now));
6673
6674 let mut path_info = paths::PathAddressInfo::new();
6676 path_info.update_observed_address(
6677 SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)), 8080),
6678 now,
6679 );
6680 path_info.mark_notified();
6681 state.path_addresses.insert(0, path_info);
6682
6683 assert!(!state.should_send_observation(0, now));
6685
6686 assert!(!state.should_send_observation(1, now));
6688 }
6689
6690 #[test]
6691 fn address_discovery_state_rate_limiting() {
6692 let config = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
6693 let now = Instant::now();
6694 let mut state = AddressDiscoveryState::new(&config, now);
6695
6696 state.observe_all_paths = true;
6698
6699 assert!(state.should_send_observation(0, now));
6701
6702 state.rate_limiter.try_consume(9.0, now); assert!(!state.should_send_observation(0, now));
6707
6708 let later = now + Duration::from_secs(1);
6710 state.rate_limiter.update_tokens(later);
6711 assert!(state.should_send_observation(0, later));
6712 }
6713
6714 #[test]
6715 fn address_discovery_state_handle_observed_address() {
6716 let config = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
6717 let now = Instant::now();
6718 let mut state = AddressDiscoveryState::new(&config, now);
6719
6720 let addr1 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(10, 0, 0, 1)), 443);
6721 let addr2 = SocketAddr::new(
6722 IpAddr::V6(Ipv6Addr::new(0x2001, 0xdb8, 0, 0, 0, 0, 0, 1)),
6723 8080,
6724 );
6725
6726 state.handle_observed_address(addr1, 0, now);
6728 assert_eq!(state.observed_addresses.len(), 1);
6729 assert_eq!(state.observed_addresses[0].address, addr1);
6730 assert_eq!(state.observed_addresses[0].path_id, 0);
6731
6732 let later = now + Duration::from_millis(100);
6734 state.handle_observed_address(addr2, 1, later);
6735 assert_eq!(state.observed_addresses.len(), 2);
6736 assert_eq!(state.observed_addresses[1].address, addr2);
6737 assert_eq!(state.observed_addresses[1].path_id, 1);
6738 }
6739
6740 #[test]
6741 fn address_discovery_state_get_observed_address() {
6742 let config = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
6743 let now = Instant::now();
6744 let mut state = AddressDiscoveryState::new(&config, now);
6745
6746 assert_eq!(state.get_observed_address(0), None);
6748
6749 let mut path_info = paths::PathAddressInfo::new();
6751 let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 80);
6752 path_info.update_observed_address(addr, now);
6753 state.path_addresses.insert(0, path_info);
6754
6755 assert_eq!(state.get_observed_address(0), Some(addr));
6757 assert_eq!(state.get_observed_address(1), None);
6758 }
6759
/// `has_unnotified_changes` flips to true once a path carries an observed
/// address and back to false after an observation is recorded as sent.
#[test]
fn address_discovery_state_unnotified_changes() {
    let cfg = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
    let start = Instant::now();
    let mut discovery = AddressDiscoveryState::new(&cfg, start);

    // A fresh state has nothing pending.
    assert!(!discovery.has_unnotified_changes());

    // Register a path entry with an observed address.
    let observed = SocketAddr::from((Ipv4Addr::new(192, 168, 1, 1), 8080));
    let mut entry = paths::PathAddressInfo::new();
    entry.update_observed_address(observed, start);
    discovery.path_addresses.insert(0, entry);

    assert!(discovery.has_unnotified_changes());

    // Recording the observation for path 0 clears the pending state.
    discovery.record_observation_sent(0);
    assert!(!discovery.has_unnotified_changes());
}
6784
/// Exercises the token bucket: initial fill, successful and rejected
/// consumption, and refill after one second and after half a second.
#[test]
fn address_observation_rate_limiter_token_bucket() {
    let t0 = Instant::now();

    // A limiter built with rate 5 starts with 5 tokens, capacity 5, rate 5.
    let mut bucket = AddressObservationRateLimiter::new(5, t0);
    assert_eq!(bucket.tokens, 5.0);
    assert_eq!(bucket.max_tokens, 5.0);
    assert_eq!(bucket.rate, 5.0);

    // Taking 3 of 5 succeeds and leaves 2.
    let took_three = bucket.try_consume(3.0, t0);
    assert!(took_three);
    assert_eq!(bucket.tokens, 2.0);

    // Asking for 3 more is rejected and leaves the balance untouched.
    let took_more = bucket.try_consume(3.0, t0);
    assert!(!took_more);
    assert_eq!(bucket.tokens, 2.0);

    // After one second the bucket is back at 5 tokens.
    bucket.update_tokens(t0 + Duration::from_secs(1));
    assert_eq!(bucket.tokens, 5.0);

    // A second limiter: 2 tokens left, then half a second of refill gives 4.5.
    let mut partial = AddressObservationRateLimiter::new(5, t0);
    partial.try_consume(3.0, t0);
    partial.update_tokens(t0 + Duration::from_millis(500));
    assert_eq!(partial.tokens, 4.5);
}
6815
/// A state built from the default `AddressDiscoveryConfig` is enabled, uses an
/// observation rate of 10 and does not observe all paths.
#[test]
fn connection_initializes_address_discovery_state_default() {
    let default_cfg = crate::transport_parameters::AddressDiscoveryConfig::default();
    let discovery = AddressDiscoveryState::new(&default_cfg, Instant::now());

    let AddressDiscoveryState {
        enabled,
        max_observation_rate,
        observe_all_paths,
        ..
    } = &discovery;

    assert!(*enabled);
    assert_eq!(*max_observation_rate, 10);
    assert!(!*observe_all_paths);
}
6827
/// A state built from `SendAndReceive` is enabled, uses an observation rate of
/// 10 and does not observe all paths.
#[test]
fn connection_initializes_with_address_discovery_enabled() {
    let cfg = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
    let discovery = AddressDiscoveryState::new(&cfg, Instant::now());

    let AddressDiscoveryState {
        enabled,
        max_observation_rate,
        observe_all_paths,
        ..
    } = &discovery;

    assert!(*enabled);
    assert_eq!(*max_observation_rate, 10);
    assert!(!*observe_all_paths);
}
6837
/// The default `AddressDiscoveryConfig` yields an enabled discovery state.
#[test]
fn connection_address_discovery_enabled_by_default() {
    let default_cfg = crate::transport_parameters::AddressDiscoveryConfig::default();
    let created_at = Instant::now();
    let discovery = AddressDiscoveryState::new(&default_cfg, created_at);

    let is_enabled = discovery.enabled;
    assert!(is_enabled);
}
6845
/// `negotiate_max_idle_timeout` must return the same result regardless of the
/// order of its two arguments, for every case in the table below.
#[test]
fn negotiate_max_idle_timeout_commutative() {
    let ms = Duration::from_millis;

    // (one side, other side, expected negotiated timeout)
    let cases = [
        (None, None, None),
        (None, Some(VarInt(0)), None),
        (None, Some(VarInt(2)), Some(ms(2))),
        (Some(VarInt(0)), Some(VarInt(0)), None),
        (Some(VarInt(2)), Some(VarInt(0)), Some(ms(2))),
        (Some(VarInt(1)), Some(VarInt(4)), Some(ms(1))),
    ];

    cases.into_iter().for_each(|(lhs, rhs, expected)| {
        assert_eq!(negotiate_max_idle_timeout(lhs, rhs), expected);
        // Swapping the arguments must not change the outcome.
        assert_eq!(negotiate_max_idle_timeout(rhs, lhs), expected);
    });
}
6870
/// A freshly created `PathData` has empty address info and a full observation
/// rate limiter at rate 10.
#[test]
fn path_creation_initializes_address_discovery() {
    let transport = TransportConfig::default();
    let peer = SocketAddr::from((Ipv4Addr::new(192, 168, 1, 1), 8080));
    let created_at = Instant::now();

    let fresh = paths::PathData::new(peer, false, None, created_at, &transport);

    // Address info starts out blank.
    let info = &fresh.address_info;
    assert!(info.observed_address.is_none());
    assert!(info.last_observed.is_none());
    assert_eq!(info.observation_count, 0);
    assert!(!info.notified);

    // The limiter starts full: rate, capacity and balance are all 10.
    let limiter = &fresh.observation_rate_limiter;
    assert_eq!(limiter.rate, 10.0);
    assert_eq!(limiter.max_tokens, 10.0);
    assert_eq!(limiter.tokens, 10.0);
}
6891
/// A path derived via `from_previous` starts with blank address info but keeps
/// the previous path's observation rate, with a full token balance.
#[test]
fn path_migration_resets_address_discovery() {
    let transport = TransportConfig::default();
    let old_remote = SocketAddr::from((Ipv4Addr::new(192, 168, 1, 1), 8080));
    let new_remote = SocketAddr::from((Ipv4Addr::new(10, 0, 0, 1), 443));
    let t0 = Instant::now();

    // Give the original path some discovery state and a custom rate.
    let mut old_path = paths::PathData::new(old_remote, false, None, t0, &transport);
    old_path.update_observed_address(old_remote, t0);
    old_path.mark_address_notified();
    old_path.consume_observation_token(t0);
    old_path.set_observation_rate(20);

    let migrated = paths::PathData::from_previous(new_remote, &old_path, t0);

    // Address info is reset on the new path.
    let info = &migrated.address_info;
    assert!(info.observed_address.is_none());
    assert!(info.last_observed.is_none());
    assert_eq!(info.observation_count, 0);
    assert!(!info.notified);

    // The rate carries over and the bucket is full again.
    let limiter = &migrated.observation_rate_limiter;
    assert_eq!(limiter.rate, 20.0);
    assert_eq!(limiter.tokens, 20.0);
}
6919
/// `set_observation_rate` updates rate and capacity, and caps an over-full
/// token balance at the new capacity.
#[test]
fn connection_path_updates_observation_rate() {
    let transport = TransportConfig::default();
    let peer = SocketAddr::from((Ipv4Addr::new(127, 0, 0, 1), 42));
    let t0 = Instant::now();

    let mut subject = paths::PathData::new(peer, false, None, t0, &transport);

    // Initial rate is 10.
    assert_eq!(subject.observation_rate_limiter.rate, 10.0);

    // Raising the rate updates both rate and capacity.
    subject.set_observation_rate(25);
    {
        let limiter = &subject.observation_rate_limiter;
        assert_eq!(limiter.rate, 25.0);
        assert_eq!(limiter.max_tokens, 25.0);
    }

    // Force the balance above the next capacity, then lower the rate.
    subject.observation_rate_limiter.tokens = 30.0;
    subject.set_observation_rate(20);
    assert_eq!(subject.observation_rate_limiter.tokens, 20.0);
}
6941
/// Setting `validated` on a path leaves its observed address and observation
/// rate untouched.
#[test]
fn path_validation_preserves_discovery_state() {
    let transport = TransportConfig::default();
    let peer = SocketAddr::from((Ipv4Addr::new(192, 168, 1, 1), 8080));
    let t0 = Instant::now();

    let mut subject = paths::PathData::new(peer, false, None, t0, &transport);

    // Populate discovery state before validation.
    let seen_as = SocketAddr::from((Ipv4Addr::new(1, 2, 3, 4), 5678));
    subject.update_observed_address(seen_as, t0);
    subject.set_observation_rate(15);

    subject.validated = true;

    // Both pieces of state are still in place afterwards.
    assert_eq!(subject.address_info.observed_address, Some(seen_as));
    assert_eq!(subject.observation_rate_limiter.rate, 15.0);
}
6962
/// `new_with_params(true, 30.0, true)` yields an enabled state with rate 30,
/// all-path observation on, and empty address collections.
#[test]
fn address_discovery_state_initialization() {
    let discovery = AddressDiscoveryState::new_with_params(true, 30.0, true);

    let AddressDiscoveryState {
        enabled,
        max_observation_rate,
        observe_all_paths,
        path_addresses,
        observed_addresses,
        ..
    } = &discovery;

    assert!(*enabled);
    assert_eq!(*max_observation_rate, 30);
    assert!(*observe_all_paths);
    assert!(path_addresses.is_empty());
    assert!(observed_addresses.is_empty());
}
6974
/// A single observed address is recorded both in the global list and in the
/// per-path entry, with matching address, path id, timestamp and count.
#[test]
fn handle_observed_address_frame_basic() {
    let cfg = AddressDiscoveryConfig::SendAndReceive;
    let mut discovery = AddressDiscoveryState::new(&cfg, Instant::now());
    let observed = SocketAddr::from((Ipv4Addr::new(192, 168, 1, 1), 8080));
    let received = Instant::now();
    let path = 0;

    discovery.handle_observed_address(observed, path, received);

    // Global record of the observation.
    assert_eq!(discovery.observed_addresses.len(), 1);
    let record = &discovery.observed_addresses[0];
    assert_eq!(record.address, observed);
    assert_eq!(record.path_id, path);
    assert_eq!(record.received_at, received);

    // Per-path bookkeeping.
    assert!(discovery.path_addresses.contains_key(&path));
    let per_path = &discovery.path_addresses[&path];
    assert_eq!(per_path.observed_address, Some(observed));
    assert_eq!(per_path.last_observed, Some(received));
    assert_eq!(per_path.observation_count, 1);
}
7000
/// Three observations on one path (same address twice, then a different one)
/// all land in the global list; the path entry ends on the last address with
/// an observation count of 1.
#[test]
fn handle_observed_address_frame_multiple_observations() {
    let cfg = AddressDiscoveryConfig::SendAndReceive;
    let mut discovery = AddressDiscoveryState::new(&cfg, Instant::now());
    let first = SocketAddr::from((Ipv4Addr::new(192, 168, 1, 1), 8080));
    let second = SocketAddr::from((Ipv4Addr::new(10, 0, 0, 1), 443));
    let base = Instant::now();
    let path = 0;

    // Observations arrive at base, base + 1 s and base + 2 s.
    for (addr, delay_secs) in [(first, 0), (first, 1), (second, 2)] {
        discovery.handle_observed_address(addr, path, base + Duration::from_secs(delay_secs));
    }

    assert_eq!(discovery.observed_addresses.len(), 3);

    let per_path = &discovery.path_addresses[&path];
    assert_eq!(per_path.observed_address, Some(second));
    assert_eq!(per_path.observation_count, 1);
}
7023
/// With `enabled` switched off, an incoming observed address leaves both
/// address collections empty.
#[test]
fn handle_observed_address_frame_disabled() {
    let cfg = AddressDiscoveryConfig::SendAndReceive;
    let mut discovery = AddressDiscoveryState::new(&cfg, Instant::now());
    discovery.enabled = false;

    let observed = SocketAddr::from((Ipv4Addr::new(192, 168, 1, 1), 8080));
    let received = Instant::now();
    discovery.handle_observed_address(observed, 0, received);

    // Nothing was recorded.
    assert!(discovery.observed_addresses.is_empty());
    assert!(discovery.path_addresses.is_empty());
}
7039
/// Sending is allowed on path 0 both before and after one observation has
/// been recorded as sent.
#[test]
fn should_send_observation_basic() {
    let cfg = AddressDiscoveryConfig::SendAndReceive;
    let mut discovery = AddressDiscoveryState::new(&cfg, Instant::now());
    discovery.max_observation_rate = 10;
    let at = Instant::now();
    let path = 0;

    let allowed_before = discovery.should_send_observation(path, at);
    assert!(allowed_before);

    discovery.record_observation_sent(path);

    let allowed_after = discovery.should_send_observation(path, at);
    assert!(allowed_after);
}
7057
/// With the rate limit set to 2, two sends are allowed back to back, a third
/// is refused at the same instant, and sending is allowed again a second later.
#[test]
fn should_send_observation_rate_limiting() {
    let cfg = AddressDiscoveryConfig::SendAndReceive;
    let t0 = Instant::now();
    let mut discovery = AddressDiscoveryState::new(&cfg, t0);
    discovery.max_observation_rate = 2;
    discovery.update_rate_limit(2.0);
    let path = 0;

    // Two observations fit within the budget.
    for _ in 0..2 {
        assert!(discovery.should_send_observation(path, t0));
        discovery.record_observation_sent(path);
    }

    // The third one is refused at the same instant.
    assert!(!discovery.should_send_observation(path, t0));

    // One second later sending is permitted again.
    assert!(discovery.should_send_observation(path, t0 + Duration::from_secs(1)));
}
7080
/// A disabled state never permits sending an observation.
#[test]
fn should_send_observation_disabled() {
    let cfg = AddressDiscoveryConfig::SendAndReceive;
    let mut discovery = AddressDiscoveryState::new(&cfg, Instant::now());
    discovery.enabled = false;

    let allowed = discovery.should_send_observation(0, Instant::now());
    assert!(!allowed);
}
7090
/// With all-path observation on and a rate limit of 2, one send each on
/// paths 0 and 1 uses up the budget for both; path 0 may send again after a
/// second.
#[test]
fn should_send_observation_per_path() {
    let cfg = AddressDiscoveryConfig::SendAndReceive;
    let t0 = Instant::now();
    let mut discovery = AddressDiscoveryState::new(&cfg, t0);
    discovery.max_observation_rate = 2;
    discovery.observe_all_paths = true;
    discovery.update_rate_limit(2.0);

    // One observation per path is allowed.
    for path in [0, 1] {
        assert!(discovery.should_send_observation(path, t0));
        discovery.record_observation_sent(path);
    }

    // After that neither path may send at the same instant.
    for path in [0, 1] {
        assert!(!discovery.should_send_observation(path, t0));
    }

    // A second later path 0 may send again.
    assert!(discovery.should_send_observation(0, t0 + Duration::from_secs(1)));
}
7116
/// `has_unnotified_changes` turns true after an observed address is handled
/// and false again once the path entry's `notified` flag is set.
#[test]
fn has_unnotified_changes_test() {
    let cfg = AddressDiscoveryConfig::SendAndReceive;
    let mut discovery = AddressDiscoveryState::new(&cfg, Instant::now());
    let observed = SocketAddr::from((Ipv4Addr::new(192, 168, 1, 1), 8080));
    let received = Instant::now();

    // Nothing pending at first.
    assert!(!discovery.has_unnotified_changes());

    // Handling an observation makes a change pending.
    discovery.handle_observed_address(observed, 0, received);
    assert!(discovery.has_unnotified_changes());

    // Flagging the path entry as notified clears it.
    let entry = discovery.path_addresses.get_mut(&0).unwrap();
    entry.notified = true;
    assert!(!discovery.has_unnotified_changes());
}
7135
/// `get_observed_address` reports `None` before any observation, the handled
/// address afterwards, and `None` for a path id that was never used.
#[test]
fn get_observed_address_test() {
    let cfg = AddressDiscoveryConfig::SendAndReceive;
    let mut discovery = AddressDiscoveryState::new(&cfg, Instant::now());
    let observed = SocketAddr::from((Ipv4Addr::new(192, 168, 1, 1), 8080));
    let received = Instant::now();
    let path = 0;

    let before = discovery.get_observed_address(path);
    assert_eq!(before, None);

    discovery.handle_observed_address(observed, path, received);
    let after = discovery.get_observed_address(path);
    assert_eq!(after, Some(observed));

    // Path 999 was never used.
    assert_eq!(discovery.get_observed_address(999), None);
}
7154
/// A limiter created with rate 10 allows two 5-token withdrawals at the same
/// instant and then refuses a further single token.
#[test]
fn rate_limiter_token_bucket_basic() {
    let t0 = Instant::now();
    let mut bucket = AddressObservationRateLimiter::new(10, t0);

    // Two withdrawals of 5 tokens each succeed.
    for _ in 0..2 {
        assert!(bucket.try_consume(5.0, t0));
    }

    // A further token is refused.
    assert!(!bucket.try_consume(1.0, t0));
}
7168
/// After being emptied, a rate-10 limiter yields 10 tokens one second later
/// and 5 tokens after a further half second.
#[test]
fn rate_limiter_token_replenishment() {
    let t0 = Instant::now();
    let mut bucket = AddressObservationRateLimiter::new(10, t0);

    // Empty the bucket at t0.
    assert!(bucket.try_consume(10.0, t0));
    assert!(!bucket.try_consume(0.1, t0));

    // One second later exactly 10 tokens can be taken.
    let after_one_sec = t0 + Duration::from_secs(1);
    assert!(bucket.try_consume(10.0, after_one_sec));
    assert!(!bucket.try_consume(0.1, after_one_sec));

    // Another half second later exactly 5 tokens can be taken.
    let after_half_more = after_one_sec + Duration::from_millis(500);
    assert!(bucket.try_consume(5.0, after_half_more));
    assert!(!bucket.try_consume(0.1, after_half_more));
}
7188
/// Idle time never lets a rate-10 limiter hand out more than 10 tokens.
#[test]
fn rate_limiter_max_tokens_cap() {
    let t0 = Instant::now();
    let mut bucket = AddressObservationRateLimiter::new(10, t0);

    // Two idle seconds: 10 tokens can be taken, 10.1 more cannot.
    let t2 = t0 + Duration::from_secs(2);
    assert!(bucket.try_consume(10.0, t2));
    assert!(!bucket.try_consume(10.1, t2));

    // One more second: 3 tokens can be taken.
    let t3 = t2 + Duration::from_secs(1);
    assert!(bucket.try_consume(3.0, t3));

    // Two more seconds: exactly 10 tokens are available, no more.
    let t5 = t3 + Duration::from_secs(2);
    assert!(bucket.try_consume(10.0, t5));
    assert!(!bucket.try_consume(0.1, t5));
}
7209
/// Fractional withdrawals adding up to the full budget succeed, further
/// withdrawals fail, and 100 ms later exactly one token is available.
#[test]
fn rate_limiter_fractional_consumption() {
    let t0 = Instant::now();
    let mut bucket = AddressObservationRateLimiter::new(10, t0);

    // 0.5 + 2.3 + 7.2 uses up the whole budget.
    for amount in [0.5, 2.3, 7.2] {
        assert!(bucket.try_consume(amount, t0));
    }
    assert!(!bucket.try_consume(0.1, t0));

    // 100 ms later one token can be taken, but not 0.1 more.
    let t_later = t0 + Duration::from_millis(100);
    assert!(bucket.try_consume(1.0, t_later));
    assert!(!bucket.try_consume(0.1, t_later));
}
7226
/// A limiter created with rate 0 refuses every withdrawal, even after ten
/// seconds.
#[test]
fn rate_limiter_zero_rate() {
    let t0 = Instant::now();
    let mut bucket = AddressObservationRateLimiter::new(0, t0);

    // No amount, however small, can be consumed at t0.
    for amount in [1.0, 0.1, 0.001] {
        assert!(!bucket.try_consume(amount, t0));
    }

    // Ten seconds later there is still nothing to take.
    let t_later = t0 + Duration::from_secs(10);
    assert!(!bucket.try_consume(0.001, t_later));
}
7241
/// A rate-63 limiter hands out exactly 63 tokens at creation and exactly 63
/// more one second later.
#[test]
fn rate_limiter_high_rate() {
    let t0 = Instant::now();
    let mut bucket = AddressObservationRateLimiter::new(63, t0);

    // 60 + 3 uses up the initial budget.
    for amount in [60.0, 3.0] {
        assert!(bucket.try_consume(amount, t0));
    }
    assert!(!bucket.try_consume(0.1, t0));

    // One second later the full 63 can be taken again, and nothing beyond.
    let t1 = t0 + Duration::from_secs(1);
    assert!(bucket.try_consume(63.0, t1));
    assert!(!bucket.try_consume(0.1, t1));
}
7257
/// Checks refill of a rate-100 limiter over short intervals (10 ms, 100 ms,
/// 1 ms).
#[test]
fn rate_limiter_time_precision() {
    let t0 = Instant::now();
    let mut bucket = AddressObservationRateLimiter::new(100, t0);

    // Empty the bucket.
    assert!(bucket.try_consume(100.0, t0));
    assert!(!bucket.try_consume(0.1, t0));

    // After 10 ms, 0.8 tokens can be taken but not 0.5 more.
    let t_10ms = t0 + Duration::from_millis(10);
    assert!(bucket.try_consume(0.8, t_10ms));
    assert!(!bucket.try_consume(0.5, t_10ms));

    // After another 100 ms, 5 tokens can be taken.
    let t_110ms = t_10ms + Duration::from_millis(100);
    assert!(bucket.try_consume(5.0, t_110ms));

    // Zero the balance; 1 ms of refill must land between 0.09 and 0.11 tokens.
    bucket.tokens = 0.0;
    let t_111ms = t_110ms + Duration::from_millis(1);
    bucket.update_tokens(t_111ms);
    assert!((0.09..=0.11).contains(&bucket.tokens));
}
7287
/// With a rate limit of 5 and three paths, three sends on path 0 and two on
/// path 1 leave nothing for path 2; one second later all three may send.
#[test]
fn per_path_rate_limiting_independent() {
    let cfg = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
    let t0 = Instant::now();
    let mut discovery = AddressDiscoveryState::new(&cfg, t0);

    discovery.observe_all_paths = true;
    discovery.update_rate_limit(5.0);

    // Register paths 0..=2, each with its own observed address.
    for (path, last_octet, port) in [(0, 1, 8080), (1, 2, 8081), (2, 3, 8082)] {
        let mut entry = paths::PathAddressInfo::new();
        entry.observed_address = Some(SocketAddr::from((
            Ipv4Addr::new(192, 168, 1, last_octet),
            port,
        )));
        discovery.path_addresses.insert(path, entry);
    }

    // Path 0 sends three times, then path 1 sends twice; `notified` is reset
    // after every send.
    for (path, sends) in [(0, 3), (1, 2)] {
        for _ in 0..sends {
            assert!(discovery.should_send_observation(path, t0));
            discovery.record_observation_sent(path);
            discovery.path_addresses.get_mut(&path).unwrap().notified = false;
        }
    }

    // Path 2 is refused at the same instant.
    assert!(!discovery.should_send_observation(2, t0));

    // One second later every path may send again.
    let t1 = t0 + Duration::from_secs(1);
    for path in 0..3 {
        assert!(discovery.should_send_observation(path, t1));
    }
}
7352
/// Each path's own limiter allows exactly as many sends as the rate it was
/// built with (10 and 5 here).
#[test]
fn per_path_rate_limiting_with_path_specific_limits() {
    /// Sends `budget` observations through `limiter` at `at`, then checks
    /// that one more is refused.
    fn drain(limiter: &mut paths::PathObservationRateLimiter, budget: usize, at: Instant) {
        for _ in 0..budget {
            assert!(limiter.can_send(at));
            limiter.consume_token(at);
        }
        assert!(!limiter.can_send(at));
    }

    let t0 = Instant::now();
    let peer_a = SocketAddr::from((Ipv4Addr::new(192, 168, 1, 1), 8080));
    let peer_b = SocketAddr::from((Ipv4Addr::new(192, 168, 1, 2), 8081));
    let transport = TransportConfig::default();

    let mut path_a = paths::PathData::new(peer_a, false, None, t0, &transport);
    let mut path_b = paths::PathData::new(peer_b, false, None, t0, &transport);

    // Give the two paths different limits.
    path_a.observation_rate_limiter = paths::PathObservationRateLimiter::new(10, t0);
    path_b.observation_rate_limiter = paths::PathObservationRateLimiter::new(5, t0);

    drain(&mut path_a.observation_rate_limiter, 10, t0);
    drain(&mut path_b.observation_rate_limiter, 5, t0);
}
7382
/// After an observation is sent for a path, sending is refused until a new
/// address is handled and the path's `notified` flag is cleared.
#[test]
fn per_path_rate_limiting_address_change_detection() {
    let cfg = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
    let t0 = Instant::now();
    let mut discovery = AddressDiscoveryState::new(&cfg, t0);

    let path = 0;
    let original = SocketAddr::from((Ipv4Addr::new(192, 168, 1, 1), 8080));
    let changed = SocketAddr::from((Ipv4Addr::new(192, 168, 1, 2), 8080));

    // First observation: allowed, handled, and recorded as sent.
    assert!(discovery.should_send_observation(path, t0));
    discovery.handle_observed_address(original, path, t0);
    discovery.record_observation_sent(path);

    // No further send is expected for this path right now.
    assert!(!discovery.should_send_observation(path, t0));

    // A different address arrives and the notified flag is cleared.
    discovery.handle_observed_address(changed, path, t0);
    if let Some(entry) = discovery.path_addresses.get_mut(&path) {
        entry.notified = false;
    }

    // Sending is permitted again.
    assert!(discovery.should_send_observation(path, t0));
}
7411
/// A newly created path with a fresh rate-10 limiter has its full budget,
/// regardless of tokens already spent on another path.
#[test]
fn per_path_rate_limiting_migration() {
    let t0 = Instant::now();
    let old_remote = SocketAddr::from((Ipv4Addr::new(192, 168, 1, 1), 8080));
    let new_remote = SocketAddr::from((Ipv4Addr::new(192, 168, 1, 2), 8081));
    let transport = TransportConfig::default();

    // Spend half of the original path's budget.
    let mut old_path = paths::PathData::new(old_remote, false, None, t0, &transport);
    old_path.observation_rate_limiter = paths::PathObservationRateLimiter::new(10, t0);
    (0..5).for_each(|_| {
        assert!(old_path.observation_rate_limiter.can_send(t0));
        old_path.observation_rate_limiter.consume_token(t0);
    });

    // Create a separate path with its own fresh limiter.
    let mut fresh_path = paths::PathData::new(new_remote, false, None, t0, &transport);
    fresh_path.observation_rate_limiter = paths::PathObservationRateLimiter::new(10, t0);

    // The fresh path allows all 10 sends, then refuses.
    (0..10).for_each(|_| {
        assert!(fresh_path.observation_rate_limiter.can_send(t0));
        fresh_path.observation_rate_limiter.consume_token(t0);
    });
    assert!(!fresh_path.observation_rate_limiter.can_send(t0));
}
7443
/// Without changing `observe_all_paths`, only path 0 is allowed to send;
/// paths 1 and 2 are refused, also one second later.
#[test]
fn per_path_rate_limiting_disabled_paths() {
    let cfg = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
    let t0 = Instant::now();
    let mut discovery = AddressDiscoveryState::new(&cfg, t0);

    // Path 0 may send.
    assert!(discovery.should_send_observation(0, t0));

    // Paths 1 and 2 may not.
    for path in [1, 2] {
        assert!(!discovery.should_send_observation(path, t0));
    }

    // Waiting a second does not change that for path 1.
    let t1 = t0 + Duration::from_secs(1);
    assert!(!discovery.should_send_observation(1, t1));
}
7461
/// With a rate-10 limiter installed, `should_send_observation` succeeds ten
/// times in a row at one instant and fails on the eleventh call.
#[test]
fn respecting_negotiated_max_observation_rate_basic() {
    let cfg = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
    let t0 = Instant::now();
    let mut discovery = AddressDiscoveryState::new(&cfg, t0);

    discovery.max_observation_rate = 10;
    discovery.rate_limiter = AddressObservationRateLimiter::new(10, t0);

    (0..10).for_each(|_| assert!(discovery.should_send_observation(0, t0)));

    assert!(!discovery.should_send_observation(0, t0));
}
7479
/// With the rate set to 0, no path may ever send an observation, even after
/// ten seconds.
#[test]
fn respecting_negotiated_max_observation_rate_zero() {
    let cfg = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
    let t0 = Instant::now();
    let mut discovery = AddressDiscoveryState::new(&cfg, t0);

    discovery.max_observation_rate = 0;
    discovery.rate_limiter = AddressObservationRateLimiter::new(0, t0);

    // Refused on both paths at t0.
    for path in [0, 1] {
        assert!(!discovery.should_send_observation(path, t0));
    }

    // Still refused on path 0 ten seconds later.
    let t10 = t0 + Duration::from_secs(10);
    assert!(!discovery.should_send_observation(0, t10));
}
7498
/// After `update_rate_limit(5.0)`, raising the `max_observation_rate` field
/// to 20 alone does not lift the effective limit: only five sends succeed.
#[test]
fn respecting_negotiated_max_observation_rate_higher() {
    let cfg = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
    let t0 = Instant::now();
    let mut discovery = AddressDiscoveryState::new(&cfg, t0);

    // Path 0 with an observed address.
    let mut entry = paths::PathAddressInfo::new();
    entry.observed_address = Some(SocketAddr::from((Ipv4Addr::new(192, 168, 1, 1), 8080)));
    discovery.path_addresses.insert(0, entry);

    discovery.update_rate_limit(5.0);

    // Only the field is raised; the limiter is left as configured above.
    discovery.max_observation_rate = 20;

    // Five sends pass, with `notified` reset after each.
    (0..5).for_each(|_| {
        assert!(discovery.should_send_observation(0, t0));
        discovery.record_observation_sent(0);
        discovery.path_addresses.get_mut(&0).unwrap().notified = false;
    });

    // The sixth is refused.
    assert!(!discovery.should_send_observation(0, t0));
}
7530
/// Lowering the limiter's rate to 3 mid-stream: after five sends, three more
/// are allowed at the same instant, then none; one second later exactly three
/// more are allowed.
#[test]
fn respecting_negotiated_max_observation_rate_dynamic_update() {
    let cfg = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
    let t0 = Instant::now();
    let mut discovery = AddressDiscoveryState::new(&cfg, t0);

    // Path 0 with an observed address.
    let mut entry = paths::PathAddressInfo::new();
    entry.observed_address = Some(SocketAddr::from((Ipv4Addr::new(192, 168, 1, 1), 8080)));
    discovery.path_addresses.insert(0, entry);

    // Five sends under the initial configuration.
    (0..5).for_each(|_| {
        assert!(discovery.should_send_observation(0, t0));
        discovery.record_observation_sent(0);
        discovery.path_addresses.get_mut(&0).unwrap().notified = false;
    });

    // Lower the rate to 3.
    discovery.max_observation_rate = 3;
    discovery.rate_limiter.set_rate(3);

    // Three more sends are still accepted at t0 ...
    (0..3).for_each(|_| {
        assert!(discovery.should_send_observation(0, t0));
        discovery.record_observation_sent(0);
        discovery.path_addresses.get_mut(&0).unwrap().notified = false;
    });

    // ... and then the limiter refuses.
    assert!(!discovery.should_send_observation(0, t0));

    // One second later exactly three sends go through.
    let t1 = t0 + Duration::from_secs(1);
    (0..3).for_each(|_| {
        assert!(discovery.should_send_observation(0, t1));
        discovery.record_observation_sent(0);
        discovery.path_addresses.get_mut(&0).unwrap().notified = false;
    });

    assert!(!discovery.should_send_observation(0, t1));
}
7584
/// With three paths observed, three round-robin rounds plus one extra send on
/// path 0 use up the budget; afterwards every path is refused.
#[test]
fn respecting_negotiated_max_observation_rate_with_paths() {
    let cfg = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
    let t0 = Instant::now();
    let mut discovery = AddressDiscoveryState::new(&cfg, t0);

    discovery.observe_all_paths = true;

    // Register paths 0..3 with addresses 192.168.1.100 through .102.
    for i in 0..3 {
        let mut entry = paths::PathAddressInfo::new();
        entry.observed_address = Some(SocketAddr::from((
            Ipv4Addr::new(192, 168, 1, 100 + i as u8),
            5000,
        )));
        discovery.path_addresses.insert(i, entry);
    }

    // Three rounds over all paths; send wherever it is currently allowed.
    for _round in 0..3 {
        for i in 0..3 {
            if !discovery.should_send_observation(i, t0) {
                continue;
            }
            discovery.record_observation_sent(i);
            discovery.path_addresses.get_mut(&i).unwrap().notified = false;
        }
    }

    // One more send is still allowed on path 0 ...
    assert!(discovery.should_send_observation(0, t0));
    discovery.record_observation_sent(0);

    // ... after which every path is refused.
    for path in 0..3 {
        assert!(!discovery.should_send_observation(path, t0));
    }
}
7628
/// Queueing an observed-address frame for path 0 returns a frame carrying the
/// address and leaves a notified entry for that path.
#[test]
fn queue_observed_address_frame_basic() {
    let cfg = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
    let t0 = Instant::now();
    let mut discovery = AddressDiscoveryState::new(&cfg, t0);

    let observed = SocketAddr::from((Ipv4Addr::new(192, 168, 1, 100), 5000));
    let queued = discovery.queue_observed_address_frame(0, observed);

    // A frame is produced and carries the address.
    assert!(queued.is_some());
    let queued = queued.unwrap();
    assert_eq!(queued.address, observed);

    // The path entry exists and is flagged as notified.
    assert!(discovery.path_addresses.contains_key(&0));
    let entry = discovery.path_addresses.get(&0).unwrap();
    assert!(entry.notified);
}
7648
/// With all-path observation on, ten frames on ten distinct paths are queued
/// successfully and the eleventh is refused by the rate limiter.
#[test]
fn queue_observed_address_frame_rate_limited() {
    let config = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
    let now = Instant::now();
    let mut state = AddressDiscoveryState::new(&config, now);

    state.observe_all_paths = true;

    // A typed `u8` index lets every conversion below be lossless (`From`)
    // instead of an `as` cast. The previously collected address list was never
    // read, so it is not built any more.
    for i in 0..10u8 {
        let addr = SocketAddr::new(
            IpAddr::V4(Ipv4Addr::new(192, 168, 1, i)),
            5000 + u16::from(i),
        );
        assert!(
            state
                .queue_observed_address_frame(u64::from(i), addr)
                .is_some(),
            "Frame {} should be allowed",
            i + 1
        );
    }

    // One more frame on a new path exceeds the budget.
    let addr11 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 11)), 5011);
    assert!(
        state.queue_observed_address_frame(10, addr11).is_none(),
        "11th frame should be rate limited"
    );
}
7680
/// A disabled state never queues an observed-address frame.
#[test]
fn queue_observed_address_frame_disabled() {
    let cfg = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
    let t0 = Instant::now();
    let mut discovery = AddressDiscoveryState::new(&cfg, t0);

    discovery.enabled = false;

    let observed = SocketAddr::from((Ipv4Addr::new(192, 168, 1, 100), 5000));
    let queued = discovery.queue_observed_address_frame(0, observed);
    assert!(queued.is_none());
}
7695
/// After one frame has been queued for a path, further requests for the same
/// path are refused, for the same address and for a different one.
#[test]
fn queue_observed_address_frame_already_notified() {
    let cfg = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
    let t0 = Instant::now();
    let mut discovery = AddressDiscoveryState::new(&cfg, t0);

    let first_addr = SocketAddr::from((Ipv4Addr::new(192, 168, 1, 100), 5000));

    // The first request yields a frame.
    let first = discovery.queue_observed_address_frame(0, first_addr);
    assert!(first.is_some());

    // Repeating it does not.
    let repeat = discovery.queue_observed_address_frame(0, first_addr);
    assert!(repeat.is_none());

    // Neither does a different address on the same path.
    let other_addr = SocketAddr::from((Ipv4Addr::new(192, 168, 1, 101), 5001));
    let other = discovery.queue_observed_address_frame(0, other_addr);
    assert!(other.is_none());
}
7714
/// Without enabling all-path observation, a frame is queued for path 0 only;
/// paths 1 and 2 are refused.
#[test]
fn queue_observed_address_frame_primary_path_only() {
    let cfg = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
    let t0 = Instant::now();
    let mut discovery = AddressDiscoveryState::new(&cfg, t0);

    let observed = SocketAddr::from((Ipv4Addr::new(192, 168, 1, 100), 5000));

    // Path 0 gets a frame.
    assert!(discovery.queue_observed_address_frame(0, observed).is_some());

    // Paths 1 and 2 do not.
    for path in [1, 2] {
        assert!(discovery.queue_observed_address_frame(path, observed).is_none());
    }
}
7730
/// Queueing a frame stores the address in the path entry and marks it as
/// notified, without adding anything to `observed_addresses`.
#[test]
fn queue_observed_address_frame_updates_path_info() {
    let cfg = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
    let t0 = Instant::now();
    let mut discovery = AddressDiscoveryState::new(&cfg, t0);

    let v6_addr = SocketAddr::from((Ipv6Addr::new(0x2001, 0xdb8, 0, 0, 0, 0, 0, 1), 5000));

    let queued = discovery.queue_observed_address_frame(0, v6_addr);
    assert!(queued.is_some());

    // Path entry reflects the queued address.
    let entry = discovery.path_addresses.get(&0).unwrap();
    assert_eq!(entry.observed_address, Some(v6_addr));
    assert!(entry.notified);

    // The list of observed addresses stays empty.
    assert_eq!(discovery.observed_addresses.len(), 0);
}
7755
/// `Retransmits` starts with an empty `observed_addresses` collection and
/// keeps a pushed `ObservedAddress` frame.
#[test]
fn retransmits_includes_observed_addresses() {
    use crate::connection::spaces::Retransmits;

    let mut pending = Retransmits::default();

    // Default value carries no observed addresses.
    assert!(pending.observed_addresses.is_empty());

    // Push a single frame.
    let observed = SocketAddr::from((Ipv4Addr::new(192, 168, 1, 100), 5000));
    pending.observed_addresses.push(frame::ObservedAddress {
        sequence_number: VarInt::from_u32(1),
        address: observed,
    });

    // The frame is retained with its address.
    assert_eq!(pending.observed_addresses.len(), 1);
    assert_eq!(pending.observed_addresses[0].address, observed);
}
7778
/// When the second argument (`false` here) is passed to
/// `check_for_address_observations`, no frames are produced even though a
/// path has an observed address.
#[test]
fn check_for_address_observations_no_peer_support() {
    let cfg = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
    let t0 = Instant::now();
    let mut discovery = AddressDiscoveryState::new(&cfg, t0);

    // Path 0 with an observed address.
    let mut entry = paths::PathAddressInfo::new();
    entry.observed_address = Some(SocketAddr::from((Ipv4Addr::new(192, 168, 1, 100), 5000)));
    discovery.path_addresses.insert(0, entry);

    let produced = discovery.check_for_address_observations(0, false, t0);

    assert!(produced.is_empty());
}
7800
/// With the second argument `true`, `check_for_address_observations` yields
/// one frame for the path's observed address and marks the path as notified.
#[test]
fn check_for_address_observations_with_peer_support() {
    let cfg = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
    let t0 = Instant::now();
    let mut discovery = AddressDiscoveryState::new(&cfg, t0);

    // Path 0 with an observed address.
    let observed = SocketAddr::from((Ipv4Addr::new(192, 168, 1, 100), 5000));
    let mut entry = paths::PathAddressInfo::new();
    entry.observed_address = Some(observed);
    discovery.path_addresses.insert(0, entry);

    let produced = discovery.check_for_address_observations(0, true, t0);

    // Exactly one frame, carrying the observed address.
    assert_eq!(produced.len(), 1);
    assert_eq!(produced[0].address, observed);

    // The path entry is now flagged as notified.
    let entry = discovery.path_addresses.get(&0).unwrap();
    assert!(entry.notified);
}
7824
/// Repeated checks drain the rate limiter to zero tokens; a further check at
/// the same instant yields nothing, and one 200 ms later yields a frame.
#[test]
fn check_for_address_observations_rate_limited() {
    let cfg = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
    let t0 = Instant::now();
    let mut discovery = AddressDiscoveryState::new(&cfg, t0);

    // Path 0 with an observed address.
    let observed = SocketAddr::from((Ipv4Addr::new(192, 168, 1, 100), 5000));
    let mut entry = paths::PathAddressInfo::new();
    entry.observed_address = Some(observed);
    discovery.path_addresses.insert(0, entry);

    // Up to ten checks, resetting `notified` after each productive one and
    // stopping early as soon as a check returns nothing.
    for _ in 0..10 {
        let produced = discovery.check_for_address_observations(0, true, t0);
        if !produced.is_empty() {
            discovery.path_addresses.get_mut(&0).unwrap().notified = false;
        } else {
            break;
        }
    }

    // The limiter is now empty.
    assert_eq!(discovery.rate_limiter.tokens, 0.0);

    // With `notified` cleared, a check at the same instant yields nothing.
    discovery.path_addresses.get_mut(&0).unwrap().notified = false;
    let at_same_instant = discovery.check_for_address_observations(0, true, t0);
    assert_eq!(at_same_instant.len(), 0);

    // 200 ms later, again with `notified` cleared, one frame is produced.
    discovery.path_addresses.get_mut(&0).unwrap().notified = false;
    let t_later = t0 + Duration::from_millis(200);
    let after_wait = discovery.check_for_address_observations(0, true, t_later);
    assert_eq!(after_wait.len(), 1);
}
7866
7867 #[test]
7868 fn check_for_address_observations_multiple_paths() {
7869 let config = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
7870 let now = Instant::now();
7871 let mut state = AddressDiscoveryState::new(&config, now);
7872
7873 state.observe_all_paths = true;
7875
7876 let addr1 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 100)), 5000);
7878 let addr2 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 101)), 5001);
7879
7880 state
7881 .path_addresses
7882 .insert(0, paths::PathAddressInfo::new());
7883 state.path_addresses.get_mut(&0).unwrap().observed_address = Some(addr1);
7884
7885 state
7886 .path_addresses
7887 .insert(1, paths::PathAddressInfo::new());
7888 state.path_addresses.get_mut(&1).unwrap().observed_address = Some(addr2);
7889
7890 let frames = state.check_for_address_observations(0, true, now);
7892
7893 assert_eq!(frames.len(), 2);
7895
7896 let addresses: Vec<_> = frames.iter().map(|f| f.address).collect();
7898 assert!(addresses.contains(&addr1));
7899 assert!(addresses.contains(&addr2));
7900
7901 assert!(state.path_addresses.get(&0).unwrap().notified);
7903 assert!(state.path_addresses.get(&1).unwrap().notified);
7904 }
7905
7906 #[test]
7908 fn test_rate_limiter_configuration() {
7909 let state = AddressDiscoveryState::new_with_params(true, 10.0, false);
7911 assert_eq!(state.rate_limiter.rate, 10.0);
7912 assert_eq!(state.rate_limiter.max_tokens, 10.0);
7913 assert_eq!(state.rate_limiter.tokens, 10.0);
7914
7915 let state = AddressDiscoveryState::new_with_params(true, 63.0, false);
7916 assert_eq!(state.rate_limiter.rate, 63.0);
7917 assert_eq!(state.rate_limiter.max_tokens, 63.0);
7918 }
7919
7920 #[test]
7921 fn test_rate_limiter_update_configuration() {
7922 let mut state = AddressDiscoveryState::new_with_params(true, 5.0, false);
7923
7924 assert_eq!(state.rate_limiter.rate, 5.0);
7926
7927 state.update_rate_limit(10.0);
7929 assert_eq!(state.rate_limiter.rate, 10.0);
7930 assert_eq!(state.rate_limiter.max_tokens, 10.0);
7931
7932 state.rate_limiter.tokens = 15.0;
7934 state.update_rate_limit(8.0);
7935 assert_eq!(state.rate_limiter.tokens, 8.0);
7936 }
7937
7938 #[test]
7939 fn test_rate_limiter_from_transport_params() {
7940 let mut params = TransportParameters::default();
7941 params.address_discovery = Some(AddressDiscoveryConfig::SendAndReceive);
7942
7943 let state = AddressDiscoveryState::from_transport_params(¶ms);
7944 assert!(state.is_some());
7945 let state = state.unwrap();
7946 assert_eq!(state.rate_limiter.rate, 10.0); assert!(!state.observe_all_paths); }
7949
7950 #[test]
7951 fn test_rate_limiter_zero_rate() {
7952 let state = AddressDiscoveryState::new_with_params(true, 0.0, false);
7953 assert_eq!(state.rate_limiter.rate, 0.0);
7954 assert_eq!(state.rate_limiter.tokens, 0.0);
7955
7956 let address = "192.168.1.1:443".parse().unwrap();
7958 let mut state = AddressDiscoveryState::new_with_params(true, 0.0, false);
7959 let frame = state.queue_observed_address_frame(0, address);
7960 assert!(frame.is_none());
7961 }
7962
7963 #[test]
7964 fn test_rate_limiter_configuration_edge_cases() {
7965 let state = AddressDiscoveryState::new_with_params(true, 63.0, false);
7967 assert_eq!(state.rate_limiter.rate, 63.0);
7968
7969 let state = AddressDiscoveryState::new_with_params(true, 100.0, false);
7971 assert_eq!(state.rate_limiter.rate, 100.0);
7973
7974 let state = AddressDiscoveryState::new_with_params(true, 2.5, false);
7976 assert_eq!(state.rate_limiter.rate, 2.0);
7978 }
7979
7980 #[test]
7981 fn test_rate_limiter_runtime_update() {
7982 let mut state = AddressDiscoveryState::new_with_params(true, 10.0, false);
7983 let now = Instant::now();
7984
7985 state.rate_limiter.tokens = 5.0;
7987
7988 state.update_rate_limit(3.0);
7990
7991 assert_eq!(state.rate_limiter.tokens, 3.0);
7993 assert_eq!(state.rate_limiter.rate, 3.0);
7994 assert_eq!(state.rate_limiter.max_tokens, 3.0);
7995
7996 let later = now + Duration::from_secs(1);
7998 state.rate_limiter.update_tokens(later);
7999
8000 assert_eq!(state.rate_limiter.tokens, 3.0);
8002 }
8003
8004 #[test]
8006 fn test_address_discovery_state_initialization_default() {
8007 let now = Instant::now();
8009 let default_config = crate::transport_parameters::AddressDiscoveryConfig::default();
8010
8011 let address_discovery_state = Some(AddressDiscoveryState::new(&default_config, now));
8014
8015 assert!(address_discovery_state.is_some());
8016 let state = address_discovery_state.unwrap();
8017
8018 assert!(state.enabled); assert_eq!(state.max_observation_rate, 10); assert!(!state.observe_all_paths);
8022 }
8023
8024 #[test]
8025 fn test_address_discovery_state_initialization_on_handshake() {
8026 let now = Instant::now();
8028
8029 let mut address_discovery_state = Some(AddressDiscoveryState::new(
8031 &crate::transport_parameters::AddressDiscoveryConfig::default(),
8032 now,
8033 ));
8034
8035 let peer_params = TransportParameters {
8037 address_discovery: Some(AddressDiscoveryConfig::SendAndReceive),
8038 ..TransportParameters::default()
8039 };
8040
8041 if let Some(peer_config) = &peer_params.address_discovery {
8043 address_discovery_state = Some(AddressDiscoveryState::new(peer_config, now));
8045 }
8046
8047 assert!(address_discovery_state.is_some());
8049 let state = address_discovery_state.unwrap();
8050 assert!(state.enabled);
8051 assert_eq!(state.max_observation_rate, 10); assert!(!state.observe_all_paths); }
8055
8056 #[test]
8057 fn test_address_discovery_negotiation_disabled_peer() {
8058 let now = Instant::now();
8060
8061 let our_config = AddressDiscoveryConfig::SendAndReceive;
8063 let mut address_discovery_state = Some(AddressDiscoveryState::new(&our_config, now));
8064
8065 let peer_params = TransportParameters {
8067 address_discovery: None,
8068 ..TransportParameters::default()
8069 };
8070
8071 if peer_params.address_discovery.is_none() {
8073 if let Some(state) = &mut address_discovery_state {
8074 state.enabled = false;
8075 }
8076 }
8077
8078 let state = address_discovery_state.unwrap();
8080 assert!(!state.enabled); }
8082
8083 #[test]
8084 fn test_address_discovery_negotiation_rate_limiting() {
8085 let now = Instant::now();
8087
8088 let our_config = AddressDiscoveryConfig::SendAndReceive;
8090 let mut address_discovery_state = Some(AddressDiscoveryState::new(&our_config, now));
8091
8092 if let Some(state) = &mut address_discovery_state {
8094 state.max_observation_rate = 30;
8095 state.update_rate_limit(30.0);
8096 }
8097
8098 let peer_params = TransportParameters {
8100 address_discovery: Some(AddressDiscoveryConfig::SendAndReceive),
8101 ..TransportParameters::default()
8102 };
8103
8104 if let (Some(state), Some(_peer_config)) =
8107 (&mut address_discovery_state, &peer_params.address_discovery)
8108 {
8109 let peer_rate = 15u8;
8112 let negotiated_rate = state.max_observation_rate.min(peer_rate);
8113 state.update_rate_limit(negotiated_rate as f64);
8114 }
8115
8116 let state = address_discovery_state.unwrap();
8118 assert_eq!(state.rate_limiter.rate, 15.0); }
8120
8121 #[test]
8122 fn test_address_discovery_path_initialization() {
8123 let now = Instant::now();
8125 let config = AddressDiscoveryConfig::SendAndReceive;
8126 let mut state = AddressDiscoveryState::new(&config, now);
8127
8128 assert!(state.path_addresses.is_empty());
8130
8131 let should_send = state.should_send_observation(0, now);
8133 assert!(should_send); }
8138
8139 #[test]
8140 fn test_address_discovery_multiple_path_initialization() {
8141 let now = Instant::now();
8143 let config = AddressDiscoveryConfig::SendAndReceive;
8144 let mut state = AddressDiscoveryState::new(&config, now);
8145
8146 assert!(state.should_send_observation(0, now)); assert!(!state.should_send_observation(1, now)); assert!(!state.should_send_observation(2, now)); state.observe_all_paths = true;
8153 assert!(state.should_send_observation(1, now)); assert!(state.should_send_observation(2, now)); let config_primary_only = AddressDiscoveryConfig::SendAndReceive;
8158 let mut state_primary = AddressDiscoveryState::new(&config_primary_only, now);
8159
8160 assert!(state_primary.should_send_observation(0, now)); assert!(!state_primary.should_send_observation(1, now)); }
8163
8164 #[test]
8165 fn test_handle_observed_address_frame_valid() {
8166 let now = Instant::now();
8168 let config = AddressDiscoveryConfig::SendAndReceive;
8169 let mut state = AddressDiscoveryState::new(&config, now);
8170
8171 let observed_addr = SocketAddr::from(([192, 168, 1, 100], 5000));
8173 state.handle_observed_address(observed_addr, 0, now);
8174
8175 assert_eq!(state.observed_addresses.len(), 1);
8177 assert_eq!(state.observed_addresses[0].address, observed_addr);
8178 assert_eq!(state.observed_addresses[0].path_id, 0);
8179 assert_eq!(state.observed_addresses[0].received_at, now);
8180
8181 let path_info = state.path_addresses.get(&0).unwrap();
8183 assert_eq!(path_info.observed_address, Some(observed_addr));
8184 assert_eq!(path_info.last_observed, Some(now));
8185 assert_eq!(path_info.observation_count, 1);
8186 }
8187
8188 #[test]
8189 fn test_handle_multiple_observed_addresses() {
8190 let now = Instant::now();
8192 let config = AddressDiscoveryConfig::SendAndReceive;
8193 let mut state = AddressDiscoveryState::new(&config, now);
8194
8195 let addr1 = SocketAddr::from(([192, 168, 1, 100], 5000));
8197 let addr2 = SocketAddr::from(([10, 0, 0, 50], 6000));
8198 let addr3 = SocketAddr::from(([192, 168, 1, 100], 7000)); state.handle_observed_address(addr1, 0, now);
8201 state.handle_observed_address(addr2, 1, now);
8202 state.handle_observed_address(addr3, 0, now + Duration::from_millis(100));
8203
8204 assert_eq!(state.observed_addresses.len(), 3);
8206
8207 let path0_info = state.path_addresses.get(&0).unwrap();
8209 assert_eq!(path0_info.observed_address, Some(addr3));
8210 assert_eq!(path0_info.observation_count, 1); let path1_info = state.path_addresses.get(&1).unwrap();
8214 assert_eq!(path1_info.observed_address, Some(addr2));
8215 assert_eq!(path1_info.observation_count, 1);
8216 }
8217
8218 #[test]
8219 fn test_get_observed_address() {
8220 let now = Instant::now();
8222 let config = AddressDiscoveryConfig::SendAndReceive;
8223 let mut state = AddressDiscoveryState::new(&config, now);
8224
8225 assert_eq!(state.get_observed_address(0), None);
8227
8228 let addr = SocketAddr::from(([192, 168, 1, 100], 5000));
8230 state.handle_observed_address(addr, 0, now);
8231
8232 assert_eq!(state.get_observed_address(0), Some(addr));
8234
8235 assert_eq!(state.get_observed_address(999), None);
8237 }
8238
8239 #[test]
8240 fn test_has_unnotified_changes() {
8241 let now = Instant::now();
8243 let config = AddressDiscoveryConfig::SendAndReceive;
8244 let mut state = AddressDiscoveryState::new(&config, now);
8245
8246 assert!(!state.has_unnotified_changes());
8248
8249 let addr = SocketAddr::from(([192, 168, 1, 100], 5000));
8251 state.handle_observed_address(addr, 0, now);
8252 assert!(state.has_unnotified_changes());
8253
8254 if let Some(path_info) = state.path_addresses.get_mut(&0) {
8256 path_info.notified = true;
8257 }
8258 assert!(!state.has_unnotified_changes());
8259
8260 let addr2 = SocketAddr::from(([192, 168, 1, 100], 6000));
8262 state.handle_observed_address(addr2, 0, now + Duration::from_secs(1));
8263 assert!(state.has_unnotified_changes());
8264 }
8265
8266 #[test]
8267 fn test_address_discovery_disabled() {
8268 let now = Instant::now();
8270 let config = AddressDiscoveryConfig::SendAndReceive;
8271 let mut state = AddressDiscoveryState::new(&config, now);
8272
8273 state.enabled = false;
8275
8276 let addr = SocketAddr::from(([192, 168, 1, 100], 5000));
8278 state.handle_observed_address(addr, 0, now);
8279
8280 assert_eq!(state.observed_addresses.len(), 0);
8282
8283 assert!(!state.should_send_observation(0, now));
8285 }
8286
8287 #[test]
8288 fn test_rate_limiting_basic() {
8289 let now = Instant::now();
8291 let config = AddressDiscoveryConfig::SendAndReceive;
8292 let mut state = AddressDiscoveryState::new(&config, now);
8293
8294 state.observe_all_paths = true;
8296 state.rate_limiter.set_rate(2); assert!(state.should_send_observation(0, now));
8300 state.record_observation_sent(0);
8302
8303 assert!(state.should_send_observation(1, now));
8305 state.record_observation_sent(1);
8306
8307 assert!(!state.should_send_observation(2, now));
8309
8310 let later = now + Duration::from_millis(500);
8312 assert!(state.should_send_observation(3, later));
8313 state.record_observation_sent(3);
8314
8315 assert!(!state.should_send_observation(4, later));
8317
8318 let _one_sec_later = now + Duration::from_secs(1);
8322 let two_sec_later = now + Duration::from_secs(2);
8326 assert!(state.should_send_observation(5, two_sec_later));
8327 state.record_observation_sent(5);
8328
8329 assert!(state.should_send_observation(6, two_sec_later));
8340 state.record_observation_sent(6);
8341
8342 assert!(
8344 !state.should_send_observation(7, two_sec_later),
8345 "Expected no tokens available"
8346 );
8347 }
8348
8349 #[test]
8350 fn test_rate_limiting_per_path() {
8351 let now = Instant::now();
8353 let config = AddressDiscoveryConfig::SendAndReceive;
8354 let mut state = AddressDiscoveryState::new(&config, now);
8355
8356 state
8358 .path_addresses
8359 .insert(0, paths::PathAddressInfo::new());
8360 state.path_addresses.get_mut(&0).unwrap().observed_address = Some(SocketAddr::new(
8361 IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)),
8362 8080,
8363 ));
8364
8365 for _ in 0..10 {
8367 assert!(state.should_send_observation(0, now));
8368 state.record_observation_sent(0);
8369 state.path_addresses.get_mut(&0).unwrap().notified = false;
8371 }
8372
8373 assert!(!state.should_send_observation(0, now));
8375
8376 let later = now + Duration::from_millis(100);
8378 assert!(state.should_send_observation(0, later));
8379 state.record_observation_sent(0);
8380
8381 state.path_addresses.get_mut(&0).unwrap().notified = false;
8383
8384 assert!(!state.should_send_observation(0, later));
8386 }
8387
8388 #[test]
8389 fn test_rate_limiting_zero_rate() {
8390 let now = Instant::now();
8392 let config = AddressDiscoveryConfig::SendAndReceive;
8393 let mut state = AddressDiscoveryState::new(&config, now);
8394
8395 state.rate_limiter.set_rate(0);
8397 state.rate_limiter.tokens = 0.0;
8398 state.rate_limiter.max_tokens = 0.0;
8399
8400 assert!(!state.should_send_observation(0, now));
8402 assert!(!state.should_send_observation(0, now + Duration::from_secs(10)));
8403 assert!(!state.should_send_observation(0, now + Duration::from_secs(100)));
8404 }
8405
8406 #[test]
8407 fn test_rate_limiting_update() {
8408 let now = Instant::now();
8410 let config = AddressDiscoveryConfig::SendAndReceive;
8411 let mut state = AddressDiscoveryState::new(&config, now);
8412
8413 state.observe_all_paths = true;
8415
8416 for i in 0..12 {
8418 state
8419 .path_addresses
8420 .insert(i, paths::PathAddressInfo::new());
8421 state.path_addresses.get_mut(&i).unwrap().observed_address = Some(SocketAddr::new(
8422 IpAddr::V4(Ipv4Addr::new(192, 168, 1, (i + 1) as u8)),
8423 8080,
8424 ));
8425 }
8426
8427 for i in 0..10 {
8430 assert!(state.should_send_observation(i, now));
8431 state.record_observation_sent(i);
8432 }
8433 assert!(!state.should_send_observation(10, now));
8435
8436 state.update_rate_limit(20.0);
8438
8439 let later = now + Duration::from_millis(50);
8442 assert!(state.should_send_observation(10, later));
8443 state.record_observation_sent(10);
8444
8445 let later2 = now + Duration::from_millis(100);
8447 assert!(state.should_send_observation(11, later2));
8448 }
8449
8450 #[test]
8451 fn test_rate_limiting_burst() {
8452 let now = Instant::now();
8454 let config = AddressDiscoveryConfig::SendAndReceive;
8455 let mut state = AddressDiscoveryState::new(&config, now);
8456
8457 for _ in 0..10 {
8459 assert!(state.should_send_observation(0, now));
8460 state.record_observation_sent(0);
8461 }
8462
8463 assert!(!state.should_send_observation(0, now));
8465
8466 let later = now + Duration::from_millis(100);
8468 assert!(state.should_send_observation(0, later));
8469 state.record_observation_sent(0);
8470 assert!(!state.should_send_observation(0, later));
8471 }
8472
8473 #[test]
8474 fn test_connection_rate_limiting_with_check_observations() {
8475 let now = Instant::now();
8477 let config = AddressDiscoveryConfig::SendAndReceive;
8478 let mut state = AddressDiscoveryState::new(&config, now);
8479
8480 let mut path_info = paths::PathAddressInfo::new();
8482 path_info.update_observed_address(
8483 SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)), 8080),
8484 now,
8485 );
8486 state.path_addresses.insert(0, path_info);
8487
8488 let frame1 =
8490 state.queue_observed_address_frame(0, SocketAddr::from(([192, 168, 1, 1], 8080)));
8491 assert!(frame1.is_some());
8492 state.record_observation_sent(0);
8493
8494 if let Some(info) = state.path_addresses.get_mut(&0) {
8496 info.notified = false;
8497 }
8498
8499 for _ in 1..10 {
8501 if let Some(info) = state.path_addresses.get_mut(&0) {
8503 info.notified = false;
8504 }
8505 let frame =
8506 state.queue_observed_address_frame(0, SocketAddr::from(([192, 168, 1, 1], 8080)));
8507 assert!(frame.is_some());
8508 state.record_observation_sent(0);
8509 }
8510
8511 if let Some(info) = state.path_addresses.get_mut(&0) {
8513 info.notified = false;
8514 }
8515 let frame3 =
8516 state.queue_observed_address_frame(0, SocketAddr::from(([192, 168, 1, 1], 8080)));
8517 assert!(frame3.is_none()); let later = now + Duration::from_millis(100);
8521 state.rate_limiter.update_tokens(later); if let Some(info) = state.path_addresses.get_mut(&0) {
8525 info.notified = false;
8526 }
8527
8528 let frame4 =
8529 state.queue_observed_address_frame(0, SocketAddr::from(([192, 168, 1, 1], 8080)));
8530 assert!(frame4.is_some()); }
8532
8533 #[test]
8534 fn test_queue_observed_address_frame() {
8535 let now = Instant::now();
8537 let config = AddressDiscoveryConfig::SendAndReceive;
8538 let mut state = AddressDiscoveryState::new(&config, now);
8539
8540 let addr = SocketAddr::from(([192, 168, 1, 100], 5000));
8541
8542 let frame = state.queue_observed_address_frame(0, addr);
8544 assert!(frame.is_some());
8545 assert_eq!(frame.unwrap().address, addr);
8546
8547 state.record_observation_sent(0);
8549
8550 for i in 0..9 {
8552 if let Some(info) = state.path_addresses.get_mut(&0) {
8554 info.notified = false;
8555 }
8556
8557 let frame = state.queue_observed_address_frame(0, addr);
8558 assert!(frame.is_some(), "Frame {} should be allowed", i + 2);
8559 state.record_observation_sent(0);
8560 }
8561
8562 if let Some(info) = state.path_addresses.get_mut(&0) {
8564 info.notified = false;
8565 }
8566
8567 let frame = state.queue_observed_address_frame(0, addr);
8569 assert!(frame.is_none(), "11th frame should be rate limited");
8570 }
8571
8572 #[test]
8573 fn test_multi_path_basic() {
8574 let now = Instant::now();
8576 let config = AddressDiscoveryConfig::SendAndReceive;
8577 let mut state = AddressDiscoveryState::new(&config, now);
8578
8579 let addr1 = SocketAddr::from(([192, 168, 1, 1], 5000));
8580 let addr2 = SocketAddr::from(([10, 0, 0, 1], 6000));
8581 let addr3 = SocketAddr::from(([172, 16, 0, 1], 7000));
8582
8583 state.handle_observed_address(addr1, 0, now);
8585 state.handle_observed_address(addr2, 1, now);
8586 state.handle_observed_address(addr3, 2, now);
8587
8588 assert_eq!(state.get_observed_address(0), Some(addr1));
8590 assert_eq!(state.get_observed_address(1), Some(addr2));
8591 assert_eq!(state.get_observed_address(2), Some(addr3));
8592
8593 assert!(state.has_unnotified_changes());
8595
8596 assert_eq!(state.observed_addresses.len(), 3);
8598 }
8599
8600 #[test]
8601 fn test_multi_path_observe_primary_only() {
8602 let now = Instant::now();
8604 let config = AddressDiscoveryConfig::SendAndReceive;
8605 let mut state = AddressDiscoveryState::new(&config, now);
8606
8607 assert!(state.should_send_observation(0, now));
8609 state.record_observation_sent(0);
8610
8611 assert!(!state.should_send_observation(1, now));
8613 assert!(!state.should_send_observation(2, now));
8614
8615 let addr = SocketAddr::from(([192, 168, 1, 1], 5000));
8617 assert!(state.queue_observed_address_frame(0, addr).is_some());
8618 assert!(state.queue_observed_address_frame(1, addr).is_none());
8619 assert!(state.queue_observed_address_frame(2, addr).is_none());
8620 }
8621
8622 #[test]
8623 fn test_multi_path_rate_limiting() {
8624 let now = Instant::now();
8626 let config = AddressDiscoveryConfig::SendAndReceive;
8627 let mut state = AddressDiscoveryState::new(&config, now);
8628
8629 state.observe_all_paths = true;
8631
8632 for i in 0..21 {
8634 state
8635 .path_addresses
8636 .insert(i, paths::PathAddressInfo::new());
8637 state.path_addresses.get_mut(&i).unwrap().observed_address = Some(SocketAddr::new(
8638 IpAddr::V4(Ipv4Addr::new(192, 168, 1, (i + 1) as u8)),
8639 8080,
8640 ));
8641 }
8642
8643 for i in 0..10 {
8645 assert!(state.should_send_observation(i, now));
8646 state.record_observation_sent(i);
8647 }
8648
8649 assert!(!state.should_send_observation(10, now));
8651
8652 state.path_addresses.get_mut(&0).unwrap().notified = false;
8654 assert!(!state.should_send_observation(0, now)); let later = now + Duration::from_secs(1);
8658 for i in 10..20 {
8659 assert!(state.should_send_observation(i, later));
8660 state.record_observation_sent(i);
8661 }
8662 assert!(!state.should_send_observation(20, later));
8664 }
8665
8666 #[test]
8667 fn test_multi_path_address_changes() {
8668 let now = Instant::now();
8670 let config = AddressDiscoveryConfig::SendAndReceive;
8671 let mut state = AddressDiscoveryState::new(&config, now);
8672
8673 let addr1a = SocketAddr::from(([192, 168, 1, 1], 5000));
8674 let addr1b = SocketAddr::from(([192, 168, 1, 2], 5000));
8675 let addr2a = SocketAddr::from(([10, 0, 0, 1], 6000));
8676 let addr2b = SocketAddr::from(([10, 0, 0, 2], 6000));
8677
8678 state.handle_observed_address(addr1a, 0, now);
8680 state.handle_observed_address(addr2a, 1, now);
8681
8682 state.record_observation_sent(0);
8684 state.record_observation_sent(1);
8685 assert!(!state.has_unnotified_changes());
8686
8687 state.handle_observed_address(addr1b, 0, now + Duration::from_secs(1));
8689 assert!(state.has_unnotified_changes());
8690
8691 assert_eq!(state.get_observed_address(0), Some(addr1b));
8693 assert_eq!(state.get_observed_address(1), Some(addr2a));
8694
8695 state.record_observation_sent(0);
8697 assert!(!state.has_unnotified_changes());
8698
8699 state.handle_observed_address(addr2b, 1, now + Duration::from_secs(2));
8701 assert!(state.has_unnotified_changes());
8702 }
8703
8704 #[test]
8705 fn test_multi_path_migration() {
8706 let now = Instant::now();
8708 let config = AddressDiscoveryConfig::SendAndReceive;
8709 let mut state = AddressDiscoveryState::new(&config, now);
8710
8711 let addr_old = SocketAddr::from(([192, 168, 1, 1], 5000));
8712 let addr_new = SocketAddr::from(([10, 0, 0, 1], 6000));
8713
8714 state.handle_observed_address(addr_old, 0, now);
8716 assert_eq!(state.get_observed_address(0), Some(addr_old));
8717
8718 state.handle_observed_address(addr_new, 1, now + Duration::from_secs(1));
8720
8721 assert_eq!(state.get_observed_address(0), Some(addr_old));
8723 assert_eq!(state.get_observed_address(1), Some(addr_new));
8724
8725 assert_eq!(state.path_addresses.len(), 2);
8728 }
8729
8730 #[test]
8731 fn test_check_for_address_observations_multi_path() {
8732 let now = Instant::now();
8734 let config = AddressDiscoveryConfig::SendAndReceive;
8735 let mut state = AddressDiscoveryState::new(&config, now);
8736
8737 state.observe_all_paths = true;
8739
8740 let addr1 = SocketAddr::from(([192, 168, 1, 1], 5000));
8742 let addr2 = SocketAddr::from(([10, 0, 0, 1], 6000));
8743 let addr3 = SocketAddr::from(([172, 16, 0, 1], 7000));
8744
8745 state.handle_observed_address(addr1, 0, now);
8746 state.handle_observed_address(addr2, 1, now);
8747 state.handle_observed_address(addr3, 2, now);
8748
8749 let frames = state.check_for_address_observations(0, true, now);
8751
8752 assert_eq!(frames.len(), 3);
8754
8755 let frame_addrs: Vec<_> = frames.iter().map(|f| f.address).collect();
8757 assert!(frame_addrs.contains(&addr1), "addr1 should be in frames");
8758 assert!(frame_addrs.contains(&addr2), "addr2 should be in frames");
8759 assert!(frame_addrs.contains(&addr3), "addr3 should be in frames");
8760
8761 assert!(!state.has_unnotified_changes());
8763 }
8764
8765 #[test]
8766 fn test_multi_path_with_peer_not_supporting() {
8767 let now = Instant::now();
8769 let config = AddressDiscoveryConfig::SendAndReceive;
8770 let mut state = AddressDiscoveryState::new(&config, now);
8771
8772 state.handle_observed_address(SocketAddr::from(([192, 168, 1, 1], 5000)), 0, now);
8774 state.handle_observed_address(SocketAddr::from(([10, 0, 0, 1], 6000)), 1, now);
8775
8776 let frames = state.check_for_address_observations(0, false, now);
8778 assert_eq!(frames.len(), 0);
8779
8780 assert!(state.has_unnotified_changes());
8782 }
8783
8784 #[test]
8786 fn test_bootstrap_node_aggressive_observation_mode() {
8787 let config = AddressDiscoveryConfig::SendAndReceive;
8789 let now = Instant::now();
8790 let mut state = AddressDiscoveryState::new(&config, now);
8791
8792 assert!(!state.is_bootstrap_mode());
8794
8795 state.set_bootstrap_mode(true);
8797 assert!(state.is_bootstrap_mode());
8798
8799 assert!(state.should_observe_path(0)); assert!(state.should_observe_path(1)); assert!(state.should_observe_path(2));
8803
8804 let bootstrap_rate = state.get_effective_rate_limit();
8806 assert!(bootstrap_rate > 10.0); }
8808
8809 #[test]
8810 fn test_bootstrap_node_immediate_observation() {
8811 let config = AddressDiscoveryConfig::SendAndReceive;
8813 let now = Instant::now();
8814 let mut state = AddressDiscoveryState::new(&config, now);
8815 state.set_bootstrap_mode(true);
8816
8817 let addr = SocketAddr::from(([192, 168, 1, 100], 5000));
8819 state.handle_observed_address(addr, 0, now);
8820
8821 assert!(state.should_send_observation_immediately(true));
8823
8824 assert!(state.should_send_observation(0, now));
8826
8827 let frame = state.queue_observed_address_frame(0, addr);
8829 assert!(frame.is_some());
8830 }
8831
8832 #[test]
8833 fn test_bootstrap_node_multiple_path_observations() {
8834 let config = AddressDiscoveryConfig::SendAndReceive;
8836 let now = Instant::now();
8837 let mut state = AddressDiscoveryState::new(&config, now);
8838 state.set_bootstrap_mode(true);
8839
8840 let addrs = vec![
8842 (0, SocketAddr::from(([192, 168, 1, 1], 5000))),
8843 (1, SocketAddr::from(([10, 0, 0, 1], 6000))),
8844 (2, SocketAddr::from(([172, 16, 0, 1], 7000))),
8845 ];
8846
8847 for (path_id, addr) in &addrs {
8848 state.handle_observed_address(*addr, *path_id, now);
8849 }
8850
8851 let frames = state.check_for_address_observations(0, true, now);
8853 assert_eq!(frames.len(), 3);
8854
8855 for (_, addr) in &addrs {
8857 assert!(frames.iter().any(|f| f.address == *addr));
8858 }
8859 }
8860
8861 #[test]
8862 fn test_bootstrap_node_rate_limit_override() {
8863 let config = AddressDiscoveryConfig::SendAndReceive;
8865 let now = Instant::now();
8866 let mut state = AddressDiscoveryState::new(&config, now);
8867 state.set_bootstrap_mode(true);
8868
8869 let addr = SocketAddr::from(([192, 168, 1, 1], 5000));
8871
8872 for i in 0..10 {
8874 state.handle_observed_address(addr, i, now);
8875 let can_send = state.should_send_observation(i, now);
8876 assert!(can_send, "Bootstrap node should send observation {i}");
8877 state.record_observation_sent(i);
8878 }
8879 }
8880
8881 #[test]
8882 fn test_bootstrap_node_configuration() {
8883 let config = AddressDiscoveryConfig::SendAndReceive;
8885 let mut state = AddressDiscoveryState::new(&config, Instant::now());
8886
8887 state.set_bootstrap_mode(true);
8889
8890 assert!(state.bootstrap_mode);
8892 assert!(state.enabled);
8893
8894 let effective_rate = state.get_effective_rate_limit();
8896 assert!(effective_rate > state.max_observation_rate as f64);
8897 }
8898
8899 #[test]
8900 fn test_bootstrap_node_persistent_observation() {
8901 let config = AddressDiscoveryConfig::SendAndReceive;
8903 let mut now = Instant::now();
8904 let mut state = AddressDiscoveryState::new(&config, now);
8905 state.set_bootstrap_mode(true);
8906
8907 let addr1 = SocketAddr::from(([192, 168, 1, 1], 5000));
8908 let addr2 = SocketAddr::from(([192, 168, 1, 2], 5000));
8909
8910 state.handle_observed_address(addr1, 0, now);
8912 assert!(state.should_send_observation(0, now));
8913 state.record_observation_sent(0);
8914
8915 now += Duration::from_secs(60);
8917 state.handle_observed_address(addr2, 0, now);
8918
8919 assert!(state.should_send_observation(0, now));
8921 }
8922
8923 #[test]
8924 fn test_bootstrap_node_multi_peer_support() {
8925 let config = AddressDiscoveryConfig::SendAndReceive;
8928 let now = Instant::now();
8929 let mut state = AddressDiscoveryState::new(&config, now);
8930 state.set_bootstrap_mode(true);
8931
8932 let peer_addresses = vec![
8934 (0, SocketAddr::from(([192, 168, 1, 1], 5000))), (1, SocketAddr::from(([10, 0, 0, 1], 6000))), (2, SocketAddr::from(([172, 16, 0, 1], 7000))), (3, SocketAddr::from(([192, 168, 2, 1], 8000))), ];
8939
8940 for (path_id, addr) in &peer_addresses {
8942 state.handle_observed_address(*addr, *path_id, now);
8943 }
8944
8945 let frames = state.check_for_address_observations(0, true, now);
8947 assert_eq!(frames.len(), peer_addresses.len());
8948
8949 for (_, addr) in &peer_addresses {
8951 assert!(frames.iter().any(|f| f.address == *addr));
8952 }
8953 }
8954
    // Nested module whose body is the textual contents of `address_discovery_tests.rs`,
    // spliced in at compile time by `include!`.
    mod address_discovery_tests {
        include!("address_discovery_tests.rs");
    }
8959}