1use std::{
2 cmp,
3 collections::VecDeque,
4 convert::TryFrom,
5 fmt, io, mem,
6 net::{IpAddr, SocketAddr},
7 sync::Arc,
8};
9
10use bytes::{Bytes, BytesMut};
11use frame::StreamMetaVec;
12use rand::{rngs::StdRng, Rng, SeedableRng};
13use thiserror::Error;
14use tracing::{debug, error, trace, trace_span, warn};
15
16use crate::{
17 cid_generator::ConnectionIdGenerator,
18 cid_queue::CidQueue,
19 coding::BufMutExt,
20 config::{ServerConfig, TransportConfig},
21 crypto::{self, KeyPair, Keys, PacketKey},
22 frame::{self, Close, Datagram, FrameStruct, NewToken, ObservedAddr},
23 packet::{
24 FixedLengthConnectionIdParser, Header, InitialHeader, InitialPacket, LongType, Packet,
25 PacketNumber, PartialDecode, SpaceId,
26 },
27 range_set::ArrayRangeSet,
28 shared::{
29 ConnectionEvent, ConnectionEventInner, ConnectionId, DatagramConnectionEvent, EcnCodepoint,
30 EndpointEvent, EndpointEventInner,
31 },
32 token::{ResetToken, Token, TokenPayload},
33 transport_parameters::TransportParameters,
34 Dir, Duration, EndpointConfig, Frame, Instant, Side, StreamId, TokenStore, Transmit,
35 TransportError, TransportErrorCode, VarInt, INITIAL_MTU, MAX_CID_SIZE, MAX_STREAM_COUNT,
36 MIN_INITIAL_SIZE, TIMER_GRANULARITY,
37};
38
39mod ack_frequency;
40use ack_frequency::AckFrequencyState;
41
42mod assembler;
43pub use assembler::Chunk;
44
45mod cid_state;
46use cid_state::CidState;
47
48mod datagrams;
49use datagrams::DatagramState;
50pub use datagrams::{Datagrams, SendDatagramError};
51
52mod mtud;
53mod pacing;
54
55mod packet_builder;
56use packet_builder::PacketBuilder;
57
58mod packet_crypto;
59use packet_crypto::{PrevCrypto, ZeroRttCrypto};
60
61mod paths;
62pub use paths::RttEstimator;
63use paths::{PathData, PathResponses};
64
65mod send_buffer;
66
67mod spaces;
68#[cfg(fuzzing)]
69pub use spaces::Retransmits;
70#[cfg(not(fuzzing))]
71use spaces::Retransmits;
72use spaces::{PacketNumberFilter, PacketSpace, SendableFrames, SentPacket, ThinRetransmits};
73
74mod stats;
75pub use stats::{ConnectionStats, FrameStats, PathStats, UdpStats};
76
77mod streams;
78#[cfg(fuzzing)]
79pub use streams::StreamsState;
80#[cfg(not(fuzzing))]
81use streams::StreamsState;
82pub use streams::{
83 BytesSource, Chunks, ClosedStream, FinishError, ReadError, ReadableError, RecvStream,
84 SendStream, ShouldTransmit, StreamEvent, Streams, WriteError, Written,
85};
86
87mod timer;
88use crate::congestion::Controller;
89use timer::{Timer, TimerTable};
90
/// Protocol state for a single connection.
///
/// Bundles everything the methods of `impl Connection` operate on: configuration, the crypto
/// session and keys, per-packet-number-space state, path and congestion state, timers, queued
/// events, and statistics.
pub struct Connection {
    /// Endpoint-wide configuration handed to `new`.
    endpoint_config: Arc<EndpointConfig>,
    /// Transport configuration handed to `new` (idle timeout, spin bit, segmentation offload, ...).
    config: Arc<TransportConfig>,
    /// Connection-local RNG, seeded from the `rng_seed` argument of `new`.
    rng: StdRng,
    /// Cryptographic session; exposed read-only through `crypto_session`.
    crypto: Box<dyn crypto::Session>,
    /// Initialized from the `loc_cid` argument of `new`.
    handshake_cid: ConnectionId,
    /// Initialized from the `rem_cid` argument of `new`.
    rem_handshake_cid: ConnectionId,
    /// Local IP address, if known; reported as `src_ip` on outgoing `Transmit`s.
    local_ip: Option<IpAddr>,
    /// The active network path: remote address, RTT estimate, congestion controller, pacing,
    /// MTU discovery and in-flight accounting.
    path: PathData,
    /// The `allow_mtud` flag passed to `new` (also forwarded to `PathData::new`).
    allow_mtud: bool,
    /// Previous path together with the connection ID used on it. `poll_transmit` sends a pending
    /// PATH_CHALLENGE to it, and it becomes the active path again if the `PathValidation` timer
    /// expires.
    prev_path: Option<(ConnectionId, PathData)>,
    /// Current connection state (handshake, closed, draining, drained, ...).
    state: State,
    /// Side-specific state; `side()` yields whether this is the client or the server.
    side: ConnectionSide,
    /// Value returned by `has_0rtt`; starts out `false`.
    zero_rtt_enabled: bool,
    /// 0-RTT keys, if any; dropped when the `KeyDiscard` timer fires.
    zero_rtt_crypto: Option<ZeroRttCrypto>,
    // NOTE(review): presumably the current key phase bit -- confirm against the key update code.
    key_phase: bool,
    /// Initialized to a random value in `10..1000`.
    // NOTE(review): presumably the number of packets sent per key phase -- confirm.
    key_phase_size: u64,
    /// Transport parameters of the peer; default-initialized in `new`.
    peer_params: TransportParameters,
    /// Initialized from the `rem_cid` argument of `new`.
    orig_rem_cid: ConnectionId,
    /// Initialized from the `init_cid` argument of `new`, which is also used to derive the
    /// initial keys.
    initial_dst_cid: ConnectionId,
    /// Starts out `None`.
    // NOTE(review): presumably set when a Retry packet is processed -- confirm.
    retry_src_cid: Option<ConnectionId>,
    // NOTE(review): presumably a running count of packets declared lost -- confirm.
    lost_packets: u64,
    /// Application-facing events, drained first by `poll`.
    events: VecDeque<Event>,
    /// Events destined for the endpoint, drained by `poll_endpoint_events`.
    endpoint_events: VecDeque<EndpointEventInner>,
    /// Decided once in `new`: requires `config.allow_spin` and a successful 7-in-8 random draw.
    spin_enabled: bool,
    // NOTE(review): presumably the current spin bit value -- confirm.
    spin: bool,
    /// Per-packet-number-space state, indexed by `SpaceId` (Initial, Handshake, Data).
    spaces: [PacketSpace; 3],
    /// Starts at `SpaceId::Initial`. `ping` targets this space, and `poll_transmit` stops sending
    /// close packets once it has written one in this space.
    highest_space: SpaceId,
    /// State of the previous key generation, if any; dropped when the `KeyDiscard` timer fires.
    prev_crypto: Option<PrevCrypto>,
    // NOTE(review): presumably the precomputed keys for the next key update -- confirm.
    next_crypto: Option<KeyPair<Box<dyn PacketKey>>>,
    /// Value returned by `accepted_0rtt`; starts out `false`.
    accepted_0rtt: bool,
    /// Starts out `true`.
    // NOTE(review): presumably gates whether outgoing packets may reset the idle timer -- confirm.
    permit_idle_reset: bool,
    /// Idle timeout derived from `config.max_idle_timeout` (interpreted as milliseconds);
    /// `None` when the configuration is unset or zero.
    idle_timeout: Option<Duration>,
    /// All timers of this connection; queried by `poll_timeout` and fired by `handle_timeout`.
    timers: TimerTable,
    // NOTE(review): presumably counts packets that failed authentication -- confirm.
    authentication_failures: u64,
    /// Pending fatal error; `poll` takes it and reports `Event::ConnectionLost`.
    error: Option<ConnectionError>,
    /// Supplies the packet number peeked by `poll_transmit` and validates incoming ACK ranges
    /// (`check_ack`).
    packet_number_filter: PacketNumberFilter,

    /// Queued path responses; `poll_transmit` pops off-path ones and sends them to their own
    /// remote address.
    path_responses: PathResponses,
    /// Whether a CONNECTION_CLOSE still has to be sent. Set by `close_inner`, cleared by
    /// `poll_transmit` once the frame has been written in the highest space.
    close: bool,

    /// ACK frequency state; also holds the `peer_max_ack_delay` used to clamp reported ACK delays.
    ack_frequency: AckFrequencyState,

    /// Incremented on every PTO expiry in `on_loss_detection_timeout`; reset by `on_ack_received`
    /// once the peer has completed address validation.
    pto_count: u32,

    /// Forwarded to `populate_acks` when ACK frames are encoded.
    // NOTE(review): presumably whether incoming packets carried ECN markings -- confirm.
    receiving_ecn: bool,
    // NOTE(review): presumably the total number of successfully authenticated packets -- confirm.
    total_authed_packets: u64,
    /// Set by `poll_transmit`: `true` when nothing was written and congestion control or pacing
    /// was not what blocked sending. Passed on to the congestion controller.
    app_limited: bool,

    /// Starts at zero.
    // NOTE(review): presumably the sequence number of the next OBSERVED_ADDRESS frame -- confirm.
    next_observed_addr_seq_no: VarInt,

    /// Stream state shared by the `Streams`, `RecvStream` and `SendStream` handles.
    streams: StreamsState,
    /// Connection IDs issued by the peer; `active()` supplies the destination CID for outgoing
    /// packets.
    rem_cids: CidQueue,
    /// Bookkeeping for locally issued connection IDs (lifetime, retirement).
    local_cid_state: CidState,
    /// Datagram state, exposed through the `Datagrams` handle.
    datagrams: DatagramState,
    /// Running statistics; `stats()` returns a copy with live path values filled in.
    stats: ConnectionStats,
    /// The `version` argument passed to `new`.
    version: u32,
}
243
244impl Connection {
245 pub(crate) fn new(
246 endpoint_config: Arc<EndpointConfig>,
247 config: Arc<TransportConfig>,
248 init_cid: ConnectionId,
249 loc_cid: ConnectionId,
250 rem_cid: ConnectionId,
251 remote: SocketAddr,
252 local_ip: Option<IpAddr>,
253 crypto: Box<dyn crypto::Session>,
254 cid_gen: &dyn ConnectionIdGenerator,
255 now: Instant,
256 version: u32,
257 allow_mtud: bool,
258 rng_seed: [u8; 32],
259 side_args: SideArgs,
260 ) -> Self {
261 let pref_addr_cid = side_args.pref_addr_cid();
262 let path_validated = side_args.path_validated();
263 let connection_side = ConnectionSide::from(side_args);
264 let side = connection_side.side();
265 let initial_space = PacketSpace {
266 crypto: Some(crypto.initial_keys(&init_cid, side)),
267 ..PacketSpace::new(now)
268 };
269 let state = State::Handshake(state::Handshake {
270 rem_cid_set: side.is_server(),
271 expected_token: Bytes::new(),
272 client_hello: None,
273 });
274 let mut rng = StdRng::from_seed(rng_seed);
275 let mut this = Self {
276 endpoint_config,
277 crypto,
278 handshake_cid: loc_cid,
279 rem_handshake_cid: rem_cid,
280 local_cid_state: CidState::new(
281 cid_gen.cid_len(),
282 cid_gen.cid_lifetime(),
283 now,
284 if pref_addr_cid.is_some() { 2 } else { 1 },
285 ),
286 path: PathData::new(remote, allow_mtud, None, now, &config),
287 allow_mtud,
288 local_ip,
289 prev_path: None,
290 state,
291 side: connection_side,
292 zero_rtt_enabled: false,
293 zero_rtt_crypto: None,
294 key_phase: false,
295 key_phase_size: rng.gen_range(10..1000),
302 peer_params: TransportParameters::default(),
303 orig_rem_cid: rem_cid,
304 initial_dst_cid: init_cid,
305 retry_src_cid: None,
306 lost_packets: 0,
307 events: VecDeque::new(),
308 endpoint_events: VecDeque::new(),
309 spin_enabled: config.allow_spin && rng.gen_ratio(7, 8),
310 spin: false,
311 spaces: [initial_space, PacketSpace::new(now), PacketSpace::new(now)],
312 highest_space: SpaceId::Initial,
313 prev_crypto: None,
314 next_crypto: None,
315 accepted_0rtt: false,
316 permit_idle_reset: true,
317 idle_timeout: match config.max_idle_timeout {
318 None | Some(VarInt(0)) => None,
319 Some(dur) => Some(Duration::from_millis(dur.0)),
320 },
321 timers: TimerTable::default(),
322 authentication_failures: 0,
323 error: None,
324 #[cfg(test)]
325 packet_number_filter: match config.deterministic_packet_numbers {
326 false => PacketNumberFilter::new(&mut rng),
327 true => PacketNumberFilter::disabled(),
328 },
329 #[cfg(not(test))]
330 packet_number_filter: PacketNumberFilter::new(&mut rng),
331
332 path_responses: PathResponses::default(),
333 close: false,
334
335 ack_frequency: AckFrequencyState::new(get_max_ack_delay(
336 &TransportParameters::default(),
337 )),
338
339 pto_count: 0,
340
341 app_limited: false,
342 receiving_ecn: false,
343 total_authed_packets: 0,
344
345 next_observed_addr_seq_no: 0u32.into(),
346
347 streams: StreamsState::new(
348 side,
349 config.max_concurrent_uni_streams,
350 config.max_concurrent_bidi_streams,
351 config.send_window,
352 config.receive_window,
353 config.stream_receive_window,
354 ),
355 datagrams: DatagramState::default(),
356 config,
357 rem_cids: CidQueue::new(rem_cid),
358 rng,
359 stats: ConnectionStats::default(),
360 version,
361 };
362 if path_validated {
363 this.on_path_validated();
364 }
365 if side.is_client() {
366 this.write_crypto();
368 this.init_0rtt();
369 }
370 this
371 }
372
373 #[must_use]
381 pub fn poll_timeout(&mut self) -> Option<Instant> {
382 self.timers.next_timeout()
383 }
384
385 #[must_use]
391 pub fn poll(&mut self) -> Option<Event> {
392 if let Some(x) = self.events.pop_front() {
393 return Some(x);
394 }
395
396 if let Some(event) = self.streams.poll() {
397 return Some(Event::Stream(event));
398 }
399
400 if let Some(err) = self.error.take() {
401 return Some(Event::ConnectionLost { reason: err });
402 }
403
404 None
405 }
406
407 #[must_use]
409 pub fn poll_endpoint_events(&mut self) -> Option<EndpointEvent> {
410 self.endpoint_events.pop_front().map(EndpointEvent)
411 }
412
413 #[must_use]
415 pub fn streams(&mut self) -> Streams<'_> {
416 Streams {
417 state: &mut self.streams,
418 conn_state: &self.state,
419 }
420 }
421
422 #[must_use]
424 pub fn recv_stream(&mut self, id: StreamId) -> RecvStream<'_> {
425 assert!(id.dir() == Dir::Bi || id.initiator() != self.side.side());
426 RecvStream {
427 id,
428 state: &mut self.streams,
429 pending: &mut self.spaces[SpaceId::Data].pending,
430 }
431 }
432
433 #[must_use]
435 pub fn send_stream(&mut self, id: StreamId) -> SendStream<'_> {
436 assert!(id.dir() == Dir::Bi || id.initiator() == self.side.side());
437 SendStream {
438 id,
439 state: &mut self.streams,
440 pending: &mut self.spaces[SpaceId::Data].pending,
441 conn_state: &self.state,
442 }
443 }
444
    /// Writes the next batch of outgoing packets into `buf` and describes how to send them.
    ///
    /// Returns `None` when nothing was written (connection drained, nothing to send, or sending
    /// is currently blocked), or when a packet builder or the MTU discovery poll yields nothing.
    ///
    /// `max_datagrams` bounds how many equally sized datagrams may be batched into a single
    /// `Transmit`; it is forced to 1 when `enable_segmentation_offload` is disabled in the
    /// transport configuration and is otherwise capped at `MAX_TRANSMIT_SEGMENTS`.
    ///
    /// # Panics
    ///
    /// Panics if `max_datagrams` is zero.
    #[must_use]
    pub fn poll_transmit(
        &mut self,
        now: Instant,
        max_datagrams: usize,
        buf: &mut Vec<u8>,
    ) -> Option<Transmit> {
        assert!(max_datagrams != 0);
        let max_datagrams = match self.config.enable_segmentation_offload {
            false => 1,
            true => max_datagrams.min(MAX_TRANSMIT_SEGMENTS),
        };

        let mut num_datagrams = 0;
        // Offset in `buf` at which the datagram currently being assembled starts. With coalesced
        // packets this can lie before the start of the current packet.
        let mut datagram_start = 0;
        let mut segment_size = usize::from(self.path.current_mtu());

        // A PATH_CHALLENGE pending on the previous path is sent on its own, before anything else.
        if let Some((prev_cid, ref mut prev_path)) = self.prev_path {
            if prev_path.challenge_pending {
                prev_path.challenge_pending = false;
                let token = prev_path
                    .challenge
                    .expect("previous path challenge pending without token");
                let destination = prev_path.remote;
                debug_assert_eq!(
                    self.highest_space,
                    SpaceId::Data,
                    "PATH_CHALLENGE queued without 1-RTT keys"
                );
                buf.reserve(MIN_INITIAL_SIZE as usize);

                let buf_capacity = buf.capacity();

                // The packet is addressed with the connection ID stored alongside the previous
                // path rather than the currently active one.
                let mut builder = PacketBuilder::new(
                    now,
                    SpaceId::Data,
                    prev_cid,
                    buf,
                    buf_capacity,
                    0,
                    false,
                    self,
                )?;
                trace!("validating previous path with PATH_CHALLENGE {:08x}", token);
                buf.write(frame::FrameType::PATH_CHALLENGE);
                buf.write(token);
                self.stats.frame_tx.path_challenge += 1;

                // Datagrams carrying a PATH_CHALLENGE are padded up to `MIN_INITIAL_SIZE`.
                builder.pad_to(MIN_INITIAL_SIZE);

                builder.finish(self, buf);
                self.stats.udp_tx.on_sent(1, buf.len());
                return Some(Transmit {
                    destination,
                    size: buf.len(),
                    ecn: None,
                    segment_size: None,
                    src_ip: self.local_ip,
                });
            }
        }

        // Let every space queue a probe if it needs one. An immediate ACK is only requested in
        // the Data space, and only if the peer supports ACK frequency.
        for space in SpaceId::iter() {
            let request_immediate_ack =
                space == SpaceId::Data && self.peer_supports_ack_frequency();
            self.spaces[space].maybe_queue_probe(request_immediate_ack, &self.streams);
        }

        // Decide whether this call has to produce a close packet.
        let close = match self.state {
            State::Drained => {
                self.app_limited = true;
                return None;
            }
            State::Draining | State::Closed(_) => {
                // `self.close` is only cleared below, once the close frame has been written in
                // the highest space; when it is already clear there is nothing left to send.
                if !self.close {
                    self.app_limited = true;
                    return None;
                }
                true
            }
            _ => false,
        };

        // Decide whether an ACK_FREQUENCY frame should be queued in the Data space.
        if let Some(config) = &self.config.ack_frequency_config {
            self.spaces[SpaceId::Data].pending.ack_frequency = self
                .ack_frequency
                .should_send_ack_frequency(self.path.rtt.get(), config, &self.peer_params)
                && self.highest_space == SpaceId::Data
                && self.peer_supports_ack_frequency();
        }

        // Logical write limit for `buf`. It is tracked separately from `buf.capacity()` and grows
        // by one datagram's worth each time a new datagram is started.
        let mut buf_capacity = 0;

        let mut coalesce = true;
        // Builder of the packet currently being assembled; it is finished lazily, once it is
        // known whether another packet or datagram follows.
        let mut builder_storage: Option<PacketBuilder> = None;
        let mut sent_frames = None;
        let mut pad_datagram = false;
        let mut congestion_blocked = false;

        // Walk the spaces in order. `space_idx` is only advanced explicitly, so several
        // iterations (and therefore several packets) may be spent in the same space.
        let mut space_idx = 0;
        let spaces = [SpaceId::Initial, SpaceId::Handshake, SpaceId::Data];
        while space_idx < spaces.len() {
            let space_id = spaces[space_idx];
            // Bytes available for frames in a 1-RTT packet of the current segment size, given the
            // predicted overhead for the next Data-space packet number.
            let pn = self.packet_number_filter.peek(&self.spaces[SpaceId::Data]);
            let frame_space_1rtt =
                segment_size.saturating_sub(self.predict_1rtt_overhead(Some(pn)));

            // Skip the space unless it has something to send, or a close packet is due and the
            // space has keys.
            let can_send = self.space_can_send(space_id, frame_space_1rtt);
            if can_send.is_empty() && (!close || self.spaces[space_id].crypto.is_none()) {
                space_idx += 1;
                continue;
            }

            let mut ack_eliciting = !self.spaces[space_id].pending.is_empty(&self.streams)
                || self.spaces[space_id].ping_pending
                || self.spaces[space_id].immediate_ack_pending;
            if space_id == SpaceId::Data {
                ack_eliciting |= self.can_send_1rtt(frame_space_1rtt);
            }

            // End of the data written so far. `buf.len()` alone is not enough while a packet is
            // still unfinished: its minimum size and authentication tag are accounted for too.
            let buf_end = if let Some(builder) = &builder_storage {
                buf.len().max(builder.min_size) + builder.tag_len
            } else {
                buf.len()
            };

            let tag_len = if let Some(ref crypto) = self.spaces[space_id].crypto {
                crypto.packet.local.tag_len()
            } else if space_id == SpaceId::Data {
                self.zero_rtt_crypto.as_ref().expect(
                    "sending packets in the application data space requires known 0-RTT or 1-RTT keys",
                ).packet.tag_len()
            } else {
                unreachable!("tried to send {:?} packet without keys", space_id)
            };
            if !coalesce || buf_capacity - buf_end < MIN_PACKET_SPACE + tag_len {
                // The next packet does not fit into (or may not be coalesced into) the current
                // datagram, so a new datagram has to be started.

                // Stop if the datagram budget of this call is used up.
                if buf_capacity >= segment_size * max_datagrams {
                    break;
                }

                // Account for the datagrams already assembled in this call plus one byte of the
                // next one when asking whether anti-amplification limits block sending.
                if self
                    .path
                    .anti_amplification_blocked(segment_size as u64 * num_datagrams + 1)
                {
                    trace!("blocked by anti-amplification");
                    break;
                }

                // Congestion control and pacing apply to ack-eliciting packets only, and are
                // bypassed while loss probes are owed in this space.
                if ack_eliciting && self.spaces[space_id].loss_probes == 0 {
                    // Bytes of the unfinished packet that are not yet counted as in flight.
                    let untracked_bytes = if let Some(builder) = &builder_storage {
                        buf_capacity - builder.partial_encode.start
                    } else {
                        0
                    } as u64;
                    debug_assert!(untracked_bytes <= segment_size as u64);

                    let bytes_to_send = segment_size as u64 + untracked_bytes;
                    if self.path.in_flight.bytes + bytes_to_send >= self.path.congestion.window() {
                        // Move on to the next space instead of stopping, so that later spaces
                        // still get a chance to send.
                        space_idx += 1;
                        congestion_blocked = true;
                        trace!("blocked by congestion control");
                        continue;
                    }

                    // Ask the pacer whether the next datagram has to be delayed.
                    let smoothed_rtt = self.path.rtt.get();
                    if let Some(delay) = self.path.pacing.delay(
                        smoothed_rtt,
                        bytes_to_send,
                        self.path.current_mtu(),
                        self.path.congestion.window(),
                        now,
                    ) {
                        self.timers.set(Timer::Pacing, delay);
                        congestion_blocked = true;
                        trace!("blocked by pacing");
                        break;
                    }
                }

                // Finish the packet that closes the current datagram.
                if let Some(mut builder) = builder_storage.take() {
                    if pad_datagram {
                        builder.pad_to(MIN_INITIAL_SIZE);
                    }

                    if num_datagrams > 1 {
                        // Datagrams after the first must be padded to the segment size to stay
                        // in the batch. If that needs more than `MAX_PADDING` bytes, or the
                        // segment would exceed the write limit, end the batch here instead.
                        const MAX_PADDING: usize = 16;
                        let packet_len_unpadded = cmp::max(builder.min_size, buf.len())
                            - datagram_start
                            + builder.tag_len;
                        if packet_len_unpadded + MAX_PADDING < segment_size
                            || datagram_start + segment_size > buf_capacity
                        {
                            trace!(
                                "GSO truncated by demand for {} padding bytes or loss probe",
                                segment_size - packet_len_unpadded
                            );
                            // Hand the builder back so the packet is finished after the loop.
                            builder_storage = Some(builder);
                            break;
                        }

                        builder.pad_to(segment_size as u16);
                    }

                    builder.finish_and_track(now, self, sent_frames.take(), buf);

                    if num_datagrams == 1 {
                        // The size of the first datagram becomes the segment size of the batch,
                        // and the write limit is clipped to what was actually written.
                        segment_size = buf.len();
                        buf_capacity = buf.len();

                        // Re-check, with the reduced segment size, whether the Data space still
                        // has anything it can send.
                        if space_id == SpaceId::Data {
                            let frame_space_1rtt =
                                segment_size.saturating_sub(self.predict_1rtt_overhead(Some(pn)));
                            if self.space_can_send(space_id, frame_space_1rtt).is_empty() {
                                break;
                            }
                        }
                    }
                }

                // Extend the write limit by one more datagram. A datagram that consumes a loss
                // probe is limited to `INITIAL_MTU` instead of the segment size.
                let next_datagram_size_limit = match self.spaces[space_id].loss_probes {
                    0 => segment_size,
                    _ => {
                        self.spaces[space_id].loss_probes -= 1;
                        usize::from(INITIAL_MTU)
                    }
                };
                buf_capacity += next_datagram_size_limit;
                if buf.capacity() < buf_capacity {
                    // Reserve room for the whole batch at once.
                    buf.reserve(max_datagrams * segment_size);
                }
                num_datagrams += 1;
                coalesce = true;
                pad_datagram = false;
                datagram_start = buf.len();

                debug_assert_eq!(
                    datagram_start % segment_size,
                    0,
                    "datagrams in a GSO batch must be aligned to the segment size"
                );
            } else {
                // The next packet is coalesced into the current datagram, so the previous packet
                // is finished without any extra padding.
                if let Some(builder) = builder_storage.take() {
                    builder.finish_and_track(now, self, sent_frames.take(), buf);
                }
            }

            debug_assert!(buf_capacity - buf.len() >= MIN_PACKET_SPACE);

            // From here on a packet is going to be written.

            // A client discards the Initial space when it is about to send in the Handshake
            // space while Initial keys are still present.
            if self.spaces[SpaceId::Initial].crypto.is_some()
                && space_id == SpaceId::Handshake
                && self.side.is_client()
            {
                self.discard_space(now, SpaceId::Initial);
            }
            if let Some(ref mut prev) = self.prev_crypto {
                prev.update_unacked = false;
            }

            debug_assert!(
                builder_storage.is_none() && sent_frames.is_none(),
                "Previous packet must have been finished"
            );

            let builder = builder_storage.insert(PacketBuilder::new(
                now,
                space_id,
                self.rem_cids.active(),
                buf,
                buf_capacity,
                datagram_start,
                ack_eliciting,
                self,
            )?);
            // Nothing is coalesced after a short-header packet.
            coalesce = coalesce && !builder.short_header;

            // Datagrams containing an Initial packet are padded if we are the client or the
            // packet is ack-eliciting.
            pad_datagram |=
                space_id == SpaceId::Initial && (self.side.is_client() || ack_eliciting);

            if close {
                trace!("sending CONNECTION_CLOSE");
                // Pending ACKs are written ahead of the close frame.
                if !self.spaces[space_id].pending_acks.ranges().is_empty() {
                    Self::populate_acks(
                        now,
                        self.receiving_ecn,
                        &mut SentFrames::default(),
                        &mut self.spaces[space_id],
                        buf,
                        &mut self.stats,
                    );
                }

                // The ACKs are expected to leave room for the close frame. The check is repeated
                // at runtime so release builds skip the frame rather than overrun the packet.
                debug_assert!(
                    buf.len() + frame::ConnectionClose::SIZE_BOUND < builder.max_size,
                    "ACKs should leave space for ConnectionClose"
                );
                if buf.len() + frame::ConnectionClose::SIZE_BOUND < builder.max_size {
                    let max_frame_size = builder.max_size - buf.len();
                    match self.state {
                        State::Closed(state::Closed { ref reason }) => {
                            // The real reason is only written in the Data space or when it is a
                            // transport-layer close; otherwise a generic APPLICATION_ERROR with
                            // an empty reason is sent in its place.
                            if space_id == SpaceId::Data || reason.is_transport_layer() {
                                reason.encode(buf, max_frame_size)
                            } else {
                                frame::ConnectionClose {
                                    error_code: TransportErrorCode::APPLICATION_ERROR,
                                    frame_type: None,
                                    reason: Bytes::new(),
                                }
                                .encode(buf, max_frame_size)
                            }
                        }
                        State::Draining => frame::ConnectionClose {
                            error_code: TransportErrorCode::NO_ERROR,
                            frame_type: None,
                            reason: Bytes::new(),
                        }
                        .encode(buf, max_frame_size),
                        _ => unreachable!(
                            "tried to make a close packet when the connection wasn't closed"
                        ),
                    }
                }
                if space_id == self.highest_space {
                    // The close packet in the highest space is the last one; no further close
                    // packets will be produced by later calls.
                    self.close = false;
                    break;
                } else {
                    // Lower spaces also get a close packet; continue with the next space.
                    space_idx += 1;
                    continue;
                }
            }

            // An off-path PATH_RESPONSE is sent alone, to its own remote address, and only while
            // the first datagram is being assembled.
            if space_id == SpaceId::Data && num_datagrams == 1 {
                if let Some((token, remote)) = self.path_responses.pop_off_path(self.path.remote) {
                    // `builder_storage` was populated by the `insert` above, so `unwrap` cannot
                    // fail here.
                    let mut builder = builder_storage.take().unwrap();
                    trace!("PATH_RESPONSE {:08x} (off-path)", token);
                    buf.write(frame::FrameType::PATH_RESPONSE);
                    buf.write(token);
                    self.stats.frame_tx.path_response += 1;
                    builder.pad_to(MIN_INITIAL_SIZE);
                    builder.finish_and_track(
                        now,
                        self,
                        Some(SentFrames {
                            non_retransmits: true,
                            ..SentFrames::default()
                        }),
                        buf,
                    );
                    self.stats.udp_tx.on_sent(1, buf.len());
                    return Some(Transmit {
                        destination: remote,
                        size: buf.len(),
                        ecn: None,
                        segment_size: None,
                        src_ip: self.local_ip,
                    });
                }
            }

            let sent =
                self.populate_packet(now, space_id, buf, builder.max_size, builder.exact_number);

            // Sanity check: a full-size packet must not end up ACK-only when `can_send` reported
            // non-ACK frames but no ACKs, and no outgoing datagrams are queued.
            debug_assert!(
                !(sent.is_ack_only(&self.streams)
                    && !can_send.acks
                    && can_send.other
                    && (buf_capacity - builder.datagram_start) == self.path.current_mtu() as usize
                    && self.datagrams.outgoing.is_empty()),
                "SendableFrames was {can_send:?}, but only ACKs have been written"
            );
            pad_datagram |= sent.requires_padding;

            if sent.largest_acked.is_some() {
                self.spaces[space_id].pending_acks.acks_sent();
                self.timers.stop(Timer::MaxAckDelay);
            }

            // Kept until the packet is finished, which happens in a later iteration or after
            // the loop.
            sent_frames = Some(sent);

            // `space_idx` is deliberately left unchanged: the same space is checked again for
            // more data.
        }

        // Finish the last packet, if one is still open.
        if let Some(mut builder) = builder_storage {
            if pad_datagram {
                builder.pad_to(MIN_INITIAL_SIZE);
            }
            let last_packet_number = builder.exact_number;
            builder.finish_and_track(now, self, sent_frames, buf);
            self.path
                .congestion
                .on_sent(now, buf.len() as u64, last_packet_number);
        }

        self.app_limited = buf.is_empty() && !congestion_blocked;

        // With nothing else to send on an established connection, try an MTU discovery probe.
        if buf.is_empty() && self.state.is_established() {
            let space_id = SpaceId::Data;
            let probe_size = self
                .path
                .mtud
                .poll_transmit(now, self.packet_number_filter.peek(&self.spaces[space_id]))?;

            let buf_capacity = probe_size as usize;
            buf.reserve(buf_capacity);

            let mut builder = PacketBuilder::new(
                now,
                space_id,
                self.rem_cids.active(),
                buf,
                buf_capacity,
                0,
                true,
                self,
            )?;

            // The probe is a PING padded up to the probe size.
            buf.write(frame::FrameType::PING);
            self.stats.frame_tx.ping += 1;

            // If the peer supports ACK frequency, also ask for an immediate ACK.
            if self.peer_supports_ack_frequency() {
                buf.write(frame::FrameType::IMMEDIATE_ACK);
                self.stats.frame_tx.immediate_ack += 1;
            }

            builder.pad_to(probe_size);
            let sent_frames = SentFrames {
                non_retransmits: true,
                ..Default::default()
            };
            builder.finish_and_track(now, self, Some(sent_frames), buf);

            self.stats.path.sent_plpmtud_probes += 1;
            num_datagrams = 1;

            trace!(?probe_size, "writing MTUD probe");
        }

        if buf.is_empty() {
            return None;
        }

        trace!("sending {} bytes in {} datagrams", buf.len(), num_datagrams);
        self.path.total_sent = self.path.total_sent.saturating_add(buf.len() as u64);

        self.stats.udp_tx.on_sent(num_datagrams, buf.len());

        Some(Transmit {
            destination: self.path.remote,
            size: buf.len(),
            ecn: if self.path.sending_ecn {
                Some(EcnCodepoint::Ect0)
            } else {
                None
            },
            // A segment size is only reported when more than one datagram was batched.
            segment_size: match num_datagrams {
                1 => None,
                _ => Some(segment_size),
            },
            src_ip: self.local_ip,
        })
    }
1044
1045 fn space_can_send(&self, space_id: SpaceId, frame_space_1rtt: usize) -> SendableFrames {
1047 if self.spaces[space_id].crypto.is_none()
1048 && (space_id != SpaceId::Data
1049 || self.zero_rtt_crypto.is_none()
1050 || self.side.is_server())
1051 {
1052 return SendableFrames::empty();
1054 }
1055 let mut can_send = self.spaces[space_id].can_send(&self.streams);
1056 if space_id == SpaceId::Data {
1057 can_send.other |= self.can_send_1rtt(frame_space_1rtt);
1058 }
1059 can_send
1060 }
1061
1062 pub fn handle_event(&mut self, event: ConnectionEvent) {
1068 use ConnectionEventInner::*;
1069 match event.0 {
1070 Datagram(DatagramConnectionEvent {
1071 now,
1072 remote,
1073 ecn,
1074 first_decode,
1075 remaining,
1076 }) => {
1077 if remote != self.path.remote && !self.side.remote_may_migrate() {
1081 trace!("discarding packet from unrecognized peer {}", remote);
1082 return;
1083 }
1084
1085 let was_anti_amplification_blocked = self.path.anti_amplification_blocked(1);
1086
1087 self.stats.udp_rx.datagrams += 1;
1088 self.stats.udp_rx.bytes += first_decode.len() as u64;
1089 let data_len = first_decode.len();
1090
1091 self.handle_decode(now, remote, ecn, first_decode);
1092 self.path.total_recvd = self.path.total_recvd.saturating_add(data_len as u64);
1097
1098 if let Some(data) = remaining {
1099 self.stats.udp_rx.bytes += data.len() as u64;
1100 self.handle_coalesced(now, remote, ecn, data);
1101 }
1102
1103 if was_anti_amplification_blocked {
1104 self.set_loss_detection_timer(now);
1108 }
1109 }
1110 NewIdentifiers(ids, now) => {
1111 self.local_cid_state.new_cids(&ids, now);
1112 ids.into_iter().rev().for_each(|frame| {
1113 self.spaces[SpaceId::Data].pending.new_cids.push(frame);
1114 });
1115 if self
1117 .timers
1118 .get(Timer::PushNewCid)
1119 .map_or(true, |x| x <= now)
1120 {
1121 self.reset_cid_retirement();
1122 }
1123 }
1124 }
1125 }
1126
1127 pub fn handle_timeout(&mut self, now: Instant) {
1137 for &timer in &Timer::VALUES {
1138 if !self.timers.is_expired(timer, now) {
1139 continue;
1140 }
1141 self.timers.stop(timer);
1142 trace!(timer = ?timer, "timeout");
1143 match timer {
1144 Timer::Close => {
1145 self.state = State::Drained;
1146 self.endpoint_events.push_back(EndpointEventInner::Drained);
1147 }
1148 Timer::Idle => {
1149 self.kill(ConnectionError::TimedOut);
1150 }
1151 Timer::KeepAlive => {
1152 trace!("sending keep-alive");
1153 self.ping();
1154 }
1155 Timer::LossDetection => {
1156 self.on_loss_detection_timeout(now);
1157 }
1158 Timer::KeyDiscard => {
1159 self.zero_rtt_crypto = None;
1160 self.prev_crypto = None;
1161 }
1162 Timer::PathValidation => {
1163 debug!("path validation failed");
1164 if let Some((_, prev)) = self.prev_path.take() {
1165 self.path = prev;
1166 }
1167 self.path.challenge = None;
1168 self.path.challenge_pending = false;
1169 }
1170 Timer::Pacing => trace!("pacing timer expired"),
1171 Timer::PushNewCid => {
1172 let num_new_cid = self.local_cid_state.on_cid_timeout().into();
1174 if !self.state.is_closed() {
1175 trace!(
1176 "push a new cid to peer RETIRE_PRIOR_TO field {}",
1177 self.local_cid_state.retire_prior_to()
1178 );
1179 self.endpoint_events
1180 .push_back(EndpointEventInner::NeedIdentifiers(now, num_new_cid));
1181 }
1182 }
1183 Timer::MaxAckDelay => {
1184 trace!("max ack delay reached");
1185 self.spaces[SpaceId::Data]
1187 .pending_acks
1188 .on_max_ack_delay_timeout()
1189 }
1190 }
1191 }
1192 }
1193
1194 pub fn close(&mut self, now: Instant, error_code: VarInt, reason: Bytes) {
1206 self.close_inner(
1207 now,
1208 Close::Application(frame::ApplicationClose { error_code, reason }),
1209 )
1210 }
1211
1212 fn close_inner(&mut self, now: Instant, reason: Close) {
1213 let was_closed = self.state.is_closed();
1214 if !was_closed {
1215 self.close_common();
1216 self.set_close_timer(now);
1217 self.close = true;
1218 self.state = State::Closed(state::Closed { reason });
1219 }
1220 }
1221
1222 pub fn datagrams(&mut self) -> Datagrams<'_> {
1224 Datagrams { conn: self }
1225 }
1226
1227 pub fn stats(&self) -> ConnectionStats {
1229 let mut stats = self.stats;
1230 stats.path.rtt = self.path.rtt.get();
1231 stats.path.cwnd = self.path.congestion.window();
1232 stats.path.current_mtu = self.path.mtud.current_mtu();
1233
1234 stats
1235 }
1236
1237 pub fn ping(&mut self) {
1241 self.spaces[self.highest_space].ping_pending = true;
1242 }
1243
1244 pub fn force_key_update(&mut self) {
1248 self.update_keys(None, false);
1249 }
1250
1251 pub fn crypto_session(&self) -> &dyn crypto::Session {
1253 &*self.crypto
1254 }
1255
1256 pub fn is_handshaking(&self) -> bool {
1261 self.state.is_handshake()
1262 }
1263
1264 pub fn is_closed(&self) -> bool {
1272 self.state.is_closed()
1273 }
1274
1275 pub fn is_drained(&self) -> bool {
1280 self.state.is_drained()
1281 }
1282
1283 pub fn accepted_0rtt(&self) -> bool {
1287 self.accepted_0rtt
1288 }
1289
1290 pub fn has_0rtt(&self) -> bool {
1292 self.zero_rtt_enabled
1293 }
1294
1295 pub fn has_pending_retransmits(&self) -> bool {
1297 !self.spaces[SpaceId::Data].pending.is_empty(&self.streams)
1298 }
1299
1300 pub fn side(&self) -> Side {
1302 self.side.side()
1303 }
1304
1305 pub fn remote_address(&self) -> SocketAddr {
1307 self.path.remote
1308 }
1309
1310 pub fn local_ip(&self) -> Option<IpAddr> {
1320 self.local_ip
1321 }
1322
1323 pub fn rtt(&self) -> Duration {
1325 self.path.rtt.get()
1326 }
1327
1328 pub fn congestion_state(&self) -> &dyn Controller {
1330 self.path.congestion.as_ref()
1331 }
1332
1333 pub fn path_changed(&mut self, now: Instant) {
1344 self.path.reset(now, &self.config);
1345 }
1346
1347 pub fn set_max_concurrent_streams(&mut self, dir: Dir, count: VarInt) {
1352 self.streams.set_max_concurrent(dir, count);
1353 let pending = &mut self.spaces[SpaceId::Data].pending;
1356 self.streams.queue_max_stream_id(pending);
1357 }
1358
1359 pub fn max_concurrent_streams(&self, dir: Dir) -> u64 {
1365 self.streams.max_concurrent(dir)
1366 }
1367
1368 pub fn set_receive_window(&mut self, receive_window: VarInt) {
1370 if self.streams.set_receive_window(receive_window) {
1371 self.spaces[SpaceId::Data].pending.max_data = true;
1372 }
1373 }
1374
1375 fn on_ack_received(
1376 &mut self,
1377 now: Instant,
1378 space: SpaceId,
1379 ack: frame::Ack,
1380 ) -> Result<(), TransportError> {
1381 if ack.largest >= self.spaces[space].next_packet_number {
1382 return Err(TransportError::PROTOCOL_VIOLATION("unsent packet acked"));
1383 }
1384 let new_largest = {
1385 let space = &mut self.spaces[space];
1386 if space
1387 .largest_acked_packet
1388 .map_or(true, |pn| ack.largest > pn)
1389 {
1390 space.largest_acked_packet = Some(ack.largest);
1391 if let Some(info) = space.sent_packets.get(&ack.largest) {
1392 space.largest_acked_packet_sent = info.time_sent;
1396 }
1397 true
1398 } else {
1399 false
1400 }
1401 };
1402
1403 let mut newly_acked = ArrayRangeSet::new();
1405 for range in ack.iter() {
1406 self.packet_number_filter.check_ack(space, range.clone())?;
1407 for (&pn, _) in self.spaces[space].sent_packets.range(range) {
1408 newly_acked.insert_one(pn);
1409 }
1410 }
1411
1412 if newly_acked.is_empty() {
1413 return Ok(());
1414 }
1415
1416 let mut ack_eliciting_acked = false;
1417 for packet in newly_acked.elts() {
1418 if let Some(info) = self.spaces[space].take(packet) {
1419 if let Some(acked) = info.largest_acked {
1420 self.spaces[space].pending_acks.subtract_below(acked);
1426 }
1427 ack_eliciting_acked |= info.ack_eliciting;
1428
1429 let mtu_updated = self.path.mtud.on_acked(space, packet, info.size);
1431 if mtu_updated {
1432 self.path
1433 .congestion
1434 .on_mtu_update(self.path.mtud.current_mtu());
1435 }
1436
1437 self.ack_frequency.on_acked(packet);
1439
1440 self.on_packet_acked(now, packet, info);
1441 }
1442 }
1443
1444 self.path.congestion.on_end_acks(
1445 now,
1446 self.path.in_flight.bytes,
1447 self.app_limited,
1448 self.spaces[space].largest_acked_packet,
1449 );
1450
1451 if new_largest && ack_eliciting_acked {
1452 let ack_delay = if space != SpaceId::Data {
1453 Duration::from_micros(0)
1454 } else {
1455 cmp::min(
1456 self.ack_frequency.peer_max_ack_delay,
1457 Duration::from_micros(ack.delay << self.peer_params.ack_delay_exponent.0),
1458 )
1459 };
1460 let rtt = instant_saturating_sub(now, self.spaces[space].largest_acked_packet_sent);
1461 self.path.rtt.update(ack_delay, rtt);
1462 if self.path.first_packet_after_rtt_sample.is_none() {
1463 self.path.first_packet_after_rtt_sample =
1464 Some((space, self.spaces[space].next_packet_number));
1465 }
1466 }
1467
1468 self.detect_lost_packets(now, space, true);
1470
1471 if self.peer_completed_address_validation() {
1472 self.pto_count = 0;
1473 }
1474
1475 if self.path.sending_ecn {
1477 if let Some(ecn) = ack.ecn {
1478 if new_largest {
1483 let sent = self.spaces[space].largest_acked_packet_sent;
1484 self.process_ecn(now, space, newly_acked.len() as u64, ecn, sent);
1485 }
1486 } else {
1487 debug!("ECN not acknowledged by peer");
1489 self.path.sending_ecn = false;
1490 }
1491 }
1492
1493 self.set_loss_detection_timer(now);
1494 Ok(())
1495 }
1496
1497 fn process_ecn(
1499 &mut self,
1500 now: Instant,
1501 space: SpaceId,
1502 newly_acked: u64,
1503 ecn: frame::EcnCounts,
1504 largest_sent_time: Instant,
1505 ) {
1506 match self.spaces[space].detect_ecn(newly_acked, ecn) {
1507 Err(e) => {
1508 debug!("halting ECN due to verification failure: {}", e);
1509 self.path.sending_ecn = false;
1510 self.spaces[space].ecn_feedback = frame::EcnCounts::ZERO;
1513 }
1514 Ok(false) => {}
1515 Ok(true) => {
1516 self.stats.path.congestion_events += 1;
1517 self.path
1518 .congestion
1519 .on_congestion_event(now, largest_sent_time, false, 0);
1520 }
1521 }
1522 }
1523
1524 fn on_packet_acked(&mut self, now: Instant, pn: u64, info: SentPacket) {
1527 self.remove_in_flight(pn, &info);
1528 if info.ack_eliciting && self.path.challenge.is_none() {
1529 self.path.congestion.on_ack(
1532 now,
1533 info.time_sent,
1534 info.size.into(),
1535 self.app_limited,
1536 &self.path.rtt,
1537 );
1538 }
1539
1540 if let Some(retransmits) = info.retransmits.get() {
1542 for (id, _) in retransmits.reset_stream.iter() {
1543 self.streams.reset_acked(*id);
1544 }
1545 }
1546
1547 for frame in info.stream_frames {
1548 self.streams.received_ack_of(frame);
1549 }
1550 }
1551
1552 fn set_key_discard_timer(&mut self, now: Instant, space: SpaceId) {
1553 let start = if self.zero_rtt_crypto.is_some() {
1554 now
1555 } else {
1556 self.prev_crypto
1557 .as_ref()
1558 .expect("no previous keys")
1559 .end_packet
1560 .as_ref()
1561 .expect("update not acknowledged yet")
1562 .1
1563 };
1564 self.timers
1565 .set(Timer::KeyDiscard, start + self.pto(space) * 3);
1566 }
1567
1568 fn on_loss_detection_timeout(&mut self, now: Instant) {
1569 if let Some((_, pn_space)) = self.loss_time_and_space() {
1570 self.detect_lost_packets(now, pn_space, false);
1572 self.set_loss_detection_timer(now);
1573 return;
1574 }
1575
1576 let (_, space) = match self.pto_time_and_space(now) {
1577 Some(x) => x,
1578 None => {
1579 error!("PTO expired while unset");
1580 return;
1581 }
1582 };
1583 trace!(
1584 in_flight = self.path.in_flight.bytes,
1585 count = self.pto_count,
1586 ?space,
1587 "PTO fired"
1588 );
1589
1590 let count = match self.path.in_flight.ack_eliciting {
1591 0 => {
1594 debug_assert!(!self.peer_completed_address_validation());
1595 1
1596 }
1597 _ => 2,
1599 };
1600 self.spaces[space].loss_probes = self.spaces[space].loss_probes.saturating_add(count);
1601 self.pto_count = self.pto_count.saturating_add(1);
1602 self.set_loss_detection_timer(now);
1603 }
1604
1605 fn detect_lost_packets(&mut self, now: Instant, pn_space: SpaceId, due_to_ack: bool) {
1606 let mut lost_packets = Vec::<u64>::new();
1607 let mut lost_mtu_probe = None;
1608 let in_flight_mtu_probe = self.path.mtud.in_flight_mtu_probe();
1609 let rtt = self.path.rtt.conservative();
1610 let loss_delay = cmp::max(rtt.mul_f32(self.config.time_threshold), TIMER_GRANULARITY);
1611
1612 let lost_send_time = now.checked_sub(loss_delay).unwrap();
1614 let largest_acked_packet = self.spaces[pn_space].largest_acked_packet.unwrap();
1615 let packet_threshold = self.config.packet_threshold as u64;
1616 let mut size_of_lost_packets = 0u64;
1617
1618 let congestion_period =
1622 self.pto(SpaceId::Data) * self.config.persistent_congestion_threshold;
1623 let mut persistent_congestion_start: Option<Instant> = None;
1624 let mut prev_packet = None;
1625 let mut in_persistent_congestion = false;
1626
1627 let space = &mut self.spaces[pn_space];
1628 space.loss_time = None;
1629
1630 for (&packet, info) in space.sent_packets.range(0..largest_acked_packet) {
1631 if prev_packet != Some(packet.wrapping_sub(1)) {
1632 persistent_congestion_start = None;
1634 }
1635
1636 if info.time_sent <= lost_send_time || largest_acked_packet >= packet + packet_threshold
1637 {
1638 if Some(packet) == in_flight_mtu_probe {
1639 lost_mtu_probe = in_flight_mtu_probe;
1642 } else {
1643 lost_packets.push(packet);
1644 size_of_lost_packets += info.size as u64;
1645 if info.ack_eliciting && due_to_ack {
1646 match persistent_congestion_start {
1647 Some(start) if info.time_sent - start > congestion_period => {
1650 in_persistent_congestion = true;
1651 }
1652 None if self
1654 .path
1655 .first_packet_after_rtt_sample
1656 .is_some_and(|x| x < (pn_space, packet)) =>
1657 {
1658 persistent_congestion_start = Some(info.time_sent);
1659 }
1660 _ => {}
1661 }
1662 }
1663 }
1664 } else {
1665 let next_loss_time = info.time_sent + loss_delay;
1666 space.loss_time = Some(
1667 space
1668 .loss_time
1669 .map_or(next_loss_time, |x| cmp::min(x, next_loss_time)),
1670 );
1671 persistent_congestion_start = None;
1672 }
1673
1674 prev_packet = Some(packet);
1675 }
1676
1677 if let Some(largest_lost) = lost_packets.last().cloned() {
1679 let old_bytes_in_flight = self.path.in_flight.bytes;
1680 let largest_lost_sent = self.spaces[pn_space].sent_packets[&largest_lost].time_sent;
1681 self.lost_packets += lost_packets.len() as u64;
1682 self.stats.path.lost_packets += lost_packets.len() as u64;
1683 self.stats.path.lost_bytes += size_of_lost_packets;
1684 trace!(
1685 "packets lost: {:?}, bytes lost: {}",
1686 lost_packets,
1687 size_of_lost_packets
1688 );
1689
1690 for &packet in &lost_packets {
1691 let info = self.spaces[pn_space].take(packet).unwrap(); self.remove_in_flight(packet, &info);
1693 for frame in info.stream_frames {
1694 self.streams.retransmit(frame);
1695 }
1696 self.spaces[pn_space].pending |= info.retransmits;
1697 self.path.mtud.on_non_probe_lost(packet, info.size);
1698 }
1699
1700 if self.path.mtud.black_hole_detected(now) {
1701 self.stats.path.black_holes_detected += 1;
1702 self.path
1703 .congestion
1704 .on_mtu_update(self.path.mtud.current_mtu());
1705 if let Some(max_datagram_size) = self.datagrams().max_size() {
1706 self.datagrams.drop_oversized(max_datagram_size);
1707 }
1708 }
1709
1710 let lost_ack_eliciting = old_bytes_in_flight != self.path.in_flight.bytes;
1712
1713 if lost_ack_eliciting {
1714 self.stats.path.congestion_events += 1;
1715 self.path.congestion.on_congestion_event(
1716 now,
1717 largest_lost_sent,
1718 in_persistent_congestion,
1719 size_of_lost_packets,
1720 );
1721 }
1722 }
1723
1724 if let Some(packet) = lost_mtu_probe {
1726 let info = self.spaces[SpaceId::Data].take(packet).unwrap(); self.remove_in_flight(packet, &info);
1728 self.path.mtud.on_probe_lost();
1729 self.stats.path.lost_plpmtud_probes += 1;
1730 }
1731 }
1732
1733 fn loss_time_and_space(&self) -> Option<(Instant, SpaceId)> {
1734 SpaceId::iter()
1735 .filter_map(|id| Some((self.spaces[id].loss_time?, id)))
1736 .min_by_key(|&(time, _)| time)
1737 }
1738
1739 fn pto_time_and_space(&self, now: Instant) -> Option<(Instant, SpaceId)> {
1740 let backoff = 2u32.pow(self.pto_count.min(MAX_BACKOFF_EXPONENT));
1741 let mut duration = self.path.rtt.pto_base() * backoff;
1742
1743 if self.path.in_flight.ack_eliciting == 0 {
1744 debug_assert!(!self.peer_completed_address_validation());
1745 let space = match self.highest_space {
1746 SpaceId::Handshake => SpaceId::Handshake,
1747 _ => SpaceId::Initial,
1748 };
1749 return Some((now + duration, space));
1750 }
1751
1752 let mut result = None;
1753 for space in SpaceId::iter() {
1754 if self.spaces[space].in_flight == 0 {
1755 continue;
1756 }
1757 if space == SpaceId::Data {
1758 if self.is_handshaking() {
1760 return result;
1761 }
1762 duration += self.ack_frequency.max_ack_delay_for_pto() * backoff;
1764 }
1765 let last_ack_eliciting = match self.spaces[space].time_of_last_ack_eliciting_packet {
1766 Some(time) => time,
1767 None => continue,
1768 };
1769 let pto = last_ack_eliciting + duration;
1770 if result.map_or(true, |(earliest_pto, _)| pto < earliest_pto) {
1771 result = Some((pto, space));
1772 }
1773 }
1774 result
1775 }
1776
1777 fn peer_completed_address_validation(&self) -> bool {
1778 if self.side.is_server() || self.state.is_closed() {
1779 return true;
1780 }
1781 self.spaces[SpaceId::Handshake]
1784 .largest_acked_packet
1785 .is_some()
1786 || self.spaces[SpaceId::Data].largest_acked_packet.is_some()
1787 || (self.spaces[SpaceId::Data].crypto.is_some()
1788 && self.spaces[SpaceId::Handshake].crypto.is_none())
1789 }
1790
1791 fn set_loss_detection_timer(&mut self, now: Instant) {
1792 if self.state.is_closed() {
1793 return;
1797 }
1798
1799 if let Some((loss_time, _)) = self.loss_time_and_space() {
1800 self.timers.set(Timer::LossDetection, loss_time);
1802 return;
1803 }
1804
1805 if self.path.anti_amplification_blocked(1) {
1806 self.timers.stop(Timer::LossDetection);
1808 return;
1809 }
1810
1811 if self.path.in_flight.ack_eliciting == 0 && self.peer_completed_address_validation() {
1812 self.timers.stop(Timer::LossDetection);
1815 return;
1816 }
1817
1818 if let Some((timeout, _)) = self.pto_time_and_space(now) {
1821 self.timers.set(Timer::LossDetection, timeout);
1822 } else {
1823 self.timers.stop(Timer::LossDetection);
1824 }
1825 }
1826
1827 fn pto(&self, space: SpaceId) -> Duration {
1829 let max_ack_delay = match space {
1830 SpaceId::Initial | SpaceId::Handshake => Duration::ZERO,
1831 SpaceId::Data => self.ack_frequency.max_ack_delay_for_pto(),
1832 };
1833 self.path.rtt.pto_base() + max_ack_delay
1834 }
1835
1836 fn on_packet_authenticated(
1837 &mut self,
1838 now: Instant,
1839 space_id: SpaceId,
1840 ecn: Option<EcnCodepoint>,
1841 packet: Option<u64>,
1842 spin: bool,
1843 is_1rtt: bool,
1844 ) {
1845 self.total_authed_packets += 1;
1846 self.reset_keep_alive(now);
1847 self.reset_idle_timeout(now, space_id);
1848 self.permit_idle_reset = true;
1849 self.receiving_ecn |= ecn.is_some();
1850 if let Some(x) = ecn {
1851 let space = &mut self.spaces[space_id];
1852 space.ecn_counters += x;
1853
1854 if x.is_ce() {
1855 space.pending_acks.set_immediate_ack_required();
1856 }
1857 }
1858
1859 let packet = match packet {
1860 Some(x) => x,
1861 None => return,
1862 };
1863 if self.side.is_server() {
1864 if self.spaces[SpaceId::Initial].crypto.is_some() && space_id == SpaceId::Handshake {
1865 self.discard_space(now, SpaceId::Initial);
1867 }
1868 if self.zero_rtt_crypto.is_some() && is_1rtt {
1869 self.set_key_discard_timer(now, space_id)
1871 }
1872 }
1873 let space = &mut self.spaces[space_id];
1874 space.pending_acks.insert_one(packet, now);
1875 if packet >= space.rx_packet {
1876 space.rx_packet = packet;
1877 self.spin = self.side.is_client() ^ spin;
1879 }
1880 }
1881
1882 fn reset_idle_timeout(&mut self, now: Instant, space: SpaceId) {
1883 let timeout = match self.idle_timeout {
1884 None => return,
1885 Some(dur) => dur,
1886 };
1887 if self.state.is_closed() {
1888 self.timers.stop(Timer::Idle);
1889 return;
1890 }
1891 let dt = cmp::max(timeout, 3 * self.pto(space));
1892 self.timers.set(Timer::Idle, now + dt);
1893 }
1894
1895 fn reset_keep_alive(&mut self, now: Instant) {
1896 let interval = match self.config.keep_alive_interval {
1897 Some(x) if self.state.is_established() => x,
1898 _ => return,
1899 };
1900 self.timers.set(Timer::KeepAlive, now + interval);
1901 }
1902
1903 fn reset_cid_retirement(&mut self) {
1904 if let Some(t) = self.local_cid_state.next_timeout() {
1905 self.timers.set(Timer::PushNewCid, t);
1906 }
1907 }
1908
1909 pub(crate) fn handle_first_packet(
1914 &mut self,
1915 now: Instant,
1916 remote: SocketAddr,
1917 ecn: Option<EcnCodepoint>,
1918 packet_number: u64,
1919 packet: InitialPacket,
1920 remaining: Option<BytesMut>,
1921 ) -> Result<(), ConnectionError> {
1922 let span = trace_span!("first recv");
1923 let _guard = span.enter();
1924 debug_assert!(self.side.is_server());
1925 let len = packet.header_data.len() + packet.payload.len();
1926 self.path.total_recvd = len as u64;
1927
1928 match self.state {
1929 State::Handshake(ref mut state) => {
1930 state.expected_token = packet.header.token.clone();
1931 }
1932 _ => unreachable!("first packet must be delivered in Handshake state"),
1933 }
1934
1935 self.on_packet_authenticated(
1936 now,
1937 SpaceId::Initial,
1938 ecn,
1939 Some(packet_number),
1940 false,
1941 false,
1942 );
1943
1944 self.process_decrypted_packet(now, remote, Some(packet_number), packet.into())?;
1945 if let Some(data) = remaining {
1946 self.handle_coalesced(now, remote, ecn, data);
1947 }
1948 Ok(())
1949 }
1950
1951 fn init_0rtt(&mut self) {
1952 let (header, packet) = match self.crypto.early_crypto() {
1953 Some(x) => x,
1954 None => return,
1955 };
1956 if self.side.is_client() {
1957 match self.crypto.transport_parameters() {
1958 Ok(params) => {
1959 let params = params
1960 .expect("crypto layer didn't supply transport parameters with ticket");
1961 let params = TransportParameters {
1963 initial_src_cid: None,
1964 original_dst_cid: None,
1965 preferred_address: None,
1966 retry_src_cid: None,
1967 stateless_reset_token: None,
1968 min_ack_delay: None,
1969 ack_delay_exponent: TransportParameters::default().ack_delay_exponent,
1970 max_ack_delay: TransportParameters::default().max_ack_delay,
1971 ..params
1972 };
1973 self.set_peer_params(params);
1974 }
1975 Err(e) => {
1976 error!("session ticket has malformed transport parameters: {}", e);
1977 return;
1978 }
1979 }
1980 }
1981 trace!("0-RTT enabled");
1982 self.zero_rtt_enabled = true;
1983 self.zero_rtt_crypto = Some(ZeroRttCrypto { header, packet });
1984 }
1985
1986 fn read_crypto(
1987 &mut self,
1988 space: SpaceId,
1989 crypto: &frame::Crypto,
1990 payload_len: usize,
1991 ) -> Result<(), TransportError> {
1992 let expected = if !self.state.is_handshake() {
1993 SpaceId::Data
1994 } else if self.highest_space == SpaceId::Initial {
1995 SpaceId::Initial
1996 } else {
1997 SpaceId::Handshake
2000 };
2001 debug_assert!(space <= expected, "received out-of-order CRYPTO data");
2005
2006 let end = crypto.offset + crypto.data.len() as u64;
2007 if space < expected && end > self.spaces[space].crypto_stream.bytes_read() {
2008 warn!(
2009 "received new {:?} CRYPTO data when expecting {:?}",
2010 space, expected
2011 );
2012 return Err(TransportError::PROTOCOL_VIOLATION(
2013 "new data at unexpected encryption level",
2014 ));
2015 }
2016
2017 let space = &mut self.spaces[space];
2018 let max = end.saturating_sub(space.crypto_stream.bytes_read());
2019 if max > self.config.crypto_buffer_size as u64 {
2020 return Err(TransportError::CRYPTO_BUFFER_EXCEEDED(""));
2021 }
2022
2023 space
2024 .crypto_stream
2025 .insert(crypto.offset, crypto.data.clone(), payload_len);
2026 while let Some(chunk) = space.crypto_stream.read(usize::MAX, true) {
2027 trace!("consumed {} CRYPTO bytes", chunk.bytes.len());
2028 if self.crypto.read_handshake(&chunk.bytes)? {
2029 self.events.push_back(Event::HandshakeDataReady);
2030 }
2031 }
2032
2033 Ok(())
2034 }
2035
2036 fn write_crypto(&mut self) {
2037 loop {
2038 let space = self.highest_space;
2039 let mut outgoing = Vec::new();
2040 if let Some(crypto) = self.crypto.write_handshake(&mut outgoing) {
2041 match space {
2042 SpaceId::Initial => {
2043 self.upgrade_crypto(SpaceId::Handshake, crypto);
2044 }
2045 SpaceId::Handshake => {
2046 self.upgrade_crypto(SpaceId::Data, crypto);
2047 }
2048 _ => unreachable!("got updated secrets during 1-RTT"),
2049 }
2050 }
2051 if outgoing.is_empty() {
2052 if space == self.highest_space {
2053 break;
2054 } else {
2055 continue;
2057 }
2058 }
2059 let offset = self.spaces[space].crypto_offset;
2060 let outgoing = Bytes::from(outgoing);
2061 if let State::Handshake(ref mut state) = self.state {
2062 if space == SpaceId::Initial && offset == 0 && self.side.is_client() {
2063 state.client_hello = Some(outgoing.clone());
2064 }
2065 }
2066 self.spaces[space].crypto_offset += outgoing.len() as u64;
2067 trace!("wrote {} {:?} CRYPTO bytes", outgoing.len(), space);
2068 self.spaces[space].pending.crypto.push_back(frame::Crypto {
2069 offset,
2070 data: outgoing,
2071 });
2072 }
2073 }
2074
2075 fn upgrade_crypto(&mut self, space: SpaceId, crypto: Keys) {
2077 debug_assert!(
2078 self.spaces[space].crypto.is_none(),
2079 "already reached packet space {space:?}"
2080 );
2081 trace!("{:?} keys ready", space);
2082 if space == SpaceId::Data {
2083 self.next_crypto = Some(
2085 self.crypto
2086 .next_1rtt_keys()
2087 .expect("handshake should be complete"),
2088 );
2089 }
2090
2091 self.spaces[space].crypto = Some(crypto);
2092 debug_assert!(space as usize > self.highest_space as usize);
2093 self.highest_space = space;
2094 if space == SpaceId::Data && self.side.is_client() {
2095 self.zero_rtt_crypto = None;
2097 }
2098 }
2099
2100 fn discard_space(&mut self, now: Instant, space_id: SpaceId) {
2101 debug_assert!(space_id != SpaceId::Data);
2102 trace!("discarding {:?} keys", space_id);
2103 if space_id == SpaceId::Initial {
2104 if let ConnectionSide::Client { token, .. } = &mut self.side {
2106 *token = Bytes::new();
2107 }
2108 }
2109 let space = &mut self.spaces[space_id];
2110 space.crypto = None;
2111 space.time_of_last_ack_eliciting_packet = None;
2112 space.loss_time = None;
2113 space.in_flight = 0;
2114 let sent_packets = mem::take(&mut space.sent_packets);
2115 for (pn, packet) in sent_packets.into_iter() {
2116 self.remove_in_flight(pn, &packet);
2117 }
2118 self.set_loss_detection_timer(now)
2119 }
2120
2121 fn handle_coalesced(
2122 &mut self,
2123 now: Instant,
2124 remote: SocketAddr,
2125 ecn: Option<EcnCodepoint>,
2126 data: BytesMut,
2127 ) {
2128 self.path.total_recvd = self.path.total_recvd.saturating_add(data.len() as u64);
2129 let mut remaining = Some(data);
2130 while let Some(data) = remaining {
2131 match PartialDecode::new(
2132 data,
2133 &FixedLengthConnectionIdParser::new(self.local_cid_state.cid_len()),
2134 &[self.version],
2135 self.endpoint_config.grease_quic_bit,
2136 ) {
2137 Ok((partial_decode, rest)) => {
2138 remaining = rest;
2139 self.handle_decode(now, remote, ecn, partial_decode);
2140 }
2141 Err(e) => {
2142 trace!("malformed header: {}", e);
2143 return;
2144 }
2145 }
2146 }
2147 }
2148
2149 fn handle_decode(
2150 &mut self,
2151 now: Instant,
2152 remote: SocketAddr,
2153 ecn: Option<EcnCodepoint>,
2154 partial_decode: PartialDecode,
2155 ) {
2156 if let Some(decoded) = packet_crypto::unprotect_header(
2157 partial_decode,
2158 &self.spaces,
2159 self.zero_rtt_crypto.as_ref(),
2160 self.peer_params.stateless_reset_token,
2161 ) {
2162 self.handle_packet(now, remote, ecn, decoded.packet, decoded.stateless_reset);
2163 }
2164 }
2165
2166 fn handle_packet(
2167 &mut self,
2168 now: Instant,
2169 remote: SocketAddr,
2170 ecn: Option<EcnCodepoint>,
2171 packet: Option<Packet>,
2172 stateless_reset: bool,
2173 ) {
2174 self.stats.udp_rx.ios += 1;
2175 if let Some(ref packet) = packet {
2176 trace!(
2177 "got {:?} packet ({} bytes) from {} using id {}",
2178 packet.header.space(),
2179 packet.payload.len() + packet.header_data.len(),
2180 remote,
2181 packet.header.dst_cid(),
2182 );
2183 }
2184
2185 if self.is_handshaking() && remote != self.path.remote {
2186 debug!("discarding packet with unexpected remote during handshake");
2187 return;
2188 }
2189
2190 let was_closed = self.state.is_closed();
2191 let was_drained = self.state.is_drained();
2192
2193 let decrypted = match packet {
2194 None => Err(None),
2195 Some(mut packet) => self
2196 .decrypt_packet(now, &mut packet)
2197 .map(move |number| (packet, number)),
2198 };
2199 let result = match decrypted {
2200 _ if stateless_reset => {
2201 debug!("got stateless reset");
2202 Err(ConnectionError::Reset)
2203 }
2204 Err(Some(e)) => {
2205 warn!("illegal packet: {}", e);
2206 Err(e.into())
2207 }
2208 Err(None) => {
2209 debug!("failed to authenticate packet");
2210 self.authentication_failures += 1;
2211 let integrity_limit = self.spaces[self.highest_space]
2212 .crypto
2213 .as_ref()
2214 .unwrap()
2215 .packet
2216 .local
2217 .integrity_limit();
2218 if self.authentication_failures > integrity_limit {
2219 Err(TransportError::AEAD_LIMIT_REACHED("integrity limit violated").into())
2220 } else {
2221 return;
2222 }
2223 }
2224 Ok((packet, number)) => {
2225 let span = match number {
2226 Some(pn) => trace_span!("recv", space = ?packet.header.space(), pn),
2227 None => trace_span!("recv", space = ?packet.header.space()),
2228 };
2229 let _guard = span.enter();
2230
2231 let is_duplicate = |n| self.spaces[packet.header.space()].dedup.insert(n);
2232 if number.is_some_and(is_duplicate) {
2233 debug!("discarding possible duplicate packet");
2234 return;
2235 } else if self.state.is_handshake() && packet.header.is_short() {
2236 trace!("dropping short packet during handshake");
2238 return;
2239 } else {
2240 if let Header::Initial(InitialHeader { ref token, .. }) = packet.header {
2241 if let State::Handshake(ref hs) = self.state {
2242 if self.side.is_server() && token != &hs.expected_token {
2243 warn!("discarding Initial with invalid retry token");
2247 return;
2248 }
2249 }
2250 }
2251
2252 if !self.state.is_closed() {
2253 let spin = match packet.header {
2254 Header::Short { spin, .. } => spin,
2255 _ => false,
2256 };
2257 self.on_packet_authenticated(
2258 now,
2259 packet.header.space(),
2260 ecn,
2261 number,
2262 spin,
2263 packet.header.is_1rtt(),
2264 );
2265 }
2266 self.process_decrypted_packet(now, remote, number, packet)
2267 }
2268 }
2269 };
2270
2271 if let Err(conn_err) = result {
2273 self.error = Some(conn_err.clone());
2274 self.state = match conn_err {
2275 ConnectionError::ApplicationClosed(reason) => State::closed(reason),
2276 ConnectionError::ConnectionClosed(reason) => State::closed(reason),
2277 ConnectionError::Reset
2278 | ConnectionError::TransportError(TransportError {
2279 code: TransportErrorCode::AEAD_LIMIT_REACHED,
2280 ..
2281 }) => State::Drained,
2282 ConnectionError::TimedOut => {
2283 unreachable!("timeouts aren't generated by packet processing");
2284 }
2285 ConnectionError::TransportError(err) => {
2286 debug!("closing connection due to transport error: {}", err);
2287 State::closed(err)
2288 }
2289 ConnectionError::VersionMismatch => State::Draining,
2290 ConnectionError::LocallyClosed => {
2291 unreachable!("LocallyClosed isn't generated by packet processing");
2292 }
2293 ConnectionError::CidsExhausted => {
2294 unreachable!("CidsExhausted isn't generated by packet processing");
2295 }
2296 };
2297 }
2298
2299 if !was_closed && self.state.is_closed() {
2300 self.close_common();
2301 if !self.state.is_drained() {
2302 self.set_close_timer(now);
2303 }
2304 }
2305 if !was_drained && self.state.is_drained() {
2306 self.endpoint_events.push_back(EndpointEventInner::Drained);
2307 self.timers.stop(Timer::Close);
2310 }
2311
2312 if let State::Closed(_) = self.state {
2314 self.close = remote == self.path.remote;
2315 }
2316 }
2317
2318 fn process_decrypted_packet(
2319 &mut self,
2320 now: Instant,
2321 remote: SocketAddr,
2322 number: Option<u64>,
2323 packet: Packet,
2324 ) -> Result<(), ConnectionError> {
2325 let state = match self.state {
2326 State::Established => {
2327 match packet.header.space() {
2328 SpaceId::Data => self.process_payload(now, remote, number.unwrap(), packet)?,
2329 _ if packet.header.has_frames() => self.process_early_payload(now, packet)?,
2330 _ => {
2331 trace!("discarding unexpected pre-handshake packet");
2332 }
2333 }
2334 return Ok(());
2335 }
2336 State::Closed(_) => {
2337 for result in frame::Iter::new(packet.payload.freeze())? {
2338 let frame = match result {
2339 Ok(frame) => frame,
2340 Err(err) => {
2341 debug!("frame decoding error: {err:?}");
2342 continue;
2343 }
2344 };
2345
2346 if let Frame::Padding = frame {
2347 continue;
2348 };
2349
2350 self.stats.frame_rx.record(&frame);
2351
2352 if let Frame::Close(_) = frame {
2353 trace!("draining");
2354 self.state = State::Draining;
2355 break;
2356 }
2357 }
2358 return Ok(());
2359 }
2360 State::Draining | State::Drained => return Ok(()),
2361 State::Handshake(ref mut state) => state,
2362 };
2363
2364 match packet.header {
2365 Header::Retry {
2366 src_cid: rem_cid, ..
2367 } => {
2368 if self.side.is_server() {
2369 return Err(TransportError::PROTOCOL_VIOLATION("client sent Retry").into());
2370 }
2371
2372 if self.total_authed_packets > 1
2373 || packet.payload.len() <= 16 || !self.crypto.is_valid_retry(
2375 &self.rem_cids.active(),
2376 &packet.header_data,
2377 &packet.payload,
2378 )
2379 {
2380 trace!("discarding invalid Retry");
2381 return Ok(());
2389 }
2390
2391 trace!("retrying with CID {}", rem_cid);
2392 let client_hello = state.client_hello.take().unwrap();
2393 self.retry_src_cid = Some(rem_cid);
2394 self.rem_cids.update_initial_cid(rem_cid);
2395 self.rem_handshake_cid = rem_cid;
2396
2397 let space = &mut self.spaces[SpaceId::Initial];
2398 if let Some(info) = space.take(0) {
2399 self.on_packet_acked(now, 0, info);
2400 };
2401
2402 self.discard_space(now, SpaceId::Initial); self.spaces[SpaceId::Initial] = PacketSpace {
2404 crypto: Some(self.crypto.initial_keys(&rem_cid, self.side.side())),
2405 next_packet_number: self.spaces[SpaceId::Initial].next_packet_number,
2406 crypto_offset: client_hello.len() as u64,
2407 ..PacketSpace::new(now)
2408 };
2409 self.spaces[SpaceId::Initial]
2410 .pending
2411 .crypto
2412 .push_back(frame::Crypto {
2413 offset: 0,
2414 data: client_hello,
2415 });
2416
2417 let zero_rtt = mem::take(&mut self.spaces[SpaceId::Data].sent_packets);
2419 for (pn, info) in zero_rtt {
2420 self.remove_in_flight(pn, &info);
2421 self.spaces[SpaceId::Data].pending |= info.retransmits;
2422 }
2423 self.streams.retransmit_all_for_0rtt();
2424
2425 let token_len = packet.payload.len() - 16;
2426 let ConnectionSide::Client { ref mut token, .. } = self.side else {
2427 unreachable!("we already short-circuited if we're server");
2428 };
2429 *token = packet.payload.freeze().split_to(token_len);
2430 self.state = State::Handshake(state::Handshake {
2431 expected_token: Bytes::new(),
2432 rem_cid_set: false,
2433 client_hello: None,
2434 });
2435 Ok(())
2436 }
2437 Header::Long {
2438 ty: LongType::Handshake,
2439 src_cid: rem_cid,
2440 ..
2441 } => {
2442 if rem_cid != self.rem_handshake_cid {
2443 debug!(
2444 "discarding packet with mismatched remote CID: {} != {}",
2445 self.rem_handshake_cid, rem_cid
2446 );
2447 return Ok(());
2448 }
2449 self.on_path_validated();
2450
2451 self.process_early_payload(now, packet)?;
2452 if self.state.is_closed() {
2453 return Ok(());
2454 }
2455
2456 if self.crypto.is_handshaking() {
2457 trace!("handshake ongoing");
2458 return Ok(());
2459 }
2460
2461 if self.side.is_client() {
2462 let params =
2464 self.crypto
2465 .transport_parameters()?
2466 .ok_or_else(|| TransportError {
2467 code: TransportErrorCode::crypto(0x6d),
2468 frame: None,
2469 reason: "transport parameters missing".into(),
2470 })?;
2471
2472 if self.has_0rtt() {
2473 if !self.crypto.early_data_accepted().unwrap() {
2474 debug_assert!(self.side.is_client());
2475 debug!("0-RTT rejected");
2476 self.accepted_0rtt = false;
2477 self.streams.zero_rtt_rejected();
2478
2479 self.spaces[SpaceId::Data].pending = Retransmits::default();
2481
2482 let sent_packets =
2484 mem::take(&mut self.spaces[SpaceId::Data].sent_packets);
2485 for (pn, packet) in sent_packets {
2486 self.remove_in_flight(pn, &packet);
2487 }
2488 } else {
2489 self.accepted_0rtt = true;
2490 params.validate_resumption_from(&self.peer_params)?;
2491 }
2492 }
2493 if let Some(token) = params.stateless_reset_token {
2494 self.endpoint_events
2495 .push_back(EndpointEventInner::ResetToken(self.path.remote, token));
2496 }
2497 self.handle_peer_params(params)?;
2498 self.issue_first_cids(now);
2499 } else {
2500 self.spaces[SpaceId::Data].pending.handshake_done = true;
2502 self.discard_space(now, SpaceId::Handshake);
2503 }
2504
2505 self.events.push_back(Event::Connected);
2506 self.state = State::Established;
2507 trace!("established");
2508 Ok(())
2509 }
2510 Header::Initial(InitialHeader {
2511 src_cid: rem_cid, ..
2512 }) => {
2513 if !state.rem_cid_set {
2514 trace!("switching remote CID to {}", rem_cid);
2515 let mut state = state.clone();
2516 self.rem_cids.update_initial_cid(rem_cid);
2517 self.rem_handshake_cid = rem_cid;
2518 self.orig_rem_cid = rem_cid;
2519 state.rem_cid_set = true;
2520 self.state = State::Handshake(state);
2521 } else if rem_cid != self.rem_handshake_cid {
2522 debug!(
2523 "discarding packet with mismatched remote CID: {} != {}",
2524 self.rem_handshake_cid, rem_cid
2525 );
2526 return Ok(());
2527 }
2528
2529 let starting_space = self.highest_space;
2530 self.process_early_payload(now, packet)?;
2531
2532 if self.side.is_server()
2533 && starting_space == SpaceId::Initial
2534 && self.highest_space != SpaceId::Initial
2535 {
2536 let params =
2537 self.crypto
2538 .transport_parameters()?
2539 .ok_or_else(|| TransportError {
2540 code: TransportErrorCode::crypto(0x6d),
2541 frame: None,
2542 reason: "transport parameters missing".into(),
2543 })?;
2544 self.handle_peer_params(params)?;
2545 self.issue_first_cids(now);
2546 self.init_0rtt();
2547 }
2548 Ok(())
2549 }
2550 Header::Long {
2551 ty: LongType::ZeroRtt,
2552 ..
2553 } => {
2554 self.process_payload(now, remote, number.unwrap(), packet)?;
2555 Ok(())
2556 }
2557 Header::VersionNegotiate { .. } => {
2558 if self.total_authed_packets > 1 {
2559 return Ok(());
2560 }
2561 let supported = packet
2562 .payload
2563 .chunks(4)
2564 .any(|x| match <[u8; 4]>::try_from(x) {
2565 Ok(version) => self.version == u32::from_be_bytes(version),
2566 Err(_) => false,
2567 });
2568 if supported {
2569 return Ok(());
2570 }
2571 debug!("remote doesn't support our version");
2572 Err(ConnectionError::VersionMismatch)
2573 }
2574 Header::Short { .. } => unreachable!(
2575 "short packets received during handshake are discarded in handle_packet"
2576 ),
2577 }
2578 }
2579
2580 fn process_early_payload(
2582 &mut self,
2583 now: Instant,
2584 packet: Packet,
2585 ) -> Result<(), TransportError> {
2586 debug_assert_ne!(packet.header.space(), SpaceId::Data);
2587 let payload_len = packet.payload.len();
2588 let mut ack_eliciting = false;
2589 for result in frame::Iter::new(packet.payload.freeze())? {
2590 let frame = result?;
2591 let span = match frame {
2592 Frame::Padding => continue,
2593 _ => Some(trace_span!("frame", ty = %frame.ty())),
2594 };
2595
2596 self.stats.frame_rx.record(&frame);
2597
2598 let _guard = span.as_ref().map(|x| x.enter());
2599 ack_eliciting |= frame.is_ack_eliciting();
2600
2601 match frame {
2603 Frame::Padding | Frame::Ping => {}
2604 Frame::Crypto(frame) => {
2605 self.read_crypto(packet.header.space(), &frame, payload_len)?;
2606 }
2607 Frame::Ack(ack) => {
2608 self.on_ack_received(now, packet.header.space(), ack)?;
2609 }
2610 Frame::Close(reason) => {
2611 self.error = Some(reason.into());
2612 self.state = State::Draining;
2613 return Ok(());
2614 }
2615 _ => {
2616 let mut err =
2617 TransportError::PROTOCOL_VIOLATION("illegal frame type in handshake");
2618 err.frame = Some(frame.ty());
2619 return Err(err);
2620 }
2621 }
2622 }
2623
2624 if ack_eliciting {
2625 self.spaces[packet.header.space()]
2627 .pending_acks
2628 .set_immediate_ack_required();
2629 }
2630
2631 self.write_crypto();
2632 Ok(())
2633 }
2634
2635 fn process_payload(
2636 &mut self,
2637 now: Instant,
2638 remote: SocketAddr,
2639 number: u64,
2640 packet: Packet,
2641 ) -> Result<(), TransportError> {
2642 let payload = packet.payload.freeze();
2643 let mut is_probing_packet = true;
2644 let mut close = None;
2645 let payload_len = payload.len();
2646 let mut ack_eliciting = false;
2647 let mut migration_observed_addr = None;
2650 for result in frame::Iter::new(payload)? {
2651 let frame = result?;
2652 let span = match frame {
2653 Frame::Padding => continue,
2654 _ => Some(trace_span!("frame", ty = %frame.ty())),
2655 };
2656
2657 self.stats.frame_rx.record(&frame);
2658 match &frame {
2661 Frame::Crypto(f) => {
2662 trace!(offset = f.offset, len = f.data.len(), "got crypto frame");
2663 }
2664 Frame::Stream(f) => {
2665 trace!(id = %f.id, offset = f.offset, len = f.data.len(), fin = f.fin, "got stream frame");
2666 }
2667 Frame::Datagram(f) => {
2668 trace!(len = f.data.len(), "got datagram frame");
2669 }
2670 f => {
2671 trace!("got frame {:?}", f);
2672 }
2673 }
2674
2675 let _guard = span.as_ref().map(|x| x.enter());
2676 if packet.header.is_0rtt() {
2677 match frame {
2678 Frame::Crypto(_) | Frame::Close(Close::Application(_)) => {
2679 return Err(TransportError::PROTOCOL_VIOLATION(
2680 "illegal frame type in 0-RTT",
2681 ));
2682 }
2683 _ => {}
2684 }
2685 }
2686 ack_eliciting |= frame.is_ack_eliciting();
2687
2688 match frame {
2690 Frame::Padding
2691 | Frame::PathChallenge(_)
2692 | Frame::PathResponse(_)
2693 | Frame::NewConnectionId(_)
2694 | Frame::ObservedAddr(_) => {}
2695 _ => {
2696 is_probing_packet = false;
2697 }
2698 }
2699 match frame {
2700 Frame::Crypto(frame) => {
2701 self.read_crypto(SpaceId::Data, &frame, payload_len)?;
2702 }
2703 Frame::Stream(frame) => {
2704 if self.streams.received(frame, payload_len)?.should_transmit() {
2705 self.spaces[SpaceId::Data].pending.max_data = true;
2706 }
2707 }
2708 Frame::Ack(ack) => {
2709 self.on_ack_received(now, SpaceId::Data, ack)?;
2710 }
2711 Frame::Padding | Frame::Ping => {}
2712 Frame::Close(reason) => {
2713 close = Some(reason);
2714 }
2715 Frame::PathChallenge(token) => {
2716 self.path_responses.push(number, token, remote);
2717 if remote == self.path.remote {
2718 match self.peer_supports_ack_frequency() {
2721 true => self.immediate_ack(),
2722 false => self.ping(),
2723 }
2724 }
2725 }
2726 Frame::PathResponse(token) => {
2727 if self.path.challenge == Some(token) && remote == self.path.remote {
2728 trace!("new path validated");
2729 self.timers.stop(Timer::PathValidation);
2730 self.path.challenge = None;
2731 self.path.validated = true;
2732 if let Some((_, ref mut prev_path)) = self.prev_path {
2733 prev_path.challenge = None;
2734 prev_path.challenge_pending = false;
2735 }
2736 } else {
2737 debug!(token, "ignoring invalid PATH_RESPONSE");
2738 }
2739 }
2740 Frame::MaxData(bytes) => {
2741 self.streams.received_max_data(bytes);
2742 }
2743 Frame::MaxStreamData { id, offset } => {
2744 self.streams.received_max_stream_data(id, offset)?;
2745 }
2746 Frame::MaxStreams { dir, count } => {
2747 self.streams.received_max_streams(dir, count)?;
2748 }
2749 Frame::ResetStream(frame) => {
2750 if self.streams.received_reset(frame)?.should_transmit() {
2751 self.spaces[SpaceId::Data].pending.max_data = true;
2752 }
2753 }
2754 Frame::DataBlocked { offset } => {
2755 debug!(offset, "peer claims to be blocked at connection level");
2756 }
2757 Frame::StreamDataBlocked { id, offset } => {
2758 if id.initiator() == self.side.side() && id.dir() == Dir::Uni {
2759 debug!("got STREAM_DATA_BLOCKED on send-only {}", id);
2760 return Err(TransportError::STREAM_STATE_ERROR(
2761 "STREAM_DATA_BLOCKED on send-only stream",
2762 ));
2763 }
2764 debug!(
2765 stream = %id,
2766 offset, "peer claims to be blocked at stream level"
2767 );
2768 }
2769 Frame::StreamsBlocked { dir, limit } => {
2770 if limit > MAX_STREAM_COUNT {
2771 return Err(TransportError::FRAME_ENCODING_ERROR(
2772 "unrepresentable stream limit",
2773 ));
2774 }
2775 debug!(
2776 "peer claims to be blocked opening more than {} {} streams",
2777 limit, dir
2778 );
2779 }
2780 Frame::StopSending(frame::StopSending { id, error_code }) => {
2781 if id.initiator() != self.side.side() {
2782 if id.dir() == Dir::Uni {
2783 debug!("got STOP_SENDING on recv-only {}", id);
2784 return Err(TransportError::STREAM_STATE_ERROR(
2785 "STOP_SENDING on recv-only stream",
2786 ));
2787 }
2788 } else if self.streams.is_local_unopened(id) {
2789 return Err(TransportError::STREAM_STATE_ERROR(
2790 "STOP_SENDING on unopened stream",
2791 ));
2792 }
2793 self.streams.received_stop_sending(id, error_code);
2794 }
2795 Frame::RetireConnectionId { sequence } => {
2796 let allow_more_cids = self
2797 .local_cid_state
2798 .on_cid_retirement(sequence, self.peer_params.issue_cids_limit())?;
2799 self.endpoint_events
2800 .push_back(EndpointEventInner::RetireConnectionId(
2801 now,
2802 sequence,
2803 allow_more_cids,
2804 ));
2805 }
2806 Frame::NewConnectionId(frame) => {
2807 trace!(
2808 sequence = frame.sequence,
2809 id = %frame.id,
2810 retire_prior_to = frame.retire_prior_to,
2811 );
2812 if self.rem_cids.active().is_empty() {
2813 return Err(TransportError::PROTOCOL_VIOLATION(
2814 "NEW_CONNECTION_ID when CIDs aren't in use",
2815 ));
2816 }
2817 if frame.retire_prior_to > frame.sequence {
2818 return Err(TransportError::PROTOCOL_VIOLATION(
2819 "NEW_CONNECTION_ID retiring unissued CIDs",
2820 ));
2821 }
2822
2823 use crate::cid_queue::InsertError;
2824 match self.rem_cids.insert(frame) {
2825 Ok(None) => {}
2826 Ok(Some((retired, reset_token))) => {
2827 let pending_retired =
2828 &mut self.spaces[SpaceId::Data].pending.retire_cids;
2829 const MAX_PENDING_RETIRED_CIDS: u64 = CidQueue::LEN as u64 * 10;
2832 if (pending_retired.len() as u64)
2835 .saturating_add(retired.end.saturating_sub(retired.start))
2836 > MAX_PENDING_RETIRED_CIDS
2837 {
2838 return Err(TransportError::CONNECTION_ID_LIMIT_ERROR(
2839 "queued too many retired CIDs",
2840 ));
2841 }
2842 pending_retired.extend(retired);
2843 self.set_reset_token(reset_token);
2844 }
2845 Err(InsertError::ExceedsLimit) => {
2846 return Err(TransportError::CONNECTION_ID_LIMIT_ERROR(""));
2847 }
2848 Err(InsertError::Retired) => {
2849 trace!("discarding already-retired");
2850 self.spaces[SpaceId::Data]
2854 .pending
2855 .retire_cids
2856 .push(frame.sequence);
2857 continue;
2858 }
2859 };
2860
2861 if self.side.is_server() && self.rem_cids.active_seq() == 0 {
2862 self.update_rem_cid();
2865 }
2866 }
2867 Frame::NewToken(NewToken { token }) => {
2868 let ConnectionSide::Client {
2869 token_store,
2870 server_name,
2871 ..
2872 } = &self.side
2873 else {
2874 return Err(TransportError::PROTOCOL_VIOLATION("client sent NEW_TOKEN"));
2875 };
2876 if token.is_empty() {
2877 return Err(TransportError::FRAME_ENCODING_ERROR("empty token"));
2878 }
2879 trace!("got new token");
2880 token_store.insert(server_name, token);
2881 }
2882 Frame::Datagram(datagram) => {
2883 if self
2884 .datagrams
2885 .received(datagram, &self.config.datagram_receive_buffer_size)?
2886 {
2887 self.events.push_back(Event::DatagramReceived);
2888 }
2889 }
2890 Frame::AckFrequency(ack_frequency) => {
2891 let space = &mut self.spaces[SpaceId::Data];
2893
2894 if !self
2895 .ack_frequency
2896 .ack_frequency_received(&ack_frequency, &mut space.pending_acks)?
2897 {
2898 continue;
2900 }
2901
2902 if let Some(timeout) = space
2905 .pending_acks
2906 .max_ack_delay_timeout(self.ack_frequency.max_ack_delay)
2907 {
2908 self.timers.set(Timer::MaxAckDelay, timeout);
2909 }
2910 }
2911 Frame::ImmediateAck => {
2912 self.spaces[SpaceId::Data]
2914 .pending_acks
2915 .set_immediate_ack_required();
2916 }
2917 Frame::HandshakeDone => {
2918 if self.side.is_server() {
2919 return Err(TransportError::PROTOCOL_VIOLATION(
2920 "client sent HANDSHAKE_DONE",
2921 ));
2922 }
2923 if self.spaces[SpaceId::Handshake].crypto.is_some() {
2924 self.discard_space(now, SpaceId::Handshake);
2925 }
2926 }
2927 Frame::ObservedAddr(observed) => {
2928 if !self
2930 .peer_params
2931 .address_discovery_role
2932 .should_report(&self.config.address_discovery_role)
2933 {
2934 return Err(TransportError::PROTOCOL_VIOLATION(
2935 "received OBSERVED_ADDRESS frame when not negotiated",
2936 ));
2937 }
2938 if packet.header.space() != SpaceId::Data {
2940 return Err(TransportError::PROTOCOL_VIOLATION(
2941 "OBSERVED_ADDRESS frame outside data space",
2942 ));
2943 }
2944
2945 if remote == self.path.remote {
2946 if let Some(updated) = self.path.update_observed_addr_report(observed) {
2947 self.events.push_back(Event::ObservedAddr(updated));
2948 }
2949 } else {
2950 migration_observed_addr = Some(observed)
2952 }
2953 }
2954 }
2955 }
2956
2957 let space = &mut self.spaces[SpaceId::Data];
2958 if space
2959 .pending_acks
2960 .packet_received(now, number, ack_eliciting, &space.dedup)
2961 {
2962 self.timers
2963 .set(Timer::MaxAckDelay, now + self.ack_frequency.max_ack_delay);
2964 }
2965
2966 let pending = &mut self.spaces[SpaceId::Data].pending;
2971 self.streams.queue_max_stream_id(pending);
2972
2973 if let Some(reason) = close {
2974 self.error = Some(reason.into());
2975 self.state = State::Draining;
2976 self.close = true;
2977 }
2978
2979 if remote != self.path.remote
2980 && !is_probing_packet
2981 && number == self.spaces[SpaceId::Data].rx_packet
2982 {
2983 let ConnectionSide::Server { ref server_config } = self.side else {
2984 panic!("packets from unknown remote should be dropped by clients");
2985 };
2986 debug_assert!(
2987 server_config.migration,
2988 "migration-initiating packets should have been dropped immediately"
2989 );
2990 self.migrate(now, remote, migration_observed_addr);
2991 self.update_rem_cid();
2993 self.spin = false;
2994 }
2995
2996 Ok(())
2997 }
2998
2999 fn migrate(&mut self, now: Instant, remote: SocketAddr, observed_addr: Option<ObservedAddr>) {
3000 trace!(%remote, "migration initiated");
3001 let mut new_path = if remote.is_ipv4() && remote.ip() == self.path.remote.ip() {
3005 PathData::from_previous(remote, &self.path, now)
3006 } else {
3007 let peer_max_udp_payload_size =
3008 u16::try_from(self.peer_params.max_udp_payload_size.into_inner())
3009 .unwrap_or(u16::MAX);
3010 PathData::new(
3011 remote,
3012 self.allow_mtud,
3013 Some(peer_max_udp_payload_size),
3014 now,
3015 &self.config,
3016 )
3017 };
3018 new_path.last_observed_addr_report = self.path.last_observed_addr_report.clone();
3019 if let Some(report) = observed_addr {
3020 if let Some(updated) = new_path.update_observed_addr_report(report) {
3021 self.events.push_back(Event::ObservedAddr(updated));
3022 }
3023 }
3024 new_path.challenge = Some(self.rng.gen());
3025 new_path.challenge_pending = true;
3026 let prev_pto = self.pto(SpaceId::Data);
3027
3028 let mut prev = mem::replace(&mut self.path, new_path);
3029 if prev.challenge.is_none() {
3031 prev.challenge = Some(self.rng.gen());
3032 prev.challenge_pending = true;
3033 self.prev_path = Some((self.rem_cids.active(), prev));
3036 }
3037
3038 self.timers.set(
3039 Timer::PathValidation,
3040 now + 3 * cmp::max(self.pto(SpaceId::Data), prev_pto),
3041 );
3042 }
3043
    /// Reacts to a change of the local address.
    ///
    /// Switches to the next remote connection ID (if one is available) and queues a PING.
    /// NOTE(review): presumably the PING is there so the peer promptly sees a packet from the
    /// new address — confirm against the caller's expectations.
    pub fn local_address_changed(&mut self) {
        self.update_rem_cid();
        self.ping();
    }
3049
3050 fn update_rem_cid(&mut self) {
3052 let (reset_token, retired) = match self.rem_cids.next() {
3053 Some(x) => x,
3054 None => return,
3055 };
3056
3057 self.spaces[SpaceId::Data]
3059 .pending
3060 .retire_cids
3061 .extend(retired);
3062 self.set_reset_token(reset_token);
3063 }
3064
3065 fn set_reset_token(&mut self, reset_token: ResetToken) {
3066 self.endpoint_events
3067 .push_back(EndpointEventInner::ResetToken(
3068 self.path.remote,
3069 reset_token,
3070 ));
3071 self.peer_params.stateless_reset_token = Some(reset_token);
3072 }
3073
3074 fn issue_first_cids(&mut self, now: Instant) {
3076 if self.local_cid_state.cid_len() == 0 {
3077 return;
3078 }
3079
3080 let n = self.peer_params.issue_cids_limit() - 1;
3082 self.endpoint_events
3083 .push_back(EndpointEventInner::NeedIdentifiers(now, n));
3084 }
3085
3086 fn populate_packet(
3087 &mut self,
3088 now: Instant,
3089 space_id: SpaceId,
3090 buf: &mut Vec<u8>,
3091 max_size: usize,
3092 pn: u64,
3093 ) -> SentFrames {
3094 let mut sent = SentFrames::default();
3095 let space = &mut self.spaces[space_id];
3096 let is_0rtt = space_id == SpaceId::Data && space.crypto.is_none();
3097 space.pending_acks.maybe_ack_non_eliciting();
3098
3099 if !is_0rtt && mem::replace(&mut space.pending.handshake_done, false) {
3101 buf.write(frame::FrameType::HANDSHAKE_DONE);
3102 sent.retransmits.get_or_create().handshake_done = true;
3103 self.stats.frame_tx.handshake_done =
3105 self.stats.frame_tx.handshake_done.saturating_add(1);
3106 }
3107
3108 let mut send_observed_address =
3110 |space_id: SpaceId,
3111 buf: &mut Vec<u8>,
3112 max_size: usize,
3113 space: &mut PacketSpace,
3114 sent: &mut SentFrames,
3115 stats: &mut ConnectionStats,
3116 skip_sent_check: bool| {
3117 let send_allowed = self
3121 .config
3122 .address_discovery_role
3123 .should_report(&self.peer_params.address_discovery_role);
3124 let send_required =
3125 space.pending.observed_addr || !self.path.observed_addr_sent || skip_sent_check;
3126 if space_id != SpaceId::Data || !send_allowed || !send_required {
3127 return;
3128 }
3129
3130 let observed =
3131 frame::ObservedAddr::new(self.path.remote, self.next_observed_addr_seq_no);
3132
3133 if buf.len() + observed.size() < max_size {
3134 observed.write(buf);
3135
3136 self.next_observed_addr_seq_no =
3137 self.next_observed_addr_seq_no.saturating_add(1u8);
3138 self.path.observed_addr_sent = true;
3139
3140 stats.frame_tx.observed_addr += 1;
3141 sent.retransmits.get_or_create().observed_addr = true;
3142 space.pending.observed_addr = false;
3143 }
3144 };
3145 send_observed_address(
3146 space_id,
3147 buf,
3148 max_size,
3149 space,
3150 &mut sent,
3151 &mut self.stats,
3152 false,
3153 );
3154
3155 if mem::replace(&mut space.ping_pending, false) {
3157 trace!("PING");
3158 buf.write(frame::FrameType::PING);
3159 sent.non_retransmits = true;
3160 self.stats.frame_tx.ping += 1;
3161 }
3162
3163 if mem::replace(&mut space.immediate_ack_pending, false) {
3165 trace!("IMMEDIATE_ACK");
3166 buf.write(frame::FrameType::IMMEDIATE_ACK);
3167 sent.non_retransmits = true;
3168 self.stats.frame_tx.immediate_ack += 1;
3169 }
3170
3171 if space.pending_acks.can_send() {
3173 Self::populate_acks(
3174 now,
3175 self.receiving_ecn,
3176 &mut sent,
3177 space,
3178 buf,
3179 &mut self.stats,
3180 );
3181 }
3182
3183 if mem::replace(&mut space.pending.ack_frequency, false) {
3185 let sequence_number = self.ack_frequency.next_sequence_number();
3186
3187 let config = self.config.ack_frequency_config.as_ref().unwrap();
3189
3190 let max_ack_delay = self.ack_frequency.candidate_max_ack_delay(
3192 self.path.rtt.get(),
3193 config,
3194 &self.peer_params,
3195 );
3196
3197 trace!(?max_ack_delay, "ACK_FREQUENCY");
3198
3199 frame::AckFrequency {
3200 sequence: sequence_number,
3201 ack_eliciting_threshold: config.ack_eliciting_threshold,
3202 request_max_ack_delay: max_ack_delay.as_micros().try_into().unwrap_or(VarInt::MAX),
3203 reordering_threshold: config.reordering_threshold,
3204 }
3205 .encode(buf);
3206
3207 sent.retransmits.get_or_create().ack_frequency = true;
3208
3209 self.ack_frequency.ack_frequency_sent(pn, max_ack_delay);
3210 self.stats.frame_tx.ack_frequency += 1;
3211 }
3212
3213 if buf.len() + 9 < max_size && space_id == SpaceId::Data {
3215 if let Some(token) = self.path.challenge {
3217 self.path.challenge_pending = false;
3219 sent.non_retransmits = true;
3220 sent.requires_padding = true;
3221 trace!("PATH_CHALLENGE {:08x}", token);
3222 buf.write(frame::FrameType::PATH_CHALLENGE);
3223 buf.write(token);
3224
3225 send_observed_address(
3226 space_id,
3227 buf,
3228 max_size,
3229 space,
3230 &mut sent,
3231 &mut self.stats,
3232 true,
3233 );
3234 }
3235 }
3236
3237 if buf.len() + 9 < max_size && space_id == SpaceId::Data {
3239 if let Some(token) = self.path_responses.pop_on_path(self.path.remote) {
3240 sent.non_retransmits = true;
3241 sent.requires_padding = true;
3242 trace!("PATH_RESPONSE {:08x}", token);
3243 buf.write(frame::FrameType::PATH_RESPONSE);
3244 buf.write(token);
3245 self.stats.frame_tx.path_response += 1;
3246
3247 send_observed_address(
3251 space_id,
3252 buf,
3253 max_size,
3254 space,
3255 &mut sent,
3256 &mut self.stats,
3257 true,
3258 );
3259 }
3260 }
3261
3262 while buf.len() + frame::Crypto::SIZE_BOUND < max_size && !is_0rtt {
3264 let mut frame = match space.pending.crypto.pop_front() {
3265 Some(x) => x,
3266 None => break,
3267 };
3268
3269 let max_crypto_data_size = max_size
3274 - buf.len()
3275 - 1 - VarInt::size(unsafe { VarInt::from_u64_unchecked(frame.offset) })
3277 - 2; let len = frame
3280 .data
3281 .len()
3282 .min(2usize.pow(14) - 1)
3283 .min(max_crypto_data_size);
3284
3285 let data = frame.data.split_to(len);
3286 let truncated = frame::Crypto {
3287 offset: frame.offset,
3288 data,
3289 };
3290 trace!(
3291 "CRYPTO: off {} len {}",
3292 truncated.offset,
3293 truncated.data.len()
3294 );
3295 truncated.encode(buf);
3296 self.stats.frame_tx.crypto += 1;
3297 sent.retransmits.get_or_create().crypto.push_back(truncated);
3298 if !frame.data.is_empty() {
3299 frame.offset += len as u64;
3300 space.pending.crypto.push_front(frame);
3301 }
3302 }
3303
3304 if space_id == SpaceId::Data {
3305 self.streams.write_control_frames(
3306 buf,
3307 &mut space.pending,
3308 &mut sent.retransmits,
3309 &mut self.stats.frame_tx,
3310 max_size,
3311 );
3312 }
3313
3314 while buf.len() + 44 < max_size {
3316 let issued = match space.pending.new_cids.pop() {
3317 Some(x) => x,
3318 None => break,
3319 };
3320 trace!(
3321 sequence = issued.sequence,
3322 id = %issued.id,
3323 "NEW_CONNECTION_ID"
3324 );
3325 frame::NewConnectionId {
3326 sequence: issued.sequence,
3327 retire_prior_to: self.local_cid_state.retire_prior_to(),
3328 id: issued.id,
3329 reset_token: issued.reset_token,
3330 }
3331 .encode(buf);
3332 sent.retransmits.get_or_create().new_cids.push(issued);
3333 self.stats.frame_tx.new_connection_id += 1;
3334 }
3335
3336 while buf.len() + frame::RETIRE_CONNECTION_ID_SIZE_BOUND < max_size {
3338 let seq = match space.pending.retire_cids.pop() {
3339 Some(x) => x,
3340 None => break,
3341 };
3342 trace!(sequence = seq, "RETIRE_CONNECTION_ID");
3343 buf.write(frame::FrameType::RETIRE_CONNECTION_ID);
3344 buf.write_var(seq);
3345 sent.retransmits.get_or_create().retire_cids.push(seq);
3346 self.stats.frame_tx.retire_connection_id += 1;
3347 }
3348
3349 let mut sent_datagrams = false;
3351 while buf.len() + Datagram::SIZE_BOUND < max_size && space_id == SpaceId::Data {
3352 match self.datagrams.write(buf, max_size) {
3353 true => {
3354 sent_datagrams = true;
3355 sent.non_retransmits = true;
3356 self.stats.frame_tx.datagram += 1;
3357 }
3358 false => break,
3359 }
3360 }
3361 if self.datagrams.send_blocked && sent_datagrams {
3362 self.events.push_back(Event::DatagramsUnblocked);
3363 self.datagrams.send_blocked = false;
3364 }
3365
3366 while let Some(remote_addr) = space.pending.new_tokens.pop() {
3368 debug_assert_eq!(space_id, SpaceId::Data);
3369 let ConnectionSide::Server { server_config } = &self.side else {
3370 panic!("NEW_TOKEN frames should not be enqueued by clients");
3371 };
3372
3373 if remote_addr != self.path.remote {
3374 continue;
3379 }
3380
3381 let token = Token::new(
3382 TokenPayload::Validation {
3383 ip: remote_addr.ip(),
3384 issued: server_config.time_source.now(),
3385 },
3386 &mut self.rng,
3387 );
3388 let new_token = NewToken {
3389 token: token.encode(&*server_config.token_key).into(),
3390 };
3391
3392 if buf.len() + new_token.size() >= max_size {
3393 space.pending.new_tokens.push(remote_addr);
3394 break;
3395 }
3396
3397 new_token.encode(buf);
3398 sent.retransmits
3399 .get_or_create()
3400 .new_tokens
3401 .push(remote_addr);
3402 self.stats.frame_tx.new_token += 1;
3403 }
3404
3405 if space_id == SpaceId::Data {
3407 sent.stream_frames =
3408 self.streams
3409 .write_stream_frames(buf, max_size, self.config.send_fairness);
3410 self.stats.frame_tx.stream += sent.stream_frames.len() as u64;
3411 }
3412
3413 sent
3414 }
3415
3416 fn populate_acks(
3421 now: Instant,
3422 receiving_ecn: bool,
3423 sent: &mut SentFrames,
3424 space: &mut PacketSpace,
3425 buf: &mut Vec<u8>,
3426 stats: &mut ConnectionStats,
3427 ) {
3428 debug_assert!(!space.pending_acks.ranges().is_empty());
3429
3430 debug_assert!(space.crypto.is_some(), "tried to send ACK in 0-RTT");
3432 let ecn = if receiving_ecn {
3433 Some(&space.ecn_counters)
3434 } else {
3435 None
3436 };
3437 sent.largest_acked = space.pending_acks.ranges().max();
3438
3439 let delay_micros = space.pending_acks.ack_delay(now).as_micros() as u64;
3440
3441 let ack_delay_exp = TransportParameters::default().ack_delay_exponent;
3443 let delay = delay_micros >> ack_delay_exp.into_inner();
3444
3445 trace!(
3446 "ACK {:?}, Delay = {}us",
3447 space.pending_acks.ranges(),
3448 delay_micros
3449 );
3450
3451 frame::Ack::encode(delay as _, space.pending_acks.ranges(), ecn, buf);
3452 stats.frame_tx.acks += 1;
3453 }
3454
3455 fn close_common(&mut self) {
3456 trace!("connection closed");
3457 for &timer in &Timer::VALUES {
3458 self.timers.stop(timer);
3459 }
3460 }
3461
3462 fn set_close_timer(&mut self, now: Instant) {
3463 self.timers
3464 .set(Timer::Close, now + 3 * self.pto(self.highest_space));
3465 }
3466
3467 fn handle_peer_params(&mut self, params: TransportParameters) -> Result<(), TransportError> {
3469 if Some(self.orig_rem_cid) != params.initial_src_cid
3470 || (self.side.is_client()
3471 && (Some(self.initial_dst_cid) != params.original_dst_cid
3472 || self.retry_src_cid != params.retry_src_cid))
3473 {
3474 return Err(TransportError::TRANSPORT_PARAMETER_ERROR(
3475 "CID authentication failure",
3476 ));
3477 }
3478
3479 self.set_peer_params(params);
3480
3481 Ok(())
3482 }
3483
3484 fn set_peer_params(&mut self, params: TransportParameters) {
3485 self.streams.set_params(¶ms);
3486 self.idle_timeout =
3487 negotiate_max_idle_timeout(self.config.max_idle_timeout, Some(params.max_idle_timeout));
3488 trace!("negotiated max idle timeout {:?}", self.idle_timeout);
3489 if let Some(ref info) = params.preferred_address {
3490 self.rem_cids.insert(frame::NewConnectionId {
3491 sequence: 1,
3492 id: info.connection_id,
3493 reset_token: info.stateless_reset_token,
3494 retire_prior_to: 0,
3495 }).expect("preferred address CID is the first received, and hence is guaranteed to be legal");
3496 }
3497 self.ack_frequency.peer_max_ack_delay = get_max_ack_delay(¶ms);
3498 self.peer_params = params;
3499 self.path.mtud.on_peer_max_udp_payload_size_received(
3500 u16::try_from(self.peer_params.max_udp_payload_size.into_inner()).unwrap_or(u16::MAX),
3501 );
3502 }
3503
3504 fn decrypt_packet(
3505 &mut self,
3506 now: Instant,
3507 packet: &mut Packet,
3508 ) -> Result<Option<u64>, Option<TransportError>> {
3509 let result = packet_crypto::decrypt_packet_body(
3510 packet,
3511 &self.spaces,
3512 self.zero_rtt_crypto.as_ref(),
3513 self.key_phase,
3514 self.prev_crypto.as_ref(),
3515 self.next_crypto.as_ref(),
3516 )?;
3517
3518 let result = match result {
3519 Some(r) => r,
3520 None => return Ok(None),
3521 };
3522
3523 if result.outgoing_key_update_acked {
3524 if let Some(prev) = self.prev_crypto.as_mut() {
3525 prev.end_packet = Some((result.number, now));
3526 self.set_key_discard_timer(now, packet.header.space());
3527 }
3528 }
3529
3530 if result.incoming_key_update {
3531 trace!("key update authenticated");
3532 self.update_keys(Some((result.number, now)), true);
3533 self.set_key_discard_timer(now, packet.header.space());
3534 }
3535
3536 Ok(Some(result.number))
3537 }
3538
3539 fn update_keys(&mut self, end_packet: Option<(u64, Instant)>, remote: bool) {
3540 trace!("executing key update");
3541 let new = self
3545 .crypto
3546 .next_1rtt_keys()
3547 .expect("only called for `Data` packets");
3548 self.key_phase_size = new
3549 .local
3550 .confidentiality_limit()
3551 .saturating_sub(KEY_UPDATE_MARGIN);
3552 let old = mem::replace(
3553 &mut self.spaces[SpaceId::Data]
3554 .crypto
3555 .as_mut()
3556 .unwrap() .packet,
3558 mem::replace(self.next_crypto.as_mut().unwrap(), new),
3559 );
3560 self.spaces[SpaceId::Data].sent_with_keys = 0;
3561 self.prev_crypto = Some(PrevCrypto {
3562 crypto: old,
3563 end_packet,
3564 update_unacked: remote,
3565 });
3566 self.key_phase = !self.key_phase;
3567 }
3568
3569 fn peer_supports_ack_frequency(&self) -> bool {
3570 self.peer_params.min_ack_delay.is_some()
3571 }
3572
3573 pub(crate) fn immediate_ack(&mut self) {
3578 self.spaces[self.highest_space].immediate_ack_pending = true;
3579 }
3580
3581 #[cfg(test)]
3583 pub(crate) fn decode_packet(&self, event: &ConnectionEvent) -> Option<Vec<u8>> {
3584 let (first_decode, remaining) = match &event.0 {
3585 ConnectionEventInner::Datagram(DatagramConnectionEvent {
3586 first_decode,
3587 remaining,
3588 ..
3589 }) => (first_decode, remaining),
3590 _ => return None,
3591 };
3592
3593 if remaining.is_some() {
3594 panic!("Packets should never be coalesced in tests");
3595 }
3596
3597 let decrypted_header = packet_crypto::unprotect_header(
3598 first_decode.clone(),
3599 &self.spaces,
3600 self.zero_rtt_crypto.as_ref(),
3601 self.peer_params.stateless_reset_token,
3602 )?;
3603
3604 let mut packet = decrypted_header.packet?;
3605 packet_crypto::decrypt_packet_body(
3606 &mut packet,
3607 &self.spaces,
3608 self.zero_rtt_crypto.as_ref(),
3609 self.key_phase,
3610 self.prev_crypto.as_ref(),
3611 self.next_crypto.as_ref(),
3612 )
3613 .ok()?;
3614
3615 Some(packet.payload.to_vec())
3616 }
3617
    /// Test-only accessor for the current path's in-flight byte count
    /// (`self.path.in_flight.bytes`).
    #[cfg(test)]
    pub(crate) fn bytes_in_flight(&self) -> u64 {
        self.path.in_flight.bytes
    }
3624
3625 #[cfg(test)]
3627 pub(crate) fn congestion_window(&self) -> u64 {
3628 self.path
3629 .congestion
3630 .window()
3631 .saturating_sub(self.path.in_flight.bytes)
3632 }
3633
3634 #[cfg(test)]
3636 pub(crate) fn is_idle(&self) -> bool {
3637 Timer::VALUES
3638 .iter()
3639 .filter(|&&t| t != Timer::KeepAlive && t != Timer::PushNewCid)
3640 .filter_map(|&t| Some((t, self.timers.get(t)?)))
3641 .min_by_key(|&(_, time)| time)
3642 .map_or(true, |(timer, _)| timer == Timer::Idle)
3643 }
3644
    /// Test-only accessor for the connection's `lost_packets` counter.
    #[cfg(test)]
    pub(crate) fn lost_packets(&self) -> u64 {
        self.lost_packets
    }
3650
    /// Test-only accessor for the current path's `sending_ecn` flag.
    #[cfg(test)]
    pub(crate) fn using_ecn(&self) -> bool {
        self.path.sending_ecn
    }
3656
    /// Test-only accessor for the current path's `total_recvd` counter.
    #[cfg(test)]
    pub(crate) fn total_recvd(&self) -> u64 {
        self.path.total_recvd
    }
3662
    /// Test-only: forwards to `local_cid_state.active_seq()`.
    ///
    /// NOTE(review): the pair presumably holds the lowest and highest active local CID
    /// sequence numbers — confirm against `CidState::active_seq`.
    #[cfg(test)]
    pub(crate) fn active_local_cid_seq(&self) -> (u64, u64) {
        self.local_cid_state.active_seq()
    }
3667
3668 #[cfg(test)]
3671 pub(crate) fn rotate_local_cid(&mut self, v: u64, now: Instant) {
3672 let n = self.local_cid_state.assign_retire_seq(v);
3673 self.endpoint_events
3674 .push_back(EndpointEventInner::NeedIdentifiers(now, n));
3675 }
3676
    /// Test-only: sequence number of the remote CID currently in use, as reported by
    /// `rem_cids.active_seq()`.
    #[cfg(test)]
    pub(crate) fn active_rem_cid_seq(&self) -> u64 {
        self.rem_cids.active_seq()
    }
3682
    /// Test-only: the current path's MTU, as reported by `path.current_mtu()`.
    #[cfg(test)]
    pub(crate) fn path_mtu(&self) -> u16 {
        self.path.current_mtu()
    }
3688
3689 fn can_send_1rtt(&self, max_size: usize) -> bool {
3693 self.streams.can_send_stream_data()
3694 || self.path.challenge_pending
3695 || self
3696 .prev_path
3697 .as_ref()
3698 .is_some_and(|(_, x)| x.challenge_pending)
3699 || !self.path_responses.is_empty()
3700 || self
3701 .datagrams
3702 .outgoing
3703 .front()
3704 .is_some_and(|x| x.size(true) <= max_size)
3705 }
3706
3707 fn remove_in_flight(&mut self, pn: u64, packet: &SentPacket) {
3709 for path in [&mut self.path]
3711 .into_iter()
3712 .chain(self.prev_path.as_mut().map(|(_, data)| data))
3713 {
3714 if path.remove_in_flight(pn, packet) {
3715 return;
3716 }
3717 }
3718 }
3719
3720 fn kill(&mut self, reason: ConnectionError) {
3722 self.close_common();
3723 self.error = Some(reason);
3724 self.state = State::Drained;
3725 self.endpoint_events.push_back(EndpointEventInner::Drained);
3726 }
3727
    /// Returns the MTU currently in use on the active path, as reported by
    /// `path.current_mtu()`.
    pub fn current_mtu(&self) -> u16 {
        self.path.current_mtu()
    }
3734
3735 fn predict_1rtt_overhead(&self, pn: Option<u64>) -> usize {
3742 let pn_len = match pn {
3743 Some(pn) => PacketNumber::new(
3744 pn,
3745 self.spaces[SpaceId::Data].largest_acked_packet.unwrap_or(0),
3746 )
3747 .len(),
3748 None => 4,
3750 };
3751
3752 1 + self.rem_cids.active().len() + pn_len + self.tag_len_1rtt()
3754 }
3755
3756 fn tag_len_1rtt(&self) -> usize {
3757 let key = match self.spaces[SpaceId::Data].crypto.as_ref() {
3758 Some(crypto) => Some(&*crypto.packet.local),
3759 None => self.zero_rtt_crypto.as_ref().map(|x| &*x.packet),
3760 };
3761 key.map_or(16, |x| x.tag_len())
3765 }
3766
3767 fn on_path_validated(&mut self) {
3769 self.path.validated = true;
3770 let ConnectionSide::Server { server_config } = &self.side else {
3771 return;
3772 };
3773 let new_tokens = &mut self.spaces[SpaceId::Data as usize].pending.new_tokens;
3774 new_tokens.clear();
3775 for _ in 0..server_config.validation_token.sent {
3776 new_tokens.push(self.path.remote);
3777 }
3778 }
3779}
3780
3781impl fmt::Debug for Connection {
3782 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
3783 f.debug_struct("Connection")
3784 .field("handshake_cid", &self.handshake_cid)
3785 .finish()
3786 }
3787}
3788
/// State of a connection that only exists on one side (client or server).
enum ConnectionSide {
    /// Client-side state.
    Client {
        /// Token taken from `token_store` for `server_name` when the connection was created
        /// (empty if the store had none).
        /// NOTE(review): presumably sent in outgoing Initial packets — confirm at the use site.
        token: Bytes,
        /// Store that receives tokens delivered by the server in NEW_TOKEN frames.
        token_store: Arc<dyn TokenStore>,
        /// Server name used as the key into `token_store`.
        server_name: String,
    },
    /// Server-side state.
    Server {
        /// Configuration shared by the server's connections.
        server_config: Arc<ServerConfig>,
    },
}
3801
3802impl ConnectionSide {
3803 fn remote_may_migrate(&self) -> bool {
3804 match self {
3805 Self::Server { server_config } => server_config.migration,
3806 Self::Client { .. } => false,
3807 }
3808 }
3809
3810 fn is_client(&self) -> bool {
3811 self.side().is_client()
3812 }
3813
3814 fn is_server(&self) -> bool {
3815 self.side().is_server()
3816 }
3817
3818 fn side(&self) -> Side {
3819 match *self {
3820 Self::Client { .. } => Side::Client,
3821 Self::Server { .. } => Side::Server,
3822 }
3823 }
3824}
3825
3826impl From<SideArgs> for ConnectionSide {
3827 fn from(side: SideArgs) -> Self {
3828 match side {
3829 SideArgs::Client {
3830 token_store,
3831 server_name,
3832 } => Self::Client {
3833 token: token_store.take(&server_name).unwrap_or_default(),
3834 token_store,
3835 server_name,
3836 },
3837 SideArgs::Server {
3838 server_config,
3839 pref_addr_cid: _,
3840 path_validated: _,
3841 } => Self::Server { server_config },
3842 }
3843 }
3844}
3845
/// Side-specific arguments for constructing a connection.
pub(crate) enum SideArgs {
    /// Arguments for a client connection.
    Client {
        /// Store used to look up and save address validation tokens.
        token_store: Arc<dyn TokenStore>,
        /// Name of the server being connected to; key into `token_store`.
        server_name: String,
    },
    /// Arguments for a server connection.
    Server {
        /// Server configuration.
        server_config: Arc<ServerConfig>,
        /// Connection ID associated with the preferred address, if any.
        pref_addr_cid: Option<ConnectionId>,
        /// Whether the path to the client is already considered validated.
        path_validated: bool,
    },
}
3858
3859impl SideArgs {
3860 pub(crate) fn pref_addr_cid(&self) -> Option<ConnectionId> {
3861 match *self {
3862 Self::Client { .. } => None,
3863 Self::Server { pref_addr_cid, .. } => pref_addr_cid,
3864 }
3865 }
3866
3867 pub(crate) fn path_validated(&self) -> bool {
3868 match *self {
3869 Self::Client { .. } => true,
3870 Self::Server { path_validated, .. } => path_validated,
3871 }
3872 }
3873
3874 pub(crate) fn side(&self) -> Side {
3875 match *self {
3876 Self::Client { .. } => Side::Client,
3877 Self::Server { .. } => Side::Server,
3878 }
3879 }
3880}
3881
/// Reasons why a connection might be lost.
#[derive(Debug, Error, Clone, PartialEq, Eq)]
pub enum ConnectionError {
    /// The peer doesn't implement any supported version.
    #[error("peer doesn't implement any supported version")]
    VersionMismatch,
    /// A transport-level error; displayed transparently as the wrapped `TransportError`.
    #[error(transparent)]
    TransportError(#[from] TransportError),
    /// The peer closed the connection with a transport-level CONNECTION_CLOSE.
    #[error("aborted by peer: {0}")]
    ConnectionClosed(frame::ConnectionClose),
    /// The peer's application closed the connection.
    #[error("closed by peer: {0}")]
    ApplicationClosed(frame::ApplicationClose),
    /// The connection was reset by the peer.
    #[error("reset by peer")]
    Reset,
    /// The connection timed out.
    /// NOTE(review): presumably raised when the negotiated idle timeout elapses — confirm.
    #[error("timed out")]
    TimedOut,
    /// The connection was closed locally.
    #[error("closed")]
    LocallyClosed,
    /// No connection IDs were available.
    /// NOTE(review): presumably reported when the connection could not be created for lack
    /// of CID space — confirm at the construction site.
    #[error("CIDs exhausted")]
    CidsExhausted,
}
3916
3917impl From<Close> for ConnectionError {
3918 fn from(x: Close) -> Self {
3919 match x {
3920 Close::Connection(reason) => Self::ConnectionClosed(reason),
3921 Close::Application(reason) => Self::ApplicationClosed(reason),
3922 }
3923 }
3924}
3925
3926impl From<ConnectionError> for io::Error {
3928 fn from(x: ConnectionError) -> Self {
3929 use ConnectionError::*;
3930 let kind = match x {
3931 TimedOut => io::ErrorKind::TimedOut,
3932 Reset => io::ErrorKind::ConnectionReset,
3933 ApplicationClosed(_) | ConnectionClosed(_) => io::ErrorKind::ConnectionAborted,
3934 TransportError(_) | VersionMismatch | LocallyClosed | CidsExhausted => {
3935 io::ErrorKind::Other
3936 }
3937 };
3938 Self::new(kind, x)
3939 }
3940}
3941
/// Lifecycle state of a connection.
// NOTE(review): `unreachable_pub` is allowed here without an explanation in view; presumably
// the type is only exposed for special builds (e.g. fuzzing) — confirm.
#[allow(unreachable_pub)]
#[derive(Clone)]
pub enum State {
    /// The handshake is still in progress.
    Handshake(state::Handshake),
    /// The handshake completed and the connection is usable.
    Established,
    /// The connection is closed; the payload carries the close reason.
    Closed(state::Closed),
    /// A close was received from the peer and the connection is draining.
    Draining,
    /// Terminal state; the endpoint is told the connection has drained when it is killed.
    Drained,
}
3952
3953impl State {
3954 fn closed<R: Into<Close>>(reason: R) -> Self {
3955 Self::Closed(state::Closed {
3956 reason: reason.into(),
3957 })
3958 }
3959
3960 fn is_handshake(&self) -> bool {
3961 matches!(*self, Self::Handshake(_))
3962 }
3963
3964 fn is_established(&self) -> bool {
3965 matches!(*self, Self::Established)
3966 }
3967
3968 fn is_closed(&self) -> bool {
3969 matches!(*self, Self::Closed(_) | Self::Draining | Self::Drained)
3970 }
3971
3972 fn is_drained(&self) -> bool {
3973 matches!(*self, Self::Drained)
3974 }
3975}
3976
/// Payload types for the data-carrying `State` variants.
mod state {
    use super::*;

    /// Data tracked while the handshake is in progress.
    #[allow(unreachable_pub)]
    #[derive(Clone)]
    pub struct Handshake {
        /// NOTE(review): presumably whether the peer has chosen its CID yet — confirm at the
        /// sites that read and write this flag.
        pub(super) rem_cid_set: bool,
        /// NOTE(review): presumably the retry token that incoming Initial packets are expected
        /// to carry — confirm.
        pub(super) expected_token: Bytes,
        /// NOTE(review): presumably the first handshake message, kept so it can be re-sent —
        /// confirm.
        pub(super) client_hello: Option<Bytes>,
    }

    /// Data tracked once the connection is closed.
    #[allow(unreachable_pub)]
    #[derive(Clone)]
    pub struct Closed {
        /// Reason the connection was closed with.
        pub(super) reason: Close,
    }
}
4003
/// Events a connection queues in its `events` list.
/// NOTE(review): presumably consumed by the application — confirm.
#[derive(Debug)]
pub enum Event {
    /// Handshake data has become available.
    HandshakeDataReady,
    /// The connection was established.
    Connected,
    /// The connection was lost.
    ConnectionLost {
        /// Why the connection was lost.
        reason: ConnectionError,
    },
    /// A stream-level event.
    Stream(StreamEvent),
    /// Queued when an incoming DATAGRAM frame is accepted by the datagram receive state.
    DatagramReceived,
    /// Queued when datagrams were written to a packet while sending had been blocked.
    DatagramsUnblocked,
    /// Queued when a report from the peer changes the stored observed address; carries the
    /// updated address.
    ObservedAddr(SocketAddr),
}
4027
/// Returns `x - y`, or a zero duration when `x` is not later than `y`.
fn instant_saturating_sub(x: Instant, y: Instant) -> Duration {
    if x <= y {
        return Duration::ZERO;
    }
    x - y
}
4035
4036fn get_max_ack_delay(params: &TransportParameters) -> Duration {
4037 Duration::from_micros(params.max_ack_delay.0 * 1000)
4038}
4039
4040const MAX_BACKOFF_EXPONENT: u32 = 16;
4042
4043const MIN_PACKET_SPACE: usize = MAX_HANDSHAKE_OR_0RTT_HEADER_SIZE + 32;
4051
4052const MAX_HANDSHAKE_OR_0RTT_HEADER_SIZE: usize =
4058 1 + 4 + 1 + MAX_CID_SIZE + 1 + MAX_CID_SIZE + VarInt::from_u32(u16::MAX as u32).size() + 4;
4059
4060const MAX_TRANSMIT_SEGMENTS: usize = 10;
4066
4067const KEY_UPDATE_MARGIN: u64 = 10_000;
4071
4072#[derive(Default)]
4073struct SentFrames {
4074 retransmits: ThinRetransmits,
4075 largest_acked: Option<u64>,
4076 stream_frames: StreamMetaVec,
4077 non_retransmits: bool,
4079 requires_padding: bool,
4080}
4081
4082impl SentFrames {
4083 fn is_ack_only(&self, streams: &StreamsState) -> bool {
4085 self.largest_acked.is_some()
4086 && !self.non_retransmits
4087 && self.stream_frames.is_empty()
4088 && self.retransmits.is_empty(streams)
4089 }
4090}
4091
4092fn negotiate_max_idle_timeout(x: Option<VarInt>, y: Option<VarInt>) -> Option<Duration> {
4100 match (x, y) {
4101 (Some(VarInt(0)) | None, Some(VarInt(0)) | None) => None,
4102 (Some(VarInt(0)) | None, Some(y)) => Some(Duration::from_millis(y.0)),
4103 (Some(x), Some(VarInt(0)) | None) => Some(Duration::from_millis(x.0)),
4104 (Some(x), Some(y)) => Some(Duration::from_millis(cmp::min(x, y).0)),
4105 }
4106}
4107
#[cfg(test)]
mod tests {
    use super::*;

    /// Every case must give the same answer whichever way round the two
    /// arguments are passed.
    #[test]
    fn negotiate_max_idle_timeout_commutative() {
        let var = |n: u64| Some(VarInt(n));
        let millis = |n: u64| Some(Duration::from_millis(n));

        // (first input, second input, expected result)
        let cases = [
            (None, None, None),
            (None, var(0), None),
            (None, var(2), millis(2)),
            (var(0), var(0), None),
            (var(2), var(0), millis(2)),
            (var(1), var(4), millis(1)),
        ];

        for (a, b, expected) in cases {
            assert_eq!(negotiate_max_idle_timeout(a, b), expected);
            assert_eq!(negotiate_max_idle_timeout(b, a), expected);
        }
    }
}