1use std::{
2 cmp,
3 collections::VecDeque,
4 convert::TryFrom,
5 fmt, io, mem,
6 net::{IpAddr, SocketAddr},
7 sync::Arc,
8};
9
10use bytes::{Bytes, BytesMut};
11use frame::StreamMetaVec;
12
13use rand::{Rng, SeedableRng, rngs::StdRng};
14use thiserror::Error;
15use tracing::{debug, error, trace, trace_span, warn};
16
17use crate::{
18 Dir, Duration, EndpointConfig, Frame, INITIAL_MTU, Instant, MAX_CID_SIZE, MAX_STREAM_COUNT,
19 MIN_INITIAL_SIZE, Side, StreamId, TIMER_GRANULARITY, TokenStore, Transmit, TransportError,
20 TransportErrorCode, VarInt,
21 cid_generator::ConnectionIdGenerator,
22 cid_queue::CidQueue,
23 coding::BufMutExt,
24 config::{ServerConfig, TransportConfig},
25 crypto::{self, KeyPair, Keys, PacketKey},
26 frame::{self, Close, Datagram, FrameStruct, NewConnectionId, NewToken},
27 packet::{
28 FixedLengthConnectionIdParser, Header, InitialHeader, InitialPacket, LongType, Packet,
29 PacketNumber, PartialDecode, SpaceId,
30 },
31 range_set::ArrayRangeSet,
32 shared::{
33 ConnectionEvent, ConnectionEventInner, ConnectionId, DatagramConnectionEvent, EcnCodepoint,
34 EndpointEvent, EndpointEventInner,
35 },
36 token::{ResetToken, Token, TokenPayload},
37 transport_parameters::TransportParameters,
38};
39
40mod ack_frequency;
41use ack_frequency::AckFrequencyState;
42
43mod assembler;
44pub use assembler::Chunk;
45
46mod cid_state;
47use cid_state::CidState;
48
49mod datagrams;
50use datagrams::DatagramState;
51pub use datagrams::{Datagrams, SendDatagramError};
52
53mod mtud;
54mod pacing;
55
56mod packet_builder;
57use packet_builder::PacketBuilder;
58
59mod packet_crypto;
60use packet_crypto::{PrevCrypto, ZeroRttCrypto};
61
62mod paths;
63pub use paths::RttEstimator;
64use paths::{PathData, PathResponses};
65
66pub(crate) mod qlog;
67
68mod send_buffer;
69
70mod spaces;
71#[cfg(fuzzing)]
72pub use spaces::Retransmits;
73#[cfg(not(fuzzing))]
74use spaces::Retransmits;
75use spaces::{PacketNumberFilter, PacketSpace, SendableFrames, SentPacket, ThinRetransmits};
76
77mod stats;
78pub use stats::{ConnectionStats, FrameStats, PathStats, UdpStats};
79
80mod streams;
81#[cfg(fuzzing)]
82pub use streams::StreamsState;
83#[cfg(not(fuzzing))]
84use streams::StreamsState;
85pub use streams::{
86 Chunks, ClosedStream, FinishError, ReadError, ReadableError, RecvStream, SendStream,
87 ShouldTransmit, StreamEvent, Streams, WriteError, Written,
88};
89
90mod timer;
91use crate::congestion::Controller;
92use timer::{Timer, TimerTable};
93
/// State of a single connection: its network path, per-space packet state, streams, timers and
/// the queues through which events are handed to the application and the endpoint.
///
/// The connection performs no I/O itself. Callers feed it events and timeouts
/// (`handle_event`, `handle_timeout`) and drain its output (`poll_transmit`, `poll`,
/// `poll_endpoint_events`, `poll_timeout`).
pub struct Connection {
    /// Endpoint-level configuration handed to `new`.
    endpoint_config: Arc<EndpointConfig>,
    /// Transport configuration handed to `new`; consulted throughout (spin bit, idle timeout,
    /// segmentation offload, MTU padding, ACK frequency, qlog sink).
    config: Arc<TransportConfig>,
    /// Random number generator, seeded from the `rng_seed` argument of `new`.
    rng: StdRng,
    /// The cryptographic session; exposed read-only through `crypto_session`.
    crypto: Box<dyn crypto::Session>,
    /// Initialised from the `loc_cid` argument of `new`.
    handshake_cid: ConnectionId,
    /// Initialised from the `rem_cid` argument of `new`.
    rem_handshake_cid: ConnectionId,
    /// Local IP address, if known; reported as `src_ip` on every outgoing `Transmit`.
    local_ip: Option<IpAddr>,
    /// State of the path currently in use (remote address, RTT, congestion, pacing, MTU
    /// discovery, in-flight accounting).
    path: PathData,
    /// Starts at zero in `new`; not otherwise touched in the visible part of this file.
    path_counter: u64,
    /// Whether MTU discovery is allowed; forwarded to `PathData::new`.
    allow_mtud: bool,
    /// The previously used path and the connection ID associated with it, if any.
    /// `send_path_challenge` sends PATH_CHALLENGE frames to it, and it is restored as the active
    /// path when the `PathValidation` timer fires.
    prev_path: Option<(ConnectionId, PathData)>,
    /// Current connection state (handshake, established, closed, draining, drained, ...).
    state: State,
    /// Side-specific (client/server) state.
    side: ConnectionSide,
    /// Value reported by `has_0rtt`.
    zero_rtt_enabled: bool,
    /// 0-RTT packet keys, if any; dropped when the `KeyDiscard` timer fires.
    zero_rtt_crypto: Option<ZeroRttCrypto>,
    /// Starts as `false`.
    /// NOTE(review): presumably the current 1-RTT key phase bit — confirm against `update_keys`.
    key_phase: bool,
    /// Initialised to a random value in `10..1000`.
    /// NOTE(review): presumably the number of packets after which a key update is initiated —
    /// confirm against the code that consumes it.
    key_phase_size: u64,
    /// Transport parameters of the peer; defaults are used until replaced.
    peer_params: TransportParameters,
    /// Initialised from the `rem_cid` argument of `new`; passed to the qlog sink to identify the
    /// connection.
    orig_rem_cid: ConnectionId,
    /// Initialised from the `init_cid` argument of `new`.
    initial_dst_cid: ConnectionId,
    /// Starts as `None`.
    /// NOTE(review): presumably set when a Retry packet is processed — confirm.
    retry_src_cid: Option<ConnectionId>,
    /// Application-facing events, drained first by `poll`.
    events: VecDeque<Event>,
    /// Events destined for the endpoint, drained by `poll_endpoint_events`.
    endpoint_events: VecDeque<EndpointEventInner>,
    /// Decided once in `new`: requires `config.allow_spin` and then succeeds with probability
    /// 7/8.
    spin_enabled: bool,
    /// Starts as `false`.
    spin: bool,
    /// Per-packet-number-space state, indexed by `SpaceId`; the first entry is created with the
    /// initial keys.
    spaces: [PacketSpace; 3],
    /// Starts as `SpaceId::Initial`. `ping` queues into this space and `poll_transmit` stops
    /// sending CONNECTION_CLOSE once it has been written in this space.
    highest_space: SpaceId,
    /// Previous key generation kept around during a key update; dropped when the `KeyDiscard`
    /// timer fires.
    prev_crypto: Option<PrevCrypto>,
    /// Starts as `None`.
    /// NOTE(review): presumably the pre-computed next generation of 1-RTT keys — confirm.
    next_crypto: Option<KeyPair<Box<dyn PacketKey>>>,
    /// Value reported by `accepted_0rtt`.
    accepted_0rtt: bool,
    /// Starts as `true`.
    permit_idle_reset: bool,
    /// Idle timeout derived from `config.max_idle_timeout` (milliseconds); `None` when the
    /// configured value is absent or zero.
    idle_timeout: Option<Duration>,
    /// All connection timers; queried by `poll_timeout` and fired by `handle_timeout`.
    timers: TimerTable,
    /// Starts at zero.
    authentication_failures: u64,
    /// Pending fatal error; surfaced once by `poll` as `Event::ConnectionLost`.
    error: Option<ConnectionError>,
    /// Filter consulted when choosing outgoing packet numbers and when validating incoming ACK
    /// ranges.
    packet_number_filter: PacketNumberFilter,

    /// Queued PATH_RESPONSE tokens; `poll_transmit` pops off-path responses from here.
    path_responses: PathResponses,
    /// Whether a CONNECTION_CLOSE still needs to be sent. Set by `close_inner`, cleared by
    /// `poll_transmit` once the close has been written in `highest_space`.
    close: bool,

    /// ACK frequency negotiation state.
    ack_frequency: AckFrequencyState,

    /// Reset to zero by `on_ack_received` once the peer has completed address validation;
    /// reported to the qlog sink.
    pto_count: u32,

    /// Passed to `populate_acks` when ACK frames are written.
    receiving_ecn: bool,
    /// Starts at zero.
    total_authed_packets: u64,
    /// Updated by `poll_transmit`: true when nothing was written and sending was not blocked by
    /// congestion control or pacing. Forwarded to the congestion controller on ACKs.
    app_limited: bool,

    /// Stream state for this connection.
    streams: StreamsState,
    /// Connection IDs issued by the peer; `active()` supplies the destination CID for outgoing
    /// packets.
    rem_cids: CidQueue,
    /// Bookkeeping for connection IDs issued by us.
    local_cid_state: CidState,
    /// Application datagram state.
    datagrams: DatagramState,
    /// Accumulated statistics; a snapshot is returned by `stats`.
    stats: ConnectionStats,
    /// Protocol version handed to `new`.
    version: u32,
}
242
243impl Connection {
    /// Creates the initial state of a connection.
    ///
    /// * `init_cid` is used to derive the Initial keys and is stored as `initial_dst_cid`.
    /// * `loc_cid` / `rem_cid` become the handshake connection IDs; `rem_cid` also seeds the
    ///   remote CID queue and `orig_rem_cid`.
    /// * `remote` / `local_ip` describe the initial path.
    /// * `rng_seed` seeds the connection's private random number generator.
    /// * `side_args` carries the client/server specific arguments.
    ///
    /// If `side_args` reports the path as already validated, `on_path_validated` is invoked. On
    /// the client side, the first crypto data is written and 0-RTT is initialised before
    /// returning.
    pub(crate) fn new(
        endpoint_config: Arc<EndpointConfig>,
        config: Arc<TransportConfig>,
        init_cid: ConnectionId,
        loc_cid: ConnectionId,
        rem_cid: ConnectionId,
        remote: SocketAddr,
        local_ip: Option<IpAddr>,
        crypto: Box<dyn crypto::Session>,
        cid_gen: &dyn ConnectionIdGenerator,
        now: Instant,
        version: u32,
        allow_mtud: bool,
        rng_seed: [u8; 32],
        side_args: SideArgs,
    ) -> Self {
        // Extract everything needed from `side_args` before it is consumed by the conversion.
        let pref_addr_cid = side_args.pref_addr_cid();
        let path_validated = side_args.path_validated();
        let connection_side = ConnectionSide::from(side_args);
        let side = connection_side.side();
        // The Initial space is the only one that has keys from the start.
        let initial_space = PacketSpace {
            crypto: Some(crypto.initial_keys(&init_cid, side)),
            ..PacketSpace::new(now)
        };
        let state = State::Handshake(state::Handshake {
            rem_cid_set: side.is_server(),
            expected_token: Bytes::new(),
            client_hello: None,
        });
        let mut rng = StdRng::from_seed(rng_seed);
        // NOTE: field initialisers run in the order written; the three draws from `rng` below
        // (`key_phase_size`, `spin_enabled`, `packet_number_filter`) therefore happen in that
        // order, which matters for reproducibility with a fixed seed.
        let mut this = Self {
            endpoint_config,
            crypto,
            handshake_cid: loc_cid,
            rem_handshake_cid: rem_cid,
            local_cid_state: CidState::new(
                cid_gen.cid_len(),
                cid_gen.cid_lifetime(),
                now,
                // The last argument is 2 when a preferred-address CID exists, 1 otherwise.
                if pref_addr_cid.is_some() { 2 } else { 1 },
            ),
            path: PathData::new(remote, allow_mtud, None, 0, now, &config),
            path_counter: 0,
            allow_mtud,
            local_ip,
            prev_path: None,
            state,
            side: connection_side,
            zero_rtt_enabled: false,
            zero_rtt_crypto: None,
            key_phase: false,
            key_phase_size: rng.random_range(10..1000),
            peer_params: TransportParameters::default(),
            orig_rem_cid: rem_cid,
            initial_dst_cid: init_cid,
            retry_src_cid: None,
            events: VecDeque::new(),
            endpoint_events: VecDeque::new(),
            // Even when allowed by the configuration, the spin bit is randomly left disabled
            // for one in eight connections.
            spin_enabled: config.allow_spin && rng.random_ratio(7, 8),
            spin: false,
            spaces: [initial_space, PacketSpace::new(now), PacketSpace::new(now)],
            highest_space: SpaceId::Initial,
            prev_crypto: None,
            next_crypto: None,
            accepted_0rtt: false,
            permit_idle_reset: true,
            // A missing or zero `max_idle_timeout` means "no idle timeout"; otherwise the value
            // is interpreted as milliseconds.
            idle_timeout: match config.max_idle_timeout {
                None | Some(VarInt(0)) => None,
                Some(dur) => Some(Duration::from_millis(dur.0)),
            },
            timers: TimerTable::default(),
            authentication_failures: 0,
            error: None,
            // Test builds may disable the packet number filter for deterministic numbering.
            #[cfg(test)]
            packet_number_filter: match config.deterministic_packet_numbers {
                false => PacketNumberFilter::new(&mut rng),
                true => PacketNumberFilter::disabled(),
            },
            #[cfg(not(test))]
            packet_number_filter: PacketNumberFilter::new(&mut rng),

            path_responses: PathResponses::default(),
            close: false,

            // The peer's parameters are not known yet, so start from the default ones.
            ack_frequency: AckFrequencyState::new(get_max_ack_delay(
                &TransportParameters::default(),
            )),

            pto_count: 0,

            app_limited: false,
            receiving_ecn: false,
            total_authed_packets: 0,

            streams: StreamsState::new(
                side,
                config.max_concurrent_uni_streams,
                config.max_concurrent_bidi_streams,
                config.send_window,
                config.receive_window,
                config.stream_receive_window,
            ),
            datagrams: DatagramState::default(),
            // `config` is moved last, after every initialiser that still reads from it.
            config,
            rem_cids: CidQueue::new(rem_cid),
            rng,
            stats: ConnectionStats::default(),
            version,
        };
        if path_validated {
            this.on_path_validated();
        }
        if side.is_client() {
            // The client starts the handshake.
            this.write_crypto();
            this.init_0rtt();
        }
        this
    }
369
370 #[must_use]
378 pub fn poll_timeout(&mut self) -> Option<Instant> {
379 self.timers.next_timeout()
380 }
381
382 #[must_use]
388 pub fn poll(&mut self) -> Option<Event> {
389 if let Some(x) = self.events.pop_front() {
390 return Some(x);
391 }
392
393 if let Some(event) = self.streams.poll() {
394 return Some(Event::Stream(event));
395 }
396
397 if let Some(err) = self.error.take() {
398 return Some(Event::ConnectionLost { reason: err });
399 }
400
401 None
402 }
403
404 #[must_use]
406 pub fn poll_endpoint_events(&mut self) -> Option<EndpointEvent> {
407 self.endpoint_events.pop_front().map(EndpointEvent)
408 }
409
410 #[must_use]
412 pub fn streams(&mut self) -> Streams<'_> {
413 Streams {
414 state: &mut self.streams,
415 conn_state: &self.state,
416 }
417 }
418
419 #[must_use]
421 pub fn recv_stream(&mut self, id: StreamId) -> RecvStream<'_> {
422 assert!(id.dir() == Dir::Bi || id.initiator() != self.side.side());
423 RecvStream {
424 id,
425 state: &mut self.streams,
426 pending: &mut self.spaces[SpaceId::Data].pending,
427 }
428 }
429
430 #[must_use]
432 pub fn send_stream(&mut self, id: StreamId) -> SendStream<'_> {
433 assert!(id.dir() == Dir::Bi || id.initiator() == self.side.side());
434 SendStream {
435 id,
436 state: &mut self.streams,
437 pending: &mut self.spaces[SpaceId::Data].pending,
438 conn_state: &self.state,
439 }
440 }
441
    /// Writes the next outgoing UDP payload(s) into `buf` and describes them in the returned
    /// `Transmit`, or returns `None` when there is nothing to send right now.
    ///
    /// * `now` - current time, used for pacing, timers and packet tracking.
    /// * `max_datagrams` - upper bound on the number of datagrams written in one call. It is
    ///   forced to 1 unless `enable_segmentation_offload` is set in the transport configuration.
    ///   When more than one datagram is written, `Transmit::segment_size` is set.
    /// * `buf` - output buffer.
    ///   NOTE(review): the offset arithmetic (`datagram_start` starts at 0, emptiness checks on
    ///   `buf`) looks like it assumes `buf` is empty on entry — confirm with callers.
    ///
    /// Besides regular packets this also emits, in this priority order: a PATH_CHALLENGE to the
    /// previous path, CONNECTION_CLOSE packets while closing, an off-path PATH_RESPONSE, and —
    /// only when nothing else was written and the connection is established — an MTU probe.
    ///
    /// # Panics
    ///
    /// Panics if `max_datagrams` is zero.
    #[must_use]
    pub fn poll_transmit(
        &mut self,
        now: Instant,
        max_datagrams: usize,
        buf: &mut Vec<u8>,
    ) -> Option<Transmit> {
        assert!(max_datagrams != 0);
        let max_datagrams = match self.config.enable_segmentation_offload {
            false => 1,
            true => max_datagrams,
        };

        let mut num_datagrams = 0;
        // Offset in `buf` at which the datagram currently being assembled starts. With
        // coalescing, this may be before the start of the packet currently being built.
        let mut datagram_start = 0;
        let mut segment_size = usize::from(self.path.current_mtu());

        // A pending PATH_CHALLENGE for the previous path is sent on its own, before anything
        // else.
        if let Some(challenge) = self.send_path_challenge(now, buf) {
            return Some(challenge);
        }

        // Give every space the chance to queue a probe; an immediate ACK is only requested in
        // the Data space and only if the peer supports the ACK frequency extension.
        for space in SpaceId::iter() {
            let request_immediate_ack =
                space == SpaceId::Data && self.peer_supports_ack_frequency();
            self.spaces[space].maybe_queue_probe(request_immediate_ack, &self.streams);
        }

        // Determine whether this call is about sending CONNECTION_CLOSE.
        let close = match self.state {
            State::Drained => {
                self.app_limited = true;
                return None;
            }
            State::Draining | State::Closed(_) => {
                // `self.close` is cleared further down once the close has been written, so a
                // closed connection without the flag has nothing more to send.
                if !self.close {
                    self.app_limited = true;
                    return None;
                }
                true
            }
            _ => false,
        };

        // Decide whether an ACK_FREQUENCY frame should be queued.
        if let Some(config) = &self.config.ack_frequency_config {
            self.spaces[SpaceId::Data].pending.ack_frequency = self
                .ack_frequency
                .should_send_ack_frequency(self.path.rtt.get(), config, &self.peer_params)
                && self.highest_space == SpaceId::Data
                && self.peer_supports_ack_frequency();
        }

        // Number of bytes of `buf` that may be used so far. Tracked separately from
        // `buf.capacity()`; it grows by one datagram size limit per datagram started.
        let mut buf_capacity = 0;

        let mut coalesce = true;
        // Builder of the packet currently being assembled, if any.
        let mut builder_storage: Option<PacketBuilder> = None;
        // Frames written into the packet held by `builder_storage`.
        let mut sent_frames = None;
        let mut pad_datagram = false;
        let mut pad_datagram_to_mtu = false;
        let mut congestion_blocked = false;

        // The loop may spend several iterations in the same space (one per packet), so the
        // index is advanced manually rather than via an iterator.
        let mut space_idx = 0;
        let spaces = [SpaceId::Initial, SpaceId::Handshake, SpaceId::Data];
        while space_idx < spaces.len() {
            let space_id = spaces[space_idx];
            // Bytes available for frames in a 1-RTT packet of `segment_size`, given the
            // predicted header/tag overhead for the next Data-space packet number.
            let pn = self.packet_number_filter.peek(&self.spaces[SpaceId::Data]);
            let frame_space_1rtt =
                segment_size.saturating_sub(self.predict_1rtt_overhead(Some(pn)));

            // Skip the space if it has nothing to send, unless a close must be sent and the
            // space has keys.
            let can_send = self.space_can_send(space_id, frame_space_1rtt);
            if can_send.is_empty() && (!close || self.spaces[space_id].crypto.is_none()) {
                space_idx += 1;
                continue;
            }

            let mut ack_eliciting = !self.spaces[space_id].pending.is_empty(&self.streams)
                || self.spaces[space_id].ping_pending
                || self.spaces[space_id].immediate_ack_pending;
            if space_id == SpaceId::Data {
                ack_eliciting |= self.can_send_1rtt(frame_space_1rtt);
            }

            pad_datagram_to_mtu |= space_id == SpaceId::Data && self.config.pad_to_mtu;

            // Where the data in `buf` will end once the unfinished packet (if any) has been
            // finalised: its minimum size and authentication tag are not written yet.
            let buf_end = if let Some(builder) = &builder_storage {
                buf.len().max(builder.min_size) + builder.tag_len
            } else {
                buf.len()
            };

            let tag_len = if let Some(ref crypto) = self.spaces[space_id].crypto {
                crypto.packet.local.tag_len()
            } else if space_id == SpaceId::Data {
                self.zero_rtt_crypto.as_ref().expect(
                    "sending packets in the application data space requires known 0-RTT or 1-RTT keys",
                ).packet.tag_len()
            } else {
                unreachable!("tried to send {:?} packet without keys", space_id)
            };
            if !coalesce || buf_capacity - buf_end < MIN_PACKET_SPACE + tag_len {
                // The next packet does not fit into the current datagram (or must not be
                // coalesced): a new datagram has to be started.

                if num_datagrams >= max_datagrams {
                    break;
                }

                // `total_sent` is only updated at the end of this method, so account here for
                // the datagrams already written plus one byte of the next one.
                if self
                    .path
                    .anti_amplification_blocked(segment_size as u64 * (num_datagrams as u64) + 1)
                {
                    trace!("blocked by anti-amplification");
                    break;
                }

                // Congestion control and pacing only apply to ack-eliciting packets, and are
                // skipped entirely while loss probes are outstanding in this space.
                if ack_eliciting && self.spaces[space_id].loss_probes == 0 {
                    // Bytes of the unfinished packet that are not yet counted as in flight,
                    // measured up to the end of the current datagram's allowance.
                    let untracked_bytes = if let Some(builder) = &builder_storage {
                        buf_capacity - builder.partial_encode.start
                    } else {
                        0
                    } as u64;
                    debug_assert!(untracked_bytes <= segment_size as u64);

                    let bytes_to_send = segment_size as u64 + untracked_bytes;
                    if self.path.in_flight.bytes + bytes_to_send >= self.path.congestion.window() {
                        // Move on to the next space instead of stopping, so that later spaces
                        // (e.g. with queued loss probes) still get a chance.
                        space_idx += 1;
                        congestion_blocked = true;
                        trace!("blocked by congestion control");
                        continue;
                    }

                    let smoothed_rtt = self.path.rtt.get();
                    if let Some(delay) = self.path.pacing.delay(
                        smoothed_rtt,
                        bytes_to_send,
                        self.path.current_mtu(),
                        self.path.congestion.window(),
                        now,
                    ) {
                        // Arm the pacing timer so the caller polls again at `delay`.
                        self.timers.set(Timer::Pacing, delay);
                        congestion_blocked = true;
                        trace!("blocked by pacing");
                        break;
                    }
                }

                // Finish the packet that closes the current datagram.
                if let Some(mut builder) = builder_storage.take() {
                    if pad_datagram {
                        builder.pad_to(MIN_INITIAL_SIZE);
                    }

                    if num_datagrams > 1 || pad_datagram_to_mtu {
                        // Every datagram of a batch except the last must be exactly
                        // `segment_size` long. If that would need more than `MAX_PADDING`
                        // bytes of padding (and MTU padding was not requested), or the
                        // datagram was given less than `segment_size` of capacity, end the
                        // batch here instead.
                        const MAX_PADDING: usize = 16;
                        let packet_len_unpadded = cmp::max(builder.min_size, buf.len())
                            - datagram_start
                            + builder.tag_len;
                        if (packet_len_unpadded + MAX_PADDING < segment_size
                            && !pad_datagram_to_mtu)
                            || datagram_start + segment_size > buf_capacity
                        {
                            trace!(
                                "GSO truncated by demand for {} padding bytes or loss probe",
                                segment_size - packet_len_unpadded
                            );
                            // Put the builder back; it is finished after the loop.
                            builder_storage = Some(builder);
                            break;
                        }

                        builder.pad_to(segment_size as u16);
                    }

                    builder.finish_and_track(now, self, sent_frames.take(), buf);

                    if num_datagrams == 1 {
                        // The size of the first datagram defines the segment size of the
                        // whole batch; unused capacity is clipped accordingly.
                        segment_size = buf.len();
                        buf_capacity = buf.len();

                        // Re-check with the (possibly smaller) segment size whether there is
                        // still something that fits, to avoid starting an empty packet.
                        if space_id == SpaceId::Data {
                            let frame_space_1rtt =
                                segment_size.saturating_sub(self.predict_1rtt_overhead(Some(pn)));
                            if self.space_can_send(space_id, frame_space_1rtt).is_empty() {
                                break;
                            }
                        }
                    }
                }

                // Grant capacity for the next datagram. A loss probe consumes one probe credit
                // and is limited to at most `INITIAL_MTU` bytes.
                let next_datagram_size_limit = match self.spaces[space_id].loss_probes {
                    0 => segment_size,
                    _ => {
                        self.spaces[space_id].loss_probes -= 1;
                        std::cmp::min(segment_size, usize::from(INITIAL_MTU))
                    }
                };
                buf_capacity += next_datagram_size_limit;
                if buf.capacity() < buf_capacity {
                    // Reserve room for a full batch at once rather than growing per datagram.
                    buf.reserve(max_datagrams * segment_size);
                }
                num_datagrams += 1;
                coalesce = true;
                pad_datagram = false;
                datagram_start = buf.len();

                debug_assert_eq!(
                    datagram_start % segment_size,
                    0,
                    "datagrams in a GSO batch must be aligned to the segment size"
                );
            } else {
                // The next packet is coalesced into the current datagram; finish the previous
                // packet without extra padding.
                if let Some(builder) = builder_storage.take() {
                    builder.finish_and_track(now, self, sent_frames.take(), buf);
                }
            }

            debug_assert!(buf_capacity - buf.len() >= MIN_PACKET_SPACE);

            // From here on a packet is going to be built in `space_id`.

            // A client discards the Initial space when it starts a Handshake packet.
            if self.spaces[SpaceId::Initial].crypto.is_some()
                && space_id == SpaceId::Handshake
                && self.side.is_client()
            {
                self.discard_space(now, SpaceId::Initial);
            }
            if let Some(ref mut prev) = self.prev_crypto {
                prev.update_unacked = false;
            }

            debug_assert!(
                builder_storage.is_none() && sent_frames.is_none(),
                "Previous packet must have been finished"
            );

            // Returns `None` from the whole method if no builder can be created.
            let builder = builder_storage.insert(PacketBuilder::new(
                now,
                space_id,
                self.rem_cids.active(),
                buf,
                buf_capacity,
                datagram_start,
                ack_eliciting,
                self,
            )?);
            // Nothing is coalesced after a short-header packet.
            coalesce = coalesce && !builder.short_header;

            // Datagrams carrying Initial packets are padded to `MIN_INITIAL_SIZE` when sent by
            // a client or when the packet is ack-eliciting.
            pad_datagram |=
                space_id == SpaceId::Initial && (self.side.is_client() || ack_eliciting);

            if close {
                trace!("sending CONNECTION_CLOSE");
                // Pending ACKs are written before the close frame. They are recorded in a
                // throw-away `SentFrames`, i.e. not tracked for this packet.
                if !self.spaces[space_id].pending_acks.ranges().is_empty() {
                    Self::populate_acks(
                        now,
                        self.receiving_ecn,
                        &mut SentFrames::default(),
                        &mut self.spaces[space_id],
                        buf,
                        &mut self.stats,
                    );
                }

                // The ACKs are expected to leave room for the close frame; the runtime check
                // below keeps release builds safe should that ever not hold.
                debug_assert!(
                    buf.len() + frame::ConnectionClose::SIZE_BOUND < builder.max_size,
                    "ACKs should leave space for ConnectionClose"
                );
                if buf.len() + frame::ConnectionClose::SIZE_BOUND < builder.max_size {
                    let max_frame_size = builder.max_size - buf.len();
                    match self.state {
                        State::Closed(state::Closed { ref reason }) => {
                            // The actual reason is only sent in the Data space or when it is a
                            // transport-level close; otherwise a generic APPLICATION_ERROR
                            // without reason text is sent instead.
                            if space_id == SpaceId::Data || reason.is_transport_layer() {
                                reason.encode(buf, max_frame_size)
                            } else {
                                frame::ConnectionClose {
                                    error_code: TransportErrorCode::APPLICATION_ERROR,
                                    frame_type: None,
                                    reason: Bytes::new(),
                                }
                                .encode(buf, max_frame_size)
                            }
                        }
                        State::Draining => frame::ConnectionClose {
                            error_code: TransportErrorCode::NO_ERROR,
                            frame_type: None,
                            reason: Bytes::new(),
                        }
                        .encode(buf, max_frame_size),
                        _ => unreachable!(
                            "tried to make a close packet when the connection wasn't closed"
                        ),
                    }
                }
                if space_id == self.highest_space {
                    // The close has been written in the highest space: no further close
                    // packets are needed and nothing follows it.
                    self.close = false;
                    break;
                } else {
                    // Also send the close in the remaining spaces; nothing else is added to
                    // this packet.
                    space_idx += 1;
                    continue;
                }
            }

            // An off-path PATH_RESPONSE takes precedence over regular data. It is only
            // considered for the first datagram and is returned as its own transmit to the
            // address it is destined for.
            if space_id == SpaceId::Data && num_datagrams == 1 {
                if let Some((token, remote)) = self.path_responses.pop_off_path(self.path.remote) {
                    // Cannot fail: `builder_storage` was populated just above.
                    let mut builder = builder_storage.take().unwrap();
                    trace!("PATH_RESPONSE {:08x} (off-path)", token);
                    buf.write(frame::FrameType::PATH_RESPONSE);
                    buf.write(token);
                    self.stats.frame_tx.path_response += 1;
                    builder.pad_to(MIN_INITIAL_SIZE);
                    builder.finish_and_track(
                        now,
                        self,
                        Some(SentFrames {
                            non_retransmits: true,
                            ..SentFrames::default()
                        }),
                        buf,
                    );
                    self.stats.udp_tx.on_sent(1, buf.len());
                    return Some(Transmit {
                        destination: remote,
                        size: buf.len(),
                        ecn: None,
                        segment_size: None,
                        src_ip: self.local_ip,
                    });
                }
            }

            let sent =
                self.populate_packet(now, space_id, buf, builder.max_size, builder.exact_number);

            // Sanity check: a packet must not end up ACK-only when `can_send` promised other
            // frames but no ACKs, the datagram had a full MTU available, and no application
            // datagrams are queued.
            debug_assert!(
                !(sent.is_ack_only(&self.streams)
                    && !can_send.acks
                    && can_send.other
                    && (buf_capacity - builder.datagram_start) == self.path.current_mtu() as usize
                    && self.datagrams.outgoing.is_empty()),
                "SendableFrames was {can_send:?}, but only ACKs have been written"
            );
            pad_datagram |= sent.requires_padding;

            if sent.largest_acked.is_some() {
                self.spaces[space_id].pending_acks.acks_sent();
                self.timers.stop(Timer::MaxAckDelay);
            }

            // Kept until the packet is finalised by `finish_and_track`.
            sent_frames = Some(sent);

            // `space_idx` is deliberately not advanced: the same space is checked again for
            // more data.
        }

        // Finish the last packet, if one is still open.
        if let Some(mut builder) = builder_storage {
            if pad_datagram {
                builder.pad_to(MIN_INITIAL_SIZE);
            }

            // Only pad to the segment size if the datagram was granted that much capacity
            // (loss probes may have been granted less).
            if pad_datagram_to_mtu && buf_capacity >= datagram_start + segment_size {
                builder.pad_to(segment_size as u16);
            }

            let last_packet_number = builder.exact_number;
            builder.finish_and_track(now, self, sent_frames, buf);
            self.path
                .congestion
                .on_sent(now, buf.len() as u64, last_packet_number);

            self.config.qlog_sink.emit_recovery_metrics(
                self.pto_count,
                &mut self.path,
                now,
                self.orig_rem_cid,
            );
        }

        // Nothing written without being blocked by congestion control or pacing means the
        // application did not have data to send.
        self.app_limited = buf.is_empty() && !congestion_blocked;

        // With nothing else to send on an established connection, consider an MTU probe.
        if buf.is_empty() && self.state.is_established() {
            let space_id = SpaceId::Data;
            // Returns `None` from the whole method when no probe is due.
            let probe_size = self
                .path
                .mtud
                .poll_transmit(now, self.packet_number_filter.peek(&self.spaces[space_id]))?;

            let buf_capacity = probe_size as usize;
            buf.reserve(buf_capacity);

            let mut builder = PacketBuilder::new(
                now,
                space_id,
                self.rem_cids.active(),
                buf,
                buf_capacity,
                0,
                true,
                self,
            )?;

            // The probe consists of a PING (plus IMMEDIATE_ACK when supported by the peer),
            // padded to the probed size.
            buf.write(frame::FrameType::PING);
            self.stats.frame_tx.ping += 1;

            if self.peer_supports_ack_frequency() {
                buf.write(frame::FrameType::IMMEDIATE_ACK);
                self.stats.frame_tx.immediate_ack += 1;
            }

            builder.pad_to(probe_size);
            let sent_frames = SentFrames {
                non_retransmits: true,
                ..Default::default()
            };
            builder.finish_and_track(now, self, Some(sent_frames), buf);

            self.stats.path.sent_plpmtud_probes += 1;
            num_datagrams = 1;

            trace!(?probe_size, "writing MTUD probe");
        }

        if buf.is_empty() {
            return None;
        }

        trace!("sending {} bytes in {} datagrams", buf.len(), num_datagrams);
        self.path.total_sent = self.path.total_sent.saturating_add(buf.len() as u64);

        self.stats.udp_tx.on_sent(num_datagrams as u64, buf.len());

        Some(Transmit {
            destination: self.path.remote,
            size: buf.len(),
            ecn: if self.path.sending_ecn {
                Some(EcnCodepoint::Ect0)
            } else {
                None
            },
            // A segment size is only reported for batches of more than one datagram.
            segment_size: match num_datagrams {
                1 => None,
                _ => Some(segment_size),
            },
            src_ip: self.local_ip,
        })
    }
1011
1012 fn send_path_challenge(&mut self, now: Instant, buf: &mut Vec<u8>) -> Option<Transmit> {
1014 let (prev_cid, prev_path) = self.prev_path.as_mut()?;
1015 if !prev_path.challenge_pending {
1016 return None;
1017 }
1018 prev_path.challenge_pending = false;
1019 let token = prev_path
1020 .challenge
1021 .expect("previous path challenge pending without token");
1022 let destination = prev_path.remote;
1023 debug_assert_eq!(
1024 self.highest_space,
1025 SpaceId::Data,
1026 "PATH_CHALLENGE queued without 1-RTT keys"
1027 );
1028 buf.reserve(MIN_INITIAL_SIZE as usize);
1029
1030 let buf_capacity = buf.capacity();
1031
1032 let mut builder = PacketBuilder::new(
1038 now,
1039 SpaceId::Data,
1040 *prev_cid,
1041 buf,
1042 buf_capacity,
1043 0,
1044 false,
1045 self,
1046 )?;
1047 trace!("validating previous path with PATH_CHALLENGE {:08x}", token);
1048 buf.write(frame::FrameType::PATH_CHALLENGE);
1049 buf.write(token);
1050 self.stats.frame_tx.path_challenge += 1;
1051
1052 builder.pad_to(MIN_INITIAL_SIZE);
1057
1058 builder.finish(self, now, buf);
1059 self.stats.udp_tx.on_sent(1, buf.len());
1060
1061 Some(Transmit {
1062 destination,
1063 size: buf.len(),
1064 ecn: None,
1065 segment_size: None,
1066 src_ip: self.local_ip,
1067 })
1068 }
1069
1070 fn space_can_send(&self, space_id: SpaceId, frame_space_1rtt: usize) -> SendableFrames {
1072 if self.spaces[space_id].crypto.is_none()
1073 && (space_id != SpaceId::Data
1074 || self.zero_rtt_crypto.is_none()
1075 || self.side.is_server())
1076 {
1077 return SendableFrames::empty();
1079 }
1080 let mut can_send = self.spaces[space_id].can_send(&self.streams);
1081 if space_id == SpaceId::Data {
1082 can_send.other |= self.can_send_1rtt(frame_space_1rtt);
1083 }
1084 can_send
1085 }
1086
1087 pub fn handle_event(&mut self, event: ConnectionEvent) {
1093 use ConnectionEventInner::*;
1094 match event.0 {
1095 Datagram(DatagramConnectionEvent {
1096 now,
1097 remote,
1098 ecn,
1099 first_decode,
1100 remaining,
1101 }) => {
1102 if remote != self.path.remote && !self.side.remote_may_migrate() {
1106 trace!("discarding packet from unrecognized peer {}", remote);
1107 return;
1108 }
1109
1110 let was_anti_amplification_blocked = self.path.anti_amplification_blocked(1);
1111
1112 self.stats.udp_rx.datagrams += 1;
1113 self.stats.udp_rx.bytes += first_decode.len() as u64;
1114 let data_len = first_decode.len();
1115
1116 self.handle_decode(now, remote, ecn, first_decode);
1117 self.path.total_recvd = self.path.total_recvd.saturating_add(data_len as u64);
1122
1123 if let Some(data) = remaining {
1124 self.stats.udp_rx.bytes += data.len() as u64;
1125 self.handle_coalesced(now, remote, ecn, data);
1126 }
1127
1128 self.config.qlog_sink.emit_recovery_metrics(
1129 self.pto_count,
1130 &mut self.path,
1131 now,
1132 self.orig_rem_cid,
1133 );
1134
1135 if was_anti_amplification_blocked {
1136 self.set_loss_detection_timer(now);
1140 }
1141 }
1142 NewIdentifiers(ids, now) => {
1143 self.local_cid_state.new_cids(&ids, now);
1144 ids.into_iter().rev().for_each(|frame| {
1145 self.spaces[SpaceId::Data].pending.new_cids.push(frame);
1146 });
1147 if self
1149 .timers
1150 .get(Timer::PushNewCid)
1151 .map_or(true, |x| x <= now)
1152 {
1153 self.reset_cid_retirement();
1154 }
1155 }
1156 }
1157 }
1158
1159 pub fn handle_timeout(&mut self, now: Instant) {
1169 for &timer in &Timer::VALUES {
1170 if !self.timers.is_expired(timer, now) {
1171 continue;
1172 }
1173 self.timers.stop(timer);
1174 trace!(timer = ?timer, "timeout");
1175 match timer {
1176 Timer::Close => {
1177 self.state = State::Drained;
1178 self.endpoint_events.push_back(EndpointEventInner::Drained);
1179 }
1180 Timer::Idle => {
1181 self.kill(ConnectionError::TimedOut);
1182 }
1183 Timer::KeepAlive => {
1184 trace!("sending keep-alive");
1185 self.ping();
1186 }
1187 Timer::LossDetection => {
1188 self.on_loss_detection_timeout(now);
1189
1190 self.config.qlog_sink.emit_recovery_metrics(
1191 self.pto_count,
1192 &mut self.path,
1193 now,
1194 self.orig_rem_cid,
1195 );
1196 }
1197 Timer::KeyDiscard => {
1198 self.zero_rtt_crypto = None;
1199 self.prev_crypto = None;
1200 }
1201 Timer::PathValidation => {
1202 debug!("path validation failed");
1203 if let Some((_, prev)) = self.prev_path.take() {
1204 self.path = prev;
1205 }
1206 self.path.challenge = None;
1207 self.path.challenge_pending = false;
1208 }
1209 Timer::Pacing => trace!("pacing timer expired"),
1210 Timer::PushNewCid => {
1211 let num_new_cid = self.local_cid_state.on_cid_timeout().into();
1213 if !self.state.is_closed() {
1214 trace!(
1215 "push a new cid to peer RETIRE_PRIOR_TO field {}",
1216 self.local_cid_state.retire_prior_to()
1217 );
1218 self.endpoint_events
1219 .push_back(EndpointEventInner::NeedIdentifiers(now, num_new_cid));
1220 }
1221 }
1222 Timer::MaxAckDelay => {
1223 trace!("max ack delay reached");
1224 self.spaces[SpaceId::Data]
1226 .pending_acks
1227 .on_max_ack_delay_timeout()
1228 }
1229 }
1230 }
1231 }
1232
1233 pub fn close(&mut self, now: Instant, error_code: VarInt, reason: Bytes) {
1245 self.close_inner(
1246 now,
1247 Close::Application(frame::ApplicationClose { error_code, reason }),
1248 )
1249 }
1250
1251 fn close_inner(&mut self, now: Instant, reason: Close) {
1252 let was_closed = self.state.is_closed();
1253 if !was_closed {
1254 self.close_common();
1255 self.set_close_timer(now);
1256 self.close = true;
1257 self.state = State::Closed(state::Closed { reason });
1258 }
1259 }
1260
1261 pub fn datagrams(&mut self) -> Datagrams<'_> {
1263 Datagrams { conn: self }
1264 }
1265
1266 pub fn stats(&self) -> ConnectionStats {
1268 let mut stats = self.stats;
1269 stats.path.rtt = self.path.rtt.get();
1270 stats.path.cwnd = self.path.congestion.window();
1271 stats.path.current_mtu = self.path.mtud.current_mtu();
1272
1273 stats
1274 }
1275
1276 pub fn ping(&mut self) {
1280 self.spaces[self.highest_space].ping_pending = true;
1281 }
1282
1283 pub fn force_key_update(&mut self) {
1287 if !self.state.is_established() {
1288 debug!("ignoring forced key update in illegal state");
1289 return;
1290 }
1291 if self.prev_crypto.is_some() {
1292 debug!("ignoring redundant forced key update");
1295 return;
1296 }
1297 self.update_keys(None, false);
1298 }
1299
1300 #[doc(hidden)]
1302 #[deprecated]
1303 pub fn initiate_key_update(&mut self) {
1304 self.force_key_update();
1305 }
1306
1307 pub fn crypto_session(&self) -> &dyn crypto::Session {
1309 &*self.crypto
1310 }
1311
1312 pub fn is_handshaking(&self) -> bool {
1317 self.state.is_handshake()
1318 }
1319
1320 pub fn is_closed(&self) -> bool {
1328 self.state.is_closed()
1329 }
1330
1331 pub fn is_drained(&self) -> bool {
1336 self.state.is_drained()
1337 }
1338
1339 pub fn accepted_0rtt(&self) -> bool {
1343 self.accepted_0rtt
1344 }
1345
1346 pub fn has_0rtt(&self) -> bool {
1348 self.zero_rtt_enabled
1349 }
1350
1351 pub fn has_pending_retransmits(&self) -> bool {
1353 !self.spaces[SpaceId::Data].pending.is_empty(&self.streams)
1354 }
1355
1356 pub fn side(&self) -> Side {
1358 self.side.side()
1359 }
1360
1361 pub fn remote_address(&self) -> SocketAddr {
1363 self.path.remote
1364 }
1365
1366 pub fn local_ip(&self) -> Option<IpAddr> {
1376 self.local_ip
1377 }
1378
1379 pub fn rtt(&self) -> Duration {
1381 self.path.rtt.get()
1382 }
1383
1384 pub fn congestion_state(&self) -> &dyn Controller {
1386 self.path.congestion.as_ref()
1387 }
1388
1389 pub fn path_changed(&mut self, now: Instant) {
1400 self.path.reset(now, &self.config);
1401 }
1402
1403 pub fn set_max_concurrent_streams(&mut self, dir: Dir, count: VarInt) {
1408 self.streams.set_max_concurrent(dir, count);
1409 let pending = &mut self.spaces[SpaceId::Data].pending;
1412 self.streams.queue_max_stream_id(pending);
1413 }
1414
1415 pub fn max_concurrent_streams(&self, dir: Dir) -> u64 {
1421 self.streams.max_concurrent(dir)
1422 }
1423
1424 pub fn set_send_window(&mut self, send_window: u64) {
1426 self.streams.set_send_window(send_window);
1427 }
1428
1429 pub fn set_receive_window(&mut self, receive_window: VarInt) {
1431 if self.streams.set_receive_window(receive_window) {
1432 self.spaces[SpaceId::Data].pending.max_data = true;
1433 }
1434 }
1435
    /// Processes an ACK frame received in packet number space `space`.
    ///
    /// Steps, in order: validate the frame, update the largest acknowledged packet, collect and
    /// process the newly acknowledged packets (MTU discovery, ACK frequency, congestion
    /// control, streams), take an RTT sample when appropriate, run loss detection, process ECN
    /// feedback, and finally re-arm the loss detection timer.
    ///
    /// # Errors
    ///
    /// Returns `PROTOCOL_VIOLATION` if the frame acknowledges a packet number that has not been
    /// sent yet, and propagates any error from the packet number filter's ACK check.
    fn on_ack_received(
        &mut self,
        now: Instant,
        space: SpaceId,
        ack: frame::Ack,
    ) -> Result<(), TransportError> {
        if ack.largest >= self.spaces[space].next_packet_number {
            return Err(TransportError::PROTOCOL_VIOLATION("unsent packet acked"));
        }
        // Record whether this ACK advances the largest acknowledged packet number.
        let new_largest = {
            let space = &mut self.spaces[space];
            if space
                .largest_acked_packet
                .map_or(true, |pn| ack.largest > pn)
            {
                space.largest_acked_packet = Some(ack.largest);
                // The send time is only updated if the packet is still tracked.
                if let Some(info) = space.sent_packets.get(&ack.largest) {
                    space.largest_acked_packet_sent = info.time_sent;
                }
                true
            } else {
                false
            }
        };

        // Collect the acknowledged packet numbers that are still tracked as sent. They are
        // gathered first because processing them below mutates `sent_packets`.
        let mut newly_acked = ArrayRangeSet::new();
        for range in ack.iter() {
            self.packet_number_filter.check_ack(space, range.clone())?;
            for (&pn, _) in self.spaces[space].sent_packets.range(range) {
                newly_acked.insert_one(pn);
            }
        }

        if newly_acked.is_empty() {
            return Ok(());
        }

        let mut ack_eliciting_acked = false;
        for packet in newly_acked.elts() {
            if let Some(info) = self.spaces[space].take(packet) {
                // The peer has seen the ACKs carried by this packet, so ranges below the
                // largest one it acknowledged no longer need to be reported.
                if let Some(acked) = info.largest_acked {
                    self.spaces[space].pending_acks.subtract_below(acked);
                }
                ack_eliciting_acked |= info.ack_eliciting;

                // Let MTU discovery know; a changed MTU is forwarded to congestion control.
                let mtu_updated = self.path.mtud.on_acked(space, packet, info.size);
                if mtu_updated {
                    self.path
                        .congestion
                        .on_mtu_update(self.path.mtud.current_mtu());
                }

                self.ack_frequency.on_acked(packet);

                self.on_packet_acked(now, info);
            }
        }

        self.path.congestion.on_end_acks(
            now,
            self.path.in_flight.bytes,
            self.app_limited,
            self.spaces[space].largest_acked_packet,
        );

        // An RTT sample is only taken when the largest acknowledged packet advanced and at
        // least one newly acknowledged packet was ack-eliciting.
        if new_largest && ack_eliciting_acked {
            // The reported ACK delay is only used in the Data space, where it is scaled by the
            // peer's `ack_delay_exponent` and capped at the peer's maximum ACK delay.
            let ack_delay = if space != SpaceId::Data {
                Duration::from_micros(0)
            } else {
                cmp::min(
                    self.ack_frequency.peer_max_ack_delay,
                    Duration::from_micros(ack.delay << self.peer_params.ack_delay_exponent.0),
                )
            };
            let rtt = instant_saturating_sub(now, self.spaces[space].largest_acked_packet_sent);
            self.path.rtt.update(ack_delay, rtt);
            if self.path.first_packet_after_rtt_sample.is_none() {
                self.path.first_packet_after_rtt_sample =
                    Some((space, self.spaces[space].next_packet_number));
            }
        }

        self.detect_lost_packets(now, space, true);

        if self.peer_completed_address_validation() {
            self.pto_count = 0;
        }

        if self.path.sending_ecn {
            if let Some(ecn) = ack.ecn {
                // ECN counts are only processed when the largest acknowledged packet advanced.
                if new_largest {
                    let sent = self.spaces[space].largest_acked_packet_sent;
                    self.process_ecn(now, space, newly_acked.len() as u64, ecn, sent);
                }
            } else {
                // The peer did not echo ECN counts although we mark our packets: stop marking.
                debug!("ECN not acknowledged by peer");
                self.path.sending_ecn = false;
            }
        }

        self.set_loss_detection_timer(now);
        Ok(())
    }
1557
1558 fn process_ecn(
1560 &mut self,
1561 now: Instant,
1562 space: SpaceId,
1563 newly_acked: u64,
1564 ecn: frame::EcnCounts,
1565 largest_sent_time: Instant,
1566 ) {
1567 match self.spaces[space].detect_ecn(newly_acked, ecn) {
1568 Err(e) => {
1569 debug!("halting ECN due to verification failure: {}", e);
1570 self.path.sending_ecn = false;
1571 self.spaces[space].ecn_feedback = frame::EcnCounts::ZERO;
1574 }
1575 Ok(false) => {}
1576 Ok(true) => {
1577 self.stats.path.congestion_events += 1;
1578 self.path
1579 .congestion
1580 .on_congestion_event(now, largest_sent_time, false, 0);
1581 }
1582 }
1583 }
1584
1585 fn on_packet_acked(&mut self, now: Instant, info: SentPacket) {
1588 self.remove_in_flight(&info);
1589 if info.ack_eliciting && self.path.challenge.is_none() {
1590 self.path.congestion.on_ack(
1593 now,
1594 info.time_sent,
1595 info.size.into(),
1596 self.app_limited,
1597 &self.path.rtt,
1598 );
1599 }
1600
1601 if let Some(retransmits) = info.retransmits.get() {
1603 for (id, _) in retransmits.reset_stream.iter() {
1604 self.streams.reset_acked(*id);
1605 }
1606 }
1607
1608 for frame in info.stream_frames {
1609 self.streams.received_ack_of(frame);
1610 }
1611 }
1612
1613 fn set_key_discard_timer(&mut self, now: Instant, space: SpaceId) {
1614 let start = if self.zero_rtt_crypto.is_some() {
1615 now
1616 } else {
1617 self.prev_crypto
1618 .as_ref()
1619 .expect("no previous keys")
1620 .end_packet
1621 .as_ref()
1622 .expect("update not acknowledged yet")
1623 .1
1624 };
1625 self.timers
1626 .set(Timer::KeyDiscard, start + self.pto(space) * 3);
1627 }
1628
1629 fn on_loss_detection_timeout(&mut self, now: Instant) {
1630 if let Some((_, pn_space)) = self.loss_time_and_space() {
1631 self.detect_lost_packets(now, pn_space, false);
1633 self.set_loss_detection_timer(now);
1634 return;
1635 }
1636
1637 let (_, space) = match self.pto_time_and_space(now) {
1638 Some(x) => x,
1639 None => {
1640 error!("PTO expired while unset");
1641 return;
1642 }
1643 };
1644 trace!(
1645 in_flight = self.path.in_flight.bytes,
1646 count = self.pto_count,
1647 ?space,
1648 "PTO fired"
1649 );
1650
1651 let count = match self.path.in_flight.ack_eliciting {
1652 0 => {
1655 debug_assert!(!self.peer_completed_address_validation());
1656 1
1657 }
1658 _ => 2,
1660 };
1661 self.spaces[space].loss_probes = self.spaces[space].loss_probes.saturating_add(count);
1662 self.pto_count = self.pto_count.saturating_add(1);
1663 self.set_loss_detection_timer(now);
1664 }
1665
/// Declares packets in `pn_space` lost and reacts to the losses.
///
/// Every still-tracked packet numbered below the space's largest acknowledged packet is
/// examined. A packet is considered lost when it was sent at or before `now - loss_delay`, or
/// when the largest acknowledged packet number is at least `packet_threshold` ahead of it.
/// Packets that are not yet lost contribute to the space's `loss_time`, the earliest instant at
/// which one of them will cross the time threshold.
///
/// Lost packets are removed from the space, their stream frames and retransmittable state are
/// queued again, statistics are updated, and the congestion controller is notified if the
/// number of bytes in flight changed. A lost in-flight MTU probe is handled separately and
/// does not trigger a congestion event.
///
/// `due_to_ack` gates the persistent-congestion bookkeeping: it is only performed when the
/// flag is `true`.
///
/// # Panics
///
/// Panics if the space has no largest acknowledged packet, or if `now - loss_delay` cannot be
/// represented as an `Instant`.
fn detect_lost_packets(&mut self, now: Instant, pn_space: SpaceId, due_to_ack: bool) {
    let mut lost_packets = Vec::<u64>::new();
    let mut lost_mtu_probe = None;
    let in_flight_mtu_probe = self.path.mtud.in_flight_mtu_probe();
    let rtt = self.path.rtt.conservative();
    // Time threshold: a configurable multiple of the RTT, never below the timer granularity.
    let loss_delay = cmp::max(rtt.mul_f32(self.config.time_threshold), TIMER_GRANULARITY);

    // Packets sent at or before this instant are deemed lost.
    let lost_send_time = now.checked_sub(loss_delay).unwrap();
    let largest_acked_packet = self.spaces[pn_space].largest_acked_packet.unwrap();
    let packet_threshold = self.config.packet_threshold as u64;
    let mut size_of_lost_packets = 0u64;

    // Persistent congestion is declared when two lost ack-eliciting packets, with no
    // acknowledged or not-yet-lost packet between them, were sent more than this far apart.
    // The period is always computed from the Data-space PTO, whatever `pn_space` is.
    let congestion_period =
        self.pto(SpaceId::Data) * self.config.persistent_congestion_threshold;
    let mut persistent_congestion_start: Option<Instant> = None;
    let mut prev_packet = None;
    let mut in_persistent_congestion = false;

    let space = &mut self.spaces[pn_space];
    // Recomputed from scratch by the loop below.
    space.loss_time = None;

    for (&packet, info) in space.sent_packets.range(0..largest_acked_packet) {
        if prev_packet != Some(packet.wrapping_sub(1)) {
            // There is a gap in the tracked packet numbers, so the run of consecutive lost
            // packets is broken.
            persistent_congestion_start = None;
        }

        if info.time_sent <= lost_send_time || largest_acked_packet >= packet + packet_threshold
        {
            if Some(packet) == in_flight_mtu_probe {
                // The MTU probe is kept out of `lost_packets` and handled at the end of the
                // function.
                lost_mtu_probe = in_flight_mtu_probe;
            } else {
                lost_packets.push(packet);
                size_of_lost_packets += info.size as u64;
                if info.ack_eliciting && due_to_ack {
                    match persistent_congestion_start {
                        // This loss is more than `congestion_period` after the start of the
                        // current run of losses.
                        Some(start) if info.time_sent - start > congestion_period => {
                            in_persistent_congestion = true;
                        }
                        // A run may only start at a packet ordered after
                        // `first_packet_after_rtt_sample`.
                        None if self
                            .path
                            .first_packet_after_rtt_sample
                            .is_some_and(|x| x < (pn_space, packet)) =>
                        {
                            persistent_congestion_start = Some(info.time_sent);
                        }
                        _ => {}
                    }
                }
            }
        } else {
            // Not lost yet: remember the earliest instant at which a packet will cross the
            // time threshold.
            let next_loss_time = info.time_sent + loss_delay;
            space.loss_time = Some(
                space
                    .loss_time
                    .map_or(next_loss_time, |x| cmp::min(x, next_loss_time)),
            );
            persistent_congestion_start = None;
        }

        prev_packet = Some(packet);
    }

    // React to the lost packets, if any. `lost_packets` is in ascending packet-number order,
    // so its last element is the largest lost packet.
    if let Some(largest_lost) = lost_packets.last().cloned() {
        let old_bytes_in_flight = self.path.in_flight.bytes;
        let largest_lost_sent = self.spaces[pn_space].sent_packets[&largest_lost].time_sent;
        self.stats.path.lost_packets += lost_packets.len() as u64;
        self.stats.path.lost_bytes += size_of_lost_packets;
        trace!(
            "packets lost: {:?}, bytes lost: {}",
            lost_packets, size_of_lost_packets
        );

        for &packet in &lost_packets {
            // Every entry of `lost_packets` was just read from `sent_packets`, so `take` is
            // expected to find it.
            let info = self.spaces[pn_space].take(packet).unwrap();
            self.config.qlog_sink.emit_packet_lost(
                packet,
                &info,
                lost_send_time,
                pn_space,
                now,
                self.orig_rem_cid,
            );
            self.remove_in_flight(&info);
            // Queue everything the packet carried for retransmission.
            for frame in info.stream_frames {
                self.streams.retransmit(frame);
            }
            self.spaces[pn_space].pending |= info.retransmits;
            self.path.mtud.on_non_probe_lost(packet, info.size);
        }

        if self.path.mtud.black_hole_detected(now) {
            self.stats.path.black_holes_detected += 1;
            // Tell the congestion controller about the MTU now reported by MTU discovery and
            // drop queued datagrams that exceed the current maximum datagram size.
            self.path
                .congestion
                .on_mtu_update(self.path.mtud.current_mtu());
            if let Some(max_datagram_size) = self.datagrams().max_size() {
                self.datagrams.drop_oversized(max_datagram_size);
            }
        }

        // Only notify the congestion controller if removing the lost packets changed the number
        // of bytes in flight.
        let lost_ack_eliciting = old_bytes_in_flight != self.path.in_flight.bytes;

        if lost_ack_eliciting {
            self.stats.path.congestion_events += 1;
            self.path.congestion.on_congestion_event(
                now,
                largest_lost_sent,
                in_persistent_congestion,
                size_of_lost_packets,
            );
        }
    }

    // Handle a lost MTU probe. It was never added to `lost_packets`, so it has not been taken
    // out of the space yet. Note that it is always looked up in the Data space.
    if let Some(packet) = lost_mtu_probe {
        let info = self.spaces[SpaceId::Data].take(packet).unwrap();
        self.remove_in_flight(&info);
        self.path.mtud.on_probe_lost();
        self.stats.path.lost_plpmtud_probes += 1;
    }
}
1799
1800 fn loss_time_and_space(&self) -> Option<(Instant, SpaceId)> {
1801 SpaceId::iter()
1802 .filter_map(|id| Some((self.spaces[id].loss_time?, id)))
1803 .min_by_key(|&(time, _)| time)
1804 }
1805
1806 fn pto_time_and_space(&self, now: Instant) -> Option<(Instant, SpaceId)> {
1807 let backoff = 2u32.pow(self.pto_count.min(MAX_BACKOFF_EXPONENT));
1808 let mut duration = self.path.rtt.pto_base() * backoff;
1809
1810 if self.path.in_flight.ack_eliciting == 0 {
1811 debug_assert!(!self.peer_completed_address_validation());
1812 let space = match self.highest_space {
1813 SpaceId::Handshake => SpaceId::Handshake,
1814 _ => SpaceId::Initial,
1815 };
1816 return Some((now + duration, space));
1817 }
1818
1819 let mut result = None;
1820 for space in SpaceId::iter() {
1821 if !self.spaces[space].has_in_flight() {
1822 continue;
1823 }
1824 if space == SpaceId::Data {
1825 if self.is_handshaking() {
1827 return result;
1828 }
1829 duration += self.ack_frequency.max_ack_delay_for_pto() * backoff;
1831 }
1832 let last_ack_eliciting = match self.spaces[space].time_of_last_ack_eliciting_packet {
1833 Some(time) => time,
1834 None => continue,
1835 };
1836 let pto = last_ack_eliciting + duration;
1837 if result.map_or(true, |(earliest_pto, _)| pto < earliest_pto) {
1838 result = Some((pto, space));
1839 }
1840 }
1841 result
1842 }
1843
1844 fn peer_completed_address_validation(&self) -> bool {
1845 if self.side.is_server() || self.state.is_closed() {
1846 return true;
1847 }
1848 self.spaces[SpaceId::Handshake]
1851 .largest_acked_packet
1852 .is_some()
1853 || self.spaces[SpaceId::Data].largest_acked_packet.is_some()
1854 || (self.spaces[SpaceId::Data].crypto.is_some()
1855 && self.spaces[SpaceId::Handshake].crypto.is_none())
1856 }
1857
1858 fn set_loss_detection_timer(&mut self, now: Instant) {
1859 if self.state.is_closed() {
1860 return;
1864 }
1865
1866 if let Some((loss_time, _)) = self.loss_time_and_space() {
1867 self.timers.set(Timer::LossDetection, loss_time);
1869 return;
1870 }
1871
1872 if self.path.anti_amplification_blocked(1) {
1873 self.timers.stop(Timer::LossDetection);
1875 return;
1876 }
1877
1878 if self.path.in_flight.ack_eliciting == 0 && self.peer_completed_address_validation() {
1879 self.timers.stop(Timer::LossDetection);
1882 return;
1883 }
1884
1885 if let Some((timeout, _)) = self.pto_time_and_space(now) {
1888 self.timers.set(Timer::LossDetection, timeout);
1889 } else {
1890 self.timers.stop(Timer::LossDetection);
1891 }
1892 }
1893
1894 fn pto(&self, space: SpaceId) -> Duration {
1896 let max_ack_delay = match space {
1897 SpaceId::Initial | SpaceId::Handshake => Duration::ZERO,
1898 SpaceId::Data => self.ack_frequency.max_ack_delay_for_pto(),
1899 };
1900 self.path.rtt.pto_base() + max_ack_delay
1901 }
1902
1903 fn on_packet_authenticated(
1904 &mut self,
1905 now: Instant,
1906 space_id: SpaceId,
1907 ecn: Option<EcnCodepoint>,
1908 packet: Option<u64>,
1909 spin: bool,
1910 is_1rtt: bool,
1911 ) {
1912 self.total_authed_packets += 1;
1913 self.reset_keep_alive(now);
1914 self.reset_idle_timeout(now, space_id);
1915 self.permit_idle_reset = true;
1916 self.receiving_ecn |= ecn.is_some();
1917 if let Some(x) = ecn {
1918 let space = &mut self.spaces[space_id];
1919 space.ecn_counters += x;
1920
1921 if x.is_ce() {
1922 space.pending_acks.set_immediate_ack_required();
1923 }
1924 }
1925
1926 let packet = match packet {
1927 Some(x) => x,
1928 None => return,
1929 };
1930 if self.side.is_server() {
1931 if self.spaces[SpaceId::Initial].crypto.is_some() && space_id == SpaceId::Handshake {
1932 self.discard_space(now, SpaceId::Initial);
1934 }
1935 if self.zero_rtt_crypto.is_some() && is_1rtt {
1936 self.set_key_discard_timer(now, space_id)
1938 }
1939 }
1940 let space = &mut self.spaces[space_id];
1941 space.pending_acks.insert_one(packet, now);
1942 if packet >= space.rx_packet {
1943 space.rx_packet = packet;
1944 self.spin = self.side.is_client() ^ spin;
1946 }
1947
1948 self.config.qlog_sink.emit_packet_received(
1949 packet,
1950 space_id,
1951 !is_1rtt,
1952 now,
1953 self.orig_rem_cid,
1954 );
1955 }
1956
1957 fn reset_idle_timeout(&mut self, now: Instant, space: SpaceId) {
1958 let timeout = match self.idle_timeout {
1959 None => return,
1960 Some(dur) => dur,
1961 };
1962 if self.state.is_closed() {
1963 self.timers.stop(Timer::Idle);
1964 return;
1965 }
1966 let dt = cmp::max(timeout, 3 * self.pto(space));
1967 self.timers.set(Timer::Idle, now + dt);
1968 }
1969
1970 fn reset_keep_alive(&mut self, now: Instant) {
1971 let interval = match self.config.keep_alive_interval {
1972 Some(x) if self.state.is_established() => x,
1973 _ => return,
1974 };
1975 self.timers.set(Timer::KeepAlive, now + interval);
1976 }
1977
1978 fn reset_cid_retirement(&mut self) {
1979 if let Some(t) = self.local_cid_state.next_timeout() {
1980 self.timers.set(Timer::PushNewCid, t);
1981 }
1982 }
1983
1984 pub(crate) fn handle_first_packet(
1989 &mut self,
1990 now: Instant,
1991 remote: SocketAddr,
1992 ecn: Option<EcnCodepoint>,
1993 packet_number: u64,
1994 packet: InitialPacket,
1995 remaining: Option<BytesMut>,
1996 ) -> Result<(), ConnectionError> {
1997 let span = trace_span!("first recv");
1998 let _guard = span.enter();
1999 debug_assert!(self.side.is_server());
2000 let len = packet.header_data.len() + packet.payload.len();
2001 self.path.total_recvd = len as u64;
2002
2003 match self.state {
2004 State::Handshake(ref mut state) => {
2005 state.expected_token = packet.header.token.clone();
2006 }
2007 _ => unreachable!("first packet must be delivered in Handshake state"),
2008 }
2009
2010 self.on_packet_authenticated(
2011 now,
2012 SpaceId::Initial,
2013 ecn,
2014 Some(packet_number),
2015 false,
2016 false,
2017 );
2018
2019 self.process_decrypted_packet(now, remote, Some(packet_number), packet.into())?;
2020 if let Some(data) = remaining {
2021 self.handle_coalesced(now, remote, ecn, data);
2022 }
2023
2024 self.config.qlog_sink.emit_recovery_metrics(
2025 self.pto_count,
2026 &mut self.path,
2027 now,
2028 self.orig_rem_cid,
2029 );
2030
2031 Ok(())
2032 }
2033
2034 fn init_0rtt(&mut self) {
2035 let (header, packet) = match self.crypto.early_crypto() {
2036 Some(x) => x,
2037 None => return,
2038 };
2039 if self.side.is_client() {
2040 match self.crypto.transport_parameters() {
2041 Ok(params) => {
2042 let params = params
2043 .expect("crypto layer didn't supply transport parameters with ticket");
2044 let params = TransportParameters {
2046 initial_src_cid: None,
2047 original_dst_cid: None,
2048 preferred_address: None,
2049 retry_src_cid: None,
2050 stateless_reset_token: None,
2051 min_ack_delay: None,
2052 ack_delay_exponent: TransportParameters::default().ack_delay_exponent,
2053 max_ack_delay: TransportParameters::default().max_ack_delay,
2054 ..params
2055 };
2056 self.set_peer_params(params);
2057 }
2058 Err(e) => {
2059 error!("session ticket has malformed transport parameters: {}", e);
2060 return;
2061 }
2062 }
2063 }
2064 trace!("0-RTT enabled");
2065 self.zero_rtt_enabled = true;
2066 self.zero_rtt_crypto = Some(ZeroRttCrypto { header, packet });
2067 }
2068
2069 fn read_crypto(
2070 &mut self,
2071 space: SpaceId,
2072 crypto: &frame::Crypto,
2073 payload_len: usize,
2074 ) -> Result<(), TransportError> {
2075 let expected = if !self.state.is_handshake() {
2076 SpaceId::Data
2077 } else if self.highest_space == SpaceId::Initial {
2078 SpaceId::Initial
2079 } else {
2080 SpaceId::Handshake
2083 };
2084 debug_assert!(space <= expected, "received out-of-order CRYPTO data");
2088
2089 let end = crypto.offset + crypto.data.len() as u64;
2090 if space < expected && end > self.spaces[space].crypto_stream.bytes_read() {
2091 warn!(
2092 "received new {:?} CRYPTO data when expecting {:?}",
2093 space, expected
2094 );
2095 return Err(TransportError::PROTOCOL_VIOLATION(
2096 "new data at unexpected encryption level",
2097 ));
2098 }
2099
2100 let space = &mut self.spaces[space];
2101 let max = end.saturating_sub(space.crypto_stream.bytes_read());
2102 if max > self.config.crypto_buffer_size as u64 {
2103 return Err(TransportError::CRYPTO_BUFFER_EXCEEDED(""));
2104 }
2105
2106 space
2107 .crypto_stream
2108 .insert(crypto.offset, crypto.data.clone(), payload_len);
2109 while let Some(chunk) = space.crypto_stream.read(usize::MAX, true) {
2110 trace!("consumed {} CRYPTO bytes", chunk.bytes.len());
2111 if self.crypto.read_handshake(&chunk.bytes)? {
2112 self.events.push_back(Event::HandshakeDataReady);
2113 }
2114 }
2115
2116 Ok(())
2117 }
2118
2119 fn write_crypto(&mut self) {
2120 loop {
2121 let space = self.highest_space;
2122 let mut outgoing = Vec::new();
2123 if let Some(crypto) = self.crypto.write_handshake(&mut outgoing) {
2124 match space {
2125 SpaceId::Initial => {
2126 self.upgrade_crypto(SpaceId::Handshake, crypto);
2127 }
2128 SpaceId::Handshake => {
2129 self.upgrade_crypto(SpaceId::Data, crypto);
2130 }
2131 _ => unreachable!("got updated secrets during 1-RTT"),
2132 }
2133 }
2134 if outgoing.is_empty() {
2135 if space == self.highest_space {
2136 break;
2137 } else {
2138 continue;
2140 }
2141 }
2142 let offset = self.spaces[space].crypto_offset;
2143 let outgoing = Bytes::from(outgoing);
2144 if let State::Handshake(ref mut state) = self.state {
2145 if space == SpaceId::Initial && offset == 0 && self.side.is_client() {
2146 state.client_hello = Some(outgoing.clone());
2147 }
2148 }
2149 self.spaces[space].crypto_offset += outgoing.len() as u64;
2150 trace!("wrote {} {:?} CRYPTO bytes", outgoing.len(), space);
2151 self.spaces[space].pending.crypto.push_back(frame::Crypto {
2152 offset,
2153 data: outgoing,
2154 });
2155 }
2156 }
2157
2158 fn upgrade_crypto(&mut self, space: SpaceId, crypto: Keys) {
2160 debug_assert!(
2161 self.spaces[space].crypto.is_none(),
2162 "already reached packet space {space:?}"
2163 );
2164 trace!("{:?} keys ready", space);
2165 if space == SpaceId::Data {
2166 self.next_crypto = Some(
2168 self.crypto
2169 .next_1rtt_keys()
2170 .expect("handshake should be complete"),
2171 );
2172 }
2173
2174 self.spaces[space].crypto = Some(crypto);
2175 debug_assert!(space as usize > self.highest_space as usize);
2176 self.highest_space = space;
2177 if space == SpaceId::Data && self.side.is_client() {
2178 self.zero_rtt_crypto = None;
2180 }
2181 }
2182
2183 fn discard_space(&mut self, now: Instant, space_id: SpaceId) {
2184 debug_assert!(space_id != SpaceId::Data);
2185 trace!("discarding {:?} keys", space_id);
2186 if space_id == SpaceId::Initial {
2187 if let ConnectionSide::Client { token, .. } = &mut self.side {
2189 *token = Bytes::new();
2190 }
2191 }
2192 let space = &mut self.spaces[space_id];
2193 space.crypto = None;
2194 space.time_of_last_ack_eliciting_packet = None;
2195 space.loss_time = None;
2196 let sent_packets = mem::take(&mut space.sent_packets);
2197 for packet in sent_packets.into_values() {
2198 self.remove_in_flight(&packet);
2199 }
2200 self.set_loss_detection_timer(now)
2201 }
2202
2203 fn handle_coalesced(
2204 &mut self,
2205 now: Instant,
2206 remote: SocketAddr,
2207 ecn: Option<EcnCodepoint>,
2208 data: BytesMut,
2209 ) {
2210 self.path.total_recvd = self.path.total_recvd.saturating_add(data.len() as u64);
2211 let mut remaining = Some(data);
2212 while let Some(data) = remaining {
2213 match PartialDecode::new(
2214 data,
2215 &FixedLengthConnectionIdParser::new(self.local_cid_state.cid_len()),
2216 &[self.version],
2217 self.endpoint_config.grease_quic_bit,
2218 ) {
2219 Ok((partial_decode, rest)) => {
2220 remaining = rest;
2221 self.handle_decode(now, remote, ecn, partial_decode);
2222 }
2223 Err(e) => {
2224 trace!("malformed header: {}", e);
2225 return;
2226 }
2227 }
2228 }
2229 }
2230
2231 fn handle_decode(
2232 &mut self,
2233 now: Instant,
2234 remote: SocketAddr,
2235 ecn: Option<EcnCodepoint>,
2236 partial_decode: PartialDecode,
2237 ) {
2238 if let Some(decoded) = packet_crypto::unprotect_header(
2239 partial_decode,
2240 &self.spaces,
2241 self.zero_rtt_crypto.as_ref(),
2242 self.peer_params.stateless_reset_token,
2243 ) {
2244 self.handle_packet(now, remote, ecn, decoded.packet, decoded.stateless_reset);
2245 }
2246 }
2247
/// Decrypts and processes one received packet, then applies any resulting state transition.
///
/// `packet` is `None` when no packet could be recovered from the datagram; together with
/// `stateless_reset == false` this is counted as an authentication failure. `stateless_reset`
/// takes priority over every other outcome and resets the connection.
///
/// Errors raised while processing the packet are not returned: they are stored in `self.error`
/// and mapped to a new connection state (closed, draining or drained). Entering the closed or
/// drained state triggers the corresponding shutdown bookkeeping.
fn handle_packet(
    &mut self,
    now: Instant,
    remote: SocketAddr,
    ecn: Option<EcnCodepoint>,
    packet: Option<Packet>,
    stateless_reset: bool,
) {
    self.stats.udp_rx.ios += 1;
    if let Some(ref packet) = packet {
        trace!(
            "got {:?} packet ({} bytes) from {} using id {}",
            packet.header.space(),
            packet.payload.len() + packet.header_data.len(),
            remote,
            packet.header.dst_cid(),
        );
    }

    // While handshaking, only packets from the current path's remote address are accepted.
    if self.is_handshaking() && remote != self.path.remote {
        debug!("discarding packet with unexpected remote during handshake");
        return;
    }

    // Remember the state on entry so that transitions can be detected at the end.
    let was_closed = self.state.is_closed();
    let was_drained = self.state.is_drained();

    // `Err(None)` means the packet could not be authenticated (or there was no packet at all);
    // `Err(Some(_))` carries an error reported by decryption.
    let decrypted = match packet {
        None => Err(None),
        Some(mut packet) => self
            .decrypt_packet(now, &mut packet)
            .map(move |number| (packet, number)),
    };
    let result = match decrypted {
        // A stateless reset wins regardless of the decryption outcome.
        _ if stateless_reset => {
            debug!("got stateless reset");
            Err(ConnectionError::Reset)
        }
        Err(Some(e)) => {
            warn!("illegal packet: {}", e);
            Err(e.into())
        }
        Err(None) => {
            debug!("failed to authenticate packet");
            self.authentication_failures += 1;
            // The limit is taken from the keys of the highest space reached so far.
            let integrity_limit = self.spaces[self.highest_space]
                .crypto
                .as_ref()
                .unwrap()
                .packet
                .local
                .integrity_limit();
            if self.authentication_failures > integrity_limit {
                Err(TransportError::AEAD_LIMIT_REACHED("integrity limit violated").into())
            } else {
                // Below the limit the packet is dropped without any further effect.
                return;
            }
        }
        Ok((packet, number)) => {
            let span = match number {
                Some(pn) => trace_span!("recv", space = ?packet.header.space(), pn),
                None => trace_span!("recv", space = ?packet.header.space()),
            };
            let _guard = span.enter();

            // Records the packet number in the space's dedup state; the return value is used
            // as "this number was seen before".
            let is_duplicate = |n| self.spaces[packet.header.space()].dedup.insert(n);
            if number.is_some_and(is_duplicate) {
                debug!("discarding possible duplicate packet");
                return;
            } else if self.state.is_handshake() && packet.header.is_short() {
                trace!("dropping short packet during handshake");
                return;
            } else {
                if let Header::Initial(InitialHeader { ref token, .. }) = packet.header {
                    if let State::Handshake(ref hs) = self.state {
                        // On the server, an Initial whose token differs from the expected one
                        // is discarded rather than treated as a connection error.
                        if self.side.is_server() && token != &hs.expected_token {
                            warn!("discarding Initial with invalid retry token");
                            return;
                        }
                    }
                }

                // Timers, ECN and ACK state are only updated while the connection is not
                // closed; the packet itself is still processed below.
                if !self.state.is_closed() {
                    let spin = match packet.header {
                        Header::Short { spin, .. } => spin,
                        _ => false,
                    };
                    self.on_packet_authenticated(
                        now,
                        packet.header.space(),
                        ecn,
                        number,
                        spin,
                        packet.header.is_1rtt(),
                    );
                }

                self.process_decrypted_packet(now, remote, number, packet)
            }
        }
    };

    // Map a processing error to the connection state it implies.
    if let Err(conn_err) = result {
        self.error = Some(conn_err.clone());
        self.state = match conn_err {
            ConnectionError::ApplicationClosed(reason) => State::closed(reason),
            ConnectionError::ConnectionClosed(reason) => State::closed(reason),
            // A reset, or exceeding the AEAD integrity limit, goes straight to drained.
            ConnectionError::Reset
            | ConnectionError::TransportError(TransportError {
                code: TransportErrorCode::AEAD_LIMIT_REACHED,
                ..
            }) => State::Drained,
            ConnectionError::TimedOut => {
                unreachable!("timeouts aren't generated by packet processing");
            }
            ConnectionError::TransportError(err) => {
                debug!("closing connection due to transport error: {}", err);
                State::closed(err)
            }
            ConnectionError::VersionMismatch => State::Draining,
            ConnectionError::LocallyClosed => {
                unreachable!("LocallyClosed isn't generated by packet processing");
            }
            ConnectionError::CidsExhausted => {
                unreachable!("CidsExhausted isn't generated by packet processing");
            }
        };
    }

    // Newly closed: run the common close logic and, unless already drained, start the close
    // timer.
    if !was_closed && self.state.is_closed() {
        self.close_common();
        if !self.state.is_drained() {
            self.set_close_timer(now);
        }
    }
    // Newly drained: notify the endpoint and stop the close timer.
    if !was_drained && self.state.is_drained() {
        self.endpoint_events.push_back(EndpointEventInner::Drained);
        self.timers.stop(Timer::Close);
    }

    // In the `Closed` state, `self.close` is set only when this packet arrived from the current
    // path's remote address.
    if let State::Closed(_) = self.state {
        self.close = remote == self.path.remote;
    }
}
2400
/// Processes a decrypted packet according to the current connection state.
///
/// - `Established`: Data-space packets go to `process_payload`, other packets that carry
///   frames go to `process_early_payload`, and anything else is discarded.
/// - `Closed`: frames are only scanned for a close frame, which moves the connection to
///   `Draining`.
/// - `Draining` / `Drained`: the packet is ignored.
/// - `Handshake`: the packet is dispatched on its header type (Retry, Handshake, Initial,
///   0-RTT, Version Negotiation).
///
/// # Errors
///
/// Returns a `ConnectionError` for transport errors raised while processing frames or
/// transport parameters, for a Retry packet received by a server, and
/// `ConnectionError::VersionMismatch` when a Version Negotiation packet does not list our
/// version.
///
/// # Panics
///
/// Panics if `number` is `None` for a Data-space or 0-RTT packet, or if a short-header packet
/// reaches the handshake dispatch.
fn process_decrypted_packet(
    &mut self,
    now: Instant,
    remote: SocketAddr,
    number: Option<u64>,
    packet: Packet,
) -> Result<(), ConnectionError> {
    // Every state other than `Handshake` is fully handled inside this match and returns.
    let state = match self.state {
        State::Established => {
            match packet.header.space() {
                SpaceId::Data => self.process_payload(now, remote, number.unwrap(), packet)?,
                _ if packet.header.has_frames() => self.process_early_payload(now, packet)?,
                _ => {
                    trace!("discarding unexpected pre-handshake packet");
                }
            }
            return Ok(());
        }
        State::Closed(_) => {
            for result in frame::Iter::new(packet.payload.freeze())? {
                // Frames that fail to decode are skipped rather than treated as an error.
                let frame = match result {
                    Ok(frame) => frame,
                    Err(err) => {
                        debug!("frame decoding error: {err:?}");
                        continue;
                    }
                };

                if let Frame::Padding = frame {
                    continue;
                };

                self.stats.frame_rx.record(&frame);

                // The peer closed as well: start draining and ignore the rest of the packet.
                if let Frame::Close(_) = frame {
                    trace!("draining");
                    self.state = State::Draining;
                    break;
                }
            }
            return Ok(());
        }
        State::Draining | State::Drained => return Ok(()),
        State::Handshake(ref mut state) => state,
    };

    match packet.header {
        Header::Retry {
            src_cid: rem_cid, ..
        } => {
            if self.side.is_server() {
                return Err(TransportError::PROTOCOL_VIOLATION("client sent Retry").into());
            }

            // A Retry is ignored if another packet was already authenticated, if its payload is
            // 16 bytes or shorter, or if the crypto layer rejects it.
            // NOTE(review): the 16 bytes are presumably the Retry integrity tag - confirm.
            if self.total_authed_packets > 1
                || packet.payload.len() <= 16
                || !self.crypto.is_valid_retry(
                    &self.rem_cids.active(),
                    &packet.header_data,
                    &packet.payload,
                )
            {
                trace!("discarding invalid Retry");
                return Ok(());
            }

            trace!("retrying with CID {}", rem_cid);
            let client_hello = state.client_hello.take().unwrap();
            self.retry_src_cid = Some(rem_cid);
            self.rem_cids.update_initial_cid(rem_cid);
            self.rem_handshake_cid = rem_cid;

            // If packet number 0 of the Initial space is still tracked, treat it as
            // acknowledged.
            let space = &mut self.spaces[SpaceId::Initial];
            if let Some(info) = space.take(0) {
                self.on_packet_acked(now, info);
            };

            // Drop whatever else is tracked in the Initial space, then rebuild the space with
            // keys derived from the new remote CID. The packet number sequence continues and
            // the crypto offset is placed just past the client hello.
            self.discard_space(now, SpaceId::Initial);
            self.spaces[SpaceId::Initial] = PacketSpace {
                crypto: Some(self.crypto.initial_keys(&rem_cid, self.side.side())),
                next_packet_number: self.spaces[SpaceId::Initial].next_packet_number,
                crypto_offset: client_hello.len() as u64,
                ..PacketSpace::new(now)
            };
            // Queue the stored client hello for sending again from offset 0.
            self.spaces[SpaceId::Initial]
                .pending
                .crypto
                .push_back(frame::Crypto {
                    offset: 0,
                    data: client_hello,
                });

            // Everything sent so far in the Data space is queued for retransmission.
            let zero_rtt = mem::take(&mut self.spaces[SpaceId::Data].sent_packets);
            for info in zero_rtt.into_values() {
                self.remove_in_flight(&info);
                self.spaces[SpaceId::Data].pending |= info.retransmits;
            }
            self.streams.retransmit_all_for_0rtt();

            // The payload minus its final 16 bytes becomes the client's token.
            let token_len = packet.payload.len() - 16;
            let ConnectionSide::Client { ref mut token, .. } = self.side else {
                unreachable!("we already short-circuited if we're server");
            };
            *token = packet.payload.freeze().split_to(token_len);
            // Restart the handshake state from scratch.
            self.state = State::Handshake(state::Handshake {
                expected_token: Bytes::new(),
                rem_cid_set: false,
                client_hello: None,
            });
            Ok(())
        }
        Header::Long {
            ty: LongType::Handshake,
            src_cid: rem_cid,
            ..
        } => {
            if rem_cid != self.rem_handshake_cid {
                debug!(
                    "discarding packet with mismatched remote CID: {} != {}",
                    self.rem_handshake_cid, rem_cid
                );
                return Ok(());
            }
            self.on_path_validated();

            self.process_early_payload(now, packet)?;
            if self.state.is_closed() {
                return Ok(());
            }

            if self.crypto.is_handshaking() {
                trace!("handshake ongoing");
                return Ok(());
            }

            // The crypto handshake has finished; complete connection establishment.
            if self.side.is_client() {
                let params =
                    self.crypto
                        .transport_parameters()?
                        .ok_or_else(|| TransportError {
                            code: TransportErrorCode::crypto(0x6d),
                            frame: None,
                            reason: "transport parameters missing".into(),
                        })?;

                if self.has_0rtt() {
                    if !self.crypto.early_data_accepted().unwrap() {
                        debug_assert!(self.side.is_client());
                        debug!("0-RTT rejected");
                        self.accepted_0rtt = false;
                        self.streams.zero_rtt_rejected();

                        // Drop everything queued for the Data space...
                        self.spaces[SpaceId::Data].pending = Retransmits::default();

                        // ...and everything already sent in it.
                        let sent_packets =
                            mem::take(&mut self.spaces[SpaceId::Data].sent_packets);
                        for packet in sent_packets.into_values() {
                            self.remove_in_flight(&packet);
                        }
                    } else {
                        self.accepted_0rtt = true;
                        // The new parameters are validated against the ones currently stored
                        // as the peer's.
                        params.validate_resumption_from(&self.peer_params)?;
                    }
                }
                if let Some(token) = params.stateless_reset_token {
                    self.endpoint_events
                        .push_back(EndpointEventInner::ResetToken(self.path.remote, token));
                }
                self.handle_peer_params(params)?;
                self.issue_first_cids(now);
            } else {
                // Server: queue HANDSHAKE_DONE and drop the Handshake space.
                self.spaces[SpaceId::Data].pending.handshake_done = true;
                self.discard_space(now, SpaceId::Handshake);
            }

            self.events.push_back(Event::Connected);
            self.state = State::Established;
            trace!("established");
            Ok(())
        }
        Header::Initial(InitialHeader {
            src_cid: rem_cid, ..
        }) => {
            if !state.rem_cid_set {
                // First Initial seen: adopt the CID the peer chose for itself.
                trace!("switching remote CID to {}", rem_cid);
                let mut state = state.clone();
                self.rem_cids.update_initial_cid(rem_cid);
                self.rem_handshake_cid = rem_cid;
                self.orig_rem_cid = rem_cid;
                state.rem_cid_set = true;
                self.state = State::Handshake(state);
            } else if rem_cid != self.rem_handshake_cid {
                debug!(
                    "discarding packet with mismatched remote CID: {} != {}",
                    self.rem_handshake_cid, rem_cid
                );
                return Ok(());
            }

            let starting_space = self.highest_space;
            self.process_early_payload(now, packet)?;

            // On the server, the packet that moves the connection beyond the Initial space is
            // the point at which the peer's transport parameters are applied.
            if self.side.is_server()
                && starting_space == SpaceId::Initial
                && self.highest_space != SpaceId::Initial
            {
                let params =
                    self.crypto
                        .transport_parameters()?
                        .ok_or_else(|| TransportError {
                            code: TransportErrorCode::crypto(0x6d),
                            frame: None,
                            reason: "transport parameters missing".into(),
                        })?;
                self.handle_peer_params(params)?;
                self.issue_first_cids(now);
                self.init_0rtt();
            }
            Ok(())
        }
        Header::Long {
            ty: LongType::ZeroRtt,
            ..
        } => {
            self.process_payload(now, remote, number.unwrap(), packet)?;
            Ok(())
        }
        Header::VersionNegotiate { .. } => {
            // Ignored once more than one packet has been authenticated.
            if self.total_authed_packets > 1 {
                return Ok(());
            }
            // The payload is read as a list of 4-byte big-endian versions; a trailing partial
            // chunk never matches.
            let supported = packet
                .payload
                .chunks(4)
                .any(|x| match <[u8; 4]>::try_from(x) {
                    Ok(version) => self.version == u32::from_be_bytes(version),
                    Err(_) => false,
                });
            // A packet that lists our own version is ignored.
            if supported {
                return Ok(());
            }
            debug!("remote doesn't support our version");
            Err(ConnectionError::VersionMismatch)
        }
        Header::Short { .. } => unreachable!(
            "short packets received during handshake are discarded in handle_packet"
        ),
    }
}
2662
2663 fn process_early_payload(
2665 &mut self,
2666 now: Instant,
2667 packet: Packet,
2668 ) -> Result<(), TransportError> {
2669 debug_assert_ne!(packet.header.space(), SpaceId::Data);
2670 let payload_len = packet.payload.len();
2671 let mut ack_eliciting = false;
2672 for result in frame::Iter::new(packet.payload.freeze())? {
2673 let frame = result?;
2674 let span = match frame {
2675 Frame::Padding => continue,
2676 _ => Some(trace_span!("frame", ty = %frame.ty())),
2677 };
2678
2679 self.stats.frame_rx.record(&frame);
2680
2681 let _guard = span.as_ref().map(|x| x.enter());
2682 ack_eliciting |= frame.is_ack_eliciting();
2683
2684 match frame {
2686 Frame::Padding | Frame::Ping => {}
2687 Frame::Crypto(frame) => {
2688 self.read_crypto(packet.header.space(), &frame, payload_len)?;
2689 }
2690 Frame::Ack(ack) => {
2691 self.on_ack_received(now, packet.header.space(), ack)?;
2692 }
2693 Frame::Close(reason) => {
2694 self.error = Some(reason.into());
2695 self.state = State::Draining;
2696 return Ok(());
2697 }
2698 _ => {
2699 let mut err =
2700 TransportError::PROTOCOL_VIOLATION("illegal frame type in handshake");
2701 err.frame = Some(frame.ty());
2702 return Err(err);
2703 }
2704 }
2705 }
2706
2707 if ack_eliciting {
2708 self.spaces[packet.header.space()]
2710 .pending_acks
2711 .set_immediate_ack_required();
2712 }
2713
2714 self.write_crypto();
2715 Ok(())
2716 }
2717
2718 fn process_payload(
2719 &mut self,
2720 now: Instant,
2721 remote: SocketAddr,
2722 number: u64,
2723 packet: Packet,
2724 ) -> Result<(), TransportError> {
2725 let payload = packet.payload.freeze();
2726 let mut is_probing_packet = true;
2727 let mut close = None;
2728 let payload_len = payload.len();
2729 let mut ack_eliciting = false;
2730 for result in frame::Iter::new(payload)? {
2731 let frame = result?;
2732 let span = match frame {
2733 Frame::Padding => continue,
2734 _ => Some(trace_span!("frame", ty = %frame.ty())),
2735 };
2736
2737 self.stats.frame_rx.record(&frame);
2738 match &frame {
2741 Frame::Crypto(f) => {
2742 trace!(offset = f.offset, len = f.data.len(), "got crypto frame");
2743 }
2744 Frame::Stream(f) => {
2745 trace!(id = %f.id, offset = f.offset, len = f.data.len(), fin = f.fin, "got stream frame");
2746 }
2747 Frame::Datagram(f) => {
2748 trace!(len = f.data.len(), "got datagram frame");
2749 }
2750 f => {
2751 trace!("got frame {:?}", f);
2752 }
2753 }
2754
2755 let _guard = span.as_ref().map(|x| x.enter());
2756 if packet.header.is_0rtt() {
2757 match frame {
2758 Frame::Crypto(_) | Frame::Close(Close::Application(_)) => {
2759 return Err(TransportError::PROTOCOL_VIOLATION(
2760 "illegal frame type in 0-RTT",
2761 ));
2762 }
2763 _ => {}
2764 }
2765 }
2766 ack_eliciting |= frame.is_ack_eliciting();
2767
2768 match frame {
2770 Frame::Padding
2771 | Frame::PathChallenge(_)
2772 | Frame::PathResponse(_)
2773 | Frame::NewConnectionId(_) => {}
2774 _ => {
2775 is_probing_packet = false;
2776 }
2777 }
2778 match frame {
2779 Frame::Crypto(frame) => {
2780 self.read_crypto(SpaceId::Data, &frame, payload_len)?;
2781 }
2782 Frame::Stream(frame) => {
2783 if self.streams.received(frame, payload_len)?.should_transmit() {
2784 self.spaces[SpaceId::Data].pending.max_data = true;
2785 }
2786 }
2787 Frame::Ack(ack) => {
2788 self.on_ack_received(now, SpaceId::Data, ack)?;
2789 }
2790 Frame::Padding | Frame::Ping => {}
2791 Frame::Close(reason) => {
2792 close = Some(reason);
2793 }
2794 Frame::PathChallenge(token) => {
2795 self.path_responses.push(number, token, remote);
2796 if remote == self.path.remote {
2797 match self.peer_supports_ack_frequency() {
2800 true => self.immediate_ack(),
2801 false => self.ping(),
2802 }
2803 }
2804 }
2805 Frame::PathResponse(token) => {
2806 if self.path.challenge == Some(token) && remote == self.path.remote {
2807 trace!("new path validated");
2808 self.timers.stop(Timer::PathValidation);
2809 self.path.challenge = None;
2810 self.path.validated = true;
2811 if let Some((_, ref mut prev_path)) = self.prev_path {
2812 prev_path.challenge = None;
2813 prev_path.challenge_pending = false;
2814 }
2815 } else {
2816 debug!(token, "ignoring invalid PATH_RESPONSE");
2817 }
2818 }
2819 Frame::MaxData(bytes) => {
2820 self.streams.received_max_data(bytes);
2821 }
2822 Frame::MaxStreamData { id, offset } => {
2823 self.streams.received_max_stream_data(id, offset)?;
2824 }
2825 Frame::MaxStreams { dir, count } => {
2826 self.streams.received_max_streams(dir, count)?;
2827 }
2828 Frame::ResetStream(frame) => {
2829 if self.streams.received_reset(frame)?.should_transmit() {
2830 self.spaces[SpaceId::Data].pending.max_data = true;
2831 }
2832 }
2833 Frame::DataBlocked { offset } => {
2834 debug!(offset, "peer claims to be blocked at connection level");
2835 }
2836 Frame::StreamDataBlocked { id, offset } => {
2837 if id.initiator() == self.side.side() && id.dir() == Dir::Uni {
2838 debug!("got STREAM_DATA_BLOCKED on send-only {}", id);
2839 return Err(TransportError::STREAM_STATE_ERROR(
2840 "STREAM_DATA_BLOCKED on send-only stream",
2841 ));
2842 }
2843 debug!(
2844 stream = %id,
2845 offset, "peer claims to be blocked at stream level"
2846 );
2847 }
2848 Frame::StreamsBlocked { dir, limit } => {
2849 if limit > MAX_STREAM_COUNT {
2850 return Err(TransportError::FRAME_ENCODING_ERROR(
2851 "unrepresentable stream limit",
2852 ));
2853 }
2854 debug!(
2855 "peer claims to be blocked opening more than {} {} streams",
2856 limit, dir
2857 );
2858 }
2859 Frame::StopSending(frame::StopSending { id, error_code }) => {
2860 if id.initiator() != self.side.side() {
2861 if id.dir() == Dir::Uni {
2862 debug!("got STOP_SENDING on recv-only {}", id);
2863 return Err(TransportError::STREAM_STATE_ERROR(
2864 "STOP_SENDING on recv-only stream",
2865 ));
2866 }
2867 } else if self.streams.is_local_unopened(id) {
2868 return Err(TransportError::STREAM_STATE_ERROR(
2869 "STOP_SENDING on unopened stream",
2870 ));
2871 }
2872 self.streams.received_stop_sending(id, error_code);
2873 }
2874 Frame::RetireConnectionId { sequence } => {
2875 let allow_more_cids = self
2876 .local_cid_state
2877 .on_cid_retirement(sequence, self.peer_params.issue_cids_limit())?;
2878 self.endpoint_events
2879 .push_back(EndpointEventInner::RetireConnectionId(
2880 now,
2881 sequence,
2882 allow_more_cids,
2883 ));
2884 }
2885 Frame::NewConnectionId(frame) => {
2886 trace!(
2887 sequence = frame.sequence,
2888 id = %frame.id,
2889 retire_prior_to = frame.retire_prior_to,
2890 );
2891 if self.rem_cids.active().is_empty() {
2892 return Err(TransportError::PROTOCOL_VIOLATION(
2893 "NEW_CONNECTION_ID when CIDs aren't in use",
2894 ));
2895 }
2896 if frame.retire_prior_to > frame.sequence {
2897 return Err(TransportError::PROTOCOL_VIOLATION(
2898 "NEW_CONNECTION_ID retiring unissued CIDs",
2899 ));
2900 }
2901
2902 use crate::cid_queue::InsertError;
2903 match self.rem_cids.insert(frame) {
2904 Ok(None) => {}
2905 Ok(Some((retired, reset_token))) => {
2906 let pending_retired =
2907 &mut self.spaces[SpaceId::Data].pending.retire_cids;
2908 const MAX_PENDING_RETIRED_CIDS: u64 = CidQueue::LEN as u64 * 10;
2911 if (pending_retired.len() as u64)
2914 .saturating_add(retired.end.saturating_sub(retired.start))
2915 > MAX_PENDING_RETIRED_CIDS
2916 {
2917 return Err(TransportError::CONNECTION_ID_LIMIT_ERROR(
2918 "queued too many retired CIDs",
2919 ));
2920 }
2921 pending_retired.extend(retired);
2922 self.set_reset_token(reset_token);
2923 }
2924 Err(InsertError::ExceedsLimit) => {
2925 return Err(TransportError::CONNECTION_ID_LIMIT_ERROR(""));
2926 }
2927 Err(InsertError::Retired) => {
2928 trace!("discarding already-retired");
2929 self.spaces[SpaceId::Data]
2933 .pending
2934 .retire_cids
2935 .push(frame.sequence);
2936 continue;
2937 }
2938 };
2939
2940 if self.side.is_server() && self.rem_cids.active_seq() == 0 {
2941 self.update_rem_cid();
2944 }
2945 }
2946 Frame::NewToken(NewToken { token }) => {
2947 let ConnectionSide::Client {
2948 token_store,
2949 server_name,
2950 ..
2951 } = &self.side
2952 else {
2953 return Err(TransportError::PROTOCOL_VIOLATION("client sent NEW_TOKEN"));
2954 };
2955 if token.is_empty() {
2956 return Err(TransportError::FRAME_ENCODING_ERROR("empty token"));
2957 }
2958 trace!("got new token");
2959 token_store.insert(server_name, token);
2960 }
2961 Frame::Datagram(datagram) => {
2962 if self
2963 .datagrams
2964 .received(datagram, &self.config.datagram_receive_buffer_size)?
2965 {
2966 self.events.push_back(Event::DatagramReceived);
2967 }
2968 }
2969 Frame::AckFrequency(ack_frequency) => {
2970 let space = &mut self.spaces[SpaceId::Data];
2972
2973 if !self
2974 .ack_frequency
2975 .ack_frequency_received(&ack_frequency, &mut space.pending_acks)?
2976 {
2977 continue;
2979 }
2980
2981 if let Some(timeout) = space
2984 .pending_acks
2985 .max_ack_delay_timeout(self.ack_frequency.max_ack_delay)
2986 {
2987 self.timers.set(Timer::MaxAckDelay, timeout);
2988 }
2989 }
2990 Frame::ImmediateAck => {
2991 self.spaces[SpaceId::Data]
2993 .pending_acks
2994 .set_immediate_ack_required();
2995 }
2996 Frame::HandshakeDone => {
2997 if self.side.is_server() {
2998 return Err(TransportError::PROTOCOL_VIOLATION(
2999 "client sent HANDSHAKE_DONE",
3000 ));
3001 }
3002 if self.spaces[SpaceId::Handshake].crypto.is_some() {
3003 self.discard_space(now, SpaceId::Handshake);
3004 }
3005 }
3006 }
3007 }
3008
3009 let space = &mut self.spaces[SpaceId::Data];
3010 if space
3011 .pending_acks
3012 .packet_received(now, number, ack_eliciting, &space.dedup)
3013 {
3014 self.timers
3015 .set(Timer::MaxAckDelay, now + self.ack_frequency.max_ack_delay);
3016 }
3017
3018 let pending = &mut self.spaces[SpaceId::Data].pending;
3023 self.streams.queue_max_stream_id(pending);
3024
3025 if let Some(reason) = close {
3026 self.error = Some(reason.into());
3027 self.state = State::Draining;
3028 self.close = true;
3029 }
3030
3031 if remote != self.path.remote
3032 && !is_probing_packet
3033 && number == self.spaces[SpaceId::Data].rx_packet
3034 {
3035 let ConnectionSide::Server { ref server_config } = self.side else {
3036 panic!("packets from unknown remote should be dropped by clients");
3037 };
3038 debug_assert!(
3039 server_config.migration,
3040 "migration-initiating packets should have been dropped immediately"
3041 );
3042 self.migrate(now, remote);
3043 self.update_rem_cid();
3045 self.spin = false;
3046 }
3047
3048 Ok(())
3049 }
3050
3051 fn migrate(&mut self, now: Instant, remote: SocketAddr) {
3052 trace!(%remote, "migration initiated");
3053 self.path_counter = self.path_counter.wrapping_add(1);
3054 let mut new_path = if remote.is_ipv4() && remote.ip() == self.path.remote.ip() {
3058 PathData::from_previous(remote, &self.path, self.path_counter, now)
3059 } else {
3060 let peer_max_udp_payload_size =
3061 u16::try_from(self.peer_params.max_udp_payload_size.into_inner())
3062 .unwrap_or(u16::MAX);
3063 PathData::new(
3064 remote,
3065 self.allow_mtud,
3066 Some(peer_max_udp_payload_size),
3067 self.path_counter,
3068 now,
3069 &self.config,
3070 )
3071 };
3072 new_path.challenge = Some(self.rng.random());
3073 new_path.challenge_pending = true;
3074 let prev_pto = self.pto(SpaceId::Data);
3075
3076 let mut prev = mem::replace(&mut self.path, new_path);
3077 if prev.challenge.is_none() {
3079 prev.challenge = Some(self.rng.random());
3080 prev.challenge_pending = true;
3081 self.prev_path = Some((self.rem_cids.active(), prev));
3084 }
3085
3086 self.timers.set(
3087 Timer::PathValidation,
3088 now + 3 * cmp::max(self.pto(SpaceId::Data), prev_pto),
3089 );
3090 }
3091
3092 pub fn local_address_changed(&mut self) {
3094 self.update_rem_cid();
3095 self.ping();
3096 }
3097
3098 fn update_rem_cid(&mut self) {
3100 let (reset_token, retired) = match self.rem_cids.next() {
3101 Some(x) => x,
3102 None => return,
3103 };
3104
3105 self.spaces[SpaceId::Data]
3107 .pending
3108 .retire_cids
3109 .extend(retired);
3110 self.set_reset_token(reset_token);
3111 }
3112
3113 fn set_reset_token(&mut self, reset_token: ResetToken) {
3114 self.endpoint_events
3115 .push_back(EndpointEventInner::ResetToken(
3116 self.path.remote,
3117 reset_token,
3118 ));
3119 self.peer_params.stateless_reset_token = Some(reset_token);
3120 }
3121
3122 fn issue_first_cids(&mut self, now: Instant) {
3124 if self.local_cid_state.cid_len() == 0 {
3125 return;
3126 }
3127
3128 let mut n = self.peer_params.issue_cids_limit() - 1;
3130 if let ConnectionSide::Server { server_config } = &self.side {
3131 if server_config.has_preferred_address() {
3132 n -= 1;
3134 }
3135 }
3136 self.endpoint_events
3137 .push_back(EndpointEventInner::NeedIdentifiers(now, n));
3138 }
3139
3140 fn populate_packet(
3141 &mut self,
3142 now: Instant,
3143 space_id: SpaceId,
3144 buf: &mut Vec<u8>,
3145 max_size: usize,
3146 pn: u64,
3147 ) -> SentFrames {
3148 let mut sent = SentFrames::default();
3149 let space = &mut self.spaces[space_id];
3150 let is_0rtt = space_id == SpaceId::Data && space.crypto.is_none();
3151 space.pending_acks.maybe_ack_non_eliciting();
3152
3153 if !is_0rtt && mem::replace(&mut space.pending.handshake_done, false) {
3155 buf.write(frame::FrameType::HANDSHAKE_DONE);
3156 sent.retransmits.get_or_create().handshake_done = true;
3157 self.stats.frame_tx.handshake_done =
3159 self.stats.frame_tx.handshake_done.saturating_add(1);
3160 }
3161
3162 if mem::replace(&mut space.ping_pending, false) {
3164 trace!("PING");
3165 buf.write(frame::FrameType::PING);
3166 sent.non_retransmits = true;
3167 self.stats.frame_tx.ping += 1;
3168 }
3169
3170 if mem::replace(&mut space.immediate_ack_pending, false) {
3172 trace!("IMMEDIATE_ACK");
3173 buf.write(frame::FrameType::IMMEDIATE_ACK);
3174 sent.non_retransmits = true;
3175 self.stats.frame_tx.immediate_ack += 1;
3176 }
3177
3178 if space.pending_acks.can_send() {
3180 Self::populate_acks(
3181 now,
3182 self.receiving_ecn,
3183 &mut sent,
3184 space,
3185 buf,
3186 &mut self.stats,
3187 );
3188 }
3189
3190 if mem::replace(&mut space.pending.ack_frequency, false) {
3192 let sequence_number = self.ack_frequency.next_sequence_number();
3193
3194 let config = self.config.ack_frequency_config.as_ref().unwrap();
3196
3197 let max_ack_delay = self.ack_frequency.candidate_max_ack_delay(
3199 self.path.rtt.get(),
3200 config,
3201 &self.peer_params,
3202 );
3203
3204 trace!(?max_ack_delay, "ACK_FREQUENCY");
3205
3206 frame::AckFrequency {
3207 sequence: sequence_number,
3208 ack_eliciting_threshold: config.ack_eliciting_threshold,
3209 request_max_ack_delay: max_ack_delay.as_micros().try_into().unwrap_or(VarInt::MAX),
3210 reordering_threshold: config.reordering_threshold,
3211 }
3212 .encode(buf);
3213
3214 sent.retransmits.get_or_create().ack_frequency = true;
3215
3216 self.ack_frequency.ack_frequency_sent(pn, max_ack_delay);
3217 self.stats.frame_tx.ack_frequency += 1;
3218 }
3219
3220 if buf.len() + 9 < max_size && space_id == SpaceId::Data {
3222 if let Some(token) = self.path.challenge {
3224 self.path.challenge_pending = false;
3226 sent.non_retransmits = true;
3227 sent.requires_padding = true;
3228 trace!("PATH_CHALLENGE {:08x}", token);
3229 buf.write(frame::FrameType::PATH_CHALLENGE);
3230 buf.write(token);
3231 self.stats.frame_tx.path_challenge += 1;
3232 }
3233 }
3234
3235 if buf.len() + 9 < max_size && space_id == SpaceId::Data {
3237 if let Some(token) = self.path_responses.pop_on_path(self.path.remote) {
3238 sent.non_retransmits = true;
3239 sent.requires_padding = true;
3240 trace!("PATH_RESPONSE {:08x}", token);
3241 buf.write(frame::FrameType::PATH_RESPONSE);
3242 buf.write(token);
3243 self.stats.frame_tx.path_response += 1;
3244 }
3245 }
3246
3247 while buf.len() + frame::Crypto::SIZE_BOUND < max_size && !is_0rtt {
3249 let mut frame = match space.pending.crypto.pop_front() {
3250 Some(x) => x,
3251 None => break,
3252 };
3253
3254 let max_crypto_data_size = max_size
3259 - buf.len()
3260 - 1 - VarInt::size(unsafe { VarInt::from_u64_unchecked(frame.offset) })
3262 - 2; let len = frame
3265 .data
3266 .len()
3267 .min(2usize.pow(14) - 1)
3268 .min(max_crypto_data_size);
3269
3270 let data = frame.data.split_to(len);
3271 let truncated = frame::Crypto {
3272 offset: frame.offset,
3273 data,
3274 };
3275 trace!(
3276 "CRYPTO: off {} len {}",
3277 truncated.offset,
3278 truncated.data.len()
3279 );
3280 truncated.encode(buf);
3281 self.stats.frame_tx.crypto += 1;
3282 sent.retransmits.get_or_create().crypto.push_back(truncated);
3283 if !frame.data.is_empty() {
3284 frame.offset += len as u64;
3285 space.pending.crypto.push_front(frame);
3286 }
3287 }
3288
3289 if space_id == SpaceId::Data {
3290 self.streams.write_control_frames(
3291 buf,
3292 &mut space.pending,
3293 &mut sent.retransmits,
3294 &mut self.stats.frame_tx,
3295 max_size,
3296 );
3297 }
3298
3299 while buf.len() + NewConnectionId::SIZE_BOUND < max_size {
3301 let issued = match space.pending.new_cids.pop() {
3302 Some(x) => x,
3303 None => break,
3304 };
3305 trace!(
3306 sequence = issued.sequence,
3307 id = %issued.id,
3308 "NEW_CONNECTION_ID"
3309 );
3310 frame::NewConnectionId {
3311 sequence: issued.sequence,
3312 retire_prior_to: self.local_cid_state.retire_prior_to(),
3313 id: issued.id,
3314 reset_token: issued.reset_token,
3315 }
3316 .encode(buf);
3317 sent.retransmits.get_or_create().new_cids.push(issued);
3318 self.stats.frame_tx.new_connection_id += 1;
3319 }
3320
3321 while buf.len() + frame::RETIRE_CONNECTION_ID_SIZE_BOUND < max_size {
3323 let seq = match space.pending.retire_cids.pop() {
3324 Some(x) => x,
3325 None => break,
3326 };
3327 trace!(sequence = seq, "RETIRE_CONNECTION_ID");
3328 buf.write(frame::FrameType::RETIRE_CONNECTION_ID);
3329 buf.write_var(seq);
3330 sent.retransmits.get_or_create().retire_cids.push(seq);
3331 self.stats.frame_tx.retire_connection_id += 1;
3332 }
3333
3334 let mut sent_datagrams = false;
3336 while buf.len() + Datagram::SIZE_BOUND < max_size && space_id == SpaceId::Data {
3337 match self.datagrams.write(buf, max_size) {
3338 true => {
3339 sent_datagrams = true;
3340 sent.non_retransmits = true;
3341 self.stats.frame_tx.datagram += 1;
3342 }
3343 false => break,
3344 }
3345 }
3346 if self.datagrams.send_blocked && sent_datagrams {
3347 self.events.push_back(Event::DatagramsUnblocked);
3348 self.datagrams.send_blocked = false;
3349 }
3350
3351 while let Some(remote_addr) = space.pending.new_tokens.pop() {
3353 debug_assert_eq!(space_id, SpaceId::Data);
3354 let ConnectionSide::Server { server_config } = &self.side else {
3355 panic!("NEW_TOKEN frames should not be enqueued by clients");
3356 };
3357
3358 if remote_addr != self.path.remote {
3359 continue;
3364 }
3365
3366 let token = Token::new(
3367 TokenPayload::Validation {
3368 ip: remote_addr.ip(),
3369 issued: server_config.time_source.now(),
3370 },
3371 &mut self.rng,
3372 );
3373 let new_token = NewToken {
3374 token: token.encode(&*server_config.token_key).into(),
3375 };
3376
3377 if buf.len() + new_token.size() >= max_size {
3378 space.pending.new_tokens.push(remote_addr);
3379 break;
3380 }
3381
3382 new_token.encode(buf);
3383 sent.retransmits
3384 .get_or_create()
3385 .new_tokens
3386 .push(remote_addr);
3387 self.stats.frame_tx.new_token += 1;
3388 }
3389
3390 if space_id == SpaceId::Data {
3392 sent.stream_frames =
3393 self.streams
3394 .write_stream_frames(buf, max_size, self.config.send_fairness);
3395 self.stats.frame_tx.stream += sent.stream_frames.len() as u64;
3396 }
3397
3398 sent
3399 }
3400
3401 fn populate_acks(
3406 now: Instant,
3407 receiving_ecn: bool,
3408 sent: &mut SentFrames,
3409 space: &mut PacketSpace,
3410 buf: &mut Vec<u8>,
3411 stats: &mut ConnectionStats,
3412 ) {
3413 debug_assert!(!space.pending_acks.ranges().is_empty());
3414
3415 debug_assert!(space.crypto.is_some(), "tried to send ACK in 0-RTT");
3417 let ecn = if receiving_ecn {
3418 Some(&space.ecn_counters)
3419 } else {
3420 None
3421 };
3422 sent.largest_acked = space.pending_acks.ranges().max();
3423
3424 let delay_micros = space.pending_acks.ack_delay(now).as_micros() as u64;
3425
3426 let ack_delay_exp = TransportParameters::default().ack_delay_exponent;
3428 let delay = delay_micros >> ack_delay_exp.into_inner();
3429
3430 trace!(
3431 "ACK {:?}, Delay = {}us",
3432 space.pending_acks.ranges(),
3433 delay_micros
3434 );
3435
3436 frame::Ack::encode(delay as _, space.pending_acks.ranges(), ecn, buf);
3437 stats.frame_tx.acks += 1;
3438 }
3439
3440 fn close_common(&mut self) {
3441 trace!("connection closed");
3442 for &timer in &Timer::VALUES {
3443 self.timers.stop(timer);
3444 }
3445 }
3446
3447 fn set_close_timer(&mut self, now: Instant) {
3448 self.timers
3449 .set(Timer::Close, now + 3 * self.pto(self.highest_space));
3450 }
3451
3452 fn handle_peer_params(&mut self, params: TransportParameters) -> Result<(), TransportError> {
3454 if Some(self.orig_rem_cid) != params.initial_src_cid
3455 || (self.side.is_client()
3456 && (Some(self.initial_dst_cid) != params.original_dst_cid
3457 || self.retry_src_cid != params.retry_src_cid))
3458 {
3459 return Err(TransportError::TRANSPORT_PARAMETER_ERROR(
3460 "CID authentication failure",
3461 ));
3462 }
3463
3464 self.set_peer_params(params);
3465
3466 Ok(())
3467 }
3468
3469 fn set_peer_params(&mut self, params: TransportParameters) {
3470 self.streams.set_params(¶ms);
3471 self.idle_timeout =
3472 negotiate_max_idle_timeout(self.config.max_idle_timeout, Some(params.max_idle_timeout));
3473 trace!("negotiated max idle timeout {:?}", self.idle_timeout);
3474 if let Some(ref info) = params.preferred_address {
3475 self.rem_cids.insert(frame::NewConnectionId {
3476 sequence: 1,
3477 id: info.connection_id,
3478 reset_token: info.stateless_reset_token,
3479 retire_prior_to: 0,
3480 }).expect("preferred address CID is the first received, and hence is guaranteed to be legal");
3481 }
3482 self.ack_frequency.peer_max_ack_delay = get_max_ack_delay(¶ms);
3483 self.peer_params = params;
3484 self.path.mtud.on_peer_max_udp_payload_size_received(
3485 u16::try_from(self.peer_params.max_udp_payload_size.into_inner()).unwrap_or(u16::MAX),
3486 );
3487 }
3488
3489 fn decrypt_packet(
3490 &mut self,
3491 now: Instant,
3492 packet: &mut Packet,
3493 ) -> Result<Option<u64>, Option<TransportError>> {
3494 let result = packet_crypto::decrypt_packet_body(
3495 packet,
3496 &self.spaces,
3497 self.zero_rtt_crypto.as_ref(),
3498 self.key_phase,
3499 self.prev_crypto.as_ref(),
3500 self.next_crypto.as_ref(),
3501 )?;
3502
3503 let result = match result {
3504 Some(r) => r,
3505 None => return Ok(None),
3506 };
3507
3508 if result.outgoing_key_update_acked {
3509 if let Some(prev) = self.prev_crypto.as_mut() {
3510 prev.end_packet = Some((result.number, now));
3511 self.set_key_discard_timer(now, packet.header.space());
3512 }
3513 }
3514
3515 if result.incoming_key_update {
3516 trace!("key update authenticated");
3517 self.update_keys(Some((result.number, now)), true);
3518 self.set_key_discard_timer(now, packet.header.space());
3519 }
3520
3521 Ok(Some(result.number))
3522 }
3523
3524 fn update_keys(&mut self, end_packet: Option<(u64, Instant)>, remote: bool) {
3525 trace!("executing key update");
3526 let new = self
3530 .crypto
3531 .next_1rtt_keys()
3532 .expect("only called for `Data` packets");
3533 self.key_phase_size = new
3534 .local
3535 .confidentiality_limit()
3536 .saturating_sub(KEY_UPDATE_MARGIN);
3537 let old = mem::replace(
3538 &mut self.spaces[SpaceId::Data]
3539 .crypto
3540 .as_mut()
3541 .unwrap() .packet,
3543 mem::replace(self.next_crypto.as_mut().unwrap(), new),
3544 );
3545 self.spaces[SpaceId::Data].sent_with_keys = 0;
3546 self.prev_crypto = Some(PrevCrypto {
3547 crypto: old,
3548 end_packet,
3549 update_unacked: remote,
3550 });
3551 self.key_phase = !self.key_phase;
3552 }
3553
3554 fn peer_supports_ack_frequency(&self) -> bool {
3555 self.peer_params.min_ack_delay.is_some()
3556 }
3557
3558 pub(crate) fn immediate_ack(&mut self) {
3563 self.spaces[self.highest_space].immediate_ack_pending = true;
3564 }
3565
/// Test helper: remove header protection from and decrypt the packet inside `event`.
///
/// Returns the decrypted payload bytes, or `None` when the event is not a datagram or any
/// decoding step fails.
///
/// # Panics
///
/// Panics if the datagram contains more than one packet.
#[cfg(test)]
pub(crate) fn decode_packet(&self, event: &ConnectionEvent) -> Option<Vec<u8>> {
    let ConnectionEventInner::Datagram(DatagramConnectionEvent {
        first_decode,
        remaining,
        ..
    }) = &event.0
    else {
        return None;
    };

    assert!(
        remaining.is_none(),
        "Packets should never be coalesced in tests"
    );

    let unprotected = packet_crypto::unprotect_header(
        first_decode.clone(),
        &self.spaces,
        self.zero_rtt_crypto.as_ref(),
        self.peer_params.stateless_reset_token,
    )?;

    let mut packet = unprotected.packet?;
    let body = packet_crypto::decrypt_packet_body(
        &mut packet,
        &self.spaces,
        self.zero_rtt_crypto.as_ref(),
        self.key_phase,
        self.prev_crypto.as_ref(),
        self.next_crypto.as_ref(),
    );
    // Only success matters here; the decryption result itself is not used
    body.ok()?;

    Some(packet.payload.to_vec())
}
3602
/// Test helper: the in-flight byte count tracked for the current path.
#[cfg(test)]
pub(crate) fn bytes_in_flight(&self) -> u64 {
    let in_flight = &self.path.in_flight;
    in_flight.bytes
}
3609
/// Test helper: congestion window minus the bytes currently in flight, saturating at zero.
#[cfg(test)]
pub(crate) fn congestion_window(&self) -> u64 {
    let window = self.path.congestion.window();
    let in_flight = self.path.in_flight.bytes;
    window.saturating_sub(in_flight)
}
3618
/// Test helper: whether the connection has nothing scheduled except idling.
///
/// The keep-alive, push-new-CID and key-discard timers are ignored. Of the remaining timers
/// that are set, the earliest one must be the idle timer; having none set also counts as
/// idle.
#[cfg(test)]
pub(crate) fn is_idle(&self) -> bool {
    let earliest = Timer::VALUES
        .iter()
        .copied()
        .filter(|timer| {
            !matches!(
                timer,
                Timer::KeepAlive | Timer::PushNewCid | Timer::KeyDiscard
            )
        })
        .filter_map(|timer| self.timers.get(timer).map(|deadline| (timer, deadline)))
        .min_by_key(|&(_, deadline)| deadline);

    match earliest {
        Some((timer, _)) => timer == Timer::Idle,
        None => true,
    }
}
3629
/// Test helper: the current path's `sending_ecn` flag.
#[cfg(test)]
pub(crate) fn using_ecn(&self) -> bool {
    let path = &self.path;
    path.sending_ecn
}
3635
/// Test helper: the current path's `total_recvd` counter.
#[cfg(test)]
pub(crate) fn total_recvd(&self) -> u64 {
    let path = &self.path;
    path.total_recvd
}
3641
/// Test helper: the pair of sequence numbers reported by the local CID state's `active_seq`.
#[cfg(test)]
pub(crate) fn active_local_cid_seq(&self) -> (u64, u64) {
    let cid_state = &self.local_cid_state;
    cid_state.active_seq()
}
3646
/// Test helper: assign retire sequence `v` on the local CID state and request as many new
/// identifiers from the endpoint as `assign_retire_seq` reports.
#[cfg(test)]
pub(crate) fn rotate_local_cid(&mut self, v: u64, now: Instant) {
    let needed = self.local_cid_state.assign_retire_seq(v);
    let request = EndpointEventInner::NeedIdentifiers(now, needed);
    self.endpoint_events.push_back(request);
}
3655
/// Test helper: sequence number of the remote connection ID currently in use.
#[cfg(test)]
pub(crate) fn active_rem_cid_seq(&self) -> u64 {
    let queue = &self.rem_cids;
    queue.active_seq()
}
3661
/// Test helper: the MTU currently in effect on the active path.
#[cfg(test)]
pub(crate) fn path_mtu(&self) -> u16 {
    let path = &self.path;
    path.current_mtu()
}
3667
3668 fn can_send_1rtt(&self, max_size: usize) -> bool {
3672 self.streams.can_send_stream_data()
3673 || self.path.challenge_pending
3674 || self
3675 .prev_path
3676 .as_ref()
3677 .is_some_and(|(_, x)| x.challenge_pending)
3678 || !self.path_responses.is_empty()
3679 || self
3680 .datagrams
3681 .outgoing
3682 .front()
3683 .is_some_and(|x| x.size(true) <= max_size)
3684 }
3685
3686 fn remove_in_flight(&mut self, packet: &SentPacket) {
3688 for path in [&mut self.path]
3690 .into_iter()
3691 .chain(self.prev_path.as_mut().map(|(_, data)| data))
3692 {
3693 if path.remove_in_flight(packet) {
3694 return;
3695 }
3696 }
3697 }
3698
3699 fn kill(&mut self, reason: ConnectionError) {
3701 self.close_common();
3702 self.error = Some(reason);
3703 self.state = State::Drained;
3704 self.endpoint_events.push_back(EndpointEventInner::Drained);
3705 }
3706
3707 pub fn current_mtu(&self) -> u16 {
3711 self.path.current_mtu()
3712 }
3713
3714 fn predict_1rtt_overhead(&self, pn: Option<u64>) -> usize {
3721 let pn_len = match pn {
3722 Some(pn) => PacketNumber::new(
3723 pn,
3724 self.spaces[SpaceId::Data].largest_acked_packet.unwrap_or(0),
3725 )
3726 .len(),
3727 None => 4,
3729 };
3730
3731 1 + self.rem_cids.active().len() + pn_len + self.tag_len_1rtt()
3733 }
3734
3735 fn tag_len_1rtt(&self) -> usize {
3736 let key = match self.spaces[SpaceId::Data].crypto.as_ref() {
3737 Some(crypto) => Some(&*crypto.packet.local),
3738 None => self.zero_rtt_crypto.as_ref().map(|x| &*x.packet),
3739 };
3740 key.map_or(16, |x| x.tag_len())
3744 }
3745
3746 fn on_path_validated(&mut self) {
3748 self.path.validated = true;
3749 let ConnectionSide::Server { server_config } = &self.side else {
3750 return;
3751 };
3752 let new_tokens = &mut self.spaces[SpaceId::Data as usize].pending.new_tokens;
3753 new_tokens.clear();
3754 for _ in 0..server_config.validation_token.sent {
3755 new_tokens.push(self.path.remote);
3756 }
3757 }
3758}
3759
3760impl fmt::Debug for Connection {
3761 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
3762 f.debug_struct("Connection")
3763 .field("handshake_cid", &self.handshake_cid)
3764 .finish()
3765 }
3766}
3767
3768enum ConnectionSide {
3770 Client {
3771 token: Bytes,
3773 token_store: Arc<dyn TokenStore>,
3774 server_name: String,
3775 },
3776 Server {
3777 server_config: Arc<ServerConfig>,
3778 },
3779}
3780
3781impl ConnectionSide {
3782 fn remote_may_migrate(&self) -> bool {
3783 match self {
3784 Self::Server { server_config } => server_config.migration,
3785 Self::Client { .. } => false,
3786 }
3787 }
3788
3789 fn is_client(&self) -> bool {
3790 self.side().is_client()
3791 }
3792
3793 fn is_server(&self) -> bool {
3794 self.side().is_server()
3795 }
3796
3797 fn side(&self) -> Side {
3798 match *self {
3799 Self::Client { .. } => Side::Client,
3800 Self::Server { .. } => Side::Server,
3801 }
3802 }
3803}
3804
3805impl From<SideArgs> for ConnectionSide {
3806 fn from(side: SideArgs) -> Self {
3807 match side {
3808 SideArgs::Client {
3809 token_store,
3810 server_name,
3811 } => Self::Client {
3812 token: token_store.take(&server_name).unwrap_or_default(),
3813 token_store,
3814 server_name,
3815 },
3816 SideArgs::Server {
3817 server_config,
3818 pref_addr_cid: _,
3819 path_validated: _,
3820 } => Self::Server { server_config },
3821 }
3822 }
3823}
3824
3825pub(crate) enum SideArgs {
3827 Client {
3828 token_store: Arc<dyn TokenStore>,
3829 server_name: String,
3830 },
3831 Server {
3832 server_config: Arc<ServerConfig>,
3833 pref_addr_cid: Option<ConnectionId>,
3834 path_validated: bool,
3835 },
3836}
3837
3838impl SideArgs {
3839 pub(crate) fn pref_addr_cid(&self) -> Option<ConnectionId> {
3840 match *self {
3841 Self::Client { .. } => None,
3842 Self::Server { pref_addr_cid, .. } => pref_addr_cid,
3843 }
3844 }
3845
3846 pub(crate) fn path_validated(&self) -> bool {
3847 match *self {
3848 Self::Client { .. } => true,
3849 Self::Server { path_validated, .. } => path_validated,
3850 }
3851 }
3852
3853 pub(crate) fn side(&self) -> Side {
3854 match *self {
3855 Self::Client { .. } => Side::Client,
3856 Self::Server { .. } => Side::Server,
3857 }
3858 }
3859}
3860
3861#[derive(Debug, Error, Clone, PartialEq, Eq)]
3863pub enum ConnectionError {
3864 #[error("peer doesn't implement any supported version")]
3866 VersionMismatch,
3867 #[error(transparent)]
3869 TransportError(#[from] TransportError),
3870 #[error("aborted by peer: {0}")]
3872 ConnectionClosed(frame::ConnectionClose),
3873 #[error("closed by peer: {0}")]
3875 ApplicationClosed(frame::ApplicationClose),
3876 #[error("reset by peer")]
3878 Reset,
3879 #[error("timed out")]
3885 TimedOut,
3886 #[error("closed")]
3888 LocallyClosed,
3889 #[error("CIDs exhausted")]
3893 CidsExhausted,
3894}
3895
3896impl From<Close> for ConnectionError {
3897 fn from(x: Close) -> Self {
3898 match x {
3899 Close::Connection(reason) => Self::ConnectionClosed(reason),
3900 Close::Application(reason) => Self::ApplicationClosed(reason),
3901 }
3902 }
3903}
3904
3905impl From<ConnectionError> for io::Error {
3907 fn from(x: ConnectionError) -> Self {
3908 use ConnectionError::*;
3909 let kind = match x {
3910 TimedOut => io::ErrorKind::TimedOut,
3911 Reset => io::ErrorKind::ConnectionReset,
3912 ApplicationClosed(_) | ConnectionClosed(_) => io::ErrorKind::ConnectionAborted,
3913 TransportError(_) | VersionMismatch | LocallyClosed | CidsExhausted => {
3914 io::ErrorKind::Other
3915 }
3916 };
3917 Self::new(kind, x)
3918 }
3919}
3920
3921#[allow(unreachable_pub)] #[derive(Clone)]
3923pub enum State {
3924 Handshake(state::Handshake),
3925 Established,
3926 Closed(state::Closed),
3927 Draining,
3928 Drained,
3930}
3931
3932impl State {
3933 fn closed<R: Into<Close>>(reason: R) -> Self {
3934 Self::Closed(state::Closed {
3935 reason: reason.into(),
3936 })
3937 }
3938
3939 fn is_handshake(&self) -> bool {
3940 matches!(*self, Self::Handshake(_))
3941 }
3942
3943 fn is_established(&self) -> bool {
3944 matches!(*self, Self::Established)
3945 }
3946
3947 fn is_closed(&self) -> bool {
3948 matches!(*self, Self::Closed(_) | Self::Draining | Self::Drained)
3949 }
3950
3951 fn is_drained(&self) -> bool {
3952 matches!(*self, Self::Drained)
3953 }
3954}
3955
3956mod state {
3957 use super::*;
3958
3959 #[allow(unreachable_pub)] #[derive(Clone)]
3961 pub struct Handshake {
3962 pub(super) rem_cid_set: bool,
3966 pub(super) expected_token: Bytes,
3970 pub(super) client_hello: Option<Bytes>,
3974 }
3975
3976 #[allow(unreachable_pub)] #[derive(Clone)]
3978 pub struct Closed {
3979 pub(super) reason: Close,
3980 }
3981}
3982
3983#[derive(Debug)]
3985pub enum Event {
3986 HandshakeDataReady,
3988 Connected,
3990 ConnectionLost {
3994 reason: ConnectionError,
3996 },
3997 Stream(StreamEvent),
3999 DatagramReceived,
4001 DatagramsUnblocked,
4003}
4004
/// Time elapsed from `y` to `x`, or zero when `x` is not later than `y`.
fn instant_saturating_sub(x: Instant, y: Instant) -> Duration {
    if x <= y {
        return Duration::ZERO;
    }
    x - y
}
4008
4009fn get_max_ack_delay(params: &TransportParameters) -> Duration {
4010 Duration::from_micros(params.max_ack_delay.0 * 1000)
4011}
4012
4013const MAX_BACKOFF_EXPONENT: u32 = 16;
4015
4016const MIN_PACKET_SPACE: usize = MAX_HANDSHAKE_OR_0RTT_HEADER_SIZE + 32;
4024
4025const MAX_HANDSHAKE_OR_0RTT_HEADER_SIZE: usize =
4031 1 + 4 + 1 + MAX_CID_SIZE + 1 + MAX_CID_SIZE + VarInt::from_u32(u16::MAX as u32).size() + 4;
4032
4033const KEY_UPDATE_MARGIN: u64 = 10_000;
4037
4038#[derive(Default)]
4039struct SentFrames {
4040 retransmits: ThinRetransmits,
4041 largest_acked: Option<u64>,
4042 stream_frames: StreamMetaVec,
4043 non_retransmits: bool,
4045 requires_padding: bool,
4046}
4047
4048impl SentFrames {
4049 fn is_ack_only(&self, streams: &StreamsState) -> bool {
4051 self.largest_acked.is_some()
4052 && !self.non_retransmits
4053 && self.stream_frames.is_empty()
4054 && self.retransmits.is_empty(streams)
4055 }
4056}
4057
4058fn negotiate_max_idle_timeout(x: Option<VarInt>, y: Option<VarInt>) -> Option<Duration> {
4066 match (x, y) {
4067 (Some(VarInt(0)) | None, Some(VarInt(0)) | None) => None,
4068 (Some(VarInt(0)) | None, Some(y)) => Some(Duration::from_millis(y.0)),
4069 (Some(x), Some(VarInt(0)) | None) => Some(Duration::from_millis(x.0)),
4070 (Some(x), Some(y)) => Some(Duration::from_millis(cmp::min(x, y).0)),
4071 }
4072}
4073
#[cfg(test)]
mod tests {
    use super::*;

    /// Each case must give the expected result regardless of argument order.
    #[test]
    fn negotiate_max_idle_timeout_commutative() {
        let varint = |n: u64| Some(VarInt(n));
        let millis = |n: u64| Some(Duration::from_millis(n));

        // (one side, other side, expected outcome)
        let cases = [
            (None, None, None),
            (None, varint(0), None),
            (None, varint(2), millis(2)),
            (varint(0), varint(0), None),
            (varint(2), varint(0), millis(2)),
            (varint(1), varint(4), millis(1)),
        ];

        for (left, right, expected) in cases {
            assert_eq!(negotiate_max_idle_timeout(left, right), expected);
            assert_eq!(negotiate_max_idle_timeout(right, left), expected);
        }
    }
}