1use std::{
2 cmp,
3 collections::VecDeque,
4 convert::TryFrom,
5 fmt, io, mem,
6 net::{IpAddr, SocketAddr},
7 sync::Arc,
8};
9
10use bytes::{Bytes, BytesMut};
11use frame::StreamMetaVec;
12use rand::{Rng, SeedableRng, rngs::StdRng};
15use thiserror::Error;
16use tracing::{debug, error, info, trace, trace_span, warn};
17
18use crate::{
19 Dir, Duration, EndpointConfig, Frame, INITIAL_MTU, Instant, MAX_CID_SIZE, MAX_STREAM_COUNT,
20 MIN_INITIAL_SIZE, Side, StreamId, TIMER_GRANULARITY, TokenStore, Transmit, TransportError,
21 TransportErrorCode, VarInt,
22 cid_generator::ConnectionIdGenerator,
23 cid_queue::CidQueue,
24 coding::BufMutExt,
25 config::{ServerConfig, TransportConfig},
26 crypto::{self, KeyPair, Keys, PacketKey},
27 frame::{self, Close, Datagram, FrameStruct, NewToken},
28 packet::{
29 FixedLengthConnectionIdParser, Header, InitialHeader, InitialPacket, LongType, Packet,
30 PacketNumber, PartialDecode, SpaceId,
31 },
32 range_set::ArrayRangeSet,
33 shared::{
34 ConnectionEvent, ConnectionEventInner, ConnectionId, DatagramConnectionEvent, EcnCodepoint,
35 EndpointEvent, EndpointEventInner,
36 },
37 token::{ResetToken, Token, TokenPayload},
38 transport_parameters::TransportParameters,
39};
40
41mod ack_frequency;
42use ack_frequency::AckFrequencyState;
43
44pub(crate) mod nat_traversal;
45pub(crate) use nat_traversal::{NatTraversalRole, NatTraversalError, CoordinationPhase};
46use nat_traversal::NatTraversalState;
47
48mod assembler;
49pub use assembler::Chunk;
50
51mod cid_state;
52use cid_state::CidState;
53
54mod datagrams;
55use datagrams::DatagramState;
56pub use datagrams::{Datagrams, SendDatagramError};
57
58mod mtud;
59mod pacing;
60
61mod packet_builder;
62use packet_builder::PacketBuilder;
63
64mod packet_crypto;
65use packet_crypto::{PrevCrypto, ZeroRttCrypto};
66
67mod paths;
68pub use paths::RttEstimator;
69use paths::{PathData, PathResponses, NatTraversalChallenges};
70
71mod send_buffer;
72
73mod spaces;
74#[cfg(fuzzing)]
75pub use spaces::Retransmits;
76#[cfg(not(fuzzing))]
77use spaces::Retransmits;
78use spaces::{PacketNumberFilter, PacketSpace, SendableFrames, SentPacket, ThinRetransmits};
79
80mod stats;
81pub use stats::{ConnectionStats, FrameStats, PathStats, UdpStats};
82
83mod streams;
84#[cfg(fuzzing)]
85pub use streams::StreamsState;
86#[cfg(not(fuzzing))]
87use streams::StreamsState;
88pub use streams::{
89 Chunks, ClosedStream, FinishError, ReadError, ReadableError, RecvStream, SendStream,
90 ShouldTransmit, StreamEvent, Streams, WriteError, Written,
91};
92
93mod timer;
94use crate::congestion::Controller;
95use timer::{Timer, TimerTable};
96
97pub struct Connection {
137 endpoint_config: Arc<EndpointConfig>,
138 config: Arc<TransportConfig>,
139 rng: StdRng,
140 crypto: Box<dyn crypto::Session>,
141 handshake_cid: ConnectionId,
143 rem_handshake_cid: ConnectionId,
145 local_ip: Option<IpAddr>,
148 path: PathData,
149 allow_mtud: bool,
151 prev_path: Option<(ConnectionId, PathData)>,
152 state: State,
153 side: ConnectionSide,
154 zero_rtt_enabled: bool,
156 zero_rtt_crypto: Option<ZeroRttCrypto>,
158 key_phase: bool,
159 key_phase_size: u64,
161 peer_params: TransportParameters,
163 orig_rem_cid: ConnectionId,
165 initial_dst_cid: ConnectionId,
167 retry_src_cid: Option<ConnectionId>,
170 lost_packets: u64,
172 events: VecDeque<Event>,
173 endpoint_events: VecDeque<EndpointEventInner>,
174 spin_enabled: bool,
176 spin: bool,
178 spaces: [PacketSpace; 3],
180 highest_space: SpaceId,
182 prev_crypto: Option<PrevCrypto>,
184 next_crypto: Option<KeyPair<Box<dyn PacketKey>>>,
189 accepted_0rtt: bool,
190 permit_idle_reset: bool,
192 idle_timeout: Option<Duration>,
194 timers: TimerTable,
195 authentication_failures: u64,
197 error: Option<ConnectionError>,
199 packet_number_filter: PacketNumberFilter,
201
202 path_responses: PathResponses,
207 nat_traversal_challenges: NatTraversalChallenges,
209 close: bool,
210
211 ack_frequency: AckFrequencyState,
215
216 pto_count: u32,
221
222 receiving_ecn: bool,
227 total_authed_packets: u64,
229 app_limited: bool,
232
233 streams: StreamsState,
234 rem_cids: CidQueue,
236 local_cid_state: CidState,
238 datagrams: DatagramState,
240 stats: ConnectionStats,
242 version: u32,
244
245 nat_traversal: Option<NatTraversalState>,
247
248 #[cfg(feature = "__qlog")]
250 qlog_streamer: Option<Box<dyn std::io::Write + Send + Sync>>,
251}
252
253impl Connection {
254 pub(crate) fn new(
255 endpoint_config: Arc<EndpointConfig>,
256 config: Arc<TransportConfig>,
257 init_cid: ConnectionId,
258 loc_cid: ConnectionId,
259 rem_cid: ConnectionId,
260 remote: SocketAddr,
261 local_ip: Option<IpAddr>,
262 crypto: Box<dyn crypto::Session>,
263 cid_gen: &dyn ConnectionIdGenerator,
264 now: Instant,
265 version: u32,
266 allow_mtud: bool,
267 rng_seed: [u8; 32],
268 side_args: SideArgs,
269 ) -> Self {
270 let pref_addr_cid = side_args.pref_addr_cid();
271 let path_validated = side_args.path_validated();
272 let connection_side = ConnectionSide::from(side_args);
273 let side = connection_side.side();
274 let initial_space = PacketSpace {
275 crypto: Some(crypto.initial_keys(&init_cid, side)),
276 ..PacketSpace::new(now)
277 };
278 let state = State::Handshake(state::Handshake {
279 rem_cid_set: side.is_server(),
280 expected_token: Bytes::new(),
281 client_hello: None,
282 });
283 let mut rng = StdRng::from_seed(rng_seed);
284 let mut this = Self {
285 endpoint_config,
286 crypto,
287 handshake_cid: loc_cid,
288 rem_handshake_cid: rem_cid,
289 local_cid_state: CidState::new(
290 cid_gen.cid_len(),
291 cid_gen.cid_lifetime(),
292 now,
293 if pref_addr_cid.is_some() { 2 } else { 1 },
294 ),
295 path: PathData::new(remote, allow_mtud, None, now, &config),
296 allow_mtud,
297 local_ip,
298 prev_path: None,
299 state,
300 side: connection_side,
301 zero_rtt_enabled: false,
302 zero_rtt_crypto: None,
303 key_phase: false,
304 key_phase_size: rng.gen_range(10..1000),
311 peer_params: TransportParameters::default(),
312 orig_rem_cid: rem_cid,
313 initial_dst_cid: init_cid,
314 retry_src_cid: None,
315 lost_packets: 0,
316 events: VecDeque::new(),
317 endpoint_events: VecDeque::new(),
318 spin_enabled: config.allow_spin && rng.gen_ratio(7, 8),
319 spin: false,
320 spaces: [initial_space, PacketSpace::new(now), PacketSpace::new(now)],
321 highest_space: SpaceId::Initial,
322 prev_crypto: None,
323 next_crypto: None,
324 accepted_0rtt: false,
325 permit_idle_reset: true,
326 idle_timeout: match config.max_idle_timeout {
327 None | Some(VarInt(0)) => None,
328 Some(dur) => Some(Duration::from_millis(dur.0)),
329 },
330 timers: TimerTable::default(),
331 authentication_failures: 0,
332 error: None,
333 #[cfg(test)]
334 packet_number_filter: match config.deterministic_packet_numbers {
335 false => PacketNumberFilter::new(&mut rng),
336 true => PacketNumberFilter::disabled(),
337 },
338 #[cfg(not(test))]
339 packet_number_filter: PacketNumberFilter::new(&mut rng),
340
341 path_responses: PathResponses::default(),
342 nat_traversal_challenges: NatTraversalChallenges::default(),
343 close: false,
344
345 ack_frequency: AckFrequencyState::new(get_max_ack_delay(
346 &TransportParameters::default(),
347 )),
348
349 pto_count: 0,
350
351 app_limited: false,
352 receiving_ecn: false,
353 total_authed_packets: 0,
354
355 streams: StreamsState::new(
356 side,
357 config.max_concurrent_uni_streams,
358 config.max_concurrent_bidi_streams,
359 config.send_window,
360 config.receive_window,
361 config.stream_receive_window,
362 ),
363 datagrams: DatagramState::default(),
364 config,
365 rem_cids: CidQueue::new(rem_cid),
366 rng,
367 stats: ConnectionStats::default(),
368 version,
369 nat_traversal: None, #[cfg(feature = "__qlog")]
372 qlog_streamer: None,
373 };
374 if path_validated {
375 this.on_path_validated();
376 }
377 if side.is_client() {
378 this.write_crypto();
380 this.init_0rtt();
381 }
382 this
383 }
384
385 #[cfg(feature = "__qlog")]
387 pub fn set_qlog(
388 &mut self,
389 writer: Box<dyn std::io::Write + Send + Sync>,
390 _title: Option<String>,
391 _description: Option<String>,
392 _now: Instant,
393 ) {
394 self.qlog_streamer = Some(writer);
395 }
396
397 #[cfg(feature = "__qlog")]
399 fn emit_qlog_recovery_metrics(&mut self, _now: Instant) {
400 }
403
404 #[must_use]
412 pub fn poll_timeout(&mut self) -> Option<Instant> {
413 let mut next_timeout = self.timers.next_timeout();
414
415 if let Some(nat_state) = &self.nat_traversal {
417 if let Some(nat_timeout) = nat_state.get_next_timeout(Instant::now()) {
418 self.timers.set(Timer::NatTraversal, nat_timeout);
420 next_timeout = Some(next_timeout.map_or(nat_timeout, |t| t.min(nat_timeout)));
421 }
422 }
423
424 next_timeout
425 }
426
427 #[must_use]
433 pub fn poll(&mut self) -> Option<Event> {
434 if let Some(x) = self.events.pop_front() {
435 return Some(x);
436 }
437
438 if let Some(event) = self.streams.poll() {
439 return Some(Event::Stream(event));
440 }
441
442 if let Some(err) = self.error.take() {
443 return Some(Event::ConnectionLost { reason: err });
444 }
445
446 None
447 }
448
449 #[must_use]
451 pub fn poll_endpoint_events(&mut self) -> Option<EndpointEvent> {
452 self.endpoint_events.pop_front().map(EndpointEvent)
453 }
454
455 #[must_use]
457 pub fn streams(&mut self) -> Streams<'_> {
458 Streams {
459 state: &mut self.streams,
460 conn_state: &self.state,
461 }
462 }
463
464 #[must_use]
466 pub fn recv_stream(&mut self, id: StreamId) -> RecvStream<'_> {
467 assert!(id.dir() == Dir::Bi || id.initiator() != self.side.side());
468 RecvStream {
469 id,
470 state: &mut self.streams,
471 pending: &mut self.spaces[SpaceId::Data].pending,
472 }
473 }
474
475 #[must_use]
477 pub fn send_stream(&mut self, id: StreamId) -> SendStream<'_> {
478 assert!(id.dir() == Dir::Bi || id.initiator() == self.side.side());
479 SendStream {
480 id,
481 state: &mut self.streams,
482 pending: &mut self.spaces[SpaceId::Data].pending,
483 conn_state: &self.state,
484 }
485 }
486
 /// Writes the next outgoing datagram(s) for this connection into `buf`.
 ///
 /// Returns a `Transmit` describing what was appended to `buf` (destination, total size,
 /// ECN marking, and a segment size when more than one datagram was written), or `None`
 /// when nothing was written. `max_datagrams` bounds how many datagrams may be produced in
 /// one call; it is reduced to 1 when `enable_segmentation_offload` is off.
 ///
 /// NAT traversal transmits, a PATH_CHALLENGE for the previous path and an off-path
 /// PATH_RESPONSE are each returned as a single datagram of their own.
 ///
 /// # Panics
 ///
 /// Panics if `max_datagrams` is zero.
487 #[must_use]
497 pub fn poll_transmit(
498 &mut self,
499 now: Instant,
500 max_datagrams: usize,
501 buf: &mut Vec<u8>,
502 ) -> Option<Transmit> {
503 assert!(max_datagrams != 0);
 // Without segmentation offload only a single datagram may be produced per call.
504 let max_datagrams = match self.config.enable_segmentation_offload {
505 false => 1,
506 true => max_datagrams,
507 };
508
509 let mut num_datagrams = 0;
510 let mut datagram_start = 0;
 // Starts at the current path MTU; replaced by the first datagram's length once it is finished.
513 let mut segment_size = usize::from(self.path.current_mtu());
514
 // Let the NAT traversal state check for a coordination timeout; the result is only logged.
515 if let Some(nat_traversal) = &mut self.nat_traversal {
517 if nat_traversal.check_coordination_timeout(now) {
518 trace!("NAT traversal coordination timed out, may retry");
519 }
520 }
521
 // NAT traversal traffic and the previous-path PATH_CHALLENGE are returned on their own,
 // before any regular packets are assembled.
522 if let Some(challenge) = self.send_nat_traversal_challenge(now, buf) {
524 return Some(challenge);
525 }
526
527 if let Some(challenge) = self.send_path_challenge(now, buf) {
528 return Some(challenge);
529 }
530
 // Give every space the chance to queue a probe; an immediate ACK is only requested in the
 // Data space and only if the peer supports ACK frequency.
531 for space in SpaceId::iter() {
533 let request_immediate_ack =
534 space == SpaceId::Data && self.peer_supports_ack_frequency();
535 self.spaces[space].maybe_queue_probe(request_immediate_ack, &self.streams);
536 }
537
 // Decide whether this call has to emit CONNECTION_CLOSE. Nothing is sent once drained,
 // and while draining/closed only as long as `self.close` is still set.
538 let close = match self.state {
540 State::Drained => {
541 self.app_limited = true;
542 return None;
543 }
544 State::Draining | State::Closed(_) => {
545 if !self.close {
548 self.app_limited = true;
549 return None;
550 }
551 true
552 }
553 _ => false,
554 };
555
 // Only when an ACK frequency config exists: update `pending.ack_frequency` from the
 // negotiation state, the highest space reached and peer support.
556 if let Some(config) = &self.config.ack_frequency_config {
558 self.spaces[SpaceId::Data].pending.ack_frequency = self
559 .ack_frequency
560 .should_send_ack_frequency(self.path.rtt.get(), config, &self.peer_params)
561 && self.highest_space == SpaceId::Data
562 && self.peer_supports_ack_frequency();
563 }
564
 // Upper bound (as an offset into `buf`) up to which packets may currently be written;
 // grows by one datagram's size limit each time a new datagram is started.
565 let mut buf_capacity = 0;
569
570 let mut coalesce = true;
571 let mut builder_storage: Option<PacketBuilder> = None;
572 let mut sent_frames = None;
573 let mut pad_datagram = false;
574 let mut pad_datagram_to_mtu = false;
575 let mut congestion_blocked = false;
576
 // Walk the spaces in order. `space_idx` only advances when a space has nothing (more) to
 // send, is congestion blocked, or a close was written, so one space may yield several packets.
577 let mut space_idx = 0;
579 let spaces = [SpaceId::Initial, SpaceId::Handshake, SpaceId::Data];
580 while space_idx < spaces.len() {
583 let space_id = spaces[space_idx];
 // Packet number peeked for the Data space, used to predict the 1-RTT packet overhead.
584 let pn = self.packet_number_filter.peek(&self.spaces[SpaceId::Data]);
591 let frame_space_1rtt =
592 segment_size.saturating_sub(self.predict_1rtt_overhead(Some(pn)));
593
594 let can_send = self.space_can_send(space_id, frame_space_1rtt);
 // Skip this space if it has nothing to send, unless a close is pending and it has keys.
596 if can_send.is_empty() && (!close || self.spaces[space_id].crypto.is_none()) {
597 space_idx += 1;
598 continue;
599 }
600
601 let mut ack_eliciting = !self.spaces[space_id].pending.is_empty(&self.streams)
602 || self.spaces[space_id].ping_pending
603 || self.spaces[space_id].immediate_ack_pending;
604 if space_id == SpaceId::Data {
605 ack_eliciting |= self.can_send_1rtt(frame_space_1rtt);
606 }
607
608 pad_datagram_to_mtu |= space_id == SpaceId::Data && self.config.pad_to_mtu;
609
 // Where the packet under construction (if any) will end, counting its minimum size and tag.
610 let buf_end = if let Some(builder) = &builder_storage {
614 buf.len().max(builder.min_size) + builder.tag_len
615 } else {
616 buf.len()
617 };
618
619 let tag_len = if let Some(ref crypto) = self.spaces[space_id].crypto {
620 crypto.packet.local.tag_len()
621 } else if space_id == SpaceId::Data {
622 self.zero_rtt_crypto.as_ref().expect(
623 "sending packets in the application data space requires known 0-RTT or 1-RTT keys",
624 ).packet.tag_len()
625 } else {
626 unreachable!("tried to send {:?} packet without keys", space_id)
627 };
 // Start a new datagram when coalescing is not allowed or the room left in the current
 // datagram cannot hold a minimal packet plus its tag.
628 if !coalesce || buf_capacity - buf_end < MIN_PACKET_SPACE + tag_len {
629 if num_datagrams >= max_datagrams {
633 break;
635 }
636
 // The check covers the datagrams already built in this call plus one byte of the next.
637 if self
644 .path
645 .anti_amplification_blocked(segment_size as u64 * (num_datagrams as u64) + 1)
646 {
647 trace!("blocked by anti-amplification");
648 break;
649 }
650
 // Congestion control and pacing only gate ack-eliciting packets when the space has no
 // loss probes outstanding.
651 if ack_eliciting && self.spaces[space_id].loss_probes == 0 {
 // Span from the start of the unfinished packet to the current capacity; this packet
 // has not been passed to `finish_and_track` yet.
654 let untracked_bytes = if let Some(builder) = &builder_storage {
656 buf_capacity - builder.partial_encode.start
657 } else {
658 0
659 } as u64;
660 debug_assert!(untracked_bytes <= segment_size as u64);
661
662 let bytes_to_send = segment_size as u64 + untracked_bytes;
663 if self.path.in_flight.bytes + bytes_to_send >= self.path.congestion.window() {
664 space_idx += 1;
665 congestion_blocked = true;
666 trace!("blocked by congestion control");
669 continue;
670 }
671
672 let smoothed_rtt = self.path.rtt.get();
674 if let Some(delay) = self.path.pacing.delay(
675 smoothed_rtt,
676 bytes_to_send,
677 self.path.current_mtu(),
678 self.path.congestion.window(),
679 now,
680 ) {
681 self.timers.set(Timer::Pacing, delay);
682 congestion_blocked = true;
683 trace!("blocked by pacing");
686 break;
687 }
688 }
689
 // Finish the last packet of the previous datagram before opening the next one.
690 if let Some(mut builder) = builder_storage.take() {
692 if pad_datagram {
693 builder.pad_to(MIN_INITIAL_SIZE);
694 }
695
 // From the second datagram on (or when padding to MTU is requested) the finished datagram
 // is padded to `segment_size`. The batch stops growing instead if that would take more
 // than MAX_PADDING bytes (and MTU padding was not requested), or if this datagram's
 // capacity is smaller than a full segment.
696 if num_datagrams > 1 || pad_datagram_to_mtu {
697 const MAX_PADDING: usize = 16;
710 let packet_len_unpadded = cmp::max(builder.min_size, buf.len())
711 - datagram_start
712 + builder.tag_len;
713 if (packet_len_unpadded + MAX_PADDING < segment_size
714 && !pad_datagram_to_mtu)
715 || datagram_start + segment_size > buf_capacity
716 {
717 trace!(
718 "GSO truncated by demand for {} padding bytes or loss probe",
719 segment_size - packet_len_unpadded
720 );
721 builder_storage = Some(builder);
722 break;
723 }
724
725 builder.pad_to(segment_size as u16);
728 }
729
730 builder.finish_and_track(now, self, sent_frames.take(), buf);
731
 // The first datagram's final length becomes the segment size of the batch, and the
 // unused capacity behind it is clipped.
732 if num_datagrams == 1 {
733 segment_size = buf.len();
740 buf_capacity = buf.len();
743
 // Re-check against the (possibly smaller) segment size; stop if the Data space has
 // nothing that fits any more.
744 if space_id == SpaceId::Data {
751 let frame_space_1rtt =
752 segment_size.saturating_sub(self.predict_1rtt_overhead(Some(pn)));
753 if self.space_can_send(space_id, frame_space_1rtt).is_empty() {
754 break;
755 }
756 }
757 }
758 }
759
 // A loss probe consumes one `loss_probes` credit and is limited to at most INITIAL_MTU bytes.
760 let next_datagram_size_limit = match self.spaces[space_id].loss_probes {
762 0 => segment_size,
763 _ => {
764 self.spaces[space_id].loss_probes -= 1;
765 std::cmp::min(segment_size, usize::from(INITIAL_MTU))
769 }
770 };
771 buf_capacity += next_datagram_size_limit;
 // Reserve room for the whole batch at once rather than per datagram.
772 if buf.capacity() < buf_capacity {
773 buf.reserve(max_datagrams * segment_size);
782 }
783 num_datagrams += 1;
784 coalesce = true;
785 pad_datagram = false;
786 datagram_start = buf.len();
787
788 debug_assert_eq!(
789 datagram_start % segment_size,
790 0,
791 "datagrams in a GSO batch must be aligned to the segment size"
792 );
793 } else {
 // Coalescing into the current datagram: finish the previous packet without extra padding.
794 if let Some(builder) = builder_storage.take() {
798 builder.finish_and_track(now, self, sent_frames.take(), buf);
799 }
800 }
801
802 debug_assert!(buf_capacity - buf.len() >= MIN_PACKET_SPACE);
803
 // A client discards the Initial space as soon as it sends in the Handshake space.
804 if self.spaces[SpaceId::Initial].crypto.is_some()
809 && space_id == SpaceId::Handshake
810 && self.side.is_client()
811 {
812 self.discard_space(now, SpaceId::Initial);
815 }
816 if let Some(ref mut prev) = self.prev_crypto {
817 prev.update_unacked = false;
818 }
819
820 debug_assert!(
821 builder_storage.is_none() && sent_frames.is_none(),
822 "Previous packet must have been finished"
823 );
824
 // `?` ends the whole call with `None` if no packet can be started.
825 let builder = builder_storage.insert(PacketBuilder::new(
826 now,
827 space_id,
828 self.rem_cids.active(),
829 buf,
830 buf_capacity,
831 datagram_start,
832 ack_eliciting,
833 self,
834 )?);
 // Once a short-header packet is started, nothing further is coalesced into this datagram.
835 coalesce = coalesce && !builder.short_header;
836
 // Initial packets sent by a client, or ack-eliciting ones sent by a server, cause the
 // datagram to be padded to MIN_INITIAL_SIZE when it is finished.
837 pad_datagram |=
839 space_id == SpaceId::Initial && (self.side.is_client() || ack_eliciting);
840
 // Close path: include pending ACKs if there are any, then the CONNECTION_CLOSE frame.
841 if close {
842 trace!("sending CONNECTION_CLOSE");
843 if !self.spaces[space_id].pending_acks.ranges().is_empty() {
848 Self::populate_acks(
849 now,
850 self.receiving_ecn,
851 &mut SentFrames::default(),
852 &mut self.spaces[space_id],
853 buf,
854 &mut self.stats,
855 );
856 }
857
858 debug_assert!(
862 buf.len() + frame::ConnectionClose::SIZE_BOUND < builder.max_size,
863 "ACKs should leave space for ConnectionClose"
864 );
 // Same condition as the debug assertion above, enforced in release builds too.
865 if buf.len() + frame::ConnectionClose::SIZE_BOUND < builder.max_size {
866 let max_frame_size = builder.max_size - buf.len();
867 match self.state {
868 State::Closed(state::Closed { ref reason }) => {
 // The real reason is only written in the Data space or when it is a transport-layer
 // close; otherwise a generic APPLICATION_ERROR with an empty reason is sent.
869 if space_id == SpaceId::Data || reason.is_transport_layer() {
870 reason.encode(buf, max_frame_size)
871 } else {
872 frame::ConnectionClose {
873 error_code: TransportErrorCode::APPLICATION_ERROR,
874 frame_type: None,
875 reason: Bytes::new(),
876 }
877 .encode(buf, max_frame_size)
878 }
879 }
880 State::Draining => frame::ConnectionClose {
881 error_code: TransportErrorCode::NO_ERROR,
882 frame_type: None,
883 reason: Bytes::new(),
884 }
885 .encode(buf, max_frame_size),
886 _ => unreachable!(
887 "tried to make a close packet when the connection wasn't closed"
888 ),
889 }
890 }
 // The close is complete once written in the highest space; otherwise move to the next space.
891 if space_id == self.highest_space {
892 self.close = false;
894 break;
896 } else {
897 space_idx += 1;
901 continue;
902 }
903 }
904
 // Off-path PATH_RESPONSE: only considered for the first datagram. It is padded to
 // MIN_INITIAL_SIZE and returned immediately, addressed to the remote from `pop_off_path`.
905 if space_id == SpaceId::Data && num_datagrams == 1 {
908 if let Some((token, remote)) = self.path_responses.pop_off_path(self.path.remote) {
 // `builder_storage` was filled by `insert` above, so the `unwrap` cannot fail.
909 let mut builder = builder_storage.take().unwrap();
912 trace!("PATH_RESPONSE {:08x} (off-path)", token);
913 buf.write(frame::FrameType::PATH_RESPONSE);
914 buf.write(token);
915 self.stats.frame_tx.path_response += 1;
916 builder.pad_to(MIN_INITIAL_SIZE);
917 builder.finish_and_track(
918 now,
919 self,
920 Some(SentFrames {
921 non_retransmits: true,
922 ..SentFrames::default()
923 }),
924 buf,
925 );
926 self.stats.udp_tx.on_sent(1, buf.len());
927 return Some(Transmit {
928 destination: remote,
929 size: buf.len(),
930 ecn: None,
931 segment_size: None,
932 src_ip: self.local_ip,
933 });
934 }
935 }
936
937 let sent =
938 self.populate_packet(now, space_id, buf, builder.max_size, builder.exact_number);
939
 // Sanity check: a full-MTU packet must not end up ACK-only when `can_send` reported other
 // frames but no ACKs and no datagrams are queued.
940 debug_assert!(
947 !(sent.is_ack_only(&self.streams)
948 && !can_send.acks
949 && can_send.other
950 && (buf_capacity - builder.datagram_start) == self.path.current_mtu() as usize
951 && self.datagrams.outgoing.is_empty()),
952 "SendableFrames was {can_send:?}, but only ACKs have been written"
953 );
954 pad_datagram |= sent.requires_padding;
955
 // ACKs were written: mark them as sent and stop the max-ack-delay timer.
956 if sent.largest_acked.is_some() {
957 self.spaces[space_id].pending_acks.acks_sent();
958 self.timers.stop(Timer::MaxAckDelay);
959 }
960
 // Kept until the packet is finished, on a later iteration or after the loop.
 // `space_idx` is deliberately left unchanged so this space is visited again.
961 sent_frames = Some(sent);
963
964 }
967
 // Finish the last packet of the batch.
968 if let Some(mut builder) = builder_storage {
970 if pad_datagram {
971 builder.pad_to(MIN_INITIAL_SIZE);
972 }
973
 // Pad to the segment size only if this datagram was granted a full segment of capacity
 // (a loss probe may have been given less).
974 if pad_datagram_to_mtu && buf_capacity >= datagram_start + segment_size {
980 builder.pad_to(segment_size as u16);
981 }
982
983 let last_packet_number = builder.exact_number;
984 builder.finish_and_track(now, self, sent_frames, buf);
985 self.path
986 .congestion
987 .on_sent(now, buf.len() as u64, last_packet_number);
988
989 #[cfg(feature = "__qlog")]
990 self.emit_qlog_recovery_metrics(now);
991 }
992
 // Application limited: nothing was written, and not because of congestion control or pacing.
993 self.app_limited = buf.is_empty() && !congestion_blocked;
994
 // With nothing else to send on an established connection, try an MTU discovery probe:
 // a PING (plus IMMEDIATE_ACK if the peer supports it) padded to the probe size.
995 if buf.is_empty() && self.state.is_established() {
997 let space_id = SpaceId::Data;
 // `?` returns `None` when no probe is due.
998 let probe_size = self
999 .path
1000 .mtud
1001 .poll_transmit(now, self.packet_number_filter.peek(&self.spaces[space_id]))?;
1002
1003 let buf_capacity = probe_size as usize;
1004 buf.reserve(buf_capacity);
1005
1006 let mut builder = PacketBuilder::new(
1007 now,
1008 space_id,
1009 self.rem_cids.active(),
1010 buf,
1011 buf_capacity,
1012 0,
1013 true,
1014 self,
1015 )?;
1016
1017 buf.write(frame::FrameType::PING);
1019 self.stats.frame_tx.ping += 1;
1020
1021 if self.peer_supports_ack_frequency() {
1023 buf.write(frame::FrameType::IMMEDIATE_ACK);
1024 self.stats.frame_tx.immediate_ack += 1;
1025 }
1026
1027 builder.pad_to(probe_size);
1028 let sent_frames = SentFrames {
1029 non_retransmits: true,
1030 ..Default::default()
1031 };
1032 builder.finish_and_track(now, self, Some(sent_frames), buf);
1033
1034 self.stats.path.sent_plpmtud_probes += 1;
1035 num_datagrams = 1;
1036
1037 trace!(?probe_size, "writing MTUD probe");
1038 }
1039
1040 if buf.is_empty() {
1041 return None;
1042 }
1043
1044 trace!("sending {} bytes in {} datagrams", buf.len(), num_datagrams);
 // Account for the bytes about to be sent on the current path and in the UDP statistics.
1045 self.path.total_sent = self.path.total_sent.saturating_add(buf.len() as u64);
1046
1047 self.stats.udp_tx.on_sent(num_datagrams as u64, buf.len());
1048
1049 Some(Transmit {
1050 destination: self.path.remote,
1051 size: buf.len(),
1052 ecn: if self.path.sending_ecn {
1053 Some(EcnCodepoint::Ect0)
1054 } else {
1055 None
1056 },
 // A segment size is only reported when the batch holds more than one datagram.
1057 segment_size: match num_datagrams {
1058 1 => None,
1059 _ => Some(segment_size),
1060 },
1061 src_ip: self.local_ip,
1062 })
1063 }
1064
1065 fn send_coordination_request(&mut self, now: Instant, buf: &mut Vec<u8>) -> Option<Transmit> {
1067 let should_send = self.nat_traversal.as_ref()?.should_send_punch_request();
1069 if !should_send {
1070 return None;
1071 }
1072
1073 let (round, target_addrs, coordinator_addr) = {
1074 let nat_traversal = self.nat_traversal.as_ref()?;
1075 let coord = nat_traversal.coordination.as_ref()?;
1076 let addrs: Vec<_> = coord.punch_targets.iter().map(|t| t.remote_addr).collect();
1077 (coord.round, addrs, self.path.remote) };
1079
1080 if target_addrs.is_empty() {
1081 return None;
1082 }
1083
1084 debug_assert_eq!(
1085 self.highest_space,
1086 SpaceId::Data,
1087 "PUNCH_ME_NOW queued without 1-RTT keys"
1088 );
1089
1090 buf.reserve(MIN_INITIAL_SIZE as usize);
1091 let buf_capacity = buf.capacity();
1092
1093 let mut builder = PacketBuilder::new(
1094 now,
1095 SpaceId::Data,
1096 self.rem_cids.active(),
1097 buf,
1098 buf_capacity,
1099 0,
1100 false,
1101 self,
1102 )?;
1103
1104 trace!("sending PUNCH_ME_NOW round {} with {} targets", round, target_addrs.len());
1105
1106 buf.write(frame::FrameType::PUNCH_ME_NOW);
1108 buf.write(round);
1109 buf.write(target_addrs.len() as u8);
1110 for addr in target_addrs {
1111 match addr {
1112 SocketAddr::V4(v4) => {
1113 buf.write(4u8); buf.write(u32::from(*v4.ip()));
1115 buf.write(v4.port());
1116 }
1117 SocketAddr::V6(v6) => {
1118 buf.write(6u8); buf.write(*v6.ip());
1120 buf.write(v6.port());
1121 }
1122 }
1123 }
1124
1125 self.stats.frame_tx.ping += 1; builder.pad_to(MIN_INITIAL_SIZE);
1128 builder.finish_and_track(now, self, None, buf);
1129
1130 if let Some(nat_traversal) = &mut self.nat_traversal {
1132 nat_traversal.mark_punch_request_sent();
1133 }
1134
1135 Some(Transmit {
1136 destination: coordinator_addr,
1137 size: buf.len(),
1138 ecn: if self.path.sending_ecn {
1139 Some(EcnCodepoint::Ect0)
1140 } else {
1141 None
1142 },
1143 segment_size: None,
1144 src_ip: self.local_ip,
1145 })
1146 }
1147
1148 fn send_coordinated_path_challenge(&mut self, now: Instant, buf: &mut Vec<u8>) -> Option<Transmit> {
1150 if let Some(nat_traversal) = &mut self.nat_traversal {
1152 if nat_traversal.should_start_punching(now) {
1153 nat_traversal.start_punching_phase(now);
1154 }
1155 }
1156
1157 let (target_addr, challenge) = {
1159 let nat_traversal = self.nat_traversal.as_ref()?;
1160 match nat_traversal.get_coordination_phase() {
1161 Some(CoordinationPhase::Punching) => {
1162 let targets = nat_traversal.get_punch_targets_from_coordination()?;
1163 if targets.is_empty() {
1164 return None;
1165 }
1166 let target = &targets[0];
1168 (target.remote_addr, target.challenge)
1169 }
1170 _ => return None,
1171 }
1172 };
1173
1174 debug_assert_eq!(
1175 self.highest_space,
1176 SpaceId::Data,
1177 "PATH_CHALLENGE queued without 1-RTT keys"
1178 );
1179
1180 buf.reserve(MIN_INITIAL_SIZE as usize);
1181 let buf_capacity = buf.capacity();
1182
1183 let mut builder = PacketBuilder::new(
1184 now,
1185 SpaceId::Data,
1186 self.rem_cids.active(),
1187 buf,
1188 buf_capacity,
1189 0,
1190 false,
1191 self,
1192 )?;
1193
1194 trace!("sending coordinated PATH_CHALLENGE {:08x} to {}", challenge, target_addr);
1195 buf.write(frame::FrameType::PATH_CHALLENGE);
1196 buf.write(challenge);
1197 self.stats.frame_tx.path_challenge += 1;
1198
1199 builder.pad_to(MIN_INITIAL_SIZE);
1200 builder.finish_and_track(now, self, None, buf);
1201
1202 if let Some(nat_traversal) = &mut self.nat_traversal {
1204 nat_traversal.mark_coordination_validating();
1205 }
1206
1207 Some(Transmit {
1208 destination: target_addr,
1209 size: buf.len(),
1210 ecn: if self.path.sending_ecn {
1211 Some(EcnCodepoint::Ect0)
1212 } else {
1213 None
1214 },
1215 segment_size: None,
1216 src_ip: self.local_ip,
1217 })
1218 }
1219
1220 fn send_nat_traversal_challenge(&mut self, now: Instant, buf: &mut Vec<u8>) -> Option<Transmit> {
1222 if let Some(request) = self.send_coordination_request(now, buf) {
1224 return Some(request);
1225 }
1226
1227 if let Some(punch) = self.send_coordinated_path_challenge(now, buf) {
1229 return Some(punch);
1230 }
1231
1232 let (remote_addr, remote_sequence) = {
1234 let nat_traversal = self.nat_traversal.as_ref()?;
1235 let candidates = nat_traversal.get_validation_candidates();
1236 if candidates.is_empty() {
1237 return None;
1238 }
1239 let (sequence, candidate) = candidates[0];
1241 (candidate.address, sequence)
1242 };
1243
1244 let challenge = self.rng.gen::<u64>();
1245
1246 if let Err(e) = self.nat_traversal.as_mut()?.start_validation(remote_sequence, challenge, now) {
1248 warn!("Failed to start NAT traversal validation: {}", e);
1249 return None;
1250 }
1251
1252 debug_assert_eq!(
1253 self.highest_space,
1254 SpaceId::Data,
1255 "PATH_CHALLENGE queued without 1-RTT keys"
1256 );
1257
1258 buf.reserve(MIN_INITIAL_SIZE as usize);
1259 let buf_capacity = buf.capacity();
1260
1261 let mut builder = PacketBuilder::new(
1263 now,
1264 SpaceId::Data,
1265 self.rem_cids.active(),
1266 buf,
1267 buf_capacity,
1268 0,
1269 false,
1270 self,
1271 )?;
1272
1273 trace!("sending PATH_CHALLENGE {:08x} to NAT candidate {}", challenge, remote_addr);
1274 buf.write(frame::FrameType::PATH_CHALLENGE);
1275 buf.write(challenge);
1276 self.stats.frame_tx.path_challenge += 1;
1277
1278 builder.pad_to(MIN_INITIAL_SIZE);
1280
1281 builder.finish_and_track(now, self, None, buf);
1282
1283 Some(Transmit {
1284 destination: remote_addr,
1285 size: buf.len(),
1286 ecn: if self.path.sending_ecn {
1287 Some(EcnCodepoint::Ect0)
1288 } else {
1289 None
1290 },
1291 segment_size: None,
1292 src_ip: self.local_ip,
1293 })
1294 }
1295
1296 fn send_path_challenge(&mut self, now: Instant, buf: &mut Vec<u8>) -> Option<Transmit> {
1298 let (prev_cid, prev_path) = self.prev_path.as_mut()?;
1299 if !prev_path.challenge_pending {
1300 return None;
1301 }
1302 prev_path.challenge_pending = false;
1303 let token = prev_path
1304 .challenge
1305 .expect("previous path challenge pending without token");
1306 let destination = prev_path.remote;
1307 debug_assert_eq!(
1308 self.highest_space,
1309 SpaceId::Data,
1310 "PATH_CHALLENGE queued without 1-RTT keys"
1311 );
1312 buf.reserve(MIN_INITIAL_SIZE as usize);
1313
1314 let buf_capacity = buf.capacity();
1315
1316 let mut builder = PacketBuilder::new(
1322 now,
1323 SpaceId::Data,
1324 *prev_cid,
1325 buf,
1326 buf_capacity,
1327 0,
1328 false,
1329 self,
1330 )?;
1331 trace!("validating previous path with PATH_CHALLENGE {:08x}", token);
1332 buf.write(frame::FrameType::PATH_CHALLENGE);
1333 buf.write(token);
1334 self.stats.frame_tx.path_challenge += 1;
1335
1336 builder.pad_to(MIN_INITIAL_SIZE);
1341
1342 builder.finish(self, buf);
1343 self.stats.udp_tx.on_sent(1, buf.len());
1344
1345 Some(Transmit {
1346 destination,
1347 size: buf.len(),
1348 ecn: None,
1349 segment_size: None,
1350 src_ip: self.local_ip,
1351 })
1352 }
1353
1354 fn space_can_send(&self, space_id: SpaceId, frame_space_1rtt: usize) -> SendableFrames {
1356 if self.spaces[space_id].crypto.is_none()
1357 && (space_id != SpaceId::Data
1358 || self.zero_rtt_crypto.is_none()
1359 || self.side.is_server())
1360 {
1361 return SendableFrames::empty();
1363 }
1364 let mut can_send = self.spaces[space_id].can_send(&self.streams);
1365 if space_id == SpaceId::Data {
1366 can_send.other |= self.can_send_1rtt(frame_space_1rtt);
1367 }
1368 can_send
1369 }
1370
1371 pub fn handle_event(&mut self, event: ConnectionEvent) {
1377 use ConnectionEventInner::*;
1378 match event.0 {
1379 Datagram(DatagramConnectionEvent {
1380 now,
1381 remote,
1382 ecn,
1383 first_decode,
1384 remaining,
1385 }) => {
1386 if remote != self.path.remote && !self.side.remote_may_migrate() {
1390 trace!("discarding packet from unrecognized peer {}", remote);
1391 return;
1392 }
1393
1394 let was_anti_amplification_blocked = self.path.anti_amplification_blocked(1);
1395
1396 self.stats.udp_rx.datagrams += 1;
1397 self.stats.udp_rx.bytes += first_decode.len() as u64;
1398 let data_len = first_decode.len();
1399
1400 self.handle_decode(now, remote, ecn, first_decode);
1401 self.path.total_recvd = self.path.total_recvd.saturating_add(data_len as u64);
1406
1407 if let Some(data) = remaining {
1408 self.stats.udp_rx.bytes += data.len() as u64;
1409 self.handle_coalesced(now, remote, ecn, data);
1410 }
1411
1412 #[cfg(feature = "__qlog")]
1413 self.emit_qlog_recovery_metrics(now);
1414
1415 if was_anti_amplification_blocked {
1416 self.set_loss_detection_timer(now);
1420 }
1421 }
1422 NewIdentifiers(ids, now) => {
1423 self.local_cid_state.new_cids(&ids, now);
1424 ids.into_iter().rev().for_each(|frame| {
1425 self.spaces[SpaceId::Data].pending.new_cids.push(frame);
1426 });
1427 if self
1429 .timers
1430 .get(Timer::PushNewCid)
1431 .map_or(true, |x| x <= now)
1432 {
1433 self.reset_cid_retirement();
1434 }
1435 }
1436 }
1437 }
1438
1439 pub fn handle_timeout(&mut self, now: Instant) {
1449 for &timer in &Timer::VALUES {
1450 if !self.timers.is_expired(timer, now) {
1451 continue;
1452 }
1453 self.timers.stop(timer);
1454 trace!(timer = ?timer, "timeout");
1455 match timer {
1456 Timer::Close => {
1457 self.state = State::Drained;
1458 self.endpoint_events.push_back(EndpointEventInner::Drained);
1459 }
1460 Timer::Idle => {
1461 self.kill(ConnectionError::TimedOut);
1462 }
1463 Timer::KeepAlive => {
1464 trace!("sending keep-alive");
1465 self.ping();
1466 }
1467 Timer::LossDetection => {
1468 self.on_loss_detection_timeout(now);
1469
1470 #[cfg(feature = "__qlog")]
1471 self.emit_qlog_recovery_metrics(now);
1472 }
1473 Timer::KeyDiscard => {
1474 self.zero_rtt_crypto = None;
1475 self.prev_crypto = None;
1476 }
1477 Timer::PathValidation => {
1478 debug!("path validation failed");
1479 if let Some((_, prev)) = self.prev_path.take() {
1480 self.path = prev;
1481 }
1482 self.path.challenge = None;
1483 self.path.challenge_pending = false;
1484 }
1485 Timer::Pacing => trace!("pacing timer expired"),
1486 Timer::NatTraversal => {
1487 self.handle_nat_traversal_timeout(now);
1488 }
1489 Timer::PushNewCid => {
1490 let num_new_cid = self.local_cid_state.on_cid_timeout().into();
1492 if !self.state.is_closed() {
1493 trace!(
1494 "push a new cid to peer RETIRE_PRIOR_TO field {}",
1495 self.local_cid_state.retire_prior_to()
1496 );
1497 self.endpoint_events
1498 .push_back(EndpointEventInner::NeedIdentifiers(now, num_new_cid));
1499 }
1500 }
1501 Timer::MaxAckDelay => {
1502 trace!("max ack delay reached");
1503 self.spaces[SpaceId::Data]
1505 .pending_acks
1506 .on_max_ack_delay_timeout()
1507 }
1508 }
1509 }
1510 }
1511
1512 pub fn close(&mut self, now: Instant, error_code: VarInt, reason: Bytes) {
1524 self.close_inner(
1525 now,
1526 Close::Application(frame::ApplicationClose { error_code, reason }),
1527 )
1528 }
1529
1530 fn close_inner(&mut self, now: Instant, reason: Close) {
1531 let was_closed = self.state.is_closed();
1532 if !was_closed {
1533 self.close_common();
1534 self.set_close_timer(now);
1535 self.close = true;
1536 self.state = State::Closed(state::Closed { reason });
1537 }
1538 }
1539
1540 pub fn datagrams(&mut self) -> Datagrams<'_> {
1542 Datagrams { conn: self }
1543 }
1544
1545 pub fn stats(&self) -> ConnectionStats {
1547 let mut stats = self.stats;
1548 stats.path.rtt = self.path.rtt.get();
1549 stats.path.cwnd = self.path.congestion.window();
1550 stats.path.current_mtu = self.path.mtud.current_mtu();
1551
1552 stats
1553 }
1554
1555 pub fn ping(&mut self) {
1559 self.spaces[self.highest_space].ping_pending = true;
1560 }
1561
1562 pub fn force_key_update(&mut self) {
1566 if !self.state.is_established() {
1567 debug!("ignoring forced key update in illegal state");
1568 return;
1569 }
1570 if self.prev_crypto.is_some() {
1571 debug!("ignoring redundant forced key update");
1574 return;
1575 }
1576 self.update_keys(None, false);
1577 }
1578
1579 #[doc(hidden)]
1581 #[deprecated]
1582 pub fn initiate_key_update(&mut self) {
1583 self.force_key_update();
1584 }
1585
1586 pub fn crypto_session(&self) -> &dyn crypto::Session {
1588 &*self.crypto
1589 }
1590
1591 pub fn is_handshaking(&self) -> bool {
1596 self.state.is_handshake()
1597 }
1598
1599 pub fn is_closed(&self) -> bool {
1607 self.state.is_closed()
1608 }
1609
1610 pub fn is_drained(&self) -> bool {
1615 self.state.is_drained()
1616 }
1617
1618 pub fn accepted_0rtt(&self) -> bool {
1622 self.accepted_0rtt
1623 }
1624
1625 pub fn has_0rtt(&self) -> bool {
1627 self.zero_rtt_enabled
1628 }
1629
1630 pub fn has_pending_retransmits(&self) -> bool {
1632 !self.spaces[SpaceId::Data].pending.is_empty(&self.streams)
1633 }
1634
1635 pub fn side(&self) -> Side {
1637 self.side.side()
1638 }
1639
1640 pub fn remote_address(&self) -> SocketAddr {
1642 self.path.remote
1643 }
1644
1645 pub fn local_ip(&self) -> Option<IpAddr> {
1655 self.local_ip
1656 }
1657
1658 pub fn rtt(&self) -> Duration {
1660 self.path.rtt.get()
1661 }
1662
1663 pub fn congestion_state(&self) -> &dyn Controller {
1665 self.path.congestion.as_ref()
1666 }
1667
1668 pub fn path_changed(&mut self, now: Instant) {
1679 self.path.reset(now, &self.config);
1680 }
1681
1682 pub fn set_max_concurrent_streams(&mut self, dir: Dir, count: VarInt) {
1687 self.streams.set_max_concurrent(dir, count);
1688 let pending = &mut self.spaces[SpaceId::Data].pending;
1691 self.streams.queue_max_stream_id(pending);
1692 }
1693
1694 pub fn max_concurrent_streams(&self, dir: Dir) -> u64 {
1700 self.streams.max_concurrent(dir)
1701 }
1702
1703 pub fn set_receive_window(&mut self, receive_window: VarInt) {
1705 if self.streams.set_receive_window(receive_window) {
1706 self.spaces[SpaceId::Data].pending.max_data = true;
1707 }
1708 }
1709
/// Processes an ACK frame received in packet number space `space`.
///
/// # Errors
///
/// Returns `PROTOCOL_VIOLATION` if the frame acknowledges a packet number that has not been
/// sent yet, and propagates any error from the packet number filter's ACK check.
fn on_ack_received(
    &mut self,
    now: Instant,
    space: SpaceId,
    ack: frame::Ack,
) -> Result<(), TransportError> {
    if ack.largest >= self.spaces[space].next_packet_number {
        return Err(TransportError::PROTOCOL_VIOLATION("unsent packet acked"));
    }
    // Record whether this ACK advances the largest acknowledged packet number
    let new_largest = {
        let space = &mut self.spaces[space];
        if space
            .largest_acked_packet
            .map_or(true, |pn| ack.largest > pn)
        {
            space.largest_acked_packet = Some(ack.largest);
            // The send time is only refreshed if the packet is still tracked as sent
            if let Some(info) = space.sent_packets.get(&ack.largest) {
                space.largest_acked_packet_sent = info.time_sent;
            }
            true
        } else {
            false
        }
    };

    // Collect only those acknowledged packet numbers that are still tracked in `sent_packets`
    let mut newly_acked = ArrayRangeSet::new();
    for range in ack.iter() {
        self.packet_number_filter.check_ack(space, range.clone())?;
        for (&pn, _) in self.spaces[space].sent_packets.range(range) {
            newly_acked.insert_one(pn);
        }
    }

    // Nothing new was acknowledged, so there is no state to update
    if newly_acked.is_empty() {
        return Ok(());
    }

    let mut ack_eliciting_acked = false;
    for packet in newly_acked.elts() {
        if let Some(info) = self.spaces[space].take(packet) {
            if let Some(acked) = info.largest_acked {
                // The acked packet itself carried an ACK: stop tracking pending ACKs below
                // the largest packet number it acknowledged
                self.spaces[space].pending_acks.subtract_below(acked);
            }
            ack_eliciting_acked |= info.ack_eliciting;

            // Let MTU discovery see the ack; forward any MTU change to the congestion
            // controller
            let mtu_updated = self.path.mtud.on_acked(space, packet, info.size);
            if mtu_updated {
                self.path
                    .congestion
                    .on_mtu_update(self.path.mtud.current_mtu());
            }

            self.ack_frequency.on_acked(packet);

            self.on_packet_acked(now, packet, info);
        }
    }

    self.path.congestion.on_end_acks(
        now,
        self.path.in_flight.bytes,
        self.app_limited,
        self.spaces[space].largest_acked_packet,
    );

    // Take an RTT sample only when the largest acked advanced and at least one newly acked
    // packet was ack-eliciting
    if new_largest && ack_eliciting_acked {
        // Outside the Data space the ack delay is treated as zero; in the Data space the
        // peer-reported delay is capped at `peer_max_ack_delay`
        let ack_delay = if space != SpaceId::Data {
            Duration::from_micros(0)
        } else {
            cmp::min(
                self.ack_frequency.peer_max_ack_delay,
                Duration::from_micros(ack.delay << self.peer_params.ack_delay_exponent.0),
            )
        };
        let rtt = instant_saturating_sub(now, self.spaces[space].largest_acked_packet_sent);
        self.path.rtt.update(ack_delay, rtt);
        // Remember the first packet number that will be sent after the first RTT sample
        if self.path.first_packet_after_rtt_sample.is_none() {
            self.path.first_packet_after_rtt_sample =
                Some((space, self.spaces[space].next_packet_number));
        }
    }

    // Runs before `pto_count` is reset below
    self.detect_lost_packets(now, space, true);

    if self.peer_completed_address_validation() {
        self.pto_count = 0;
    }

    if self.path.sending_ecn {
        if let Some(ecn) = ack.ecn {
            // ECN counts are only examined for ACKs that advanced the largest acked packet
            if new_largest {
                let sent = self.spaces[space].largest_acked_packet_sent;
                self.process_ecn(now, space, newly_acked.len() as u64, ecn, sent);
            }
        } else {
            // The peer's ACK carried no ECN counts: stop sending ECN on this path
            debug!("ECN not acknowledged by peer");
            self.path.sending_ecn = false;
        }
    }

    self.set_loss_detection_timer(now);
    Ok(())
}
1831
1832 fn process_ecn(
1834 &mut self,
1835 now: Instant,
1836 space: SpaceId,
1837 newly_acked: u64,
1838 ecn: frame::EcnCounts,
1839 largest_sent_time: Instant,
1840 ) {
1841 match self.spaces[space].detect_ecn(newly_acked, ecn) {
1842 Err(e) => {
1843 debug!("halting ECN due to verification failure: {}", e);
1844 self.path.sending_ecn = false;
1845 self.spaces[space].ecn_feedback = frame::EcnCounts::ZERO;
1848 }
1849 Ok(false) => {}
1850 Ok(true) => {
1851 self.stats.path.congestion_events += 1;
1852 self.path
1853 .congestion
1854 .on_congestion_event(now, largest_sent_time, false, 0);
1855 }
1856 }
1857 }
1858
1859 fn on_packet_acked(&mut self, now: Instant, pn: u64, info: SentPacket) {
1862 self.remove_in_flight(pn, &info);
1863 if info.ack_eliciting && self.path.challenge.is_none() {
1864 self.path.congestion.on_ack(
1867 now,
1868 info.time_sent,
1869 info.size.into(),
1870 self.app_limited,
1871 &self.path.rtt,
1872 );
1873 }
1874
1875 if let Some(retransmits) = info.retransmits.get() {
1877 for (id, _) in retransmits.reset_stream.iter() {
1878 self.streams.reset_acked(*id);
1879 }
1880 }
1881
1882 for frame in info.stream_frames {
1883 self.streams.received_ack_of(frame);
1884 }
1885 }
1886
1887 fn set_key_discard_timer(&mut self, now: Instant, space: SpaceId) {
1888 let start = if self.zero_rtt_crypto.is_some() {
1889 now
1890 } else {
1891 self.prev_crypto
1892 .as_ref()
1893 .expect("no previous keys")
1894 .end_packet
1895 .as_ref()
1896 .expect("update not acknowledged yet")
1897 .1
1898 };
1899 self.timers
1900 .set(Timer::KeyDiscard, start + self.pto(space) * 3);
1901 }
1902
1903 fn on_loss_detection_timeout(&mut self, now: Instant) {
1904 if let Some((_, pn_space)) = self.loss_time_and_space() {
1905 self.detect_lost_packets(now, pn_space, false);
1907 self.set_loss_detection_timer(now);
1908 return;
1909 }
1910
1911 let (_, space) = match self.pto_time_and_space(now) {
1912 Some(x) => x,
1913 None => {
1914 error!("PTO expired while unset");
1915 return;
1916 }
1917 };
1918 trace!(
1919 in_flight = self.path.in_flight.bytes,
1920 count = self.pto_count,
1921 ?space,
1922 "PTO fired"
1923 );
1924
1925 let count = match self.path.in_flight.ack_eliciting {
1926 0 => {
1929 debug_assert!(!self.peer_completed_address_validation());
1930 1
1931 }
1932 _ => 2,
1934 };
1935 self.spaces[space].loss_probes = self.spaces[space].loss_probes.saturating_add(count);
1936 self.pto_count = self.pto_count.saturating_add(1);
1937 self.set_loss_detection_timer(now);
1938 }
1939
/// Detects lost packets in `pn_space` and applies the consequences.
///
/// Packets below the largest acknowledged packet number are declared lost when they were
/// sent at least `loss_delay` ago or are at least `packet_threshold` packet numbers behind
/// the largest acknowledged one. For the remaining packets, the space's `loss_time` is set
/// to the earliest time at which one of them would cross the time threshold.
///
/// `due_to_ack` gates persistent congestion tracking: it is only evaluated when `true`.
///
/// # Panics
///
/// Panics if `pn_space` has no largest acknowledged packet, or if `now - loss_delay` cannot
/// be represented as an `Instant`.
fn detect_lost_packets(&mut self, now: Instant, pn_space: SpaceId, due_to_ack: bool) {
    let mut lost_packets = Vec::<u64>::new();
    let mut lost_mtu_probe = None;
    let in_flight_mtu_probe = self.path.mtud.in_flight_mtu_probe();
    let rtt = self.path.rtt.conservative();
    // Time threshold, never smaller than the timer granularity
    let loss_delay = cmp::max(rtt.mul_f32(self.config.time_threshold), TIMER_GRANULARITY);

    // Packets sent at or before this time are deemed lost
    let lost_send_time = now.checked_sub(loss_delay).unwrap();
    let largest_acked_packet = self.spaces[pn_space].largest_acked_packet.unwrap();
    let packet_threshold = self.config.packet_threshold as u64;
    let mut size_of_lost_packets = 0u64;

    // The persistent congestion period is always derived from the Data space PTO,
    // regardless of `pn_space`
    let congestion_period =
        self.pto(SpaceId::Data) * self.config.persistent_congestion_threshold;
    let mut persistent_congestion_start: Option<Instant> = None;
    let mut prev_packet = None;
    let mut in_persistent_congestion = false;

    let space = &mut self.spaces[pn_space];
    space.loss_time = None;

    for (&packet, info) in space.sent_packets.range(0..largest_acked_packet) {
        // A gap in tracked packet numbers interrupts the current run of lost packets
        if prev_packet != Some(packet.wrapping_sub(1)) {
            persistent_congestion_start = None;
        }

        if info.time_sent <= lost_send_time || largest_acked_packet >= packet + packet_threshold
        {
            if Some(packet) == in_flight_mtu_probe {
                // A lost MTU probe is kept out of `lost_packets` and handled separately at
                // the end of this function
                lost_mtu_probe = in_flight_mtu_probe;
            } else {
                lost_packets.push(packet);
                size_of_lost_packets += info.size as u64;
                if info.ack_eliciting && due_to_ack {
                    match persistent_congestion_start {
                        // This lost packet was sent more than `congestion_period` after the
                        // start of the current run
                        Some(start) if info.time_sent - start > congestion_period => {
                            in_persistent_congestion = true;
                        }
                        // A run may only start at a packet ordered after
                        // `first_packet_after_rtt_sample`
                        None if self
                            .path
                            .first_packet_after_rtt_sample
                            .is_some_and(|x| x < (pn_space, packet)) =>
                        {
                            persistent_congestion_start = Some(info.time_sent);
                        }
                        _ => {}
                    }
                }
            }
        } else {
            // Not lost yet: remember the earliest time at which it would be
            let next_loss_time = info.time_sent + loss_delay;
            space.loss_time = Some(
                space
                    .loss_time
                    .map_or(next_loss_time, |x| cmp::min(x, next_loss_time)),
            );
            persistent_congestion_start = None;
        }

        prev_packet = Some(packet);
    }

    if let Some(largest_lost) = lost_packets.last().cloned() {
        let old_bytes_in_flight = self.path.in_flight.bytes;
        let largest_lost_sent = self.spaces[pn_space].sent_packets[&largest_lost].time_sent;
        self.lost_packets += lost_packets.len() as u64;
        self.stats.path.lost_packets += lost_packets.len() as u64;
        self.stats.path.lost_bytes += size_of_lost_packets;
        trace!(
            "packets lost: {:?}, bytes lost: {}",
            lost_packets, size_of_lost_packets
        );

        for &packet in &lost_packets {
            // Every entry of `lost_packets` was taken from `sent_packets` in the loop above
            let info = self.spaces[pn_space].take(packet).unwrap();
            self.remove_in_flight(packet, &info);
            // Queue the lost packet's contents for retransmission
            for frame in info.stream_frames {
                self.streams.retransmit(frame);
            }
            self.spaces[pn_space].pending |= info.retransmits;
            self.path.mtud.on_non_probe_lost(packet, info.size);
        }

        if self.path.mtud.black_hole_detected(now) {
            self.stats.path.black_holes_detected += 1;
            self.path
                .congestion
                .on_mtu_update(self.path.mtud.current_mtu());
            // Drop queued datagrams that exceed the current maximum datagram size
            if let Some(max_datagram_size) = self.datagrams().max_size() {
                self.datagrams.drop_oversized(max_datagram_size);
            }
        }

        // Only report a congestion event if removing the lost packets changed the number
        // of bytes in flight
        let lost_ack_eliciting = old_bytes_in_flight != self.path.in_flight.bytes;

        if lost_ack_eliciting {
            self.stats.path.congestion_events += 1;
            self.path.congestion.on_congestion_event(
                now,
                largest_lost_sent,
                in_persistent_congestion,
                size_of_lost_packets,
            );
        }
    }

    if let Some(packet) = lost_mtu_probe {
        // The probe was left out of `lost_packets`, so it was not taken by the loop above
        let info = self.spaces[SpaceId::Data].take(packet).unwrap();
        self.remove_in_flight(packet, &info);
        self.path.mtud.on_probe_lost();
        self.stats.path.lost_plpmtud_probes += 1;
    }
}
2066
2067 fn loss_time_and_space(&self) -> Option<(Instant, SpaceId)> {
2068 SpaceId::iter()
2069 .filter_map(|id| Some((self.spaces[id].loss_time?, id)))
2070 .min_by_key(|&(time, _)| time)
2071 }
2072
2073 fn pto_time_and_space(&self, now: Instant) -> Option<(Instant, SpaceId)> {
2074 let backoff = 2u32.pow(self.pto_count.min(MAX_BACKOFF_EXPONENT));
2075 let mut duration = self.path.rtt.pto_base() * backoff;
2076
2077 if self.path.in_flight.ack_eliciting == 0 {
2078 debug_assert!(!self.peer_completed_address_validation());
2079 let space = match self.highest_space {
2080 SpaceId::Handshake => SpaceId::Handshake,
2081 _ => SpaceId::Initial,
2082 };
2083 return Some((now + duration, space));
2084 }
2085
2086 let mut result = None;
2087 for space in SpaceId::iter() {
2088 if self.spaces[space].in_flight == 0 {
2089 continue;
2090 }
2091 if space == SpaceId::Data {
2092 if self.is_handshaking() {
2094 return result;
2095 }
2096 duration += self.ack_frequency.max_ack_delay_for_pto() * backoff;
2098 }
2099 let last_ack_eliciting = match self.spaces[space].time_of_last_ack_eliciting_packet {
2100 Some(time) => time,
2101 None => continue,
2102 };
2103 let pto = last_ack_eliciting + duration;
2104 if result.map_or(true, |(earliest_pto, _)| pto < earliest_pto) {
2105 result = Some((pto, space));
2106 }
2107 }
2108 result
2109 }
2110
2111 fn peer_completed_address_validation(&self) -> bool {
2112 if self.side.is_server() || self.state.is_closed() {
2113 return true;
2114 }
2115 self.spaces[SpaceId::Handshake]
2118 .largest_acked_packet
2119 .is_some()
2120 || self.spaces[SpaceId::Data].largest_acked_packet.is_some()
2121 || (self.spaces[SpaceId::Data].crypto.is_some()
2122 && self.spaces[SpaceId::Handshake].crypto.is_none())
2123 }
2124
2125 fn set_loss_detection_timer(&mut self, now: Instant) {
2126 if self.state.is_closed() {
2127 return;
2131 }
2132
2133 if let Some((loss_time, _)) = self.loss_time_and_space() {
2134 self.timers.set(Timer::LossDetection, loss_time);
2136 return;
2137 }
2138
2139 if self.path.anti_amplification_blocked(1) {
2140 self.timers.stop(Timer::LossDetection);
2142 return;
2143 }
2144
2145 if self.path.in_flight.ack_eliciting == 0 && self.peer_completed_address_validation() {
2146 self.timers.stop(Timer::LossDetection);
2149 return;
2150 }
2151
2152 if let Some((timeout, _)) = self.pto_time_and_space(now) {
2155 self.timers.set(Timer::LossDetection, timeout);
2156 } else {
2157 self.timers.stop(Timer::LossDetection);
2158 }
2159 }
2160
2161 fn pto(&self, space: SpaceId) -> Duration {
2163 let max_ack_delay = match space {
2164 SpaceId::Initial | SpaceId::Handshake => Duration::ZERO,
2165 SpaceId::Data => self.ack_frequency.max_ack_delay_for_pto(),
2166 };
2167 self.path.rtt.pto_base() + max_ack_delay
2168 }
2169
2170 fn on_packet_authenticated(
2171 &mut self,
2172 now: Instant,
2173 space_id: SpaceId,
2174 ecn: Option<EcnCodepoint>,
2175 packet: Option<u64>,
2176 spin: bool,
2177 is_1rtt: bool,
2178 ) {
2179 self.total_authed_packets += 1;
2180 self.reset_keep_alive(now);
2181 self.reset_idle_timeout(now, space_id);
2182 self.permit_idle_reset = true;
2183 self.receiving_ecn |= ecn.is_some();
2184 if let Some(x) = ecn {
2185 let space = &mut self.spaces[space_id];
2186 space.ecn_counters += x;
2187
2188 if x.is_ce() {
2189 space.pending_acks.set_immediate_ack_required();
2190 }
2191 }
2192
2193 let packet = match packet {
2194 Some(x) => x,
2195 None => return,
2196 };
2197 if self.side.is_server() {
2198 if self.spaces[SpaceId::Initial].crypto.is_some() && space_id == SpaceId::Handshake {
2199 self.discard_space(now, SpaceId::Initial);
2201 }
2202 if self.zero_rtt_crypto.is_some() && is_1rtt {
2203 self.set_key_discard_timer(now, space_id)
2205 }
2206 }
2207 let space = &mut self.spaces[space_id];
2208 space.pending_acks.insert_one(packet, now);
2209 if packet >= space.rx_packet {
2210 space.rx_packet = packet;
2211 self.spin = self.side.is_client() ^ spin;
2213 }
2214 }
2215
2216 fn reset_idle_timeout(&mut self, now: Instant, space: SpaceId) {
2217 let timeout = match self.idle_timeout {
2218 None => return,
2219 Some(dur) => dur,
2220 };
2221 if self.state.is_closed() {
2222 self.timers.stop(Timer::Idle);
2223 return;
2224 }
2225 let dt = cmp::max(timeout, 3 * self.pto(space));
2226 self.timers.set(Timer::Idle, now + dt);
2227 }
2228
2229 fn reset_keep_alive(&mut self, now: Instant) {
2230 let interval = match self.config.keep_alive_interval {
2231 Some(x) if self.state.is_established() => x,
2232 _ => return,
2233 };
2234 self.timers.set(Timer::KeepAlive, now + interval);
2235 }
2236
2237 fn reset_cid_retirement(&mut self) {
2238 if let Some(t) = self.local_cid_state.next_timeout() {
2239 self.timers.set(Timer::PushNewCid, t);
2240 }
2241 }
2242
2243 pub(crate) fn handle_first_packet(
2248 &mut self,
2249 now: Instant,
2250 remote: SocketAddr,
2251 ecn: Option<EcnCodepoint>,
2252 packet_number: u64,
2253 packet: InitialPacket,
2254 remaining: Option<BytesMut>,
2255 ) -> Result<(), ConnectionError> {
2256 let span = trace_span!("first recv");
2257 let _guard = span.enter();
2258 debug_assert!(self.side.is_server());
2259 let len = packet.header_data.len() + packet.payload.len();
2260 self.path.total_recvd = len as u64;
2261
2262 match self.state {
2263 State::Handshake(ref mut state) => {
2264 state.expected_token = packet.header.token.clone();
2265 }
2266 _ => unreachable!("first packet must be delivered in Handshake state"),
2267 }
2268
2269 self.on_packet_authenticated(
2270 now,
2271 SpaceId::Initial,
2272 ecn,
2273 Some(packet_number),
2274 false,
2275 false,
2276 );
2277
2278 self.process_decrypted_packet(now, remote, Some(packet_number), packet.into())?;
2279 if let Some(data) = remaining {
2280 self.handle_coalesced(now, remote, ecn, data);
2281 }
2282
2283 #[cfg(feature = "__qlog")]
2284 self.emit_qlog_recovery_metrics(now);
2285
2286 Ok(())
2287 }
2288
2289 fn init_0rtt(&mut self) {
2290 let (header, packet) = match self.crypto.early_crypto() {
2291 Some(x) => x,
2292 None => return,
2293 };
2294 if self.side.is_client() {
2295 match self.crypto.transport_parameters() {
2296 Ok(params) => {
2297 let params = params
2298 .expect("crypto layer didn't supply transport parameters with ticket");
2299 let params = TransportParameters {
2301 initial_src_cid: None,
2302 original_dst_cid: None,
2303 preferred_address: None,
2304 retry_src_cid: None,
2305 stateless_reset_token: None,
2306 min_ack_delay: None,
2307 ack_delay_exponent: TransportParameters::default().ack_delay_exponent,
2308 max_ack_delay: TransportParameters::default().max_ack_delay,
2309 ..params
2310 };
2311 self.set_peer_params(params);
2312 }
2313 Err(e) => {
2314 error!("session ticket has malformed transport parameters: {}", e);
2315 return;
2316 }
2317 }
2318 }
2319 trace!("0-RTT enabled");
2320 self.zero_rtt_enabled = true;
2321 self.zero_rtt_crypto = Some(ZeroRttCrypto { header, packet });
2322 }
2323
2324 fn read_crypto(
2325 &mut self,
2326 space: SpaceId,
2327 crypto: &frame::Crypto,
2328 payload_len: usize,
2329 ) -> Result<(), TransportError> {
2330 let expected = if !self.state.is_handshake() {
2331 SpaceId::Data
2332 } else if self.highest_space == SpaceId::Initial {
2333 SpaceId::Initial
2334 } else {
2335 SpaceId::Handshake
2338 };
2339 debug_assert!(space <= expected, "received out-of-order CRYPTO data");
2343
2344 let end = crypto.offset + crypto.data.len() as u64;
2345 if space < expected && end > self.spaces[space].crypto_stream.bytes_read() {
2346 warn!(
2347 "received new {:?} CRYPTO data when expecting {:?}",
2348 space, expected
2349 );
2350 return Err(TransportError::PROTOCOL_VIOLATION(
2351 "new data at unexpected encryption level",
2352 ));
2353 }
2354
2355 let space = &mut self.spaces[space];
2356 let max = end.saturating_sub(space.crypto_stream.bytes_read());
2357 if max > self.config.crypto_buffer_size as u64 {
2358 return Err(TransportError::CRYPTO_BUFFER_EXCEEDED(""));
2359 }
2360
2361 space
2362 .crypto_stream
2363 .insert(crypto.offset, crypto.data.clone(), payload_len);
2364 while let Some(chunk) = space.crypto_stream.read(usize::MAX, true) {
2365 trace!("consumed {} CRYPTO bytes", chunk.bytes.len());
2366 if self.crypto.read_handshake(&chunk.bytes)? {
2367 self.events.push_back(Event::HandshakeDataReady);
2368 }
2369 }
2370
2371 Ok(())
2372 }
2373
/// Drains outgoing handshake data from the crypto session into pending CRYPTO frames.
///
/// Each loop iteration asks the crypto session for handshake bytes and queues them as a
/// CRYPTO frame in the space that was the highest one when the iteration began. New keys
/// returned along the way upgrade the connection to the next packet number space.
///
/// # Panics
///
/// Panics if the crypto session returns new keys while the highest space is already Data.
fn write_crypto(&mut self) {
    loop {
        // Data written in this iteration belongs to the space that was current before any
        // key upgrade below
        let space = self.highest_space;
        let mut outgoing = Vec::new();
        if let Some(crypto) = self.crypto.write_handshake(&mut outgoing) {
            match space {
                SpaceId::Initial => {
                    self.upgrade_crypto(SpaceId::Handshake, crypto);
                }
                SpaceId::Handshake => {
                    self.upgrade_crypto(SpaceId::Data, crypto);
                }
                _ => unreachable!("got updated secrets during 1-RTT"),
            }
        }
        if outgoing.is_empty() {
            if space == self.highest_space {
                // No data and no key upgrade: nothing more to write for now
                break;
            } else {
                // Keys were upgraded: try again for the new highest space
                continue;
            }
        }
        let offset = self.spaces[space].crypto_offset;
        let outgoing = Bytes::from(outgoing);
        if let State::Handshake(ref mut state) = self.state {
            // The client keeps a copy of the Initial data written at offset 0
            if space == SpaceId::Initial && offset == 0 && self.side.is_client() {
                state.client_hello = Some(outgoing.clone());
            }
        }
        self.spaces[space].crypto_offset += outgoing.len() as u64;
        trace!("wrote {} {:?} CRYPTO bytes", outgoing.len(), space);
        self.spaces[space].pending.crypto.push_back(frame::Crypto {
            offset,
            data: outgoing,
        });
    }
}
2412
2413 fn upgrade_crypto(&mut self, space: SpaceId, crypto: Keys) {
2415 debug_assert!(
2416 self.spaces[space].crypto.is_none(),
2417 "already reached packet space {space:?}"
2418 );
2419 trace!("{:?} keys ready", space);
2420 if space == SpaceId::Data {
2421 self.next_crypto = Some(
2423 self.crypto
2424 .next_1rtt_keys()
2425 .expect("handshake should be complete"),
2426 );
2427 }
2428
2429 self.spaces[space].crypto = Some(crypto);
2430 debug_assert!(space as usize > self.highest_space as usize);
2431 self.highest_space = space;
2432 if space == SpaceId::Data && self.side.is_client() {
2433 self.zero_rtt_crypto = None;
2435 }
2436 }
2437
2438 fn discard_space(&mut self, now: Instant, space_id: SpaceId) {
2439 debug_assert!(space_id != SpaceId::Data);
2440 trace!("discarding {:?} keys", space_id);
2441 if space_id == SpaceId::Initial {
2442 if let ConnectionSide::Client { token, .. } = &mut self.side {
2444 *token = Bytes::new();
2445 }
2446 }
2447 let space = &mut self.spaces[space_id];
2448 space.crypto = None;
2449 space.time_of_last_ack_eliciting_packet = None;
2450 space.loss_time = None;
2451 space.in_flight = 0;
2452 let sent_packets = mem::take(&mut space.sent_packets);
2453 for (pn, packet) in sent_packets.into_iter() {
2454 self.remove_in_flight(pn, &packet);
2455 }
2456 self.set_loss_detection_timer(now)
2457 }
2458
2459 fn handle_coalesced(
2460 &mut self,
2461 now: Instant,
2462 remote: SocketAddr,
2463 ecn: Option<EcnCodepoint>,
2464 data: BytesMut,
2465 ) {
2466 self.path.total_recvd = self.path.total_recvd.saturating_add(data.len() as u64);
2467 let mut remaining = Some(data);
2468 while let Some(data) = remaining {
2469 match PartialDecode::new(
2470 data,
2471 &FixedLengthConnectionIdParser::new(self.local_cid_state.cid_len()),
2472 &[self.version],
2473 self.endpoint_config.grease_quic_bit,
2474 ) {
2475 Ok((partial_decode, rest)) => {
2476 remaining = rest;
2477 self.handle_decode(now, remote, ecn, partial_decode);
2478 }
2479 Err(e) => {
2480 trace!("malformed header: {}", e);
2481 return;
2482 }
2483 }
2484 }
2485 }
2486
2487 fn handle_decode(
2488 &mut self,
2489 now: Instant,
2490 remote: SocketAddr,
2491 ecn: Option<EcnCodepoint>,
2492 partial_decode: PartialDecode,
2493 ) {
2494 if let Some(decoded) = packet_crypto::unprotect_header(
2495 partial_decode,
2496 &self.spaces,
2497 self.zero_rtt_crypto.as_ref(),
2498 self.peer_params.stateless_reset_token,
2499 ) {
2500 self.handle_packet(now, remote, ecn, decoded.packet, decoded.stateless_reset);
2501 }
2502 }
2503
/// Decrypts and processes a single packet, then applies any resulting state transition.
///
/// * `remote`: address the datagram was received from
/// * `ecn`: ECN codepoint of the datagram, if any
/// * `packet`: the packet after header protection removal; `None` is handled as an
///   authentication failure
/// * `stateless_reset`: when `true`, the connection is reset regardless of `packet`
fn handle_packet(
    &mut self,
    now: Instant,
    remote: SocketAddr,
    ecn: Option<EcnCodepoint>,
    packet: Option<Packet>,
    stateless_reset: bool,
) {
    self.stats.udp_rx.ios += 1;
    if let Some(ref packet) = packet {
        trace!(
            "got {:?} packet ({} bytes) from {} using id {}",
            packet.header.space(),
            packet.payload.len() + packet.header_data.len(),
            remote,
            packet.header.dst_cid(),
        );
    }

    // During the handshake, only packets from the current path's remote are accepted
    if self.is_handshaking() && remote != self.path.remote {
        debug!("discarding packet with unexpected remote during handshake");
        return;
    }

    // Snapshot the state so transitions caused by this packet can be detected below
    let was_closed = self.state.is_closed();
    let was_drained = self.state.is_drained();

    let decrypted = match packet {
        None => Err(None),
        Some(mut packet) => self
            .decrypt_packet(now, &mut packet)
            .map(move |number| (packet, number)),
    };
    let result = match decrypted {
        // A stateless reset takes precedence over the decryption outcome
        _ if stateless_reset => {
            debug!("got stateless reset");
            Err(ConnectionError::Reset)
        }
        Err(Some(e)) => {
            warn!("illegal packet: {}", e);
            Err(e.into())
        }
        Err(None) => {
            debug!("failed to authenticate packet");
            self.authentication_failures += 1;
            let integrity_limit = self.spaces[self.highest_space]
                .crypto
                .as_ref()
                .unwrap()
                .packet
                .local
                .integrity_limit();
            // Only exceeding the integrity limit is an error; below it the packet is
            // silently dropped
            if self.authentication_failures > integrity_limit {
                Err(TransportError::AEAD_LIMIT_REACHED("integrity limit violated").into())
            } else {
                return;
            }
        }
        Ok((packet, number)) => {
            let span = match number {
                Some(pn) => trace_span!("recv", space = ?packet.header.space(), pn),
                None => trace_span!("recv", space = ?packet.header.space()),
            };
            let _guard = span.enter();

            // Records the packet number in the space's dedup state; the result is used as
            // the duplicate indication
            let is_duplicate = |n| self.spaces[packet.header.space()].dedup.insert(n);
            if number.is_some_and(is_duplicate) {
                debug!("discarding possible duplicate packet");
                return;
            } else if self.state.is_handshake() && packet.header.is_short() {
                trace!("dropping short packet during handshake");
                return;
            } else {
                // A server in the handshake state drops Initial packets whose token differs
                // from the expected one
                if let Header::Initial(InitialHeader { ref token, .. }) = packet.header {
                    if let State::Handshake(ref hs) = self.state {
                        if self.side.is_server() && token != &hs.expected_token {
                            warn!("discarding Initial with invalid retry token");
                            return;
                        }
                    }
                }

                if !self.state.is_closed() {
                    // Only short headers carry a spin bit
                    let spin = match packet.header {
                        Header::Short { spin, .. } => spin,
                        _ => false,
                    };
                    self.on_packet_authenticated(
                        now,
                        packet.header.space(),
                        ecn,
                        number,
                        spin,
                        packet.header.is_1rtt(),
                    );
                }

                self.process_decrypted_packet(now, remote, number, packet)
            }
        }
    };

    // Map an error from packet processing to the corresponding connection state
    if let Err(conn_err) = result {
        self.error = Some(conn_err.clone());
        self.state = match conn_err {
            ConnectionError::ApplicationClosed(reason) => State::closed(reason),
            ConnectionError::ConnectionClosed(reason) => State::closed(reason),
            ConnectionError::Reset
            | ConnectionError::TransportError(TransportError {
                code: TransportErrorCode::AEAD_LIMIT_REACHED,
                ..
            }) => State::Drained,
            ConnectionError::TimedOut => {
                unreachable!("timeouts aren't generated by packet processing");
            }
            ConnectionError::TransportError(err) => {
                debug!("closing connection due to transport error: {}", err);
                State::closed(err)
            }
            ConnectionError::VersionMismatch => State::Draining,
            ConnectionError::LocallyClosed => {
                unreachable!("LocallyClosed isn't generated by packet processing");
            }
            ConnectionError::CidsExhausted => {
                unreachable!("CidsExhausted isn't generated by packet processing");
            }
        };
    }

    // The connection became closed while handling this packet
    if !was_closed && self.state.is_closed() {
        self.close_common();
        if !self.state.is_drained() {
            self.set_close_timer(now);
        }
    }
    // The connection became drained while handling this packet
    if !was_drained && self.state.is_drained() {
        self.endpoint_events.push_back(EndpointEventInner::Drained);
        self.timers.stop(Timer::Close);
    }

    // While in the closed state, a close is only flagged for transmission if this packet
    // came from the current path's remote
    if let State::Closed(_) = self.state {
        self.close = remote == self.path.remote;
    }
}
2656
2657 fn process_decrypted_packet(
2658 &mut self,
2659 now: Instant,
2660 remote: SocketAddr,
2661 number: Option<u64>,
2662 packet: Packet,
2663 ) -> Result<(), ConnectionError> {
2664 let state = match self.state {
2665 State::Established => {
2666 match packet.header.space() {
2667 SpaceId::Data => self.process_payload(now, remote, number.unwrap(), packet)?,
2668 _ if packet.header.has_frames() => self.process_early_payload(now, packet)?,
2669 _ => {
2670 trace!("discarding unexpected pre-handshake packet");
2671 }
2672 }
2673 return Ok(());
2674 }
2675 State::Closed(_) => {
2676 for result in frame::Iter::new(packet.payload.freeze())? {
2677 let frame = match result {
2678 Ok(frame) => frame,
2679 Err(err) => {
2680 debug!("frame decoding error: {err:?}");
2681 continue;
2682 }
2683 };
2684
2685 if let Frame::Padding = frame {
2686 continue;
2687 };
2688
2689 self.stats.frame_rx.record(&frame);
2690
2691 if let Frame::Close(_) = frame {
2692 trace!("draining");
2693 self.state = State::Draining;
2694 break;
2695 }
2696 }
2697 return Ok(());
2698 }
2699 State::Draining | State::Drained => return Ok(()),
2700 State::Handshake(ref mut state) => state,
2701 };
2702
2703 match packet.header {
2704 Header::Retry {
2705 src_cid: rem_cid, ..
2706 } => {
2707 if self.side.is_server() {
2708 return Err(TransportError::PROTOCOL_VIOLATION("client sent Retry").into());
2709 }
2710
2711 if self.total_authed_packets > 1
2712 || packet.payload.len() <= 16 || !self.crypto.is_valid_retry(
2714 &self.rem_cids.active(),
2715 &packet.header_data,
2716 &packet.payload,
2717 )
2718 {
2719 trace!("discarding invalid Retry");
2720 return Ok(());
2728 }
2729
2730 trace!("retrying with CID {}", rem_cid);
2731 let client_hello = state.client_hello.take().unwrap();
2732 self.retry_src_cid = Some(rem_cid);
2733 self.rem_cids.update_initial_cid(rem_cid);
2734 self.rem_handshake_cid = rem_cid;
2735
2736 let space = &mut self.spaces[SpaceId::Initial];
2737 if let Some(info) = space.take(0) {
2738 self.on_packet_acked(now, 0, info);
2739 };
2740
2741 self.discard_space(now, SpaceId::Initial); self.spaces[SpaceId::Initial] = PacketSpace {
2743 crypto: Some(self.crypto.initial_keys(&rem_cid, self.side.side())),
2744 next_packet_number: self.spaces[SpaceId::Initial].next_packet_number,
2745 crypto_offset: client_hello.len() as u64,
2746 ..PacketSpace::new(now)
2747 };
2748 self.spaces[SpaceId::Initial]
2749 .pending
2750 .crypto
2751 .push_back(frame::Crypto {
2752 offset: 0,
2753 data: client_hello,
2754 });
2755
2756 let zero_rtt = mem::take(&mut self.spaces[SpaceId::Data].sent_packets);
2758 for (pn, info) in zero_rtt {
2759 self.remove_in_flight(pn, &info);
2760 self.spaces[SpaceId::Data].pending |= info.retransmits;
2761 }
2762 self.streams.retransmit_all_for_0rtt();
2763
2764 let token_len = packet.payload.len() - 16;
2765 let ConnectionSide::Client { ref mut token, .. } = self.side else {
2766 unreachable!("we already short-circuited if we're server");
2767 };
2768 *token = packet.payload.freeze().split_to(token_len);
2769 self.state = State::Handshake(state::Handshake {
2770 expected_token: Bytes::new(),
2771 rem_cid_set: false,
2772 client_hello: None,
2773 });
2774 Ok(())
2775 }
2776 Header::Long {
2777 ty: LongType::Handshake,
2778 src_cid: rem_cid,
2779 ..
2780 } => {
2781 if rem_cid != self.rem_handshake_cid {
2782 debug!(
2783 "discarding packet with mismatched remote CID: {} != {}",
2784 self.rem_handshake_cid, rem_cid
2785 );
2786 return Ok(());
2787 }
2788 self.on_path_validated();
2789
2790 self.process_early_payload(now, packet)?;
2791 if self.state.is_closed() {
2792 return Ok(());
2793 }
2794
2795 if self.crypto.is_handshaking() {
2796 trace!("handshake ongoing");
2797 return Ok(());
2798 }
2799
2800 if self.side.is_client() {
2801 let params =
2803 self.crypto
2804 .transport_parameters()?
2805 .ok_or_else(|| TransportError {
2806 code: TransportErrorCode::crypto(0x6d),
2807 frame: None,
2808 reason: "transport parameters missing".into(),
2809 })?;
2810
2811 if self.has_0rtt() {
2812 if !self.crypto.early_data_accepted().unwrap() {
2813 debug_assert!(self.side.is_client());
2814 debug!("0-RTT rejected");
2815 self.accepted_0rtt = false;
2816 self.streams.zero_rtt_rejected();
2817
2818 self.spaces[SpaceId::Data].pending = Retransmits::default();
2820
2821 let sent_packets =
2823 mem::take(&mut self.spaces[SpaceId::Data].sent_packets);
2824 for (pn, packet) in sent_packets {
2825 self.remove_in_flight(pn, &packet);
2826 }
2827 } else {
2828 self.accepted_0rtt = true;
2829 params.validate_resumption_from(&self.peer_params)?;
2830 }
2831 }
2832 if let Some(token) = params.stateless_reset_token {
2833 self.endpoint_events
2834 .push_back(EndpointEventInner::ResetToken(self.path.remote, token));
2835 }
2836 self.handle_peer_params(params)?;
2837 self.issue_first_cids(now);
2838 } else {
2839 self.spaces[SpaceId::Data].pending.handshake_done = true;
2841 self.discard_space(now, SpaceId::Handshake);
2842 }
2843
2844 self.events.push_back(Event::Connected);
2845 self.state = State::Established;
2846 trace!("established");
2847 Ok(())
2848 }
2849 Header::Initial(InitialHeader {
2850 src_cid: rem_cid, ..
2851 }) => {
2852 if !state.rem_cid_set {
2853 trace!("switching remote CID to {}", rem_cid);
2854 let mut state = state.clone();
2855 self.rem_cids.update_initial_cid(rem_cid);
2856 self.rem_handshake_cid = rem_cid;
2857 self.orig_rem_cid = rem_cid;
2858 state.rem_cid_set = true;
2859 self.state = State::Handshake(state);
2860 } else if rem_cid != self.rem_handshake_cid {
2861 debug!(
2862 "discarding packet with mismatched remote CID: {} != {}",
2863 self.rem_handshake_cid, rem_cid
2864 );
2865 return Ok(());
2866 }
2867
2868 let starting_space = self.highest_space;
2869 self.process_early_payload(now, packet)?;
2870
2871 if self.side.is_server()
2872 && starting_space == SpaceId::Initial
2873 && self.highest_space != SpaceId::Initial
2874 {
2875 let params =
2876 self.crypto
2877 .transport_parameters()?
2878 .ok_or_else(|| TransportError {
2879 code: TransportErrorCode::crypto(0x6d),
2880 frame: None,
2881 reason: "transport parameters missing".into(),
2882 })?;
2883 self.handle_peer_params(params)?;
2884 self.issue_first_cids(now);
2885 self.init_0rtt();
2886 }
2887 Ok(())
2888 }
2889 Header::Long {
2890 ty: LongType::ZeroRtt,
2891 ..
2892 } => {
2893 self.process_payload(now, remote, number.unwrap(), packet)?;
2894 Ok(())
2895 }
2896 Header::VersionNegotiate { .. } => {
2897 if self.total_authed_packets > 1 {
2898 return Ok(());
2899 }
2900 let supported = packet
2901 .payload
2902 .chunks(4)
2903 .any(|x| match <[u8; 4]>::try_from(x) {
2904 Ok(version) => self.version == u32::from_be_bytes(version),
2905 Err(_) => false,
2906 });
2907 if supported {
2908 return Ok(());
2909 }
2910 debug!("remote doesn't support our version");
2911 Err(ConnectionError::VersionMismatch)
2912 }
2913 Header::Short { .. } => unreachable!(
2914 "short packets received during handshake are discarded in handle_packet"
2915 ),
2916 }
2917 }
2918
2919 fn process_early_payload(
2921 &mut self,
2922 now: Instant,
2923 packet: Packet,
2924 ) -> Result<(), TransportError> {
2925 debug_assert_ne!(packet.header.space(), SpaceId::Data);
2926 let payload_len = packet.payload.len();
2927 let mut ack_eliciting = false;
2928 for result in frame::Iter::new(packet.payload.freeze())? {
2929 let frame = result?;
2930 let span = match frame {
2931 Frame::Padding => continue,
2932 _ => Some(trace_span!("frame", ty = %frame.ty())),
2933 };
2934
2935 self.stats.frame_rx.record(&frame);
2936
2937 let _guard = span.as_ref().map(|x| x.enter());
2938 ack_eliciting |= frame.is_ack_eliciting();
2939
2940 match frame {
2942 Frame::Padding | Frame::Ping => {}
2943 Frame::Crypto(frame) => {
2944 self.read_crypto(packet.header.space(), &frame, payload_len)?;
2945 }
2946 Frame::Ack(ack) => {
2947 self.on_ack_received(now, packet.header.space(), ack)?;
2948 }
2949 Frame::Close(reason) => {
2950 self.error = Some(reason.into());
2951 self.state = State::Draining;
2952 return Ok(());
2953 }
2954 _ => {
2955 let mut err =
2956 TransportError::PROTOCOL_VIOLATION("illegal frame type in handshake");
2957 err.frame = Some(frame.ty());
2958 return Err(err);
2959 }
2960 }
2961 }
2962
2963 if ack_eliciting {
2964 self.spaces[packet.header.space()]
2966 .pending_acks
2967 .set_immediate_ack_required();
2968 }
2969
2970 self.write_crypto();
2971 Ok(())
2972 }
2973
    /// Processes every frame of a packet handled in `SpaceId::Data`.
    ///
    /// * `now` - current time, forwarded to ACK handling, NAT traversal and timers.
    /// * `remote` - address the packet was received from.
    /// * `number` - packet number of the packet being processed.
    /// * `packet` - the packet; its payload is frozen and iterated frame by frame.
    ///
    /// After all frames are handled, the packet is registered with the pending-ACK state, stream
    /// ID credit is queued, a received CLOSE moves the connection to `State::Draining`, and a
    /// non-probing packet from a different remote triggers `migrate`.
    ///
    /// # Errors
    ///
    /// Returns a `TransportError` when frame decoding fails, when a frame is illegal in its
    /// context (e.g. CRYPTO in 0-RTT, HANDSHAKE_DONE or NEW_TOKEN sent by a client), or when
    /// one of the delegated handlers reports an error.
    fn process_payload(
        &mut self,
        now: Instant,
        remote: SocketAddr,
        number: u64,
        packet: Packet,
    ) -> Result<(), TransportError> {
        let payload = packet.payload.freeze();
        // Stays true only while every frame seen is one of the "probing" frame kinds below.
        let mut is_probing_packet = true;
        // A CLOSE frame is remembered here and acted upon after the loop.
        let mut close = None;
        let payload_len = payload.len();
        let mut ack_eliciting = false;
        for result in frame::Iter::new(payload)? {
            let frame = result?;
            // Padding is skipped entirely: no stats, no span, no further processing.
            let span = match frame {
                Frame::Padding => continue,
                _ => Some(trace_span!("frame", ty = %frame.ty())),
            };

            self.stats.frame_rx.record(&frame);
            // Trace a summary of the frame; bulky frame kinds log only offsets and lengths.
            match &frame {
                Frame::Crypto(f) => {
                    trace!(offset = f.offset, len = f.data.len(), "got crypto frame");
                }
                Frame::Stream(f) => {
                    trace!(id = %f.id, offset = f.offset, len = f.data.len(), fin = f.fin, "got stream frame");
                }
                Frame::Datagram(f) => {
                    trace!(len = f.data.len(), "got datagram frame");
                }
                f => {
                    trace!("got frame {:?}", f);
                }
            }

            let _guard = span.as_ref().map(|x| x.enter());
            // CRYPTO and application CLOSE frames are rejected inside 0-RTT packets.
            if packet.header.is_0rtt() {
                match frame {
                    Frame::Crypto(_) | Frame::Close(Close::Application(_)) => {
                        return Err(TransportError::PROTOCOL_VIOLATION(
                            "illegal frame type in 0-RTT",
                        ));
                    }
                    _ => {}
                }
            }
            ack_eliciting |= frame.is_ack_eliciting();

            // Any frame other than PADDING, PATH_CHALLENGE, PATH_RESPONSE or NEW_CONNECTION_ID
            // makes this a non-probing packet.
            match frame {
                Frame::Padding
                | Frame::PathChallenge(_)
                | Frame::PathResponse(_)
                | Frame::NewConnectionId(_) => {}
                _ => {
                    is_probing_packet = false;
                }
            }
            // Per-frame handling.
            match frame {
                Frame::Crypto(frame) => {
                    self.read_crypto(SpaceId::Data, &frame, payload_len)?;
                }
                Frame::Stream(frame) => {
                    if self.streams.received(frame, payload_len)?.should_transmit() {
                        self.spaces[SpaceId::Data].pending.max_data = true;
                    }
                }
                Frame::Ack(ack) => {
                    self.on_ack_received(now, SpaceId::Data, ack)?;
                }
                Frame::Padding | Frame::Ping => {}
                Frame::Close(reason) => {
                    close = Some(reason);
                }
                Frame::PathChallenge(token) => {
                    // Queue a response addressed to wherever the challenge came from.
                    self.path_responses.push(number, token, remote);
                    // A challenge arriving from the current path's remote additionally triggers
                    // an immediate ACK request (if the peer supports ACK frequency) or a ping.
                    if remote == self.path.remote {
                        match self.peer_supports_ack_frequency() {
                            true => self.immediate_ack(),
                            false => self.ping(),
                        }
                    }
                }
                Frame::PathResponse(token) => {
                    if self.path.challenge == Some(token) && remote == self.path.remote {
                        // The response matches the outstanding challenge on the active path.
                        trace!("new path validated");
                        self.timers.stop(Timer::PathValidation);
                        self.path.challenge = None;
                        self.path.validated = true;
                        // The previous path no longer needs to be challenged.
                        if let Some((_, ref mut prev_path)) = self.prev_path {
                            prev_path.challenge = None;
                            prev_path.challenge_pending = false;
                        }
                        self.on_path_validated();
                    } else if let Some(nat_traversal) = &mut self.nat_traversal {
                        // Otherwise let the NAT traversal state try to match the response
                        // against one of its candidates.
                        match nat_traversal.handle_validation_success(remote, token, now) {
                            Ok(sequence) => {
                                trace!("NAT traversal candidate {} validated for sequence {}", remote, sequence);

                                if nat_traversal.handle_coordination_success(remote, now) {
                                    trace!("Coordination succeeded via {}", remote);

                                    // Clients may always migrate; servers only when their
                                    // configuration enables migration.
                                    let can_migrate = match &self.side {
                                        ConnectionSide::Client { .. } => true,
                                        ConnectionSide::Server { server_config } => server_config.migration,
                                    };

                                    if can_migrate {
                                        let best_pairs = nat_traversal.get_best_succeeded_pairs();
                                        if let Some(best) = best_pairs.first() {
                                            // Migrate only when the top-ranked pair is the one
                                            // just validated and it is not already the active
                                            // remote.
                                            if best.remote_addr == remote && best.remote_addr != self.path.remote {
                                                debug!("NAT traversal found better path, initiating migration");
                                                // A failed migration is logged, not propagated.
                                                if let Err(e) = self.migrate_to_nat_traversal_path(now) {
                                                    warn!("Failed to migrate to NAT traversal path: {:?}", e);
                                                }
                                            }
                                        }
                                    }
                                } else {
                                    if nat_traversal.mark_pair_succeeded(remote) {
                                        trace!("NAT traversal pair succeeded for {}", remote);
                                    }
                                }
                            }
                            Err(NatTraversalError::ChallengeMismatch) => {
                                debug!("PATH_RESPONSE challenge mismatch for NAT candidate {}", remote);
                            }
                            Err(e) => {
                                debug!("NAT traversal validation error: {}", e);
                            }
                        }
                    } else {
                        debug!(token, "ignoring invalid PATH_RESPONSE");
                    }
                }
                Frame::MaxData(bytes) => {
                    self.streams.received_max_data(bytes);
                }
                Frame::MaxStreamData { id, offset } => {
                    self.streams.received_max_stream_data(id, offset)?;
                }
                Frame::MaxStreams { dir, count } => {
                    self.streams.received_max_streams(dir, count)?;
                }
                Frame::ResetStream(frame) => {
                    if self.streams.received_reset(frame)?.should_transmit() {
                        self.spaces[SpaceId::Data].pending.max_data = true;
                    }
                }
                Frame::DataBlocked { offset } => {
                    // Informational only: logged, no state change.
                    debug!(offset, "peer claims to be blocked at connection level");
                }
                Frame::StreamDataBlocked { id, offset } => {
                    // A unidirectional stream we initiated is send-only from our side, so the
                    // peer cannot be blocked sending on it.
                    if id.initiator() == self.side.side() && id.dir() == Dir::Uni {
                        debug!("got STREAM_DATA_BLOCKED on send-only {}", id);
                        return Err(TransportError::STREAM_STATE_ERROR(
                            "STREAM_DATA_BLOCKED on send-only stream",
                        ));
                    }
                    debug!(
                        stream = %id,
                        offset, "peer claims to be blocked at stream level"
                    );
                }
                Frame::StreamsBlocked { dir, limit } => {
                    if limit > MAX_STREAM_COUNT {
                        return Err(TransportError::FRAME_ENCODING_ERROR(
                            "unrepresentable stream limit",
                        ));
                    }
                    debug!(
                        "peer claims to be blocked opening more than {} {} streams",
                        limit, dir
                    );
                }
                Frame::StopSending(frame::StopSending { id, error_code }) => {
                    if id.initiator() != self.side.side() {
                        // Peer-initiated unidirectional streams are receive-only for us.
                        if id.dir() == Dir::Uni {
                            debug!("got STOP_SENDING on recv-only {}", id);
                            return Err(TransportError::STREAM_STATE_ERROR(
                                "STOP_SENDING on recv-only stream",
                            ));
                        }
                    } else if self.streams.is_local_unopened(id) {
                        return Err(TransportError::STREAM_STATE_ERROR(
                            "STOP_SENDING on unopened stream",
                        ));
                    }
                    self.streams.received_stop_sending(id, error_code);
                }
                Frame::RetireConnectionId { sequence } => {
                    let allow_more_cids = self
                        .local_cid_state
                        .on_cid_retirement(sequence, self.peer_params.issue_cids_limit())?;
                    // Tell the endpoint about the retirement and whether more CIDs may be issued.
                    self.endpoint_events
                        .push_back(EndpointEventInner::RetireConnectionId(
                            now,
                            sequence,
                            allow_more_cids,
                        ));
                }
                Frame::NewConnectionId(frame) => {
                    trace!(
                        sequence = frame.sequence,
                        id = %frame.id,
                        retire_prior_to = frame.retire_prior_to,
                    );
                    // An empty active remote CID means connection IDs are not in use.
                    if self.rem_cids.active().is_empty() {
                        return Err(TransportError::PROTOCOL_VIOLATION(
                            "NEW_CONNECTION_ID when CIDs aren't in use",
                        ));
                    }
                    if frame.retire_prior_to > frame.sequence {
                        return Err(TransportError::PROTOCOL_VIOLATION(
                            "NEW_CONNECTION_ID retiring unissued CIDs",
                        ));
                    }

                    use crate::cid_queue::InsertError;
                    match self.rem_cids.insert(frame) {
                        Ok(None) => {}
                        Ok(Some((retired, reset_token))) => {
                            let pending_retired =
                                &mut self.spaces[SpaceId::Data].pending.retire_cids;
                            // Upper bound on queued RETIRE_CONNECTION_ID sequence numbers, so
                            // that the queue cannot grow without limit.
                            const MAX_PENDING_RETIRED_CIDS: u64 = CidQueue::LEN as u64 * 10;
                            if (pending_retired.len() as u64)
                                .saturating_add(retired.end.saturating_sub(retired.start))
                                > MAX_PENDING_RETIRED_CIDS
                            {
                                return Err(TransportError::CONNECTION_ID_LIMIT_ERROR(
                                    "queued too many retired CIDs",
                                ));
                            }
                            pending_retired.extend(retired);
                            self.set_reset_token(reset_token);
                        }
                        Err(InsertError::ExceedsLimit) => {
                            return Err(TransportError::CONNECTION_ID_LIMIT_ERROR(""));
                        }
                        Err(InsertError::Retired) => {
                            // The CID was already retired: queue a retirement for its sequence
                            // number and move on to the next frame.
                            trace!("discarding already-retired");
                            self.spaces[SpaceId::Data]
                                .pending
                                .retire_cids
                                .push(frame.sequence);
                            continue;
                        }
                    };

                    // A server still using the remote CID with sequence 0 switches to a newer
                    // one as soon as one is available.
                    if self.side.is_server() && self.rem_cids.active_seq() == 0 {
                        self.update_rem_cid();
                    }
                }
                Frame::NewToken(NewToken { token }) => {
                    // Only the client side has a token store; on a server this frame is illegal.
                    let ConnectionSide::Client {
                        token_store,
                        server_name,
                        ..
                    } = &self.side
                    else {
                        return Err(TransportError::PROTOCOL_VIOLATION("client sent NEW_TOKEN"));
                    };
                    if token.is_empty() {
                        return Err(TransportError::FRAME_ENCODING_ERROR("empty token"));
                    }
                    trace!("got new token");
                    token_store.insert(server_name, token);
                }
                Frame::Datagram(datagram) => {
                    // `received` reports whether the application should be notified.
                    if self
                        .datagrams
                        .received(datagram, &self.config.datagram_receive_buffer_size)?
                    {
                        self.events.push_back(Event::DatagramReceived);
                    }
                }
                Frame::AckFrequency(ack_frequency) => {
                    let space = &mut self.spaces[SpaceId::Data];

                    // A `false` result means there is nothing further to do for this frame.
                    if !self
                        .ack_frequency
                        .ack_frequency_received(&ack_frequency, &mut space.pending_acks)?
                    {
                        continue;
                    }

                    // Re-arm the max-ack-delay timer using the updated delay, if a timeout
                    // applies.
                    if let Some(timeout) = space
                        .pending_acks
                        .max_ack_delay_timeout(self.ack_frequency.max_ack_delay)
                    {
                        self.timers.set(Timer::MaxAckDelay, timeout);
                    }
                }
                Frame::ImmediateAck => {
                    self.spaces[SpaceId::Data]
                        .pending_acks
                        .set_immediate_ack_required();
                }
                Frame::HandshakeDone => {
                    if self.side.is_server() {
                        return Err(TransportError::PROTOCOL_VIOLATION(
                            "client sent HANDSHAKE_DONE",
                        ));
                    }
                    // Drop the handshake space if its keys are still present.
                    if self.spaces[SpaceId::Handshake].crypto.is_some() {
                        self.discard_space(now, SpaceId::Handshake);
                    }
                }
                Frame::AddAddress(add_address) => {
                    self.handle_add_address(&add_address, now)?;
                }
                Frame::PunchMeNow(punch_me_now) => {
                    self.handle_punch_me_now(&punch_me_now, now)?;
                }
                Frame::RemoveAddress(remove_address) => {
                    self.handle_remove_address(&remove_address)?;
                }
            }
        }

        // Register the packet with the pending-ACK state; a `true` result arms the
        // max-ack-delay timer.
        let space = &mut self.spaces[SpaceId::Data];
        if space
            .pending_acks
            .packet_received(now, number, ack_eliciting, &space.dedup)
        {
            self.timers
                .set(Timer::MaxAckDelay, now + self.ack_frequency.max_ack_delay);
        }

        // Let the stream state queue any stream ID credit that became available.
        let pending = &mut self.spaces[SpaceId::Data].pending;
        self.streams.queue_max_stream_id(pending);

        // Act on a CLOSE frame only now, after the rest of the packet has been processed.
        if let Some(reason) = close {
            self.error = Some(reason.into());
            self.state = State::Draining;
            self.close = true;
        }

        // A non-probing packet from a new remote whose number equals `rx_packet` initiates a
        // migration. NOTE(review): `rx_packet` is presumably the highest packet number received
        // in this space — confirm.
        if remote != self.path.remote
            && !is_probing_packet
            && number == self.spaces[SpaceId::Data].rx_packet
        {
            let ConnectionSide::Server { ref server_config } = self.side else {
                panic!("packets from unknown remote should be dropped by clients");
            };
            debug_assert!(
                server_config.migration,
                "migration-initiating packets should have been dropped immediately"
            );
            self.migrate(now, remote);
            // Switch to a fresh remote CID for the new path, if one is available.
            self.update_rem_cid();
            self.spin = false;
        }

        Ok(())
    }
3359
3360 fn migrate(&mut self, now: Instant, remote: SocketAddr) {
3361 trace!(%remote, "migration initiated");
3362 let mut new_path = if remote.is_ipv4() && remote.ip() == self.path.remote.ip() {
3366 PathData::from_previous(remote, &self.path, now)
3367 } else {
3368 let peer_max_udp_payload_size =
3369 u16::try_from(self.peer_params.max_udp_payload_size.into_inner())
3370 .unwrap_or(u16::MAX);
3371 PathData::new(
3372 remote,
3373 self.allow_mtud,
3374 Some(peer_max_udp_payload_size),
3375 now,
3376 &self.config,
3377 )
3378 };
3379 new_path.challenge = Some(self.rng.gen());
3380 new_path.challenge_pending = true;
3381 let prev_pto = self.pto(SpaceId::Data);
3382
3383 let mut prev = mem::replace(&mut self.path, new_path);
3384 if prev.challenge.is_none() {
3386 prev.challenge = Some(self.rng.gen());
3387 prev.challenge_pending = true;
3388 self.prev_path = Some((self.rem_cids.active(), prev));
3391 }
3392
3393 self.timers.set(
3394 Timer::PathValidation,
3395 now + 3 * cmp::max(self.pto(SpaceId::Data), prev_pto),
3396 );
3397 }
3398
    /// Reacts to a change of the local address.
    ///
    /// Switches to the next unused remote connection ID (a no-op when none is available) and
    /// then calls `ping`.
    pub fn local_address_changed(&mut self) {
        self.update_rem_cid();
        self.ping();
    }
3404
3405 pub fn migrate_to_nat_traversal_path(&mut self, now: Instant) -> Result<(), TransportError> {
3407 let (remote_addr, local_addr) = {
3409 let nat_state = self.nat_traversal.as_ref()
3410 .ok_or_else(|| TransportError::PROTOCOL_VIOLATION("NAT traversal not enabled"))?;
3411
3412 let best_pairs = nat_state.get_best_succeeded_pairs();
3414 if best_pairs.is_empty() {
3415 return Err(TransportError::PROTOCOL_VIOLATION("No validated NAT traversal paths"));
3416 }
3417
3418 let best_path = best_pairs.iter()
3420 .find(|pair| pair.remote_addr != self.path.remote)
3421 .or_else(|| best_pairs.first());
3422
3423 let best_path = best_path
3424 .ok_or_else(|| TransportError::PROTOCOL_VIOLATION("No suitable NAT traversal path"))?;
3425
3426 debug!("Migrating to NAT traversal path: {} -> {} (priority: {})",
3427 self.path.remote, best_path.remote_addr, best_path.priority);
3428
3429 (best_path.remote_addr, best_path.local_addr)
3430 };
3431
3432 self.migrate(now, remote_addr);
3434
3435 if local_addr != SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED), 0) {
3437 self.local_ip = Some(local_addr.ip());
3438 }
3439
3440 self.path.challenge_pending = true;
3442
3443 Ok(())
3444 }
3445
3446 fn update_rem_cid(&mut self) {
3448 let (reset_token, retired) = match self.rem_cids.next() {
3449 Some(x) => x,
3450 None => return,
3451 };
3452
3453 self.spaces[SpaceId::Data]
3455 .pending
3456 .retire_cids
3457 .extend(retired);
3458 self.set_reset_token(reset_token);
3459 }
3460
3461 fn set_reset_token(&mut self, reset_token: ResetToken) {
3462 self.endpoint_events
3463 .push_back(EndpointEventInner::ResetToken(
3464 self.path.remote,
3465 reset_token,
3466 ));
3467 self.peer_params.stateless_reset_token = Some(reset_token);
3468 }
3469
3470 fn issue_first_cids(&mut self, now: Instant) {
3472 if self.local_cid_state.cid_len() == 0 {
3473 return;
3474 }
3475
3476 let mut n = self.peer_params.issue_cids_limit() - 1;
3478 if let ConnectionSide::Server { server_config } = &self.side {
3479 if server_config.has_preferred_address() {
3480 n -= 1;
3482 }
3483 }
3484 self.endpoint_events
3485 .push_back(EndpointEventInner::NeedIdentifiers(now, n));
3486 }
3487
    /// Writes pending frames for `space_id` into `buf` and reports what was written.
    ///
    /// * `now` - current time, used when building ACK frames.
    /// * `space_id` - packet number space the packet belongs to.
    /// * `buf` - output buffer that frames are appended to.
    /// * `max_size` - size limit that `buf.len()` is checked against before each frame.
    /// * `pn` - packet number, recorded when an ACK_FREQUENCY frame is sent.
    ///
    /// Frames are written in this order: HANDSHAKE_DONE, PING, IMMEDIATE_ACK, ACK,
    /// ACK_FREQUENCY, PATH_CHALLENGE, PATH_RESPONSE, CRYPTO, stream control frames,
    /// NEW_CONNECTION_ID, RETIRE_CONNECTION_ID, DATAGRAM, NEW_TOKEN, ADD_ADDRESS, PUNCH_ME_NOW,
    /// REMOVE_ADDRESS and finally STREAM.
    ///
    /// Returns a `SentFrames` describing the written frames, including what must be
    /// retransmitted if the packet is lost.
    fn populate_packet(
        &mut self,
        now: Instant,
        space_id: SpaceId,
        buf: &mut Vec<u8>,
        max_size: usize,
        pn: u64,
    ) -> SentFrames {
        let mut sent = SentFrames::default();
        let space = &mut self.spaces[space_id];
        // A data-space packet built while the space has no keys of its own is treated as 0-RTT.
        let is_0rtt = space_id == SpaceId::Data && space.crypto.is_none();
        space.pending_acks.maybe_ack_non_eliciting();

        // HANDSHAKE_DONE (never in 0-RTT)
        if !is_0rtt && mem::replace(&mut space.pending.handshake_done, false) {
            buf.write(frame::FrameType::HANDSHAKE_DONE);
            sent.retransmits.get_or_create().handshake_done = true;
            // Saturating rather than `+=`, unlike the other counters in this function.
            self.stats.frame_tx.handshake_done =
                self.stats.frame_tx.handshake_done.saturating_add(1);
        }

        // PING
        if mem::replace(&mut space.ping_pending, false) {
            trace!("PING");
            buf.write(frame::FrameType::PING);
            sent.non_retransmits = true;
            self.stats.frame_tx.ping += 1;
        }

        // IMMEDIATE_ACK
        if mem::replace(&mut space.immediate_ack_pending, false) {
            trace!("IMMEDIATE_ACK");
            buf.write(frame::FrameType::IMMEDIATE_ACK);
            sent.non_retransmits = true;
            self.stats.frame_tx.immediate_ack += 1;
        }

        // ACK
        if space.pending_acks.can_send() {
            Self::populate_acks(
                now,
                self.receiving_ecn,
                &mut sent,
                space,
                buf,
                &mut self.stats,
            );
        }

        // ACK_FREQUENCY
        if mem::replace(&mut space.pending.ack_frequency, false) {
            let sequence_number = self.ack_frequency.next_sequence_number();

            // NOTE(review): this unwrap assumes `ack_frequency_config` is always set whenever
            // `pending.ack_frequency` can become true — confirm against the code that sets it.
            let config = self.config.ack_frequency_config.as_ref().unwrap();

            let max_ack_delay = self.ack_frequency.candidate_max_ack_delay(
                self.path.rtt.get(),
                config,
                &self.peer_params,
            );

            trace!(?max_ack_delay, "ACK_FREQUENCY");

            frame::AckFrequency {
                sequence: sequence_number,
                ack_eliciting_threshold: config.ack_eliciting_threshold,
                // Delays too large for a VarInt are clamped to VarInt::MAX.
                request_max_ack_delay: max_ack_delay.as_micros().try_into().unwrap_or(VarInt::MAX),
                reordering_threshold: config.reordering_threshold,
            }
            .encode(buf);

            sent.retransmits.get_or_create().ack_frequency = true;

            self.ack_frequency.ack_frequency_sent(pn, max_ack_delay);
            self.stats.frame_tx.ack_frequency += 1;
        }

        // PATH_CHALLENGE
        // NOTE(review): the 9 is presumably one frame-type byte plus an 8-byte token — confirm.
        if buf.len() + 9 < max_size && space_id == SpaceId::Data {
            // The challenge is written whenever one is outstanding; only the "pending" flag is
            // cleared here, the challenge itself stays set.
            if let Some(token) = self.path.challenge {
                self.path.challenge_pending = false;
                sent.non_retransmits = true;
                sent.requires_padding = true;
                trace!("PATH_CHALLENGE {:08x}", token);
                buf.write(frame::FrameType::PATH_CHALLENGE);
                buf.write(token);
                self.stats.frame_tx.path_challenge += 1;
            }

        }

        // PATH_RESPONSE
        if buf.len() + 9 < max_size && space_id == SpaceId::Data {
            // Only responses destined for the active path's remote are written here.
            if let Some(token) = self.path_responses.pop_on_path(self.path.remote) {
                sent.non_retransmits = true;
                sent.requires_padding = true;
                trace!("PATH_RESPONSE {:08x}", token);
                buf.write(frame::FrameType::PATH_RESPONSE);
                buf.write(token);
                self.stats.frame_tx.path_response += 1;
            }
        }

        // CRYPTO (never in 0-RTT)
        while buf.len() + frame::Crypto::SIZE_BOUND < max_size && !is_0rtt {
            let mut frame = match space.pending.crypto.pop_front() {
                Some(x) => x,
                None => break,
            };

            // Room left for crypto data: remaining space minus one byte, the encoded size of
            // the offset, and two bytes reserved for the length field.
            // NOTE(review): the unchecked VarInt construction assumes `frame.offset` is within
            // VarInt range — confirm.
            let max_crypto_data_size = max_size
                - buf.len()
                - 1
                - VarInt::size(unsafe { VarInt::from_u64_unchecked(frame.offset) })
                - 2;
            // Cap at 2^14 - 1 bytes so the length fits the two bytes reserved above.
            let len = frame
                .data
                .len()
                .min(2usize.pow(14) - 1)
                .min(max_crypto_data_size);

            let data = frame.data.split_to(len);
            let truncated = frame::Crypto {
                offset: frame.offset,
                data,
            };
            trace!(
                "CRYPTO: off {} len {}",
                truncated.offset,
                truncated.data.len()
            );
            truncated.encode(buf);
            self.stats.frame_tx.crypto += 1;
            sent.retransmits.get_or_create().crypto.push_back(truncated);
            // Whatever did not fit goes back to the front of the queue with its offset advanced.
            if !frame.data.is_empty() {
                frame.offset += len as u64;
                space.pending.crypto.push_front(frame);
            }
        }

        // Stream-related control frames, written by the stream state
        if space_id == SpaceId::Data {
            self.streams.write_control_frames(
                buf,
                &mut space.pending,
                &mut sent.retransmits,
                &mut self.stats.frame_tx,
                max_size,
            );
        }

        // NEW_CONNECTION_ID
        // NOTE(review): 44 is presumably the worst-case encoded size of this frame — confirm.
        while buf.len() + 44 < max_size {
            let issued = match space.pending.new_cids.pop() {
                Some(x) => x,
                None => break,
            };
            trace!(
                sequence = issued.sequence,
                id = %issued.id,
                "NEW_CONNECTION_ID"
            );
            frame::NewConnectionId {
                sequence: issued.sequence,
                retire_prior_to: self.local_cid_state.retire_prior_to(),
                id: issued.id,
                reset_token: issued.reset_token,
            }
            .encode(buf);
            sent.retransmits.get_or_create().new_cids.push(issued);
            self.stats.frame_tx.new_connection_id += 1;
        }

        // RETIRE_CONNECTION_ID
        while buf.len() + frame::RETIRE_CONNECTION_ID_SIZE_BOUND < max_size {
            let seq = match space.pending.retire_cids.pop() {
                Some(x) => x,
                None => break,
            };
            trace!(sequence = seq, "RETIRE_CONNECTION_ID");
            buf.write(frame::FrameType::RETIRE_CONNECTION_ID);
            buf.write_var(seq);
            sent.retransmits.get_or_create().retire_cids.push(seq);
            self.stats.frame_tx.retire_connection_id += 1;
        }

        // DATAGRAM
        let mut sent_datagrams = false;
        while buf.len() + Datagram::SIZE_BOUND < max_size && space_id == SpaceId::Data {
            match self.datagrams.write(buf, max_size) {
                true => {
                    sent_datagrams = true;
                    sent.non_retransmits = true;
                    self.stats.frame_tx.datagram += 1;
                }
                false => break,
            }
        }
        // Having written datagrams while sending was blocked, notify the application that it
        // is unblocked again.
        if self.datagrams.send_blocked && sent_datagrams {
            self.events.push_back(Event::DatagramsUnblocked);
            self.datagrams.send_blocked = false;
        }

        // NEW_TOKEN
        while let Some(remote_addr) = space.pending.new_tokens.pop() {
            debug_assert_eq!(space_id, SpaceId::Data);
            let ConnectionSide::Server { server_config } = &self.side else {
                panic!("NEW_TOKEN frames should not be enqueued by clients");
            };

            // Entries queued for a remote other than the active path's are dropped.
            if remote_addr != self.path.remote {
                continue;
            }

            let token = Token::new(
                TokenPayload::Validation {
                    ip: remote_addr.ip(),
                    issued: server_config.time_source.now(),
                },
                &mut self.rng,
            );
            let new_token = NewToken {
                token: token.encode(&*server_config.token_key).into(),
            };

            // Not enough room: put the entry back and stop writing NEW_TOKEN frames.
            if buf.len() + new_token.size() >= max_size {
                space.pending.new_tokens.push(remote_addr);
                break;
            }

            new_token.encode(buf);
            sent.retransmits
                .get_or_create()
                .new_tokens
                .push(remote_addr);
            self.stats.frame_tx.new_token += 1;
        }

        // ADD_ADDRESS (NAT traversal)
        while buf.len() + frame::AddAddress::SIZE_BOUND < max_size && space_id == SpaceId::Data {
            let add_address = match space.pending.add_addresses.pop() {
                Some(x) => x,
                None => break,
            };
            trace!(
                sequence = %add_address.sequence,
                address = %add_address.address,
                "ADD_ADDRESS"
            );
            add_address.encode(buf);
            sent.retransmits.get_or_create().add_addresses.push(add_address);
            self.stats.frame_tx.add_address += 1;
        }

        // PUNCH_ME_NOW (NAT traversal)
        while buf.len() + frame::PunchMeNow::SIZE_BOUND < max_size && space_id == SpaceId::Data {
            let punch_me_now = match space.pending.punch_me_now.pop() {
                Some(x) => x,
                None => break,
            };
            trace!(
                round = %punch_me_now.round,
                target_sequence = %punch_me_now.target_sequence,
                "PUNCH_ME_NOW"
            );
            punch_me_now.encode(buf);
            sent.retransmits.get_or_create().punch_me_now.push(punch_me_now);
            self.stats.frame_tx.punch_me_now += 1;
        }

        // REMOVE_ADDRESS (NAT traversal)
        while buf.len() + frame::RemoveAddress::SIZE_BOUND < max_size && space_id == SpaceId::Data {
            let remove_address = match space.pending.remove_addresses.pop() {
                Some(x) => x,
                None => break,
            };
            trace!(
                sequence = %remove_address.sequence,
                "REMOVE_ADDRESS"
            );
            remove_address.encode(buf);
            sent.retransmits.get_or_create().remove_addresses.push(remove_address);
            self.stats.frame_tx.remove_address += 1;
        }

        // STREAM: written last, into whatever room is left
        if space_id == SpaceId::Data {
            sent.stream_frames =
                self.streams
                    .write_stream_frames(buf, max_size, self.config.send_fairness);
            self.stats.frame_tx.stream += sent.stream_frames.len() as u64;
        }

        sent
    }
3804
3805 fn populate_acks(
3810 now: Instant,
3811 receiving_ecn: bool,
3812 sent: &mut SentFrames,
3813 space: &mut PacketSpace,
3814 buf: &mut Vec<u8>,
3815 stats: &mut ConnectionStats,
3816 ) {
3817 debug_assert!(!space.pending_acks.ranges().is_empty());
3818
3819 debug_assert!(space.crypto.is_some(), "tried to send ACK in 0-RTT");
3821 let ecn = if receiving_ecn {
3822 Some(&space.ecn_counters)
3823 } else {
3824 None
3825 };
3826 sent.largest_acked = space.pending_acks.ranges().max();
3827
3828 let delay_micros = space.pending_acks.ack_delay(now).as_micros() as u64;
3829
3830 let ack_delay_exp = TransportParameters::default().ack_delay_exponent;
3832 let delay = delay_micros >> ack_delay_exp.into_inner();
3833
3834 trace!(
3835 "ACK {:?}, Delay = {}us",
3836 space.pending_acks.ranges(),
3837 delay_micros
3838 );
3839
3840 frame::Ack::encode(delay as _, space.pending_acks.ranges(), ecn, buf);
3841 stats.frame_tx.acks += 1;
3842 }
3843
3844 fn close_common(&mut self) {
3845 trace!("connection closed");
3846 for &timer in &Timer::VALUES {
3847 self.timers.stop(timer);
3848 }
3849 }
3850
3851 fn set_close_timer(&mut self, now: Instant) {
3852 self.timers
3853 .set(Timer::Close, now + 3 * self.pto(self.highest_space));
3854 }
3855
3856 fn handle_peer_params(&mut self, params: TransportParameters) -> Result<(), TransportError> {
3858 if Some(self.orig_rem_cid) != params.initial_src_cid
3859 || (self.side.is_client()
3860 && (Some(self.initial_dst_cid) != params.original_dst_cid
3861 || self.retry_src_cid != params.retry_src_cid))
3862 {
3863 return Err(TransportError::TRANSPORT_PARAMETER_ERROR(
3864 "CID authentication failure",
3865 ));
3866 }
3867
3868 self.set_peer_params(params);
3869
3870 Ok(())
3871 }
3872
3873 fn set_peer_params(&mut self, params: TransportParameters) {
3874 self.streams.set_params(¶ms);
3875 self.idle_timeout =
3876 negotiate_max_idle_timeout(self.config.max_idle_timeout, Some(params.max_idle_timeout));
3877 trace!("negotiated max idle timeout {:?}", self.idle_timeout);
3878 if let Some(ref info) = params.preferred_address {
3879 self.rem_cids.insert(frame::NewConnectionId {
3880 sequence: 1,
3881 id: info.connection_id,
3882 reset_token: info.stateless_reset_token,
3883 retire_prior_to: 0,
3884 }).expect("preferred address CID is the first received, and hence is guaranteed to be legal");
3885 }
3886 self.ack_frequency.peer_max_ack_delay = get_max_ack_delay(¶ms);
3887
3888 self.negotiate_nat_traversal_capability(¶ms);
3890
3891 self.peer_params = params;
3892 self.path.mtud.on_peer_max_udp_payload_size_received(
3893 u16::try_from(self.peer_params.max_udp_payload_size.into_inner()).unwrap_or(u16::MAX),
3894 );
3895 }
3896
3897 fn negotiate_nat_traversal_capability(&mut self, params: &TransportParameters) {
3899 let peer_nat_config = match ¶ms.nat_traversal {
3901 Some(config) => config,
3902 None => {
3903 if self.config.nat_traversal_config.is_some() {
3905 debug!("Peer does not support NAT traversal, maintaining backward compatibility");
3906 self.emit_nat_traversal_capability_event(false);
3907
3908 self.set_nat_traversal_compatibility_mode(false);
3910 }
3911 return;
3912 }
3913 };
3914
3915 let local_nat_config = match &self.config.nat_traversal_config {
3917 Some(config) => config,
3918 None => {
3919 debug!("NAT traversal not enabled locally, ignoring peer support");
3920 self.emit_nat_traversal_capability_event(false);
3921 self.set_nat_traversal_compatibility_mode(false);
3922 return;
3923 }
3924 };
3925
3926 info!("Both peers support NAT traversal, negotiating capabilities");
3928
3929 match self.negotiate_nat_traversal_parameters(local_nat_config, peer_nat_config) {
3931 Ok(negotiated_config) => {
3932 info!("NAT traversal capability negotiated successfully");
3933 self.emit_nat_traversal_capability_event(true);
3934
3935 self.init_nat_traversal_with_negotiated_config(&negotiated_config);
3937
3938 self.set_nat_traversal_compatibility_mode(true);
3940
3941 if matches!(negotiated_config.role, crate::transport_parameters::NatTraversalRole::Client) {
3943 self.initiate_nat_traversal_process();
3944 }
3945 }
3946 Err(e) => {
3947 warn!("NAT traversal capability negotiation failed: {}", e);
3948 self.emit_nat_traversal_capability_event(false);
3949 self.set_nat_traversal_compatibility_mode(false);
3950 }
3951 }
3952 }
3953
3954 fn validate_nat_traversal_roles(
3956 &self,
3957 local_config: &crate::transport_parameters::NatTraversalConfig,
3958 peer_config: &crate::transport_parameters::NatTraversalConfig,
3959 ) -> Result<(), String> {
3960
3961 match (&local_config.role, &peer_config.role) {
3963 (crate::transport_parameters::NatTraversalRole::Bootstrap, crate::transport_parameters::NatTraversalRole::Bootstrap) => {
3965 debug!("Both endpoints are bootstrap nodes - unusual but allowed");
3966 }
3967 (crate::transport_parameters::NatTraversalRole::Client, crate::transport_parameters::NatTraversalRole::Server { .. }) |
3969 (crate::transport_parameters::NatTraversalRole::Server { .. }, crate::transport_parameters::NatTraversalRole::Client) => {
3970 debug!("Client-Server NAT traversal role combination");
3971 }
3972 (crate::transport_parameters::NatTraversalRole::Bootstrap, _) | (_, crate::transport_parameters::NatTraversalRole::Bootstrap) => {
3974 debug!("Bootstrap node coordination");
3975 }
3976 (crate::transport_parameters::NatTraversalRole::Client, crate::transport_parameters::NatTraversalRole::Client) => {
3978 debug!("Client-Client connection requires bootstrap coordination");
3979 }
3980 (crate::transport_parameters::NatTraversalRole::Server { .. }, crate::transport_parameters::NatTraversalRole::Server { .. }) => {
3982 debug!("Server-Server connection");
3983 }
3984 }
3985
3986 Ok(())
3987 }
3988
3989 fn emit_nat_traversal_capability_event(&mut self, negotiated: bool) {
3991 if negotiated {
3994 info!("NAT traversal capability successfully negotiated");
3995 } else {
3996 info!("NAT traversal capability not available (peer or local support missing)");
3997 }
3998
3999 }
4002
4003 fn set_nat_traversal_compatibility_mode(&mut self, enabled: bool) {
4005 if enabled {
4006 debug!("NAT traversal enabled for this connection");
4007 } else {
4009 debug!("NAT traversal disabled for this connection (backward compatibility mode)");
4010 if self.nat_traversal.is_some() {
4012 warn!("Clearing NAT traversal state due to compatibility mode");
4013 self.nat_traversal = None;
4014 }
4015 }
4016 }
4017
4018 fn negotiate_nat_traversal_parameters(
4020 &self,
4021 local_config: &crate::transport_parameters::NatTraversalConfig,
4022 peer_config: &crate::transport_parameters::NatTraversalConfig,
4023 ) -> Result<crate::transport_parameters::NatTraversalConfig, String> {
4024
4025 self.validate_nat_traversal_roles(local_config, peer_config)?;
4027
4028 let negotiated_role = local_config.role;
4030
4031 let max_candidates = local_config.max_candidates.into_inner()
4033 .min(peer_config.max_candidates.into_inner())
4034 .min(50); let coordination_timeout = local_config.coordination_timeout.into_inner()
4037 .min(peer_config.coordination_timeout.into_inner())
4038 .min(30000); let max_concurrent_attempts = local_config.max_concurrent_attempts.into_inner()
4041 .min(peer_config.max_concurrent_attempts.into_inner())
4042 .min(10); let peer_id = local_config.peer_id.or(peer_config.peer_id);
4046
4047 let negotiated_config = crate::transport_parameters::NatTraversalConfig {
4048 role: negotiated_role,
4049 max_candidates: VarInt::from_u64(max_candidates).unwrap(),
4050 coordination_timeout: VarInt::from_u64(coordination_timeout).unwrap(),
4051 max_concurrent_attempts: VarInt::from_u64(max_concurrent_attempts).unwrap(),
4052 peer_id,
4053 };
4054
4055 negotiated_config.validate()
4057 .map_err(|e| format!("Negotiated configuration validation failed: {:?}", e))?;
4058
4059 debug!(
4060 "NAT traversal parameters negotiated: role={:?}, max_candidates={}, timeout={}ms, max_attempts={}",
4061 negotiated_role, max_candidates, coordination_timeout, max_concurrent_attempts
4062 );
4063
4064 Ok(negotiated_config)
4065 }
4066
4067 fn init_nat_traversal_with_negotiated_config(&mut self, config: &crate::transport_parameters::NatTraversalConfig) {
4069
4070 let role = match config.role {
4072 crate::transport_parameters::NatTraversalRole::Client => NatTraversalRole::Client,
4073 crate::transport_parameters::NatTraversalRole::Server { can_relay } => NatTraversalRole::Server { can_relay },
4074 crate::transport_parameters::NatTraversalRole::Bootstrap => NatTraversalRole::Bootstrap,
4075 };
4076
4077 let max_candidates = config.max_candidates.into_inner().min(50) as u32;
4078 let coordination_timeout = Duration::from_millis(
4079 config.coordination_timeout.into_inner().min(30000)
4080 );
4081
4082 self.nat_traversal = Some(NatTraversalState::new(
4084 role,
4085 max_candidates,
4086 coordination_timeout,
4087 ));
4088
4089 trace!("NAT traversal initialized with negotiated config: role={:?}", role);
4090
4091 match role {
4093 NatTraversalRole::Bootstrap => {
4094 self.prepare_address_observation();
4096 }
4097 NatTraversalRole::Client => {
4098 self.schedule_candidate_discovery();
4100 }
4101 NatTraversalRole::Server { .. } => {
4102 self.prepare_coordination_handling();
4104 }
4105 }
4106 }
4107
4108 fn initiate_nat_traversal_process(&mut self) {
4110 if let Some(nat_state) = &mut self.nat_traversal {
4111 match nat_state.start_candidate_discovery() {
4112 Ok(()) => {
4113 debug!("NAT traversal process initiated - candidate discovery started");
4114 self.timers.set(Timer::NatTraversal, Instant::now() + Duration::from_millis(100));
4116 }
4117 Err(e) => {
4118 warn!("Failed to initiate NAT traversal process: {}", e);
4119 }
4120 }
4121 }
4122 }
4123
4124 fn prepare_address_observation(&mut self) {
4126 debug!("Preparing for address observation as bootstrap node");
4127 }
4130
4131 fn schedule_candidate_discovery(&mut self) {
4133 debug!("Scheduling candidate discovery for client endpoint");
4134 self.timers.set(Timer::NatTraversal, Instant::now() + Duration::from_millis(50));
4136 }
4137
4138 fn prepare_coordination_handling(&mut self) {
4140 debug!("Preparing to handle coordination requests as server endpoint");
4141 }
4144
4145 fn handle_nat_traversal_timeout(&mut self, now: Instant) {
4147 let timeout_result = if let Some(nat_state) = &mut self.nat_traversal {
4149 nat_state.handle_timeout(now)
4150 } else {
4151 return;
4152 };
4153
4154 match timeout_result {
4156 Ok(actions) => {
4157 for action in actions {
4158 match action {
4159 nat_traversal::TimeoutAction::RetryDiscovery => {
4160 debug!("NAT traversal timeout: retrying candidate discovery");
4161 if let Some(nat_state) = &mut self.nat_traversal {
4162 if let Err(e) = nat_state.start_candidate_discovery() {
4163 warn!("Failed to retry candidate discovery: {}", e);
4164 }
4165 }
4166 }
4167 nat_traversal::TimeoutAction::RetryCoordination => {
4168 debug!("NAT traversal timeout: retrying coordination");
4169 self.timers.set(Timer::NatTraversal, now + Duration::from_secs(2));
4171 }
4172 nat_traversal::TimeoutAction::StartValidation => {
4173 debug!("NAT traversal timeout: starting path validation");
4174 self.start_nat_traversal_validation(now);
4175 }
4176 nat_traversal::TimeoutAction::Complete => {
4177 debug!("NAT traversal completed successfully");
4178 self.timers.stop(Timer::NatTraversal);
4180 }
4181 nat_traversal::TimeoutAction::Failed => {
4182 warn!("NAT traversal failed after timeout");
4183 self.handle_nat_traversal_failure();
4185 }
4186 }
4187 }
4188 }
4189 Err(e) => {
4190 warn!("NAT traversal timeout handling failed: {}", e);
4191 self.handle_nat_traversal_failure();
4192 }
4193 }
4194 }
4195
4196 fn start_nat_traversal_validation(&mut self, now: Instant) {
4198 if let Some(nat_state) = &mut self.nat_traversal {
4199 let pairs = nat_state.get_next_validation_pairs(3);
4201
4202 for pair in pairs {
4203 let challenge = self.rng.gen();
4205 self.path.challenge = Some(challenge);
4206 self.path.challenge_pending = true;
4207
4208 debug!("Starting path validation for NAT traversal candidate: {}", pair.remote_addr);
4209 }
4210
4211 self.timers.set(Timer::PathValidation, now + Duration::from_secs(3));
4213 }
4214 }
4215
4216 fn handle_nat_traversal_failure(&mut self) {
4218 warn!("NAT traversal failed, considering fallback options");
4219
4220 self.nat_traversal = None;
4222 self.timers.stop(Timer::NatTraversal);
4223
4224 debug!("NAT traversal disabled for this connection due to failure");
4231 }
4232
4233 pub fn nat_traversal_supported(&self) -> bool {
4235 self.nat_traversal.is_some() &&
4236 self.config.nat_traversal_config.is_some() &&
4237 self.peer_params.nat_traversal.is_some()
4238 }
4239
4240 pub fn nat_traversal_config(&self) -> Option<&crate::transport_parameters::NatTraversalConfig> {
4242 self.peer_params.nat_traversal.as_ref()
4243 }
4244
4245 pub fn nat_traversal_ready(&self) -> bool {
4247 self.nat_traversal_supported() &&
4248 matches!(self.state, State::Established)
4249 }
4250
4251 #[allow(dead_code)]
4256 pub(crate) fn nat_traversal_stats(&self) -> Option<nat_traversal::NatTraversalStats> {
4257 self.nat_traversal.as_ref().map(|state| state.stats.clone())
4258 }
4259
/// Test helper: enables NAT traversal with the given `role` without any negotiation.
///
/// Installs the same fixed configuration (8 candidates, 10000 ms coordination timeout, 3
/// concurrent attempts, no peer ID) as both the peer's advertised parameters and the local
/// transport config, then creates fresh NAT traversal state to match.
#[cfg(test)]
pub(crate) fn force_enable_nat_traversal(&mut self, role: NatTraversalRole) {
    use crate::transport_parameters::{NatTraversalConfig, NatTraversalRole as TPRole};

    let config = NatTraversalConfig {
        role: match role {
            NatTraversalRole::Client => TPRole::Client,
            NatTraversalRole::Server { can_relay } => TPRole::Server { can_relay },
            NatTraversalRole::Bootstrap => TPRole::Bootstrap,
        },
        max_candidates: VarInt::from_u32(8),
        coordination_timeout: VarInt::from_u32(10000),
        max_concurrent_attempts: VarInt::from_u32(3),
        peer_id: None,
    };

    self.peer_params.nat_traversal = Some(config.clone());

    // The transport config is shared behind an `Arc`, so patch a private copy and swap it in.
    let mut transport_config = (*self.config).clone();
    transport_config.nat_traversal_config = Some(config);
    self.config = Arc::new(transport_config);

    self.nat_traversal = Some(NatTraversalState::new(role, 8, Duration::from_secs(10)));
}
4292
4293
4294
4295
4296
4297 fn derive_peer_id_from_connection(&self) -> [u8; 32] {
4300 let mut hasher = std::collections::hash_map::DefaultHasher::new();
4302 use std::hash::Hasher;
4303 hasher.write(&self.rem_handshake_cid);
4304 hasher.write(&self.handshake_cid);
4305 hasher.write(&self.path.remote.to_string().into_bytes());
4306 let hash = hasher.finish();
4307 let mut peer_id = [0u8; 32];
4308 peer_id[..8].copy_from_slice(&hash.to_be_bytes());
4309 let cid_bytes = self.rem_handshake_cid.as_ref();
4311 let copy_len = (cid_bytes.len()).min(24);
4312 peer_id[8..8+copy_len].copy_from_slice(&cid_bytes[..copy_len]);
4313 peer_id
4314 }
4315
4316 fn handle_add_address(
4318 &mut self,
4319 add_address: &crate::frame::AddAddress,
4320 now: Instant,
4321 ) -> Result<(), TransportError> {
4322 let nat_state = self.nat_traversal.as_mut()
4323 .ok_or_else(|| TransportError::PROTOCOL_VIOLATION("AddAddress frame without NAT traversal negotiation"))?;
4324
4325 match nat_state.add_remote_candidate(
4326 add_address.sequence,
4327 add_address.address,
4328 add_address.priority,
4329 now,
4330 ) {
4331 Ok(()) => {
4332 trace!("Added remote candidate: {} (seq={}, priority={})",
4333 add_address.address, add_address.sequence, add_address.priority);
4334
4335 self.trigger_candidate_validation(add_address.address, now)?;
4337 Ok(())
4338 }
4339 Err(NatTraversalError::TooManyCandidates) => {
4340 Err(TransportError::PROTOCOL_VIOLATION("too many NAT traversal candidates"))
4341 }
4342 Err(NatTraversalError::DuplicateAddress) => {
4343 Ok(())
4345 }
4346 Err(e) => {
4347 warn!("Failed to add remote candidate: {}", e);
4348 Ok(()) }
4350 }
4351 }
4352
4353 fn handle_punch_me_now(
4355 &mut self,
4356 punch_me_now: &crate::frame::PunchMeNow,
4357 now: Instant,
4358 ) -> Result<(), TransportError> {
4359 trace!("Received PunchMeNow: round={}, target_seq={}, local_addr={}",
4360 punch_me_now.round, punch_me_now.target_sequence, punch_me_now.local_address);
4361
4362 if let Some(nat_state) = &self.nat_traversal {
4364 if matches!(nat_state.role, NatTraversalRole::Bootstrap) {
4365 let from_peer_id = self.derive_peer_id_from_connection();
4367
4368 let punch_me_now_clone = punch_me_now.clone();
4370 drop(nat_state); match self.nat_traversal.as_mut().unwrap().handle_punch_me_now_frame(from_peer_id, self.path.remote, &punch_me_now_clone, now) {
4373 Ok(Some(coordination_frame)) => {
4374 trace!("Bootstrap node coordinating PUNCH_ME_NOW between peers");
4375
4376 if let Some(target_peer_id) = punch_me_now.target_peer_id {
4378 self.endpoint_events.push_back(
4379 crate::shared::EndpointEventInner::RelayPunchMeNow(target_peer_id, coordination_frame)
4380 );
4381 }
4382
4383 return Ok(());
4384 }
4385 Ok(None) => {
4386 trace!("Bootstrap coordination completed or no action needed");
4387 return Ok(());
4388 }
4389 Err(e) => {
4390 warn!("Bootstrap coordination failed: {}", e);
4391 return Ok(());
4392 }
4393 }
4394 }
4395 }
4396
4397 let nat_state = self.nat_traversal.as_mut()
4399 .ok_or_else(|| TransportError::PROTOCOL_VIOLATION("PunchMeNow frame without NAT traversal negotiation"))?;
4400
4401 if nat_state.handle_peer_punch_request(punch_me_now.round, now)
4403 .map_err(|_e| TransportError::PROTOCOL_VIOLATION("Failed to handle peer punch request"))? {
4404 trace!("Coordination synchronized for round {}", punch_me_now.round);
4405
4406 let _local_addr = self.local_ip
4409 .map(|ip| SocketAddr::new(ip, 0))
4410 .unwrap_or_else(|| SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED), 0));
4411
4412 let target = nat_traversal::PunchTarget {
4413 remote_addr: punch_me_now.local_address,
4414 remote_sequence: punch_me_now.target_sequence,
4415 challenge: self.rng.gen(),
4416 };
4417
4418 nat_state.start_coordination_round(vec![target], now);
4420
4421 } else {
4422 debug!("Failed to synchronize coordination for round {}", punch_me_now.round);
4423 }
4424
4425 Ok(())
4426 }
4427
4428 fn handle_remove_address(
4430 &mut self,
4431 remove_address: &crate::frame::RemoveAddress,
4432 ) -> Result<(), TransportError> {
4433 let nat_state = self.nat_traversal.as_mut()
4434 .ok_or_else(|| TransportError::PROTOCOL_VIOLATION("RemoveAddress frame without NAT traversal negotiation"))?;
4435
4436 if nat_state.remove_candidate(remove_address.sequence) {
4437 trace!("Removed candidate with sequence {}", remove_address.sequence);
4438 } else {
4439 trace!("Attempted to remove unknown candidate sequence {}", remove_address.sequence);
4440 }
4441
4442 Ok(())
4443 }
4444
4445 pub fn queue_add_address(&mut self, sequence: VarInt, address: SocketAddr, priority: VarInt) {
4447 let add_address = frame::AddAddress {
4449 sequence,
4450 address,
4451 priority,
4452 };
4453
4454 self.spaces[SpaceId::Data].pending.add_addresses.push(add_address);
4455 trace!("Queued AddAddress frame: seq={}, addr={}, priority={}", sequence, address, priority);
4456 }
4457
4458 pub fn queue_punch_me_now(&mut self, round: VarInt, target_sequence: VarInt, local_address: SocketAddr) {
4460 let punch_me_now = frame::PunchMeNow {
4461 round,
4462 target_sequence,
4463 local_address,
4464 target_peer_id: None, };
4466
4467 self.spaces[SpaceId::Data].pending.punch_me_now.push(punch_me_now);
4468 trace!("Queued PunchMeNow frame: round={}, target={}", round, target_sequence);
4469 }
4470
4471 pub fn queue_remove_address(&mut self, sequence: VarInt) {
4473 let remove_address = frame::RemoveAddress { sequence };
4474
4475 self.spaces[SpaceId::Data].pending.remove_addresses.push(remove_address);
4476 trace!("Queued RemoveAddress frame: seq={}", sequence);
4477 }
4478
4479 fn trigger_candidate_validation(
4481 &mut self,
4482 candidate_address: SocketAddr,
4483 now: Instant,
4484 ) -> Result<(), TransportError> {
4485 let nat_state = self.nat_traversal.as_mut()
4486 .ok_or_else(|| TransportError::PROTOCOL_VIOLATION("NAT traversal not enabled"))?;
4487
4488 if nat_state.active_validations.contains_key(&candidate_address) {
4490 trace!("Validation already in progress for {}", candidate_address);
4491 return Ok(());
4492 }
4493
4494 let challenge = self.rng.gen::<u64>();
4496
4497 let validation_state = nat_traversal::PathValidationState {
4499 challenge,
4500 sent_at: now,
4501 retry_count: 0,
4502 max_retries: 3,
4503 coordination_round: None,
4504 timeout_state: nat_traversal::AdaptiveTimeoutState::new(),
4505 last_retry_at: None,
4506 };
4507
4508 nat_state.active_validations.insert(candidate_address, validation_state);
4510
4511 self.nat_traversal_challenges.push(candidate_address, challenge);
4513
4514 nat_state.stats.validations_succeeded += 1; trace!("Triggered PATH_CHALLENGE validation for {} with challenge {:016x}",
4518 candidate_address, challenge);
4519
4520 Ok(())
4521 }
4522
4523
4524
4525
4526 pub fn nat_traversal_state(&self) -> Option<(NatTraversalRole, usize, usize)> {
4528 self.nat_traversal.as_ref().map(|state| {
4529 (
4530 state.role,
4531 state.local_candidates.len(),
4532 state.remote_candidates.len(),
4533 )
4534 })
4535 }
4536
4537 pub fn initiate_nat_traversal_coordination(&mut self, now: Instant) -> Result<(), TransportError> {
4539 let nat_state = self.nat_traversal.as_mut()
4540 .ok_or_else(|| TransportError::PROTOCOL_VIOLATION("NAT traversal not enabled"))?;
4541
4542 if nat_state.should_send_punch_request() {
4544 nat_state.generate_candidate_pairs(now);
4546
4547 let pairs = nat_state.get_next_validation_pairs(3);
4549 if pairs.is_empty() {
4550 return Err(TransportError::PROTOCOL_VIOLATION("No candidate pairs for coordination"));
4551 }
4552
4553 let targets: Vec<_> = pairs.into_iter()
4555 .map(|pair| nat_traversal::PunchTarget {
4556 remote_addr: pair.remote_addr,
4557 remote_sequence: pair.remote_sequence,
4558 challenge: self.rng.gen(),
4559 })
4560 .collect();
4561
4562 let round = nat_state.start_coordination_round(targets, now)
4564 .map_err(|_e| TransportError::PROTOCOL_VIOLATION("Failed to start coordination round"))?;
4565
4566 let local_addr = self.local_ip
4569 .map(|ip| SocketAddr::new(ip, self.local_ip.map(|_| 0).unwrap_or(0)))
4570 .unwrap_or_else(|| SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED), 0));
4571
4572 let punch_me_now = frame::PunchMeNow {
4573 round,
4574 target_sequence: VarInt::from_u32(0), local_address: local_addr,
4576 target_peer_id: None, };
4578
4579 self.spaces[SpaceId::Data].pending.punch_me_now.push(punch_me_now);
4580 nat_state.mark_punch_request_sent();
4581
4582 trace!("Initiated NAT traversal coordination round {}", round);
4583 }
4584
4585 Ok(())
4586 }
4587
4588 pub fn validate_nat_candidates(&mut self, now: Instant) {
4590 self.generate_nat_traversal_challenges(now);
4591 }
4592
4593 pub fn send_nat_address_advertisement(
4608 &mut self,
4609 address: SocketAddr,
4610 priority: u32
4611 ) -> Result<u64, ConnectionError> {
4612 let nat_state = self.nat_traversal.as_mut()
4614 .ok_or_else(|| ConnectionError::TransportError(TransportError::PROTOCOL_VIOLATION("NAT traversal not enabled on this connection")))?;
4615
4616 let sequence = nat_state.next_sequence;
4618 nat_state.next_sequence = VarInt::from_u64(nat_state.next_sequence.into_inner() + 1).unwrap();
4619
4620 let now = Instant::now();
4622 nat_state.local_candidates.insert(
4623 sequence,
4624 nat_traversal::AddressCandidate {
4625 address,
4626 priority,
4627 source: nat_traversal::CandidateSource::Local,
4628 discovered_at: now,
4629 state: nat_traversal::CandidateState::New,
4630 attempt_count: 0,
4631 last_attempt: None,
4632 }
4633 );
4634
4635 nat_state.stats.local_candidates_sent += 1;
4637
4638 self.queue_add_address(sequence, address, VarInt::from_u32(priority));
4640
4641 debug!("Queued ADD_ADDRESS frame: addr={}, priority={}, seq={}", address, priority, sequence);
4642 Ok(sequence.into_inner())
4643 }
4644
4645 pub fn send_nat_punch_coordination(
4658 &mut self,
4659 target_sequence: u64,
4660 local_address: SocketAddr,
4661 round: u32,
4662 ) -> Result<(), ConnectionError> {
4663 let _nat_state = self.nat_traversal.as_ref()
4665 .ok_or_else(|| ConnectionError::TransportError(TransportError::PROTOCOL_VIOLATION("NAT traversal not enabled on this connection")))?;
4666
4667 self.queue_punch_me_now(
4669 VarInt::from_u32(round),
4670 VarInt::from_u64(target_sequence).map_err(|_| {
4671 ConnectionError::TransportError(TransportError::PROTOCOL_VIOLATION("Invalid target sequence number"))
4672 })?,
4673 local_address,
4674 );
4675
4676 debug!("Queued PUNCH_ME_NOW frame: target_seq={}, local_addr={}, round={}",
4677 target_sequence, local_address, round);
4678 Ok(())
4679 }
4680
4681 pub fn send_nat_address_removal(
4692 &mut self,
4693 sequence: u64,
4694 ) -> Result<(), ConnectionError> {
4695 let nat_state = self.nat_traversal.as_mut()
4697 .ok_or_else(|| ConnectionError::TransportError(TransportError::PROTOCOL_VIOLATION("NAT traversal not enabled on this connection")))?;
4698
4699 let sequence_varint = VarInt::from_u64(sequence).map_err(|_| {
4700 ConnectionError::TransportError(TransportError::PROTOCOL_VIOLATION("Invalid sequence number"))
4701 })?;
4702
4703 nat_state.local_candidates.remove(&sequence_varint);
4705
4706 self.queue_remove_address(sequence_varint);
4708
4709 debug!("Queued REMOVE_ADDRESS frame: seq={}", sequence);
4710 Ok(())
4711 }
4712
4713 #[allow(dead_code)]
4722 pub(crate) fn get_nat_traversal_stats(&self) -> Option<&nat_traversal::NatTraversalStats> {
4723 self.nat_traversal.as_ref().map(|state| &state.stats)
4724 }
4725
4726 pub fn is_nat_traversal_enabled(&self) -> bool {
4728 self.nat_traversal.is_some()
4729 }
4730
4731 pub fn get_nat_traversal_role(&self) -> Option<NatTraversalRole> {
4733 self.nat_traversal.as_ref().map(|state| state.role)
4734 }
4735
4736
4737 fn decrypt_packet(
4738 &mut self,
4739 now: Instant,
4740 packet: &mut Packet,
4741 ) -> Result<Option<u64>, Option<TransportError>> {
4742 let result = packet_crypto::decrypt_packet_body(
4743 packet,
4744 &self.spaces,
4745 self.zero_rtt_crypto.as_ref(),
4746 self.key_phase,
4747 self.prev_crypto.as_ref(),
4748 self.next_crypto.as_ref(),
4749 )?;
4750
4751 let result = match result {
4752 Some(r) => r,
4753 None => return Ok(None),
4754 };
4755
4756 if result.outgoing_key_update_acked {
4757 if let Some(prev) = self.prev_crypto.as_mut() {
4758 prev.end_packet = Some((result.number, now));
4759 self.set_key_discard_timer(now, packet.header.space());
4760 }
4761 }
4762
4763 if result.incoming_key_update {
4764 trace!("key update authenticated");
4765 self.update_keys(Some((result.number, now)), true);
4766 self.set_key_discard_timer(now, packet.header.space());
4767 }
4768
4769 Ok(Some(result.number))
4770 }
4771
4772 fn update_keys(&mut self, end_packet: Option<(u64, Instant)>, remote: bool) {
4773 trace!("executing key update");
4774 let new = self
4778 .crypto
4779 .next_1rtt_keys()
4780 .expect("only called for `Data` packets");
4781 self.key_phase_size = new
4782 .local
4783 .confidentiality_limit()
4784 .saturating_sub(KEY_UPDATE_MARGIN);
4785 let old = mem::replace(
4786 &mut self.spaces[SpaceId::Data]
4787 .crypto
4788 .as_mut()
4789 .unwrap() .packet,
4791 mem::replace(self.next_crypto.as_mut().unwrap(), new),
4792 );
4793 self.spaces[SpaceId::Data].sent_with_keys = 0;
4794 self.prev_crypto = Some(PrevCrypto {
4795 crypto: old,
4796 end_packet,
4797 update_unacked: remote,
4798 });
4799 self.key_phase = !self.key_phase;
4800 }
4801
4802 fn peer_supports_ack_frequency(&self) -> bool {
4803 self.peer_params.min_ack_delay.is_some()
4804 }
4805
4806 pub(crate) fn immediate_ack(&mut self) {
4811 self.spaces[self.highest_space].immediate_ack_pending = true;
4812 }
4813
/// Test helper: decrypts the single packet carried by a datagram event and returns its
/// payload bytes.
///
/// Returns `None` for non-datagram events and whenever header unprotection or body
/// decryption does not produce a packet.
///
/// # Panics
///
/// Panics if the datagram carries coalesced packets.
#[cfg(test)]
pub(crate) fn decode_packet(&self, event: &ConnectionEvent) -> Option<Vec<u8>> {
    let ConnectionEventInner::Datagram(DatagramConnectionEvent {
        first_decode,
        remaining,
        ..
    }) = &event.0
    else {
        return None;
    };

    assert!(remaining.is_none(), "Packets should never be coalesced in tests");

    let decrypted_header = packet_crypto::unprotect_header(
        first_decode.clone(),
        &self.spaces,
        self.zero_rtt_crypto.as_ref(),
        self.peer_params.stateless_reset_token,
    )?;
    let mut packet = decrypted_header.packet?;

    let decrypted = packet_crypto::decrypt_packet_body(
        &mut packet,
        &self.spaces,
        self.zero_rtt_crypto.as_ref(),
        self.key_phase,
        self.prev_crypto.as_ref(),
        self.next_crypto.as_ref(),
    );
    decrypted.ok()?;

    Some(packet.payload.to_vec())
}
4850
/// Test helper: number of bytes currently counted as in flight on the active path.
#[cfg(test)]
pub(crate) fn bytes_in_flight(&self) -> u64 {
    let in_flight = &self.path.in_flight;
    in_flight.bytes
}
4857
/// Test helper: remaining congestion window, i.e. the controller's window minus the bytes
/// in flight, saturating at zero.
#[cfg(test)]
pub(crate) fn congestion_window(&self) -> u64 {
    let window = self.path.congestion.window();
    let in_flight = self.path.in_flight.bytes;
    window.saturating_sub(in_flight)
}
4866
/// Test helper: whether the connection is waiting on nothing but the idle timer.
///
/// Ignoring the keep-alive, push-new-CID and key-discard timers, this is true when no timer
/// is armed or when the earliest armed timer is the idle timer.
#[cfg(test)]
pub(crate) fn is_idle(&self) -> bool {
    let earliest = Timer::VALUES
        .iter()
        .copied()
        .filter(|t| !matches!(t, Timer::KeepAlive | Timer::PushNewCid | Timer::KeyDiscard))
        .filter_map(|t| self.timers.get(t).map(|time| (t, time)))
        .min_by_key(|&(_, time)| time);

    match earliest {
        None => true,
        Some((timer, _)) => timer == Timer::Idle,
    }
}
4877
/// Test helper: value of the connection's lost-packet counter.
#[cfg(test)]
pub(crate) fn lost_packets(&self) -> u64 {
    let Self { lost_packets, .. } = self;
    *lost_packets
}
4883
/// Test helper: whether the active path is currently sending with ECN.
#[cfg(test)]
pub(crate) fn using_ecn(&self) -> bool {
    let path = &self.path;
    path.sending_ecn
}
4889
/// Test helper: the active path's `total_recvd` counter.
#[cfg(test)]
pub(crate) fn total_recvd(&self) -> u64 {
    let path = &self.path;
    path.total_recvd
}
4895
/// Test helper: the pair of sequence numbers reported by the local CID state's
/// `active_seq()`.
#[cfg(test)]
pub(crate) fn active_local_cid_seq(&self) -> (u64, u64) {
    let cid_state = &self.local_cid_state;
    cid_state.active_seq()
}
4900
/// Test helper: assigns retire sequence `v` in the local CID state and queues a
/// `NeedIdentifiers` endpoint event carrying the count that call returned.
#[cfg(test)]
pub(crate) fn rotate_local_cid(&mut self, v: u64, now: Instant) {
    let needed = self.local_cid_state.assign_retire_seq(v);
    let request = EndpointEventInner::NeedIdentifiers(now, needed);
    self.endpoint_events.push_back(request);
}
4909
/// Test helper: sequence number of the remote CID currently in use.
#[cfg(test)]
pub(crate) fn active_rem_cid_seq(&self) -> u64 {
    let remote_cids = &self.rem_cids;
    remote_cids.active_seq()
}
4915
/// Test helper: current MTU of the active path.
#[cfg(test)]
pub(crate) fn path_mtu(&self) -> u16 {
    let path = &self.path;
    path.current_mtu()
}
4921
4922 fn can_send_1rtt(&self, max_size: usize) -> bool {
4926 self.streams.can_send_stream_data()
4927 || self.path.challenge_pending
4928 || self
4929 .prev_path
4930 .as_ref()
4931 .is_some_and(|(_, x)| x.challenge_pending)
4932 || !self.path_responses.is_empty()
4933 || !self.nat_traversal_challenges.is_empty()
4934 || self
4935 .datagrams
4936 .outgoing
4937 .front()
4938 .is_some_and(|x| x.size(true) <= max_size)
4939 }
4940
4941 fn remove_in_flight(&mut self, pn: u64, packet: &SentPacket) {
4943 for path in [&mut self.path]
4945 .into_iter()
4946 .chain(self.prev_path.as_mut().map(|(_, data)| data))
4947 {
4948 if path.remove_in_flight(pn, packet) {
4949 return;
4950 }
4951 }
4952 }
4953
4954 fn kill(&mut self, reason: ConnectionError) {
4956 self.close_common();
4957 self.error = Some(reason);
4958 self.state = State::Drained;
4959 self.endpoint_events.push_back(EndpointEventInner::Drained);
4960 }
4961
4962 fn generate_nat_traversal_challenges(&mut self, now: Instant) {
4964 let candidates: Vec<(VarInt, SocketAddr)> = if let Some(nat_state) = &self.nat_traversal {
4966 nat_state.get_validation_candidates()
4967 .into_iter()
4968 .take(3) .map(|(seq, candidate)| (seq, candidate.address))
4970 .collect()
4971 } else {
4972 return;
4973 };
4974
4975 if candidates.is_empty() {
4976 return;
4977 }
4978
4979 if let Some(nat_state) = &mut self.nat_traversal {
4981 for (seq, address) in candidates {
4982 let challenge: u64 = self.rng.gen();
4984
4985 if let Err(e) = nat_state.start_validation(seq, challenge, now) {
4987 debug!("Failed to start validation for candidate {}: {}", seq, e);
4988 continue;
4989 }
4990
4991 self.nat_traversal_challenges.push(address, challenge);
4993 trace!("Queuing NAT validation PATH_CHALLENGE for {} with token {:08x}",
4994 address, challenge);
4995 }
4996 }
4997 }
4998
4999 pub fn current_mtu(&self) -> u16 {
5003 self.path.current_mtu()
5004 }
5005
5006 fn predict_1rtt_overhead(&self, pn: Option<u64>) -> usize {
5013 let pn_len = match pn {
5014 Some(pn) => PacketNumber::new(
5015 pn,
5016 self.spaces[SpaceId::Data].largest_acked_packet.unwrap_or(0),
5017 )
5018 .len(),
5019 None => 4,
5021 };
5022
5023 1 + self.rem_cids.active().len() + pn_len + self.tag_len_1rtt()
5025 }
5026
5027 fn tag_len_1rtt(&self) -> usize {
5028 let key = match self.spaces[SpaceId::Data].crypto.as_ref() {
5029 Some(crypto) => Some(&*crypto.packet.local),
5030 None => self.zero_rtt_crypto.as_ref().map(|x| &*x.packet),
5031 };
5032 key.map_or(16, |x| x.tag_len())
5036 }
5037
5038 fn on_path_validated(&mut self) {
5040 self.path.validated = true;
5041 let ConnectionSide::Server { server_config } = &self.side else {
5042 return;
5043 };
5044 let new_tokens = &mut self.spaces[SpaceId::Data as usize].pending.new_tokens;
5045 new_tokens.clear();
5046 for _ in 0..server_config.validation_token.sent {
5047 new_tokens.push(self.path.remote);
5048 }
5049 }
5050}
5051
5052impl fmt::Debug for Connection {
5053 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
5054 f.debug_struct("Connection")
5055 .field("handshake_cid", &self.handshake_cid)
5056 .finish()
5057 }
5058}
5059
5060enum ConnectionSide {
5062 Client {
5063 token: Bytes,
5065 token_store: Arc<dyn TokenStore>,
5066 server_name: String,
5067 },
5068 Server {
5069 server_config: Arc<ServerConfig>,
5070 },
5071}
5072
5073impl ConnectionSide {
5074 fn remote_may_migrate(&self) -> bool {
5075 match self {
5076 Self::Server { server_config } => server_config.migration,
5077 Self::Client { .. } => false,
5078 }
5079 }
5080
5081 fn is_client(&self) -> bool {
5082 self.side().is_client()
5083 }
5084
5085 fn is_server(&self) -> bool {
5086 self.side().is_server()
5087 }
5088
5089 fn side(&self) -> Side {
5090 match *self {
5091 Self::Client { .. } => Side::Client,
5092 Self::Server { .. } => Side::Server,
5093 }
5094 }
5095}
5096
5097impl From<SideArgs> for ConnectionSide {
5098 fn from(side: SideArgs) -> Self {
5099 match side {
5100 SideArgs::Client {
5101 token_store,
5102 server_name,
5103 } => Self::Client {
5104 token: token_store.take(&server_name).unwrap_or_default(),
5105 token_store,
5106 server_name,
5107 },
5108 SideArgs::Server {
5109 server_config,
5110 pref_addr_cid: _,
5111 path_validated: _,
5112 } => Self::Server { server_config },
5113 }
5114 }
5115}
5116
5117pub(crate) enum SideArgs {
5119 Client {
5120 token_store: Arc<dyn TokenStore>,
5121 server_name: String,
5122 },
5123 Server {
5124 server_config: Arc<ServerConfig>,
5125 pref_addr_cid: Option<ConnectionId>,
5126 path_validated: bool,
5127 },
5128}
5129
5130impl SideArgs {
5131 pub(crate) fn pref_addr_cid(&self) -> Option<ConnectionId> {
5132 match *self {
5133 Self::Client { .. } => None,
5134 Self::Server { pref_addr_cid, .. } => pref_addr_cid,
5135 }
5136 }
5137
5138 pub(crate) fn path_validated(&self) -> bool {
5139 match *self {
5140 Self::Client { .. } => true,
5141 Self::Server { path_validated, .. } => path_validated,
5142 }
5143 }
5144
5145 pub(crate) fn side(&self) -> Side {
5146 match *self {
5147 Self::Client { .. } => Side::Client,
5148 Self::Server { .. } => Side::Server,
5149 }
5150 }
5151}
5152
/// Reasons a connection can end, as surfaced to users of this module.
///
/// Display text for each variant comes from its `#[error]` attribute.
#[derive(Debug, Error, Clone, PartialEq, Eq)]
pub enum ConnectionError {
    /// The peer doesn't implement any supported version.
    #[error("peer doesn't implement any supported version")]
    VersionMismatch,
    /// A transport-level error; displayed transparently as the wrapped [`TransportError`],
    /// and convertible from one via `From`.
    #[error(transparent)]
    TransportError(#[from] TransportError),
    /// The connection was aborted by the peer with a connection-level close frame.
    #[error("aborted by peer: {0}")]
    ConnectionClosed(frame::ConnectionClose),
    /// The connection was closed by the peer with an application-level close frame.
    #[error("closed by peer: {0}")]
    ApplicationClosed(frame::ApplicationClose),
    /// The connection was reset by the peer.
    #[error("reset by peer")]
    Reset,
    /// The connection timed out.
    #[error("timed out")]
    TimedOut,
    /// The connection was closed.
    /// NOTE(review): the variant name suggests the close was initiated locally — confirm.
    #[error("closed")]
    LocallyClosed,
    /// Connection IDs were exhausted.
    #[error("CIDs exhausted")]
    CidsExhausted,
}
5187
5188impl From<Close> for ConnectionError {
5189 fn from(x: Close) -> Self {
5190 match x {
5191 Close::Connection(reason) => Self::ConnectionClosed(reason),
5192 Close::Application(reason) => Self::ApplicationClosed(reason),
5193 }
5194 }
5195}
5196
5197impl From<ConnectionError> for io::Error {
5199 fn from(x: ConnectionError) -> Self {
5200 use ConnectionError::*;
5201 let kind = match x {
5202 TimedOut => io::ErrorKind::TimedOut,
5203 Reset => io::ErrorKind::ConnectionReset,
5204 ApplicationClosed(_) | ConnectionClosed(_) => io::ErrorKind::ConnectionAborted,
5205 TransportError(_) | VersionMismatch | LocallyClosed | CidsExhausted => {
5206 io::ErrorKind::Other
5207 }
5208 };
5209 Self::new(kind, x)
5210 }
5211}
5212
/// Lifecycle state of a connection.
///
/// The predicates on `State` group the variants: `is_handshake`, `is_established`,
/// `is_closed` (true for `Closed`, `Draining` and `Drained`) and `is_drained`.
// NOTE(review): the `unreachable_pub` allowance suggests this is `pub` only for some
// cfg-gated consumer (the file has `#[cfg(fuzzing)]` re-exports) — confirm.
#[allow(unreachable_pub)] #[derive(Clone)]
pub enum State {
    /// Handshake phase; carries the handshake bookkeeping in `state::Handshake`.
    Handshake(state::Handshake),
    /// The only state for which `is_established()` returns true.
    /// NOTE(review): presumably entered once the handshake completes — confirm.
    Established,
    /// Closed with a recorded `Close` reason (see `State::closed`).
    Closed(state::Closed),
    /// Counted as closed by `is_closed()`.
    /// NOTE(review): presumably the draining period after the peer closed — confirm.
    Draining,
    /// Counted as closed by `is_closed()`; the only state for which `is_drained()` is true.
    /// NOTE(review): presumably terminal, awaiting resource disposal — confirm.
    Drained,
}
5223
5224impl State {
5225 fn closed<R: Into<Close>>(reason: R) -> Self {
5226 Self::Closed(state::Closed {
5227 reason: reason.into(),
5228 })
5229 }
5230
5231 fn is_handshake(&self) -> bool {
5232 matches!(*self, Self::Handshake(_))
5233 }
5234
5235 fn is_established(&self) -> bool {
5236 matches!(*self, Self::Established)
5237 }
5238
5239 fn is_closed(&self) -> bool {
5240 matches!(*self, Self::Closed(_) | Self::Draining | Self::Drained)
5241 }
5242
5243 fn is_drained(&self) -> bool {
5244 matches!(*self, Self::Drained)
5245 }
5246}
5247
/// Payload types for the data-carrying `State` variants.
mod state {
    use super::*;

    /// Data carried by `State::Handshake`.
    #[allow(unreachable_pub)] #[derive(Clone)]
    pub struct Handshake {
        /// NOTE(review): presumably whether the remote connection ID has been set yet —
        /// confirm at the use sites.
        pub(super) rem_cid_set: bool,
        /// NOTE(review): presumably the token that incoming Initial packets are expected to
        /// carry — confirm at the use sites.
        pub(super) expected_token: Bytes,
        /// NOTE(review): presumably the first handshake message, kept for possible
        /// retransmission, when present — confirm at the use sites.
        pub(super) client_hello: Option<Bytes>,
    }

    /// Data carried by `State::Closed`.
    #[allow(unreachable_pub)] #[derive(Clone)]
    pub struct Closed {
        /// The close reason recorded when the state was created (see `State::closed`).
        pub(super) reason: Close,
    }
}
5274
/// Events produced by a connection.
/// NOTE(review): presumably the application-facing event stream — confirm where polled.
#[derive(Debug)]
pub enum Event {
    /// NOTE(review): presumably signals that handshake data became available — confirm.
    HandshakeDataReady,
    /// NOTE(review): presumably signals that the handshake completed — confirm.
    Connected,
    /// The connection was lost.
    ConnectionLost {
        /// Why the connection was lost.
        reason: ConnectionError,
    },
    /// A stream-related event; see `StreamEvent` for the individual cases.
    Stream(StreamEvent),
    /// NOTE(review): presumably signals that at least one datagram is ready to be read —
    /// confirm.
    DatagramReceived,
    /// NOTE(review): presumably signals that datagram sending is possible again after being
    /// blocked — confirm.
    DatagramsUnblocked,
}
5296
/// Returns the time elapsed from `y` to `x`, or `Duration::ZERO` when `x` is not later than
/// `y`.
fn instant_saturating_sub(x: Instant, y: Instant) -> Duration {
    x.saturating_duration_since(y)
}
5300
5301fn get_max_ack_delay(params: &TransportParameters) -> Duration {
5302 Duration::from_micros(params.max_ack_delay.0 * 1000)
5303}
5304
/// Upper bound on a backoff exponent.
// NOTE(review): presumably caps exponential timer backoff so the scaling cannot overflow —
// confirm at the use sites.
const MAX_BACKOFF_EXPONENT: u32 = 16;
5307
/// `MAX_HANDSHAKE_OR_0RTT_HEADER_SIZE` plus 32 bytes.
// NOTE(review): presumably the minimum space that must remain in a datagram before another
// packet is started in it; the meaning of the extra 32 bytes is not visible here — confirm.
const MIN_PACKET_SPACE: usize = MAX_HANDSHAKE_OR_0RTT_HEADER_SIZE + 32;
5316
/// Upper bound, in bytes, on the size of a Handshake or 0-RTT packet header.
///
/// The terms appear to follow the RFC 9000 long-header layout, in order: first byte (1),
/// version (4), destination CID length (1), destination CID (`MAX_CID_SIZE`), source CID
/// length (1), source CID (`MAX_CID_SIZE`), the Length field sized as the varint encoding of
/// `u16::MAX`, and the packet number (4).
const MAX_HANDSHAKE_OR_0RTT_HEADER_SIZE: usize =
    1 + 4 + 1 + MAX_CID_SIZE + 1 + MAX_CID_SIZE + VarInt::from_u32(u16::MAX as u32).size() + 4;
5324
/// Safety margin used around key updates.
// NOTE(review): presumably a packet count: a key update is initiated this many packets
// before some usage limit of the current keys is reached — confirm at the use sites.
const KEY_UPDATE_MARGIN: u64 = 10_000;
5329
/// Record of what kinds of frames were sent.
/// NOTE(review): presumably filled in per outgoing packet while it is being built — confirm.
#[derive(Default)]
struct SentFrames {
    /// Retransmittable content that was included; must be empty for `is_ack_only`.
    retransmits: ThinRetransmits,
    /// `Some` is a precondition of `is_ack_only`.
    /// NOTE(review): presumably the largest packet number acknowledged by an ACK frame that
    /// was sent — confirm.
    largest_acked: Option<u64>,
    /// Metadata of the stream frames that were included; must be empty for `is_ack_only`.
    stream_frames: StreamMetaVec,
    /// Must be `false` for `is_ack_only`.
    /// NOTE(review): presumably set when frames that are never retransmitted were
    /// included — confirm.
    non_retransmits: bool,
    /// NOTE(review): presumably marks that the packet has to be padded — confirm at the use
    /// sites; not read within this part of the file.
    requires_padding: bool,
}
5339
5340impl SentFrames {
5341 fn is_ack_only(&self, streams: &StreamsState) -> bool {
5343 self.largest_acked.is_some()
5344 && !self.non_retransmits
5345 && self.stream_frames.is_empty()
5346 && self.retransmits.is_empty(streams)
5347 }
5348}
5349
5350fn negotiate_max_idle_timeout(x: Option<VarInt>, y: Option<VarInt>) -> Option<Duration> {
5358 match (x, y) {
5359 (Some(VarInt(0)) | None, Some(VarInt(0)) | None) => None,
5360 (Some(VarInt(0)) | None, Some(y)) => Some(Duration::from_millis(y.0)),
5361 (Some(x), Some(VarInt(0)) | None) => Some(Duration::from_millis(x.0)),
5362 (Some(x), Some(y)) => Some(Duration::from_millis(cmp::min(x, y).0)),
5363 }
5364}
5365
#[cfg(test)]
mod tests {
    use super::*;

    /// Runs every case with the arguments in both orders: the negotiated timeout must not
    /// depend on which value is passed first.
    #[test]
    fn negotiate_max_idle_timeout_commutative() {
        let raw = |ms: u64| Some(VarInt(ms));
        let negotiated = |ms: u64| Some(Duration::from_millis(ms));

        let cases = [
            (None, None, None),
            (None, raw(0), None),
            (None, raw(2), negotiated(2)),
            (raw(0), raw(0), None),
            (raw(2), raw(0), negotiated(2)),
            (raw(1), raw(4), negotiated(1)),
        ];

        for (a, b, expected) in cases {
            assert_eq!(negotiate_max_idle_timeout(a, b), expected);
            assert_eq!(negotiate_max_idle_timeout(b, a), expected);
        }
    }
}