1use std::{
2 cmp,
3 collections::VecDeque,
4 convert::TryFrom,
5 fmt, io, mem,
6 net::{IpAddr, SocketAddr},
7 sync::Arc,
8};
9
10use bytes::{Bytes, BytesMut};
11use frame::StreamMetaVec;
12use rand::{Rng, SeedableRng, rngs::StdRng};
15use thiserror::Error;
16use tracing::{debug, error, info, trace, trace_span, warn};
17
18use crate::{
19 Dir, Duration, EndpointConfig, Frame, INITIAL_MTU, Instant, MAX_CID_SIZE, MAX_STREAM_COUNT,
20 MIN_INITIAL_SIZE, MtuDiscoveryConfig, Side, StreamId, TIMER_GRANULARITY, TokenStore, Transmit,
21 TransportError, TransportErrorCode, VarInt,
22 cid_generator::ConnectionIdGenerator,
23 cid_queue::CidQueue,
24 coding::BufMutExt,
25 config::{ServerConfig, TransportConfig},
26 crypto::{self, KeyPair, Keys, PacketKey},
27 endpoint::AddressDiscoveryStats,
28 frame::{self, Close, Datagram, FrameStruct, NewToken},
29 packet::{
30 FixedLengthConnectionIdParser, Header, InitialHeader, InitialPacket, LongType, Packet,
31 PacketNumber, PartialDecode, SpaceId,
32 },
33 range_set::ArrayRangeSet,
34 shared::{
35 ConnectionEvent, ConnectionEventInner, ConnectionId, DatagramConnectionEvent, EcnCodepoint,
36 EndpointEvent, EndpointEventInner,
37 },
38 token::{ResetToken, Token, TokenPayload},
39 transport_parameters::TransportParameters,
40};
41
42mod ack_frequency;
43use ack_frequency::AckFrequencyState;
44
45pub(crate) mod nat_traversal;
46use nat_traversal::NatTraversalState;
47pub(crate) use nat_traversal::{CoordinationPhase, NatTraversalError, NatTraversalRole};
48
49mod assembler;
50pub use assembler::Chunk;
51
52mod cid_state;
53use cid_state::CidState;
54
55mod datagrams;
56use datagrams::DatagramState;
57pub use datagrams::{Datagrams, SendDatagramError};
58
59mod mtud;
60use mtud::MtuDiscovery;
61
62mod pacing;
63
64mod packet_builder;
65use packet_builder::PacketBuilder;
66
67mod packet_crypto;
68use packet_crypto::{PrevCrypto, ZeroRttCrypto};
69
70mod paths;
71pub use paths::RttEstimator;
72use paths::{NatTraversalChallenges, PathData, PathResponses};
73
74mod send_buffer;
75
76mod spaces;
77#[cfg(fuzzing)]
78pub use spaces::Retransmits;
79#[cfg(not(fuzzing))]
80use spaces::Retransmits;
81use spaces::{PacketNumberFilter, PacketSpace, SendableFrames, SentPacket, ThinRetransmits};
82
83mod stats;
84pub use stats::{ConnectionStats, FrameStats, PathStats, UdpStats};
85
86mod streams;
87#[cfg(fuzzing)]
88pub use streams::StreamsState;
89#[cfg(not(fuzzing))]
90use streams::StreamsState;
91pub use streams::{
92 Chunks, ClosedStream, FinishError, ReadError, ReadableError, RecvStream, SendStream,
93 ShouldTransmit, StreamEvent, Streams, WriteError, Written,
94};
95
96mod timer;
97use crate::congestion::Controller;
98use timer::{Timer, TimerTable};
99
100pub struct Connection {
140 endpoint_config: Arc<EndpointConfig>,
141 config: Arc<TransportConfig>,
142 rng: StdRng,
143 crypto: Box<dyn crypto::Session>,
144 handshake_cid: ConnectionId,
146 rem_handshake_cid: ConnectionId,
148 local_ip: Option<IpAddr>,
151 path: PathData,
152 allow_mtud: bool,
154 prev_path: Option<(ConnectionId, PathData)>,
155 state: State,
156 side: ConnectionSide,
157 zero_rtt_enabled: bool,
159 zero_rtt_crypto: Option<ZeroRttCrypto>,
161 key_phase: bool,
162 key_phase_size: u64,
164 peer_params: TransportParameters,
166 orig_rem_cid: ConnectionId,
168 initial_dst_cid: ConnectionId,
170 retry_src_cid: Option<ConnectionId>,
173 lost_packets: u64,
175 events: VecDeque<Event>,
176 endpoint_events: VecDeque<EndpointEventInner>,
177 spin_enabled: bool,
179 spin: bool,
181 spaces: [PacketSpace; 3],
183 highest_space: SpaceId,
185 prev_crypto: Option<PrevCrypto>,
187 next_crypto: Option<KeyPair<Box<dyn PacketKey>>>,
192 accepted_0rtt: bool,
193 permit_idle_reset: bool,
195 idle_timeout: Option<Duration>,
197 timers: TimerTable,
198 authentication_failures: u64,
200 error: Option<ConnectionError>,
202 packet_number_filter: PacketNumberFilter,
204
205 path_responses: PathResponses,
210 nat_traversal_challenges: NatTraversalChallenges,
212 close: bool,
213
214 ack_frequency: AckFrequencyState,
218
219 pto_count: u32,
224
225 receiving_ecn: bool,
230 total_authed_packets: u64,
232 app_limited: bool,
235
236 streams: StreamsState,
237 rem_cids: CidQueue,
239 local_cid_state: CidState,
241 datagrams: DatagramState,
243 stats: ConnectionStats,
245 version: u32,
247
248 nat_traversal: Option<NatTraversalState>,
250
251 nat_traversal_frame_config: frame::nat_traversal_unified::NatTraversalFrameConfig,
253
254 address_discovery_state: Option<AddressDiscoveryState>,
256
257 #[cfg(feature = "pqc")]
259 pqc_state: PqcState,
260
261 #[cfg(feature = "trace")]
263 trace_context: crate::tracing::TraceContext,
264
265 #[cfg(feature = "trace")]
267 event_log: Arc<crate::tracing::EventLog>,
268
269 #[cfg(feature = "__qlog")]
271 qlog_streamer: Option<Box<dyn std::io::Write + Send + Sync>>,
272}
273
274impl Connection {
275 pub(crate) fn new(
276 endpoint_config: Arc<EndpointConfig>,
277 config: Arc<TransportConfig>,
278 init_cid: ConnectionId,
279 loc_cid: ConnectionId,
280 rem_cid: ConnectionId,
281 remote: SocketAddr,
282 local_ip: Option<IpAddr>,
283 crypto: Box<dyn crypto::Session>,
284 cid_gen: &dyn ConnectionIdGenerator,
285 now: Instant,
286 version: u32,
287 allow_mtud: bool,
288 rng_seed: [u8; 32],
289 side_args: SideArgs,
290 ) -> Self {
291 let pref_addr_cid = side_args.pref_addr_cid();
292 let path_validated = side_args.path_validated();
293 let connection_side = ConnectionSide::from(side_args);
294 let side = connection_side.side();
295 let initial_space = PacketSpace {
296 crypto: Some(crypto.initial_keys(&init_cid, side)),
297 ..PacketSpace::new(now)
298 };
299 let state = State::Handshake(state::Handshake {
300 rem_cid_set: side.is_server(),
301 expected_token: Bytes::new(),
302 client_hello: None,
303 });
304 let mut rng = StdRng::from_seed(rng_seed);
305 let mut this = Self {
306 endpoint_config,
307 crypto,
308 handshake_cid: loc_cid,
309 rem_handshake_cid: rem_cid,
310 local_cid_state: CidState::new(
311 cid_gen.cid_len(),
312 cid_gen.cid_lifetime(),
313 now,
314 if pref_addr_cid.is_some() { 2 } else { 1 },
315 ),
316 path: PathData::new(remote, allow_mtud, None, now, &config),
317 allow_mtud,
318 local_ip,
319 prev_path: None,
320 state,
321 side: connection_side,
322 zero_rtt_enabled: false,
323 zero_rtt_crypto: None,
324 key_phase: false,
325 key_phase_size: rng.gen_range(10..1000),
332 peer_params: TransportParameters::default(),
333 orig_rem_cid: rem_cid,
334 initial_dst_cid: init_cid,
335 retry_src_cid: None,
336 lost_packets: 0,
337 events: VecDeque::new(),
338 endpoint_events: VecDeque::new(),
339 spin_enabled: config.allow_spin && rng.gen_ratio(7, 8),
340 spin: false,
341 spaces: [initial_space, PacketSpace::new(now), PacketSpace::new(now)],
342 highest_space: SpaceId::Initial,
343 prev_crypto: None,
344 next_crypto: None,
345 accepted_0rtt: false,
346 permit_idle_reset: true,
347 idle_timeout: match config.max_idle_timeout {
348 None | Some(VarInt(0)) => None,
349 Some(dur) => Some(Duration::from_millis(dur.0)),
350 },
351 timers: TimerTable::default(),
352 authentication_failures: 0,
353 error: None,
354 #[cfg(test)]
355 packet_number_filter: match config.deterministic_packet_numbers {
356 false => PacketNumberFilter::new(&mut rng),
357 true => PacketNumberFilter::disabled(),
358 },
359 #[cfg(not(test))]
360 packet_number_filter: PacketNumberFilter::new(&mut rng),
361
362 path_responses: PathResponses::default(),
363 nat_traversal_challenges: NatTraversalChallenges::default(),
364 close: false,
365
366 ack_frequency: AckFrequencyState::new(get_max_ack_delay(
367 &TransportParameters::default(),
368 )),
369
370 pto_count: 0,
371
372 app_limited: false,
373 receiving_ecn: false,
374 total_authed_packets: 0,
375
376 streams: StreamsState::new(
377 side,
378 config.max_concurrent_uni_streams,
379 config.max_concurrent_bidi_streams,
380 config.send_window,
381 config.receive_window,
382 config.stream_receive_window,
383 ),
384 datagrams: DatagramState::default(),
385 config,
386 rem_cids: CidQueue::new(rem_cid),
387 rng,
388 stats: ConnectionStats::default(),
389 version,
390 nat_traversal: None, nat_traversal_frame_config:
392 frame::nat_traversal_unified::NatTraversalFrameConfig::default(),
393 address_discovery_state: {
394 Some(AddressDiscoveryState::new(
397 &crate::transport_parameters::AddressDiscoveryConfig::default(),
398 now,
399 ))
400 },
401 #[cfg(feature = "pqc")]
402 pqc_state: PqcState::new(),
403
404 #[cfg(feature = "trace")]
405 trace_context: crate::tracing::TraceContext::new(crate::tracing::TraceId::new()),
406
407 #[cfg(feature = "trace")]
408 event_log: crate::tracing::global_log(),
409
410 #[cfg(feature = "__qlog")]
411 qlog_streamer: None,
412 };
413
414 #[cfg(feature = "trace")]
416 {
417 use crate::trace_event;
418 use crate::tracing::{Event, EventData, socket_addr_to_bytes, timestamp_now};
419 let _peer_id = {
421 let mut id = [0u8; 32];
422 let addr_bytes = match remote {
423 SocketAddr::V4(addr) => addr.ip().octets().to_vec(),
424 SocketAddr::V6(addr) => addr.ip().octets().to_vec(),
425 };
426 id[..addr_bytes.len().min(32)]
427 .copy_from_slice(&addr_bytes[..addr_bytes.len().min(32)]);
428 id
429 };
430
431 let (addr_bytes, addr_type) = socket_addr_to_bytes(remote);
432 trace_event!(
433 &this.event_log,
434 Event {
435 timestamp: timestamp_now(),
436 trace_id: this.trace_context.trace_id(),
437 sequence: 0,
438 _padding: 0,
439 node_id: [0u8; 32], event_data: EventData::ConnInit {
441 endpoint_bytes: addr_bytes,
442 addr_type,
443 _padding: [0u8; 45],
444 },
445 }
446 );
447 }
448
449 if path_validated {
450 this.on_path_validated();
451 }
452 if side.is_client() {
453 this.write_crypto();
455 this.init_0rtt();
456 }
457 this
458 }
459
460 #[cfg(feature = "__qlog")]
462 pub fn set_qlog(
463 &mut self,
464 writer: Box<dyn std::io::Write + Send + Sync>,
465 _title: Option<String>,
466 _description: Option<String>,
467 _now: Instant,
468 ) {
469 self.qlog_streamer = Some(writer);
470 }
471
472 #[cfg(feature = "__qlog")]
474 fn emit_qlog_recovery_metrics(&mut self, _now: Instant) {
475 }
478
479 #[must_use]
487 pub fn poll_timeout(&mut self) -> Option<Instant> {
488 let mut next_timeout = self.timers.next_timeout();
489
490 if let Some(nat_state) = &self.nat_traversal {
492 if let Some(nat_timeout) = nat_state.get_next_timeout(Instant::now()) {
493 self.timers.set(Timer::NatTraversal, nat_timeout);
495 next_timeout = Some(next_timeout.map_or(nat_timeout, |t| t.min(nat_timeout)));
496 }
497 }
498
499 next_timeout
500 }
501
502 #[must_use]
508 pub fn poll(&mut self) -> Option<Event> {
509 if let Some(x) = self.events.pop_front() {
510 return Some(x);
511 }
512
513 if let Some(event) = self.streams.poll() {
514 return Some(Event::Stream(event));
515 }
516
517 if let Some(err) = self.error.take() {
518 return Some(Event::ConnectionLost { reason: err });
519 }
520
521 None
522 }
523
524 #[must_use]
526 pub fn poll_endpoint_events(&mut self) -> Option<EndpointEvent> {
527 self.endpoint_events.pop_front().map(EndpointEvent)
528 }
529
530 #[must_use]
532 pub fn streams(&mut self) -> Streams<'_> {
533 Streams {
534 state: &mut self.streams,
535 conn_state: &self.state,
536 }
537 }
538
539 #[cfg(feature = "trace")]
541 pub(crate) fn trace_context(&self) -> &crate::tracing::TraceContext {
542 &self.trace_context
543 }
544
545 #[cfg(feature = "trace")]
547 pub(crate) fn event_log(&self) -> &Arc<crate::tracing::EventLog> {
548 &self.event_log
549 }
550
551 #[must_use]
553 pub fn recv_stream(&mut self, id: StreamId) -> RecvStream<'_> {
554 assert!(id.dir() == Dir::Bi || id.initiator() != self.side.side());
555 RecvStream {
556 id,
557 state: &mut self.streams,
558 pending: &mut self.spaces[SpaceId::Data].pending,
559 }
560 }
561
562 #[must_use]
564 pub fn send_stream(&mut self, id: StreamId) -> SendStream<'_> {
565 assert!(id.dir() == Dir::Bi || id.initiator() == self.side.side());
566 SendStream {
567 id,
568 state: &mut self.streams,
569 pending: &mut self.spaces[SpaceId::Data].pending,
570 conn_state: &self.state,
571 }
572 }
573
574 #[must_use]
584 pub fn poll_transmit(
585 &mut self,
586 now: Instant,
587 max_datagrams: usize,
588 buf: &mut Vec<u8>,
589 ) -> Option<Transmit> {
590 assert!(max_datagrams != 0);
591 let max_datagrams = match self.config.enable_segmentation_offload {
592 false => 1,
593 true => max_datagrams,
594 };
595
596 let mut num_datagrams = 0;
597 let mut datagram_start = 0;
600 let mut segment_size = usize::from(self.path.current_mtu());
601
602 if let Some(nat_traversal) = &mut self.nat_traversal {
604 if nat_traversal.check_coordination_timeout(now) {
605 trace!("NAT traversal coordination timed out, may retry");
606 }
607 }
608
609 if let Some(challenge) = self.send_nat_traversal_challenge(now, buf) {
611 return Some(challenge);
612 }
613
614 if let Some(challenge) = self.send_path_challenge(now, buf) {
615 return Some(challenge);
616 }
617
618 for space in SpaceId::iter() {
620 let request_immediate_ack =
621 space == SpaceId::Data && self.peer_supports_ack_frequency();
622 self.spaces[space].maybe_queue_probe(request_immediate_ack, &self.streams);
623 }
624
625 let close = match self.state {
627 State::Drained => {
628 self.app_limited = true;
629 return None;
630 }
631 State::Draining | State::Closed(_) => {
632 if !self.close {
635 self.app_limited = true;
636 return None;
637 }
638 true
639 }
640 _ => false,
641 };
642
643 if let Some(config) = &self.config.ack_frequency_config {
645 self.spaces[SpaceId::Data].pending.ack_frequency = self
646 .ack_frequency
647 .should_send_ack_frequency(self.path.rtt.get(), config, &self.peer_params)
648 && self.highest_space == SpaceId::Data
649 && self.peer_supports_ack_frequency();
650 }
651
652 let mut buf_capacity = 0;
656
657 let mut coalesce = true;
658 let mut builder_storage: Option<PacketBuilder> = None;
659 let mut sent_frames = None;
660 let mut pad_datagram = false;
661 let mut pad_datagram_to_mtu = false;
662 let mut congestion_blocked = false;
663
664 let mut space_idx = 0;
666 let spaces = [SpaceId::Initial, SpaceId::Handshake, SpaceId::Data];
667 while space_idx < spaces.len() {
670 let space_id = spaces[space_idx];
671 let pn = self.packet_number_filter.peek(&self.spaces[SpaceId::Data]);
678 let frame_space_1rtt =
679 segment_size.saturating_sub(self.predict_1rtt_overhead(Some(pn)));
680
681 let can_send = self.space_can_send(space_id, frame_space_1rtt);
683 if can_send.is_empty() && (!close || self.spaces[space_id].crypto.is_none()) {
684 space_idx += 1;
685 continue;
686 }
687
688 let mut ack_eliciting = !self.spaces[space_id].pending.is_empty(&self.streams)
689 || self.spaces[space_id].ping_pending
690 || self.spaces[space_id].immediate_ack_pending;
691 if space_id == SpaceId::Data {
692 ack_eliciting |= self.can_send_1rtt(frame_space_1rtt);
693 }
694
695 pad_datagram_to_mtu |= space_id == SpaceId::Data && self.config.pad_to_mtu;
696
697 let buf_end = if let Some(builder) = &builder_storage {
701 buf.len().max(builder.min_size) + builder.tag_len
702 } else {
703 buf.len()
704 };
705
706 let tag_len = if let Some(ref crypto) = self.spaces[space_id].crypto {
707 crypto.packet.local.tag_len()
708 } else if space_id == SpaceId::Data {
709 match self.zero_rtt_crypto.as_ref() {
710 Some(crypto) => crypto.packet.tag_len(),
711 None => {
712 error!(
714 "sending packets in the application data space requires known 0-RTT or 1-RTT keys"
715 );
716 return None;
717 }
718 }
719 } else {
720 unreachable!("tried to send {:?} packet without keys", space_id)
721 };
722 if !coalesce || buf_capacity - buf_end < MIN_PACKET_SPACE + tag_len {
723 if num_datagrams >= max_datagrams {
727 break;
729 }
730
731 if self
738 .path
739 .anti_amplification_blocked(segment_size as u64 * (num_datagrams as u64) + 1)
740 {
741 trace!("blocked by anti-amplification");
742 break;
743 }
744
745 if ack_eliciting && self.spaces[space_id].loss_probes == 0 {
748 let untracked_bytes = if let Some(builder) = &builder_storage {
750 buf_capacity - builder.partial_encode.start
751 } else {
752 0
753 } as u64;
754 debug_assert!(untracked_bytes <= segment_size as u64);
755
756 let bytes_to_send = segment_size as u64 + untracked_bytes;
757 if self.path.in_flight.bytes + bytes_to_send >= self.path.congestion.window() {
758 space_idx += 1;
759 congestion_blocked = true;
760 trace!("blocked by congestion control");
763 continue;
764 }
765
766 let smoothed_rtt = self.path.rtt.get();
768 if let Some(delay) = self.path.pacing.delay(
769 smoothed_rtt,
770 bytes_to_send,
771 self.path.current_mtu(),
772 self.path.congestion.window(),
773 now,
774 ) {
775 self.timers.set(Timer::Pacing, delay);
776 congestion_blocked = true;
777 trace!("blocked by pacing");
780 break;
781 }
782 }
783
784 if let Some(mut builder) = builder_storage.take() {
786 if pad_datagram {
787 #[cfg(feature = "pqc")]
788 let min_size = self.pqc_state.min_initial_size();
789 #[cfg(not(feature = "pqc"))]
790 let min_size = MIN_INITIAL_SIZE;
791 builder.pad_to(min_size);
792 }
793
794 if num_datagrams > 1 || pad_datagram_to_mtu {
795 const MAX_PADDING: usize = 16;
808 let packet_len_unpadded = cmp::max(builder.min_size, buf.len())
809 - datagram_start
810 + builder.tag_len;
811 if (packet_len_unpadded + MAX_PADDING < segment_size
812 && !pad_datagram_to_mtu)
813 || datagram_start + segment_size > buf_capacity
814 {
815 trace!(
816 "GSO truncated by demand for {} padding bytes or loss probe",
817 segment_size - packet_len_unpadded
818 );
819 builder_storage = Some(builder);
820 break;
821 }
822
823 builder.pad_to(segment_size as u16);
826 }
827
828 builder.finish_and_track(now, self, sent_frames.take(), buf);
829
830 if num_datagrams == 1 {
831 segment_size = buf.len();
838 buf_capacity = buf.len();
841
842 if space_id == SpaceId::Data {
849 let frame_space_1rtt =
850 segment_size.saturating_sub(self.predict_1rtt_overhead(Some(pn)));
851 if self.space_can_send(space_id, frame_space_1rtt).is_empty() {
852 break;
853 }
854 }
855 }
856 }
857
858 let next_datagram_size_limit = match self.spaces[space_id].loss_probes {
860 0 => segment_size,
861 _ => {
862 self.spaces[space_id].loss_probes -= 1;
863 std::cmp::min(segment_size, usize::from(INITIAL_MTU))
867 }
868 };
869 buf_capacity += next_datagram_size_limit;
870 if buf.capacity() < buf_capacity {
871 buf.reserve(max_datagrams * segment_size);
880 }
881 num_datagrams += 1;
882 coalesce = true;
883 pad_datagram = false;
884 datagram_start = buf.len();
885
886 debug_assert_eq!(
887 datagram_start % segment_size,
888 0,
889 "datagrams in a GSO batch must be aligned to the segment size"
890 );
891 } else {
892 if let Some(builder) = builder_storage.take() {
896 builder.finish_and_track(now, self, sent_frames.take(), buf);
897 }
898 }
899
900 debug_assert!(buf_capacity - buf.len() >= MIN_PACKET_SPACE);
901
902 if self.spaces[SpaceId::Initial].crypto.is_some()
907 && space_id == SpaceId::Handshake
908 && self.side.is_client()
909 {
910 self.discard_space(now, SpaceId::Initial);
913 }
914 if let Some(ref mut prev) = self.prev_crypto {
915 prev.update_unacked = false;
916 }
917
918 debug_assert!(
919 builder_storage.is_none() && sent_frames.is_none(),
920 "Previous packet must have been finished"
921 );
922
923 let builder = builder_storage.insert(PacketBuilder::new(
924 now,
925 space_id,
926 self.rem_cids.active(),
927 buf,
928 buf_capacity,
929 datagram_start,
930 ack_eliciting,
931 self,
932 )?);
933 coalesce = coalesce && !builder.short_header;
934
935 #[cfg(feature = "pqc")]
937 let should_adjust_coalescing = self
938 .pqc_state
939 .should_adjust_coalescing(buf.len() - datagram_start, space_id);
940 #[cfg(not(feature = "pqc"))]
941 let should_adjust_coalescing = false;
942
943 if should_adjust_coalescing {
944 coalesce = false;
945 trace!("Disabling coalescing for PQC handshake in {:?}", space_id);
946 }
947
948 pad_datagram |=
950 space_id == SpaceId::Initial && (self.side.is_client() || ack_eliciting);
951
952 if close {
953 trace!("sending CONNECTION_CLOSE");
954 if !self.spaces[space_id].pending_acks.ranges().is_empty() {
959 Self::populate_acks(
960 now,
961 self.receiving_ecn,
962 &mut SentFrames::default(),
963 &mut self.spaces[space_id],
964 buf,
965 &mut self.stats,
966 );
967 }
968
969 debug_assert!(
973 buf.len() + frame::ConnectionClose::SIZE_BOUND < builder.max_size,
974 "ACKs should leave space for ConnectionClose"
975 );
976 if buf.len() + frame::ConnectionClose::SIZE_BOUND < builder.max_size {
977 let max_frame_size = builder.max_size - buf.len();
978 match self.state {
979 State::Closed(state::Closed { ref reason }) => {
980 if space_id == SpaceId::Data || reason.is_transport_layer() {
981 reason.encode(buf, max_frame_size)
982 } else {
983 frame::ConnectionClose {
984 error_code: TransportErrorCode::APPLICATION_ERROR,
985 frame_type: None,
986 reason: Bytes::new(),
987 }
988 .encode(buf, max_frame_size)
989 }
990 }
991 State::Draining => frame::ConnectionClose {
992 error_code: TransportErrorCode::NO_ERROR,
993 frame_type: None,
994 reason: Bytes::new(),
995 }
996 .encode(buf, max_frame_size),
997 _ => unreachable!(
998 "tried to make a close packet when the connection wasn't closed"
999 ),
1000 }
1001 }
1002 if space_id == self.highest_space {
1003 self.close = false;
1005 break;
1007 } else {
1008 space_idx += 1;
1012 continue;
1013 }
1014 }
1015
1016 if space_id == SpaceId::Data && num_datagrams == 1 {
1019 if let Some((token, remote)) = self.path_responses.pop_off_path(self.path.remote) {
1020 let mut builder = builder_storage.take().unwrap();
1023 trace!("PATH_RESPONSE {:08x} (off-path)", token);
1024 buf.write(frame::FrameType::PATH_RESPONSE);
1025 buf.write(token);
1026 self.stats.frame_tx.path_response += 1;
1027 #[cfg(feature = "pqc")]
1028 let min_size = self.pqc_state.min_initial_size();
1029 #[cfg(not(feature = "pqc"))]
1030 let min_size = MIN_INITIAL_SIZE;
1031 builder.pad_to(min_size);
1032 builder.finish_and_track(
1033 now,
1034 self,
1035 Some(SentFrames {
1036 non_retransmits: true,
1037 ..SentFrames::default()
1038 }),
1039 buf,
1040 );
1041 self.stats.udp_tx.on_sent(1, buf.len());
1042
1043 #[cfg(feature = "trace")]
1045 {
1046 use crate::trace_packet_sent;
1047 trace_packet_sent!(
1049 &self.event_log,
1050 self.trace_context.trace_id(),
1051 buf.len() as u32,
1052 0 );
1054 }
1055
1056 return Some(Transmit {
1057 destination: remote,
1058 size: buf.len(),
1059 ecn: None,
1060 segment_size: None,
1061 src_ip: self.local_ip,
1062 });
1063 }
1064 }
1065
1066 if space_id == SpaceId::Data && self.address_discovery_state.is_some() {
1068 let peer_supports = self.peer_params.address_discovery.is_some();
1069
1070 if let Some(state) = &mut self.address_discovery_state {
1071 let frames = state.check_for_address_observations(0, peer_supports, now);
1072 self.spaces[space_id]
1073 .pending
1074 .observed_addresses
1075 .extend(frames);
1076 }
1077 }
1078
1079 let sent =
1080 self.populate_packet(now, space_id, buf, builder.max_size, builder.exact_number);
1081
1082 debug_assert!(
1089 !(sent.is_ack_only(&self.streams)
1090 && !can_send.acks
1091 && can_send.other
1092 && (buf_capacity - builder.datagram_start) == self.path.current_mtu() as usize
1093 && self.datagrams.outgoing.is_empty()),
1094 "SendableFrames was {can_send:?}, but only ACKs have been written"
1095 );
1096 pad_datagram |= sent.requires_padding;
1097
1098 if sent.largest_acked.is_some() {
1099 self.spaces[space_id].pending_acks.acks_sent();
1100 self.timers.stop(Timer::MaxAckDelay);
1101 }
1102
1103 sent_frames = Some(sent);
1105
1106 }
1109
1110 if let Some(mut builder) = builder_storage {
1112 if pad_datagram {
1113 #[cfg(feature = "pqc")]
1114 let min_size = self.pqc_state.min_initial_size();
1115 #[cfg(not(feature = "pqc"))]
1116 let min_size = MIN_INITIAL_SIZE;
1117 builder.pad_to(min_size);
1118 }
1119
1120 if pad_datagram_to_mtu && buf_capacity >= datagram_start + segment_size {
1126 builder.pad_to(segment_size as u16);
1127 }
1128
1129 let last_packet_number = builder.exact_number;
1130 builder.finish_and_track(now, self, sent_frames, buf);
1131 self.path
1132 .congestion
1133 .on_sent(now, buf.len() as u64, last_packet_number);
1134
1135 #[cfg(feature = "__qlog")]
1136 self.emit_qlog_recovery_metrics(now);
1137 }
1138
1139 self.app_limited = buf.is_empty() && !congestion_blocked;
1140
1141 if buf.is_empty() && self.state.is_established() {
1143 let space_id = SpaceId::Data;
1144 let probe_size = self
1145 .path
1146 .mtud
1147 .poll_transmit(now, self.packet_number_filter.peek(&self.spaces[space_id]))?;
1148
1149 let buf_capacity = probe_size as usize;
1150 buf.reserve(buf_capacity);
1151
1152 let mut builder = PacketBuilder::new(
1153 now,
1154 space_id,
1155 self.rem_cids.active(),
1156 buf,
1157 buf_capacity,
1158 0,
1159 true,
1160 self,
1161 )?;
1162
1163 buf.write(frame::FrameType::PING);
1165 self.stats.frame_tx.ping += 1;
1166
1167 if self.peer_supports_ack_frequency() {
1169 buf.write(frame::FrameType::IMMEDIATE_ACK);
1170 self.stats.frame_tx.immediate_ack += 1;
1171 }
1172
1173 builder.pad_to(probe_size);
1174 let sent_frames = SentFrames {
1175 non_retransmits: true,
1176 ..Default::default()
1177 };
1178 builder.finish_and_track(now, self, Some(sent_frames), buf);
1179
1180 self.stats.path.sent_plpmtud_probes += 1;
1181 num_datagrams = 1;
1182
1183 trace!(?probe_size, "writing MTUD probe");
1184 }
1185
1186 if buf.is_empty() {
1187 return None;
1188 }
1189
1190 trace!("sending {} bytes in {} datagrams", buf.len(), num_datagrams);
1191 self.path.total_sent = self.path.total_sent.saturating_add(buf.len() as u64);
1192
1193 self.stats.udp_tx.on_sent(num_datagrams as u64, buf.len());
1194
1195 #[cfg(feature = "trace")]
1197 {
1198 use crate::trace_packet_sent;
1199 let packet_num = self.spaces[SpaceId::Data]
1202 .next_packet_number
1203 .saturating_sub(1);
1204 trace_packet_sent!(
1205 &self.event_log,
1206 self.trace_context.trace_id(),
1207 buf.len() as u32,
1208 packet_num
1209 );
1210 }
1211
1212 Some(Transmit {
1213 destination: self.path.remote,
1214 size: buf.len(),
1215 ecn: if self.path.sending_ecn {
1216 Some(EcnCodepoint::Ect0)
1217 } else {
1218 None
1219 },
1220 segment_size: match num_datagrams {
1221 1 => None,
1222 _ => Some(segment_size),
1223 },
1224 src_ip: self.local_ip,
1225 })
1226 }
1227
1228 fn send_coordination_request(&mut self, _now: Instant, _buf: &mut Vec<u8>) -> Option<Transmit> {
1230 let nat = self.nat_traversal.as_mut()?;
1232 if !nat.should_send_punch_request() {
1233 return None;
1234 }
1235
1236 let coord = nat.coordination.as_ref()?;
1237 let round = coord.round;
1238 if coord.punch_targets.is_empty() {
1239 return None;
1240 }
1241
1242 trace!(
1243 "queuing PUNCH_ME_NOW round {} with {} targets",
1244 round,
1245 coord.punch_targets.len()
1246 );
1247
1248 for target in &coord.punch_targets {
1250 let punch = frame::PunchMeNow {
1251 round,
1252 paired_with_sequence_number: target.remote_sequence,
1253 address: target.remote_addr,
1254 target_peer_id: None,
1255 };
1256 self.spaces[SpaceId::Data].pending.punch_me_now.push(punch);
1257 }
1258
1259 nat.mark_punch_request_sent();
1261
1262 None
1264 }
1265
1266 fn send_coordinated_path_challenge(
1268 &mut self,
1269 now: Instant,
1270 buf: &mut Vec<u8>,
1271 ) -> Option<Transmit> {
1272 if let Some(nat_traversal) = &mut self.nat_traversal {
1274 if nat_traversal.should_start_punching(now) {
1275 nat_traversal.start_punching_phase(now);
1276 }
1277 }
1278
1279 let (target_addr, challenge) = {
1281 let nat_traversal = self.nat_traversal.as_ref()?;
1282 match nat_traversal.get_coordination_phase() {
1283 Some(CoordinationPhase::Punching) => {
1284 let targets = nat_traversal.get_punch_targets_from_coordination()?;
1285 if targets.is_empty() {
1286 return None;
1287 }
1288 let target = &targets[0];
1290 (target.remote_addr, target.challenge)
1291 }
1292 _ => return None,
1293 }
1294 };
1295
1296 debug_assert_eq!(
1297 self.highest_space,
1298 SpaceId::Data,
1299 "PATH_CHALLENGE queued without 1-RTT keys"
1300 );
1301
1302 #[cfg(feature = "pqc")]
1303 buf.reserve(self.pqc_state.min_initial_size() as usize);
1304 #[cfg(not(feature = "pqc"))]
1305 buf.reserve(MIN_INITIAL_SIZE as usize);
1306 let buf_capacity = buf.capacity();
1307
1308 let mut builder = PacketBuilder::new(
1309 now,
1310 SpaceId::Data,
1311 self.rem_cids.active(),
1312 buf,
1313 buf_capacity,
1314 0,
1315 false,
1316 self,
1317 )?;
1318
1319 trace!(
1320 "sending coordinated PATH_CHALLENGE {:08x} to {}",
1321 challenge, target_addr
1322 );
1323 buf.write(frame::FrameType::PATH_CHALLENGE);
1324 buf.write(challenge);
1325 self.stats.frame_tx.path_challenge += 1;
1326
1327 #[cfg(feature = "pqc")]
1328 let min_size = self.pqc_state.min_initial_size();
1329 #[cfg(not(feature = "pqc"))]
1330 let min_size = MIN_INITIAL_SIZE;
1331 builder.pad_to(min_size);
1332 builder.finish_and_track(now, self, None, buf);
1333
1334 if let Some(nat_traversal) = &mut self.nat_traversal {
1336 nat_traversal.mark_coordination_validating();
1337 }
1338
1339 Some(Transmit {
1340 destination: target_addr,
1341 size: buf.len(),
1342 ecn: if self.path.sending_ecn {
1343 Some(EcnCodepoint::Ect0)
1344 } else {
1345 None
1346 },
1347 segment_size: None,
1348 src_ip: self.local_ip,
1349 })
1350 }
1351
1352 fn send_nat_traversal_challenge(
1354 &mut self,
1355 now: Instant,
1356 buf: &mut Vec<u8>,
1357 ) -> Option<Transmit> {
1358 if let Some(request) = self.send_coordination_request(now, buf) {
1360 return Some(request);
1361 }
1362
1363 if let Some(punch) = self.send_coordinated_path_challenge(now, buf) {
1365 return Some(punch);
1366 }
1367
1368 let (remote_addr, remote_sequence) = {
1370 let nat_traversal = self.nat_traversal.as_ref()?;
1371 let candidates = nat_traversal.get_validation_candidates();
1372 if candidates.is_empty() {
1373 return None;
1374 }
1375 let (sequence, candidate) = candidates[0];
1377 (candidate.address, sequence)
1378 };
1379
1380 let challenge = self.rng.r#gen::<u64>();
1381
1382 if let Err(e) =
1384 self.nat_traversal
1385 .as_mut()?
1386 .start_validation(remote_sequence, challenge, now)
1387 {
1388 warn!("Failed to start NAT traversal validation: {}", e);
1389 return None;
1390 }
1391
1392 debug_assert_eq!(
1393 self.highest_space,
1394 SpaceId::Data,
1395 "PATH_CHALLENGE queued without 1-RTT keys"
1396 );
1397
1398 #[cfg(feature = "pqc")]
1399 buf.reserve(self.pqc_state.min_initial_size() as usize);
1400 #[cfg(not(feature = "pqc"))]
1401 buf.reserve(MIN_INITIAL_SIZE as usize);
1402 let buf_capacity = buf.capacity();
1403
1404 let mut builder = PacketBuilder::new(
1406 now,
1407 SpaceId::Data,
1408 self.rem_cids.active(),
1409 buf,
1410 buf_capacity,
1411 0,
1412 false,
1413 self,
1414 )?;
1415
1416 trace!(
1417 "sending PATH_CHALLENGE {:08x} to NAT candidate {}",
1418 challenge, remote_addr
1419 );
1420 buf.write(frame::FrameType::PATH_CHALLENGE);
1421 buf.write(challenge);
1422 self.stats.frame_tx.path_challenge += 1;
1423
1424 #[cfg(feature = "pqc")]
1426 let min_size = self.pqc_state.min_initial_size();
1427 #[cfg(not(feature = "pqc"))]
1428 let min_size = MIN_INITIAL_SIZE;
1429 builder.pad_to(min_size);
1430
1431 builder.finish_and_track(now, self, None, buf);
1432
1433 Some(Transmit {
1434 destination: remote_addr,
1435 size: buf.len(),
1436 ecn: if self.path.sending_ecn {
1437 Some(EcnCodepoint::Ect0)
1438 } else {
1439 None
1440 },
1441 segment_size: None,
1442 src_ip: self.local_ip,
1443 })
1444 }
1445
1446 fn send_path_challenge(&mut self, now: Instant, buf: &mut Vec<u8>) -> Option<Transmit> {
1448 let (prev_cid, prev_path) = self.prev_path.as_mut()?;
1449 if !prev_path.challenge_pending {
1450 return None;
1451 }
1452 prev_path.challenge_pending = false;
1453 let token = prev_path
1454 .challenge
1455 .expect("previous path challenge pending without token");
1456 let destination = prev_path.remote;
1457 debug_assert_eq!(
1458 self.highest_space,
1459 SpaceId::Data,
1460 "PATH_CHALLENGE queued without 1-RTT keys"
1461 );
1462 #[cfg(feature = "pqc")]
1463 buf.reserve(self.pqc_state.min_initial_size() as usize);
1464 #[cfg(not(feature = "pqc"))]
1465 buf.reserve(MIN_INITIAL_SIZE as usize);
1466
1467 let buf_capacity = buf.capacity();
1468
1469 let mut builder = PacketBuilder::new(
1475 now,
1476 SpaceId::Data,
1477 *prev_cid,
1478 buf,
1479 buf_capacity,
1480 0,
1481 false,
1482 self,
1483 )?;
1484 trace!("validating previous path with PATH_CHALLENGE {:08x}", token);
1485 buf.write(frame::FrameType::PATH_CHALLENGE);
1486 buf.write(token);
1487 self.stats.frame_tx.path_challenge += 1;
1488
1489 #[cfg(feature = "pqc")]
1494 let min_size = self.pqc_state.min_initial_size();
1495 #[cfg(not(feature = "pqc"))]
1496 let min_size = MIN_INITIAL_SIZE;
1497 builder.pad_to(min_size);
1498
1499 builder.finish(self, buf);
1500 self.stats.udp_tx.on_sent(1, buf.len());
1501
1502 Some(Transmit {
1503 destination,
1504 size: buf.len(),
1505 ecn: None,
1506 segment_size: None,
1507 src_ip: self.local_ip,
1508 })
1509 }
1510
1511 fn space_can_send(&self, space_id: SpaceId, frame_space_1rtt: usize) -> SendableFrames {
1513 if self.spaces[space_id].crypto.is_none()
1514 && (space_id != SpaceId::Data
1515 || self.zero_rtt_crypto.is_none()
1516 || self.side.is_server())
1517 {
1518 return SendableFrames::empty();
1520 }
1521 let mut can_send = self.spaces[space_id].can_send(&self.streams);
1522 if space_id == SpaceId::Data {
1523 can_send.other |= self.can_send_1rtt(frame_space_1rtt);
1524 }
1525 can_send
1526 }
1527
1528 pub fn handle_event(&mut self, event: ConnectionEvent) {
1534 use ConnectionEventInner::*;
1535 match event.0 {
1536 Datagram(DatagramConnectionEvent {
1537 now,
1538 remote,
1539 ecn,
1540 first_decode,
1541 remaining,
1542 }) => {
1543 if remote != self.path.remote && !self.side.remote_may_migrate() {
1547 trace!("discarding packet from unrecognized peer {}", remote);
1548 return;
1549 }
1550
1551 let was_anti_amplification_blocked = self.path.anti_amplification_blocked(1);
1552
1553 self.stats.udp_rx.datagrams += 1;
1554 self.stats.udp_rx.bytes += first_decode.len() as u64;
1555 let data_len = first_decode.len();
1556
1557 self.handle_decode(now, remote, ecn, first_decode);
1558 self.path.total_recvd = self.path.total_recvd.saturating_add(data_len as u64);
1563
1564 if let Some(data) = remaining {
1565 self.stats.udp_rx.bytes += data.len() as u64;
1566 self.handle_coalesced(now, remote, ecn, data);
1567 }
1568
1569 #[cfg(feature = "__qlog")]
1570 self.emit_qlog_recovery_metrics(now);
1571
1572 if was_anti_amplification_blocked {
1573 self.set_loss_detection_timer(now);
1577 }
1578 }
1579 NewIdentifiers(ids, now) => {
1580 self.local_cid_state.new_cids(&ids, now);
1581 ids.into_iter().rev().for_each(|frame| {
1582 self.spaces[SpaceId::Data].pending.new_cids.push(frame);
1583 });
1584 if self.timers.get(Timer::PushNewCid).is_none_or(|x| x <= now) {
1586 self.reset_cid_retirement();
1587 }
1588 }
1589 QueueAddAddress(add) => {
1590 self.spaces[SpaceId::Data].pending.add_addresses.push(add);
1592 }
1593 QueuePunchMeNow(punch) => {
1594 self.spaces[SpaceId::Data].pending.punch_me_now.push(punch);
1596 }
1597 }
1598 }
1599
1600 pub fn handle_timeout(&mut self, now: Instant) {
1610 for &timer in &Timer::VALUES {
1611 if !self.timers.is_expired(timer, now) {
1612 continue;
1613 }
1614 self.timers.stop(timer);
1615 trace!(timer = ?timer, "timeout");
1616 match timer {
1617 Timer::Close => {
1618 self.state = State::Drained;
1619 self.endpoint_events.push_back(EndpointEventInner::Drained);
1620 }
1621 Timer::Idle => {
1622 self.kill(ConnectionError::TimedOut);
1623 }
1624 Timer::KeepAlive => {
1625 trace!("sending keep-alive");
1626 self.ping();
1627 }
1628 Timer::LossDetection => {
1629 self.on_loss_detection_timeout(now);
1630
1631 #[cfg(feature = "__qlog")]
1632 self.emit_qlog_recovery_metrics(now);
1633 }
1634 Timer::KeyDiscard => {
1635 self.zero_rtt_crypto = None;
1636 self.prev_crypto = None;
1637 }
1638 Timer::PathValidation => {
1639 debug!("path validation failed");
1640 if let Some((_, prev)) = self.prev_path.take() {
1641 self.path = prev;
1642 }
1643 self.path.challenge = None;
1644 self.path.challenge_pending = false;
1645 }
1646 Timer::Pacing => trace!("pacing timer expired"),
1647 Timer::NatTraversal => {
1648 self.handle_nat_traversal_timeout(now);
1649 }
1650 Timer::PushNewCid => {
1651 let num_new_cid = self.local_cid_state.on_cid_timeout().into();
1653 if !self.state.is_closed() {
1654 trace!(
1655 "push a new cid to peer RETIRE_PRIOR_TO field {}",
1656 self.local_cid_state.retire_prior_to()
1657 );
1658 self.endpoint_events
1659 .push_back(EndpointEventInner::NeedIdentifiers(now, num_new_cid));
1660 }
1661 }
1662 Timer::MaxAckDelay => {
1663 trace!("max ack delay reached");
1664 self.spaces[SpaceId::Data]
1666 .pending_acks
1667 .on_max_ack_delay_timeout()
1668 }
1669 }
1670 }
1671 }
1672
1673 pub fn close(&mut self, now: Instant, error_code: VarInt, reason: Bytes) {
1685 self.close_inner(
1686 now,
1687 Close::Application(frame::ApplicationClose { error_code, reason }),
1688 )
1689 }
1690
1691 fn close_inner(&mut self, now: Instant, reason: Close) {
1692 let was_closed = self.state.is_closed();
1693 if !was_closed {
1694 self.close_common();
1695 self.set_close_timer(now);
1696 self.close = true;
1697 self.state = State::Closed(state::Closed { reason });
1698 }
1699 }
1700
1701 pub fn datagrams(&mut self) -> Datagrams<'_> {
1703 Datagrams { conn: self }
1704 }
1705
1706 pub fn stats(&self) -> ConnectionStats {
1708 let mut stats = self.stats;
1709 stats.path.rtt = self.path.rtt.get();
1710 stats.path.cwnd = self.path.congestion.window();
1711 stats.path.current_mtu = self.path.mtud.current_mtu();
1712
1713 stats
1714 }
1715
1716 pub fn ping(&mut self) {
1720 self.spaces[self.highest_space].ping_pending = true;
1721 }
1722
1723 pub fn force_key_update(&mut self) {
1727 if !self.state.is_established() {
1728 debug!("ignoring forced key update in illegal state");
1729 return;
1730 }
1731 if self.prev_crypto.is_some() {
1732 debug!("ignoring redundant forced key update");
1735 return;
1736 }
1737 self.update_keys(None, false);
1738 }
1739
1740 #[doc(hidden)]
1742 #[deprecated]
1743 pub fn initiate_key_update(&mut self) {
1744 self.force_key_update();
1745 }
1746
1747 pub fn crypto_session(&self) -> &dyn crypto::Session {
1749 &*self.crypto
1750 }
1751
1752 pub fn is_handshaking(&self) -> bool {
1757 self.state.is_handshake()
1758 }
1759
1760 pub fn is_closed(&self) -> bool {
1768 self.state.is_closed()
1769 }
1770
1771 pub fn is_drained(&self) -> bool {
1776 self.state.is_drained()
1777 }
1778
1779 pub fn accepted_0rtt(&self) -> bool {
1783 self.accepted_0rtt
1784 }
1785
1786 pub fn has_0rtt(&self) -> bool {
1788 self.zero_rtt_enabled
1789 }
1790
1791 pub fn has_pending_retransmits(&self) -> bool {
1793 !self.spaces[SpaceId::Data].pending.is_empty(&self.streams)
1794 }
1795
1796 pub fn side(&self) -> Side {
1798 self.side.side()
1799 }
1800
1801 pub fn remote_address(&self) -> SocketAddr {
1803 self.path.remote
1804 }
1805
1806 pub fn local_ip(&self) -> Option<IpAddr> {
1816 self.local_ip
1817 }
1818
1819 pub fn rtt(&self) -> Duration {
1821 self.path.rtt.get()
1822 }
1823
1824 pub fn congestion_state(&self) -> &dyn Controller {
1826 self.path.congestion.as_ref()
1827 }
1828
1829 pub fn path_changed(&mut self, now: Instant) {
1840 self.path.reset(now, &self.config);
1841 }
1842
1843 pub fn set_max_concurrent_streams(&mut self, dir: Dir, count: VarInt) {
1848 self.streams.set_max_concurrent(dir, count);
1849 let pending = &mut self.spaces[SpaceId::Data].pending;
1852 self.streams.queue_max_stream_id(pending);
1853 }
1854
1855 pub fn max_concurrent_streams(&self, dir: Dir) -> u64 {
1861 self.streams.max_concurrent(dir)
1862 }
1863
1864 pub fn set_receive_window(&mut self, receive_window: VarInt) {
1866 if self.streams.set_receive_window(receive_window) {
1867 self.spaces[SpaceId::Data].pending.max_data = true;
1868 }
1869 }
1870
1871 pub fn set_address_discovery_enabled(&mut self, enabled: bool) {
1873 if let Some(ref mut state) = self.address_discovery_state {
1874 state.enabled = enabled;
1875 }
1876 }
1877
1878 pub fn address_discovery_enabled(&self) -> bool {
1880 self.address_discovery_state
1881 .as_ref()
1882 .is_some_and(|state| state.enabled)
1883 }
1884
1885 pub fn observed_address(&self) -> Option<SocketAddr> {
1890 self.address_discovery_state
1891 .as_ref()
1892 .and_then(|state| state.get_observed_address(0)) }
1894
1895 pub(crate) fn address_discovery_state(&self) -> Option<&AddressDiscoveryState> {
1897 self.address_discovery_state.as_ref()
1898 }
1899
1900 fn on_ack_received(
1901 &mut self,
1902 now: Instant,
1903 space: SpaceId,
1904 ack: frame::Ack,
1905 ) -> Result<(), TransportError> {
1906 if ack.largest >= self.spaces[space].next_packet_number {
1907 return Err(TransportError::PROTOCOL_VIOLATION("unsent packet acked"));
1908 }
1909 let new_largest = {
1910 let space = &mut self.spaces[space];
1911 if space.largest_acked_packet.is_none_or(|pn| ack.largest > pn) {
1912 space.largest_acked_packet = Some(ack.largest);
1913 if let Some(info) = space.sent_packets.get(&ack.largest) {
1914 space.largest_acked_packet_sent = info.time_sent;
1918 }
1919 true
1920 } else {
1921 false
1922 }
1923 };
1924
1925 let mut newly_acked = ArrayRangeSet::new();
1927 for range in ack.iter() {
1928 self.packet_number_filter.check_ack(space, range.clone())?;
1929 for (&pn, _) in self.spaces[space].sent_packets.range(range) {
1930 newly_acked.insert_one(pn);
1931 }
1932 }
1933
1934 if newly_acked.is_empty() {
1935 return Ok(());
1936 }
1937
1938 let mut ack_eliciting_acked = false;
1939 for packet in newly_acked.elts() {
1940 if let Some(info) = self.spaces[space].take(packet) {
1941 if let Some(acked) = info.largest_acked {
1942 self.spaces[space].pending_acks.subtract_below(acked);
1948 }
1949 ack_eliciting_acked |= info.ack_eliciting;
1950
1951 let mtu_updated = self.path.mtud.on_acked(space, packet, info.size);
1953 if mtu_updated {
1954 self.path
1955 .congestion
1956 .on_mtu_update(self.path.mtud.current_mtu());
1957 }
1958
1959 self.ack_frequency.on_acked(packet);
1961
1962 self.on_packet_acked(now, packet, info);
1963 }
1964 }
1965
1966 self.path.congestion.on_end_acks(
1967 now,
1968 self.path.in_flight.bytes,
1969 self.app_limited,
1970 self.spaces[space].largest_acked_packet,
1971 );
1972
1973 if new_largest && ack_eliciting_acked {
1974 let ack_delay = if space != SpaceId::Data {
1975 Duration::from_micros(0)
1976 } else {
1977 cmp::min(
1978 self.ack_frequency.peer_max_ack_delay,
1979 Duration::from_micros(ack.delay << self.peer_params.ack_delay_exponent.0),
1980 )
1981 };
1982 let rtt = instant_saturating_sub(now, self.spaces[space].largest_acked_packet_sent);
1983 self.path.rtt.update(ack_delay, rtt);
1984 if self.path.first_packet_after_rtt_sample.is_none() {
1985 self.path.first_packet_after_rtt_sample =
1986 Some((space, self.spaces[space].next_packet_number));
1987 }
1988 }
1989
1990 self.detect_lost_packets(now, space, true);
1992
1993 if self.peer_completed_address_validation() {
1994 self.pto_count = 0;
1995 }
1996
1997 if self.path.sending_ecn {
1999 if let Some(ecn) = ack.ecn {
2000 if new_largest {
2005 let sent = self.spaces[space].largest_acked_packet_sent;
2006 self.process_ecn(now, space, newly_acked.len() as u64, ecn, sent);
2007 }
2008 } else {
2009 debug!("ECN not acknowledged by peer");
2011 self.path.sending_ecn = false;
2012 }
2013 }
2014
2015 self.set_loss_detection_timer(now);
2016 Ok(())
2017 }
2018
2019 fn process_ecn(
2021 &mut self,
2022 now: Instant,
2023 space: SpaceId,
2024 newly_acked: u64,
2025 ecn: frame::EcnCounts,
2026 largest_sent_time: Instant,
2027 ) {
2028 match self.spaces[space].detect_ecn(newly_acked, ecn) {
2029 Err(e) => {
2030 debug!("halting ECN due to verification failure: {}", e);
2031 self.path.sending_ecn = false;
2032 self.spaces[space].ecn_feedback = frame::EcnCounts::ZERO;
2035 }
2036 Ok(false) => {}
2037 Ok(true) => {
2038 self.stats.path.congestion_events += 1;
2039 self.path
2040 .congestion
2041 .on_congestion_event(now, largest_sent_time, false, 0);
2042 }
2043 }
2044 }
2045
2046 fn on_packet_acked(&mut self, now: Instant, pn: u64, info: SentPacket) {
2049 self.remove_in_flight(pn, &info);
2050 if info.ack_eliciting && self.path.challenge.is_none() {
2051 self.path.congestion.on_ack(
2054 now,
2055 info.time_sent,
2056 info.size.into(),
2057 self.app_limited,
2058 &self.path.rtt,
2059 );
2060 }
2061
2062 if let Some(retransmits) = info.retransmits.get() {
2064 for (id, _) in retransmits.reset_stream.iter() {
2065 self.streams.reset_acked(*id);
2066 }
2067 }
2068
2069 for frame in info.stream_frames {
2070 self.streams.received_ack_of(frame);
2071 }
2072 }
2073
2074 fn set_key_discard_timer(&mut self, now: Instant, space: SpaceId) {
2075 let start = if self.zero_rtt_crypto.is_some() {
2076 now
2077 } else {
2078 self.prev_crypto
2079 .as_ref()
2080 .expect("no previous keys")
2081 .end_packet
2082 .as_ref()
2083 .expect("update not acknowledged yet")
2084 .1
2085 };
2086 self.timers
2087 .set(Timer::KeyDiscard, start + self.pto(space) * 3);
2088 }
2089
2090 fn on_loss_detection_timeout(&mut self, now: Instant) {
2091 if let Some((_, pn_space)) = self.loss_time_and_space() {
2092 self.detect_lost_packets(now, pn_space, false);
2094 self.set_loss_detection_timer(now);
2095 return;
2096 }
2097
2098 let (_, space) = match self.pto_time_and_space(now) {
2099 Some(x) => x,
2100 None => {
2101 error!("PTO expired while unset");
2102 return;
2103 }
2104 };
2105 trace!(
2106 in_flight = self.path.in_flight.bytes,
2107 count = self.pto_count,
2108 ?space,
2109 "PTO fired"
2110 );
2111
2112 let count = match self.path.in_flight.ack_eliciting {
2113 0 => {
2116 debug_assert!(!self.peer_completed_address_validation());
2117 1
2118 }
2119 _ => 2,
2121 };
2122 self.spaces[space].loss_probes = self.spaces[space].loss_probes.saturating_add(count);
2123 self.pto_count = self.pto_count.saturating_add(1);
2124 self.set_loss_detection_timer(now);
2125 }
2126
2127 fn detect_lost_packets(&mut self, now: Instant, pn_space: SpaceId, due_to_ack: bool) {
2128 let mut lost_packets = Vec::<u64>::new();
2129 let mut lost_mtu_probe = None;
2130 let in_flight_mtu_probe = self.path.mtud.in_flight_mtu_probe();
2131 let rtt = self.path.rtt.conservative();
2132 let loss_delay = cmp::max(rtt.mul_f32(self.config.time_threshold), TIMER_GRANULARITY);
2133
2134 let lost_send_time = now.checked_sub(loss_delay).unwrap();
2136 let largest_acked_packet = self.spaces[pn_space].largest_acked_packet.unwrap();
2137 let packet_threshold = self.config.packet_threshold as u64;
2138 let mut size_of_lost_packets = 0u64;
2139
2140 let congestion_period =
2144 self.pto(SpaceId::Data) * self.config.persistent_congestion_threshold;
2145 let mut persistent_congestion_start: Option<Instant> = None;
2146 let mut prev_packet = None;
2147 let mut in_persistent_congestion = false;
2148
2149 let space = &mut self.spaces[pn_space];
2150 space.loss_time = None;
2151
2152 for (&packet, info) in space.sent_packets.range(0..largest_acked_packet) {
2153 if prev_packet != Some(packet.wrapping_sub(1)) {
2154 persistent_congestion_start = None;
2156 }
2157
2158 if info.time_sent <= lost_send_time || largest_acked_packet >= packet + packet_threshold
2159 {
2160 if Some(packet) == in_flight_mtu_probe {
2161 lost_mtu_probe = in_flight_mtu_probe;
2164 } else {
2165 lost_packets.push(packet);
2166 size_of_lost_packets += info.size as u64;
2167 if info.ack_eliciting && due_to_ack {
2168 match persistent_congestion_start {
2169 Some(start) if info.time_sent - start > congestion_period => {
2172 in_persistent_congestion = true;
2173 }
2174 None if self
2176 .path
2177 .first_packet_after_rtt_sample
2178 .is_some_and(|x| x < (pn_space, packet)) =>
2179 {
2180 persistent_congestion_start = Some(info.time_sent);
2181 }
2182 _ => {}
2183 }
2184 }
2185 }
2186 } else {
2187 let next_loss_time = info.time_sent + loss_delay;
2188 space.loss_time = Some(
2189 space
2190 .loss_time
2191 .map_or(next_loss_time, |x| cmp::min(x, next_loss_time)),
2192 );
2193 persistent_congestion_start = None;
2194 }
2195
2196 prev_packet = Some(packet);
2197 }
2198
2199 if let Some(largest_lost) = lost_packets.last().cloned() {
2201 let old_bytes_in_flight = self.path.in_flight.bytes;
2202 let largest_lost_sent = self.spaces[pn_space].sent_packets[&largest_lost].time_sent;
2203 self.lost_packets += lost_packets.len() as u64;
2204 self.stats.path.lost_packets += lost_packets.len() as u64;
2205 self.stats.path.lost_bytes += size_of_lost_packets;
2206 trace!(
2207 "packets lost: {:?}, bytes lost: {}",
2208 lost_packets, size_of_lost_packets
2209 );
2210
2211 for &packet in &lost_packets {
2212 let info = self.spaces[pn_space].take(packet).unwrap(); self.remove_in_flight(packet, &info);
2214 for frame in info.stream_frames {
2215 self.streams.retransmit(frame);
2216 }
2217 self.spaces[pn_space].pending |= info.retransmits;
2218 self.path.mtud.on_non_probe_lost(packet, info.size);
2219 }
2220
2221 if self.path.mtud.black_hole_detected(now) {
2222 self.stats.path.black_holes_detected += 1;
2223 self.path
2224 .congestion
2225 .on_mtu_update(self.path.mtud.current_mtu());
2226 if let Some(max_datagram_size) = self.datagrams().max_size() {
2227 self.datagrams.drop_oversized(max_datagram_size);
2228 }
2229 }
2230
2231 let lost_ack_eliciting = old_bytes_in_flight != self.path.in_flight.bytes;
2233
2234 if lost_ack_eliciting {
2235 self.stats.path.congestion_events += 1;
2236 self.path.congestion.on_congestion_event(
2237 now,
2238 largest_lost_sent,
2239 in_persistent_congestion,
2240 size_of_lost_packets,
2241 );
2242 }
2243 }
2244
2245 if let Some(packet) = lost_mtu_probe {
2247 let info = self.spaces[SpaceId::Data].take(packet).unwrap(); self.remove_in_flight(packet, &info);
2249 self.path.mtud.on_probe_lost();
2250 self.stats.path.lost_plpmtud_probes += 1;
2251 }
2252 }
2253
2254 fn loss_time_and_space(&self) -> Option<(Instant, SpaceId)> {
2255 SpaceId::iter()
2256 .filter_map(|id| Some((self.spaces[id].loss_time?, id)))
2257 .min_by_key(|&(time, _)| time)
2258 }
2259
2260 fn pto_time_and_space(&self, now: Instant) -> Option<(Instant, SpaceId)> {
2261 let backoff = 2u32.pow(self.pto_count.min(MAX_BACKOFF_EXPONENT));
2262 let mut duration = self.path.rtt.pto_base() * backoff;
2263
2264 if self.path.in_flight.ack_eliciting == 0 {
2265 debug_assert!(!self.peer_completed_address_validation());
2266 let space = match self.highest_space {
2267 SpaceId::Handshake => SpaceId::Handshake,
2268 _ => SpaceId::Initial,
2269 };
2270 return Some((now + duration, space));
2271 }
2272
2273 let mut result = None;
2274 for space in SpaceId::iter() {
2275 if self.spaces[space].in_flight == 0 {
2276 continue;
2277 }
2278 if space == SpaceId::Data {
2279 if self.is_handshaking() {
2281 return result;
2282 }
2283 duration += self.ack_frequency.max_ack_delay_for_pto() * backoff;
2285 }
2286 let last_ack_eliciting = match self.spaces[space].time_of_last_ack_eliciting_packet {
2287 Some(time) => time,
2288 None => continue,
2289 };
2290 let pto = last_ack_eliciting + duration;
2291 if result.is_none_or(|(earliest_pto, _)| pto < earliest_pto) {
2292 result = Some((pto, space));
2293 }
2294 }
2295 result
2296 }
2297
2298 fn peer_completed_address_validation(&self) -> bool {
2299 if self.side.is_server() || self.state.is_closed() {
2300 return true;
2301 }
2302 self.spaces[SpaceId::Handshake]
2305 .largest_acked_packet
2306 .is_some()
2307 || self.spaces[SpaceId::Data].largest_acked_packet.is_some()
2308 || (self.spaces[SpaceId::Data].crypto.is_some()
2309 && self.spaces[SpaceId::Handshake].crypto.is_none())
2310 }
2311
2312 fn set_loss_detection_timer(&mut self, now: Instant) {
2313 if self.state.is_closed() {
2314 return;
2318 }
2319
2320 if let Some((loss_time, _)) = self.loss_time_and_space() {
2321 self.timers.set(Timer::LossDetection, loss_time);
2323 return;
2324 }
2325
2326 if self.path.anti_amplification_blocked(1) {
2327 self.timers.stop(Timer::LossDetection);
2329 return;
2330 }
2331
2332 if self.path.in_flight.ack_eliciting == 0 && self.peer_completed_address_validation() {
2333 self.timers.stop(Timer::LossDetection);
2336 return;
2337 }
2338
2339 if let Some((timeout, _)) = self.pto_time_and_space(now) {
2342 self.timers.set(Timer::LossDetection, timeout);
2343 } else {
2344 self.timers.stop(Timer::LossDetection);
2345 }
2346 }
2347
2348 fn pto(&self, space: SpaceId) -> Duration {
2350 let max_ack_delay = match space {
2351 SpaceId::Initial | SpaceId::Handshake => Duration::ZERO,
2352 SpaceId::Data => self.ack_frequency.max_ack_delay_for_pto(),
2353 };
2354 self.path.rtt.pto_base() + max_ack_delay
2355 }
2356
2357 fn on_packet_authenticated(
2358 &mut self,
2359 now: Instant,
2360 space_id: SpaceId,
2361 ecn: Option<EcnCodepoint>,
2362 packet: Option<u64>,
2363 spin: bool,
2364 is_1rtt: bool,
2365 ) {
2366 self.total_authed_packets += 1;
2367 self.reset_keep_alive(now);
2368 self.reset_idle_timeout(now, space_id);
2369 self.permit_idle_reset = true;
2370 self.receiving_ecn |= ecn.is_some();
2371 if let Some(x) = ecn {
2372 let space = &mut self.spaces[space_id];
2373 space.ecn_counters += x;
2374
2375 if x.is_ce() {
2376 space.pending_acks.set_immediate_ack_required();
2377 }
2378 }
2379
2380 let packet = match packet {
2381 Some(x) => x,
2382 None => return,
2383 };
2384 if self.side.is_server() {
2385 if self.spaces[SpaceId::Initial].crypto.is_some() && space_id == SpaceId::Handshake {
2386 self.discard_space(now, SpaceId::Initial);
2388 }
2389 if self.zero_rtt_crypto.is_some() && is_1rtt {
2390 self.set_key_discard_timer(now, space_id)
2392 }
2393 }
2394 let space = &mut self.spaces[space_id];
2395 space.pending_acks.insert_one(packet, now);
2396 if packet >= space.rx_packet {
2397 space.rx_packet = packet;
2398 self.spin = self.side.is_client() ^ spin;
2400 }
2401 }
2402
2403 fn reset_idle_timeout(&mut self, now: Instant, space: SpaceId) {
2404 let timeout = match self.idle_timeout {
2405 None => return,
2406 Some(dur) => dur,
2407 };
2408 if self.state.is_closed() {
2409 self.timers.stop(Timer::Idle);
2410 return;
2411 }
2412 let dt = cmp::max(timeout, 3 * self.pto(space));
2413 self.timers.set(Timer::Idle, now + dt);
2414 }
2415
2416 fn reset_keep_alive(&mut self, now: Instant) {
2417 let interval = match self.config.keep_alive_interval {
2418 Some(x) if self.state.is_established() => x,
2419 _ => return,
2420 };
2421 self.timers.set(Timer::KeepAlive, now + interval);
2422 }
2423
2424 fn reset_cid_retirement(&mut self) {
2425 if let Some(t) = self.local_cid_state.next_timeout() {
2426 self.timers.set(Timer::PushNewCid, t);
2427 }
2428 }
2429
2430 pub(crate) fn handle_first_packet(
2435 &mut self,
2436 now: Instant,
2437 remote: SocketAddr,
2438 ecn: Option<EcnCodepoint>,
2439 packet_number: u64,
2440 packet: InitialPacket,
2441 remaining: Option<BytesMut>,
2442 ) -> Result<(), ConnectionError> {
2443 let span = trace_span!("first recv");
2444 let _guard = span.enter();
2445 debug_assert!(self.side.is_server());
2446 let len = packet.header_data.len() + packet.payload.len();
2447 self.path.total_recvd = len as u64;
2448
2449 match self.state {
2450 State::Handshake(ref mut state) => {
2451 state.expected_token = packet.header.token.clone();
2452 }
2453 _ => unreachable!("first packet must be delivered in Handshake state"),
2454 }
2455
2456 self.on_packet_authenticated(
2457 now,
2458 SpaceId::Initial,
2459 ecn,
2460 Some(packet_number),
2461 false,
2462 false,
2463 );
2464
2465 self.process_decrypted_packet(now, remote, Some(packet_number), packet.into())?;
2466 if let Some(data) = remaining {
2467 self.handle_coalesced(now, remote, ecn, data);
2468 }
2469
2470 #[cfg(feature = "__qlog")]
2471 self.emit_qlog_recovery_metrics(now);
2472
2473 Ok(())
2474 }
2475
2476 fn init_0rtt(&mut self) {
2477 let (header, packet) = match self.crypto.early_crypto() {
2478 Some(x) => x,
2479 None => return,
2480 };
2481 if self.side.is_client() {
2482 match self.crypto.transport_parameters() {
2483 Ok(params) => {
2484 let params = params
2485 .expect("crypto layer didn't supply transport parameters with ticket");
2486 let params = TransportParameters {
2488 initial_src_cid: None,
2489 original_dst_cid: None,
2490 preferred_address: None,
2491 retry_src_cid: None,
2492 stateless_reset_token: None,
2493 min_ack_delay: None,
2494 ack_delay_exponent: TransportParameters::default().ack_delay_exponent,
2495 max_ack_delay: TransportParameters::default().max_ack_delay,
2496 ..params
2497 };
2498 self.set_peer_params(params);
2499 }
2500 Err(e) => {
2501 error!("session ticket has malformed transport parameters: {}", e);
2502 return;
2503 }
2504 }
2505 }
2506 trace!("0-RTT enabled");
2507 self.zero_rtt_enabled = true;
2508 self.zero_rtt_crypto = Some(ZeroRttCrypto { header, packet });
2509 }
2510
2511 fn read_crypto(
2512 &mut self,
2513 space: SpaceId,
2514 crypto: &frame::Crypto,
2515 payload_len: usize,
2516 ) -> Result<(), TransportError> {
2517 let expected = if !self.state.is_handshake() {
2518 SpaceId::Data
2519 } else if self.highest_space == SpaceId::Initial {
2520 SpaceId::Initial
2521 } else {
2522 SpaceId::Handshake
2525 };
2526 debug_assert!(space <= expected, "received out-of-order CRYPTO data");
2530
2531 let end = crypto.offset + crypto.data.len() as u64;
2532 if space < expected && end > self.spaces[space].crypto_stream.bytes_read() {
2533 warn!(
2534 "received new {:?} CRYPTO data when expecting {:?}",
2535 space, expected
2536 );
2537 return Err(TransportError::PROTOCOL_VIOLATION(
2538 "new data at unexpected encryption level",
2539 ));
2540 }
2541
2542 #[cfg(feature = "pqc")]
2544 {
2545 self.pqc_state.detect_pqc_from_crypto(&crypto.data, space);
2546
2547 if self.pqc_state.should_trigger_mtu_discovery() {
2549 self.path
2551 .mtud
2552 .reset(self.pqc_state.min_initial_size(), self.config.min_mtu);
2553 trace!("Triggered MTU discovery for PQC handshake");
2554 }
2555 }
2556
2557 let space = &mut self.spaces[space];
2558 let max = end.saturating_sub(space.crypto_stream.bytes_read());
2559 if max > self.config.crypto_buffer_size as u64 {
2560 return Err(TransportError::CRYPTO_BUFFER_EXCEEDED(""));
2561 }
2562
2563 space
2564 .crypto_stream
2565 .insert(crypto.offset, crypto.data.clone(), payload_len);
2566 while let Some(chunk) = space.crypto_stream.read(usize::MAX, true) {
2567 trace!("consumed {} CRYPTO bytes", chunk.bytes.len());
2568 if self.crypto.read_handshake(&chunk.bytes)? {
2569 self.events.push_back(Event::HandshakeDataReady);
2570 }
2571 }
2572
2573 Ok(())
2574 }
2575
2576 fn write_crypto(&mut self) {
2577 loop {
2578 let space = self.highest_space;
2579 let mut outgoing = Vec::new();
2580 if let Some(crypto) = self.crypto.write_handshake(&mut outgoing) {
2581 match space {
2582 SpaceId::Initial => {
2583 self.upgrade_crypto(SpaceId::Handshake, crypto);
2584 }
2585 SpaceId::Handshake => {
2586 self.upgrade_crypto(SpaceId::Data, crypto);
2587 }
2588 _ => unreachable!("got updated secrets during 1-RTT"),
2589 }
2590 }
2591 if outgoing.is_empty() {
2592 if space == self.highest_space {
2593 break;
2594 } else {
2595 continue;
2597 }
2598 }
2599 let offset = self.spaces[space].crypto_offset;
2600 let outgoing = Bytes::from(outgoing);
2601 if let State::Handshake(ref mut state) = self.state {
2602 if space == SpaceId::Initial && offset == 0 && self.side.is_client() {
2603 state.client_hello = Some(outgoing.clone());
2604 }
2605 }
2606 self.spaces[space].crypto_offset += outgoing.len() as u64;
2607 trace!("wrote {} {:?} CRYPTO bytes", outgoing.len(), space);
2608
2609 #[cfg(feature = "pqc")]
2611 let use_pqc_fragmentation = self.pqc_state.using_pqc && outgoing.len() > 1200;
2612 #[cfg(not(feature = "pqc"))]
2613 let use_pqc_fragmentation = false;
2614
2615 if use_pqc_fragmentation {
2616 #[cfg(feature = "pqc")]
2618 {
2619 let frames = self.pqc_state.packet_handler.fragment_crypto_data(
2620 &outgoing,
2621 offset,
2622 self.pqc_state.min_initial_size() as usize,
2623 );
2624 for frame in frames {
2625 self.spaces[space].pending.crypto.push_back(frame);
2626 }
2627 }
2628 } else {
2629 self.spaces[space].pending.crypto.push_back(frame::Crypto {
2631 offset,
2632 data: outgoing,
2633 });
2634 }
2635 }
2636 }
2637
2638 fn upgrade_crypto(&mut self, space: SpaceId, crypto: Keys) {
2640 debug_assert!(
2641 self.spaces[space].crypto.is_none(),
2642 "already reached packet space {space:?}"
2643 );
2644 trace!("{:?} keys ready", space);
2645 if space == SpaceId::Data {
2646 self.next_crypto = Some(
2648 self.crypto
2649 .next_1rtt_keys()
2650 .expect("handshake should be complete"),
2651 );
2652 }
2653
2654 self.spaces[space].crypto = Some(crypto);
2655 debug_assert!(space as usize > self.highest_space as usize);
2656 self.highest_space = space;
2657 if space == SpaceId::Data && self.side.is_client() {
2658 self.zero_rtt_crypto = None;
2660 }
2661 }
2662
2663 fn discard_space(&mut self, now: Instant, space_id: SpaceId) {
2664 debug_assert!(space_id != SpaceId::Data);
2665 trace!("discarding {:?} keys", space_id);
2666 if space_id == SpaceId::Initial {
2667 if let ConnectionSide::Client { token, .. } = &mut self.side {
2669 *token = Bytes::new();
2670 }
2671 }
2672 let space = &mut self.spaces[space_id];
2673 space.crypto = None;
2674 space.time_of_last_ack_eliciting_packet = None;
2675 space.loss_time = None;
2676 space.in_flight = 0;
2677 let sent_packets = mem::take(&mut space.sent_packets);
2678 for (pn, packet) in sent_packets.into_iter() {
2679 self.remove_in_flight(pn, &packet);
2680 }
2681 self.set_loss_detection_timer(now)
2682 }
2683
2684 fn handle_coalesced(
2685 &mut self,
2686 now: Instant,
2687 remote: SocketAddr,
2688 ecn: Option<EcnCodepoint>,
2689 data: BytesMut,
2690 ) {
2691 self.path.total_recvd = self.path.total_recvd.saturating_add(data.len() as u64);
2692 let mut remaining = Some(data);
2693 while let Some(data) = remaining {
2694 match PartialDecode::new(
2695 data,
2696 &FixedLengthConnectionIdParser::new(self.local_cid_state.cid_len()),
2697 &[self.version],
2698 self.endpoint_config.grease_quic_bit,
2699 ) {
2700 Ok((partial_decode, rest)) => {
2701 remaining = rest;
2702 self.handle_decode(now, remote, ecn, partial_decode);
2703 }
2704 Err(e) => {
2705 trace!("malformed header: {}", e);
2706 return;
2707 }
2708 }
2709 }
2710 }
2711
2712 fn handle_decode(
2713 &mut self,
2714 now: Instant,
2715 remote: SocketAddr,
2716 ecn: Option<EcnCodepoint>,
2717 partial_decode: PartialDecode,
2718 ) {
2719 if let Some(decoded) = packet_crypto::unprotect_header(
2720 partial_decode,
2721 &self.spaces,
2722 self.zero_rtt_crypto.as_ref(),
2723 self.peer_params.stateless_reset_token,
2724 ) {
2725 self.handle_packet(now, remote, ecn, decoded.packet, decoded.stateless_reset);
2726 }
2727 }
2728
2729 fn handle_packet(
2730 &mut self,
2731 now: Instant,
2732 remote: SocketAddr,
2733 ecn: Option<EcnCodepoint>,
2734 packet: Option<Packet>,
2735 stateless_reset: bool,
2736 ) {
2737 self.stats.udp_rx.ios += 1;
2738 if let Some(ref packet) = packet {
2739 trace!(
2740 "got {:?} packet ({} bytes) from {} using id {}",
2741 packet.header.space(),
2742 packet.payload.len() + packet.header_data.len(),
2743 remote,
2744 packet.header.dst_cid(),
2745 );
2746
2747 #[cfg(feature = "trace")]
2749 {
2750 use crate::trace_packet_received;
2751 let packet_size = packet.payload.len() + packet.header_data.len();
2753 trace_packet_received!(
2754 &self.event_log,
2755 self.trace_context.trace_id(),
2756 packet_size as u32,
2757 0 );
2759 }
2760 }
2761
2762 if self.is_handshaking() && remote != self.path.remote {
2763 debug!("discarding packet with unexpected remote during handshake");
2764 return;
2765 }
2766
2767 let was_closed = self.state.is_closed();
2768 let was_drained = self.state.is_drained();
2769
2770 let decrypted = match packet {
2771 None => Err(None),
2772 Some(mut packet) => self
2773 .decrypt_packet(now, &mut packet)
2774 .map(move |number| (packet, number)),
2775 };
2776 let result = match decrypted {
2777 _ if stateless_reset => {
2778 debug!("got stateless reset");
2779 Err(ConnectionError::Reset)
2780 }
2781 Err(Some(e)) => {
2782 warn!("illegal packet: {}", e);
2783 Err(e.into())
2784 }
2785 Err(None) => {
2786 debug!("failed to authenticate packet");
2787 self.authentication_failures += 1;
2788 let integrity_limit = self.spaces[self.highest_space]
2789 .crypto
2790 .as_ref()
2791 .unwrap()
2792 .packet
2793 .local
2794 .integrity_limit();
2795 if self.authentication_failures > integrity_limit {
2796 Err(TransportError::AEAD_LIMIT_REACHED("integrity limit violated").into())
2797 } else {
2798 return;
2799 }
2800 }
2801 Ok((packet, number)) => {
2802 let span = match number {
2803 Some(pn) => trace_span!("recv", space = ?packet.header.space(), pn),
2804 None => trace_span!("recv", space = ?packet.header.space()),
2805 };
2806 let _guard = span.enter();
2807
2808 let is_duplicate = |n| self.spaces[packet.header.space()].dedup.insert(n);
2809 if number.is_some_and(is_duplicate) {
2810 debug!("discarding possible duplicate packet");
2811 return;
2812 } else if self.state.is_handshake() && packet.header.is_short() {
2813 trace!("dropping short packet during handshake");
2815 return;
2816 } else {
2817 if let Header::Initial(InitialHeader { ref token, .. }) = packet.header {
2818 if let State::Handshake(ref hs) = self.state {
2819 if self.side.is_server() && token != &hs.expected_token {
2820 warn!("discarding Initial with invalid retry token");
2824 return;
2825 }
2826 }
2827 }
2828
2829 if !self.state.is_closed() {
2830 let spin = match packet.header {
2831 Header::Short { spin, .. } => spin,
2832 _ => false,
2833 };
2834 self.on_packet_authenticated(
2835 now,
2836 packet.header.space(),
2837 ecn,
2838 number,
2839 spin,
2840 packet.header.is_1rtt(),
2841 );
2842 }
2843
2844 self.process_decrypted_packet(now, remote, number, packet)
2845 }
2846 }
2847 };
2848
2849 if let Err(conn_err) = result {
2851 self.error = Some(conn_err.clone());
2852 self.state = match conn_err {
2853 ConnectionError::ApplicationClosed(reason) => State::closed(reason),
2854 ConnectionError::ConnectionClosed(reason) => State::closed(reason),
2855 ConnectionError::Reset
2856 | ConnectionError::TransportError(TransportError {
2857 code: TransportErrorCode::AEAD_LIMIT_REACHED,
2858 ..
2859 }) => State::Drained,
2860 ConnectionError::TimedOut => {
2861 unreachable!("timeouts aren't generated by packet processing");
2862 }
2863 ConnectionError::TransportError(err) => {
2864 debug!("closing connection due to transport error: {}", err);
2865 State::closed(err)
2866 }
2867 ConnectionError::VersionMismatch => State::Draining,
2868 ConnectionError::LocallyClosed => {
2869 unreachable!("LocallyClosed isn't generated by packet processing");
2870 }
2871 ConnectionError::CidsExhausted => {
2872 unreachable!("CidsExhausted isn't generated by packet processing");
2873 }
2874 };
2875 }
2876
2877 if !was_closed && self.state.is_closed() {
2878 self.close_common();
2879 if !self.state.is_drained() {
2880 self.set_close_timer(now);
2881 }
2882 }
2883 if !was_drained && self.state.is_drained() {
2884 self.endpoint_events.push_back(EndpointEventInner::Drained);
2885 self.timers.stop(Timer::Close);
2888 }
2889
2890 if let State::Closed(_) = self.state {
2892 self.close = remote == self.path.remote;
2893 }
2894 }
2895
2896 fn process_decrypted_packet(
2897 &mut self,
2898 now: Instant,
2899 remote: SocketAddr,
2900 number: Option<u64>,
2901 packet: Packet,
2902 ) -> Result<(), ConnectionError> {
2903 let state = match self.state {
2904 State::Established => {
2905 match packet.header.space() {
2906 SpaceId::Data => self.process_payload(now, remote, number.unwrap(), packet)?,
2907 _ if packet.header.has_frames() => self.process_early_payload(now, packet)?,
2908 _ => {
2909 trace!("discarding unexpected pre-handshake packet");
2910 }
2911 }
2912 return Ok(());
2913 }
2914 State::Closed(_) => {
2915 for result in frame::Iter::new(packet.payload.freeze())? {
2916 let frame = match result {
2917 Ok(frame) => frame,
2918 Err(err) => {
2919 debug!("frame decoding error: {err:?}");
2920 continue;
2921 }
2922 };
2923
2924 if let Frame::Padding = frame {
2925 continue;
2926 };
2927
2928 self.stats.frame_rx.record(&frame);
2929
2930 if let Frame::Close(_) = frame {
2931 trace!("draining");
2932 self.state = State::Draining;
2933 break;
2934 }
2935 }
2936 return Ok(());
2937 }
2938 State::Draining | State::Drained => return Ok(()),
2939 State::Handshake(ref mut state) => state,
2940 };
2941
2942 match packet.header {
2943 Header::Retry {
2944 src_cid: rem_cid, ..
2945 } => {
2946 if self.side.is_server() {
2947 return Err(TransportError::PROTOCOL_VIOLATION("client sent Retry").into());
2948 }
2949
2950 if self.total_authed_packets > 1
2951 || packet.payload.len() <= 16 || !self.crypto.is_valid_retry(
2953 &self.rem_cids.active(),
2954 &packet.header_data,
2955 &packet.payload,
2956 )
2957 {
2958 trace!("discarding invalid Retry");
2959 return Ok(());
2967 }
2968
2969 trace!("retrying with CID {}", rem_cid);
2970 let client_hello = state.client_hello.take().unwrap();
2971 self.retry_src_cid = Some(rem_cid);
2972 self.rem_cids.update_initial_cid(rem_cid);
2973 self.rem_handshake_cid = rem_cid;
2974
2975 let space = &mut self.spaces[SpaceId::Initial];
2976 if let Some(info) = space.take(0) {
2977 self.on_packet_acked(now, 0, info);
2978 };
2979
2980 self.discard_space(now, SpaceId::Initial); self.spaces[SpaceId::Initial] = PacketSpace {
2982 crypto: Some(self.crypto.initial_keys(&rem_cid, self.side.side())),
2983 next_packet_number: self.spaces[SpaceId::Initial].next_packet_number,
2984 crypto_offset: client_hello.len() as u64,
2985 ..PacketSpace::new(now)
2986 };
2987 self.spaces[SpaceId::Initial]
2988 .pending
2989 .crypto
2990 .push_back(frame::Crypto {
2991 offset: 0,
2992 data: client_hello,
2993 });
2994
2995 let zero_rtt = mem::take(&mut self.spaces[SpaceId::Data].sent_packets);
2997 for (pn, info) in zero_rtt {
2998 self.remove_in_flight(pn, &info);
2999 self.spaces[SpaceId::Data].pending |= info.retransmits;
3000 }
3001 self.streams.retransmit_all_for_0rtt();
3002
3003 let token_len = packet.payload.len() - 16;
3004 let ConnectionSide::Client { ref mut token, .. } = self.side else {
3005 unreachable!("we already short-circuited if we're server");
3006 };
3007 *token = packet.payload.freeze().split_to(token_len);
3008 self.state = State::Handshake(state::Handshake {
3009 expected_token: Bytes::new(),
3010 rem_cid_set: false,
3011 client_hello: None,
3012 });
3013 Ok(())
3014 }
3015 Header::Long {
3016 ty: LongType::Handshake,
3017 src_cid: rem_cid,
3018 ..
3019 } => {
3020 if rem_cid != self.rem_handshake_cid {
3021 debug!(
3022 "discarding packet with mismatched remote CID: {} != {}",
3023 self.rem_handshake_cid, rem_cid
3024 );
3025 return Ok(());
3026 }
3027 self.on_path_validated();
3028
3029 self.process_early_payload(now, packet)?;
3030 if self.state.is_closed() {
3031 return Ok(());
3032 }
3033
3034 if self.crypto.is_handshaking() {
3035 trace!("handshake ongoing");
3036 return Ok(());
3037 }
3038
3039 if self.side.is_client() {
3040 let params =
3042 self.crypto
3043 .transport_parameters()?
3044 .ok_or_else(|| TransportError {
3045 code: TransportErrorCode::crypto(0x6d),
3046 frame: None,
3047 reason: "transport parameters missing".into(),
3048 })?;
3049
3050 if self.has_0rtt() {
3051 if !self.crypto.early_data_accepted().unwrap() {
3052 debug_assert!(self.side.is_client());
3053 debug!("0-RTT rejected");
3054 self.accepted_0rtt = false;
3055 self.streams.zero_rtt_rejected();
3056
3057 self.spaces[SpaceId::Data].pending = Retransmits::default();
3059
3060 let sent_packets =
3062 mem::take(&mut self.spaces[SpaceId::Data].sent_packets);
3063 for (pn, packet) in sent_packets {
3064 self.remove_in_flight(pn, &packet);
3065 }
3066 } else {
3067 self.accepted_0rtt = true;
3068 params.validate_resumption_from(&self.peer_params)?;
3069 }
3070 }
3071 if let Some(token) = params.stateless_reset_token {
3072 self.endpoint_events
3073 .push_back(EndpointEventInner::ResetToken(self.path.remote, token));
3074 }
3075 self.handle_peer_params(params)?;
3076 self.issue_first_cids(now);
3077 } else {
3078 self.spaces[SpaceId::Data].pending.handshake_done = true;
3080 self.discard_space(now, SpaceId::Handshake);
3081 }
3082
3083 self.events.push_back(Event::Connected);
3084 self.state = State::Established;
3085 trace!("established");
3086 Ok(())
3087 }
3088 Header::Initial(InitialHeader {
3089 src_cid: rem_cid, ..
3090 }) => {
3091 if !state.rem_cid_set {
3092 trace!("switching remote CID to {}", rem_cid);
3093 let mut state = state.clone();
3094 self.rem_cids.update_initial_cid(rem_cid);
3095 self.rem_handshake_cid = rem_cid;
3096 self.orig_rem_cid = rem_cid;
3097 state.rem_cid_set = true;
3098 self.state = State::Handshake(state);
3099 } else if rem_cid != self.rem_handshake_cid {
3100 debug!(
3101 "discarding packet with mismatched remote CID: {} != {}",
3102 self.rem_handshake_cid, rem_cid
3103 );
3104 return Ok(());
3105 }
3106
3107 let starting_space = self.highest_space;
3108 self.process_early_payload(now, packet)?;
3109
3110 if self.side.is_server()
3111 && starting_space == SpaceId::Initial
3112 && self.highest_space != SpaceId::Initial
3113 {
3114 let params =
3115 self.crypto
3116 .transport_parameters()?
3117 .ok_or_else(|| TransportError {
3118 code: TransportErrorCode::crypto(0x6d),
3119 frame: None,
3120 reason: "transport parameters missing".into(),
3121 })?;
3122 self.handle_peer_params(params)?;
3123 self.issue_first_cids(now);
3124 self.init_0rtt();
3125 }
3126 Ok(())
3127 }
3128 Header::Long {
3129 ty: LongType::ZeroRtt,
3130 ..
3131 } => {
3132 self.process_payload(now, remote, number.unwrap(), packet)?;
3133 Ok(())
3134 }
3135 Header::VersionNegotiate { .. } => {
3136 if self.total_authed_packets > 1 {
3137 return Ok(());
3138 }
3139 let supported = packet
3140 .payload
3141 .chunks(4)
3142 .any(|x| match <[u8; 4]>::try_from(x) {
3143 Ok(version) => self.version == u32::from_be_bytes(version),
3144 Err(_) => false,
3145 });
3146 if supported {
3147 return Ok(());
3148 }
3149 debug!("remote doesn't support our version");
3150 Err(ConnectionError::VersionMismatch)
3151 }
3152 Header::Short { .. } => unreachable!(
3153 "short packets received during handshake are discarded in handle_packet"
3154 ),
3155 }
3156 }
3157
3158 fn process_early_payload(
3160 &mut self,
3161 now: Instant,
3162 packet: Packet,
3163 ) -> Result<(), TransportError> {
3164 debug_assert_ne!(packet.header.space(), SpaceId::Data);
3165 let payload_len = packet.payload.len();
3166 let mut ack_eliciting = false;
3167 for result in frame::Iter::new(packet.payload.freeze())? {
3168 let frame = result?;
3169 let span = match frame {
3170 Frame::Padding => continue,
3171 _ => Some(trace_span!("frame", ty = %frame.ty())),
3172 };
3173
3174 self.stats.frame_rx.record(&frame);
3175
3176 let _guard = span.as_ref().map(|x| x.enter());
3177 ack_eliciting |= frame.is_ack_eliciting();
3178
3179 match frame {
3181 Frame::Padding | Frame::Ping => {}
3182 Frame::Crypto(frame) => {
3183 self.read_crypto(packet.header.space(), &frame, payload_len)?;
3184 }
3185 Frame::Ack(ack) => {
3186 self.on_ack_received(now, packet.header.space(), ack)?;
3187 }
3188 Frame::Close(reason) => {
3189 self.error = Some(reason.into());
3190 self.state = State::Draining;
3191 return Ok(());
3192 }
3193 _ => {
3194 let mut err =
3195 TransportError::PROTOCOL_VIOLATION("illegal frame type in handshake");
3196 err.frame = Some(frame.ty());
3197 return Err(err);
3198 }
3199 }
3200 }
3201
3202 if ack_eliciting {
3203 self.spaces[packet.header.space()]
3205 .pending_acks
3206 .set_immediate_ack_required();
3207 }
3208
3209 self.write_crypto();
3210 Ok(())
3211 }
3212
3213 fn process_payload(
3214 &mut self,
3215 now: Instant,
3216 remote: SocketAddr,
3217 number: u64,
3218 packet: Packet,
3219 ) -> Result<(), TransportError> {
3220 let payload = packet.payload.freeze();
3221 let mut is_probing_packet = true;
3222 let mut close = None;
3223 let payload_len = payload.len();
3224 let mut ack_eliciting = false;
3225 for result in frame::Iter::new(payload)? {
3226 let frame = result?;
3227 let span = match frame {
3228 Frame::Padding => continue,
3229 _ => Some(trace_span!("frame", ty = %frame.ty())),
3230 };
3231
3232 self.stats.frame_rx.record(&frame);
3233 match &frame {
3236 Frame::Crypto(f) => {
3237 trace!(offset = f.offset, len = f.data.len(), "got crypto frame");
3238 }
3239 Frame::Stream(f) => {
3240 trace!(id = %f.id, offset = f.offset, len = f.data.len(), fin = f.fin, "got stream frame");
3241 }
3242 Frame::Datagram(f) => {
3243 trace!(len = f.data.len(), "got datagram frame");
3244 }
3245 f => {
3246 trace!("got frame {:?}", f);
3247 }
3248 }
3249
3250 let _guard = span.as_ref().map(|x| x.enter());
3251 if packet.header.is_0rtt() {
3252 match frame {
3253 Frame::Crypto(_) | Frame::Close(Close::Application(_)) => {
3254 return Err(TransportError::PROTOCOL_VIOLATION(
3255 "illegal frame type in 0-RTT",
3256 ));
3257 }
3258 _ => {}
3259 }
3260 }
3261 ack_eliciting |= frame.is_ack_eliciting();
3262
3263 match frame {
3265 Frame::Padding
3266 | Frame::PathChallenge(_)
3267 | Frame::PathResponse(_)
3268 | Frame::NewConnectionId(_) => {}
3269 _ => {
3270 is_probing_packet = false;
3271 }
3272 }
3273 match frame {
3274 Frame::Crypto(frame) => {
3275 self.read_crypto(SpaceId::Data, &frame, payload_len)?;
3276 }
3277 Frame::Stream(frame) => {
3278 if self.streams.received(frame, payload_len)?.should_transmit() {
3279 self.spaces[SpaceId::Data].pending.max_data = true;
3280 }
3281 }
3282 Frame::Ack(ack) => {
3283 self.on_ack_received(now, SpaceId::Data, ack)?;
3284 }
3285 Frame::Padding | Frame::Ping => {}
3286 Frame::Close(reason) => {
3287 close = Some(reason);
3288 }
3289 Frame::PathChallenge(token) => {
3290 self.path_responses.push(number, token, remote);
3291 if remote == self.path.remote {
3292 match self.peer_supports_ack_frequency() {
3295 true => self.immediate_ack(),
3296 false => self.ping(),
3297 }
3298 }
3299 }
3300 Frame::PathResponse(token) => {
3301 if self.path.challenge == Some(token) && remote == self.path.remote {
3302 trace!("new path validated");
3303 self.timers.stop(Timer::PathValidation);
3304 self.path.challenge = None;
3305 self.path.validated = true;
3306 if let Some((_, ref mut prev_path)) = self.prev_path {
3307 prev_path.challenge = None;
3308 prev_path.challenge_pending = false;
3309 }
3310 self.on_path_validated();
3311 } else if let Some(nat_traversal) = &mut self.nat_traversal {
3312 match nat_traversal.handle_validation_success(remote, token, now) {
3314 Ok(sequence) => {
3315 trace!(
3316 "NAT traversal candidate {} validated for sequence {}",
3317 remote, sequence
3318 );
3319
3320 if nat_traversal.handle_coordination_success(remote, now) {
3322 trace!("Coordination succeeded via {}", remote);
3323
3324 let can_migrate = match &self.side {
3326 ConnectionSide::Client { .. } => true, ConnectionSide::Server { server_config } => {
3328 server_config.migration
3329 }
3330 };
3331
3332 if can_migrate {
3333 let best_pairs = nat_traversal.get_best_succeeded_pairs();
3335 if let Some(best) = best_pairs.first() {
3336 if best.remote_addr == remote
3337 && best.remote_addr != self.path.remote
3338 {
3339 debug!(
3340 "NAT traversal found better path, initiating migration"
3341 );
3342 if let Err(e) =
3344 self.migrate_to_nat_traversal_path(now)
3345 {
3346 warn!(
3347 "Failed to migrate to NAT traversal path: {:?}",
3348 e
3349 );
3350 }
3351 }
3352 }
3353 }
3354 } else {
3355 if nat_traversal.mark_pair_succeeded(remote) {
3357 trace!("NAT traversal pair succeeded for {}", remote);
3358 }
3359 }
3360 }
3361 Err(NatTraversalError::ChallengeMismatch) => {
3362 debug!(
3363 "PATH_RESPONSE challenge mismatch for NAT candidate {}",
3364 remote
3365 );
3366 }
3367 Err(e) => {
3368 debug!("NAT traversal validation error: {}", e);
3369 }
3370 }
3371 } else {
3372 debug!(token, "ignoring invalid PATH_RESPONSE");
3373 }
3374 }
3375 Frame::MaxData(bytes) => {
3376 self.streams.received_max_data(bytes);
3377 }
3378 Frame::MaxStreamData { id, offset } => {
3379 self.streams.received_max_stream_data(id, offset)?;
3380 }
3381 Frame::MaxStreams { dir, count } => {
3382 self.streams.received_max_streams(dir, count)?;
3383 }
3384 Frame::ResetStream(frame) => {
3385 if self.streams.received_reset(frame)?.should_transmit() {
3386 self.spaces[SpaceId::Data].pending.max_data = true;
3387 }
3388 }
3389 Frame::DataBlocked { offset } => {
3390 debug!(offset, "peer claims to be blocked at connection level");
3391 }
3392 Frame::StreamDataBlocked { id, offset } => {
3393 if id.initiator() == self.side.side() && id.dir() == Dir::Uni {
3394 debug!("got STREAM_DATA_BLOCKED on send-only {}", id);
3395 return Err(TransportError::STREAM_STATE_ERROR(
3396 "STREAM_DATA_BLOCKED on send-only stream",
3397 ));
3398 }
3399 debug!(
3400 stream = %id,
3401 offset, "peer claims to be blocked at stream level"
3402 );
3403 }
3404 Frame::StreamsBlocked { dir, limit } => {
3405 if limit > MAX_STREAM_COUNT {
3406 return Err(TransportError::FRAME_ENCODING_ERROR(
3407 "unrepresentable stream limit",
3408 ));
3409 }
3410 debug!(
3411 "peer claims to be blocked opening more than {} {} streams",
3412 limit, dir
3413 );
3414 }
3415 Frame::StopSending(frame::StopSending { id, error_code }) => {
3416 if id.initiator() != self.side.side() {
3417 if id.dir() == Dir::Uni {
3418 debug!("got STOP_SENDING on recv-only {}", id);
3419 return Err(TransportError::STREAM_STATE_ERROR(
3420 "STOP_SENDING on recv-only stream",
3421 ));
3422 }
3423 } else if self.streams.is_local_unopened(id) {
3424 return Err(TransportError::STREAM_STATE_ERROR(
3425 "STOP_SENDING on unopened stream",
3426 ));
3427 }
3428 self.streams.received_stop_sending(id, error_code);
3429 }
3430 Frame::RetireConnectionId { sequence } => {
3431 let allow_more_cids = self
3432 .local_cid_state
3433 .on_cid_retirement(sequence, self.peer_params.issue_cids_limit())?;
3434 self.endpoint_events
3435 .push_back(EndpointEventInner::RetireConnectionId(
3436 now,
3437 sequence,
3438 allow_more_cids,
3439 ));
3440 }
3441 Frame::NewConnectionId(frame) => {
3442 trace!(
3443 sequence = frame.sequence,
3444 id = %frame.id,
3445 retire_prior_to = frame.retire_prior_to,
3446 );
3447 if self.rem_cids.active().is_empty() {
3448 return Err(TransportError::PROTOCOL_VIOLATION(
3449 "NEW_CONNECTION_ID when CIDs aren't in use",
3450 ));
3451 }
3452 if frame.retire_prior_to > frame.sequence {
3453 return Err(TransportError::PROTOCOL_VIOLATION(
3454 "NEW_CONNECTION_ID retiring unissued CIDs",
3455 ));
3456 }
3457
3458 use crate::cid_queue::InsertError;
3459 match self.rem_cids.insert(frame) {
3460 Ok(None) => {}
3461 Ok(Some((retired, reset_token))) => {
3462 let pending_retired =
3463 &mut self.spaces[SpaceId::Data].pending.retire_cids;
3464 const MAX_PENDING_RETIRED_CIDS: u64 = CidQueue::LEN as u64 * 10;
3467 if (pending_retired.len() as u64)
3470 .saturating_add(retired.end.saturating_sub(retired.start))
3471 > MAX_PENDING_RETIRED_CIDS
3472 {
3473 return Err(TransportError::CONNECTION_ID_LIMIT_ERROR(
3474 "queued too many retired CIDs",
3475 ));
3476 }
3477 pending_retired.extend(retired);
3478 self.set_reset_token(reset_token);
3479 }
3480 Err(InsertError::ExceedsLimit) => {
3481 return Err(TransportError::CONNECTION_ID_LIMIT_ERROR(""));
3482 }
3483 Err(InsertError::Retired) => {
3484 trace!("discarding already-retired");
3485 self.spaces[SpaceId::Data]
3489 .pending
3490 .retire_cids
3491 .push(frame.sequence);
3492 continue;
3493 }
3494 };
3495
3496 if self.side.is_server() && self.rem_cids.active_seq() == 0 {
3497 self.update_rem_cid();
3500 }
3501 }
3502 Frame::NewToken(NewToken { token }) => {
3503 let ConnectionSide::Client {
3504 token_store,
3505 server_name,
3506 ..
3507 } = &self.side
3508 else {
3509 return Err(TransportError::PROTOCOL_VIOLATION("client sent NEW_TOKEN"));
3510 };
3511 if token.is_empty() {
3512 return Err(TransportError::FRAME_ENCODING_ERROR("empty token"));
3513 }
3514 trace!("got new token");
3515 token_store.insert(server_name, token);
3516 }
3517 Frame::Datagram(datagram) => {
3518 if self
3519 .datagrams
3520 .received(datagram, &self.config.datagram_receive_buffer_size)?
3521 {
3522 self.events.push_back(Event::DatagramReceived);
3523 }
3524 }
3525 Frame::AckFrequency(ack_frequency) => {
3526 let space = &mut self.spaces[SpaceId::Data];
3528
3529 if !self
3530 .ack_frequency
3531 .ack_frequency_received(&ack_frequency, &mut space.pending_acks)?
3532 {
3533 continue;
3535 }
3536
3537 if let Some(timeout) = space
3540 .pending_acks
3541 .max_ack_delay_timeout(self.ack_frequency.max_ack_delay)
3542 {
3543 self.timers.set(Timer::MaxAckDelay, timeout);
3544 }
3545 }
3546 Frame::ImmediateAck => {
3547 self.spaces[SpaceId::Data]
3549 .pending_acks
3550 .set_immediate_ack_required();
3551 }
3552 Frame::HandshakeDone => {
3553 if self.side.is_server() {
3554 return Err(TransportError::PROTOCOL_VIOLATION(
3555 "client sent HANDSHAKE_DONE",
3556 ));
3557 }
3558 if self.spaces[SpaceId::Handshake].crypto.is_some() {
3559 self.discard_space(now, SpaceId::Handshake);
3560 }
3561 }
3562 Frame::AddAddress(add_address) => {
3563 self.handle_add_address(&add_address, now)?;
3564 }
3565 Frame::PunchMeNow(punch_me_now) => {
3566 self.handle_punch_me_now(&punch_me_now, now)?;
3567 }
3568 Frame::RemoveAddress(remove_address) => {
3569 self.handle_remove_address(&remove_address)?;
3570 }
3571 Frame::ObservedAddress(observed_address) => {
3572 self.handle_observed_address_frame(&observed_address, now)?;
3573 }
3574 }
3575 }
3576
3577 let space = &mut self.spaces[SpaceId::Data];
3578 if space
3579 .pending_acks
3580 .packet_received(now, number, ack_eliciting, &space.dedup)
3581 {
3582 self.timers
3583 .set(Timer::MaxAckDelay, now + self.ack_frequency.max_ack_delay);
3584 }
3585
3586 let pending = &mut self.spaces[SpaceId::Data].pending;
3591 self.streams.queue_max_stream_id(pending);
3592
3593 if let Some(reason) = close {
3594 self.error = Some(reason.into());
3595 self.state = State::Draining;
3596 self.close = true;
3597 }
3598
3599 if remote != self.path.remote
3600 && !is_probing_packet
3601 && number == self.spaces[SpaceId::Data].rx_packet
3602 {
3603 let ConnectionSide::Server { ref server_config } = self.side else {
3604 return Err(TransportError::PROTOCOL_VIOLATION(
3605 "packets from unknown remote should be dropped by clients",
3606 ));
3607 };
3608 debug_assert!(
3609 server_config.migration,
3610 "migration-initiating packets should have been dropped immediately"
3611 );
3612 self.migrate(now, remote);
3613 self.update_rem_cid();
3615 self.spin = false;
3616 }
3617
3618 Ok(())
3619 }
3620
3621 fn migrate(&mut self, now: Instant, remote: SocketAddr) {
3622 trace!(%remote, "migration initiated");
3623 let mut new_path = if remote.is_ipv4() && remote.ip() == self.path.remote.ip() {
3627 PathData::from_previous(remote, &self.path, now)
3628 } else {
3629 let peer_max_udp_payload_size =
3630 u16::try_from(self.peer_params.max_udp_payload_size.into_inner())
3631 .unwrap_or(u16::MAX);
3632 PathData::new(
3633 remote,
3634 self.allow_mtud,
3635 Some(peer_max_udp_payload_size),
3636 now,
3637 &self.config,
3638 )
3639 };
3640 new_path.challenge = Some(self.rng.r#gen());
3641 new_path.challenge_pending = true;
3642 let prev_pto = self.pto(SpaceId::Data);
3643
3644 let mut prev = mem::replace(&mut self.path, new_path);
3645 if prev.challenge.is_none() {
3647 prev.challenge = Some(self.rng.r#gen());
3648 prev.challenge_pending = true;
3649 self.prev_path = Some((self.rem_cids.active(), prev));
3652 }
3653
3654 self.timers.set(
3655 Timer::PathValidation,
3656 now + 3 * cmp::max(self.pto(SpaceId::Data), prev_pto),
3657 );
3658 }
3659
3660 pub fn local_address_changed(&mut self) {
3662 self.update_rem_cid();
3663 self.ping();
3664 }
3665
3666 pub fn migrate_to_nat_traversal_path(&mut self, now: Instant) -> Result<(), TransportError> {
3668 let (remote_addr, local_addr) = {
3670 let nat_state = self
3671 .nat_traversal
3672 .as_ref()
3673 .ok_or_else(|| TransportError::PROTOCOL_VIOLATION("NAT traversal not enabled"))?;
3674
3675 let best_pairs = nat_state.get_best_succeeded_pairs();
3677 if best_pairs.is_empty() {
3678 return Err(TransportError::PROTOCOL_VIOLATION(
3679 "No validated NAT traversal paths",
3680 ));
3681 }
3682
3683 let best_path = best_pairs
3685 .iter()
3686 .find(|pair| pair.remote_addr != self.path.remote)
3687 .or_else(|| best_pairs.first());
3688
3689 let best_path = best_path.ok_or_else(|| {
3690 TransportError::PROTOCOL_VIOLATION("No suitable NAT traversal path")
3691 })?;
3692
3693 debug!(
3694 "Migrating to NAT traversal path: {} -> {} (priority: {})",
3695 self.path.remote, best_path.remote_addr, best_path.priority
3696 );
3697
3698 (best_path.remote_addr, best_path.local_addr)
3699 };
3700
3701 self.migrate(now, remote_addr);
3703
3704 if local_addr != SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED), 0) {
3706 self.local_ip = Some(local_addr.ip());
3707 }
3708
3709 self.path.challenge_pending = true;
3711
3712 Ok(())
3713 }
3714
3715 fn update_rem_cid(&mut self) {
3717 let (reset_token, retired) = match self.rem_cids.next() {
3718 Some(x) => x,
3719 None => return,
3720 };
3721
3722 self.spaces[SpaceId::Data]
3724 .pending
3725 .retire_cids
3726 .extend(retired);
3727 self.set_reset_token(reset_token);
3728 }
3729
3730 fn set_reset_token(&mut self, reset_token: ResetToken) {
3731 self.endpoint_events
3732 .push_back(EndpointEventInner::ResetToken(
3733 self.path.remote,
3734 reset_token,
3735 ));
3736 self.peer_params.stateless_reset_token = Some(reset_token);
3737 }
3738
3739 fn issue_first_cids(&mut self, now: Instant) {
3741 if self.local_cid_state.cid_len() == 0 {
3742 return;
3743 }
3744
3745 let mut n = self.peer_params.issue_cids_limit() - 1;
3747 if let ConnectionSide::Server { server_config } = &self.side {
3748 if server_config.has_preferred_address() {
3749 n -= 1;
3751 }
3752 }
3753 self.endpoint_events
3754 .push_back(EndpointEventInner::NeedIdentifiers(now, n));
3755 }
3756
3757 fn populate_packet(
3758 &mut self,
3759 now: Instant,
3760 space_id: SpaceId,
3761 buf: &mut Vec<u8>,
3762 max_size: usize,
3763 pn: u64,
3764 ) -> SentFrames {
3765 let mut sent = SentFrames::default();
3766 let space = &mut self.spaces[space_id];
3767 let is_0rtt = space_id == SpaceId::Data && space.crypto.is_none();
3768 space.pending_acks.maybe_ack_non_eliciting();
3769
3770 if !is_0rtt && mem::replace(&mut space.pending.handshake_done, false) {
3772 buf.write(frame::FrameType::HANDSHAKE_DONE);
3773 sent.retransmits.get_or_create().handshake_done = true;
3774 self.stats.frame_tx.handshake_done =
3776 self.stats.frame_tx.handshake_done.saturating_add(1);
3777 }
3778
3779 if mem::replace(&mut space.ping_pending, false) {
3781 trace!("PING");
3782 buf.write(frame::FrameType::PING);
3783 sent.non_retransmits = true;
3784 self.stats.frame_tx.ping += 1;
3785 }
3786
3787 if mem::replace(&mut space.immediate_ack_pending, false) {
3789 trace!("IMMEDIATE_ACK");
3790 buf.write(frame::FrameType::IMMEDIATE_ACK);
3791 sent.non_retransmits = true;
3792 self.stats.frame_tx.immediate_ack += 1;
3793 }
3794
3795 if space.pending_acks.can_send() {
3797 Self::populate_acks(
3798 now,
3799 self.receiving_ecn,
3800 &mut sent,
3801 space,
3802 buf,
3803 &mut self.stats,
3804 );
3805 }
3806
3807 if mem::replace(&mut space.pending.ack_frequency, false) {
3809 let sequence_number = self.ack_frequency.next_sequence_number();
3810
3811 let config = self.config.ack_frequency_config.as_ref().unwrap();
3813
3814 let max_ack_delay = self.ack_frequency.candidate_max_ack_delay(
3816 self.path.rtt.get(),
3817 config,
3818 &self.peer_params,
3819 );
3820
3821 trace!(?max_ack_delay, "ACK_FREQUENCY");
3822
3823 frame::AckFrequency {
3824 sequence: sequence_number,
3825 ack_eliciting_threshold: config.ack_eliciting_threshold,
3826 request_max_ack_delay: max_ack_delay.as_micros().try_into().unwrap_or(VarInt::MAX),
3827 reordering_threshold: config.reordering_threshold,
3828 }
3829 .encode(buf);
3830
3831 sent.retransmits.get_or_create().ack_frequency = true;
3832
3833 self.ack_frequency.ack_frequency_sent(pn, max_ack_delay);
3834 self.stats.frame_tx.ack_frequency += 1;
3835 }
3836
3837 if buf.len() + 9 < max_size && space_id == SpaceId::Data {
3839 if let Some(token) = self.path.challenge {
3841 self.path.challenge_pending = false;
3843 sent.non_retransmits = true;
3844 sent.requires_padding = true;
3845 trace!("PATH_CHALLENGE {:08x}", token);
3846 buf.write(frame::FrameType::PATH_CHALLENGE);
3847 buf.write(token);
3848 self.stats.frame_tx.path_challenge += 1;
3849 }
3850
3851 }
3860
3861 if buf.len() + 9 < max_size && space_id == SpaceId::Data {
3863 if let Some(token) = self.path_responses.pop_on_path(self.path.remote) {
3864 sent.non_retransmits = true;
3865 sent.requires_padding = true;
3866 trace!("PATH_RESPONSE {:08x}", token);
3867 buf.write(frame::FrameType::PATH_RESPONSE);
3868 buf.write(token);
3869 self.stats.frame_tx.path_response += 1;
3870 }
3871 }
3872
3873 while buf.len() + frame::Crypto::SIZE_BOUND < max_size && !is_0rtt {
3875 let mut frame = match space.pending.crypto.pop_front() {
3876 Some(x) => x,
3877 None => break,
3878 };
3879
3880 let max_crypto_data_size = max_size
3885 - buf.len()
3886 - 1 - VarInt::size(unsafe { VarInt::from_u64_unchecked(frame.offset) })
3888 - 2; let available_space = max_size - buf.len();
3892 let remaining_data = frame.data.len();
3893 #[cfg(feature = "pqc")]
3894 let optimal_size = self
3895 .pqc_state
3896 .calculate_crypto_frame_size(available_space, remaining_data);
3897 #[cfg(not(feature = "pqc"))]
3898 let optimal_size = available_space.min(remaining_data);
3899
3900 let len = frame
3901 .data
3902 .len()
3903 .min(2usize.pow(14) - 1)
3904 .min(max_crypto_data_size)
3905 .min(optimal_size);
3906
3907 let data = frame.data.split_to(len);
3908 let truncated = frame::Crypto {
3909 offset: frame.offset,
3910 data,
3911 };
3912 trace!(
3913 "CRYPTO: off {} len {}",
3914 truncated.offset,
3915 truncated.data.len()
3916 );
3917 truncated.encode(buf);
3918 self.stats.frame_tx.crypto += 1;
3919 sent.retransmits.get_or_create().crypto.push_back(truncated);
3920 if !frame.data.is_empty() {
3921 frame.offset += len as u64;
3922 space.pending.crypto.push_front(frame);
3923 }
3924 }
3925
3926 if space_id == SpaceId::Data {
3927 self.streams.write_control_frames(
3928 buf,
3929 &mut space.pending,
3930 &mut sent.retransmits,
3931 &mut self.stats.frame_tx,
3932 max_size,
3933 );
3934 }
3935
3936 while buf.len() + 44 < max_size {
3938 let issued = match space.pending.new_cids.pop() {
3939 Some(x) => x,
3940 None => break,
3941 };
3942 trace!(
3943 sequence = issued.sequence,
3944 id = %issued.id,
3945 "NEW_CONNECTION_ID"
3946 );
3947 frame::NewConnectionId {
3948 sequence: issued.sequence,
3949 retire_prior_to: self.local_cid_state.retire_prior_to(),
3950 id: issued.id,
3951 reset_token: issued.reset_token,
3952 }
3953 .encode(buf);
3954 sent.retransmits.get_or_create().new_cids.push(issued);
3955 self.stats.frame_tx.new_connection_id += 1;
3956 }
3957
3958 while buf.len() + frame::RETIRE_CONNECTION_ID_SIZE_BOUND < max_size {
3960 let seq = match space.pending.retire_cids.pop() {
3961 Some(x) => x,
3962 None => break,
3963 };
3964 trace!(sequence = seq, "RETIRE_CONNECTION_ID");
3965 buf.write(frame::FrameType::RETIRE_CONNECTION_ID);
3966 buf.write_var(seq);
3967 sent.retransmits.get_or_create().retire_cids.push(seq);
3968 self.stats.frame_tx.retire_connection_id += 1;
3969 }
3970
3971 let mut sent_datagrams = false;
3973 while buf.len() + Datagram::SIZE_BOUND < max_size && space_id == SpaceId::Data {
3974 match self.datagrams.write(buf, max_size) {
3975 true => {
3976 sent_datagrams = true;
3977 sent.non_retransmits = true;
3978 self.stats.frame_tx.datagram += 1;
3979 }
3980 false => break,
3981 }
3982 }
3983 if self.datagrams.send_blocked && sent_datagrams {
3984 self.events.push_back(Event::DatagramsUnblocked);
3985 self.datagrams.send_blocked = false;
3986 }
3987
3988 while let Some(remote_addr) = space.pending.new_tokens.pop() {
3990 debug_assert_eq!(space_id, SpaceId::Data);
3991 let ConnectionSide::Server { server_config } = &self.side else {
3992 debug_assert!(false, "NEW_TOKEN frames should not be enqueued by clients");
3994 continue;
3995 };
3996
3997 if remote_addr != self.path.remote {
3998 continue;
4003 }
4004
4005 let token = Token::new(
4006 TokenPayload::Validation {
4007 ip: remote_addr.ip(),
4008 issued: server_config.time_source.now(),
4009 },
4010 &mut self.rng,
4011 );
4012 let new_token = NewToken {
4013 token: token.encode(&*server_config.token_key).into(),
4014 };
4015
4016 if buf.len() + new_token.size() >= max_size {
4017 space.pending.new_tokens.push(remote_addr);
4018 break;
4019 }
4020
4021 new_token.encode(buf);
4022 sent.retransmits
4023 .get_or_create()
4024 .new_tokens
4025 .push(remote_addr);
4026 self.stats.frame_tx.new_token += 1;
4027 }
4028
4029 while buf.len() + frame::AddAddress::SIZE_BOUND < max_size && space_id == SpaceId::Data {
4031 let add_address = match space.pending.add_addresses.pop() {
4032 Some(x) => x,
4033 None => break,
4034 };
4035 trace!(
4036 sequence = %add_address.sequence,
4037 address = %add_address.address,
4038 "ADD_ADDRESS"
4039 );
4040 if self.nat_traversal_frame_config.use_rfc_format {
4042 add_address.encode_rfc(buf);
4043 } else {
4044 add_address.encode_legacy(buf);
4045 }
4046 sent.retransmits
4047 .get_or_create()
4048 .add_addresses
4049 .push(add_address);
4050 self.stats.frame_tx.add_address += 1;
4051 }
4052
4053 while buf.len() + frame::PunchMeNow::SIZE_BOUND < max_size && space_id == SpaceId::Data {
4055 let punch_me_now = match space.pending.punch_me_now.pop() {
4056 Some(x) => x,
4057 None => break,
4058 };
4059 trace!(
4060 round = %punch_me_now.round,
4061 paired_with_sequence_number = %punch_me_now.paired_with_sequence_number,
4062 "PUNCH_ME_NOW"
4063 );
4064 if self.nat_traversal_frame_config.use_rfc_format {
4066 punch_me_now.encode_rfc(buf);
4067 } else {
4068 punch_me_now.encode_legacy(buf);
4069 }
4070 sent.retransmits
4071 .get_or_create()
4072 .punch_me_now
4073 .push(punch_me_now);
4074 self.stats.frame_tx.punch_me_now += 1;
4075 }
4076
4077 while buf.len() + frame::RemoveAddress::SIZE_BOUND < max_size && space_id == SpaceId::Data {
4079 let remove_address = match space.pending.remove_addresses.pop() {
4080 Some(x) => x,
4081 None => break,
4082 };
4083 trace!(
4084 sequence = %remove_address.sequence,
4085 "REMOVE_ADDRESS"
4086 );
4087 remove_address.encode(buf);
4089 sent.retransmits
4090 .get_or_create()
4091 .remove_addresses
4092 .push(remove_address);
4093 self.stats.frame_tx.remove_address += 1;
4094 }
4095
4096 while buf.len() + frame::ObservedAddress::SIZE_BOUND < max_size && space_id == SpaceId::Data
4098 {
4099 let observed_address = match space.pending.observed_addresses.pop() {
4100 Some(x) => x,
4101 None => break,
4102 };
4103 trace!(
4104 address = %observed_address.address,
4105 "OBSERVED_ADDRESS"
4106 );
4107 observed_address.encode(buf);
4108 sent.retransmits
4109 .get_or_create()
4110 .observed_addresses
4111 .push(observed_address);
4112 self.stats.frame_tx.observed_address += 1;
4113 }
4114
4115 if space_id == SpaceId::Data {
4117 sent.stream_frames =
4118 self.streams
4119 .write_stream_frames(buf, max_size, self.config.send_fairness);
4120 self.stats.frame_tx.stream += sent.stream_frames.len() as u64;
4121 }
4122
4123 sent
4124 }
4125
4126 fn populate_acks(
4131 now: Instant,
4132 receiving_ecn: bool,
4133 sent: &mut SentFrames,
4134 space: &mut PacketSpace,
4135 buf: &mut Vec<u8>,
4136 stats: &mut ConnectionStats,
4137 ) {
4138 debug_assert!(!space.pending_acks.ranges().is_empty());
4139
4140 debug_assert!(space.crypto.is_some(), "tried to send ACK in 0-RTT");
4142 let ecn = if receiving_ecn {
4143 Some(&space.ecn_counters)
4144 } else {
4145 None
4146 };
4147 sent.largest_acked = space.pending_acks.ranges().max();
4148
4149 let delay_micros = space.pending_acks.ack_delay(now).as_micros() as u64;
4150
4151 let ack_delay_exp = TransportParameters::default().ack_delay_exponent;
4153 let delay = delay_micros >> ack_delay_exp.into_inner();
4154
4155 trace!(
4156 "ACK {:?}, Delay = {}us",
4157 space.pending_acks.ranges(),
4158 delay_micros
4159 );
4160
4161 frame::Ack::encode(delay as _, space.pending_acks.ranges(), ecn, buf);
4162 stats.frame_tx.acks += 1;
4163 }
4164
4165 fn close_common(&mut self) {
4166 trace!("connection closed");
4167 for &timer in &Timer::VALUES {
4168 self.timers.stop(timer);
4169 }
4170 }
4171
4172 fn set_close_timer(&mut self, now: Instant) {
4173 self.timers
4174 .set(Timer::Close, now + 3 * self.pto(self.highest_space));
4175 }
4176
4177 fn handle_peer_params(&mut self, params: TransportParameters) -> Result<(), TransportError> {
4179 if Some(self.orig_rem_cid) != params.initial_src_cid
4180 || (self.side.is_client()
4181 && (Some(self.initial_dst_cid) != params.original_dst_cid
4182 || self.retry_src_cid != params.retry_src_cid))
4183 {
4184 return Err(TransportError::TRANSPORT_PARAMETER_ERROR(
4185 "CID authentication failure",
4186 ));
4187 }
4188
4189 self.set_peer_params(params);
4190
4191 Ok(())
4192 }
4193
4194 fn set_peer_params(&mut self, params: TransportParameters) {
4195 self.streams.set_params(¶ms);
4196 self.idle_timeout =
4197 negotiate_max_idle_timeout(self.config.max_idle_timeout, Some(params.max_idle_timeout));
4198 trace!("negotiated max idle timeout {:?}", self.idle_timeout);
4199 if let Some(ref info) = params.preferred_address {
4200 self.rem_cids.insert(frame::NewConnectionId {
4201 sequence: 1,
4202 id: info.connection_id,
4203 reset_token: info.stateless_reset_token,
4204 retire_prior_to: 0,
4205 }).expect("preferred address CID is the first received, and hence is guaranteed to be legal");
4206 }
4207 self.ack_frequency.peer_max_ack_delay = get_max_ack_delay(¶ms);
4208
4209 self.negotiate_nat_traversal_capability(¶ms);
4211
4212 let local_has_nat_traversal = self.config.nat_traversal_config.is_some();
4215 let local_supports_rfc = local_has_nat_traversal;
4218 self.nat_traversal_frame_config = frame::nat_traversal_unified::NatTraversalFrameConfig {
4219 use_rfc_format: local_supports_rfc && params.supports_rfc_nat_traversal(),
4221 accept_legacy: true,
4223 };
4224
4225 self.negotiate_address_discovery(¶ms);
4227
4228 #[cfg(feature = "pqc")]
4230 {
4231 self.pqc_state.update_from_peer_params(¶ms);
4232
4233 if self.pqc_state.enabled && self.pqc_state.using_pqc {
4235 trace!("PQC enabled, adjusting MTU discovery for larger handshake packets");
4236 let current_mtu = self.path.mtud.current_mtu();
4240 if current_mtu < self.pqc_state.handshake_mtu {
4241 trace!(
4242 "Current MTU {} is less than PQC handshake MTU {}, will rely on MTU discovery",
4243 current_mtu, self.pqc_state.handshake_mtu
4244 );
4245 }
4246 }
4247 }
4248
4249 self.peer_params = params;
4250 self.path.mtud.on_peer_max_udp_payload_size_received(
4251 u16::try_from(self.peer_params.max_udp_payload_size.into_inner()).unwrap_or(u16::MAX),
4252 );
4253 }
4254
4255 fn negotiate_nat_traversal_capability(&mut self, params: &TransportParameters) {
4257 let peer_nat_config = match ¶ms.nat_traversal {
4259 Some(config) => config,
4260 None => {
4261 if self.config.nat_traversal_config.is_some() {
4263 debug!(
4264 "Peer does not support NAT traversal, maintaining backward compatibility"
4265 );
4266 self.emit_nat_traversal_capability_event(false);
4267
4268 self.set_nat_traversal_compatibility_mode(false);
4270 }
4271 return;
4272 }
4273 };
4274
4275 let local_nat_config = match &self.config.nat_traversal_config {
4277 Some(config) => config,
4278 None => {
4279 debug!("NAT traversal not enabled locally, ignoring peer support");
4280 self.emit_nat_traversal_capability_event(false);
4281 self.set_nat_traversal_compatibility_mode(false);
4282 return;
4283 }
4284 };
4285
4286 info!("Both peers support NAT traversal, negotiating capabilities");
4288
4289 match self.negotiate_nat_traversal_parameters(local_nat_config, peer_nat_config) {
4291 Ok(negotiated_config) => {
4292 info!("NAT traversal capability negotiated successfully");
4293 self.emit_nat_traversal_capability_event(true);
4294
4295 self.init_nat_traversal_with_negotiated_config(&negotiated_config);
4297
4298 self.set_nat_traversal_compatibility_mode(true);
4300
4301 if matches!(
4303 negotiated_config,
4304 crate::transport_parameters::NatTraversalConfig::ClientSupport
4305 ) {
4306 self.initiate_nat_traversal_process();
4307 }
4308 }
4309 Err(e) => {
4310 warn!("NAT traversal capability negotiation failed: {}", e);
4311 self.emit_nat_traversal_capability_event(false);
4312 self.set_nat_traversal_compatibility_mode(false);
4313 }
4314 }
4315 }
4316
4317 fn emit_nat_traversal_capability_event(&mut self, negotiated: bool) {
4371 if negotiated {
4374 info!("NAT traversal capability successfully negotiated");
4375 } else {
4376 info!("NAT traversal capability not available (peer or local support missing)");
4377 }
4378
4379 }
4382
4383 fn set_nat_traversal_compatibility_mode(&mut self, enabled: bool) {
4385 if enabled {
4386 debug!("NAT traversal enabled for this connection");
4387 } else {
4389 debug!("NAT traversal disabled for this connection (backward compatibility mode)");
4390 if self.nat_traversal.is_some() {
4392 warn!("Clearing NAT traversal state due to compatibility mode");
4393 self.nat_traversal = None;
4394 }
4395 }
4396 }
4397
4398 fn negotiate_nat_traversal_parameters(
4400 &self,
4401 local_config: &crate::transport_parameters::NatTraversalConfig,
4402 peer_config: &crate::transport_parameters::NatTraversalConfig,
4403 ) -> Result<crate::transport_parameters::NatTraversalConfig, String> {
4404 match (local_config, peer_config) {
4409 (
4411 crate::transport_parameters::NatTraversalConfig::ClientSupport,
4412 crate::transport_parameters::NatTraversalConfig::ServerSupport {
4413 concurrency_limit,
4414 },
4415 ) => Ok(
4416 crate::transport_parameters::NatTraversalConfig::ServerSupport {
4417 concurrency_limit: *concurrency_limit,
4418 },
4419 ),
4420 (
4422 crate::transport_parameters::NatTraversalConfig::ServerSupport {
4423 concurrency_limit,
4424 },
4425 crate::transport_parameters::NatTraversalConfig::ClientSupport,
4426 ) => Ok(
4427 crate::transport_parameters::NatTraversalConfig::ServerSupport {
4428 concurrency_limit: *concurrency_limit,
4429 },
4430 ),
4431 (
4433 crate::transport_parameters::NatTraversalConfig::ServerSupport {
4434 concurrency_limit: limit1,
4435 },
4436 crate::transport_parameters::NatTraversalConfig::ServerSupport {
4437 concurrency_limit: limit2,
4438 },
4439 ) => Ok(
4440 crate::transport_parameters::NatTraversalConfig::ServerSupport {
4441 concurrency_limit: (*limit1).min(*limit2),
4442 },
4443 ),
4444 (
4446 crate::transport_parameters::NatTraversalConfig::ClientSupport,
4447 crate::transport_parameters::NatTraversalConfig::ClientSupport,
4448 ) => Err("Both endpoints claim to be NAT traversal clients".to_string()),
4449 }
4450 }
4451
4452 fn init_nat_traversal_with_negotiated_config(
4454 &mut self,
4455 config: &crate::transport_parameters::NatTraversalConfig,
4456 ) {
4457 let (role, _concurrency_limit) = match config {
4460 crate::transport_parameters::NatTraversalConfig::ClientSupport => {
4461 (NatTraversalRole::Client, 10) }
4464 crate::transport_parameters::NatTraversalConfig::ServerSupport {
4465 concurrency_limit,
4466 } => {
4467 (
4469 NatTraversalRole::Server { can_relay: false },
4470 concurrency_limit.into_inner() as u32,
4471 )
4472 }
4473 };
4474
4475 let max_candidates = 50; let coordination_timeout = Duration::from_secs(10); self.nat_traversal = Some(NatTraversalState::new(
4481 role,
4482 max_candidates,
4483 coordination_timeout,
4484 ));
4485
4486 trace!(
4487 "NAT traversal initialized with negotiated config: role={:?}",
4488 role
4489 );
4490
4491 match role {
4493 NatTraversalRole::Bootstrap => {
4494 self.prepare_address_observation();
4496 }
4497 NatTraversalRole::Client => {
4498 self.schedule_candidate_discovery();
4500 }
4501 NatTraversalRole::Server { .. } => {
4502 self.prepare_coordination_handling();
4504 }
4505 }
4506 }
4507
4508 fn initiate_nat_traversal_process(&mut self) {
4510 if let Some(nat_state) = &mut self.nat_traversal {
4511 match nat_state.start_candidate_discovery() {
4512 Ok(()) => {
4513 debug!("NAT traversal process initiated - candidate discovery started");
4514 self.timers.set(
4516 Timer::NatTraversal,
4517 Instant::now() + Duration::from_millis(100),
4518 );
4519 }
4520 Err(e) => {
4521 warn!("Failed to initiate NAT traversal process: {}", e);
4522 }
4523 }
4524 }
4525 }
4526
4527 fn prepare_address_observation(&mut self) {
4529 debug!("Preparing for address observation as bootstrap node");
4530 }
4533
4534 fn schedule_candidate_discovery(&mut self) {
4536 debug!("Scheduling candidate discovery for client endpoint");
4537 self.timers.set(
4539 Timer::NatTraversal,
4540 Instant::now() + Duration::from_millis(50),
4541 );
4542 }
4543
4544 fn prepare_coordination_handling(&mut self) {
4546 debug!("Preparing to handle coordination requests as server endpoint");
4547 }
4550
4551 fn handle_nat_traversal_timeout(&mut self, now: Instant) {
4553 let timeout_result = if let Some(nat_state) = &mut self.nat_traversal {
4555 nat_state.handle_timeout(now)
4556 } else {
4557 return;
4558 };
4559
4560 match timeout_result {
4562 Ok(actions) => {
4563 for action in actions {
4564 match action {
4565 nat_traversal::TimeoutAction::RetryDiscovery => {
4566 debug!("NAT traversal timeout: retrying candidate discovery");
4567 if let Some(nat_state) = &mut self.nat_traversal {
4568 if let Err(e) = nat_state.start_candidate_discovery() {
4569 warn!("Failed to retry candidate discovery: {}", e);
4570 }
4571 }
4572 }
4573 nat_traversal::TimeoutAction::RetryCoordination => {
4574 debug!("NAT traversal timeout: retrying coordination");
4575 self.timers
4577 .set(Timer::NatTraversal, now + Duration::from_secs(2));
4578 }
4579 nat_traversal::TimeoutAction::StartValidation => {
4580 debug!("NAT traversal timeout: starting path validation");
4581 self.start_nat_traversal_validation(now);
4582 }
4583 nat_traversal::TimeoutAction::Complete => {
4584 debug!("NAT traversal completed successfully");
4585 self.timers.stop(Timer::NatTraversal);
4587 }
4588 nat_traversal::TimeoutAction::Failed => {
4589 warn!("NAT traversal failed after timeout");
4590 self.handle_nat_traversal_failure();
4592 }
4593 }
4594 }
4595 }
4596 Err(e) => {
4597 warn!("NAT traversal timeout handling failed: {}", e);
4598 self.handle_nat_traversal_failure();
4599 }
4600 }
4601 }
4602
4603 fn start_nat_traversal_validation(&mut self, now: Instant) {
4605 if let Some(nat_state) = &mut self.nat_traversal {
4606 let pairs = nat_state.get_next_validation_pairs(3);
4608
4609 for pair in pairs {
4610 let challenge = self.rng.r#gen();
4612 self.path.challenge = Some(challenge);
4613 self.path.challenge_pending = true;
4614
4615 debug!(
4616 "Starting path validation for NAT traversal candidate: {}",
4617 pair.remote_addr
4618 );
4619 }
4620
4621 self.timers
4623 .set(Timer::PathValidation, now + Duration::from_secs(3));
4624 }
4625 }
4626
4627 fn handle_nat_traversal_failure(&mut self) {
4629 warn!("NAT traversal failed, considering fallback options");
4630
4631 self.nat_traversal = None;
4633 self.timers.stop(Timer::NatTraversal);
4634
4635 debug!("NAT traversal disabled for this connection due to failure");
4642 }
4643
4644 pub fn nat_traversal_supported(&self) -> bool {
4646 self.nat_traversal.is_some()
4647 && self.config.nat_traversal_config.is_some()
4648 && self.peer_params.nat_traversal.is_some()
4649 }
4650
4651 pub fn nat_traversal_config(&self) -> Option<&crate::transport_parameters::NatTraversalConfig> {
4653 self.peer_params.nat_traversal.as_ref()
4654 }
4655
4656 pub fn nat_traversal_ready(&self) -> bool {
4658 self.nat_traversal_supported() && matches!(self.state, State::Established)
4659 }
4660
4661 pub(crate) fn nat_traversal_stats(&self) -> Option<nat_traversal::NatTraversalStats> {
4666 self.nat_traversal.as_ref().map(|state| state.stats.clone())
4667 }
4668
4669 #[cfg(test)]
4671 pub(crate) fn force_enable_nat_traversal(&mut self, role: NatTraversalRole) {
4672 use crate::transport_parameters::NatTraversalConfig;
4673
4674 let config = match role {
4676 NatTraversalRole::Client => NatTraversalConfig::ClientSupport,
4677 NatTraversalRole::Server { .. } | NatTraversalRole::Bootstrap => {
4678 NatTraversalConfig::ServerSupport {
4679 concurrency_limit: VarInt::from_u32(5),
4680 }
4681 }
4682 };
4683
4684 self.peer_params.nat_traversal = Some(config.clone());
4685 self.config = Arc::new({
4686 let mut transport_config = (*self.config).clone();
4687 transport_config.nat_traversal_config = Some(config);
4688 transport_config
4689 });
4690
4691 self.nat_traversal = Some(NatTraversalState::new(role, 8, Duration::from_secs(10)));
4692 }
4693
4694 fn derive_peer_id_from_connection(&self) -> [u8; 32] {
4697 let mut hasher = std::collections::hash_map::DefaultHasher::new();
4699 use std::hash::Hasher;
4700 hasher.write(&self.rem_handshake_cid);
4701 hasher.write(&self.handshake_cid);
4702 hasher.write(&self.path.remote.to_string().into_bytes());
4703 let hash = hasher.finish();
4704 let mut peer_id = [0u8; 32];
4705 peer_id[..8].copy_from_slice(&hash.to_be_bytes());
4706 let cid_bytes = self.rem_handshake_cid.as_ref();
4708 let copy_len = (cid_bytes.len()).min(24);
4709 peer_id[8..8 + copy_len].copy_from_slice(&cid_bytes[..copy_len]);
4710 peer_id
4711 }
4712
4713 fn handle_add_address(
4715 &mut self,
4716 add_address: &crate::frame::AddAddress,
4717 now: Instant,
4718 ) -> Result<(), TransportError> {
4719 let nat_state = self.nat_traversal.as_mut().ok_or_else(|| {
4720 TransportError::PROTOCOL_VIOLATION("AddAddress frame without NAT traversal negotiation")
4721 })?;
4722
4723 match nat_state.add_remote_candidate(
4724 add_address.sequence,
4725 add_address.address,
4726 add_address.priority,
4727 now,
4728 ) {
4729 Ok(()) => {
4730 trace!(
4731 "Added remote candidate: {} (seq={}, priority={})",
4732 add_address.address, add_address.sequence, add_address.priority
4733 );
4734
4735 self.trigger_candidate_validation(add_address.address, now)?;
4737 Ok(())
4738 }
4739 Err(NatTraversalError::TooManyCandidates) => Err(TransportError::PROTOCOL_VIOLATION(
4740 "too many NAT traversal candidates",
4741 )),
4742 Err(NatTraversalError::DuplicateAddress) => {
4743 Ok(())
4745 }
4746 Err(e) => {
4747 warn!("Failed to add remote candidate: {}", e);
4748 Ok(()) }
4750 }
4751 }
4752
4753 fn handle_punch_me_now(
4755 &mut self,
4756 punch_me_now: &crate::frame::PunchMeNow,
4757 now: Instant,
4758 ) -> Result<(), TransportError> {
4759 trace!(
4760 "Received PunchMeNow: round={}, target_seq={}, local_addr={}",
4761 punch_me_now.round, punch_me_now.paired_with_sequence_number, punch_me_now.address
4762 );
4763
4764 if let Some(nat_state) = &self.nat_traversal {
4766 if matches!(nat_state.role, NatTraversalRole::Bootstrap) {
4767 let from_peer_id = self.derive_peer_id_from_connection();
4769
4770 let punch_me_now_clone = punch_me_now.clone();
4772 drop(nat_state); match self
4775 .nat_traversal
4776 .as_mut()
4777 .unwrap()
4778 .handle_punch_me_now_frame(
4779 from_peer_id,
4780 self.path.remote,
4781 &punch_me_now_clone,
4782 now,
4783 ) {
4784 Ok(Some(coordination_frame)) => {
4785 trace!("Bootstrap node coordinating PUNCH_ME_NOW between peers");
4786
4787 if let Some(target_peer_id) = punch_me_now.target_peer_id {
4789 self.endpoint_events.push_back(
4790 crate::shared::EndpointEventInner::RelayPunchMeNow(
4791 target_peer_id,
4792 coordination_frame,
4793 ),
4794 );
4795 }
4796
4797 return Ok(());
4798 }
4799 Ok(None) => {
4800 trace!("Bootstrap coordination completed or no action needed");
4801 return Ok(());
4802 }
4803 Err(e) => {
4804 warn!("Bootstrap coordination failed: {}", e);
4805 return Ok(());
4806 }
4807 }
4808 }
4809 }
4810
4811 let nat_state = self.nat_traversal.as_mut().ok_or_else(|| {
4813 TransportError::PROTOCOL_VIOLATION("PunchMeNow frame without NAT traversal negotiation")
4814 })?;
4815
4816 if nat_state
4818 .handle_peer_punch_request(punch_me_now.round, now)
4819 .map_err(|_e| {
4820 TransportError::PROTOCOL_VIOLATION("Failed to handle peer punch request")
4821 })?
4822 {
4823 trace!("Coordination synchronized for round {}", punch_me_now.round);
4824
4825 let _local_addr = self
4828 .local_ip
4829 .map(|ip| SocketAddr::new(ip, 0))
4830 .unwrap_or_else(|| {
4831 SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED), 0)
4832 });
4833
4834 let target = nat_traversal::PunchTarget {
4835 remote_addr: punch_me_now.address,
4836 remote_sequence: punch_me_now.paired_with_sequence_number,
4837 challenge: self.rng.r#gen(),
4838 };
4839
4840 let _ = nat_state.start_coordination_round(vec![target], now);
4842 } else {
4843 debug!(
4844 "Failed to synchronize coordination for round {}",
4845 punch_me_now.round
4846 );
4847 }
4848
4849 Ok(())
4850 }
4851
4852 fn handle_remove_address(
4854 &mut self,
4855 remove_address: &crate::frame::RemoveAddress,
4856 ) -> Result<(), TransportError> {
4857 let nat_state = self.nat_traversal.as_mut().ok_or_else(|| {
4858 TransportError::PROTOCOL_VIOLATION(
4859 "RemoveAddress frame without NAT traversal negotiation",
4860 )
4861 })?;
4862
4863 if nat_state.remove_candidate(remove_address.sequence) {
4864 trace!(
4865 "Removed candidate with sequence {}",
4866 remove_address.sequence
4867 );
4868 } else {
4869 trace!(
4870 "Attempted to remove unknown candidate sequence {}",
4871 remove_address.sequence
4872 );
4873 }
4874
4875 Ok(())
4876 }
4877
4878 fn handle_observed_address_frame(
4880 &mut self,
4881 observed_address: &crate::frame::ObservedAddress,
4882 now: Instant,
4883 ) -> Result<(), TransportError> {
4884 let state = self.address_discovery_state.as_mut().ok_or_else(|| {
4886 TransportError::PROTOCOL_VIOLATION(
4887 "ObservedAddress frame without address discovery negotiation",
4888 )
4889 })?;
4890
4891 if !state.enabled {
4893 return Err(TransportError::PROTOCOL_VIOLATION(
4894 "ObservedAddress frame received when address discovery is disabled",
4895 ));
4896 }
4897
4898 #[cfg(feature = "trace")]
4900 {
4901 use crate::trace_observed_address_received;
4902 trace_observed_address_received!(
4904 &self.event_log,
4905 self.trace_context.trace_id(),
4906 observed_address.address,
4907 0u64 );
4909 }
4910
4911 let path_id = 0u64; if let Some(&last_seq) = state.last_received_sequence.get(&path_id) {
4919 if observed_address.sequence_number <= last_seq {
4920 trace!(
4921 "Ignoring OBSERVED_ADDRESS frame with stale sequence number {} (last was {})",
4922 observed_address.sequence_number, last_seq
4923 );
4924 return Ok(());
4925 }
4926 }
4927
4928 state
4930 .last_received_sequence
4931 .insert(path_id, observed_address.sequence_number);
4932
4933 state.handle_observed_address(observed_address.address, path_id, now);
4935
4936 self.path
4938 .update_observed_address(observed_address.address, now);
4939
4940 trace!(
4942 "Received ObservedAddress frame: address={} for path={}",
4943 observed_address.address, path_id
4944 );
4945
4946 Ok(())
4947 }
4948
4949 pub fn queue_add_address(&mut self, sequence: VarInt, address: SocketAddr, priority: VarInt) {
4951 let add_address = frame::AddAddress {
4953 sequence,
4954 address,
4955 priority,
4956 };
4957
4958 self.spaces[SpaceId::Data]
4959 .pending
4960 .add_addresses
4961 .push(add_address);
4962 trace!(
4963 "Queued AddAddress frame: seq={}, addr={}, priority={}",
4964 sequence, address, priority
4965 );
4966 }
4967
4968 pub fn queue_punch_me_now(
4970 &mut self,
4971 round: VarInt,
4972 paired_with_sequence_number: VarInt,
4973 address: SocketAddr,
4974 ) {
4975 let punch_me_now = frame::PunchMeNow {
4976 round,
4977 paired_with_sequence_number,
4978 address,
4979 target_peer_id: None, };
4981
4982 self.spaces[SpaceId::Data]
4983 .pending
4984 .punch_me_now
4985 .push(punch_me_now);
4986 trace!(
4987 "Queued PunchMeNow frame: round={}, target={}",
4988 round, paired_with_sequence_number
4989 );
4990 }
4991
4992 pub fn queue_remove_address(&mut self, sequence: VarInt) {
4994 let remove_address = frame::RemoveAddress { sequence };
4995
4996 self.spaces[SpaceId::Data]
4997 .pending
4998 .remove_addresses
4999 .push(remove_address);
5000 trace!("Queued RemoveAddress frame: seq={}", sequence);
5001 }
5002
5003 pub fn queue_observed_address(&mut self, address: SocketAddr) {
5005 let sequence_number = if let Some(state) = &mut self.address_discovery_state {
5007 let seq = state.next_sequence_number;
5008 state.next_sequence_number =
5009 VarInt::from_u64(state.next_sequence_number.into_inner() + 1)
5010 .expect("sequence number overflow");
5011 seq
5012 } else {
5013 VarInt::from_u32(0)
5015 };
5016
5017 let observed_address = frame::ObservedAddress {
5018 sequence_number,
5019 address,
5020 };
5021 self.spaces[SpaceId::Data]
5022 .pending
5023 .observed_addresses
5024 .push(observed_address);
5025 trace!("Queued ObservedAddress frame: addr={}", address);
5026 }
5027
5028 pub fn check_for_address_observations(&mut self, now: Instant) {
5030 let Some(state) = &mut self.address_discovery_state else {
5032 return;
5033 };
5034
5035 if !state.enabled {
5037 return;
5038 }
5039
5040 let path_id = 0u64; let remote_address = self.path.remote;
5045
5046 if state.should_send_observation(path_id, now) {
5048 if let Some(frame) = state.queue_observed_address_frame(path_id, remote_address) {
5050 self.spaces[SpaceId::Data]
5052 .pending
5053 .observed_addresses
5054 .push(frame);
5055
5056 state.record_observation_sent(path_id);
5058
5059 #[cfg(feature = "trace")]
5061 {
5062 use crate::trace_observed_address_sent;
5063 trace_observed_address_sent!(
5065 &self.event_log,
5066 self.trace_context.trace_id(),
5067 remote_address,
5068 path_id
5069 );
5070 }
5071
5072 trace!(
5073 "Queued OBSERVED_ADDRESS frame for path {} with address {}",
5074 path_id, remote_address
5075 );
5076 }
5077 }
5078 }
5079
5080 fn trigger_candidate_validation(
5082 &mut self,
5083 candidate_address: SocketAddr,
5084 now: Instant,
5085 ) -> Result<(), TransportError> {
5086 let nat_state = self
5087 .nat_traversal
5088 .as_mut()
5089 .ok_or_else(|| TransportError::PROTOCOL_VIOLATION("NAT traversal not enabled"))?;
5090
5091 if nat_state
5093 .active_validations
5094 .contains_key(&candidate_address)
5095 {
5096 trace!("Validation already in progress for {}", candidate_address);
5097 return Ok(());
5098 }
5099
5100 let challenge = self.rng.r#gen::<u64>();
5102
5103 let validation_state = nat_traversal::PathValidationState {
5105 challenge,
5106 sent_at: now,
5107 retry_count: 0,
5108 max_retries: 3,
5109 coordination_round: None,
5110 timeout_state: nat_traversal::AdaptiveTimeoutState::new(),
5111 last_retry_at: None,
5112 };
5113
5114 nat_state
5116 .active_validations
5117 .insert(candidate_address, validation_state);
5118
5119 self.nat_traversal_challenges
5121 .push(candidate_address, challenge);
5122
5123 nat_state.stats.validations_succeeded += 1; trace!(
5127 "Triggered PATH_CHALLENGE validation for {} with challenge {:016x}",
5128 candidate_address, challenge
5129 );
5130
5131 Ok(())
5132 }
5133
5134 pub fn nat_traversal_state(&self) -> Option<(NatTraversalRole, usize, usize)> {
5136 self.nat_traversal.as_ref().map(|state| {
5137 (
5138 state.role,
5139 state.local_candidates.len(),
5140 state.remote_candidates.len(),
5141 )
5142 })
5143 }
5144
5145 pub fn initiate_nat_traversal_coordination(
5147 &mut self,
5148 now: Instant,
5149 ) -> Result<(), TransportError> {
5150 let nat_state = self
5151 .nat_traversal
5152 .as_mut()
5153 .ok_or_else(|| TransportError::PROTOCOL_VIOLATION("NAT traversal not enabled"))?;
5154
5155 if nat_state.should_send_punch_request() {
5157 nat_state.generate_candidate_pairs(now);
5159
5160 let pairs = nat_state.get_next_validation_pairs(3);
5162 if pairs.is_empty() {
5163 return Err(TransportError::PROTOCOL_VIOLATION(
5164 "No candidate pairs for coordination",
5165 ));
5166 }
5167
5168 let targets: Vec<_> = pairs
5170 .into_iter()
5171 .map(|pair| nat_traversal::PunchTarget {
5172 remote_addr: pair.remote_addr,
5173 remote_sequence: pair.remote_sequence,
5174 challenge: self.rng.r#gen(),
5175 })
5176 .collect();
5177
5178 let round = nat_state
5180 .start_coordination_round(targets, now)
5181 .map_err(|_e| {
5182 TransportError::PROTOCOL_VIOLATION("Failed to start coordination round")
5183 })?;
5184
5185 let local_addr = self
5188 .local_ip
5189 .map(|ip| SocketAddr::new(ip, self.local_ip.map(|_| 0).unwrap_or(0)))
5190 .unwrap_or_else(|| {
5191 SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED), 0)
5192 });
5193
5194 let punch_me_now = frame::PunchMeNow {
5195 round,
5196 paired_with_sequence_number: VarInt::from_u32(0), address: local_addr,
5198 target_peer_id: None, };
5200
5201 self.spaces[SpaceId::Data]
5202 .pending
5203 .punch_me_now
5204 .push(punch_me_now);
5205 nat_state.mark_punch_request_sent();
5206
5207 trace!("Initiated NAT traversal coordination round {}", round);
5208 }
5209
5210 Ok(())
5211 }
5212
5213 pub fn validate_nat_candidates(&mut self, now: Instant) {
5215 self.generate_nat_traversal_challenges(now);
5216 }
5217
5218 pub fn send_nat_address_advertisement(
5233 &mut self,
5234 address: SocketAddr,
5235 priority: u32,
5236 ) -> Result<u64, ConnectionError> {
5237 let nat_state = self.nat_traversal.as_mut().ok_or_else(|| {
5239 ConnectionError::TransportError(TransportError::PROTOCOL_VIOLATION(
5240 "NAT traversal not enabled on this connection",
5241 ))
5242 })?;
5243
5244 let sequence = nat_state.next_sequence;
5246 nat_state.next_sequence =
5247 VarInt::from_u64(nat_state.next_sequence.into_inner() + 1).unwrap();
5248
5249 let now = Instant::now();
5251 nat_state.local_candidates.insert(
5252 sequence,
5253 nat_traversal::AddressCandidate {
5254 address,
5255 priority,
5256 source: nat_traversal::CandidateSource::Local,
5257 discovered_at: now,
5258 state: nat_traversal::CandidateState::New,
5259 attempt_count: 0,
5260 last_attempt: None,
5261 },
5262 );
5263
5264 nat_state.stats.local_candidates_sent += 1;
5266
5267 self.queue_add_address(sequence, address, VarInt::from_u32(priority));
5269
5270 debug!(
5271 "Queued ADD_ADDRESS frame: addr={}, priority={}, seq={}",
5272 address, priority, sequence
5273 );
5274 Ok(sequence.into_inner())
5275 }
5276
5277 pub fn send_nat_punch_coordination(
5290 &mut self,
5291 paired_with_sequence_number: u64,
5292 address: SocketAddr,
5293 round: u32,
5294 ) -> Result<(), ConnectionError> {
5295 let _nat_state = self.nat_traversal.as_ref().ok_or_else(|| {
5297 ConnectionError::TransportError(TransportError::PROTOCOL_VIOLATION(
5298 "NAT traversal not enabled on this connection",
5299 ))
5300 })?;
5301
5302 self.queue_punch_me_now(
5304 VarInt::from_u32(round),
5305 VarInt::from_u64(paired_with_sequence_number).map_err(|_| {
5306 ConnectionError::TransportError(TransportError::PROTOCOL_VIOLATION(
5307 "Invalid target sequence number",
5308 ))
5309 })?,
5310 address,
5311 );
5312
5313 debug!(
5314 "Queued PUNCH_ME_NOW frame: paired_with_seq={}, addr={}, round={}",
5315 paired_with_sequence_number, address, round
5316 );
5317 Ok(())
5318 }
5319
5320 pub fn send_nat_address_removal(&mut self, sequence: u64) -> Result<(), ConnectionError> {
5331 let nat_state = self.nat_traversal.as_mut().ok_or_else(|| {
5333 ConnectionError::TransportError(TransportError::PROTOCOL_VIOLATION(
5334 "NAT traversal not enabled on this connection",
5335 ))
5336 })?;
5337
5338 let sequence_varint = VarInt::from_u64(sequence).map_err(|_| {
5339 ConnectionError::TransportError(TransportError::PROTOCOL_VIOLATION(
5340 "Invalid sequence number",
5341 ))
5342 })?;
5343
5344 nat_state.local_candidates.remove(&sequence_varint);
5346
5347 self.queue_remove_address(sequence_varint);
5349
5350 debug!("Queued REMOVE_ADDRESS frame: seq={}", sequence);
5351 Ok(())
5352 }
5353
5354 pub(crate) fn get_nat_traversal_stats(&self) -> Option<&nat_traversal::NatTraversalStats> {
5363 self.nat_traversal.as_ref().map(|state| &state.stats)
5364 }
5365
5366 pub fn is_nat_traversal_enabled(&self) -> bool {
5368 self.nat_traversal.is_some()
5369 }
5370
5371 pub fn get_nat_traversal_role(&self) -> Option<NatTraversalRole> {
5373 self.nat_traversal.as_ref().map(|state| state.role)
5374 }
5375
5376 fn negotiate_address_discovery(&mut self, peer_params: &TransportParameters) {
5378 let now = Instant::now();
5379
5380 match &peer_params.address_discovery {
5382 Some(peer_config) => {
5383 if let Some(state) = &mut self.address_discovery_state {
5385 if state.enabled {
5386 debug!(
5389 "Address discovery negotiated: rate={}, all_paths={}",
5390 state.max_observation_rate, state.observe_all_paths
5391 );
5392 } else {
5393 debug!("Address discovery disabled locally, ignoring peer support");
5395 }
5396 } else {
5397 self.address_discovery_state =
5399 Some(AddressDiscoveryState::new(peer_config, now));
5400 debug!("Address discovery initialized from peer config");
5401 }
5402 }
5403 _ => {
5404 if let Some(state) = &mut self.address_discovery_state {
5406 state.enabled = false;
5407 debug!("Address discovery disabled - peer doesn't support it");
5408 }
5409 }
5410 }
5411
5412 if let Some(state) = &self.address_discovery_state {
5414 if state.enabled {
5415 self.path.set_observation_rate(state.max_observation_rate);
5416 }
5417 }
5418 }
5419
5420 fn decrypt_packet(
5421 &mut self,
5422 now: Instant,
5423 packet: &mut Packet,
5424 ) -> Result<Option<u64>, Option<TransportError>> {
5425 let result = packet_crypto::decrypt_packet_body(
5426 packet,
5427 &self.spaces,
5428 self.zero_rtt_crypto.as_ref(),
5429 self.key_phase,
5430 self.prev_crypto.as_ref(),
5431 self.next_crypto.as_ref(),
5432 )?;
5433
5434 let result = match result {
5435 Some(r) => r,
5436 None => return Ok(None),
5437 };
5438
5439 if result.outgoing_key_update_acked {
5440 if let Some(prev) = self.prev_crypto.as_mut() {
5441 prev.end_packet = Some((result.number, now));
5442 self.set_key_discard_timer(now, packet.header.space());
5443 }
5444 }
5445
5446 if result.incoming_key_update {
5447 trace!("key update authenticated");
5448 self.update_keys(Some((result.number, now)), true);
5449 self.set_key_discard_timer(now, packet.header.space());
5450 }
5451
5452 Ok(Some(result.number))
5453 }
5454
5455 fn update_keys(&mut self, end_packet: Option<(u64, Instant)>, remote: bool) {
5456 trace!("executing key update");
5457 let new = self
5461 .crypto
5462 .next_1rtt_keys()
5463 .expect("only called for `Data` packets");
5464 self.key_phase_size = new
5465 .local
5466 .confidentiality_limit()
5467 .saturating_sub(KEY_UPDATE_MARGIN);
5468 let old = mem::replace(
5469 &mut self.spaces[SpaceId::Data]
5470 .crypto
5471 .as_mut()
5472 .unwrap() .packet,
5474 mem::replace(self.next_crypto.as_mut().unwrap(), new),
5475 );
5476 self.spaces[SpaceId::Data].sent_with_keys = 0;
5477 self.prev_crypto = Some(PrevCrypto {
5478 crypto: old,
5479 end_packet,
5480 update_unacked: remote,
5481 });
5482 self.key_phase = !self.key_phase;
5483 }
5484
5485 fn peer_supports_ack_frequency(&self) -> bool {
5486 self.peer_params.min_ack_delay.is_some()
5487 }
5488
5489 pub(crate) fn immediate_ack(&mut self) {
5494 self.spaces[self.highest_space].immediate_ack_pending = true;
5495 }
5496
5497 #[cfg(test)]
5499 pub(crate) fn decode_packet(&self, event: &ConnectionEvent) -> Option<Vec<u8>> {
5500 let (first_decode, remaining) = match &event.0 {
5501 ConnectionEventInner::Datagram(DatagramConnectionEvent {
5502 first_decode,
5503 remaining,
5504 ..
5505 }) => (first_decode, remaining),
5506 _ => return None,
5507 };
5508
5509 if remaining.is_some() {
5510 panic!("Packets should never be coalesced in tests");
5511 }
5512
5513 let decrypted_header = packet_crypto::unprotect_header(
5514 first_decode.clone(),
5515 &self.spaces,
5516 self.zero_rtt_crypto.as_ref(),
5517 self.peer_params.stateless_reset_token,
5518 )?;
5519
5520 let mut packet = decrypted_header.packet?;
5521 packet_crypto::decrypt_packet_body(
5522 &mut packet,
5523 &self.spaces,
5524 self.zero_rtt_crypto.as_ref(),
5525 self.key_phase,
5526 self.prev_crypto.as_ref(),
5527 self.next_crypto.as_ref(),
5528 )
5529 .ok()?;
5530
5531 Some(packet.payload.to_vec())
5532 }
5533
5534 #[cfg(test)]
5537 pub(crate) fn bytes_in_flight(&self) -> u64 {
5538 self.path.in_flight.bytes
5539 }
5540
5541 #[cfg(test)]
5543 pub(crate) fn congestion_window(&self) -> u64 {
5544 self.path
5545 .congestion
5546 .window()
5547 .saturating_sub(self.path.in_flight.bytes)
5548 }
5549
5550 #[cfg(test)]
5552 pub(crate) fn is_idle(&self) -> bool {
5553 Timer::VALUES
5554 .iter()
5555 .filter(|&&t| !matches!(t, Timer::KeepAlive | Timer::PushNewCid | Timer::KeyDiscard))
5556 .filter_map(|&t| Some((t, self.timers.get(t)?)))
5557 .min_by_key(|&(_, time)| time)
5558 .is_none_or(|(timer, _)| timer == Timer::Idle)
5559 }
5560
5561 #[cfg(test)]
5563 pub(crate) fn lost_packets(&self) -> u64 {
5564 self.lost_packets
5565 }
5566
5567 #[cfg(test)]
5569 pub(crate) fn using_ecn(&self) -> bool {
5570 self.path.sending_ecn
5571 }
5572
5573 #[cfg(test)]
5575 pub(crate) fn total_recvd(&self) -> u64 {
5576 self.path.total_recvd
5577 }
5578
5579 #[cfg(test)]
5580 pub(crate) fn active_local_cid_seq(&self) -> (u64, u64) {
5581 self.local_cid_state.active_seq()
5582 }
5583
5584 #[cfg(test)]
5587 pub(crate) fn rotate_local_cid(&mut self, v: u64, now: Instant) {
5588 let n = self.local_cid_state.assign_retire_seq(v);
5589 self.endpoint_events
5590 .push_back(EndpointEventInner::NeedIdentifiers(now, n));
5591 }
5592
5593 #[cfg(test)]
5595 pub(crate) fn active_rem_cid_seq(&self) -> u64 {
5596 self.rem_cids.active_seq()
5597 }
5598
5599 #[cfg(test)]
5601 pub(crate) fn path_mtu(&self) -> u16 {
5602 self.path.current_mtu()
5603 }
5604
5605 fn can_send_1rtt(&self, max_size: usize) -> bool {
5609 self.streams.can_send_stream_data()
5610 || self.path.challenge_pending
5611 || self
5612 .prev_path
5613 .as_ref()
5614 .is_some_and(|(_, x)| x.challenge_pending)
5615 || !self.path_responses.is_empty()
5616 || !self.nat_traversal_challenges.is_empty()
5617 || self
5618 .datagrams
5619 .outgoing
5620 .front()
5621 .is_some_and(|x| x.size(true) <= max_size)
5622 }
5623
5624 fn remove_in_flight(&mut self, pn: u64, packet: &SentPacket) {
5626 for path in [&mut self.path]
5628 .into_iter()
5629 .chain(self.prev_path.as_mut().map(|(_, data)| data))
5630 {
5631 if path.remove_in_flight(pn, packet) {
5632 return;
5633 }
5634 }
5635 }
5636
5637 fn kill(&mut self, reason: ConnectionError) {
5639 self.close_common();
5640 self.error = Some(reason);
5641 self.state = State::Drained;
5642 self.endpoint_events.push_back(EndpointEventInner::Drained);
5643 }
5644
5645 fn generate_nat_traversal_challenges(&mut self, now: Instant) {
5647 let candidates: Vec<(VarInt, SocketAddr)> = if let Some(nat_state) = &self.nat_traversal {
5649 nat_state
5650 .get_validation_candidates()
5651 .into_iter()
5652 .take(3) .map(|(seq, candidate)| (seq, candidate.address))
5654 .collect()
5655 } else {
5656 return;
5657 };
5658
5659 if candidates.is_empty() {
5660 return;
5661 }
5662
5663 if let Some(nat_state) = &mut self.nat_traversal {
5665 for (seq, address) in candidates {
5666 let challenge: u64 = self.rng.r#gen();
5668
5669 if let Err(e) = nat_state.start_validation(seq, challenge, now) {
5671 debug!("Failed to start validation for candidate {}: {}", seq, e);
5672 continue;
5673 }
5674
5675 self.nat_traversal_challenges.push(address, challenge);
5677 trace!(
5678 "Queuing NAT validation PATH_CHALLENGE for {} with token {:08x}",
5679 address, challenge
5680 );
5681 }
5682 }
5683 }
5684
5685 pub fn current_mtu(&self) -> u16 {
5689 self.path.current_mtu()
5690 }
5691
5692 fn predict_1rtt_overhead(&self, pn: Option<u64>) -> usize {
5699 let pn_len = match pn {
5700 Some(pn) => PacketNumber::new(
5701 pn,
5702 self.spaces[SpaceId::Data].largest_acked_packet.unwrap_or(0),
5703 )
5704 .len(),
5705 None => 4,
5707 };
5708
5709 1 + self.rem_cids.active().len() + pn_len + self.tag_len_1rtt()
5711 }
5712
5713 fn tag_len_1rtt(&self) -> usize {
5714 let key = match self.spaces[SpaceId::Data].crypto.as_ref() {
5715 Some(crypto) => Some(&*crypto.packet.local),
5716 None => self.zero_rtt_crypto.as_ref().map(|x| &*x.packet),
5717 };
5718 key.map_or(16, |x| x.tag_len())
5722 }
5723
5724 fn on_path_validated(&mut self) {
5726 self.path.validated = true;
5727 let ConnectionSide::Server { server_config } = &self.side else {
5728 return;
5729 };
5730 let new_tokens = &mut self.spaces[SpaceId::Data as usize].pending.new_tokens;
5731 new_tokens.clear();
5732 for _ in 0..server_config.validation_token.sent {
5733 new_tokens.push(self.path.remote);
5734 }
5735 }
5736}
5737
5738impl fmt::Debug for Connection {
5739 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
5740 f.debug_struct("Connection")
5741 .field("handshake_cid", &self.handshake_cid)
5742 .finish()
5743 }
5744}
5745
5746enum ConnectionSide {
5748 Client {
5749 token: Bytes,
5751 token_store: Arc<dyn TokenStore>,
5752 server_name: String,
5753 },
5754 Server {
5755 server_config: Arc<ServerConfig>,
5756 },
5757}
5758
5759impl ConnectionSide {
5760 fn remote_may_migrate(&self) -> bool {
5761 match self {
5762 Self::Server { server_config } => server_config.migration,
5763 Self::Client { .. } => false,
5764 }
5765 }
5766
5767 fn is_client(&self) -> bool {
5768 self.side().is_client()
5769 }
5770
5771 fn is_server(&self) -> bool {
5772 self.side().is_server()
5773 }
5774
5775 fn side(&self) -> Side {
5776 match *self {
5777 Self::Client { .. } => Side::Client,
5778 Self::Server { .. } => Side::Server,
5779 }
5780 }
5781}
5782
5783impl From<SideArgs> for ConnectionSide {
5784 fn from(side: SideArgs) -> Self {
5785 match side {
5786 SideArgs::Client {
5787 token_store,
5788 server_name,
5789 } => Self::Client {
5790 token: token_store.take(&server_name).unwrap_or_default(),
5791 token_store,
5792 server_name,
5793 },
5794 SideArgs::Server {
5795 server_config,
5796 pref_addr_cid: _,
5797 path_validated: _,
5798 } => Self::Server { server_config },
5799 }
5800 }
5801}
5802
5803pub(crate) enum SideArgs {
5805 Client {
5806 token_store: Arc<dyn TokenStore>,
5807 server_name: String,
5808 },
5809 Server {
5810 server_config: Arc<ServerConfig>,
5811 pref_addr_cid: Option<ConnectionId>,
5812 path_validated: bool,
5813 },
5814}
5815
5816impl SideArgs {
5817 pub(crate) fn pref_addr_cid(&self) -> Option<ConnectionId> {
5818 match *self {
5819 Self::Client { .. } => None,
5820 Self::Server { pref_addr_cid, .. } => pref_addr_cid,
5821 }
5822 }
5823
5824 pub(crate) fn path_validated(&self) -> bool {
5825 match *self {
5826 Self::Client { .. } => true,
5827 Self::Server { path_validated, .. } => path_validated,
5828 }
5829 }
5830
5831 pub(crate) fn side(&self) -> Side {
5832 match *self {
5833 Self::Client { .. } => Side::Client,
5834 Self::Server { .. } => Side::Server,
5835 }
5836 }
5837}
5838
5839#[derive(Debug, Error, Clone, PartialEq, Eq)]
5841pub enum ConnectionError {
5842 #[error("peer doesn't implement any supported version")]
5844 VersionMismatch,
5845 #[error(transparent)]
5847 TransportError(#[from] TransportError),
5848 #[error("aborted by peer: {0}")]
5850 ConnectionClosed(frame::ConnectionClose),
5851 #[error("closed by peer: {0}")]
5853 ApplicationClosed(frame::ApplicationClose),
5854 #[error("reset by peer")]
5856 Reset,
5857 #[error("timed out")]
5863 TimedOut,
5864 #[error("closed")]
5866 LocallyClosed,
5867 #[error("CIDs exhausted")]
5871 CidsExhausted,
5872}
5873
5874impl From<Close> for ConnectionError {
5875 fn from(x: Close) -> Self {
5876 match x {
5877 Close::Connection(reason) => Self::ConnectionClosed(reason),
5878 Close::Application(reason) => Self::ApplicationClosed(reason),
5879 }
5880 }
5881}
5882
5883impl From<ConnectionError> for io::Error {
5885 fn from(x: ConnectionError) -> Self {
5886 use ConnectionError::*;
5887 let kind = match x {
5888 TimedOut => io::ErrorKind::TimedOut,
5889 Reset => io::ErrorKind::ConnectionReset,
5890 ApplicationClosed(_) | ConnectionClosed(_) => io::ErrorKind::ConnectionAborted,
5891 TransportError(_) | VersionMismatch | LocallyClosed | CidsExhausted => {
5892 io::ErrorKind::Other
5893 }
5894 };
5895 Self::new(kind, x)
5896 }
5897}
5898
5899#[derive(Clone, Debug)]
5900pub enum State {
5902 Handshake(state::Handshake),
5904 Established,
5906 Closed(state::Closed),
5908 Draining,
5910 Drained,
5912}
5913
5914impl State {
5915 fn closed<R: Into<Close>>(reason: R) -> Self {
5916 Self::Closed(state::Closed {
5917 reason: reason.into(),
5918 })
5919 }
5920
5921 fn is_handshake(&self) -> bool {
5922 matches!(*self, Self::Handshake(_))
5923 }
5924
5925 fn is_established(&self) -> bool {
5926 matches!(*self, Self::Established)
5927 }
5928
5929 fn is_closed(&self) -> bool {
5930 matches!(*self, Self::Closed(_) | Self::Draining | Self::Drained)
5931 }
5932
5933 fn is_drained(&self) -> bool {
5934 matches!(*self, Self::Drained)
5935 }
5936}
5937
5938mod state {
5939 use super::*;
5940
5941 #[derive(Clone, Debug)]
5942 pub struct Handshake {
5943 pub(super) rem_cid_set: bool,
5947 pub(super) expected_token: Bytes,
5951 pub(super) client_hello: Option<Bytes>,
5955 }
5956
5957 #[derive(Clone, Debug)]
5958 pub struct Closed {
5959 pub(super) reason: Close,
5960 }
5961}
5962
5963#[derive(Debug)]
5965pub enum Event {
5966 HandshakeDataReady,
5968 Connected,
5970 ConnectionLost {
5974 reason: ConnectionError,
5976 },
5977 Stream(StreamEvent),
5979 DatagramReceived,
5981 DatagramsUnblocked,
5983}
5984
5985fn instant_saturating_sub(x: Instant, y: Instant) -> Duration {
5986 if x > y { x - y } else { Duration::ZERO }
5987}
5988
5989fn get_max_ack_delay(params: &TransportParameters) -> Duration {
5990 Duration::from_micros(params.max_ack_delay.0 * 1000)
5991}
5992
5993const MAX_BACKOFF_EXPONENT: u32 = 16;
5995
5996const MIN_PACKET_SPACE: usize = MAX_HANDSHAKE_OR_0RTT_HEADER_SIZE + 32;
6004
6005const MAX_HANDSHAKE_OR_0RTT_HEADER_SIZE: usize =
6011 1 + 4 + 1 + MAX_CID_SIZE + 1 + MAX_CID_SIZE + VarInt::from_u32(u16::MAX as u32).size() + 4;
6012
6013const KEY_UPDATE_MARGIN: u64 = 10_000;
6017
6018#[derive(Default)]
6019struct SentFrames {
6020 retransmits: ThinRetransmits,
6021 largest_acked: Option<u64>,
6022 stream_frames: StreamMetaVec,
6023 non_retransmits: bool,
6025 requires_padding: bool,
6026}
6027
6028impl SentFrames {
6029 fn is_ack_only(&self, streams: &StreamsState) -> bool {
6031 self.largest_acked.is_some()
6032 && !self.non_retransmits
6033 && self.stream_frames.is_empty()
6034 && self.retransmits.is_empty(streams)
6035 }
6036}
6037
6038fn negotiate_max_idle_timeout(x: Option<VarInt>, y: Option<VarInt>) -> Option<Duration> {
6046 match (x, y) {
6047 (Some(VarInt(0)) | None, Some(VarInt(0)) | None) => None,
6048 (Some(VarInt(0)) | None, Some(y)) => Some(Duration::from_millis(y.0)),
6049 (Some(x), Some(VarInt(0)) | None) => Some(Duration::from_millis(x.0)),
6050 (Some(x), Some(y)) => Some(Duration::from_millis(cmp::min(x, y).0)),
6051 }
6052}
6053
6054#[cfg(feature = "pqc")]
6056#[derive(Debug, Clone)]
6057pub(crate) struct PqcState {
6058 enabled: bool,
6060 algorithms: Option<crate::transport_parameters::PqcAlgorithms>,
6062 handshake_mtu: u16,
6064 using_pqc: bool,
6066 packet_handler: crate::crypto::pqc::packet_handler::PqcPacketHandler,
6068}
6069
6070#[cfg(feature = "pqc")]
6071impl PqcState {
6072 fn new() -> Self {
6073 Self {
6074 enabled: false,
6075 algorithms: None,
6076 handshake_mtu: MIN_INITIAL_SIZE,
6077 using_pqc: false,
6078 packet_handler: crate::crypto::pqc::packet_handler::PqcPacketHandler::new(),
6079 }
6080 }
6081
6082 fn min_initial_size(&self) -> u16 {
6084 if self.enabled && self.using_pqc {
6085 std::cmp::max(self.handshake_mtu, 4096)
6087 } else {
6088 MIN_INITIAL_SIZE
6089 }
6090 }
6091
6092 fn update_from_peer_params(&mut self, params: &TransportParameters) {
6094 if let Some(ref algorithms) = params.pqc_algorithms {
6095 self.enabled = true;
6096 self.algorithms = Some(algorithms.clone());
6097 if algorithms.ml_kem_768
6099 || algorithms.ml_dsa_65
6100 || algorithms.hybrid_x25519_ml_kem
6101 || algorithms.hybrid_ed25519_ml_dsa
6102 {
6103 self.using_pqc = true;
6104 self.handshake_mtu = 4096; }
6106 }
6107 }
6108
6109 fn detect_pqc_from_crypto(&mut self, crypto_data: &[u8], space: SpaceId) {
6111 if self.packet_handler.detect_pqc_handshake(crypto_data, space) {
6112 self.using_pqc = true;
6113 self.handshake_mtu = self.packet_handler.get_min_packet_size(space);
6115 }
6116 }
6117
6118 fn should_trigger_mtu_discovery(&mut self) -> bool {
6120 self.packet_handler.should_trigger_mtu_discovery()
6121 }
6122
6123 fn get_mtu_config(&self) -> MtuDiscoveryConfig {
6125 self.packet_handler.get_pqc_mtu_config()
6126 }
6127
6128 fn calculate_crypto_frame_size(&self, available_space: usize, remaining_data: usize) -> usize {
6130 self.packet_handler
6131 .calculate_crypto_frame_size(available_space, remaining_data)
6132 }
6133
6134 fn should_adjust_coalescing(&self, current_size: usize, space: SpaceId) -> bool {
6136 self.packet_handler
6137 .adjust_coalescing_for_pqc(current_size, space)
6138 }
6139
6140 fn on_packet_sent(&mut self, space: SpaceId, size: u16) {
6142 self.packet_handler.on_packet_sent(space, size);
6143 }
6144
6145 fn reset(&mut self) {
6147 self.enabled = false;
6148 self.algorithms = None;
6149 self.handshake_mtu = MIN_INITIAL_SIZE;
6150 self.using_pqc = false;
6151 self.packet_handler.reset();
6152 }
6153}
6154
6155#[cfg(feature = "pqc")]
6156impl Default for PqcState {
6157 fn default() -> Self {
6158 Self::new()
6159 }
6160}
6161
6162#[derive(Debug, Clone)]
6164pub(crate) struct AddressDiscoveryState {
6165 enabled: bool,
6167 max_observation_rate: u8,
6169 observe_all_paths: bool,
6171 path_addresses: std::collections::HashMap<u64, paths::PathAddressInfo>,
6173 rate_limiter: AddressObservationRateLimiter,
6175 observed_addresses: Vec<ObservedAddressEvent>,
6177 bootstrap_mode: bool,
6179 next_sequence_number: VarInt,
6181 last_received_sequence: std::collections::HashMap<u64, VarInt>,
6183}
6184
6185#[derive(Debug, Clone, PartialEq, Eq)]
6187struct ObservedAddressEvent {
6188 address: SocketAddr,
6190 received_at: Instant,
6192 path_id: u64,
6194}
6195
6196#[derive(Debug, Clone)]
6198struct AddressObservationRateLimiter {
6199 tokens: f64,
6201 max_tokens: f64,
6203 rate: f64,
6205 last_update: Instant,
6207}
6208
6209impl AddressDiscoveryState {
6210 fn new(config: &crate::transport_parameters::AddressDiscoveryConfig, now: Instant) -> Self {
6212 use crate::transport_parameters::AddressDiscoveryConfig::*;
6213
6214 let (enabled, _can_send, _can_receive) = match config {
6216 SendOnly => (true, true, false),
6217 ReceiveOnly => (true, false, true),
6218 SendAndReceive => (true, true, true),
6219 };
6220
6221 let max_observation_rate = 10u8; let observe_all_paths = false; Self {
6227 enabled,
6228 max_observation_rate,
6229 observe_all_paths,
6230 path_addresses: std::collections::HashMap::new(),
6231 rate_limiter: AddressObservationRateLimiter::new(max_observation_rate, now),
6232 observed_addresses: Vec::new(),
6233 bootstrap_mode: false,
6234 next_sequence_number: VarInt::from_u32(0),
6235 last_received_sequence: std::collections::HashMap::new(),
6236 }
6237 }
6238
6239 fn should_send_observation(&mut self, path_id: u64, now: Instant) -> bool {
6241 if !self.should_observe_path(path_id) {
6243 return false;
6244 }
6245
6246 let needs_observation = match self.path_addresses.get(&path_id) {
6248 Some(info) => info.observed_address.is_none() || !info.notified,
6249 None => true,
6250 };
6251
6252 if !needs_observation {
6253 return false;
6254 }
6255
6256 self.rate_limiter.try_consume(1.0, now)
6258 }
6259
6260 fn record_observation_sent(&mut self, path_id: u64) {
6262 if let Some(info) = self.path_addresses.get_mut(&path_id) {
6263 info.mark_notified();
6264 }
6265 }
6266
6267 fn handle_observed_address(&mut self, address: SocketAddr, path_id: u64, now: Instant) {
6269 if !self.enabled {
6270 return;
6271 }
6272
6273 self.observed_addresses.push(ObservedAddressEvent {
6274 address,
6275 received_at: now,
6276 path_id,
6277 });
6278
6279 let info = self
6281 .path_addresses
6282 .entry(path_id)
6283 .or_insert_with(paths::PathAddressInfo::new);
6284 info.update_observed_address(address, now);
6285 }
6286
6287 pub(crate) fn get_observed_address(&self, path_id: u64) -> Option<SocketAddr> {
6289 self.path_addresses
6290 .get(&path_id)
6291 .and_then(|info| info.observed_address)
6292 }
6293
6294 pub(crate) fn get_all_observed_addresses(&self) -> Vec<SocketAddr> {
6296 self.path_addresses
6297 .values()
6298 .filter_map(|info| info.observed_address)
6299 .collect()
6300 }
6301
6302 pub(crate) fn stats(&self) -> AddressDiscoveryStats {
6304 AddressDiscoveryStats {
6305 frames_sent: self.observed_addresses.len() as u64, frames_received: self.observed_addresses.len() as u64,
6307 addresses_discovered: self
6308 .path_addresses
6309 .values()
6310 .filter(|info| info.observed_address.is_some())
6311 .count() as u64,
6312 address_changes_detected: 0, }
6314 }
6315
6316 fn has_unnotified_changes(&self) -> bool {
6318 self.path_addresses
6319 .values()
6320 .any(|info| info.observed_address.is_some() && !info.notified)
6321 }
6322
6323 fn queue_observed_address_frame(
6325 &mut self,
6326 path_id: u64,
6327 address: SocketAddr,
6328 ) -> Option<frame::ObservedAddress> {
6329 if !self.enabled {
6331 return None;
6332 }
6333
6334 if !self.observe_all_paths && path_id != 0 {
6336 return None;
6337 }
6338
6339 if let Some(info) = self.path_addresses.get(&path_id) {
6341 if info.notified {
6342 return None;
6343 }
6344 }
6345
6346 if self.rate_limiter.tokens < 1.0 {
6348 return None;
6349 }
6350
6351 self.rate_limiter.tokens -= 1.0;
6353
6354 let info = self
6356 .path_addresses
6357 .entry(path_id)
6358 .or_insert_with(paths::PathAddressInfo::new);
6359 info.observed_address = Some(address);
6360 info.notified = true;
6361
6362 let sequence_number = self.next_sequence_number;
6364 self.next_sequence_number = VarInt::from_u64(self.next_sequence_number.into_inner() + 1)
6365 .expect("sequence number overflow");
6366
6367 Some(frame::ObservedAddress {
6368 sequence_number,
6369 address,
6370 })
6371 }
6372
6373 fn check_for_address_observations(
6375 &mut self,
6376 _current_path: u64,
6377 peer_supports_address_discovery: bool,
6378 now: Instant,
6379 ) -> Vec<frame::ObservedAddress> {
6380 let mut frames = Vec::new();
6381
6382 if !self.enabled || !peer_supports_address_discovery {
6384 return frames;
6385 }
6386
6387 self.rate_limiter.update_tokens(now);
6389
6390 let paths_to_notify: Vec<u64> = self
6392 .path_addresses
6393 .iter()
6394 .filter_map(|(&path_id, info)| {
6395 if info.observed_address.is_some() && !info.notified {
6396 Some(path_id)
6397 } else {
6398 None
6399 }
6400 })
6401 .collect();
6402
6403 for path_id in paths_to_notify {
6405 if !self.should_observe_path(path_id) {
6407 continue;
6408 }
6409
6410 if !self.bootstrap_mode && self.rate_limiter.tokens < 1.0 {
6412 break; }
6414
6415 if let Some(info) = self.path_addresses.get_mut(&path_id) {
6417 if let Some(address) = info.observed_address {
6418 if self.bootstrap_mode {
6420 self.rate_limiter.tokens -= 0.2; } else {
6422 self.rate_limiter.tokens -= 1.0;
6423 }
6424
6425 info.notified = true;
6427
6428 let sequence_number = self.next_sequence_number;
6430 self.next_sequence_number =
6431 VarInt::from_u64(self.next_sequence_number.into_inner() + 1)
6432 .expect("sequence number overflow");
6433
6434 frames.push(frame::ObservedAddress {
6435 sequence_number,
6436 address,
6437 });
6438 }
6439 }
6440 }
6441
6442 frames
6443 }
6444
6445 fn update_rate_limit(&mut self, new_rate: f64) {
6447 self.max_observation_rate = new_rate as u8;
6448 self.rate_limiter.set_rate(new_rate as u8);
6449 }
6450
6451 fn from_transport_params(params: &TransportParameters) -> Option<Self> {
6453 params
6454 .address_discovery
6455 .as_ref()
6456 .map(|config| Self::new(config, Instant::now()))
6457 }
6458
6459 #[cfg(test)]
6461 fn new_with_params(enabled: bool, max_rate: f64, observe_all_paths: bool) -> Self {
6462 if !enabled {
6464 return Self {
6466 enabled: false,
6467 max_observation_rate: max_rate as u8,
6468 observe_all_paths,
6469 path_addresses: std::collections::HashMap::new(),
6470 rate_limiter: AddressObservationRateLimiter::new(max_rate as u8, Instant::now()),
6471 observed_addresses: Vec::new(),
6472 bootstrap_mode: false,
6473 next_sequence_number: VarInt::from_u32(0),
6474 last_received_sequence: std::collections::HashMap::new(),
6475 };
6476 }
6477
6478 let config = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
6480 let mut state = Self::new(&config, Instant::now());
6481 state.max_observation_rate = max_rate as u8;
6482 state.observe_all_paths = observe_all_paths;
6483 state.rate_limiter = AddressObservationRateLimiter::new(max_rate as u8, Instant::now());
6484 state
6485 }
6486
6487 fn set_bootstrap_mode(&mut self, enabled: bool) {
6489 self.bootstrap_mode = enabled;
6490 if enabled {
6492 let bootstrap_rate = self.get_effective_rate_limit();
6493 self.rate_limiter.rate = bootstrap_rate;
6494 self.rate_limiter.max_tokens = bootstrap_rate * 2.0; self.rate_limiter.tokens = self.rate_limiter.max_tokens;
6497 }
6498 }
6499
6500 fn is_bootstrap_mode(&self) -> bool {
6502 self.bootstrap_mode
6503 }
6504
6505 fn get_effective_rate_limit(&self) -> f64 {
6507 if self.bootstrap_mode {
6508 (self.max_observation_rate as f64) * 5.0
6510 } else {
6511 self.max_observation_rate as f64
6512 }
6513 }
6514
6515 fn should_observe_path(&self, path_id: u64) -> bool {
6517 if !self.enabled {
6518 return false;
6519 }
6520
6521 if self.bootstrap_mode {
6523 return true;
6524 }
6525
6526 self.observe_all_paths || path_id == 0
6528 }
6529
6530 fn should_send_observation_immediately(&self, is_new_connection: bool) -> bool {
6532 self.bootstrap_mode && is_new_connection
6533 }
6534}
6535
6536impl AddressObservationRateLimiter {
6537 fn new(rate: u8, now: Instant) -> Self {
6539 let rate_f64 = rate as f64;
6540 Self {
6541 tokens: rate_f64,
6542 max_tokens: rate_f64,
6543 rate: rate_f64,
6544 last_update: now,
6545 }
6546 }
6547
6548 fn try_consume(&mut self, tokens: f64, now: Instant) -> bool {
6550 self.update_tokens(now);
6551
6552 if self.tokens >= tokens {
6553 self.tokens -= tokens;
6554 true
6555 } else {
6556 false
6557 }
6558 }
6559
6560 fn update_tokens(&mut self, now: Instant) {
6562 let elapsed = now.saturating_duration_since(self.last_update);
6563 let new_tokens = elapsed.as_secs_f64() * self.rate;
6564 self.tokens = (self.tokens + new_tokens).min(self.max_tokens);
6565 self.last_update = now;
6566 }
6567
6568 fn set_rate(&mut self, rate: u8) {
6570 let rate_f64 = rate as f64;
6571 self.rate = rate_f64;
6572 self.max_tokens = rate_f64;
6573 if self.tokens > self.max_tokens {
6575 self.tokens = self.max_tokens;
6576 }
6577 }
6578}
6579
6580#[cfg(test)]
6581mod tests {
6582 use super::*;
6583 use crate::transport_parameters::AddressDiscoveryConfig;
6584 use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr};
6585
6586 #[test]
6587 fn address_discovery_state_new() {
6588 let config = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
6589 let now = Instant::now();
6590 let state = AddressDiscoveryState::new(&config, now);
6591
6592 assert!(state.enabled);
6593 assert_eq!(state.max_observation_rate, 10);
6594 assert!(!state.observe_all_paths);
6595 assert!(state.path_addresses.is_empty());
6596 assert!(state.observed_addresses.is_empty());
6597 assert_eq!(state.rate_limiter.tokens, 10.0);
6598 }
6599
6600 #[test]
6601 fn address_discovery_state_disabled() {
6602 let config = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
6603 let now = Instant::now();
6604 let mut state = AddressDiscoveryState::new(&config, now);
6605
6606 state.enabled = false;
6608
6609 assert!(!state.should_send_observation(0, now));
6611 }
6612
6613 #[test]
6614 fn address_discovery_state_should_send_observation() {
6615 let config = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
6616 let now = Instant::now();
6617 let mut state = AddressDiscoveryState::new(&config, now);
6618
6619 assert!(state.should_send_observation(0, now));
6621
6622 let mut path_info = paths::PathAddressInfo::new();
6624 path_info.update_observed_address(
6625 SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)), 8080),
6626 now,
6627 );
6628 path_info.mark_notified();
6629 state.path_addresses.insert(0, path_info);
6630
6631 assert!(!state.should_send_observation(0, now));
6633
6634 assert!(!state.should_send_observation(1, now));
6636 }
6637
6638 #[test]
6639 fn address_discovery_state_rate_limiting() {
6640 let config = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
6641 let now = Instant::now();
6642 let mut state = AddressDiscoveryState::new(&config, now);
6643
6644 state.observe_all_paths = true;
6646
6647 assert!(state.should_send_observation(0, now));
6649
6650 state.rate_limiter.try_consume(9.0, now); assert!(!state.should_send_observation(0, now));
6655
6656 let later = now + Duration::from_secs(1);
6658 state.rate_limiter.update_tokens(later);
6659 assert!(state.should_send_observation(0, later));
6660 }
6661
6662 #[test]
6663 fn address_discovery_state_handle_observed_address() {
6664 let config = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
6665 let now = Instant::now();
6666 let mut state = AddressDiscoveryState::new(&config, now);
6667
6668 let addr1 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(10, 0, 0, 1)), 443);
6669 let addr2 = SocketAddr::new(
6670 IpAddr::V6(Ipv6Addr::new(0x2001, 0xdb8, 0, 0, 0, 0, 0, 1)),
6671 8080,
6672 );
6673
6674 state.handle_observed_address(addr1, 0, now);
6676 assert_eq!(state.observed_addresses.len(), 1);
6677 assert_eq!(state.observed_addresses[0].address, addr1);
6678 assert_eq!(state.observed_addresses[0].path_id, 0);
6679
6680 let later = now + Duration::from_millis(100);
6682 state.handle_observed_address(addr2, 1, later);
6683 assert_eq!(state.observed_addresses.len(), 2);
6684 assert_eq!(state.observed_addresses[1].address, addr2);
6685 assert_eq!(state.observed_addresses[1].path_id, 1);
6686 }
6687
6688 #[test]
6689 fn address_discovery_state_get_observed_address() {
6690 let config = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
6691 let now = Instant::now();
6692 let mut state = AddressDiscoveryState::new(&config, now);
6693
6694 assert_eq!(state.get_observed_address(0), None);
6696
6697 let mut path_info = paths::PathAddressInfo::new();
6699 let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 80);
6700 path_info.update_observed_address(addr, now);
6701 state.path_addresses.insert(0, path_info);
6702
6703 assert_eq!(state.get_observed_address(0), Some(addr));
6705 assert_eq!(state.get_observed_address(1), None);
6706 }
6707
6708 #[test]
6709 fn address_discovery_state_unnotified_changes() {
6710 let config = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
6711 let now = Instant::now();
6712 let mut state = AddressDiscoveryState::new(&config, now);
6713
6714 assert!(!state.has_unnotified_changes());
6716
6717 let mut path_info = paths::PathAddressInfo::new();
6719 path_info.update_observed_address(
6720 SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)), 8080),
6721 now,
6722 );
6723 state.path_addresses.insert(0, path_info);
6724
6725 assert!(state.has_unnotified_changes());
6727
6728 state.record_observation_sent(0);
6730 assert!(!state.has_unnotified_changes());
6731 }
6732
6733 #[test]
6734 fn address_observation_rate_limiter_token_bucket() {
6735 let now = Instant::now();
6736 let mut limiter = AddressObservationRateLimiter::new(5, now); assert_eq!(limiter.tokens, 5.0);
6740 assert_eq!(limiter.max_tokens, 5.0);
6741 assert_eq!(limiter.rate, 5.0);
6742
6743 assert!(limiter.try_consume(3.0, now));
6745 assert_eq!(limiter.tokens, 2.0);
6746
6747 assert!(!limiter.try_consume(3.0, now));
6749 assert_eq!(limiter.tokens, 2.0);
6750
6751 let later = now + Duration::from_secs(1);
6753 limiter.update_tokens(later);
6754 assert_eq!(limiter.tokens, 5.0); let half_sec = now + Duration::from_millis(500);
6758 let mut limiter2 = AddressObservationRateLimiter::new(5, now);
6759 limiter2.try_consume(3.0, now);
6760 limiter2.update_tokens(half_sec);
6761 assert_eq!(limiter2.tokens, 4.5); }
6763
6764 #[test]
6766 fn connection_initializes_address_discovery_state_default() {
6767 let config = crate::transport_parameters::AddressDiscoveryConfig::default();
6770 let state = AddressDiscoveryState::new(&config, Instant::now());
6771 assert!(state.enabled); assert_eq!(state.max_observation_rate, 10); assert!(!state.observe_all_paths);
6774 }
6775
6776 #[test]
6777 fn connection_initializes_with_address_discovery_enabled() {
6778 let config = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
6780 let state = AddressDiscoveryState::new(&config, Instant::now());
6781 assert!(state.enabled);
6782 assert_eq!(state.max_observation_rate, 10);
6783 assert!(!state.observe_all_paths);
6784 }
6785
6786 #[test]
6787 fn connection_address_discovery_enabled_by_default() {
6788 let config = crate::transport_parameters::AddressDiscoveryConfig::default();
6790 let state = AddressDiscoveryState::new(&config, Instant::now());
6791 assert!(state.enabled); }
6793
6794 #[test]
6795 fn negotiate_max_idle_timeout_commutative() {
6796 let test_params = [
6797 (None, None, None),
6798 (None, Some(VarInt(0)), None),
6799 (None, Some(VarInt(2)), Some(Duration::from_millis(2))),
6800 (Some(VarInt(0)), Some(VarInt(0)), None),
6801 (
6802 Some(VarInt(2)),
6803 Some(VarInt(0)),
6804 Some(Duration::from_millis(2)),
6805 ),
6806 (
6807 Some(VarInt(1)),
6808 Some(VarInt(4)),
6809 Some(Duration::from_millis(1)),
6810 ),
6811 ];
6812
6813 for (left, right, result) in test_params {
6814 assert_eq!(negotiate_max_idle_timeout(left, right), result);
6815 assert_eq!(negotiate_max_idle_timeout(right, left), result);
6816 }
6817 }
6818
6819 #[test]
6820 fn path_creation_initializes_address_discovery() {
6821 let config = TransportConfig::default();
6822 let remote = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)), 8080);
6823 let now = Instant::now();
6824
6825 let path = paths::PathData::new(remote, false, None, now, &config);
6827
6828 assert!(path.address_info.observed_address.is_none());
6830 assert!(path.address_info.last_observed.is_none());
6831 assert_eq!(path.address_info.observation_count, 0);
6832 assert!(!path.address_info.notified);
6833
6834 assert_eq!(path.observation_rate_limiter.rate, 10.0);
6836 assert_eq!(path.observation_rate_limiter.max_tokens, 10.0);
6837 assert_eq!(path.observation_rate_limiter.tokens, 10.0);
6838 }
6839
6840 #[test]
6841 fn path_migration_resets_address_discovery() {
6842 let config = TransportConfig::default();
6843 let remote1 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)), 8080);
6844 let remote2 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(10, 0, 0, 1)), 443);
6845 let now = Instant::now();
6846
6847 let mut path1 = paths::PathData::new(remote1, false, None, now, &config);
6849 path1.update_observed_address(remote1, now);
6850 path1.mark_address_notified();
6851 path1.consume_observation_token(now);
6852 path1.set_observation_rate(20);
6853
6854 let path2 = paths::PathData::from_previous(remote2, &path1, now);
6856
6857 assert!(path2.address_info.observed_address.is_none());
6859 assert!(path2.address_info.last_observed.is_none());
6860 assert_eq!(path2.address_info.observation_count, 0);
6861 assert!(!path2.address_info.notified);
6862
6863 assert_eq!(path2.observation_rate_limiter.rate, 20.0);
6865 assert_eq!(path2.observation_rate_limiter.tokens, 20.0);
6866 }
6867
6868 #[test]
6869 fn connection_path_updates_observation_rate() {
6870 let config = TransportConfig::default();
6871 let remote = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 42);
6872 let now = Instant::now();
6873
6874 let mut path = paths::PathData::new(remote, false, None, now, &config);
6875
6876 assert_eq!(path.observation_rate_limiter.rate, 10.0);
6878
6879 path.set_observation_rate(25);
6881 assert_eq!(path.observation_rate_limiter.rate, 25.0);
6882 assert_eq!(path.observation_rate_limiter.max_tokens, 25.0);
6883
6884 path.observation_rate_limiter.tokens = 30.0; path.set_observation_rate(20);
6887 assert_eq!(path.observation_rate_limiter.tokens, 20.0); }
6889
6890 #[test]
6891 fn path_validation_preserves_discovery_state() {
6892 let config = TransportConfig::default();
6893 let remote = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)), 8080);
6894 let now = Instant::now();
6895
6896 let mut path = paths::PathData::new(remote, false, None, now, &config);
6897
6898 let observed = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(1, 2, 3, 4)), 5678);
6900 path.update_observed_address(observed, now);
6901 path.set_observation_rate(15);
6902
6903 path.validated = true;
6905
6906 assert_eq!(path.address_info.observed_address, Some(observed));
6908 assert_eq!(path.observation_rate_limiter.rate, 15.0);
6909 }
6910
6911 #[test]
6912 fn address_discovery_state_initialization() {
6913 let state = AddressDiscoveryState::new_with_params(true, 30.0, true);
6915
6916 assert!(state.enabled);
6917 assert_eq!(state.max_observation_rate, 30);
6918 assert!(state.observe_all_paths);
6919 assert!(state.path_addresses.is_empty());
6920 assert!(state.observed_addresses.is_empty());
6921 }
6922
6923 #[test]
6925 fn handle_observed_address_frame_basic() {
6926 let config = AddressDiscoveryConfig::SendAndReceive;
6927 let mut state = AddressDiscoveryState::new(&config, Instant::now());
6928 let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)), 8080);
6929 let now = Instant::now();
6930 let path_id = 0;
6931
6932 state.handle_observed_address(addr, path_id, now);
6934
6935 assert_eq!(state.observed_addresses.len(), 1);
6937 assert_eq!(state.observed_addresses[0].address, addr);
6938 assert_eq!(state.observed_addresses[0].path_id, path_id);
6939 assert_eq!(state.observed_addresses[0].received_at, now);
6940
6941 assert!(state.path_addresses.contains_key(&path_id));
6943 let path_info = &state.path_addresses[&path_id];
6944 assert_eq!(path_info.observed_address, Some(addr));
6945 assert_eq!(path_info.last_observed, Some(now));
6946 assert_eq!(path_info.observation_count, 1);
6947 }
6948
6949 #[test]
6950 fn handle_observed_address_frame_multiple_observations() {
6951 let config = AddressDiscoveryConfig::SendAndReceive;
6952 let mut state = AddressDiscoveryState::new(&config, Instant::now());
6953 let addr1 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)), 8080);
6954 let addr2 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(10, 0, 0, 1)), 443);
6955 let now = Instant::now();
6956 let path_id = 0;
6957
6958 state.handle_observed_address(addr1, path_id, now);
6960 state.handle_observed_address(addr1, path_id, now + Duration::from_secs(1));
6961 state.handle_observed_address(addr2, path_id, now + Duration::from_secs(2));
6962
6963 assert_eq!(state.observed_addresses.len(), 3);
6965
6966 let path_info = &state.path_addresses[&path_id];
6968 assert_eq!(path_info.observed_address, Some(addr2));
6969 assert_eq!(path_info.observation_count, 1); }
6971
6972 #[test]
6973 fn handle_observed_address_frame_disabled() {
6974 let config = AddressDiscoveryConfig::SendAndReceive;
6975 let mut state = AddressDiscoveryState::new(&config, Instant::now());
6976 state.enabled = false; let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)), 8080);
6978 let now = Instant::now();
6979
6980 state.handle_observed_address(addr, 0, now);
6982
6983 assert!(state.observed_addresses.is_empty());
6985 assert!(state.path_addresses.is_empty());
6986 }
6987
6988 #[test]
6989 fn should_send_observation_basic() {
6990 let config = AddressDiscoveryConfig::SendAndReceive;
6991 let mut state = AddressDiscoveryState::new(&config, Instant::now());
6992 state.max_observation_rate = 10;
6993 let now = Instant::now();
6994 let path_id = 0;
6995
6996 assert!(state.should_send_observation(path_id, now));
6998
6999 state.record_observation_sent(path_id);
7001
7002 assert!(state.should_send_observation(path_id, now));
7004 }
7005
7006 #[test]
7007 fn should_send_observation_rate_limiting() {
7008 let config = AddressDiscoveryConfig::SendAndReceive;
7009 let now = Instant::now();
7010 let mut state = AddressDiscoveryState::new(&config, now);
7011 state.max_observation_rate = 2; state.update_rate_limit(2.0);
7013 let path_id = 0;
7014
7015 assert!(state.should_send_observation(path_id, now));
7017 state.record_observation_sent(path_id);
7018 assert!(state.should_send_observation(path_id, now));
7019 state.record_observation_sent(path_id);
7020
7021 assert!(!state.should_send_observation(path_id, now));
7023
7024 let later = now + Duration::from_secs(1);
7026 assert!(state.should_send_observation(path_id, later));
7027 }
7028
7029 #[test]
7030 fn should_send_observation_disabled() {
7031 let config = AddressDiscoveryConfig::SendAndReceive;
7032 let mut state = AddressDiscoveryState::new(&config, Instant::now());
7033 state.enabled = false;
7034
7035 assert!(!state.should_send_observation(0, Instant::now()));
7037 }
7038
7039 #[test]
7040 fn should_send_observation_per_path() {
7041 let config = AddressDiscoveryConfig::SendAndReceive;
7042 let now = Instant::now();
7043 let mut state = AddressDiscoveryState::new(&config, now);
7044 state.max_observation_rate = 2; state.observe_all_paths = true;
7046 state.update_rate_limit(2.0);
7047
7048 assert!(state.should_send_observation(0, now));
7050 state.record_observation_sent(0);
7051
7052 assert!(state.should_send_observation(1, now));
7054 state.record_observation_sent(1);
7055
7056 assert!(!state.should_send_observation(0, now));
7058 assert!(!state.should_send_observation(1, now));
7059
7060 let later = now + Duration::from_secs(1);
7062 assert!(state.should_send_observation(0, later));
7063 }
7064
7065 #[test]
7066 fn has_unnotified_changes_test() {
7067 let config = AddressDiscoveryConfig::SendAndReceive;
7068 let mut state = AddressDiscoveryState::new(&config, Instant::now());
7069 let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)), 8080);
7070 let now = Instant::now();
7071
7072 assert!(!state.has_unnotified_changes());
7074
7075 state.handle_observed_address(addr, 0, now);
7077 assert!(state.has_unnotified_changes());
7078
7079 state.path_addresses.get_mut(&0).unwrap().notified = true;
7081 assert!(!state.has_unnotified_changes());
7082 }
7083
7084 #[test]
7085 fn get_observed_address_test() {
7086 let config = AddressDiscoveryConfig::SendAndReceive;
7087 let mut state = AddressDiscoveryState::new(&config, Instant::now());
7088 let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)), 8080);
7089 let now = Instant::now();
7090 let path_id = 0;
7091
7092 assert_eq!(state.get_observed_address(path_id), None);
7094
7095 state.handle_observed_address(addr, path_id, now);
7097 assert_eq!(state.get_observed_address(path_id), Some(addr));
7098
7099 assert_eq!(state.get_observed_address(999), None);
7101 }
7102
7103 #[test]
7105 fn rate_limiter_token_bucket_basic() {
7106 let now = Instant::now();
7107 let mut limiter = AddressObservationRateLimiter::new(10, now); assert!(limiter.try_consume(5.0, now));
7111 assert!(limiter.try_consume(5.0, now));
7112
7113 assert!(!limiter.try_consume(1.0, now));
7115 }
7116
7117 #[test]
7118 fn rate_limiter_token_replenishment() {
7119 let now = Instant::now();
7120 let mut limiter = AddressObservationRateLimiter::new(10, now); assert!(limiter.try_consume(10.0, now));
7124 assert!(!limiter.try_consume(0.1, now)); let later = now + Duration::from_secs(1);
7128 assert!(limiter.try_consume(10.0, later)); assert!(!limiter.try_consume(0.1, later)); let later = later + Duration::from_millis(500);
7133 assert!(limiter.try_consume(5.0, later)); assert!(!limiter.try_consume(0.1, later)); }
7136
7137 #[test]
7138 fn rate_limiter_max_tokens_cap() {
7139 let now = Instant::now();
7140 let mut limiter = AddressObservationRateLimiter::new(10, now);
7141
7142 let later = now + Duration::from_secs(2);
7144 assert!(limiter.try_consume(10.0, later));
7146 assert!(!limiter.try_consume(10.1, later)); let later2 = later + Duration::from_secs(1);
7150 assert!(limiter.try_consume(3.0, later2));
7151
7152 let much_later = later2 + Duration::from_secs(2);
7154 assert!(limiter.try_consume(10.0, much_later)); assert!(!limiter.try_consume(0.1, much_later)); }
7157
7158 #[test]
7159 fn rate_limiter_fractional_consumption() {
7160 let now = Instant::now();
7161 let mut limiter = AddressObservationRateLimiter::new(10, now);
7162
7163 assert!(limiter.try_consume(0.5, now));
7165 assert!(limiter.try_consume(2.3, now));
7166 assert!(limiter.try_consume(7.2, now)); assert!(!limiter.try_consume(0.1, now)); let later = now + Duration::from_millis(100); assert!(limiter.try_consume(1.0, later));
7172 assert!(!limiter.try_consume(0.1, later));
7173 }
7174
7175 #[test]
7176 fn rate_limiter_zero_rate() {
7177 let now = Instant::now();
7178 let mut limiter = AddressObservationRateLimiter::new(0, now); assert!(!limiter.try_consume(1.0, now));
7182 assert!(!limiter.try_consume(0.1, now));
7183 assert!(!limiter.try_consume(0.001, now));
7184
7185 let later = now + Duration::from_secs(10);
7187 assert!(!limiter.try_consume(0.001, later));
7188 }
7189
7190 #[test]
7191 fn rate_limiter_high_rate() {
7192 let now = Instant::now();
7193 let mut limiter = AddressObservationRateLimiter::new(63, now); assert!(limiter.try_consume(60.0, now));
7197 assert!(limiter.try_consume(3.0, now));
7198 assert!(!limiter.try_consume(0.1, now)); let later = now + Duration::from_secs(1);
7202 assert!(limiter.try_consume(63.0, later)); assert!(!limiter.try_consume(0.1, later)); }
7205
7206 #[test]
7207 fn rate_limiter_time_precision() {
7208 let now = Instant::now();
7209 let mut limiter = AddressObservationRateLimiter::new(100, now); assert!(limiter.try_consume(100.0, now));
7213 assert!(!limiter.try_consume(0.1, now));
7214
7215 let later = now + Duration::from_millis(10);
7217 assert!(limiter.try_consume(0.8, later)); assert!(!limiter.try_consume(0.5, later)); let much_later = later + Duration::from_millis(100); assert!(limiter.try_consume(5.0, much_later)); limiter.tokens = 0.0; let final_time = much_later + Duration::from_millis(1);
7229 limiter.update_tokens(final_time); assert!(limiter.tokens >= 0.09 && limiter.tokens <= 0.11);
7234 }
7235
7236 #[test]
7237 fn per_path_rate_limiting_independent() {
7238 let config = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
7239 let now = Instant::now();
7240 let mut state = AddressDiscoveryState::new(&config, now);
7241
7242 state.observe_all_paths = true;
7244
7245 state.update_rate_limit(5.0);
7247
7248 state
7250 .path_addresses
7251 .insert(0, paths::PathAddressInfo::new());
7252 state
7253 .path_addresses
7254 .insert(1, paths::PathAddressInfo::new());
7255 state
7256 .path_addresses
7257 .insert(2, paths::PathAddressInfo::new());
7258
7259 state.path_addresses.get_mut(&0).unwrap().observed_address = Some(SocketAddr::new(
7261 IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)),
7262 8080,
7263 ));
7264 state.path_addresses.get_mut(&1).unwrap().observed_address = Some(SocketAddr::new(
7265 IpAddr::V4(Ipv4Addr::new(192, 168, 1, 2)),
7266 8081,
7267 ));
7268 state.path_addresses.get_mut(&2).unwrap().observed_address = Some(SocketAddr::new(
7269 IpAddr::V4(Ipv4Addr::new(192, 168, 1, 3)),
7270 8082,
7271 ));
7272
7273 for _ in 0..3 {
7275 assert!(state.should_send_observation(0, now));
7276 state.record_observation_sent(0);
7277 state.path_addresses.get_mut(&0).unwrap().notified = false;
7279 }
7280
7281 for _ in 0..2 {
7283 assert!(state.should_send_observation(1, now));
7284 state.record_observation_sent(1);
7285 state.path_addresses.get_mut(&1).unwrap().notified = false;
7287 }
7288
7289 assert!(!state.should_send_observation(2, now));
7291
7292 let later = now + Duration::from_secs(1);
7294
7295 assert!(state.should_send_observation(0, later));
7297 assert!(state.should_send_observation(1, later));
7298 assert!(state.should_send_observation(2, later));
7299 }
7300
7301 #[test]
7302 fn per_path_rate_limiting_with_path_specific_limits() {
7303 let now = Instant::now();
7304 let remote1 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)), 8080);
7305 let remote2 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 2)), 8081);
7306 let config = TransportConfig::default();
7307
7308 let mut path1 = paths::PathData::new(remote1, false, None, now, &config);
7310 let mut path2 = paths::PathData::new(remote2, false, None, now, &config);
7311
7312 path1.observation_rate_limiter = paths::PathObservationRateLimiter::new(10, now); path2.observation_rate_limiter = paths::PathObservationRateLimiter::new(5, now); for _ in 0..10 {
7318 assert!(path1.observation_rate_limiter.can_send(now));
7319 path1.observation_rate_limiter.consume_token(now);
7320 }
7321 assert!(!path1.observation_rate_limiter.can_send(now));
7322
7323 for _ in 0..5 {
7325 assert!(path2.observation_rate_limiter.can_send(now));
7326 path2.observation_rate_limiter.consume_token(now);
7327 }
7328 assert!(!path2.observation_rate_limiter.can_send(now));
7329 }
7330
7331 #[test]
7332 fn per_path_rate_limiting_address_change_detection() {
7333 let config = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
7334 let now = Instant::now();
7335 let mut state = AddressDiscoveryState::new(&config, now);
7336
7337 let path_id = 0;
7339 let addr1 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)), 8080);
7340 let addr2 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 2)), 8080);
7341
7342 assert!(state.should_send_observation(path_id, now));
7344 state.handle_observed_address(addr1, path_id, now);
7345 state.record_observation_sent(path_id);
7346
7347 assert!(!state.should_send_observation(path_id, now));
7349
7350 state.handle_observed_address(addr2, path_id, now);
7352 if let Some(info) = state.path_addresses.get_mut(&path_id) {
7353 info.notified = false; }
7355
7356 assert!(state.should_send_observation(path_id, now));
7358 }
7359
7360 #[test]
7361 fn per_path_rate_limiting_migration() {
7362 let now = Instant::now();
7363 let remote1 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)), 8080);
7364 let remote2 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 2)), 8081);
7365 let config = TransportConfig::default();
7366
7367 let mut path = paths::PathData::new(remote1, false, None, now, &config);
7369 path.observation_rate_limiter = paths::PathObservationRateLimiter::new(10, now);
7370
7371 for _ in 0..5 {
7373 assert!(path.observation_rate_limiter.can_send(now));
7374 path.observation_rate_limiter.consume_token(now);
7375 }
7376
7377 let mut new_path = paths::PathData::new(remote2, false, None, now, &config);
7379
7380 new_path.observation_rate_limiter = paths::PathObservationRateLimiter::new(10, now);
7383
7384 for _ in 0..10 {
7386 assert!(new_path.observation_rate_limiter.can_send(now));
7387 new_path.observation_rate_limiter.consume_token(now);
7388 }
7389 assert!(!new_path.observation_rate_limiter.can_send(now));
7390 }
7391
7392 #[test]
7393 fn per_path_rate_limiting_disabled_paths() {
7394 let config = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
7395 let now = Instant::now();
7396 let mut state = AddressDiscoveryState::new(&config, now);
7397
7398 assert!(state.should_send_observation(0, now));
7400
7401 assert!(!state.should_send_observation(1, now));
7403 assert!(!state.should_send_observation(2, now));
7404
7405 let later = now + Duration::from_secs(1);
7407 assert!(!state.should_send_observation(1, later));
7408 }
7409
7410 #[test]
7411 fn respecting_negotiated_max_observation_rate_basic() {
7412 let config = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
7413 let now = Instant::now();
7414 let mut state = AddressDiscoveryState::new(&config, now);
7415
7416 state.max_observation_rate = 10; state.rate_limiter = AddressObservationRateLimiter::new(10, now);
7419
7420 for _ in 0..10 {
7422 assert!(state.should_send_observation(0, now));
7423 }
7424 assert!(!state.should_send_observation(0, now));
7426 }
7427
7428 #[test]
7429 fn respecting_negotiated_max_observation_rate_zero() {
7430 let config = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
7431 let now = Instant::now();
7432 let mut state = AddressDiscoveryState::new(&config, now);
7433
7434 state.max_observation_rate = 0;
7436 state.rate_limiter = AddressObservationRateLimiter::new(0, now);
7437
7438 assert!(!state.should_send_observation(0, now));
7440 assert!(!state.should_send_observation(1, now));
7441
7442 let later = now + Duration::from_secs(10);
7444 assert!(!state.should_send_observation(0, later));
7445 }
7446
7447 #[test]
7448 fn respecting_negotiated_max_observation_rate_higher() {
7449 let config = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
7450 let now = Instant::now();
7451 let mut state = AddressDiscoveryState::new(&config, now);
7452
7453 state
7455 .path_addresses
7456 .insert(0, paths::PathAddressInfo::new());
7457 state.path_addresses.get_mut(&0).unwrap().observed_address = Some(SocketAddr::new(
7458 IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)),
7459 8080,
7460 ));
7461
7462 state.update_rate_limit(5.0);
7464
7465 state.max_observation_rate = 20; for _ in 0..5 {
7470 assert!(state.should_send_observation(0, now));
7471 state.record_observation_sent(0);
7472 state.path_addresses.get_mut(&0).unwrap().notified = false;
7474 }
7475 assert!(!state.should_send_observation(0, now));
7477 }
7478
7479 #[test]
7480 fn respecting_negotiated_max_observation_rate_dynamic_update() {
7481 let config = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
7482 let now = Instant::now();
7483 let mut state = AddressDiscoveryState::new(&config, now);
7484
7485 state
7487 .path_addresses
7488 .insert(0, paths::PathAddressInfo::new());
7489 state.path_addresses.get_mut(&0).unwrap().observed_address = Some(SocketAddr::new(
7490 IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)),
7491 8080,
7492 ));
7493
7494 for _ in 0..5 {
7496 assert!(state.should_send_observation(0, now));
7497 state.record_observation_sent(0);
7498 state.path_addresses.get_mut(&0).unwrap().notified = false;
7500 }
7501
7502 state.max_observation_rate = 3;
7506 state.rate_limiter.set_rate(3);
7507
7508 for _ in 0..3 {
7511 assert!(state.should_send_observation(0, now));
7512 state.record_observation_sent(0);
7513 state.path_addresses.get_mut(&0).unwrap().notified = false;
7515 }
7516
7517 assert!(!state.should_send_observation(0, now));
7519
7520 let later = now + Duration::from_secs(1);
7522 for _ in 0..3 {
7523 assert!(state.should_send_observation(0, later));
7524 state.record_observation_sent(0);
7525 state.path_addresses.get_mut(&0).unwrap().notified = false;
7527 }
7528
7529 assert!(!state.should_send_observation(0, later));
7531 }
7532
7533 #[test]
7534 fn respecting_negotiated_max_observation_rate_with_paths() {
7535 let config = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
7536 let now = Instant::now();
7537 let mut state = AddressDiscoveryState::new(&config, now);
7538
7539 state.observe_all_paths = true;
7541
7542 for i in 0..3 {
7544 state
7545 .path_addresses
7546 .insert(i, paths::PathAddressInfo::new());
7547 state.path_addresses.get_mut(&i).unwrap().observed_address = Some(SocketAddr::new(
7548 IpAddr::V4(Ipv4Addr::new(192, 168, 1, 100 + i as u8)),
7549 5000,
7550 ));
7551 }
7552
7553 for _ in 0..3 {
7556 for i in 0..3 {
7558 if state.should_send_observation(i, now) {
7559 state.record_observation_sent(i);
7560 state.path_addresses.get_mut(&i).unwrap().notified = false;
7562 }
7563 }
7564 }
7565
7566 assert!(state.should_send_observation(0, now));
7569 state.record_observation_sent(0);
7570
7571 assert!(!state.should_send_observation(0, now));
7573 assert!(!state.should_send_observation(1, now));
7574 assert!(!state.should_send_observation(2, now));
7575 }
7576
7577 #[test]
7578 fn queue_observed_address_frame_basic() {
7579 let config = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
7580 let now = Instant::now();
7581 let mut state = AddressDiscoveryState::new(&config, now);
7582
7583 let address = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 100)), 5000);
7585 let frame = state.queue_observed_address_frame(0, address);
7586
7587 assert!(frame.is_some());
7589 let frame = frame.unwrap();
7590 assert_eq!(frame.address, address);
7591
7592 assert!(state.path_addresses.contains_key(&0));
7594 assert!(state.path_addresses.get(&0).unwrap().notified);
7595 }
7596
7597 #[test]
7598 fn queue_observed_address_frame_rate_limited() {
7599 let config = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
7600 let now = Instant::now();
7601 let mut state = AddressDiscoveryState::new(&config, now);
7602
7603 state.observe_all_paths = true;
7605
7606 let mut addresses = Vec::new();
7608 for i in 0..10 {
7609 let addr = SocketAddr::new(
7610 IpAddr::V4(Ipv4Addr::new(192, 168, 1, i as u8)),
7611 5000 + i as u16,
7612 );
7613 addresses.push(addr);
7614 assert!(
7615 state.queue_observed_address_frame(i as u64, addr).is_some(),
7616 "Frame {} should be allowed",
7617 i + 1
7618 );
7619 }
7620
7621 let addr11 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 11)), 5011);
7623 assert!(
7624 state.queue_observed_address_frame(10, addr11).is_none(),
7625 "11th frame should be rate limited"
7626 );
7627 }
7628
7629 #[test]
7630 fn queue_observed_address_frame_disabled() {
7631 let config = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
7632 let now = Instant::now();
7633 let mut state = AddressDiscoveryState::new(&config, now);
7634
7635 state.enabled = false;
7637
7638 let address = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 100)), 5000);
7639
7640 assert!(state.queue_observed_address_frame(0, address).is_none());
7642 }
7643
7644 #[test]
7645 fn queue_observed_address_frame_already_notified() {
7646 let config = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
7647 let now = Instant::now();
7648 let mut state = AddressDiscoveryState::new(&config, now);
7649
7650 let address = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 100)), 5000);
7651
7652 assert!(state.queue_observed_address_frame(0, address).is_some());
7654
7655 assert!(state.queue_observed_address_frame(0, address).is_none());
7657
7658 let new_address = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 101)), 5001);
7660 assert!(state.queue_observed_address_frame(0, new_address).is_none());
7661 }
7662
7663 #[test]
7664 fn queue_observed_address_frame_primary_path_only() {
7665 let config = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
7666 let now = Instant::now();
7667 let mut state = AddressDiscoveryState::new(&config, now);
7668
7669 let address = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 100)), 5000);
7670
7671 assert!(state.queue_observed_address_frame(0, address).is_some());
7673
7674 assert!(state.queue_observed_address_frame(1, address).is_none());
7676 assert!(state.queue_observed_address_frame(2, address).is_none());
7677 }
7678
7679 #[test]
7680 fn queue_observed_address_frame_updates_path_info() {
7681 let config = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
7682 let now = Instant::now();
7683 let mut state = AddressDiscoveryState::new(&config, now);
7684
7685 let address = SocketAddr::new(
7686 IpAddr::V6(Ipv6Addr::new(0x2001, 0xdb8, 0, 0, 0, 0, 0, 1)),
7687 5000,
7688 );
7689
7690 let frame = state.queue_observed_address_frame(0, address);
7692 assert!(frame.is_some());
7693
7694 let path_info = state.path_addresses.get(&0).unwrap();
7696 assert_eq!(path_info.observed_address, Some(address));
7697 assert!(path_info.notified);
7698
7699 assert_eq!(state.observed_addresses.len(), 0);
7702 }
7703
7704 #[test]
7705 fn retransmits_includes_observed_addresses() {
7706 use crate::connection::spaces::Retransmits;
7707
7708 let mut retransmits = Retransmits::default();
7710
7711 assert!(retransmits.observed_addresses.is_empty());
7713
7714 let address = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 100)), 5000);
7716 let frame = frame::ObservedAddress {
7717 sequence_number: VarInt::from_u32(1),
7718 address,
7719 };
7720 retransmits.observed_addresses.push(frame);
7721
7722 assert_eq!(retransmits.observed_addresses.len(), 1);
7724 assert_eq!(retransmits.observed_addresses[0].address, address);
7725 }
7726
7727 #[test]
7728 fn check_for_address_observations_no_peer_support() {
7729 let config = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
7730 let now = Instant::now();
7731 let mut state = AddressDiscoveryState::new(&config, now);
7732
7733 state
7735 .path_addresses
7736 .insert(0, paths::PathAddressInfo::new());
7737 state.path_addresses.get_mut(&0).unwrap().observed_address = Some(SocketAddr::new(
7738 IpAddr::V4(Ipv4Addr::new(192, 168, 1, 100)),
7739 5000,
7740 ));
7741
7742 let frames = state.check_for_address_observations(0, false, now);
7744
7745 assert!(frames.is_empty());
7747 }
7748
7749 #[test]
7750 fn check_for_address_observations_with_peer_support() {
7751 let config = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
7752 let now = Instant::now();
7753 let mut state = AddressDiscoveryState::new(&config, now);
7754
7755 let address = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 100)), 5000);
7757 state
7758 .path_addresses
7759 .insert(0, paths::PathAddressInfo::new());
7760 state.path_addresses.get_mut(&0).unwrap().observed_address = Some(address);
7761
7762 let frames = state.check_for_address_observations(0, true, now);
7764
7765 assert_eq!(frames.len(), 1);
7767 assert_eq!(frames[0].address, address);
7768
7769 assert!(state.path_addresses.get(&0).unwrap().notified);
7771 }
7772
7773 #[test]
7774 fn check_for_address_observations_rate_limited() {
7775 let config = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
7776 let now = Instant::now();
7777 let mut state = AddressDiscoveryState::new(&config, now);
7778
7779 let address = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 100)), 5000);
7781 state
7782 .path_addresses
7783 .insert(0, paths::PathAddressInfo::new());
7784 state.path_addresses.get_mut(&0).unwrap().observed_address = Some(address);
7785
7786 for _ in 0..10 {
7788 let frames = state.check_for_address_observations(0, true, now);
7789 if frames.is_empty() {
7790 break;
7791 }
7792 state.path_addresses.get_mut(&0).unwrap().notified = false;
7794 }
7795
7796 assert_eq!(state.rate_limiter.tokens, 0.0);
7798
7799 state.path_addresses.get_mut(&0).unwrap().notified = false;
7801
7802 let frames2 = state.check_for_address_observations(0, true, now);
7804 assert_eq!(frames2.len(), 0);
7805
7806 state.path_addresses.get_mut(&0).unwrap().notified = false;
7808
7809 let later = now + Duration::from_millis(200); let frames3 = state.check_for_address_observations(0, true, later);
7812 assert_eq!(frames3.len(), 1);
7813 }
7814
7815 #[test]
7816 fn check_for_address_observations_multiple_paths() {
7817 let config = crate::transport_parameters::AddressDiscoveryConfig::SendAndReceive;
7818 let now = Instant::now();
7819 let mut state = AddressDiscoveryState::new(&config, now);
7820
7821 state.observe_all_paths = true;
7823
7824 let addr1 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 100)), 5000);
7826 let addr2 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 101)), 5001);
7827
7828 state
7829 .path_addresses
7830 .insert(0, paths::PathAddressInfo::new());
7831 state.path_addresses.get_mut(&0).unwrap().observed_address = Some(addr1);
7832
7833 state
7834 .path_addresses
7835 .insert(1, paths::PathAddressInfo::new());
7836 state.path_addresses.get_mut(&1).unwrap().observed_address = Some(addr2);
7837
7838 let frames = state.check_for_address_observations(0, true, now);
7840
7841 assert_eq!(frames.len(), 2);
7843
7844 let addresses: Vec<_> = frames.iter().map(|f| f.address).collect();
7846 assert!(addresses.contains(&addr1));
7847 assert!(addresses.contains(&addr2));
7848
7849 assert!(state.path_addresses.get(&0).unwrap().notified);
7851 assert!(state.path_addresses.get(&1).unwrap().notified);
7852 }
7853
7854 #[test]
7856 fn test_rate_limiter_configuration() {
7857 let state = AddressDiscoveryState::new_with_params(true, 10.0, false);
7859 assert_eq!(state.rate_limiter.rate, 10.0);
7860 assert_eq!(state.rate_limiter.max_tokens, 10.0);
7861 assert_eq!(state.rate_limiter.tokens, 10.0);
7862
7863 let state = AddressDiscoveryState::new_with_params(true, 63.0, false);
7864 assert_eq!(state.rate_limiter.rate, 63.0);
7865 assert_eq!(state.rate_limiter.max_tokens, 63.0);
7866 }
7867
7868 #[test]
7869 fn test_rate_limiter_update_configuration() {
7870 let mut state = AddressDiscoveryState::new_with_params(true, 5.0, false);
7871
7872 assert_eq!(state.rate_limiter.rate, 5.0);
7874
7875 state.update_rate_limit(10.0);
7877 assert_eq!(state.rate_limiter.rate, 10.0);
7878 assert_eq!(state.rate_limiter.max_tokens, 10.0);
7879
7880 state.rate_limiter.tokens = 15.0;
7882 state.update_rate_limit(8.0);
7883 assert_eq!(state.rate_limiter.tokens, 8.0);
7884 }
7885
7886 #[test]
7887 fn test_rate_limiter_from_transport_params() {
7888 let mut params = TransportParameters::default();
7889 params.address_discovery = Some(AddressDiscoveryConfig::SendAndReceive);
7890
7891 let state = AddressDiscoveryState::from_transport_params(¶ms);
7892 assert!(state.is_some());
7893 let state = state.unwrap();
7894 assert_eq!(state.rate_limiter.rate, 10.0); assert!(!state.observe_all_paths); }
7897
7898 #[test]
7899 fn test_rate_limiter_zero_rate() {
7900 let state = AddressDiscoveryState::new_with_params(true, 0.0, false);
7901 assert_eq!(state.rate_limiter.rate, 0.0);
7902 assert_eq!(state.rate_limiter.tokens, 0.0);
7903
7904 let address = "192.168.1.1:443".parse().unwrap();
7906 let mut state = AddressDiscoveryState::new_with_params(true, 0.0, false);
7907 let frame = state.queue_observed_address_frame(0, address);
7908 assert!(frame.is_none());
7909 }
7910
7911 #[test]
7912 fn test_rate_limiter_configuration_edge_cases() {
7913 let state = AddressDiscoveryState::new_with_params(true, 63.0, false);
7915 assert_eq!(state.rate_limiter.rate, 63.0);
7916
7917 let state = AddressDiscoveryState::new_with_params(true, 100.0, false);
7919 assert_eq!(state.rate_limiter.rate, 100.0);
7921
7922 let state = AddressDiscoveryState::new_with_params(true, 2.5, false);
7924 assert_eq!(state.rate_limiter.rate, 2.0);
7926 }
7927
7928 #[test]
7929 fn test_rate_limiter_runtime_update() {
7930 let mut state = AddressDiscoveryState::new_with_params(true, 10.0, false);
7931 let now = Instant::now();
7932
7933 state.rate_limiter.tokens = 5.0;
7935
7936 state.update_rate_limit(3.0);
7938
7939 assert_eq!(state.rate_limiter.tokens, 3.0);
7941 assert_eq!(state.rate_limiter.rate, 3.0);
7942 assert_eq!(state.rate_limiter.max_tokens, 3.0);
7943
7944 let later = now + Duration::from_secs(1);
7946 state.rate_limiter.update_tokens(later);
7947
7948 assert_eq!(state.rate_limiter.tokens, 3.0);
7950 }
7951
7952 #[test]
7954 fn test_address_discovery_state_initialization_default() {
7955 let now = Instant::now();
7957 let default_config = crate::transport_parameters::AddressDiscoveryConfig::default();
7958
7959 let address_discovery_state = Some(AddressDiscoveryState::new(&default_config, now));
7962
7963 assert!(address_discovery_state.is_some());
7964 let state = address_discovery_state.unwrap();
7965
7966 assert!(state.enabled); assert_eq!(state.max_observation_rate, 10); assert!(!state.observe_all_paths);
7970 }
7971
7972 #[test]
7973 fn test_address_discovery_state_initialization_on_handshake() {
7974 let now = Instant::now();
7976
7977 let mut address_discovery_state = Some(AddressDiscoveryState::new(
7979 &crate::transport_parameters::AddressDiscoveryConfig::default(),
7980 now,
7981 ));
7982
7983 let peer_params = TransportParameters {
7985 address_discovery: Some(AddressDiscoveryConfig::SendAndReceive),
7986 ..TransportParameters::default()
7987 };
7988
7989 if let Some(peer_config) = &peer_params.address_discovery {
7991 address_discovery_state = Some(AddressDiscoveryState::new(peer_config, now));
7993 }
7994
7995 assert!(address_discovery_state.is_some());
7997 let state = address_discovery_state.unwrap();
7998 assert!(state.enabled);
7999 assert_eq!(state.max_observation_rate, 10); assert!(!state.observe_all_paths); }
8003
8004 #[test]
8005 fn test_address_discovery_negotiation_disabled_peer() {
8006 let now = Instant::now();
8008
8009 let our_config = AddressDiscoveryConfig::SendAndReceive;
8011 let mut address_discovery_state = Some(AddressDiscoveryState::new(&our_config, now));
8012
8013 let peer_params = TransportParameters {
8015 address_discovery: None,
8016 ..TransportParameters::default()
8017 };
8018
8019 if peer_params.address_discovery.is_none() {
8021 if let Some(state) = &mut address_discovery_state {
8022 state.enabled = false;
8023 }
8024 }
8025
8026 let state = address_discovery_state.unwrap();
8028 assert!(!state.enabled); }
8030
8031 #[test]
8032 fn test_address_discovery_negotiation_rate_limiting() {
8033 let now = Instant::now();
8035
8036 let our_config = AddressDiscoveryConfig::SendAndReceive;
8038 let mut address_discovery_state = Some(AddressDiscoveryState::new(&our_config, now));
8039
8040 if let Some(state) = &mut address_discovery_state {
8042 state.max_observation_rate = 30;
8043 state.update_rate_limit(30.0);
8044 }
8045
8046 let peer_params = TransportParameters {
8048 address_discovery: Some(AddressDiscoveryConfig::SendAndReceive),
8049 ..TransportParameters::default()
8050 };
8051
8052 if let (Some(state), Some(_peer_config)) =
8055 (&mut address_discovery_state, &peer_params.address_discovery)
8056 {
8057 let peer_rate = 15u8;
8060 let negotiated_rate = state.max_observation_rate.min(peer_rate);
8061 state.update_rate_limit(negotiated_rate as f64);
8062 }
8063
8064 let state = address_discovery_state.unwrap();
8066 assert_eq!(state.rate_limiter.rate, 15.0); }
8068
8069 #[test]
8070 fn test_address_discovery_path_initialization() {
8071 let now = Instant::now();
8073 let config = AddressDiscoveryConfig::SendAndReceive;
8074 let mut state = AddressDiscoveryState::new(&config, now);
8075
8076 assert!(state.path_addresses.is_empty());
8078
8079 let should_send = state.should_send_observation(0, now);
8081 assert!(should_send); }
8086
8087 #[test]
8088 fn test_address_discovery_multiple_path_initialization() {
8089 let now = Instant::now();
8091 let config = AddressDiscoveryConfig::SendAndReceive;
8092 let mut state = AddressDiscoveryState::new(&config, now);
8093
8094 assert!(state.should_send_observation(0, now)); assert!(!state.should_send_observation(1, now)); assert!(!state.should_send_observation(2, now)); state.observe_all_paths = true;
8101 assert!(state.should_send_observation(1, now)); assert!(state.should_send_observation(2, now)); let config_primary_only = AddressDiscoveryConfig::SendAndReceive;
8106 let mut state_primary = AddressDiscoveryState::new(&config_primary_only, now);
8107
8108 assert!(state_primary.should_send_observation(0, now)); assert!(!state_primary.should_send_observation(1, now)); }
8111
8112 #[test]
8113 fn test_handle_observed_address_frame_valid() {
8114 let now = Instant::now();
8116 let config = AddressDiscoveryConfig::SendAndReceive;
8117 let mut state = AddressDiscoveryState::new(&config, now);
8118
8119 let observed_addr = SocketAddr::from(([192, 168, 1, 100], 5000));
8121 state.handle_observed_address(observed_addr, 0, now);
8122
8123 assert_eq!(state.observed_addresses.len(), 1);
8125 assert_eq!(state.observed_addresses[0].address, observed_addr);
8126 assert_eq!(state.observed_addresses[0].path_id, 0);
8127 assert_eq!(state.observed_addresses[0].received_at, now);
8128
8129 let path_info = state.path_addresses.get(&0).unwrap();
8131 assert_eq!(path_info.observed_address, Some(observed_addr));
8132 assert_eq!(path_info.last_observed, Some(now));
8133 assert_eq!(path_info.observation_count, 1);
8134 }
8135
8136 #[test]
8137 fn test_handle_multiple_observed_addresses() {
8138 let now = Instant::now();
8140 let config = AddressDiscoveryConfig::SendAndReceive;
8141 let mut state = AddressDiscoveryState::new(&config, now);
8142
8143 let addr1 = SocketAddr::from(([192, 168, 1, 100], 5000));
8145 let addr2 = SocketAddr::from(([10, 0, 0, 50], 6000));
8146 let addr3 = SocketAddr::from(([192, 168, 1, 100], 7000)); state.handle_observed_address(addr1, 0, now);
8149 state.handle_observed_address(addr2, 1, now);
8150 state.handle_observed_address(addr3, 0, now + Duration::from_millis(100));
8151
8152 assert_eq!(state.observed_addresses.len(), 3);
8154
8155 let path0_info = state.path_addresses.get(&0).unwrap();
8157 assert_eq!(path0_info.observed_address, Some(addr3));
8158 assert_eq!(path0_info.observation_count, 1); let path1_info = state.path_addresses.get(&1).unwrap();
8162 assert_eq!(path1_info.observed_address, Some(addr2));
8163 assert_eq!(path1_info.observation_count, 1);
8164 }
8165
8166 #[test]
8167 fn test_get_observed_address() {
8168 let now = Instant::now();
8170 let config = AddressDiscoveryConfig::SendAndReceive;
8171 let mut state = AddressDiscoveryState::new(&config, now);
8172
8173 assert_eq!(state.get_observed_address(0), None);
8175
8176 let addr = SocketAddr::from(([192, 168, 1, 100], 5000));
8178 state.handle_observed_address(addr, 0, now);
8179
8180 assert_eq!(state.get_observed_address(0), Some(addr));
8182
8183 assert_eq!(state.get_observed_address(999), None);
8185 }
8186
8187 #[test]
8188 fn test_has_unnotified_changes() {
8189 let now = Instant::now();
8191 let config = AddressDiscoveryConfig::SendAndReceive;
8192 let mut state = AddressDiscoveryState::new(&config, now);
8193
8194 assert!(!state.has_unnotified_changes());
8196
8197 let addr = SocketAddr::from(([192, 168, 1, 100], 5000));
8199 state.handle_observed_address(addr, 0, now);
8200 assert!(state.has_unnotified_changes());
8201
8202 if let Some(path_info) = state.path_addresses.get_mut(&0) {
8204 path_info.notified = true;
8205 }
8206 assert!(!state.has_unnotified_changes());
8207
8208 let addr2 = SocketAddr::from(([192, 168, 1, 100], 6000));
8210 state.handle_observed_address(addr2, 0, now + Duration::from_secs(1));
8211 assert!(state.has_unnotified_changes());
8212 }
8213
8214 #[test]
8215 fn test_address_discovery_disabled() {
8216 let now = Instant::now();
8218 let config = AddressDiscoveryConfig::SendAndReceive;
8219 let mut state = AddressDiscoveryState::new(&config, now);
8220
8221 state.enabled = false;
8223
8224 let addr = SocketAddr::from(([192, 168, 1, 100], 5000));
8226 state.handle_observed_address(addr, 0, now);
8227
8228 assert_eq!(state.observed_addresses.len(), 0);
8230
8231 assert!(!state.should_send_observation(0, now));
8233 }
8234
8235 #[test]
8236 fn test_rate_limiting_basic() {
8237 let now = Instant::now();
8239 let config = AddressDiscoveryConfig::SendAndReceive;
8240 let mut state = AddressDiscoveryState::new(&config, now);
8241
8242 state.observe_all_paths = true;
8244 state.rate_limiter.set_rate(2); assert!(state.should_send_observation(0, now));
8248 state.record_observation_sent(0);
8250
8251 assert!(state.should_send_observation(1, now));
8253 state.record_observation_sent(1);
8254
8255 assert!(!state.should_send_observation(2, now));
8257
8258 let later = now + Duration::from_millis(500);
8260 assert!(state.should_send_observation(3, later));
8261 state.record_observation_sent(3);
8262
8263 assert!(!state.should_send_observation(4, later));
8265
8266 let _one_sec_later = now + Duration::from_secs(1);
8270 let two_sec_later = now + Duration::from_secs(2);
8274 assert!(state.should_send_observation(5, two_sec_later));
8275 state.record_observation_sent(5);
8276
8277 assert!(state.should_send_observation(6, two_sec_later));
8288 state.record_observation_sent(6);
8289
8290 assert!(
8292 !state.should_send_observation(7, two_sec_later),
8293 "Expected no tokens available"
8294 );
8295 }
8296
8297 #[test]
8298 fn test_rate_limiting_per_path() {
8299 let now = Instant::now();
8301 let config = AddressDiscoveryConfig::SendAndReceive;
8302 let mut state = AddressDiscoveryState::new(&config, now);
8303
8304 state
8306 .path_addresses
8307 .insert(0, paths::PathAddressInfo::new());
8308 state.path_addresses.get_mut(&0).unwrap().observed_address = Some(SocketAddr::new(
8309 IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)),
8310 8080,
8311 ));
8312
8313 for _ in 0..10 {
8315 assert!(state.should_send_observation(0, now));
8316 state.record_observation_sent(0);
8317 state.path_addresses.get_mut(&0).unwrap().notified = false;
8319 }
8320
8321 assert!(!state.should_send_observation(0, now));
8323
8324 let later = now + Duration::from_millis(100);
8326 assert!(state.should_send_observation(0, later));
8327 state.record_observation_sent(0);
8328
8329 state.path_addresses.get_mut(&0).unwrap().notified = false;
8331
8332 assert!(!state.should_send_observation(0, later));
8334 }
8335
8336 #[test]
8337 fn test_rate_limiting_zero_rate() {
8338 let now = Instant::now();
8340 let config = AddressDiscoveryConfig::SendAndReceive;
8341 let mut state = AddressDiscoveryState::new(&config, now);
8342
8343 state.rate_limiter.set_rate(0);
8345 state.rate_limiter.tokens = 0.0;
8346 state.rate_limiter.max_tokens = 0.0;
8347
8348 assert!(!state.should_send_observation(0, now));
8350 assert!(!state.should_send_observation(0, now + Duration::from_secs(10)));
8351 assert!(!state.should_send_observation(0, now + Duration::from_secs(100)));
8352 }
8353
8354 #[test]
8355 fn test_rate_limiting_update() {
8356 let now = Instant::now();
8358 let config = AddressDiscoveryConfig::SendAndReceive;
8359 let mut state = AddressDiscoveryState::new(&config, now);
8360
8361 state.observe_all_paths = true;
8363
8364 for i in 0..12 {
8366 state
8367 .path_addresses
8368 .insert(i, paths::PathAddressInfo::new());
8369 state.path_addresses.get_mut(&i).unwrap().observed_address = Some(SocketAddr::new(
8370 IpAddr::V4(Ipv4Addr::new(192, 168, 1, (i + 1) as u8)),
8371 8080,
8372 ));
8373 }
8374
8375 for i in 0..10 {
8378 assert!(state.should_send_observation(i, now));
8379 state.record_observation_sent(i);
8380 }
8381 assert!(!state.should_send_observation(10, now));
8383
8384 state.update_rate_limit(20.0);
8386
8387 let later = now + Duration::from_millis(50);
8390 assert!(state.should_send_observation(10, later));
8391 state.record_observation_sent(10);
8392
8393 let later2 = now + Duration::from_millis(100);
8395 assert!(state.should_send_observation(11, later2));
8396 }
8397
8398 #[test]
8399 fn test_rate_limiting_burst() {
8400 let now = Instant::now();
8402 let config = AddressDiscoveryConfig::SendAndReceive;
8403 let mut state = AddressDiscoveryState::new(&config, now);
8404
8405 for _ in 0..10 {
8407 assert!(state.should_send_observation(0, now));
8408 state.record_observation_sent(0);
8409 }
8410
8411 assert!(!state.should_send_observation(0, now));
8413
8414 let later = now + Duration::from_millis(100);
8416 assert!(state.should_send_observation(0, later));
8417 state.record_observation_sent(0);
8418 assert!(!state.should_send_observation(0, later));
8419 }
8420
8421 #[test]
8422 fn test_connection_rate_limiting_with_check_observations() {
8423 let now = Instant::now();
8425 let config = AddressDiscoveryConfig::SendAndReceive;
8426 let mut state = AddressDiscoveryState::new(&config, now);
8427
8428 let mut path_info = paths::PathAddressInfo::new();
8430 path_info.update_observed_address(
8431 SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)), 8080),
8432 now,
8433 );
8434 state.path_addresses.insert(0, path_info);
8435
8436 let frame1 =
8438 state.queue_observed_address_frame(0, SocketAddr::from(([192, 168, 1, 1], 8080)));
8439 assert!(frame1.is_some());
8440 state.record_observation_sent(0);
8441
8442 if let Some(info) = state.path_addresses.get_mut(&0) {
8444 info.notified = false;
8445 }
8446
8447 for _ in 1..10 {
8449 if let Some(info) = state.path_addresses.get_mut(&0) {
8451 info.notified = false;
8452 }
8453 let frame =
8454 state.queue_observed_address_frame(0, SocketAddr::from(([192, 168, 1, 1], 8080)));
8455 assert!(frame.is_some());
8456 state.record_observation_sent(0);
8457 }
8458
8459 if let Some(info) = state.path_addresses.get_mut(&0) {
8461 info.notified = false;
8462 }
8463 let frame3 =
8464 state.queue_observed_address_frame(0, SocketAddr::from(([192, 168, 1, 1], 8080)));
8465 assert!(frame3.is_none()); let later = now + Duration::from_millis(100);
8469 state.rate_limiter.update_tokens(later); if let Some(info) = state.path_addresses.get_mut(&0) {
8473 info.notified = false;
8474 }
8475
8476 let frame4 =
8477 state.queue_observed_address_frame(0, SocketAddr::from(([192, 168, 1, 1], 8080)));
8478 assert!(frame4.is_some()); }
8480
8481 #[test]
8482 fn test_queue_observed_address_frame() {
8483 let now = Instant::now();
8485 let config = AddressDiscoveryConfig::SendAndReceive;
8486 let mut state = AddressDiscoveryState::new(&config, now);
8487
8488 let addr = SocketAddr::from(([192, 168, 1, 100], 5000));
8489
8490 let frame = state.queue_observed_address_frame(0, addr);
8492 assert!(frame.is_some());
8493 assert_eq!(frame.unwrap().address, addr);
8494
8495 state.record_observation_sent(0);
8497
8498 for i in 0..9 {
8500 if let Some(info) = state.path_addresses.get_mut(&0) {
8502 info.notified = false;
8503 }
8504
8505 let frame = state.queue_observed_address_frame(0, addr);
8506 assert!(frame.is_some(), "Frame {} should be allowed", i + 2);
8507 state.record_observation_sent(0);
8508 }
8509
8510 if let Some(info) = state.path_addresses.get_mut(&0) {
8512 info.notified = false;
8513 }
8514
8515 let frame = state.queue_observed_address_frame(0, addr);
8517 assert!(frame.is_none(), "11th frame should be rate limited");
8518 }
8519
8520 #[test]
8521 fn test_multi_path_basic() {
8522 let now = Instant::now();
8524 let config = AddressDiscoveryConfig::SendAndReceive;
8525 let mut state = AddressDiscoveryState::new(&config, now);
8526
8527 let addr1 = SocketAddr::from(([192, 168, 1, 1], 5000));
8528 let addr2 = SocketAddr::from(([10, 0, 0, 1], 6000));
8529 let addr3 = SocketAddr::from(([172, 16, 0, 1], 7000));
8530
8531 state.handle_observed_address(addr1, 0, now);
8533 state.handle_observed_address(addr2, 1, now);
8534 state.handle_observed_address(addr3, 2, now);
8535
8536 assert_eq!(state.get_observed_address(0), Some(addr1));
8538 assert_eq!(state.get_observed_address(1), Some(addr2));
8539 assert_eq!(state.get_observed_address(2), Some(addr3));
8540
8541 assert!(state.has_unnotified_changes());
8543
8544 assert_eq!(state.observed_addresses.len(), 3);
8546 }
8547
8548 #[test]
8549 fn test_multi_path_observe_primary_only() {
8550 let now = Instant::now();
8552 let config = AddressDiscoveryConfig::SendAndReceive;
8553 let mut state = AddressDiscoveryState::new(&config, now);
8554
8555 assert!(state.should_send_observation(0, now));
8557 state.record_observation_sent(0);
8558
8559 assert!(!state.should_send_observation(1, now));
8561 assert!(!state.should_send_observation(2, now));
8562
8563 let addr = SocketAddr::from(([192, 168, 1, 1], 5000));
8565 assert!(state.queue_observed_address_frame(0, addr).is_some());
8566 assert!(state.queue_observed_address_frame(1, addr).is_none());
8567 assert!(state.queue_observed_address_frame(2, addr).is_none());
8568 }
8569
8570 #[test]
8571 fn test_multi_path_rate_limiting() {
8572 let now = Instant::now();
8574 let config = AddressDiscoveryConfig::SendAndReceive;
8575 let mut state = AddressDiscoveryState::new(&config, now);
8576
8577 state.observe_all_paths = true;
8579
8580 for i in 0..21 {
8582 state
8583 .path_addresses
8584 .insert(i, paths::PathAddressInfo::new());
8585 state.path_addresses.get_mut(&i).unwrap().observed_address = Some(SocketAddr::new(
8586 IpAddr::V4(Ipv4Addr::new(192, 168, 1, (i + 1) as u8)),
8587 8080,
8588 ));
8589 }
8590
8591 for i in 0..10 {
8593 assert!(state.should_send_observation(i, now));
8594 state.record_observation_sent(i);
8595 }
8596
8597 assert!(!state.should_send_observation(10, now));
8599
8600 state.path_addresses.get_mut(&0).unwrap().notified = false;
8602 assert!(!state.should_send_observation(0, now)); let later = now + Duration::from_secs(1);
8606 for i in 10..20 {
8607 assert!(state.should_send_observation(i, later));
8608 state.record_observation_sent(i);
8609 }
8610 assert!(!state.should_send_observation(20, later));
8612 }
8613
8614 #[test]
8615 fn test_multi_path_address_changes() {
8616 let now = Instant::now();
8618 let config = AddressDiscoveryConfig::SendAndReceive;
8619 let mut state = AddressDiscoveryState::new(&config, now);
8620
8621 let addr1a = SocketAddr::from(([192, 168, 1, 1], 5000));
8622 let addr1b = SocketAddr::from(([192, 168, 1, 2], 5000));
8623 let addr2a = SocketAddr::from(([10, 0, 0, 1], 6000));
8624 let addr2b = SocketAddr::from(([10, 0, 0, 2], 6000));
8625
8626 state.handle_observed_address(addr1a, 0, now);
8628 state.handle_observed_address(addr2a, 1, now);
8629
8630 state.record_observation_sent(0);
8632 state.record_observation_sent(1);
8633 assert!(!state.has_unnotified_changes());
8634
8635 state.handle_observed_address(addr1b, 0, now + Duration::from_secs(1));
8637 assert!(state.has_unnotified_changes());
8638
8639 assert_eq!(state.get_observed_address(0), Some(addr1b));
8641 assert_eq!(state.get_observed_address(1), Some(addr2a));
8642
8643 state.record_observation_sent(0);
8645 assert!(!state.has_unnotified_changes());
8646
8647 state.handle_observed_address(addr2b, 1, now + Duration::from_secs(2));
8649 assert!(state.has_unnotified_changes());
8650 }
8651
8652 #[test]
8653 fn test_multi_path_migration() {
8654 let now = Instant::now();
8656 let config = AddressDiscoveryConfig::SendAndReceive;
8657 let mut state = AddressDiscoveryState::new(&config, now);
8658
8659 let addr_old = SocketAddr::from(([192, 168, 1, 1], 5000));
8660 let addr_new = SocketAddr::from(([10, 0, 0, 1], 6000));
8661
8662 state.handle_observed_address(addr_old, 0, now);
8664 assert_eq!(state.get_observed_address(0), Some(addr_old));
8665
8666 state.handle_observed_address(addr_new, 1, now + Duration::from_secs(1));
8668
8669 assert_eq!(state.get_observed_address(0), Some(addr_old));
8671 assert_eq!(state.get_observed_address(1), Some(addr_new));
8672
8673 assert_eq!(state.path_addresses.len(), 2);
8676 }
8677
8678 #[test]
8679 fn test_check_for_address_observations_multi_path() {
8680 let now = Instant::now();
8682 let config = AddressDiscoveryConfig::SendAndReceive;
8683 let mut state = AddressDiscoveryState::new(&config, now);
8684
8685 state.observe_all_paths = true;
8687
8688 let addr1 = SocketAddr::from(([192, 168, 1, 1], 5000));
8690 let addr2 = SocketAddr::from(([10, 0, 0, 1], 6000));
8691 let addr3 = SocketAddr::from(([172, 16, 0, 1], 7000));
8692
8693 state.handle_observed_address(addr1, 0, now);
8694 state.handle_observed_address(addr2, 1, now);
8695 state.handle_observed_address(addr3, 2, now);
8696
8697 let frames = state.check_for_address_observations(0, true, now);
8699
8700 assert_eq!(frames.len(), 3);
8702
8703 let frame_addrs: Vec<_> = frames.iter().map(|f| f.address).collect();
8705 assert!(frame_addrs.contains(&addr1), "addr1 should be in frames");
8706 assert!(frame_addrs.contains(&addr2), "addr2 should be in frames");
8707 assert!(frame_addrs.contains(&addr3), "addr3 should be in frames");
8708
8709 assert!(!state.has_unnotified_changes());
8711 }
8712
8713 #[test]
8714 fn test_multi_path_with_peer_not_supporting() {
8715 let now = Instant::now();
8717 let config = AddressDiscoveryConfig::SendAndReceive;
8718 let mut state = AddressDiscoveryState::new(&config, now);
8719
8720 state.handle_observed_address(SocketAddr::from(([192, 168, 1, 1], 5000)), 0, now);
8722 state.handle_observed_address(SocketAddr::from(([10, 0, 0, 1], 6000)), 1, now);
8723
8724 let frames = state.check_for_address_observations(0, false, now);
8726 assert_eq!(frames.len(), 0);
8727
8728 assert!(state.has_unnotified_changes());
8730 }
8731
8732 #[test]
8734 fn test_bootstrap_node_aggressive_observation_mode() {
8735 let config = AddressDiscoveryConfig::SendAndReceive;
8737 let now = Instant::now();
8738 let mut state = AddressDiscoveryState::new(&config, now);
8739
8740 assert!(!state.is_bootstrap_mode());
8742
8743 state.set_bootstrap_mode(true);
8745 assert!(state.is_bootstrap_mode());
8746
8747 assert!(state.should_observe_path(0)); assert!(state.should_observe_path(1)); assert!(state.should_observe_path(2));
8751
8752 let bootstrap_rate = state.get_effective_rate_limit();
8754 assert!(bootstrap_rate > 10.0); }
8756
8757 #[test]
8758 fn test_bootstrap_node_immediate_observation() {
8759 let config = AddressDiscoveryConfig::SendAndReceive;
8761 let now = Instant::now();
8762 let mut state = AddressDiscoveryState::new(&config, now);
8763 state.set_bootstrap_mode(true);
8764
8765 let addr = SocketAddr::from(([192, 168, 1, 100], 5000));
8767 state.handle_observed_address(addr, 0, now);
8768
8769 assert!(state.should_send_observation_immediately(true));
8771
8772 assert!(state.should_send_observation(0, now));
8774
8775 let frame = state.queue_observed_address_frame(0, addr);
8777 assert!(frame.is_some());
8778 }
8779
8780 #[test]
8781 fn test_bootstrap_node_multiple_path_observations() {
8782 let config = AddressDiscoveryConfig::SendAndReceive;
8784 let now = Instant::now();
8785 let mut state = AddressDiscoveryState::new(&config, now);
8786 state.set_bootstrap_mode(true);
8787
8788 let addrs = vec![
8790 (0, SocketAddr::from(([192, 168, 1, 1], 5000))),
8791 (1, SocketAddr::from(([10, 0, 0, 1], 6000))),
8792 (2, SocketAddr::from(([172, 16, 0, 1], 7000))),
8793 ];
8794
8795 for (path_id, addr) in &addrs {
8796 state.handle_observed_address(*addr, *path_id, now);
8797 }
8798
8799 let frames = state.check_for_address_observations(0, true, now);
8801 assert_eq!(frames.len(), 3);
8802
8803 for (_, addr) in &addrs {
8805 assert!(frames.iter().any(|f| f.address == *addr));
8806 }
8807 }
8808
8809 #[test]
8810 fn test_bootstrap_node_rate_limit_override() {
8811 let config = AddressDiscoveryConfig::SendAndReceive;
8813 let now = Instant::now();
8814 let mut state = AddressDiscoveryState::new(&config, now);
8815 state.set_bootstrap_mode(true);
8816
8817 let addr = SocketAddr::from(([192, 168, 1, 1], 5000));
8819
8820 for i in 0..10 {
8822 state.handle_observed_address(addr, i, now);
8823 let can_send = state.should_send_observation(i, now);
8824 assert!(can_send, "Bootstrap node should send observation {i}");
8825 state.record_observation_sent(i);
8826 }
8827 }
8828
8829 #[test]
8830 fn test_bootstrap_node_configuration() {
8831 let config = AddressDiscoveryConfig::SendAndReceive;
8833 let mut state = AddressDiscoveryState::new(&config, Instant::now());
8834
8835 state.set_bootstrap_mode(true);
8837
8838 assert!(state.bootstrap_mode);
8840 assert!(state.enabled);
8841
8842 let effective_rate = state.get_effective_rate_limit();
8844 assert!(effective_rate > state.max_observation_rate as f64);
8845 }
8846
8847 #[test]
8848 fn test_bootstrap_node_persistent_observation() {
8849 let config = AddressDiscoveryConfig::SendAndReceive;
8851 let mut now = Instant::now();
8852 let mut state = AddressDiscoveryState::new(&config, now);
8853 state.set_bootstrap_mode(true);
8854
8855 let addr1 = SocketAddr::from(([192, 168, 1, 1], 5000));
8856 let addr2 = SocketAddr::from(([192, 168, 1, 2], 5000));
8857
8858 state.handle_observed_address(addr1, 0, now);
8860 assert!(state.should_send_observation(0, now));
8861 state.record_observation_sent(0);
8862
8863 now += Duration::from_secs(60);
8865 state.handle_observed_address(addr2, 0, now);
8866
8867 assert!(state.should_send_observation(0, now));
8869 }
8870
8871 #[test]
8872 fn test_bootstrap_node_multi_peer_support() {
8873 let config = AddressDiscoveryConfig::SendAndReceive;
8876 let now = Instant::now();
8877 let mut state = AddressDiscoveryState::new(&config, now);
8878 state.set_bootstrap_mode(true);
8879
8880 let peer_addresses = vec![
8882 (0, SocketAddr::from(([192, 168, 1, 1], 5000))), (1, SocketAddr::from(([10, 0, 0, 1], 6000))), (2, SocketAddr::from(([172, 16, 0, 1], 7000))), (3, SocketAddr::from(([192, 168, 2, 1], 8000))), ];
8887
8888 for (path_id, addr) in &peer_addresses {
8890 state.handle_observed_address(*addr, *path_id, now);
8891 }
8892
8893 let frames = state.check_for_address_observations(0, true, now);
8895 assert_eq!(frames.len(), peer_addresses.len());
8896
8897 for (_, addr) in &peer_addresses {
8899 assert!(frames.iter().any(|f| f.address == *addr));
8900 }
8901 }
8902
8903 mod address_discovery_tests {
8905 include!("address_discovery_tests.rs");
8906 }
8907}