1#![allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
9use std::{
10 cmp,
11 collections::VecDeque,
12 convert::TryFrom,
13 fmt, io, mem,
14 net::{IpAddr, SocketAddr},
15 sync::Arc,
16};
17
18use bytes::{Bytes, BytesMut};
19use frame::StreamMetaVec;
20use rand::{Rng, SeedableRng, rngs::StdRng};
23use thiserror::Error;
24use tracing::{debug, error, info, trace, trace_span, warn};
25
26use crate::{
27 Dir, Duration, EndpointConfig, Frame, INITIAL_MTU, Instant, MAX_CID_SIZE, MAX_STREAM_COUNT,
28 MIN_INITIAL_SIZE, MtuDiscoveryConfig, Side, StreamId, TIMER_GRANULARITY, TokenStore, Transmit,
29 TransportError, TransportErrorCode, VarInt,
30 cid_generator::ConnectionIdGenerator,
31 cid_queue::CidQueue,
32 coding::BufMutExt,
33 config::{ServerConfig, TransportConfig},
34 crypto::{self, KeyPair, Keys, PacketKey},
35 endpoint::AddressDiscoveryStats,
36 frame::{self, Close, Datagram, FrameStruct, NewToken},
37 nat_traversal_api::PeerId,
38 packet::{
39 FixedLengthConnectionIdParser, Header, InitialHeader, InitialPacket, LongType, Packet,
40 PacketNumber, PartialDecode, SpaceId,
41 },
42 range_set::ArrayRangeSet,
43 shared::{
44 ConnectionEvent, ConnectionEventInner, ConnectionId, DatagramConnectionEvent, EcnCodepoint,
45 EndpointEvent, EndpointEventInner,
46 },
47 token::{ResetToken, Token, TokenPayload},
48 transport_parameters::TransportParameters,
49};
50
51mod ack_frequency;
52use ack_frequency::AckFrequencyState;
53
54pub(crate) mod nat_traversal;
55use nat_traversal::NatTraversalState;
56pub(crate) use nat_traversal::{CoordinationPhase, NatTraversalError};
58
59mod assembler;
60pub use assembler::Chunk;
61
62mod cid_state;
63use cid_state::CidState;
64
65mod datagrams;
66use datagrams::DatagramState;
67pub use datagrams::{Datagrams, SendDatagramError};
68
69mod mtud;
70use mtud::MtuDiscovery;
71
72mod pacing;
73
74mod packet_builder;
75use packet_builder::PacketBuilder;
76
77mod packet_crypto;
78use packet_crypto::{PrevCrypto, ZeroRttCrypto};
79
80mod paths;
81pub use paths::RttEstimator;
82use paths::{NatTraversalChallenges, PathData, PathResponses};
83
84mod send_buffer;
85
86mod spaces;
87#[cfg(fuzzing)]
88pub use spaces::Retransmits;
89#[cfg(not(fuzzing))]
90use spaces::Retransmits;
91use spaces::{PacketNumberFilter, PacketSpace, SendableFrames, SentPacket, ThinRetransmits};
92
93mod stats;
94pub use stats::{ConnectionStats, FrameStats, PathStats, UdpStats};
95
96mod streams;
97#[cfg(fuzzing)]
98pub use streams::StreamsState;
99#[cfg(not(fuzzing))]
100use streams::StreamsState;
101pub use streams::{
102 Chunks, ClosedStream, FinishError, ReadError, ReadableError, RecvStream, SendStream,
103 ShouldTransmit, StreamEvent, Streams, WriteError, Written,
104};
105
106mod timer;
107use crate::congestion::Controller;
108use timer::{Timer, TimerTable};
109
/// Protocol state for a single connection.
///
/// Bundles everything the methods in this file operate on: configuration, crypto sessions,
/// per-space packet state, path data, timers, queued events, and the NAT-traversal,
/// address-discovery and PQC extensions.
pub struct Connection {
    /// Endpoint-wide configuration, shared via `Arc`.
    endpoint_config: Arc<EndpointConfig>,
    /// Transport configuration (consulted e.g. for segmentation offload, ACK frequency and
    /// MTU padding in `poll_transmit`).
    config: Arc<TransportConfig>,
    /// Per-connection RNG, seeded from the `rng_seed` argument of `new`.
    rng: StdRng,
    /// Crypto session; supplies the Initial keys in `new`.
    crypto: Box<dyn crypto::Session>,
    /// Initialised from the `loc_cid` argument of `new`.
    handshake_cid: ConnectionId,
    /// Initialised from the `rem_cid` argument of `new`.
    rem_handshake_cid: ConnectionId,
    /// Local IP reported as `src_ip` on every `Transmit` produced by this connection.
    local_ip: Option<IpAddr>,
    /// Current network path (remote address, RTT, congestion control, pacing, MTU discovery).
    path: PathData,
    /// The `allow_mtud` argument of `new` (also forwarded to `PathData::new`).
    allow_mtud: bool,
    /// Previous path and the CID used on it; `send_path_challenge` sends a PATH_CHALLENGE to it
    /// while `challenge_pending` is set.
    prev_path: Option<(ConnectionId, PathData)>,
    /// Connection state machine (handshake, closed, draining, drained, ...).
    state: State,
    /// Client/server role together with its side-specific data.
    side: ConnectionSide,
    // NOTE(review): presumably whether 0-RTT was enabled for this connection — confirm.
    zero_rtt_enabled: bool,
    /// 0-RTT keys, if any; used for Data-space packets when no 1-RTT keys are installed.
    zero_rtt_crypto: Option<ZeroRttCrypto>,
    // NOTE(review): presumably the current key-phase bit — confirm.
    key_phase: bool,
    /// Initialised in `new` to a random value in `10..1000`.
    // NOTE(review): presumably the number of packets to send before a key update — confirm.
    key_phase_size: u64,
    /// Peer's transport parameters; starts out as `TransportParameters::default()`.
    peer_params: TransportParameters,
    /// Initialised from the `rem_cid` argument of `new`.
    orig_rem_cid: ConnectionId,
    /// Initialised from the `init_cid` argument of `new`.
    initial_dst_cid: ConnectionId,
    /// Starts as `None`.
    // NOTE(review): presumably the source CID of a received Retry packet — confirm.
    retry_src_cid: Option<ConnectionId>,
    /// Counter, starts at zero.
    lost_packets: u64,
    /// Application-facing events, drained by `poll`.
    events: VecDeque<Event>,
    /// Endpoint-facing events, drained by `poll_endpoint_events`.
    endpoint_events: VecDeque<EndpointEventInner>,
    /// True when `config.allow_spin` is set and a 7-in-8 random draw in `new` succeeded.
    spin_enabled: bool,
    // NOTE(review): presumably the current spin-bit value — confirm.
    spin: bool,
    /// Packet number spaces, indexed by `SpaceId` (Initial, Handshake, Data).
    spaces: [PacketSpace; 3],
    /// Starts at `SpaceId::Initial`; `poll_transmit` stops sending CONNECTION_CLOSE once this
    /// space has been reached.
    highest_space: SpaceId,
    // NOTE(review): presumably the previous key generation kept around a key update — confirm.
    prev_crypto: Option<PrevCrypto>,
    // NOTE(review): presumably the pre-computed keys for the next key update — confirm.
    next_crypto: Option<KeyPair<Box<dyn PacketKey>>>,
    // NOTE(review): presumably whether the peer accepted 0-RTT data — confirm.
    accepted_0rtt: bool,
    /// Starts as `true`.
    permit_idle_reset: bool,
    /// `None` when `config.max_idle_timeout` is unset or zero; otherwise that value interpreted
    /// as milliseconds.
    idle_timeout: Option<Duration>,
    /// Timer deadlines for this connection; see `poll_timeout`.
    timers: TimerTable,
    /// Counter, starts at zero.
    authentication_failures: u64,
    /// Terminal error, handed to the application once as `Event::ConnectionLost` by `poll`.
    error: Option<ConnectionError>,
    /// Consulted via `peek` to predict the next packet number in `poll_transmit`.
    packet_number_filter: PacketNumberFilter,

    /// Queued PATH_RESPONSE data; off-path responses are sent from `poll_transmit`.
    path_responses: PathResponses,
    /// Challenge bookkeeping for NAT traversal.
    nat_traversal_challenges: NatTraversalChallenges,
    /// While closed or draining, `poll_transmit` only emits a CONNECTION_CLOSE when this is
    /// set; it is cleared after the close was written for `highest_space`.
    close: bool,

    /// ACK-frequency extension state.
    ack_frequency: AckFrequencyState,

    /// Counter, starts at zero.
    pto_count: u32,

    /// Passed to `populate_acks` when writing ACK frames.
    receiving_ecn: bool,
    /// Counter, starts at zero.
    total_authed_packets: u64,
    /// Set by `poll_transmit`: true when nothing was written and the connection was not held
    /// back by congestion control or pacing.
    app_limited: bool,

    /// Stream state.
    streams: StreamsState,
    /// Remote connection IDs; `active()` supplies the destination CID for outgoing packets.
    rem_cids: CidQueue,
    /// State of the connection IDs issued locally.
    local_cid_state: CidState,
    /// Datagram extension state.
    datagrams: DatagramState,
    /// Connection statistics.
    stats: ConnectionStats,
    /// The `version` argument of `new`.
    version: u32,

    /// NAT traversal state; `None` when NAT traversal is not active (the initial value).
    nat_traversal: Option<NatTraversalState>,

    /// NAT traversal frame configuration; starts at its `Default`.
    nat_traversal_frame_config: frame::nat_traversal_unified::NatTraversalFrameConfig,

    /// Address discovery state; `new` creates it from the default `AddressDiscoveryConfig`.
    address_discovery_state: Option<AddressDiscoveryState>,

    /// PQC state; supplies the minimum Initial datagram size used for padding and decides
    /// whether coalescing is adjusted.
    pqc_state: PqcState,

    /// Trace context whose trace id tags emitted trace events.
    #[cfg(feature = "trace")]
    trace_context: crate::tracing::TraceContext,

    /// Event log receiving trace events.
    #[cfg(feature = "trace")]
    event_log: Arc<crate::tracing::EventLog>,

    /// Optional qlog writer, installed by `set_qlog`.
    #[cfg(feature = "__qlog")]
    qlog_streamer: Option<Box<dyn std::io::Write + Send + Sync>>,

    /// Starts as `None`.
    // NOTE(review): presumably the peer identity tokens get bound to — confirm.
    peer_id_for_tokens: Option<PeerId>,
    /// Starts as `false`.
    // NOTE(review): presumably defers NEW_TOKEN frames until a peer binding exists — confirm.
    delay_new_token_until_binding: bool,
}
288
289impl Connection {
290 pub(crate) fn new(
291 endpoint_config: Arc<EndpointConfig>,
292 config: Arc<TransportConfig>,
293 init_cid: ConnectionId,
294 loc_cid: ConnectionId,
295 rem_cid: ConnectionId,
296 remote: SocketAddr,
297 local_ip: Option<IpAddr>,
298 crypto: Box<dyn crypto::Session>,
299 cid_gen: &dyn ConnectionIdGenerator,
300 now: Instant,
301 version: u32,
302 allow_mtud: bool,
303 rng_seed: [u8; 32],
304 side_args: SideArgs,
305 ) -> Self {
306 let pref_addr_cid = side_args.pref_addr_cid();
307 let path_validated = side_args.path_validated();
308 let connection_side = ConnectionSide::from(side_args);
309 let side = connection_side.side();
310 let initial_space = PacketSpace {
311 crypto: Some(crypto.initial_keys(&init_cid, side)),
312 ..PacketSpace::new(now)
313 };
314 let state = State::Handshake(state::Handshake {
315 rem_cid_set: side.is_server(),
316 expected_token: Bytes::new(),
317 client_hello: None,
318 });
319 let mut rng = StdRng::from_seed(rng_seed);
320 let mut this = Self {
321 endpoint_config,
322 crypto,
323 handshake_cid: loc_cid,
324 rem_handshake_cid: rem_cid,
325 local_cid_state: CidState::new(
326 cid_gen.cid_len(),
327 cid_gen.cid_lifetime(),
328 now,
329 if pref_addr_cid.is_some() { 2 } else { 1 },
330 ),
331 path: PathData::new(remote, allow_mtud, None, now, &config),
332 allow_mtud,
333 local_ip,
334 prev_path: None,
335 state,
336 side: connection_side,
337 zero_rtt_enabled: false,
338 zero_rtt_crypto: None,
339 key_phase: false,
340 key_phase_size: rng.gen_range(10..1000),
347 peer_params: TransportParameters::default(),
348 orig_rem_cid: rem_cid,
349 initial_dst_cid: init_cid,
350 retry_src_cid: None,
351 lost_packets: 0,
352 events: VecDeque::new(),
353 endpoint_events: VecDeque::new(),
354 spin_enabled: config.allow_spin && rng.gen_ratio(7, 8),
355 spin: false,
356 spaces: [initial_space, PacketSpace::new(now), PacketSpace::new(now)],
357 highest_space: SpaceId::Initial,
358 prev_crypto: None,
359 next_crypto: None,
360 accepted_0rtt: false,
361 permit_idle_reset: true,
362 idle_timeout: match config.max_idle_timeout {
363 None | Some(VarInt(0)) => None,
364 Some(dur) => Some(Duration::from_millis(dur.0)),
365 },
366 timers: TimerTable::default(),
367 authentication_failures: 0,
368 error: None,
369 #[cfg(test)]
370 packet_number_filter: match config.deterministic_packet_numbers {
371 false => PacketNumberFilter::new(&mut rng),
372 true => PacketNumberFilter::disabled(),
373 },
374 #[cfg(not(test))]
375 packet_number_filter: PacketNumberFilter::new(&mut rng),
376
377 path_responses: PathResponses::default(),
378 nat_traversal_challenges: NatTraversalChallenges::default(),
379 close: false,
380
381 ack_frequency: AckFrequencyState::new(get_max_ack_delay(
382 &TransportParameters::default(),
383 )),
384
385 pto_count: 0,
386
387 app_limited: false,
388 receiving_ecn: false,
389 total_authed_packets: 0,
390
391 streams: StreamsState::new(
392 side,
393 config.max_concurrent_uni_streams,
394 config.max_concurrent_bidi_streams,
395 config.send_window,
396 config.receive_window,
397 config.stream_receive_window,
398 ),
399 datagrams: DatagramState::default(),
400 config,
401 rem_cids: CidQueue::new(rem_cid),
402 rng,
403 stats: ConnectionStats::default(),
404 version,
405 nat_traversal: None, nat_traversal_frame_config:
407 frame::nat_traversal_unified::NatTraversalFrameConfig::default(),
408 address_discovery_state: {
409 Some(AddressDiscoveryState::new(
412 &crate::transport_parameters::AddressDiscoveryConfig::default(),
413 now,
414 ))
415 },
416 pqc_state: PqcState::new(),
417
418 #[cfg(feature = "trace")]
419 trace_context: crate::tracing::TraceContext::new(crate::tracing::TraceId::new()),
420
421 #[cfg(feature = "trace")]
422 event_log: crate::tracing::global_log(),
423
424 #[cfg(feature = "__qlog")]
425 qlog_streamer: None,
426
427 peer_id_for_tokens: None,
428 delay_new_token_until_binding: false,
429 };
430
431 #[cfg(feature = "trace")]
433 {
434 use crate::trace_event;
435 use crate::tracing::{Event, EventData, socket_addr_to_bytes, timestamp_now};
436 let _peer_id = {
438 let mut id = [0u8; 32];
439 let addr_bytes = match remote {
440 SocketAddr::V4(addr) => addr.ip().octets().to_vec(),
441 SocketAddr::V6(addr) => addr.ip().octets().to_vec(),
442 };
443 id[..addr_bytes.len().min(32)]
444 .copy_from_slice(&addr_bytes[..addr_bytes.len().min(32)]);
445 id
446 };
447
448 let (addr_bytes, addr_type) = socket_addr_to_bytes(remote);
449 trace_event!(
450 &this.event_log,
451 Event {
452 timestamp: timestamp_now(),
453 trace_id: this.trace_context.trace_id(),
454 sequence: 0,
455 _padding: 0,
456 node_id: [0u8; 32], event_data: EventData::ConnInit {
458 endpoint_bytes: addr_bytes,
459 addr_type,
460 _padding: [0u8; 45],
461 },
462 }
463 );
464 }
465
466 if path_validated {
467 this.on_path_validated();
468 }
469 if side.is_client() {
470 this.write_crypto();
472 this.init_0rtt();
473 }
474 this
475 }
476
477 #[cfg(feature = "__qlog")]
479 pub fn set_qlog(
480 &mut self,
481 writer: Box<dyn std::io::Write + Send + Sync>,
482 _title: Option<String>,
483 _description: Option<String>,
484 _now: Instant,
485 ) {
486 self.qlog_streamer = Some(writer);
487 }
488
    /// Hook for emitting qlog recovery metrics.
    ///
    /// The body is currently empty, so calling it has no effect; it is invoked from
    /// `poll_transmit` and `handle_event` when the `__qlog` feature is enabled.
    #[cfg(feature = "__qlog")]
    fn emit_qlog_recovery_metrics(&mut self, _now: Instant) {
    }
495
496 #[must_use]
504 pub fn poll_timeout(&mut self) -> Option<Instant> {
505 let mut next_timeout = self.timers.next_timeout();
506
507 if let Some(nat_state) = &self.nat_traversal {
509 if let Some(nat_timeout) = nat_state.get_next_timeout(Instant::now()) {
510 self.timers.set(Timer::NatTraversal, nat_timeout);
512 next_timeout = Some(next_timeout.map_or(nat_timeout, |t| t.min(nat_timeout)));
513 }
514 }
515
516 next_timeout
517 }
518
519 #[must_use]
525 pub fn poll(&mut self) -> Option<Event> {
526 if let Some(x) = self.events.pop_front() {
527 return Some(x);
528 }
529
530 if let Some(event) = self.streams.poll() {
531 return Some(Event::Stream(event));
532 }
533
534 if let Some(err) = self.error.take() {
535 return Some(Event::ConnectionLost { reason: err });
536 }
537
538 None
539 }
540
541 #[must_use]
543 pub fn poll_endpoint_events(&mut self) -> Option<EndpointEvent> {
544 self.endpoint_events.pop_front().map(EndpointEvent)
545 }
546
547 #[must_use]
549 pub fn streams(&mut self) -> Streams<'_> {
550 Streams {
551 state: &mut self.streams,
552 conn_state: &self.state,
553 }
554 }
555
556 #[must_use]
560 pub fn recv_stream(&mut self, id: StreamId) -> RecvStream<'_> {
561 assert!(id.dir() == Dir::Bi || id.initiator() != self.side.side());
562 RecvStream {
563 id,
564 state: &mut self.streams,
565 pending: &mut self.spaces[SpaceId::Data].pending,
566 }
567 }
568
569 #[must_use]
571 pub fn send_stream(&mut self, id: StreamId) -> SendStream<'_> {
572 assert!(id.dir() == Dir::Bi || id.initiator() == self.side.side());
573 SendStream {
574 id,
575 state: &mut self.streams,
576 pending: &mut self.spaces[SpaceId::Data].pending,
577 conn_state: &self.state,
578 }
579 }
580
    /// Writes the next batch of outgoing datagrams into `buf` and describes it as a `Transmit`.
    ///
    /// * `now` – current time, used for pacing, timers and packet tracking.
    /// * `max_datagrams` – upper bound on the number of datagrams written in one call; it is
    ///   forced to 1 when segmentation offload is disabled in the transport config.
    /// * `buf` – output buffer the packets are appended to.
    ///
    /// Returns `None` when nothing was written (nothing to send, blocked by anti-amplification,
    /// congestion control or pacing, or the connection is drained). When several datagrams are
    /// written, `Transmit::segment_size` carries the common segment size.
    ///
    /// NAT traversal challenges and previous-path PATH_CHALLENGEs take priority and are
    /// returned on their own before any regular packet assembly happens.
    ///
    /// # Panics
    ///
    /// Panics if `max_datagrams` is zero.
    #[must_use]
    pub fn poll_transmit(
        &mut self,
        now: Instant,
        max_datagrams: usize,
        buf: &mut Vec<u8>,
    ) -> Option<Transmit> {
        assert!(max_datagrams != 0);
        // Without segmentation offload only a single datagram may be produced per call.
        let max_datagrams = match self.config.enable_segmentation_offload {
            false => 1,
            true => max_datagrams,
        };

        let mut num_datagrams = 0;
        // Offset in `buf` at which the datagram currently being built starts.
        let mut datagram_start = 0;
        // Starts as the path MTU; shrinks to the first datagram's real length once it is known.
        let mut segment_size = usize::from(self.path.current_mtu());

        if let Some(nat_traversal) = &mut self.nat_traversal {
            // The result is only logged here; no other action is taken on a timeout.
            if nat_traversal.check_coordination_timeout(now) {
                trace!("NAT traversal coordination timed out, may retry");
            }
        }

        // NAT traversal challenges go out alone, ahead of everything else.
        if let Some(challenge) = self.send_nat_traversal_challenge(now, buf) {
            return Some(challenge);
        }

        // Likewise for a pending PATH_CHALLENGE on the previous path.
        if let Some(challenge) = self.send_path_challenge(now, buf) {
            return Some(challenge);
        }

        for space in SpaceId::iter() {
            // An immediate ACK is only requested in the Data space and only if the peer
            // supports the ACK frequency extension.
            let request_immediate_ack =
                space == SpaceId::Data && self.peer_supports_ack_frequency();
            self.spaces[space].maybe_queue_probe(request_immediate_ack, &self.streams);
        }

        // Determine whether this call should emit a CONNECTION_CLOSE.
        let close = match self.state {
            State::Drained => {
                self.app_limited = true;
                return None;
            }
            State::Draining | State::Closed(_) => {
                // Only send a close when one has been requested via `self.close`.
                if !self.close {
                    self.app_limited = true;
                    return None;
                }
                true
            }
            _ => false,
        };

        if let Some(config) = &self.config.ack_frequency_config {
            // Queue an ACK_FREQUENCY frame only when the state asks for one, 1-RTT has been
            // reached, and the peer supports the extension.
            self.spaces[SpaceId::Data].pending.ack_frequency = self
                .ack_frequency
                .should_send_ack_frequency(self.path.rtt.get(), config, &self.peer_params)
                && self.highest_space == SpaceId::Data
                && self.peer_supports_ack_frequency();
        }

        // Number of bytes `buf` may hold for the datagrams started so far.
        let mut buf_capacity = 0;

        // Whether the next packet may share a datagram with the previous one.
        let mut coalesce = true;
        // The packet under construction; it is finished lazily once it is known whether
        // another packet follows in the same datagram.
        let mut builder_storage: Option<PacketBuilder> = None;
        let mut sent_frames = None;
        let mut pad_datagram = false;
        let mut pad_datagram_to_mtu = false;
        let mut congestion_blocked = false;

        // Walk the spaces in order. The index only advances on an explicit `space_idx += 1`,
        // so one space can contribute several packets.
        let mut space_idx = 0;
        let spaces = [SpaceId::Initial, SpaceId::Handshake, SpaceId::Data];
        while space_idx < spaces.len() {
            let space_id = spaces[space_idx];
            // Peeked from the Data space and used to predict the 1-RTT header overhead.
            let pn = self.packet_number_filter.peek(&self.spaces[SpaceId::Data]);
            let frame_space_1rtt =
                segment_size.saturating_sub(self.predict_1rtt_overhead(Some(pn)));

            let can_send = self.space_can_send(space_id, frame_space_1rtt);
            // Skip this space if it has nothing to send, unless a close must be sent and the
            // space has keys to send it with.
            if can_send.is_empty() && (!close || self.spaces[space_id].crypto.is_none()) {
                space_idx += 1;
                continue;
            }

            let mut ack_eliciting = !self.spaces[space_id].pending.is_empty(&self.streams)
                || self.spaces[space_id].ping_pending
                || self.spaces[space_id].immediate_ack_pending;
            if space_id == SpaceId::Data {
                ack_eliciting |= self.can_send_1rtt(frame_space_1rtt);
            }

            pad_datagram_to_mtu |= space_id == SpaceId::Data && self.config.pad_to_mtu;

            // Where the buffer will end once the pending packet (if any) has been finished:
            // its minimum size and authentication tag are accounted for.
            let buf_end = if let Some(builder) = &builder_storage {
                buf.len().max(builder.min_size) + builder.tag_len
            } else {
                buf.len()
            };

            // Tag length of the keys the next packet will be protected with.
            let tag_len = if let Some(ref crypto) = self.spaces[space_id].crypto {
                crypto.packet.local.tag_len()
            } else if space_id == SpaceId::Data {
                // No 1-RTT keys: fall back to 0-RTT keys if they exist.
                match self.zero_rtt_crypto.as_ref() {
                    Some(crypto) => crypto.packet.tag_len(),
                    None => {
                        error!(
                            "sending packets in the application data space requires known 0-RTT or 1-RTT keys"
                        );
                        return None;
                    }
                }
            } else {
                unreachable!("tried to send {:?} packet without keys", space_id)
            };
            // A new datagram is needed when coalescing is off or the current datagram has too
            // little room left for another packet.
            if !coalesce || buf_capacity - buf_end < MIN_PACKET_SPACE + tag_len {
                if num_datagrams >= max_datagrams {
                    // The datagram budget for this call is exhausted.
                    break;
                }

                // Checked against the bytes of the datagrams created so far plus one byte for
                // the datagram about to be started.
                if self
                    .path
                    .anti_amplification_blocked(segment_size as u64 * (num_datagrams as u64) + 1)
                {
                    trace!("blocked by anti-amplification");
                    break;
                }

                // Congestion control and pacing apply only to ack-eliciting packets, and are
                // skipped while this space has loss probes to send.
                if ack_eliciting && self.spaces[space_id].loss_probes == 0 {
                    // Bytes of the pending packet, which has not been tracked as sent yet.
                    let untracked_bytes = if let Some(builder) = &builder_storage {
                        buf_capacity - builder.partial_encode.start
                    } else {
                        0
                    } as u64;
                    debug_assert!(untracked_bytes <= segment_size as u64);

                    let bytes_to_send = segment_size as u64 + untracked_bytes;
                    if self.path.in_flight.bytes + bytes_to_send >= self.path.congestion.window() {
                        // Move on to the next space instead of stopping, so later spaces still
                        // get their turn.
                        space_idx += 1;
                        congestion_blocked = true;
                        trace!("blocked by congestion control");
                        continue;
                    }

                    let smoothed_rtt = self.path.rtt.get();
                    if let Some(delay) = self.path.pacing.delay(
                        smoothed_rtt,
                        bytes_to_send,
                        self.path.current_mtu(),
                        self.path.congestion.window(),
                        now,
                    ) {
                        // Pacing asks for a pause: arm the pacing timer and stop for now.
                        self.timers.set(Timer::Pacing, delay);
                        congestion_blocked = true;
                        trace!("blocked by pacing");
                        break;
                    }
                }

                // Finish the previous datagram before starting the next one.
                if let Some(mut builder) = builder_storage.take() {
                    if pad_datagram {
                        let min_size = self.pqc_state.min_initial_size();
                        builder.pad_to(min_size);
                    }

                    if num_datagrams > 1 || pad_datagram_to_mtu {
                        // Beyond the first datagram (or when MTU padding is configured) the
                        // datagram is padded to the full segment size. If that would need more
                        // than MAX_PADDING bytes without MTU padding being requested, or the
                        // segment does not fit into the buffer budget, the batch ends here.
                        const MAX_PADDING: usize = 16;
                        let packet_len_unpadded = cmp::max(builder.min_size, buf.len())
                            - datagram_start
                            + builder.tag_len;
                        if (packet_len_unpadded + MAX_PADDING < segment_size
                            && !pad_datagram_to_mtu)
                            || datagram_start + segment_size > buf_capacity
                        {
                            trace!(
                                "GSO truncated by demand for {} padding bytes or loss probe",
                                segment_size - packet_len_unpadded
                            );
                            // Hand the packet back so it is finished after the loop.
                            builder_storage = Some(builder);
                            break;
                        }

                        builder.pad_to(segment_size as u16);
                    }

                    builder.finish_and_track(now, self, sent_frames.take(), buf);

                    if num_datagrams == 1 {
                        // The first datagram's real length becomes the segment size for the
                        // rest of the batch, and the capacity is clipped to what was written.
                        segment_size = buf.len();
                        buf_capacity = buf.len();

                        if space_id == SpaceId::Data {
                            // Re-check with the smaller segment size whether anything is still
                            // sendable; if not, the batch is complete.
                            let frame_space_1rtt =
                                segment_size.saturating_sub(self.predict_1rtt_overhead(Some(pn)));
                            if self.space_can_send(space_id, frame_space_1rtt).is_empty() {
                                break;
                            }
                        }
                    }
                }

                // Grant the buffer room for one more datagram. A loss probe consumes one probe
                // credit and is limited to at most INITIAL_MTU bytes.
                let next_datagram_size_limit = match self.spaces[space_id].loss_probes {
                    0 => segment_size,
                    _ => {
                        self.spaces[space_id].loss_probes -= 1;
                        std::cmp::min(segment_size, usize::from(INITIAL_MTU))
                    }
                };
                buf_capacity += next_datagram_size_limit;
                if buf.capacity() < buf_capacity {
                    // Reserve room for the largest possible batch in one go.
                    buf.reserve(max_datagrams * segment_size);
                }
                num_datagrams += 1;
                coalesce = true;
                pad_datagram = false;
                datagram_start = buf.len();

                debug_assert_eq!(
                    datagram_start % segment_size,
                    0,
                    "datagrams in a GSO batch must be aligned to the segment size"
                );
            } else {
                // The next packet is coalesced into the current datagram: just finish the
                // previous packet.
                if let Some(builder) = builder_storage.take() {
                    builder.finish_and_track(now, self, sent_frames.take(), buf);
                }
            }

            debug_assert!(buf_capacity - buf.len() >= MIN_PACKET_SPACE);

            // A client drops the Initial space as soon as it is about to send a Handshake
            // packet.
            if self.spaces[SpaceId::Initial].crypto.is_some()
                && space_id == SpaceId::Handshake
                && self.side.is_client()
            {
                self.discard_space(now, SpaceId::Initial);
            }
            if let Some(ref mut prev) = self.prev_crypto {
                prev.update_unacked = false;
            }

            debug_assert!(
                builder_storage.is_none() && sent_frames.is_none(),
                "Previous packet must have been finished"
            );

            // Start the next packet; `?` makes this method return `None` when the builder
            // cannot be created.
            let builder = builder_storage.insert(PacketBuilder::new(
                now,
                space_id,
                self.rem_cids.active(),
                buf,
                buf_capacity,
                datagram_start,
                ack_eliciting,
                self,
            )?);
            // Nothing is coalesced after a short-header packet.
            coalesce = coalesce && !builder.short_header;

            let should_adjust_coalescing = self
                .pqc_state
                .should_adjust_coalescing(buf.len() - datagram_start, space_id);

            if should_adjust_coalescing {
                coalesce = false;
                trace!("Disabling coalescing for PQC handshake in {:?}", space_id);
            }

            // Datagrams carrying an Initial packet are padded when sent by a client or when
            // the packet is ack-eliciting.
            pad_datagram |=
                space_id == SpaceId::Initial && (self.side.is_client() || ack_eliciting);

            if close {
                trace!("sending CONNECTION_CLOSE");
                // Include pending ACKs alongside the close. They are written with a throwaway
                // `SentFrames`, so they are not recorded as sent.
                if !self.spaces[space_id].pending_acks.ranges().is_empty() {
                    Self::populate_acks(
                        now,
                        self.receiving_ecn,
                        &mut SentFrames::default(),
                        &mut self.spaces[space_id],
                        buf,
                        &mut self.stats,
                    );
                }

                debug_assert!(
                    buf.len() + frame::ConnectionClose::SIZE_BOUND < builder.max_size,
                    "ACKs should leave space for ConnectionClose"
                );
                if buf.len() + frame::ConnectionClose::SIZE_BOUND < builder.max_size {
                    let max_frame_size = builder.max_size - buf.len();
                    match self.state {
                        State::Closed(state::Closed { ref reason }) => {
                            // The real reason is only sent in the Data space or when it is a
                            // transport-level reason; otherwise a generic APPLICATION_ERROR
                            // close without reason text is written instead.
                            if space_id == SpaceId::Data || reason.is_transport_layer() {
                                reason.encode(buf, max_frame_size)
                            } else {
                                frame::ConnectionClose {
                                    error_code: TransportErrorCode::APPLICATION_ERROR,
                                    frame_type: None,
                                    reason: Bytes::new(),
                                }
                                .encode(buf, max_frame_size)
                            }
                        }
                        State::Draining => frame::ConnectionClose {
                            error_code: TransportErrorCode::NO_ERROR,
                            frame_type: None,
                            reason: Bytes::new(),
                        }
                        .encode(buf, max_frame_size),
                        _ => unreachable!(
                            "tried to make a close packet when the connection wasn't closed"
                        ),
                    }
                }
                if space_id == self.highest_space {
                    // The close went out in the highest space: nothing more to send.
                    self.close = false;
                    break;
                } else {
                    // Send a close in the next space as well.
                    space_idx += 1;
                    continue;
                }
            }

            // An off-path PATH_RESPONSE is sent alone in its own padded datagram, and only as
            // the first datagram of a batch.
            if space_id == SpaceId::Data && num_datagrams == 1 {
                if let Some((token, remote)) = self.path_responses.pop_off_path(self.path.remote) {
                    let mut builder = builder_storage.take().unwrap();
                    trace!("PATH_RESPONSE {:08x} (off-path)", token);
                    buf.write(frame::FrameType::PATH_RESPONSE);
                    buf.write(token);
                    self.stats.frame_tx.path_response += 1;
                    let min_size = self.pqc_state.min_initial_size();
                    builder.pad_to(min_size);
                    builder.finish_and_track(
                        now,
                        self,
                        Some(SentFrames {
                            non_retransmits: true,
                            ..SentFrames::default()
                        }),
                        buf,
                    );
                    self.stats.udp_tx.on_sent(1, buf.len());

                    #[cfg(feature = "trace")]
                    {
                        use crate::trace_packet_sent;
                        trace_packet_sent!(
                            &self.event_log,
                            self.trace_context.trace_id(),
                            buf.len() as u32,
                            0
                        );
                    }

                    // Sent to the address the response belongs to, not the current path.
                    return Some(Transmit {
                        destination: remote,
                        size: buf.len(),
                        ecn: None,
                        segment_size: None,
                        src_ip: self.local_ip,
                    });
                }
            }

            // Queue an observed-address frame for the current path when address discovery is
            // active locally and the peer's transport parameters advertise it.
            if space_id == SpaceId::Data && self.address_discovery_state.is_some() {
                let peer_supports = self.peer_params.address_discovery.is_some();

                if let Some(state) = &mut self.address_discovery_state {
                    if peer_supports {
                        if let Some(frame) = state.queue_observed_address_frame(0, self.path.remote)
                        {
                            self.spaces[space_id]
                                .pending
                                .outbound_observations
                                .push(frame);
                        }
                    }
                }
            }

            // Fill the packet with the frames pending in this space.
            let sent =
                self.populate_packet(now, space_id, buf, builder.max_size, builder.exact_number);

            // Sanity check: when non-ACK frames were reported sendable and a full MTU was
            // available, the packet must not end up containing only ACKs.
            debug_assert!(
                !(sent.is_ack_only(&self.streams)
                    && !can_send.acks
                    && can_send.other
                    && (buf_capacity - builder.datagram_start) == self.path.current_mtu() as usize
                    && self.datagrams.outgoing.is_empty()),
                "SendableFrames was {can_send:?}, but only ACKs have been written"
            );
            pad_datagram |= sent.requires_padding;

            if sent.largest_acked.is_some() {
                // ACKs were written, so the delayed-ACK timer is no longer needed.
                self.spaces[space_id].pending_acks.acks_sent();
                self.timers.stop(Timer::MaxAckDelay);
            }

            // Kept until the packet is finished, either on the next iteration or after the
            // loop.
            sent_frames = Some(sent);

        }

        // Finish the last packet of the batch.
        if let Some(mut builder) = builder_storage {
            if pad_datagram {
                let min_size = self.pqc_state.min_initial_size();
                builder.pad_to(min_size);
            }

            // Pad to the segment size only when the buffer budget covers a full segment.
            if pad_datagram_to_mtu && buf_capacity >= datagram_start + segment_size {
                builder.pad_to(segment_size as u16);
            }

            let last_packet_number = builder.exact_number;
            builder.finish_and_track(now, self, sent_frames, buf);
            // Report the whole batch to the congestion controller in one call.
            self.path
                .congestion
                .on_sent(now, buf.len() as u64, last_packet_number);

            #[cfg(feature = "__qlog")]
            self.emit_qlog_recovery_metrics(now);
        }

        // Application-limited: nothing was written and nothing held us back.
        self.app_limited = buf.is_empty() && !congestion_blocked;

        // With nothing else to send on an established connection, try an MTU discovery probe.
        if buf.is_empty() && self.state.is_established() {
            let space_id = SpaceId::Data;
            // `?` returns `None` when MTU discovery has no probe to send right now.
            let probe_size = self
                .path
                .mtud
                .poll_transmit(now, self.packet_number_filter.peek(&self.spaces[space_id]))?;

            let buf_capacity = probe_size as usize;
            buf.reserve(buf_capacity);

            let mut builder = PacketBuilder::new(
                now,
                space_id,
                self.rem_cids.active(),
                buf,
                buf_capacity,
                0,
                true,
                self,
            )?;

            // The probe consists of a PING padded up to the probed size.
            buf.write(frame::FrameType::PING);
            self.stats.frame_tx.ping += 1;

            if self.peer_supports_ack_frequency() {
                // Ask the peer to acknowledge the probe immediately.
                buf.write(frame::FrameType::IMMEDIATE_ACK);
                self.stats.frame_tx.immediate_ack += 1;
            }

            builder.pad_to(probe_size);
            let sent_frames = SentFrames {
                non_retransmits: true,
                ..Default::default()
            };
            builder.finish_and_track(now, self, Some(sent_frames), buf);

            self.stats.path.sent_plpmtud_probes += 1;
            num_datagrams = 1;

            trace!(?probe_size, "writing MTUD probe");
        }

        if buf.is_empty() {
            return None;
        }

        trace!("sending {} bytes in {} datagrams", buf.len(), num_datagrams);
        self.path.total_sent = self.path.total_sent.saturating_add(buf.len() as u64);

        self.stats.udp_tx.on_sent(num_datagrams as u64, buf.len());

        #[cfg(feature = "trace")]
        {
            use crate::trace_packet_sent;
            // Reports the Data space's `next_packet_number - 1` (saturating at zero).
            let packet_num = self.spaces[SpaceId::Data]
                .next_packet_number
                .saturating_sub(1);
            trace_packet_sent!(
                &self.event_log,
                self.trace_context.trace_id(),
                buf.len() as u32,
                packet_num
            );
        }

        Some(Transmit {
            destination: self.path.remote,
            size: buf.len(),
            ecn: if self.path.sending_ecn {
                Some(EcnCodepoint::Ect0)
            } else {
                None
            },
            // A segment size is only reported for batches of more than one datagram.
            segment_size: match num_datagrams {
                1 => None,
                _ => Some(segment_size),
            },
            src_ip: self.local_ip,
        })
    }
1226
1227 fn send_coordination_request(&mut self, _now: Instant, _buf: &mut Vec<u8>) -> Option<Transmit> {
1229 let nat = self.nat_traversal.as_mut()?;
1231 if !nat.should_send_punch_request() {
1232 return None;
1233 }
1234
1235 let coord = nat.coordination.as_ref()?;
1236 let round = coord.round;
1237 if coord.punch_targets.is_empty() {
1238 return None;
1239 }
1240
1241 trace!(
1242 "queuing PUNCH_ME_NOW round {} with {} targets",
1243 round,
1244 coord.punch_targets.len()
1245 );
1246
1247 for target in &coord.punch_targets {
1249 let punch = frame::PunchMeNow {
1250 round,
1251 paired_with_sequence_number: target.remote_sequence,
1252 address: target.remote_addr,
1253 target_peer_id: None,
1254 };
1255 self.spaces[SpaceId::Data].pending.punch_me_now.push(punch);
1256 }
1257
1258 nat.mark_punch_request_sent();
1260
1261 None
1263 }
1264
1265 fn send_coordinated_path_challenge(
1267 &mut self,
1268 now: Instant,
1269 buf: &mut Vec<u8>,
1270 ) -> Option<Transmit> {
1271 if let Some(nat_traversal) = &mut self.nat_traversal {
1273 if nat_traversal.should_start_punching(now) {
1274 nat_traversal.start_punching_phase(now);
1275 }
1276 }
1277
1278 let (target_addr, challenge) = {
1280 let nat_traversal = self.nat_traversal.as_ref()?;
1281 match nat_traversal.get_coordination_phase() {
1282 Some(CoordinationPhase::Punching) => {
1283 let targets = nat_traversal.get_punch_targets_from_coordination()?;
1284 if targets.is_empty() {
1285 return None;
1286 }
1287 let target = &targets[0];
1289 (target.remote_addr, target.challenge)
1290 }
1291 _ => return None,
1292 }
1293 };
1294
1295 debug_assert_eq!(
1296 self.highest_space,
1297 SpaceId::Data,
1298 "PATH_CHALLENGE queued without 1-RTT keys"
1299 );
1300
1301 buf.reserve(self.pqc_state.min_initial_size() as usize);
1302 let buf_capacity = buf.capacity();
1303
1304 let mut builder = PacketBuilder::new(
1305 now,
1306 SpaceId::Data,
1307 self.rem_cids.active(),
1308 buf,
1309 buf_capacity,
1310 0,
1311 false,
1312 self,
1313 )?;
1314
1315 trace!(
1316 "sending coordinated PATH_CHALLENGE {:08x} to {}",
1317 challenge, target_addr
1318 );
1319 buf.write(frame::FrameType::PATH_CHALLENGE);
1320 buf.write(challenge);
1321 self.stats.frame_tx.path_challenge += 1;
1322
1323 let min_size = self.pqc_state.min_initial_size();
1324 builder.pad_to(min_size);
1325 builder.finish_and_track(now, self, None, buf);
1326
1327 if let Some(nat_traversal) = &mut self.nat_traversal {
1329 nat_traversal.mark_coordination_validating();
1330 }
1331
1332 Some(Transmit {
1333 destination: target_addr,
1334 size: buf.len(),
1335 ecn: if self.path.sending_ecn {
1336 Some(EcnCodepoint::Ect0)
1337 } else {
1338 None
1339 },
1340 segment_size: None,
1341 src_ip: self.local_ip,
1342 })
1343 }
1344
1345 fn send_nat_traversal_challenge(
1347 &mut self,
1348 now: Instant,
1349 buf: &mut Vec<u8>,
1350 ) -> Option<Transmit> {
1351 if let Some(request) = self.send_coordination_request(now, buf) {
1353 return Some(request);
1354 }
1355
1356 if let Some(punch) = self.send_coordinated_path_challenge(now, buf) {
1358 return Some(punch);
1359 }
1360
1361 let (remote_addr, remote_sequence) = {
1363 let nat_traversal = self.nat_traversal.as_ref()?;
1364 let candidates = nat_traversal.get_validation_candidates();
1365 if candidates.is_empty() {
1366 return None;
1367 }
1368 let (sequence, candidate) = candidates[0];
1370 (candidate.address, sequence)
1371 };
1372
1373 let challenge = self.rng.r#gen::<u64>();
1374
1375 if let Err(e) =
1377 self.nat_traversal
1378 .as_mut()?
1379 .start_validation(remote_sequence, challenge, now)
1380 {
1381 warn!("Failed to start NAT traversal validation: {}", e);
1382 return None;
1383 }
1384
1385 debug_assert_eq!(
1386 self.highest_space,
1387 SpaceId::Data,
1388 "PATH_CHALLENGE queued without 1-RTT keys"
1389 );
1390
1391 buf.reserve(self.pqc_state.min_initial_size() as usize);
1392 let buf_capacity = buf.capacity();
1393
1394 let mut builder = PacketBuilder::new(
1396 now,
1397 SpaceId::Data,
1398 self.rem_cids.active(),
1399 buf,
1400 buf_capacity,
1401 0,
1402 false,
1403 self,
1404 )?;
1405
1406 trace!(
1407 "sending PATH_CHALLENGE {:08x} to NAT candidate {}",
1408 challenge, remote_addr
1409 );
1410 buf.write(frame::FrameType::PATH_CHALLENGE);
1411 buf.write(challenge);
1412 self.stats.frame_tx.path_challenge += 1;
1413
1414 let min_size = self.pqc_state.min_initial_size();
1416 builder.pad_to(min_size);
1417
1418 builder.finish_and_track(now, self, None, buf);
1419
1420 Some(Transmit {
1421 destination: remote_addr,
1422 size: buf.len(),
1423 ecn: if self.path.sending_ecn {
1424 Some(EcnCodepoint::Ect0)
1425 } else {
1426 None
1427 },
1428 segment_size: None,
1429 src_ip: self.local_ip,
1430 })
1431 }
1432
1433 fn send_path_challenge(&mut self, now: Instant, buf: &mut Vec<u8>) -> Option<Transmit> {
1435 let (prev_cid, prev_path) = self.prev_path.as_mut()?;
1436 if !prev_path.challenge_pending {
1437 return None;
1438 }
1439 prev_path.challenge_pending = false;
1440 let token = prev_path
1441 .challenge
1442 .expect("previous path challenge pending without token");
1443 let destination = prev_path.remote;
1444 debug_assert_eq!(
1445 self.highest_space,
1446 SpaceId::Data,
1447 "PATH_CHALLENGE queued without 1-RTT keys"
1448 );
1449 buf.reserve(self.pqc_state.min_initial_size() as usize);
1450
1451 let buf_capacity = buf.capacity();
1452
1453 let mut builder = PacketBuilder::new(
1459 now,
1460 SpaceId::Data,
1461 *prev_cid,
1462 buf,
1463 buf_capacity,
1464 0,
1465 false,
1466 self,
1467 )?;
1468 trace!("validating previous path with PATH_CHALLENGE {:08x}", token);
1469 buf.write(frame::FrameType::PATH_CHALLENGE);
1470 buf.write(token);
1471 self.stats.frame_tx.path_challenge += 1;
1472
1473 let min_size = self.pqc_state.min_initial_size();
1478 builder.pad_to(min_size);
1479
1480 builder.finish(self, buf);
1481 self.stats.udp_tx.on_sent(1, buf.len());
1482
1483 Some(Transmit {
1484 destination,
1485 size: buf.len(),
1486 ecn: None,
1487 segment_size: None,
1488 src_ip: self.local_ip,
1489 })
1490 }
1491
1492 fn space_can_send(&self, space_id: SpaceId, frame_space_1rtt: usize) -> SendableFrames {
1494 if self.spaces[space_id].crypto.is_none()
1495 && (space_id != SpaceId::Data
1496 || self.zero_rtt_crypto.is_none()
1497 || self.side.is_server())
1498 {
1499 return SendableFrames::empty();
1501 }
1502 let mut can_send = self.spaces[space_id].can_send(&self.streams);
1503 if space_id == SpaceId::Data {
1504 can_send.other |= self.can_send_1rtt(frame_space_1rtt);
1505 }
1506 can_send
1507 }
1508
/// Processes a `ConnectionEvent` delivered to this connection.
///
/// - `Datagram`: feeds the first decoded packet and any coalesced remainder into packet
///   handling, updating receive statistics along the way.
/// - `NewIdentifiers`: registers the new connection IDs with the local CID state and queues
///   them on the Data space's pending `new_cids`.
/// - `QueueAddAddress` / `QueuePunchMeNow`: queue the supplied value on the Data space's
///   pending `add_addresses` / `punch_me_now` lists.
pub fn handle_event(&mut self, event: ConnectionEvent) {
    use ConnectionEventInner::*;
    match event.0 {
        Datagram(DatagramConnectionEvent {
            now,
            remote,
            ecn,
            first_decode,
            remaining,
        }) => {
            // Drop datagrams from an address other than the current path's remote when this
            // side does not permit the remote to migrate.
            if remote != self.path.remote && !self.side.remote_may_migrate() {
                trace!("discarding packet from unrecognized peer {}", remote);
                return;
            }

            // Sampled before processing so the check at the end of this arm can tell whether
            // the path was blocked when the datagram arrived.
            let was_anti_amplification_blocked = self.path.anti_amplification_blocked(1);

            self.stats.udp_rx.datagrams += 1;
            self.stats.udp_rx.bytes += first_decode.len() as u64;
            let data_len = first_decode.len();

            self.handle_decode(now, remote, ecn, first_decode);
            // `total_recvd` is deliberately updated after `handle_decode`.
            // NOTE(review): presumably because `handle_decode` may replace `self.path`
            // (e.g. on migration) -- confirm.
            self.path.total_recvd = self.path.total_recvd.saturating_add(data_len as u64);

            if let Some(data) = remaining {
                self.stats.udp_rx.bytes += data.len() as u64;
                self.handle_coalesced(now, remote, ecn, data);
            }

            #[cfg(feature = "__qlog")]
            self.emit_qlog_recovery_metrics(now);

            // If the path was blocked on arrival, re-evaluate the loss detection timer.
            if was_anti_amplification_blocked {
                self.set_loss_detection_timer(now);
            }
        }
        NewIdentifiers(ids, now) => {
            self.local_cid_state.new_cids(&ids, now);
            // The identifiers are pushed onto the pending list in reverse order.
            ids.into_iter().rev().for_each(|frame| {
                self.spaces[SpaceId::Data].pending.new_cids.push(frame);
            });
            // Re-arm the CID retirement timer when it is unset or already due.
            if self.timers.get(Timer::PushNewCid).is_none_or(|x| x <= now) {
                self.reset_cid_retirement();
            }
        }
        QueueAddAddress(add) => {
            self.spaces[SpaceId::Data].pending.add_addresses.push(add);
        }
        QueuePunchMeNow(punch) => {
            self.spaces[SpaceId::Data].pending.punch_me_now.push(punch);
        }
    }
}
1580
1581 pub fn handle_timeout(&mut self, now: Instant) {
1591 for &timer in &Timer::VALUES {
1592 if !self.timers.is_expired(timer, now) {
1593 continue;
1594 }
1595 self.timers.stop(timer);
1596 trace!(timer = ?timer, "timeout");
1597 match timer {
1598 Timer::Close => {
1599 self.state = State::Drained;
1600 self.endpoint_events.push_back(EndpointEventInner::Drained);
1601 }
1602 Timer::Idle => {
1603 self.kill(ConnectionError::TimedOut);
1604 }
1605 Timer::KeepAlive => {
1606 trace!("sending keep-alive");
1607 self.ping();
1608 }
1609 Timer::LossDetection => {
1610 self.on_loss_detection_timeout(now);
1611
1612 #[cfg(feature = "__qlog")]
1613 self.emit_qlog_recovery_metrics(now);
1614 }
1615 Timer::KeyDiscard => {
1616 self.zero_rtt_crypto = None;
1617 self.prev_crypto = None;
1618 }
1619 Timer::PathValidation => {
1620 debug!("path validation failed");
1621 if let Some((_, prev)) = self.prev_path.take() {
1622 self.path = prev;
1623 }
1624 self.path.challenge = None;
1625 self.path.challenge_pending = false;
1626 }
1627 Timer::Pacing => trace!("pacing timer expired"),
1628 Timer::NatTraversal => {
1629 self.handle_nat_traversal_timeout(now);
1630 }
1631 Timer::PushNewCid => {
1632 let num_new_cid = self.local_cid_state.on_cid_timeout().into();
1634 if !self.state.is_closed() {
1635 trace!(
1636 "push a new cid to peer RETIRE_PRIOR_TO field {}",
1637 self.local_cid_state.retire_prior_to()
1638 );
1639 self.endpoint_events
1640 .push_back(EndpointEventInner::NeedIdentifiers(now, num_new_cid));
1641 }
1642 }
1643 Timer::MaxAckDelay => {
1644 trace!("max ack delay reached");
1645 self.spaces[SpaceId::Data]
1647 .pending_acks
1648 .on_max_ack_delay_timeout()
1649 }
1650 }
1651 }
1652 }
1653
1654 pub fn close(&mut self, now: Instant, error_code: VarInt, reason: Bytes) {
1666 self.close_inner(
1667 now,
1668 Close::Application(frame::ApplicationClose { error_code, reason }),
1669 )
1670 }
1671
1672 fn close_inner(&mut self, now: Instant, reason: Close) {
1673 let was_closed = self.state.is_closed();
1674 if !was_closed {
1675 self.close_common();
1676 self.set_close_timer(now);
1677 self.close = true;
1678 self.state = State::Closed(state::Closed { reason });
1679 }
1680 }
1681
/// Returns a `Datagrams` handle that mutably borrows this connection.
pub fn datagrams(&mut self) -> Datagrams<'_> {
    Datagrams { conn: self }
}
1686
1687 pub fn stats(&self) -> ConnectionStats {
1689 let mut stats = self.stats;
1690 stats.path.rtt = self.path.rtt.get();
1691 stats.path.cwnd = self.path.congestion.window();
1692 stats.path.current_mtu = self.path.mtud.current_mtu();
1693
1694 stats
1695 }
1696
/// Stores `pid` in `peer_id_for_tokens`, replacing any previously stored value.
pub fn set_token_binding_peer_id(&mut self, pid: PeerId) {
    self.peer_id_for_tokens = Some(pid);
}
1701
/// Sets the `delay_new_token_until_binding` flag to `v`.
pub fn set_delay_new_token_until_binding(&mut self, v: bool) {
    self.delay_new_token_until_binding = v;
}
1706
1707 pub fn ping(&mut self) {
1711 self.spaces[self.highest_space].ping_pending = true;
1712 }
1713
/// Returns the `using_pqc` flag of the connection's PQC state.
pub(crate) fn is_pqc(&self) -> bool {
    self.pqc_state.using_pqc
}
1718
1719 pub fn force_key_update(&mut self) {
1723 if !self.state.is_established() {
1724 debug!("ignoring forced key update in illegal state");
1725 return;
1726 }
1727 if self.prev_crypto.is_some() {
1728 debug!("ignoring redundant forced key update");
1731 return;
1732 }
1733 self.update_keys(None, false);
1734 }
1735
/// Returns a shared reference to the connection's cryptographic session.
pub fn crypto_session(&self) -> &dyn crypto::Session {
    &*self.crypto
}
1740
/// Returns whether the connection state reports that the handshake is still in progress.
pub fn is_handshaking(&self) -> bool {
    self.state.is_handshake()
}
1748
/// Returns whether the connection state reports closed.
pub fn is_closed(&self) -> bool {
    self.state.is_closed()
}
1759
/// Returns whether the connection state reports drained.
pub fn is_drained(&self) -> bool {
    self.state.is_drained()
}
1767
/// Returns the connection's `accepted_0rtt` flag.
pub fn accepted_0rtt(&self) -> bool {
    self.accepted_0rtt
}
1774
/// Returns whether 0-RTT was enabled for this connection (the `zero_rtt_enabled` flag).
pub fn has_0rtt(&self) -> bool {
    self.zero_rtt_enabled
}
1779
1780 pub fn has_pending_retransmits(&self) -> bool {
1782 !self.spaces[SpaceId::Data].pending.is_empty(&self.streams)
1783 }
1784
/// Returns which `Side` of the connection this endpoint is.
pub fn side(&self) -> Side {
    self.side.side()
}
1789
/// Returns the remote socket address of the current path.
pub fn remote_address(&self) -> SocketAddr {
    self.path.remote
}
1794
/// Returns the local IP address recorded for this connection, if any.
pub fn local_ip(&self) -> Option<IpAddr> {
    self.local_ip
}
1807
/// Returns the current RTT value reported by the path's RTT estimator.
pub fn rtt(&self) -> Duration {
    self.path.rtt.get()
}
1812
/// Returns a shared reference to the current path's congestion controller.
pub fn congestion_state(&self) -> &dyn Controller {
    self.path.congestion.as_ref()
}
1817
/// Resets the current path's state by calling its `reset` with `now` and `self.config`.
pub fn path_changed(&mut self, now: Instant) {
    self.path.reset(now, &self.config);
}
1831
1832 pub fn set_max_concurrent_streams(&mut self, dir: Dir, count: VarInt) {
1837 self.streams.set_max_concurrent(dir, count);
1838 let pending = &mut self.spaces[SpaceId::Data].pending;
1841 self.streams.queue_max_stream_id(pending);
1842 }
1843
/// Returns the concurrent-stream limit currently recorded for `dir`.
pub fn max_concurrent_streams(&self, dir: Dir) -> u64 {
    self.streams.max_concurrent(dir)
}
1852
1853 pub fn set_receive_window(&mut self, receive_window: VarInt) {
1855 if self.streams.set_receive_window(receive_window) {
1856 self.spaces[SpaceId::Data].pending.max_data = true;
1857 }
1858 }
1859
1860 pub fn set_address_discovery_enabled(&mut self, enabled: bool) {
1862 if let Some(ref mut state) = self.address_discovery_state {
1863 state.enabled = enabled;
1864 }
1865 }
1866
1867 pub fn address_discovery_enabled(&self) -> bool {
1869 self.address_discovery_state
1870 .as_ref()
1871 .is_some_and(|state| state.enabled)
1872 }
1873
1874 pub fn observed_address(&self) -> Option<SocketAddr> {
1879 self.address_discovery_state
1880 .as_ref()
1881 .and_then(|state| state.get_observed_address(0)) }
1883
/// Returns a shared reference to the address discovery state, if the connection has one.
#[allow(dead_code)]
pub(crate) fn address_discovery_state(&self) -> Option<&AddressDiscoveryState> {
    self.address_discovery_state.as_ref()
}
1889
/// Processes an ACK frame received in packet number space `space`.
///
/// Removes newly acknowledged packets from the sent-packet set, notifies MTU discovery,
/// ACK-frequency tracking and the congestion controller, possibly takes an RTT sample, runs
/// loss detection, handles ECN feedback and finally re-arms the loss detection timer.
///
/// # Errors
///
/// Returns `PROTOCOL_VIOLATION` when the frame acknowledges a packet number that has not
/// been sent yet, and propagates any error from `packet_number_filter.check_ack`.
fn on_ack_received(
    &mut self,
    now: Instant,
    space: SpaceId,
    ack: frame::Ack,
) -> Result<(), TransportError> {
    if ack.largest >= self.spaces[space].next_packet_number {
        return Err(TransportError::PROTOCOL_VIOLATION("unsent packet acked"));
    }
    // Track whether this frame advances the largest acknowledged packet number.
    let new_largest = {
        let space = &mut self.spaces[space];
        if space.largest_acked_packet.is_none_or(|pn| ack.largest > pn) {
            space.largest_acked_packet = Some(ack.largest);
            // The send time is only refreshed when the packet is still in `sent_packets`.
            if let Some(info) = space.sent_packets.get(&ack.largest) {
                space.largest_acked_packet_sent = info.time_sent;
            }
            true
        } else {
            false
        }
    };

    // Collect only those acknowledged packet numbers that are still tracked as sent.
    let mut newly_acked = ArrayRangeSet::new();
    for range in ack.iter() {
        self.packet_number_filter.check_ack(space, range.clone())?;
        for (&pn, _) in self.spaces[space].sent_packets.range(range) {
            newly_acked.insert_one(pn);
        }
    }

    if newly_acked.is_empty() {
        return Ok(());
    }

    let mut ack_eliciting_acked = false;
    for packet in newly_acked.elts() {
        if let Some(info) = self.spaces[space].take(packet) {
            // ACKs this packet carried for ranges below `acked` no longer need tracking.
            if let Some(acked) = info.largest_acked {
                self.spaces[space].pending_acks.subtract_below(acked);
            }
            ack_eliciting_acked |= info.ack_eliciting;

            // Let MTU discovery see the ack; propagate an MTU change to congestion control.
            let mtu_updated = self.path.mtud.on_acked(space, packet, info.size);
            if mtu_updated {
                self.path
                    .congestion
                    .on_mtu_update(self.path.mtud.current_mtu());
            }

            self.ack_frequency.on_acked(packet);

            self.on_packet_acked(now, packet, info);
        }
    }

    self.path.congestion.on_end_acks(
        now,
        self.path.in_flight.bytes,
        self.app_limited,
        self.spaces[space].largest_acked_packet,
    );

    // An RTT sample is taken only when the largest acknowledged packet advanced and at
    // least one newly acknowledged packet was ack-eliciting.
    if new_largest && ack_eliciting_acked {
        // The peer-reported delay is ignored outside the Data space and is capped at the
        // peer's max ack delay inside it.
        let ack_delay = if space != SpaceId::Data {
            Duration::from_micros(0)
        } else {
            cmp::min(
                self.ack_frequency.peer_max_ack_delay,
                Duration::from_micros(ack.delay << self.peer_params.ack_delay_exponent.0),
            )
        };
        let rtt = instant_saturating_sub(now, self.spaces[space].largest_acked_packet_sent);
        self.path.rtt.update(ack_delay, rtt);
        if self.path.first_packet_after_rtt_sample.is_none() {
            self.path.first_packet_after_rtt_sample =
                Some((space, self.spaces[space].next_packet_number));
        }
    }

    // Loss detection runs before `pto_count` is reset below.
    self.detect_lost_packets(now, space, true);

    if self.peer_completed_address_validation() {
        self.pto_count = 0;
    }

    if self.path.sending_ecn {
        if let Some(ecn) = ack.ecn {
            // ECN counts are only examined for ACKs that advanced the largest acked packet.
            if new_largest {
                let sent = self.spaces[space].largest_acked_packet_sent;
                self.process_ecn(now, space, newly_acked.len() as u64, ecn, sent);
            }
        } else {
            // An ACK without ECN counts while we are sending ECN disables ECN on this path.
            debug!("ECN not acknowledged by peer");
            self.path.sending_ecn = false;
        }
    }

    self.set_loss_detection_timer(now);
    Ok(())
}
2008
2009 fn process_ecn(
2011 &mut self,
2012 now: Instant,
2013 space: SpaceId,
2014 newly_acked: u64,
2015 ecn: frame::EcnCounts,
2016 largest_sent_time: Instant,
2017 ) {
2018 match self.spaces[space].detect_ecn(newly_acked, ecn) {
2019 Err(e) => {
2020 debug!("halting ECN due to verification failure: {}", e);
2021 self.path.sending_ecn = false;
2022 self.spaces[space].ecn_feedback = frame::EcnCounts::ZERO;
2025 }
2026 Ok(false) => {}
2027 Ok(true) => {
2028 self.stats.path.congestion_events += 1;
2029 self.path
2030 .congestion
2031 .on_congestion_event(now, largest_sent_time, false, 0);
2032 }
2033 }
2034 }
2035
2036 fn on_packet_acked(&mut self, now: Instant, pn: u64, info: SentPacket) {
2039 self.remove_in_flight(pn, &info);
2040 if info.ack_eliciting && self.path.challenge.is_none() {
2041 self.path.congestion.on_ack(
2044 now,
2045 info.time_sent,
2046 info.size.into(),
2047 self.app_limited,
2048 &self.path.rtt,
2049 );
2050 }
2051
2052 if let Some(retransmits) = info.retransmits.get() {
2054 for (id, _) in retransmits.reset_stream.iter() {
2055 self.streams.reset_acked(*id);
2056 }
2057 }
2058
2059 for frame in info.stream_frames {
2060 self.streams.received_ack_of(frame);
2061 }
2062 }
2063
2064 fn set_key_discard_timer(&mut self, now: Instant, space: SpaceId) {
2065 let start = if self.zero_rtt_crypto.is_some() {
2066 now
2067 } else {
2068 self.prev_crypto
2069 .as_ref()
2070 .expect("no previous keys")
2071 .end_packet
2072 .as_ref()
2073 .expect("update not acknowledged yet")
2074 .1
2075 };
2076 self.timers
2077 .set(Timer::KeyDiscard, start + self.pto(space) * 3);
2078 }
2079
2080 fn on_loss_detection_timeout(&mut self, now: Instant) {
2081 if let Some((_, pn_space)) = self.loss_time_and_space() {
2082 self.detect_lost_packets(now, pn_space, false);
2084 self.set_loss_detection_timer(now);
2085 return;
2086 }
2087
2088 let (_, space) = match self.pto_time_and_space(now) {
2089 Some(x) => x,
2090 None => {
2091 error!("PTO expired while unset");
2092 return;
2093 }
2094 };
2095 trace!(
2096 in_flight = self.path.in_flight.bytes,
2097 count = self.pto_count,
2098 ?space,
2099 "PTO fired"
2100 );
2101
2102 let count = match self.path.in_flight.ack_eliciting {
2103 0 => {
2106 debug_assert!(!self.peer_completed_address_validation());
2107 1
2108 }
2109 _ => 2,
2111 };
2112 self.spaces[space].loss_probes = self.spaces[space].loss_probes.saturating_add(count);
2113 self.pto_count = self.pto_count.saturating_add(1);
2114 self.set_loss_detection_timer(now);
2115 }
2116
/// Scans the sent packets of `pn_space` below the largest acknowledged packet number and
/// declares packets lost by time threshold or packet threshold.
///
/// Lost packets are removed from the sent set and from in-flight accounting, their stream
/// frames and retransmittable data are re-queued, and the congestion controller is notified
/// when the in-flight byte count changed as a result. A lost in-flight MTU probe is handled
/// separately and is not counted among the lost packets. For packets that are not yet lost,
/// the space's `loss_time` is set to the earliest instant at which one of them would become
/// lost. `due_to_ack` gates the persistent-congestion tracking.
///
/// # Panics
///
/// Panics if `now` minus the loss delay cannot be represented, or if no packet has been
/// acknowledged in `pn_space` yet.
fn detect_lost_packets(&mut self, now: Instant, pn_space: SpaceId, due_to_ack: bool) {
    let mut lost_packets = Vec::<u64>::new();
    let mut lost_mtu_probe = None;
    let in_flight_mtu_probe = self.path.mtud.in_flight_mtu_probe();
    let rtt = self.path.rtt.conservative();
    // The loss delay is the scaled RTT, but never less than the timer granularity.
    let loss_delay = cmp::max(rtt.mul_f32(self.config.time_threshold), TIMER_GRANULARITY);

    // Packets sent at or before this instant are considered lost.
    let lost_send_time = now.checked_sub(loss_delay).unwrap();
    let largest_acked_packet = self.spaces[pn_space].largest_acked_packet.unwrap();
    let packet_threshold = self.config.packet_threshold as u64;
    let mut size_of_lost_packets = 0u64;

    // Persistent congestion is declared when a run of lost ack-eliciting packets spans more
    // than this period.
    let congestion_period =
        self.pto(SpaceId::Data) * self.config.persistent_congestion_threshold;
    let mut persistent_congestion_start: Option<Instant> = None;
    let mut prev_packet = None;
    let mut in_persistent_congestion = false;

    let space = &mut self.spaces[pn_space];
    space.loss_time = None;

    for (&packet, info) in space.sent_packets.range(0..largest_acked_packet) {
        // A gap in packet numbers ends the current run of consecutive losses.
        if prev_packet != Some(packet.wrapping_sub(1)) {
            persistent_congestion_start = None;
        }

        if info.time_sent <= lost_send_time || largest_acked_packet >= packet + packet_threshold
        {
            if Some(packet) == in_flight_mtu_probe {
                // Remember the lost MTU probe; it is processed after the regular losses.
                lost_mtu_probe = in_flight_mtu_probe;
            } else {
                lost_packets.push(packet);
                size_of_lost_packets += info.size as u64;
                if info.ack_eliciting && due_to_ack {
                    match persistent_congestion_start {
                        Some(start) if info.time_sent - start > congestion_period => {
                            in_persistent_congestion = true;
                        }
                        // Start a run only for packets after the one recorded in
                        // `first_packet_after_rtt_sample`.
                        None if self
                            .path
                            .first_packet_after_rtt_sample
                            .is_some_and(|x| x < (pn_space, packet)) =>
                        {
                            persistent_congestion_start = Some(info.time_sent);
                        }
                        _ => {}
                    }
                }
            }
        } else {
            // Not lost yet: track the earliest time at which such a packet would be lost.
            let next_loss_time = info.time_sent + loss_delay;
            space.loss_time = Some(
                space
                    .loss_time
                    .map_or(next_loss_time, |x| cmp::min(x, next_loss_time)),
            );
            persistent_congestion_start = None;
        }

        prev_packet = Some(packet);
    }

    if let Some(largest_lost) = lost_packets.last().cloned() {
        let old_bytes_in_flight = self.path.in_flight.bytes;
        let largest_lost_sent = self.spaces[pn_space].sent_packets[&largest_lost].time_sent;
        self.lost_packets += lost_packets.len() as u64;
        self.stats.path.lost_packets += lost_packets.len() as u64;
        self.stats.path.lost_bytes += size_of_lost_packets;
        trace!(
            "packets lost: {:?}, bytes lost: {}",
            lost_packets, size_of_lost_packets
        );

        for &packet in &lost_packets {
            // Every entry of `lost_packets` was just read from `sent_packets`.
            let info = self.spaces[pn_space].take(packet).unwrap(); self.remove_in_flight(packet, &info);
            for frame in info.stream_frames {
                self.streams.retransmit(frame);
            }
            self.spaces[pn_space].pending |= info.retransmits;
            self.path.mtud.on_non_probe_lost(packet, info.size);
        }

        if self.path.mtud.black_hole_detected(now) {
            self.stats.path.black_holes_detected += 1;
            self.path
                .congestion
                .on_mtu_update(self.path.mtud.current_mtu());
            // Drop queued datagrams that exceed the current maximum datagram size.
            if let Some(max_datagram_size) = self.datagrams().max_size() {
                self.datagrams.drop_oversized(max_datagram_size);
            }
        }

        // A change in in-flight bytes is used as the signal that congestion-controlled
        // data was lost.
        let lost_ack_eliciting = old_bytes_in_flight != self.path.in_flight.bytes;

        if lost_ack_eliciting {
            self.stats.path.congestion_events += 1;
            self.path.congestion.on_congestion_event(
                now,
                largest_lost_sent,
                in_persistent_congestion,
                size_of_lost_packets,
            );
        }
    }

    // A lost MTU probe is removed from the Data space and reported to MTU discovery only.
    if let Some(packet) = lost_mtu_probe {
        let info = self.spaces[SpaceId::Data].take(packet).unwrap(); self.remove_in_flight(packet, &info);
        self.path.mtud.on_probe_lost();
        self.stats.path.lost_plpmtud_probes += 1;
    }
}
2243
2244 fn loss_time_and_space(&self) -> Option<(Instant, SpaceId)> {
2245 SpaceId::iter()
2246 .filter_map(|id| Some((self.spaces[id].loss_time?, id)))
2247 .min_by_key(|&(time, _)| time)
2248 }
2249
2250 fn pto_time_and_space(&self, now: Instant) -> Option<(Instant, SpaceId)> {
2251 let backoff = 2u32.pow(self.pto_count.min(MAX_BACKOFF_EXPONENT));
2252 let mut duration = self.path.rtt.pto_base() * backoff;
2253
2254 if self.path.in_flight.ack_eliciting == 0 {
2255 debug_assert!(!self.peer_completed_address_validation());
2256 let space = match self.highest_space {
2257 SpaceId::Handshake => SpaceId::Handshake,
2258 _ => SpaceId::Initial,
2259 };
2260 return Some((now + duration, space));
2261 }
2262
2263 let mut result = None;
2264 for space in SpaceId::iter() {
2265 if self.spaces[space].in_flight == 0 {
2266 continue;
2267 }
2268 if space == SpaceId::Data {
2269 if self.is_handshaking() {
2271 return result;
2272 }
2273 duration += self.ack_frequency.max_ack_delay_for_pto() * backoff;
2275 }
2276 let last_ack_eliciting = match self.spaces[space].time_of_last_ack_eliciting_packet {
2277 Some(time) => time,
2278 None => continue,
2279 };
2280 let pto = last_ack_eliciting + duration;
2281 if result.is_none_or(|(earliest_pto, _)| pto < earliest_pto) {
2282 result = Some((pto, space));
2283 }
2284 }
2285 result
2286 }
2287
2288 fn peer_completed_address_validation(&self) -> bool {
2289 if self.side.is_server() || self.state.is_closed() {
2290 return true;
2291 }
2292 self.spaces[SpaceId::Handshake]
2295 .largest_acked_packet
2296 .is_some()
2297 || self.spaces[SpaceId::Data].largest_acked_packet.is_some()
2298 || (self.spaces[SpaceId::Data].crypto.is_some()
2299 && self.spaces[SpaceId::Handshake].crypto.is_none())
2300 }
2301
2302 fn set_loss_detection_timer(&mut self, now: Instant) {
2303 if self.state.is_closed() {
2304 return;
2308 }
2309
2310 if let Some((loss_time, _)) = self.loss_time_and_space() {
2311 self.timers.set(Timer::LossDetection, loss_time);
2313 return;
2314 }
2315
2316 if self.path.anti_amplification_blocked(1) {
2317 self.timers.stop(Timer::LossDetection);
2319 return;
2320 }
2321
2322 if self.path.in_flight.ack_eliciting == 0 && self.peer_completed_address_validation() {
2323 self.timers.stop(Timer::LossDetection);
2326 return;
2327 }
2328
2329 if let Some((timeout, _)) = self.pto_time_and_space(now) {
2332 self.timers.set(Timer::LossDetection, timeout);
2333 } else {
2334 self.timers.stop(Timer::LossDetection);
2335 }
2336 }
2337
2338 fn pto(&self, space: SpaceId) -> Duration {
2340 let max_ack_delay = match space {
2341 SpaceId::Initial | SpaceId::Handshake => Duration::ZERO,
2342 SpaceId::Data => self.ack_frequency.max_ack_delay_for_pto(),
2343 };
2344 self.path.rtt.pto_base() + max_ack_delay
2345 }
2346
2347 fn on_packet_authenticated(
2348 &mut self,
2349 now: Instant,
2350 space_id: SpaceId,
2351 ecn: Option<EcnCodepoint>,
2352 packet: Option<u64>,
2353 spin: bool,
2354 is_1rtt: bool,
2355 ) {
2356 self.total_authed_packets += 1;
2357 self.reset_keep_alive(now);
2358 self.reset_idle_timeout(now, space_id);
2359 self.permit_idle_reset = true;
2360 self.receiving_ecn |= ecn.is_some();
2361 if let Some(x) = ecn {
2362 let space = &mut self.spaces[space_id];
2363 space.ecn_counters += x;
2364
2365 if x.is_ce() {
2366 space.pending_acks.set_immediate_ack_required();
2367 }
2368 }
2369
2370 let packet = match packet {
2371 Some(x) => x,
2372 None => return,
2373 };
2374 if self.side.is_server() {
2375 if self.spaces[SpaceId::Initial].crypto.is_some() && space_id == SpaceId::Handshake {
2376 self.discard_space(now, SpaceId::Initial);
2378 }
2379 if self.zero_rtt_crypto.is_some() && is_1rtt {
2380 self.set_key_discard_timer(now, space_id)
2382 }
2383 }
2384 let space = &mut self.spaces[space_id];
2385 space.pending_acks.insert_one(packet, now);
2386 if packet >= space.rx_packet {
2387 space.rx_packet = packet;
2388 self.spin = self.side.is_client() ^ spin;
2390 }
2391 }
2392
2393 fn reset_idle_timeout(&mut self, now: Instant, space: SpaceId) {
2394 let timeout = match self.idle_timeout {
2395 None => return,
2396 Some(dur) => dur,
2397 };
2398 if self.state.is_closed() {
2399 self.timers.stop(Timer::Idle);
2400 return;
2401 }
2402 let dt = cmp::max(timeout, 3 * self.pto(space));
2403 self.timers.set(Timer::Idle, now + dt);
2404 }
2405
2406 fn reset_keep_alive(&mut self, now: Instant) {
2407 let interval = match self.config.keep_alive_interval {
2408 Some(x) if self.state.is_established() => x,
2409 _ => return,
2410 };
2411 self.timers.set(Timer::KeepAlive, now + interval);
2412 }
2413
2414 fn reset_cid_retirement(&mut self) {
2415 if let Some(t) = self.local_cid_state.next_timeout() {
2416 self.timers.set(Timer::PushNewCid, t);
2417 }
2418 }
2419
2420 pub(crate) fn handle_first_packet(
2425 &mut self,
2426 now: Instant,
2427 remote: SocketAddr,
2428 ecn: Option<EcnCodepoint>,
2429 packet_number: u64,
2430 packet: InitialPacket,
2431 remaining: Option<BytesMut>,
2432 ) -> Result<(), ConnectionError> {
2433 let span = trace_span!("first recv");
2434 let _guard = span.enter();
2435 debug_assert!(self.side.is_server());
2436 let len = packet.header_data.len() + packet.payload.len();
2437 self.path.total_recvd = len as u64;
2438
2439 match self.state {
2440 State::Handshake(ref mut state) => {
2441 state.expected_token = packet.header.token.clone();
2442 }
2443 _ => unreachable!("first packet must be delivered in Handshake state"),
2444 }
2445
2446 self.on_packet_authenticated(
2447 now,
2448 SpaceId::Initial,
2449 ecn,
2450 Some(packet_number),
2451 false,
2452 false,
2453 );
2454
2455 self.process_decrypted_packet(now, remote, Some(packet_number), packet.into())?;
2456 if let Some(data) = remaining {
2457 self.handle_coalesced(now, remote, ecn, data);
2458 }
2459
2460 #[cfg(feature = "__qlog")]
2461 self.emit_qlog_recovery_metrics(now);
2462
2463 Ok(())
2464 }
2465
2466 fn init_0rtt(&mut self) {
2467 let (header, packet) = match self.crypto.early_crypto() {
2468 Some(x) => x,
2469 None => return,
2470 };
2471 if self.side.is_client() {
2472 match self.crypto.transport_parameters() {
2473 Ok(params) => {
2474 let params = params
2475 .expect("crypto layer didn't supply transport parameters with ticket");
2476 let params = TransportParameters {
2478 initial_src_cid: None,
2479 original_dst_cid: None,
2480 preferred_address: None,
2481 retry_src_cid: None,
2482 stateless_reset_token: None,
2483 min_ack_delay: None,
2484 ack_delay_exponent: TransportParameters::default().ack_delay_exponent,
2485 max_ack_delay: TransportParameters::default().max_ack_delay,
2486 ..params
2487 };
2488 self.set_peer_params(params);
2489 }
2490 Err(e) => {
2491 error!("session ticket has malformed transport parameters: {}", e);
2492 return;
2493 }
2494 }
2495 }
2496 trace!("0-RTT enabled");
2497 self.zero_rtt_enabled = true;
2498 self.zero_rtt_crypto = Some(ZeroRttCrypto { header, packet });
2499 }
2500
/// Feeds a received CRYPTO frame from packet space `space` into the handshake.
///
/// The frame is buffered in the space's crypto stream, and every contiguous chunk that
/// becomes readable is passed to the crypto session. `Event::HandshakeDataReady` is queued
/// whenever the session's `read_handshake` returns `true`.
///
/// # Errors
///
/// - `PROTOCOL_VIOLATION` when the frame carries data beyond what has already been read in a
///   space below the one currently expected.
/// - `CRYPTO_BUFFER_EXCEEDED` when the frame's end lies further past the read position than
///   `crypto_buffer_size`.
/// - Any error returned by the crypto session's `read_handshake`.
fn read_crypto(
    &mut self,
    space: SpaceId,
    crypto: &frame::Crypto,
    payload_len: usize,
) -> Result<(), TransportError> {
    // Determine the space in which new CRYPTO data is currently expected.
    let expected = if !self.state.is_handshake() {
        SpaceId::Data
    } else if self.highest_space == SpaceId::Initial {
        SpaceId::Initial
    } else {
        SpaceId::Handshake
    };
    debug_assert!(space <= expected, "received out-of-order CRYPTO data");

    let end = crypto.offset + crypto.data.len() as u64;
    // Data in a lower space is tolerated only when it does not extend past what was
    // already read there.
    if space < expected && end > self.spaces[space].crypto_stream.bytes_read() {
        warn!(
            "received new {:?} CRYPTO data when expecting {:?}",
            space, expected
        );
        return Err(TransportError::PROTOCOL_VIOLATION(
            "new data at unexpected encryption level",
        ));
    }

    // Let the PQC state inspect the CRYPTO payload.
    self.pqc_state.detect_pqc_from_crypto(&crypto.data, space);

    if self.pqc_state.should_trigger_mtu_discovery() {
        self.path
            .mtud
            .reset(self.pqc_state.min_initial_size(), self.config.min_mtu);
        trace!("Triggered MTU discovery for PQC handshake");
    }

    // From here on `space` refers to the packet space state rather than its identifier.
    let space = &mut self.spaces[space];
    let max = end.saturating_sub(space.crypto_stream.bytes_read());
    if max > self.config.crypto_buffer_size as u64 {
        return Err(TransportError::CRYPTO_BUFFER_EXCEEDED(""));
    }

    space
        .crypto_stream
        .insert(crypto.offset, crypto.data.clone(), payload_len);
    while let Some(chunk) = space.crypto_stream.read(usize::MAX, true) {
        trace!("consumed {} CRYPTO bytes", chunk.bytes.len());
        if self.crypto.read_handshake(&chunk.bytes)? {
            self.events.push_back(Event::HandshakeDataReady);
        }
    }

    Ok(())
}
2562
/// Drains outgoing handshake data from the crypto session and queues it as CRYPTO frames.
///
/// Each loop iteration asks the session for data at the current highest packet space. When
/// the session also returns new keys, the connection is upgraded to the next space. The loop
/// ends once the session produces no data and the highest space did not change during the
/// iteration.
///
/// # Panics
///
/// Panics if the session returns new keys while the highest space is already the Data space.
fn write_crypto(&mut self) {
    loop {
        // Captured before the call below, which may advance `highest_space`.
        let space = self.highest_space;
        let mut outgoing = Vec::new();
        if let Some(crypto) = self.crypto.write_handshake(&mut outgoing) {
            match space {
                SpaceId::Initial => {
                    self.upgrade_crypto(SpaceId::Handshake, crypto);
                }
                SpaceId::Handshake => {
                    self.upgrade_crypto(SpaceId::Data, crypto);
                }
                _ => unreachable!("got updated secrets during 1-RTT"),
            }
        }
        if outgoing.is_empty() {
            if space == self.highest_space {
                break;
            } else {
                // Keys were upgraded without data; try again at the new level.
                continue;
            }
        }
        let offset = self.spaces[space].crypto_offset;
        let outgoing = Bytes::from(outgoing);
        // On the client, keep a copy of the first Initial-space CRYPTO data written while
        // the handshake is in progress.
        if let State::Handshake(ref mut state) = self.state {
            if space == SpaceId::Initial && offset == 0 && self.side.is_client() {
                state.client_hello = Some(outgoing.clone());
            }
        }
        self.spaces[space].crypto_offset += outgoing.len() as u64;
        trace!("wrote {} {:?} CRYPTO bytes", outgoing.len(), space);

        // With PQC in use, data larger than 1200 bytes is split by the PQC packet handler.
        let use_pqc_fragmentation = self.pqc_state.using_pqc && outgoing.len() > 1200;

        if use_pqc_fragmentation {
            let frames = self.pqc_state.packet_handler.fragment_crypto_data(
                &outgoing,
                offset,
                self.pqc_state.min_initial_size() as usize,
            );
            for frame in frames {
                self.spaces[space].pending.crypto.push_back(frame);
            }
        } else {
            self.spaces[space].pending.crypto.push_back(frame::Crypto {
                offset,
                data: outgoing,
            });
        }
    }
}
2618
2619 fn upgrade_crypto(&mut self, space: SpaceId, crypto: Keys) {
2621 debug_assert!(
2622 self.spaces[space].crypto.is_none(),
2623 "already reached packet space {space:?}"
2624 );
2625 trace!("{:?} keys ready", space);
2626 if space == SpaceId::Data {
2627 self.next_crypto = Some(
2629 self.crypto
2630 .next_1rtt_keys()
2631 .expect("handshake should be complete"),
2632 );
2633 }
2634
2635 self.spaces[space].crypto = Some(crypto);
2636 debug_assert!(space as usize > self.highest_space as usize);
2637 self.highest_space = space;
2638 if space == SpaceId::Data && self.side.is_client() {
2639 self.zero_rtt_crypto = None;
2641 }
2642 }
2643
2644 fn discard_space(&mut self, now: Instant, space_id: SpaceId) {
2645 debug_assert!(space_id != SpaceId::Data);
2646 trace!("discarding {:?} keys", space_id);
2647 if space_id == SpaceId::Initial {
2648 if let ConnectionSide::Client { token, .. } = &mut self.side {
2650 *token = Bytes::new();
2651 }
2652 }
2653 let space = &mut self.spaces[space_id];
2654 space.crypto = None;
2655 space.time_of_last_ack_eliciting_packet = None;
2656 space.loss_time = None;
2657 space.in_flight = 0;
2658 let sent_packets = mem::take(&mut space.sent_packets);
2659 for (pn, packet) in sent_packets.into_iter() {
2660 self.remove_in_flight(pn, &packet);
2661 }
2662 self.set_loss_detection_timer(now)
2663 }
2664
2665 fn handle_coalesced(
2666 &mut self,
2667 now: Instant,
2668 remote: SocketAddr,
2669 ecn: Option<EcnCodepoint>,
2670 data: BytesMut,
2671 ) {
2672 self.path.total_recvd = self.path.total_recvd.saturating_add(data.len() as u64);
2673 let mut remaining = Some(data);
2674 while let Some(data) = remaining {
2675 match PartialDecode::new(
2676 data,
2677 &FixedLengthConnectionIdParser::new(self.local_cid_state.cid_len()),
2678 &[self.version],
2679 self.endpoint_config.grease_quic_bit,
2680 ) {
2681 Ok((partial_decode, rest)) => {
2682 remaining = rest;
2683 self.handle_decode(now, remote, ecn, partial_decode);
2684 }
2685 Err(e) => {
2686 trace!("malformed header: {}", e);
2687 return;
2688 }
2689 }
2690 }
2691 }
2692
2693 fn handle_decode(
2694 &mut self,
2695 now: Instant,
2696 remote: SocketAddr,
2697 ecn: Option<EcnCodepoint>,
2698 partial_decode: PartialDecode,
2699 ) {
2700 if let Some(decoded) = packet_crypto::unprotect_header(
2701 partial_decode,
2702 &self.spaces,
2703 self.zero_rtt_crypto.as_ref(),
2704 self.peer_params.stateless_reset_token,
2705 ) {
2706 self.handle_packet(now, remote, ecn, decoded.packet, decoded.stateless_reset);
2707 }
2708 }
2709
2710 fn handle_packet(
2711 &mut self,
2712 now: Instant,
2713 remote: SocketAddr,
2714 ecn: Option<EcnCodepoint>,
2715 packet: Option<Packet>,
2716 stateless_reset: bool,
2717 ) {
2718 self.stats.udp_rx.ios += 1;
2719 if let Some(ref packet) = packet {
2720 trace!(
2721 "got {:?} packet ({} bytes) from {} using id {}",
2722 packet.header.space(),
2723 packet.payload.len() + packet.header_data.len(),
2724 remote,
2725 packet.header.dst_cid(),
2726 );
2727
2728 #[cfg(feature = "trace")]
2730 {
2731 use crate::trace_packet_received;
2732 let packet_size = packet.payload.len() + packet.header_data.len();
2734 trace_packet_received!(
2735 &self.event_log,
2736 self.trace_context.trace_id(),
2737 packet_size as u32,
2738 0 );
2740 }
2741 }
2742
2743 if self.is_handshaking() && remote != self.path.remote {
2744 debug!("discarding packet with unexpected remote during handshake");
2745 return;
2746 }
2747
2748 let was_closed = self.state.is_closed();
2749 let was_drained = self.state.is_drained();
2750
2751 let decrypted = match packet {
2752 None => Err(None),
2753 Some(mut packet) => self
2754 .decrypt_packet(now, &mut packet)
2755 .map(move |number| (packet, number)),
2756 };
2757 let result = match decrypted {
2758 _ if stateless_reset => {
2759 debug!("got stateless reset");
2760 Err(ConnectionError::Reset)
2761 }
2762 Err(Some(e)) => {
2763 warn!("illegal packet: {}", e);
2764 Err(e.into())
2765 }
2766 Err(None) => {
2767 debug!("failed to authenticate packet");
2768 self.authentication_failures += 1;
2769 let integrity_limit = self.spaces[self.highest_space]
2770 .crypto
2771 .as_ref()
2772 .unwrap()
2773 .packet
2774 .local
2775 .integrity_limit();
2776 if self.authentication_failures > integrity_limit {
2777 Err(TransportError::AEAD_LIMIT_REACHED("integrity limit violated").into())
2778 } else {
2779 return;
2780 }
2781 }
2782 Ok((packet, number)) => {
2783 let span = match number {
2784 Some(pn) => trace_span!("recv", space = ?packet.header.space(), pn),
2785 None => trace_span!("recv", space = ?packet.header.space()),
2786 };
2787 let _guard = span.enter();
2788
2789 let is_duplicate = |n| self.spaces[packet.header.space()].dedup.insert(n);
2790 if number.is_some_and(is_duplicate) {
2791 debug!("discarding possible duplicate packet");
2792 return;
2793 } else if self.state.is_handshake() && packet.header.is_short() {
2794 trace!("dropping short packet during handshake");
2796 return;
2797 } else {
2798 if let Header::Initial(InitialHeader { ref token, .. }) = packet.header {
2799 if let State::Handshake(ref hs) = self.state {
2800 if self.side.is_server() && token != &hs.expected_token {
2801 warn!("discarding Initial with invalid retry token");
2805 return;
2806 }
2807 }
2808 }
2809
2810 if !self.state.is_closed() {
2811 let spin = match packet.header {
2812 Header::Short { spin, .. } => spin,
2813 _ => false,
2814 };
2815 self.on_packet_authenticated(
2816 now,
2817 packet.header.space(),
2818 ecn,
2819 number,
2820 spin,
2821 packet.header.is_1rtt(),
2822 );
2823 }
2824
2825 self.process_decrypted_packet(now, remote, number, packet)
2826 }
2827 }
2828 };
2829
2830 if let Err(conn_err) = result {
2832 self.error = Some(conn_err.clone());
2833 self.state = match conn_err {
2834 ConnectionError::ApplicationClosed(reason) => State::closed(reason),
2835 ConnectionError::ConnectionClosed(reason) => State::closed(reason),
2836 ConnectionError::Reset
2837 | ConnectionError::TransportError(TransportError {
2838 code: TransportErrorCode::AEAD_LIMIT_REACHED,
2839 ..
2840 }) => State::Drained,
2841 ConnectionError::TimedOut => {
2842 unreachable!("timeouts aren't generated by packet processing");
2843 }
2844 ConnectionError::TransportError(err) => {
2845 debug!("closing connection due to transport error: {}", err);
2846 State::closed(err)
2847 }
2848 ConnectionError::VersionMismatch => State::Draining,
2849 ConnectionError::LocallyClosed => {
2850 unreachable!("LocallyClosed isn't generated by packet processing");
2851 }
2852 ConnectionError::CidsExhausted => {
2853 unreachable!("CidsExhausted isn't generated by packet processing");
2854 }
2855 };
2856 }
2857
2858 if !was_closed && self.state.is_closed() {
2859 self.close_common();
2860 if !self.state.is_drained() {
2861 self.set_close_timer(now);
2862 }
2863 }
2864 if !was_drained && self.state.is_drained() {
2865 self.endpoint_events.push_back(EndpointEventInner::Drained);
2866 self.timers.stop(Timer::Close);
2869 }
2870
2871 if let State::Closed(_) = self.state {
2873 self.close = remote == self.path.remote;
2874 }
2875 }
2876
    /// Dispatches an already-decrypted packet according to the current connection state.
    ///
    /// - `Established`: Data-space packets go to `process_payload`; packets of other
    ///   spaces whose header `has_frames()` go to `process_early_payload`; anything
    ///   else is discarded.
    /// - `Closed`: frames are only scanned for a CLOSE frame, which moves the connection
    ///   to `Draining`. Per-frame decoding errors are logged and skipped.
    /// - `Draining` / `Drained`: the packet is ignored.
    /// - `Handshake`: the packet is handled by header type (Retry, Handshake, Initial,
    ///   0-RTT, Version Negotiation) in the second `match` below.
    ///
    /// `remote` is the address the packet arrived from and `number` is its packet
    /// number, when the packet type carries one.
    ///
    /// # Errors
    ///
    /// Returns a `ConnectionError` when frame processing fails with a transport error,
    /// when a server receives a Retry, when the peer's transport parameters are missing
    /// or rejected, or with `ConnectionError::VersionMismatch` when a Version
    /// Negotiation packet does not list our version.
    ///
    /// # Panics
    ///
    /// Panics if `number` is `None` for a Data-space or 0-RTT packet, if no ClientHello
    /// is stored when a Retry is accepted, if `early_data_accepted()` returns `None`
    /// while 0-RTT is in use, or if a short-header packet reaches the handshake branch.
    fn process_decrypted_packet(
        &mut self,
        now: Instant,
        remote: SocketAddr,
        number: Option<u64>,
        packet: Packet,
    ) -> Result<(), ConnectionError> {
        // Every state except `Handshake` is fully handled (and returns) inside this match;
        // only the handshake state falls through to the header dispatch below.
        let state = match self.state {
            State::Established => {
                match packet.header.space() {
                    SpaceId::Data => self.process_payload(now, remote, number.unwrap(), packet)?,
                    _ if packet.header.has_frames() => self.process_early_payload(now, packet)?,
                    _ => {
                        trace!("discarding unexpected pre-handshake packet");
                    }
                }
                return Ok(());
            }
            State::Closed(_) => {
                // While closed we only look for a CLOSE frame from the peer; malformed
                // frames are logged and skipped rather than treated as errors.
                for result in frame::Iter::new(packet.payload.freeze())? {
                    let frame = match result {
                        Ok(frame) => frame,
                        Err(err) => {
                            debug!("frame decoding error: {err:?}");
                            continue;
                        }
                    };

                    if let Frame::Padding = frame {
                        continue;
                    };

                    self.stats.frame_rx.record(&frame);

                    if let Frame::Close(_) = frame {
                        trace!("draining");
                        self.state = State::Draining;
                        break;
                    }
                }
                return Ok(());
            }
            State::Draining | State::Drained => return Ok(()),
            State::Handshake(ref mut state) => state,
        };

        match packet.header {
            Header::Retry {
                src_cid: rem_cid, ..
            } => {
                if self.side.is_server() {
                    return Err(TransportError::PROTOCOL_VIOLATION("client sent Retry").into());
                }

                // Silently drop the Retry when more than one packet has already been
                // authenticated, when the payload is not longer than the 16 trailing bytes
                // that are split off from the token below (which would leave an empty
                // token), or when `is_valid_retry` rejects it.
                if self.total_authed_packets > 1
                    || packet.payload.len() <= 16 || !self.crypto.is_valid_retry(
                        &self.rem_cids.active(),
                        &packet.header_data,
                        &packet.payload,
                    )
                {
                    trace!("discarding invalid Retry");
                    return Ok(());
                }

                trace!("retrying with CID {}", rem_cid);
                // The saved ClientHello is re-queued below so it is sent again.
                let client_hello = state.client_hello.take().unwrap();
                self.retry_src_cid = Some(rem_cid);
                self.rem_cids.update_initial_cid(rem_cid);
                self.rem_handshake_cid = rem_cid;

                // If Initial packet number 0 is still tracked, process it as acknowledged.
                let space = &mut self.spaces[SpaceId::Initial];
                if let Some(info) = space.take(0) {
                    self.on_packet_acked(now, 0, info);
                };

                // Discard the old Initial space, then rebuild it with keys derived from the
                // new remote CID while continuing the existing packet number sequence.
                self.discard_space(now, SpaceId::Initial); self.spaces[SpaceId::Initial] = PacketSpace {
                    crypto: Some(self.crypto.initial_keys(&rem_cid, self.side.side())),
                    next_packet_number: self.spaces[SpaceId::Initial].next_packet_number,
                    crypto_offset: client_hello.len() as u64,
                    ..PacketSpace::new(now)
                };
                self.spaces[SpaceId::Initial]
                    .pending
                    .crypto
                    .push_back(frame::Crypto {
                        offset: 0,
                        data: client_hello,
                    });

                // Take every packet sent so far in the Data space (0-RTT at this point),
                // drop it from in-flight accounting and merge its retransmittable frames
                // back into the pending set.
                let zero_rtt = mem::take(&mut self.spaces[SpaceId::Data].sent_packets);
                for (pn, info) in zero_rtt {
                    self.remove_in_flight(pn, &info);
                    self.spaces[SpaceId::Data].pending |= info.retransmits;
                }
                self.streams.retransmit_all_for_0rtt();

                // Everything in the payload except its final 16 bytes is the token to
                // remember on the client side.
                let token_len = packet.payload.len() - 16;
                let ConnectionSide::Client { ref mut token, .. } = self.side else {
                    unreachable!("we already short-circuited if we're server");
                };
                *token = packet.payload.freeze().split_to(token_len);
                // Restart the handshake state with no expected token and no remote CID set.
                self.state = State::Handshake(state::Handshake {
                    expected_token: Bytes::new(),
                    rem_cid_set: false,
                    client_hello: None,
                });
                Ok(())
            }
            Header::Long {
                ty: LongType::Handshake,
                src_cid: rem_cid,
                ..
            } => {
                if rem_cid != self.rem_handshake_cid {
                    debug!(
                        "discarding packet with mismatched remote CID: {} != {}",
                        self.rem_handshake_cid, rem_cid
                    );
                    return Ok(());
                }
                self.on_path_validated();

                self.process_early_payload(now, packet)?;
                // Stop if processing the payload left the connection in a closed state.
                if self.state.is_closed() {
                    return Ok(());
                }

                if self.crypto.is_handshaking() {
                    trace!("handshake ongoing");
                    return Ok(());
                }

                // The crypto handshake has completed: finish connection setup.
                if self.side.is_client() {
                    // NOTE(review): 0x6d looks like the TLS `missing_extension` alert
                    // (109) — confirm against the crypto layer.
                    let params =
                        self.crypto
                            .transport_parameters()?
                            .ok_or_else(|| TransportError {
                                code: TransportErrorCode::crypto(0x6d),
                                frame: None,
                                reason: "transport parameters missing".into(),
                            })?;

                    if self.has_0rtt() {
                        if !self.crypto.early_data_accepted().unwrap() {
                            debug_assert!(self.side.is_client());
                            debug!("0-RTT rejected");
                            self.accepted_0rtt = false;
                            self.streams.zero_rtt_rejected();

                            // Drop every frame that was queued for the Data space.
                            self.spaces[SpaceId::Data].pending = Retransmits::default();

                            // Forget all packets sent in the Data space so far.
                            let sent_packets =
                                mem::take(&mut self.spaces[SpaceId::Data].sent_packets);
                            for (pn, packet) in sent_packets {
                                self.remove_in_flight(pn, &packet);
                            }
                        } else {
                            self.accepted_0rtt = true;
                            params.validate_resumption_from(&self.peer_params)?;
                        }
                    }
                    if let Some(token) = params.stateless_reset_token {
                        self.endpoint_events
                            .push_back(EndpointEventInner::ResetToken(self.path.remote, token));
                    }
                    self.handle_peer_params(params)?;
                    self.issue_first_cids(now);
                } else {
                    // Server side: queue HANDSHAKE_DONE and drop the Handshake space.
                    self.spaces[SpaceId::Data].pending.handshake_done = true;
                    self.discard_space(now, SpaceId::Handshake);
                }

                self.events.push_back(Event::Connected);
                self.state = State::Established;
                trace!("established");
                Ok(())
            }
            Header::Initial(InitialHeader {
                src_cid: rem_cid, ..
            }) => {
                // The first Initial processed fixes the remote CID used for the handshake;
                // later Initials must carry the same source CID or are dropped.
                if !state.rem_cid_set {
                    trace!("switching remote CID to {}", rem_cid);
                    let mut state = state.clone();
                    self.rem_cids.update_initial_cid(rem_cid);
                    self.rem_handshake_cid = rem_cid;
                    self.orig_rem_cid = rem_cid;
                    state.rem_cid_set = true;
                    self.state = State::Handshake(state);
                } else if rem_cid != self.rem_handshake_cid {
                    debug!(
                        "discarding packet with mismatched remote CID: {} != {}",
                        self.rem_handshake_cid, rem_cid
                    );
                    return Ok(());
                }

                let starting_space = self.highest_space;
                self.process_early_payload(now, packet)?;

                // On a server, the packet that advances `highest_space` beyond Initial is
                // the point at which the peer's transport parameters are read and applied.
                if self.side.is_server()
                    && starting_space == SpaceId::Initial
                    && self.highest_space != SpaceId::Initial
                {
                    let params =
                        self.crypto
                            .transport_parameters()?
                            .ok_or_else(|| TransportError {
                                code: TransportErrorCode::crypto(0x6d),
                                frame: None,
                                reason: "transport parameters missing".into(),
                            })?;
                    self.handle_peer_params(params)?;
                    self.issue_first_cids(now);
                    self.init_0rtt();
                }
                Ok(())
            }
            Header::Long {
                ty: LongType::ZeroRtt,
                ..
            } => {
                self.process_payload(now, remote, number.unwrap(), packet)?;
                Ok(())
            }
            Header::VersionNegotiate { .. } => {
                // Ignore Version Negotiation once more than one packet has been authenticated.
                if self.total_authed_packets > 1 {
                    return Ok(());
                }
                // The payload is read as a list of 4-byte big-endian version numbers; a
                // trailing chunk shorter than 4 bytes never matches.
                let supported = packet
                    .payload
                    .chunks(4)
                    .any(|x| match <[u8; 4]>::try_from(x) {
                        Ok(version) => self.version == u32::from_be_bytes(version),
                        Err(_) => false,
                    });
                // A packet that lists the version we are already using is ignored.
                if supported {
                    return Ok(());
                }
                debug!("remote doesn't support our version");
                Err(ConnectionError::VersionMismatch)
            }
            Header::Short { .. } => unreachable!(
                "short packets received during handshake are discarded in handle_packet"
            ),
        }
    }
3138
3139 fn process_early_payload(
3141 &mut self,
3142 now: Instant,
3143 packet: Packet,
3144 ) -> Result<(), TransportError> {
3145 debug_assert_ne!(packet.header.space(), SpaceId::Data);
3146 let payload_len = packet.payload.len();
3147 let mut ack_eliciting = false;
3148 for result in frame::Iter::new(packet.payload.freeze())? {
3149 let frame = result?;
3150 let span = match frame {
3151 Frame::Padding => continue,
3152 _ => Some(trace_span!("frame", ty = %frame.ty())),
3153 };
3154
3155 self.stats.frame_rx.record(&frame);
3156
3157 let _guard = span.as_ref().map(|x| x.enter());
3158 ack_eliciting |= frame.is_ack_eliciting();
3159
3160 match frame {
3162 Frame::Padding | Frame::Ping => {}
3163 Frame::Crypto(frame) => {
3164 self.read_crypto(packet.header.space(), &frame, payload_len)?;
3165 }
3166 Frame::Ack(ack) => {
3167 self.on_ack_received(now, packet.header.space(), ack)?;
3168 }
3169 Frame::Close(reason) => {
3170 self.error = Some(reason.into());
3171 self.state = State::Draining;
3172 return Ok(());
3173 }
3174 _ => {
3175 let mut err =
3176 TransportError::PROTOCOL_VIOLATION("illegal frame type in handshake");
3177 err.frame = Some(frame.ty());
3178 return Err(err);
3179 }
3180 }
3181 }
3182
3183 if ack_eliciting {
3184 self.spaces[packet.header.space()]
3186 .pending_acks
3187 .set_immediate_ack_required();
3188 }
3189
3190 self.write_crypto();
3191 Ok(())
3192 }
3193
    /// Processes every frame of a decrypted packet received in the Data space
    /// (including 0-RTT packets, see the `is_0rtt()` check in the loop).
    ///
    /// `remote` is the address the packet arrived from and `number` its packet number.
    /// After all frames have been handled the packet is registered with the Data-space
    /// ACK state, stream-ID credit is queued via `queue_max_stream_id`, a received CLOSE
    /// frame moves the connection to `Draining`, and a non-probing packet from a
    /// different remote whose number equals the space's `rx_packet` triggers migration.
    ///
    /// # Errors
    ///
    /// Returns a `TransportError` if a frame fails to decode, if a frame is illegal in
    /// its context (for example CRYPTO in 0-RTT, or HANDSHAKE_DONE / NEW_TOKEN received
    /// by a server), if one of the frame handlers rejects the frame, or if a client
    /// sees a migration-triggering packet from an unknown remote.
    fn process_payload(
        &mut self,
        now: Instant,
        remote: SocketAddr,
        number: u64,
        packet: Packet,
    ) -> Result<(), TransportError> {
        let payload = packet.payload.freeze();
        // Stays `true` only while every frame seen is one of the "probing" frame types
        // listed further down.
        let mut is_probing_packet = true;
        // A CLOSE frame is remembered here and acted upon after the frame loop.
        let mut close = None;
        let payload_len = payload.len();
        let mut ack_eliciting = false;
        for result in frame::Iter::new(payload)? {
            let frame = result?;
            let span = match frame {
                Frame::Padding => continue,
                _ => Some(trace_span!("frame", ty = %frame.ty())),
            };

            self.stats.frame_rx.record(&frame);
            // Crypto, stream and datagram frames are logged by size/offset only, so their
            // payload bytes do not end up in the trace output.
            match &frame {
                Frame::Crypto(f) => {
                    trace!(offset = f.offset, len = f.data.len(), "got crypto frame");
                }
                Frame::Stream(f) => {
                    trace!(id = %f.id, offset = f.offset, len = f.data.len(), fin = f.fin, "got stream frame");
                }
                Frame::Datagram(f) => {
                    trace!(len = f.data.len(), "got datagram frame");
                }
                f => {
                    trace!("got frame {:?}", f);
                }
            }

            let _guard = span.as_ref().map(|x| x.enter());
            // CRYPTO and application CLOSE frames are rejected in 0-RTT packets.
            if packet.header.is_0rtt() {
                match frame {
                    Frame::Crypto(_) | Frame::Close(Close::Application(_)) => {
                        return Err(TransportError::PROTOCOL_VIOLATION(
                            "illegal frame type in 0-RTT",
                        ));
                    }
                    _ => {}
                }
            }
            ack_eliciting |= frame.is_ack_eliciting();

            // Any frame other than these four marks the packet as non-probing.
            match frame {
                Frame::Padding
                | Frame::PathChallenge(_)
                | Frame::PathResponse(_)
                | Frame::NewConnectionId(_) => {}
                _ => {
                    is_probing_packet = false;
                }
            }
            match frame {
                Frame::Crypto(frame) => {
                    self.read_crypto(SpaceId::Data, &frame, payload_len)?;
                }
                Frame::Stream(frame) => {
                    if self.streams.received(frame, payload_len)?.should_transmit() {
                        self.spaces[SpaceId::Data].pending.max_data = true;
                    }
                }
                Frame::Ack(ack) => {
                    self.on_ack_received(now, SpaceId::Data, ack)?;
                }
                Frame::Padding | Frame::Ping => {}
                Frame::Close(reason) => {
                    close = Some(reason);
                }
                Frame::PathChallenge(token) => {
                    // Queue a response addressed to the remote the challenge came from.
                    self.path_responses.push(number, token, remote);
                    // A challenge arriving on the active path additionally triggers an
                    // IMMEDIATE_ACK (when the peer supports ACK frequency) or a PING.
                    if remote == self.path.remote {
                        match self.peer_supports_ack_frequency() {
                            true => self.immediate_ack(),
                            false => self.ping(),
                        }
                    }
                }
                Frame::PathResponse(token) => {
                    // Three cases: the response matches the active path's outstanding
                    // challenge; otherwise it is offered to NAT traversal when enabled;
                    // otherwise it is ignored.
                    if self.path.challenge == Some(token) && remote == self.path.remote {
                        trace!("new path validated");
                        self.timers.stop(Timer::PathValidation);
                        self.path.challenge = None;
                        self.path.validated = true;
                        // The previous path no longer needs to be challenged.
                        if let Some((_, ref mut prev_path)) = self.prev_path {
                            prev_path.challenge = None;
                            prev_path.challenge_pending = false;
                        }
                        self.on_path_validated();
                    } else if let Some(nat_traversal) = &mut self.nat_traversal {
                        match nat_traversal.handle_validation_success(remote, token, now) {
                            Ok(sequence) => {
                                trace!(
                                    "NAT traversal candidate {} validated for sequence {}",
                                    remote, sequence
                                );

                                if nat_traversal.handle_coordination_success(remote, now) {
                                    trace!("Coordination succeeded via {}", remote);

                                    // Clients may always migrate; servers only when their
                                    // configuration enables migration.
                                    let can_migrate = match &self.side {
                                        ConnectionSide::Client { .. } => true, ConnectionSide::Server { server_config } => {
                                            server_config.migration
                                        }
                                    };

                                    if can_migrate {
                                        let best_pairs = nat_traversal.get_best_succeeded_pairs();
                                        // Migrate only when the first listed pair is the
                                        // remote that just validated and it differs from
                                        // the path currently in use.
                                        if let Some(best) = best_pairs.first() {
                                            if best.remote_addr == remote
                                                && best.remote_addr != self.path.remote
                                            {
                                                debug!(
                                                    "NAT traversal found better path, initiating migration"
                                                );
                                                // A failed migration is logged, not propagated.
                                                if let Err(e) =
                                                    self.migrate_to_nat_traversal_path(now)
                                                {
                                                    warn!(
                                                        "Failed to migrate to NAT traversal path: {:?}",
                                                        e
                                                    );
                                                }
                                            }
                                        }
                                    }
                                } else {
                                    if nat_traversal.mark_pair_succeeded(remote) {
                                        trace!("NAT traversal pair succeeded for {}", remote);
                                    }
                                }
                            }
                            Err(NatTraversalError::ChallengeMismatch) => {
                                debug!(
                                    "PATH_RESPONSE challenge mismatch for NAT candidate {}",
                                    remote
                                );
                            }
                            Err(e) => {
                                debug!("NAT traversal validation error: {}", e);
                            }
                        }
                    } else {
                        debug!(token, "ignoring invalid PATH_RESPONSE");
                    }
                }
                Frame::MaxData(bytes) => {
                    self.streams.received_max_data(bytes);
                }
                Frame::MaxStreamData { id, offset } => {
                    self.streams.received_max_stream_data(id, offset)?;
                }
                Frame::MaxStreams { dir, count } => {
                    self.streams.received_max_streams(dir, count)?;
                }
                Frame::ResetStream(frame) => {
                    if self.streams.received_reset(frame)?.should_transmit() {
                        self.spaces[SpaceId::Data].pending.max_data = true;
                    }
                }
                Frame::DataBlocked { offset } => {
                    debug!(offset, "peer claims to be blocked at connection level");
                }
                Frame::StreamDataBlocked { id, offset } => {
                    // A unidirectional stream that we initiated is send-only for us, so the
                    // peer cannot be blocked sending on it.
                    if id.initiator() == self.side.side() && id.dir() == Dir::Uni {
                        debug!("got STREAM_DATA_BLOCKED on send-only {}", id);
                        return Err(TransportError::STREAM_STATE_ERROR(
                            "STREAM_DATA_BLOCKED on send-only stream",
                        ));
                    }
                    debug!(
                        stream = %id,
                        offset, "peer claims to be blocked at stream level"
                    );
                }
                Frame::StreamsBlocked { dir, limit } => {
                    if limit > MAX_STREAM_COUNT {
                        return Err(TransportError::FRAME_ENCODING_ERROR(
                            "unrepresentable stream limit",
                        ));
                    }
                    debug!(
                        "peer claims to be blocked opening more than {} {} streams",
                        limit, dir
                    );
                }
                Frame::StopSending(frame::StopSending { id, error_code }) => {
                    // Reject STOP_SENDING for a peer-initiated unidirectional stream
                    // (receive-only for us) and for a local stream not yet opened.
                    if id.initiator() != self.side.side() {
                        if id.dir() == Dir::Uni {
                            debug!("got STOP_SENDING on recv-only {}", id);
                            return Err(TransportError::STREAM_STATE_ERROR(
                                "STOP_SENDING on recv-only stream",
                            ));
                        }
                    } else if self.streams.is_local_unopened(id) {
                        return Err(TransportError::STREAM_STATE_ERROR(
                            "STOP_SENDING on unopened stream",
                        ));
                    }
                    self.streams.received_stop_sending(id, error_code);
                }
                Frame::RetireConnectionId { sequence } => {
                    let allow_more_cids = self
                        .local_cid_state
                        .on_cid_retirement(sequence, self.peer_params.issue_cids_limit())?;
                    self.endpoint_events
                        .push_back(EndpointEventInner::RetireConnectionId(
                            now,
                            sequence,
                            allow_more_cids,
                        ));
                }
                Frame::NewConnectionId(frame) => {
                    trace!(
                        sequence = frame.sequence,
                        id = %frame.id,
                        retire_prior_to = frame.retire_prior_to,
                    );
                    // An empty active remote CID means CIDs are not in use for this peer.
                    if self.rem_cids.active().is_empty() {
                        return Err(TransportError::PROTOCOL_VIOLATION(
                            "NEW_CONNECTION_ID when CIDs aren't in use",
                        ));
                    }
                    if frame.retire_prior_to > frame.sequence {
                        return Err(TransportError::PROTOCOL_VIOLATION(
                            "NEW_CONNECTION_ID retiring unissued CIDs",
                        ));
                    }

                    use crate::cid_queue::InsertError;
                    match self.rem_cids.insert(frame) {
                        Ok(None) => {}
                        Ok(Some((retired, reset_token))) => {
                            let pending_retired =
                                &mut self.spaces[SpaceId::Data].pending.retire_cids;
                            // Upper bound on queued retirements so the peer cannot make
                            // `pending_retired` grow without limit.
                            const MAX_PENDING_RETIRED_CIDS: u64 = CidQueue::LEN as u64 * 10;
                            if (pending_retired.len() as u64)
                                .saturating_add(retired.end.saturating_sub(retired.start))
                                > MAX_PENDING_RETIRED_CIDS
                            {
                                return Err(TransportError::CONNECTION_ID_LIMIT_ERROR(
                                    "queued too many retired CIDs",
                                ));
                            }
                            pending_retired.extend(retired);
                            self.set_reset_token(reset_token);
                        }
                        Err(InsertError::ExceedsLimit) => {
                            return Err(TransportError::CONNECTION_ID_LIMIT_ERROR(""));
                        }
                        Err(InsertError::Retired) => {
                            trace!("discarding already-retired");
                            // The CID was already retired: queue a retirement for its
                            // sequence number and move on to the next frame.
                            self.spaces[SpaceId::Data]
                                .pending
                                .retire_cids
                                .push(frame.sequence);
                            continue;
                        }
                    };

                    // A server whose active remote CID still has sequence number 0 switches
                    // to a newer CID right away.
                    if self.side.is_server() && self.rem_cids.active_seq() == 0 {
                        self.update_rem_cid();
                    }
                }
                Frame::NewToken(NewToken { token }) => {
                    // Only a client accepts NEW_TOKEN; on a server the frame must have been
                    // sent by the client, which is a protocol violation.
                    let ConnectionSide::Client {
                        token_store,
                        server_name,
                        ..
                    } = &self.side
                    else {
                        return Err(TransportError::PROTOCOL_VIOLATION("client sent NEW_TOKEN"));
                    };
                    if token.is_empty() {
                        return Err(TransportError::FRAME_ENCODING_ERROR("empty token"));
                    }
                    trace!("got new token");
                    token_store.insert(server_name, token);
                }
                Frame::Datagram(datagram) => {
                    if self
                        .datagrams
                        .received(datagram, &self.config.datagram_receive_buffer_size)?
                    {
                        self.events.push_back(Event::DatagramReceived);
                    }
                }
                Frame::AckFrequency(ack_frequency) => {
                    let space = &mut self.spaces[SpaceId::Data];

                    // A `false` result means the frame changed nothing, so skip the timer
                    // update below. NOTE(review): presumably a stale or duplicate frame.
                    if !self
                        .ack_frequency
                        .ack_frequency_received(&ack_frequency, &mut space.pending_acks)?
                    {
                        continue;
                    }

                    // Re-arm the max-ack-delay timer using the current `max_ack_delay`.
                    if let Some(timeout) = space
                        .pending_acks
                        .max_ack_delay_timeout(self.ack_frequency.max_ack_delay)
                    {
                        self.timers.set(Timer::MaxAckDelay, timeout);
                    }
                }
                Frame::ImmediateAck => {
                    self.spaces[SpaceId::Data]
                        .pending_acks
                        .set_immediate_ack_required();
                }
                Frame::HandshakeDone => {
                    if self.side.is_server() {
                        return Err(TransportError::PROTOCOL_VIOLATION(
                            "client sent HANDSHAKE_DONE",
                        ));
                    }
                    // Drop the Handshake space if its keys are still present.
                    if self.spaces[SpaceId::Handshake].crypto.is_some() {
                        self.discard_space(now, SpaceId::Handshake);
                    }
                }
                Frame::AddAddress(add_address) => {
                    self.handle_add_address(&add_address, now)?;
                }
                Frame::PunchMeNow(punch_me_now) => {
                    self.handle_punch_me_now(&punch_me_now, now)?;
                }
                Frame::RemoveAddress(remove_address) => {
                    self.handle_remove_address(&remove_address)?;
                }
                Frame::ObservedAddress(observed_address) => {
                    self.handle_observed_address_frame(&observed_address, now)?;
                }
                Frame::TryConnectTo(try_connect_to) => {
                    self.handle_try_connect_to(&try_connect_to, now)?;
                }
                Frame::TryConnectToResponse(response) => {
                    self.handle_try_connect_to_response(&response)?;
                }
            }
        }

        // Register the packet with the ACK state; a `true` result arms the max-ack-delay
        // timer relative to `now`.
        let space = &mut self.spaces[SpaceId::Data];
        if space
            .pending_acks
            .packet_received(now, number, ack_eliciting, &space.dedup)
        {
            self.timers
                .set(Timer::MaxAckDelay, now + self.ack_frequency.max_ack_delay);
        }

        let pending = &mut self.spaces[SpaceId::Data].pending;
        self.streams.queue_max_stream_id(pending);

        // Act on a CLOSE frame only after all other frames in the packet were processed.
        if let Some(reason) = close {
            self.error = Some(reason.into());
            self.state = State::Draining;
            self.close = true;
        }

        // A non-probing packet from a different remote, whose number equals the Data
        // space's `rx_packet`, is treated as the peer having migrated.
        if remote != self.path.remote
            && !is_probing_packet
            && number == self.spaces[SpaceId::Data].rx_packet
        {
            let ConnectionSide::Server { ref server_config } = self.side else {
                return Err(TransportError::PROTOCOL_VIOLATION(
                    "packets from unknown remote should be dropped by clients",
                ));
            };
            debug_assert!(
                server_config.migration,
                "migration-initiating packets should have been dropped immediately"
            );
            self.migrate(now, remote);
            // Switch to a fresh remote CID (when one is available) and reset the spin bit.
            self.update_rem_cid();
            self.spin = false;
        }

        Ok(())
    }
3607
3608 fn migrate(&mut self, now: Instant, remote: SocketAddr) {
3609 trace!(%remote, "migration initiated");
3610 let mut new_path = if remote.is_ipv4() && remote.ip() == self.path.remote.ip() {
3614 PathData::from_previous(remote, &self.path, now)
3615 } else {
3616 let peer_max_udp_payload_size =
3617 u16::try_from(self.peer_params.max_udp_payload_size.into_inner())
3618 .unwrap_or(u16::MAX);
3619 PathData::new(
3620 remote,
3621 self.allow_mtud,
3622 Some(peer_max_udp_payload_size),
3623 now,
3624 &self.config,
3625 )
3626 };
3627 new_path.challenge = Some(self.rng.r#gen());
3628 new_path.challenge_pending = true;
3629 let prev_pto = self.pto(SpaceId::Data);
3630
3631 let mut prev = mem::replace(&mut self.path, new_path);
3632 if prev.challenge.is_none() {
3634 prev.challenge = Some(self.rng.r#gen());
3635 prev.challenge_pending = true;
3636 self.prev_path = Some((self.rem_cids.active(), prev));
3639 }
3640
3641 self.timers.set(
3642 Timer::PathValidation,
3643 now + 3 * cmp::max(self.pto(SpaceId::Data), prev_pto),
3644 );
3645 }
3646
    /// Notifies the connection that its local address has changed.
    ///
    /// Switches to the next remote connection ID when one is available (via
    /// `update_rem_cid`) and then calls `ping()`.
    // NOTE(review): the ping presumably makes the peer see traffic from the new address
    // promptly — confirm against `ping()`.
    pub fn local_address_changed(&mut self) {
        self.update_rem_cid();
        self.ping();
    }
3652
3653 pub fn migrate_to_nat_traversal_path(&mut self, now: Instant) -> Result<(), TransportError> {
3655 let (remote_addr, local_addr) = {
3657 let nat_state = self
3658 .nat_traversal
3659 .as_ref()
3660 .ok_or_else(|| TransportError::PROTOCOL_VIOLATION("NAT traversal not enabled"))?;
3661
3662 let best_pairs = nat_state.get_best_succeeded_pairs();
3664 if best_pairs.is_empty() {
3665 return Err(TransportError::PROTOCOL_VIOLATION(
3666 "No validated NAT traversal paths",
3667 ));
3668 }
3669
3670 let best_path = best_pairs
3672 .iter()
3673 .find(|pair| pair.remote_addr != self.path.remote)
3674 .or_else(|| best_pairs.first());
3675
3676 let best_path = best_path.ok_or_else(|| {
3677 TransportError::PROTOCOL_VIOLATION("No suitable NAT traversal path")
3678 })?;
3679
3680 debug!(
3681 "Migrating to NAT traversal path: {} -> {} (priority: {})",
3682 self.path.remote, best_path.remote_addr, best_path.priority
3683 );
3684
3685 (best_path.remote_addr, best_path.local_addr)
3686 };
3687
3688 self.migrate(now, remote_addr);
3690
3691 if local_addr != SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED), 0) {
3693 self.local_ip = Some(local_addr.ip());
3694 }
3695
3696 self.path.challenge_pending = true;
3698
3699 Ok(())
3700 }
3701
3702 fn update_rem_cid(&mut self) {
3704 let (reset_token, retired) = match self.rem_cids.next() {
3705 Some(x) => x,
3706 None => return,
3707 };
3708
3709 self.spaces[SpaceId::Data]
3711 .pending
3712 .retire_cids
3713 .extend(retired);
3714 self.set_reset_token(reset_token);
3715 }
3716
3717 fn set_reset_token(&mut self, reset_token: ResetToken) {
3718 self.endpoint_events
3719 .push_back(EndpointEventInner::ResetToken(
3720 self.path.remote,
3721 reset_token,
3722 ));
3723 self.peer_params.stateless_reset_token = Some(reset_token);
3724 }
3725
3726 fn issue_first_cids(&mut self, now: Instant) {
3728 if self.local_cid_state.cid_len() == 0 {
3729 return;
3730 }
3731
3732 let mut n = self.peer_params.issue_cids_limit() - 1;
3734 if let ConnectionSide::Server { server_config } = &self.side {
3735 if server_config.has_preferred_address() {
3736 n -= 1;
3738 }
3739 }
3740 self.endpoint_events
3741 .push_back(EndpointEventInner::NeedIdentifiers(now, n));
3742 }
3743
3744 fn populate_packet(
3745 &mut self,
3746 now: Instant,
3747 space_id: SpaceId,
3748 buf: &mut Vec<u8>,
3749 max_size: usize,
3750 pn: u64,
3751 ) -> SentFrames {
3752 let mut sent = SentFrames::default();
3753 let space = &mut self.spaces[space_id];
3754 let is_0rtt = space_id == SpaceId::Data && space.crypto.is_none();
3755 space.pending_acks.maybe_ack_non_eliciting();
3756
3757 if !is_0rtt && mem::replace(&mut space.pending.handshake_done, false) {
3759 buf.write(frame::FrameType::HANDSHAKE_DONE);
3760 sent.retransmits.get_or_create().handshake_done = true;
3761 self.stats.frame_tx.handshake_done =
3763 self.stats.frame_tx.handshake_done.saturating_add(1);
3764 }
3765
3766 if mem::replace(&mut space.ping_pending, false) {
3768 trace!("PING");
3769 buf.write(frame::FrameType::PING);
3770 sent.non_retransmits = true;
3771 self.stats.frame_tx.ping += 1;
3772 }
3773
3774 if mem::replace(&mut space.immediate_ack_pending, false) {
3776 trace!("IMMEDIATE_ACK");
3777 buf.write(frame::FrameType::IMMEDIATE_ACK);
3778 sent.non_retransmits = true;
3779 self.stats.frame_tx.immediate_ack += 1;
3780 }
3781
3782 if space.pending_acks.can_send() {
3784 Self::populate_acks(
3785 now,
3786 self.receiving_ecn,
3787 &mut sent,
3788 space,
3789 buf,
3790 &mut self.stats,
3791 );
3792 }
3793
3794 if mem::replace(&mut space.pending.ack_frequency, false) {
3796 let sequence_number = self.ack_frequency.next_sequence_number();
3797
3798 let config = self.config.ack_frequency_config.as_ref().unwrap();
3800
3801 let max_ack_delay = self.ack_frequency.candidate_max_ack_delay(
3803 self.path.rtt.get(),
3804 config,
3805 &self.peer_params,
3806 );
3807
3808 trace!(?max_ack_delay, "ACK_FREQUENCY");
3809
3810 frame::AckFrequency {
3811 sequence: sequence_number,
3812 ack_eliciting_threshold: config.ack_eliciting_threshold,
3813 request_max_ack_delay: max_ack_delay.as_micros().try_into().unwrap_or(VarInt::MAX),
3814 reordering_threshold: config.reordering_threshold,
3815 }
3816 .encode(buf);
3817
3818 sent.retransmits.get_or_create().ack_frequency = true;
3819
3820 self.ack_frequency.ack_frequency_sent(pn, max_ack_delay);
3821 self.stats.frame_tx.ack_frequency += 1;
3822 }
3823
3824 if buf.len() + 9 < max_size && space_id == SpaceId::Data {
3826 if let Some(token) = self.path.challenge {
3828 self.path.challenge_pending = false;
3830 sent.non_retransmits = true;
3831 sent.requires_padding = true;
3832 trace!("PATH_CHALLENGE {:08x}", token);
3833 buf.write(frame::FrameType::PATH_CHALLENGE);
3834 buf.write(token);
3835 self.stats.frame_tx.path_challenge += 1;
3836 }
3837
3838 }
3847
3848 if buf.len() + 9 < max_size && space_id == SpaceId::Data {
3850 if let Some(token) = self.path_responses.pop_on_path(self.path.remote) {
3851 sent.non_retransmits = true;
3852 sent.requires_padding = true;
3853 trace!("PATH_RESPONSE {:08x}", token);
3854 buf.write(frame::FrameType::PATH_RESPONSE);
3855 buf.write(token);
3856 self.stats.frame_tx.path_response += 1;
3857 }
3858 }
3859
3860 while buf.len() + frame::Crypto::SIZE_BOUND < max_size && !is_0rtt {
3862 let mut frame = match space.pending.crypto.pop_front() {
3863 Some(x) => x,
3864 None => break,
3865 };
3866
3867 let max_crypto_data_size = max_size
3872 - buf.len()
3873 - 1 - VarInt::size(unsafe { VarInt::from_u64_unchecked(frame.offset) })
3875 - 2; let available_space = max_size - buf.len();
3879 let remaining_data = frame.data.len();
3880 let optimal_size = self
3881 .pqc_state
3882 .calculate_crypto_frame_size(available_space, remaining_data);
3883
3884 let len = frame
3885 .data
3886 .len()
3887 .min(2usize.pow(14) - 1)
3888 .min(max_crypto_data_size)
3889 .min(optimal_size);
3890
3891 let data = frame.data.split_to(len);
3892 let truncated = frame::Crypto {
3893 offset: frame.offset,
3894 data,
3895 };
3896 trace!(
3897 "CRYPTO: off {} len {}",
3898 truncated.offset,
3899 truncated.data.len()
3900 );
3901 truncated.encode(buf);
3902 self.stats.frame_tx.crypto += 1;
3903 sent.retransmits.get_or_create().crypto.push_back(truncated);
3904 if !frame.data.is_empty() {
3905 frame.offset += len as u64;
3906 space.pending.crypto.push_front(frame);
3907 }
3908 }
3909
3910 if space_id == SpaceId::Data {
3911 self.streams.write_control_frames(
3912 buf,
3913 &mut space.pending,
3914 &mut sent.retransmits,
3915 &mut self.stats.frame_tx,
3916 max_size,
3917 );
3918 }
3919
3920 while buf.len() + 44 < max_size {
3922 let issued = match space.pending.new_cids.pop() {
3923 Some(x) => x,
3924 None => break,
3925 };
3926 trace!(
3927 sequence = issued.sequence,
3928 id = %issued.id,
3929 "NEW_CONNECTION_ID"
3930 );
3931 frame::NewConnectionId {
3932 sequence: issued.sequence,
3933 retire_prior_to: self.local_cid_state.retire_prior_to(),
3934 id: issued.id,
3935 reset_token: issued.reset_token,
3936 }
3937 .encode(buf);
3938 sent.retransmits.get_or_create().new_cids.push(issued);
3939 self.stats.frame_tx.new_connection_id += 1;
3940 }
3941
3942 while buf.len() + frame::RETIRE_CONNECTION_ID_SIZE_BOUND < max_size {
3944 let seq = match space.pending.retire_cids.pop() {
3945 Some(x) => x,
3946 None => break,
3947 };
3948 trace!(sequence = seq, "RETIRE_CONNECTION_ID");
3949 buf.write(frame::FrameType::RETIRE_CONNECTION_ID);
3950 buf.write_var(seq);
3951 sent.retransmits.get_or_create().retire_cids.push(seq);
3952 self.stats.frame_tx.retire_connection_id += 1;
3953 }
3954
3955 let mut sent_datagrams = false;
3957 while buf.len() + Datagram::SIZE_BOUND < max_size && space_id == SpaceId::Data {
3958 match self.datagrams.write(buf, max_size) {
3959 true => {
3960 sent_datagrams = true;
3961 sent.non_retransmits = true;
3962 self.stats.frame_tx.datagram += 1;
3963 }
3964 false => break,
3965 }
3966 }
3967 if self.datagrams.send_blocked && sent_datagrams {
3968 self.events.push_back(Event::DatagramsUnblocked);
3969 self.datagrams.send_blocked = false;
3970 }
3971
3972 while let Some(remote_addr) = space.pending.new_tokens.pop() {
3974 debug_assert_eq!(space_id, SpaceId::Data);
3975 let ConnectionSide::Server { server_config } = &self.side else {
3976 debug_assert!(false, "NEW_TOKEN frames should not be enqueued by clients");
3978 continue;
3979 };
3980
3981 if remote_addr != self.path.remote {
3982 continue;
3987 }
3988
3989 if self.delay_new_token_until_binding && self.peer_id_for_tokens.is_none() {
3992 space.pending.new_tokens.push(remote_addr);
3994 break;
3995 }
3996
3997 let new_token = if let Some(pid) = self.peer_id_for_tokens {
3999 let nonce_u128: u128 = self.rng.r#gen();
4002 let nonce = nonce_u128.to_le_bytes();
4003 let cid = self.rem_cids.active();
4004 let mut pt = Vec::with_capacity(32 + 1 + cid.len() + 16);
4005 pt.extend_from_slice(&pid.0);
4006 pt.push(cid.len() as u8);
4007 pt.extend_from_slice(&cid[..]);
4008 pt.extend_from_slice(&nonce);
4009 let mut tok = pt;
4010 tok.extend_from_slice(&nonce[..12]);
4011 NewToken { token: tok.into() }
4012 } else {
4013 let token = Token::new(
4014 TokenPayload::Validation {
4015 ip: remote_addr.ip(),
4016 issued: server_config.time_source.now(),
4017 },
4018 &mut self.rng,
4019 );
4020 NewToken {
4021 token: token.encode(&*server_config.token_key).into(),
4022 }
4023 };
4024
4025 if buf.len() + new_token.size() >= max_size {
4026 space.pending.new_tokens.push(remote_addr);
4027 break;
4028 }
4029
4030 new_token.encode(buf);
4031 sent.retransmits
4032 .get_or_create()
4033 .new_tokens
4034 .push(remote_addr);
4035 self.stats.frame_tx.new_token += 1;
4036 }
4037
4038 while buf.len() + frame::AddAddress::SIZE_BOUND < max_size && space_id == SpaceId::Data {
4040 let add_address = match space.pending.add_addresses.pop() {
4041 Some(x) => x,
4042 None => break,
4043 };
4044 trace!(
4045 sequence = %add_address.sequence,
4046 address = %add_address.address,
4047 "ADD_ADDRESS"
4048 );
4049 if self.nat_traversal_frame_config.use_rfc_format {
4051 add_address.encode_rfc(buf);
4052 } else {
4053 add_address.encode_legacy(buf);
4054 }
4055 sent.retransmits
4056 .get_or_create()
4057 .add_addresses
4058 .push(add_address);
4059 self.stats.frame_tx.add_address += 1;
4060 }
4061
4062 while buf.len() + frame::PunchMeNow::SIZE_BOUND < max_size && space_id == SpaceId::Data {
4064 let punch_me_now = match space.pending.punch_me_now.pop() {
4065 Some(x) => x,
4066 None => break,
4067 };
4068 trace!(
4069 round = %punch_me_now.round,
4070 paired_with_sequence_number = %punch_me_now.paired_with_sequence_number,
4071 "PUNCH_ME_NOW"
4072 );
4073 if self.nat_traversal_frame_config.use_rfc_format {
4075 punch_me_now.encode_rfc(buf);
4076 } else {
4077 punch_me_now.encode_legacy(buf);
4078 }
4079 sent.retransmits
4080 .get_or_create()
4081 .punch_me_now
4082 .push(punch_me_now);
4083 self.stats.frame_tx.punch_me_now += 1;
4084 }
4085
4086 while buf.len() + frame::RemoveAddress::SIZE_BOUND < max_size && space_id == SpaceId::Data {
4088 let remove_address = match space.pending.remove_addresses.pop() {
4089 Some(x) => x,
4090 None => break,
4091 };
4092 trace!(
4093 sequence = %remove_address.sequence,
4094 "REMOVE_ADDRESS"
4095 );
4096 remove_address.encode(buf);
4098 sent.retransmits
4099 .get_or_create()
4100 .remove_addresses
4101 .push(remove_address);
4102 self.stats.frame_tx.remove_address += 1;
4103 }
4104
4105 while buf.len() + frame::ObservedAddress::SIZE_BOUND < max_size && space_id == SpaceId::Data
4107 {
4108 let observed_address = match space.pending.outbound_observations.pop() {
4109 Some(x) => x,
4110 None => break,
4111 };
4112 trace!(
4113 address = %observed_address.address,
4114 "OBSERVED_ADDRESS"
4115 );
4116 observed_address.encode(buf);
4117 sent.retransmits
4118 .get_or_create()
4119 .outbound_observations
4120 .push(observed_address);
4121 self.stats.frame_tx.observed_address += 1;
4122 }
4123
4124 if space_id == SpaceId::Data {
4126 sent.stream_frames =
4127 self.streams
4128 .write_stream_frames(buf, max_size, self.config.send_fairness);
4129 self.stats.frame_tx.stream += sent.stream_frames.len() as u64;
4130 }
4131
4132 sent
4133 }
4134
4135 fn populate_acks(
4140 now: Instant,
4141 receiving_ecn: bool,
4142 sent: &mut SentFrames,
4143 space: &mut PacketSpace,
4144 buf: &mut Vec<u8>,
4145 stats: &mut ConnectionStats,
4146 ) {
4147 debug_assert!(!space.pending_acks.ranges().is_empty());
4148
4149 debug_assert!(space.crypto.is_some(), "tried to send ACK in 0-RTT");
4151 let ecn = if receiving_ecn {
4152 Some(&space.ecn_counters)
4153 } else {
4154 None
4155 };
4156 sent.largest_acked = space.pending_acks.ranges().max();
4157
4158 let delay_micros = space.pending_acks.ack_delay(now).as_micros() as u64;
4159
4160 let ack_delay_exp = TransportParameters::default().ack_delay_exponent;
4162 let delay = delay_micros >> ack_delay_exp.into_inner();
4163
4164 trace!(
4165 "ACK {:?}, Delay = {}us",
4166 space.pending_acks.ranges(),
4167 delay_micros
4168 );
4169
4170 frame::Ack::encode(delay as _, space.pending_acks.ranges(), ecn, buf);
4171 stats.frame_tx.acks += 1;
4172 }
4173
4174 fn close_common(&mut self) {
4175 trace!("connection closed");
4176 for &timer in &Timer::VALUES {
4177 self.timers.stop(timer);
4178 }
4179 }
4180
4181 fn set_close_timer(&mut self, now: Instant) {
4182 self.timers
4183 .set(Timer::Close, now + 3 * self.pto(self.highest_space));
4184 }
4185
4186 fn handle_peer_params(&mut self, params: TransportParameters) -> Result<(), TransportError> {
4188 if Some(self.orig_rem_cid) != params.initial_src_cid
4189 || (self.side.is_client()
4190 && (Some(self.initial_dst_cid) != params.original_dst_cid
4191 || self.retry_src_cid != params.retry_src_cid))
4192 {
4193 return Err(TransportError::TRANSPORT_PARAMETER_ERROR(
4194 "CID authentication failure",
4195 ));
4196 }
4197
4198 self.set_peer_params(params);
4199
4200 Ok(())
4201 }
4202
4203 fn set_peer_params(&mut self, params: TransportParameters) {
4204 self.streams.set_params(¶ms);
4205 self.idle_timeout =
4206 negotiate_max_idle_timeout(self.config.max_idle_timeout, Some(params.max_idle_timeout));
4207 trace!("negotiated max idle timeout {:?}", self.idle_timeout);
4208 if let Some(ref info) = params.preferred_address {
4209 self.rem_cids.insert(frame::NewConnectionId {
4210 sequence: 1,
4211 id: info.connection_id,
4212 reset_token: info.stateless_reset_token,
4213 retire_prior_to: 0,
4214 }).expect("preferred address CID is the first received, and hence is guaranteed to be legal");
4215 }
4216 self.ack_frequency.peer_max_ack_delay = get_max_ack_delay(¶ms);
4217
4218 self.negotiate_nat_traversal_capability(¶ms);
4220
4221 let local_has_nat_traversal = self.config.nat_traversal_config.is_some();
4224 let local_supports_rfc = local_has_nat_traversal;
4227 self.nat_traversal_frame_config = frame::nat_traversal_unified::NatTraversalFrameConfig {
4228 use_rfc_format: local_supports_rfc && params.supports_rfc_nat_traversal(),
4230 accept_legacy: true,
4232 };
4233
4234 self.negotiate_address_discovery(¶ms);
4236
4237 self.pqc_state.update_from_peer_params(¶ms);
4239
4240 if self.pqc_state.enabled && self.pqc_state.using_pqc {
4242 trace!("PQC enabled, adjusting MTU discovery for larger handshake packets");
4243 let current_mtu = self.path.mtud.current_mtu();
4247 if current_mtu < self.pqc_state.handshake_mtu {
4248 trace!(
4249 "Current MTU {} is less than PQC handshake MTU {}, will rely on MTU discovery",
4250 current_mtu, self.pqc_state.handshake_mtu
4251 );
4252 }
4253 }
4254
4255 self.peer_params = params;
4256 self.path.mtud.on_peer_max_udp_payload_size_received(
4257 u16::try_from(self.peer_params.max_udp_payload_size.into_inner()).unwrap_or(u16::MAX),
4258 );
4259 }
4260
4261 fn negotiate_nat_traversal_capability(&mut self, params: &TransportParameters) {
4263 let peer_nat_config = match ¶ms.nat_traversal {
4265 Some(config) => config,
4266 None => {
4267 if self.config.nat_traversal_config.is_some() {
4269 debug!(
4270 "Peer does not support NAT traversal, maintaining backward compatibility"
4271 );
4272 self.emit_nat_traversal_capability_event(false);
4273
4274 self.set_nat_traversal_compatibility_mode(false);
4276 }
4277 return;
4278 }
4279 };
4280
4281 let local_nat_config = match &self.config.nat_traversal_config {
4283 Some(config) => config,
4284 None => {
4285 debug!("NAT traversal not enabled locally, ignoring peer support");
4286 self.emit_nat_traversal_capability_event(false);
4287 self.set_nat_traversal_compatibility_mode(false);
4288 return;
4289 }
4290 };
4291
4292 info!("Both peers support NAT traversal, negotiating capabilities");
4294
4295 match self.negotiate_nat_traversal_parameters(local_nat_config, peer_nat_config) {
4297 Ok(negotiated_config) => {
4298 info!("NAT traversal capability negotiated successfully");
4299 self.emit_nat_traversal_capability_event(true);
4300
4301 self.init_nat_traversal_with_negotiated_config(&negotiated_config);
4303
4304 self.set_nat_traversal_compatibility_mode(true);
4306
4307 if matches!(
4309 negotiated_config,
4310 crate::transport_parameters::NatTraversalConfig::ClientSupport
4311 ) {
4312 self.initiate_nat_traversal_process();
4313 }
4314 }
4315 Err(e) => {
4316 warn!("NAT traversal capability negotiation failed: {}", e);
4317 self.emit_nat_traversal_capability_event(false);
4318 self.set_nat_traversal_compatibility_mode(false);
4319 }
4320 }
4321 }
4322
4323 fn emit_nat_traversal_capability_event(&mut self, negotiated: bool) {
4377 if negotiated {
4380 info!("NAT traversal capability successfully negotiated");
4381 } else {
4382 info!("NAT traversal capability not available (peer or local support missing)");
4383 }
4384
4385 }
4388
4389 fn set_nat_traversal_compatibility_mode(&mut self, enabled: bool) {
4391 if enabled {
4392 debug!("NAT traversal enabled for this connection");
4393 } else {
4395 debug!("NAT traversal disabled for this connection (backward compatibility mode)");
4396 if self.nat_traversal.is_some() {
4398 warn!("Clearing NAT traversal state due to compatibility mode");
4399 self.nat_traversal = None;
4400 }
4401 }
4402 }
4403
4404 fn negotiate_nat_traversal_parameters(
4406 &self,
4407 local_config: &crate::transport_parameters::NatTraversalConfig,
4408 peer_config: &crate::transport_parameters::NatTraversalConfig,
4409 ) -> Result<crate::transport_parameters::NatTraversalConfig, String> {
4410 match (local_config, peer_config) {
4415 (
4417 crate::transport_parameters::NatTraversalConfig::ClientSupport,
4418 crate::transport_parameters::NatTraversalConfig::ServerSupport {
4419 concurrency_limit,
4420 },
4421 ) => Ok(
4422 crate::transport_parameters::NatTraversalConfig::ServerSupport {
4423 concurrency_limit: *concurrency_limit,
4424 },
4425 ),
4426 (
4428 crate::transport_parameters::NatTraversalConfig::ServerSupport {
4429 concurrency_limit,
4430 },
4431 crate::transport_parameters::NatTraversalConfig::ClientSupport,
4432 ) => Ok(
4433 crate::transport_parameters::NatTraversalConfig::ServerSupport {
4434 concurrency_limit: *concurrency_limit,
4435 },
4436 ),
4437 (
4439 crate::transport_parameters::NatTraversalConfig::ServerSupport {
4440 concurrency_limit: limit1,
4441 },
4442 crate::transport_parameters::NatTraversalConfig::ServerSupport {
4443 concurrency_limit: limit2,
4444 },
4445 ) => Ok(
4446 crate::transport_parameters::NatTraversalConfig::ServerSupport {
4447 concurrency_limit: (*limit1).min(*limit2),
4448 },
4449 ),
4450 (
4452 crate::transport_parameters::NatTraversalConfig::ClientSupport,
4453 crate::transport_parameters::NatTraversalConfig::ClientSupport,
4454 ) => Err("Both endpoints claim to be NAT traversal clients".to_string()),
4455 }
4456 }
4457
4458 fn init_nat_traversal_with_negotiated_config(
4463 &mut self,
4464 _config: &crate::transport_parameters::NatTraversalConfig,
4465 ) {
4466 let max_candidates = 50; let coordination_timeout = Duration::from_secs(10); self.nat_traversal = Some(NatTraversalState::new(max_candidates, coordination_timeout));
4473
4474 trace!("NAT traversal initialized for symmetric P2P node");
4475
4476 self.prepare_address_observation();
4479 self.schedule_candidate_discovery();
4480 self.prepare_coordination_handling();
4481 }
4482
4483 fn initiate_nat_traversal_process(&mut self) {
4485 if let Some(nat_state) = &mut self.nat_traversal {
4486 match nat_state.start_candidate_discovery() {
4487 Ok(()) => {
4488 debug!("NAT traversal process initiated - candidate discovery started");
4489 self.timers.set(
4491 Timer::NatTraversal,
4492 Instant::now() + Duration::from_millis(100),
4493 );
4494 }
4495 Err(e) => {
4496 warn!("Failed to initiate NAT traversal process: {}", e);
4497 }
4498 }
4499 }
4500 }
4501
4502 fn prepare_address_observation(&mut self) {
4504 debug!("Preparing for address observation as bootstrap node");
4505 }
4508
4509 fn schedule_candidate_discovery(&mut self) {
4511 debug!("Scheduling candidate discovery for client endpoint");
4512 self.timers.set(
4514 Timer::NatTraversal,
4515 Instant::now() + Duration::from_millis(50),
4516 );
4517 }
4518
4519 fn prepare_coordination_handling(&mut self) {
4521 debug!("Preparing to handle coordination requests as server endpoint");
4522 }
4525
4526 fn handle_nat_traversal_timeout(&mut self, now: Instant) {
4528 let timeout_result = if let Some(nat_state) = &mut self.nat_traversal {
4530 nat_state.handle_timeout(now)
4531 } else {
4532 return;
4533 };
4534
4535 match timeout_result {
4537 Ok(actions) => {
4538 for action in actions {
4539 match action {
4540 nat_traversal::TimeoutAction::RetryDiscovery => {
4541 debug!("NAT traversal timeout: retrying candidate discovery");
4542 if let Some(nat_state) = &mut self.nat_traversal {
4543 if let Err(e) = nat_state.start_candidate_discovery() {
4544 warn!("Failed to retry candidate discovery: {}", e);
4545 }
4546 }
4547 }
4548 nat_traversal::TimeoutAction::RetryCoordination => {
4549 debug!("NAT traversal timeout: retrying coordination");
4550 self.timers
4552 .set(Timer::NatTraversal, now + Duration::from_secs(2));
4553 }
4554 nat_traversal::TimeoutAction::StartValidation => {
4555 debug!("NAT traversal timeout: starting path validation");
4556 self.start_nat_traversal_validation(now);
4557 }
4558 nat_traversal::TimeoutAction::Complete => {
4559 debug!("NAT traversal completed successfully");
4560 self.timers.stop(Timer::NatTraversal);
4562 }
4563 nat_traversal::TimeoutAction::Failed => {
4564 warn!("NAT traversal failed after timeout");
4565 self.handle_nat_traversal_failure();
4567 }
4568 }
4569 }
4570 }
4571 Err(e) => {
4572 warn!("NAT traversal timeout handling failed: {}", e);
4573 self.handle_nat_traversal_failure();
4574 }
4575 }
4576 }
4577
4578 fn start_nat_traversal_validation(&mut self, now: Instant) {
4580 if let Some(nat_state) = &mut self.nat_traversal {
4581 let pairs = nat_state.get_next_validation_pairs(3);
4583
4584 for pair in pairs {
4585 let challenge = self.rng.r#gen();
4587 self.path.challenge = Some(challenge);
4588 self.path.challenge_pending = true;
4589
4590 debug!(
4591 "Starting path validation for NAT traversal candidate: {}",
4592 pair.remote_addr
4593 );
4594 }
4595
4596 self.timers
4598 .set(Timer::PathValidation, now + Duration::from_secs(3));
4599 }
4600 }
4601
4602 fn handle_nat_traversal_failure(&mut self) {
4604 warn!("NAT traversal failed, considering fallback options");
4605
4606 self.nat_traversal = None;
4608 self.timers.stop(Timer::NatTraversal);
4609
4610 debug!("NAT traversal disabled for this connection due to failure");
4617 }
4618
4619 pub fn nat_traversal_supported(&self) -> bool {
4621 self.nat_traversal.is_some()
4622 && self.config.nat_traversal_config.is_some()
4623 && self.peer_params.nat_traversal.is_some()
4624 }
4625
4626 pub fn nat_traversal_config(&self) -> Option<&crate::transport_parameters::NatTraversalConfig> {
4628 self.peer_params.nat_traversal.as_ref()
4629 }
4630
4631 pub fn nat_traversal_ready(&self) -> bool {
4633 self.nat_traversal_supported() && matches!(self.state, State::Established)
4634 }
4635
4636 #[allow(dead_code)]
4641 pub(crate) fn nat_traversal_stats(&self) -> Option<nat_traversal::NatTraversalStats> {
4642 self.nat_traversal.as_ref().map(|state| state.stats.clone())
4643 }
4644
/// Test helper: forces NAT traversal on without going through negotiation.
///
/// Installs a `ServerSupport` configuration (concurrency limit 5) as both the
/// peer's parameter and the local transport configuration, then creates fresh
/// NAT traversal state with 8 candidates and a 10 second timeout.
#[cfg(test)]
#[allow(dead_code)]
pub(crate) fn force_enable_nat_traversal(&mut self) {
    use crate::transport_parameters::NatTraversalConfig;

    let config = NatTraversalConfig::ServerSupport {
        concurrency_limit: VarInt::from_u32(5),
    };

    // The transport config lives behind an `Arc`, so clone, patch, and re-wrap it.
    let mut transport_config = (*self.config).clone();
    transport_config.nat_traversal_config = Some(config.clone());
    self.config = Arc::new(transport_config);

    self.peer_params.nat_traversal = Some(config);

    self.nat_traversal = Some(NatTraversalState::new(8, Duration::from_secs(10)));
}
4668
4669 fn derive_peer_id_from_connection(&self) -> [u8; 32] {
4672 let mut hasher = std::collections::hash_map::DefaultHasher::new();
4674 use std::hash::Hasher;
4675 hasher.write(&self.rem_handshake_cid);
4676 hasher.write(&self.handshake_cid);
4677 hasher.write(&self.path.remote.to_string().into_bytes());
4678 let hash = hasher.finish();
4679 let mut peer_id = [0u8; 32];
4680 peer_id[..8].copy_from_slice(&hash.to_be_bytes());
4681 let cid_bytes = self.rem_handshake_cid.as_ref();
4683 let copy_len = (cid_bytes.len()).min(24);
4684 peer_id[8..8 + copy_len].copy_from_slice(&cid_bytes[..copy_len]);
4685 peer_id
4686 }
4687
4688 fn handle_add_address(
4690 &mut self,
4691 add_address: &crate::frame::AddAddress,
4692 now: Instant,
4693 ) -> Result<(), TransportError> {
4694 let nat_state = self.nat_traversal.as_mut().ok_or_else(|| {
4695 TransportError::PROTOCOL_VIOLATION("AddAddress frame without NAT traversal negotiation")
4696 })?;
4697
4698 match nat_state.add_remote_candidate(
4699 add_address.sequence,
4700 add_address.address,
4701 add_address.priority,
4702 now,
4703 ) {
4704 Ok(()) => {
4705 trace!(
4706 "Added remote candidate: {} (seq={}, priority={})",
4707 add_address.address, add_address.sequence, add_address.priority
4708 );
4709
4710 self.trigger_candidate_validation(add_address.address, now)?;
4712 Ok(())
4713 }
4714 Err(NatTraversalError::TooManyCandidates) => Err(TransportError::PROTOCOL_VIOLATION(
4715 "too many NAT traversal candidates",
4716 )),
4717 Err(NatTraversalError::DuplicateAddress) => {
4718 Ok(())
4720 }
4721 Err(e) => {
4722 warn!("Failed to add remote candidate: {}", e);
4723 Ok(()) }
4725 }
4726 }
4727
4728 fn handle_punch_me_now(
4732 &mut self,
4733 punch_me_now: &crate::frame::PunchMeNow,
4734 now: Instant,
4735 ) -> Result<(), TransportError> {
4736 trace!(
4737 "Received PunchMeNow: round={}, target_seq={}, local_addr={}",
4738 punch_me_now.round, punch_me_now.paired_with_sequence_number, punch_me_now.address
4739 );
4740
4741 if let Some(nat_state) = &self.nat_traversal {
4743 if nat_state.bootstrap_coordinator.is_some() {
4745 let from_peer_id = self.derive_peer_id_from_connection();
4747
4748 let punch_me_now_clone = punch_me_now.clone();
4750 drop(nat_state); match self
4753 .nat_traversal
4754 .as_mut()
4755 .unwrap()
4756 .handle_punch_me_now_frame(
4757 from_peer_id,
4758 self.path.remote,
4759 &punch_me_now_clone,
4760 now,
4761 ) {
4762 Ok(Some(coordination_frame)) => {
4763 trace!("Node coordinating PUNCH_ME_NOW between peers");
4764
4765 if let Some(target_peer_id) = punch_me_now.target_peer_id {
4767 self.endpoint_events.push_back(
4768 crate::shared::EndpointEventInner::RelayPunchMeNow(
4769 target_peer_id,
4770 coordination_frame,
4771 ),
4772 );
4773 }
4774
4775 return Ok(());
4776 }
4777 Ok(None) => {
4778 trace!("Coordination completed or no action needed");
4779 return Ok(());
4780 }
4781 Err(e) => {
4782 warn!("Coordination failed: {}", e);
4783 return Ok(());
4784 }
4785 }
4786 }
4787 }
4788
4789 let nat_state = self.nat_traversal.as_mut().ok_or_else(|| {
4791 TransportError::PROTOCOL_VIOLATION("PunchMeNow frame without NAT traversal negotiation")
4792 })?;
4793
4794 if nat_state
4796 .handle_peer_punch_request(punch_me_now.round, now)
4797 .map_err(|_e| {
4798 TransportError::PROTOCOL_VIOLATION("Failed to handle peer punch request")
4799 })?
4800 {
4801 trace!("Coordination synchronized for round {}", punch_me_now.round);
4802
4803 let _local_addr = self
4806 .local_ip
4807 .map(|ip| SocketAddr::new(ip, 0))
4808 .unwrap_or_else(|| {
4809 SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED), 0)
4810 });
4811
4812 let target = nat_traversal::PunchTarget {
4813 remote_addr: punch_me_now.address,
4814 remote_sequence: punch_me_now.paired_with_sequence_number,
4815 challenge: self.rng.r#gen(),
4816 };
4817
4818 let _ = nat_state.start_coordination_round(vec![target], now);
4820 } else {
4821 debug!(
4822 "Failed to synchronize coordination for round {}",
4823 punch_me_now.round
4824 );
4825 }
4826
4827 Ok(())
4828 }
4829
4830 fn handle_remove_address(
4832 &mut self,
4833 remove_address: &crate::frame::RemoveAddress,
4834 ) -> Result<(), TransportError> {
4835 let nat_state = self.nat_traversal.as_mut().ok_or_else(|| {
4836 TransportError::PROTOCOL_VIOLATION(
4837 "RemoveAddress frame without NAT traversal negotiation",
4838 )
4839 })?;
4840
4841 if nat_state.remove_candidate(remove_address.sequence) {
4842 trace!(
4843 "Removed candidate with sequence {}",
4844 remove_address.sequence
4845 );
4846 } else {
4847 trace!(
4848 "Attempted to remove unknown candidate sequence {}",
4849 remove_address.sequence
4850 );
4851 }
4852
4853 Ok(())
4854 }
4855
4856 fn handle_observed_address_frame(
4858 &mut self,
4859 observed_address: &crate::frame::ObservedAddress,
4860 now: Instant,
4861 ) -> Result<(), TransportError> {
4862 println!(
4863 "DEBUG: handle_observed_address_frame: received address {}",
4864 observed_address.address
4865 );
4866 let state = self.address_discovery_state.as_mut().ok_or_else(|| {
4868 TransportError::PROTOCOL_VIOLATION(
4869 "ObservedAddress frame without address discovery negotiation",
4870 )
4871 })?;
4872
4873 if !state.enabled {
4875 return Err(TransportError::PROTOCOL_VIOLATION(
4876 "ObservedAddress frame received when address discovery is disabled",
4877 ));
4878 }
4879
4880 #[cfg(feature = "trace")]
4882 {
4883 use crate::trace_observed_address_received;
4884 trace_observed_address_received!(
4886 &self.event_log,
4887 self.trace_context.trace_id(),
4888 observed_address.address,
4889 0u64 );
4891 }
4892
4893 let path_id = 0u64; if let Some(&last_seq) = state.last_received_sequence.get(&path_id) {
4901 if observed_address.sequence_number <= last_seq {
4902 trace!(
4903 "Ignoring OBSERVED_ADDRESS frame with stale sequence number {} (last was {})",
4904 observed_address.sequence_number, last_seq
4905 );
4906 return Ok(());
4907 }
4908 }
4909
4910 state
4912 .last_received_sequence
4913 .insert(path_id, observed_address.sequence_number);
4914
4915 state.handle_observed_address(observed_address.address, path_id, now);
4917
4918 self.path
4920 .update_observed_address(observed_address.address, now);
4921
4922 trace!(
4924 "Received ObservedAddress frame: address={} for path={}",
4925 observed_address.address, path_id
4926 );
4927
4928 Ok(())
4929 }
4930
4931 fn handle_try_connect_to(
4936 &mut self,
4937 try_connect_to: &crate::frame::TryConnectTo,
4938 now: Instant,
4939 ) -> Result<(), TransportError> {
4940 trace!(
4941 "Received TryConnectTo: request_id={}, target={}, timeout_ms={}",
4942 try_connect_to.request_id, try_connect_to.target_address, try_connect_to.timeout_ms
4943 );
4944
4945 let target = try_connect_to.target_address;
4947
4948 if target.ip().is_loopback() {
4950 warn!(
4951 "Rejecting TryConnectTo request to loopback address: {}",
4952 target
4953 );
4954 let response = crate::frame::TryConnectToResponse {
4956 request_id: try_connect_to.request_id,
4957 success: false,
4958 error_code: Some(crate::frame::TryConnectError::InvalidAddress),
4959 source_address: self.path.remote,
4960 };
4961 self.spaces[SpaceId::Data]
4962 .pending
4963 .try_connect_to_responses
4964 .push(response);
4965 return Ok(());
4966 }
4967
4968 if target.ip().is_unspecified() {
4970 warn!(
4971 "Rejecting TryConnectTo request to unspecified address: {}",
4972 target
4973 );
4974 let response = crate::frame::TryConnectToResponse {
4975 request_id: try_connect_to.request_id,
4976 success: false,
4977 error_code: Some(crate::frame::TryConnectError::InvalidAddress),
4978 source_address: self.path.remote,
4979 };
4980 self.spaces[SpaceId::Data]
4981 .pending
4982 .try_connect_to_responses
4983 .push(response);
4984 return Ok(());
4985 }
4986
4987 self.endpoint_events
4990 .push_back(EndpointEventInner::TryConnectTo {
4991 request_id: try_connect_to.request_id,
4992 target_address: try_connect_to.target_address,
4993 timeout_ms: try_connect_to.timeout_ms,
4994 requester_connection: self.path.remote,
4995 requested_at: now,
4996 });
4997
4998 trace!(
4999 "Queued TryConnectTo attempt for request_id={}",
5000 try_connect_to.request_id
5001 );
5002
5003 Ok(())
5004 }
5005
5006 fn handle_try_connect_to_response(
5008 &mut self,
5009 response: &crate::frame::TryConnectToResponse,
5010 ) -> Result<(), TransportError> {
5011 trace!(
5012 "Received TryConnectToResponse: request_id={}, success={}, error={:?}, source={}",
5013 response.request_id, response.success, response.error_code, response.source_address
5014 );
5015
5016 if response.success {
5019 debug!(
5020 "TryConnectTo succeeded: target can receive connections from {}",
5021 response.source_address
5022 );
5023
5024 if let Some(nat_state) = &mut self.nat_traversal {
5026 nat_state
5027 .record_successful_callback_probe(response.request_id, response.source_address);
5028 }
5029 } else {
5030 debug!("TryConnectTo failed with error: {:?}", response.error_code);
5031
5032 if let Some(nat_state) = &mut self.nat_traversal {
5034 nat_state.record_failed_callback_probe(response.request_id, response.error_code);
5035 }
5036 }
5037
5038 Ok(())
5039 }
5040
5041 pub fn queue_add_address(&mut self, sequence: VarInt, address: SocketAddr, priority: VarInt) {
5043 let add_address = frame::AddAddress {
5045 sequence,
5046 address,
5047 priority,
5048 };
5049
5050 self.spaces[SpaceId::Data]
5051 .pending
5052 .add_addresses
5053 .push(add_address);
5054 trace!(
5055 "Queued AddAddress frame: seq={}, addr={}, priority={}",
5056 sequence, address, priority
5057 );
5058 }
5059
5060 pub fn queue_punch_me_now(
5062 &mut self,
5063 round: VarInt,
5064 paired_with_sequence_number: VarInt,
5065 address: SocketAddr,
5066 ) {
5067 let punch_me_now = frame::PunchMeNow {
5068 round,
5069 paired_with_sequence_number,
5070 address,
5071 target_peer_id: None, };
5073
5074 self.spaces[SpaceId::Data]
5075 .pending
5076 .punch_me_now
5077 .push(punch_me_now);
5078 trace!(
5079 "Queued PunchMeNow frame: round={}, target={}",
5080 round, paired_with_sequence_number
5081 );
5082 }
5083
5084 pub fn queue_remove_address(&mut self, sequence: VarInt) {
5086 let remove_address = frame::RemoveAddress { sequence };
5087
5088 self.spaces[SpaceId::Data]
5089 .pending
5090 .remove_addresses
5091 .push(remove_address);
5092 trace!("Queued RemoveAddress frame: seq={}", sequence);
5093 }
5094
5095 pub fn queue_observed_address(&mut self, address: SocketAddr) {
5097 let sequence_number = if let Some(state) = &mut self.address_discovery_state {
5099 let seq = state.next_sequence_number;
5100 state.next_sequence_number =
5101 VarInt::from_u64(state.next_sequence_number.into_inner() + 1)
5102 .expect("sequence number overflow");
5103 seq
5104 } else {
5105 VarInt::from_u32(0)
5107 };
5108
5109 let observed_address = frame::ObservedAddress {
5110 sequence_number,
5111 address,
5112 };
5113 self.spaces[SpaceId::Data]
5114 .pending
5115 .outbound_observations
5116 .push(observed_address);
5117 trace!("Queued ObservedAddress frame: addr={}", address);
5118 }
5119
5120 pub fn check_for_address_observations(&mut self, now: Instant) {
5122 let Some(state) = &mut self.address_discovery_state else {
5124 return;
5125 };
5126
5127 if !state.enabled {
5129 return;
5130 }
5131
5132 let path_id = 0u64; let remote_address = self.path.remote;
5137
5138 if state.should_send_observation(path_id, now) {
5140 if let Some(frame) = state.queue_observed_address_frame(path_id, remote_address) {
5142 self.spaces[SpaceId::Data]
5144 .pending
5145 .outbound_observations
5146 .push(frame);
5147
5148 state.record_observation_sent(path_id);
5150
5151 #[cfg(feature = "trace")]
5153 {
5154 use crate::trace_observed_address_sent;
5155 trace_observed_address_sent!(
5157 &self.event_log,
5158 self.trace_context.trace_id(),
5159 remote_address,
5160 path_id
5161 );
5162 }
5163
5164 trace!(
5165 "Queued OBSERVED_ADDRESS frame for path {} with address {}",
5166 path_id, remote_address
5167 );
5168 }
5169 }
5170 }
5171
5172 fn trigger_candidate_validation(
5174 &mut self,
5175 candidate_address: SocketAddr,
5176 now: Instant,
5177 ) -> Result<(), TransportError> {
5178 let nat_state = self
5179 .nat_traversal
5180 .as_mut()
5181 .ok_or_else(|| TransportError::PROTOCOL_VIOLATION("NAT traversal not enabled"))?;
5182
5183 if nat_state
5185 .active_validations
5186 .contains_key(&candidate_address)
5187 {
5188 trace!("Validation already in progress for {}", candidate_address);
5189 return Ok(());
5190 }
5191
5192 let challenge = self.rng.r#gen::<u64>();
5194
5195 let validation_state = nat_traversal::PathValidationState {
5197 challenge,
5198 sent_at: now,
5199 retry_count: 0,
5200 max_retries: 3,
5201 coordination_round: None,
5202 timeout_state: nat_traversal::AdaptiveTimeoutState::new(),
5203 last_retry_at: None,
5204 };
5205
5206 nat_state
5208 .active_validations
5209 .insert(candidate_address, validation_state);
5210
5211 self.nat_traversal_challenges
5213 .push(candidate_address, challenge);
5214
5215 nat_state.stats.validations_succeeded += 1; trace!(
5219 "Triggered PATH_CHALLENGE validation for {} with challenge {:016x}",
5220 candidate_address, challenge
5221 );
5222
5223 Ok(())
5224 }
5225
5226 pub fn nat_traversal_state(&self) -> Option<(usize, usize)> {
5231 self.nat_traversal
5232 .as_ref()
5233 .map(|state| (state.local_candidates.len(), state.remote_candidates.len()))
5234 }
5235
5236 pub fn initiate_nat_traversal_coordination(
5238 &mut self,
5239 now: Instant,
5240 ) -> Result<(), TransportError> {
5241 let nat_state = self
5242 .nat_traversal
5243 .as_mut()
5244 .ok_or_else(|| TransportError::PROTOCOL_VIOLATION("NAT traversal not enabled"))?;
5245
5246 if nat_state.should_send_punch_request() {
5248 nat_state.generate_candidate_pairs(now);
5250
5251 let pairs = nat_state.get_next_validation_pairs(3);
5253 if pairs.is_empty() {
5254 return Err(TransportError::PROTOCOL_VIOLATION(
5255 "No candidate pairs for coordination",
5256 ));
5257 }
5258
5259 let targets: Vec<_> = pairs
5261 .into_iter()
5262 .map(|pair| nat_traversal::PunchTarget {
5263 remote_addr: pair.remote_addr,
5264 remote_sequence: pair.remote_sequence,
5265 challenge: self.rng.r#gen(),
5266 })
5267 .collect();
5268
5269 let round = nat_state
5271 .start_coordination_round(targets, now)
5272 .map_err(|_e| {
5273 TransportError::PROTOCOL_VIOLATION("Failed to start coordination round")
5274 })?;
5275
5276 let local_addr = self
5279 .local_ip
5280 .map(|ip| SocketAddr::new(ip, self.local_ip.map(|_| 0).unwrap_or(0)))
5281 .unwrap_or_else(|| {
5282 SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED), 0)
5283 });
5284
5285 let punch_me_now = frame::PunchMeNow {
5286 round,
5287 paired_with_sequence_number: VarInt::from_u32(0), address: local_addr,
5289 target_peer_id: None, };
5291
5292 self.spaces[SpaceId::Data]
5293 .pending
5294 .punch_me_now
5295 .push(punch_me_now);
5296 nat_state.mark_punch_request_sent();
5297
5298 trace!("Initiated NAT traversal coordination round {}", round);
5299 }
5300
5301 Ok(())
5302 }
5303
5304 pub fn validate_nat_candidates(&mut self, now: Instant) {
5306 self.generate_nat_traversal_challenges(now);
5307 }
5308
5309 pub fn send_nat_address_advertisement(
5324 &mut self,
5325 address: SocketAddr,
5326 priority: u32,
5327 ) -> Result<u64, ConnectionError> {
5328 let nat_state = self.nat_traversal.as_mut().ok_or_else(|| {
5330 ConnectionError::TransportError(TransportError::PROTOCOL_VIOLATION(
5331 "NAT traversal not enabled on this connection",
5332 ))
5333 })?;
5334
5335 let sequence = nat_state.next_sequence;
5337 nat_state.next_sequence =
5338 VarInt::from_u64(nat_state.next_sequence.into_inner() + 1).unwrap();
5339
5340 let now = Instant::now();
5342 nat_state.local_candidates.insert(
5343 sequence,
5344 nat_traversal::AddressCandidate {
5345 address,
5346 priority,
5347 source: nat_traversal::CandidateSource::Local,
5348 discovered_at: now,
5349 state: nat_traversal::CandidateState::New,
5350 attempt_count: 0,
5351 last_attempt: None,
5352 },
5353 );
5354
5355 nat_state.stats.local_candidates_sent += 1;
5357
5358 self.queue_add_address(sequence, address, VarInt::from_u32(priority));
5360
5361 debug!(
5362 "Queued ADD_ADDRESS frame: addr={}, priority={}, seq={}",
5363 address, priority, sequence
5364 );
5365 Ok(sequence.into_inner())
5366 }
5367
5368 pub fn send_nat_punch_coordination(
5381 &mut self,
5382 paired_with_sequence_number: u64,
5383 address: SocketAddr,
5384 round: u32,
5385 ) -> Result<(), ConnectionError> {
5386 let _nat_state = self.nat_traversal.as_ref().ok_or_else(|| {
5388 ConnectionError::TransportError(TransportError::PROTOCOL_VIOLATION(
5389 "NAT traversal not enabled on this connection",
5390 ))
5391 })?;
5392
5393 self.queue_punch_me_now(
5395 VarInt::from_u32(round),
5396 VarInt::from_u64(paired_with_sequence_number).map_err(|_| {
5397 ConnectionError::TransportError(TransportError::PROTOCOL_VIOLATION(
5398 "Invalid target sequence number",
5399 ))
5400 })?,
5401 address,
5402 );
5403
5404 debug!(
5405 "Queued PUNCH_ME_NOW frame: paired_with_seq={}, addr={}, round={}",
5406 paired_with_sequence_number, address, round
5407 );
5408 Ok(())
5409 }
5410
5411 pub fn send_nat_address_removal(&mut self, sequence: u64) -> Result<(), ConnectionError> {
5422 let nat_state = self.nat_traversal.as_mut().ok_or_else(|| {
5424 ConnectionError::TransportError(TransportError::PROTOCOL_VIOLATION(
5425 "NAT traversal not enabled on this connection",
5426 ))
5427 })?;
5428
5429 let sequence_varint = VarInt::from_u64(sequence).map_err(|_| {
5430 ConnectionError::TransportError(TransportError::PROTOCOL_VIOLATION(
5431 "Invalid sequence number",
5432 ))
5433 })?;
5434
5435 nat_state.local_candidates.remove(&sequence_varint);
5437
5438 self.queue_remove_address(sequence_varint);
5440
5441 debug!("Queued REMOVE_ADDRESS frame: seq={}", sequence);
5442 Ok(())
5443 }
5444
    #[allow(dead_code)]
    /// Borrow the NAT traversal statistics.
    ///
    /// Returns `None` when this connection has no NAT traversal state.
    pub(crate) fn get_nat_traversal_stats(&self) -> Option<&nat_traversal::NatTraversalStats> {
        self.nat_traversal.as_ref().map(|state| &state.stats)
    }
5457
    /// Whether this connection carries NAT traversal state (`nat_traversal` is `Some`).
    pub fn is_nat_traversal_enabled(&self) -> bool {
        self.nat_traversal.is_some()
    }
5462
5463 fn negotiate_address_discovery(&mut self, peer_params: &TransportParameters) {
5467 let now = Instant::now();
5468
5469 match &peer_params.address_discovery {
5471 Some(peer_config) => {
5472 if let Some(state) = &mut self.address_discovery_state {
5474 if state.enabled {
5475 debug!(
5478 "Address discovery negotiated: rate={}, all_paths={}",
5479 state.max_observation_rate, state.observe_all_paths
5480 );
5481 } else {
5482 debug!("Address discovery disabled locally, ignoring peer support");
5484 }
5485 } else {
5486 self.address_discovery_state =
5488 Some(AddressDiscoveryState::new(peer_config, now));
5489 debug!("Address discovery initialized from peer config");
5490 }
5491 }
5492 _ => {
5493 if let Some(state) = &mut self.address_discovery_state {
5495 state.enabled = false;
5496 debug!("Address discovery disabled - peer doesn't support it");
5497 }
5498 }
5499 }
5500
5501 if let Some(state) = &self.address_discovery_state {
5503 if state.enabled {
5504 self.path.set_observation_rate(state.max_observation_rate);
5505 }
5506 }
5507 }
5508
5509 fn decrypt_packet(
5510 &mut self,
5511 now: Instant,
5512 packet: &mut Packet,
5513 ) -> Result<Option<u64>, Option<TransportError>> {
5514 let result = packet_crypto::decrypt_packet_body(
5515 packet,
5516 &self.spaces,
5517 self.zero_rtt_crypto.as_ref(),
5518 self.key_phase,
5519 self.prev_crypto.as_ref(),
5520 self.next_crypto.as_ref(),
5521 )?;
5522
5523 let result = match result {
5524 Some(r) => r,
5525 None => return Ok(None),
5526 };
5527
5528 if result.outgoing_key_update_acked {
5529 if let Some(prev) = self.prev_crypto.as_mut() {
5530 prev.end_packet = Some((result.number, now));
5531 self.set_key_discard_timer(now, packet.header.space());
5532 }
5533 }
5534
5535 if result.incoming_key_update {
5536 trace!("key update authenticated");
5537 self.update_keys(Some((result.number, now)), true);
5538 self.set_key_discard_timer(now, packet.header.space());
5539 }
5540
5541 Ok(Some(result.number))
5542 }
5543
    /// Advance the 1-RTT packet protection keys by one key phase.
    ///
    /// A fresh key set is obtained from the crypto session and stored in `next_crypto`; the set
    /// that was waiting there becomes the active `Data`-space packet keys; and the keys that
    /// were active until now are kept in `prev_crypto`. The per-key packet counter is reset,
    /// `key_phase_size` is recomputed from the new keys' confidentiality limit, and the
    /// `key_phase` bit is flipped.
    ///
    /// * `end_packet` - stored verbatim as `PrevCrypto::end_packet`.
    /// * `remote` - stored verbatim as `PrevCrypto::update_unacked`.
    ///
    /// # Panics
    /// Panics if the crypto session yields no further 1-RTT keys, or if either the `Data`-space
    /// crypto or `next_crypto` is unset.
    fn update_keys(&mut self, end_packet: Option<(u64, Instant)>, remote: bool) {
        trace!("executing key update");
        // Derive the key set that will follow the one about to become active.
        let new = self
            .crypto
            .next_1rtt_keys()
            .expect("only called for `Data` packets");
        // Leave a safety margin below the confidentiality limit of the new keys.
        self.key_phase_size = new
            .local
            .confidentiality_limit()
            .saturating_sub(KEY_UPDATE_MARGIN);
        // Two swaps in one expression: `new` goes into `next_crypto`, the value that was in
        // `next_crypto` goes into the active slot, and the formerly active keys come out as `old`.
        let old = mem::replace(
            &mut self.spaces[SpaceId::Data]
                .crypto
                .as_mut()
                .unwrap()
                .packet,
            mem::replace(self.next_crypto.as_mut().unwrap(), new),
        );
        // No packets have been sent under the newly active keys yet.
        self.spaces[SpaceId::Data].sent_with_keys = 0;
        self.prev_crypto = Some(PrevCrypto {
            crypto: old,
            end_packet,
            update_unacked: remote,
        });
        self.key_phase = !self.key_phase;
    }
5573
    /// Whether the peer's transport parameters include `min_ack_delay`, which this code treats
    /// as the signal that the peer supports the ACK frequency extension.
    fn peer_supports_ack_frequency(&self) -> bool {
        self.peer_params.min_ack_delay.is_some()
    }
5577
    /// Flag the highest available packet number space as having an immediate ACK pending.
    ///
    /// Only sets `immediate_ack_pending`; nothing is transmitted by this call itself.
    pub(crate) fn immediate_ack(&mut self) {
        self.spaces[self.highest_space].immediate_ack_pending = true;
    }
5585
5586 #[cfg(test)]
5588 #[allow(dead_code)]
5589 pub(crate) fn decode_packet(&self, event: &ConnectionEvent) -> Option<Vec<u8>> {
5590 let (first_decode, remaining) = match &event.0 {
5591 ConnectionEventInner::Datagram(DatagramConnectionEvent {
5592 first_decode,
5593 remaining,
5594 ..
5595 }) => (first_decode, remaining),
5596 _ => return None,
5597 };
5598
5599 if remaining.is_some() {
5600 panic!("Packets should never be coalesced in tests");
5601 }
5602
5603 let decrypted_header = packet_crypto::unprotect_header(
5604 first_decode.clone(),
5605 &self.spaces,
5606 self.zero_rtt_crypto.as_ref(),
5607 self.peer_params.stateless_reset_token,
5608 )?;
5609
5610 let mut packet = decrypted_header.packet?;
5611 packet_crypto::decrypt_packet_body(
5612 &mut packet,
5613 &self.spaces,
5614 self.zero_rtt_crypto.as_ref(),
5615 self.key_phase,
5616 self.prev_crypto.as_ref(),
5617 self.next_crypto.as_ref(),
5618 )
5619 .ok()?;
5620
5621 Some(packet.payload.to_vec())
5622 }
5623
    #[cfg(test)]
    /// Test helper: the current path's in-flight byte count (`path.in_flight.bytes`).
    #[allow(dead_code)]
    pub(crate) fn bytes_in_flight(&self) -> u64 {
        self.path.in_flight.bytes
    }
5631
    #[cfg(test)]
    /// Test helper: remaining congestion window on the current path.
    ///
    /// Computed as the controller's window minus the bytes in flight, saturating at zero.
    #[allow(dead_code)]
    pub(crate) fn congestion_window(&self) -> u64 {
        self.path
            .congestion
            .window()
            .saturating_sub(self.path.in_flight.bytes)
    }
5641
5642 #[cfg(test)]
5644 #[allow(dead_code)]
5645 pub(crate) fn is_idle(&self) -> bool {
5646 Timer::VALUES
5647 .iter()
5648 .filter(|&&t| !matches!(t, Timer::KeepAlive | Timer::PushNewCid | Timer::KeyDiscard))
5649 .filter_map(|&t| Some((t, self.timers.get(t)?)))
5650 .min_by_key(|&(_, time)| time)
5651 .is_none_or(|(timer, _)| timer == Timer::Idle)
5652 }
5653
    #[cfg(test)]
    /// Test helper: the connection's `lost_packets` counter.
    #[allow(dead_code)]
    pub(crate) fn lost_packets(&self) -> u64 {
        self.lost_packets
    }
5660
    #[cfg(test)]
    /// Test helper: the current path's `sending_ecn` flag.
    #[allow(dead_code)]
    pub(crate) fn using_ecn(&self) -> bool {
        self.path.sending_ecn
    }
5667
    #[cfg(test)]
    /// Test helper: the current path's `total_recvd` counter.
    #[allow(dead_code)]
    pub(crate) fn total_recvd(&self) -> u64 {
        self.path.total_recvd
    }
5674
    #[cfg(test)]
    /// Test helper: the pair returned by `local_cid_state.active_seq()`.
    ///
    /// NOTE(review): presumably the lowest and highest active local CID sequence numbers —
    /// confirm against `CidState::active_seq`.
    #[allow(dead_code)]
    pub(crate) fn active_local_cid_seq(&self) -> (u64, u64) {
        self.local_cid_state.active_seq()
    }
5680
    #[cfg(test)]
    /// Test helper: trigger a local connection ID rotation.
    ///
    /// Passes `v` to `local_cid_state.assign_retire_seq` and queues a
    /// `NeedIdentifiers(now, n)` endpoint event carrying the value it returns.
    #[allow(dead_code)]
    pub(crate) fn rotate_local_cid(&mut self, v: u64, now: Instant) {
        let n = self.local_cid_state.assign_retire_seq(v);
        self.endpoint_events
            .push_back(EndpointEventInner::NeedIdentifiers(now, n));
    }
5690
    #[cfg(test)]
    /// Test helper: the value returned by `rem_cids.active_seq()`.
    #[allow(dead_code)]
    pub(crate) fn active_rem_cid_seq(&self) -> u64 {
        self.rem_cids.active_seq()
    }
5697
5698 #[cfg(test)]
5700 #[cfg(test)]
5701 #[allow(dead_code)]
5702 pub(crate) fn path_mtu(&self) -> u16 {
5703 self.path.current_mtu()
5704 }
5705
5706 fn can_send_1rtt(&self, max_size: usize) -> bool {
5710 self.streams.can_send_stream_data()
5711 || self.path.challenge_pending
5712 || self
5713 .prev_path
5714 .as_ref()
5715 .is_some_and(|(_, x)| x.challenge_pending)
5716 || !self.path_responses.is_empty()
5717 || !self.nat_traversal_challenges.is_empty()
5718 || self
5719 .datagrams
5720 .outgoing
5721 .front()
5722 .is_some_and(|x| x.size(true) <= max_size)
5723 }
5724
5725 fn remove_in_flight(&mut self, pn: u64, packet: &SentPacket) {
5727 for path in [&mut self.path]
5729 .into_iter()
5730 .chain(self.prev_path.as_mut().map(|(_, data)| data))
5731 {
5732 if path.remove_in_flight(pn, packet) {
5733 return;
5734 }
5735 }
5736 }
5737
    /// Terminate the connection immediately, recording `reason` as its error.
    ///
    /// Runs `close_common`, stores `reason` in `self.error`, moves straight to
    /// `State::Drained`, and queues an `EndpointEventInner::Drained` event for the endpoint.
    fn kill(&mut self, reason: ConnectionError) {
        self.close_common();
        self.error = Some(reason);
        self.state = State::Drained;
        self.endpoint_events.push_back(EndpointEventInner::Drained);
    }
5745
5746 fn generate_nat_traversal_challenges(&mut self, now: Instant) {
5748 let candidates: Vec<(VarInt, SocketAddr)> = if let Some(nat_state) = &self.nat_traversal {
5750 nat_state
5751 .get_validation_candidates()
5752 .into_iter()
5753 .take(3) .map(|(seq, candidate)| (seq, candidate.address))
5755 .collect()
5756 } else {
5757 return;
5758 };
5759
5760 if candidates.is_empty() {
5761 return;
5762 }
5763
5764 if let Some(nat_state) = &mut self.nat_traversal {
5766 for (seq, address) in candidates {
5767 let challenge: u64 = self.rng.r#gen();
5769
5770 if let Err(e) = nat_state.start_validation(seq, challenge, now) {
5772 debug!("Failed to start validation for candidate {}: {}", seq, e);
5773 continue;
5774 }
5775
5776 self.nat_traversal_challenges.push(address, challenge);
5778 trace!(
5779 "Queuing NAT validation PATH_CHALLENGE for {} with token {:08x}",
5780 address, challenge
5781 );
5782 }
5783 }
5784 }
5785
    /// The MTU currently in use on the active path, as reported by `path.current_mtu()`.
    pub fn current_mtu(&self) -> u16 {
        self.path.current_mtu()
    }
5792
5793 fn predict_1rtt_overhead(&self, pn: Option<u64>) -> usize {
5800 let pn_len = match pn {
5801 Some(pn) => PacketNumber::new(
5802 pn,
5803 self.spaces[SpaceId::Data].largest_acked_packet.unwrap_or(0),
5804 )
5805 .len(),
5806 None => 4,
5808 };
5809
5810 1 + self.rem_cids.active().len() + pn_len + self.tag_len_1rtt()
5812 }
5813
5814 fn tag_len_1rtt(&self) -> usize {
5815 let key = match self.spaces[SpaceId::Data].crypto.as_ref() {
5816 Some(crypto) => Some(&*crypto.packet.local),
5817 None => self.zero_rtt_crypto.as_ref().map(|x| &*x.packet),
5818 };
5819 key.map_or(16, |x| x.tag_len())
5823 }
5824
5825 fn on_path_validated(&mut self) {
5827 self.path.validated = true;
5828 let ConnectionSide::Server { server_config } = &self.side else {
5829 return;
5830 };
5831 let new_tokens = &mut self.spaces[SpaceId::Data as usize].pending.new_tokens;
5832 new_tokens.clear();
5833 for _ in 0..server_config.validation_token.sent {
5834 new_tokens.push(self.path.remote);
5835 }
5836 }
5837}
5838
5839impl fmt::Debug for Connection {
5840 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
5841 f.debug_struct("Connection")
5842 .field("handshake_cid", &self.handshake_cid)
5843 .finish()
5844 }
5845}
5846
/// Side-specific state held by a connection: one variant per endpoint role.
enum ConnectionSide {
    /// State kept when this endpoint is the client.
    Client {
        /// Token taken from `token_store` for `server_name` when the connection was built
        /// (empty if the store had none).
        token: Bytes,
        /// Store the token was taken from.
        token_store: Arc<dyn TokenStore>,
        /// Name of the server this connection targets; used as the token store key.
        server_name: String,
    },
    /// State kept when this endpoint is the server.
    Server {
        /// Server configuration shared with the endpoint.
        server_config: Arc<ServerConfig>,
    },
}
5859
5860impl ConnectionSide {
5861 fn remote_may_migrate(&self) -> bool {
5862 match self {
5863 Self::Server { server_config } => server_config.migration,
5864 Self::Client { .. } => false,
5865 }
5866 }
5867
5868 fn is_client(&self) -> bool {
5869 self.side().is_client()
5870 }
5871
5872 fn is_server(&self) -> bool {
5873 self.side().is_server()
5874 }
5875
5876 fn side(&self) -> Side {
5877 match *self {
5878 Self::Client { .. } => Side::Client,
5879 Self::Server { .. } => Side::Server,
5880 }
5881 }
5882}
5883
5884impl From<SideArgs> for ConnectionSide {
5885 fn from(side: SideArgs) -> Self {
5886 match side {
5887 SideArgs::Client {
5888 token_store,
5889 server_name,
5890 } => Self::Client {
5891 token: token_store.take(&server_name).unwrap_or_default(),
5892 token_store,
5893 server_name,
5894 },
5895 SideArgs::Server {
5896 server_config,
5897 pref_addr_cid: _,
5898 path_validated: _,
5899 } => Self::Server { server_config },
5900 }
5901 }
5902}
5903
/// Side-specific arguments used when constructing a connection.
pub(crate) enum SideArgs {
    /// Arguments for a client-side connection.
    Client {
        /// Store from which a token for `server_name` is taken.
        token_store: Arc<dyn TokenStore>,
        /// Name of the server being connected to.
        server_name: String,
    },
    /// Arguments for a server-side connection.
    Server {
        /// Server configuration.
        server_config: Arc<ServerConfig>,
        /// Connection ID associated with the preferred address, if any; exposed through
        /// `pref_addr_cid()`.
        pref_addr_cid: Option<ConnectionId>,
        /// Whether the path is already considered validated; exposed through
        /// `path_validated()`.
        path_validated: bool,
    },
}
5916
5917impl SideArgs {
5918 pub(crate) fn pref_addr_cid(&self) -> Option<ConnectionId> {
5919 match *self {
5920 Self::Client { .. } => None,
5921 Self::Server { pref_addr_cid, .. } => pref_addr_cid,
5922 }
5923 }
5924
5925 pub(crate) fn path_validated(&self) -> bool {
5926 match *self {
5927 Self::Client { .. } => true,
5928 Self::Server { path_validated, .. } => path_validated,
5929 }
5930 }
5931
5932 pub(crate) fn side(&self) -> Side {
5933 match *self {
5934 Self::Client { .. } => Side::Client,
5935 Self::Server { .. } => Side::Server,
5936 }
5937 }
5938}
5939
#[derive(Debug, Error, Clone, PartialEq, Eq)]
/// Reasons why a connection can be lost.
pub enum ConnectionError {
    /// The peer doesn't implement any supported version.
    #[error("peer doesn't implement any supported version")]
    VersionMismatch,
    /// A transport-level error; displayed as the wrapped `TransportError`.
    #[error(transparent)]
    TransportError(#[from] TransportError),
    /// The peer closed the connection with a connection-level close frame.
    #[error("aborted by peer: {0}")]
    ConnectionClosed(frame::ConnectionClose),
    /// The peer closed the connection with an application-level close frame.
    #[error("closed by peer: {0}")]
    ApplicationClosed(frame::ApplicationClose),
    /// The peer reset the connection.
    #[error("reset by peer")]
    Reset,
    /// The connection timed out.
    #[error("timed out")]
    TimedOut,
    /// The connection was closed by the local side.
    #[error("closed")]
    LocallyClosed,
    /// No connection IDs were left to continue the connection.
    #[error("CIDs exhausted")]
    CidsExhausted,
}
5974
5975impl From<Close> for ConnectionError {
5976 fn from(x: Close) -> Self {
5977 match x {
5978 Close::Connection(reason) => Self::ConnectionClosed(reason),
5979 Close::Application(reason) => Self::ApplicationClosed(reason),
5980 }
5981 }
5982}
5983
5984impl From<ConnectionError> for io::Error {
5986 fn from(x: ConnectionError) -> Self {
5987 use ConnectionError::*;
5988 let kind = match x {
5989 TimedOut => io::ErrorKind::TimedOut,
5990 Reset => io::ErrorKind::ConnectionReset,
5991 ApplicationClosed(_) | ConnectionClosed(_) => io::ErrorKind::ConnectionAborted,
5992 TransportError(_) | VersionMismatch | LocallyClosed | CidsExhausted => {
5993 io::ErrorKind::Other
5994 }
5995 };
5996 Self::new(kind, x)
5997 }
5998}
5999
#[derive(Clone, Debug)]
/// Lifecycle state of a connection.
pub enum State {
    /// The handshake is in progress; carries handshake-specific data.
    Handshake(state::Handshake),
    /// The connection is established.
    Established,
    /// The connection has been closed; carries the close reason.
    Closed(state::Closed),
    /// The connection is draining. Counted as closed by `is_closed`.
    Draining,
    /// The connection is fully drained. Counted as closed by `is_closed`.
    Drained,
}
6014
6015impl State {
6016 fn closed<R: Into<Close>>(reason: R) -> Self {
6017 Self::Closed(state::Closed {
6018 reason: reason.into(),
6019 })
6020 }
6021
6022 fn is_handshake(&self) -> bool {
6023 matches!(*self, Self::Handshake(_))
6024 }
6025
6026 fn is_established(&self) -> bool {
6027 matches!(*self, Self::Established)
6028 }
6029
6030 fn is_closed(&self) -> bool {
6031 matches!(*self, Self::Closed(_) | Self::Draining | Self::Drained)
6032 }
6033
6034 fn is_drained(&self) -> bool {
6035 matches!(*self, Self::Drained)
6036 }
6037}
6038
/// Payload types carried by the data-bearing `State` variants.
mod state {
    use super::*;

    /// Data tracked while the handshake is in progress.
    #[derive(Clone, Debug)]
    pub struct Handshake {
        /// NOTE(review): presumably whether the remote connection ID has been fixed yet —
        /// confirm at the use sites, which are outside this chunk.
        pub(super) rem_cid_set: bool,
        /// NOTE(review): presumably the address-validation token expected from the peer —
        /// confirm at the use sites.
        pub(super) expected_token: Bytes,
        /// NOTE(review): presumably the buffered first flight of client handshake data, if
        /// any — confirm at the use sites.
        pub(super) client_hello: Option<Bytes>,
    }

    /// Data tracked once the connection has been closed.
    #[derive(Clone, Debug)]
    pub struct Closed {
        /// Why the connection was closed.
        pub(super) reason: Close,
    }
}
6063
#[derive(Debug)]
/// Events of interest to the application, produced by a connection.
pub enum Event {
    /// Handshake data is ready.
    HandshakeDataReady,
    /// The connection was established.
    Connected,
    /// The connection was lost.
    ConnectionLost {
        /// Why the connection was lost.
        reason: ConnectionError,
    },
    /// A stream-related event occurred.
    Stream(StreamEvent),
    /// A datagram was received.
    DatagramReceived,
    /// Datagram sending is no longer blocked.
    DatagramsUnblocked,
}
6085
/// Time elapsed from `y` to `x`, or `Duration::ZERO` when `x` is not later than `y`.
fn instant_saturating_sub(x: Instant, y: Instant) -> Duration {
    x.saturating_duration_since(y)
}
6089
6090fn get_max_ack_delay(params: &TransportParameters) -> Duration {
6091 Duration::from_micros(params.max_ack_delay.0 * 1000)
6092}
6093
/// Upper bound on a backoff exponent.
///
/// NOTE(review): presumably caps the exponential backoff applied to loss-detection timers;
/// the use sites are outside this chunk — confirm.
const MAX_BACKOFF_EXPONENT: u32 = 16;

/// The largest handshake/0-RTT header plus 32 further bytes.
///
/// NOTE(review): presumably the minimum space that must remain in a datagram before another
/// packet is started in it — confirm at the use sites.
const MIN_PACKET_SPACE: usize = MAX_HANDSHAKE_OR_0RTT_HEADER_SIZE + 32;

/// Largest possible size of a Handshake or 0-RTT packet header.
///
/// The terms follow the QUIC long header layout, in order: first byte (1), version (4),
/// destination CID length (1), destination CID (up to `MAX_CID_SIZE`), source CID length (1),
/// source CID (up to `MAX_CID_SIZE`), the length field (sized as the varint encoding of
/// `u16::MAX`), and the packet number (up to 4).
const MAX_HANDSHAKE_OR_0RTT_HEADER_SIZE: usize =
    1 + 4 + 1 + MAX_CID_SIZE + 1 + MAX_CID_SIZE + VarInt::from_u32(u16::MAX as u32).size() + 4;

/// Safety margin subtracted from a key set's confidentiality limit when `update_keys`
/// computes `key_phase_size`.
const KEY_UPDATE_MARGIN: u64 = 10_000;
6118
#[derive(Default)]
/// Summary of the frames written into a single outgoing packet.
struct SentFrames {
    /// Retransmittable frames included in the packet.
    retransmits: ThinRetransmits,
    /// Largest packet number acknowledged by an ACK frame in the packet, if it contains one.
    largest_acked: Option<u64>,
    /// Metadata of the STREAM frames included in the packet.
    stream_frames: StreamMetaVec,
    /// Whether the packet contains frames that are not tracked for retransmission.
    non_retransmits: bool,
    /// NOTE(review): presumably set when the packet must be padded before sending — confirm
    /// at the use sites.
    requires_padding: bool,
}
6128
6129impl SentFrames {
6130 fn is_ack_only(&self, streams: &StreamsState) -> bool {
6132 self.largest_acked.is_some()
6133 && !self.non_retransmits
6134 && self.stream_frames.is_empty()
6135 && self.retransmits.is_empty(streams)
6136 }
6137}
6138
6139fn negotiate_max_idle_timeout(x: Option<VarInt>, y: Option<VarInt>) -> Option<Duration> {
6147 match (x, y) {
6148 (Some(VarInt(0)) | None, Some(VarInt(0)) | None) => None,
6149 (Some(VarInt(0)) | None, Some(y)) => Some(Duration::from_millis(y.0)),
6150 (Some(x), Some(VarInt(0)) | None) => Some(Duration::from_millis(x.0)),
6151 (Some(x), Some(y)) => Some(Duration::from_millis(cmp::min(x, y).0)),
6152 }
6153}
6154
#[derive(Debug, Clone)]
/// Per-connection state for post-quantum cryptography (PQC) handling.
pub(crate) struct PqcState {
    /// Set once the peer's transport parameters advertise PQC algorithms.
    enabled: bool,
    /// The PQC algorithms advertised by the peer, if any.
    #[allow(dead_code)]
    algorithms: Option<crate::transport_parameters::PqcAlgorithms>,
    /// Handshake MTU target; starts at `MIN_INITIAL_SIZE` and is raised when PQC is in use.
    handshake_mtu: u16,
    /// Whether PQC is considered to be in use for this connection's handshake.
    using_pqc: bool,
    /// Helper that performs PQC-specific packet handling decisions.
    packet_handler: crate::crypto::pqc::packet_handler::PqcPacketHandler,
}
6170
6171#[allow(dead_code)]
6172impl PqcState {
6173 fn new() -> Self {
6174 Self {
6175 enabled: false,
6176 algorithms: None,
6177 handshake_mtu: MIN_INITIAL_SIZE,
6178 using_pqc: false,
6179 packet_handler: crate::crypto::pqc::packet_handler::PqcPacketHandler::new(),
6180 }
6181 }
6182
6183 fn min_initial_size(&self) -> u16 {
6185 if self.enabled && self.using_pqc {
6186 std::cmp::max(self.handshake_mtu, 4096)
6188 } else {
6189 MIN_INITIAL_SIZE
6190 }
6191 }
6192
6193 fn update_from_peer_params(&mut self, params: &TransportParameters) {
6195 if let Some(ref algorithms) = params.pqc_algorithms {
6196 self.enabled = true;
6197 self.algorithms = Some(algorithms.clone());
6198 if algorithms.ml_kem_768 || algorithms.ml_dsa_65 {
6200 self.using_pqc = true;
6201 self.handshake_mtu = 4096; }
6203 }
6204 }
6205
6206 fn detect_pqc_from_crypto(&mut self, crypto_data: &[u8], space: SpaceId) {
6208 if !self.enabled {
6209 return;
6210 }
6211 if self.packet_handler.detect_pqc_handshake(crypto_data, space) {
6212 self.using_pqc = true;
6213 self.handshake_mtu = self.packet_handler.get_min_packet_size(space);
6215 }
6216 }
6217
6218 fn should_trigger_mtu_discovery(&mut self) -> bool {
6220 self.packet_handler.should_trigger_mtu_discovery()
6221 }
6222
6223 fn get_mtu_config(&self) -> MtuDiscoveryConfig {
6225 self.packet_handler.get_pqc_mtu_config()
6226 }
6227
6228 fn calculate_crypto_frame_size(&self, available_space: usize, remaining_data: usize) -> usize {
6230 self.packet_handler
6231 .calculate_crypto_frame_size(available_space, remaining_data)
6232 }
6233
6234 fn should_adjust_coalescing(&self, current_size: usize, space: SpaceId) -> bool {
6236 self.packet_handler
6237 .adjust_coalescing_for_pqc(current_size, space)
6238 }
6239
6240 fn on_packet_sent(&mut self, space: SpaceId, size: u16) {
6242 self.packet_handler.on_packet_sent(space, size);
6243 }
6244
6245 fn reset(&mut self) {
6247 self.enabled = false;
6248 self.algorithms = None;
6249 self.handshake_mtu = MIN_INITIAL_SIZE;
6250 self.using_pqc = false;
6251 self.packet_handler.reset();
6252 }
6253}
6254
6255impl Default for PqcState {
6256 fn default() -> Self {
6257 Self::new()
6258 }
6259}
6260
#[derive(Debug, Clone)]
/// Per-connection state for the address discovery (observed address) extension.
pub(crate) struct AddressDiscoveryState {
    /// Whether address discovery is active on this connection.
    enabled: bool,
    /// Maximum rate of observations; also used to size the rate limiter.
    max_observation_rate: u8,
    /// Whether paths other than path 0 are observed.
    observe_all_paths: bool,
    /// Per-path record of addresses this endpoint has observed and reported, keyed by path ID.
    sent_observations: std::collections::HashMap<u64, paths::PathAddressInfo>,
    /// Per-path record of addresses the peer has reported to us, keyed by path ID.
    received_observations: std::collections::HashMap<u64, paths::PathAddressInfo>,
    /// Token bucket limiting how often observations are sent.
    rate_limiter: AddressObservationRateLimiter,
    /// Every observation received from the peer, in arrival order.
    received_history: Vec<ObservedAddressEvent>,
    /// Whether bootstrap mode is active (observes all paths and uses a higher rate limit).
    bootstrap_mode: bool,
    /// Sequence number to assign to the next outgoing observed-address frame.
    next_sequence_number: VarInt,
    /// NOTE(review): presumably the last sequence number received per path — not used within
    /// this chunk; confirm at the use sites.
    last_received_sequence: std::collections::HashMap<u64, VarInt>,
    /// Number of observed-address frames produced by `check_for_address_observations`.
    frames_sent: u64,
}
6287
#[derive(Debug, Clone, PartialEq, Eq)]
/// One observed-address report received from the peer.
struct ObservedAddressEvent {
    /// The address the peer reported.
    address: SocketAddr,
    /// When the report was handled.
    received_at: Instant,
    /// The path the report was associated with.
    path_id: u64,
}
6298
#[derive(Debug, Clone)]
/// Token bucket used to rate-limit address observations.
struct AddressObservationRateLimiter {
    /// Tokens currently available.
    tokens: f64,
    /// Bucket capacity; refills never exceed this.
    max_tokens: f64,
    /// Refill rate in tokens per second.
    rate: f64,
    /// When the bucket was last refilled.
    last_update: Instant,
}
6311
6312#[allow(dead_code)]
6313impl AddressDiscoveryState {
6314 fn new(config: &crate::transport_parameters::AddressDiscoveryConfig, now: Instant) -> Self {
6316 use crate::transport_parameters::AddressDiscoveryConfig::*;
6317
6318 let (enabled, _can_send, _can_receive) = match config {
6320 SendOnly => (true, true, false),
6321 ReceiveOnly => (true, false, true),
6322 SendAndReceive => (true, true, true),
6323 };
6324
6325 let max_observation_rate = 10u8; let observe_all_paths = false; Self {
6331 enabled,
6332 max_observation_rate,
6333 observe_all_paths,
6334 sent_observations: std::collections::HashMap::new(),
6335 received_observations: std::collections::HashMap::new(),
6336 rate_limiter: AddressObservationRateLimiter::new(max_observation_rate, now),
6337 received_history: Vec::new(),
6338 bootstrap_mode: false,
6339 next_sequence_number: VarInt::from_u32(0),
6340 last_received_sequence: std::collections::HashMap::new(),
6341 frames_sent: 0,
6342 }
6343 }
6344
6345 fn should_send_observation(&mut self, path_id: u64, now: Instant) -> bool {
6347 if !self.should_observe_path(path_id) {
6349 return false;
6350 }
6351
6352 let needs_observation = match self.sent_observations.get(&path_id) {
6354 Some(info) => info.observed_address.is_none() || !info.notified,
6355 None => true,
6356 };
6357
6358 if !needs_observation {
6359 return false;
6360 }
6361
6362 self.rate_limiter.try_consume(1.0, now)
6364 }
6365
6366 fn record_observation_sent(&mut self, path_id: u64) {
6368 if let Some(info) = self.sent_observations.get_mut(&path_id) {
6369 info.mark_notified();
6370 }
6371 }
6372
6373 fn handle_observed_address(&mut self, address: SocketAddr, path_id: u64, now: Instant) {
6375 if !self.enabled {
6376 return;
6377 }
6378
6379 self.received_history.push(ObservedAddressEvent {
6380 address,
6381 received_at: now,
6382 path_id,
6383 });
6384
6385 let info = self
6387 .received_observations
6388 .entry(path_id)
6389 .or_insert_with(paths::PathAddressInfo::new);
6390 info.update_observed_address(address, now);
6391 }
6392
6393 pub(crate) fn get_observed_address(&self, path_id: u64) -> Option<SocketAddr> {
6395 self.received_observations
6396 .get(&path_id)
6397 .and_then(|info| info.observed_address)
6398 }
6399
6400 pub(crate) fn get_all_received_history(&self) -> Vec<SocketAddr> {
6402 self.received_observations
6403 .values()
6404 .filter_map(|info| info.observed_address)
6405 .collect()
6406 }
6407
6408 pub(crate) fn stats(&self) -> AddressDiscoveryStats {
6410 AddressDiscoveryStats {
6411 frames_sent: self.frames_sent,
6412 frames_received: self.received_history.len() as u64,
6413 addresses_discovered: self
6414 .received_observations
6415 .values()
6416 .filter(|info| info.observed_address.is_some())
6417 .count() as u64,
6418 address_changes_detected: 0, }
6420 }
6421
6422 fn has_unnotified_changes(&self) -> bool {
6428 let has_unsent = self
6430 .sent_observations
6431 .values()
6432 .any(|info| info.observed_address.is_some() && !info.notified);
6433
6434 let has_unreceived = self
6436 .received_observations
6437 .values()
6438 .any(|info| info.observed_address.is_some() && !info.notified);
6439
6440 has_unsent || has_unreceived
6441 }
6442
6443 fn queue_observed_address_frame(
6445 &mut self,
6446 path_id: u64,
6447 address: SocketAddr,
6448 ) -> Option<frame::ObservedAddress> {
6449 if !self.enabled {
6451 return None;
6452 }
6453
6454 if !self.observe_all_paths && path_id != 0 {
6456 return None;
6457 }
6458
6459 if let Some(info) = self.sent_observations.get(&path_id) {
6461 if info.notified {
6462 return None;
6463 }
6464 }
6465
6466 if self.rate_limiter.tokens < 1.0 {
6468 return None;
6469 }
6470
6471 self.rate_limiter.tokens -= 1.0;
6473
6474 let info = self
6476 .sent_observations
6477 .entry(path_id)
6478 .or_insert_with(paths::PathAddressInfo::new);
6479 info.observed_address = Some(address);
6480 info.notified = true;
6481
6482 println!(
6483 "DEBUG: queue_observed_address_frame: ACTUALLY QUEUING frame for path {} with address {}",
6484 path_id, address
6485 );
6486
6487 let sequence_number = self.next_sequence_number;
6489 self.next_sequence_number = VarInt::from_u64(self.next_sequence_number.into_inner() + 1)
6490 .expect("sequence number overflow");
6491
6492 Some(frame::ObservedAddress {
6493 sequence_number,
6494 address,
6495 })
6496 }
6497
6498 fn check_for_address_observations(
6500 &mut self,
6501 _current_path: u64,
6502 peer_supports_address_discovery: bool,
6503 now: Instant,
6504 ) -> Vec<frame::ObservedAddress> {
6505 let mut frames = Vec::new();
6506
6507 if !self.enabled || !peer_supports_address_discovery {
6509 return frames;
6510 }
6511
6512 self.rate_limiter.update_tokens(now);
6514
6515 let paths_to_notify: Vec<u64> = self
6517 .sent_observations
6518 .iter()
6519 .filter_map(|(&path_id, info)| {
6520 if info.observed_address.is_some() && !info.notified {
6521 Some(path_id)
6522 } else {
6523 None
6524 }
6525 })
6526 .collect();
6527
6528 for path_id in paths_to_notify {
6530 if !self.should_observe_path(path_id) {
6532 continue;
6533 }
6534
6535 if !self.bootstrap_mode && self.rate_limiter.tokens < 1.0 {
6537 break; }
6539
6540 if let Some(info) = self.sent_observations.get_mut(&path_id) {
6542 if let Some(address) = info.observed_address {
6543 if self.bootstrap_mode {
6545 self.rate_limiter.tokens -= 0.2; } else {
6547 self.rate_limiter.tokens -= 1.0;
6548 }
6549
6550 info.notified = true;
6552
6553 let sequence_number = self.next_sequence_number;
6555 self.next_sequence_number =
6556 VarInt::from_u64(self.next_sequence_number.into_inner() + 1)
6557 .expect("sequence number overflow");
6558
6559 self.frames_sent += 1;
6560
6561 frames.push(frame::ObservedAddress {
6562 sequence_number,
6563 address,
6564 });
6565 }
6566 }
6567 }
6568
6569 frames
6570 }
6571
6572 fn update_rate_limit(&mut self, new_rate: f64) {
6574 self.max_observation_rate = new_rate as u8;
6575 self.rate_limiter.set_rate(new_rate as u8);
6576 }
6577
6578 fn from_transport_params(params: &TransportParameters) -> Option<Self> {
6580 params
6581 .address_discovery
6582 .as_ref()
6583 .map(|config| Self::new(config, Instant::now()))
6584 }
6585
6586 #[cfg(test)]
6588 fn new_with_params(enabled: bool, max_rate: f64, observe_all_paths: bool) -> Self {
6589 if !enabled {
6591 return Self {
6593 enabled: false,
6594 max_observation_rate: max_rate as u8,
6595 observe_all_paths,
6596 sent_observations: std::collections::HashMap::new(),
6597 received_observations: std::collections::HashMap::new(),
6598 rate_limiter: AddressObservationRateLimiter::new(max_rate as u8, Instant::now()),
6599 received_history: Vec::new(),
6600 bootstrap_mode: false,
6601 next_sequence_number: VarInt::from_u32(0),
6602 last_received_sequence: std::collections::HashMap::new(),
6603 frames_sent: 0,
6604 };
6605 }
6606
6607 let config = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
6609 let mut state = Self::new(&config, Instant::now());
6610 state.max_observation_rate = max_rate as u8;
6611 state.observe_all_paths = observe_all_paths;
6612 state.rate_limiter = AddressObservationRateLimiter::new(max_rate as u8, Instant::now());
6613 state
6614 }
6615
6616 fn set_bootstrap_mode(&mut self, enabled: bool) {
6618 self.bootstrap_mode = enabled;
6619 if enabled {
6621 let bootstrap_rate = self.get_effective_rate_limit();
6622 self.rate_limiter.rate = bootstrap_rate;
6623 self.rate_limiter.max_tokens = bootstrap_rate * 2.0; self.rate_limiter.tokens = self.rate_limiter.max_tokens;
6626 }
6627 }
6628
6629 fn is_bootstrap_mode(&self) -> bool {
6631 self.bootstrap_mode
6632 }
6633
6634 fn get_effective_rate_limit(&self) -> f64 {
6636 if self.bootstrap_mode {
6637 (self.max_observation_rate as f64) * 5.0
6639 } else {
6640 self.max_observation_rate as f64
6641 }
6642 }
6643
6644 fn should_observe_path(&self, path_id: u64) -> bool {
6646 if !self.enabled {
6647 return false;
6648 }
6649
6650 if self.bootstrap_mode {
6652 return true;
6653 }
6654
6655 self.observe_all_paths || path_id == 0
6657 }
6658
6659 fn should_send_observation_immediately(&self, is_new_connection: bool) -> bool {
6661 self.bootstrap_mode && is_new_connection
6662 }
6663}
6664
6665#[allow(dead_code)]
6666impl AddressObservationRateLimiter {
6667 fn new(rate: u8, now: Instant) -> Self {
6669 let rate_f64 = rate as f64;
6670 Self {
6671 tokens: rate_f64,
6672 max_tokens: rate_f64,
6673 rate: rate_f64,
6674 last_update: now,
6675 }
6676 }
6677
6678 fn try_consume(&mut self, tokens: f64, now: Instant) -> bool {
6680 self.update_tokens(now);
6681
6682 if self.tokens >= tokens {
6683 self.tokens -= tokens;
6684 true
6685 } else {
6686 false
6687 }
6688 }
6689
6690 fn update_tokens(&mut self, now: Instant) {
6692 let elapsed = now.saturating_duration_since(self.last_update);
6693 let new_tokens = elapsed.as_secs_f64() * self.rate;
6694 self.tokens = (self.tokens + new_tokens).min(self.max_tokens);
6695 self.last_update = now;
6696 }
6697
6698 fn set_rate(&mut self, rate: u8) {
6700 let rate_f64 = rate as f64;
6701 self.rate = rate_f64;
6702 self.max_tokens = rate_f64;
6703 if self.tokens > self.max_tokens {
6705 self.tokens = self.max_tokens;
6706 }
6707 }
6708}
6709
6710#[cfg(test)]
6711mod tests {
6712 use super::*;
6713 use crate::transport_parameters::AddressDiscoveryConfig;
6714 use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr};
6715
6716 #[test]
6717 fn address_discovery_state_new() {
6718 let config = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
6719 let now = Instant::now();
6720 let state = AddressDiscoveryState::new(&config, now);
6721
6722 assert!(state.enabled);
6723 assert_eq!(state.max_observation_rate, 10);
6724 assert!(!state.observe_all_paths);
6725 assert!(state.sent_observations.is_empty());
6726 assert!(state.received_observations.is_empty());
6727 assert!(state.received_history.is_empty());
6728 assert_eq!(state.rate_limiter.tokens, 10.0);
6729 }
6730
6731 #[test]
6732 fn address_discovery_state_disabled() {
6733 let config = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
6734 let now = Instant::now();
6735 let mut state = AddressDiscoveryState::new(&config, now);
6736
6737 state.enabled = false;
6739
6740 assert!(!state.should_send_observation(0, now));
6742 }
6743
6744 #[test]
6745 fn address_discovery_state_should_send_observation() {
6746 let config = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
6747 let now = Instant::now();
6748 let mut state = AddressDiscoveryState::new(&config, now);
6749
6750 assert!(state.should_send_observation(0, now));
6752
6753 let mut path_info = paths::PathAddressInfo::new();
6755 path_info.update_observed_address(
6756 SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)), 8080),
6757 now,
6758 );
6759 path_info.mark_notified();
6760 state.sent_observations.insert(0, path_info);
6761
6762 assert!(!state.should_send_observation(0, now));
6764
6765 assert!(!state.should_send_observation(1, now));
6767 }
6768
6769 #[test]
6770 fn address_discovery_state_rate_limiting() {
6771 let config = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
6772 let now = Instant::now();
6773 let mut state = AddressDiscoveryState::new(&config, now);
6774
6775 state.observe_all_paths = true;
6777
6778 assert!(state.should_send_observation(0, now));
6780
6781 state.rate_limiter.try_consume(9.0, now); assert!(!state.should_send_observation(0, now));
6786
6787 let later = now + Duration::from_secs(1);
6789 state.rate_limiter.update_tokens(later);
6790 assert!(state.should_send_observation(0, later));
6791 }
6792
/// Each handled observation is appended to `received_history` with its address and path id.
#[test]
fn address_discovery_state_handle_observed_address() {
    let cfg = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
    let start = Instant::now();
    let mut discovery = AddressDiscoveryState::new(&cfg, start);

    let v4_addr = SocketAddr::from((Ipv4Addr::new(10, 0, 0, 1), 443));
    let v6_addr = SocketAddr::from((Ipv6Addr::new(0x2001, 0xdb8, 0, 0, 0, 0, 0, 1), 8080));

    // First observation: IPv4 address on path 0.
    discovery.handle_observed_address(v4_addr, 0, start);
    assert_eq!(discovery.received_history.len(), 1);
    let first = &discovery.received_history[0];
    assert_eq!(first.address, v4_addr);
    assert_eq!(first.path_id, 0);

    // Second observation: IPv6 address on path 1, 100 ms later.
    let shortly_after = start + Duration::from_millis(100);
    discovery.handle_observed_address(v6_addr, 1, shortly_after);
    assert_eq!(discovery.received_history.len(), 2);
    let second = &discovery.received_history[1];
    assert_eq!(second.address, v6_addr);
    assert_eq!(second.path_id, 1);
}
6818
/// `get_observed_address` returns the address stored in `received_observations`, or `None`.
#[test]
fn address_discovery_state_get_observed_address() {
    let cfg = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
    let start = Instant::now();
    let mut discovery = AddressDiscoveryState::new(&cfg, start);

    // Nothing stored yet.
    assert!(discovery.get_observed_address(0).is_none());

    // Store a loopback observation for path 0.
    let loopback = SocketAddr::from((Ipv4Addr::LOCALHOST, 80));
    let mut info = paths::PathAddressInfo::new();
    info.update_observed_address(loopback, start);
    discovery.received_observations.insert(0, info);

    assert_eq!(discovery.get_observed_address(0), Some(loopback));
    // A path without an entry still yields nothing.
    assert!(discovery.get_observed_address(1).is_none());
}
6838
/// `has_unnotified_changes` flips to `true` on a pending sent observation and back once it is recorded as sent.
#[test]
fn address_discovery_state_unnotified_changes() {
    let cfg = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
    let start = Instant::now();
    let mut discovery = AddressDiscoveryState::new(&cfg, start);

    assert!(!discovery.has_unnotified_changes());

    // Add an observation for path 0 that has not been marked as notified.
    let observed = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)), 8080);
    let mut pending = paths::PathAddressInfo::new();
    pending.update_observed_address(observed, start);
    discovery.sent_observations.insert(0, pending);

    assert!(discovery.has_unnotified_changes());

    // Recording the send is expected to clear the pending state.
    discovery.record_observation_sent(0);
    assert!(!discovery.has_unnotified_changes());
}
6863
/// Exercises the limiter's token accounting: initial fill, consumption, refusal and refill.
#[test]
fn address_observation_rate_limiter_token_bucket() {
    let start = Instant::now();

    // A limiter built with 5 starts full, with rate and capacity both 5.
    let mut bucket = AddressObservationRateLimiter::new(5, start);
    assert_eq!(bucket.tokens, 5.0);
    assert_eq!(bucket.max_tokens, 5.0);
    assert_eq!(bucket.rate, 5.0);

    // Taking 3 succeeds and leaves 2.
    let took_three = bucket.try_consume(3.0, start);
    assert!(took_three);
    assert_eq!(bucket.tokens, 2.0);

    // Taking another 3 is refused and leaves the balance untouched.
    let took_three_again = bucket.try_consume(3.0, start);
    assert!(!took_three_again);
    assert_eq!(bucket.tokens, 2.0);

    // One second later the bucket is expected to be full again.
    bucket.update_tokens(start + Duration::from_secs(1));
    assert_eq!(bucket.tokens, 5.0);

    // A second limiter: 2 left after consuming 3, then 4.5 after half a second.
    let mut second_bucket = AddressObservationRateLimiter::new(5, start);
    let _ = second_bucket.try_consume(3.0, start);
    second_bucket.update_tokens(start + Duration::from_millis(500));
    assert_eq!(second_bucket.tokens, 4.5);
}
6894
/// The default `AddressDiscoveryConfig` yields an enabled state with rate 10 and single-path observation.
#[test]
fn connection_initializes_address_discovery_state_default() {
    let default_cfg = crate::transport_parameters::AddressDiscoveryConfig::default();
    let discovery = AddressDiscoveryState::new(&default_cfg, Instant::now());

    assert!(discovery.enabled);
    assert_eq!(discovery.max_observation_rate, 10);
    assert!(!discovery.observe_all_paths);
}
6906
/// `SendAndReceive` yields an enabled state with rate 10 and single-path observation.
#[test]
fn connection_initializes_with_address_discovery_enabled() {
    let cfg = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
    let discovery = AddressDiscoveryState::new(&cfg, Instant::now());

    assert!(discovery.enabled);
    assert_eq!(discovery.max_observation_rate, 10);
    assert!(!discovery.observe_all_paths);
}
6916
/// A state built from the default configuration reports `enabled`.
#[test]
fn connection_address_discovery_enabled_by_default() {
    let default_cfg = crate::transport_parameters::AddressDiscoveryConfig::default();
    let discovery = AddressDiscoveryState::new(&default_cfg, Instant::now());

    assert!(discovery.enabled);
}
6924
/// `negotiate_max_idle_timeout` must give the same answer regardless of argument order.
#[test]
fn negotiate_max_idle_timeout_commutative() {
    let ms = Duration::from_millis;

    // (one side, other side, expected negotiated timeout)
    let cases = [
        (None, None, None),
        (None, Some(VarInt(0)), None),
        (None, Some(VarInt(2)), Some(ms(2))),
        (Some(VarInt(0)), Some(VarInt(0)), None),
        (Some(VarInt(2)), Some(VarInt(0)), Some(ms(2))),
        (Some(VarInt(1)), Some(VarInt(4)), Some(ms(1))),
    ];

    for (lhs, rhs, expected) in cases {
        // Check both argument orders against the same expectation.
        assert_eq!(negotiate_max_idle_timeout(lhs, rhs), expected);
        assert_eq!(negotiate_max_idle_timeout(rhs, lhs), expected);
    }
}
6949
/// A freshly created path has empty address info and a full observation limiter at rate 10.
#[test]
fn path_creation_initializes_address_discovery() {
    let transport = TransportConfig::default();
    let peer = SocketAddr::from((Ipv4Addr::new(192, 168, 1, 1), 8080));
    let created_at = Instant::now();

    let fresh = paths::PathData::new(peer, false, None, created_at, &transport);

    // Address bookkeeping starts out blank.
    let info = &fresh.address_info;
    assert!(info.observed_address.is_none());
    assert!(info.last_observed.is_none());
    assert_eq!(info.observation_count, 0);
    assert!(!info.notified);

    // The per-path limiter starts full.
    let limiter = &fresh.observation_rate_limiter;
    assert_eq!(limiter.rate, 10.0);
    assert_eq!(limiter.max_tokens, 10.0);
    assert_eq!(limiter.tokens, 10.0);
}
6970
/// `PathData::from_previous` starts with blank address info but carries over the configured rate.
#[test]
fn path_migration_resets_address_discovery() {
    let transport = TransportConfig::default();
    let old_peer = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)), 8080);
    let new_peer = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(10, 0, 0, 1)), 443);
    let start = Instant::now();

    // Give the old path some discovery state and a non-default rate.
    let mut old_path = paths::PathData::new(old_peer, false, None, start, &transport);
    old_path.update_observed_address(old_peer, start);
    old_path.mark_address_notified();
    old_path.consume_observation_token(start);
    old_path.set_observation_rate(20);

    let migrated = paths::PathData::from_previous(new_peer, &old_path, start);

    // Observed-address bookkeeping is expected to be reset.
    let info = &migrated.address_info;
    assert!(info.observed_address.is_none());
    assert!(info.last_observed.is_none());
    assert_eq!(info.observation_count, 0);
    assert!(!info.notified);

    // The rate is inherited and the bucket is full.
    let limiter = &migrated.observation_rate_limiter;
    assert_eq!(limiter.rate, 20.0);
    assert_eq!(limiter.tokens, 20.0);
}
6998
/// `set_observation_rate` updates rate and capacity, and caps an over-full token balance.
#[test]
fn connection_path_updates_observation_rate() {
    let transport = TransportConfig::default();
    let peer = SocketAddr::from((Ipv4Addr::new(127, 0, 0, 1), 42));
    let start = Instant::now();

    let mut path = paths::PathData::new(peer, false, None, start, &transport);
    assert_eq!(path.observation_rate_limiter.rate, 10.0);

    // Raising the rate changes both the rate and the bucket capacity.
    path.set_observation_rate(25);
    assert_eq!(path.observation_rate_limiter.rate, 25.0);
    assert_eq!(path.observation_rate_limiter.max_tokens, 25.0);

    // Force more tokens than the next rate allows, then lower the rate.
    path.observation_rate_limiter.tokens = 30.0;
    path.set_observation_rate(20);
    assert_eq!(path.observation_rate_limiter.tokens, 20.0);
}
7020
/// Marking a path as validated leaves its observed address and observation rate untouched.
#[test]
fn path_validation_preserves_discovery_state() {
    let transport = TransportConfig::default();
    let peer = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)), 8080);
    let start = Instant::now();
    let mut path = paths::PathData::new(peer, false, None, start, &transport);

    // Populate discovery state before validation.
    let seen_as = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(1, 2, 3, 4)), 5678);
    path.update_observed_address(seen_as, start);
    path.set_observation_rate(15);

    path.validated = true;

    assert_eq!(path.address_info.observed_address, Some(seen_as));
    assert_eq!(path.observation_rate_limiter.rate, 15.0);
}
7041
/// `new_with_params` stores the given flags and rate and starts with empty collections.
#[test]
fn address_discovery_state_initialization() {
    let discovery = AddressDiscoveryState::new_with_params(true, 30.0, true);

    // Parameters are reflected in the state.
    assert!(discovery.enabled);
    assert_eq!(discovery.max_observation_rate, 30);
    assert!(discovery.observe_all_paths);

    // No observations have been tracked yet.
    assert!(discovery.sent_observations.is_empty());
    assert!(discovery.received_observations.is_empty());
    assert!(discovery.received_history.is_empty());
}
7054
/// A single handled observation appears in both the history and the per-path map.
#[test]
fn handle_observed_address_frame_basic() {
    let cfg = AddressDiscoveryConfig::SendAndReceive;
    let mut discovery = AddressDiscoveryState::new(&cfg, Instant::now());
    let observed = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)), 8080);
    let received_at = Instant::now();
    let path = 0;

    discovery.handle_observed_address(observed, path, received_at);

    // History holds exactly one matching entry.
    assert_eq!(discovery.received_history.len(), 1);
    let entry = &discovery.received_history[0];
    assert_eq!(entry.address, observed);
    assert_eq!(entry.path_id, path);
    assert_eq!(entry.received_at, received_at);

    // The per-path record mirrors the observation.
    assert!(discovery.received_observations.contains_key(&path));
    let record = &discovery.received_observations[&path];
    assert_eq!(record.observed_address, Some(observed));
    assert_eq!(record.last_observed, Some(received_at));
    assert_eq!(record.observation_count, 1);
}
7080
/// Three observations on one path: the history keeps all of them, the path record keeps the latest.
#[test]
fn handle_observed_address_frame_multiple_observations() {
    let cfg = AddressDiscoveryConfig::SendAndReceive;
    let mut discovery = AddressDiscoveryState::new(&cfg, Instant::now());
    let first_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)), 8080);
    let second_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(10, 0, 0, 1)), 443);
    let start = Instant::now();
    let path = 0;

    // Same address twice, then a different one, at t, t+1s and t+2s.
    discovery.handle_observed_address(first_addr, path, start);
    for (addr, delay_secs) in [(first_addr, 1), (second_addr, 2)] {
        discovery.handle_observed_address(addr, path, start + Duration::from_secs(delay_secs));
    }

    assert_eq!(discovery.received_history.len(), 3);

    // The test expects the count to read 1 after the address changed.
    let record = &discovery.received_observations[&path];
    assert_eq!(record.observed_address, Some(second_addr));
    assert_eq!(record.observation_count, 1);
}
7103
/// A disabled state ignores incoming observations entirely.
#[test]
fn handle_observed_address_frame_disabled() {
    let cfg = AddressDiscoveryConfig::SendAndReceive;
    let mut discovery = AddressDiscoveryState::new(&cfg, Instant::now());
    discovery.enabled = false;

    let observed = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)), 8080);
    let received_at = Instant::now();
    discovery.handle_observed_address(observed, 0, received_at);

    // Nothing may have been recorded anywhere.
    assert!(discovery.received_history.is_empty());
    assert!(discovery.sent_observations.is_empty());
    assert!(discovery.received_observations.is_empty());
}
7120
/// With rate 10, path 0 may send both before and after one recorded send.
#[test]
fn should_send_observation_basic() {
    let cfg = AddressDiscoveryConfig::SendAndReceive;
    let mut discovery = AddressDiscoveryState::new(&cfg, Instant::now());
    discovery.max_observation_rate = 10;
    let at = Instant::now();
    let primary = 0;

    let first_attempt = discovery.should_send_observation(primary, at);
    assert!(first_attempt);

    discovery.record_observation_sent(primary);

    let second_attempt = discovery.should_send_observation(primary, at);
    assert!(second_attempt);
}
7138
/// With a limit of 2, two sends pass, the third is refused, and one second later sending resumes.
#[test]
fn should_send_observation_rate_limiting() {
    let cfg = AddressDiscoveryConfig::SendAndReceive;
    let start = Instant::now();
    let mut discovery = AddressDiscoveryState::new(&cfg, start);
    discovery.max_observation_rate = 2;
    discovery.update_rate_limit(2.0);
    let primary = 0;

    // Two permitted sends.
    for _ in 0..2 {
        assert!(discovery.should_send_observation(primary, start));
        discovery.record_observation_sent(primary);
    }

    // The third attempt at the same instant is refused.
    assert!(!discovery.should_send_observation(primary, start));

    let one_second_on = start + Duration::from_secs(1);
    assert!(discovery.should_send_observation(primary, one_second_on));
}
7161
/// A disabled state never asks to send an observation.
#[test]
fn should_send_observation_disabled() {
    let cfg = AddressDiscoveryConfig::SendAndReceive;
    let mut discovery = AddressDiscoveryState::new(&cfg, Instant::now());
    discovery.enabled = false;

    let wants_observation = discovery.should_send_observation(0, Instant::now());
    assert!(!wants_observation);
}
7171
/// With all paths observed and a limit of 2, one send each on paths 0 and 1 exhausts the budget.
#[test]
fn should_send_observation_per_path() {
    let cfg = AddressDiscoveryConfig::SendAndReceive;
    let start = Instant::now();
    let mut discovery = AddressDiscoveryState::new(&cfg, start);
    discovery.max_observation_rate = 2;
    discovery.observe_all_paths = true;
    discovery.update_rate_limit(2.0);

    // One send on each of the two paths.
    for path_id in [0, 1] {
        assert!(discovery.should_send_observation(path_id, start));
        discovery.record_observation_sent(path_id);
    }

    // Both paths are now refused.
    for path_id in [0, 1] {
        assert!(!discovery.should_send_observation(path_id, start));
    }

    // A second later path 0 may send again.
    let one_second_on = start + Duration::from_secs(1);
    assert!(discovery.should_send_observation(0, one_second_on));
}
7197
/// A received observation counts as an unnotified change until its record is flagged `notified`.
#[test]
fn has_unnotified_changes_test() {
    let cfg = AddressDiscoveryConfig::SendAndReceive;
    let mut discovery = AddressDiscoveryState::new(&cfg, Instant::now());
    let observed = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)), 8080);
    let received_at = Instant::now();

    assert!(!discovery.has_unnotified_changes());

    discovery.handle_observed_address(observed, 0, received_at);
    assert!(discovery.has_unnotified_changes());

    // Flag the stored record as notified by hand.
    let record = discovery.received_observations.get_mut(&0).unwrap();
    record.notified = true;
    assert!(!discovery.has_unnotified_changes());
}
7216
/// `get_observed_address` returns the handled address for its path and `None` for other paths.
#[test]
fn get_observed_address_test() {
    let cfg = AddressDiscoveryConfig::SendAndReceive;
    let mut discovery = AddressDiscoveryState::new(&cfg, Instant::now());
    let observed = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)), 8080);
    let received_at = Instant::now();
    let path = 0;

    // Before any observation there is nothing to report.
    assert!(discovery.get_observed_address(path).is_none());

    discovery.handle_observed_address(observed, path, received_at);
    assert_eq!(discovery.get_observed_address(path), Some(observed));

    // A path that never received an observation.
    assert!(discovery.get_observed_address(999).is_none());
}
7235
/// A limiter built with 10 allows two draws of 5 and then refuses a draw of 1.
#[test]
fn rate_limiter_token_bucket_basic() {
    let start = Instant::now();
    let mut bucket = AddressObservationRateLimiter::new(10, start);

    for _ in 0..2 {
        assert!(bucket.try_consume(5.0, start));
    }

    let extra = bucket.try_consume(1.0, start);
    assert!(!extra);
}
7249
/// After draining, the test expects 10 tokens back after 1 s and 5 tokens after a further 500 ms.
#[test]
fn rate_limiter_token_replenishment() {
    let start = Instant::now();
    let mut bucket = AddressObservationRateLimiter::new(10, start);

    // Drain the bucket completely.
    assert!(bucket.try_consume(10.0, start));
    assert!(!bucket.try_consume(0.1, start));

    // One second later a full 10 can be taken, and nothing more.
    let after_one_sec = start + Duration::from_secs(1);
    assert!(bucket.try_consume(10.0, after_one_sec));
    assert!(!bucket.try_consume(0.1, after_one_sec));

    // Half a second after that, exactly 5 can be taken.
    let after_half_more = after_one_sec + Duration::from_millis(500);
    assert!(bucket.try_consume(5.0, after_half_more));
    assert!(!bucket.try_consume(0.1, after_half_more));
}
7269
/// Idle time must not let the balance grow beyond the limiter's capacity of 10.
#[test]
fn rate_limiter_max_tokens_cap() {
    let start = Instant::now();
    let mut bucket = AddressObservationRateLimiter::new(10, start);

    // Two idle seconds still only allow 10 to be drawn.
    let t_plus_2 = start + Duration::from_secs(2);
    assert!(bucket.try_consume(10.0, t_plus_2));
    assert!(!bucket.try_consume(10.1, t_plus_2));

    // One more second: a partial draw of 3 succeeds.
    let t_plus_3 = t_plus_2 + Duration::from_secs(1);
    assert!(bucket.try_consume(3.0, t_plus_3));

    // Two further seconds: the balance is capped at 10 again.
    let t_plus_5 = t_plus_3 + Duration::from_secs(2);
    assert!(bucket.try_consume(10.0, t_plus_5));
    assert!(!bucket.try_consume(0.1, t_plus_5));
}
7290
/// Fractional draws adding up to 10 are accepted; 100 ms later exactly 1 token is expected back.
#[test]
fn rate_limiter_fractional_consumption() {
    let start = Instant::now();
    let mut bucket = AddressObservationRateLimiter::new(10, start);

    // 0.5 + 2.3 + 7.2 uses up the whole initial balance.
    for amount in [0.5, 2.3, 7.2] {
        assert!(bucket.try_consume(amount, start));
    }
    assert!(!bucket.try_consume(0.1, start));

    let shortly_after = start + Duration::from_millis(100);
    assert!(bucket.try_consume(1.0, shortly_after));
    assert!(!bucket.try_consume(0.1, shortly_after));
}
7307
/// A limiter built with rate 0 never grants any tokens, even after ten seconds.
#[test]
fn rate_limiter_zero_rate() {
    let start = Instant::now();
    let mut bucket = AddressObservationRateLimiter::new(0, start);

    // Every request size is refused immediately.
    for amount in [1.0, 0.1, 0.001] {
        assert!(!bucket.try_consume(amount, start));
    }

    // Waiting does not help.
    let ten_seconds_on = start + Duration::from_secs(10);
    assert!(!bucket.try_consume(0.001, ten_seconds_on));
}
7322
/// A limiter built with 63 grants 63 tokens up front and 63 more one second later.
#[test]
fn rate_limiter_high_rate() {
    let start = Instant::now();
    let mut bucket = AddressObservationRateLimiter::new(63, start);

    // 60 + 3 empties the initial balance.
    assert!(bucket.try_consume(60.0, start));
    assert!(bucket.try_consume(3.0, start));
    assert!(!bucket.try_consume(0.1, start));

    // A second later the full 63 is available once more.
    let one_second_on = start + Duration::from_secs(1);
    assert!(bucket.try_consume(63.0, one_second_on));
    assert!(!bucket.try_consume(0.1, one_second_on));
}
7338
/// Checks refill amounts over millisecond-scale intervals for a limiter built with rate 100.
#[test]
fn rate_limiter_time_precision() {
    let start = Instant::now();
    let mut bucket = AddressObservationRateLimiter::new(100, start);

    // Drain everything.
    assert!(bucket.try_consume(100.0, start));
    assert!(!bucket.try_consume(0.1, start));

    // 10 ms later: 0.8 can be drawn, but not a further 0.5.
    let after_10ms = start + Duration::from_millis(10);
    assert!(bucket.try_consume(0.8, after_10ms));
    assert!(!bucket.try_consume(0.5, after_10ms));

    // 100 ms after that: 5 can be drawn.
    let after_110ms = after_10ms + Duration::from_millis(100);
    assert!(bucket.try_consume(5.0, after_110ms));

    // Zero the balance by hand, advance 1 ms and expect roughly 0.1 tokens.
    bucket.tokens = 0.0;
    let after_111ms = after_110ms + Duration::from_millis(1);
    bucket.update_tokens(after_111ms);
    assert!((0.09..=0.11).contains(&bucket.tokens));
}
7368
/// Three paths draw on a limit of 5: sends on paths 0 and 1 use it up, path 2 is refused, and all recover after 1 s.
#[test]
fn per_path_rate_limiting_independent() {
    let cfg = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
    let start = Instant::now();
    let mut discovery = AddressDiscoveryState::new(&cfg, start);

    discovery.observe_all_paths = true;
    discovery.update_rate_limit(5.0);

    // Register paths 0..=2, each with its own observed address.
    for (path_id, last_octet, port) in [(0, 1, 8080), (1, 2, 8081), (2, 3, 8082)] {
        let mut info = paths::PathAddressInfo::new();
        info.observed_address = Some(SocketAddr::new(
            IpAddr::V4(Ipv4Addr::new(192, 168, 1, last_octet)),
            port,
        ));
        discovery.sent_observations.insert(path_id, info);
    }

    // Three sends on path 0; `notified` is cleared each time so the path stays eligible.
    for _ in 0..3 {
        assert!(discovery.should_send_observation(0, start));
        discovery.record_observation_sent(0);
        discovery.sent_observations.get_mut(&0).unwrap().notified = false;
    }

    // Two sends on path 1 use up the rest.
    for _ in 0..2 {
        assert!(discovery.should_send_observation(1, start));
        discovery.record_observation_sent(1);
        discovery.sent_observations.get_mut(&1).unwrap().notified = false;
    }

    // Path 2 is refused.
    assert!(!discovery.should_send_observation(2, start));

    // A second later every path is expected to be eligible again.
    let one_second_on = start + Duration::from_secs(1);
    for path_id in [0, 1, 2] {
        assert!(discovery.should_send_observation(path_id, one_second_on));
    }
}
7445
/// Two paths with limiters of 10 and 5 each allow exactly that many sends.
#[test]
fn per_path_rate_limiting_with_path_specific_limits() {
    let start = Instant::now();
    let first_peer = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)), 8080);
    let second_peer = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 2)), 8081);
    let transport = TransportConfig::default();

    let mut fast_path = paths::PathData::new(first_peer, false, None, start, &transport);
    let mut slow_path = paths::PathData::new(second_peer, false, None, start, &transport);

    // Install limiters with different budgets.
    fast_path.observation_rate_limiter = paths::PathObservationRateLimiter::new(10, start);
    slow_path.observation_rate_limiter = paths::PathObservationRateLimiter::new(5, start);

    // The first path permits ten sends, then refuses.
    let fast_limiter = &mut fast_path.observation_rate_limiter;
    for _ in 0..10 {
        assert!(fast_limiter.can_send(start));
        fast_limiter.consume_token(start);
    }
    assert!(!fast_limiter.can_send(start));

    // The second path permits five sends, then refuses.
    let slow_limiter = &mut slow_path.observation_rate_limiter;
    for _ in 0..5 {
        assert!(slow_limiter.can_send(start));
        slow_limiter.consume_token(start);
    }
    assert!(!slow_limiter.can_send(start));
}
7475
/// After a frame is queued the path is refused until its record shows an un-notified address again.
#[test]
fn per_path_rate_limiting_address_change_detection() {
    let cfg = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
    let start = Instant::now();
    let mut discovery = AddressDiscoveryState::new(&cfg, start);

    let path = 0;
    let original_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)), 8080);
    let changed_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 2)), 8080);

    assert!(discovery.should_send_observation(path, start));

    // Queueing the first address succeeds.
    let queued = discovery.queue_observed_address_frame(path, original_addr);
    assert!(queued.is_some());

    // The path is now refused.
    assert!(!discovery.should_send_observation(path, start));

    // Simulate an address change by editing the stored record directly.
    if let Some(record) = discovery.sent_observations.get_mut(&path) {
        record.notified = false;
        record.observed_address = Some(changed_addr);
    }

    assert!(discovery.should_send_observation(path, start));
}
7506
/// A second path given a fresh limiter has a full budget regardless of what the first path consumed.
#[test]
fn per_path_rate_limiting_migration() {
    let start = Instant::now();
    let old_peer = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)), 8080);
    let new_peer = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 2)), 8081);
    let transport = TransportConfig::default();

    // Use half of the first path's budget of 10.
    let mut old_path = paths::PathData::new(old_peer, false, None, start, &transport);
    old_path.observation_rate_limiter = paths::PathObservationRateLimiter::new(10, start);
    for _ in 0..5 {
        assert!(old_path.observation_rate_limiter.can_send(start));
        old_path.observation_rate_limiter.consume_token(start);
    }

    // The second path is built independently and gets its own limiter of 10.
    let mut migrated = paths::PathData::new(new_peer, false, None, start, &transport);
    migrated.observation_rate_limiter = paths::PathObservationRateLimiter::new(10, start);

    // All ten sends are available on the second path, then it refuses.
    let limiter = &mut migrated.observation_rate_limiter;
    for _ in 0..10 {
        assert!(limiter.can_send(start));
        limiter.consume_token(start);
    }
    assert!(!limiter.can_send(start));
}
7538
/// With `observe_all_paths` left untouched, only path 0 is expected to be eligible.
#[test]
fn per_path_rate_limiting_disabled_paths() {
    let cfg = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
    let start = Instant::now();
    let mut discovery = AddressDiscoveryState::new(&cfg, start);

    assert!(discovery.should_send_observation(0, start));

    // Paths 1 and 2 are refused.
    for path_id in [1, 2] {
        assert!(!discovery.should_send_observation(path_id, start));
    }

    // Waiting a second does not change that for path 1.
    let one_second_on = start + Duration::from_secs(1);
    assert!(!discovery.should_send_observation(1, one_second_on));
}
7556
/// With a limiter of 10, ten checks on path 0 pass and the eleventh is refused.
#[test]
fn respecting_negotiated_max_observation_rate_basic() {
    let cfg = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
    let start = Instant::now();
    let mut discovery = AddressDiscoveryState::new(&cfg, start);

    // Install the rate explicitly on both the field and the limiter.
    discovery.max_observation_rate = 10;
    discovery.rate_limiter = AddressObservationRateLimiter::new(10, start);

    for _ in 0..10 {
        let allowed = discovery.should_send_observation(0, start);
        assert!(allowed);
    }

    let eleventh = discovery.should_send_observation(0, start);
    assert!(!eleventh);
}
7574
/// A rate of zero refuses every path, both immediately and ten seconds later.
#[test]
fn respecting_negotiated_max_observation_rate_zero() {
    let cfg = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
    let start = Instant::now();
    let mut discovery = AddressDiscoveryState::new(&cfg, start);

    discovery.max_observation_rate = 0;
    discovery.rate_limiter = AddressObservationRateLimiter::new(0, start);

    for path_id in [0, 1] {
        assert!(!discovery.should_send_observation(path_id, start));
    }

    let ten_seconds_on = start + Duration::from_secs(10);
    assert!(!discovery.should_send_observation(0, ten_seconds_on));
}
7593
/// The limiter's budget of 5 still applies after `max_observation_rate` alone is raised to 20.
#[test]
fn respecting_negotiated_max_observation_rate_higher() {
    let cfg = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
    let start = Instant::now();
    let mut discovery = AddressDiscoveryState::new(&cfg, start);

    // Register path 0 with an observed address.
    let mut info = paths::PathAddressInfo::new();
    info.observed_address = Some(SocketAddr::new(
        IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)),
        8080,
    ));
    discovery.sent_observations.insert(0, info);

    discovery.update_rate_limit(5.0);

    // Only the field is changed here; the limiter is left as configured above.
    discovery.max_observation_rate = 20;

    // Five sends pass; `notified` is cleared each time so the path stays eligible.
    for _ in 0..5 {
        assert!(discovery.should_send_observation(0, start));
        discovery.record_observation_sent(0);
        discovery.sent_observations.get_mut(&0).unwrap().notified = false;
    }

    assert!(!discovery.should_send_observation(0, start));
}
7629
/// Lowering the rate to 3 mid-run: the test expects three more sends now and three per later second.
#[test]
fn respecting_negotiated_max_observation_rate_dynamic_update() {
    let cfg = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
    let start = Instant::now();
    let mut discovery = AddressDiscoveryState::new(&cfg, start);

    // Register path 0 with an observed address.
    let mut info = paths::PathAddressInfo::new();
    info.observed_address = Some(SocketAddr::new(
        IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)),
        8080,
    ));
    discovery.sent_observations.insert(0, info);

    // Five sends under the initial rate; `notified` is cleared after each.
    for _ in 0..5 {
        assert!(discovery.should_send_observation(0, start));
        discovery.record_observation_sent(0);
        discovery.sent_observations.get_mut(&0).unwrap().notified = false;
    }

    // Lower the rate on both the field and the limiter.
    discovery.max_observation_rate = 3;
    discovery.rate_limiter.set_rate(3);

    // Three more sends are accepted at the same instant, then the path is refused.
    for _ in 0..3 {
        assert!(discovery.should_send_observation(0, start));
        discovery.record_observation_sent(0);
        discovery.sent_observations.get_mut(&0).unwrap().notified = false;
    }
    assert!(!discovery.should_send_observation(0, start));

    // A second later exactly three sends are available again.
    let one_second_on = start + Duration::from_secs(1);
    for _ in 0..3 {
        assert!(discovery.should_send_observation(0, one_second_on));
        discovery.record_observation_sent(0);
        discovery.sent_observations.get_mut(&0).unwrap().notified = false;
    }
    assert!(!discovery.should_send_observation(0, one_second_on));
}
7687
/// Three paths draw on one budget: after nine round-robin sends one more passes, then all are refused.
#[test]
fn respecting_negotiated_max_observation_rate_with_paths() {
    let cfg = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
    let start = Instant::now();
    let mut discovery = AddressDiscoveryState::new(&cfg, start);

    discovery.observe_all_paths = true;

    // Register paths 0..3 with addresses 192.168.1.100 through .102.
    for i in 0..3 {
        let mut info = paths::PathAddressInfo::new();
        info.observed_address = Some(SocketAddr::new(
            IpAddr::V4(Ipv4Addr::new(192, 168, 1, 100 + i as u8)),
            5000,
        ));
        discovery.sent_observations.insert(i, info);
    }

    // Three rounds over the three paths, sending whenever permitted.
    for _round in 0..3 {
        for i in 0..3 {
            if !discovery.should_send_observation(i, start) {
                continue;
            }
            discovery.record_observation_sent(i);
            discovery.sent_observations.get_mut(&i).unwrap().notified = false;
        }
    }

    // One more send on path 0 is expected to pass.
    assert!(discovery.should_send_observation(0, start));
    discovery.record_observation_sent(0);

    // After that every path is refused.
    for path_id in [0, 1, 2] {
        assert!(!discovery.should_send_observation(path_id, start));
    }
}
7735
/// Queueing on path 0 yields a frame with the given address and marks the path record as notified.
#[test]
fn queue_observed_address_frame_basic() {
    let cfg = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
    let start = Instant::now();
    let mut discovery = AddressDiscoveryState::new(&cfg, start);

    let observed = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 100)), 5000);
    let queued = discovery.queue_observed_address_frame(0, observed);

    // A frame was produced and carries the address.
    assert!(queued.is_some());
    assert_eq!(queued.unwrap().address, observed);

    // The path now has a record flagged as notified.
    let record = discovery.sent_observations.get(&0);
    assert!(record.is_some());
    assert!(record.unwrap().notified);
}
7755
/// Queues frames on ten distinct paths and checks that an eleventh is refused.
///
/// `observe_all_paths` is switched on so that each path id is eligible. The test
/// expects exactly ten frames to be accepted before `queue_observed_address_frame`
/// starts returning `None`.
#[test]
fn queue_observed_address_frame_rate_limited() {
    let config = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
    let now = Instant::now();
    let mut state = AddressDiscoveryState::new(&config, now);

    state.observe_all_paths = true;

    // Iterating over `u8` lets the last octet, the port offset and the path id
    // be derived with lossless `From` conversions instead of `as` casts.
    for i in 0..10u8 {
        let addr = SocketAddr::new(
            IpAddr::V4(Ipv4Addr::new(192, 168, 1, i)),
            5000 + u16::from(i),
        );
        assert!(
            state
                .queue_observed_address_frame(u64::from(i), addr)
                .is_some(),
            "Frame {} should be allowed",
            i + 1
        );
    }

    // One more frame on a new path is expected to be refused.
    let addr11 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 11)), 5011);
    assert!(
        state.queue_observed_address_frame(10, addr11).is_none(),
        "11th frame should be rate limited"
    );
}
7787
/// A disabled state queues no observation frame.
#[test]
fn queue_observed_address_frame_disabled() {
    let cfg = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
    let start = Instant::now();
    let mut discovery = AddressDiscoveryState::new(&cfg, start);
    discovery.enabled = false;

    let observed = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 100)), 5000);
    let queued = discovery.queue_observed_address_frame(0, observed);

    assert!(queued.is_none());
}
7802
/// Once a frame was queued for path 0, further queue attempts return `None`, even for a new address.
#[test]
fn queue_observed_address_frame_already_notified() {
    let cfg = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
    let start = Instant::now();
    let mut discovery = AddressDiscoveryState::new(&cfg, start);

    let first_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 100)), 5000);

    // The first attempt queues a frame.
    let first = discovery.queue_observed_address_frame(0, first_addr);
    assert!(first.is_some());

    // Repeating the same address yields nothing.
    let repeat = discovery.queue_observed_address_frame(0, first_addr);
    assert!(repeat.is_none());

    // The test expects a different address on the same path to be refused too.
    let other_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 101)), 5001);
    let changed = discovery.queue_observed_address_frame(0, other_addr);
    assert!(changed.is_none());
}
7821
/// With `observe_all_paths` left untouched, only path 0 gets a frame queued.
#[test]
fn queue_observed_address_frame_primary_path_only() {
    let cfg = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
    let start = Instant::now();
    let mut discovery = AddressDiscoveryState::new(&cfg, start);

    let observed = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 100)), 5000);

    assert!(discovery.queue_observed_address_frame(0, observed).is_some());

    // Paths 1 and 2 are refused.
    for path_id in [1, 2] {
        assert!(discovery.queue_observed_address_frame(path_id, observed).is_none());
    }
}
7837
/// Queueing a frame updates the sent-side path record and leaves the received history empty.
#[test]
fn queue_observed_address_frame_updates_path_info() {
    let cfg = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
    let start = Instant::now();
    let mut discovery = AddressDiscoveryState::new(&cfg, start);

    let v6_addr = SocketAddr::from((Ipv6Addr::new(0x2001, 0xdb8, 0, 0, 0, 0, 0, 1), 5000));

    let queued = discovery.queue_observed_address_frame(0, v6_addr);
    assert!(queued.is_some());

    // The sent-side record holds the address and is flagged as notified.
    let record = discovery.sent_observations.get(&0).unwrap();
    assert_eq!(record.observed_address, Some(v6_addr));
    assert!(record.notified);

    // Queueing does not add anything to the received history.
    assert_eq!(discovery.received_history.len(), 0);
}
7862
/// `Retransmits::default()` has an empty `outbound_observations` list that accepts pushed frames.
#[test]
fn retransmits_includes_outbound_observations() {
    use crate::connection::spaces::Retransmits;

    let mut pending = Retransmits::default();
    assert!(pending.outbound_observations.is_empty());

    // Push a single observation frame.
    let observed = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 100)), 5000);
    pending.outbound_observations.push(frame::ObservedAddress {
        sequence_number: VarInt::from_u32(1),
        address: observed,
    });

    assert_eq!(pending.outbound_observations.len(), 1);
    assert_eq!(pending.outbound_observations[0].address, observed);
}
7885
/// When the peer-support flag is `false`, no frames are produced even with a pending observation.
#[test]
fn check_for_address_observations_no_peer_support() {
    let cfg = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
    let start = Instant::now();
    let mut discovery = AddressDiscoveryState::new(&cfg, start);

    // Register path 0 with an observed address.
    let mut info = paths::PathAddressInfo::new();
    info.observed_address = Some(SocketAddr::new(
        IpAddr::V4(Ipv4Addr::new(192, 168, 1, 100)),
        5000,
    ));
    discovery.sent_observations.insert(0, info);

    // Second argument `false`: the peer does not support the feature.
    let produced = discovery.check_for_address_observations(0, false, start);

    assert!(produced.is_empty());
}
7911
/// With the peer-support flag set, a stored address on path 0 is expected to
/// yield exactly one frame carrying that address and to mark the entry notified.
#[test]
fn check_for_address_observations_with_peer_support() {
    let now = Instant::now();
    let mut discovery = AddressDiscoveryState::new(
        &crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive,
        now,
    );

    let observed = SocketAddr::from(([192, 168, 1, 100], 5000));
    let mut seeded = paths::PathAddressInfo::new();
    seeded.observed_address = Some(observed);
    discovery.sent_observations.insert(0, seeded);

    let produced = discovery.check_for_address_observations(0, true, now);

    assert_eq!(produced.len(), 1);
    assert_eq!(produced[0].address, observed);

    assert!(discovery.sent_observations.get(&0).unwrap().notified);
}
7939
/// Repeatedly re-arming path 0 drains the rate limiter to zero tokens; a
/// further attempt at the same instant yields nothing, while an attempt 200 ms
/// later is expected to yield one frame again.
#[test]
fn check_for_address_observations_rate_limited() {
    let now = Instant::now();
    let mut discovery = AddressDiscoveryState::new(
        &crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive,
        now,
    );

    let observed = SocketAddr::from(([192, 168, 1, 100], 5000));
    let mut seeded = paths::PathAddressInfo::new();
    seeded.observed_address = Some(observed);
    discovery.sent_observations.insert(0, seeded);

    // Keep asking for frames (at most ten times) until none come back,
    // clearing the notified flag after every successful round.
    for _ in 0..10 {
        if discovery
            .check_for_address_observations(0, true, now)
            .is_empty()
        {
            break;
        }
        discovery.sent_observations.get_mut(&0).unwrap().notified = false;
    }

    assert_eq!(discovery.rate_limiter.tokens, 0.0);

    // Same instant, bucket empty: nothing may be produced.
    discovery.sent_observations.get_mut(&0).unwrap().notified = false;
    let while_empty = discovery.check_for_address_observations(0, true, now);
    assert_eq!(while_empty.len(), 0);

    // 200 ms later the test expects one frame to be available again.
    discovery.sent_observations.get_mut(&0).unwrap().notified = false;
    let after_wait = now + Duration::from_millis(200);
    let after_refill = discovery.check_for_address_observations(0, true, after_wait);
    assert_eq!(after_refill.len(), 1);
}
7985
/// With `observe_all_paths` enabled, one call is expected to produce a frame
/// for each of the two seeded paths and to mark both entries as notified.
#[test]
fn check_for_address_observations_multiple_paths() {
    let now = Instant::now();
    let mut discovery = AddressDiscoveryState::new(
        &crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive,
        now,
    );
    discovery.observe_all_paths = true;

    let first = SocketAddr::from(([192, 168, 1, 100], 5000));
    let second = SocketAddr::from(([192, 168, 1, 101], 5001));

    // Seed paths 0 and 1, in that order, with their respective addresses.
    for (path_id, observed) in &[(0, first), (1, second)] {
        let mut seeded = paths::PathAddressInfo::new();
        seeded.observed_address = Some(*observed);
        discovery.sent_observations.insert(*path_id, seeded);
    }

    let produced = discovery.check_for_address_observations(0, true, now);

    assert_eq!(produced.len(), 2);
    assert!(produced.iter().any(|f| f.address == first));
    assert!(produced.iter().any(|f| f.address == second));

    assert!(discovery.sent_observations.get(&0).unwrap().notified);
    assert!(discovery.sent_observations.get(&1).unwrap().notified);
}
8032
/// The rate passed to `new_with_params` is expected to appear as the
/// limiter's rate and capacity (and, for the first case, its initial tokens).
#[test]
fn test_rate_limiter_configuration() {
    let ten = AddressDiscoveryState::new_with_params(true, 10.0, false);
    {
        let limiter = &ten.rate_limiter;
        assert_eq!(limiter.rate, 10.0);
        assert_eq!(limiter.max_tokens, 10.0);
        assert_eq!(limiter.tokens, 10.0);
    }

    let sixty_three = AddressDiscoveryState::new_with_params(true, 63.0, false);
    {
        let limiter = &sixty_three.rate_limiter;
        assert_eq!(limiter.rate, 63.0);
        assert_eq!(limiter.max_tokens, 63.0);
    }
}
8046
/// `update_rate_limit` is expected to replace both rate and capacity, and to
/// bring an over-full bucket down to the new capacity.
#[test]
fn test_rate_limiter_update_configuration() {
    let mut discovery = AddressDiscoveryState::new_with_params(true, 5.0, false);
    assert_eq!(discovery.rate_limiter.rate, 5.0);

    // Raising the limit moves rate and capacity together.
    discovery.update_rate_limit(10.0);
    {
        let raised = &discovery.rate_limiter;
        assert_eq!(raised.rate, 10.0);
        assert_eq!(raised.max_tokens, 10.0);
    }

    // Overfill the bucket, then lower the limit below the token count.
    discovery.rate_limiter.tokens = 15.0;
    discovery.update_rate_limit(8.0);
    assert_eq!(discovery.rate_limiter.tokens, 8.0);
}
8064
/// Building the state from transport parameters that advertise
/// `SendAndReceive` is expected to succeed with a limiter rate of 10 and
/// `observe_all_paths` switched off.
#[test]
fn test_rate_limiter_from_transport_params() {
    // Struct-update syntax instead of default-then-reassign, as the other
    // tests in this module do.
    let params = TransportParameters {
        address_discovery: Some(AddressDiscoveryConfig::SendAndReceive),
        ..TransportParameters::default()
    };

    // The argument is a plain shared borrow of `params`.
    let state = AddressDiscoveryState::from_transport_params(&params);
    assert!(state.is_some());
    let state = state.unwrap();
    assert_eq!(state.rate_limiter.rate, 10.0);
    assert!(!state.observe_all_paths);
}
8076
/// A state built with a rate of zero is expected to start with zero tokens
/// and to refuse to queue an observed-address frame.
#[test]
fn test_rate_limiter_zero_rate() {
    let idle = AddressDiscoveryState::new_with_params(true, 0.0, false);
    assert_eq!(idle.rate_limiter.rate, 0.0);
    assert_eq!(idle.rate_limiter.tokens, 0.0);

    // A fresh zero-rate state must not hand out a frame.
    let target = "192.168.1.1:443".parse().unwrap();
    let mut blocked = AddressDiscoveryState::new_with_params(true, 0.0, false);
    assert!(blocked.queue_observed_address_frame(0, target).is_none());
}
8089
/// Checks the limiter rate for three requested rates: 63 and 100 are
/// expected unchanged, while 2.5 is expected to come out as 2.0.
#[test]
fn test_rate_limiter_configuration_edge_cases() {
    let at_63 = AddressDiscoveryState::new_with_params(true, 63.0, false);
    assert_eq!(at_63.rate_limiter.rate, 63.0);

    let at_100 = AddressDiscoveryState::new_with_params(true, 100.0, false);
    assert_eq!(at_100.rate_limiter.rate, 100.0);

    // A fractional request is expected to lose its fractional part.
    let fractional = AddressDiscoveryState::new_with_params(true, 2.5, false);
    assert_eq!(fractional.rate_limiter.rate, 2.0);
}
8106
/// Lowering the limit at runtime is expected to cap tokens, rate and
/// capacity at the new value, and a later token refresh must not exceed it.
#[test]
fn test_rate_limiter_runtime_update() {
    let mut discovery = AddressDiscoveryState::new_with_params(true, 10.0, false);
    let start = Instant::now();

    // Begin with a half-full bucket, then drop the limit below it.
    discovery.rate_limiter.tokens = 5.0;
    discovery.update_rate_limit(3.0);

    {
        let limiter = &discovery.rate_limiter;
        assert_eq!(limiter.tokens, 3.0);
        assert_eq!(limiter.rate, 3.0);
        assert_eq!(limiter.max_tokens, 3.0);
    }

    // One second later the bucket is still expected to hold exactly 3 tokens.
    discovery
        .rate_limiter
        .update_tokens(start + Duration::from_secs(1));
    assert_eq!(discovery.rate_limiter.tokens, 3.0);
}
8130
/// A state built from the default config is expected to be enabled, to have
/// a maximum observation rate of 10, and not to observe all paths.
#[test]
fn test_address_discovery_state_initialization_default() {
    let created_at = Instant::now();
    let defaults = crate::transport_parameters::AddressDiscoveryConfig::default();

    // Held in an `Option` and unwrapped before the field checks.
    let slot = Some(AddressDiscoveryState::new(&defaults, created_at));
    assert!(slot.is_some());

    let discovery = slot.unwrap();
    assert!(discovery.enabled);
    assert_eq!(discovery.max_observation_rate, 10);
    assert!(!discovery.observe_all_paths);
}
8150
/// Rebuilds the state from the peer's advertised config and checks that the
/// result is enabled, has rate 10, and observes the primary path only.
#[test]
fn test_address_discovery_state_initialization_on_handshake() {
    let now = Instant::now();

    let local_default = crate::transport_parameters::AddressDiscoveryConfig::default();
    let mut slot = Some(AddressDiscoveryState::new(&local_default, now));

    let peer_params = TransportParameters {
        address_discovery: Some(AddressDiscoveryConfig::SendAndReceive),
        ..TransportParameters::default()
    };

    // Replace the local state when the peer advertised a config.
    if let Some(peer_config) = peer_params.address_discovery.as_ref() {
        slot = Some(AddressDiscoveryState::new(peer_config, now));
    }

    assert!(slot.is_some());
    let discovery = slot.unwrap();
    assert!(discovery.enabled);
    assert_eq!(discovery.max_observation_rate, 10);
    assert!(!discovery.observe_all_paths);
}
8182
/// When the peer's transport parameters carry no address-discovery config,
/// the test disables the local state and checks that it reads as disabled.
#[test]
fn test_address_discovery_negotiation_disabled_peer() {
    let now = Instant::now();

    let local_config = AddressDiscoveryConfig::SendAndReceive;
    let mut slot = Some(AddressDiscoveryState::new(&local_config, now));

    let peer_params = TransportParameters {
        address_discovery: None,
        ..TransportParameters::default()
    };

    // Peer did not advertise support and we hold a state: switch it off.
    if let (None, Some(discovery)) = (&peer_params.address_discovery, slot.as_mut()) {
        discovery.enabled = false;
    }

    let discovery = slot.unwrap();
    assert!(!discovery.enabled);
}
8209
/// Sets a local rate of 30, takes the minimum against a peer rate of 15, and
/// checks that the limiter ends up at 15.
#[test]
fn test_address_discovery_negotiation_rate_limiting() {
    let now = Instant::now();

    let local_config = AddressDiscoveryConfig::SendAndReceive;
    let mut slot = Some(AddressDiscoveryState::new(&local_config, now));

    // Local preference: 30 observations per second.
    if let Some(discovery) = slot.as_mut() {
        discovery.max_observation_rate = 30;
        discovery.update_rate_limit(30.0);
    }

    let peer_params = TransportParameters {
        address_discovery: Some(AddressDiscoveryConfig::SendAndReceive),
        ..TransportParameters::default()
    };

    // Apply the lower of the two rates when both sides have a config.
    if let Some(discovery) = slot.as_mut() {
        if peer_params.address_discovery.is_some() {
            let peer_rate = 15u8;
            let agreed = discovery.max_observation_rate.min(peer_rate);
            discovery.update_rate_limit(f64::from(agreed));
        }
    }

    let discovery = slot.unwrap();
    assert_eq!(discovery.rate_limiter.rate, 15.0);
}
8247
/// A fresh state has no sent or received observations, and an observation
/// for path 0 is expected to be allowed.
#[test]
fn test_address_discovery_path_initialization() {
    let now = Instant::now();
    let mut discovery =
        AddressDiscoveryState::new(&AddressDiscoveryConfig::SendAndReceive, now);

    assert!(discovery.sent_observations.is_empty());
    assert!(discovery.received_observations.is_empty());

    assert!(discovery.should_send_observation(0, now));
}
8266
/// By default only path 0 is expected to be eligible for observations; after
/// setting `observe_all_paths`, paths 1 and 2 are expected to be eligible too.
#[test]
fn test_address_discovery_multiple_path_initialization() {
    let now = Instant::now();
    let mut discovery =
        AddressDiscoveryState::new(&AddressDiscoveryConfig::SendAndReceive, now);

    // Default: primary path only.
    assert!(discovery.should_send_observation(0, now));
    for secondary in 1..=2 {
        assert!(!discovery.should_send_observation(secondary, now));
    }

    // Opting in to all paths unlocks the secondary ones.
    discovery.observe_all_paths = true;
    for secondary in 1..=2 {
        assert!(discovery.should_send_observation(secondary, now));
    }

    // A second, untouched state behaves like the default again.
    let primary_only_config = AddressDiscoveryConfig::SendAndReceive;
    let mut primary_only = AddressDiscoveryState::new(&primary_only_config, now);

    assert!(primary_only.should_send_observation(0, now));
    assert!(!primary_only.should_send_observation(1, now));
}
8291
/// Handling one observed address on path 0 is expected to append a history
/// entry and populate the per-path record (address, timestamp, count of 1).
#[test]
fn test_handle_observed_address_frame_valid() {
    let now = Instant::now();
    let mut discovery =
        AddressDiscoveryState::new(&AddressDiscoveryConfig::SendAndReceive, now);

    let reported = SocketAddr::from(([192, 168, 1, 100], 5000));
    discovery.handle_observed_address(reported, 0, now);

    // Exactly one history entry describing what was just received.
    assert_eq!(discovery.received_history.len(), 1);
    let entry = &discovery.received_history[0];
    assert_eq!(entry.address, reported);
    assert_eq!(entry.path_id, 0);
    assert_eq!(entry.received_at, now);

    // The per-path record mirrors the same observation.
    let record = discovery.received_observations.get(&0).unwrap();
    assert_eq!(record.observed_address, Some(reported));
    assert_eq!(record.last_observed, Some(now));
    assert_eq!(record.observation_count, 1);
}
8315
/// Three observations across two paths are expected to produce three history
/// entries, with each path's record holding its most recent address.
#[test]
fn test_handle_multiple_received_history() {
    let now = Instant::now();
    let mut discovery =
        AddressDiscoveryState::new(&AddressDiscoveryConfig::SendAndReceive, now);

    let path0_first = SocketAddr::from(([192, 168, 1, 100], 5000));
    let path1_only = SocketAddr::from(([10, 0, 0, 50], 6000));
    let path0_second = SocketAddr::from(([192, 168, 1, 100], 7000));

    discovery.handle_observed_address(path0_first, 0, now);
    discovery.handle_observed_address(path1_only, 1, now);
    let slightly_later = now + Duration::from_millis(100);
    discovery.handle_observed_address(path0_second, 0, slightly_later);

    assert_eq!(discovery.received_history.len(), 3);

    // Path 0 reports the later address; the test expects its count to be 1.
    let record0 = discovery.received_observations.get(&0).unwrap();
    assert_eq!(record0.observed_address, Some(path0_second));
    assert_eq!(record0.observation_count, 1);

    let record1 = discovery.received_observations.get(&1).unwrap();
    assert_eq!(record1.observed_address, Some(path1_only));
    assert_eq!(record1.observation_count, 1);
}
8345
/// `get_observed_address` is expected to return `None` before anything was
/// received, the address after it was handled, and `None` for an unknown path.
#[test]
fn test_get_observed_address() {
    let now = Instant::now();
    let mut discovery =
        AddressDiscoveryState::new(&AddressDiscoveryConfig::SendAndReceive, now);

    assert_eq!(discovery.get_observed_address(0), None);

    let reported = SocketAddr::from(([192, 168, 1, 100], 5000));
    discovery.handle_observed_address(reported, 0, now);
    assert_eq!(discovery.get_observed_address(0), Some(reported));

    // A path that never received anything stays unknown.
    assert_eq!(discovery.get_observed_address(999), None);
}
8366
/// `has_unnotified_changes` is expected to go true after a new observation,
/// false once the record is marked notified, and true again after a changed
/// address arrives.
#[test]
fn test_has_unnotified_changes() {
    let now = Instant::now();
    let mut discovery =
        AddressDiscoveryState::new(&AddressDiscoveryConfig::SendAndReceive, now);

    assert!(!discovery.has_unnotified_changes());

    let initial = SocketAddr::from(([192, 168, 1, 100], 5000));
    discovery.handle_observed_address(initial, 0, now);
    assert!(discovery.has_unnotified_changes());

    // Acknowledge the observation for path 0.
    if let Some(record) = discovery.received_observations.get_mut(&0) {
        record.notified = true;
    }
    assert!(!discovery.has_unnotified_changes());

    // Same host, different port, one second later.
    let changed = SocketAddr::from(([192, 168, 1, 100], 6000));
    let one_second_on = now + Duration::from_secs(1);
    discovery.handle_observed_address(changed, 0, one_second_on);
    assert!(discovery.has_unnotified_changes());
}
8393
/// With `enabled` cleared, incoming observations are expected to be ignored
/// and outgoing observations refused.
#[test]
fn test_address_discovery_disabled() {
    let now = Instant::now();
    let mut discovery =
        AddressDiscoveryState::new(&AddressDiscoveryConfig::SendAndReceive, now);
    discovery.enabled = false;

    let reported = SocketAddr::from(([192, 168, 1, 100], 5000));
    discovery.handle_observed_address(reported, 0, now);

    // Nothing recorded, nothing to send.
    assert_eq!(discovery.received_history.len(), 0);
    assert!(!discovery.should_send_observation(0, now));
}
8414
/// Exercises the limiter at a rate of 2 with all paths observable: two sends
/// pass immediately, a third is refused, one more passes after 500 ms, and
/// two pass (with a third refused) two seconds after the start.
#[test]
fn test_rate_limiting_basic() {
    let now = Instant::now();
    let config = AddressDiscoveryConfig::SendAndReceive;
    let mut state = AddressDiscoveryState::new(&config, now);

    state.observe_all_paths = true;
    state.rate_limiter.set_rate(2);

    // The first two observations are accepted at the starting instant.
    assert!(state.should_send_observation(0, now));
    state.record_observation_sent(0);

    assert!(state.should_send_observation(1, now));
    state.record_observation_sent(1);

    // A third at the same instant is refused.
    assert!(!state.should_send_observation(2, now));

    // Half a second later exactly one more is accepted.
    let later = now + Duration::from_millis(500);
    assert!(state.should_send_observation(3, later));
    state.record_observation_sent(3);

    assert!(!state.should_send_observation(4, later));

    // Two seconds after the start, two more are accepted and a third refused.
    let two_sec_later = now + Duration::from_secs(2);
    assert!(state.should_send_observation(5, two_sec_later));
    state.record_observation_sent(5);

    assert!(state.should_send_observation(6, two_sec_later));
    state.record_observation_sent(6);

    assert!(
        !state.should_send_observation(7, two_sec_later),
        "Expected no tokens available"
    );
}
8476
/// On a single path, ten consecutive sends are expected to pass and the
/// eleventh to be refused; 100 ms later one more passes and the next is
/// refused again.
#[test]
fn test_rate_limiting_per_path() {
    let now = Instant::now();
    let mut discovery =
        AddressDiscoveryState::new(&AddressDiscoveryConfig::SendAndReceive, now);

    let mut seeded = paths::PathAddressInfo::new();
    seeded.observed_address = Some(SocketAddr::from(([192, 168, 1, 1], 8080)));
    discovery.sent_observations.insert(0, seeded);

    // Spend ten sends, re-arming the path after each one.
    for _ in 0..10 {
        assert!(discovery.should_send_observation(0, now));
        discovery.record_observation_sent(0);
        discovery.sent_observations.get_mut(&0).unwrap().notified = false;
    }

    assert!(!discovery.should_send_observation(0, now));

    // 100 ms later: one more send, then refused again.
    let after_wait = now + Duration::from_millis(100);
    assert!(discovery.should_send_observation(0, after_wait));
    discovery.record_observation_sent(0);

    discovery.sent_observations.get_mut(&0).unwrap().notified = false;

    assert!(!discovery.should_send_observation(0, after_wait));
}
8519
/// With rate, tokens and capacity all forced to zero, no observation is
/// expected to be allowed now, 10 s later, or 100 s later.
#[test]
fn test_rate_limiting_zero_rate() {
    let now = Instant::now();
    let mut discovery =
        AddressDiscoveryState::new(&AddressDiscoveryConfig::SendAndReceive, now);

    // Zero out the limiter completely.
    discovery.rate_limiter.set_rate(0);
    discovery.rate_limiter.tokens = 0.0;
    discovery.rate_limiter.max_tokens = 0.0;

    assert!(!discovery.should_send_observation(0, now));

    let ten_seconds_on = now + Duration::from_secs(10);
    assert!(!discovery.should_send_observation(0, ten_seconds_on));

    let hundred_seconds_on = now + Duration::from_secs(100);
    assert!(!discovery.should_send_observation(0, hundred_seconds_on));
}
8537
/// After ten sends the eleventh path is refused; raising the limit to 20 is
/// expected to allow a send 50 ms later and another 100 ms after the start.
#[test]
fn test_rate_limiting_update() {
    let now = Instant::now();
    let mut discovery =
        AddressDiscoveryState::new(&AddressDiscoveryConfig::SendAndReceive, now);
    discovery.observe_all_paths = true;

    // Seed twelve paths with addresses 192.168.1.1 .. 192.168.1.12, port 8080.
    for i in 0..12 {
        let mut seeded = paths::PathAddressInfo::new();
        seeded.observed_address =
            Some(SocketAddr::from(([192, 168, 1, (i + 1) as u8], 8080)));
        discovery.sent_observations.insert(i, seeded);
    }

    // Ten sends succeed at the starting instant; the next path is refused.
    for i in 0..10 {
        assert!(discovery.should_send_observation(i, now));
        discovery.record_observation_sent(i);
    }
    assert!(!discovery.should_send_observation(10, now));

    discovery.update_rate_limit(20.0);

    let after_50ms = now + Duration::from_millis(50);
    assert!(discovery.should_send_observation(10, after_50ms));
    discovery.record_observation_sent(10);

    let after_100ms = now + Duration::from_millis(100);
    assert!(discovery.should_send_observation(11, after_100ms));
}
8585
/// A burst of ten sends on path 0 is expected to pass with the eleventh
/// refused; after 100 ms exactly one more send passes.
#[test]
fn test_rate_limiting_burst() {
    let now = Instant::now();
    let mut discovery =
        AddressDiscoveryState::new(&AddressDiscoveryConfig::SendAndReceive, now);

    let mut sent = 0;
    while sent < 10 {
        assert!(discovery.should_send_observation(0, now));
        discovery.record_observation_sent(0);
        sent += 1;
    }

    // Burst exhausted.
    assert!(!discovery.should_send_observation(0, now));

    let after_wait = now + Duration::from_millis(100);
    assert!(discovery.should_send_observation(0, after_wait));
    discovery.record_observation_sent(0);
    assert!(!discovery.should_send_observation(0, after_wait));
}
8608
/// Queues ten frames for one path (re-arming it each time), expects the
/// eleventh to be refused, then refreshes tokens 100 ms later and expects a
/// frame to be queued again.
#[test]
fn test_connection_rate_limiting_with_check_observations() {
    let now = Instant::now();
    let mut discovery =
        AddressDiscoveryState::new(&AddressDiscoveryConfig::SendAndReceive, now);

    let observed = SocketAddr::from(([192, 168, 1, 1], 8080));

    let mut seeded = paths::PathAddressInfo::new();
    seeded.update_observed_address(observed, now);
    discovery.sent_observations.insert(0, seeded);

    // First frame goes out without any re-arming.
    assert!(discovery.queue_observed_address_frame(0, observed).is_some());
    discovery.record_observation_sent(0);

    if let Some(entry) = discovery.sent_observations.get_mut(&0) {
        entry.notified = false;
    }

    // Nine more frames, clearing the notified flag before each.
    for _ in 1..10 {
        if let Some(entry) = discovery.sent_observations.get_mut(&0) {
            entry.notified = false;
        }
        assert!(discovery.queue_observed_address_frame(0, observed).is_some());
        discovery.record_observation_sent(0);
    }

    // The eleventh attempt is expected to be refused.
    if let Some(entry) = discovery.sent_observations.get_mut(&0) {
        entry.notified = false;
    }
    assert!(discovery.queue_observed_address_frame(0, observed).is_none());

    // Refresh the limiter as of 100 ms later and try once more.
    let after_wait = now + Duration::from_millis(100);
    discovery.rate_limiter.update_tokens(after_wait);
    if let Some(entry) = discovery.sent_observations.get_mut(&0) {
        entry.notified = false;
    }

    assert!(discovery.queue_observed_address_frame(0, observed).is_some());
}
8668
/// The first queued frame is expected to carry the given address; frames 2
/// through 10 are still allowed after re-arming, and the eleventh is refused.
#[test]
fn test_queue_observed_address_frame() {
    let now = Instant::now();
    let mut discovery =
        AddressDiscoveryState::new(&AddressDiscoveryConfig::SendAndReceive, now);

    let observed = SocketAddr::from(([192, 168, 1, 100], 5000));

    let first = discovery.queue_observed_address_frame(0, observed);
    assert!(first.is_some());
    assert_eq!(first.unwrap().address, observed);

    discovery.record_observation_sent(0);

    // Frames number 2..=10: clear the notified flag, queue, record.
    for frame_no in 2..=10 {
        if let Some(entry) = discovery.sent_observations.get_mut(&0) {
            entry.notified = false;
        }

        let queued = discovery.queue_observed_address_frame(0, observed);
        assert!(queued.is_some(), "Frame {} should be allowed", frame_no);
        discovery.record_observation_sent(0);
    }

    if let Some(entry) = discovery.sent_observations.get_mut(&0) {
        entry.notified = false;
    }

    let eleventh = discovery.queue_observed_address_frame(0, observed);
    assert!(eleventh.is_none(), "11th frame should be rate limited");
}
8707
/// Observations received on three different paths are expected to be stored
/// per path, flagged as unnotified, and appended to the history.
#[test]
fn test_multi_path_basic() {
    let now = Instant::now();
    let mut discovery =
        AddressDiscoveryState::new(&AddressDiscoveryConfig::SendAndReceive, now);

    let per_path = [
        (0u64, SocketAddr::from(([192, 168, 1, 1], 5000))),
        (1u64, SocketAddr::from(([10, 0, 0, 1], 6000))),
        (2u64, SocketAddr::from(([172, 16, 0, 1], 7000))),
    ];

    for (path_id, reported) in &per_path {
        discovery.handle_observed_address(*reported, *path_id, now);
    }

    for (path_id, reported) in &per_path {
        assert_eq!(discovery.get_observed_address(*path_id), Some(*reported));
    }

    assert!(discovery.has_unnotified_changes());
    assert_eq!(discovery.received_history.len(), 3);
}
8735
/// Without `observe_all_paths`, only path 0 is expected to be allowed to
/// send observations or queue frames; paths 1 and 2 are refused.
#[test]
fn test_multi_path_observe_primary_only() {
    let now = Instant::now();
    let mut discovery =
        AddressDiscoveryState::new(&AddressDiscoveryConfig::SendAndReceive, now);

    assert!(discovery.should_send_observation(0, now));
    discovery.record_observation_sent(0);

    for secondary in 1..=2 {
        assert!(!discovery.should_send_observation(secondary, now));
    }

    // Frame queuing follows the same primary-only rule.
    let observed = SocketAddr::from(([192, 168, 1, 1], 5000));
    assert!(discovery.queue_observed_address_frame(0, observed).is_some());
    for secondary in 1..=2 {
        assert!(
            discovery
                .queue_observed_address_frame(secondary, observed)
                .is_none()
        );
    }
}
8757
/// With all paths observable, ten sends across paths 0..10 are expected to
/// pass and further sends to be refused on any path; one second later ten
/// more pass and the next is refused.
#[test]
fn test_multi_path_rate_limiting() {
    let now = Instant::now();
    let mut discovery =
        AddressDiscoveryState::new(&AddressDiscoveryConfig::SendAndReceive, now);
    discovery.observe_all_paths = true;

    // Seed 21 paths with addresses 192.168.1.1 .. 192.168.1.21, port 8080.
    for i in 0..21 {
        let mut seeded = paths::PathAddressInfo::new();
        seeded.observed_address =
            Some(SocketAddr::from(([192, 168, 1, (i + 1) as u8], 8080)));
        discovery.sent_observations.insert(i, seeded);
    }

    for i in 0..10 {
        assert!(discovery.should_send_observation(i, now));
        discovery.record_observation_sent(i);
    }

    // An eleventh path is refused at the same instant.
    assert!(!discovery.should_send_observation(10, now));

    // Re-arming an already-used path does not help either.
    discovery.sent_observations.get_mut(&0).unwrap().notified = false;
    assert!(!discovery.should_send_observation(0, now));

    let one_second_on = now + Duration::from_secs(1);
    for i in 10..20 {
        assert!(discovery.should_send_observation(i, one_second_on));
        discovery.record_observation_sent(i);
    }
    assert!(!discovery.should_send_observation(20, one_second_on));
}
8805
/// Tracks address changes on two paths independently: a change on either
/// path is expected to raise the unnotified flag without disturbing the
/// other path's stored address.
#[test]
fn test_multi_path_address_changes() {
    let now = Instant::now();
    let mut discovery =
        AddressDiscoveryState::new(&AddressDiscoveryConfig::SendAndReceive, now);

    let path0_before = SocketAddr::from(([192, 168, 1, 1], 5000));
    let path0_after = SocketAddr::from(([192, 168, 1, 2], 5000));
    let path1_before = SocketAddr::from(([10, 0, 0, 1], 6000));
    let path1_after = SocketAddr::from(([10, 0, 0, 2], 6000));

    discovery.handle_observed_address(path0_before, 0, now);
    discovery.handle_observed_address(path1_before, 1, now);

    // Acknowledge both initial observations.
    for path_id in 0..2u64 {
        if let Some(record) = discovery.received_observations.get_mut(&path_id) {
            record.notified = true;
        }
    }
    assert!(!discovery.has_unnotified_changes());

    // Path 0 changes; path 1 keeps its original address.
    discovery.handle_observed_address(path0_after, 0, now + Duration::from_secs(1));
    assert!(discovery.has_unnotified_changes());

    assert_eq!(discovery.get_observed_address(0), Some(path0_after));
    assert_eq!(discovery.get_observed_address(1), Some(path1_before));

    if let Some(record) = discovery.received_observations.get_mut(&0) {
        record.notified = true;
    }
    assert!(!discovery.has_unnotified_changes());

    // Now path 1 changes as well.
    discovery.handle_observed_address(path1_after, 1, now + Duration::from_secs(2));
    assert!(discovery.has_unnotified_changes());
}
8849
/// An observation arriving on a new path is expected to be stored alongside,
/// not instead of, the observation already held for the old path.
#[test]
fn test_multi_path_migration() {
    let now = Instant::now();
    let mut discovery =
        AddressDiscoveryState::new(&AddressDiscoveryConfig::SendAndReceive, now);

    let before_move = SocketAddr::from(([192, 168, 1, 1], 5000));
    let after_move = SocketAddr::from(([10, 0, 0, 1], 6000));

    discovery.handle_observed_address(before_move, 0, now);
    assert_eq!(discovery.get_observed_address(0), Some(before_move));

    let one_second_on = now + Duration::from_secs(1);
    discovery.handle_observed_address(after_move, 1, one_second_on);

    // Both paths keep their own address.
    assert_eq!(discovery.get_observed_address(0), Some(before_move));
    assert_eq!(discovery.get_observed_address(1), Some(after_move));

    assert_eq!(discovery.received_observations.len(), 2);
}
8875
/// With all paths observable and three seeded paths, one call is expected to
/// return three frames covering every seeded address, leaving no unnotified
/// changes on the receive side.
#[test]
fn test_check_for_address_observations_multi_path() {
    let now = Instant::now();
    let mut discovery =
        AddressDiscoveryState::new(&AddressDiscoveryConfig::SendAndReceive, now);
    discovery.observe_all_paths = true;

    let addr1 = SocketAddr::from(([192, 168, 1, 1], 5000));
    let addr2 = SocketAddr::from(([10, 0, 0, 1], 6000));
    let addr3 = SocketAddr::from(([172, 16, 0, 1], 7000));

    // Seed paths 0, 1 and 2 in order.
    for (path_id, observed) in &[(0u64, addr1), (1u64, addr2), (2u64, addr3)] {
        let mut seeded = paths::PathAddressInfo::new();
        seeded.observed_address = Some(*observed);
        discovery.sent_observations.insert(*path_id, seeded);
    }

    let produced = discovery.check_for_address_observations(0, true, now);

    assert_eq!(produced.len(), 3);

    let has_frame_for = |wanted: SocketAddr| produced.iter().any(|f| f.address == wanted);
    assert!(has_frame_for(addr1), "addr1 should be in frames");
    assert!(has_frame_for(addr2), "addr2 should be in frames");
    assert!(has_frame_for(addr3), "addr3 should be in frames");

    assert!(!discovery.has_unnotified_changes());
}
8932
/// When the peer-support flag is `false`, no frames are expected, and the
/// received observations remain flagged as unnotified.
#[test]
fn test_multi_path_with_peer_not_supporting() {
    let now = Instant::now();
    let mut discovery =
        AddressDiscoveryState::new(&AddressDiscoveryConfig::SendAndReceive, now);

    let on_path0 = SocketAddr::from(([192, 168, 1, 1], 5000));
    let on_path1 = SocketAddr::from(([10, 0, 0, 1], 6000));
    discovery.handle_observed_address(on_path0, 0, now);
    discovery.handle_observed_address(on_path1, 1, now);

    let produced = discovery.check_for_address_observations(0, false, now);
    assert_eq!(produced.len(), 0);

    assert!(discovery.has_unnotified_changes());
}
8951
/// Bootstrap mode is off by default; once switched on, paths 0-2 are all
/// expected to be observable and the effective rate limit to exceed 10.
#[test]
fn test_bootstrap_node_aggressive_observation_mode() {
    let config = AddressDiscoveryConfig::SendAndReceive;
    let started = Instant::now();
    let mut discovery = AddressDiscoveryState::new(&config, started);

    assert!(!discovery.is_bootstrap_mode());

    discovery.set_bootstrap_mode(true);
    assert!(discovery.is_bootstrap_mode());

    for path_id in 0..=2 {
        assert!(discovery.should_observe_path(path_id));
    }

    assert!(discovery.get_effective_rate_limit() > 10.0);
}
8976
/// In bootstrap mode an observation is expected to be sendable immediately,
/// and a frame for path 0 can be queued right away.
#[test]
fn test_bootstrap_node_immediate_observation() {
    let config = AddressDiscoveryConfig::SendAndReceive;
    let now = Instant::now();
    let mut discovery = AddressDiscoveryState::new(&config, now);
    discovery.set_bootstrap_mode(true);

    let reported = SocketAddr::from(([192, 168, 1, 100], 5000));
    discovery.handle_observed_address(reported, 0, now);

    assert!(discovery.should_send_observation_immediately(true));
    assert!(discovery.should_send_observation(0, now));

    assert!(discovery.queue_observed_address_frame(0, reported).is_some());
}
8999
/// In bootstrap mode, one call to `check_for_address_observations` is
/// expected to return a frame for each of the three seeded paths.
#[test]
fn test_bootstrap_node_multiple_path_observations() {
    let config = AddressDiscoveryConfig::SendAndReceive;
    let now = Instant::now();
    let mut state = AddressDiscoveryState::new(&config, now);
    state.set_bootstrap_mode(true);

    // Fixed-size table of (path id, observed address); an array suffices
    // because it is only iterated by reference.
    let addrs = [
        (0u64, SocketAddr::from(([192, 168, 1, 1], 5000))),
        (1u64, SocketAddr::from(([10, 0, 0, 1], 6000))),
        (2u64, SocketAddr::from(([172, 16, 0, 1], 7000))),
    ];

    // Seed every path with its address.
    for (path_id, addr) in &addrs {
        state
            .sent_observations
            .insert(*path_id, paths::PathAddressInfo::new());
        state
            .sent_observations
            .get_mut(path_id)
            .unwrap()
            .observed_address = Some(*addr);
    }

    let frames = state.check_for_address_observations(0, true, now);
    assert_eq!(frames.len(), 3);

    // Every seeded address must appear in some frame.
    for (_, addr) in &addrs {
        assert!(frames.iter().any(|f| f.address == *addr));
    }
}
9035
/// In bootstrap mode, observations for ten distinct paths at the same
/// instant are all expected to be allowed.
#[test]
fn test_bootstrap_node_rate_limit_override() {
    let config = AddressDiscoveryConfig::SendAndReceive;
    let now = Instant::now();
    let mut discovery = AddressDiscoveryState::new(&config, now);
    discovery.set_bootstrap_mode(true);

    let reported = SocketAddr::from(([192, 168, 1, 1], 5000));

    for i in 0..10 {
        discovery.handle_observed_address(reported, i, now);
        assert!(
            discovery.should_send_observation(i, now),
            "Bootstrap node should send observation {i}"
        );
        discovery.record_observation_sent(i);
    }
}
9055
/// Enabling bootstrap mode is expected to set the `bootstrap_mode` flag, keep
/// the state enabled, and push the effective rate above
/// `max_observation_rate`.
#[test]
fn test_bootstrap_node_configuration() {
    let mut discovery = AddressDiscoveryState::new(
        &AddressDiscoveryConfig::SendAndReceive,
        Instant::now(),
    );

    discovery.set_bootstrap_mode(true);

    assert!(discovery.bootstrap_mode);
    assert!(discovery.enabled);

    let configured_rate = discovery.max_observation_rate as f64;
    assert!(discovery.get_effective_rate_limit() > configured_rate);
}
9073
/// In bootstrap mode, an observation is expected to be allowed both right
/// after the first address is received and again 60 s later when a different
/// address is received on the same path.
#[test]
fn test_bootstrap_node_persistent_observation() {
    let config = AddressDiscoveryConfig::SendAndReceive;
    let start = Instant::now();
    let mut discovery = AddressDiscoveryState::new(&config, start);
    discovery.set_bootstrap_mode(true);

    let first_seen = SocketAddr::from(([192, 168, 1, 1], 5000));
    let second_seen = SocketAddr::from(([192, 168, 1, 2], 5000));

    discovery.handle_observed_address(first_seen, 0, start);
    assert!(discovery.should_send_observation(0, start));
    discovery.record_observation_sent(0);

    // A minute passes and the address on path 0 changes.
    let a_minute_on = start + Duration::from_secs(60);
    discovery.handle_observed_address(second_seen, 0, a_minute_on);

    assert!(discovery.should_send_observation(0, a_minute_on));
}
9097
/// In bootstrap mode with four seeded paths, one call is expected to return
/// one frame per path, covering every seeded address.
#[test]
fn test_bootstrap_node_multi_peer_support() {
    let config = AddressDiscoveryConfig::SendAndReceive;
    let now = Instant::now();
    let mut discovery = AddressDiscoveryState::new(&config, now);
    discovery.set_bootstrap_mode(true);

    let peer_addresses: Vec<(u64, SocketAddr)> = vec![
        (0, SocketAddr::from(([192, 168, 1, 1], 5000))),
        (1, SocketAddr::from(([10, 0, 0, 1], 6000))),
        (2, SocketAddr::from(([172, 16, 0, 1], 7000))),
        (3, SocketAddr::from(([192, 168, 2, 1], 8000))),
    ];

    // Seed each path with its observed address.
    for (path_id, observed) in &peer_addresses {
        let mut seeded = paths::PathAddressInfo::new();
        seeded.observed_address = Some(*observed);
        discovery.sent_observations.insert(*path_id, seeded);
    }

    let produced = discovery.check_for_address_observations(0, true, now);
    assert_eq!(produced.len(), peer_addresses.len());

    for (_, observed) in &peer_addresses {
        assert!(produced.iter().any(|f| f.address == *observed));
    }
}
9136
// Nested test module whose body is pulled in textually from the file
// `address_discovery_tests.rs` via `include!`.
mod address_discovery_tests {
    include!("address_discovery_tests.rs");
}
9141}