iroh_quinn_proto/connection/mod.rs

use std::{
    cmp,
    collections::VecDeque,
    convert::TryFrom,
    fmt, io, mem,
    net::{IpAddr, SocketAddr},
    sync::Arc,
};

use bytes::{Bytes, BytesMut};
use frame::StreamMetaVec;
use rand::{rngs::StdRng, Rng, SeedableRng};
use thiserror::Error;
use tracing::{debug, error, trace, trace_span, warn};

use crate::{
    cid_generator::ConnectionIdGenerator,
    cid_queue::CidQueue,
    coding::BufMutExt,
    config::{ServerConfig, TransportConfig},
    crypto::{self, KeyPair, Keys, PacketKey},
    frame::{self, Close, Datagram, FrameStruct, NewToken, ObservedAddr},
    packet::{
        FixedLengthConnectionIdParser, Header, InitialHeader, InitialPacket, LongType, Packet,
        PacketNumber, PartialDecode, SpaceId,
    },
    range_set::ArrayRangeSet,
    shared::{
        ConnectionEvent, ConnectionEventInner, ConnectionId, DatagramConnectionEvent, EcnCodepoint,
        EndpointEvent, EndpointEventInner,
    },
    token::{ResetToken, Token, TokenPayload},
    transport_parameters::TransportParameters,
    Dir, Duration, EndpointConfig, Frame, Instant, Side, StreamId, TokenStore, Transmit,
    TransportError, TransportErrorCode, VarInt, INITIAL_MTU, MAX_CID_SIZE, MAX_STREAM_COUNT,
    MIN_INITIAL_SIZE, TIMER_GRANULARITY,
};

mod ack_frequency;
use ack_frequency::AckFrequencyState;

mod assembler;
pub use assembler::Chunk;

mod cid_state;
use cid_state::CidState;

mod datagrams;
use datagrams::DatagramState;
pub use datagrams::{Datagrams, SendDatagramError};

mod mtud;
mod pacing;

mod packet_builder;
use packet_builder::PacketBuilder;

mod packet_crypto;
use packet_crypto::{PrevCrypto, ZeroRttCrypto};

mod paths;
pub use paths::RttEstimator;
use paths::{PathData, PathResponses};

mod send_buffer;

mod spaces;
#[cfg(fuzzing)]
pub use spaces::Retransmits;
#[cfg(not(fuzzing))]
use spaces::Retransmits;
use spaces::{PacketNumberFilter, PacketSpace, SendableFrames, SentPacket, ThinRetransmits};

mod stats;
pub use stats::{ConnectionStats, FrameStats, PathStats, UdpStats};

mod streams;
#[cfg(fuzzing)]
pub use streams::StreamsState;
#[cfg(not(fuzzing))]
use streams::StreamsState;
pub use streams::{
    BytesSource, Chunks, ClosedStream, FinishError, ReadError, ReadableError, RecvStream,
    SendStream, ShouldTransmit, StreamEvent, Streams, WriteError, Written,
};

mod timer;
use crate::congestion::Controller;
use timer::{Timer, TimerTable};

/// Protocol state and logic for a single QUIC connection
///
/// Objects of this type receive [`ConnectionEvent`]s and emit [`EndpointEvent`]s and application
/// [`Event`]s to make progress. To handle timeouts, a `Connection` returns timer updates and
/// expects timeouts to be delivered through various methods. A number of simple getter methods
/// are exposed to allow callers to inspect some of the connection state.
///
/// `Connection` has roughly 4 types of methods:
///
/// - A. Simple getters, taking `&self`
/// - B. Handlers for incoming events from the network or system, named `handle_*`.
/// - C. State machine mutators, for incoming commands from the application. For convenience we
///   refer to this as "performing I/O" below; however, as per the design of this library, none of
///   these functions actually perform system-level I/O. Examples are [`read`](RecvStream::read)
///   and [`write`](SendStream::write), but also things like [`reset`](SendStream::reset).
/// - D. Polling functions for outgoing events or actions for the caller to
///   take, named `poll_*`.
///
/// The simplest way to use this API correctly is to call (B) and (C) whenever
/// appropriate, then after each of those calls, as soon as feasible, call all
/// polling methods (D) and deal with their outputs appropriately, e.g. by
/// passing them to the application or by making a system-level I/O call. You
/// should call the polling functions in this order:
///
/// 1. [`poll_transmit`](Self::poll_transmit)
/// 2. [`poll_timeout`](Self::poll_timeout)
/// 3. [`poll_endpoint_events`](Self::poll_endpoint_events)
/// 4. [`poll`](Self::poll)
///
/// Currently the only actual dependency is from (2) to (1), however additional
/// dependencies may be added in the future, so the above order is recommended.
///
/// (A) may be called whenever desired.
///
/// Care should be taken to ensure that the input events represent monotonically
/// increasing time. Specifically, calling [`handle_timeout`](Self::handle_timeout)
/// with events of the same [`Instant`] may be interleaved in any order with a
/// call to [`handle_event`](Self::handle_event) at that same instant; however
/// events or timeouts with different instants must not be interleaved.
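///
/// A rough sketch of such a driving loop follows; `conn`, `now`, `buf`, `MAX_DATAGRAMS`, and the
/// socket/endpoint plumbing mentioned in the comments are illustrative assumptions, not part of
/// this API:
///
/// ```ignore
/// loop {
///     // (1) Flush outgoing datagrams first.
///     buf.clear();
///     while let Some(transmit) = conn.poll_transmit(now, MAX_DATAGRAMS, &mut buf) {
///         // hand `buf[..transmit.size]` and `transmit` to the UDP socket here
///         buf.clear();
///     }
///     // (2) Re-arm the caller's timer with the next deadline, if any.
///     let next_deadline = conn.poll_timeout();
///     // (3) Hand endpoint-facing events back to the owning `Endpoint`.
///     while let Some(_event) = conn.poll_endpoint_events() { /* endpoint plumbing */ }
///     // (4) Deliver application-facing events (streams, datagrams, connection loss).
///     while let Some(_event) = conn.poll() { /* application plumbing */ }
///     // Wait for the next incoming datagram or for `next_deadline`, then feed it in via
///     // `handle_event` / `handle_timeout` and repeat.
/// }
/// ```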
pub struct Connection {
    endpoint_config: Arc<EndpointConfig>,
    config: Arc<TransportConfig>,
    rng: StdRng,
    crypto: Box<dyn crypto::Session>,
    /// The CID we initially chose, for use during the handshake
    handshake_cid: ConnectionId,
    /// The CID the peer initially chose, for use during the handshake
    rem_handshake_cid: ConnectionId,
    /// The "real" local IP address which was used to receive the initial packet.
    /// This is only populated for the server case, and only if it is known
    local_ip: Option<IpAddr>,
    path: PathData,
    /// Whether MTU detection is supported in this environment
    allow_mtud: bool,
    prev_path: Option<(ConnectionId, PathData)>,
    state: State,
    side: ConnectionSide,
    /// Whether or not 0-RTT was enabled during the handshake. Does not imply acceptance.
    zero_rtt_enabled: bool,
    /// Set if 0-RTT is supported, then cleared when no longer needed.
    zero_rtt_crypto: Option<ZeroRttCrypto>,
    key_phase: bool,
    /// How many packets are in the current key phase. Used only for `Data` space.
    key_phase_size: u64,
    /// Transport parameters set by the peer
    peer_params: TransportParameters,
    /// Source ConnectionId of the first packet received from the peer
    orig_rem_cid: ConnectionId,
    /// Destination ConnectionId sent by the client on the first Initial
    initial_dst_cid: ConnectionId,
    /// The value that the server included in the Source Connection ID field of a Retry packet, if
    /// one was received
    retry_src_cid: Option<ConnectionId>,
    /// Total number of outgoing packets that have been deemed lost
    lost_packets: u64,
    events: VecDeque<Event>,
    endpoint_events: VecDeque<EndpointEventInner>,
    /// Whether the spin bit is in use for this connection
    spin_enabled: bool,
    /// Outgoing spin bit state
    spin: bool,
    /// Packet number spaces: initial, handshake, 1-RTT
    spaces: [PacketSpace; 3],
    /// Highest usable packet number space
    highest_space: SpaceId,
    /// 1-RTT keys used prior to a key update
    prev_crypto: Option<PrevCrypto>,
    /// 1-RTT keys to be used for the next key update
    ///
    /// These are generated in advance to prevent timing attacks and/or DoS by third-party attackers
    /// spoofing key updates.
    next_crypto: Option<KeyPair<Box<dyn PacketKey>>>,
    accepted_0rtt: bool,
    /// Whether the idle timer should be reset the next time an ack-eliciting packet is transmitted.
    permit_idle_reset: bool,
    /// Negotiated idle timeout
    idle_timeout: Option<Duration>,
    timers: TimerTable,
    /// Number of packets received which could not be authenticated
    authentication_failures: u64,
    /// Why the connection was lost, if it has been
    error: Option<ConnectionError>,
    /// Identifies Data-space packet numbers to skip. Not used in earlier spaces.
    packet_number_filter: PacketNumberFilter,

    //
    // Queued non-retransmittable 1-RTT data
    //
    /// Responses to PATH_CHALLENGE frames
    path_responses: PathResponses,
    close: bool,

    //
    // ACK frequency
    //
    ack_frequency: AckFrequencyState,

    //
    // Loss Detection
    //
    /// The number of times a PTO has been sent without receiving an ack.
    pto_count: u32,

    //
    // Congestion Control
    //
    /// Whether the most recently received packet had an ECN codepoint set
    receiving_ecn: bool,
    /// Number of packets authenticated
    total_authed_packets: u64,
    /// Whether the last `poll_transmit` call yielded no data because there was
    /// no outgoing application data.
    app_limited: bool,

    //
    // ObservedAddr
    //
    /// Sequence number for the next observed address frame sent to the peer.
    next_observed_addr_seq_no: VarInt,

    streams: StreamsState,
    /// Surplus remote CIDs for future use on new paths
    rem_cids: CidQueue,
    /// Attributes of CIDs generated by local peer
    local_cid_state: CidState,
    /// State of the unreliable datagram extension
    datagrams: DatagramState,
    /// Connection level statistics
    stats: ConnectionStats,
    /// QUIC version used for the connection.
    version: u32,
}

impl Connection {
    pub(crate) fn new(
        endpoint_config: Arc<EndpointConfig>,
        config: Arc<TransportConfig>,
        init_cid: ConnectionId,
        loc_cid: ConnectionId,
        rem_cid: ConnectionId,
        remote: SocketAddr,
        local_ip: Option<IpAddr>,
        crypto: Box<dyn crypto::Session>,
        cid_gen: &dyn ConnectionIdGenerator,
        now: Instant,
        version: u32,
        allow_mtud: bool,
        rng_seed: [u8; 32],
        side_args: SideArgs,
    ) -> Self {
        let pref_addr_cid = side_args.pref_addr_cid();
        let path_validated = side_args.path_validated();
        let connection_side = ConnectionSide::from(side_args);
        let side = connection_side.side();
        let initial_space = PacketSpace {
            crypto: Some(crypto.initial_keys(&init_cid, side)),
            ..PacketSpace::new(now)
        };
        let state = State::Handshake(state::Handshake {
            rem_cid_set: side.is_server(),
            expected_token: Bytes::new(),
            client_hello: None,
        });
        let mut rng = StdRng::from_seed(rng_seed);
        let mut this = Self {
            endpoint_config,
            crypto,
            handshake_cid: loc_cid,
            rem_handshake_cid: rem_cid,
            local_cid_state: CidState::new(
                cid_gen.cid_len(),
                cid_gen.cid_lifetime(),
                now,
                if pref_addr_cid.is_some() { 2 } else { 1 },
            ),
            path: PathData::new(remote, allow_mtud, None, now, &config),
            allow_mtud,
            local_ip,
            prev_path: None,
            state,
            side: connection_side,
            zero_rtt_enabled: false,
            zero_rtt_crypto: None,
            key_phase: false,
            // A small initial key phase size ensures peers that don't handle key updates correctly
            // fail sooner rather than later. It's okay for both peers to do this, as the first one
            // to perform an update will reset the other's key phase size in `update_keys`, and a
            // simultaneous key update by both is just like a regular key update with a really fast
            // response. Inspired by quic-go's similar behavior of performing the first key update
            // at the 100th short-header packet.
            key_phase_size: rng.gen_range(10..1000),
            peer_params: TransportParameters::default(),
            orig_rem_cid: rem_cid,
            initial_dst_cid: init_cid,
            retry_src_cid: None,
            lost_packets: 0,
            events: VecDeque::new(),
            endpoint_events: VecDeque::new(),
            spin_enabled: config.allow_spin && rng.gen_ratio(7, 8),
            spin: false,
            spaces: [initial_space, PacketSpace::new(now), PacketSpace::new(now)],
            highest_space: SpaceId::Initial,
            prev_crypto: None,
            next_crypto: None,
            accepted_0rtt: false,
            permit_idle_reset: true,
            idle_timeout: match config.max_idle_timeout {
                None | Some(VarInt(0)) => None,
                Some(dur) => Some(Duration::from_millis(dur.0)),
            },
            timers: TimerTable::default(),
            authentication_failures: 0,
            error: None,
            #[cfg(test)]
            packet_number_filter: match config.deterministic_packet_numbers {
                false => PacketNumberFilter::new(&mut rng),
                true => PacketNumberFilter::disabled(),
            },
            #[cfg(not(test))]
            packet_number_filter: PacketNumberFilter::new(&mut rng),

            path_responses: PathResponses::default(),
            close: false,

            ack_frequency: AckFrequencyState::new(get_max_ack_delay(
                &TransportParameters::default(),
            )),

            pto_count: 0,

            app_limited: false,
            receiving_ecn: false,
            total_authed_packets: 0,

            next_observed_addr_seq_no: 0u32.into(),

            streams: StreamsState::new(
                side,
                config.max_concurrent_uni_streams,
                config.max_concurrent_bidi_streams,
                config.send_window,
                config.receive_window,
                config.stream_receive_window,
            ),
            datagrams: DatagramState::default(),
            config,
            rem_cids: CidQueue::new(rem_cid),
            rng,
            stats: ConnectionStats::default(),
            version,
        };
        if path_validated {
            this.on_path_validated();
        }
        if side.is_client() {
            // Kick off the connection
            this.write_crypto();
            this.init_0rtt();
        }
        this
    }

    /// Returns the next time at which `handle_timeout` should be called
    ///
    /// The value returned may change after:
    /// - the application performed some I/O on the connection
    /// - a call was made to `handle_event`
    /// - a call to `poll_transmit` returned `Some`
    /// - a call was made to `handle_timeout`
    #[must_use]
    pub fn poll_timeout(&mut self) -> Option<Instant> {
        self.timers.next_timeout()
    }

    /// Returns application-facing events
    ///
    /// Connections should be polled for events after:
    /// - a call was made to `handle_event`
    /// - a call was made to `handle_timeout`
    #[must_use]
    pub fn poll(&mut self) -> Option<Event> {
        if let Some(x) = self.events.pop_front() {
            return Some(x);
        }

        if let Some(event) = self.streams.poll() {
            return Some(Event::Stream(event));
        }

        if let Some(err) = self.error.take() {
            return Some(Event::ConnectionLost { reason: err });
        }

        None
    }

    /// Return endpoint-facing events
    #[must_use]
    pub fn poll_endpoint_events(&mut self) -> Option<EndpointEvent> {
        self.endpoint_events.pop_front().map(EndpointEvent)
    }

    /// Provide control over streams
    #[must_use]
    pub fn streams(&mut self) -> Streams<'_> {
        Streams {
            state: &mut self.streams,
            conn_state: &self.state,
        }
    }

    /// Provide control over the receiving side of a stream
    #[must_use]
    pub fn recv_stream(&mut self, id: StreamId) -> RecvStream<'_> {
        assert!(id.dir() == Dir::Bi || id.initiator() != self.side.side());
        RecvStream {
            id,
            state: &mut self.streams,
            pending: &mut self.spaces[SpaceId::Data].pending,
        }
    }

    /// Provide control over the sending side of a stream
    #[must_use]
    pub fn send_stream(&mut self, id: StreamId) -> SendStream<'_> {
        assert!(id.dir() == Dir::Bi || id.initiator() == self.side.side());
        SendStream {
            id,
            state: &mut self.streams,
            pending: &mut self.spaces[SpaceId::Data].pending,
            conn_state: &self.state,
        }
    }

    /// Returns packets to transmit
    ///
    /// Connections should be polled for transmit after:
    /// - the application performed some I/O on the connection
    /// - a call was made to `handle_event`
    /// - a call was made to `handle_timeout`
    ///
    /// `max_datagrams` specifies how many datagrams can be returned inside a
    /// single Transmit using GSO. This must be at least 1.
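    ///
    /// A minimal sketch of consuming a GSO batch on a platform without segmentation offload;
    /// `conn`, `now`, `buf`, and `socket` are illustrative assumptions, not part of this API:
    ///
    /// ```ignore
    /// buf.clear();
    /// if let Some(t) = conn.poll_transmit(now, 4, &mut buf) {
    ///     // With GSO, `buf[..t.size]` holds several equally sized UDP datagrams
    ///     // (the last one may be shorter); without kernel support, send them one by one.
    ///     let seg = t.segment_size.unwrap_or(t.size);
    ///     for datagram in buf[..t.size].chunks(seg) {
    ///         socket.send_to(datagram, t.destination)?;
    ///     }
    /// }
    /// ```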
    #[must_use]
    pub fn poll_transmit(
        &mut self,
        now: Instant,
        max_datagrams: usize,
        buf: &mut Vec<u8>,
    ) -> Option<Transmit> {
        assert!(max_datagrams != 0);
        let max_datagrams = match self.config.enable_segmentation_offload {
            false => 1,
            true => max_datagrams.min(MAX_TRANSMIT_SEGMENTS),
        };

        let mut num_datagrams = 0;
        // Position in `buf` of the first byte of the current UDP datagram. When coalescing QUIC
        // packets, this can be earlier than the start of the current QUIC packet.
        let mut datagram_start = 0;
        let mut segment_size = usize::from(self.path.current_mtu());

        // Send PATH_CHALLENGE for a previous path if necessary
        if let Some((prev_cid, ref mut prev_path)) = self.prev_path {
            if prev_path.challenge_pending {
                prev_path.challenge_pending = false;
                let token = prev_path
                    .challenge
                    .expect("previous path challenge pending without token");
                let destination = prev_path.remote;
                debug_assert_eq!(
                    self.highest_space,
                    SpaceId::Data,
                    "PATH_CHALLENGE queued without 1-RTT keys"
                );
                buf.reserve(MIN_INITIAL_SIZE as usize);

                let buf_capacity = buf.capacity();

                // Use the previous CID to avoid linking the new path with the previous path. We
                // don't bother accounting for possible retirement of that prev_cid because this is
                // sent once, immediately after migration, when the CID is known to be valid. Even
                // if a post-migration packet caused the CID to be retired, it's fair to pretend
                // this is sent first.
                let mut builder = PacketBuilder::new(
                    now,
                    SpaceId::Data,
                    prev_cid,
                    buf,
                    buf_capacity,
                    0,
                    false,
                    self,
                )?;
                trace!("validating previous path with PATH_CHALLENGE {:08x}", token);
                buf.write(frame::FrameType::PATH_CHALLENGE);
                buf.write(token);
                self.stats.frame_tx.path_challenge += 1;

                // An endpoint MUST expand datagrams that contain a PATH_CHALLENGE frame
                // to at least the smallest allowed maximum datagram size of 1200 bytes,
                // unless the anti-amplification limit for the path does not permit
                // sending a datagram of this size
                builder.pad_to(MIN_INITIAL_SIZE);

                builder.finish(self, buf);
                self.stats.udp_tx.on_sent(1, buf.len());
                return Some(Transmit {
                    destination,
                    size: buf.len(),
                    ecn: None,
                    segment_size: None,
                    src_ip: self.local_ip,
                });
            }
        }

        // If we need to send a probe, make sure we have something to send.
        for space in SpaceId::iter() {
            let request_immediate_ack =
                space == SpaceId::Data && self.peer_supports_ack_frequency();
            self.spaces[space].maybe_queue_probe(request_immediate_ack, &self.streams);
        }

        // Check whether we need to send a close message
        let close = match self.state {
            State::Drained => {
                self.app_limited = true;
                return None;
            }
            State::Draining | State::Closed(_) => {
                // self.close is only reset once the associated packet has been
                // encoded successfully
                if !self.close {
                    self.app_limited = true;
                    return None;
                }
                true
            }
            _ => false,
        };

        // Check whether we need to send an ACK_FREQUENCY frame
        if let Some(config) = &self.config.ack_frequency_config {
            self.spaces[SpaceId::Data].pending.ack_frequency = self
                .ack_frequency
                .should_send_ack_frequency(self.path.rtt.get(), config, &self.peer_params)
                && self.highest_space == SpaceId::Data
                && self.peer_supports_ack_frequency();
        }

        // Reserving capacity can provide more capacity than we asked for. However, we are not
        // allowed to write more than `segment_size`. Therefore the maximum capacity is tracked
        // separately.
        let mut buf_capacity = 0;

        let mut coalesce = true;
        let mut builder_storage: Option<PacketBuilder> = None;
        let mut sent_frames = None;
        let mut pad_datagram = false;
        let mut congestion_blocked = false;

        // Iterate over all spaces and find data to send
        let mut space_idx = 0;
        let spaces = [SpaceId::Initial, SpaceId::Handshake, SpaceId::Data];
        // This loop will potentially spend multiple iterations in the same `SpaceId`,
        // so we cannot trivially rewrite it to take advantage of `SpaceId::iter()`.
        while space_idx < spaces.len() {
            let space_id = spaces[space_idx];
            // Number of bytes available for frames if this is a 1-RTT packet. We're guaranteed to
            // be able to send an individual frame at least this large in the next 1-RTT
            // packet. This could be generalized to support every space, but it's only needed to
            // handle large fixed-size frames, which only exist in 1-RTT (application datagrams). We
            // don't account for coalesced packets potentially occupying space because frames can
            // always spill into the next datagram.
            let pn = self.packet_number_filter.peek(&self.spaces[SpaceId::Data]);
            let frame_space_1rtt =
                segment_size.saturating_sub(self.predict_1rtt_overhead(Some(pn)));

            // Is there data or a close message to send in this space?
            let can_send = self.space_can_send(space_id, frame_space_1rtt);
            if can_send.is_empty() && (!close || self.spaces[space_id].crypto.is_none()) {
                space_idx += 1;
                continue;
            }

            let mut ack_eliciting = !self.spaces[space_id].pending.is_empty(&self.streams)
                || self.spaces[space_id].ping_pending
                || self.spaces[space_id].immediate_ack_pending;
            if space_id == SpaceId::Data {
                ack_eliciting |= self.can_send_1rtt(frame_space_1rtt);
            }

            // Can we append more data into the current buffer?
            // It is not safe to assume that `buf.len()` is the end of the data,
            // since the last packet might not have been finished.
            let buf_end = if let Some(builder) = &builder_storage {
                buf.len().max(builder.min_size) + builder.tag_len
            } else {
                buf.len()
            };

            let tag_len = if let Some(ref crypto) = self.spaces[space_id].crypto {
                crypto.packet.local.tag_len()
            } else if space_id == SpaceId::Data {
                self.zero_rtt_crypto.as_ref().expect(
                    "sending packets in the application data space requires known 0-RTT or 1-RTT keys",
                ).packet.tag_len()
            } else {
                unreachable!("tried to send {:?} packet without keys", space_id)
            };
            if !coalesce || buf_capacity - buf_end < MIN_PACKET_SPACE + tag_len {
                // We need to send 1 more datagram and extend the buffer for that.

                // Is 1 more datagram allowed?
                if buf_capacity >= segment_size * max_datagrams {
                    // No more datagrams allowed
                    break;
                }

                // Anti-amplification is only based on `total_sent`, which gets
                // updated at the end of this method. Therefore we pass the amount
                // of bytes for datagrams that are already created, as well as 1 byte
                // for starting another datagram. If there is any anti-amplification
                // budget left, we always allow a full MTU to be sent
                // (see https://github.com/quinn-rs/quinn/issues/1082)
                if self
                    .path
                    .anti_amplification_blocked(segment_size as u64 * num_datagrams + 1)
                {
                    trace!("blocked by anti-amplification");
                    break;
                }

                // Congestion control and pacing checks
                // Tail loss probes must not be blocked by congestion, or a deadlock could arise
                if ack_eliciting && self.spaces[space_id].loss_probes == 0 {
                    // Assume the current packet will get padded to fill the segment
                    let untracked_bytes = if let Some(builder) = &builder_storage {
                        buf_capacity - builder.partial_encode.start
                    } else {
                        0
                    } as u64;
                    debug_assert!(untracked_bytes <= segment_size as u64);

                    let bytes_to_send = segment_size as u64 + untracked_bytes;
                    if self.path.in_flight.bytes + bytes_to_send >= self.path.congestion.window() {
                        space_idx += 1;
                        congestion_blocked = true;
                        // We continue instead of breaking here in order to avoid
                        // blocking loss probes queued for higher spaces.
                        trace!("blocked by congestion control");
                        continue;
                    }

                    // Check whether the next datagram is blocked by pacing
                    let smoothed_rtt = self.path.rtt.get();
                    if let Some(delay) = self.path.pacing.delay(
                        smoothed_rtt,
                        bytes_to_send,
                        self.path.current_mtu(),
                        self.path.congestion.window(),
                        now,
                    ) {
                        self.timers.set(Timer::Pacing, delay);
                        congestion_blocked = true;
                        // Loss probes should be subject to pacing, even though
                        // they are not congestion controlled.
                        trace!("blocked by pacing");
                        break;
                    }
                }

                // Finish current packet
                if let Some(mut builder) = builder_storage.take() {
                    if pad_datagram {
                        builder.pad_to(MIN_INITIAL_SIZE);
                    }

                    if num_datagrams > 1 {
                        // If too many padding bytes would be required to continue the GSO batch
                        // after this packet, end the GSO batch here. Ensures that fixed-size frames
                        // with heterogeneous sizes (e.g. application datagrams) won't inadvertently
                        // waste large amounts of bandwidth. The exact threshold is a bit arbitrary
                        // and might benefit from further tuning, though there's no universally
                        // optimal value.
                        //
                        // Additionally, if this datagram is a loss probe and `segment_size` is
                        // larger than `INITIAL_MTU`, then padding it to `segment_size` to continue
                        // the GSO batch would risk failure to recover from a reduction in path
                        // MTU. Loss probes are the only packets for which we might grow
                        // `buf_capacity` by less than `segment_size`.
                        const MAX_PADDING: usize = 16;
                        let packet_len_unpadded = cmp::max(builder.min_size, buf.len())
                            - datagram_start
                            + builder.tag_len;
                        if packet_len_unpadded + MAX_PADDING < segment_size
                            || datagram_start + segment_size > buf_capacity
                        {
                            trace!(
                                "GSO truncated by demand for {} padding bytes or loss probe",
                                segment_size - packet_len_unpadded
                            );
                            builder_storage = Some(builder);
                            break;
                        }

                        // Pad the current datagram to GSO segment size so it can be included in the
                        // GSO batch.
                        builder.pad_to(segment_size as u16);
                    }

                    builder.finish_and_track(now, self, sent_frames.take(), buf);

                    if num_datagrams == 1 {
                        // Set the segment size for this GSO batch to the size of the first UDP
                        // datagram in the batch. Larger data that cannot be fragmented
                        // (e.g. application datagrams) will be included in a future batch. When
                        // sending large enough volumes of data for GSO to be useful, we expect
                        // packet sizes to usually be consistent, e.g. populated by max-size STREAM
                        // frames or uniformly sized datagrams.
                        segment_size = buf.len();
                        // Clip the unused capacity out of the buffer so future packets don't
                        // overrun
                        buf_capacity = buf.len();

                        // Check whether the data we planned to send will fit in the reduced segment
                        // size. If not, bail out and leave it for the next GSO batch so we don't
                        // end up trying to send an empty packet. We can't easily compute the right
                        // segment size before the original call to `space_can_send`, because at
                        // that time we haven't determined whether we're going to coalesce with the
                        // first datagram or potentially pad it to `MIN_INITIAL_SIZE`.
                        if space_id == SpaceId::Data {
                            let frame_space_1rtt =
                                segment_size.saturating_sub(self.predict_1rtt_overhead(Some(pn)));
                            if self.space_can_send(space_id, frame_space_1rtt).is_empty() {
                                break;
                            }
                        }
                    }
                }

                // Allocate space for another datagram
                let next_datagram_size_limit = match self.spaces[space_id].loss_probes {
                    0 => segment_size,
                    _ => {
                        self.spaces[space_id].loss_probes -= 1;
                        // Clamp the datagram to at most the minimum MTU to ensure that loss probes
                        // can get through and enable recovery even if the path MTU has shrunk
                        // unexpectedly.
                        usize::from(INITIAL_MTU)
                    }
                };
                buf_capacity += next_datagram_size_limit;
                if buf.capacity() < buf_capacity {
                    // We reserve the maximum space for sending `max_datagrams` upfront
                    // to avoid any reallocations if more datagrams have to be appended later on.
                    // Benchmarks have shown a 5-10% throughput improvement
                    // compared to continuously resizing the datagram buffer.
                    // While this will lead to over-allocation for small transmits
                    // (e.g. purely containing ACKs), modern memory allocators
                    // (e.g. mimalloc and jemalloc) will pool certain allocation sizes
                    // and therefore this is still rather efficient.
                    buf.reserve(max_datagrams * segment_size);
                }
                num_datagrams += 1;
                coalesce = true;
                pad_datagram = false;
                datagram_start = buf.len();

                debug_assert_eq!(
                    datagram_start % segment_size,
                    0,
                    "datagrams in a GSO batch must be aligned to the segment size"
                );
            } else {
                // We can append/coalesce the next packet into the current
                // datagram.
                // Finish current packet without adding extra padding
                if let Some(builder) = builder_storage.take() {
                    builder.finish_and_track(now, self, sent_frames.take(), buf);
                }
            }

            debug_assert!(buf_capacity - buf.len() >= MIN_PACKET_SPACE);

            //
            // From here on, we've determined that a packet will definitely be sent.
            //

            if self.spaces[SpaceId::Initial].crypto.is_some()
                && space_id == SpaceId::Handshake
                && self.side.is_client()
            {
                // A client stops both sending and processing Initial packets when it
                // sends its first Handshake packet.
                self.discard_space(now, SpaceId::Initial);
            }
            if let Some(ref mut prev) = self.prev_crypto {
                prev.update_unacked = false;
            }

            debug_assert!(
                builder_storage.is_none() && sent_frames.is_none(),
                "Previous packet must have been finished"
            );

            let builder = builder_storage.insert(PacketBuilder::new(
                now,
                space_id,
                self.rem_cids.active(),
                buf,
                buf_capacity,
                datagram_start,
                ack_eliciting,
                self,
            )?);
            coalesce = coalesce && !builder.short_header;

            // https://tools.ietf.org/html/draft-ietf-quic-transport-34#section-14.1
            pad_datagram |=
                space_id == SpaceId::Initial && (self.side.is_client() || ack_eliciting);

            if close {
                trace!("sending CONNECTION_CLOSE");
                // Encode ACKs before the ConnectionClose message, to give the receiver
                // a better approximation of what data has been processed. This is
                // especially important with ack delay, since the peer might not
                // have gotten any other ACK for the data earlier on.
                if !self.spaces[space_id].pending_acks.ranges().is_empty() {
                    Self::populate_acks(
                        now,
                        self.receiving_ecn,
                        &mut SentFrames::default(),
                        &mut self.spaces[space_id],
                        buf,
                        &mut self.stats,
                    );
                }

                // Since there are only 64 ACK frames, there will always be enough space
                // to encode the ConnectionClose frame too. However, we still have the
                // check here to prevent crashes if something changes.
                debug_assert!(
                    buf.len() + frame::ConnectionClose::SIZE_BOUND < builder.max_size,
                    "ACKs should leave space for ConnectionClose"
                );
                if buf.len() + frame::ConnectionClose::SIZE_BOUND < builder.max_size {
                    let max_frame_size = builder.max_size - buf.len();
                    match self.state {
                        State::Closed(state::Closed { ref reason }) => {
                            if space_id == SpaceId::Data || reason.is_transport_layer() {
                                reason.encode(buf, max_frame_size)
                            } else {
                                frame::ConnectionClose {
                                    error_code: TransportErrorCode::APPLICATION_ERROR,
                                    frame_type: None,
                                    reason: Bytes::new(),
                                }
                                .encode(buf, max_frame_size)
                            }
                        }
                        State::Draining => frame::ConnectionClose {
                            error_code: TransportErrorCode::NO_ERROR,
                            frame_type: None,
                            reason: Bytes::new(),
                        }
                        .encode(buf, max_frame_size),
                        _ => unreachable!(
                            "tried to make a close packet when the connection wasn't closed"
                        ),
                    }
                }
                if space_id == self.highest_space {
                    // Don't send another close packet
                    self.close = false;
                    // `CONNECTION_CLOSE` is the final packet
                    break;
                } else {
                    // Send a close frame in every possible space for robustness, per RFC9000
                    // "Immediate Close during the Handshake". Don't bother trying to send anything
                    // else.
                    space_idx += 1;
                    continue;
                }
            }

            // Send an off-path PATH_RESPONSE. Prioritized over on-path data to ensure that path
            // validation can occur while the link is saturated.
            if space_id == SpaceId::Data && num_datagrams == 1 {
                if let Some((token, remote)) = self.path_responses.pop_off_path(self.path.remote) {
                    // `unwrap` guaranteed to succeed because `builder_storage` was populated just
                    // above.
                    let mut builder = builder_storage.take().unwrap();
                    trace!("PATH_RESPONSE {:08x} (off-path)", token);
                    buf.write(frame::FrameType::PATH_RESPONSE);
                    buf.write(token);
                    self.stats.frame_tx.path_response += 1;
                    builder.pad_to(MIN_INITIAL_SIZE);
                    builder.finish_and_track(
                        now,
                        self,
                        Some(SentFrames {
                            non_retransmits: true,
                            ..SentFrames::default()
                        }),
                        buf,
                    );
                    self.stats.udp_tx.on_sent(1, buf.len());
                    return Some(Transmit {
                        destination: remote,
                        size: buf.len(),
                        ecn: None,
                        segment_size: None,
                        src_ip: self.local_ip,
                    });
                }
            }

            let sent =
                self.populate_packet(now, space_id, buf, builder.max_size, builder.exact_number);

            // ACK-only packets should only be sent when explicitly allowed. If we write them due to
            // any other reason, there is a bug which leads to one component announcing write
            // readiness while not writing any data. This degrades performance. The condition is
            // only checked if the full MTU is available and when potentially large fixed-size
            // frames aren't queued, so that lack of space in the datagram isn't the reason for just
            // writing ACKs.
            debug_assert!(
                !(sent.is_ack_only(&self.streams)
                    && !can_send.acks
                    && can_send.other
                    && (buf_capacity - builder.datagram_start) == self.path.current_mtu() as usize
                    && self.datagrams.outgoing.is_empty()),
                "SendableFrames was {can_send:?}, but only ACKs have been written"
            );
            pad_datagram |= sent.requires_padding;

            if sent.largest_acked.is_some() {
                self.spaces[space_id].pending_acks.acks_sent();
                self.timers.stop(Timer::MaxAckDelay);
            }

            // Keep information about the packet around until it gets finalized
            sent_frames = Some(sent);

            // Don't increment space_idx.
            // We stay in the current space and check if there is more data to send.
        }

        // Finish the last packet
        if let Some(mut builder) = builder_storage {
            if pad_datagram {
                builder.pad_to(MIN_INITIAL_SIZE);
            }
            let last_packet_number = builder.exact_number;
            builder.finish_and_track(now, self, sent_frames, buf);
            self.path
                .congestion
                .on_sent(now, buf.len() as u64, last_packet_number);
        }

        self.app_limited = buf.is_empty() && !congestion_blocked;

        // Send MTU probe if necessary
        if buf.is_empty() && self.state.is_established() {
            let space_id = SpaceId::Data;
            let probe_size = self
                .path
                .mtud
                .poll_transmit(now, self.packet_number_filter.peek(&self.spaces[space_id]))?;

            let buf_capacity = probe_size as usize;
            buf.reserve(buf_capacity);

            let mut builder = PacketBuilder::new(
                now,
                space_id,
                self.rem_cids.active(),
                buf,
                buf_capacity,
                0,
                true,
                self,
            )?;

            // We implement MTU probes as ping packets padded up to the probe size
            buf.write(frame::FrameType::PING);
            self.stats.frame_tx.ping += 1;

            // If supported by the peer, we want no delays to the probe's ACK
            if self.peer_supports_ack_frequency() {
                buf.write(frame::FrameType::IMMEDIATE_ACK);
                self.stats.frame_tx.immediate_ack += 1;
            }

            builder.pad_to(probe_size);
            let sent_frames = SentFrames {
                non_retransmits: true,
                ..Default::default()
            };
            builder.finish_and_track(now, self, Some(sent_frames), buf);

            self.stats.path.sent_plpmtud_probes += 1;
            num_datagrams = 1;

            trace!(?probe_size, "writing MTUD probe");
        }

        if buf.is_empty() {
            return None;
        }

        trace!("sending {} bytes in {} datagrams", buf.len(), num_datagrams);
        self.path.total_sent = self.path.total_sent.saturating_add(buf.len() as u64);

        self.stats.udp_tx.on_sent(num_datagrams, buf.len());

        Some(Transmit {
            destination: self.path.remote,
            size: buf.len(),
            ecn: if self.path.sending_ecn {
                Some(EcnCodepoint::Ect0)
            } else {
                None
            },
            segment_size: match num_datagrams {
                1 => None,
                _ => Some(segment_size),
            },
            src_ip: self.local_ip,
        })
    }

    /// Indicate what types of frames are ready to send for the given space
    fn space_can_send(&self, space_id: SpaceId, frame_space_1rtt: usize) -> SendableFrames {
        if self.spaces[space_id].crypto.is_none()
            && (space_id != SpaceId::Data
                || self.zero_rtt_crypto.is_none()
                || self.side.is_server())
        {
            // No keys available for this space
            return SendableFrames::empty();
        }
        let mut can_send = self.spaces[space_id].can_send(&self.streams);
        if space_id == SpaceId::Data {
            can_send.other |= self.can_send_1rtt(frame_space_1rtt);
        }
        can_send
    }

    /// Process `ConnectionEvent`s generated by the associated `Endpoint`
    ///
    /// Will execute protocol logic upon receipt of a connection event, in turn preparing signals
    /// (including application `Event`s, `EndpointEvent`s and outgoing datagrams) that should be
    /// extracted through the relevant methods.
    pub fn handle_event(&mut self, event: ConnectionEvent) {
        use ConnectionEventInner::*;
        match event.0 {
            Datagram(DatagramConnectionEvent {
                now,
                remote,
                ecn,
                first_decode,
                remaining,
            }) => {
                // If this packet could initiate a migration and we're a client or a server that
                // forbids migration, drop the datagram. This could be relaxed to heuristically
                // permit NAT-rebinding-like migration.
                if remote != self.path.remote && !self.side.remote_may_migrate() {
                    trace!("discarding packet from unrecognized peer {}", remote);
                    return;
                }

                let was_anti_amplification_blocked = self.path.anti_amplification_blocked(1);

                self.stats.udp_rx.datagrams += 1;
                self.stats.udp_rx.bytes += first_decode.len() as u64;
                let data_len = first_decode.len();

                self.handle_decode(now, remote, ecn, first_decode);
                // The current `path` might have changed inside `handle_decode`,
                // since the packet could have triggered a migration. Make sure
                // the data received is accounted for the most recent path by accessing
                // `path` after `handle_decode`.
                self.path.total_recvd = self.path.total_recvd.saturating_add(data_len as u64);

                if let Some(data) = remaining {
                    self.stats.udp_rx.bytes += data.len() as u64;
                    self.handle_coalesced(now, remote, ecn, data);
                }

                if was_anti_amplification_blocked {
                    // A prior attempt to set the loss detection timer may have failed due to
                    // anti-amplification, so ensure it's set now. Prevents a handshake deadlock if
                    // the server's first flight is lost.
                    self.set_loss_detection_timer(now);
                }
            }
            NewIdentifiers(ids, now) => {
                self.local_cid_state.new_cids(&ids, now);
                ids.into_iter().rev().for_each(|frame| {
                    self.spaces[SpaceId::Data].pending.new_cids.push(frame);
                });
                // Update Timer::PushNewCid
                if self
                    .timers
                    .get(Timer::PushNewCid)
                    .map_or(true, |x| x <= now)
                {
                    self.reset_cid_retirement();
                }
            }
        }
    }

    /// Process timer expirations
    ///
    /// Executes protocol logic, potentially preparing signals (including application `Event`s,
    /// `EndpointEvent`s and outgoing datagrams) that should be extracted through the relevant
    /// methods.
    ///
    /// It is most efficient to call this immediately after the system clock reaches the latest
    /// `Instant` that was output by `poll_timeout`; however spurious extra calls will simply
    /// no-op and therefore are safe.
    pub fn handle_timeout(&mut self, now: Instant) {
        for &timer in &Timer::VALUES {
            if !self.timers.is_expired(timer, now) {
                continue;
            }
            self.timers.stop(timer);
            trace!(timer = ?timer, "timeout");
            match timer {
                Timer::Close => {
                    self.state = State::Drained;
                    self.endpoint_events.push_back(EndpointEventInner::Drained);
                }
                Timer::Idle => {
                    self.kill(ConnectionError::TimedOut);
                }
                Timer::KeepAlive => {
                    trace!("sending keep-alive");
                    self.ping();
                }
                Timer::LossDetection => {
                    self.on_loss_detection_timeout(now);
                }
                Timer::KeyDiscard => {
                    self.zero_rtt_crypto = None;
                    self.prev_crypto = None;
                }
                Timer::PathValidation => {
                    debug!("path validation failed");
                    if let Some((_, prev)) = self.prev_path.take() {
                        self.path = prev;
                    }
                    self.path.challenge = None;
                    self.path.challenge_pending = false;
                }
                Timer::Pacing => trace!("pacing timer expired"),
                Timer::PushNewCid => {
                    // Update `retire_prior_to` field in NEW_CONNECTION_ID frame
                    let num_new_cid = self.local_cid_state.on_cid_timeout().into();
                    if !self.state.is_closed() {
                        trace!(
                            "push a new cid to peer RETIRE_PRIOR_TO field {}",
                            self.local_cid_state.retire_prior_to()
                        );
                        self.endpoint_events
                            .push_back(EndpointEventInner::NeedIdentifiers(now, num_new_cid));
                    }
                }
                Timer::MaxAckDelay => {
                    trace!("max ack delay reached");
                    // This timer is only armed in the Data space
                    self.spaces[SpaceId::Data]
                        .pending_acks
                        .on_max_ack_delay_timeout()
                }
            }
        }
    }

    /// Close a connection immediately
    ///
    /// This does not ensure delivery of outstanding data. It is the application's responsibility to
    /// call this only when all important communications have been completed, e.g. by calling
    /// [`SendStream::finish`] on outstanding streams and waiting for the corresponding
    /// [`StreamEvent::Finished`] event.
    ///
    /// If [`Streams::send_streams`] returns 0, all outstanding stream data has been
    /// delivered. There may still be data from the peer that has not been received.
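    ///
    /// A rough sketch of a graceful shutdown, under the assumption that the caller tracks its
    /// open send streams in `my_send_streams` and keeps driving the connection in between
    /// (illustrative only, not part of this API):
    ///
    /// ```ignore
    /// for id in my_send_streams {
    ///     conn.send_stream(id).finish()?;
    /// }
    /// // ... keep driving the connection until the `StreamEvent::Finished` events arrive ...
    /// if conn.streams().send_streams() == 0 {
    ///     conn.close(now, 0u32.into(), Bytes::from_static(b"done"));
    /// }
    /// ```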
1203    ///
1204    /// [`StreamEvent::Finished`]: crate::StreamEvent::Finished
1205    pub fn close(&mut self, now: Instant, error_code: VarInt, reason: Bytes) {
1206        self.close_inner(
1207            now,
1208            Close::Application(frame::ApplicationClose { error_code, reason }),
1209        )
1210    }
1211
1212    fn close_inner(&mut self, now: Instant, reason: Close) {
1213        let was_closed = self.state.is_closed();
1214        if !was_closed {
1215            self.close_common();
1216            self.set_close_timer(now);
1217            self.close = true;
1218            self.state = State::Closed(state::Closed { reason });
1219        }
1220    }
1221
1222    /// Control datagrams
1223    pub fn datagrams(&mut self) -> Datagrams<'_> {
1224        Datagrams { conn: self }
1225    }
1226
1227    /// Returns connection statistics
1228    pub fn stats(&self) -> ConnectionStats {
1229        let mut stats = self.stats;
1230        stats.path.rtt = self.path.rtt.get();
1231        stats.path.cwnd = self.path.congestion.window();
1232        stats.path.current_mtu = self.path.mtud.current_mtu();
1233
1234        stats
1235    }
1236
1237    /// Ping the remote endpoint
1238    ///
1239    /// Causes an ACK-eliciting packet to be transmitted.
1240    pub fn ping(&mut self) {
1241        self.spaces[self.highest_space].ping_pending = true;
1242    }
1243
1244    /// Update traffic keys spontaneously
1245    ///
1246    /// This can be useful for testing key updates, as they otherwise only happen infrequently.
1247    pub fn force_key_update(&mut self) {
1248        self.update_keys(None, false);
1249    }
1250
1251    /// Get a session reference
1252    pub fn crypto_session(&self) -> &dyn crypto::Session {
1253        &*self.crypto
1254    }
1255
1256    /// Whether the connection is in the process of being established
1257    ///
1258    /// If this returns `false`, the connection may be either established or closed, signaled by the
1259    /// emission of a `Connected` or `ConnectionLost` message respectively.
1260    pub fn is_handshaking(&self) -> bool {
1261        self.state.is_handshake()
1262    }
1263
1264    /// Whether the connection is closed
1265    ///
1266    /// Closed connections cannot transport any further data. A connection becomes closed when
1267    /// either peer application intentionally closes it, or when either transport layer detects an
1268    /// error such as a time-out or certificate validation failure.
1269    ///
1270    /// A `ConnectionLost` event is emitted with details when the connection becomes closed.
1271    pub fn is_closed(&self) -> bool {
1272        self.state.is_closed()
1273    }
1274
1275    /// Whether there is no longer any need to keep the connection around
1276    ///
1277    /// Closed connections become drained after a brief timeout to absorb any remaining in-flight
1278    /// packets from the peer. All drained connections have been closed.
1279    pub fn is_drained(&self) -> bool {
1280        self.state.is_drained()
1281    }
1282
1283    /// For clients, whether the peer accepted the 0-RTT data packets
1284    ///
1285    /// The value is meaningless until after the handshake completes.
1286    pub fn accepted_0rtt(&self) -> bool {
1287        self.accepted_0rtt
1288    }
1289
1290    /// Whether 0-RTT is/was possible during the handshake
1291    pub fn has_0rtt(&self) -> bool {
1292        self.zero_rtt_enabled
1293    }
1294
1295    /// Whether there are any pending retransmits
1296    pub fn has_pending_retransmits(&self) -> bool {
1297        !self.spaces[SpaceId::Data].pending.is_empty(&self.streams)
1298    }
1299
1300    /// Look up whether we're the client or server of this Connection
1301    pub fn side(&self) -> Side {
1302        self.side.side()
1303    }
1304
1305    /// The latest socket address for this connection's peer
1306    pub fn remote_address(&self) -> SocketAddr {
1307        self.path.remote
1308    }
1309
1310    /// The local IP address which was used when the peer established
1311    /// the connection
1312    ///
1313    /// This can be different from the address the endpoint is bound to, in case
1314    /// the endpoint is bound to a wildcard address like `0.0.0.0` or `::`.
1315    ///
1316    /// This will return `None` for clients, or when no `local_ip` was passed to
1317    /// [`Endpoint::handle()`](crate::Endpoint::handle) for the datagrams establishing this
1318    /// connection.
1319    pub fn local_ip(&self) -> Option<IpAddr> {
1320        self.local_ip
1321    }
1322
1323    /// Current best estimate of this connection's latency (round-trip-time)
1324    pub fn rtt(&self) -> Duration {
1325        self.path.rtt.get()
1326    }
1327
1328    /// Current state of this connection's congestion controller, for debugging purposes
1329    pub fn congestion_state(&self) -> &dyn Controller {
1330        self.path.congestion.as_ref()
1331    }
1332
1333    /// Resets path-specific settings.
1334    ///
1335    /// This will force-reset several subsystems related to a specific network path.
1336    /// Currently these are the congestion controller, the round-trip estimator, and MTU
1337    /// discovery.
1338    ///
1339    /// This is useful when it is known the underlying network path has changed and the old
1340    /// state of these subsystems is no longer valid or optimal. In this case it might be
1341    /// faster or reduce loss to settle on optimal values by restarting from the initial
1342    /// configuration in the [`TransportConfig`].
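    ///
    /// A minimal sketch; `conn` and `now` are assumed placeholders, used after the application
    /// has learned (e.g. from the OS) that the local interface or route changed:
    ///
    /// ```ignore
    /// conn.path_changed(now);
    /// // Congestion control, RTT estimation and MTU discovery restart from the
    /// // initial values in the TransportConfig.
    /// ```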
1343    pub fn path_changed(&mut self, now: Instant) {
1344        self.path.reset(now, &self.config);
1345    }
1346
1347    /// Modify the number of remotely initiated streams that may be concurrently open
1348    ///
1349    /// No streams may be opened by the peer unless fewer than `count` are already open. Large
1350    /// `count`s increase both minimum and worst-case memory consumption.
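    ///
    /// A minimal sketch; `conn` is an assumed placeholder for a `Connection` serving
    /// bidirectional streams:
    ///
    /// ```ignore
    /// conn.set_max_concurrent_streams(Dir::Bi, VarInt::from_u32(256));
    /// ```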
1351    pub fn set_max_concurrent_streams(&mut self, dir: Dir, count: VarInt) {
1352        self.streams.set_max_concurrent(dir, count);
1353        // If the limit was reduced, then a flow control update previously deemed insignificant may
1354        // now be significant.
1355        let pending = &mut self.spaces[SpaceId::Data].pending;
1356        self.streams.queue_max_stream_id(pending);
1357    }
1358
1359    /// Current number of remotely initiated streams that may be concurrently open
1360    ///
1361    /// If the target for this limit is reduced using [`set_max_concurrent_streams`](Self::set_max_concurrent_streams),
1362    /// it will not change immediately, even if fewer streams are open. Instead, it will
1363    /// decrement by one for each time a remotely initiated stream of matching directionality is closed.
1364    pub fn max_concurrent_streams(&self, dir: Dir) -> u64 {
1365        self.streams.max_concurrent(dir)
1366    }
1367
1368    /// See [`TransportConfig::receive_window()`]
1369    pub fn set_receive_window(&mut self, receive_window: VarInt) {
1370        if self.streams.set_receive_window(receive_window) {
1371            self.spaces[SpaceId::Data].pending.max_data = true;
1372        }
1373    }
1374
1375    fn on_ack_received(
1376        &mut self,
1377        now: Instant,
1378        space: SpaceId,
1379        ack: frame::Ack,
1380    ) -> Result<(), TransportError> {
1381        if ack.largest >= self.spaces[space].next_packet_number {
1382            return Err(TransportError::PROTOCOL_VIOLATION("unsent packet acked"));
1383        }
1384        let new_largest = {
1385            let space = &mut self.spaces[space];
1386            if space
1387                .largest_acked_packet
1388                .map_or(true, |pn| ack.largest > pn)
1389            {
1390                space.largest_acked_packet = Some(ack.largest);
1391                if let Some(info) = space.sent_packets.get(&ack.largest) {
1392                    // This should always succeed, but a misbehaving peer might ACK a packet we
1393                    // haven't sent. At worst, that will result in us spuriously reducing the
1394                    // congestion window.
1395                    space.largest_acked_packet_sent = info.time_sent;
1396                }
1397                true
1398            } else {
1399                false
1400            }
1401        };
1402
1403        // Avoid DoS from unreasonably huge ack ranges by filtering out just the new acks.
1404        let mut newly_acked = ArrayRangeSet::new();
1405        for range in ack.iter() {
1406            self.packet_number_filter.check_ack(space, range.clone())?;
1407            for (&pn, _) in self.spaces[space].sent_packets.range(range) {
1408                newly_acked.insert_one(pn);
1409            }
1410        }
1411
1412        if newly_acked.is_empty() {
1413            return Ok(());
1414        }
1415
1416        let mut ack_eliciting_acked = false;
1417        for packet in newly_acked.elts() {
1418            if let Some(info) = self.spaces[space].take(packet) {
1419                if let Some(acked) = info.largest_acked {
1420                    // Assume ACKs for all packets below the largest acknowledged in `packet` have
1421                    // been received. This can cause the peer to spuriously retransmit if some of
1422                    // our earlier ACKs were lost, but allows for simpler state tracking. See
1423                    // discussion at
1424                    // https://www.rfc-editor.org/rfc/rfc9000.html#name-limiting-ranges-by-tracking
1425                    self.spaces[space].pending_acks.subtract_below(acked);
1426                }
1427                ack_eliciting_acked |= info.ack_eliciting;
1428
1429                // Notify MTU discovery that a packet was acked, because it might be an MTU probe
1430                let mtu_updated = self.path.mtud.on_acked(space, packet, info.size);
1431                if mtu_updated {
1432                    self.path
1433                        .congestion
1434                        .on_mtu_update(self.path.mtud.current_mtu());
1435                }
1436
1437                // Notify ack frequency that a packet was acked, because it might contain an ACK_FREQUENCY frame
1438                self.ack_frequency.on_acked(packet);
1439
1440                self.on_packet_acked(now, packet, info);
1441            }
1442        }
1443
1444        self.path.congestion.on_end_acks(
1445            now,
1446            self.path.in_flight.bytes,
1447            self.app_limited,
1448            self.spaces[space].largest_acked_packet,
1449        );
1450
1451        if new_largest && ack_eliciting_acked {
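            // The ACK Delay field is encoded in units of 2^ack_delay_exponent microseconds
            // (RFC 9000 §18.2); it is only honoured for the Data space and only up to the
            // peer's advertised max ACK delay, mirroring the RTT adjustment rules of RFC 9002 §5.3.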
1452            let ack_delay = if space != SpaceId::Data {
1453                Duration::from_micros(0)
1454            } else {
1455                cmp::min(
1456                    self.ack_frequency.peer_max_ack_delay,
1457                    Duration::from_micros(ack.delay << self.peer_params.ack_delay_exponent.0),
1458                )
1459            };
1460            let rtt = instant_saturating_sub(now, self.spaces[space].largest_acked_packet_sent);
1461            self.path.rtt.update(ack_delay, rtt);
1462            if self.path.first_packet_after_rtt_sample.is_none() {
1463                self.path.first_packet_after_rtt_sample =
1464                    Some((space, self.spaces[space].next_packet_number));
1465            }
1466        }
1467
1468        // Must be called before crypto/pto_count are clobbered
1469        self.detect_lost_packets(now, space, true);
1470
1471        if self.peer_completed_address_validation() {
1472            self.pto_count = 0;
1473        }
1474
1475        // Explicit congestion notification
1476        if self.path.sending_ecn {
1477            if let Some(ecn) = ack.ecn {
1478                // We only examine ECN counters from ACKs that we are certain were received in
1479                // transmit order. This keeps the increase in ECN counts, measured against the
1480                // number of newly acked packets, well-defined even in the presence of arbitrary
1481                // packet reordering.
1482                if new_largest {
1483                    let sent = self.spaces[space].largest_acked_packet_sent;
1484                    self.process_ecn(now, space, newly_acked.len() as u64, ecn, sent);
1485                }
1486            } else {
1487                // We always start out sending ECN, so any ack that doesn't acknowledge it disables it.
1488                debug!("ECN not acknowledged by peer");
1489                self.path.sending_ecn = false;
1490            }
1491        }
1492
1493        self.set_loss_detection_timer(now);
1494        Ok(())
1495    }
1496
1497    /// Process a new ECN block from an in-order ACK
1498    fn process_ecn(
1499        &mut self,
1500        now: Instant,
1501        space: SpaceId,
1502        newly_acked: u64,
1503        ecn: frame::EcnCounts,
1504        largest_sent_time: Instant,
1505    ) {
1506        match self.spaces[space].detect_ecn(newly_acked, ecn) {
1507            Err(e) => {
1508                debug!("halting ECN due to verification failure: {}", e);
1509                self.path.sending_ecn = false;
1510                // Wipe out the existing value because it might be garbage and could interfere with
1511                // future attempts to use ECN on new paths.
1512                self.spaces[space].ecn_feedback = frame::EcnCounts::ZERO;
1513            }
1514            Ok(false) => {}
1515            Ok(true) => {
1516                self.stats.path.congestion_events += 1;
1517                self.path
1518                    .congestion
1519                    .on_congestion_event(now, largest_sent_time, false, 0);
1520            }
1521        }
1522    }
1523
1524    // Not timing-aware, so it's safe to call this for inferred acks, such as those that arise from
1525    // high-latency handshakes
1526    fn on_packet_acked(&mut self, now: Instant, pn: u64, info: SentPacket) {
1527        self.remove_in_flight(pn, &info);
1528        if info.ack_eliciting && self.path.challenge.is_none() {
1529            // Only pass ACKs to the congestion controller if we are not validating the current
1530            // path, so as to ignore any ACKs from older paths still coming in.
1531            self.path.congestion.on_ack(
1532                now,
1533                info.time_sent,
1534                info.size.into(),
1535                self.app_limited,
1536                &self.path.rtt,
1537            );
1538        }
1539
1540        // Update state for confirmed delivery of frames
1541        if let Some(retransmits) = info.retransmits.get() {
1542            for (id, _) in retransmits.reset_stream.iter() {
1543                self.streams.reset_acked(*id);
1544            }
1545        }
1546
1547        for frame in info.stream_frames {
1548            self.streams.received_ack_of(frame);
1549        }
1550    }
1551
1552    fn set_key_discard_timer(&mut self, now: Instant, space: SpaceId) {
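        // Old keys are kept for three PTOs past the point where they stopped being needed, so
        // delayed or reordered packets protected with them can still be decrypted before the
        // KeyDiscard timer fires.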
1553        let start = if self.zero_rtt_crypto.is_some() {
1554            now
1555        } else {
1556            self.prev_crypto
1557                .as_ref()
1558                .expect("no previous keys")
1559                .end_packet
1560                .as_ref()
1561                .expect("update not acknowledged yet")
1562                .1
1563        };
1564        self.timers
1565            .set(Timer::KeyDiscard, start + self.pto(space) * 3);
1566    }
1567
1568    fn on_loss_detection_timeout(&mut self, now: Instant) {
1569        if let Some((_, pn_space)) = self.loss_time_and_space() {
1570            // Time threshold loss detection
1571            self.detect_lost_packets(now, pn_space, false);
1572            self.set_loss_detection_timer(now);
1573            return;
1574        }
1575
1576        let (_, space) = match self.pto_time_and_space(now) {
1577            Some(x) => x,
1578            None => {
1579                error!("PTO expired while unset");
1580                return;
1581            }
1582        };
1583        trace!(
1584            in_flight = self.path.in_flight.bytes,
1585            count = self.pto_count,
1586            ?space,
1587            "PTO fired"
1588        );
1589
1590        let count = match self.path.in_flight.ack_eliciting {
1591            // A PTO when we're not expecting any ACKs must be due to handshake anti-amplification
1592            // deadlock prevention
1593            0 => {
1594                debug_assert!(!self.peer_completed_address_validation());
1595                1
1596            }
1597            // Conventional loss probe
1598            _ => 2,
1599        };
1600        self.spaces[space].loss_probes = self.spaces[space].loss_probes.saturating_add(count);
1601        self.pto_count = self.pto_count.saturating_add(1);
1602        self.set_loss_detection_timer(now);
1603    }
1604
1605    fn detect_lost_packets(&mut self, now: Instant, pn_space: SpaceId, due_to_ack: bool) {
1606        let mut lost_packets = Vec::<u64>::new();
1607        let mut lost_mtu_probe = None;
1608        let in_flight_mtu_probe = self.path.mtud.in_flight_mtu_probe();
1609        let rtt = self.path.rtt.conservative();
1610        let loss_delay = cmp::max(rtt.mul_f32(self.config.time_threshold), TIMER_GRANULARITY);
1611
1612        // Packets sent before this time are deemed lost.
1613        let lost_send_time = now.checked_sub(loss_delay).unwrap();
1614        let largest_acked_packet = self.spaces[pn_space].largest_acked_packet.unwrap();
1615        let packet_threshold = self.config.packet_threshold as u64;
1616        let mut size_of_lost_packets = 0u64;
1617
1618        // InPersistentCongestion: Determine if all packets in the time period before the newest
1619        // lost packet, including the edges, are marked lost. PTO computation must always
1620        // include max ACK delay, i.e. operate as if in Data space (see RFC 9002 §7.6.1).
1621        let congestion_period =
1622            self.pto(SpaceId::Data) * self.config.persistent_congestion_threshold;
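        // i.e. congestion_period = (smoothed_rtt + max(4 * rttvar, kGranularity) + max_ack_delay)
        //      * persistent_congestion_threshold, per RFC 9002 §7.6.1, assuming `pto()` follows
        //      the standard PTO formula.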
1623        let mut persistent_congestion_start: Option<Instant> = None;
1624        let mut prev_packet = None;
1625        let mut in_persistent_congestion = false;
1626
1627        let space = &mut self.spaces[pn_space];
1628        space.loss_time = None;
1629
1630        for (&packet, info) in space.sent_packets.range(0..largest_acked_packet) {
1631            if prev_packet != Some(packet.wrapping_sub(1)) {
1632                // An intervening packet was acknowledged
1633                persistent_congestion_start = None;
1634            }
1635
1636            if info.time_sent <= lost_send_time || largest_acked_packet >= packet + packet_threshold
1637            {
1638                if Some(packet) == in_flight_mtu_probe {
1639                    // Lost MTU probes are not included in `lost_packets`, because they should not
1640                    // trigger a congestion control response
1641                    lost_mtu_probe = in_flight_mtu_probe;
1642                } else {
1643                    lost_packets.push(packet);
1644                    size_of_lost_packets += info.size as u64;
1645                    if info.ack_eliciting && due_to_ack {
1646                        match persistent_congestion_start {
1647                            // Two ACK-eliciting packets lost more than congestion_period apart, with no
1648                            // ACKed packets in between
1649                            Some(start) if info.time_sent - start > congestion_period => {
1650                                in_persistent_congestion = true;
1651                            }
1652                            // Persistent congestion must start after the first RTT sample
1653                            None if self
1654                                .path
1655                                .first_packet_after_rtt_sample
1656                                .is_some_and(|x| x < (pn_space, packet)) =>
1657                            {
1658                                persistent_congestion_start = Some(info.time_sent);
1659                            }
1660                            _ => {}
1661                        }
1662                    }
1663                }
1664            } else {
1665                let next_loss_time = info.time_sent + loss_delay;
1666                space.loss_time = Some(
1667                    space
1668                        .loss_time
1669                        .map_or(next_loss_time, |x| cmp::min(x, next_loss_time)),
1670                );
1671                persistent_congestion_start = None;
1672            }
1673
1674            prev_packet = Some(packet);
1675        }
1676
1677        // OnPacketsLost
1678        if let Some(largest_lost) = lost_packets.last().cloned() {
1679            let old_bytes_in_flight = self.path.in_flight.bytes;
1680            let largest_lost_sent = self.spaces[pn_space].sent_packets[&largest_lost].time_sent;
1681            self.lost_packets += lost_packets.len() as u64;
1682            self.stats.path.lost_packets += lost_packets.len() as u64;
1683            self.stats.path.lost_bytes += size_of_lost_packets;
1684            trace!(
1685                "packets lost: {:?}, bytes lost: {}",
1686                lost_packets,
1687                size_of_lost_packets
1688            );
1689
1690            for &packet in &lost_packets {
1691                let info = self.spaces[pn_space].take(packet).unwrap(); // safe: lost_packets is populated just above
1692                self.remove_in_flight(packet, &info);
1693                for frame in info.stream_frames {
1694                    self.streams.retransmit(frame);
1695                }
1696                self.spaces[pn_space].pending |= info.retransmits;
1697                self.path.mtud.on_non_probe_lost(packet, info.size);
1698            }
1699
1700            if self.path.mtud.black_hole_detected(now) {
1701                self.stats.path.black_holes_detected += 1;
1702                self.path
1703                    .congestion
1704                    .on_mtu_update(self.path.mtud.current_mtu());
1705                if let Some(max_datagram_size) = self.datagrams().max_size() {
1706                    self.datagrams.drop_oversized(max_datagram_size);
1707                }
1708            }
1709
1710            // Don't apply congestion penalty for lost ack-only packets
1711            let lost_ack_eliciting = old_bytes_in_flight != self.path.in_flight.bytes;
1712
1713            if lost_ack_eliciting {
1714                self.stats.path.congestion_events += 1;
1715                self.path.congestion.on_congestion_event(
1716                    now,
1717                    largest_lost_sent,
1718                    in_persistent_congestion,
1719                    size_of_lost_packets,
1720                );
1721            }
1722        }
1723
1724        // Handle a lost MTU probe
1725        if let Some(packet) = lost_mtu_probe {
1726            let info = self.spaces[SpaceId::Data].take(packet).unwrap(); // safe: lost_mtu_probe is omitted from lost_packets, and therefore must not have been removed yet
1727            self.remove_in_flight(packet, &info);
1728            self.path.mtud.on_probe_lost();
1729            self.stats.path.lost_plpmtud_probes += 1;
1730        }
1731    }
1732
1733    fn loss_time_and_space(&self) -> Option<(Instant, SpaceId)> {
1734        SpaceId::iter()
1735            .filter_map(|id| Some((self.spaces[id].loss_time?, id)))
1736            .min_by_key(|&(time, _)| time)
1737    }
1738
1739    fn pto_time_and_space(&self, now: Instant) -> Option<(Instant, SpaceId)> {
1740        let backoff = 2u32.pow(self.pto_count.min(MAX_BACKOFF_EXPONENT));
1741        let mut duration = self.path.rtt.pto_base() * backoff;
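        // Exponential backoff: each consecutive PTO doubles the timer (RFC 9002 §6.2.1),
        // capped at 2^MAX_BACKOFF_EXPONENT here so the multiplier stays bounded.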
1742
1743        if self.path.in_flight.ack_eliciting == 0 {
1744            debug_assert!(!self.peer_completed_address_validation());
1745            let space = match self.highest_space {
1746                SpaceId::Handshake => SpaceId::Handshake,
1747                _ => SpaceId::Initial,
1748            };
1749            return Some((now + duration, space));
1750        }
1751
1752        let mut result = None;
1753        for space in SpaceId::iter() {
1754            if self.spaces[space].in_flight == 0 {
1755                continue;
1756            }
1757            if space == SpaceId::Data {
1758                // Skip ApplicationData until handshake completes.
1759                if self.is_handshaking() {
1760                    return result;
1761                }
1762                // Include max_ack_delay and backoff for ApplicationData.
1763                duration += self.ack_frequency.max_ack_delay_for_pto() * backoff;
1764            }
1765            let last_ack_eliciting = match self.spaces[space].time_of_last_ack_eliciting_packet {
1766                Some(time) => time,
1767                None => continue,
1768            };
1769            let pto = last_ack_eliciting + duration;
1770            if result.map_or(true, |(earliest_pto, _)| pto < earliest_pto) {
1771                result = Some((pto, space));
1772            }
1773        }
1774        result
1775    }
1776
1777    fn peer_completed_address_validation(&self) -> bool {
1778        if self.side.is_server() || self.state.is_closed() {
1779            return true;
1780        }
1781        // The server is guaranteed to have validated our address if any of our handshake or 1-RTT
1782        // packets are acknowledged or we've seen HANDSHAKE_DONE and discarded handshake keys.
1783        self.spaces[SpaceId::Handshake]
1784            .largest_acked_packet
1785            .is_some()
1786            || self.spaces[SpaceId::Data].largest_acked_packet.is_some()
1787            || (self.spaces[SpaceId::Data].crypto.is_some()
1788                && self.spaces[SpaceId::Handshake].crypto.is_none())
1789    }
1790
1791    fn set_loss_detection_timer(&mut self, now: Instant) {
1792        if self.state.is_closed() {
1793            // No loss detection takes place on closed connections, and `close_common` already
1794            // stopped the timer. Ensure we don't restart it inadvertently, e.g. in response to a
1795            // reordered packet being handled by state-insensitive code.
1796            return;
1797        }
1798
1799        if let Some((loss_time, _)) = self.loss_time_and_space() {
1800            // Time threshold loss detection.
1801            self.timers.set(Timer::LossDetection, loss_time);
1802            return;
1803        }
1804
1805        if self.path.anti_amplification_blocked(1) {
1806            // We wouldn't be able to send anything, so don't bother.
1807            self.timers.stop(Timer::LossDetection);
1808            return;
1809        }
1810
1811        if self.path.in_flight.ack_eliciting == 0 && self.peer_completed_address_validation() {
1812            // There is nothing that could be detected as lost, so no timer is set. However, the client needs to arm
1813            // the timer if the server might be blocked by the anti-amplification limit.
1814            self.timers.stop(Timer::LossDetection);
1815            return;
1816        }
1817
1818        // Determine which PN space to arm the PTO for, and calculate the PTO duration
1820        if let Some((timeout, _)) = self.pto_time_and_space(now) {
1821            self.timers.set(Timer::LossDetection, timeout);
1822        } else {
1823            self.timers.stop(Timer::LossDetection);
1824        }
1825    }
1826
1827    /// Probe Timeout
1828    fn pto(&self, space: SpaceId) -> Duration {
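        // PTO = pto_base() + max_ack_delay, where `pto_base()` is assumed to cover the
        // RTT-derived term smoothed_rtt + max(4 * rttvar, TIMER_GRANULARITY) from RFC 9002
        // §6.2.1; max_ack_delay only applies in the Data space, since earlier spaces are
        // acknowledged immediately.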
1829        let max_ack_delay = match space {
1830            SpaceId::Initial | SpaceId::Handshake => Duration::ZERO,
1831            SpaceId::Data => self.ack_frequency.max_ack_delay_for_pto(),
1832        };
1833        self.path.rtt.pto_base() + max_ack_delay
1834    }
1835
1836    fn on_packet_authenticated(
1837        &mut self,
1838        now: Instant,
1839        space_id: SpaceId,
1840        ecn: Option<EcnCodepoint>,
1841        packet: Option<u64>,
1842        spin: bool,
1843        is_1rtt: bool,
1844    ) {
1845        self.total_authed_packets += 1;
1846        self.reset_keep_alive(now);
1847        self.reset_idle_timeout(now, space_id);
1848        self.permit_idle_reset = true;
1849        self.receiving_ecn |= ecn.is_some();
1850        if let Some(x) = ecn {
1851            let space = &mut self.spaces[space_id];
1852            space.ecn_counters += x;
1853
1854            if x.is_ce() {
1855                space.pending_acks.set_immediate_ack_required();
1856            }
1857        }
1858
1859        let packet = match packet {
1860            Some(x) => x,
1861            None => return,
1862        };
1863        if self.side.is_server() {
1864            if self.spaces[SpaceId::Initial].crypto.is_some() && space_id == SpaceId::Handshake {
1865                // A server stops sending and processing Initial packets when it receives its first Handshake packet.
1866                self.discard_space(now, SpaceId::Initial);
1867            }
1868            if self.zero_rtt_crypto.is_some() && is_1rtt {
1869                // Discard 0-RTT keys soon after receiving a 1-RTT packet
1870                self.set_key_discard_timer(now, space_id)
1871            }
1872        }
1873        let space = &mut self.spaces[space_id];
1874        space.pending_acks.insert_one(packet, now);
1875        if packet >= space.rx_packet {
1876            space.rx_packet = packet;
1877            // Update outgoing spin bit, inverting iff we're the client
1878            self.spin = self.side.is_client() ^ spin;
1879        }
1880    }
1881
1882    fn reset_idle_timeout(&mut self, now: Instant, space: SpaceId) {
1883        let timeout = match self.idle_timeout {
1884            None => return,
1885            Some(dur) => dur,
1886        };
1887        if self.state.is_closed() {
1888            self.timers.stop(Timer::Idle);
1889            return;
1890        }
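        // RFC 9000 §10.1 requires the effective idle timeout to be at least 3×PTO, so that a
        // connection merely experiencing loss is not torn down before its probes can be answered.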
1891        let dt = cmp::max(timeout, 3 * self.pto(space));
1892        self.timers.set(Timer::Idle, now + dt);
1893    }
1894
1895    fn reset_keep_alive(&mut self, now: Instant) {
1896        let interval = match self.config.keep_alive_interval {
1897            Some(x) if self.state.is_established() => x,
1898            _ => return,
1899        };
1900        self.timers.set(Timer::KeepAlive, now + interval);
1901    }
1902
1903    fn reset_cid_retirement(&mut self) {
1904        if let Some(t) = self.local_cid_state.next_timeout() {
1905            self.timers.set(Timer::PushNewCid, t);
1906        }
1907    }
1908
1909    /// Handle the already-decrypted first packet from the client
1910    ///
1911    /// Decrypting the first packet in the `Endpoint` allows stateless packet handling to be more
1912    /// efficient.
1913    pub(crate) fn handle_first_packet(
1914        &mut self,
1915        now: Instant,
1916        remote: SocketAddr,
1917        ecn: Option<EcnCodepoint>,
1918        packet_number: u64,
1919        packet: InitialPacket,
1920        remaining: Option<BytesMut>,
1921    ) -> Result<(), ConnectionError> {
1922        let span = trace_span!("first recv");
1923        let _guard = span.enter();
1924        debug_assert!(self.side.is_server());
1925        let len = packet.header_data.len() + packet.payload.len();
1926        self.path.total_recvd = len as u64;
1927
1928        match self.state {
1929            State::Handshake(ref mut state) => {
1930                state.expected_token = packet.header.token.clone();
1931            }
1932            _ => unreachable!("first packet must be delivered in Handshake state"),
1933        }
1934
1935        self.on_packet_authenticated(
1936            now,
1937            SpaceId::Initial,
1938            ecn,
1939            Some(packet_number),
1940            false,
1941            false,
1942        );
1943
1944        self.process_decrypted_packet(now, remote, Some(packet_number), packet.into())?;
1945        if let Some(data) = remaining {
1946            self.handle_coalesced(now, remote, ecn, data);
1947        }
1948        Ok(())
1949    }
1950
1951    fn init_0rtt(&mut self) {
1952        let (header, packet) = match self.crypto.early_crypto() {
1953            Some(x) => x,
1954            None => return,
1955        };
1956        if self.side.is_client() {
1957            match self.crypto.transport_parameters() {
1958                Ok(params) => {
1959                    let params = params
1960                        .expect("crypto layer didn't supply transport parameters with ticket");
1961                    // Certain values must not be cached
1962                    let params = TransportParameters {
1963                        initial_src_cid: None,
1964                        original_dst_cid: None,
1965                        preferred_address: None,
1966                        retry_src_cid: None,
1967                        stateless_reset_token: None,
1968                        min_ack_delay: None,
1969                        ack_delay_exponent: TransportParameters::default().ack_delay_exponent,
1970                        max_ack_delay: TransportParameters::default().max_ack_delay,
1971                        ..params
1972                    };
1973                    self.set_peer_params(params);
1974                }
1975                Err(e) => {
1976                    error!("session ticket has malformed transport parameters: {}", e);
1977                    return;
1978                }
1979            }
1980        }
1981        trace!("0-RTT enabled");
1982        self.zero_rtt_enabled = true;
1983        self.zero_rtt_crypto = Some(ZeroRttCrypto { header, packet });
1984    }
1985
1986    fn read_crypto(
1987        &mut self,
1988        space: SpaceId,
1989        crypto: &frame::Crypto,
1990        payload_len: usize,
1991    ) -> Result<(), TransportError> {
1992        let expected = if !self.state.is_handshake() {
1993            SpaceId::Data
1994        } else if self.highest_space == SpaceId::Initial {
1995            SpaceId::Initial
1996        } else {
1997            // On the server, self.highest_space can be Data after receiving the client's first
1998            // flight, but we expect Handshake CRYPTO until the handshake is complete.
1999            SpaceId::Handshake
2000        };
2001        // We can't decrypt Handshake packets when highest_space is Initial, CRYPTO frames in 0-RTT
2002        // packets are illegal, and we don't process 1-RTT packets until the handshake is
2003        // complete. Therefore, we will never see CRYPTO data from a later-than-expected space.
2004        debug_assert!(space <= expected, "received out-of-order CRYPTO data");
2005
2006        let end = crypto.offset + crypto.data.len() as u64;
2007        if space < expected && end > self.spaces[space].crypto_stream.bytes_read() {
2008            warn!(
2009                "received new {:?} CRYPTO data when expecting {:?}",
2010                space, expected
2011            );
2012            return Err(TransportError::PROTOCOL_VIOLATION(
2013                "new data at unexpected encryption level",
2014            ));
2015        }
2016
2017        let space = &mut self.spaces[space];
2018        let max = end.saturating_sub(space.crypto_stream.bytes_read());
2019        if max > self.config.crypto_buffer_size as u64 {
2020            return Err(TransportError::CRYPTO_BUFFER_EXCEEDED(""));
2021        }
2022
2023        space
2024            .crypto_stream
2025            .insert(crypto.offset, crypto.data.clone(), payload_len);
2026        while let Some(chunk) = space.crypto_stream.read(usize::MAX, true) {
2027            trace!("consumed {} CRYPTO bytes", chunk.bytes.len());
2028            if self.crypto.read_handshake(&chunk.bytes)? {
2029                self.events.push_back(Event::HandshakeDataReady);
2030            }
2031        }
2032
2033        Ok(())
2034    }
2035
2036    fn write_crypto(&mut self) {
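        // Drain handshake bytes produced by the TLS layer, upgrading to the next packet-number
        // space whenever it hands us new keys, and stop once no more output is produced at the
        // current highest space.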
2037        loop {
2038            let space = self.highest_space;
2039            let mut outgoing = Vec::new();
2040            if let Some(crypto) = self.crypto.write_handshake(&mut outgoing) {
2041                match space {
2042                    SpaceId::Initial => {
2043                        self.upgrade_crypto(SpaceId::Handshake, crypto);
2044                    }
2045                    SpaceId::Handshake => {
2046                        self.upgrade_crypto(SpaceId::Data, crypto);
2047                    }
2048                    _ => unreachable!("got updated secrets during 1-RTT"),
2049                }
2050            }
2051            if outgoing.is_empty() {
2052                if space == self.highest_space {
2053                    break;
2054                } else {
2055                    // Keys updated, check for more data to send
2056                    continue;
2057                }
2058            }
2059            let offset = self.spaces[space].crypto_offset;
2060            let outgoing = Bytes::from(outgoing);
2061            if let State::Handshake(ref mut state) = self.state {
2062                if space == SpaceId::Initial && offset == 0 && self.side.is_client() {
2063                    state.client_hello = Some(outgoing.clone());
2064                }
2065            }
2066            self.spaces[space].crypto_offset += outgoing.len() as u64;
2067            trace!("wrote {} {:?} CRYPTO bytes", outgoing.len(), space);
2068            self.spaces[space].pending.crypto.push_back(frame::Crypto {
2069                offset,
2070                data: outgoing,
2071            });
2072        }
2073    }
2074
2075    /// Switch to stronger cryptography during handshake
2076    fn upgrade_crypto(&mut self, space: SpaceId, crypto: Keys) {
2077        debug_assert!(
2078            self.spaces[space].crypto.is_none(),
2079            "already reached packet space {space:?}"
2080        );
2081        trace!("{:?} keys ready", space);
2082        if space == SpaceId::Data {
2083            // Precompute the first key update
2084            self.next_crypto = Some(
2085                self.crypto
2086                    .next_1rtt_keys()
2087                    .expect("handshake should be complete"),
2088            );
2089        }
2090
2091        self.spaces[space].crypto = Some(crypto);
2092        debug_assert!(space as usize > self.highest_space as usize);
2093        self.highest_space = space;
2094        if space == SpaceId::Data && self.side.is_client() {
2095            // Discard 0-RTT keys because 1-RTT keys are available.
2096            self.zero_rtt_crypto = None;
2097        }
2098    }
2099
2100    fn discard_space(&mut self, now: Instant, space_id: SpaceId) {
2101        debug_assert!(space_id != SpaceId::Data);
2102        trace!("discarding {:?} keys", space_id);
2103        if space_id == SpaceId::Initial {
2104            // No longer needed
2105            if let ConnectionSide::Client { token, .. } = &mut self.side {
2106                *token = Bytes::new();
2107            }
2108        }
2109        let space = &mut self.spaces[space_id];
2110        space.crypto = None;
2111        space.time_of_last_ack_eliciting_packet = None;
2112        space.loss_time = None;
2113        space.in_flight = 0;
2114        let sent_packets = mem::take(&mut space.sent_packets);
2115        for (pn, packet) in sent_packets.into_iter() {
2116            self.remove_in_flight(pn, &packet);
2117        }
2118        self.set_loss_detection_timer(now)
2119    }
2120
2121    fn handle_coalesced(
2122        &mut self,
2123        now: Instant,
2124        remote: SocketAddr,
2125        ecn: Option<EcnCodepoint>,
2126        data: BytesMut,
2127    ) {
2128        self.path.total_recvd = self.path.total_recvd.saturating_add(data.len() as u64);
2129        let mut remaining = Some(data);
2130        while let Some(data) = remaining {
2131            match PartialDecode::new(
2132                data,
2133                &FixedLengthConnectionIdParser::new(self.local_cid_state.cid_len()),
2134                &[self.version],
2135                self.endpoint_config.grease_quic_bit,
2136            ) {
2137                Ok((partial_decode, rest)) => {
2138                    remaining = rest;
2139                    self.handle_decode(now, remote, ecn, partial_decode);
2140                }
2141                Err(e) => {
2142                    trace!("malformed header: {}", e);
2143                    return;
2144                }
2145            }
2146        }
2147    }
2148
2149    fn handle_decode(
2150        &mut self,
2151        now: Instant,
2152        remote: SocketAddr,
2153        ecn: Option<EcnCodepoint>,
2154        partial_decode: PartialDecode,
2155    ) {
2156        if let Some(decoded) = packet_crypto::unprotect_header(
2157            partial_decode,
2158            &self.spaces,
2159            self.zero_rtt_crypto.as_ref(),
2160            self.peer_params.stateless_reset_token,
2161        ) {
2162            self.handle_packet(now, remote, ecn, decoded.packet, decoded.stateless_reset);
2163        }
2164    }
2165
2166    fn handle_packet(
2167        &mut self,
2168        now: Instant,
2169        remote: SocketAddr,
2170        ecn: Option<EcnCodepoint>,
2171        packet: Option<Packet>,
2172        stateless_reset: bool,
2173    ) {
2174        self.stats.udp_rx.ios += 1;
2175        if let Some(ref packet) = packet {
2176            trace!(
2177                "got {:?} packet ({} bytes) from {} using id {}",
2178                packet.header.space(),
2179                packet.payload.len() + packet.header_data.len(),
2180                remote,
2181                packet.header.dst_cid(),
2182            );
2183        }
2184
2185        if self.is_handshaking() && remote != self.path.remote {
2186            debug!("discarding packet with unexpected remote during handshake");
2187            return;
2188        }
2189
2190        let was_closed = self.state.is_closed();
2191        let was_drained = self.state.is_drained();
2192
2193        let decrypted = match packet {
2194            None => Err(None),
2195            Some(mut packet) => self
2196                .decrypt_packet(now, &mut packet)
2197                .map(move |number| (packet, number)),
2198        };
2199        let result = match decrypted {
2200            _ if stateless_reset => {
2201                debug!("got stateless reset");
2202                Err(ConnectionError::Reset)
2203            }
2204            Err(Some(e)) => {
2205                warn!("illegal packet: {}", e);
2206                Err(e.into())
2207            }
2208            Err(None) => {
2209                debug!("failed to authenticate packet");
2210                self.authentication_failures += 1;
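                // RFC 9001 §6.6 forbids using keys beyond their integrity limit; once too many
                // forgeries have been observed, tear the connection down with AEAD_LIMIT_REACHED
                // instead of continuing to accept packets.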
2211                let integrity_limit = self.spaces[self.highest_space]
2212                    .crypto
2213                    .as_ref()
2214                    .unwrap()
2215                    .packet
2216                    .local
2217                    .integrity_limit();
2218                if self.authentication_failures > integrity_limit {
2219                    Err(TransportError::AEAD_LIMIT_REACHED("integrity limit violated").into())
2220                } else {
2221                    return;
2222                }
2223            }
2224            Ok((packet, number)) => {
2225                let span = match number {
2226                    Some(pn) => trace_span!("recv", space = ?packet.header.space(), pn),
2227                    None => trace_span!("recv", space = ?packet.header.space()),
2228                };
2229                let _guard = span.enter();
2230
2231                let is_duplicate = |n| self.spaces[packet.header.space()].dedup.insert(n);
2232                if number.is_some_and(is_duplicate) {
2233                    debug!("discarding possible duplicate packet");
2234                    return;
2235                } else if self.state.is_handshake() && packet.header.is_short() {
2236                    // TODO: SHOULD buffer these to improve reordering tolerance.
2237                    trace!("dropping short packet during handshake");
2238                    return;
2239                } else {
2240                    if let Header::Initial(InitialHeader { ref token, .. }) = packet.header {
2241                        if let State::Handshake(ref hs) = self.state {
2242                            if self.side.is_server() && token != &hs.expected_token {
2243                                // Clients must send the same retry token in every Initial. Initial
2244                                // packets can be spoofed, so we discard rather than killing the
2245                                // connection.
2246                                warn!("discarding Initial with invalid retry token");
2247                                return;
2248                            }
2249                        }
2250                    }
2251
2252                    if !self.state.is_closed() {
2253                        let spin = match packet.header {
2254                            Header::Short { spin, .. } => spin,
2255                            _ => false,
2256                        };
2257                        self.on_packet_authenticated(
2258                            now,
2259                            packet.header.space(),
2260                            ecn,
2261                            number,
2262                            spin,
2263                            packet.header.is_1rtt(),
2264                        );
2265                    }
2266                    self.process_decrypted_packet(now, remote, number, packet)
2267                }
2268            }
2269        };
2270
2271        // State transitions for error cases
2272        if let Err(conn_err) = result {
2273            self.error = Some(conn_err.clone());
2274            self.state = match conn_err {
2275                ConnectionError::ApplicationClosed(reason) => State::closed(reason),
2276                ConnectionError::ConnectionClosed(reason) => State::closed(reason),
2277                ConnectionError::Reset
2278                | ConnectionError::TransportError(TransportError {
2279                    code: TransportErrorCode::AEAD_LIMIT_REACHED,
2280                    ..
2281                }) => State::Drained,
2282                ConnectionError::TimedOut => {
2283                    unreachable!("timeouts aren't generated by packet processing");
2284                }
2285                ConnectionError::TransportError(err) => {
2286                    debug!("closing connection due to transport error: {}", err);
2287                    State::closed(err)
2288                }
2289                ConnectionError::VersionMismatch => State::Draining,
2290                ConnectionError::LocallyClosed => {
2291                    unreachable!("LocallyClosed isn't generated by packet processing");
2292                }
2293                ConnectionError::CidsExhausted => {
2294                    unreachable!("CidsExhausted isn't generated by packet processing");
2295                }
2296            };
2297        }
2298
2299        if !was_closed && self.state.is_closed() {
2300            self.close_common();
2301            if !self.state.is_drained() {
2302                self.set_close_timer(now);
2303            }
2304        }
2305        if !was_drained && self.state.is_drained() {
2306            self.endpoint_events.push_back(EndpointEventInner::Drained);
2307            // Close timer may have been started previously, e.g. if we sent a close and got a
2308            // stateless reset in response
2309            self.timers.stop(Timer::Close);
2310        }
2311
2312        // Transmit CONNECTION_CLOSE if necessary
2313        if let State::Closed(_) = self.state {
2314            self.close = remote == self.path.remote;
2315        }
2316    }
2317
2318    fn process_decrypted_packet(
2319        &mut self,
2320        now: Instant,
2321        remote: SocketAddr,
2322        number: Option<u64>,
2323        packet: Packet,
2324    ) -> Result<(), ConnectionError> {
2325        let state = match self.state {
2326            State::Established => {
2327                match packet.header.space() {
2328                    SpaceId::Data => self.process_payload(now, remote, number.unwrap(), packet)?,
2329                    _ if packet.header.has_frames() => self.process_early_payload(now, packet)?,
2330                    _ => {
2331                        trace!("discarding unexpected pre-handshake packet");
2332                    }
2333                }
2334                return Ok(());
2335            }
2336            State::Closed(_) => {
2337                for result in frame::Iter::new(packet.payload.freeze())? {
2338                    let frame = match result {
2339                        Ok(frame) => frame,
2340                        Err(err) => {
2341                            debug!("frame decoding error: {err:?}");
2342                            continue;
2343                        }
2344                    };
2345
2346                    if let Frame::Padding = frame {
2347                        continue;
2348                    };
2349
2350                    self.stats.frame_rx.record(&frame);
2351
2352                    if let Frame::Close(_) = frame {
2353                        trace!("draining");
2354                        self.state = State::Draining;
2355                        break;
2356                    }
2357                }
2358                return Ok(());
2359            }
2360            State::Draining | State::Drained => return Ok(()),
2361            State::Handshake(ref mut state) => state,
2362        };
2363
2364        match packet.header {
2365            Header::Retry {
2366                src_cid: rem_cid, ..
2367            } => {
2368                if self.side.is_server() {
2369                    return Err(TransportError::PROTOCOL_VIOLATION("client sent Retry").into());
2370                }
2371
2372                if self.total_authed_packets > 1
2373                            || packet.payload.len() <= 16 // token + 16 byte tag
2374                            || !self.crypto.is_valid_retry(
2375                                &self.rem_cids.active(),
2376                                &packet.header_data,
2377                                &packet.payload,
2378                            )
2379                {
2380                    trace!("discarding invalid Retry");
2381                    // - After the client has received and processed an Initial or Retry
2382                    //   packet from the server, it MUST discard any subsequent Retry
2383                    //   packets that it receives.
2384                    // - A client MUST discard a Retry packet with a zero-length Retry Token
2385                    //   field.
2386                    // - Clients MUST discard Retry packets that have a Retry Integrity Tag
2387                    //   that cannot be validated
2388                    return Ok(());
2389                }
2390
2391                trace!("retrying with CID {}", rem_cid);
2392                let client_hello = state.client_hello.take().unwrap();
2393                self.retry_src_cid = Some(rem_cid);
2394                self.rem_cids.update_initial_cid(rem_cid);
2395                self.rem_handshake_cid = rem_cid;
2396
2397                let space = &mut self.spaces[SpaceId::Initial];
2398                if let Some(info) = space.take(0) {
2399                    self.on_packet_acked(now, 0, info);
2400                };
2401
2402                self.discard_space(now, SpaceId::Initial); // Make sure we clean up after any retransmitted Initials
2403                self.spaces[SpaceId::Initial] = PacketSpace {
2404                    crypto: Some(self.crypto.initial_keys(&rem_cid, self.side.side())),
2405                    next_packet_number: self.spaces[SpaceId::Initial].next_packet_number,
2406                    crypto_offset: client_hello.len() as u64,
2407                    ..PacketSpace::new(now)
2408                };
2409                self.spaces[SpaceId::Initial]
2410                    .pending
2411                    .crypto
2412                    .push_back(frame::Crypto {
2413                        offset: 0,
2414                        data: client_hello,
2415                    });
2416
2417                // Retransmit all 0-RTT data
2418                let zero_rtt = mem::take(&mut self.spaces[SpaceId::Data].sent_packets);
2419                for (pn, info) in zero_rtt {
2420                    self.remove_in_flight(pn, &info);
2421                    self.spaces[SpaceId::Data].pending |= info.retransmits;
2422                }
2423                self.streams.retransmit_all_for_0rtt();
2424
2425                let token_len = packet.payload.len() - 16;
2426                let ConnectionSide::Client { ref mut token, .. } = self.side else {
2427                    unreachable!("we already short-circuited if we're server");
2428                };
2429                *token = packet.payload.freeze().split_to(token_len);
2430                self.state = State::Handshake(state::Handshake {
2431                    expected_token: Bytes::new(),
2432                    rem_cid_set: false,
2433                    client_hello: None,
2434                });
2435                Ok(())
2436            }
2437            Header::Long {
2438                ty: LongType::Handshake,
2439                src_cid: rem_cid,
2440                ..
2441            } => {
2442                if rem_cid != self.rem_handshake_cid {
2443                    debug!(
2444                        "discarding packet with mismatched remote CID: {} != {}",
2445                        self.rem_handshake_cid, rem_cid
2446                    );
2447                    return Ok(());
2448                }
2449                self.on_path_validated();
2450
2451                self.process_early_payload(now, packet)?;
2452                if self.state.is_closed() {
2453                    return Ok(());
2454                }
2455
2456                if self.crypto.is_handshaking() {
2457                    trace!("handshake ongoing");
2458                    return Ok(());
2459                }
2460
2461                if self.side.is_client() {
2462                    // Client-only because server params were set from the client's Initial
2463                    let params =
2464                        self.crypto
2465                            .transport_parameters()?
2466                            .ok_or_else(|| TransportError {
2467                                code: TransportErrorCode::crypto(0x6d),
2468                                frame: None,
2469                                reason: "transport parameters missing".into(),
2470                            })?;
2471
2472                    if self.has_0rtt() {
2473                        if !self.crypto.early_data_accepted().unwrap() {
2474                            debug_assert!(self.side.is_client());
2475                            debug!("0-RTT rejected");
2476                            self.accepted_0rtt = false;
2477                            self.streams.zero_rtt_rejected();
2478
2479                            // Discard already-queued frames
2480                            self.spaces[SpaceId::Data].pending = Retransmits::default();
2481
2482                            // Discard 0-RTT packets
2483                            let sent_packets =
2484                                mem::take(&mut self.spaces[SpaceId::Data].sent_packets);
2485                            for (pn, packet) in sent_packets {
2486                                self.remove_in_flight(pn, &packet);
2487                            }
2488                        } else {
2489                            self.accepted_0rtt = true;
2490                            params.validate_resumption_from(&self.peer_params)?;
2491                        }
2492                    }
2493                    if let Some(token) = params.stateless_reset_token {
2494                        self.endpoint_events
2495                            .push_back(EndpointEventInner::ResetToken(self.path.remote, token));
2496                    }
2497                    self.handle_peer_params(params)?;
2498                    self.issue_first_cids(now);
2499                } else {
2500                    // Server-only
2501                    self.spaces[SpaceId::Data].pending.handshake_done = true;
2502                    self.discard_space(now, SpaceId::Handshake);
2503                }
2504
2505                self.events.push_back(Event::Connected);
2506                self.state = State::Established;
2507                trace!("established");
2508                Ok(())
2509            }
2510            Header::Initial(InitialHeader {
2511                src_cid: rem_cid, ..
2512            }) => {
2513                if !state.rem_cid_set {
2514                    trace!("switching remote CID to {}", rem_cid);
2515                    let mut state = state.clone();
2516                    self.rem_cids.update_initial_cid(rem_cid);
2517                    self.rem_handshake_cid = rem_cid;
2518                    self.orig_rem_cid = rem_cid;
2519                    state.rem_cid_set = true;
2520                    self.state = State::Handshake(state);
2521                } else if rem_cid != self.rem_handshake_cid {
2522                    debug!(
2523                        "discarding packet with mismatched remote CID: {} != {}",
2524                        self.rem_handshake_cid, rem_cid
2525                    );
2526                    return Ok(());
2527                }
2528
2529                let starting_space = self.highest_space;
2530                self.process_early_payload(now, packet)?;
2531
2532                if self.side.is_server()
2533                    && starting_space == SpaceId::Initial
2534                    && self.highest_space != SpaceId::Initial
2535                {
2536                    let params =
2537                        self.crypto
2538                            .transport_parameters()?
2539                            .ok_or_else(|| TransportError {
2540                                code: TransportErrorCode::crypto(0x6d),
2541                                frame: None,
2542                                reason: "transport parameters missing".into(),
2543                            })?;
2544                    self.handle_peer_params(params)?;
2545                    self.issue_first_cids(now);
2546                    self.init_0rtt();
2547                }
2548                Ok(())
2549            }
2550            Header::Long {
2551                ty: LongType::ZeroRtt,
2552                ..
2553            } => {
2554                self.process_payload(now, remote, number.unwrap(), packet)?;
2555                Ok(())
2556            }
2557            Header::VersionNegotiate { .. } => {
2558                if self.total_authed_packets > 1 {
2559                    return Ok(());
2560                }
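                // RFC 9000 §6.2: a Version Negotiation packet that lists the version we are
                // already using must be ignored; only a list without it indicates a mismatch.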
2561                let supported = packet
2562                    .payload
2563                    .chunks(4)
2564                    .any(|x| match <[u8; 4]>::try_from(x) {
2565                        Ok(version) => self.version == u32::from_be_bytes(version),
2566                        Err(_) => false,
2567                    });
2568                if supported {
2569                    return Ok(());
2570                }
2571                debug!("remote doesn't support our version");
2572                Err(ConnectionError::VersionMismatch)
2573            }
2574            Header::Short { .. } => unreachable!(
2575                "short packets received during handshake are discarded in handle_packet"
2576            ),
2577        }
2578    }
2579
2580    /// Process an Initial or Handshake packet payload
2581    fn process_early_payload(
2582        &mut self,
2583        now: Instant,
2584        packet: Packet,
2585    ) -> Result<(), TransportError> {
2586        debug_assert_ne!(packet.header.space(), SpaceId::Data);
2587        let payload_len = packet.payload.len();
2588        let mut ack_eliciting = false;
2589        for result in frame::Iter::new(packet.payload.freeze())? {
2590            let frame = result?;
2591            let span = match frame {
2592                Frame::Padding => continue,
2593                _ => Some(trace_span!("frame", ty = %frame.ty())),
2594            };
2595
2596            self.stats.frame_rx.record(&frame);
2597
2598            let _guard = span.as_ref().map(|x| x.enter());
2599            ack_eliciting |= frame.is_ack_eliciting();
2600
2601            // Process frames
2602            match frame {
2603                Frame::Padding | Frame::Ping => {}
2604                Frame::Crypto(frame) => {
2605                    self.read_crypto(packet.header.space(), &frame, payload_len)?;
2606                }
2607                Frame::Ack(ack) => {
2608                    self.on_ack_received(now, packet.header.space(), ack)?;
2609                }
2610                Frame::Close(reason) => {
2611                    self.error = Some(reason.into());
2612                    self.state = State::Draining;
2613                    return Ok(());
2614                }
2615                _ => {
2616                    let mut err =
2617                        TransportError::PROTOCOL_VIOLATION("illegal frame type in handshake");
2618                    err.frame = Some(frame.ty());
2619                    return Err(err);
2620                }
2621            }
2622        }
2623
2624        if ack_eliciting {
2625            // In the initial and handshake spaces, ACKs must be sent immediately
2626            self.spaces[packet.header.space()]
2627                .pending_acks
2628                .set_immediate_ack_required();
2629        }
2630
2631        self.write_crypto();
2632        Ok(())
2633    }
2634
2635    fn process_payload(
2636        &mut self,
2637        now: Instant,
2638        remote: SocketAddr,
2639        number: u64,
2640        packet: Packet,
2641    ) -> Result<(), TransportError> {
2642        let payload = packet.payload.freeze();
2643        let mut is_probing_packet = true;
2644        let mut close = None;
2645        let payload_len = payload.len();
2646        let mut ack_eliciting = false;
2647        // If this packet triggers a path migration and includes an OBSERVED_ADDRESS frame,
2648        // the frame is stored here
2649        let mut migration_observed_addr = None;
2650        for result in frame::Iter::new(payload)? {
2651            let frame = result?;
2652            let span = match frame {
2653                Frame::Padding => continue,
2654                _ => Some(trace_span!("frame", ty = %frame.ty())),
2655            };
2656
2657            self.stats.frame_rx.record(&frame);
2658            // Crypto, Stream, and Datagram frames are special-cased so that payload data
2659            // does not pollute the log
2660            match &frame {
2661                Frame::Crypto(f) => {
2662                    trace!(offset = f.offset, len = f.data.len(), "got crypto frame");
2663                }
2664                Frame::Stream(f) => {
2665                    trace!(id = %f.id, offset = f.offset, len = f.data.len(), fin = f.fin, "got stream frame");
2666                }
2667                Frame::Datagram(f) => {
2668                    trace!(len = f.data.len(), "got datagram frame");
2669                }
2670                f => {
2671                    trace!("got frame {:?}", f);
2672                }
2673            }
2674
2675            let _guard = span.as_ref().map(|x| x.enter());
2676            if packet.header.is_0rtt() {
2677                match frame {
2678                    Frame::Crypto(_) | Frame::Close(Close::Application(_)) => {
2679                        return Err(TransportError::PROTOCOL_VIOLATION(
2680                            "illegal frame type in 0-RTT",
2681                        ));
2682                    }
2683                    _ => {}
2684                }
2685            }
2686            ack_eliciting |= frame.is_ack_eliciting();
2687
2688            // Check whether this could be a probing packet
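            // RFC 9000 §9.1: only PADDING, PATH_CHALLENGE, PATH_RESPONSE, and NEW_CONNECTION_ID
            // are probing frames; the OBSERVED_ADDRESS extension frame is treated the same way here.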
2689            match frame {
2690                Frame::Padding
2691                | Frame::PathChallenge(_)
2692                | Frame::PathResponse(_)
2693                | Frame::NewConnectionId(_)
2694                | Frame::ObservedAddr(_) => {}
2695                _ => {
2696                    is_probing_packet = false;
2697                }
2698            }
2699            match frame {
2700                Frame::Crypto(frame) => {
2701                    self.read_crypto(SpaceId::Data, &frame, payload_len)?;
2702                }
2703                Frame::Stream(frame) => {
2704                    if self.streams.received(frame, payload_len)?.should_transmit() {
2705                        self.spaces[SpaceId::Data].pending.max_data = true;
2706                    }
2707                }
2708                Frame::Ack(ack) => {
2709                    self.on_ack_received(now, SpaceId::Data, ack)?;
2710                }
2711                Frame::Padding | Frame::Ping => {}
2712                Frame::Close(reason) => {
2713                    close = Some(reason);
2714                }
2715                Frame::PathChallenge(token) => {
2716                    self.path_responses.push(number, token, remote);
2717                    if remote == self.path.remote {
2718                        // PATH_CHALLENGE on active path, possible off-path packet forwarding
2719                        // attack. Send a non-probing packet to recover the active path.
2720                        match self.peer_supports_ack_frequency() {
2721                            true => self.immediate_ack(),
2722                            false => self.ping(),
2723                        }
2724                    }
2725                }
2726                Frame::PathResponse(token) => {
2727                    if self.path.challenge == Some(token) && remote == self.path.remote {
2728                        trace!("new path validated");
2729                        self.timers.stop(Timer::PathValidation);
2730                        self.path.challenge = None;
2731                        self.path.validated = true;
2732                        if let Some((_, ref mut prev_path)) = self.prev_path {
2733                            prev_path.challenge = None;
2734                            prev_path.challenge_pending = false;
2735                        }
2736                    } else {
2737                        debug!(token, "ignoring invalid PATH_RESPONSE");
2738                    }
2739                }
2740                Frame::MaxData(bytes) => {
2741                    self.streams.received_max_data(bytes);
2742                }
2743                Frame::MaxStreamData { id, offset } => {
2744                    self.streams.received_max_stream_data(id, offset)?;
2745                }
2746                Frame::MaxStreams { dir, count } => {
2747                    self.streams.received_max_streams(dir, count)?;
2748                }
2749                Frame::ResetStream(frame) => {
2750                    if self.streams.received_reset(frame)?.should_transmit() {
2751                        self.spaces[SpaceId::Data].pending.max_data = true;
2752                    }
2753                }
2754                Frame::DataBlocked { offset } => {
2755                    debug!(offset, "peer claims to be blocked at connection level");
2756                }
2757                Frame::StreamDataBlocked { id, offset } => {
2758                    if id.initiator() == self.side.side() && id.dir() == Dir::Uni {
2759                        debug!("got STREAM_DATA_BLOCKED on send-only {}", id);
2760                        return Err(TransportError::STREAM_STATE_ERROR(
2761                            "STREAM_DATA_BLOCKED on send-only stream",
2762                        ));
2763                    }
2764                    debug!(
2765                        stream = %id,
2766                        offset, "peer claims to be blocked at stream level"
2767                    );
2768                }
2769                Frame::StreamsBlocked { dir, limit } => {
2770                    if limit > MAX_STREAM_COUNT {
2771                        return Err(TransportError::FRAME_ENCODING_ERROR(
2772                            "unrepresentable stream limit",
2773                        ));
2774                    }
2775                    debug!(
2776                        "peer claims to be blocked opening more than {} {} streams",
2777                        limit, dir
2778                    );
2779                }
2780                Frame::StopSending(frame::StopSending { id, error_code }) => {
2781                    if id.initiator() != self.side.side() {
2782                        if id.dir() == Dir::Uni {
2783                            debug!("got STOP_SENDING on recv-only {}", id);
2784                            return Err(TransportError::STREAM_STATE_ERROR(
2785                                "STOP_SENDING on recv-only stream",
2786                            ));
2787                        }
2788                    } else if self.streams.is_local_unopened(id) {
2789                        return Err(TransportError::STREAM_STATE_ERROR(
2790                            "STOP_SENDING on unopened stream",
2791                        ));
2792                    }
2793                    self.streams.received_stop_sending(id, error_code);
2794                }
2795                Frame::RetireConnectionId { sequence } => {
2796                    let allow_more_cids = self
2797                        .local_cid_state
2798                        .on_cid_retirement(sequence, self.peer_params.issue_cids_limit())?;
2799                    self.endpoint_events
2800                        .push_back(EndpointEventInner::RetireConnectionId(
2801                            now,
2802                            sequence,
2803                            allow_more_cids,
2804                        ));
2805                }
2806                Frame::NewConnectionId(frame) => {
2807                    trace!(
2808                        sequence = frame.sequence,
2809                        id = %frame.id,
2810                        retire_prior_to = frame.retire_prior_to,
2811                    );
2812                    if self.rem_cids.active().is_empty() {
2813                        return Err(TransportError::PROTOCOL_VIOLATION(
2814                            "NEW_CONNECTION_ID when CIDs aren't in use",
2815                        ));
2816                    }
2817                    if frame.retire_prior_to > frame.sequence {
2818                        return Err(TransportError::PROTOCOL_VIOLATION(
2819                            "NEW_CONNECTION_ID retiring unissued CIDs",
2820                        ));
2821                    }
2822
2823                    use crate::cid_queue::InsertError;
2824                    match self.rem_cids.insert(frame) {
2825                        Ok(None) => {}
2826                        Ok(Some((retired, reset_token))) => {
2827                            let pending_retired =
2828                                &mut self.spaces[SpaceId::Data].pending.retire_cids;
2829                            /// Ensure `pending_retired` cannot grow without bound. Limit is
2830                            /// somewhat arbitrary but very permissive.
2831                            const MAX_PENDING_RETIRED_CIDS: u64 = CidQueue::LEN as u64 * 10;
2832                            // We don't bother counting in-flight frames because those are bounded
2833                            // by congestion control.
2834                            if (pending_retired.len() as u64)
2835                                .saturating_add(retired.end.saturating_sub(retired.start))
2836                                > MAX_PENDING_RETIRED_CIDS
2837                            {
2838                                return Err(TransportError::CONNECTION_ID_LIMIT_ERROR(
2839                                    "queued too many retired CIDs",
2840                                ));
2841                            }
2842                            pending_retired.extend(retired);
2843                            self.set_reset_token(reset_token);
2844                        }
2845                        Err(InsertError::ExceedsLimit) => {
2846                            return Err(TransportError::CONNECTION_ID_LIMIT_ERROR(""));
2847                        }
2848                        Err(InsertError::Retired) => {
2849                            trace!("discarding already-retired");
2850                            // RETIRE_CONNECTION_ID might not have been previously sent if e.g. a
2851                            // range of connection IDs larger than the active connection ID limit
2852                            // was retired all at once via retire_prior_to.
2853                            self.spaces[SpaceId::Data]
2854                                .pending
2855                                .retire_cids
2856                                .push(frame.sequence);
2857                            continue;
2858                        }
2859                    };
2860
2861                    if self.side.is_server() && self.rem_cids.active_seq() == 0 {
2862                        // We're a server still using the initial remote CID for the client, so
2863                        // let's switch immediately to enable client-side stateless resets.
2864                        self.update_rem_cid();
2865                    }
2866                }
2867                Frame::NewToken(NewToken { token }) => {
2868                    let ConnectionSide::Client {
2869                        token_store,
2870                        server_name,
2871                        ..
2872                    } = &self.side
2873                    else {
2874                        return Err(TransportError::PROTOCOL_VIOLATION("client sent NEW_TOKEN"));
2875                    };
2876                    if token.is_empty() {
2877                        return Err(TransportError::FRAME_ENCODING_ERROR("empty token"));
2878                    }
2879                    trace!("got new token");
2880                    token_store.insert(server_name, token);
2881                }
2882                Frame::Datagram(datagram) => {
2883                    if self
2884                        .datagrams
2885                        .received(datagram, &self.config.datagram_receive_buffer_size)?
2886                    {
2887                        self.events.push_back(Event::DatagramReceived);
2888                    }
2889                }
2890                Frame::AckFrequency(ack_frequency) => {
2891                    // This frame can only be sent in the Data space
2892                    let space = &mut self.spaces[SpaceId::Data];
2893
2894                    if !self
2895                        .ack_frequency
2896                        .ack_frequency_received(&ack_frequency, &mut space.pending_acks)?
2897                    {
2898                        // The AckFrequency frame is stale (we have already received a more recent one)
2899                        continue;
2900                    }
2901
2902                    // Our `max_ack_delay` has been updated, so we may need to adjust its associated
2903                    // timeout
2904                    if let Some(timeout) = space
2905                        .pending_acks
2906                        .max_ack_delay_timeout(self.ack_frequency.max_ack_delay)
2907                    {
2908                        self.timers.set(Timer::MaxAckDelay, timeout);
2909                    }
2910                }
2911                Frame::ImmediateAck => {
2912                    // This frame can only be sent in the Data space
2913                    self.spaces[SpaceId::Data]
2914                        .pending_acks
2915                        .set_immediate_ack_required();
2916                }
2917                Frame::HandshakeDone => {
2918                    if self.side.is_server() {
2919                        return Err(TransportError::PROTOCOL_VIOLATION(
2920                            "client sent HANDSHAKE_DONE",
2921                        ));
2922                    }
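                    // Receiving HANDSHAKE_DONE confirms the handshake for the client, so the
                    // Handshake keys can be discarded (RFC 9001 §4.1.2, §4.9.2).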
2923                    if self.spaces[SpaceId::Handshake].crypto.is_some() {
2924                        self.discard_space(now, SpaceId::Handshake);
2925                    }
2926                }
2927                Frame::ObservedAddr(observed) => {
2928                    // Check whether the transport parameters allow the peer to send this report and this node to receive it
2929                    if !self
2930                        .peer_params
2931                        .address_discovery_role
2932                        .should_report(&self.config.address_discovery_role)
2933                    {
2934                        return Err(TransportError::PROTOCOL_VIOLATION(
2935                            "received OBSERVED_ADDRESS frame when not negotiated",
2936                        ));
2937                    }
2938                    // OBSERVED_ADDRESS frames must only be sent in the Data space
2939                    if packet.header.space() != SpaceId::Data {
2940                        return Err(TransportError::PROTOCOL_VIOLATION(
2941                            "OBSERVED_ADDRESS frame outside data space",
2942                        ));
2943                    }
2944
2945                    if remote == self.path.remote {
2946                        if let Some(updated) = self.path.update_observed_addr_report(observed) {
2947                            self.events.push_back(Event::ObservedAddr(updated));
2948                        }
2949                    } else {
2950                        // stash the report so the migration handling below can apply it to the new path
2951                        migration_observed_addr = Some(observed)
2952                    }
2953                }
2954            }
2955        }
2956
2957        let space = &mut self.spaces[SpaceId::Data];
2958        if space
2959            .pending_acks
2960            .packet_received(now, number, ack_eliciting, &space.dedup)
2961        {
2962            self.timers
2963                .set(Timer::MaxAckDelay, now + self.ack_frequency.max_ack_delay);
2964        }
2965
2966        // Issue stream ID credit due to ACKs of outgoing finish/resets and incoming finish/resets
2967        // on stopped streams. Incoming finishes/resets on open streams are not handled here as they
2968        // are only freed, and hence only issue credit, once the application has been notified
2969        // during a read on the stream.
2970        let pending = &mut self.spaces[SpaceId::Data].pending;
2971        self.streams.queue_max_stream_id(pending);
2972
2973        if let Some(reason) = close {
2974            self.error = Some(reason.into());
2975            self.state = State::Draining;
2976            self.close = true;
2977        }
2978
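        // Only a non-probing packet that is the highest-numbered packet received so far may
        // initiate migration to a new remote address (RFC 9000 §9.3).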
2979        if remote != self.path.remote
2980            && !is_probing_packet
2981            && number == self.spaces[SpaceId::Data].rx_packet
2982        {
2983            let ConnectionSide::Server { ref server_config } = self.side else {
2984                panic!("packets from unknown remote should be dropped by clients");
2985            };
2986            debug_assert!(
2987                server_config.migration,
2988                "migration-initiating packets should have been dropped immediately"
2989            );
2990            self.migrate(now, remote, migration_observed_addr);
2991            // Break linkability, if possible
2992            self.update_rem_cid();
2993            self.spin = false;
2994        }
2995
2996        Ok(())
2997    }
2998
2999    fn migrate(&mut self, now: Instant, remote: SocketAddr, observed_addr: Option<ObservedAddr>) {
3000        trace!(%remote, "migration initiated");
3001        // Reset rtt/congestion state for new path unless it looks like a NAT rebinding.
3002        // Note that the congestion window will not grow until validation terminates. Helps mitigate
3003        // amplification attacks performed by spoofing source addresses.
3004        let mut new_path = if remote.is_ipv4() && remote.ip() == self.path.remote.ip() {
3005            PathData::from_previous(remote, &self.path, now)
3006        } else {
3007            let peer_max_udp_payload_size =
3008                u16::try_from(self.peer_params.max_udp_payload_size.into_inner())
3009                    .unwrap_or(u16::MAX);
3010            PathData::new(
3011                remote,
3012                self.allow_mtud,
3013                Some(peer_max_udp_payload_size),
3014                now,
3015                &self.config,
3016            )
3017        };
3018        new_path.last_observed_addr_report = self.path.last_observed_addr_report.clone();
3019        if let Some(report) = observed_addr {
3020            if let Some(updated) = new_path.update_observed_addr_report(report) {
3021                self.events.push_back(Event::ObservedAddr(updated));
3022            }
3023        }
3024        new_path.challenge = Some(self.rng.gen());
3025        new_path.challenge_pending = true;
3026        let prev_pto = self.pto(SpaceId::Data);
3027
3028        let mut prev = mem::replace(&mut self.path, new_path);
3029        // Don't clobber the original path if the previous one hasn't been validated yet
3030        if prev.challenge.is_none() {
3031            prev.challenge = Some(self.rng.gen());
3032            prev.challenge_pending = true;
3033            // We haven't updated the remote CID yet, this captures the remote CID we were using on
3034            // the previous path.
3035            self.prev_path = Some((self.rem_cids.active(), prev));
3036        }
3037
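        // Allow plenty of time for validation: RFC 9000 §8.2.4 recommends abandoning path
        // validation only after at least three times the larger of the current and new-path PTO.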
3038        self.timers.set(
3039            Timer::PathValidation,
3040            now + 3 * cmp::max(self.pto(SpaceId::Data), prev_pto),
3041        );
3042    }
3043
3044    /// Handle a change in the local address, i.e. an active migration
3045    pub fn local_address_changed(&mut self) {
3046        self.update_rem_cid();
3047        self.ping();
3048    }
3049
3050    /// Switch to a previously unused remote connection ID, if possible
3051    fn update_rem_cid(&mut self) {
3052        let (reset_token, retired) = match self.rem_cids.next() {
3053            Some(x) => x,
3054            None => return,
3055        };
3056
3057        // Retire the current remote CID and any CIDs we had to skip.
3058        self.spaces[SpaceId::Data]
3059            .pending
3060            .retire_cids
3061            .extend(retired);
3062        self.set_reset_token(reset_token);
3063    }
3064
3065    fn set_reset_token(&mut self, reset_token: ResetToken) {
3066        self.endpoint_events
3067            .push_back(EndpointEventInner::ResetToken(
3068                self.path.remote,
3069                reset_token,
3070            ));
3071        self.peer_params.stateless_reset_token = Some(reset_token);
3072    }
3073
3074    /// Issue an initial set of connection IDs to the peer upon connection
3075    fn issue_first_cids(&mut self, now: Instant) {
3076        if self.local_cid_state.cid_len() == 0 {
3077            return;
3078        }
3079
3080        // Subtract 1 to account for the CID we supplied while handshaking
3081        let n = self.peer_params.issue_cids_limit() - 1;
3082        self.endpoint_events
3083            .push_back(EndpointEventInner::NeedIdentifiers(now, n));
3084    }
3085
3086    fn populate_packet(
3087        &mut self,
3088        now: Instant,
3089        space_id: SpaceId,
3090        buf: &mut Vec<u8>,
3091        max_size: usize,
3092        pn: u64,
3093    ) -> SentFrames {
3094        let mut sent = SentFrames::default();
3095        let space = &mut self.spaces[space_id];
3096        let is_0rtt = space_id == SpaceId::Data && space.crypto.is_none();
3097        space.pending_acks.maybe_ack_non_eliciting();
3098
3099        // HANDSHAKE_DONE
3100        if !is_0rtt && mem::replace(&mut space.pending.handshake_done, false) {
3101            buf.write(frame::FrameType::HANDSHAKE_DONE);
3102            sent.retransmits.get_or_create().handshake_done = true;
3103            // This is just a u8 counter and the frame is typically sent only once
3104            self.stats.frame_tx.handshake_done =
3105                self.stats.frame_tx.handshake_done.saturating_add(1);
3106        }
3107
3108        // OBSERVED_ADDR
3109        let mut send_observed_address =
3110            |space_id: SpaceId,
3111             buf: &mut Vec<u8>,
3112             max_size: usize,
3113             space: &mut PacketSpace,
3114             sent: &mut SentFrames,
3115             stats: &mut ConnectionStats,
3116             skip_sent_check: bool| {
3117                // Only sent within the Data space, and only if allowed by extension negotiation.
3118                // Sending is also skipped if an observed address has already been sent on this
3119                // path, unless `skip_sent_check` overrides that.
3120                let send_allowed = self
3121                    .config
3122                    .address_discovery_role
3123                    .should_report(&self.peer_params.address_discovery_role);
3124                let send_required =
3125                    space.pending.observed_addr || !self.path.observed_addr_sent || skip_sent_check;
3126                if space_id != SpaceId::Data || !send_allowed || !send_required {
3127                    return;
3128                }
3129
3130                let observed =
3131                    frame::ObservedAddr::new(self.path.remote, self.next_observed_addr_seq_no);
3132
3133                if buf.len() + observed.size() < max_size {
3134                    observed.write(buf);
3135
3136                    self.next_observed_addr_seq_no =
3137                        self.next_observed_addr_seq_no.saturating_add(1u8);
3138                    self.path.observed_addr_sent = true;
3139
3140                    stats.frame_tx.observed_addr += 1;
3141                    sent.retransmits.get_or_create().observed_addr = true;
3142                    space.pending.observed_addr = false;
3143                }
3144            };
3145        send_observed_address(
3146            space_id,
3147            buf,
3148            max_size,
3149            space,
3150            &mut sent,
3151            &mut self.stats,
3152            false,
3153        );
3154
3155        // PING
3156        if mem::replace(&mut space.ping_pending, false) {
3157            trace!("PING");
3158            buf.write(frame::FrameType::PING);
3159            sent.non_retransmits = true;
3160            self.stats.frame_tx.ping += 1;
3161        }
3162
3163        // IMMEDIATE_ACK
3164        if mem::replace(&mut space.immediate_ack_pending, false) {
3165            trace!("IMMEDIATE_ACK");
3166            buf.write(frame::FrameType::IMMEDIATE_ACK);
3167            sent.non_retransmits = true;
3168            self.stats.frame_tx.immediate_ack += 1;
3169        }
3170
3171        // ACK
3172        if space.pending_acks.can_send() {
3173            Self::populate_acks(
3174                now,
3175                self.receiving_ecn,
3176                &mut sent,
3177                space,
3178                buf,
3179                &mut self.stats,
3180            );
3181        }
3182
3183        // ACK_FREQUENCY
3184        if mem::replace(&mut space.pending.ack_frequency, false) {
3185            let sequence_number = self.ack_frequency.next_sequence_number();
3186
3187            // Safe to unwrap because this is always provided when ACK frequency is enabled
3188            let config = self.config.ack_frequency_config.as_ref().unwrap();
3189
3190            // Ensure the delay is within bounds to avoid a PROTOCOL_VIOLATION error
3191            let max_ack_delay = self.ack_frequency.candidate_max_ack_delay(
3192                self.path.rtt.get(),
3193                config,
3194                &self.peer_params,
3195            );
3196
3197            trace!(?max_ack_delay, "ACK_FREQUENCY");
3198
3199            frame::AckFrequency {
3200                sequence: sequence_number,
3201                ack_eliciting_threshold: config.ack_eliciting_threshold,
3202                request_max_ack_delay: max_ack_delay.as_micros().try_into().unwrap_or(VarInt::MAX),
3203                reordering_threshold: config.reordering_threshold,
3204            }
3205            .encode(buf);
3206
3207            sent.retransmits.get_or_create().ack_frequency = true;
3208
3209            self.ack_frequency.ack_frequency_sent(pn, max_ack_delay);
3210            self.stats.frame_tx.ack_frequency += 1;
3211        }
3212
3213        // PATH_CHALLENGE
3214        if buf.len() + 9 < max_size && space_id == SpaceId::Data {
3215            // Transmit challenges with every outgoing frame on an unvalidated path
3216            if let Some(token) = self.path.challenge {
3217                // But only send a packet solely for that purpose at most once
3218                self.path.challenge_pending = false;
3219                sent.non_retransmits = true;
3220                sent.requires_padding = true;
3221                trace!("PATH_CHALLENGE {:08x}", token);
3222                buf.write(frame::FrameType::PATH_CHALLENGE);
3223                buf.write(token);
3224
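                // Piggyback a fresh address observation on the path probe; `skip_sent_check`
                // bypasses the once-per-path limit so challenges can always carry an observation.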
3225                send_observed_address(
3226                    space_id,
3227                    buf,
3228                    max_size,
3229                    space,
3230                    &mut sent,
3231                    &mut self.stats,
3232                    true,
3233                );
3234            }
3235        }
3236
3237        // PATH_RESPONSE
3238        if buf.len() + 9 < max_size && space_id == SpaceId::Data {
3239            if let Some(token) = self.path_responses.pop_on_path(self.path.remote) {
3240                sent.non_retransmits = true;
3241                sent.requires_padding = true;
3242                trace!("PATH_RESPONSE {:08x}", token);
3243                buf.write(frame::FrameType::PATH_RESPONSE);
3244                buf.write(token);
3245                self.stats.frame_tx.path_response += 1;
3246
3247                // NOTE: this is not strictly required, but riding the request/response nature of
3248                // path challenges is a convenient opportunity to refresh an observation.
3249                // Since PATH_RESPONSE is a probing frame, this is allowed by the spec.
3250                send_observed_address(
3251                    space_id,
3252                    buf,
3253                    max_size,
3254                    space,
3255                    &mut sent,
3256                    &mut self.stats,
3257                    true,
3258                );
3259            }
3260        }
3261
3262        // CRYPTO
3263        while buf.len() + frame::Crypto::SIZE_BOUND < max_size && !is_0rtt {
3264            let mut frame = match space.pending.crypto.pop_front() {
3265                Some(x) => x,
3266                None => break,
3267            };
3268
3269            // Calculate the maximum amount of crypto data we can store in the buffer.
3270            // Since the offset is known, we can reserve the exact size required to encode it.
3271            // For the length we reserve 2 bytes, which allows encoding lengths up to 2^14 - 1,
3272            // more than what fits into normally sized QUIC frames.
3273            let max_crypto_data_size = max_size
3274                - buf.len()
3275                - 1 // Frame Type
3276                - VarInt::size(unsafe { VarInt::from_u64_unchecked(frame.offset) })
3277                - 2; // Maximum encoded length for frame size, given we send less than 2^14 bytes
3278
3279            let len = frame
3280                .data
3281                .len()
3282                .min(2usize.pow(14) - 1)
3283                .min(max_crypto_data_size);
3284
3285            let data = frame.data.split_to(len);
3286            let truncated = frame::Crypto {
3287                offset: frame.offset,
3288                data,
3289            };
3290            trace!(
3291                "CRYPTO: off {} len {}",
3292                truncated.offset,
3293                truncated.data.len()
3294            );
3295            truncated.encode(buf);
3296            self.stats.frame_tx.crypto += 1;
3297            sent.retransmits.get_or_create().crypto.push_back(truncated);
3298            if !frame.data.is_empty() {
3299                frame.offset += len as u64;
3300                space.pending.crypto.push_front(frame);
3301            }
3302        }
3303
3304        if space_id == SpaceId::Data {
3305            self.streams.write_control_frames(
3306                buf,
3307                &mut space.pending,
3308                &mut sent.retransmits,
3309                &mut self.stats.frame_tx,
3310                max_size,
3311            );
3312        }
3313
3314        // NEW_CONNECTION_ID
3315        while buf.len() + 44 < max_size {
3316            let issued = match space.pending.new_cids.pop() {
3317                Some(x) => x,
3318                None => break,
3319            };
3320            trace!(
3321                sequence = issued.sequence,
3322                id = %issued.id,
3323                "NEW_CONNECTION_ID"
3324            );
3325            frame::NewConnectionId {
3326                sequence: issued.sequence,
3327                retire_prior_to: self.local_cid_state.retire_prior_to(),
3328                id: issued.id,
3329                reset_token: issued.reset_token,
3330            }
3331            .encode(buf);
3332            sent.retransmits.get_or_create().new_cids.push(issued);
3333            self.stats.frame_tx.new_connection_id += 1;
3334        }
3335
3336        // RETIRE_CONNECTION_ID
3337        while buf.len() + frame::RETIRE_CONNECTION_ID_SIZE_BOUND < max_size {
3338            let seq = match space.pending.retire_cids.pop() {
3339                Some(x) => x,
3340                None => break,
3341            };
3342            trace!(sequence = seq, "RETIRE_CONNECTION_ID");
3343            buf.write(frame::FrameType::RETIRE_CONNECTION_ID);
3344            buf.write_var(seq);
3345            sent.retransmits.get_or_create().retire_cids.push(seq);
3346            self.stats.frame_tx.retire_connection_id += 1;
3347        }
3348
3349        // DATAGRAM
3350        let mut sent_datagrams = false;
3351        while buf.len() + Datagram::SIZE_BOUND < max_size && space_id == SpaceId::Data {
3352            match self.datagrams.write(buf, max_size) {
3353                true => {
3354                    sent_datagrams = true;
3355                    sent.non_retransmits = true;
3356                    self.stats.frame_tx.datagram += 1;
3357                }
3358                false => break,
3359            }
3360        }
3361        if self.datagrams.send_blocked && sent_datagrams {
3362            self.events.push_back(Event::DatagramsUnblocked);
3363            self.datagrams.send_blocked = false;
3364        }
3365
3366        // NEW_TOKEN
3367        while let Some(remote_addr) = space.pending.new_tokens.pop() {
3368            debug_assert_eq!(space_id, SpaceId::Data);
3369            let ConnectionSide::Server { server_config } = &self.side else {
3370                panic!("NEW_TOKEN frames should not be enqueued by clients");
3371            };
3372
3373            if remote_addr != self.path.remote {
3374                // NEW_TOKEN frames contain tokens bound to a client's IP address, and are only
3375                // useful if used from that same IP address. Thus, we abandon enqueued NEW_TOKEN
3376                // frames upon a path change; once the new path becomes validated, fresh
3377                // NEW_TOKEN frames may be enqueued for it.
3378                continue;
3379            }
3380
3381            let token = Token::new(
3382                TokenPayload::Validation {
3383                    ip: remote_addr.ip(),
3384                    issued: server_config.time_source.now(),
3385                },
3386                &mut self.rng,
3387            );
3388            let new_token = NewToken {
3389                token: token.encode(&*server_config.token_key).into(),
3390            };
3391
3392            if buf.len() + new_token.size() >= max_size {
3393                space.pending.new_tokens.push(remote_addr);
3394                break;
3395            }
3396
3397            new_token.encode(buf);
3398            sent.retransmits
3399                .get_or_create()
3400                .new_tokens
3401                .push(remote_addr);
3402            self.stats.frame_tx.new_token += 1;
3403        }
3404
3405        // STREAM
3406        if space_id == SpaceId::Data {
3407            sent.stream_frames =
3408                self.streams
3409                    .write_stream_frames(buf, max_size, self.config.send_fairness);
3410            self.stats.frame_tx.stream += sent.stream_frames.len() as u64;
3411        }
3412
3413        sent
3414    }
3415
3416    /// Write pending ACKs into a buffer
3417    ///
3418    /// This method assumes ACKs are pending, and should only be called if
3419    /// `!PendingAcks::ranges().is_empty()` returns `true`.
3420    fn populate_acks(
3421        now: Instant,
3422        receiving_ecn: bool,
3423        sent: &mut SentFrames,
3424        space: &mut PacketSpace,
3425        buf: &mut Vec<u8>,
3426        stats: &mut ConnectionStats,
3427    ) {
3428        debug_assert!(!space.pending_acks.ranges().is_empty());
3429
3430        // 0-RTT packets must never carry acks (which would have to be of handshake packets)
3431        debug_assert!(space.crypto.is_some(), "tried to send ACK in 0-RTT");
3432        let ecn = if receiving_ecn {
3433            Some(&space.ecn_counters)
3434        } else {
3435            None
3436        };
3437        sent.largest_acked = space.pending_acks.ranges().max();
3438
3439        let delay_micros = space.pending_acks.ack_delay(now).as_micros() as u64;
3440
3441        // TODO: This should come from `TransportConfig` if that gets configurable.
3442        let ack_delay_exp = TransportParameters::default().ack_delay_exponent;
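        // The ACK Delay field is expressed in units of 2^ack_delay_exponent microseconds
        // (RFC 9000 §19.3); with the default exponent of 3, e.g. 1000us encodes as 1000 >> 3 = 125.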
3443        let delay = delay_micros >> ack_delay_exp.into_inner();
3444
3445        trace!(
3446            "ACK {:?}, Delay = {}us",
3447            space.pending_acks.ranges(),
3448            delay_micros
3449        );
3450
3451        frame::Ack::encode(delay as _, space.pending_acks.ranges(), ecn, buf);
3452        stats.frame_tx.acks += 1;
3453    }
3454
3455    fn close_common(&mut self) {
3456        trace!("connection closed");
3457        for &timer in &Timer::VALUES {
3458            self.timers.stop(timer);
3459        }
3460    }
3461
3462    fn set_close_timer(&mut self, now: Instant) {
3463        self.timers
3464            .set(Timer::Close, now + 3 * self.pto(self.highest_space));
3465    }
3466
3467    /// Handle transport parameters received from the peer
3468    fn handle_peer_params(&mut self, params: TransportParameters) -> Result<(), TransportError> {
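        // Authenticate the connection IDs we actually used during the handshake against the
        // peer's transport parameters (RFC 9000 §7.3).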
3469        if Some(self.orig_rem_cid) != params.initial_src_cid
3470            || (self.side.is_client()
3471                && (Some(self.initial_dst_cid) != params.original_dst_cid
3472                    || self.retry_src_cid != params.retry_src_cid))
3473        {
3474            return Err(TransportError::TRANSPORT_PARAMETER_ERROR(
3475                "CID authentication failure",
3476            ));
3477        }
3478
3479        self.set_peer_params(params);
3480
3481        Ok(())
3482    }
3483
3484    fn set_peer_params(&mut self, params: TransportParameters) {
3485        self.streams.set_params(&params);
3486        self.idle_timeout =
3487            negotiate_max_idle_timeout(self.config.max_idle_timeout, Some(params.max_idle_timeout));
3488        trace!("negotiated max idle timeout {:?}", self.idle_timeout);
3489        if let Some(ref info) = params.preferred_address {
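            // A CID supplied via preferred_address always carries sequence number 1 (RFC 9000 §5.1.1).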
3490            self.rem_cids.insert(frame::NewConnectionId {
3491                sequence: 1,
3492                id: info.connection_id,
3493                reset_token: info.stateless_reset_token,
3494                retire_prior_to: 0,
3495            }).expect("preferred address CID is the first received, and hence is guaranteed to be legal");
3496        }
3497        self.ack_frequency.peer_max_ack_delay = get_max_ack_delay(&params);
3498        self.peer_params = params;
3499        self.path.mtud.on_peer_max_udp_payload_size_received(
3500            u16::try_from(self.peer_params.max_udp_payload_size.into_inner()).unwrap_or(u16::MAX),
3501        );
3502    }
3503
3504    fn decrypt_packet(
3505        &mut self,
3506        now: Instant,
3507        packet: &mut Packet,
3508    ) -> Result<Option<u64>, Option<TransportError>> {
3509        let result = packet_crypto::decrypt_packet_body(
3510            packet,
3511            &self.spaces,
3512            self.zero_rtt_crypto.as_ref(),
3513            self.key_phase,
3514            self.prev_crypto.as_ref(),
3515            self.next_crypto.as_ref(),
3516        )?;
3517
3518        let result = match result {
3519            Some(r) => r,
3520            None => return Ok(None),
3521        };
3522
3523        if result.outgoing_key_update_acked {
3524            if let Some(prev) = self.prev_crypto.as_mut() {
3525                prev.end_packet = Some((result.number, now));
3526                self.set_key_discard_timer(now, packet.header.space());
3527            }
3528        }
3529
3530        if result.incoming_key_update {
3531            trace!("key update authenticated");
3532            self.update_keys(Some((result.number, now)), true);
3533            self.set_key_discard_timer(now, packet.header.space());
3534        }
3535
3536        Ok(Some(result.number))
3537    }
3538
3539    fn update_keys(&mut self, end_packet: Option<(u64, Instant)>, remote: bool) {
3540        trace!("executing key update");
3541        // Generate keys for the key phase after the one we're switching to, store them in
3542        // `next_crypto`, make the contents of `next_crypto` current, and move the current keys into
3543        // `prev_crypto`.
3544        let new = self
3545            .crypto
3546            .next_1rtt_keys()
3547            .expect("only called for `Data` packets");
3548        self.key_phase_size = new
3549            .local
3550            .confidentiality_limit()
3551            .saturating_sub(KEY_UPDATE_MARGIN);
3552        let old = mem::replace(
3553            &mut self.spaces[SpaceId::Data]
3554                .crypto
3555                .as_mut()
3556                .unwrap() // safe because update_keys() can only be triggered by short packets
3557                .packet,
3558            mem::replace(self.next_crypto.as_mut().unwrap(), new),
3559        );
3560        self.spaces[SpaceId::Data].sent_with_keys = 0;
3561        self.prev_crypto = Some(PrevCrypto {
3562            crypto: old,
3563            end_packet,
3564            update_unacked: remote,
3565        });
3566        self.key_phase = !self.key_phase;
3567    }
3568
3569    fn peer_supports_ack_frequency(&self) -> bool {
3570        self.peer_params.min_ack_delay.is_some()
3571    }
3572
3573    /// Send an IMMEDIATE_ACK frame to the remote endpoint
3574    ///
3575    /// According to the spec, this will result in an error if the remote endpoint does not support
3576    /// the Acknowledgement Frequency extension
3577    pub(crate) fn immediate_ack(&mut self) {
3578        self.spaces[self.highest_space].immediate_ack_pending = true;
3579    }
3580
3581    /// Decodes a packet, returning its decrypted payload, so it can be inspected in tests
3582    #[cfg(test)]
3583    pub(crate) fn decode_packet(&self, event: &ConnectionEvent) -> Option<Vec<u8>> {
3584        let (first_decode, remaining) = match &event.0 {
3585            ConnectionEventInner::Datagram(DatagramConnectionEvent {
3586                first_decode,
3587                remaining,
3588                ..
3589            }) => (first_decode, remaining),
3590            _ => return None,
3591        };
3592
3593        if remaining.is_some() {
3594            panic!("Packets should never be coalesced in tests");
3595        }
3596
3597        let decrypted_header = packet_crypto::unprotect_header(
3598            first_decode.clone(),
3599            &self.spaces,
3600            self.zero_rtt_crypto.as_ref(),
3601            self.peer_params.stateless_reset_token,
3602        )?;
3603
3604        let mut packet = decrypted_header.packet?;
3605        packet_crypto::decrypt_packet_body(
3606            &mut packet,
3607            &self.spaces,
3608            self.zero_rtt_crypto.as_ref(),
3609            self.key_phase,
3610            self.prev_crypto.as_ref(),
3611            self.next_crypto.as_ref(),
3612        )
3613        .ok()?;
3614
3615        Some(packet.payload.to_vec())
3616    }
3617
3618    /// The number of bytes of packets containing retransmittable frames that have not been
3619    /// acknowledged or declared lost.
3620    #[cfg(test)]
3621    pub(crate) fn bytes_in_flight(&self) -> u64 {
3622        self.path.in_flight.bytes
3623    }
3624
3625    /// Number of bytes worth of non-ack-only packets that may be sent
3626    #[cfg(test)]
3627    pub(crate) fn congestion_window(&self) -> u64 {
3628        self.path
3629            .congestion
3630            .window()
3631            .saturating_sub(self.path.in_flight.bytes)
3632    }
3633
3634    /// Whether, ignoring the keep-alive and PushNewCid timers, the earliest pending timer (if any) is the idle timer
3635    #[cfg(test)]
3636    pub(crate) fn is_idle(&self) -> bool {
3637        Timer::VALUES
3638            .iter()
3639            .filter(|&&t| t != Timer::KeepAlive && t != Timer::PushNewCid)
3640            .filter_map(|&t| Some((t, self.timers.get(t)?)))
3641            .min_by_key(|&(_, time)| time)
3642            .map_or(true, |(timer, _)| timer == Timer::Idle)
3643    }
3644
3645    /// Total number of outgoing packets that have been deemed lost
3646    #[cfg(test)]
3647    pub(crate) fn lost_packets(&self) -> u64 {
3648        self.lost_packets
3649    }
3650
3651    /// Whether explicit congestion notification is in use on outgoing packets.
3652    #[cfg(test)]
3653    pub(crate) fn using_ecn(&self) -> bool {
3654        self.path.sending_ecn
3655    }
3656
3657    /// The number of received bytes in the current path
3658    #[cfg(test)]
3659    pub(crate) fn total_recvd(&self) -> u64 {
3660        self.path.total_recvd
3661    }
3662
3663    #[cfg(test)]
3664    pub(crate) fn active_local_cid_seq(&self) -> (u64, u64) {
3665        self.local_cid_state.active_seq()
3666    }
3667
3668    /// Instruct the peer to replace previously issued CIDs by sending a NEW_CONNECTION_ID frame
3669    /// with its `retire_prior_to` field set to `v`
3670    #[cfg(test)]
3671    pub(crate) fn rotate_local_cid(&mut self, v: u64, now: Instant) {
3672        let n = self.local_cid_state.assign_retire_seq(v);
3673        self.endpoint_events
3674            .push_back(EndpointEventInner::NeedIdentifiers(now, n));
3675    }
3676
3677    /// Check the current active remote CID sequence
3678    #[cfg(test)]
3679    pub(crate) fn active_rem_cid_seq(&self) -> u64 {
3680        self.rem_cids.active_seq()
3681    }
3682
3683    /// Returns the detected maximum UDP payload size for the current path
3684    #[cfg(test)]
3685    pub(crate) fn path_mtu(&self) -> u16 {
3686        self.path.current_mtu()
3687    }
3688
3689    /// Whether we have 1-RTT data to send
3690    ///
3691    /// See also `self.space(SpaceId::Data).can_send()`
3692    fn can_send_1rtt(&self, max_size: usize) -> bool {
3693        self.streams.can_send_stream_data()
3694            || self.path.challenge_pending
3695            || self
3696                .prev_path
3697                .as_ref()
3698                .is_some_and(|(_, x)| x.challenge_pending)
3699            || !self.path_responses.is_empty()
3700            || self
3701                .datagrams
3702                .outgoing
3703                .front()
3704                .is_some_and(|x| x.size(true) <= max_size)
3705    }
3706
3707    /// Update counters to account for a packet becoming acknowledged, lost, or abandoned
3708    fn remove_in_flight(&mut self, pn: u64, packet: &SentPacket) {
3709        // Visit known paths from newest to oldest to find the one `pn` was sent on
3710        for path in [&mut self.path]
3711            .into_iter()
3712            .chain(self.prev_path.as_mut().map(|(_, data)| data))
3713        {
3714            if path.remove_in_flight(pn, packet) {
3715                return;
3716            }
3717        }
3718    }
3719
3720    /// Terminate the connection instantly, without sending a close packet
3721    fn kill(&mut self, reason: ConnectionError) {
3722        self.close_common();
3723        self.error = Some(reason);
3724        self.state = State::Drained;
3725        self.endpoint_events.push_back(EndpointEventInner::Drained);
3726    }
3727
3728    /// Storage size required for the largest packet known to be supported by the current path
3729    ///
3730    /// Buffers passed to [`Connection::poll_transmit`] should be at least this large.
3731    pub fn current_mtu(&self) -> u16 {
3732        self.path.current_mtu()
3733    }
3734
3735    /// Size of non-frame data for a 1-RTT packet
3736    ///
3737    /// Quantifies space consumed by the QUIC header and AEAD tag. All other bytes in a packet are
3738    /// frames. Changes if the length of the remote connection ID changes, which is expected to be
3739    /// rare. If `pn` is specified, may additionally change unpredictably due to variations in
3740    /// latency and packet loss.
3741    fn predict_1rtt_overhead(&self, pn: Option<u64>) -> usize {
3742        let pn_len = match pn {
3743            Some(pn) => PacketNumber::new(
3744                pn,
3745                self.spaces[SpaceId::Data].largest_acked_packet.unwrap_or(0),
3746            )
3747            .len(),
3748            // Upper bound
3749            None => 4,
3750        };
3751
3752        // 1 byte for flags
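        // e.g. an 8-byte remote CID, a 2-byte packet number, and a 16-byte AEAD tag give
        // 1 + 8 + 2 + 16 = 27 bytes of overhead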
3753        1 + self.rem_cids.active().len() + pn_len + self.tag_len_1rtt()
3754    }
3755
3756    fn tag_len_1rtt(&self) -> usize {
3757        let key = match self.spaces[SpaceId::Data].crypto.as_ref() {
3758            Some(crypto) => Some(&*crypto.packet.local),
3759            None => self.zero_rtt_crypto.as_ref().map(|x| &*x.packet),
3760        };
3761        // If neither Data nor 0-RTT keys are available, make a reasonable tag length guess. As of
3762        // this writing, all QUIC cipher suites use 16-byte tags. We could return `None` instead,
3763        // but that would needlessly prevent sending datagrams during 0-RTT.
3764        key.map_or(16, |x| x.tag_len())
3765    }
3766
3767    /// Mark the path as validated, and enqueue NEW_TOKEN frames to be sent as appropriate
3768    fn on_path_validated(&mut self) {
3769        self.path.validated = true;
3770        let ConnectionSide::Server { server_config } = &self.side else {
3771            return;
3772        };
3773        let new_tokens = &mut self.spaces[SpaceId::Data].pending.new_tokens;
3774        new_tokens.clear();
3775        for _ in 0..server_config.validation_token.sent {
3776            new_tokens.push(self.path.remote);
3777        }
3778    }
3779}
3780
3781impl fmt::Debug for Connection {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.debug_struct("Connection")
            .field("handshake_cid", &self.handshake_cid)
            .finish()
    }
}

/// Fields of `Connection` specific to it being client-side or server-side
enum ConnectionSide {
    Client {
        /// Sent in every outgoing Initial packet. Always empty after Initial keys are discarded
        token: Bytes,
        token_store: Arc<dyn TokenStore>,
        server_name: String,
    },
    Server {
        server_config: Arc<ServerConfig>,
    },
}

impl ConnectionSide {
    fn remote_may_migrate(&self) -> bool {
        match self {
            Self::Server { server_config } => server_config.migration,
            Self::Client { .. } => false,
        }
    }

    fn is_client(&self) -> bool {
        self.side().is_client()
    }

    fn is_server(&self) -> bool {
        self.side().is_server()
    }

    fn side(&self) -> Side {
        match *self {
            Self::Client { .. } => Side::Client,
            Self::Server { .. } => Side::Server,
        }
    }
}

impl From<SideArgs> for ConnectionSide {
    fn from(side: SideArgs) -> Self {
        match side {
            SideArgs::Client {
                token_store,
                server_name,
            } => Self::Client {
                token: token_store.take(&server_name).unwrap_or_default(),
                token_store,
                server_name,
            },
            SideArgs::Server {
                server_config,
                pref_addr_cid: _,
                path_validated: _,
            } => Self::Server { server_config },
        }
    }
}

/// Parameters to `Connection::new` specific to it being client-side or server-side
pub(crate) enum SideArgs {
    Client {
        token_store: Arc<dyn TokenStore>,
        server_name: String,
    },
    Server {
        server_config: Arc<ServerConfig>,
        pref_addr_cid: Option<ConnectionId>,
        path_validated: bool,
    },
}

impl SideArgs {
    pub(crate) fn pref_addr_cid(&self) -> Option<ConnectionId> {
        match *self {
            Self::Client { .. } => None,
            Self::Server { pref_addr_cid, .. } => pref_addr_cid,
        }
    }

    pub(crate) fn path_validated(&self) -> bool {
        match *self {
            Self::Client { .. } => true,
            Self::Server { path_validated, .. } => path_validated,
        }
    }

    pub(crate) fn side(&self) -> Side {
        match *self {
            Self::Client { .. } => Side::Client,
            Self::Server { .. } => Side::Server,
        }
    }
}

/// Reasons why a connection might be lost
#[derive(Debug, Error, Clone, PartialEq, Eq)]
pub enum ConnectionError {
    /// The peer doesn't implement any supported version
    #[error("peer doesn't implement any supported version")]
    VersionMismatch,
    /// The peer violated the QUIC specification as understood by this implementation
    #[error(transparent)]
    TransportError(#[from] TransportError),
    /// The peer's QUIC stack aborted the connection automatically
    #[error("aborted by peer: {0}")]
    ConnectionClosed(frame::ConnectionClose),
    /// The peer closed the connection
    #[error("closed by peer: {0}")]
    ApplicationClosed(frame::ApplicationClose),
    /// The peer is unable to continue processing this connection, usually due to having restarted
    #[error("reset by peer")]
    Reset,
    /// Communication with the peer has lapsed for longer than the negotiated idle timeout
    ///
    /// If neither side is sending keep-alives, a connection will time out after a long enough idle
    /// period even if the peer is still reachable. See also [`TransportConfig::max_idle_timeout()`]
    /// and [`TransportConfig::keep_alive_interval()`].
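    ///
    /// A minimal sketch of guarding against this on otherwise-idle connections (not a
    /// doctest; it assumes the `TransportConfig` setters linked above and an
    /// `IdleTimeout` type convertible from [`VarInt`]):
    ///
    /// ```ignore
    /// let mut transport = TransportConfig::default();
    /// // A value of 0 (or an absent one) disables the idle timeout; otherwise the
    /// // negotiated timeout is the minimum of what the two endpoints advertise.
    /// transport.max_idle_timeout(Some(IdleTimeout::from(VarInt::from_u32(30_000))));
    /// // Keep-alives sent well inside the idle timeout keep quiet connections open.
    /// transport.keep_alive_interval(Some(Duration::from_secs(10)));
    /// ```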
    #[error("timed out")]
    TimedOut,
    /// The local application closed the connection
    #[error("closed")]
    LocallyClosed,
    /// The connection could not be created because not enough of the CID space is available
    ///
    /// Try using longer connection IDs.
    #[error("CIDs exhausted")]
    CidsExhausted,
}

impl From<Close> for ConnectionError {
    fn from(x: Close) -> Self {
        match x {
            Close::Connection(reason) => Self::ConnectionClosed(reason),
            Close::Application(reason) => Self::ApplicationClosed(reason),
        }
    }
}

// For compatibility with API consumers
impl From<ConnectionError> for io::Error {
    fn from(x: ConnectionError) -> Self {
        use ConnectionError::*;
        let kind = match x {
            TimedOut => io::ErrorKind::TimedOut,
            Reset => io::ErrorKind::ConnectionReset,
            ApplicationClosed(_) | ConnectionClosed(_) => io::ErrorKind::ConnectionAborted,
            TransportError(_) | VersionMismatch | LocallyClosed | CidsExhausted => {
                io::ErrorKind::Other
            }
        };
        Self::new(kind, x)
    }
}
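
// Illustrative (hypothetical consumer code, not part of this crate): after the
// conversion above, an application that only handles `io::Error` can still
// distinguish the common cases by kind:
//
//     let err: io::Error = connection_error.into();
//     match err.kind() {
//         io::ErrorKind::TimedOut => { /* idle timeout; consider reconnecting */ }
//         io::ErrorKind::ConnectionAborted => { /* peer closed the connection */ }
//         io::ErrorKind::ConnectionReset => { /* stateless reset */ }
//         _ => { /* protocol violation, version mismatch, local close, ... */ }
//     }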

#[allow(unreachable_pub)] // fuzzing only
#[derive(Clone)]
pub enum State {
    Handshake(state::Handshake),
    Established,
    Closed(state::Closed),
    Draining,
    /// Waiting for application to call close so we can dispose of the resources
    Drained,
}

impl State {
    fn closed<R: Into<Close>>(reason: R) -> Self {
        Self::Closed(state::Closed {
            reason: reason.into(),
        })
    }

    fn is_handshake(&self) -> bool {
        matches!(*self, Self::Handshake(_))
    }

    fn is_established(&self) -> bool {
        matches!(*self, Self::Established)
    }

    fn is_closed(&self) -> bool {
        matches!(*self, Self::Closed(_) | Self::Draining | Self::Drained)
    }

    fn is_drained(&self) -> bool {
        matches!(*self, Self::Drained)
    }
}

mod state {
    use super::*;

    #[allow(unreachable_pub)] // fuzzing only
    #[derive(Clone)]
    pub struct Handshake {
        /// Whether the remote CID has been set by the peer yet
        ///
        /// Always set for servers
        pub(super) rem_cid_set: bool,
        /// Stateless retry token received in the first Initial by a server.
        ///
        /// Must be present in every Initial. Always empty for clients.
        pub(super) expected_token: Bytes,
        /// First cryptographic message
        ///
        /// Only set for clients
        pub(super) client_hello: Option<Bytes>,
    }

    #[allow(unreachable_pub)] // fuzzing only
    #[derive(Clone)]
    pub struct Closed {
        pub(super) reason: Close,
    }
}

/// Events of interest to the application
#[derive(Debug)]
pub enum Event {
    /// The connection's handshake data is ready
    HandshakeDataReady,
    /// The connection was successfully established
    Connected,
    /// The connection was lost
    ///
    /// Emitted if the peer closes the connection or an error is encountered.
    ConnectionLost {
        /// Reason that the connection was closed
        reason: ConnectionError,
    },
    /// Stream events
    Stream(StreamEvent),
    /// One or more application datagrams have been received
    DatagramReceived,
    /// One or more application datagrams have been sent after blocking
    DatagramsUnblocked,
    /// Received an observation of our external address from the peer.
    ObservedAddr(SocketAddr),
}

fn instant_saturating_sub(x: Instant, y: Instant) -> Duration {
    if x > y {
        x - y
    } else {
        Duration::ZERO
    }
}

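/// Convert the peer's `max_ack_delay` transport parameter into a [`Duration`]
///
/// `max_ack_delay` is expressed in milliseconds (RFC 9000 §18.2).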
fn get_max_ack_delay(params: &TransportParameters) -> Duration {
    Duration::from_micros(params.max_ack_delay.0 * 1000)
}

// Prevents overflow and improves behavior in extreme circumstances
const MAX_BACKOFF_EXPONENT: u32 = 16;

/// Minimal remaining size to allow packet coalescing, excluding cryptographic tag
///
/// This must be at least as large as the header for a well-formed empty packet to be coalesced,
/// plus some space for frames. We only care about handshake headers because short header packets
/// necessarily have smaller headers, and initial packets are only ever the first packet in a
/// datagram (because we coalesce in ascending packet space order and the only reason to split a
/// packet is when packet space changes).
const MIN_PACKET_SPACE: usize = MAX_HANDSHAKE_OR_0RTT_HEADER_SIZE + 32;

/// Largest amount of space that could be occupied by a Handshake or 0-RTT packet's header
///
/// Includes a worst-case 4-byte packet number, but excludes packet-type-specific fields such as
/// the Initial token
// https://www.rfc-editor.org/rfc/rfc9000.html#name-0-rtt: flags + version + dcid len + dcid +
// scid len + scid + length + pn
const MAX_HANDSHAKE_OR_0RTT_HEADER_SIZE: usize =
    1 + 4 + 1 + MAX_CID_SIZE + 1 + MAX_CID_SIZE + VarInt::from_u32(u16::MAX as u32).size() + 4;
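// Illustrative arithmetic (assuming `MAX_CID_SIZE` is 20, the RFC 9000 maximum):
// 1 + 4 + 1 + 20 + 1 + 20 + 4 (varint encoding u16::MAX) + 4 = 55 bytes, which puts
// MIN_PACKET_SPACE above at 55 + 32 = 87 bytes under that assumption.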

/// The maximum number of datagrams that are sent in a single transmit
///
/// This can be lower than the platform's maximum, to avoid excessive memory
/// allocations when calling `poll_transmit()`. Benchmarks have shown that numbers
/// around 10 are a good compromise.
const MAX_TRANSMIT_SEGMENTS: usize = 10;

/// Perform key updates this many packets before the AEAD confidentiality limit.
///
/// Chosen arbitrarily, intended to be large enough to prevent spurious connection loss.
const KEY_UPDATE_MARGIN: u64 = 10_000;

#[derive(Default)]
struct SentFrames {
    retransmits: ThinRetransmits,
    largest_acked: Option<u64>,
    stream_frames: StreamMetaVec,
    /// Whether the packet contains non-retransmittable frames (like datagrams)
    non_retransmits: bool,
    requires_padding: bool,
}

impl SentFrames {
    /// Returns whether the packet contains only ACKs
    fn is_ack_only(&self, streams: &StreamsState) -> bool {
        self.largest_acked.is_some()
            && !self.non_retransmits
            && self.stream_frames.is_empty()
            && self.retransmits.is_empty(streams)
    }
}

/// Compute the negotiated idle timeout based on the local and remote `max_idle_timeout` transport parameters.
///
/// Per the definition of `max_idle_timeout`, a value of `0` means the timeout is disabled; see <https://www.rfc-editor.org/rfc/rfc9000#section-18.2-4.4.1>.
///
/// Per the negotiation procedure, the negotiated value is the minimum of the two timeouts, or the only one specified if the other endpoint disabled its timeout; see <https://www.rfc-editor.org/rfc/rfc9000#section-10.1-2>.
///
/// Returns the negotiated idle timeout as a `Duration`, or `None` when both endpoints have disabled their idle timeout.
fn negotiate_max_idle_timeout(x: Option<VarInt>, y: Option<VarInt>) -> Option<Duration> {
    match (x, y) {
        (Some(VarInt(0)) | None, Some(VarInt(0)) | None) => None,
        (Some(VarInt(0)) | None, Some(y)) => Some(Duration::from_millis(y.0)),
        (Some(x), Some(VarInt(0)) | None) => Some(Duration::from_millis(x.0)),
        (Some(x), Some(y)) => Some(Duration::from_millis(cmp::min(x, y).0)),
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn negotiate_max_idle_timeout_commutative() {
        let test_params = [
            (None, None, None),
            (None, Some(VarInt(0)), None),
            (None, Some(VarInt(2)), Some(Duration::from_millis(2))),
            (Some(VarInt(0)), Some(VarInt(0)), None),
            (
                Some(VarInt(2)),
                Some(VarInt(0)),
                Some(Duration::from_millis(2)),
            ),
            (
                Some(VarInt(1)),
                Some(VarInt(4)),
                Some(Duration::from_millis(1)),
            ),
        ];

        for (left, right, result) in test_params {
            assert_eq!(negotiate_max_idle_timeout(left, right), result);
            assert_eq!(negotiate_max_idle_timeout(right, left), result);
        }
    }
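
    // Illustrative extra check, complementing the table above: equal non-zero
    // values negotiate to that same value.
    #[test]
    fn negotiate_max_idle_timeout_equal_values() {
        assert_eq!(
            negotiate_max_idle_timeout(Some(VarInt(5)), Some(VarInt(5))),
            Some(Duration::from_millis(5))
        );
    }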
}