1use std::{
2 cmp,
3 collections::VecDeque,
4 convert::TryFrom,
5 fmt, io, mem,
6 net::{IpAddr, SocketAddr},
7 sync::Arc,
8};
9
10use bytes::{Bytes, BytesMut};
11use frame::StreamMetaVec;
12
13use rand::{Rng, SeedableRng, rngs::StdRng};
14use thiserror::Error;
15use tracing::{debug, error, trace, trace_span, warn};
16
17use crate::{
18 Dir, Duration, EndpointConfig, Frame, INITIAL_MTU, Instant, MAX_CID_SIZE, MAX_STREAM_COUNT,
19 MIN_INITIAL_SIZE, Side, StreamId, TIMER_GRANULARITY, TokenStore, Transmit, TransportError,
20 TransportErrorCode, VarInt,
21 cid_generator::ConnectionIdGenerator,
22 cid_queue::CidQueue,
23 coding::BufMutExt,
24 config::{ServerConfig, TransportConfig},
25 crypto::{self, KeyPair, Keys, PacketKey},
26 frame::{self, Close, Datagram, FrameStruct, NewConnectionId, NewToken},
27 packet::{
28 FixedLengthConnectionIdParser, Header, InitialHeader, InitialPacket, LongType, Packet,
29 PacketNumber, PartialDecode, SpaceId,
30 },
31 range_set::ArrayRangeSet,
32 shared::{
33 ConnectionEvent, ConnectionEventInner, ConnectionId, DatagramConnectionEvent, EcnCodepoint,
34 EndpointEvent, EndpointEventInner,
35 },
36 token::{ResetToken, Token, TokenPayload},
37 transport_parameters::TransportParameters,
38};
39
40mod ack_frequency;
41use ack_frequency::AckFrequencyState;
42
43mod assembler;
44pub use assembler::Chunk;
45
46mod cid_state;
47use cid_state::CidState;
48
49mod datagrams;
50use datagrams::DatagramState;
51pub use datagrams::{Datagrams, SendDatagramError};
52
53mod mtud;
54mod pacing;
55
56mod packet_builder;
57use packet_builder::PacketBuilder;
58
59mod packet_crypto;
60use packet_crypto::{PrevCrypto, ZeroRttCrypto};
61
62mod paths;
63pub use paths::RttEstimator;
64use paths::{PathData, PathResponses};
65
66pub(crate) mod qlog;
67
68mod send_buffer;
69
70mod spaces;
71#[cfg(fuzzing)]
72pub use spaces::Retransmits;
73#[cfg(not(fuzzing))]
74use spaces::Retransmits;
75use spaces::{PacketNumberFilter, PacketSpace, SendableFrames, SentPacket, ThinRetransmits};
76
77mod stats;
78pub use stats::{ConnectionStats, FrameStats, PathStats, UdpStats};
79
80mod streams;
81#[cfg(fuzzing)]
82pub use streams::StreamsState;
83#[cfg(not(fuzzing))]
84use streams::StreamsState;
85pub use streams::{
86 Chunks, ClosedStream, FinishError, ReadError, ReadableError, RecvStream, SendStream,
87 ShouldTransmit, StreamEvent, Streams, WriteError, Written,
88};
89
90mod timer;
91use crate::congestion::Controller;
92use timer::{Timer, TimerTable};
93
94pub struct Connection {
134 endpoint_config: Arc<EndpointConfig>,
135 config: Arc<TransportConfig>,
136 rng: StdRng,
137 crypto: Box<dyn crypto::Session>,
138 handshake_cid: ConnectionId,
140 rem_handshake_cid: ConnectionId,
142 local_ip: Option<IpAddr>,
145 path: PathData,
146 path_counter: u64,
150 allow_mtud: bool,
152 prev_path: Option<(ConnectionId, PathData)>,
153 state: State,
154 side: ConnectionSide,
155 zero_rtt_enabled: bool,
157 zero_rtt_crypto: Option<ZeroRttCrypto>,
159 key_phase: bool,
160 key_phase_size: u64,
162 peer_params: TransportParameters,
164 orig_rem_cid: ConnectionId,
166 initial_dst_cid: ConnectionId,
168 retry_src_cid: Option<ConnectionId>,
171 events: VecDeque<Event>,
172 endpoint_events: VecDeque<EndpointEventInner>,
173 spin_enabled: bool,
175 spin: bool,
177 spaces: [PacketSpace; 3],
179 highest_space: SpaceId,
181 prev_crypto: Option<PrevCrypto>,
183 next_crypto: Option<KeyPair<Box<dyn PacketKey>>>,
188 accepted_0rtt: bool,
189 permit_idle_reset: bool,
191 idle_timeout: Option<Duration>,
193 timers: TimerTable,
194 authentication_failures: u64,
196 error: Option<ConnectionError>,
198 packet_number_filter: PacketNumberFilter,
200
201 path_responses: PathResponses,
206 close: bool,
207
208 ack_frequency: AckFrequencyState,
212
213 pto_count: u32,
218
219 receiving_ecn: bool,
224 total_authed_packets: u64,
226 app_limited: bool,
229
230 streams: StreamsState,
231 rem_cids: CidQueue,
233 local_cid_state: CidState,
235 datagrams: DatagramState,
237 stats: ConnectionStats,
239 version: u32,
241}
242
243impl Connection {
244 pub(crate) fn new(
245 endpoint_config: Arc<EndpointConfig>,
246 config: Arc<TransportConfig>,
247 init_cid: ConnectionId,
248 loc_cid: ConnectionId,
249 rem_cid: ConnectionId,
250 remote: SocketAddr,
251 local_ip: Option<IpAddr>,
252 crypto: Box<dyn crypto::Session>,
253 cid_gen: &dyn ConnectionIdGenerator,
254 now: Instant,
255 version: u32,
256 allow_mtud: bool,
257 rng_seed: [u8; 32],
258 side_args: SideArgs,
259 ) -> Self {
260 let pref_addr_cid = side_args.pref_addr_cid();
261 let path_validated = side_args.path_validated();
262 let connection_side = ConnectionSide::from(side_args);
263 let side = connection_side.side();
264 let initial_space = PacketSpace {
265 crypto: Some(crypto.initial_keys(&init_cid, side)),
266 ..PacketSpace::new(now)
267 };
268 let state = State::Handshake(state::Handshake {
269 rem_cid_set: side.is_server(),
270 expected_token: Bytes::new(),
271 client_hello: None,
272 });
273 let mut rng = StdRng::from_seed(rng_seed);
274 let mut this = Self {
275 endpoint_config,
276 crypto,
277 handshake_cid: loc_cid,
278 rem_handshake_cid: rem_cid,
279 local_cid_state: CidState::new(
280 cid_gen.cid_len(),
281 cid_gen.cid_lifetime(),
282 now,
283 if pref_addr_cid.is_some() { 2 } else { 1 },
284 ),
285 path: PathData::new(remote, allow_mtud, None, 0, now, &config),
286 path_counter: 0,
287 allow_mtud,
288 local_ip,
289 prev_path: None,
290 state,
291 side: connection_side,
292 zero_rtt_enabled: false,
293 zero_rtt_crypto: None,
294 key_phase: false,
295 key_phase_size: rng.random_range(10..1000),
302 peer_params: TransportParameters::default(),
303 orig_rem_cid: rem_cid,
304 initial_dst_cid: init_cid,
305 retry_src_cid: None,
306 events: VecDeque::new(),
307 endpoint_events: VecDeque::new(),
308 spin_enabled: config.allow_spin && rng.random_ratio(7, 8),
309 spin: false,
310 spaces: [initial_space, PacketSpace::new(now), PacketSpace::new(now)],
311 highest_space: SpaceId::Initial,
312 prev_crypto: None,
313 next_crypto: None,
314 accepted_0rtt: false,
315 permit_idle_reset: true,
316 idle_timeout: match config.max_idle_timeout {
317 None | Some(VarInt(0)) => None,
318 Some(dur) => Some(Duration::from_millis(dur.0)),
319 },
320 timers: TimerTable::default(),
321 authentication_failures: 0,
322 error: None,
323 #[cfg(test)]
324 packet_number_filter: match config.deterministic_packet_numbers {
325 false => PacketNumberFilter::new(&mut rng),
326 true => PacketNumberFilter::disabled(),
327 },
328 #[cfg(not(test))]
329 packet_number_filter: PacketNumberFilter::new(&mut rng),
330
331 path_responses: PathResponses::default(),
332 close: false,
333
334 ack_frequency: AckFrequencyState::new(get_max_ack_delay(
335 &TransportParameters::default(),
336 )),
337
338 pto_count: 0,
339
340 app_limited: false,
341 receiving_ecn: false,
342 total_authed_packets: 0,
343
344 streams: StreamsState::new(
345 side,
346 config.max_concurrent_uni_streams,
347 config.max_concurrent_bidi_streams,
348 config.send_window,
349 config.receive_window,
350 config.stream_receive_window,
351 ),
352 datagrams: DatagramState::default(),
353 config,
354 rem_cids: CidQueue::new(rem_cid),
355 rng,
356 stats: ConnectionStats::default(),
357 version,
358 };
359 if path_validated {
360 this.on_path_validated();
361 }
362 if side.is_client() {
363 this.write_crypto();
365 this.init_0rtt();
366 }
367 this
368 }
369
370 #[must_use]
378 pub fn poll_timeout(&mut self) -> Option<Instant> {
379 self.timers.next_timeout()
380 }
381
382 #[must_use]
388 pub fn poll(&mut self) -> Option<Event> {
389 if let Some(x) = self.events.pop_front() {
390 return Some(x);
391 }
392
393 if let Some(event) = self.streams.poll() {
394 return Some(Event::Stream(event));
395 }
396
397 if let Some(err) = self.error.take() {
398 return Some(Event::ConnectionLost { reason: err });
399 }
400
401 None
402 }
403
404 #[must_use]
406 pub fn poll_endpoint_events(&mut self) -> Option<EndpointEvent> {
407 self.endpoint_events.pop_front().map(EndpointEvent)
408 }
409
410 #[must_use]
412 pub fn streams(&mut self) -> Streams<'_> {
413 Streams {
414 state: &mut self.streams,
415 conn_state: &self.state,
416 }
417 }
418
419 #[must_use]
421 pub fn recv_stream(&mut self, id: StreamId) -> RecvStream<'_> {
422 assert!(id.dir() == Dir::Bi || id.initiator() != self.side.side());
423 RecvStream {
424 id,
425 state: &mut self.streams,
426 pending: &mut self.spaces[SpaceId::Data].pending,
427 }
428 }
429
430 #[must_use]
432 pub fn send_stream(&mut self, id: StreamId) -> SendStream<'_> {
433 assert!(id.dir() == Dir::Bi || id.initiator() == self.side.side());
434 SendStream {
435 id,
436 state: &mut self.streams,
437 pending: &mut self.spaces[SpaceId::Data].pending,
438 conn_state: &self.state,
439 }
440 }
441
442 #[must_use]
452 pub fn poll_transmit(
453 &mut self,
454 now: Instant,
455 max_datagrams: usize,
456 buf: &mut Vec<u8>,
457 ) -> Option<Transmit> {
458 assert!(max_datagrams != 0);
459 let max_datagrams = match self.config.enable_segmentation_offload {
460 false => 1,
461 true => max_datagrams,
462 };
463
464 let mut num_datagrams = 0;
465 let mut datagram_start = 0;
468 let mut segment_size = usize::from(self.path.current_mtu());
469
470 if let Some(challenge) = self.send_path_challenge(now, buf) {
471 return Some(challenge);
472 }
473
474 for space in SpaceId::iter() {
476 let request_immediate_ack =
477 space == SpaceId::Data && self.peer_supports_ack_frequency();
478 self.spaces[space].maybe_queue_probe(request_immediate_ack, &self.streams);
479 }
480
481 let close = match self.state {
483 State::Drained => {
484 self.app_limited = true;
485 return None;
486 }
487 State::Draining | State::Closed(_) => {
488 if !self.close {
491 self.app_limited = true;
492 return None;
493 }
494 true
495 }
496 _ => false,
497 };
498
499 if let Some(config) = &self.config.ack_frequency_config {
501 self.spaces[SpaceId::Data].pending.ack_frequency = self
502 .ack_frequency
503 .should_send_ack_frequency(self.path.rtt.get(), config, &self.peer_params)
504 && self.highest_space == SpaceId::Data
505 && self.peer_supports_ack_frequency();
506 }
507
508 let mut buf_capacity = 0;
512
513 let mut coalesce = true;
514 let mut builder_storage: Option<PacketBuilder> = None;
515 let mut sent_frames = None;
516 let mut pad_datagram = false;
517 let mut pad_datagram_to_mtu = false;
518 let mut congestion_blocked = false;
519
520 let mut space_idx = 0;
522 let spaces = [SpaceId::Initial, SpaceId::Handshake, SpaceId::Data];
523 while space_idx < spaces.len() {
526 let space_id = spaces[space_idx];
527 let pn = self.packet_number_filter.peek(&self.spaces[SpaceId::Data]);
534 let frame_space_1rtt =
535 segment_size.saturating_sub(self.predict_1rtt_overhead(Some(pn)));
536
537 let can_send = self.space_can_send(space_id, frame_space_1rtt);
539 if can_send.is_empty() && (!close || self.spaces[space_id].crypto.is_none()) {
540 space_idx += 1;
541 continue;
542 }
543
544 let mut ack_eliciting = !self.spaces[space_id].pending.is_empty(&self.streams)
545 || self.spaces[space_id].ping_pending
546 || self.spaces[space_id].immediate_ack_pending;
547 if space_id == SpaceId::Data {
548 ack_eliciting |= self.can_send_1rtt(frame_space_1rtt);
549 }
550
551 pad_datagram_to_mtu |= space_id == SpaceId::Data && self.config.pad_to_mtu;
552
553 let buf_end = if let Some(builder) = &builder_storage {
557 buf.len().max(builder.min_size) + builder.tag_len
558 } else {
559 buf.len()
560 };
561
562 let tag_len = if let Some(ref crypto) = self.spaces[space_id].crypto {
563 crypto.packet.local.tag_len()
564 } else if space_id == SpaceId::Data {
565 self.zero_rtt_crypto.as_ref().expect(
566 "sending packets in the application data space requires known 0-RTT or 1-RTT keys",
567 ).packet.tag_len()
568 } else {
569 unreachable!("tried to send {:?} packet without keys", space_id)
570 };
571 if !coalesce || buf_capacity - buf_end < MIN_PACKET_SPACE + tag_len {
572 if num_datagrams >= max_datagrams {
576 break;
578 }
579
580 if self
587 .path
588 .anti_amplification_blocked(segment_size as u64 * (num_datagrams as u64) + 1)
589 {
590 trace!("blocked by anti-amplification");
591 break;
592 }
593
594 if ack_eliciting && self.spaces[space_id].loss_probes == 0 {
597 let untracked_bytes = if let Some(builder) = &builder_storage {
599 buf_capacity - builder.partial_encode.start
600 } else {
601 0
602 } as u64;
603 debug_assert!(untracked_bytes <= segment_size as u64);
604
605 let bytes_to_send = segment_size as u64 + untracked_bytes;
606 if self.path.in_flight.bytes + bytes_to_send >= self.path.congestion.window() {
607 space_idx += 1;
608 congestion_blocked = true;
609 trace!("blocked by congestion control");
612 continue;
613 }
614
615 let smoothed_rtt = self.path.rtt.get();
617 if let Some(delay) = self.path.pacing.delay(
618 smoothed_rtt,
619 bytes_to_send,
620 self.path.current_mtu(),
621 self.path.congestion.window(),
622 now,
623 ) {
624 self.timers.set(Timer::Pacing, delay);
625 congestion_blocked = true;
626 trace!("blocked by pacing");
629 break;
630 }
631 }
632
633 if let Some(mut builder) = builder_storage.take() {
635 if pad_datagram {
636 builder.pad_to(MIN_INITIAL_SIZE);
637 }
638
639 if num_datagrams > 1 || pad_datagram_to_mtu {
640 const MAX_PADDING: usize = 16;
653 let packet_len_unpadded = cmp::max(builder.min_size, buf.len())
654 - datagram_start
655 + builder.tag_len;
656 if (packet_len_unpadded + MAX_PADDING < segment_size
657 && !pad_datagram_to_mtu)
658 || datagram_start + segment_size > buf_capacity
659 {
660 trace!(
661 "GSO truncated by demand for {} padding bytes or loss probe",
662 segment_size - packet_len_unpadded
663 );
664 builder_storage = Some(builder);
665 break;
666 }
667
668 builder.pad_to(segment_size as u16);
671 }
672
673 builder.finish_and_track(now, self, sent_frames.take(), buf);
674
675 if num_datagrams == 1 {
676 segment_size = buf.len();
683 buf_capacity = buf.len();
686
687 if space_id == SpaceId::Data {
694 let frame_space_1rtt =
695 segment_size.saturating_sub(self.predict_1rtt_overhead(Some(pn)));
696 if self.space_can_send(space_id, frame_space_1rtt).is_empty() {
697 break;
698 }
699 }
700 }
701 }
702
703 let next_datagram_size_limit = match self.spaces[space_id].loss_probes {
705 0 => segment_size,
706 _ => {
707 self.spaces[space_id].loss_probes -= 1;
708 std::cmp::min(segment_size, usize::from(INITIAL_MTU))
712 }
713 };
714 buf_capacity += next_datagram_size_limit;
715 if buf.capacity() < buf_capacity {
716 buf.reserve(max_datagrams * segment_size);
725 }
726 num_datagrams += 1;
727 coalesce = true;
728 pad_datagram = false;
729 datagram_start = buf.len();
730
731 debug_assert_eq!(
732 datagram_start % segment_size,
733 0,
734 "datagrams in a GSO batch must be aligned to the segment size"
735 );
736 } else {
737 if let Some(builder) = builder_storage.take() {
741 builder.finish_and_track(now, self, sent_frames.take(), buf);
742 }
743 }
744
745 debug_assert!(buf_capacity - buf.len() >= MIN_PACKET_SPACE);
746
747 if self.spaces[SpaceId::Initial].crypto.is_some()
752 && space_id == SpaceId::Handshake
753 && self.side.is_client()
754 {
755 self.discard_space(now, SpaceId::Initial);
758 }
759 if let Some(ref mut prev) = self.prev_crypto {
760 prev.update_unacked = false;
761 }
762
763 debug_assert!(
764 builder_storage.is_none() && sent_frames.is_none(),
765 "Previous packet must have been finished"
766 );
767
768 let builder = builder_storage.insert(PacketBuilder::new(
769 now,
770 space_id,
771 self.rem_cids.active(),
772 buf,
773 buf_capacity,
774 datagram_start,
775 ack_eliciting,
776 self,
777 )?);
778 coalesce = coalesce && !builder.short_header;
779
780 pad_datagram |=
782 space_id == SpaceId::Initial && (self.side.is_client() || ack_eliciting);
783
784 if close {
785 trace!("sending CONNECTION_CLOSE");
786 if !self.spaces[space_id].pending_acks.ranges().is_empty() {
791 Self::populate_acks(
792 now,
793 self.receiving_ecn,
794 &mut SentFrames::default(),
795 &mut self.spaces[space_id],
796 buf,
797 &mut self.stats,
798 );
799 }
800
801 debug_assert!(
805 buf.len() + frame::ConnectionClose::SIZE_BOUND < builder.max_size,
806 "ACKs should leave space for ConnectionClose"
807 );
808 if buf.len() + frame::ConnectionClose::SIZE_BOUND < builder.max_size {
809 let max_frame_size = builder.max_size - buf.len();
810 match self.state {
811 State::Closed(state::Closed { ref reason }) => {
812 if space_id == SpaceId::Data || reason.is_transport_layer() {
813 reason.encode(buf, max_frame_size)
814 } else {
815 frame::ConnectionClose {
816 error_code: TransportErrorCode::APPLICATION_ERROR,
817 frame_type: None,
818 reason: Bytes::new(),
819 }
820 .encode(buf, max_frame_size)
821 }
822 }
823 State::Draining => frame::ConnectionClose {
824 error_code: TransportErrorCode::NO_ERROR,
825 frame_type: None,
826 reason: Bytes::new(),
827 }
828 .encode(buf, max_frame_size),
829 _ => unreachable!(
830 "tried to make a close packet when the connection wasn't closed"
831 ),
832 }
833 }
834 if space_id == self.highest_space {
835 self.close = false;
837 break;
839 } else {
840 space_idx += 1;
844 continue;
845 }
846 }
847
848 if space_id == SpaceId::Data && num_datagrams == 1 {
851 if let Some((token, remote)) = self.path_responses.pop_off_path(self.path.remote) {
852 let mut builder = builder_storage.take().unwrap();
855 trace!("PATH_RESPONSE {:08x} (off-path)", token);
856 buf.write(frame::FrameType::PATH_RESPONSE);
857 buf.write(token);
858 self.stats.frame_tx.path_response += 1;
859 builder.pad_to(MIN_INITIAL_SIZE);
860 builder.finish_and_track(
861 now,
862 self,
863 Some(SentFrames {
864 non_retransmits: true,
865 ..SentFrames::default()
866 }),
867 buf,
868 );
869 self.stats.udp_tx.on_sent(1, buf.len());
870 return Some(Transmit {
871 destination: remote,
872 size: buf.len(),
873 ecn: None,
874 segment_size: None,
875 src_ip: self.local_ip,
876 });
877 }
878 }
879
880 let sent =
881 self.populate_packet(now, space_id, buf, builder.max_size, builder.exact_number);
882
883 debug_assert!(
890 !(sent.is_ack_only(&self.streams)
891 && !can_send.acks
892 && can_send.other
893 && (buf_capacity - builder.datagram_start) == self.path.current_mtu() as usize
894 && self.datagrams.outgoing.is_empty()),
895 "SendableFrames was {can_send:?}, but only ACKs have been written"
896 );
897 pad_datagram |= sent.requires_padding;
898
899 if sent.largest_acked.is_some() {
900 self.spaces[space_id].pending_acks.acks_sent();
901 self.timers.stop(Timer::MaxAckDelay);
902 }
903
904 sent_frames = Some(sent);
906
907 }
910
911 if let Some(mut builder) = builder_storage {
913 if pad_datagram {
914 builder.pad_to(MIN_INITIAL_SIZE);
915 }
916
917 if pad_datagram_to_mtu && buf_capacity >= datagram_start + segment_size {
923 builder.pad_to(segment_size as u16);
924 }
925
926 let last_packet_number = builder.exact_number;
927 builder.finish_and_track(now, self, sent_frames, buf);
928 self.path
929 .congestion
930 .on_sent(now, buf.len() as u64, last_packet_number);
931
932 self.config.qlog_sink.emit_recovery_metrics(
933 self.pto_count,
934 &mut self.path,
935 now,
936 self.orig_rem_cid,
937 );
938 }
939
940 self.app_limited = buf.is_empty() && !congestion_blocked;
941
942 if buf.is_empty() && self.state.is_established() {
944 let space_id = SpaceId::Data;
945 let probe_size = self
946 .path
947 .mtud
948 .poll_transmit(now, self.packet_number_filter.peek(&self.spaces[space_id]))?;
949
950 let buf_capacity = probe_size as usize;
951 buf.reserve(buf_capacity);
952
953 let mut builder = PacketBuilder::new(
954 now,
955 space_id,
956 self.rem_cids.active(),
957 buf,
958 buf_capacity,
959 0,
960 true,
961 self,
962 )?;
963
964 buf.write(frame::FrameType::PING);
966 self.stats.frame_tx.ping += 1;
967
968 if self.peer_supports_ack_frequency() {
970 buf.write(frame::FrameType::IMMEDIATE_ACK);
971 self.stats.frame_tx.immediate_ack += 1;
972 }
973
974 builder.pad_to(probe_size);
975 let sent_frames = SentFrames {
976 non_retransmits: true,
977 ..Default::default()
978 };
979 builder.finish_and_track(now, self, Some(sent_frames), buf);
980
981 self.stats.path.sent_plpmtud_probes += 1;
982 num_datagrams = 1;
983
984 trace!(?probe_size, "writing MTUD probe");
985 }
986
987 if buf.is_empty() {
988 return None;
989 }
990
991 trace!("sending {} bytes in {} datagrams", buf.len(), num_datagrams);
992 self.path.total_sent = self.path.total_sent.saturating_add(buf.len() as u64);
993
994 self.stats.udp_tx.on_sent(num_datagrams as u64, buf.len());
995
996 Some(Transmit {
997 destination: self.path.remote,
998 size: buf.len(),
999 ecn: if self.path.sending_ecn {
1000 Some(EcnCodepoint::Ect0)
1001 } else {
1002 None
1003 },
1004 segment_size: match num_datagrams {
1005 1 => None,
1006 _ => Some(segment_size),
1007 },
1008 src_ip: self.local_ip,
1009 })
1010 }
1011
1012 fn send_path_challenge(&mut self, now: Instant, buf: &mut Vec<u8>) -> Option<Transmit> {
1014 let (prev_cid, prev_path) = self.prev_path.as_mut()?;
1015 if !prev_path.challenge_pending {
1016 return None;
1017 }
1018 prev_path.challenge_pending = false;
1019 let token = prev_path
1020 .challenge
1021 .expect("previous path challenge pending without token");
1022 let destination = prev_path.remote;
1023 debug_assert_eq!(
1024 self.highest_space,
1025 SpaceId::Data,
1026 "PATH_CHALLENGE queued without 1-RTT keys"
1027 );
1028 buf.reserve(MIN_INITIAL_SIZE as usize);
1029
1030 let buf_capacity = buf.capacity();
1031
1032 let mut builder = PacketBuilder::new(
1038 now,
1039 SpaceId::Data,
1040 *prev_cid,
1041 buf,
1042 buf_capacity,
1043 0,
1044 false,
1045 self,
1046 )?;
1047 trace!("validating previous path with PATH_CHALLENGE {:08x}", token);
1048 buf.write(frame::FrameType::PATH_CHALLENGE);
1049 buf.write(token);
1050 self.stats.frame_tx.path_challenge += 1;
1051
1052 builder.pad_to(MIN_INITIAL_SIZE);
1057
1058 builder.finish(self, now, buf);
1059 self.stats.udp_tx.on_sent(1, buf.len());
1060
1061 Some(Transmit {
1062 destination,
1063 size: buf.len(),
1064 ecn: None,
1065 segment_size: None,
1066 src_ip: self.local_ip,
1067 })
1068 }
1069
1070 fn space_can_send(&self, space_id: SpaceId, frame_space_1rtt: usize) -> SendableFrames {
1072 if self.spaces[space_id].crypto.is_none()
1073 && (space_id != SpaceId::Data
1074 || self.zero_rtt_crypto.is_none()
1075 || self.side.is_server())
1076 {
1077 return SendableFrames::empty();
1079 }
1080 let mut can_send = self.spaces[space_id].can_send(&self.streams);
1081 if space_id == SpaceId::Data {
1082 can_send.other |= self.can_send_1rtt(frame_space_1rtt);
1083 }
1084 can_send
1085 }
1086
1087 pub fn handle_event(&mut self, event: ConnectionEvent) {
1093 use ConnectionEventInner::*;
1094 match event.0 {
1095 Datagram(DatagramConnectionEvent {
1096 now,
1097 remote,
1098 ecn,
1099 first_decode,
1100 remaining,
1101 }) => {
1102 if remote != self.path.remote && !self.side.remote_may_migrate() {
1106 trace!("discarding packet from unrecognized peer {}", remote);
1107 return;
1108 }
1109
1110 let was_anti_amplification_blocked = self.path.anti_amplification_blocked(1);
1111
1112 self.stats.udp_rx.datagrams += 1;
1113 self.stats.udp_rx.bytes += first_decode.len() as u64;
1114 let data_len = first_decode.len();
1115
1116 self.handle_decode(now, remote, ecn, first_decode);
1117 self.path.total_recvd = self.path.total_recvd.saturating_add(data_len as u64);
1122
1123 if let Some(data) = remaining {
1124 self.stats.udp_rx.bytes += data.len() as u64;
1125 self.handle_coalesced(now, remote, ecn, data);
1126 }
1127
1128 self.config.qlog_sink.emit_recovery_metrics(
1129 self.pto_count,
1130 &mut self.path,
1131 now,
1132 self.orig_rem_cid,
1133 );
1134
1135 if was_anti_amplification_blocked {
1136 self.set_loss_detection_timer(now);
1140 }
1141 }
1142 NewIdentifiers(ids, now) => {
1143 self.local_cid_state.new_cids(&ids, now);
1144 ids.into_iter().rev().for_each(|frame| {
1145 self.spaces[SpaceId::Data].pending.new_cids.push(frame);
1146 });
1147 if self
1149 .timers
1150 .get(Timer::PushNewCid)
1151 .map_or(true, |x| x <= now)
1152 {
1153 self.reset_cid_retirement();
1154 }
1155 }
1156 }
1157 }
1158
1159 pub fn handle_timeout(&mut self, now: Instant) {
1169 for &timer in &Timer::VALUES {
1170 if !self.timers.is_expired(timer, now) {
1171 continue;
1172 }
1173 self.timers.stop(timer);
1174 trace!(timer = ?timer, "timeout");
1175 match timer {
1176 Timer::Close => {
1177 self.state = State::Drained;
1178 self.endpoint_events.push_back(EndpointEventInner::Drained);
1179 }
1180 Timer::Idle => {
1181 self.kill(ConnectionError::TimedOut);
1182 }
1183 Timer::KeepAlive => {
1184 trace!("sending keep-alive");
1185 self.ping();
1186 }
1187 Timer::LossDetection => {
1188 self.on_loss_detection_timeout(now);
1189
1190 self.config.qlog_sink.emit_recovery_metrics(
1191 self.pto_count,
1192 &mut self.path,
1193 now,
1194 self.orig_rem_cid,
1195 );
1196 }
1197 Timer::KeyDiscard => {
1198 self.zero_rtt_crypto = None;
1199 self.prev_crypto = None;
1200 }
1201 Timer::PathValidation => {
1202 debug!("path validation failed");
1203 if let Some((_, prev)) = self.prev_path.take() {
1204 self.path = prev;
1205 }
1206 self.path.challenge = None;
1207 self.path.challenge_pending = false;
1208 }
1209 Timer::Pacing => trace!("pacing timer expired"),
1210 Timer::PushNewCid => {
1211 let num_new_cid = self.local_cid_state.on_cid_timeout().into();
1213 if !self.state.is_closed() {
1214 trace!(
1215 "push a new cid to peer RETIRE_PRIOR_TO field {}",
1216 self.local_cid_state.retire_prior_to()
1217 );
1218 self.endpoint_events
1219 .push_back(EndpointEventInner::NeedIdentifiers(now, num_new_cid));
1220 }
1221 }
1222 Timer::MaxAckDelay => {
1223 trace!("max ack delay reached");
1224 self.spaces[SpaceId::Data]
1226 .pending_acks
1227 .on_max_ack_delay_timeout()
1228 }
1229 }
1230 }
1231 }
1232
1233 pub fn close(&mut self, now: Instant, error_code: VarInt, reason: Bytes) {
1245 self.close_inner(
1246 now,
1247 Close::Application(frame::ApplicationClose { error_code, reason }),
1248 )
1249 }
1250
1251 fn close_inner(&mut self, now: Instant, reason: Close) {
1252 let was_closed = self.state.is_closed();
1253 if !was_closed {
1254 self.close_common();
1255 self.set_close_timer(now);
1256 self.close = true;
1257 self.state = State::Closed(state::Closed { reason });
1258 }
1259 }
1260
1261 pub fn datagrams(&mut self) -> Datagrams<'_> {
1263 Datagrams { conn: self }
1264 }
1265
1266 pub fn stats(&self) -> ConnectionStats {
1268 let mut stats = self.stats;
1269 stats.path.rtt = self.path.rtt.get();
1270 stats.path.cwnd = self.path.congestion.window();
1271 stats.path.current_mtu = self.path.mtud.current_mtu();
1272
1273 stats
1274 }
1275
1276 pub fn ping(&mut self) {
1280 self.spaces[self.highest_space].ping_pending = true;
1281 }
1282
1283 pub fn force_key_update(&mut self) {
1287 if !self.state.is_established() {
1288 debug!("ignoring forced key update in illegal state");
1289 return;
1290 }
1291 if self.prev_crypto.is_some() {
1292 debug!("ignoring redundant forced key update");
1295 return;
1296 }
1297 self.update_keys(None, false);
1298 }
1299
1300 #[doc(hidden)]
1302 #[deprecated]
1303 pub fn initiate_key_update(&mut self) {
1304 self.force_key_update();
1305 }
1306
1307 pub fn crypto_session(&self) -> &dyn crypto::Session {
1309 &*self.crypto
1310 }
1311
1312 pub fn is_handshaking(&self) -> bool {
1317 self.state.is_handshake()
1318 }
1319
1320 pub fn is_closed(&self) -> bool {
1328 self.state.is_closed()
1329 }
1330
1331 pub fn is_drained(&self) -> bool {
1336 self.state.is_drained()
1337 }
1338
1339 pub fn accepted_0rtt(&self) -> bool {
1343 self.accepted_0rtt
1344 }
1345
1346 pub fn has_0rtt(&self) -> bool {
1348 self.zero_rtt_enabled
1349 }
1350
1351 pub fn has_pending_retransmits(&self) -> bool {
1353 !self.spaces[SpaceId::Data].pending.is_empty(&self.streams)
1354 }
1355
1356 pub fn side(&self) -> Side {
1358 self.side.side()
1359 }
1360
1361 pub fn remote_address(&self) -> SocketAddr {
1363 self.path.remote
1364 }
1365
1366 pub fn local_ip(&self) -> Option<IpAddr> {
1376 self.local_ip
1377 }
1378
1379 pub fn rtt(&self) -> Duration {
1381 self.path.rtt.get()
1382 }
1383
1384 pub fn congestion_state(&self) -> &dyn Controller {
1386 self.path.congestion.as_ref()
1387 }
1388
1389 pub fn path_changed(&mut self, now: Instant) {
1400 self.path.reset(now, &self.config);
1401 }
1402
1403 pub fn set_max_concurrent_streams(&mut self, dir: Dir, count: VarInt) {
1408 self.streams.set_max_concurrent(dir, count);
1409 let pending = &mut self.spaces[SpaceId::Data].pending;
1412 self.streams.queue_max_stream_id(pending);
1413 }
1414
1415 pub fn max_concurrent_streams(&self, dir: Dir) -> u64 {
1421 self.streams.max_concurrent(dir)
1422 }
1423
1424 pub fn set_send_window(&mut self, send_window: u64) {
1426 self.streams.set_send_window(send_window);
1427 }
1428
1429 pub fn set_receive_window(&mut self, receive_window: VarInt) {
1431 if self.streams.set_receive_window(receive_window) {
1432 self.spaces[SpaceId::Data].pending.max_data = true;
1433 }
1434 }
1435
1436 fn on_ack_received(
1437 &mut self,
1438 now: Instant,
1439 space: SpaceId,
1440 ack: frame::Ack,
1441 ) -> Result<(), TransportError> {
1442 if ack.largest >= self.spaces[space].next_packet_number {
1443 return Err(TransportError::PROTOCOL_VIOLATION("unsent packet acked"));
1444 }
1445 let new_largest = {
1446 let space = &mut self.spaces[space];
1447 if space
1448 .largest_acked_packet
1449 .map_or(true, |pn| ack.largest > pn)
1450 {
1451 space.largest_acked_packet = Some(ack.largest);
1452 if let Some(info) = space.sent_packets.get(&ack.largest) {
1453 space.largest_acked_packet_sent = info.time_sent;
1457 }
1458 true
1459 } else {
1460 false
1461 }
1462 };
1463
1464 let mut newly_acked = ArrayRangeSet::new();
1466 for range in ack.iter() {
1467 self.packet_number_filter.check_ack(space, range.clone())?;
1468 for (&pn, _) in self.spaces[space].sent_packets.range(range) {
1469 newly_acked.insert_one(pn);
1470 }
1471 }
1472
1473 if newly_acked.is_empty() {
1474 return Ok(());
1475 }
1476
1477 let mut ack_eliciting_acked = false;
1478 for packet in newly_acked.elts() {
1479 if let Some(info) = self.spaces[space].take(packet) {
1480 if let Some(acked) = info.largest_acked {
1481 self.spaces[space].pending_acks.subtract_below(acked);
1487 }
1488 ack_eliciting_acked |= info.ack_eliciting;
1489
1490 let mtu_updated = self.path.mtud.on_acked(space, packet, info.size);
1492 if mtu_updated {
1493 self.path
1494 .congestion
1495 .on_mtu_update(self.path.mtud.current_mtu());
1496 }
1497
1498 self.ack_frequency.on_acked(packet);
1500
1501 self.on_packet_acked(now, info);
1502 }
1503 }
1504
1505 self.path.congestion.on_end_acks(
1506 now,
1507 self.path.in_flight.bytes,
1508 self.app_limited,
1509 self.spaces[space].largest_acked_packet,
1510 );
1511
1512 if new_largest && ack_eliciting_acked {
1513 let ack_delay = if space != SpaceId::Data {
1514 Duration::from_micros(0)
1515 } else {
1516 cmp::min(
1517 self.ack_frequency.peer_max_ack_delay,
1518 Duration::from_micros(ack.delay << self.peer_params.ack_delay_exponent.0),
1519 )
1520 };
1521 let rtt = instant_saturating_sub(now, self.spaces[space].largest_acked_packet_sent);
1522 self.path.rtt.update(ack_delay, rtt);
1523 if self.path.first_packet_after_rtt_sample.is_none() {
1524 self.path.first_packet_after_rtt_sample =
1525 Some((space, self.spaces[space].next_packet_number));
1526 }
1527 }
1528
1529 self.detect_lost_packets(now, space, true);
1531
1532 if self.peer_completed_address_validation() {
1533 self.pto_count = 0;
1534 }
1535
1536 if self.path.sending_ecn {
1538 if let Some(ecn) = ack.ecn {
1539 if new_largest {
1544 let sent = self.spaces[space].largest_acked_packet_sent;
1545 self.process_ecn(now, space, newly_acked.len() as u64, ecn, sent);
1546 }
1547 } else {
1548 debug!("ECN not acknowledged by peer");
1550 self.path.sending_ecn = false;
1551 }
1552 }
1553
1554 self.set_loss_detection_timer(now);
1555 Ok(())
1556 }
1557
1558 fn process_ecn(
1560 &mut self,
1561 now: Instant,
1562 space: SpaceId,
1563 newly_acked: u64,
1564 ecn: frame::EcnCounts,
1565 largest_sent_time: Instant,
1566 ) {
1567 match self.spaces[space].detect_ecn(newly_acked, ecn) {
1568 Err(e) => {
1569 debug!("halting ECN due to verification failure: {}", e);
1570 self.path.sending_ecn = false;
1571 self.spaces[space].ecn_feedback = frame::EcnCounts::ZERO;
1574 }
1575 Ok(false) => {}
1576 Ok(true) => {
1577 self.stats.path.congestion_events += 1;
1578 self.path
1579 .congestion
1580 .on_congestion_event(now, largest_sent_time, false, 0);
1581 }
1582 }
1583 }
1584
1585 fn on_packet_acked(&mut self, now: Instant, info: SentPacket) {
1588 self.remove_in_flight(&info);
1589 if info.ack_eliciting && self.path.challenge.is_none() {
1590 self.path.congestion.on_ack(
1593 now,
1594 info.time_sent,
1595 info.size.into(),
1596 self.app_limited,
1597 &self.path.rtt,
1598 );
1599 }
1600
1601 if let Some(retransmits) = info.retransmits.get() {
1603 for (id, _) in retransmits.reset_stream.iter() {
1604 self.streams.reset_acked(*id);
1605 }
1606 }
1607
1608 for frame in info.stream_frames {
1609 self.streams.received_ack_of(frame);
1610 }
1611 }
1612
1613 fn set_key_discard_timer(&mut self, now: Instant, space: SpaceId) {
1614 let start = if self.zero_rtt_crypto.is_some() {
1615 now
1616 } else {
1617 self.prev_crypto
1618 .as_ref()
1619 .expect("no previous keys")
1620 .end_packet
1621 .as_ref()
1622 .expect("update not acknowledged yet")
1623 .1
1624 };
1625 self.timers
1626 .set(Timer::KeyDiscard, start + self.pto(space) * 3);
1627 }
1628
1629 fn on_loss_detection_timeout(&mut self, now: Instant) {
1630 if let Some((_, pn_space)) = self.loss_time_and_space() {
1631 self.detect_lost_packets(now, pn_space, false);
1633 self.set_loss_detection_timer(now);
1634 return;
1635 }
1636
1637 let (_, space) = match self.pto_time_and_space(now) {
1638 Some(x) => x,
1639 None => {
1640 error!("PTO expired while unset");
1641 return;
1642 }
1643 };
1644 trace!(
1645 in_flight = self.path.in_flight.bytes,
1646 count = self.pto_count,
1647 ?space,
1648 "PTO fired"
1649 );
1650
1651 let count = match self.path.in_flight.ack_eliciting {
1652 0 => {
1655 debug_assert!(!self.peer_completed_address_validation());
1656 1
1657 }
1658 _ => 2,
1660 };
1661 self.spaces[space].loss_probes = self.spaces[space].loss_probes.saturating_add(count);
1662 self.pto_count = self.pto_count.saturating_add(1);
1663 self.set_loss_detection_timer(now);
1664 }
1665
1666 fn detect_lost_packets(&mut self, now: Instant, pn_space: SpaceId, due_to_ack: bool) {
1667 let mut lost_packets = Vec::<u64>::new();
1668 let mut lost_mtu_probe = None;
1669 let in_flight_mtu_probe = self.path.mtud.in_flight_mtu_probe();
1670 let rtt = self.path.rtt.conservative();
1671 let loss_delay = cmp::max(rtt.mul_f32(self.config.time_threshold), TIMER_GRANULARITY);
1672
1673 let lost_send_time = now.checked_sub(loss_delay).unwrap();
1675 let largest_acked_packet = self.spaces[pn_space].largest_acked_packet.unwrap();
1676 let packet_threshold = self.config.packet_threshold as u64;
1677 let mut size_of_lost_packets = 0u64;
1678
1679 let congestion_period =
1683 self.pto(SpaceId::Data) * self.config.persistent_congestion_threshold;
1684 let mut persistent_congestion_start: Option<Instant> = None;
1685 let mut prev_packet = None;
1686 let mut in_persistent_congestion = false;
1687
1688 let space = &mut self.spaces[pn_space];
1689 space.loss_time = None;
1690
1691 for (&packet, info) in space.sent_packets.range(0..largest_acked_packet) {
1692 if prev_packet != Some(packet.wrapping_sub(1)) {
1693 persistent_congestion_start = None;
1695 }
1696
1697 if info.time_sent <= lost_send_time || largest_acked_packet >= packet + packet_threshold
1698 {
1699 if Some(packet) == in_flight_mtu_probe {
1700 lost_mtu_probe = in_flight_mtu_probe;
1703 } else {
1704 lost_packets.push(packet);
1705 size_of_lost_packets += info.size as u64;
1706 if info.ack_eliciting && due_to_ack {
1707 match persistent_congestion_start {
1708 Some(start) if info.time_sent - start > congestion_period => {
1711 in_persistent_congestion = true;
1712 }
1713 None if self
1715 .path
1716 .first_packet_after_rtt_sample
1717 .is_some_and(|x| x < (pn_space, packet)) =>
1718 {
1719 persistent_congestion_start = Some(info.time_sent);
1720 }
1721 _ => {}
1722 }
1723 }
1724 }
1725 } else {
1726 let next_loss_time = info.time_sent + loss_delay;
1727 space.loss_time = Some(
1728 space
1729 .loss_time
1730 .map_or(next_loss_time, |x| cmp::min(x, next_loss_time)),
1731 );
1732 persistent_congestion_start = None;
1733 }
1734
1735 prev_packet = Some(packet);
1736 }
1737
1738 if let Some(largest_lost) = lost_packets.last().cloned() {
1740 let old_bytes_in_flight = self.path.in_flight.bytes;
1741 let largest_lost_sent = self.spaces[pn_space].sent_packets[&largest_lost].time_sent;
1742 self.stats.path.lost_packets += lost_packets.len() as u64;
1743 self.stats.path.lost_bytes += size_of_lost_packets;
1744 trace!(
1745 "packets lost: {:?}, bytes lost: {}",
1746 lost_packets, size_of_lost_packets
1747 );
1748
1749 for &packet in &lost_packets {
1750 let info = self.spaces[pn_space].take(packet).unwrap(); self.config.qlog_sink.emit_packet_lost(
1752 packet,
1753 &info,
1754 lost_send_time,
1755 pn_space,
1756 now,
1757 self.orig_rem_cid,
1758 );
1759 self.remove_in_flight(&info);
1760 for frame in info.stream_frames {
1761 self.streams.retransmit(frame);
1762 }
1763 self.spaces[pn_space].pending |= info.retransmits;
1764 self.path.mtud.on_non_probe_lost(packet, info.size);
1765 }
1766
1767 if self.path.mtud.black_hole_detected(now) {
1768 self.stats.path.black_holes_detected += 1;
1769 self.path
1770 .congestion
1771 .on_mtu_update(self.path.mtud.current_mtu());
1772 if let Some(max_datagram_size) = self.datagrams().max_size() {
1773 self.datagrams.drop_oversized(max_datagram_size);
1774 }
1775 }
1776
1777 let lost_ack_eliciting = old_bytes_in_flight != self.path.in_flight.bytes;
1779
1780 if lost_ack_eliciting {
1781 self.stats.path.congestion_events += 1;
1782 self.path.congestion.on_congestion_event(
1783 now,
1784 largest_lost_sent,
1785 in_persistent_congestion,
1786 size_of_lost_packets,
1787 );
1788 }
1789 }
1790
1791 if let Some(packet) = lost_mtu_probe {
1793 let info = self.spaces[SpaceId::Data].take(packet).unwrap(); self.remove_in_flight(&info);
1795 self.path.mtud.on_probe_lost();
1796 self.stats.path.lost_plpmtud_probes += 1;
1797 }
1798 }
1799
1800 fn loss_time_and_space(&self) -> Option<(Instant, SpaceId)> {
1801 SpaceId::iter()
1802 .filter_map(|id| Some((self.spaces[id].loss_time?, id)))
1803 .min_by_key(|&(time, _)| time)
1804 }
1805
1806 fn pto_time_and_space(&self, now: Instant) -> Option<(Instant, SpaceId)> {
1807 let backoff = 2u32.pow(self.pto_count.min(MAX_BACKOFF_EXPONENT));
1808 let mut duration = self.path.rtt.pto_base() * backoff;
1809
1810 if self.path.in_flight.ack_eliciting == 0 {
1811 debug_assert!(!self.peer_completed_address_validation());
1812 let space = match self.highest_space {
1813 SpaceId::Handshake => SpaceId::Handshake,
1814 _ => SpaceId::Initial,
1815 };
1816 return Some((now + duration, space));
1817 }
1818
1819 let mut result = None;
1820 for space in SpaceId::iter() {
1821 if !self.spaces[space].has_in_flight() {
1822 continue;
1823 }
1824 if space == SpaceId::Data {
1825 if self.is_handshaking() {
1827 return result;
1828 }
1829 duration += self.ack_frequency.max_ack_delay_for_pto() * backoff;
1831 }
1832 let last_ack_eliciting = match self.spaces[space].time_of_last_ack_eliciting_packet {
1833 Some(time) => time,
1834 None => continue,
1835 };
1836 let pto = last_ack_eliciting + duration;
1837 if result.map_or(true, |(earliest_pto, _)| pto < earliest_pto) {
1838 result = Some((pto, space));
1839 }
1840 }
1841 result
1842 }
1843
1844 fn peer_completed_address_validation(&self) -> bool {
1845 if self.side.is_server() || self.state.is_closed() {
1846 return true;
1847 }
1848 self.spaces[SpaceId::Handshake]
1851 .largest_acked_packet
1852 .is_some()
1853 || self.spaces[SpaceId::Data].largest_acked_packet.is_some()
1854 || (self.spaces[SpaceId::Data].crypto.is_some()
1855 && self.spaces[SpaceId::Handshake].crypto.is_none())
1856 }
1857
1858 fn set_loss_detection_timer(&mut self, now: Instant) {
1859 if self.state.is_closed() {
1860 return;
1864 }
1865
1866 if let Some((loss_time, _)) = self.loss_time_and_space() {
1867 self.timers.set(Timer::LossDetection, loss_time);
1869 return;
1870 }
1871
1872 if self.path.anti_amplification_blocked(1) {
1873 self.timers.stop(Timer::LossDetection);
1875 return;
1876 }
1877
1878 if self.path.in_flight.ack_eliciting == 0 && self.peer_completed_address_validation() {
1879 self.timers.stop(Timer::LossDetection);
1882 return;
1883 }
1884
1885 if let Some((timeout, _)) = self.pto_time_and_space(now) {
1888 self.timers.set(Timer::LossDetection, timeout);
1889 } else {
1890 self.timers.stop(Timer::LossDetection);
1891 }
1892 }
1893
1894 fn pto(&self, space: SpaceId) -> Duration {
1896 let max_ack_delay = match space {
1897 SpaceId::Initial | SpaceId::Handshake => Duration::ZERO,
1898 SpaceId::Data => self.ack_frequency.max_ack_delay_for_pto(),
1899 };
1900 self.path.rtt.pto_base() + max_ack_delay
1901 }
1902
1903 fn on_packet_authenticated(
1904 &mut self,
1905 now: Instant,
1906 space_id: SpaceId,
1907 ecn: Option<EcnCodepoint>,
1908 packet: Option<u64>,
1909 spin: bool,
1910 is_1rtt: bool,
1911 ) {
1912 self.total_authed_packets += 1;
1913 self.reset_keep_alive(now);
1914 self.reset_idle_timeout(now, space_id);
1915 self.permit_idle_reset = true;
1916 self.receiving_ecn |= ecn.is_some();
1917 if let Some(x) = ecn {
1918 let space = &mut self.spaces[space_id];
1919 space.ecn_counters += x;
1920
1921 if x.is_ce() {
1922 space.pending_acks.set_immediate_ack_required();
1923 }
1924 }
1925
1926 let packet = match packet {
1927 Some(x) => x,
1928 None => return,
1929 };
1930 if self.side.is_server() {
1931 if self.spaces[SpaceId::Initial].crypto.is_some() && space_id == SpaceId::Handshake {
1932 self.discard_space(now, SpaceId::Initial);
1934 }
1935 if self.zero_rtt_crypto.is_some() && is_1rtt {
1936 self.set_key_discard_timer(now, space_id)
1938 }
1939 }
1940 let space = &mut self.spaces[space_id];
1941 space.pending_acks.insert_one(packet, now);
1942 if packet >= space.rx_packet {
1943 space.rx_packet = packet;
1944 self.spin = self.side.is_client() ^ spin;
1946 }
1947
1948 self.config.qlog_sink.emit_packet_received(
1949 packet,
1950 space_id,
1951 !is_1rtt,
1952 now,
1953 self.orig_rem_cid,
1954 );
1955 }
1956
1957 fn reset_idle_timeout(&mut self, now: Instant, space: SpaceId) {
1958 let timeout = match self.idle_timeout {
1959 None => return,
1960 Some(dur) => dur,
1961 };
1962 if self.state.is_closed() {
1963 self.timers.stop(Timer::Idle);
1964 return;
1965 }
1966 let dt = cmp::max(timeout, 3 * self.pto(space));
1967 self.timers.set(Timer::Idle, now + dt);
1968 }
1969
1970 fn reset_keep_alive(&mut self, now: Instant) {
1971 let interval = match self.config.keep_alive_interval {
1972 Some(x) if self.state.is_established() => x,
1973 _ => return,
1974 };
1975 self.timers.set(Timer::KeepAlive, now + interval);
1976 }
1977
1978 fn reset_cid_retirement(&mut self) {
1979 if let Some(t) = self.local_cid_state.next_timeout() {
1980 self.timers.set(Timer::PushNewCid, t);
1981 }
1982 }
1983
1984 pub(crate) fn handle_first_packet(
1989 &mut self,
1990 now: Instant,
1991 remote: SocketAddr,
1992 ecn: Option<EcnCodepoint>,
1993 packet_number: u64,
1994 packet: InitialPacket,
1995 remaining: Option<BytesMut>,
1996 ) -> Result<(), ConnectionError> {
1997 let span = trace_span!("first recv");
1998 let _guard = span.enter();
1999 debug_assert!(self.side.is_server());
2000 let len = packet.header_data.len() + packet.payload.len();
2001 self.path.total_recvd = len as u64;
2002
2003 match self.state {
2004 State::Handshake(ref mut state) => {
2005 state.expected_token = packet.header.token.clone();
2006 }
2007 _ => unreachable!("first packet must be delivered in Handshake state"),
2008 }
2009
2010 self.on_packet_authenticated(
2011 now,
2012 SpaceId::Initial,
2013 ecn,
2014 Some(packet_number),
2015 false,
2016 false,
2017 );
2018
2019 self.process_decrypted_packet(now, remote, Some(packet_number), packet.into())?;
2020 if let Some(data) = remaining {
2021 self.handle_coalesced(now, remote, ecn, data);
2022 }
2023
2024 self.config.qlog_sink.emit_recovery_metrics(
2025 self.pto_count,
2026 &mut self.path,
2027 now,
2028 self.orig_rem_cid,
2029 );
2030
2031 Ok(())
2032 }
2033
2034 fn init_0rtt(&mut self) {
2035 let (header, packet) = match self.crypto.early_crypto() {
2036 Some(x) => x,
2037 None => return,
2038 };
2039 if self.side.is_client() {
2040 match self.crypto.transport_parameters() {
2041 Ok(params) => {
2042 let params = params
2043 .expect("crypto layer didn't supply transport parameters with ticket");
2044 let params = TransportParameters {
2046 initial_src_cid: None,
2047 original_dst_cid: None,
2048 preferred_address: None,
2049 retry_src_cid: None,
2050 stateless_reset_token: None,
2051 min_ack_delay: None,
2052 ack_delay_exponent: TransportParameters::default().ack_delay_exponent,
2053 max_ack_delay: TransportParameters::default().max_ack_delay,
2054 ..params
2055 };
2056 self.set_peer_params(params);
2057 }
2058 Err(e) => {
2059 error!("session ticket has malformed transport parameters: {}", e);
2060 return;
2061 }
2062 }
2063 }
2064 trace!("0-RTT enabled");
2065 self.zero_rtt_enabled = true;
2066 self.zero_rtt_crypto = Some(ZeroRttCrypto { header, packet });
2067 }
2068
2069 fn read_crypto(
2070 &mut self,
2071 space: SpaceId,
2072 crypto: &frame::Crypto,
2073 payload_len: usize,
2074 ) -> Result<(), TransportError> {
2075 let expected = if !self.state.is_handshake() {
2076 SpaceId::Data
2077 } else if self.highest_space == SpaceId::Initial {
2078 SpaceId::Initial
2079 } else {
2080 SpaceId::Handshake
2083 };
2084 debug_assert!(space <= expected, "received out-of-order CRYPTO data");
2088
2089 let end = crypto.offset + crypto.data.len() as u64;
2090 if space < expected && end > self.spaces[space].crypto_stream.bytes_read() {
2091 warn!(
2092 "received new {:?} CRYPTO data when expecting {:?}",
2093 space, expected
2094 );
2095 return Err(TransportError::PROTOCOL_VIOLATION(
2096 "new data at unexpected encryption level",
2097 ));
2098 }
2099
2100 let space = &mut self.spaces[space];
2101 let max = end.saturating_sub(space.crypto_stream.bytes_read());
2102 if max > self.config.crypto_buffer_size as u64 {
2103 return Err(TransportError::CRYPTO_BUFFER_EXCEEDED(""));
2104 }
2105
2106 space
2107 .crypto_stream
2108 .insert(crypto.offset, crypto.data.clone(), payload_len);
2109 while let Some(chunk) = space.crypto_stream.read(usize::MAX, true) {
2110 trace!("consumed {} CRYPTO bytes", chunk.bytes.len());
2111 if self.crypto.read_handshake(&chunk.bytes)? {
2112 self.events.push_back(Event::HandshakeDataReady);
2113 }
2114 }
2115
2116 Ok(())
2117 }
2118
2119 fn write_crypto(&mut self) {
2120 loop {
2121 let space = self.highest_space;
2122 let mut outgoing = Vec::new();
2123 if let Some(crypto) = self.crypto.write_handshake(&mut outgoing) {
2124 match space {
2125 SpaceId::Initial => {
2126 self.upgrade_crypto(SpaceId::Handshake, crypto);
2127 }
2128 SpaceId::Handshake => {
2129 self.upgrade_crypto(SpaceId::Data, crypto);
2130 }
2131 _ => unreachable!("got updated secrets during 1-RTT"),
2132 }
2133 }
2134 if outgoing.is_empty() {
2135 if space == self.highest_space {
2136 break;
2137 } else {
2138 continue;
2140 }
2141 }
2142 let offset = self.spaces[space].crypto_offset;
2143 let outgoing = Bytes::from(outgoing);
2144 if let State::Handshake(ref mut state) = self.state {
2145 if space == SpaceId::Initial && offset == 0 && self.side.is_client() {
2146 state.client_hello = Some(outgoing.clone());
2147 }
2148 }
2149 self.spaces[space].crypto_offset += outgoing.len() as u64;
2150 trace!("wrote {} {:?} CRYPTO bytes", outgoing.len(), space);
2151 self.spaces[space].pending.crypto.push_back(frame::Crypto {
2152 offset,
2153 data: outgoing,
2154 });
2155 }
2156 }
2157
2158 fn upgrade_crypto(&mut self, space: SpaceId, crypto: Keys) {
2160 debug_assert!(
2161 self.spaces[space].crypto.is_none(),
2162 "already reached packet space {space:?}"
2163 );
2164 trace!("{:?} keys ready", space);
2165 if space == SpaceId::Data {
2166 self.next_crypto = Some(
2168 self.crypto
2169 .next_1rtt_keys()
2170 .expect("handshake should be complete"),
2171 );
2172 }
2173
2174 self.spaces[space].crypto = Some(crypto);
2175 debug_assert!(space as usize > self.highest_space as usize);
2176 self.highest_space = space;
2177 if space == SpaceId::Data && self.side.is_client() {
2178 self.zero_rtt_crypto = None;
2180 }
2181 }
2182
2183 fn discard_space(&mut self, now: Instant, space_id: SpaceId) {
2184 debug_assert!(space_id != SpaceId::Data);
2185 trace!("discarding {:?} keys", space_id);
2186 if space_id == SpaceId::Initial {
2187 if let ConnectionSide::Client { token, .. } = &mut self.side {
2189 *token = Bytes::new();
2190 }
2191 }
2192 let space = &mut self.spaces[space_id];
2193 space.crypto = None;
2194 space.time_of_last_ack_eliciting_packet = None;
2195 space.loss_time = None;
2196 let sent_packets = mem::take(&mut space.sent_packets);
2197 for packet in sent_packets.into_values() {
2198 self.remove_in_flight(&packet);
2199 }
2200 self.set_loss_detection_timer(now)
2201 }
2202
2203 fn handle_coalesced(
2204 &mut self,
2205 now: Instant,
2206 remote: SocketAddr,
2207 ecn: Option<EcnCodepoint>,
2208 data: BytesMut,
2209 ) {
2210 self.path.total_recvd = self.path.total_recvd.saturating_add(data.len() as u64);
2211 let mut remaining = Some(data);
2212 while let Some(data) = remaining {
2213 match PartialDecode::new(
2214 data,
2215 &FixedLengthConnectionIdParser::new(self.local_cid_state.cid_len()),
2216 &[self.version],
2217 self.endpoint_config.grease_quic_bit,
2218 ) {
2219 Ok((partial_decode, rest)) => {
2220 remaining = rest;
2221 self.handle_decode(now, remote, ecn, partial_decode);
2222 }
2223 Err(e) => {
2224 trace!("malformed header: {}", e);
2225 return;
2226 }
2227 }
2228 }
2229 }
2230
2231 fn handle_decode(
2232 &mut self,
2233 now: Instant,
2234 remote: SocketAddr,
2235 ecn: Option<EcnCodepoint>,
2236 partial_decode: PartialDecode,
2237 ) {
2238 if let Some(decoded) = packet_crypto::unprotect_header(
2239 partial_decode,
2240 &self.spaces,
2241 self.zero_rtt_crypto.as_ref(),
2242 self.peer_params.stateless_reset_token,
2243 ) {
2244 self.handle_packet(now, remote, ecn, decoded.packet, decoded.stateless_reset);
2245 }
2246 }
2247
2248 fn handle_packet(
2249 &mut self,
2250 now: Instant,
2251 remote: SocketAddr,
2252 ecn: Option<EcnCodepoint>,
2253 packet: Option<Packet>,
2254 stateless_reset: bool,
2255 ) {
2256 self.stats.udp_rx.ios += 1;
2257 if let Some(ref packet) = packet {
2258 trace!(
2259 "got {:?} packet ({} bytes) from {} using id {}",
2260 packet.header.space(),
2261 packet.payload.len() + packet.header_data.len(),
2262 remote,
2263 packet.header.dst_cid(),
2264 );
2265 }
2266
2267 if self.is_handshaking() && remote != self.path.remote {
2268 debug!("discarding packet with unexpected remote during handshake");
2269 return;
2270 }
2271
2272 let was_closed = self.state.is_closed();
2273 let was_drained = self.state.is_drained();
2274
2275 let decrypted = match packet {
2276 None => Err(None),
2277 Some(mut packet) => self
2278 .decrypt_packet(now, &mut packet)
2279 .map(move |number| (packet, number)),
2280 };
2281 let result = match decrypted {
2282 _ if stateless_reset => {
2283 debug!("got stateless reset");
2284 Err(ConnectionError::Reset)
2285 }
2286 Err(Some(e)) => {
2287 warn!("illegal packet: {}", e);
2288 Err(e.into())
2289 }
2290 Err(None) => {
2291 debug!("failed to authenticate packet");
2292 self.authentication_failures += 1;
2293 let integrity_limit = self.spaces[self.highest_space]
2294 .crypto
2295 .as_ref()
2296 .unwrap()
2297 .packet
2298 .local
2299 .integrity_limit();
2300 if self.authentication_failures > integrity_limit {
2301 Err(TransportError::AEAD_LIMIT_REACHED("integrity limit violated").into())
2302 } else {
2303 return;
2304 }
2305 }
2306 Ok((packet, number)) => {
2307 let span = match number {
2308 Some(pn) => trace_span!("recv", space = ?packet.header.space(), pn),
2309 None => trace_span!("recv", space = ?packet.header.space()),
2310 };
2311 let _guard = span.enter();
2312
2313 let is_duplicate = |n| self.spaces[packet.header.space()].dedup.insert(n);
2314 if number.is_some_and(is_duplicate) {
2315 debug!("discarding possible duplicate packet");
2316 return;
2317 } else if self.state.is_handshake() && packet.header.is_short() {
2318 trace!("dropping short packet during handshake");
2320 return;
2321 } else {
2322 if let Header::Initial(InitialHeader { ref token, .. }) = packet.header {
2323 if let State::Handshake(ref hs) = self.state {
2324 if self.side.is_server() && token != &hs.expected_token {
2325 warn!("discarding Initial with invalid retry token");
2329 return;
2330 }
2331 }
2332 }
2333
2334 if !self.state.is_closed() {
2335 let spin = match packet.header {
2336 Header::Short { spin, .. } => spin,
2337 _ => false,
2338 };
2339 self.on_packet_authenticated(
2340 now,
2341 packet.header.space(),
2342 ecn,
2343 number,
2344 spin,
2345 packet.header.is_1rtt(),
2346 );
2347 }
2348
2349 self.process_decrypted_packet(now, remote, number, packet)
2350 }
2351 }
2352 };
2353
2354 if let Err(conn_err) = result {
2356 self.error = Some(conn_err.clone());
2357 self.state = match conn_err {
2358 ConnectionError::ApplicationClosed(reason) => State::closed(reason),
2359 ConnectionError::ConnectionClosed(reason) => State::closed(reason),
2360 ConnectionError::Reset
2361 | ConnectionError::TransportError(TransportError {
2362 code: TransportErrorCode::AEAD_LIMIT_REACHED,
2363 ..
2364 }) => State::Drained,
2365 ConnectionError::TimedOut => {
2366 unreachable!("timeouts aren't generated by packet processing");
2367 }
2368 ConnectionError::TransportError(err) => {
2369 debug!("closing connection due to transport error: {}", err);
2370 State::closed(err)
2371 }
2372 ConnectionError::VersionMismatch => State::Draining,
2373 ConnectionError::LocallyClosed => {
2374 unreachable!("LocallyClosed isn't generated by packet processing");
2375 }
2376 ConnectionError::CidsExhausted => {
2377 unreachable!("CidsExhausted isn't generated by packet processing");
2378 }
2379 ConnectionError::JlsAuthFailed(_) => {
2380 unreachable!("JLSAuthFailed isn't generated by packet processing");
2381 }
2382 ConnectionError::JlsForwardError(_) => {
2383 unreachable!("JlsForwardError isn't generated by packet processing");
2384 }
2385 };
2386 }
2387
2388 if !was_closed && self.state.is_closed() {
2389 self.close_common();
2390 if !self.state.is_drained() {
2391 self.set_close_timer(now);
2392 }
2393 }
2394 if !was_drained && self.state.is_drained() {
2395 self.endpoint_events.push_back(EndpointEventInner::Drained);
2396 self.timers.stop(Timer::Close);
2399 }
2400
2401 if let State::Closed(_) = self.state {
2403 self.close = remote == self.path.remote;
2404 }
2405 }
2406
2407 fn process_decrypted_packet(
2408 &mut self,
2409 now: Instant,
2410 remote: SocketAddr,
2411 number: Option<u64>,
2412 packet: Packet,
2413 ) -> Result<(), ConnectionError> {
2414 let state = match self.state {
2415 State::Established => {
2416 match packet.header.space() {
2417 SpaceId::Data => self.process_payload(now, remote, number.unwrap(), packet)?,
2418 _ if packet.header.has_frames() => self.process_early_payload(now, packet)?,
2419 _ => {
2420 trace!("discarding unexpected pre-handshake packet");
2421 }
2422 }
2423 return Ok(());
2424 }
2425 State::Closed(_) => {
2426 for result in frame::Iter::new(packet.payload.freeze())? {
2427 let frame = match result {
2428 Ok(frame) => frame,
2429 Err(err) => {
2430 debug!("frame decoding error: {err:?}");
2431 continue;
2432 }
2433 };
2434
2435 if let Frame::Padding = frame {
2436 continue;
2437 };
2438
2439 self.stats.frame_rx.record(&frame);
2440
2441 if let Frame::Close(_) = frame {
2442 trace!("draining");
2443 self.state = State::Draining;
2444 break;
2445 }
2446 }
2447 return Ok(());
2448 }
2449 State::Draining | State::Drained => return Ok(()),
2450 State::Handshake(ref mut state) => state,
2451 };
2452
2453 match packet.header {
2454 Header::Retry {
2455 src_cid: rem_cid, ..
2456 } => {
2457 if self.side.is_server() {
2458 return Err(TransportError::PROTOCOL_VIOLATION("client sent Retry").into());
2459 }
2460
2461 if self.total_authed_packets > 1
2462 || packet.payload.len() <= 16 || !self.crypto.is_valid_retry(
2464 &self.rem_cids.active(),
2465 &packet.header_data,
2466 &packet.payload,
2467 )
2468 {
2469 trace!("discarding invalid Retry");
2470 return Ok(());
2478 }
2479
2480 trace!("retrying with CID {}", rem_cid);
2481 let client_hello = state.client_hello.take().unwrap();
2482 self.retry_src_cid = Some(rem_cid);
2483 self.rem_cids.update_initial_cid(rem_cid);
2484 self.rem_handshake_cid = rem_cid;
2485
2486 let space = &mut self.spaces[SpaceId::Initial];
2487 if let Some(info) = space.take(0) {
2488 self.on_packet_acked(now, info);
2489 };
2490
2491 self.discard_space(now, SpaceId::Initial); self.spaces[SpaceId::Initial] = PacketSpace {
2493 crypto: Some(self.crypto.initial_keys(&rem_cid, self.side.side())),
2494 next_packet_number: self.spaces[SpaceId::Initial].next_packet_number,
2495 crypto_offset: client_hello.len() as u64,
2496 ..PacketSpace::new(now)
2497 };
2498 self.spaces[SpaceId::Initial]
2499 .pending
2500 .crypto
2501 .push_back(frame::Crypto {
2502 offset: 0,
2503 data: client_hello,
2504 });
2505
2506 let zero_rtt = mem::take(&mut self.spaces[SpaceId::Data].sent_packets);
2508 for info in zero_rtt.into_values() {
2509 self.remove_in_flight(&info);
2510 self.spaces[SpaceId::Data].pending |= info.retransmits;
2511 }
2512 self.streams.retransmit_all_for_0rtt();
2513
2514 let token_len = packet.payload.len() - 16;
2515 let ConnectionSide::Client { ref mut token, .. } = self.side else {
2516 unreachable!("we already short-circuited if we're server");
2517 };
2518 *token = packet.payload.freeze().split_to(token_len);
2519 self.state = State::Handshake(state::Handshake {
2520 expected_token: Bytes::new(),
2521 rem_cid_set: false,
2522 client_hello: None,
2523 });
2524 Ok(())
2525 }
2526 Header::Long {
2527 ty: LongType::Handshake,
2528 src_cid: rem_cid,
2529 ..
2530 } => {
2531 if rem_cid != self.rem_handshake_cid {
2532 debug!(
2533 "discarding packet with mismatched remote CID: {} != {}",
2534 self.rem_handshake_cid, rem_cid
2535 );
2536 return Ok(());
2537 }
2538 self.on_path_validated();
2539
2540 self.process_early_payload(now, packet)?;
2541 if self.state.is_closed() {
2542 return Ok(());
2543 }
2544
2545 if self.crypto.is_handshaking() {
2546 trace!("handshake ongoing");
2547 return Ok(());
2548 }
2549
2550 if self.side.is_client() {
2551 let params =
2553 self.crypto
2554 .transport_parameters()?
2555 .ok_or_else(|| TransportError {
2556 code: TransportErrorCode::crypto(0x6d),
2557 frame: None,
2558 reason: "transport parameters missing".into(),
2559 })?;
2560
2561 if self.has_0rtt() {
2562 if !self.crypto.early_data_accepted().unwrap() {
2563 debug_assert!(self.side.is_client());
2564 debug!("0-RTT rejected");
2565 self.accepted_0rtt = false;
2566 self.streams.zero_rtt_rejected();
2567
2568 self.spaces[SpaceId::Data].pending = Retransmits::default();
2570
2571 let sent_packets =
2573 mem::take(&mut self.spaces[SpaceId::Data].sent_packets);
2574 for packet in sent_packets.into_values() {
2575 self.remove_in_flight(&packet);
2576 }
2577 } else {
2578 self.accepted_0rtt = true;
2579 params.validate_resumption_from(&self.peer_params)?;
2580 }
2581 }
2582 if let Some(token) = params.stateless_reset_token {
2583 self.endpoint_events
2584 .push_back(EndpointEventInner::ResetToken(self.path.remote, token));
2585 }
2586 self.handle_peer_params(params)?;
2587 self.issue_first_cids(now);
2588 } else {
2589 self.spaces[SpaceId::Data].pending.handshake_done = true;
2591 self.discard_space(now, SpaceId::Handshake);
2592 }
2593
2594 self.events.push_back(Event::Connected);
2595 self.state = State::Established;
2596 trace!("established");
2597 Ok(())
2598 }
2599 Header::Initial(InitialHeader {
2600 src_cid: rem_cid, ..
2601 }) => {
2602 if !state.rem_cid_set {
2603 trace!("switching remote CID to {}", rem_cid);
2604 let mut state = state.clone();
2605 self.rem_cids.update_initial_cid(rem_cid);
2606 self.rem_handshake_cid = rem_cid;
2607 self.orig_rem_cid = rem_cid;
2608 state.rem_cid_set = true;
2609 self.state = State::Handshake(state);
2610 } else if rem_cid != self.rem_handshake_cid {
2611 debug!(
2612 "discarding packet with mismatched remote CID: {} != {}",
2613 self.rem_handshake_cid, rem_cid
2614 );
2615 return Ok(());
2616 }
2617
2618 let starting_space = self.highest_space;
2619 self.process_early_payload(now, packet)?;
2620
2621 if self.side.is_server()
2622 && starting_space == SpaceId::Initial
2623 && self.highest_space != SpaceId::Initial
2624 {
2625 let params =
2626 self.crypto
2627 .transport_parameters()?
2628 .ok_or_else(|| TransportError {
2629 code: TransportErrorCode::crypto(0x6d),
2630 frame: None,
2631 reason: "transport parameters missing".into(),
2632 })?;
2633 self.handle_peer_params(params)?;
2634 self.issue_first_cids(now);
2635 self.init_0rtt();
2636 }
2637 Ok(())
2638 }
2639 Header::Long {
2640 ty: LongType::ZeroRtt,
2641 ..
2642 } => {
2643 self.process_payload(now, remote, number.unwrap(), packet)?;
2644 Ok(())
2645 }
2646 Header::VersionNegotiate { .. } => {
2647 if self.total_authed_packets > 1 {
2648 return Ok(());
2649 }
2650 let supported = packet
2651 .payload
2652 .chunks(4)
2653 .any(|x| match <[u8; 4]>::try_from(x) {
2654 Ok(version) => self.version == u32::from_be_bytes(version),
2655 Err(_) => false,
2656 });
2657 if supported {
2658 return Ok(());
2659 }
2660 debug!("remote doesn't support our version");
2661 Err(ConnectionError::VersionMismatch)
2662 }
2663 Header::Short { .. } => unreachable!(
2664 "short packets received during handshake are discarded in handle_packet"
2665 ),
2666 }
2667 }
2668
2669 fn process_early_payload(
2671 &mut self,
2672 now: Instant,
2673 packet: Packet,
2674 ) -> Result<(), TransportError> {
2675 debug_assert_ne!(packet.header.space(), SpaceId::Data);
2676 let payload_len = packet.payload.len();
2677 let mut ack_eliciting = false;
2678 for result in frame::Iter::new(packet.payload.freeze())? {
2679 let frame = result?;
2680 let span = match frame {
2681 Frame::Padding => continue,
2682 _ => Some(trace_span!("frame", ty = %frame.ty())),
2683 };
2684
2685 self.stats.frame_rx.record(&frame);
2686
2687 let _guard = span.as_ref().map(|x| x.enter());
2688 ack_eliciting |= frame.is_ack_eliciting();
2689
2690 match frame {
2692 Frame::Padding | Frame::Ping => {}
2693 Frame::Crypto(frame) => {
2694 self.read_crypto(packet.header.space(), &frame, payload_len)?;
2695 }
2696 Frame::Ack(ack) => {
2697 self.on_ack_received(now, packet.header.space(), ack)?;
2698 }
2699 Frame::Close(reason) => {
2700 self.error = Some(reason.into());
2701 self.state = State::Draining;
2702 return Ok(());
2703 }
2704 _ => {
2705 let mut err =
2706 TransportError::PROTOCOL_VIOLATION("illegal frame type in handshake");
2707 err.frame = Some(frame.ty());
2708 return Err(err);
2709 }
2710 }
2711 }
2712
2713 if ack_eliciting {
2714 self.spaces[packet.header.space()]
2716 .pending_acks
2717 .set_immediate_ack_required();
2718 }
2719
2720 match (self.crypto.is_jls(),self.crypto.is_jls_enabled()) {
2722 (Some(true), true) => {
2723 debug!("JLS authenticated");
2724 }
2725 (Some(false), true) => {
2726 warn!("JLS authentication falied");
2727 if self.side() == Side::Server {
2728 return Ok(());
2729 }
2730 }
2731 (None, true) => {
2732 warn!("JLS not authenticated");
2733 }
2734 (_, false) => {
2735 debug!("JLS disabled");
2736 }
2737 }
2738
2739 self.write_crypto();
2740 Ok(())
2741 }
2742
2743 fn process_payload(
2744 &mut self,
2745 now: Instant,
2746 remote: SocketAddr,
2747 number: u64,
2748 packet: Packet,
2749 ) -> Result<(), TransportError> {
2750 let payload = packet.payload.freeze();
2751 let mut is_probing_packet = true;
2752 let mut close = None;
2753 let payload_len = payload.len();
2754 let mut ack_eliciting = false;
2755 for result in frame::Iter::new(payload)? {
2756 let frame = result?;
2757 let span = match frame {
2758 Frame::Padding => continue,
2759 _ => Some(trace_span!("frame", ty = %frame.ty())),
2760 };
2761
2762 self.stats.frame_rx.record(&frame);
2763 match &frame {
2766 Frame::Crypto(f) => {
2767 trace!(offset = f.offset, len = f.data.len(), "got crypto frame");
2768 }
2769 Frame::Stream(f) => {
2770 trace!(id = %f.id, offset = f.offset, len = f.data.len(), fin = f.fin, "got stream frame");
2771 }
2772 Frame::Datagram(f) => {
2773 trace!(len = f.data.len(), "got datagram frame");
2774 }
2775 f => {
2776 trace!("got frame {:?}", f);
2777 }
2778 }
2779
2780 let _guard = span.as_ref().map(|x| x.enter());
2781 if packet.header.is_0rtt() {
2782 match frame {
2783 Frame::Crypto(_) | Frame::Close(Close::Application(_)) => {
2784 return Err(TransportError::PROTOCOL_VIOLATION(
2785 "illegal frame type in 0-RTT",
2786 ));
2787 }
2788 _ => {}
2789 }
2790 }
2791 ack_eliciting |= frame.is_ack_eliciting();
2792
2793 match frame {
2795 Frame::Padding
2796 | Frame::PathChallenge(_)
2797 | Frame::PathResponse(_)
2798 | Frame::NewConnectionId(_) => {}
2799 _ => {
2800 is_probing_packet = false;
2801 }
2802 }
2803 match frame {
2804 Frame::Crypto(frame) => {
2805 self.read_crypto(SpaceId::Data, &frame, payload_len)?;
2806 }
2807 Frame::Stream(frame) => {
2808 if self.streams.received(frame, payload_len)?.should_transmit() {
2809 self.spaces[SpaceId::Data].pending.max_data = true;
2810 }
2811 }
2812 Frame::Ack(ack) => {
2813 self.on_ack_received(now, SpaceId::Data, ack)?;
2814 }
2815 Frame::Padding | Frame::Ping => {}
2816 Frame::Close(reason) => {
2817 close = Some(reason);
2818 }
2819 Frame::PathChallenge(token) => {
2820 self.path_responses.push(number, token, remote);
2821 if remote == self.path.remote {
2822 match self.peer_supports_ack_frequency() {
2825 true => self.immediate_ack(),
2826 false => self.ping(),
2827 }
2828 }
2829 }
2830 Frame::PathResponse(token) => {
2831 if self.path.challenge == Some(token) && remote == self.path.remote {
2832 trace!("new path validated");
2833 self.timers.stop(Timer::PathValidation);
2834 self.path.challenge = None;
2835 self.path.validated = true;
2836 if let Some((_, ref mut prev_path)) = self.prev_path {
2837 prev_path.challenge = None;
2838 prev_path.challenge_pending = false;
2839 }
2840 } else {
2841 debug!(token, "ignoring invalid PATH_RESPONSE");
2842 }
2843 }
2844 Frame::MaxData(bytes) => {
2845 self.streams.received_max_data(bytes);
2846 }
2847 Frame::MaxStreamData { id, offset } => {
2848 self.streams.received_max_stream_data(id, offset)?;
2849 }
2850 Frame::MaxStreams { dir, count } => {
2851 self.streams.received_max_streams(dir, count)?;
2852 }
2853 Frame::ResetStream(frame) => {
2854 if self.streams.received_reset(frame)?.should_transmit() {
2855 self.spaces[SpaceId::Data].pending.max_data = true;
2856 }
2857 }
2858 Frame::DataBlocked { offset } => {
2859 debug!(offset, "peer claims to be blocked at connection level");
2860 }
2861 Frame::StreamDataBlocked { id, offset } => {
2862 if id.initiator() == self.side.side() && id.dir() == Dir::Uni {
2863 debug!("got STREAM_DATA_BLOCKED on send-only {}", id);
2864 return Err(TransportError::STREAM_STATE_ERROR(
2865 "STREAM_DATA_BLOCKED on send-only stream",
2866 ));
2867 }
2868 debug!(
2869 stream = %id,
2870 offset, "peer claims to be blocked at stream level"
2871 );
2872 }
2873 Frame::StreamsBlocked { dir, limit } => {
2874 if limit > MAX_STREAM_COUNT {
2875 return Err(TransportError::FRAME_ENCODING_ERROR(
2876 "unrepresentable stream limit",
2877 ));
2878 }
2879 debug!(
2880 "peer claims to be blocked opening more than {} {} streams",
2881 limit, dir
2882 );
2883 }
2884 Frame::StopSending(frame::StopSending { id, error_code }) => {
2885 if id.initiator() != self.side.side() {
2886 if id.dir() == Dir::Uni {
2887 debug!("got STOP_SENDING on recv-only {}", id);
2888 return Err(TransportError::STREAM_STATE_ERROR(
2889 "STOP_SENDING on recv-only stream",
2890 ));
2891 }
2892 } else if self.streams.is_local_unopened(id) {
2893 return Err(TransportError::STREAM_STATE_ERROR(
2894 "STOP_SENDING on unopened stream",
2895 ));
2896 }
2897 self.streams.received_stop_sending(id, error_code);
2898 }
2899 Frame::RetireConnectionId { sequence } => {
2900 let allow_more_cids = self
2901 .local_cid_state
2902 .on_cid_retirement(sequence, self.peer_params.issue_cids_limit())?;
2903 self.endpoint_events
2904 .push_back(EndpointEventInner::RetireConnectionId(
2905 now,
2906 sequence,
2907 allow_more_cids,
2908 ));
2909 }
2910 Frame::NewConnectionId(frame) => {
2911 trace!(
2912 sequence = frame.sequence,
2913 id = %frame.id,
2914 retire_prior_to = frame.retire_prior_to,
2915 );
2916 if self.rem_cids.active().is_empty() {
2917 return Err(TransportError::PROTOCOL_VIOLATION(
2918 "NEW_CONNECTION_ID when CIDs aren't in use",
2919 ));
2920 }
2921 if frame.retire_prior_to > frame.sequence {
2922 return Err(TransportError::PROTOCOL_VIOLATION(
2923 "NEW_CONNECTION_ID retiring unissued CIDs",
2924 ));
2925 }
2926
2927 use crate::cid_queue::InsertError;
2928 match self.rem_cids.insert(frame) {
2929 Ok(None) => {}
2930 Ok(Some((retired, reset_token))) => {
2931 let pending_retired =
2932 &mut self.spaces[SpaceId::Data].pending.retire_cids;
2933 const MAX_PENDING_RETIRED_CIDS: u64 = CidQueue::LEN as u64 * 10;
2936 if (pending_retired.len() as u64)
2939 .saturating_add(retired.end.saturating_sub(retired.start))
2940 > MAX_PENDING_RETIRED_CIDS
2941 {
2942 return Err(TransportError::CONNECTION_ID_LIMIT_ERROR(
2943 "queued too many retired CIDs",
2944 ));
2945 }
2946 pending_retired.extend(retired);
2947 self.set_reset_token(reset_token);
2948 }
2949 Err(InsertError::ExceedsLimit) => {
2950 return Err(TransportError::CONNECTION_ID_LIMIT_ERROR(""));
2951 }
2952 Err(InsertError::Retired) => {
2953 trace!("discarding already-retired");
2954 self.spaces[SpaceId::Data]
2958 .pending
2959 .retire_cids
2960 .push(frame.sequence);
2961 continue;
2962 }
2963 };
2964
2965 if self.side.is_server() && self.rem_cids.active_seq() == 0 {
2966 self.update_rem_cid();
2969 }
2970 }
2971 Frame::NewToken(NewToken { token }) => {
2972 let ConnectionSide::Client {
2973 token_store,
2974 server_name,
2975 ..
2976 } = &self.side
2977 else {
2978 return Err(TransportError::PROTOCOL_VIOLATION("client sent NEW_TOKEN"));
2979 };
2980 if token.is_empty() {
2981 return Err(TransportError::FRAME_ENCODING_ERROR("empty token"));
2982 }
2983 trace!("got new token");
2984 token_store.insert(server_name, token);
2985 }
2986 Frame::Datagram(datagram) => {
2987 if self
2988 .datagrams
2989 .received(datagram, &self.config.datagram_receive_buffer_size)?
2990 {
2991 self.events.push_back(Event::DatagramReceived);
2992 }
2993 }
2994 Frame::AckFrequency(ack_frequency) => {
2995 let space = &mut self.spaces[SpaceId::Data];
2997
2998 if !self
2999 .ack_frequency
3000 .ack_frequency_received(&ack_frequency, &mut space.pending_acks)?
3001 {
3002 continue;
3004 }
3005
3006 if let Some(timeout) = space
3009 .pending_acks
3010 .max_ack_delay_timeout(self.ack_frequency.max_ack_delay)
3011 {
3012 self.timers.set(Timer::MaxAckDelay, timeout);
3013 }
3014 }
3015 Frame::ImmediateAck => {
3016 self.spaces[SpaceId::Data]
3018 .pending_acks
3019 .set_immediate_ack_required();
3020 }
3021 Frame::HandshakeDone => {
3022 if self.side.is_server() {
3023 return Err(TransportError::PROTOCOL_VIOLATION(
3024 "client sent HANDSHAKE_DONE",
3025 ));
3026 }
3027 if self.spaces[SpaceId::Handshake].crypto.is_some() {
3028 self.discard_space(now, SpaceId::Handshake);
3029 }
3030 }
3031 }
3032 }
3033
3034 let space = &mut self.spaces[SpaceId::Data];
3035 if space
3036 .pending_acks
3037 .packet_received(now, number, ack_eliciting, &space.dedup)
3038 {
3039 self.timers
3040 .set(Timer::MaxAckDelay, now + self.ack_frequency.max_ack_delay);
3041 }
3042
3043 let pending = &mut self.spaces[SpaceId::Data].pending;
3048 self.streams.queue_max_stream_id(pending);
3049
3050 if let Some(reason) = close {
3051 self.error = Some(reason.into());
3052 self.state = State::Draining;
3053 self.close = true;
3054 }
3055
3056 if remote != self.path.remote
3057 && !is_probing_packet
3058 && number == self.spaces[SpaceId::Data].rx_packet
3059 {
3060 let ConnectionSide::Server { ref server_config } = self.side else {
3061 panic!("packets from unknown remote should be dropped by clients");
3062 };
3063 debug_assert!(
3064 server_config.migration,
3065 "migration-initiating packets should have been dropped immediately"
3066 );
3067 self.migrate(now, remote);
3068 self.update_rem_cid();
3070 self.spin = false;
3071 }
3072
3073 Ok(())
3074 }
3075
3076 fn migrate(&mut self, now: Instant, remote: SocketAddr) {
3077 trace!(%remote, "migration initiated");
3078 self.path_counter = self.path_counter.wrapping_add(1);
3079 let mut new_path = if remote.is_ipv4() && remote.ip() == self.path.remote.ip() {
3083 PathData::from_previous(remote, &self.path, self.path_counter, now)
3084 } else {
3085 let peer_max_udp_payload_size =
3086 u16::try_from(self.peer_params.max_udp_payload_size.into_inner())
3087 .unwrap_or(u16::MAX);
3088 PathData::new(
3089 remote,
3090 self.allow_mtud,
3091 Some(peer_max_udp_payload_size),
3092 self.path_counter,
3093 now,
3094 &self.config,
3095 )
3096 };
3097 new_path.challenge = Some(self.rng.random());
3098 new_path.challenge_pending = true;
3099 let prev_pto = self.pto(SpaceId::Data);
3100
3101 let mut prev = mem::replace(&mut self.path, new_path);
3102 if prev.challenge.is_none() {
3104 prev.challenge = Some(self.rng.random());
3105 prev.challenge_pending = true;
3106 self.prev_path = Some((self.rem_cids.active(), prev));
3109 }
3110
3111 self.timers.set(
3112 Timer::PathValidation,
3113 now + 3 * cmp::max(self.pto(SpaceId::Data), prev_pto),
3114 );
3115 }
3116
3117 pub fn local_address_changed(&mut self) {
3119 self.update_rem_cid();
3120 self.ping();
3121 }
3122
3123 fn update_rem_cid(&mut self) {
3125 let (reset_token, retired) = match self.rem_cids.next() {
3126 Some(x) => x,
3127 None => return,
3128 };
3129
3130 self.spaces[SpaceId::Data]
3132 .pending
3133 .retire_cids
3134 .extend(retired);
3135 self.set_reset_token(reset_token);
3136 }
3137
3138 fn set_reset_token(&mut self, reset_token: ResetToken) {
3139 self.endpoint_events
3140 .push_back(EndpointEventInner::ResetToken(
3141 self.path.remote,
3142 reset_token,
3143 ));
3144 self.peer_params.stateless_reset_token = Some(reset_token);
3145 }
3146
3147 fn issue_first_cids(&mut self, now: Instant) {
3149 if self.local_cid_state.cid_len() == 0 {
3150 return;
3151 }
3152
3153 let mut n = self.peer_params.issue_cids_limit() - 1;
3155 if let ConnectionSide::Server { server_config } = &self.side {
3156 if server_config.has_preferred_address() {
3157 n -= 1;
3159 }
3160 }
3161 self.endpoint_events
3162 .push_back(EndpointEventInner::NeedIdentifiers(now, n));
3163 }
3164
3165 fn populate_packet(
3166 &mut self,
3167 now: Instant,
3168 space_id: SpaceId,
3169 buf: &mut Vec<u8>,
3170 max_size: usize,
3171 pn: u64,
3172 ) -> SentFrames {
3173 let mut sent = SentFrames::default();
3174 let space = &mut self.spaces[space_id];
3175 let is_0rtt = space_id == SpaceId::Data && space.crypto.is_none();
3176 space.pending_acks.maybe_ack_non_eliciting();
3177
3178 if !is_0rtt && mem::replace(&mut space.pending.handshake_done, false) {
3180 buf.write(frame::FrameType::HANDSHAKE_DONE);
3181 sent.retransmits.get_or_create().handshake_done = true;
3182 self.stats.frame_tx.handshake_done =
3184 self.stats.frame_tx.handshake_done.saturating_add(1);
3185 }
3186
3187 if mem::replace(&mut space.ping_pending, false) {
3189 trace!("PING");
3190 buf.write(frame::FrameType::PING);
3191 sent.non_retransmits = true;
3192 self.stats.frame_tx.ping += 1;
3193 }
3194
3195 if mem::replace(&mut space.immediate_ack_pending, false) {
3197 trace!("IMMEDIATE_ACK");
3198 buf.write(frame::FrameType::IMMEDIATE_ACK);
3199 sent.non_retransmits = true;
3200 self.stats.frame_tx.immediate_ack += 1;
3201 }
3202
3203 if space.pending_acks.can_send() {
3205 Self::populate_acks(
3206 now,
3207 self.receiving_ecn,
3208 &mut sent,
3209 space,
3210 buf,
3211 &mut self.stats,
3212 );
3213 }
3214
3215 if mem::replace(&mut space.pending.ack_frequency, false) {
3217 let sequence_number = self.ack_frequency.next_sequence_number();
3218
3219 let config = self.config.ack_frequency_config.as_ref().unwrap();
3221
3222 let max_ack_delay = self.ack_frequency.candidate_max_ack_delay(
3224 self.path.rtt.get(),
3225 config,
3226 &self.peer_params,
3227 );
3228
3229 trace!(?max_ack_delay, "ACK_FREQUENCY");
3230
3231 frame::AckFrequency {
3232 sequence: sequence_number,
3233 ack_eliciting_threshold: config.ack_eliciting_threshold,
3234 request_max_ack_delay: max_ack_delay.as_micros().try_into().unwrap_or(VarInt::MAX),
3235 reordering_threshold: config.reordering_threshold,
3236 }
3237 .encode(buf);
3238
3239 sent.retransmits.get_or_create().ack_frequency = true;
3240
3241 self.ack_frequency.ack_frequency_sent(pn, max_ack_delay);
3242 self.stats.frame_tx.ack_frequency += 1;
3243 }
3244
3245 if buf.len() + 9 < max_size && space_id == SpaceId::Data {
3247 if let Some(token) = self.path.challenge {
3249 self.path.challenge_pending = false;
3251 sent.non_retransmits = true;
3252 sent.requires_padding = true;
3253 trace!("PATH_CHALLENGE {:08x}", token);
3254 buf.write(frame::FrameType::PATH_CHALLENGE);
3255 buf.write(token);
3256 self.stats.frame_tx.path_challenge += 1;
3257 }
3258 }
3259
3260 if buf.len() + 9 < max_size && space_id == SpaceId::Data {
3262 if let Some(token) = self.path_responses.pop_on_path(self.path.remote) {
3263 sent.non_retransmits = true;
3264 sent.requires_padding = true;
3265 trace!("PATH_RESPONSE {:08x}", token);
3266 buf.write(frame::FrameType::PATH_RESPONSE);
3267 buf.write(token);
3268 self.stats.frame_tx.path_response += 1;
3269 }
3270 }
3271
3272 while buf.len() + frame::Crypto::SIZE_BOUND < max_size && !is_0rtt {
3274 let mut frame = match space.pending.crypto.pop_front() {
3275 Some(x) => x,
3276 None => break,
3277 };
3278
3279 let max_crypto_data_size = max_size
3284 - buf.len()
3285 - 1 - VarInt::size(unsafe { VarInt::from_u64_unchecked(frame.offset) })
3287 - 2; let len = frame
3290 .data
3291 .len()
3292 .min(2usize.pow(14) - 1)
3293 .min(max_crypto_data_size);
3294
3295 let data = frame.data.split_to(len);
3296 let truncated = frame::Crypto {
3297 offset: frame.offset,
3298 data,
3299 };
3300 trace!(
3301 "CRYPTO: off {} len {}",
3302 truncated.offset,
3303 truncated.data.len()
3304 );
3305 truncated.encode(buf);
3306 self.stats.frame_tx.crypto += 1;
3307 sent.retransmits.get_or_create().crypto.push_back(truncated);
3308 if !frame.data.is_empty() {
3309 frame.offset += len as u64;
3310 space.pending.crypto.push_front(frame);
3311 }
3312 }
3313
3314 if space_id == SpaceId::Data {
3315 self.streams.write_control_frames(
3316 buf,
3317 &mut space.pending,
3318 &mut sent.retransmits,
3319 &mut self.stats.frame_tx,
3320 max_size,
3321 );
3322 }
3323
3324 while buf.len() + NewConnectionId::SIZE_BOUND < max_size {
3326 let issued = match space.pending.new_cids.pop() {
3327 Some(x) => x,
3328 None => break,
3329 };
3330 trace!(
3331 sequence = issued.sequence,
3332 id = %issued.id,
3333 "NEW_CONNECTION_ID"
3334 );
3335 frame::NewConnectionId {
3336 sequence: issued.sequence,
3337 retire_prior_to: self.local_cid_state.retire_prior_to(),
3338 id: issued.id,
3339 reset_token: issued.reset_token,
3340 }
3341 .encode(buf);
3342 sent.retransmits.get_or_create().new_cids.push(issued);
3343 self.stats.frame_tx.new_connection_id += 1;
3344 }
3345
3346 while buf.len() + frame::RETIRE_CONNECTION_ID_SIZE_BOUND < max_size {
3348 let seq = match space.pending.retire_cids.pop() {
3349 Some(x) => x,
3350 None => break,
3351 };
3352 trace!(sequence = seq, "RETIRE_CONNECTION_ID");
3353 buf.write(frame::FrameType::RETIRE_CONNECTION_ID);
3354 buf.write_var(seq);
3355 sent.retransmits.get_or_create().retire_cids.push(seq);
3356 self.stats.frame_tx.retire_connection_id += 1;
3357 }
3358
3359 let mut sent_datagrams = false;
3361 while buf.len() + Datagram::SIZE_BOUND < max_size && space_id == SpaceId::Data {
3362 match self.datagrams.write(buf, max_size) {
3363 true => {
3364 sent_datagrams = true;
3365 sent.non_retransmits = true;
3366 self.stats.frame_tx.datagram += 1;
3367 }
3368 false => break,
3369 }
3370 }
3371 if self.datagrams.send_blocked && sent_datagrams {
3372 self.events.push_back(Event::DatagramsUnblocked);
3373 self.datagrams.send_blocked = false;
3374 }
3375
3376 while let Some(remote_addr) = space.pending.new_tokens.pop() {
3378 debug_assert_eq!(space_id, SpaceId::Data);
3379 let ConnectionSide::Server { server_config } = &self.side else {
3380 panic!("NEW_TOKEN frames should not be enqueued by clients");
3381 };
3382
3383 if remote_addr != self.path.remote {
3384 continue;
3389 }
3390
3391 let token = Token::new(
3392 TokenPayload::Validation {
3393 ip: remote_addr.ip(),
3394 issued: server_config.time_source.now(),
3395 },
3396 &mut self.rng,
3397 );
3398 let new_token = NewToken {
3399 token: token.encode(&*server_config.token_key).into(),
3400 };
3401
3402 if buf.len() + new_token.size() >= max_size {
3403 space.pending.new_tokens.push(remote_addr);
3404 break;
3405 }
3406
3407 new_token.encode(buf);
3408 sent.retransmits
3409 .get_or_create()
3410 .new_tokens
3411 .push(remote_addr);
3412 self.stats.frame_tx.new_token += 1;
3413 }
3414
3415 if space_id == SpaceId::Data {
3417 sent.stream_frames =
3418 self.streams
3419 .write_stream_frames(buf, max_size, self.config.send_fairness);
3420 self.stats.frame_tx.stream += sent.stream_frames.len() as u64;
3421 }
3422
3423 sent
3424 }
3425
3426 fn populate_acks(
3431 now: Instant,
3432 receiving_ecn: bool,
3433 sent: &mut SentFrames,
3434 space: &mut PacketSpace,
3435 buf: &mut Vec<u8>,
3436 stats: &mut ConnectionStats,
3437 ) {
3438 debug_assert!(!space.pending_acks.ranges().is_empty());
3439
3440 debug_assert!(space.crypto.is_some(), "tried to send ACK in 0-RTT");
3442 let ecn = if receiving_ecn {
3443 Some(&space.ecn_counters)
3444 } else {
3445 None
3446 };
3447 sent.largest_acked = space.pending_acks.ranges().max();
3448
3449 let delay_micros = space.pending_acks.ack_delay(now).as_micros() as u64;
3450
3451 let ack_delay_exp = TransportParameters::default().ack_delay_exponent;
3453 let delay = delay_micros >> ack_delay_exp.into_inner();
3454
3455 trace!(
3456 "ACK {:?}, Delay = {}us",
3457 space.pending_acks.ranges(),
3458 delay_micros
3459 );
3460
3461 frame::Ack::encode(delay as _, space.pending_acks.ranges(), ecn, buf);
3462 stats.frame_tx.acks += 1;
3463 }
3464
3465 fn close_common(&mut self) {
3466 trace!("connection closed");
3467 for &timer in &Timer::VALUES {
3468 self.timers.stop(timer);
3469 }
3470 }
3471
3472 fn set_close_timer(&mut self, now: Instant) {
3473 self.timers
3474 .set(Timer::Close, now + 3 * self.pto(self.highest_space));
3475 }
3476
3477 fn handle_peer_params(&mut self, params: TransportParameters) -> Result<(), TransportError> {
3479 if Some(self.orig_rem_cid) != params.initial_src_cid
3480 || (self.side.is_client()
3481 && (Some(self.initial_dst_cid) != params.original_dst_cid
3482 || self.retry_src_cid != params.retry_src_cid))
3483 {
3484 return Err(TransportError::TRANSPORT_PARAMETER_ERROR(
3485 "CID authentication failure",
3486 ));
3487 }
3488
3489 self.set_peer_params(params);
3490
3491 Ok(())
3492 }
3493
3494 fn set_peer_params(&mut self, params: TransportParameters) {
3495 self.streams.set_params(¶ms);
3496 self.idle_timeout =
3497 negotiate_max_idle_timeout(self.config.max_idle_timeout, Some(params.max_idle_timeout));
3498 trace!("negotiated max idle timeout {:?}", self.idle_timeout);
3499 if let Some(ref info) = params.preferred_address {
3500 self.rem_cids.insert(frame::NewConnectionId {
3501 sequence: 1,
3502 id: info.connection_id,
3503 reset_token: info.stateless_reset_token,
3504 retire_prior_to: 0,
3505 }).expect("preferred address CID is the first received, and hence is guaranteed to be legal");
3506 }
3507 self.ack_frequency.peer_max_ack_delay = get_max_ack_delay(¶ms);
3508 self.peer_params = params;
3509 self.path.mtud.on_peer_max_udp_payload_size_received(
3510 u16::try_from(self.peer_params.max_udp_payload_size.into_inner()).unwrap_or(u16::MAX),
3511 );
3512 }
3513
3514 fn decrypt_packet(
3515 &mut self,
3516 now: Instant,
3517 packet: &mut Packet,
3518 ) -> Result<Option<u64>, Option<TransportError>> {
3519 let result = packet_crypto::decrypt_packet_body(
3520 packet,
3521 &self.spaces,
3522 self.zero_rtt_crypto.as_ref(),
3523 self.key_phase,
3524 self.prev_crypto.as_ref(),
3525 self.next_crypto.as_ref(),
3526 )?;
3527
3528 let result = match result {
3529 Some(r) => r,
3530 None => return Ok(None),
3531 };
3532
3533 if result.outgoing_key_update_acked {
3534 if let Some(prev) = self.prev_crypto.as_mut() {
3535 prev.end_packet = Some((result.number, now));
3536 self.set_key_discard_timer(now, packet.header.space());
3537 }
3538 }
3539
3540 if result.incoming_key_update {
3541 trace!("key update authenticated");
3542 self.update_keys(Some((result.number, now)), true);
3543 self.set_key_discard_timer(now, packet.header.space());
3544 }
3545
3546 Ok(Some(result.number))
3547 }
3548
3549 fn update_keys(&mut self, end_packet: Option<(u64, Instant)>, remote: bool) {
3550 trace!("executing key update");
3551 let new = self
3555 .crypto
3556 .next_1rtt_keys()
3557 .expect("only called for `Data` packets");
3558 self.key_phase_size = new
3559 .local
3560 .confidentiality_limit()
3561 .saturating_sub(KEY_UPDATE_MARGIN);
3562 let old = mem::replace(
3563 &mut self.spaces[SpaceId::Data]
3564 .crypto
3565 .as_mut()
3566 .unwrap() .packet,
3568 mem::replace(self.next_crypto.as_mut().unwrap(), new),
3569 );
3570 self.spaces[SpaceId::Data].sent_with_keys = 0;
3571 self.prev_crypto = Some(PrevCrypto {
3572 crypto: old,
3573 end_packet,
3574 update_unacked: remote,
3575 });
3576 self.key_phase = !self.key_phase;
3577 }
3578
3579 fn peer_supports_ack_frequency(&self) -> bool {
3580 self.peer_params.min_ack_delay.is_some()
3581 }
3582
3583 pub(crate) fn immediate_ack(&mut self) {
3588 self.spaces[self.highest_space].immediate_ack_pending = true;
3589 }
3590
3591 #[cfg(test)]
3593 pub(crate) fn decode_packet(&self, event: &ConnectionEvent) -> Option<Vec<u8>> {
3594 let (first_decode, remaining) = match &event.0 {
3595 ConnectionEventInner::Datagram(DatagramConnectionEvent {
3596 first_decode,
3597 remaining,
3598 ..
3599 }) => (first_decode, remaining),
3600 _ => return None,
3601 };
3602
3603 if remaining.is_some() {
3604 panic!("Packets should never be coalesced in tests");
3605 }
3606
3607 let decrypted_header = packet_crypto::unprotect_header(
3608 first_decode.clone(),
3609 &self.spaces,
3610 self.zero_rtt_crypto.as_ref(),
3611 self.peer_params.stateless_reset_token,
3612 )?;
3613
3614 let mut packet = decrypted_header.packet?;
3615 packet_crypto::decrypt_packet_body(
3616 &mut packet,
3617 &self.spaces,
3618 self.zero_rtt_crypto.as_ref(),
3619 self.key_phase,
3620 self.prev_crypto.as_ref(),
3621 self.next_crypto.as_ref(),
3622 )
3623 .ok()?;
3624
3625 Some(packet.payload.to_vec())
3626 }
3627
3628 #[cfg(test)]
3631 pub(crate) fn bytes_in_flight(&self) -> u64 {
3632 self.path.in_flight.bytes
3633 }
3634
3635 #[cfg(test)]
3637 pub(crate) fn congestion_window(&self) -> u64 {
3638 self.path
3639 .congestion
3640 .window()
3641 .saturating_sub(self.path.in_flight.bytes)
3642 }
3643
3644 #[cfg(test)]
3646 pub(crate) fn is_idle(&self) -> bool {
3647 Timer::VALUES
3648 .iter()
3649 .filter(|&&t| !matches!(t, Timer::KeepAlive | Timer::PushNewCid | Timer::KeyDiscard))
3650 .filter_map(|&t| Some((t, self.timers.get(t)?)))
3651 .min_by_key(|&(_, time)| time)
3652 .map_or(true, |(timer, _)| timer == Timer::Idle)
3653 }
3654
3655 #[cfg(test)]
3657 pub(crate) fn using_ecn(&self) -> bool {
3658 self.path.sending_ecn
3659 }
3660
3661 #[cfg(test)]
3663 pub(crate) fn total_recvd(&self) -> u64 {
3664 self.path.total_recvd
3665 }
3666
3667 #[cfg(test)]
3668 pub(crate) fn active_local_cid_seq(&self) -> (u64, u64) {
3669 self.local_cid_state.active_seq()
3670 }
3671
3672 #[cfg(test)]
3675 pub(crate) fn rotate_local_cid(&mut self, v: u64, now: Instant) {
3676 let n = self.local_cid_state.assign_retire_seq(v);
3677 self.endpoint_events
3678 .push_back(EndpointEventInner::NeedIdentifiers(now, n));
3679 }
3680
3681 #[cfg(test)]
3683 pub(crate) fn active_rem_cid_seq(&self) -> u64 {
3684 self.rem_cids.active_seq()
3685 }
3686
3687 #[cfg(test)]
3689 pub(crate) fn path_mtu(&self) -> u16 {
3690 self.path.current_mtu()
3691 }
3692
3693 fn can_send_1rtt(&self, max_size: usize) -> bool {
3697 self.streams.can_send_stream_data()
3698 || self.path.challenge_pending
3699 || self
3700 .prev_path
3701 .as_ref()
3702 .is_some_and(|(_, x)| x.challenge_pending)
3703 || !self.path_responses.is_empty()
3704 || self
3705 .datagrams
3706 .outgoing
3707 .front()
3708 .is_some_and(|x| x.size(true) <= max_size)
3709 }
3710
3711 fn remove_in_flight(&mut self, packet: &SentPacket) {
3713 for path in [&mut self.path]
3715 .into_iter()
3716 .chain(self.prev_path.as_mut().map(|(_, data)| data))
3717 {
3718 if path.remove_in_flight(packet) {
3719 return;
3720 }
3721 }
3722 }
3723
3724 fn kill(&mut self, reason: ConnectionError) {
3726 self.close_common();
3727 self.error = Some(reason);
3728 self.state = State::Drained;
3729 self.endpoint_events.push_back(EndpointEventInner::Drained);
3730 }
3731
3732 pub fn current_mtu(&self) -> u16 {
3736 self.path.current_mtu()
3737 }
3738
3739 fn predict_1rtt_overhead(&self, pn: Option<u64>) -> usize {
3746 let pn_len = match pn {
3747 Some(pn) => PacketNumber::new(
3748 pn,
3749 self.spaces[SpaceId::Data].largest_acked_packet.unwrap_or(0),
3750 )
3751 .len(),
3752 None => 4,
3754 };
3755
3756 1 + self.rem_cids.active().len() + pn_len + self.tag_len_1rtt()
3758 }
3759
3760 fn tag_len_1rtt(&self) -> usize {
3761 let key = match self.spaces[SpaceId::Data].crypto.as_ref() {
3762 Some(crypto) => Some(&*crypto.packet.local),
3763 None => self.zero_rtt_crypto.as_ref().map(|x| &*x.packet),
3764 };
3765 key.map_or(16, |x| x.tag_len())
3769 }
3770
3771 fn on_path_validated(&mut self) {
3773 self.path.validated = true;
3774 let ConnectionSide::Server { server_config } = &self.side else {
3775 return;
3776 };
3777 let new_tokens = &mut self.spaces[SpaceId::Data as usize].pending.new_tokens;
3778 new_tokens.clear();
3779 for _ in 0..server_config.validation_token.sent {
3780 new_tokens.push(self.path.remote);
3781 }
3782 }
3783}
3784
3785impl fmt::Debug for Connection {
3786 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
3787 f.debug_struct("Connection")
3788 .field("handshake_cid", &self.handshake_cid)
3789 .finish()
3790 }
3791}
3792
3793enum ConnectionSide {
3795 Client {
3796 token: Bytes,
3798 token_store: Arc<dyn TokenStore>,
3799 server_name: String,
3800 },
3801 Server {
3802 server_config: Arc<ServerConfig>,
3803 },
3804}
3805
3806impl ConnectionSide {
3807 fn remote_may_migrate(&self) -> bool {
3808 match self {
3809 Self::Server { server_config } => server_config.migration,
3810 Self::Client { .. } => false,
3811 }
3812 }
3813
3814 fn is_client(&self) -> bool {
3815 self.side().is_client()
3816 }
3817
3818 fn is_server(&self) -> bool {
3819 self.side().is_server()
3820 }
3821
3822 fn side(&self) -> Side {
3823 match *self {
3824 Self::Client { .. } => Side::Client,
3825 Self::Server { .. } => Side::Server,
3826 }
3827 }
3828}
3829
3830impl From<SideArgs> for ConnectionSide {
3831 fn from(side: SideArgs) -> Self {
3832 match side {
3833 SideArgs::Client {
3834 token_store,
3835 server_name,
3836 } => Self::Client {
3837 token: token_store.take(&server_name).unwrap_or_default(),
3838 token_store,
3839 server_name,
3840 },
3841 SideArgs::Server {
3842 server_config,
3843 pref_addr_cid: _,
3844 path_validated: _,
3845 } => Self::Server { server_config },
3846 }
3847 }
3848}
3849
3850pub(crate) enum SideArgs {
3852 Client {
3853 token_store: Arc<dyn TokenStore>,
3854 server_name: String,
3855 },
3856 Server {
3857 server_config: Arc<ServerConfig>,
3858 pref_addr_cid: Option<ConnectionId>,
3859 path_validated: bool,
3860 },
3861}
3862
3863impl SideArgs {
3864 pub(crate) fn pref_addr_cid(&self) -> Option<ConnectionId> {
3865 match *self {
3866 Self::Client { .. } => None,
3867 Self::Server { pref_addr_cid, .. } => pref_addr_cid,
3868 }
3869 }
3870
3871 pub(crate) fn path_validated(&self) -> bool {
3872 match *self {
3873 Self::Client { .. } => true,
3874 Self::Server { path_validated, .. } => path_validated,
3875 }
3876 }
3877
3878 pub(crate) fn side(&self) -> Side {
3879 match *self {
3880 Self::Client { .. } => Side::Client,
3881 Self::Server { .. } => Side::Server,
3882 }
3883 }
3884}
3885
3886#[derive(Debug, Error, Clone, PartialEq, Eq)]
3888pub enum ConnectionError {
3889 #[error("peer doesn't implement any supported version")]
3891 VersionMismatch,
3892 #[error(transparent)]
3894 TransportError(#[from] TransportError),
3895 #[error("aborted by peer: {0}")]
3897 ConnectionClosed(frame::ConnectionClose),
3898 #[error("closed by peer: {0}")]
3900 ApplicationClosed(frame::ApplicationClose),
3901 #[error("reset by peer")]
3903 Reset,
3904 #[error("timed out")]
3910 TimedOut,
3911 #[error("closed")]
3913 LocallyClosed,
3914 #[error("CIDs exhausted")]
3918 CidsExhausted,
3919
3920 #[error("JLS Authentication failed")]
3922 JlsAuthFailed(JlsAuthInner),
3923
3924 #[error("JLS Forward Error")]
3926 JlsForwardError(String),
3927}
3928
3929#[derive(Debug, Clone, Eq, PartialEq)]
3930pub struct JlsAuthInner {
3931 pub upstream_addr: Option<String>,
3932}
3933
3934impl From<Close> for ConnectionError {
3935 fn from(x: Close) -> Self {
3936 match x {
3937 Close::Connection(reason) => Self::ConnectionClosed(reason),
3938 Close::Application(reason) => Self::ApplicationClosed(reason),
3939 }
3940 }
3941}
3942
3943impl From<ConnectionError> for io::Error {
3945 fn from(x: ConnectionError) -> Self {
3946 use ConnectionError::*;
3947 let kind = match x {
3948 TimedOut => io::ErrorKind::TimedOut,
3949 Reset => io::ErrorKind::ConnectionReset,
3950 ApplicationClosed(_) | ConnectionClosed(_) => io::ErrorKind::ConnectionAborted,
3951 TransportError(_) | VersionMismatch | LocallyClosed | CidsExhausted => {
3952 io::ErrorKind::Other
3953 }
3954 _ => io::ErrorKind::Other,
3955 };
3956 Self::new(kind, x)
3957 }
3958}
3959
3960#[allow(unreachable_pub)] #[derive(Clone)]
3962pub enum State {
3963 Handshake(state::Handshake),
3964 Established,
3965 Closed(state::Closed),
3966 Draining,
3967 Drained,
3969}
3970
3971impl State {
3972 fn closed<R: Into<Close>>(reason: R) -> Self {
3973 Self::Closed(state::Closed {
3974 reason: reason.into(),
3975 })
3976 }
3977
3978 fn is_handshake(&self) -> bool {
3979 matches!(*self, Self::Handshake(_))
3980 }
3981
3982 fn is_established(&self) -> bool {
3983 matches!(*self, Self::Established)
3984 }
3985
3986 fn is_closed(&self) -> bool {
3987 matches!(*self, Self::Closed(_) | Self::Draining | Self::Drained)
3988 }
3989
3990 fn is_drained(&self) -> bool {
3991 matches!(*self, Self::Drained)
3992 }
3993}
3994
3995mod state {
3996 use super::*;
3997
3998 #[allow(unreachable_pub)] #[derive(Clone)]
4000 pub struct Handshake {
4001 pub(super) rem_cid_set: bool,
4005 pub(super) expected_token: Bytes,
4009 pub(super) client_hello: Option<Bytes>,
4013 }
4014
4015 #[allow(unreachable_pub)] #[derive(Clone)]
4017 pub struct Closed {
4018 pub(super) reason: Close,
4019 }
4020}
4021
4022#[derive(Debug)]
4024pub enum Event {
4025 HandshakeDataReady,
4027 Connected,
4029 ConnectionLost {
4033 reason: ConnectionError,
4035 },
4036 Stream(StreamEvent),
4038 DatagramReceived,
4040 DatagramsUnblocked,
4042}
4043
4044fn instant_saturating_sub(x: Instant, y: Instant) -> Duration {
4045 if x > y { x - y } else { Duration::ZERO }
4046}
4047
4048fn get_max_ack_delay(params: &TransportParameters) -> Duration {
4049 Duration::from_micros(params.max_ack_delay.0 * 1000)
4050}
4051
4052const MAX_BACKOFF_EXPONENT: u32 = 16;
4054
4055const MIN_PACKET_SPACE: usize = MAX_HANDSHAKE_OR_0RTT_HEADER_SIZE + 32;
4063
4064const MAX_HANDSHAKE_OR_0RTT_HEADER_SIZE: usize =
4070 1 + 4 + 1 + MAX_CID_SIZE + 1 + MAX_CID_SIZE + VarInt::from_u32(u16::MAX as u32).size() + 4;
4071
4072const KEY_UPDATE_MARGIN: u64 = 10_000;
4076
4077#[derive(Default)]
4078struct SentFrames {
4079 retransmits: ThinRetransmits,
4080 largest_acked: Option<u64>,
4081 stream_frames: StreamMetaVec,
4082 non_retransmits: bool,
4084 requires_padding: bool,
4085}
4086
4087impl SentFrames {
4088 fn is_ack_only(&self, streams: &StreamsState) -> bool {
4090 self.largest_acked.is_some()
4091 && !self.non_retransmits
4092 && self.stream_frames.is_empty()
4093 && self.retransmits.is_empty(streams)
4094 }
4095}
4096
4097fn negotiate_max_idle_timeout(x: Option<VarInt>, y: Option<VarInt>) -> Option<Duration> {
4105 match (x, y) {
4106 (Some(VarInt(0)) | None, Some(VarInt(0)) | None) => None,
4107 (Some(VarInt(0)) | None, Some(y)) => Some(Duration::from_millis(y.0)),
4108 (Some(x), Some(VarInt(0)) | None) => Some(Duration::from_millis(x.0)),
4109 (Some(x), Some(y)) => Some(Duration::from_millis(cmp::min(x, y).0)),
4110 }
4111}
4112
4113#[cfg(test)]
4114mod tests {
4115 use super::*;
4116
4117 #[test]
4118 fn negotiate_max_idle_timeout_commutative() {
4119 let test_params = [
4120 (None, None, None),
4121 (None, Some(VarInt(0)), None),
4122 (None, Some(VarInt(2)), Some(Duration::from_millis(2))),
4123 (Some(VarInt(0)), Some(VarInt(0)), None),
4124 (
4125 Some(VarInt(2)),
4126 Some(VarInt(0)),
4127 Some(Duration::from_millis(2)),
4128 ),
4129 (
4130 Some(VarInt(1)),
4131 Some(VarInt(4)),
4132 Some(Duration::from_millis(1)),
4133 ),
4134 ];
4135
4136 for (left, right, result) in test_params {
4137 assert_eq!(negotiate_max_idle_timeout(left, right), result);
4138 assert_eq!(negotiate_max_idle_timeout(right, left), result);
4139 }
4140 }
4141}