quinn_proto/connection/mod.rs

use std::{
    cmp,
    collections::VecDeque,
    convert::TryFrom,
    fmt, io, mem,
    net::{IpAddr, SocketAddr},
    sync::Arc,
};

use bytes::{Bytes, BytesMut};
use frame::StreamMetaVec;

use rand::{Rng, SeedableRng, rngs::StdRng};
use thiserror::Error;
use tracing::{debug, error, trace, trace_span, warn};

use crate::{
    Dir, Duration, EndpointConfig, Frame, INITIAL_MTU, Instant, MAX_CID_SIZE, MAX_STREAM_COUNT,
    MIN_INITIAL_SIZE, Side, StreamId, TIMER_GRANULARITY, TokenStore, Transmit, TransportError,
    TransportErrorCode, VarInt,
    cid_generator::ConnectionIdGenerator,
    cid_queue::CidQueue,
    coding::BufMutExt,
    config::{ServerConfig, TransportConfig},
    crypto::{self, KeyPair, Keys, PacketKey},
    frame::{self, Close, Datagram, FrameStruct, NewConnectionId, NewToken},
    packet::{
        FixedLengthConnectionIdParser, Header, InitialHeader, InitialPacket, LongType, Packet,
        PacketNumber, PartialDecode, SpaceId,
    },
    range_set::ArrayRangeSet,
    shared::{
        ConnectionEvent, ConnectionEventInner, ConnectionId, DatagramConnectionEvent, EcnCodepoint,
        EndpointEvent, EndpointEventInner,
    },
    token::{ResetToken, Token, TokenPayload},
    transport_parameters::TransportParameters,
};

mod ack_frequency;
use ack_frequency::AckFrequencyState;

mod assembler;
pub use assembler::Chunk;

mod cid_state;
use cid_state::CidState;

mod datagrams;
use datagrams::DatagramState;
pub use datagrams::{Datagrams, SendDatagramError};

mod mtud;
mod pacing;

mod packet_builder;
use packet_builder::PacketBuilder;

mod packet_crypto;
use packet_crypto::{PrevCrypto, ZeroRttCrypto};

mod paths;
pub use paths::RttEstimator;
use paths::{PathData, PathResponses};

pub(crate) mod qlog;

mod send_buffer;

mod spaces;
#[cfg(fuzzing)]
pub use spaces::Retransmits;
#[cfg(not(fuzzing))]
use spaces::Retransmits;
use spaces::{PacketNumberFilter, PacketSpace, SendableFrames, SentPacket, ThinRetransmits};

mod stats;
pub use stats::{ConnectionStats, FrameStats, PathStats, UdpStats};

mod streams;
#[cfg(fuzzing)]
pub use streams::StreamsState;
#[cfg(not(fuzzing))]
use streams::StreamsState;
pub use streams::{
    Chunks, ClosedStream, FinishError, ReadError, ReadableError, RecvStream, SendStream,
    ShouldTransmit, StreamEvent, Streams, WriteError, Written,
};

mod timer;
use crate::congestion::Controller;
use timer::{Timer, TimerTable};

/// Protocol state and logic for a single QUIC connection
///
/// Objects of this type receive [`ConnectionEvent`]s and emit [`EndpointEvent`]s and application
/// [`Event`]s to make progress. To handle timeouts, a `Connection` returns timer updates and
/// expects timeouts through various methods. A number of simple getter methods are exposed
/// to allow callers to inspect some of the connection state.
///
/// `Connection` has roughly 4 types of methods:
///
/// - A. Simple getters, taking `&self`
/// - B. Handlers for incoming events from the network or system, named `handle_*`.
/// - C. State machine mutators, for incoming commands from the application. For convenience we
///   refer to this as "performing I/O" below, however as per the design of this library none of the
///   functions actually perform system-level I/O. For example, [`read`](RecvStream::read) and
///   [`write`](SendStream::write), but also things like [`reset`](SendStream::reset).
/// - D. Polling functions for outgoing events or actions for the caller to
///   take, named `poll_*`.
///
/// The simplest way to use this API correctly is to call (B) and (C) whenever
/// appropriate, then after each of those calls, as soon as feasible call all
/// polling methods (D) and deal with their outputs appropriately, e.g. by
/// passing it to the application or by making a system-level I/O call. You
/// should call the polling functions in this order:
///
/// 1. [`poll_transmit`](Self::poll_transmit)
/// 2. [`poll_timeout`](Self::poll_timeout)
/// 3. [`poll_endpoint_events`](Self::poll_endpoint_events)
/// 4. [`poll`](Self::poll)
///
/// Currently the only actual dependency is from (2) to (1), however additional
/// dependencies may be added in future, so the above order is recommended.
///
/// (A) may be called whenever desired.
///
/// Care should be taken to ensure that the input events represent monotonically
/// increasing time. Specifically, calling [`handle_timeout`](Self::handle_timeout)
/// with events of the same [`Instant`] may be interleaved in any order with a
/// call to [`handle_event`](Self::handle_event) at that same instant; however
/// events or timeouts with different instants must not be interleaved.
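///
/// A rough sketch of a driving loop following the order above; `receive_event`, `send_udp`,
/// and `deliver_to_app` are hypothetical stand-ins for caller-side plumbing, not part of this
/// API:
///
/// ```ignore
/// let mut buf = Vec::new();
/// loop {
///     // (B) feed network events and expired timers into the connection
///     while let Some(event) = receive_event() {
///         conn.handle_event(event);
///     }
///     if conn.poll_timeout().is_some_and(|deadline| now >= deadline) {
///         conn.handle_timeout(now);
///     }
///
///     // (D) drain outputs in the recommended order
///     buf.clear();
///     while let Some(transmit) = conn.poll_transmit(now, max_datagrams, &mut buf) {
///         send_udp(&transmit, &buf);
///         buf.clear();
///     }
///     let _next_wakeup = conn.poll_timeout();
///     while let Some(endpoint_event) = conn.poll_endpoint_events() {
///         // hand this back to the `Endpoint` that owns the connection
///         let _ = endpoint_event;
///     }
///     while let Some(event) = conn.poll() {
///         deliver_to_app(event);
///     }
/// }
/// ```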
pub struct Connection {
    endpoint_config: Arc<EndpointConfig>,
    config: Arc<TransportConfig>,
    rng: StdRng,
    crypto: Box<dyn crypto::Session>,
    /// The CID we initially chose, for use during the handshake
    handshake_cid: ConnectionId,
    /// The CID the peer initially chose, for use during the handshake
    rem_handshake_cid: ConnectionId,
    /// The "real" local IP address which was used to receive the initial packet.
    /// This is only populated for the server case, and if known
    local_ip: Option<IpAddr>,
    path: PathData,
    /// Incremented every time we see a new path
    ///
    /// Stored separately from `path.generation` to account for aborted migrations
    path_counter: u64,
    /// Whether MTU detection is supported in this environment
    allow_mtud: bool,
    prev_path: Option<(ConnectionId, PathData)>,
    state: State,
    side: ConnectionSide,
    /// Whether or not 0-RTT was enabled during the handshake. Does not imply acceptance.
    zero_rtt_enabled: bool,
    /// Set if 0-RTT is supported, then cleared when no longer needed.
    zero_rtt_crypto: Option<ZeroRttCrypto>,
    key_phase: bool,
    /// How many packets are in the current key phase. Used only for `Data` space.
    key_phase_size: u64,
    /// Transport parameters set by the peer
    peer_params: TransportParameters,
    /// Source ConnectionId of the first packet received from the peer
    orig_rem_cid: ConnectionId,
    /// Destination ConnectionId sent by the client on the first Initial
    initial_dst_cid: ConnectionId,
    /// The value that the server included in the Source Connection ID field of a Retry packet, if
    /// one was received
    retry_src_cid: Option<ConnectionId>,
    events: VecDeque<Event>,
    endpoint_events: VecDeque<EndpointEventInner>,
    /// Whether the spin bit is in use for this connection
    spin_enabled: bool,
    /// Outgoing spin bit state
    spin: bool,
    /// Packet number spaces: initial, handshake, 1-RTT
    spaces: [PacketSpace; 3],
    /// Highest usable packet number space
    highest_space: SpaceId,
    /// 1-RTT keys used prior to a key update
    prev_crypto: Option<PrevCrypto>,
    /// 1-RTT keys to be used for the next key update
    ///
    /// These are generated in advance to prevent timing attacks and/or DoS by third-party attackers
    /// spoofing key updates.
    next_crypto: Option<KeyPair<Box<dyn PacketKey>>>,
    accepted_0rtt: bool,
    /// Whether the idle timer should be reset the next time an ack-eliciting packet is transmitted.
    permit_idle_reset: bool,
    /// Negotiated idle timeout
    idle_timeout: Option<Duration>,
    timers: TimerTable,
    /// Number of packets received which could not be authenticated
    authentication_failures: u64,
    /// Why the connection was lost, if it has been
    error: Option<ConnectionError>,
    /// Identifies Data-space packet numbers to skip. Not used in earlier spaces.
    packet_number_filter: PacketNumberFilter,

    //
    // Queued non-retransmittable 1-RTT data
    //
    /// Responses to PATH_CHALLENGE frames
    path_responses: PathResponses,
    close: bool,

    //
    // ACK frequency
    //
    ack_frequency: AckFrequencyState,

    //
    // Loss Detection
    //
    /// The number of times a PTO has been sent without receiving an ack.
    pto_count: u32,

    //
    // Congestion Control
    //
    /// Whether the most recently received packet had an ECN codepoint set
    receiving_ecn: bool,
    /// Number of packets authenticated
    total_authed_packets: u64,
    /// Whether the last `poll_transmit` call yielded no data because there was
    /// no outgoing application data.
    app_limited: bool,

    streams: StreamsState,
    /// Surplus remote CIDs for future use on new paths
    rem_cids: CidQueue,
    /// Attributes of CIDs generated by local peer
    local_cid_state: CidState,
    /// State of the unreliable datagram extension
    datagrams: DatagramState,
    /// Connection level statistics
    stats: ConnectionStats,
    /// QUIC version used for the connection.
    version: u32,
}

impl Connection {
    pub(crate) fn new(
        endpoint_config: Arc<EndpointConfig>,
        config: Arc<TransportConfig>,
        init_cid: ConnectionId,
        loc_cid: ConnectionId,
        rem_cid: ConnectionId,
        remote: SocketAddr,
        local_ip: Option<IpAddr>,
        crypto: Box<dyn crypto::Session>,
        cid_gen: &dyn ConnectionIdGenerator,
        now: Instant,
        version: u32,
        allow_mtud: bool,
        rng_seed: [u8; 32],
        side_args: SideArgs,
    ) -> Self {
        let pref_addr_cid = side_args.pref_addr_cid();
        let path_validated = side_args.path_validated();
        let connection_side = ConnectionSide::from(side_args);
        let side = connection_side.side();
        let initial_space = PacketSpace {
            crypto: Some(crypto.initial_keys(&init_cid, side)),
            ..PacketSpace::new(now)
        };
        let state = State::Handshake(state::Handshake {
            rem_cid_set: side.is_server(),
            expected_token: Bytes::new(),
            client_hello: None,
        });
        let mut rng = StdRng::from_seed(rng_seed);
        let mut this = Self {
            endpoint_config,
            crypto,
            handshake_cid: loc_cid,
            rem_handshake_cid: rem_cid,
            local_cid_state: CidState::new(
                cid_gen.cid_len(),
                cid_gen.cid_lifetime(),
                now,
                if pref_addr_cid.is_some() { 2 } else { 1 },
            ),
            path: PathData::new(remote, allow_mtud, None, 0, now, &config),
            path_counter: 0,
            allow_mtud,
            local_ip,
            prev_path: None,
            state,
            side: connection_side,
            zero_rtt_enabled: false,
            zero_rtt_crypto: None,
            key_phase: false,
            // A small initial key phase size ensures peers that don't handle key updates correctly
            // fail sooner rather than later. It's okay for both peers to do this, as the first one
            // to perform an update will reset the other's key phase size in `update_keys`, and a
            // simultaneous key update by both is just like a regular key update with a really fast
            // response. Inspired by quic-go's similar behavior of performing the first key update
            // at the 100th short-header packet.
            key_phase_size: rng.random_range(10..1000),
            peer_params: TransportParameters::default(),
            orig_rem_cid: rem_cid,
            initial_dst_cid: init_cid,
            retry_src_cid: None,
            events: VecDeque::new(),
            endpoint_events: VecDeque::new(),
            spin_enabled: config.allow_spin && rng.random_ratio(7, 8),
            spin: false,
            spaces: [initial_space, PacketSpace::new(now), PacketSpace::new(now)],
            highest_space: SpaceId::Initial,
            prev_crypto: None,
            next_crypto: None,
            accepted_0rtt: false,
            permit_idle_reset: true,
            idle_timeout: match config.max_idle_timeout {
                None | Some(VarInt(0)) => None,
                Some(dur) => Some(Duration::from_millis(dur.0)),
            },
            timers: TimerTable::default(),
            authentication_failures: 0,
            error: None,
            #[cfg(test)]
            packet_number_filter: match config.deterministic_packet_numbers {
                false => PacketNumberFilter::new(&mut rng),
                true => PacketNumberFilter::disabled(),
            },
            #[cfg(not(test))]
            packet_number_filter: PacketNumberFilter::new(&mut rng),

            path_responses: PathResponses::default(),
            close: false,

            ack_frequency: AckFrequencyState::new(get_max_ack_delay(
                &TransportParameters::default(),
            )),

            pto_count: 0,

            app_limited: false,
            receiving_ecn: false,
            total_authed_packets: 0,

            streams: StreamsState::new(
                side,
                config.max_concurrent_uni_streams,
                config.max_concurrent_bidi_streams,
                config.send_window,
                config.receive_window,
                config.stream_receive_window,
            ),
            datagrams: DatagramState::default(),
            config,
            rem_cids: CidQueue::new(rem_cid),
            rng,
            stats: ConnectionStats::default(),
            version,
        };
        if path_validated {
            this.on_path_validated();
        }
        if side.is_client() {
            // Kick off the connection
            this.write_crypto();
            this.init_0rtt();
        }
        this
    }

    /// Returns the next time at which `handle_timeout` should be called
    ///
    /// The value returned may change after:
    /// - the application performed some I/O on the connection
    /// - a call was made to `handle_event`
    /// - a call to `poll_transmit` returned `Some`
    /// - a call was made to `handle_timeout`
    #[must_use]
    pub fn poll_timeout(&mut self) -> Option<Instant> {
        self.timers.next_timeout()
    }

    /// Returns application-facing events
    ///
    /// Connections should be polled for events after:
    /// - a call was made to `handle_event`
    /// - a call was made to `handle_timeout`
    #[must_use]
    pub fn poll(&mut self) -> Option<Event> {
        if let Some(x) = self.events.pop_front() {
            return Some(x);
        }

        if let Some(event) = self.streams.poll() {
            return Some(Event::Stream(event));
        }

        if let Some(err) = self.error.take() {
            return Some(Event::ConnectionLost { reason: err });
        }

        None
    }

    /// Return endpoint-facing events
    #[must_use]
    pub fn poll_endpoint_events(&mut self) -> Option<EndpointEvent> {
        self.endpoint_events.pop_front().map(EndpointEvent)
    }

    /// Provide control over streams
    #[must_use]
    pub fn streams(&mut self) -> Streams<'_> {
        Streams {
            state: &mut self.streams,
            conn_state: &self.state,
        }
    }

    /// Provide control over streams
    #[must_use]
    pub fn recv_stream(&mut self, id: StreamId) -> RecvStream<'_> {
        assert!(id.dir() == Dir::Bi || id.initiator() != self.side.side());
        RecvStream {
            id,
            state: &mut self.streams,
            pending: &mut self.spaces[SpaceId::Data].pending,
        }
    }

    /// Provide control over streams
    #[must_use]
    pub fn send_stream(&mut self, id: StreamId) -> SendStream<'_> {
        assert!(id.dir() == Dir::Bi || id.initiator() == self.side.side());
        SendStream {
            id,
            state: &mut self.streams,
            pending: &mut self.spaces[SpaceId::Data].pending,
            conn_state: &self.state,
        }
    }

    /// Returns packets to transmit
    ///
    /// Connections should be polled for transmit after:
    /// - the application performed some I/O on the connection
    /// - a call was made to `handle_event`
    /// - a call was made to `handle_timeout`
    ///
    /// `max_datagrams` specifies how many datagrams can be returned inside a
    /// single Transmit using GSO. This must be at least 1.
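    ///
    /// A minimal sketch of how a caller without GSO support might split up a returned
    /// [`Transmit`] (`udp_send` is a hypothetical caller-side helper, not part of this API):
    ///
    /// ```ignore
    /// if let Some(t) = conn.poll_transmit(now, max_datagrams, &mut buf) {
    ///     match t.segment_size {
    ///         // A GSO batch: every segment is `seg` bytes, except possibly the last
    ///         Some(seg) => {
    ///             for datagram in buf[..t.size].chunks(seg) {
    ///                 udp_send(t.destination, datagram);
    ///             }
    ///         }
    ///         // A single datagram
    ///         None => udp_send(t.destination, &buf[..t.size]),
    ///     }
    /// }
    /// ```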
    #[must_use]
    pub fn poll_transmit(
        &mut self,
        now: Instant,
        max_datagrams: usize,
        buf: &mut Vec<u8>,
    ) -> Option<Transmit> {
        assert!(max_datagrams != 0);
        let max_datagrams = match self.config.enable_segmentation_offload {
            false => 1,
            true => max_datagrams,
        };

        let mut num_datagrams = 0;
        // Position in `buf` of the first byte of the current UDP datagram. When coalescing QUIC
        // packets, this can be earlier than the start of the current QUIC packet.
        let mut datagram_start = 0;
        let mut segment_size = usize::from(self.path.current_mtu());

        if let Some(challenge) = self.send_path_challenge(now, buf) {
            return Some(challenge);
        }

        // If we need to send a probe, make sure we have something to send.
        for space in SpaceId::iter() {
            let request_immediate_ack =
                space == SpaceId::Data && self.peer_supports_ack_frequency();
            self.spaces[space].maybe_queue_probe(request_immediate_ack, &self.streams);
        }

        // Check whether we need to send a close message
        let close = match self.state {
            State::Drained => {
                self.app_limited = true;
                return None;
            }
            State::Draining | State::Closed(_) => {
                // self.close is only reset once the associated packet has been
                // encoded successfully
                if !self.close {
                    self.app_limited = true;
                    return None;
                }
                true
            }
            _ => false,
        };

        // Check whether we need to send an ACK_FREQUENCY frame
        if let Some(config) = &self.config.ack_frequency_config {
            self.spaces[SpaceId::Data].pending.ack_frequency = self
                .ack_frequency
                .should_send_ack_frequency(self.path.rtt.get(), config, &self.peer_params)
                && self.highest_space == SpaceId::Data
                && self.peer_supports_ack_frequency();
        }

        // Reserving capacity can provide more capacity than we asked for. However, we are not
        // allowed to write more than `segment_size`. Therefore the maximum capacity is tracked
        // separately.
        let mut buf_capacity = 0;

        let mut coalesce = true;
        let mut builder_storage: Option<PacketBuilder> = None;
        let mut sent_frames = None;
        let mut pad_datagram = false;
        let mut pad_datagram_to_mtu = false;
        let mut congestion_blocked = false;

        // Iterate over all spaces and find data to send
        let mut space_idx = 0;
        let spaces = [SpaceId::Initial, SpaceId::Handshake, SpaceId::Data];
        // This loop will potentially spend multiple iterations in the same `SpaceId`,
        // so we cannot trivially rewrite it to take advantage of `SpaceId::iter()`.
        while space_idx < spaces.len() {
            let space_id = spaces[space_idx];
            // Number of bytes available for frames if this is a 1-RTT packet. We're guaranteed to
            // be able to send an individual frame at least this large in the next 1-RTT
            // packet. This could be generalized to support every space, but it's only needed to
            // handle large fixed-size frames, which only exist in 1-RTT (application datagrams). We
            // don't account for coalesced packets potentially occupying space because frames can
            // always spill into the next datagram.
            let pn = self.packet_number_filter.peek(&self.spaces[SpaceId::Data]);
            let frame_space_1rtt =
                segment_size.saturating_sub(self.predict_1rtt_overhead(Some(pn)));

            // Is there data or a close message to send in this space?
            let can_send = self.space_can_send(space_id, frame_space_1rtt);
            if can_send.is_empty() && (!close || self.spaces[space_id].crypto.is_none()) {
                space_idx += 1;
                continue;
            }

            let mut ack_eliciting = !self.spaces[space_id].pending.is_empty(&self.streams)
                || self.spaces[space_id].ping_pending
                || self.spaces[space_id].immediate_ack_pending;
            if space_id == SpaceId::Data {
                ack_eliciting |= self.can_send_1rtt(frame_space_1rtt);
            }

            pad_datagram_to_mtu |= space_id == SpaceId::Data && self.config.pad_to_mtu;

            // Can we append more data into the current buffer?
            // It is not safe to assume that `buf.len()` is the end of the data,
            // since the last packet might not have been finished.
            let buf_end = if let Some(builder) = &builder_storage {
                buf.len().max(builder.min_size) + builder.tag_len
            } else {
                buf.len()
            };

            let tag_len = if let Some(ref crypto) = self.spaces[space_id].crypto {
                crypto.packet.local.tag_len()
            } else if space_id == SpaceId::Data {
                self.zero_rtt_crypto.as_ref().expect(
                    "sending packets in the application data space requires known 0-RTT or 1-RTT keys",
                ).packet.tag_len()
            } else {
                unreachable!("tried to send {:?} packet without keys", space_id)
            };
            if !coalesce || buf_capacity - buf_end < MIN_PACKET_SPACE + tag_len {
                // We need to send 1 more datagram and extend the buffer for that.

                // Is 1 more datagram allowed?
                if num_datagrams >= max_datagrams {
                    // No more datagrams allowed
                    break;
                }

                // Anti-amplification is only based on `total_sent`, which gets
                // updated at the end of this method. Therefore we pass the amount
                // of bytes for datagrams that are already created, as well as 1 byte
                // for starting another datagram. If there is any anti-amplification
                // budget left, we always allow a full MTU to be sent
                // (see https://github.com/quinn-rs/quinn/issues/1082)
                if self
                    .path
                    .anti_amplification_blocked(segment_size as u64 * (num_datagrams as u64) + 1)
                {
                    trace!("blocked by anti-amplification");
                    break;
                }

                // Congestion control and pacing checks
                // Tail loss probes must not be blocked by congestion, or a deadlock could arise
                if ack_eliciting && self.spaces[space_id].loss_probes == 0 {
                    // Assume the current packet will get padded to fill the segment
                    let untracked_bytes = if let Some(builder) = &builder_storage {
                        buf_capacity - builder.partial_encode.start
                    } else {
                        0
                    } as u64;
                    debug_assert!(untracked_bytes <= segment_size as u64);

                    let bytes_to_send = segment_size as u64 + untracked_bytes;
                    if self.path.in_flight.bytes + bytes_to_send >= self.path.congestion.window() {
                        space_idx += 1;
                        congestion_blocked = true;
                        // We continue instead of breaking here in order to avoid
                        // blocking loss probes queued for higher spaces.
                        trace!("blocked by congestion control");
                        continue;
                    }

                    // Check whether the next datagram is blocked by pacing
                    let smoothed_rtt = self.path.rtt.get();
                    if let Some(delay) = self.path.pacing.delay(
                        smoothed_rtt,
                        bytes_to_send,
                        self.path.current_mtu(),
                        self.path.congestion.window(),
                        now,
                    ) {
                        self.timers.set(Timer::Pacing, delay);
                        congestion_blocked = true;
                        // Loss probes should be subject to pacing, even though
                        // they are not congestion controlled.
                        trace!("blocked by pacing");
                        break;
                    }
                }

                // Finish current packet
                if let Some(mut builder) = builder_storage.take() {
                    if pad_datagram {
                        builder.pad_to(MIN_INITIAL_SIZE);
                    }

                    if num_datagrams > 1 || pad_datagram_to_mtu {
                        // If too many padding bytes would be required to continue the GSO batch
                        // after this packet, end the GSO batch here. Ensures that fixed-size frames
                        // with heterogeneous sizes (e.g. application datagrams) won't inadvertently
                        // waste large amounts of bandwidth. The exact threshold is a bit arbitrary
                        // and might benefit from further tuning, though there's no universally
                        // optimal value.
                        //
                        // Additionally, if this datagram is a loss probe and `segment_size` is
                        // larger than `INITIAL_MTU`, then padding it to `segment_size` to continue
                        // the GSO batch would risk failure to recover from a reduction in path
                        // MTU. Loss probes are the only packets for which we might grow
                        // `buf_capacity` by less than `segment_size`.
                        const MAX_PADDING: usize = 16;
                        let packet_len_unpadded = cmp::max(builder.min_size, buf.len())
                            - datagram_start
                            + builder.tag_len;
                        if (packet_len_unpadded + MAX_PADDING < segment_size
                            && !pad_datagram_to_mtu)
                            || datagram_start + segment_size > buf_capacity
                        {
                            trace!(
                                "GSO truncated by demand for {} padding bytes or loss probe",
                                segment_size - packet_len_unpadded
                            );
                            builder_storage = Some(builder);
                            break;
                        }

                        // Pad the current datagram to GSO segment size so it can be included in the
                        // GSO batch.
                        builder.pad_to(segment_size as u16);
                    }

                    builder.finish_and_track(now, self, sent_frames.take(), buf);

                    if num_datagrams == 1 {
                        // Set the segment size for this GSO batch to the size of the first UDP
                        // datagram in the batch. Larger data that cannot be fragmented
                        // (e.g. application datagrams) will be included in a future batch. When
                        // sending large enough volumes of data for GSO to be useful, we expect
                        // packet sizes to usually be consistent, e.g. populated by max-size STREAM
                        // frames or uniformly sized datagrams.
                        segment_size = buf.len();
                        // Clip the unused capacity out of the buffer so future packets don't
                        // overrun
                        buf_capacity = buf.len();

                        // Check whether the data we planned to send will fit in the reduced segment
                        // size. If not, bail out and leave it for the next GSO batch so we don't
                        // end up trying to send an empty packet. We can't easily compute the right
                        // segment size before the original call to `space_can_send`, because at
                        // that time we haven't determined whether we're going to coalesce with the
                        // first datagram or potentially pad it to `MIN_INITIAL_SIZE`.
                        if space_id == SpaceId::Data {
                            let frame_space_1rtt =
                                segment_size.saturating_sub(self.predict_1rtt_overhead(Some(pn)));
                            if self.space_can_send(space_id, frame_space_1rtt).is_empty() {
                                break;
                            }
                        }
                    }
                }

                // Allocate space for another datagram
                let next_datagram_size_limit = match self.spaces[space_id].loss_probes {
                    0 => segment_size,
                    _ => {
                        self.spaces[space_id].loss_probes -= 1;
                        // Clamp the datagram to at most the minimum MTU to ensure that loss probes
                        // can get through and enable recovery even if the path MTU has shrunk
                        // unexpectedly.
                        std::cmp::min(segment_size, usize::from(INITIAL_MTU))
                    }
                };
                buf_capacity += next_datagram_size_limit;
                if buf.capacity() < buf_capacity {
                    // We reserve the maximum space for sending `max_datagrams` upfront
                    // to avoid any reallocations if more datagrams have to be appended later on.
                    // Benchmarks have shown a 5-10% throughput improvement
                    // compared to continuously resizing the datagram buffer.
                    // While this will lead to over-allocation for small transmits
                    // (e.g. purely containing ACKs), modern memory allocators
                    // (e.g. mimalloc and jemalloc) will pool certain allocation sizes
                    // and therefore this is still rather efficient.
                    buf.reserve(max_datagrams * segment_size);
                }
                num_datagrams += 1;
                coalesce = true;
                pad_datagram = false;
                datagram_start = buf.len();

                debug_assert_eq!(
                    datagram_start % segment_size,
                    0,
                    "datagrams in a GSO batch must be aligned to the segment size"
                );
            } else {
                // We can append/coalesce the next packet into the current
                // datagram.
                // Finish current packet without adding extra padding
                if let Some(builder) = builder_storage.take() {
                    builder.finish_and_track(now, self, sent_frames.take(), buf);
                }
            }

            debug_assert!(buf_capacity - buf.len() >= MIN_PACKET_SPACE);

            //
            // From here on, we've determined that a packet will definitely be sent.
            //

            if self.spaces[SpaceId::Initial].crypto.is_some()
                && space_id == SpaceId::Handshake
                && self.side.is_client()
            {
                // A client stops both sending and processing Initial packets when it
                // sends its first Handshake packet.
                self.discard_space(now, SpaceId::Initial);
            }
            if let Some(ref mut prev) = self.prev_crypto {
                prev.update_unacked = false;
            }

            debug_assert!(
                builder_storage.is_none() && sent_frames.is_none(),
                "Previous packet must have been finished"
            );

            let builder = builder_storage.insert(PacketBuilder::new(
                now,
                space_id,
                self.rem_cids.active(),
                buf,
                buf_capacity,
                datagram_start,
                ack_eliciting,
                self,
            )?);
            coalesce = coalesce && !builder.short_header;

            // https://tools.ietf.org/html/draft-ietf-quic-transport-34#section-14.1
            pad_datagram |=
                space_id == SpaceId::Initial && (self.side.is_client() || ack_eliciting);

            if close {
                trace!("sending CONNECTION_CLOSE");
                // Encode ACKs before the ConnectionClose message, to give the receiver
                // a better approximation of what data has been processed. This is
                // especially important with ack delay, since the peer might not
                // have gotten any other ACK for the data earlier on.
                if !self.spaces[space_id].pending_acks.ranges().is_empty() {
                    Self::populate_acks(
                        now,
                        self.receiving_ecn,
                        &mut SentFrames::default(),
                        &mut self.spaces[space_id],
                        buf,
                        &mut self.stats,
                    );
                }

                // Since there are only 64 ACK frames there will always be enough space
                // to encode the ConnectionClose frame too. However we still have the
                // check here to prevent crashes if something changes.
                debug_assert!(
                    buf.len() + frame::ConnectionClose::SIZE_BOUND < builder.max_size,
                    "ACKs should leave space for ConnectionClose"
                );
                if buf.len() + frame::ConnectionClose::SIZE_BOUND < builder.max_size {
                    let max_frame_size = builder.max_size - buf.len();
                    match self.state {
                        State::Closed(state::Closed { ref reason }) => {
                            if space_id == SpaceId::Data || reason.is_transport_layer() {
                                reason.encode(buf, max_frame_size)
                            } else {
                                frame::ConnectionClose {
                                    error_code: TransportErrorCode::APPLICATION_ERROR,
                                    frame_type: None,
                                    reason: Bytes::new(),
                                }
                                .encode(buf, max_frame_size)
                            }
                        }
                        State::Draining => frame::ConnectionClose {
                            error_code: TransportErrorCode::NO_ERROR,
                            frame_type: None,
                            reason: Bytes::new(),
                        }
                        .encode(buf, max_frame_size),
                        _ => unreachable!(
                            "tried to make a close packet when the connection wasn't closed"
                        ),
                    }
                }
                if space_id == self.highest_space {
                    // Don't send another close packet
                    self.close = false;
                    // `CONNECTION_CLOSE` is the final packet
                    break;
                } else {
                    // Send a close frame in every possible space for robustness, per RFC9000
                    // "Immediate Close during the Handshake". Don't bother trying to send anything
                    // else.
                    space_idx += 1;
                    continue;
                }
            }

            // Send an off-path PATH_RESPONSE. Prioritized over on-path data to ensure that path
            // validation can occur while the link is saturated.
            if space_id == SpaceId::Data && num_datagrams == 1 {
                if let Some((token, remote)) = self.path_responses.pop_off_path(self.path.remote) {
                    // `unwrap` guaranteed to succeed because `builder_storage` was populated just
                    // above.
                    let mut builder = builder_storage.take().unwrap();
                    trace!("PATH_RESPONSE {:08x} (off-path)", token);
                    buf.write(frame::FrameType::PATH_RESPONSE);
                    buf.write(token);
                    self.stats.frame_tx.path_response += 1;
                    builder.pad_to(MIN_INITIAL_SIZE);
                    builder.finish_and_track(
                        now,
                        self,
                        Some(SentFrames {
                            non_retransmits: true,
                            ..SentFrames::default()
                        }),
                        buf,
                    );
                    self.stats.udp_tx.on_sent(1, buf.len());
                    return Some(Transmit {
                        destination: remote,
                        size: buf.len(),
                        ecn: None,
                        segment_size: None,
                        src_ip: self.local_ip,
                    });
                }
            }

            let sent =
                self.populate_packet(now, space_id, buf, builder.max_size, builder.exact_number);

            // ACK-only packets should only be sent when explicitly allowed. If we write them due to
            // any other reason, there is a bug which leads to one component announcing write
            // readiness while not writing any data. This degrades performance. The condition is
            // only checked if the full MTU is available and when potentially large fixed-size
            // frames aren't queued, so that lack of space in the datagram isn't the reason for just
            // writing ACKs.
            debug_assert!(
                !(sent.is_ack_only(&self.streams)
                    && !can_send.acks
                    && can_send.other
                    && (buf_capacity - builder.datagram_start) == self.path.current_mtu() as usize
                    && self.datagrams.outgoing.is_empty()),
                "SendableFrames was {can_send:?}, but only ACKs have been written"
            );
            pad_datagram |= sent.requires_padding;

            if sent.largest_acked.is_some() {
                self.spaces[space_id].pending_acks.acks_sent();
                self.timers.stop(Timer::MaxAckDelay);
            }

            // Keep information about the packet around until it gets finalized
            sent_frames = Some(sent);

            // Don't increment space_idx.
            // We stay in the current space and check if there is more data to send.
        }

        // Finish the last packet
        if let Some(mut builder) = builder_storage {
            if pad_datagram {
                builder.pad_to(MIN_INITIAL_SIZE);
            }

            // If this datagram is a loss probe and `segment_size` is larger than `INITIAL_MTU`,
            // then padding it to `segment_size` would risk failure to recover from a reduction in
            // path MTU.
            // Loss probes are the only packets for which we might grow `buf_capacity`
            // by less than `segment_size`.
            if pad_datagram_to_mtu && buf_capacity >= datagram_start + segment_size {
                builder.pad_to(segment_size as u16);
            }

            let last_packet_number = builder.exact_number;
            builder.finish_and_track(now, self, sent_frames, buf);
            self.path
                .congestion
                .on_sent(now, buf.len() as u64, last_packet_number);

            self.config.qlog_sink.emit_recovery_metrics(
                self.pto_count,
                &mut self.path,
                now,
                self.orig_rem_cid,
            );
        }

        self.app_limited = buf.is_empty() && !congestion_blocked;

        // Send MTU probe if necessary
        if buf.is_empty() && self.state.is_established() {
            let space_id = SpaceId::Data;
            let probe_size = self
                .path
                .mtud
                .poll_transmit(now, self.packet_number_filter.peek(&self.spaces[space_id]))?;

            let buf_capacity = probe_size as usize;
            buf.reserve(buf_capacity);

            let mut builder = PacketBuilder::new(
                now,
                space_id,
                self.rem_cids.active(),
                buf,
                buf_capacity,
                0,
                true,
                self,
            )?;

            // We implement MTU probes as ping packets padded up to the probe size
            buf.write(frame::FrameType::PING);
            self.stats.frame_tx.ping += 1;

            // If supported by the peer, we want no delays to the probe's ACK
            if self.peer_supports_ack_frequency() {
                buf.write(frame::FrameType::IMMEDIATE_ACK);
                self.stats.frame_tx.immediate_ack += 1;
            }

            builder.pad_to(probe_size);
            let sent_frames = SentFrames {
                non_retransmits: true,
                ..Default::default()
            };
            builder.finish_and_track(now, self, Some(sent_frames), buf);

            self.stats.path.sent_plpmtud_probes += 1;
            num_datagrams = 1;

            trace!(?probe_size, "writing MTUD probe");
        }

        if buf.is_empty() {
            return None;
        }

        trace!("sending {} bytes in {} datagrams", buf.len(), num_datagrams);
        self.path.total_sent = self.path.total_sent.saturating_add(buf.len() as u64);

        self.stats.udp_tx.on_sent(num_datagrams as u64, buf.len());

        Some(Transmit {
            destination: self.path.remote,
            size: buf.len(),
            ecn: if self.path.sending_ecn {
                Some(EcnCodepoint::Ect0)
            } else {
                None
            },
            segment_size: match num_datagrams {
                1 => None,
                _ => Some(segment_size),
            },
            src_ip: self.local_ip,
        })
    }

    /// Send PATH_CHALLENGE for a previous path if necessary
    fn send_path_challenge(&mut self, now: Instant, buf: &mut Vec<u8>) -> Option<Transmit> {
        let (prev_cid, prev_path) = self.prev_path.as_mut()?;
        if !prev_path.challenge_pending {
            return None;
        }
        prev_path.challenge_pending = false;
        let token = prev_path
            .challenge
            .expect("previous path challenge pending without token");
        let destination = prev_path.remote;
        debug_assert_eq!(
            self.highest_space,
            SpaceId::Data,
            "PATH_CHALLENGE queued without 1-RTT keys"
        );
        buf.reserve(MIN_INITIAL_SIZE as usize);

        let buf_capacity = buf.capacity();

        // Use the previous CID to avoid linking the new path with the previous path. We
        // don't bother accounting for possible retirement of that prev_cid because this is
        // sent once, immediately after migration, when the CID is known to be valid. Even
        // if a post-migration packet caused the CID to be retired, it's fair to pretend
        // this is sent first.
        let mut builder = PacketBuilder::new(
            now,
            SpaceId::Data,
            *prev_cid,
            buf,
            buf_capacity,
            0,
            false,
            self,
        )?;
        trace!("validating previous path with PATH_CHALLENGE {:08x}", token);
        buf.write(frame::FrameType::PATH_CHALLENGE);
        buf.write(token);
        self.stats.frame_tx.path_challenge += 1;

        // An endpoint MUST expand datagrams that contain a PATH_CHALLENGE frame
        // to at least the smallest allowed maximum datagram size of 1200 bytes,
        // unless the anti-amplification limit for the path does not permit
        // sending a datagram of this size
        builder.pad_to(MIN_INITIAL_SIZE);

        builder.finish(self, now, buf);
        self.stats.udp_tx.on_sent(1, buf.len());

        Some(Transmit {
            destination,
            size: buf.len(),
            ecn: None,
            segment_size: None,
            src_ip: self.local_ip,
        })
    }

    /// Indicate what types of frames are ready to send for the given space
    fn space_can_send(&self, space_id: SpaceId, frame_space_1rtt: usize) -> SendableFrames {
        if self.spaces[space_id].crypto.is_none()
            && (space_id != SpaceId::Data
                || self.zero_rtt_crypto.is_none()
                || self.side.is_server())
        {
            // No keys available for this space
            return SendableFrames::empty();
        }
        let mut can_send = self.spaces[space_id].can_send(&self.streams);
        if space_id == SpaceId::Data {
            can_send.other |= self.can_send_1rtt(frame_space_1rtt);
        }
        can_send
    }

    /// Process `ConnectionEvent`s generated by the associated `Endpoint`
    ///
    /// Will execute protocol logic upon receipt of a connection event, in turn preparing signals
    /// (including application `Event`s, `EndpointEvent`s and outgoing datagrams) that should be
    /// extracted through the relevant methods.
    pub fn handle_event(&mut self, event: ConnectionEvent) {
        use ConnectionEventInner::*;
        match event.0 {
            Datagram(DatagramConnectionEvent {
                now,
                remote,
                ecn,
                first_decode,
                remaining,
            }) => {
                // If this packet could initiate a migration and we're a client or a server that
                // forbids migration, drop the datagram. This could be relaxed to heuristically
                // permit NAT-rebinding-like migration.
                if remote != self.path.remote && !self.side.remote_may_migrate() {
                    trace!("discarding packet from unrecognized peer {}", remote);
                    return;
                }

                let was_anti_amplification_blocked = self.path.anti_amplification_blocked(1);

                self.stats.udp_rx.datagrams += 1;
                self.stats.udp_rx.bytes += first_decode.len() as u64;
                let data_len = first_decode.len();

                self.handle_decode(now, remote, ecn, first_decode);
                // The current `path` might have changed inside `handle_decode`,
                // since the packet could have triggered a migration. Make sure
                // the data received is accounted for the most recent path by accessing
                // `path` after `handle_decode`.
                self.path.total_recvd = self.path.total_recvd.saturating_add(data_len as u64);

                if let Some(data) = remaining {
                    self.stats.udp_rx.bytes += data.len() as u64;
                    self.handle_coalesced(now, remote, ecn, data);
                }

                self.config.qlog_sink.emit_recovery_metrics(
                    self.pto_count,
                    &mut self.path,
                    now,
                    self.orig_rem_cid,
                );

                if was_anti_amplification_blocked {
                    // A prior attempt to set the loss detection timer may have failed due to
                    // anti-amplification, so ensure it's set now. Prevents a handshake deadlock if
                    // the server's first flight is lost.
                    self.set_loss_detection_timer(now);
                }
            }
            NewIdentifiers(ids, now) => {
                self.local_cid_state.new_cids(&ids, now);
                ids.into_iter().rev().for_each(|frame| {
                    self.spaces[SpaceId::Data].pending.new_cids.push(frame);
                });
                // Update Timer::PushNewCid
                if self
                    .timers
                    .get(Timer::PushNewCid)
                    .map_or(true, |x| x <= now)
                {
                    self.reset_cid_retirement();
                }
            }
        }
    }

    /// Process timer expirations
    ///
    /// Executes protocol logic, potentially preparing signals (including application `Event`s,
    /// `EndpointEvent`s and outgoing datagrams) that should be extracted through the relevant
    /// methods.
    ///
    /// It is most efficient to call this immediately after the system clock reaches the latest
    /// `Instant` that was output by `poll_timeout`; however spurious extra calls will simply
    /// no-op and therefore are safe.
    pub fn handle_timeout(&mut self, now: Instant) {
        for &timer in &Timer::VALUES {
            if !self.timers.is_expired(timer, now) {
                continue;
            }
            self.timers.stop(timer);
            trace!(timer = ?timer, "timeout");
            match timer {
                Timer::Close => {
                    self.state = State::Drained;
                    self.endpoint_events.push_back(EndpointEventInner::Drained);
                }
                Timer::Idle => {
                    self.kill(ConnectionError::TimedOut);
                }
                Timer::KeepAlive => {
                    trace!("sending keep-alive");
                    self.ping();
                }
                Timer::LossDetection => {
                    self.on_loss_detection_timeout(now);

                    self.config.qlog_sink.emit_recovery_metrics(
                        self.pto_count,
                        &mut self.path,
                        now,
                        self.orig_rem_cid,
                    );
                }
                Timer::KeyDiscard => {
                    self.zero_rtt_crypto = None;
                    self.prev_crypto = None;
                }
                Timer::PathValidation => {
                    debug!("path validation failed");
                    if let Some((_, prev)) = self.prev_path.take() {
                        self.path = prev;
                    }
                    self.path.challenge = None;
                    self.path.challenge_pending = false;
                }
                Timer::Pacing => trace!("pacing timer expired"),
                Timer::PushNewCid => {
                    // Update `retire_prior_to` field in NEW_CONNECTION_ID frame
                    let num_new_cid = self.local_cid_state.on_cid_timeout().into();
                    if !self.state.is_closed() {
                        trace!(
                            "push a new cid to peer RETIRE_PRIOR_TO field {}",
                            self.local_cid_state.retire_prior_to()
                        );
                        self.endpoint_events
                            .push_back(EndpointEventInner::NeedIdentifiers(now, num_new_cid));
                    }
                }
                Timer::MaxAckDelay => {
                    trace!("max ack delay reached");
                    // This timer is only armed in the Data space
                    self.spaces[SpaceId::Data]
                        .pending_acks
                        .on_max_ack_delay_timeout()
                }
            }
        }
    }

    /// Close a connection immediately
    ///
    /// This does not ensure delivery of outstanding data. It is the application's responsibility to
    /// call this only when all important communications have been completed, e.g. by calling
    /// [`SendStream::finish`] on outstanding streams and waiting for the corresponding
    /// [`StreamEvent::Finished`] event.
    ///
    /// If [`Streams::send_streams`] returns 0, all outstanding stream data has been
    /// delivered. There may still be data from the peer that has not been received.
    ///
    /// [`StreamEvent::Finished`]: crate::StreamEvent::Finished
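    ///
    /// A hedged sketch of a graceful shutdown sequence (the stream ID and error code below are
    /// purely illustrative):
    ///
    /// ```ignore
    /// // Finish outstanding streams first...
    /// conn.send_stream(stream_id).finish()?;
    /// // ...drive the connection until the matching StreamEvent::Finished events arrive,
    /// // then close with an application error code and reason.
    /// conn.close(now, VarInt::from_u32(0), Bytes::from_static(b"goodbye"));
    /// ```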
1244    pub fn close(&mut self, now: Instant, error_code: VarInt, reason: Bytes) {
1245        self.close_inner(
1246            now,
1247            Close::Application(frame::ApplicationClose { error_code, reason }),
1248        )
1249    }
1250
1251    fn close_inner(&mut self, now: Instant, reason: Close) {
1252        let was_closed = self.state.is_closed();
1253        if !was_closed {
1254            self.close_common();
1255            self.set_close_timer(now);
1256            self.close = true;
1257            self.state = State::Closed(state::Closed { reason });
1258        }
1259    }
1260
1261    /// Control datagrams
1262    pub fn datagrams(&mut self) -> Datagrams<'_> {
1263        Datagrams { conn: self }
1264    }
1265
1266    /// Returns connection statistics
1267    pub fn stats(&self) -> ConnectionStats {
1268        let mut stats = self.stats;
1269        stats.path.rtt = self.path.rtt.get();
1270        stats.path.cwnd = self.path.congestion.window();
1271        stats.path.current_mtu = self.path.mtud.current_mtu();
1272
1273        stats
1274    }
1275
1276    /// Ping the remote endpoint
1277    ///
1278    /// Causes an ACK-eliciting packet to be transmitted.
1279    pub fn ping(&mut self) {
1280        self.spaces[self.highest_space].ping_pending = true;
1281    }
1282
1283    /// Update traffic keys spontaneously
1284    ///
1285    /// This can be useful for testing key updates, as they otherwise only happen infrequently.
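    ///
    /// A minimal sketch (assumes an established `conn: Connection`); subsequent 1-RTT packets
    /// are protected with the next key generation:
    ///
    /// ```ignore
    /// conn.force_key_update();
    /// ```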
1286    pub fn force_key_update(&mut self) {
1287        if !self.state.is_established() {
1288            debug!("ignoring forced key update in illegal state");
1289            return;
1290        }
1291        if self.prev_crypto.is_some() {
1292            // We already just updated, or are currently updating, the keys. Concurrent key updates
1293            // are illegal.
1294            debug!("ignoring redundant forced key update");
1295            return;
1296        }
1297        self.update_keys(None, false);
1298    }
1299
1300    // Compatibility wrapper for quinn < 0.11.7. Remove for 0.12.
1301    #[doc(hidden)]
1302    #[deprecated]
1303    pub fn initiate_key_update(&mut self) {
1304        self.force_key_update();
1305    }
1306
1307    /// Get a session reference
1308    pub fn crypto_session(&self) -> &dyn crypto::Session {
1309        &*self.crypto
1310    }
1311
1312    /// Whether the connection is in the process of being established
1313    ///
1314    /// If this returns `false`, the connection may be either established or closed, signaled by the
1315    /// emission of a `Connected` or `ConnectionLost` message respectively.
1316    pub fn is_handshaking(&self) -> bool {
1317        self.state.is_handshake()
1318    }
1319
1320    /// Whether the connection is closed
1321    ///
1322    /// Closed connections cannot transport any further data. A connection becomes closed when
1323    /// either peer application intentionally closes it, or when either transport layer detects an
1324    /// error such as a time-out or certificate validation failure.
1325    ///
1326    /// A `ConnectionLost` event is emitted with details when the connection becomes closed.
1327    pub fn is_closed(&self) -> bool {
1328        self.state.is_closed()
1329    }
1330
1331    /// Whether there is no longer any need to keep the connection around
1332    ///
1333    /// Closed connections become drained after a brief timeout to absorb any remaining in-flight
1334    /// packets from the peer. All drained connections have been closed.
1335    pub fn is_drained(&self) -> bool {
1336        self.state.is_drained()
1337    }
1338
1339    /// For clients, whether the peer accepted the 0-RTT data packets
1340    ///
1341    /// The value is meaningless until after the handshake completes.
1342    pub fn accepted_0rtt(&self) -> bool {
1343        self.accepted_0rtt
1344    }
1345
1346    /// Whether 0-RTT is/was possible during the handshake
1347    pub fn has_0rtt(&self) -> bool {
1348        self.zero_rtt_enabled
1349    }
1350
1351    /// Whether there are any pending retransmits
1352    pub fn has_pending_retransmits(&self) -> bool {
1353        !self.spaces[SpaceId::Data].pending.is_empty(&self.streams)
1354    }
1355
1356    /// Look up whether we're the client or server of this Connection
1357    pub fn side(&self) -> Side {
1358        self.side.side()
1359    }
1360
1361    /// The latest socket address for this connection's peer
1362    pub fn remote_address(&self) -> SocketAddr {
1363        self.path.remote
1364    }
1365
1366    /// The local IP address which was used when the peer established
1367    /// the connection
1368    ///
1369    /// This can be different from the address the endpoint is bound to, e.g. when
1370    /// the endpoint is bound to a wildcard address like `0.0.0.0` or `::`.
1371    ///
1372    /// This will return `None` for clients, or when no `local_ip` was passed to
1373    /// [`Endpoint::handle()`](crate::Endpoint::handle) for the datagrams establishing this
1374    /// connection.
1375    pub fn local_ip(&self) -> Option<IpAddr> {
1376        self.local_ip
1377    }
1378
1379    /// Current best estimate of this connection's latency (round-trip-time)
1380    pub fn rtt(&self) -> Duration {
1381        self.path.rtt.get()
1382    }
1383
1384    /// Current state of this connection's congestion controller, for debugging purposes
1385    pub fn congestion_state(&self) -> &dyn Controller {
1386        self.path.congestion.as_ref()
1387    }
1388
1389    /// Resets path-specific settings.
1390    ///
1391    /// This will force-reset several subsystems related to a specific network path.
1392    /// Currently these are the congestion controller, the round-trip estimator, and MTU
1393    /// discovery.
1394    ///
1395    /// This is useful when the underlying network path is known to have changed and the old
1396    /// state of these subsystems is no longer valid or optimal. In that case, restarting from
1397    /// the initial configuration in the [`TransportConfig`] may settle on good values faster
1398    /// and with less loss than adapting the stale state would.
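    ///
    /// A minimal sketch; `conn` and `now` are assumed bindings, and the trigger shown is
    /// illustrative:
    ///
    /// ```ignore
    /// // e.g. after learning out of band that the local interface or route has changed
    /// conn.path_changed(now);
    /// ```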
1399    pub fn path_changed(&mut self, now: Instant) {
1400        self.path.reset(now, &self.config);
1401    }
1402
1403    /// Modify the number of remotely initiated streams that may be concurrently open
1404    ///
1405    /// No streams may be opened by the peer unless fewer than `count` are already open. Large
1406    /// `count`s increase both minimum and worst-case memory consumption.
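    ///
    /// A minimal sketch (assumes an established `conn: Connection`; the limit of 32 is an
    /// arbitrary example value):
    ///
    /// ```ignore
    /// // Allow the peer to keep at most 32 bidirectional streams open concurrently
    /// conn.set_max_concurrent_streams(Dir::Bi, VarInt::from_u32(32));
    /// ```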
1407    pub fn set_max_concurrent_streams(&mut self, dir: Dir, count: VarInt) {
1408        self.streams.set_max_concurrent(dir, count);
1409        // If the limit was reduced, then a flow control update previously deemed insignificant may
1410        // now be significant.
1411        let pending = &mut self.spaces[SpaceId::Data].pending;
1412        self.streams.queue_max_stream_id(pending);
1413    }
1414
1415    /// Current number of remotely initiated streams that may be concurrently open
1416    ///
1417    /// If the target for this limit is reduced using [`set_max_concurrent_streams`](Self::set_max_concurrent_streams),
1418    /// it will not change immediately, even if fewer streams are open. Instead, it will
1419    /// decrement by one each time a remotely initiated stream of matching directionality is closed.
1420    pub fn max_concurrent_streams(&self, dir: Dir) -> u64 {
1421        self.streams.max_concurrent(dir)
1422    }
1423
1424    /// See [`TransportConfig::send_window()`]
1425    pub fn set_send_window(&mut self, send_window: u64) {
1426        self.streams.set_send_window(send_window);
1427    }
1428
1429    /// See [`TransportConfig::receive_window()`]
1430    pub fn set_receive_window(&mut self, receive_window: VarInt) {
1431        if self.streams.set_receive_window(receive_window) {
1432            self.spaces[SpaceId::Data].pending.max_data = true;
1433        }
1434    }
1435
1436    fn on_ack_received(
1437        &mut self,
1438        now: Instant,
1439        space: SpaceId,
1440        ack: frame::Ack,
1441    ) -> Result<(), TransportError> {
1442        if ack.largest >= self.spaces[space].next_packet_number {
1443            return Err(TransportError::PROTOCOL_VIOLATION("unsent packet acked"));
1444        }
1445        let new_largest = {
1446            let space = &mut self.spaces[space];
1447            if space
1448                .largest_acked_packet
1449                .map_or(true, |pn| ack.largest > pn)
1450            {
1451                space.largest_acked_packet = Some(ack.largest);
1452                if let Some(info) = space.sent_packets.get(&ack.largest) {
1453                    // This should always succeed, but a misbehaving peer might ACK a packet we
1454                    // haven't sent. At worst, that will result in us spuriously reducing the
1455                    // congestion window.
1456                    space.largest_acked_packet_sent = info.time_sent;
1457                }
1458                true
1459            } else {
1460                false
1461            }
1462        };
1463
1464        // Avoid DoS from unreasonably huge ACK ranges by narrowing them down to just the newly acked packets.
1465        let mut newly_acked = ArrayRangeSet::new();
1466        for range in ack.iter() {
1467            self.packet_number_filter.check_ack(space, range.clone())?;
1468            for (&pn, _) in self.spaces[space].sent_packets.range(range) {
1469                newly_acked.insert_one(pn);
1470            }
1471        }
1472
1473        if newly_acked.is_empty() {
1474            return Ok(());
1475        }
1476
1477        let mut ack_eliciting_acked = false;
1478        for packet in newly_acked.elts() {
1479            if let Some(info) = self.spaces[space].take(packet) {
1480                if let Some(acked) = info.largest_acked {
1481                    // Assume ACKs for all packets below the largest acknowledged in `packet` have
1482                    // been received. This can cause the peer to spuriously retransmit if some of
1483                    // our earlier ACKs were lost, but allows for simpler state tracking. See
1484                    // discussion at
1485                    // https://www.rfc-editor.org/rfc/rfc9000.html#name-limiting-ranges-by-tracking
1486                    self.spaces[space].pending_acks.subtract_below(acked);
1487                }
1488                ack_eliciting_acked |= info.ack_eliciting;
1489
1490                // Notify MTU discovery that a packet was acked, because it might be an MTU probe
1491                let mtu_updated = self.path.mtud.on_acked(space, packet, info.size);
1492                if mtu_updated {
1493                    self.path
1494                        .congestion
1495                        .on_mtu_update(self.path.mtud.current_mtu());
1496                }
1497
1498                // Notify ack frequency that a packet was acked, because it might contain an ACK_FREQUENCY frame
1499                self.ack_frequency.on_acked(packet);
1500
1501                self.on_packet_acked(now, info);
1502            }
1503        }
1504
1505        self.path.congestion.on_end_acks(
1506            now,
1507            self.path.in_flight.bytes,
1508            self.app_limited,
1509            self.spaces[space].largest_acked_packet,
1510        );
1511
1512        if new_largest && ack_eliciting_acked {
1513            let ack_delay = if space != SpaceId::Data {
1514                Duration::from_micros(0)
1515            } else {
1516                cmp::min(
1517                    self.ack_frequency.peer_max_ack_delay,
1518                    Duration::from_micros(ack.delay << self.peer_params.ack_delay_exponent.0),
1519                )
1520            };
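            // Illustrative numbers: with the default ack_delay_exponent of 3, an encoded ACK
            // delay of 1000 decodes to 1000 << 3 = 8000 µs, which (capped at the peer's
            // max_ack_delay) the RTT estimator can discount from the sample taken below.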
1521            let rtt = instant_saturating_sub(now, self.spaces[space].largest_acked_packet_sent);
1522            self.path.rtt.update(ack_delay, rtt);
1523            if self.path.first_packet_after_rtt_sample.is_none() {
1524                self.path.first_packet_after_rtt_sample =
1525                    Some((space, self.spaces[space].next_packet_number));
1526            }
1527        }
1528
1529        // Must be called before crypto/pto_count are clobbered
1530        self.detect_lost_packets(now, space, true);
1531
1532        if self.peer_completed_address_validation() {
1533            self.pto_count = 0;
1534        }
1535
1536        // Explicit congestion notification
1537        if self.path.sending_ecn {
1538            if let Some(ecn) = ack.ecn {
1539                // We only examine ECN counters from ACKs that we are certain were received in
1540                // transmit order. This keeps the increase in ECN counts that we compare against
1541                // the number of newly acked packets well-defined, even in the presence of
1542                // arbitrary packet reordering.
1543                if new_largest {
1544                    let sent = self.spaces[space].largest_acked_packet_sent;
1545                    self.process_ecn(now, space, newly_acked.len() as u64, ecn, sent);
1546                }
1547            } else {
1548                // We always start out sending ECN, so any ack that doesn't acknowledge it disables it.
1549                debug!("ECN not acknowledged by peer");
1550                self.path.sending_ecn = false;
1551            }
1552        }
1553
1554        self.set_loss_detection_timer(now);
1555        Ok(())
1556    }
1557
1558    /// Process a new ECN block from an in-order ACK
1559    fn process_ecn(
1560        &mut self,
1561        now: Instant,
1562        space: SpaceId,
1563        newly_acked: u64,
1564        ecn: frame::EcnCounts,
1565        largest_sent_time: Instant,
1566    ) {
1567        match self.spaces[space].detect_ecn(newly_acked, ecn) {
1568            Err(e) => {
1569                debug!("halting ECN due to verification failure: {}", e);
1570                self.path.sending_ecn = false;
1571                // Wipe out the existing value because it might be garbage and could interfere with
1572                // future attempts to use ECN on new paths.
1573                self.spaces[space].ecn_feedback = frame::EcnCounts::ZERO;
1574            }
1575            Ok(false) => {}
1576            Ok(true) => {
1577                self.stats.path.congestion_events += 1;
1578                self.path
1579                    .congestion
1580                    .on_congestion_event(now, largest_sent_time, false, 0);
1581            }
1582        }
1583    }
1584
1585    // Not timing-aware, so it's safe to call this for inferred ACKs, such as those arising
1586    // from high-latency handshakes
1587    fn on_packet_acked(&mut self, now: Instant, info: SentPacket) {
1588        self.remove_in_flight(&info);
1589        if info.ack_eliciting && self.path.challenge.is_none() {
1590            // Only pass ACKs to the congestion controller if we are not validating the current
1591            // path, so as to ignore any ACKs from older paths still coming in.
1592            self.path.congestion.on_ack(
1593                now,
1594                info.time_sent,
1595                info.size.into(),
1596                self.app_limited,
1597                &self.path.rtt,
1598            );
1599        }
1600
1601        // Update state for confirmed delivery of frames
1602        if let Some(retransmits) = info.retransmits.get() {
1603            for (id, _) in retransmits.reset_stream.iter() {
1604                self.streams.reset_acked(*id);
1605            }
1606        }
1607
1608        for frame in info.stream_frames {
1609            self.streams.received_ack_of(frame);
1610        }
1611    }
1612
1613    fn set_key_discard_timer(&mut self, now: Instant, space: SpaceId) {
1614        let start = if self.zero_rtt_crypto.is_some() {
1615            now
1616        } else {
1617            self.prev_crypto
1618                .as_ref()
1619                .expect("no previous keys")
1620                .end_packet
1621                .as_ref()
1622                .expect("update not acknowledged yet")
1623                .1
1624        };
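        // Retain superseded keys for three PTOs before discarding them, in line with the
        // guidance on discarding unused keys in RFC 9001 §4.9.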
1625        self.timers
1626            .set(Timer::KeyDiscard, start + self.pto(space) * 3);
1627    }
1628
1629    fn on_loss_detection_timeout(&mut self, now: Instant) {
1630        if let Some((_, pn_space)) = self.loss_time_and_space() {
1631            // Time threshold loss detection
1632            self.detect_lost_packets(now, pn_space, false);
1633            self.set_loss_detection_timer(now);
1634            return;
1635        }
1636
1637        let (_, space) = match self.pto_time_and_space(now) {
1638            Some(x) => x,
1639            None => {
1640                error!("PTO expired while unset");
1641                return;
1642            }
1643        };
1644        trace!(
1645            in_flight = self.path.in_flight.bytes,
1646            count = self.pto_count,
1647            ?space,
1648            "PTO fired"
1649        );
1650
1651        let count = match self.path.in_flight.ack_eliciting {
1652            // A PTO when we're not expecting any ACKs must be due to handshake anti-amplification
1653            // deadlock prevention
1654            0 => {
1655                debug_assert!(!self.peer_completed_address_validation());
1656                1
1657            }
1658            // Conventional loss probe
1659            _ => 2,
1660        };
1661        self.spaces[space].loss_probes = self.spaces[space].loss_probes.saturating_add(count);
1662        self.pto_count = self.pto_count.saturating_add(1);
1663        self.set_loss_detection_timer(now);
1664    }
1665
1666    fn detect_lost_packets(&mut self, now: Instant, pn_space: SpaceId, due_to_ack: bool) {
1667        let mut lost_packets = Vec::<u64>::new();
1668        let mut lost_mtu_probe = None;
1669        let in_flight_mtu_probe = self.path.mtud.in_flight_mtu_probe();
1670        let rtt = self.path.rtt.conservative();
1671        let loss_delay = cmp::max(rtt.mul_f32(self.config.time_threshold), TIMER_GRANULARITY);
1672
1673        // Packets sent before this time are deemed lost.
1674        let lost_send_time = now.checked_sub(loss_delay).unwrap();
1675        let largest_acked_packet = self.spaces[pn_space].largest_acked_packet.unwrap();
1676        let packet_threshold = self.config.packet_threshold as u64;
1677        let mut size_of_lost_packets = 0u64;
1678
1679        // InPersistentCongestion: Determine if all packets in the time period before the newest
1680        // lost packet, including the edges, are marked lost. PTO computation must always
1681        // include max ACK delay, i.e. operate as if in the Data space (see RFC 9002 §7.6.1).
1682        let congestion_period =
1683            self.pto(SpaceId::Data) * self.config.persistent_congestion_threshold;
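        // Illustrative numbers: with a Data-space PTO of 120 ms and a persistent congestion
        // threshold of 3, ACK-eliciting losses spanning more than 360 ms with no intervening
        // ACKs indicate persistent congestion.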
1684        let mut persistent_congestion_start: Option<Instant> = None;
1685        let mut prev_packet = None;
1686        let mut in_persistent_congestion = false;
1687
1688        let space = &mut self.spaces[pn_space];
1689        space.loss_time = None;
1690
1691        for (&packet, info) in space.sent_packets.range(0..largest_acked_packet) {
1692            if prev_packet != Some(packet.wrapping_sub(1)) {
1693                // An intervening packet was acknowledged
1694                persistent_congestion_start = None;
1695            }
1696
1697            if info.time_sent <= lost_send_time || largest_acked_packet >= packet + packet_threshold
1698            {
1699                if Some(packet) == in_flight_mtu_probe {
1700                    // Lost MTU probes are not included in `lost_packets`, because they should not
1701                    // trigger a congestion control response
1702                    lost_mtu_probe = in_flight_mtu_probe;
1703                } else {
1704                    lost_packets.push(packet);
1705                    size_of_lost_packets += info.size as u64;
1706                    if info.ack_eliciting && due_to_ack {
1707                        match persistent_congestion_start {
1708                            // Two ACK-eliciting packets lost more than congestion_period apart, with no
1709                            // ACKed packets in between
1710                            Some(start) if info.time_sent - start > congestion_period => {
1711                                in_persistent_congestion = true;
1712                            }
1713                            // Persistent congestion must start after the first RTT sample
1714                            None if self
1715                                .path
1716                                .first_packet_after_rtt_sample
1717                                .is_some_and(|x| x < (pn_space, packet)) =>
1718                            {
1719                                persistent_congestion_start = Some(info.time_sent);
1720                            }
1721                            _ => {}
1722                        }
1723                    }
1724                }
1725            } else {
1726                let next_loss_time = info.time_sent + loss_delay;
1727                space.loss_time = Some(
1728                    space
1729                        .loss_time
1730                        .map_or(next_loss_time, |x| cmp::min(x, next_loss_time)),
1731                );
1732                persistent_congestion_start = None;
1733            }
1734
1735            prev_packet = Some(packet);
1736        }
1737
1738        // OnPacketsLost
1739        if let Some(largest_lost) = lost_packets.last().cloned() {
1740            let old_bytes_in_flight = self.path.in_flight.bytes;
1741            let largest_lost_sent = self.spaces[pn_space].sent_packets[&largest_lost].time_sent;
1742            self.stats.path.lost_packets += lost_packets.len() as u64;
1743            self.stats.path.lost_bytes += size_of_lost_packets;
1744            trace!(
1745                "packets lost: {:?}, bytes lost: {}",
1746                lost_packets, size_of_lost_packets
1747            );
1748
1749            for &packet in &lost_packets {
1750                let info = self.spaces[pn_space].take(packet).unwrap(); // safe: lost_packets is populated just above
1751                self.config.qlog_sink.emit_packet_lost(
1752                    packet,
1753                    &info,
1754                    lost_send_time,
1755                    pn_space,
1756                    now,
1757                    self.orig_rem_cid,
1758                );
1759                self.remove_in_flight(&info);
1760                for frame in info.stream_frames {
1761                    self.streams.retransmit(frame);
1762                }
1763                self.spaces[pn_space].pending |= info.retransmits;
1764                self.path.mtud.on_non_probe_lost(packet, info.size);
1765            }
1766
1767            if self.path.mtud.black_hole_detected(now) {
1768                self.stats.path.black_holes_detected += 1;
1769                self.path
1770                    .congestion
1771                    .on_mtu_update(self.path.mtud.current_mtu());
1772                if let Some(max_datagram_size) = self.datagrams().max_size() {
1773                    self.datagrams.drop_oversized(max_datagram_size);
1774                }
1775            }
1776
1777            // Don't apply congestion penalty for lost ack-only packets
1778            let lost_ack_eliciting = old_bytes_in_flight != self.path.in_flight.bytes;
1779
1780            if lost_ack_eliciting {
1781                self.stats.path.congestion_events += 1;
1782                self.path.congestion.on_congestion_event(
1783                    now,
1784                    largest_lost_sent,
1785                    in_persistent_congestion,
1786                    size_of_lost_packets,
1787                );
1788            }
1789        }
1790
1791        // Handle a lost MTU probe
1792        if let Some(packet) = lost_mtu_probe {
1793            let info = self.spaces[SpaceId::Data].take(packet).unwrap(); // safe: lost_mtu_probe is omitted from lost_packets, and therefore must not have been removed yet
1794            self.remove_in_flight(&info);
1795            self.path.mtud.on_probe_lost();
1796            self.stats.path.lost_plpmtud_probes += 1;
1797        }
1798    }
1799
1800    fn loss_time_and_space(&self) -> Option<(Instant, SpaceId)> {
1801        SpaceId::iter()
1802            .filter_map(|id| Some((self.spaces[id].loss_time?, id)))
1803            .min_by_key(|&(time, _)| time)
1804    }
1805
1806    fn pto_time_and_space(&self, now: Instant) -> Option<(Instant, SpaceId)> {
1807        let backoff = 2u32.pow(self.pto_count.min(MAX_BACKOFF_EXPONENT));
1808        let mut duration = self.path.rtt.pto_base() * backoff;
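        // The probe timeout backs off exponentially with consecutive PTOs. Illustrative
        // numbers: with pto_count = 2 and a 100 ms PTO base, backoff = 4 and `duration`
        // starts at 400 ms (max_ack_delay is added below for the Data space).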
1809
1810        if self.path.in_flight.ack_eliciting == 0 {
1811            debug_assert!(!self.peer_completed_address_validation());
1812            let space = match self.highest_space {
1813                SpaceId::Handshake => SpaceId::Handshake,
1814                _ => SpaceId::Initial,
1815            };
1816            return Some((now + duration, space));
1817        }
1818
1819        let mut result = None;
1820        for space in SpaceId::iter() {
1821            if !self.spaces[space].has_in_flight() {
1822                continue;
1823            }
1824            if space == SpaceId::Data {
1825                // Skip ApplicationData until handshake completes.
1826                if self.is_handshaking() {
1827                    return result;
1828                }
1829                // Include max_ack_delay and backoff for ApplicationData.
1830                duration += self.ack_frequency.max_ack_delay_for_pto() * backoff;
1831            }
1832            let last_ack_eliciting = match self.spaces[space].time_of_last_ack_eliciting_packet {
1833                Some(time) => time,
1834                None => continue,
1835            };
1836            let pto = last_ack_eliciting + duration;
1837            if result.map_or(true, |(earliest_pto, _)| pto < earliest_pto) {
1838                result = Some((pto, space));
1839            }
1840        }
1841        result
1842    }
1843
1844    fn peer_completed_address_validation(&self) -> bool {
1845        if self.side.is_server() || self.state.is_closed() {
1846            return true;
1847        }
1848        // The server is guaranteed to have validated our address if any of our handshake or 1-RTT
1849        // packets are acknowledged or we've seen HANDSHAKE_DONE and discarded handshake keys.
1850        self.spaces[SpaceId::Handshake]
1851            .largest_acked_packet
1852            .is_some()
1853            || self.spaces[SpaceId::Data].largest_acked_packet.is_some()
1854            || (self.spaces[SpaceId::Data].crypto.is_some()
1855                && self.spaces[SpaceId::Handshake].crypto.is_none())
1856    }
1857
1858    fn set_loss_detection_timer(&mut self, now: Instant) {
1859        if self.state.is_closed() {
1860            // No loss detection takes place on closed connections, and `close_common` already
1861            // stopped the timer. Ensure we don't restart it inadvertently, e.g. in response to a
1862            // reordered packet being handled by state-insensitive code.
1863            return;
1864        }
1865
1866        if let Some((loss_time, _)) = self.loss_time_and_space() {
1867            // Time threshold loss detection.
1868            self.timers.set(Timer::LossDetection, loss_time);
1869            return;
1870        }
1871
1872        if self.path.anti_amplification_blocked(1) {
1873            // We wouldn't be able to send anything, so don't bother.
1874            self.timers.stop(Timer::LossDetection);
1875            return;
1876        }
1877
1878        if self.path.in_flight.ack_eliciting == 0 && self.peer_completed_address_validation() {
1879            // There is nothing in flight to declare lost, so no timer is set. However, the client needs to arm
1880            // the timer if the server might be blocked by the anti-amplification limit.
1881            self.timers.stop(Timer::LossDetection);
1882            return;
1883        }
1884
1885        // Determine which PN space to arm the PTO for,
1886        // and calculate the corresponding PTO duration.
1887        if let Some((timeout, _)) = self.pto_time_and_space(now) {
1888            self.timers.set(Timer::LossDetection, timeout);
1889        } else {
1890            self.timers.stop(Timer::LossDetection);
1891        }
1892    }
1893
1894    /// Probe Timeout
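    ///
    /// The RTT-derived PTO base plus, in the Data space only, the peer's maximum ACK delay
    /// (cf. RFC 9002 §6.2.1).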
1895    fn pto(&self, space: SpaceId) -> Duration {
1896        let max_ack_delay = match space {
1897            SpaceId::Initial | SpaceId::Handshake => Duration::ZERO,
1898            SpaceId::Data => self.ack_frequency.max_ack_delay_for_pto(),
1899        };
1900        self.path.rtt.pto_base() + max_ack_delay
1901    }
1902
1903    fn on_packet_authenticated(
1904        &mut self,
1905        now: Instant,
1906        space_id: SpaceId,
1907        ecn: Option<EcnCodepoint>,
1908        packet: Option<u64>,
1909        spin: bool,
1910        is_1rtt: bool,
1911    ) {
1912        self.total_authed_packets += 1;
1913        self.reset_keep_alive(now);
1914        self.reset_idle_timeout(now, space_id);
1915        self.permit_idle_reset = true;
1916        self.receiving_ecn |= ecn.is_some();
1917        if let Some(x) = ecn {
1918            let space = &mut self.spaces[space_id];
1919            space.ecn_counters += x;
1920
1921            if x.is_ce() {
1922                space.pending_acks.set_immediate_ack_required();
1923            }
1924        }
1925
1926        let packet = match packet {
1927            Some(x) => x,
1928            None => return,
1929        };
1930        if self.side.is_server() {
1931            if self.spaces[SpaceId::Initial].crypto.is_some() && space_id == SpaceId::Handshake {
1932                // A server stops sending and processing Initial packets when it receives its first Handshake packet.
1933                self.discard_space(now, SpaceId::Initial);
1934            }
1935            if self.zero_rtt_crypto.is_some() && is_1rtt {
1936                // Discard 0-RTT keys soon after receiving a 1-RTT packet
1937                self.set_key_discard_timer(now, space_id)
1938            }
1939        }
1940        let space = &mut self.spaces[space_id];
1941        space.pending_acks.insert_one(packet, now);
1942        if packet >= space.rx_packet {
1943            space.rx_packet = packet;
1944            // Update outgoing spin bit, inverting iff we're the client
1945            self.spin = self.side.is_client() ^ spin;
1946        }
1947
1948        self.config.qlog_sink.emit_packet_received(
1949            packet,
1950            space_id,
1951            !is_1rtt,
1952            now,
1953            self.orig_rem_cid,
1954        );
1955    }
1956
1957    fn reset_idle_timeout(&mut self, now: Instant, space: SpaceId) {
1958        let timeout = match self.idle_timeout {
1959            None => return,
1960            Some(dur) => dur,
1961        };
1962        if self.state.is_closed() {
1963            self.timers.stop(Timer::Idle);
1964            return;
1965        }
1966        let dt = cmp::max(timeout, 3 * self.pto(space));
1967        self.timers.set(Timer::Idle, now + dt);
1968    }
1969
1970    fn reset_keep_alive(&mut self, now: Instant) {
1971        let interval = match self.config.keep_alive_interval {
1972            Some(x) if self.state.is_established() => x,
1973            _ => return,
1974        };
1975        self.timers.set(Timer::KeepAlive, now + interval);
1976    }
1977
1978    fn reset_cid_retirement(&mut self) {
1979        if let Some(t) = self.local_cid_state.next_timeout() {
1980            self.timers.set(Timer::PushNewCid, t);
1981        }
1982    }
1983
1984    /// Handle the already-decrypted first packet from the client
1985    ///
1986    /// Decrypting the first packet in the `Endpoint` allows stateless packet handling to be more
1987    /// efficient.
1988    pub(crate) fn handle_first_packet(
1989        &mut self,
1990        now: Instant,
1991        remote: SocketAddr,
1992        ecn: Option<EcnCodepoint>,
1993        packet_number: u64,
1994        packet: InitialPacket,
1995        remaining: Option<BytesMut>,
1996    ) -> Result<(), ConnectionError> {
1997        let span = trace_span!("first recv");
1998        let _guard = span.enter();
1999        debug_assert!(self.side.is_server());
2000        let len = packet.header_data.len() + packet.payload.len();
2001        self.path.total_recvd = len as u64;
2002
2003        match self.state {
2004            State::Handshake(ref mut state) => {
2005                state.expected_token = packet.header.token.clone();
2006            }
2007            _ => unreachable!("first packet must be delivered in Handshake state"),
2008        }
2009
2010        self.on_packet_authenticated(
2011            now,
2012            SpaceId::Initial,
2013            ecn,
2014            Some(packet_number),
2015            false,
2016            false,
2017        );
2018
2019        self.process_decrypted_packet(now, remote, Some(packet_number), packet.into())?;
2020        if let Some(data) = remaining {
2021            self.handle_coalesced(now, remote, ecn, data);
2022        }
2023
2024        self.config.qlog_sink.emit_recovery_metrics(
2025            self.pto_count,
2026            &mut self.path,
2027            now,
2028            self.orig_rem_cid,
2029        );
2030
2031        Ok(())
2032    }
2033
2034    fn init_0rtt(&mut self) {
2035        let (header, packet) = match self.crypto.early_crypto() {
2036            Some(x) => x,
2037            None => return,
2038        };
2039        if self.side.is_client() {
2040            match self.crypto.transport_parameters() {
2041                Ok(params) => {
2042                    let params = params
2043                        .expect("crypto layer didn't supply transport parameters with ticket");
2044                    // Certain values must not be cached
2045                    let params = TransportParameters {
2046                        initial_src_cid: None,
2047                        original_dst_cid: None,
2048                        preferred_address: None,
2049                        retry_src_cid: None,
2050                        stateless_reset_token: None,
2051                        min_ack_delay: None,
2052                        ack_delay_exponent: TransportParameters::default().ack_delay_exponent,
2053                        max_ack_delay: TransportParameters::default().max_ack_delay,
2054                        ..params
2055                    };
2056                    self.set_peer_params(params);
2057                }
2058                Err(e) => {
2059                    error!("session ticket has malformed transport parameters: {}", e);
2060                    return;
2061                }
2062            }
2063        }
2064        trace!("0-RTT enabled");
2065        self.zero_rtt_enabled = true;
2066        self.zero_rtt_crypto = Some(ZeroRttCrypto { header, packet });
2067    }
2068
2069    fn read_crypto(
2070        &mut self,
2071        space: SpaceId,
2072        crypto: &frame::Crypto,
2073        payload_len: usize,
2074    ) -> Result<(), TransportError> {
2075        let expected = if !self.state.is_handshake() {
2076            SpaceId::Data
2077        } else if self.highest_space == SpaceId::Initial {
2078            SpaceId::Initial
2079        } else {
2080            // On the server, self.highest_space can be Data after receiving the client's first
2081            // flight, but we expect Handshake CRYPTO until the handshake is complete.
2082            SpaceId::Handshake
2083        };
2084        // We can't decrypt Handshake packets when highest_space is Initial, CRYPTO frames in 0-RTT
2085        // packets are illegal, and we don't process 1-RTT packets until the handshake is
2086        // complete. Therefore, we will never see CRYPTO data from a later-than-expected space.
2087        debug_assert!(space <= expected, "received out-of-order CRYPTO data");
2088
2089        let end = crypto.offset + crypto.data.len() as u64;
2090        if space < expected && end > self.spaces[space].crypto_stream.bytes_read() {
2091            warn!(
2092                "received new {:?} CRYPTO data when expecting {:?}",
2093                space, expected
2094            );
2095            return Err(TransportError::PROTOCOL_VIOLATION(
2096                "new data at unexpected encryption level",
2097            ));
2098        }
2099
2100        let space = &mut self.spaces[space];
2101        let max = end.saturating_sub(space.crypto_stream.bytes_read());
2102        if max > self.config.crypto_buffer_size as u64 {
2103            return Err(TransportError::CRYPTO_BUFFER_EXCEEDED(""));
2104        }
2105
2106        space
2107            .crypto_stream
2108            .insert(crypto.offset, crypto.data.clone(), payload_len);
2109        while let Some(chunk) = space.crypto_stream.read(usize::MAX, true) {
2110            trace!("consumed {} CRYPTO bytes", chunk.bytes.len());
2111            if self.crypto.read_handshake(&chunk.bytes)? {
2112                self.events.push_back(Event::HandshakeDataReady);
2113            }
2114        }
2115
2116        Ok(())
2117    }
2118
2119    fn write_crypto(&mut self) {
2120        loop {
2121            let space = self.highest_space;
2122            let mut outgoing = Vec::new();
2123            if let Some(crypto) = self.crypto.write_handshake(&mut outgoing) {
2124                match space {
2125                    SpaceId::Initial => {
2126                        self.upgrade_crypto(SpaceId::Handshake, crypto);
2127                    }
2128                    SpaceId::Handshake => {
2129                        self.upgrade_crypto(SpaceId::Data, crypto);
2130                    }
2131                    _ => unreachable!("got updated secrets during 1-RTT"),
2132                }
2133            }
2134            if outgoing.is_empty() {
2135                if space == self.highest_space {
2136                    break;
2137                } else {
2138                    // Keys updated, check for more data to send
2139                    continue;
2140                }
2141            }
2142            let offset = self.spaces[space].crypto_offset;
2143            let outgoing = Bytes::from(outgoing);
2144            if let State::Handshake(ref mut state) = self.state {
2145                if space == SpaceId::Initial && offset == 0 && self.side.is_client() {
2146                    state.client_hello = Some(outgoing.clone());
2147                }
2148            }
2149            self.spaces[space].crypto_offset += outgoing.len() as u64;
2150            trace!("wrote {} {:?} CRYPTO bytes", outgoing.len(), space);
2151            self.spaces[space].pending.crypto.push_back(frame::Crypto {
2152                offset,
2153                data: outgoing,
2154            });
2155        }
2156    }
2157
2158    /// Switch to stronger cryptography during handshake
2159    fn upgrade_crypto(&mut self, space: SpaceId, crypto: Keys) {
2160        debug_assert!(
2161            self.spaces[space].crypto.is_none(),
2162            "already reached packet space {space:?}"
2163        );
2164        trace!("{:?} keys ready", space);
2165        if space == SpaceId::Data {
2166            // Precompute the first key update
2167            self.next_crypto = Some(
2168                self.crypto
2169                    .next_1rtt_keys()
2170                    .expect("handshake should be complete"),
2171            );
2172        }
2173
2174        self.spaces[space].crypto = Some(crypto);
2175        debug_assert!(space as usize > self.highest_space as usize);
2176        self.highest_space = space;
2177        if space == SpaceId::Data && self.side.is_client() {
2178            // Discard 0-RTT keys because 1-RTT keys are available.
2179            self.zero_rtt_crypto = None;
2180        }
2181    }
2182
2183    fn discard_space(&mut self, now: Instant, space_id: SpaceId) {
2184        debug_assert!(space_id != SpaceId::Data);
2185        trace!("discarding {:?} keys", space_id);
2186        if space_id == SpaceId::Initial {
2187            // No longer needed
2188            if let ConnectionSide::Client { token, .. } = &mut self.side {
2189                *token = Bytes::new();
2190            }
2191        }
2192        let space = &mut self.spaces[space_id];
2193        space.crypto = None;
2194        space.time_of_last_ack_eliciting_packet = None;
2195        space.loss_time = None;
2196        let sent_packets = mem::take(&mut space.sent_packets);
2197        for packet in sent_packets.into_values() {
2198            self.remove_in_flight(&packet);
2199        }
2200        self.set_loss_detection_timer(now)
2201    }
2202
2203    fn handle_coalesced(
2204        &mut self,
2205        now: Instant,
2206        remote: SocketAddr,
2207        ecn: Option<EcnCodepoint>,
2208        data: BytesMut,
2209    ) {
2210        self.path.total_recvd = self.path.total_recvd.saturating_add(data.len() as u64);
2211        let mut remaining = Some(data);
2212        while let Some(data) = remaining {
2213            match PartialDecode::new(
2214                data,
2215                &FixedLengthConnectionIdParser::new(self.local_cid_state.cid_len()),
2216                &[self.version],
2217                self.endpoint_config.grease_quic_bit,
2218            ) {
2219                Ok((partial_decode, rest)) => {
2220                    remaining = rest;
2221                    self.handle_decode(now, remote, ecn, partial_decode);
2222                }
2223                Err(e) => {
2224                    trace!("malformed header: {}", e);
2225                    return;
2226                }
2227            }
2228        }
2229    }
2230
2231    fn handle_decode(
2232        &mut self,
2233        now: Instant,
2234        remote: SocketAddr,
2235        ecn: Option<EcnCodepoint>,
2236        partial_decode: PartialDecode,
2237    ) {
2238        if let Some(decoded) = packet_crypto::unprotect_header(
2239            partial_decode,
2240            &self.spaces,
2241            self.zero_rtt_crypto.as_ref(),
2242            self.peer_params.stateless_reset_token,
2243        ) {
2244            self.handle_packet(now, remote, ecn, decoded.packet, decoded.stateless_reset);
2245        }
2246    }
2247
2248    fn handle_packet(
2249        &mut self,
2250        now: Instant,
2251        remote: SocketAddr,
2252        ecn: Option<EcnCodepoint>,
2253        packet: Option<Packet>,
2254        stateless_reset: bool,
2255    ) {
2256        self.stats.udp_rx.ios += 1;
2257        if let Some(ref packet) = packet {
2258            trace!(
2259                "got {:?} packet ({} bytes) from {} using id {}",
2260                packet.header.space(),
2261                packet.payload.len() + packet.header_data.len(),
2262                remote,
2263                packet.header.dst_cid(),
2264            );
2265        }
2266
2267        if self.is_handshaking() && remote != self.path.remote {
2268            debug!("discarding packet with unexpected remote during handshake");
2269            return;
2270        }
2271
2272        let was_closed = self.state.is_closed();
2273        let was_drained = self.state.is_drained();
2274
2275        let decrypted = match packet {
2276            None => Err(None),
2277            Some(mut packet) => self
2278                .decrypt_packet(now, &mut packet)
2279                .map(move |number| (packet, number)),
2280        };
2281        let result = match decrypted {
2282            _ if stateless_reset => {
2283                debug!("got stateless reset");
2284                Err(ConnectionError::Reset)
2285            }
2286            Err(Some(e)) => {
2287                warn!("illegal packet: {}", e);
2288                Err(e.into())
2289            }
2290            Err(None) => {
2291                debug!("failed to authenticate packet");
2292                self.authentication_failures += 1;
2293                let integrity_limit = self.spaces[self.highest_space]
2294                    .crypto
2295                    .as_ref()
2296                    .unwrap()
2297                    .packet
2298                    .local
2299                    .integrity_limit();
2300                if self.authentication_failures > integrity_limit {
2301                    Err(TransportError::AEAD_LIMIT_REACHED("integrity limit violated").into())
2302                } else {
2303                    return;
2304                }
2305            }
2306            Ok((packet, number)) => {
2307                let span = match number {
2308                    Some(pn) => trace_span!("recv", space = ?packet.header.space(), pn),
2309                    None => trace_span!("recv", space = ?packet.header.space()),
2310                };
2311                let _guard = span.enter();
2312
2313                let is_duplicate = |n| self.spaces[packet.header.space()].dedup.insert(n);
2314                if number.is_some_and(is_duplicate) {
2315                    debug!("discarding possible duplicate packet");
2316                    return;
2317                } else if self.state.is_handshake() && packet.header.is_short() {
2318                    // TODO: SHOULD buffer these to improve reordering tolerance.
2319                    trace!("dropping short packet during handshake");
2320                    return;
2321                } else {
2322                    if let Header::Initial(InitialHeader { ref token, .. }) = packet.header {
2323                        if let State::Handshake(ref hs) = self.state {
2324                            if self.side.is_server() && token != &hs.expected_token {
2325                                // Clients must send the same retry token in every Initial. Initial
2326                                // packets can be spoofed, so we discard rather than killing the
2327                                // connection.
2328                                warn!("discarding Initial with invalid retry token");
2329                                return;
2330                            }
2331                        }
2332                    }
2333
2334                    if !self.state.is_closed() {
2335                        let spin = match packet.header {
2336                            Header::Short { spin, .. } => spin,
2337                            _ => false,
2338                        };
2339                        self.on_packet_authenticated(
2340                            now,
2341                            packet.header.space(),
2342                            ecn,
2343                            number,
2344                            spin,
2345                            packet.header.is_1rtt(),
2346                        );
2347                    }
2348
2349                    self.process_decrypted_packet(now, remote, number, packet)
2350                }
2351            }
2352        };
2353
2354        // State transitions for error cases
2355        if let Err(conn_err) = result {
2356            self.error = Some(conn_err.clone());
2357            self.state = match conn_err {
2358                ConnectionError::ApplicationClosed(reason) => State::closed(reason),
2359                ConnectionError::ConnectionClosed(reason) => State::closed(reason),
2360                ConnectionError::Reset
2361                | ConnectionError::TransportError(TransportError {
2362                    code: TransportErrorCode::AEAD_LIMIT_REACHED,
2363                    ..
2364                }) => State::Drained,
2365                ConnectionError::TimedOut => {
2366                    unreachable!("timeouts aren't generated by packet processing");
2367                }
2368                ConnectionError::TransportError(err) => {
2369                    debug!("closing connection due to transport error: {}", err);
2370                    State::closed(err)
2371                }
2372                ConnectionError::VersionMismatch => State::Draining,
2373                ConnectionError::LocallyClosed => {
2374                    unreachable!("LocallyClosed isn't generated by packet processing");
2375                }
2376                ConnectionError::CidsExhausted => {
2377                    unreachable!("CidsExhausted isn't generated by packet processing");
2378                }
2379            };
2380        }
2381
2382        if !was_closed && self.state.is_closed() {
2383            self.close_common();
2384            if !self.state.is_drained() {
2385                self.set_close_timer(now);
2386            }
2387        }
2388        if !was_drained && self.state.is_drained() {
2389            self.endpoint_events.push_back(EndpointEventInner::Drained);
2390            // Close timer may have been started previously, e.g. if we sent a close and got a
2391            // stateless reset in response
2392            self.timers.stop(Timer::Close);
2393        }
2394
2395        // Transmit CONNECTION_CLOSE if necessary
2396        if let State::Closed(_) = self.state {
2397            self.close = remote == self.path.remote;
2398        }
2399    }
2400
2401    fn process_decrypted_packet(
2402        &mut self,
2403        now: Instant,
2404        remote: SocketAddr,
2405        number: Option<u64>,
2406        packet: Packet,
2407    ) -> Result<(), ConnectionError> {
2408        let state = match self.state {
2409            State::Established => {
2410                match packet.header.space() {
2411                    SpaceId::Data => self.process_payload(now, remote, number.unwrap(), packet)?,
2412                    _ if packet.header.has_frames() => self.process_early_payload(now, packet)?,
2413                    _ => {
2414                        trace!("discarding unexpected pre-handshake packet");
2415                    }
2416                }
2417                return Ok(());
2418            }
2419            State::Closed(_) => {
2420                for result in frame::Iter::new(packet.payload.freeze())? {
2421                    let frame = match result {
2422                        Ok(frame) => frame,
2423                        Err(err) => {
2424                            debug!("frame decoding error: {err:?}");
2425                            continue;
2426                        }
2427                    };
2428
2429                    if let Frame::Padding = frame {
2430                        continue;
2431                    };
2432
2433                    self.stats.frame_rx.record(&frame);
2434
2435                    if let Frame::Close(_) = frame {
2436                        trace!("draining");
2437                        self.state = State::Draining;
2438                        break;
2439                    }
2440                }
2441                return Ok(());
2442            }
2443            State::Draining | State::Drained => return Ok(()),
2444            State::Handshake(ref mut state) => state,
2445        };
2446
2447        match packet.header {
2448            Header::Retry {
2449                src_cid: rem_cid, ..
2450            } => {
2451                if self.side.is_server() {
2452                    return Err(TransportError::PROTOCOL_VIOLATION("client sent Retry").into());
2453                }
2454
2455                if self.total_authed_packets > 1
2456                            || packet.payload.len() <= 16 // token + 16 byte tag
2457                            || !self.crypto.is_valid_retry(
2458                                &self.rem_cids.active(),
2459                                &packet.header_data,
2460                                &packet.payload,
2461                            )
2462                {
2463                    trace!("discarding invalid Retry");
2464                    // - After the client has received and processed an Initial or Retry
2465                    //   packet from the server, it MUST discard any subsequent Retry
2466                    //   packets that it receives.
2467                    // - A client MUST discard a Retry packet with a zero-length Retry Token
2468                    //   field.
2469                    // - Clients MUST discard Retry packets that have a Retry Integrity Tag
2470                    //   that cannot be validated
2471                    return Ok(());
2472                }
2473
2474                trace!("retrying with CID {}", rem_cid);
2475                let client_hello = state.client_hello.take().unwrap();
2476                self.retry_src_cid = Some(rem_cid);
2477                self.rem_cids.update_initial_cid(rem_cid);
2478                self.rem_handshake_cid = rem_cid;
2479
2480                let space = &mut self.spaces[SpaceId::Initial];
2481                if let Some(info) = space.take(0) {
2482                    self.on_packet_acked(now, info);
2483                };
2484
2485                self.discard_space(now, SpaceId::Initial); // Make sure we clean up after any retransmitted Initials
2486                self.spaces[SpaceId::Initial] = PacketSpace {
2487                    crypto: Some(self.crypto.initial_keys(&rem_cid, self.side.side())),
2488                    next_packet_number: self.spaces[SpaceId::Initial].next_packet_number,
2489                    crypto_offset: client_hello.len() as u64,
2490                    ..PacketSpace::new(now)
2491                };
2492                self.spaces[SpaceId::Initial]
2493                    .pending
2494                    .crypto
2495                    .push_back(frame::Crypto {
2496                        offset: 0,
2497                        data: client_hello,
2498                    });
2499
2500                // Retransmit all 0-RTT data
2501                let zero_rtt = mem::take(&mut self.spaces[SpaceId::Data].sent_packets);
2502                for info in zero_rtt.into_values() {
2503                    self.remove_in_flight(&info);
2504                    self.spaces[SpaceId::Data].pending |= info.retransmits;
2505                }
2506                self.streams.retransmit_all_for_0rtt();
2507
2508                let token_len = packet.payload.len() - 16;
2509                let ConnectionSide::Client { ref mut token, .. } = self.side else {
2510                    unreachable!("we already short-circuited if we're server");
2511                };
2512                *token = packet.payload.freeze().split_to(token_len);
2513                self.state = State::Handshake(state::Handshake {
2514                    expected_token: Bytes::new(),
2515                    rem_cid_set: false,
2516                    client_hello: None,
2517                });
2518                Ok(())
2519            }
2520            Header::Long {
2521                ty: LongType::Handshake,
2522                src_cid: rem_cid,
2523                ..
2524            } => {
2525                if rem_cid != self.rem_handshake_cid {
2526                    debug!(
2527                        "discarding packet with mismatched remote CID: {} != {}",
2528                        self.rem_handshake_cid, rem_cid
2529                    );
2530                    return Ok(());
2531                }
2532                self.on_path_validated();
2533
2534                self.process_early_payload(now, packet)?;
2535                if self.state.is_closed() {
2536                    return Ok(());
2537                }
2538
2539                if self.crypto.is_handshaking() {
2540                    trace!("handshake ongoing");
2541                    return Ok(());
2542                }
2543
2544                if self.side.is_client() {
2545                    // Client-only because server params were set from the client's Initial
2546                    let params =
2547                        self.crypto
2548                            .transport_parameters()?
2549                            .ok_or_else(|| TransportError {
2550                                code: TransportErrorCode::crypto(0x6d),
2551                                frame: None,
2552                                reason: "transport parameters missing".into(),
2553                            })?;
2554
2555                    if self.has_0rtt() {
2556                        if !self.crypto.early_data_accepted().unwrap() {
2557                            debug_assert!(self.side.is_client());
2558                            debug!("0-RTT rejected");
2559                            self.accepted_0rtt = false;
2560                            self.streams.zero_rtt_rejected();
2561
2562                            // Discard already-queued frames
2563                            self.spaces[SpaceId::Data].pending = Retransmits::default();
2564
2565                            // Discard 0-RTT packets
2566                            let sent_packets =
2567                                mem::take(&mut self.spaces[SpaceId::Data].sent_packets);
2568                            for packet in sent_packets.into_values() {
2569                                self.remove_in_flight(&packet);
2570                            }
2571                        } else {
2572                            self.accepted_0rtt = true;
2573                            params.validate_resumption_from(&self.peer_params)?;
2574                        }
2575                    }
2576                    if let Some(token) = params.stateless_reset_token {
2577                        self.endpoint_events
2578                            .push_back(EndpointEventInner::ResetToken(self.path.remote, token));
2579                    }
2580                    self.handle_peer_params(params)?;
2581                    self.issue_first_cids(now);
2582                } else {
2583                    // Server-only
2584                    self.spaces[SpaceId::Data].pending.handshake_done = true;
2585                    self.discard_space(now, SpaceId::Handshake);
2586                }
2587
2588                self.events.push_back(Event::Connected);
2589                self.state = State::Established;
2590                trace!("established");
2591                Ok(())
2592            }
2593            Header::Initial(InitialHeader {
2594                src_cid: rem_cid, ..
2595            }) => {
2596                if !state.rem_cid_set {
2597                    trace!("switching remote CID to {}", rem_cid);
2598                    let mut state = state.clone();
2599                    self.rem_cids.update_initial_cid(rem_cid);
2600                    self.rem_handshake_cid = rem_cid;
2601                    self.orig_rem_cid = rem_cid;
2602                    state.rem_cid_set = true;
2603                    self.state = State::Handshake(state);
2604                } else if rem_cid != self.rem_handshake_cid {
2605                    debug!(
2606                        "discarding packet with mismatched remote CID: {} != {}",
2607                        self.rem_handshake_cid, rem_cid
2608                    );
2609                    return Ok(());
2610                }
2611
2612                let starting_space = self.highest_space;
2613                self.process_early_payload(now, packet)?;
2614
2615                if self.side.is_server()
2616                    && starting_space == SpaceId::Initial
2617                    && self.highest_space != SpaceId::Initial
2618                {
2619                    let params =
2620                        self.crypto
2621                            .transport_parameters()?
2622                            .ok_or_else(|| TransportError {
2623                                code: TransportErrorCode::crypto(0x6d),
2624                                frame: None,
2625                                reason: "transport parameters missing".into(),
2626                            })?;
2627                    self.handle_peer_params(params)?;
2628                    self.issue_first_cids(now);
2629                    self.init_0rtt();
2630                }
2631                Ok(())
2632            }
2633            Header::Long {
2634                ty: LongType::ZeroRtt,
2635                ..
2636            } => {
2637                self.process_payload(now, remote, number.unwrap(), packet)?;
2638                Ok(())
2639            }
2640            Header::VersionNegotiate { .. } => {
2641                if self.total_authed_packets > 1 {
2642                    return Ok(());
2643                }
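                // A Version Negotiation packet that lists the version we're already using is
                // spurious and must be ignored (RFC 9000 §6.2).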
2644                let supported = packet
2645                    .payload
2646                    .chunks(4)
2647                    .any(|x| match <[u8; 4]>::try_from(x) {
2648                        Ok(version) => self.version == u32::from_be_bytes(version),
2649                        Err(_) => false,
2650                    });
2651                if supported {
2652                    return Ok(());
2653                }
2654                debug!("remote doesn't support our version");
2655                Err(ConnectionError::VersionMismatch)
2656            }
2657            Header::Short { .. } => unreachable!(
2658                "short packets received during handshake are discarded in handle_packet"
2659            ),
2660        }
2661    }
2662
2663    /// Process an Initial or Handshake packet payload
2664    fn process_early_payload(
2665        &mut self,
2666        now: Instant,
2667        packet: Packet,
2668    ) -> Result<(), TransportError> {
2669        debug_assert_ne!(packet.header.space(), SpaceId::Data);
2670        let payload_len = packet.payload.len();
2671        let mut ack_eliciting = false;
2672        for result in frame::Iter::new(packet.payload.freeze())? {
2673            let frame = result?;
2674            let span = match frame {
2675                Frame::Padding => continue,
2676                _ => Some(trace_span!("frame", ty = %frame.ty())),
2677            };
2678
2679            self.stats.frame_rx.record(&frame);
2680
2681            let _guard = span.as_ref().map(|x| x.enter());
2682            ack_eliciting |= frame.is_ack_eliciting();
2683
2684            // Process frames
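            // Only PADDING, PING, ACK, CRYPTO, and CONNECTION_CLOSE are accepted here; any other
            // frame type in an Initial or Handshake packet is rejected as a PROTOCOL_VIOLATION.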
2685            match frame {
2686                Frame::Padding | Frame::Ping => {}
2687                Frame::Crypto(frame) => {
2688                    self.read_crypto(packet.header.space(), &frame, payload_len)?;
2689                }
2690                Frame::Ack(ack) => {
2691                    self.on_ack_received(now, packet.header.space(), ack)?;
2692                }
2693                Frame::Close(reason) => {
2694                    self.error = Some(reason.into());
2695                    self.state = State::Draining;
2696                    return Ok(());
2697                }
2698                _ => {
2699                    let mut err =
2700                        TransportError::PROTOCOL_VIOLATION("illegal frame type in handshake");
2701                    err.frame = Some(frame.ty());
2702                    return Err(err);
2703                }
2704            }
2705        }
2706
2707        if ack_eliciting {
2708            // In the initial and handshake spaces, ACKs must be sent immediately
2709            self.spaces[packet.header.space()]
2710                .pending_acks
2711                .set_immediate_ack_required();
2712        }
2713
2714        self.write_crypto();
2715        Ok(())
2716    }
2717
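    /// Process the payload of a packet in the data space (0-RTT or 1-RTT)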
2718    fn process_payload(
2719        &mut self,
2720        now: Instant,
2721        remote: SocketAddr,
2722        number: u64,
2723        packet: Packet,
2724    ) -> Result<(), TransportError> {
2725        let payload = packet.payload.freeze();
2726        let mut is_probing_packet = true;
2727        let mut close = None;
2728        let payload_len = payload.len();
2729        let mut ack_eliciting = false;
2730        for result in frame::Iter::new(payload)? {
2731            let frame = result?;
2732            let span = match frame {
2733                Frame::Padding => continue,
2734                _ => Some(trace_span!("frame", ty = %frame.ty())),
2735            };
2736
2737            self.stats.frame_rx.record(&frame);
2738            // Crypto, Stream, and Datagram frames are special-cased so as not to pollute
2739            // the log with payload data
2740            match &frame {
2741                Frame::Crypto(f) => {
2742                    trace!(offset = f.offset, len = f.data.len(), "got crypto frame");
2743                }
2744                Frame::Stream(f) => {
2745                    trace!(id = %f.id, offset = f.offset, len = f.data.len(), fin = f.fin, "got stream frame");
2746                }
2747                Frame::Datagram(f) => {
2748                    trace!(len = f.data.len(), "got datagram frame");
2749                }
2750                f => {
2751                    trace!("got frame {:?}", f);
2752                }
2753            }
2754
2755            let _guard = span.as_ref().map(|x| x.enter());
2756            if packet.header.is_0rtt() {
2757                match frame {
2758                    Frame::Crypto(_) | Frame::Close(Close::Application(_)) => {
2759                        return Err(TransportError::PROTOCOL_VIOLATION(
2760                            "illegal frame type in 0-RTT",
2761                        ));
2762                    }
2763                    _ => {}
2764                }
2765            }
2766            ack_eliciting |= frame.is_ack_eliciting();
2767
2768            // Check whether this could be a probing packet
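            // Only PADDING, PATH_CHALLENGE, PATH_RESPONSE, and NEW_CONNECTION_ID count as
            // "probing frames" (RFC 9000 §9.2); any other frame makes the packet non-probing.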
2769            match frame {
2770                Frame::Padding
2771                | Frame::PathChallenge(_)
2772                | Frame::PathResponse(_)
2773                | Frame::NewConnectionId(_) => {}
2774                _ => {
2775                    is_probing_packet = false;
2776                }
2777            }
2778            match frame {
2779                Frame::Crypto(frame) => {
2780                    self.read_crypto(SpaceId::Data, &frame, payload_len)?;
2781                }
2782                Frame::Stream(frame) => {
2783                    if self.streams.received(frame, payload_len)?.should_transmit() {
2784                        self.spaces[SpaceId::Data].pending.max_data = true;
2785                    }
2786                }
2787                Frame::Ack(ack) => {
2788                    self.on_ack_received(now, SpaceId::Data, ack)?;
2789                }
2790                Frame::Padding | Frame::Ping => {}
2791                Frame::Close(reason) => {
2792                    close = Some(reason);
2793                }
2794                Frame::PathChallenge(token) => {
2795                    self.path_responses.push(number, token, remote);
2796                    if remote == self.path.remote {
2797                        // PATH_CHALLENGE on active path, possible off-path packet forwarding
2798                        // attack. Send a non-probing packet to recover the active path.
2799                        match self.peer_supports_ack_frequency() {
2800                            true => self.immediate_ack(),
2801                            false => self.ping(),
2802                        }
2803                    }
2804                }
2805                Frame::PathResponse(token) => {
2806                    if self.path.challenge == Some(token) && remote == self.path.remote {
2807                        trace!("new path validated");
2808                        self.timers.stop(Timer::PathValidation);
2809                        self.path.challenge = None;
2810                        self.path.validated = true;
2811                        if let Some((_, ref mut prev_path)) = self.prev_path {
2812                            prev_path.challenge = None;
2813                            prev_path.challenge_pending = false;
2814                        }
2815                    } else {
2816                        debug!(token, "ignoring invalid PATH_RESPONSE");
2817                    }
2818                }
2819                Frame::MaxData(bytes) => {
2820                    self.streams.received_max_data(bytes);
2821                }
2822                Frame::MaxStreamData { id, offset } => {
2823                    self.streams.received_max_stream_data(id, offset)?;
2824                }
2825                Frame::MaxStreams { dir, count } => {
2826                    self.streams.received_max_streams(dir, count)?;
2827                }
2828                Frame::ResetStream(frame) => {
2829                    if self.streams.received_reset(frame)?.should_transmit() {
2830                        self.spaces[SpaceId::Data].pending.max_data = true;
2831                    }
2832                }
2833                Frame::DataBlocked { offset } => {
2834                    debug!(offset, "peer claims to be blocked at connection level");
2835                }
2836                Frame::StreamDataBlocked { id, offset } => {
2837                    if id.initiator() == self.side.side() && id.dir() == Dir::Uni {
2838                        debug!("got STREAM_DATA_BLOCKED on send-only {}", id);
2839                        return Err(TransportError::STREAM_STATE_ERROR(
2840                            "STREAM_DATA_BLOCKED on send-only stream",
2841                        ));
2842                    }
2843                    debug!(
2844                        stream = %id,
2845                        offset, "peer claims to be blocked at stream level"
2846                    );
2847                }
2848                Frame::StreamsBlocked { dir, limit } => {
2849                    if limit > MAX_STREAM_COUNT {
2850                        return Err(TransportError::FRAME_ENCODING_ERROR(
2851                            "unrepresentable stream limit",
2852                        ));
2853                    }
2854                    debug!(
2855                        "peer claims to be blocked opening more than {} {} streams",
2856                        limit, dir
2857                    );
2858                }
2859                Frame::StopSending(frame::StopSending { id, error_code }) => {
2860                    if id.initiator() != self.side.side() {
2861                        if id.dir() == Dir::Uni {
2862                            debug!("got STOP_SENDING on recv-only {}", id);
2863                            return Err(TransportError::STREAM_STATE_ERROR(
2864                                "STOP_SENDING on recv-only stream",
2865                            ));
2866                        }
2867                    } else if self.streams.is_local_unopened(id) {
2868                        return Err(TransportError::STREAM_STATE_ERROR(
2869                            "STOP_SENDING on unopened stream",
2870                        ));
2871                    }
2872                    self.streams.received_stop_sending(id, error_code);
2873                }
2874                Frame::RetireConnectionId { sequence } => {
2875                    let allow_more_cids = self
2876                        .local_cid_state
2877                        .on_cid_retirement(sequence, self.peer_params.issue_cids_limit())?;
2878                    self.endpoint_events
2879                        .push_back(EndpointEventInner::RetireConnectionId(
2880                            now,
2881                            sequence,
2882                            allow_more_cids,
2883                        ));
2884                }
2885                Frame::NewConnectionId(frame) => {
2886                    trace!(
2887                        sequence = frame.sequence,
2888                        id = %frame.id,
2889                        retire_prior_to = frame.retire_prior_to,
2890                    );
2891                    if self.rem_cids.active().is_empty() {
2892                        return Err(TransportError::PROTOCOL_VIOLATION(
2893                            "NEW_CONNECTION_ID when CIDs aren't in use",
2894                        ));
2895                    }
2896                    if frame.retire_prior_to > frame.sequence {
2897                        return Err(TransportError::PROTOCOL_VIOLATION(
2898                            "NEW_CONNECTION_ID retiring unissued CIDs",
2899                        ));
2900                    }
2901
2902                    use crate::cid_queue::InsertError;
2903                    match self.rem_cids.insert(frame) {
2904                        Ok(None) => {}
2905                        Ok(Some((retired, reset_token))) => {
2906                            let pending_retired =
2907                                &mut self.spaces[SpaceId::Data].pending.retire_cids;
2908                            /// Ensure `pending_retired` cannot grow without bound. Limit is
2909                            /// somewhat arbitrary but very permissive.
2910                            const MAX_PENDING_RETIRED_CIDS: u64 = CidQueue::LEN as u64 * 10;
2911                            // We don't bother counting in-flight frames because those are bounded
2912                            // by congestion control.
2913                            if (pending_retired.len() as u64)
2914                                .saturating_add(retired.end.saturating_sub(retired.start))
2915                                > MAX_PENDING_RETIRED_CIDS
2916                            {
2917                                return Err(TransportError::CONNECTION_ID_LIMIT_ERROR(
2918                                    "queued too many retired CIDs",
2919                                ));
2920                            }
2921                            pending_retired.extend(retired);
2922                            self.set_reset_token(reset_token);
2923                        }
2924                        Err(InsertError::ExceedsLimit) => {
2925                            return Err(TransportError::CONNECTION_ID_LIMIT_ERROR(""));
2926                        }
2927                        Err(InsertError::Retired) => {
2928                            trace!("discarding already-retired");
2929                            // RETIRE_CONNECTION_ID might not have been previously sent if e.g. a
2930                            // range of connection IDs larger than the active connection ID limit
2931                            // was retired all at once via retire_prior_to.
2932                            self.spaces[SpaceId::Data]
2933                                .pending
2934                                .retire_cids
2935                                .push(frame.sequence);
2936                            continue;
2937                        }
2938                    };
2939
2940                    if self.side.is_server() && self.rem_cids.active_seq() == 0 {
2941                        // We're a server still using the initial remote CID for the client, so
2942                        // We're a server still using the initial remote CID for the client, so
2943                        // switch immediately to enable client-side stateless resets.
2944                    }
2945                }
2946                Frame::NewToken(NewToken { token }) => {
2947                    let ConnectionSide::Client {
2948                        token_store,
2949                        server_name,
2950                        ..
2951                    } = &self.side
2952                    else {
2953                        return Err(TransportError::PROTOCOL_VIOLATION("client sent NEW_TOKEN"));
2954                    };
2955                    if token.is_empty() {
2956                        return Err(TransportError::FRAME_ENCODING_ERROR("empty token"));
2957                    }
2958                    trace!("got new token");
2959                    token_store.insert(server_name, token);
2960                }
2961                Frame::Datagram(datagram) => {
2962                    if self
2963                        .datagrams
2964                        .received(datagram, &self.config.datagram_receive_buffer_size)?
2965                    {
2966                        self.events.push_back(Event::DatagramReceived);
2967                    }
2968                }
2969                Frame::AckFrequency(ack_frequency) => {
2970                    // This frame can only be sent in the Data space
2971                    let space = &mut self.spaces[SpaceId::Data];
2972
2973                    if !self
2974                        .ack_frequency
2975                        .ack_frequency_received(&ack_frequency, &mut space.pending_acks)?
2976                    {
2977                        // The AckFrequency frame is stale (we have already received a more recent one)
2978                        continue;
2979                    }
2980
2981                    // Our `max_ack_delay` has been updated, so we may need to adjust its associated
2982                    // timeout
2983                    if let Some(timeout) = space
2984                        .pending_acks
2985                        .max_ack_delay_timeout(self.ack_frequency.max_ack_delay)
2986                    {
2987                        self.timers.set(Timer::MaxAckDelay, timeout);
2988                    }
2989                }
2990                Frame::ImmediateAck => {
2991                    // This frame can only be sent in the Data space
2992                    self.spaces[SpaceId::Data]
2993                        .pending_acks
2994                        .set_immediate_ack_required();
2995                }
2996                Frame::HandshakeDone => {
2997                    if self.side.is_server() {
2998                        return Err(TransportError::PROTOCOL_VIOLATION(
2999                            "client sent HANDSHAKE_DONE",
3000                        ));
3001                    }
3002                    if self.spaces[SpaceId::Handshake].crypto.is_some() {
3003                        self.discard_space(now, SpaceId::Handshake);
3004                    }
3005                }
3006            }
3007        }
3008
3009        let space = &mut self.spaces[SpaceId::Data];
3010        if space
3011            .pending_acks
3012            .packet_received(now, number, ack_eliciting, &space.dedup)
3013        {
3014            self.timers
3015                .set(Timer::MaxAckDelay, now + self.ack_frequency.max_ack_delay);
3016        }
3017
3018        // Issue stream ID credit due to ACKs of outgoing finish/resets and incoming finish/resets
3019        // on stopped streams. Incoming finishes/resets on open streams are not handled here as they
3020        // are only freed, and hence only issue credit, once the application has been notified
3021        // during a read on the stream.
3022        let pending = &mut self.spaces[SpaceId::Data].pending;
3023        self.streams.queue_max_stream_id(pending);
3024
3025        if let Some(reason) = close {
3026            self.error = Some(reason.into());
3027            self.state = State::Draining;
3028            self.close = true;
3029        }
3030
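        // Only a non-probing packet carrying the highest packet number received so far may
        // initiate migration to a new remote address (RFC 9000 §9.3).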
3031        if remote != self.path.remote
3032            && !is_probing_packet
3033            && number == self.spaces[SpaceId::Data].rx_packet
3034        {
3035            let ConnectionSide::Server { ref server_config } = self.side else {
3036                panic!("packets from unknown remote should be dropped by clients");
3037            };
3038            debug_assert!(
3039                server_config.migration,
3040                "migration-initiating packets should have been dropped immediately"
3041            );
3042            self.migrate(now, remote);
3043            // Break linkability, if possible
3044            self.update_rem_cid();
3045            self.spin = false;
3046        }
3047
3048        Ok(())
3049    }
3050
3051    fn migrate(&mut self, now: Instant, remote: SocketAddr) {
3052        trace!(%remote, "migration initiated");
3053        self.path_counter = self.path_counter.wrapping_add(1);
3054        // Reset RTT/congestion state for the new path unless it looks like a NAT rebinding.
3055        // Note that the congestion window will not grow until validation completes, which helps
3056        // mitigate amplification attacks performed by spoofing source addresses.
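        // Heuristic: an IPv4 remote whose IP is unchanged (only the port differs) is treated as
        // a likely NAT rebinding, so the previous path's RTT and congestion state are reused.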
3057        let mut new_path = if remote.is_ipv4() && remote.ip() == self.path.remote.ip() {
3058            PathData::from_previous(remote, &self.path, self.path_counter, now)
3059        } else {
3060            let peer_max_udp_payload_size =
3061                u16::try_from(self.peer_params.max_udp_payload_size.into_inner())
3062                    .unwrap_or(u16::MAX);
3063            PathData::new(
3064                remote,
3065                self.allow_mtud,
3066                Some(peer_max_udp_payload_size),
3067                self.path_counter,
3068                now,
3069                &self.config,
3070            )
3071        };
3072        new_path.challenge = Some(self.rng.random());
3073        new_path.challenge_pending = true;
3074        let prev_pto = self.pto(SpaceId::Data);
3075
3076        let mut prev = mem::replace(&mut self.path, new_path);
3077        // Don't clobber the original path if the previous one hasn't been validated yet
3078        if prev.challenge.is_none() {
3079            prev.challenge = Some(self.rng.random());
3080            prev.challenge_pending = true;
3081            // We haven't updated the remote CID yet, this captures the remote CID we were using on
3082            // the previous path.
3083            self.prev_path = Some((self.rem_cids.active(), prev));
3084        }
3085
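        // Give path validation three times the larger of the old and new PTO to complete, per
        // the abandonment guidance in RFC 9000 §8.2.4.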
3086        self.timers.set(
3087            Timer::PathValidation,
3088            now + 3 * cmp::max(self.pto(SpaceId::Data), prev_pto),
3089        );
3090    }
3091
3092    /// Handle a change in the local address, i.e. an active migration
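    ///
    /// Intended to be called after the local socket has been rebound; switches to a fresh
    /// remote CID (when available) and sends a PING to provoke a response on the new path.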
3093    pub fn local_address_changed(&mut self) {
3094        self.update_rem_cid();
3095        self.ping();
3096    }
3097
3098    /// Switch to a previously unused remote connection ID, if possible
3099    fn update_rem_cid(&mut self) {
3100        let (reset_token, retired) = match self.rem_cids.next() {
3101            Some(x) => x,
3102            None => return,
3103        };
3104
3105        // Retire the current remote CID and any CIDs we had to skip.
3106        self.spaces[SpaceId::Data]
3107            .pending
3108            .retire_cids
3109            .extend(retired);
3110        self.set_reset_token(reset_token);
3111    }
3112
3113    fn set_reset_token(&mut self, reset_token: ResetToken) {
3114        self.endpoint_events
3115            .push_back(EndpointEventInner::ResetToken(
3116                self.path.remote,
3117                reset_token,
3118            ));
3119        self.peer_params.stateless_reset_token = Some(reset_token);
3120    }
3121
3122    /// Issue an initial set of connection IDs to the peer upon connection
3123    fn issue_first_cids(&mut self, now: Instant) {
3124        if self.local_cid_state.cid_len() == 0 {
3125            return;
3126        }
3127
3128        // Subtract 1 to account for the CID we supplied while handshaking
3129        let mut n = self.peer_params.issue_cids_limit() - 1;
3130        if let ConnectionSide::Server { server_config } = &self.side {
3131            if server_config.has_preferred_address() {
3132                // We also sent a CID in the transport parameters
3133                n -= 1;
3134            }
3135        }
3136        self.endpoint_events
3137            .push_back(EndpointEventInner::NeedIdentifiers(now, n));
3138    }
3139
3140    fn populate_packet(
3141        &mut self,
3142        now: Instant,
3143        space_id: SpaceId,
3144        buf: &mut Vec<u8>,
3145        max_size: usize,
3146        pn: u64,
3147    ) -> SentFrames {
3148        let mut sent = SentFrames::default();
3149        let space = &mut self.spaces[space_id];
3150        let is_0rtt = space_id == SpaceId::Data && space.crypto.is_none();
3151        space.pending_acks.maybe_ack_non_eliciting();
3152
3153        // HANDSHAKE_DONE
3154        if !is_0rtt && mem::replace(&mut space.pending.handshake_done, false) {
3155            buf.write(frame::FrameType::HANDSHAKE_DONE);
3156            sent.retransmits.get_or_create().handshake_done = true;
3157            // This is just a u8 counter and the frame is typically just sent once
3158            self.stats.frame_tx.handshake_done =
3159                self.stats.frame_tx.handshake_done.saturating_add(1);
3160        }
3161
3162        // PING
3163        if mem::replace(&mut space.ping_pending, false) {
3164            trace!("PING");
3165            buf.write(frame::FrameType::PING);
3166            sent.non_retransmits = true;
3167            self.stats.frame_tx.ping += 1;
3168        }
3169
3170        // IMMEDIATE_ACK
3171        if mem::replace(&mut space.immediate_ack_pending, false) {
3172            trace!("IMMEDIATE_ACK");
3173            buf.write(frame::FrameType::IMMEDIATE_ACK);
3174            sent.non_retransmits = true;
3175            self.stats.frame_tx.immediate_ack += 1;
3176        }
3177
3178        // ACK
3179        if space.pending_acks.can_send() {
3180            Self::populate_acks(
3181                now,
3182                self.receiving_ecn,
3183                &mut sent,
3184                space,
3185                buf,
3186                &mut self.stats,
3187            );
3188        }
3189
3190        // ACK_FREQUENCY
3191        if mem::replace(&mut space.pending.ack_frequency, false) {
3192            let sequence_number = self.ack_frequency.next_sequence_number();
3193
3194            // Safe to unwrap because this is always provided when ACK frequency is enabled
3195            let config = self.config.ack_frequency_config.as_ref().unwrap();
3196
3197            // Ensure the delay is within bounds to avoid a PROTOCOL_VIOLATION error
3198            let max_ack_delay = self.ack_frequency.candidate_max_ack_delay(
3199                self.path.rtt.get(),
3200                config,
3201                &self.peer_params,
3202            );
3203
3204            trace!(?max_ack_delay, "ACK_FREQUENCY");
3205
3206            frame::AckFrequency {
3207                sequence: sequence_number,
3208                ack_eliciting_threshold: config.ack_eliciting_threshold,
3209                request_max_ack_delay: max_ack_delay.as_micros().try_into().unwrap_or(VarInt::MAX),
3210                reordering_threshold: config.reordering_threshold,
3211            }
3212            .encode(buf);
3213
3214            sent.retransmits.get_or_create().ack_frequency = true;
3215
3216            self.ack_frequency.ack_frequency_sent(pn, max_ack_delay);
3217            self.stats.frame_tx.ack_frequency += 1;
3218        }
3219
3220        // PATH_CHALLENGE
3221        if buf.len() + 9 < max_size && space_id == SpaceId::Data {
3222            // Transmit challenges with every outgoing frame on an unvalidated path
3223            if let Some(token) = self.path.challenge {
3224                // But only send a packet solely for that purpose at most once
3225                self.path.challenge_pending = false;
3226                sent.non_retransmits = true;
3227                sent.requires_padding = true;
3228                trace!("PATH_CHALLENGE {:08x}", token);
3229                buf.write(frame::FrameType::PATH_CHALLENGE);
3230                buf.write(token);
3231                self.stats.frame_tx.path_challenge += 1;
3232            }
3233        }
3234
3235        // PATH_RESPONSE
3236        if buf.len() + 9 < max_size && space_id == SpaceId::Data {
3237            if let Some(token) = self.path_responses.pop_on_path(self.path.remote) {
3238                sent.non_retransmits = true;
3239                sent.requires_padding = true;
3240                trace!("PATH_RESPONSE {:08x}", token);
3241                buf.write(frame::FrameType::PATH_RESPONSE);
3242                buf.write(token);
3243                self.stats.frame_tx.path_response += 1;
3244            }
3245        }
3246
3247        // CRYPTO
3248        while buf.len() + frame::Crypto::SIZE_BOUND < max_size && !is_0rtt {
3249            let mut frame = match space.pending.crypto.pop_front() {
3250                Some(x) => x,
3251                None => break,
3252            };
3253
3254            // Calculate the maximum amount of crypto data we can store in the buffer.
3255            // Since the offset is known, we can reserve the exact size required to encode it.
3256            // For the length we reserve 2 bytes, which allows encoding lengths up to 2^14 - 1,
3257            // more than what fits into a normally sized QUIC frame.
3258            let max_crypto_data_size = max_size
3259                - buf.len()
3260                - 1 // Frame Type
3261                - VarInt::size(unsafe { VarInt::from_u64_unchecked(frame.offset) })
3262                - 2; // Maximum encoded length for frame size, given we send less than 2^14 bytes
3263
3264            let len = frame
3265                .data
3266                .len()
3267                .min(2usize.pow(14) - 1)
3268                .min(max_crypto_data_size);
3269
3270            let data = frame.data.split_to(len);
3271            let truncated = frame::Crypto {
3272                offset: frame.offset,
3273                data,
3274            };
3275            trace!(
3276                "CRYPTO: off {} len {}",
3277                truncated.offset,
3278                truncated.data.len()
3279            );
3280            truncated.encode(buf);
3281            self.stats.frame_tx.crypto += 1;
3282            sent.retransmits.get_or_create().crypto.push_back(truncated);
3283            if !frame.data.is_empty() {
3284                frame.offset += len as u64;
3285                space.pending.crypto.push_front(frame);
3286            }
3287        }
3288
3289        if space_id == SpaceId::Data {
3290            self.streams.write_control_frames(
3291                buf,
3292                &mut space.pending,
3293                &mut sent.retransmits,
3294                &mut self.stats.frame_tx,
3295                max_size,
3296            );
3297        }
3298
3299        // NEW_CONNECTION_ID
3300        while buf.len() + NewConnectionId::SIZE_BOUND < max_size {
3301            let issued = match space.pending.new_cids.pop() {
3302                Some(x) => x,
3303                None => break,
3304            };
3305            trace!(
3306                sequence = issued.sequence,
3307                id = %issued.id,
3308                "NEW_CONNECTION_ID"
3309            );
3310            frame::NewConnectionId {
3311                sequence: issued.sequence,
3312                retire_prior_to: self.local_cid_state.retire_prior_to(),
3313                id: issued.id,
3314                reset_token: issued.reset_token,
3315            }
3316            .encode(buf);
3317            sent.retransmits.get_or_create().new_cids.push(issued);
3318            self.stats.frame_tx.new_connection_id += 1;
3319        }
3320
3321        // RETIRE_CONNECTION_ID
3322        while buf.len() + frame::RETIRE_CONNECTION_ID_SIZE_BOUND < max_size {
3323            let seq = match space.pending.retire_cids.pop() {
3324                Some(x) => x,
3325                None => break,
3326            };
3327            trace!(sequence = seq, "RETIRE_CONNECTION_ID");
3328            buf.write(frame::FrameType::RETIRE_CONNECTION_ID);
3329            buf.write_var(seq);
3330            sent.retransmits.get_or_create().retire_cids.push(seq);
3331            self.stats.frame_tx.retire_connection_id += 1;
3332        }
3333
3334        // DATAGRAM
3335        let mut sent_datagrams = false;
3336        while buf.len() + Datagram::SIZE_BOUND < max_size && space_id == SpaceId::Data {
3337            match self.datagrams.write(buf, max_size) {
3338                true => {
3339                    sent_datagrams = true;
3340                    sent.non_retransmits = true;
3341                    self.stats.frame_tx.datagram += 1;
3342                }
3343                false => break,
3344            }
3345        }
3346        if self.datagrams.send_blocked && sent_datagrams {
3347            self.events.push_back(Event::DatagramsUnblocked);
3348            self.datagrams.send_blocked = false;
3349        }
3350
3351        // NEW_TOKEN
3352        while let Some(remote_addr) = space.pending.new_tokens.pop() {
3353            debug_assert_eq!(space_id, SpaceId::Data);
3354            let ConnectionSide::Server { server_config } = &self.side else {
3355                panic!("NEW_TOKEN frames should not be enqueued by clients");
3356            };
3357
3358            if remote_addr != self.path.remote {
3359                // NEW_TOKEN frames contain tokens bound to a client's IP address, and are only
3360                // useful if used from the same IP address. Thus, we abandon enqueued NEW_TOKEN
3361                // frames upon a path change. Once the new path is validated, fresh NEW_TOKEN
3362                // frames may be enqueued for it.
3363                continue;
3364            }
3365
3366            let token = Token::new(
3367                TokenPayload::Validation {
3368                    ip: remote_addr.ip(),
3369                    issued: server_config.time_source.now(),
3370                },
3371                &mut self.rng,
3372            );
3373            let new_token = NewToken {
3374                token: token.encode(&*server_config.token_key).into(),
3375            };
3376
3377            if buf.len() + new_token.size() >= max_size {
3378                space.pending.new_tokens.push(remote_addr);
3379                break;
3380            }
3381
3382            new_token.encode(buf);
3383            sent.retransmits
3384                .get_or_create()
3385                .new_tokens
3386                .push(remote_addr);
3387            self.stats.frame_tx.new_token += 1;
3388        }
3389
3390        // STREAM
3391        if space_id == SpaceId::Data {
3392            sent.stream_frames =
3393                self.streams
3394                    .write_stream_frames(buf, max_size, self.config.send_fairness);
3395            self.stats.frame_tx.stream += sent.stream_frames.len() as u64;
3396        }
3397
3398        sent
3399    }
3400
3401    /// Write pending ACKs into a buffer
3402    ///
3403    /// This method assumes ACKs are pending, and should only be called when
3404    /// `PendingAcks::ranges()` is non-empty.
3405    fn populate_acks(
3406        now: Instant,
3407        receiving_ecn: bool,
3408        sent: &mut SentFrames,
3409        space: &mut PacketSpace,
3410        buf: &mut Vec<u8>,
3411        stats: &mut ConnectionStats,
3412    ) {
3413        debug_assert!(!space.pending_acks.ranges().is_empty());
3414
3415        // 0-RTT packets must never carry acks (which would have to be of handshake packets)
3416        debug_assert!(space.crypto.is_some(), "tried to send ACK in 0-RTT");
3417        let ecn = if receiving_ecn {
3418            Some(&space.ecn_counters)
3419        } else {
3420            None
3421        };
3422        sent.largest_acked = space.pending_acks.ranges().max();
3423
3424        let delay_micros = space.pending_acks.ack_delay(now).as_micros() as u64;
3425
3426        // TODO: This should come from `TransportConfig` if that gets configurable.
3427        let ack_delay_exp = TransportParameters::default().ack_delay_exponent;
3428        let delay = delay_micros >> ack_delay_exp.into_inner();
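        // e.g. with the default ack_delay_exponent of 3, a measured delay of 25_000µs is
        // encoded as 25_000 >> 3 = 3_125; the peer shifts left by the same exponent to decode.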
3429
3430        trace!(
3431            "ACK {:?}, Delay = {}us",
3432            space.pending_acks.ranges(),
3433            delay_micros
3434        );
3435
3436        frame::Ack::encode(delay as _, space.pending_acks.ranges(), ecn, buf);
3437        stats.frame_tx.acks += 1;
3438    }
3439
3440    fn close_common(&mut self) {
3441        trace!("connection closed");
3442        for &timer in &Timer::VALUES {
3443            self.timers.stop(timer);
3444        }
3445    }
3446
3447    fn set_close_timer(&mut self, now: Instant) {
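        // Linger in the closing/draining state for three PTOs, as recommended by RFC 9000 §10.2.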
3448        self.timers
3449            .set(Timer::Close, now + 3 * self.pto(self.highest_space));
3450    }
3451
3452    /// Handle transport parameters received from the peer
3453    fn handle_peer_params(&mut self, params: TransportParameters) -> Result<(), TransportError> {
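        // Authenticate the connection IDs echoed in the peer's transport parameters against the
        // values we actually used, as required by RFC 9000 §7.3.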
3454        if Some(self.orig_rem_cid) != params.initial_src_cid
3455            || (self.side.is_client()
3456                && (Some(self.initial_dst_cid) != params.original_dst_cid
3457                    || self.retry_src_cid != params.retry_src_cid))
3458        {
3459            return Err(TransportError::TRANSPORT_PARAMETER_ERROR(
3460                "CID authentication failure",
3461            ));
3462        }
3463
3464        self.set_peer_params(params);
3465
3466        Ok(())
3467    }
3468
3469    fn set_peer_params(&mut self, params: TransportParameters) {
3470        self.streams.set_params(&params);
3471        self.idle_timeout =
3472            negotiate_max_idle_timeout(self.config.max_idle_timeout, Some(params.max_idle_timeout));
3473        trace!("negotiated max idle timeout {:?}", self.idle_timeout);
3474        if let Some(ref info) = params.preferred_address {
3475            self.rem_cids.insert(frame::NewConnectionId {
3476                sequence: 1,
3477                id: info.connection_id,
3478                reset_token: info.stateless_reset_token,
3479                retire_prior_to: 0,
3480            }).expect("preferred address CID is the first received, and hence is guaranteed to be legal");
3481        }
3482        self.ack_frequency.peer_max_ack_delay = get_max_ack_delay(&params);
3483        self.peer_params = params;
3484        self.path.mtud.on_peer_max_udp_payload_size_received(
3485            u16::try_from(self.peer_params.max_udp_payload_size.into_inner()).unwrap_or(u16::MAX),
3486        );
3487    }
3488
3489    fn decrypt_packet(
3490        &mut self,
3491        now: Instant,
3492        packet: &mut Packet,
3493    ) -> Result<Option<u64>, Option<TransportError>> {
3494        let result = packet_crypto::decrypt_packet_body(
3495            packet,
3496            &self.spaces,
3497            self.zero_rtt_crypto.as_ref(),
3498            self.key_phase,
3499            self.prev_crypto.as_ref(),
3500            self.next_crypto.as_ref(),
3501        )?;
3502
3503        let result = match result {
3504            Some(r) => r,
3505            None => return Ok(None),
3506        };
3507
3508        if result.outgoing_key_update_acked {
3509            if let Some(prev) = self.prev_crypto.as_mut() {
3510                prev.end_packet = Some((result.number, now));
3511                self.set_key_discard_timer(now, packet.header.space());
3512            }
3513        }
3514
3515        if result.incoming_key_update {
3516            trace!("key update authenticated");
3517            self.update_keys(Some((result.number, now)), true);
3518            self.set_key_discard_timer(now, packet.header.space());
3519        }
3520
3521        Ok(Some(result.number))
3522    }
3523
3524    fn update_keys(&mut self, end_packet: Option<(u64, Instant)>, remote: bool) {
3525        trace!("executing key update");
3526        // Generate keys for the key phase after the one we're switching to, store them in
3527        // `next_crypto`, make the contents of `next_crypto` current, and move the current keys into
3528        // `prev_crypto`.
3529        let new = self
3530            .crypto
3531            .next_1rtt_keys()
3532            .expect("only called for `Data` packets");
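        // Arrange for the next local key update to be initiated well before the new key's AEAD
        // confidentiality limit is reached.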
3533        self.key_phase_size = new
3534            .local
3535            .confidentiality_limit()
3536            .saturating_sub(KEY_UPDATE_MARGIN);
3537        let old = mem::replace(
3538            &mut self.spaces[SpaceId::Data]
3539                .crypto
3540                .as_mut()
3541                .unwrap() // safe because update_keys() can only be triggered by short packets
3542                .packet,
3543            mem::replace(self.next_crypto.as_mut().unwrap(), new),
3544        );
3545        self.spaces[SpaceId::Data].sent_with_keys = 0;
3546        self.prev_crypto = Some(PrevCrypto {
3547            crypto: old,
3548            end_packet,
3549            update_unacked: remote,
3550        });
3551        self.key_phase = !self.key_phase;
3552    }
3553
3554    fn peer_supports_ack_frequency(&self) -> bool {
3555        self.peer_params.min_ack_delay.is_some()
3556    }
3557
3558    /// Send an IMMEDIATE_ACK frame to the remote endpoint
3559    ///
3560    /// According to the spec, this will result in an error if the remote endpoint does not support
3561    /// the Acknowledgement Frequency extension
3562    pub(crate) fn immediate_ack(&mut self) {
3563        self.spaces[self.highest_space].immediate_ack_pending = true;
3564    }
3565
3566    /// Decodes a packet, returning its decrypted payload, so it can be inspected in tests
3567    #[cfg(test)]
3568    pub(crate) fn decode_packet(&self, event: &ConnectionEvent) -> Option<Vec<u8>> {
3569        let (first_decode, remaining) = match &event.0 {
3570            ConnectionEventInner::Datagram(DatagramConnectionEvent {
3571                first_decode,
3572                remaining,
3573                ..
3574            }) => (first_decode, remaining),
3575            _ => return None,
3576        };
3577
3578        if remaining.is_some() {
3579            panic!("Packets should never be coalesced in tests");
3580        }
3581
3582        let decrypted_header = packet_crypto::unprotect_header(
3583            first_decode.clone(),
3584            &self.spaces,
3585            self.zero_rtt_crypto.as_ref(),
3586            self.peer_params.stateless_reset_token,
3587        )?;
3588
3589        let mut packet = decrypted_header.packet?;
3590        packet_crypto::decrypt_packet_body(
3591            &mut packet,
3592            &self.spaces,
3593            self.zero_rtt_crypto.as_ref(),
3594            self.key_phase,
3595            self.prev_crypto.as_ref(),
3596            self.next_crypto.as_ref(),
3597        )
3598        .ok()?;
3599
3600        Some(packet.payload.to_vec())
3601    }
3602
3603    /// The number of bytes of packets containing retransmittable frames that have not been
3604    /// acknowledged or declared lost.
3605    #[cfg(test)]
3606    pub(crate) fn bytes_in_flight(&self) -> u64 {
3607        self.path.in_flight.bytes
3608    }
3609
3610    /// Number of bytes worth of non-ack-only packets that may be sent
3611    #[cfg(test)]
3612    pub(crate) fn congestion_window(&self) -> u64 {
3613        self.path
3614            .congestion
3615            .window()
3616            .saturating_sub(self.path.in_flight.bytes)
3617    }
3618
3619    /// Whether no timers other than keep-alive, idle, RTT, push-new-CID, and key discard are running
3620    #[cfg(test)]
3621    pub(crate) fn is_idle(&self) -> bool {
3622        Timer::VALUES
3623            .iter()
3624            .filter(|&&t| !matches!(t, Timer::KeepAlive | Timer::PushNewCid | Timer::KeyDiscard))
3625            .filter_map(|&t| Some((t, self.timers.get(t)?)))
3626            .min_by_key(|&(_, time)| time)
3627            .map_or(true, |(timer, _)| timer == Timer::Idle)
3628    }
3629
3630    /// Whether explicit congestion notification is in use on outgoing packets.
3631    #[cfg(test)]
3632    pub(crate) fn using_ecn(&self) -> bool {
3633        self.path.sending_ecn
3634    }
3635
3636    /// The number of received bytes in the current path
3637    #[cfg(test)]
3638    pub(crate) fn total_recvd(&self) -> u64 {
3639        self.path.total_recvd
3640    }
3641
3642    #[cfg(test)]
3643    pub(crate) fn active_local_cid_seq(&self) -> (u64, u64) {
3644        self.local_cid_state.active_seq()
3645    }
3646
3647    /// Instruct the peer to replace previously issued CIDs by sending a NEW_CONNECTION_ID frame
3648    /// with updated `retire_prior_to` field set to `v`
3649    #[cfg(test)]
3650    pub(crate) fn rotate_local_cid(&mut self, v: u64, now: Instant) {
3651        let n = self.local_cid_state.assign_retire_seq(v);
3652        self.endpoint_events
3653            .push_back(EndpointEventInner::NeedIdentifiers(now, n));
3654    }
3655
3656    /// Check the current active remote CID sequence
3657    #[cfg(test)]
3658    pub(crate) fn active_rem_cid_seq(&self) -> u64 {
3659        self.rem_cids.active_seq()
3660    }
3661
3662    /// Returns the detected maximum udp payload size for the current path
3663    #[cfg(test)]
3664    pub(crate) fn path_mtu(&self) -> u16 {
3665        self.path.current_mtu()
3666    }
3667
3668    /// Whether we have 1-RTT data to send
3669    ///
3670    /// See also `self.space(SpaceId::Data).can_send()`
3671    fn can_send_1rtt(&self, max_size: usize) -> bool {
3672        self.streams.can_send_stream_data()
3673            || self.path.challenge_pending
3674            || self
3675                .prev_path
3676                .as_ref()
3677                .is_some_and(|(_, x)| x.challenge_pending)
3678            || !self.path_responses.is_empty()
3679            || self
3680                .datagrams
3681                .outgoing
3682                .front()
3683                .is_some_and(|x| x.size(true) <= max_size)
3684    }
3685
3686    /// Update counters to account for a packet becoming acknowledged, lost, or abandoned
3687    fn remove_in_flight(&mut self, packet: &SentPacket) {
3688        // Visit known paths from newest to oldest to find the one `packet` was sent on
3689        for path in [&mut self.path]
3690            .into_iter()
3691            .chain(self.prev_path.as_mut().map(|(_, data)| data))
3692        {
3693            if path.remove_in_flight(packet) {
3694                return;
3695            }
3696        }
3697    }
3698
3699    /// Terminate the connection instantly, without sending a close packet
3700    fn kill(&mut self, reason: ConnectionError) {
3701        self.close_common();
3702        self.error = Some(reason);
3703        self.state = State::Drained;
3704        self.endpoint_events.push_back(EndpointEventInner::Drained);
3705    }
3706
3707    /// Storage size required for the largest packet known to be supported by the current path
3708    ///
3709    /// Buffers passed to [`Connection::poll_transmit`] should be at least this large.
3710    pub fn current_mtu(&self) -> u16 {
3711        self.path.current_mtu()
3712    }
3713
3714    /// Size of non-frame data for a 1-RTT packet
3715    ///
3716    /// Quantifies space consumed by the QUIC header and AEAD tag. All other bytes in a packet are
3717    /// frames. Changes if the length of the remote connection ID changes, which is expected to be
3718    /// rare. If `pn` is specified, may additionally change unpredictably due to variations in
3719    /// latency and packet loss.
3720    fn predict_1rtt_overhead(&self, pn: Option<u64>) -> usize {
3721        let pn_len = match pn {
3722            Some(pn) => PacketNumber::new(
3723                pn,
3724                self.spaces[SpaceId::Data].largest_acked_packet.unwrap_or(0),
3725            )
3726            .len(),
3727            // Upper bound
3728            None => 4,
3729        };
3730
3731        // 1 byte for flags
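        // e.g. an 8-byte remote CID, a 4-byte packet number, and a 16-byte AEAD tag give
        // 1 + 8 + 4 + 16 = 29 bytes of per-packet overhead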
3732        1 + self.rem_cids.active().len() + pn_len + self.tag_len_1rtt()
3733    }
3734
3735    fn tag_len_1rtt(&self) -> usize {
3736        let key = match self.spaces[SpaceId::Data].crypto.as_ref() {
3737            Some(crypto) => Some(&*crypto.packet.local),
3738            None => self.zero_rtt_crypto.as_ref().map(|x| &*x.packet),
3739        };
3740        // If neither Data nor 0-RTT keys are available, make a reasonable tag length guess. As of
3741        // this writing, all QUIC cipher suites use 16-byte tags. We could return `None` instead,
3742        // but that would needlessly prevent sending datagrams during 0-RTT.
3743        key.map_or(16, |x| x.tag_len())
3744    }
3745
3746    /// Mark the path as validated, and enqueue NEW_TOKEN frames to be sent as appropriate
3747    fn on_path_validated(&mut self) {
3748        self.path.validated = true;
3749        let ConnectionSide::Server { server_config } = &self.side else {
3750            return;
3751        };
3752        let new_tokens = &mut self.spaces[SpaceId::Data as usize].pending.new_tokens;
3753        new_tokens.clear();
3754        for _ in 0..server_config.validation_token.sent {
3755            new_tokens.push(self.path.remote);
3756        }
3757    }
3758}
3759
3760impl fmt::Debug for Connection {
3761    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
3762        f.debug_struct("Connection")
3763            .field("handshake_cid", &self.handshake_cid)
3764            .finish()
3765    }
3766}
3767
3768/// Fields of `Connection` specific to it being client-side or server-side
3769enum ConnectionSide {
3770    Client {
3771        /// Sent in every outgoing Initial packet. Always empty after Initial keys are discarded
3772        token: Bytes,
3773        token_store: Arc<dyn TokenStore>,
3774        server_name: String,
3775    },
3776    Server {
3777        server_config: Arc<ServerConfig>,
3778    },
3779}
3780
3781impl ConnectionSide {
3782    fn remote_may_migrate(&self) -> bool {
3783        match self {
3784            Self::Server { server_config } => server_config.migration,
3785            Self::Client { .. } => false,
3786        }
3787    }
3788
3789    fn is_client(&self) -> bool {
3790        self.side().is_client()
3791    }
3792
3793    fn is_server(&self) -> bool {
3794        self.side().is_server()
3795    }
3796
3797    fn side(&self) -> Side {
3798        match *self {
3799            Self::Client { .. } => Side::Client,
3800            Self::Server { .. } => Side::Server,
3801        }
3802    }
3803}
3804
3805impl From<SideArgs> for ConnectionSide {
3806    fn from(side: SideArgs) -> Self {
3807        match side {
3808            SideArgs::Client {
3809                token_store,
3810                server_name,
3811            } => Self::Client {
3812                token: token_store.take(&server_name).unwrap_or_default(),
3813                token_store,
3814                server_name,
3815            },
3816            SideArgs::Server {
3817                server_config,
3818                pref_addr_cid: _,
3819                path_validated: _,
3820            } => Self::Server { server_config },
3821        }
3822    }
3823}
3824
3825/// Parameters to `Connection::new` specific to it being client-side or server-side
3826pub(crate) enum SideArgs {
3827    Client {
3828        token_store: Arc<dyn TokenStore>,
3829        server_name: String,
3830    },
3831    Server {
3832        server_config: Arc<ServerConfig>,
3833        pref_addr_cid: Option<ConnectionId>,
3834        path_validated: bool,
3835    },
3836}
3837
3838impl SideArgs {
3839    pub(crate) fn pref_addr_cid(&self) -> Option<ConnectionId> {
3840        match *self {
3841            Self::Client { .. } => None,
3842            Self::Server { pref_addr_cid, .. } => pref_addr_cid,
3843        }
3844    }
3845
3846    pub(crate) fn path_validated(&self) -> bool {
3847        match *self {
3848            Self::Client { .. } => true,
3849            Self::Server { path_validated, .. } => path_validated,
3850        }
3851    }
3852
3853    pub(crate) fn side(&self) -> Side {
3854        match *self {
3855            Self::Client { .. } => Side::Client,
3856            Self::Server { .. } => Side::Server,
3857        }
3858    }
3859}
3860
3861/// Reasons why a connection might be lost
3862#[derive(Debug, Error, Clone, PartialEq, Eq)]
3863pub enum ConnectionError {
3864    /// The peer doesn't implement any supported version
3865    #[error("peer doesn't implement any supported version")]
3866    VersionMismatch,
3867    /// The peer violated the QUIC specification as understood by this implementation
3868    #[error(transparent)]
3869    TransportError(#[from] TransportError),
3870    /// The peer's QUIC stack aborted the connection automatically
3871    #[error("aborted by peer: {0}")]
3872    ConnectionClosed(frame::ConnectionClose),
3873    /// The peer closed the connection
3874    #[error("closed by peer: {0}")]
3875    ApplicationClosed(frame::ApplicationClose),
3876    /// The peer is unable to continue processing this connection, usually due to having restarted
3877    #[error("reset by peer")]
3878    Reset,
3879    /// Communication with the peer has lapsed for longer than the negotiated idle timeout
3880    ///
3881    /// If neither side is sending keep-alives, a connection will time out after a long enough idle
3882    /// period even if the peer is still reachable. See also [`TransportConfig::max_idle_timeout()`]
3883    /// and [`TransportConfig::keep_alive_interval()`].
3884    #[error("timed out")]
3885    TimedOut,
3886    /// The local application closed the connection
3887    #[error("closed")]
3888    LocallyClosed,
3889    /// The connection could not be created because not enough of the CID space is available
3890    ///
3891    /// Try using longer connection IDs.
3892    #[error("CIDs exhausted")]
3893    CidsExhausted,
3894}

impl From<Close> for ConnectionError {
    fn from(x: Close) -> Self {
        match x {
            Close::Connection(reason) => Self::ConnectionClosed(reason),
            Close::Application(reason) => Self::ApplicationClosed(reason),
        }
    }
}

// For compatibility with API consumers
impl From<ConnectionError> for io::Error {
    fn from(x: ConnectionError) -> Self {
        use ConnectionError::*;
        let kind = match x {
            TimedOut => io::ErrorKind::TimedOut,
            Reset => io::ErrorKind::ConnectionReset,
            ApplicationClosed(_) | ConnectionClosed(_) => io::ErrorKind::ConnectionAborted,
            TransportError(_) | VersionMismatch | LocallyClosed | CidsExhausted => {
                io::ErrorKind::Other
            }
        };
        Self::new(kind, x)
    }
}
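
// Illustrative check (not part of the original source): the conversion above lets API
// consumers surface connection loss as `std::io::Error` while keeping the original
// `ConnectionError` available as the error's source.
#[cfg(test)]
mod io_error_conversion_example {
    use super::*;

    #[test]
    fn connection_errors_map_onto_io_error_kinds() {
        assert_eq!(
            io::Error::from(ConnectionError::TimedOut).kind(),
            io::ErrorKind::TimedOut
        );
        assert_eq!(
            io::Error::from(ConnectionError::Reset).kind(),
            io::ErrorKind::ConnectionReset
        );
        assert_eq!(
            io::Error::from(ConnectionError::LocallyClosed).kind(),
            io::ErrorKind::Other
        );
    }
}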

#[allow(unreachable_pub)] // fuzzing only
#[derive(Clone)]
pub enum State {
    Handshake(state::Handshake),
    Established,
    Closed(state::Closed),
    Draining,
    /// Waiting for the application to call close so we can dispose of the resources
    Drained,
}

impl State {
    fn closed<R: Into<Close>>(reason: R) -> Self {
        Self::Closed(state::Closed {
            reason: reason.into(),
        })
    }

    fn is_handshake(&self) -> bool {
        matches!(*self, Self::Handshake(_))
    }

    fn is_established(&self) -> bool {
        matches!(*self, Self::Established)
    }

    fn is_closed(&self) -> bool {
        matches!(*self, Self::Closed(_) | Self::Draining | Self::Drained)
    }

    fn is_drained(&self) -> bool {
        matches!(*self, Self::Drained)
    }
}
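
// Illustrative check (not part of the original source): `is_closed()` covers every
// post-close phase (`Closed`, `Draining` and `Drained`), while `is_drained()` reports
// only the terminal state in which resources can finally be dropped.
#[cfg(test)]
mod state_predicates_example {
    use super::*;

    #[test]
    fn draining_counts_as_closed_but_not_drained() {
        let state = State::Draining;
        assert!(state.is_closed());
        assert!(!state.is_drained());
        assert!(!state.is_established());
        assert!(!state.is_handshake());
    }
}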

mod state {
    use super::*;

    #[allow(unreachable_pub)] // fuzzing only
    #[derive(Clone)]
    pub struct Handshake {
        /// Whether the remote CID has been set by the peer yet
        ///
        /// Always set for servers
        pub(super) rem_cid_set: bool,
        /// Stateless retry token received in the first Initial by a server.
        ///
        /// Must be present in every Initial. Always empty for clients.
        pub(super) expected_token: Bytes,
        /// First cryptographic message
        ///
        /// Only set for clients
        pub(super) client_hello: Option<Bytes>,
    }

    #[allow(unreachable_pub)] // fuzzing only
    #[derive(Clone)]
    pub struct Closed {
        pub(super) reason: Close,
    }
}

/// Events of interest to the application
#[derive(Debug)]
pub enum Event {
    /// The connection's handshake data is ready
    HandshakeDataReady,
    /// The connection was successfully established
    Connected,
    /// The connection was lost
    ///
    /// Emitted if the peer closes the connection or an error is encountered.
    ConnectionLost {
        /// Reason that the connection was closed
        reason: ConnectionError,
    },
    /// Stream events
    Stream(StreamEvent),
    /// One or more application datagrams have been received
    DatagramReceived,
    /// One or more application datagrams have been sent after blocking
    DatagramsUnblocked,
}
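
// Illustrative sketch (not part of the original source): a typical caller drains
// `Connection::poll()` after feeding in network input and dispatches on the returned
// events. The match arms below are placeholders, not prescribed behaviour.
#[cfg(test)]
mod event_dispatch_example {
    use super::*;

    #[allow(dead_code)]
    fn dispatch(event: Event) {
        match event {
            Event::HandshakeDataReady => { /* e.g. inspect early handshake data */ }
            Event::Connected => { /* handshake complete; streams may now be used */ }
            Event::ConnectionLost { reason } => {
                // Surface the `ConnectionError` to the application
                let _ = reason;
            }
            Event::Stream(_) => { /* wake whatever is waiting on the affected stream */ }
            Event::DatagramReceived | Event::DatagramsUnblocked => { /* poll datagrams */ }
        }
    }
}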

fn instant_saturating_sub(x: Instant, y: Instant) -> Duration {
    if x > y { x - y } else { Duration::ZERO }
}
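
// Illustrative check (not part of the original source): the helper never underflows;
// whenever `x` is not strictly later than `y` it simply yields `Duration::ZERO`.
#[cfg(test)]
mod instant_saturating_sub_example {
    use super::*;

    #[test]
    fn saturates_to_zero() {
        let earlier = Instant::now();
        let later = earlier + Duration::from_millis(5);
        assert_eq!(instant_saturating_sub(later, earlier), Duration::from_millis(5));
        assert_eq!(instant_saturating_sub(earlier, later), Duration::ZERO);
    }
}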

fn get_max_ack_delay(params: &TransportParameters) -> Duration {
    Duration::from_micros(params.max_ack_delay.0 * 1000)
}
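
// Illustrative check (not part of the original source): `max_ack_delay` travels in
// milliseconds on the wire, so the helper above scales it through microseconds. This
// assumes `TransportParameters::default()` carries the RFC 9000 default of 25ms.
#[cfg(test)]
mod max_ack_delay_example {
    use super::*;

    #[test]
    fn default_max_ack_delay_is_25ms() {
        let params = TransportParameters::default();
        assert_eq!(get_max_ack_delay(&params), Duration::from_millis(25));
    }
}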

// Prevents overflow and improves behavior in extreme circumstances
const MAX_BACKOFF_EXPONENT: u32 = 16;

/// Minimal remaining size to allow packet coalescing, excluding cryptographic tag
///
/// This must be at least as large as the header for a well-formed empty packet to be coalesced,
/// plus some space for frames. We only care about handshake headers because short header packets
/// necessarily have smaller headers, and initial packets are only ever the first packet in a
/// datagram (because we coalesce in ascending packet space order and the only reason to split a
/// packet is when packet space changes).
const MIN_PACKET_SPACE: usize = MAX_HANDSHAKE_OR_0RTT_HEADER_SIZE + 32;

/// Largest amount of space that could be occupied by a Handshake or 0-RTT packet's header
///
/// Excludes packet-type-specific fields such as packet number or Initial token
// https://www.rfc-editor.org/rfc/rfc9000.html#name-0-rtt: flags + version + dcid len + dcid +
// scid len + scid + length + pn
const MAX_HANDSHAKE_OR_0RTT_HEADER_SIZE: usize =
    1 + 4 + 1 + MAX_CID_SIZE + 1 + MAX_CID_SIZE + VarInt::from_u32(u16::MAX as u32).size() + 4;
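
// Worked example (not part of the original source): with `MAX_CID_SIZE` at the QUIC
// maximum of 20 bytes and a 4-byte varint for the length field, the worst case adds up
// to 1 + 4 + 1 + 20 + 1 + 20 + 4 + 4 = 55 bytes, so coalescing another packet needs at
// least 55 + 32 = 87 bytes of remaining space (plus the cryptographic tag).
#[cfg(test)]
mod coalescing_threshold_example {
    use super::*;

    #[test]
    fn worst_case_long_header_adds_up() {
        assert_eq!(MAX_HANDSHAKE_OR_0RTT_HEADER_SIZE, 55);
        assert_eq!(MIN_PACKET_SPACE, 87);
    }
}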

/// Perform key updates this many packets before the AEAD confidentiality limit.
///
/// Chosen arbitrarily, intended to be large enough to prevent spurious connection loss.
const KEY_UPDATE_MARGIN: u64 = 10_000;

#[derive(Default)]
struct SentFrames {
    retransmits: ThinRetransmits,
    largest_acked: Option<u64>,
    stream_frames: StreamMetaVec,
    /// Whether the packet contains non-retransmittable frames (like datagrams)
    non_retransmits: bool,
    requires_padding: bool,
}

impl SentFrames {
    /// Returns whether the packet contains only ACKs
    fn is_ack_only(&self, streams: &StreamsState) -> bool {
        self.largest_acked.is_some()
            && !self.non_retransmits
            && self.stream_frames.is_empty()
            && self.retransmits.is_empty(streams)
    }
}

/// Compute the negotiated idle timeout based on the local and remote max_idle_timeout transport parameters.
///
/// By definition of max_idle_timeout, a value of `0` means the timeout is disabled; see <https://www.rfc-editor.org/rfc/rfc9000#section-18.2-4.4.1>.
///
/// Per the negotiation procedure, the effective value is the minimum of the two timeouts, or the only one supplied if the other endpoint disabled its timeout; see <https://www.rfc-editor.org/rfc/rfc9000#section-10.1-2>.
///
/// Returns the negotiated idle timeout as a `Duration`, or `None` when both endpoints have opted out of the idle timeout.
fn negotiate_max_idle_timeout(x: Option<VarInt>, y: Option<VarInt>) -> Option<Duration> {
    match (x, y) {
        (Some(VarInt(0)) | None, Some(VarInt(0)) | None) => None,
        (Some(VarInt(0)) | None, Some(y)) => Some(Duration::from_millis(y.0)),
        (Some(x), Some(VarInt(0)) | None) => Some(Duration::from_millis(x.0)),
        (Some(x), Some(y)) => Some(Duration::from_millis(cmp::min(x, y).0)),
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn negotiate_max_idle_timeout_commutative() {
        let test_params = [
            (None, None, None),
            (None, Some(VarInt(0)), None),
            (None, Some(VarInt(2)), Some(Duration::from_millis(2))),
            (Some(VarInt(0)), Some(VarInt(0)), None),
            (
                Some(VarInt(2)),
                Some(VarInt(0)),
                Some(Duration::from_millis(2)),
            ),
            (
                Some(VarInt(1)),
                Some(VarInt(4)),
                Some(Duration::from_millis(1)),
            ),
        ];

        for (left, right, result) in test_params {
            assert_eq!(negotiate_max_idle_timeout(left, right), result);
            assert_eq!(negotiate_max_idle_timeout(right, left), result);
        }
    }
}