1#![allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
9use std::{
10 cmp,
11 collections::VecDeque,
12 convert::TryFrom,
13 fmt, io, mem,
14 net::{IpAddr, SocketAddr},
15 sync::Arc,
16};
17
18use bytes::{Bytes, BytesMut};
19use frame::StreamMetaVec;
20use rand::{Rng, SeedableRng, rngs::StdRng};
23use thiserror::Error;
24use tracing::{debug, error, info, trace, trace_span, warn};
25
26use crate::{
27 Dir, Duration, EndpointConfig, Frame, INITIAL_MTU, Instant, MAX_CID_SIZE, MAX_STREAM_COUNT,
28 MIN_INITIAL_SIZE, MtuDiscoveryConfig, Side, StreamId, TIMER_GRANULARITY, TokenStore, Transmit,
29 TransportError, TransportErrorCode, VarInt, VarIntBoundsExceeded,
30 cid_generator::ConnectionIdGenerator,
31 cid_queue::CidQueue,
32 coding::BufMutExt,
33 config::{ServerConfig, TransportConfig},
34 crypto::{self, KeyPair, Keys, PacketKey},
35 endpoint::AddressDiscoveryStats,
36 frame::{self, Close, Datagram, FrameStruct, NewToken},
37 nat_traversal_api::PeerId,
38 packet::{
39 FixedLengthConnectionIdParser, Header, InitialHeader, InitialPacket, LongType, Packet,
40 PacketNumber, PartialDecode, SpaceId,
41 },
42 range_set::ArrayRangeSet,
43 shared::{
44 ConnectionEvent, ConnectionEventInner, ConnectionId, DatagramConnectionEvent, EcnCodepoint,
45 EndpointEvent, EndpointEventInner,
46 },
47 token::ResetToken,
48 transport_parameters::TransportParameters,
49};
50
51fn allow_loopback_from_env() -> bool {
52 matches!(
53 std::env::var("ANT_QUIC_ALLOW_LOOPBACK")
54 .unwrap_or_default()
55 .trim()
56 .to_ascii_lowercase()
57 .as_str(),
58 "1" | "true" | "yes"
59 )
60}
61
62mod ack_frequency;
63use ack_frequency::AckFrequencyState;
64
65pub mod port_prediction;
66pub use self::port_prediction::{PortPredictor, PortPredictorConfig};
67
68pub(crate) mod nat_traversal;
69use nat_traversal::NatTraversalState;
70pub(crate) use nat_traversal::{CoordinationPhase, NatTraversalError};
72
73mod assembler;
74pub use assembler::Chunk;
75
76mod cid_state;
77use cid_state::CidState;
78
79mod datagrams;
80use datagrams::DatagramState;
81pub use datagrams::{Datagrams, SendDatagramError};
82
83mod mtud;
84
85mod pacing;
86
87mod packet_builder;
88use packet_builder::PacketBuilder;
89
90mod packet_crypto;
91use packet_crypto::{PrevCrypto, ZeroRttCrypto};
92
93mod paths;
94pub use paths::RttEstimator;
95use paths::{PathData, PathResponses};
96
97mod send_buffer;
98
99mod spaces;
100#[cfg(fuzzing)]
101pub use spaces::Retransmits;
102#[cfg(not(fuzzing))]
103use spaces::Retransmits;
104use spaces::{PacketNumberFilter, PacketSpace, SendableFrames, SentPacket, ThinRetransmits};
105
106mod stats;
107pub use stats::{ConnectionStats, DatagramDropStats, FrameStats, PathStats, UdpStats};
108
109mod streams;
110#[cfg(fuzzing)]
111pub use streams::StreamsState;
112#[cfg(not(fuzzing))]
113use streams::StreamsState;
114pub use streams::{
115 Chunks, ClosedStream, FinishError, ReadError, ReadableError, RecvStream, SendStream,
116 ShouldTransmit, StreamEvent, Streams, WriteError, Written,
117};
118
119mod timer;
120use crate::congestion::Controller;
121use timer::{Timer, TimerTable};
122
123pub struct Connection {
163 endpoint_config: Arc<EndpointConfig>,
164 config: Arc<TransportConfig>,
165 rng: StdRng,
166 crypto: Box<dyn crypto::Session>,
167 handshake_cid: ConnectionId,
169 rem_handshake_cid: ConnectionId,
171 local_ip: Option<IpAddr>,
174 path: PathData,
175 allow_mtud: bool,
177 prev_path: Option<(ConnectionId, PathData)>,
178 state: State,
179 side: ConnectionSide,
180 zero_rtt_enabled: bool,
182 zero_rtt_crypto: Option<ZeroRttCrypto>,
184 key_phase: bool,
185 key_phase_size: u64,
187 peer_params: TransportParameters,
189 orig_rem_cid: ConnectionId,
191 initial_dst_cid: ConnectionId,
193 retry_src_cid: Option<ConnectionId>,
196 lost_packets: u64,
198 events: VecDeque<Event>,
199 endpoint_events: VecDeque<EndpointEventInner>,
200 spin_enabled: bool,
202 spin: bool,
204 spaces: [PacketSpace; 3],
206 highest_space: SpaceId,
208 prev_crypto: Option<PrevCrypto>,
210 next_crypto: Option<KeyPair<Box<dyn PacketKey>>>,
215 accepted_0rtt: bool,
216 permit_idle_reset: bool,
218 idle_timeout: Option<Duration>,
220 timers: TimerTable,
221 authentication_failures: u64,
223 error: Option<ConnectionError>,
225 packet_number_filter: PacketNumberFilter,
227
228 path_responses: PathResponses,
233 close: bool,
234
235 ack_frequency: AckFrequencyState,
239
240 pto_count: u32,
245
246 receiving_ecn: bool,
251 total_authed_packets: u64,
253 app_limited: bool,
256
257 streams: StreamsState,
258 rem_cids: CidQueue,
260 local_cid_state: CidState,
262 datagrams: DatagramState,
264 stats: ConnectionStats,
266 version: u32,
268
269 nat_traversal: Option<NatTraversalState>,
271
272 nat_traversal_frame_config: frame::nat_traversal_unified::NatTraversalFrameConfig,
274
275 address_discovery_state: Option<AddressDiscoveryState>,
277
278 pqc_state: PqcState,
280
281 #[cfg(feature = "trace")]
283 trace_context: crate::tracing::TraceContext,
284
285 #[cfg(feature = "trace")]
287 event_log: Arc<crate::tracing::EventLog>,
288
289 #[cfg(feature = "__qlog")]
291 qlog_streamer: Option<Box<dyn std::io::Write + Send + Sync>>,
292
293 peer_id_for_tokens: Option<PeerId>,
295 delay_new_token_until_binding: bool,
297}
298
299impl Connection {
300 pub(crate) fn new(
301 endpoint_config: Arc<EndpointConfig>,
302 config: Arc<TransportConfig>,
303 init_cid: ConnectionId,
304 loc_cid: ConnectionId,
305 rem_cid: ConnectionId,
306 remote: SocketAddr,
307 local_ip: Option<IpAddr>,
308 crypto: Box<dyn crypto::Session>,
309 cid_gen: &dyn ConnectionIdGenerator,
310 now: Instant,
311 version: u32,
312 allow_mtud: bool,
313 rng_seed: [u8; 32],
314 side_args: SideArgs,
315 ) -> Self {
316 let pref_addr_cid = side_args.pref_addr_cid();
317 let path_validated = side_args.path_validated();
318 let connection_side = ConnectionSide::from(side_args);
319 let side = connection_side.side();
320 let initial_space = PacketSpace {
321 crypto: Some(crypto.initial_keys(&init_cid, side)),
322 ..PacketSpace::new(now)
323 };
324 let state = State::Handshake(state::Handshake {
325 rem_cid_set: side.is_server(),
326 expected_token: Bytes::new(),
327 client_hello: None,
328 });
329 let mut rng = StdRng::from_seed(rng_seed);
330 let mut this = Self {
331 endpoint_config,
332 crypto,
333 handshake_cid: loc_cid,
334 rem_handshake_cid: rem_cid,
335 local_cid_state: CidState::new(
336 cid_gen.cid_len(),
337 cid_gen.cid_lifetime(),
338 now,
339 if pref_addr_cid.is_some() { 2 } else { 1 },
340 ),
341 path: PathData::new(remote, allow_mtud, None, now, &config),
342 allow_mtud,
343 local_ip,
344 prev_path: None,
345 state,
346 side: connection_side,
347 zero_rtt_enabled: false,
348 zero_rtt_crypto: None,
349 key_phase: false,
350 key_phase_size: rng.gen_range(10..1000),
357 peer_params: TransportParameters::default(),
358 orig_rem_cid: rem_cid,
359 initial_dst_cid: init_cid,
360 retry_src_cid: None,
361 lost_packets: 0,
362 events: VecDeque::new(),
363 endpoint_events: VecDeque::new(),
364 spin_enabled: config.allow_spin && rng.gen_ratio(7, 8),
365 spin: false,
366 spaces: [initial_space, PacketSpace::new(now), PacketSpace::new(now)],
367 highest_space: SpaceId::Initial,
368 prev_crypto: None,
369 next_crypto: None,
370 accepted_0rtt: false,
371 permit_idle_reset: true,
372 idle_timeout: match config.max_idle_timeout {
373 None | Some(VarInt(0)) => None,
374 Some(dur) => Some(Duration::from_millis(dur.0)),
375 },
376 timers: TimerTable::default(),
377 authentication_failures: 0,
378 error: None,
379 #[cfg(test)]
380 packet_number_filter: match config.deterministic_packet_numbers {
381 false => PacketNumberFilter::new(&mut rng),
382 true => PacketNumberFilter::disabled(),
383 },
384 #[cfg(not(test))]
385 packet_number_filter: PacketNumberFilter::new(&mut rng),
386
387 path_responses: PathResponses::default(),
388 close: false,
389
390 ack_frequency: AckFrequencyState::new(get_max_ack_delay(
391 &TransportParameters::default(),
392 )),
393
394 pto_count: 0,
395
396 app_limited: false,
397 receiving_ecn: false,
398 total_authed_packets: 0,
399
400 streams: StreamsState::new(
401 side,
402 config.max_concurrent_uni_streams,
403 config.max_concurrent_bidi_streams,
404 config.send_window,
405 config.receive_window,
406 config.stream_receive_window,
407 ),
408 datagrams: DatagramState::default(),
409 config,
410 rem_cids: CidQueue::new(rem_cid),
411 rng,
412 stats: ConnectionStats::default(),
413 version,
414 nat_traversal: None, nat_traversal_frame_config:
416 frame::nat_traversal_unified::NatTraversalFrameConfig::default(),
417 address_discovery_state: {
418 Some(AddressDiscoveryState::new(
421 &crate::transport_parameters::AddressDiscoveryConfig::default(),
422 now,
423 ))
424 },
425 pqc_state: PqcState::new(),
426
427 #[cfg(feature = "trace")]
428 trace_context: crate::tracing::TraceContext::new(crate::tracing::TraceId::new()),
429
430 #[cfg(feature = "trace")]
431 event_log: crate::tracing::global_log(),
432
433 #[cfg(feature = "__qlog")]
434 qlog_streamer: None,
435
436 peer_id_for_tokens: None,
437 delay_new_token_until_binding: false,
438 };
439
440 #[cfg(feature = "trace")]
442 {
443 use crate::trace_event;
444 use crate::tracing::{Event, EventData, socket_addr_to_bytes, timestamp_now};
445 let _peer_id = {
447 let mut id = [0u8; 32];
448 let addr_bytes = match remote {
449 SocketAddr::V4(addr) => addr.ip().octets().to_vec(),
450 SocketAddr::V6(addr) => addr.ip().octets().to_vec(),
451 };
452 id[..addr_bytes.len().min(32)]
453 .copy_from_slice(&addr_bytes[..addr_bytes.len().min(32)]);
454 id
455 };
456
457 let (addr_bytes, addr_type) = socket_addr_to_bytes(remote);
458 trace_event!(
459 &this.event_log,
460 Event {
461 timestamp: timestamp_now(),
462 trace_id: this.trace_context.trace_id(),
463 sequence: 0,
464 _padding: 0,
465 node_id: [0u8; 32], event_data: EventData::ConnInit {
467 endpoint_bytes: addr_bytes,
468 addr_type,
469 _padding: [0u8; 45],
470 },
471 }
472 );
473 }
474
475 if path_validated {
476 this.on_path_validated();
477 }
478 if side.is_client() {
479 this.write_crypto();
481 this.init_0rtt();
482 }
483 this
484 }
485
486 #[cfg(feature = "__qlog")]
488 pub fn set_qlog(
489 &mut self,
490 writer: Box<dyn std::io::Write + Send + Sync>,
491 _title: Option<String>,
492 _description: Option<String>,
493 _now: Instant,
494 ) {
495 self.qlog_streamer = Some(writer);
496 }
497
498 #[cfg(feature = "__qlog")]
500 fn emit_qlog_recovery_metrics(&mut self, _now: Instant) {
501 }
504
505 #[must_use]
513 pub fn poll_timeout(&mut self) -> Option<Instant> {
514 let mut next_timeout = self.timers.next_timeout();
515
516 if let Some(nat_state) = &self.nat_traversal {
518 if let Some(nat_timeout) = nat_state.get_next_timeout(Instant::now()) {
519 self.timers.set(Timer::NatTraversal, nat_timeout);
521 next_timeout = Some(next_timeout.map_or(nat_timeout, |t| t.min(nat_timeout)));
522 }
523 }
524
525 next_timeout
526 }
527
528 #[must_use]
534 pub fn poll(&mut self) -> Option<Event> {
535 if let Some(x) = self.events.pop_front() {
536 return Some(x);
537 }
538
539 if let Some(event) = self.streams.poll() {
540 return Some(Event::Stream(event));
541 }
542
543 if let Some(err) = self.error.take() {
544 return Some(Event::ConnectionLost { reason: err });
545 }
546
547 None
548 }
549
550 #[must_use]
552 pub fn poll_endpoint_events(&mut self) -> Option<EndpointEvent> {
553 self.endpoint_events.pop_front().map(EndpointEvent)
554 }
555
556 #[must_use]
558 pub fn streams(&mut self) -> Streams<'_> {
559 Streams {
560 state: &mut self.streams,
561 conn_state: &self.state,
562 }
563 }
564
565 #[must_use]
569 pub fn recv_stream(&mut self, id: StreamId) -> RecvStream<'_> {
570 assert!(id.dir() == Dir::Bi || id.initiator() != self.side.side());
571 RecvStream {
572 id,
573 state: &mut self.streams,
574 pending: &mut self.spaces[SpaceId::Data].pending,
575 }
576 }
577
578 #[must_use]
580 pub fn send_stream(&mut self, id: StreamId) -> SendStream<'_> {
581 assert!(id.dir() == Dir::Bi || id.initiator() == self.side.side());
582 SendStream {
583 id,
584 state: &mut self.streams,
585 pending: &mut self.spaces[SpaceId::Data].pending,
586 conn_state: &self.state,
587 }
588 }
589
590 #[must_use]
600 pub fn poll_transmit(
601 &mut self,
602 now: Instant,
603 max_datagrams: usize,
604 buf: &mut Vec<u8>,
605 ) -> Option<Transmit> {
606 assert!(max_datagrams != 0);
607 let max_datagrams = match self.config.enable_segmentation_offload {
608 false => 1,
609 true => max_datagrams,
610 };
611
612 let mut num_datagrams = 0;
613 let mut datagram_start = 0;
616 let mut segment_size = usize::from(self.path.current_mtu());
617
618 if let Some(nat_traversal) = &mut self.nat_traversal {
620 if nat_traversal.check_coordination_timeout(now) {
621 trace!("NAT traversal coordination timed out, may retry");
622 }
623 let expired = nat_traversal.check_validation_timeouts(now);
625 if !expired.is_empty() {
626 debug!(
627 "Cleaned up {} expired NAT traversal validations",
628 expired.len()
629 );
630 }
631 }
632
633 self.check_for_address_observations(now);
635
636 if let Some(challenge) = self.send_nat_traversal_challenge(now, buf) {
638 return Some(challenge);
639 }
640
641 if let Some(challenge) = self.send_path_challenge(now, buf) {
642 return Some(challenge);
643 }
644
645 for space in SpaceId::iter() {
647 let request_immediate_ack =
648 space == SpaceId::Data && self.peer_supports_ack_frequency();
649 self.spaces[space].maybe_queue_probe(request_immediate_ack, &self.streams);
650 }
651
652 let close = match self.state {
654 State::Drained => {
655 self.app_limited = true;
656 return None;
657 }
658 State::Draining | State::Closed(_) => {
659 if !self.close {
662 self.app_limited = true;
663 return None;
664 }
665 true
666 }
667 _ => false,
668 };
669
670 if let Some(config) = &self.config.ack_frequency_config {
672 self.spaces[SpaceId::Data].pending.ack_frequency = self
673 .ack_frequency
674 .should_send_ack_frequency(self.path.rtt.get(), config, &self.peer_params)
675 && self.highest_space == SpaceId::Data
676 && self.peer_supports_ack_frequency();
677 }
678
679 let mut buf_capacity = 0;
683
684 let mut coalesce = true;
685 let mut builder_storage: Option<PacketBuilder> = None;
686 let mut sent_frames = None;
687 let mut pad_datagram = false;
688 let mut pad_datagram_to_mtu = false;
689 let mut congestion_blocked = false;
690
691 let mut space_idx = 0;
693 let spaces = [SpaceId::Initial, SpaceId::Handshake, SpaceId::Data];
694 while space_idx < spaces.len() {
697 let space_id = spaces[space_idx];
698 let pn = self.packet_number_filter.peek(&self.spaces[SpaceId::Data]);
705 let frame_space_1rtt =
706 segment_size.saturating_sub(self.predict_1rtt_overhead(Some(pn)));
707
708 let can_send = self.space_can_send(space_id, frame_space_1rtt);
710 if can_send.is_empty() && (!close || self.spaces[space_id].crypto.is_none()) {
711 space_idx += 1;
712 continue;
713 }
714
715 let mut ack_eliciting = !self.spaces[space_id].pending.is_empty(&self.streams)
716 || self.spaces[space_id].ping_pending
717 || self.spaces[space_id].immediate_ack_pending;
718 if space_id == SpaceId::Data {
719 ack_eliciting |= self.can_send_1rtt(frame_space_1rtt);
720 }
721
722 pad_datagram_to_mtu |= space_id == SpaceId::Data && self.config.pad_to_mtu;
723
724 let buf_end = if let Some(builder) = &builder_storage {
728 buf.len().max(builder.min_size) + builder.tag_len
729 } else {
730 buf.len()
731 };
732
733 let tag_len = if let Some(ref crypto) = self.spaces[space_id].crypto {
734 crypto.packet.local.tag_len()
735 } else if space_id == SpaceId::Data {
736 match self.zero_rtt_crypto.as_ref() {
737 Some(crypto) => crypto.packet.tag_len(),
738 None => {
739 error!(
741 "sending packets in the application data space requires known 0-RTT or 1-RTT keys"
742 );
743 return None;
744 }
745 }
746 } else {
747 unreachable!("tried to send {:?} packet without keys", space_id)
748 };
749 if !coalesce || buf_capacity - buf_end < MIN_PACKET_SPACE + tag_len {
750 if num_datagrams >= max_datagrams {
754 break;
756 }
757
758 if self
765 .path
766 .anti_amplification_blocked(segment_size as u64 * (num_datagrams as u64) + 1)
767 {
768 trace!("blocked by anti-amplification");
769 break;
770 }
771
772 if ack_eliciting && self.spaces[space_id].loss_probes == 0 {
775 let untracked_bytes = if let Some(builder) = &builder_storage {
777 buf_capacity - builder.partial_encode.start
778 } else {
779 0
780 } as u64;
781 debug_assert!(untracked_bytes <= segment_size as u64);
782
783 let bytes_to_send = segment_size as u64 + untracked_bytes;
784 if self.path.in_flight.bytes + bytes_to_send >= self.path.congestion.window() {
785 space_idx += 1;
786 congestion_blocked = true;
787 trace!("blocked by congestion control");
790 continue;
791 }
792
793 let smoothed_rtt = self.path.rtt.get();
795 if let Some(delay) = self.path.pacing.delay(
796 smoothed_rtt,
797 bytes_to_send,
798 self.path.current_mtu(),
799 self.path.congestion.window(),
800 now,
801 ) {
802 self.timers.set(Timer::Pacing, delay);
803 congestion_blocked = true;
804 trace!("blocked by pacing");
807 break;
808 }
809 }
810
811 if let Some(mut builder) = builder_storage.take() {
813 if pad_datagram {
814 let min_size = self.pqc_state.min_initial_size();
815 builder.pad_to(min_size);
816 }
817
818 if num_datagrams > 1 || pad_datagram_to_mtu {
819 const MAX_PADDING: usize = 16;
832 let packet_len_unpadded = cmp::max(builder.min_size, buf.len())
833 - datagram_start
834 + builder.tag_len;
835 if (packet_len_unpadded + MAX_PADDING < segment_size
836 && !pad_datagram_to_mtu)
837 || datagram_start + segment_size > buf_capacity
838 {
839 trace!(
840 "GSO truncated by demand for {} padding bytes or loss probe",
841 segment_size - packet_len_unpadded
842 );
843 builder_storage = Some(builder);
844 break;
845 }
846
847 builder.pad_to(segment_size as u16);
850 }
851
852 builder.finish_and_track(now, self, sent_frames.take(), buf);
853
854 if num_datagrams == 1 {
855 segment_size = buf.len();
862 buf_capacity = buf.len();
865
866 if space_id == SpaceId::Data {
873 let frame_space_1rtt =
874 segment_size.saturating_sub(self.predict_1rtt_overhead(Some(pn)));
875 if self.space_can_send(space_id, frame_space_1rtt).is_empty() {
876 break;
877 }
878 }
879 }
880 }
881
882 let next_datagram_size_limit = match self.spaces[space_id].loss_probes {
884 0 => segment_size,
885 _ => {
886 self.spaces[space_id].loss_probes -= 1;
887 std::cmp::min(segment_size, usize::from(INITIAL_MTU))
891 }
892 };
893 buf_capacity += next_datagram_size_limit;
894 if buf.capacity() < buf_capacity {
895 buf.reserve(max_datagrams * segment_size);
904 }
905 num_datagrams += 1;
906 coalesce = true;
907 pad_datagram = false;
908 datagram_start = buf.len();
909
910 debug_assert_eq!(
911 datagram_start % segment_size,
912 0,
913 "datagrams in a GSO batch must be aligned to the segment size"
914 );
915 } else {
916 if let Some(builder) = builder_storage.take() {
920 builder.finish_and_track(now, self, sent_frames.take(), buf);
921 }
922 }
923
924 debug_assert!(buf_capacity - buf.len() >= MIN_PACKET_SPACE);
925
926 if self.spaces[SpaceId::Initial].crypto.is_some()
931 && space_id == SpaceId::Handshake
932 && self.side.is_client()
933 {
934 self.discard_space(now, SpaceId::Initial);
937 }
938 if let Some(ref mut prev) = self.prev_crypto {
939 prev.update_unacked = false;
940 }
941
942 debug_assert!(
943 builder_storage.is_none() && sent_frames.is_none(),
944 "Previous packet must have been finished"
945 );
946
947 let builder = builder_storage.insert(PacketBuilder::new(
948 now,
949 space_id,
950 self.rem_cids.active(),
951 buf,
952 buf_capacity,
953 datagram_start,
954 ack_eliciting,
955 self,
956 )?);
957 coalesce = coalesce && !builder.short_header;
958
959 let should_adjust_coalescing = self
961 .pqc_state
962 .should_adjust_coalescing(buf.len() - datagram_start, space_id);
963
964 if should_adjust_coalescing {
965 coalesce = false;
966 trace!("Disabling coalescing for PQC handshake in {:?}", space_id);
967 }
968
969 pad_datagram |=
971 space_id == SpaceId::Initial && (self.side.is_client() || ack_eliciting);
972
973 if close {
974 trace!("sending CONNECTION_CLOSE");
975 if !self.spaces[space_id].pending_acks.ranges().is_empty() {
980 if Self::populate_acks(
981 now,
982 self.receiving_ecn,
983 &mut SentFrames::default(),
984 &mut self.spaces[space_id],
985 buf,
986 &mut self.stats,
987 )
988 .is_err()
989 {
990 self.handle_encode_error(now, "ACK (close)");
991 return None;
992 }
993 }
994
995 debug_assert!(
999 buf.len() + frame::ConnectionClose::SIZE_BOUND < builder.max_size,
1000 "ACKs should leave space for ConnectionClose"
1001 );
1002 if buf.len() + frame::ConnectionClose::SIZE_BOUND < builder.max_size {
1003 let max_frame_size = builder.max_size - buf.len();
1004 match self.state {
1005 State::Closed(state::Closed { ref reason }) => {
1006 let result = if space_id == SpaceId::Data || reason.is_transport_layer()
1007 {
1008 reason.try_encode(buf, max_frame_size)
1009 } else {
1010 frame::ConnectionClose {
1011 error_code: TransportErrorCode::APPLICATION_ERROR,
1012 frame_type: None,
1013 reason: Bytes::new(),
1014 }
1015 .try_encode(buf, max_frame_size)
1016 };
1017 if result.is_err() {
1018 self.handle_encode_error(now, "ConnectionClose");
1019 return None;
1020 }
1021 }
1022 State::Draining => {
1023 if (frame::ConnectionClose {
1024 error_code: TransportErrorCode::NO_ERROR,
1025 frame_type: None,
1026 reason: Bytes::new(),
1027 })
1028 .try_encode(buf, max_frame_size)
1029 .is_err()
1030 {
1031 self.handle_encode_error(now, "ConnectionClose (draining)");
1032 return None;
1033 }
1034 }
1035 _ => unreachable!(
1036 "tried to make a close packet when the connection wasn't closed"
1037 ),
1038 }
1039 }
1040 if space_id == self.highest_space {
1041 self.close = false;
1043 break;
1045 } else {
1046 space_idx += 1;
1050 continue;
1051 }
1052 }
1053
1054 if space_id == SpaceId::Data && num_datagrams == 1 {
1057 if let Some((token, remote)) = self.path_responses.pop_off_path(self.path.remote) {
1058 let mut builder = builder_storage.take().unwrap();
1061 trace!("PATH_RESPONSE {:08x} (off-path)", token);
1062 if !self.encode_or_close(
1063 now,
1064 frame::FrameType::PATH_RESPONSE.try_encode(buf),
1065 "PATH_RESPONSE (off-path)",
1066 ) {
1067 return None;
1068 }
1069 buf.write(token);
1070 self.stats.frame_tx.path_response += 1;
1071 let min_size = self.pqc_state.min_initial_size();
1072 builder.pad_to(min_size);
1073 builder.finish_and_track(
1074 now,
1075 self,
1076 Some(SentFrames {
1077 non_retransmits: true,
1078 ..SentFrames::default()
1079 }),
1080 buf,
1081 );
1082 self.stats.udp_tx.on_sent(1, buf.len());
1083
1084 #[cfg(feature = "trace")]
1086 {
1087 use crate::trace_packet_sent;
1088 trace_packet_sent!(
1090 &self.event_log,
1091 self.trace_context.trace_id(),
1092 buf.len() as u32,
1093 0 );
1095 }
1096
1097 return Some(Transmit {
1098 destination: remote,
1099 size: buf.len(),
1100 ecn: None,
1101 segment_size: None,
1102 src_ip: self.local_ip,
1103 });
1104 }
1105 }
1106
1107 if space_id == SpaceId::Data && self.address_discovery_state.is_some() {
1109 let peer_supports = self.peer_params.address_discovery.is_some();
1110
1111 if let Some(state) = &mut self.address_discovery_state {
1112 if peer_supports {
1113 if let Some(frame) = state.queue_observed_address_frame(0, self.path.remote)
1114 {
1115 self.spaces[space_id]
1116 .pending
1117 .outbound_observations
1118 .push(frame);
1119 }
1120 }
1121 }
1122 }
1123
1124 let sent =
1125 self.populate_packet(now, space_id, buf, builder.max_size, builder.exact_number);
1126
1127 debug_assert!(
1134 !(sent.is_ack_only(&self.streams)
1135 && !can_send.acks
1136 && can_send.other
1137 && (buf_capacity - builder.datagram_start) == self.path.current_mtu() as usize
1138 && self.datagrams.outgoing.is_empty()),
1139 "SendableFrames was {can_send:?}, but only ACKs have been written"
1140 );
1141 pad_datagram |= sent.requires_padding;
1142
1143 if sent.largest_acked.is_some() {
1144 self.spaces[space_id].pending_acks.acks_sent();
1145 self.timers.stop(Timer::MaxAckDelay);
1146 }
1147
1148 sent_frames = Some(sent);
1150
1151 }
1154
1155 if let Some(mut builder) = builder_storage {
1157 if pad_datagram {
1158 let min_size = self.pqc_state.min_initial_size();
1159 builder.pad_to(min_size);
1160 }
1161
1162 if pad_datagram_to_mtu && buf_capacity >= datagram_start + segment_size {
1168 builder.pad_to(segment_size as u16);
1169 }
1170
1171 let last_packet_number = builder.exact_number;
1172 builder.finish_and_track(now, self, sent_frames, buf);
1173 self.path
1174 .congestion
1175 .on_sent(now, buf.len() as u64, last_packet_number);
1176
1177 #[cfg(feature = "__qlog")]
1178 self.emit_qlog_recovery_metrics(now);
1179 }
1180
1181 self.app_limited = buf.is_empty() && !congestion_blocked;
1182
1183 if buf.is_empty() && self.state.is_established() {
1185 let space_id = SpaceId::Data;
1186 let probe_size = self
1187 .path
1188 .mtud
1189 .poll_transmit(now, self.packet_number_filter.peek(&self.spaces[space_id]))?;
1190
1191 let buf_capacity = probe_size as usize;
1192 buf.reserve(buf_capacity);
1193
1194 let mut builder = PacketBuilder::new(
1195 now,
1196 space_id,
1197 self.rem_cids.active(),
1198 buf,
1199 buf_capacity,
1200 0,
1201 true,
1202 self,
1203 )?;
1204
1205 if !self.encode_or_close(now, frame::FrameType::PING.try_encode(buf), "PING (MTU)") {
1207 return None;
1208 }
1209 self.stats.frame_tx.ping += 1;
1210
1211 if self.peer_supports_ack_frequency() {
1213 if !self.encode_or_close(
1214 now,
1215 frame::FrameType::IMMEDIATE_ACK.try_encode(buf),
1216 "IMMEDIATE_ACK (MTU)",
1217 ) {
1218 return None;
1219 }
1220 self.stats.frame_tx.immediate_ack += 1;
1221 }
1222
1223 builder.pad_to(probe_size);
1224 let sent_frames = SentFrames {
1225 non_retransmits: true,
1226 ..Default::default()
1227 };
1228 builder.finish_and_track(now, self, Some(sent_frames), buf);
1229
1230 self.stats.path.sent_plpmtud_probes += 1;
1231 num_datagrams = 1;
1232
1233 trace!(?probe_size, "writing MTUD probe");
1234 }
1235
1236 if buf.is_empty() {
1237 return None;
1238 }
1239
1240 trace!("sending {} bytes in {} datagrams", buf.len(), num_datagrams);
1241 self.path.total_sent = self.path.total_sent.saturating_add(buf.len() as u64);
1242
1243 self.stats.udp_tx.on_sent(num_datagrams as u64, buf.len());
1244
1245 #[cfg(feature = "trace")]
1247 {
1248 use crate::trace_packet_sent;
1249 let packet_num = self.spaces[SpaceId::Data]
1252 .next_packet_number
1253 .saturating_sub(1);
1254 trace_packet_sent!(
1255 &self.event_log,
1256 self.trace_context.trace_id(),
1257 buf.len() as u32,
1258 packet_num
1259 );
1260 }
1261
1262 Some(Transmit {
1263 destination: self.path.remote,
1264 size: buf.len(),
1265 ecn: if self.path.sending_ecn {
1266 Some(EcnCodepoint::Ect0)
1267 } else {
1268 None
1269 },
1270 segment_size: match num_datagrams {
1271 1 => None,
1272 _ => Some(segment_size),
1273 },
1274 src_ip: self.local_ip,
1275 })
1276 }
1277
1278 fn send_coordination_request(&mut self, _now: Instant, _buf: &mut Vec<u8>) -> Option<Transmit> {
1280 let nat = self.nat_traversal.as_mut()?;
1282 if !nat.should_send_punch_request() {
1283 return None;
1284 }
1285
1286 let coord = nat.coordination.as_ref()?;
1287 let round = coord.round;
1288 if coord.punch_targets.is_empty() {
1289 return None;
1290 }
1291
1292 trace!(
1293 "queuing PUNCH_ME_NOW round {} with {} targets",
1294 round,
1295 coord.punch_targets.len()
1296 );
1297
1298 for target in &coord.punch_targets {
1300 let punch = frame::PunchMeNow {
1301 round,
1302 paired_with_sequence_number: target.remote_sequence,
1303 address: target.remote_addr,
1304 target_peer_id: None,
1305 };
1306 self.spaces[SpaceId::Data].pending.punch_me_now.push(punch);
1307 }
1308
1309 nat.mark_punch_request_sent();
1311
1312 None
1314 }
1315
1316 fn send_coordinated_path_challenge(
1318 &mut self,
1319 now: Instant,
1320 buf: &mut Vec<u8>,
1321 ) -> Option<Transmit> {
1322 if let Some(nat_traversal) = &mut self.nat_traversal {
1324 if nat_traversal.should_start_punching(now) {
1325 nat_traversal.start_punching_phase(now);
1326 }
1327 }
1328
1329 let (target_addr, challenge) = {
1331 let nat_traversal = self.nat_traversal.as_ref()?;
1332 match nat_traversal.get_coordination_phase() {
1333 Some(CoordinationPhase::Punching) => {
1334 let targets = nat_traversal.get_punch_targets_from_coordination()?;
1335 if targets.is_empty() {
1336 return None;
1337 }
1338 let target = &targets[0];
1340 (target.remote_addr, target.challenge)
1341 }
1342 _ => return None,
1343 }
1344 };
1345
1346 debug_assert_eq!(
1347 self.highest_space,
1348 SpaceId::Data,
1349 "PATH_CHALLENGE queued without 1-RTT keys"
1350 );
1351
1352 buf.reserve(self.pqc_state.min_initial_size() as usize);
1353 let buf_capacity = buf.capacity();
1354
1355 let mut builder = PacketBuilder::new(
1356 now,
1357 SpaceId::Data,
1358 self.rem_cids.active(),
1359 buf,
1360 buf_capacity,
1361 0,
1362 false,
1363 self,
1364 )?;
1365
1366 trace!(
1367 "sending coordinated PATH_CHALLENGE {:08x} to {}",
1368 challenge, target_addr
1369 );
1370 if !self.encode_or_close(
1371 now,
1372 frame::FrameType::PATH_CHALLENGE.try_encode(buf),
1373 "PATH_CHALLENGE (coordination)",
1374 ) {
1375 return None;
1376 }
1377 buf.write(challenge);
1378 self.stats.frame_tx.path_challenge += 1;
1379
1380 let min_size = self.pqc_state.min_initial_size();
1381 builder.pad_to(min_size);
1382 builder.finish_and_track(now, self, None, buf);
1383
1384 if let Some(nat_traversal) = &mut self.nat_traversal {
1386 nat_traversal.mark_coordination_validating();
1387 }
1388
1389 Some(Transmit {
1390 destination: target_addr,
1391 size: buf.len(),
1392 ecn: if self.path.sending_ecn {
1393 Some(EcnCodepoint::Ect0)
1394 } else {
1395 None
1396 },
1397 segment_size: None,
1398 src_ip: self.local_ip,
1399 })
1400 }
1401
1402 fn send_nat_traversal_challenge(
1404 &mut self,
1405 now: Instant,
1406 buf: &mut Vec<u8>,
1407 ) -> Option<Transmit> {
1408 if let Some(request) = self.send_coordination_request(now, buf) {
1410 return Some(request);
1411 }
1412
1413 if let Some(punch) = self.send_coordinated_path_challenge(now, buf) {
1415 return Some(punch);
1416 }
1417
1418 let (remote_addr, remote_sequence) = {
1420 let nat_traversal = self.nat_traversal.as_ref()?;
1421 let candidates = nat_traversal.get_validation_candidates();
1422 if candidates.is_empty() {
1423 return None;
1424 }
1425 let (sequence, candidate) = candidates[0];
1427 (candidate.address, sequence)
1428 };
1429
1430 let challenge = self.rng.r#gen::<u64>();
1431
1432 if let Err(e) =
1434 self.nat_traversal
1435 .as_mut()?
1436 .start_validation(remote_sequence, challenge, now)
1437 {
1438 warn!("Failed to start NAT traversal validation: {}", e);
1439 return None;
1440 }
1441
1442 debug_assert_eq!(
1443 self.highest_space,
1444 SpaceId::Data,
1445 "PATH_CHALLENGE queued without 1-RTT keys"
1446 );
1447
1448 buf.reserve(self.pqc_state.min_initial_size() as usize);
1449 let buf_capacity = buf.capacity();
1450
1451 let mut builder = PacketBuilder::new(
1453 now,
1454 SpaceId::Data,
1455 self.rem_cids.active(),
1456 buf,
1457 buf_capacity,
1458 0,
1459 false,
1460 self,
1461 )?;
1462
1463 trace!(
1464 "sending PATH_CHALLENGE {:08x} to NAT candidate {}",
1465 challenge, remote_addr
1466 );
1467 if !self.encode_or_close(
1468 now,
1469 frame::FrameType::PATH_CHALLENGE.try_encode(buf),
1470 "PATH_CHALLENGE (nat)",
1471 ) {
1472 return None;
1473 }
1474 buf.write(challenge);
1475 self.stats.frame_tx.path_challenge += 1;
1476
1477 let min_size = self.pqc_state.min_initial_size();
1479 builder.pad_to(min_size);
1480
1481 builder.finish_and_track(now, self, None, buf);
1482
1483 Some(Transmit {
1484 destination: remote_addr,
1485 size: buf.len(),
1486 ecn: if self.path.sending_ecn {
1487 Some(EcnCodepoint::Ect0)
1488 } else {
1489 None
1490 },
1491 segment_size: None,
1492 src_ip: self.local_ip,
1493 })
1494 }
1495
1496 fn send_path_challenge(&mut self, now: Instant, buf: &mut Vec<u8>) -> Option<Transmit> {
1498 let (prev_cid, prev_path) = self.prev_path.as_mut()?;
1499 if !prev_path.challenge_pending {
1500 return None;
1501 }
1502 prev_path.challenge_pending = false;
1503 let token = prev_path
1504 .challenge
1505 .expect("previous path challenge pending without token");
1506 let destination = prev_path.remote;
1507 debug_assert_eq!(
1508 self.highest_space,
1509 SpaceId::Data,
1510 "PATH_CHALLENGE queued without 1-RTT keys"
1511 );
1512 buf.reserve(self.pqc_state.min_initial_size() as usize);
1513
1514 let buf_capacity = buf.capacity();
1515
1516 let mut builder = PacketBuilder::new(
1522 now,
1523 SpaceId::Data,
1524 *prev_cid,
1525 buf,
1526 buf_capacity,
1527 0,
1528 false,
1529 self,
1530 )?;
1531 trace!("validating previous path with PATH_CHALLENGE {:08x}", token);
1532 if !self.encode_or_close(
1533 now,
1534 frame::FrameType::PATH_CHALLENGE.try_encode(buf),
1535 "PATH_CHALLENGE (prev path)",
1536 ) {
1537 return None;
1538 }
1539 buf.write(token);
1540 self.stats.frame_tx.path_challenge += 1;
1541
1542 let min_size = self.pqc_state.min_initial_size();
1547 builder.pad_to(min_size);
1548
1549 builder.finish(self, buf);
1550 self.stats.udp_tx.on_sent(1, buf.len());
1551
1552 Some(Transmit {
1553 destination,
1554 size: buf.len(),
1555 ecn: None,
1556 segment_size: None,
1557 src_ip: self.local_ip,
1558 })
1559 }
1560
1561 fn space_can_send(&self, space_id: SpaceId, frame_space_1rtt: usize) -> SendableFrames {
1563 if self.spaces[space_id].crypto.is_none()
1564 && (space_id != SpaceId::Data
1565 || self.zero_rtt_crypto.is_none()
1566 || self.side.is_server())
1567 {
1568 return SendableFrames::empty();
1570 }
1571 let mut can_send = self.spaces[space_id].can_send(&self.streams);
1572 if space_id == SpaceId::Data {
1573 can_send.other |= self.can_send_1rtt(frame_space_1rtt);
1574 }
1575 can_send
1576 }
1577
1578 pub fn handle_event(&mut self, event: ConnectionEvent) {
1584 use ConnectionEventInner::*;
1585 match event.0 {
1586 Datagram(DatagramConnectionEvent {
1587 now,
1588 remote,
1589 ecn,
1590 first_decode,
1591 remaining,
1592 }) => {
1593 if remote != self.path.remote && !self.side.remote_may_migrate() {
1597 trace!("discarding packet from unrecognized peer {}", remote);
1598 return;
1599 }
1600
1601 let was_anti_amplification_blocked = self.path.anti_amplification_blocked(1);
1602
1603 self.stats.udp_rx.datagrams += 1;
1604 self.stats.udp_rx.bytes += first_decode.len() as u64;
1605 let data_len = first_decode.len();
1606
1607 self.handle_decode(now, remote, ecn, first_decode);
1608 self.path.total_recvd = self.path.total_recvd.saturating_add(data_len as u64);
1613
1614 if let Some(data) = remaining {
1615 self.stats.udp_rx.bytes += data.len() as u64;
1616 self.handle_coalesced(now, remote, ecn, data);
1617 }
1618
1619 #[cfg(feature = "__qlog")]
1620 self.emit_qlog_recovery_metrics(now);
1621
1622 if was_anti_amplification_blocked {
1623 self.set_loss_detection_timer(now);
1627 }
1628 }
1629 NewIdentifiers(ids, now) => {
1630 self.local_cid_state.new_cids(&ids, now);
1631 ids.into_iter().rev().for_each(|frame| {
1632 self.spaces[SpaceId::Data].pending.new_cids.push(frame);
1633 });
1634 if self.timers.get(Timer::PushNewCid).is_none_or(|x| x <= now) {
1636 self.reset_cid_retirement();
1637 }
1638 }
1639 QueueAddAddress(add) => {
1640 self.spaces[SpaceId::Data].pending.add_addresses.push(add);
1642 }
1643 QueuePunchMeNow(punch) => {
1644 self.spaces[SpaceId::Data].pending.punch_me_now.push(punch);
1646 }
1647 }
1648 }
1649
1650 pub fn handle_timeout(&mut self, now: Instant) {
1660 for &timer in &Timer::VALUES {
1661 if !self.timers.is_expired(timer, now) {
1662 continue;
1663 }
1664 self.timers.stop(timer);
1665 trace!(timer = ?timer, "timeout");
1666 match timer {
1667 Timer::Close => {
1668 self.state = State::Drained;
1669 self.endpoint_events.push_back(EndpointEventInner::Drained);
1670 }
1671 Timer::Idle => {
1672 self.kill(ConnectionError::TimedOut);
1673 }
1674 Timer::KeepAlive => {
1675 trace!("sending keep-alive");
1676 self.ping();
1677 }
1678 Timer::LossDetection => {
1679 self.on_loss_detection_timeout(now);
1680
1681 #[cfg(feature = "__qlog")]
1682 self.emit_qlog_recovery_metrics(now);
1683 }
1684 Timer::KeyDiscard => {
1685 self.zero_rtt_crypto = None;
1686 self.prev_crypto = None;
1687 }
1688 Timer::PathValidation => {
1689 debug!("path validation failed");
1690 if let Some((_, prev)) = self.prev_path.take() {
1691 self.path = prev;
1692 }
1693 self.path.challenge = None;
1694 self.path.challenge_pending = false;
1695 }
1696 Timer::Pacing => trace!("pacing timer expired"),
1697 Timer::NatTraversal => {
1698 self.handle_nat_traversal_timeout(now);
1699 }
1700 Timer::PushNewCid => {
1701 let num_new_cid = self.local_cid_state.on_cid_timeout().into();
1703 if !self.state.is_closed() {
1704 trace!(
1705 "push a new cid to peer RETIRE_PRIOR_TO field {}",
1706 self.local_cid_state.retire_prior_to()
1707 );
1708 self.endpoint_events
1709 .push_back(EndpointEventInner::NeedIdentifiers(now, num_new_cid));
1710 }
1711 }
1712 Timer::MaxAckDelay => {
1713 trace!("max ack delay reached");
1714 self.spaces[SpaceId::Data]
1716 .pending_acks
1717 .on_max_ack_delay_timeout()
1718 }
1719 }
1720 }
1721 }
1722
1723 pub fn close(&mut self, now: Instant, error_code: VarInt, reason: Bytes) {
1735 self.close_inner(
1736 now,
1737 Close::Application(frame::ApplicationClose { error_code, reason }),
1738 )
1739 }
1740
1741 fn close_inner(&mut self, now: Instant, reason: Close) {
1742 let was_closed = self.state.is_closed();
1743 if !was_closed {
1744 self.close_common();
1745 self.set_close_timer(now);
1746 self.close = true;
1747 self.state = State::Closed(state::Closed { reason });
1748 }
1749 }
1750
1751 pub fn datagrams(&mut self) -> Datagrams<'_> {
1753 Datagrams { conn: self }
1754 }
1755
1756 pub fn stats(&self) -> ConnectionStats {
1758 let mut stats = self.stats;
1759 stats.path.rtt = self.path.rtt.get();
1760 stats.path.cwnd = self.path.congestion.window();
1761 stats.path.current_mtu = self.path.mtud.current_mtu();
1762
1763 stats
1764 }
1765
1766 pub fn set_token_binding_peer_id(&mut self, pid: PeerId) {
1768 self.peer_id_for_tokens = Some(pid);
1769 }
1770
1771 pub fn set_delay_new_token_until_binding(&mut self, v: bool) {
1773 self.delay_new_token_until_binding = v;
1774 }
1775
1776 pub fn ping(&mut self) {
1780 self.spaces[self.highest_space].ping_pending = true;
1781 }
1782
1783 pub(crate) fn is_pqc(&self) -> bool {
1785 self.pqc_state.using_pqc
1786 }
1787
1788 pub fn force_key_update(&mut self) {
1792 if !self.state.is_established() {
1793 debug!("ignoring forced key update in illegal state");
1794 return;
1795 }
1796 if self.prev_crypto.is_some() {
1797 debug!("ignoring redundant forced key update");
1800 return;
1801 }
1802 self.update_keys(None, false);
1803 }
1804
1805 pub fn crypto_session(&self) -> &dyn crypto::Session {
1807 &*self.crypto
1808 }
1809
1810 pub fn is_handshaking(&self) -> bool {
1815 self.state.is_handshake()
1816 }
1817
1818 pub fn is_closed(&self) -> bool {
1826 self.state.is_closed()
1827 }
1828
1829 pub fn is_drained(&self) -> bool {
1834 self.state.is_drained()
1835 }
1836
1837 pub fn accepted_0rtt(&self) -> bool {
1841 self.accepted_0rtt
1842 }
1843
1844 pub fn has_0rtt(&self) -> bool {
1846 self.zero_rtt_enabled
1847 }
1848
1849 pub fn has_pending_retransmits(&self) -> bool {
1851 !self.spaces[SpaceId::Data].pending.is_empty(&self.streams)
1852 }
1853
1854 pub fn side(&self) -> Side {
1856 self.side.side()
1857 }
1858
1859 pub fn remote_address(&self) -> SocketAddr {
1861 self.path.remote
1862 }
1863
1864 pub fn local_ip(&self) -> Option<IpAddr> {
1874 self.local_ip
1875 }
1876
1877 pub fn rtt(&self) -> Duration {
1879 self.path.rtt.get()
1880 }
1881
1882 pub fn congestion_state(&self) -> &dyn Controller {
1884 self.path.congestion.as_ref()
1885 }
1886
1887 pub fn path_changed(&mut self, now: Instant) {
1898 self.path.reset(now, &self.config);
1899 }
1900
1901 pub fn set_max_concurrent_streams(&mut self, dir: Dir, count: VarInt) {
1906 self.streams.set_max_concurrent(dir, count);
1907 let pending = &mut self.spaces[SpaceId::Data].pending;
1910 self.streams.queue_max_stream_id(pending);
1911 }
1912
1913 pub fn max_concurrent_streams(&self, dir: Dir) -> u64 {
1919 self.streams.max_concurrent(dir)
1920 }
1921
1922 pub fn set_receive_window(&mut self, receive_window: VarInt) {
1924 if self.streams.set_receive_window(receive_window) {
1925 self.spaces[SpaceId::Data].pending.max_data = true;
1926 }
1927 }
1928
1929 pub fn set_address_discovery_enabled(&mut self, enabled: bool) {
1931 if let Some(ref mut state) = self.address_discovery_state {
1932 state.enabled = enabled;
1933 }
1934 }
1935
1936 pub fn address_discovery_enabled(&self) -> bool {
1938 self.address_discovery_state
1939 .as_ref()
1940 .is_some_and(|state| state.enabled)
1941 }
1942
1943 pub fn observed_address(&self) -> Option<SocketAddr> {
1948 self.address_discovery_state
1949 .as_ref()
1950 .and_then(|state| state.get_observed_address(0)) }
1952
1953 pub fn all_observed_addresses(&self) -> Vec<SocketAddr> {
1958 self.address_discovery_state
1959 .as_ref()
1960 .map(|state| state.get_all_received_history())
1961 .unwrap_or_default()
1962 }
1963
1964 #[allow(dead_code)]
1966 pub(crate) fn address_discovery_state(&self) -> Option<&AddressDiscoveryState> {
1967 self.address_discovery_state.as_ref()
1968 }
1969
1970 fn on_ack_received(
1971 &mut self,
1972 now: Instant,
1973 space: SpaceId,
1974 ack: frame::Ack,
1975 ) -> Result<(), TransportError> {
1976 if ack.largest >= self.spaces[space].next_packet_number {
1977 return Err(TransportError::PROTOCOL_VIOLATION("unsent packet acked"));
1978 }
1979 let new_largest = {
1980 let space = &mut self.spaces[space];
1981 if space.largest_acked_packet.is_none_or(|pn| ack.largest > pn) {
1982 space.largest_acked_packet = Some(ack.largest);
1983 if let Some(info) = space.sent_packets.get(&ack.largest) {
1984 space.largest_acked_packet_sent = info.time_sent;
1988 }
1989 true
1990 } else {
1991 false
1992 }
1993 };
1994
1995 let mut newly_acked = ArrayRangeSet::new();
1997 for range in ack.iter() {
1998 self.packet_number_filter.check_ack(space, range.clone())?;
1999 for (&pn, _) in self.spaces[space].sent_packets.range(range) {
2000 newly_acked.insert_one(pn);
2001 }
2002 }
2003
2004 if newly_acked.is_empty() {
2005 return Ok(());
2006 }
2007
2008 let mut ack_eliciting_acked = false;
2009 for packet in newly_acked.elts() {
2010 if let Some(info) = self.spaces[space].take(packet) {
2011 if let Some(acked) = info.largest_acked {
2012 self.spaces[space].pending_acks.subtract_below(acked);
2018 }
2019 ack_eliciting_acked |= info.ack_eliciting;
2020
2021 let mtu_updated = self.path.mtud.on_acked(space, packet, info.size);
2023 if mtu_updated {
2024 self.path
2025 .congestion
2026 .on_mtu_update(self.path.mtud.current_mtu());
2027 }
2028
2029 self.ack_frequency.on_acked(packet);
2031
2032 self.on_packet_acked(now, packet, info);
2033 }
2034 }
2035
2036 self.path.congestion.on_end_acks(
2037 now,
2038 self.path.in_flight.bytes,
2039 self.app_limited,
2040 self.spaces[space].largest_acked_packet,
2041 );
2042
2043 if new_largest && ack_eliciting_acked {
2044 let ack_delay = if space != SpaceId::Data {
2045 Duration::from_micros(0)
2046 } else {
2047 cmp::min(
2048 self.ack_frequency.peer_max_ack_delay,
2049 Duration::from_micros(ack.delay << self.peer_params.ack_delay_exponent.0),
2050 )
2051 };
2052 let rtt = instant_saturating_sub(now, self.spaces[space].largest_acked_packet_sent);
2053 self.path.rtt.update(ack_delay, rtt);
2054 if self.path.first_packet_after_rtt_sample.is_none() {
2055 self.path.first_packet_after_rtt_sample =
2056 Some((space, self.spaces[space].next_packet_number));
2057 }
2058 }
2059
2060 self.detect_lost_packets(now, space, true);
2062
2063 if self.peer_completed_address_validation() {
2064 self.pto_count = 0;
2065 }
2066
2067 if self.path.sending_ecn {
2069 if let Some(ecn) = ack.ecn {
2070 if new_largest {
2075 let sent = self.spaces[space].largest_acked_packet_sent;
2076 self.process_ecn(now, space, newly_acked.len() as u64, ecn, sent);
2077 }
2078 } else {
2079 debug!("ECN not acknowledged by peer");
2081 self.path.sending_ecn = false;
2082 }
2083 }
2084
2085 self.set_loss_detection_timer(now);
2086 Ok(())
2087 }
2088
2089 fn process_ecn(
2091 &mut self,
2092 now: Instant,
2093 space: SpaceId,
2094 newly_acked: u64,
2095 ecn: frame::EcnCounts,
2096 largest_sent_time: Instant,
2097 ) {
2098 match self.spaces[space].detect_ecn(newly_acked, ecn) {
2099 Err(e) => {
2100 debug!("halting ECN due to verification failure: {}", e);
2101 self.path.sending_ecn = false;
2102 self.spaces[space].ecn_feedback = frame::EcnCounts::ZERO;
2105 }
2106 Ok(false) => {}
2107 Ok(true) => {
2108 self.stats.path.congestion_events += 1;
2109 self.path
2110 .congestion
2111 .on_congestion_event(now, largest_sent_time, false, 0);
2112 }
2113 }
2114 }
2115
2116 fn on_packet_acked(&mut self, now: Instant, pn: u64, info: SentPacket) {
2119 self.remove_in_flight(pn, &info);
2120 if info.ack_eliciting && self.path.challenge.is_none() {
2121 self.path.congestion.on_ack(
2124 now,
2125 info.time_sent,
2126 info.size.into(),
2127 self.app_limited,
2128 &self.path.rtt,
2129 );
2130 }
2131
2132 if let Some(retransmits) = info.retransmits.get() {
2134 for (id, _) in retransmits.reset_stream.iter() {
2135 self.streams.reset_acked(*id);
2136 }
2137 }
2138
2139 for frame in info.stream_frames {
2140 self.streams.received_ack_of(frame);
2141 }
2142 }
2143
2144 fn set_key_discard_timer(&mut self, now: Instant, space: SpaceId) {
2145 let start = if self.zero_rtt_crypto.is_some() {
2146 now
2147 } else {
2148 self.prev_crypto
2149 .as_ref()
2150 .expect("no previous keys")
2151 .end_packet
2152 .as_ref()
2153 .expect("update not acknowledged yet")
2154 .1
2155 };
2156 self.timers
2157 .set(Timer::KeyDiscard, start + self.pto(space) * 3);
2158 }
2159
2160 fn on_loss_detection_timeout(&mut self, now: Instant) {
2161 if let Some((_, pn_space)) = self.loss_time_and_space() {
2162 self.detect_lost_packets(now, pn_space, false);
2164 self.set_loss_detection_timer(now);
2165 return;
2166 }
2167
2168 let (_, space) = match self.pto_time_and_space(now) {
2169 Some(x) => x,
2170 None => {
2171 error!("PTO expired while unset");
2172 return;
2173 }
2174 };
2175 trace!(
2176 in_flight = self.path.in_flight.bytes,
2177 count = self.pto_count,
2178 ?space,
2179 "PTO fired"
2180 );
2181
2182 let count = match self.path.in_flight.ack_eliciting {
2183 0 => {
2186 debug_assert!(!self.peer_completed_address_validation());
2187 1
2188 }
2189 _ => 2,
2191 };
2192 self.spaces[space].loss_probes = self.spaces[space].loss_probes.saturating_add(count);
2193 self.pto_count = self.pto_count.saturating_add(1);
2194 self.set_loss_detection_timer(now);
2195 }
2196
2197 fn detect_lost_packets(&mut self, now: Instant, pn_space: SpaceId, due_to_ack: bool) {
2198 let mut lost_packets = Vec::<u64>::new();
2199 let mut lost_mtu_probe = None;
2200 let in_flight_mtu_probe = self.path.mtud.in_flight_mtu_probe();
2201 let rtt = self.path.rtt.conservative();
2202 let loss_delay = cmp::max(rtt.mul_f32(self.config.time_threshold), TIMER_GRANULARITY);
2203
2204 let lost_send_time = now.checked_sub(loss_delay).unwrap();
2206 let largest_acked_packet = self.spaces[pn_space].largest_acked_packet.unwrap();
2207 let packet_threshold = self.config.packet_threshold as u64;
2208 let mut size_of_lost_packets = 0u64;
2209
2210 let congestion_period =
2214 self.pto(SpaceId::Data) * self.config.persistent_congestion_threshold;
2215 let mut persistent_congestion_start: Option<Instant> = None;
2216 let mut prev_packet = None;
2217 let mut in_persistent_congestion = false;
2218
2219 let space = &mut self.spaces[pn_space];
2220 space.loss_time = None;
2221
2222 for (&packet, info) in space.sent_packets.range(0..largest_acked_packet) {
2223 if prev_packet != Some(packet.wrapping_sub(1)) {
2224 persistent_congestion_start = None;
2226 }
2227
2228 if info.time_sent <= lost_send_time || largest_acked_packet >= packet + packet_threshold
2229 {
2230 if Some(packet) == in_flight_mtu_probe {
2231 lost_mtu_probe = in_flight_mtu_probe;
2234 } else {
2235 lost_packets.push(packet);
2236 size_of_lost_packets += info.size as u64;
2237 if info.ack_eliciting && due_to_ack {
2238 match persistent_congestion_start {
2239 Some(start) if info.time_sent - start > congestion_period => {
2242 in_persistent_congestion = true;
2243 }
2244 None if self
2246 .path
2247 .first_packet_after_rtt_sample
2248 .is_some_and(|x| x < (pn_space, packet)) =>
2249 {
2250 persistent_congestion_start = Some(info.time_sent);
2251 }
2252 _ => {}
2253 }
2254 }
2255 }
2256 } else {
2257 let next_loss_time = info.time_sent + loss_delay;
2258 space.loss_time = Some(
2259 space
2260 .loss_time
2261 .map_or(next_loss_time, |x| cmp::min(x, next_loss_time)),
2262 );
2263 persistent_congestion_start = None;
2264 }
2265
2266 prev_packet = Some(packet);
2267 }
2268
2269 if let Some(largest_lost) = lost_packets.last().cloned() {
2271 let old_bytes_in_flight = self.path.in_flight.bytes;
2272 let largest_lost_sent = self.spaces[pn_space].sent_packets[&largest_lost].time_sent;
2273 self.lost_packets += lost_packets.len() as u64;
2274 self.stats.path.lost_packets += lost_packets.len() as u64;
2275 self.stats.path.lost_bytes += size_of_lost_packets;
2276 trace!(
2277 "packets lost: {:?}, bytes lost: {}",
2278 lost_packets, size_of_lost_packets
2279 );
2280
2281 for &packet in &lost_packets {
2282 let info = self.spaces[pn_space].take(packet).unwrap(); self.remove_in_flight(packet, &info);
2284 for frame in info.stream_frames {
2285 self.streams.retransmit(frame);
2286 }
2287 self.spaces[pn_space].pending |= info.retransmits;
2288 self.path.mtud.on_non_probe_lost(packet, info.size);
2289 }
2290
2291 if self.path.mtud.black_hole_detected(now) {
2292 self.stats.path.black_holes_detected += 1;
2293 self.path
2294 .congestion
2295 .on_mtu_update(self.path.mtud.current_mtu());
2296 if let Some(max_datagram_size) = self.datagrams().max_size() {
2297 self.datagrams.drop_oversized(max_datagram_size);
2298 }
2299 }
2300
2301 let lost_ack_eliciting = old_bytes_in_flight != self.path.in_flight.bytes;
2303
2304 if lost_ack_eliciting {
2305 self.stats.path.congestion_events += 1;
2306 self.path.congestion.on_congestion_event(
2307 now,
2308 largest_lost_sent,
2309 in_persistent_congestion,
2310 size_of_lost_packets,
2311 );
2312 }
2313 }
2314
2315 if let Some(packet) = lost_mtu_probe {
2317 let info = self.spaces[SpaceId::Data].take(packet).unwrap(); self.remove_in_flight(packet, &info);
2319 self.path.mtud.on_probe_lost();
2320 self.stats.path.lost_plpmtud_probes += 1;
2321 }
2322 }
2323
2324 fn loss_time_and_space(&self) -> Option<(Instant, SpaceId)> {
2325 SpaceId::iter()
2326 .filter_map(|id| Some((self.spaces[id].loss_time?, id)))
2327 .min_by_key(|&(time, _)| time)
2328 }
2329
2330 fn pto_time_and_space(&self, now: Instant) -> Option<(Instant, SpaceId)> {
2331 let backoff = 2u32.pow(self.pto_count.min(MAX_BACKOFF_EXPONENT));
2332 let mut duration = self.path.rtt.pto_base() * backoff;
2333
2334 if self.path.in_flight.ack_eliciting == 0 {
2335 debug_assert!(!self.peer_completed_address_validation());
2336 let space = match self.highest_space {
2337 SpaceId::Handshake => SpaceId::Handshake,
2338 _ => SpaceId::Initial,
2339 };
2340 return Some((now + duration, space));
2341 }
2342
2343 let mut result = None;
2344 for space in SpaceId::iter() {
2345 if self.spaces[space].in_flight == 0 {
2346 continue;
2347 }
2348 if space == SpaceId::Data {
2349 if self.is_handshaking() {
2351 return result;
2352 }
2353 duration += self.ack_frequency.max_ack_delay_for_pto() * backoff;
2355 }
2356 let last_ack_eliciting = match self.spaces[space].time_of_last_ack_eliciting_packet {
2357 Some(time) => time,
2358 None => continue,
2359 };
2360 let pto = last_ack_eliciting + duration;
2361 if result.is_none_or(|(earliest_pto, _)| pto < earliest_pto) {
2362 result = Some((pto, space));
2363 }
2364 }
2365 result
2366 }
2367
2368 fn peer_completed_address_validation(&self) -> bool {
2369 if self.side.is_server() || self.state.is_closed() {
2370 return true;
2371 }
2372 self.spaces[SpaceId::Handshake]
2375 .largest_acked_packet
2376 .is_some()
2377 || self.spaces[SpaceId::Data].largest_acked_packet.is_some()
2378 || (self.spaces[SpaceId::Data].crypto.is_some()
2379 && self.spaces[SpaceId::Handshake].crypto.is_none())
2380 }
2381
2382 fn set_loss_detection_timer(&mut self, now: Instant) {
2383 if self.state.is_closed() {
2384 return;
2388 }
2389
2390 if let Some((loss_time, _)) = self.loss_time_and_space() {
2391 self.timers.set(Timer::LossDetection, loss_time);
2393 return;
2394 }
2395
2396 if self.path.anti_amplification_blocked(1) {
2397 self.timers.stop(Timer::LossDetection);
2399 return;
2400 }
2401
2402 if self.path.in_flight.ack_eliciting == 0 && self.peer_completed_address_validation() {
2403 self.timers.stop(Timer::LossDetection);
2406 return;
2407 }
2408
2409 if let Some((timeout, _)) = self.pto_time_and_space(now) {
2412 self.timers.set(Timer::LossDetection, timeout);
2413 } else {
2414 self.timers.stop(Timer::LossDetection);
2415 }
2416 }
2417
2418 fn pto(&self, space: SpaceId) -> Duration {
2420 let max_ack_delay = match space {
2421 SpaceId::Initial | SpaceId::Handshake => Duration::ZERO,
2422 SpaceId::Data => self.ack_frequency.max_ack_delay_for_pto(),
2423 };
2424 self.path.rtt.pto_base() + max_ack_delay
2425 }
2426
2427 fn on_packet_authenticated(
2428 &mut self,
2429 now: Instant,
2430 space_id: SpaceId,
2431 ecn: Option<EcnCodepoint>,
2432 packet: Option<u64>,
2433 spin: bool,
2434 is_1rtt: bool,
2435 ) {
2436 self.total_authed_packets += 1;
2437 self.reset_keep_alive(now);
2438 self.reset_idle_timeout(now, space_id);
2439 self.permit_idle_reset = true;
2440 self.receiving_ecn |= ecn.is_some();
2441 if let Some(x) = ecn {
2442 let space = &mut self.spaces[space_id];
2443 space.ecn_counters += x;
2444
2445 if x.is_ce() {
2446 space.pending_acks.set_immediate_ack_required();
2447 }
2448 }
2449
2450 let packet = match packet {
2451 Some(x) => x,
2452 None => return,
2453 };
2454 if self.side.is_server() {
2455 if self.spaces[SpaceId::Initial].crypto.is_some() && space_id == SpaceId::Handshake {
2456 self.discard_space(now, SpaceId::Initial);
2458 }
2459 if self.zero_rtt_crypto.is_some() && is_1rtt {
2460 self.set_key_discard_timer(now, space_id)
2462 }
2463 }
2464 let space = &mut self.spaces[space_id];
2465 space.pending_acks.insert_one(packet, now);
2466 if packet >= space.rx_packet {
2467 space.rx_packet = packet;
2468 self.spin = self.side.is_client() ^ spin;
2470 }
2471 }
2472
2473 fn reset_idle_timeout(&mut self, now: Instant, space: SpaceId) {
2474 let timeout = match self.idle_timeout {
2475 None => return,
2476 Some(dur) => dur,
2477 };
2478 if self.state.is_closed() {
2479 self.timers.stop(Timer::Idle);
2480 return;
2481 }
2482 let dt = cmp::max(timeout, 3 * self.pto(space));
2483 self.timers.set(Timer::Idle, now + dt);
2484 }
2485
2486 fn reset_keep_alive(&mut self, now: Instant) {
2487 let interval = match self.config.keep_alive_interval {
2488 Some(x) if self.state.is_established() => x,
2489 _ => return,
2490 };
2491 self.timers.set(Timer::KeepAlive, now + interval);
2492 }
2493
2494 fn reset_cid_retirement(&mut self) {
2495 if let Some(t) = self.local_cid_state.next_timeout() {
2496 self.timers.set(Timer::PushNewCid, t);
2497 }
2498 }
2499
2500 pub(crate) fn handle_first_packet(
2505 &mut self,
2506 now: Instant,
2507 remote: SocketAddr,
2508 ecn: Option<EcnCodepoint>,
2509 packet_number: u64,
2510 packet: InitialPacket,
2511 remaining: Option<BytesMut>,
2512 ) -> Result<(), ConnectionError> {
2513 let span = trace_span!("first recv");
2514 let _guard = span.enter();
2515 debug_assert!(self.side.is_server());
2516 let len = packet.header_data.len() + packet.payload.len();
2517 self.path.total_recvd = len as u64;
2518
2519 match self.state {
2520 State::Handshake(ref mut state) => {
2521 state.expected_token = packet.header.token.clone();
2522 }
2523 _ => unreachable!("first packet must be delivered in Handshake state"),
2524 }
2525
2526 self.on_packet_authenticated(
2527 now,
2528 SpaceId::Initial,
2529 ecn,
2530 Some(packet_number),
2531 false,
2532 false,
2533 );
2534
2535 self.process_decrypted_packet(now, remote, Some(packet_number), packet.into())?;
2536 if let Some(data) = remaining {
2537 self.handle_coalesced(now, remote, ecn, data);
2538 }
2539
2540 #[cfg(feature = "__qlog")]
2541 self.emit_qlog_recovery_metrics(now);
2542
2543 Ok(())
2544 }
2545
2546 fn init_0rtt(&mut self) {
2547 let (header, packet) = match self.crypto.early_crypto() {
2548 Some(x) => x,
2549 None => return,
2550 };
2551 if self.side.is_client() {
2552 match self.crypto.transport_parameters() {
2553 Ok(params) => {
2554 let params = params
2555 .expect("crypto layer didn't supply transport parameters with ticket");
2556 let params = TransportParameters {
2558 initial_src_cid: None,
2559 original_dst_cid: None,
2560 preferred_address: None,
2561 retry_src_cid: None,
2562 stateless_reset_token: None,
2563 min_ack_delay: None,
2564 ack_delay_exponent: TransportParameters::default().ack_delay_exponent,
2565 max_ack_delay: TransportParameters::default().max_ack_delay,
2566 ..params
2567 };
2568 self.set_peer_params(params);
2569 }
2570 Err(e) => {
2571 error!("session ticket has malformed transport parameters: {}", e);
2572 return;
2573 }
2574 }
2575 }
2576 trace!("0-RTT enabled");
2577 self.zero_rtt_enabled = true;
2578 self.zero_rtt_crypto = Some(ZeroRttCrypto { header, packet });
2579 }
2580
2581 fn read_crypto(
2582 &mut self,
2583 space: SpaceId,
2584 crypto: &frame::Crypto,
2585 payload_len: usize,
2586 ) -> Result<(), TransportError> {
2587 let expected = if !self.state.is_handshake() {
2588 SpaceId::Data
2589 } else if self.highest_space == SpaceId::Initial {
2590 SpaceId::Initial
2591 } else {
2592 SpaceId::Handshake
2595 };
2596 debug_assert!(space <= expected, "received out-of-order CRYPTO data");
2600
2601 let end = crypto.offset + crypto.data.len() as u64;
2602 if space < expected && end > self.spaces[space].crypto_stream.bytes_read() {
2603 warn!(
2604 "received new {:?} CRYPTO data when expecting {:?}",
2605 space, expected
2606 );
2607 return Err(TransportError::PROTOCOL_VIOLATION(
2608 "new data at unexpected encryption level",
2609 ));
2610 }
2611
2612 self.pqc_state.detect_pqc_from_crypto(&crypto.data, space);
2614
2615 if self.pqc_state.should_trigger_mtu_discovery() {
2617 self.path
2619 .mtud
2620 .reset(self.pqc_state.min_initial_size(), self.config.min_mtu);
2621 trace!("Triggered MTU discovery for PQC handshake");
2622 }
2623
2624 let space = &mut self.spaces[space];
2625 let max = end.saturating_sub(space.crypto_stream.bytes_read());
2626 if max > self.config.crypto_buffer_size as u64 {
2627 return Err(TransportError::CRYPTO_BUFFER_EXCEEDED(""));
2628 }
2629
2630 space
2631 .crypto_stream
2632 .insert(crypto.offset, crypto.data.clone(), payload_len);
2633 while let Some(chunk) = space.crypto_stream.read(usize::MAX, true) {
2634 trace!("consumed {} CRYPTO bytes", chunk.bytes.len());
2635 if self.crypto.read_handshake(&chunk.bytes)? {
2636 self.events.push_back(Event::HandshakeDataReady);
2637 }
2638 }
2639
2640 Ok(())
2641 }
2642
2643 fn write_crypto(&mut self) {
2644 loop {
2645 let space = self.highest_space;
2646 let mut outgoing = Vec::new();
2647 if let Some(crypto) = self.crypto.write_handshake(&mut outgoing) {
2648 match space {
2649 SpaceId::Initial => {
2650 self.upgrade_crypto(SpaceId::Handshake, crypto);
2651 }
2652 SpaceId::Handshake => {
2653 self.upgrade_crypto(SpaceId::Data, crypto);
2654 }
2655 _ => unreachable!("got updated secrets during 1-RTT"),
2656 }
2657 }
2658 if outgoing.is_empty() {
2659 if space == self.highest_space {
2660 break;
2661 } else {
2662 continue;
2664 }
2665 }
2666 let offset = self.spaces[space].crypto_offset;
2667 let outgoing = Bytes::from(outgoing);
2668 if let State::Handshake(ref mut state) = self.state {
2669 if space == SpaceId::Initial && offset == 0 && self.side.is_client() {
2670 state.client_hello = Some(outgoing.clone());
2671 }
2672 }
2673 self.spaces[space].crypto_offset += outgoing.len() as u64;
2674 trace!("wrote {} {:?} CRYPTO bytes", outgoing.len(), space);
2675
2676 let use_pqc_fragmentation = self.pqc_state.using_pqc && outgoing.len() > 1200;
2678
2679 if use_pqc_fragmentation {
2680 let frames = self.pqc_state.packet_handler.fragment_crypto_data(
2682 &outgoing,
2683 offset,
2684 self.pqc_state.min_initial_size() as usize,
2685 );
2686 for frame in frames {
2687 self.spaces[space].pending.crypto.push_back(frame);
2688 }
2689 } else {
2690 self.spaces[space].pending.crypto.push_back(frame::Crypto {
2692 offset,
2693 data: outgoing,
2694 });
2695 }
2696 }
2697 }
2698
2699 fn upgrade_crypto(&mut self, space: SpaceId, crypto: Keys) {
2701 debug_assert!(
2702 self.spaces[space].crypto.is_none(),
2703 "already reached packet space {space:?}"
2704 );
2705 trace!("{:?} keys ready", space);
2706 if space == SpaceId::Data {
2707 self.next_crypto = Some(
2709 self.crypto
2710 .next_1rtt_keys()
2711 .expect("handshake should be complete"),
2712 );
2713 }
2714
2715 self.spaces[space].crypto = Some(crypto);
2716 debug_assert!(space as usize > self.highest_space as usize);
2717 self.highest_space = space;
2718 if space == SpaceId::Data && self.side.is_client() {
2719 self.zero_rtt_crypto = None;
2721 }
2722 }
2723
2724 fn discard_space(&mut self, now: Instant, space_id: SpaceId) {
2725 debug_assert!(space_id != SpaceId::Data);
2726 trace!("discarding {:?} keys", space_id);
2727 if space_id == SpaceId::Initial {
2728 if let ConnectionSide::Client { token, .. } = &mut self.side {
2730 *token = Bytes::new();
2731 }
2732 }
2733 let space = &mut self.spaces[space_id];
2734 space.crypto = None;
2735 space.time_of_last_ack_eliciting_packet = None;
2736 space.loss_time = None;
2737 space.in_flight = 0;
2738 let sent_packets = mem::take(&mut space.sent_packets);
2739 for (pn, packet) in sent_packets.into_iter() {
2740 self.remove_in_flight(pn, &packet);
2741 }
2742 self.set_loss_detection_timer(now)
2743 }
2744
2745 fn handle_coalesced(
2746 &mut self,
2747 now: Instant,
2748 remote: SocketAddr,
2749 ecn: Option<EcnCodepoint>,
2750 data: BytesMut,
2751 ) {
2752 self.path.total_recvd = self.path.total_recvd.saturating_add(data.len() as u64);
2753 let mut remaining = Some(data);
2754 while let Some(data) = remaining {
2755 match PartialDecode::new(
2756 data,
2757 &FixedLengthConnectionIdParser::new(self.local_cid_state.cid_len()),
2758 &[self.version],
2759 self.endpoint_config.grease_quic_bit,
2760 ) {
2761 Ok((partial_decode, rest)) => {
2762 remaining = rest;
2763 self.handle_decode(now, remote, ecn, partial_decode);
2764 }
2765 Err(e) => {
2766 trace!("malformed header: {}", e);
2767 return;
2768 }
2769 }
2770 }
2771 }
2772
2773 fn handle_decode(
2774 &mut self,
2775 now: Instant,
2776 remote: SocketAddr,
2777 ecn: Option<EcnCodepoint>,
2778 partial_decode: PartialDecode,
2779 ) {
2780 if let Some(decoded) = packet_crypto::unprotect_header(
2781 partial_decode,
2782 &self.spaces,
2783 self.zero_rtt_crypto.as_ref(),
2784 self.peer_params.stateless_reset_token,
2785 ) {
2786 self.handle_packet(now, remote, ecn, decoded.packet, decoded.stateless_reset);
2787 }
2788 }
2789
2790 fn handle_packet(
2791 &mut self,
2792 now: Instant,
2793 remote: SocketAddr,
2794 ecn: Option<EcnCodepoint>,
2795 packet: Option<Packet>,
2796 stateless_reset: bool,
2797 ) {
2798 self.stats.udp_rx.ios += 1;
2799 if let Some(ref packet) = packet {
2800 trace!(
2801 "got {:?} packet ({} bytes) from {} using id {}",
2802 packet.header.space(),
2803 packet.payload.len() + packet.header_data.len(),
2804 remote,
2805 packet.header.dst_cid(),
2806 );
2807
2808 #[cfg(feature = "trace")]
2810 {
2811 use crate::trace_packet_received;
2812 let packet_size = packet.payload.len() + packet.header_data.len();
2814 trace_packet_received!(
2815 &self.event_log,
2816 self.trace_context.trace_id(),
2817 packet_size as u32,
2818 0 );
2820 }
2821 }
2822
2823 if self.is_handshaking() && remote != self.path.remote {
2824 debug!("discarding packet with unexpected remote during handshake");
2825 return;
2826 }
2827
2828 let was_closed = self.state.is_closed();
2829 let was_drained = self.state.is_drained();
2830
2831 let decrypted = match packet {
2832 None => Err(None),
2833 Some(mut packet) => self
2834 .decrypt_packet(now, &mut packet)
2835 .map(move |number| (packet, number)),
2836 };
2837 let result = match decrypted {
2838 _ if stateless_reset => {
2839 debug!("got stateless reset");
2840 Err(ConnectionError::Reset)
2841 }
2842 Err(Some(e)) => {
2843 warn!("illegal packet: {}", e);
2844 Err(e.into())
2845 }
2846 Err(None) => {
2847 debug!("failed to authenticate packet");
2848 self.authentication_failures += 1;
2849 let integrity_limit = self.spaces[self.highest_space]
2850 .crypto
2851 .as_ref()
2852 .unwrap()
2853 .packet
2854 .local
2855 .integrity_limit();
2856 if self.authentication_failures > integrity_limit {
2857 Err(TransportError::AEAD_LIMIT_REACHED("integrity limit violated").into())
2858 } else {
2859 return;
2860 }
2861 }
2862 Ok((packet, number)) => {
2863 let span = match number {
2864 Some(pn) => trace_span!("recv", space = ?packet.header.space(), pn),
2865 None => trace_span!("recv", space = ?packet.header.space()),
2866 };
2867 let _guard = span.enter();
2868
2869 let is_duplicate = |n| self.spaces[packet.header.space()].dedup.insert(n);
2870 if number.is_some_and(is_duplicate) {
2871 debug!("discarding possible duplicate packet");
2872 return;
2873 } else if self.state.is_handshake() && packet.header.is_short() {
2874 trace!("dropping short packet during handshake");
2876 return;
2877 } else {
2878 if let Header::Initial(InitialHeader { ref token, .. }) = packet.header {
2879 if let State::Handshake(ref hs) = self.state {
2880 if self.side.is_server() && token != &hs.expected_token {
2881 warn!("discarding Initial with invalid retry token");
2885 return;
2886 }
2887 }
2888 }
2889
2890 if !self.state.is_closed() {
2891 let spin = match packet.header {
2892 Header::Short { spin, .. } => spin,
2893 _ => false,
2894 };
2895 self.on_packet_authenticated(
2896 now,
2897 packet.header.space(),
2898 ecn,
2899 number,
2900 spin,
2901 packet.header.is_1rtt(),
2902 );
2903 }
2904
2905 self.process_decrypted_packet(now, remote, number, packet)
2906 }
2907 }
2908 };
2909
2910 if let Err(conn_err) = result {
2912 self.error = Some(conn_err.clone());
2913 self.state = match conn_err {
2914 ConnectionError::ApplicationClosed(reason) => State::closed(reason),
2915 ConnectionError::ConnectionClosed(reason) => State::closed(reason),
2916 ConnectionError::Reset
2917 | ConnectionError::TransportError(TransportError {
2918 code: TransportErrorCode::AEAD_LIMIT_REACHED,
2919 ..
2920 }) => State::Drained,
2921 ConnectionError::TimedOut => {
2922 unreachable!("timeouts aren't generated by packet processing");
2923 }
2924 ConnectionError::TransportError(err) => {
2925 debug!("closing connection due to transport error: {}", err);
2926 State::closed(err)
2927 }
2928 ConnectionError::VersionMismatch => State::Draining,
2929 ConnectionError::LocallyClosed => {
2930 unreachable!("LocallyClosed isn't generated by packet processing");
2931 }
2932 ConnectionError::CidsExhausted => {
2933 unreachable!("CidsExhausted isn't generated by packet processing");
2934 }
2935 };
2936 }
2937
2938 if !was_closed && self.state.is_closed() {
2939 self.close_common();
2940 if !self.state.is_drained() {
2941 self.set_close_timer(now);
2942 }
2943 }
2944 if !was_drained && self.state.is_drained() {
2945 self.endpoint_events.push_back(EndpointEventInner::Drained);
2946 self.timers.stop(Timer::Close);
2949 }
2950
2951 if let State::Closed(_) = self.state {
2953 self.close = remote == self.path.remote;
2954 }
2955 }
2956
2957 fn process_decrypted_packet(
2958 &mut self,
2959 now: Instant,
2960 remote: SocketAddr,
2961 number: Option<u64>,
2962 packet: Packet,
2963 ) -> Result<(), ConnectionError> {
2964 let state = match self.state {
2965 State::Established => {
2966 match packet.header.space() {
2967 SpaceId::Data => self.process_payload(now, remote, number.unwrap(), packet)?,
2968 _ if packet.header.has_frames() => self.process_early_payload(now, packet)?,
2969 _ => {
2970 trace!("discarding unexpected pre-handshake packet");
2971 }
2972 }
2973 return Ok(());
2974 }
2975 State::Closed(_) => {
2976 for result in frame::Iter::new(packet.payload.freeze())? {
2977 let frame = match result {
2978 Ok(frame) => frame,
2979 Err(err) => {
2980 debug!("frame decoding error: {err:?}");
2981 continue;
2982 }
2983 };
2984
2985 if let Frame::Padding = frame {
2986 continue;
2987 };
2988
2989 self.stats.frame_rx.record(&frame);
2990
2991 if let Frame::Close(_) = frame {
2992 trace!("draining");
2993 self.state = State::Draining;
2994 break;
2995 }
2996 }
2997 return Ok(());
2998 }
2999 State::Draining | State::Drained => return Ok(()),
3000 State::Handshake(ref mut state) => state,
3001 };
3002
3003 match packet.header {
3004 Header::Retry {
3005 src_cid: rem_cid, ..
3006 } => {
3007 if self.side.is_server() {
3008 return Err(TransportError::PROTOCOL_VIOLATION("client sent Retry").into());
3009 }
3010
3011 if self.total_authed_packets > 1
3012 || packet.payload.len() <= 16 || !self.crypto.is_valid_retry(
3014 &self.rem_cids.active(),
3015 &packet.header_data,
3016 &packet.payload,
3017 )
3018 {
3019 trace!("discarding invalid Retry");
3020 return Ok(());
3028 }
3029
3030 trace!("retrying with CID {}", rem_cid);
3031 let client_hello = state.client_hello.take().unwrap();
3032 self.retry_src_cid = Some(rem_cid);
3033 self.rem_cids.update_initial_cid(rem_cid);
3034 self.rem_handshake_cid = rem_cid;
3035
3036 let space = &mut self.spaces[SpaceId::Initial];
3037 if let Some(info) = space.take(0) {
3038 self.on_packet_acked(now, 0, info);
3039 };
3040
3041 self.discard_space(now, SpaceId::Initial); self.spaces[SpaceId::Initial] = PacketSpace {
3043 crypto: Some(self.crypto.initial_keys(&rem_cid, self.side.side())),
3044 next_packet_number: self.spaces[SpaceId::Initial].next_packet_number,
3045 crypto_offset: client_hello.len() as u64,
3046 ..PacketSpace::new(now)
3047 };
3048 self.spaces[SpaceId::Initial]
3049 .pending
3050 .crypto
3051 .push_back(frame::Crypto {
3052 offset: 0,
3053 data: client_hello,
3054 });
3055
3056 let zero_rtt = mem::take(&mut self.spaces[SpaceId::Data].sent_packets);
3058 for (pn, info) in zero_rtt {
3059 self.remove_in_flight(pn, &info);
3060 self.spaces[SpaceId::Data].pending |= info.retransmits;
3061 }
3062 self.streams.retransmit_all_for_0rtt();
3063
3064 let token_len = packet.payload.len() - 16;
3065 let ConnectionSide::Client { ref mut token, .. } = self.side else {
3066 unreachable!("we already short-circuited if we're server");
3067 };
3068 *token = packet.payload.freeze().split_to(token_len);
3069 self.state = State::Handshake(state::Handshake {
3070 expected_token: Bytes::new(),
3071 rem_cid_set: false,
3072 client_hello: None,
3073 });
3074 Ok(())
3075 }
3076 Header::Long {
3077 ty: LongType::Handshake,
3078 src_cid: rem_cid,
3079 ..
3080 } => {
3081 if rem_cid != self.rem_handshake_cid {
3082 debug!(
3083 "discarding packet with mismatched remote CID: {} != {}",
3084 self.rem_handshake_cid, rem_cid
3085 );
3086 return Ok(());
3087 }
3088 self.on_path_validated();
3089
3090 self.process_early_payload(now, packet)?;
3091 if self.state.is_closed() {
3092 return Ok(());
3093 }
3094
3095 if self.crypto.is_handshaking() {
3096 trace!("handshake ongoing");
3097 return Ok(());
3098 }
3099
3100 if self.side.is_client() {
3101 let params =
3103 self.crypto
3104 .transport_parameters()?
3105 .ok_or_else(|| TransportError {
3106 code: TransportErrorCode::crypto(0x6d),
3107 frame: None,
3108 reason: "transport parameters missing".into(),
3109 })?;
3110
3111 if self.has_0rtt() {
3112 if !self.crypto.early_data_accepted().unwrap() {
3113 debug_assert!(self.side.is_client());
3114 debug!("0-RTT rejected");
3115 self.accepted_0rtt = false;
3116 self.streams.zero_rtt_rejected();
3117
3118 self.spaces[SpaceId::Data].pending = Retransmits::default();
3120
3121 let sent_packets =
3123 mem::take(&mut self.spaces[SpaceId::Data].sent_packets);
3124 for (pn, packet) in sent_packets {
3125 self.remove_in_flight(pn, &packet);
3126 }
3127 } else {
3128 self.accepted_0rtt = true;
3129 params.validate_resumption_from(&self.peer_params)?;
3130 }
3131 }
3132 if let Some(token) = params.stateless_reset_token {
3133 self.endpoint_events
3134 .push_back(EndpointEventInner::ResetToken(self.path.remote, token));
3135 }
3136 self.handle_peer_params(params)?;
3137 self.issue_first_cids(now);
3138 } else {
3139 self.spaces[SpaceId::Data].pending.handshake_done = true;
3141 self.discard_space(now, SpaceId::Handshake);
3142 }
3143
3144 self.events.push_back(Event::Connected);
3145 self.state = State::Established;
3146 trace!("established");
3147 Ok(())
3148 }
3149 Header::Initial(InitialHeader {
3150 src_cid: rem_cid, ..
3151 }) => {
3152 if !state.rem_cid_set {
3153 trace!("switching remote CID to {}", rem_cid);
3154 let mut state = state.clone();
3155 self.rem_cids.update_initial_cid(rem_cid);
3156 self.rem_handshake_cid = rem_cid;
3157 self.orig_rem_cid = rem_cid;
3158 state.rem_cid_set = true;
3159 self.state = State::Handshake(state);
3160 } else if rem_cid != self.rem_handshake_cid {
3161 debug!(
3162 "discarding packet with mismatched remote CID: {} != {}",
3163 self.rem_handshake_cid, rem_cid
3164 );
3165 return Ok(());
3166 }
3167
3168 let starting_space = self.highest_space;
3169 self.process_early_payload(now, packet)?;
3170
3171 if self.side.is_server()
3172 && starting_space == SpaceId::Initial
3173 && self.highest_space != SpaceId::Initial
3174 {
3175 let params =
3176 self.crypto
3177 .transport_parameters()?
3178 .ok_or_else(|| TransportError {
3179 code: TransportErrorCode::crypto(0x6d),
3180 frame: None,
3181 reason: "transport parameters missing".into(),
3182 })?;
3183 self.handle_peer_params(params)?;
3184 self.issue_first_cids(now);
3185 self.init_0rtt();
3186 }
3187 Ok(())
3188 }
3189 Header::Long {
3190 ty: LongType::ZeroRtt,
3191 ..
3192 } => {
3193 self.process_payload(now, remote, number.unwrap(), packet)?;
3194 Ok(())
3195 }
3196 Header::VersionNegotiate { .. } => {
3197 if self.total_authed_packets > 1 {
3198 return Ok(());
3199 }
3200 let supported = packet
3201 .payload
3202 .chunks(4)
3203 .any(|x| match <[u8; 4]>::try_from(x) {
3204 Ok(version) => self.version == u32::from_be_bytes(version),
3205 Err(_) => false,
3206 });
3207 if supported {
3208 return Ok(());
3209 }
3210 debug!("remote doesn't support our version");
3211 Err(ConnectionError::VersionMismatch)
3212 }
3213 Header::Short { .. } => unreachable!(
3214 "short packets received during handshake are discarded in handle_packet"
3215 ),
3216 }
3217 }
3218
3219 fn process_early_payload(
3221 &mut self,
3222 now: Instant,
3223 packet: Packet,
3224 ) -> Result<(), TransportError> {
3225 debug_assert_ne!(packet.header.space(), SpaceId::Data);
3226 let payload_len = packet.payload.len();
3227 let mut ack_eliciting = false;
3228 for result in frame::Iter::new(packet.payload.freeze())? {
3229 let frame = result?;
3230 let span = match frame {
3231 Frame::Padding => continue,
3232 _ => Some(trace_span!("frame", ty = %frame.ty())),
3233 };
3234
3235 self.stats.frame_rx.record(&frame);
3236
3237 let _guard = span.as_ref().map(|x| x.enter());
3238 ack_eliciting |= frame.is_ack_eliciting();
3239
3240 match frame {
3242 Frame::Padding | Frame::Ping => {}
3243 Frame::Crypto(frame) => {
3244 self.read_crypto(packet.header.space(), &frame, payload_len)?;
3245 }
3246 Frame::Ack(ack) => {
3247 self.on_ack_received(now, packet.header.space(), ack)?;
3248 }
3249 Frame::Close(reason) => {
3250 self.error = Some(reason.into());
3251 self.state = State::Draining;
3252 return Ok(());
3253 }
3254 _ => {
3255 let mut err =
3256 TransportError::PROTOCOL_VIOLATION("illegal frame type in handshake");
3257 err.frame = Some(frame.ty());
3258 return Err(err);
3259 }
3260 }
3261 }
3262
3263 if ack_eliciting {
3264 self.spaces[packet.header.space()]
3266 .pending_acks
3267 .set_immediate_ack_required();
3268 }
3269
3270 self.write_crypto();
3271 Ok(())
3272 }
3273
3274 fn process_payload(
3275 &mut self,
3276 now: Instant,
3277 remote: SocketAddr,
3278 number: u64,
3279 packet: Packet,
3280 ) -> Result<(), TransportError> {
3281 let payload = packet.payload.freeze();
3282 let mut is_probing_packet = true;
3283 let mut close = None;
3284 let payload_len = payload.len();
3285 let mut ack_eliciting = false;
3286 for result in frame::Iter::new(payload)? {
3287 let frame = result?;
3288 let span = match frame {
3289 Frame::Padding => continue,
3290 _ => Some(trace_span!("frame", ty = %frame.ty())),
3291 };
3292
3293 self.stats.frame_rx.record(&frame);
3294 match &frame {
3297 Frame::Crypto(f) => {
3298 trace!(offset = f.offset, len = f.data.len(), "got crypto frame");
3299 }
3300 Frame::Stream(f) => {
3301 trace!(id = %f.id, offset = f.offset, len = f.data.len(), fin = f.fin, "got stream frame");
3302 }
3303 Frame::Datagram(f) => {
3304 trace!(len = f.data.len(), "got datagram frame");
3305 }
3306 f => {
3307 trace!("got frame {:?}", f);
3308 }
3309 }
3310
3311 let _guard = span.as_ref().map(|x| x.enter());
3312 if packet.header.is_0rtt() {
3313 match frame {
3314 Frame::Crypto(_) | Frame::Close(Close::Application(_)) => {
3315 return Err(TransportError::PROTOCOL_VIOLATION(
3316 "illegal frame type in 0-RTT",
3317 ));
3318 }
3319 _ => {}
3320 }
3321 }
3322 ack_eliciting |= frame.is_ack_eliciting();
3323
3324 match frame {
3326 Frame::Padding
3327 | Frame::PathChallenge(_)
3328 | Frame::PathResponse(_)
3329 | Frame::NewConnectionId(_) => {}
3330 _ => {
3331 is_probing_packet = false;
3332 }
3333 }
3334 match frame {
3335 Frame::Crypto(frame) => {
3336 self.read_crypto(SpaceId::Data, &frame, payload_len)?;
3337 }
3338 Frame::Stream(frame) => {
3339 if self.streams.received(frame, payload_len)?.should_transmit() {
3340 self.spaces[SpaceId::Data].pending.max_data = true;
3341 }
3342 }
3343 Frame::Ack(ack) => {
3344 self.on_ack_received(now, SpaceId::Data, ack)?;
3345 }
3346 Frame::Padding | Frame::Ping => {}
3347 Frame::Close(reason) => {
3348 close = Some(reason);
3349 }
3350 Frame::PathChallenge(token) => {
3351 self.path_responses.push(number, token, remote);
3352 if remote == self.path.remote {
3353 match self.peer_supports_ack_frequency() {
3356 true => self.immediate_ack(),
3357 false => self.ping(),
3358 }
3359 }
3360 }
3361 Frame::PathResponse(token) => {
3362 if self.path.challenge == Some(token) && remote == self.path.remote {
3363 trace!("new path validated");
3364 self.timers.stop(Timer::PathValidation);
3365 self.path.challenge = None;
3366 self.path.validated = true;
3367 if let Some((_, ref mut prev_path)) = self.prev_path {
3368 prev_path.challenge = None;
3369 prev_path.challenge_pending = false;
3370 }
3371 self.on_path_validated();
3372 } else if let Some(nat_traversal) = &mut self.nat_traversal {
3373 match nat_traversal.handle_validation_success(remote, token, now) {
3375 Ok(sequence) => {
3376 trace!(
3377 "NAT traversal candidate {} validated for sequence {}",
3378 remote, sequence
3379 );
3380
3381 if nat_traversal.handle_coordination_success(remote, now) {
3383 trace!("Coordination succeeded via {}", remote);
3384
3385 let can_migrate = match &self.side {
3387 ConnectionSide::Client { .. } => true, ConnectionSide::Server { server_config } => {
3389 server_config.migration
3390 }
3391 };
3392
3393 if can_migrate {
3394 let best_pairs = nat_traversal.get_best_succeeded_pairs();
3396 if let Some(best) = best_pairs.first() {
3397 if best.remote_addr == remote
3398 && best.remote_addr != self.path.remote
3399 {
3400 debug!(
3401 "NAT traversal found better path, initiating migration"
3402 );
3403 if let Err(e) =
3405 self.migrate_to_nat_traversal_path(now)
3406 {
3407 warn!(
3408 "Failed to migrate to NAT traversal path: {:?}",
3409 e
3410 );
3411 }
3412 }
3413 }
3414 }
3415 } else {
3416 if nat_traversal.mark_pair_succeeded(remote) {
3418 trace!("NAT traversal pair succeeded for {}", remote);
3419 }
3420 }
3421 }
3422 Err(NatTraversalError::ChallengeMismatch) => {
3423 debug!(
3424 "PATH_RESPONSE challenge mismatch for NAT candidate {}",
3425 remote
3426 );
3427 }
3428 Err(e) => {
3429 debug!("NAT traversal validation error: {}", e);
3430 }
3431 }
3432 } else {
3433 debug!(token, "ignoring invalid PATH_RESPONSE");
3434 }
3435 }
3436 Frame::MaxData(bytes) => {
3437 self.streams.received_max_data(bytes);
3438 }
3439 Frame::MaxStreamData { id, offset } => {
3440 self.streams.received_max_stream_data(id, offset)?;
3441 }
3442 Frame::MaxStreams { dir, count } => {
3443 self.streams.received_max_streams(dir, count)?;
3444 }
3445 Frame::ResetStream(frame) => {
3446 if self.streams.received_reset(frame)?.should_transmit() {
3447 self.spaces[SpaceId::Data].pending.max_data = true;
3448 }
3449 }
3450 Frame::DataBlocked { offset } => {
3451 debug!(offset, "peer claims to be blocked at connection level");
3452 }
3453 Frame::StreamDataBlocked { id, offset } => {
3454 if id.initiator() == self.side.side() && id.dir() == Dir::Uni {
3455 debug!("got STREAM_DATA_BLOCKED on send-only {}", id);
3456 return Err(TransportError::STREAM_STATE_ERROR(
3457 "STREAM_DATA_BLOCKED on send-only stream",
3458 ));
3459 }
3460 debug!(
3461 stream = %id,
3462 offset, "peer claims to be blocked at stream level"
3463 );
3464 }
3465 Frame::StreamsBlocked { dir, limit } => {
3466 if limit > MAX_STREAM_COUNT {
3467 return Err(TransportError::FRAME_ENCODING_ERROR(
3468 "unrepresentable stream limit",
3469 ));
3470 }
3471 debug!(
3472 "peer claims to be blocked opening more than {} {} streams",
3473 limit, dir
3474 );
3475 }
3476 Frame::StopSending(frame::StopSending { id, error_code }) => {
3477 if id.initiator() != self.side.side() {
3478 if id.dir() == Dir::Uni {
3479 debug!("got STOP_SENDING on recv-only {}", id);
3480 return Err(TransportError::STREAM_STATE_ERROR(
3481 "STOP_SENDING on recv-only stream",
3482 ));
3483 }
3484 } else if self.streams.is_local_unopened(id) {
3485 return Err(TransportError::STREAM_STATE_ERROR(
3486 "STOP_SENDING on unopened stream",
3487 ));
3488 }
3489 self.streams.received_stop_sending(id, error_code);
3490 }
3491 Frame::RetireConnectionId { sequence } => {
3492 let allow_more_cids = self
3493 .local_cid_state
3494 .on_cid_retirement(sequence, self.peer_params.issue_cids_limit())?;
3495 self.endpoint_events
3496 .push_back(EndpointEventInner::RetireConnectionId(
3497 now,
3498 sequence,
3499 allow_more_cids,
3500 ));
3501 }
3502 Frame::NewConnectionId(frame) => {
3503 trace!(
3504 sequence = frame.sequence,
3505 id = %frame.id,
3506 retire_prior_to = frame.retire_prior_to,
3507 );
3508 if self.rem_cids.active().is_empty() {
3509 return Err(TransportError::PROTOCOL_VIOLATION(
3510 "NEW_CONNECTION_ID when CIDs aren't in use",
3511 ));
3512 }
3513 if frame.retire_prior_to > frame.sequence {
3514 return Err(TransportError::PROTOCOL_VIOLATION(
3515 "NEW_CONNECTION_ID retiring unissued CIDs",
3516 ));
3517 }
3518
3519 use crate::cid_queue::InsertError;
3520 match self.rem_cids.insert(frame) {
3521 Ok(None) => {}
3522 Ok(Some((retired, reset_token))) => {
3523 let pending_retired =
3524 &mut self.spaces[SpaceId::Data].pending.retire_cids;
3525 const MAX_PENDING_RETIRED_CIDS: u64 = CidQueue::LEN as u64 * 10;
3528 if (pending_retired.len() as u64)
3531 .saturating_add(retired.end.saturating_sub(retired.start))
3532 > MAX_PENDING_RETIRED_CIDS
3533 {
3534 return Err(TransportError::CONNECTION_ID_LIMIT_ERROR(
3535 "queued too many retired CIDs",
3536 ));
3537 }
3538 pending_retired.extend(retired);
3539 self.set_reset_token(reset_token);
3540 }
3541 Err(InsertError::ExceedsLimit) => {
3542 return Err(TransportError::CONNECTION_ID_LIMIT_ERROR(""));
3543 }
3544 Err(InsertError::Retired) => {
3545 trace!("discarding already-retired");
3546 self.spaces[SpaceId::Data]
3550 .pending
3551 .retire_cids
3552 .push(frame.sequence);
3553 continue;
3554 }
3555 };
3556
3557 if self.side.is_server() && self.rem_cids.active_seq() == 0 {
3558 self.update_rem_cid();
3561 }
3562 }
3563 Frame::NewToken(NewToken { token }) => {
3564 let ConnectionSide::Client {
3565 token_store,
3566 server_name,
3567 ..
3568 } = &self.side
3569 else {
3570 return Err(TransportError::PROTOCOL_VIOLATION("client sent NEW_TOKEN"));
3571 };
3572 if token.is_empty() {
3573 return Err(TransportError::FRAME_ENCODING_ERROR("empty token"));
3574 }
3575 trace!("got new token");
3576 token_store.insert(server_name, token);
3577 }
3578 Frame::Datagram(datagram) => {
3579 let result = self
3580 .datagrams
3581 .received(datagram, &self.config.datagram_receive_buffer_size)?;
3582 if result.was_empty {
3583 self.events.push_back(Event::DatagramReceived);
3584 }
3585 if result.dropped_count > 0 {
3586 let drop_counts = DatagramDropStats {
3587 datagrams: result.dropped_count as u64,
3588 bytes: result.dropped_bytes as u64,
3589 };
3590 self.stats
3591 .datagram_drops
3592 .record(drop_counts.datagrams, drop_counts.bytes);
3593 self.events.push_back(Event::DatagramDropped(drop_counts));
3594 }
3595 }
3596 Frame::AckFrequency(ack_frequency) => {
3597 let space = &mut self.spaces[SpaceId::Data];
3599
3600 if !self
3601 .ack_frequency
3602 .ack_frequency_received(&ack_frequency, &mut space.pending_acks)?
3603 {
3604 continue;
3606 }
3607
3608 if let Some(timeout) = space
3611 .pending_acks
3612 .max_ack_delay_timeout(self.ack_frequency.max_ack_delay)
3613 {
3614 self.timers.set(Timer::MaxAckDelay, timeout);
3615 }
3616 }
3617 Frame::ImmediateAck => {
3618 self.spaces[SpaceId::Data]
3620 .pending_acks
3621 .set_immediate_ack_required();
3622 }
3623 Frame::HandshakeDone => {
3624 if self.side.is_server() {
3625 return Err(TransportError::PROTOCOL_VIOLATION(
3626 "client sent HANDSHAKE_DONE",
3627 ));
3628 }
3629 if self.spaces[SpaceId::Handshake].crypto.is_some() {
3630 self.discard_space(now, SpaceId::Handshake);
3631 }
3632 }
3633 Frame::AddAddress(add_address) => {
3634 self.handle_add_address(&add_address, now)?;
3635 }
3636 Frame::PunchMeNow(punch_me_now) => {
3637 self.handle_punch_me_now(&punch_me_now, now)?;
3638 }
3639 Frame::RemoveAddress(remove_address) => {
3640 self.handle_remove_address(&remove_address)?;
3641 }
3642 Frame::ObservedAddress(observed_address) => {
3643 self.handle_observed_address_frame(&observed_address, now)?;
3644 }
3645 Frame::TryConnectTo(try_connect_to) => {
3646 self.handle_try_connect_to(&try_connect_to, now)?;
3647 }
3648 Frame::TryConnectToResponse(response) => {
3649 self.handle_try_connect_to_response(&response)?;
3650 }
3651 }
3652 }
3653
3654 let space = &mut self.spaces[SpaceId::Data];
3655 if space
3656 .pending_acks
3657 .packet_received(now, number, ack_eliciting, &space.dedup)
3658 {
3659 self.timers
3660 .set(Timer::MaxAckDelay, now + self.ack_frequency.max_ack_delay);
3661 }
3662
3663 let pending = &mut self.spaces[SpaceId::Data].pending;
3668 self.streams.queue_max_stream_id(pending);
3669
3670 if let Some(reason) = close {
3671 self.error = Some(reason.into());
3672 self.state = State::Draining;
3673 self.close = true;
3674 }
3675
3676 if remote != self.path.remote
3677 && !is_probing_packet
3678 && number == self.spaces[SpaceId::Data].rx_packet
3679 {
3680 let ConnectionSide::Server { ref server_config } = self.side else {
3681 return Err(TransportError::PROTOCOL_VIOLATION(
3682 "packets from unknown remote should be dropped by clients",
3683 ));
3684 };
3685 debug_assert!(
3686 server_config.migration,
3687 "migration-initiating packets should have been dropped immediately"
3688 );
3689 self.migrate(now, remote);
3690 self.update_rem_cid();
3692 self.spin = false;
3693 }
3694
3695 Ok(())
3696 }
3697
3698 fn migrate(&mut self, now: Instant, remote: SocketAddr) {
3699 trace!(%remote, "migration initiated");
3700 let mut new_path = if remote.is_ipv4() && remote.ip() == self.path.remote.ip() {
3704 PathData::from_previous(remote, &self.path, now)
3705 } else {
3706 let peer_max_udp_payload_size =
3707 u16::try_from(self.peer_params.max_udp_payload_size.into_inner())
3708 .unwrap_or(u16::MAX);
3709 PathData::new(
3710 remote,
3711 self.allow_mtud,
3712 Some(peer_max_udp_payload_size),
3713 now,
3714 &self.config,
3715 )
3716 };
3717 new_path.challenge = Some(self.rng.r#gen());
3718 new_path.challenge_pending = true;
3719 let prev_pto = self.pto(SpaceId::Data);
3720
3721 let mut prev = mem::replace(&mut self.path, new_path);
3722 if prev.challenge.is_none() {
3724 prev.challenge = Some(self.rng.r#gen());
3725 prev.challenge_pending = true;
3726 self.prev_path = Some((self.rem_cids.active(), prev));
3729 }
3730
3731 self.timers.set(
3732 Timer::PathValidation,
3733 now + 3 * cmp::max(self.pto(SpaceId::Data), prev_pto),
3734 );
3735 }
3736
3737 pub fn local_address_changed(&mut self) {
3739 self.update_rem_cid();
3740 self.ping();
3741 }
3742
3743 pub fn migrate_to_nat_traversal_path(&mut self, now: Instant) -> Result<(), TransportError> {
3745 let (remote_addr, local_addr) = {
3747 let nat_state = self
3748 .nat_traversal
3749 .as_ref()
3750 .ok_or_else(|| TransportError::PROTOCOL_VIOLATION("NAT traversal not enabled"))?;
3751
3752 let best_pairs = nat_state.get_best_succeeded_pairs();
3754 if best_pairs.is_empty() {
3755 return Err(TransportError::PROTOCOL_VIOLATION(
3756 "No validated NAT traversal paths",
3757 ));
3758 }
3759
3760 let best_path = best_pairs
3762 .iter()
3763 .find(|pair| pair.remote_addr != self.path.remote)
3764 .or_else(|| best_pairs.first());
3765
3766 let best_path = best_path.ok_or_else(|| {
3767 TransportError::PROTOCOL_VIOLATION("No suitable NAT traversal path")
3768 })?;
3769
3770 debug!(
3771 "Migrating to NAT traversal path: {} -> {} (priority: {})",
3772 self.path.remote, best_path.remote_addr, best_path.priority
3773 );
3774
3775 (best_path.remote_addr, best_path.local_addr)
3776 };
3777
3778 self.migrate(now, remote_addr);
3780
3781 if local_addr != SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED), 0) {
3783 self.local_ip = Some(local_addr.ip());
3784 }
3785
3786 self.path.challenge_pending = true;
3788
3789 Ok(())
3790 }
3791
3792 fn update_rem_cid(&mut self) {
3794 let (reset_token, retired) = match self.rem_cids.next() {
3795 Some(x) => x,
3796 None => return,
3797 };
3798
3799 self.spaces[SpaceId::Data]
3801 .pending
3802 .retire_cids
3803 .extend(retired);
3804 self.set_reset_token(reset_token);
3805 }
3806
3807 fn set_reset_token(&mut self, reset_token: ResetToken) {
3808 self.endpoint_events
3809 .push_back(EndpointEventInner::ResetToken(
3810 self.path.remote,
3811 reset_token,
3812 ));
3813 self.peer_params.stateless_reset_token = Some(reset_token);
3814 }
3815
3816 fn handle_encode_error(&mut self, now: Instant, context: &'static str) {
3817 tracing::error!("VarInt overflow while encoding {context}");
3818 self.close_inner(
3819 now,
3820 Close::from(TransportError::INTERNAL_ERROR(
3821 "varint overflow during encoding",
3822 )),
3823 );
3824 }
3825
3826 fn encode_or_close(
3827 &mut self,
3828 now: Instant,
3829 result: Result<(), VarIntBoundsExceeded>,
3830 context: &'static str,
3831 ) -> bool {
3832 if result.is_err() {
3833 self.handle_encode_error(now, context);
3834 return false;
3835 }
3836 true
3837 }
3838
3839 fn issue_first_cids(&mut self, now: Instant) {
3841 if self.local_cid_state.cid_len() == 0 {
3842 return;
3843 }
3844
3845 let mut n = self.peer_params.issue_cids_limit() - 1;
3847 if let ConnectionSide::Server { server_config } = &self.side {
3848 if server_config.has_preferred_address() {
3849 n -= 1;
3851 }
3852 }
3853 self.endpoint_events
3854 .push_back(EndpointEventInner::NeedIdentifiers(now, n));
3855 }
3856
3857 fn populate_packet(
3858 &mut self,
3859 now: Instant,
3860 space_id: SpaceId,
3861 buf: &mut Vec<u8>,
3862 max_size: usize,
3863 pn: u64,
3864 ) -> SentFrames {
3865 let mut sent = SentFrames::default();
3866 let space = &mut self.spaces[space_id];
3867 let is_0rtt = space_id == SpaceId::Data && space.crypto.is_none();
3868 space.pending_acks.maybe_ack_non_eliciting();
3869 macro_rules! encode_or_close {
3870 ($result:expr, $context:expr) => {{
3871 if $result.is_err() {
3872 drop(space);
3873 self.handle_encode_error(now, $context);
3874 return sent;
3875 }
3876 }};
3877 }
3878
3879 if !is_0rtt && mem::replace(&mut space.pending.handshake_done, false) {
3881 encode_or_close!(
3882 frame::FrameType::HANDSHAKE_DONE.try_encode(buf),
3883 "HANDSHAKE_DONE"
3884 );
3885 sent.retransmits.get_or_create().handshake_done = true;
3886 self.stats.frame_tx.handshake_done =
3888 self.stats.frame_tx.handshake_done.saturating_add(1);
3889 }
3890
3891 if mem::replace(&mut space.ping_pending, false) {
3893 trace!("PING");
3894 encode_or_close!(frame::FrameType::PING.try_encode(buf), "PING");
3895 sent.non_retransmits = true;
3896 self.stats.frame_tx.ping += 1;
3897 }
3898
3899 if mem::replace(&mut space.immediate_ack_pending, false) {
3901 trace!("IMMEDIATE_ACK");
3902 encode_or_close!(
3903 frame::FrameType::IMMEDIATE_ACK.try_encode(buf),
3904 "IMMEDIATE_ACK"
3905 );
3906 sent.non_retransmits = true;
3907 self.stats.frame_tx.immediate_ack += 1;
3908 }
3909
3910 if space.pending_acks.can_send() {
3912 let ack_result = Self::populate_acks(
3913 now,
3914 self.receiving_ecn,
3915 &mut sent,
3916 space,
3917 buf,
3918 &mut self.stats,
3919 );
3920 encode_or_close!(ack_result, "ACK");
3921 }
3922
3923 if mem::replace(&mut space.pending.ack_frequency, false) {
3925 let sequence_number = self.ack_frequency.next_sequence_number();
3926
3927 let config = self.config.ack_frequency_config.as_ref().unwrap();
3929
3930 let max_ack_delay = self.ack_frequency.candidate_max_ack_delay(
3932 self.path.rtt.get(),
3933 config,
3934 &self.peer_params,
3935 );
3936
3937 trace!(?max_ack_delay, "ACK_FREQUENCY");
3938
3939 encode_or_close!(
3940 (frame::AckFrequency {
3941 sequence: sequence_number,
3942 ack_eliciting_threshold: config.ack_eliciting_threshold,
3943 request_max_ack_delay: max_ack_delay
3944 .as_micros()
3945 .try_into()
3946 .unwrap_or(VarInt::MAX),
3947 reordering_threshold: config.reordering_threshold,
3948 })
3949 .try_encode(buf),
3950 "ACK_FREQUENCY"
3951 );
3952
3953 sent.retransmits.get_or_create().ack_frequency = true;
3954
3955 self.ack_frequency.ack_frequency_sent(pn, max_ack_delay);
3956 self.stats.frame_tx.ack_frequency += 1;
3957 }
3958
3959 if buf.len() + 9 < max_size && space_id == SpaceId::Data {
3961 if let Some(token) = self.path.challenge {
3963 self.path.challenge_pending = false;
3965 sent.non_retransmits = true;
3966 sent.requires_padding = true;
3967 trace!("PATH_CHALLENGE {:08x}", token);
3968 encode_or_close!(
3969 frame::FrameType::PATH_CHALLENGE.try_encode(buf),
3970 "PATH_CHALLENGE"
3971 );
3972 buf.write(token);
3973 self.stats.frame_tx.path_challenge += 1;
3974 }
3975
3976 }
3979
3980 if buf.len() + 9 < max_size && space_id == SpaceId::Data {
3982 if let Some(token) = self.path_responses.pop_on_path(self.path.remote) {
3983 sent.non_retransmits = true;
3984 sent.requires_padding = true;
3985 trace!("PATH_RESPONSE {:08x}", token);
3986 encode_or_close!(
3987 frame::FrameType::PATH_RESPONSE.try_encode(buf),
3988 "PATH_RESPONSE"
3989 );
3990 buf.write(token);
3991 self.stats.frame_tx.path_response += 1;
3992 }
3993 }
3994
3995 while buf.len() + frame::Crypto::SIZE_BOUND < max_size && !is_0rtt {
3997 let mut frame = match space.pending.crypto.pop_front() {
3998 Some(x) => x,
3999 None => break,
4000 };
4001
4002 let max_crypto_data_size = max_size
4007 - buf.len()
4008 - 1 - VarInt::size(unsafe { VarInt::from_u64_unchecked(frame.offset) })
4010 - 2; let available_space = max_size - buf.len();
4014 let remaining_data = frame.data.len();
4015 let optimal_size = self
4016 .pqc_state
4017 .calculate_crypto_frame_size(available_space, remaining_data);
4018
4019 let len = frame
4020 .data
4021 .len()
4022 .min(2usize.pow(14) - 1)
4023 .min(max_crypto_data_size)
4024 .min(optimal_size);
4025
4026 let data = frame.data.split_to(len);
4027 let truncated = frame::Crypto {
4028 offset: frame.offset,
4029 data,
4030 };
4031 trace!(
4032 "CRYPTO: off {} len {}",
4033 truncated.offset,
4034 truncated.data.len()
4035 );
4036 encode_or_close!(truncated.try_encode(buf), "CRYPTO");
4037 self.stats.frame_tx.crypto += 1;
4038 sent.retransmits.get_or_create().crypto.push_back(truncated);
4039 if !frame.data.is_empty() {
4040 frame.offset += len as u64;
4041 space.pending.crypto.push_front(frame);
4042 }
4043 }
4044
4045 if space_id == SpaceId::Data {
4046 let control_result = self.streams.write_control_frames(
4047 buf,
4048 &mut space.pending,
4049 &mut sent.retransmits,
4050 &mut self.stats.frame_tx,
4051 max_size,
4052 );
4053 encode_or_close!(control_result, "control frames");
4054 }
4055
4056 while buf.len() + 44 < max_size {
4058 let issued = match space.pending.new_cids.pop() {
4059 Some(x) => x,
4060 None => break,
4061 };
4062 trace!(
4063 sequence = issued.sequence,
4064 id = %issued.id,
4065 "NEW_CONNECTION_ID"
4066 );
4067 encode_or_close!(
4068 (frame::NewConnectionId {
4069 sequence: issued.sequence,
4070 retire_prior_to: self.local_cid_state.retire_prior_to(),
4071 id: issued.id,
4072 reset_token: issued.reset_token,
4073 })
4074 .try_encode(buf),
4075 "NEW_CONNECTION_ID"
4076 );
4077 sent.retransmits.get_or_create().new_cids.push(issued);
4078 self.stats.frame_tx.new_connection_id += 1;
4079 }
4080
4081 while buf.len() + frame::RETIRE_CONNECTION_ID_SIZE_BOUND < max_size {
4083 let seq = match space.pending.retire_cids.pop() {
4084 Some(x) => x,
4085 None => break,
4086 };
4087 trace!(sequence = seq, "RETIRE_CONNECTION_ID");
4088 encode_or_close!(
4089 frame::FrameType::RETIRE_CONNECTION_ID.try_encode(buf),
4090 "RETIRE_CONNECTION_ID"
4091 );
4092 encode_or_close!(buf.write_var(seq), "RETIRE_CONNECTION_ID seq");
4093 sent.retransmits.get_or_create().retire_cids.push(seq);
4094 self.stats.frame_tx.retire_connection_id += 1;
4095 }
4096
4097 let mut sent_datagrams = false;
4099 while buf.len() + Datagram::SIZE_BOUND < max_size && space_id == SpaceId::Data {
4100 match self.datagrams.write(buf, max_size) {
4101 true => {
4102 sent_datagrams = true;
4103 sent.non_retransmits = true;
4104 self.stats.frame_tx.datagram += 1;
4105 }
4106 false => break,
4107 }
4108 }
4109 if self.datagrams.send_blocked && sent_datagrams {
4110 self.events.push_back(Event::DatagramsUnblocked);
4111 self.datagrams.send_blocked = false;
4112 }
4113
4114 while let Some(remote_addr) = space.pending.new_tokens.pop() {
4116 debug_assert_eq!(space_id, SpaceId::Data);
4117 let ConnectionSide::Server { server_config } = &self.side else {
4118 debug_assert!(false, "NEW_TOKEN frames should not be enqueued by clients");
4120 continue;
4121 };
4122
4123 if remote_addr != self.path.remote {
4124 continue;
4129 }
4130
4131 if self.delay_new_token_until_binding && self.peer_id_for_tokens.is_none() {
4134 space.pending.new_tokens.push(remote_addr);
4136 break;
4137 }
4138
4139 let token = match crate::token_v2::encode_validation_token_with_rng(
4140 &server_config.token_key,
4141 remote_addr.ip(),
4142 server_config.time_source.now(),
4143 &mut self.rng,
4144 ) {
4145 Ok(token) => token,
4146 Err(err) => {
4147 error!(?err, "failed to encode validation token");
4148 continue;
4149 }
4150 };
4151 let new_token = NewToken {
4152 token: token.into(),
4153 };
4154
4155 if buf.len() + new_token.size() >= max_size {
4156 space.pending.new_tokens.push(remote_addr);
4157 break;
4158 }
4159
4160 encode_or_close!(new_token.try_encode(buf), "NEW_TOKEN");
4161 sent.retransmits
4162 .get_or_create()
4163 .new_tokens
4164 .push(remote_addr);
4165 self.stats.frame_tx.new_token += 1;
4166 }
4167
4168 while buf.len() + frame::AddAddress::SIZE_BOUND < max_size && space_id == SpaceId::Data {
4170 let add_address = match space.pending.add_addresses.pop() {
4171 Some(x) => x,
4172 None => break,
4173 };
4174 trace!(
4175 sequence = %add_address.sequence,
4176 address = %add_address.address,
4177 "ADD_ADDRESS"
4178 );
4179 if self.nat_traversal_frame_config.use_rfc_format {
4181 encode_or_close!(add_address.try_encode_rfc(buf), "ADD_ADDRESS (rfc)");
4182 } else {
4183 encode_or_close!(add_address.try_encode_legacy(buf), "ADD_ADDRESS (legacy)");
4184 }
4185 sent.retransmits
4186 .get_or_create()
4187 .add_addresses
4188 .push(add_address);
4189 self.stats.frame_tx.add_address += 1;
4190 }
4191
4192 while buf.len() + frame::PunchMeNow::SIZE_BOUND < max_size && space_id == SpaceId::Data {
4194 let punch_me_now = match space.pending.punch_me_now.pop() {
4195 Some(x) => x,
4196 None => break,
4197 };
4198 trace!(
4199 round = %punch_me_now.round,
4200 paired_with_sequence_number = %punch_me_now.paired_with_sequence_number,
4201 "PUNCH_ME_NOW"
4202 );
4203 if self.nat_traversal_frame_config.use_rfc_format {
4205 encode_or_close!(punch_me_now.try_encode_rfc(buf), "PUNCH_ME_NOW (rfc)");
4206 } else {
4207 encode_or_close!(punch_me_now.try_encode_legacy(buf), "PUNCH_ME_NOW (legacy)");
4208 }
4209 sent.retransmits
4210 .get_or_create()
4211 .punch_me_now
4212 .push(punch_me_now);
4213 self.stats.frame_tx.punch_me_now += 1;
4214 }
4215
4216 while buf.len() + frame::RemoveAddress::SIZE_BOUND < max_size && space_id == SpaceId::Data {
4218 let remove_address = match space.pending.remove_addresses.pop() {
4219 Some(x) => x,
4220 None => break,
4221 };
4222 trace!(
4223 sequence = %remove_address.sequence,
4224 "REMOVE_ADDRESS"
4225 );
4226 encode_or_close!(remove_address.try_encode(buf), "REMOVE_ADDRESS");
4228 sent.retransmits
4229 .get_or_create()
4230 .remove_addresses
4231 .push(remove_address);
4232 self.stats.frame_tx.remove_address += 1;
4233 }
4234
4235 while buf.len() + frame::ObservedAddress::SIZE_BOUND < max_size && space_id == SpaceId::Data
4237 {
4238 let observed_address = match space.pending.outbound_observations.pop() {
4239 Some(x) => x,
4240 None => break,
4241 };
4242 info!(
4243 address = %observed_address.address,
4244 sequence = %observed_address.sequence_number,
4245 "populate_packet: ENCODING OBSERVED_ADDRESS into packet"
4246 );
4247 encode_or_close!(observed_address.try_encode(buf), "OBSERVED_ADDRESS");
4248 sent.retransmits
4249 .get_or_create()
4250 .outbound_observations
4251 .push(observed_address);
4252 self.stats.frame_tx.observed_address += 1;
4253 }
4254
4255 if space_id == SpaceId::Data {
4257 sent.stream_frames =
4258 self.streams
4259 .write_stream_frames(buf, max_size, self.config.send_fairness);
4260 self.stats.frame_tx.stream += sent.stream_frames.len() as u64;
4261 }
4262
4263 sent
4264 }
4265
4266 fn populate_acks(
4271 now: Instant,
4272 receiving_ecn: bool,
4273 sent: &mut SentFrames,
4274 space: &mut PacketSpace,
4275 buf: &mut Vec<u8>,
4276 stats: &mut ConnectionStats,
4277 ) -> Result<(), VarIntBoundsExceeded> {
4278 debug_assert!(!space.pending_acks.ranges().is_empty());
4279
4280 debug_assert!(space.crypto.is_some(), "tried to send ACK in 0-RTT");
4282 let ecn = if receiving_ecn {
4283 Some(&space.ecn_counters)
4284 } else {
4285 None
4286 };
4287 sent.largest_acked = space.pending_acks.ranges().max();
4288
4289 let delay_micros = space.pending_acks.ack_delay(now).as_micros() as u64;
4290
4291 let ack_delay_exp = TransportParameters::default().ack_delay_exponent;
4293 let delay = delay_micros >> ack_delay_exp.into_inner();
4294
4295 trace!(
4296 "ACK {:?}, Delay = {}us",
4297 space.pending_acks.ranges(),
4298 delay_micros
4299 );
4300
4301 frame::Ack::try_encode(delay as _, space.pending_acks.ranges(), ecn, buf)?;
4302 stats.frame_tx.acks += 1;
4303 Ok(())
4304 }
4305
4306 fn close_common(&mut self) {
4307 trace!("connection closed");
4308 for &timer in &Timer::VALUES {
4309 self.timers.stop(timer);
4310 }
4311 }
4312
4313 fn set_close_timer(&mut self, now: Instant) {
4314 self.timers
4315 .set(Timer::Close, now + 3 * self.pto(self.highest_space));
4316 }
4317
4318 fn handle_peer_params(&mut self, params: TransportParameters) -> Result<(), TransportError> {
4320 if Some(self.orig_rem_cid) != params.initial_src_cid
4321 || (self.side.is_client()
4322 && (Some(self.initial_dst_cid) != params.original_dst_cid
4323 || self.retry_src_cid != params.retry_src_cid))
4324 {
4325 return Err(TransportError::TRANSPORT_PARAMETER_ERROR(
4326 "CID authentication failure",
4327 ));
4328 }
4329
4330 self.set_peer_params(params);
4331
4332 Ok(())
4333 }
4334
4335 fn set_peer_params(&mut self, params: TransportParameters) {
4336 self.streams.set_params(¶ms);
4337 self.idle_timeout =
4338 negotiate_max_idle_timeout(self.config.max_idle_timeout, Some(params.max_idle_timeout));
4339 trace!("negotiated max idle timeout {:?}", self.idle_timeout);
4340 if let Some(ref info) = params.preferred_address {
4341 self.rem_cids.insert(frame::NewConnectionId {
4342 sequence: 1,
4343 id: info.connection_id,
4344 reset_token: info.stateless_reset_token,
4345 retire_prior_to: 0,
4346 }).expect("preferred address CID is the first received, and hence is guaranteed to be legal");
4347 }
4348 self.ack_frequency.peer_max_ack_delay = get_max_ack_delay(¶ms);
4349
4350 self.negotiate_nat_traversal_capability(¶ms);
4352
4353 let local_has_nat_traversal = self.config.nat_traversal_config.is_some();
4356 let local_supports_rfc = local_has_nat_traversal;
4359 self.nat_traversal_frame_config = frame::nat_traversal_unified::NatTraversalFrameConfig {
4360 use_rfc_format: local_supports_rfc && params.supports_rfc_nat_traversal(),
4362 accept_legacy: true,
4364 };
4365
4366 self.negotiate_address_discovery(¶ms);
4368
4369 self.pqc_state.update_from_peer_params(¶ms);
4371
4372 if self.pqc_state.enabled && self.pqc_state.using_pqc {
4374 trace!("PQC enabled, adjusting MTU discovery for larger handshake packets");
4375 let current_mtu = self.path.mtud.current_mtu();
4379 if current_mtu < self.pqc_state.handshake_mtu {
4380 trace!(
4381 "Current MTU {} is less than PQC handshake MTU {}, will rely on MTU discovery",
4382 current_mtu, self.pqc_state.handshake_mtu
4383 );
4384 }
4385 }
4386
4387 self.peer_params = params;
4388 self.path.mtud.on_peer_max_udp_payload_size_received(
4389 u16::try_from(self.peer_params.max_udp_payload_size.into_inner()).unwrap_or(u16::MAX),
4390 );
4391 }
4392
4393 fn negotiate_nat_traversal_capability(&mut self, params: &TransportParameters) {
4395 let peer_nat_config = match ¶ms.nat_traversal {
4397 Some(config) => config,
4398 None => {
4399 if self.config.nat_traversal_config.is_some() {
4401 debug!(
4402 "Peer does not support NAT traversal, maintaining backward compatibility"
4403 );
4404 self.emit_nat_traversal_capability_event(false);
4405
4406 self.set_nat_traversal_compatibility_mode(false);
4408 }
4409 return;
4410 }
4411 };
4412
4413 let local_nat_config = match &self.config.nat_traversal_config {
4415 Some(config) => config,
4416 None => {
4417 debug!("NAT traversal not enabled locally, ignoring peer support");
4418 self.emit_nat_traversal_capability_event(false);
4419 self.set_nat_traversal_compatibility_mode(false);
4420 return;
4421 }
4422 };
4423
4424 info!("Both peers support NAT traversal, negotiating capabilities");
4426
4427 match self.negotiate_nat_traversal_parameters(local_nat_config, peer_nat_config) {
4429 Ok(negotiated_config) => {
4430 info!("NAT traversal capability negotiated successfully");
4431 self.emit_nat_traversal_capability_event(true);
4432
4433 self.init_nat_traversal_with_negotiated_config(&negotiated_config);
4435
4436 self.set_nat_traversal_compatibility_mode(true);
4438
4439 if matches!(
4441 negotiated_config,
4442 crate::transport_parameters::NatTraversalConfig::ClientSupport
4443 ) {
4444 self.initiate_nat_traversal_process();
4445 }
4446 }
4447 Err(e) => {
4448 warn!("NAT traversal capability negotiation failed: {}", e);
4449 self.emit_nat_traversal_capability_event(false);
4450 self.set_nat_traversal_compatibility_mode(false);
4451 }
4452 }
4453 }
4454
4455 fn emit_nat_traversal_capability_event(&mut self, negotiated: bool) {
4457 if negotiated {
4460 info!("NAT traversal capability successfully negotiated");
4461 } else {
4462 info!("NAT traversal capability not available (peer or local support missing)");
4463 }
4464
4465 }
4468
4469 fn set_nat_traversal_compatibility_mode(&mut self, enabled: bool) {
4471 if enabled {
4472 debug!("NAT traversal enabled for this connection");
4473 } else {
4475 debug!("NAT traversal disabled for this connection (backward compatibility mode)");
4476 if self.nat_traversal.is_some() {
4478 warn!("Clearing NAT traversal state due to compatibility mode");
4479 self.nat_traversal = None;
4480 }
4481 }
4482 }
4483
4484 fn negotiate_nat_traversal_parameters(
4486 &self,
4487 local_config: &crate::transport_parameters::NatTraversalConfig,
4488 peer_config: &crate::transport_parameters::NatTraversalConfig,
4489 ) -> Result<crate::transport_parameters::NatTraversalConfig, String> {
4490 match (local_config, peer_config) {
4495 (
4497 crate::transport_parameters::NatTraversalConfig::ClientSupport,
4498 crate::transport_parameters::NatTraversalConfig::ServerSupport {
4499 concurrency_limit,
4500 },
4501 ) => Ok(
4502 crate::transport_parameters::NatTraversalConfig::ServerSupport {
4503 concurrency_limit: *concurrency_limit,
4504 },
4505 ),
4506 (
4508 crate::transport_parameters::NatTraversalConfig::ServerSupport {
4509 concurrency_limit,
4510 },
4511 crate::transport_parameters::NatTraversalConfig::ClientSupport,
4512 ) => Ok(
4513 crate::transport_parameters::NatTraversalConfig::ServerSupport {
4514 concurrency_limit: *concurrency_limit,
4515 },
4516 ),
4517 (
4519 crate::transport_parameters::NatTraversalConfig::ServerSupport {
4520 concurrency_limit: limit1,
4521 },
4522 crate::transport_parameters::NatTraversalConfig::ServerSupport {
4523 concurrency_limit: limit2,
4524 },
4525 ) => Ok(
4526 crate::transport_parameters::NatTraversalConfig::ServerSupport {
4527 concurrency_limit: (*limit1).min(*limit2),
4528 },
4529 ),
4530 (
4532 crate::transport_parameters::NatTraversalConfig::ClientSupport,
4533 crate::transport_parameters::NatTraversalConfig::ClientSupport,
4534 ) => Err("Both endpoints claim to be NAT traversal clients".to_string()),
4535 }
4536 }
4537
4538 fn init_nat_traversal_with_negotiated_config(
4543 &mut self,
4544 _config: &crate::transport_parameters::NatTraversalConfig,
4545 ) {
4546 let max_candidates = 50; let coordination_timeout = Duration::from_secs(10); self.nat_traversal = Some(NatTraversalState::new(max_candidates, coordination_timeout));
4553
4554 trace!("NAT traversal initialized for symmetric P2P node");
4555
4556 self.prepare_address_observation();
4559 self.schedule_candidate_discovery();
4560 self.prepare_coordination_handling();
4561 }
4562
4563 fn initiate_nat_traversal_process(&mut self) {
4565 if let Some(nat_state) = &mut self.nat_traversal {
4566 match nat_state.start_candidate_discovery() {
4567 Ok(()) => {
4568 debug!("NAT traversal process initiated - candidate discovery started");
4569 self.timers.set(
4571 Timer::NatTraversal,
4572 Instant::now() + Duration::from_millis(100),
4573 );
4574 }
4575 Err(e) => {
4576 warn!("Failed to initiate NAT traversal process: {}", e);
4577 }
4578 }
4579 }
4580 }
4581
4582 fn prepare_address_observation(&mut self) {
4584 debug!("Preparing for address observation as bootstrap node");
4585 }
4588
4589 fn schedule_candidate_discovery(&mut self) {
4591 debug!("Scheduling candidate discovery for client endpoint");
4592 self.timers.set(
4594 Timer::NatTraversal,
4595 Instant::now() + Duration::from_millis(50),
4596 );
4597 }
4598
4599 fn prepare_coordination_handling(&mut self) {
4601 debug!("Preparing to handle coordination requests as server endpoint");
4602 }
4605
4606 fn handle_nat_traversal_timeout(&mut self, now: Instant) {
4608 let timeout_result = if let Some(nat_state) = &mut self.nat_traversal {
4610 nat_state.handle_timeout(now)
4611 } else {
4612 return;
4613 };
4614
4615 match timeout_result {
4617 Ok(actions) => {
4618 for action in actions {
4619 match action {
4620 nat_traversal::TimeoutAction::RetryDiscovery => {
4621 debug!("NAT traversal timeout: retrying candidate discovery");
4622 if let Some(nat_state) = &mut self.nat_traversal {
4623 if let Err(e) = nat_state.start_candidate_discovery() {
4624 warn!("Failed to retry candidate discovery: {}", e);
4625 }
4626 }
4627 }
4628 nat_traversal::TimeoutAction::RetryCoordination => {
4629 debug!("NAT traversal timeout: retrying coordination");
4630 self.timers
4632 .set(Timer::NatTraversal, now + Duration::from_secs(2));
4633 }
4634 nat_traversal::TimeoutAction::StartValidation => {
4635 debug!("NAT traversal timeout: starting path validation");
4636 self.start_nat_traversal_validation(now);
4637 }
4638 nat_traversal::TimeoutAction::Complete => {
4639 debug!("NAT traversal completed successfully");
4640 self.timers.stop(Timer::NatTraversal);
4642 }
4643 nat_traversal::TimeoutAction::Failed => {
4644 warn!("NAT traversal failed after timeout");
4645 self.handle_nat_traversal_failure();
4647 }
4648 }
4649 }
4650 }
4651 Err(e) => {
4652 warn!("NAT traversal timeout handling failed: {}", e);
4653 self.handle_nat_traversal_failure();
4654 }
4655 }
4656 }
4657
4658 fn start_nat_traversal_validation(&mut self, now: Instant) {
4660 if let Some(nat_state) = &mut self.nat_traversal {
4661 let pairs = nat_state.get_next_validation_pairs(3);
4663
4664 for pair in pairs {
4665 let challenge = self.rng.r#gen();
4667 self.path.challenge = Some(challenge);
4668 self.path.challenge_pending = true;
4669
4670 debug!(
4671 "Starting path validation for NAT traversal candidate: {}",
4672 pair.remote_addr
4673 );
4674 }
4675
4676 self.timers
4678 .set(Timer::PathValidation, now + Duration::from_secs(3));
4679 }
4680 }
4681
4682 fn handle_nat_traversal_failure(&mut self) {
4684 warn!("NAT traversal failed, considering fallback options");
4685
4686 self.nat_traversal = None;
4688 self.timers.stop(Timer::NatTraversal);
4689
4690 debug!("NAT traversal disabled for this connection due to failure");
4697 }
4698
4699 pub fn nat_traversal_supported(&self) -> bool {
4701 self.nat_traversal.is_some()
4702 && self.config.nat_traversal_config.is_some()
4703 && self.peer_params.nat_traversal.is_some()
4704 }
4705
4706 pub fn nat_traversal_config(&self) -> Option<&crate::transport_parameters::NatTraversalConfig> {
4708 self.peer_params.nat_traversal.as_ref()
4709 }
4710
4711 pub fn nat_traversal_ready(&self) -> bool {
4713 self.nat_traversal_supported() && matches!(self.state, State::Established)
4714 }
4715
4716 #[allow(dead_code)]
4721 pub(crate) fn nat_traversal_stats(&self) -> Option<nat_traversal::NatTraversalStats> {
4722 self.nat_traversal.as_ref().map(|state| state.stats.clone())
4723 }
4724
4725 #[cfg(test)]
4729 #[allow(dead_code)]
4730 pub(crate) fn force_enable_nat_traversal(&mut self) {
4731 use crate::transport_parameters::NatTraversalConfig;
4732
4733 let config = NatTraversalConfig::ServerSupport {
4735 concurrency_limit: VarInt::from_u32(5),
4736 };
4737
4738 self.peer_params.nat_traversal = Some(config.clone());
4739 self.config = Arc::new({
4740 let mut transport_config = (*self.config).clone();
4741 transport_config.nat_traversal_config = Some(config);
4742 transport_config
4743 });
4744
4745 self.nat_traversal = Some(NatTraversalState::new(8, Duration::from_secs(10)));
4747 }
4748
4749 fn handle_add_address(
4751 &mut self,
4752 add_address: &crate::frame::AddAddress,
4753 now: Instant,
4754 ) -> Result<(), TransportError> {
4755 let nat_state = self.nat_traversal.as_mut().ok_or_else(|| {
4756 TransportError::PROTOCOL_VIOLATION("AddAddress frame without NAT traversal negotiation")
4757 })?;
4758
4759 let normalized_addr = crate::shared::normalize_socket_addr(add_address.address);
4762
4763 info!(
4764 "handle_add_address: RECEIVED ADD_ADDRESS from peer addr={} (normalized={}) seq={} priority={}",
4765 add_address.address, normalized_addr, add_address.sequence, add_address.priority
4766 );
4767
4768 match nat_state.add_remote_candidate(
4769 add_address.sequence,
4770 normalized_addr,
4771 add_address.priority,
4772 now,
4773 ) {
4774 Ok(()) => {
4775 info!(
4776 "Added remote candidate: {} (seq={}, priority={})",
4777 normalized_addr, add_address.sequence, add_address.priority
4778 );
4779
4780 self.endpoint_events.push_back(
4782 crate::shared::EndpointEventInner::PeerAddressAdvertised {
4783 peer_addr: self.path.remote,
4784 advertised_addr: normalized_addr,
4785 },
4786 );
4787
4788 self.trigger_candidate_validation(normalized_addr, now)?;
4790 Ok(())
4791 }
4792 Err(NatTraversalError::TooManyCandidates) => Err(TransportError::PROTOCOL_VIOLATION(
4793 "too many NAT traversal candidates",
4794 )),
4795 Err(NatTraversalError::DuplicateAddress) => {
4796 Ok(())
4798 }
4799 Err(e) => {
4800 warn!("Failed to add remote candidate: {}", e);
4801 Ok(()) }
4803 }
4804 }
4805
4806 fn handle_punch_me_now(
4810 &mut self,
4811 punch_me_now: &crate::frame::PunchMeNow,
4812 now: Instant,
4813 ) -> Result<(), TransportError> {
4814 trace!(
4815 "Received PunchMeNow: round={}, target_seq={}, local_addr={}",
4816 punch_me_now.round, punch_me_now.paired_with_sequence_number, punch_me_now.address
4817 );
4818
4819 let nat_state = self.nat_traversal.as_mut().ok_or_else(|| {
4821 TransportError::PROTOCOL_VIOLATION("PunchMeNow frame without NAT traversal negotiation")
4822 })?;
4823
4824 let target = nat_traversal::PunchTarget {
4827 remote_addr: punch_me_now.address,
4828 remote_sequence: punch_me_now.paired_with_sequence_number,
4829 challenge: self.rng.r#gen(),
4830 };
4831
4832 if let Err(_e) =
4833 nat_state.prime_passive_coordination_target(punch_me_now.round, target, now)
4834 {
4835 debug!(
4836 "Failed to prime passive coordination for round {}",
4837 punch_me_now.round
4838 );
4839 } else {
4840 trace!(
4841 "Passive coordination primed for round {}",
4842 punch_me_now.round
4843 );
4844 }
4845
4846 Ok(())
4847 }
4848
4849 fn handle_remove_address(
4851 &mut self,
4852 remove_address: &crate::frame::RemoveAddress,
4853 ) -> Result<(), TransportError> {
4854 let nat_state = self.nat_traversal.as_mut().ok_or_else(|| {
4855 TransportError::PROTOCOL_VIOLATION(
4856 "RemoveAddress frame without NAT traversal negotiation",
4857 )
4858 })?;
4859
4860 if nat_state.remove_candidate(remove_address.sequence) {
4861 trace!(
4862 "Removed candidate with sequence {}",
4863 remove_address.sequence
4864 );
4865 } else {
4866 trace!(
4867 "Attempted to remove unknown candidate sequence {}",
4868 remove_address.sequence
4869 );
4870 }
4871
4872 Ok(())
4873 }
4874
4875 fn handle_observed_address_frame(
4877 &mut self,
4878 observed_address: &crate::frame::ObservedAddress,
4879 now: Instant,
4880 ) -> Result<(), TransportError> {
4881 tracing::info!(
4882 address = %observed_address.address,
4883 sequence = %observed_address.sequence_number,
4884 from_peer = %self.peer_id_for_tokens.map(|pid| format!("{pid}")).unwrap_or_else(|| "unknown".to_string()),
4885 "handle_observed_address_frame: RECEIVED OBSERVED_ADDRESS from peer"
4886 );
4887 let state = self.address_discovery_state.as_mut().ok_or_else(|| {
4889 TransportError::PROTOCOL_VIOLATION(
4890 "ObservedAddress frame without address discovery negotiation",
4891 )
4892 })?;
4893
4894 if !state.enabled {
4896 return Err(TransportError::PROTOCOL_VIOLATION(
4897 "ObservedAddress frame received when address discovery is disabled",
4898 ));
4899 }
4900
4901 #[cfg(feature = "trace")]
4903 {
4904 use crate::trace_observed_address_received;
4905 let peer_bytes = self
4906 .peer_id_for_tokens
4907 .as_ref()
4908 .map(|pid| pid.0)
4909 .unwrap_or([0u8; 32]);
4910 trace_observed_address_received!(
4911 &self.event_log,
4912 self.trace_context.trace_id(),
4913 observed_address.address,
4914 0u64, peer_bytes
4916 );
4917 }
4918
4919 let path_id = 0u64; if let Some(&last_seq) = state.last_received_sequence.get(&path_id) {
4927 if observed_address.sequence_number <= last_seq {
4928 trace!(
4929 "Ignoring OBSERVED_ADDRESS frame with stale sequence number {} (last was {})",
4930 observed_address.sequence_number, last_seq
4931 );
4932 return Ok(());
4933 }
4934 }
4935
4936 state
4938 .last_received_sequence
4939 .insert(path_id, observed_address.sequence_number);
4940
4941 let normalized_addr = crate::shared::normalize_socket_addr(observed_address.address);
4944
4945 state.handle_observed_address(normalized_addr, path_id, now);
4947
4948 self.path.update_observed_address(normalized_addr, now);
4950
4951 trace!(
4953 "Received ObservedAddress frame: address={} for path={}",
4954 observed_address.address, path_id
4955 );
4956
4957 Ok(())
4958 }
4959
4960 fn handle_try_connect_to(
4965 &mut self,
4966 try_connect_to: &crate::frame::TryConnectTo,
4967 now: Instant,
4968 ) -> Result<(), TransportError> {
4969 trace!(
4970 "Received TryConnectTo: request_id={}, target={}, timeout_ms={}",
4971 try_connect_to.request_id, try_connect_to.target_address, try_connect_to.timeout_ms
4972 );
4973
4974 let target = try_connect_to.target_address;
4976
4977 let allow_loopback = allow_loopback_from_env();
4979 if target.ip().is_loopback() && !allow_loopback {
4980 warn!(
4981 "Rejecting TryConnectTo request to loopback address: {}",
4982 target
4983 );
4984 let response = crate::frame::TryConnectToResponse {
4986 request_id: try_connect_to.request_id,
4987 success: false,
4988 error_code: Some(crate::frame::TryConnectError::InvalidAddress),
4989 source_address: self.path.remote,
4990 };
4991 self.spaces[SpaceId::Data]
4992 .pending
4993 .try_connect_to_responses
4994 .push(response);
4995 return Ok(());
4996 }
4997
4998 if target.ip().is_unspecified() {
5000 warn!(
5001 "Rejecting TryConnectTo request to unspecified address: {}",
5002 target
5003 );
5004 let response = crate::frame::TryConnectToResponse {
5005 request_id: try_connect_to.request_id,
5006 success: false,
5007 error_code: Some(crate::frame::TryConnectError::InvalidAddress),
5008 source_address: self.path.remote,
5009 };
5010 self.spaces[SpaceId::Data]
5011 .pending
5012 .try_connect_to_responses
5013 .push(response);
5014 return Ok(());
5015 }
5016
5017 self.endpoint_events
5020 .push_back(EndpointEventInner::TryConnectTo {
5021 request_id: try_connect_to.request_id,
5022 target_address: try_connect_to.target_address,
5023 timeout_ms: try_connect_to.timeout_ms,
5024 requester_connection: self.path.remote,
5025 requested_at: now,
5026 });
5027
5028 trace!(
5029 "Queued TryConnectTo attempt for request_id={}",
5030 try_connect_to.request_id
5031 );
5032
5033 Ok(())
5034 }
5035
5036 fn handle_try_connect_to_response(
5038 &mut self,
5039 response: &crate::frame::TryConnectToResponse,
5040 ) -> Result<(), TransportError> {
5041 trace!(
5042 "Received TryConnectToResponse: request_id={}, success={}, error={:?}, source={}",
5043 response.request_id, response.success, response.error_code, response.source_address
5044 );
5045
5046 if response.success {
5049 debug!(
5050 "TryConnectTo succeeded: target can receive connections from {}",
5051 response.source_address
5052 );
5053
5054 if let Some(nat_state) = &mut self.nat_traversal {
5056 nat_state
5057 .record_successful_callback_probe(response.request_id, response.source_address);
5058 }
5059 } else {
5060 debug!("TryConnectTo failed with error: {:?}", response.error_code);
5061
5062 if let Some(nat_state) = &mut self.nat_traversal {
5064 nat_state.record_failed_callback_probe(response.request_id, response.error_code);
5065 }
5066 }
5067
5068 Ok(())
5069 }
5070
5071 pub fn queue_add_address(&mut self, sequence: VarInt, address: SocketAddr, priority: VarInt) {
5073 let add_address = frame::AddAddress {
5075 sequence,
5076 address,
5077 priority,
5078 };
5079
5080 self.spaces[SpaceId::Data]
5081 .pending
5082 .add_addresses
5083 .push(add_address);
5084 trace!(
5085 "Queued AddAddress frame: seq={}, addr={}, priority={}",
5086 sequence, address, priority
5087 );
5088 }
5089
5090 pub fn queue_punch_me_now(
5092 &mut self,
5093 round: VarInt,
5094 paired_with_sequence_number: VarInt,
5095 address: SocketAddr,
5096 ) {
5097 self.queue_punch_me_now_with_target(round, paired_with_sequence_number, address, None);
5098 }
5099
5100 pub fn queue_punch_me_now_with_target(
5112 &mut self,
5113 round: VarInt,
5114 paired_with_sequence_number: VarInt,
5115 address: SocketAddr,
5116 target_peer_id: Option<[u8; 32]>,
5117 ) {
5118 let punch_me_now = frame::PunchMeNow {
5119 round,
5120 paired_with_sequence_number,
5121 address,
5122 target_peer_id,
5123 };
5124
5125 self.spaces[SpaceId::Data]
5126 .pending
5127 .punch_me_now
5128 .push(punch_me_now);
5129
5130 if target_peer_id.is_some() {
5131 trace!(
5132 "Queued PunchMeNow frame for relay: round={}, target_seq={}, target_peer={:?}",
5133 round,
5134 paired_with_sequence_number,
5135 target_peer_id.map(|p| hex::encode(&p[..8]))
5136 );
5137 } else {
5138 trace!(
5139 "Queued PunchMeNow frame: round={}, target={}",
5140 round, paired_with_sequence_number
5141 );
5142 }
5143 }
5144
5145 pub fn queue_remove_address(&mut self, sequence: VarInt) {
5147 let remove_address = frame::RemoveAddress { sequence };
5148
5149 self.spaces[SpaceId::Data]
5150 .pending
5151 .remove_addresses
5152 .push(remove_address);
5153 trace!("Queued RemoveAddress frame: seq={}", sequence);
5154 }
5155
5156 pub fn queue_observed_address(&mut self, address: SocketAddr) {
5158 let sequence_number = if let Some(state) = &mut self.address_discovery_state {
5160 let seq = state.next_sequence_number;
5161 state.next_sequence_number =
5162 VarInt::from_u64(state.next_sequence_number.into_inner() + 1)
5163 .expect("sequence number overflow");
5164 seq
5165 } else {
5166 VarInt::from_u32(0)
5168 };
5169
5170 let observed_address = frame::ObservedAddress {
5171 sequence_number,
5172 address,
5173 };
5174 self.spaces[SpaceId::Data]
5175 .pending
5176 .outbound_observations
5177 .push(observed_address);
5178 trace!("Queued ObservedAddress frame: addr={}", address);
5179 }
5180
5181 pub fn check_for_address_observations(&mut self, now: Instant) {
5183 let Some(state) = &mut self.address_discovery_state else {
5185 return;
5186 };
5187
5188 if !state.enabled {
5190 return;
5191 }
5192
5193 if self.peer_params.address_discovery.is_none() {
5196 return;
5197 }
5198
5199 let path_id = 0u64; let remote_address = self.path.remote;
5204
5205 if state.should_send_observation(path_id, now) {
5207 if let Some(frame) = state.queue_observed_address_frame(path_id, remote_address) {
5209 self.spaces[SpaceId::Data]
5211 .pending
5212 .outbound_observations
5213 .push(frame);
5214
5215 state.record_observation_sent(path_id);
5217
5218 #[cfg(feature = "trace")]
5220 {
5221 use crate::trace_observed_address_sent;
5222 trace_observed_address_sent!(
5224 &self.event_log,
5225 self.trace_context.trace_id(),
5226 remote_address,
5227 path_id
5228 );
5229 }
5230
5231 trace!(
5232 "Queued OBSERVED_ADDRESS frame for path {} with address {}",
5233 path_id, remote_address
5234 );
5235 }
5236 }
5237 }
5238
5239 fn trigger_candidate_validation(
5241 &mut self,
5242 candidate_address: SocketAddr,
5243 now: Instant,
5244 ) -> Result<(), TransportError> {
5245 let nat_state = self
5246 .nat_traversal
5247 .as_mut()
5248 .ok_or_else(|| TransportError::PROTOCOL_VIOLATION("NAT traversal not enabled"))?;
5249
5250 let already_validating = nat_state.active_validations.values().any(|v| {
5253 crate::shared::normalize_socket_addr(v.target_addr)
5254 == crate::shared::normalize_socket_addr(candidate_address)
5255 });
5256 if already_validating {
5257 trace!("Validation already in progress for {}", candidate_address);
5258 return Ok(());
5259 }
5260
5261 let sequence = nat_state
5263 .remote_candidates
5264 .iter()
5265 .find(|(_, c)| {
5266 crate::shared::normalize_socket_addr(c.address)
5267 == crate::shared::normalize_socket_addr(candidate_address)
5268 })
5269 .map(|(seq, _)| *seq)
5270 .unwrap_or(crate::VarInt::from_u32(0));
5271
5272 let challenge = self.rng.r#gen::<u64>();
5274
5275 let validation_state = nat_traversal::PathValidationState {
5277 challenge,
5278 sequence,
5279 target_addr: candidate_address,
5280 sent_at: now,
5281 retry_count: 0,
5282 max_retries: 3,
5283 coordination_round: None,
5284 timeout_state: nat_traversal::AdaptiveTimeoutState::new(),
5285 last_retry_at: None,
5286 };
5287
5288 nat_state
5290 .active_validations
5291 .insert(challenge, validation_state);
5292
5293 nat_state.stats.validations_succeeded += 1; trace!(
5299 "Triggered PATH_CHALLENGE validation for {} with challenge {:016x}",
5300 candidate_address, challenge
5301 );
5302
5303 Ok(())
5304 }
5305
5306 pub fn nat_traversal_state(&self) -> Option<(usize, usize)> {
5311 self.nat_traversal
5312 .as_ref()
5313 .map(|state| (state.local_candidates.len(), state.remote_candidates.len()))
5314 }
5315
5316 pub fn initiate_nat_traversal_coordination(
5318 &mut self,
5319 now: Instant,
5320 ) -> Result<(), TransportError> {
5321 let nat_state = self
5322 .nat_traversal
5323 .as_mut()
5324 .ok_or_else(|| TransportError::PROTOCOL_VIOLATION("NAT traversal not enabled"))?;
5325
5326 if nat_state.should_send_punch_request() {
5328 nat_state.generate_candidate_pairs(now);
5330
5331 let pairs = nat_state.get_next_validation_pairs(3);
5333 if pairs.is_empty() {
5334 return Err(TransportError::PROTOCOL_VIOLATION(
5335 "No candidate pairs for coordination",
5336 ));
5337 }
5338
5339 let targets: Vec<_> = pairs
5341 .into_iter()
5342 .map(|pair| nat_traversal::PunchTarget {
5343 remote_addr: pair.remote_addr,
5344 remote_sequence: pair.remote_sequence,
5345 challenge: self.rng.r#gen(),
5346 })
5347 .collect();
5348
5349 let round = nat_state
5351 .start_coordination_round(targets, now)
5352 .map_err(|_e| {
5353 TransportError::PROTOCOL_VIOLATION("Failed to start coordination round")
5354 })?;
5355
5356 let local_addr = self
5359 .local_ip
5360 .map(|ip| SocketAddr::new(ip, self.local_ip.map(|_| 0).unwrap_or(0)))
5361 .unwrap_or_else(|| {
5362 SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED), 0)
5363 });
5364
5365 let punch_me_now = frame::PunchMeNow {
5366 round,
5367 paired_with_sequence_number: VarInt::from_u32(0), address: local_addr,
5369 target_peer_id: None, };
5371
5372 self.spaces[SpaceId::Data]
5373 .pending
5374 .punch_me_now
5375 .push(punch_me_now);
5376 nat_state.mark_punch_request_sent();
5377
5378 trace!("Initiated NAT traversal coordination round {}", round);
5379 }
5380
5381 Ok(())
5382 }
5383
5384 pub fn validate_nat_candidates(&mut self, now: Instant) {
5386 self.generate_nat_traversal_challenges(now);
5387 }
5388
5389 pub fn send_nat_address_advertisement(
5404 &mut self,
5405 address: SocketAddr,
5406 priority: u32,
5407 ) -> Result<u64, ConnectionError> {
5408 let normalized_addr = crate::shared::normalize_socket_addr(address);
5411
5412 if !is_valid_nat_advertisement_address(normalized_addr) {
5413 debug!(
5414 "Skipping NAT address advertisement for invalid candidate {}",
5415 normalized_addr
5416 );
5417 return Err(ConnectionError::TransportError(
5418 TransportError::PROTOCOL_VIOLATION("invalid NAT candidate address"),
5419 ));
5420 }
5421
5422 let nat_state = self.nat_traversal.as_mut().ok_or_else(|| {
5424 ConnectionError::TransportError(TransportError::PROTOCOL_VIOLATION(
5425 "NAT traversal not enabled on this connection",
5426 ))
5427 })?;
5428
5429 let sequence = nat_state.next_sequence;
5431 nat_state.next_sequence =
5432 VarInt::from_u64(nat_state.next_sequence.into_inner() + 1).unwrap();
5433
5434 let now = Instant::now();
5436 nat_state.local_candidates.insert(
5437 sequence,
5438 nat_traversal::AddressCandidate {
5439 address: normalized_addr,
5440 priority,
5441 source: nat_traversal::CandidateSource::Local,
5442 discovered_at: now,
5443 state: nat_traversal::CandidateState::New,
5444 attempt_count: 0,
5445 last_attempt: None,
5446 },
5447 );
5448
5449 nat_state.stats.local_candidates_sent += 1;
5451
5452 self.queue_add_address(sequence, normalized_addr, VarInt::from_u32(priority));
5454
5455 debug!(
5456 "Queued ADD_ADDRESS frame: addr={} (normalized from {}), priority={}, seq={}",
5457 normalized_addr, address, priority, sequence
5458 );
5459 Ok(sequence.into_inner())
5460 }
5461
5462 pub fn send_nat_punch_coordination(
5475 &mut self,
5476 paired_with_sequence_number: u64,
5477 address: SocketAddr,
5478 round: u32,
5479 ) -> Result<(), ConnectionError> {
5480 let _nat_state = self.nat_traversal.as_ref().ok_or_else(|| {
5482 ConnectionError::TransportError(TransportError::PROTOCOL_VIOLATION(
5483 "NAT traversal not enabled on this connection",
5484 ))
5485 })?;
5486
5487 self.queue_punch_me_now(
5489 VarInt::from_u32(round),
5490 VarInt::from_u64(paired_with_sequence_number).map_err(|_| {
5491 ConnectionError::TransportError(TransportError::PROTOCOL_VIOLATION(
5492 "Invalid target sequence number",
5493 ))
5494 })?,
5495 address,
5496 );
5497
5498 debug!(
5499 "Queued PUNCH_ME_NOW frame: paired_with_seq={}, addr={}, round={}",
5500 paired_with_sequence_number, address, round
5501 );
5502 Ok(())
5503 }
5504
5505 pub fn send_nat_punch_via_relay(
5519 &mut self,
5520 target_peer_id: [u8; 32],
5521 our_address: SocketAddr,
5522 round: u32,
5523 ) -> Result<(), ConnectionError> {
5524 let _nat_state = self.nat_traversal.as_ref().ok_or_else(|| {
5526 ConnectionError::TransportError(TransportError::PROTOCOL_VIOLATION(
5527 "NAT traversal not enabled on this connection",
5528 ))
5529 })?;
5530
5531 self.queue_punch_me_now_with_target(
5533 VarInt::from_u32(round),
5534 VarInt::from_u32(0), our_address,
5536 Some(target_peer_id),
5537 );
5538
5539 info!(
5540 "Queued PUNCH_ME_NOW for relay: target_peer={}, our_addr={}, round={}",
5541 hex::encode(&target_peer_id[..8]),
5542 our_address,
5543 round
5544 );
5545 Ok(())
5546 }
5547
5548 pub fn send_nat_address_removal(&mut self, sequence: u64) -> Result<(), ConnectionError> {
5559 let nat_state = self.nat_traversal.as_mut().ok_or_else(|| {
5561 ConnectionError::TransportError(TransportError::PROTOCOL_VIOLATION(
5562 "NAT traversal not enabled on this connection",
5563 ))
5564 })?;
5565
5566 let sequence_varint = VarInt::from_u64(sequence).map_err(|_| {
5567 ConnectionError::TransportError(TransportError::PROTOCOL_VIOLATION(
5568 "Invalid sequence number",
5569 ))
5570 })?;
5571
5572 nat_state.local_candidates.remove(&sequence_varint);
5574
5575 self.queue_remove_address(sequence_varint);
5577
5578 debug!("Queued REMOVE_ADDRESS frame: seq={}", sequence);
5579 Ok(())
5580 }
5581
5582 #[allow(dead_code)]
5591 pub(crate) fn get_nat_traversal_stats(&self) -> Option<&nat_traversal::NatTraversalStats> {
5592 self.nat_traversal.as_ref().map(|state| &state.stats)
5593 }
5594
5595 pub fn is_nat_traversal_enabled(&self) -> bool {
5597 self.nat_traversal.is_some()
5598 }
5599
5600 fn negotiate_address_discovery(&mut self, peer_params: &TransportParameters) {
5604 let now = Instant::now();
5605
5606 info!(
5607 "negotiate_address_discovery: peer_params.address_discovery = {:?}",
5608 peer_params.address_discovery
5609 );
5610
5611 match &peer_params.address_discovery {
5613 Some(peer_config) => {
5614 info!("Peer supports address discovery: {:?}", peer_config);
5616 if let Some(state) = &mut self.address_discovery_state {
5617 if state.enabled {
5618 info!(
5621 "Address discovery negotiated successfully: rate={}, all_paths={}",
5622 state.max_observation_rate, state.observe_all_paths
5623 );
5624 } else {
5625 info!("Address discovery disabled locally, ignoring peer support");
5627 }
5628 } else {
5629 self.address_discovery_state =
5631 Some(AddressDiscoveryState::new(peer_config, now));
5632 info!("Address discovery initialized from peer config");
5633 }
5634 }
5635 _ => {
5636 warn!("Peer does NOT support address discovery (transport parameter not present)");
5638 if let Some(state) = &mut self.address_discovery_state {
5639 state.enabled = false;
5640 }
5641 }
5642 }
5643
5644 if let Some(state) = &self.address_discovery_state {
5646 if state.enabled {
5647 self.path.set_observation_rate(state.max_observation_rate);
5648 }
5649 }
5650 }
5651
5652 fn decrypt_packet(
5653 &mut self,
5654 now: Instant,
5655 packet: &mut Packet,
5656 ) -> Result<Option<u64>, Option<TransportError>> {
5657 let result = packet_crypto::decrypt_packet_body(
5658 packet,
5659 &self.spaces,
5660 self.zero_rtt_crypto.as_ref(),
5661 self.key_phase,
5662 self.prev_crypto.as_ref(),
5663 self.next_crypto.as_ref(),
5664 )?;
5665
5666 let result = match result {
5667 Some(r) => r,
5668 None => return Ok(None),
5669 };
5670
5671 if result.outgoing_key_update_acked {
5672 if let Some(prev) = self.prev_crypto.as_mut() {
5673 prev.end_packet = Some((result.number, now));
5674 self.set_key_discard_timer(now, packet.header.space());
5675 }
5676 }
5677
5678 if result.incoming_key_update {
5679 trace!("key update authenticated");
5680 self.update_keys(Some((result.number, now)), true);
5681 self.set_key_discard_timer(now, packet.header.space());
5682 }
5683
5684 Ok(Some(result.number))
5685 }
5686
5687 fn update_keys(&mut self, end_packet: Option<(u64, Instant)>, remote: bool) {
5688 trace!("executing key update");
5689 let new = self
5693 .crypto
5694 .next_1rtt_keys()
5695 .expect("only called for `Data` packets");
5696 self.key_phase_size = new
5697 .local
5698 .confidentiality_limit()
5699 .saturating_sub(KEY_UPDATE_MARGIN);
5700 let old = mem::replace(
5701 &mut self.spaces[SpaceId::Data]
5702 .crypto
5703 .as_mut()
5704 .unwrap() .packet,
5706 mem::replace(self.next_crypto.as_mut().unwrap(), new),
5707 );
5708 self.spaces[SpaceId::Data].sent_with_keys = 0;
5709 self.prev_crypto = Some(PrevCrypto {
5710 crypto: old,
5711 end_packet,
5712 update_unacked: remote,
5713 });
5714 self.key_phase = !self.key_phase;
5715 }
5716
5717 fn peer_supports_ack_frequency(&self) -> bool {
5718 self.peer_params.min_ack_delay.is_some()
5719 }
5720
5721 pub(crate) fn immediate_ack(&mut self) {
5726 self.spaces[self.highest_space].immediate_ack_pending = true;
5727 }
5728
5729 #[cfg(test)]
5731 #[allow(dead_code)]
5732 pub(crate) fn decode_packet(&self, event: &ConnectionEvent) -> Option<Vec<u8>> {
5733 let (first_decode, remaining) = match &event.0 {
5734 ConnectionEventInner::Datagram(DatagramConnectionEvent {
5735 first_decode,
5736 remaining,
5737 ..
5738 }) => (first_decode, remaining),
5739 _ => return None,
5740 };
5741
5742 if remaining.is_some() {
5743 panic!("Packets should never be coalesced in tests");
5744 }
5745
5746 let decrypted_header = packet_crypto::unprotect_header(
5747 first_decode.clone(),
5748 &self.spaces,
5749 self.zero_rtt_crypto.as_ref(),
5750 self.peer_params.stateless_reset_token,
5751 )?;
5752
5753 let mut packet = decrypted_header.packet?;
5754 packet_crypto::decrypt_packet_body(
5755 &mut packet,
5756 &self.spaces,
5757 self.zero_rtt_crypto.as_ref(),
5758 self.key_phase,
5759 self.prev_crypto.as_ref(),
5760 self.next_crypto.as_ref(),
5761 )
5762 .ok()?;
5763
5764 Some(packet.payload.to_vec())
5765 }
5766
5767 #[cfg(test)]
5770 #[allow(dead_code)]
5771 pub(crate) fn bytes_in_flight(&self) -> u64 {
5772 self.path.in_flight.bytes
5773 }
5774
5775 #[cfg(test)]
5777 #[allow(dead_code)]
5778 pub(crate) fn congestion_window(&self) -> u64 {
5779 self.path
5780 .congestion
5781 .window()
5782 .saturating_sub(self.path.in_flight.bytes)
5783 }
5784
5785 #[cfg(test)]
5787 #[allow(dead_code)]
5788 pub(crate) fn is_idle(&self) -> bool {
5789 Timer::VALUES
5790 .iter()
5791 .filter(|&&t| !matches!(t, Timer::KeepAlive | Timer::PushNewCid | Timer::KeyDiscard))
5792 .filter_map(|&t| Some((t, self.timers.get(t)?)))
5793 .min_by_key(|&(_, time)| time)
5794 .is_none_or(|(timer, _)| timer == Timer::Idle)
5795 }
5796
5797 #[cfg(test)]
5799 #[allow(dead_code)]
5800 pub(crate) fn lost_packets(&self) -> u64 {
5801 self.lost_packets
5802 }
5803
5804 #[cfg(test)]
5806 #[allow(dead_code)]
5807 pub(crate) fn using_ecn(&self) -> bool {
5808 self.path.sending_ecn
5809 }
5810
5811 #[cfg(test)]
5813 #[allow(dead_code)]
5814 pub(crate) fn total_recvd(&self) -> u64 {
5815 self.path.total_recvd
5816 }
5817
5818 #[cfg(test)]
5819 #[allow(dead_code)]
5820 pub(crate) fn active_local_cid_seq(&self) -> (u64, u64) {
5821 self.local_cid_state.active_seq()
5822 }
5823
5824 #[cfg(test)]
5827 #[allow(dead_code)]
5828 pub(crate) fn rotate_local_cid(&mut self, v: u64, now: Instant) {
5829 let n = self.local_cid_state.assign_retire_seq(v);
5830 self.endpoint_events
5831 .push_back(EndpointEventInner::NeedIdentifiers(now, n));
5832 }
5833
5834 #[cfg(test)]
5836 #[allow(dead_code)]
5837 pub(crate) fn active_rem_cid_seq(&self) -> u64 {
5838 self.rem_cids.active_seq()
5839 }
5840
5841 #[cfg(test)]
5843 #[cfg(test)]
5844 #[allow(dead_code)]
5845 pub(crate) fn path_mtu(&self) -> u16 {
5846 self.path.current_mtu()
5847 }
5848
5849 fn can_send_1rtt(&self, max_size: usize) -> bool {
5853 self.streams.can_send_stream_data()
5854 || self.path.challenge_pending
5855 || self
5856 .prev_path
5857 .as_ref()
5858 .is_some_and(|(_, x)| x.challenge_pending)
5859 || !self.path_responses.is_empty()
5860 || self
5861 .datagrams
5862 .outgoing
5863 .front()
5864 .is_some_and(|x| x.size(true) <= max_size)
5865 }
5866
5867 fn remove_in_flight(&mut self, pn: u64, packet: &SentPacket) {
5869 for path in [&mut self.path]
5871 .into_iter()
5872 .chain(self.prev_path.as_mut().map(|(_, data)| data))
5873 {
5874 if path.remove_in_flight(pn, packet) {
5875 return;
5876 }
5877 }
5878 }
5879
5880 fn kill(&mut self, reason: ConnectionError) {
5882 self.close_common();
5883 self.error = Some(reason);
5884 self.state = State::Drained;
5885 self.endpoint_events.push_back(EndpointEventInner::Drained);
5886 }
5887
5888 fn generate_nat_traversal_challenges(&mut self, now: Instant) {
5890 let candidates: Vec<(VarInt, SocketAddr)> = if let Some(nat_state) = &self.nat_traversal {
5892 nat_state
5893 .get_validation_candidates()
5894 .into_iter()
5895 .take(3) .map(|(seq, candidate)| (seq, candidate.address))
5897 .collect()
5898 } else {
5899 return;
5900 };
5901
5902 if candidates.is_empty() {
5903 return;
5904 }
5905
5906 if let Some(nat_state) = &mut self.nat_traversal {
5908 for (seq, address) in candidates {
5909 let challenge: u64 = self.rng.r#gen();
5911
5912 if let Err(e) = nat_state.start_validation(seq, challenge, now) {
5914 debug!("Failed to start validation for candidate {}: {}", seq, e);
5915 continue;
5916 }
5917
5918 trace!(
5920 "Started NAT validation for {} with token {:08x}",
5921 address, challenge
5922 );
5923 }
5924 }
5925 }
5926
5927 pub fn current_mtu(&self) -> u16 {
5931 self.path.current_mtu()
5932 }
5933
5934 fn predict_1rtt_overhead(&self, pn: Option<u64>) -> usize {
5941 let pn_len = match pn {
5942 Some(pn) => PacketNumber::new(
5943 pn,
5944 self.spaces[SpaceId::Data].largest_acked_packet.unwrap_or(0),
5945 )
5946 .len(),
5947 None => 4,
5949 };
5950
5951 1 + self.rem_cids.active().len() + pn_len + self.tag_len_1rtt()
5953 }
5954
5955 fn tag_len_1rtt(&self) -> usize {
5956 let key = match self.spaces[SpaceId::Data].crypto.as_ref() {
5957 Some(crypto) => Some(&*crypto.packet.local),
5958 None => self.zero_rtt_crypto.as_ref().map(|x| &*x.packet),
5959 };
5960 key.map_or(16, |x| x.tag_len())
5964 }
5965
5966 fn on_path_validated(&mut self) {
5968 self.path.validated = true;
5969 let ConnectionSide::Server { server_config } = &self.side else {
5970 return;
5971 };
5972 let new_tokens = &mut self.spaces[SpaceId::Data as usize].pending.new_tokens;
5973 new_tokens.clear();
5974 for _ in 0..server_config.validation_token.sent {
5975 new_tokens.push(self.path.remote);
5976 }
5977 }
5978}
5979
5980impl fmt::Debug for Connection {
5981 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
5982 f.debug_struct("Connection")
5983 .field("handshake_cid", &self.handshake_cid)
5984 .finish()
5985 }
5986}
5987
5988enum ConnectionSide {
5990 Client {
5991 token: Bytes,
5993 token_store: Arc<dyn TokenStore>,
5994 server_name: String,
5995 },
5996 Server {
5997 server_config: Arc<ServerConfig>,
5998 },
5999}
6000
6001impl ConnectionSide {
6002 fn remote_may_migrate(&self) -> bool {
6003 match self {
6004 Self::Server { server_config } => server_config.migration,
6005 Self::Client { .. } => false,
6006 }
6007 }
6008
6009 fn is_client(&self) -> bool {
6010 self.side().is_client()
6011 }
6012
6013 fn is_server(&self) -> bool {
6014 self.side().is_server()
6015 }
6016
6017 fn side(&self) -> Side {
6018 match *self {
6019 Self::Client { .. } => Side::Client,
6020 Self::Server { .. } => Side::Server,
6021 }
6022 }
6023}
6024
6025impl From<SideArgs> for ConnectionSide {
6026 fn from(side: SideArgs) -> Self {
6027 match side {
6028 SideArgs::Client {
6029 token_store,
6030 server_name,
6031 } => Self::Client {
6032 token: token_store.take(&server_name).unwrap_or_default(),
6033 token_store,
6034 server_name,
6035 },
6036 SideArgs::Server {
6037 server_config,
6038 pref_addr_cid: _,
6039 path_validated: _,
6040 } => Self::Server { server_config },
6041 }
6042 }
6043}
6044
6045pub(crate) enum SideArgs {
6047 Client {
6048 token_store: Arc<dyn TokenStore>,
6049 server_name: String,
6050 },
6051 Server {
6052 server_config: Arc<ServerConfig>,
6053 pref_addr_cid: Option<ConnectionId>,
6054 path_validated: bool,
6055 },
6056}
6057
6058impl SideArgs {
6059 pub(crate) fn pref_addr_cid(&self) -> Option<ConnectionId> {
6060 match *self {
6061 Self::Client { .. } => None,
6062 Self::Server { pref_addr_cid, .. } => pref_addr_cid,
6063 }
6064 }
6065
6066 pub(crate) fn path_validated(&self) -> bool {
6067 match *self {
6068 Self::Client { .. } => true,
6069 Self::Server { path_validated, .. } => path_validated,
6070 }
6071 }
6072
6073 pub(crate) fn side(&self) -> Side {
6074 match *self {
6075 Self::Client { .. } => Side::Client,
6076 Self::Server { .. } => Side::Server,
6077 }
6078 }
6079}
6080
6081fn is_valid_nat_advertisement_address(address: SocketAddr) -> bool {
6082 if address.port() == 0 {
6083 return false;
6084 }
6085
6086 match address.ip() {
6087 IpAddr::V4(ipv4) => !ipv4.is_unspecified() && !ipv4.is_broadcast() && !ipv4.is_multicast(),
6088 IpAddr::V6(ipv6) => !ipv6.is_unspecified() && !ipv6.is_multicast(),
6089 }
6090}
6091
6092#[derive(Debug, Error, Clone, PartialEq, Eq)]
6094pub enum ConnectionError {
6095 #[error("peer doesn't implement any supported version")]
6097 VersionMismatch,
6098 #[error(transparent)]
6100 TransportError(#[from] TransportError),
6101 #[error("aborted by peer: {0}")]
6103 ConnectionClosed(frame::ConnectionClose),
6104 #[error("closed by peer: {0}")]
6106 ApplicationClosed(frame::ApplicationClose),
6107 #[error("reset by peer")]
6109 Reset,
6110 #[error("timed out")]
6116 TimedOut,
6117 #[error("closed")]
6119 LocallyClosed,
6120 #[error("CIDs exhausted")]
6124 CidsExhausted,
6125}
6126
6127impl From<Close> for ConnectionError {
6128 fn from(x: Close) -> Self {
6129 match x {
6130 Close::Connection(reason) => Self::ConnectionClosed(reason),
6131 Close::Application(reason) => Self::ApplicationClosed(reason),
6132 }
6133 }
6134}
6135
6136impl From<ConnectionError> for io::Error {
6138 fn from(x: ConnectionError) -> Self {
6139 use ConnectionError::*;
6140 let kind = match x {
6141 TimedOut => io::ErrorKind::TimedOut,
6142 Reset => io::ErrorKind::ConnectionReset,
6143 ApplicationClosed(_) | ConnectionClosed(_) => io::ErrorKind::ConnectionAborted,
6144 TransportError(_) | VersionMismatch | LocallyClosed | CidsExhausted => {
6145 io::ErrorKind::Other
6146 }
6147 };
6148 Self::new(kind, x)
6149 }
6150}
6151
6152#[derive(Clone, Debug)]
6153pub enum State {
6155 Handshake(state::Handshake),
6157 Established,
6159 Closed(state::Closed),
6161 Draining,
6163 Drained,
6165}
6166
6167impl State {
6168 fn closed<R: Into<Close>>(reason: R) -> Self {
6169 Self::Closed(state::Closed {
6170 reason: reason.into(),
6171 })
6172 }
6173
6174 fn is_handshake(&self) -> bool {
6175 matches!(*self, Self::Handshake(_))
6176 }
6177
6178 fn is_established(&self) -> bool {
6179 matches!(*self, Self::Established)
6180 }
6181
6182 fn is_closed(&self) -> bool {
6183 matches!(*self, Self::Closed(_) | Self::Draining | Self::Drained)
6184 }
6185
6186 fn is_drained(&self) -> bool {
6187 matches!(*self, Self::Drained)
6188 }
6189}
6190
6191mod state {
6192 use super::*;
6193
6194 #[derive(Clone, Debug)]
6195 pub struct Handshake {
6196 pub(super) rem_cid_set: bool,
6200 pub(super) expected_token: Bytes,
6204 pub(super) client_hello: Option<Bytes>,
6208 }
6209
6210 #[derive(Clone, Debug)]
6211 pub struct Closed {
6212 pub(super) reason: Close,
6213 }
6214}
6215
6216#[derive(Debug)]
6218pub enum Event {
6219 HandshakeDataReady,
6221 Connected,
6223 ConnectionLost {
6227 reason: ConnectionError,
6229 },
6230 Stream(StreamEvent),
6232 DatagramReceived,
6234 DatagramsUnblocked,
6236 DatagramDropped(DatagramDropStats),
6242}
6243
6244fn instant_saturating_sub(x: Instant, y: Instant) -> Duration {
6245 if x > y { x - y } else { Duration::ZERO }
6246}
6247
6248fn get_max_ack_delay(params: &TransportParameters) -> Duration {
6249 Duration::from_micros(params.max_ack_delay.0 * 1000)
6250}
6251
6252const MAX_BACKOFF_EXPONENT: u32 = 16;
6254
6255const MIN_PACKET_SPACE: usize = MAX_HANDSHAKE_OR_0RTT_HEADER_SIZE + 32;
6263
6264const MAX_HANDSHAKE_OR_0RTT_HEADER_SIZE: usize =
6270 1 + 4 + 1 + MAX_CID_SIZE + 1 + MAX_CID_SIZE + VarInt::from_u32(u16::MAX as u32).size() + 4;
6271
6272const KEY_UPDATE_MARGIN: u64 = 10_000;
6276
6277#[derive(Default)]
6278struct SentFrames {
6279 retransmits: ThinRetransmits,
6280 largest_acked: Option<u64>,
6281 stream_frames: StreamMetaVec,
6282 non_retransmits: bool,
6284 requires_padding: bool,
6285}
6286
6287impl SentFrames {
6288 fn is_ack_only(&self, streams: &StreamsState) -> bool {
6290 self.largest_acked.is_some()
6291 && !self.non_retransmits
6292 && self.stream_frames.is_empty()
6293 && self.retransmits.is_empty(streams)
6294 }
6295}
6296
6297fn negotiate_max_idle_timeout(x: Option<VarInt>, y: Option<VarInt>) -> Option<Duration> {
6305 match (x, y) {
6306 (Some(VarInt(0)) | None, Some(VarInt(0)) | None) => None,
6307 (Some(VarInt(0)) | None, Some(y)) => Some(Duration::from_millis(y.0)),
6308 (Some(x), Some(VarInt(0)) | None) => Some(Duration::from_millis(x.0)),
6309 (Some(x), Some(y)) => Some(Duration::from_millis(cmp::min(x, y).0)),
6310 }
6311}
6312
6313#[derive(Debug, Clone)]
6315pub(crate) struct PqcState {
6316 enabled: bool,
6318 #[allow(dead_code)]
6320 algorithms: Option<crate::transport_parameters::PqcAlgorithms>,
6321 handshake_mtu: u16,
6323 using_pqc: bool,
6325 packet_handler: crate::crypto::pqc::packet_handler::PqcPacketHandler,
6327}
6328
6329#[allow(dead_code)]
6330impl PqcState {
6331 fn new() -> Self {
6332 Self {
6333 enabled: false,
6334 algorithms: None,
6335 handshake_mtu: MIN_INITIAL_SIZE,
6336 using_pqc: false,
6337 packet_handler: crate::crypto::pqc::packet_handler::PqcPacketHandler::new(),
6338 }
6339 }
6340
6341 fn min_initial_size(&self) -> u16 {
6343 if self.enabled && self.using_pqc {
6344 std::cmp::max(self.handshake_mtu, 4096)
6346 } else {
6347 MIN_INITIAL_SIZE
6348 }
6349 }
6350
6351 fn update_from_peer_params(&mut self, params: &TransportParameters) {
6353 if let Some(ref algorithms) = params.pqc_algorithms {
6354 self.enabled = true;
6355 self.algorithms = Some(algorithms.clone());
6356 if algorithms.ml_kem_768 || algorithms.ml_dsa_65 {
6358 self.using_pqc = true;
6359 self.handshake_mtu = 4096; }
6361 }
6362 }
6363
6364 fn detect_pqc_from_crypto(&mut self, crypto_data: &[u8], space: SpaceId) {
6366 if !self.enabled {
6367 return;
6368 }
6369 if self.packet_handler.detect_pqc_handshake(crypto_data, space) {
6370 self.using_pqc = true;
6371 self.handshake_mtu = self.packet_handler.get_min_packet_size(space);
6373 }
6374 }
6375
6376 fn should_trigger_mtu_discovery(&mut self) -> bool {
6378 self.packet_handler.should_trigger_mtu_discovery()
6379 }
6380
6381 fn get_mtu_config(&self) -> MtuDiscoveryConfig {
6383 self.packet_handler.get_pqc_mtu_config()
6384 }
6385
6386 fn calculate_crypto_frame_size(&self, available_space: usize, remaining_data: usize) -> usize {
6388 self.packet_handler
6389 .calculate_crypto_frame_size(available_space, remaining_data)
6390 }
6391
6392 fn should_adjust_coalescing(&self, current_size: usize, space: SpaceId) -> bool {
6394 self.packet_handler
6395 .adjust_coalescing_for_pqc(current_size, space)
6396 }
6397
6398 fn on_packet_sent(&mut self, space: SpaceId, size: u16) {
6400 self.packet_handler.on_packet_sent(space, size);
6401 }
6402
6403 fn reset(&mut self) {
6405 self.enabled = false;
6406 self.algorithms = None;
6407 self.handshake_mtu = MIN_INITIAL_SIZE;
6408 self.using_pqc = false;
6409 self.packet_handler.reset();
6410 }
6411}
6412
6413impl Default for PqcState {
6414 fn default() -> Self {
6415 Self::new()
6416 }
6417}
6418
6419#[derive(Debug, Clone)]
6421pub(crate) struct AddressDiscoveryState {
6422 enabled: bool,
6424 max_observation_rate: u8,
6426 observe_all_paths: bool,
6428 sent_observations: std::collections::HashMap<u64, paths::PathAddressInfo>,
6430 received_observations: std::collections::HashMap<u64, paths::PathAddressInfo>,
6432 rate_limiter: AddressObservationRateLimiter,
6434 received_history: Vec<ObservedAddressEvent>,
6436 bootstrap_mode: bool,
6438 next_sequence_number: VarInt,
6440 last_received_sequence: std::collections::HashMap<u64, VarInt>,
6442 frames_sent: u64,
6444}
6445
6446#[derive(Debug, Clone, PartialEq, Eq)]
6448struct ObservedAddressEvent {
6449 address: SocketAddr,
6451 received_at: Instant,
6453 path_id: u64,
6455}
6456
6457#[derive(Debug, Clone)]
6459struct AddressObservationRateLimiter {
6460 tokens: f64,
6462 max_tokens: f64,
6464 rate: f64,
6466 last_update: Instant,
6468}
6469
6470#[allow(dead_code)]
6471impl AddressDiscoveryState {
6472 fn new(config: &crate::transport_parameters::AddressDiscoveryConfig, now: Instant) -> Self {
6474 use crate::transport_parameters::AddressDiscoveryConfig::*;
6475
6476 let (enabled, _can_send, _can_receive) = match config {
6478 SendOnly => (true, true, false),
6479 ReceiveOnly => (true, false, true),
6480 SendAndReceive => (true, true, true),
6481 };
6482
6483 let max_observation_rate = 10u8; let observe_all_paths = false; Self {
6489 enabled,
6490 max_observation_rate,
6491 observe_all_paths,
6492 sent_observations: std::collections::HashMap::new(),
6493 received_observations: std::collections::HashMap::new(),
6494 rate_limiter: AddressObservationRateLimiter::new(max_observation_rate, now),
6495 received_history: Vec::new(),
6496 bootstrap_mode: false,
6497 next_sequence_number: VarInt::from_u32(0),
6498 last_received_sequence: std::collections::HashMap::new(),
6499 frames_sent: 0,
6500 }
6501 }
6502
6503 fn should_send_observation(&mut self, path_id: u64, now: Instant) -> bool {
6505 if !self.should_observe_path(path_id) {
6507 return false;
6508 }
6509
6510 let needs_observation = match self.sent_observations.get(&path_id) {
6512 Some(info) => info.observed_address.is_none() || !info.notified,
6513 None => true,
6514 };
6515
6516 if !needs_observation {
6517 return false;
6518 }
6519
6520 self.rate_limiter.try_consume(1.0, now)
6522 }
6523
6524 fn record_observation_sent(&mut self, path_id: u64) {
6526 if let Some(info) = self.sent_observations.get_mut(&path_id) {
6527 info.mark_notified();
6528 }
6529 }
6530
6531 fn handle_observed_address(&mut self, address: SocketAddr, path_id: u64, now: Instant) {
6533 if !self.enabled {
6534 return;
6535 }
6536
6537 self.received_history.push(ObservedAddressEvent {
6538 address,
6539 received_at: now,
6540 path_id,
6541 });
6542
6543 let info = self
6545 .received_observations
6546 .entry(path_id)
6547 .or_insert_with(paths::PathAddressInfo::new);
6548 info.update_observed_address(address, now);
6549 }
6550
6551 pub(crate) fn get_observed_address(&self, path_id: u64) -> Option<SocketAddr> {
6553 self.received_observations
6554 .get(&path_id)
6555 .and_then(|info| info.observed_address)
6556 }
6557
6558 pub(crate) fn get_all_received_history(&self) -> Vec<SocketAddr> {
6560 self.received_observations
6561 .values()
6562 .filter_map(|info| info.observed_address)
6563 .collect()
6564 }
6565
6566 pub(crate) fn stats(&self) -> AddressDiscoveryStats {
6568 AddressDiscoveryStats {
6569 frames_sent: self.frames_sent,
6570 frames_received: self.received_history.len() as u64,
6571 addresses_discovered: self
6572 .received_observations
6573 .values()
6574 .filter(|info| info.observed_address.is_some())
6575 .count() as u64,
6576 address_changes_detected: 0, }
6578 }
6579
6580 fn has_unnotified_changes(&self) -> bool {
6586 let has_unsent = self
6588 .sent_observations
6589 .values()
6590 .any(|info| info.observed_address.is_some() && !info.notified);
6591
6592 let has_unreceived = self
6594 .received_observations
6595 .values()
6596 .any(|info| info.observed_address.is_some() && !info.notified);
6597
6598 has_unsent || has_unreceived
6599 }
6600
6601 fn queue_observed_address_frame(
6603 &mut self,
6604 path_id: u64,
6605 address: SocketAddr,
6606 ) -> Option<frame::ObservedAddress> {
6607 if !self.enabled {
6609 tracing::debug!("queue_observed_address_frame: BLOCKED - address discovery disabled");
6610 return None;
6611 }
6612
6613 if !self.observe_all_paths && path_id != 0 {
6615 tracing::debug!(
6616 "queue_observed_address_frame: BLOCKED - path {} not allowed (observe_all_paths={})",
6617 path_id,
6618 self.observe_all_paths
6619 );
6620 return None;
6621 }
6622
6623 if let Some(info) = self.sent_observations.get(&path_id) {
6625 if info.notified {
6626 tracing::trace!(
6627 "queue_observed_address_frame: BLOCKED - path {} already notified",
6628 path_id
6629 );
6630 return None;
6631 }
6632 }
6633
6634 if self.rate_limiter.tokens < 1.0 {
6636 tracing::debug!(
6637 "queue_observed_address_frame: BLOCKED - rate limited (tokens={})",
6638 self.rate_limiter.tokens
6639 );
6640 return None;
6641 }
6642
6643 tracing::info!(
6644 "queue_observed_address_frame: SENDING OBSERVED_ADDRESS to {} for path {}",
6645 address,
6646 path_id
6647 );
6648
6649 self.rate_limiter.tokens -= 1.0;
6651
6652 let info = self
6654 .sent_observations
6655 .entry(path_id)
6656 .or_insert_with(paths::PathAddressInfo::new);
6657 info.observed_address = Some(address);
6658 info.notified = true;
6659
6660 tracing::trace!(
6661 path_id = ?path_id,
6662 address = %address,
6663 "queue_observed_address_frame: queuing frame"
6664 );
6665
6666 let sequence_number = self.next_sequence_number;
6668 self.next_sequence_number = VarInt::from_u64(self.next_sequence_number.into_inner() + 1)
6669 .expect("sequence number overflow");
6670
6671 Some(frame::ObservedAddress {
6672 sequence_number,
6673 address,
6674 })
6675 }
6676
6677 fn check_for_address_observations(
6679 &mut self,
6680 _current_path: u64,
6681 peer_supports_address_discovery: bool,
6682 now: Instant,
6683 ) -> Vec<frame::ObservedAddress> {
6684 let mut frames = Vec::new();
6685
6686 if !self.enabled || !peer_supports_address_discovery {
6688 return frames;
6689 }
6690
6691 self.rate_limiter.update_tokens(now);
6693
6694 let paths_to_notify: Vec<u64> = self
6696 .sent_observations
6697 .iter()
6698 .filter_map(|(&path_id, info)| {
6699 if info.observed_address.is_some() && !info.notified {
6700 Some(path_id)
6701 } else {
6702 None
6703 }
6704 })
6705 .collect();
6706
6707 for path_id in paths_to_notify {
6709 if !self.should_observe_path(path_id) {
6711 continue;
6712 }
6713
6714 if !self.bootstrap_mode && self.rate_limiter.tokens < 1.0 {
6716 break; }
6718
6719 if let Some(info) = self.sent_observations.get_mut(&path_id) {
6721 if let Some(address) = info.observed_address {
6722 if self.bootstrap_mode {
6724 self.rate_limiter.tokens -= 0.2; } else {
6726 self.rate_limiter.tokens -= 1.0;
6727 }
6728
6729 info.notified = true;
6731
6732 let sequence_number = self.next_sequence_number;
6734 self.next_sequence_number =
6735 VarInt::from_u64(self.next_sequence_number.into_inner() + 1)
6736 .expect("sequence number overflow");
6737
6738 self.frames_sent += 1;
6739
6740 frames.push(frame::ObservedAddress {
6741 sequence_number,
6742 address,
6743 });
6744 }
6745 }
6746 }
6747
6748 frames
6749 }
6750
6751 fn update_rate_limit(&mut self, new_rate: f64) {
6753 self.max_observation_rate = new_rate as u8;
6754 self.rate_limiter.set_rate(new_rate as u8);
6755 }
6756
6757 fn from_transport_params(params: &TransportParameters) -> Option<Self> {
6759 params
6760 .address_discovery
6761 .as_ref()
6762 .map(|config| Self::new(config, Instant::now()))
6763 }
6764
6765 #[cfg(test)]
6767 fn new_with_params(enabled: bool, max_rate: f64, observe_all_paths: bool) -> Self {
6768 if !enabled {
6770 return Self {
6772 enabled: false,
6773 max_observation_rate: max_rate as u8,
6774 observe_all_paths,
6775 sent_observations: std::collections::HashMap::new(),
6776 received_observations: std::collections::HashMap::new(),
6777 rate_limiter: AddressObservationRateLimiter::new(max_rate as u8, Instant::now()),
6778 received_history: Vec::new(),
6779 bootstrap_mode: false,
6780 next_sequence_number: VarInt::from_u32(0),
6781 last_received_sequence: std::collections::HashMap::new(),
6782 frames_sent: 0,
6783 };
6784 }
6785
6786 let config = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
6788 let mut state = Self::new(&config, Instant::now());
6789 state.max_observation_rate = max_rate as u8;
6790 state.observe_all_paths = observe_all_paths;
6791 state.rate_limiter = AddressObservationRateLimiter::new(max_rate as u8, Instant::now());
6792 state
6793 }
6794
6795 fn set_bootstrap_mode(&mut self, enabled: bool) {
6797 self.bootstrap_mode = enabled;
6798 if enabled {
6800 let bootstrap_rate = self.get_effective_rate_limit();
6801 self.rate_limiter.rate = bootstrap_rate;
6802 self.rate_limiter.max_tokens = bootstrap_rate * 2.0; self.rate_limiter.tokens = self.rate_limiter.max_tokens;
6805 }
6806 }
6807
6808 fn is_bootstrap_mode(&self) -> bool {
6810 self.bootstrap_mode
6811 }
6812
6813 fn get_effective_rate_limit(&self) -> f64 {
6815 if self.bootstrap_mode {
6816 (self.max_observation_rate as f64) * 5.0
6818 } else {
6819 self.max_observation_rate as f64
6820 }
6821 }
6822
6823 fn should_observe_path(&self, path_id: u64) -> bool {
6825 if !self.enabled {
6826 return false;
6827 }
6828
6829 if self.bootstrap_mode {
6831 return true;
6832 }
6833
6834 self.observe_all_paths || path_id == 0
6836 }
6837
6838 fn should_send_observation_immediately(&self, is_new_connection: bool) -> bool {
6840 self.bootstrap_mode && is_new_connection
6841 }
6842}
6843
6844#[allow(dead_code)]
6845impl AddressObservationRateLimiter {
6846 fn new(rate: u8, now: Instant) -> Self {
6848 let rate_f64 = rate as f64;
6849 Self {
6850 tokens: rate_f64,
6851 max_tokens: rate_f64,
6852 rate: rate_f64,
6853 last_update: now,
6854 }
6855 }
6856
6857 fn try_consume(&mut self, tokens: f64, now: Instant) -> bool {
6859 self.update_tokens(now);
6860
6861 if self.tokens >= tokens {
6862 self.tokens -= tokens;
6863 true
6864 } else {
6865 false
6866 }
6867 }
6868
6869 fn update_tokens(&mut self, now: Instant) {
6871 let elapsed = now.saturating_duration_since(self.last_update);
6872 let new_tokens = elapsed.as_secs_f64() * self.rate;
6873 self.tokens = (self.tokens + new_tokens).min(self.max_tokens);
6874 self.last_update = now;
6875 }
6876
6877 fn set_rate(&mut self, rate: u8) {
6879 let rate_f64 = rate as f64;
6880 self.rate = rate_f64;
6881 self.max_tokens = rate_f64;
6882 if self.tokens > self.max_tokens {
6884 self.tokens = self.max_tokens;
6885 }
6886 }
6887}
6888
6889impl Connection {
6890 pub(crate) fn supports_ack_receive_v1(&self) -> bool {
6891 self.peer_params.ack_receive_v1
6892 }
6893}
6894
6895#[cfg(test)]
6896mod tests {
6897 use super::*;
6898 use crate::transport_parameters::AddressDiscoveryConfig;
6899 use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr};
6900
6901 #[test]
6902 fn nat_advertisement_address_validation_rejects_unspecified_and_zero_port() {
6903 assert!(!is_valid_nat_advertisement_address(SocketAddr::new(
6904 IpAddr::V6(Ipv6Addr::UNSPECIFIED),
6905 5000,
6906 )));
6907 assert!(!is_valid_nat_advertisement_address(SocketAddr::new(
6908 IpAddr::V4(Ipv4Addr::new(192, 168, 1, 10)),
6909 0,
6910 )));
6911 assert!(is_valid_nat_advertisement_address(SocketAddr::new(
6912 IpAddr::V4(Ipv4Addr::new(192, 168, 1, 10)),
6913 5000,
6914 )));
6915 }
6916
6917 #[test]
6918 fn address_discovery_state_new() {
6919 let config = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
6920 let now = Instant::now();
6921 let state = AddressDiscoveryState::new(&config, now);
6922
6923 assert!(state.enabled);
6924 assert_eq!(state.max_observation_rate, 10);
6925 assert!(!state.observe_all_paths);
6926 assert!(state.sent_observations.is_empty());
6927 assert!(state.received_observations.is_empty());
6928 assert!(state.received_history.is_empty());
6929 assert_eq!(state.rate_limiter.tokens, 10.0);
6930 }
6931
6932 #[test]
6933 fn address_discovery_state_disabled() {
6934 let config = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
6935 let now = Instant::now();
6936 let mut state = AddressDiscoveryState::new(&config, now);
6937
6938 state.enabled = false;
6940
6941 assert!(!state.should_send_observation(0, now));
6943 }
6944
6945 #[test]
6946 fn address_discovery_state_should_send_observation() {
6947 let config = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
6948 let now = Instant::now();
6949 let mut state = AddressDiscoveryState::new(&config, now);
6950
6951 assert!(state.should_send_observation(0, now));
6953
6954 let mut path_info = paths::PathAddressInfo::new();
6956 path_info.update_observed_address(
6957 SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)), 8080),
6958 now,
6959 );
6960 path_info.mark_notified();
6961 state.sent_observations.insert(0, path_info);
6962
6963 assert!(!state.should_send_observation(0, now));
6965
6966 assert!(!state.should_send_observation(1, now));
6968 }
6969
6970 #[test]
6971 fn address_discovery_state_rate_limiting() {
6972 let config = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
6973 let now = Instant::now();
6974 let mut state = AddressDiscoveryState::new(&config, now);
6975
6976 state.observe_all_paths = true;
6978
6979 assert!(state.should_send_observation(0, now));
6981
6982 state.rate_limiter.try_consume(9.0, now); assert!(!state.should_send_observation(0, now));
6987
6988 let later = now + Duration::from_secs(1);
6990 state.rate_limiter.update_tokens(later);
6991 assert!(state.should_send_observation(0, later));
6992 }
6993
6994 #[test]
6995 fn address_discovery_state_handle_observed_address() {
6996 let config = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
6997 let now = Instant::now();
6998 let mut state = AddressDiscoveryState::new(&config, now);
6999
7000 let addr1 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(10, 0, 0, 1)), 443);
7001 let addr2 = SocketAddr::new(
7002 IpAddr::V6(Ipv6Addr::new(0x2001, 0xdb8, 0, 0, 0, 0, 0, 1)),
7003 8080,
7004 );
7005
7006 state.handle_observed_address(addr1, 0, now);
7008 assert_eq!(state.received_history.len(), 1);
7009 assert_eq!(state.received_history[0].address, addr1);
7010 assert_eq!(state.received_history[0].path_id, 0);
7011
7012 let later = now + Duration::from_millis(100);
7014 state.handle_observed_address(addr2, 1, later);
7015 assert_eq!(state.received_history.len(), 2);
7016 assert_eq!(state.received_history[1].address, addr2);
7017 assert_eq!(state.received_history[1].path_id, 1);
7018 }
7019
7020 #[test]
7021 fn address_discovery_state_get_observed_address() {
7022 let config = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
7023 let now = Instant::now();
7024 let mut state = AddressDiscoveryState::new(&config, now);
7025
7026 assert_eq!(state.get_observed_address(0), None);
7028
7029 let mut path_info = paths::PathAddressInfo::new();
7031 let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 80);
7032 path_info.update_observed_address(addr, now);
7033 state.received_observations.insert(0, path_info);
7034
7035 assert_eq!(state.get_observed_address(0), Some(addr));
7037 assert_eq!(state.get_observed_address(1), None);
7038 }
7039
7040 #[test]
7041 fn address_discovery_state_unnotified_changes() {
7042 let config = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
7043 let now = Instant::now();
7044 let mut state = AddressDiscoveryState::new(&config, now);
7045
7046 assert!(!state.has_unnotified_changes());
7048
7049 let mut path_info = paths::PathAddressInfo::new();
7051 path_info.update_observed_address(
7052 SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)), 8080),
7053 now,
7054 );
7055 state.sent_observations.insert(0, path_info);
7056
7057 assert!(state.has_unnotified_changes());
7059
7060 state.record_observation_sent(0);
7062 assert!(!state.has_unnotified_changes());
7063 }
7064
7065 #[test]
7066 fn address_observation_rate_limiter_token_bucket() {
7067 let now = Instant::now();
7068 let mut limiter = AddressObservationRateLimiter::new(5, now); assert_eq!(limiter.tokens, 5.0);
7072 assert_eq!(limiter.max_tokens, 5.0);
7073 assert_eq!(limiter.rate, 5.0);
7074
7075 assert!(limiter.try_consume(3.0, now));
7077 assert_eq!(limiter.tokens, 2.0);
7078
7079 assert!(!limiter.try_consume(3.0, now));
7081 assert_eq!(limiter.tokens, 2.0);
7082
7083 let later = now + Duration::from_secs(1);
7085 limiter.update_tokens(later);
7086 assert_eq!(limiter.tokens, 5.0); let half_sec = now + Duration::from_millis(500);
7090 let mut limiter2 = AddressObservationRateLimiter::new(5, now);
7091 limiter2.try_consume(3.0, now);
7092 limiter2.update_tokens(half_sec);
7093 assert_eq!(limiter2.tokens, 4.5); }
7095
7096 #[test]
7098 fn connection_initializes_address_discovery_state_default() {
7099 let config = crate::transport_parameters::AddressDiscoveryConfig::default();
7102 let state = AddressDiscoveryState::new(&config, Instant::now());
7103 assert!(state.enabled); assert_eq!(state.max_observation_rate, 10); assert!(!state.observe_all_paths);
7106 }
7107
7108 #[test]
7109 fn connection_initializes_with_address_discovery_enabled() {
7110 let config = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
7112 let state = AddressDiscoveryState::new(&config, Instant::now());
7113 assert!(state.enabled);
7114 assert_eq!(state.max_observation_rate, 10);
7115 assert!(!state.observe_all_paths);
7116 }
7117
7118 #[test]
7119 fn connection_address_discovery_enabled_by_default() {
7120 let config = crate::transport_parameters::AddressDiscoveryConfig::default();
7122 let state = AddressDiscoveryState::new(&config, Instant::now());
7123 assert!(state.enabled); }
7125
7126 #[test]
7127 fn negotiate_max_idle_timeout_commutative() {
7128 let test_params = [
7129 (None, None, None),
7130 (None, Some(VarInt(0)), None),
7131 (None, Some(VarInt(2)), Some(Duration::from_millis(2))),
7132 (Some(VarInt(0)), Some(VarInt(0)), None),
7133 (
7134 Some(VarInt(2)),
7135 Some(VarInt(0)),
7136 Some(Duration::from_millis(2)),
7137 ),
7138 (
7139 Some(VarInt(1)),
7140 Some(VarInt(4)),
7141 Some(Duration::from_millis(1)),
7142 ),
7143 ];
7144
7145 for (left, right, result) in test_params {
7146 assert_eq!(negotiate_max_idle_timeout(left, right), result);
7147 assert_eq!(negotiate_max_idle_timeout(right, left), result);
7148 }
7149 }
7150
7151 #[test]
7152 fn path_creation_initializes_address_discovery() {
7153 let config = TransportConfig::default();
7154 let remote = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)), 8080);
7155 let now = Instant::now();
7156
7157 let path = paths::PathData::new(remote, false, None, now, &config);
7159
7160 assert!(path.address_info.observed_address.is_none());
7162 assert!(path.address_info.last_observed.is_none());
7163 assert_eq!(path.address_info.observation_count, 0);
7164 assert!(!path.address_info.notified);
7165
7166 assert_eq!(path.observation_rate_limiter.rate, 10.0);
7168 assert_eq!(path.observation_rate_limiter.max_tokens, 10.0);
7169 assert_eq!(path.observation_rate_limiter.tokens, 10.0);
7170 }
7171
7172 #[test]
7173 fn path_migration_resets_address_discovery() {
7174 let config = TransportConfig::default();
7175 let remote1 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)), 8080);
7176 let remote2 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(10, 0, 0, 1)), 443);
7177 let now = Instant::now();
7178
7179 let mut path1 = paths::PathData::new(remote1, false, None, now, &config);
7181 path1.update_observed_address(remote1, now);
7182 path1.mark_address_notified();
7183 path1.consume_observation_token(now);
7184 path1.set_observation_rate(20);
7185
7186 let path2 = paths::PathData::from_previous(remote2, &path1, now);
7188
7189 assert!(path2.address_info.observed_address.is_none());
7191 assert!(path2.address_info.last_observed.is_none());
7192 assert_eq!(path2.address_info.observation_count, 0);
7193 assert!(!path2.address_info.notified);
7194
7195 assert_eq!(path2.observation_rate_limiter.rate, 20.0);
7197 assert_eq!(path2.observation_rate_limiter.tokens, 20.0);
7198 }
7199
7200 #[test]
7201 fn connection_path_updates_observation_rate() {
7202 let config = TransportConfig::default();
7203 let remote = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 42);
7204 let now = Instant::now();
7205
7206 let mut path = paths::PathData::new(remote, false, None, now, &config);
7207
7208 assert_eq!(path.observation_rate_limiter.rate, 10.0);
7210
7211 path.set_observation_rate(25);
7213 assert_eq!(path.observation_rate_limiter.rate, 25.0);
7214 assert_eq!(path.observation_rate_limiter.max_tokens, 25.0);
7215
7216 path.observation_rate_limiter.tokens = 30.0; path.set_observation_rate(20);
7219 assert_eq!(path.observation_rate_limiter.tokens, 20.0); }
7221
7222 #[test]
7223 fn path_validation_preserves_discovery_state() {
7224 let config = TransportConfig::default();
7225 let remote = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)), 8080);
7226 let now = Instant::now();
7227
7228 let mut path = paths::PathData::new(remote, false, None, now, &config);
7229
7230 let observed = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(1, 2, 3, 4)), 5678);
7232 path.update_observed_address(observed, now);
7233 path.set_observation_rate(15);
7234
7235 path.validated = true;
7237
7238 assert_eq!(path.address_info.observed_address, Some(observed));
7240 assert_eq!(path.observation_rate_limiter.rate, 15.0);
7241 }
7242
7243 #[test]
7244 fn address_discovery_state_initialization() {
7245 let state = AddressDiscoveryState::new_with_params(true, 30.0, true);
7247
7248 assert!(state.enabled);
7249 assert_eq!(state.max_observation_rate, 30);
7250 assert!(state.observe_all_paths);
7251 assert!(state.sent_observations.is_empty());
7252 assert!(state.received_observations.is_empty());
7253 assert!(state.received_history.is_empty());
7254 }
7255
7256 #[test]
7258 fn handle_observed_address_frame_basic() {
7259 let config = AddressDiscoveryConfig::SendAndReceive;
7260 let mut state = AddressDiscoveryState::new(&config, Instant::now());
7261 let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)), 8080);
7262 let now = Instant::now();
7263 let path_id = 0;
7264
7265 state.handle_observed_address(addr, path_id, now);
7267
7268 assert_eq!(state.received_history.len(), 1);
7270 assert_eq!(state.received_history[0].address, addr);
7271 assert_eq!(state.received_history[0].path_id, path_id);
7272 assert_eq!(state.received_history[0].received_at, now);
7273
7274 assert!(state.received_observations.contains_key(&path_id));
7276 let path_info = &state.received_observations[&path_id];
7277 assert_eq!(path_info.observed_address, Some(addr));
7278 assert_eq!(path_info.last_observed, Some(now));
7279 assert_eq!(path_info.observation_count, 1);
7280 }
7281
7282 #[test]
7283 fn handle_observed_address_frame_multiple_observations() {
7284 let config = AddressDiscoveryConfig::SendAndReceive;
7285 let mut state = AddressDiscoveryState::new(&config, Instant::now());
7286 let addr1 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)), 8080);
7287 let addr2 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(10, 0, 0, 1)), 443);
7288 let now = Instant::now();
7289 let path_id = 0;
7290
7291 state.handle_observed_address(addr1, path_id, now);
7293 state.handle_observed_address(addr1, path_id, now + Duration::from_secs(1));
7294 state.handle_observed_address(addr2, path_id, now + Duration::from_secs(2));
7295
7296 assert_eq!(state.received_history.len(), 3);
7298
7299 let path_info = &state.received_observations[&path_id];
7301 assert_eq!(path_info.observed_address, Some(addr2));
7302 assert_eq!(path_info.observation_count, 1); }
7304
7305 #[test]
7306 fn handle_observed_address_frame_disabled() {
7307 let config = AddressDiscoveryConfig::SendAndReceive;
7308 let mut state = AddressDiscoveryState::new(&config, Instant::now());
7309 state.enabled = false; let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)), 8080);
7311 let now = Instant::now();
7312
7313 state.handle_observed_address(addr, 0, now);
7315
7316 assert!(state.received_history.is_empty());
7318 assert!(state.sent_observations.is_empty());
7319 assert!(state.received_observations.is_empty());
7320 }
7321
7322 #[test]
7323 fn should_send_observation_basic() {
7324 let config = AddressDiscoveryConfig::SendAndReceive;
7325 let mut state = AddressDiscoveryState::new(&config, Instant::now());
7326 state.max_observation_rate = 10;
7327 let now = Instant::now();
7328 let path_id = 0;
7329
7330 assert!(state.should_send_observation(path_id, now));
7332
7333 state.record_observation_sent(path_id);
7335
7336 assert!(state.should_send_observation(path_id, now));
7338 }
7339
7340 #[test]
7341 fn should_send_observation_rate_limiting() {
7342 let config = AddressDiscoveryConfig::SendAndReceive;
7343 let now = Instant::now();
7344 let mut state = AddressDiscoveryState::new(&config, now);
7345 state.max_observation_rate = 2; state.update_rate_limit(2.0);
7347 let path_id = 0;
7348
7349 assert!(state.should_send_observation(path_id, now));
7351 state.record_observation_sent(path_id);
7352 assert!(state.should_send_observation(path_id, now));
7353 state.record_observation_sent(path_id);
7354
7355 assert!(!state.should_send_observation(path_id, now));
7357
7358 let later = now + Duration::from_secs(1);
7360 assert!(state.should_send_observation(path_id, later));
7361 }
7362
7363 #[test]
7364 fn should_send_observation_disabled() {
7365 let config = AddressDiscoveryConfig::SendAndReceive;
7366 let mut state = AddressDiscoveryState::new(&config, Instant::now());
7367 state.enabled = false;
7368
7369 assert!(!state.should_send_observation(0, Instant::now()));
7371 }
7372
7373 #[test]
7374 fn should_send_observation_per_path() {
7375 let config = AddressDiscoveryConfig::SendAndReceive;
7376 let now = Instant::now();
7377 let mut state = AddressDiscoveryState::new(&config, now);
7378 state.max_observation_rate = 2; state.observe_all_paths = true;
7380 state.update_rate_limit(2.0);
7381
7382 assert!(state.should_send_observation(0, now));
7384 state.record_observation_sent(0);
7385
7386 assert!(state.should_send_observation(1, now));
7388 state.record_observation_sent(1);
7389
7390 assert!(!state.should_send_observation(0, now));
7392 assert!(!state.should_send_observation(1, now));
7393
7394 let later = now + Duration::from_secs(1);
7396 assert!(state.should_send_observation(0, later));
7397 }
7398
7399 #[test]
7400 fn has_unnotified_changes_test() {
7401 let config = AddressDiscoveryConfig::SendAndReceive;
7402 let mut state = AddressDiscoveryState::new(&config, Instant::now());
7403 let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)), 8080);
7404 let now = Instant::now();
7405
7406 assert!(!state.has_unnotified_changes());
7408
7409 state.handle_observed_address(addr, 0, now);
7411 assert!(state.has_unnotified_changes());
7412
7413 state.received_observations.get_mut(&0).unwrap().notified = true;
7415 assert!(!state.has_unnotified_changes());
7416 }
7417
7418 #[test]
7419 fn get_observed_address_test() {
7420 let config = AddressDiscoveryConfig::SendAndReceive;
7421 let mut state = AddressDiscoveryState::new(&config, Instant::now());
7422 let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)), 8080);
7423 let now = Instant::now();
7424 let path_id = 0;
7425
7426 assert_eq!(state.get_observed_address(path_id), None);
7428
7429 state.handle_observed_address(addr, path_id, now);
7431 assert_eq!(state.get_observed_address(path_id), Some(addr));
7432
7433 assert_eq!(state.get_observed_address(999), None);
7435 }
7436
7437 #[test]
7439 fn rate_limiter_token_bucket_basic() {
7440 let now = Instant::now();
7441 let mut limiter = AddressObservationRateLimiter::new(10, now); assert!(limiter.try_consume(5.0, now));
7445 assert!(limiter.try_consume(5.0, now));
7446
7447 assert!(!limiter.try_consume(1.0, now));
7449 }
7450
7451 #[test]
7452 fn rate_limiter_token_replenishment() {
7453 let now = Instant::now();
7454 let mut limiter = AddressObservationRateLimiter::new(10, now); assert!(limiter.try_consume(10.0, now));
7458 assert!(!limiter.try_consume(0.1, now)); let later = now + Duration::from_secs(1);
7462 assert!(limiter.try_consume(10.0, later)); assert!(!limiter.try_consume(0.1, later)); let later = later + Duration::from_millis(500);
7467 assert!(limiter.try_consume(5.0, later)); assert!(!limiter.try_consume(0.1, later)); }
7470
7471 #[test]
7472 fn rate_limiter_max_tokens_cap() {
7473 let now = Instant::now();
7474 let mut limiter = AddressObservationRateLimiter::new(10, now);
7475
7476 let later = now + Duration::from_secs(2);
7478 assert!(limiter.try_consume(10.0, later));
7480 assert!(!limiter.try_consume(10.1, later)); let later2 = later + Duration::from_secs(1);
7484 assert!(limiter.try_consume(3.0, later2));
7485
7486 let much_later = later2 + Duration::from_secs(2);
7488 assert!(limiter.try_consume(10.0, much_later)); assert!(!limiter.try_consume(0.1, much_later)); }
7491
7492 #[test]
7493 fn rate_limiter_fractional_consumption() {
7494 let now = Instant::now();
7495 let mut limiter = AddressObservationRateLimiter::new(10, now);
7496
7497 assert!(limiter.try_consume(0.5, now));
7499 assert!(limiter.try_consume(2.3, now));
7500 assert!(limiter.try_consume(7.2, now)); assert!(!limiter.try_consume(0.1, now)); let later = now + Duration::from_millis(100); assert!(limiter.try_consume(1.0, later));
7506 assert!(!limiter.try_consume(0.1, later));
7507 }
7508
7509 #[test]
7510 fn rate_limiter_zero_rate() {
7511 let now = Instant::now();
7512 let mut limiter = AddressObservationRateLimiter::new(0, now); assert!(!limiter.try_consume(1.0, now));
7516 assert!(!limiter.try_consume(0.1, now));
7517 assert!(!limiter.try_consume(0.001, now));
7518
7519 let later = now + Duration::from_secs(10);
7521 assert!(!limiter.try_consume(0.001, later));
7522 }
7523
7524 #[test]
7525 fn rate_limiter_high_rate() {
7526 let now = Instant::now();
7527 let mut limiter = AddressObservationRateLimiter::new(63, now); assert!(limiter.try_consume(60.0, now));
7531 assert!(limiter.try_consume(3.0, now));
7532 assert!(!limiter.try_consume(0.1, now)); let later = now + Duration::from_secs(1);
7536 assert!(limiter.try_consume(63.0, later)); assert!(!limiter.try_consume(0.1, later)); }
7539
7540 #[test]
7541 fn rate_limiter_time_precision() {
7542 let now = Instant::now();
7543 let mut limiter = AddressObservationRateLimiter::new(100, now); assert!(limiter.try_consume(100.0, now));
7547 assert!(!limiter.try_consume(0.1, now));
7548
7549 let later = now + Duration::from_millis(10);
7551 assert!(limiter.try_consume(0.8, later)); assert!(!limiter.try_consume(0.5, later)); let much_later = later + Duration::from_millis(100); assert!(limiter.try_consume(5.0, much_later)); limiter.tokens = 0.0; let final_time = much_later + Duration::from_millis(1);
7563 limiter.update_tokens(final_time); assert!(limiter.tokens >= 0.09 && limiter.tokens <= 0.11);
7568 }
7569
7570 #[test]
7571 fn per_path_rate_limiting_independent() {
7572 let config = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
7573 let now = Instant::now();
7574 let mut state = AddressDiscoveryState::new(&config, now);
7575
7576 state.observe_all_paths = true;
7578
7579 state.update_rate_limit(5.0);
7581
7582 state
7584 .sent_observations
7585 .insert(0, paths::PathAddressInfo::new());
7586 state
7587 .sent_observations
7588 .insert(1, paths::PathAddressInfo::new());
7589 state
7590 .sent_observations
7591 .insert(2, paths::PathAddressInfo::new());
7592
7593 state
7595 .sent_observations
7596 .get_mut(&0)
7597 .unwrap()
7598 .observed_address = Some(SocketAddr::new(
7599 IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)),
7600 8080,
7601 ));
7602 state
7603 .sent_observations
7604 .get_mut(&1)
7605 .unwrap()
7606 .observed_address = Some(SocketAddr::new(
7607 IpAddr::V4(Ipv4Addr::new(192, 168, 1, 2)),
7608 8081,
7609 ));
7610 state
7611 .sent_observations
7612 .get_mut(&2)
7613 .unwrap()
7614 .observed_address = Some(SocketAddr::new(
7615 IpAddr::V4(Ipv4Addr::new(192, 168, 1, 3)),
7616 8082,
7617 ));
7618
7619 for _ in 0..3 {
7621 assert!(state.should_send_observation(0, now));
7622 state.record_observation_sent(0);
7623 state.sent_observations.get_mut(&0).unwrap().notified = false;
7625 }
7626
7627 for _ in 0..2 {
7629 assert!(state.should_send_observation(1, now));
7630 state.record_observation_sent(1);
7631 state.sent_observations.get_mut(&1).unwrap().notified = false;
7633 }
7634
7635 assert!(!state.should_send_observation(2, now));
7637
7638 let later = now + Duration::from_secs(1);
7640
7641 assert!(state.should_send_observation(0, later));
7643 assert!(state.should_send_observation(1, later));
7644 assert!(state.should_send_observation(2, later));
7645 }
7646
7647 #[test]
7648 fn per_path_rate_limiting_with_path_specific_limits() {
7649 let now = Instant::now();
7650 let remote1 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)), 8080);
7651 let remote2 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 2)), 8081);
7652 let config = TransportConfig::default();
7653
7654 let mut path1 = paths::PathData::new(remote1, false, None, now, &config);
7656 let mut path2 = paths::PathData::new(remote2, false, None, now, &config);
7657
7658 path1.observation_rate_limiter = paths::PathObservationRateLimiter::new(10, now); path2.observation_rate_limiter = paths::PathObservationRateLimiter::new(5, now); for _ in 0..10 {
7664 assert!(path1.observation_rate_limiter.can_send(now));
7665 path1.observation_rate_limiter.consume_token(now);
7666 }
7667 assert!(!path1.observation_rate_limiter.can_send(now));
7668
7669 for _ in 0..5 {
7671 assert!(path2.observation_rate_limiter.can_send(now));
7672 path2.observation_rate_limiter.consume_token(now);
7673 }
7674 assert!(!path2.observation_rate_limiter.can_send(now));
7675 }
7676
7677 #[test]
7678 fn per_path_rate_limiting_address_change_detection() {
7679 let config = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
7680 let now = Instant::now();
7681 let mut state = AddressDiscoveryState::new(&config, now);
7682
7683 let path_id = 0;
7685 let addr1 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)), 8080);
7686 let addr2 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 2)), 8080);
7687
7688 assert!(state.should_send_observation(path_id, now));
7690
7691 let frame = state.queue_observed_address_frame(path_id, addr1);
7693 assert!(frame.is_some());
7694
7695 assert!(!state.should_send_observation(path_id, now));
7697
7698 if let Some(info) = state.sent_observations.get_mut(&path_id) {
7700 info.notified = false;
7701 info.observed_address = Some(addr2);
7702 }
7703
7704 assert!(state.should_send_observation(path_id, now));
7706 }
7707
7708 #[test]
7709 fn per_path_rate_limiting_migration() {
7710 let now = Instant::now();
7711 let remote1 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)), 8080);
7712 let remote2 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 2)), 8081);
7713 let config = TransportConfig::default();
7714
7715 let mut path = paths::PathData::new(remote1, false, None, now, &config);
7717 path.observation_rate_limiter = paths::PathObservationRateLimiter::new(10, now);
7718
7719 for _ in 0..5 {
7721 assert!(path.observation_rate_limiter.can_send(now));
7722 path.observation_rate_limiter.consume_token(now);
7723 }
7724
7725 let mut new_path = paths::PathData::new(remote2, false, None, now, &config);
7727
7728 new_path.observation_rate_limiter = paths::PathObservationRateLimiter::new(10, now);
7731
7732 for _ in 0..10 {
7734 assert!(new_path.observation_rate_limiter.can_send(now));
7735 new_path.observation_rate_limiter.consume_token(now);
7736 }
7737 assert!(!new_path.observation_rate_limiter.can_send(now));
7738 }
7739
7740 #[test]
7741 fn per_path_rate_limiting_disabled_paths() {
7742 let config = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
7743 let now = Instant::now();
7744 let mut state = AddressDiscoveryState::new(&config, now);
7745
7746 assert!(state.should_send_observation(0, now));
7748
7749 assert!(!state.should_send_observation(1, now));
7751 assert!(!state.should_send_observation(2, now));
7752
7753 let later = now + Duration::from_secs(1);
7755 assert!(!state.should_send_observation(1, later));
7756 }
7757
7758 #[test]
7759 fn respecting_negotiated_max_observation_rate_basic() {
7760 let config = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
7761 let now = Instant::now();
7762 let mut state = AddressDiscoveryState::new(&config, now);
7763
7764 state.max_observation_rate = 10; state.rate_limiter = AddressObservationRateLimiter::new(10, now);
7767
7768 for _ in 0..10 {
7770 assert!(state.should_send_observation(0, now));
7771 }
7772 assert!(!state.should_send_observation(0, now));
7774 }
7775
7776 #[test]
7777 fn respecting_negotiated_max_observation_rate_zero() {
7778 let config = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
7779 let now = Instant::now();
7780 let mut state = AddressDiscoveryState::new(&config, now);
7781
7782 state.max_observation_rate = 0;
7784 state.rate_limiter = AddressObservationRateLimiter::new(0, now);
7785
7786 assert!(!state.should_send_observation(0, now));
7788 assert!(!state.should_send_observation(1, now));
7789
7790 let later = now + Duration::from_secs(10);
7792 assert!(!state.should_send_observation(0, later));
7793 }
7794
7795 #[test]
7796 fn respecting_negotiated_max_observation_rate_higher() {
7797 let config = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
7798 let now = Instant::now();
7799 let mut state = AddressDiscoveryState::new(&config, now);
7800
7801 state
7803 .sent_observations
7804 .insert(0, paths::PathAddressInfo::new());
7805 state
7806 .sent_observations
7807 .get_mut(&0)
7808 .unwrap()
7809 .observed_address = Some(SocketAddr::new(
7810 IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)),
7811 8080,
7812 ));
7813
7814 state.update_rate_limit(5.0);
7816
7817 state.max_observation_rate = 20; for _ in 0..5 {
7822 assert!(state.should_send_observation(0, now));
7823 state.record_observation_sent(0);
7824 state.sent_observations.get_mut(&0).unwrap().notified = false;
7826 }
7827 assert!(!state.should_send_observation(0, now));
7829 }
7830
7831 #[test]
7832 fn respecting_negotiated_max_observation_rate_dynamic_update() {
7833 let config = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
7834 let now = Instant::now();
7835 let mut state = AddressDiscoveryState::new(&config, now);
7836
7837 state
7839 .sent_observations
7840 .insert(0, paths::PathAddressInfo::new());
7841 state
7842 .sent_observations
7843 .get_mut(&0)
7844 .unwrap()
7845 .observed_address = Some(SocketAddr::new(
7846 IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)),
7847 8080,
7848 ));
7849
7850 for _ in 0..5 {
7852 assert!(state.should_send_observation(0, now));
7853 state.record_observation_sent(0);
7854 state.sent_observations.get_mut(&0).unwrap().notified = false;
7856 }
7857
7858 state.max_observation_rate = 3;
7862 state.rate_limiter.set_rate(3);
7863
7864 for _ in 0..3 {
7867 assert!(state.should_send_observation(0, now));
7868 state.record_observation_sent(0);
7869 state.sent_observations.get_mut(&0).unwrap().notified = false;
7871 }
7872
7873 assert!(!state.should_send_observation(0, now));
7875
7876 let later = now + Duration::from_secs(1);
7878 for _ in 0..3 {
7879 assert!(state.should_send_observation(0, later));
7880 state.record_observation_sent(0);
7881 state.sent_observations.get_mut(&0).unwrap().notified = false;
7883 }
7884
7885 assert!(!state.should_send_observation(0, later));
7887 }
7888
7889 #[test]
7890 fn respecting_negotiated_max_observation_rate_with_paths() {
7891 let config = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
7892 let now = Instant::now();
7893 let mut state = AddressDiscoveryState::new(&config, now);
7894
7895 state.observe_all_paths = true;
7897
7898 for i in 0..3 {
7900 state
7901 .sent_observations
7902 .insert(i, paths::PathAddressInfo::new());
7903 state
7904 .sent_observations
7905 .get_mut(&i)
7906 .unwrap()
7907 .observed_address = Some(SocketAddr::new(
7908 IpAddr::V4(Ipv4Addr::new(192, 168, 1, 100 + i as u8)),
7909 5000,
7910 ));
7911 }
7912
7913 for _ in 0..3 {
7916 for i in 0..3 {
7918 if state.should_send_observation(i, now) {
7919 state.record_observation_sent(i);
7920 state.sent_observations.get_mut(&i).unwrap().notified = false;
7922 }
7923 }
7924 }
7925
7926 assert!(state.should_send_observation(0, now));
7929 state.record_observation_sent(0);
7930
7931 assert!(!state.should_send_observation(0, now));
7933 assert!(!state.should_send_observation(1, now));
7934 assert!(!state.should_send_observation(2, now));
7935 }
7936
7937 #[test]
7938 fn queue_observed_address_frame_basic() {
7939 let config = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
7940 let now = Instant::now();
7941 let mut state = AddressDiscoveryState::new(&config, now);
7942
7943 let address = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 100)), 5000);
7945 let frame = state.queue_observed_address_frame(0, address);
7946
7947 assert!(frame.is_some());
7949 let frame = frame.unwrap();
7950 assert_eq!(frame.address, address);
7951
7952 assert!(state.sent_observations.contains_key(&0));
7954 assert!(state.sent_observations.get(&0).unwrap().notified);
7955 }
7956
7957 #[test]
7958 fn queue_observed_address_frame_rate_limited() {
7959 let config = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
7960 let now = Instant::now();
7961 let mut state = AddressDiscoveryState::new(&config, now);
7962
7963 state.observe_all_paths = true;
7965
7966 let mut addresses = Vec::new();
7968 for i in 0..10 {
7969 let addr = SocketAddr::new(
7970 IpAddr::V4(Ipv4Addr::new(192, 168, 1, i as u8)),
7971 5000 + i as u16,
7972 );
7973 addresses.push(addr);
7974 assert!(
7975 state.queue_observed_address_frame(i as u64, addr).is_some(),
7976 "Frame {} should be allowed",
7977 i + 1
7978 );
7979 }
7980
7981 let addr11 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 11)), 5011);
7983 assert!(
7984 state.queue_observed_address_frame(10, addr11).is_none(),
7985 "11th frame should be rate limited"
7986 );
7987 }
7988
7989 #[test]
7990 fn queue_observed_address_frame_disabled() {
7991 let config = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
7992 let now = Instant::now();
7993 let mut state = AddressDiscoveryState::new(&config, now);
7994
7995 state.enabled = false;
7997
7998 let address = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 100)), 5000);
7999
8000 assert!(state.queue_observed_address_frame(0, address).is_none());
8002 }
8003
8004 #[test]
8005 fn queue_observed_address_frame_already_notified() {
8006 let config = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
8007 let now = Instant::now();
8008 let mut state = AddressDiscoveryState::new(&config, now);
8009
8010 let address = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 100)), 5000);
8011
8012 assert!(state.queue_observed_address_frame(0, address).is_some());
8014
8015 assert!(state.queue_observed_address_frame(0, address).is_none());
8017
8018 let new_address = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 101)), 5001);
8020 assert!(state.queue_observed_address_frame(0, new_address).is_none());
8021 }
8022
8023 #[test]
8024 fn queue_observed_address_frame_primary_path_only() {
8025 let config = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
8026 let now = Instant::now();
8027 let mut state = AddressDiscoveryState::new(&config, now);
8028
8029 let address = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 100)), 5000);
8030
8031 assert!(state.queue_observed_address_frame(0, address).is_some());
8033
8034 assert!(state.queue_observed_address_frame(1, address).is_none());
8036 assert!(state.queue_observed_address_frame(2, address).is_none());
8037 }
8038
8039 #[test]
8040 fn queue_observed_address_frame_updates_path_info() {
8041 let config = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
8042 let now = Instant::now();
8043 let mut state = AddressDiscoveryState::new(&config, now);
8044
8045 let address = SocketAddr::new(
8046 IpAddr::V6(Ipv6Addr::new(0x2001, 0xdb8, 0, 0, 0, 0, 0, 1)),
8047 5000,
8048 );
8049
8050 let frame = state.queue_observed_address_frame(0, address);
8052 assert!(frame.is_some());
8053
8054 let path_info = state.sent_observations.get(&0).unwrap();
8056 assert_eq!(path_info.observed_address, Some(address));
8057 assert!(path_info.notified);
8058
8059 assert_eq!(state.received_history.len(), 0);
8062 }
8063
8064 #[test]
8065 fn retransmits_includes_outbound_observations() {
8066 use crate::connection::spaces::Retransmits;
8067
8068 let mut retransmits = Retransmits::default();
8070
8071 assert!(retransmits.outbound_observations.is_empty());
8073
8074 let address = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 100)), 5000);
8076 let frame = frame::ObservedAddress {
8077 sequence_number: VarInt::from_u32(1),
8078 address,
8079 };
8080 retransmits.outbound_observations.push(frame);
8081
8082 assert_eq!(retransmits.outbound_observations.len(), 1);
8084 assert_eq!(retransmits.outbound_observations[0].address, address);
8085 }
8086
8087 #[test]
8088 fn check_for_address_observations_no_peer_support() {
8089 let config = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
8090 let now = Instant::now();
8091 let mut state = AddressDiscoveryState::new(&config, now);
8092
8093 state
8095 .sent_observations
8096 .insert(0, paths::PathAddressInfo::new());
8097 state
8098 .sent_observations
8099 .get_mut(&0)
8100 .unwrap()
8101 .observed_address = Some(SocketAddr::new(
8102 IpAddr::V4(Ipv4Addr::new(192, 168, 1, 100)),
8103 5000,
8104 ));
8105
8106 let frames = state.check_for_address_observations(0, false, now);
8108
8109 assert!(frames.is_empty());
8111 }
8112
8113 #[test]
8114 fn check_for_address_observations_with_peer_support() {
8115 let config = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
8116 let now = Instant::now();
8117 let mut state = AddressDiscoveryState::new(&config, now);
8118
8119 let address = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 100)), 5000);
8121 state
8122 .sent_observations
8123 .insert(0, paths::PathAddressInfo::new());
8124 state
8125 .sent_observations
8126 .get_mut(&0)
8127 .unwrap()
8128 .observed_address = Some(address);
8129
8130 let frames = state.check_for_address_observations(0, true, now);
8132
8133 assert_eq!(frames.len(), 1);
8135 assert_eq!(frames[0].address, address);
8136
8137 assert!(state.sent_observations.get(&0).unwrap().notified);
8139 }
8140
8141 #[test]
8142 fn check_for_address_observations_rate_limited() {
8143 let config = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
8144 let now = Instant::now();
8145 let mut state = AddressDiscoveryState::new(&config, now);
8146
8147 let address = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 100)), 5000);
8149 state
8150 .sent_observations
8151 .insert(0, paths::PathAddressInfo::new());
8152 state
8153 .sent_observations
8154 .get_mut(&0)
8155 .unwrap()
8156 .observed_address = Some(address);
8157
8158 for _ in 0..10 {
8160 let frames = state.check_for_address_observations(0, true, now);
8161 if frames.is_empty() {
8162 break;
8163 }
8164 state.sent_observations.get_mut(&0).unwrap().notified = false;
8166 }
8167
8168 assert_eq!(state.rate_limiter.tokens, 0.0);
8170
8171 state.sent_observations.get_mut(&0).unwrap().notified = false;
8173
8174 let frames2 = state.check_for_address_observations(0, true, now);
8176 assert_eq!(frames2.len(), 0);
8177
8178 state.sent_observations.get_mut(&0).unwrap().notified = false;
8180
8181 let later = now + Duration::from_millis(200); let frames3 = state.check_for_address_observations(0, true, later);
8184 assert_eq!(frames3.len(), 1);
8185 }
8186
8187 #[test]
8188 fn check_for_address_observations_multiple_paths() {
8189 let config = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
8190 let now = Instant::now();
8191 let mut state = AddressDiscoveryState::new(&config, now);
8192
8193 state.observe_all_paths = true;
8195
8196 let addr1 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 100)), 5000);
8198 let addr2 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 101)), 5001);
8199
8200 state
8201 .sent_observations
8202 .insert(0, paths::PathAddressInfo::new());
8203 state
8204 .sent_observations
8205 .get_mut(&0)
8206 .unwrap()
8207 .observed_address = Some(addr1);
8208
8209 state
8210 .sent_observations
8211 .insert(1, paths::PathAddressInfo::new());
8212 state
8213 .sent_observations
8214 .get_mut(&1)
8215 .unwrap()
8216 .observed_address = Some(addr2);
8217
8218 let frames = state.check_for_address_observations(0, true, now);
8220
8221 assert_eq!(frames.len(), 2);
8223
8224 let addresses: Vec<_> = frames.iter().map(|f| f.address).collect();
8226 assert!(addresses.contains(&addr1));
8227 assert!(addresses.contains(&addr2));
8228
8229 assert!(state.sent_observations.get(&0).unwrap().notified);
8231 assert!(state.sent_observations.get(&1).unwrap().notified);
8232 }
8233
8234 #[test]
8236 fn test_rate_limiter_configuration() {
8237 let state = AddressDiscoveryState::new_with_params(true, 10.0, false);
8239 assert_eq!(state.rate_limiter.rate, 10.0);
8240 assert_eq!(state.rate_limiter.max_tokens, 10.0);
8241 assert_eq!(state.rate_limiter.tokens, 10.0);
8242
8243 let state = AddressDiscoveryState::new_with_params(true, 63.0, false);
8244 assert_eq!(state.rate_limiter.rate, 63.0);
8245 assert_eq!(state.rate_limiter.max_tokens, 63.0);
8246 }
8247
8248 #[test]
8249 fn test_rate_limiter_update_configuration() {
8250 let mut state = AddressDiscoveryState::new_with_params(true, 5.0, false);
8251
8252 assert_eq!(state.rate_limiter.rate, 5.0);
8254
8255 state.update_rate_limit(10.0);
8257 assert_eq!(state.rate_limiter.rate, 10.0);
8258 assert_eq!(state.rate_limiter.max_tokens, 10.0);
8259
8260 state.rate_limiter.tokens = 15.0;
8262 state.update_rate_limit(8.0);
8263 assert_eq!(state.rate_limiter.tokens, 8.0);
8264 }
8265
8266 #[test]
8267 fn test_rate_limiter_from_transport_params() {
8268 let mut params = TransportParameters::default();
8269 params.address_discovery = Some(AddressDiscoveryConfig::SendAndReceive);
8270
8271 let state = AddressDiscoveryState::from_transport_params(¶ms);
8272 assert!(state.is_some());
8273 let state = state.unwrap();
8274 assert_eq!(state.rate_limiter.rate, 10.0); assert!(!state.observe_all_paths); }
8277
8278 #[test]
8279 fn test_rate_limiter_zero_rate() {
8280 let state = AddressDiscoveryState::new_with_params(true, 0.0, false);
8281 assert_eq!(state.rate_limiter.rate, 0.0);
8282 assert_eq!(state.rate_limiter.tokens, 0.0);
8283
8284 let address = "192.168.1.1:443".parse().unwrap();
8286 let mut state = AddressDiscoveryState::new_with_params(true, 0.0, false);
8287 let frame = state.queue_observed_address_frame(0, address);
8288 assert!(frame.is_none());
8289 }
8290
8291 #[test]
8292 fn test_rate_limiter_configuration_edge_cases() {
8293 let state = AddressDiscoveryState::new_with_params(true, 63.0, false);
8295 assert_eq!(state.rate_limiter.rate, 63.0);
8296
8297 let state = AddressDiscoveryState::new_with_params(true, 100.0, false);
8299 assert_eq!(state.rate_limiter.rate, 100.0);
8301
8302 let state = AddressDiscoveryState::new_with_params(true, 2.5, false);
8304 assert_eq!(state.rate_limiter.rate, 2.0);
8306 }
8307
8308 #[test]
8309 fn test_rate_limiter_runtime_update() {
8310 let mut state = AddressDiscoveryState::new_with_params(true, 10.0, false);
8311 let now = Instant::now();
8312
8313 state.rate_limiter.tokens = 5.0;
8315
8316 state.update_rate_limit(3.0);
8318
8319 assert_eq!(state.rate_limiter.tokens, 3.0);
8321 assert_eq!(state.rate_limiter.rate, 3.0);
8322 assert_eq!(state.rate_limiter.max_tokens, 3.0);
8323
8324 let later = now + Duration::from_secs(1);
8326 state.rate_limiter.update_tokens(later);
8327
8328 assert_eq!(state.rate_limiter.tokens, 3.0);
8330 }
8331
8332 #[test]
8334 fn test_address_discovery_state_initialization_default() {
8335 let now = Instant::now();
8337 let default_config = crate::transport_parameters::AddressDiscoveryConfig::default();
8338
8339 let address_discovery_state = Some(AddressDiscoveryState::new(&default_config, now));
8342
8343 assert!(address_discovery_state.is_some());
8344 let state = address_discovery_state.unwrap();
8345
8346 assert!(state.enabled); assert_eq!(state.max_observation_rate, 10); assert!(!state.observe_all_paths);
8350 }
8351
8352 #[test]
8353 fn test_address_discovery_state_initialization_on_handshake() {
8354 let now = Instant::now();
8356
8357 let mut address_discovery_state = Some(AddressDiscoveryState::new(
8359 &crate::transport_parameters::AddressDiscoveryConfig::default(),
8360 now,
8361 ));
8362
8363 let peer_params = TransportParameters {
8365 address_discovery: Some(AddressDiscoveryConfig::SendAndReceive),
8366 ..TransportParameters::default()
8367 };
8368
8369 if let Some(peer_config) = &peer_params.address_discovery {
8371 address_discovery_state = Some(AddressDiscoveryState::new(peer_config, now));
8373 }
8374
8375 assert!(address_discovery_state.is_some());
8377 let state = address_discovery_state.unwrap();
8378 assert!(state.enabled);
8379 assert_eq!(state.max_observation_rate, 10); assert!(!state.observe_all_paths); }
8383
8384 #[test]
8385 fn test_address_discovery_negotiation_disabled_peer() {
8386 let now = Instant::now();
8388
8389 let our_config = AddressDiscoveryConfig::SendAndReceive;
8391 let mut address_discovery_state = Some(AddressDiscoveryState::new(&our_config, now));
8392
8393 let peer_params = TransportParameters {
8395 address_discovery: None,
8396 ..TransportParameters::default()
8397 };
8398
8399 if peer_params.address_discovery.is_none() {
8401 if let Some(state) = &mut address_discovery_state {
8402 state.enabled = false;
8403 }
8404 }
8405
8406 let state = address_discovery_state.unwrap();
8408 assert!(!state.enabled); }
8410
8411 #[test]
8412 fn test_address_discovery_negotiation_rate_limiting() {
8413 let now = Instant::now();
8415
8416 let our_config = AddressDiscoveryConfig::SendAndReceive;
8418 let mut address_discovery_state = Some(AddressDiscoveryState::new(&our_config, now));
8419
8420 if let Some(state) = &mut address_discovery_state {
8422 state.max_observation_rate = 30;
8423 state.update_rate_limit(30.0);
8424 }
8425
8426 let peer_params = TransportParameters {
8428 address_discovery: Some(AddressDiscoveryConfig::SendAndReceive),
8429 ..TransportParameters::default()
8430 };
8431
8432 if let (Some(state), Some(_peer_config)) =
8435 (&mut address_discovery_state, &peer_params.address_discovery)
8436 {
8437 let peer_rate = 15u8;
8440 let negotiated_rate = state.max_observation_rate.min(peer_rate);
8441 state.update_rate_limit(negotiated_rate as f64);
8442 }
8443
8444 let state = address_discovery_state.unwrap();
8446 assert_eq!(state.rate_limiter.rate, 15.0); }
8448
8449 #[test]
8450 fn test_address_discovery_path_initialization() {
8451 let now = Instant::now();
8453 let config = AddressDiscoveryConfig::SendAndReceive;
8454 let mut state = AddressDiscoveryState::new(&config, now);
8455
8456 assert!(state.sent_observations.is_empty());
8458 assert!(state.received_observations.is_empty());
8459
8460 let should_send = state.should_send_observation(0, now);
8462 assert!(should_send); }
8467
8468 #[test]
8469 fn test_address_discovery_multiple_path_initialization() {
8470 let now = Instant::now();
8472 let config = AddressDiscoveryConfig::SendAndReceive;
8473 let mut state = AddressDiscoveryState::new(&config, now);
8474
8475 assert!(state.should_send_observation(0, now)); assert!(!state.should_send_observation(1, now)); assert!(!state.should_send_observation(2, now)); state.observe_all_paths = true;
8482 assert!(state.should_send_observation(1, now)); assert!(state.should_send_observation(2, now)); let config_primary_only = AddressDiscoveryConfig::SendAndReceive;
8487 let mut state_primary = AddressDiscoveryState::new(&config_primary_only, now);
8488
8489 assert!(state_primary.should_send_observation(0, now)); assert!(!state_primary.should_send_observation(1, now)); }
8492
8493 #[test]
8494 fn test_handle_observed_address_frame_valid() {
8495 let now = Instant::now();
8497 let config = AddressDiscoveryConfig::SendAndReceive;
8498 let mut state = AddressDiscoveryState::new(&config, now);
8499
8500 let observed_addr = SocketAddr::from(([192, 168, 1, 100], 5000));
8502 state.handle_observed_address(observed_addr, 0, now);
8503
8504 assert_eq!(state.received_history.len(), 1);
8506 assert_eq!(state.received_history[0].address, observed_addr);
8507 assert_eq!(state.received_history[0].path_id, 0);
8508 assert_eq!(state.received_history[0].received_at, now);
8509
8510 let path_info = state.received_observations.get(&0).unwrap();
8512 assert_eq!(path_info.observed_address, Some(observed_addr));
8513 assert_eq!(path_info.last_observed, Some(now));
8514 assert_eq!(path_info.observation_count, 1);
8515 }
8516
8517 #[test]
8518 fn test_handle_multiple_received_history() {
8519 let now = Instant::now();
8521 let config = AddressDiscoveryConfig::SendAndReceive;
8522 let mut state = AddressDiscoveryState::new(&config, now);
8523
8524 let addr1 = SocketAddr::from(([192, 168, 1, 100], 5000));
8526 let addr2 = SocketAddr::from(([10, 0, 0, 50], 6000));
8527 let addr3 = SocketAddr::from(([192, 168, 1, 100], 7000)); state.handle_observed_address(addr1, 0, now);
8530 state.handle_observed_address(addr2, 1, now);
8531 state.handle_observed_address(addr3, 0, now + Duration::from_millis(100));
8532
8533 assert_eq!(state.received_history.len(), 3);
8535
8536 let path0_info = state.received_observations.get(&0).unwrap();
8538 assert_eq!(path0_info.observed_address, Some(addr3));
8539 assert_eq!(path0_info.observation_count, 1); let path1_info = state.received_observations.get(&1).unwrap();
8543 assert_eq!(path1_info.observed_address, Some(addr2));
8544 assert_eq!(path1_info.observation_count, 1);
8545 }
8546
8547 #[test]
8548 fn test_get_observed_address() {
8549 let now = Instant::now();
8551 let config = AddressDiscoveryConfig::SendAndReceive;
8552 let mut state = AddressDiscoveryState::new(&config, now);
8553
8554 assert_eq!(state.get_observed_address(0), None);
8556
8557 let addr = SocketAddr::from(([192, 168, 1, 100], 5000));
8559 state.handle_observed_address(addr, 0, now);
8560
8561 assert_eq!(state.get_observed_address(0), Some(addr));
8563
8564 assert_eq!(state.get_observed_address(999), None);
8566 }
8567
8568 #[test]
8569 fn test_has_unnotified_changes() {
8570 let now = Instant::now();
8572 let config = AddressDiscoveryConfig::SendAndReceive;
8573 let mut state = AddressDiscoveryState::new(&config, now);
8574
8575 assert!(!state.has_unnotified_changes());
8577
8578 let addr = SocketAddr::from(([192, 168, 1, 100], 5000));
8580 state.handle_observed_address(addr, 0, now);
8581 assert!(state.has_unnotified_changes());
8582
8583 if let Some(path_info) = state.received_observations.get_mut(&0) {
8585 path_info.notified = true;
8586 }
8587 assert!(!state.has_unnotified_changes());
8588
8589 let addr2 = SocketAddr::from(([192, 168, 1, 100], 6000));
8591 state.handle_observed_address(addr2, 0, now + Duration::from_secs(1));
8592 assert!(state.has_unnotified_changes());
8593 }
8594
8595 #[test]
8596 fn test_address_discovery_disabled() {
8597 let now = Instant::now();
8599 let config = AddressDiscoveryConfig::SendAndReceive;
8600 let mut state = AddressDiscoveryState::new(&config, now);
8601
8602 state.enabled = false;
8604
8605 let addr = SocketAddr::from(([192, 168, 1, 100], 5000));
8607 state.handle_observed_address(addr, 0, now);
8608
8609 assert_eq!(state.received_history.len(), 0);
8611
8612 assert!(!state.should_send_observation(0, now));
8614 }
8615
8616 #[test]
8617 fn test_rate_limiting_basic() {
8618 let now = Instant::now();
8620 let config = AddressDiscoveryConfig::SendAndReceive;
8621 let mut state = AddressDiscoveryState::new(&config, now);
8622
8623 state.observe_all_paths = true;
8625 state.rate_limiter.set_rate(2); assert!(state.should_send_observation(0, now));
8629 state.record_observation_sent(0);
8631
8632 assert!(state.should_send_observation(1, now));
8634 state.record_observation_sent(1);
8635
8636 assert!(!state.should_send_observation(2, now));
8638
8639 let later = now + Duration::from_millis(500);
8641 assert!(state.should_send_observation(3, later));
8642 state.record_observation_sent(3);
8643
8644 assert!(!state.should_send_observation(4, later));
8646
8647 let _one_sec_later = now + Duration::from_secs(1);
8651 let two_sec_later = now + Duration::from_secs(2);
8655 assert!(state.should_send_observation(5, two_sec_later));
8656 state.record_observation_sent(5);
8657
8658 assert!(state.should_send_observation(6, two_sec_later));
8669 state.record_observation_sent(6);
8670
8671 assert!(
8673 !state.should_send_observation(7, two_sec_later),
8674 "Expected no tokens available"
8675 );
8676 }
8677
8678 #[test]
8679 fn test_rate_limiting_per_path() {
8680 let now = Instant::now();
8682 let config = AddressDiscoveryConfig::SendAndReceive;
8683 let mut state = AddressDiscoveryState::new(&config, now);
8684
8685 state
8687 .sent_observations
8688 .insert(0, paths::PathAddressInfo::new());
8689 state
8690 .sent_observations
8691 .get_mut(&0)
8692 .unwrap()
8693 .observed_address = Some(SocketAddr::new(
8694 IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)),
8695 8080,
8696 ));
8697
8698 for _ in 0..10 {
8700 assert!(state.should_send_observation(0, now));
8701 state.record_observation_sent(0);
8702 state.sent_observations.get_mut(&0).unwrap().notified = false;
8704 }
8705
8706 assert!(!state.should_send_observation(0, now));
8708
8709 let later = now + Duration::from_millis(100);
8711 assert!(state.should_send_observation(0, later));
8712 state.record_observation_sent(0);
8713
8714 state.sent_observations.get_mut(&0).unwrap().notified = false;
8716
8717 assert!(!state.should_send_observation(0, later));
8719 }
8720
8721 #[test]
8722 fn test_rate_limiting_zero_rate() {
8723 let now = Instant::now();
8725 let config = AddressDiscoveryConfig::SendAndReceive;
8726 let mut state = AddressDiscoveryState::new(&config, now);
8727
8728 state.rate_limiter.set_rate(0);
8730 state.rate_limiter.tokens = 0.0;
8731 state.rate_limiter.max_tokens = 0.0;
8732
8733 assert!(!state.should_send_observation(0, now));
8735 assert!(!state.should_send_observation(0, now + Duration::from_secs(10)));
8736 assert!(!state.should_send_observation(0, now + Duration::from_secs(100)));
8737 }
8738
8739 #[test]
8740 fn test_rate_limiting_update() {
8741 let now = Instant::now();
8743 let config = AddressDiscoveryConfig::SendAndReceive;
8744 let mut state = AddressDiscoveryState::new(&config, now);
8745
8746 state.observe_all_paths = true;
8748
8749 for i in 0..12 {
8751 state
8752 .sent_observations
8753 .insert(i, paths::PathAddressInfo::new());
8754 state
8755 .sent_observations
8756 .get_mut(&i)
8757 .unwrap()
8758 .observed_address = Some(SocketAddr::new(
8759 IpAddr::V4(Ipv4Addr::new(192, 168, 1, (i + 1) as u8)),
8760 8080,
8761 ));
8762 }
8763
8764 for i in 0..10 {
8767 assert!(state.should_send_observation(i, now));
8768 state.record_observation_sent(i);
8769 }
8770 assert!(!state.should_send_observation(10, now));
8772
8773 state.update_rate_limit(20.0);
8775
8776 let later = now + Duration::from_millis(50);
8779 assert!(state.should_send_observation(10, later));
8780 state.record_observation_sent(10);
8781
8782 let later2 = now + Duration::from_millis(100);
8784 assert!(state.should_send_observation(11, later2));
8785 }
8786
8787 #[test]
8788 fn test_rate_limiting_burst() {
8789 let now = Instant::now();
8791 let config = AddressDiscoveryConfig::SendAndReceive;
8792 let mut state = AddressDiscoveryState::new(&config, now);
8793
8794 for _ in 0..10 {
8796 assert!(state.should_send_observation(0, now));
8797 state.record_observation_sent(0);
8798 }
8799
8800 assert!(!state.should_send_observation(0, now));
8802
8803 let later = now + Duration::from_millis(100);
8805 assert!(state.should_send_observation(0, later));
8806 state.record_observation_sent(0);
8807 assert!(!state.should_send_observation(0, later));
8808 }
8809
8810 #[test]
8811 fn test_connection_rate_limiting_with_check_observations() {
8812 let now = Instant::now();
8814 let config = AddressDiscoveryConfig::SendAndReceive;
8815 let mut state = AddressDiscoveryState::new(&config, now);
8816
8817 let mut path_info = paths::PathAddressInfo::new();
8819 path_info.update_observed_address(
8820 SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)), 8080),
8821 now,
8822 );
8823 state.sent_observations.insert(0, path_info);
8824
8825 let frame1 =
8827 state.queue_observed_address_frame(0, SocketAddr::from(([192, 168, 1, 1], 8080)));
8828 assert!(frame1.is_some());
8829 state.record_observation_sent(0);
8830
8831 if let Some(info) = state.sent_observations.get_mut(&0) {
8833 info.notified = false;
8834 }
8835
8836 for _ in 1..10 {
8838 if let Some(info) = state.sent_observations.get_mut(&0) {
8840 info.notified = false;
8841 }
8842 let frame =
8843 state.queue_observed_address_frame(0, SocketAddr::from(([192, 168, 1, 1], 8080)));
8844 assert!(frame.is_some());
8845 state.record_observation_sent(0);
8846 }
8847
8848 if let Some(info) = state.sent_observations.get_mut(&0) {
8850 info.notified = false;
8851 }
8852 let frame3 =
8853 state.queue_observed_address_frame(0, SocketAddr::from(([192, 168, 1, 1], 8080)));
8854 assert!(frame3.is_none()); let later = now + Duration::from_millis(100);
8858 state.rate_limiter.update_tokens(later); if let Some(info) = state.sent_observations.get_mut(&0) {
8862 info.notified = false;
8863 }
8864
8865 let frame4 =
8866 state.queue_observed_address_frame(0, SocketAddr::from(([192, 168, 1, 1], 8080)));
8867 assert!(frame4.is_some()); }
8869
8870 #[test]
8871 fn test_queue_observed_address_frame() {
8872 let now = Instant::now();
8874 let config = AddressDiscoveryConfig::SendAndReceive;
8875 let mut state = AddressDiscoveryState::new(&config, now);
8876
8877 let addr = SocketAddr::from(([192, 168, 1, 100], 5000));
8878
8879 let frame = state.queue_observed_address_frame(0, addr);
8881 assert!(frame.is_some());
8882 assert_eq!(frame.unwrap().address, addr);
8883
8884 state.record_observation_sent(0);
8886
8887 for i in 0..9 {
8889 if let Some(info) = state.sent_observations.get_mut(&0) {
8891 info.notified = false;
8892 }
8893
8894 let frame = state.queue_observed_address_frame(0, addr);
8895 assert!(frame.is_some(), "Frame {} should be allowed", i + 2);
8896 state.record_observation_sent(0);
8897 }
8898
8899 if let Some(info) = state.sent_observations.get_mut(&0) {
8901 info.notified = false;
8902 }
8903
8904 let frame = state.queue_observed_address_frame(0, addr);
8906 assert!(frame.is_none(), "11th frame should be rate limited");
8907 }
8908
8909 #[test]
8910 fn test_multi_path_basic() {
8911 let now = Instant::now();
8913 let config = AddressDiscoveryConfig::SendAndReceive;
8914 let mut state = AddressDiscoveryState::new(&config, now);
8915
8916 let addr1 = SocketAddr::from(([192, 168, 1, 1], 5000));
8917 let addr2 = SocketAddr::from(([10, 0, 0, 1], 6000));
8918 let addr3 = SocketAddr::from(([172, 16, 0, 1], 7000));
8919
8920 state.handle_observed_address(addr1, 0, now);
8922 state.handle_observed_address(addr2, 1, now);
8923 state.handle_observed_address(addr3, 2, now);
8924
8925 assert_eq!(state.get_observed_address(0), Some(addr1));
8927 assert_eq!(state.get_observed_address(1), Some(addr2));
8928 assert_eq!(state.get_observed_address(2), Some(addr3));
8929
8930 assert!(state.has_unnotified_changes());
8932
8933 assert_eq!(state.received_history.len(), 3);
8935 }
8936
8937 #[test]
8938 fn test_multi_path_observe_primary_only() {
8939 let now = Instant::now();
8941 let config = AddressDiscoveryConfig::SendAndReceive;
8942 let mut state = AddressDiscoveryState::new(&config, now);
8943
8944 assert!(state.should_send_observation(0, now));
8946 state.record_observation_sent(0);
8947
8948 assert!(!state.should_send_observation(1, now));
8950 assert!(!state.should_send_observation(2, now));
8951
8952 let addr = SocketAddr::from(([192, 168, 1, 1], 5000));
8954 assert!(state.queue_observed_address_frame(0, addr).is_some());
8955 assert!(state.queue_observed_address_frame(1, addr).is_none());
8956 assert!(state.queue_observed_address_frame(2, addr).is_none());
8957 }
8958
8959 #[test]
8960 fn test_multi_path_rate_limiting() {
8961 let now = Instant::now();
8963 let config = AddressDiscoveryConfig::SendAndReceive;
8964 let mut state = AddressDiscoveryState::new(&config, now);
8965
8966 state.observe_all_paths = true;
8968
8969 for i in 0..21 {
8971 state
8972 .sent_observations
8973 .insert(i, paths::PathAddressInfo::new());
8974 state
8975 .sent_observations
8976 .get_mut(&i)
8977 .unwrap()
8978 .observed_address = Some(SocketAddr::new(
8979 IpAddr::V4(Ipv4Addr::new(192, 168, 1, (i + 1) as u8)),
8980 8080,
8981 ));
8982 }
8983
8984 for i in 0..10 {
8986 assert!(state.should_send_observation(i, now));
8987 state.record_observation_sent(i);
8988 }
8989
8990 assert!(!state.should_send_observation(10, now));
8992
8993 state.sent_observations.get_mut(&0).unwrap().notified = false;
8995 assert!(!state.should_send_observation(0, now)); let later = now + Duration::from_secs(1);
8999 for i in 10..20 {
9000 assert!(state.should_send_observation(i, later));
9001 state.record_observation_sent(i);
9002 }
9003 assert!(!state.should_send_observation(20, later));
9005 }
9006
9007 #[test]
9008 fn test_multi_path_address_changes() {
9009 let now = Instant::now();
9011 let config = AddressDiscoveryConfig::SendAndReceive;
9012 let mut state = AddressDiscoveryState::new(&config, now);
9013
9014 let addr1a = SocketAddr::from(([192, 168, 1, 1], 5000));
9015 let addr1b = SocketAddr::from(([192, 168, 1, 2], 5000));
9016 let addr2a = SocketAddr::from(([10, 0, 0, 1], 6000));
9017 let addr2b = SocketAddr::from(([10, 0, 0, 2], 6000));
9018
9019 state.handle_observed_address(addr1a, 0, now);
9021 state.handle_observed_address(addr2a, 1, now);
9022
9023 if let Some(info) = state.received_observations.get_mut(&0) {
9025 info.notified = true;
9026 }
9027 if let Some(info) = state.received_observations.get_mut(&1) {
9028 info.notified = true;
9029 }
9030 assert!(!state.has_unnotified_changes());
9031
9032 state.handle_observed_address(addr1b, 0, now + Duration::from_secs(1));
9034 assert!(state.has_unnotified_changes());
9035
9036 assert_eq!(state.get_observed_address(0), Some(addr1b));
9038 assert_eq!(state.get_observed_address(1), Some(addr2a));
9039
9040 if let Some(info) = state.received_observations.get_mut(&0) {
9042 info.notified = true;
9043 }
9044 assert!(!state.has_unnotified_changes());
9045
9046 state.handle_observed_address(addr2b, 1, now + Duration::from_secs(2));
9048 assert!(state.has_unnotified_changes());
9049 }
9050
9051 #[test]
9052 fn test_multi_path_migration() {
9053 let now = Instant::now();
9055 let config = AddressDiscoveryConfig::SendAndReceive;
9056 let mut state = AddressDiscoveryState::new(&config, now);
9057
9058 let addr_old = SocketAddr::from(([192, 168, 1, 1], 5000));
9059 let addr_new = SocketAddr::from(([10, 0, 0, 1], 6000));
9060
9061 state.handle_observed_address(addr_old, 0, now);
9063 assert_eq!(state.get_observed_address(0), Some(addr_old));
9064
9065 state.handle_observed_address(addr_new, 1, now + Duration::from_secs(1));
9067
9068 assert_eq!(state.get_observed_address(0), Some(addr_old));
9070 assert_eq!(state.get_observed_address(1), Some(addr_new));
9071
9072 assert_eq!(state.received_observations.len(), 2);
9075 }
9076
9077 #[test]
9078 fn test_check_for_address_observations_multi_path() {
9079 let now = Instant::now();
9081 let config = AddressDiscoveryConfig::SendAndReceive;
9082 let mut state = AddressDiscoveryState::new(&config, now);
9083
9084 state.observe_all_paths = true;
9086
9087 let addr1 = SocketAddr::from(([192, 168, 1, 1], 5000));
9089 let addr2 = SocketAddr::from(([10, 0, 0, 1], 6000));
9090 let addr3 = SocketAddr::from(([172, 16, 0, 1], 7000));
9091
9092 state
9094 .sent_observations
9095 .insert(0, paths::PathAddressInfo::new());
9096 state
9097 .sent_observations
9098 .get_mut(&0)
9099 .unwrap()
9100 .observed_address = Some(addr1);
9101 state
9102 .sent_observations
9103 .insert(1, paths::PathAddressInfo::new());
9104 state
9105 .sent_observations
9106 .get_mut(&1)
9107 .unwrap()
9108 .observed_address = Some(addr2);
9109 state
9110 .sent_observations
9111 .insert(2, paths::PathAddressInfo::new());
9112 state
9113 .sent_observations
9114 .get_mut(&2)
9115 .unwrap()
9116 .observed_address = Some(addr3);
9117
9118 let frames = state.check_for_address_observations(0, true, now);
9120
9121 assert_eq!(frames.len(), 3);
9123
9124 let frame_addrs: Vec<_> = frames.iter().map(|f| f.address).collect();
9126 assert!(frame_addrs.contains(&addr1), "addr1 should be in frames");
9127 assert!(frame_addrs.contains(&addr2), "addr2 should be in frames");
9128 assert!(frame_addrs.contains(&addr3), "addr3 should be in frames");
9129
9130 assert!(!state.has_unnotified_changes());
9132 }
9133
9134 #[test]
9135 fn test_multi_path_with_peer_not_supporting() {
9136 let now = Instant::now();
9138 let config = AddressDiscoveryConfig::SendAndReceive;
9139 let mut state = AddressDiscoveryState::new(&config, now);
9140
9141 state.handle_observed_address(SocketAddr::from(([192, 168, 1, 1], 5000)), 0, now);
9143 state.handle_observed_address(SocketAddr::from(([10, 0, 0, 1], 6000)), 1, now);
9144
9145 let frames = state.check_for_address_observations(0, false, now);
9147 assert_eq!(frames.len(), 0);
9148
9149 assert!(state.has_unnotified_changes());
9151 }
9152
9153 #[test]
9155 fn test_bootstrap_node_aggressive_observation_mode() {
9156 let config = AddressDiscoveryConfig::SendAndReceive;
9158 let now = Instant::now();
9159 let mut state = AddressDiscoveryState::new(&config, now);
9160
9161 assert!(!state.is_bootstrap_mode());
9163
9164 state.set_bootstrap_mode(true);
9166 assert!(state.is_bootstrap_mode());
9167
9168 assert!(state.should_observe_path(0)); assert!(state.should_observe_path(1)); assert!(state.should_observe_path(2));
9172
9173 let bootstrap_rate = state.get_effective_rate_limit();
9175 assert!(bootstrap_rate > 10.0); }
9177
9178 #[test]
9179 fn test_bootstrap_node_immediate_observation() {
9180 let config = AddressDiscoveryConfig::SendAndReceive;
9182 let now = Instant::now();
9183 let mut state = AddressDiscoveryState::new(&config, now);
9184 state.set_bootstrap_mode(true);
9185
9186 let addr = SocketAddr::from(([192, 168, 1, 100], 5000));
9188 state.handle_observed_address(addr, 0, now);
9189
9190 assert!(state.should_send_observation_immediately(true));
9192
9193 assert!(state.should_send_observation(0, now));
9195
9196 let frame = state.queue_observed_address_frame(0, addr);
9198 assert!(frame.is_some());
9199 }
9200
9201 #[test]
9202 fn test_bootstrap_node_multiple_path_observations() {
9203 let config = AddressDiscoveryConfig::SendAndReceive;
9205 let now = Instant::now();
9206 let mut state = AddressDiscoveryState::new(&config, now);
9207 state.set_bootstrap_mode(true);
9208
9209 let addrs = vec![
9211 (0u64, SocketAddr::from(([192, 168, 1, 1], 5000))),
9212 (1u64, SocketAddr::from(([10, 0, 0, 1], 6000))),
9213 (2u64, SocketAddr::from(([172, 16, 0, 1], 7000))),
9214 ];
9215
9216 for (path_id, addr) in &addrs {
9217 state
9218 .sent_observations
9219 .insert(*path_id, paths::PathAddressInfo::new());
9220 state
9221 .sent_observations
9222 .get_mut(path_id)
9223 .unwrap()
9224 .observed_address = Some(*addr);
9225 }
9226
9227 let frames = state.check_for_address_observations(0, true, now);
9229 assert_eq!(frames.len(), 3);
9230
9231 for (_, addr) in &addrs {
9233 assert!(frames.iter().any(|f| f.address == *addr));
9234 }
9235 }
9236
9237 #[test]
9238 fn test_bootstrap_node_rate_limit_override() {
9239 let config = AddressDiscoveryConfig::SendAndReceive;
9241 let now = Instant::now();
9242 let mut state = AddressDiscoveryState::new(&config, now);
9243 state.set_bootstrap_mode(true);
9244
9245 let addr = SocketAddr::from(([192, 168, 1, 1], 5000));
9247
9248 for i in 0..10 {
9250 state.handle_observed_address(addr, i, now);
9251 let can_send = state.should_send_observation(i, now);
9252 assert!(can_send, "Bootstrap node should send observation {i}");
9253 state.record_observation_sent(i);
9254 }
9255 }
9256
9257 #[test]
9258 fn test_bootstrap_node_configuration() {
9259 let config = AddressDiscoveryConfig::SendAndReceive;
9261 let mut state = AddressDiscoveryState::new(&config, Instant::now());
9262
9263 state.set_bootstrap_mode(true);
9265
9266 assert!(state.bootstrap_mode);
9268 assert!(state.enabled);
9269
9270 let effective_rate = state.get_effective_rate_limit();
9272 assert!(effective_rate > state.max_observation_rate as f64);
9273 }
9274
9275 #[test]
9276 fn test_bootstrap_node_persistent_observation() {
9277 let config = AddressDiscoveryConfig::SendAndReceive;
9279 let mut now = Instant::now();
9280 let mut state = AddressDiscoveryState::new(&config, now);
9281 state.set_bootstrap_mode(true);
9282
9283 let addr1 = SocketAddr::from(([192, 168, 1, 1], 5000));
9284 let addr2 = SocketAddr::from(([192, 168, 1, 2], 5000));
9285
9286 state.handle_observed_address(addr1, 0, now);
9288 assert!(state.should_send_observation(0, now));
9289 state.record_observation_sent(0);
9290
9291 now += Duration::from_secs(60);
9293 state.handle_observed_address(addr2, 0, now);
9294
9295 assert!(state.should_send_observation(0, now));
9297 }
9298
9299 #[test]
9300 fn test_bootstrap_node_multi_peer_support() {
9301 let config = AddressDiscoveryConfig::SendAndReceive;
9304 let now = Instant::now();
9305 let mut state = AddressDiscoveryState::new(&config, now);
9306 state.set_bootstrap_mode(true);
9307
9308 let peer_addresses: Vec<(u64, SocketAddr)> = vec![
9310 (0, SocketAddr::from(([192, 168, 1, 1], 5000))), (1, SocketAddr::from(([10, 0, 0, 1], 6000))), (2, SocketAddr::from(([172, 16, 0, 1], 7000))), (3, SocketAddr::from(([192, 168, 2, 1], 8000))), ];
9315
9316 for (path_id, addr) in &peer_addresses {
9318 state
9319 .sent_observations
9320 .insert(*path_id, paths::PathAddressInfo::new());
9321 state
9322 .sent_observations
9323 .get_mut(path_id)
9324 .unwrap()
9325 .observed_address = Some(*addr);
9326 }
9327
9328 let frames = state.check_for_address_observations(0, true, now);
9330 assert_eq!(frames.len(), peer_addresses.len());
9331
9332 for (_, addr) in &peer_addresses {
9334 assert!(frames.iter().any(|f| f.address == *addr));
9335 }
9336 }
9337
9338 mod address_discovery_tests {
9340 include!("address_discovery_tests.rs");
9341 }
9342}