1use std::{
2 cmp,
3 collections::VecDeque,
4 convert::TryFrom,
5 fmt, io, mem,
6 net::{IpAddr, SocketAddr},
7 sync::Arc,
8};
9
10use bytes::{Bytes, BytesMut};
11use frame::StreamMetaVec;
12use rand::{Rng, SeedableRng, rngs::StdRng};
15use thiserror::Error;
16use tracing::{debug, error, info, trace, trace_span, warn};
17
18use crate::{
19 Dir, Duration, EndpointConfig, Frame, INITIAL_MTU, Instant, MAX_CID_SIZE, MAX_STREAM_COUNT,
20 MIN_INITIAL_SIZE, MtuDiscoveryConfig, Side, StreamId, TIMER_GRANULARITY, TokenStore, Transmit,
21 TransportError, TransportErrorCode, VarInt,
22 cid_generator::ConnectionIdGenerator,
23 cid_queue::CidQueue,
24 coding::BufMutExt,
25 config::{ServerConfig, TransportConfig},
26 crypto::{self, KeyPair, Keys, PacketKey},
27 endpoint::AddressDiscoveryStats,
28 frame::{self, Close, Datagram, FrameStruct, NewToken},
29 packet::{
30 FixedLengthConnectionIdParser, Header, InitialHeader, InitialPacket, LongType, Packet,
31 PacketNumber, PartialDecode, SpaceId,
32 },
33 range_set::ArrayRangeSet,
34 shared::{
35 ConnectionEvent, ConnectionEventInner, ConnectionId, DatagramConnectionEvent, EcnCodepoint,
36 EndpointEvent, EndpointEventInner,
37 },
38 token::{ResetToken, Token, TokenPayload},
39 transport_parameters::TransportParameters,
40};
41
42mod ack_frequency;
43use ack_frequency::AckFrequencyState;
44
45pub(crate) mod nat_traversal;
46use nat_traversal::NatTraversalState;
47pub(crate) use nat_traversal::{CoordinationPhase, NatTraversalError, NatTraversalRole};
48
49mod assembler;
50pub use assembler::Chunk;
51
52mod cid_state;
53use cid_state::CidState;
54
55mod datagrams;
56use datagrams::DatagramState;
57pub use datagrams::{Datagrams, SendDatagramError};
58
59mod mtud;
60use mtud::MtuDiscovery;
61
62mod pacing;
63
64mod packet_builder;
65use packet_builder::PacketBuilder;
66
67mod packet_crypto;
68use packet_crypto::{PrevCrypto, ZeroRttCrypto};
69
70mod paths;
71pub use paths::RttEstimator;
72use paths::{NatTraversalChallenges, PathData, PathResponses};
73
74mod send_buffer;
75
76mod spaces;
77#[cfg(fuzzing)]
78pub use spaces::Retransmits;
79#[cfg(not(fuzzing))]
80use spaces::Retransmits;
81use spaces::{PacketNumberFilter, PacketSpace, SendableFrames, SentPacket, ThinRetransmits};
82
83mod stats;
84pub use stats::{ConnectionStats, FrameStats, PathStats, UdpStats};
85
86mod streams;
87#[cfg(fuzzing)]
88pub use streams::StreamsState;
89#[cfg(not(fuzzing))]
90use streams::StreamsState;
91pub use streams::{
92 Chunks, ClosedStream, FinishError, ReadError, ReadableError, RecvStream, SendStream,
93 ShouldTransmit, StreamEvent, Streams, WriteError, Written,
94};
95
96mod timer;
97use crate::congestion::Controller;
98use timer::{Timer, TimerTable};
99
/// State for a single connection.
///
/// The type performs no I/O itself: outgoing datagrams are produced by
/// `poll_transmit`, incoming ones are fed in through `handle_event`, and
/// application-visible events are drained with `poll`.
pub struct Connection {
    /// Endpoint-wide configuration shared with the owning endpoint.
    endpoint_config: Arc<EndpointConfig>,
    /// Transport configuration for this connection.
    config: Arc<TransportConfig>,
    /// Per-connection RNG, seeded from the `rng_seed` passed to `new`.
    rng: StdRng,
    /// Cryptographic session; also supplies the Initial keys in `new`.
    crypto: Box<dyn crypto::Session>,
    /// Initialised from the `loc_cid` argument of `new`.
    handshake_cid: ConnectionId,
    /// Initialised from the `rem_cid` argument of `new`.
    rem_handshake_cid: ConnectionId,
    /// Local IP, if known; copied into `Transmit::src_ip` for every transmit.
    local_ip: Option<IpAddr>,
    /// The path currently in use; `path.remote` is the destination of regular transmits.
    path: PathData,
    /// Whether MTU discovery is allowed; forwarded to `PathData::new`.
    allow_mtud: bool,
    /// A previous path (and the CID to use on it) that may still need a PATH_CHALLENGE;
    /// see `send_path_challenge`.
    prev_path: Option<(ConnectionId, PathData)>,
    /// Connection state machine state (handshake, closed, draining, drained, ...).
    state: State,
    /// Side-specific (client/server) state.
    side: ConnectionSide,
    /// Starts out `false`.
    // NOTE(review): presumably records whether 0-RTT was enabled for the handshake — confirm.
    zero_rtt_enabled: bool,
    /// 0-RTT keys, when available; used for Data-space packets while no 1-RTT keys exist.
    zero_rtt_crypto: Option<ZeroRttCrypto>,
    /// Starts out `false`.
    // NOTE(review): presumably the current key phase bit — confirm.
    key_phase: bool,
    /// Drawn uniformly from `10..1000` at construction.
    // NOTE(review): presumably a packet budget for the current key phase — confirm.
    key_phase_size: u64,
    /// Transport parameters of the peer; defaults until the real ones are known.
    peer_params: TransportParameters,
    /// Initialised from the `rem_cid` argument of `new`.
    orig_rem_cid: ConnectionId,
    /// Initialised from the `init_cid` argument of `new`.
    initial_dst_cid: ConnectionId,
    /// Starts out `None`.
    // NOTE(review): presumably the source CID of a received Retry packet — confirm.
    retry_src_cid: Option<ConnectionId>,
    /// Counter starting at zero.
    // NOTE(review): presumably counts packets declared lost — confirm.
    lost_packets: u64,
    /// Application-facing events, drained front-first by `poll`.
    events: VecDeque<Event>,
    /// Endpoint-facing events, drained front-first by `poll_endpoint_events`.
    endpoint_events: VecDeque<EndpointEventInner>,
    /// Enabled only if `config.allow_spin` is set and a random 7-in-8 draw succeeds.
    spin_enabled: bool,
    /// Starts out `false`.
    // NOTE(review): presumably the outgoing spin bit value — confirm.
    spin: bool,
    /// Packet number spaces, indexed by `SpaceId` (Initial, Handshake, Data).
    spaces: [PacketSpace; 3],
    /// Starts at `SpaceId::Initial`; `poll_transmit` compares it against the space being sent in.
    highest_space: SpaceId,
    /// Starts out `None`.
    // NOTE(review): presumably the keys in use before the latest key update — confirm.
    prev_crypto: Option<PrevCrypto>,
    /// Starts out `None`.
    // NOTE(review): presumably pre-generated keys for the next key update — confirm.
    next_crypto: Option<KeyPair<Box<dyn PacketKey>>>,
    /// Starts out `false`.
    // NOTE(review): presumably set once the peer accepts 0-RTT — confirm.
    accepted_0rtt: bool,
    /// Starts out `true`.
    // NOTE(review): presumably gates resetting the idle timer on transmit — confirm.
    permit_idle_reset: bool,
    /// Derived from `config.max_idle_timeout` (milliseconds); `None` when unset or zero.
    idle_timeout: Option<Duration>,
    /// Pending timers; its earliest deadline is reported by `poll_timeout`.
    timers: TimerTable,
    /// Counter starting at zero.
    // NOTE(review): presumably counts packets that failed authentication — confirm.
    authentication_failures: u64,
    /// Pending connection error; taken by `poll` and surfaced as `Event::ConnectionLost`.
    error: Option<ConnectionError>,
    /// Consulted via `peek` for the next Data-space packet number; disabled in tests
    /// when `config.deterministic_packet_numbers` is set.
    packet_number_filter: PacketNumberFilter,

    /// Queued PATH_RESPONSEs; off-path ones are sent in their own datagram by `poll_transmit`.
    path_responses: PathResponses,
    /// NAT traversal challenge bookkeeping.
    nat_traversal_challenges: NatTraversalChallenges,
    /// Whether a CONNECTION_CLOSE still has to be sent; cleared by `poll_transmit` once
    /// the close has been written in the highest space.
    close: bool,

    /// ACK frequency extension state.
    ack_frequency: AckFrequencyState,

    /// Counter starting at zero.
    // NOTE(review): presumably the number of consecutive PTOs — confirm.
    pto_count: u32,

    /// Passed to `populate_acks` when writing ACK frames.
    // NOTE(review): presumably tracks whether received packets carry ECN marks — confirm.
    receiving_ecn: bool,
    /// Counter starting at zero.
    // NOTE(review): presumably counts authenticated packets — confirm.
    total_authed_packets: u64,
    /// Set by `poll_transmit`: `true` when nothing was written and the reason was not
    /// congestion control or pacing.
    app_limited: bool,

    /// Stream state; exposed through `streams`, `recv_stream` and `send_stream`.
    streams: StreamsState,
    /// Remote connection IDs; `rem_cids.active()` is the destination CID of outgoing packets.
    rem_cids: CidQueue,
    /// State of locally issued connection IDs.
    local_cid_state: CidState,
    /// Datagram extension state.
    datagrams: DatagramState,
    /// Connection statistics (frame, UDP and path counters).
    stats: ConnectionStats,
    /// Version value supplied to `new`.
    version: u32,

    /// NAT traversal state; `None` until NAT traversal is set up.
    nat_traversal: Option<NatTraversalState>,

    /// NAT traversal frame configuration; starts at its `Default`.
    nat_traversal_frame_config: frame::nat_traversal_unified::NatTraversalFrameConfig,

    /// Address discovery state; created in `new` from the default address discovery config.
    address_discovery_state: Option<AddressDiscoveryState>,

    /// Post-quantum crypto state; supplies the minimum padded size used by the send paths.
    pqc_state: PqcState,

    /// Trace context identifying this connection in emitted trace events.
    #[cfg(feature = "trace")]
    trace_context: crate::tracing::TraceContext,

    /// Event log that trace events are written to.
    #[cfg(feature = "trace")]
    event_log: Arc<crate::tracing::EventLog>,

    /// Optional qlog output sink installed by `set_qlog`.
    #[cfg(feature = "__qlog")]
    qlog_streamer: Option<Box<dyn std::io::Write + Send + Sync>>,
}
272
273impl Connection {
274 pub(crate) fn new(
275 endpoint_config: Arc<EndpointConfig>,
276 config: Arc<TransportConfig>,
277 init_cid: ConnectionId,
278 loc_cid: ConnectionId,
279 rem_cid: ConnectionId,
280 remote: SocketAddr,
281 local_ip: Option<IpAddr>,
282 crypto: Box<dyn crypto::Session>,
283 cid_gen: &dyn ConnectionIdGenerator,
284 now: Instant,
285 version: u32,
286 allow_mtud: bool,
287 rng_seed: [u8; 32],
288 side_args: SideArgs,
289 ) -> Self {
290 let pref_addr_cid = side_args.pref_addr_cid();
291 let path_validated = side_args.path_validated();
292 let connection_side = ConnectionSide::from(side_args);
293 let side = connection_side.side();
294 let initial_space = PacketSpace {
295 crypto: Some(crypto.initial_keys(&init_cid, side)),
296 ..PacketSpace::new(now)
297 };
298 let state = State::Handshake(state::Handshake {
299 rem_cid_set: side.is_server(),
300 expected_token: Bytes::new(),
301 client_hello: None,
302 });
303 let mut rng = StdRng::from_seed(rng_seed);
304 let mut this = Self {
305 endpoint_config,
306 crypto,
307 handshake_cid: loc_cid,
308 rem_handshake_cid: rem_cid,
309 local_cid_state: CidState::new(
310 cid_gen.cid_len(),
311 cid_gen.cid_lifetime(),
312 now,
313 if pref_addr_cid.is_some() { 2 } else { 1 },
314 ),
315 path: PathData::new(remote, allow_mtud, None, now, &config),
316 allow_mtud,
317 local_ip,
318 prev_path: None,
319 state,
320 side: connection_side,
321 zero_rtt_enabled: false,
322 zero_rtt_crypto: None,
323 key_phase: false,
324 key_phase_size: rng.gen_range(10..1000),
331 peer_params: TransportParameters::default(),
332 orig_rem_cid: rem_cid,
333 initial_dst_cid: init_cid,
334 retry_src_cid: None,
335 lost_packets: 0,
336 events: VecDeque::new(),
337 endpoint_events: VecDeque::new(),
338 spin_enabled: config.allow_spin && rng.gen_ratio(7, 8),
339 spin: false,
340 spaces: [initial_space, PacketSpace::new(now), PacketSpace::new(now)],
341 highest_space: SpaceId::Initial,
342 prev_crypto: None,
343 next_crypto: None,
344 accepted_0rtt: false,
345 permit_idle_reset: true,
346 idle_timeout: match config.max_idle_timeout {
347 None | Some(VarInt(0)) => None,
348 Some(dur) => Some(Duration::from_millis(dur.0)),
349 },
350 timers: TimerTable::default(),
351 authentication_failures: 0,
352 error: None,
353 #[cfg(test)]
354 packet_number_filter: match config.deterministic_packet_numbers {
355 false => PacketNumberFilter::new(&mut rng),
356 true => PacketNumberFilter::disabled(),
357 },
358 #[cfg(not(test))]
359 packet_number_filter: PacketNumberFilter::new(&mut rng),
360
361 path_responses: PathResponses::default(),
362 nat_traversal_challenges: NatTraversalChallenges::default(),
363 close: false,
364
365 ack_frequency: AckFrequencyState::new(get_max_ack_delay(
366 &TransportParameters::default(),
367 )),
368
369 pto_count: 0,
370
371 app_limited: false,
372 receiving_ecn: false,
373 total_authed_packets: 0,
374
375 streams: StreamsState::new(
376 side,
377 config.max_concurrent_uni_streams,
378 config.max_concurrent_bidi_streams,
379 config.send_window,
380 config.receive_window,
381 config.stream_receive_window,
382 ),
383 datagrams: DatagramState::default(),
384 config,
385 rem_cids: CidQueue::new(rem_cid),
386 rng,
387 stats: ConnectionStats::default(),
388 version,
389 nat_traversal: None, nat_traversal_frame_config:
391 frame::nat_traversal_unified::NatTraversalFrameConfig::default(),
392 address_discovery_state: {
393 Some(AddressDiscoveryState::new(
396 &crate::transport_parameters::AddressDiscoveryConfig::default(),
397 now,
398 ))
399 },
400 pqc_state: PqcState::new(),
401
402 #[cfg(feature = "trace")]
403 trace_context: crate::tracing::TraceContext::new(crate::tracing::TraceId::new()),
404
405 #[cfg(feature = "trace")]
406 event_log: crate::tracing::global_log(),
407
408 #[cfg(feature = "__qlog")]
409 qlog_streamer: None,
410 };
411
412 #[cfg(feature = "trace")]
414 {
415 use crate::trace_event;
416 use crate::tracing::{Event, EventData, socket_addr_to_bytes, timestamp_now};
417 let _peer_id = {
419 let mut id = [0u8; 32];
420 let addr_bytes = match remote {
421 SocketAddr::V4(addr) => addr.ip().octets().to_vec(),
422 SocketAddr::V6(addr) => addr.ip().octets().to_vec(),
423 };
424 id[..addr_bytes.len().min(32)]
425 .copy_from_slice(&addr_bytes[..addr_bytes.len().min(32)]);
426 id
427 };
428
429 let (addr_bytes, addr_type) = socket_addr_to_bytes(remote);
430 trace_event!(
431 &this.event_log,
432 Event {
433 timestamp: timestamp_now(),
434 trace_id: this.trace_context.trace_id(),
435 sequence: 0,
436 _padding: 0,
437 node_id: [0u8; 32], event_data: EventData::ConnInit {
439 endpoint_bytes: addr_bytes,
440 addr_type,
441 _padding: [0u8; 45],
442 },
443 }
444 );
445 }
446
447 if path_validated {
448 this.on_path_validated();
449 }
450 if side.is_client() {
451 this.write_crypto();
453 this.init_0rtt();
454 }
455 this
456 }
457
458 #[cfg(feature = "__qlog")]
460 pub fn set_qlog(
461 &mut self,
462 writer: Box<dyn std::io::Write + Send + Sync>,
463 _title: Option<String>,
464 _description: Option<String>,
465 _now: Instant,
466 ) {
467 self.qlog_streamer = Some(writer);
468 }
469
    /// Hook for emitting qlog recovery metrics.
    ///
    /// The body is currently empty, so calling it has no effect; `_now` is unused.
    #[cfg(feature = "__qlog")]
    fn emit_qlog_recovery_metrics(&mut self, _now: Instant) {
    }
476
477 #[must_use]
485 pub fn poll_timeout(&mut self) -> Option<Instant> {
486 let mut next_timeout = self.timers.next_timeout();
487
488 if let Some(nat_state) = &self.nat_traversal {
490 if let Some(nat_timeout) = nat_state.get_next_timeout(Instant::now()) {
491 self.timers.set(Timer::NatTraversal, nat_timeout);
493 next_timeout = Some(next_timeout.map_or(nat_timeout, |t| t.min(nat_timeout)));
494 }
495 }
496
497 next_timeout
498 }
499
500 #[must_use]
506 pub fn poll(&mut self) -> Option<Event> {
507 if let Some(x) = self.events.pop_front() {
508 return Some(x);
509 }
510
511 if let Some(event) = self.streams.poll() {
512 return Some(Event::Stream(event));
513 }
514
515 if let Some(err) = self.error.take() {
516 return Some(Event::ConnectionLost { reason: err });
517 }
518
519 None
520 }
521
522 #[must_use]
524 pub fn poll_endpoint_events(&mut self) -> Option<EndpointEvent> {
525 self.endpoint_events.pop_front().map(EndpointEvent)
526 }
527
528 #[must_use]
530 pub fn streams(&mut self) -> Streams<'_> {
531 Streams {
532 state: &mut self.streams,
533 conn_state: &self.state,
534 }
535 }
536
537 #[cfg(feature = "trace")]
539 pub(crate) fn trace_context(&self) -> &crate::tracing::TraceContext {
540 &self.trace_context
541 }
542
543 #[cfg(feature = "trace")]
545 pub(crate) fn event_log(&self) -> &Arc<crate::tracing::EventLog> {
546 &self.event_log
547 }
548
549 #[must_use]
551 pub fn recv_stream(&mut self, id: StreamId) -> RecvStream<'_> {
552 assert!(id.dir() == Dir::Bi || id.initiator() != self.side.side());
553 RecvStream {
554 id,
555 state: &mut self.streams,
556 pending: &mut self.spaces[SpaceId::Data].pending,
557 }
558 }
559
560 #[must_use]
562 pub fn send_stream(&mut self, id: StreamId) -> SendStream<'_> {
563 assert!(id.dir() == Dir::Bi || id.initiator() == self.side.side());
564 SendStream {
565 id,
566 state: &mut self.streams,
567 pending: &mut self.spaces[SpaceId::Data].pending,
568 conn_state: &self.state,
569 }
570 }
571
    /// Writes the next batch of outgoing datagrams into `buf` and returns the
    /// matching [`Transmit`], or `None` if there is nothing to send right now.
    ///
    /// * `now` is the current time, used for pacing, timers and packet tracking.
    /// * `max_datagrams` is the maximum number of equally sized datagrams that may be
    ///   written back-to-back into `buf`; it is forced to 1 when segmentation offload
    ///   is disabled in the transport config.
    /// * `buf` receives the encoded packets; the returned `Transmit::size` is `buf.len()`.
    ///
    /// Dedicated single-datagram transmits are tried first, in this order: NAT
    /// traversal challenges, a PATH_CHALLENGE on the previous path, and (inside the
    /// main loop) an off-path PATH_RESPONSE. Otherwise packets from the Initial,
    /// Handshake and Data spaces are coalesced into datagrams, subject to
    /// anti-amplification, congestion control and pacing. If nothing was written and
    /// the connection is established, an MTU discovery probe may be sent instead.
    ///
    /// # Panics
    ///
    /// Panics if `max_datagrams` is zero.
    #[must_use]
    pub fn poll_transmit(
        &mut self,
        now: Instant,
        max_datagrams: usize,
        buf: &mut Vec<u8>,
    ) -> Option<Transmit> {
        assert!(max_datagrams != 0);
        // Without segmentation offload only a single datagram may be produced per call.
        let max_datagrams = match self.config.enable_segmentation_offload {
            false => 1,
            true => max_datagrams,
        };

        let mut num_datagrams = 0;
        // Offset in `buf` at which the datagram currently being built starts.
        let mut datagram_start = 0;
        // Size of each datagram in the batch; shrunk to the first datagram's actual size
        // once that datagram has been finished.
        let mut segment_size = usize::from(self.path.current_mtu());

        // Let NAT traversal notice an expired coordination round.
        if let Some(nat_traversal) = &mut self.nat_traversal {
            if nat_traversal.check_coordination_timeout(now) {
                trace!("NAT traversal coordination timed out, may retry");
            }
        }

        // NAT traversal challenges go out in a datagram of their own.
        if let Some(challenge) = self.send_nat_traversal_challenge(now, buf) {
            return Some(challenge);
        }

        // So does a pending PATH_CHALLENGE for the previous path.
        if let Some(challenge) = self.send_path_challenge(now, buf) {
            return Some(challenge);
        }

        // Give every space the chance to queue probe data; an immediate ACK is requested
        // only in the Data space and only if the peer supports the ACK frequency extension.
        for space in SpaceId::iter() {
            let request_immediate_ack =
                space == SpaceId::Data && self.peer_supports_ack_frequency();
            self.spaces[space].maybe_queue_probe(request_immediate_ack, &self.streams);
        }

        // Determine whether this call is sending a CONNECTION_CLOSE.
        let close = match self.state {
            State::Drained => {
                self.app_limited = true;
                return None;
            }
            State::Draining | State::Closed(_) => {
                // `self.close` is only set while a close frame still has to be written.
                if !self.close {
                    self.app_limited = true;
                    return None;
                }
                true
            }
            _ => false,
        };

        // Decide whether an ACK_FREQUENCY frame should be queued in the Data space.
        if let Some(config) = &self.config.ack_frequency_config {
            self.spaces[SpaceId::Data].pending.ack_frequency = self
                .ack_frequency
                .should_send_ack_frequency(self.path.rtt.get(), config, &self.peer_params)
                && self.highest_space == SpaceId::Data
                && self.peer_supports_ack_frequency();
        }

        // Number of bytes of `buf` that may be used so far; grows by one datagram size
        // limit every time a new datagram is started.
        let mut buf_capacity = 0;

        // Whether further packets may be appended to the current datagram.
        let mut coalesce = true;
        // The packet currently being built; finished lazily when the next one starts.
        let mut builder_storage: Option<PacketBuilder> = None;
        // Frames written into the packet held by `builder_storage`.
        let mut sent_frames = None;
        let mut pad_datagram = false;
        let mut pad_datagram_to_mtu = false;
        let mut congestion_blocked = false;

        // `space_idx` only advances when a space has nothing (more) to send or is blocked,
        // so the same space may be visited repeatedly to fill several packets.
        let mut space_idx = 0;
        let spaces = [SpaceId::Initial, SpaceId::Handshake, SpaceId::Data];
        while space_idx < spaces.len() {
            let space_id = spaces[space_idx];
            // Frame space available in a 1-RTT packet with the next Data-space packet number.
            let pn = self.packet_number_filter.peek(&self.spaces[SpaceId::Data]);
            let frame_space_1rtt =
                segment_size.saturating_sub(self.predict_1rtt_overhead(Some(pn)));

            let can_send = self.space_can_send(space_id, frame_space_1rtt);
            // Skip spaces with nothing to send, unless a close has to be written in a
            // space that has keys.
            if can_send.is_empty() && (!close || self.spaces[space_id].crypto.is_none()) {
                space_idx += 1;
                continue;
            }

            let mut ack_eliciting = !self.spaces[space_id].pending.is_empty(&self.streams)
                || self.spaces[space_id].ping_pending
                || self.spaces[space_id].immediate_ack_pending;
            if space_id == SpaceId::Data {
                ack_eliciting |= self.can_send_1rtt(frame_space_1rtt);
            }

            pad_datagram_to_mtu |= space_id == SpaceId::Data && self.config.pad_to_mtu;

            // Where the buffer will end once the packet under construction is finished:
            // its minimum size may exceed what has been written, and the tag is appended.
            let buf_end = if let Some(builder) = &builder_storage {
                buf.len().max(builder.min_size) + builder.tag_len
            } else {
                buf.len()
            };

            // Tag length of the keys that will protect a packet in this space.
            let tag_len = if let Some(ref crypto) = self.spaces[space_id].crypto {
                crypto.packet.local.tag_len()
            } else if space_id == SpaceId::Data {
                match self.zero_rtt_crypto.as_ref() {
                    Some(crypto) => crypto.packet.tag_len(),
                    None => {
                        error!(
                            "sending packets in the application data space requires known 0-RTT or 1-RTT keys"
                        );
                        return None;
                    }
                }
            } else {
                unreachable!("tried to send {:?} packet without keys", space_id)
            };
            if !coalesce || buf_capacity - buf_end < MIN_PACKET_SPACE + tag_len {
                // The current datagram cannot take another packet: start a new datagram.

                if num_datagrams >= max_datagrams {
                    // No room for another datagram in this batch.
                    break;
                }

                // Check the anti-amplification limit against everything sent so far in
                // this batch plus at least one more byte.
                if self
                    .path
                    .anti_amplification_blocked(segment_size as u64 * (num_datagrams as u64) + 1)
                {
                    trace!("blocked by anti-amplification");
                    break;
                }

                // Congestion control and pacing only apply to ack-eliciting packets that
                // are not loss probes.
                if ack_eliciting && self.spaces[space_id].loss_probes == 0 {
                    // Bytes of the still unfinished packet, which are not yet counted as
                    // in flight.
                    let untracked_bytes = if let Some(builder) = &builder_storage {
                        buf_capacity - builder.partial_encode.start
                    } else {
                        0
                    } as u64;
                    debug_assert!(untracked_bytes <= segment_size as u64);

                    let bytes_to_send = segment_size as u64 + untracked_bytes;
                    if self.path.in_flight.bytes + bytes_to_send >= self.path.congestion.window() {
                        // Move on: a later space may still have non-ack-eliciting data.
                        space_idx += 1;
                        congestion_blocked = true;
                        trace!("blocked by congestion control");
                        continue;
                    }

                    let smoothed_rtt = self.path.rtt.get();
                    if let Some(delay) = self.path.pacing.delay(
                        smoothed_rtt,
                        bytes_to_send,
                        self.path.current_mtu(),
                        self.path.congestion.window(),
                        now,
                    ) {
                        // Arm the pacing timer with the value returned by the pacer and
                        // stop building datagrams.
                        self.timers.set(Timer::Pacing, delay);
                        congestion_blocked = true;
                        trace!("blocked by pacing");
                        break;
                    }
                }

                // Finish the packet that closes the previous datagram.
                if let Some(mut builder) = builder_storage.take() {
                    if pad_datagram {
                        let min_size = self.pqc_state.min_initial_size();
                        builder.pad_to(min_size);
                    }

                    if num_datagrams > 1 || pad_datagram_to_mtu {
                        // Datagrams in a batch must all have `segment_size` bytes, so the
                        // previous one is padded up, unless that would take more than
                        // `MAX_PADDING` bytes (and MTU padding was not requested) or the
                        // datagram's size limit is below `segment_size`; in those cases the
                        // batch ends here.
                        const MAX_PADDING: usize = 16;
                        let packet_len_unpadded = cmp::max(builder.min_size, buf.len())
                            - datagram_start
                            + builder.tag_len;
                        if (packet_len_unpadded + MAX_PADDING < segment_size
                            && !pad_datagram_to_mtu)
                            || datagram_start + segment_size > buf_capacity
                        {
                            trace!(
                                "GSO truncated by demand for {} padding bytes or loss probe",
                                segment_size - packet_len_unpadded
                            );
                            // Hand the builder back so it is finished after the loop.
                            builder_storage = Some(builder);
                            break;
                        }

                        builder.pad_to(segment_size as u16);
                    }

                    builder.finish_and_track(now, self, sent_frames.take(), buf);

                    if num_datagrams == 1 {
                        // The first datagram's final length becomes the segment size for
                        // the rest of the batch.
                        segment_size = buf.len();
                        buf_capacity = buf.len();

                        if space_id == SpaceId::Data {
                            // Re-check with the possibly smaller segment size whether there
                            // is still something that fits.
                            let frame_space_1rtt =
                                segment_size.saturating_sub(self.predict_1rtt_overhead(Some(pn)));
                            if self.space_can_send(space_id, frame_space_1rtt).is_empty() {
                                break;
                            }
                        }
                    }
                }

                // Allocate space for the next datagram; a loss probe is consumed here and
                // limits the datagram to at most `INITIAL_MTU` bytes.
                let next_datagram_size_limit = match self.spaces[space_id].loss_probes {
                    0 => segment_size,
                    _ => {
                        self.spaces[space_id].loss_probes -= 1;
                        std::cmp::min(segment_size, usize::from(INITIAL_MTU))
                    }
                };
                buf_capacity += next_datagram_size_limit;
                if buf.capacity() < buf_capacity {
                    // Reserve room for the whole batch at once rather than growing the
                    // buffer datagram by datagram.
                    buf.reserve(max_datagrams * segment_size);
                }
                num_datagrams += 1;
                coalesce = true;
                pad_datagram = false;
                datagram_start = buf.len();

                debug_assert_eq!(
                    datagram_start % segment_size,
                    0,
                    "datagrams in a GSO batch must be aligned to the segment size"
                );
            } else {
                // The next packet is coalesced into the current datagram; finish the
                // previous packet first.
                if let Some(builder) = builder_storage.take() {
                    builder.finish_and_track(now, self, sent_frames.take(), buf);
                }
            }

            debug_assert!(buf_capacity - buf.len() >= MIN_PACKET_SPACE);

            // A client about to send a Handshake packet discards the Initial space if it
            // still holds keys.
            if self.spaces[SpaceId::Initial].crypto.is_some()
                && space_id == SpaceId::Handshake
                && self.side.is_client()
            {
                self.discard_space(now, SpaceId::Initial);
            }
            if let Some(ref mut prev) = self.prev_crypto {
                prev.update_unacked = false;
            }

            debug_assert!(
                builder_storage.is_none() && sent_frames.is_none(),
                "Previous packet must have been finished"
            );

            // Start the next packet; `?` aborts the whole call if no builder can be created.
            let builder = builder_storage.insert(PacketBuilder::new(
                now,
                space_id,
                self.rem_cids.active(),
                buf,
                buf_capacity,
                datagram_start,
                ack_eliciting,
                self,
            )?);
            // Nothing is coalesced after a short-header packet.
            coalesce = coalesce && !builder.short_header;

            let should_adjust_coalescing = self
                .pqc_state
                .should_adjust_coalescing(buf.len() - datagram_start, space_id);

            if should_adjust_coalescing {
                coalesce = false;
                trace!("Disabling coalescing for PQC handshake in {:?}", space_id);
            }

            // Datagrams carrying an Initial packet are padded when sent by a client or
            // when the packet is ack-eliciting.
            pad_datagram |=
                space_id == SpaceId::Initial && (self.side.is_client() || ack_eliciting);

            if close {
                trace!("sending CONNECTION_CLOSE");
                // Include pending ACKs alongside the close; the `SentFrames` record is
                // discarded, so these ACKs are not tracked.
                if !self.spaces[space_id].pending_acks.ranges().is_empty() {
                    Self::populate_acks(
                        now,
                        self.receiving_ecn,
                        &mut SentFrames::default(),
                        &mut self.spaces[space_id],
                        buf,
                        &mut self.stats,
                    );
                }

                debug_assert!(
                    buf.len() + frame::ConnectionClose::SIZE_BOUND < builder.max_size,
                    "ACKs should leave space for ConnectionClose"
                );
                if buf.len() + frame::ConnectionClose::SIZE_BOUND < builder.max_size {
                    let max_frame_size = builder.max_size - buf.len();
                    match self.state {
                        State::Closed(state::Closed { ref reason }) => {
                            // The real reason is only written in the Data space or when it
                            // is a transport-layer error; otherwise a generic
                            // APPLICATION_ERROR close without a reason phrase is sent.
                            if space_id == SpaceId::Data || reason.is_transport_layer() {
                                reason.encode(buf, max_frame_size)
                            } else {
                                frame::ConnectionClose {
                                    error_code: TransportErrorCode::APPLICATION_ERROR,
                                    frame_type: None,
                                    reason: Bytes::new(),
                                }
                                .encode(buf, max_frame_size)
                            }
                        }
                        State::Draining => frame::ConnectionClose {
                            error_code: TransportErrorCode::NO_ERROR,
                            frame_type: None,
                            reason: Bytes::new(),
                        }
                        .encode(buf, max_frame_size),
                        _ => unreachable!(
                            "tried to make a close packet when the connection wasn't closed"
                        ),
                    }
                }
                if space_id == self.highest_space {
                    // The close has been written in the highest space; nothing more to send.
                    self.close = false;
                    break;
                } else {
                    // Also write a close in the next space.
                    space_idx += 1;
                    continue;
                }
            }

            // An off-path PATH_RESPONSE is sent alone in the first datagram, to the
            // address the challenge came from rather than to the current path.
            if space_id == SpaceId::Data && num_datagrams == 1 {
                if let Some((token, remote)) = self.path_responses.pop_off_path(self.path.remote) {
                    let mut builder = builder_storage.take().unwrap();
                    trace!("PATH_RESPONSE {:08x} (off-path)", token);
                    buf.write(frame::FrameType::PATH_RESPONSE);
                    buf.write(token);
                    self.stats.frame_tx.path_response += 1;
                    let min_size = self.pqc_state.min_initial_size();
                    builder.pad_to(min_size);
                    builder.finish_and_track(
                        now,
                        self,
                        Some(SentFrames {
                            non_retransmits: true,
                            ..SentFrames::default()
                        }),
                        buf,
                    );
                    self.stats.udp_tx.on_sent(1, buf.len());

                    #[cfg(feature = "trace")]
                    {
                        use crate::trace_packet_sent;
                        trace_packet_sent!(
                            &self.event_log,
                            self.trace_context.trace_id(),
                            buf.len() as u32,
                            0
                        );
                    }

                    return Some(Transmit {
                        destination: remote,
                        size: buf.len(),
                        ecn: None,
                        segment_size: None,
                        src_ip: self.local_ip,
                    });
                }
            }

            // Queue any address observation frames that are due before filling the packet.
            if space_id == SpaceId::Data && self.address_discovery_state.is_some() {
                let peer_supports = self.peer_params.address_discovery.is_some();

                if let Some(state) = &mut self.address_discovery_state {
                    let frames = state.check_for_address_observations(0, peer_supports, now);
                    self.spaces[space_id]
                        .pending
                        .observed_addresses
                        .extend(frames);
                }
            }

            // Fill the packet with frames.
            let sent =
                self.populate_packet(now, space_id, buf, builder.max_size, builder.exact_number);

            // Sanity check: if `space_can_send` reported non-ACK data for a full-size
            // datagram, the packet should not have ended up ACK-only.
            debug_assert!(
                !(sent.is_ack_only(&self.streams)
                    && !can_send.acks
                    && can_send.other
                    && (buf_capacity - builder.datagram_start) == self.path.current_mtu() as usize
                    && self.datagrams.outgoing.is_empty()),
                "SendableFrames was {can_send:?}, but only ACKs have been written"
            );
            pad_datagram |= sent.requires_padding;

            // An ACK was written: mark pending ACKs as sent and stop the max-ack-delay timer.
            if sent.largest_acked.is_some() {
                self.spaces[space_id].pending_acks.acks_sent();
                self.timers.stop(Timer::MaxAckDelay);
            }

            // Keep the frame record; the packet is finished when the next one starts or
            // after the loop.
            sent_frames = Some(sent);

        }

        // Finish the last packet.
        if let Some(mut builder) = builder_storage {
            if pad_datagram {
                let min_size = self.pqc_state.min_initial_size();
                builder.pad_to(min_size);
            }

            // Only pad to the segment size if the datagram's size limit allows a full segment.
            if pad_datagram_to_mtu && buf_capacity >= datagram_start + segment_size {
                builder.pad_to(segment_size as u16);
            }

            let last_packet_number = builder.exact_number;
            builder.finish_and_track(now, self, sent_frames, buf);
            self.path
                .congestion
                .on_sent(now, buf.len() as u64, last_packet_number);

            #[cfg(feature = "__qlog")]
            self.emit_qlog_recovery_metrics(now);
        }

        self.app_limited = buf.is_empty() && !congestion_blocked;

        // Nothing was written: use the opportunity to send an MTU discovery probe.
        if buf.is_empty() && self.state.is_established() {
            let space_id = SpaceId::Data;
            // `?` returns `None` when no probe is due.
            let probe_size = self
                .path
                .mtud
                .poll_transmit(now, self.packet_number_filter.peek(&self.spaces[space_id]))?;

            let buf_capacity = probe_size as usize;
            buf.reserve(buf_capacity);

            let mut builder = PacketBuilder::new(
                now,
                space_id,
                self.rem_cids.active(),
                buf,
                buf_capacity,
                0,
                true,
                self,
            )?;

            // The probe is a PING padded up to the probe size.
            buf.write(frame::FrameType::PING);
            self.stats.frame_tx.ping += 1;

            // Also ask for an immediate ACK when the peer supports the ACK frequency extension.
            if self.peer_supports_ack_frequency() {
                buf.write(frame::FrameType::IMMEDIATE_ACK);
                self.stats.frame_tx.immediate_ack += 1;
            }

            builder.pad_to(probe_size);
            let sent_frames = SentFrames {
                non_retransmits: true,
                ..Default::default()
            };
            builder.finish_and_track(now, self, Some(sent_frames), buf);

            self.stats.path.sent_plpmtud_probes += 1;
            num_datagrams = 1;

            trace!(?probe_size, "writing MTUD probe");
        }

        if buf.is_empty() {
            return None;
        }

        trace!("sending {} bytes in {} datagrams", buf.len(), num_datagrams);
        self.path.total_sent = self.path.total_sent.saturating_add(buf.len() as u64);

        self.stats.udp_tx.on_sent(num_datagrams as u64, buf.len());

        #[cfg(feature = "trace")]
        {
            use crate::trace_packet_sent;
            // Report the Data-space packet number preceding the next one to be assigned.
            let packet_num = self.spaces[SpaceId::Data]
                .next_packet_number
                .saturating_sub(1);
            trace_packet_sent!(
                &self.event_log,
                self.trace_context.trace_id(),
                buf.len() as u32,
                packet_num
            );
        }

        Some(Transmit {
            destination: self.path.remote,
            size: buf.len(),
            ecn: if self.path.sending_ecn {
                Some(EcnCodepoint::Ect0)
            } else {
                None
            },
            // A segment size is only reported for batches of more than one datagram.
            segment_size: match num_datagrams {
                1 => None,
                _ => Some(segment_size),
            },
            src_ip: self.local_ip,
        })
    }
1213
1214 fn send_coordination_request(&mut self, _now: Instant, _buf: &mut Vec<u8>) -> Option<Transmit> {
1216 let nat = self.nat_traversal.as_mut()?;
1218 if !nat.should_send_punch_request() {
1219 return None;
1220 }
1221
1222 let coord = nat.coordination.as_ref()?;
1223 let round = coord.round;
1224 if coord.punch_targets.is_empty() {
1225 return None;
1226 }
1227
1228 trace!(
1229 "queuing PUNCH_ME_NOW round {} with {} targets",
1230 round,
1231 coord.punch_targets.len()
1232 );
1233
1234 for target in &coord.punch_targets {
1236 let punch = frame::PunchMeNow {
1237 round,
1238 paired_with_sequence_number: target.remote_sequence,
1239 address: target.remote_addr,
1240 target_peer_id: None,
1241 };
1242 self.spaces[SpaceId::Data].pending.punch_me_now.push(punch);
1243 }
1244
1245 nat.mark_punch_request_sent();
1247
1248 None
1250 }
1251
1252 fn send_coordinated_path_challenge(
1254 &mut self,
1255 now: Instant,
1256 buf: &mut Vec<u8>,
1257 ) -> Option<Transmit> {
1258 if let Some(nat_traversal) = &mut self.nat_traversal {
1260 if nat_traversal.should_start_punching(now) {
1261 nat_traversal.start_punching_phase(now);
1262 }
1263 }
1264
1265 let (target_addr, challenge) = {
1267 let nat_traversal = self.nat_traversal.as_ref()?;
1268 match nat_traversal.get_coordination_phase() {
1269 Some(CoordinationPhase::Punching) => {
1270 let targets = nat_traversal.get_punch_targets_from_coordination()?;
1271 if targets.is_empty() {
1272 return None;
1273 }
1274 let target = &targets[0];
1276 (target.remote_addr, target.challenge)
1277 }
1278 _ => return None,
1279 }
1280 };
1281
1282 debug_assert_eq!(
1283 self.highest_space,
1284 SpaceId::Data,
1285 "PATH_CHALLENGE queued without 1-RTT keys"
1286 );
1287
1288 #[cfg(feature = "pqc")]
1289 buf.reserve(self.pqc_state.min_initial_size() as usize);
1290 #[cfg(not(feature = "pqc"))]
1291 buf.reserve(MIN_INITIAL_SIZE as usize);
1292 let buf_capacity = buf.capacity();
1293
1294 let mut builder = PacketBuilder::new(
1295 now,
1296 SpaceId::Data,
1297 self.rem_cids.active(),
1298 buf,
1299 buf_capacity,
1300 0,
1301 false,
1302 self,
1303 )?;
1304
1305 trace!(
1306 "sending coordinated PATH_CHALLENGE {:08x} to {}",
1307 challenge, target_addr
1308 );
1309 buf.write(frame::FrameType::PATH_CHALLENGE);
1310 buf.write(challenge);
1311 self.stats.frame_tx.path_challenge += 1;
1312
1313 #[cfg(feature = "pqc")]
1314 let min_size = self.pqc_state.min_initial_size();
1315 #[cfg(not(feature = "pqc"))]
1316 let min_size = MIN_INITIAL_SIZE;
1317 builder.pad_to(min_size);
1318 builder.finish_and_track(now, self, None, buf);
1319
1320 if let Some(nat_traversal) = &mut self.nat_traversal {
1322 nat_traversal.mark_coordination_validating();
1323 }
1324
1325 Some(Transmit {
1326 destination: target_addr,
1327 size: buf.len(),
1328 ecn: if self.path.sending_ecn {
1329 Some(EcnCodepoint::Ect0)
1330 } else {
1331 None
1332 },
1333 segment_size: None,
1334 src_ip: self.local_ip,
1335 })
1336 }
1337
1338 fn send_nat_traversal_challenge(
1340 &mut self,
1341 now: Instant,
1342 buf: &mut Vec<u8>,
1343 ) -> Option<Transmit> {
1344 if let Some(request) = self.send_coordination_request(now, buf) {
1346 return Some(request);
1347 }
1348
1349 if let Some(punch) = self.send_coordinated_path_challenge(now, buf) {
1351 return Some(punch);
1352 }
1353
1354 let (remote_addr, remote_sequence) = {
1356 let nat_traversal = self.nat_traversal.as_ref()?;
1357 let candidates = nat_traversal.get_validation_candidates();
1358 if candidates.is_empty() {
1359 return None;
1360 }
1361 let (sequence, candidate) = candidates[0];
1363 (candidate.address, sequence)
1364 };
1365
1366 let challenge = self.rng.r#gen::<u64>();
1367
1368 if let Err(e) =
1370 self.nat_traversal
1371 .as_mut()?
1372 .start_validation(remote_sequence, challenge, now)
1373 {
1374 warn!("Failed to start NAT traversal validation: {}", e);
1375 return None;
1376 }
1377
1378 debug_assert_eq!(
1379 self.highest_space,
1380 SpaceId::Data,
1381 "PATH_CHALLENGE queued without 1-RTT keys"
1382 );
1383
1384 #[cfg(feature = "pqc")]
1385 buf.reserve(self.pqc_state.min_initial_size() as usize);
1386 #[cfg(not(feature = "pqc"))]
1387 buf.reserve(MIN_INITIAL_SIZE as usize);
1388 let buf_capacity = buf.capacity();
1389
1390 let mut builder = PacketBuilder::new(
1392 now,
1393 SpaceId::Data,
1394 self.rem_cids.active(),
1395 buf,
1396 buf_capacity,
1397 0,
1398 false,
1399 self,
1400 )?;
1401
1402 trace!(
1403 "sending PATH_CHALLENGE {:08x} to NAT candidate {}",
1404 challenge, remote_addr
1405 );
1406 buf.write(frame::FrameType::PATH_CHALLENGE);
1407 buf.write(challenge);
1408 self.stats.frame_tx.path_challenge += 1;
1409
1410 #[cfg(feature = "pqc")]
1412 let min_size = self.pqc_state.min_initial_size();
1413 #[cfg(not(feature = "pqc"))]
1414 let min_size = MIN_INITIAL_SIZE;
1415 builder.pad_to(min_size);
1416
1417 builder.finish_and_track(now, self, None, buf);
1418
1419 Some(Transmit {
1420 destination: remote_addr,
1421 size: buf.len(),
1422 ecn: if self.path.sending_ecn {
1423 Some(EcnCodepoint::Ect0)
1424 } else {
1425 None
1426 },
1427 segment_size: None,
1428 src_ip: self.local_ip,
1429 })
1430 }
1431
/// Builds a padded PATH_CHALLENGE packet for the previous path, if one is pending.
///
/// Returns `None` when there is no previous path, when that path has no challenge pending,
/// or when `PacketBuilder::new` yields `None`. On success `buf` holds the finished packet and
/// the returned [`Transmit`] is addressed to the previous path's remote address.
fn send_path_challenge(&mut self, now: Instant, buf: &mut Vec<u8>) -> Option<Transmit> {
    let (prev_cid, prev_path) = self.prev_path.as_mut()?;
    if !prev_path.challenge_pending {
        return None;
    }
    // The flag is cleared before the packet is built, so it stays cleared even if the
    // builder bails out below.
    prev_path.challenge_pending = false;
    let token = prev_path
        .challenge
        .expect("previous path challenge pending without token");
    let destination = prev_path.remote;
    debug_assert_eq!(
        self.highest_space,
        SpaceId::Data,
        "PATH_CHALLENGE queued without 1-RTT keys"
    );
    // Make room for the padded packet; the minimum size depends on whether the `pqc`
    // feature is compiled in.
    #[cfg(feature = "pqc")]
    buf.reserve(self.pqc_state.min_initial_size() as usize);
    #[cfg(not(feature = "pqc"))]
    buf.reserve(MIN_INITIAL_SIZE as usize);

    let buf_capacity = buf.capacity();

    // The packet is addressed with the CID stored alongside the previous path (`prev_cid`),
    // not with the currently active remote CID.
    let mut builder = PacketBuilder::new(
        now,
        SpaceId::Data,
        *prev_cid,
        buf,
        buf_capacity,
        0,
        false,
        self,
    )?;
    trace!("validating previous path with PATH_CHALLENGE {:08x}", token);
    buf.write(frame::FrameType::PATH_CHALLENGE);
    buf.write(token);
    self.stats.frame_tx.path_challenge += 1;

    // Pad the datagram up to the minimum initial size (PQC-aware when that feature is on).
    #[cfg(feature = "pqc")]
    let min_size = self.pqc_state.min_initial_size();
    #[cfg(not(feature = "pqc"))]
    let min_size = MIN_INITIAL_SIZE;
    builder.pad_to(min_size);

    // NOTE(review): `finish` is used here rather than `finish_and_track`; presumably this
    // packet is deliberately not registered for loss detection — confirm.
    builder.finish(self, buf);
    self.stats.udp_tx.on_sent(1, buf.len());

    Some(Transmit {
        destination,
        size: buf.len(),
        // No ECN codepoint is requested for this datagram.
        ecn: None,
        segment_size: None,
        src_ip: self.local_ip,
    })
}
1496
1497 fn space_can_send(&self, space_id: SpaceId, frame_space_1rtt: usize) -> SendableFrames {
1499 if self.spaces[space_id].crypto.is_none()
1500 && (space_id != SpaceId::Data
1501 || self.zero_rtt_crypto.is_none()
1502 || self.side.is_server())
1503 {
1504 return SendableFrames::empty();
1506 }
1507 let mut can_send = self.spaces[space_id].can_send(&self.streams);
1508 if space_id == SpaceId::Data {
1509 can_send.other |= self.can_send_1rtt(frame_space_1rtt);
1510 }
1511 can_send
1512 }
1513
/// Processes a [`ConnectionEvent`] delivered to this connection.
///
/// Handles incoming datagrams, newly issued connection IDs, and requests to queue
/// address-advertisement or punch-me-now frames in the Data space.
pub fn handle_event(&mut self, event: ConnectionEvent) {
    use ConnectionEventInner::*;
    match event.0 {
        Datagram(DatagramConnectionEvent {
            now,
            remote,
            ecn,
            first_decode,
            remaining,
        }) => {
            // Datagrams from an address other than the current path's remote are dropped
            // unless this side allows the remote to migrate.
            if remote != self.path.remote && !self.side.remote_may_migrate() {
                trace!("discarding packet from unrecognized peer {}", remote);
                return;
            }

            // Sampled before processing so the timer can be re-armed below if this datagram
            // changes the answer.
            let was_anti_amplification_blocked = self.path.anti_amplification_blocked(1);

            self.stats.udp_rx.datagrams += 1;
            self.stats.udp_rx.bytes += first_decode.len() as u64;
            let data_len = first_decode.len();

            self.handle_decode(now, remote, ecn, first_decode);
            // NOTE(review): `total_recvd` is credited only after `handle_decode`; presumably
            // so the bytes land on whichever path is current after processing — confirm.
            self.path.total_recvd = self.path.total_recvd.saturating_add(data_len as u64);

            // Any further packets coalesced into the same datagram.
            if let Some(data) = remaining {
                self.stats.udp_rx.bytes += data.len() as u64;
                self.handle_coalesced(now, remote, ecn, data);
            }

            #[cfg(feature = "__qlog")]
            self.emit_qlog_recovery_metrics(now);

            // `set_loss_detection_timer` stops the timer while the path is anti-amplification
            // blocked, so give it another chance now that more data has been received.
            if was_anti_amplification_blocked {
                self.set_loss_detection_timer(now);
            }
        }
        NewIdentifiers(ids, now) => {
            self.local_cid_state.new_cids(&ids, now);
            // Queue the new CIDs, in reverse order, for transmission in the Data space.
            ids.into_iter().rev().for_each(|frame| {
                self.spaces[SpaceId::Data].pending.new_cids.push(frame);
            });
            // Re-arm the CID rotation timer if it is unset or already due.
            if self.timers.get(Timer::PushNewCid).is_none_or(|x| x <= now) {
                self.reset_cid_retirement();
            }
        }
        QueueAddAddress(add) => {
            self.spaces[SpaceId::Data].pending.add_addresses.push(add);
        }
        QueuePunchMeNow(punch) => {
            self.spaces[SpaceId::Data].pending.punch_me_now.push(punch);
        }
    }
}
1585
/// Processes every timer that has expired as of `now`.
///
/// Each expired timer is stopped first and then dispatched to its handler, in the order
/// given by `Timer::VALUES`.
pub fn handle_timeout(&mut self, now: Instant) {
    for &timer in &Timer::VALUES {
        if !self.timers.is_expired(timer, now) {
            continue;
        }
        self.timers.stop(timer);
        trace!(timer = ?timer, "timeout");
        match timer {
            // The close timer elapsed: move to `Drained` and tell the endpoint.
            Timer::Close => {
                self.state = State::Drained;
                self.endpoint_events.push_back(EndpointEventInner::Drained);
            }
            Timer::Idle => {
                self.kill(ConnectionError::TimedOut);
            }
            Timer::KeepAlive => {
                trace!("sending keep-alive");
                self.ping();
            }
            Timer::LossDetection => {
                self.on_loss_detection_timeout(now);

                #[cfg(feature = "__qlog")]
                self.emit_qlog_recovery_metrics(now);
            }
            // Drop both the 0-RTT keys and the previous-generation keys.
            Timer::KeyDiscard => {
                self.zero_rtt_crypto = None;
                self.prev_crypto = None;
            }
            // Validation did not complete in time: fall back to the previous path when one
            // is recorded, and clear any outstanding challenge on the resulting path.
            Timer::PathValidation => {
                debug!("path validation failed");
                if let Some((_, prev)) = self.prev_path.take() {
                    self.path = prev;
                }
                self.path.challenge = None;
                self.path.challenge_pending = false;
            }
            Timer::Pacing => trace!("pacing timer expired"),
            Timer::NatTraversal => {
                self.handle_nat_traversal_timeout(now);
            }
            Timer::PushNewCid => {
                // `on_cid_timeout` runs even when closed; only the request to the endpoint
                // for replacement CIDs is suppressed in that case.
                let num_new_cid = self.local_cid_state.on_cid_timeout().into();
                if !self.state.is_closed() {
                    trace!(
                        "push a new cid to peer RETIRE_PRIOR_TO field {}",
                        self.local_cid_state.retire_prior_to()
                    );
                    self.endpoint_events
                        .push_back(EndpointEventInner::NeedIdentifiers(now, num_new_cid));
                }
            }
            Timer::MaxAckDelay => {
                trace!("max ack delay reached");
                self.spaces[SpaceId::Data]
                    .pending_acks
                    .on_max_ack_delay_timeout()
            }
        }
    }
}
1658
1659 pub fn close(&mut self, now: Instant, error_code: VarInt, reason: Bytes) {
1671 self.close_inner(
1672 now,
1673 Close::Application(frame::ApplicationClose { error_code, reason }),
1674 )
1675 }
1676
1677 fn close_inner(&mut self, now: Instant, reason: Close) {
1678 let was_closed = self.state.is_closed();
1679 if !was_closed {
1680 self.close_common();
1681 self.set_close_timer(now);
1682 self.close = true;
1683 self.state = State::Closed(state::Closed { reason });
1684 }
1685 }
1686
/// Returns a [`Datagrams`] handle that mutably borrows this connection.
pub fn datagrams(&mut self) -> Datagrams<'_> {
    Datagrams { conn: self }
}
1691
1692 pub fn stats(&self) -> ConnectionStats {
1694 let mut stats = self.stats;
1695 stats.path.rtt = self.path.rtt.get();
1696 stats.path.cwnd = self.path.congestion.window();
1697 stats.path.current_mtu = self.path.mtud.current_mtu();
1698
1699 stats
1700 }
1701
1702 pub fn ping(&mut self) {
1706 self.spaces[self.highest_space].ping_pending = true;
1707 }
1708
1709 pub fn force_key_update(&mut self) {
1713 if !self.state.is_established() {
1714 debug!("ignoring forced key update in illegal state");
1715 return;
1716 }
1717 if self.prev_crypto.is_some() {
1718 debug!("ignoring redundant forced key update");
1721 return;
1722 }
1723 self.update_keys(None, false);
1724 }
1725
/// Deprecated alias that simply forwards to [`force_key_update`](Self::force_key_update).
#[doc(hidden)]
#[deprecated]
pub fn initiate_key_update(&mut self) {
    self.force_key_update();
}
1732
/// Borrows the connection's cryptographic session as a trait object.
pub fn crypto_session(&self) -> &dyn crypto::Session {
    &*self.crypto
}
1737
/// Whether the connection state reports that it is still in the handshake.
///
/// Delegates to `self.state.is_handshake()`.
pub fn is_handshaking(&self) -> bool {
    self.state.is_handshake()
}
1745
/// Whether the connection state reports that it is closed.
///
/// Delegates to `self.state.is_closed()`.
pub fn is_closed(&self) -> bool {
    self.state.is_closed()
}
1756
/// Whether the connection state reports that it is drained.
///
/// Delegates to `self.state.is_drained()`. The state becomes `Drained`, for example, when
/// the close timer fires in `handle_timeout`.
pub fn is_drained(&self) -> bool {
    self.state.is_drained()
}
1764
/// Returns the connection's `accepted_0rtt` flag.
///
/// NOTE(review): presumably set once the peer is known to have accepted 0-RTT data —
/// confirm where the flag is written.
pub fn accepted_0rtt(&self) -> bool {
    self.accepted_0rtt
}
1771
/// Returns the `zero_rtt_enabled` flag, which `init_0rtt` sets once 0-RTT keys are installed.
pub fn has_0rtt(&self) -> bool {
    self.zero_rtt_enabled
}
1776
/// Whether the Data space has anything queued in its `pending` set.
///
/// Emptiness is judged by `pending.is_empty(&self.streams)`.
pub fn has_pending_retransmits(&self) -> bool {
    !self.spaces[SpaceId::Data].pending.is_empty(&self.streams)
}
1781
/// Returns which [`Side`] of the connection this endpoint is, as reported by `self.side`.
pub fn side(&self) -> Side {
    self.side.side()
}
1786
/// Returns the remote socket address of the current path.
pub fn remote_address(&self) -> SocketAddr {
    self.path.remote
}
1791
/// Returns the stored local IP address, if any.
///
/// The same value is used as `src_ip` on outgoing [`Transmit`]s (see `send_path_challenge`).
pub fn local_ip(&self) -> Option<IpAddr> {
    self.local_ip
}
1804
/// Returns the current path's round-trip time as reported by `self.path.rtt.get()`.
pub fn rtt(&self) -> Duration {
    self.path.rtt.get()
}
1809
/// Borrows the current path's congestion controller as a trait object.
pub fn congestion_state(&self) -> &dyn Controller {
    self.path.congestion.as_ref()
}
1814
/// Resets the current path's state using the connection's configuration.
///
/// NOTE(review): exactly which estimators `reset` clears is defined by the path type and is
/// not visible here — confirm before relying on specifics.
pub fn path_changed(&mut self, now: Instant) {
    self.path.reset(now, &self.config);
}
1828
1829 pub fn set_max_concurrent_streams(&mut self, dir: Dir, count: VarInt) {
1834 self.streams.set_max_concurrent(dir, count);
1835 let pending = &mut self.spaces[SpaceId::Data].pending;
1838 self.streams.queue_max_stream_id(pending);
1839 }
1840
/// Returns the concurrent-stream limit for direction `dir`, as held by the stream state.
pub fn max_concurrent_streams(&self, dir: Dir) -> u64 {
    self.streams.max_concurrent(dir)
}
1849
1850 pub fn set_receive_window(&mut self, receive_window: VarInt) {
1852 if self.streams.set_receive_window(receive_window) {
1853 self.spaces[SpaceId::Data].pending.max_data = true;
1854 }
1855 }
1856
1857 pub fn set_address_discovery_enabled(&mut self, enabled: bool) {
1859 if let Some(ref mut state) = self.address_discovery_state {
1860 state.enabled = enabled;
1861 }
1862 }
1863
1864 pub fn address_discovery_enabled(&self) -> bool {
1866 self.address_discovery_state
1867 .as_ref()
1868 .is_some_and(|state| state.enabled)
1869 }
1870
1871 pub fn observed_address(&self) -> Option<SocketAddr> {
1876 self.address_discovery_state
1877 .as_ref()
1878 .and_then(|state| state.get_observed_address(0)) }
1880
/// Borrows the address discovery state, if the connection has one.
pub(crate) fn address_discovery_state(&self) -> Option<&AddressDiscoveryState> {
    self.address_discovery_state.as_ref()
}
1885
/// Processes an ACK frame received in packet number space `space`.
///
/// Updates the largest-acknowledged bookkeeping, removes newly acknowledged packets from
/// the sent-packet map, feeds MTU discovery, ACK-frequency and congestion state, takes an
/// RTT sample when appropriate, runs loss detection, checks ECN feedback and finally
/// re-arms the loss detection timer.
///
/// # Errors
///
/// Returns `PROTOCOL_VIOLATION` if the frame acknowledges a packet number that has not been
/// sent yet, and propagates any error from `packet_number_filter.check_ack`.
fn on_ack_received(
    &mut self,
    now: Instant,
    space: SpaceId,
    ack: frame::Ack,
) -> Result<(), TransportError> {
    if ack.largest >= self.spaces[space].next_packet_number {
        return Err(TransportError::PROTOCOL_VIOLATION("unsent packet acked"));
    }
    // Track whether this frame advances the largest acknowledged packet number.
    let new_largest = {
        let space = &mut self.spaces[space];
        if space.largest_acked_packet.is_none_or(|pn| ack.largest > pn) {
            space.largest_acked_packet = Some(ack.largest);
            // The send time is only refreshed when that packet is still tracked.
            if let Some(info) = space.sent_packets.get(&ack.largest) {
                space.largest_acked_packet_sent = info.time_sent;
            }
            true
        } else {
            false
        }
    };

    // Collect the packet numbers covered by the frame that are still outstanding.
    let mut newly_acked = ArrayRangeSet::new();
    for range in ack.iter() {
        self.packet_number_filter.check_ack(space, range.clone())?;
        for (&pn, _) in self.spaces[space].sent_packets.range(range) {
            newly_acked.insert_one(pn);
        }
    }

    // Nothing new was acknowledged; there is no state to update.
    if newly_acked.is_empty() {
        return Ok(());
    }

    let mut ack_eliciting_acked = false;
    for packet in newly_acked.elts() {
        if let Some(info) = self.spaces[space].take(packet) {
            // The acknowledged packet itself carried an ACK up to `acked`; the pending ACK
            // state below that point is trimmed.
            if let Some(acked) = info.largest_acked {
                self.spaces[space].pending_acks.subtract_below(acked);
            }
            ack_eliciting_acked |= info.ack_eliciting;

            // Let MTU discovery see the acknowledgement, and tell the congestion controller
            // when it reports that the MTU changed.
            let mtu_updated = self.path.mtud.on_acked(space, packet, info.size);
            if mtu_updated {
                self.path
                    .congestion
                    .on_mtu_update(self.path.mtud.current_mtu());
            }

            self.ack_frequency.on_acked(packet);

            self.on_packet_acked(now, packet, info);
        }
    }

    self.path.congestion.on_end_acks(
        now,
        self.path.in_flight.bytes,
        self.app_limited,
        self.spaces[space].largest_acked_packet,
    );

    // An RTT sample is taken only when the largest acknowledged number advanced and at
    // least one newly acknowledged packet was ack-eliciting.
    if new_largest && ack_eliciting_acked {
        // Outside the Data space the reported delay is ignored; inside it, the delay is
        // scaled by the peer's exponent and capped at the peer's max ACK delay.
        let ack_delay = if space != SpaceId::Data {
            Duration::from_micros(0)
        } else {
            cmp::min(
                self.ack_frequency.peer_max_ack_delay,
                Duration::from_micros(ack.delay << self.peer_params.ack_delay_exponent.0),
            )
        };
        let rtt = instant_saturating_sub(now, self.spaces[space].largest_acked_packet_sent);
        self.path.rtt.update(ack_delay, rtt);
        // Remember the next packet to be sent after the first RTT sample;
        // `detect_lost_packets` consults this for persistent congestion.
        if self.path.first_packet_after_rtt_sample.is_none() {
            self.path.first_packet_after_rtt_sample =
                Some((space, self.spaces[space].next_packet_number));
        }
    }

    self.detect_lost_packets(now, space, true);

    // The PTO backoff is reset only once the peer is known to have validated our address.
    if self.peer_completed_address_validation() {
        self.pto_count = 0;
    }

    if self.path.sending_ecn {
        if let Some(ecn) = ack.ecn {
            // ECN counts are only evaluated when the largest acknowledged number advanced.
            if new_largest {
                let sent = self.spaces[space].largest_acked_packet_sent;
                self.process_ecn(now, space, newly_acked.len() as u64, ecn, sent);
            }
        } else {
            // The frame carried no ECN counts although we are marking packets; stop marking.
            debug!("ECN not acknowledged by peer");
            self.path.sending_ecn = false;
        }
    }

    self.set_loss_detection_timer(now);
    Ok(())
}
2004
2005 fn process_ecn(
2007 &mut self,
2008 now: Instant,
2009 space: SpaceId,
2010 newly_acked: u64,
2011 ecn: frame::EcnCounts,
2012 largest_sent_time: Instant,
2013 ) {
2014 match self.spaces[space].detect_ecn(newly_acked, ecn) {
2015 Err(e) => {
2016 debug!("halting ECN due to verification failure: {}", e);
2017 self.path.sending_ecn = false;
2018 self.spaces[space].ecn_feedback = frame::EcnCounts::ZERO;
2021 }
2022 Ok(false) => {}
2023 Ok(true) => {
2024 self.stats.path.congestion_events += 1;
2025 self.path
2026 .congestion
2027 .on_congestion_event(now, largest_sent_time, false, 0);
2028 }
2029 }
2030 }
2031
2032 fn on_packet_acked(&mut self, now: Instant, pn: u64, info: SentPacket) {
2035 self.remove_in_flight(pn, &info);
2036 if info.ack_eliciting && self.path.challenge.is_none() {
2037 self.path.congestion.on_ack(
2040 now,
2041 info.time_sent,
2042 info.size.into(),
2043 self.app_limited,
2044 &self.path.rtt,
2045 );
2046 }
2047
2048 if let Some(retransmits) = info.retransmits.get() {
2050 for (id, _) in retransmits.reset_stream.iter() {
2051 self.streams.reset_acked(*id);
2052 }
2053 }
2054
2055 for frame in info.stream_frames {
2056 self.streams.received_ack_of(frame);
2057 }
2058 }
2059
2060 fn set_key_discard_timer(&mut self, now: Instant, space: SpaceId) {
2061 let start = if self.zero_rtt_crypto.is_some() {
2062 now
2063 } else {
2064 self.prev_crypto
2065 .as_ref()
2066 .expect("no previous keys")
2067 .end_packet
2068 .as_ref()
2069 .expect("update not acknowledged yet")
2070 .1
2071 };
2072 self.timers
2073 .set(Timer::KeyDiscard, start + self.pto(space) * 3);
2074 }
2075
2076 fn on_loss_detection_timeout(&mut self, now: Instant) {
2077 if let Some((_, pn_space)) = self.loss_time_and_space() {
2078 self.detect_lost_packets(now, pn_space, false);
2080 self.set_loss_detection_timer(now);
2081 return;
2082 }
2083
2084 let (_, space) = match self.pto_time_and_space(now) {
2085 Some(x) => x,
2086 None => {
2087 error!("PTO expired while unset");
2088 return;
2089 }
2090 };
2091 trace!(
2092 in_flight = self.path.in_flight.bytes,
2093 count = self.pto_count,
2094 ?space,
2095 "PTO fired"
2096 );
2097
2098 let count = match self.path.in_flight.ack_eliciting {
2099 0 => {
2102 debug_assert!(!self.peer_completed_address_validation());
2103 1
2104 }
2105 _ => 2,
2107 };
2108 self.spaces[space].loss_probes = self.spaces[space].loss_probes.saturating_add(count);
2109 self.pto_count = self.pto_count.saturating_add(1);
2110 self.set_loss_detection_timer(now);
2111 }
2112
/// Declares packets in `pn_space` lost by time or packet-number threshold and reacts to it.
///
/// Only packets numbered below the largest acknowledged one are examined. Lost packets are
/// removed from the sent-packet map, their stream frames and retransmittable data are
/// re-queued, MTU discovery and the congestion controller are informed, and the space's
/// `loss_time` is set to the earliest instant at which a not-yet-lost packet would cross
/// the time threshold. A lost in-flight MTU probe is handled separately and does not count
/// towards the congestion response. `due_to_ack` gates the persistent congestion check.
///
/// # Panics
///
/// Panics if no packet has been acknowledged in `pn_space` yet, or if `now - loss_delay`
/// cannot be represented as an `Instant`.
fn detect_lost_packets(&mut self, now: Instant, pn_space: SpaceId, due_to_ack: bool) {
    let mut lost_packets = Vec::<u64>::new();
    let mut lost_mtu_probe = None;
    let in_flight_mtu_probe = self.path.mtud.in_flight_mtu_probe();
    let rtt = self.path.rtt.conservative();
    // Time threshold, never shorter than the timer granularity.
    let loss_delay = cmp::max(rtt.mul_f32(self.config.time_threshold), TIMER_GRANULARITY);

    // Packets sent at or before this instant are considered lost.
    let lost_send_time = now.checked_sub(loss_delay).unwrap();
    let largest_acked_packet = self.spaces[pn_space].largest_acked_packet.unwrap();
    let packet_threshold = self.config.packet_threshold as u64;
    let mut size_of_lost_packets = 0u64;

    // The persistent congestion window is always derived from the Data-space PTO,
    // regardless of `pn_space`.
    let congestion_period =
        self.pto(SpaceId::Data) * self.config.persistent_congestion_threshold;
    let mut persistent_congestion_start: Option<Instant> = None;
    let mut prev_packet = None;
    let mut in_persistent_congestion = false;

    let space = &mut self.spaces[pn_space];
    space.loss_time = None;

    for (&packet, info) in space.sent_packets.range(0..largest_acked_packet) {
        // A gap in the outstanding packet numbers interrupts any candidate run of
        // consecutive losses.
        if prev_packet != Some(packet.wrapping_sub(1)) {
            persistent_congestion_start = None;
        }

        if info.time_sent <= lost_send_time || largest_acked_packet >= packet + packet_threshold
        {
            if Some(packet) == in_flight_mtu_probe {
                // Kept out of `lost_packets` so it does not contribute to the congestion
                // response below.
                lost_mtu_probe = in_flight_mtu_probe;
            } else {
                lost_packets.push(packet);
                size_of_lost_packets += info.size as u64;
                if info.ack_eliciting && due_to_ack {
                    match persistent_congestion_start {
                        // Two ack-eliciting losses further apart than the congestion
                        // period within one uninterrupted run.
                        Some(start) if info.time_sent - start > congestion_period => {
                            in_persistent_congestion = true;
                        }
                        // A run may only start with a packet sent after the first RTT
                        // sample was taken.
                        None if self
                            .path
                            .first_packet_after_rtt_sample
                            .is_some_and(|x| x < (pn_space, packet)) =>
                        {
                            persistent_congestion_start = Some(info.time_sent);
                        }
                        _ => {}
                    }
                }
            }
        } else {
            // Not lost yet: remember the earliest instant at which it would become so.
            let next_loss_time = info.time_sent + loss_delay;
            space.loss_time = Some(
                space
                    .loss_time
                    .map_or(next_loss_time, |x| cmp::min(x, next_loss_time)),
            );
            persistent_congestion_start = None;
        }

        prev_packet = Some(packet);
    }

    if let Some(largest_lost) = lost_packets.last().cloned() {
        let old_bytes_in_flight = self.path.in_flight.bytes;
        let largest_lost_sent = self.spaces[pn_space].sent_packets[&largest_lost].time_sent;
        self.lost_packets += lost_packets.len() as u64;
        self.stats.path.lost_packets += lost_packets.len() as u64;
        self.stats.path.lost_bytes += size_of_lost_packets;
        trace!(
            "packets lost: {:?}, bytes lost: {}",
            lost_packets, size_of_lost_packets
        );

        for &packet in &lost_packets {
            // Every number in `lost_packets` was just read from `sent_packets`, so the
            // entry is expected to exist.
            let info = self.spaces[pn_space].take(packet).unwrap();
            self.remove_in_flight(packet, &info);
            for frame in info.stream_frames {
                self.streams.retransmit(frame);
            }
            self.spaces[pn_space].pending |= info.retransmits;
            self.path.mtud.on_non_probe_lost(packet, info.size);
        }

        // A detected black hole changes the MTU; propagate the new value and drop queued
        // datagrams that no longer fit.
        if self.path.mtud.black_hole_detected(now) {
            self.stats.path.black_holes_detected += 1;
            self.path
                .congestion
                .on_mtu_update(self.path.mtud.current_mtu());
            if let Some(max_datagram_size) = self.datagrams().max_size() {
                self.datagrams.drop_oversized(max_datagram_size);
            }
        }

        // NOTE(review): a change in bytes in flight is used as the signal that a lost
        // packet counted towards congestion control — confirm against `remove_in_flight`.
        let lost_ack_eliciting = old_bytes_in_flight != self.path.in_flight.bytes;

        if lost_ack_eliciting {
            self.stats.path.congestion_events += 1;
            self.path.congestion.on_congestion_event(
                now,
                largest_lost_sent,
                in_persistent_congestion,
                size_of_lost_packets,
            );
        }
    }

    // The lost MTU probe is always taken from the Data space, whatever `pn_space` is.
    if let Some(packet) = lost_mtu_probe {
        let info = self.spaces[SpaceId::Data].take(packet).unwrap();
        self.remove_in_flight(packet, &info);
        self.path.mtud.on_probe_lost();
        self.stats.path.lost_plpmtud_probes += 1;
    }
}
2239
2240 fn loss_time_and_space(&self) -> Option<(Instant, SpaceId)> {
2241 SpaceId::iter()
2242 .filter_map(|id| Some((self.spaces[id].loss_time?, id)))
2243 .min_by_key(|&(time, _)| time)
2244 }
2245
2246 fn pto_time_and_space(&self, now: Instant) -> Option<(Instant, SpaceId)> {
2247 let backoff = 2u32.pow(self.pto_count.min(MAX_BACKOFF_EXPONENT));
2248 let mut duration = self.path.rtt.pto_base() * backoff;
2249
2250 if self.path.in_flight.ack_eliciting == 0 {
2251 debug_assert!(!self.peer_completed_address_validation());
2252 let space = match self.highest_space {
2253 SpaceId::Handshake => SpaceId::Handshake,
2254 _ => SpaceId::Initial,
2255 };
2256 return Some((now + duration, space));
2257 }
2258
2259 let mut result = None;
2260 for space in SpaceId::iter() {
2261 if self.spaces[space].in_flight == 0 {
2262 continue;
2263 }
2264 if space == SpaceId::Data {
2265 if self.is_handshaking() {
2267 return result;
2268 }
2269 duration += self.ack_frequency.max_ack_delay_for_pto() * backoff;
2271 }
2272 let last_ack_eliciting = match self.spaces[space].time_of_last_ack_eliciting_packet {
2273 Some(time) => time,
2274 None => continue,
2275 };
2276 let pto = last_ack_eliciting + duration;
2277 if result.is_none_or(|(earliest_pto, _)| pto < earliest_pto) {
2278 result = Some((pto, space));
2279 }
2280 }
2281 result
2282 }
2283
2284 fn peer_completed_address_validation(&self) -> bool {
2285 if self.side.is_server() || self.state.is_closed() {
2286 return true;
2287 }
2288 self.spaces[SpaceId::Handshake]
2291 .largest_acked_packet
2292 .is_some()
2293 || self.spaces[SpaceId::Data].largest_acked_packet.is_some()
2294 || (self.spaces[SpaceId::Data].crypto.is_some()
2295 && self.spaces[SpaceId::Handshake].crypto.is_none())
2296 }
2297
2298 fn set_loss_detection_timer(&mut self, now: Instant) {
2299 if self.state.is_closed() {
2300 return;
2304 }
2305
2306 if let Some((loss_time, _)) = self.loss_time_and_space() {
2307 self.timers.set(Timer::LossDetection, loss_time);
2309 return;
2310 }
2311
2312 if self.path.anti_amplification_blocked(1) {
2313 self.timers.stop(Timer::LossDetection);
2315 return;
2316 }
2317
2318 if self.path.in_flight.ack_eliciting == 0 && self.peer_completed_address_validation() {
2319 self.timers.stop(Timer::LossDetection);
2322 return;
2323 }
2324
2325 if let Some((timeout, _)) = self.pto_time_and_space(now) {
2328 self.timers.set(Timer::LossDetection, timeout);
2329 } else {
2330 self.timers.stop(Timer::LossDetection);
2331 }
2332 }
2333
2334 fn pto(&self, space: SpaceId) -> Duration {
2336 let max_ack_delay = match space {
2337 SpaceId::Initial | SpaceId::Handshake => Duration::ZERO,
2338 SpaceId::Data => self.ack_frequency.max_ack_delay_for_pto(),
2339 };
2340 self.path.rtt.pto_base() + max_ack_delay
2341 }
2342
2343 fn on_packet_authenticated(
2344 &mut self,
2345 now: Instant,
2346 space_id: SpaceId,
2347 ecn: Option<EcnCodepoint>,
2348 packet: Option<u64>,
2349 spin: bool,
2350 is_1rtt: bool,
2351 ) {
2352 self.total_authed_packets += 1;
2353 self.reset_keep_alive(now);
2354 self.reset_idle_timeout(now, space_id);
2355 self.permit_idle_reset = true;
2356 self.receiving_ecn |= ecn.is_some();
2357 if let Some(x) = ecn {
2358 let space = &mut self.spaces[space_id];
2359 space.ecn_counters += x;
2360
2361 if x.is_ce() {
2362 space.pending_acks.set_immediate_ack_required();
2363 }
2364 }
2365
2366 let packet = match packet {
2367 Some(x) => x,
2368 None => return,
2369 };
2370 if self.side.is_server() {
2371 if self.spaces[SpaceId::Initial].crypto.is_some() && space_id == SpaceId::Handshake {
2372 self.discard_space(now, SpaceId::Initial);
2374 }
2375 if self.zero_rtt_crypto.is_some() && is_1rtt {
2376 self.set_key_discard_timer(now, space_id)
2378 }
2379 }
2380 let space = &mut self.spaces[space_id];
2381 space.pending_acks.insert_one(packet, now);
2382 if packet >= space.rx_packet {
2383 space.rx_packet = packet;
2384 self.spin = self.side.is_client() ^ spin;
2386 }
2387 }
2388
2389 fn reset_idle_timeout(&mut self, now: Instant, space: SpaceId) {
2390 let timeout = match self.idle_timeout {
2391 None => return,
2392 Some(dur) => dur,
2393 };
2394 if self.state.is_closed() {
2395 self.timers.stop(Timer::Idle);
2396 return;
2397 }
2398 let dt = cmp::max(timeout, 3 * self.pto(space));
2399 self.timers.set(Timer::Idle, now + dt);
2400 }
2401
2402 fn reset_keep_alive(&mut self, now: Instant) {
2403 let interval = match self.config.keep_alive_interval {
2404 Some(x) if self.state.is_established() => x,
2405 _ => return,
2406 };
2407 self.timers.set(Timer::KeepAlive, now + interval);
2408 }
2409
2410 fn reset_cid_retirement(&mut self) {
2411 if let Some(t) = self.local_cid_state.next_timeout() {
2412 self.timers.set(Timer::PushNewCid, t);
2413 }
2414 }
2415
2416 pub(crate) fn handle_first_packet(
2421 &mut self,
2422 now: Instant,
2423 remote: SocketAddr,
2424 ecn: Option<EcnCodepoint>,
2425 packet_number: u64,
2426 packet: InitialPacket,
2427 remaining: Option<BytesMut>,
2428 ) -> Result<(), ConnectionError> {
2429 let span = trace_span!("first recv");
2430 let _guard = span.enter();
2431 debug_assert!(self.side.is_server());
2432 let len = packet.header_data.len() + packet.payload.len();
2433 self.path.total_recvd = len as u64;
2434
2435 match self.state {
2436 State::Handshake(ref mut state) => {
2437 state.expected_token = packet.header.token.clone();
2438 }
2439 _ => unreachable!("first packet must be delivered in Handshake state"),
2440 }
2441
2442 self.on_packet_authenticated(
2443 now,
2444 SpaceId::Initial,
2445 ecn,
2446 Some(packet_number),
2447 false,
2448 false,
2449 );
2450
2451 self.process_decrypted_packet(now, remote, Some(packet_number), packet.into())?;
2452 if let Some(data) = remaining {
2453 self.handle_coalesced(now, remote, ecn, data);
2454 }
2455
2456 #[cfg(feature = "__qlog")]
2457 self.emit_qlog_recovery_metrics(now);
2458
2459 Ok(())
2460 }
2461
2462 fn init_0rtt(&mut self) {
2463 let (header, packet) = match self.crypto.early_crypto() {
2464 Some(x) => x,
2465 None => return,
2466 };
2467 if self.side.is_client() {
2468 match self.crypto.transport_parameters() {
2469 Ok(params) => {
2470 let params = params
2471 .expect("crypto layer didn't supply transport parameters with ticket");
2472 let params = TransportParameters {
2474 initial_src_cid: None,
2475 original_dst_cid: None,
2476 preferred_address: None,
2477 retry_src_cid: None,
2478 stateless_reset_token: None,
2479 min_ack_delay: None,
2480 ack_delay_exponent: TransportParameters::default().ack_delay_exponent,
2481 max_ack_delay: TransportParameters::default().max_ack_delay,
2482 ..params
2483 };
2484 self.set_peer_params(params);
2485 }
2486 Err(e) => {
2487 error!("session ticket has malformed transport parameters: {}", e);
2488 return;
2489 }
2490 }
2491 }
2492 trace!("0-RTT enabled");
2493 self.zero_rtt_enabled = true;
2494 self.zero_rtt_crypto = Some(ZeroRttCrypto { header, packet });
2495 }
2496
/// Feeds a received CRYPTO frame into the crypto stream of `space` and on to the session.
///
/// # Errors
///
/// Returns `PROTOCOL_VIOLATION` when the frame carries new data for an encryption level
/// below the one currently expected, `CRYPTO_BUFFER_EXCEEDED` when the frame would buffer
/// more than `crypto_buffer_size` bytes beyond what has been read, and propagates any error
/// from the crypto session's `read_handshake`.
fn read_crypto(
    &mut self,
    space: SpaceId,
    crypto: &frame::Crypto,
    payload_len: usize,
) -> Result<(), TransportError> {
    // The level at which new handshake data is currently expected.
    let expected = if !self.state.is_handshake() {
        SpaceId::Data
    } else if self.highest_space == SpaceId::Initial {
        SpaceId::Initial
    } else {
        SpaceId::Handshake
    };
    debug_assert!(space <= expected, "received out-of-order CRYPTO data");

    let end = crypto.offset + crypto.data.len() as u64;
    // Data at a lower level is tolerated only when it lies entirely within the part of
    // that level's stream that has already been read.
    if space < expected && end > self.spaces[space].crypto_stream.bytes_read() {
        warn!(
            "received new {:?} CRYPTO data when expecting {:?}",
            space, expected
        );
        return Err(TransportError::PROTOCOL_VIOLATION(
            "new data at unexpected encryption level",
        ));
    }

    #[cfg(feature = "pqc")]
    {
        // Let the PQC state inspect the handshake bytes, and restart MTU discovery from
        // the PQC minimum size when it asks for that.
        self.pqc_state.detect_pqc_from_crypto(&crypto.data, space);

        if self.pqc_state.should_trigger_mtu_discovery() {
            self.path
                .mtud
                .reset(self.pqc_state.min_initial_size(), self.config.min_mtu);
            trace!("Triggered MTU discovery for PQC handshake");
        }
    }

    let space = &mut self.spaces[space];
    // Bound how far ahead of the read position the peer may make us buffer.
    let max = end.saturating_sub(space.crypto_stream.bytes_read());
    if max > self.config.crypto_buffer_size as u64 {
        return Err(TransportError::CRYPTO_BUFFER_EXCEEDED(""));
    }

    space
        .crypto_stream
        .insert(crypto.offset, crypto.data.clone(), payload_len);
    // Hand every chunk that is now readable to the crypto session.
    while let Some(chunk) = space.crypto_stream.read(usize::MAX, true) {
        trace!("consumed {} CRYPTO bytes", chunk.bytes.len());
        if self.crypto.read_handshake(&chunk.bytes)? {
            self.events.push_back(Event::HandshakeDataReady);
        }
    }

    Ok(())
}
2561
/// Drains handshake output from the crypto session and queues it as CRYPTO frames.
///
/// Each iteration asks the session for outgoing bytes at the current highest space. When
/// the session also returns new keys, the connection is upgraded to the next space. The
/// loop ends once an iteration yields neither bytes nor a key upgrade.
///
/// # Panics
///
/// Panics if the session returns new keys while the highest space is already Data.
fn write_crypto(&mut self) {
    loop {
        // Captured before a possible upgrade: bytes written in this iteration belong here.
        let space = self.highest_space;
        let mut outgoing = Vec::new();
        if let Some(crypto) = self.crypto.write_handshake(&mut outgoing) {
            match space {
                SpaceId::Initial => {
                    self.upgrade_crypto(SpaceId::Handshake, crypto);
                }
                SpaceId::Handshake => {
                    self.upgrade_crypto(SpaceId::Data, crypto);
                }
                _ => unreachable!("got updated secrets during 1-RTT"),
            }
        }
        if outgoing.is_empty() {
            if space == self.highest_space {
                // No bytes and no key upgrade: nothing more to write for now.
                break;
            } else {
                // Keys were upgraded; poll again at the new level.
                continue;
            }
        }
        let offset = self.spaces[space].crypto_offset;
        let outgoing = Bytes::from(outgoing);
        if let State::Handshake(ref mut state) = self.state {
            // The client keeps a copy of its first Initial-level CRYPTO data.
            if space == SpaceId::Initial && offset == 0 && self.side.is_client() {
                state.client_hello = Some(outgoing.clone());
            }
        }
        self.spaces[space].crypto_offset += outgoing.len() as u64;
        trace!("wrote {} {:?} CRYPTO bytes", outgoing.len(), space);

        // With PQC in use, payloads above 1200 bytes are split into several frames.
        #[cfg(feature = "pqc")]
        let use_pqc_fragmentation = self.pqc_state.using_pqc && outgoing.len() > 1200;
        #[cfg(not(feature = "pqc"))]
        let use_pqc_fragmentation = false;

        if use_pqc_fragmentation {
            #[cfg(feature = "pqc")]
            {
                let frames = self.pqc_state.packet_handler.fragment_crypto_data(
                    &outgoing,
                    offset,
                    self.pqc_state.min_initial_size() as usize,
                );
                for frame in frames {
                    self.spaces[space].pending.crypto.push_back(frame);
                }
            }
        } else {
            self.spaces[space].pending.crypto.push_back(frame::Crypto {
                offset,
                data: outgoing,
            });
        }
    }
}
2623
2624 fn upgrade_crypto(&mut self, space: SpaceId, crypto: Keys) {
2626 debug_assert!(
2627 self.spaces[space].crypto.is_none(),
2628 "already reached packet space {space:?}"
2629 );
2630 trace!("{:?} keys ready", space);
2631 if space == SpaceId::Data {
2632 self.next_crypto = Some(
2634 self.crypto
2635 .next_1rtt_keys()
2636 .expect("handshake should be complete"),
2637 );
2638 }
2639
2640 self.spaces[space].crypto = Some(crypto);
2641 debug_assert!(space as usize > self.highest_space as usize);
2642 self.highest_space = space;
2643 if space == SpaceId::Data && self.side.is_client() {
2644 self.zero_rtt_crypto = None;
2646 }
2647 }
2648
2649 fn discard_space(&mut self, now: Instant, space_id: SpaceId) {
2650 debug_assert!(space_id != SpaceId::Data);
2651 trace!("discarding {:?} keys", space_id);
2652 if space_id == SpaceId::Initial {
2653 if let ConnectionSide::Client { token, .. } = &mut self.side {
2655 *token = Bytes::new();
2656 }
2657 }
2658 let space = &mut self.spaces[space_id];
2659 space.crypto = None;
2660 space.time_of_last_ack_eliciting_packet = None;
2661 space.loss_time = None;
2662 space.in_flight = 0;
2663 let sent_packets = mem::take(&mut space.sent_packets);
2664 for (pn, packet) in sent_packets.into_iter() {
2665 self.remove_in_flight(pn, &packet);
2666 }
2667 self.set_loss_detection_timer(now)
2668 }
2669
2670 fn handle_coalesced(
2671 &mut self,
2672 now: Instant,
2673 remote: SocketAddr,
2674 ecn: Option<EcnCodepoint>,
2675 data: BytesMut,
2676 ) {
2677 self.path.total_recvd = self.path.total_recvd.saturating_add(data.len() as u64);
2678 let mut remaining = Some(data);
2679 while let Some(data) = remaining {
2680 match PartialDecode::new(
2681 data,
2682 &FixedLengthConnectionIdParser::new(self.local_cid_state.cid_len()),
2683 &[self.version],
2684 self.endpoint_config.grease_quic_bit,
2685 ) {
2686 Ok((partial_decode, rest)) => {
2687 remaining = rest;
2688 self.handle_decode(now, remote, ecn, partial_decode);
2689 }
2690 Err(e) => {
2691 trace!("malformed header: {}", e);
2692 return;
2693 }
2694 }
2695 }
2696 }
2697
2698 fn handle_decode(
2699 &mut self,
2700 now: Instant,
2701 remote: SocketAddr,
2702 ecn: Option<EcnCodepoint>,
2703 partial_decode: PartialDecode,
2704 ) {
2705 if let Some(decoded) = packet_crypto::unprotect_header(
2706 partial_decode,
2707 &self.spaces,
2708 self.zero_rtt_crypto.as_ref(),
2709 self.peer_params.stateless_reset_token,
2710 ) {
2711 self.handle_packet(now, remote, ecn, decoded.packet, decoded.stateless_reset);
2712 }
2713 }
2714
2715 fn handle_packet(
2716 &mut self,
2717 now: Instant,
2718 remote: SocketAddr,
2719 ecn: Option<EcnCodepoint>,
2720 packet: Option<Packet>,
2721 stateless_reset: bool,
2722 ) {
2723 self.stats.udp_rx.ios += 1;
2724 if let Some(ref packet) = packet {
2725 trace!(
2726 "got {:?} packet ({} bytes) from {} using id {}",
2727 packet.header.space(),
2728 packet.payload.len() + packet.header_data.len(),
2729 remote,
2730 packet.header.dst_cid(),
2731 );
2732
2733 #[cfg(feature = "trace")]
2735 {
2736 use crate::trace_packet_received;
2737 let packet_size = packet.payload.len() + packet.header_data.len();
2739 trace_packet_received!(
2740 &self.event_log,
2741 self.trace_context.trace_id(),
2742 packet_size as u32,
2743 0 );
2745 }
2746 }
2747
2748 if self.is_handshaking() && remote != self.path.remote {
2749 debug!("discarding packet with unexpected remote during handshake");
2750 return;
2751 }
2752
2753 let was_closed = self.state.is_closed();
2754 let was_drained = self.state.is_drained();
2755
2756 let decrypted = match packet {
2757 None => Err(None),
2758 Some(mut packet) => self
2759 .decrypt_packet(now, &mut packet)
2760 .map(move |number| (packet, number)),
2761 };
2762 let result = match decrypted {
2763 _ if stateless_reset => {
2764 debug!("got stateless reset");
2765 Err(ConnectionError::Reset)
2766 }
2767 Err(Some(e)) => {
2768 warn!("illegal packet: {}", e);
2769 Err(e.into())
2770 }
2771 Err(None) => {
2772 debug!("failed to authenticate packet");
2773 self.authentication_failures += 1;
2774 let integrity_limit = self.spaces[self.highest_space]
2775 .crypto
2776 .as_ref()
2777 .unwrap()
2778 .packet
2779 .local
2780 .integrity_limit();
2781 if self.authentication_failures > integrity_limit {
2782 Err(TransportError::AEAD_LIMIT_REACHED("integrity limit violated").into())
2783 } else {
2784 return;
2785 }
2786 }
2787 Ok((packet, number)) => {
2788 let span = match number {
2789 Some(pn) => trace_span!("recv", space = ?packet.header.space(), pn),
2790 None => trace_span!("recv", space = ?packet.header.space()),
2791 };
2792 let _guard = span.enter();
2793
2794 let is_duplicate = |n| self.spaces[packet.header.space()].dedup.insert(n);
2795 if number.is_some_and(is_duplicate) {
2796 debug!("discarding possible duplicate packet");
2797 return;
2798 } else if self.state.is_handshake() && packet.header.is_short() {
2799 trace!("dropping short packet during handshake");
2801 return;
2802 } else {
2803 if let Header::Initial(InitialHeader { ref token, .. }) = packet.header {
2804 if let State::Handshake(ref hs) = self.state {
2805 if self.side.is_server() && token != &hs.expected_token {
2806 warn!("discarding Initial with invalid retry token");
2810 return;
2811 }
2812 }
2813 }
2814
2815 if !self.state.is_closed() {
2816 let spin = match packet.header {
2817 Header::Short { spin, .. } => spin,
2818 _ => false,
2819 };
2820 self.on_packet_authenticated(
2821 now,
2822 packet.header.space(),
2823 ecn,
2824 number,
2825 spin,
2826 packet.header.is_1rtt(),
2827 );
2828 }
2829
2830 self.process_decrypted_packet(now, remote, number, packet)
2831 }
2832 }
2833 };
2834
2835 if let Err(conn_err) = result {
2837 self.error = Some(conn_err.clone());
2838 self.state = match conn_err {
2839 ConnectionError::ApplicationClosed(reason) => State::closed(reason),
2840 ConnectionError::ConnectionClosed(reason) => State::closed(reason),
2841 ConnectionError::Reset
2842 | ConnectionError::TransportError(TransportError {
2843 code: TransportErrorCode::AEAD_LIMIT_REACHED,
2844 ..
2845 }) => State::Drained,
2846 ConnectionError::TimedOut => {
2847 unreachable!("timeouts aren't generated by packet processing");
2848 }
2849 ConnectionError::TransportError(err) => {
2850 debug!("closing connection due to transport error: {}", err);
2851 State::closed(err)
2852 }
2853 ConnectionError::VersionMismatch => State::Draining,
2854 ConnectionError::LocallyClosed => {
2855 unreachable!("LocallyClosed isn't generated by packet processing");
2856 }
2857 ConnectionError::CidsExhausted => {
2858 unreachable!("CidsExhausted isn't generated by packet processing");
2859 }
2860 };
2861 }
2862
2863 if !was_closed && self.state.is_closed() {
2864 self.close_common();
2865 if !self.state.is_drained() {
2866 self.set_close_timer(now);
2867 }
2868 }
2869 if !was_drained && self.state.is_drained() {
2870 self.endpoint_events.push_back(EndpointEventInner::Drained);
2871 self.timers.stop(Timer::Close);
2874 }
2875
2876 if let State::Closed(_) = self.state {
2878 self.close = remote == self.path.remote;
2879 }
2880 }
2881
2882 fn process_decrypted_packet(
2883 &mut self,
2884 now: Instant,
2885 remote: SocketAddr,
2886 number: Option<u64>,
2887 packet: Packet,
2888 ) -> Result<(), ConnectionError> {
2889 let state = match self.state {
2890 State::Established => {
2891 match packet.header.space() {
2892 SpaceId::Data => self.process_payload(now, remote, number.unwrap(), packet)?,
2893 _ if packet.header.has_frames() => self.process_early_payload(now, packet)?,
2894 _ => {
2895 trace!("discarding unexpected pre-handshake packet");
2896 }
2897 }
2898 return Ok(());
2899 }
2900 State::Closed(_) => {
2901 for result in frame::Iter::new(packet.payload.freeze())? {
2902 let frame = match result {
2903 Ok(frame) => frame,
2904 Err(err) => {
2905 debug!("frame decoding error: {err:?}");
2906 continue;
2907 }
2908 };
2909
2910 if let Frame::Padding = frame {
2911 continue;
2912 };
2913
2914 self.stats.frame_rx.record(&frame);
2915
2916 if let Frame::Close(_) = frame {
2917 trace!("draining");
2918 self.state = State::Draining;
2919 break;
2920 }
2921 }
2922 return Ok(());
2923 }
2924 State::Draining | State::Drained => return Ok(()),
2925 State::Handshake(ref mut state) => state,
2926 };
2927
2928 match packet.header {
2929 Header::Retry {
2930 src_cid: rem_cid, ..
2931 } => {
2932 if self.side.is_server() {
2933 return Err(TransportError::PROTOCOL_VIOLATION("client sent Retry").into());
2934 }
2935
2936 if self.total_authed_packets > 1
2937 || packet.payload.len() <= 16 || !self.crypto.is_valid_retry(
2939 &self.rem_cids.active(),
2940 &packet.header_data,
2941 &packet.payload,
2942 )
2943 {
2944 trace!("discarding invalid Retry");
2945 return Ok(());
2953 }
2954
2955 trace!("retrying with CID {}", rem_cid);
2956 let client_hello = state.client_hello.take().unwrap();
2957 self.retry_src_cid = Some(rem_cid);
2958 self.rem_cids.update_initial_cid(rem_cid);
2959 self.rem_handshake_cid = rem_cid;
2960
2961 let space = &mut self.spaces[SpaceId::Initial];
2962 if let Some(info) = space.take(0) {
2963 self.on_packet_acked(now, 0, info);
2964 };
2965
2966 self.discard_space(now, SpaceId::Initial); self.spaces[SpaceId::Initial] = PacketSpace {
2968 crypto: Some(self.crypto.initial_keys(&rem_cid, self.side.side())),
2969 next_packet_number: self.spaces[SpaceId::Initial].next_packet_number,
2970 crypto_offset: client_hello.len() as u64,
2971 ..PacketSpace::new(now)
2972 };
2973 self.spaces[SpaceId::Initial]
2974 .pending
2975 .crypto
2976 .push_back(frame::Crypto {
2977 offset: 0,
2978 data: client_hello,
2979 });
2980
2981 let zero_rtt = mem::take(&mut self.spaces[SpaceId::Data].sent_packets);
2983 for (pn, info) in zero_rtt {
2984 self.remove_in_flight(pn, &info);
2985 self.spaces[SpaceId::Data].pending |= info.retransmits;
2986 }
2987 self.streams.retransmit_all_for_0rtt();
2988
2989 let token_len = packet.payload.len() - 16;
2990 let ConnectionSide::Client { ref mut token, .. } = self.side else {
2991 unreachable!("we already short-circuited if we're server");
2992 };
2993 *token = packet.payload.freeze().split_to(token_len);
2994 self.state = State::Handshake(state::Handshake {
2995 expected_token: Bytes::new(),
2996 rem_cid_set: false,
2997 client_hello: None,
2998 });
2999 Ok(())
3000 }
3001 Header::Long {
3002 ty: LongType::Handshake,
3003 src_cid: rem_cid,
3004 ..
3005 } => {
3006 if rem_cid != self.rem_handshake_cid {
3007 debug!(
3008 "discarding packet with mismatched remote CID: {} != {}",
3009 self.rem_handshake_cid, rem_cid
3010 );
3011 return Ok(());
3012 }
3013 self.on_path_validated();
3014
3015 self.process_early_payload(now, packet)?;
3016 if self.state.is_closed() {
3017 return Ok(());
3018 }
3019
3020 if self.crypto.is_handshaking() {
3021 trace!("handshake ongoing");
3022 return Ok(());
3023 }
3024
3025 if self.side.is_client() {
3026 let params =
3028 self.crypto
3029 .transport_parameters()?
3030 .ok_or_else(|| TransportError {
3031 code: TransportErrorCode::crypto(0x6d),
3032 frame: None,
3033 reason: "transport parameters missing".into(),
3034 })?;
3035
3036 if self.has_0rtt() {
3037 if !self.crypto.early_data_accepted().unwrap() {
3038 debug_assert!(self.side.is_client());
3039 debug!("0-RTT rejected");
3040 self.accepted_0rtt = false;
3041 self.streams.zero_rtt_rejected();
3042
3043 self.spaces[SpaceId::Data].pending = Retransmits::default();
3045
3046 let sent_packets =
3048 mem::take(&mut self.spaces[SpaceId::Data].sent_packets);
3049 for (pn, packet) in sent_packets {
3050 self.remove_in_flight(pn, &packet);
3051 }
3052 } else {
3053 self.accepted_0rtt = true;
3054 params.validate_resumption_from(&self.peer_params)?;
3055 }
3056 }
3057 if let Some(token) = params.stateless_reset_token {
3058 self.endpoint_events
3059 .push_back(EndpointEventInner::ResetToken(self.path.remote, token));
3060 }
3061 self.handle_peer_params(params)?;
3062 self.issue_first_cids(now);
3063 } else {
3064 self.spaces[SpaceId::Data].pending.handshake_done = true;
3066 self.discard_space(now, SpaceId::Handshake);
3067 }
3068
3069 self.events.push_back(Event::Connected);
3070 self.state = State::Established;
3071 trace!("established");
3072 Ok(())
3073 }
3074 Header::Initial(InitialHeader {
3075 src_cid: rem_cid, ..
3076 }) => {
3077 if !state.rem_cid_set {
3078 trace!("switching remote CID to {}", rem_cid);
3079 let mut state = state.clone();
3080 self.rem_cids.update_initial_cid(rem_cid);
3081 self.rem_handshake_cid = rem_cid;
3082 self.orig_rem_cid = rem_cid;
3083 state.rem_cid_set = true;
3084 self.state = State::Handshake(state);
3085 } else if rem_cid != self.rem_handshake_cid {
3086 debug!(
3087 "discarding packet with mismatched remote CID: {} != {}",
3088 self.rem_handshake_cid, rem_cid
3089 );
3090 return Ok(());
3091 }
3092
3093 let starting_space = self.highest_space;
3094 self.process_early_payload(now, packet)?;
3095
3096 if self.side.is_server()
3097 && starting_space == SpaceId::Initial
3098 && self.highest_space != SpaceId::Initial
3099 {
3100 let params =
3101 self.crypto
3102 .transport_parameters()?
3103 .ok_or_else(|| TransportError {
3104 code: TransportErrorCode::crypto(0x6d),
3105 frame: None,
3106 reason: "transport parameters missing".into(),
3107 })?;
3108 self.handle_peer_params(params)?;
3109 self.issue_first_cids(now);
3110 self.init_0rtt();
3111 }
3112 Ok(())
3113 }
3114 Header::Long {
3115 ty: LongType::ZeroRtt,
3116 ..
3117 } => {
3118 self.process_payload(now, remote, number.unwrap(), packet)?;
3119 Ok(())
3120 }
3121 Header::VersionNegotiate { .. } => {
3122 if self.total_authed_packets > 1 {
3123 return Ok(());
3124 }
3125 let supported = packet
3126 .payload
3127 .chunks(4)
3128 .any(|x| match <[u8; 4]>::try_from(x) {
3129 Ok(version) => self.version == u32::from_be_bytes(version),
3130 Err(_) => false,
3131 });
3132 if supported {
3133 return Ok(());
3134 }
3135 debug!("remote doesn't support our version");
3136 Err(ConnectionError::VersionMismatch)
3137 }
3138 Header::Short { .. } => unreachable!(
3139 "short packets received during handshake are discarded in handle_packet"
3140 ),
3141 }
3142 }
3143
3144 fn process_early_payload(
3146 &mut self,
3147 now: Instant,
3148 packet: Packet,
3149 ) -> Result<(), TransportError> {
3150 debug_assert_ne!(packet.header.space(), SpaceId::Data);
3151 let payload_len = packet.payload.len();
3152 let mut ack_eliciting = false;
3153 for result in frame::Iter::new(packet.payload.freeze())? {
3154 let frame = result?;
3155 let span = match frame {
3156 Frame::Padding => continue,
3157 _ => Some(trace_span!("frame", ty = %frame.ty())),
3158 };
3159
3160 self.stats.frame_rx.record(&frame);
3161
3162 let _guard = span.as_ref().map(|x| x.enter());
3163 ack_eliciting |= frame.is_ack_eliciting();
3164
3165 match frame {
3167 Frame::Padding | Frame::Ping => {}
3168 Frame::Crypto(frame) => {
3169 self.read_crypto(packet.header.space(), &frame, payload_len)?;
3170 }
3171 Frame::Ack(ack) => {
3172 self.on_ack_received(now, packet.header.space(), ack)?;
3173 }
3174 Frame::Close(reason) => {
3175 self.error = Some(reason.into());
3176 self.state = State::Draining;
3177 return Ok(());
3178 }
3179 _ => {
3180 let mut err =
3181 TransportError::PROTOCOL_VIOLATION("illegal frame type in handshake");
3182 err.frame = Some(frame.ty());
3183 return Err(err);
3184 }
3185 }
3186 }
3187
3188 if ack_eliciting {
3189 self.spaces[packet.header.space()]
3191 .pending_acks
3192 .set_immediate_ack_required();
3193 }
3194
3195 self.write_crypto();
3196 Ok(())
3197 }
3198
3199 fn process_payload(
3200 &mut self,
3201 now: Instant,
3202 remote: SocketAddr,
3203 number: u64,
3204 packet: Packet,
3205 ) -> Result<(), TransportError> {
3206 let payload = packet.payload.freeze();
3207 let mut is_probing_packet = true;
3208 let mut close = None;
3209 let payload_len = payload.len();
3210 let mut ack_eliciting = false;
3211 for result in frame::Iter::new(payload)? {
3212 let frame = result?;
3213 let span = match frame {
3214 Frame::Padding => continue,
3215 _ => Some(trace_span!("frame", ty = %frame.ty())),
3216 };
3217
3218 self.stats.frame_rx.record(&frame);
3219 match &frame {
3222 Frame::Crypto(f) => {
3223 trace!(offset = f.offset, len = f.data.len(), "got crypto frame");
3224 }
3225 Frame::Stream(f) => {
3226 trace!(id = %f.id, offset = f.offset, len = f.data.len(), fin = f.fin, "got stream frame");
3227 }
3228 Frame::Datagram(f) => {
3229 trace!(len = f.data.len(), "got datagram frame");
3230 }
3231 f => {
3232 trace!("got frame {:?}", f);
3233 }
3234 }
3235
3236 let _guard = span.as_ref().map(|x| x.enter());
3237 if packet.header.is_0rtt() {
3238 match frame {
3239 Frame::Crypto(_) | Frame::Close(Close::Application(_)) => {
3240 return Err(TransportError::PROTOCOL_VIOLATION(
3241 "illegal frame type in 0-RTT",
3242 ));
3243 }
3244 _ => {}
3245 }
3246 }
3247 ack_eliciting |= frame.is_ack_eliciting();
3248
3249 match frame {
3251 Frame::Padding
3252 | Frame::PathChallenge(_)
3253 | Frame::PathResponse(_)
3254 | Frame::NewConnectionId(_) => {}
3255 _ => {
3256 is_probing_packet = false;
3257 }
3258 }
3259 match frame {
3260 Frame::Crypto(frame) => {
3261 self.read_crypto(SpaceId::Data, &frame, payload_len)?;
3262 }
3263 Frame::Stream(frame) => {
3264 if self.streams.received(frame, payload_len)?.should_transmit() {
3265 self.spaces[SpaceId::Data].pending.max_data = true;
3266 }
3267 }
3268 Frame::Ack(ack) => {
3269 self.on_ack_received(now, SpaceId::Data, ack)?;
3270 }
3271 Frame::Padding | Frame::Ping => {}
3272 Frame::Close(reason) => {
3273 close = Some(reason);
3274 }
3275 Frame::PathChallenge(token) => {
3276 self.path_responses.push(number, token, remote);
3277 if remote == self.path.remote {
3278 match self.peer_supports_ack_frequency() {
3281 true => self.immediate_ack(),
3282 false => self.ping(),
3283 }
3284 }
3285 }
3286 Frame::PathResponse(token) => {
3287 if self.path.challenge == Some(token) && remote == self.path.remote {
3288 trace!("new path validated");
3289 self.timers.stop(Timer::PathValidation);
3290 self.path.challenge = None;
3291 self.path.validated = true;
3292 if let Some((_, ref mut prev_path)) = self.prev_path {
3293 prev_path.challenge = None;
3294 prev_path.challenge_pending = false;
3295 }
3296 self.on_path_validated();
3297 } else if let Some(nat_traversal) = &mut self.nat_traversal {
3298 match nat_traversal.handle_validation_success(remote, token, now) {
3300 Ok(sequence) => {
3301 trace!(
3302 "NAT traversal candidate {} validated for sequence {}",
3303 remote, sequence
3304 );
3305
3306 if nat_traversal.handle_coordination_success(remote, now) {
3308 trace!("Coordination succeeded via {}", remote);
3309
3310 let can_migrate = match &self.side {
3312 ConnectionSide::Client { .. } => true, ConnectionSide::Server { server_config } => {
3314 server_config.migration
3315 }
3316 };
3317
3318 if can_migrate {
3319 let best_pairs = nat_traversal.get_best_succeeded_pairs();
3321 if let Some(best) = best_pairs.first() {
3322 if best.remote_addr == remote
3323 && best.remote_addr != self.path.remote
3324 {
3325 debug!(
3326 "NAT traversal found better path, initiating migration"
3327 );
3328 if let Err(e) =
3330 self.migrate_to_nat_traversal_path(now)
3331 {
3332 warn!(
3333 "Failed to migrate to NAT traversal path: {:?}",
3334 e
3335 );
3336 }
3337 }
3338 }
3339 }
3340 } else {
3341 if nat_traversal.mark_pair_succeeded(remote) {
3343 trace!("NAT traversal pair succeeded for {}", remote);
3344 }
3345 }
3346 }
3347 Err(NatTraversalError::ChallengeMismatch) => {
3348 debug!(
3349 "PATH_RESPONSE challenge mismatch for NAT candidate {}",
3350 remote
3351 );
3352 }
3353 Err(e) => {
3354 debug!("NAT traversal validation error: {}", e);
3355 }
3356 }
3357 } else {
3358 debug!(token, "ignoring invalid PATH_RESPONSE");
3359 }
3360 }
3361 Frame::MaxData(bytes) => {
3362 self.streams.received_max_data(bytes);
3363 }
3364 Frame::MaxStreamData { id, offset } => {
3365 self.streams.received_max_stream_data(id, offset)?;
3366 }
3367 Frame::MaxStreams { dir, count } => {
3368 self.streams.received_max_streams(dir, count)?;
3369 }
3370 Frame::ResetStream(frame) => {
3371 if self.streams.received_reset(frame)?.should_transmit() {
3372 self.spaces[SpaceId::Data].pending.max_data = true;
3373 }
3374 }
3375 Frame::DataBlocked { offset } => {
3376 debug!(offset, "peer claims to be blocked at connection level");
3377 }
3378 Frame::StreamDataBlocked { id, offset } => {
3379 if id.initiator() == self.side.side() && id.dir() == Dir::Uni {
3380 debug!("got STREAM_DATA_BLOCKED on send-only {}", id);
3381 return Err(TransportError::STREAM_STATE_ERROR(
3382 "STREAM_DATA_BLOCKED on send-only stream",
3383 ));
3384 }
3385 debug!(
3386 stream = %id,
3387 offset, "peer claims to be blocked at stream level"
3388 );
3389 }
3390 Frame::StreamsBlocked { dir, limit } => {
3391 if limit > MAX_STREAM_COUNT {
3392 return Err(TransportError::FRAME_ENCODING_ERROR(
3393 "unrepresentable stream limit",
3394 ));
3395 }
3396 debug!(
3397 "peer claims to be blocked opening more than {} {} streams",
3398 limit, dir
3399 );
3400 }
3401 Frame::StopSending(frame::StopSending { id, error_code }) => {
3402 if id.initiator() != self.side.side() {
3403 if id.dir() == Dir::Uni {
3404 debug!("got STOP_SENDING on recv-only {}", id);
3405 return Err(TransportError::STREAM_STATE_ERROR(
3406 "STOP_SENDING on recv-only stream",
3407 ));
3408 }
3409 } else if self.streams.is_local_unopened(id) {
3410 return Err(TransportError::STREAM_STATE_ERROR(
3411 "STOP_SENDING on unopened stream",
3412 ));
3413 }
3414 self.streams.received_stop_sending(id, error_code);
3415 }
3416 Frame::RetireConnectionId { sequence } => {
3417 let allow_more_cids = self
3418 .local_cid_state
3419 .on_cid_retirement(sequence, self.peer_params.issue_cids_limit())?;
3420 self.endpoint_events
3421 .push_back(EndpointEventInner::RetireConnectionId(
3422 now,
3423 sequence,
3424 allow_more_cids,
3425 ));
3426 }
3427 Frame::NewConnectionId(frame) => {
3428 trace!(
3429 sequence = frame.sequence,
3430 id = %frame.id,
3431 retire_prior_to = frame.retire_prior_to,
3432 );
3433 if self.rem_cids.active().is_empty() {
3434 return Err(TransportError::PROTOCOL_VIOLATION(
3435 "NEW_CONNECTION_ID when CIDs aren't in use",
3436 ));
3437 }
3438 if frame.retire_prior_to > frame.sequence {
3439 return Err(TransportError::PROTOCOL_VIOLATION(
3440 "NEW_CONNECTION_ID retiring unissued CIDs",
3441 ));
3442 }
3443
3444 use crate::cid_queue::InsertError;
3445 match self.rem_cids.insert(frame) {
3446 Ok(None) => {}
3447 Ok(Some((retired, reset_token))) => {
3448 let pending_retired =
3449 &mut self.spaces[SpaceId::Data].pending.retire_cids;
3450 const MAX_PENDING_RETIRED_CIDS: u64 = CidQueue::LEN as u64 * 10;
3453 if (pending_retired.len() as u64)
3456 .saturating_add(retired.end.saturating_sub(retired.start))
3457 > MAX_PENDING_RETIRED_CIDS
3458 {
3459 return Err(TransportError::CONNECTION_ID_LIMIT_ERROR(
3460 "queued too many retired CIDs",
3461 ));
3462 }
3463 pending_retired.extend(retired);
3464 self.set_reset_token(reset_token);
3465 }
3466 Err(InsertError::ExceedsLimit) => {
3467 return Err(TransportError::CONNECTION_ID_LIMIT_ERROR(""));
3468 }
3469 Err(InsertError::Retired) => {
3470 trace!("discarding already-retired");
3471 self.spaces[SpaceId::Data]
3475 .pending
3476 .retire_cids
3477 .push(frame.sequence);
3478 continue;
3479 }
3480 };
3481
3482 if self.side.is_server() && self.rem_cids.active_seq() == 0 {
3483 self.update_rem_cid();
3486 }
3487 }
3488 Frame::NewToken(NewToken { token }) => {
3489 let ConnectionSide::Client {
3490 token_store,
3491 server_name,
3492 ..
3493 } = &self.side
3494 else {
3495 return Err(TransportError::PROTOCOL_VIOLATION("client sent NEW_TOKEN"));
3496 };
3497 if token.is_empty() {
3498 return Err(TransportError::FRAME_ENCODING_ERROR("empty token"));
3499 }
3500 trace!("got new token");
3501 token_store.insert(server_name, token);
3502 }
3503 Frame::Datagram(datagram) => {
3504 if self
3505 .datagrams
3506 .received(datagram, &self.config.datagram_receive_buffer_size)?
3507 {
3508 self.events.push_back(Event::DatagramReceived);
3509 }
3510 }
3511 Frame::AckFrequency(ack_frequency) => {
3512 let space = &mut self.spaces[SpaceId::Data];
3514
3515 if !self
3516 .ack_frequency
3517 .ack_frequency_received(&ack_frequency, &mut space.pending_acks)?
3518 {
3519 continue;
3521 }
3522
3523 if let Some(timeout) = space
3526 .pending_acks
3527 .max_ack_delay_timeout(self.ack_frequency.max_ack_delay)
3528 {
3529 self.timers.set(Timer::MaxAckDelay, timeout);
3530 }
3531 }
3532 Frame::ImmediateAck => {
3533 self.spaces[SpaceId::Data]
3535 .pending_acks
3536 .set_immediate_ack_required();
3537 }
3538 Frame::HandshakeDone => {
3539 if self.side.is_server() {
3540 return Err(TransportError::PROTOCOL_VIOLATION(
3541 "client sent HANDSHAKE_DONE",
3542 ));
3543 }
3544 if self.spaces[SpaceId::Handshake].crypto.is_some() {
3545 self.discard_space(now, SpaceId::Handshake);
3546 }
3547 }
3548 Frame::AddAddress(add_address) => {
3549 self.handle_add_address(&add_address, now)?;
3550 }
3551 Frame::PunchMeNow(punch_me_now) => {
3552 self.handle_punch_me_now(&punch_me_now, now)?;
3553 }
3554 Frame::RemoveAddress(remove_address) => {
3555 self.handle_remove_address(&remove_address)?;
3556 }
3557 Frame::ObservedAddress(observed_address) => {
3558 self.handle_observed_address_frame(&observed_address, now)?;
3559 }
3560 }
3561 }
3562
3563 let space = &mut self.spaces[SpaceId::Data];
3564 if space
3565 .pending_acks
3566 .packet_received(now, number, ack_eliciting, &space.dedup)
3567 {
3568 self.timers
3569 .set(Timer::MaxAckDelay, now + self.ack_frequency.max_ack_delay);
3570 }
3571
3572 let pending = &mut self.spaces[SpaceId::Data].pending;
3577 self.streams.queue_max_stream_id(pending);
3578
3579 if let Some(reason) = close {
3580 self.error = Some(reason.into());
3581 self.state = State::Draining;
3582 self.close = true;
3583 }
3584
3585 if remote != self.path.remote
3586 && !is_probing_packet
3587 && number == self.spaces[SpaceId::Data].rx_packet
3588 {
3589 let ConnectionSide::Server { ref server_config } = self.side else {
3590 return Err(TransportError::PROTOCOL_VIOLATION(
3591 "packets from unknown remote should be dropped by clients",
3592 ));
3593 };
3594 debug_assert!(
3595 server_config.migration,
3596 "migration-initiating packets should have been dropped immediately"
3597 );
3598 self.migrate(now, remote);
3599 self.update_rem_cid();
3601 self.spin = false;
3602 }
3603
3604 Ok(())
3605 }
3606
3607 fn migrate(&mut self, now: Instant, remote: SocketAddr) {
3608 trace!(%remote, "migration initiated");
3609 let mut new_path = if remote.is_ipv4() && remote.ip() == self.path.remote.ip() {
3613 PathData::from_previous(remote, &self.path, now)
3614 } else {
3615 let peer_max_udp_payload_size =
3616 u16::try_from(self.peer_params.max_udp_payload_size.into_inner())
3617 .unwrap_or(u16::MAX);
3618 PathData::new(
3619 remote,
3620 self.allow_mtud,
3621 Some(peer_max_udp_payload_size),
3622 now,
3623 &self.config,
3624 )
3625 };
3626 new_path.challenge = Some(self.rng.r#gen());
3627 new_path.challenge_pending = true;
3628 let prev_pto = self.pto(SpaceId::Data);
3629
3630 let mut prev = mem::replace(&mut self.path, new_path);
3631 if prev.challenge.is_none() {
3633 prev.challenge = Some(self.rng.r#gen());
3634 prev.challenge_pending = true;
3635 self.prev_path = Some((self.rem_cids.active(), prev));
3638 }
3639
3640 self.timers.set(
3641 Timer::PathValidation,
3642 now + 3 * cmp::max(self.pto(SpaceId::Data), prev_pto),
3643 );
3644 }
3645
/// Reacts to a change of the local address.
///
/// Switches to the next remote connection ID when one is available (via
/// `update_rem_cid`) and then calls `ping()`.
pub fn local_address_changed(&mut self) {
    self.update_rem_cid();
    self.ping();
}
3651
3652 pub fn migrate_to_nat_traversal_path(&mut self, now: Instant) -> Result<(), TransportError> {
3654 let (remote_addr, local_addr) = {
3656 let nat_state = self
3657 .nat_traversal
3658 .as_ref()
3659 .ok_or_else(|| TransportError::PROTOCOL_VIOLATION("NAT traversal not enabled"))?;
3660
3661 let best_pairs = nat_state.get_best_succeeded_pairs();
3663 if best_pairs.is_empty() {
3664 return Err(TransportError::PROTOCOL_VIOLATION(
3665 "No validated NAT traversal paths",
3666 ));
3667 }
3668
3669 let best_path = best_pairs
3671 .iter()
3672 .find(|pair| pair.remote_addr != self.path.remote)
3673 .or_else(|| best_pairs.first());
3674
3675 let best_path = best_path.ok_or_else(|| {
3676 TransportError::PROTOCOL_VIOLATION("No suitable NAT traversal path")
3677 })?;
3678
3679 debug!(
3680 "Migrating to NAT traversal path: {} -> {} (priority: {})",
3681 self.path.remote, best_path.remote_addr, best_path.priority
3682 );
3683
3684 (best_path.remote_addr, best_path.local_addr)
3685 };
3686
3687 self.migrate(now, remote_addr);
3689
3690 if local_addr != SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED), 0) {
3692 self.local_ip = Some(local_addr.ip());
3693 }
3694
3695 self.path.challenge_pending = true;
3697
3698 Ok(())
3699 }
3700
3701 fn update_rem_cid(&mut self) {
3703 let (reset_token, retired) = match self.rem_cids.next() {
3704 Some(x) => x,
3705 None => return,
3706 };
3707
3708 self.spaces[SpaceId::Data]
3710 .pending
3711 .retire_cids
3712 .extend(retired);
3713 self.set_reset_token(reset_token);
3714 }
3715
3716 fn set_reset_token(&mut self, reset_token: ResetToken) {
3717 self.endpoint_events
3718 .push_back(EndpointEventInner::ResetToken(
3719 self.path.remote,
3720 reset_token,
3721 ));
3722 self.peer_params.stateless_reset_token = Some(reset_token);
3723 }
3724
3725 fn issue_first_cids(&mut self, now: Instant) {
3727 if self.local_cid_state.cid_len() == 0 {
3728 return;
3729 }
3730
3731 let mut n = self.peer_params.issue_cids_limit() - 1;
3733 if let ConnectionSide::Server { server_config } = &self.side {
3734 if server_config.has_preferred_address() {
3735 n -= 1;
3737 }
3738 }
3739 self.endpoint_events
3740 .push_back(EndpointEventInner::NeedIdentifiers(now, n));
3741 }
3742
3743 fn populate_packet(
3744 &mut self,
3745 now: Instant,
3746 space_id: SpaceId,
3747 buf: &mut Vec<u8>,
3748 max_size: usize,
3749 pn: u64,
3750 ) -> SentFrames {
3751 let mut sent = SentFrames::default();
3752 let space = &mut self.spaces[space_id];
3753 let is_0rtt = space_id == SpaceId::Data && space.crypto.is_none();
3754 space.pending_acks.maybe_ack_non_eliciting();
3755
3756 if !is_0rtt && mem::replace(&mut space.pending.handshake_done, false) {
3758 buf.write(frame::FrameType::HANDSHAKE_DONE);
3759 sent.retransmits.get_or_create().handshake_done = true;
3760 self.stats.frame_tx.handshake_done =
3762 self.stats.frame_tx.handshake_done.saturating_add(1);
3763 }
3764
3765 if mem::replace(&mut space.ping_pending, false) {
3767 trace!("PING");
3768 buf.write(frame::FrameType::PING);
3769 sent.non_retransmits = true;
3770 self.stats.frame_tx.ping += 1;
3771 }
3772
3773 if mem::replace(&mut space.immediate_ack_pending, false) {
3775 trace!("IMMEDIATE_ACK");
3776 buf.write(frame::FrameType::IMMEDIATE_ACK);
3777 sent.non_retransmits = true;
3778 self.stats.frame_tx.immediate_ack += 1;
3779 }
3780
3781 if space.pending_acks.can_send() {
3783 Self::populate_acks(
3784 now,
3785 self.receiving_ecn,
3786 &mut sent,
3787 space,
3788 buf,
3789 &mut self.stats,
3790 );
3791 }
3792
3793 if mem::replace(&mut space.pending.ack_frequency, false) {
3795 let sequence_number = self.ack_frequency.next_sequence_number();
3796
3797 let config = self.config.ack_frequency_config.as_ref().unwrap();
3799
3800 let max_ack_delay = self.ack_frequency.candidate_max_ack_delay(
3802 self.path.rtt.get(),
3803 config,
3804 &self.peer_params,
3805 );
3806
3807 trace!(?max_ack_delay, "ACK_FREQUENCY");
3808
3809 frame::AckFrequency {
3810 sequence: sequence_number,
3811 ack_eliciting_threshold: config.ack_eliciting_threshold,
3812 request_max_ack_delay: max_ack_delay.as_micros().try_into().unwrap_or(VarInt::MAX),
3813 reordering_threshold: config.reordering_threshold,
3814 }
3815 .encode(buf);
3816
3817 sent.retransmits.get_or_create().ack_frequency = true;
3818
3819 self.ack_frequency.ack_frequency_sent(pn, max_ack_delay);
3820 self.stats.frame_tx.ack_frequency += 1;
3821 }
3822
3823 if buf.len() + 9 < max_size && space_id == SpaceId::Data {
3825 if let Some(token) = self.path.challenge {
3827 self.path.challenge_pending = false;
3829 sent.non_retransmits = true;
3830 sent.requires_padding = true;
3831 trace!("PATH_CHALLENGE {:08x}", token);
3832 buf.write(frame::FrameType::PATH_CHALLENGE);
3833 buf.write(token);
3834 self.stats.frame_tx.path_challenge += 1;
3835 }
3836
3837 }
3846
3847 if buf.len() + 9 < max_size && space_id == SpaceId::Data {
3849 if let Some(token) = self.path_responses.pop_on_path(self.path.remote) {
3850 sent.non_retransmits = true;
3851 sent.requires_padding = true;
3852 trace!("PATH_RESPONSE {:08x}", token);
3853 buf.write(frame::FrameType::PATH_RESPONSE);
3854 buf.write(token);
3855 self.stats.frame_tx.path_response += 1;
3856 }
3857 }
3858
3859 while buf.len() + frame::Crypto::SIZE_BOUND < max_size && !is_0rtt {
3861 let mut frame = match space.pending.crypto.pop_front() {
3862 Some(x) => x,
3863 None => break,
3864 };
3865
3866 let max_crypto_data_size = max_size
3871 - buf.len()
3872 - 1 - VarInt::size(unsafe { VarInt::from_u64_unchecked(frame.offset) })
3874 - 2; let available_space = max_size - buf.len();
3878 let remaining_data = frame.data.len();
3879 #[cfg(feature = "pqc")]
3880 let optimal_size = self
3881 .pqc_state
3882 .calculate_crypto_frame_size(available_space, remaining_data);
3883 #[cfg(not(feature = "pqc"))]
3884 let optimal_size = available_space.min(remaining_data);
3885
3886 let len = frame
3887 .data
3888 .len()
3889 .min(2usize.pow(14) - 1)
3890 .min(max_crypto_data_size)
3891 .min(optimal_size);
3892
3893 let data = frame.data.split_to(len);
3894 let truncated = frame::Crypto {
3895 offset: frame.offset,
3896 data,
3897 };
3898 trace!(
3899 "CRYPTO: off {} len {}",
3900 truncated.offset,
3901 truncated.data.len()
3902 );
3903 truncated.encode(buf);
3904 self.stats.frame_tx.crypto += 1;
3905 sent.retransmits.get_or_create().crypto.push_back(truncated);
3906 if !frame.data.is_empty() {
3907 frame.offset += len as u64;
3908 space.pending.crypto.push_front(frame);
3909 }
3910 }
3911
3912 if space_id == SpaceId::Data {
3913 self.streams.write_control_frames(
3914 buf,
3915 &mut space.pending,
3916 &mut sent.retransmits,
3917 &mut self.stats.frame_tx,
3918 max_size,
3919 );
3920 }
3921
3922 while buf.len() + 44 < max_size {
3924 let issued = match space.pending.new_cids.pop() {
3925 Some(x) => x,
3926 None => break,
3927 };
3928 trace!(
3929 sequence = issued.sequence,
3930 id = %issued.id,
3931 "NEW_CONNECTION_ID"
3932 );
3933 frame::NewConnectionId {
3934 sequence: issued.sequence,
3935 retire_prior_to: self.local_cid_state.retire_prior_to(),
3936 id: issued.id,
3937 reset_token: issued.reset_token,
3938 }
3939 .encode(buf);
3940 sent.retransmits.get_or_create().new_cids.push(issued);
3941 self.stats.frame_tx.new_connection_id += 1;
3942 }
3943
3944 while buf.len() + frame::RETIRE_CONNECTION_ID_SIZE_BOUND < max_size {
3946 let seq = match space.pending.retire_cids.pop() {
3947 Some(x) => x,
3948 None => break,
3949 };
3950 trace!(sequence = seq, "RETIRE_CONNECTION_ID");
3951 buf.write(frame::FrameType::RETIRE_CONNECTION_ID);
3952 buf.write_var(seq);
3953 sent.retransmits.get_or_create().retire_cids.push(seq);
3954 self.stats.frame_tx.retire_connection_id += 1;
3955 }
3956
3957 let mut sent_datagrams = false;
3959 while buf.len() + Datagram::SIZE_BOUND < max_size && space_id == SpaceId::Data {
3960 match self.datagrams.write(buf, max_size) {
3961 true => {
3962 sent_datagrams = true;
3963 sent.non_retransmits = true;
3964 self.stats.frame_tx.datagram += 1;
3965 }
3966 false => break,
3967 }
3968 }
3969 if self.datagrams.send_blocked && sent_datagrams {
3970 self.events.push_back(Event::DatagramsUnblocked);
3971 self.datagrams.send_blocked = false;
3972 }
3973
3974 while let Some(remote_addr) = space.pending.new_tokens.pop() {
3976 debug_assert_eq!(space_id, SpaceId::Data);
3977 let ConnectionSide::Server { server_config } = &self.side else {
3978 debug_assert!(false, "NEW_TOKEN frames should not be enqueued by clients");
3980 continue;
3981 };
3982
3983 if remote_addr != self.path.remote {
3984 continue;
3989 }
3990
3991 let token = Token::new(
3992 TokenPayload::Validation {
3993 ip: remote_addr.ip(),
3994 issued: server_config.time_source.now(),
3995 },
3996 &mut self.rng,
3997 );
3998 let new_token = NewToken {
3999 token: token.encode(&*server_config.token_key).into(),
4000 };
4001
4002 if buf.len() + new_token.size() >= max_size {
4003 space.pending.new_tokens.push(remote_addr);
4004 break;
4005 }
4006
4007 new_token.encode(buf);
4008 sent.retransmits
4009 .get_or_create()
4010 .new_tokens
4011 .push(remote_addr);
4012 self.stats.frame_tx.new_token += 1;
4013 }
4014
4015 while buf.len() + frame::AddAddress::SIZE_BOUND < max_size && space_id == SpaceId::Data {
4017 let add_address = match space.pending.add_addresses.pop() {
4018 Some(x) => x,
4019 None => break,
4020 };
4021 trace!(
4022 sequence = %add_address.sequence,
4023 address = %add_address.address,
4024 "ADD_ADDRESS"
4025 );
4026 if self.nat_traversal_frame_config.use_rfc_format {
4028 add_address.encode_rfc(buf);
4029 } else {
4030 add_address.encode_legacy(buf);
4031 }
4032 sent.retransmits
4033 .get_or_create()
4034 .add_addresses
4035 .push(add_address);
4036 self.stats.frame_tx.add_address += 1;
4037 }
4038
4039 while buf.len() + frame::PunchMeNow::SIZE_BOUND < max_size && space_id == SpaceId::Data {
4041 let punch_me_now = match space.pending.punch_me_now.pop() {
4042 Some(x) => x,
4043 None => break,
4044 };
4045 trace!(
4046 round = %punch_me_now.round,
4047 paired_with_sequence_number = %punch_me_now.paired_with_sequence_number,
4048 "PUNCH_ME_NOW"
4049 );
4050 if self.nat_traversal_frame_config.use_rfc_format {
4052 punch_me_now.encode_rfc(buf);
4053 } else {
4054 punch_me_now.encode_legacy(buf);
4055 }
4056 sent.retransmits
4057 .get_or_create()
4058 .punch_me_now
4059 .push(punch_me_now);
4060 self.stats.frame_tx.punch_me_now += 1;
4061 }
4062
4063 while buf.len() + frame::RemoveAddress::SIZE_BOUND < max_size && space_id == SpaceId::Data {
4065 let remove_address = match space.pending.remove_addresses.pop() {
4066 Some(x) => x,
4067 None => break,
4068 };
4069 trace!(
4070 sequence = %remove_address.sequence,
4071 "REMOVE_ADDRESS"
4072 );
4073 remove_address.encode(buf);
4075 sent.retransmits
4076 .get_or_create()
4077 .remove_addresses
4078 .push(remove_address);
4079 self.stats.frame_tx.remove_address += 1;
4080 }
4081
4082 while buf.len() + frame::ObservedAddress::SIZE_BOUND < max_size && space_id == SpaceId::Data
4084 {
4085 let observed_address = match space.pending.observed_addresses.pop() {
4086 Some(x) => x,
4087 None => break,
4088 };
4089 trace!(
4090 address = %observed_address.address,
4091 "OBSERVED_ADDRESS"
4092 );
4093 observed_address.encode(buf);
4094 sent.retransmits
4095 .get_or_create()
4096 .observed_addresses
4097 .push(observed_address);
4098 self.stats.frame_tx.observed_address += 1;
4099 }
4100
4101 if space_id == SpaceId::Data {
4103 sent.stream_frames =
4104 self.streams
4105 .write_stream_frames(buf, max_size, self.config.send_fairness);
4106 self.stats.frame_tx.stream += sent.stream_frames.len() as u64;
4107 }
4108
4109 sent
4110 }
4111
4112 fn populate_acks(
4117 now: Instant,
4118 receiving_ecn: bool,
4119 sent: &mut SentFrames,
4120 space: &mut PacketSpace,
4121 buf: &mut Vec<u8>,
4122 stats: &mut ConnectionStats,
4123 ) {
4124 debug_assert!(!space.pending_acks.ranges().is_empty());
4125
4126 debug_assert!(space.crypto.is_some(), "tried to send ACK in 0-RTT");
4128 let ecn = if receiving_ecn {
4129 Some(&space.ecn_counters)
4130 } else {
4131 None
4132 };
4133 sent.largest_acked = space.pending_acks.ranges().max();
4134
4135 let delay_micros = space.pending_acks.ack_delay(now).as_micros() as u64;
4136
4137 let ack_delay_exp = TransportParameters::default().ack_delay_exponent;
4139 let delay = delay_micros >> ack_delay_exp.into_inner();
4140
4141 trace!(
4142 "ACK {:?}, Delay = {}us",
4143 space.pending_acks.ranges(),
4144 delay_micros
4145 );
4146
4147 frame::Ack::encode(delay as _, space.pending_acks.ranges(), ecn, buf);
4148 stats.frame_tx.acks += 1;
4149 }
4150
4151 fn close_common(&mut self) {
4152 trace!("connection closed");
4153 for &timer in &Timer::VALUES {
4154 self.timers.stop(timer);
4155 }
4156 }
4157
4158 fn set_close_timer(&mut self, now: Instant) {
4159 self.timers
4160 .set(Timer::Close, now + 3 * self.pto(self.highest_space));
4161 }
4162
4163 fn handle_peer_params(&mut self, params: TransportParameters) -> Result<(), TransportError> {
4165 if Some(self.orig_rem_cid) != params.initial_src_cid
4166 || (self.side.is_client()
4167 && (Some(self.initial_dst_cid) != params.original_dst_cid
4168 || self.retry_src_cid != params.retry_src_cid))
4169 {
4170 return Err(TransportError::TRANSPORT_PARAMETER_ERROR(
4171 "CID authentication failure",
4172 ));
4173 }
4174
4175 self.set_peer_params(params);
4176
4177 Ok(())
4178 }
4179
4180 fn set_peer_params(&mut self, params: TransportParameters) {
4181 self.streams.set_params(¶ms);
4182 self.idle_timeout =
4183 negotiate_max_idle_timeout(self.config.max_idle_timeout, Some(params.max_idle_timeout));
4184 trace!("negotiated max idle timeout {:?}", self.idle_timeout);
4185 if let Some(ref info) = params.preferred_address {
4186 self.rem_cids.insert(frame::NewConnectionId {
4187 sequence: 1,
4188 id: info.connection_id,
4189 reset_token: info.stateless_reset_token,
4190 retire_prior_to: 0,
4191 }).expect("preferred address CID is the first received, and hence is guaranteed to be legal");
4192 }
4193 self.ack_frequency.peer_max_ack_delay = get_max_ack_delay(¶ms);
4194
4195 self.negotiate_nat_traversal_capability(¶ms);
4197
4198 let local_has_nat_traversal = self.config.nat_traversal_config.is_some();
4201 let local_supports_rfc = local_has_nat_traversal;
4204 self.nat_traversal_frame_config = frame::nat_traversal_unified::NatTraversalFrameConfig {
4205 use_rfc_format: local_supports_rfc && params.supports_rfc_nat_traversal(),
4207 accept_legacy: true,
4209 };
4210
4211 self.negotiate_address_discovery(¶ms);
4213
4214 #[cfg(feature = "pqc")]
4216 {
4217 self.pqc_state.update_from_peer_params(¶ms);
4218
4219 if self.pqc_state.enabled && self.pqc_state.using_pqc {
4221 trace!("PQC enabled, adjusting MTU discovery for larger handshake packets");
4222 let current_mtu = self.path.mtud.current_mtu();
4226 if current_mtu < self.pqc_state.handshake_mtu {
4227 trace!(
4228 "Current MTU {} is less than PQC handshake MTU {}, will rely on MTU discovery",
4229 current_mtu, self.pqc_state.handshake_mtu
4230 );
4231 }
4232 }
4233 }
4234
4235 self.peer_params = params;
4236 self.path.mtud.on_peer_max_udp_payload_size_received(
4237 u16::try_from(self.peer_params.max_udp_payload_size.into_inner()).unwrap_or(u16::MAX),
4238 );
4239 }
4240
4241 fn negotiate_nat_traversal_capability(&mut self, params: &TransportParameters) {
4243 let peer_nat_config = match ¶ms.nat_traversal {
4245 Some(config) => config,
4246 None => {
4247 if self.config.nat_traversal_config.is_some() {
4249 debug!(
4250 "Peer does not support NAT traversal, maintaining backward compatibility"
4251 );
4252 self.emit_nat_traversal_capability_event(false);
4253
4254 self.set_nat_traversal_compatibility_mode(false);
4256 }
4257 return;
4258 }
4259 };
4260
4261 let local_nat_config = match &self.config.nat_traversal_config {
4263 Some(config) => config,
4264 None => {
4265 debug!("NAT traversal not enabled locally, ignoring peer support");
4266 self.emit_nat_traversal_capability_event(false);
4267 self.set_nat_traversal_compatibility_mode(false);
4268 return;
4269 }
4270 };
4271
4272 info!("Both peers support NAT traversal, negotiating capabilities");
4274
4275 match self.negotiate_nat_traversal_parameters(local_nat_config, peer_nat_config) {
4277 Ok(negotiated_config) => {
4278 info!("NAT traversal capability negotiated successfully");
4279 self.emit_nat_traversal_capability_event(true);
4280
4281 self.init_nat_traversal_with_negotiated_config(&negotiated_config);
4283
4284 self.set_nat_traversal_compatibility_mode(true);
4286
4287 if matches!(
4289 negotiated_config,
4290 crate::transport_parameters::NatTraversalConfig::ClientSupport
4291 ) {
4292 self.initiate_nat_traversal_process();
4293 }
4294 }
4295 Err(e) => {
4296 warn!("NAT traversal capability negotiation failed: {}", e);
4297 self.emit_nat_traversal_capability_event(false);
4298 self.set_nat_traversal_compatibility_mode(false);
4299 }
4300 }
4301 }
4302
4303 fn emit_nat_traversal_capability_event(&mut self, negotiated: bool) {
4357 if negotiated {
4360 info!("NAT traversal capability successfully negotiated");
4361 } else {
4362 info!("NAT traversal capability not available (peer or local support missing)");
4363 }
4364
4365 }
4368
4369 fn set_nat_traversal_compatibility_mode(&mut self, enabled: bool) {
4371 if enabled {
4372 debug!("NAT traversal enabled for this connection");
4373 } else {
4375 debug!("NAT traversal disabled for this connection (backward compatibility mode)");
4376 if self.nat_traversal.is_some() {
4378 warn!("Clearing NAT traversal state due to compatibility mode");
4379 self.nat_traversal = None;
4380 }
4381 }
4382 }
4383
4384 fn negotiate_nat_traversal_parameters(
4386 &self,
4387 local_config: &crate::transport_parameters::NatTraversalConfig,
4388 peer_config: &crate::transport_parameters::NatTraversalConfig,
4389 ) -> Result<crate::transport_parameters::NatTraversalConfig, String> {
4390 match (local_config, peer_config) {
4395 (
4397 crate::transport_parameters::NatTraversalConfig::ClientSupport,
4398 crate::transport_parameters::NatTraversalConfig::ServerSupport {
4399 concurrency_limit,
4400 },
4401 ) => Ok(
4402 crate::transport_parameters::NatTraversalConfig::ServerSupport {
4403 concurrency_limit: *concurrency_limit,
4404 },
4405 ),
4406 (
4408 crate::transport_parameters::NatTraversalConfig::ServerSupport {
4409 concurrency_limit,
4410 },
4411 crate::transport_parameters::NatTraversalConfig::ClientSupport,
4412 ) => Ok(
4413 crate::transport_parameters::NatTraversalConfig::ServerSupport {
4414 concurrency_limit: *concurrency_limit,
4415 },
4416 ),
4417 (
4419 crate::transport_parameters::NatTraversalConfig::ServerSupport {
4420 concurrency_limit: limit1,
4421 },
4422 crate::transport_parameters::NatTraversalConfig::ServerSupport {
4423 concurrency_limit: limit2,
4424 },
4425 ) => Ok(
4426 crate::transport_parameters::NatTraversalConfig::ServerSupport {
4427 concurrency_limit: (*limit1).min(*limit2),
4428 },
4429 ),
4430 (
4432 crate::transport_parameters::NatTraversalConfig::ClientSupport,
4433 crate::transport_parameters::NatTraversalConfig::ClientSupport,
4434 ) => Err("Both endpoints claim to be NAT traversal clients".to_string()),
4435 }
4436 }
4437
4438 fn init_nat_traversal_with_negotiated_config(
4440 &mut self,
4441 config: &crate::transport_parameters::NatTraversalConfig,
4442 ) {
4443 let (role, _concurrency_limit) = match config {
4446 crate::transport_parameters::NatTraversalConfig::ClientSupport => {
4447 (NatTraversalRole::Client, 10) }
4450 crate::transport_parameters::NatTraversalConfig::ServerSupport {
4451 concurrency_limit,
4452 } => {
4453 (
4455 NatTraversalRole::Server { can_relay: false },
4456 concurrency_limit.into_inner() as u32,
4457 )
4458 }
4459 };
4460
4461 let max_candidates = 50; let coordination_timeout = Duration::from_secs(10); self.nat_traversal = Some(NatTraversalState::new(
4467 role,
4468 max_candidates,
4469 coordination_timeout,
4470 ));
4471
4472 trace!(
4473 "NAT traversal initialized with negotiated config: role={:?}",
4474 role
4475 );
4476
4477 match role {
4479 NatTraversalRole::Bootstrap => {
4480 self.prepare_address_observation();
4482 }
4483 NatTraversalRole::Client => {
4484 self.schedule_candidate_discovery();
4486 }
4487 NatTraversalRole::Server { .. } => {
4488 self.prepare_coordination_handling();
4490 }
4491 }
4492 }
4493
4494 fn initiate_nat_traversal_process(&mut self) {
4496 if let Some(nat_state) = &mut self.nat_traversal {
4497 match nat_state.start_candidate_discovery() {
4498 Ok(()) => {
4499 debug!("NAT traversal process initiated - candidate discovery started");
4500 self.timers.set(
4502 Timer::NatTraversal,
4503 Instant::now() + Duration::from_millis(100),
4504 );
4505 }
4506 Err(e) => {
4507 warn!("Failed to initiate NAT traversal process: {}", e);
4508 }
4509 }
4510 }
4511 }
4512
/// Preparation hook for the bootstrap role.
///
/// Currently only emits a debug log line; no state is changed.
fn prepare_address_observation(&mut self) {
    debug!("Preparing for address observation as bootstrap node");
}
4519
4520 fn schedule_candidate_discovery(&mut self) {
4522 debug!("Scheduling candidate discovery for client endpoint");
4523 self.timers.set(
4525 Timer::NatTraversal,
4526 Instant::now() + Duration::from_millis(50),
4527 );
4528 }
4529
/// Preparation hook for the server role.
///
/// Currently only emits a debug log line; no state is changed.
fn prepare_coordination_handling(&mut self) {
    debug!("Preparing to handle coordination requests as server endpoint");
}
4536
4537 fn handle_nat_traversal_timeout(&mut self, now: Instant) {
4539 let timeout_result = if let Some(nat_state) = &mut self.nat_traversal {
4541 nat_state.handle_timeout(now)
4542 } else {
4543 return;
4544 };
4545
4546 match timeout_result {
4548 Ok(actions) => {
4549 for action in actions {
4550 match action {
4551 nat_traversal::TimeoutAction::RetryDiscovery => {
4552 debug!("NAT traversal timeout: retrying candidate discovery");
4553 if let Some(nat_state) = &mut self.nat_traversal {
4554 if let Err(e) = nat_state.start_candidate_discovery() {
4555 warn!("Failed to retry candidate discovery: {}", e);
4556 }
4557 }
4558 }
4559 nat_traversal::TimeoutAction::RetryCoordination => {
4560 debug!("NAT traversal timeout: retrying coordination");
4561 self.timers
4563 .set(Timer::NatTraversal, now + Duration::from_secs(2));
4564 }
4565 nat_traversal::TimeoutAction::StartValidation => {
4566 debug!("NAT traversal timeout: starting path validation");
4567 self.start_nat_traversal_validation(now);
4568 }
4569 nat_traversal::TimeoutAction::Complete => {
4570 debug!("NAT traversal completed successfully");
4571 self.timers.stop(Timer::NatTraversal);
4573 }
4574 nat_traversal::TimeoutAction::Failed => {
4575 warn!("NAT traversal failed after timeout");
4576 self.handle_nat_traversal_failure();
4578 }
4579 }
4580 }
4581 }
4582 Err(e) => {
4583 warn!("NAT traversal timeout handling failed: {}", e);
4584 self.handle_nat_traversal_failure();
4585 }
4586 }
4587 }
4588
4589 fn start_nat_traversal_validation(&mut self, now: Instant) {
4591 if let Some(nat_state) = &mut self.nat_traversal {
4592 let pairs = nat_state.get_next_validation_pairs(3);
4594
4595 for pair in pairs {
4596 let challenge = self.rng.r#gen();
4598 self.path.challenge = Some(challenge);
4599 self.path.challenge_pending = true;
4600
4601 debug!(
4602 "Starting path validation for NAT traversal candidate: {}",
4603 pair.remote_addr
4604 );
4605 }
4606
4607 self.timers
4609 .set(Timer::PathValidation, now + Duration::from_secs(3));
4610 }
4611 }
4612
/// Tears down NAT traversal after a failure: drops the NAT traversal state and stops
/// the NAT traversal timer. The connection itself is left untouched.
fn handle_nat_traversal_failure(&mut self) {
    warn!("NAT traversal failed, considering fallback options");

    // Clearing the state makes later NAT traversal frame handlers reject their frames.
    self.nat_traversal = None;
    self.timers.stop(Timer::NatTraversal);

    debug!("NAT traversal disabled for this connection due to failure");
}
4629
4630 pub fn nat_traversal_supported(&self) -> bool {
4632 self.nat_traversal.is_some()
4633 && self.config.nat_traversal_config.is_some()
4634 && self.peer_params.nat_traversal.is_some()
4635 }
4636
/// Returns the NAT traversal configuration stored in the peer's transport parameters,
/// or `None` when the peer did not supply one.
pub fn nat_traversal_config(&self) -> Option<&crate::transport_parameters::NatTraversalConfig> {
    self.peer_params.nat_traversal.as_ref()
}
4641
4642 pub fn nat_traversal_ready(&self) -> bool {
4644 self.nat_traversal_supported() && matches!(self.state, State::Established)
4645 }
4646
4647 pub(crate) fn nat_traversal_stats(&self) -> Option<nat_traversal::NatTraversalStats> {
4652 self.nat_traversal.as_ref().map(|state| state.stats.clone())
4653 }
4654
/// Test helper: installs NAT traversal state for `role` without any negotiation.
///
/// The same config is written to both the peer parameters and a cloned local transport
/// config, so [`Self::nat_traversal_supported`] reports `true` afterwards.
#[cfg(test)]
pub(crate) fn force_enable_nat_traversal(&mut self, role: NatTraversalRole) {
    use crate::transport_parameters::NatTraversalConfig;

    let advertised = match role {
        NatTraversalRole::Client => NatTraversalConfig::ClientSupport,
        NatTraversalRole::Server { .. } | NatTraversalRole::Bootstrap => {
            NatTraversalConfig::ServerSupport {
                concurrency_limit: VarInt::from_u32(5),
            }
        }
    };

    self.peer_params.nat_traversal = Some(advertised.clone());

    // Replace the shared transport config with a patched copy.
    let mut patched_config = (*self.config).clone();
    patched_config.nat_traversal_config = Some(advertised);
    self.config = Arc::new(patched_config);

    let state = NatTraversalState::new(role, 8, Duration::from_secs(10));
    self.nat_traversal = Some(state);
}
4679
4680 fn derive_peer_id_from_connection(&self) -> [u8; 32] {
4683 let mut hasher = std::collections::hash_map::DefaultHasher::new();
4685 use std::hash::Hasher;
4686 hasher.write(&self.rem_handshake_cid);
4687 hasher.write(&self.handshake_cid);
4688 hasher.write(&self.path.remote.to_string().into_bytes());
4689 let hash = hasher.finish();
4690 let mut peer_id = [0u8; 32];
4691 peer_id[..8].copy_from_slice(&hash.to_be_bytes());
4692 let cid_bytes = self.rem_handshake_cid.as_ref();
4694 let copy_len = (cid_bytes.len()).min(24);
4695 peer_id[8..8 + copy_len].copy_from_slice(&cid_bytes[..copy_len]);
4696 peer_id
4697 }
4698
4699 fn handle_add_address(
4701 &mut self,
4702 add_address: &crate::frame::AddAddress,
4703 now: Instant,
4704 ) -> Result<(), TransportError> {
4705 let nat_state = self.nat_traversal.as_mut().ok_or_else(|| {
4706 TransportError::PROTOCOL_VIOLATION("AddAddress frame without NAT traversal negotiation")
4707 })?;
4708
4709 match nat_state.add_remote_candidate(
4710 add_address.sequence,
4711 add_address.address,
4712 add_address.priority,
4713 now,
4714 ) {
4715 Ok(()) => {
4716 trace!(
4717 "Added remote candidate: {} (seq={}, priority={})",
4718 add_address.address, add_address.sequence, add_address.priority
4719 );
4720
4721 self.trigger_candidate_validation(add_address.address, now)?;
4723 Ok(())
4724 }
4725 Err(NatTraversalError::TooManyCandidates) => Err(TransportError::PROTOCOL_VIOLATION(
4726 "too many NAT traversal candidates",
4727 )),
4728 Err(NatTraversalError::DuplicateAddress) => {
4729 Ok(())
4731 }
4732 Err(e) => {
4733 warn!("Failed to add remote candidate: {}", e);
4734 Ok(()) }
4736 }
4737 }
4738
4739 fn handle_punch_me_now(
4741 &mut self,
4742 punch_me_now: &crate::frame::PunchMeNow,
4743 now: Instant,
4744 ) -> Result<(), TransportError> {
4745 trace!(
4746 "Received PunchMeNow: round={}, target_seq={}, local_addr={}",
4747 punch_me_now.round, punch_me_now.paired_with_sequence_number, punch_me_now.address
4748 );
4749
4750 if let Some(nat_state) = &self.nat_traversal {
4752 if matches!(nat_state.role, NatTraversalRole::Bootstrap) {
4753 let from_peer_id = self.derive_peer_id_from_connection();
4755
4756 let punch_me_now_clone = punch_me_now.clone();
4758 drop(nat_state); match self
4761 .nat_traversal
4762 .as_mut()
4763 .unwrap()
4764 .handle_punch_me_now_frame(
4765 from_peer_id,
4766 self.path.remote,
4767 &punch_me_now_clone,
4768 now,
4769 ) {
4770 Ok(Some(coordination_frame)) => {
4771 trace!("Bootstrap node coordinating PUNCH_ME_NOW between peers");
4772
4773 if let Some(target_peer_id) = punch_me_now.target_peer_id {
4775 self.endpoint_events.push_back(
4776 crate::shared::EndpointEventInner::RelayPunchMeNow(
4777 target_peer_id,
4778 coordination_frame,
4779 ),
4780 );
4781 }
4782
4783 return Ok(());
4784 }
4785 Ok(None) => {
4786 trace!("Bootstrap coordination completed or no action needed");
4787 return Ok(());
4788 }
4789 Err(e) => {
4790 warn!("Bootstrap coordination failed: {}", e);
4791 return Ok(());
4792 }
4793 }
4794 }
4795 }
4796
4797 let nat_state = self.nat_traversal.as_mut().ok_or_else(|| {
4799 TransportError::PROTOCOL_VIOLATION("PunchMeNow frame without NAT traversal negotiation")
4800 })?;
4801
4802 if nat_state
4804 .handle_peer_punch_request(punch_me_now.round, now)
4805 .map_err(|_e| {
4806 TransportError::PROTOCOL_VIOLATION("Failed to handle peer punch request")
4807 })?
4808 {
4809 trace!("Coordination synchronized for round {}", punch_me_now.round);
4810
4811 let _local_addr = self
4814 .local_ip
4815 .map(|ip| SocketAddr::new(ip, 0))
4816 .unwrap_or_else(|| {
4817 SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED), 0)
4818 });
4819
4820 let target = nat_traversal::PunchTarget {
4821 remote_addr: punch_me_now.address,
4822 remote_sequence: punch_me_now.paired_with_sequence_number,
4823 challenge: self.rng.r#gen(),
4824 };
4825
4826 let _ = nat_state.start_coordination_round(vec![target], now);
4828 } else {
4829 debug!(
4830 "Failed to synchronize coordination for round {}",
4831 punch_me_now.round
4832 );
4833 }
4834
4835 Ok(())
4836 }
4837
4838 fn handle_remove_address(
4840 &mut self,
4841 remove_address: &crate::frame::RemoveAddress,
4842 ) -> Result<(), TransportError> {
4843 let nat_state = self.nat_traversal.as_mut().ok_or_else(|| {
4844 TransportError::PROTOCOL_VIOLATION(
4845 "RemoveAddress frame without NAT traversal negotiation",
4846 )
4847 })?;
4848
4849 if nat_state.remove_candidate(remove_address.sequence) {
4850 trace!(
4851 "Removed candidate with sequence {}",
4852 remove_address.sequence
4853 );
4854 } else {
4855 trace!(
4856 "Attempted to remove unknown candidate sequence {}",
4857 remove_address.sequence
4858 );
4859 }
4860
4861 Ok(())
4862 }
4863
4864 fn handle_observed_address_frame(
4866 &mut self,
4867 observed_address: &crate::frame::ObservedAddress,
4868 now: Instant,
4869 ) -> Result<(), TransportError> {
4870 let state = self.address_discovery_state.as_mut().ok_or_else(|| {
4872 TransportError::PROTOCOL_VIOLATION(
4873 "ObservedAddress frame without address discovery negotiation",
4874 )
4875 })?;
4876
4877 if !state.enabled {
4879 return Err(TransportError::PROTOCOL_VIOLATION(
4880 "ObservedAddress frame received when address discovery is disabled",
4881 ));
4882 }
4883
4884 #[cfg(feature = "trace")]
4886 {
4887 use crate::trace_observed_address_received;
4888 trace_observed_address_received!(
4890 &self.event_log,
4891 self.trace_context.trace_id(),
4892 observed_address.address,
4893 0u64 );
4895 }
4896
4897 let path_id = 0u64; if let Some(&last_seq) = state.last_received_sequence.get(&path_id) {
4905 if observed_address.sequence_number <= last_seq {
4906 trace!(
4907 "Ignoring OBSERVED_ADDRESS frame with stale sequence number {} (last was {})",
4908 observed_address.sequence_number, last_seq
4909 );
4910 return Ok(());
4911 }
4912 }
4913
4914 state
4916 .last_received_sequence
4917 .insert(path_id, observed_address.sequence_number);
4918
4919 state.handle_observed_address(observed_address.address, path_id, now);
4921
4922 self.path
4924 .update_observed_address(observed_address.address, now);
4925
4926 trace!(
4928 "Received ObservedAddress frame: address={} for path={}",
4929 observed_address.address, path_id
4930 );
4931
4932 Ok(())
4933 }
4934
4935 pub fn queue_add_address(&mut self, sequence: VarInt, address: SocketAddr, priority: VarInt) {
4937 let add_address = frame::AddAddress {
4939 sequence,
4940 address,
4941 priority,
4942 };
4943
4944 self.spaces[SpaceId::Data]
4945 .pending
4946 .add_addresses
4947 .push(add_address);
4948 trace!(
4949 "Queued AddAddress frame: seq={}, addr={}, priority={}",
4950 sequence, address, priority
4951 );
4952 }
4953
4954 pub fn queue_punch_me_now(
4956 &mut self,
4957 round: VarInt,
4958 paired_with_sequence_number: VarInt,
4959 address: SocketAddr,
4960 ) {
4961 let punch_me_now = frame::PunchMeNow {
4962 round,
4963 paired_with_sequence_number,
4964 address,
4965 target_peer_id: None, };
4967
4968 self.spaces[SpaceId::Data]
4969 .pending
4970 .punch_me_now
4971 .push(punch_me_now);
4972 trace!(
4973 "Queued PunchMeNow frame: round={}, target={}",
4974 round, paired_with_sequence_number
4975 );
4976 }
4977
4978 pub fn queue_remove_address(&mut self, sequence: VarInt) {
4980 let remove_address = frame::RemoveAddress { sequence };
4981
4982 self.spaces[SpaceId::Data]
4983 .pending
4984 .remove_addresses
4985 .push(remove_address);
4986 trace!("Queued RemoveAddress frame: seq={}", sequence);
4987 }
4988
4989 pub fn queue_observed_address(&mut self, address: SocketAddr) {
4991 let sequence_number = if let Some(state) = &mut self.address_discovery_state {
4993 let seq = state.next_sequence_number;
4994 state.next_sequence_number =
4995 VarInt::from_u64(state.next_sequence_number.into_inner() + 1)
4996 .expect("sequence number overflow");
4997 seq
4998 } else {
4999 VarInt::from_u32(0)
5001 };
5002
5003 let observed_address = frame::ObservedAddress {
5004 sequence_number,
5005 address,
5006 };
5007 self.spaces[SpaceId::Data]
5008 .pending
5009 .observed_addresses
5010 .push(observed_address);
5011 trace!("Queued ObservedAddress frame: addr={}", address);
5012 }
5013
5014 pub fn check_for_address_observations(&mut self, now: Instant) {
5016 let Some(state) = &mut self.address_discovery_state else {
5018 return;
5019 };
5020
5021 if !state.enabled {
5023 return;
5024 }
5025
5026 let path_id = 0u64; let remote_address = self.path.remote;
5031
5032 if state.should_send_observation(path_id, now) {
5034 if let Some(frame) = state.queue_observed_address_frame(path_id, remote_address) {
5036 self.spaces[SpaceId::Data]
5038 .pending
5039 .observed_addresses
5040 .push(frame);
5041
5042 state.record_observation_sent(path_id);
5044
5045 #[cfg(feature = "trace")]
5047 {
5048 use crate::trace_observed_address_sent;
5049 trace_observed_address_sent!(
5051 &self.event_log,
5052 self.trace_context.trace_id(),
5053 remote_address,
5054 path_id
5055 );
5056 }
5057
5058 trace!(
5059 "Queued OBSERVED_ADDRESS frame for path {} with address {}",
5060 path_id, remote_address
5061 );
5062 }
5063 }
5064 }
5065
5066 fn trigger_candidate_validation(
5068 &mut self,
5069 candidate_address: SocketAddr,
5070 now: Instant,
5071 ) -> Result<(), TransportError> {
5072 let nat_state = self
5073 .nat_traversal
5074 .as_mut()
5075 .ok_or_else(|| TransportError::PROTOCOL_VIOLATION("NAT traversal not enabled"))?;
5076
5077 if nat_state
5079 .active_validations
5080 .contains_key(&candidate_address)
5081 {
5082 trace!("Validation already in progress for {}", candidate_address);
5083 return Ok(());
5084 }
5085
5086 let challenge = self.rng.r#gen::<u64>();
5088
5089 let validation_state = nat_traversal::PathValidationState {
5091 challenge,
5092 sent_at: now,
5093 retry_count: 0,
5094 max_retries: 3,
5095 coordination_round: None,
5096 timeout_state: nat_traversal::AdaptiveTimeoutState::new(),
5097 last_retry_at: None,
5098 };
5099
5100 nat_state
5102 .active_validations
5103 .insert(candidate_address, validation_state);
5104
5105 self.nat_traversal_challenges
5107 .push(candidate_address, challenge);
5108
5109 nat_state.stats.validations_succeeded += 1; trace!(
5113 "Triggered PATH_CHALLENGE validation for {} with challenge {:016x}",
5114 candidate_address, challenge
5115 );
5116
5117 Ok(())
5118 }
5119
5120 pub fn nat_traversal_state(&self) -> Option<(NatTraversalRole, usize, usize)> {
5122 self.nat_traversal.as_ref().map(|state| {
5123 (
5124 state.role,
5125 state.local_candidates.len(),
5126 state.remote_candidates.len(),
5127 )
5128 })
5129 }
5130
5131 pub fn initiate_nat_traversal_coordination(
5133 &mut self,
5134 now: Instant,
5135 ) -> Result<(), TransportError> {
5136 let nat_state = self
5137 .nat_traversal
5138 .as_mut()
5139 .ok_or_else(|| TransportError::PROTOCOL_VIOLATION("NAT traversal not enabled"))?;
5140
5141 if nat_state.should_send_punch_request() {
5143 nat_state.generate_candidate_pairs(now);
5145
5146 let pairs = nat_state.get_next_validation_pairs(3);
5148 if pairs.is_empty() {
5149 return Err(TransportError::PROTOCOL_VIOLATION(
5150 "No candidate pairs for coordination",
5151 ));
5152 }
5153
5154 let targets: Vec<_> = pairs
5156 .into_iter()
5157 .map(|pair| nat_traversal::PunchTarget {
5158 remote_addr: pair.remote_addr,
5159 remote_sequence: pair.remote_sequence,
5160 challenge: self.rng.r#gen(),
5161 })
5162 .collect();
5163
5164 let round = nat_state
5166 .start_coordination_round(targets, now)
5167 .map_err(|_e| {
5168 TransportError::PROTOCOL_VIOLATION("Failed to start coordination round")
5169 })?;
5170
5171 let local_addr = self
5174 .local_ip
5175 .map(|ip| SocketAddr::new(ip, self.local_ip.map(|_| 0).unwrap_or(0)))
5176 .unwrap_or_else(|| {
5177 SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED), 0)
5178 });
5179
5180 let punch_me_now = frame::PunchMeNow {
5181 round,
5182 paired_with_sequence_number: VarInt::from_u32(0), address: local_addr,
5184 target_peer_id: None, };
5186
5187 self.spaces[SpaceId::Data]
5188 .pending
5189 .punch_me_now
5190 .push(punch_me_now);
5191 nat_state.mark_punch_request_sent();
5192
5193 trace!("Initiated NAT traversal coordination round {}", round);
5194 }
5195
5196 Ok(())
5197 }
5198
/// Public entry point that delegates to `generate_nat_traversal_challenges` with `now`.
pub fn validate_nat_candidates(&mut self, now: Instant) {
    self.generate_nat_traversal_challenges(now);
}
5203
5204 pub fn send_nat_address_advertisement(
5219 &mut self,
5220 address: SocketAddr,
5221 priority: u32,
5222 ) -> Result<u64, ConnectionError> {
5223 let nat_state = self.nat_traversal.as_mut().ok_or_else(|| {
5225 ConnectionError::TransportError(TransportError::PROTOCOL_VIOLATION(
5226 "NAT traversal not enabled on this connection",
5227 ))
5228 })?;
5229
5230 let sequence = nat_state.next_sequence;
5232 nat_state.next_sequence =
5233 VarInt::from_u64(nat_state.next_sequence.into_inner() + 1).unwrap();
5234
5235 let now = Instant::now();
5237 nat_state.local_candidates.insert(
5238 sequence,
5239 nat_traversal::AddressCandidate {
5240 address,
5241 priority,
5242 source: nat_traversal::CandidateSource::Local,
5243 discovered_at: now,
5244 state: nat_traversal::CandidateState::New,
5245 attempt_count: 0,
5246 last_attempt: None,
5247 },
5248 );
5249
5250 nat_state.stats.local_candidates_sent += 1;
5252
5253 self.queue_add_address(sequence, address, VarInt::from_u32(priority));
5255
5256 debug!(
5257 "Queued ADD_ADDRESS frame: addr={}, priority={}, seq={}",
5258 address, priority, sequence
5259 );
5260 Ok(sequence.into_inner())
5261 }
5262
5263 pub fn send_nat_punch_coordination(
5276 &mut self,
5277 paired_with_sequence_number: u64,
5278 address: SocketAddr,
5279 round: u32,
5280 ) -> Result<(), ConnectionError> {
5281 let _nat_state = self.nat_traversal.as_ref().ok_or_else(|| {
5283 ConnectionError::TransportError(TransportError::PROTOCOL_VIOLATION(
5284 "NAT traversal not enabled on this connection",
5285 ))
5286 })?;
5287
5288 self.queue_punch_me_now(
5290 VarInt::from_u32(round),
5291 VarInt::from_u64(paired_with_sequence_number).map_err(|_| {
5292 ConnectionError::TransportError(TransportError::PROTOCOL_VIOLATION(
5293 "Invalid target sequence number",
5294 ))
5295 })?,
5296 address,
5297 );
5298
5299 debug!(
5300 "Queued PUNCH_ME_NOW frame: paired_with_seq={}, addr={}, round={}",
5301 paired_with_sequence_number, address, round
5302 );
5303 Ok(())
5304 }
5305
5306 pub fn send_nat_address_removal(&mut self, sequence: u64) -> Result<(), ConnectionError> {
5317 let nat_state = self.nat_traversal.as_mut().ok_or_else(|| {
5319 ConnectionError::TransportError(TransportError::PROTOCOL_VIOLATION(
5320 "NAT traversal not enabled on this connection",
5321 ))
5322 })?;
5323
5324 let sequence_varint = VarInt::from_u64(sequence).map_err(|_| {
5325 ConnectionError::TransportError(TransportError::PROTOCOL_VIOLATION(
5326 "Invalid sequence number",
5327 ))
5328 })?;
5329
5330 nat_state.local_candidates.remove(&sequence_varint);
5332
5333 self.queue_remove_address(sequence_varint);
5335
5336 debug!("Queued REMOVE_ADDRESS frame: seq={}", sequence);
5337 Ok(())
5338 }
5339
5340 pub(crate) fn get_nat_traversal_stats(&self) -> Option<&nat_traversal::NatTraversalStats> {
5349 self.nat_traversal.as_ref().map(|state| &state.stats)
5350 }
5351
/// Returns `true` when this connection currently holds NAT traversal state.
pub fn is_nat_traversal_enabled(&self) -> bool {
    self.nat_traversal.is_some()
}
5356
5357 pub fn get_nat_traversal_role(&self) -> Option<NatTraversalRole> {
5359 self.nat_traversal.as_ref().map(|state| state.role)
5360 }
5361
5362 fn negotiate_address_discovery(&mut self, peer_params: &TransportParameters) {
5364 let now = Instant::now();
5365
5366 match &peer_params.address_discovery {
5368 Some(peer_config) => {
5369 if let Some(state) = &mut self.address_discovery_state {
5371 if state.enabled {
5372 debug!(
5375 "Address discovery negotiated: rate={}, all_paths={}",
5376 state.max_observation_rate, state.observe_all_paths
5377 );
5378 } else {
5379 debug!("Address discovery disabled locally, ignoring peer support");
5381 }
5382 } else {
5383 self.address_discovery_state =
5385 Some(AddressDiscoveryState::new(peer_config, now));
5386 debug!("Address discovery initialized from peer config");
5387 }
5388 }
5389 _ => {
5390 if let Some(state) = &mut self.address_discovery_state {
5392 state.enabled = false;
5393 debug!("Address discovery disabled - peer doesn't support it");
5394 }
5395 }
5396 }
5397
5398 if let Some(state) = &self.address_discovery_state {
5400 if state.enabled {
5401 self.path.set_observation_rate(state.max_observation_rate);
5402 }
5403 }
5404 }
5405
5406 fn decrypt_packet(
5407 &mut self,
5408 now: Instant,
5409 packet: &mut Packet,
5410 ) -> Result<Option<u64>, Option<TransportError>> {
5411 let result = packet_crypto::decrypt_packet_body(
5412 packet,
5413 &self.spaces,
5414 self.zero_rtt_crypto.as_ref(),
5415 self.key_phase,
5416 self.prev_crypto.as_ref(),
5417 self.next_crypto.as_ref(),
5418 )?;
5419
5420 let result = match result {
5421 Some(r) => r,
5422 None => return Ok(None),
5423 };
5424
5425 if result.outgoing_key_update_acked {
5426 if let Some(prev) = self.prev_crypto.as_mut() {
5427 prev.end_packet = Some((result.number, now));
5428 self.set_key_discard_timer(now, packet.header.space());
5429 }
5430 }
5431
5432 if result.incoming_key_update {
5433 trace!("key update authenticated");
5434 self.update_keys(Some((result.number, now)), true);
5435 self.set_key_discard_timer(now, packet.header.space());
5436 }
5437
5438 Ok(Some(result.number))
5439 }
5440
5441 fn update_keys(&mut self, end_packet: Option<(u64, Instant)>, remote: bool) {
5442 trace!("executing key update");
5443 let new = self
5447 .crypto
5448 .next_1rtt_keys()
5449 .expect("only called for `Data` packets");
5450 self.key_phase_size = new
5451 .local
5452 .confidentiality_limit()
5453 .saturating_sub(KEY_UPDATE_MARGIN);
5454 let old = mem::replace(
5455 &mut self.spaces[SpaceId::Data]
5456 .crypto
5457 .as_mut()
5458 .unwrap() .packet,
5460 mem::replace(self.next_crypto.as_mut().unwrap(), new),
5461 );
5462 self.spaces[SpaceId::Data].sent_with_keys = 0;
5463 self.prev_crypto = Some(PrevCrypto {
5464 crypto: old,
5465 end_packet,
5466 update_unacked: remote,
5467 });
5468 self.key_phase = !self.key_phase;
5469 }
5470
5471 fn peer_supports_ack_frequency(&self) -> bool {
5472 self.peer_params.min_ack_delay.is_some()
5473 }
5474
5475 pub(crate) fn immediate_ack(&mut self) {
5480 self.spaces[self.highest_space].immediate_ack_pending = true;
5481 }
5482
5483 #[cfg(test)]
5485 pub(crate) fn decode_packet(&self, event: &ConnectionEvent) -> Option<Vec<u8>> {
5486 let (first_decode, remaining) = match &event.0 {
5487 ConnectionEventInner::Datagram(DatagramConnectionEvent {
5488 first_decode,
5489 remaining,
5490 ..
5491 }) => (first_decode, remaining),
5492 _ => return None,
5493 };
5494
5495 if remaining.is_some() {
5496 panic!("Packets should never be coalesced in tests");
5497 }
5498
5499 let decrypted_header = packet_crypto::unprotect_header(
5500 first_decode.clone(),
5501 &self.spaces,
5502 self.zero_rtt_crypto.as_ref(),
5503 self.peer_params.stateless_reset_token,
5504 )?;
5505
5506 let mut packet = decrypted_header.packet?;
5507 packet_crypto::decrypt_packet_body(
5508 &mut packet,
5509 &self.spaces,
5510 self.zero_rtt_crypto.as_ref(),
5511 self.key_phase,
5512 self.prev_crypto.as_ref(),
5513 self.next_crypto.as_ref(),
5514 )
5515 .ok()?;
5516
5517 Some(packet.payload.to_vec())
5518 }
5519
5520 #[cfg(test)]
5523 pub(crate) fn bytes_in_flight(&self) -> u64 {
5524 self.path.in_flight.bytes
5525 }
5526
5527 #[cfg(test)]
5529 pub(crate) fn congestion_window(&self) -> u64 {
5530 self.path
5531 .congestion
5532 .window()
5533 .saturating_sub(self.path.in_flight.bytes)
5534 }
5535
5536 #[cfg(test)]
5538 pub(crate) fn is_idle(&self) -> bool {
5539 Timer::VALUES
5540 .iter()
5541 .filter(|&&t| !matches!(t, Timer::KeepAlive | Timer::PushNewCid | Timer::KeyDiscard))
5542 .filter_map(|&t| Some((t, self.timers.get(t)?)))
5543 .min_by_key(|&(_, time)| time)
5544 .is_none_or(|(timer, _)| timer == Timer::Idle)
5545 }
5546
5547 #[cfg(test)]
5549 pub(crate) fn lost_packets(&self) -> u64 {
5550 self.lost_packets
5551 }
5552
5553 #[cfg(test)]
5555 pub(crate) fn using_ecn(&self) -> bool {
5556 self.path.sending_ecn
5557 }
5558
5559 #[cfg(test)]
5561 pub(crate) fn total_recvd(&self) -> u64 {
5562 self.path.total_recvd
5563 }
5564
5565 #[cfg(test)]
5566 pub(crate) fn active_local_cid_seq(&self) -> (u64, u64) {
5567 self.local_cid_state.active_seq()
5568 }
5569
5570 #[cfg(test)]
5573 pub(crate) fn rotate_local_cid(&mut self, v: u64, now: Instant) {
5574 let n = self.local_cid_state.assign_retire_seq(v);
5575 self.endpoint_events
5576 .push_back(EndpointEventInner::NeedIdentifiers(now, n));
5577 }
5578
5579 #[cfg(test)]
5581 pub(crate) fn active_rem_cid_seq(&self) -> u64 {
5582 self.rem_cids.active_seq()
5583 }
5584
5585 #[cfg(test)]
5587 pub(crate) fn path_mtu(&self) -> u16 {
5588 self.path.current_mtu()
5589 }
5590
5591 fn can_send_1rtt(&self, max_size: usize) -> bool {
5595 self.streams.can_send_stream_data()
5596 || self.path.challenge_pending
5597 || self
5598 .prev_path
5599 .as_ref()
5600 .is_some_and(|(_, x)| x.challenge_pending)
5601 || !self.path_responses.is_empty()
5602 || !self.nat_traversal_challenges.is_empty()
5603 || self
5604 .datagrams
5605 .outgoing
5606 .front()
5607 .is_some_and(|x| x.size(true) <= max_size)
5608 }
5609
5610 fn remove_in_flight(&mut self, pn: u64, packet: &SentPacket) {
5612 for path in [&mut self.path]
5614 .into_iter()
5615 .chain(self.prev_path.as_mut().map(|(_, data)| data))
5616 {
5617 if path.remove_in_flight(pn, packet) {
5618 return;
5619 }
5620 }
5621 }
5622
5623 fn kill(&mut self, reason: ConnectionError) {
5625 self.close_common();
5626 self.error = Some(reason);
5627 self.state = State::Drained;
5628 self.endpoint_events.push_back(EndpointEventInner::Drained);
5629 }
5630
5631 fn generate_nat_traversal_challenges(&mut self, now: Instant) {
5633 let candidates: Vec<(VarInt, SocketAddr)> = if let Some(nat_state) = &self.nat_traversal {
5635 nat_state
5636 .get_validation_candidates()
5637 .into_iter()
5638 .take(3) .map(|(seq, candidate)| (seq, candidate.address))
5640 .collect()
5641 } else {
5642 return;
5643 };
5644
5645 if candidates.is_empty() {
5646 return;
5647 }
5648
5649 if let Some(nat_state) = &mut self.nat_traversal {
5651 for (seq, address) in candidates {
5652 let challenge: u64 = self.rng.r#gen();
5654
5655 if let Err(e) = nat_state.start_validation(seq, challenge, now) {
5657 debug!("Failed to start validation for candidate {}: {}", seq, e);
5658 continue;
5659 }
5660
5661 self.nat_traversal_challenges.push(address, challenge);
5663 trace!(
5664 "Queuing NAT validation PATH_CHALLENGE for {} with token {:08x}",
5665 address, challenge
5666 );
5667 }
5668 }
5669 }
5670
5671 pub fn current_mtu(&self) -> u16 {
5675 self.path.current_mtu()
5676 }
5677
5678 fn predict_1rtt_overhead(&self, pn: Option<u64>) -> usize {
5685 let pn_len = match pn {
5686 Some(pn) => PacketNumber::new(
5687 pn,
5688 self.spaces[SpaceId::Data].largest_acked_packet.unwrap_or(0),
5689 )
5690 .len(),
5691 None => 4,
5693 };
5694
5695 1 + self.rem_cids.active().len() + pn_len + self.tag_len_1rtt()
5697 }
5698
5699 fn tag_len_1rtt(&self) -> usize {
5700 let key = match self.spaces[SpaceId::Data].crypto.as_ref() {
5701 Some(crypto) => Some(&*crypto.packet.local),
5702 None => self.zero_rtt_crypto.as_ref().map(|x| &*x.packet),
5703 };
5704 key.map_or(16, |x| x.tag_len())
5708 }
5709
5710 fn on_path_validated(&mut self) {
5712 self.path.validated = true;
5713 let ConnectionSide::Server { server_config } = &self.side else {
5714 return;
5715 };
5716 let new_tokens = &mut self.spaces[SpaceId::Data as usize].pending.new_tokens;
5717 new_tokens.clear();
5718 for _ in 0..server_config.validation_token.sent {
5719 new_tokens.push(self.path.remote);
5720 }
5721 }
5722}
5723
5724impl fmt::Debug for Connection {
5725 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
5726 f.debug_struct("Connection")
5727 .field("handshake_cid", &self.handshake_cid)
5728 .finish()
5729 }
5730}
5731
/// Side-specific state held by a connection.
enum ConnectionSide {
    /// State kept when this endpoint acts as the client.
    Client {
        /// Token taken from `token_store` for `server_name` when the side is built from
        /// [`SideArgs`]; empty when the store had none.
        token: Bytes,
        /// Store the token above was taken from.
        token_store: Arc<dyn TokenStore>,
        /// Server name used as the lookup key into `token_store`.
        server_name: String,
    },
    /// State kept when this endpoint acts as the server.
    Server {
        /// Server configuration; consulted for `migration` and `validation_token`.
        server_config: Arc<ServerConfig>,
    },
}
5744
5745impl ConnectionSide {
5746 fn remote_may_migrate(&self) -> bool {
5747 match self {
5748 Self::Server { server_config } => server_config.migration,
5749 Self::Client { .. } => false,
5750 }
5751 }
5752
5753 fn is_client(&self) -> bool {
5754 self.side().is_client()
5755 }
5756
5757 fn is_server(&self) -> bool {
5758 self.side().is_server()
5759 }
5760
5761 fn side(&self) -> Side {
5762 match *self {
5763 Self::Client { .. } => Side::Client,
5764 Self::Server { .. } => Side::Server,
5765 }
5766 }
5767}
5768
5769impl From<SideArgs> for ConnectionSide {
5770 fn from(side: SideArgs) -> Self {
5771 match side {
5772 SideArgs::Client {
5773 token_store,
5774 server_name,
5775 } => Self::Client {
5776 token: token_store.take(&server_name).unwrap_or_default(),
5777 token_store,
5778 server_name,
5779 },
5780 SideArgs::Server {
5781 server_config,
5782 pref_addr_cid: _,
5783 path_validated: _,
5784 } => Self::Server { server_config },
5785 }
5786 }
5787}
5788
/// Side-specific arguments used to build a connection; converted into `ConnectionSide`.
pub(crate) enum SideArgs {
    /// Arguments for a client-side connection.
    Client {
        /// Store from which the initial token for `server_name` is taken.
        token_store: Arc<dyn TokenStore>,
        /// Server name used as the lookup key into `token_store`.
        server_name: String,
    },
    /// Arguments for a server-side connection.
    Server {
        /// Server configuration shared with the resulting connection.
        server_config: Arc<ServerConfig>,
        /// Returned by [`SideArgs::pref_addr_cid`]; not carried into `ConnectionSide`.
        // NOTE(review): presumably the CID tied to a preferred address — confirm.
        pref_addr_cid: Option<ConnectionId>,
        /// Returned by [`SideArgs::path_validated`]; not carried into `ConnectionSide`.
        path_validated: bool,
    },
}
5801
5802impl SideArgs {
5803 pub(crate) fn pref_addr_cid(&self) -> Option<ConnectionId> {
5804 match *self {
5805 Self::Client { .. } => None,
5806 Self::Server { pref_addr_cid, .. } => pref_addr_cid,
5807 }
5808 }
5809
5810 pub(crate) fn path_validated(&self) -> bool {
5811 match *self {
5812 Self::Client { .. } => true,
5813 Self::Server { path_validated, .. } => path_validated,
5814 }
5815 }
5816
5817 pub(crate) fn side(&self) -> Side {
5818 match *self {
5819 Self::Client { .. } => Side::Client,
5820 Self::Server { .. } => Side::Server,
5821 }
5822 }
5823}
5824
/// Reasons why a connection might be lost.
#[derive(Debug, Error, Clone, PartialEq, Eq)]
pub enum ConnectionError {
    /// The peer doesn't implement any supported version.
    #[error("peer doesn't implement any supported version")]
    VersionMismatch,
    /// A transport-level error; displayed transparently as the wrapped error.
    #[error(transparent)]
    TransportError(#[from] TransportError),
    /// The connection was aborted by the peer with a connection-level close frame.
    #[error("aborted by peer: {0}")]
    ConnectionClosed(frame::ConnectionClose),
    /// The connection was closed by the peer with an application-level close frame.
    #[error("closed by peer: {0}")]
    ApplicationClosed(frame::ApplicationClose),
    /// The connection was reset by the peer.
    #[error("reset by peer")]
    Reset,
    /// The connection timed out.
    #[error("timed out")]
    TimedOut,
    /// The connection was closed.
    // NOTE(review): the variant name suggests a locally initiated close — confirm.
    #[error("closed")]
    LocallyClosed,
    /// Connection IDs were exhausted.
    #[error("CIDs exhausted")]
    CidsExhausted,
}
5859
5860impl From<Close> for ConnectionError {
5861 fn from(x: Close) -> Self {
5862 match x {
5863 Close::Connection(reason) => Self::ConnectionClosed(reason),
5864 Close::Application(reason) => Self::ApplicationClosed(reason),
5865 }
5866 }
5867}
5868
5869impl From<ConnectionError> for io::Error {
5871 fn from(x: ConnectionError) -> Self {
5872 use ConnectionError::*;
5873 let kind = match x {
5874 TimedOut => io::ErrorKind::TimedOut,
5875 Reset => io::ErrorKind::ConnectionReset,
5876 ApplicationClosed(_) | ConnectionClosed(_) => io::ErrorKind::ConnectionAborted,
5877 TransportError(_) | VersionMismatch | LocallyClosed | CidsExhausted => {
5878 io::ErrorKind::Other
5879 }
5880 };
5881 Self::new(kind, x)
5882 }
5883}
5884
/// Connection lifecycle state.
///
/// `Closed`, `Draining` and `Drained` are all reported as closed by `State::is_closed`.
#[derive(Clone, Debug)]
pub enum State {
    /// Handshake in progress, with handshake-specific bookkeeping.
    Handshake(state::Handshake),
    /// Connection established.
    Established,
    /// Closed, carrying the close reason.
    Closed(state::Closed),
    /// Draining.
    // NOTE(review): presumably entered after the peer closed — confirm.
    Draining,
    /// Fully drained; `kill` moves here and emits a `Drained` endpoint event.
    Drained,
}
5899
5900impl State {
5901 fn closed<R: Into<Close>>(reason: R) -> Self {
5902 Self::Closed(state::Closed {
5903 reason: reason.into(),
5904 })
5905 }
5906
5907 fn is_handshake(&self) -> bool {
5908 matches!(*self, Self::Handshake(_))
5909 }
5910
5911 fn is_established(&self) -> bool {
5912 matches!(*self, Self::Established)
5913 }
5914
5915 fn is_closed(&self) -> bool {
5916 matches!(*self, Self::Closed(_) | Self::Draining | Self::Drained)
5917 }
5918
5919 fn is_drained(&self) -> bool {
5920 matches!(*self, Self::Drained)
5921 }
5922}
5923
/// Payload types for the data-carrying variants of `State`.
mod state {
    use super::*;

    /// Bookkeeping carried by `State::Handshake`.
    #[derive(Clone, Debug)]
    pub struct Handshake {
        // NOTE(review): presumably records whether the remote CID has been set — confirm.
        pub(super) rem_cid_set: bool,
        // NOTE(review): presumably the token expected from the peer — confirm against use sites.
        pub(super) expected_token: Bytes,
        // NOTE(review): presumably the buffered client hello, if any — confirm.
        pub(super) client_hello: Option<Bytes>,
    }

    /// Payload carried by `State::Closed`.
    #[derive(Clone, Debug)]
    pub struct Closed {
        /// Close reason, as produced by `State::closed`.
        pub(super) reason: Close,
    }
}
5948
/// Events of interest to the application.
// NOTE(review): producers and consumers of these events are outside this chunk; the
// per-variant notes below are inferred from the variant names — confirm.
#[derive(Debug)]
pub enum Event {
    /// Handshake data is ready.
    HandshakeDataReady,
    /// The connection was established.
    Connected,
    /// The connection was lost.
    ConnectionLost {
        /// Why the connection was lost.
        reason: ConnectionError,
    },
    /// A stream-related event.
    Stream(StreamEvent),
    /// A datagram was received.
    DatagramReceived,
    /// Datagram sending is no longer blocked.
    DatagramsUnblocked,
}
5970
/// Returns `x - y`, or `Duration::ZERO` when `x` is not later than `y`.
fn instant_saturating_sub(x: Instant, y: Instant) -> Duration {
    x.saturating_duration_since(y)
}
5974
5975fn get_max_ack_delay(params: &TransportParameters) -> Duration {
5976 Duration::from_micros(params.max_ack_delay.0 * 1000)
5977}
5978
/// Upper bound on a backoff exponent.
// NOTE(review): the use site is outside this chunk; presumably caps exponential timer
// backoff — confirm.
const MAX_BACKOFF_EXPONENT: u32 = 16;

/// Minimum packet space: the worst-case Handshake/0-RTT header size plus 32 bytes.
// NOTE(review): the meaning of the extra 32 bytes is not visible here — confirm.
const MIN_PACKET_SPACE: usize = MAX_HANDSHAKE_OR_0RTT_HEADER_SIZE + 32;

/// Largest possible size of a Handshake or 0-RTT packet header.
///
/// Sum of fixed-size fields, two maximum-length connection IDs, and the encoded size of a
/// `VarInt` holding `u16::MAX`.
// NOTE(review): the terms presumably map to flags (1) + version (4) + DCID length (1) +
// DCID + SCID length (1) + SCID + length field + packet number (4) — confirm against the
// long header layout.
const MAX_HANDSHAKE_OR_0RTT_HEADER_SIZE: usize =
    1 + 4 + 1 + MAX_CID_SIZE + 1 + MAX_CID_SIZE + VarInt::from_u32(u16::MAX as u32).size() + 4;

/// Safety margin subtracted from a key's confidentiality limit when computing
/// `key_phase_size` in `update_keys`.
const KEY_UPDATE_MARGIN: u64 = 10_000;
6003
/// Summary of the frames written into a packet.
#[derive(Default)]
struct SentFrames {
    /// Retransmittable frame data; must be empty for the packet to count as ACK-only.
    retransmits: ThinRetransmits,
    /// Set when an ACK was included; `is_ack_only` requires it to be `Some`.
    // NOTE(review): presumably the largest packet number acknowledged — confirm.
    largest_acked: Option<u64>,
    /// Metadata for stream frames; must be empty for the packet to count as ACK-only.
    stream_frames: StreamMetaVec,
    /// Must be `false` for the packet to count as ACK-only.
    // NOTE(review): presumably set when frames that are never retransmitted were
    // written — confirm.
    non_retransmits: bool,
    // NOTE(review): presumably set when the packet must be padded — confirm against the
    // packet builder.
    requires_padding: bool,
}
6013
6014impl SentFrames {
6015 fn is_ack_only(&self, streams: &StreamsState) -> bool {
6017 self.largest_acked.is_some()
6018 && !self.non_retransmits
6019 && self.stream_frames.is_empty()
6020 && self.retransmits.is_empty(streams)
6021 }
6022}
6023
6024fn negotiate_max_idle_timeout(x: Option<VarInt>, y: Option<VarInt>) -> Option<Duration> {
6032 match (x, y) {
6033 (Some(VarInt(0)) | None, Some(VarInt(0)) | None) => None,
6034 (Some(VarInt(0)) | None, Some(y)) => Some(Duration::from_millis(y.0)),
6035 (Some(x), Some(VarInt(0)) | None) => Some(Duration::from_millis(x.0)),
6036 (Some(x), Some(y)) => Some(Duration::from_millis(cmp::min(x, y).0)),
6037 }
6038}
6039
/// Post-quantum cryptography (PQC) state tracked for a connection.
#[derive(Debug, Clone)]
pub(crate) struct PqcState {
    /// Set once the peer's transport parameters carry a PQC algorithm set.
    enabled: bool,
    /// Copy of the peer's advertised PQC algorithms, if any.
    algorithms: Option<crate::transport_parameters::PqcAlgorithms>,
    /// MTU used for the handshake; starts at `MIN_INITIAL_SIZE` and is raised once PQC
    /// use is detected.
    handshake_mtu: u16,
    /// Set when the peer advertises at least one PQC algorithm or a PQC handshake is
    /// detected in crypto data.
    using_pqc: bool,
    /// Helper to which packet-level PQC decisions are delegated.
    packet_handler: crate::crypto::pqc::packet_handler::PqcPacketHandler,
}
6054
6055impl PqcState {
6056 fn new() -> Self {
6057 Self {
6058 enabled: false,
6059 algorithms: None,
6060 handshake_mtu: MIN_INITIAL_SIZE,
6061 using_pqc: false,
6062 packet_handler: crate::crypto::pqc::packet_handler::PqcPacketHandler::new(),
6063 }
6064 }
6065
6066 fn min_initial_size(&self) -> u16 {
6068 if self.enabled && self.using_pqc {
6069 std::cmp::max(self.handshake_mtu, 4096)
6071 } else {
6072 MIN_INITIAL_SIZE
6073 }
6074 }
6075
6076 fn update_from_peer_params(&mut self, params: &TransportParameters) {
6078 if let Some(ref algorithms) = params.pqc_algorithms {
6079 self.enabled = true;
6080 self.algorithms = Some(algorithms.clone());
6081 if algorithms.ml_kem_768
6083 || algorithms.ml_dsa_65
6084 || algorithms.hybrid_x25519_ml_kem
6085 || algorithms.hybrid_ed25519_ml_dsa
6086 {
6087 self.using_pqc = true;
6088 self.handshake_mtu = 4096; }
6090 }
6091 }
6092
6093 fn detect_pqc_from_crypto(&mut self, crypto_data: &[u8], space: SpaceId) {
6095 if self.packet_handler.detect_pqc_handshake(crypto_data, space) {
6096 self.using_pqc = true;
6097 self.handshake_mtu = self.packet_handler.get_min_packet_size(space);
6099 }
6100 }
6101
6102 fn should_trigger_mtu_discovery(&mut self) -> bool {
6104 self.packet_handler.should_trigger_mtu_discovery()
6105 }
6106
6107 fn get_mtu_config(&self) -> MtuDiscoveryConfig {
6109 self.packet_handler.get_pqc_mtu_config()
6110 }
6111
6112 fn calculate_crypto_frame_size(&self, available_space: usize, remaining_data: usize) -> usize {
6114 self.packet_handler
6115 .calculate_crypto_frame_size(available_space, remaining_data)
6116 }
6117
6118 fn should_adjust_coalescing(&self, current_size: usize, space: SpaceId) -> bool {
6120 self.packet_handler
6121 .adjust_coalescing_for_pqc(current_size, space)
6122 }
6123
6124 fn on_packet_sent(&mut self, space: SpaceId, size: u16) {
6126 self.packet_handler.on_packet_sent(space, size);
6127 }
6128
6129 fn reset(&mut self) {
6131 self.enabled = false;
6132 self.algorithms = None;
6133 self.handshake_mtu = MIN_INITIAL_SIZE;
6134 self.using_pqc = false;
6135 self.packet_handler.reset();
6136 }
6137}
6138
/// The default PQC state is the one produced by `PqcState::new`.
#[cfg(feature = "pqc")]
impl Default for PqcState {
    fn default() -> Self {
        PqcState::new()
    }
}
6145
/// Per-connection state for observed-address discovery.
#[derive(Debug, Clone)]
pub(crate) struct AddressDiscoveryState {
    /// Master switch; when `false` no observations are sent or recorded.
    enabled: bool,
    /// Observation rate handed to the rate limiter and to the path (10 by default).
    max_observation_rate: u8,
    /// When `false`, only path 0 is observed (unless bootstrap mode is on).
    observe_all_paths: bool,
    /// Address information per path ID.
    path_addresses: std::collections::HashMap<u64, paths::PathAddressInfo>,
    /// Token bucket limiting how often observations are sent.
    rate_limiter: AddressObservationRateLimiter,
    /// Log of every observed address received while enabled.
    observed_addresses: Vec<ObservedAddressEvent>,
    /// Bootstrap mode: observes every path and uses a five times higher rate limit.
    bootstrap_mode: bool,
    /// Sequence number to assign to the next outgoing observed-address frame.
    next_sequence_number: VarInt,
    // NOTE(review): not used within this chunk; presumably the last received sequence
    // number per path — confirm.
    last_received_sequence: std::collections::HashMap<u64, VarInt>,
}
6168
/// Record of one observed address handled by `handle_observed_address`.
#[derive(Debug, Clone, PartialEq, Eq)]
struct ObservedAddressEvent {
    /// The observed address.
    address: SocketAddr,
    /// The `now` value passed in when the observation was handled.
    received_at: Instant,
    /// Path the observation was recorded for.
    path_id: u64,
}
6179
/// Token bucket used to rate-limit address observations.
#[derive(Debug, Clone)]
struct AddressObservationRateLimiter {
    /// Tokens currently available.
    tokens: f64,
    /// Upper bound applied to `tokens` when refilling.
    max_tokens: f64,
    /// Tokens added per second of elapsed time.
    rate: f64,
    /// Time of the most recent refill.
    last_update: Instant,
}
6192
6193impl AddressDiscoveryState {
6194 fn new(config: &crate::transport_parameters::AddressDiscoveryConfig, now: Instant) -> Self {
6196 use crate::transport_parameters::AddressDiscoveryConfig::*;
6197
6198 let (enabled, _can_send, _can_receive) = match config {
6200 SendOnly => (true, true, false),
6201 ReceiveOnly => (true, false, true),
6202 SendAndReceive => (true, true, true),
6203 };
6204
6205 let max_observation_rate = 10u8; let observe_all_paths = false; Self {
6211 enabled,
6212 max_observation_rate,
6213 observe_all_paths,
6214 path_addresses: std::collections::HashMap::new(),
6215 rate_limiter: AddressObservationRateLimiter::new(max_observation_rate, now),
6216 observed_addresses: Vec::new(),
6217 bootstrap_mode: false,
6218 next_sequence_number: VarInt::from_u32(0),
6219 last_received_sequence: std::collections::HashMap::new(),
6220 }
6221 }
6222
6223 fn should_send_observation(&mut self, path_id: u64, now: Instant) -> bool {
6225 if !self.should_observe_path(path_id) {
6227 return false;
6228 }
6229
6230 let needs_observation = match self.path_addresses.get(&path_id) {
6232 Some(info) => info.observed_address.is_none() || !info.notified,
6233 None => true,
6234 };
6235
6236 if !needs_observation {
6237 return false;
6238 }
6239
6240 self.rate_limiter.try_consume(1.0, now)
6242 }
6243
6244 fn record_observation_sent(&mut self, path_id: u64) {
6246 if let Some(info) = self.path_addresses.get_mut(&path_id) {
6247 info.mark_notified();
6248 }
6249 }
6250
6251 fn handle_observed_address(&mut self, address: SocketAddr, path_id: u64, now: Instant) {
6253 if !self.enabled {
6254 return;
6255 }
6256
6257 self.observed_addresses.push(ObservedAddressEvent {
6258 address,
6259 received_at: now,
6260 path_id,
6261 });
6262
6263 let info = self
6265 .path_addresses
6266 .entry(path_id)
6267 .or_insert_with(paths::PathAddressInfo::new);
6268 info.update_observed_address(address, now);
6269 }
6270
6271 pub(crate) fn get_observed_address(&self, path_id: u64) -> Option<SocketAddr> {
6273 self.path_addresses
6274 .get(&path_id)
6275 .and_then(|info| info.observed_address)
6276 }
6277
6278 pub(crate) fn get_all_observed_addresses(&self) -> Vec<SocketAddr> {
6280 self.path_addresses
6281 .values()
6282 .filter_map(|info| info.observed_address)
6283 .collect()
6284 }
6285
6286 pub(crate) fn stats(&self) -> AddressDiscoveryStats {
6288 AddressDiscoveryStats {
6289 frames_sent: self.observed_addresses.len() as u64, frames_received: self.observed_addresses.len() as u64,
6291 addresses_discovered: self
6292 .path_addresses
6293 .values()
6294 .filter(|info| info.observed_address.is_some())
6295 .count() as u64,
6296 address_changes_detected: 0, }
6298 }
6299
6300 fn has_unnotified_changes(&self) -> bool {
6302 self.path_addresses
6303 .values()
6304 .any(|info| info.observed_address.is_some() && !info.notified)
6305 }
6306
6307 fn queue_observed_address_frame(
6309 &mut self,
6310 path_id: u64,
6311 address: SocketAddr,
6312 ) -> Option<frame::ObservedAddress> {
6313 if !self.enabled {
6315 return None;
6316 }
6317
6318 if !self.observe_all_paths && path_id != 0 {
6320 return None;
6321 }
6322
6323 if let Some(info) = self.path_addresses.get(&path_id) {
6325 if info.notified {
6326 return None;
6327 }
6328 }
6329
6330 if self.rate_limiter.tokens < 1.0 {
6332 return None;
6333 }
6334
6335 self.rate_limiter.tokens -= 1.0;
6337
6338 let info = self
6340 .path_addresses
6341 .entry(path_id)
6342 .or_insert_with(paths::PathAddressInfo::new);
6343 info.observed_address = Some(address);
6344 info.notified = true;
6345
6346 let sequence_number = self.next_sequence_number;
6348 self.next_sequence_number = VarInt::from_u64(self.next_sequence_number.into_inner() + 1)
6349 .expect("sequence number overflow");
6350
6351 Some(frame::ObservedAddress {
6352 sequence_number,
6353 address,
6354 })
6355 }
6356
6357 fn check_for_address_observations(
6359 &mut self,
6360 _current_path: u64,
6361 peer_supports_address_discovery: bool,
6362 now: Instant,
6363 ) -> Vec<frame::ObservedAddress> {
6364 let mut frames = Vec::new();
6365
6366 if !self.enabled || !peer_supports_address_discovery {
6368 return frames;
6369 }
6370
6371 self.rate_limiter.update_tokens(now);
6373
6374 let paths_to_notify: Vec<u64> = self
6376 .path_addresses
6377 .iter()
6378 .filter_map(|(&path_id, info)| {
6379 if info.observed_address.is_some() && !info.notified {
6380 Some(path_id)
6381 } else {
6382 None
6383 }
6384 })
6385 .collect();
6386
6387 for path_id in paths_to_notify {
6389 if !self.should_observe_path(path_id) {
6391 continue;
6392 }
6393
6394 if !self.bootstrap_mode && self.rate_limiter.tokens < 1.0 {
6396 break; }
6398
6399 if let Some(info) = self.path_addresses.get_mut(&path_id) {
6401 if let Some(address) = info.observed_address {
6402 if self.bootstrap_mode {
6404 self.rate_limiter.tokens -= 0.2; } else {
6406 self.rate_limiter.tokens -= 1.0;
6407 }
6408
6409 info.notified = true;
6411
6412 let sequence_number = self.next_sequence_number;
6414 self.next_sequence_number =
6415 VarInt::from_u64(self.next_sequence_number.into_inner() + 1)
6416 .expect("sequence number overflow");
6417
6418 frames.push(frame::ObservedAddress {
6419 sequence_number,
6420 address,
6421 });
6422 }
6423 }
6424 }
6425
6426 frames
6427 }
6428
6429 fn update_rate_limit(&mut self, new_rate: f64) {
6431 self.max_observation_rate = new_rate as u8;
6432 self.rate_limiter.set_rate(new_rate as u8);
6433 }
6434
6435 fn from_transport_params(params: &TransportParameters) -> Option<Self> {
6437 params
6438 .address_discovery
6439 .as_ref()
6440 .map(|config| Self::new(config, Instant::now()))
6441 }
6442
6443 #[cfg(test)]
6445 fn new_with_params(enabled: bool, max_rate: f64, observe_all_paths: bool) -> Self {
6446 if !enabled {
6448 return Self {
6450 enabled: false,
6451 max_observation_rate: max_rate as u8,
6452 observe_all_paths,
6453 path_addresses: std::collections::HashMap::new(),
6454 rate_limiter: AddressObservationRateLimiter::new(max_rate as u8, Instant::now()),
6455 observed_addresses: Vec::new(),
6456 bootstrap_mode: false,
6457 next_sequence_number: VarInt::from_u32(0),
6458 last_received_sequence: std::collections::HashMap::new(),
6459 };
6460 }
6461
6462 let config = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
6464 let mut state = Self::new(&config, Instant::now());
6465 state.max_observation_rate = max_rate as u8;
6466 state.observe_all_paths = observe_all_paths;
6467 state.rate_limiter = AddressObservationRateLimiter::new(max_rate as u8, Instant::now());
6468 state
6469 }
6470
6471 fn set_bootstrap_mode(&mut self, enabled: bool) {
6473 self.bootstrap_mode = enabled;
6474 if enabled {
6476 let bootstrap_rate = self.get_effective_rate_limit();
6477 self.rate_limiter.rate = bootstrap_rate;
6478 self.rate_limiter.max_tokens = bootstrap_rate * 2.0; self.rate_limiter.tokens = self.rate_limiter.max_tokens;
6481 }
6482 }
6483
6484 fn is_bootstrap_mode(&self) -> bool {
6486 self.bootstrap_mode
6487 }
6488
6489 fn get_effective_rate_limit(&self) -> f64 {
6491 if self.bootstrap_mode {
6492 (self.max_observation_rate as f64) * 5.0
6494 } else {
6495 self.max_observation_rate as f64
6496 }
6497 }
6498
6499 fn should_observe_path(&self, path_id: u64) -> bool {
6501 if !self.enabled {
6502 return false;
6503 }
6504
6505 if self.bootstrap_mode {
6507 return true;
6508 }
6509
6510 self.observe_all_paths || path_id == 0
6512 }
6513
6514 fn should_send_observation_immediately(&self, is_new_connection: bool) -> bool {
6516 self.bootstrap_mode && is_new_connection
6517 }
6518}
6519
6520impl AddressObservationRateLimiter {
6521 fn new(rate: u8, now: Instant) -> Self {
6523 let rate_f64 = rate as f64;
6524 Self {
6525 tokens: rate_f64,
6526 max_tokens: rate_f64,
6527 rate: rate_f64,
6528 last_update: now,
6529 }
6530 }
6531
6532 fn try_consume(&mut self, tokens: f64, now: Instant) -> bool {
6534 self.update_tokens(now);
6535
6536 if self.tokens >= tokens {
6537 self.tokens -= tokens;
6538 true
6539 } else {
6540 false
6541 }
6542 }
6543
6544 fn update_tokens(&mut self, now: Instant) {
6546 let elapsed = now.saturating_duration_since(self.last_update);
6547 let new_tokens = elapsed.as_secs_f64() * self.rate;
6548 self.tokens = (self.tokens + new_tokens).min(self.max_tokens);
6549 self.last_update = now;
6550 }
6551
6552 fn set_rate(&mut self, rate: u8) {
6554 let rate_f64 = rate as f64;
6555 self.rate = rate_f64;
6556 self.max_tokens = rate_f64;
6557 if self.tokens > self.max_tokens {
6559 self.tokens = self.max_tokens;
6560 }
6561 }
6562}
6563
6564#[cfg(test)]
6565mod tests {
6566 use super::*;
6567 use crate::transport_parameters::AddressDiscoveryConfig;
6568 use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr};
6569
6570 #[test]
6571 fn address_discovery_state_new() {
6572 let config = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
6573 let now = Instant::now();
6574 let state = AddressDiscoveryState::new(&config, now);
6575
6576 assert!(state.enabled);
6577 assert_eq!(state.max_observation_rate, 10);
6578 assert!(!state.observe_all_paths);
6579 assert!(state.path_addresses.is_empty());
6580 assert!(state.observed_addresses.is_empty());
6581 assert_eq!(state.rate_limiter.tokens, 10.0);
6582 }
6583
6584 #[test]
6585 fn address_discovery_state_disabled() {
6586 let config = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
6587 let now = Instant::now();
6588 let mut state = AddressDiscoveryState::new(&config, now);
6589
6590 state.enabled = false;
6592
6593 assert!(!state.should_send_observation(0, now));
6595 }
6596
6597 #[test]
6598 fn address_discovery_state_should_send_observation() {
6599 let config = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
6600 let now = Instant::now();
6601 let mut state = AddressDiscoveryState::new(&config, now);
6602
6603 assert!(state.should_send_observation(0, now));
6605
6606 let mut path_info = paths::PathAddressInfo::new();
6608 path_info.update_observed_address(
6609 SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)), 8080),
6610 now,
6611 );
6612 path_info.mark_notified();
6613 state.path_addresses.insert(0, path_info);
6614
6615 assert!(!state.should_send_observation(0, now));
6617
6618 assert!(!state.should_send_observation(1, now));
6620 }
6621
6622 #[test]
6623 fn address_discovery_state_rate_limiting() {
6624 let config = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
6625 let now = Instant::now();
6626 let mut state = AddressDiscoveryState::new(&config, now);
6627
6628 state.observe_all_paths = true;
6630
6631 assert!(state.should_send_observation(0, now));
6633
6634 state.rate_limiter.try_consume(9.0, now); assert!(!state.should_send_observation(0, now));
6639
6640 let later = now + Duration::from_secs(1);
6642 state.rate_limiter.update_tokens(later);
6643 assert!(state.should_send_observation(0, later));
6644 }
6645
6646 #[test]
6647 fn address_discovery_state_handle_observed_address() {
6648 let config = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
6649 let now = Instant::now();
6650 let mut state = AddressDiscoveryState::new(&config, now);
6651
6652 let addr1 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(10, 0, 0, 1)), 443);
6653 let addr2 = SocketAddr::new(
6654 IpAddr::V6(Ipv6Addr::new(0x2001, 0xdb8, 0, 0, 0, 0, 0, 1)),
6655 8080,
6656 );
6657
6658 state.handle_observed_address(addr1, 0, now);
6660 assert_eq!(state.observed_addresses.len(), 1);
6661 assert_eq!(state.observed_addresses[0].address, addr1);
6662 assert_eq!(state.observed_addresses[0].path_id, 0);
6663
6664 let later = now + Duration::from_millis(100);
6666 state.handle_observed_address(addr2, 1, later);
6667 assert_eq!(state.observed_addresses.len(), 2);
6668 assert_eq!(state.observed_addresses[1].address, addr2);
6669 assert_eq!(state.observed_addresses[1].path_id, 1);
6670 }
6671
6672 #[test]
6673 fn address_discovery_state_get_observed_address() {
6674 let config = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
6675 let now = Instant::now();
6676 let mut state = AddressDiscoveryState::new(&config, now);
6677
6678 assert_eq!(state.get_observed_address(0), None);
6680
6681 let mut path_info = paths::PathAddressInfo::new();
6683 let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 80);
6684 path_info.update_observed_address(addr, now);
6685 state.path_addresses.insert(0, path_info);
6686
6687 assert_eq!(state.get_observed_address(0), Some(addr));
6689 assert_eq!(state.get_observed_address(1), None);
6690 }
6691
6692 #[test]
6693 fn address_discovery_state_unnotified_changes() {
6694 let config = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
6695 let now = Instant::now();
6696 let mut state = AddressDiscoveryState::new(&config, now);
6697
6698 assert!(!state.has_unnotified_changes());
6700
6701 let mut path_info = paths::PathAddressInfo::new();
6703 path_info.update_observed_address(
6704 SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)), 8080),
6705 now,
6706 );
6707 state.path_addresses.insert(0, path_info);
6708
6709 assert!(state.has_unnotified_changes());
6711
6712 state.record_observation_sent(0);
6714 assert!(!state.has_unnotified_changes());
6715 }
6716
6717 #[test]
6718 fn address_observation_rate_limiter_token_bucket() {
6719 let now = Instant::now();
6720 let mut limiter = AddressObservationRateLimiter::new(5, now); assert_eq!(limiter.tokens, 5.0);
6724 assert_eq!(limiter.max_tokens, 5.0);
6725 assert_eq!(limiter.rate, 5.0);
6726
6727 assert!(limiter.try_consume(3.0, now));
6729 assert_eq!(limiter.tokens, 2.0);
6730
6731 assert!(!limiter.try_consume(3.0, now));
6733 assert_eq!(limiter.tokens, 2.0);
6734
6735 let later = now + Duration::from_secs(1);
6737 limiter.update_tokens(later);
6738 assert_eq!(limiter.tokens, 5.0); let half_sec = now + Duration::from_millis(500);
6742 let mut limiter2 = AddressObservationRateLimiter::new(5, now);
6743 limiter2.try_consume(3.0, now);
6744 limiter2.update_tokens(half_sec);
6745 assert_eq!(limiter2.tokens, 4.5); }
6747
/// A state built from the default `AddressDiscoveryConfig` is enabled, has an
/// observation rate of 10 and does not observe all paths.
#[test]
fn connection_initializes_address_discovery_state_default() {
    let default_config = crate::transport_parameters::AddressDiscoveryConfig::default();
    let discovery = AddressDiscoveryState::new(&default_config, Instant::now());

    assert!(discovery.enabled);
    assert_eq!(discovery.max_observation_rate, 10);
    assert!(!discovery.observe_all_paths);
}
6759
/// A state built from `SendAndReceive` is enabled, has an observation rate of
/// 10 and does not observe all paths.
#[test]
fn connection_initializes_with_address_discovery_enabled() {
    let send_and_receive = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
    let discovery = AddressDiscoveryState::new(&send_and_receive, Instant::now());

    assert!(discovery.enabled);
    assert_eq!(discovery.max_observation_rate, 10);
    assert!(!discovery.observe_all_paths);
}
6769
/// Address discovery is enabled when the state is built from the default
/// configuration.
#[test]
fn connection_address_discovery_enabled_by_default() {
    let default_config = crate::transport_parameters::AddressDiscoveryConfig::default();
    let discovery = AddressDiscoveryState::new(&default_config, Instant::now());

    assert!(discovery.enabled);
}
6777
/// `negotiate_max_idle_timeout` must return the expected value for every
/// table entry regardless of the order in which the two sides are passed.
#[test]
fn negotiate_max_idle_timeout_commutative() {
    // Small constructors to keep the table readable.
    let varint = |raw| Some(VarInt(raw));
    let millis = |ms| Some(Duration::from_millis(ms));

    // (one side, other side, expected negotiated timeout)
    let cases = [
        (None, None, None),
        (None, varint(0), None),
        (None, varint(2), millis(2)),
        (varint(0), varint(0), None),
        (varint(2), varint(0), millis(2)),
        (varint(1), varint(4), millis(1)),
    ];

    for (a, b, expected) in cases {
        assert_eq!(negotiate_max_idle_timeout(a, b), expected);
        assert_eq!(negotiate_max_idle_timeout(b, a), expected);
    }
}
6802
/// A freshly created `PathData` has empty address info and an observation
/// rate limiter whose rate, capacity and token count are all 10.
#[test]
fn path_creation_initializes_address_discovery() {
    let transport = TransportConfig::default();
    let peer = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)), 8080);
    let created_at = Instant::now();

    let path = paths::PathData::new(peer, false, None, created_at, &transport);

    // No observation has been recorded yet.
    let info = &path.address_info;
    assert!(info.observed_address.is_none());
    assert!(info.last_observed.is_none());
    assert_eq!(info.observation_count, 0);
    assert!(!info.notified);

    // Limiter values under the default transport configuration.
    let limiter = &path.observation_rate_limiter;
    assert_eq!(limiter.rate, 10.0);
    assert_eq!(limiter.max_tokens, 10.0);
    assert_eq!(limiter.tokens, 10.0);
}
6823
/// A path derived via `PathData::from_previous` starts with empty address
/// info, while its limiter reports the previous path's rate (20) and holds
/// 20 tokens.
#[test]
fn path_migration_resets_address_discovery() {
    let transport = TransportConfig::default();
    let old_remote = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)), 8080);
    let new_remote = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(10, 0, 0, 1)), 443);
    let start = Instant::now();

    // Give the original path some discovery state and a custom rate.
    let mut old_path = paths::PathData::new(old_remote, false, None, start, &transport);
    old_path.update_observed_address(old_remote, start);
    old_path.mark_address_notified();
    old_path.consume_observation_token(start);
    old_path.set_observation_rate(20);

    let migrated = paths::PathData::from_previous(new_remote, &old_path, start);

    // Address info is not carried over.
    let info = &migrated.address_info;
    assert!(info.observed_address.is_none());
    assert!(info.last_observed.is_none());
    assert_eq!(info.observation_count, 0);
    assert!(!info.notified);

    // The configured rate is kept and the bucket holds 20 tokens.
    let limiter = &migrated.observation_rate_limiter;
    assert_eq!(limiter.rate, 20.0);
    assert_eq!(limiter.tokens, 20.0);
}
6851
/// `set_observation_rate` updates the limiter's rate and capacity, and an
/// over-full token count reads as the new rate afterwards.
#[test]
fn connection_path_updates_observation_rate() {
    let transport = TransportConfig::default();
    let peer = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 42);
    let start = Instant::now();

    let mut path = paths::PathData::new(peer, false, None, start, &transport);

    // Rate under the default transport configuration.
    assert_eq!(path.observation_rate_limiter.rate, 10.0);

    // Raising the rate updates both rate and capacity.
    path.set_observation_rate(25);
    assert_eq!(path.observation_rate_limiter.rate, 25.0);
    assert_eq!(path.observation_rate_limiter.max_tokens, 25.0);

    // Over-fill the bucket by hand, then lower the rate: tokens read 20.
    path.observation_rate_limiter.tokens = 30.0;
    path.set_observation_rate(20);
    assert_eq!(path.observation_rate_limiter.tokens, 20.0);
}
6873
/// Setting `validated` on a path leaves its observed address and its
/// configured observation rate as they were.
#[test]
fn path_validation_preserves_discovery_state() {
    let transport = TransportConfig::default();
    let peer = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)), 8080);
    let start = Instant::now();

    let mut path = paths::PathData::new(peer, false, None, start, &transport);

    // Seed discovery state before validation.
    let seen_as = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(1, 2, 3, 4)), 5678);
    path.update_observed_address(seen_as, start);
    path.set_observation_rate(15);

    path.validated = true;

    assert_eq!(path.address_info.observed_address, Some(seen_as));
    assert_eq!(path.observation_rate_limiter.rate, 15.0);
}
6894
/// `new_with_params(true, 30.0, true)` yields an enabled state with rate 30,
/// all-path observation on, and no recorded paths or observations.
#[test]
fn address_discovery_state_initialization() {
    let discovery = AddressDiscoveryState::new_with_params(true, 30.0, true);

    // Parameters are reflected in the state.
    assert!(discovery.enabled);
    assert_eq!(discovery.max_observation_rate, 30);
    assert!(discovery.observe_all_paths);

    // Both collections start out empty.
    assert!(discovery.path_addresses.is_empty());
    assert!(discovery.observed_addresses.is_empty());
}
6906
/// A single `handle_observed_address` call appends one observation event and
/// creates the per-path entry with the address, timestamp and a count of 1.
#[test]
fn handle_observed_address_frame_basic() {
    let discovery_config = AddressDiscoveryConfig::SendAndReceive;
    let mut discovery = AddressDiscoveryState::new(&discovery_config, Instant::now());
    let reported = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)), 8080);
    let received = Instant::now();
    let path = 0;

    discovery.handle_observed_address(reported, path, received);

    // Exactly one event was logged, carrying the inputs verbatim.
    assert_eq!(discovery.observed_addresses.len(), 1);
    let event = &discovery.observed_addresses[0];
    assert_eq!(event.address, reported);
    assert_eq!(event.path_id, path);
    assert_eq!(event.received_at, received);

    // The per-path record mirrors the observation.
    assert!(discovery.path_addresses.contains_key(&path));
    let info = &discovery.path_addresses[&path];
    assert_eq!(info.observed_address, Some(reported));
    assert_eq!(info.last_observed, Some(received));
    assert_eq!(info.observation_count, 1);
}
6932
/// Three observations on one path (two of the same address, then a different
/// one) are all logged; the path entry ends up holding the last address with
/// an observation count of 1.
#[test]
fn handle_observed_address_frame_multiple_observations() {
    let discovery_config = AddressDiscoveryConfig::SendAndReceive;
    let mut discovery = AddressDiscoveryState::new(&discovery_config, Instant::now());
    let first = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)), 8080);
    let second = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(10, 0, 0, 1)), 443);
    let start = Instant::now();
    let path = 0;

    // Same address twice, then a change of address, one second apart each.
    discovery.handle_observed_address(first, path, start);
    discovery.handle_observed_address(first, path, start + Duration::from_secs(1));
    discovery.handle_observed_address(second, path, start + Duration::from_secs(2));

    // Every observation is recorded in the event log.
    assert_eq!(discovery.observed_addresses.len(), 3);

    // The path entry reflects only the latest address.
    let info = &discovery.path_addresses[&path];
    assert_eq!(info.observed_address, Some(second));
    assert_eq!(info.observation_count, 1);
}
6955
/// With `enabled` cleared, `handle_observed_address` records nothing.
#[test]
fn handle_observed_address_frame_disabled() {
    let discovery_config = AddressDiscoveryConfig::SendAndReceive;
    let mut discovery = AddressDiscoveryState::new(&discovery_config, Instant::now());
    // Turn the feature off after construction.
    discovery.enabled = false;

    let reported = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)), 8080);
    let received = Instant::now();
    discovery.handle_observed_address(reported, 0, received);

    // Neither the event log nor the per-path map was touched.
    assert!(discovery.observed_addresses.is_empty());
    assert!(discovery.path_addresses.is_empty());
}
6971
/// With a rate of 10, `should_send_observation` answers true both before and
/// after one observation has been recorded as sent.
#[test]
fn should_send_observation_basic() {
    let discovery_config = AddressDiscoveryConfig::SendAndReceive;
    let mut discovery = AddressDiscoveryState::new(&discovery_config, Instant::now());
    discovery.max_observation_rate = 10;
    let at = Instant::now();
    let path = 0;

    // First query on an untouched path.
    assert!(discovery.should_send_observation(path, at));

    discovery.record_observation_sent(path);

    // Still permitted after a single recorded send.
    assert!(discovery.should_send_observation(path, at));
}
6989
/// With the rate limit set to 2, two observations are permitted at the same
/// instant, a third is refused, and sending is permitted again one second
/// later.
#[test]
fn should_send_observation_rate_limiting() {
    let discovery_config = AddressDiscoveryConfig::SendAndReceive;
    let start = Instant::now();
    let mut discovery = AddressDiscoveryState::new(&discovery_config, start);
    discovery.max_observation_rate = 2;
    discovery.update_rate_limit(2.0);
    let path = 0;

    // Two sends fit within the limit.
    assert!(discovery.should_send_observation(path, start));
    discovery.record_observation_sent(path);
    assert!(discovery.should_send_observation(path, start));
    discovery.record_observation_sent(path);

    // The third at the same instant is refused.
    assert!(!discovery.should_send_observation(path, start));

    // One second later sending is permitted again.
    let one_second_on = start + Duration::from_secs(1);
    assert!(discovery.should_send_observation(path, one_second_on));
}
7012
/// With `enabled` cleared, `should_send_observation` answers false.
#[test]
fn should_send_observation_disabled() {
    let discovery_config = AddressDiscoveryConfig::SendAndReceive;
    let mut discovery = AddressDiscoveryState::new(&discovery_config, Instant::now());
    discovery.enabled = false;

    let allowed = discovery.should_send_observation(0, Instant::now());
    assert!(!allowed);
}
7022
/// With all-path observation on and a rate limit of 2, one send on path 0
/// plus one on path 1 leaves both paths refused at the same instant; path 0
/// is permitted again one second later.
#[test]
fn should_send_observation_per_path() {
    let discovery_config = AddressDiscoveryConfig::SendAndReceive;
    let start = Instant::now();
    let mut discovery = AddressDiscoveryState::new(&discovery_config, start);
    discovery.max_observation_rate = 2;
    discovery.observe_all_paths = true;
    discovery.update_rate_limit(2.0);

    // One send on each of two paths.
    assert!(discovery.should_send_observation(0, start));
    discovery.record_observation_sent(0);

    assert!(discovery.should_send_observation(1, start));
    discovery.record_observation_sent(1);

    // Both paths are now refused.
    assert!(!discovery.should_send_observation(0, start));
    assert!(!discovery.should_send_observation(1, start));

    // A second later path 0 may send again.
    let one_second_on = start + Duration::from_secs(1);
    assert!(discovery.should_send_observation(0, one_second_on));
}
7048
/// `has_unnotified_changes` turns true after an observation is handled and
/// false again once the path entry's `notified` flag is set.
#[test]
fn has_unnotified_changes_test() {
    let discovery_config = AddressDiscoveryConfig::SendAndReceive;
    let mut discovery = AddressDiscoveryState::new(&discovery_config, Instant::now());
    let reported = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)), 8080);
    let received = Instant::now();

    // Nothing pending on a fresh state.
    assert!(!discovery.has_unnotified_changes());

    // Handling an observation creates a pending change.
    discovery.handle_observed_address(reported, 0, received);
    assert!(discovery.has_unnotified_changes());

    // Flagging the entry as notified clears it.
    let entry = discovery.path_addresses.get_mut(&0).unwrap();
    entry.notified = true;
    assert!(!discovery.has_unnotified_changes());
}
7067
/// `get_observed_address` returns `None` before any observation, the handled
/// address afterwards, and `None` for a path that was never seen.
#[test]
fn get_observed_address_test() {
    let discovery_config = AddressDiscoveryConfig::SendAndReceive;
    let mut discovery = AddressDiscoveryState::new(&discovery_config, Instant::now());
    let reported = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)), 8080);
    let received = Instant::now();
    let path = 0;

    // Unknown before any observation arrives.
    assert_eq!(discovery.get_observed_address(path), None);

    // Known once the observation has been handled.
    discovery.handle_observed_address(reported, path, received);
    assert_eq!(discovery.get_observed_address(path), Some(reported));

    // A path id that was never used stays unknown.
    assert_eq!(discovery.get_observed_address(999), None);
}
7086
/// A limiter created with 10 allows two withdrawals of 5 at the same instant
/// and then refuses a further withdrawal of 1.
#[test]
fn rate_limiter_token_bucket_basic() {
    let start = Instant::now();
    let mut bucket = AddressObservationRateLimiter::new(10, start);

    // Two halves drain the bucket.
    assert!(bucket.try_consume(5.0, start));
    assert!(bucket.try_consume(5.0, start));

    // Nothing is left for a further request.
    assert!(!bucket.try_consume(1.0, start));
}
7100
/// After draining a limiter created with 10, a full 10 can be consumed again
/// one second later, and 5 more half a second after that; in each case a
/// further 0.1 is refused.
#[test]
fn rate_limiter_token_replenishment() {
    let start = Instant::now();
    let mut bucket = AddressObservationRateLimiter::new(10, start);

    // Drain the initial fill.
    assert!(bucket.try_consume(10.0, start));
    assert!(!bucket.try_consume(0.1, start));

    // One second on: 10 tokens are available, and no more.
    let after_one_second = start + Duration::from_secs(1);
    assert!(bucket.try_consume(10.0, after_one_second));
    assert!(!bucket.try_consume(0.1, after_one_second));

    // Another half second on: 5 tokens are available, and no more.
    let after_one_and_a_half = after_one_second + Duration::from_millis(500);
    assert!(bucket.try_consume(5.0, after_one_and_a_half));
    assert!(!bucket.try_consume(0.1, after_one_and_a_half));
}
7120
/// Idle time does not let a limiter created with 10 hand out more than 10
/// tokens at once.
#[test]
fn rate_limiter_max_tokens_cap() {
    let start = Instant::now();
    let mut bucket = AddressObservationRateLimiter::new(10, start);

    // Two idle seconds on a full bucket: 10 can be taken, 10.1 cannot.
    let after_idle = start + Duration::from_secs(2);
    assert!(bucket.try_consume(10.0, after_idle));
    assert!(!bucket.try_consume(10.1, after_idle));

    // One second later a withdrawal of 3 succeeds.
    let a_second_later = after_idle + Duration::from_secs(1);
    assert!(bucket.try_consume(3.0, a_second_later));

    // Two more seconds on: 10 can be taken, and then not even 0.1.
    let two_more_seconds = a_second_later + Duration::from_secs(2);
    assert!(bucket.try_consume(10.0, two_more_seconds));
    assert!(!bucket.try_consume(0.1, two_more_seconds));
}
7141
/// Fractional withdrawals of 0.5, 2.3 and 7.2 drain a limiter created with
/// 10; 100 ms later exactly 1.0 can be taken and a further 0.1 is refused.
#[test]
fn rate_limiter_fractional_consumption() {
    let start = Instant::now();
    let mut bucket = AddressObservationRateLimiter::new(10, start);

    // 0.5 + 2.3 + 7.2 uses up the initial fill.
    assert!(bucket.try_consume(0.5, start));
    assert!(bucket.try_consume(2.3, start));
    assert!(bucket.try_consume(7.2, start));
    assert!(!bucket.try_consume(0.1, start));

    // A tenth of a second later one token is available, and no more.
    let a_tenth_later = start + Duration::from_millis(100);
    assert!(bucket.try_consume(1.0, a_tenth_later));
    assert!(!bucket.try_consume(0.1, a_tenth_later));
}
7158
/// A limiter created with 0 refuses every withdrawal, both immediately and
/// ten seconds later.
#[test]
fn rate_limiter_zero_rate() {
    let start = Instant::now();
    let mut bucket = AddressObservationRateLimiter::new(0, start);

    // Every amount is refused at creation time.
    assert!(!bucket.try_consume(1.0, start));
    assert!(!bucket.try_consume(0.1, start));
    assert!(!bucket.try_consume(0.001, start));

    // Waiting does not help.
    let ten_seconds_on = start + Duration::from_secs(10);
    assert!(!bucket.try_consume(0.001, ten_seconds_on));
}
7173
/// A limiter created with 63 hands out exactly 63 tokens at once, and exactly
/// 63 again one second later.
#[test]
fn rate_limiter_high_rate() {
    let start = Instant::now();
    let mut bucket = AddressObservationRateLimiter::new(63, start);

    // 60 + 3 drains the initial fill.
    assert!(bucket.try_consume(60.0, start));
    assert!(bucket.try_consume(3.0, start));
    assert!(!bucket.try_consume(0.1, start));

    // One second on: 63 are available, and no more.
    let one_second_on = start + Duration::from_secs(1);
    assert!(bucket.try_consume(63.0, one_second_on));
    assert!(!bucket.try_consume(0.1, one_second_on));
}
7189
/// Checks token accounting of a limiter created with 100 over short
/// (millisecond-scale) intervals.
#[test]
fn rate_limiter_time_precision() {
    let now = Instant::now();
    let mut limiter = AddressObservationRateLimiter::new(100, now);

    // Drain the initial fill.
    assert!(limiter.try_consume(100.0, now));
    assert!(!limiter.try_consume(0.1, now));

    // 10 ms later: 0.8 can be taken, after which 0.5 cannot.
    let later = now + Duration::from_millis(10);
    assert!(limiter.try_consume(0.8, later));
    assert!(!limiter.try_consume(0.5, later));

    // A further 100 ms later: 5 tokens can be taken.
    let much_later = later + Duration::from_millis(100);
    assert!(limiter.try_consume(5.0, much_later));

    // Reset the balance by hand and advance 1 ms: the balance must land
    // within [0.09, 0.11]. A range check is used rather than exact equality
    // because the value is a float.
    limiter.tokens = 0.0;
    let final_time = much_later + Duration::from_millis(1);
    limiter.update_tokens(final_time);
    assert!((0.09..=0.11).contains(&limiter.tokens));
}
7219
/// With all-path observation on and the rate limit set to 5, three sends on
/// path 0 and two on path 1 are permitted, after which path 2 is refused at
/// the same instant; one second later all three paths are permitted.
#[test]
fn per_path_rate_limiting_independent() {
    let discovery_config = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
    let start = Instant::now();
    let mut discovery = AddressDiscoveryState::new(&discovery_config, start);

    discovery.observe_all_paths = true;
    discovery.update_rate_limit(5.0);

    // Register three paths, each with its own observed address.
    for (path, last_octet, port) in [(0, 1, 8080), (1, 2, 8081), (2, 3, 8082)] {
        let mut info = paths::PathAddressInfo::new();
        info.observed_address = Some(SocketAddr::new(
            IpAddr::V4(Ipv4Addr::new(192, 168, 1, last_octet)),
            port,
        ));
        discovery.path_addresses.insert(path, info);
    }

    // Three sends on path 0; `notified` is cleared after each so the path
    // keeps counting as having something to report.
    for _ in 0..3 {
        assert!(discovery.should_send_observation(0, start));
        discovery.record_observation_sent(0);
        discovery.path_addresses.get_mut(&0).unwrap().notified = false;
    }

    // Two sends on path 1.
    for _ in 0..2 {
        assert!(discovery.should_send_observation(1, start));
        discovery.record_observation_sent(1);
        discovery.path_addresses.get_mut(&1).unwrap().notified = false;
    }

    // Path 2 is refused after those five sends.
    assert!(!discovery.should_send_observation(2, start));

    // One second later every path is permitted.
    let one_second_on = start + Duration::from_secs(1);
    assert!(discovery.should_send_observation(0, one_second_on));
    assert!(discovery.should_send_observation(1, one_second_on));
    assert!(discovery.should_send_observation(2, one_second_on));
}
7284
/// Two paths with their own `PathObservationRateLimiter` (10 and 5) each
/// allow exactly that many sends at one instant and then refuse.
#[test]
fn per_path_rate_limiting_with_path_specific_limits() {
    let start = Instant::now();
    let fast_remote = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)), 8080);
    let slow_remote = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 2)), 8081);
    let transport = TransportConfig::default();

    let mut fast_path = paths::PathData::new(fast_remote, false, None, start, &transport);
    let mut slow_path = paths::PathData::new(slow_remote, false, None, start, &transport);

    // Give each path its own limit.
    fast_path.observation_rate_limiter = paths::PathObservationRateLimiter::new(10, start);
    slow_path.observation_rate_limiter = paths::PathObservationRateLimiter::new(5, start);

    // The first path allows ten sends, then refuses.
    for _ in 0..10 {
        assert!(fast_path.observation_rate_limiter.can_send(start));
        fast_path.observation_rate_limiter.consume_token(start);
    }
    assert!(!fast_path.observation_rate_limiter.can_send(start));

    // The second path allows five sends, then refuses.
    for _ in 0..5 {
        assert!(slow_path.observation_rate_limiter.can_send(start));
        slow_path.observation_rate_limiter.consume_token(start);
    }
    assert!(!slow_path.observation_rate_limiter.can_send(start));
}
7314
/// After an address has been observed and recorded as sent, further sends on
/// that path are refused; once a different address is observed and
/// `notified` is cleared, sending is permitted again.
#[test]
fn per_path_rate_limiting_address_change_detection() {
    let discovery_config = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
    let start = Instant::now();
    let mut discovery = AddressDiscoveryState::new(&discovery_config, start);

    let path = 0;
    let original = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)), 8080);
    let changed = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 2)), 8080);

    // First observation: permitted, then recorded as sent.
    assert!(discovery.should_send_observation(path, start));
    discovery.handle_observed_address(original, path, start);
    discovery.record_observation_sent(path);

    // Nothing new to report.
    assert!(!discovery.should_send_observation(path, start));

    // A different address shows up; clear the flag if the entry exists.
    discovery.handle_observed_address(changed, path, start);
    if let Some(entry) = discovery.path_addresses.get_mut(&path) {
        entry.notified = false;
    }

    assert!(discovery.should_send_observation(path, start));
}
7343
/// Using half the tokens of one path's limiter does not affect a second,
/// separately constructed path: its fresh limiter of 10 still allows ten
/// sends before refusing.
#[test]
fn per_path_rate_limiting_migration() {
    let start = Instant::now();
    let old_remote = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)), 8080);
    let new_remote = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 2)), 8081);
    let transport = TransportConfig::default();

    // Spend five tokens on the first path.
    let mut old_path = paths::PathData::new(old_remote, false, None, start, &transport);
    old_path.observation_rate_limiter = paths::PathObservationRateLimiter::new(10, start);
    for _ in 0..5 {
        assert!(old_path.observation_rate_limiter.can_send(start));
        old_path.observation_rate_limiter.consume_token(start);
    }

    // The second path gets its own fresh limiter.
    let mut fresh_path = paths::PathData::new(new_remote, false, None, start, &transport);
    fresh_path.observation_rate_limiter = paths::PathObservationRateLimiter::new(10, start);

    // It allows the full ten sends, then refuses.
    for _ in 0..10 {
        assert!(fresh_path.observation_rate_limiter.can_send(start));
        fresh_path.observation_rate_limiter.consume_token(start);
    }
    assert!(!fresh_path.observation_rate_limiter.can_send(start));
}
7375
/// On a state built from `SendAndReceive` with no further changes, path 0 is
/// permitted to send while paths 1 and 2 are refused, including one second
/// later for path 1.
#[test]
fn per_path_rate_limiting_disabled_paths() {
    let discovery_config = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
    let start = Instant::now();
    let mut discovery = AddressDiscoveryState::new(&discovery_config, start);

    // Path 0 is permitted.
    assert!(discovery.should_send_observation(0, start));

    // The other paths are refused.
    assert!(!discovery.should_send_observation(1, start));
    assert!(!discovery.should_send_observation(2, start));

    // Elapsed time does not change that for path 1.
    let one_second_on = start + Duration::from_secs(1);
    assert!(!discovery.should_send_observation(1, one_second_on));
}
7393
/// With a limiter of 10 installed, ten consecutive `should_send_observation`
/// queries at the same instant return true and the eleventh returns false.
#[test]
fn respecting_negotiated_max_observation_rate_basic() {
    let discovery_config = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
    let start = Instant::now();
    let mut discovery = AddressDiscoveryState::new(&discovery_config, start);

    // Install the rate explicitly.
    discovery.max_observation_rate = 10;
    discovery.rate_limiter = AddressObservationRateLimiter::new(10, start);

    // Ten queries in a row succeed.
    for _ in 0..10 {
        assert!(discovery.should_send_observation(0, start));
    }

    // The eleventh is refused.
    assert!(!discovery.should_send_observation(0, start));
}
7411
/// With the rate set to zero, no path is permitted to send, either
/// immediately or ten seconds later.
#[test]
fn respecting_negotiated_max_observation_rate_zero() {
    let discovery_config = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
    let start = Instant::now();
    let mut discovery = AddressDiscoveryState::new(&discovery_config, start);

    // Install a zero rate.
    discovery.max_observation_rate = 0;
    discovery.rate_limiter = AddressObservationRateLimiter::new(0, start);

    // Refused on both paths.
    assert!(!discovery.should_send_observation(0, start));
    assert!(!discovery.should_send_observation(1, start));

    // Still refused after ten seconds.
    let ten_seconds_on = start + Duration::from_secs(10);
    assert!(!discovery.should_send_observation(0, ten_seconds_on));
}
7430
/// After `update_rate_limit(5.0)`, overwriting `max_observation_rate` with 20
/// does not raise the effective limit: five sends are permitted and the
/// sixth is refused.
#[test]
fn respecting_negotiated_max_observation_rate_higher() {
    let discovery_config = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
    let start = Instant::now();
    let mut discovery = AddressDiscoveryState::new(&discovery_config, start);

    // Path 0 has an address to report.
    let mut info = paths::PathAddressInfo::new();
    info.observed_address = Some(SocketAddr::new(
        IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)),
        8080,
    ));
    discovery.path_addresses.insert(0, info);

    // Limit to 5, then bump only the advertised field to 20.
    discovery.update_rate_limit(5.0);
    discovery.max_observation_rate = 20;

    // Five sends go through; `notified` is cleared after each one.
    for _ in 0..5 {
        assert!(discovery.should_send_observation(0, start));
        discovery.record_observation_sent(0);
        discovery.path_addresses.get_mut(&0).unwrap().notified = false;
    }

    // The sixth is refused.
    assert!(!discovery.should_send_observation(0, start));
}
7462
/// Lowering the limiter's rate to 3 part-way through: after five sends, three
/// more are permitted at the same instant and then refused; one second later
/// exactly three more are permitted.
#[test]
fn respecting_negotiated_max_observation_rate_dynamic_update() {
    let discovery_config = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
    let start = Instant::now();
    let mut discovery = AddressDiscoveryState::new(&discovery_config, start);

    // Path 0 has an address to report.
    let mut info = paths::PathAddressInfo::new();
    info.observed_address = Some(SocketAddr::new(
        IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)),
        8080,
    ));
    discovery.path_addresses.insert(0, info);

    // Five sends under the initial rate; `notified` is cleared after each.
    for _ in 0..5 {
        assert!(discovery.should_send_observation(0, start));
        discovery.record_observation_sent(0);
        discovery.path_addresses.get_mut(&0).unwrap().notified = false;
    }

    // Drop the rate to 3.
    discovery.max_observation_rate = 3;
    discovery.rate_limiter.set_rate(3);

    // Three further sends are still permitted at the same instant.
    for _ in 0..3 {
        assert!(discovery.should_send_observation(0, start));
        discovery.record_observation_sent(0);
        discovery.path_addresses.get_mut(&0).unwrap().notified = false;
    }

    // Then the limiter refuses.
    assert!(!discovery.should_send_observation(0, start));

    // One second later three sends are permitted, and no more.
    let one_second_on = start + Duration::from_secs(1);
    for _ in 0..3 {
        assert!(discovery.should_send_observation(0, one_second_on));
        discovery.record_observation_sent(0);
        discovery.path_addresses.get_mut(&0).unwrap().notified = false;
    }

    assert!(!discovery.should_send_observation(0, one_second_on));
}
7516
/// With all-path observation on, three rounds over three paths followed by
/// one more send on path 0 leave every path refused at the same instant.
#[test]
fn respecting_negotiated_max_observation_rate_with_paths() {
    let discovery_config = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
    let start = Instant::now();
    let mut discovery = AddressDiscoveryState::new(&discovery_config, start);

    discovery.observe_all_paths = true;

    // Register paths 0..3 with addresses 192.168.1.100 through .102.
    for path in 0..3 {
        let mut info = paths::PathAddressInfo::new();
        info.observed_address = Some(SocketAddr::new(
            IpAddr::V4(Ipv4Addr::new(192, 168, 1, 100 + path as u8)),
            5000,
        ));
        discovery.path_addresses.insert(path, info);
    }

    // Three rounds: every path that is permitted sends once per round.
    for _round in 0..3 {
        for path in 0..3 {
            if discovery.should_send_observation(path, start) {
                discovery.record_observation_sent(path);
                discovery.path_addresses.get_mut(&path).unwrap().notified = false;
            }
        }
    }

    // One more send is still permitted on path 0.
    assert!(discovery.should_send_observation(0, start));
    discovery.record_observation_sent(0);

    // After that every path is refused.
    assert!(!discovery.should_send_observation(0, start));
    assert!(!discovery.should_send_observation(1, start));
    assert!(!discovery.should_send_observation(2, start));
}
7560
/// Queuing an observation for path 0 returns a frame carrying the address and
/// leaves a path entry flagged as notified.
#[test]
fn queue_observed_address_frame_basic() {
    let discovery_config = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
    let start = Instant::now();
    let mut discovery = AddressDiscoveryState::new(&discovery_config, start);

    let peer_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 100)), 5000);
    let queued = discovery.queue_observed_address_frame(0, peer_addr);

    // A frame is produced and carries the address.
    assert!(queued.is_some());
    let queued = queued.unwrap();
    assert_eq!(queued.address, peer_addr);

    // The path entry exists and is flagged as notified.
    assert!(discovery.path_addresses.contains_key(&0));
    assert!(discovery.path_addresses.get(&0).unwrap().notified);
}
7580
/// With all-path observation on, ten frames on ten distinct paths are queued
/// successfully and the eleventh is refused by the rate limiter.
#[test]
fn queue_observed_address_frame_rate_limited() {
    let config = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
    let now = Instant::now();
    let mut state = AddressDiscoveryState::new(&config, now);

    // Allow observations on every path so each loop iteration uses a new one.
    state.observe_all_paths = true;

    // The counter is a `u8` so it can serve as the last address octet
    // directly; the port and path id are derived with lossless conversions.
    for i in 0..10u8 {
        let addr = SocketAddr::new(
            IpAddr::V4(Ipv4Addr::new(192, 168, 1, i)),
            5000 + u16::from(i),
        );
        assert!(
            state
                .queue_observed_address_frame(u64::from(i), addr)
                .is_some(),
            "Frame {} should be allowed",
            i + 1
        );
    }

    // The eleventh request, on yet another path, is refused.
    let addr11 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 11)), 5011);
    assert!(
        state.queue_observed_address_frame(10, addr11).is_none(),
        "11th frame should be rate limited"
    );
}
7612
/// With `enabled` cleared, `queue_observed_address_frame` yields no frame.
#[test]
fn queue_observed_address_frame_disabled() {
    let discovery_config = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
    let start = Instant::now();
    let mut discovery = AddressDiscoveryState::new(&discovery_config, start);

    // Switch the feature off.
    discovery.enabled = false;

    let peer_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 100)), 5000);
    let queued = discovery.queue_observed_address_frame(0, peer_addr);
    assert!(queued.is_none());
}
7627
/// Once a frame has been queued for a path, later requests on that path
/// yield nothing, whether for the same address or a different one.
#[test]
fn queue_observed_address_frame_already_notified() {
    let discovery_config = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
    let start = Instant::now();
    let mut discovery = AddressDiscoveryState::new(&discovery_config, start);

    let first_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 100)), 5000);

    // The first request produces a frame.
    assert!(discovery.queue_observed_address_frame(0, first_addr).is_some());

    // Repeating it produces nothing.
    assert!(discovery.queue_observed_address_frame(0, first_addr).is_none());

    // Neither does a request with a different address.
    let other_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 101)), 5001);
    assert!(discovery.queue_observed_address_frame(0, other_addr).is_none());
}
7646
/// On a state built from `SendAndReceive` with no further changes, a frame is
/// queued for path 0 but not for paths 1 or 2.
#[test]
fn queue_observed_address_frame_primary_path_only() {
    let discovery_config = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
    let start = Instant::now();
    let mut discovery = AddressDiscoveryState::new(&discovery_config, start);

    let peer_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 100)), 5000);

    // Path 0 gets a frame.
    assert!(discovery.queue_observed_address_frame(0, peer_addr).is_some());

    // Other paths do not.
    assert!(discovery.queue_observed_address_frame(1, peer_addr).is_none());
    assert!(discovery.queue_observed_address_frame(2, peer_addr).is_none());
}
7662
/// Queuing a frame (here for an IPv6 address) stores the address on the path
/// entry and flags it as notified, without adding anything to
/// `observed_addresses`.
#[test]
fn queue_observed_address_frame_updates_path_info() {
    let discovery_config = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
    let start = Instant::now();
    let mut discovery = AddressDiscoveryState::new(&discovery_config, start);

    let v6_ip = IpAddr::V6(Ipv6Addr::new(0x2001, 0xdb8, 0, 0, 0, 0, 0, 1));
    let peer_addr = SocketAddr::new(v6_ip, 5000);

    let queued = discovery.queue_observed_address_frame(0, peer_addr);
    assert!(queued.is_some());

    // The path entry records the address and the notified flag.
    let info = discovery.path_addresses.get(&0).unwrap();
    assert_eq!(info.observed_address, Some(peer_addr));
    assert!(info.notified);

    // The list of received observations is untouched.
    assert_eq!(discovery.observed_addresses.len(), 0);
}
7687
/// `Retransmits` starts with an empty `observed_addresses` list and retains
/// an `ObservedAddress` frame pushed onto it.
#[test]
fn retransmits_includes_observed_addresses() {
    use crate::connection::spaces::Retransmits;

    let mut pending = Retransmits::default();

    // Empty by default.
    assert!(pending.observed_addresses.is_empty());

    // Push a single frame.
    let peer_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 100)), 5000);
    pending.observed_addresses.push(frame::ObservedAddress {
        sequence_number: VarInt::from_u32(1),
        address: peer_addr,
    });

    // The frame is retained with its address.
    assert_eq!(pending.observed_addresses.len(), 1);
    assert_eq!(pending.observed_addresses[0].address, peer_addr);
}
7710
/// When `check_for_address_observations` is called with `false` as its
/// second argument, no frames are returned even though a path has an
/// observed address.
#[test]
fn check_for_address_observations_no_peer_support() {
    let discovery_config = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
    let start = Instant::now();
    let mut discovery = AddressDiscoveryState::new(&discovery_config, start);

    // Path 0 has something to report.
    let mut info = paths::PathAddressInfo::new();
    info.observed_address = Some(SocketAddr::new(
        IpAddr::V4(Ipv4Addr::new(192, 168, 1, 100)),
        5000,
    ));
    discovery.path_addresses.insert(0, info);

    // Second argument `false`, per the test name: the peer lacks support.
    let produced = discovery.check_for_address_observations(0, false, start);

    assert!(produced.is_empty());
}
7732
/// When `check_for_address_observations` is called with `true` as its second
/// argument, one frame carrying the path's observed address is returned and
/// the path is flagged as notified.
#[test]
fn check_for_address_observations_with_peer_support() {
    let discovery_config = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
    let start = Instant::now();
    let mut discovery = AddressDiscoveryState::new(&discovery_config, start);

    // Path 0 has something to report.
    let peer_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 100)), 5000);
    let mut info = paths::PathAddressInfo::new();
    info.observed_address = Some(peer_addr);
    discovery.path_addresses.insert(0, info);

    let produced = discovery.check_for_address_observations(0, true, start);

    // Exactly one frame, with the expected address.
    assert_eq!(produced.len(), 1);
    assert_eq!(produced[0].address, peer_addr);

    // The path is now flagged as notified.
    assert!(discovery.path_addresses.get(&0).unwrap().notified);
}
7756
/// Repeated `check_for_address_observations` calls drain the rate limiter to
/// zero tokens, after which no frame is produced at the same instant; 200 ms
/// later one frame is produced again.
#[test]
fn check_for_address_observations_rate_limited() {
    let discovery_config = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
    let start = Instant::now();
    let mut discovery = AddressDiscoveryState::new(&discovery_config, start);

    // Path 0 has something to report.
    let peer_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 100)), 5000);
    let mut info = paths::PathAddressInfo::new();
    info.observed_address = Some(peer_addr);
    discovery.path_addresses.insert(0, info);

    // Up to ten rounds; stop early as soon as a round yields nothing.
    // `notified` is cleared after each productive round.
    for _ in 0..10 {
        let produced = discovery.check_for_address_observations(0, true, start);
        if produced.is_empty() {
            break;
        }
        discovery.path_addresses.get_mut(&0).unwrap().notified = false;
    }

    // The bucket is empty.
    assert_eq!(discovery.rate_limiter.tokens, 0.0);

    // A pending change yields nothing while the bucket is empty.
    discovery.path_addresses.get_mut(&0).unwrap().notified = false;
    let while_empty = discovery.check_for_address_observations(0, true, start);
    assert_eq!(while_empty.len(), 0);

    // 200 ms later one frame is produced.
    discovery.path_addresses.get_mut(&0).unwrap().notified = false;
    let a_bit_later = start + Duration::from_millis(200);
    let after_wait = discovery.check_for_address_observations(0, true, a_bit_later);
    assert_eq!(after_wait.len(), 1);
}
7798
/// With all-path observation on, one `check_for_address_observations` call
/// returns a frame for each of two paths and flags both as notified.
#[test]
fn check_for_address_observations_multiple_paths() {
    let discovery_config = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
    let start = Instant::now();
    let mut discovery = AddressDiscoveryState::new(&discovery_config, start);

    discovery.observe_all_paths = true;

    let addr_a = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 100)), 5000);
    let addr_b = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 101)), 5001);

    // Path 0 reports `addr_a`.
    let mut info_a = paths::PathAddressInfo::new();
    info_a.observed_address = Some(addr_a);
    discovery.path_addresses.insert(0, info_a);

    // Path 1 reports `addr_b`.
    let mut info_b = paths::PathAddressInfo::new();
    info_b.observed_address = Some(addr_b);
    discovery.path_addresses.insert(1, info_b);

    let produced = discovery.check_for_address_observations(0, true, start);

    // One frame per path; the order is not asserted.
    assert_eq!(produced.len(), 2);
    let seen: Vec<_> = produced.iter().map(|f| f.address).collect();
    assert!(seen.contains(&addr_a));
    assert!(seen.contains(&addr_b));

    // Both paths are flagged as notified.
    assert!(discovery.path_addresses.get(&0).unwrap().notified);
    assert!(discovery.path_addresses.get(&1).unwrap().notified);
}
7837
/// The rate passed to `new_with_params` is expected to show up as both the
/// limiter's refill rate and its bucket capacity, with the bucket starting full.
#[test]
fn test_rate_limiter_configuration() {
    let ten_per_second = AddressDiscoveryState::new_with_params(true, 10.0, false);
    let limiter = &ten_per_second.rate_limiter;
    assert_eq!(limiter.rate, 10.0);
    assert_eq!(limiter.max_tokens, 10.0);
    assert_eq!(limiter.tokens, 10.0);

    // Same checks (minus the initial fill) for a larger rate.
    let sixty_three_per_second = AddressDiscoveryState::new_with_params(true, 63.0, false);
    let limiter = &sixty_three_per_second.rate_limiter;
    assert_eq!(limiter.rate, 63.0);
    assert_eq!(limiter.max_tokens, 63.0);
}
7851
/// `update_rate_limit` is expected to replace both the rate and the bucket
/// capacity, and to clamp a token balance that exceeds the new capacity.
#[test]
fn test_rate_limiter_update_configuration() {
    let mut discovery = AddressDiscoveryState::new_with_params(true, 5.0, false);
    assert_eq!(discovery.rate_limiter.rate, 5.0);

    // Raising the limit updates rate and capacity together.
    discovery.update_rate_limit(10.0);
    {
        let limiter = &discovery.rate_limiter;
        assert_eq!(limiter.rate, 10.0);
        assert_eq!(limiter.max_tokens, 10.0);
    }

    // Force an over-full bucket, then lower the limit below the balance.
    discovery.rate_limiter.tokens = 15.0;
    discovery.update_rate_limit(8.0);
    assert_eq!(discovery.rate_limiter.tokens, 8.0);
}
7869
/// Building the state from transport parameters that advertise
/// `SendAndReceive` is expected to produce a state with a rate of 10 and
/// `observe_all_paths` turned off.
#[test]
fn test_rate_limiter_from_transport_params() {
    // Struct-update syntax avoids mutating a freshly defaulted value.
    let params = TransportParameters {
        address_discovery: Some(AddressDiscoveryConfig::SendAndReceive),
        ..TransportParameters::default()
    };

    // Pass the parameters by reference; the previous text had the `&params`
    // argument corrupted into a pilcrow character and did not compile.
    let state = AddressDiscoveryState::from_transport_params(&params);
    assert!(state.is_some());
    let state = state.unwrap();

    assert_eq!(state.rate_limiter.rate, 10.0);
    assert!(!state.observe_all_paths);
}
7881
/// A state configured with a rate of zero is expected to start with an empty
/// bucket and to refuse to queue an observed-address frame.
#[test]
fn test_rate_limiter_zero_rate() {
    // Inspect the limiter of a zero-rate state.
    {
        let idle = AddressDiscoveryState::new_with_params(true, 0.0, false);
        assert_eq!(idle.rate_limiter.rate, 0.0);
        assert_eq!(idle.rate_limiter.tokens, 0.0);
    }

    // A separate zero-rate state must not hand out any frame.
    let target = "192.168.1.1:443".parse().unwrap();
    let mut discovery = AddressDiscoveryState::new_with_params(true, 0.0, false);
    assert!(discovery.queue_observed_address_frame(0, target).is_none());
}
7894
/// Checks the limiter rate that results from a few notable configured rates.
/// A fractional input of 2.5 is expected to surface as 2.0.
#[test]
fn test_rate_limiter_configuration_edge_cases() {
    // (configured rate, rate expected on the limiter)
    let cases = [(63.0, 63.0), (100.0, 100.0), (2.5, 2.0)];

    for (configured, expected) in cases {
        let discovery = AddressDiscoveryState::new_with_params(true, configured, false);
        assert_eq!(discovery.rate_limiter.rate, expected);
    }
}
7911
/// Lowering the rate limit at runtime is expected to clamp the current token
/// balance to the new capacity, and a later refill must not exceed it.
#[test]
fn test_rate_limiter_runtime_update() {
    let mut discovery = AddressDiscoveryState::new_with_params(true, 10.0, false);
    let start = Instant::now();

    // Start from a half-full bucket and shrink the limit below the balance.
    discovery.rate_limiter.tokens = 5.0;
    discovery.update_rate_limit(3.0);

    {
        let limiter = &discovery.rate_limiter;
        assert_eq!(limiter.tokens, 3.0);
        assert_eq!(limiter.rate, 3.0);
        assert_eq!(limiter.max_tokens, 3.0);
    }

    // Refilling one second later keeps the balance at the new capacity.
    discovery
        .rate_limiter
        .update_tokens(start + Duration::from_secs(1));
    assert_eq!(discovery.rate_limiter.tokens, 3.0);
}
7935
/// A state built from the default `AddressDiscoveryConfig` is expected to be
/// enabled, limited to 10 observations, and restricted to the primary path.
#[test]
fn test_address_discovery_state_initialization_default() {
    let created_at = Instant::now();
    let config = crate::transport_parameters::AddressDiscoveryConfig::default();

    // Held in an `Option`, then unwrapped for inspection.
    let slot = Some(AddressDiscoveryState::new(&config, created_at));
    assert!(slot.is_some());

    let discovery = slot.unwrap();
    assert!(discovery.enabled);
    assert_eq!(discovery.max_observation_rate, 10);
    assert!(!discovery.observe_all_paths);
}
7955
/// Rebuilding the state from the peer's advertised configuration is expected
/// to leave it enabled with the same defaults (rate 10, primary path only).
#[test]
fn test_address_discovery_state_initialization_on_handshake() {
    let handshake_time = Instant::now();

    // State as it exists before the peer's parameters arrive.
    let mut slot = Some(AddressDiscoveryState::new(
        &crate::transport_parameters::AddressDiscoveryConfig::default(),
        handshake_time,
    ));

    // Peer parameters advertising address discovery.
    let peer_params = TransportParameters {
        address_discovery: Some(AddressDiscoveryConfig::SendAndReceive),
        ..TransportParameters::default()
    };

    // Replace the state using the peer's configuration when one is present.
    if let Some(peer_config) = peer_params.address_discovery.as_ref() {
        slot = Some(AddressDiscoveryState::new(peer_config, handshake_time));
    }

    assert!(slot.is_some());
    let discovery = slot.unwrap();
    assert!(discovery.enabled);
    assert_eq!(discovery.max_observation_rate, 10);
    assert!(!discovery.observe_all_paths);
}
7987
/// When the peer's transport parameters carry no address-discovery entry, the
/// test disables the local state and checks that it ends up disabled.
#[test]
fn test_address_discovery_negotiation_disabled_peer() {
    let started = Instant::now();

    let local_config = AddressDiscoveryConfig::SendAndReceive;
    let mut slot = Some(AddressDiscoveryState::new(&local_config, started));

    // Peer parameters without address discovery.
    let peer_params = TransportParameters {
        address_discovery: None,
        ..TransportParameters::default()
    };

    // Mirror the negotiation outcome: no peer support means disabled locally.
    if peer_params.address_discovery.is_none() {
        if let Some(discovery) = slot.as_mut() {
            discovery.enabled = false;
        }
    }

    let discovery = slot.unwrap();
    assert!(!discovery.enabled);
}
8014
/// Simulates rate negotiation: the local limit is set to 30, the peer's limit
/// is taken as 15, and the limiter is expected to end up at the smaller value.
#[test]
fn test_address_discovery_negotiation_rate_limiting() {
    let started = Instant::now();

    let local_config = AddressDiscoveryConfig::SendAndReceive;
    let mut slot = Some(AddressDiscoveryState::new(&local_config, started));

    // Configure the local side for 30 observations.
    if let Some(discovery) = slot.as_mut() {
        discovery.max_observation_rate = 30;
        discovery.update_rate_limit(30.0);
    }

    let peer_params = TransportParameters {
        address_discovery: Some(AddressDiscoveryConfig::SendAndReceive),
        ..TransportParameters::default()
    };

    // Negotiate only when both sides have address discovery configured.
    if let Some(discovery) = slot.as_mut() {
        if peer_params.address_discovery.is_some() {
            let peer_rate = 15u8;
            let negotiated = discovery.max_observation_rate.min(peer_rate);
            discovery.update_rate_limit(f64::from(negotiated));
        }
    }

    let discovery = slot.unwrap();
    assert_eq!(discovery.rate_limiter.rate, 15.0);
}
8052
/// A fresh state is expected to track no paths yet still allow an observation
/// to be sent on path 0.
#[test]
fn test_address_discovery_path_initialization() {
    let created_at = Instant::now();
    let mut discovery =
        AddressDiscoveryState::new(&AddressDiscoveryConfig::SendAndReceive, created_at);

    // Nothing has been recorded for any path yet.
    assert!(discovery.path_addresses.is_empty());

    // Path 0 may be observed right away.
    assert!(discovery.should_send_observation(0, created_at));
}
8070
/// By default only path 0 is expected to be eligible for observations;
/// enabling `observe_all_paths` is expected to make paths 1 and 2 eligible.
#[test]
fn test_address_discovery_multiple_path_initialization() {
    let created_at = Instant::now();
    let config = AddressDiscoveryConfig::SendAndReceive;
    let mut discovery = AddressDiscoveryState::new(&config, created_at);

    // Default behaviour: primary path only.
    assert!(discovery.should_send_observation(0, created_at));
    for secondary in [1, 2] {
        assert!(!discovery.should_send_observation(secondary, created_at));
    }

    // After opting in, the secondary paths qualify too.
    discovery.observe_all_paths = true;
    for secondary in [1, 2] {
        assert!(discovery.should_send_observation(secondary, created_at));
    }

    // A second, untouched state still shows the primary-only default.
    let primary_only_config = AddressDiscoveryConfig::SendAndReceive;
    let mut primary_only = AddressDiscoveryState::new(&primary_only_config, created_at);
    assert!(primary_only.should_send_observation(0, created_at));
    assert!(!primary_only.should_send_observation(1, created_at));
}
8095
/// Handling one observed address is expected to append an entry to the
/// observation history and to populate the per-path record for that path.
#[test]
fn test_handle_observed_address_frame_valid() {
    let received = Instant::now();
    let config = AddressDiscoveryConfig::SendAndReceive;
    let mut discovery = AddressDiscoveryState::new(&config, received);

    let reported = SocketAddr::from(([192, 168, 1, 100], 5000));
    discovery.handle_observed_address(reported, 0, received);

    // History holds exactly the event just handled.
    assert_eq!(discovery.observed_addresses.len(), 1);
    let event = &discovery.observed_addresses[0];
    assert_eq!(event.address, reported);
    assert_eq!(event.path_id, 0);
    assert_eq!(event.received_at, received);

    // The per-path record reflects the same observation.
    let record = discovery.path_addresses.get(&0).unwrap();
    assert_eq!(record.observed_address, Some(reported));
    assert_eq!(record.last_observed, Some(received));
    assert_eq!(record.observation_count, 1);
}
8119
/// Feeds three observations across two paths. All three are expected in the
/// history, each path is expected to remember its latest address, and the
/// per-path observation count is expected to be 1 in both cases.
#[test]
fn test_handle_multiple_observed_addresses() {
    let start = Instant::now();
    let config = AddressDiscoveryConfig::SendAndReceive;
    let mut discovery = AddressDiscoveryState::new(&config, start);

    let path0_first = SocketAddr::from(([192, 168, 1, 100], 5000));
    let path1_only = SocketAddr::from(([10, 0, 0, 50], 6000));
    // Same IP as the first observation, different port.
    let path0_second = SocketAddr::from(([192, 168, 1, 100], 7000));

    // (address, path id, arrival time), applied in order.
    let observations = [
        (path0_first, 0, start),
        (path1_only, 1, start),
        (path0_second, 0, start + Duration::from_millis(100)),
    ];
    for (addr, path_id, at) in observations {
        discovery.handle_observed_address(addr, path_id, at);
    }

    assert_eq!(discovery.observed_addresses.len(), 3);

    // (path id, address expected to be current on that path)
    for (path_id, latest) in [(0, path0_second), (1, path1_only)] {
        let record = discovery.path_addresses.get(&path_id).unwrap();
        assert_eq!(record.observed_address, Some(latest));
        assert_eq!(record.observation_count, 1);
    }
}
8149
/// `get_observed_address` is expected to return `None` until a path has an
/// observation, the observed address afterwards, and `None` for unknown paths.
#[test]
fn test_get_observed_address() {
    let start = Instant::now();
    let mut discovery = AddressDiscoveryState::new(&AddressDiscoveryConfig::SendAndReceive, start);

    // Nothing observed yet.
    assert_eq!(discovery.get_observed_address(0), None);

    let reported = SocketAddr::from(([192, 168, 1, 100], 5000));
    discovery.handle_observed_address(reported, 0, start);

    // The observation is visible on its own path only.
    assert_eq!(discovery.get_observed_address(0), Some(reported));
    assert_eq!(discovery.get_observed_address(999), None);
}
8170
/// Tracks `has_unnotified_changes` through a full cycle: clean state, new
/// observation, manual acknowledgement, then a changed address.
#[test]
fn test_has_unnotified_changes() {
    let start = Instant::now();
    let config = AddressDiscoveryConfig::SendAndReceive;
    let mut discovery = AddressDiscoveryState::new(&config, start);

    // Clean state: nothing to notify.
    assert!(!discovery.has_unnotified_changes());

    // A first observation creates a pending change.
    let initial = SocketAddr::from(([192, 168, 1, 100], 5000));
    discovery.handle_observed_address(initial, 0, start);
    assert!(discovery.has_unnotified_changes());

    // Marking the path as notified clears it.
    if let Some(record) = discovery.path_addresses.get_mut(&0) {
        record.notified = true;
    }
    assert!(!discovery.has_unnotified_changes());

    // A different address on the same path makes it pending again.
    let changed = SocketAddr::from(([192, 168, 1, 100], 6000));
    discovery.handle_observed_address(changed, 0, start + Duration::from_secs(1));
    assert!(discovery.has_unnotified_changes());
}
8197
/// With `enabled` cleared, incoming observations are expected to be ignored
/// and no outgoing observation is expected to be permitted.
#[test]
fn test_address_discovery_disabled() {
    let start = Instant::now();
    let config = AddressDiscoveryConfig::SendAndReceive;
    let mut discovery = AddressDiscoveryState::new(&config, start);
    discovery.enabled = false;

    // An observation arriving while disabled leaves the history empty.
    discovery.handle_observed_address(SocketAddr::from(([192, 168, 1, 100], 5000)), 0, start);
    assert_eq!(discovery.observed_addresses.len(), 0);

    // Sending is refused as well.
    assert!(!discovery.should_send_observation(0, start));
}
8218
/// Exercises the limiter with a rate of 2 across several paths.
///
/// Expected sequence: two sends succeed at `now` and a third is refused; half
/// a second later one more send succeeds and the next is refused; two seconds
/// after `now` two sends succeed and a third is refused.
#[test]
fn test_rate_limiting_basic() {
    let now = Instant::now();
    let config = AddressDiscoveryConfig::SendAndReceive;
    let mut state = AddressDiscoveryState::new(&config, now);

    // Every path must be eligible so that only the limiter decides.
    state.observe_all_paths = true;
    state.rate_limiter.set_rate(2);

    // Initial burst: two tokens available.
    assert!(state.should_send_observation(0, now));
    state.record_observation_sent(0);

    assert!(state.should_send_observation(1, now));
    state.record_observation_sent(1);

    assert!(!state.should_send_observation(2, now));

    // 500 ms later a single send goes through.
    let later = now + Duration::from_millis(500);
    assert!(state.should_send_observation(3, later));
    state.record_observation_sent(3);

    assert!(!state.should_send_observation(4, later));

    // Two seconds after the start, two sends go through again.
    let two_sec_later = now + Duration::from_secs(2);
    assert!(state.should_send_observation(5, two_sec_later));
    state.record_observation_sent(5);

    assert!(state.should_send_observation(6, two_sec_later));
    state.record_observation_sent(6);

    assert!(
        !state.should_send_observation(7, two_sec_later),
        "Expected no tokens available"
    );
}
8280
/// Repeated sends on a single path: ten are expected to succeed at once, the
/// eleventh is refused, and 100 ms later exactly one more is permitted.
#[test]
fn test_rate_limiting_per_path() {
    let start = Instant::now();
    let config = AddressDiscoveryConfig::SendAndReceive;
    let mut discovery = AddressDiscoveryState::new(&config, start);

    // Give path 0 an observed address.
    let mut seeded = paths::PathAddressInfo::new();
    seeded.observed_address = Some(SocketAddr::from(([192, 168, 1, 1], 8080)));
    discovery.path_addresses.insert(0, seeded);

    // Drain the bucket, clearing `notified` after each send.
    for _ in 0..10 {
        assert!(discovery.should_send_observation(0, start));
        discovery.record_observation_sent(0);
        discovery.path_addresses.get_mut(&0).unwrap().notified = false;
    }
    assert!(!discovery.should_send_observation(0, start));

    // After 100 ms one send is allowed again, and only one.
    let after_refill = start + Duration::from_millis(100);
    assert!(discovery.should_send_observation(0, after_refill));
    discovery.record_observation_sent(0);
    discovery.path_addresses.get_mut(&0).unwrap().notified = false;

    assert!(!discovery.should_send_observation(0, after_refill));
}
8319
/// With rate, balance and capacity all forced to zero, no observation is
/// expected to be permitted regardless of how much time passes.
#[test]
fn test_rate_limiting_zero_rate() {
    let start = Instant::now();
    let config = AddressDiscoveryConfig::SendAndReceive;
    let mut discovery = AddressDiscoveryState::new(&config, start);

    // Zero out the limiter completely.
    discovery.rate_limiter.set_rate(0);
    discovery.rate_limiter.tokens = 0.0;
    discovery.rate_limiter.max_tokens = 0.0;

    // Immediately, after 10 s, and after 100 s: always refused.
    for elapsed_secs in [0, 10, 100] {
        let at = start + Duration::from_secs(elapsed_secs);
        assert!(!discovery.should_send_observation(0, at));
    }
}
8337
/// After the initial ten sends are used up, raising the limit to 20 is
/// expected to allow one send 50 ms later and another at the 100 ms mark.
#[test]
fn test_rate_limiting_update() {
    let start = Instant::now();
    let config = AddressDiscoveryConfig::SendAndReceive;
    let mut discovery = AddressDiscoveryState::new(&config, start);
    discovery.observe_all_paths = true;

    // Twelve paths, each with its own observed address.
    for path_id in 0..12 {
        let mut info = paths::PathAddressInfo::new();
        info.observed_address = Some(SocketAddr::from((
            [192, 168, 1, (path_id + 1) as u8],
            8080,
        )));
        discovery.path_addresses.insert(path_id, info);
    }

    // Use up the initial allowance on paths 0 through 9.
    for path_id in 0..10 {
        assert!(discovery.should_send_observation(path_id, start));
        discovery.record_observation_sent(path_id);
    }
    assert!(!discovery.should_send_observation(10, start));

    // Double the limit.
    discovery.update_rate_limit(20.0);

    // 50 ms later path 10 may send.
    let after_50ms = start + Duration::from_millis(50);
    assert!(discovery.should_send_observation(10, after_50ms));
    discovery.record_observation_sent(10);

    // At 100 ms path 11 may send as well.
    let after_100ms = start + Duration::from_millis(100);
    assert!(discovery.should_send_observation(11, after_100ms));
}
8381
/// A burst of ten sends on path 0 is expected to be accepted, the eleventh
/// refused, and 100 ms later exactly one further send accepted.
#[test]
fn test_rate_limiting_burst() {
    let start = Instant::now();
    let mut discovery = AddressDiscoveryState::new(&AddressDiscoveryConfig::SendAndReceive, start);

    // Burst through the whole initial allowance.
    for _ in 0..10 {
        assert!(discovery.should_send_observation(0, start));
        discovery.record_observation_sent(0);
    }
    assert!(!discovery.should_send_observation(0, start));

    // A short wait buys one more send.
    let after_refill = start + Duration::from_millis(100);
    assert!(discovery.should_send_observation(0, after_refill));
    discovery.record_observation_sent(0);
    assert!(!discovery.should_send_observation(0, after_refill));
}
8404
/// Queues frames for a single path until the limiter refuses, then refills
/// the limiter 100 ms later and checks that queuing works again.
#[test]
fn test_connection_rate_limiting_with_check_observations() {
    let start = Instant::now();
    let config = AddressDiscoveryConfig::SendAndReceive;
    let mut discovery = AddressDiscoveryState::new(&config, start);

    let observed = SocketAddr::from(([192, 168, 1, 1], 8080));

    // Clears the `notified` flag on path 0, if that path is tracked.
    let clear_notified = |target: &mut AddressDiscoveryState| {
        if let Some(info) = target.path_addresses.get_mut(&0) {
            info.notified = false;
        }
    };

    // Register the observed address for path 0.
    let mut seeded = paths::PathAddressInfo::new();
    seeded.update_observed_address(observed, start);
    discovery.path_addresses.insert(0, seeded);

    // First frame goes out without touching the flag.
    assert!(discovery.queue_observed_address_frame(0, observed).is_some());
    discovery.record_observation_sent(0);

    // Nine more frames, resetting the flag before each attempt.
    for _ in 1..10 {
        clear_notified(&mut discovery);
        assert!(discovery.queue_observed_address_frame(0, observed).is_some());
        discovery.record_observation_sent(0);
    }

    // The next attempt is refused.
    clear_notified(&mut discovery);
    assert!(discovery.queue_observed_address_frame(0, observed).is_none());

    // Refill for 100 ms and try once more.
    discovery
        .rate_limiter
        .update_tokens(start + Duration::from_millis(100));
    clear_notified(&mut discovery);
    assert!(discovery.queue_observed_address_frame(0, observed).is_some());
}
8464
/// `queue_observed_address_frame` is expected to return a frame carrying the
/// given address for the first ten attempts and `None` for the eleventh.
#[test]
fn test_queue_observed_address_frame() {
    let start = Instant::now();
    let config = AddressDiscoveryConfig::SendAndReceive;
    let mut discovery = AddressDiscoveryState::new(&config, start);

    let target = SocketAddr::from(([192, 168, 1, 100], 5000));

    // First frame: present and carrying the requested address.
    let first = discovery.queue_observed_address_frame(0, target);
    assert!(first.is_some());
    assert_eq!(first.unwrap().address, target);
    discovery.record_observation_sent(0);

    // Frames 2 through 10, resetting `notified` before each attempt.
    for frame_number in 2..=10 {
        if let Some(record) = discovery.path_addresses.get_mut(&0) {
            record.notified = false;
        }

        let queued = discovery.queue_observed_address_frame(0, target);
        assert!(queued.is_some(), "Frame {} should be allowed", frame_number);
        discovery.record_observation_sent(0);
    }

    // The eleventh attempt is expected to be refused.
    if let Some(record) = discovery.path_addresses.get_mut(&0) {
        record.notified = false;
    }
    let eleventh = discovery.queue_observed_address_frame(0, target);
    assert!(eleventh.is_none(), "11th frame should be rate limited");
}
8503
/// Observations on three distinct paths are expected to be stored
/// independently, to leave unnotified changes pending, and to appear in the
/// history.
#[test]
fn test_multi_path_basic() {
    let start = Instant::now();
    let config = AddressDiscoveryConfig::SendAndReceive;
    let mut discovery = AddressDiscoveryState::new(&config, start);

    // (path id, address observed on that path)
    let per_path = [
        (0, SocketAddr::from(([192, 168, 1, 1], 5000))),
        (1, SocketAddr::from(([10, 0, 0, 1], 6000))),
        (2, SocketAddr::from(([172, 16, 0, 1], 7000))),
    ];

    for (path_id, addr) in per_path {
        discovery.handle_observed_address(addr, path_id, start);
    }

    // Each path reports its own address.
    for (path_id, addr) in per_path {
        assert_eq!(discovery.get_observed_address(path_id), Some(addr));
    }

    assert!(discovery.has_unnotified_changes());
    assert_eq!(discovery.observed_addresses.len(), 3);
}
8531
/// Without `observe_all_paths`, only path 0 is expected to be allowed to send
/// observations or queue frames; paths 1 and 2 are refused.
#[test]
fn test_multi_path_observe_primary_only() {
    let start = Instant::now();
    let config = AddressDiscoveryConfig::SendAndReceive;
    let mut discovery = AddressDiscoveryState::new(&config, start);

    // Primary path sends; secondary paths do not.
    assert!(discovery.should_send_observation(0, start));
    discovery.record_observation_sent(0);
    for secondary in [1, 2] {
        assert!(!discovery.should_send_observation(secondary, start));
    }

    // The same split applies to queuing frames.
    let target = SocketAddr::from(([192, 168, 1, 1], 5000));
    assert!(discovery.queue_observed_address_frame(0, target).is_some());
    for secondary in [1, 2] {
        assert!(discovery
            .queue_observed_address_frame(secondary, target)
            .is_none());
    }
}
8553
/// The send allowance is expected to be shared by all paths: ten sends across
/// ten paths exhaust it, and one second later ten further sends are allowed.
#[test]
fn test_multi_path_rate_limiting() {
    let start = Instant::now();
    let config = AddressDiscoveryConfig::SendAndReceive;
    let mut discovery = AddressDiscoveryState::new(&config, start);
    discovery.observe_all_paths = true;

    // Twenty-one paths, each with an observed address.
    for path_id in 0..21 {
        let mut info = paths::PathAddressInfo::new();
        info.observed_address = Some(SocketAddr::from((
            [192, 168, 1, (path_id + 1) as u8],
            8080,
        )));
        discovery.path_addresses.insert(path_id, info);
    }

    // Paths 0..10 consume the initial allowance.
    for path_id in 0..10 {
        assert!(discovery.should_send_observation(path_id, start));
        discovery.record_observation_sent(path_id);
    }
    assert!(!discovery.should_send_observation(10, start));

    // Re-arming an already used path does not help while the bucket is empty.
    discovery.path_addresses.get_mut(&0).unwrap().notified = false;
    assert!(!discovery.should_send_observation(0, start));

    // One second later paths 10..20 can send, and then the bucket is empty again.
    let one_second_on = start + Duration::from_secs(1);
    for path_id in 10..20 {
        assert!(discovery.should_send_observation(path_id, one_second_on));
        discovery.record_observation_sent(path_id);
    }
    assert!(!discovery.should_send_observation(20, one_second_on));
}
8597
/// Address changes on one path are expected to raise the unnotified flag
/// without disturbing the address stored for the other path.
#[test]
fn test_multi_path_address_changes() {
    let start = Instant::now();
    let config = AddressDiscoveryConfig::SendAndReceive;
    let mut discovery = AddressDiscoveryState::new(&config, start);

    let path0_before = SocketAddr::from(([192, 168, 1, 1], 5000));
    let path0_after = SocketAddr::from(([192, 168, 1, 2], 5000));
    let path1_before = SocketAddr::from(([10, 0, 0, 1], 6000));
    let path1_after = SocketAddr::from(([10, 0, 0, 2], 6000));

    // Initial observations on both paths, then acknowledge both.
    discovery.handle_observed_address(path0_before, 0, start);
    discovery.handle_observed_address(path1_before, 1, start);
    for path_id in [0, 1] {
        discovery.record_observation_sent(path_id);
    }
    assert!(!discovery.has_unnotified_changes());

    // Path 0 changes; path 1 keeps its old address.
    discovery.handle_observed_address(path0_after, 0, start + Duration::from_secs(1));
    assert!(discovery.has_unnotified_changes());
    assert_eq!(discovery.get_observed_address(0), Some(path0_after));
    assert_eq!(discovery.get_observed_address(1), Some(path1_before));

    // Acknowledging path 0 clears the pending change.
    discovery.record_observation_sent(0);
    assert!(!discovery.has_unnotified_changes());

    // A later change on path 1 raises the flag again.
    discovery.handle_observed_address(path1_after, 1, start + Duration::from_secs(2));
    assert!(discovery.has_unnotified_changes());
}
8635
/// An observation arriving on a new path is expected to be stored alongside
/// the old path's address instead of replacing it.
#[test]
fn test_multi_path_migration() {
    let start = Instant::now();
    let config = AddressDiscoveryConfig::SendAndReceive;
    let mut discovery = AddressDiscoveryState::new(&config, start);

    let before_migration = SocketAddr::from(([192, 168, 1, 1], 5000));
    let after_migration = SocketAddr::from(([10, 0, 0, 1], 6000));

    // Original path.
    discovery.handle_observed_address(before_migration, 0, start);
    assert_eq!(discovery.get_observed_address(0), Some(before_migration));

    // New path shows up one second later.
    discovery.handle_observed_address(after_migration, 1, start + Duration::from_secs(1));

    // Both paths are tracked, each with its own address.
    assert_eq!(discovery.get_observed_address(0), Some(before_migration));
    assert_eq!(discovery.get_observed_address(1), Some(after_migration));
    assert_eq!(discovery.path_addresses.len(), 2);
}
8661
/// With all paths observed, one `check_for_address_observations` call is
/// expected to emit a frame per path and leave no unnotified changes behind.
#[test]
fn test_check_for_address_observations_multi_path() {
    let start = Instant::now();
    let config = AddressDiscoveryConfig::SendAndReceive;
    let mut discovery = AddressDiscoveryState::new(&config, start);
    discovery.observe_all_paths = true;

    let addr1 = SocketAddr::from(([192, 168, 1, 1], 5000));
    let addr2 = SocketAddr::from(([10, 0, 0, 1], 6000));
    let addr3 = SocketAddr::from(([172, 16, 0, 1], 7000));

    for (path_id, addr) in [(0, addr1), (1, addr2), (2, addr3)] {
        discovery.handle_observed_address(addr, path_id, start);
    }

    let frames = discovery.check_for_address_observations(0, true, start);
    assert_eq!(frames.len(), 3);

    // Order is not checked, only membership.
    let reported = |wanted: SocketAddr| frames.iter().any(|frame| frame.address == wanted);
    assert!(reported(addr1), "addr1 should be in frames");
    assert!(reported(addr2), "addr2 should be in frames");
    assert!(reported(addr3), "addr3 should be in frames");

    assert!(!discovery.has_unnotified_changes());
}
8696
/// When the second argument to `check_for_address_observations` is `false`,
/// no frames are expected and the pending changes are expected to remain.
#[test]
fn test_multi_path_with_peer_not_supporting() {
    let start = Instant::now();
    let config = AddressDiscoveryConfig::SendAndReceive;
    let mut discovery = AddressDiscoveryState::new(&config, start);

    // Two paths with pending observations.
    let pending = [
        (0, SocketAddr::from(([192, 168, 1, 1], 5000))),
        (1, SocketAddr::from(([10, 0, 0, 1], 6000))),
    ];
    for (path_id, addr) in pending {
        discovery.handle_observed_address(addr, path_id, start);
    }

    // Nothing is emitted.
    let frames = discovery.check_for_address_observations(0, false, start);
    assert_eq!(frames.len(), 0);

    // The observations are still waiting to be reported.
    assert!(discovery.has_unnotified_changes());
}
8715
/// Bootstrap mode starts off; once switched on, paths 0 through 2 are expected
/// to be observable and the effective rate limit is expected to exceed 10.
#[test]
fn test_bootstrap_node_aggressive_observation_mode() {
    let config = AddressDiscoveryConfig::SendAndReceive;
    let start = Instant::now();
    let mut discovery = AddressDiscoveryState::new(&config, start);

    // Off by default, on after the setter.
    assert!(!discovery.is_bootstrap_mode());
    discovery.set_bootstrap_mode(true);
    assert!(discovery.is_bootstrap_mode());

    // Every probed path is observable in bootstrap mode.
    for path_id in [0, 1, 2] {
        assert!(discovery.should_observe_path(path_id));
    }

    // The effective limit is above the value of 10 used elsewhere in these tests.
    assert!(discovery.get_effective_rate_limit() > 10.0);
}
8740
/// In bootstrap mode an observation is expected to be eligible for immediate
/// sending, to pass the regular send check, and to produce a queued frame.
#[test]
fn test_bootstrap_node_immediate_observation() {
    let config = AddressDiscoveryConfig::SendAndReceive;
    let start = Instant::now();
    let mut discovery = AddressDiscoveryState::new(&config, start);
    discovery.set_bootstrap_mode(true);

    let peer_addr = SocketAddr::from(([192, 168, 1, 100], 5000));
    discovery.handle_observed_address(peer_addr, 0, start);

    // Immediate-send check, called with `true`.
    assert!(discovery.should_send_observation_immediately(true));

    // Regular send check on path 0.
    assert!(discovery.should_send_observation(0, start));

    // Queuing yields a frame.
    assert!(discovery.queue_observed_address_frame(0, peer_addr).is_some());
}
8763
/// In bootstrap mode, observations recorded on three paths are expected to be
/// emitted together by a single `check_for_address_observations` call.
#[test]
fn test_bootstrap_node_multiple_path_observations() {
    let config = AddressDiscoveryConfig::SendAndReceive;
    let start = Instant::now();
    let mut discovery = AddressDiscoveryState::new(&config, start);
    discovery.set_bootstrap_mode(true);

    // (path id, observed address)
    let per_path = [
        (0, SocketAddr::from(([192, 168, 1, 1], 5000))),
        (1, SocketAddr::from(([10, 0, 0, 1], 6000))),
        (2, SocketAddr::from(([172, 16, 0, 1], 7000))),
    ];

    for &(path_id, addr) in &per_path {
        discovery.handle_observed_address(addr, path_id, start);
    }

    let frames = discovery.check_for_address_observations(0, true, start);
    assert_eq!(frames.len(), 3);

    // Every recorded address must appear in some frame.
    for &(_, addr) in &per_path {
        assert!(frames.iter().any(|frame| frame.address == addr));
    }
}
8792
/// In bootstrap mode, ten consecutive observations on ten different paths are
/// all expected to pass the send check.
#[test]
fn test_bootstrap_node_rate_limit_override() {
    let config = AddressDiscoveryConfig::SendAndReceive;
    let start = Instant::now();
    let mut discovery = AddressDiscoveryState::new(&config, start);
    discovery.set_bootstrap_mode(true);

    let peer_addr = SocketAddr::from(([192, 168, 1, 1], 5000));

    // Observe, check, and record on paths 0 through 9 in turn.
    for i in 0..10 {
        discovery.handle_observed_address(peer_addr, i, start);
        assert!(
            discovery.should_send_observation(i, start),
            "Bootstrap node should send observation {i}"
        );
        discovery.record_observation_sent(i);
    }
}
8812
/// Enabling bootstrap mode is expected to set the `bootstrap_mode` flag, keep
/// the state enabled, and lift the effective rate above `max_observation_rate`.
#[test]
fn test_bootstrap_node_configuration() {
    let config = AddressDiscoveryConfig::SendAndReceive;
    let mut discovery = AddressDiscoveryState::new(&config, Instant::now());
    discovery.set_bootstrap_mode(true);

    // Flags after switching modes.
    assert!(discovery.bootstrap_mode);
    assert!(discovery.enabled);

    // The effective limit exceeds the configured maximum.
    let configured_max = f64::from(discovery.max_observation_rate);
    assert!(discovery.get_effective_rate_limit() > configured_max);
}
8830
/// In bootstrap mode, an address change on an already reported path is
/// expected to be sendable again a minute later.
#[test]
fn test_bootstrap_node_persistent_observation() {
    let config = AddressDiscoveryConfig::SendAndReceive;
    let start = Instant::now();
    let mut discovery = AddressDiscoveryState::new(&config, start);
    discovery.set_bootstrap_mode(true);

    let original = SocketAddr::from(([192, 168, 1, 1], 5000));
    let replacement = SocketAddr::from(([192, 168, 1, 2], 5000));

    // First observation is sent and recorded.
    discovery.handle_observed_address(original, 0, start);
    assert!(discovery.should_send_observation(0, start));
    discovery.record_observation_sent(0);

    // Sixty seconds on, the address on the same path changes.
    let a_minute_later = start + Duration::from_secs(60);
    discovery.handle_observed_address(replacement, 0, a_minute_later);

    assert!(discovery.should_send_observation(0, a_minute_later));
}
8854
/// In bootstrap mode, four observations on four paths are expected to be
/// reported in one batch containing exactly one frame per recorded address.
#[test]
fn test_bootstrap_node_multi_peer_support() {
    let config = AddressDiscoveryConfig::SendAndReceive;
    let start = Instant::now();
    let mut discovery = AddressDiscoveryState::new(&config, start);
    discovery.set_bootstrap_mode(true);

    // (path id, observed address)
    let observed_peers = [
        (0, SocketAddr::from(([192, 168, 1, 1], 5000))),
        (1, SocketAddr::from(([10, 0, 0, 1], 6000))),
        (2, SocketAddr::from(([172, 16, 0, 1], 7000))),
        (3, SocketAddr::from(([192, 168, 2, 1], 8000))),
    ];

    for &(path_id, addr) in &observed_peers {
        discovery.handle_observed_address(addr, path_id, start);
    }

    let frames = discovery.check_for_address_observations(0, true, start);
    assert_eq!(frames.len(), observed_peers.len());

    // Each recorded address shows up in the batch.
    for &(_, addr) in &observed_peers {
        assert!(frames.iter().any(|frame| frame.address == addr));
    }
}
8886
// Nested test module whose body is textually included from
// `address_discovery_tests.rs`; `include!` resolves that path relative to
// this source file.
mod address_discovery_tests {
    include!("address_discovery_tests.rs");
}
8891}