1use std::{
2 cmp,
3 collections::VecDeque,
4 convert::TryFrom,
5 fmt, io, mem,
6 net::{IpAddr, SocketAddr},
7 sync::Arc,
8};
9
10use bytes::{Bytes, BytesMut};
11use frame::StreamMetaVec;
12use rand::{Rng, SeedableRng, rngs::StdRng};
15use thiserror::Error;
16use tracing::{debug, error, info, trace, trace_span, warn};
17
18use crate::{
19 Dir, Duration, EndpointConfig, Frame, INITIAL_MTU, Instant, MAX_CID_SIZE, MAX_STREAM_COUNT,
20 MIN_INITIAL_SIZE, MtuDiscoveryConfig, Side, StreamId, TIMER_GRANULARITY, TokenStore, Transmit,
21 TransportError, TransportErrorCode, VarInt,
22 cid_generator::ConnectionIdGenerator,
23 cid_queue::CidQueue,
24 coding::BufMutExt,
25 config::{ServerConfig, TransportConfig},
26 crypto::{self, KeyPair, Keys, PacketKey},
27 endpoint::AddressDiscoveryStats,
28 frame::{self, Close, Datagram, FrameStruct, NewToken},
29 packet::{
30 FixedLengthConnectionIdParser, Header, InitialHeader, InitialPacket, LongType, Packet,
31 PacketNumber, PartialDecode, SpaceId,
32 },
33 range_set::ArrayRangeSet,
34 shared::{
35 ConnectionEvent, ConnectionEventInner, ConnectionId, DatagramConnectionEvent, EcnCodepoint,
36 EndpointEvent, EndpointEventInner,
37 },
38 token::{ResetToken, Token, TokenPayload},
39 transport_parameters::TransportParameters,
40};
41
42mod ack_frequency;
43use ack_frequency::AckFrequencyState;
44
45pub(crate) mod nat_traversal;
46use nat_traversal::NatTraversalState;
47pub(crate) use nat_traversal::{CoordinationPhase, NatTraversalError, NatTraversalRole};
48
49mod assembler;
50pub use assembler::Chunk;
51
52mod cid_state;
53use cid_state::CidState;
54
55mod datagrams;
56use datagrams::DatagramState;
57pub use datagrams::{Datagrams, SendDatagramError};
58
59mod mtud;
60use mtud::MtuDiscovery;
61
62mod pacing;
63
64mod packet_builder;
65use packet_builder::PacketBuilder;
66
67mod packet_crypto;
68use packet_crypto::{PrevCrypto, ZeroRttCrypto};
69
70mod paths;
71pub use paths::RttEstimator;
72use paths::{NatTraversalChallenges, PathData, PathResponses};
73
74mod send_buffer;
75
76mod spaces;
77#[cfg(fuzzing)]
78pub use spaces::Retransmits;
79#[cfg(not(fuzzing))]
80use spaces::Retransmits;
81use spaces::{PacketNumberFilter, PacketSpace, SendableFrames, SentPacket, ThinRetransmits};
82
83mod stats;
84pub use stats::{ConnectionStats, FrameStats, PathStats, UdpStats};
85
86mod streams;
87#[cfg(fuzzing)]
88pub use streams::StreamsState;
89#[cfg(not(fuzzing))]
90use streams::StreamsState;
91pub use streams::{
92 Chunks, ClosedStream, FinishError, ReadError, ReadableError, RecvStream, SendStream,
93 ShouldTransmit, StreamEvent, Streams, WriteError, Written,
94};
95
96mod timer;
97use crate::congestion::Controller;
98use timer::{Timer, TimerTable};
99
100pub struct Connection {
140 endpoint_config: Arc<EndpointConfig>,
141 config: Arc<TransportConfig>,
142 rng: StdRng,
143 crypto: Box<dyn crypto::Session>,
144 handshake_cid: ConnectionId,
146 rem_handshake_cid: ConnectionId,
148 local_ip: Option<IpAddr>,
151 path: PathData,
152 allow_mtud: bool,
154 prev_path: Option<(ConnectionId, PathData)>,
155 state: State,
156 side: ConnectionSide,
157 zero_rtt_enabled: bool,
159 zero_rtt_crypto: Option<ZeroRttCrypto>,
161 key_phase: bool,
162 key_phase_size: u64,
164 peer_params: TransportParameters,
166 orig_rem_cid: ConnectionId,
168 initial_dst_cid: ConnectionId,
170 retry_src_cid: Option<ConnectionId>,
173 lost_packets: u64,
175 events: VecDeque<Event>,
176 endpoint_events: VecDeque<EndpointEventInner>,
177 spin_enabled: bool,
179 spin: bool,
181 spaces: [PacketSpace; 3],
183 highest_space: SpaceId,
185 prev_crypto: Option<PrevCrypto>,
187 next_crypto: Option<KeyPair<Box<dyn PacketKey>>>,
192 accepted_0rtt: bool,
193 permit_idle_reset: bool,
195 idle_timeout: Option<Duration>,
197 timers: TimerTable,
198 authentication_failures: u64,
200 error: Option<ConnectionError>,
202 packet_number_filter: PacketNumberFilter,
204
205 path_responses: PathResponses,
210 nat_traversal_challenges: NatTraversalChallenges,
212 close: bool,
213
214 ack_frequency: AckFrequencyState,
218
219 pto_count: u32,
224
225 receiving_ecn: bool,
230 total_authed_packets: u64,
232 app_limited: bool,
235
236 streams: StreamsState,
237 rem_cids: CidQueue,
239 local_cid_state: CidState,
241 datagrams: DatagramState,
243 stats: ConnectionStats,
245 version: u32,
247
248 nat_traversal: Option<NatTraversalState>,
250
251 nat_traversal_frame_config: frame::nat_traversal_unified::NatTraversalFrameConfig,
253
254 address_discovery_state: Option<AddressDiscoveryState>,
256
257 pqc_state: PqcState,
259
260 #[cfg(feature = "trace")]
262 trace_context: crate::tracing::TraceContext,
263
264 #[cfg(feature = "trace")]
266 event_log: Arc<crate::tracing::EventLog>,
267
268 #[cfg(feature = "__qlog")]
270 qlog_streamer: Option<Box<dyn std::io::Write + Send + Sync>>,
271}
272
273impl Connection {
274 pub(crate) fn new(
275 endpoint_config: Arc<EndpointConfig>,
276 config: Arc<TransportConfig>,
277 init_cid: ConnectionId,
278 loc_cid: ConnectionId,
279 rem_cid: ConnectionId,
280 remote: SocketAddr,
281 local_ip: Option<IpAddr>,
282 crypto: Box<dyn crypto::Session>,
283 cid_gen: &dyn ConnectionIdGenerator,
284 now: Instant,
285 version: u32,
286 allow_mtud: bool,
287 rng_seed: [u8; 32],
288 side_args: SideArgs,
289 ) -> Self {
290 let pref_addr_cid = side_args.pref_addr_cid();
291 let path_validated = side_args.path_validated();
292 let connection_side = ConnectionSide::from(side_args);
293 let side = connection_side.side();
294 let initial_space = PacketSpace {
295 crypto: Some(crypto.initial_keys(&init_cid, side)),
296 ..PacketSpace::new(now)
297 };
298 let state = State::Handshake(state::Handshake {
299 rem_cid_set: side.is_server(),
300 expected_token: Bytes::new(),
301 client_hello: None,
302 });
303 let mut rng = StdRng::from_seed(rng_seed);
304 let mut this = Self {
305 endpoint_config,
306 crypto,
307 handshake_cid: loc_cid,
308 rem_handshake_cid: rem_cid,
309 local_cid_state: CidState::new(
310 cid_gen.cid_len(),
311 cid_gen.cid_lifetime(),
312 now,
313 if pref_addr_cid.is_some() { 2 } else { 1 },
314 ),
315 path: PathData::new(remote, allow_mtud, None, now, &config),
316 allow_mtud,
317 local_ip,
318 prev_path: None,
319 state,
320 side: connection_side,
321 zero_rtt_enabled: false,
322 zero_rtt_crypto: None,
323 key_phase: false,
324 key_phase_size: rng.gen_range(10..1000),
331 peer_params: TransportParameters::default(),
332 orig_rem_cid: rem_cid,
333 initial_dst_cid: init_cid,
334 retry_src_cid: None,
335 lost_packets: 0,
336 events: VecDeque::new(),
337 endpoint_events: VecDeque::new(),
338 spin_enabled: config.allow_spin && rng.gen_ratio(7, 8),
339 spin: false,
340 spaces: [initial_space, PacketSpace::new(now), PacketSpace::new(now)],
341 highest_space: SpaceId::Initial,
342 prev_crypto: None,
343 next_crypto: None,
344 accepted_0rtt: false,
345 permit_idle_reset: true,
346 idle_timeout: match config.max_idle_timeout {
347 None | Some(VarInt(0)) => None,
348 Some(dur) => Some(Duration::from_millis(dur.0)),
349 },
350 timers: TimerTable::default(),
351 authentication_failures: 0,
352 error: None,
353 #[cfg(test)]
354 packet_number_filter: match config.deterministic_packet_numbers {
355 false => PacketNumberFilter::new(&mut rng),
356 true => PacketNumberFilter::disabled(),
357 },
358 #[cfg(not(test))]
359 packet_number_filter: PacketNumberFilter::new(&mut rng),
360
361 path_responses: PathResponses::default(),
362 nat_traversal_challenges: NatTraversalChallenges::default(),
363 close: false,
364
365 ack_frequency: AckFrequencyState::new(get_max_ack_delay(
366 &TransportParameters::default(),
367 )),
368
369 pto_count: 0,
370
371 app_limited: false,
372 receiving_ecn: false,
373 total_authed_packets: 0,
374
375 streams: StreamsState::new(
376 side,
377 config.max_concurrent_uni_streams,
378 config.max_concurrent_bidi_streams,
379 config.send_window,
380 config.receive_window,
381 config.stream_receive_window,
382 ),
383 datagrams: DatagramState::default(),
384 config,
385 rem_cids: CidQueue::new(rem_cid),
386 rng,
387 stats: ConnectionStats::default(),
388 version,
389 nat_traversal: None, nat_traversal_frame_config:
391 frame::nat_traversal_unified::NatTraversalFrameConfig::default(),
392 address_discovery_state: {
393 Some(AddressDiscoveryState::new(
396 &crate::transport_parameters::AddressDiscoveryConfig::default(),
397 now,
398 ))
399 },
400 pqc_state: PqcState::new(),
401
402 #[cfg(feature = "trace")]
403 trace_context: crate::tracing::TraceContext::new(crate::tracing::TraceId::new()),
404
405 #[cfg(feature = "trace")]
406 event_log: crate::tracing::global_log(),
407
408 #[cfg(feature = "__qlog")]
409 qlog_streamer: None,
410 };
411
412 #[cfg(feature = "trace")]
414 {
415 use crate::trace_event;
416 use crate::tracing::{Event, EventData, socket_addr_to_bytes, timestamp_now};
417 let _peer_id = {
419 let mut id = [0u8; 32];
420 let addr_bytes = match remote {
421 SocketAddr::V4(addr) => addr.ip().octets().to_vec(),
422 SocketAddr::V6(addr) => addr.ip().octets().to_vec(),
423 };
424 id[..addr_bytes.len().min(32)]
425 .copy_from_slice(&addr_bytes[..addr_bytes.len().min(32)]);
426 id
427 };
428
429 let (addr_bytes, addr_type) = socket_addr_to_bytes(remote);
430 trace_event!(
431 &this.event_log,
432 Event {
433 timestamp: timestamp_now(),
434 trace_id: this.trace_context.trace_id(),
435 sequence: 0,
436 _padding: 0,
437 node_id: [0u8; 32], event_data: EventData::ConnInit {
439 endpoint_bytes: addr_bytes,
440 addr_type,
441 _padding: [0u8; 45],
442 },
443 }
444 );
445 }
446
447 if path_validated {
448 this.on_path_validated();
449 }
450 if side.is_client() {
451 this.write_crypto();
453 this.init_0rtt();
454 }
455 this
456 }
457
458 #[cfg(feature = "__qlog")]
460 pub fn set_qlog(
461 &mut self,
462 writer: Box<dyn std::io::Write + Send + Sync>,
463 _title: Option<String>,
464 _description: Option<String>,
465 _now: Instant,
466 ) {
467 self.qlog_streamer = Some(writer);
468 }
469
470 #[cfg(feature = "__qlog")]
472 fn emit_qlog_recovery_metrics(&mut self, _now: Instant) {
473 }
476
477 #[must_use]
485 pub fn poll_timeout(&mut self) -> Option<Instant> {
486 let mut next_timeout = self.timers.next_timeout();
487
488 if let Some(nat_state) = &self.nat_traversal {
490 if let Some(nat_timeout) = nat_state.get_next_timeout(Instant::now()) {
491 self.timers.set(Timer::NatTraversal, nat_timeout);
493 next_timeout = Some(next_timeout.map_or(nat_timeout, |t| t.min(nat_timeout)));
494 }
495 }
496
497 next_timeout
498 }
499
500 #[must_use]
506 pub fn poll(&mut self) -> Option<Event> {
507 if let Some(x) = self.events.pop_front() {
508 return Some(x);
509 }
510
511 if let Some(event) = self.streams.poll() {
512 return Some(Event::Stream(event));
513 }
514
515 if let Some(err) = self.error.take() {
516 return Some(Event::ConnectionLost { reason: err });
517 }
518
519 None
520 }
521
522 #[must_use]
524 pub fn poll_endpoint_events(&mut self) -> Option<EndpointEvent> {
525 self.endpoint_events.pop_front().map(EndpointEvent)
526 }
527
528 #[must_use]
530 pub fn streams(&mut self) -> Streams<'_> {
531 Streams {
532 state: &mut self.streams,
533 conn_state: &self.state,
534 }
535 }
536
537 #[cfg(feature = "trace")]
539 pub(crate) fn trace_context(&self) -> &crate::tracing::TraceContext {
540 &self.trace_context
541 }
542
543 #[cfg(feature = "trace")]
545 pub(crate) fn event_log(&self) -> &Arc<crate::tracing::EventLog> {
546 &self.event_log
547 }
548
549 #[must_use]
551 pub fn recv_stream(&mut self, id: StreamId) -> RecvStream<'_> {
552 assert!(id.dir() == Dir::Bi || id.initiator() != self.side.side());
553 RecvStream {
554 id,
555 state: &mut self.streams,
556 pending: &mut self.spaces[SpaceId::Data].pending,
557 }
558 }
559
560 #[must_use]
562 pub fn send_stream(&mut self, id: StreamId) -> SendStream<'_> {
563 assert!(id.dir() == Dir::Bi || id.initiator() == self.side.side());
564 SendStream {
565 id,
566 state: &mut self.streams,
567 pending: &mut self.spaces[SpaceId::Data].pending,
568 conn_state: &self.state,
569 }
570 }
571
572 #[must_use]
582 pub fn poll_transmit(
583 &mut self,
584 now: Instant,
585 max_datagrams: usize,
586 buf: &mut Vec<u8>,
587 ) -> Option<Transmit> {
588 assert!(max_datagrams != 0);
589 let max_datagrams = match self.config.enable_segmentation_offload {
590 false => 1,
591 true => max_datagrams,
592 };
593
594 let mut num_datagrams = 0;
595 let mut datagram_start = 0;
598 let mut segment_size = usize::from(self.path.current_mtu());
599
600 if let Some(nat_traversal) = &mut self.nat_traversal {
602 if nat_traversal.check_coordination_timeout(now) {
603 trace!("NAT traversal coordination timed out, may retry");
604 }
605 }
606
607 if let Some(challenge) = self.send_nat_traversal_challenge(now, buf) {
609 return Some(challenge);
610 }
611
612 if let Some(challenge) = self.send_path_challenge(now, buf) {
613 return Some(challenge);
614 }
615
616 for space in SpaceId::iter() {
618 let request_immediate_ack =
619 space == SpaceId::Data && self.peer_supports_ack_frequency();
620 self.spaces[space].maybe_queue_probe(request_immediate_ack, &self.streams);
621 }
622
623 let close = match self.state {
625 State::Drained => {
626 self.app_limited = true;
627 return None;
628 }
629 State::Draining | State::Closed(_) => {
630 if !self.close {
633 self.app_limited = true;
634 return None;
635 }
636 true
637 }
638 _ => false,
639 };
640
641 if let Some(config) = &self.config.ack_frequency_config {
643 self.spaces[SpaceId::Data].pending.ack_frequency = self
644 .ack_frequency
645 .should_send_ack_frequency(self.path.rtt.get(), config, &self.peer_params)
646 && self.highest_space == SpaceId::Data
647 && self.peer_supports_ack_frequency();
648 }
649
650 let mut buf_capacity = 0;
654
655 let mut coalesce = true;
656 let mut builder_storage: Option<PacketBuilder> = None;
657 let mut sent_frames = None;
658 let mut pad_datagram = false;
659 let mut pad_datagram_to_mtu = false;
660 let mut congestion_blocked = false;
661
662 let mut space_idx = 0;
664 let spaces = [SpaceId::Initial, SpaceId::Handshake, SpaceId::Data];
665 while space_idx < spaces.len() {
668 let space_id = spaces[space_idx];
669 let pn = self.packet_number_filter.peek(&self.spaces[SpaceId::Data]);
676 let frame_space_1rtt =
677 segment_size.saturating_sub(self.predict_1rtt_overhead(Some(pn)));
678
679 let can_send = self.space_can_send(space_id, frame_space_1rtt);
681 if can_send.is_empty() && (!close || self.spaces[space_id].crypto.is_none()) {
682 space_idx += 1;
683 continue;
684 }
685
686 let mut ack_eliciting = !self.spaces[space_id].pending.is_empty(&self.streams)
687 || self.spaces[space_id].ping_pending
688 || self.spaces[space_id].immediate_ack_pending;
689 if space_id == SpaceId::Data {
690 ack_eliciting |= self.can_send_1rtt(frame_space_1rtt);
691 }
692
693 pad_datagram_to_mtu |= space_id == SpaceId::Data && self.config.pad_to_mtu;
694
695 let buf_end = if let Some(builder) = &builder_storage {
699 buf.len().max(builder.min_size) + builder.tag_len
700 } else {
701 buf.len()
702 };
703
704 let tag_len = if let Some(ref crypto) = self.spaces[space_id].crypto {
705 crypto.packet.local.tag_len()
706 } else if space_id == SpaceId::Data {
707 match self.zero_rtt_crypto.as_ref() {
708 Some(crypto) => crypto.packet.tag_len(),
709 None => {
710 error!(
712 "sending packets in the application data space requires known 0-RTT or 1-RTT keys"
713 );
714 return None;
715 }
716 }
717 } else {
718 unreachable!("tried to send {:?} packet without keys", space_id)
719 };
720 if !coalesce || buf_capacity - buf_end < MIN_PACKET_SPACE + tag_len {
721 if num_datagrams >= max_datagrams {
725 break;
727 }
728
729 if self
736 .path
737 .anti_amplification_blocked(segment_size as u64 * (num_datagrams as u64) + 1)
738 {
739 trace!("blocked by anti-amplification");
740 break;
741 }
742
743 if ack_eliciting && self.spaces[space_id].loss_probes == 0 {
746 let untracked_bytes = if let Some(builder) = &builder_storage {
748 buf_capacity - builder.partial_encode.start
749 } else {
750 0
751 } as u64;
752 debug_assert!(untracked_bytes <= segment_size as u64);
753
754 let bytes_to_send = segment_size as u64 + untracked_bytes;
755 if self.path.in_flight.bytes + bytes_to_send >= self.path.congestion.window() {
756 space_idx += 1;
757 congestion_blocked = true;
758 trace!("blocked by congestion control");
761 continue;
762 }
763
764 let smoothed_rtt = self.path.rtt.get();
766 if let Some(delay) = self.path.pacing.delay(
767 smoothed_rtt,
768 bytes_to_send,
769 self.path.current_mtu(),
770 self.path.congestion.window(),
771 now,
772 ) {
773 self.timers.set(Timer::Pacing, delay);
774 congestion_blocked = true;
775 trace!("blocked by pacing");
778 break;
779 }
780 }
781
782 if let Some(mut builder) = builder_storage.take() {
784 if pad_datagram {
785 builder.pad_to(self.pqc_state.min_initial_size());
786 }
787
788 if num_datagrams > 1 || pad_datagram_to_mtu {
789 const MAX_PADDING: usize = 16;
802 let packet_len_unpadded = cmp::max(builder.min_size, buf.len())
803 - datagram_start
804 + builder.tag_len;
805 if (packet_len_unpadded + MAX_PADDING < segment_size
806 && !pad_datagram_to_mtu)
807 || datagram_start + segment_size > buf_capacity
808 {
809 trace!(
810 "GSO truncated by demand for {} padding bytes or loss probe",
811 segment_size - packet_len_unpadded
812 );
813 builder_storage = Some(builder);
814 break;
815 }
816
817 builder.pad_to(segment_size as u16);
820 }
821
822 builder.finish_and_track(now, self, sent_frames.take(), buf);
823
824 if num_datagrams == 1 {
825 segment_size = buf.len();
832 buf_capacity = buf.len();
835
836 if space_id == SpaceId::Data {
843 let frame_space_1rtt =
844 segment_size.saturating_sub(self.predict_1rtt_overhead(Some(pn)));
845 if self.space_can_send(space_id, frame_space_1rtt).is_empty() {
846 break;
847 }
848 }
849 }
850 }
851
852 let next_datagram_size_limit = match self.spaces[space_id].loss_probes {
854 0 => segment_size,
855 _ => {
856 self.spaces[space_id].loss_probes -= 1;
857 std::cmp::min(segment_size, usize::from(INITIAL_MTU))
861 }
862 };
863 buf_capacity += next_datagram_size_limit;
864 if buf.capacity() < buf_capacity {
865 buf.reserve(max_datagrams * segment_size);
874 }
875 num_datagrams += 1;
876 coalesce = true;
877 pad_datagram = false;
878 datagram_start = buf.len();
879
880 debug_assert_eq!(
881 datagram_start % segment_size,
882 0,
883 "datagrams in a GSO batch must be aligned to the segment size"
884 );
885 } else {
886 if let Some(builder) = builder_storage.take() {
890 builder.finish_and_track(now, self, sent_frames.take(), buf);
891 }
892 }
893
894 debug_assert!(buf_capacity - buf.len() >= MIN_PACKET_SPACE);
895
896 if self.spaces[SpaceId::Initial].crypto.is_some()
901 && space_id == SpaceId::Handshake
902 && self.side.is_client()
903 {
904 self.discard_space(now, SpaceId::Initial);
907 }
908 if let Some(ref mut prev) = self.prev_crypto {
909 prev.update_unacked = false;
910 }
911
912 debug_assert!(
913 builder_storage.is_none() && sent_frames.is_none(),
914 "Previous packet must have been finished"
915 );
916
917 let builder = builder_storage.insert(PacketBuilder::new(
918 now,
919 space_id,
920 self.rem_cids.active(),
921 buf,
922 buf_capacity,
923 datagram_start,
924 ack_eliciting,
925 self,
926 )?);
927 coalesce = coalesce && !builder.short_header;
928
929 if self
931 .pqc_state
932 .should_adjust_coalescing(buf.len() - datagram_start, space_id)
933 {
934 coalesce = false;
935 trace!("Disabling coalescing for PQC handshake in {:?}", space_id);
936 }
937
938 pad_datagram |=
940 space_id == SpaceId::Initial && (self.side.is_client() || ack_eliciting);
941
942 if close {
943 trace!("sending CONNECTION_CLOSE");
944 if !self.spaces[space_id].pending_acks.ranges().is_empty() {
949 Self::populate_acks(
950 now,
951 self.receiving_ecn,
952 &mut SentFrames::default(),
953 &mut self.spaces[space_id],
954 buf,
955 &mut self.stats,
956 );
957 }
958
959 debug_assert!(
963 buf.len() + frame::ConnectionClose::SIZE_BOUND < builder.max_size,
964 "ACKs should leave space for ConnectionClose"
965 );
966 if buf.len() + frame::ConnectionClose::SIZE_BOUND < builder.max_size {
967 let max_frame_size = builder.max_size - buf.len();
968 match self.state {
969 State::Closed(state::Closed { ref reason }) => {
970 if space_id == SpaceId::Data || reason.is_transport_layer() {
971 reason.encode(buf, max_frame_size)
972 } else {
973 frame::ConnectionClose {
974 error_code: TransportErrorCode::APPLICATION_ERROR,
975 frame_type: None,
976 reason: Bytes::new(),
977 }
978 .encode(buf, max_frame_size)
979 }
980 }
981 State::Draining => frame::ConnectionClose {
982 error_code: TransportErrorCode::NO_ERROR,
983 frame_type: None,
984 reason: Bytes::new(),
985 }
986 .encode(buf, max_frame_size),
987 _ => unreachable!(
988 "tried to make a close packet when the connection wasn't closed"
989 ),
990 }
991 }
992 if space_id == self.highest_space {
993 self.close = false;
995 break;
997 } else {
998 space_idx += 1;
1002 continue;
1003 }
1004 }
1005
1006 if space_id == SpaceId::Data && num_datagrams == 1 {
1009 if let Some((token, remote)) = self.path_responses.pop_off_path(self.path.remote) {
1010 let mut builder = builder_storage.take().unwrap();
1013 trace!("PATH_RESPONSE {:08x} (off-path)", token);
1014 buf.write(frame::FrameType::PATH_RESPONSE);
1015 buf.write(token);
1016 self.stats.frame_tx.path_response += 1;
1017 builder.pad_to(self.pqc_state.min_initial_size());
1018 builder.finish_and_track(
1019 now,
1020 self,
1021 Some(SentFrames {
1022 non_retransmits: true,
1023 ..SentFrames::default()
1024 }),
1025 buf,
1026 );
1027 self.stats.udp_tx.on_sent(1, buf.len());
1028
1029 #[cfg(feature = "trace")]
1031 {
1032 use crate::trace_packet_sent;
1033 trace_packet_sent!(
1035 &self.event_log,
1036 self.trace_context.trace_id(),
1037 buf.len() as u32,
1038 0 );
1040 }
1041
1042 return Some(Transmit {
1043 destination: remote,
1044 size: buf.len(),
1045 ecn: None,
1046 segment_size: None,
1047 src_ip: self.local_ip,
1048 });
1049 }
1050 }
1051
1052 if space_id == SpaceId::Data && self.address_discovery_state.is_some() {
1054 let peer_supports = self.peer_params.address_discovery.is_some();
1055
1056 if let Some(state) = &mut self.address_discovery_state {
1057 let frames = state.check_for_address_observations(0, peer_supports, now);
1058 self.spaces[space_id]
1059 .pending
1060 .observed_addresses
1061 .extend(frames);
1062 }
1063 }
1064
1065 let sent =
1066 self.populate_packet(now, space_id, buf, builder.max_size, builder.exact_number);
1067
1068 debug_assert!(
1075 !(sent.is_ack_only(&self.streams)
1076 && !can_send.acks
1077 && can_send.other
1078 && (buf_capacity - builder.datagram_start) == self.path.current_mtu() as usize
1079 && self.datagrams.outgoing.is_empty()),
1080 "SendableFrames was {can_send:?}, but only ACKs have been written"
1081 );
1082 pad_datagram |= sent.requires_padding;
1083
1084 if sent.largest_acked.is_some() {
1085 self.spaces[space_id].pending_acks.acks_sent();
1086 self.timers.stop(Timer::MaxAckDelay);
1087 }
1088
1089 sent_frames = Some(sent);
1091
1092 }
1095
1096 if let Some(mut builder) = builder_storage {
1098 if pad_datagram {
1099 builder.pad_to(self.pqc_state.min_initial_size());
1100 }
1101
1102 if pad_datagram_to_mtu && buf_capacity >= datagram_start + segment_size {
1108 builder.pad_to(segment_size as u16);
1109 }
1110
1111 let last_packet_number = builder.exact_number;
1112 builder.finish_and_track(now, self, sent_frames, buf);
1113 self.path
1114 .congestion
1115 .on_sent(now, buf.len() as u64, last_packet_number);
1116
1117 #[cfg(feature = "__qlog")]
1118 self.emit_qlog_recovery_metrics(now);
1119 }
1120
1121 self.app_limited = buf.is_empty() && !congestion_blocked;
1122
1123 if buf.is_empty() && self.state.is_established() {
1125 let space_id = SpaceId::Data;
1126 let probe_size = self
1127 .path
1128 .mtud
1129 .poll_transmit(now, self.packet_number_filter.peek(&self.spaces[space_id]))?;
1130
1131 let buf_capacity = probe_size as usize;
1132 buf.reserve(buf_capacity);
1133
1134 let mut builder = PacketBuilder::new(
1135 now,
1136 space_id,
1137 self.rem_cids.active(),
1138 buf,
1139 buf_capacity,
1140 0,
1141 true,
1142 self,
1143 )?;
1144
1145 buf.write(frame::FrameType::PING);
1147 self.stats.frame_tx.ping += 1;
1148
1149 if self.peer_supports_ack_frequency() {
1151 buf.write(frame::FrameType::IMMEDIATE_ACK);
1152 self.stats.frame_tx.immediate_ack += 1;
1153 }
1154
1155 builder.pad_to(probe_size);
1156 let sent_frames = SentFrames {
1157 non_retransmits: true,
1158 ..Default::default()
1159 };
1160 builder.finish_and_track(now, self, Some(sent_frames), buf);
1161
1162 self.stats.path.sent_plpmtud_probes += 1;
1163 num_datagrams = 1;
1164
1165 trace!(?probe_size, "writing MTUD probe");
1166 }
1167
1168 if buf.is_empty() {
1169 return None;
1170 }
1171
1172 trace!("sending {} bytes in {} datagrams", buf.len(), num_datagrams);
1173 self.path.total_sent = self.path.total_sent.saturating_add(buf.len() as u64);
1174
1175 self.stats.udp_tx.on_sent(num_datagrams as u64, buf.len());
1176
1177 #[cfg(feature = "trace")]
1179 {
1180 use crate::trace_packet_sent;
1181 let packet_num = self.spaces[SpaceId::Data]
1184 .next_packet_number
1185 .saturating_sub(1);
1186 trace_packet_sent!(
1187 &self.event_log,
1188 self.trace_context.trace_id(),
1189 buf.len() as u32,
1190 packet_num
1191 );
1192 }
1193
1194 Some(Transmit {
1195 destination: self.path.remote,
1196 size: buf.len(),
1197 ecn: if self.path.sending_ecn {
1198 Some(EcnCodepoint::Ect0)
1199 } else {
1200 None
1201 },
1202 segment_size: match num_datagrams {
1203 1 => None,
1204 _ => Some(segment_size),
1205 },
1206 src_ip: self.local_ip,
1207 })
1208 }
1209
1210 fn send_coordination_request(&mut self, now: Instant, buf: &mut Vec<u8>) -> Option<Transmit> {
1212 let should_send = self.nat_traversal.as_ref()?.should_send_punch_request();
1214 if !should_send {
1215 return None;
1216 }
1217
1218 let (round, target_addrs, coordinator_addr) = {
1219 let nat_traversal = self.nat_traversal.as_ref()?;
1220 let coord = nat_traversal.coordination.as_ref()?;
1221 let addrs: Vec<_> = coord.punch_targets.iter().map(|t| t.remote_addr).collect();
1222 (coord.round, addrs, self.path.remote) };
1224
1225 if target_addrs.is_empty() {
1226 return None;
1227 }
1228
1229 debug_assert_eq!(
1230 self.highest_space,
1231 SpaceId::Data,
1232 "PUNCH_ME_NOW queued without 1-RTT keys"
1233 );
1234
1235 buf.reserve(self.pqc_state.min_initial_size() as usize);
1236 let buf_capacity = buf.capacity();
1237
1238 let mut builder = PacketBuilder::new(
1239 now,
1240 SpaceId::Data,
1241 self.rem_cids.active(),
1242 buf,
1243 buf_capacity,
1244 0,
1245 false,
1246 self,
1247 )?;
1248
1249 trace!(
1250 "sending PUNCH_ME_NOW round {} with {} targets",
1251 round,
1252 target_addrs.len()
1253 );
1254
1255 buf.write(frame::FrameType::PUNCH_ME_NOW_IPV4);
1258 buf.write(round);
1259 buf.write(target_addrs.len() as u8);
1260 for addr in target_addrs {
1261 match addr {
1262 SocketAddr::V4(v4) => {
1263 buf.write(4u8); buf.write(u32::from(*v4.ip()));
1265 buf.write(v4.port());
1266 }
1267 SocketAddr::V6(v6) => {
1268 buf.write(6u8); buf.write(*v6.ip());
1270 buf.write(v6.port());
1271 }
1272 }
1273 }
1274
1275 self.stats.frame_tx.ping += 1; builder.pad_to(self.pqc_state.min_initial_size());
1278 builder.finish_and_track(now, self, None, buf);
1279
1280 if let Some(nat_traversal) = &mut self.nat_traversal {
1282 nat_traversal.mark_punch_request_sent();
1283 }
1284
1285 Some(Transmit {
1286 destination: coordinator_addr,
1287 size: buf.len(),
1288 ecn: if self.path.sending_ecn {
1289 Some(EcnCodepoint::Ect0)
1290 } else {
1291 None
1292 },
1293 segment_size: None,
1294 src_ip: self.local_ip,
1295 })
1296 }
1297
1298 fn send_coordinated_path_challenge(
1300 &mut self,
1301 now: Instant,
1302 buf: &mut Vec<u8>,
1303 ) -> Option<Transmit> {
1304 if let Some(nat_traversal) = &mut self.nat_traversal {
1306 if nat_traversal.should_start_punching(now) {
1307 nat_traversal.start_punching_phase(now);
1308 }
1309 }
1310
1311 let (target_addr, challenge) = {
1313 let nat_traversal = self.nat_traversal.as_ref()?;
1314 match nat_traversal.get_coordination_phase() {
1315 Some(CoordinationPhase::Punching) => {
1316 let targets = nat_traversal.get_punch_targets_from_coordination()?;
1317 if targets.is_empty() {
1318 return None;
1319 }
1320 let target = &targets[0];
1322 (target.remote_addr, target.challenge)
1323 }
1324 _ => return None,
1325 }
1326 };
1327
1328 debug_assert_eq!(
1329 self.highest_space,
1330 SpaceId::Data,
1331 "PATH_CHALLENGE queued without 1-RTT keys"
1332 );
1333
1334 buf.reserve(self.pqc_state.min_initial_size() as usize);
1335 let buf_capacity = buf.capacity();
1336
1337 let mut builder = PacketBuilder::new(
1338 now,
1339 SpaceId::Data,
1340 self.rem_cids.active(),
1341 buf,
1342 buf_capacity,
1343 0,
1344 false,
1345 self,
1346 )?;
1347
1348 trace!(
1349 "sending coordinated PATH_CHALLENGE {:08x} to {}",
1350 challenge, target_addr
1351 );
1352 buf.write(frame::FrameType::PATH_CHALLENGE);
1353 buf.write(challenge);
1354 self.stats.frame_tx.path_challenge += 1;
1355
1356 builder.pad_to(self.pqc_state.min_initial_size());
1357 builder.finish_and_track(now, self, None, buf);
1358
1359 if let Some(nat_traversal) = &mut self.nat_traversal {
1361 nat_traversal.mark_coordination_validating();
1362 }
1363
1364 Some(Transmit {
1365 destination: target_addr,
1366 size: buf.len(),
1367 ecn: if self.path.sending_ecn {
1368 Some(EcnCodepoint::Ect0)
1369 } else {
1370 None
1371 },
1372 segment_size: None,
1373 src_ip: self.local_ip,
1374 })
1375 }
1376
1377 fn send_nat_traversal_challenge(
1379 &mut self,
1380 now: Instant,
1381 buf: &mut Vec<u8>,
1382 ) -> Option<Transmit> {
1383 if let Some(request) = self.send_coordination_request(now, buf) {
1385 return Some(request);
1386 }
1387
1388 if let Some(punch) = self.send_coordinated_path_challenge(now, buf) {
1390 return Some(punch);
1391 }
1392
1393 let (remote_addr, remote_sequence) = {
1395 let nat_traversal = self.nat_traversal.as_ref()?;
1396 let candidates = nat_traversal.get_validation_candidates();
1397 if candidates.is_empty() {
1398 return None;
1399 }
1400 let (sequence, candidate) = candidates[0];
1402 (candidate.address, sequence)
1403 };
1404
1405 let challenge = self.rng.r#gen::<u64>();
1406
1407 if let Err(e) =
1409 self.nat_traversal
1410 .as_mut()?
1411 .start_validation(remote_sequence, challenge, now)
1412 {
1413 warn!("Failed to start NAT traversal validation: {}", e);
1414 return None;
1415 }
1416
1417 debug_assert_eq!(
1418 self.highest_space,
1419 SpaceId::Data,
1420 "PATH_CHALLENGE queued without 1-RTT keys"
1421 );
1422
1423 buf.reserve(self.pqc_state.min_initial_size() as usize);
1424 let buf_capacity = buf.capacity();
1425
1426 let mut builder = PacketBuilder::new(
1428 now,
1429 SpaceId::Data,
1430 self.rem_cids.active(),
1431 buf,
1432 buf_capacity,
1433 0,
1434 false,
1435 self,
1436 )?;
1437
1438 trace!(
1439 "sending PATH_CHALLENGE {:08x} to NAT candidate {}",
1440 challenge, remote_addr
1441 );
1442 buf.write(frame::FrameType::PATH_CHALLENGE);
1443 buf.write(challenge);
1444 self.stats.frame_tx.path_challenge += 1;
1445
1446 builder.pad_to(self.pqc_state.min_initial_size());
1448
1449 builder.finish_and_track(now, self, None, buf);
1450
1451 Some(Transmit {
1452 destination: remote_addr,
1453 size: buf.len(),
1454 ecn: if self.path.sending_ecn {
1455 Some(EcnCodepoint::Ect0)
1456 } else {
1457 None
1458 },
1459 segment_size: None,
1460 src_ip: self.local_ip,
1461 })
1462 }
1463
1464 fn send_path_challenge(&mut self, now: Instant, buf: &mut Vec<u8>) -> Option<Transmit> {
1466 let (prev_cid, prev_path) = self.prev_path.as_mut()?;
1467 if !prev_path.challenge_pending {
1468 return None;
1469 }
1470 prev_path.challenge_pending = false;
1471 let token = prev_path
1472 .challenge
1473 .expect("previous path challenge pending without token");
1474 let destination = prev_path.remote;
1475 debug_assert_eq!(
1476 self.highest_space,
1477 SpaceId::Data,
1478 "PATH_CHALLENGE queued without 1-RTT keys"
1479 );
1480 buf.reserve(self.pqc_state.min_initial_size() as usize);
1481
1482 let buf_capacity = buf.capacity();
1483
1484 let mut builder = PacketBuilder::new(
1490 now,
1491 SpaceId::Data,
1492 *prev_cid,
1493 buf,
1494 buf_capacity,
1495 0,
1496 false,
1497 self,
1498 )?;
1499 trace!("validating previous path with PATH_CHALLENGE {:08x}", token);
1500 buf.write(frame::FrameType::PATH_CHALLENGE);
1501 buf.write(token);
1502 self.stats.frame_tx.path_challenge += 1;
1503
1504 builder.pad_to(self.pqc_state.min_initial_size());
1509
1510 builder.finish(self, buf);
1511 self.stats.udp_tx.on_sent(1, buf.len());
1512
1513 Some(Transmit {
1514 destination,
1515 size: buf.len(),
1516 ecn: None,
1517 segment_size: None,
1518 src_ip: self.local_ip,
1519 })
1520 }
1521
1522 fn space_can_send(&self, space_id: SpaceId, frame_space_1rtt: usize) -> SendableFrames {
1524 if self.spaces[space_id].crypto.is_none()
1525 && (space_id != SpaceId::Data
1526 || self.zero_rtt_crypto.is_none()
1527 || self.side.is_server())
1528 {
1529 return SendableFrames::empty();
1531 }
1532 let mut can_send = self.spaces[space_id].can_send(&self.streams);
1533 if space_id == SpaceId::Data {
1534 can_send.other |= self.can_send_1rtt(frame_space_1rtt);
1535 }
1536 can_send
1537 }
1538
1539 pub fn handle_event(&mut self, event: ConnectionEvent) {
1545 use ConnectionEventInner::*;
1546 match event.0 {
1547 Datagram(DatagramConnectionEvent {
1548 now,
1549 remote,
1550 ecn,
1551 first_decode,
1552 remaining,
1553 }) => {
1554 if remote != self.path.remote && !self.side.remote_may_migrate() {
1558 trace!("discarding packet from unrecognized peer {}", remote);
1559 return;
1560 }
1561
1562 let was_anti_amplification_blocked = self.path.anti_amplification_blocked(1);
1563
1564 self.stats.udp_rx.datagrams += 1;
1565 self.stats.udp_rx.bytes += first_decode.len() as u64;
1566 let data_len = first_decode.len();
1567
1568 self.handle_decode(now, remote, ecn, first_decode);
1569 self.path.total_recvd = self.path.total_recvd.saturating_add(data_len as u64);
1574
1575 if let Some(data) = remaining {
1576 self.stats.udp_rx.bytes += data.len() as u64;
1577 self.handle_coalesced(now, remote, ecn, data);
1578 }
1579
1580 #[cfg(feature = "__qlog")]
1581 self.emit_qlog_recovery_metrics(now);
1582
1583 if was_anti_amplification_blocked {
1584 self.set_loss_detection_timer(now);
1588 }
1589 }
1590 NewIdentifiers(ids, now) => {
1591 self.local_cid_state.new_cids(&ids, now);
1592 ids.into_iter().rev().for_each(|frame| {
1593 self.spaces[SpaceId::Data].pending.new_cids.push(frame);
1594 });
1595 if self.timers.get(Timer::PushNewCid).is_none_or(|x| x <= now) {
1597 self.reset_cid_retirement();
1598 }
1599 }
1600 }
1601 }
1602
1603 pub fn handle_timeout(&mut self, now: Instant) {
1613 for &timer in &Timer::VALUES {
1614 if !self.timers.is_expired(timer, now) {
1615 continue;
1616 }
1617 self.timers.stop(timer);
1618 trace!(timer = ?timer, "timeout");
1619 match timer {
1620 Timer::Close => {
1621 self.state = State::Drained;
1622 self.endpoint_events.push_back(EndpointEventInner::Drained);
1623 }
1624 Timer::Idle => {
1625 self.kill(ConnectionError::TimedOut);
1626 }
1627 Timer::KeepAlive => {
1628 trace!("sending keep-alive");
1629 self.ping();
1630 }
1631 Timer::LossDetection => {
1632 self.on_loss_detection_timeout(now);
1633
1634 #[cfg(feature = "__qlog")]
1635 self.emit_qlog_recovery_metrics(now);
1636 }
1637 Timer::KeyDiscard => {
1638 self.zero_rtt_crypto = None;
1639 self.prev_crypto = None;
1640 }
1641 Timer::PathValidation => {
1642 debug!("path validation failed");
1643 if let Some((_, prev)) = self.prev_path.take() {
1644 self.path = prev;
1645 }
1646 self.path.challenge = None;
1647 self.path.challenge_pending = false;
1648 }
1649 Timer::Pacing => trace!("pacing timer expired"),
1650 Timer::NatTraversal => {
1651 self.handle_nat_traversal_timeout(now);
1652 }
1653 Timer::PushNewCid => {
1654 let num_new_cid = self.local_cid_state.on_cid_timeout().into();
1656 if !self.state.is_closed() {
1657 trace!(
1658 "push a new cid to peer RETIRE_PRIOR_TO field {}",
1659 self.local_cid_state.retire_prior_to()
1660 );
1661 self.endpoint_events
1662 .push_back(EndpointEventInner::NeedIdentifiers(now, num_new_cid));
1663 }
1664 }
1665 Timer::MaxAckDelay => {
1666 trace!("max ack delay reached");
1667 self.spaces[SpaceId::Data]
1669 .pending_acks
1670 .on_max_ack_delay_timeout()
1671 }
1672 }
1673 }
1674 }
1675
1676 pub fn close(&mut self, now: Instant, error_code: VarInt, reason: Bytes) {
1688 self.close_inner(
1689 now,
1690 Close::Application(frame::ApplicationClose { error_code, reason }),
1691 )
1692 }
1693
1694 fn close_inner(&mut self, now: Instant, reason: Close) {
1695 let was_closed = self.state.is_closed();
1696 if !was_closed {
1697 self.close_common();
1698 self.set_close_timer(now);
1699 self.close = true;
1700 self.state = State::Closed(state::Closed { reason });
1701 }
1702 }
1703
1704 pub fn datagrams(&mut self) -> Datagrams<'_> {
1706 Datagrams { conn: self }
1707 }
1708
1709 pub fn stats(&self) -> ConnectionStats {
1711 let mut stats = self.stats;
1712 stats.path.rtt = self.path.rtt.get();
1713 stats.path.cwnd = self.path.congestion.window();
1714 stats.path.current_mtu = self.path.mtud.current_mtu();
1715
1716 stats
1717 }
1718
1719 pub fn ping(&mut self) {
1723 self.spaces[self.highest_space].ping_pending = true;
1724 }
1725
1726 pub fn force_key_update(&mut self) {
1730 if !self.state.is_established() {
1731 debug!("ignoring forced key update in illegal state");
1732 return;
1733 }
1734 if self.prev_crypto.is_some() {
1735 debug!("ignoring redundant forced key update");
1738 return;
1739 }
1740 self.update_keys(None, false);
1741 }
1742
1743 #[doc(hidden)]
1745 #[deprecated]
1746 pub fn initiate_key_update(&mut self) {
1747 self.force_key_update();
1748 }
1749
1750 pub fn crypto_session(&self) -> &dyn crypto::Session {
1752 &*self.crypto
1753 }
1754
1755 pub fn is_handshaking(&self) -> bool {
1760 self.state.is_handshake()
1761 }
1762
1763 pub fn is_closed(&self) -> bool {
1771 self.state.is_closed()
1772 }
1773
1774 pub fn is_drained(&self) -> bool {
1779 self.state.is_drained()
1780 }
1781
1782 pub fn accepted_0rtt(&self) -> bool {
1786 self.accepted_0rtt
1787 }
1788
1789 pub fn has_0rtt(&self) -> bool {
1791 self.zero_rtt_enabled
1792 }
1793
1794 pub fn has_pending_retransmits(&self) -> bool {
1796 !self.spaces[SpaceId::Data].pending.is_empty(&self.streams)
1797 }
1798
1799 pub fn side(&self) -> Side {
1801 self.side.side()
1802 }
1803
1804 pub fn remote_address(&self) -> SocketAddr {
1806 self.path.remote
1807 }
1808
1809 pub fn local_ip(&self) -> Option<IpAddr> {
1819 self.local_ip
1820 }
1821
1822 pub fn rtt(&self) -> Duration {
1824 self.path.rtt.get()
1825 }
1826
1827 pub fn congestion_state(&self) -> &dyn Controller {
1829 self.path.congestion.as_ref()
1830 }
1831
1832 pub fn path_changed(&mut self, now: Instant) {
1843 self.path.reset(now, &self.config);
1844 }
1845
1846 pub fn set_max_concurrent_streams(&mut self, dir: Dir, count: VarInt) {
1851 self.streams.set_max_concurrent(dir, count);
1852 let pending = &mut self.spaces[SpaceId::Data].pending;
1855 self.streams.queue_max_stream_id(pending);
1856 }
1857
1858 pub fn max_concurrent_streams(&self, dir: Dir) -> u64 {
1864 self.streams.max_concurrent(dir)
1865 }
1866
1867 pub fn set_receive_window(&mut self, receive_window: VarInt) {
1869 if self.streams.set_receive_window(receive_window) {
1870 self.spaces[SpaceId::Data].pending.max_data = true;
1871 }
1872 }
1873
1874 pub fn set_address_discovery_enabled(&mut self, enabled: bool) {
1876 if let Some(ref mut state) = self.address_discovery_state {
1877 state.enabled = enabled;
1878 }
1879 }
1880
1881 pub fn address_discovery_enabled(&self) -> bool {
1883 self.address_discovery_state
1884 .as_ref()
1885 .is_some_and(|state| state.enabled)
1886 }
1887
1888 pub fn observed_address(&self) -> Option<SocketAddr> {
1893 self.address_discovery_state
1894 .as_ref()
1895 .and_then(|state| state.get_observed_address(0)) }
1897
1898 pub(crate) fn address_discovery_state(&self) -> Option<&AddressDiscoveryState> {
1900 self.address_discovery_state.as_ref()
1901 }
1902
1903 fn on_ack_received(
1904 &mut self,
1905 now: Instant,
1906 space: SpaceId,
1907 ack: frame::Ack,
1908 ) -> Result<(), TransportError> {
1909 if ack.largest >= self.spaces[space].next_packet_number {
1910 return Err(TransportError::PROTOCOL_VIOLATION("unsent packet acked"));
1911 }
1912 let new_largest = {
1913 let space = &mut self.spaces[space];
1914 if space.largest_acked_packet.is_none_or(|pn| ack.largest > pn) {
1915 space.largest_acked_packet = Some(ack.largest);
1916 if let Some(info) = space.sent_packets.get(&ack.largest) {
1917 space.largest_acked_packet_sent = info.time_sent;
1921 }
1922 true
1923 } else {
1924 false
1925 }
1926 };
1927
1928 let mut newly_acked = ArrayRangeSet::new();
1930 for range in ack.iter() {
1931 self.packet_number_filter.check_ack(space, range.clone())?;
1932 for (&pn, _) in self.spaces[space].sent_packets.range(range) {
1933 newly_acked.insert_one(pn);
1934 }
1935 }
1936
1937 if newly_acked.is_empty() {
1938 return Ok(());
1939 }
1940
1941 let mut ack_eliciting_acked = false;
1942 for packet in newly_acked.elts() {
1943 if let Some(info) = self.spaces[space].take(packet) {
1944 if let Some(acked) = info.largest_acked {
1945 self.spaces[space].pending_acks.subtract_below(acked);
1951 }
1952 ack_eliciting_acked |= info.ack_eliciting;
1953
1954 let mtu_updated = self.path.mtud.on_acked(space, packet, info.size);
1956 if mtu_updated {
1957 self.path
1958 .congestion
1959 .on_mtu_update(self.path.mtud.current_mtu());
1960 }
1961
1962 self.ack_frequency.on_acked(packet);
1964
1965 self.on_packet_acked(now, packet, info);
1966 }
1967 }
1968
1969 self.path.congestion.on_end_acks(
1970 now,
1971 self.path.in_flight.bytes,
1972 self.app_limited,
1973 self.spaces[space].largest_acked_packet,
1974 );
1975
1976 if new_largest && ack_eliciting_acked {
1977 let ack_delay = if space != SpaceId::Data {
1978 Duration::from_micros(0)
1979 } else {
1980 cmp::min(
1981 self.ack_frequency.peer_max_ack_delay,
1982 Duration::from_micros(ack.delay << self.peer_params.ack_delay_exponent.0),
1983 )
1984 };
1985 let rtt = instant_saturating_sub(now, self.spaces[space].largest_acked_packet_sent);
1986 self.path.rtt.update(ack_delay, rtt);
1987 if self.path.first_packet_after_rtt_sample.is_none() {
1988 self.path.first_packet_after_rtt_sample =
1989 Some((space, self.spaces[space].next_packet_number));
1990 }
1991 }
1992
1993 self.detect_lost_packets(now, space, true);
1995
1996 if self.peer_completed_address_validation() {
1997 self.pto_count = 0;
1998 }
1999
2000 if self.path.sending_ecn {
2002 if let Some(ecn) = ack.ecn {
2003 if new_largest {
2008 let sent = self.spaces[space].largest_acked_packet_sent;
2009 self.process_ecn(now, space, newly_acked.len() as u64, ecn, sent);
2010 }
2011 } else {
2012 debug!("ECN not acknowledged by peer");
2014 self.path.sending_ecn = false;
2015 }
2016 }
2017
2018 self.set_loss_detection_timer(now);
2019 Ok(())
2020 }
2021
2022 fn process_ecn(
2024 &mut self,
2025 now: Instant,
2026 space: SpaceId,
2027 newly_acked: u64,
2028 ecn: frame::EcnCounts,
2029 largest_sent_time: Instant,
2030 ) {
2031 match self.spaces[space].detect_ecn(newly_acked, ecn) {
2032 Err(e) => {
2033 debug!("halting ECN due to verification failure: {}", e);
2034 self.path.sending_ecn = false;
2035 self.spaces[space].ecn_feedback = frame::EcnCounts::ZERO;
2038 }
2039 Ok(false) => {}
2040 Ok(true) => {
2041 self.stats.path.congestion_events += 1;
2042 self.path
2043 .congestion
2044 .on_congestion_event(now, largest_sent_time, false, 0);
2045 }
2046 }
2047 }
2048
2049 fn on_packet_acked(&mut self, now: Instant, pn: u64, info: SentPacket) {
2052 self.remove_in_flight(pn, &info);
2053 if info.ack_eliciting && self.path.challenge.is_none() {
2054 self.path.congestion.on_ack(
2057 now,
2058 info.time_sent,
2059 info.size.into(),
2060 self.app_limited,
2061 &self.path.rtt,
2062 );
2063 }
2064
2065 if let Some(retransmits) = info.retransmits.get() {
2067 for (id, _) in retransmits.reset_stream.iter() {
2068 self.streams.reset_acked(*id);
2069 }
2070 }
2071
2072 for frame in info.stream_frames {
2073 self.streams.received_ack_of(frame);
2074 }
2075 }
2076
2077 fn set_key_discard_timer(&mut self, now: Instant, space: SpaceId) {
2078 let start = if self.zero_rtt_crypto.is_some() {
2079 now
2080 } else {
2081 self.prev_crypto
2082 .as_ref()
2083 .expect("no previous keys")
2084 .end_packet
2085 .as_ref()
2086 .expect("update not acknowledged yet")
2087 .1
2088 };
2089 self.timers
2090 .set(Timer::KeyDiscard, start + self.pto(space) * 3);
2091 }
2092
2093 fn on_loss_detection_timeout(&mut self, now: Instant) {
2094 if let Some((_, pn_space)) = self.loss_time_and_space() {
2095 self.detect_lost_packets(now, pn_space, false);
2097 self.set_loss_detection_timer(now);
2098 return;
2099 }
2100
2101 let (_, space) = match self.pto_time_and_space(now) {
2102 Some(x) => x,
2103 None => {
2104 error!("PTO expired while unset");
2105 return;
2106 }
2107 };
2108 trace!(
2109 in_flight = self.path.in_flight.bytes,
2110 count = self.pto_count,
2111 ?space,
2112 "PTO fired"
2113 );
2114
2115 let count = match self.path.in_flight.ack_eliciting {
2116 0 => {
2119 debug_assert!(!self.peer_completed_address_validation());
2120 1
2121 }
2122 _ => 2,
2124 };
2125 self.spaces[space].loss_probes = self.spaces[space].loss_probes.saturating_add(count);
2126 self.pto_count = self.pto_count.saturating_add(1);
2127 self.set_loss_detection_timer(now);
2128 }
2129
2130 fn detect_lost_packets(&mut self, now: Instant, pn_space: SpaceId, due_to_ack: bool) {
2131 let mut lost_packets = Vec::<u64>::new();
2132 let mut lost_mtu_probe = None;
2133 let in_flight_mtu_probe = self.path.mtud.in_flight_mtu_probe();
2134 let rtt = self.path.rtt.conservative();
2135 let loss_delay = cmp::max(rtt.mul_f32(self.config.time_threshold), TIMER_GRANULARITY);
2136
2137 let lost_send_time = now.checked_sub(loss_delay).unwrap();
2139 let largest_acked_packet = self.spaces[pn_space].largest_acked_packet.unwrap();
2140 let packet_threshold = self.config.packet_threshold as u64;
2141 let mut size_of_lost_packets = 0u64;
2142
2143 let congestion_period =
2147 self.pto(SpaceId::Data) * self.config.persistent_congestion_threshold;
2148 let mut persistent_congestion_start: Option<Instant> = None;
2149 let mut prev_packet = None;
2150 let mut in_persistent_congestion = false;
2151
2152 let space = &mut self.spaces[pn_space];
2153 space.loss_time = None;
2154
2155 for (&packet, info) in space.sent_packets.range(0..largest_acked_packet) {
2156 if prev_packet != Some(packet.wrapping_sub(1)) {
2157 persistent_congestion_start = None;
2159 }
2160
2161 if info.time_sent <= lost_send_time || largest_acked_packet >= packet + packet_threshold
2162 {
2163 if Some(packet) == in_flight_mtu_probe {
2164 lost_mtu_probe = in_flight_mtu_probe;
2167 } else {
2168 lost_packets.push(packet);
2169 size_of_lost_packets += info.size as u64;
2170 if info.ack_eliciting && due_to_ack {
2171 match persistent_congestion_start {
2172 Some(start) if info.time_sent - start > congestion_period => {
2175 in_persistent_congestion = true;
2176 }
2177 None if self
2179 .path
2180 .first_packet_after_rtt_sample
2181 .is_some_and(|x| x < (pn_space, packet)) =>
2182 {
2183 persistent_congestion_start = Some(info.time_sent);
2184 }
2185 _ => {}
2186 }
2187 }
2188 }
2189 } else {
2190 let next_loss_time = info.time_sent + loss_delay;
2191 space.loss_time = Some(
2192 space
2193 .loss_time
2194 .map_or(next_loss_time, |x| cmp::min(x, next_loss_time)),
2195 );
2196 persistent_congestion_start = None;
2197 }
2198
2199 prev_packet = Some(packet);
2200 }
2201
2202 if let Some(largest_lost) = lost_packets.last().cloned() {
2204 let old_bytes_in_flight = self.path.in_flight.bytes;
2205 let largest_lost_sent = self.spaces[pn_space].sent_packets[&largest_lost].time_sent;
2206 self.lost_packets += lost_packets.len() as u64;
2207 self.stats.path.lost_packets += lost_packets.len() as u64;
2208 self.stats.path.lost_bytes += size_of_lost_packets;
2209 trace!(
2210 "packets lost: {:?}, bytes lost: {}",
2211 lost_packets, size_of_lost_packets
2212 );
2213
2214 for &packet in &lost_packets {
2215 let info = self.spaces[pn_space].take(packet).unwrap(); self.remove_in_flight(packet, &info);
2217 for frame in info.stream_frames {
2218 self.streams.retransmit(frame);
2219 }
2220 self.spaces[pn_space].pending |= info.retransmits;
2221 self.path.mtud.on_non_probe_lost(packet, info.size);
2222 }
2223
2224 if self.path.mtud.black_hole_detected(now) {
2225 self.stats.path.black_holes_detected += 1;
2226 self.path
2227 .congestion
2228 .on_mtu_update(self.path.mtud.current_mtu());
2229 if let Some(max_datagram_size) = self.datagrams().max_size() {
2230 self.datagrams.drop_oversized(max_datagram_size);
2231 }
2232 }
2233
2234 let lost_ack_eliciting = old_bytes_in_flight != self.path.in_flight.bytes;
2236
2237 if lost_ack_eliciting {
2238 self.stats.path.congestion_events += 1;
2239 self.path.congestion.on_congestion_event(
2240 now,
2241 largest_lost_sent,
2242 in_persistent_congestion,
2243 size_of_lost_packets,
2244 );
2245 }
2246 }
2247
2248 if let Some(packet) = lost_mtu_probe {
2250 let info = self.spaces[SpaceId::Data].take(packet).unwrap(); self.remove_in_flight(packet, &info);
2252 self.path.mtud.on_probe_lost();
2253 self.stats.path.lost_plpmtud_probes += 1;
2254 }
2255 }
2256
2257 fn loss_time_and_space(&self) -> Option<(Instant, SpaceId)> {
2258 SpaceId::iter()
2259 .filter_map(|id| Some((self.spaces[id].loss_time?, id)))
2260 .min_by_key(|&(time, _)| time)
2261 }
2262
2263 fn pto_time_and_space(&self, now: Instant) -> Option<(Instant, SpaceId)> {
2264 let backoff = 2u32.pow(self.pto_count.min(MAX_BACKOFF_EXPONENT));
2265 let mut duration = self.path.rtt.pto_base() * backoff;
2266
2267 if self.path.in_flight.ack_eliciting == 0 {
2268 debug_assert!(!self.peer_completed_address_validation());
2269 let space = match self.highest_space {
2270 SpaceId::Handshake => SpaceId::Handshake,
2271 _ => SpaceId::Initial,
2272 };
2273 return Some((now + duration, space));
2274 }
2275
2276 let mut result = None;
2277 for space in SpaceId::iter() {
2278 if self.spaces[space].in_flight == 0 {
2279 continue;
2280 }
2281 if space == SpaceId::Data {
2282 if self.is_handshaking() {
2284 return result;
2285 }
2286 duration += self.ack_frequency.max_ack_delay_for_pto() * backoff;
2288 }
2289 let last_ack_eliciting = match self.spaces[space].time_of_last_ack_eliciting_packet {
2290 Some(time) => time,
2291 None => continue,
2292 };
2293 let pto = last_ack_eliciting + duration;
2294 if result.is_none_or(|(earliest_pto, _)| pto < earliest_pto) {
2295 result = Some((pto, space));
2296 }
2297 }
2298 result
2299 }
2300
2301 fn peer_completed_address_validation(&self) -> bool {
2302 if self.side.is_server() || self.state.is_closed() {
2303 return true;
2304 }
2305 self.spaces[SpaceId::Handshake]
2308 .largest_acked_packet
2309 .is_some()
2310 || self.spaces[SpaceId::Data].largest_acked_packet.is_some()
2311 || (self.spaces[SpaceId::Data].crypto.is_some()
2312 && self.spaces[SpaceId::Handshake].crypto.is_none())
2313 }
2314
2315 fn set_loss_detection_timer(&mut self, now: Instant) {
2316 if self.state.is_closed() {
2317 return;
2321 }
2322
2323 if let Some((loss_time, _)) = self.loss_time_and_space() {
2324 self.timers.set(Timer::LossDetection, loss_time);
2326 return;
2327 }
2328
2329 if self.path.anti_amplification_blocked(1) {
2330 self.timers.stop(Timer::LossDetection);
2332 return;
2333 }
2334
2335 if self.path.in_flight.ack_eliciting == 0 && self.peer_completed_address_validation() {
2336 self.timers.stop(Timer::LossDetection);
2339 return;
2340 }
2341
2342 if let Some((timeout, _)) = self.pto_time_and_space(now) {
2345 self.timers.set(Timer::LossDetection, timeout);
2346 } else {
2347 self.timers.stop(Timer::LossDetection);
2348 }
2349 }
2350
2351 fn pto(&self, space: SpaceId) -> Duration {
2353 let max_ack_delay = match space {
2354 SpaceId::Initial | SpaceId::Handshake => Duration::ZERO,
2355 SpaceId::Data => self.ack_frequency.max_ack_delay_for_pto(),
2356 };
2357 self.path.rtt.pto_base() + max_ack_delay
2358 }
2359
2360 fn on_packet_authenticated(
2361 &mut self,
2362 now: Instant,
2363 space_id: SpaceId,
2364 ecn: Option<EcnCodepoint>,
2365 packet: Option<u64>,
2366 spin: bool,
2367 is_1rtt: bool,
2368 ) {
2369 self.total_authed_packets += 1;
2370 self.reset_keep_alive(now);
2371 self.reset_idle_timeout(now, space_id);
2372 self.permit_idle_reset = true;
2373 self.receiving_ecn |= ecn.is_some();
2374 if let Some(x) = ecn {
2375 let space = &mut self.spaces[space_id];
2376 space.ecn_counters += x;
2377
2378 if x.is_ce() {
2379 space.pending_acks.set_immediate_ack_required();
2380 }
2381 }
2382
2383 let packet = match packet {
2384 Some(x) => x,
2385 None => return,
2386 };
2387 if self.side.is_server() {
2388 if self.spaces[SpaceId::Initial].crypto.is_some() && space_id == SpaceId::Handshake {
2389 self.discard_space(now, SpaceId::Initial);
2391 }
2392 if self.zero_rtt_crypto.is_some() && is_1rtt {
2393 self.set_key_discard_timer(now, space_id)
2395 }
2396 }
2397 let space = &mut self.spaces[space_id];
2398 space.pending_acks.insert_one(packet, now);
2399 if packet >= space.rx_packet {
2400 space.rx_packet = packet;
2401 self.spin = self.side.is_client() ^ spin;
2403 }
2404 }
2405
2406 fn reset_idle_timeout(&mut self, now: Instant, space: SpaceId) {
2407 let timeout = match self.idle_timeout {
2408 None => return,
2409 Some(dur) => dur,
2410 };
2411 if self.state.is_closed() {
2412 self.timers.stop(Timer::Idle);
2413 return;
2414 }
2415 let dt = cmp::max(timeout, 3 * self.pto(space));
2416 self.timers.set(Timer::Idle, now + dt);
2417 }
2418
2419 fn reset_keep_alive(&mut self, now: Instant) {
2420 let interval = match self.config.keep_alive_interval {
2421 Some(x) if self.state.is_established() => x,
2422 _ => return,
2423 };
2424 self.timers.set(Timer::KeepAlive, now + interval);
2425 }
2426
2427 fn reset_cid_retirement(&mut self) {
2428 if let Some(t) = self.local_cid_state.next_timeout() {
2429 self.timers.set(Timer::PushNewCid, t);
2430 }
2431 }
2432
2433 pub(crate) fn handle_first_packet(
2438 &mut self,
2439 now: Instant,
2440 remote: SocketAddr,
2441 ecn: Option<EcnCodepoint>,
2442 packet_number: u64,
2443 packet: InitialPacket,
2444 remaining: Option<BytesMut>,
2445 ) -> Result<(), ConnectionError> {
2446 let span = trace_span!("first recv");
2447 let _guard = span.enter();
2448 debug_assert!(self.side.is_server());
2449 let len = packet.header_data.len() + packet.payload.len();
2450 self.path.total_recvd = len as u64;
2451
2452 match self.state {
2453 State::Handshake(ref mut state) => {
2454 state.expected_token = packet.header.token.clone();
2455 }
2456 _ => unreachable!("first packet must be delivered in Handshake state"),
2457 }
2458
2459 self.on_packet_authenticated(
2460 now,
2461 SpaceId::Initial,
2462 ecn,
2463 Some(packet_number),
2464 false,
2465 false,
2466 );
2467
2468 self.process_decrypted_packet(now, remote, Some(packet_number), packet.into())?;
2469 if let Some(data) = remaining {
2470 self.handle_coalesced(now, remote, ecn, data);
2471 }
2472
2473 #[cfg(feature = "__qlog")]
2474 self.emit_qlog_recovery_metrics(now);
2475
2476 Ok(())
2477 }
2478
2479 fn init_0rtt(&mut self) {
2480 let (header, packet) = match self.crypto.early_crypto() {
2481 Some(x) => x,
2482 None => return,
2483 };
2484 if self.side.is_client() {
2485 match self.crypto.transport_parameters() {
2486 Ok(params) => {
2487 let params = params
2488 .expect("crypto layer didn't supply transport parameters with ticket");
2489 let params = TransportParameters {
2491 initial_src_cid: None,
2492 original_dst_cid: None,
2493 preferred_address: None,
2494 retry_src_cid: None,
2495 stateless_reset_token: None,
2496 min_ack_delay: None,
2497 ack_delay_exponent: TransportParameters::default().ack_delay_exponent,
2498 max_ack_delay: TransportParameters::default().max_ack_delay,
2499 ..params
2500 };
2501 self.set_peer_params(params);
2502 }
2503 Err(e) => {
2504 error!("session ticket has malformed transport parameters: {}", e);
2505 return;
2506 }
2507 }
2508 }
2509 trace!("0-RTT enabled");
2510 self.zero_rtt_enabled = true;
2511 self.zero_rtt_crypto = Some(ZeroRttCrypto { header, packet });
2512 }
2513
2514 fn read_crypto(
2515 &mut self,
2516 space: SpaceId,
2517 crypto: &frame::Crypto,
2518 payload_len: usize,
2519 ) -> Result<(), TransportError> {
2520 let expected = if !self.state.is_handshake() {
2521 SpaceId::Data
2522 } else if self.highest_space == SpaceId::Initial {
2523 SpaceId::Initial
2524 } else {
2525 SpaceId::Handshake
2528 };
2529 debug_assert!(space <= expected, "received out-of-order CRYPTO data");
2533
2534 let end = crypto.offset + crypto.data.len() as u64;
2535 if space < expected && end > self.spaces[space].crypto_stream.bytes_read() {
2536 warn!(
2537 "received new {:?} CRYPTO data when expecting {:?}",
2538 space, expected
2539 );
2540 return Err(TransportError::PROTOCOL_VIOLATION(
2541 "new data at unexpected encryption level",
2542 ));
2543 }
2544
2545 self.pqc_state.detect_pqc_from_crypto(&crypto.data, space);
2547
2548 if self.pqc_state.should_trigger_mtu_discovery() {
2550 self.path
2552 .mtud
2553 .reset(self.pqc_state.min_initial_size(), self.config.min_mtu);
2554 trace!("Triggered MTU discovery for PQC handshake");
2555 }
2556
2557 let space = &mut self.spaces[space];
2558 let max = end.saturating_sub(space.crypto_stream.bytes_read());
2559 if max > self.config.crypto_buffer_size as u64 {
2560 return Err(TransportError::CRYPTO_BUFFER_EXCEEDED(""));
2561 }
2562
2563 space
2564 .crypto_stream
2565 .insert(crypto.offset, crypto.data.clone(), payload_len);
2566 while let Some(chunk) = space.crypto_stream.read(usize::MAX, true) {
2567 trace!("consumed {} CRYPTO bytes", chunk.bytes.len());
2568 if self.crypto.read_handshake(&chunk.bytes)? {
2569 self.events.push_back(Event::HandshakeDataReady);
2570 }
2571 }
2572
2573 Ok(())
2574 }
2575
2576 fn write_crypto(&mut self) {
2577 loop {
2578 let space = self.highest_space;
2579 let mut outgoing = Vec::new();
2580 if let Some(crypto) = self.crypto.write_handshake(&mut outgoing) {
2581 match space {
2582 SpaceId::Initial => {
2583 self.upgrade_crypto(SpaceId::Handshake, crypto);
2584 }
2585 SpaceId::Handshake => {
2586 self.upgrade_crypto(SpaceId::Data, crypto);
2587 }
2588 _ => unreachable!("got updated secrets during 1-RTT"),
2589 }
2590 }
2591 if outgoing.is_empty() {
2592 if space == self.highest_space {
2593 break;
2594 } else {
2595 continue;
2597 }
2598 }
2599 let offset = self.spaces[space].crypto_offset;
2600 let outgoing = Bytes::from(outgoing);
2601 if let State::Handshake(ref mut state) = self.state {
2602 if space == SpaceId::Initial && offset == 0 && self.side.is_client() {
2603 state.client_hello = Some(outgoing.clone());
2604 }
2605 }
2606 self.spaces[space].crypto_offset += outgoing.len() as u64;
2607 trace!("wrote {} {:?} CRYPTO bytes", outgoing.len(), space);
2608
2609 if self.pqc_state.using_pqc && outgoing.len() > 1200 {
2611 let frames = self.pqc_state.packet_handler.fragment_crypto_data(
2613 &outgoing,
2614 offset,
2615 self.pqc_state.min_initial_size() as usize,
2616 );
2617 for frame in frames {
2618 self.spaces[space].pending.crypto.push_back(frame);
2619 }
2620 } else {
2621 self.spaces[space].pending.crypto.push_back(frame::Crypto {
2623 offset,
2624 data: outgoing,
2625 });
2626 }
2627 }
2628 }
2629
2630 fn upgrade_crypto(&mut self, space: SpaceId, crypto: Keys) {
2632 debug_assert!(
2633 self.spaces[space].crypto.is_none(),
2634 "already reached packet space {space:?}"
2635 );
2636 trace!("{:?} keys ready", space);
2637 if space == SpaceId::Data {
2638 self.next_crypto = Some(
2640 self.crypto
2641 .next_1rtt_keys()
2642 .expect("handshake should be complete"),
2643 );
2644 }
2645
2646 self.spaces[space].crypto = Some(crypto);
2647 debug_assert!(space as usize > self.highest_space as usize);
2648 self.highest_space = space;
2649 if space == SpaceId::Data && self.side.is_client() {
2650 self.zero_rtt_crypto = None;
2652 }
2653 }
2654
2655 fn discard_space(&mut self, now: Instant, space_id: SpaceId) {
2656 debug_assert!(space_id != SpaceId::Data);
2657 trace!("discarding {:?} keys", space_id);
2658 if space_id == SpaceId::Initial {
2659 if let ConnectionSide::Client { token, .. } = &mut self.side {
2661 *token = Bytes::new();
2662 }
2663 }
2664 let space = &mut self.spaces[space_id];
2665 space.crypto = None;
2666 space.time_of_last_ack_eliciting_packet = None;
2667 space.loss_time = None;
2668 space.in_flight = 0;
2669 let sent_packets = mem::take(&mut space.sent_packets);
2670 for (pn, packet) in sent_packets.into_iter() {
2671 self.remove_in_flight(pn, &packet);
2672 }
2673 self.set_loss_detection_timer(now)
2674 }
2675
2676 fn handle_coalesced(
2677 &mut self,
2678 now: Instant,
2679 remote: SocketAddr,
2680 ecn: Option<EcnCodepoint>,
2681 data: BytesMut,
2682 ) {
2683 self.path.total_recvd = self.path.total_recvd.saturating_add(data.len() as u64);
2684 let mut remaining = Some(data);
2685 while let Some(data) = remaining {
2686 match PartialDecode::new(
2687 data,
2688 &FixedLengthConnectionIdParser::new(self.local_cid_state.cid_len()),
2689 &[self.version],
2690 self.endpoint_config.grease_quic_bit,
2691 ) {
2692 Ok((partial_decode, rest)) => {
2693 remaining = rest;
2694 self.handle_decode(now, remote, ecn, partial_decode);
2695 }
2696 Err(e) => {
2697 trace!("malformed header: {}", e);
2698 return;
2699 }
2700 }
2701 }
2702 }
2703
2704 fn handle_decode(
2705 &mut self,
2706 now: Instant,
2707 remote: SocketAddr,
2708 ecn: Option<EcnCodepoint>,
2709 partial_decode: PartialDecode,
2710 ) {
2711 if let Some(decoded) = packet_crypto::unprotect_header(
2712 partial_decode,
2713 &self.spaces,
2714 self.zero_rtt_crypto.as_ref(),
2715 self.peer_params.stateless_reset_token,
2716 ) {
2717 self.handle_packet(now, remote, ecn, decoded.packet, decoded.stateless_reset);
2718 }
2719 }
2720
2721 fn handle_packet(
2722 &mut self,
2723 now: Instant,
2724 remote: SocketAddr,
2725 ecn: Option<EcnCodepoint>,
2726 packet: Option<Packet>,
2727 stateless_reset: bool,
2728 ) {
2729 self.stats.udp_rx.ios += 1;
2730 if let Some(ref packet) = packet {
2731 trace!(
2732 "got {:?} packet ({} bytes) from {} using id {}",
2733 packet.header.space(),
2734 packet.payload.len() + packet.header_data.len(),
2735 remote,
2736 packet.header.dst_cid(),
2737 );
2738
2739 #[cfg(feature = "trace")]
2741 {
2742 use crate::trace_packet_received;
2743 let packet_size = packet.payload.len() + packet.header_data.len();
2745 trace_packet_received!(
2746 &self.event_log,
2747 self.trace_context.trace_id(),
2748 packet_size as u32,
2749 0 );
2751 }
2752 }
2753
2754 if self.is_handshaking() && remote != self.path.remote {
2755 debug!("discarding packet with unexpected remote during handshake");
2756 return;
2757 }
2758
2759 let was_closed = self.state.is_closed();
2760 let was_drained = self.state.is_drained();
2761
2762 let decrypted = match packet {
2763 None => Err(None),
2764 Some(mut packet) => self
2765 .decrypt_packet(now, &mut packet)
2766 .map(move |number| (packet, number)),
2767 };
2768 let result = match decrypted {
2769 _ if stateless_reset => {
2770 debug!("got stateless reset");
2771 Err(ConnectionError::Reset)
2772 }
2773 Err(Some(e)) => {
2774 warn!("illegal packet: {}", e);
2775 Err(e.into())
2776 }
2777 Err(None) => {
2778 debug!("failed to authenticate packet");
2779 self.authentication_failures += 1;
2780 let integrity_limit = self.spaces[self.highest_space]
2781 .crypto
2782 .as_ref()
2783 .unwrap()
2784 .packet
2785 .local
2786 .integrity_limit();
2787 if self.authentication_failures > integrity_limit {
2788 Err(TransportError::AEAD_LIMIT_REACHED("integrity limit violated").into())
2789 } else {
2790 return;
2791 }
2792 }
2793 Ok((packet, number)) => {
2794 let span = match number {
2795 Some(pn) => trace_span!("recv", space = ?packet.header.space(), pn),
2796 None => trace_span!("recv", space = ?packet.header.space()),
2797 };
2798 let _guard = span.enter();
2799
2800 let is_duplicate = |n| self.spaces[packet.header.space()].dedup.insert(n);
2801 if number.is_some_and(is_duplicate) {
2802 debug!("discarding possible duplicate packet");
2803 return;
2804 } else if self.state.is_handshake() && packet.header.is_short() {
2805 trace!("dropping short packet during handshake");
2807 return;
2808 } else {
2809 if let Header::Initial(InitialHeader { ref token, .. }) = packet.header {
2810 if let State::Handshake(ref hs) = self.state {
2811 if self.side.is_server() && token != &hs.expected_token {
2812 warn!("discarding Initial with invalid retry token");
2816 return;
2817 }
2818 }
2819 }
2820
2821 if !self.state.is_closed() {
2822 let spin = match packet.header {
2823 Header::Short { spin, .. } => spin,
2824 _ => false,
2825 };
2826 self.on_packet_authenticated(
2827 now,
2828 packet.header.space(),
2829 ecn,
2830 number,
2831 spin,
2832 packet.header.is_1rtt(),
2833 );
2834 }
2835
2836 self.process_decrypted_packet(now, remote, number, packet)
2837 }
2838 }
2839 };
2840
2841 if let Err(conn_err) = result {
2843 self.error = Some(conn_err.clone());
2844 self.state = match conn_err {
2845 ConnectionError::ApplicationClosed(reason) => State::closed(reason),
2846 ConnectionError::ConnectionClosed(reason) => State::closed(reason),
2847 ConnectionError::Reset
2848 | ConnectionError::TransportError(TransportError {
2849 code: TransportErrorCode::AEAD_LIMIT_REACHED,
2850 ..
2851 }) => State::Drained,
2852 ConnectionError::TimedOut => {
2853 unreachable!("timeouts aren't generated by packet processing");
2854 }
2855 ConnectionError::TransportError(err) => {
2856 debug!("closing connection due to transport error: {}", err);
2857 State::closed(err)
2858 }
2859 ConnectionError::VersionMismatch => State::Draining,
2860 ConnectionError::LocallyClosed => {
2861 unreachable!("LocallyClosed isn't generated by packet processing");
2862 }
2863 ConnectionError::CidsExhausted => {
2864 unreachable!("CidsExhausted isn't generated by packet processing");
2865 }
2866 };
2867 }
2868
2869 if !was_closed && self.state.is_closed() {
2870 self.close_common();
2871 if !self.state.is_drained() {
2872 self.set_close_timer(now);
2873 }
2874 }
2875 if !was_drained && self.state.is_drained() {
2876 self.endpoint_events.push_back(EndpointEventInner::Drained);
2877 self.timers.stop(Timer::Close);
2880 }
2881
2882 if let State::Closed(_) = self.state {
2884 self.close = remote == self.path.remote;
2885 }
2886 }
2887
2888 fn process_decrypted_packet(
2889 &mut self,
2890 now: Instant,
2891 remote: SocketAddr,
2892 number: Option<u64>,
2893 packet: Packet,
2894 ) -> Result<(), ConnectionError> {
2895 let state = match self.state {
2896 State::Established => {
2897 match packet.header.space() {
2898 SpaceId::Data => self.process_payload(now, remote, number.unwrap(), packet)?,
2899 _ if packet.header.has_frames() => self.process_early_payload(now, packet)?,
2900 _ => {
2901 trace!("discarding unexpected pre-handshake packet");
2902 }
2903 }
2904 return Ok(());
2905 }
2906 State::Closed(_) => {
2907 for result in frame::Iter::new(packet.payload.freeze())? {
2908 let frame = match result {
2909 Ok(frame) => frame,
2910 Err(err) => {
2911 debug!("frame decoding error: {err:?}");
2912 continue;
2913 }
2914 };
2915
2916 if let Frame::Padding = frame {
2917 continue;
2918 };
2919
2920 self.stats.frame_rx.record(&frame);
2921
2922 if let Frame::Close(_) = frame {
2923 trace!("draining");
2924 self.state = State::Draining;
2925 break;
2926 }
2927 }
2928 return Ok(());
2929 }
2930 State::Draining | State::Drained => return Ok(()),
2931 State::Handshake(ref mut state) => state,
2932 };
2933
2934 match packet.header {
2935 Header::Retry {
2936 src_cid: rem_cid, ..
2937 } => {
2938 if self.side.is_server() {
2939 return Err(TransportError::PROTOCOL_VIOLATION("client sent Retry").into());
2940 }
2941
2942 if self.total_authed_packets > 1
2943 || packet.payload.len() <= 16 || !self.crypto.is_valid_retry(
2945 &self.rem_cids.active(),
2946 &packet.header_data,
2947 &packet.payload,
2948 )
2949 {
2950 trace!("discarding invalid Retry");
2951 return Ok(());
2959 }
2960
2961 trace!("retrying with CID {}", rem_cid);
2962 let client_hello = state.client_hello.take().unwrap();
2963 self.retry_src_cid = Some(rem_cid);
2964 self.rem_cids.update_initial_cid(rem_cid);
2965 self.rem_handshake_cid = rem_cid;
2966
2967 let space = &mut self.spaces[SpaceId::Initial];
2968 if let Some(info) = space.take(0) {
2969 self.on_packet_acked(now, 0, info);
2970 };
2971
2972 self.discard_space(now, SpaceId::Initial); self.spaces[SpaceId::Initial] = PacketSpace {
2974 crypto: Some(self.crypto.initial_keys(&rem_cid, self.side.side())),
2975 next_packet_number: self.spaces[SpaceId::Initial].next_packet_number,
2976 crypto_offset: client_hello.len() as u64,
2977 ..PacketSpace::new(now)
2978 };
2979 self.spaces[SpaceId::Initial]
2980 .pending
2981 .crypto
2982 .push_back(frame::Crypto {
2983 offset: 0,
2984 data: client_hello,
2985 });
2986
2987 let zero_rtt = mem::take(&mut self.spaces[SpaceId::Data].sent_packets);
2989 for (pn, info) in zero_rtt {
2990 self.remove_in_flight(pn, &info);
2991 self.spaces[SpaceId::Data].pending |= info.retransmits;
2992 }
2993 self.streams.retransmit_all_for_0rtt();
2994
2995 let token_len = packet.payload.len() - 16;
2996 let ConnectionSide::Client { ref mut token, .. } = self.side else {
2997 unreachable!("we already short-circuited if we're server");
2998 };
2999 *token = packet.payload.freeze().split_to(token_len);
3000 self.state = State::Handshake(state::Handshake {
3001 expected_token: Bytes::new(),
3002 rem_cid_set: false,
3003 client_hello: None,
3004 });
3005 Ok(())
3006 }
3007 Header::Long {
3008 ty: LongType::Handshake,
3009 src_cid: rem_cid,
3010 ..
3011 } => {
3012 if rem_cid != self.rem_handshake_cid {
3013 debug!(
3014 "discarding packet with mismatched remote CID: {} != {}",
3015 self.rem_handshake_cid, rem_cid
3016 );
3017 return Ok(());
3018 }
3019 self.on_path_validated();
3020
3021 self.process_early_payload(now, packet)?;
3022 if self.state.is_closed() {
3023 return Ok(());
3024 }
3025
3026 if self.crypto.is_handshaking() {
3027 trace!("handshake ongoing");
3028 return Ok(());
3029 }
3030
3031 if self.side.is_client() {
3032 let params =
3034 self.crypto
3035 .transport_parameters()?
3036 .ok_or_else(|| TransportError {
3037 code: TransportErrorCode::crypto(0x6d),
3038 frame: None,
3039 reason: "transport parameters missing".into(),
3040 })?;
3041
3042 if self.has_0rtt() {
3043 if !self.crypto.early_data_accepted().unwrap() {
3044 debug_assert!(self.side.is_client());
3045 debug!("0-RTT rejected");
3046 self.accepted_0rtt = false;
3047 self.streams.zero_rtt_rejected();
3048
3049 self.spaces[SpaceId::Data].pending = Retransmits::default();
3051
3052 let sent_packets =
3054 mem::take(&mut self.spaces[SpaceId::Data].sent_packets);
3055 for (pn, packet) in sent_packets {
3056 self.remove_in_flight(pn, &packet);
3057 }
3058 } else {
3059 self.accepted_0rtt = true;
3060 params.validate_resumption_from(&self.peer_params)?;
3061 }
3062 }
3063 if let Some(token) = params.stateless_reset_token {
3064 self.endpoint_events
3065 .push_back(EndpointEventInner::ResetToken(self.path.remote, token));
3066 }
3067 self.handle_peer_params(params)?;
3068 self.issue_first_cids(now);
3069 } else {
3070 self.spaces[SpaceId::Data].pending.handshake_done = true;
3072 self.discard_space(now, SpaceId::Handshake);
3073 }
3074
3075 self.events.push_back(Event::Connected);
3076 self.state = State::Established;
3077 trace!("established");
3078 Ok(())
3079 }
3080 Header::Initial(InitialHeader {
3081 src_cid: rem_cid, ..
3082 }) => {
3083 if !state.rem_cid_set {
3084 trace!("switching remote CID to {}", rem_cid);
3085 let mut state = state.clone();
3086 self.rem_cids.update_initial_cid(rem_cid);
3087 self.rem_handshake_cid = rem_cid;
3088 self.orig_rem_cid = rem_cid;
3089 state.rem_cid_set = true;
3090 self.state = State::Handshake(state);
3091 } else if rem_cid != self.rem_handshake_cid {
3092 debug!(
3093 "discarding packet with mismatched remote CID: {} != {}",
3094 self.rem_handshake_cid, rem_cid
3095 );
3096 return Ok(());
3097 }
3098
3099 let starting_space = self.highest_space;
3100 self.process_early_payload(now, packet)?;
3101
3102 if self.side.is_server()
3103 && starting_space == SpaceId::Initial
3104 && self.highest_space != SpaceId::Initial
3105 {
3106 let params =
3107 self.crypto
3108 .transport_parameters()?
3109 .ok_or_else(|| TransportError {
3110 code: TransportErrorCode::crypto(0x6d),
3111 frame: None,
3112 reason: "transport parameters missing".into(),
3113 })?;
3114 self.handle_peer_params(params)?;
3115 self.issue_first_cids(now);
3116 self.init_0rtt();
3117 }
3118 Ok(())
3119 }
3120 Header::Long {
3121 ty: LongType::ZeroRtt,
3122 ..
3123 } => {
3124 self.process_payload(now, remote, number.unwrap(), packet)?;
3125 Ok(())
3126 }
3127 Header::VersionNegotiate { .. } => {
3128 if self.total_authed_packets > 1 {
3129 return Ok(());
3130 }
3131 let supported = packet
3132 .payload
3133 .chunks(4)
3134 .any(|x| match <[u8; 4]>::try_from(x) {
3135 Ok(version) => self.version == u32::from_be_bytes(version),
3136 Err(_) => false,
3137 });
3138 if supported {
3139 return Ok(());
3140 }
3141 debug!("remote doesn't support our version");
3142 Err(ConnectionError::VersionMismatch)
3143 }
3144 Header::Short { .. } => unreachable!(
3145 "short packets received during handshake are discarded in handle_packet"
3146 ),
3147 }
3148 }
3149
3150 fn process_early_payload(
3152 &mut self,
3153 now: Instant,
3154 packet: Packet,
3155 ) -> Result<(), TransportError> {
3156 debug_assert_ne!(packet.header.space(), SpaceId::Data);
3157 let payload_len = packet.payload.len();
3158 let mut ack_eliciting = false;
3159 for result in frame::Iter::new(packet.payload.freeze())? {
3160 let frame = result?;
3161 let span = match frame {
3162 Frame::Padding => continue,
3163 _ => Some(trace_span!("frame", ty = %frame.ty())),
3164 };
3165
3166 self.stats.frame_rx.record(&frame);
3167
3168 let _guard = span.as_ref().map(|x| x.enter());
3169 ack_eliciting |= frame.is_ack_eliciting();
3170
3171 match frame {
3173 Frame::Padding | Frame::Ping => {}
3174 Frame::Crypto(frame) => {
3175 self.read_crypto(packet.header.space(), &frame, payload_len)?;
3176 }
3177 Frame::Ack(ack) => {
3178 self.on_ack_received(now, packet.header.space(), ack)?;
3179 }
3180 Frame::Close(reason) => {
3181 self.error = Some(reason.into());
3182 self.state = State::Draining;
3183 return Ok(());
3184 }
3185 _ => {
3186 let mut err =
3187 TransportError::PROTOCOL_VIOLATION("illegal frame type in handshake");
3188 err.frame = Some(frame.ty());
3189 return Err(err);
3190 }
3191 }
3192 }
3193
3194 if ack_eliciting {
3195 self.spaces[packet.header.space()]
3197 .pending_acks
3198 .set_immediate_ack_required();
3199 }
3200
3201 self.write_crypto();
3202 Ok(())
3203 }
3204
3205 fn process_payload(
3206 &mut self,
3207 now: Instant,
3208 remote: SocketAddr,
3209 number: u64,
3210 packet: Packet,
3211 ) -> Result<(), TransportError> {
3212 let payload = packet.payload.freeze();
3213 let mut is_probing_packet = true;
3214 let mut close = None;
3215 let payload_len = payload.len();
3216 let mut ack_eliciting = false;
3217 for result in frame::Iter::new(payload)? {
3218 let frame = result?;
3219 let span = match frame {
3220 Frame::Padding => continue,
3221 _ => Some(trace_span!("frame", ty = %frame.ty())),
3222 };
3223
3224 self.stats.frame_rx.record(&frame);
3225 match &frame {
3228 Frame::Crypto(f) => {
3229 trace!(offset = f.offset, len = f.data.len(), "got crypto frame");
3230 }
3231 Frame::Stream(f) => {
3232 trace!(id = %f.id, offset = f.offset, len = f.data.len(), fin = f.fin, "got stream frame");
3233 }
3234 Frame::Datagram(f) => {
3235 trace!(len = f.data.len(), "got datagram frame");
3236 }
3237 f => {
3238 trace!("got frame {:?}", f);
3239 }
3240 }
3241
3242 let _guard = span.as_ref().map(|x| x.enter());
3243 if packet.header.is_0rtt() {
3244 match frame {
3245 Frame::Crypto(_) | Frame::Close(Close::Application(_)) => {
3246 return Err(TransportError::PROTOCOL_VIOLATION(
3247 "illegal frame type in 0-RTT",
3248 ));
3249 }
3250 _ => {}
3251 }
3252 }
3253 ack_eliciting |= frame.is_ack_eliciting();
3254
3255 match frame {
3257 Frame::Padding
3258 | Frame::PathChallenge(_)
3259 | Frame::PathResponse(_)
3260 | Frame::NewConnectionId(_) => {}
3261 _ => {
3262 is_probing_packet = false;
3263 }
3264 }
3265 match frame {
3266 Frame::Crypto(frame) => {
3267 self.read_crypto(SpaceId::Data, &frame, payload_len)?;
3268 }
3269 Frame::Stream(frame) => {
3270 if self.streams.received(frame, payload_len)?.should_transmit() {
3271 self.spaces[SpaceId::Data].pending.max_data = true;
3272 }
3273 }
3274 Frame::Ack(ack) => {
3275 self.on_ack_received(now, SpaceId::Data, ack)?;
3276 }
3277 Frame::Padding | Frame::Ping => {}
3278 Frame::Close(reason) => {
3279 close = Some(reason);
3280 }
3281 Frame::PathChallenge(token) => {
3282 self.path_responses.push(number, token, remote);
3283 if remote == self.path.remote {
3284 match self.peer_supports_ack_frequency() {
3287 true => self.immediate_ack(),
3288 false => self.ping(),
3289 }
3290 }
3291 }
3292 Frame::PathResponse(token) => {
3293 if self.path.challenge == Some(token) && remote == self.path.remote {
3294 trace!("new path validated");
3295 self.timers.stop(Timer::PathValidation);
3296 self.path.challenge = None;
3297 self.path.validated = true;
3298 if let Some((_, ref mut prev_path)) = self.prev_path {
3299 prev_path.challenge = None;
3300 prev_path.challenge_pending = false;
3301 }
3302 self.on_path_validated();
3303 } else if let Some(nat_traversal) = &mut self.nat_traversal {
3304 match nat_traversal.handle_validation_success(remote, token, now) {
3306 Ok(sequence) => {
3307 trace!(
3308 "NAT traversal candidate {} validated for sequence {}",
3309 remote, sequence
3310 );
3311
3312 if nat_traversal.handle_coordination_success(remote, now) {
3314 trace!("Coordination succeeded via {}", remote);
3315
3316 let can_migrate = match &self.side {
3318 ConnectionSide::Client { .. } => true, ConnectionSide::Server { server_config } => {
3320 server_config.migration
3321 }
3322 };
3323
3324 if can_migrate {
3325 let best_pairs = nat_traversal.get_best_succeeded_pairs();
3327 if let Some(best) = best_pairs.first() {
3328 if best.remote_addr == remote
3329 && best.remote_addr != self.path.remote
3330 {
3331 debug!(
3332 "NAT traversal found better path, initiating migration"
3333 );
3334 if let Err(e) =
3336 self.migrate_to_nat_traversal_path(now)
3337 {
3338 warn!(
3339 "Failed to migrate to NAT traversal path: {:?}",
3340 e
3341 );
3342 }
3343 }
3344 }
3345 }
3346 } else {
3347 if nat_traversal.mark_pair_succeeded(remote) {
3349 trace!("NAT traversal pair succeeded for {}", remote);
3350 }
3351 }
3352 }
3353 Err(NatTraversalError::ChallengeMismatch) => {
3354 debug!(
3355 "PATH_RESPONSE challenge mismatch for NAT candidate {}",
3356 remote
3357 );
3358 }
3359 Err(e) => {
3360 debug!("NAT traversal validation error: {}", e);
3361 }
3362 }
3363 } else {
3364 debug!(token, "ignoring invalid PATH_RESPONSE");
3365 }
3366 }
3367 Frame::MaxData(bytes) => {
3368 self.streams.received_max_data(bytes);
3369 }
3370 Frame::MaxStreamData { id, offset } => {
3371 self.streams.received_max_stream_data(id, offset)?;
3372 }
3373 Frame::MaxStreams { dir, count } => {
3374 self.streams.received_max_streams(dir, count)?;
3375 }
3376 Frame::ResetStream(frame) => {
3377 if self.streams.received_reset(frame)?.should_transmit() {
3378 self.spaces[SpaceId::Data].pending.max_data = true;
3379 }
3380 }
3381 Frame::DataBlocked { offset } => {
3382 debug!(offset, "peer claims to be blocked at connection level");
3383 }
3384 Frame::StreamDataBlocked { id, offset } => {
3385 if id.initiator() == self.side.side() && id.dir() == Dir::Uni {
3386 debug!("got STREAM_DATA_BLOCKED on send-only {}", id);
3387 return Err(TransportError::STREAM_STATE_ERROR(
3388 "STREAM_DATA_BLOCKED on send-only stream",
3389 ));
3390 }
3391 debug!(
3392 stream = %id,
3393 offset, "peer claims to be blocked at stream level"
3394 );
3395 }
3396 Frame::StreamsBlocked { dir, limit } => {
3397 if limit > MAX_STREAM_COUNT {
3398 return Err(TransportError::FRAME_ENCODING_ERROR(
3399 "unrepresentable stream limit",
3400 ));
3401 }
3402 debug!(
3403 "peer claims to be blocked opening more than {} {} streams",
3404 limit, dir
3405 );
3406 }
3407 Frame::StopSending(frame::StopSending { id, error_code }) => {
3408 if id.initiator() != self.side.side() {
3409 if id.dir() == Dir::Uni {
3410 debug!("got STOP_SENDING on recv-only {}", id);
3411 return Err(TransportError::STREAM_STATE_ERROR(
3412 "STOP_SENDING on recv-only stream",
3413 ));
3414 }
3415 } else if self.streams.is_local_unopened(id) {
3416 return Err(TransportError::STREAM_STATE_ERROR(
3417 "STOP_SENDING on unopened stream",
3418 ));
3419 }
3420 self.streams.received_stop_sending(id, error_code);
3421 }
3422 Frame::RetireConnectionId { sequence } => {
3423 let allow_more_cids = self
3424 .local_cid_state
3425 .on_cid_retirement(sequence, self.peer_params.issue_cids_limit())?;
3426 self.endpoint_events
3427 .push_back(EndpointEventInner::RetireConnectionId(
3428 now,
3429 sequence,
3430 allow_more_cids,
3431 ));
3432 }
3433 Frame::NewConnectionId(frame) => {
3434 trace!(
3435 sequence = frame.sequence,
3436 id = %frame.id,
3437 retire_prior_to = frame.retire_prior_to,
3438 );
3439 if self.rem_cids.active().is_empty() {
3440 return Err(TransportError::PROTOCOL_VIOLATION(
3441 "NEW_CONNECTION_ID when CIDs aren't in use",
3442 ));
3443 }
3444 if frame.retire_prior_to > frame.sequence {
3445 return Err(TransportError::PROTOCOL_VIOLATION(
3446 "NEW_CONNECTION_ID retiring unissued CIDs",
3447 ));
3448 }
3449
3450 use crate::cid_queue::InsertError;
3451 match self.rem_cids.insert(frame) {
3452 Ok(None) => {}
3453 Ok(Some((retired, reset_token))) => {
3454 let pending_retired =
3455 &mut self.spaces[SpaceId::Data].pending.retire_cids;
3456 const MAX_PENDING_RETIRED_CIDS: u64 = CidQueue::LEN as u64 * 10;
3459 if (pending_retired.len() as u64)
3462 .saturating_add(retired.end.saturating_sub(retired.start))
3463 > MAX_PENDING_RETIRED_CIDS
3464 {
3465 return Err(TransportError::CONNECTION_ID_LIMIT_ERROR(
3466 "queued too many retired CIDs",
3467 ));
3468 }
3469 pending_retired.extend(retired);
3470 self.set_reset_token(reset_token);
3471 }
3472 Err(InsertError::ExceedsLimit) => {
3473 return Err(TransportError::CONNECTION_ID_LIMIT_ERROR(""));
3474 }
3475 Err(InsertError::Retired) => {
3476 trace!("discarding already-retired");
3477 self.spaces[SpaceId::Data]
3481 .pending
3482 .retire_cids
3483 .push(frame.sequence);
3484 continue;
3485 }
3486 };
3487
3488 if self.side.is_server() && self.rem_cids.active_seq() == 0 {
3489 self.update_rem_cid();
3492 }
3493 }
3494 Frame::NewToken(NewToken { token }) => {
3495 let ConnectionSide::Client {
3496 token_store,
3497 server_name,
3498 ..
3499 } = &self.side
3500 else {
3501 return Err(TransportError::PROTOCOL_VIOLATION("client sent NEW_TOKEN"));
3502 };
3503 if token.is_empty() {
3504 return Err(TransportError::FRAME_ENCODING_ERROR("empty token"));
3505 }
3506 trace!("got new token");
3507 token_store.insert(server_name, token);
3508 }
3509 Frame::Datagram(datagram) => {
3510 if self
3511 .datagrams
3512 .received(datagram, &self.config.datagram_receive_buffer_size)?
3513 {
3514 self.events.push_back(Event::DatagramReceived);
3515 }
3516 }
3517 Frame::AckFrequency(ack_frequency) => {
3518 let space = &mut self.spaces[SpaceId::Data];
3520
3521 if !self
3522 .ack_frequency
3523 .ack_frequency_received(&ack_frequency, &mut space.pending_acks)?
3524 {
3525 continue;
3527 }
3528
3529 if let Some(timeout) = space
3532 .pending_acks
3533 .max_ack_delay_timeout(self.ack_frequency.max_ack_delay)
3534 {
3535 self.timers.set(Timer::MaxAckDelay, timeout);
3536 }
3537 }
3538 Frame::ImmediateAck => {
3539 self.spaces[SpaceId::Data]
3541 .pending_acks
3542 .set_immediate_ack_required();
3543 }
3544 Frame::HandshakeDone => {
3545 if self.side.is_server() {
3546 return Err(TransportError::PROTOCOL_VIOLATION(
3547 "client sent HANDSHAKE_DONE",
3548 ));
3549 }
3550 if self.spaces[SpaceId::Handshake].crypto.is_some() {
3551 self.discard_space(now, SpaceId::Handshake);
3552 }
3553 }
3554 Frame::AddAddress(add_address) => {
3555 self.handle_add_address(&add_address, now)?;
3556 }
3557 Frame::PunchMeNow(punch_me_now) => {
3558 self.handle_punch_me_now(&punch_me_now, now)?;
3559 }
3560 Frame::RemoveAddress(remove_address) => {
3561 self.handle_remove_address(&remove_address)?;
3562 }
3563 Frame::ObservedAddress(observed_address) => {
3564 self.handle_observed_address_frame(&observed_address, now)?;
3565 }
3566 }
3567 }
3568
3569 let space = &mut self.spaces[SpaceId::Data];
3570 if space
3571 .pending_acks
3572 .packet_received(now, number, ack_eliciting, &space.dedup)
3573 {
3574 self.timers
3575 .set(Timer::MaxAckDelay, now + self.ack_frequency.max_ack_delay);
3576 }
3577
3578 let pending = &mut self.spaces[SpaceId::Data].pending;
3583 self.streams.queue_max_stream_id(pending);
3584
3585 if let Some(reason) = close {
3586 self.error = Some(reason.into());
3587 self.state = State::Draining;
3588 self.close = true;
3589 }
3590
3591 if remote != self.path.remote
3592 && !is_probing_packet
3593 && number == self.spaces[SpaceId::Data].rx_packet
3594 {
3595 let ConnectionSide::Server { ref server_config } = self.side else {
3596 return Err(TransportError::PROTOCOL_VIOLATION(
3597 "packets from unknown remote should be dropped by clients",
3598 ));
3599 };
3600 debug_assert!(
3601 server_config.migration,
3602 "migration-initiating packets should have been dropped immediately"
3603 );
3604 self.migrate(now, remote);
3605 self.update_rem_cid();
3607 self.spin = false;
3608 }
3609
3610 Ok(())
3611 }
3612
3613 fn migrate(&mut self, now: Instant, remote: SocketAddr) {
3614 trace!(%remote, "migration initiated");
3615 let mut new_path = if remote.is_ipv4() && remote.ip() == self.path.remote.ip() {
3619 PathData::from_previous(remote, &self.path, now)
3620 } else {
3621 let peer_max_udp_payload_size =
3622 u16::try_from(self.peer_params.max_udp_payload_size.into_inner())
3623 .unwrap_or(u16::MAX);
3624 PathData::new(
3625 remote,
3626 self.allow_mtud,
3627 Some(peer_max_udp_payload_size),
3628 now,
3629 &self.config,
3630 )
3631 };
3632 new_path.challenge = Some(self.rng.r#gen());
3633 new_path.challenge_pending = true;
3634 let prev_pto = self.pto(SpaceId::Data);
3635
3636 let mut prev = mem::replace(&mut self.path, new_path);
3637 if prev.challenge.is_none() {
3639 prev.challenge = Some(self.rng.r#gen());
3640 prev.challenge_pending = true;
3641 self.prev_path = Some((self.rem_cids.active(), prev));
3644 }
3645
3646 self.timers.set(
3647 Timer::PathValidation,
3648 now + 3 * cmp::max(self.pto(SpaceId::Data), prev_pto),
3649 );
3650 }
3651
3652 pub fn local_address_changed(&mut self) {
3654 self.update_rem_cid();
3655 self.ping();
3656 }
3657
3658 pub fn migrate_to_nat_traversal_path(&mut self, now: Instant) -> Result<(), TransportError> {
3660 let (remote_addr, local_addr) = {
3662 let nat_state = self
3663 .nat_traversal
3664 .as_ref()
3665 .ok_or_else(|| TransportError::PROTOCOL_VIOLATION("NAT traversal not enabled"))?;
3666
3667 let best_pairs = nat_state.get_best_succeeded_pairs();
3669 if best_pairs.is_empty() {
3670 return Err(TransportError::PROTOCOL_VIOLATION(
3671 "No validated NAT traversal paths",
3672 ));
3673 }
3674
3675 let best_path = best_pairs
3677 .iter()
3678 .find(|pair| pair.remote_addr != self.path.remote)
3679 .or_else(|| best_pairs.first());
3680
3681 let best_path = best_path.ok_or_else(|| {
3682 TransportError::PROTOCOL_VIOLATION("No suitable NAT traversal path")
3683 })?;
3684
3685 debug!(
3686 "Migrating to NAT traversal path: {} -> {} (priority: {})",
3687 self.path.remote, best_path.remote_addr, best_path.priority
3688 );
3689
3690 (best_path.remote_addr, best_path.local_addr)
3691 };
3692
3693 self.migrate(now, remote_addr);
3695
3696 if local_addr != SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED), 0) {
3698 self.local_ip = Some(local_addr.ip());
3699 }
3700
3701 self.path.challenge_pending = true;
3703
3704 Ok(())
3705 }
3706
3707 fn update_rem_cid(&mut self) {
3709 let (reset_token, retired) = match self.rem_cids.next() {
3710 Some(x) => x,
3711 None => return,
3712 };
3713
3714 self.spaces[SpaceId::Data]
3716 .pending
3717 .retire_cids
3718 .extend(retired);
3719 self.set_reset_token(reset_token);
3720 }
3721
3722 fn set_reset_token(&mut self, reset_token: ResetToken) {
3723 self.endpoint_events
3724 .push_back(EndpointEventInner::ResetToken(
3725 self.path.remote,
3726 reset_token,
3727 ));
3728 self.peer_params.stateless_reset_token = Some(reset_token);
3729 }
3730
3731 fn issue_first_cids(&mut self, now: Instant) {
3733 if self.local_cid_state.cid_len() == 0 {
3734 return;
3735 }
3736
3737 let mut n = self.peer_params.issue_cids_limit() - 1;
3739 if let ConnectionSide::Server { server_config } = &self.side {
3740 if server_config.has_preferred_address() {
3741 n -= 1;
3743 }
3744 }
3745 self.endpoint_events
3746 .push_back(EndpointEventInner::NeedIdentifiers(now, n));
3747 }
3748
3749 fn populate_packet(
3750 &mut self,
3751 now: Instant,
3752 space_id: SpaceId,
3753 buf: &mut Vec<u8>,
3754 max_size: usize,
3755 pn: u64,
3756 ) -> SentFrames {
3757 let mut sent = SentFrames::default();
3758 let space = &mut self.spaces[space_id];
3759 let is_0rtt = space_id == SpaceId::Data && space.crypto.is_none();
3760 space.pending_acks.maybe_ack_non_eliciting();
3761
3762 if !is_0rtt && mem::replace(&mut space.pending.handshake_done, false) {
3764 buf.write(frame::FrameType::HANDSHAKE_DONE);
3765 sent.retransmits.get_or_create().handshake_done = true;
3766 self.stats.frame_tx.handshake_done =
3768 self.stats.frame_tx.handshake_done.saturating_add(1);
3769 }
3770
3771 if mem::replace(&mut space.ping_pending, false) {
3773 trace!("PING");
3774 buf.write(frame::FrameType::PING);
3775 sent.non_retransmits = true;
3776 self.stats.frame_tx.ping += 1;
3777 }
3778
3779 if mem::replace(&mut space.immediate_ack_pending, false) {
3781 trace!("IMMEDIATE_ACK");
3782 buf.write(frame::FrameType::IMMEDIATE_ACK);
3783 sent.non_retransmits = true;
3784 self.stats.frame_tx.immediate_ack += 1;
3785 }
3786
3787 if space.pending_acks.can_send() {
3789 Self::populate_acks(
3790 now,
3791 self.receiving_ecn,
3792 &mut sent,
3793 space,
3794 buf,
3795 &mut self.stats,
3796 );
3797 }
3798
3799 if mem::replace(&mut space.pending.ack_frequency, false) {
3801 let sequence_number = self.ack_frequency.next_sequence_number();
3802
3803 let config = self.config.ack_frequency_config.as_ref().unwrap();
3805
3806 let max_ack_delay = self.ack_frequency.candidate_max_ack_delay(
3808 self.path.rtt.get(),
3809 config,
3810 &self.peer_params,
3811 );
3812
3813 trace!(?max_ack_delay, "ACK_FREQUENCY");
3814
3815 frame::AckFrequency {
3816 sequence: sequence_number,
3817 ack_eliciting_threshold: config.ack_eliciting_threshold,
3818 request_max_ack_delay: max_ack_delay.as_micros().try_into().unwrap_or(VarInt::MAX),
3819 reordering_threshold: config.reordering_threshold,
3820 }
3821 .encode(buf);
3822
3823 sent.retransmits.get_or_create().ack_frequency = true;
3824
3825 self.ack_frequency.ack_frequency_sent(pn, max_ack_delay);
3826 self.stats.frame_tx.ack_frequency += 1;
3827 }
3828
3829 if buf.len() + 9 < max_size && space_id == SpaceId::Data {
3831 if let Some(token) = self.path.challenge {
3833 self.path.challenge_pending = false;
3835 sent.non_retransmits = true;
3836 sent.requires_padding = true;
3837 trace!("PATH_CHALLENGE {:08x}", token);
3838 buf.write(frame::FrameType::PATH_CHALLENGE);
3839 buf.write(token);
3840 self.stats.frame_tx.path_challenge += 1;
3841 }
3842
3843 }
3852
3853 if buf.len() + 9 < max_size && space_id == SpaceId::Data {
3855 if let Some(token) = self.path_responses.pop_on_path(self.path.remote) {
3856 sent.non_retransmits = true;
3857 sent.requires_padding = true;
3858 trace!("PATH_RESPONSE {:08x}", token);
3859 buf.write(frame::FrameType::PATH_RESPONSE);
3860 buf.write(token);
3861 self.stats.frame_tx.path_response += 1;
3862 }
3863 }
3864
3865 while buf.len() + frame::Crypto::SIZE_BOUND < max_size && !is_0rtt {
3867 let mut frame = match space.pending.crypto.pop_front() {
3868 Some(x) => x,
3869 None => break,
3870 };
3871
3872 let max_crypto_data_size = max_size
3877 - buf.len()
3878 - 1 - VarInt::size(unsafe { VarInt::from_u64_unchecked(frame.offset) })
3880 - 2; let available_space = max_size - buf.len();
3884 let remaining_data = frame.data.len();
3885 let optimal_size = self
3886 .pqc_state
3887 .calculate_crypto_frame_size(available_space, remaining_data);
3888
3889 let len = frame
3890 .data
3891 .len()
3892 .min(2usize.pow(14) - 1)
3893 .min(max_crypto_data_size)
3894 .min(optimal_size);
3895
3896 let data = frame.data.split_to(len);
3897 let truncated = frame::Crypto {
3898 offset: frame.offset,
3899 data,
3900 };
3901 trace!(
3902 "CRYPTO: off {} len {}",
3903 truncated.offset,
3904 truncated.data.len()
3905 );
3906 truncated.encode(buf);
3907 self.stats.frame_tx.crypto += 1;
3908 sent.retransmits.get_or_create().crypto.push_back(truncated);
3909 if !frame.data.is_empty() {
3910 frame.offset += len as u64;
3911 space.pending.crypto.push_front(frame);
3912 }
3913 }
3914
3915 if space_id == SpaceId::Data {
3916 self.streams.write_control_frames(
3917 buf,
3918 &mut space.pending,
3919 &mut sent.retransmits,
3920 &mut self.stats.frame_tx,
3921 max_size,
3922 );
3923 }
3924
3925 while buf.len() + 44 < max_size {
3927 let issued = match space.pending.new_cids.pop() {
3928 Some(x) => x,
3929 None => break,
3930 };
3931 trace!(
3932 sequence = issued.sequence,
3933 id = %issued.id,
3934 "NEW_CONNECTION_ID"
3935 );
3936 frame::NewConnectionId {
3937 sequence: issued.sequence,
3938 retire_prior_to: self.local_cid_state.retire_prior_to(),
3939 id: issued.id,
3940 reset_token: issued.reset_token,
3941 }
3942 .encode(buf);
3943 sent.retransmits.get_or_create().new_cids.push(issued);
3944 self.stats.frame_tx.new_connection_id += 1;
3945 }
3946
3947 while buf.len() + frame::RETIRE_CONNECTION_ID_SIZE_BOUND < max_size {
3949 let seq = match space.pending.retire_cids.pop() {
3950 Some(x) => x,
3951 None => break,
3952 };
3953 trace!(sequence = seq, "RETIRE_CONNECTION_ID");
3954 buf.write(frame::FrameType::RETIRE_CONNECTION_ID);
3955 buf.write_var(seq);
3956 sent.retransmits.get_or_create().retire_cids.push(seq);
3957 self.stats.frame_tx.retire_connection_id += 1;
3958 }
3959
3960 let mut sent_datagrams = false;
3962 while buf.len() + Datagram::SIZE_BOUND < max_size && space_id == SpaceId::Data {
3963 match self.datagrams.write(buf, max_size) {
3964 true => {
3965 sent_datagrams = true;
3966 sent.non_retransmits = true;
3967 self.stats.frame_tx.datagram += 1;
3968 }
3969 false => break,
3970 }
3971 }
3972 if self.datagrams.send_blocked && sent_datagrams {
3973 self.events.push_back(Event::DatagramsUnblocked);
3974 self.datagrams.send_blocked = false;
3975 }
3976
3977 while let Some(remote_addr) = space.pending.new_tokens.pop() {
3979 debug_assert_eq!(space_id, SpaceId::Data);
3980 let ConnectionSide::Server { server_config } = &self.side else {
3981 debug_assert!(false, "NEW_TOKEN frames should not be enqueued by clients");
3983 continue;
3984 };
3985
3986 if remote_addr != self.path.remote {
3987 continue;
3992 }
3993
3994 let token = Token::new(
3995 TokenPayload::Validation {
3996 ip: remote_addr.ip(),
3997 issued: server_config.time_source.now(),
3998 },
3999 &mut self.rng,
4000 );
4001 let new_token = NewToken {
4002 token: token.encode(&*server_config.token_key).into(),
4003 };
4004
4005 if buf.len() + new_token.size() >= max_size {
4006 space.pending.new_tokens.push(remote_addr);
4007 break;
4008 }
4009
4010 new_token.encode(buf);
4011 sent.retransmits
4012 .get_or_create()
4013 .new_tokens
4014 .push(remote_addr);
4015 self.stats.frame_tx.new_token += 1;
4016 }
4017
4018 while buf.len() + frame::AddAddress::SIZE_BOUND < max_size && space_id == SpaceId::Data {
4020 let add_address = match space.pending.add_addresses.pop() {
4021 Some(x) => x,
4022 None => break,
4023 };
4024 trace!(
4025 sequence = %add_address.sequence,
4026 address = %add_address.address,
4027 "ADD_ADDRESS"
4028 );
4029 if self.nat_traversal_frame_config.use_rfc_format {
4031 add_address.encode_rfc(buf);
4032 } else {
4033 add_address.encode_legacy(buf);
4034 }
4035 sent.retransmits
4036 .get_or_create()
4037 .add_addresses
4038 .push(add_address);
4039 self.stats.frame_tx.add_address += 1;
4040 }
4041
4042 while buf.len() + frame::PunchMeNow::SIZE_BOUND < max_size && space_id == SpaceId::Data {
4044 let punch_me_now = match space.pending.punch_me_now.pop() {
4045 Some(x) => x,
4046 None => break,
4047 };
4048 trace!(
4049 round = %punch_me_now.round,
4050 paired_with_sequence_number = %punch_me_now.paired_with_sequence_number,
4051 "PUNCH_ME_NOW"
4052 );
4053 if self.nat_traversal_frame_config.use_rfc_format {
4055 punch_me_now.encode_rfc(buf);
4056 } else {
4057 punch_me_now.encode_legacy(buf);
4058 }
4059 sent.retransmits
4060 .get_or_create()
4061 .punch_me_now
4062 .push(punch_me_now);
4063 self.stats.frame_tx.punch_me_now += 1;
4064 }
4065
4066 while buf.len() + frame::RemoveAddress::SIZE_BOUND < max_size && space_id == SpaceId::Data {
4068 let remove_address = match space.pending.remove_addresses.pop() {
4069 Some(x) => x,
4070 None => break,
4071 };
4072 trace!(
4073 sequence = %remove_address.sequence,
4074 "REMOVE_ADDRESS"
4075 );
4076 remove_address.encode(buf);
4078 sent.retransmits
4079 .get_or_create()
4080 .remove_addresses
4081 .push(remove_address);
4082 self.stats.frame_tx.remove_address += 1;
4083 }
4084
4085 while buf.len() + frame::ObservedAddress::SIZE_BOUND < max_size && space_id == SpaceId::Data
4087 {
4088 let observed_address = match space.pending.observed_addresses.pop() {
4089 Some(x) => x,
4090 None => break,
4091 };
4092 trace!(
4093 address = %observed_address.address,
4094 "OBSERVED_ADDRESS"
4095 );
4096 observed_address.encode(buf);
4097 sent.retransmits
4098 .get_or_create()
4099 .observed_addresses
4100 .push(observed_address);
4101 self.stats.frame_tx.observed_address += 1;
4102 }
4103
4104 if space_id == SpaceId::Data {
4106 sent.stream_frames =
4107 self.streams
4108 .write_stream_frames(buf, max_size, self.config.send_fairness);
4109 self.stats.frame_tx.stream += sent.stream_frames.len() as u64;
4110 }
4111
4112 sent
4113 }
4114
4115 fn populate_acks(
4120 now: Instant,
4121 receiving_ecn: bool,
4122 sent: &mut SentFrames,
4123 space: &mut PacketSpace,
4124 buf: &mut Vec<u8>,
4125 stats: &mut ConnectionStats,
4126 ) {
4127 debug_assert!(!space.pending_acks.ranges().is_empty());
4128
4129 debug_assert!(space.crypto.is_some(), "tried to send ACK in 0-RTT");
4131 let ecn = if receiving_ecn {
4132 Some(&space.ecn_counters)
4133 } else {
4134 None
4135 };
4136 sent.largest_acked = space.pending_acks.ranges().max();
4137
4138 let delay_micros = space.pending_acks.ack_delay(now).as_micros() as u64;
4139
4140 let ack_delay_exp = TransportParameters::default().ack_delay_exponent;
4142 let delay = delay_micros >> ack_delay_exp.into_inner();
4143
4144 trace!(
4145 "ACK {:?}, Delay = {}us",
4146 space.pending_acks.ranges(),
4147 delay_micros
4148 );
4149
4150 frame::Ack::encode(delay as _, space.pending_acks.ranges(), ecn, buf);
4151 stats.frame_tx.acks += 1;
4152 }
4153
4154 fn close_common(&mut self) {
4155 trace!("connection closed");
4156 for &timer in &Timer::VALUES {
4157 self.timers.stop(timer);
4158 }
4159 }
4160
4161 fn set_close_timer(&mut self, now: Instant) {
4162 self.timers
4163 .set(Timer::Close, now + 3 * self.pto(self.highest_space));
4164 }
4165
4166 fn handle_peer_params(&mut self, params: TransportParameters) -> Result<(), TransportError> {
4168 if Some(self.orig_rem_cid) != params.initial_src_cid
4169 || (self.side.is_client()
4170 && (Some(self.initial_dst_cid) != params.original_dst_cid
4171 || self.retry_src_cid != params.retry_src_cid))
4172 {
4173 return Err(TransportError::TRANSPORT_PARAMETER_ERROR(
4174 "CID authentication failure",
4175 ));
4176 }
4177
4178 self.set_peer_params(params);
4179
4180 Ok(())
4181 }
4182
4183 fn set_peer_params(&mut self, params: TransportParameters) {
4184 self.streams.set_params(¶ms);
4185 self.idle_timeout =
4186 negotiate_max_idle_timeout(self.config.max_idle_timeout, Some(params.max_idle_timeout));
4187 trace!("negotiated max idle timeout {:?}", self.idle_timeout);
4188 if let Some(ref info) = params.preferred_address {
4189 self.rem_cids.insert(frame::NewConnectionId {
4190 sequence: 1,
4191 id: info.connection_id,
4192 reset_token: info.stateless_reset_token,
4193 retire_prior_to: 0,
4194 }).expect("preferred address CID is the first received, and hence is guaranteed to be legal");
4195 }
4196 self.ack_frequency.peer_max_ack_delay = get_max_ack_delay(¶ms);
4197
4198 self.negotiate_nat_traversal_capability(¶ms);
4200
4201 let local_has_nat_traversal = self.config.nat_traversal_config.is_some();
4204 let local_supports_rfc = local_has_nat_traversal;
4207 self.nat_traversal_frame_config = frame::nat_traversal_unified::NatTraversalFrameConfig {
4208 use_rfc_format: local_supports_rfc && params.supports_rfc_nat_traversal(),
4210 accept_legacy: true,
4212 };
4213
4214 self.negotiate_address_discovery(¶ms);
4216
4217 self.pqc_state.update_from_peer_params(¶ms);
4219
4220 if self.pqc_state.enabled && self.pqc_state.using_pqc {
4222 trace!("PQC enabled, adjusting MTU discovery for larger handshake packets");
4223 let current_mtu = self.path.mtud.current_mtu();
4227 if current_mtu < self.pqc_state.handshake_mtu {
4228 trace!(
4229 "Current MTU {} is less than PQC handshake MTU {}, will rely on MTU discovery",
4230 current_mtu, self.pqc_state.handshake_mtu
4231 );
4232 }
4233 }
4234
4235 self.peer_params = params;
4236 self.path.mtud.on_peer_max_udp_payload_size_received(
4237 u16::try_from(self.peer_params.max_udp_payload_size.into_inner()).unwrap_or(u16::MAX),
4238 );
4239 }
4240
4241 fn negotiate_nat_traversal_capability(&mut self, params: &TransportParameters) {
4243 let peer_nat_config = match ¶ms.nat_traversal {
4245 Some(config) => config,
4246 None => {
4247 if self.config.nat_traversal_config.is_some() {
4249 debug!(
4250 "Peer does not support NAT traversal, maintaining backward compatibility"
4251 );
4252 self.emit_nat_traversal_capability_event(false);
4253
4254 self.set_nat_traversal_compatibility_mode(false);
4256 }
4257 return;
4258 }
4259 };
4260
4261 let local_nat_config = match &self.config.nat_traversal_config {
4263 Some(config) => config,
4264 None => {
4265 debug!("NAT traversal not enabled locally, ignoring peer support");
4266 self.emit_nat_traversal_capability_event(false);
4267 self.set_nat_traversal_compatibility_mode(false);
4268 return;
4269 }
4270 };
4271
4272 info!("Both peers support NAT traversal, negotiating capabilities");
4274
4275 match self.negotiate_nat_traversal_parameters(local_nat_config, peer_nat_config) {
4277 Ok(negotiated_config) => {
4278 info!("NAT traversal capability negotiated successfully");
4279 self.emit_nat_traversal_capability_event(true);
4280
4281 self.init_nat_traversal_with_negotiated_config(&negotiated_config);
4283
4284 self.set_nat_traversal_compatibility_mode(true);
4286
4287 if matches!(
4289 negotiated_config,
4290 crate::transport_parameters::NatTraversalConfig::ClientSupport
4291 ) {
4292 self.initiate_nat_traversal_process();
4293 }
4294 }
4295 Err(e) => {
4296 warn!("NAT traversal capability negotiation failed: {}", e);
4297 self.emit_nat_traversal_capability_event(false);
4298 self.set_nat_traversal_compatibility_mode(false);
4299 }
4300 }
4301 }
4302
4303 fn emit_nat_traversal_capability_event(&mut self, negotiated: bool) {
4357 if negotiated {
4360 info!("NAT traversal capability successfully negotiated");
4361 } else {
4362 info!("NAT traversal capability not available (peer or local support missing)");
4363 }
4364
4365 }
4368
4369 fn set_nat_traversal_compatibility_mode(&mut self, enabled: bool) {
4371 if enabled {
4372 debug!("NAT traversal enabled for this connection");
4373 } else {
4375 debug!("NAT traversal disabled for this connection (backward compatibility mode)");
4376 if self.nat_traversal.is_some() {
4378 warn!("Clearing NAT traversal state due to compatibility mode");
4379 self.nat_traversal = None;
4380 }
4381 }
4382 }
4383
4384 fn negotiate_nat_traversal_parameters(
4386 &self,
4387 local_config: &crate::transport_parameters::NatTraversalConfig,
4388 peer_config: &crate::transport_parameters::NatTraversalConfig,
4389 ) -> Result<crate::transport_parameters::NatTraversalConfig, String> {
4390 match (local_config, peer_config) {
4395 (
4397 crate::transport_parameters::NatTraversalConfig::ClientSupport,
4398 crate::transport_parameters::NatTraversalConfig::ServerSupport {
4399 concurrency_limit,
4400 },
4401 ) => Ok(
4402 crate::transport_parameters::NatTraversalConfig::ServerSupport {
4403 concurrency_limit: *concurrency_limit,
4404 },
4405 ),
4406 (
4408 crate::transport_parameters::NatTraversalConfig::ServerSupport {
4409 concurrency_limit,
4410 },
4411 crate::transport_parameters::NatTraversalConfig::ClientSupport,
4412 ) => Ok(
4413 crate::transport_parameters::NatTraversalConfig::ServerSupport {
4414 concurrency_limit: *concurrency_limit,
4415 },
4416 ),
4417 (
4419 crate::transport_parameters::NatTraversalConfig::ServerSupport {
4420 concurrency_limit: limit1,
4421 },
4422 crate::transport_parameters::NatTraversalConfig::ServerSupport {
4423 concurrency_limit: limit2,
4424 },
4425 ) => Ok(
4426 crate::transport_parameters::NatTraversalConfig::ServerSupport {
4427 concurrency_limit: (*limit1).min(*limit2),
4428 },
4429 ),
4430 (
4432 crate::transport_parameters::NatTraversalConfig::ClientSupport,
4433 crate::transport_parameters::NatTraversalConfig::ClientSupport,
4434 ) => Err("Both endpoints claim to be NAT traversal clients".to_string()),
4435 }
4436 }
4437
4438 fn init_nat_traversal_with_negotiated_config(
4440 &mut self,
4441 config: &crate::transport_parameters::NatTraversalConfig,
4442 ) {
4443 let (role, _concurrency_limit) = match config {
4446 crate::transport_parameters::NatTraversalConfig::ClientSupport => {
4447 (NatTraversalRole::Client, 10) }
4450 crate::transport_parameters::NatTraversalConfig::ServerSupport {
4451 concurrency_limit,
4452 } => {
4453 (
4455 NatTraversalRole::Server { can_relay: false },
4456 concurrency_limit.into_inner() as u32,
4457 )
4458 }
4459 };
4460
4461 let max_candidates = 50; let coordination_timeout = Duration::from_secs(10); self.nat_traversal = Some(NatTraversalState::new(
4467 role,
4468 max_candidates,
4469 coordination_timeout,
4470 ));
4471
4472 trace!(
4473 "NAT traversal initialized with negotiated config: role={:?}",
4474 role
4475 );
4476
4477 match role {
4479 NatTraversalRole::Bootstrap => {
4480 self.prepare_address_observation();
4482 }
4483 NatTraversalRole::Client => {
4484 self.schedule_candidate_discovery();
4486 }
4487 NatTraversalRole::Server { .. } => {
4488 self.prepare_coordination_handling();
4490 }
4491 }
4492 }
4493
4494 fn initiate_nat_traversal_process(&mut self) {
4496 if let Some(nat_state) = &mut self.nat_traversal {
4497 match nat_state.start_candidate_discovery() {
4498 Ok(()) => {
4499 debug!("NAT traversal process initiated - candidate discovery started");
4500 self.timers.set(
4502 Timer::NatTraversal,
4503 Instant::now() + Duration::from_millis(100),
4504 );
4505 }
4506 Err(e) => {
4507 warn!("Failed to initiate NAT traversal process: {}", e);
4508 }
4509 }
4510 }
4511 }
4512
4513 fn prepare_address_observation(&mut self) {
4515 debug!("Preparing for address observation as bootstrap node");
4516 }
4519
4520 fn schedule_candidate_discovery(&mut self) {
4522 debug!("Scheduling candidate discovery for client endpoint");
4523 self.timers.set(
4525 Timer::NatTraversal,
4526 Instant::now() + Duration::from_millis(50),
4527 );
4528 }
4529
4530 fn prepare_coordination_handling(&mut self) {
4532 debug!("Preparing to handle coordination requests as server endpoint");
4533 }
4536
4537 fn handle_nat_traversal_timeout(&mut self, now: Instant) {
4539 let timeout_result = if let Some(nat_state) = &mut self.nat_traversal {
4541 nat_state.handle_timeout(now)
4542 } else {
4543 return;
4544 };
4545
4546 match timeout_result {
4548 Ok(actions) => {
4549 for action in actions {
4550 match action {
4551 nat_traversal::TimeoutAction::RetryDiscovery => {
4552 debug!("NAT traversal timeout: retrying candidate discovery");
4553 if let Some(nat_state) = &mut self.nat_traversal {
4554 if let Err(e) = nat_state.start_candidate_discovery() {
4555 warn!("Failed to retry candidate discovery: {}", e);
4556 }
4557 }
4558 }
4559 nat_traversal::TimeoutAction::RetryCoordination => {
4560 debug!("NAT traversal timeout: retrying coordination");
4561 self.timers
4563 .set(Timer::NatTraversal, now + Duration::from_secs(2));
4564 }
4565 nat_traversal::TimeoutAction::StartValidation => {
4566 debug!("NAT traversal timeout: starting path validation");
4567 self.start_nat_traversal_validation(now);
4568 }
4569 nat_traversal::TimeoutAction::Complete => {
4570 debug!("NAT traversal completed successfully");
4571 self.timers.stop(Timer::NatTraversal);
4573 }
4574 nat_traversal::TimeoutAction::Failed => {
4575 warn!("NAT traversal failed after timeout");
4576 self.handle_nat_traversal_failure();
4578 }
4579 }
4580 }
4581 }
4582 Err(e) => {
4583 warn!("NAT traversal timeout handling failed: {}", e);
4584 self.handle_nat_traversal_failure();
4585 }
4586 }
4587 }
4588
4589 fn start_nat_traversal_validation(&mut self, now: Instant) {
4591 if let Some(nat_state) = &mut self.nat_traversal {
4592 let pairs = nat_state.get_next_validation_pairs(3);
4594
4595 for pair in pairs {
4596 let challenge = self.rng.r#gen();
4598 self.path.challenge = Some(challenge);
4599 self.path.challenge_pending = true;
4600
4601 debug!(
4602 "Starting path validation for NAT traversal candidate: {}",
4603 pair.remote_addr
4604 );
4605 }
4606
4607 self.timers
4609 .set(Timer::PathValidation, now + Duration::from_secs(3));
4610 }
4611 }
4612
4613 fn handle_nat_traversal_failure(&mut self) {
4615 warn!("NAT traversal failed, considering fallback options");
4616
4617 self.nat_traversal = None;
4619 self.timers.stop(Timer::NatTraversal);
4620
4621 debug!("NAT traversal disabled for this connection due to failure");
4628 }
4629
4630 pub fn nat_traversal_supported(&self) -> bool {
4632 self.nat_traversal.is_some()
4633 && self.config.nat_traversal_config.is_some()
4634 && self.peer_params.nat_traversal.is_some()
4635 }
4636
4637 pub fn nat_traversal_config(&self) -> Option<&crate::transport_parameters::NatTraversalConfig> {
4639 self.peer_params.nat_traversal.as_ref()
4640 }
4641
4642 pub fn nat_traversal_ready(&self) -> bool {
4644 self.nat_traversal_supported() && matches!(self.state, State::Established)
4645 }
4646
4647 pub(crate) fn nat_traversal_stats(&self) -> Option<nat_traversal::NatTraversalStats> {
4652 self.nat_traversal.as_ref().map(|state| state.stats.clone())
4653 }
4654
4655 #[cfg(test)]
4657 pub(crate) fn force_enable_nat_traversal(&mut self, role: NatTraversalRole) {
4658 use crate::transport_parameters::NatTraversalConfig;
4659
4660 let config = match role {
4662 NatTraversalRole::Client => NatTraversalConfig::ClientSupport,
4663 NatTraversalRole::Server { .. } | NatTraversalRole::Bootstrap => {
4664 NatTraversalConfig::ServerSupport {
4665 concurrency_limit: VarInt::from_u32(5),
4666 }
4667 }
4668 };
4669
4670 self.peer_params.nat_traversal = Some(config.clone());
4671 self.config = Arc::new({
4672 let mut transport_config = (*self.config).clone();
4673 transport_config.nat_traversal_config = Some(config);
4674 transport_config
4675 });
4676
4677 self.nat_traversal = Some(NatTraversalState::new(role, 8, Duration::from_secs(10)));
4678 }
4679
4680 fn derive_peer_id_from_connection(&self) -> [u8; 32] {
4683 let mut hasher = std::collections::hash_map::DefaultHasher::new();
4685 use std::hash::Hasher;
4686 hasher.write(&self.rem_handshake_cid);
4687 hasher.write(&self.handshake_cid);
4688 hasher.write(&self.path.remote.to_string().into_bytes());
4689 let hash = hasher.finish();
4690 let mut peer_id = [0u8; 32];
4691 peer_id[..8].copy_from_slice(&hash.to_be_bytes());
4692 let cid_bytes = self.rem_handshake_cid.as_ref();
4694 let copy_len = (cid_bytes.len()).min(24);
4695 peer_id[8..8 + copy_len].copy_from_slice(&cid_bytes[..copy_len]);
4696 peer_id
4697 }
4698
4699 fn handle_add_address(
4701 &mut self,
4702 add_address: &crate::frame::AddAddress,
4703 now: Instant,
4704 ) -> Result<(), TransportError> {
4705 let nat_state = self.nat_traversal.as_mut().ok_or_else(|| {
4706 TransportError::PROTOCOL_VIOLATION("AddAddress frame without NAT traversal negotiation")
4707 })?;
4708
4709 match nat_state.add_remote_candidate(
4710 add_address.sequence,
4711 add_address.address,
4712 add_address.priority,
4713 now,
4714 ) {
4715 Ok(()) => {
4716 trace!(
4717 "Added remote candidate: {} (seq={}, priority={})",
4718 add_address.address, add_address.sequence, add_address.priority
4719 );
4720
4721 self.trigger_candidate_validation(add_address.address, now)?;
4723 Ok(())
4724 }
4725 Err(NatTraversalError::TooManyCandidates) => Err(TransportError::PROTOCOL_VIOLATION(
4726 "too many NAT traversal candidates",
4727 )),
4728 Err(NatTraversalError::DuplicateAddress) => {
4729 Ok(())
4731 }
4732 Err(e) => {
4733 warn!("Failed to add remote candidate: {}", e);
4734 Ok(()) }
4736 }
4737 }
4738
4739 fn handle_punch_me_now(
4741 &mut self,
4742 punch_me_now: &crate::frame::PunchMeNow,
4743 now: Instant,
4744 ) -> Result<(), TransportError> {
4745 trace!(
4746 "Received PunchMeNow: round={}, target_seq={}, local_addr={}",
4747 punch_me_now.round, punch_me_now.paired_with_sequence_number, punch_me_now.address
4748 );
4749
4750 if let Some(nat_state) = &self.nat_traversal {
4752 if matches!(nat_state.role, NatTraversalRole::Bootstrap) {
4753 let from_peer_id = self.derive_peer_id_from_connection();
4755
4756 let punch_me_now_clone = punch_me_now.clone();
4758 drop(nat_state); match self
4761 .nat_traversal
4762 .as_mut()
4763 .unwrap()
4764 .handle_punch_me_now_frame(
4765 from_peer_id,
4766 self.path.remote,
4767 &punch_me_now_clone,
4768 now,
4769 ) {
4770 Ok(Some(coordination_frame)) => {
4771 trace!("Bootstrap node coordinating PUNCH_ME_NOW between peers");
4772
4773 if let Some(target_peer_id) = punch_me_now.target_peer_id {
4775 self.endpoint_events.push_back(
4776 crate::shared::EndpointEventInner::RelayPunchMeNow(
4777 target_peer_id,
4778 coordination_frame,
4779 ),
4780 );
4781 }
4782
4783 return Ok(());
4784 }
4785 Ok(None) => {
4786 trace!("Bootstrap coordination completed or no action needed");
4787 return Ok(());
4788 }
4789 Err(e) => {
4790 warn!("Bootstrap coordination failed: {}", e);
4791 return Ok(());
4792 }
4793 }
4794 }
4795 }
4796
4797 let nat_state = self.nat_traversal.as_mut().ok_or_else(|| {
4799 TransportError::PROTOCOL_VIOLATION("PunchMeNow frame without NAT traversal negotiation")
4800 })?;
4801
4802 if nat_state
4804 .handle_peer_punch_request(punch_me_now.round, now)
4805 .map_err(|_e| {
4806 TransportError::PROTOCOL_VIOLATION("Failed to handle peer punch request")
4807 })?
4808 {
4809 trace!("Coordination synchronized for round {}", punch_me_now.round);
4810
4811 let _local_addr = self
4814 .local_ip
4815 .map(|ip| SocketAddr::new(ip, 0))
4816 .unwrap_or_else(|| {
4817 SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED), 0)
4818 });
4819
4820 let target = nat_traversal::PunchTarget {
4821 remote_addr: punch_me_now.address,
4822 remote_sequence: punch_me_now.paired_with_sequence_number,
4823 challenge: self.rng.r#gen(),
4824 };
4825
4826 let _ = nat_state.start_coordination_round(vec![target], now);
4828 } else {
4829 debug!(
4830 "Failed to synchronize coordination for round {}",
4831 punch_me_now.round
4832 );
4833 }
4834
4835 Ok(())
4836 }
4837
4838 fn handle_remove_address(
4840 &mut self,
4841 remove_address: &crate::frame::RemoveAddress,
4842 ) -> Result<(), TransportError> {
4843 let nat_state = self.nat_traversal.as_mut().ok_or_else(|| {
4844 TransportError::PROTOCOL_VIOLATION(
4845 "RemoveAddress frame without NAT traversal negotiation",
4846 )
4847 })?;
4848
4849 if nat_state.remove_candidate(remove_address.sequence) {
4850 trace!(
4851 "Removed candidate with sequence {}",
4852 remove_address.sequence
4853 );
4854 } else {
4855 trace!(
4856 "Attempted to remove unknown candidate sequence {}",
4857 remove_address.sequence
4858 );
4859 }
4860
4861 Ok(())
4862 }
4863
4864 fn handle_observed_address_frame(
4866 &mut self,
4867 observed_address: &crate::frame::ObservedAddress,
4868 now: Instant,
4869 ) -> Result<(), TransportError> {
4870 let state = self.address_discovery_state.as_mut().ok_or_else(|| {
4872 TransportError::PROTOCOL_VIOLATION(
4873 "ObservedAddress frame without address discovery negotiation",
4874 )
4875 })?;
4876
4877 if !state.enabled {
4879 return Err(TransportError::PROTOCOL_VIOLATION(
4880 "ObservedAddress frame received when address discovery is disabled",
4881 ));
4882 }
4883
4884 #[cfg(feature = "trace")]
4886 {
4887 use crate::trace_observed_address_received;
4888 trace_observed_address_received!(
4890 &self.event_log,
4891 self.trace_context.trace_id(),
4892 observed_address.address,
4893 0u64 );
4895 }
4896
4897 let path_id = 0u64; if let Some(&last_seq) = state.last_received_sequence.get(&path_id) {
4905 if observed_address.sequence_number <= last_seq {
4906 trace!(
4907 "Ignoring OBSERVED_ADDRESS frame with stale sequence number {} (last was {})",
4908 observed_address.sequence_number, last_seq
4909 );
4910 return Ok(());
4911 }
4912 }
4913
4914 state
4916 .last_received_sequence
4917 .insert(path_id, observed_address.sequence_number);
4918
4919 state.handle_observed_address(observed_address.address, path_id, now);
4921
4922 self.path
4924 .update_observed_address(observed_address.address, now);
4925
4926 trace!(
4928 "Received ObservedAddress frame: address={} for path={}",
4929 observed_address.address, path_id
4930 );
4931
4932 Ok(())
4933 }
4934
4935 pub fn queue_add_address(&mut self, sequence: VarInt, address: SocketAddr, priority: VarInt) {
4937 let add_address = frame::AddAddress {
4939 sequence,
4940 address,
4941 priority,
4942 };
4943
4944 self.spaces[SpaceId::Data]
4945 .pending
4946 .add_addresses
4947 .push(add_address);
4948 trace!(
4949 "Queued AddAddress frame: seq={}, addr={}, priority={}",
4950 sequence, address, priority
4951 );
4952 }
4953
4954 pub fn queue_punch_me_now(
4956 &mut self,
4957 round: VarInt,
4958 paired_with_sequence_number: VarInt,
4959 address: SocketAddr,
4960 ) {
4961 let punch_me_now = frame::PunchMeNow {
4962 round,
4963 paired_with_sequence_number,
4964 address,
4965 target_peer_id: None, };
4967
4968 self.spaces[SpaceId::Data]
4969 .pending
4970 .punch_me_now
4971 .push(punch_me_now);
4972 trace!(
4973 "Queued PunchMeNow frame: round={}, target={}",
4974 round, paired_with_sequence_number
4975 );
4976 }
4977
4978 pub fn queue_remove_address(&mut self, sequence: VarInt) {
4980 let remove_address = frame::RemoveAddress { sequence };
4981
4982 self.spaces[SpaceId::Data]
4983 .pending
4984 .remove_addresses
4985 .push(remove_address);
4986 trace!("Queued RemoveAddress frame: seq={}", sequence);
4987 }
4988
4989 pub fn queue_observed_address(&mut self, address: SocketAddr) {
4991 let sequence_number = if let Some(state) = &mut self.address_discovery_state {
4993 let seq = state.next_sequence_number;
4994 state.next_sequence_number =
4995 VarInt::from_u64(state.next_sequence_number.into_inner() + 1)
4996 .expect("sequence number overflow");
4997 seq
4998 } else {
4999 VarInt::from_u32(0)
5001 };
5002
5003 let observed_address = frame::ObservedAddress {
5004 sequence_number,
5005 address,
5006 };
5007 self.spaces[SpaceId::Data]
5008 .pending
5009 .observed_addresses
5010 .push(observed_address);
5011 trace!("Queued ObservedAddress frame: addr={}", address);
5012 }
5013
5014 pub fn check_for_address_observations(&mut self, now: Instant) {
5016 let Some(state) = &mut self.address_discovery_state else {
5018 return;
5019 };
5020
5021 if !state.enabled {
5023 return;
5024 }
5025
5026 let path_id = 0u64; let remote_address = self.path.remote;
5031
5032 if state.should_send_observation(path_id, now) {
5034 if let Some(frame) = state.queue_observed_address_frame(path_id, remote_address) {
5036 self.spaces[SpaceId::Data]
5038 .pending
5039 .observed_addresses
5040 .push(frame);
5041
5042 state.record_observation_sent(path_id);
5044
5045 #[cfg(feature = "trace")]
5047 {
5048 use crate::trace_observed_address_sent;
5049 trace_observed_address_sent!(
5051 &self.event_log,
5052 self.trace_context.trace_id(),
5053 remote_address,
5054 path_id
5055 );
5056 }
5057
5058 trace!(
5059 "Queued OBSERVED_ADDRESS frame for path {} with address {}",
5060 path_id, remote_address
5061 );
5062 }
5063 }
5064 }
5065
5066 fn trigger_candidate_validation(
5068 &mut self,
5069 candidate_address: SocketAddr,
5070 now: Instant,
5071 ) -> Result<(), TransportError> {
5072 let nat_state = self
5073 .nat_traversal
5074 .as_mut()
5075 .ok_or_else(|| TransportError::PROTOCOL_VIOLATION("NAT traversal not enabled"))?;
5076
5077 if nat_state
5079 .active_validations
5080 .contains_key(&candidate_address)
5081 {
5082 trace!("Validation already in progress for {}", candidate_address);
5083 return Ok(());
5084 }
5085
5086 let challenge = self.rng.r#gen::<u64>();
5088
5089 let validation_state = nat_traversal::PathValidationState {
5091 challenge,
5092 sent_at: now,
5093 retry_count: 0,
5094 max_retries: 3,
5095 coordination_round: None,
5096 timeout_state: nat_traversal::AdaptiveTimeoutState::new(),
5097 last_retry_at: None,
5098 };
5099
5100 nat_state
5102 .active_validations
5103 .insert(candidate_address, validation_state);
5104
5105 self.nat_traversal_challenges
5107 .push(candidate_address, challenge);
5108
5109 nat_state.stats.validations_succeeded += 1; trace!(
5113 "Triggered PATH_CHALLENGE validation for {} with challenge {:016x}",
5114 candidate_address, challenge
5115 );
5116
5117 Ok(())
5118 }
5119
5120 pub fn nat_traversal_state(&self) -> Option<(NatTraversalRole, usize, usize)> {
5122 self.nat_traversal.as_ref().map(|state| {
5123 (
5124 state.role,
5125 state.local_candidates.len(),
5126 state.remote_candidates.len(),
5127 )
5128 })
5129 }
5130
5131 pub fn initiate_nat_traversal_coordination(
5133 &mut self,
5134 now: Instant,
5135 ) -> Result<(), TransportError> {
5136 let nat_state = self
5137 .nat_traversal
5138 .as_mut()
5139 .ok_or_else(|| TransportError::PROTOCOL_VIOLATION("NAT traversal not enabled"))?;
5140
5141 if nat_state.should_send_punch_request() {
5143 nat_state.generate_candidate_pairs(now);
5145
5146 let pairs = nat_state.get_next_validation_pairs(3);
5148 if pairs.is_empty() {
5149 return Err(TransportError::PROTOCOL_VIOLATION(
5150 "No candidate pairs for coordination",
5151 ));
5152 }
5153
5154 let targets: Vec<_> = pairs
5156 .into_iter()
5157 .map(|pair| nat_traversal::PunchTarget {
5158 remote_addr: pair.remote_addr,
5159 remote_sequence: pair.remote_sequence,
5160 challenge: self.rng.r#gen(),
5161 })
5162 .collect();
5163
5164 let round = nat_state
5166 .start_coordination_round(targets, now)
5167 .map_err(|_e| {
5168 TransportError::PROTOCOL_VIOLATION("Failed to start coordination round")
5169 })?;
5170
5171 let local_addr = self
5174 .local_ip
5175 .map(|ip| SocketAddr::new(ip, self.local_ip.map(|_| 0).unwrap_or(0)))
5176 .unwrap_or_else(|| {
5177 SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED), 0)
5178 });
5179
5180 let punch_me_now = frame::PunchMeNow {
5181 round,
5182 paired_with_sequence_number: VarInt::from_u32(0), address: local_addr,
5184 target_peer_id: None, };
5186
5187 self.spaces[SpaceId::Data]
5188 .pending
5189 .punch_me_now
5190 .push(punch_me_now);
5191 nat_state.mark_punch_request_sent();
5192
5193 trace!("Initiated NAT traversal coordination round {}", round);
5194 }
5195
5196 Ok(())
5197 }
5198
5199 pub fn validate_nat_candidates(&mut self, now: Instant) {
5201 self.generate_nat_traversal_challenges(now);
5202 }
5203
5204 pub fn send_nat_address_advertisement(
5219 &mut self,
5220 address: SocketAddr,
5221 priority: u32,
5222 ) -> Result<u64, ConnectionError> {
5223 let nat_state = self.nat_traversal.as_mut().ok_or_else(|| {
5225 ConnectionError::TransportError(TransportError::PROTOCOL_VIOLATION(
5226 "NAT traversal not enabled on this connection",
5227 ))
5228 })?;
5229
5230 let sequence = nat_state.next_sequence;
5232 nat_state.next_sequence =
5233 VarInt::from_u64(nat_state.next_sequence.into_inner() + 1).unwrap();
5234
5235 let now = Instant::now();
5237 nat_state.local_candidates.insert(
5238 sequence,
5239 nat_traversal::AddressCandidate {
5240 address,
5241 priority,
5242 source: nat_traversal::CandidateSource::Local,
5243 discovered_at: now,
5244 state: nat_traversal::CandidateState::New,
5245 attempt_count: 0,
5246 last_attempt: None,
5247 },
5248 );
5249
5250 nat_state.stats.local_candidates_sent += 1;
5252
5253 self.queue_add_address(sequence, address, VarInt::from_u32(priority));
5255
5256 debug!(
5257 "Queued ADD_ADDRESS frame: addr={}, priority={}, seq={}",
5258 address, priority, sequence
5259 );
5260 Ok(sequence.into_inner())
5261 }
5262
5263 pub fn send_nat_punch_coordination(
5276 &mut self,
5277 paired_with_sequence_number: u64,
5278 address: SocketAddr,
5279 round: u32,
5280 ) -> Result<(), ConnectionError> {
5281 let _nat_state = self.nat_traversal.as_ref().ok_or_else(|| {
5283 ConnectionError::TransportError(TransportError::PROTOCOL_VIOLATION(
5284 "NAT traversal not enabled on this connection",
5285 ))
5286 })?;
5287
5288 self.queue_punch_me_now(
5290 VarInt::from_u32(round),
5291 VarInt::from_u64(paired_with_sequence_number).map_err(|_| {
5292 ConnectionError::TransportError(TransportError::PROTOCOL_VIOLATION(
5293 "Invalid target sequence number",
5294 ))
5295 })?,
5296 address,
5297 );
5298
5299 debug!(
5300 "Queued PUNCH_ME_NOW frame: paired_with_seq={}, addr={}, round={}",
5301 paired_with_sequence_number, address, round
5302 );
5303 Ok(())
5304 }
5305
5306 pub fn send_nat_address_removal(&mut self, sequence: u64) -> Result<(), ConnectionError> {
5317 let nat_state = self.nat_traversal.as_mut().ok_or_else(|| {
5319 ConnectionError::TransportError(TransportError::PROTOCOL_VIOLATION(
5320 "NAT traversal not enabled on this connection",
5321 ))
5322 })?;
5323
5324 let sequence_varint = VarInt::from_u64(sequence).map_err(|_| {
5325 ConnectionError::TransportError(TransportError::PROTOCOL_VIOLATION(
5326 "Invalid sequence number",
5327 ))
5328 })?;
5329
5330 nat_state.local_candidates.remove(&sequence_varint);
5332
5333 self.queue_remove_address(sequence_varint);
5335
5336 debug!("Queued REMOVE_ADDRESS frame: seq={}", sequence);
5337 Ok(())
5338 }
5339
5340 pub(crate) fn get_nat_traversal_stats(&self) -> Option<&nat_traversal::NatTraversalStats> {
5349 self.nat_traversal.as_ref().map(|state| &state.stats)
5350 }
5351
5352 pub fn is_nat_traversal_enabled(&self) -> bool {
5354 self.nat_traversal.is_some()
5355 }
5356
5357 pub fn get_nat_traversal_role(&self) -> Option<NatTraversalRole> {
5359 self.nat_traversal.as_ref().map(|state| state.role)
5360 }
5361
5362 fn negotiate_address_discovery(&mut self, peer_params: &TransportParameters) {
5364 let now = Instant::now();
5365
5366 match &peer_params.address_discovery {
5368 Some(peer_config) => {
5369 if let Some(state) = &mut self.address_discovery_state {
5371 if state.enabled {
5372 debug!(
5375 "Address discovery negotiated: rate={}, all_paths={}",
5376 state.max_observation_rate, state.observe_all_paths
5377 );
5378 } else {
5379 debug!("Address discovery disabled locally, ignoring peer support");
5381 }
5382 } else {
5383 self.address_discovery_state =
5385 Some(AddressDiscoveryState::new(peer_config, now));
5386 debug!("Address discovery initialized from peer config");
5387 }
5388 }
5389 _ => {
5390 if let Some(state) = &mut self.address_discovery_state {
5392 state.enabled = false;
5393 debug!("Address discovery disabled - peer doesn't support it");
5394 }
5395 }
5396 }
5397
5398 if let Some(state) = &self.address_discovery_state {
5400 if state.enabled {
5401 self.path.set_observation_rate(state.max_observation_rate);
5402 }
5403 }
5404 }
5405
5406 fn decrypt_packet(
5407 &mut self,
5408 now: Instant,
5409 packet: &mut Packet,
5410 ) -> Result<Option<u64>, Option<TransportError>> {
5411 let result = packet_crypto::decrypt_packet_body(
5412 packet,
5413 &self.spaces,
5414 self.zero_rtt_crypto.as_ref(),
5415 self.key_phase,
5416 self.prev_crypto.as_ref(),
5417 self.next_crypto.as_ref(),
5418 )?;
5419
5420 let result = match result {
5421 Some(r) => r,
5422 None => return Ok(None),
5423 };
5424
5425 if result.outgoing_key_update_acked {
5426 if let Some(prev) = self.prev_crypto.as_mut() {
5427 prev.end_packet = Some((result.number, now));
5428 self.set_key_discard_timer(now, packet.header.space());
5429 }
5430 }
5431
5432 if result.incoming_key_update {
5433 trace!("key update authenticated");
5434 self.update_keys(Some((result.number, now)), true);
5435 self.set_key_discard_timer(now, packet.header.space());
5436 }
5437
5438 Ok(Some(result.number))
5439 }
5440
5441 fn update_keys(&mut self, end_packet: Option<(u64, Instant)>, remote: bool) {
5442 trace!("executing key update");
5443 let new = self
5447 .crypto
5448 .next_1rtt_keys()
5449 .expect("only called for `Data` packets");
5450 self.key_phase_size = new
5451 .local
5452 .confidentiality_limit()
5453 .saturating_sub(KEY_UPDATE_MARGIN);
5454 let old = mem::replace(
5455 &mut self.spaces[SpaceId::Data]
5456 .crypto
5457 .as_mut()
5458 .unwrap() .packet,
5460 mem::replace(self.next_crypto.as_mut().unwrap(), new),
5461 );
5462 self.spaces[SpaceId::Data].sent_with_keys = 0;
5463 self.prev_crypto = Some(PrevCrypto {
5464 crypto: old,
5465 end_packet,
5466 update_unacked: remote,
5467 });
5468 self.key_phase = !self.key_phase;
5469 }
5470
5471 fn peer_supports_ack_frequency(&self) -> bool {
5472 self.peer_params.min_ack_delay.is_some()
5473 }
5474
5475 pub(crate) fn immediate_ack(&mut self) {
5480 self.spaces[self.highest_space].immediate_ack_pending = true;
5481 }
5482
5483 #[cfg(test)]
5485 pub(crate) fn decode_packet(&self, event: &ConnectionEvent) -> Option<Vec<u8>> {
5486 let (first_decode, remaining) = match &event.0 {
5487 ConnectionEventInner::Datagram(DatagramConnectionEvent {
5488 first_decode,
5489 remaining,
5490 ..
5491 }) => (first_decode, remaining),
5492 _ => return None,
5493 };
5494
5495 if remaining.is_some() {
5496 panic!("Packets should never be coalesced in tests");
5497 }
5498
5499 let decrypted_header = packet_crypto::unprotect_header(
5500 first_decode.clone(),
5501 &self.spaces,
5502 self.zero_rtt_crypto.as_ref(),
5503 self.peer_params.stateless_reset_token,
5504 )?;
5505
5506 let mut packet = decrypted_header.packet?;
5507 packet_crypto::decrypt_packet_body(
5508 &mut packet,
5509 &self.spaces,
5510 self.zero_rtt_crypto.as_ref(),
5511 self.key_phase,
5512 self.prev_crypto.as_ref(),
5513 self.next_crypto.as_ref(),
5514 )
5515 .ok()?;
5516
5517 Some(packet.payload.to_vec())
5518 }
5519
5520 #[cfg(test)]
5523 pub(crate) fn bytes_in_flight(&self) -> u64 {
5524 self.path.in_flight.bytes
5525 }
5526
5527 #[cfg(test)]
5529 pub(crate) fn congestion_window(&self) -> u64 {
5530 self.path
5531 .congestion
5532 .window()
5533 .saturating_sub(self.path.in_flight.bytes)
5534 }
5535
5536 #[cfg(test)]
5538 pub(crate) fn is_idle(&self) -> bool {
5539 Timer::VALUES
5540 .iter()
5541 .filter(|&&t| !matches!(t, Timer::KeepAlive | Timer::PushNewCid | Timer::KeyDiscard))
5542 .filter_map(|&t| Some((t, self.timers.get(t)?)))
5543 .min_by_key(|&(_, time)| time)
5544 .is_none_or(|(timer, _)| timer == Timer::Idle)
5545 }
5546
5547 #[cfg(test)]
5549 pub(crate) fn lost_packets(&self) -> u64 {
5550 self.lost_packets
5551 }
5552
5553 #[cfg(test)]
5555 pub(crate) fn using_ecn(&self) -> bool {
5556 self.path.sending_ecn
5557 }
5558
5559 #[cfg(test)]
5561 pub(crate) fn total_recvd(&self) -> u64 {
5562 self.path.total_recvd
5563 }
5564
5565 #[cfg(test)]
5566 pub(crate) fn active_local_cid_seq(&self) -> (u64, u64) {
5567 self.local_cid_state.active_seq()
5568 }
5569
5570 #[cfg(test)]
5573 pub(crate) fn rotate_local_cid(&mut self, v: u64, now: Instant) {
5574 let n = self.local_cid_state.assign_retire_seq(v);
5575 self.endpoint_events
5576 .push_back(EndpointEventInner::NeedIdentifiers(now, n));
5577 }
5578
5579 #[cfg(test)]
5581 pub(crate) fn active_rem_cid_seq(&self) -> u64 {
5582 self.rem_cids.active_seq()
5583 }
5584
5585 #[cfg(test)]
5587 pub(crate) fn path_mtu(&self) -> u16 {
5588 self.path.current_mtu()
5589 }
5590
5591 fn can_send_1rtt(&self, max_size: usize) -> bool {
5595 self.streams.can_send_stream_data()
5596 || self.path.challenge_pending
5597 || self
5598 .prev_path
5599 .as_ref()
5600 .is_some_and(|(_, x)| x.challenge_pending)
5601 || !self.path_responses.is_empty()
5602 || !self.nat_traversal_challenges.is_empty()
5603 || self
5604 .datagrams
5605 .outgoing
5606 .front()
5607 .is_some_and(|x| x.size(true) <= max_size)
5608 }
5609
5610 fn remove_in_flight(&mut self, pn: u64, packet: &SentPacket) {
5612 for path in [&mut self.path]
5614 .into_iter()
5615 .chain(self.prev_path.as_mut().map(|(_, data)| data))
5616 {
5617 if path.remove_in_flight(pn, packet) {
5618 return;
5619 }
5620 }
5621 }
5622
5623 fn kill(&mut self, reason: ConnectionError) {
5625 self.close_common();
5626 self.error = Some(reason);
5627 self.state = State::Drained;
5628 self.endpoint_events.push_back(EndpointEventInner::Drained);
5629 }
5630
5631 fn generate_nat_traversal_challenges(&mut self, now: Instant) {
5633 let candidates: Vec<(VarInt, SocketAddr)> = if let Some(nat_state) = &self.nat_traversal {
5635 nat_state
5636 .get_validation_candidates()
5637 .into_iter()
5638 .take(3) .map(|(seq, candidate)| (seq, candidate.address))
5640 .collect()
5641 } else {
5642 return;
5643 };
5644
5645 if candidates.is_empty() {
5646 return;
5647 }
5648
5649 if let Some(nat_state) = &mut self.nat_traversal {
5651 for (seq, address) in candidates {
5652 let challenge: u64 = self.rng.r#gen();
5654
5655 if let Err(e) = nat_state.start_validation(seq, challenge, now) {
5657 debug!("Failed to start validation for candidate {}: {}", seq, e);
5658 continue;
5659 }
5660
5661 self.nat_traversal_challenges.push(address, challenge);
5663 trace!(
5664 "Queuing NAT validation PATH_CHALLENGE for {} with token {:08x}",
5665 address, challenge
5666 );
5667 }
5668 }
5669 }
5670
5671 pub fn current_mtu(&self) -> u16 {
5675 self.path.current_mtu()
5676 }
5677
5678 fn predict_1rtt_overhead(&self, pn: Option<u64>) -> usize {
5685 let pn_len = match pn {
5686 Some(pn) => PacketNumber::new(
5687 pn,
5688 self.spaces[SpaceId::Data].largest_acked_packet.unwrap_or(0),
5689 )
5690 .len(),
5691 None => 4,
5693 };
5694
5695 1 + self.rem_cids.active().len() + pn_len + self.tag_len_1rtt()
5697 }
5698
5699 fn tag_len_1rtt(&self) -> usize {
5700 let key = match self.spaces[SpaceId::Data].crypto.as_ref() {
5701 Some(crypto) => Some(&*crypto.packet.local),
5702 None => self.zero_rtt_crypto.as_ref().map(|x| &*x.packet),
5703 };
5704 key.map_or(16, |x| x.tag_len())
5708 }
5709
5710 fn on_path_validated(&mut self) {
5712 self.path.validated = true;
5713 let ConnectionSide::Server { server_config } = &self.side else {
5714 return;
5715 };
5716 let new_tokens = &mut self.spaces[SpaceId::Data as usize].pending.new_tokens;
5717 new_tokens.clear();
5718 for _ in 0..server_config.validation_token.sent {
5719 new_tokens.push(self.path.remote);
5720 }
5721 }
5722}
5723
5724impl fmt::Debug for Connection {
5725 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
5726 f.debug_struct("Connection")
5727 .field("handshake_cid", &self.handshake_cid)
5728 .finish()
5729 }
5730}
5731
5732enum ConnectionSide {
5734 Client {
5735 token: Bytes,
5737 token_store: Arc<dyn TokenStore>,
5738 server_name: String,
5739 },
5740 Server {
5741 server_config: Arc<ServerConfig>,
5742 },
5743}
5744
5745impl ConnectionSide {
5746 fn remote_may_migrate(&self) -> bool {
5747 match self {
5748 Self::Server { server_config } => server_config.migration,
5749 Self::Client { .. } => false,
5750 }
5751 }
5752
5753 fn is_client(&self) -> bool {
5754 self.side().is_client()
5755 }
5756
5757 fn is_server(&self) -> bool {
5758 self.side().is_server()
5759 }
5760
5761 fn side(&self) -> Side {
5762 match *self {
5763 Self::Client { .. } => Side::Client,
5764 Self::Server { .. } => Side::Server,
5765 }
5766 }
5767}
5768
5769impl From<SideArgs> for ConnectionSide {
5770 fn from(side: SideArgs) -> Self {
5771 match side {
5772 SideArgs::Client {
5773 token_store,
5774 server_name,
5775 } => Self::Client {
5776 token: token_store.take(&server_name).unwrap_or_default(),
5777 token_store,
5778 server_name,
5779 },
5780 SideArgs::Server {
5781 server_config,
5782 pref_addr_cid: _,
5783 path_validated: _,
5784 } => Self::Server { server_config },
5785 }
5786 }
5787}
5788
5789pub(crate) enum SideArgs {
5791 Client {
5792 token_store: Arc<dyn TokenStore>,
5793 server_name: String,
5794 },
5795 Server {
5796 server_config: Arc<ServerConfig>,
5797 pref_addr_cid: Option<ConnectionId>,
5798 path_validated: bool,
5799 },
5800}
5801
5802impl SideArgs {
5803 pub(crate) fn pref_addr_cid(&self) -> Option<ConnectionId> {
5804 match *self {
5805 Self::Client { .. } => None,
5806 Self::Server { pref_addr_cid, .. } => pref_addr_cid,
5807 }
5808 }
5809
5810 pub(crate) fn path_validated(&self) -> bool {
5811 match *self {
5812 Self::Client { .. } => true,
5813 Self::Server { path_validated, .. } => path_validated,
5814 }
5815 }
5816
5817 pub(crate) fn side(&self) -> Side {
5818 match *self {
5819 Self::Client { .. } => Side::Client,
5820 Self::Server { .. } => Side::Server,
5821 }
5822 }
5823}
5824
5825#[derive(Debug, Error, Clone, PartialEq, Eq)]
5827pub enum ConnectionError {
5828 #[error("peer doesn't implement any supported version")]
5830 VersionMismatch,
5831 #[error(transparent)]
5833 TransportError(#[from] TransportError),
5834 #[error("aborted by peer: {0}")]
5836 ConnectionClosed(frame::ConnectionClose),
5837 #[error("closed by peer: {0}")]
5839 ApplicationClosed(frame::ApplicationClose),
5840 #[error("reset by peer")]
5842 Reset,
5843 #[error("timed out")]
5849 TimedOut,
5850 #[error("closed")]
5852 LocallyClosed,
5853 #[error("CIDs exhausted")]
5857 CidsExhausted,
5858}
5859
5860impl From<Close> for ConnectionError {
5861 fn from(x: Close) -> Self {
5862 match x {
5863 Close::Connection(reason) => Self::ConnectionClosed(reason),
5864 Close::Application(reason) => Self::ApplicationClosed(reason),
5865 }
5866 }
5867}
5868
5869impl From<ConnectionError> for io::Error {
5871 fn from(x: ConnectionError) -> Self {
5872 use ConnectionError::*;
5873 let kind = match x {
5874 TimedOut => io::ErrorKind::TimedOut,
5875 Reset => io::ErrorKind::ConnectionReset,
5876 ApplicationClosed(_) | ConnectionClosed(_) => io::ErrorKind::ConnectionAborted,
5877 TransportError(_) | VersionMismatch | LocallyClosed | CidsExhausted => {
5878 io::ErrorKind::Other
5879 }
5880 };
5881 Self::new(kind, x)
5882 }
5883}
5884
5885#[derive(Clone, Debug)]
5886pub enum State {
5888 Handshake(state::Handshake),
5890 Established,
5892 Closed(state::Closed),
5894 Draining,
5896 Drained,
5898}
5899
5900impl State {
5901 fn closed<R: Into<Close>>(reason: R) -> Self {
5902 Self::Closed(state::Closed {
5903 reason: reason.into(),
5904 })
5905 }
5906
5907 fn is_handshake(&self) -> bool {
5908 matches!(*self, Self::Handshake(_))
5909 }
5910
5911 fn is_established(&self) -> bool {
5912 matches!(*self, Self::Established)
5913 }
5914
5915 fn is_closed(&self) -> bool {
5916 matches!(*self, Self::Closed(_) | Self::Draining | Self::Drained)
5917 }
5918
5919 fn is_drained(&self) -> bool {
5920 matches!(*self, Self::Drained)
5921 }
5922}
5923
5924mod state {
5925 use super::*;
5926
5927 #[derive(Clone, Debug)]
5928 pub struct Handshake {
5929 pub(super) rem_cid_set: bool,
5933 pub(super) expected_token: Bytes,
5937 pub(super) client_hello: Option<Bytes>,
5941 }
5942
5943 #[derive(Clone, Debug)]
5944 pub struct Closed {
5945 pub(super) reason: Close,
5946 }
5947}
5948
5949#[derive(Debug)]
5951pub enum Event {
5952 HandshakeDataReady,
5954 Connected,
5956 ConnectionLost {
5960 reason: ConnectionError,
5962 },
5963 Stream(StreamEvent),
5965 DatagramReceived,
5967 DatagramsUnblocked,
5969}
5970
5971fn instant_saturating_sub(x: Instant, y: Instant) -> Duration {
5972 if x > y { x - y } else { Duration::ZERO }
5973}
5974
5975fn get_max_ack_delay(params: &TransportParameters) -> Duration {
5976 Duration::from_micros(params.max_ack_delay.0 * 1000)
5977}
5978
5979const MAX_BACKOFF_EXPONENT: u32 = 16;
5981
5982const MIN_PACKET_SPACE: usize = MAX_HANDSHAKE_OR_0RTT_HEADER_SIZE + 32;
5990
5991const MAX_HANDSHAKE_OR_0RTT_HEADER_SIZE: usize =
5997 1 + 4 + 1 + MAX_CID_SIZE + 1 + MAX_CID_SIZE + VarInt::from_u32(u16::MAX as u32).size() + 4;
5998
5999const KEY_UPDATE_MARGIN: u64 = 10_000;
6003
6004#[derive(Default)]
6005struct SentFrames {
6006 retransmits: ThinRetransmits,
6007 largest_acked: Option<u64>,
6008 stream_frames: StreamMetaVec,
6009 non_retransmits: bool,
6011 requires_padding: bool,
6012}
6013
6014impl SentFrames {
6015 fn is_ack_only(&self, streams: &StreamsState) -> bool {
6017 self.largest_acked.is_some()
6018 && !self.non_retransmits
6019 && self.stream_frames.is_empty()
6020 && self.retransmits.is_empty(streams)
6021 }
6022}
6023
6024fn negotiate_max_idle_timeout(x: Option<VarInt>, y: Option<VarInt>) -> Option<Duration> {
6032 match (x, y) {
6033 (Some(VarInt(0)) | None, Some(VarInt(0)) | None) => None,
6034 (Some(VarInt(0)) | None, Some(y)) => Some(Duration::from_millis(y.0)),
6035 (Some(x), Some(VarInt(0)) | None) => Some(Duration::from_millis(x.0)),
6036 (Some(x), Some(y)) => Some(Duration::from_millis(cmp::min(x, y).0)),
6037 }
6038}
6039
6040#[derive(Debug, Clone)]
6042pub(crate) struct PqcState {
6043 enabled: bool,
6045 algorithms: Option<crate::transport_parameters::PqcAlgorithms>,
6047 handshake_mtu: u16,
6049 using_pqc: bool,
6051 packet_handler: crate::crypto::pqc::packet_handler::PqcPacketHandler,
6053}
6054
6055impl PqcState {
6056 fn new() -> Self {
6057 Self {
6058 enabled: false,
6059 algorithms: None,
6060 handshake_mtu: MIN_INITIAL_SIZE,
6061 using_pqc: false,
6062 packet_handler: crate::crypto::pqc::packet_handler::PqcPacketHandler::new(),
6063 }
6064 }
6065
6066 fn min_initial_size(&self) -> u16 {
6068 if self.enabled && self.using_pqc {
6069 std::cmp::max(self.handshake_mtu, 4096)
6071 } else {
6072 MIN_INITIAL_SIZE
6073 }
6074 }
6075
6076 fn update_from_peer_params(&mut self, params: &TransportParameters) {
6078 if let Some(ref algorithms) = params.pqc_algorithms {
6079 self.enabled = true;
6080 self.algorithms = Some(algorithms.clone());
6081 if algorithms.ml_kem_768
6083 || algorithms.ml_dsa_65
6084 || algorithms.hybrid_x25519_ml_kem
6085 || algorithms.hybrid_ed25519_ml_dsa
6086 {
6087 self.using_pqc = true;
6088 self.handshake_mtu = 4096; }
6090 }
6091 }
6092
6093 fn detect_pqc_from_crypto(&mut self, crypto_data: &[u8], space: SpaceId) {
6095 if self.packet_handler.detect_pqc_handshake(crypto_data, space) {
6096 self.using_pqc = true;
6097 self.handshake_mtu = self.packet_handler.get_min_packet_size(space);
6099 }
6100 }
6101
6102 fn should_trigger_mtu_discovery(&mut self) -> bool {
6104 self.packet_handler.should_trigger_mtu_discovery()
6105 }
6106
6107 fn get_mtu_config(&self) -> MtuDiscoveryConfig {
6109 self.packet_handler.get_pqc_mtu_config()
6110 }
6111
6112 fn calculate_crypto_frame_size(&self, available_space: usize, remaining_data: usize) -> usize {
6114 self.packet_handler
6115 .calculate_crypto_frame_size(available_space, remaining_data)
6116 }
6117
6118 fn should_adjust_coalescing(&self, current_size: usize, space: SpaceId) -> bool {
6120 self.packet_handler
6121 .adjust_coalescing_for_pqc(current_size, space)
6122 }
6123
6124 fn on_packet_sent(&mut self, space: SpaceId, size: u16) {
6126 self.packet_handler.on_packet_sent(space, size);
6127 }
6128
6129 fn reset(&mut self) {
6131 self.enabled = false;
6132 self.algorithms = None;
6133 self.handshake_mtu = MIN_INITIAL_SIZE;
6134 self.using_pqc = false;
6135 self.packet_handler.reset();
6136 }
6137}
6138
6139impl Default for PqcState {
6140 fn default() -> Self {
6141 Self::new()
6142 }
6143}
6144
6145#[derive(Debug, Clone)]
6147pub(crate) struct AddressDiscoveryState {
6148 enabled: bool,
6150 max_observation_rate: u8,
6152 observe_all_paths: bool,
6154 path_addresses: std::collections::HashMap<u64, paths::PathAddressInfo>,
6156 rate_limiter: AddressObservationRateLimiter,
6158 observed_addresses: Vec<ObservedAddressEvent>,
6160 bootstrap_mode: bool,
6162 next_sequence_number: VarInt,
6164 last_received_sequence: std::collections::HashMap<u64, VarInt>,
6166}
6167
6168#[derive(Debug, Clone, PartialEq, Eq)]
6170struct ObservedAddressEvent {
6171 address: SocketAddr,
6173 received_at: Instant,
6175 path_id: u64,
6177}
6178
6179#[derive(Debug, Clone)]
6181struct AddressObservationRateLimiter {
6182 tokens: f64,
6184 max_tokens: f64,
6186 rate: f64,
6188 last_update: Instant,
6190}
6191
6192impl AddressDiscoveryState {
6193 fn new(config: &crate::transport_parameters::AddressDiscoveryConfig, now: Instant) -> Self {
6195 use crate::transport_parameters::AddressDiscoveryConfig::*;
6196
6197 let (enabled, _can_send, _can_receive) = match config {
6199 SendOnly => (true, true, false),
6200 ReceiveOnly => (true, false, true),
6201 SendAndReceive => (true, true, true),
6202 };
6203
6204 let max_observation_rate = 10u8; let observe_all_paths = false; Self {
6210 enabled,
6211 max_observation_rate,
6212 observe_all_paths,
6213 path_addresses: std::collections::HashMap::new(),
6214 rate_limiter: AddressObservationRateLimiter::new(max_observation_rate, now),
6215 observed_addresses: Vec::new(),
6216 bootstrap_mode: false,
6217 next_sequence_number: VarInt::from_u32(0),
6218 last_received_sequence: std::collections::HashMap::new(),
6219 }
6220 }
6221
6222 fn should_send_observation(&mut self, path_id: u64, now: Instant) -> bool {
6224 if !self.should_observe_path(path_id) {
6226 return false;
6227 }
6228
6229 let needs_observation = match self.path_addresses.get(&path_id) {
6231 Some(info) => info.observed_address.is_none() || !info.notified,
6232 None => true,
6233 };
6234
6235 if !needs_observation {
6236 return false;
6237 }
6238
6239 self.rate_limiter.try_consume(1.0, now)
6241 }
6242
6243 fn record_observation_sent(&mut self, path_id: u64) {
6245 if let Some(info) = self.path_addresses.get_mut(&path_id) {
6246 info.mark_notified();
6247 }
6248 }
6249
6250 fn handle_observed_address(&mut self, address: SocketAddr, path_id: u64, now: Instant) {
6252 if !self.enabled {
6253 return;
6254 }
6255
6256 self.observed_addresses.push(ObservedAddressEvent {
6257 address,
6258 received_at: now,
6259 path_id,
6260 });
6261
6262 let info = self
6264 .path_addresses
6265 .entry(path_id)
6266 .or_insert_with(paths::PathAddressInfo::new);
6267 info.update_observed_address(address, now);
6268 }
6269
6270 pub(crate) fn get_observed_address(&self, path_id: u64) -> Option<SocketAddr> {
6272 self.path_addresses
6273 .get(&path_id)
6274 .and_then(|info| info.observed_address)
6275 }
6276
6277 pub(crate) fn get_all_observed_addresses(&self) -> Vec<SocketAddr> {
6279 self.path_addresses
6280 .values()
6281 .filter_map(|info| info.observed_address)
6282 .collect()
6283 }
6284
6285 pub(crate) fn stats(&self) -> AddressDiscoveryStats {
6287 AddressDiscoveryStats {
6288 frames_sent: self.observed_addresses.len() as u64, frames_received: self.observed_addresses.len() as u64,
6290 addresses_discovered: self
6291 .path_addresses
6292 .values()
6293 .filter(|info| info.observed_address.is_some())
6294 .count() as u64,
6295 address_changes_detected: 0, }
6297 }
6298
6299 fn has_unnotified_changes(&self) -> bool {
6301 self.path_addresses
6302 .values()
6303 .any(|info| info.observed_address.is_some() && !info.notified)
6304 }
6305
6306 fn queue_observed_address_frame(
6308 &mut self,
6309 path_id: u64,
6310 address: SocketAddr,
6311 ) -> Option<frame::ObservedAddress> {
6312 if !self.enabled {
6314 return None;
6315 }
6316
6317 if !self.observe_all_paths && path_id != 0 {
6319 return None;
6320 }
6321
6322 if let Some(info) = self.path_addresses.get(&path_id) {
6324 if info.notified {
6325 return None;
6326 }
6327 }
6328
6329 if self.rate_limiter.tokens < 1.0 {
6331 return None;
6332 }
6333
6334 self.rate_limiter.tokens -= 1.0;
6336
6337 let info = self
6339 .path_addresses
6340 .entry(path_id)
6341 .or_insert_with(paths::PathAddressInfo::new);
6342 info.observed_address = Some(address);
6343 info.notified = true;
6344
6345 let sequence_number = self.next_sequence_number;
6347 self.next_sequence_number = VarInt::from_u64(self.next_sequence_number.into_inner() + 1)
6348 .expect("sequence number overflow");
6349
6350 Some(frame::ObservedAddress {
6351 sequence_number,
6352 address,
6353 })
6354 }
6355
6356 fn check_for_address_observations(
6358 &mut self,
6359 _current_path: u64,
6360 peer_supports_address_discovery: bool,
6361 now: Instant,
6362 ) -> Vec<frame::ObservedAddress> {
6363 let mut frames = Vec::new();
6364
6365 if !self.enabled || !peer_supports_address_discovery {
6367 return frames;
6368 }
6369
6370 self.rate_limiter.update_tokens(now);
6372
6373 let paths_to_notify: Vec<u64> = self
6375 .path_addresses
6376 .iter()
6377 .filter_map(|(&path_id, info)| {
6378 if info.observed_address.is_some() && !info.notified {
6379 Some(path_id)
6380 } else {
6381 None
6382 }
6383 })
6384 .collect();
6385
6386 for path_id in paths_to_notify {
6388 if !self.should_observe_path(path_id) {
6390 continue;
6391 }
6392
6393 if !self.bootstrap_mode && self.rate_limiter.tokens < 1.0 {
6395 break; }
6397
6398 if let Some(info) = self.path_addresses.get_mut(&path_id) {
6400 if let Some(address) = info.observed_address {
6401 if self.bootstrap_mode {
6403 self.rate_limiter.tokens -= 0.2; } else {
6405 self.rate_limiter.tokens -= 1.0;
6406 }
6407
6408 info.notified = true;
6410
6411 let sequence_number = self.next_sequence_number;
6413 self.next_sequence_number =
6414 VarInt::from_u64(self.next_sequence_number.into_inner() + 1)
6415 .expect("sequence number overflow");
6416
6417 frames.push(frame::ObservedAddress {
6418 sequence_number,
6419 address,
6420 });
6421 }
6422 }
6423 }
6424
6425 frames
6426 }
6427
6428 fn update_rate_limit(&mut self, new_rate: f64) {
6430 self.max_observation_rate = new_rate as u8;
6431 self.rate_limiter.set_rate(new_rate as u8);
6432 }
6433
6434 fn from_transport_params(params: &TransportParameters) -> Option<Self> {
6436 params
6437 .address_discovery
6438 .as_ref()
6439 .map(|config| Self::new(config, Instant::now()))
6440 }
6441
6442 #[cfg(test)]
6444 fn new_with_params(enabled: bool, max_rate: f64, observe_all_paths: bool) -> Self {
6445 if !enabled {
6447 return Self {
6449 enabled: false,
6450 max_observation_rate: max_rate as u8,
6451 observe_all_paths,
6452 path_addresses: std::collections::HashMap::new(),
6453 rate_limiter: AddressObservationRateLimiter::new(max_rate as u8, Instant::now()),
6454 observed_addresses: Vec::new(),
6455 bootstrap_mode: false,
6456 next_sequence_number: VarInt::from_u32(0),
6457 last_received_sequence: std::collections::HashMap::new(),
6458 };
6459 }
6460
6461 let config = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
6463 let mut state = Self::new(&config, Instant::now());
6464 state.max_observation_rate = max_rate as u8;
6465 state.observe_all_paths = observe_all_paths;
6466 state.rate_limiter = AddressObservationRateLimiter::new(max_rate as u8, Instant::now());
6467 state
6468 }
6469
6470 fn set_bootstrap_mode(&mut self, enabled: bool) {
6472 self.bootstrap_mode = enabled;
6473 if enabled {
6475 let bootstrap_rate = self.get_effective_rate_limit();
6476 self.rate_limiter.rate = bootstrap_rate;
6477 self.rate_limiter.max_tokens = bootstrap_rate * 2.0; self.rate_limiter.tokens = self.rate_limiter.max_tokens;
6480 }
6481 }
6482
6483 fn is_bootstrap_mode(&self) -> bool {
6485 self.bootstrap_mode
6486 }
6487
6488 fn get_effective_rate_limit(&self) -> f64 {
6490 if self.bootstrap_mode {
6491 (self.max_observation_rate as f64) * 5.0
6493 } else {
6494 self.max_observation_rate as f64
6495 }
6496 }
6497
6498 fn should_observe_path(&self, path_id: u64) -> bool {
6500 if !self.enabled {
6501 return false;
6502 }
6503
6504 if self.bootstrap_mode {
6506 return true;
6507 }
6508
6509 self.observe_all_paths || path_id == 0
6511 }
6512
6513 fn should_send_observation_immediately(&self, is_new_connection: bool) -> bool {
6515 self.bootstrap_mode && is_new_connection
6516 }
6517}
6518
6519impl AddressObservationRateLimiter {
6520 fn new(rate: u8, now: Instant) -> Self {
6522 let rate_f64 = rate as f64;
6523 Self {
6524 tokens: rate_f64,
6525 max_tokens: rate_f64,
6526 rate: rate_f64,
6527 last_update: now,
6528 }
6529 }
6530
6531 fn try_consume(&mut self, tokens: f64, now: Instant) -> bool {
6533 self.update_tokens(now);
6534
6535 if self.tokens >= tokens {
6536 self.tokens -= tokens;
6537 true
6538 } else {
6539 false
6540 }
6541 }
6542
6543 fn update_tokens(&mut self, now: Instant) {
6545 let elapsed = now.saturating_duration_since(self.last_update);
6546 let new_tokens = elapsed.as_secs_f64() * self.rate;
6547 self.tokens = (self.tokens + new_tokens).min(self.max_tokens);
6548 self.last_update = now;
6549 }
6550
6551 fn set_rate(&mut self, rate: u8) {
6553 let rate_f64 = rate as f64;
6554 self.rate = rate_f64;
6555 self.max_tokens = rate_f64;
6556 if self.tokens > self.max_tokens {
6558 self.tokens = self.max_tokens;
6559 }
6560 }
6561}
6562
6563#[cfg(test)]
6564mod tests {
6565 use super::*;
6566 use crate::transport_parameters::AddressDiscoveryConfig;
6567 use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr};
6568
6569 #[test]
6570 fn address_discovery_state_new() {
6571 let config = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
6572 let now = Instant::now();
6573 let state = AddressDiscoveryState::new(&config, now);
6574
6575 assert!(state.enabled);
6576 assert_eq!(state.max_observation_rate, 10);
6577 assert!(!state.observe_all_paths);
6578 assert!(state.path_addresses.is_empty());
6579 assert!(state.observed_addresses.is_empty());
6580 assert_eq!(state.rate_limiter.tokens, 10.0);
6581 }
6582
6583 #[test]
6584 fn address_discovery_state_disabled() {
6585 let config = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
6586 let now = Instant::now();
6587 let mut state = AddressDiscoveryState::new(&config, now);
6588
6589 state.enabled = false;
6591
6592 assert!(!state.should_send_observation(0, now));
6594 }
6595
6596 #[test]
6597 fn address_discovery_state_should_send_observation() {
6598 let config = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
6599 let now = Instant::now();
6600 let mut state = AddressDiscoveryState::new(&config, now);
6601
6602 assert!(state.should_send_observation(0, now));
6604
6605 let mut path_info = paths::PathAddressInfo::new();
6607 path_info.update_observed_address(
6608 SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)), 8080),
6609 now,
6610 );
6611 path_info.mark_notified();
6612 state.path_addresses.insert(0, path_info);
6613
6614 assert!(!state.should_send_observation(0, now));
6616
6617 assert!(!state.should_send_observation(1, now));
6619 }
6620
6621 #[test]
6622 fn address_discovery_state_rate_limiting() {
6623 let config = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
6624 let now = Instant::now();
6625 let mut state = AddressDiscoveryState::new(&config, now);
6626
6627 state.observe_all_paths = true;
6629
6630 assert!(state.should_send_observation(0, now));
6632
6633 state.rate_limiter.try_consume(9.0, now); assert!(!state.should_send_observation(0, now));
6638
6639 let later = now + Duration::from_secs(1);
6641 state.rate_limiter.update_tokens(later);
6642 assert!(state.should_send_observation(0, later));
6643 }
6644
6645 #[test]
6646 fn address_discovery_state_handle_observed_address() {
6647 let config = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
6648 let now = Instant::now();
6649 let mut state = AddressDiscoveryState::new(&config, now);
6650
6651 let addr1 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(10, 0, 0, 1)), 443);
6652 let addr2 = SocketAddr::new(
6653 IpAddr::V6(Ipv6Addr::new(0x2001, 0xdb8, 0, 0, 0, 0, 0, 1)),
6654 8080,
6655 );
6656
6657 state.handle_observed_address(addr1, 0, now);
6659 assert_eq!(state.observed_addresses.len(), 1);
6660 assert_eq!(state.observed_addresses[0].address, addr1);
6661 assert_eq!(state.observed_addresses[0].path_id, 0);
6662
6663 let later = now + Duration::from_millis(100);
6665 state.handle_observed_address(addr2, 1, later);
6666 assert_eq!(state.observed_addresses.len(), 2);
6667 assert_eq!(state.observed_addresses[1].address, addr2);
6668 assert_eq!(state.observed_addresses[1].path_id, 1);
6669 }
6670
6671 #[test]
6672 fn address_discovery_state_get_observed_address() {
6673 let config = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
6674 let now = Instant::now();
6675 let mut state = AddressDiscoveryState::new(&config, now);
6676
6677 assert_eq!(state.get_observed_address(0), None);
6679
6680 let mut path_info = paths::PathAddressInfo::new();
6682 let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 80);
6683 path_info.update_observed_address(addr, now);
6684 state.path_addresses.insert(0, path_info);
6685
6686 assert_eq!(state.get_observed_address(0), Some(addr));
6688 assert_eq!(state.get_observed_address(1), None);
6689 }
6690
6691 #[test]
6692 fn address_discovery_state_unnotified_changes() {
6693 let config = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
6694 let now = Instant::now();
6695 let mut state = AddressDiscoveryState::new(&config, now);
6696
6697 assert!(!state.has_unnotified_changes());
6699
6700 let mut path_info = paths::PathAddressInfo::new();
6702 path_info.update_observed_address(
6703 SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)), 8080),
6704 now,
6705 );
6706 state.path_addresses.insert(0, path_info);
6707
6708 assert!(state.has_unnotified_changes());
6710
6711 state.record_observation_sent(0);
6713 assert!(!state.has_unnotified_changes());
6714 }
6715
6716 #[test]
6717 fn address_observation_rate_limiter_token_bucket() {
6718 let now = Instant::now();
6719 let mut limiter = AddressObservationRateLimiter::new(5, now); assert_eq!(limiter.tokens, 5.0);
6723 assert_eq!(limiter.max_tokens, 5.0);
6724 assert_eq!(limiter.rate, 5.0);
6725
6726 assert!(limiter.try_consume(3.0, now));
6728 assert_eq!(limiter.tokens, 2.0);
6729
6730 assert!(!limiter.try_consume(3.0, now));
6732 assert_eq!(limiter.tokens, 2.0);
6733
6734 let later = now + Duration::from_secs(1);
6736 limiter.update_tokens(later);
6737 assert_eq!(limiter.tokens, 5.0); let half_sec = now + Duration::from_millis(500);
6741 let mut limiter2 = AddressObservationRateLimiter::new(5, now);
6742 limiter2.try_consume(3.0, now);
6743 limiter2.update_tokens(half_sec);
6744 assert_eq!(limiter2.tokens, 4.5); }
6746
6747 #[test]
6749 fn connection_initializes_address_discovery_state_default() {
6750 let config = crate::transport_parameters::AddressDiscoveryConfig::default();
6753 let state = AddressDiscoveryState::new(&config, Instant::now());
6754 assert!(state.enabled); assert_eq!(state.max_observation_rate, 10); assert!(!state.observe_all_paths);
6757 }
6758
6759 #[test]
6760 fn connection_initializes_with_address_discovery_enabled() {
6761 let config = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
6763 let state = AddressDiscoveryState::new(&config, Instant::now());
6764 assert!(state.enabled);
6765 assert_eq!(state.max_observation_rate, 10);
6766 assert!(!state.observe_all_paths);
6767 }
6768
6769 #[test]
6770 fn connection_address_discovery_enabled_by_default() {
6771 let config = crate::transport_parameters::AddressDiscoveryConfig::default();
6773 let state = AddressDiscoveryState::new(&config, Instant::now());
6774 assert!(state.enabled); }
6776
6777 #[test]
6778 fn negotiate_max_idle_timeout_commutative() {
6779 let test_params = [
6780 (None, None, None),
6781 (None, Some(VarInt(0)), None),
6782 (None, Some(VarInt(2)), Some(Duration::from_millis(2))),
6783 (Some(VarInt(0)), Some(VarInt(0)), None),
6784 (
6785 Some(VarInt(2)),
6786 Some(VarInt(0)),
6787 Some(Duration::from_millis(2)),
6788 ),
6789 (
6790 Some(VarInt(1)),
6791 Some(VarInt(4)),
6792 Some(Duration::from_millis(1)),
6793 ),
6794 ];
6795
6796 for (left, right, result) in test_params {
6797 assert_eq!(negotiate_max_idle_timeout(left, right), result);
6798 assert_eq!(negotiate_max_idle_timeout(right, left), result);
6799 }
6800 }
6801
6802 #[test]
6803 fn path_creation_initializes_address_discovery() {
6804 let config = TransportConfig::default();
6805 let remote = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)), 8080);
6806 let now = Instant::now();
6807
6808 let path = paths::PathData::new(remote, false, None, now, &config);
6810
6811 assert!(path.address_info.observed_address.is_none());
6813 assert!(path.address_info.last_observed.is_none());
6814 assert_eq!(path.address_info.observation_count, 0);
6815 assert!(!path.address_info.notified);
6816
6817 assert_eq!(path.observation_rate_limiter.rate, 10.0);
6819 assert_eq!(path.observation_rate_limiter.max_tokens, 10.0);
6820 assert_eq!(path.observation_rate_limiter.tokens, 10.0);
6821 }
6822
6823 #[test]
6824 fn path_migration_resets_address_discovery() {
6825 let config = TransportConfig::default();
6826 let remote1 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)), 8080);
6827 let remote2 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(10, 0, 0, 1)), 443);
6828 let now = Instant::now();
6829
6830 let mut path1 = paths::PathData::new(remote1, false, None, now, &config);
6832 path1.update_observed_address(remote1, now);
6833 path1.mark_address_notified();
6834 path1.consume_observation_token(now);
6835 path1.set_observation_rate(20);
6836
6837 let path2 = paths::PathData::from_previous(remote2, &path1, now);
6839
6840 assert!(path2.address_info.observed_address.is_none());
6842 assert!(path2.address_info.last_observed.is_none());
6843 assert_eq!(path2.address_info.observation_count, 0);
6844 assert!(!path2.address_info.notified);
6845
6846 assert_eq!(path2.observation_rate_limiter.rate, 20.0);
6848 assert_eq!(path2.observation_rate_limiter.tokens, 20.0);
6849 }
6850
6851 #[test]
6852 fn connection_path_updates_observation_rate() {
6853 let config = TransportConfig::default();
6854 let remote = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 42);
6855 let now = Instant::now();
6856
6857 let mut path = paths::PathData::new(remote, false, None, now, &config);
6858
6859 assert_eq!(path.observation_rate_limiter.rate, 10.0);
6861
6862 path.set_observation_rate(25);
6864 assert_eq!(path.observation_rate_limiter.rate, 25.0);
6865 assert_eq!(path.observation_rate_limiter.max_tokens, 25.0);
6866
6867 path.observation_rate_limiter.tokens = 30.0; path.set_observation_rate(20);
6870 assert_eq!(path.observation_rate_limiter.tokens, 20.0); }
6872
6873 #[test]
6874 fn path_validation_preserves_discovery_state() {
6875 let config = TransportConfig::default();
6876 let remote = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)), 8080);
6877 let now = Instant::now();
6878
6879 let mut path = paths::PathData::new(remote, false, None, now, &config);
6880
6881 let observed = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(1, 2, 3, 4)), 5678);
6883 path.update_observed_address(observed, now);
6884 path.set_observation_rate(15);
6885
6886 path.validated = true;
6888
6889 assert_eq!(path.address_info.observed_address, Some(observed));
6891 assert_eq!(path.observation_rate_limiter.rate, 15.0);
6892 }
6893
6894 #[test]
6895 fn address_discovery_state_initialization() {
6896 let state = AddressDiscoveryState::new_with_params(true, 30.0, true);
6898
6899 assert!(state.enabled);
6900 assert_eq!(state.max_observation_rate, 30);
6901 assert!(state.observe_all_paths);
6902 assert!(state.path_addresses.is_empty());
6903 assert!(state.observed_addresses.is_empty());
6904 }
6905
6906 #[test]
6908 fn handle_observed_address_frame_basic() {
6909 let config = AddressDiscoveryConfig::SendAndReceive;
6910 let mut state = AddressDiscoveryState::new(&config, Instant::now());
6911 let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)), 8080);
6912 let now = Instant::now();
6913 let path_id = 0;
6914
6915 state.handle_observed_address(addr, path_id, now);
6917
6918 assert_eq!(state.observed_addresses.len(), 1);
6920 assert_eq!(state.observed_addresses[0].address, addr);
6921 assert_eq!(state.observed_addresses[0].path_id, path_id);
6922 assert_eq!(state.observed_addresses[0].received_at, now);
6923
6924 assert!(state.path_addresses.contains_key(&path_id));
6926 let path_info = &state.path_addresses[&path_id];
6927 assert_eq!(path_info.observed_address, Some(addr));
6928 assert_eq!(path_info.last_observed, Some(now));
6929 assert_eq!(path_info.observation_count, 1);
6930 }
6931
6932 #[test]
6933 fn handle_observed_address_frame_multiple_observations() {
6934 let config = AddressDiscoveryConfig::SendAndReceive;
6935 let mut state = AddressDiscoveryState::new(&config, Instant::now());
6936 let addr1 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)), 8080);
6937 let addr2 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(10, 0, 0, 1)), 443);
6938 let now = Instant::now();
6939 let path_id = 0;
6940
6941 state.handle_observed_address(addr1, path_id, now);
6943 state.handle_observed_address(addr1, path_id, now + Duration::from_secs(1));
6944 state.handle_observed_address(addr2, path_id, now + Duration::from_secs(2));
6945
6946 assert_eq!(state.observed_addresses.len(), 3);
6948
6949 let path_info = &state.path_addresses[&path_id];
6951 assert_eq!(path_info.observed_address, Some(addr2));
6952 assert_eq!(path_info.observation_count, 1); }
6954
6955 #[test]
6956 fn handle_observed_address_frame_disabled() {
6957 let config = AddressDiscoveryConfig::SendAndReceive;
6958 let mut state = AddressDiscoveryState::new(&config, Instant::now());
6959 state.enabled = false; let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)), 8080);
6961 let now = Instant::now();
6962
6963 state.handle_observed_address(addr, 0, now);
6965
6966 assert!(state.observed_addresses.is_empty());
6968 assert!(state.path_addresses.is_empty());
6969 }
6970
6971 #[test]
6972 fn should_send_observation_basic() {
6973 let config = AddressDiscoveryConfig::SendAndReceive;
6974 let mut state = AddressDiscoveryState::new(&config, Instant::now());
6975 state.max_observation_rate = 10;
6976 let now = Instant::now();
6977 let path_id = 0;
6978
6979 assert!(state.should_send_observation(path_id, now));
6981
6982 state.record_observation_sent(path_id);
6984
6985 assert!(state.should_send_observation(path_id, now));
6987 }
6988
6989 #[test]
6990 fn should_send_observation_rate_limiting() {
6991 let config = AddressDiscoveryConfig::SendAndReceive;
6992 let now = Instant::now();
6993 let mut state = AddressDiscoveryState::new(&config, now);
6994 state.max_observation_rate = 2; state.update_rate_limit(2.0);
6996 let path_id = 0;
6997
6998 assert!(state.should_send_observation(path_id, now));
7000 state.record_observation_sent(path_id);
7001 assert!(state.should_send_observation(path_id, now));
7002 state.record_observation_sent(path_id);
7003
7004 assert!(!state.should_send_observation(path_id, now));
7006
7007 let later = now + Duration::from_secs(1);
7009 assert!(state.should_send_observation(path_id, later));
7010 }
7011
7012 #[test]
7013 fn should_send_observation_disabled() {
7014 let config = AddressDiscoveryConfig::SendAndReceive;
7015 let mut state = AddressDiscoveryState::new(&config, Instant::now());
7016 state.enabled = false;
7017
7018 assert!(!state.should_send_observation(0, Instant::now()));
7020 }
7021
7022 #[test]
7023 fn should_send_observation_per_path() {
7024 let config = AddressDiscoveryConfig::SendAndReceive;
7025 let now = Instant::now();
7026 let mut state = AddressDiscoveryState::new(&config, now);
7027 state.max_observation_rate = 2; state.observe_all_paths = true;
7029 state.update_rate_limit(2.0);
7030
7031 assert!(state.should_send_observation(0, now));
7033 state.record_observation_sent(0);
7034
7035 assert!(state.should_send_observation(1, now));
7037 state.record_observation_sent(1);
7038
7039 assert!(!state.should_send_observation(0, now));
7041 assert!(!state.should_send_observation(1, now));
7042
7043 let later = now + Duration::from_secs(1);
7045 assert!(state.should_send_observation(0, later));
7046 }
7047
7048 #[test]
7049 fn has_unnotified_changes_test() {
7050 let config = AddressDiscoveryConfig::SendAndReceive;
7051 let mut state = AddressDiscoveryState::new(&config, Instant::now());
7052 let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)), 8080);
7053 let now = Instant::now();
7054
7055 assert!(!state.has_unnotified_changes());
7057
7058 state.handle_observed_address(addr, 0, now);
7060 assert!(state.has_unnotified_changes());
7061
7062 state.path_addresses.get_mut(&0).unwrap().notified = true;
7064 assert!(!state.has_unnotified_changes());
7065 }
7066
7067 #[test]
7068 fn get_observed_address_test() {
7069 let config = AddressDiscoveryConfig::SendAndReceive;
7070 let mut state = AddressDiscoveryState::new(&config, Instant::now());
7071 let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)), 8080);
7072 let now = Instant::now();
7073 let path_id = 0;
7074
7075 assert_eq!(state.get_observed_address(path_id), None);
7077
7078 state.handle_observed_address(addr, path_id, now);
7080 assert_eq!(state.get_observed_address(path_id), Some(addr));
7081
7082 assert_eq!(state.get_observed_address(999), None);
7084 }
7085
7086 #[test]
7088 fn rate_limiter_token_bucket_basic() {
7089 let now = Instant::now();
7090 let mut limiter = AddressObservationRateLimiter::new(10, now); assert!(limiter.try_consume(5.0, now));
7094 assert!(limiter.try_consume(5.0, now));
7095
7096 assert!(!limiter.try_consume(1.0, now));
7098 }
7099
7100 #[test]
7101 fn rate_limiter_token_replenishment() {
7102 let now = Instant::now();
7103 let mut limiter = AddressObservationRateLimiter::new(10, now); assert!(limiter.try_consume(10.0, now));
7107 assert!(!limiter.try_consume(0.1, now)); let later = now + Duration::from_secs(1);
7111 assert!(limiter.try_consume(10.0, later)); assert!(!limiter.try_consume(0.1, later)); let later = later + Duration::from_millis(500);
7116 assert!(limiter.try_consume(5.0, later)); assert!(!limiter.try_consume(0.1, later)); }
7119
7120 #[test]
7121 fn rate_limiter_max_tokens_cap() {
7122 let now = Instant::now();
7123 let mut limiter = AddressObservationRateLimiter::new(10, now);
7124
7125 let later = now + Duration::from_secs(2);
7127 assert!(limiter.try_consume(10.0, later));
7129 assert!(!limiter.try_consume(10.1, later)); let later2 = later + Duration::from_secs(1);
7133 assert!(limiter.try_consume(3.0, later2));
7134
7135 let much_later = later2 + Duration::from_secs(2);
7137 assert!(limiter.try_consume(10.0, much_later)); assert!(!limiter.try_consume(0.1, much_later)); }
7140
7141 #[test]
7142 fn rate_limiter_fractional_consumption() {
7143 let now = Instant::now();
7144 let mut limiter = AddressObservationRateLimiter::new(10, now);
7145
7146 assert!(limiter.try_consume(0.5, now));
7148 assert!(limiter.try_consume(2.3, now));
7149 assert!(limiter.try_consume(7.2, now)); assert!(!limiter.try_consume(0.1, now)); let later = now + Duration::from_millis(100); assert!(limiter.try_consume(1.0, later));
7155 assert!(!limiter.try_consume(0.1, later));
7156 }
7157
7158 #[test]
7159 fn rate_limiter_zero_rate() {
7160 let now = Instant::now();
7161 let mut limiter = AddressObservationRateLimiter::new(0, now); assert!(!limiter.try_consume(1.0, now));
7165 assert!(!limiter.try_consume(0.1, now));
7166 assert!(!limiter.try_consume(0.001, now));
7167
7168 let later = now + Duration::from_secs(10);
7170 assert!(!limiter.try_consume(0.001, later));
7171 }
7172
7173 #[test]
7174 fn rate_limiter_high_rate() {
7175 let now = Instant::now();
7176 let mut limiter = AddressObservationRateLimiter::new(63, now); assert!(limiter.try_consume(60.0, now));
7180 assert!(limiter.try_consume(3.0, now));
7181 assert!(!limiter.try_consume(0.1, now)); let later = now + Duration::from_secs(1);
7185 assert!(limiter.try_consume(63.0, later)); assert!(!limiter.try_consume(0.1, later)); }
7188
7189 #[test]
7190 fn rate_limiter_time_precision() {
7191 let now = Instant::now();
7192 let mut limiter = AddressObservationRateLimiter::new(100, now); assert!(limiter.try_consume(100.0, now));
7196 assert!(!limiter.try_consume(0.1, now));
7197
7198 let later = now + Duration::from_millis(10);
7200 assert!(limiter.try_consume(0.8, later)); assert!(!limiter.try_consume(0.5, later)); let much_later = later + Duration::from_millis(100); assert!(limiter.try_consume(5.0, much_later)); limiter.tokens = 0.0; let final_time = much_later + Duration::from_millis(1);
7212 limiter.update_tokens(final_time); assert!(limiter.tokens >= 0.09 && limiter.tokens <= 0.11);
7217 }
7218
7219 #[test]
7220 fn per_path_rate_limiting_independent() {
7221 let config = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
7222 let now = Instant::now();
7223 let mut state = AddressDiscoveryState::new(&config, now);
7224
7225 state.observe_all_paths = true;
7227
7228 state.update_rate_limit(5.0);
7230
7231 state
7233 .path_addresses
7234 .insert(0, paths::PathAddressInfo::new());
7235 state
7236 .path_addresses
7237 .insert(1, paths::PathAddressInfo::new());
7238 state
7239 .path_addresses
7240 .insert(2, paths::PathAddressInfo::new());
7241
7242 state.path_addresses.get_mut(&0).unwrap().observed_address = Some(SocketAddr::new(
7244 IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)),
7245 8080,
7246 ));
7247 state.path_addresses.get_mut(&1).unwrap().observed_address = Some(SocketAddr::new(
7248 IpAddr::V4(Ipv4Addr::new(192, 168, 1, 2)),
7249 8081,
7250 ));
7251 state.path_addresses.get_mut(&2).unwrap().observed_address = Some(SocketAddr::new(
7252 IpAddr::V4(Ipv4Addr::new(192, 168, 1, 3)),
7253 8082,
7254 ));
7255
7256 for _ in 0..3 {
7258 assert!(state.should_send_observation(0, now));
7259 state.record_observation_sent(0);
7260 state.path_addresses.get_mut(&0).unwrap().notified = false;
7262 }
7263
7264 for _ in 0..2 {
7266 assert!(state.should_send_observation(1, now));
7267 state.record_observation_sent(1);
7268 state.path_addresses.get_mut(&1).unwrap().notified = false;
7270 }
7271
7272 assert!(!state.should_send_observation(2, now));
7274
7275 let later = now + Duration::from_secs(1);
7277
7278 assert!(state.should_send_observation(0, later));
7280 assert!(state.should_send_observation(1, later));
7281 assert!(state.should_send_observation(2, later));
7282 }
7283
7284 #[test]
7285 fn per_path_rate_limiting_with_path_specific_limits() {
7286 let now = Instant::now();
7287 let remote1 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)), 8080);
7288 let remote2 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 2)), 8081);
7289 let config = TransportConfig::default();
7290
7291 let mut path1 = paths::PathData::new(remote1, false, None, now, &config);
7293 let mut path2 = paths::PathData::new(remote2, false, None, now, &config);
7294
7295 path1.observation_rate_limiter = paths::PathObservationRateLimiter::new(10, now); path2.observation_rate_limiter = paths::PathObservationRateLimiter::new(5, now); for _ in 0..10 {
7301 assert!(path1.observation_rate_limiter.can_send(now));
7302 path1.observation_rate_limiter.consume_token(now);
7303 }
7304 assert!(!path1.observation_rate_limiter.can_send(now));
7305
7306 for _ in 0..5 {
7308 assert!(path2.observation_rate_limiter.can_send(now));
7309 path2.observation_rate_limiter.consume_token(now);
7310 }
7311 assert!(!path2.observation_rate_limiter.can_send(now));
7312 }
7313
7314 #[test]
7315 fn per_path_rate_limiting_address_change_detection() {
7316 let config = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
7317 let now = Instant::now();
7318 let mut state = AddressDiscoveryState::new(&config, now);
7319
7320 let path_id = 0;
7322 let addr1 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)), 8080);
7323 let addr2 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 2)), 8080);
7324
7325 assert!(state.should_send_observation(path_id, now));
7327 state.handle_observed_address(addr1, path_id, now);
7328 state.record_observation_sent(path_id);
7329
7330 assert!(!state.should_send_observation(path_id, now));
7332
7333 state.handle_observed_address(addr2, path_id, now);
7335 if let Some(info) = state.path_addresses.get_mut(&path_id) {
7336 info.notified = false; }
7338
7339 assert!(state.should_send_observation(path_id, now));
7341 }
7342
7343 #[test]
7344 fn per_path_rate_limiting_migration() {
7345 let now = Instant::now();
7346 let remote1 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)), 8080);
7347 let remote2 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 2)), 8081);
7348 let config = TransportConfig::default();
7349
7350 let mut path = paths::PathData::new(remote1, false, None, now, &config);
7352 path.observation_rate_limiter = paths::PathObservationRateLimiter::new(10, now);
7353
7354 for _ in 0..5 {
7356 assert!(path.observation_rate_limiter.can_send(now));
7357 path.observation_rate_limiter.consume_token(now);
7358 }
7359
7360 let mut new_path = paths::PathData::new(remote2, false, None, now, &config);
7362
7363 new_path.observation_rate_limiter = paths::PathObservationRateLimiter::new(10, now);
7366
7367 for _ in 0..10 {
7369 assert!(new_path.observation_rate_limiter.can_send(now));
7370 new_path.observation_rate_limiter.consume_token(now);
7371 }
7372 assert!(!new_path.observation_rate_limiter.can_send(now));
7373 }
7374
7375 #[test]
7376 fn per_path_rate_limiting_disabled_paths() {
7377 let config = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
7378 let now = Instant::now();
7379 let mut state = AddressDiscoveryState::new(&config, now);
7380
7381 assert!(state.should_send_observation(0, now));
7383
7384 assert!(!state.should_send_observation(1, now));
7386 assert!(!state.should_send_observation(2, now));
7387
7388 let later = now + Duration::from_secs(1);
7390 assert!(!state.should_send_observation(1, later));
7391 }
7392
7393 #[test]
7394 fn respecting_negotiated_max_observation_rate_basic() {
7395 let config = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
7396 let now = Instant::now();
7397 let mut state = AddressDiscoveryState::new(&config, now);
7398
7399 state.max_observation_rate = 10; state.rate_limiter = AddressObservationRateLimiter::new(10, now);
7402
7403 for _ in 0..10 {
7405 assert!(state.should_send_observation(0, now));
7406 }
7407 assert!(!state.should_send_observation(0, now));
7409 }
7410
7411 #[test]
7412 fn respecting_negotiated_max_observation_rate_zero() {
7413 let config = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
7414 let now = Instant::now();
7415 let mut state = AddressDiscoveryState::new(&config, now);
7416
7417 state.max_observation_rate = 0;
7419 state.rate_limiter = AddressObservationRateLimiter::new(0, now);
7420
7421 assert!(!state.should_send_observation(0, now));
7423 assert!(!state.should_send_observation(1, now));
7424
7425 let later = now + Duration::from_secs(10);
7427 assert!(!state.should_send_observation(0, later));
7428 }
7429
7430 #[test]
7431 fn respecting_negotiated_max_observation_rate_higher() {
7432 let config = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
7433 let now = Instant::now();
7434 let mut state = AddressDiscoveryState::new(&config, now);
7435
7436 state
7438 .path_addresses
7439 .insert(0, paths::PathAddressInfo::new());
7440 state.path_addresses.get_mut(&0).unwrap().observed_address = Some(SocketAddr::new(
7441 IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)),
7442 8080,
7443 ));
7444
7445 state.update_rate_limit(5.0);
7447
7448 state.max_observation_rate = 20; for _ in 0..5 {
7453 assert!(state.should_send_observation(0, now));
7454 state.record_observation_sent(0);
7455 state.path_addresses.get_mut(&0).unwrap().notified = false;
7457 }
7458 assert!(!state.should_send_observation(0, now));
7460 }
7461
7462 #[test]
7463 fn respecting_negotiated_max_observation_rate_dynamic_update() {
7464 let config = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
7465 let now = Instant::now();
7466 let mut state = AddressDiscoveryState::new(&config, now);
7467
7468 state
7470 .path_addresses
7471 .insert(0, paths::PathAddressInfo::new());
7472 state.path_addresses.get_mut(&0).unwrap().observed_address = Some(SocketAddr::new(
7473 IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)),
7474 8080,
7475 ));
7476
7477 for _ in 0..5 {
7479 assert!(state.should_send_observation(0, now));
7480 state.record_observation_sent(0);
7481 state.path_addresses.get_mut(&0).unwrap().notified = false;
7483 }
7484
7485 state.max_observation_rate = 3;
7489 state.rate_limiter.set_rate(3);
7490
7491 for _ in 0..3 {
7494 assert!(state.should_send_observation(0, now));
7495 state.record_observation_sent(0);
7496 state.path_addresses.get_mut(&0).unwrap().notified = false;
7498 }
7499
7500 assert!(!state.should_send_observation(0, now));
7502
7503 let later = now + Duration::from_secs(1);
7505 for _ in 0..3 {
7506 assert!(state.should_send_observation(0, later));
7507 state.record_observation_sent(0);
7508 state.path_addresses.get_mut(&0).unwrap().notified = false;
7510 }
7511
7512 assert!(!state.should_send_observation(0, later));
7514 }
7515
7516 #[test]
7517 fn respecting_negotiated_max_observation_rate_with_paths() {
7518 let config = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
7519 let now = Instant::now();
7520 let mut state = AddressDiscoveryState::new(&config, now);
7521
7522 state.observe_all_paths = true;
7524
7525 for i in 0..3 {
7527 state
7528 .path_addresses
7529 .insert(i, paths::PathAddressInfo::new());
7530 state.path_addresses.get_mut(&i).unwrap().observed_address = Some(SocketAddr::new(
7531 IpAddr::V4(Ipv4Addr::new(192, 168, 1, 100 + i as u8)),
7532 5000,
7533 ));
7534 }
7535
7536 for _ in 0..3 {
7539 for i in 0..3 {
7541 if state.should_send_observation(i, now) {
7542 state.record_observation_sent(i);
7543 state.path_addresses.get_mut(&i).unwrap().notified = false;
7545 }
7546 }
7547 }
7548
7549 assert!(state.should_send_observation(0, now));
7552 state.record_observation_sent(0);
7553
7554 assert!(!state.should_send_observation(0, now));
7556 assert!(!state.should_send_observation(1, now));
7557 assert!(!state.should_send_observation(2, now));
7558 }
7559
7560 #[test]
7561 fn queue_observed_address_frame_basic() {
7562 let config = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
7563 let now = Instant::now();
7564 let mut state = AddressDiscoveryState::new(&config, now);
7565
7566 let address = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 100)), 5000);
7568 let frame = state.queue_observed_address_frame(0, address);
7569
7570 assert!(frame.is_some());
7572 let frame = frame.unwrap();
7573 assert_eq!(frame.address, address);
7574
7575 assert!(state.path_addresses.contains_key(&0));
7577 assert!(state.path_addresses.get(&0).unwrap().notified);
7578 }
7579
7580 #[test]
7581 fn queue_observed_address_frame_rate_limited() {
7582 let config = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
7583 let now = Instant::now();
7584 let mut state = AddressDiscoveryState::new(&config, now);
7585
7586 state.observe_all_paths = true;
7588
7589 let mut addresses = Vec::new();
7591 for i in 0..10 {
7592 let addr = SocketAddr::new(
7593 IpAddr::V4(Ipv4Addr::new(192, 168, 1, i as u8)),
7594 5000 + i as u16,
7595 );
7596 addresses.push(addr);
7597 assert!(
7598 state.queue_observed_address_frame(i as u64, addr).is_some(),
7599 "Frame {} should be allowed",
7600 i + 1
7601 );
7602 }
7603
7604 let addr11 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 11)), 5011);
7606 assert!(
7607 state.queue_observed_address_frame(10, addr11).is_none(),
7608 "11th frame should be rate limited"
7609 );
7610 }
7611
7612 #[test]
7613 fn queue_observed_address_frame_disabled() {
7614 let config = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
7615 let now = Instant::now();
7616 let mut state = AddressDiscoveryState::new(&config, now);
7617
7618 state.enabled = false;
7620
7621 let address = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 100)), 5000);
7622
7623 assert!(state.queue_observed_address_frame(0, address).is_none());
7625 }
7626
7627 #[test]
7628 fn queue_observed_address_frame_already_notified() {
7629 let config = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
7630 let now = Instant::now();
7631 let mut state = AddressDiscoveryState::new(&config, now);
7632
7633 let address = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 100)), 5000);
7634
7635 assert!(state.queue_observed_address_frame(0, address).is_some());
7637
7638 assert!(state.queue_observed_address_frame(0, address).is_none());
7640
7641 let new_address = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 101)), 5001);
7643 assert!(state.queue_observed_address_frame(0, new_address).is_none());
7644 }
7645
7646 #[test]
7647 fn queue_observed_address_frame_primary_path_only() {
7648 let config = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
7649 let now = Instant::now();
7650 let mut state = AddressDiscoveryState::new(&config, now);
7651
7652 let address = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 100)), 5000);
7653
7654 assert!(state.queue_observed_address_frame(0, address).is_some());
7656
7657 assert!(state.queue_observed_address_frame(1, address).is_none());
7659 assert!(state.queue_observed_address_frame(2, address).is_none());
7660 }
7661
7662 #[test]
7663 fn queue_observed_address_frame_updates_path_info() {
7664 let config = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
7665 let now = Instant::now();
7666 let mut state = AddressDiscoveryState::new(&config, now);
7667
7668 let address = SocketAddr::new(
7669 IpAddr::V6(Ipv6Addr::new(0x2001, 0xdb8, 0, 0, 0, 0, 0, 1)),
7670 5000,
7671 );
7672
7673 let frame = state.queue_observed_address_frame(0, address);
7675 assert!(frame.is_some());
7676
7677 let path_info = state.path_addresses.get(&0).unwrap();
7679 assert_eq!(path_info.observed_address, Some(address));
7680 assert!(path_info.notified);
7681
7682 assert_eq!(state.observed_addresses.len(), 0);
7685 }
7686
7687 #[test]
7688 fn retransmits_includes_observed_addresses() {
7689 use crate::connection::spaces::Retransmits;
7690
7691 let mut retransmits = Retransmits::default();
7693
7694 assert!(retransmits.observed_addresses.is_empty());
7696
7697 let address = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 100)), 5000);
7699 let frame = frame::ObservedAddress {
7700 sequence_number: VarInt::from_u32(1),
7701 address,
7702 };
7703 retransmits.observed_addresses.push(frame);
7704
7705 assert_eq!(retransmits.observed_addresses.len(), 1);
7707 assert_eq!(retransmits.observed_addresses[0].address, address);
7708 }
7709
7710 #[test]
7711 fn check_for_address_observations_no_peer_support() {
7712 let config = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
7713 let now = Instant::now();
7714 let mut state = AddressDiscoveryState::new(&config, now);
7715
7716 state
7718 .path_addresses
7719 .insert(0, paths::PathAddressInfo::new());
7720 state.path_addresses.get_mut(&0).unwrap().observed_address = Some(SocketAddr::new(
7721 IpAddr::V4(Ipv4Addr::new(192, 168, 1, 100)),
7722 5000,
7723 ));
7724
7725 let frames = state.check_for_address_observations(0, false, now);
7727
7728 assert!(frames.is_empty());
7730 }
7731
7732 #[test]
7733 fn check_for_address_observations_with_peer_support() {
7734 let config = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
7735 let now = Instant::now();
7736 let mut state = AddressDiscoveryState::new(&config, now);
7737
7738 let address = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 100)), 5000);
7740 state
7741 .path_addresses
7742 .insert(0, paths::PathAddressInfo::new());
7743 state.path_addresses.get_mut(&0).unwrap().observed_address = Some(address);
7744
7745 let frames = state.check_for_address_observations(0, true, now);
7747
7748 assert_eq!(frames.len(), 1);
7750 assert_eq!(frames[0].address, address);
7751
7752 assert!(state.path_addresses.get(&0).unwrap().notified);
7754 }
7755
7756 #[test]
7757 fn check_for_address_observations_rate_limited() {
7758 let config = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
7759 let now = Instant::now();
7760 let mut state = AddressDiscoveryState::new(&config, now);
7761
7762 let address = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 100)), 5000);
7764 state
7765 .path_addresses
7766 .insert(0, paths::PathAddressInfo::new());
7767 state.path_addresses.get_mut(&0).unwrap().observed_address = Some(address);
7768
7769 for _ in 0..10 {
7771 let frames = state.check_for_address_observations(0, true, now);
7772 if frames.is_empty() {
7773 break;
7774 }
7775 state.path_addresses.get_mut(&0).unwrap().notified = false;
7777 }
7778
7779 assert_eq!(state.rate_limiter.tokens, 0.0);
7781
7782 state.path_addresses.get_mut(&0).unwrap().notified = false;
7784
7785 let frames2 = state.check_for_address_observations(0, true, now);
7787 assert_eq!(frames2.len(), 0);
7788
7789 state.path_addresses.get_mut(&0).unwrap().notified = false;
7791
7792 let later = now + Duration::from_millis(200); let frames3 = state.check_for_address_observations(0, true, later);
7795 assert_eq!(frames3.len(), 1);
7796 }
7797
7798 #[test]
7799 fn check_for_address_observations_multiple_paths() {
7800 let config = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
7801 let now = Instant::now();
7802 let mut state = AddressDiscoveryState::new(&config, now);
7803
7804 state.observe_all_paths = true;
7806
7807 let addr1 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 100)), 5000);
7809 let addr2 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 101)), 5001);
7810
7811 state
7812 .path_addresses
7813 .insert(0, paths::PathAddressInfo::new());
7814 state.path_addresses.get_mut(&0).unwrap().observed_address = Some(addr1);
7815
7816 state
7817 .path_addresses
7818 .insert(1, paths::PathAddressInfo::new());
7819 state.path_addresses.get_mut(&1).unwrap().observed_address = Some(addr2);
7820
7821 let frames = state.check_for_address_observations(0, true, now);
7823
7824 assert_eq!(frames.len(), 2);
7826
7827 let addresses: Vec<_> = frames.iter().map(|f| f.address).collect();
7829 assert!(addresses.contains(&addr1));
7830 assert!(addresses.contains(&addr2));
7831
7832 assert!(state.path_addresses.get(&0).unwrap().notified);
7834 assert!(state.path_addresses.get(&1).unwrap().notified);
7835 }
7836
7837 #[test]
7839 fn test_rate_limiter_configuration() {
7840 let state = AddressDiscoveryState::new_with_params(true, 10.0, false);
7842 assert_eq!(state.rate_limiter.rate, 10.0);
7843 assert_eq!(state.rate_limiter.max_tokens, 10.0);
7844 assert_eq!(state.rate_limiter.tokens, 10.0);
7845
7846 let state = AddressDiscoveryState::new_with_params(true, 63.0, false);
7847 assert_eq!(state.rate_limiter.rate, 63.0);
7848 assert_eq!(state.rate_limiter.max_tokens, 63.0);
7849 }
7850
7851 #[test]
7852 fn test_rate_limiter_update_configuration() {
7853 let mut state = AddressDiscoveryState::new_with_params(true, 5.0, false);
7854
7855 assert_eq!(state.rate_limiter.rate, 5.0);
7857
7858 state.update_rate_limit(10.0);
7860 assert_eq!(state.rate_limiter.rate, 10.0);
7861 assert_eq!(state.rate_limiter.max_tokens, 10.0);
7862
7863 state.rate_limiter.tokens = 15.0;
7865 state.update_rate_limit(8.0);
7866 assert_eq!(state.rate_limiter.tokens, 8.0);
7867 }
7868
7869 #[test]
7870 fn test_rate_limiter_from_transport_params() {
7871 let mut params = TransportParameters::default();
7872 params.address_discovery = Some(AddressDiscoveryConfig::SendAndReceive);
7873
7874 let state = AddressDiscoveryState::from_transport_params(¶ms);
7875 assert!(state.is_some());
7876 let state = state.unwrap();
7877 assert_eq!(state.rate_limiter.rate, 10.0); assert!(!state.observe_all_paths); }
7880
7881 #[test]
7882 fn test_rate_limiter_zero_rate() {
7883 let state = AddressDiscoveryState::new_with_params(true, 0.0, false);
7884 assert_eq!(state.rate_limiter.rate, 0.0);
7885 assert_eq!(state.rate_limiter.tokens, 0.0);
7886
7887 let address = "192.168.1.1:443".parse().unwrap();
7889 let mut state = AddressDiscoveryState::new_with_params(true, 0.0, false);
7890 let frame = state.queue_observed_address_frame(0, address);
7891 assert!(frame.is_none());
7892 }
7893
7894 #[test]
7895 fn test_rate_limiter_configuration_edge_cases() {
7896 let state = AddressDiscoveryState::new_with_params(true, 63.0, false);
7898 assert_eq!(state.rate_limiter.rate, 63.0);
7899
7900 let state = AddressDiscoveryState::new_with_params(true, 100.0, false);
7902 assert_eq!(state.rate_limiter.rate, 100.0);
7904
7905 let state = AddressDiscoveryState::new_with_params(true, 2.5, false);
7907 assert_eq!(state.rate_limiter.rate, 2.0);
7909 }
7910
7911 #[test]
7912 fn test_rate_limiter_runtime_update() {
7913 let mut state = AddressDiscoveryState::new_with_params(true, 10.0, false);
7914 let now = Instant::now();
7915
7916 state.rate_limiter.tokens = 5.0;
7918
7919 state.update_rate_limit(3.0);
7921
7922 assert_eq!(state.rate_limiter.tokens, 3.0);
7924 assert_eq!(state.rate_limiter.rate, 3.0);
7925 assert_eq!(state.rate_limiter.max_tokens, 3.0);
7926
7927 let later = now + Duration::from_secs(1);
7929 state.rate_limiter.update_tokens(later);
7930
7931 assert_eq!(state.rate_limiter.tokens, 3.0);
7933 }
7934
7935 #[test]
7937 fn test_address_discovery_state_initialization_default() {
7938 let now = Instant::now();
7940 let default_config = crate::transport_parameters::AddressDiscoveryConfig::default();
7941
7942 let address_discovery_state = Some(AddressDiscoveryState::new(&default_config, now));
7945
7946 assert!(address_discovery_state.is_some());
7947 let state = address_discovery_state.unwrap();
7948
7949 assert!(state.enabled); assert_eq!(state.max_observation_rate, 10); assert!(!state.observe_all_paths);
7953 }
7954
7955 #[test]
7956 fn test_address_discovery_state_initialization_on_handshake() {
7957 let now = Instant::now();
7959
7960 let mut address_discovery_state = Some(AddressDiscoveryState::new(
7962 &crate::transport_parameters::AddressDiscoveryConfig::default(),
7963 now,
7964 ));
7965
7966 let peer_params = TransportParameters {
7968 address_discovery: Some(AddressDiscoveryConfig::SendAndReceive),
7969 ..TransportParameters::default()
7970 };
7971
7972 if let Some(peer_config) = &peer_params.address_discovery {
7974 address_discovery_state = Some(AddressDiscoveryState::new(peer_config, now));
7976 }
7977
7978 assert!(address_discovery_state.is_some());
7980 let state = address_discovery_state.unwrap();
7981 assert!(state.enabled);
7982 assert_eq!(state.max_observation_rate, 10); assert!(!state.observe_all_paths); }
7986
7987 #[test]
7988 fn test_address_discovery_negotiation_disabled_peer() {
7989 let now = Instant::now();
7991
7992 let our_config = AddressDiscoveryConfig::SendAndReceive;
7994 let mut address_discovery_state = Some(AddressDiscoveryState::new(&our_config, now));
7995
7996 let peer_params = TransportParameters {
7998 address_discovery: None,
7999 ..TransportParameters::default()
8000 };
8001
8002 if peer_params.address_discovery.is_none() {
8004 if let Some(state) = &mut address_discovery_state {
8005 state.enabled = false;
8006 }
8007 }
8008
8009 let state = address_discovery_state.unwrap();
8011 assert!(!state.enabled); }
8013
8014 #[test]
8015 fn test_address_discovery_negotiation_rate_limiting() {
8016 let now = Instant::now();
8018
8019 let our_config = AddressDiscoveryConfig::SendAndReceive;
8021 let mut address_discovery_state = Some(AddressDiscoveryState::new(&our_config, now));
8022
8023 if let Some(state) = &mut address_discovery_state {
8025 state.max_observation_rate = 30;
8026 state.update_rate_limit(30.0);
8027 }
8028
8029 let peer_params = TransportParameters {
8031 address_discovery: Some(AddressDiscoveryConfig::SendAndReceive),
8032 ..TransportParameters::default()
8033 };
8034
8035 if let (Some(state), Some(_peer_config)) =
8038 (&mut address_discovery_state, &peer_params.address_discovery)
8039 {
8040 let peer_rate = 15u8;
8043 let negotiated_rate = state.max_observation_rate.min(peer_rate);
8044 state.update_rate_limit(negotiated_rate as f64);
8045 }
8046
8047 let state = address_discovery_state.unwrap();
8049 assert_eq!(state.rate_limiter.rate, 15.0); }
8051
8052 #[test]
8053 fn test_address_discovery_path_initialization() {
8054 let now = Instant::now();
8056 let config = AddressDiscoveryConfig::SendAndReceive;
8057 let mut state = AddressDiscoveryState::new(&config, now);
8058
8059 assert!(state.path_addresses.is_empty());
8061
8062 let should_send = state.should_send_observation(0, now);
8064 assert!(should_send); }
8069
8070 #[test]
8071 fn test_address_discovery_multiple_path_initialization() {
8072 let now = Instant::now();
8074 let config = AddressDiscoveryConfig::SendAndReceive;
8075 let mut state = AddressDiscoveryState::new(&config, now);
8076
8077 assert!(state.should_send_observation(0, now)); assert!(!state.should_send_observation(1, now)); assert!(!state.should_send_observation(2, now)); state.observe_all_paths = true;
8084 assert!(state.should_send_observation(1, now)); assert!(state.should_send_observation(2, now)); let config_primary_only = AddressDiscoveryConfig::SendAndReceive;
8089 let mut state_primary = AddressDiscoveryState::new(&config_primary_only, now);
8090
8091 assert!(state_primary.should_send_observation(0, now)); assert!(!state_primary.should_send_observation(1, now)); }
8094
8095 #[test]
8096 fn test_handle_observed_address_frame_valid() {
8097 let now = Instant::now();
8099 let config = AddressDiscoveryConfig::SendAndReceive;
8100 let mut state = AddressDiscoveryState::new(&config, now);
8101
8102 let observed_addr = SocketAddr::from(([192, 168, 1, 100], 5000));
8104 state.handle_observed_address(observed_addr, 0, now);
8105
8106 assert_eq!(state.observed_addresses.len(), 1);
8108 assert_eq!(state.observed_addresses[0].address, observed_addr);
8109 assert_eq!(state.observed_addresses[0].path_id, 0);
8110 assert_eq!(state.observed_addresses[0].received_at, now);
8111
8112 let path_info = state.path_addresses.get(&0).unwrap();
8114 assert_eq!(path_info.observed_address, Some(observed_addr));
8115 assert_eq!(path_info.last_observed, Some(now));
8116 assert_eq!(path_info.observation_count, 1);
8117 }
8118
8119 #[test]
8120 fn test_handle_multiple_observed_addresses() {
8121 let now = Instant::now();
8123 let config = AddressDiscoveryConfig::SendAndReceive;
8124 let mut state = AddressDiscoveryState::new(&config, now);
8125
8126 let addr1 = SocketAddr::from(([192, 168, 1, 100], 5000));
8128 let addr2 = SocketAddr::from(([10, 0, 0, 50], 6000));
8129 let addr3 = SocketAddr::from(([192, 168, 1, 100], 7000)); state.handle_observed_address(addr1, 0, now);
8132 state.handle_observed_address(addr2, 1, now);
8133 state.handle_observed_address(addr3, 0, now + Duration::from_millis(100));
8134
8135 assert_eq!(state.observed_addresses.len(), 3);
8137
8138 let path0_info = state.path_addresses.get(&0).unwrap();
8140 assert_eq!(path0_info.observed_address, Some(addr3));
8141 assert_eq!(path0_info.observation_count, 1); let path1_info = state.path_addresses.get(&1).unwrap();
8145 assert_eq!(path1_info.observed_address, Some(addr2));
8146 assert_eq!(path1_info.observation_count, 1);
8147 }
8148
8149 #[test]
8150 fn test_get_observed_address() {
8151 let now = Instant::now();
8153 let config = AddressDiscoveryConfig::SendAndReceive;
8154 let mut state = AddressDiscoveryState::new(&config, now);
8155
8156 assert_eq!(state.get_observed_address(0), None);
8158
8159 let addr = SocketAddr::from(([192, 168, 1, 100], 5000));
8161 state.handle_observed_address(addr, 0, now);
8162
8163 assert_eq!(state.get_observed_address(0), Some(addr));
8165
8166 assert_eq!(state.get_observed_address(999), None);
8168 }
8169
8170 #[test]
8171 fn test_has_unnotified_changes() {
8172 let now = Instant::now();
8174 let config = AddressDiscoveryConfig::SendAndReceive;
8175 let mut state = AddressDiscoveryState::new(&config, now);
8176
8177 assert!(!state.has_unnotified_changes());
8179
8180 let addr = SocketAddr::from(([192, 168, 1, 100], 5000));
8182 state.handle_observed_address(addr, 0, now);
8183 assert!(state.has_unnotified_changes());
8184
8185 if let Some(path_info) = state.path_addresses.get_mut(&0) {
8187 path_info.notified = true;
8188 }
8189 assert!(!state.has_unnotified_changes());
8190
8191 let addr2 = SocketAddr::from(([192, 168, 1, 100], 6000));
8193 state.handle_observed_address(addr2, 0, now + Duration::from_secs(1));
8194 assert!(state.has_unnotified_changes());
8195 }
8196
8197 #[test]
8198 fn test_address_discovery_disabled() {
8199 let now = Instant::now();
8201 let config = AddressDiscoveryConfig::SendAndReceive;
8202 let mut state = AddressDiscoveryState::new(&config, now);
8203
8204 state.enabled = false;
8206
8207 let addr = SocketAddr::from(([192, 168, 1, 100], 5000));
8209 state.handle_observed_address(addr, 0, now);
8210
8211 assert_eq!(state.observed_addresses.len(), 0);
8213
8214 assert!(!state.should_send_observation(0, now));
8216 }
8217
8218 #[test]
8219 fn test_rate_limiting_basic() {
8220 let now = Instant::now();
8222 let config = AddressDiscoveryConfig::SendAndReceive;
8223 let mut state = AddressDiscoveryState::new(&config, now);
8224
8225 state.observe_all_paths = true;
8227 state.rate_limiter.set_rate(2); assert!(state.should_send_observation(0, now));
8231 state.record_observation_sent(0);
8233
8234 assert!(state.should_send_observation(1, now));
8236 state.record_observation_sent(1);
8237
8238 assert!(!state.should_send_observation(2, now));
8240
8241 let later = now + Duration::from_millis(500);
8243 assert!(state.should_send_observation(3, later));
8244 state.record_observation_sent(3);
8245
8246 assert!(!state.should_send_observation(4, later));
8248
8249 let _one_sec_later = now + Duration::from_secs(1);
8253 let two_sec_later = now + Duration::from_secs(2);
8257 assert!(state.should_send_observation(5, two_sec_later));
8258 state.record_observation_sent(5);
8259
8260 assert!(state.should_send_observation(6, two_sec_later));
8271 state.record_observation_sent(6);
8272
8273 assert!(
8275 !state.should_send_observation(7, two_sec_later),
8276 "Expected no tokens available"
8277 );
8278 }
8279
8280 #[test]
8281 fn test_rate_limiting_per_path() {
8282 let now = Instant::now();
8284 let config = AddressDiscoveryConfig::SendAndReceive;
8285 let mut state = AddressDiscoveryState::new(&config, now);
8286
8287 state
8289 .path_addresses
8290 .insert(0, paths::PathAddressInfo::new());
8291 state.path_addresses.get_mut(&0).unwrap().observed_address = Some(SocketAddr::new(
8292 IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)),
8293 8080,
8294 ));
8295
8296 for _ in 0..10 {
8298 assert!(state.should_send_observation(0, now));
8299 state.record_observation_sent(0);
8300 state.path_addresses.get_mut(&0).unwrap().notified = false;
8302 }
8303
8304 assert!(!state.should_send_observation(0, now));
8306
8307 let later = now + Duration::from_millis(100);
8309 assert!(state.should_send_observation(0, later));
8310 state.record_observation_sent(0);
8311
8312 state.path_addresses.get_mut(&0).unwrap().notified = false;
8314
8315 assert!(!state.should_send_observation(0, later));
8317 }
8318
8319 #[test]
8320 fn test_rate_limiting_zero_rate() {
8321 let now = Instant::now();
8323 let config = AddressDiscoveryConfig::SendAndReceive;
8324 let mut state = AddressDiscoveryState::new(&config, now);
8325
8326 state.rate_limiter.set_rate(0);
8328 state.rate_limiter.tokens = 0.0;
8329 state.rate_limiter.max_tokens = 0.0;
8330
8331 assert!(!state.should_send_observation(0, now));
8333 assert!(!state.should_send_observation(0, now + Duration::from_secs(10)));
8334 assert!(!state.should_send_observation(0, now + Duration::from_secs(100)));
8335 }
8336
8337 #[test]
8338 fn test_rate_limiting_update() {
8339 let now = Instant::now();
8341 let config = AddressDiscoveryConfig::SendAndReceive;
8342 let mut state = AddressDiscoveryState::new(&config, now);
8343
8344 state.observe_all_paths = true;
8346
8347 for i in 0..12 {
8349 state
8350 .path_addresses
8351 .insert(i, paths::PathAddressInfo::new());
8352 state.path_addresses.get_mut(&i).unwrap().observed_address = Some(SocketAddr::new(
8353 IpAddr::V4(Ipv4Addr::new(192, 168, 1, (i + 1) as u8)),
8354 8080,
8355 ));
8356 }
8357
8358 for i in 0..10 {
8361 assert!(state.should_send_observation(i, now));
8362 state.record_observation_sent(i);
8363 }
8364 assert!(!state.should_send_observation(10, now));
8366
8367 state.update_rate_limit(20.0);
8369
8370 let later = now + Duration::from_millis(50);
8373 assert!(state.should_send_observation(10, later));
8374 state.record_observation_sent(10);
8375
8376 let later2 = now + Duration::from_millis(100);
8378 assert!(state.should_send_observation(11, later2));
8379 }
8380
8381 #[test]
8382 fn test_rate_limiting_burst() {
8383 let now = Instant::now();
8385 let config = AddressDiscoveryConfig::SendAndReceive;
8386 let mut state = AddressDiscoveryState::new(&config, now);
8387
8388 for _ in 0..10 {
8390 assert!(state.should_send_observation(0, now));
8391 state.record_observation_sent(0);
8392 }
8393
8394 assert!(!state.should_send_observation(0, now));
8396
8397 let later = now + Duration::from_millis(100);
8399 assert!(state.should_send_observation(0, later));
8400 state.record_observation_sent(0);
8401 assert!(!state.should_send_observation(0, later));
8402 }
8403
8404 #[test]
8405 fn test_connection_rate_limiting_with_check_observations() {
8406 let now = Instant::now();
8408 let config = AddressDiscoveryConfig::SendAndReceive;
8409 let mut state = AddressDiscoveryState::new(&config, now);
8410
8411 let mut path_info = paths::PathAddressInfo::new();
8413 path_info.update_observed_address(
8414 SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)), 8080),
8415 now,
8416 );
8417 state.path_addresses.insert(0, path_info);
8418
8419 let frame1 =
8421 state.queue_observed_address_frame(0, SocketAddr::from(([192, 168, 1, 1], 8080)));
8422 assert!(frame1.is_some());
8423 state.record_observation_sent(0);
8424
8425 if let Some(info) = state.path_addresses.get_mut(&0) {
8427 info.notified = false;
8428 }
8429
8430 for _ in 1..10 {
8432 if let Some(info) = state.path_addresses.get_mut(&0) {
8434 info.notified = false;
8435 }
8436 let frame =
8437 state.queue_observed_address_frame(0, SocketAddr::from(([192, 168, 1, 1], 8080)));
8438 assert!(frame.is_some());
8439 state.record_observation_sent(0);
8440 }
8441
8442 if let Some(info) = state.path_addresses.get_mut(&0) {
8444 info.notified = false;
8445 }
8446 let frame3 =
8447 state.queue_observed_address_frame(0, SocketAddr::from(([192, 168, 1, 1], 8080)));
8448 assert!(frame3.is_none()); let later = now + Duration::from_millis(100);
8452 state.rate_limiter.update_tokens(later); if let Some(info) = state.path_addresses.get_mut(&0) {
8456 info.notified = false;
8457 }
8458
8459 let frame4 =
8460 state.queue_observed_address_frame(0, SocketAddr::from(([192, 168, 1, 1], 8080)));
8461 assert!(frame4.is_some()); }
8463
8464 #[test]
8465 fn test_queue_observed_address_frame() {
8466 let now = Instant::now();
8468 let config = AddressDiscoveryConfig::SendAndReceive;
8469 let mut state = AddressDiscoveryState::new(&config, now);
8470
8471 let addr = SocketAddr::from(([192, 168, 1, 100], 5000));
8472
8473 let frame = state.queue_observed_address_frame(0, addr);
8475 assert!(frame.is_some());
8476 assert_eq!(frame.unwrap().address, addr);
8477
8478 state.record_observation_sent(0);
8480
8481 for i in 0..9 {
8483 if let Some(info) = state.path_addresses.get_mut(&0) {
8485 info.notified = false;
8486 }
8487
8488 let frame = state.queue_observed_address_frame(0, addr);
8489 assert!(frame.is_some(), "Frame {} should be allowed", i + 2);
8490 state.record_observation_sent(0);
8491 }
8492
8493 if let Some(info) = state.path_addresses.get_mut(&0) {
8495 info.notified = false;
8496 }
8497
8498 let frame = state.queue_observed_address_frame(0, addr);
8500 assert!(frame.is_none(), "11th frame should be rate limited");
8501 }
8502
8503 #[test]
8504 fn test_multi_path_basic() {
8505 let now = Instant::now();
8507 let config = AddressDiscoveryConfig::SendAndReceive;
8508 let mut state = AddressDiscoveryState::new(&config, now);
8509
8510 let addr1 = SocketAddr::from(([192, 168, 1, 1], 5000));
8511 let addr2 = SocketAddr::from(([10, 0, 0, 1], 6000));
8512 let addr3 = SocketAddr::from(([172, 16, 0, 1], 7000));
8513
8514 state.handle_observed_address(addr1, 0, now);
8516 state.handle_observed_address(addr2, 1, now);
8517 state.handle_observed_address(addr3, 2, now);
8518
8519 assert_eq!(state.get_observed_address(0), Some(addr1));
8521 assert_eq!(state.get_observed_address(1), Some(addr2));
8522 assert_eq!(state.get_observed_address(2), Some(addr3));
8523
8524 assert!(state.has_unnotified_changes());
8526
8527 assert_eq!(state.observed_addresses.len(), 3);
8529 }
8530
8531 #[test]
8532 fn test_multi_path_observe_primary_only() {
8533 let now = Instant::now();
8535 let config = AddressDiscoveryConfig::SendAndReceive;
8536 let mut state = AddressDiscoveryState::new(&config, now);
8537
8538 assert!(state.should_send_observation(0, now));
8540 state.record_observation_sent(0);
8541
8542 assert!(!state.should_send_observation(1, now));
8544 assert!(!state.should_send_observation(2, now));
8545
8546 let addr = SocketAddr::from(([192, 168, 1, 1], 5000));
8548 assert!(state.queue_observed_address_frame(0, addr).is_some());
8549 assert!(state.queue_observed_address_frame(1, addr).is_none());
8550 assert!(state.queue_observed_address_frame(2, addr).is_none());
8551 }
8552
8553 #[test]
8554 fn test_multi_path_rate_limiting() {
8555 let now = Instant::now();
8557 let config = AddressDiscoveryConfig::SendAndReceive;
8558 let mut state = AddressDiscoveryState::new(&config, now);
8559
8560 state.observe_all_paths = true;
8562
8563 for i in 0..21 {
8565 state
8566 .path_addresses
8567 .insert(i, paths::PathAddressInfo::new());
8568 state.path_addresses.get_mut(&i).unwrap().observed_address = Some(SocketAddr::new(
8569 IpAddr::V4(Ipv4Addr::new(192, 168, 1, (i + 1) as u8)),
8570 8080,
8571 ));
8572 }
8573
8574 for i in 0..10 {
8576 assert!(state.should_send_observation(i, now));
8577 state.record_observation_sent(i);
8578 }
8579
8580 assert!(!state.should_send_observation(10, now));
8582
8583 state.path_addresses.get_mut(&0).unwrap().notified = false;
8585 assert!(!state.should_send_observation(0, now)); let later = now + Duration::from_secs(1);
8589 for i in 10..20 {
8590 assert!(state.should_send_observation(i, later));
8591 state.record_observation_sent(i);
8592 }
8593 assert!(!state.should_send_observation(20, later));
8595 }
8596
8597 #[test]
8598 fn test_multi_path_address_changes() {
8599 let now = Instant::now();
8601 let config = AddressDiscoveryConfig::SendAndReceive;
8602 let mut state = AddressDiscoveryState::new(&config, now);
8603
8604 let addr1a = SocketAddr::from(([192, 168, 1, 1], 5000));
8605 let addr1b = SocketAddr::from(([192, 168, 1, 2], 5000));
8606 let addr2a = SocketAddr::from(([10, 0, 0, 1], 6000));
8607 let addr2b = SocketAddr::from(([10, 0, 0, 2], 6000));
8608
8609 state.handle_observed_address(addr1a, 0, now);
8611 state.handle_observed_address(addr2a, 1, now);
8612
8613 state.record_observation_sent(0);
8615 state.record_observation_sent(1);
8616 assert!(!state.has_unnotified_changes());
8617
8618 state.handle_observed_address(addr1b, 0, now + Duration::from_secs(1));
8620 assert!(state.has_unnotified_changes());
8621
8622 assert_eq!(state.get_observed_address(0), Some(addr1b));
8624 assert_eq!(state.get_observed_address(1), Some(addr2a));
8625
8626 state.record_observation_sent(0);
8628 assert!(!state.has_unnotified_changes());
8629
8630 state.handle_observed_address(addr2b, 1, now + Duration::from_secs(2));
8632 assert!(state.has_unnotified_changes());
8633 }
8634
8635 #[test]
8636 fn test_multi_path_migration() {
8637 let now = Instant::now();
8639 let config = AddressDiscoveryConfig::SendAndReceive;
8640 let mut state = AddressDiscoveryState::new(&config, now);
8641
8642 let addr_old = SocketAddr::from(([192, 168, 1, 1], 5000));
8643 let addr_new = SocketAddr::from(([10, 0, 0, 1], 6000));
8644
8645 state.handle_observed_address(addr_old, 0, now);
8647 assert_eq!(state.get_observed_address(0), Some(addr_old));
8648
8649 state.handle_observed_address(addr_new, 1, now + Duration::from_secs(1));
8651
8652 assert_eq!(state.get_observed_address(0), Some(addr_old));
8654 assert_eq!(state.get_observed_address(1), Some(addr_new));
8655
8656 assert_eq!(state.path_addresses.len(), 2);
8659 }
8660
8661 #[test]
8662 fn test_check_for_address_observations_multi_path() {
8663 let now = Instant::now();
8665 let config = AddressDiscoveryConfig::SendAndReceive;
8666 let mut state = AddressDiscoveryState::new(&config, now);
8667
8668 state.observe_all_paths = true;
8670
8671 let addr1 = SocketAddr::from(([192, 168, 1, 1], 5000));
8673 let addr2 = SocketAddr::from(([10, 0, 0, 1], 6000));
8674 let addr3 = SocketAddr::from(([172, 16, 0, 1], 7000));
8675
8676 state.handle_observed_address(addr1, 0, now);
8677 state.handle_observed_address(addr2, 1, now);
8678 state.handle_observed_address(addr3, 2, now);
8679
8680 let frames = state.check_for_address_observations(0, true, now);
8682
8683 assert_eq!(frames.len(), 3);
8685
8686 let frame_addrs: Vec<_> = frames.iter().map(|f| f.address).collect();
8688 assert!(frame_addrs.contains(&addr1), "addr1 should be in frames");
8689 assert!(frame_addrs.contains(&addr2), "addr2 should be in frames");
8690 assert!(frame_addrs.contains(&addr3), "addr3 should be in frames");
8691
8692 assert!(!state.has_unnotified_changes());
8694 }
8695
8696 #[test]
8697 fn test_multi_path_with_peer_not_supporting() {
8698 let now = Instant::now();
8700 let config = AddressDiscoveryConfig::SendAndReceive;
8701 let mut state = AddressDiscoveryState::new(&config, now);
8702
8703 state.handle_observed_address(SocketAddr::from(([192, 168, 1, 1], 5000)), 0, now);
8705 state.handle_observed_address(SocketAddr::from(([10, 0, 0, 1], 6000)), 1, now);
8706
8707 let frames = state.check_for_address_observations(0, false, now);
8709 assert_eq!(frames.len(), 0);
8710
8711 assert!(state.has_unnotified_changes());
8713 }
8714
8715 #[test]
8717 fn test_bootstrap_node_aggressive_observation_mode() {
8718 let config = AddressDiscoveryConfig::SendAndReceive;
8720 let now = Instant::now();
8721 let mut state = AddressDiscoveryState::new(&config, now);
8722
8723 assert!(!state.is_bootstrap_mode());
8725
8726 state.set_bootstrap_mode(true);
8728 assert!(state.is_bootstrap_mode());
8729
8730 assert!(state.should_observe_path(0)); assert!(state.should_observe_path(1)); assert!(state.should_observe_path(2));
8734
8735 let bootstrap_rate = state.get_effective_rate_limit();
8737 assert!(bootstrap_rate > 10.0); }
8739
8740 #[test]
8741 fn test_bootstrap_node_immediate_observation() {
8742 let config = AddressDiscoveryConfig::SendAndReceive;
8744 let now = Instant::now();
8745 let mut state = AddressDiscoveryState::new(&config, now);
8746 state.set_bootstrap_mode(true);
8747
8748 let addr = SocketAddr::from(([192, 168, 1, 100], 5000));
8750 state.handle_observed_address(addr, 0, now);
8751
8752 assert!(state.should_send_observation_immediately(true));
8754
8755 assert!(state.should_send_observation(0, now));
8757
8758 let frame = state.queue_observed_address_frame(0, addr);
8760 assert!(frame.is_some());
8761 }
8762
8763 #[test]
8764 fn test_bootstrap_node_multiple_path_observations() {
8765 let config = AddressDiscoveryConfig::SendAndReceive;
8767 let now = Instant::now();
8768 let mut state = AddressDiscoveryState::new(&config, now);
8769 state.set_bootstrap_mode(true);
8770
8771 let addrs = vec![
8773 (0, SocketAddr::from(([192, 168, 1, 1], 5000))),
8774 (1, SocketAddr::from(([10, 0, 0, 1], 6000))),
8775 (2, SocketAddr::from(([172, 16, 0, 1], 7000))),
8776 ];
8777
8778 for (path_id, addr) in &addrs {
8779 state.handle_observed_address(*addr, *path_id, now);
8780 }
8781
8782 let frames = state.check_for_address_observations(0, true, now);
8784 assert_eq!(frames.len(), 3);
8785
8786 for (_, addr) in &addrs {
8788 assert!(frames.iter().any(|f| f.address == *addr));
8789 }
8790 }
8791
8792 #[test]
8793 fn test_bootstrap_node_rate_limit_override() {
8794 let config = AddressDiscoveryConfig::SendAndReceive;
8796 let now = Instant::now();
8797 let mut state = AddressDiscoveryState::new(&config, now);
8798 state.set_bootstrap_mode(true);
8799
8800 let addr = SocketAddr::from(([192, 168, 1, 1], 5000));
8802
8803 for i in 0..10 {
8805 state.handle_observed_address(addr, i, now);
8806 let can_send = state.should_send_observation(i, now);
8807 assert!(can_send, "Bootstrap node should send observation {i}");
8808 state.record_observation_sent(i);
8809 }
8810 }
8811
8812 #[test]
8813 fn test_bootstrap_node_configuration() {
8814 let config = AddressDiscoveryConfig::SendAndReceive;
8816 let mut state = AddressDiscoveryState::new(&config, Instant::now());
8817
8818 state.set_bootstrap_mode(true);
8820
8821 assert!(state.bootstrap_mode);
8823 assert!(state.enabled);
8824
8825 let effective_rate = state.get_effective_rate_limit();
8827 assert!(effective_rate > state.max_observation_rate as f64);
8828 }
8829
8830 #[test]
8831 fn test_bootstrap_node_persistent_observation() {
8832 let config = AddressDiscoveryConfig::SendAndReceive;
8834 let mut now = Instant::now();
8835 let mut state = AddressDiscoveryState::new(&config, now);
8836 state.set_bootstrap_mode(true);
8837
8838 let addr1 = SocketAddr::from(([192, 168, 1, 1], 5000));
8839 let addr2 = SocketAddr::from(([192, 168, 1, 2], 5000));
8840
8841 state.handle_observed_address(addr1, 0, now);
8843 assert!(state.should_send_observation(0, now));
8844 state.record_observation_sent(0);
8845
8846 now += Duration::from_secs(60);
8848 state.handle_observed_address(addr2, 0, now);
8849
8850 assert!(state.should_send_observation(0, now));
8852 }
8853
8854 #[test]
8855 fn test_bootstrap_node_multi_peer_support() {
8856 let config = AddressDiscoveryConfig::SendAndReceive;
8859 let now = Instant::now();
8860 let mut state = AddressDiscoveryState::new(&config, now);
8861 state.set_bootstrap_mode(true);
8862
8863 let peer_addresses = vec![
8865 (0, SocketAddr::from(([192, 168, 1, 1], 5000))), (1, SocketAddr::from(([10, 0, 0, 1], 6000))), (2, SocketAddr::from(([172, 16, 0, 1], 7000))), (3, SocketAddr::from(([192, 168, 2, 1], 8000))), ];
8870
8871 for (path_id, addr) in &peer_addresses {
8873 state.handle_observed_address(*addr, *path_id, now);
8874 }
8875
8876 let frames = state.check_for_address_observations(0, true, now);
8878 assert_eq!(frames.len(), peer_addresses.len());
8879
8880 for (_, addr) in &peer_addresses {
8882 assert!(frames.iter().any(|f| f.address == *addr));
8883 }
8884 }
8885
8886 mod address_discovery_tests {
8888 include!("address_discovery_tests.rs");
8889 }
8890}