1use std::{
2 cmp,
3 collections::VecDeque,
4 convert::TryFrom,
5 fmt, io, mem,
6 net::{IpAddr, SocketAddr},
7 sync::Arc,
8};
9
10use bytes::{Bytes, BytesMut};
11use frame::StreamMetaVec;
12#[cfg(feature = "__qlog")]
13use qlog::{
14 Configuration, QLOG_VERSION, TraceSeq, VantagePoint, VantagePointType, events::EventData,
15 events::EventImportance, streamer::QlogStreamer,
16};
17
18use rand::{Rng, SeedableRng, rngs::StdRng};
19use thiserror::Error;
20use tracing::{debug, error, trace, trace_span, warn};
21
22use crate::{
23 Dir, Duration, EndpointConfig, Frame, INITIAL_MTU, Instant, MAX_CID_SIZE, MAX_STREAM_COUNT,
24 MIN_INITIAL_SIZE, Side, StreamId, TIMER_GRANULARITY, TokenStore, Transmit, TransportError,
25 TransportErrorCode, VarInt,
26 cid_generator::ConnectionIdGenerator,
27 cid_queue::CidQueue,
28 coding::BufMutExt,
29 config::{ServerConfig, TransportConfig},
30 crypto::{self, KeyPair, Keys, PacketKey},
31 frame::{self, Close, Datagram, FrameStruct, NewToken},
32 packet::{
33 FixedLengthConnectionIdParser, Header, InitialHeader, InitialPacket, LongType, Packet,
34 PacketNumber, PartialDecode, SpaceId,
35 },
36 range_set::ArrayRangeSet,
37 shared::{
38 ConnectionEvent, ConnectionEventInner, ConnectionId, DatagramConnectionEvent, EcnCodepoint,
39 EndpointEvent, EndpointEventInner,
40 },
41 token::{ResetToken, Token, TokenPayload},
42 transport_parameters::TransportParameters,
43};
44
45mod ack_frequency;
46use ack_frequency::AckFrequencyState;
47
48pub mod nat_traversal;
49pub use nat_traversal::{NatTraversalRole, NatTraversalError, CoordinationPhase};
50use nat_traversal::NatTraversalState;
51
52mod assembler;
53pub use assembler::Chunk;
54
55mod cid_state;
56use cid_state::CidState;
57
58mod datagrams;
59use datagrams::DatagramState;
60pub use datagrams::{Datagrams, SendDatagramError};
61
62mod mtud;
63mod pacing;
64
65mod packet_builder;
66use packet_builder::PacketBuilder;
67
68mod packet_crypto;
69use packet_crypto::{PrevCrypto, ZeroRttCrypto};
70
71mod paths;
72pub use paths::RttEstimator;
73use paths::{PathData, PathResponses};
74
75mod send_buffer;
76
77mod spaces;
78#[cfg(fuzzing)]
79pub use spaces::Retransmits;
80#[cfg(not(fuzzing))]
81use spaces::Retransmits;
82use spaces::{PacketNumberFilter, PacketSpace, SendableFrames, SentPacket, ThinRetransmits};
83
84mod stats;
85pub use stats::{ConnectionStats, FrameStats, PathStats, UdpStats};
86
87mod streams;
88#[cfg(fuzzing)]
89pub use streams::StreamsState;
90#[cfg(not(fuzzing))]
91use streams::StreamsState;
92pub use streams::{
93 Chunks, ClosedStream, FinishError, ReadError, ReadableError, RecvStream, SendStream,
94 ShouldTransmit, StreamEvent, Streams, WriteError, Written,
95};
96
97mod timer;
98use crate::congestion::Controller;
99use timer::{Timer, TimerTable};
100
/// Protocol state and logic for a single QUIC connection.
///
/// The owner drives the state machine through [`Connection::handle_event`] and
/// [`Connection::handle_timeout`], and drains its outputs through
/// [`Connection::poll_transmit`], [`Connection::poll_timeout`], [`Connection::poll`] and
/// [`Connection::poll_endpoint_events`].
pub struct Connection {
    /// Endpoint-wide configuration shared with the owning endpoint.
    endpoint_config: Arc<EndpointConfig>,
    /// Transport configuration (segmentation offload, ACK frequency, MTU padding, ...).
    config: Arc<TransportConfig>,
    /// Per-connection random number generator, seeded by the caller of `new`.
    rng: StdRng,
    /// Cryptographic session; also used to derive the Initial keys in `new`.
    crypto: Box<dyn crypto::Session>,
    /// Initialized from the `loc_cid` argument of `new`.
    handshake_cid: ConnectionId,
    /// Initialized from the `rem_cid` argument of `new`.
    rem_handshake_cid: ConnectionId,
    /// Local IP address, reported as `src_ip` on every produced `Transmit`.
    local_ip: Option<IpAddr>,
    /// The network path currently in use; ordinary transmits go to `path.remote`.
    path: PathData,
    /// Copy of the `allow_mtud` flag that was passed to `PathData::new`.
    allow_mtud: bool,
    /// Previous path (and the CID used on it); challenged by `send_path_challenge` and
    /// restored as the active path when the `PathValidation` timer fires.
    prev_path: Option<(ConnectionId, PathData)>,
    /// Current connection state (handshake, closed, draining, drained, ...).
    state: State,
    /// Which side of the connection this is, plus side-specific data.
    side: ConnectionSide,
    /// Starts out `false`. NOTE(review): presumably set once 0-RTT becomes usable — confirm.
    zero_rtt_enabled: bool,
    /// 0-RTT keys, if any; dropped when the `KeyDiscard` timer fires.
    zero_rtt_crypto: Option<ZeroRttCrypto>,
    /// Starts out `false`. NOTE(review): presumably the current key phase bit — confirm.
    key_phase: bool,
    /// Initialized to a random value in `10..1000`.
    key_phase_size: u64,
    /// Transport parameters of the peer; defaults until replaced elsewhere.
    peer_params: TransportParameters,
    /// Initialized from the `rem_cid` argument of `new`.
    orig_rem_cid: ConnectionId,
    /// Initialized from the `init_cid` argument of `new`.
    initial_dst_cid: ConnectionId,
    /// Starts out `None`.
    retry_src_cid: Option<ConnectionId>,
    /// Starts at zero. NOTE(review): presumably a running count of lost packets — confirm.
    lost_packets: u64,
    /// Application-facing events, drained by `poll`.
    events: VecDeque<Event>,
    /// Endpoint-facing events, drained by `poll_endpoint_events`.
    endpoint_events: VecDeque<EndpointEventInner>,
    /// Enabled with probability 7/8 when `config.allow_spin` is set.
    spin_enabled: bool,
    /// Starts out `false`.
    spin: bool,
    /// One packet number space each; indexed by `SpaceId` (Initial, Handshake, Data).
    spaces: [PacketSpace; 3],
    /// Starts at `SpaceId::Initial`; a CONNECTION_CLOSE sent in this space is the last one.
    highest_space: SpaceId,
    /// Dropped when the `KeyDiscard` timer fires.
    prev_crypto: Option<PrevCrypto>,
    /// Starts out `None`.
    next_crypto: Option<KeyPair<Box<dyn PacketKey>>>,
    /// Starts out `false`.
    accepted_0rtt: bool,
    /// Starts out `true`.
    permit_idle_reset: bool,
    /// Derived from `config.max_idle_timeout` (milliseconds); `None` when unset or zero.
    idle_timeout: Option<Duration>,
    /// Deadlines of all connection timers.
    timers: TimerTable,
    /// Starts at zero.
    authentication_failures: u64,
    /// Pending fatal error; taken by `poll` and surfaced as `Event::ConnectionLost`.
    error: Option<ConnectionError>,
    /// Packet number filter; `poll_transmit` peeks the next packet number through it.
    packet_number_filter: PacketNumberFilter,

    /// Queued PATH_RESPONSE tokens; off-path ones are popped in `poll_transmit`.
    path_responses: PathResponses,
    /// Whether a CONNECTION_CLOSE still has to be sent; cleared once it was written in
    /// the highest packet space.
    close: bool,

    /// State for the ACK frequency extension.
    ack_frequency: AckFrequencyState,

    /// Starts at zero; passed to the qlog congestion metrics.
    pto_count: u32,

    /// Passed to `populate_acks` when ACK frames are written.
    receiving_ecn: bool,
    /// Starts at zero.
    total_authed_packets: u64,
    /// Set by `poll_transmit` when nothing was written and sending was not blocked by
    /// congestion control or pacing.
    app_limited: bool,

    /// Stream state.
    streams: StreamsState,
    /// Connection IDs issued by the peer; the active one addresses outgoing packets.
    rem_cids: CidQueue,
    /// State of the connection IDs issued locally.
    local_cid_state: CidState,
    /// State of application datagrams.
    datagrams: DatagramState,
    /// Connection statistics.
    stats: ConnectionStats,
    /// QUIC version, as passed to `new`.
    version: u32,

    /// NAT traversal state; `None` until configured elsewhere.
    nat_traversal: Option<NatTraversalState>,

    /// Optional qlog event streamer, installed by `set_qlog`.
    #[cfg(feature = "__qlog")]
    qlog_streamer: Option<QlogStreamer>,
}
254
255impl Connection {
256 pub(crate) fn new(
257 endpoint_config: Arc<EndpointConfig>,
258 config: Arc<TransportConfig>,
259 init_cid: ConnectionId,
260 loc_cid: ConnectionId,
261 rem_cid: ConnectionId,
262 remote: SocketAddr,
263 local_ip: Option<IpAddr>,
264 crypto: Box<dyn crypto::Session>,
265 cid_gen: &dyn ConnectionIdGenerator,
266 now: Instant,
267 version: u32,
268 allow_mtud: bool,
269 rng_seed: [u8; 32],
270 side_args: SideArgs,
271 ) -> Self {
272 let pref_addr_cid = side_args.pref_addr_cid();
273 let path_validated = side_args.path_validated();
274 let connection_side = ConnectionSide::from(side_args);
275 let side = connection_side.side();
276 let initial_space = PacketSpace {
277 crypto: Some(crypto.initial_keys(&init_cid, side)),
278 ..PacketSpace::new(now)
279 };
280 let state = State::Handshake(state::Handshake {
281 rem_cid_set: side.is_server(),
282 expected_token: Bytes::new(),
283 client_hello: None,
284 });
285 let mut rng = StdRng::from_seed(rng_seed);
286 let mut this = Self {
287 endpoint_config,
288 crypto,
289 handshake_cid: loc_cid,
290 rem_handshake_cid: rem_cid,
291 local_cid_state: CidState::new(
292 cid_gen.cid_len(),
293 cid_gen.cid_lifetime(),
294 now,
295 if pref_addr_cid.is_some() { 2 } else { 1 },
296 ),
297 path: PathData::new(remote, allow_mtud, None, now, &config),
298 allow_mtud,
299 local_ip,
300 prev_path: None,
301 state,
302 side: connection_side,
303 zero_rtt_enabled: false,
304 zero_rtt_crypto: None,
305 key_phase: false,
306 key_phase_size: rng.random_range(10..1000),
313 peer_params: TransportParameters::default(),
314 orig_rem_cid: rem_cid,
315 initial_dst_cid: init_cid,
316 retry_src_cid: None,
317 lost_packets: 0,
318 events: VecDeque::new(),
319 endpoint_events: VecDeque::new(),
320 spin_enabled: config.allow_spin && rng.random_ratio(7, 8),
321 spin: false,
322 spaces: [initial_space, PacketSpace::new(now), PacketSpace::new(now)],
323 highest_space: SpaceId::Initial,
324 prev_crypto: None,
325 next_crypto: None,
326 accepted_0rtt: false,
327 permit_idle_reset: true,
328 idle_timeout: match config.max_idle_timeout {
329 None | Some(VarInt(0)) => None,
330 Some(dur) => Some(Duration::from_millis(dur.0)),
331 },
332 timers: TimerTable::default(),
333 authentication_failures: 0,
334 error: None,
335 #[cfg(test)]
336 packet_number_filter: match config.deterministic_packet_numbers {
337 false => PacketNumberFilter::new(&mut rng),
338 true => PacketNumberFilter::disabled(),
339 },
340 #[cfg(not(test))]
341 packet_number_filter: PacketNumberFilter::new(&mut rng),
342
343 path_responses: PathResponses::default(),
344 close: false,
345
346 ack_frequency: AckFrequencyState::new(get_max_ack_delay(
347 &TransportParameters::default(),
348 )),
349
350 pto_count: 0,
351
352 app_limited: false,
353 receiving_ecn: false,
354 total_authed_packets: 0,
355
356 streams: StreamsState::new(
357 side,
358 config.max_concurrent_uni_streams,
359 config.max_concurrent_bidi_streams,
360 config.send_window,
361 config.receive_window,
362 config.stream_receive_window,
363 ),
364 datagrams: DatagramState::default(),
365 config,
366 rem_cids: CidQueue::new(rem_cid),
367 rng,
368 stats: ConnectionStats::default(),
369 version,
370 nat_traversal: None, #[cfg(feature = "__qlog")]
373 qlog_streamer: None,
374 };
375 if path_validated {
376 this.on_path_validated();
377 }
378 if side.is_client() {
379 this.write_crypto();
381 this.init_0rtt();
382 }
383 this
384 }
385
386 #[cfg(feature = "__qlog")]
388 pub fn set_qlog(
389 &mut self,
390 writer: Box<dyn io::Write + Send + Sync>,
391 title: Option<String>,
392 description: Option<String>,
393 now: Instant,
394 ) {
395 let ty = if self.side.is_server() {
396 VantagePointType::Server
397 } else {
398 VantagePointType::Client
399 };
400
401 let level = EventImportance::Core;
402
403 let trace = TraceSeq::new(
404 VantagePoint {
405 name: None,
406 ty,
407 flow: None,
408 },
409 title.clone(),
410 description.clone(),
411 Some(Configuration {
412 time_offset: Some(0.0),
413 original_uris: None,
414 }),
415 None,
416 );
417
418 let mut streamer = QlogStreamer::new(
419 QLOG_VERSION.to_string(),
420 title,
421 description,
422 None,
423 now,
424 trace,
425 level,
426 writer,
427 );
428
429 if let Err(e) = streamer.start_log() {
430 warn!("could not initialize qlog streamer: {e}");
431 return;
432 }
433
434 self.qlog_streamer = Some(streamer);
435 }
436
437 #[cfg(feature = "__qlog")]
439 fn emit_qlog_recovery_metrics(&mut self, now: Instant) {
440 let Some(qlog_streamer) = &mut self.qlog_streamer else {
441 return;
442 };
443
444 let Some(metrics) = self.path.qlog_congestion_metrics(self.pto_count) else {
445 return;
446 };
447
448 let event = EventData::MetricsUpdated(metrics);
449
450 if let Err(e) = qlog_streamer.add_event_data_with_instant(event, now) {
451 warn!("could not emit qlog event, dropping qlog streamer: {e}");
452 self.qlog_streamer = None;
453 }
454 }
455
456 #[must_use]
464 pub fn poll_timeout(&mut self) -> Option<Instant> {
465 self.timers.next_timeout()
466 }
467
468 #[must_use]
474 pub fn poll(&mut self) -> Option<Event> {
475 if let Some(x) = self.events.pop_front() {
476 return Some(x);
477 }
478
479 if let Some(event) = self.streams.poll() {
480 return Some(Event::Stream(event));
481 }
482
483 if let Some(err) = self.error.take() {
484 return Some(Event::ConnectionLost { reason: err });
485 }
486
487 None
488 }
489
490 #[must_use]
492 pub fn poll_endpoint_events(&mut self) -> Option<EndpointEvent> {
493 self.endpoint_events.pop_front().map(EndpointEvent)
494 }
495
496 #[must_use]
498 pub fn streams(&mut self) -> Streams<'_> {
499 Streams {
500 state: &mut self.streams,
501 conn_state: &self.state,
502 }
503 }
504
505 #[must_use]
507 pub fn recv_stream(&mut self, id: StreamId) -> RecvStream<'_> {
508 assert!(id.dir() == Dir::Bi || id.initiator() != self.side.side());
509 RecvStream {
510 id,
511 state: &mut self.streams,
512 pending: &mut self.spaces[SpaceId::Data].pending,
513 }
514 }
515
516 #[must_use]
518 pub fn send_stream(&mut self, id: StreamId) -> SendStream<'_> {
519 assert!(id.dir() == Dir::Bi || id.initiator() == self.side.side());
520 SendStream {
521 id,
522 state: &mut self.streams,
523 pending: &mut self.spaces[SpaceId::Data].pending,
524 conn_state: &self.state,
525 }
526 }
527
    /// Writes the next batch of outgoing packets into `buf` and describes how to send it.
    ///
    /// Returns `None` when there is nothing to send right now. Otherwise the returned
    /// [`Transmit`] covers `buf[..size]`; when more than one datagram was written,
    /// `segment_size` gives the size at which the buffer has to be split.
    ///
    /// - `now`: the current time, used for pacing, timers and packet tracking.
    /// - `max_datagrams`: upper bound on the number of datagrams written in one call. It
    ///   is forced to 1 when segmentation offload is disabled in the transport config.
    /// - `buf`: output buffer the packets are appended to.
    ///
    /// NAT traversal packets and a pending challenge for the previous path take priority
    /// and are returned on their own.
    ///
    /// # Panics
    ///
    /// Panics if `max_datagrams` is zero.
    #[must_use]
    pub fn poll_transmit(
        &mut self,
        now: Instant,
        max_datagrams: usize,
        buf: &mut Vec<u8>,
    ) -> Option<Transmit> {
        assert!(max_datagrams != 0);
        let max_datagrams = match self.config.enable_segmentation_offload {
            false => 1,
            true => max_datagrams,
        };

        let mut num_datagrams = 0;
        // Offset in `buf` at which the datagram currently being filled starts. With
        // coalescing this can be before the start of the current packet.
        let mut datagram_start = 0;
        let mut segment_size = usize::from(self.path.current_mtu());

        // Let the NAT traversal state notice an expired coordination round.
        if let Some(nat_traversal) = &mut self.nat_traversal {
            if nat_traversal.check_coordination_timeout(now) {
                trace!("NAT traversal coordination timed out, may retry");
            }
        }

        // NAT traversal packets are sent on their own, ahead of everything else.
        if let Some(challenge) = self.send_nat_traversal_challenge(now, buf) {
            return Some(challenge);
        }

        // Likewise for a pending PATH_CHALLENGE on the previous path.
        if let Some(challenge) = self.send_path_challenge(now, buf) {
            return Some(challenge);
        }

        // Give every space the chance to queue something for a pending loss probe.
        for space in SpaceId::iter() {
            let request_immediate_ack =
                space == SpaceId::Data && self.peer_supports_ack_frequency();
            self.spaces[space].maybe_queue_probe(request_immediate_ack, &self.streams);
        }

        // Determine whether a CONNECTION_CLOSE has to be sent.
        let close = match self.state {
            State::Drained => {
                self.app_limited = true;
                return None;
            }
            State::Draining | State::Closed(_) => {
                // `self.close` is only cleared once the close packet was written in the
                // highest space (see below).
                if !self.close {
                    self.app_limited = true;
                    return None;
                }
                true
            }
            _ => false,
        };

        // Decide whether an ACK_FREQUENCY frame should be queued.
        if let Some(config) = &self.config.ack_frequency_config {
            self.spaces[SpaceId::Data].pending.ack_frequency = self
                .ack_frequency
                .should_send_ack_frequency(self.path.rtt.get(), config, &self.peer_params)
                && self.highest_space == SpaceId::Data
                && self.peer_supports_ack_frequency();
        }

        // Number of bytes of `buf` that may be used so far. Tracked separately from
        // `buf.capacity()` and grown by one datagram at a time.
        let mut buf_capacity = 0;

        let mut coalesce = true;
        // Builder of the packet currently being assembled, and the frames written to it.
        let mut builder_storage: Option<PacketBuilder> = None;
        let mut sent_frames = None;
        let mut pad_datagram = false;
        let mut pad_datagram_to_mtu = false;
        let mut congestion_blocked = false;

        // Walk the packet spaces in order. The index is only advanced explicitly because
        // several packets may be produced for the same space.
        let mut space_idx = 0;
        let spaces = [SpaceId::Initial, SpaceId::Handshake, SpaceId::Data];
        while space_idx < spaces.len() {
            let space_id = spaces[space_idx];
            // Bytes available for frames in a 1-RTT packet of the current segment size.
            let pn = self.packet_number_filter.peek(&self.spaces[SpaceId::Data]);
            let frame_space_1rtt =
                segment_size.saturating_sub(self.predict_1rtt_overhead(Some(pn)));

            // Skip the space unless it has data, or a close has to be sent and keys exist.
            let can_send = self.space_can_send(space_id, frame_space_1rtt);
            if can_send.is_empty() && (!close || self.spaces[space_id].crypto.is_none()) {
                space_idx += 1;
                continue;
            }

            let mut ack_eliciting = !self.spaces[space_id].pending.is_empty(&self.streams)
                || self.spaces[space_id].ping_pending
                || self.spaces[space_id].immediate_ack_pending;
            if space_id == SpaceId::Data {
                ack_eliciting |= self.can_send_1rtt(frame_space_1rtt);
            }

            pad_datagram_to_mtu |= space_id == SpaceId::Data && self.config.pad_to_mtu;

            // End of the data written so far, including the unfinished packet's minimum
            // size and authentication tag.
            let buf_end = if let Some(builder) = &builder_storage {
                buf.len().max(builder.min_size) + builder.tag_len
            } else {
                buf.len()
            };

            let tag_len = if let Some(ref crypto) = self.spaces[space_id].crypto {
                crypto.packet.local.tag_len()
            } else if space_id == SpaceId::Data {
                self.zero_rtt_crypto.as_ref().expect(
                    "sending packets in the application data space requires known 0-RTT or 1-RTT keys",
                ).packet.tag_len()
            } else {
                unreachable!("tried to send {:?} packet without keys", space_id)
            };
            if !coalesce || buf_capacity - buf_end < MIN_PACKET_SPACE + tag_len {
                // The packet does not fit into the current datagram: start another one.

                if num_datagrams >= max_datagrams {
                    // The datagram budget for this call is used up.
                    break;
                }

                // Account for the datagrams already written in this call plus one byte
                // for the next one.
                if self
                    .path
                    .anti_amplification_blocked(segment_size as u64 * (num_datagrams as u64) + 1)
                {
                    trace!("blocked by anti-amplification");
                    break;
                }

                // Congestion control and pacing apply to ack-eliciting packets only, and
                // are skipped while loss probes are pending for this space.
                if ack_eliciting && self.spaces[space_id].loss_probes == 0 {
                    // Bytes of the unfinished packet that are not yet counted as in flight.
                    let untracked_bytes = if let Some(builder) = &builder_storage {
                        buf_capacity - builder.partial_encode.start
                    } else {
                        0
                    } as u64;
                    debug_assert!(untracked_bytes <= segment_size as u64);

                    let bytes_to_send = segment_size as u64 + untracked_bytes;
                    if self.path.in_flight.bytes + bytes_to_send >= self.path.congestion.window() {
                        space_idx += 1;
                        congestion_blocked = true;
                        trace!("blocked by congestion control");
                        // Move on to the next space rather than stopping, so that later
                        // spaces are still considered.
                        continue;
                    }

                    let smoothed_rtt = self.path.rtt.get();
                    if let Some(delay) = self.path.pacing.delay(
                        smoothed_rtt,
                        bytes_to_send,
                        self.path.current_mtu(),
                        self.path.congestion.window(),
                        now,
                    ) {
                        self.timers.set(Timer::Pacing, delay);
                        congestion_blocked = true;
                        trace!("blocked by pacing");
                        break;
                    }
                }

                // Finish the packet that closes the previous datagram.
                if let Some(mut builder) = builder_storage.take() {
                    if pad_datagram {
                        builder.pad_to(MIN_INITIAL_SIZE);
                    }

                    if num_datagrams > 1 || pad_datagram_to_mtu {
                        // End the batch here if continuing it would need more than
                        // `MAX_PADDING` bytes of padding (unless padding to the MTU was
                        // requested), or if the current datagram was allotted less than a
                        // full segment.
                        const MAX_PADDING: usize = 16;
                        let packet_len_unpadded = cmp::max(builder.min_size, buf.len())
                            - datagram_start
                            + builder.tag_len;
                        if (packet_len_unpadded + MAX_PADDING < segment_size
                            && !pad_datagram_to_mtu)
                            || datagram_start + segment_size > buf_capacity
                        {
                            trace!(
                                "GSO truncated by demand for {} padding bytes or loss probe",
                                segment_size - packet_len_unpadded
                            );
                            // Hand the builder back so it is finished after the loop.
                            builder_storage = Some(builder);
                            break;
                        }

                        // Pad to the segment size so the datagram can stay in the batch.
                        builder.pad_to(segment_size as u16);
                    }

                    builder.finish_and_track(now, self, sent_frames.take(), buf);

                    if num_datagrams == 1 {
                        // The first datagram determines the segment size of the batch;
                        // unused capacity is clipped so later packets cannot overrun it.
                        segment_size = buf.len();
                        buf_capacity = buf.len();

                        // Re-check that there is still something to send with the
                        // reduced segment size; otherwise stop here.
                        if space_id == SpaceId::Data {
                            let frame_space_1rtt =
                                segment_size.saturating_sub(self.predict_1rtt_overhead(Some(pn)));
                            if self.space_can_send(space_id, frame_space_1rtt).is_empty() {
                                break;
                            }
                        }
                    }
                }

                // Allot space for the next datagram. A loss probe consumes one probe
                // credit and is limited to `INITIAL_MTU` bytes.
                let next_datagram_size_limit = match self.spaces[space_id].loss_probes {
                    0 => segment_size,
                    _ => {
                        self.spaces[space_id].loss_probes -= 1;
                        std::cmp::min(segment_size, usize::from(INITIAL_MTU))
                    }
                };
                buf_capacity += next_datagram_size_limit;
                if buf.capacity() < buf_capacity {
                    // Reserve room for the whole batch at once rather than per datagram.
                    buf.reserve(max_datagrams * segment_size);
                }
                num_datagrams += 1;
                coalesce = true;
                pad_datagram = false;
                datagram_start = buf.len();

                debug_assert_eq!(
                    datagram_start % segment_size,
                    0,
                    "datagrams in a GSO batch must be aligned to the segment size"
                );
            } else {
                // The next packet is coalesced into the current datagram; finish the
                // previous packet without extra padding.
                if let Some(builder) = builder_storage.take() {
                    builder.finish_and_track(now, self, sent_frames.take(), buf);
                }
            }

            debug_assert!(buf_capacity - buf.len() >= MIN_PACKET_SPACE);

            // From here on a packet is going to be written.

            // A client discards the Initial space when it is about to send a Handshake
            // packet.
            if self.spaces[SpaceId::Initial].crypto.is_some()
                && space_id == SpaceId::Handshake
                && self.side.is_client()
            {
                self.discard_space(now, SpaceId::Initial);
            }
            if let Some(ref mut prev) = self.prev_crypto {
                prev.update_unacked = false;
            }

            debug_assert!(
                builder_storage.is_none() && sent_frames.is_none(),
                "Previous packet must have been finished"
            );

            let builder = builder_storage.insert(PacketBuilder::new(
                now,
                space_id,
                self.rem_cids.active(),
                buf,
                buf_capacity,
                datagram_start,
                ack_eliciting,
                self,
            )?);
            // Nothing is coalesced after a short-header packet.
            coalesce = coalesce && !builder.short_header;

            // Datagrams carrying Initial packets are padded to `MIN_INITIAL_SIZE` when
            // sent by a client, or when the packet is ack-eliciting.
            pad_datagram |=
                space_id == SpaceId::Initial && (self.side.is_client() || ack_eliciting);

            if close {
                trace!("sending CONNECTION_CLOSE");
                // Write pending ACKs ahead of the close frame.
                if !self.spaces[space_id].pending_acks.ranges().is_empty() {
                    Self::populate_acks(
                        now,
                        self.receiving_ecn,
                        &mut SentFrames::default(),
                        &mut self.spaces[space_id],
                        buf,
                        &mut self.stats,
                    );
                }

                // The ACKs are expected to leave room for the close frame; in release
                // builds the frame is simply skipped if they did not.
                debug_assert!(
                    buf.len() + frame::ConnectionClose::SIZE_BOUND < builder.max_size,
                    "ACKs should leave space for ConnectionClose"
                );
                if buf.len() + frame::ConnectionClose::SIZE_BOUND < builder.max_size {
                    let max_frame_size = builder.max_size - buf.len();
                    match self.state {
                        State::Closed(state::Closed { ref reason }) => {
                            // The actual reason is only sent in the Data space or for
                            // transport-layer closes; otherwise a generic
                            // APPLICATION_ERROR without reason text is written.
                            if space_id == SpaceId::Data || reason.is_transport_layer() {
                                reason.encode(buf, max_frame_size)
                            } else {
                                frame::ConnectionClose {
                                    error_code: TransportErrorCode::APPLICATION_ERROR,
                                    frame_type: None,
                                    reason: Bytes::new(),
                                }
                                .encode(buf, max_frame_size)
                            }
                        }
                        State::Draining => frame::ConnectionClose {
                            error_code: TransportErrorCode::NO_ERROR,
                            frame_type: None,
                            reason: Bytes::new(),
                        }
                        .encode(buf, max_frame_size),
                        _ => unreachable!(
                            "tried to make a close packet when the connection wasn't closed"
                        ),
                    }
                }
                if space_id == self.highest_space {
                    // This was the last close packet; do not send another one.
                    self.close = false;
                    break;
                } else {
                    // Also send a close in the next space, and nothing else.
                    space_idx += 1;
                    continue;
                }
            }

            // An off-path PATH_RESPONSE is sent in a datagram of its own, to the address
            // it is queued for, and returned immediately.
            if space_id == SpaceId::Data && num_datagrams == 1 {
                if let Some((token, remote)) = self.path_responses.pop_off_path(self.path.remote) {
                    // `builder_storage` was populated just above, so this cannot fail.
                    let mut builder = builder_storage.take().unwrap();
                    trace!("PATH_RESPONSE {:08x} (off-path)", token);
                    buf.write(frame::FrameType::PATH_RESPONSE);
                    buf.write(token);
                    self.stats.frame_tx.path_response += 1;
                    builder.pad_to(MIN_INITIAL_SIZE);
                    builder.finish_and_track(
                        now,
                        self,
                        Some(SentFrames {
                            non_retransmits: true,
                            ..SentFrames::default()
                        }),
                        buf,
                    );
                    self.stats.udp_tx.on_sent(1, buf.len());
                    return Some(Transmit {
                        destination: remote,
                        size: buf.len(),
                        ecn: None,
                        segment_size: None,
                        src_ip: self.local_ip,
                    });
                }
            }

            let sent =
                self.populate_packet(now, space_id, buf, builder.max_size, builder.exact_number);

            // Sanity check: with a full MTU available and no application datagrams
            // queued, a packet must not end up ACK-only when the space announced other
            // frames but no ACKs.
            debug_assert!(
                !(sent.is_ack_only(&self.streams)
                    && !can_send.acks
                    && can_send.other
                    && (buf_capacity - builder.datagram_start) == self.path.current_mtu() as usize
                    && self.datagrams.outgoing.is_empty()),
                "SendableFrames was {can_send:?}, but only ACKs have been written"
            );
            pad_datagram |= sent.requires_padding;

            if sent.largest_acked.is_some() {
                self.spaces[space_id].pending_acks.acks_sent();
                self.timers.stop(Timer::MaxAckDelay);
            }

            // Remember the frames until the packet is finished.
            sent_frames = Some(sent);

            // `space_idx` is deliberately left unchanged: the same space is checked
            // again for more data.
        }

        // Finish the last packet.
        if let Some(mut builder) = builder_storage {
            if pad_datagram {
                builder.pad_to(MIN_INITIAL_SIZE);
            }

            // Only pad to the segment size if the datagram was allotted a full segment.
            if pad_datagram_to_mtu && buf_capacity >= datagram_start + segment_size {
                builder.pad_to(segment_size as u16);
            }

            let last_packet_number = builder.exact_number;
            builder.finish_and_track(now, self, sent_frames, buf);
            self.path
                .congestion
                .on_sent(now, buf.len() as u64, last_packet_number);

            #[cfg(feature = "__qlog")]
            self.emit_qlog_recovery_metrics(now);
        }

        self.app_limited = buf.is_empty() && !congestion_blocked;

        // With nothing else to send on an established connection, consider an MTU probe.
        if buf.is_empty() && self.state.is_established() {
            let space_id = SpaceId::Data;
            let probe_size = self
                .path
                .mtud
                .poll_transmit(now, self.packet_number_filter.peek(&self.spaces[space_id]))?;

            let buf_capacity = probe_size as usize;
            buf.reserve(buf_capacity);

            let mut builder = PacketBuilder::new(
                now,
                space_id,
                self.rem_cids.active(),
                buf,
                buf_capacity,
                0,
                true,
                self,
            )?;

            // The probe is a PING padded up to the probe size.
            buf.write(frame::FrameType::PING);
            self.stats.frame_tx.ping += 1;

            // Ask for an immediate acknowledgement when the peer supports it.
            if self.peer_supports_ack_frequency() {
                buf.write(frame::FrameType::IMMEDIATE_ACK);
                self.stats.frame_tx.immediate_ack += 1;
            }

            builder.pad_to(probe_size);
            let sent_frames = SentFrames {
                non_retransmits: true,
                ..Default::default()
            };
            builder.finish_and_track(now, self, Some(sent_frames), buf);

            self.stats.path.sent_plpmtud_probes += 1;
            num_datagrams = 1;

            trace!(?probe_size, "writing MTUD probe");
        }

        if buf.is_empty() {
            return None;
        }

        trace!("sending {} bytes in {} datagrams", buf.len(), num_datagrams);
        self.path.total_sent = self.path.total_sent.saturating_add(buf.len() as u64);

        self.stats.udp_tx.on_sent(num_datagrams as u64, buf.len());

        Some(Transmit {
            destination: self.path.remote,
            size: buf.len(),
            ecn: if self.path.sending_ecn {
                Some(EcnCodepoint::Ect0)
            } else {
                None
            },
            // A segment size is only reported for batches of more than one datagram.
            segment_size: match num_datagrams {
                1 => None,
                _ => Some(segment_size),
            },
            src_ip: self.local_ip,
        })
    }
1105
1106 fn send_coordination_request(&mut self, now: Instant, buf: &mut Vec<u8>) -> Option<Transmit> {
1108 let should_send = self.nat_traversal.as_ref()?.should_send_punch_request();
1110 if !should_send {
1111 return None;
1112 }
1113
1114 let (round, target_addrs, coordinator_addr) = {
1115 let nat_traversal = self.nat_traversal.as_ref()?;
1116 let coord = nat_traversal.coordination.as_ref()?;
1117 let addrs: Vec<_> = coord.punch_targets.iter().map(|t| t.remote_addr).collect();
1118 (coord.round, addrs, self.path.remote) };
1120
1121 if target_addrs.is_empty() {
1122 return None;
1123 }
1124
1125 debug_assert_eq!(
1126 self.highest_space,
1127 SpaceId::Data,
1128 "PUNCH_ME_NOW queued without 1-RTT keys"
1129 );
1130
1131 buf.reserve(MIN_INITIAL_SIZE as usize);
1132 let buf_capacity = buf.capacity();
1133
1134 let mut builder = PacketBuilder::new(
1135 now,
1136 SpaceId::Data,
1137 self.rem_cids.active(),
1138 buf,
1139 buf_capacity,
1140 0,
1141 false,
1142 self,
1143 )?;
1144
1145 trace!("sending PUNCH_ME_NOW round {} with {} targets", round, target_addrs.len());
1146
1147 buf.write(frame::FrameType::PUNCH_ME_NOW);
1149 buf.write(round);
1150 buf.write(target_addrs.len() as u8);
1151 for addr in target_addrs {
1152 match addr {
1153 SocketAddr::V4(v4) => {
1154 buf.write(4u8); buf.write(u32::from(*v4.ip()));
1156 buf.write(v4.port());
1157 }
1158 SocketAddr::V6(v6) => {
1159 buf.write(6u8); buf.write(*v6.ip());
1161 buf.write(v6.port());
1162 }
1163 }
1164 }
1165
1166 self.stats.frame_tx.ping += 1; builder.pad_to(MIN_INITIAL_SIZE);
1169 builder.finish_and_track(now, self, None, buf);
1170
1171 if let Some(nat_traversal) = &mut self.nat_traversal {
1173 nat_traversal.mark_punch_request_sent();
1174 }
1175
1176 Some(Transmit {
1177 destination: coordinator_addr,
1178 size: buf.len(),
1179 ecn: if self.path.sending_ecn {
1180 Some(EcnCodepoint::Ect0)
1181 } else {
1182 None
1183 },
1184 segment_size: None,
1185 src_ip: self.local_ip,
1186 })
1187 }
1188
1189 fn send_coordinated_path_challenge(&mut self, now: Instant, buf: &mut Vec<u8>) -> Option<Transmit> {
1191 if let Some(nat_traversal) = &mut self.nat_traversal {
1193 if nat_traversal.should_start_punching(now) {
1194 nat_traversal.start_punching_phase(now);
1195 }
1196 }
1197
1198 let (target_addr, challenge) = {
1200 let nat_traversal = self.nat_traversal.as_ref()?;
1201 match nat_traversal.get_coordination_phase() {
1202 Some(CoordinationPhase::Punching) => {
1203 let targets = nat_traversal.get_punch_targets()?;
1204 if targets.is_empty() {
1205 return None;
1206 }
1207 let target = &targets[0];
1209 (target.remote_addr, target.challenge)
1210 }
1211 _ => return None,
1212 }
1213 };
1214
1215 debug_assert_eq!(
1216 self.highest_space,
1217 SpaceId::Data,
1218 "PATH_CHALLENGE queued without 1-RTT keys"
1219 );
1220
1221 buf.reserve(MIN_INITIAL_SIZE as usize);
1222 let buf_capacity = buf.capacity();
1223
1224 let mut builder = PacketBuilder::new(
1225 now,
1226 SpaceId::Data,
1227 self.rem_cids.active(),
1228 buf,
1229 buf_capacity,
1230 0,
1231 false,
1232 self,
1233 )?;
1234
1235 trace!("sending coordinated PATH_CHALLENGE {:08x} to {}", challenge, target_addr);
1236 buf.write(frame::FrameType::PATH_CHALLENGE);
1237 buf.write(challenge);
1238 self.stats.frame_tx.path_challenge += 1;
1239
1240 builder.pad_to(MIN_INITIAL_SIZE);
1241 builder.finish_and_track(now, self, None, buf);
1242
1243 if let Some(nat_traversal) = &mut self.nat_traversal {
1245 nat_traversal.mark_coordination_validating();
1246 }
1247
1248 Some(Transmit {
1249 destination: target_addr,
1250 size: buf.len(),
1251 ecn: if self.path.sending_ecn {
1252 Some(EcnCodepoint::Ect0)
1253 } else {
1254 None
1255 },
1256 segment_size: None,
1257 src_ip: self.local_ip,
1258 })
1259 }
1260
1261 fn send_nat_traversal_challenge(&mut self, now: Instant, buf: &mut Vec<u8>) -> Option<Transmit> {
1263 if let Some(request) = self.send_coordination_request(now, buf) {
1265 return Some(request);
1266 }
1267
1268 if let Some(punch) = self.send_coordinated_path_challenge(now, buf) {
1270 return Some(punch);
1271 }
1272
1273 let (remote_addr, remote_sequence) = {
1275 let nat_traversal = self.nat_traversal.as_ref()?;
1276 let candidates = nat_traversal.get_validation_candidates();
1277 if candidates.is_empty() {
1278 return None;
1279 }
1280 let (sequence, candidate) = candidates[0];
1282 (candidate.address, sequence)
1283 };
1284
1285 let challenge = rand::Rng::random::<u64>(&mut self.rng);
1286
1287 if let Err(e) = self.nat_traversal.as_mut()?.start_validation(remote_sequence, challenge, now) {
1289 warn!("Failed to start NAT traversal validation: {}", e);
1290 return None;
1291 }
1292
1293 debug_assert_eq!(
1294 self.highest_space,
1295 SpaceId::Data,
1296 "PATH_CHALLENGE queued without 1-RTT keys"
1297 );
1298
1299 buf.reserve(MIN_INITIAL_SIZE as usize);
1300 let buf_capacity = buf.capacity();
1301
1302 let mut builder = PacketBuilder::new(
1304 now,
1305 SpaceId::Data,
1306 self.rem_cids.active(),
1307 buf,
1308 buf_capacity,
1309 0,
1310 false,
1311 self,
1312 )?;
1313
1314 trace!("sending PATH_CHALLENGE {:08x} to NAT candidate {}", challenge, remote_addr);
1315 buf.write(frame::FrameType::PATH_CHALLENGE);
1316 buf.write(challenge);
1317 self.stats.frame_tx.path_challenge += 1;
1318
1319 builder.pad_to(MIN_INITIAL_SIZE);
1321
1322 builder.finish_and_track(now, self, None, buf);
1323
1324 Some(Transmit {
1325 destination: remote_addr,
1326 size: buf.len(),
1327 ecn: if self.path.sending_ecn {
1328 Some(EcnCodepoint::Ect0)
1329 } else {
1330 None
1331 },
1332 segment_size: None,
1333 src_ip: self.local_ip,
1334 })
1335 }
1336
1337 fn send_path_challenge(&mut self, now: Instant, buf: &mut Vec<u8>) -> Option<Transmit> {
1339 let (prev_cid, prev_path) = self.prev_path.as_mut()?;
1340 if !prev_path.challenge_pending {
1341 return None;
1342 }
1343 prev_path.challenge_pending = false;
1344 let token = prev_path
1345 .challenge
1346 .expect("previous path challenge pending without token");
1347 let destination = prev_path.remote;
1348 debug_assert_eq!(
1349 self.highest_space,
1350 SpaceId::Data,
1351 "PATH_CHALLENGE queued without 1-RTT keys"
1352 );
1353 buf.reserve(MIN_INITIAL_SIZE as usize);
1354
1355 let buf_capacity = buf.capacity();
1356
1357 let mut builder = PacketBuilder::new(
1363 now,
1364 SpaceId::Data,
1365 *prev_cid,
1366 buf,
1367 buf_capacity,
1368 0,
1369 false,
1370 self,
1371 )?;
1372 trace!("validating previous path with PATH_CHALLENGE {:08x}", token);
1373 buf.write(frame::FrameType::PATH_CHALLENGE);
1374 buf.write(token);
1375 self.stats.frame_tx.path_challenge += 1;
1376
1377 builder.pad_to(MIN_INITIAL_SIZE);
1382
1383 builder.finish(self, buf);
1384 self.stats.udp_tx.on_sent(1, buf.len());
1385
1386 Some(Transmit {
1387 destination,
1388 size: buf.len(),
1389 ecn: None,
1390 segment_size: None,
1391 src_ip: self.local_ip,
1392 })
1393 }
1394
1395 fn space_can_send(&self, space_id: SpaceId, frame_space_1rtt: usize) -> SendableFrames {
1397 if self.spaces[space_id].crypto.is_none()
1398 && (space_id != SpaceId::Data
1399 || self.zero_rtt_crypto.is_none()
1400 || self.side.is_server())
1401 {
1402 return SendableFrames::empty();
1404 }
1405 let mut can_send = self.spaces[space_id].can_send(&self.streams);
1406 if space_id == SpaceId::Data {
1407 can_send.other |= self.can_send_1rtt(frame_space_1rtt);
1408 }
1409 can_send
1410 }
1411
1412 pub fn handle_event(&mut self, event: ConnectionEvent) {
1418 use ConnectionEventInner::*;
1419 match event.0 {
1420 Datagram(DatagramConnectionEvent {
1421 now,
1422 remote,
1423 ecn,
1424 first_decode,
1425 remaining,
1426 }) => {
1427 if remote != self.path.remote && !self.side.remote_may_migrate() {
1431 trace!("discarding packet from unrecognized peer {}", remote);
1432 return;
1433 }
1434
1435 let was_anti_amplification_blocked = self.path.anti_amplification_blocked(1);
1436
1437 self.stats.udp_rx.datagrams += 1;
1438 self.stats.udp_rx.bytes += first_decode.len() as u64;
1439 let data_len = first_decode.len();
1440
1441 self.handle_decode(now, remote, ecn, first_decode);
1442 self.path.total_recvd = self.path.total_recvd.saturating_add(data_len as u64);
1447
1448 if let Some(data) = remaining {
1449 self.stats.udp_rx.bytes += data.len() as u64;
1450 self.handle_coalesced(now, remote, ecn, data);
1451 }
1452
1453 #[cfg(feature = "__qlog")]
1454 self.emit_qlog_recovery_metrics(now);
1455
1456 if was_anti_amplification_blocked {
1457 self.set_loss_detection_timer(now);
1461 }
1462 }
1463 NewIdentifiers(ids, now) => {
1464 self.local_cid_state.new_cids(&ids, now);
1465 ids.into_iter().rev().for_each(|frame| {
1466 self.spaces[SpaceId::Data].pending.new_cids.push(frame);
1467 });
1468 if self
1470 .timers
1471 .get(Timer::PushNewCid)
1472 .map_or(true, |x| x <= now)
1473 {
1474 self.reset_cid_retirement();
1475 }
1476 }
1477 }
1478 }
1479
1480 pub fn handle_timeout(&mut self, now: Instant) {
1490 for &timer in &Timer::VALUES {
1491 if !self.timers.is_expired(timer, now) {
1492 continue;
1493 }
1494 self.timers.stop(timer);
1495 trace!(timer = ?timer, "timeout");
1496 match timer {
1497 Timer::Close => {
1498 self.state = State::Drained;
1499 self.endpoint_events.push_back(EndpointEventInner::Drained);
1500 }
1501 Timer::Idle => {
1502 self.kill(ConnectionError::TimedOut);
1503 }
1504 Timer::KeepAlive => {
1505 trace!("sending keep-alive");
1506 self.ping();
1507 }
1508 Timer::LossDetection => {
1509 self.on_loss_detection_timeout(now);
1510
1511 #[cfg(feature = "__qlog")]
1512 self.emit_qlog_recovery_metrics(now);
1513 }
1514 Timer::KeyDiscard => {
1515 self.zero_rtt_crypto = None;
1516 self.prev_crypto = None;
1517 }
1518 Timer::PathValidation => {
1519 debug!("path validation failed");
1520 if let Some((_, prev)) = self.prev_path.take() {
1521 self.path = prev;
1522 }
1523 self.path.challenge = None;
1524 self.path.challenge_pending = false;
1525 }
1526 Timer::Pacing => trace!("pacing timer expired"),
1527 Timer::PushNewCid => {
1528 let num_new_cid = self.local_cid_state.on_cid_timeout().into();
1530 if !self.state.is_closed() {
1531 trace!(
1532 "push a new cid to peer RETIRE_PRIOR_TO field {}",
1533 self.local_cid_state.retire_prior_to()
1534 );
1535 self.endpoint_events
1536 .push_back(EndpointEventInner::NeedIdentifiers(now, num_new_cid));
1537 }
1538 }
1539 Timer::MaxAckDelay => {
1540 trace!("max ack delay reached");
1541 self.spaces[SpaceId::Data]
1543 .pending_acks
1544 .on_max_ack_delay_timeout()
1545 }
1546 }
1547 }
1548 }
1549
1550 pub fn close(&mut self, now: Instant, error_code: VarInt, reason: Bytes) {
1562 self.close_inner(
1563 now,
1564 Close::Application(frame::ApplicationClose { error_code, reason }),
1565 )
1566 }
1567
1568 fn close_inner(&mut self, now: Instant, reason: Close) {
1569 let was_closed = self.state.is_closed();
1570 if !was_closed {
1571 self.close_common();
1572 self.set_close_timer(now);
1573 self.close = true;
1574 self.state = State::Closed(state::Closed { reason });
1575 }
1576 }
1577
1578 pub fn datagrams(&mut self) -> Datagrams<'_> {
1580 Datagrams { conn: self }
1581 }
1582
1583 pub fn stats(&self) -> ConnectionStats {
1585 let mut stats = self.stats;
1586 stats.path.rtt = self.path.rtt.get();
1587 stats.path.cwnd = self.path.congestion.window();
1588 stats.path.current_mtu = self.path.mtud.current_mtu();
1589
1590 stats
1591 }
1592
1593 pub fn ping(&mut self) {
1597 self.spaces[self.highest_space].ping_pending = true;
1598 }
1599
1600 pub fn force_key_update(&mut self) {
1604 if !self.state.is_established() {
1605 debug!("ignoring forced key update in illegal state");
1606 return;
1607 }
1608 if self.prev_crypto.is_some() {
1609 debug!("ignoring redundant forced key update");
1612 return;
1613 }
1614 self.update_keys(None, false);
1615 }
1616
1617 #[doc(hidden)]
1619 #[deprecated]
1620 pub fn initiate_key_update(&mut self) {
1621 self.force_key_update();
1622 }
1623
1624 pub fn crypto_session(&self) -> &dyn crypto::Session {
1626 &*self.crypto
1627 }
1628
1629 pub fn is_handshaking(&self) -> bool {
1634 self.state.is_handshake()
1635 }
1636
1637 pub fn is_closed(&self) -> bool {
1645 self.state.is_closed()
1646 }
1647
1648 pub fn is_drained(&self) -> bool {
1653 self.state.is_drained()
1654 }
1655
1656 pub fn accepted_0rtt(&self) -> bool {
1660 self.accepted_0rtt
1661 }
1662
1663 pub fn has_0rtt(&self) -> bool {
1665 self.zero_rtt_enabled
1666 }
1667
1668 pub fn has_pending_retransmits(&self) -> bool {
1670 !self.spaces[SpaceId::Data].pending.is_empty(&self.streams)
1671 }
1672
1673 pub fn side(&self) -> Side {
1675 self.side.side()
1676 }
1677
1678 pub fn remote_address(&self) -> SocketAddr {
1680 self.path.remote
1681 }
1682
1683 pub fn local_ip(&self) -> Option<IpAddr> {
1693 self.local_ip
1694 }
1695
1696 pub fn rtt(&self) -> Duration {
1698 self.path.rtt.get()
1699 }
1700
1701 pub fn congestion_state(&self) -> &dyn Controller {
1703 self.path.congestion.as_ref()
1704 }
1705
1706 pub fn path_changed(&mut self, now: Instant) {
1717 self.path.reset(now, &self.config);
1718 }
1719
1720 pub fn set_max_concurrent_streams(&mut self, dir: Dir, count: VarInt) {
1725 self.streams.set_max_concurrent(dir, count);
1726 let pending = &mut self.spaces[SpaceId::Data].pending;
1729 self.streams.queue_max_stream_id(pending);
1730 }
1731
1732 pub fn max_concurrent_streams(&self, dir: Dir) -> u64 {
1738 self.streams.max_concurrent(dir)
1739 }
1740
1741 pub fn set_receive_window(&mut self, receive_window: VarInt) {
1743 if self.streams.set_receive_window(receive_window) {
1744 self.spaces[SpaceId::Data].pending.max_data = true;
1745 }
1746 }
1747
1748 fn on_ack_received(
1749 &mut self,
1750 now: Instant,
1751 space: SpaceId,
1752 ack: frame::Ack,
1753 ) -> Result<(), TransportError> {
1754 if ack.largest >= self.spaces[space].next_packet_number {
1755 return Err(TransportError::PROTOCOL_VIOLATION("unsent packet acked"));
1756 }
1757 let new_largest = {
1758 let space = &mut self.spaces[space];
1759 if space
1760 .largest_acked_packet
1761 .map_or(true, |pn| ack.largest > pn)
1762 {
1763 space.largest_acked_packet = Some(ack.largest);
1764 if let Some(info) = space.sent_packets.get(&ack.largest) {
1765 space.largest_acked_packet_sent = info.time_sent;
1769 }
1770 true
1771 } else {
1772 false
1773 }
1774 };
1775
1776 let mut newly_acked = ArrayRangeSet::new();
1778 for range in ack.iter() {
1779 self.packet_number_filter.check_ack(space, range.clone())?;
1780 for (&pn, _) in self.spaces[space].sent_packets.range(range) {
1781 newly_acked.insert_one(pn);
1782 }
1783 }
1784
1785 if newly_acked.is_empty() {
1786 return Ok(());
1787 }
1788
1789 let mut ack_eliciting_acked = false;
1790 for packet in newly_acked.elts() {
1791 if let Some(info) = self.spaces[space].take(packet) {
1792 if let Some(acked) = info.largest_acked {
1793 self.spaces[space].pending_acks.subtract_below(acked);
1799 }
1800 ack_eliciting_acked |= info.ack_eliciting;
1801
1802 let mtu_updated = self.path.mtud.on_acked(space, packet, info.size);
1804 if mtu_updated {
1805 self.path
1806 .congestion
1807 .on_mtu_update(self.path.mtud.current_mtu());
1808 }
1809
1810 self.ack_frequency.on_acked(packet);
1812
1813 self.on_packet_acked(now, packet, info);
1814 }
1815 }
1816
1817 self.path.congestion.on_end_acks(
1818 now,
1819 self.path.in_flight.bytes,
1820 self.app_limited,
1821 self.spaces[space].largest_acked_packet,
1822 );
1823
1824 if new_largest && ack_eliciting_acked {
1825 let ack_delay = if space != SpaceId::Data {
1826 Duration::from_micros(0)
1827 } else {
1828 cmp::min(
1829 self.ack_frequency.peer_max_ack_delay,
1830 Duration::from_micros(ack.delay << self.peer_params.ack_delay_exponent.0),
1831 )
1832 };
1833 let rtt = instant_saturating_sub(now, self.spaces[space].largest_acked_packet_sent);
1834 self.path.rtt.update(ack_delay, rtt);
1835 if self.path.first_packet_after_rtt_sample.is_none() {
1836 self.path.first_packet_after_rtt_sample =
1837 Some((space, self.spaces[space].next_packet_number));
1838 }
1839 }
1840
1841 self.detect_lost_packets(now, space, true);
1843
1844 if self.peer_completed_address_validation() {
1845 self.pto_count = 0;
1846 }
1847
1848 if self.path.sending_ecn {
1850 if let Some(ecn) = ack.ecn {
1851 if new_largest {
1856 let sent = self.spaces[space].largest_acked_packet_sent;
1857 self.process_ecn(now, space, newly_acked.len() as u64, ecn, sent);
1858 }
1859 } else {
1860 debug!("ECN not acknowledged by peer");
1862 self.path.sending_ecn = false;
1863 }
1864 }
1865
1866 self.set_loss_detection_timer(now);
1867 Ok(())
1868 }
1869
1870 fn process_ecn(
1872 &mut self,
1873 now: Instant,
1874 space: SpaceId,
1875 newly_acked: u64,
1876 ecn: frame::EcnCounts,
1877 largest_sent_time: Instant,
1878 ) {
1879 match self.spaces[space].detect_ecn(newly_acked, ecn) {
1880 Err(e) => {
1881 debug!("halting ECN due to verification failure: {}", e);
1882 self.path.sending_ecn = false;
1883 self.spaces[space].ecn_feedback = frame::EcnCounts::ZERO;
1886 }
1887 Ok(false) => {}
1888 Ok(true) => {
1889 self.stats.path.congestion_events += 1;
1890 self.path
1891 .congestion
1892 .on_congestion_event(now, largest_sent_time, false, 0);
1893 }
1894 }
1895 }
1896
1897 fn on_packet_acked(&mut self, now: Instant, pn: u64, info: SentPacket) {
1900 self.remove_in_flight(pn, &info);
1901 if info.ack_eliciting && self.path.challenge.is_none() {
1902 self.path.congestion.on_ack(
1905 now,
1906 info.time_sent,
1907 info.size.into(),
1908 self.app_limited,
1909 &self.path.rtt,
1910 );
1911 }
1912
1913 if let Some(retransmits) = info.retransmits.get() {
1915 for (id, _) in retransmits.reset_stream.iter() {
1916 self.streams.reset_acked(*id);
1917 }
1918 }
1919
1920 for frame in info.stream_frames {
1921 self.streams.received_ack_of(frame);
1922 }
1923 }
1924
1925 fn set_key_discard_timer(&mut self, now: Instant, space: SpaceId) {
1926 let start = if self.zero_rtt_crypto.is_some() {
1927 now
1928 } else {
1929 self.prev_crypto
1930 .as_ref()
1931 .expect("no previous keys")
1932 .end_packet
1933 .as_ref()
1934 .expect("update not acknowledged yet")
1935 .1
1936 };
1937 self.timers
1938 .set(Timer::KeyDiscard, start + self.pto(space) * 3);
1939 }
1940
1941 fn on_loss_detection_timeout(&mut self, now: Instant) {
1942 if let Some((_, pn_space)) = self.loss_time_and_space() {
1943 self.detect_lost_packets(now, pn_space, false);
1945 self.set_loss_detection_timer(now);
1946 return;
1947 }
1948
1949 let (_, space) = match self.pto_time_and_space(now) {
1950 Some(x) => x,
1951 None => {
1952 error!("PTO expired while unset");
1953 return;
1954 }
1955 };
1956 trace!(
1957 in_flight = self.path.in_flight.bytes,
1958 count = self.pto_count,
1959 ?space,
1960 "PTO fired"
1961 );
1962
1963 let count = match self.path.in_flight.ack_eliciting {
1964 0 => {
1967 debug_assert!(!self.peer_completed_address_validation());
1968 1
1969 }
1970 _ => 2,
1972 };
1973 self.spaces[space].loss_probes = self.spaces[space].loss_probes.saturating_add(count);
1974 self.pto_count = self.pto_count.saturating_add(1);
1975 self.set_loss_detection_timer(now);
1976 }
1977
1978 fn detect_lost_packets(&mut self, now: Instant, pn_space: SpaceId, due_to_ack: bool) {
1979 let mut lost_packets = Vec::<u64>::new();
1980 let mut lost_mtu_probe = None;
1981 let in_flight_mtu_probe = self.path.mtud.in_flight_mtu_probe();
1982 let rtt = self.path.rtt.conservative();
1983 let loss_delay = cmp::max(rtt.mul_f32(self.config.time_threshold), TIMER_GRANULARITY);
1984
1985 let lost_send_time = now.checked_sub(loss_delay).unwrap();
1987 let largest_acked_packet = self.spaces[pn_space].largest_acked_packet.unwrap();
1988 let packet_threshold = self.config.packet_threshold as u64;
1989 let mut size_of_lost_packets = 0u64;
1990
1991 let congestion_period =
1995 self.pto(SpaceId::Data) * self.config.persistent_congestion_threshold;
1996 let mut persistent_congestion_start: Option<Instant> = None;
1997 let mut prev_packet = None;
1998 let mut in_persistent_congestion = false;
1999
2000 let space = &mut self.spaces[pn_space];
2001 space.loss_time = None;
2002
2003 for (&packet, info) in space.sent_packets.range(0..largest_acked_packet) {
2004 if prev_packet != Some(packet.wrapping_sub(1)) {
2005 persistent_congestion_start = None;
2007 }
2008
2009 if info.time_sent <= lost_send_time || largest_acked_packet >= packet + packet_threshold
2010 {
2011 if Some(packet) == in_flight_mtu_probe {
2012 lost_mtu_probe = in_flight_mtu_probe;
2015 } else {
2016 lost_packets.push(packet);
2017 size_of_lost_packets += info.size as u64;
2018 if info.ack_eliciting && due_to_ack {
2019 match persistent_congestion_start {
2020 Some(start) if info.time_sent - start > congestion_period => {
2023 in_persistent_congestion = true;
2024 }
2025 None if self
2027 .path
2028 .first_packet_after_rtt_sample
2029 .is_some_and(|x| x < (pn_space, packet)) =>
2030 {
2031 persistent_congestion_start = Some(info.time_sent);
2032 }
2033 _ => {}
2034 }
2035 }
2036 }
2037 } else {
2038 let next_loss_time = info.time_sent + loss_delay;
2039 space.loss_time = Some(
2040 space
2041 .loss_time
2042 .map_or(next_loss_time, |x| cmp::min(x, next_loss_time)),
2043 );
2044 persistent_congestion_start = None;
2045 }
2046
2047 prev_packet = Some(packet);
2048 }
2049
2050 if let Some(largest_lost) = lost_packets.last().cloned() {
2052 let old_bytes_in_flight = self.path.in_flight.bytes;
2053 let largest_lost_sent = self.spaces[pn_space].sent_packets[&largest_lost].time_sent;
2054 self.lost_packets += lost_packets.len() as u64;
2055 self.stats.path.lost_packets += lost_packets.len() as u64;
2056 self.stats.path.lost_bytes += size_of_lost_packets;
2057 trace!(
2058 "packets lost: {:?}, bytes lost: {}",
2059 lost_packets, size_of_lost_packets
2060 );
2061
2062 for &packet in &lost_packets {
2063 let info = self.spaces[pn_space].take(packet).unwrap(); self.remove_in_flight(packet, &info);
2065 for frame in info.stream_frames {
2066 self.streams.retransmit(frame);
2067 }
2068 self.spaces[pn_space].pending |= info.retransmits;
2069 self.path.mtud.on_non_probe_lost(packet, info.size);
2070 }
2071
2072 if self.path.mtud.black_hole_detected(now) {
2073 self.stats.path.black_holes_detected += 1;
2074 self.path
2075 .congestion
2076 .on_mtu_update(self.path.mtud.current_mtu());
2077 if let Some(max_datagram_size) = self.datagrams().max_size() {
2078 self.datagrams.drop_oversized(max_datagram_size);
2079 }
2080 }
2081
2082 let lost_ack_eliciting = old_bytes_in_flight != self.path.in_flight.bytes;
2084
2085 if lost_ack_eliciting {
2086 self.stats.path.congestion_events += 1;
2087 self.path.congestion.on_congestion_event(
2088 now,
2089 largest_lost_sent,
2090 in_persistent_congestion,
2091 size_of_lost_packets,
2092 );
2093 }
2094 }
2095
2096 if let Some(packet) = lost_mtu_probe {
2098 let info = self.spaces[SpaceId::Data].take(packet).unwrap(); self.remove_in_flight(packet, &info);
2100 self.path.mtud.on_probe_lost();
2101 self.stats.path.lost_plpmtud_probes += 1;
2102 }
2103 }
2104
2105 fn loss_time_and_space(&self) -> Option<(Instant, SpaceId)> {
2106 SpaceId::iter()
2107 .filter_map(|id| Some((self.spaces[id].loss_time?, id)))
2108 .min_by_key(|&(time, _)| time)
2109 }
2110
2111 fn pto_time_and_space(&self, now: Instant) -> Option<(Instant, SpaceId)> {
2112 let backoff = 2u32.pow(self.pto_count.min(MAX_BACKOFF_EXPONENT));
2113 let mut duration = self.path.rtt.pto_base() * backoff;
2114
2115 if self.path.in_flight.ack_eliciting == 0 {
2116 debug_assert!(!self.peer_completed_address_validation());
2117 let space = match self.highest_space {
2118 SpaceId::Handshake => SpaceId::Handshake,
2119 _ => SpaceId::Initial,
2120 };
2121 return Some((now + duration, space));
2122 }
2123
2124 let mut result = None;
2125 for space in SpaceId::iter() {
2126 if self.spaces[space].in_flight == 0 {
2127 continue;
2128 }
2129 if space == SpaceId::Data {
2130 if self.is_handshaking() {
2132 return result;
2133 }
2134 duration += self.ack_frequency.max_ack_delay_for_pto() * backoff;
2136 }
2137 let last_ack_eliciting = match self.spaces[space].time_of_last_ack_eliciting_packet {
2138 Some(time) => time,
2139 None => continue,
2140 };
2141 let pto = last_ack_eliciting + duration;
2142 if result.map_or(true, |(earliest_pto, _)| pto < earliest_pto) {
2143 result = Some((pto, space));
2144 }
2145 }
2146 result
2147 }
2148
2149 fn peer_completed_address_validation(&self) -> bool {
2150 if self.side.is_server() || self.state.is_closed() {
2151 return true;
2152 }
2153 self.spaces[SpaceId::Handshake]
2156 .largest_acked_packet
2157 .is_some()
2158 || self.spaces[SpaceId::Data].largest_acked_packet.is_some()
2159 || (self.spaces[SpaceId::Data].crypto.is_some()
2160 && self.spaces[SpaceId::Handshake].crypto.is_none())
2161 }
2162
2163 fn set_loss_detection_timer(&mut self, now: Instant) {
2164 if self.state.is_closed() {
2165 return;
2169 }
2170
2171 if let Some((loss_time, _)) = self.loss_time_and_space() {
2172 self.timers.set(Timer::LossDetection, loss_time);
2174 return;
2175 }
2176
2177 if self.path.anti_amplification_blocked(1) {
2178 self.timers.stop(Timer::LossDetection);
2180 return;
2181 }
2182
2183 if self.path.in_flight.ack_eliciting == 0 && self.peer_completed_address_validation() {
2184 self.timers.stop(Timer::LossDetection);
2187 return;
2188 }
2189
2190 if let Some((timeout, _)) = self.pto_time_and_space(now) {
2193 self.timers.set(Timer::LossDetection, timeout);
2194 } else {
2195 self.timers.stop(Timer::LossDetection);
2196 }
2197 }
2198
2199 fn pto(&self, space: SpaceId) -> Duration {
2201 let max_ack_delay = match space {
2202 SpaceId::Initial | SpaceId::Handshake => Duration::ZERO,
2203 SpaceId::Data => self.ack_frequency.max_ack_delay_for_pto(),
2204 };
2205 self.path.rtt.pto_base() + max_ack_delay
2206 }
2207
2208 fn on_packet_authenticated(
2209 &mut self,
2210 now: Instant,
2211 space_id: SpaceId,
2212 ecn: Option<EcnCodepoint>,
2213 packet: Option<u64>,
2214 spin: bool,
2215 is_1rtt: bool,
2216 ) {
2217 self.total_authed_packets += 1;
2218 self.reset_keep_alive(now);
2219 self.reset_idle_timeout(now, space_id);
2220 self.permit_idle_reset = true;
2221 self.receiving_ecn |= ecn.is_some();
2222 if let Some(x) = ecn {
2223 let space = &mut self.spaces[space_id];
2224 space.ecn_counters += x;
2225
2226 if x.is_ce() {
2227 space.pending_acks.set_immediate_ack_required();
2228 }
2229 }
2230
2231 let packet = match packet {
2232 Some(x) => x,
2233 None => return,
2234 };
2235 if self.side.is_server() {
2236 if self.spaces[SpaceId::Initial].crypto.is_some() && space_id == SpaceId::Handshake {
2237 self.discard_space(now, SpaceId::Initial);
2239 }
2240 if self.zero_rtt_crypto.is_some() && is_1rtt {
2241 self.set_key_discard_timer(now, space_id)
2243 }
2244 }
2245 let space = &mut self.spaces[space_id];
2246 space.pending_acks.insert_one(packet, now);
2247 if packet >= space.rx_packet {
2248 space.rx_packet = packet;
2249 self.spin = self.side.is_client() ^ spin;
2251 }
2252 }
2253
2254 fn reset_idle_timeout(&mut self, now: Instant, space: SpaceId) {
2255 let timeout = match self.idle_timeout {
2256 None => return,
2257 Some(dur) => dur,
2258 };
2259 if self.state.is_closed() {
2260 self.timers.stop(Timer::Idle);
2261 return;
2262 }
2263 let dt = cmp::max(timeout, 3 * self.pto(space));
2264 self.timers.set(Timer::Idle, now + dt);
2265 }
2266
2267 fn reset_keep_alive(&mut self, now: Instant) {
2268 let interval = match self.config.keep_alive_interval {
2269 Some(x) if self.state.is_established() => x,
2270 _ => return,
2271 };
2272 self.timers.set(Timer::KeepAlive, now + interval);
2273 }
2274
2275 fn reset_cid_retirement(&mut self) {
2276 if let Some(t) = self.local_cid_state.next_timeout() {
2277 self.timers.set(Timer::PushNewCid, t);
2278 }
2279 }
2280
2281 pub(crate) fn handle_first_packet(
2286 &mut self,
2287 now: Instant,
2288 remote: SocketAddr,
2289 ecn: Option<EcnCodepoint>,
2290 packet_number: u64,
2291 packet: InitialPacket,
2292 remaining: Option<BytesMut>,
2293 ) -> Result<(), ConnectionError> {
2294 let span = trace_span!("first recv");
2295 let _guard = span.enter();
2296 debug_assert!(self.side.is_server());
2297 let len = packet.header_data.len() + packet.payload.len();
2298 self.path.total_recvd = len as u64;
2299
2300 match self.state {
2301 State::Handshake(ref mut state) => {
2302 state.expected_token = packet.header.token.clone();
2303 }
2304 _ => unreachable!("first packet must be delivered in Handshake state"),
2305 }
2306
2307 self.on_packet_authenticated(
2308 now,
2309 SpaceId::Initial,
2310 ecn,
2311 Some(packet_number),
2312 false,
2313 false,
2314 );
2315
2316 self.process_decrypted_packet(now, remote, Some(packet_number), packet.into())?;
2317 if let Some(data) = remaining {
2318 self.handle_coalesced(now, remote, ecn, data);
2319 }
2320
2321 #[cfg(feature = "__qlog")]
2322 self.emit_qlog_recovery_metrics(now);
2323
2324 Ok(())
2325 }
2326
2327 fn init_0rtt(&mut self) {
2328 let (header, packet) = match self.crypto.early_crypto() {
2329 Some(x) => x,
2330 None => return,
2331 };
2332 if self.side.is_client() {
2333 match self.crypto.transport_parameters() {
2334 Ok(params) => {
2335 let params = params
2336 .expect("crypto layer didn't supply transport parameters with ticket");
2337 let params = TransportParameters {
2339 initial_src_cid: None,
2340 original_dst_cid: None,
2341 preferred_address: None,
2342 retry_src_cid: None,
2343 stateless_reset_token: None,
2344 min_ack_delay: None,
2345 ack_delay_exponent: TransportParameters::default().ack_delay_exponent,
2346 max_ack_delay: TransportParameters::default().max_ack_delay,
2347 ..params
2348 };
2349 self.set_peer_params(params);
2350 }
2351 Err(e) => {
2352 error!("session ticket has malformed transport parameters: {}", e);
2353 return;
2354 }
2355 }
2356 }
2357 trace!("0-RTT enabled");
2358 self.zero_rtt_enabled = true;
2359 self.zero_rtt_crypto = Some(ZeroRttCrypto { header, packet });
2360 }
2361
2362 fn read_crypto(
2363 &mut self,
2364 space: SpaceId,
2365 crypto: &frame::Crypto,
2366 payload_len: usize,
2367 ) -> Result<(), TransportError> {
2368 let expected = if !self.state.is_handshake() {
2369 SpaceId::Data
2370 } else if self.highest_space == SpaceId::Initial {
2371 SpaceId::Initial
2372 } else {
2373 SpaceId::Handshake
2376 };
2377 debug_assert!(space <= expected, "received out-of-order CRYPTO data");
2381
2382 let end = crypto.offset + crypto.data.len() as u64;
2383 if space < expected && end > self.spaces[space].crypto_stream.bytes_read() {
2384 warn!(
2385 "received new {:?} CRYPTO data when expecting {:?}",
2386 space, expected
2387 );
2388 return Err(TransportError::PROTOCOL_VIOLATION(
2389 "new data at unexpected encryption level",
2390 ));
2391 }
2392
2393 let space = &mut self.spaces[space];
2394 let max = end.saturating_sub(space.crypto_stream.bytes_read());
2395 if max > self.config.crypto_buffer_size as u64 {
2396 return Err(TransportError::CRYPTO_BUFFER_EXCEEDED(""));
2397 }
2398
2399 space
2400 .crypto_stream
2401 .insert(crypto.offset, crypto.data.clone(), payload_len);
2402 while let Some(chunk) = space.crypto_stream.read(usize::MAX, true) {
2403 trace!("consumed {} CRYPTO bytes", chunk.bytes.len());
2404 if self.crypto.read_handshake(&chunk.bytes)? {
2405 self.events.push_back(Event::HandshakeDataReady);
2406 }
2407 }
2408
2409 Ok(())
2410 }
2411
2412 fn write_crypto(&mut self) {
2413 loop {
2414 let space = self.highest_space;
2415 let mut outgoing = Vec::new();
2416 if let Some(crypto) = self.crypto.write_handshake(&mut outgoing) {
2417 match space {
2418 SpaceId::Initial => {
2419 self.upgrade_crypto(SpaceId::Handshake, crypto);
2420 }
2421 SpaceId::Handshake => {
2422 self.upgrade_crypto(SpaceId::Data, crypto);
2423 }
2424 _ => unreachable!("got updated secrets during 1-RTT"),
2425 }
2426 }
2427 if outgoing.is_empty() {
2428 if space == self.highest_space {
2429 break;
2430 } else {
2431 continue;
2433 }
2434 }
2435 let offset = self.spaces[space].crypto_offset;
2436 let outgoing = Bytes::from(outgoing);
2437 if let State::Handshake(ref mut state) = self.state {
2438 if space == SpaceId::Initial && offset == 0 && self.side.is_client() {
2439 state.client_hello = Some(outgoing.clone());
2440 }
2441 }
2442 self.spaces[space].crypto_offset += outgoing.len() as u64;
2443 trace!("wrote {} {:?} CRYPTO bytes", outgoing.len(), space);
2444 self.spaces[space].pending.crypto.push_back(frame::Crypto {
2445 offset,
2446 data: outgoing,
2447 });
2448 }
2449 }
2450
2451 fn upgrade_crypto(&mut self, space: SpaceId, crypto: Keys) {
2453 debug_assert!(
2454 self.spaces[space].crypto.is_none(),
2455 "already reached packet space {space:?}"
2456 );
2457 trace!("{:?} keys ready", space);
2458 if space == SpaceId::Data {
2459 self.next_crypto = Some(
2461 self.crypto
2462 .next_1rtt_keys()
2463 .expect("handshake should be complete"),
2464 );
2465 }
2466
2467 self.spaces[space].crypto = Some(crypto);
2468 debug_assert!(space as usize > self.highest_space as usize);
2469 self.highest_space = space;
2470 if space == SpaceId::Data && self.side.is_client() {
2471 self.zero_rtt_crypto = None;
2473 }
2474 }
2475
2476 fn discard_space(&mut self, now: Instant, space_id: SpaceId) {
2477 debug_assert!(space_id != SpaceId::Data);
2478 trace!("discarding {:?} keys", space_id);
2479 if space_id == SpaceId::Initial {
2480 if let ConnectionSide::Client { token, .. } = &mut self.side {
2482 *token = Bytes::new();
2483 }
2484 }
2485 let space = &mut self.spaces[space_id];
2486 space.crypto = None;
2487 space.time_of_last_ack_eliciting_packet = None;
2488 space.loss_time = None;
2489 space.in_flight = 0;
2490 let sent_packets = mem::take(&mut space.sent_packets);
2491 for (pn, packet) in sent_packets.into_iter() {
2492 self.remove_in_flight(pn, &packet);
2493 }
2494 self.set_loss_detection_timer(now)
2495 }
2496
2497 fn handle_coalesced(
2498 &mut self,
2499 now: Instant,
2500 remote: SocketAddr,
2501 ecn: Option<EcnCodepoint>,
2502 data: BytesMut,
2503 ) {
2504 self.path.total_recvd = self.path.total_recvd.saturating_add(data.len() as u64);
2505 let mut remaining = Some(data);
2506 while let Some(data) = remaining {
2507 match PartialDecode::new(
2508 data,
2509 &FixedLengthConnectionIdParser::new(self.local_cid_state.cid_len()),
2510 &[self.version],
2511 self.endpoint_config.grease_quic_bit,
2512 ) {
2513 Ok((partial_decode, rest)) => {
2514 remaining = rest;
2515 self.handle_decode(now, remote, ecn, partial_decode);
2516 }
2517 Err(e) => {
2518 trace!("malformed header: {}", e);
2519 return;
2520 }
2521 }
2522 }
2523 }
2524
2525 fn handle_decode(
2526 &mut self,
2527 now: Instant,
2528 remote: SocketAddr,
2529 ecn: Option<EcnCodepoint>,
2530 partial_decode: PartialDecode,
2531 ) {
2532 if let Some(decoded) = packet_crypto::unprotect_header(
2533 partial_decode,
2534 &self.spaces,
2535 self.zero_rtt_crypto.as_ref(),
2536 self.peer_params.stateless_reset_token,
2537 ) {
2538 self.handle_packet(now, remote, ecn, decoded.packet, decoded.stateless_reset);
2539 }
2540 }
2541
2542 fn handle_packet(
2543 &mut self,
2544 now: Instant,
2545 remote: SocketAddr,
2546 ecn: Option<EcnCodepoint>,
2547 packet: Option<Packet>,
2548 stateless_reset: bool,
2549 ) {
2550 self.stats.udp_rx.ios += 1;
2551 if let Some(ref packet) = packet {
2552 trace!(
2553 "got {:?} packet ({} bytes) from {} using id {}",
2554 packet.header.space(),
2555 packet.payload.len() + packet.header_data.len(),
2556 remote,
2557 packet.header.dst_cid(),
2558 );
2559 }
2560
2561 if self.is_handshaking() && remote != self.path.remote {
2562 debug!("discarding packet with unexpected remote during handshake");
2563 return;
2564 }
2565
2566 let was_closed = self.state.is_closed();
2567 let was_drained = self.state.is_drained();
2568
2569 let decrypted = match packet {
2570 None => Err(None),
2571 Some(mut packet) => self
2572 .decrypt_packet(now, &mut packet)
2573 .map(move |number| (packet, number)),
2574 };
2575 let result = match decrypted {
2576 _ if stateless_reset => {
2577 debug!("got stateless reset");
2578 Err(ConnectionError::Reset)
2579 }
2580 Err(Some(e)) => {
2581 warn!("illegal packet: {}", e);
2582 Err(e.into())
2583 }
2584 Err(None) => {
2585 debug!("failed to authenticate packet");
2586 self.authentication_failures += 1;
2587 let integrity_limit = self.spaces[self.highest_space]
2588 .crypto
2589 .as_ref()
2590 .unwrap()
2591 .packet
2592 .local
2593 .integrity_limit();
2594 if self.authentication_failures > integrity_limit {
2595 Err(TransportError::AEAD_LIMIT_REACHED("integrity limit violated").into())
2596 } else {
2597 return;
2598 }
2599 }
2600 Ok((packet, number)) => {
2601 let span = match number {
2602 Some(pn) => trace_span!("recv", space = ?packet.header.space(), pn),
2603 None => trace_span!("recv", space = ?packet.header.space()),
2604 };
2605 let _guard = span.enter();
2606
2607 let is_duplicate = |n| self.spaces[packet.header.space()].dedup.insert(n);
2608 if number.is_some_and(is_duplicate) {
2609 debug!("discarding possible duplicate packet");
2610 return;
2611 } else if self.state.is_handshake() && packet.header.is_short() {
2612 trace!("dropping short packet during handshake");
2614 return;
2615 } else {
2616 if let Header::Initial(InitialHeader { ref token, .. }) = packet.header {
2617 if let State::Handshake(ref hs) = self.state {
2618 if self.side.is_server() && token != &hs.expected_token {
2619 warn!("discarding Initial with invalid retry token");
2623 return;
2624 }
2625 }
2626 }
2627
2628 if !self.state.is_closed() {
2629 let spin = match packet.header {
2630 Header::Short { spin, .. } => spin,
2631 _ => false,
2632 };
2633 self.on_packet_authenticated(
2634 now,
2635 packet.header.space(),
2636 ecn,
2637 number,
2638 spin,
2639 packet.header.is_1rtt(),
2640 );
2641 }
2642
2643 self.process_decrypted_packet(now, remote, number, packet)
2644 }
2645 }
2646 };
2647
2648 if let Err(conn_err) = result {
2650 self.error = Some(conn_err.clone());
2651 self.state = match conn_err {
2652 ConnectionError::ApplicationClosed(reason) => State::closed(reason),
2653 ConnectionError::ConnectionClosed(reason) => State::closed(reason),
2654 ConnectionError::Reset
2655 | ConnectionError::TransportError(TransportError {
2656 code: TransportErrorCode::AEAD_LIMIT_REACHED,
2657 ..
2658 }) => State::Drained,
2659 ConnectionError::TimedOut => {
2660 unreachable!("timeouts aren't generated by packet processing");
2661 }
2662 ConnectionError::TransportError(err) => {
2663 debug!("closing connection due to transport error: {}", err);
2664 State::closed(err)
2665 }
2666 ConnectionError::VersionMismatch => State::Draining,
2667 ConnectionError::LocallyClosed => {
2668 unreachable!("LocallyClosed isn't generated by packet processing");
2669 }
2670 ConnectionError::CidsExhausted => {
2671 unreachable!("CidsExhausted isn't generated by packet processing");
2672 }
2673 };
2674 }
2675
2676 if !was_closed && self.state.is_closed() {
2677 self.close_common();
2678 if !self.state.is_drained() {
2679 self.set_close_timer(now);
2680 }
2681 }
2682 if !was_drained && self.state.is_drained() {
2683 self.endpoint_events.push_back(EndpointEventInner::Drained);
2684 self.timers.stop(Timer::Close);
2687 }
2688
2689 if let State::Closed(_) = self.state {
2691 self.close = remote == self.path.remote;
2692 }
2693 }
2694
/// Handles a packet whose payload has already been decrypted, dispatching on the current
/// connection state.
///
/// - `Established`: `Data`-space packets are passed to `process_payload`, other packets whose
///   header `has_frames()` go to `process_early_payload`, and anything else is discarded.
/// - `Closed`: the payload is scanned only for a `Close` frame, which switches the state to
///   `Draining`; individual frame decoding errors are logged and skipped.
/// - `Draining` / `Drained`: the packet is ignored.
/// - `Handshake`: the packet is handled according to its header type (see the `match` below).
///
/// `number` is unwrapped for `Data`-space and 0-RTT packets, so it must be `Some` for those.
///
/// # Errors
///
/// Propagates errors from frame iteration and from the payload handlers, returns a
/// `PROTOCOL_VIOLATION` if a Retry packet arrives while acting as a server, and returns
/// `ConnectionError::VersionMismatch` when a Version Negotiation packet does not list
/// `self.version`.
fn process_decrypted_packet(
    &mut self,
    now: Instant,
    remote: SocketAddr,
    number: Option<u64>,
    packet: Packet,
) -> Result<(), ConnectionError> {
    // Every state except `Handshake` is fully handled (and returns) inside this match.
    let state = match self.state {
        State::Established => {
            match packet.header.space() {
                SpaceId::Data => self.process_payload(now, remote, number.unwrap(), packet)?,
                _ if packet.header.has_frames() => self.process_early_payload(now, packet)?,
                _ => {
                    trace!("discarding unexpected pre-handshake packet");
                }
            }
            return Ok(());
        }
        State::Closed(_) => {
            // Only a close frame matters once we are closed; everything else is just counted.
            for result in frame::Iter::new(packet.payload.freeze())? {
                let frame = match result {
                    Ok(frame) => frame,
                    Err(err) => {
                        debug!("frame decoding error: {err:?}");
                        continue;
                    }
                };

                if let Frame::Padding = frame {
                    continue;
                };

                self.stats.frame_rx.record(&frame);

                if let Frame::Close(_) = frame {
                    trace!("draining");
                    self.state = State::Draining;
                    break;
                }
            }
            return Ok(());
        }
        State::Draining | State::Drained => return Ok(()),
        State::Handshake(ref mut state) => state,
    };

    match packet.header {
        Header::Retry {
            src_cid: rem_cid, ..
        } => {
            if self.side.is_server() {
                return Err(TransportError::PROTOCOL_VIOLATION("client sent Retry").into());
            }

            // Discard the Retry if `total_authed_packets` exceeds 1, if the payload holds
            // nothing beyond its final 16 bytes (those bytes are excluded from the token
            // below), or if the crypto layer does not accept it as a valid Retry.
            if self.total_authed_packets > 1
                || packet.payload.len() <= 16
                || !self.crypto.is_valid_retry(
                    &self.rem_cids.active(),
                    &packet.header_data,
                    &packet.payload,
                )
            {
                trace!("discarding invalid Retry");
                return Ok(());
            }

            trace!("retrying with CID {}", rem_cid);
            // Panics if the handshake state holds no client hello to resend.
            let client_hello = state.client_hello.take().unwrap();
            self.retry_src_cid = Some(rem_cid);
            self.rem_cids.update_initial_cid(rem_cid);
            self.rem_handshake_cid = rem_cid;

            // If packet number 0 is still tracked in the Initial space, treat it as acked.
            let space = &mut self.spaces[SpaceId::Initial];
            if let Some(info) = space.take(0) {
                self.on_packet_acked(now, 0, info);
            };

            // Rebuild the Initial space with keys derived from the new remote CID, keeping
            // the packet number sequence and re-queueing the client hello from offset 0.
            self.discard_space(now, SpaceId::Initial);
            self.spaces[SpaceId::Initial] = PacketSpace {
                crypto: Some(self.crypto.initial_keys(&rem_cid, self.side.side())),
                next_packet_number: self.spaces[SpaceId::Initial].next_packet_number,
                crypto_offset: client_hello.len() as u64,
                ..PacketSpace::new(now)
            };
            self.spaces[SpaceId::Initial]
                .pending
                .crypto
                .push_back(frame::Crypto {
                    offset: 0,
                    data: client_hello,
                });

            // Forget every packet sent in the Data space so far and queue its retransmittable
            // contents again.
            let zero_rtt = mem::take(&mut self.spaces[SpaceId::Data].sent_packets);
            for (pn, info) in zero_rtt {
                self.remove_in_flight(pn, &info);
                self.spaces[SpaceId::Data].pending |= info.retransmits;
            }
            self.streams.retransmit_all_for_0rtt();

            // Everything except the final 16 bytes of the payload is stored as the token.
            let token_len = packet.payload.len() - 16;
            let ConnectionSide::Client { ref mut token, .. } = self.side else {
                unreachable!("we already short-circuited if we're server");
            };
            *token = packet.payload.freeze().split_to(token_len);
            self.state = State::Handshake(state::Handshake {
                expected_token: Bytes::new(),
                rem_cid_set: false,
                client_hello: None,
            });
            Ok(())
        }
        Header::Long {
            ty: LongType::Handshake,
            src_cid: rem_cid,
            ..
        } => {
            if rem_cid != self.rem_handshake_cid {
                debug!(
                    "discarding packet with mismatched remote CID: {} != {}",
                    self.rem_handshake_cid, rem_cid
                );
                return Ok(());
            }
            self.on_path_validated();

            self.process_early_payload(now, packet)?;
            if self.state.is_closed() {
                return Ok(());
            }

            if self.crypto.is_handshaking() {
                trace!("handshake ongoing");
                return Ok(());
            }

            // The crypto handshake is no longer in progress: finish connection setup.
            if self.side.is_client() {
                let params =
                    self.crypto
                        .transport_parameters()?
                        .ok_or_else(|| TransportError {
                            code: TransportErrorCode::crypto(0x6d),
                            frame: None,
                            reason: "transport parameters missing".into(),
                        })?;

                if self.has_0rtt() {
                    if !self.crypto.early_data_accepted().unwrap() {
                        debug_assert!(self.side.is_client());
                        debug!("0-RTT rejected");
                        self.accepted_0rtt = false;
                        self.streams.zero_rtt_rejected();

                        // Drop everything queued for retransmission in the Data space.
                        self.spaces[SpaceId::Data].pending = Retransmits::default();

                        // Forget all packets sent in the Data space so far.
                        let sent_packets =
                            mem::take(&mut self.spaces[SpaceId::Data].sent_packets);
                        for (pn, packet) in sent_packets {
                            self.remove_in_flight(pn, &packet);
                        }
                    } else {
                        self.accepted_0rtt = true;
                        params.validate_resumption_from(&self.peer_params)?;
                    }
                }
                if let Some(token) = params.stateless_reset_token {
                    self.endpoint_events
                        .push_back(EndpointEventInner::ResetToken(self.path.remote, token));
                }
                self.handle_peer_params(params)?;
                self.issue_first_cids(now);
            } else {
                // Server side: queue HANDSHAKE_DONE and drop the Handshake space.
                self.spaces[SpaceId::Data].pending.handshake_done = true;
                self.discard_space(now, SpaceId::Handshake);
            }

            self.events.push_back(Event::Connected);
            self.state = State::Established;
            trace!("established");
            Ok(())
        }
        Header::Initial(InitialHeader {
            src_cid: rem_cid, ..
        }) => {
            if !state.rem_cid_set {
                // First Initial seen in this handshake state: adopt its source CID.
                trace!("switching remote CID to {}", rem_cid);
                let mut state = state.clone();
                self.rem_cids.update_initial_cid(rem_cid);
                self.rem_handshake_cid = rem_cid;
                self.orig_rem_cid = rem_cid;
                state.rem_cid_set = true;
                self.state = State::Handshake(state);
            } else if rem_cid != self.rem_handshake_cid {
                debug!(
                    "discarding packet with mismatched remote CID: {} != {}",
                    self.rem_handshake_cid, rem_cid
                );
                return Ok(());
            }

            let starting_space = self.highest_space;
            self.process_early_payload(now, packet)?;

            // On the server, act once this packet moved `highest_space` past Initial: the
            // peer's transport parameters are read and applied at that point.
            if self.side.is_server()
                && starting_space == SpaceId::Initial
                && self.highest_space != SpaceId::Initial
            {
                let params =
                    self.crypto
                        .transport_parameters()?
                        .ok_or_else(|| TransportError {
                            code: TransportErrorCode::crypto(0x6d),
                            frame: None,
                            reason: "transport parameters missing".into(),
                        })?;
                self.handle_peer_params(params)?;
                self.issue_first_cids(now);
                self.init_0rtt();
            }
            Ok(())
        }
        Header::Long {
            ty: LongType::ZeroRtt,
            ..
        } => {
            self.process_payload(now, remote, number.unwrap(), packet)?;
            Ok(())
        }
        Header::VersionNegotiate { .. } => {
            if self.total_authed_packets > 1 {
                return Ok(());
            }
            // The payload is read as a list of 4-byte big-endian versions; a trailing chunk
            // shorter than 4 bytes never matches.
            let supported = packet
                .payload
                .chunks(4)
                .any(|x| match <[u8; 4]>::try_from(x) {
                    Ok(version) => self.version == u32::from_be_bytes(version),
                    Err(_) => false,
                });
            // A listing of our own version means there is nothing to negotiate; ignore it.
            if supported {
                return Ok(());
            }
            debug!("remote doesn't support our version");
            Err(ConnectionError::VersionMismatch)
        }
        Header::Short { .. } => unreachable!(
            "short packets received during handshake are discarded in handle_packet"
        ),
    }
}
2956
2957 fn process_early_payload(
2959 &mut self,
2960 now: Instant,
2961 packet: Packet,
2962 ) -> Result<(), TransportError> {
2963 debug_assert_ne!(packet.header.space(), SpaceId::Data);
2964 let payload_len = packet.payload.len();
2965 let mut ack_eliciting = false;
2966 for result in frame::Iter::new(packet.payload.freeze())? {
2967 let frame = result?;
2968 let span = match frame {
2969 Frame::Padding => continue,
2970 _ => Some(trace_span!("frame", ty = %frame.ty())),
2971 };
2972
2973 self.stats.frame_rx.record(&frame);
2974
2975 let _guard = span.as_ref().map(|x| x.enter());
2976 ack_eliciting |= frame.is_ack_eliciting();
2977
2978 match frame {
2980 Frame::Padding | Frame::Ping => {}
2981 Frame::Crypto(frame) => {
2982 self.read_crypto(packet.header.space(), &frame, payload_len)?;
2983 }
2984 Frame::Ack(ack) => {
2985 self.on_ack_received(now, packet.header.space(), ack)?;
2986 }
2987 Frame::Close(reason) => {
2988 self.error = Some(reason.into());
2989 self.state = State::Draining;
2990 return Ok(());
2991 }
2992 _ => {
2993 let mut err =
2994 TransportError::PROTOCOL_VIOLATION("illegal frame type in handshake");
2995 err.frame = Some(frame.ty());
2996 return Err(err);
2997 }
2998 }
2999 }
3000
3001 if ack_eliciting {
3002 self.spaces[packet.header.space()]
3004 .pending_acks
3005 .set_immediate_ack_required();
3006 }
3007
3008 self.write_crypto();
3009 Ok(())
3010 }
3011
3012 fn process_payload(
3013 &mut self,
3014 now: Instant,
3015 remote: SocketAddr,
3016 number: u64,
3017 packet: Packet,
3018 ) -> Result<(), TransportError> {
3019 let payload = packet.payload.freeze();
3020 let mut is_probing_packet = true;
3021 let mut close = None;
3022 let payload_len = payload.len();
3023 let mut ack_eliciting = false;
3024 for result in frame::Iter::new(payload)? {
3025 let frame = result?;
3026 let span = match frame {
3027 Frame::Padding => continue,
3028 _ => Some(trace_span!("frame", ty = %frame.ty())),
3029 };
3030
3031 self.stats.frame_rx.record(&frame);
3032 match &frame {
3035 Frame::Crypto(f) => {
3036 trace!(offset = f.offset, len = f.data.len(), "got crypto frame");
3037 }
3038 Frame::Stream(f) => {
3039 trace!(id = %f.id, offset = f.offset, len = f.data.len(), fin = f.fin, "got stream frame");
3040 }
3041 Frame::Datagram(f) => {
3042 trace!(len = f.data.len(), "got datagram frame");
3043 }
3044 f => {
3045 trace!("got frame {:?}", f);
3046 }
3047 }
3048
3049 let _guard = span.as_ref().map(|x| x.enter());
3050 if packet.header.is_0rtt() {
3051 match frame {
3052 Frame::Crypto(_) | Frame::Close(Close::Application(_)) => {
3053 return Err(TransportError::PROTOCOL_VIOLATION(
3054 "illegal frame type in 0-RTT",
3055 ));
3056 }
3057 _ => {}
3058 }
3059 }
3060 ack_eliciting |= frame.is_ack_eliciting();
3061
3062 match frame {
3064 Frame::Padding
3065 | Frame::PathChallenge(_)
3066 | Frame::PathResponse(_)
3067 | Frame::NewConnectionId(_) => {}
3068 _ => {
3069 is_probing_packet = false;
3070 }
3071 }
3072 match frame {
3073 Frame::Crypto(frame) => {
3074 self.read_crypto(SpaceId::Data, &frame, payload_len)?;
3075 }
3076 Frame::Stream(frame) => {
3077 if self.streams.received(frame, payload_len)?.should_transmit() {
3078 self.spaces[SpaceId::Data].pending.max_data = true;
3079 }
3080 }
3081 Frame::Ack(ack) => {
3082 self.on_ack_received(now, SpaceId::Data, ack)?;
3083 }
3084 Frame::Padding | Frame::Ping => {}
3085 Frame::Close(reason) => {
3086 close = Some(reason);
3087 }
3088 Frame::PathChallenge(token) => {
3089 self.path_responses.push(number, token, remote);
3090 if remote == self.path.remote {
3091 match self.peer_supports_ack_frequency() {
3094 true => self.immediate_ack(),
3095 false => self.ping(),
3096 }
3097 }
3098 }
3099 Frame::PathResponse(token) => {
3100 if self.path.challenge == Some(token) && remote == self.path.remote {
3101 trace!("new path validated");
3102 self.timers.stop(Timer::PathValidation);
3103 self.path.challenge = None;
3104 self.path.validated = true;
3105 if let Some((_, ref mut prev_path)) = self.prev_path {
3106 prev_path.challenge = None;
3107 prev_path.challenge_pending = false;
3108 }
3109 } else if let Some(nat_traversal) = &mut self.nat_traversal {
3110 match nat_traversal.handle_validation_success(remote, token) {
3112 Ok(sequence) => {
3113 trace!("NAT traversal candidate {} validated for sequence {}", remote, sequence);
3114
3115 if nat_traversal.handle_coordination_success(remote) {
3117 trace!("Coordination succeeded via {}", remote);
3118 } else {
3120 if nat_traversal.mark_pair_succeeded(remote) {
3122 trace!("NAT traversal pair succeeded for {}", remote);
3123 }
3124 }
3125 }
3126 Err(NatTraversalError::ChallengeMismatch) => {
3127 debug!("PATH_RESPONSE challenge mismatch for NAT candidate {}", remote);
3128 }
3129 Err(e) => {
3130 debug!("NAT traversal validation error: {}", e);
3131 }
3132 }
3133 } else {
3134 debug!(token, "ignoring invalid PATH_RESPONSE");
3135 }
3136 }
3137 Frame::MaxData(bytes) => {
3138 self.streams.received_max_data(bytes);
3139 }
3140 Frame::MaxStreamData { id, offset } => {
3141 self.streams.received_max_stream_data(id, offset)?;
3142 }
3143 Frame::MaxStreams { dir, count } => {
3144 self.streams.received_max_streams(dir, count)?;
3145 }
3146 Frame::ResetStream(frame) => {
3147 if self.streams.received_reset(frame)?.should_transmit() {
3148 self.spaces[SpaceId::Data].pending.max_data = true;
3149 }
3150 }
3151 Frame::DataBlocked { offset } => {
3152 debug!(offset, "peer claims to be blocked at connection level");
3153 }
3154 Frame::StreamDataBlocked { id, offset } => {
3155 if id.initiator() == self.side.side() && id.dir() == Dir::Uni {
3156 debug!("got STREAM_DATA_BLOCKED on send-only {}", id);
3157 return Err(TransportError::STREAM_STATE_ERROR(
3158 "STREAM_DATA_BLOCKED on send-only stream",
3159 ));
3160 }
3161 debug!(
3162 stream = %id,
3163 offset, "peer claims to be blocked at stream level"
3164 );
3165 }
3166 Frame::StreamsBlocked { dir, limit } => {
3167 if limit > MAX_STREAM_COUNT {
3168 return Err(TransportError::FRAME_ENCODING_ERROR(
3169 "unrepresentable stream limit",
3170 ));
3171 }
3172 debug!(
3173 "peer claims to be blocked opening more than {} {} streams",
3174 limit, dir
3175 );
3176 }
3177 Frame::StopSending(frame::StopSending { id, error_code }) => {
3178 if id.initiator() != self.side.side() {
3179 if id.dir() == Dir::Uni {
3180 debug!("got STOP_SENDING on recv-only {}", id);
3181 return Err(TransportError::STREAM_STATE_ERROR(
3182 "STOP_SENDING on recv-only stream",
3183 ));
3184 }
3185 } else if self.streams.is_local_unopened(id) {
3186 return Err(TransportError::STREAM_STATE_ERROR(
3187 "STOP_SENDING on unopened stream",
3188 ));
3189 }
3190 self.streams.received_stop_sending(id, error_code);
3191 }
3192 Frame::RetireConnectionId { sequence } => {
3193 let allow_more_cids = self
3194 .local_cid_state
3195 .on_cid_retirement(sequence, self.peer_params.issue_cids_limit())?;
3196 self.endpoint_events
3197 .push_back(EndpointEventInner::RetireConnectionId(
3198 now,
3199 sequence,
3200 allow_more_cids,
3201 ));
3202 }
3203 Frame::NewConnectionId(frame) => {
3204 trace!(
3205 sequence = frame.sequence,
3206 id = %frame.id,
3207 retire_prior_to = frame.retire_prior_to,
3208 );
3209 if self.rem_cids.active().is_empty() {
3210 return Err(TransportError::PROTOCOL_VIOLATION(
3211 "NEW_CONNECTION_ID when CIDs aren't in use",
3212 ));
3213 }
3214 if frame.retire_prior_to > frame.sequence {
3215 return Err(TransportError::PROTOCOL_VIOLATION(
3216 "NEW_CONNECTION_ID retiring unissued CIDs",
3217 ));
3218 }
3219
3220 use crate::cid_queue::InsertError;
3221 match self.rem_cids.insert(frame) {
3222 Ok(None) => {}
3223 Ok(Some((retired, reset_token))) => {
3224 let pending_retired =
3225 &mut self.spaces[SpaceId::Data].pending.retire_cids;
3226 const MAX_PENDING_RETIRED_CIDS: u64 = CidQueue::LEN as u64 * 10;
3229 if (pending_retired.len() as u64)
3232 .saturating_add(retired.end.saturating_sub(retired.start))
3233 > MAX_PENDING_RETIRED_CIDS
3234 {
3235 return Err(TransportError::CONNECTION_ID_LIMIT_ERROR(
3236 "queued too many retired CIDs",
3237 ));
3238 }
3239 pending_retired.extend(retired);
3240 self.set_reset_token(reset_token);
3241 }
3242 Err(InsertError::ExceedsLimit) => {
3243 return Err(TransportError::CONNECTION_ID_LIMIT_ERROR(""));
3244 }
3245 Err(InsertError::Retired) => {
3246 trace!("discarding already-retired");
3247 self.spaces[SpaceId::Data]
3251 .pending
3252 .retire_cids
3253 .push(frame.sequence);
3254 continue;
3255 }
3256 };
3257
3258 if self.side.is_server() && self.rem_cids.active_seq() == 0 {
3259 self.update_rem_cid();
3262 }
3263 }
3264 Frame::NewToken(NewToken { token }) => {
3265 let ConnectionSide::Client {
3266 token_store,
3267 server_name,
3268 ..
3269 } = &self.side
3270 else {
3271 return Err(TransportError::PROTOCOL_VIOLATION("client sent NEW_TOKEN"));
3272 };
3273 if token.is_empty() {
3274 return Err(TransportError::FRAME_ENCODING_ERROR("empty token"));
3275 }
3276 trace!("got new token");
3277 token_store.insert(server_name, token);
3278 }
3279 Frame::Datagram(datagram) => {
3280 if self
3281 .datagrams
3282 .received(datagram, &self.config.datagram_receive_buffer_size)?
3283 {
3284 self.events.push_back(Event::DatagramReceived);
3285 }
3286 }
3287 Frame::AckFrequency(ack_frequency) => {
3288 let space = &mut self.spaces[SpaceId::Data];
3290
3291 if !self
3292 .ack_frequency
3293 .ack_frequency_received(&ack_frequency, &mut space.pending_acks)?
3294 {
3295 continue;
3297 }
3298
3299 if let Some(timeout) = space
3302 .pending_acks
3303 .max_ack_delay_timeout(self.ack_frequency.max_ack_delay)
3304 {
3305 self.timers.set(Timer::MaxAckDelay, timeout);
3306 }
3307 }
3308 Frame::ImmediateAck => {
3309 self.spaces[SpaceId::Data]
3311 .pending_acks
3312 .set_immediate_ack_required();
3313 }
3314 Frame::HandshakeDone => {
3315 if self.side.is_server() {
3316 return Err(TransportError::PROTOCOL_VIOLATION(
3317 "client sent HANDSHAKE_DONE",
3318 ));
3319 }
3320 if self.spaces[SpaceId::Handshake].crypto.is_some() {
3321 self.discard_space(now, SpaceId::Handshake);
3322 }
3323 }
3324 Frame::AddAddress(add_address) => {
3325 self.handle_add_address(&add_address, now)?;
3326 }
3327 Frame::PunchMeNow(punch_me_now) => {
3328 self.handle_punch_me_now(&punch_me_now, now)?;
3329 }
3330 Frame::RemoveAddress(remove_address) => {
3331 self.handle_remove_address(&remove_address)?;
3332 }
3333 }
3334 }
3335
3336 let space = &mut self.spaces[SpaceId::Data];
3337 if space
3338 .pending_acks
3339 .packet_received(now, number, ack_eliciting, &space.dedup)
3340 {
3341 self.timers
3342 .set(Timer::MaxAckDelay, now + self.ack_frequency.max_ack_delay);
3343 }
3344
3345 let pending = &mut self.spaces[SpaceId::Data].pending;
3350 self.streams.queue_max_stream_id(pending);
3351
3352 if let Some(reason) = close {
3353 self.error = Some(reason.into());
3354 self.state = State::Draining;
3355 self.close = true;
3356 }
3357
3358 if remote != self.path.remote
3359 && !is_probing_packet
3360 && number == self.spaces[SpaceId::Data].rx_packet
3361 {
3362 let ConnectionSide::Server { ref server_config } = self.side else {
3363 panic!("packets from unknown remote should be dropped by clients");
3364 };
3365 debug_assert!(
3366 server_config.migration,
3367 "migration-initiating packets should have been dropped immediately"
3368 );
3369 self.migrate(now, remote);
3370 self.update_rem_cid();
3372 self.spin = false;
3373 }
3374
3375 Ok(())
3376 }
3377
3378 fn migrate(&mut self, now: Instant, remote: SocketAddr) {
3379 trace!(%remote, "migration initiated");
3380 let mut new_path = if remote.is_ipv4() && remote.ip() == self.path.remote.ip() {
3384 PathData::from_previous(remote, &self.path, now)
3385 } else {
3386 let peer_max_udp_payload_size =
3387 u16::try_from(self.peer_params.max_udp_payload_size.into_inner())
3388 .unwrap_or(u16::MAX);
3389 PathData::new(
3390 remote,
3391 self.allow_mtud,
3392 Some(peer_max_udp_payload_size),
3393 now,
3394 &self.config,
3395 )
3396 };
3397 new_path.challenge = Some(self.rng.random());
3398 new_path.challenge_pending = true;
3399 let prev_pto = self.pto(SpaceId::Data);
3400
3401 let mut prev = mem::replace(&mut self.path, new_path);
3402 if prev.challenge.is_none() {
3404 prev.challenge = Some(self.rng.random());
3405 prev.challenge_pending = true;
3406 self.prev_path = Some((self.rem_cids.active(), prev));
3409 }
3410
3411 self.timers.set(
3412 Timer::PathValidation,
3413 now + 3 * cmp::max(self.pto(SpaceId::Data), prev_pto),
3414 );
3415 }
3416
/// Reacts to a change of the local address.
///
/// Switches to the next available remote connection ID (a no-op when none is available) and
/// then requests a ping via `ping()`.
pub fn local_address_changed(&mut self) {
    self.update_rem_cid();
    self.ping();
}
3422
3423 fn update_rem_cid(&mut self) {
3425 let (reset_token, retired) = match self.rem_cids.next() {
3426 Some(x) => x,
3427 None => return,
3428 };
3429
3430 self.spaces[SpaceId::Data]
3432 .pending
3433 .retire_cids
3434 .extend(retired);
3435 self.set_reset_token(reset_token);
3436 }
3437
/// Installs `reset_token` as the peer's stateless reset token.
///
/// Queues a `ResetToken` endpoint event pairing the token with the current path's remote
/// address, and stores the token in `peer_params`.
fn set_reset_token(&mut self, reset_token: ResetToken) {
    self.endpoint_events
        .push_back(EndpointEventInner::ResetToken(
            self.path.remote,
            reset_token,
        ));
    self.peer_params.stateless_reset_token = Some(reset_token);
}
3446
3447 fn issue_first_cids(&mut self, now: Instant) {
3449 if self.local_cid_state.cid_len() == 0 {
3450 return;
3451 }
3452
3453 let mut n = self.peer_params.issue_cids_limit() - 1;
3455 if let ConnectionSide::Server { server_config } = &self.side {
3456 if server_config.has_preferred_address() {
3457 n -= 1;
3459 }
3460 }
3461 self.endpoint_events
3462 .push_back(EndpointEventInner::NeedIdentifiers(now, n));
3463 }
3464
/// Writes as many pending frames as fit into `buf` for a packet in `space_id`, returning a
/// record of what was written.
///
/// Frames are attempted in a fixed order: HANDSHAKE_DONE, PING, IMMEDIATE_ACK, ACK,
/// ACK_FREQUENCY, PATH_CHALLENGE, PATH_RESPONSE, CRYPTO, stream control frames,
/// NEW_CONNECTION_ID, RETIRE_CONNECTION_ID, DATAGRAM, NEW_TOKEN and finally STREAM data.
///
/// `max_size` is the limit that `buf.len()` is checked against before writing most frames,
/// and `pn` is the packet number handed to the ACK frequency state when an ACK_FREQUENCY
/// frame is written.
///
/// # Panics
///
/// Panics if an ACK_FREQUENCY frame is pending while `ack_frequency_config` is unset, or if a
/// NEW_TOKEN frame is pending on a client connection.
fn populate_packet(
    &mut self,
    now: Instant,
    space_id: SpaceId,
    buf: &mut Vec<u8>,
    max_size: usize,
    pn: u64,
) -> SentFrames {
    let mut sent = SentFrames::default();
    let space = &mut self.spaces[space_id];
    // A `Data`-space packet built while that space has no keys is treated as 0-RTT.
    let is_0rtt = space_id == SpaceId::Data && space.crypto.is_none();
    space.pending_acks.maybe_ack_non_eliciting();

    // HANDSHAKE_DONE (never in 0-RTT)
    if !is_0rtt && mem::replace(&mut space.pending.handshake_done, false) {
        buf.write(frame::FrameType::HANDSHAKE_DONE);
        sent.retransmits.get_or_create().handshake_done = true;
        // Saturating rather than `+= 1`, unlike the other counters in this function.
        self.stats.frame_tx.handshake_done =
            self.stats.frame_tx.handshake_done.saturating_add(1);
    }

    // PING
    if mem::replace(&mut space.ping_pending, false) {
        trace!("PING");
        buf.write(frame::FrameType::PING);
        sent.non_retransmits = true;
        self.stats.frame_tx.ping += 1;
    }

    // IMMEDIATE_ACK
    if mem::replace(&mut space.immediate_ack_pending, false) {
        trace!("IMMEDIATE_ACK");
        buf.write(frame::FrameType::IMMEDIATE_ACK);
        sent.non_retransmits = true;
        self.stats.frame_tx.immediate_ack += 1;
    }

    // ACK
    if space.pending_acks.can_send() {
        Self::populate_acks(
            now,
            self.receiving_ecn,
            &mut sent,
            space,
            buf,
            &mut self.stats,
        );
    }

    // ACK_FREQUENCY
    if mem::replace(&mut space.pending.ack_frequency, false) {
        let sequence_number = self.ack_frequency.next_sequence_number();

        // Panics if the frame was queued without an ACK frequency configuration.
        let config = self.config.ack_frequency_config.as_ref().unwrap();

        let max_ack_delay = self.ack_frequency.candidate_max_ack_delay(
            self.path.rtt.get(),
            config,
            &self.peer_params,
        );

        trace!(?max_ack_delay, "ACK_FREQUENCY");

        frame::AckFrequency {
            sequence: sequence_number,
            ack_eliciting_threshold: config.ack_eliciting_threshold,
            // A delay too large to convert is clamped to the largest `VarInt`.
            request_max_ack_delay: max_ack_delay.as_micros().try_into().unwrap_or(VarInt::MAX),
            reordering_threshold: config.reordering_threshold,
        }
        .encode(buf);

        sent.retransmits.get_or_create().ack_frequency = true;

        self.ack_frequency.ack_frequency_sent(pn, max_ack_delay);
        self.stats.frame_tx.ack_frequency += 1;
    }

    // PATH_CHALLENGE
    // NOTE(review): 9 is presumably 1 frame-type byte plus an 8-byte token — confirm.
    if buf.len() + 9 < max_size && space_id == SpaceId::Data {
        // Written whenever the current path has a challenge set, not only while
        // `challenge_pending` is true.
        if let Some(token) = self.path.challenge {
            self.path.challenge_pending = false;
            sent.non_retransmits = true;
            sent.requires_padding = true;
            trace!("PATH_CHALLENGE {:08x}", token);
            buf.write(frame::FrameType::PATH_CHALLENGE);
            buf.write(token);
            self.stats.frame_tx.path_challenge += 1;
        }
    }

    // PATH_RESPONSE
    if buf.len() + 9 < max_size && space_id == SpaceId::Data {
        if let Some(token) = self.path_responses.pop_on_path(self.path.remote) {
            sent.non_retransmits = true;
            sent.requires_padding = true;
            trace!("PATH_RESPONSE {:08x}", token);
            buf.write(frame::FrameType::PATH_RESPONSE);
            buf.write(token);
            self.stats.frame_tx.path_response += 1;
        }
    }

    // CRYPTO (never in 0-RTT)
    while buf.len() + frame::Crypto::SIZE_BOUND < max_size && !is_0rtt {
        let mut frame = match space.pending.crypto.pop_front() {
            Some(x) => x,
            None => break,
        };

        // Space left for data after the 1-byte frame type, the encoded offset and a 2-byte
        // length field.
        // NOTE(review): the unchecked `VarInt` construction assumes `frame.offset` is a valid
        // `VarInt` value — confirm.
        let max_crypto_data_size = max_size
            - buf.len()
            - 1
            - VarInt::size(unsafe { VarInt::from_u64_unchecked(frame.offset) })
            - 2;
        // The length is also capped at 2^14 - 1 bytes.
        let len = frame
            .data
            .len()
            .min(2usize.pow(14) - 1)
            .min(max_crypto_data_size);

        let data = frame.data.split_to(len);
        let truncated = frame::Crypto {
            offset: frame.offset,
            data,
        };
        trace!(
            "CRYPTO: off {} len {}",
            truncated.offset,
            truncated.data.len()
        );
        truncated.encode(buf);
        self.stats.frame_tx.crypto += 1;
        sent.retransmits.get_or_create().crypto.push_back(truncated);
        // Whatever did not fit goes back to the front of the queue with an advanced offset.
        if !frame.data.is_empty() {
            frame.offset += len as u64;
            space.pending.crypto.push_front(frame);
        }
    }

    // Stream control frames, delegated to the stream state
    if space_id == SpaceId::Data {
        self.streams.write_control_frames(
            buf,
            &mut space.pending,
            &mut sent.retransmits,
            &mut self.stats.frame_tx,
            max_size,
        );
    }

    // NEW_CONNECTION_ID
    // NOTE(review): 44 is presumably an upper bound on the encoded frame size — confirm.
    while buf.len() + 44 < max_size {
        let issued = match space.pending.new_cids.pop() {
            Some(x) => x,
            None => break,
        };
        trace!(
            sequence = issued.sequence,
            id = %issued.id,
            "NEW_CONNECTION_ID"
        );
        frame::NewConnectionId {
            sequence: issued.sequence,
            retire_prior_to: self.local_cid_state.retire_prior_to(),
            id: issued.id,
            reset_token: issued.reset_token,
        }
        .encode(buf);
        sent.retransmits.get_or_create().new_cids.push(issued);
        self.stats.frame_tx.new_connection_id += 1;
    }

    // RETIRE_CONNECTION_ID
    while buf.len() + frame::RETIRE_CONNECTION_ID_SIZE_BOUND < max_size {
        let seq = match space.pending.retire_cids.pop() {
            Some(x) => x,
            None => break,
        };
        trace!(sequence = seq, "RETIRE_CONNECTION_ID");
        buf.write(frame::FrameType::RETIRE_CONNECTION_ID);
        buf.write_var(seq);
        sent.retransmits.get_or_create().retire_cids.push(seq);
        self.stats.frame_tx.retire_connection_id += 1;
    }

    // DATAGRAM
    let mut sent_datagrams = false;
    while buf.len() + Datagram::SIZE_BOUND < max_size && space_id == SpaceId::Data {
        match self.datagrams.write(buf, max_size) {
            true => {
                sent_datagrams = true;
                sent.non_retransmits = true;
                self.stats.frame_tx.datagram += 1;
            }
            false => break,
        }
    }
    // Report that datagram sending is unblocked once at least one datagram went out.
    if self.datagrams.send_blocked && sent_datagrams {
        self.events.push_back(Event::DatagramsUnblocked);
        self.datagrams.send_blocked = false;
    }

    // NEW_TOKEN
    while let Some(remote_addr) = space.pending.new_tokens.pop() {
        debug_assert_eq!(space_id, SpaceId::Data);
        let ConnectionSide::Server { server_config } = &self.side else {
            panic!("NEW_TOKEN frames should not be enqueued by clients");
        };

        // Entries queued for an address other than the current path's remote are dropped.
        if remote_addr != self.path.remote {
            continue;
        }

        let token = Token::new(
            TokenPayload::Validation {
                ip: remote_addr.ip(),
                issued: server_config.time_source.now(),
            },
            &mut self.rng,
        );
        let new_token = NewToken {
            token: token.encode(&*server_config.token_key).into(),
        };

        // Not enough room: put the entry back and stop writing tokens for this packet.
        if buf.len() + new_token.size() >= max_size {
            space.pending.new_tokens.push(remote_addr);
            break;
        }

        new_token.encode(buf);
        sent.retransmits
            .get_or_create()
            .new_tokens
            .push(remote_addr);
        self.stats.frame_tx.new_token += 1;
    }

    // STREAM
    if space_id == SpaceId::Data {
        sent.stream_frames =
            self.streams
                .write_stream_frames(buf, max_size, self.config.send_fairness);
        self.stats.frame_tx.stream += sent.stream_frames.len() as u64;
    }

    sent
}
3725
3726 fn populate_acks(
3731 now: Instant,
3732 receiving_ecn: bool,
3733 sent: &mut SentFrames,
3734 space: &mut PacketSpace,
3735 buf: &mut Vec<u8>,
3736 stats: &mut ConnectionStats,
3737 ) {
3738 debug_assert!(!space.pending_acks.ranges().is_empty());
3739
3740 debug_assert!(space.crypto.is_some(), "tried to send ACK in 0-RTT");
3742 let ecn = if receiving_ecn {
3743 Some(&space.ecn_counters)
3744 } else {
3745 None
3746 };
3747 sent.largest_acked = space.pending_acks.ranges().max();
3748
3749 let delay_micros = space.pending_acks.ack_delay(now).as_micros() as u64;
3750
3751 let ack_delay_exp = TransportParameters::default().ack_delay_exponent;
3753 let delay = delay_micros >> ack_delay_exp.into_inner();
3754
3755 trace!(
3756 "ACK {:?}, Delay = {}us",
3757 space.pending_acks.ranges(),
3758 delay_micros
3759 );
3760
3761 frame::Ack::encode(delay as _, space.pending_acks.ranges(), ecn, buf);
3762 stats.frame_tx.acks += 1;
3763 }
3764
3765 fn close_common(&mut self) {
3766 trace!("connection closed");
3767 for &timer in &Timer::VALUES {
3768 self.timers.stop(timer);
3769 }
3770 }
3771
3772 fn set_close_timer(&mut self, now: Instant) {
3773 self.timers
3774 .set(Timer::Close, now + 3 * self.pto(self.highest_space));
3775 }
3776
3777 fn handle_peer_params(&mut self, params: TransportParameters) -> Result<(), TransportError> {
3779 if Some(self.orig_rem_cid) != params.initial_src_cid
3780 || (self.side.is_client()
3781 && (Some(self.initial_dst_cid) != params.original_dst_cid
3782 || self.retry_src_cid != params.retry_src_cid))
3783 {
3784 return Err(TransportError::TRANSPORT_PARAMETER_ERROR(
3785 "CID authentication failure",
3786 ));
3787 }
3788
3789 self.set_peer_params(params);
3790
3791 Ok(())
3792 }
3793
3794 fn set_peer_params(&mut self, params: TransportParameters) {
3795 self.streams.set_params(¶ms);
3796 self.idle_timeout =
3797 negotiate_max_idle_timeout(self.config.max_idle_timeout, Some(params.max_idle_timeout));
3798 trace!("negotiated max idle timeout {:?}", self.idle_timeout);
3799 if let Some(ref info) = params.preferred_address {
3800 self.rem_cids.insert(frame::NewConnectionId {
3801 sequence: 1,
3802 id: info.connection_id,
3803 reset_token: info.stateless_reset_token,
3804 retire_prior_to: 0,
3805 }).expect("preferred address CID is the first received, and hence is guaranteed to be legal");
3806 }
3807 self.ack_frequency.peer_max_ack_delay = get_max_ack_delay(¶ms);
3808 self.peer_params = params;
3809 self.path.mtud.on_peer_max_udp_payload_size_received(
3810 u16::try_from(self.peer_params.max_udp_payload_size.into_inner()).unwrap_or(u16::MAX),
3811 );
3812
3813 if let Some(nat_config) = ¶ms.nat_traversal {
3815 self.init_nat_traversal(nat_config);
3816 }
3817 }
3818
3819 fn init_nat_traversal(&mut self, peer_config: &crate::transport_parameters::NatTraversalConfig) {
3821 use crate::transport_parameters::NatTraversalRole as TPRole;
3822
3823 let role = match peer_config.role {
3825 TPRole::Client => NatTraversalRole::Client,
3826 TPRole::Server { can_relay } => NatTraversalRole::Server { can_relay },
3827 TPRole::Bootstrap => NatTraversalRole::Bootstrap,
3828 };
3829
3830 let max_candidates = peer_config.max_candidates.into_inner()
3832 .min(50) as u32; let coordination_timeout = Duration::from_millis(
3834 peer_config.coordination_timeout.into_inner().min(10000) );
3836
3837 self.nat_traversal = Some(NatTraversalState::new(
3838 role,
3839 max_candidates,
3840 coordination_timeout,
3841 ));
3842
3843 trace!("NAT traversal initialized with role {:?}", role);
3844 }
3845
3846 fn handle_add_address(
3848 &mut self,
3849 add_address: &crate::frame::AddAddress,
3850 now: Instant,
3851 ) -> Result<(), TransportError> {
3852 let nat_state = self.nat_traversal.as_mut()
3853 .ok_or_else(|| TransportError::PROTOCOL_VIOLATION("AddAddress frame without NAT traversal negotiation"))?;
3854
3855 match nat_state.add_remote_candidate(
3856 add_address.sequence,
3857 add_address.address,
3858 add_address.priority,
3859 now,
3860 ) {
3861 Ok(()) => {
3862 trace!("Added remote candidate: {} (seq={}, priority={})",
3863 add_address.address, add_address.sequence, add_address.priority);
3864
3865 Ok(())
3868 }
3869 Err(NatTraversalError::TooManyCandidates) => {
3870 Err(TransportError::PROTOCOL_VIOLATION("too many NAT traversal candidates"))
3871 }
3872 Err(NatTraversalError::DuplicateAddress) => {
3873 Ok(())
3875 }
3876 Err(e) => {
3877 warn!("Failed to add remote candidate: {}", e);
3878 Ok(()) }
3880 }
3881 }
3882
3883 fn handle_punch_me_now(
3885 &mut self,
3886 punch_me_now: &crate::frame::PunchMeNow,
3887 now: Instant,
3888 ) -> Result<(), TransportError> {
3889 let nat_state = self.nat_traversal.as_mut()
3890 .ok_or_else(|| TransportError::PROTOCOL_VIOLATION("PunchMeNow frame without NAT traversal negotiation"))?;
3891
3892 trace!("Received PunchMeNow: round={}, target_seq={}, local_addr={}",
3893 punch_me_now.round, punch_me_now.target_sequence, punch_me_now.local_address);
3894
3895 if nat_state.handle_peer_punch_request(punch_me_now.round, now) {
3897 trace!("Coordination synchronized for round {}", punch_me_now.round);
3898
3899 } else {
3907 debug!("Failed to synchronize coordination for round {}", punch_me_now.round);
3908 }
3909
3910 Ok(())
3911 }
3912
3913 fn handle_remove_address(
3915 &mut self,
3916 remove_address: &crate::frame::RemoveAddress,
3917 ) -> Result<(), TransportError> {
3918 let nat_state = self.nat_traversal.as_mut()
3919 .ok_or_else(|| TransportError::PROTOCOL_VIOLATION("RemoveAddress frame without NAT traversal negotiation"))?;
3920
3921 if nat_state.remove_candidate(remove_address.sequence) {
3922 trace!("Removed candidate with sequence {}", remove_address.sequence);
3923 } else {
3924 trace!("Attempted to remove unknown candidate sequence {}", remove_address.sequence);
3925 }
3926
3927 Ok(())
3928 }
3929
3930 fn decrypt_packet(
3931 &mut self,
3932 now: Instant,
3933 packet: &mut Packet,
3934 ) -> Result<Option<u64>, Option<TransportError>> {
3935 let result = packet_crypto::decrypt_packet_body(
3936 packet,
3937 &self.spaces,
3938 self.zero_rtt_crypto.as_ref(),
3939 self.key_phase,
3940 self.prev_crypto.as_ref(),
3941 self.next_crypto.as_ref(),
3942 )?;
3943
3944 let result = match result {
3945 Some(r) => r,
3946 None => return Ok(None),
3947 };
3948
3949 if result.outgoing_key_update_acked {
3950 if let Some(prev) = self.prev_crypto.as_mut() {
3951 prev.end_packet = Some((result.number, now));
3952 self.set_key_discard_timer(now, packet.header.space());
3953 }
3954 }
3955
3956 if result.incoming_key_update {
3957 trace!("key update authenticated");
3958 self.update_keys(Some((result.number, now)), true);
3959 self.set_key_discard_timer(now, packet.header.space());
3960 }
3961
3962 Ok(Some(result.number))
3963 }
3964
3965 fn update_keys(&mut self, end_packet: Option<(u64, Instant)>, remote: bool) {
3966 trace!("executing key update");
3967 let new = self
3971 .crypto
3972 .next_1rtt_keys()
3973 .expect("only called for `Data` packets");
3974 self.key_phase_size = new
3975 .local
3976 .confidentiality_limit()
3977 .saturating_sub(KEY_UPDATE_MARGIN);
3978 let old = mem::replace(
3979 &mut self.spaces[SpaceId::Data]
3980 .crypto
3981 .as_mut()
3982 .unwrap() .packet,
3984 mem::replace(self.next_crypto.as_mut().unwrap(), new),
3985 );
3986 self.spaces[SpaceId::Data].sent_with_keys = 0;
3987 self.prev_crypto = Some(PrevCrypto {
3988 crypto: old,
3989 end_packet,
3990 update_unacked: remote,
3991 });
3992 self.key_phase = !self.key_phase;
3993 }
3994
3995 fn peer_supports_ack_frequency(&self) -> bool {
3996 self.peer_params.min_ack_delay.is_some()
3997 }
3998
3999 pub(crate) fn immediate_ack(&mut self) {
4004 self.spaces[self.highest_space].immediate_ack_pending = true;
4005 }
4006
/// Test helper: decodes the datagram carried by `event` and returns its payload bytes.
///
/// Returns `None` when `event` is not a datagram event, when header unprotection yields
/// nothing or no packet, or when body decryption fails.
///
/// # Panics
///
/// Panics if the datagram carries remaining (coalesced) data.
#[cfg(test)]
pub(crate) fn decode_packet(&self, event: &ConnectionEvent) -> Option<Vec<u8>> {
    let ConnectionEventInner::Datagram(DatagramConnectionEvent {
        first_decode,
        remaining,
        ..
    }) = &event.0
    else {
        return None;
    };

    assert!(
        remaining.is_none(),
        "Packets should never be coalesced in tests"
    );

    let unprotected = packet_crypto::unprotect_header(
        first_decode.clone(),
        &self.spaces,
        self.zero_rtt_crypto.as_ref(),
        self.peer_params.stateless_reset_token,
    )?;
    let mut packet = unprotected.packet?;

    let decrypted = packet_crypto::decrypt_packet_body(
        &mut packet,
        &self.spaces,
        self.zero_rtt_crypto.as_ref(),
        self.key_phase,
        self.prev_crypto.as_ref(),
        self.next_crypto.as_ref(),
    );
    // Only failure matters here; the successful result itself is not inspected.
    if decrypted.is_err() {
        return None;
    }

    Some(packet.payload.to_vec())
}
4043
/// Test accessor for the byte count tracked as in flight on the current path.
#[cfg(test)]
pub(crate) fn bytes_in_flight(&self) -> u64 {
    let in_flight = &self.path.in_flight;
    in_flight.bytes
}
4050
/// Test accessor: the congestion controller's window minus the bytes currently in flight
/// on the current path, saturating at zero.
#[cfg(test)]
pub(crate) fn congestion_window(&self) -> u64 {
    let window = self.path.congestion.window();
    let in_flight = self.path.in_flight.bytes;
    window.saturating_sub(in_flight)
}
4059
/// Test helper: whether the connection has nothing scheduled other than idling.
///
/// Ignoring the `KeepAlive`, `PushNewCid` and `KeyDiscard` timers, this is `true` when no
/// timer is set at all, or when the timer due earliest is `Timer::Idle`. If several timers
/// share the earliest deadline, the one listed first in `Timer::VALUES` decides.
#[cfg(test)]
pub(crate) fn is_idle(&self) -> bool {
    let mut earliest = None;
    for &timer in Timer::VALUES.iter() {
        if matches!(
            timer,
            Timer::KeepAlive | Timer::PushNewCid | Timer::KeyDiscard
        ) {
            continue;
        }
        let Some(deadline) = self.timers.get(timer) else {
            continue;
        };
        match earliest {
            // Keep the earlier entry on ties so the first minimum wins.
            Some((_, best)) if best <= deadline => {}
            _ => earliest = Some((timer, deadline)),
        }
    }

    match earliest {
        None => true,
        Some((timer, _)) => timer == Timer::Idle,
    }
}
4070
/// Test accessor for the connection's `lost_packets` counter.
#[cfg(test)]
pub(crate) fn lost_packets(&self) -> u64 {
    let Self { lost_packets, .. } = self;
    *lost_packets
}
4076
/// Test accessor for the current path's `sending_ecn` flag.
#[cfg(test)]
pub(crate) fn using_ecn(&self) -> bool {
    let path = &self.path;
    path.sending_ecn
}
4082
/// Test accessor for the current path's `total_recvd` counter.
#[cfg(test)]
pub(crate) fn total_recvd(&self) -> u64 {
    let path = &self.path;
    path.total_recvd
}
4088
/// Test accessor forwarding to `active_seq` on the local connection ID state.
#[cfg(test)]
pub(crate) fn active_local_cid_seq(&self) -> (u64, u64) {
    let cid_state = &self.local_cid_state;
    cid_state.active_seq()
}
4093
/// Test helper: assigns `v` as the retire sequence on the local CID state and queues a
/// `NeedIdentifiers` endpoint event carrying `now` and the value that call returned.
#[cfg(test)]
pub(crate) fn rotate_local_cid(&mut self, v: u64, now: Instant) {
    let needed = self.local_cid_state.assign_retire_seq(v);
    let event = EndpointEventInner::NeedIdentifiers(now, needed);
    self.endpoint_events.push_back(event);
}
4102
/// Test accessor forwarding to `active_seq` on the remote connection ID queue.
#[cfg(test)]
pub(crate) fn active_rem_cid_seq(&self) -> u64 {
    let remote_cids = &self.rem_cids;
    remote_cids.active_seq()
}
4108
/// Test accessor for the MTU reported by the current path.
#[cfg(test)]
pub(crate) fn path_mtu(&self) -> u16 {
    let path = &self.path;
    path.current_mtu()
}
4114
4115 fn can_send_1rtt(&self, max_size: usize) -> bool {
4119 self.streams.can_send_stream_data()
4120 || self.path.challenge_pending
4121 || self
4122 .prev_path
4123 .as_ref()
4124 .is_some_and(|(_, x)| x.challenge_pending)
4125 || !self.path_responses.is_empty()
4126 || self
4127 .datagrams
4128 .outgoing
4129 .front()
4130 .is_some_and(|x| x.size(true) <= max_size)
4131 }
4132
4133 fn remove_in_flight(&mut self, pn: u64, packet: &SentPacket) {
4135 for path in [&mut self.path]
4137 .into_iter()
4138 .chain(self.prev_path.as_mut().map(|(_, data)| data))
4139 {
4140 if path.remove_in_flight(pn, packet) {
4141 return;
4142 }
4143 }
4144 }
4145
4146 fn kill(&mut self, reason: ConnectionError) {
4148 self.close_common();
4149 self.error = Some(reason);
4150 self.state = State::Drained;
4151 self.endpoint_events.push_back(EndpointEventInner::Drained);
4152 }
4153
4154 pub fn current_mtu(&self) -> u16 {
4158 self.path.current_mtu()
4159 }
4160
4161 fn predict_1rtt_overhead(&self, pn: Option<u64>) -> usize {
4168 let pn_len = match pn {
4169 Some(pn) => PacketNumber::new(
4170 pn,
4171 self.spaces[SpaceId::Data].largest_acked_packet.unwrap_or(0),
4172 )
4173 .len(),
4174 None => 4,
4176 };
4177
4178 1 + self.rem_cids.active().len() + pn_len + self.tag_len_1rtt()
4180 }
4181
4182 fn tag_len_1rtt(&self) -> usize {
4183 let key = match self.spaces[SpaceId::Data].crypto.as_ref() {
4184 Some(crypto) => Some(&*crypto.packet.local),
4185 None => self.zero_rtt_crypto.as_ref().map(|x| &*x.packet),
4186 };
4187 key.map_or(16, |x| x.tag_len())
4191 }
4192
4193 fn on_path_validated(&mut self) {
4195 self.path.validated = true;
4196 let ConnectionSide::Server { server_config } = &self.side else {
4197 return;
4198 };
4199 let new_tokens = &mut self.spaces[SpaceId::Data as usize].pending.new_tokens;
4200 new_tokens.clear();
4201 for _ in 0..server_config.validation_token.sent {
4202 new_tokens.push(self.path.remote);
4203 }
4204 }
4205}
4206
4207impl fmt::Debug for Connection {
4208 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
4209 f.debug_struct("Connection")
4210 .field("handshake_cid", &self.handshake_cid)
4211 .finish()
4212 }
4213}
4214
4215enum ConnectionSide {
4217 Client {
4218 token: Bytes,
4220 token_store: Arc<dyn TokenStore>,
4221 server_name: String,
4222 },
4223 Server {
4224 server_config: Arc<ServerConfig>,
4225 },
4226}
4227
4228impl ConnectionSide {
4229 fn remote_may_migrate(&self) -> bool {
4230 match self {
4231 Self::Server { server_config } => server_config.migration,
4232 Self::Client { .. } => false,
4233 }
4234 }
4235
4236 fn is_client(&self) -> bool {
4237 self.side().is_client()
4238 }
4239
4240 fn is_server(&self) -> bool {
4241 self.side().is_server()
4242 }
4243
4244 fn side(&self) -> Side {
4245 match *self {
4246 Self::Client { .. } => Side::Client,
4247 Self::Server { .. } => Side::Server,
4248 }
4249 }
4250}
4251
4252impl From<SideArgs> for ConnectionSide {
4253 fn from(side: SideArgs) -> Self {
4254 match side {
4255 SideArgs::Client {
4256 token_store,
4257 server_name,
4258 } => Self::Client {
4259 token: token_store.take(&server_name).unwrap_or_default(),
4260 token_store,
4261 server_name,
4262 },
4263 SideArgs::Server {
4264 server_config,
4265 pref_addr_cid: _,
4266 path_validated: _,
4267 } => Self::Server { server_config },
4268 }
4269 }
4270}
4271
4272pub(crate) enum SideArgs {
4274 Client {
4275 token_store: Arc<dyn TokenStore>,
4276 server_name: String,
4277 },
4278 Server {
4279 server_config: Arc<ServerConfig>,
4280 pref_addr_cid: Option<ConnectionId>,
4281 path_validated: bool,
4282 },
4283}
4284
4285impl SideArgs {
4286 pub(crate) fn pref_addr_cid(&self) -> Option<ConnectionId> {
4287 match *self {
4288 Self::Client { .. } => None,
4289 Self::Server { pref_addr_cid, .. } => pref_addr_cid,
4290 }
4291 }
4292
4293 pub(crate) fn path_validated(&self) -> bool {
4294 match *self {
4295 Self::Client { .. } => true,
4296 Self::Server { path_validated, .. } => path_validated,
4297 }
4298 }
4299
4300 pub(crate) fn side(&self) -> Side {
4301 match *self {
4302 Self::Client { .. } => Side::Client,
4303 Self::Server { .. } => Side::Server,
4304 }
4305 }
4306}
4307
4308#[derive(Debug, Error, Clone, PartialEq, Eq)]
4310pub enum ConnectionError {
4311 #[error("peer doesn't implement any supported version")]
4313 VersionMismatch,
4314 #[error(transparent)]
4316 TransportError(#[from] TransportError),
4317 #[error("aborted by peer: {0}")]
4319 ConnectionClosed(frame::ConnectionClose),
4320 #[error("closed by peer: {0}")]
4322 ApplicationClosed(frame::ApplicationClose),
4323 #[error("reset by peer")]
4325 Reset,
4326 #[error("timed out")]
4332 TimedOut,
4333 #[error("closed")]
4335 LocallyClosed,
4336 #[error("CIDs exhausted")]
4340 CidsExhausted,
4341}
4342
4343impl From<Close> for ConnectionError {
4344 fn from(x: Close) -> Self {
4345 match x {
4346 Close::Connection(reason) => Self::ConnectionClosed(reason),
4347 Close::Application(reason) => Self::ApplicationClosed(reason),
4348 }
4349 }
4350}
4351
4352impl From<ConnectionError> for io::Error {
4354 fn from(x: ConnectionError) -> Self {
4355 use ConnectionError::*;
4356 let kind = match x {
4357 TimedOut => io::ErrorKind::TimedOut,
4358 Reset => io::ErrorKind::ConnectionReset,
4359 ApplicationClosed(_) | ConnectionClosed(_) => io::ErrorKind::ConnectionAborted,
4360 TransportError(_) | VersionMismatch | LocallyClosed | CidsExhausted => {
4361 io::ErrorKind::Other
4362 }
4363 };
4364 Self::new(kind, x)
4365 }
4366}
4367
4368#[allow(unreachable_pub)] #[derive(Clone)]
4370pub enum State {
4371 Handshake(state::Handshake),
4372 Established,
4373 Closed(state::Closed),
4374 Draining,
4375 Drained,
4377}
4378
4379impl State {
4380 fn closed<R: Into<Close>>(reason: R) -> Self {
4381 Self::Closed(state::Closed {
4382 reason: reason.into(),
4383 })
4384 }
4385
4386 fn is_handshake(&self) -> bool {
4387 matches!(*self, Self::Handshake(_))
4388 }
4389
4390 fn is_established(&self) -> bool {
4391 matches!(*self, Self::Established)
4392 }
4393
4394 fn is_closed(&self) -> bool {
4395 matches!(*self, Self::Closed(_) | Self::Draining | Self::Drained)
4396 }
4397
4398 fn is_drained(&self) -> bool {
4399 matches!(*self, Self::Drained)
4400 }
4401}
4402
4403mod state {
4404 use super::*;
4405
4406 #[allow(unreachable_pub)] #[derive(Clone)]
4408 pub struct Handshake {
4409 pub(super) rem_cid_set: bool,
4413 pub(super) expected_token: Bytes,
4417 pub(super) client_hello: Option<Bytes>,
4421 }
4422
4423 #[allow(unreachable_pub)] #[derive(Clone)]
4425 pub struct Closed {
4426 pub(super) reason: Close,
4427 }
4428}
4429
4430#[derive(Debug)]
4432pub enum Event {
4433 HandshakeDataReady,
4435 Connected,
4437 ConnectionLost {
4441 reason: ConnectionError,
4443 },
4444 Stream(StreamEvent),
4446 DatagramReceived,
4448 DatagramsUnblocked,
4450}
4451
/// Returns `x - y`, or a zero duration when `x` is not later than `y`.
fn instant_saturating_sub(x: Instant, y: Instant) -> Duration {
    if x <= y {
        return Duration::ZERO;
    }
    x - y
}
4455
4456fn get_max_ack_delay(params: &TransportParameters) -> Duration {
4457 Duration::from_micros(params.max_ack_delay.0 * 1000)
4458}
4459
4460const MAX_BACKOFF_EXPONENT: u32 = 16;
4462
4463const MIN_PACKET_SPACE: usize = MAX_HANDSHAKE_OR_0RTT_HEADER_SIZE + 32;
4471
4472const MAX_HANDSHAKE_OR_0RTT_HEADER_SIZE: usize =
4478 1 + 4 + 1 + MAX_CID_SIZE + 1 + MAX_CID_SIZE + VarInt::from_u32(u16::MAX as u32).size() + 4;
4479
4480const KEY_UPDATE_MARGIN: u64 = 10_000;
4484
4485#[derive(Default)]
4486struct SentFrames {
4487 retransmits: ThinRetransmits,
4488 largest_acked: Option<u64>,
4489 stream_frames: StreamMetaVec,
4490 non_retransmits: bool,
4492 requires_padding: bool,
4493}
4494
4495impl SentFrames {
4496 fn is_ack_only(&self, streams: &StreamsState) -> bool {
4498 self.largest_acked.is_some()
4499 && !self.non_retransmits
4500 && self.stream_frames.is_empty()
4501 && self.retransmits.is_empty(streams)
4502 }
4503}
4504
4505fn negotiate_max_idle_timeout(x: Option<VarInt>, y: Option<VarInt>) -> Option<Duration> {
4513 match (x, y) {
4514 (Some(VarInt(0)) | None, Some(VarInt(0)) | None) => None,
4515 (Some(VarInt(0)) | None, Some(y)) => Some(Duration::from_millis(y.0)),
4516 (Some(x), Some(VarInt(0)) | None) => Some(Duration::from_millis(x.0)),
4517 (Some(x), Some(y)) => Some(Duration::from_millis(cmp::min(x, y).0)),
4518 }
4519}
4520
#[cfg(test)]
mod tests {
    use super::*;

    /// Checks `negotiate_max_idle_timeout` against a table of inputs, calling it with the
    /// arguments in both orders to confirm the result does not depend on the order.
    #[test]
    fn negotiate_max_idle_timeout_commutative() {
        let ms = |n: u64| Some(Duration::from_millis(n));
        let cases: [(Option<VarInt>, Option<VarInt>, Option<Duration>); 6] = [
            (None, None, None),
            (None, Some(VarInt(0)), None),
            (None, Some(VarInt(2)), ms(2)),
            (Some(VarInt(0)), Some(VarInt(0)), None),
            (Some(VarInt(2)), Some(VarInt(0)), ms(2)),
            (Some(VarInt(1)), Some(VarInt(4)), ms(1)),
        ];

        for (left, right, expected) in cases {
            for (a, b) in [(left, right), (right, left)] {
                assert_eq!(negotiate_max_idle_timeout(a, b), expected);
            }
        }
    }
}