1#![allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
9use std::{
10 cmp,
11 collections::VecDeque,
12 convert::TryFrom,
13 fmt, io, mem,
14 net::{IpAddr, SocketAddr},
15 sync::Arc,
16};
17
18use bytes::{Bytes, BytesMut};
19use frame::StreamMetaVec;
20use rand::{Rng, SeedableRng, rngs::StdRng};
23use thiserror::Error;
24use tracing::{debug, error, info, trace, trace_span, warn};
25
26use crate::{
27 Dir, Duration, EndpointConfig, Frame, INITIAL_MTU, Instant, MAX_CID_SIZE, MAX_STREAM_COUNT,
28 MIN_INITIAL_SIZE, MtuDiscoveryConfig, Side, StreamId, TIMER_GRANULARITY, TokenStore, Transmit,
29 TransportError, TransportErrorCode, VarInt,
30 cid_generator::ConnectionIdGenerator,
31 cid_queue::CidQueue,
32 coding::BufMutExt,
33 config::{ServerConfig, TransportConfig},
34 crypto::{self, KeyPair, Keys, PacketKey},
35 endpoint::AddressDiscoveryStats,
36 frame::{self, Close, Datagram, FrameStruct, NewToken},
37 nat_traversal_api::PeerId,
38 packet::{
39 FixedLengthConnectionIdParser, Header, InitialHeader, InitialPacket, LongType, Packet,
40 PacketNumber, PartialDecode, SpaceId,
41 },
42 range_set::ArrayRangeSet,
43 shared::{
44 ConnectionEvent, ConnectionEventInner, ConnectionId, DatagramConnectionEvent, EcnCodepoint,
45 EndpointEvent, EndpointEventInner,
46 },
47 token::{ResetToken, Token, TokenPayload},
48 transport_parameters::TransportParameters,
49};
50
51mod ack_frequency;
52use ack_frequency::AckFrequencyState;
53
54pub(crate) mod nat_traversal;
55use nat_traversal::NatTraversalState;
56pub(crate) use nat_traversal::{CoordinationPhase, NatTraversalError, NatTraversalRole};
57
58mod assembler;
59pub use assembler::Chunk;
60
61mod cid_state;
62use cid_state::CidState;
63
64mod datagrams;
65use datagrams::DatagramState;
66pub use datagrams::{Datagrams, SendDatagramError};
67
68mod mtud;
69use mtud::MtuDiscovery;
70
71mod pacing;
72
73mod packet_builder;
74use packet_builder::PacketBuilder;
75
76mod packet_crypto;
77use packet_crypto::{PrevCrypto, ZeroRttCrypto};
78
79mod paths;
80pub use paths::RttEstimator;
81use paths::{NatTraversalChallenges, PathData, PathResponses};
82
83mod send_buffer;
84
85mod spaces;
86#[cfg(fuzzing)]
87pub use spaces::Retransmits;
88#[cfg(not(fuzzing))]
89use spaces::Retransmits;
90use spaces::{PacketNumberFilter, PacketSpace, SendableFrames, SentPacket, ThinRetransmits};
91
92mod stats;
93pub use stats::{ConnectionStats, FrameStats, PathStats, UdpStats};
94
95mod streams;
96#[cfg(fuzzing)]
97pub use streams::StreamsState;
98#[cfg(not(fuzzing))]
99use streams::StreamsState;
100pub use streams::{
101 Chunks, ClosedStream, FinishError, ReadError, ReadableError, RecvStream, SendStream,
102 ShouldTransmit, StreamEvent, Streams, WriteError, Written,
103};
104
105mod timer;
106use crate::congestion::Controller;
107use timer::{Timer, TimerTable};
108
/// State of a single connection.
///
/// The methods on this type produce outgoing datagrams (`poll_transmit`), deadlines
/// (`poll_timeout`) and events (`poll`, `poll_endpoint_events`), and consume incoming
/// events (`handle_event`).
pub struct Connection {
    /// Endpoint-level configuration handed to `new`.
    endpoint_config: Arc<EndpointConfig>,
    /// Transport configuration handed to `new`; consulted on the transmit path
    /// (segmentation offload, ACK frequency, padding to MTU).
    config: Arc<TransportConfig>,
    /// Connection-local RNG, seeded from the `rng_seed` argument of `new`.
    rng: StdRng,
    /// Cryptographic session; `new` derives the Initial keys from it.
    crypto: Box<dyn crypto::Session>,
    /// The `loc_cid` argument of `new`.
    handshake_cid: ConnectionId,
    /// The `rem_cid` argument of `new`.
    rem_handshake_cid: ConnectionId,
    /// Local IP address, copied into `Transmit::src_ip` of every outgoing transmit.
    local_ip: Option<IpAddr>,
    /// State of the network path currently in use (remote address, MTU, RTT estimate,
    /// congestion controller, pacing, anti-amplification accounting).
    path: PathData,
    /// The `allow_mtud` argument of `new`, which is also forwarded to `PathData::new`.
    allow_mtud: bool,
    /// A previous path together with the connection ID to use on it;
    /// `send_path_challenge` probes it while a challenge is pending.
    prev_path: Option<(ConnectionId, PathData)>,
    /// Current connection state; starts out as `State::Handshake`.
    state: State,
    /// Side-specific state, built from the `side_args` argument of `new`.
    side: ConnectionSide,
    /// Initialised to `false`.
    /// NOTE(review): presumably set once 0-RTT has been enabled — confirm.
    zero_rtt_enabled: bool,
    /// 0-RTT packet keys; used for the Data space while that space has no keys of its own.
    zero_rtt_crypto: Option<ZeroRttCrypto>,
    /// Initialised to `false`.
    /// NOTE(review): presumably the current key phase bit — confirm.
    key_phase: bool,
    /// Initialised to a random value in `10..1000`.
    /// NOTE(review): exact meaning is not visible here — confirm against the key update logic.
    key_phase_size: u64,
    /// Transport parameters of the peer; starts as `TransportParameters::default()`.
    peer_params: TransportParameters,
    /// The `rem_cid` argument of `new`, kept unchanged.
    orig_rem_cid: ConnectionId,
    /// The `init_cid` argument of `new`, also used to derive the Initial keys.
    initial_dst_cid: ConnectionId,
    /// Initialised to `None`.
    /// NOTE(review): presumably the source CID of a received Retry packet — confirm.
    retry_src_cid: Option<ConnectionId>,
    /// Initialised to zero.
    /// NOTE(review): presumably the total number of packets declared lost — confirm.
    lost_packets: u64,
    /// Application-facing events; `poll` returns these before anything else.
    events: VecDeque<Event>,
    /// Events for the owning endpoint, drained by `poll_endpoint_events`.
    endpoint_events: VecDeque<EndpointEventInner>,
    /// Set at construction when `config.allow_spin` is true and a 7-in-8 random draw succeeds.
    spin_enabled: bool,
    /// Initialised to `false`.
    /// NOTE(review): presumably the current spin bit value — confirm.
    spin: bool,
    /// Packet number spaces, indexed by `SpaceId` (Initial, Handshake, Data).
    spaces: [PacketSpace; 3],
    /// Starts at `SpaceId::Initial`; `poll_transmit` stops emitting CONNECTION_CLOSE
    /// once it has written one in this space.
    highest_space: SpaceId,
    /// Initialised to `None`; `poll_transmit` clears its `update_unacked` flag whenever
    /// it starts a new packet.
    prev_crypto: Option<PrevCrypto>,
    /// Initialised to `None`.
    /// NOTE(review): presumably the keys for the next key update — confirm.
    next_crypto: Option<KeyPair<Box<dyn PacketKey>>>,
    /// Initialised to `false`.
    accepted_0rtt: bool,
    /// Initialised to `true`.
    permit_idle_reset: bool,
    /// Idle timeout derived from `config.max_idle_timeout` (milliseconds); `None` when the
    /// configured value is absent or zero.
    idle_timeout: Option<Duration>,
    /// Table of pending timers; its earliest deadline is reported by `poll_timeout`.
    timers: TimerTable,
    /// Initialised to zero.
    authentication_failures: u64,
    /// Pending connection error; taken by `poll` and reported as `Event::ConnectionLost`.
    error: Option<ConnectionError>,
    /// Packet number filter created from the connection RNG; disabled in test builds when
    /// `config.deterministic_packet_numbers` is set.
    packet_number_filter: PacketNumberFilter,

    /// Pending path responses; `poll_transmit` pops those destined for a remote other than
    /// the current path and sends them as off-path PATH_RESPONSE frames.
    path_responses: PathResponses,
    /// NAT traversal challenge bookkeeping; starts out as the default value.
    nat_traversal_challenges: NatTraversalChallenges,
    /// Whether a close packet still has to be sent; cleared by `poll_transmit` once
    /// CONNECTION_CLOSE has been written in `highest_space`.
    close: bool,

    /// ACK frequency state, initialised from the max ACK delay of default transport parameters.
    ack_frequency: AckFrequencyState,

    /// Initialised to zero.
    /// NOTE(review): presumably the number of consecutive probe timeouts — confirm.
    pto_count: u32,

    /// Forwarded to `populate_acks` when ACKs are written alongside a close frame.
    receiving_ecn: bool,
    /// Initialised to zero.
    total_authed_packets: u64,
    /// Set by `poll_transmit` when it produced no data and was not blocked by congestion
    /// control or pacing, and when the connection is drained or has no close left to send.
    app_limited: bool,

    /// Stream state; exposed through `streams`, `recv_stream` and `send_stream`.
    streams: StreamsState,
    /// Connection IDs for the remote peer; `active()` provides the destination CID of
    /// outgoing packets.
    rem_cids: CidQueue,
    /// Local connection ID state, sized from the CID generator passed to `new`.
    local_cid_state: CidState,
    /// Datagram extension state; starts out as the default value.
    datagrams: DatagramState,
    /// Connection statistics (frame, UDP and path counters).
    stats: ConnectionStats,
    /// The `version` argument of `new`.
    version: u32,

    /// NAT traversal state; `None` at construction. When present it is consulted for
    /// coordination timeouts, punch targets and validation candidates.
    nat_traversal: Option<NatTraversalState>,

    /// NAT traversal frame configuration; starts out as the default value.
    nat_traversal_frame_config: frame::nat_traversal_unified::NatTraversalFrameConfig,

    /// Address discovery state; `new` always creates it from the default
    /// `AddressDiscoveryConfig`.
    address_discovery_state: Option<AddressDiscoveryState>,

    /// PQC-related state; supplies the minimum Initial size used for padding and decides
    /// whether packet coalescing is disabled for a space.
    pqc_state: PqcState,

    /// Trace context whose trace ID is attached to emitted trace events.
    #[cfg(feature = "trace")]
    trace_context: crate::tracing::TraceContext,

    /// Event log that trace events are written to; `new` uses the global log.
    #[cfg(feature = "trace")]
    event_log: Arc<crate::tracing::EventLog>,

    /// Writer installed by `set_qlog`; `None` until then.
    #[cfg(feature = "__qlog")]
    qlog_streamer: Option<Box<dyn std::io::Write + Send + Sync>>,

    /// Initialised to `None`.
    /// NOTE(review): presumably the peer ID bound into issued tokens — confirm.
    peer_id_for_tokens: Option<PeerId>,
    /// Initialised to `false`.
    delay_new_token_until_binding: bool,
    /// Initialised to `None`.
    /// NOTE(review): presumably the peer ID recovered from a received token — confirm.
    expected_peer_id_from_token: Option<PeerId>,
}
289
290impl Connection {
291 pub(crate) fn new(
292 endpoint_config: Arc<EndpointConfig>,
293 config: Arc<TransportConfig>,
294 init_cid: ConnectionId,
295 loc_cid: ConnectionId,
296 rem_cid: ConnectionId,
297 remote: SocketAddr,
298 local_ip: Option<IpAddr>,
299 crypto: Box<dyn crypto::Session>,
300 cid_gen: &dyn ConnectionIdGenerator,
301 now: Instant,
302 version: u32,
303 allow_mtud: bool,
304 rng_seed: [u8; 32],
305 side_args: SideArgs,
306 ) -> Self {
307 let pref_addr_cid = side_args.pref_addr_cid();
308 let path_validated = side_args.path_validated();
309 let connection_side = ConnectionSide::from(side_args);
310 let side = connection_side.side();
311 let initial_space = PacketSpace {
312 crypto: Some(crypto.initial_keys(&init_cid, side)),
313 ..PacketSpace::new(now)
314 };
315 let state = State::Handshake(state::Handshake {
316 rem_cid_set: side.is_server(),
317 expected_token: Bytes::new(),
318 client_hello: None,
319 });
320 let mut rng = StdRng::from_seed(rng_seed);
321 let mut this = Self {
322 endpoint_config,
323 crypto,
324 handshake_cid: loc_cid,
325 rem_handshake_cid: rem_cid,
326 local_cid_state: CidState::new(
327 cid_gen.cid_len(),
328 cid_gen.cid_lifetime(),
329 now,
330 if pref_addr_cid.is_some() { 2 } else { 1 },
331 ),
332 path: PathData::new(remote, allow_mtud, None, now, &config),
333 allow_mtud,
334 local_ip,
335 prev_path: None,
336 state,
337 side: connection_side,
338 zero_rtt_enabled: false,
339 zero_rtt_crypto: None,
340 key_phase: false,
341 key_phase_size: rng.gen_range(10..1000),
348 peer_params: TransportParameters::default(),
349 orig_rem_cid: rem_cid,
350 initial_dst_cid: init_cid,
351 retry_src_cid: None,
352 lost_packets: 0,
353 events: VecDeque::new(),
354 endpoint_events: VecDeque::new(),
355 spin_enabled: config.allow_spin && rng.gen_ratio(7, 8),
356 spin: false,
357 spaces: [initial_space, PacketSpace::new(now), PacketSpace::new(now)],
358 highest_space: SpaceId::Initial,
359 prev_crypto: None,
360 next_crypto: None,
361 accepted_0rtt: false,
362 permit_idle_reset: true,
363 idle_timeout: match config.max_idle_timeout {
364 None | Some(VarInt(0)) => None,
365 Some(dur) => Some(Duration::from_millis(dur.0)),
366 },
367 timers: TimerTable::default(),
368 authentication_failures: 0,
369 error: None,
370 #[cfg(test)]
371 packet_number_filter: match config.deterministic_packet_numbers {
372 false => PacketNumberFilter::new(&mut rng),
373 true => PacketNumberFilter::disabled(),
374 },
375 #[cfg(not(test))]
376 packet_number_filter: PacketNumberFilter::new(&mut rng),
377
378 path_responses: PathResponses::default(),
379 nat_traversal_challenges: NatTraversalChallenges::default(),
380 close: false,
381
382 ack_frequency: AckFrequencyState::new(get_max_ack_delay(
383 &TransportParameters::default(),
384 )),
385
386 pto_count: 0,
387
388 app_limited: false,
389 receiving_ecn: false,
390 total_authed_packets: 0,
391
392 streams: StreamsState::new(
393 side,
394 config.max_concurrent_uni_streams,
395 config.max_concurrent_bidi_streams,
396 config.send_window,
397 config.receive_window,
398 config.stream_receive_window,
399 ),
400 datagrams: DatagramState::default(),
401 config,
402 rem_cids: CidQueue::new(rem_cid),
403 rng,
404 stats: ConnectionStats::default(),
405 version,
406 nat_traversal: None, nat_traversal_frame_config:
408 frame::nat_traversal_unified::NatTraversalFrameConfig::default(),
409 address_discovery_state: {
410 Some(AddressDiscoveryState::new(
413 &crate::transport_parameters::AddressDiscoveryConfig::default(),
414 now,
415 ))
416 },
417 pqc_state: PqcState::new(),
418
419 #[cfg(feature = "trace")]
420 trace_context: crate::tracing::TraceContext::new(crate::tracing::TraceId::new()),
421
422 #[cfg(feature = "trace")]
423 event_log: crate::tracing::global_log(),
424
425 #[cfg(feature = "__qlog")]
426 qlog_streamer: None,
427
428 peer_id_for_tokens: None,
429 delay_new_token_until_binding: false,
430 expected_peer_id_from_token: None,
431 };
432
433 #[cfg(feature = "trace")]
435 {
436 use crate::trace_event;
437 use crate::tracing::{Event, EventData, socket_addr_to_bytes, timestamp_now};
438 let _peer_id = {
440 let mut id = [0u8; 32];
441 let addr_bytes = match remote {
442 SocketAddr::V4(addr) => addr.ip().octets().to_vec(),
443 SocketAddr::V6(addr) => addr.ip().octets().to_vec(),
444 };
445 id[..addr_bytes.len().min(32)]
446 .copy_from_slice(&addr_bytes[..addr_bytes.len().min(32)]);
447 id
448 };
449
450 let (addr_bytes, addr_type) = socket_addr_to_bytes(remote);
451 trace_event!(
452 &this.event_log,
453 Event {
454 timestamp: timestamp_now(),
455 trace_id: this.trace_context.trace_id(),
456 sequence: 0,
457 _padding: 0,
458 node_id: [0u8; 32], event_data: EventData::ConnInit {
460 endpoint_bytes: addr_bytes,
461 addr_type,
462 _padding: [0u8; 45],
463 },
464 }
465 );
466 }
467
468 if path_validated {
469 this.on_path_validated();
470 }
471 if side.is_client() {
472 this.write_crypto();
474 this.init_0rtt();
475 }
476 this
477 }
478
479 #[cfg(feature = "__qlog")]
481 pub fn set_qlog(
482 &mut self,
483 writer: Box<dyn std::io::Write + Send + Sync>,
484 _title: Option<String>,
485 _description: Option<String>,
486 _now: Instant,
487 ) {
488 self.qlog_streamer = Some(writer);
489 }
490
    /// Hook called by `poll_transmit` after the final packet of a transmit has been
    /// handed to the congestion controller.
    ///
    /// The body is currently empty: no qlog recovery metrics are emitted.
    #[cfg(feature = "__qlog")]
    fn emit_qlog_recovery_metrics(&mut self, _now: Instant) {
    }
497
498 #[must_use]
506 pub fn poll_timeout(&mut self) -> Option<Instant> {
507 let mut next_timeout = self.timers.next_timeout();
508
509 if let Some(nat_state) = &self.nat_traversal {
511 if let Some(nat_timeout) = nat_state.get_next_timeout(Instant::now()) {
512 self.timers.set(Timer::NatTraversal, nat_timeout);
514 next_timeout = Some(next_timeout.map_or(nat_timeout, |t| t.min(nat_timeout)));
515 }
516 }
517
518 next_timeout
519 }
520
521 #[must_use]
527 pub fn poll(&mut self) -> Option<Event> {
528 if let Some(x) = self.events.pop_front() {
529 return Some(x);
530 }
531
532 if let Some(event) = self.streams.poll() {
533 return Some(Event::Stream(event));
534 }
535
536 if let Some(err) = self.error.take() {
537 return Some(Event::ConnectionLost { reason: err });
538 }
539
540 None
541 }
542
543 #[must_use]
545 pub fn poll_endpoint_events(&mut self) -> Option<EndpointEvent> {
546 self.endpoint_events.pop_front().map(EndpointEvent)
547 }
548
549 #[must_use]
551 pub fn streams(&mut self) -> Streams<'_> {
552 Streams {
553 state: &mut self.streams,
554 conn_state: &self.state,
555 }
556 }
557
558 #[must_use]
562 pub fn recv_stream(&mut self, id: StreamId) -> RecvStream<'_> {
563 assert!(id.dir() == Dir::Bi || id.initiator() != self.side.side());
564 RecvStream {
565 id,
566 state: &mut self.streams,
567 pending: &mut self.spaces[SpaceId::Data].pending,
568 }
569 }
570
571 #[must_use]
573 pub fn send_stream(&mut self, id: StreamId) -> SendStream<'_> {
574 assert!(id.dir() == Dir::Bi || id.initiator() == self.side.side());
575 SendStream {
576 id,
577 state: &mut self.streams,
578 pending: &mut self.spaces[SpaceId::Data].pending,
579 conn_state: &self.state,
580 }
581 }
582
    /// Writes the next batch of outgoing datagrams into `buf` and describes it.
    ///
    /// * `now` – current time.
    /// * `max_datagrams` – upper bound on the number of datagrams written in one call;
    ///   forced to 1 when `config.enable_segmentation_offload` is off.
    /// * `buf` – output buffer; the returned `Transmit::size` is `buf.len()`.
    ///   NOTE(review): the offsets used below (`datagram_start` starting at 0) look like
    ///   they expect `buf` to be empty on entry — confirm against callers.
    ///
    /// Returns `None` when there is nothing to send, or when a packet builder could not
    /// be created.
    ///
    /// # Panics
    ///
    /// Panics if `max_datagrams` is zero.
    #[must_use]
    pub fn poll_transmit(
        &mut self,
        now: Instant,
        max_datagrams: usize,
        buf: &mut Vec<u8>,
    ) -> Option<Transmit> {
        assert!(max_datagrams != 0);
        let max_datagrams = match self.config.enable_segmentation_offload {
            false => 1,
            true => max_datagrams,
        };

        let mut num_datagrams = 0;
        // Offset in `buf` at which the datagram currently being built starts.
        let mut datagram_start = 0;
        // Size budget of each datagram in the batch; shrunk to the size of the first
        // datagram once that one has been finished.
        let mut segment_size = usize::from(self.path.current_mtu());

        if let Some(nat_traversal) = &mut self.nat_traversal {
            if nat_traversal.check_coordination_timeout(now) {
                trace!("NAT traversal coordination timed out, may retry");
            }
        }

        // NAT traversal challenges and previous-path challenges are each sent as a
        // datagram of their own and returned immediately.
        if let Some(challenge) = self.send_nat_traversal_challenge(now, buf) {
            return Some(challenge);
        }

        if let Some(challenge) = self.send_path_challenge(now, buf) {
            return Some(challenge);
        }

        // Give every space the chance to queue probe data; an immediate ACK is only
        // requested in the Data space and only when the peer supports ACK frequency.
        for space in SpaceId::iter() {
            let request_immediate_ack =
                space == SpaceId::Data && self.peer_supports_ack_frequency();
            self.spaces[space].maybe_queue_probe(request_immediate_ack, &self.streams);
        }

        // Determine whether this call has to emit a close packet.
        let close = match self.state {
            State::Drained => {
                self.app_limited = true;
                return None;
            }
            State::Draining | State::Closed(_) => {
                if !self.close {
                    // No close packet is pending: nothing may be sent in these states.
                    self.app_limited = true;
                    return None;
                }
                true
            }
            _ => false,
        };

        // Decide whether an ACK_FREQUENCY frame should be queued in the Data space.
        if let Some(config) = &self.config.ack_frequency_config {
            self.spaces[SpaceId::Data].pending.ack_frequency = self
                .ack_frequency
                .should_send_ack_frequency(self.path.rtt.get(), config, &self.peer_params)
                && self.highest_space == SpaceId::Data
                && self.peer_supports_ack_frequency();
        }

        // Number of bytes of `buf` that may be used so far. Tracked separately from
        // `buf.capacity()` because `reserve` may allocate more than was asked for.
        let mut buf_capacity = 0;

        let mut coalesce = true;
        // Packet currently under construction, if any, and the frames written into it.
        let mut builder_storage: Option<PacketBuilder> = None;
        let mut sent_frames = None;
        let mut pad_datagram = false;
        let mut pad_datagram_to_mtu = false;
        let mut congestion_blocked = false;

        // Walk the spaces in order. The index is only advanced explicitly, so a space can
        // be visited repeatedly while it still has data to send.
        let mut space_idx = 0;
        let spaces = [SpaceId::Initial, SpaceId::Handshake, SpaceId::Data];
        while space_idx < spaces.len() {
            let space_id = spaces[space_idx];
            // The 1-RTT overhead estimate always uses the Data space's next packet number,
            // independent of `space_id`.
            let pn = self.packet_number_filter.peek(&self.spaces[SpaceId::Data]);
            let frame_space_1rtt =
                segment_size.saturating_sub(self.predict_1rtt_overhead(Some(pn)));

            // Skip spaces with nothing to send, unless a close has to be written and the
            // space has keys.
            let can_send = self.space_can_send(space_id, frame_space_1rtt);
            if can_send.is_empty() && (!close || self.spaces[space_id].crypto.is_none()) {
                space_idx += 1;
                continue;
            }

            let mut ack_eliciting = !self.spaces[space_id].pending.is_empty(&self.streams)
                || self.spaces[space_id].ping_pending
                || self.spaces[space_id].immediate_ack_pending;
            if space_id == SpaceId::Data {
                ack_eliciting |= self.can_send_1rtt(frame_space_1rtt);
            }

            pad_datagram_to_mtu |= space_id == SpaceId::Data && self.config.pad_to_mtu;

            // Where the buffer will end once the packet under construction is finished
            // (including any minimum size and its authentication tag).
            let buf_end = if let Some(builder) = &builder_storage {
                buf.len().max(builder.min_size) + builder.tag_len
            } else {
                buf.len()
            };

            let tag_len = if let Some(ref crypto) = self.spaces[space_id].crypto {
                crypto.packet.local.tag_len()
            } else if space_id == SpaceId::Data {
                match self.zero_rtt_crypto.as_ref() {
                    Some(crypto) => crypto.packet.tag_len(),
                    None => {
                        error!(
                            "sending packets in the application data space requires known 0-RTT or 1-RTT keys"
                        );
                        return None;
                    }
                }
            } else {
                unreachable!("tried to send {:?} packet without keys", space_id)
            };
            if !coalesce || buf_capacity - buf_end < MIN_PACKET_SPACE + tag_len {
                // The next packet cannot be coalesced into the current datagram: a new
                // datagram has to be started, provided all limits allow it.
                if num_datagrams >= max_datagrams {
                    break;
                }

                if self
                    .path
                    .anti_amplification_blocked(segment_size as u64 * (num_datagrams as u64) + 1)
                {
                    trace!("blocked by anti-amplification");
                    break;
                }

                // Congestion control and pacing only gate ack-eliciting packets that are
                // not loss probes.
                if ack_eliciting && self.spaces[space_id].loss_probes == 0 {
                    // Bytes of the unfinished packet, which have been assembled but not
                    // yet handed to `finish_and_track`.
                    let untracked_bytes = if let Some(builder) = &builder_storage {
                        buf_capacity - builder.partial_encode.start
                    } else {
                        0
                    } as u64;
                    debug_assert!(untracked_bytes <= segment_size as u64);

                    let bytes_to_send = segment_size as u64 + untracked_bytes;
                    if self.path.in_flight.bytes + bytes_to_send >= self.path.congestion.window() {
                        // Move on to the next space rather than stopping outright.
                        space_idx += 1;
                        congestion_blocked = true;
                        trace!("blocked by congestion control");
                        continue;
                    }

                    let smoothed_rtt = self.path.rtt.get();
                    if let Some(delay) = self.path.pacing.delay(
                        smoothed_rtt,
                        bytes_to_send,
                        self.path.current_mtu(),
                        self.path.congestion.window(),
                        now,
                    ) {
                        self.timers.set(Timer::Pacing, delay);
                        congestion_blocked = true;
                        trace!("blocked by pacing");
                        break;
                    }
                }

                // Finish the packet that ends the previous datagram.
                if let Some(mut builder) = builder_storage.take() {
                    if pad_datagram {
                        let min_size = self.pqc_state.min_initial_size();
                        builder.pad_to(min_size);
                    }

                    if num_datagrams > 1 || pad_datagram_to_mtu {
                        // Largest amount of padding added just to keep a batch going.
                        const MAX_PADDING: usize = 16;
                        let packet_len_unpadded = cmp::max(builder.min_size, buf.len())
                            - datagram_start
                            + builder.tag_len;
                        if (packet_len_unpadded + MAX_PADDING < segment_size
                            && !pad_datagram_to_mtu)
                            || datagram_start + segment_size > buf_capacity
                        {
                            trace!(
                                "GSO truncated by demand for {} padding bytes or loss probe",
                                segment_size - packet_len_unpadded
                            );
                            // Put the builder back so it is finished after the loop.
                            builder_storage = Some(builder);
                            break;
                        }

                        // Pad so that every datagram in the batch has the segment size.
                        builder.pad_to(segment_size as u16);
                    }

                    builder.finish_and_track(now, self, sent_frames.take(), buf);

                    if num_datagrams == 1 {
                        // The first datagram fixes the segment size of the whole batch.
                        segment_size = buf.len();
                        buf_capacity = buf.len();

                        if space_id == SpaceId::Data {
                            // Re-check with the reduced size whether anything still fits.
                            let frame_space_1rtt =
                                segment_size.saturating_sub(self.predict_1rtt_overhead(Some(pn)));
                            if self.space_can_send(space_id, frame_space_1rtt).is_empty() {
                                break;
                            }
                        }
                    }
                }

                // A loss probe consumes one probe credit and is limited to `INITIAL_MTU`.
                let next_datagram_size_limit = match self.spaces[space_id].loss_probes {
                    0 => segment_size,
                    _ => {
                        self.spaces[space_id].loss_probes -= 1;
                        std::cmp::min(segment_size, usize::from(INITIAL_MTU))
                    }
                };
                buf_capacity += next_datagram_size_limit;
                if buf.capacity() < buf_capacity {
                    // Reserve room for a whole batch at once.
                    buf.reserve(max_datagrams * segment_size);
                }
                num_datagrams += 1;
                coalesce = true;
                pad_datagram = false;
                datagram_start = buf.len();

                debug_assert_eq!(
                    datagram_start % segment_size,
                    0,
                    "datagrams in a GSO batch must be aligned to the segment size"
                );
            } else {
                // The next packet is coalesced into the current datagram; finish the
                // packet that precedes it.
                if let Some(builder) = builder_storage.take() {
                    builder.finish_and_track(now, self, sent_frames.take(), buf);
                }
            }

            debug_assert!(buf_capacity - buf.len() >= MIN_PACKET_SPACE);

            // A client about to send a Handshake packet discards the Initial space if it
            // still holds keys.
            if self.spaces[SpaceId::Initial].crypto.is_some()
                && space_id == SpaceId::Handshake
                && self.side.is_client()
            {
                self.discard_space(now, SpaceId::Initial);
            }
            if let Some(ref mut prev) = self.prev_crypto {
                prev.update_unacked = false;
            }

            debug_assert!(
                builder_storage.is_none() && sent_frames.is_none(),
                "Previous packet must have been finished"
            );

            let builder = builder_storage.insert(PacketBuilder::new(
                now,
                space_id,
                self.rem_cids.active(),
                buf,
                buf_capacity,
                datagram_start,
                ack_eliciting,
                self,
            )?);
            // Nothing is coalesced after a short-header packet.
            coalesce = coalesce && !builder.short_header;

            let should_adjust_coalescing = self
                .pqc_state
                .should_adjust_coalescing(buf.len() - datagram_start, space_id);

            if should_adjust_coalescing {
                coalesce = false;
                trace!("Disabling coalescing for PQC handshake in {:?}", space_id);
            }

            // Datagrams carrying an Initial packet are padded when sent by a client or
            // when the packet is ack-eliciting.
            pad_datagram |=
                space_id == SpaceId::Initial && (self.side.is_client() || ack_eliciting);

            if close {
                trace!("sending CONNECTION_CLOSE");
                // Include pending ACKs ahead of the close frame; they are written with a
                // throwaway `SentFrames`, so they are not tracked.
                if !self.spaces[space_id].pending_acks.ranges().is_empty() {
                    Self::populate_acks(
                        now,
                        self.receiving_ecn,
                        &mut SentFrames::default(),
                        &mut self.spaces[space_id],
                        buf,
                        &mut self.stats,
                    );
                }

                debug_assert!(
                    buf.len() + frame::ConnectionClose::SIZE_BOUND < builder.max_size,
                    "ACKs should leave space for ConnectionClose"
                );
                if buf.len() + frame::ConnectionClose::SIZE_BOUND < builder.max_size {
                    let max_frame_size = builder.max_size - buf.len();
                    match self.state {
                        State::Closed(state::Closed { ref reason }) => {
                            // Outside the Data space, a non-transport reason is replaced
                            // by a generic APPLICATION_ERROR close without reason text.
                            if space_id == SpaceId::Data || reason.is_transport_layer() {
                                reason.encode(buf, max_frame_size)
                            } else {
                                frame::ConnectionClose {
                                    error_code: TransportErrorCode::APPLICATION_ERROR,
                                    frame_type: None,
                                    reason: Bytes::new(),
                                }
                                .encode(buf, max_frame_size)
                            }
                        }
                        State::Draining => frame::ConnectionClose {
                            error_code: TransportErrorCode::NO_ERROR,
                            frame_type: None,
                            reason: Bytes::new(),
                        }
                        .encode(buf, max_frame_size),
                        _ => unreachable!(
                            "tried to make a close packet when the connection wasn't closed"
                        ),
                    }
                }
                if space_id == self.highest_space {
                    // The close has been written in the highest space: done closing.
                    self.close = false;
                    break;
                } else {
                    // Also write a close in the next space.
                    space_idx += 1;
                    continue;
                }
            }

            // Off-path PATH_RESPONSE: only attempted while building the first datagram in
            // the Data space; sent as a datagram of its own to the recorded remote.
            if space_id == SpaceId::Data && num_datagrams == 1 {
                if let Some((token, remote)) = self.path_responses.pop_off_path(self.path.remote) {
                    // A builder was stored by the `insert` call above, so this cannot fail.
                    let mut builder = builder_storage.take().unwrap();
                    trace!("PATH_RESPONSE {:08x} (off-path)", token);
                    buf.write(frame::FrameType::PATH_RESPONSE);
                    buf.write(token);
                    self.stats.frame_tx.path_response += 1;
                    let min_size = self.pqc_state.min_initial_size();
                    builder.pad_to(min_size);
                    builder.finish_and_track(
                        now,
                        self,
                        Some(SentFrames {
                            non_retransmits: true,
                            ..SentFrames::default()
                        }),
                        buf,
                    );
                    self.stats.udp_tx.on_sent(1, buf.len());

                    #[cfg(feature = "trace")]
                    {
                        use crate::trace_packet_sent;
                        trace_packet_sent!(
                            &self.event_log,
                            self.trace_context.trace_id(),
                            buf.len() as u32,
                            0
                        );
                    }

                    return Some(Transmit {
                        destination: remote,
                        size: buf.len(),
                        ecn: None,
                        segment_size: None,
                        src_ip: self.local_ip,
                    });
                }
            }

            // Queue any address observations that are due before filling the packet.
            if space_id == SpaceId::Data && self.address_discovery_state.is_some() {
                let peer_supports = self.peer_params.address_discovery.is_some();

                if let Some(state) = &mut self.address_discovery_state {
                    let frames = state.check_for_address_observations(0, peer_supports, now);
                    self.spaces[space_id]
                        .pending
                        .observed_addresses
                        .extend(frames);
                }
            }

            let sent =
                self.populate_packet(now, space_id, buf, builder.max_size, builder.exact_number);

            // Sanity check: when non-ACK frames were reported as sendable for a full-MTU
            // datagram, the packet must not end up carrying only ACKs.
            debug_assert!(
                !(sent.is_ack_only(&self.streams)
                    && !can_send.acks
                    && can_send.other
                    && (buf_capacity - builder.datagram_start) == self.path.current_mtu() as usize
                    && self.datagrams.outgoing.is_empty()),
                "SendableFrames was {can_send:?}, but only ACKs have been written"
            );
            pad_datagram |= sent.requires_padding;

            if sent.largest_acked.is_some() {
                // ACKs went out with this packet.
                self.spaces[space_id].pending_acks.acks_sent();
                self.timers.stop(Timer::MaxAckDelay);
            }

            // Kept until the packet is finished on a later iteration or after the loop.
            sent_frames = Some(sent);

        }

        // Finish the last packet, if one is still under construction.
        if let Some(mut builder) = builder_storage {
            if pad_datagram {
                let min_size = self.pqc_state.min_initial_size();
                builder.pad_to(min_size);
            }

            // Pad to the segment size only if the datagram's budget allows for it.
            if pad_datagram_to_mtu && buf_capacity >= datagram_start + segment_size {
                builder.pad_to(segment_size as u16);
            }

            let last_packet_number = builder.exact_number;
            builder.finish_and_track(now, self, sent_frames, buf);
            self.path
                .congestion
                .on_sent(now, buf.len() as u64, last_packet_number);

            #[cfg(feature = "__qlog")]
            self.emit_qlog_recovery_metrics(now);
        }

        self.app_limited = buf.is_empty() && !congestion_blocked;

        // Nothing else to send on an established connection: see whether MTU discovery
        // wants to send a probe.
        if buf.is_empty() && self.state.is_established() {
            let space_id = SpaceId::Data;
            let probe_size = self
                .path
                .mtud
                .poll_transmit(now, self.packet_number_filter.peek(&self.spaces[space_id]))?;

            let buf_capacity = probe_size as usize;
            buf.reserve(buf_capacity);

            let mut builder = PacketBuilder::new(
                now,
                space_id,
                self.rem_cids.active(),
                buf,
                buf_capacity,
                0,
                true,
                self,
            )?;

            // The probe consists of a PING (plus IMMEDIATE_ACK when the peer supports ACK
            // frequency), padded to the probed size.
            buf.write(frame::FrameType::PING);
            self.stats.frame_tx.ping += 1;

            if self.peer_supports_ack_frequency() {
                buf.write(frame::FrameType::IMMEDIATE_ACK);
                self.stats.frame_tx.immediate_ack += 1;
            }

            builder.pad_to(probe_size);
            let sent_frames = SentFrames {
                non_retransmits: true,
                ..Default::default()
            };
            builder.finish_and_track(now, self, Some(sent_frames), buf);

            self.stats.path.sent_plpmtud_probes += 1;
            num_datagrams = 1;

            trace!(?probe_size, "writing MTUD probe");
        }

        if buf.is_empty() {
            return None;
        }

        trace!("sending {} bytes in {} datagrams", buf.len(), num_datagrams);
        self.path.total_sent = self.path.total_sent.saturating_add(buf.len() as u64);

        self.stats.udp_tx.on_sent(num_datagrams as u64, buf.len());

        #[cfg(feature = "trace")]
        {
            use crate::trace_packet_sent;
            // The trace event reports the Data space's next packet number minus one,
            // whichever space the packets were actually sent in.
            let packet_num = self.spaces[SpaceId::Data]
                .next_packet_number
                .saturating_sub(1);
            trace_packet_sent!(
                &self.event_log,
                self.trace_context.trace_id(),
                buf.len() as u32,
                packet_num
            );
        }

        Some(Transmit {
            destination: self.path.remote,
            size: buf.len(),
            ecn: if self.path.sending_ecn {
                Some(EcnCodepoint::Ect0)
            } else {
                None
            },
            // A segment size is only reported for batches of more than one datagram.
            segment_size: match num_datagrams {
                1 => None,
                _ => Some(segment_size),
            },
            src_ip: self.local_ip,
        })
    }
1224
1225 fn send_coordination_request(&mut self, _now: Instant, _buf: &mut Vec<u8>) -> Option<Transmit> {
1227 let nat = self.nat_traversal.as_mut()?;
1229 if !nat.should_send_punch_request() {
1230 return None;
1231 }
1232
1233 let coord = nat.coordination.as_ref()?;
1234 let round = coord.round;
1235 if coord.punch_targets.is_empty() {
1236 return None;
1237 }
1238
1239 trace!(
1240 "queuing PUNCH_ME_NOW round {} with {} targets",
1241 round,
1242 coord.punch_targets.len()
1243 );
1244
1245 for target in &coord.punch_targets {
1247 let punch = frame::PunchMeNow {
1248 round,
1249 paired_with_sequence_number: target.remote_sequence,
1250 address: target.remote_addr,
1251 target_peer_id: None,
1252 };
1253 self.spaces[SpaceId::Data].pending.punch_me_now.push(punch);
1254 }
1255
1256 nat.mark_punch_request_sent();
1258
1259 None
1261 }
1262
1263 fn send_coordinated_path_challenge(
1265 &mut self,
1266 now: Instant,
1267 buf: &mut Vec<u8>,
1268 ) -> Option<Transmit> {
1269 if let Some(nat_traversal) = &mut self.nat_traversal {
1271 if nat_traversal.should_start_punching(now) {
1272 nat_traversal.start_punching_phase(now);
1273 }
1274 }
1275
1276 let (target_addr, challenge) = {
1278 let nat_traversal = self.nat_traversal.as_ref()?;
1279 match nat_traversal.get_coordination_phase() {
1280 Some(CoordinationPhase::Punching) => {
1281 let targets = nat_traversal.get_punch_targets_from_coordination()?;
1282 if targets.is_empty() {
1283 return None;
1284 }
1285 let target = &targets[0];
1287 (target.remote_addr, target.challenge)
1288 }
1289 _ => return None,
1290 }
1291 };
1292
1293 debug_assert_eq!(
1294 self.highest_space,
1295 SpaceId::Data,
1296 "PATH_CHALLENGE queued without 1-RTT keys"
1297 );
1298
1299 #[cfg(feature = "pqc")]
1300 buf.reserve(self.pqc_state.min_initial_size() as usize);
1301 #[cfg(not(feature = "pqc"))]
1302 buf.reserve(MIN_INITIAL_SIZE as usize);
1303 let buf_capacity = buf.capacity();
1304
1305 let mut builder = PacketBuilder::new(
1306 now,
1307 SpaceId::Data,
1308 self.rem_cids.active(),
1309 buf,
1310 buf_capacity,
1311 0,
1312 false,
1313 self,
1314 )?;
1315
1316 trace!(
1317 "sending coordinated PATH_CHALLENGE {:08x} to {}",
1318 challenge, target_addr
1319 );
1320 buf.write(frame::FrameType::PATH_CHALLENGE);
1321 buf.write(challenge);
1322 self.stats.frame_tx.path_challenge += 1;
1323
1324 #[cfg(feature = "pqc")]
1325 let min_size = self.pqc_state.min_initial_size();
1326 #[cfg(not(feature = "pqc"))]
1327 let min_size = MIN_INITIAL_SIZE;
1328 builder.pad_to(min_size);
1329 builder.finish_and_track(now, self, None, buf);
1330
1331 if let Some(nat_traversal) = &mut self.nat_traversal {
1333 nat_traversal.mark_coordination_validating();
1334 }
1335
1336 Some(Transmit {
1337 destination: target_addr,
1338 size: buf.len(),
1339 ecn: if self.path.sending_ecn {
1340 Some(EcnCodepoint::Ect0)
1341 } else {
1342 None
1343 },
1344 segment_size: None,
1345 src_ip: self.local_ip,
1346 })
1347 }
1348
1349 fn send_nat_traversal_challenge(
1351 &mut self,
1352 now: Instant,
1353 buf: &mut Vec<u8>,
1354 ) -> Option<Transmit> {
1355 if let Some(request) = self.send_coordination_request(now, buf) {
1357 return Some(request);
1358 }
1359
1360 if let Some(punch) = self.send_coordinated_path_challenge(now, buf) {
1362 return Some(punch);
1363 }
1364
1365 let (remote_addr, remote_sequence) = {
1367 let nat_traversal = self.nat_traversal.as_ref()?;
1368 let candidates = nat_traversal.get_validation_candidates();
1369 if candidates.is_empty() {
1370 return None;
1371 }
1372 let (sequence, candidate) = candidates[0];
1374 (candidate.address, sequence)
1375 };
1376
1377 let challenge = self.rng.r#gen::<u64>();
1378
1379 if let Err(e) =
1381 self.nat_traversal
1382 .as_mut()?
1383 .start_validation(remote_sequence, challenge, now)
1384 {
1385 warn!("Failed to start NAT traversal validation: {}", e);
1386 return None;
1387 }
1388
1389 debug_assert_eq!(
1390 self.highest_space,
1391 SpaceId::Data,
1392 "PATH_CHALLENGE queued without 1-RTT keys"
1393 );
1394
1395 #[cfg(feature = "pqc")]
1396 buf.reserve(self.pqc_state.min_initial_size() as usize);
1397 #[cfg(not(feature = "pqc"))]
1398 buf.reserve(MIN_INITIAL_SIZE as usize);
1399 let buf_capacity = buf.capacity();
1400
1401 let mut builder = PacketBuilder::new(
1403 now,
1404 SpaceId::Data,
1405 self.rem_cids.active(),
1406 buf,
1407 buf_capacity,
1408 0,
1409 false,
1410 self,
1411 )?;
1412
1413 trace!(
1414 "sending PATH_CHALLENGE {:08x} to NAT candidate {}",
1415 challenge, remote_addr
1416 );
1417 buf.write(frame::FrameType::PATH_CHALLENGE);
1418 buf.write(challenge);
1419 self.stats.frame_tx.path_challenge += 1;
1420
1421 #[cfg(feature = "pqc")]
1423 let min_size = self.pqc_state.min_initial_size();
1424 #[cfg(not(feature = "pqc"))]
1425 let min_size = MIN_INITIAL_SIZE;
1426 builder.pad_to(min_size);
1427
1428 builder.finish_and_track(now, self, None, buf);
1429
1430 Some(Transmit {
1431 destination: remote_addr,
1432 size: buf.len(),
1433 ecn: if self.path.sending_ecn {
1434 Some(EcnCodepoint::Ect0)
1435 } else {
1436 None
1437 },
1438 segment_size: None,
1439 src_ip: self.local_ip,
1440 })
1441 }
1442
1443 fn send_path_challenge(&mut self, now: Instant, buf: &mut Vec<u8>) -> Option<Transmit> {
1445 let (prev_cid, prev_path) = self.prev_path.as_mut()?;
1446 if !prev_path.challenge_pending {
1447 return None;
1448 }
1449 prev_path.challenge_pending = false;
1450 let token = prev_path
1451 .challenge
1452 .expect("previous path challenge pending without token");
1453 let destination = prev_path.remote;
1454 debug_assert_eq!(
1455 self.highest_space,
1456 SpaceId::Data,
1457 "PATH_CHALLENGE queued without 1-RTT keys"
1458 );
1459 #[cfg(feature = "pqc")]
1460 buf.reserve(self.pqc_state.min_initial_size() as usize);
1461 #[cfg(not(feature = "pqc"))]
1462 buf.reserve(MIN_INITIAL_SIZE as usize);
1463
1464 let buf_capacity = buf.capacity();
1465
1466 let mut builder = PacketBuilder::new(
1472 now,
1473 SpaceId::Data,
1474 *prev_cid,
1475 buf,
1476 buf_capacity,
1477 0,
1478 false,
1479 self,
1480 )?;
1481 trace!("validating previous path with PATH_CHALLENGE {:08x}", token);
1482 buf.write(frame::FrameType::PATH_CHALLENGE);
1483 buf.write(token);
1484 self.stats.frame_tx.path_challenge += 1;
1485
1486 #[cfg(feature = "pqc")]
1491 let min_size = self.pqc_state.min_initial_size();
1492 #[cfg(not(feature = "pqc"))]
1493 let min_size = MIN_INITIAL_SIZE;
1494 builder.pad_to(min_size);
1495
1496 builder.finish(self, buf);
1497 self.stats.udp_tx.on_sent(1, buf.len());
1498
1499 Some(Transmit {
1500 destination,
1501 size: buf.len(),
1502 ecn: None,
1503 segment_size: None,
1504 src_ip: self.local_ip,
1505 })
1506 }
1507
1508 fn space_can_send(&self, space_id: SpaceId, frame_space_1rtt: usize) -> SendableFrames {
1510 if self.spaces[space_id].crypto.is_none()
1511 && (space_id != SpaceId::Data
1512 || self.zero_rtt_crypto.is_none()
1513 || self.side.is_server())
1514 {
1515 return SendableFrames::empty();
1517 }
1518 let mut can_send = self.spaces[space_id].can_send(&self.streams);
1519 if space_id == SpaceId::Data {
1520 can_send.other |= self.can_send_1rtt(frame_space_1rtt);
1521 }
1522 can_send
1523 }
1524
1525 pub fn handle_event(&mut self, event: ConnectionEvent) {
1531 use ConnectionEventInner::*;
1532 match event.0 {
1533 Datagram(DatagramConnectionEvent {
1534 now,
1535 remote,
1536 ecn,
1537 first_decode,
1538 remaining,
1539 }) => {
1540 if remote != self.path.remote && !self.side.remote_may_migrate() {
1544 trace!("discarding packet from unrecognized peer {}", remote);
1545 return;
1546 }
1547
1548 let was_anti_amplification_blocked = self.path.anti_amplification_blocked(1);
1549
1550 self.stats.udp_rx.datagrams += 1;
1551 self.stats.udp_rx.bytes += first_decode.len() as u64;
1552 let data_len = first_decode.len();
1553
1554 self.handle_decode(now, remote, ecn, first_decode);
1555 self.path.total_recvd = self.path.total_recvd.saturating_add(data_len as u64);
1560
1561 if let Some(data) = remaining {
1562 self.stats.udp_rx.bytes += data.len() as u64;
1563 self.handle_coalesced(now, remote, ecn, data);
1564 }
1565
1566 #[cfg(feature = "__qlog")]
1567 self.emit_qlog_recovery_metrics(now);
1568
1569 if was_anti_amplification_blocked {
1570 self.set_loss_detection_timer(now);
1574 }
1575 }
1576 NewIdentifiers(ids, now) => {
1577 self.local_cid_state.new_cids(&ids, now);
1578 ids.into_iter().rev().for_each(|frame| {
1579 self.spaces[SpaceId::Data].pending.new_cids.push(frame);
1580 });
1581 if self.timers.get(Timer::PushNewCid).is_none_or(|x| x <= now) {
1583 self.reset_cid_retirement();
1584 }
1585 }
1586 QueueAddAddress(add) => {
1587 self.spaces[SpaceId::Data].pending.add_addresses.push(add);
1589 }
1590 QueuePunchMeNow(punch) => {
1591 self.spaces[SpaceId::Data].pending.punch_me_now.push(punch);
1593 }
1594 }
1595 }
1596
1597 pub fn handle_timeout(&mut self, now: Instant) {
1607 for &timer in &Timer::VALUES {
1608 if !self.timers.is_expired(timer, now) {
1609 continue;
1610 }
1611 self.timers.stop(timer);
1612 trace!(timer = ?timer, "timeout");
1613 match timer {
1614 Timer::Close => {
1615 self.state = State::Drained;
1616 self.endpoint_events.push_back(EndpointEventInner::Drained);
1617 }
1618 Timer::Idle => {
1619 self.kill(ConnectionError::TimedOut);
1620 }
1621 Timer::KeepAlive => {
1622 trace!("sending keep-alive");
1623 self.ping();
1624 }
1625 Timer::LossDetection => {
1626 self.on_loss_detection_timeout(now);
1627
1628 #[cfg(feature = "__qlog")]
1629 self.emit_qlog_recovery_metrics(now);
1630 }
1631 Timer::KeyDiscard => {
1632 self.zero_rtt_crypto = None;
1633 self.prev_crypto = None;
1634 }
1635 Timer::PathValidation => {
1636 debug!("path validation failed");
1637 if let Some((_, prev)) = self.prev_path.take() {
1638 self.path = prev;
1639 }
1640 self.path.challenge = None;
1641 self.path.challenge_pending = false;
1642 }
1643 Timer::Pacing => trace!("pacing timer expired"),
1644 Timer::NatTraversal => {
1645 self.handle_nat_traversal_timeout(now);
1646 }
1647 Timer::PushNewCid => {
1648 let num_new_cid = self.local_cid_state.on_cid_timeout().into();
1650 if !self.state.is_closed() {
1651 trace!(
1652 "push a new cid to peer RETIRE_PRIOR_TO field {}",
1653 self.local_cid_state.retire_prior_to()
1654 );
1655 self.endpoint_events
1656 .push_back(EndpointEventInner::NeedIdentifiers(now, num_new_cid));
1657 }
1658 }
1659 Timer::MaxAckDelay => {
1660 trace!("max ack delay reached");
1661 self.spaces[SpaceId::Data]
1663 .pending_acks
1664 .on_max_ack_delay_timeout()
1665 }
1666 }
1667 }
1668 }
1669
1670 pub fn close(&mut self, now: Instant, error_code: VarInt, reason: Bytes) {
1682 self.close_inner(
1683 now,
1684 Close::Application(frame::ApplicationClose { error_code, reason }),
1685 )
1686 }
1687
1688 fn close_inner(&mut self, now: Instant, reason: Close) {
1689 let was_closed = self.state.is_closed();
1690 if !was_closed {
1691 self.close_common();
1692 self.set_close_timer(now);
1693 self.close = true;
1694 self.state = State::Closed(state::Closed { reason });
1695 }
1696 }
1697
1698 pub fn datagrams(&mut self) -> Datagrams<'_> {
1700 Datagrams { conn: self }
1701 }
1702
1703 pub fn stats(&self) -> ConnectionStats {
1705 let mut stats = self.stats;
1706 stats.path.rtt = self.path.rtt.get();
1707 stats.path.cwnd = self.path.congestion.window();
1708 stats.path.current_mtu = self.path.mtud.current_mtu();
1709
1710 stats
1711 }
1712
1713 pub fn set_token_binding_peer_id(&mut self, pid: PeerId) {
1715 self.peer_id_for_tokens = Some(pid);
1716 }
1717
1718 pub fn set_delay_new_token_until_binding(&mut self, v: bool) {
1720 self.delay_new_token_until_binding = v;
1721 }
1722
1723 pub(crate) fn set_expected_peer_id_from_token(&mut self, expected: Option<PeerId>) {
1725 self.expected_peer_id_from_token = expected;
1726 }
1727
1728 pub fn bound_peer_id(&self) -> Option<PeerId> {
1730 self.peer_id_for_tokens
1731 }
1732
1733 pub fn expected_peer_id(&self) -> Option<PeerId> {
1735 self.expected_peer_id_from_token
1736 }
1737
1738 pub fn ping(&mut self) {
1742 self.spaces[self.highest_space].ping_pending = true;
1743 }
1744
1745 pub(crate) fn is_pqc(&self) -> bool {
1747 self.pqc_state.using_pqc
1748 }
1749
1750 pub fn force_key_update(&mut self) {
1754 if !self.state.is_established() {
1755 debug!("ignoring forced key update in illegal state");
1756 return;
1757 }
1758 if self.prev_crypto.is_some() {
1759 debug!("ignoring redundant forced key update");
1762 return;
1763 }
1764 self.update_keys(None, false);
1765 }
1766
1767 #[doc(hidden)]
1769 #[deprecated]
1770 pub fn initiate_key_update(&mut self) {
1771 self.force_key_update();
1772 }
1773
1774 pub fn crypto_session(&self) -> &dyn crypto::Session {
1776 &*self.crypto
1777 }
1778
1779 pub fn is_handshaking(&self) -> bool {
1784 self.state.is_handshake()
1785 }
1786
1787 pub fn is_closed(&self) -> bool {
1795 self.state.is_closed()
1796 }
1797
1798 pub fn is_drained(&self) -> bool {
1803 self.state.is_drained()
1804 }
1805
1806 pub fn accepted_0rtt(&self) -> bool {
1810 self.accepted_0rtt
1811 }
1812
1813 pub fn has_0rtt(&self) -> bool {
1815 self.zero_rtt_enabled
1816 }
1817
1818 pub fn has_pending_retransmits(&self) -> bool {
1820 !self.spaces[SpaceId::Data].pending.is_empty(&self.streams)
1821 }
1822
1823 pub fn side(&self) -> Side {
1825 self.side.side()
1826 }
1827
1828 pub fn remote_address(&self) -> SocketAddr {
1830 self.path.remote
1831 }
1832
1833 pub fn local_ip(&self) -> Option<IpAddr> {
1843 self.local_ip
1844 }
1845
1846 pub fn rtt(&self) -> Duration {
1848 self.path.rtt.get()
1849 }
1850
1851 pub fn congestion_state(&self) -> &dyn Controller {
1853 self.path.congestion.as_ref()
1854 }
1855
1856 pub fn path_changed(&mut self, now: Instant) {
1867 self.path.reset(now, &self.config);
1868 }
1869
1870 pub fn set_max_concurrent_streams(&mut self, dir: Dir, count: VarInt) {
1875 self.streams.set_max_concurrent(dir, count);
1876 let pending = &mut self.spaces[SpaceId::Data].pending;
1879 self.streams.queue_max_stream_id(pending);
1880 }
1881
1882 pub fn max_concurrent_streams(&self, dir: Dir) -> u64 {
1888 self.streams.max_concurrent(dir)
1889 }
1890
1891 pub fn set_receive_window(&mut self, receive_window: VarInt) {
1893 if self.streams.set_receive_window(receive_window) {
1894 self.spaces[SpaceId::Data].pending.max_data = true;
1895 }
1896 }
1897
1898 pub fn set_address_discovery_enabled(&mut self, enabled: bool) {
1900 if let Some(ref mut state) = self.address_discovery_state {
1901 state.enabled = enabled;
1902 }
1903 }
1904
1905 pub fn address_discovery_enabled(&self) -> bool {
1907 self.address_discovery_state
1908 .as_ref()
1909 .is_some_and(|state| state.enabled)
1910 }
1911
1912 pub fn observed_address(&self) -> Option<SocketAddr> {
1917 self.address_discovery_state
1918 .as_ref()
1919 .and_then(|state| state.get_observed_address(0)) }
1921
1922 #[allow(dead_code)]
1924 pub(crate) fn address_discovery_state(&self) -> Option<&AddressDiscoveryState> {
1925 self.address_discovery_state.as_ref()
1926 }
1927
1928 fn on_ack_received(
1929 &mut self,
1930 now: Instant,
1931 space: SpaceId,
1932 ack: frame::Ack,
1933 ) -> Result<(), TransportError> {
1934 if ack.largest >= self.spaces[space].next_packet_number {
1935 return Err(TransportError::PROTOCOL_VIOLATION("unsent packet acked"));
1936 }
1937 let new_largest = {
1938 let space = &mut self.spaces[space];
1939 if space.largest_acked_packet.is_none_or(|pn| ack.largest > pn) {
1940 space.largest_acked_packet = Some(ack.largest);
1941 if let Some(info) = space.sent_packets.get(&ack.largest) {
1942 space.largest_acked_packet_sent = info.time_sent;
1946 }
1947 true
1948 } else {
1949 false
1950 }
1951 };
1952
1953 let mut newly_acked = ArrayRangeSet::new();
1955 for range in ack.iter() {
1956 self.packet_number_filter.check_ack(space, range.clone())?;
1957 for (&pn, _) in self.spaces[space].sent_packets.range(range) {
1958 newly_acked.insert_one(pn);
1959 }
1960 }
1961
1962 if newly_acked.is_empty() {
1963 return Ok(());
1964 }
1965
1966 let mut ack_eliciting_acked = false;
1967 for packet in newly_acked.elts() {
1968 if let Some(info) = self.spaces[space].take(packet) {
1969 if let Some(acked) = info.largest_acked {
1970 self.spaces[space].pending_acks.subtract_below(acked);
1976 }
1977 ack_eliciting_acked |= info.ack_eliciting;
1978
1979 let mtu_updated = self.path.mtud.on_acked(space, packet, info.size);
1981 if mtu_updated {
1982 self.path
1983 .congestion
1984 .on_mtu_update(self.path.mtud.current_mtu());
1985 }
1986
1987 self.ack_frequency.on_acked(packet);
1989
1990 self.on_packet_acked(now, packet, info);
1991 }
1992 }
1993
1994 self.path.congestion.on_end_acks(
1995 now,
1996 self.path.in_flight.bytes,
1997 self.app_limited,
1998 self.spaces[space].largest_acked_packet,
1999 );
2000
2001 if new_largest && ack_eliciting_acked {
2002 let ack_delay = if space != SpaceId::Data {
2003 Duration::from_micros(0)
2004 } else {
2005 cmp::min(
2006 self.ack_frequency.peer_max_ack_delay,
2007 Duration::from_micros(ack.delay << self.peer_params.ack_delay_exponent.0),
2008 )
2009 };
2010 let rtt = instant_saturating_sub(now, self.spaces[space].largest_acked_packet_sent);
2011 self.path.rtt.update(ack_delay, rtt);
2012 if self.path.first_packet_after_rtt_sample.is_none() {
2013 self.path.first_packet_after_rtt_sample =
2014 Some((space, self.spaces[space].next_packet_number));
2015 }
2016 }
2017
2018 self.detect_lost_packets(now, space, true);
2020
2021 if self.peer_completed_address_validation() {
2022 self.pto_count = 0;
2023 }
2024
2025 if self.path.sending_ecn {
2027 if let Some(ecn) = ack.ecn {
2028 if new_largest {
2033 let sent = self.spaces[space].largest_acked_packet_sent;
2034 self.process_ecn(now, space, newly_acked.len() as u64, ecn, sent);
2035 }
2036 } else {
2037 debug!("ECN not acknowledged by peer");
2039 self.path.sending_ecn = false;
2040 }
2041 }
2042
2043 self.set_loss_detection_timer(now);
2044 Ok(())
2045 }
2046
2047 fn process_ecn(
2049 &mut self,
2050 now: Instant,
2051 space: SpaceId,
2052 newly_acked: u64,
2053 ecn: frame::EcnCounts,
2054 largest_sent_time: Instant,
2055 ) {
2056 match self.spaces[space].detect_ecn(newly_acked, ecn) {
2057 Err(e) => {
2058 debug!("halting ECN due to verification failure: {}", e);
2059 self.path.sending_ecn = false;
2060 self.spaces[space].ecn_feedback = frame::EcnCounts::ZERO;
2063 }
2064 Ok(false) => {}
2065 Ok(true) => {
2066 self.stats.path.congestion_events += 1;
2067 self.path
2068 .congestion
2069 .on_congestion_event(now, largest_sent_time, false, 0);
2070 }
2071 }
2072 }
2073
2074 fn on_packet_acked(&mut self, now: Instant, pn: u64, info: SentPacket) {
2077 self.remove_in_flight(pn, &info);
2078 if info.ack_eliciting && self.path.challenge.is_none() {
2079 self.path.congestion.on_ack(
2082 now,
2083 info.time_sent,
2084 info.size.into(),
2085 self.app_limited,
2086 &self.path.rtt,
2087 );
2088 }
2089
2090 if let Some(retransmits) = info.retransmits.get() {
2092 for (id, _) in retransmits.reset_stream.iter() {
2093 self.streams.reset_acked(*id);
2094 }
2095 }
2096
2097 for frame in info.stream_frames {
2098 self.streams.received_ack_of(frame);
2099 }
2100 }
2101
2102 fn set_key_discard_timer(&mut self, now: Instant, space: SpaceId) {
2103 let start = if self.zero_rtt_crypto.is_some() {
2104 now
2105 } else {
2106 self.prev_crypto
2107 .as_ref()
2108 .expect("no previous keys")
2109 .end_packet
2110 .as_ref()
2111 .expect("update not acknowledged yet")
2112 .1
2113 };
2114 self.timers
2115 .set(Timer::KeyDiscard, start + self.pto(space) * 3);
2116 }
2117
2118 fn on_loss_detection_timeout(&mut self, now: Instant) {
2119 if let Some((_, pn_space)) = self.loss_time_and_space() {
2120 self.detect_lost_packets(now, pn_space, false);
2122 self.set_loss_detection_timer(now);
2123 return;
2124 }
2125
2126 let (_, space) = match self.pto_time_and_space(now) {
2127 Some(x) => x,
2128 None => {
2129 error!("PTO expired while unset");
2130 return;
2131 }
2132 };
2133 trace!(
2134 in_flight = self.path.in_flight.bytes,
2135 count = self.pto_count,
2136 ?space,
2137 "PTO fired"
2138 );
2139
2140 let count = match self.path.in_flight.ack_eliciting {
2141 0 => {
2144 debug_assert!(!self.peer_completed_address_validation());
2145 1
2146 }
2147 _ => 2,
2149 };
2150 self.spaces[space].loss_probes = self.spaces[space].loss_probes.saturating_add(count);
2151 self.pto_count = self.pto_count.saturating_add(1);
2152 self.set_loss_detection_timer(now);
2153 }
2154
2155 fn detect_lost_packets(&mut self, now: Instant, pn_space: SpaceId, due_to_ack: bool) {
2156 let mut lost_packets = Vec::<u64>::new();
2157 let mut lost_mtu_probe = None;
2158 let in_flight_mtu_probe = self.path.mtud.in_flight_mtu_probe();
2159 let rtt = self.path.rtt.conservative();
2160 let loss_delay = cmp::max(rtt.mul_f32(self.config.time_threshold), TIMER_GRANULARITY);
2161
2162 let lost_send_time = now.checked_sub(loss_delay).unwrap();
2164 let largest_acked_packet = self.spaces[pn_space].largest_acked_packet.unwrap();
2165 let packet_threshold = self.config.packet_threshold as u64;
2166 let mut size_of_lost_packets = 0u64;
2167
2168 let congestion_period =
2172 self.pto(SpaceId::Data) * self.config.persistent_congestion_threshold;
2173 let mut persistent_congestion_start: Option<Instant> = None;
2174 let mut prev_packet = None;
2175 let mut in_persistent_congestion = false;
2176
2177 let space = &mut self.spaces[pn_space];
2178 space.loss_time = None;
2179
2180 for (&packet, info) in space.sent_packets.range(0..largest_acked_packet) {
2181 if prev_packet != Some(packet.wrapping_sub(1)) {
2182 persistent_congestion_start = None;
2184 }
2185
2186 if info.time_sent <= lost_send_time || largest_acked_packet >= packet + packet_threshold
2187 {
2188 if Some(packet) == in_flight_mtu_probe {
2189 lost_mtu_probe = in_flight_mtu_probe;
2192 } else {
2193 lost_packets.push(packet);
2194 size_of_lost_packets += info.size as u64;
2195 if info.ack_eliciting && due_to_ack {
2196 match persistent_congestion_start {
2197 Some(start) if info.time_sent - start > congestion_period => {
2200 in_persistent_congestion = true;
2201 }
2202 None if self
2204 .path
2205 .first_packet_after_rtt_sample
2206 .is_some_and(|x| x < (pn_space, packet)) =>
2207 {
2208 persistent_congestion_start = Some(info.time_sent);
2209 }
2210 _ => {}
2211 }
2212 }
2213 }
2214 } else {
2215 let next_loss_time = info.time_sent + loss_delay;
2216 space.loss_time = Some(
2217 space
2218 .loss_time
2219 .map_or(next_loss_time, |x| cmp::min(x, next_loss_time)),
2220 );
2221 persistent_congestion_start = None;
2222 }
2223
2224 prev_packet = Some(packet);
2225 }
2226
2227 if let Some(largest_lost) = lost_packets.last().cloned() {
2229 let old_bytes_in_flight = self.path.in_flight.bytes;
2230 let largest_lost_sent = self.spaces[pn_space].sent_packets[&largest_lost].time_sent;
2231 self.lost_packets += lost_packets.len() as u64;
2232 self.stats.path.lost_packets += lost_packets.len() as u64;
2233 self.stats.path.lost_bytes += size_of_lost_packets;
2234 trace!(
2235 "packets lost: {:?}, bytes lost: {}",
2236 lost_packets, size_of_lost_packets
2237 );
2238
2239 for &packet in &lost_packets {
2240 let info = self.spaces[pn_space].take(packet).unwrap(); self.remove_in_flight(packet, &info);
2242 for frame in info.stream_frames {
2243 self.streams.retransmit(frame);
2244 }
2245 self.spaces[pn_space].pending |= info.retransmits;
2246 self.path.mtud.on_non_probe_lost(packet, info.size);
2247 }
2248
2249 if self.path.mtud.black_hole_detected(now) {
2250 self.stats.path.black_holes_detected += 1;
2251 self.path
2252 .congestion
2253 .on_mtu_update(self.path.mtud.current_mtu());
2254 if let Some(max_datagram_size) = self.datagrams().max_size() {
2255 self.datagrams.drop_oversized(max_datagram_size);
2256 }
2257 }
2258
2259 let lost_ack_eliciting = old_bytes_in_flight != self.path.in_flight.bytes;
2261
2262 if lost_ack_eliciting {
2263 self.stats.path.congestion_events += 1;
2264 self.path.congestion.on_congestion_event(
2265 now,
2266 largest_lost_sent,
2267 in_persistent_congestion,
2268 size_of_lost_packets,
2269 );
2270 }
2271 }
2272
2273 if let Some(packet) = lost_mtu_probe {
2275 let info = self.spaces[SpaceId::Data].take(packet).unwrap(); self.remove_in_flight(packet, &info);
2277 self.path.mtud.on_probe_lost();
2278 self.stats.path.lost_plpmtud_probes += 1;
2279 }
2280 }
2281
2282 fn loss_time_and_space(&self) -> Option<(Instant, SpaceId)> {
2283 SpaceId::iter()
2284 .filter_map(|id| Some((self.spaces[id].loss_time?, id)))
2285 .min_by_key(|&(time, _)| time)
2286 }
2287
2288 fn pto_time_and_space(&self, now: Instant) -> Option<(Instant, SpaceId)> {
2289 let backoff = 2u32.pow(self.pto_count.min(MAX_BACKOFF_EXPONENT));
2290 let mut duration = self.path.rtt.pto_base() * backoff;
2291
2292 if self.path.in_flight.ack_eliciting == 0 {
2293 debug_assert!(!self.peer_completed_address_validation());
2294 let space = match self.highest_space {
2295 SpaceId::Handshake => SpaceId::Handshake,
2296 _ => SpaceId::Initial,
2297 };
2298 return Some((now + duration, space));
2299 }
2300
2301 let mut result = None;
2302 for space in SpaceId::iter() {
2303 if self.spaces[space].in_flight == 0 {
2304 continue;
2305 }
2306 if space == SpaceId::Data {
2307 if self.is_handshaking() {
2309 return result;
2310 }
2311 duration += self.ack_frequency.max_ack_delay_for_pto() * backoff;
2313 }
2314 let last_ack_eliciting = match self.spaces[space].time_of_last_ack_eliciting_packet {
2315 Some(time) => time,
2316 None => continue,
2317 };
2318 let pto = last_ack_eliciting + duration;
2319 if result.is_none_or(|(earliest_pto, _)| pto < earliest_pto) {
2320 result = Some((pto, space));
2321 }
2322 }
2323 result
2324 }
2325
2326 fn peer_completed_address_validation(&self) -> bool {
2327 if self.side.is_server() || self.state.is_closed() {
2328 return true;
2329 }
2330 self.spaces[SpaceId::Handshake]
2333 .largest_acked_packet
2334 .is_some()
2335 || self.spaces[SpaceId::Data].largest_acked_packet.is_some()
2336 || (self.spaces[SpaceId::Data].crypto.is_some()
2337 && self.spaces[SpaceId::Handshake].crypto.is_none())
2338 }
2339
2340 fn set_loss_detection_timer(&mut self, now: Instant) {
2341 if self.state.is_closed() {
2342 return;
2346 }
2347
2348 if let Some((loss_time, _)) = self.loss_time_and_space() {
2349 self.timers.set(Timer::LossDetection, loss_time);
2351 return;
2352 }
2353
2354 if self.path.anti_amplification_blocked(1) {
2355 self.timers.stop(Timer::LossDetection);
2357 return;
2358 }
2359
2360 if self.path.in_flight.ack_eliciting == 0 && self.peer_completed_address_validation() {
2361 self.timers.stop(Timer::LossDetection);
2364 return;
2365 }
2366
2367 if let Some((timeout, _)) = self.pto_time_and_space(now) {
2370 self.timers.set(Timer::LossDetection, timeout);
2371 } else {
2372 self.timers.stop(Timer::LossDetection);
2373 }
2374 }
2375
2376 fn pto(&self, space: SpaceId) -> Duration {
2378 let max_ack_delay = match space {
2379 SpaceId::Initial | SpaceId::Handshake => Duration::ZERO,
2380 SpaceId::Data => self.ack_frequency.max_ack_delay_for_pto(),
2381 };
2382 self.path.rtt.pto_base() + max_ack_delay
2383 }
2384
2385 fn on_packet_authenticated(
2386 &mut self,
2387 now: Instant,
2388 space_id: SpaceId,
2389 ecn: Option<EcnCodepoint>,
2390 packet: Option<u64>,
2391 spin: bool,
2392 is_1rtt: bool,
2393 ) {
2394 self.total_authed_packets += 1;
2395 self.reset_keep_alive(now);
2396 self.reset_idle_timeout(now, space_id);
2397 self.permit_idle_reset = true;
2398 self.receiving_ecn |= ecn.is_some();
2399 if let Some(x) = ecn {
2400 let space = &mut self.spaces[space_id];
2401 space.ecn_counters += x;
2402
2403 if x.is_ce() {
2404 space.pending_acks.set_immediate_ack_required();
2405 }
2406 }
2407
2408 let packet = match packet {
2409 Some(x) => x,
2410 None => return,
2411 };
2412 if self.side.is_server() {
2413 if self.spaces[SpaceId::Initial].crypto.is_some() && space_id == SpaceId::Handshake {
2414 self.discard_space(now, SpaceId::Initial);
2416 }
2417 if self.zero_rtt_crypto.is_some() && is_1rtt {
2418 self.set_key_discard_timer(now, space_id)
2420 }
2421 }
2422 let space = &mut self.spaces[space_id];
2423 space.pending_acks.insert_one(packet, now);
2424 if packet >= space.rx_packet {
2425 space.rx_packet = packet;
2426 self.spin = self.side.is_client() ^ spin;
2428 }
2429 }
2430
2431 fn reset_idle_timeout(&mut self, now: Instant, space: SpaceId) {
2432 let timeout = match self.idle_timeout {
2433 None => return,
2434 Some(dur) => dur,
2435 };
2436 if self.state.is_closed() {
2437 self.timers.stop(Timer::Idle);
2438 return;
2439 }
2440 let dt = cmp::max(timeout, 3 * self.pto(space));
2441 self.timers.set(Timer::Idle, now + dt);
2442 }
2443
2444 fn reset_keep_alive(&mut self, now: Instant) {
2445 let interval = match self.config.keep_alive_interval {
2446 Some(x) if self.state.is_established() => x,
2447 _ => return,
2448 };
2449 self.timers.set(Timer::KeepAlive, now + interval);
2450 }
2451
2452 fn reset_cid_retirement(&mut self) {
2453 if let Some(t) = self.local_cid_state.next_timeout() {
2454 self.timers.set(Timer::PushNewCid, t);
2455 }
2456 }
2457
2458 pub(crate) fn handle_first_packet(
2463 &mut self,
2464 now: Instant,
2465 remote: SocketAddr,
2466 ecn: Option<EcnCodepoint>,
2467 packet_number: u64,
2468 packet: InitialPacket,
2469 remaining: Option<BytesMut>,
2470 ) -> Result<(), ConnectionError> {
2471 let span = trace_span!("first recv");
2472 let _guard = span.enter();
2473 debug_assert!(self.side.is_server());
2474 let len = packet.header_data.len() + packet.payload.len();
2475 self.path.total_recvd = len as u64;
2476
2477 match self.state {
2478 State::Handshake(ref mut state) => {
2479 state.expected_token = packet.header.token.clone();
2480 }
2481 _ => unreachable!("first packet must be delivered in Handshake state"),
2482 }
2483
2484 self.on_packet_authenticated(
2485 now,
2486 SpaceId::Initial,
2487 ecn,
2488 Some(packet_number),
2489 false,
2490 false,
2491 );
2492
2493 self.process_decrypted_packet(now, remote, Some(packet_number), packet.into())?;
2494 if let Some(data) = remaining {
2495 self.handle_coalesced(now, remote, ecn, data);
2496 }
2497
2498 #[cfg(feature = "__qlog")]
2499 self.emit_qlog_recovery_metrics(now);
2500
2501 Ok(())
2502 }
2503
2504 fn init_0rtt(&mut self) {
2505 let (header, packet) = match self.crypto.early_crypto() {
2506 Some(x) => x,
2507 None => return,
2508 };
2509 if self.side.is_client() {
2510 match self.crypto.transport_parameters() {
2511 Ok(params) => {
2512 let params = params
2513 .expect("crypto layer didn't supply transport parameters with ticket");
2514 let params = TransportParameters {
2516 initial_src_cid: None,
2517 original_dst_cid: None,
2518 preferred_address: None,
2519 retry_src_cid: None,
2520 stateless_reset_token: None,
2521 min_ack_delay: None,
2522 ack_delay_exponent: TransportParameters::default().ack_delay_exponent,
2523 max_ack_delay: TransportParameters::default().max_ack_delay,
2524 ..params
2525 };
2526 self.set_peer_params(params);
2527 }
2528 Err(e) => {
2529 error!("session ticket has malformed transport parameters: {}", e);
2530 return;
2531 }
2532 }
2533 }
2534 trace!("0-RTT enabled");
2535 self.zero_rtt_enabled = true;
2536 self.zero_rtt_crypto = Some(ZeroRttCrypto { header, packet });
2537 }
2538
2539 fn read_crypto(
2540 &mut self,
2541 space: SpaceId,
2542 crypto: &frame::Crypto,
2543 payload_len: usize,
2544 ) -> Result<(), TransportError> {
2545 let expected = if !self.state.is_handshake() {
2546 SpaceId::Data
2547 } else if self.highest_space == SpaceId::Initial {
2548 SpaceId::Initial
2549 } else {
2550 SpaceId::Handshake
2553 };
2554 debug_assert!(space <= expected, "received out-of-order CRYPTO data");
2558
2559 let end = crypto.offset + crypto.data.len() as u64;
2560 if space < expected && end > self.spaces[space].crypto_stream.bytes_read() {
2561 warn!(
2562 "received new {:?} CRYPTO data when expecting {:?}",
2563 space, expected
2564 );
2565 return Err(TransportError::PROTOCOL_VIOLATION(
2566 "new data at unexpected encryption level",
2567 ));
2568 }
2569
2570 #[cfg(feature = "pqc")]
2572 {
2573 self.pqc_state.detect_pqc_from_crypto(&crypto.data, space);
2574
2575 if self.pqc_state.should_trigger_mtu_discovery() {
2577 self.path
2579 .mtud
2580 .reset(self.pqc_state.min_initial_size(), self.config.min_mtu);
2581 trace!("Triggered MTU discovery for PQC handshake");
2582 }
2583 }
2584
2585 let space = &mut self.spaces[space];
2586 let max = end.saturating_sub(space.crypto_stream.bytes_read());
2587 if max > self.config.crypto_buffer_size as u64 {
2588 return Err(TransportError::CRYPTO_BUFFER_EXCEEDED(""));
2589 }
2590
2591 space
2592 .crypto_stream
2593 .insert(crypto.offset, crypto.data.clone(), payload_len);
2594 while let Some(chunk) = space.crypto_stream.read(usize::MAX, true) {
2595 trace!("consumed {} CRYPTO bytes", chunk.bytes.len());
2596 if self.crypto.read_handshake(&chunk.bytes)? {
2597 self.events.push_back(Event::HandshakeDataReady);
2598 }
2599 }
2600
2601 Ok(())
2602 }
2603
2604 fn write_crypto(&mut self) {
2605 loop {
2606 let space = self.highest_space;
2607 let mut outgoing = Vec::new();
2608 if let Some(crypto) = self.crypto.write_handshake(&mut outgoing) {
2609 match space {
2610 SpaceId::Initial => {
2611 self.upgrade_crypto(SpaceId::Handshake, crypto);
2612 }
2613 SpaceId::Handshake => {
2614 self.upgrade_crypto(SpaceId::Data, crypto);
2615 }
2616 _ => unreachable!("got updated secrets during 1-RTT"),
2617 }
2618 }
2619 if outgoing.is_empty() {
2620 if space == self.highest_space {
2621 break;
2622 } else {
2623 continue;
2625 }
2626 }
2627 let offset = self.spaces[space].crypto_offset;
2628 let outgoing = Bytes::from(outgoing);
2629 if let State::Handshake(ref mut state) = self.state {
2630 if space == SpaceId::Initial && offset == 0 && self.side.is_client() {
2631 state.client_hello = Some(outgoing.clone());
2632 }
2633 }
2634 self.spaces[space].crypto_offset += outgoing.len() as u64;
2635 trace!("wrote {} {:?} CRYPTO bytes", outgoing.len(), space);
2636
2637 #[cfg(feature = "pqc")]
2639 let use_pqc_fragmentation = self.pqc_state.using_pqc && outgoing.len() > 1200;
2640 #[cfg(not(feature = "pqc"))]
2641 let use_pqc_fragmentation = false;
2642
2643 if use_pqc_fragmentation {
2644 #[cfg(feature = "pqc")]
2646 {
2647 let frames = self.pqc_state.packet_handler.fragment_crypto_data(
2648 &outgoing,
2649 offset,
2650 self.pqc_state.min_initial_size() as usize,
2651 );
2652 for frame in frames {
2653 self.spaces[space].pending.crypto.push_back(frame);
2654 }
2655 }
2656 } else {
2657 self.spaces[space].pending.crypto.push_back(frame::Crypto {
2659 offset,
2660 data: outgoing,
2661 });
2662 }
2663 }
2664 }
2665
2666 fn upgrade_crypto(&mut self, space: SpaceId, crypto: Keys) {
2668 debug_assert!(
2669 self.spaces[space].crypto.is_none(),
2670 "already reached packet space {space:?}"
2671 );
2672 trace!("{:?} keys ready", space);
2673 if space == SpaceId::Data {
2674 self.next_crypto = Some(
2676 self.crypto
2677 .next_1rtt_keys()
2678 .expect("handshake should be complete"),
2679 );
2680 }
2681
2682 self.spaces[space].crypto = Some(crypto);
2683 debug_assert!(space as usize > self.highest_space as usize);
2684 self.highest_space = space;
2685 if space == SpaceId::Data && self.side.is_client() {
2686 self.zero_rtt_crypto = None;
2688 }
2689 }
2690
2691 fn discard_space(&mut self, now: Instant, space_id: SpaceId) {
2692 debug_assert!(space_id != SpaceId::Data);
2693 trace!("discarding {:?} keys", space_id);
2694 if space_id == SpaceId::Initial {
2695 if let ConnectionSide::Client { token, .. } = &mut self.side {
2697 *token = Bytes::new();
2698 }
2699 }
2700 let space = &mut self.spaces[space_id];
2701 space.crypto = None;
2702 space.time_of_last_ack_eliciting_packet = None;
2703 space.loss_time = None;
2704 space.in_flight = 0;
2705 let sent_packets = mem::take(&mut space.sent_packets);
2706 for (pn, packet) in sent_packets.into_iter() {
2707 self.remove_in_flight(pn, &packet);
2708 }
2709 self.set_loss_detection_timer(now)
2710 }
2711
2712 fn handle_coalesced(
2713 &mut self,
2714 now: Instant,
2715 remote: SocketAddr,
2716 ecn: Option<EcnCodepoint>,
2717 data: BytesMut,
2718 ) {
2719 self.path.total_recvd = self.path.total_recvd.saturating_add(data.len() as u64);
2720 let mut remaining = Some(data);
2721 while let Some(data) = remaining {
2722 match PartialDecode::new(
2723 data,
2724 &FixedLengthConnectionIdParser::new(self.local_cid_state.cid_len()),
2725 &[self.version],
2726 self.endpoint_config.grease_quic_bit,
2727 ) {
2728 Ok((partial_decode, rest)) => {
2729 remaining = rest;
2730 self.handle_decode(now, remote, ecn, partial_decode);
2731 }
2732 Err(e) => {
2733 trace!("malformed header: {}", e);
2734 return;
2735 }
2736 }
2737 }
2738 }
2739
2740 fn handle_decode(
2741 &mut self,
2742 now: Instant,
2743 remote: SocketAddr,
2744 ecn: Option<EcnCodepoint>,
2745 partial_decode: PartialDecode,
2746 ) {
2747 if let Some(decoded) = packet_crypto::unprotect_header(
2748 partial_decode,
2749 &self.spaces,
2750 self.zero_rtt_crypto.as_ref(),
2751 self.peer_params.stateless_reset_token,
2752 ) {
2753 self.handle_packet(now, remote, ecn, decoded.packet, decoded.stateless_reset);
2754 }
2755 }
2756
/// Processes one packet whose header protection has already been removed.
///
/// The packet (if any) is decrypted, authenticated packets are handed to
/// `process_decrypted_packet`, and any resulting `ConnectionError` is then
/// recorded and translated into a connection state change.
///
/// * `packet` - the unprotected packet. `None` is treated like a packet that
///   failed authentication, unless `stateless_reset` is set.
/// * `stateless_reset` - when `true`, the connection is reset regardless of
///   the decryption outcome.
2757 fn handle_packet(
2758 &mut self,
2759 now: Instant,
2760 remote: SocketAddr,
2761 ecn: Option<EcnCodepoint>,
2762 packet: Option<Packet>,
2763 stateless_reset: bool,
2764 ) {
// Counted once per call, whether or not the packet is later accepted.
2765 self.stats.udp_rx.ios += 1;
2766 if let Some(ref packet) = packet {
2767 trace!(
2768 "got {:?} packet ({} bytes) from {} using id {}",
2769 packet.header.space(),
2770 packet.payload.len() + packet.header_data.len(),
2771 remote,
2772 packet.header.dst_cid(),
2773 );
2774
2775 #[cfg(feature = "trace")]
2777 {
2778 use crate::trace_packet_received;
2779 let packet_size = packet.payload.len() + packet.header_data.len();
2781 trace_packet_received!(
2782 &self.event_log,
2783 self.trace_context.trace_id(),
2784 packet_size as u32,
2785 0 );
2787 }
2788 }
2789
// While the handshake is in progress only the current path's remote is accepted.
2790 if self.is_handshaking() && remote != self.path.remote {
2791 debug!("discarding packet with unexpected remote during handshake");
2792 return;
2793 }
2794
// Snapshot the state so that transitions caused by this packet can be
// detected after processing.
2795 let was_closed = self.state.is_closed();
2796 let was_drained = self.state.is_drained();
2797
// `Err(None)` stands for "could not authenticate"; `Err(Some(e))` carries a
// transport error; `Ok` pairs the packet with its (optional) packet number.
2798 let decrypted = match packet {
2799 None => Err(None),
2800 Some(mut packet) => self
2801 .decrypt_packet(now, &mut packet)
2802 .map(move |number| (packet, number)),
2803 };
2804 let result = match decrypted {
// A stateless reset overrides whatever decryption produced.
2805 _ if stateless_reset => {
2806 debug!("got stateless reset");
2807 Err(ConnectionError::Reset)
2808 }
2809 Err(Some(e)) => {
2810 warn!("illegal packet: {}", e);
2811 Err(e.into())
2812 }
2813 Err(None) => {
2814 debug!("failed to authenticate packet");
2815 self.authentication_failures += 1;
// The limit is taken from the local packet key of the highest space; the
// `unwrap` panics if that space has no crypto installed.
2816 let integrity_limit = self.spaces[self.highest_space]
2817 .crypto
2818 .as_ref()
2819 .unwrap()
2820 .packet
2821 .local
2822 .integrity_limit();
2823 if self.authentication_failures > integrity_limit {
2824 Err(TransportError::AEAD_LIMIT_REACHED("integrity limit violated").into())
2825 } else {
// Below the limit the packet is dropped without any state change.
2826 return;
2827 }
2828 }
2829 Ok((packet, number)) => {
2830 let span = match number {
2831 Some(pn) => trace_span!("recv", space = ?packet.header.space(), pn),
2832 None => trace_span!("recv", space = ?packet.header.space()),
2833 };
2834 let _guard = span.enter();
2835
// NOTE(review): `dedup.insert` presumably returns `true` when `n` was already
// recorded - confirm against the dedup implementation.
2836 let is_duplicate = |n| self.spaces[packet.header.space()].dedup.insert(n);
2837 if number.is_some_and(is_duplicate) {
2838 debug!("discarding possible duplicate packet");
2839 return;
2840 } else if self.state.is_handshake() && packet.header.is_short() {
2841 trace!("dropping short packet during handshake");
2843 return;
2844 } else {
// A server in the handshake state drops Initial packets whose token differs
// from the one it expects.
2845 if let Header::Initial(InitialHeader { ref token, .. }) = packet.header {
2846 if let State::Handshake(ref hs) = self.state {
2847 if self.side.is_server() && token != &hs.expected_token {
2848 warn!("discarding Initial with invalid retry token");
2852 return;
2853 }
2854 }
2855 }
2856
// Authentication bookkeeping is skipped once the connection is closed. The
// spin value is only taken from short headers; other headers report `false`.
2857 if !self.state.is_closed() {
2858 let spin = match packet.header {
2859 Header::Short { spin, .. } => spin,
2860 _ => false,
2861 };
2862 self.on_packet_authenticated(
2863 now,
2864 packet.header.space(),
2865 ecn,
2866 number,
2867 spin,
2868 packet.header.is_1rtt(),
2869 );
2870 }
2871
2872 self.process_decrypted_packet(now, remote, number, packet)
2873 }
2874 }
2875 };
2876
// Record the error and derive the next state from it. Resets and
// AEAD_LIMIT_REACHED go straight to `Drained`; that arm is listed before the
// generic transport-error arm, which moves to a closed state instead.
2877 if let Err(conn_err) = result {
2879 self.error = Some(conn_err.clone());
2880 self.state = match conn_err {
2881 ConnectionError::ApplicationClosed(reason) => State::closed(reason),
2882 ConnectionError::ConnectionClosed(reason) => State::closed(reason),
2883 ConnectionError::Reset
2884 | ConnectionError::TransportError(TransportError {
2885 code: TransportErrorCode::AEAD_LIMIT_REACHED,
2886 ..
2887 }) => State::Drained,
2888 ConnectionError::TimedOut => {
2889 unreachable!("timeouts aren't generated by packet processing");
2890 }
2891 ConnectionError::TransportError(err) => {
2892 debug!("closing connection due to transport error: {}", err);
2893 State::closed(err)
2894 }
2895 ConnectionError::VersionMismatch => State::Draining,
2896 ConnectionError::LocallyClosed => {
2897 unreachable!("LocallyClosed isn't generated by packet processing");
2898 }
2899 ConnectionError::CidsExhausted => {
2900 unreachable!("CidsExhausted isn't generated by packet processing");
2901 }
2902 };
2903 }
2904
// Newly closed: run the common close path and, unless already drained, arm
// the close timer.
2905 if !was_closed && self.state.is_closed() {
2906 self.close_common();
2907 if !self.state.is_drained() {
2908 self.set_close_timer(now);
2909 }
2910 }
// Newly drained: notify the endpoint and stop the close timer.
2911 if !was_drained && self.state.is_drained() {
2912 self.endpoint_events.push_back(EndpointEventInner::Drained);
2913 self.timers.stop(Timer::Close);
2916 }
2917
// In the `Closed` state `self.close` is set only when the packet came from
// the current path's remote.
// NOTE(review): presumably this gates re-sending the close frame - confirm.
2918 if let State::Closed(_) = self.state {
2920 self.close = remote == self.path.remote;
2921 }
2922 }
2923
/// Dispatches a decrypted packet according to the current connection state.
///
/// * `Established` - Data-space packets go to `process_payload`; packets from
///   other spaces go to `process_early_payload` when their header can carry
///   frames, and are discarded otherwise.
/// * `Closed` - frames are scanned only to find a close frame, which moves the
///   connection to `Draining`.
/// * `Draining` / `Drained` - the packet is ignored.
/// * `Handshake` - handling depends on the header type (Retry, Handshake,
///   Initial, 0-RTT, Version Negotiation).
///
/// `number` is unwrapped for packets routed to `process_payload`, so those
/// packets must carry a packet number.
///
/// Returns `Err` with the `ConnectionError` the caller should apply to the
/// connection state.
2924 fn process_decrypted_packet(
2925 &mut self,
2926 now: Instant,
2927 remote: SocketAddr,
2928 number: Option<u64>,
2929 packet: Packet,
2930 ) -> Result<(), ConnectionError> {
// Every state except `Handshake` returns from inside this match; `state`
// ends up borrowing the handshake data.
2931 let state = match self.state {
2932 State::Established => {
2933 match packet.header.space() {
2934 SpaceId::Data => self.process_payload(now, remote, number.unwrap(), packet)?,
2935 _ if packet.header.has_frames() => self.process_early_payload(now, packet)?,
2936 _ => {
2937 trace!("discarding unexpected pre-handshake packet");
2938 }
2939 }
2940 return Ok(());
2941 }
2942 State::Closed(_) => {
// Frame decode errors are logged and skipped rather than propagated.
2943 for result in frame::Iter::new(packet.payload.freeze())? {
2944 let frame = match result {
2945 Ok(frame) => frame,
2946 Err(err) => {
2947 debug!("frame decoding error: {err:?}");
2948 continue;
2949 }
2950 };
2951
2952 if let Frame::Padding = frame {
2953 continue;
2954 };
2955
2956 self.stats.frame_rx.record(&frame);
2957
// A close frame from the peer moves us to draining; later frames are ignored.
2958 if let Frame::Close(_) = frame {
2959 trace!("draining");
2960 self.state = State::Draining;
2961 break;
2962 }
2963 }
2964 return Ok(());
2965 }
2966 State::Draining | State::Drained => return Ok(()),
2967 State::Handshake(ref mut state) => state,
2968 };
2969
2970 match packet.header {
2971 Header::Retry {
2972 src_cid: rem_cid, ..
2973 } => {
2974 if self.side.is_server() {
2975 return Err(TransportError::PROTOCOL_VIOLATION("client sent Retry").into());
2976 }
2977
// A Retry is ignored when more than one packet has already been
// authenticated, when the payload is 16 bytes or shorter, or when
// `is_valid_retry` rejects it.
2978 if self.total_authed_packets > 1
2979 || packet.payload.len() <= 16 || !self.crypto.is_valid_retry(
2981 &self.rem_cids.active(),
2982 &packet.header_data,
2983 &packet.payload,
2984 )
2985 {
2986 trace!("discarding invalid Retry");
2987 return Ok(());
2995 }
2996
2997 trace!("retrying with CID {}", rem_cid);
// Panics if the stored client hello has already been taken.
2998 let client_hello = state.client_hello.take().unwrap();
2999 self.retry_src_cid = Some(rem_cid);
3000 self.rem_cids.update_initial_cid(rem_cid);
3001 self.rem_handshake_cid = rem_cid;
3002
// Packet number 0 in the Initial space is removed and treated as acknowledged.
3003 let space = &mut self.spaces[SpaceId::Initial];
3004 if let Some(info) = space.take(0) {
3005 self.on_packet_acked(now, 0, info);
3006 };
3007
// Rebuild the Initial space with keys derived from the new remote CID. The
// packet number sequence is carried over, and the crypto offset is advanced
// past the client hello that is re-queued just below.
3008 self.discard_space(now, SpaceId::Initial); self.spaces[SpaceId::Initial] = PacketSpace {
3010 crypto: Some(self.crypto.initial_keys(&rem_cid, self.side.side())),
3011 next_packet_number: self.spaces[SpaceId::Initial].next_packet_number,
3012 crypto_offset: client_hello.len() as u64,
3013 ..PacketSpace::new(now)
3014 };
3015 self.spaces[SpaceId::Initial]
3016 .pending
3017 .crypto
3018 .push_back(frame::Crypto {
3019 offset: 0,
3020 data: client_hello,
3021 });
3022
// Every packet already sent in the Data space is removed from the in-flight
// accounting and its retransmittable content is queued again.
3023 let zero_rtt = mem::take(&mut self.spaces[SpaceId::Data].sent_packets);
3025 for (pn, info) in zero_rtt {
3026 self.remove_in_flight(pn, &info);
3027 self.spaces[SpaceId::Data].pending |= info.retransmits;
3028 }
3029 self.streams.retransmit_all_for_0rtt();
3030
// The token is the payload without its final 16 bytes.
3031 let token_len = packet.payload.len() - 16;
3032 let ConnectionSide::Client { ref mut token, .. } = self.side else {
3033 unreachable!("we already short-circuited if we're server");
3034 };
3035 *token = packet.payload.freeze().split_to(token_len);
3036 self.state = State::Handshake(state::Handshake {
3037 expected_token: Bytes::new(),
3038 rem_cid_set: false,
3039 client_hello: None,
3040 });
3041 Ok(())
3042 }
3043 Header::Long {
3044 ty: LongType::Handshake,
3045 src_cid: rem_cid,
3046 ..
3047 } => {
3048 if rem_cid != self.rem_handshake_cid {
3049 debug!(
3050 "discarding packet with mismatched remote CID: {} != {}",
3051 self.rem_handshake_cid, rem_cid
3052 );
3053 return Ok(());
3054 }
3055 self.on_path_validated();
3056
3057 self.process_early_payload(now, packet)?;
// Processing the frames may have closed the connection.
3058 if self.state.is_closed() {
3059 return Ok(());
3060 }
3061
3062 if self.crypto.is_handshaking() {
3063 trace!("handshake ongoing");
3064 return Ok(());
3065 }
3066
// From here on the crypto handshake has finished.
3067 if self.side.is_client() {
// Missing transport parameters are reported as crypto error 0x6d.
3068 let params =
3070 self.crypto
3071 .transport_parameters()?
3072 .ok_or_else(|| TransportError {
3073 code: TransportErrorCode::crypto(0x6d),
3074 frame: None,
3075 reason: "transport parameters missing".into(),
3076 })?;
3077
3078 if self.has_0rtt() {
3079 if !self.crypto.early_data_accepted().unwrap() {
3080 debug_assert!(self.side.is_client());
3081 debug!("0-RTT rejected");
3082 self.accepted_0rtt = false;
3083 self.streams.zero_rtt_rejected();
3084
// Drop everything queued for the Data space.
3085 self.spaces[SpaceId::Data].pending = Retransmits::default();
3087
// Forget every packet sent in the Data space so far.
3088 let sent_packets =
3090 mem::take(&mut self.spaces[SpaceId::Data].sent_packets);
3091 for (pn, packet) in sent_packets {
3092 self.remove_in_flight(pn, &packet);
3093 }
3094 } else {
3095 self.accepted_0rtt = true;
3096 params.validate_resumption_from(&self.peer_params)?;
3097 }
3098 }
3099 if let Some(token) = params.stateless_reset_token {
3100 self.endpoint_events
3101 .push_back(EndpointEventInner::ResetToken(self.path.remote, token));
3102 }
3103 self.handle_peer_params(params)?;
3104 self.issue_first_cids(now);
3105 } else {
// Server side: queue HANDSHAKE_DONE and drop the Handshake space.
3106 self.spaces[SpaceId::Data].pending.handshake_done = true;
3108 self.discard_space(now, SpaceId::Handshake);
3109 }
3110
3111 self.events.push_back(Event::Connected);
3112 self.state = State::Established;
3113 trace!("established");
3114 Ok(())
3115 }
3116 Header::Initial(InitialHeader {
3117 src_cid: rem_cid, ..
3118 }) => {
// The first Initial fixes the remote CID; later Initials must match it.
3119 if !state.rem_cid_set {
3120 trace!("switching remote CID to {}", rem_cid);
3121 let mut state = state.clone();
3122 self.rem_cids.update_initial_cid(rem_cid);
3123 self.rem_handshake_cid = rem_cid;
3124 self.orig_rem_cid = rem_cid;
3125 state.rem_cid_set = true;
3126 self.state = State::Handshake(state);
3127 } else if rem_cid != self.rem_handshake_cid {
3128 debug!(
3129 "discarding packet with mismatched remote CID: {} != {}",
3130 self.rem_handshake_cid, rem_cid
3131 );
3132 return Ok(());
3133 }
3134
3135 let starting_space = self.highest_space;
3136 self.process_early_payload(now, packet)?;
3137
// On the server, the peer's transport parameters are read once this packet
// has moved `highest_space` beyond Initial.
3138 if self.side.is_server()
3139 && starting_space == SpaceId::Initial
3140 && self.highest_space != SpaceId::Initial
3141 {
3142 let params =
3143 self.crypto
3144 .transport_parameters()?
3145 .ok_or_else(|| TransportError {
3146 code: TransportErrorCode::crypto(0x6d),
3147 frame: None,
3148 reason: "transport parameters missing".into(),
3149 })?;
3150 self.handle_peer_params(params)?;
3151 self.issue_first_cids(now);
3152 self.init_0rtt();
3153 }
3154 Ok(())
3155 }
3156 Header::Long {
3157 ty: LongType::ZeroRtt,
3158 ..
3159 } => {
3160 self.process_payload(now, remote, number.unwrap(), packet)?;
3161 Ok(())
3162 }
3163 Header::VersionNegotiate { .. } => {
// Ignored once more than one packet has been authenticated.
3164 if self.total_authed_packets > 1 {
3165 return Ok(());
3166 }
// The payload is read as a list of 4-byte big-endian versions; a trailing
// chunk shorter than 4 bytes never matches.
3167 let supported = packet
3168 .payload
3169 .chunks(4)
3170 .any(|x| match <[u8; 4]>::try_from(x) {
3171 Ok(version) => self.version == u32::from_be_bytes(version),
3172 Err(_) => false,
3173 });
// If our own version is listed, the packet is ignored.
3174 if supported {
3175 return Ok(());
3176 }
3177 debug!("remote doesn't support our version");
3178 Err(ConnectionError::VersionMismatch)
3179 }
3180 Header::Short { .. } => unreachable!(
3181 "short packets received during handshake are discarded in handle_packet"
3182 ),
3183 }
3184 }
3185
3186 fn process_early_payload(
3188 &mut self,
3189 now: Instant,
3190 packet: Packet,
3191 ) -> Result<(), TransportError> {
3192 debug_assert_ne!(packet.header.space(), SpaceId::Data);
3193 let payload_len = packet.payload.len();
3194 let mut ack_eliciting = false;
3195 for result in frame::Iter::new(packet.payload.freeze())? {
3196 let frame = result?;
3197 let span = match frame {
3198 Frame::Padding => continue,
3199 _ => Some(trace_span!("frame", ty = %frame.ty())),
3200 };
3201
3202 self.stats.frame_rx.record(&frame);
3203
3204 let _guard = span.as_ref().map(|x| x.enter());
3205 ack_eliciting |= frame.is_ack_eliciting();
3206
3207 match frame {
3209 Frame::Padding | Frame::Ping => {}
3210 Frame::Crypto(frame) => {
3211 self.read_crypto(packet.header.space(), &frame, payload_len)?;
3212 }
3213 Frame::Ack(ack) => {
3214 self.on_ack_received(now, packet.header.space(), ack)?;
3215 }
3216 Frame::Close(reason) => {
3217 self.error = Some(reason.into());
3218 self.state = State::Draining;
3219 return Ok(());
3220 }
3221 _ => {
3222 let mut err =
3223 TransportError::PROTOCOL_VIOLATION("illegal frame type in handshake");
3224 err.frame = Some(frame.ty());
3225 return Err(err);
3226 }
3227 }
3228 }
3229
3230 if ack_eliciting {
3231 self.spaces[packet.header.space()]
3233 .pending_acks
3234 .set_immediate_ack_required();
3235 }
3236
3237 self.write_crypto();
3238 Ok(())
3239 }
3240
/// Processes the frames of a Data-space packet (1-RTT or 0-RTT).
///
/// Each frame is recorded in the receive statistics, traced, and dispatched
/// to its handler. After the frame loop the packet is registered with the
/// pending-ACK state, a close frame seen in the packet (if any) is applied,
/// and a migration is started when a non-probing packet arrived from a remote
/// other than the current path's.
///
/// * `remote` - address the packet was received from.
/// * `number` - packet number of this packet.
///
/// # Errors
///
/// Returns a `TransportError` for frame decoding failures, frames that are
/// illegal in the current context, and errors raised by the frame handlers.
3241 fn process_payload(
3242 &mut self,
3243 now: Instant,
3244 remote: SocketAddr,
3245 number: u64,
3246 packet: Packet,
3247 ) -> Result<(), TransportError> {
3248 let payload = packet.payload.freeze();
// Stays `true` only while every frame seen is Padding, PathChallenge,
// PathResponse or NewConnectionId.
3249 let mut is_probing_packet = true;
// A close frame is remembered here and applied after the loop.
3250 let mut close = None;
3251 let payload_len = payload.len();
3252 let mut ack_eliciting = false;
3253 for result in frame::Iter::new(payload)? {
3254 let frame = result?;
3255 let span = match frame {
3256 Frame::Padding => continue,
3257 _ => Some(trace_span!("frame", ty = %frame.ty())),
3258 };
3259
3260 self.stats.frame_rx.record(&frame);
// Frames that carry data are traced by size only; other frames are traced
// with their full `Debug` output.
3261 match &frame {
3264 Frame::Crypto(f) => {
3265 trace!(offset = f.offset, len = f.data.len(), "got crypto frame");
3266 }
3267 Frame::Stream(f) => {
3268 trace!(id = %f.id, offset = f.offset, len = f.data.len(), fin = f.fin, "got stream frame");
3269 }
3270 Frame::Datagram(f) => {
3271 trace!(len = f.data.len(), "got datagram frame");
3272 }
3273 f => {
3274 trace!("got frame {:?}", f);
3275 }
3276 }
3277
3278 let _guard = span.as_ref().map(|x| x.enter());
// CRYPTO frames and application close frames are rejected in 0-RTT packets.
3279 if packet.header.is_0rtt() {
3280 match frame {
3281 Frame::Crypto(_) | Frame::Close(Close::Application(_)) => {
3282 return Err(TransportError::PROTOCOL_VIOLATION(
3283 "illegal frame type in 0-RTT",
3284 ));
3285 }
3286 _ => {}
3287 }
3288 }
3289 ack_eliciting |= frame.is_ack_eliciting();
3290
3291 match frame {
3293 Frame::Padding
3294 | Frame::PathChallenge(_)
3295 | Frame::PathResponse(_)
3296 | Frame::NewConnectionId(_) => {}
3297 _ => {
3298 is_probing_packet = false;
3299 }
3300 }
3301 match frame {
3302 Frame::Crypto(frame) => {
3303 self.read_crypto(SpaceId::Data, &frame, payload_len)?;
3304 }
3305 Frame::Stream(frame) => {
3306 if self.streams.received(frame, payload_len)?.should_transmit() {
3307 self.spaces[SpaceId::Data].pending.max_data = true;
3308 }
3309 }
3310 Frame::Ack(ack) => {
3311 self.on_ack_received(now, SpaceId::Data, ack)?;
3312 }
3313 Frame::Padding | Frame::Ping => {}
3314 Frame::Close(reason) => {
3315 close = Some(reason);
3316 }
3317 Frame::PathChallenge(token) => {
// Always queue a response towards the sender. If the challenge arrived on
// the active path, additionally request an immediate ACK (when the peer
// supports ACK frequency) or a ping.
3318 self.path_responses.push(number, token, remote);
3319 if remote == self.path.remote {
3320 match self.peer_supports_ack_frequency() {
3323 true => self.immediate_ack(),
3324 false => self.ping(),
3325 }
3326 }
3327 }
3328 Frame::PathResponse(token) => {
// Case 1: the response matches the outstanding challenge on the active path.
3329 if self.path.challenge == Some(token) && remote == self.path.remote {
3330 trace!("new path validated");
3331 self.timers.stop(Timer::PathValidation);
3332 self.path.challenge = None;
3333 self.path.validated = true;
3334 if let Some((_, ref mut prev_path)) = self.prev_path {
3335 prev_path.challenge = None;
3336 prev_path.challenge_pending = false;
3337 }
3338 self.on_path_validated();
// Case 2: otherwise, when NAT traversal is active, let it try to match the
// response against one of its candidates.
3339 } else if let Some(nat_traversal) = &mut self.nat_traversal {
3340 match nat_traversal.handle_validation_success(remote, token, now) {
3342 Ok(sequence) => {
3343 trace!(
3344 "NAT traversal candidate {} validated for sequence {}",
3345 remote, sequence
3346 );
3347
3348 if nat_traversal.handle_coordination_success(remote, now) {
3350 trace!("Coordination succeeded via {}", remote);
3351
// Clients may always migrate; servers only when their config enables it.
3352 let can_migrate = match &self.side {
3354 ConnectionSide::Client { .. } => true, ConnectionSide::Server { server_config } => {
3356 server_config.migration
3357 }
3358 };
3359
3360 if can_migrate {
3361 let best_pairs = nat_traversal.get_best_succeeded_pairs();
// Migrate only when the first-listed pair is the address that just
// validated and it differs from the current remote.
3363 if let Some(best) = best_pairs.first() {
3364 if best.remote_addr == remote
3365 && best.remote_addr != self.path.remote
3366 {
3367 debug!(
3368 "NAT traversal found better path, initiating migration"
3369 );
// A failed migration is logged and otherwise ignored.
3370 if let Err(e) =
3372 self.migrate_to_nat_traversal_path(now)
3373 {
3374 warn!(
3375 "Failed to migrate to NAT traversal path: {:?}",
3376 e
3377 );
3378 }
3379 }
3380 }
3381 }
3382 } else {
3383 if nat_traversal.mark_pair_succeeded(remote) {
3385 trace!("NAT traversal pair succeeded for {}", remote);
3386 }
3387 }
3388 }
3389 Err(NatTraversalError::ChallengeMismatch) => {
3390 debug!(
3391 "PATH_RESPONSE challenge mismatch for NAT candidate {}",
3392 remote
3393 );
3394 }
3395 Err(e) => {
3396 debug!("NAT traversal validation error: {}", e);
3397 }
3398 }
3399 } else {
3400 debug!(token, "ignoring invalid PATH_RESPONSE");
3401 }
3402 }
3403 Frame::MaxData(bytes) => {
3404 self.streams.received_max_data(bytes);
3405 }
3406 Frame::MaxStreamData { id, offset } => {
3407 self.streams.received_max_stream_data(id, offset)?;
3408 }
3409 Frame::MaxStreams { dir, count } => {
3410 self.streams.received_max_streams(dir, count)?;
3411 }
3412 Frame::ResetStream(frame) => {
3413 if self.streams.received_reset(frame)?.should_transmit() {
3414 self.spaces[SpaceId::Data].pending.max_data = true;
3415 }
3416 }
3417 Frame::DataBlocked { offset } => {
3418 debug!(offset, "peer claims to be blocked at connection level");
3419 }
3420 Frame::StreamDataBlocked { id, offset } => {
// A unidirectional stream we initiated is one the peer never sends on.
3421 if id.initiator() == self.side.side() && id.dir() == Dir::Uni {
3422 debug!("got STREAM_DATA_BLOCKED on send-only {}", id);
3423 return Err(TransportError::STREAM_STATE_ERROR(
3424 "STREAM_DATA_BLOCKED on send-only stream",
3425 ));
3426 }
3427 debug!(
3428 stream = %id,
3429 offset, "peer claims to be blocked at stream level"
3430 );
3431 }
3432 Frame::StreamsBlocked { dir, limit } => {
3433 if limit > MAX_STREAM_COUNT {
3434 return Err(TransportError::FRAME_ENCODING_ERROR(
3435 "unrepresentable stream limit",
3436 ));
3437 }
3438 debug!(
3439 "peer claims to be blocked opening more than {} {} streams",
3440 limit, dir
3441 );
3442 }
3443 Frame::StopSending(frame::StopSending { id, error_code }) => {
// Rejected for peer-initiated unidirectional streams and for local streams
// that have not been opened yet.
3444 if id.initiator() != self.side.side() {
3445 if id.dir() == Dir::Uni {
3446 debug!("got STOP_SENDING on recv-only {}", id);
3447 return Err(TransportError::STREAM_STATE_ERROR(
3448 "STOP_SENDING on recv-only stream",
3449 ));
3450 }
3451 } else if self.streams.is_local_unopened(id) {
3452 return Err(TransportError::STREAM_STATE_ERROR(
3453 "STOP_SENDING on unopened stream",
3454 ));
3455 }
3456 self.streams.received_stop_sending(id, error_code);
3457 }
3458 Frame::RetireConnectionId { sequence } => {
3459 let allow_more_cids = self
3460 .local_cid_state
3461 .on_cid_retirement(sequence, self.peer_params.issue_cids_limit())?;
3462 self.endpoint_events
3463 .push_back(EndpointEventInner::RetireConnectionId(
3464 now,
3465 sequence,
3466 allow_more_cids,
3467 ));
3468 }
3469 Frame::NewConnectionId(frame) => {
3470 trace!(
3471 sequence = frame.sequence,
3472 id = %frame.id,
3473 retire_prior_to = frame.retire_prior_to,
3474 );
3475 if self.rem_cids.active().is_empty() {
3476 return Err(TransportError::PROTOCOL_VIOLATION(
3477 "NEW_CONNECTION_ID when CIDs aren't in use",
3478 ));
3479 }
3480 if frame.retire_prior_to > frame.sequence {
3481 return Err(TransportError::PROTOCOL_VIOLATION(
3482 "NEW_CONNECTION_ID retiring unissued CIDs",
3483 ));
3484 }
3485
3486 use crate::cid_queue::InsertError;
3487 match self.rem_cids.insert(frame) {
3488 Ok(None) => {}
3489 Ok(Some((retired, reset_token))) => {
3490 let pending_retired =
3491 &mut self.spaces[SpaceId::Data].pending.retire_cids;
// Cap on queued retirements so the peer cannot grow the list without bound.
3492 const MAX_PENDING_RETIRED_CIDS: u64 = CidQueue::LEN as u64 * 10;
3495 if (pending_retired.len() as u64)
3498 .saturating_add(retired.end.saturating_sub(retired.start))
3499 > MAX_PENDING_RETIRED_CIDS
3500 {
3501 return Err(TransportError::CONNECTION_ID_LIMIT_ERROR(
3502 "queued too many retired CIDs",
3503 ));
3504 }
3505 pending_retired.extend(retired);
3506 self.set_reset_token(reset_token);
3507 }
3508 Err(InsertError::ExceedsLimit) => {
3509 return Err(TransportError::CONNECTION_ID_LIMIT_ERROR(""));
3510 }
3511 Err(InsertError::Retired) => {
3512 trace!("discarding already-retired");
// The sequence number is still queued for retirement; `continue` then moves
// on to the next frame, skipping the server-side CID switch below.
3513 self.spaces[SpaceId::Data]
3517 .pending
3518 .retire_cids
3519 .push(frame.sequence);
3520 continue;
3521 }
3522 };
3523
// A server still using remote CID sequence 0 switches to a newer one.
3524 if self.side.is_server() && self.rem_cids.active_seq() == 0 {
3525 self.update_rem_cid();
3528 }
3529 }
3530 Frame::NewToken(NewToken { token }) => {
// Only a client side has a token store; on a server this frame is a
// protocol violation.
3531 let ConnectionSide::Client {
3532 token_store,
3533 server_name,
3534 ..
3535 } = &self.side
3536 else {
3537 return Err(TransportError::PROTOCOL_VIOLATION("client sent NEW_TOKEN"));
3538 };
3539 if token.is_empty() {
3540 return Err(TransportError::FRAME_ENCODING_ERROR("empty token"));
3541 }
3542 trace!("got new token");
3543 token_store.insert(server_name, token);
3544 }
3545 Frame::Datagram(datagram) => {
3546 if self
3547 .datagrams
3548 .received(datagram, &self.config.datagram_receive_buffer_size)?
3549 {
3550 self.events.push_back(Event::DatagramReceived);
3551 }
3552 }
3553 Frame::AckFrequency(ack_frequency) => {
3554 let space = &mut self.spaces[SpaceId::Data];
3556
// When `ack_frequency_received` returns `false` nothing more is done for
// this frame.
3557 if !self
3558 .ack_frequency
3559 .ack_frequency_received(&ack_frequency, &mut space.pending_acks)?
3560 {
3561 continue;
3563 }
3564
// Re-arm the max-ACK-delay timer when the pending-ACK state reports a timeout.
3565 if let Some(timeout) = space
3568 .pending_acks
3569 .max_ack_delay_timeout(self.ack_frequency.max_ack_delay)
3570 {
3571 self.timers.set(Timer::MaxAckDelay, timeout);
3572 }
3573 }
3574 Frame::ImmediateAck => {
3575 self.spaces[SpaceId::Data]
3577 .pending_acks
3578 .set_immediate_ack_required();
3579 }
3580 Frame::HandshakeDone => {
3581 if self.side.is_server() {
3582 return Err(TransportError::PROTOCOL_VIOLATION(
3583 "client sent HANDSHAKE_DONE",
3584 ));
3585 }
// Drop the Handshake space if its keys are still installed.
3586 if self.spaces[SpaceId::Handshake].crypto.is_some() {
3587 self.discard_space(now, SpaceId::Handshake);
3588 }
3589 }
3590 Frame::AddAddress(add_address) => {
3591 self.handle_add_address(&add_address, now)?;
3592 }
3593 Frame::PunchMeNow(punch_me_now) => {
3594 self.handle_punch_me_now(&punch_me_now, now)?;
3595 }
3596 Frame::RemoveAddress(remove_address) => {
3597 self.handle_remove_address(&remove_address)?;
3598 }
3599 Frame::ObservedAddress(observed_address) => {
3600 self.handle_observed_address_frame(&observed_address, now)?;
3601 }
3602 }
3603 }
3604
// Register the packet with the pending-ACK state; a `true` result arms the
// max-ACK-delay timer.
3605 let space = &mut self.spaces[SpaceId::Data];
3606 if space
3607 .pending_acks
3608 .packet_received(now, number, ack_eliciting, &space.dedup)
3609 {
3610 self.timers
3611 .set(Timer::MaxAckDelay, now + self.ack_frequency.max_ack_delay);
3612 }
3613
3614 let pending = &mut self.spaces[SpaceId::Data].pending;
3619 self.streams.queue_max_stream_id(pending);
3620
// Apply a close frame seen earlier in this packet.
3621 if let Some(reason) = close {
3622 self.error = Some(reason.into());
3623 self.state = State::Draining;
3624 self.close = true;
3625 }
3626
// Migration trigger: a non-probing packet from a different remote whose
// number equals the Data space's `rx_packet`.
// NOTE(review): `rx_packet` presumably tracks the highest packet number
// received so far - confirm.
3627 if remote != self.path.remote
3628 && !is_probing_packet
3629 && number == self.spaces[SpaceId::Data].rx_packet
3630 {
3631 let ConnectionSide::Server { ref server_config } = self.side else {
3632 return Err(TransportError::PROTOCOL_VIOLATION(
3633 "packets from unknown remote should be dropped by clients",
3634 ));
3635 };
3636 debug_assert!(
3637 server_config.migration,
3638 "migration-initiating packets should have been dropped immediately"
3639 );
3640 self.migrate(now, remote);
// After migrating, switch to a fresh remote CID and reset the spin bit.
3641 self.update_rem_cid();
3643 self.spin = false;
3644 }
3645
3646 Ok(())
3647 }
3648
3649 fn migrate(&mut self, now: Instant, remote: SocketAddr) {
3650 trace!(%remote, "migration initiated");
3651 let mut new_path = if remote.is_ipv4() && remote.ip() == self.path.remote.ip() {
3655 PathData::from_previous(remote, &self.path, now)
3656 } else {
3657 let peer_max_udp_payload_size =
3658 u16::try_from(self.peer_params.max_udp_payload_size.into_inner())
3659 .unwrap_or(u16::MAX);
3660 PathData::new(
3661 remote,
3662 self.allow_mtud,
3663 Some(peer_max_udp_payload_size),
3664 now,
3665 &self.config,
3666 )
3667 };
3668 new_path.challenge = Some(self.rng.r#gen());
3669 new_path.challenge_pending = true;
3670 let prev_pto = self.pto(SpaceId::Data);
3671
3672 let mut prev = mem::replace(&mut self.path, new_path);
3673 if prev.challenge.is_none() {
3675 prev.challenge = Some(self.rng.r#gen());
3676 prev.challenge_pending = true;
3677 self.prev_path = Some((self.rem_cids.active(), prev));
3680 }
3681
3682 self.timers.set(
3683 Timer::PathValidation,
3684 now + 3 * cmp::max(self.pto(SpaceId::Data), prev_pto),
3685 );
3686 }
3687
3688 pub fn local_address_changed(&mut self) {
3690 self.update_rem_cid();
3691 self.ping();
3692 }
3693
3694 pub fn migrate_to_nat_traversal_path(&mut self, now: Instant) -> Result<(), TransportError> {
3696 let (remote_addr, local_addr) = {
3698 let nat_state = self
3699 .nat_traversal
3700 .as_ref()
3701 .ok_or_else(|| TransportError::PROTOCOL_VIOLATION("NAT traversal not enabled"))?;
3702
3703 let best_pairs = nat_state.get_best_succeeded_pairs();
3705 if best_pairs.is_empty() {
3706 return Err(TransportError::PROTOCOL_VIOLATION(
3707 "No validated NAT traversal paths",
3708 ));
3709 }
3710
3711 let best_path = best_pairs
3713 .iter()
3714 .find(|pair| pair.remote_addr != self.path.remote)
3715 .or_else(|| best_pairs.first());
3716
3717 let best_path = best_path.ok_or_else(|| {
3718 TransportError::PROTOCOL_VIOLATION("No suitable NAT traversal path")
3719 })?;
3720
3721 debug!(
3722 "Migrating to NAT traversal path: {} -> {} (priority: {})",
3723 self.path.remote, best_path.remote_addr, best_path.priority
3724 );
3725
3726 (best_path.remote_addr, best_path.local_addr)
3727 };
3728
3729 self.migrate(now, remote_addr);
3731
3732 if local_addr != SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED), 0) {
3734 self.local_ip = Some(local_addr.ip());
3735 }
3736
3737 self.path.challenge_pending = true;
3739
3740 Ok(())
3741 }
3742
3743 fn update_rem_cid(&mut self) {
3745 let (reset_token, retired) = match self.rem_cids.next() {
3746 Some(x) => x,
3747 None => return,
3748 };
3749
3750 self.spaces[SpaceId::Data]
3752 .pending
3753 .retire_cids
3754 .extend(retired);
3755 self.set_reset_token(reset_token);
3756 }
3757
3758 fn set_reset_token(&mut self, reset_token: ResetToken) {
3759 self.endpoint_events
3760 .push_back(EndpointEventInner::ResetToken(
3761 self.path.remote,
3762 reset_token,
3763 ));
3764 self.peer_params.stateless_reset_token = Some(reset_token);
3765 }
3766
3767 fn issue_first_cids(&mut self, now: Instant) {
3769 if self.local_cid_state.cid_len() == 0 {
3770 return;
3771 }
3772
3773 let mut n = self.peer_params.issue_cids_limit() - 1;
3775 if let ConnectionSide::Server { server_config } = &self.side {
3776 if server_config.has_preferred_address() {
3777 n -= 1;
3779 }
3780 }
3781 self.endpoint_events
3782 .push_back(EndpointEventInner::NeedIdentifiers(now, n));
3783 }
3784
3785 fn populate_packet(
3786 &mut self,
3787 now: Instant,
3788 space_id: SpaceId,
3789 buf: &mut Vec<u8>,
3790 max_size: usize,
3791 pn: u64,
3792 ) -> SentFrames {
3793 let mut sent = SentFrames::default();
3794 let space = &mut self.spaces[space_id];
3795 let is_0rtt = space_id == SpaceId::Data && space.crypto.is_none();
3796 space.pending_acks.maybe_ack_non_eliciting();
3797
3798 if !is_0rtt && mem::replace(&mut space.pending.handshake_done, false) {
3800 buf.write(frame::FrameType::HANDSHAKE_DONE);
3801 sent.retransmits.get_or_create().handshake_done = true;
3802 self.stats.frame_tx.handshake_done =
3804 self.stats.frame_tx.handshake_done.saturating_add(1);
3805 }
3806
3807 if mem::replace(&mut space.ping_pending, false) {
3809 trace!("PING");
3810 buf.write(frame::FrameType::PING);
3811 sent.non_retransmits = true;
3812 self.stats.frame_tx.ping += 1;
3813 }
3814
3815 if mem::replace(&mut space.immediate_ack_pending, false) {
3817 trace!("IMMEDIATE_ACK");
3818 buf.write(frame::FrameType::IMMEDIATE_ACK);
3819 sent.non_retransmits = true;
3820 self.stats.frame_tx.immediate_ack += 1;
3821 }
3822
3823 if space.pending_acks.can_send() {
3825 Self::populate_acks(
3826 now,
3827 self.receiving_ecn,
3828 &mut sent,
3829 space,
3830 buf,
3831 &mut self.stats,
3832 );
3833 }
3834
3835 if mem::replace(&mut space.pending.ack_frequency, false) {
3837 let sequence_number = self.ack_frequency.next_sequence_number();
3838
3839 let config = self.config.ack_frequency_config.as_ref().unwrap();
3841
3842 let max_ack_delay = self.ack_frequency.candidate_max_ack_delay(
3844 self.path.rtt.get(),
3845 config,
3846 &self.peer_params,
3847 );
3848
3849 trace!(?max_ack_delay, "ACK_FREQUENCY");
3850
3851 frame::AckFrequency {
3852 sequence: sequence_number,
3853 ack_eliciting_threshold: config.ack_eliciting_threshold,
3854 request_max_ack_delay: max_ack_delay.as_micros().try_into().unwrap_or(VarInt::MAX),
3855 reordering_threshold: config.reordering_threshold,
3856 }
3857 .encode(buf);
3858
3859 sent.retransmits.get_or_create().ack_frequency = true;
3860
3861 self.ack_frequency.ack_frequency_sent(pn, max_ack_delay);
3862 self.stats.frame_tx.ack_frequency += 1;
3863 }
3864
3865 if buf.len() + 9 < max_size && space_id == SpaceId::Data {
3867 if let Some(token) = self.path.challenge {
3869 self.path.challenge_pending = false;
3871 sent.non_retransmits = true;
3872 sent.requires_padding = true;
3873 trace!("PATH_CHALLENGE {:08x}", token);
3874 buf.write(frame::FrameType::PATH_CHALLENGE);
3875 buf.write(token);
3876 self.stats.frame_tx.path_challenge += 1;
3877 }
3878
3879 }
3888
3889 if buf.len() + 9 < max_size && space_id == SpaceId::Data {
3891 if let Some(token) = self.path_responses.pop_on_path(self.path.remote) {
3892 sent.non_retransmits = true;
3893 sent.requires_padding = true;
3894 trace!("PATH_RESPONSE {:08x}", token);
3895 buf.write(frame::FrameType::PATH_RESPONSE);
3896 buf.write(token);
3897 self.stats.frame_tx.path_response += 1;
3898 }
3899 }
3900
3901 while buf.len() + frame::Crypto::SIZE_BOUND < max_size && !is_0rtt {
3903 let mut frame = match space.pending.crypto.pop_front() {
3904 Some(x) => x,
3905 None => break,
3906 };
3907
3908 let max_crypto_data_size = max_size
3913 - buf.len()
3914 - 1 - VarInt::size(unsafe { VarInt::from_u64_unchecked(frame.offset) })
3916 - 2; let available_space = max_size - buf.len();
3920 let remaining_data = frame.data.len();
3921 #[cfg(feature = "pqc")]
3922 let optimal_size = self
3923 .pqc_state
3924 .calculate_crypto_frame_size(available_space, remaining_data);
3925 #[cfg(not(feature = "pqc"))]
3926 let optimal_size = available_space.min(remaining_data);
3927
3928 let len = frame
3929 .data
3930 .len()
3931 .min(2usize.pow(14) - 1)
3932 .min(max_crypto_data_size)
3933 .min(optimal_size);
3934
3935 let data = frame.data.split_to(len);
3936 let truncated = frame::Crypto {
3937 offset: frame.offset,
3938 data,
3939 };
3940 trace!(
3941 "CRYPTO: off {} len {}",
3942 truncated.offset,
3943 truncated.data.len()
3944 );
3945 truncated.encode(buf);
3946 self.stats.frame_tx.crypto += 1;
3947 sent.retransmits.get_or_create().crypto.push_back(truncated);
3948 if !frame.data.is_empty() {
3949 frame.offset += len as u64;
3950 space.pending.crypto.push_front(frame);
3951 }
3952 }
3953
3954 if space_id == SpaceId::Data {
3955 self.streams.write_control_frames(
3956 buf,
3957 &mut space.pending,
3958 &mut sent.retransmits,
3959 &mut self.stats.frame_tx,
3960 max_size,
3961 );
3962 }
3963
3964 while buf.len() + 44 < max_size {
3966 let issued = match space.pending.new_cids.pop() {
3967 Some(x) => x,
3968 None => break,
3969 };
3970 trace!(
3971 sequence = issued.sequence,
3972 id = %issued.id,
3973 "NEW_CONNECTION_ID"
3974 );
3975 frame::NewConnectionId {
3976 sequence: issued.sequence,
3977 retire_prior_to: self.local_cid_state.retire_prior_to(),
3978 id: issued.id,
3979 reset_token: issued.reset_token,
3980 }
3981 .encode(buf);
3982 sent.retransmits.get_or_create().new_cids.push(issued);
3983 self.stats.frame_tx.new_connection_id += 1;
3984 }
3985
3986 while buf.len() + frame::RETIRE_CONNECTION_ID_SIZE_BOUND < max_size {
3988 let seq = match space.pending.retire_cids.pop() {
3989 Some(x) => x,
3990 None => break,
3991 };
3992 trace!(sequence = seq, "RETIRE_CONNECTION_ID");
3993 buf.write(frame::FrameType::RETIRE_CONNECTION_ID);
3994 buf.write_var(seq);
3995 sent.retransmits.get_or_create().retire_cids.push(seq);
3996 self.stats.frame_tx.retire_connection_id += 1;
3997 }
3998
3999 let mut sent_datagrams = false;
4001 while buf.len() + Datagram::SIZE_BOUND < max_size && space_id == SpaceId::Data {
4002 match self.datagrams.write(buf, max_size) {
4003 true => {
4004 sent_datagrams = true;
4005 sent.non_retransmits = true;
4006 self.stats.frame_tx.datagram += 1;
4007 }
4008 false => break,
4009 }
4010 }
4011 if self.datagrams.send_blocked && sent_datagrams {
4012 self.events.push_back(Event::DatagramsUnblocked);
4013 self.datagrams.send_blocked = false;
4014 }
4015
4016 while let Some(remote_addr) = space.pending.new_tokens.pop() {
4018 debug_assert_eq!(space_id, SpaceId::Data);
4019 let ConnectionSide::Server { server_config } = &self.side else {
4020 debug_assert!(false, "NEW_TOKEN frames should not be enqueued by clients");
4022 continue;
4023 };
4024
4025 if remote_addr != self.path.remote {
4026 continue;
4031 }
4032
4033 if self.delay_new_token_until_binding && self.peer_id_for_tokens.is_none() {
4036 space.pending.new_tokens.push(remote_addr);
4038 break;
4039 }
4040
4041 let new_token = if let (Some(pid), Some(v2_key)) =
4043 (self.peer_id_for_tokens, server_config.token_v2_key.as_ref())
4044 {
4045 let cid = self.rem_cids.active();
4046 let tok =
4047 crate::token_v2::encode_retry_token_with_rng(v2_key, &pid, &cid, &mut self.rng);
4048 NewToken { token: tok.into() }
4049 } else {
4050 let token = Token::new(
4051 TokenPayload::Validation {
4052 ip: remote_addr.ip(),
4053 issued: server_config.time_source.now(),
4054 },
4055 &mut self.rng,
4056 );
4057 NewToken {
4058 token: token.encode(&*server_config.token_key).into(),
4059 }
4060 };
4061
4062 if buf.len() + new_token.size() >= max_size {
4063 space.pending.new_tokens.push(remote_addr);
4064 break;
4065 }
4066
4067 new_token.encode(buf);
4068 sent.retransmits
4069 .get_or_create()
4070 .new_tokens
4071 .push(remote_addr);
4072 self.stats.frame_tx.new_token += 1;
4073 }
4074
4075 while buf.len() + frame::AddAddress::SIZE_BOUND < max_size && space_id == SpaceId::Data {
4077 let add_address = match space.pending.add_addresses.pop() {
4078 Some(x) => x,
4079 None => break,
4080 };
4081 trace!(
4082 sequence = %add_address.sequence,
4083 address = %add_address.address,
4084 "ADD_ADDRESS"
4085 );
4086 if self.nat_traversal_frame_config.use_rfc_format {
4088 add_address.encode_rfc(buf);
4089 } else {
4090 add_address.encode_legacy(buf);
4091 }
4092 sent.retransmits
4093 .get_or_create()
4094 .add_addresses
4095 .push(add_address);
4096 self.stats.frame_tx.add_address += 1;
4097 }
4098
4099 while buf.len() + frame::PunchMeNow::SIZE_BOUND < max_size && space_id == SpaceId::Data {
4101 let punch_me_now = match space.pending.punch_me_now.pop() {
4102 Some(x) => x,
4103 None => break,
4104 };
4105 trace!(
4106 round = %punch_me_now.round,
4107 paired_with_sequence_number = %punch_me_now.paired_with_sequence_number,
4108 "PUNCH_ME_NOW"
4109 );
4110 if self.nat_traversal_frame_config.use_rfc_format {
4112 punch_me_now.encode_rfc(buf);
4113 } else {
4114 punch_me_now.encode_legacy(buf);
4115 }
4116 sent.retransmits
4117 .get_or_create()
4118 .punch_me_now
4119 .push(punch_me_now);
4120 self.stats.frame_tx.punch_me_now += 1;
4121 }
4122
4123 while buf.len() + frame::RemoveAddress::SIZE_BOUND < max_size && space_id == SpaceId::Data {
4125 let remove_address = match space.pending.remove_addresses.pop() {
4126 Some(x) => x,
4127 None => break,
4128 };
4129 trace!(
4130 sequence = %remove_address.sequence,
4131 "REMOVE_ADDRESS"
4132 );
4133 remove_address.encode(buf);
4135 sent.retransmits
4136 .get_or_create()
4137 .remove_addresses
4138 .push(remove_address);
4139 self.stats.frame_tx.remove_address += 1;
4140 }
4141
4142 while buf.len() + frame::ObservedAddress::SIZE_BOUND < max_size && space_id == SpaceId::Data
4144 {
4145 let observed_address = match space.pending.observed_addresses.pop() {
4146 Some(x) => x,
4147 None => break,
4148 };
4149 trace!(
4150 address = %observed_address.address,
4151 "OBSERVED_ADDRESS"
4152 );
4153 observed_address.encode(buf);
4154 sent.retransmits
4155 .get_or_create()
4156 .observed_addresses
4157 .push(observed_address);
4158 self.stats.frame_tx.observed_address += 1;
4159 }
4160
4161 if space_id == SpaceId::Data {
4163 sent.stream_frames =
4164 self.streams
4165 .write_stream_frames(buf, max_size, self.config.send_fairness);
4166 self.stats.frame_tx.stream += sent.stream_frames.len() as u64;
4167 }
4168
4169 sent
4170 }
4171
4172 fn populate_acks(
4177 now: Instant,
4178 receiving_ecn: bool,
4179 sent: &mut SentFrames,
4180 space: &mut PacketSpace,
4181 buf: &mut Vec<u8>,
4182 stats: &mut ConnectionStats,
4183 ) {
4184 debug_assert!(!space.pending_acks.ranges().is_empty());
4185
4186 debug_assert!(space.crypto.is_some(), "tried to send ACK in 0-RTT");
4188 let ecn = if receiving_ecn {
4189 Some(&space.ecn_counters)
4190 } else {
4191 None
4192 };
4193 sent.largest_acked = space.pending_acks.ranges().max();
4194
4195 let delay_micros = space.pending_acks.ack_delay(now).as_micros() as u64;
4196
4197 let ack_delay_exp = TransportParameters::default().ack_delay_exponent;
4199 let delay = delay_micros >> ack_delay_exp.into_inner();
4200
4201 trace!(
4202 "ACK {:?}, Delay = {}us",
4203 space.pending_acks.ranges(),
4204 delay_micros
4205 );
4206
4207 frame::Ack::encode(delay as _, space.pending_acks.ranges(), ecn, buf);
4208 stats.frame_tx.acks += 1;
4209 }
4210
4211 fn close_common(&mut self) {
4212 trace!("connection closed");
4213 for &timer in &Timer::VALUES {
4214 self.timers.stop(timer);
4215 }
4216 }
4217
4218 fn set_close_timer(&mut self, now: Instant) {
4219 self.timers
4220 .set(Timer::Close, now + 3 * self.pto(self.highest_space));
4221 }
4222
4223 fn handle_peer_params(&mut self, params: TransportParameters) -> Result<(), TransportError> {
4225 if Some(self.orig_rem_cid) != params.initial_src_cid
4226 || (self.side.is_client()
4227 && (Some(self.initial_dst_cid) != params.original_dst_cid
4228 || self.retry_src_cid != params.retry_src_cid))
4229 {
4230 return Err(TransportError::TRANSPORT_PARAMETER_ERROR(
4231 "CID authentication failure",
4232 ));
4233 }
4234
4235 self.set_peer_params(params);
4236
4237 Ok(())
4238 }
4239
4240 fn set_peer_params(&mut self, params: TransportParameters) {
4241 self.streams.set_params(¶ms);
4242 self.idle_timeout =
4243 negotiate_max_idle_timeout(self.config.max_idle_timeout, Some(params.max_idle_timeout));
4244 trace!("negotiated max idle timeout {:?}", self.idle_timeout);
4245 if let Some(ref info) = params.preferred_address {
4246 self.rem_cids.insert(frame::NewConnectionId {
4247 sequence: 1,
4248 id: info.connection_id,
4249 reset_token: info.stateless_reset_token,
4250 retire_prior_to: 0,
4251 }).expect("preferred address CID is the first received, and hence is guaranteed to be legal");
4252 }
4253 self.ack_frequency.peer_max_ack_delay = get_max_ack_delay(¶ms);
4254
4255 self.negotiate_nat_traversal_capability(¶ms);
4257
4258 let local_has_nat_traversal = self.config.nat_traversal_config.is_some();
4261 let local_supports_rfc = local_has_nat_traversal;
4264 self.nat_traversal_frame_config = frame::nat_traversal_unified::NatTraversalFrameConfig {
4265 use_rfc_format: local_supports_rfc && params.supports_rfc_nat_traversal(),
4267 accept_legacy: true,
4269 };
4270
4271 self.negotiate_address_discovery(¶ms);
4273
4274 #[cfg(feature = "pqc")]
4276 {
4277 self.pqc_state.update_from_peer_params(¶ms);
4278
4279 if self.pqc_state.enabled && self.pqc_state.using_pqc {
4281 trace!("PQC enabled, adjusting MTU discovery for larger handshake packets");
4282 let current_mtu = self.path.mtud.current_mtu();
4286 if current_mtu < self.pqc_state.handshake_mtu {
4287 trace!(
4288 "Current MTU {} is less than PQC handshake MTU {}, will rely on MTU discovery",
4289 current_mtu, self.pqc_state.handshake_mtu
4290 );
4291 }
4292 }
4293 }
4294
4295 self.peer_params = params;
4296 self.path.mtud.on_peer_max_udp_payload_size_received(
4297 u16::try_from(self.peer_params.max_udp_payload_size.into_inner()).unwrap_or(u16::MAX),
4298 );
4299 }
4300
4301 fn negotiate_nat_traversal_capability(&mut self, params: &TransportParameters) {
4303 let peer_nat_config = match ¶ms.nat_traversal {
4305 Some(config) => config,
4306 None => {
4307 if self.config.nat_traversal_config.is_some() {
4309 debug!(
4310 "Peer does not support NAT traversal, maintaining backward compatibility"
4311 );
4312 self.emit_nat_traversal_capability_event(false);
4313
4314 self.set_nat_traversal_compatibility_mode(false);
4316 }
4317 return;
4318 }
4319 };
4320
4321 let local_nat_config = match &self.config.nat_traversal_config {
4323 Some(config) => config,
4324 None => {
4325 debug!("NAT traversal not enabled locally, ignoring peer support");
4326 self.emit_nat_traversal_capability_event(false);
4327 self.set_nat_traversal_compatibility_mode(false);
4328 return;
4329 }
4330 };
4331
4332 info!("Both peers support NAT traversal, negotiating capabilities");
4334
4335 match self.negotiate_nat_traversal_parameters(local_nat_config, peer_nat_config) {
4337 Ok(negotiated_config) => {
4338 info!("NAT traversal capability negotiated successfully");
4339 self.emit_nat_traversal_capability_event(true);
4340
4341 self.init_nat_traversal_with_negotiated_config(&negotiated_config);
4343
4344 self.set_nat_traversal_compatibility_mode(true);
4346
4347 if matches!(
4349 negotiated_config,
4350 crate::transport_parameters::NatTraversalConfig::ClientSupport
4351 ) {
4352 self.initiate_nat_traversal_process();
4353 }
4354 }
4355 Err(e) => {
4356 warn!("NAT traversal capability negotiation failed: {}", e);
4357 self.emit_nat_traversal_capability_event(false);
4358 self.set_nat_traversal_compatibility_mode(false);
4359 }
4360 }
4361 }
4362
4363 fn emit_nat_traversal_capability_event(&mut self, negotiated: bool) {
4417 if negotiated {
4420 info!("NAT traversal capability successfully negotiated");
4421 } else {
4422 info!("NAT traversal capability not available (peer or local support missing)");
4423 }
4424
4425 }
4428
4429 fn set_nat_traversal_compatibility_mode(&mut self, enabled: bool) {
4431 if enabled {
4432 debug!("NAT traversal enabled for this connection");
4433 } else {
4435 debug!("NAT traversal disabled for this connection (backward compatibility mode)");
4436 if self.nat_traversal.is_some() {
4438 warn!("Clearing NAT traversal state due to compatibility mode");
4439 self.nat_traversal = None;
4440 }
4441 }
4442 }
4443
4444 fn negotiate_nat_traversal_parameters(
4446 &self,
4447 local_config: &crate::transport_parameters::NatTraversalConfig,
4448 peer_config: &crate::transport_parameters::NatTraversalConfig,
4449 ) -> Result<crate::transport_parameters::NatTraversalConfig, String> {
4450 match (local_config, peer_config) {
4455 (
4457 crate::transport_parameters::NatTraversalConfig::ClientSupport,
4458 crate::transport_parameters::NatTraversalConfig::ServerSupport {
4459 concurrency_limit,
4460 },
4461 ) => Ok(
4462 crate::transport_parameters::NatTraversalConfig::ServerSupport {
4463 concurrency_limit: *concurrency_limit,
4464 },
4465 ),
4466 (
4468 crate::transport_parameters::NatTraversalConfig::ServerSupport {
4469 concurrency_limit,
4470 },
4471 crate::transport_parameters::NatTraversalConfig::ClientSupport,
4472 ) => Ok(
4473 crate::transport_parameters::NatTraversalConfig::ServerSupport {
4474 concurrency_limit: *concurrency_limit,
4475 },
4476 ),
4477 (
4479 crate::transport_parameters::NatTraversalConfig::ServerSupport {
4480 concurrency_limit: limit1,
4481 },
4482 crate::transport_parameters::NatTraversalConfig::ServerSupport {
4483 concurrency_limit: limit2,
4484 },
4485 ) => Ok(
4486 crate::transport_parameters::NatTraversalConfig::ServerSupport {
4487 concurrency_limit: (*limit1).min(*limit2),
4488 },
4489 ),
4490 (
4492 crate::transport_parameters::NatTraversalConfig::ClientSupport,
4493 crate::transport_parameters::NatTraversalConfig::ClientSupport,
4494 ) => Err("Both endpoints claim to be NAT traversal clients".to_string()),
4495 }
4496 }
4497
4498 fn init_nat_traversal_with_negotiated_config(
4500 &mut self,
4501 config: &crate::transport_parameters::NatTraversalConfig,
4502 ) {
4503 let (role, _concurrency_limit) = match config {
4506 crate::transport_parameters::NatTraversalConfig::ClientSupport => {
4507 (NatTraversalRole::Client, 10) }
4510 crate::transport_parameters::NatTraversalConfig::ServerSupport {
4511 concurrency_limit,
4512 } => {
4513 (
4515 NatTraversalRole::Server { can_relay: false },
4516 concurrency_limit.into_inner() as u32,
4517 )
4518 }
4519 };
4520
4521 let max_candidates = 50; let coordination_timeout = Duration::from_secs(10); self.nat_traversal = Some(NatTraversalState::new(
4527 role,
4528 max_candidates,
4529 coordination_timeout,
4530 ));
4531
4532 trace!(
4533 "NAT traversal initialized with negotiated config: role={:?}",
4534 role
4535 );
4536
4537 match role {
4539 NatTraversalRole::Bootstrap => {
4540 self.prepare_address_observation();
4542 }
4543 NatTraversalRole::Client => {
4544 self.schedule_candidate_discovery();
4546 }
4547 NatTraversalRole::Server { .. } => {
4548 self.prepare_coordination_handling();
4550 }
4551 }
4552 }
4553
4554 fn initiate_nat_traversal_process(&mut self) {
4556 if let Some(nat_state) = &mut self.nat_traversal {
4557 match nat_state.start_candidate_discovery() {
4558 Ok(()) => {
4559 debug!("NAT traversal process initiated - candidate discovery started");
4560 self.timers.set(
4562 Timer::NatTraversal,
4563 Instant::now() + Duration::from_millis(100),
4564 );
4565 }
4566 Err(e) => {
4567 warn!("Failed to initiate NAT traversal process: {}", e);
4568 }
4569 }
4570 }
4571 }
4572
4573 fn prepare_address_observation(&mut self) {
4575 debug!("Preparing for address observation as bootstrap node");
4576 }
4579
4580 fn schedule_candidate_discovery(&mut self) {
4582 debug!("Scheduling candidate discovery for client endpoint");
4583 self.timers.set(
4585 Timer::NatTraversal,
4586 Instant::now() + Duration::from_millis(50),
4587 );
4588 }
4589
4590 fn prepare_coordination_handling(&mut self) {
4592 debug!("Preparing to handle coordination requests as server endpoint");
4593 }
4596
4597 fn handle_nat_traversal_timeout(&mut self, now: Instant) {
4599 let timeout_result = if let Some(nat_state) = &mut self.nat_traversal {
4601 nat_state.handle_timeout(now)
4602 } else {
4603 return;
4604 };
4605
4606 match timeout_result {
4608 Ok(actions) => {
4609 for action in actions {
4610 match action {
4611 nat_traversal::TimeoutAction::RetryDiscovery => {
4612 debug!("NAT traversal timeout: retrying candidate discovery");
4613 if let Some(nat_state) = &mut self.nat_traversal {
4614 if let Err(e) = nat_state.start_candidate_discovery() {
4615 warn!("Failed to retry candidate discovery: {}", e);
4616 }
4617 }
4618 }
4619 nat_traversal::TimeoutAction::RetryCoordination => {
4620 debug!("NAT traversal timeout: retrying coordination");
4621 self.timers
4623 .set(Timer::NatTraversal, now + Duration::from_secs(2));
4624 }
4625 nat_traversal::TimeoutAction::StartValidation => {
4626 debug!("NAT traversal timeout: starting path validation");
4627 self.start_nat_traversal_validation(now);
4628 }
4629 nat_traversal::TimeoutAction::Complete => {
4630 debug!("NAT traversal completed successfully");
4631 self.timers.stop(Timer::NatTraversal);
4633 }
4634 nat_traversal::TimeoutAction::Failed => {
4635 warn!("NAT traversal failed after timeout");
4636 self.handle_nat_traversal_failure();
4638 }
4639 }
4640 }
4641 }
4642 Err(e) => {
4643 warn!("NAT traversal timeout handling failed: {}", e);
4644 self.handle_nat_traversal_failure();
4645 }
4646 }
4647 }
4648
4649 fn start_nat_traversal_validation(&mut self, now: Instant) {
4651 if let Some(nat_state) = &mut self.nat_traversal {
4652 let pairs = nat_state.get_next_validation_pairs(3);
4654
4655 for pair in pairs {
4656 let challenge = self.rng.r#gen();
4658 self.path.challenge = Some(challenge);
4659 self.path.challenge_pending = true;
4660
4661 debug!(
4662 "Starting path validation for NAT traversal candidate: {}",
4663 pair.remote_addr
4664 );
4665 }
4666
4667 self.timers
4669 .set(Timer::PathValidation, now + Duration::from_secs(3));
4670 }
4671 }
4672
4673 fn handle_nat_traversal_failure(&mut self) {
4675 warn!("NAT traversal failed, considering fallback options");
4676
4677 self.nat_traversal = None;
4679 self.timers.stop(Timer::NatTraversal);
4680
4681 debug!("NAT traversal disabled for this connection due to failure");
4688 }
4689
4690 pub fn nat_traversal_supported(&self) -> bool {
4692 self.nat_traversal.is_some()
4693 && self.config.nat_traversal_config.is_some()
4694 && self.peer_params.nat_traversal.is_some()
4695 }
4696
4697 pub fn nat_traversal_config(&self) -> Option<&crate::transport_parameters::NatTraversalConfig> {
4699 self.peer_params.nat_traversal.as_ref()
4700 }
4701
4702 pub fn nat_traversal_ready(&self) -> bool {
4704 self.nat_traversal_supported() && matches!(self.state, State::Established)
4705 }
4706
4707 #[allow(dead_code)]
4712 pub(crate) fn nat_traversal_stats(&self) -> Option<nat_traversal::NatTraversalStats> {
4713 self.nat_traversal.as_ref().map(|state| state.stats.clone())
4714 }
4715
/// Test-only helper that forces NAT traversal on with the given `role`.
///
/// Writes a matching configuration into both the stored peer parameters and a cloned
/// copy of the local transport config, then installs fresh NAT traversal state
/// (8 candidates max, 10 s coordination timeout).
#[cfg(test)]
#[allow(dead_code)]
pub(crate) fn force_enable_nat_traversal(&mut self, role: NatTraversalRole) {
    use crate::transport_parameters::NatTraversalConfig;

    let config = match role {
        NatTraversalRole::Server { .. } | NatTraversalRole::Bootstrap => {
            NatTraversalConfig::ServerSupport {
                concurrency_limit: VarInt::from_u32(5),
            }
        }
        NatTraversalRole::Client => NatTraversalConfig::ClientSupport,
    };

    self.peer_params.nat_traversal = Some(config.clone());

    // The shared config is replaced by a modified clone rather than mutated in place.
    let mut transport_config = self.config.as_ref().clone();
    transport_config.nat_traversal_config = Some(config);
    self.config = Arc::new(transport_config);

    let state = NatTraversalState::new(role, 8, Duration::from_secs(10));
    self.nat_traversal = Some(state);
}
4741
4742 fn derive_peer_id_from_connection(&self) -> [u8; 32] {
4745 let mut hasher = std::collections::hash_map::DefaultHasher::new();
4747 use std::hash::Hasher;
4748 hasher.write(&self.rem_handshake_cid);
4749 hasher.write(&self.handshake_cid);
4750 hasher.write(&self.path.remote.to_string().into_bytes());
4751 let hash = hasher.finish();
4752 let mut peer_id = [0u8; 32];
4753 peer_id[..8].copy_from_slice(&hash.to_be_bytes());
4754 let cid_bytes = self.rem_handshake_cid.as_ref();
4756 let copy_len = (cid_bytes.len()).min(24);
4757 peer_id[8..8 + copy_len].copy_from_slice(&cid_bytes[..copy_len]);
4758 peer_id
4759 }
4760
4761 fn handle_add_address(
4763 &mut self,
4764 add_address: &crate::frame::AddAddress,
4765 now: Instant,
4766 ) -> Result<(), TransportError> {
4767 let nat_state = self.nat_traversal.as_mut().ok_or_else(|| {
4768 TransportError::PROTOCOL_VIOLATION("AddAddress frame without NAT traversal negotiation")
4769 })?;
4770
4771 match nat_state.add_remote_candidate(
4772 add_address.sequence,
4773 add_address.address,
4774 add_address.priority,
4775 now,
4776 ) {
4777 Ok(()) => {
4778 trace!(
4779 "Added remote candidate: {} (seq={}, priority={})",
4780 add_address.address, add_address.sequence, add_address.priority
4781 );
4782
4783 self.trigger_candidate_validation(add_address.address, now)?;
4785 Ok(())
4786 }
4787 Err(NatTraversalError::TooManyCandidates) => Err(TransportError::PROTOCOL_VIOLATION(
4788 "too many NAT traversal candidates",
4789 )),
4790 Err(NatTraversalError::DuplicateAddress) => {
4791 Ok(())
4793 }
4794 Err(e) => {
4795 warn!("Failed to add remote candidate: {}", e);
4796 Ok(()) }
4798 }
4799 }
4800
4801 fn handle_punch_me_now(
4803 &mut self,
4804 punch_me_now: &crate::frame::PunchMeNow,
4805 now: Instant,
4806 ) -> Result<(), TransportError> {
4807 trace!(
4808 "Received PunchMeNow: round={}, target_seq={}, local_addr={}",
4809 punch_me_now.round, punch_me_now.paired_with_sequence_number, punch_me_now.address
4810 );
4811
4812 if let Some(nat_state) = &self.nat_traversal {
4814 if matches!(nat_state.role, NatTraversalRole::Bootstrap) {
4815 let from_peer_id = self.derive_peer_id_from_connection();
4817
4818 let punch_me_now_clone = punch_me_now.clone();
4820 drop(nat_state); match self
4823 .nat_traversal
4824 .as_mut()
4825 .unwrap()
4826 .handle_punch_me_now_frame(
4827 from_peer_id,
4828 self.path.remote,
4829 &punch_me_now_clone,
4830 now,
4831 ) {
4832 Ok(Some(coordination_frame)) => {
4833 trace!("Bootstrap node coordinating PUNCH_ME_NOW between peers");
4834
4835 if let Some(target_peer_id) = punch_me_now.target_peer_id {
4837 self.endpoint_events.push_back(
4838 crate::shared::EndpointEventInner::RelayPunchMeNow(
4839 target_peer_id,
4840 coordination_frame,
4841 ),
4842 );
4843 }
4844
4845 return Ok(());
4846 }
4847 Ok(None) => {
4848 trace!("Bootstrap coordination completed or no action needed");
4849 return Ok(());
4850 }
4851 Err(e) => {
4852 warn!("Bootstrap coordination failed: {}", e);
4853 return Ok(());
4854 }
4855 }
4856 }
4857 }
4858
4859 let nat_state = self.nat_traversal.as_mut().ok_or_else(|| {
4861 TransportError::PROTOCOL_VIOLATION("PunchMeNow frame without NAT traversal negotiation")
4862 })?;
4863
4864 if nat_state
4866 .handle_peer_punch_request(punch_me_now.round, now)
4867 .map_err(|_e| {
4868 TransportError::PROTOCOL_VIOLATION("Failed to handle peer punch request")
4869 })?
4870 {
4871 trace!("Coordination synchronized for round {}", punch_me_now.round);
4872
4873 let _local_addr = self
4876 .local_ip
4877 .map(|ip| SocketAddr::new(ip, 0))
4878 .unwrap_or_else(|| {
4879 SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED), 0)
4880 });
4881
4882 let target = nat_traversal::PunchTarget {
4883 remote_addr: punch_me_now.address,
4884 remote_sequence: punch_me_now.paired_with_sequence_number,
4885 challenge: self.rng.r#gen(),
4886 };
4887
4888 let _ = nat_state.start_coordination_round(vec![target], now);
4890 } else {
4891 debug!(
4892 "Failed to synchronize coordination for round {}",
4893 punch_me_now.round
4894 );
4895 }
4896
4897 Ok(())
4898 }
4899
4900 fn handle_remove_address(
4902 &mut self,
4903 remove_address: &crate::frame::RemoveAddress,
4904 ) -> Result<(), TransportError> {
4905 let nat_state = self.nat_traversal.as_mut().ok_or_else(|| {
4906 TransportError::PROTOCOL_VIOLATION(
4907 "RemoveAddress frame without NAT traversal negotiation",
4908 )
4909 })?;
4910
4911 if nat_state.remove_candidate(remove_address.sequence) {
4912 trace!(
4913 "Removed candidate with sequence {}",
4914 remove_address.sequence
4915 );
4916 } else {
4917 trace!(
4918 "Attempted to remove unknown candidate sequence {}",
4919 remove_address.sequence
4920 );
4921 }
4922
4923 Ok(())
4924 }
4925
4926 fn handle_observed_address_frame(
4928 &mut self,
4929 observed_address: &crate::frame::ObservedAddress,
4930 now: Instant,
4931 ) -> Result<(), TransportError> {
4932 let state = self.address_discovery_state.as_mut().ok_or_else(|| {
4934 TransportError::PROTOCOL_VIOLATION(
4935 "ObservedAddress frame without address discovery negotiation",
4936 )
4937 })?;
4938
4939 if !state.enabled {
4941 return Err(TransportError::PROTOCOL_VIOLATION(
4942 "ObservedAddress frame received when address discovery is disabled",
4943 ));
4944 }
4945
4946 #[cfg(feature = "trace")]
4948 {
4949 use crate::trace_observed_address_received;
4950 trace_observed_address_received!(
4952 &self.event_log,
4953 self.trace_context.trace_id(),
4954 observed_address.address,
4955 0u64 );
4957 }
4958
4959 let path_id = 0u64; if let Some(&last_seq) = state.last_received_sequence.get(&path_id) {
4967 if observed_address.sequence_number <= last_seq {
4968 trace!(
4969 "Ignoring OBSERVED_ADDRESS frame with stale sequence number {} (last was {})",
4970 observed_address.sequence_number, last_seq
4971 );
4972 return Ok(());
4973 }
4974 }
4975
4976 state
4978 .last_received_sequence
4979 .insert(path_id, observed_address.sequence_number);
4980
4981 state.handle_observed_address(observed_address.address, path_id, now);
4983
4984 self.path
4986 .update_observed_address(observed_address.address, now);
4987
4988 trace!(
4990 "Received ObservedAddress frame: address={} for path={}",
4991 observed_address.address, path_id
4992 );
4993
4994 Ok(())
4995 }
4996
4997 pub fn queue_add_address(&mut self, sequence: VarInt, address: SocketAddr, priority: VarInt) {
4999 let add_address = frame::AddAddress {
5001 sequence,
5002 address,
5003 priority,
5004 };
5005
5006 self.spaces[SpaceId::Data]
5007 .pending
5008 .add_addresses
5009 .push(add_address);
5010 trace!(
5011 "Queued AddAddress frame: seq={}, addr={}, priority={}",
5012 sequence, address, priority
5013 );
5014 }
5015
5016 pub fn queue_punch_me_now(
5018 &mut self,
5019 round: VarInt,
5020 paired_with_sequence_number: VarInt,
5021 address: SocketAddr,
5022 ) {
5023 let punch_me_now = frame::PunchMeNow {
5024 round,
5025 paired_with_sequence_number,
5026 address,
5027 target_peer_id: None, };
5029
5030 self.spaces[SpaceId::Data]
5031 .pending
5032 .punch_me_now
5033 .push(punch_me_now);
5034 trace!(
5035 "Queued PunchMeNow frame: round={}, target={}",
5036 round, paired_with_sequence_number
5037 );
5038 }
5039
5040 pub fn queue_remove_address(&mut self, sequence: VarInt) {
5042 let remove_address = frame::RemoveAddress { sequence };
5043
5044 self.spaces[SpaceId::Data]
5045 .pending
5046 .remove_addresses
5047 .push(remove_address);
5048 trace!("Queued RemoveAddress frame: seq={}", sequence);
5049 }
5050
5051 pub fn queue_observed_address(&mut self, address: SocketAddr) {
5053 let sequence_number = if let Some(state) = &mut self.address_discovery_state {
5055 let seq = state.next_sequence_number;
5056 state.next_sequence_number =
5057 VarInt::from_u64(state.next_sequence_number.into_inner() + 1)
5058 .expect("sequence number overflow");
5059 seq
5060 } else {
5061 VarInt::from_u32(0)
5063 };
5064
5065 let observed_address = frame::ObservedAddress {
5066 sequence_number,
5067 address,
5068 };
5069 self.spaces[SpaceId::Data]
5070 .pending
5071 .observed_addresses
5072 .push(observed_address);
5073 trace!("Queued ObservedAddress frame: addr={}", address);
5074 }
5075
5076 pub fn check_for_address_observations(&mut self, now: Instant) {
5078 let Some(state) = &mut self.address_discovery_state else {
5080 return;
5081 };
5082
5083 if !state.enabled {
5085 return;
5086 }
5087
5088 let path_id = 0u64; let remote_address = self.path.remote;
5093
5094 if state.should_send_observation(path_id, now) {
5096 if let Some(frame) = state.queue_observed_address_frame(path_id, remote_address) {
5098 self.spaces[SpaceId::Data]
5100 .pending
5101 .observed_addresses
5102 .push(frame);
5103
5104 state.record_observation_sent(path_id);
5106
5107 #[cfg(feature = "trace")]
5109 {
5110 use crate::trace_observed_address_sent;
5111 trace_observed_address_sent!(
5113 &self.event_log,
5114 self.trace_context.trace_id(),
5115 remote_address,
5116 path_id
5117 );
5118 }
5119
5120 trace!(
5121 "Queued OBSERVED_ADDRESS frame for path {} with address {}",
5122 path_id, remote_address
5123 );
5124 }
5125 }
5126 }
5127
5128 fn trigger_candidate_validation(
5130 &mut self,
5131 candidate_address: SocketAddr,
5132 now: Instant,
5133 ) -> Result<(), TransportError> {
5134 let nat_state = self
5135 .nat_traversal
5136 .as_mut()
5137 .ok_or_else(|| TransportError::PROTOCOL_VIOLATION("NAT traversal not enabled"))?;
5138
5139 if nat_state
5141 .active_validations
5142 .contains_key(&candidate_address)
5143 {
5144 trace!("Validation already in progress for {}", candidate_address);
5145 return Ok(());
5146 }
5147
5148 let challenge = self.rng.r#gen::<u64>();
5150
5151 let validation_state = nat_traversal::PathValidationState {
5153 challenge,
5154 sent_at: now,
5155 retry_count: 0,
5156 max_retries: 3,
5157 coordination_round: None,
5158 timeout_state: nat_traversal::AdaptiveTimeoutState::new(),
5159 last_retry_at: None,
5160 };
5161
5162 nat_state
5164 .active_validations
5165 .insert(candidate_address, validation_state);
5166
5167 self.nat_traversal_challenges
5169 .push(candidate_address, challenge);
5170
5171 nat_state.stats.validations_succeeded += 1; trace!(
5175 "Triggered PATH_CHALLENGE validation for {} with challenge {:016x}",
5176 candidate_address, challenge
5177 );
5178
5179 Ok(())
5180 }
5181
5182 pub fn nat_traversal_state(&self) -> Option<(NatTraversalRole, usize, usize)> {
5184 self.nat_traversal.as_ref().map(|state| {
5185 (
5186 state.role,
5187 state.local_candidates.len(),
5188 state.remote_candidates.len(),
5189 )
5190 })
5191 }
5192
5193 pub fn initiate_nat_traversal_coordination(
5195 &mut self,
5196 now: Instant,
5197 ) -> Result<(), TransportError> {
5198 let nat_state = self
5199 .nat_traversal
5200 .as_mut()
5201 .ok_or_else(|| TransportError::PROTOCOL_VIOLATION("NAT traversal not enabled"))?;
5202
5203 if nat_state.should_send_punch_request() {
5205 nat_state.generate_candidate_pairs(now);
5207
5208 let pairs = nat_state.get_next_validation_pairs(3);
5210 if pairs.is_empty() {
5211 return Err(TransportError::PROTOCOL_VIOLATION(
5212 "No candidate pairs for coordination",
5213 ));
5214 }
5215
5216 let targets: Vec<_> = pairs
5218 .into_iter()
5219 .map(|pair| nat_traversal::PunchTarget {
5220 remote_addr: pair.remote_addr,
5221 remote_sequence: pair.remote_sequence,
5222 challenge: self.rng.r#gen(),
5223 })
5224 .collect();
5225
5226 let round = nat_state
5228 .start_coordination_round(targets, now)
5229 .map_err(|_e| {
5230 TransportError::PROTOCOL_VIOLATION("Failed to start coordination round")
5231 })?;
5232
5233 let local_addr = self
5236 .local_ip
5237 .map(|ip| SocketAddr::new(ip, self.local_ip.map(|_| 0).unwrap_or(0)))
5238 .unwrap_or_else(|| {
5239 SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED), 0)
5240 });
5241
5242 let punch_me_now = frame::PunchMeNow {
5243 round,
5244 paired_with_sequence_number: VarInt::from_u32(0), address: local_addr,
5246 target_peer_id: None, };
5248
5249 self.spaces[SpaceId::Data]
5250 .pending
5251 .punch_me_now
5252 .push(punch_me_now);
5253 nat_state.mark_punch_request_sent();
5254
5255 trace!("Initiated NAT traversal coordination round {}", round);
5256 }
5257
5258 Ok(())
5259 }
5260
5261 pub fn validate_nat_candidates(&mut self, now: Instant) {
5263 self.generate_nat_traversal_challenges(now);
5264 }
5265
5266 pub fn send_nat_address_advertisement(
5281 &mut self,
5282 address: SocketAddr,
5283 priority: u32,
5284 ) -> Result<u64, ConnectionError> {
5285 let nat_state = self.nat_traversal.as_mut().ok_or_else(|| {
5287 ConnectionError::TransportError(TransportError::PROTOCOL_VIOLATION(
5288 "NAT traversal not enabled on this connection",
5289 ))
5290 })?;
5291
5292 let sequence = nat_state.next_sequence;
5294 nat_state.next_sequence =
5295 VarInt::from_u64(nat_state.next_sequence.into_inner() + 1).unwrap();
5296
5297 let now = Instant::now();
5299 nat_state.local_candidates.insert(
5300 sequence,
5301 nat_traversal::AddressCandidate {
5302 address,
5303 priority,
5304 source: nat_traversal::CandidateSource::Local,
5305 discovered_at: now,
5306 state: nat_traversal::CandidateState::New,
5307 attempt_count: 0,
5308 last_attempt: None,
5309 },
5310 );
5311
5312 nat_state.stats.local_candidates_sent += 1;
5314
5315 self.queue_add_address(sequence, address, VarInt::from_u32(priority));
5317
5318 debug!(
5319 "Queued ADD_ADDRESS frame: addr={}, priority={}, seq={}",
5320 address, priority, sequence
5321 );
5322 Ok(sequence.into_inner())
5323 }
5324
5325 pub fn send_nat_punch_coordination(
5338 &mut self,
5339 paired_with_sequence_number: u64,
5340 address: SocketAddr,
5341 round: u32,
5342 ) -> Result<(), ConnectionError> {
5343 let _nat_state = self.nat_traversal.as_ref().ok_or_else(|| {
5345 ConnectionError::TransportError(TransportError::PROTOCOL_VIOLATION(
5346 "NAT traversal not enabled on this connection",
5347 ))
5348 })?;
5349
5350 self.queue_punch_me_now(
5352 VarInt::from_u32(round),
5353 VarInt::from_u64(paired_with_sequence_number).map_err(|_| {
5354 ConnectionError::TransportError(TransportError::PROTOCOL_VIOLATION(
5355 "Invalid target sequence number",
5356 ))
5357 })?,
5358 address,
5359 );
5360
5361 debug!(
5362 "Queued PUNCH_ME_NOW frame: paired_with_seq={}, addr={}, round={}",
5363 paired_with_sequence_number, address, round
5364 );
5365 Ok(())
5366 }
5367
5368 pub fn send_nat_address_removal(&mut self, sequence: u64) -> Result<(), ConnectionError> {
5379 let nat_state = self.nat_traversal.as_mut().ok_or_else(|| {
5381 ConnectionError::TransportError(TransportError::PROTOCOL_VIOLATION(
5382 "NAT traversal not enabled on this connection",
5383 ))
5384 })?;
5385
5386 let sequence_varint = VarInt::from_u64(sequence).map_err(|_| {
5387 ConnectionError::TransportError(TransportError::PROTOCOL_VIOLATION(
5388 "Invalid sequence number",
5389 ))
5390 })?;
5391
5392 nat_state.local_candidates.remove(&sequence_varint);
5394
5395 self.queue_remove_address(sequence_varint);
5397
5398 debug!("Queued REMOVE_ADDRESS frame: seq={}", sequence);
5399 Ok(())
5400 }
5401
5402 #[allow(dead_code)]
5411 pub(crate) fn get_nat_traversal_stats(&self) -> Option<&nat_traversal::NatTraversalStats> {
5412 self.nat_traversal.as_ref().map(|state| &state.stats)
5413 }
5414
/// Returns `true` when this connection carries NAT traversal state.
pub fn is_nat_traversal_enabled(&self) -> bool {
    self.nat_traversal.is_some()
}
5419
5420 pub fn get_nat_traversal_role(&self) -> Option<NatTraversalRole> {
5422 self.nat_traversal.as_ref().map(|state| state.role)
5423 }
5424
5425 fn negotiate_address_discovery(&mut self, peer_params: &TransportParameters) {
5427 let now = Instant::now();
5428
5429 match &peer_params.address_discovery {
5431 Some(peer_config) => {
5432 if let Some(state) = &mut self.address_discovery_state {
5434 if state.enabled {
5435 debug!(
5438 "Address discovery negotiated: rate={}, all_paths={}",
5439 state.max_observation_rate, state.observe_all_paths
5440 );
5441 } else {
5442 debug!("Address discovery disabled locally, ignoring peer support");
5444 }
5445 } else {
5446 self.address_discovery_state =
5448 Some(AddressDiscoveryState::new(peer_config, now));
5449 debug!("Address discovery initialized from peer config");
5450 }
5451 }
5452 _ => {
5453 if let Some(state) = &mut self.address_discovery_state {
5455 state.enabled = false;
5456 debug!("Address discovery disabled - peer doesn't support it");
5457 }
5458 }
5459 }
5460
5461 if let Some(state) = &self.address_discovery_state {
5463 if state.enabled {
5464 self.path.set_observation_rate(state.max_observation_rate);
5465 }
5466 }
5467 }
5468
5469 fn decrypt_packet(
5470 &mut self,
5471 now: Instant,
5472 packet: &mut Packet,
5473 ) -> Result<Option<u64>, Option<TransportError>> {
5474 let result = packet_crypto::decrypt_packet_body(
5475 packet,
5476 &self.spaces,
5477 self.zero_rtt_crypto.as_ref(),
5478 self.key_phase,
5479 self.prev_crypto.as_ref(),
5480 self.next_crypto.as_ref(),
5481 )?;
5482
5483 let result = match result {
5484 Some(r) => r,
5485 None => return Ok(None),
5486 };
5487
5488 if result.outgoing_key_update_acked {
5489 if let Some(prev) = self.prev_crypto.as_mut() {
5490 prev.end_packet = Some((result.number, now));
5491 self.set_key_discard_timer(now, packet.header.space());
5492 }
5493 }
5494
5495 if result.incoming_key_update {
5496 trace!("key update authenticated");
5497 self.update_keys(Some((result.number, now)), true);
5498 self.set_key_discard_timer(now, packet.header.space());
5499 }
5500
5501 Ok(Some(result.number))
5502 }
5503
5504 fn update_keys(&mut self, end_packet: Option<(u64, Instant)>, remote: bool) {
5505 trace!("executing key update");
5506 let new = self
5510 .crypto
5511 .next_1rtt_keys()
5512 .expect("only called for `Data` packets");
5513 self.key_phase_size = new
5514 .local
5515 .confidentiality_limit()
5516 .saturating_sub(KEY_UPDATE_MARGIN);
5517 let old = mem::replace(
5518 &mut self.spaces[SpaceId::Data]
5519 .crypto
5520 .as_mut()
5521 .unwrap() .packet,
5523 mem::replace(self.next_crypto.as_mut().unwrap(), new),
5524 );
5525 self.spaces[SpaceId::Data].sent_with_keys = 0;
5526 self.prev_crypto = Some(PrevCrypto {
5527 crypto: old,
5528 end_packet,
5529 update_unacked: remote,
5530 });
5531 self.key_phase = !self.key_phase;
5532 }
5533
/// Returns `true` when the peer's transport parameters carry a
/// `min_ack_delay` value.
fn peer_supports_ack_frequency(&self) -> bool {
    self.peer_params.min_ack_delay.is_some()
}
5537
5538 pub(crate) fn immediate_ack(&mut self) {
5543 self.spaces[self.highest_space].immediate_ack_pending = true;
5544 }
5545
/// Test helper: removes header protection from and decrypts the single
/// packet carried by a datagram event, returning its payload bytes.
///
/// Returns `None` for non-datagram events and whenever header unprotection
/// or body decryption fails.
///
/// # Panics
///
/// Panics if the datagram contains coalesced packets.
#[cfg(test)]
#[allow(dead_code)]
pub(crate) fn decode_packet(&self, event: &ConnectionEvent) -> Option<Vec<u8>> {
    let ConnectionEventInner::Datagram(DatagramConnectionEvent {
        first_decode,
        remaining,
        ..
    }) = &event.0
    else {
        return None;
    };

    assert!(
        remaining.is_none(),
        "Packets should never be coalesced in tests"
    );

    let unprotected = packet_crypto::unprotect_header(
        first_decode.clone(),
        &self.spaces,
        self.zero_rtt_crypto.as_ref(),
        self.peer_params.stateless_reset_token,
    )?;
    let mut packet = unprotected.packet?;

    let decrypted = packet_crypto::decrypt_packet_body(
        &mut packet,
        &self.spaces,
        self.zero_rtt_crypto.as_ref(),
        self.key_phase,
        self.prev_crypto.as_ref(),
        self.next_crypto.as_ref(),
    );
    decrypted.ok()?;

    Some(packet.payload.to_vec())
}
5583
/// Test helper: returns the current path's `in_flight.bytes` counter.
#[cfg(test)]
#[allow(dead_code)]
pub(crate) fn bytes_in_flight(&self) -> u64 {
    self.path.in_flight.bytes
}
5591
/// Test helper: returns the congestion window minus the bytes currently in
/// flight on the active path, saturating at zero.
#[cfg(test)]
#[allow(dead_code)]
pub(crate) fn congestion_window(&self) -> u64 {
    let window = self.path.congestion.window();
    let in_flight = self.path.in_flight.bytes;
    window.saturating_sub(in_flight)
}
5601
/// Test helper: reports whether the next timer due to fire is the idle
/// timer, ignoring the KeepAlive, PushNewCid and KeyDiscard timers.
///
/// Also returns `true` when none of the remaining timers is armed.
#[cfg(test)]
#[allow(dead_code)]
pub(crate) fn is_idle(&self) -> bool {
    let earliest = Timer::VALUES
        .iter()
        .copied()
        .filter_map(|timer| {
            if matches!(
                timer,
                Timer::KeepAlive | Timer::PushNewCid | Timer::KeyDiscard
            ) {
                return None;
            }
            let deadline = self.timers.get(timer)?;
            Some((timer, deadline))
        })
        .min_by_key(|&(_, deadline)| deadline);

    match earliest {
        None => true,
        Some((timer, _)) => timer == Timer::Idle,
    }
}
5613
/// Test helper: returns the connection's `lost_packets` counter.
#[cfg(test)]
#[allow(dead_code)]
pub(crate) fn lost_packets(&self) -> u64 {
    self.lost_packets
}
5620
/// Test helper: returns the current path's `sending_ecn` flag.
#[cfg(test)]
#[allow(dead_code)]
pub(crate) fn using_ecn(&self) -> bool {
    self.path.sending_ecn
}
5627
/// Test helper: returns the current path's `total_recvd` counter.
#[cfg(test)]
#[allow(dead_code)]
pub(crate) fn total_recvd(&self) -> u64 {
    self.path.total_recvd
}
5634
/// Test helper: returns the pair reported by the local CID state's
/// `active_seq()`.
#[cfg(test)]
#[allow(dead_code)]
pub(crate) fn active_local_cid_seq(&self) -> (u64, u64) {
    self.local_cid_state.active_seq()
}
5640
/// Test helper: assigns retire sequence `v` on the local CID state and asks
/// the endpoint for the number of identifiers that call reports.
#[cfg(test)]
#[allow(dead_code)]
pub(crate) fn rotate_local_cid(&mut self, v: u64, now: Instant) {
    let needed = self.local_cid_state.assign_retire_seq(v);
    let request = EndpointEventInner::NeedIdentifiers(now, needed);
    self.endpoint_events.push_back(request);
}
5650
/// Test helper: returns the value reported by the remote CID queue's
/// `active_seq()`.
#[cfg(test)]
#[allow(dead_code)]
pub(crate) fn active_rem_cid_seq(&self) -> u64 {
    self.rem_cids.active_seq()
}
5657
/// Test helper: returns the current path's MTU as reported by
/// `current_mtu()`.
#[cfg(test)]
#[allow(dead_code)]
pub(crate) fn path_mtu(&self) -> u16 {
    self.path.current_mtu()
}
5665
5666 fn can_send_1rtt(&self, max_size: usize) -> bool {
5670 self.streams.can_send_stream_data()
5671 || self.path.challenge_pending
5672 || self
5673 .prev_path
5674 .as_ref()
5675 .is_some_and(|(_, x)| x.challenge_pending)
5676 || !self.path_responses.is_empty()
5677 || !self.nat_traversal_challenges.is_empty()
5678 || self
5679 .datagrams
5680 .outgoing
5681 .front()
5682 .is_some_and(|x| x.size(true) <= max_size)
5683 }
5684
5685 fn remove_in_flight(&mut self, pn: u64, packet: &SentPacket) {
5687 for path in [&mut self.path]
5689 .into_iter()
5690 .chain(self.prev_path.as_mut().map(|(_, data)| data))
5691 {
5692 if path.remove_in_flight(pn, packet) {
5693 return;
5694 }
5695 }
5696 }
5697
/// Terminates the connection immediately.
///
/// Runs the shared close logic, records `reason` as the connection error,
/// moves straight to `State::Drained` and notifies the endpoint with a
/// `Drained` event.
fn kill(&mut self, reason: ConnectionError) {
    self.close_common();
    self.error = Some(reason);
    self.state = State::Drained;
    self.endpoint_events.push_back(EndpointEventInner::Drained);
}
5705
5706 fn generate_nat_traversal_challenges(&mut self, now: Instant) {
5708 let candidates: Vec<(VarInt, SocketAddr)> = if let Some(nat_state) = &self.nat_traversal {
5710 nat_state
5711 .get_validation_candidates()
5712 .into_iter()
5713 .take(3) .map(|(seq, candidate)| (seq, candidate.address))
5715 .collect()
5716 } else {
5717 return;
5718 };
5719
5720 if candidates.is_empty() {
5721 return;
5722 }
5723
5724 if let Some(nat_state) = &mut self.nat_traversal {
5726 for (seq, address) in candidates {
5727 let challenge: u64 = self.rng.r#gen();
5729
5730 if let Err(e) = nat_state.start_validation(seq, challenge, now) {
5732 debug!("Failed to start validation for candidate {}: {}", seq, e);
5733 continue;
5734 }
5735
5736 self.nat_traversal_challenges.push(address, challenge);
5738 trace!(
5739 "Queuing NAT validation PATH_CHALLENGE for {} with token {:08x}",
5740 address, challenge
5741 );
5742 }
5743 }
5744 }
5745
/// Returns the MTU currently reported by the active path.
pub fn current_mtu(&self) -> u16 {
    self.path.current_mtu()
}
5752
5753 fn predict_1rtt_overhead(&self, pn: Option<u64>) -> usize {
5760 let pn_len = match pn {
5761 Some(pn) => PacketNumber::new(
5762 pn,
5763 self.spaces[SpaceId::Data].largest_acked_packet.unwrap_or(0),
5764 )
5765 .len(),
5766 None => 4,
5768 };
5769
5770 1 + self.rem_cids.active().len() + pn_len + self.tag_len_1rtt()
5772 }
5773
5774 fn tag_len_1rtt(&self) -> usize {
5775 let key = match self.spaces[SpaceId::Data].crypto.as_ref() {
5776 Some(crypto) => Some(&*crypto.packet.local),
5777 None => self.zero_rtt_crypto.as_ref().map(|x| &*x.packet),
5778 };
5779 key.map_or(16, |x| x.tag_len())
5783 }
5784
5785 fn on_path_validated(&mut self) {
5787 self.path.validated = true;
5788 let ConnectionSide::Server { server_config } = &self.side else {
5789 return;
5790 };
5791 let new_tokens = &mut self.spaces[SpaceId::Data as usize].pending.new_tokens;
5792 new_tokens.clear();
5793 for _ in 0..server_config.validation_token.sent {
5794 new_tokens.push(self.path.remote);
5795 }
5796 }
5797}
5798
5799impl fmt::Debug for Connection {
5800 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
5801 f.debug_struct("Connection")
5802 .field("handshake_cid", &self.handshake_cid)
5803 .finish()
5804 }
5805}
5806
/// Side-specific state held by a connection.
enum ConnectionSide {
    /// State held when acting as the client.
    Client {
        /// Token taken from `token_store` for `server_name` at construction
        /// (empty when the store had none).
        token: Bytes,
        /// Store the initial token was taken from.
        token_store: Arc<dyn TokenStore>,
        /// Name used as the lookup key into `token_store`.
        server_name: String,
    },
    /// State held when acting as the server.
    Server {
        /// Shared server configuration.
        server_config: Arc<ServerConfig>,
    },
}
5819
5820impl ConnectionSide {
5821 fn remote_may_migrate(&self) -> bool {
5822 match self {
5823 Self::Server { server_config } => server_config.migration,
5824 Self::Client { .. } => false,
5825 }
5826 }
5827
5828 fn is_client(&self) -> bool {
5829 self.side().is_client()
5830 }
5831
5832 fn is_server(&self) -> bool {
5833 self.side().is_server()
5834 }
5835
5836 fn side(&self) -> Side {
5837 match *self {
5838 Self::Client { .. } => Side::Client,
5839 Self::Server { .. } => Side::Server,
5840 }
5841 }
5842}
5843
5844impl From<SideArgs> for ConnectionSide {
5845 fn from(side: SideArgs) -> Self {
5846 match side {
5847 SideArgs::Client {
5848 token_store,
5849 server_name,
5850 } => Self::Client {
5851 token: token_store.take(&server_name).unwrap_or_default(),
5852 token_store,
5853 server_name,
5854 },
5855 SideArgs::Server {
5856 server_config,
5857 pref_addr_cid: _,
5858 path_validated: _,
5859 } => Self::Server { server_config },
5860 }
5861 }
5862}
5863
/// Side-specific arguments supplied when a connection is constructed.
pub(crate) enum SideArgs {
    /// Arguments for a client connection.
    Client {
        /// Store from which the initial token is taken.
        token_store: Arc<dyn TokenStore>,
        /// Name used as the lookup key into `token_store`.
        server_name: String,
    },
    /// Arguments for a server connection.
    Server {
        /// Shared server configuration.
        server_config: Arc<ServerConfig>,
        /// Optional connection ID, exposed through `pref_addr_cid()`.
        pref_addr_cid: Option<ConnectionId>,
        /// Whether the path is already validated, exposed through
        /// `path_validated()`.
        path_validated: bool,
    },
}
5876
5877impl SideArgs {
5878 pub(crate) fn pref_addr_cid(&self) -> Option<ConnectionId> {
5879 match *self {
5880 Self::Client { .. } => None,
5881 Self::Server { pref_addr_cid, .. } => pref_addr_cid,
5882 }
5883 }
5884
5885 pub(crate) fn path_validated(&self) -> bool {
5886 match *self {
5887 Self::Client { .. } => true,
5888 Self::Server { path_validated, .. } => path_validated,
5889 }
5890 }
5891
5892 pub(crate) fn side(&self) -> Side {
5893 match *self {
5894 Self::Client { .. } => Side::Client,
5895 Self::Server { .. } => Side::Server,
5896 }
5897 }
5898}
5899
/// Reasons why a connection might be lost.
#[derive(Debug, Error, Clone, PartialEq, Eq)]
pub enum ConnectionError {
    /// The peer doesn't implement any supported version.
    #[error("peer doesn't implement any supported version")]
    VersionMismatch,
    /// A transport-level error; displayed transparently as the inner error.
    #[error(transparent)]
    TransportError(#[from] TransportError),
    /// The connection was aborted by the peer (connection-level close).
    #[error("aborted by peer: {0}")]
    ConnectionClosed(frame::ConnectionClose),
    /// The connection was closed by the peer (application-level close).
    #[error("closed by peer: {0}")]
    ApplicationClosed(frame::ApplicationClose),
    /// The connection was reset by the peer.
    #[error("reset by peer")]
    Reset,
    /// The connection timed out.
    #[error("timed out")]
    TimedOut,
    /// The connection was closed locally.
    #[error("closed")]
    LocallyClosed,
    /// Connection IDs were exhausted.
    #[error("CIDs exhausted")]
    CidsExhausted,
}
5934
5935impl From<Close> for ConnectionError {
5936 fn from(x: Close) -> Self {
5937 match x {
5938 Close::Connection(reason) => Self::ConnectionClosed(reason),
5939 Close::Application(reason) => Self::ApplicationClosed(reason),
5940 }
5941 }
5942}
5943
5944impl From<ConnectionError> for io::Error {
5946 fn from(x: ConnectionError) -> Self {
5947 use ConnectionError::*;
5948 let kind = match x {
5949 TimedOut => io::ErrorKind::TimedOut,
5950 Reset => io::ErrorKind::ConnectionReset,
5951 ApplicationClosed(_) | ConnectionClosed(_) => io::ErrorKind::ConnectionAborted,
5952 TransportError(_) | VersionMismatch | LocallyClosed | CidsExhausted => {
5953 io::ErrorKind::Other
5954 }
5955 };
5956 Self::new(kind, x)
5957 }
5958}
5959
/// Connection lifecycle state.
#[derive(Clone, Debug)]
pub enum State {
    /// Handshake in progress, with handshake-specific data.
    Handshake(state::Handshake),
    /// Handshake complete.
    Established,
    /// Closed, carrying the close reason.
    Closed(state::Closed),
    /// Draining; counted as closed by `is_closed`.
    Draining,
    /// Drained; counted as closed by `is_closed` and reported by
    /// `is_drained`.
    Drained,
}
5974
5975impl State {
5976 fn closed<R: Into<Close>>(reason: R) -> Self {
5977 Self::Closed(state::Closed {
5978 reason: reason.into(),
5979 })
5980 }
5981
5982 fn is_handshake(&self) -> bool {
5983 matches!(*self, Self::Handshake(_))
5984 }
5985
5986 fn is_established(&self) -> bool {
5987 matches!(*self, Self::Established)
5988 }
5989
5990 fn is_closed(&self) -> bool {
5991 matches!(*self, Self::Closed(_) | Self::Draining | Self::Drained)
5992 }
5993
5994 fn is_drained(&self) -> bool {
5995 matches!(*self, Self::Drained)
5996 }
5997}
5998
/// Payload types carried by the `State` variants.
mod state {
    use super::*;

    /// Data attached to `State::Handshake`.
    #[derive(Clone, Debug)]
    pub struct Handshake {
        /// NOTE(review): presumably records whether the remote CID has been
        /// set during the handshake — confirm against the handshake code.
        pub(super) rem_cid_set: bool,
        /// NOTE(review): presumably the token expected to be echoed by the
        /// peer — confirm against the handshake code.
        pub(super) expected_token: Bytes,
        /// NOTE(review): presumably a retained copy of the client hello, if
        /// any — confirm against the handshake code.
        pub(super) client_hello: Option<Bytes>,
    }

    /// Data attached to `State::Closed`.
    #[derive(Clone, Debug)]
    pub struct Closed {
        /// The close reason supplied to `State::closed`.
        pub(super) reason: Close,
    }
}
6023
/// Events of interest to the application.
///
/// NOTE(review): the code that emits these events is outside this chunk;
/// the per-variant descriptions below follow the variant names — confirm.
#[derive(Debug)]
pub enum Event {
    /// Handshake data has become available.
    HandshakeDataReady,
    /// The connection has been established.
    Connected,
    /// The connection was lost.
    ConnectionLost {
        /// Why the connection was lost.
        reason: ConnectionError,
    },
    /// A stream-level event.
    Stream(StreamEvent),
    /// A datagram has been received.
    DatagramReceived,
    /// Datagram sending is no longer blocked.
    DatagramsUnblocked,
}
6045
/// Returns `x - y`, or `Duration::ZERO` when `x` is not later than `y`.
fn instant_saturating_sub(x: Instant, y: Instant) -> Duration {
    x.saturating_duration_since(y)
}
6049
6050fn get_max_ack_delay(params: &TransportParameters) -> Duration {
6051 Duration::from_micros(params.max_ack_delay.0 * 1000)
6052}
6053
/// Upper bound for a backoff exponent.
///
/// NOTE(review): the code applying this bound is outside this chunk —
/// confirm which timer it caps.
const MAX_BACKOFF_EXPONENT: u32 = 16;

/// Minimum packet space: the worst-case Handshake/0-RTT header size plus
/// 32 additional bytes.
const MIN_PACKET_SPACE: usize = MAX_HANDSHAKE_OR_0RTT_HEADER_SIZE + 32;

/// Worst-case size of a Handshake or 0-RTT packet header.
///
/// NOTE(review): the terms presumably correspond to flags (1), version (4),
/// DCID length (1), DCID, SCID length (1), SCID, the length field (a varint
/// sized for `u16::MAX`) and the packet number (4) — confirm against the
/// header encoder.
const MAX_HANDSHAKE_OR_0RTT_HEADER_SIZE: usize =
    1 + 4 + 1 + MAX_CID_SIZE + 1 + MAX_CID_SIZE + VarInt::from_u32(u16::MAX as u32).size() + 4;

/// Margin subtracted from a key's confidentiality limit when computing
/// `key_phase_size` during a key update.
const KEY_UPDATE_MARGIN: u64 = 10_000;
6078
/// Summary of the frames written into a packet.
#[derive(Default)]
struct SentFrames {
    /// Retransmittable data included in the packet.
    retransmits: ThinRetransmits,
    /// Largest acknowledged packet number, when set; `is_ack_only` requires
    /// this to be `Some`.
    largest_acked: Option<u64>,
    /// Metadata of the stream frames included in the packet.
    stream_frames: StreamMetaVec,
    /// Set when frames outside `retransmits` were included; `is_ack_only`
    /// requires this to be `false`.
    non_retransmits: bool,
    /// NOTE(review): presumably marks packets that must be padded — the
    /// consumer is outside this chunk, confirm.
    requires_padding: bool,
}
6088
6089impl SentFrames {
6090 fn is_ack_only(&self, streams: &StreamsState) -> bool {
6092 self.largest_acked.is_some()
6093 && !self.non_retransmits
6094 && self.stream_frames.is_empty()
6095 && self.retransmits.is_empty(streams)
6096 }
6097}
6098
6099fn negotiate_max_idle_timeout(x: Option<VarInt>, y: Option<VarInt>) -> Option<Duration> {
6107 match (x, y) {
6108 (Some(VarInt(0)) | None, Some(VarInt(0)) | None) => None,
6109 (Some(VarInt(0)) | None, Some(y)) => Some(Duration::from_millis(y.0)),
6110 (Some(x), Some(VarInt(0)) | None) => Some(Duration::from_millis(x.0)),
6111 (Some(x), Some(y)) => Some(Duration::from_millis(cmp::min(x, y).0)),
6112 }
6113}
6114
/// Per-connection post-quantum cryptography (PQC) state.
#[derive(Debug, Clone)]
pub(crate) struct PqcState {
    /// Set once the peer's transport parameters carry PQC algorithms.
    enabled: bool,
    /// The PQC algorithms advertised by the peer, if any.
    #[allow(dead_code)]
    algorithms: Option<crate::transport_parameters::PqcAlgorithms>,
    /// MTU used for the handshake; starts at `MIN_INITIAL_SIZE`.
    handshake_mtu: u16,
    /// Set when a PQC algorithm is in use or a PQC handshake was detected.
    using_pqc: bool,
    /// Helper that the packet-level PQC decisions are delegated to.
    packet_handler: crate::crypto::pqc::packet_handler::PqcPacketHandler,
}
6130
6131#[allow(dead_code)]
6132impl PqcState {
6133 fn new() -> Self {
6134 Self {
6135 enabled: false,
6136 algorithms: None,
6137 handshake_mtu: MIN_INITIAL_SIZE,
6138 using_pqc: false,
6139 packet_handler: crate::crypto::pqc::packet_handler::PqcPacketHandler::new(),
6140 }
6141 }
6142
6143 fn min_initial_size(&self) -> u16 {
6145 if self.enabled && self.using_pqc {
6146 std::cmp::max(self.handshake_mtu, 4096)
6148 } else {
6149 MIN_INITIAL_SIZE
6150 }
6151 }
6152
6153 fn update_from_peer_params(&mut self, params: &TransportParameters) {
6155 if let Some(ref algorithms) = params.pqc_algorithms {
6156 self.enabled = true;
6157 self.algorithms = Some(algorithms.clone());
6158 if algorithms.ml_kem_768
6160 || algorithms.ml_dsa_65
6161 || algorithms.hybrid_x25519_ml_kem
6162 || algorithms.hybrid_ed25519_ml_dsa
6163 {
6164 self.using_pqc = true;
6165 self.handshake_mtu = 4096; }
6167 }
6168 }
6169
6170 fn detect_pqc_from_crypto(&mut self, crypto_data: &[u8], space: SpaceId) {
6172 if self.packet_handler.detect_pqc_handshake(crypto_data, space) {
6173 self.using_pqc = true;
6174 self.handshake_mtu = self.packet_handler.get_min_packet_size(space);
6176 }
6177 }
6178
6179 fn should_trigger_mtu_discovery(&mut self) -> bool {
6181 self.packet_handler.should_trigger_mtu_discovery()
6182 }
6183
6184 fn get_mtu_config(&self) -> MtuDiscoveryConfig {
6186 self.packet_handler.get_pqc_mtu_config()
6187 }
6188
6189 fn calculate_crypto_frame_size(&self, available_space: usize, remaining_data: usize) -> usize {
6191 self.packet_handler
6192 .calculate_crypto_frame_size(available_space, remaining_data)
6193 }
6194
6195 fn should_adjust_coalescing(&self, current_size: usize, space: SpaceId) -> bool {
6197 self.packet_handler
6198 .adjust_coalescing_for_pqc(current_size, space)
6199 }
6200
6201 fn on_packet_sent(&mut self, space: SpaceId, size: u16) {
6203 self.packet_handler.on_packet_sent(space, size);
6204 }
6205
6206 fn reset(&mut self) {
6208 self.enabled = false;
6209 self.algorithms = None;
6210 self.handshake_mtu = MIN_INITIAL_SIZE;
6211 self.using_pqc = false;
6212 self.packet_handler.reset();
6213 }
6214}
6215
/// The default PQC state is the one produced by `PqcState::new`.
#[cfg(feature = "pqc")]
impl Default for PqcState {
    fn default() -> Self {
        PqcState::new()
    }
}
6222
/// Per-connection state for the address discovery extension.
#[derive(Debug, Clone)]
pub(crate) struct AddressDiscoveryState {
    /// Master switch; when `false` no observations are sent or recorded.
    enabled: bool,
    /// Observation rate used to configure the rate limiter (multiplied by 5
    /// in bootstrap mode).
    max_observation_rate: u8,
    /// When `false`, only path 0 is observed (unless in bootstrap mode).
    observe_all_paths: bool,
    /// Address information tracked per path ID.
    path_addresses: std::collections::HashMap<u64, paths::PathAddressInfo>,
    /// Token bucket gating how often observations are sent.
    rate_limiter: AddressObservationRateLimiter,
    /// Log of observed-address events handled so far.
    observed_addresses: Vec<ObservedAddressEvent>,
    /// Bootstrap mode: observes every path and uses a higher rate limit.
    bootstrap_mode: bool,
    /// Sequence number assigned to the next observed-address frame.
    next_sequence_number: VarInt,
    /// NOTE(review): presumably the last sequence number received per path —
    /// not used within this chunk, confirm.
    last_received_sequence: std::collections::HashMap<u64, VarInt>,
}
6245
/// A single observed-address record, as stored by
/// `AddressDiscoveryState::handle_observed_address`.
#[derive(Debug, Clone, PartialEq, Eq)]
struct ObservedAddressEvent {
    /// The address that was observed.
    address: SocketAddr,
    /// Timestamp passed in when the observation was handled.
    received_at: Instant,
    /// Path the observation belongs to.
    path_id: u64,
}
6256
/// Token-bucket rate limiter for address observations.
#[derive(Debug, Clone)]
struct AddressObservationRateLimiter {
    /// Tokens currently available.
    tokens: f64,
    /// Bucket capacity; refills never exceed this.
    max_tokens: f64,
    /// Tokens added per second of elapsed time.
    rate: f64,
    /// Time of the most recent refill.
    last_update: Instant,
}
6269
6270#[allow(dead_code)]
6271impl AddressDiscoveryState {
6272 fn new(config: &crate::transport_parameters::AddressDiscoveryConfig, now: Instant) -> Self {
6274 use crate::transport_parameters::AddressDiscoveryConfig::*;
6275
6276 let (enabled, _can_send, _can_receive) = match config {
6278 SendOnly => (true, true, false),
6279 ReceiveOnly => (true, false, true),
6280 SendAndReceive => (true, true, true),
6281 };
6282
6283 let max_observation_rate = 10u8; let observe_all_paths = false; Self {
6289 enabled,
6290 max_observation_rate,
6291 observe_all_paths,
6292 path_addresses: std::collections::HashMap::new(),
6293 rate_limiter: AddressObservationRateLimiter::new(max_observation_rate, now),
6294 observed_addresses: Vec::new(),
6295 bootstrap_mode: false,
6296 next_sequence_number: VarInt::from_u32(0),
6297 last_received_sequence: std::collections::HashMap::new(),
6298 }
6299 }
6300
6301 fn should_send_observation(&mut self, path_id: u64, now: Instant) -> bool {
6303 if !self.should_observe_path(path_id) {
6305 return false;
6306 }
6307
6308 let needs_observation = match self.path_addresses.get(&path_id) {
6310 Some(info) => info.observed_address.is_none() || !info.notified,
6311 None => true,
6312 };
6313
6314 if !needs_observation {
6315 return false;
6316 }
6317
6318 self.rate_limiter.try_consume(1.0, now)
6320 }
6321
6322 fn record_observation_sent(&mut self, path_id: u64) {
6324 if let Some(info) = self.path_addresses.get_mut(&path_id) {
6325 info.mark_notified();
6326 }
6327 }
6328
6329 fn handle_observed_address(&mut self, address: SocketAddr, path_id: u64, now: Instant) {
6331 if !self.enabled {
6332 return;
6333 }
6334
6335 self.observed_addresses.push(ObservedAddressEvent {
6336 address,
6337 received_at: now,
6338 path_id,
6339 });
6340
6341 let info = self
6343 .path_addresses
6344 .entry(path_id)
6345 .or_insert_with(paths::PathAddressInfo::new);
6346 info.update_observed_address(address, now);
6347 }
6348
6349 pub(crate) fn get_observed_address(&self, path_id: u64) -> Option<SocketAddr> {
6351 self.path_addresses
6352 .get(&path_id)
6353 .and_then(|info| info.observed_address)
6354 }
6355
6356 pub(crate) fn get_all_observed_addresses(&self) -> Vec<SocketAddr> {
6358 self.path_addresses
6359 .values()
6360 .filter_map(|info| info.observed_address)
6361 .collect()
6362 }
6363
6364 pub(crate) fn stats(&self) -> AddressDiscoveryStats {
6366 AddressDiscoveryStats {
6367 frames_sent: self.observed_addresses.len() as u64, frames_received: self.observed_addresses.len() as u64,
6369 addresses_discovered: self
6370 .path_addresses
6371 .values()
6372 .filter(|info| info.observed_address.is_some())
6373 .count() as u64,
6374 address_changes_detected: 0, }
6376 }
6377
6378 fn has_unnotified_changes(&self) -> bool {
6380 self.path_addresses
6381 .values()
6382 .any(|info| info.observed_address.is_some() && !info.notified)
6383 }
6384
6385 fn queue_observed_address_frame(
6387 &mut self,
6388 path_id: u64,
6389 address: SocketAddr,
6390 ) -> Option<frame::ObservedAddress> {
6391 if !self.enabled {
6393 return None;
6394 }
6395
6396 if !self.observe_all_paths && path_id != 0 {
6398 return None;
6399 }
6400
6401 if let Some(info) = self.path_addresses.get(&path_id) {
6403 if info.notified {
6404 return None;
6405 }
6406 }
6407
6408 if self.rate_limiter.tokens < 1.0 {
6410 return None;
6411 }
6412
6413 self.rate_limiter.tokens -= 1.0;
6415
6416 let info = self
6418 .path_addresses
6419 .entry(path_id)
6420 .or_insert_with(paths::PathAddressInfo::new);
6421 info.observed_address = Some(address);
6422 info.notified = true;
6423
6424 let sequence_number = self.next_sequence_number;
6426 self.next_sequence_number = VarInt::from_u64(self.next_sequence_number.into_inner() + 1)
6427 .expect("sequence number overflow");
6428
6429 Some(frame::ObservedAddress {
6430 sequence_number,
6431 address,
6432 })
6433 }
6434
6435 fn check_for_address_observations(
6437 &mut self,
6438 _current_path: u64,
6439 peer_supports_address_discovery: bool,
6440 now: Instant,
6441 ) -> Vec<frame::ObservedAddress> {
6442 let mut frames = Vec::new();
6443
6444 if !self.enabled || !peer_supports_address_discovery {
6446 return frames;
6447 }
6448
6449 self.rate_limiter.update_tokens(now);
6451
6452 let paths_to_notify: Vec<u64> = self
6454 .path_addresses
6455 .iter()
6456 .filter_map(|(&path_id, info)| {
6457 if info.observed_address.is_some() && !info.notified {
6458 Some(path_id)
6459 } else {
6460 None
6461 }
6462 })
6463 .collect();
6464
6465 for path_id in paths_to_notify {
6467 if !self.should_observe_path(path_id) {
6469 continue;
6470 }
6471
6472 if !self.bootstrap_mode && self.rate_limiter.tokens < 1.0 {
6474 break; }
6476
6477 if let Some(info) = self.path_addresses.get_mut(&path_id) {
6479 if let Some(address) = info.observed_address {
6480 if self.bootstrap_mode {
6482 self.rate_limiter.tokens -= 0.2; } else {
6484 self.rate_limiter.tokens -= 1.0;
6485 }
6486
6487 info.notified = true;
6489
6490 let sequence_number = self.next_sequence_number;
6492 self.next_sequence_number =
6493 VarInt::from_u64(self.next_sequence_number.into_inner() + 1)
6494 .expect("sequence number overflow");
6495
6496 frames.push(frame::ObservedAddress {
6497 sequence_number,
6498 address,
6499 });
6500 }
6501 }
6502 }
6503
6504 frames
6505 }
6506
6507 fn update_rate_limit(&mut self, new_rate: f64) {
6509 self.max_observation_rate = new_rate as u8;
6510 self.rate_limiter.set_rate(new_rate as u8);
6511 }
6512
6513 fn from_transport_params(params: &TransportParameters) -> Option<Self> {
6515 params
6516 .address_discovery
6517 .as_ref()
6518 .map(|config| Self::new(config, Instant::now()))
6519 }
6520
/// Test helper: builds a state with explicit settings.
///
/// Starts from `Self::new` with a send-and-receive configuration and then
/// overrides the enabled flag, the rate (converted to `u8` with an `as`
/// cast), the path policy and the rate limiter.
#[cfg(test)]
fn new_with_params(enabled: bool, max_rate: f64, observe_all_paths: bool) -> Self {
    let rate = max_rate as u8;
    let config = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;

    let mut state = Self::new(&config, Instant::now());
    state.enabled = enabled;
    state.max_observation_rate = rate;
    state.observe_all_paths = observe_all_paths;
    state.rate_limiter = AddressObservationRateLimiter::new(rate, Instant::now());
    state
}
6548
6549 fn set_bootstrap_mode(&mut self, enabled: bool) {
6551 self.bootstrap_mode = enabled;
6552 if enabled {
6554 let bootstrap_rate = self.get_effective_rate_limit();
6555 self.rate_limiter.rate = bootstrap_rate;
6556 self.rate_limiter.max_tokens = bootstrap_rate * 2.0; self.rate_limiter.tokens = self.rate_limiter.max_tokens;
6559 }
6560 }
6561
/// Whether bootstrap mode is currently enabled.
fn is_bootstrap_mode(&self) -> bool {
    self.bootstrap_mode
}
6566
6567 fn get_effective_rate_limit(&self) -> f64 {
6569 if self.bootstrap_mode {
6570 (self.max_observation_rate as f64) * 5.0
6572 } else {
6573 self.max_observation_rate as f64
6574 }
6575 }
6576
6577 fn should_observe_path(&self, path_id: u64) -> bool {
6579 if !self.enabled {
6580 return false;
6581 }
6582
6583 if self.bootstrap_mode {
6585 return true;
6586 }
6587
6588 self.observe_all_paths || path_id == 0
6590 }
6591
/// Whether an observation should be sent right away: only in bootstrap mode
/// and only for a new connection.
fn should_send_observation_immediately(&self, is_new_connection: bool) -> bool {
    self.bootstrap_mode && is_new_connection
}
6596}
6597
6598#[allow(dead_code)]
6599impl AddressObservationRateLimiter {
6600 fn new(rate: u8, now: Instant) -> Self {
6602 let rate_f64 = rate as f64;
6603 Self {
6604 tokens: rate_f64,
6605 max_tokens: rate_f64,
6606 rate: rate_f64,
6607 last_update: now,
6608 }
6609 }
6610
6611 fn try_consume(&mut self, tokens: f64, now: Instant) -> bool {
6613 self.update_tokens(now);
6614
6615 if self.tokens >= tokens {
6616 self.tokens -= tokens;
6617 true
6618 } else {
6619 false
6620 }
6621 }
6622
6623 fn update_tokens(&mut self, now: Instant) {
6625 let elapsed = now.saturating_duration_since(self.last_update);
6626 let new_tokens = elapsed.as_secs_f64() * self.rate;
6627 self.tokens = (self.tokens + new_tokens).min(self.max_tokens);
6628 self.last_update = now;
6629 }
6630
6631 fn set_rate(&mut self, rate: u8) {
6633 let rate_f64 = rate as f64;
6634 self.rate = rate_f64;
6635 self.max_tokens = rate_f64;
6636 if self.tokens > self.max_tokens {
6638 self.tokens = self.max_tokens;
6639 }
6640 }
6641}
6642
6643#[cfg(test)]
6644mod tests {
6645 use super::*;
6646 use crate::transport_parameters::AddressDiscoveryConfig;
6647 use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr};
6648
6649 #[test]
6650 fn address_discovery_state_new() {
6651 let config = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
6652 let now = Instant::now();
6653 let state = AddressDiscoveryState::new(&config, now);
6654
6655 assert!(state.enabled);
6656 assert_eq!(state.max_observation_rate, 10);
6657 assert!(!state.observe_all_paths);
6658 assert!(state.path_addresses.is_empty());
6659 assert!(state.observed_addresses.is_empty());
6660 assert_eq!(state.rate_limiter.tokens, 10.0);
6661 }
6662
6663 #[test]
6664 fn address_discovery_state_disabled() {
6665 let config = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
6666 let now = Instant::now();
6667 let mut state = AddressDiscoveryState::new(&config, now);
6668
6669 state.enabled = false;
6671
6672 assert!(!state.should_send_observation(0, now));
6674 }
6675
6676 #[test]
6677 fn address_discovery_state_should_send_observation() {
6678 let config = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
6679 let now = Instant::now();
6680 let mut state = AddressDiscoveryState::new(&config, now);
6681
6682 assert!(state.should_send_observation(0, now));
6684
6685 let mut path_info = paths::PathAddressInfo::new();
6687 path_info.update_observed_address(
6688 SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)), 8080),
6689 now,
6690 );
6691 path_info.mark_notified();
6692 state.path_addresses.insert(0, path_info);
6693
6694 assert!(!state.should_send_observation(0, now));
6696
6697 assert!(!state.should_send_observation(1, now));
6699 }
6700
6701 #[test]
6702 fn address_discovery_state_rate_limiting() {
6703 let config = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
6704 let now = Instant::now();
6705 let mut state = AddressDiscoveryState::new(&config, now);
6706
6707 state.observe_all_paths = true;
6709
6710 assert!(state.should_send_observation(0, now));
6712
6713 state.rate_limiter.try_consume(9.0, now); assert!(!state.should_send_observation(0, now));
6718
6719 let later = now + Duration::from_secs(1);
6721 state.rate_limiter.update_tokens(later);
6722 assert!(state.should_send_observation(0, later));
6723 }
6724
6725 #[test]
6726 fn address_discovery_state_handle_observed_address() {
6727 let config = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
6728 let now = Instant::now();
6729 let mut state = AddressDiscoveryState::new(&config, now);
6730
6731 let addr1 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(10, 0, 0, 1)), 443);
6732 let addr2 = SocketAddr::new(
6733 IpAddr::V6(Ipv6Addr::new(0x2001, 0xdb8, 0, 0, 0, 0, 0, 1)),
6734 8080,
6735 );
6736
6737 state.handle_observed_address(addr1, 0, now);
6739 assert_eq!(state.observed_addresses.len(), 1);
6740 assert_eq!(state.observed_addresses[0].address, addr1);
6741 assert_eq!(state.observed_addresses[0].path_id, 0);
6742
6743 let later = now + Duration::from_millis(100);
6745 state.handle_observed_address(addr2, 1, later);
6746 assert_eq!(state.observed_addresses.len(), 2);
6747 assert_eq!(state.observed_addresses[1].address, addr2);
6748 assert_eq!(state.observed_addresses[1].path_id, 1);
6749 }
6750
/// `get_observed_address` is expected to yield `None` for a path without an
/// entry in `path_addresses`, and the stored address once one is inserted.
#[test]
fn address_discovery_state_get_observed_address() {
    let cfg = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
    let created_at = Instant::now();
    let mut discovery = AddressDiscoveryState::new(&cfg, created_at);

    // Nothing has been recorded for path 0 yet.
    assert_eq!(discovery.get_observed_address(0), None);

    // Seed path 0 directly with an observed loopback address.
    let loopback = SocketAddr::from(([127, 0, 0, 1], 80));
    let mut seeded = paths::PathAddressInfo::new();
    seeded.update_observed_address(loopback, created_at);
    discovery.path_addresses.insert(0, seeded);

    assert_eq!(discovery.get_observed_address(0), Some(loopback));
    // Path 1 was never populated.
    assert_eq!(discovery.get_observed_address(1), None);
}
6770
/// `has_unnotified_changes` is expected to be false on a fresh state, true once
/// a path carries an observed address, and false again after
/// `record_observation_sent` is called for that path.
#[test]
fn address_discovery_state_unnotified_changes() {
    let cfg = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
    let created_at = Instant::now();
    let mut discovery = AddressDiscoveryState::new(&cfg, created_at);

    assert!(!discovery.has_unnotified_changes());

    // Insert a path entry that already holds an observed address.
    let observed = SocketAddr::from(([192, 168, 1, 1], 8080));
    let mut pending = paths::PathAddressInfo::new();
    pending.update_observed_address(observed, created_at);
    discovery.path_addresses.insert(0, pending);

    assert!(discovery.has_unnotified_changes());

    discovery.record_observation_sent(0);
    assert!(!discovery.has_unnotified_changes());
}
6795
/// Exercises the limiter's public fields directly: initial fill, a successful
/// and a rejected consumption, a full refill after one second, and a partial
/// refill after half a second.
#[test]
fn address_observation_rate_limiter_token_bucket() {
    let start = Instant::now();

    let mut bucket = AddressObservationRateLimiter::new(5, start);
    assert_eq!(bucket.tokens, 5.0);
    assert_eq!(bucket.max_tokens, 5.0);
    assert_eq!(bucket.rate, 5.0);

    // 5 - 3 = 2 tokens remain.
    assert!(bucket.try_consume(3.0, start));
    assert_eq!(bucket.tokens, 2.0);

    // A rejected request must leave the balance untouched.
    assert!(!bucket.try_consume(3.0, start));
    assert_eq!(bucket.tokens, 2.0);

    // After one second the test expects the bucket to be back at 5.
    bucket.update_tokens(start + Duration::from_secs(1));
    assert_eq!(bucket.tokens, 5.0);

    // Second limiter: 2 tokens left, then half a second of refill => 4.5.
    let mut partial = AddressObservationRateLimiter::new(5, start);
    partial.try_consume(3.0, start);
    partial.update_tokens(start + Duration::from_millis(500));
    assert_eq!(partial.tokens, 4.5);
}
6826
/// A state built from the default `AddressDiscoveryConfig` is expected to be
/// enabled, with a max observation rate of 10 and primary-path-only observation.
#[test]
fn connection_initializes_address_discovery_state_default() {
    let default_config = crate::transport_parameters::AddressDiscoveryConfig::default();
    let discovery = AddressDiscoveryState::new(&default_config, Instant::now());

    assert!(discovery.enabled);
    assert_eq!(discovery.max_observation_rate, 10);
    assert!(!discovery.observe_all_paths);
}
6838
/// A state built from `SendAndReceive` is expected to be enabled, with a max
/// observation rate of 10 and `observe_all_paths` off.
#[test]
fn connection_initializes_with_address_discovery_enabled() {
    let send_and_receive = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
    let discovery = AddressDiscoveryState::new(&send_and_receive, Instant::now());

    assert!(discovery.enabled);
    assert_eq!(discovery.max_observation_rate, 10);
    assert!(!discovery.observe_all_paths);
}
6848
/// The default `AddressDiscoveryConfig` is expected to produce an enabled state.
#[test]
fn connection_address_discovery_enabled_by_default() {
    let default_config = crate::transport_parameters::AddressDiscoveryConfig::default();
    let discovery = AddressDiscoveryState::new(&default_config, Instant::now());

    assert!(discovery.enabled);
}
6856
/// Table-driven check that `negotiate_max_idle_timeout` gives the same result
/// regardless of argument order, for each `(left, right, expected)` row.
#[test]
fn negotiate_max_idle_timeout_commutative() {
    // Small constructors to keep the table readable.
    let vi = |raw| Some(VarInt(raw));
    let ms = |millis| Some(Duration::from_millis(millis));

    let cases = [
        (None, None, None),
        (None, vi(0), None),
        (None, vi(2), ms(2)),
        (vi(0), vi(0), None),
        (vi(2), vi(0), ms(2)),
        (vi(1), vi(4), ms(1)),
    ];

    for (lhs, rhs, expected) in cases {
        assert_eq!(negotiate_max_idle_timeout(lhs, rhs), expected);
        assert_eq!(negotiate_max_idle_timeout(rhs, lhs), expected);
    }
}
6881
/// A freshly created `PathData` is expected to start with empty address info
/// and an observation rate limiter whose rate, capacity and balance are all 10.
#[test]
fn path_creation_initializes_address_discovery() {
    let transport = TransportConfig::default();
    let peer = SocketAddr::from(([192, 168, 1, 1], 8080));
    let created_at = Instant::now();

    let fresh = paths::PathData::new(peer, false, None, created_at, &transport);

    // No observation has been recorded yet.
    let info = &fresh.address_info;
    assert!(info.observed_address.is_none());
    assert!(info.last_observed.is_none());
    assert_eq!(info.observation_count, 0);
    assert!(!info.notified);

    // Limiter values expected for a default `TransportConfig`.
    let limiter = &fresh.observation_rate_limiter;
    assert_eq!(limiter.rate, 10.0);
    assert_eq!(limiter.max_tokens, 10.0);
    assert_eq!(limiter.tokens, 10.0);
}
6902
/// A path built with `PathData::from_previous` is expected to start with empty
/// address info, while its limiter carries over the previous path's configured
/// rate (20) with a full token balance.
#[test]
fn path_migration_resets_address_discovery() {
    let transport = TransportConfig::default();
    let old_remote = SocketAddr::from(([192, 168, 1, 1], 8080));
    let new_remote = SocketAddr::from(([10, 0, 0, 1], 443));
    let now = Instant::now();

    // Give the original path some discovery state and a non-default rate.
    let mut original = paths::PathData::new(old_remote, false, None, now, &transport);
    original.update_observed_address(old_remote, now);
    original.mark_address_notified();
    original.consume_observation_token(now);
    original.set_observation_rate(20);

    let migrated = paths::PathData::from_previous(new_remote, &original, now);

    let info = &migrated.address_info;
    assert!(info.observed_address.is_none());
    assert!(info.last_observed.is_none());
    assert_eq!(info.observation_count, 0);
    assert!(!info.notified);

    let limiter = &migrated.observation_rate_limiter;
    assert_eq!(limiter.rate, 20.0);
    assert_eq!(limiter.tokens, 20.0);
}
6930
/// `set_observation_rate` is expected to update both the limiter's rate and
/// capacity, and to clamp an over-full token balance down to the new capacity.
#[test]
fn connection_path_updates_observation_rate() {
    let transport = TransportConfig::default();
    let peer = SocketAddr::from(([127, 0, 0, 1], 42));
    let created_at = Instant::now();

    let mut data = paths::PathData::new(peer, false, None, created_at, &transport);

    // Starting rate for a default `TransportConfig`.
    assert_eq!(data.observation_rate_limiter.rate, 10.0);

    data.set_observation_rate(25);
    assert_eq!(data.observation_rate_limiter.rate, 25.0);
    assert_eq!(data.observation_rate_limiter.max_tokens, 25.0);

    // Force a balance above the upcoming capacity, then lower the rate.
    data.observation_rate_limiter.tokens = 30.0;
    data.set_observation_rate(20);
    assert_eq!(data.observation_rate_limiter.tokens, 20.0);
}
6952
/// Setting `validated` on a path must leave its observed address and its
/// configured observation rate as they were.
#[test]
fn path_validation_preserves_discovery_state() {
    let transport = TransportConfig::default();
    let peer = SocketAddr::from(([192, 168, 1, 1], 8080));
    let created_at = Instant::now();

    let mut data = paths::PathData::new(peer, false, None, created_at, &transport);

    // Populate discovery state before validation.
    let reflexive = SocketAddr::from(([1, 2, 3, 4], 5678));
    data.update_observed_address(reflexive, created_at);
    data.set_observation_rate(15);

    data.validated = true;

    assert_eq!(data.address_info.observed_address, Some(reflexive));
    assert_eq!(data.observation_rate_limiter.rate, 15.0);
}
6973
/// `new_with_params(true, 30.0, true)` is expected to yield an enabled state
/// with rate 30, all-path observation on, and no recorded paths or addresses.
#[test]
fn address_discovery_state_initialization() {
    let discovery = AddressDiscoveryState::new_with_params(true, 30.0, true);

    // Flags and rate mirror the constructor arguments.
    assert!(discovery.enabled);
    assert_eq!(discovery.max_observation_rate, 30);
    assert!(discovery.observe_all_paths);

    // Both collections start out empty.
    assert!(discovery.path_addresses.is_empty());
    assert!(discovery.observed_addresses.is_empty());
}
6985
/// A single `handle_observed_address` call is expected to record one entry in
/// `observed_addresses` and to create per-path info holding the same address,
/// timestamp and an observation count of 1.
#[test]
fn handle_observed_address_frame_basic() {
    let cfg = AddressDiscoveryConfig::SendAndReceive;
    let mut discovery = AddressDiscoveryState::new(&cfg, Instant::now());
    let reported = SocketAddr::from(([192, 168, 1, 1], 8080));
    let received = Instant::now();
    let path = 0;

    discovery.handle_observed_address(reported, path, received);

    // Global log of observations.
    assert_eq!(discovery.observed_addresses.len(), 1);
    let entry = &discovery.observed_addresses[0];
    assert_eq!(entry.address, reported);
    assert_eq!(entry.path_id, path);
    assert_eq!(entry.received_at, received);

    // Per-path bookkeeping.
    assert!(discovery.path_addresses.contains_key(&path));
    let per_path = &discovery.path_addresses[&path];
    assert_eq!(per_path.observed_address, Some(reported));
    assert_eq!(per_path.last_observed, Some(received));
    assert_eq!(per_path.observation_count, 1);
}
7011
/// Three observations on the same path (the first address twice, then a new
/// one) are all expected to be logged; the per-path info should end up holding
/// the latest address with an observation count of 1.
#[test]
fn handle_observed_address_frame_multiple_observations() {
    let cfg = AddressDiscoveryConfig::SendAndReceive;
    let mut discovery = AddressDiscoveryState::new(&cfg, Instant::now());
    let first_addr = SocketAddr::from(([192, 168, 1, 1], 8080));
    let second_addr = SocketAddr::from(([10, 0, 0, 1], 443));
    let base = Instant::now();
    let path = 0;

    discovery.handle_observed_address(first_addr, path, base);
    discovery.handle_observed_address(first_addr, path, base + Duration::from_secs(1));
    discovery.handle_observed_address(second_addr, path, base + Duration::from_secs(2));

    // Every call is logged, repeated address included.
    assert_eq!(discovery.observed_addresses.len(), 3);

    let per_path = &discovery.path_addresses[&path];
    assert_eq!(per_path.observed_address, Some(second_addr));
    // The test expects the count to be 1 after the address changed.
    assert_eq!(per_path.observation_count, 1);
}
7034
/// With `enabled` cleared, `handle_observed_address` is expected to record
/// nothing in either collection.
#[test]
fn handle_observed_address_frame_disabled() {
    let cfg = AddressDiscoveryConfig::SendAndReceive;
    let mut discovery = AddressDiscoveryState::new(&cfg, Instant::now());
    discovery.enabled = false;

    let reported = SocketAddr::from(([192, 168, 1, 1], 8080));
    let received = Instant::now();
    discovery.handle_observed_address(reported, 0, received);

    assert!(discovery.observed_addresses.is_empty());
    assert!(discovery.path_addresses.is_empty());
}
7050
/// With a rate of 10, sending is expected to be allowed both before and after
/// one observation has been recorded for the path.
#[test]
fn should_send_observation_basic() {
    let cfg = AddressDiscoveryConfig::SendAndReceive;
    let mut discovery = AddressDiscoveryState::new(&cfg, Instant::now());
    discovery.max_observation_rate = 10;
    let at = Instant::now();
    let path = 0;

    assert!(discovery.should_send_observation(path, at));

    discovery.record_observation_sent(path);

    // One recorded send must not exhaust the budget.
    assert!(discovery.should_send_observation(path, at));
}
7068
/// With the limit set to 2, two sends at the same instant are expected to be
/// allowed, a third refused, and sending allowed again one second later.
#[test]
fn should_send_observation_rate_limiting() {
    let cfg = AddressDiscoveryConfig::SendAndReceive;
    let start = Instant::now();
    let mut discovery = AddressDiscoveryState::new(&cfg, start);
    discovery.max_observation_rate = 2;
    discovery.update_rate_limit(2.0);
    let path = 0;

    // Two sends fit in the budget.
    for _ in 0..2 {
        assert!(discovery.should_send_observation(path, start));
        discovery.record_observation_sent(path);
    }

    // The third one at the same instant does not.
    assert!(!discovery.should_send_observation(path, start));

    let one_second_on = start + Duration::from_secs(1);
    assert!(discovery.should_send_observation(path, one_second_on));
}
7091
/// With `enabled` cleared, `should_send_observation` is expected to return false.
#[test]
fn should_send_observation_disabled() {
    let cfg = AddressDiscoveryConfig::SendAndReceive;
    let mut discovery = AddressDiscoveryState::new(&cfg, Instant::now());
    discovery.enabled = false;

    let allowed = discovery.should_send_observation(0, Instant::now());
    assert!(!allowed);
}
7101
/// With all-path observation on and the limit set to 2, one send on each of
/// paths 0 and 1 is expected to use up the budget for both paths until a second
/// has elapsed.
#[test]
fn should_send_observation_per_path() {
    let cfg = AddressDiscoveryConfig::SendAndReceive;
    let start = Instant::now();
    let mut discovery = AddressDiscoveryState::new(&cfg, start);
    discovery.max_observation_rate = 2;
    discovery.observe_all_paths = true;
    discovery.update_rate_limit(2.0);

    // One send per path is allowed.
    for path in [0, 1] {
        assert!(discovery.should_send_observation(path, start));
        discovery.record_observation_sent(path);
    }

    // Afterwards neither path may send at the same instant.
    for path in [0, 1] {
        assert!(!discovery.should_send_observation(path, start));
    }

    let one_second_on = start + Duration::from_secs(1);
    assert!(discovery.should_send_observation(0, one_second_on));
}
7127
/// `has_unnotified_changes` is expected to turn true after an address is
/// observed and false again once the path's `notified` flag is set.
#[test]
fn has_unnotified_changes_test() {
    let cfg = AddressDiscoveryConfig::SendAndReceive;
    let mut discovery = AddressDiscoveryState::new(&cfg, Instant::now());
    let reported = SocketAddr::from(([192, 168, 1, 1], 8080));
    let received = Instant::now();

    assert!(!discovery.has_unnotified_changes());

    discovery.handle_observed_address(reported, 0, received);
    assert!(discovery.has_unnotified_changes());

    // Flip the flag by hand rather than going through the send path.
    let info = discovery.path_addresses.get_mut(&0).unwrap();
    info.notified = true;
    assert!(!discovery.has_unnotified_changes());
}
7146
/// `get_observed_address` is expected to return the address passed to
/// `handle_observed_address` for that path, and `None` for any other path.
#[test]
fn get_observed_address_test() {
    let cfg = AddressDiscoveryConfig::SendAndReceive;
    let mut discovery = AddressDiscoveryState::new(&cfg, Instant::now());
    let reported = SocketAddr::from(([192, 168, 1, 1], 8080));
    let received = Instant::now();
    let path = 0;

    // Before any observation.
    assert_eq!(discovery.get_observed_address(path), None);

    discovery.handle_observed_address(reported, path, received);
    assert_eq!(discovery.get_observed_address(path), Some(reported));

    // A path id that was never used.
    assert_eq!(discovery.get_observed_address(999), None);
}
7165
/// A limiter created with 10 is expected to allow two requests of 5 tokens and
/// then refuse a request for 1 more at the same instant.
#[test]
fn rate_limiter_token_bucket_basic() {
    let start = Instant::now();
    let mut bucket = AddressObservationRateLimiter::new(10, start);

    // 5 + 5 uses the whole initial balance.
    for _ in 0..2 {
        assert!(bucket.try_consume(5.0, start));
    }

    assert!(!bucket.try_consume(1.0, start));
}
7179
/// After draining a rate-10 limiter, the test expects 10 tokens to be available
/// one second later and 5 more after a further half second, with nothing left
/// over each time.
#[test]
fn rate_limiter_token_replenishment() {
    let start = Instant::now();
    let mut bucket = AddressObservationRateLimiter::new(10, start);

    // Drain the initial balance.
    assert!(bucket.try_consume(10.0, start));
    assert!(!bucket.try_consume(0.1, start));

    // One second of refill.
    let after_one_sec = start + Duration::from_secs(1);
    assert!(bucket.try_consume(10.0, after_one_sec));
    assert!(!bucket.try_consume(0.1, after_one_sec));

    // Another half second of refill.
    let after_one_and_half = after_one_sec + Duration::from_millis(500);
    assert!(bucket.try_consume(5.0, after_one_and_half));
    assert!(!bucket.try_consume(0.1, after_one_and_half));
}
7199
/// Idle time must not let the balance grow past the limiter's capacity: after
/// two idle seconds only 10 tokens are expected to be available, and the same
/// holds after a later refill.
#[test]
fn rate_limiter_max_tokens_cap() {
    let start = Instant::now();
    let mut bucket = AddressObservationRateLimiter::new(10, start);

    // Two idle seconds on a full bucket.
    let t2 = start + Duration::from_secs(2);
    assert!(bucket.try_consume(10.0, t2));
    assert!(!bucket.try_consume(10.1, t2));

    // One more second, then a partial withdrawal.
    let t3 = t2 + Duration::from_secs(1);
    assert!(bucket.try_consume(3.0, t3));

    // Two further seconds: exactly a full bucket is expected, no more.
    let t5 = t3 + Duration::from_secs(2);
    assert!(bucket.try_consume(10.0, t5));
    assert!(!bucket.try_consume(0.1, t5));
}
7220
/// Fractional withdrawals (0.5 + 2.3 + 7.2) are expected to drain a rate-10
/// limiter exactly, and 100 ms later exactly one token should be available.
#[test]
fn rate_limiter_fractional_consumption() {
    let start = Instant::now();
    let mut bucket = AddressObservationRateLimiter::new(10, start);

    // These three amounts sum to the full initial balance.
    for amount in [0.5, 2.3, 7.2] {
        assert!(bucket.try_consume(amount, start));
    }
    assert!(!bucket.try_consume(0.1, start));

    let after_100ms = start + Duration::from_millis(100);
    assert!(bucket.try_consume(1.0, after_100ms));
    assert!(!bucket.try_consume(0.1, after_100ms));
}
7237
/// A limiter created with rate 0 is expected to refuse every request, however
/// small, both immediately and ten seconds later.
#[test]
fn rate_limiter_zero_rate() {
    let start = Instant::now();
    let mut bucket = AddressObservationRateLimiter::new(0, start);

    for amount in [1.0, 0.1, 0.001] {
        assert!(!bucket.try_consume(amount, start));
    }

    // Waiting must not produce any tokens either.
    let ten_seconds_on = start + Duration::from_secs(10);
    assert!(!bucket.try_consume(0.001, ten_seconds_on));
}
7252
/// A limiter created with rate 63 is expected to hand out exactly 63 tokens up
/// front and exactly 63 more after one second.
#[test]
fn rate_limiter_high_rate() {
    let start = Instant::now();
    let mut bucket = AddressObservationRateLimiter::new(63, start);

    // 60 + 3 drains the initial balance.
    assert!(bucket.try_consume(60.0, start));
    assert!(bucket.try_consume(3.0, start));
    assert!(!bucket.try_consume(0.1, start));

    let one_second_on = start + Duration::from_secs(1);
    assert!(bucket.try_consume(63.0, one_second_on));
    assert!(!bucket.try_consume(0.1, one_second_on));
}
7268
/// Checks refill behaviour at millisecond granularity on a rate-100 limiter,
/// finishing with a 1 ms refill from an empty balance that should land within
/// 0.09..=0.11 tokens.
#[test]
fn rate_limiter_time_precision() {
    let start = Instant::now();
    let mut bucket = AddressObservationRateLimiter::new(100, start);

    // Drain the initial balance.
    assert!(bucket.try_consume(100.0, start));
    assert!(!bucket.try_consume(0.1, start));

    // 10 ms later: 0.8 is available, then a further 0.5 is not.
    let after_10ms = start + Duration::from_millis(10);
    assert!(bucket.try_consume(0.8, after_10ms));
    assert!(!bucket.try_consume(0.5, after_10ms));

    // Another 100 ms later: at least 5 tokens are available.
    let after_110ms = after_10ms + Duration::from_millis(100);
    assert!(bucket.try_consume(5.0, after_110ms));

    // Reset the balance by hand so the final 1 ms refill is measured from zero.
    bucket.tokens = 0.0;
    bucket.update_tokens(after_110ms + Duration::from_millis(1));
    assert!((0.09..=0.11).contains(&bucket.tokens));
}
7298
/// With all-path observation on and the limit set to 5, three sends on path 0
/// plus two on path 1 are expected to leave nothing for path 2 at the same
/// instant; one second later every path should be allowed to send again.
#[test]
fn per_path_rate_limiting_independent() {
    let cfg = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
    let start = Instant::now();
    let mut discovery = AddressDiscoveryState::new(&cfg, start);

    discovery.observe_all_paths = true;
    discovery.update_rate_limit(5.0);

    // Register paths 0..=2, each with its own observed address.
    for (path, last_octet, port) in [(0, 1, 8080), (1, 2, 8081), (2, 3, 8082)] {
        let mut info = paths::PathAddressInfo::new();
        info.observed_address = Some(SocketAddr::from((
            Ipv4Addr::new(192, 168, 1, last_octet),
            port,
        )));
        discovery.path_addresses.insert(path, info);
    }

    // Path 0 sends three times; `notified` is cleared so it stays eligible.
    for _ in 0..3 {
        assert!(discovery.should_send_observation(0, start));
        discovery.record_observation_sent(0);
        discovery.path_addresses.get_mut(&0).unwrap().notified = false;
    }

    // Path 1 sends twice.
    for _ in 0..2 {
        assert!(discovery.should_send_observation(1, start));
        discovery.record_observation_sent(1);
        discovery.path_addresses.get_mut(&1).unwrap().notified = false;
    }

    // Five sends in total: path 2 is expected to be refused.
    assert!(!discovery.should_send_observation(2, start));

    let one_second_on = start + Duration::from_secs(1);
    for path in [0, 1, 2] {
        assert!(discovery.should_send_observation(path, one_second_on));
    }
}
7363
/// Two paths given limiters with different budgets (10 and 5) are each expected
/// to allow exactly that many sends at one instant and then refuse.
#[test]
fn per_path_rate_limiting_with_path_specific_limits() {
    let now = Instant::now();
    let first_remote = SocketAddr::from(([192, 168, 1, 1], 8080));
    let second_remote = SocketAddr::from(([192, 168, 1, 2], 8081));
    let transport = TransportConfig::default();

    let mut fast_path = paths::PathData::new(first_remote, false, None, now, &transport);
    let mut slow_path = paths::PathData::new(second_remote, false, None, now, &transport);

    // Install a dedicated limiter on each path and drain it completely.
    for (path, budget) in [(&mut fast_path, 10), (&mut slow_path, 5)] {
        path.observation_rate_limiter = paths::PathObservationRateLimiter::new(budget, now);

        for _ in 0..budget {
            assert!(path.observation_rate_limiter.can_send(now));
            path.observation_rate_limiter.consume_token(now);
        }
        assert!(!path.observation_rate_limiter.can_send(now));
    }
}
7393
/// After an observation has been sent for a path, sending is expected to be
/// refused until the path's `notified` flag is cleared again following a new
/// observed address.
#[test]
fn per_path_rate_limiting_address_change_detection() {
    let config = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
    let now = Instant::now();
    let mut state = AddressDiscoveryState::new(&config, now);

    let path_id = 0;
    let addr1 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)), 8080);
    let addr2 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 2)), 8080);

    // First address: observe it and record that it was sent.
    assert!(state.should_send_observation(path_id, now));
    state.handle_observed_address(addr1, path_id, now);
    state.record_observation_sent(path_id);

    // Nothing new to report for this path.
    assert!(!state.should_send_observation(path_id, now));

    // Address changes; mark the path as not yet notified.
    state.handle_observed_address(addr2, path_id, now);
    // `unwrap` instead of `if let`: a missing path entry means the setup is
    // broken, and the test should fail there rather than skip the reset.
    state.path_addresses.get_mut(&path_id).unwrap().notified = false;

    assert!(state.should_send_observation(path_id, now));
}
7422
/// Tokens consumed on one path must not affect a separately created path: the
/// new path, given its own 10-token limiter, is expected to allow ten sends
/// and then refuse.
#[test]
fn per_path_rate_limiting_migration() {
    let now = Instant::now();
    let old_remote = SocketAddr::from(([192, 168, 1, 1], 8080));
    let new_remote = SocketAddr::from(([192, 168, 1, 2], 8081));
    let transport = TransportConfig::default();

    // Use up half of the old path's budget.
    let mut old_path = paths::PathData::new(old_remote, false, None, now, &transport);
    old_path.observation_rate_limiter = paths::PathObservationRateLimiter::new(10, now);
    for _ in 0..5 {
        assert!(old_path.observation_rate_limiter.can_send(now));
        old_path.observation_rate_limiter.consume_token(now);
    }

    // The new path is built from scratch with a fresh limiter.
    let mut migrated = paths::PathData::new(new_remote, false, None, now, &transport);
    migrated.observation_rate_limiter = paths::PathObservationRateLimiter::new(10, now);

    for _ in 0..10 {
        assert!(migrated.observation_rate_limiter.can_send(now));
        migrated.observation_rate_limiter.consume_token(now);
    }
    assert!(!migrated.observation_rate_limiter.can_send(now));
}
7454
/// Without `observe_all_paths`, only path 0 is expected to be allowed to send;
/// paths 1 and 2 should be refused, and waiting a second should not change that.
#[test]
fn per_path_rate_limiting_disabled_paths() {
    let cfg = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
    let start = Instant::now();
    let mut discovery = AddressDiscoveryState::new(&cfg, start);

    assert!(discovery.should_send_observation(0, start));

    for other_path in [1, 2] {
        assert!(!discovery.should_send_observation(other_path, start));
    }

    let one_second_on = start + Duration::from_secs(1);
    assert!(!discovery.should_send_observation(1, one_second_on));
}
7472
/// With a 10-token limiter installed, ten consecutive `should_send_observation`
/// calls at one instant are expected to succeed and the eleventh to fail.
#[test]
fn respecting_negotiated_max_observation_rate_basic() {
    let cfg = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
    let start = Instant::now();
    let mut discovery = AddressDiscoveryState::new(&cfg, start);

    discovery.max_observation_rate = 10;
    discovery.rate_limiter = AddressObservationRateLimiter::new(10, start);

    // Stops at the first refusal, exactly like ten individual assertions would.
    assert!((0..10).all(|_| discovery.should_send_observation(0, start)));

    assert!(!discovery.should_send_observation(0, start));
}
7490
/// With the rate set to 0 and a zero-rate limiter installed, no path is
/// expected to be allowed to send, even ten seconds later.
#[test]
fn respecting_negotiated_max_observation_rate_zero() {
    let cfg = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
    let start = Instant::now();
    let mut discovery = AddressDiscoveryState::new(&cfg, start);

    discovery.max_observation_rate = 0;
    discovery.rate_limiter = AddressObservationRateLimiter::new(0, start);

    for path in [0, 1] {
        assert!(!discovery.should_send_observation(path, start));
    }

    let ten_seconds_on = start + Duration::from_secs(10);
    assert!(!discovery.should_send_observation(0, ten_seconds_on));
}
7509
/// The limiter set through `update_rate_limit(5.0)` is expected to govern
/// sending even after `max_observation_rate` is overwritten with 20: five
/// sends are allowed and the sixth is refused.
#[test]
fn respecting_negotiated_max_observation_rate_higher() {
    let cfg = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
    let start = Instant::now();
    let mut discovery = AddressDiscoveryState::new(&cfg, start);

    // Path 0 with an observed address to report.
    let mut info = paths::PathAddressInfo::new();
    info.observed_address = Some(SocketAddr::from(([192, 168, 1, 1], 8080)));
    discovery.path_addresses.insert(0, info);

    discovery.update_rate_limit(5.0);

    // Only the field is raised; the limiter is left at 5.
    discovery.max_observation_rate = 20;

    // One send on path 0, then clear `notified` so the path stays eligible.
    let send_and_rearm = |state: &mut AddressDiscoveryState, at: Instant| {
        assert!(state.should_send_observation(0, at));
        state.record_observation_sent(0);
        state.path_addresses.get_mut(&0).unwrap().notified = false;
    };

    for _ in 0..5 {
        send_and_rearm(&mut discovery, start);
    }
    assert!(!discovery.should_send_observation(0, start));
}
7541
/// Lowers the rate to 3 after five sends and expects three more sends to be
/// allowed at the same instant, then a refusal, then three more after one
/// second followed by another refusal.
#[test]
fn respecting_negotiated_max_observation_rate_dynamic_update() {
    let cfg = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
    let start = Instant::now();
    let mut discovery = AddressDiscoveryState::new(&cfg, start);

    // Path 0 with an observed address to report.
    let mut info = paths::PathAddressInfo::new();
    info.observed_address = Some(SocketAddr::from(([192, 168, 1, 1], 8080)));
    discovery.path_addresses.insert(0, info);

    // One send on path 0, then clear `notified` so the path stays eligible.
    let send_and_rearm = |state: &mut AddressDiscoveryState, at: Instant| {
        assert!(state.should_send_observation(0, at));
        state.record_observation_sent(0);
        state.path_addresses.get_mut(&0).unwrap().notified = false;
    };

    // Five sends under the initial rate.
    for _ in 0..5 {
        send_and_rearm(&mut discovery, start);
    }

    // Lower the rate on both the field and the limiter.
    discovery.max_observation_rate = 3;
    discovery.rate_limiter.set_rate(3);

    // The test expects three more sends to fit before the limiter refuses.
    for _ in 0..3 {
        send_and_rearm(&mut discovery, start);
    }
    assert!(!discovery.should_send_observation(0, start));

    // One second on, the new rate of 3 applies.
    let one_second_on = start + Duration::from_secs(1);
    for _ in 0..3 {
        send_and_rearm(&mut discovery, one_second_on);
    }
    assert!(!discovery.should_send_observation(0, one_second_on));
}
7595
/// With all-path observation on, three rounds over three paths (nine sends)
/// are expected to leave room for exactly one more send, after which every
/// path is refused.
#[test]
fn respecting_negotiated_max_observation_rate_with_paths() {
    let cfg = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
    let start = Instant::now();
    let mut discovery = AddressDiscoveryState::new(&cfg, start);

    discovery.observe_all_paths = true;

    // Register paths 0..3 with addresses 192.168.1.100 through .102.
    for path in 0..3 {
        let mut info = paths::PathAddressInfo::new();
        info.observed_address = Some(SocketAddr::new(
            IpAddr::V4(Ipv4Addr::new(192, 168, 1, 100 + path as u8)),
            5000,
        ));
        discovery.path_addresses.insert(path, info);
    }

    // Three round-robin passes; sends that are refused are simply skipped.
    for _round in 0..3 {
        for path in 0..3 {
            if !discovery.should_send_observation(path, start) {
                continue;
            }
            discovery.record_observation_sent(path);
            discovery.path_addresses.get_mut(&path).unwrap().notified = false;
        }
    }

    // The test expects one send to still be possible at this point.
    assert!(discovery.should_send_observation(0, start));
    discovery.record_observation_sent(0);

    for path in [0, 1, 2] {
        assert!(!discovery.should_send_observation(path, start));
    }
}
7639
/// Queuing a frame for path 0 is expected to return a frame carrying the given
/// address and to leave a `notified` entry for that path in `path_addresses`.
#[test]
fn queue_observed_address_frame_basic() {
    let cfg = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
    let start = Instant::now();
    let mut discovery = AddressDiscoveryState::new(&cfg, start);

    let peer_addr = SocketAddr::from(([192, 168, 1, 100], 5000));
    let queued = discovery.queue_observed_address_frame(0, peer_addr);

    assert!(queued.is_some());
    let queued = queued.unwrap();
    assert_eq!(queued.address, peer_addr);

    // The path is now tracked and flagged as notified.
    assert!(discovery.path_addresses.contains_key(&0));
    let tracked = discovery.path_addresses.get(&0).unwrap();
    assert!(tracked.notified);
}
7659
/// With all-path observation on, observed-address frames for ten distinct
/// paths are expected to be accepted at one instant and an eleventh rejected.
#[test]
fn queue_observed_address_frame_rate_limited() {
    let config = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
    let now = Instant::now();
    let mut state = AddressDiscoveryState::new(&config, now);

    state.observe_all_paths = true;

    // `i` is a `u8`, so the address octet, port and path id are derived with
    // lossless `From` conversions rather than `as` casts.
    for i in 0..10u8 {
        let addr = SocketAddr::new(
            IpAddr::V4(Ipv4Addr::new(192, 168, 1, i)),
            5000 + u16::from(i),
        );
        assert!(
            state
                .queue_observed_address_frame(u64::from(i), addr)
                .is_some(),
            "Frame {} should be allowed",
            i + 1
        );
    }

    // One more frame on a fresh path must be refused.
    let addr11 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 11)), 5011);
    assert!(
        state.queue_observed_address_frame(10, addr11).is_none(),
        "11th frame should be rate limited"
    );
}
7691
/// With `enabled` cleared, `queue_observed_address_frame` is expected to
/// return `None`.
#[test]
fn queue_observed_address_frame_disabled() {
    let cfg = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
    let start = Instant::now();
    let mut discovery = AddressDiscoveryState::new(&cfg, start);

    discovery.enabled = false;

    let peer_addr = SocketAddr::from(([192, 168, 1, 100], 5000));
    let queued = discovery.queue_observed_address_frame(0, peer_addr);
    assert!(queued.is_none());
}
7706
/// Once a frame has been queued for a path, further queue attempts on that
/// path are expected to return `None`, for the same address and a different one.
#[test]
fn queue_observed_address_frame_already_notified() {
    let cfg = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
    let start = Instant::now();
    let mut discovery = AddressDiscoveryState::new(&cfg, start);

    let first_addr = SocketAddr::from(([192, 168, 1, 100], 5000));

    // First attempt goes through.
    assert!(discovery.queue_observed_address_frame(0, first_addr).is_some());

    // Repeating it for the same path is refused.
    assert!(discovery.queue_observed_address_frame(0, first_addr).is_none());

    // The test expects a different address on that path to be refused as well.
    let changed_addr = SocketAddr::from(([192, 168, 1, 101], 5001));
    assert!(discovery.queue_observed_address_frame(0, changed_addr).is_none());
}
7725
/// Without `observe_all_paths`, a frame is expected to be queued for path 0
/// only; paths 1 and 2 should yield `None`.
#[test]
fn queue_observed_address_frame_primary_path_only() {
    let cfg = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
    let start = Instant::now();
    let mut discovery = AddressDiscoveryState::new(&cfg, start);

    let peer_addr = SocketAddr::from(([192, 168, 1, 100], 5000));

    assert!(discovery.queue_observed_address_frame(0, peer_addr).is_some());

    for secondary_path in [1, 2] {
        assert!(discovery
            .queue_observed_address_frame(secondary_path, peer_addr)
            .is_none());
    }
}
7741
/// Queuing a frame is expected to store the address in the path's info and
/// flag it as notified, without adding anything to `observed_addresses`.
#[test]
fn queue_observed_address_frame_updates_path_info() {
    let cfg = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
    let start = Instant::now();
    let mut discovery = AddressDiscoveryState::new(&cfg, start);

    let v6_addr = SocketAddr::new(
        IpAddr::from(Ipv6Addr::new(0x2001, 0xdb8, 0, 0, 0, 0, 0, 1)),
        5000,
    );

    let queued = discovery.queue_observed_address_frame(0, v6_addr);
    assert!(queued.is_some());

    // Per-path bookkeeping reflects the queued frame.
    let tracked = discovery.path_addresses.get(&0).unwrap();
    assert_eq!(tracked.observed_address, Some(v6_addr));
    assert!(tracked.notified);

    // The received-observation log is untouched by queuing.
    assert_eq!(discovery.observed_addresses.len(), 0);
}
7766
/// A default `Retransmits` starts with no observed-address frames and keeps
/// a frame pushed onto `observed_addresses`.
#[test]
fn retransmits_includes_observed_addresses() {
    use crate::connection::spaces::Retransmits;

    let mut pending = Retransmits::default();
    assert!(pending.observed_addresses.is_empty());

    // Push a single frame with sequence number 1.
    let peer_addr = SocketAddr::from(([192, 168, 1, 100], 5000));
    pending.observed_addresses.push(frame::ObservedAddress {
        sequence_number: VarInt::from_u32(1),
        address: peer_addr,
    });

    assert_eq!(pending.observed_addresses.len(), 1);
    assert_eq!(pending.observed_addresses[0].address, peer_addr);
}
7789
/// When called with `false` as its second argument (peer support, per the
/// test name), `check_for_address_observations` is expected to produce no
/// frames even though path 0 has an observed address.
#[test]
fn check_for_address_observations_no_peer_support() {
    let cfg = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
    let start = Instant::now();
    let mut discovery = AddressDiscoveryState::new(&cfg, start);

    // Path 0 has something to report.
    let mut info = paths::PathAddressInfo::new();
    info.observed_address = Some(SocketAddr::from(([192, 168, 1, 100], 5000)));
    discovery.path_addresses.insert(0, info);

    let produced = discovery.check_for_address_observations(0, false, start);

    assert!(produced.is_empty());
}
7811
/// When called with `true` as its second argument (peer support, per the test
/// name), `check_for_address_observations` is expected to emit one frame with
/// path 0's address and flag the path as notified.
#[test]
fn check_for_address_observations_with_peer_support() {
    let cfg = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
    let start = Instant::now();
    let mut discovery = AddressDiscoveryState::new(&cfg, start);

    // Path 0 has something to report.
    let peer_addr = SocketAddr::from(([192, 168, 1, 100], 5000));
    let mut info = paths::PathAddressInfo::new();
    info.observed_address = Some(peer_addr);
    discovery.path_addresses.insert(0, info);

    let produced = discovery.check_for_address_observations(0, true, start);

    assert_eq!(produced.len(), 1);
    assert_eq!(produced[0].address, peer_addr);

    assert!(discovery.path_addresses.get(&0).unwrap().notified);
}
7835
/// Repeatedly triggers observations for path 0 until the limiter runs dry,
/// then expects no frame at the same instant and exactly one frame 200 ms later.
#[test]
fn check_for_address_observations_rate_limited() {
    let cfg = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
    let start = Instant::now();
    let mut discovery = AddressDiscoveryState::new(&cfg, start);

    // Path 0 has something to report.
    let peer_addr = SocketAddr::from(([192, 168, 1, 100], 5000));
    let mut info = paths::PathAddressInfo::new();
    info.observed_address = Some(peer_addr);
    discovery.path_addresses.insert(0, info);

    // Clears `notified` on path 0 so the next check considers it again.
    let rearm = |state: &mut AddressDiscoveryState| {
        state.path_addresses.get_mut(&0).unwrap().notified = false;
    };

    // Up to ten rounds; stop early as soon as nothing is produced.
    for _ in 0..10 {
        if discovery
            .check_for_address_observations(0, true, start)
            .is_empty()
        {
            break;
        }
        rearm(&mut discovery);
    }

    assert_eq!(discovery.rate_limiter.tokens, 0.0);

    // Still eligible, but the limiter is empty.
    rearm(&mut discovery);
    let while_empty = discovery.check_for_address_observations(0, true, start);
    assert_eq!(while_empty.len(), 0);

    // 200 ms of refill is expected to allow exactly one frame.
    rearm(&mut discovery);
    let after_200ms = start + Duration::from_millis(200);
    let after_refill = discovery.check_for_address_observations(0, true, after_200ms);
    assert_eq!(after_refill.len(), 1);
}
7877
7878 #[test]
7879 fn check_for_address_observations_multiple_paths() {
7880 let config = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
7881 let now = Instant::now();
7882 let mut state = AddressDiscoveryState::new(&config, now);
7883
7884 state.observe_all_paths = true;
7886
7887 let addr1 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 100)), 5000);
7889 let addr2 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 101)), 5001);
7890
7891 state
7892 .path_addresses
7893 .insert(0, paths::PathAddressInfo::new());
7894 state.path_addresses.get_mut(&0).unwrap().observed_address = Some(addr1);
7895
7896 state
7897 .path_addresses
7898 .insert(1, paths::PathAddressInfo::new());
7899 state.path_addresses.get_mut(&1).unwrap().observed_address = Some(addr2);
7900
7901 let frames = state.check_for_address_observations(0, true, now);
7903
7904 assert_eq!(frames.len(), 2);
7906
7907 let addresses: Vec<_> = frames.iter().map(|f| f.address).collect();
7909 assert!(addresses.contains(&addr1));
7910 assert!(addresses.contains(&addr2));
7911
7912 assert!(state.path_addresses.get(&0).unwrap().notified);
7914 assert!(state.path_addresses.get(&1).unwrap().notified);
7915 }
7916
7917 #[test]
7919 fn test_rate_limiter_configuration() {
7920 let state = AddressDiscoveryState::new_with_params(true, 10.0, false);
7922 assert_eq!(state.rate_limiter.rate, 10.0);
7923 assert_eq!(state.rate_limiter.max_tokens, 10.0);
7924 assert_eq!(state.rate_limiter.tokens, 10.0);
7925
7926 let state = AddressDiscoveryState::new_with_params(true, 63.0, false);
7927 assert_eq!(state.rate_limiter.rate, 63.0);
7928 assert_eq!(state.rate_limiter.max_tokens, 63.0);
7929 }
7930
7931 #[test]
7932 fn test_rate_limiter_update_configuration() {
7933 let mut state = AddressDiscoveryState::new_with_params(true, 5.0, false);
7934
7935 assert_eq!(state.rate_limiter.rate, 5.0);
7937
7938 state.update_rate_limit(10.0);
7940 assert_eq!(state.rate_limiter.rate, 10.0);
7941 assert_eq!(state.rate_limiter.max_tokens, 10.0);
7942
7943 state.rate_limiter.tokens = 15.0;
7945 state.update_rate_limit(8.0);
7946 assert_eq!(state.rate_limiter.tokens, 8.0);
7947 }
7948
7949 #[test]
7950 fn test_rate_limiter_from_transport_params() {
7951 let mut params = TransportParameters::default();
7952 params.address_discovery = Some(AddressDiscoveryConfig::SendAndReceive);
7953
7954 let state = AddressDiscoveryState::from_transport_params(¶ms);
7955 assert!(state.is_some());
7956 let state = state.unwrap();
7957 assert_eq!(state.rate_limiter.rate, 10.0); assert!(!state.observe_all_paths); }
7960
7961 #[test]
7962 fn test_rate_limiter_zero_rate() {
7963 let state = AddressDiscoveryState::new_with_params(true, 0.0, false);
7964 assert_eq!(state.rate_limiter.rate, 0.0);
7965 assert_eq!(state.rate_limiter.tokens, 0.0);
7966
7967 let address = "192.168.1.1:443".parse().unwrap();
7969 let mut state = AddressDiscoveryState::new_with_params(true, 0.0, false);
7970 let frame = state.queue_observed_address_frame(0, address);
7971 assert!(frame.is_none());
7972 }
7973
7974 #[test]
7975 fn test_rate_limiter_configuration_edge_cases() {
7976 let state = AddressDiscoveryState::new_with_params(true, 63.0, false);
7978 assert_eq!(state.rate_limiter.rate, 63.0);
7979
7980 let state = AddressDiscoveryState::new_with_params(true, 100.0, false);
7982 assert_eq!(state.rate_limiter.rate, 100.0);
7984
7985 let state = AddressDiscoveryState::new_with_params(true, 2.5, false);
7987 assert_eq!(state.rate_limiter.rate, 2.0);
7989 }
7990
7991 #[test]
7992 fn test_rate_limiter_runtime_update() {
7993 let mut state = AddressDiscoveryState::new_with_params(true, 10.0, false);
7994 let now = Instant::now();
7995
7996 state.rate_limiter.tokens = 5.0;
7998
7999 state.update_rate_limit(3.0);
8001
8002 assert_eq!(state.rate_limiter.tokens, 3.0);
8004 assert_eq!(state.rate_limiter.rate, 3.0);
8005 assert_eq!(state.rate_limiter.max_tokens, 3.0);
8006
8007 let later = now + Duration::from_secs(1);
8009 state.rate_limiter.update_tokens(later);
8010
8011 assert_eq!(state.rate_limiter.tokens, 3.0);
8013 }
8014
8015 #[test]
8017 fn test_address_discovery_state_initialization_default() {
8018 let now = Instant::now();
8020 let default_config = crate::transport_parameters::AddressDiscoveryConfig::default();
8021
8022 let address_discovery_state = Some(AddressDiscoveryState::new(&default_config, now));
8025
8026 assert!(address_discovery_state.is_some());
8027 let state = address_discovery_state.unwrap();
8028
8029 assert!(state.enabled); assert_eq!(state.max_observation_rate, 10); assert!(!state.observe_all_paths);
8033 }
8034
8035 #[test]
8036 fn test_address_discovery_state_initialization_on_handshake() {
8037 let now = Instant::now();
8039
8040 let mut address_discovery_state = Some(AddressDiscoveryState::new(
8042 &crate::transport_parameters::AddressDiscoveryConfig::default(),
8043 now,
8044 ));
8045
8046 let peer_params = TransportParameters {
8048 address_discovery: Some(AddressDiscoveryConfig::SendAndReceive),
8049 ..TransportParameters::default()
8050 };
8051
8052 if let Some(peer_config) = &peer_params.address_discovery {
8054 address_discovery_state = Some(AddressDiscoveryState::new(peer_config, now));
8056 }
8057
8058 assert!(address_discovery_state.is_some());
8060 let state = address_discovery_state.unwrap();
8061 assert!(state.enabled);
8062 assert_eq!(state.max_observation_rate, 10); assert!(!state.observe_all_paths); }
8066
8067 #[test]
8068 fn test_address_discovery_negotiation_disabled_peer() {
8069 let now = Instant::now();
8071
8072 let our_config = AddressDiscoveryConfig::SendAndReceive;
8074 let mut address_discovery_state = Some(AddressDiscoveryState::new(&our_config, now));
8075
8076 let peer_params = TransportParameters {
8078 address_discovery: None,
8079 ..TransportParameters::default()
8080 };
8081
8082 if peer_params.address_discovery.is_none() {
8084 if let Some(state) = &mut address_discovery_state {
8085 state.enabled = false;
8086 }
8087 }
8088
8089 let state = address_discovery_state.unwrap();
8091 assert!(!state.enabled); }
8093
8094 #[test]
8095 fn test_address_discovery_negotiation_rate_limiting() {
8096 let now = Instant::now();
8098
8099 let our_config = AddressDiscoveryConfig::SendAndReceive;
8101 let mut address_discovery_state = Some(AddressDiscoveryState::new(&our_config, now));
8102
8103 if let Some(state) = &mut address_discovery_state {
8105 state.max_observation_rate = 30;
8106 state.update_rate_limit(30.0);
8107 }
8108
8109 let peer_params = TransportParameters {
8111 address_discovery: Some(AddressDiscoveryConfig::SendAndReceive),
8112 ..TransportParameters::default()
8113 };
8114
8115 if let (Some(state), Some(_peer_config)) =
8118 (&mut address_discovery_state, &peer_params.address_discovery)
8119 {
8120 let peer_rate = 15u8;
8123 let negotiated_rate = state.max_observation_rate.min(peer_rate);
8124 state.update_rate_limit(negotiated_rate as f64);
8125 }
8126
8127 let state = address_discovery_state.unwrap();
8129 assert_eq!(state.rate_limiter.rate, 15.0); }
8131
8132 #[test]
8133 fn test_address_discovery_path_initialization() {
8134 let now = Instant::now();
8136 let config = AddressDiscoveryConfig::SendAndReceive;
8137 let mut state = AddressDiscoveryState::new(&config, now);
8138
8139 assert!(state.path_addresses.is_empty());
8141
8142 let should_send = state.should_send_observation(0, now);
8144 assert!(should_send); }
8149
8150 #[test]
8151 fn test_address_discovery_multiple_path_initialization() {
8152 let now = Instant::now();
8154 let config = AddressDiscoveryConfig::SendAndReceive;
8155 let mut state = AddressDiscoveryState::new(&config, now);
8156
8157 assert!(state.should_send_observation(0, now)); assert!(!state.should_send_observation(1, now)); assert!(!state.should_send_observation(2, now)); state.observe_all_paths = true;
8164 assert!(state.should_send_observation(1, now)); assert!(state.should_send_observation(2, now)); let config_primary_only = AddressDiscoveryConfig::SendAndReceive;
8169 let mut state_primary = AddressDiscoveryState::new(&config_primary_only, now);
8170
8171 assert!(state_primary.should_send_observation(0, now)); assert!(!state_primary.should_send_observation(1, now)); }
8174
8175 #[test]
8176 fn test_handle_observed_address_frame_valid() {
8177 let now = Instant::now();
8179 let config = AddressDiscoveryConfig::SendAndReceive;
8180 let mut state = AddressDiscoveryState::new(&config, now);
8181
8182 let observed_addr = SocketAddr::from(([192, 168, 1, 100], 5000));
8184 state.handle_observed_address(observed_addr, 0, now);
8185
8186 assert_eq!(state.observed_addresses.len(), 1);
8188 assert_eq!(state.observed_addresses[0].address, observed_addr);
8189 assert_eq!(state.observed_addresses[0].path_id, 0);
8190 assert_eq!(state.observed_addresses[0].received_at, now);
8191
8192 let path_info = state.path_addresses.get(&0).unwrap();
8194 assert_eq!(path_info.observed_address, Some(observed_addr));
8195 assert_eq!(path_info.last_observed, Some(now));
8196 assert_eq!(path_info.observation_count, 1);
8197 }
8198
8199 #[test]
8200 fn test_handle_multiple_observed_addresses() {
8201 let now = Instant::now();
8203 let config = AddressDiscoveryConfig::SendAndReceive;
8204 let mut state = AddressDiscoveryState::new(&config, now);
8205
8206 let addr1 = SocketAddr::from(([192, 168, 1, 100], 5000));
8208 let addr2 = SocketAddr::from(([10, 0, 0, 50], 6000));
8209 let addr3 = SocketAddr::from(([192, 168, 1, 100], 7000)); state.handle_observed_address(addr1, 0, now);
8212 state.handle_observed_address(addr2, 1, now);
8213 state.handle_observed_address(addr3, 0, now + Duration::from_millis(100));
8214
8215 assert_eq!(state.observed_addresses.len(), 3);
8217
8218 let path0_info = state.path_addresses.get(&0).unwrap();
8220 assert_eq!(path0_info.observed_address, Some(addr3));
8221 assert_eq!(path0_info.observation_count, 1); let path1_info = state.path_addresses.get(&1).unwrap();
8225 assert_eq!(path1_info.observed_address, Some(addr2));
8226 assert_eq!(path1_info.observation_count, 1);
8227 }
8228
8229 #[test]
8230 fn test_get_observed_address() {
8231 let now = Instant::now();
8233 let config = AddressDiscoveryConfig::SendAndReceive;
8234 let mut state = AddressDiscoveryState::new(&config, now);
8235
8236 assert_eq!(state.get_observed_address(0), None);
8238
8239 let addr = SocketAddr::from(([192, 168, 1, 100], 5000));
8241 state.handle_observed_address(addr, 0, now);
8242
8243 assert_eq!(state.get_observed_address(0), Some(addr));
8245
8246 assert_eq!(state.get_observed_address(999), None);
8248 }
8249
8250 #[test]
8251 fn test_has_unnotified_changes() {
8252 let now = Instant::now();
8254 let config = AddressDiscoveryConfig::SendAndReceive;
8255 let mut state = AddressDiscoveryState::new(&config, now);
8256
8257 assert!(!state.has_unnotified_changes());
8259
8260 let addr = SocketAddr::from(([192, 168, 1, 100], 5000));
8262 state.handle_observed_address(addr, 0, now);
8263 assert!(state.has_unnotified_changes());
8264
8265 if let Some(path_info) = state.path_addresses.get_mut(&0) {
8267 path_info.notified = true;
8268 }
8269 assert!(!state.has_unnotified_changes());
8270
8271 let addr2 = SocketAddr::from(([192, 168, 1, 100], 6000));
8273 state.handle_observed_address(addr2, 0, now + Duration::from_secs(1));
8274 assert!(state.has_unnotified_changes());
8275 }
8276
8277 #[test]
8278 fn test_address_discovery_disabled() {
8279 let now = Instant::now();
8281 let config = AddressDiscoveryConfig::SendAndReceive;
8282 let mut state = AddressDiscoveryState::new(&config, now);
8283
8284 state.enabled = false;
8286
8287 let addr = SocketAddr::from(([192, 168, 1, 100], 5000));
8289 state.handle_observed_address(addr, 0, now);
8290
8291 assert_eq!(state.observed_addresses.len(), 0);
8293
8294 assert!(!state.should_send_observation(0, now));
8296 }
8297
8298 #[test]
8299 fn test_rate_limiting_basic() {
8300 let now = Instant::now();
8302 let config = AddressDiscoveryConfig::SendAndReceive;
8303 let mut state = AddressDiscoveryState::new(&config, now);
8304
8305 state.observe_all_paths = true;
8307 state.rate_limiter.set_rate(2); assert!(state.should_send_observation(0, now));
8311 state.record_observation_sent(0);
8313
8314 assert!(state.should_send_observation(1, now));
8316 state.record_observation_sent(1);
8317
8318 assert!(!state.should_send_observation(2, now));
8320
8321 let later = now + Duration::from_millis(500);
8323 assert!(state.should_send_observation(3, later));
8324 state.record_observation_sent(3);
8325
8326 assert!(!state.should_send_observation(4, later));
8328
8329 let _one_sec_later = now + Duration::from_secs(1);
8333 let two_sec_later = now + Duration::from_secs(2);
8337 assert!(state.should_send_observation(5, two_sec_later));
8338 state.record_observation_sent(5);
8339
8340 assert!(state.should_send_observation(6, two_sec_later));
8351 state.record_observation_sent(6);
8352
8353 assert!(
8355 !state.should_send_observation(7, two_sec_later),
8356 "Expected no tokens available"
8357 );
8358 }
8359
8360 #[test]
8361 fn test_rate_limiting_per_path() {
8362 let now = Instant::now();
8364 let config = AddressDiscoveryConfig::SendAndReceive;
8365 let mut state = AddressDiscoveryState::new(&config, now);
8366
8367 state
8369 .path_addresses
8370 .insert(0, paths::PathAddressInfo::new());
8371 state.path_addresses.get_mut(&0).unwrap().observed_address = Some(SocketAddr::new(
8372 IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)),
8373 8080,
8374 ));
8375
8376 for _ in 0..10 {
8378 assert!(state.should_send_observation(0, now));
8379 state.record_observation_sent(0);
8380 state.path_addresses.get_mut(&0).unwrap().notified = false;
8382 }
8383
8384 assert!(!state.should_send_observation(0, now));
8386
8387 let later = now + Duration::from_millis(100);
8389 assert!(state.should_send_observation(0, later));
8390 state.record_observation_sent(0);
8391
8392 state.path_addresses.get_mut(&0).unwrap().notified = false;
8394
8395 assert!(!state.should_send_observation(0, later));
8397 }
8398
8399 #[test]
8400 fn test_rate_limiting_zero_rate() {
8401 let now = Instant::now();
8403 let config = AddressDiscoveryConfig::SendAndReceive;
8404 let mut state = AddressDiscoveryState::new(&config, now);
8405
8406 state.rate_limiter.set_rate(0);
8408 state.rate_limiter.tokens = 0.0;
8409 state.rate_limiter.max_tokens = 0.0;
8410
8411 assert!(!state.should_send_observation(0, now));
8413 assert!(!state.should_send_observation(0, now + Duration::from_secs(10)));
8414 assert!(!state.should_send_observation(0, now + Duration::from_secs(100)));
8415 }
8416
8417 #[test]
8418 fn test_rate_limiting_update() {
8419 let now = Instant::now();
8421 let config = AddressDiscoveryConfig::SendAndReceive;
8422 let mut state = AddressDiscoveryState::new(&config, now);
8423
8424 state.observe_all_paths = true;
8426
8427 for i in 0..12 {
8429 state
8430 .path_addresses
8431 .insert(i, paths::PathAddressInfo::new());
8432 state.path_addresses.get_mut(&i).unwrap().observed_address = Some(SocketAddr::new(
8433 IpAddr::V4(Ipv4Addr::new(192, 168, 1, (i + 1) as u8)),
8434 8080,
8435 ));
8436 }
8437
8438 for i in 0..10 {
8441 assert!(state.should_send_observation(i, now));
8442 state.record_observation_sent(i);
8443 }
8444 assert!(!state.should_send_observation(10, now));
8446
8447 state.update_rate_limit(20.0);
8449
8450 let later = now + Duration::from_millis(50);
8453 assert!(state.should_send_observation(10, later));
8454 state.record_observation_sent(10);
8455
8456 let later2 = now + Duration::from_millis(100);
8458 assert!(state.should_send_observation(11, later2));
8459 }
8460
8461 #[test]
8462 fn test_rate_limiting_burst() {
8463 let now = Instant::now();
8465 let config = AddressDiscoveryConfig::SendAndReceive;
8466 let mut state = AddressDiscoveryState::new(&config, now);
8467
8468 for _ in 0..10 {
8470 assert!(state.should_send_observation(0, now));
8471 state.record_observation_sent(0);
8472 }
8473
8474 assert!(!state.should_send_observation(0, now));
8476
8477 let later = now + Duration::from_millis(100);
8479 assert!(state.should_send_observation(0, later));
8480 state.record_observation_sent(0);
8481 assert!(!state.should_send_observation(0, later));
8482 }
8483
8484 #[test]
8485 fn test_connection_rate_limiting_with_check_observations() {
8486 let now = Instant::now();
8488 let config = AddressDiscoveryConfig::SendAndReceive;
8489 let mut state = AddressDiscoveryState::new(&config, now);
8490
8491 let mut path_info = paths::PathAddressInfo::new();
8493 path_info.update_observed_address(
8494 SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)), 8080),
8495 now,
8496 );
8497 state.path_addresses.insert(0, path_info);
8498
8499 let frame1 =
8501 state.queue_observed_address_frame(0, SocketAddr::from(([192, 168, 1, 1], 8080)));
8502 assert!(frame1.is_some());
8503 state.record_observation_sent(0);
8504
8505 if let Some(info) = state.path_addresses.get_mut(&0) {
8507 info.notified = false;
8508 }
8509
8510 for _ in 1..10 {
8512 if let Some(info) = state.path_addresses.get_mut(&0) {
8514 info.notified = false;
8515 }
8516 let frame =
8517 state.queue_observed_address_frame(0, SocketAddr::from(([192, 168, 1, 1], 8080)));
8518 assert!(frame.is_some());
8519 state.record_observation_sent(0);
8520 }
8521
8522 if let Some(info) = state.path_addresses.get_mut(&0) {
8524 info.notified = false;
8525 }
8526 let frame3 =
8527 state.queue_observed_address_frame(0, SocketAddr::from(([192, 168, 1, 1], 8080)));
8528 assert!(frame3.is_none()); let later = now + Duration::from_millis(100);
8532 state.rate_limiter.update_tokens(later); if let Some(info) = state.path_addresses.get_mut(&0) {
8536 info.notified = false;
8537 }
8538
8539 let frame4 =
8540 state.queue_observed_address_frame(0, SocketAddr::from(([192, 168, 1, 1], 8080)));
8541 assert!(frame4.is_some()); }
8543
8544 #[test]
8545 fn test_queue_observed_address_frame() {
8546 let now = Instant::now();
8548 let config = AddressDiscoveryConfig::SendAndReceive;
8549 let mut state = AddressDiscoveryState::new(&config, now);
8550
8551 let addr = SocketAddr::from(([192, 168, 1, 100], 5000));
8552
8553 let frame = state.queue_observed_address_frame(0, addr);
8555 assert!(frame.is_some());
8556 assert_eq!(frame.unwrap().address, addr);
8557
8558 state.record_observation_sent(0);
8560
8561 for i in 0..9 {
8563 if let Some(info) = state.path_addresses.get_mut(&0) {
8565 info.notified = false;
8566 }
8567
8568 let frame = state.queue_observed_address_frame(0, addr);
8569 assert!(frame.is_some(), "Frame {} should be allowed", i + 2);
8570 state.record_observation_sent(0);
8571 }
8572
8573 if let Some(info) = state.path_addresses.get_mut(&0) {
8575 info.notified = false;
8576 }
8577
8578 let frame = state.queue_observed_address_frame(0, addr);
8580 assert!(frame.is_none(), "11th frame should be rate limited");
8581 }
8582
8583 #[test]
8584 fn test_multi_path_basic() {
8585 let now = Instant::now();
8587 let config = AddressDiscoveryConfig::SendAndReceive;
8588 let mut state = AddressDiscoveryState::new(&config, now);
8589
8590 let addr1 = SocketAddr::from(([192, 168, 1, 1], 5000));
8591 let addr2 = SocketAddr::from(([10, 0, 0, 1], 6000));
8592 let addr3 = SocketAddr::from(([172, 16, 0, 1], 7000));
8593
8594 state.handle_observed_address(addr1, 0, now);
8596 state.handle_observed_address(addr2, 1, now);
8597 state.handle_observed_address(addr3, 2, now);
8598
8599 assert_eq!(state.get_observed_address(0), Some(addr1));
8601 assert_eq!(state.get_observed_address(1), Some(addr2));
8602 assert_eq!(state.get_observed_address(2), Some(addr3));
8603
8604 assert!(state.has_unnotified_changes());
8606
8607 assert_eq!(state.observed_addresses.len(), 3);
8609 }
8610
8611 #[test]
8612 fn test_multi_path_observe_primary_only() {
8613 let now = Instant::now();
8615 let config = AddressDiscoveryConfig::SendAndReceive;
8616 let mut state = AddressDiscoveryState::new(&config, now);
8617
8618 assert!(state.should_send_observation(0, now));
8620 state.record_observation_sent(0);
8621
8622 assert!(!state.should_send_observation(1, now));
8624 assert!(!state.should_send_observation(2, now));
8625
8626 let addr = SocketAddr::from(([192, 168, 1, 1], 5000));
8628 assert!(state.queue_observed_address_frame(0, addr).is_some());
8629 assert!(state.queue_observed_address_frame(1, addr).is_none());
8630 assert!(state.queue_observed_address_frame(2, addr).is_none());
8631 }
8632
8633 #[test]
8634 fn test_multi_path_rate_limiting() {
8635 let now = Instant::now();
8637 let config = AddressDiscoveryConfig::SendAndReceive;
8638 let mut state = AddressDiscoveryState::new(&config, now);
8639
8640 state.observe_all_paths = true;
8642
8643 for i in 0..21 {
8645 state
8646 .path_addresses
8647 .insert(i, paths::PathAddressInfo::new());
8648 state.path_addresses.get_mut(&i).unwrap().observed_address = Some(SocketAddr::new(
8649 IpAddr::V4(Ipv4Addr::new(192, 168, 1, (i + 1) as u8)),
8650 8080,
8651 ));
8652 }
8653
8654 for i in 0..10 {
8656 assert!(state.should_send_observation(i, now));
8657 state.record_observation_sent(i);
8658 }
8659
8660 assert!(!state.should_send_observation(10, now));
8662
8663 state.path_addresses.get_mut(&0).unwrap().notified = false;
8665 assert!(!state.should_send_observation(0, now)); let later = now + Duration::from_secs(1);
8669 for i in 10..20 {
8670 assert!(state.should_send_observation(i, later));
8671 state.record_observation_sent(i);
8672 }
8673 assert!(!state.should_send_observation(20, later));
8675 }
8676
8677 #[test]
8678 fn test_multi_path_address_changes() {
8679 let now = Instant::now();
8681 let config = AddressDiscoveryConfig::SendAndReceive;
8682 let mut state = AddressDiscoveryState::new(&config, now);
8683
8684 let addr1a = SocketAddr::from(([192, 168, 1, 1], 5000));
8685 let addr1b = SocketAddr::from(([192, 168, 1, 2], 5000));
8686 let addr2a = SocketAddr::from(([10, 0, 0, 1], 6000));
8687 let addr2b = SocketAddr::from(([10, 0, 0, 2], 6000));
8688
8689 state.handle_observed_address(addr1a, 0, now);
8691 state.handle_observed_address(addr2a, 1, now);
8692
8693 state.record_observation_sent(0);
8695 state.record_observation_sent(1);
8696 assert!(!state.has_unnotified_changes());
8697
8698 state.handle_observed_address(addr1b, 0, now + Duration::from_secs(1));
8700 assert!(state.has_unnotified_changes());
8701
8702 assert_eq!(state.get_observed_address(0), Some(addr1b));
8704 assert_eq!(state.get_observed_address(1), Some(addr2a));
8705
8706 state.record_observation_sent(0);
8708 assert!(!state.has_unnotified_changes());
8709
8710 state.handle_observed_address(addr2b, 1, now + Duration::from_secs(2));
8712 assert!(state.has_unnotified_changes());
8713 }
8714
8715 #[test]
8716 fn test_multi_path_migration() {
8717 let now = Instant::now();
8719 let config = AddressDiscoveryConfig::SendAndReceive;
8720 let mut state = AddressDiscoveryState::new(&config, now);
8721
8722 let addr_old = SocketAddr::from(([192, 168, 1, 1], 5000));
8723 let addr_new = SocketAddr::from(([10, 0, 0, 1], 6000));
8724
8725 state.handle_observed_address(addr_old, 0, now);
8727 assert_eq!(state.get_observed_address(0), Some(addr_old));
8728
8729 state.handle_observed_address(addr_new, 1, now + Duration::from_secs(1));
8731
8732 assert_eq!(state.get_observed_address(0), Some(addr_old));
8734 assert_eq!(state.get_observed_address(1), Some(addr_new));
8735
8736 assert_eq!(state.path_addresses.len(), 2);
8739 }
8740
8741 #[test]
8742 fn test_check_for_address_observations_multi_path() {
8743 let now = Instant::now();
8745 let config = AddressDiscoveryConfig::SendAndReceive;
8746 let mut state = AddressDiscoveryState::new(&config, now);
8747
8748 state.observe_all_paths = true;
8750
8751 let addr1 = SocketAddr::from(([192, 168, 1, 1], 5000));
8753 let addr2 = SocketAddr::from(([10, 0, 0, 1], 6000));
8754 let addr3 = SocketAddr::from(([172, 16, 0, 1], 7000));
8755
8756 state.handle_observed_address(addr1, 0, now);
8757 state.handle_observed_address(addr2, 1, now);
8758 state.handle_observed_address(addr3, 2, now);
8759
8760 let frames = state.check_for_address_observations(0, true, now);
8762
8763 assert_eq!(frames.len(), 3);
8765
8766 let frame_addrs: Vec<_> = frames.iter().map(|f| f.address).collect();
8768 assert!(frame_addrs.contains(&addr1), "addr1 should be in frames");
8769 assert!(frame_addrs.contains(&addr2), "addr2 should be in frames");
8770 assert!(frame_addrs.contains(&addr3), "addr3 should be in frames");
8771
8772 assert!(!state.has_unnotified_changes());
8774 }
8775
8776 #[test]
8777 fn test_multi_path_with_peer_not_supporting() {
8778 let now = Instant::now();
8780 let config = AddressDiscoveryConfig::SendAndReceive;
8781 let mut state = AddressDiscoveryState::new(&config, now);
8782
8783 state.handle_observed_address(SocketAddr::from(([192, 168, 1, 1], 5000)), 0, now);
8785 state.handle_observed_address(SocketAddr::from(([10, 0, 0, 1], 6000)), 1, now);
8786
8787 let frames = state.check_for_address_observations(0, false, now);
8789 assert_eq!(frames.len(), 0);
8790
8791 assert!(state.has_unnotified_changes());
8793 }
8794
8795 #[test]
8797 fn test_bootstrap_node_aggressive_observation_mode() {
8798 let config = AddressDiscoveryConfig::SendAndReceive;
8800 let now = Instant::now();
8801 let mut state = AddressDiscoveryState::new(&config, now);
8802
8803 assert!(!state.is_bootstrap_mode());
8805
8806 state.set_bootstrap_mode(true);
8808 assert!(state.is_bootstrap_mode());
8809
8810 assert!(state.should_observe_path(0)); assert!(state.should_observe_path(1)); assert!(state.should_observe_path(2));
8814
8815 let bootstrap_rate = state.get_effective_rate_limit();
8817 assert!(bootstrap_rate > 10.0); }
8819
8820 #[test]
8821 fn test_bootstrap_node_immediate_observation() {
8822 let config = AddressDiscoveryConfig::SendAndReceive;
8824 let now = Instant::now();
8825 let mut state = AddressDiscoveryState::new(&config, now);
8826 state.set_bootstrap_mode(true);
8827
8828 let addr = SocketAddr::from(([192, 168, 1, 100], 5000));
8830 state.handle_observed_address(addr, 0, now);
8831
8832 assert!(state.should_send_observation_immediately(true));
8834
8835 assert!(state.should_send_observation(0, now));
8837
8838 let frame = state.queue_observed_address_frame(0, addr);
8840 assert!(frame.is_some());
8841 }
8842
8843 #[test]
8844 fn test_bootstrap_node_multiple_path_observations() {
8845 let config = AddressDiscoveryConfig::SendAndReceive;
8847 let now = Instant::now();
8848 let mut state = AddressDiscoveryState::new(&config, now);
8849 state.set_bootstrap_mode(true);
8850
8851 let addrs = vec![
8853 (0, SocketAddr::from(([192, 168, 1, 1], 5000))),
8854 (1, SocketAddr::from(([10, 0, 0, 1], 6000))),
8855 (2, SocketAddr::from(([172, 16, 0, 1], 7000))),
8856 ];
8857
8858 for (path_id, addr) in &addrs {
8859 state.handle_observed_address(*addr, *path_id, now);
8860 }
8861
8862 let frames = state.check_for_address_observations(0, true, now);
8864 assert_eq!(frames.len(), 3);
8865
8866 for (_, addr) in &addrs {
8868 assert!(frames.iter().any(|f| f.address == *addr));
8869 }
8870 }
8871
8872 #[test]
8873 fn test_bootstrap_node_rate_limit_override() {
8874 let config = AddressDiscoveryConfig::SendAndReceive;
8876 let now = Instant::now();
8877 let mut state = AddressDiscoveryState::new(&config, now);
8878 state.set_bootstrap_mode(true);
8879
8880 let addr = SocketAddr::from(([192, 168, 1, 1], 5000));
8882
8883 for i in 0..10 {
8885 state.handle_observed_address(addr, i, now);
8886 let can_send = state.should_send_observation(i, now);
8887 assert!(can_send, "Bootstrap node should send observation {i}");
8888 state.record_observation_sent(i);
8889 }
8890 }
8891
8892 #[test]
8893 fn test_bootstrap_node_configuration() {
8894 let config = AddressDiscoveryConfig::SendAndReceive;
8896 let mut state = AddressDiscoveryState::new(&config, Instant::now());
8897
8898 state.set_bootstrap_mode(true);
8900
8901 assert!(state.bootstrap_mode);
8903 assert!(state.enabled);
8904
8905 let effective_rate = state.get_effective_rate_limit();
8907 assert!(effective_rate > state.max_observation_rate as f64);
8908 }
8909
8910 #[test]
8911 fn test_bootstrap_node_persistent_observation() {
8912 let config = AddressDiscoveryConfig::SendAndReceive;
8914 let mut now = Instant::now();
8915 let mut state = AddressDiscoveryState::new(&config, now);
8916 state.set_bootstrap_mode(true);
8917
8918 let addr1 = SocketAddr::from(([192, 168, 1, 1], 5000));
8919 let addr2 = SocketAddr::from(([192, 168, 1, 2], 5000));
8920
8921 state.handle_observed_address(addr1, 0, now);
8923 assert!(state.should_send_observation(0, now));
8924 state.record_observation_sent(0);
8925
8926 now += Duration::from_secs(60);
8928 state.handle_observed_address(addr2, 0, now);
8929
8930 assert!(state.should_send_observation(0, now));
8932 }
8933
8934 #[test]
8935 fn test_bootstrap_node_multi_peer_support() {
8936 let config = AddressDiscoveryConfig::SendAndReceive;
8939 let now = Instant::now();
8940 let mut state = AddressDiscoveryState::new(&config, now);
8941 state.set_bootstrap_mode(true);
8942
8943 let peer_addresses = vec![
8945 (0, SocketAddr::from(([192, 168, 1, 1], 5000))), (1, SocketAddr::from(([10, 0, 0, 1], 6000))), (2, SocketAddr::from(([172, 16, 0, 1], 7000))), (3, SocketAddr::from(([192, 168, 2, 1], 8000))), ];
8950
8951 for (path_id, addr) in &peer_addresses {
8953 state.handle_observed_address(*addr, *path_id, now);
8954 }
8955
8956 let frames = state.check_for_address_observations(0, true, now);
8958 assert_eq!(frames.len(), peer_addresses.len());
8959
8960 for (_, addr) in &peer_addresses {
8962 assert!(frames.iter().any(|f| f.address == *addr));
8963 }
8964 }
8965
    // Nested module whose body is the contents of `address_discovery_tests.rs`,
    // textually included from a path relative to this source file.
    mod address_discovery_tests {
        include!("address_discovery_tests.rs");
    }
8970}