Skip to main content

quinn_proto_jls/connection/
mod.rs

1use std::{
2    cmp,
3    collections::VecDeque,
4    convert::TryFrom,
5    fmt, io, mem,
6    net::{IpAddr, SocketAddr},
7    sync::Arc,
8};
9
10use bytes::{Bytes, BytesMut};
11use frame::StreamMetaVec;
12
13use rand::{Rng, SeedableRng, rngs::StdRng};
14use thiserror::Error;
15use tracing::{debug, error, trace, trace_span, warn};
16
17use crate::{
18    Dir, Duration, EndpointConfig, Frame, INITIAL_MTU, Instant, MAX_CID_SIZE, MAX_STREAM_COUNT,
19    MIN_INITIAL_SIZE, Side, StreamId, TIMER_GRANULARITY, TokenStore, Transmit, TransportError,
20    TransportErrorCode, VarInt,
21    cid_generator::ConnectionIdGenerator,
22    cid_queue::CidQueue,
23    coding::BufMutExt,
24    config::{ServerConfig, TransportConfig},
25    crypto::{self, KeyPair, Keys, PacketKey},
26    frame::{self, Close, Datagram, FrameStruct, NewConnectionId, NewToken},
27    packet::{
28        FixedLengthConnectionIdParser, Header, InitialHeader, InitialPacket, LongType, Packet,
29        PacketNumber, PartialDecode, SpaceId,
30    },
31    range_set::ArrayRangeSet,
32    shared::{
33        ConnectionEvent, ConnectionEventInner, ConnectionId, DatagramConnectionEvent, EcnCodepoint,
34        EndpointEvent, EndpointEventInner,
35    },
36    token::{ResetToken, Token, TokenPayload},
37    transport_parameters::TransportParameters,
38};
39
40mod ack_frequency;
41use ack_frequency::AckFrequencyState;
42
43mod assembler;
44pub use assembler::Chunk;
45
46mod cid_state;
47use cid_state::CidState;
48
49mod datagrams;
50use datagrams::DatagramState;
51pub use datagrams::{Datagrams, SendDatagramError};
52
53mod mtud;
54mod pacing;
55
56mod packet_builder;
57use packet_builder::PacketBuilder;
58
59mod packet_crypto;
60use packet_crypto::{PrevCrypto, ZeroRttCrypto};
61
62mod paths;
63pub use paths::RttEstimator;
64use paths::{PathData, PathResponses};
65
66pub(crate) mod qlog;
67
68mod send_buffer;
69
70mod spaces;
71#[cfg(fuzzing)]
72pub use spaces::Retransmits;
73#[cfg(not(fuzzing))]
74use spaces::Retransmits;
75use spaces::{PacketNumberFilter, PacketSpace, SendableFrames, SentPacket, ThinRetransmits};
76
77mod stats;
78pub use stats::{ConnectionStats, FrameStats, PathStats, UdpStats};
79
80mod streams;
81#[cfg(fuzzing)]
82pub use streams::StreamsState;
83#[cfg(not(fuzzing))]
84use streams::StreamsState;
85pub use streams::{
86    Chunks, ClosedStream, FinishError, ReadError, ReadableError, RecvStream, SendStream,
87    ShouldTransmit, StreamEvent, Streams, WriteError, Written,
88};
89
90mod timer;
91use crate::congestion::Controller;
92use timer::{Timer, TimerTable};
93
/// Protocol state and logic for a single QUIC connection
///
/// Objects of this type receive [`ConnectionEvent`]s and emit [`EndpointEvent`]s and application
/// [`Event`]s to make progress. To handle timeouts, a `Connection` returns timer updates and
/// expects timeouts through various methods. A number of simple getter methods are exposed
/// to allow callers to inspect some of the connection state.
///
/// `Connection` has roughly 4 types of methods:
///
/// - A. Simple getters, taking `&self`
/// - B. Handlers for incoming events from the network or system, named `handle_*`.
/// - C. State machine mutators, for incoming commands from the application. For convenience we
///   refer to this as "performing I/O" below, however as per the design of this library none of the
///   functions actually perform system-level I/O. For example, [`read`](RecvStream::read) and
///   [`write`](SendStream::write), but also things like [`reset`](SendStream::reset).
/// - D. Polling functions for outgoing events or actions for the caller to
///   take, named `poll_*`.
///
/// The simplest way to use this API correctly is to call (B) and (C) whenever
/// appropriate, then after each of those calls, as soon as feasible call all
/// polling methods (D) and deal with their outputs appropriately, e.g. by
/// passing it to the application or by making a system-level I/O call. You
/// should call the polling functions in this order:
///
/// 1. [`poll_transmit`](Self::poll_transmit)
/// 2. [`poll_timeout`](Self::poll_timeout)
/// 3. [`poll_endpoint_events`](Self::poll_endpoint_events)
/// 4. [`poll`](Self::poll)
///
/// Currently the only actual dependency is from (2) to (1), however additional
/// dependencies may be added in future, so the above order is recommended.
///
/// (A) may be called whenever desired.
///
/// Care should be taken to ensure that the input events represent monotonically
/// increasing time. Specifically, calling [`handle_timeout`](Self::handle_timeout)
/// with events of the same [`Instant`] may be interleaved in any order with a
/// call to [`handle_event`](Self::handle_event) at that same instant; however
/// events or timeouts with different instants must not be interleaved.
pub struct Connection {
    /// Endpoint-level configuration shared by all connections on the endpoint
    endpoint_config: Arc<EndpointConfig>,
    /// Transport configuration for this connection
    config: Arc<TransportConfig>,
    /// Deterministic RNG seeded by the endpoint; drawn from during construction for the key
    /// phase size, spin bit enablement, and packet number filter
    rng: StdRng,
    /// Cryptographic session, the source of handshake data and packet protection keys
    crypto: Box<dyn crypto::Session>,
    /// The CID we initially chose, for use during the handshake
    handshake_cid: ConnectionId,
    /// The CID the peer initially chose, for use during the handshake
    rem_handshake_cid: ConnectionId,
    /// The "real" local IP address which was used to receive the initial packet.
    /// This is only populated for the server case, and if known
    local_ip: Option<IpAddr>,
    /// State of the network path currently in use
    path: PathData,
    /// Incremented every time we see a new path
    ///
    /// Stored separately from `path.generation` to account for aborted migrations
    path_counter: u64,
    /// Whether MTU detection is supported in this environment
    allow_mtud: bool,
    /// The path (and the CID used on it) in use before the most recent migration, if any
    prev_path: Option<(ConnectionId, PathData)>,
    /// High-level connection state (handshake, established, draining, closed, drained)
    state: State,
    /// Whether we are the client or server, plus side-specific state
    side: ConnectionSide,
    /// Whether or not 0-RTT was enabled during the handshake. Does not imply acceptance.
    zero_rtt_enabled: bool,
    /// Set if 0-RTT is supported, then cleared when no longer needed.
    zero_rtt_crypto: Option<ZeroRttCrypto>,
    /// Current key phase bit for 1-RTT packet protection
    key_phase: bool,
    /// How many packets are in the current key phase. Used only for `Data` space.
    key_phase_size: u64,
    /// Transport parameters set by the peer
    peer_params: TransportParameters,
    /// Source ConnectionId of the first packet received from the peer
    orig_rem_cid: ConnectionId,
    /// Destination ConnectionId sent by the client on the first Initial
    initial_dst_cid: ConnectionId,
    /// The value that the server included in the Source Connection ID field of a Retry packet, if
    /// one was received
    retry_src_cid: Option<ConnectionId>,
    /// Application-facing events queued for return from `poll`
    events: VecDeque<Event>,
    /// Endpoint-facing events queued for return from `poll_endpoint_events`
    endpoint_events: VecDeque<EndpointEventInner>,
    /// Whether the spin bit is in use for this connection
    spin_enabled: bool,
    /// Outgoing spin bit state
    spin: bool,
    /// Packet number spaces: initial, handshake, 1-RTT
    spaces: [PacketSpace; 3],
    /// Highest usable packet number space
    highest_space: SpaceId,
    /// 1-RTT keys used prior to a key update
    prev_crypto: Option<PrevCrypto>,
    /// 1-RTT keys to be used for the next key update
    ///
    /// These are generated in advance to prevent timing attacks and/or DoS by third-party attackers
    /// spoofing key updates.
    next_crypto: Option<KeyPair<Box<dyn PacketKey>>>,
    // Whether our 0-RTT data was accepted; initialized false, updated during handshake processing
    accepted_0rtt: bool,
    /// Whether the idle timer should be reset the next time an ack-eliciting packet is transmitted.
    permit_idle_reset: bool,
    /// Negotiated idle timeout
    idle_timeout: Option<Duration>,
    /// Pending timers, consulted by `poll_timeout`
    timers: TimerTable,
    /// Number of packets received which could not be authenticated
    authentication_failures: u64,
    /// Why the connection was lost, if it has been
    error: Option<ConnectionError>,
    /// Identifies Data-space packet numbers to skip. Not used in earlier spaces.
    packet_number_filter: PacketNumberFilter,

    //
    // Queued non-retransmittable 1-RTT data
    //
    /// Responses to PATH_CHALLENGE frames
    path_responses: PathResponses,
    // Whether a close message still needs to be sent; only cleared once the associated packet
    // has been encoded successfully (see `poll_transmit`)
    close: bool,

    //
    // ACK frequency
    //
    ack_frequency: AckFrequencyState,

    //
    // Loss Detection
    //
    /// The number of times a PTO has been sent without receiving an ack.
    pto_count: u32,

    //
    // Congestion Control
    //
    /// Whether the most recently received packet had an ECN codepoint set
    receiving_ecn: bool,
    /// Number of packets authenticated
    total_authed_packets: u64,
    /// Whether the last `poll_transmit` call yielded no data because there was
    /// no outgoing application data.
    app_limited: bool,

    /// State for all send/receive streams on this connection
    streams: StreamsState,
    /// Surplus remote CIDs for future use on new paths
    rem_cids: CidQueue,
    // Attributes of CIDs generated by local peer
    local_cid_state: CidState,
    /// State of the unreliable datagram extension
    datagrams: DatagramState,
    /// Connection level statistics
    stats: ConnectionStats,
    /// QUIC version used for the connection.
    version: u32,
}
242
243impl Connection {
    /// Construct protocol state for a fresh connection
    ///
    /// `init_cid` is the destination CID carried by the client's first Initial packet and is
    /// used to derive the Initial packet protection keys. `loc_cid`/`rem_cid` are the CIDs
    /// used during the handshake. For a client, this also writes the first CRYPTO flight and
    /// initializes 0-RTT state so the connection starts making progress immediately.
    ///
    /// NOTE: the order of `rng` draws below (key phase size, spin bit, packet number filter)
    /// is part of the deterministic behavior for a given `rng_seed`; do not reorder.
    pub(crate) fn new(
        endpoint_config: Arc<EndpointConfig>,
        config: Arc<TransportConfig>,
        init_cid: ConnectionId,
        loc_cid: ConnectionId,
        rem_cid: ConnectionId,
        remote: SocketAddr,
        local_ip: Option<IpAddr>,
        crypto: Box<dyn crypto::Session>,
        cid_gen: &dyn ConnectionIdGenerator,
        now: Instant,
        version: u32,
        allow_mtud: bool,
        rng_seed: [u8; 32],
        side_args: SideArgs,
    ) -> Self {
        let pref_addr_cid = side_args.pref_addr_cid();
        let path_validated = side_args.path_validated();
        let connection_side = ConnectionSide::from(side_args);
        let side = connection_side.side();
        // Initial keys are derived from the first destination CID chosen by the client
        let initial_space = PacketSpace {
            crypto: Some(crypto.initial_keys(&init_cid, side)),
            ..PacketSpace::new(now)
        };
        let state = State::Handshake(state::Handshake {
            // The server learns the peer's CID from the incoming Initial; the client's copy is
            // updated once the server's first response arrives
            rem_cid_set: side.is_server(),
            expected_token: Bytes::new(),
            client_hello: None,
        });
        let mut rng = StdRng::from_seed(rng_seed);
        let mut this = Self {
            endpoint_config,
            crypto,
            handshake_cid: loc_cid,
            rem_handshake_cid: rem_cid,
            local_cid_state: CidState::new(
                cid_gen.cid_len(),
                cid_gen.cid_lifetime(),
                now,
                // Reserve an extra CID slot when a preferred-address CID was issued
                if pref_addr_cid.is_some() { 2 } else { 1 },
            ),
            path: PathData::new(remote, allow_mtud, None, 0, now, &config),
            path_counter: 0,
            allow_mtud,
            local_ip,
            prev_path: None,
            state,
            side: connection_side,
            zero_rtt_enabled: false,
            zero_rtt_crypto: None,
            key_phase: false,
            // A small initial key phase size ensures peers that don't handle key updates correctly
            // fail sooner rather than later. It's okay for both peers to do this, as the first one
            // to perform an update will reset the other's key phase size in `update_keys`, and a
            // simultaneous key update by both is just like a regular key update with a really fast
            // response. Inspired by quic-go's similar behavior of performing the first key update
            // at the 100th short-header packet.
            key_phase_size: rng.random_range(10..1000),
            peer_params: TransportParameters::default(),
            orig_rem_cid: rem_cid,
            initial_dst_cid: init_cid,
            retry_src_cid: None,
            events: VecDeque::new(),
            endpoint_events: VecDeque::new(),
            // Disable the spin bit for a random 1-in-8 sample of connections, per RFC 9000's
            // recommendation to resist spin-bit-based observers -- TODO confirm intent
            spin_enabled: config.allow_spin && rng.random_ratio(7, 8),
            spin: false,
            spaces: [initial_space, PacketSpace::new(now), PacketSpace::new(now)],
            highest_space: SpaceId::Initial,
            prev_crypto: None,
            next_crypto: None,
            accepted_0rtt: false,
            permit_idle_reset: true,
            // A configured timeout of 0 means the idle timeout is disabled
            idle_timeout: match config.max_idle_timeout {
                None | Some(VarInt(0)) => None,
                Some(dur) => Some(Duration::from_millis(dur.0)),
            },
            timers: TimerTable::default(),
            authentication_failures: 0,
            error: None,
            // Tests may opt out of packet number skipping for deterministic traces
            #[cfg(test)]
            packet_number_filter: match config.deterministic_packet_numbers {
                false => PacketNumberFilter::new(&mut rng),
                true => PacketNumberFilter::disabled(),
            },
            #[cfg(not(test))]
            packet_number_filter: PacketNumberFilter::new(&mut rng),

            path_responses: PathResponses::default(),
            close: false,

            // Starts from the default max ACK delay; updated once the peer's actual transport
            // parameters are known
            ack_frequency: AckFrequencyState::new(get_max_ack_delay(
                &TransportParameters::default(),
            )),

            pto_count: 0,

            app_limited: false,
            receiving_ecn: false,
            total_authed_packets: 0,

            streams: StreamsState::new(
                side,
                config.max_concurrent_uni_streams,
                config.max_concurrent_bidi_streams,
                config.send_window,
                config.receive_window,
                config.stream_receive_window,
            ),
            datagrams: DatagramState::default(),
            config,
            rem_cids: CidQueue::new(rem_cid),
            rng,
            stats: ConnectionStats::default(),
            version,
        };
        if path_validated {
            this.on_path_validated();
        }
        if side.is_client() {
            // Kick off the connection
            this.write_crypto();
            this.init_0rtt();
        }
        this
    }
369
    /// Returns the next time at which `handle_timeout` should be called
    ///
    /// The value returned may change after:
    /// - the application performed some I/O on the connection
    /// - a call was made to `handle_event`
    /// - a call to `poll_transmit` returned `Some`
    /// - a call was made to `handle_timeout`
    #[must_use]
    pub fn poll_timeout(&mut self) -> Option<Instant> {
        // Delegates to the timer table, which holds all currently armed timers
        self.timers.next_timeout()
    }
381
382    /// Returns application-facing events
383    ///
384    /// Connections should be polled for events after:
385    /// - a call was made to `handle_event`
386    /// - a call was made to `handle_timeout`
387    #[must_use]
388    pub fn poll(&mut self) -> Option<Event> {
389        if let Some(x) = self.events.pop_front() {
390            return Some(x);
391        }
392
393        if let Some(event) = self.streams.poll() {
394            return Some(Event::Stream(event));
395        }
396
397        if let Some(err) = self.error.take() {
398            return Some(Event::ConnectionLost { reason: err });
399        }
400
401        None
402    }
403
404    /// Return endpoint-facing events
405    #[must_use]
406    pub fn poll_endpoint_events(&mut self) -> Option<EndpointEvent> {
407        self.endpoint_events.pop_front().map(EndpointEvent)
408    }
409
410    /// Provide control over streams
411    #[must_use]
412    pub fn streams(&mut self) -> Streams<'_> {
413        Streams {
414            state: &mut self.streams,
415            conn_state: &self.state,
416        }
417    }
418
419    /// Provide control over streams
420    #[must_use]
421    pub fn recv_stream(&mut self, id: StreamId) -> RecvStream<'_> {
422        assert!(id.dir() == Dir::Bi || id.initiator() != self.side.side());
423        RecvStream {
424            id,
425            state: &mut self.streams,
426            pending: &mut self.spaces[SpaceId::Data].pending,
427        }
428    }
429
430    /// Provide control over streams
431    #[must_use]
432    pub fn send_stream(&mut self, id: StreamId) -> SendStream<'_> {
433        assert!(id.dir() == Dir::Bi || id.initiator() == self.side.side());
434        SendStream {
435            id,
436            state: &mut self.streams,
437            pending: &mut self.spaces[SpaceId::Data].pending,
438            conn_state: &self.state,
439        }
440    }
441
442    /// Returns packets to transmit
443    ///
444    /// Connections should be polled for transmit after:
445    /// - the application performed some I/O on the connection
446    /// - a call was made to `handle_event`
447    /// - a call was made to `handle_timeout`
448    ///
449    /// `max_datagrams` specifies how many datagrams can be returned inside a
450    /// single Transmit using GSO. This must be at least 1.
451    #[must_use]
452    pub fn poll_transmit(
453        &mut self,
454        now: Instant,
455        max_datagrams: usize,
456        buf: &mut Vec<u8>,
457    ) -> Option<Transmit> {
458        assert!(max_datagrams != 0);
459        let max_datagrams = match self.config.enable_segmentation_offload {
460            false => 1,
461            true => max_datagrams,
462        };
463
464        let mut num_datagrams = 0;
465        // Position in `buf` of the first byte of the current UDP datagram. When coalescing QUIC
466        // packets, this can be earlier than the start of the current QUIC packet.
467        let mut datagram_start = 0;
468        let mut segment_size = usize::from(self.path.current_mtu());
469
470        if let Some(challenge) = self.send_path_challenge(now, buf) {
471            return Some(challenge);
472        }
473
474        // If we need to send a probe, make sure we have something to send.
475        for space in SpaceId::iter() {
476            let request_immediate_ack =
477                space == SpaceId::Data && self.peer_supports_ack_frequency();
478            self.spaces[space].maybe_queue_probe(request_immediate_ack, &self.streams);
479        }
480
481        // Check whether we need to send a close message
482        let close = match self.state {
483            State::Drained => {
484                self.app_limited = true;
485                return None;
486            }
487            State::Draining | State::Closed(_) => {
488                // self.close is only reset once the associated packet had been
489                // encoded successfully
490                if !self.close {
491                    self.app_limited = true;
492                    return None;
493                }
494                true
495            }
496            _ => false,
497        };
498
499        // Check whether we need to send an ACK_FREQUENCY frame
500        if let Some(config) = &self.config.ack_frequency_config {
501            self.spaces[SpaceId::Data].pending.ack_frequency = self
502                .ack_frequency
503                .should_send_ack_frequency(self.path.rtt.get(), config, &self.peer_params)
504                && self.highest_space == SpaceId::Data
505                && self.peer_supports_ack_frequency();
506        }
507
508        // Reserving capacity can provide more capacity than we asked for. However, we are not
509        // allowed to write more than `segment_size`. Therefore the maximum capacity is tracked
510        // separately.
511        let mut buf_capacity = 0;
512
513        let mut coalesce = true;
514        let mut builder_storage: Option<PacketBuilder> = None;
515        let mut sent_frames = None;
516        let mut pad_datagram = false;
517        let mut pad_datagram_to_mtu = false;
518        let mut congestion_blocked = false;
519
520        // Iterate over all spaces and find data to send
521        let mut space_idx = 0;
522        let spaces = [SpaceId::Initial, SpaceId::Handshake, SpaceId::Data];
523        // This loop will potentially spend multiple iterations in the same `SpaceId`,
524        // so we cannot trivially rewrite it to take advantage of `SpaceId::iter()`.
525        while space_idx < spaces.len() {
526            let space_id = spaces[space_idx];
527            // Number of bytes available for frames if this is a 1-RTT packet. We're guaranteed to
528            // be able to send an individual frame at least this large in the next 1-RTT
529            // packet. This could be generalized to support every space, but it's only needed to
530            // handle large fixed-size frames, which only exist in 1-RTT (application datagrams). We
531            // don't account for coalesced packets potentially occupying space because frames can
532            // always spill into the next datagram.
533            let pn = self.packet_number_filter.peek(&self.spaces[SpaceId::Data]);
534            let frame_space_1rtt =
535                segment_size.saturating_sub(self.predict_1rtt_overhead(Some(pn)));
536
537            // Is there data or a close message to send in this space?
538            let can_send = self.space_can_send(space_id, frame_space_1rtt);
539            if can_send.is_empty() && (!close || self.spaces[space_id].crypto.is_none()) {
540                space_idx += 1;
541                continue;
542            }
543
544            let mut ack_eliciting = !self.spaces[space_id].pending.is_empty(&self.streams)
545                || self.spaces[space_id].ping_pending
546                || self.spaces[space_id].immediate_ack_pending;
547            if space_id == SpaceId::Data {
548                ack_eliciting |= self.can_send_1rtt(frame_space_1rtt);
549            }
550
551            pad_datagram_to_mtu |= space_id == SpaceId::Data && self.config.pad_to_mtu;
552
553            // Can we append more data into the current buffer?
554            // It is not safe to assume that `buf.len()` is the end of the data,
555            // since the last packet might not have been finished.
556            let buf_end = if let Some(builder) = &builder_storage {
557                buf.len().max(builder.min_size) + builder.tag_len
558            } else {
559                buf.len()
560            };
561
562            let tag_len = if let Some(ref crypto) = self.spaces[space_id].crypto {
563                crypto.packet.local.tag_len()
564            } else if space_id == SpaceId::Data {
565                self.zero_rtt_crypto.as_ref().expect(
566                    "sending packets in the application data space requires known 0-RTT or 1-RTT keys",
567                ).packet.tag_len()
568            } else {
569                unreachable!("tried to send {:?} packet without keys", space_id)
570            };
571            if !coalesce || buf_capacity - buf_end < MIN_PACKET_SPACE + tag_len {
572                // We need to send 1 more datagram and extend the buffer for that.
573
574                // Is 1 more datagram allowed?
575                if num_datagrams >= max_datagrams {
576                    // No more datagrams allowed
577                    break;
578                }
579
580                // Anti-amplification is only based on `total_sent`, which gets
581                // updated at the end of this method. Therefore we pass the amount
582                // of bytes for datagrams that are already created, as well as 1 byte
583                // for starting another datagram. If there is any anti-amplification
584                // budget left, we always allow a full MTU to be sent
585                // (see https://github.com/quinn-rs/quinn/issues/1082)
586                if self
587                    .path
588                    .anti_amplification_blocked(segment_size as u64 * (num_datagrams as u64) + 1)
589                {
590                    trace!("blocked by anti-amplification");
591                    break;
592                }
593
594                // Congestion control and pacing checks
595                // Tail loss probes must not be blocked by congestion, or a deadlock could arise
596                if ack_eliciting && self.spaces[space_id].loss_probes == 0 {
597                    // Assume the current packet will get padded to fill the segment
598                    let untracked_bytes = if let Some(builder) = &builder_storage {
599                        buf_capacity - builder.partial_encode.start
600                    } else {
601                        0
602                    } as u64;
603                    debug_assert!(untracked_bytes <= segment_size as u64);
604
605                    let bytes_to_send = segment_size as u64 + untracked_bytes;
606                    if self.path.in_flight.bytes + bytes_to_send >= self.path.congestion.window() {
607                        space_idx += 1;
608                        congestion_blocked = true;
609                        // We continue instead of breaking here in order to avoid
610                        // blocking loss probes queued for higher spaces.
611                        trace!("blocked by congestion control");
612                        continue;
613                    }
614
615                    // Check whether the next datagram is blocked by pacing
616                    let smoothed_rtt = self.path.rtt.get();
617                    if let Some(delay) = self.path.pacing.delay(
618                        smoothed_rtt,
619                        bytes_to_send,
620                        self.path.current_mtu(),
621                        self.path.congestion.window(),
622                        now,
623                    ) {
624                        self.timers.set(Timer::Pacing, delay);
625                        congestion_blocked = true;
626                        // Loss probes should be subject to pacing, even though
627                        // they are not congestion controlled.
628                        trace!("blocked by pacing");
629                        break;
630                    }
631                }
632
633                // Finish current packet
634                if let Some(mut builder) = builder_storage.take() {
635                    if pad_datagram {
636                        builder.pad_to(MIN_INITIAL_SIZE);
637                    }
638
639                    if num_datagrams > 1 || pad_datagram_to_mtu {
640                        // If too many padding bytes would be required to continue the GSO batch
641                        // after this packet, end the GSO batch here. Ensures that fixed-size frames
642                        // with heterogeneous sizes (e.g. application datagrams) won't inadvertently
643                        // waste large amounts of bandwidth. The exact threshold is a bit arbitrary
644                        // and might benefit from further tuning, though there's no universally
645                        // optimal value.
646                        //
647                        // Additionally, if this datagram is a loss probe and `segment_size` is
648                        // larger than `INITIAL_MTU`, then padding it to `segment_size` to continue
649                        // the GSO batch would risk failure to recover from a reduction in path
650                        // MTU. Loss probes are the only packets for which we might grow
651                        // `buf_capacity` by less than `segment_size`.
652                        const MAX_PADDING: usize = 16;
653                        let packet_len_unpadded = cmp::max(builder.min_size, buf.len())
654                            - datagram_start
655                            + builder.tag_len;
656                        if (packet_len_unpadded + MAX_PADDING < segment_size
657                            && !pad_datagram_to_mtu)
658                            || datagram_start + segment_size > buf_capacity
659                        {
660                            trace!(
661                                "GSO truncated by demand for {} padding bytes or loss probe",
662                                segment_size - packet_len_unpadded
663                            );
664                            builder_storage = Some(builder);
665                            break;
666                        }
667
668                        // Pad the current datagram to GSO segment size so it can be included in the
669                        // GSO batch.
670                        builder.pad_to(segment_size as u16);
671                    }
672
673                    builder.finish_and_track(now, self, sent_frames.take(), buf);
674
675                    if num_datagrams == 1 {
676                        // Set the segment size for this GSO batch to the size of the first UDP
677                        // datagram in the batch. Larger data that cannot be fragmented
678                        // (e.g. application datagrams) will be included in a future batch. When
679                        // sending large enough volumes of data for GSO to be useful, we expect
680                        // packet sizes to usually be consistent, e.g. populated by max-size STREAM
681                        // frames or uniformly sized datagrams.
682                        segment_size = buf.len();
683                        // Clip the unused capacity out of the buffer so future packets don't
684                        // overrun
685                        buf_capacity = buf.len();
686
687                        // Check whether the data we planned to send will fit in the reduced segment
688                        // size. If not, bail out and leave it for the next GSO batch so we don't
689                        // end up trying to send an empty packet. We can't easily compute the right
690                        // segment size before the original call to `space_can_send`, because at
691                        // that time we haven't determined whether we're going to coalesce with the
692                        // first datagram or potentially pad it to `MIN_INITIAL_SIZE`.
693                        if space_id == SpaceId::Data {
694                            let frame_space_1rtt =
695                                segment_size.saturating_sub(self.predict_1rtt_overhead(Some(pn)));
696                            if self.space_can_send(space_id, frame_space_1rtt).is_empty() {
697                                break;
698                            }
699                        }
700                    }
701                }
702
703                // Allocate space for another datagram
704                let next_datagram_size_limit = match self.spaces[space_id].loss_probes {
705                    0 => segment_size,
706                    _ => {
707                        self.spaces[space_id].loss_probes -= 1;
708                        // Clamp the datagram to at most the minimum MTU to ensure that loss probes
709                        // can get through and enable recovery even if the path MTU has shrank
710                        // unexpectedly.
711                        std::cmp::min(segment_size, usize::from(INITIAL_MTU))
712                    }
713                };
714                buf_capacity += next_datagram_size_limit;
715                if buf.capacity() < buf_capacity {
716                    // We reserve the maximum space for sending `max_datagrams` upfront
717                    // to avoid any reallocations if more datagrams have to be appended later on.
718                    // Benchmarks have shown shown a 5-10% throughput improvement
719                    // compared to continuously resizing the datagram buffer.
720                    // While this will lead to over-allocation for small transmits
721                    // (e.g. purely containing ACKs), modern memory allocators
722                    // (e.g. mimalloc and jemalloc) will pool certain allocation sizes
723                    // and therefore this is still rather efficient.
724                    buf.reserve(max_datagrams * segment_size);
725                }
726                num_datagrams += 1;
727                coalesce = true;
728                pad_datagram = false;
729                datagram_start = buf.len();
730
731                debug_assert_eq!(
732                    datagram_start % segment_size,
733                    0,
734                    "datagrams in a GSO batch must be aligned to the segment size"
735                );
736            } else {
737                // We can append/coalesce the next packet into the current
738                // datagram.
739                // Finish current packet without adding extra padding
740                if let Some(builder) = builder_storage.take() {
741                    builder.finish_and_track(now, self, sent_frames.take(), buf);
742                }
743            }
744
745            debug_assert!(buf_capacity - buf.len() >= MIN_PACKET_SPACE);
746
747            //
748            // From here on, we've determined that a packet will definitely be sent.
749            //
750
751            if self.spaces[SpaceId::Initial].crypto.is_some()
752                && space_id == SpaceId::Handshake
753                && self.side.is_client()
754            {
755                // A client stops both sending and processing Initial packets when it
756                // sends its first Handshake packet.
757                self.discard_space(now, SpaceId::Initial);
758            }
759            if let Some(ref mut prev) = self.prev_crypto {
760                prev.update_unacked = false;
761            }
762
763            debug_assert!(
764                builder_storage.is_none() && sent_frames.is_none(),
765                "Previous packet must have been finished"
766            );
767
768            let builder = builder_storage.insert(PacketBuilder::new(
769                now,
770                space_id,
771                self.rem_cids.active(),
772                buf,
773                buf_capacity,
774                datagram_start,
775                ack_eliciting,
776                self,
777            )?);
778            coalesce = coalesce && !builder.short_header;
779
780            // https://tools.ietf.org/html/draft-ietf-quic-transport-34#section-14.1
781            pad_datagram |=
782                space_id == SpaceId::Initial && (self.side.is_client() || ack_eliciting);
783
784            if close {
785                trace!("sending CONNECTION_CLOSE");
786                // Encode ACKs before the ConnectionClose message, to give the receiver
787                // a better approximation of what data has been processed. This is
788                // especially important with ack delay, since the peer might not
789                // have gotten any other ACK for the data earlier on.
790                if !self.spaces[space_id].pending_acks.ranges().is_empty() {
791                    Self::populate_acks(
792                        now,
793                        self.receiving_ecn,
794                        &mut SentFrames::default(),
795                        &mut self.spaces[space_id],
796                        buf,
797                        &mut self.stats,
798                    );
799                }
800
801                // Since there are only 64 ACK frames, there will always be enough space
802                // to encode the ConnectionClose frame too. However we still have the
803                // check here to prevent crashes if something changes.
804                debug_assert!(
805                    buf.len() + frame::ConnectionClose::SIZE_BOUND < builder.max_size,
806                    "ACKs should leave space for ConnectionClose"
807                );
808                if buf.len() + frame::ConnectionClose::SIZE_BOUND < builder.max_size {
809                    let max_frame_size = builder.max_size - buf.len();
810                    match self.state {
811                        State::Closed(state::Closed { ref reason }) => {
812                            if space_id == SpaceId::Data || reason.is_transport_layer() {
813                                reason.encode(buf, max_frame_size)
814                            } else {
815                                frame::ConnectionClose {
816                                    error_code: TransportErrorCode::APPLICATION_ERROR,
817                                    frame_type: None,
818                                    reason: Bytes::new(),
819                                }
820                                .encode(buf, max_frame_size)
821                            }
822                        }
823                        State::Draining => frame::ConnectionClose {
824                            error_code: TransportErrorCode::NO_ERROR,
825                            frame_type: None,
826                            reason: Bytes::new(),
827                        }
828                        .encode(buf, max_frame_size),
829                        _ => unreachable!(
830                            "tried to make a close packet when the connection wasn't closed"
831                        ),
832                    }
833                }
834                if space_id == self.highest_space {
835                    // Don't send another close packet
836                    self.close = false;
837                    // `CONNECTION_CLOSE` is the final packet
838                    break;
839                } else {
840                    // Send a close frame in every possible space for robustness, per RFC9000
841                    // "Immediate Close during the Handshake". Don't bother trying to send anything
842                    // else.
843                    space_idx += 1;
844                    continue;
845                }
846            }
847
848            // Send an off-path PATH_RESPONSE. Prioritized over on-path data to ensure that path
849            // validation can occur while the link is saturated.
850            if space_id == SpaceId::Data && num_datagrams == 1 {
851                if let Some((token, remote)) = self.path_responses.pop_off_path(self.path.remote) {
852                    // `unwrap` guaranteed to succeed because `builder_storage` was populated just
853                    // above.
854                    let mut builder = builder_storage.take().unwrap();
855                    trace!("PATH_RESPONSE {:08x} (off-path)", token);
856                    buf.write(frame::FrameType::PATH_RESPONSE);
857                    buf.write(token);
858                    self.stats.frame_tx.path_response += 1;
859                    builder.pad_to(MIN_INITIAL_SIZE);
860                    builder.finish_and_track(
861                        now,
862                        self,
863                        Some(SentFrames {
864                            non_retransmits: true,
865                            ..SentFrames::default()
866                        }),
867                        buf,
868                    );
869                    self.stats.udp_tx.on_sent(1, buf.len());
870                    return Some(Transmit {
871                        destination: remote,
872                        size: buf.len(),
873                        ecn: None,
874                        segment_size: None,
875                        src_ip: self.local_ip,
876                    });
877                }
878            }
879
880            let sent =
881                self.populate_packet(now, space_id, buf, builder.max_size, builder.exact_number);
882
883            // ACK-only packets should only be sent when explicitly allowed. If we write them due to
884            // any other reason, there is a bug which leads to one component announcing write
885            // readiness while not writing any data. This degrades performance. The condition is
886            // only checked if the full MTU is available and when potentially large fixed-size
887            // frames aren't queued, so that lack of space in the datagram isn't the reason for just
888            // writing ACKs.
889            debug_assert!(
890                !(sent.is_ack_only(&self.streams)
891                    && !can_send.acks
892                    && can_send.other
893                    && (buf_capacity - builder.datagram_start) == self.path.current_mtu() as usize
894                    && self.datagrams.outgoing.is_empty()),
895                "SendableFrames was {can_send:?}, but only ACKs have been written"
896            );
897            pad_datagram |= sent.requires_padding;
898
899            if sent.largest_acked.is_some() {
900                self.spaces[space_id].pending_acks.acks_sent();
901                self.timers.stop(Timer::MaxAckDelay);
902            }
903
904            // Keep information about the packet around until it gets finalized
905            sent_frames = Some(sent);
906
907            // Don't increment space_idx.
908            // We stay in the current space and check if there is more data to send.
909        }
910
911        // Finish the last packet
912        if let Some(mut builder) = builder_storage {
913            if pad_datagram {
914                builder.pad_to(MIN_INITIAL_SIZE);
915            }
916
917            // If this datagram is a loss probe and `segment_size` is larger than `INITIAL_MTU`,
918            // then padding it to `segment_size` would risk failure to recover from a reduction in
919            // path MTU.
920            // Loss probes are the only packets for which we might grow `buf_capacity`
921            // by less than `segment_size`.
922            if pad_datagram_to_mtu && buf_capacity >= datagram_start + segment_size {
923                builder.pad_to(segment_size as u16);
924            }
925
926            let last_packet_number = builder.exact_number;
927            builder.finish_and_track(now, self, sent_frames, buf);
928            self.path
929                .congestion
930                .on_sent(now, buf.len() as u64, last_packet_number);
931
932            self.config.qlog_sink.emit_recovery_metrics(
933                self.pto_count,
934                &mut self.path,
935                now,
936                self.orig_rem_cid,
937            );
938        }
939
940        self.app_limited = buf.is_empty() && !congestion_blocked;
941
942        // Send MTU probe if necessary
943        if buf.is_empty() && self.state.is_established() {
944            let space_id = SpaceId::Data;
945            let probe_size = self
946                .path
947                .mtud
948                .poll_transmit(now, self.packet_number_filter.peek(&self.spaces[space_id]))?;
949
950            let buf_capacity = probe_size as usize;
951            buf.reserve(buf_capacity);
952
953            let mut builder = PacketBuilder::new(
954                now,
955                space_id,
956                self.rem_cids.active(),
957                buf,
958                buf_capacity,
959                0,
960                true,
961                self,
962            )?;
963
964            // We implement MTU probes as ping packets padded up to the probe size
965            buf.write(frame::FrameType::PING);
966            self.stats.frame_tx.ping += 1;
967
968            // If supported by the peer, we want no delays to the probe's ACK
969            if self.peer_supports_ack_frequency() {
970                buf.write(frame::FrameType::IMMEDIATE_ACK);
971                self.stats.frame_tx.immediate_ack += 1;
972            }
973
974            builder.pad_to(probe_size);
975            let sent_frames = SentFrames {
976                non_retransmits: true,
977                ..Default::default()
978            };
979            builder.finish_and_track(now, self, Some(sent_frames), buf);
980
981            self.stats.path.sent_plpmtud_probes += 1;
982            num_datagrams = 1;
983
984            trace!(?probe_size, "writing MTUD probe");
985        }
986
987        if buf.is_empty() {
988            return None;
989        }
990
991        trace!("sending {} bytes in {} datagrams", buf.len(), num_datagrams);
992        self.path.total_sent = self.path.total_sent.saturating_add(buf.len() as u64);
993
994        self.stats.udp_tx.on_sent(num_datagrams as u64, buf.len());
995
996        Some(Transmit {
997            destination: self.path.remote,
998            size: buf.len(),
999            ecn: if self.path.sending_ecn {
1000                Some(EcnCodepoint::Ect0)
1001            } else {
1002                None
1003            },
1004            segment_size: match num_datagrams {
1005                1 => None,
1006                _ => Some(segment_size),
1007            },
1008            src_ip: self.local_ip,
1009        })
1010    }
1011
1012    /// Send PATH_CHALLENGE for a previous path if necessary
1013    fn send_path_challenge(&mut self, now: Instant, buf: &mut Vec<u8>) -> Option<Transmit> {
1014        let (prev_cid, prev_path) = self.prev_path.as_mut()?;
1015        if !prev_path.challenge_pending {
1016            return None;
1017        }
1018        prev_path.challenge_pending = false;
1019        let token = prev_path
1020            .challenge
1021            .expect("previous path challenge pending without token");
1022        let destination = prev_path.remote;
1023        debug_assert_eq!(
1024            self.highest_space,
1025            SpaceId::Data,
1026            "PATH_CHALLENGE queued without 1-RTT keys"
1027        );
1028        buf.reserve(MIN_INITIAL_SIZE as usize);
1029
1030        let buf_capacity = buf.capacity();
1031
1032        // Use the previous CID to avoid linking the new path with the previous path. We
1033        // don't bother accounting for possible retirement of that prev_cid because this is
1034        // sent once, immediately after migration, when the CID is known to be valid. Even
1035        // if a post-migration packet caused the CID to be retired, it's fair to pretend
1036        // this is sent first.
1037        let mut builder = PacketBuilder::new(
1038            now,
1039            SpaceId::Data,
1040            *prev_cid,
1041            buf,
1042            buf_capacity,
1043            0,
1044            false,
1045            self,
1046        )?;
1047        trace!("validating previous path with PATH_CHALLENGE {:08x}", token);
1048        buf.write(frame::FrameType::PATH_CHALLENGE);
1049        buf.write(token);
1050        self.stats.frame_tx.path_challenge += 1;
1051
1052        // An endpoint MUST expand datagrams that contain a PATH_CHALLENGE frame
1053        // to at least the smallest allowed maximum datagram size of 1200 bytes,
1054        // unless the anti-amplification limit for the path does not permit
1055        // sending a datagram of this size
1056        builder.pad_to(MIN_INITIAL_SIZE);
1057
1058        builder.finish(self, now, buf);
1059        self.stats.udp_tx.on_sent(1, buf.len());
1060
1061        Some(Transmit {
1062            destination,
1063            size: buf.len(),
1064            ecn: None,
1065            segment_size: None,
1066            src_ip: self.local_ip,
1067        })
1068    }
1069
1070    /// Indicate what types of frames are ready to send for the given space
1071    fn space_can_send(&self, space_id: SpaceId, frame_space_1rtt: usize) -> SendableFrames {
1072        if self.spaces[space_id].crypto.is_none()
1073            && (space_id != SpaceId::Data
1074                || self.zero_rtt_crypto.is_none()
1075                || self.side.is_server())
1076        {
1077            // No keys available for this space
1078            return SendableFrames::empty();
1079        }
1080        let mut can_send = self.spaces[space_id].can_send(&self.streams);
1081        if space_id == SpaceId::Data {
1082            can_send.other |= self.can_send_1rtt(frame_space_1rtt);
1083        }
1084        can_send
1085    }
1086
    /// Process `ConnectionEvent`s generated by the associated `Endpoint`
    ///
    /// Will execute protocol logic upon receipt of a connection event, in turn preparing signals
    /// (including application `Event`s, `EndpointEvent`s and outgoing datagrams) that should be
    /// extracted through the relevant methods.
    pub fn handle_event(&mut self, event: ConnectionEvent) {
        use ConnectionEventInner::*;
        match event.0 {
            Datagram(DatagramConnectionEvent {
                now,
                remote,
                ecn,
                first_decode,
                remaining,
            }) => {
                // If this packet could initiate a migration and we're a client or a server that
                // forbids migration, drop the datagram. This could be relaxed to heuristically
                // permit NAT-rebinding-like migration.
                if remote != self.path.remote && !self.side.remote_may_migrate() {
                    trace!("discarding packet from unrecognized peer {}", remote);
                    return;
                }

                // Snapshot the anti-amplification state *before* processing, so we can tell
                // below whether this datagram is what unblocked us.
                let was_anti_amplification_blocked = self.path.anti_amplification_blocked(1);

                self.stats.udp_rx.datagrams += 1;
                self.stats.udp_rx.bytes += first_decode.len() as u64;
                // Capture the length now; `first_decode` is moved into `handle_decode` below.
                let data_len = first_decode.len();

                self.handle_decode(now, remote, ecn, first_decode);
                // The current `path` might have changed inside `handle_decode`,
                // since the packet could have triggered a migration. Make sure
                // the data received is accounted for the most recent path by accessing
                // `path` after `handle_decode`.
                self.path.total_recvd = self.path.total_recvd.saturating_add(data_len as u64);

                // Any further packets coalesced into the same UDP datagram are handled here.
                if let Some(data) = remaining {
                    self.stats.udp_rx.bytes += data.len() as u64;
                    self.handle_coalesced(now, remote, ecn, data);
                }

                // Emit updated recovery metrics to qlog after the datagram has been processed.
                self.config.qlog_sink.emit_recovery_metrics(
                    self.pto_count,
                    &mut self.path,
                    now,
                    self.orig_rem_cid,
                );

                if was_anti_amplification_blocked {
                    // A prior attempt to set the loss detection timer may have failed due to
                    // anti-amplification, so ensure it's set now. Prevents a handshake deadlock if
                    // the server's first flight is lost.
                    self.set_loss_detection_timer(now);
                }
            }
            NewIdentifiers(ids, now) => {
                // Register the fresh local CIDs, then queue NEW_CONNECTION_ID frames
                // (in reverse order) for transmission in the Data space.
                self.local_cid_state.new_cids(&ids, now);
                ids.into_iter().rev().for_each(|frame| {
                    self.spaces[SpaceId::Data].pending.new_cids.push(frame);
                });
                // Update Timer::PushNewCid: re-arm if it is unset or already due.
                if self
                    .timers
                    .get(Timer::PushNewCid)
                    .map_or(true, |x| x <= now)
                {
                    self.reset_cid_retirement();
                }
            }
        }
    }
1158
    /// Process timer expirations
    ///
    /// Executes protocol logic, potentially preparing signals (including application `Event`s,
    /// `EndpointEvent`s and outgoing datagrams) that should be extracted through the relevant
    /// methods.
    ///
    /// It is most efficient to call this immediately after the system clock reaches the latest
    /// `Instant` that was output by `poll_timeout`; however spurious extra calls will simply
    /// no-op and therefore are safe.
    pub fn handle_timeout(&mut self, now: Instant) {
        for &timer in &Timer::VALUES {
            if !self.timers.is_expired(timer, now) {
                continue;
            }
            // Disarm the expired timer before dispatching its handler.
            self.timers.stop(timer);
            trace!(timer = ?timer, "timeout");
            match timer {
                // Close delay elapsed: the connection is now fully drained.
                Timer::Close => {
                    self.state = State::Drained;
                    self.endpoint_events.push_back(EndpointEventInner::Drained);
                }
                // No activity within the idle timeout: tear the connection down.
                Timer::Idle => {
                    self.kill(ConnectionError::TimedOut);
                }
                // Queue an ACK-eliciting packet to keep the connection alive.
                Timer::KeepAlive => {
                    trace!("sending keep-alive");
                    self.ping();
                }
                Timer::LossDetection => {
                    self.on_loss_detection_timeout(now);

                    // Report the post-timeout recovery state to qlog.
                    self.config.qlog_sink.emit_recovery_metrics(
                        self.pto_count,
                        &mut self.path,
                        now,
                        self.orig_rem_cid,
                    );
                }
                // Obsolete key material (0-RTT and pre-update 1-RTT keys) is dropped here.
                Timer::KeyDiscard => {
                    self.zero_rtt_crypto = None;
                    self.prev_crypto = None;
                }
                // The PATH_CHALLENGE went unanswered: revert to the previous path, if any,
                // and clear the outstanding challenge state.
                Timer::PathValidation => {
                    debug!("path validation failed");
                    if let Some((_, prev)) = self.prev_path.take() {
                        self.path = prev;
                    }
                    self.path.challenge = None;
                    self.path.challenge_pending = false;
                }
                Timer::Pacing => trace!("pacing timer expired"),
                Timer::PushNewCid => {
                    // Update `retire_prior_to` field in NEW_CONNECTION_ID frame
                    let num_new_cid = self.local_cid_state.on_cid_timeout().into();
                    if !self.state.is_closed() {
                        trace!(
                            "push a new cid to peer RETIRE_PRIOR_TO field {}",
                            self.local_cid_state.retire_prior_to()
                        );
                        self.endpoint_events
                            .push_back(EndpointEventInner::NeedIdentifiers(now, num_new_cid));
                    }
                }
                Timer::MaxAckDelay => {
                    trace!("max ack delay reached");
                    // This timer is only armed in the Data space
                    self.spaces[SpaceId::Data]
                        .pending_acks
                        .on_max_ack_delay_timeout()
                }
            }
        }
    }
1232
1233    /// Close a connection immediately
1234    ///
1235    /// This does not ensure delivery of outstanding data. It is the application's responsibility to
1236    /// call this only when all important communications have been completed, e.g. by calling
1237    /// [`SendStream::finish`] on outstanding streams and waiting for the corresponding
1238    /// [`StreamEvent::Finished`] event.
1239    ///
1240    /// If [`Streams::send_streams`] returns 0, all outstanding stream data has been
1241    /// delivered. There may still be data from the peer that has not been received.
1242    ///
1243    /// [`StreamEvent::Finished`]: crate::StreamEvent::Finished
1244    pub fn close(&mut self, now: Instant, error_code: VarInt, reason: Bytes) {
1245        self.close_inner(
1246            now,
1247            Close::Application(frame::ApplicationClose { error_code, reason }),
1248        )
1249    }
1250
1251    fn close_inner(&mut self, now: Instant, reason: Close) {
1252        let was_closed = self.state.is_closed();
1253        if !was_closed {
1254            self.close_common();
1255            self.set_close_timer(now);
1256            self.close = true;
1257            self.state = State::Closed(state::Closed { reason });
1258        }
1259    }
1260
1261    /// Control datagrams
1262    pub fn datagrams(&mut self) -> Datagrams<'_> {
1263        Datagrams { conn: self }
1264    }
1265
1266    /// Returns connection statistics
1267    pub fn stats(&self) -> ConnectionStats {
1268        let mut stats = self.stats;
1269        stats.path.rtt = self.path.rtt.get();
1270        stats.path.cwnd = self.path.congestion.window();
1271        stats.path.current_mtu = self.path.mtud.current_mtu();
1272
1273        stats
1274    }
1275
1276    /// Ping the remote endpoint
1277    ///
1278    /// Causes an ACK-eliciting packet to be transmitted.
1279    pub fn ping(&mut self) {
1280        self.spaces[self.highest_space].ping_pending = true;
1281    }
1282
1283    /// Update traffic keys spontaneously
1284    ///
1285    /// This can be useful for testing key updates, as they otherwise only happen infrequently.
1286    pub fn force_key_update(&mut self) {
1287        if !self.state.is_established() {
1288            debug!("ignoring forced key update in illegal state");
1289            return;
1290        }
1291        if self.prev_crypto.is_some() {
1292            // We already just updated, or are currently updating, the keys. Concurrent key updates
1293            // are illegal.
1294            debug!("ignoring redundant forced key update");
1295            return;
1296        }
1297        self.update_keys(None, false);
1298    }
1299
1300    // Compatibility wrapper for quinn < 0.11.7. Remove for 0.12.
1301    #[doc(hidden)]
1302    #[deprecated]
1303    pub fn initiate_key_update(&mut self) {
1304        self.force_key_update();
1305    }
1306
1307    /// Get a session reference
1308    pub fn crypto_session(&self) -> &dyn crypto::Session {
1309        &*self.crypto
1310    }
1311
1312    /// Whether the connection is in the process of being established
1313    ///
1314    /// If this returns `false`, the connection may be either established or closed, signaled by the
1315    /// emission of a `Connected` or `ConnectionLost` message respectively.
1316    pub fn is_handshaking(&self) -> bool {
1317        self.state.is_handshake()
1318    }
1319
1320    /// Whether the connection is closed
1321    ///
1322    /// Closed connections cannot transport any further data. A connection becomes closed when
1323    /// either peer application intentionally closes it, or when either transport layer detects an
1324    /// error such as a time-out or certificate validation failure.
1325    ///
1326    /// A `ConnectionLost` event is emitted with details when the connection becomes closed.
1327    pub fn is_closed(&self) -> bool {
1328        self.state.is_closed()
1329    }
1330
1331    /// Whether there is no longer any need to keep the connection around
1332    ///
1333    /// Closed connections become drained after a brief timeout to absorb any remaining in-flight
1334    /// packets from the peer. All drained connections have been closed.
1335    pub fn is_drained(&self) -> bool {
1336        self.state.is_drained()
1337    }
1338
1339    /// For clients, if the peer accepted the 0-RTT data packets
1340    ///
1341    /// The value is meaningless until after the handshake completes.
1342    pub fn accepted_0rtt(&self) -> bool {
1343        self.accepted_0rtt
1344    }
1345
1346    /// Whether 0-RTT is/was possible during the handshake
1347    pub fn has_0rtt(&self) -> bool {
1348        self.zero_rtt_enabled
1349    }
1350
1351    /// Whether there are any pending retransmits
1352    pub fn has_pending_retransmits(&self) -> bool {
1353        !self.spaces[SpaceId::Data].pending.is_empty(&self.streams)
1354    }
1355
1356    /// Look up whether we're the client or server of this Connection
1357    pub fn side(&self) -> Side {
1358        self.side.side()
1359    }
1360
1361    /// The latest socket address for this connection's peer
1362    pub fn remote_address(&self) -> SocketAddr {
1363        self.path.remote
1364    }
1365
1366    /// The local IP address which was used when the peer established
1367    /// the connection
1368    ///
1369    /// This can be different from the address the endpoint is bound to, in case
1370    /// the endpoint is bound to a wildcard address like `0.0.0.0` or `::`.
1371    ///
1372    /// This will return `None` for clients, or when no `local_ip` was passed to
1373    /// [`Endpoint::handle()`](crate::Endpoint::handle) for the datagrams establishing this
1374    /// connection.
1375    pub fn local_ip(&self) -> Option<IpAddr> {
1376        self.local_ip
1377    }
1378
1379    /// Current best estimate of this connection's latency (round-trip-time)
1380    pub fn rtt(&self) -> Duration {
1381        self.path.rtt.get()
1382    }
1383
1384    /// Current state of this connection's congestion controller, for debugging purposes
1385    pub fn congestion_state(&self) -> &dyn Controller {
1386        self.path.congestion.as_ref()
1387    }
1388
1389    /// Resets path-specific settings.
1390    ///
1391    /// This will force-reset several subsystems related to a specific network path.
1392    /// Currently this is the congestion controller, round-trip estimator, and the MTU
1393    /// discovery.
1394    ///
1395    /// This is useful when it is known the underlying network path has changed and the old
1396    /// state of these subsystems is no longer valid or optimal. In this case it might be
1397    /// faster or reduce loss to settle on optimal values by restarting from the initial
1398    /// configuration in the [`TransportConfig`].
1399    pub fn path_changed(&mut self, now: Instant) {
1400        self.path.reset(now, &self.config);
1401    }
1402
1403    /// Modify the number of remotely initiated streams that may be concurrently open
1404    ///
1405    /// No streams may be opened by the peer unless fewer than `count` are already open. Large
1406    /// `count`s increase both minimum and worst-case memory consumption.
1407    pub fn set_max_concurrent_streams(&mut self, dir: Dir, count: VarInt) {
1408        self.streams.set_max_concurrent(dir, count);
1409        // If the limit was reduced, then a flow control update previously deemed insignificant may
1410        // now be significant.
1411        let pending = &mut self.spaces[SpaceId::Data].pending;
1412        self.streams.queue_max_stream_id(pending);
1413    }
1414
1415    /// Current number of remotely initiated streams that may be concurrently open
1416    ///
1417    /// If the target for this limit is reduced using [`set_max_concurrent_streams`](Self::set_max_concurrent_streams),
1418    /// it will not change immediately, even if fewer streams are open. Instead, it will
1419    /// decrement by one for each time a remotely initiated stream of matching directionality is closed.
1420    pub fn max_concurrent_streams(&self, dir: Dir) -> u64 {
1421        self.streams.max_concurrent(dir)
1422    }
1423
1424    /// See [`TransportConfig::send_window()`]
1425    pub fn set_send_window(&mut self, send_window: u64) {
1426        self.streams.set_send_window(send_window);
1427    }
1428
1429    /// See [`TransportConfig::receive_window()`]
1430    pub fn set_receive_window(&mut self, receive_window: VarInt) {
1431        if self.streams.set_receive_window(receive_window) {
1432            self.spaces[SpaceId::Data].pending.max_data = true;
1433        }
1434    }
1435
1436    fn on_ack_received(
1437        &mut self,
1438        now: Instant,
1439        space: SpaceId,
1440        ack: frame::Ack,
1441    ) -> Result<(), TransportError> {
1442        if ack.largest >= self.spaces[space].next_packet_number {
1443            return Err(TransportError::PROTOCOL_VIOLATION("unsent packet acked"));
1444        }
1445        let new_largest = {
1446            let space = &mut self.spaces[space];
1447            if space
1448                .largest_acked_packet
1449                .map_or(true, |pn| ack.largest > pn)
1450            {
1451                space.largest_acked_packet = Some(ack.largest);
1452                if let Some(info) = space.sent_packets.get(&ack.largest) {
1453                    // This should always succeed, but a misbehaving peer might ACK a packet we
1454                    // haven't sent. At worst, that will result in us spuriously reducing the
1455                    // congestion window.
1456                    space.largest_acked_packet_sent = info.time_sent;
1457                }
1458                true
1459            } else {
1460                false
1461            }
1462        };
1463
1464        // Avoid DoS from unreasonably huge ack ranges by filtering out just the new acks.
1465        let mut newly_acked = ArrayRangeSet::new();
1466        for range in ack.iter() {
1467            self.packet_number_filter.check_ack(space, range.clone())?;
1468            for (&pn, _) in self.spaces[space].sent_packets.range(range) {
1469                newly_acked.insert_one(pn);
1470            }
1471        }
1472
1473        if newly_acked.is_empty() {
1474            return Ok(());
1475        }
1476
1477        let mut ack_eliciting_acked = false;
1478        for packet in newly_acked.elts() {
1479            if let Some(info) = self.spaces[space].take(packet) {
1480                if let Some(acked) = info.largest_acked {
1481                    // Assume ACKs for all packets below the largest acknowledged in `packet` have
1482                    // been received. This can cause the peer to spuriously retransmit if some of
1483                    // our earlier ACKs were lost, but allows for simpler state tracking. See
1484                    // discussion at
1485                    // https://www.rfc-editor.org/rfc/rfc9000.html#name-limiting-ranges-by-tracking
1486                    self.spaces[space].pending_acks.subtract_below(acked);
1487                }
1488                ack_eliciting_acked |= info.ack_eliciting;
1489
1490                // Notify MTU discovery that a packet was acked, because it might be an MTU probe
1491                let mtu_updated = self.path.mtud.on_acked(space, packet, info.size);
1492                if mtu_updated {
1493                    self.path
1494                        .congestion
1495                        .on_mtu_update(self.path.mtud.current_mtu());
1496                }
1497
1498                // Notify ack frequency that a packet was acked, because it might contain an ACK_FREQUENCY frame
1499                self.ack_frequency.on_acked(packet);
1500
1501                self.on_packet_acked(now, info);
1502            }
1503        }
1504
1505        self.path.congestion.on_end_acks(
1506            now,
1507            self.path.in_flight.bytes,
1508            self.app_limited,
1509            self.spaces[space].largest_acked_packet,
1510        );
1511
1512        if new_largest && ack_eliciting_acked {
1513            let ack_delay = if space != SpaceId::Data {
1514                Duration::from_micros(0)
1515            } else {
1516                cmp::min(
1517                    self.ack_frequency.peer_max_ack_delay,
1518                    Duration::from_micros(ack.delay << self.peer_params.ack_delay_exponent.0),
1519                )
1520            };
1521            let rtt = instant_saturating_sub(now, self.spaces[space].largest_acked_packet_sent);
1522            self.path.rtt.update(ack_delay, rtt);
1523            if self.path.first_packet_after_rtt_sample.is_none() {
1524                self.path.first_packet_after_rtt_sample =
1525                    Some((space, self.spaces[space].next_packet_number));
1526            }
1527        }
1528
1529        // Must be called before crypto/pto_count are clobbered
1530        self.detect_lost_packets(now, space, true);
1531
1532        if self.peer_completed_address_validation() {
1533            self.pto_count = 0;
1534        }
1535
1536        // Explicit congestion notification
1537        if self.path.sending_ecn {
1538            if let Some(ecn) = ack.ecn {
1539                // We only examine ECN counters from ACKs that we are certain we received in transmit
1540                // order, allowing us to compute an increase in ECN counts to compare against the number
1541                // of newly acked packets that remains well-defined in the presence of arbitrary packet
1542                // reordering.
1543                if new_largest {
1544                    let sent = self.spaces[space].largest_acked_packet_sent;
1545                    self.process_ecn(now, space, newly_acked.len() as u64, ecn, sent);
1546                }
1547            } else {
1548                // We always start out sending ECN, so any ack that doesn't acknowledge it disables it.
1549                debug!("ECN not acknowledged by peer");
1550                self.path.sending_ecn = false;
1551            }
1552        }
1553
1554        self.set_loss_detection_timer(now);
1555        Ok(())
1556    }
1557
1558    /// Process a new ECN block from an in-order ACK
1559    fn process_ecn(
1560        &mut self,
1561        now: Instant,
1562        space: SpaceId,
1563        newly_acked: u64,
1564        ecn: frame::EcnCounts,
1565        largest_sent_time: Instant,
1566    ) {
1567        match self.spaces[space].detect_ecn(newly_acked, ecn) {
1568            Err(e) => {
1569                debug!("halting ECN due to verification failure: {}", e);
1570                self.path.sending_ecn = false;
1571                // Wipe out the existing value because it might be garbage and could interfere with
1572                // future attempts to use ECN on new paths.
1573                self.spaces[space].ecn_feedback = frame::EcnCounts::ZERO;
1574            }
1575            Ok(false) => {}
1576            Ok(true) => {
1577                self.stats.path.congestion_events += 1;
1578                self.path
1579                    .congestion
1580                    .on_congestion_event(now, largest_sent_time, false, 0);
1581            }
1582        }
1583    }
1584
1585    // Not timing-aware, so it's safe to call this for inferred acks, such as arise from
1586    // high-latency handshakes
1587    fn on_packet_acked(&mut self, now: Instant, info: SentPacket) {
1588        self.remove_in_flight(&info);
1589        if info.ack_eliciting && self.path.challenge.is_none() {
1590            // Only pass ACKs to the congestion controller if we are not validating the current
1591            // path, so as to ignore any ACKs from older paths still coming in.
1592            self.path.congestion.on_ack(
1593                now,
1594                info.time_sent,
1595                info.size.into(),
1596                self.app_limited,
1597                &self.path.rtt,
1598            );
1599        }
1600
1601        // Update state for confirmed delivery of frames
1602        if let Some(retransmits) = info.retransmits.get() {
1603            for (id, _) in retransmits.reset_stream.iter() {
1604                self.streams.reset_acked(*id);
1605            }
1606        }
1607
1608        for frame in info.stream_frames {
1609            self.streams.received_ack_of(frame);
1610        }
1611    }
1612
1613    fn set_key_discard_timer(&mut self, now: Instant, space: SpaceId) {
1614        let start = if self.zero_rtt_crypto.is_some() {
1615            now
1616        } else {
1617            self.prev_crypto
1618                .as_ref()
1619                .expect("no previous keys")
1620                .end_packet
1621                .as_ref()
1622                .expect("update not acknowledged yet")
1623                .1
1624        };
1625        self.timers
1626            .set(Timer::KeyDiscard, start + self.pto(space) * 3);
1627    }
1628
1629    fn on_loss_detection_timeout(&mut self, now: Instant) {
1630        if let Some((_, pn_space)) = self.loss_time_and_space() {
1631            // Time threshold loss Detection
1632            self.detect_lost_packets(now, pn_space, false);
1633            self.set_loss_detection_timer(now);
1634            return;
1635        }
1636
1637        let (_, space) = match self.pto_time_and_space(now) {
1638            Some(x) => x,
1639            None => {
1640                error!("PTO expired while unset");
1641                return;
1642            }
1643        };
1644        trace!(
1645            in_flight = self.path.in_flight.bytes,
1646            count = self.pto_count,
1647            ?space,
1648            "PTO fired"
1649        );
1650
1651        let count = match self.path.in_flight.ack_eliciting {
1652            // A PTO when we're not expecting any ACKs must be due to handshake anti-amplification
1653            // deadlock preventions
1654            0 => {
1655                debug_assert!(!self.peer_completed_address_validation());
1656                1
1657            }
1658            // Conventional loss probe
1659            _ => 2,
1660        };
1661        self.spaces[space].loss_probes = self.spaces[space].loss_probes.saturating_add(count);
1662        self.pto_count = self.pto_count.saturating_add(1);
1663        self.set_loss_detection_timer(now);
1664    }
1665
    /// Detect lost packets in `pn_space` by time threshold and packet threshold
    ///
    /// Removes lost packets from the sent-packet ledger, queues their frames for
    /// retransmission, updates MTU discovery and black-hole detection, and — when
    /// `due_to_ack` — checks for persistent congestion before notifying the
    /// congestion controller. Packets not yet lost but at risk re-arm the space's
    /// `loss_time`. Lost MTU probes are handled separately so they don't trigger a
    /// congestion response.
    fn detect_lost_packets(&mut self, now: Instant, pn_space: SpaceId, due_to_ack: bool) {
        let mut lost_packets = Vec::<u64>::new();
        let mut lost_mtu_probe = None;
        let in_flight_mtu_probe = self.path.mtud.in_flight_mtu_probe();
        let rtt = self.path.rtt.conservative();
        let loss_delay = cmp::max(rtt.mul_f32(self.config.time_threshold), TIMER_GRANULARITY);

        // Packets sent before this time are deemed lost.
        let lost_send_time = now.checked_sub(loss_delay).unwrap();
        // NOTE(review): unwrap assumes loss detection only runs once something was
        // acked in this space — callers appear to guarantee this; confirm.
        let largest_acked_packet = self.spaces[pn_space].largest_acked_packet.unwrap();
        let packet_threshold = self.config.packet_threshold as u64;
        let mut size_of_lost_packets = 0u64;

        // InPersistentCongestion: Determine if all packets in the time period before the newest
        // lost packet, including the edges, are marked lost. PTO computation must always
        // include max ACK delay, i.e. operate as if in Data space (see RFC9001 §7.6.1).
        let congestion_period =
            self.pto(SpaceId::Data) * self.config.persistent_congestion_threshold;
        let mut persistent_congestion_start: Option<Instant> = None;
        let mut prev_packet = None;
        let mut in_persistent_congestion = false;

        let space = &mut self.spaces[pn_space];
        space.loss_time = None;

        // Only packets below the largest acknowledged can be declared lost.
        for (&packet, info) in space.sent_packets.range(0..largest_acked_packet) {
            if prev_packet != Some(packet.wrapping_sub(1)) {
                // An intervening packet was acknowledged
                persistent_congestion_start = None;
            }

            if info.time_sent <= lost_send_time || largest_acked_packet >= packet + packet_threshold
            {
                if Some(packet) == in_flight_mtu_probe {
                    // Lost MTU probes are not included in `lost_packets`, because they should not
                    // trigger a congestion control response
                    lost_mtu_probe = in_flight_mtu_probe;
                } else {
                    lost_packets.push(packet);
                    size_of_lost_packets += info.size as u64;
                    if info.ack_eliciting && due_to_ack {
                        match persistent_congestion_start {
                            // Two ACK-eliciting packets lost more than congestion_period apart, with no
                            // ACKed packets in between
                            Some(start) if info.time_sent - start > congestion_period => {
                                in_persistent_congestion = true;
                            }
                            // Persistent congestion must start after the first RTT sample
                            None if self
                                .path
                                .first_packet_after_rtt_sample
                                .is_some_and(|x| x < (pn_space, packet)) =>
                            {
                                persistent_congestion_start = Some(info.time_sent);
                            }
                            _ => {}
                        }
                    }
                }
            } else {
                // Not lost yet: schedule the time at which this packet would become lost.
                let next_loss_time = info.time_sent + loss_delay;
                space.loss_time = Some(
                    space
                        .loss_time
                        .map_or(next_loss_time, |x| cmp::min(x, next_loss_time)),
                );
                persistent_congestion_start = None;
            }

            prev_packet = Some(packet);
        }

        // OnPacketsLost
        if let Some(largest_lost) = lost_packets.last().cloned() {
            let old_bytes_in_flight = self.path.in_flight.bytes;
            let largest_lost_sent = self.spaces[pn_space].sent_packets[&largest_lost].time_sent;
            self.stats.path.lost_packets += lost_packets.len() as u64;
            self.stats.path.lost_bytes += size_of_lost_packets;
            trace!(
                "packets lost: {:?}, bytes lost: {}",
                lost_packets, size_of_lost_packets
            );

            for &packet in &lost_packets {
                let info = self.spaces[pn_space].take(packet).unwrap(); // safe: lost_packets is populated just above
                self.config.qlog_sink.emit_packet_lost(
                    packet,
                    &info,
                    lost_send_time,
                    pn_space,
                    now,
                    self.orig_rem_cid,
                );
                self.remove_in_flight(&info);
                for frame in info.stream_frames {
                    self.streams.retransmit(frame);
                }
                self.spaces[pn_space].pending |= info.retransmits;
                self.path.mtud.on_non_probe_lost(packet, info.size);
            }

            if self.path.mtud.black_hole_detected(now) {
                self.stats.path.black_holes_detected += 1;
                self.path
                    .congestion
                    .on_mtu_update(self.path.mtud.current_mtu());
                if let Some(max_datagram_size) = self.datagrams().max_size() {
                    self.datagrams.drop_oversized(max_datagram_size);
                }
            }

            // Don't apply congestion penalty for lost ack-only packets
            let lost_ack_eliciting = old_bytes_in_flight != self.path.in_flight.bytes;

            if lost_ack_eliciting {
                self.stats.path.congestion_events += 1;
                self.path.congestion.on_congestion_event(
                    now,
                    largest_lost_sent,
                    in_persistent_congestion,
                    size_of_lost_packets,
                );
            }
        }

        // Handle a lost MTU probe
        if let Some(packet) = lost_mtu_probe {
            let info = self.spaces[SpaceId::Data].take(packet).unwrap(); // safe: lost_mtu_probe is omitted from lost_packets, and therefore must not have been removed yet
            self.remove_in_flight(&info);
            self.path.mtud.on_probe_lost();
            self.stats.path.lost_plpmtud_probes += 1;
        }
    }
1799
1800    fn loss_time_and_space(&self) -> Option<(Instant, SpaceId)> {
1801        SpaceId::iter()
1802            .filter_map(|id| Some((self.spaces[id].loss_time?, id)))
1803            .min_by_key(|&(time, _)| time)
1804    }
1805
    /// Compute the earliest probe timeout and the packet-number space it applies to
    ///
    /// Returns `None` when no PTO should be armed (e.g. the handshake is still in
    /// progress and only application data is in flight).
    fn pto_time_and_space(&self, now: Instant) -> Option<(Instant, SpaceId)> {
        // Exponential backoff on consecutive PTOs, capped so the shift stays well-defined.
        let backoff = 2u32.pow(self.pto_count.min(MAX_BACKOFF_EXPONENT));
        let mut duration = self.path.rtt.pto_base() * backoff;

        if self.path.in_flight.ack_eliciting == 0 {
            // Nothing ack-eliciting is in flight, so this PTO exists only to break a
            // potential anti-amplification deadlock during the handshake.
            debug_assert!(!self.peer_completed_address_validation());
            let space = match self.highest_space {
                SpaceId::Handshake => SpaceId::Handshake,
                _ => SpaceId::Initial,
            };
            return Some((now + duration, space));
        }

        let mut result = None;
        for space in SpaceId::iter() {
            if !self.spaces[space].has_in_flight() {
                continue;
            }
            if space == SpaceId::Data {
                // Skip ApplicationData until handshake completes.
                if self.is_handshaking() {
                    return result;
                }
                // Include max_ack_delay and backoff for ApplicationData.
                // (Data is iterated last, so this addition affects only this space.)
                duration += self.ack_frequency.max_ack_delay_for_pto() * backoff;
            }
            let last_ack_eliciting = match self.spaces[space].time_of_last_ack_eliciting_packet {
                Some(time) => time,
                None => continue,
            };
            // Pick the space whose PTO would fire first.
            let pto = last_ack_eliciting + duration;
            if result.map_or(true, |(earliest_pto, _)| pto < earliest_pto) {
                result = Some((pto, space));
            }
        }
        result
    }
1843
1844    fn peer_completed_address_validation(&self) -> bool {
1845        if self.side.is_server() || self.state.is_closed() {
1846            return true;
1847        }
1848        // The server is guaranteed to have validated our address if any of our handshake or 1-RTT
1849        // packets are acknowledged or we've seen HANDSHAKE_DONE and discarded handshake keys.
1850        self.spaces[SpaceId::Handshake]
1851            .largest_acked_packet
1852            .is_some()
1853            || self.spaces[SpaceId::Data].largest_acked_packet.is_some()
1854            || (self.spaces[SpaceId::Data].crypto.is_some()
1855                && self.spaces[SpaceId::Handshake].crypto.is_none())
1856    }
1857
    /// Arm or disarm the loss-detection timer
    ///
    /// Prefers the time-threshold loss timer when one is pending; otherwise arms a
    /// PTO, unless sending would be pointless (closed connection, anti-amplification
    /// block, or nothing outstanding with a validated peer).
    fn set_loss_detection_timer(&mut self, now: Instant) {
        if self.state.is_closed() {
            // No loss detection takes place on closed connections, and `close_common` already
            // stopped the timer. Ensure we don't restart it inadvertently, e.g. in response to a
            // reordered packet being handled by state-insensitive code.
            return;
        }

        if let Some((loss_time, _)) = self.loss_time_and_space() {
            // Time threshold loss detection.
            self.timers.set(Timer::LossDetection, loss_time);
            return;
        }

        if self.path.anti_amplification_blocked(1) {
            // We wouldn't be able to send anything, so don't bother.
            self.timers.stop(Timer::LossDetection);
            return;
        }

        if self.path.in_flight.ack_eliciting == 0 && self.peer_completed_address_validation() {
            // There is nothing to detect lost, so no timer is set. However, the client needs to arm
            // the timer if the server might be blocked by the anti-amplification limit.
            self.timers.stop(Timer::LossDetection);
            return;
        }

        // Determine which PN space to arm PTO for.
        // Calculate PTO duration
        if let Some((timeout, _)) = self.pto_time_and_space(now) {
            self.timers.set(Timer::LossDetection, timeout);
        } else {
            self.timers.stop(Timer::LossDetection);
        }
    }
1893
1894    /// Probe Timeout
1895    fn pto(&self, space: SpaceId) -> Duration {
1896        let max_ack_delay = match space {
1897            SpaceId::Initial | SpaceId::Handshake => Duration::ZERO,
1898            SpaceId::Data => self.ack_frequency.max_ack_delay_for_pto(),
1899        };
1900        self.path.rtt.pto_base() + max_ack_delay
1901    }
1902
    /// Record receipt of a successfully authenticated (decrypted) packet
    ///
    /// Resets keep-alive and idle timers, accumulates ECN counters, schedules an
    /// ACK for `packet` when present, updates the spin bit, and performs
    /// server-side key management: Initial keys are discarded on the first
    /// Handshake packet, and 0-RTT key discard is scheduled after the first
    /// 1-RTT packet.
    fn on_packet_authenticated(
        &mut self,
        now: Instant,
        space_id: SpaceId,
        ecn: Option<EcnCodepoint>,
        packet: Option<u64>,
        spin: bool,
        is_1rtt: bool,
    ) {
        self.total_authed_packets += 1;
        self.reset_keep_alive(now);
        self.reset_idle_timeout(now, space_id);
        self.permit_idle_reset = true;
        self.receiving_ecn |= ecn.is_some();
        if let Some(x) = ecn {
            let space = &mut self.spaces[space_id];
            space.ecn_counters += x;

            // Congestion-experienced marks warrant an immediate ACK.
            if x.is_ce() {
                space.pending_acks.set_immediate_ack_required();
            }
        }

        // Packets without a number (e.g. Retry/Version Negotiation) need no further state.
        let packet = match packet {
            Some(x) => x,
            None => return,
        };
        if self.side.is_server() {
            if self.spaces[SpaceId::Initial].crypto.is_some() && space_id == SpaceId::Handshake {
                // A server stops sending and processing Initial packets when it receives its first Handshake packet.
                self.discard_space(now, SpaceId::Initial);
            }
            if self.zero_rtt_crypto.is_some() && is_1rtt {
                // Discard 0-RTT keys soon after receiving a 1-RTT packet
                self.set_key_discard_timer(now, space_id)
            }
        }
        let space = &mut self.spaces[space_id];
        space.pending_acks.insert_one(packet, now);
        if packet >= space.rx_packet {
            space.rx_packet = packet;
            // Update outgoing spin bit, inverting iff we're the client
            self.spin = self.side.is_client() ^ spin;
        }

        self.config.qlog_sink.emit_packet_received(
            packet,
            space_id,
            !is_1rtt,
            now,
            self.orig_rem_cid,
        );
    }
1956
1957    fn reset_idle_timeout(&mut self, now: Instant, space: SpaceId) {
1958        let timeout = match self.idle_timeout {
1959            None => return,
1960            Some(dur) => dur,
1961        };
1962        if self.state.is_closed() {
1963            self.timers.stop(Timer::Idle);
1964            return;
1965        }
1966        let dt = cmp::max(timeout, 3 * self.pto(space));
1967        self.timers.set(Timer::Idle, now + dt);
1968    }
1969
1970    fn reset_keep_alive(&mut self, now: Instant) {
1971        let interval = match self.config.keep_alive_interval {
1972            Some(x) if self.state.is_established() => x,
1973            _ => return,
1974        };
1975        self.timers.set(Timer::KeepAlive, now + interval);
1976    }
1977
1978    fn reset_cid_retirement(&mut self) {
1979        if let Some(t) = self.local_cid_state.next_timeout() {
1980            self.timers.set(Timer::PushNewCid, t);
1981        }
1982    }
1983
    /// Handle the already-decrypted first packet from the client
    ///
    /// Decrypting the first packet in the `Endpoint` allows stateless packet handling to be more
    /// efficient.
    pub(crate) fn handle_first_packet(
        &mut self,
        now: Instant,
        remote: SocketAddr,
        ecn: Option<EcnCodepoint>,
        packet_number: u64,
        packet: InitialPacket,
        remaining: Option<BytesMut>,
    ) -> Result<(), ConnectionError> {
        let span = trace_span!("first recv");
        let _guard = span.enter();
        debug_assert!(self.side.is_server());
        // Record everything received on this path so far (header + payload).
        let len = packet.header_data.len() + packet.payload.len();
        self.path.total_recvd = len as u64;

        // Remember the client's token for later validation against retransmitted Initials.
        match self.state {
            State::Handshake(ref mut state) => {
                state.expected_token = packet.header.token.clone();
            }
            _ => unreachable!("first packet must be delivered in Handshake state"),
        }

        self.on_packet_authenticated(
            now,
            SpaceId::Initial,
            ecn,
            Some(packet_number),
            false,
            false,
        );

        self.process_decrypted_packet(now, remote, Some(packet_number), packet.into())?;
        // Handle any packets coalesced after the Initial in the same datagram.
        if let Some(data) = remaining {
            self.handle_coalesced(now, remote, ecn, data);
        }

        self.config.qlog_sink.emit_recovery_metrics(
            self.pto_count,
            &mut self.path,
            now,
            self.orig_rem_cid,
        );

        Ok(())
    }
2033
    /// Enable 0-RTT if the crypto layer can supply early keys
    ///
    /// On the client, also applies transport parameters remembered from the
    /// session ticket, after resetting the values that must not be cached across
    /// connections. Does nothing when no early keys are available, or when the
    /// cached parameters are malformed.
    fn init_0rtt(&mut self) {
        let (header, packet) = match self.crypto.early_crypto() {
            Some(x) => x,
            None => return,
        };
        if self.side.is_client() {
            match self.crypto.transport_parameters() {
                Ok(params) => {
                    let params = params
                        .expect("crypto layer didn't supply transport parameters with ticket");
                    // Certain values must not be cached
                    let params = TransportParameters {
                        initial_src_cid: None,
                        original_dst_cid: None,
                        preferred_address: None,
                        retry_src_cid: None,
                        stateless_reset_token: None,
                        min_ack_delay: None,
                        ack_delay_exponent: TransportParameters::default().ack_delay_exponent,
                        max_ack_delay: TransportParameters::default().max_ack_delay,
                        ..params
                    };
                    self.set_peer_params(params);
                }
                Err(e) => {
                    error!("session ticket has malformed transport parameters: {}", e);
                    return;
                }
            }
        }
        trace!("0-RTT enabled");
        self.zero_rtt_enabled = true;
        self.zero_rtt_crypto = Some(ZeroRttCrypto { header, packet });
    }
2068
    /// Ingest a CRYPTO frame received in `space`
    ///
    /// Rejects new handshake data arriving at an unexpected (earlier) encryption
    /// level, enforces the configured crypto buffer limit, then feeds any newly
    /// contiguous bytes to the TLS layer, emitting `HandshakeDataReady` events as
    /// appropriate.
    fn read_crypto(
        &mut self,
        space: SpaceId,
        crypto: &frame::Crypto,
        payload_len: usize,
    ) -> Result<(), TransportError> {
        let expected = if !self.state.is_handshake() {
            SpaceId::Data
        } else if self.highest_space == SpaceId::Initial {
            SpaceId::Initial
        } else {
            // On the server, self.highest_space can be Data after receiving the client's first
            // flight, but we expect Handshake CRYPTO until the handshake is complete.
            SpaceId::Handshake
        };
        // We can't decrypt Handshake packets when highest_space is Initial, CRYPTO frames in 0-RTT
        // packets are illegal, and we don't process 1-RTT packets until the handshake is
        // complete. Therefore, we will never see CRYPTO data from a later-than-expected space.
        debug_assert!(space <= expected, "received out-of-order CRYPTO data");

        // Retransmissions of already-consumed data from an earlier space are tolerated;
        // only genuinely new bytes at the wrong level are a violation.
        let end = crypto.offset + crypto.data.len() as u64;
        if space < expected && end > self.spaces[space].crypto_stream.bytes_read() {
            warn!(
                "received new {:?} CRYPTO data when expecting {:?}",
                space, expected
            );
            return Err(TransportError::PROTOCOL_VIOLATION(
                "new data at unexpected encryption level",
            ));
        }

        // Bound the amount of un-consumed CRYPTO data we are willing to buffer.
        let space = &mut self.spaces[space];
        let max = end.saturating_sub(space.crypto_stream.bytes_read());
        if max > self.config.crypto_buffer_size as u64 {
            return Err(TransportError::CRYPTO_BUFFER_EXCEEDED(""));
        }

        space
            .crypto_stream
            .insert(crypto.offset, crypto.data.clone(), payload_len);
        while let Some(chunk) = space.crypto_stream.read(usize::MAX, true) {
            trace!("consumed {} CRYPTO bytes", chunk.bytes.len());
            if self.crypto.read_handshake(&chunk.bytes)? {
                self.events.push_back(Event::HandshakeDataReady);
            }
        }

        Ok(())
    }
2118
    /// Drain outgoing handshake bytes from the TLS layer into pending CRYPTO frames
    ///
    /// Installs new keys as the crypto layer produces them (which may advance the
    /// highest space mid-loop), and records the client's first Initial CRYPTO
    /// bytes in `state.client_hello`.
    fn write_crypto(&mut self) {
        loop {
            let space = self.highest_space;
            let mut outgoing = Vec::new();
            if let Some(crypto) = self.crypto.write_handshake(&mut outgoing) {
                match space {
                    SpaceId::Initial => {
                        self.upgrade_crypto(SpaceId::Handshake, crypto);
                    }
                    SpaceId::Handshake => {
                        self.upgrade_crypto(SpaceId::Data, crypto);
                    }
                    _ => unreachable!("got updated secrets during 1-RTT"),
                }
            }
            if outgoing.is_empty() {
                if space == self.highest_space {
                    break;
                } else {
                    // Keys updated, check for more data to send
                    continue;
                }
            }
            let offset = self.spaces[space].crypto_offset;
            let outgoing = Bytes::from(outgoing);
            // The first Initial CRYPTO bytes from a client are its ClientHello; keep a copy.
            if let State::Handshake(ref mut state) = self.state {
                if space == SpaceId::Initial && offset == 0 && self.side.is_client() {
                    state.client_hello = Some(outgoing.clone());
                }
            }
            self.spaces[space].crypto_offset += outgoing.len() as u64;
            trace!("wrote {} {:?} CRYPTO bytes", outgoing.len(), space);
            self.spaces[space].pending.crypto.push_back(frame::Crypto {
                offset,
                data: outgoing,
            });
        }
    }
2157
    /// Switch to stronger cryptography during handshake
    ///
    /// Installs `crypto` as the keys for `space`, advances `highest_space`, and on
    /// reaching the Data space precomputes the first 1-RTT key update (and drops
    /// the client's now-obsolete 0-RTT keys).
    fn upgrade_crypto(&mut self, space: SpaceId, crypto: Keys) {
        debug_assert!(
            self.spaces[space].crypto.is_none(),
            "already reached packet space {space:?}"
        );
        trace!("{:?} keys ready", space);
        if space == SpaceId::Data {
            // Precompute the first key update
            self.next_crypto = Some(
                self.crypto
                    .next_1rtt_keys()
                    .expect("handshake should be complete"),
            );
        }

        self.spaces[space].crypto = Some(crypto);
        // Key upgrades must be monotonic across spaces.
        debug_assert!(space as usize > self.highest_space as usize);
        self.highest_space = space;
        if space == SpaceId::Data && self.side.is_client() {
            // Discard 0-RTT keys because 1-RTT keys are available.
            self.zero_rtt_crypto = None;
        }
    }
2182
2183    fn discard_space(&mut self, now: Instant, space_id: SpaceId) {
2184        debug_assert!(space_id != SpaceId::Data);
2185        trace!("discarding {:?} keys", space_id);
2186        if space_id == SpaceId::Initial {
2187            // No longer needed
2188            if let ConnectionSide::Client { token, .. } = &mut self.side {
2189                *token = Bytes::new();
2190            }
2191        }
2192        let space = &mut self.spaces[space_id];
2193        space.crypto = None;
2194        space.time_of_last_ack_eliciting_packet = None;
2195        space.loss_time = None;
2196        let sent_packets = mem::take(&mut space.sent_packets);
2197        for packet in sent_packets.into_values() {
2198            self.remove_in_flight(&packet);
2199        }
2200        self.set_loss_detection_timer(now)
2201    }
2202
2203    fn handle_coalesced(
2204        &mut self,
2205        now: Instant,
2206        remote: SocketAddr,
2207        ecn: Option<EcnCodepoint>,
2208        data: BytesMut,
2209    ) {
2210        self.path.total_recvd = self.path.total_recvd.saturating_add(data.len() as u64);
2211        let mut remaining = Some(data);
2212        while let Some(data) = remaining {
2213            match PartialDecode::new(
2214                data,
2215                &FixedLengthConnectionIdParser::new(self.local_cid_state.cid_len()),
2216                &[self.version],
2217                self.endpoint_config.grease_quic_bit,
2218            ) {
2219                Ok((partial_decode, rest)) => {
2220                    remaining = rest;
2221                    self.handle_decode(now, remote, ecn, partial_decode);
2222                }
2223                Err(e) => {
2224                    trace!("malformed header: {}", e);
2225                    return;
2226                }
2227            }
2228        }
2229    }
2230
2231    fn handle_decode(
2232        &mut self,
2233        now: Instant,
2234        remote: SocketAddr,
2235        ecn: Option<EcnCodepoint>,
2236        partial_decode: PartialDecode,
2237    ) {
2238        if let Some(decoded) = packet_crypto::unprotect_header(
2239            partial_decode,
2240            &self.spaces,
2241            self.zero_rtt_crypto.as_ref(),
2242            self.peer_params.stateless_reset_token,
2243        ) {
2244            self.handle_packet(now, remote, ecn, decoded.packet, decoded.stateless_reset);
2245        }
2246    }
2247
    /// Decrypt, screen, and dispatch a single packet, then apply any resulting state transition
    ///
    /// `packet` is `None` when header unprotection yielded no parseable packet (e.g. a possible
    /// stateless reset); `stateless_reset` is set when the datagram matched the peer's reset
    /// token. Errors produced by packet processing are translated here into the corresponding
    /// connection state (`Closed`, `Draining`, or `Drained`).
    fn handle_packet(
        &mut self,
        now: Instant,
        remote: SocketAddr,
        ecn: Option<EcnCodepoint>,
        packet: Option<Packet>,
        stateless_reset: bool,
    ) {
        self.stats.udp_rx.ios += 1;
        if let Some(ref packet) = packet {
            trace!(
                "got {:?} packet ({} bytes) from {} using id {}",
                packet.header.space(),
                packet.payload.len() + packet.header_data.len(),
                remote,
                packet.header.dst_cid(),
            );
        }

        // Migration is not permitted before the handshake completes
        if self.is_handshaking() && remote != self.path.remote {
            debug!("discarding packet with unexpected remote during handshake");
            return;
        }

        // Snapshot state flags so we can detect transitions caused by this packet below
        let was_closed = self.state.is_closed();
        let was_drained = self.state.is_drained();

        // Err(None) = no packet / authentication failure; Err(Some(e)) = decrypt reported a
        // transport error
        let decrypted = match packet {
            None => Err(None),
            Some(mut packet) => self
                .decrypt_packet(now, &mut packet)
                .map(move |number| (packet, number)),
        };
        let result = match decrypted {
            // A stateless reset overrides whatever else the datagram looked like
            _ if stateless_reset => {
                debug!("got stateless reset");
                Err(ConnectionError::Reset)
            }
            Err(Some(e)) => {
                warn!("illegal packet: {}", e);
                Err(e.into())
            }
            Err(None) => {
                debug!("failed to authenticate packet");
                // Enforce the AEAD integrity limit on failed decryptions (RFC 9001 §6.6)
                self.authentication_failures += 1;
                let integrity_limit = self.spaces[self.highest_space]
                    .crypto
                    .as_ref()
                    .unwrap()
                    .packet
                    .local
                    .integrity_limit();
                if self.authentication_failures > integrity_limit {
                    Err(TransportError::AEAD_LIMIT_REACHED("integrity limit violated").into())
                } else {
                    return;
                }
            }
            Ok((packet, number)) => {
                let span = match number {
                    Some(pn) => trace_span!("recv", space = ?packet.header.space(), pn),
                    None => trace_span!("recv", space = ?packet.header.space()),
                };
                let _guard = span.enter();

                // NOTE(review): relies on `dedup.insert(n)` returning true when `n` was already
                // recorded — confirm against the spaces module
                let is_duplicate = |n| self.spaces[packet.header.space()].dedup.insert(n);
                if number.is_some_and(is_duplicate) {
                    debug!("discarding possible duplicate packet");
                    return;
                } else if self.state.is_handshake() && packet.header.is_short() {
                    // TODO: SHOULD buffer these to improve reordering tolerance.
                    trace!("dropping short packet during handshake");
                    return;
                } else {
                    if let Header::Initial(InitialHeader { ref token, .. }) = packet.header {
                        if let State::Handshake(ref hs) = self.state {
                            if self.side.is_server() && token != &hs.expected_token {
                                // Clients must send the same retry token in every Initial. Initial
                                // packets can be spoofed, so we discard rather than killing the
                                // connection.
                                warn!("discarding Initial with invalid retry token");
                                return;
                            }
                        }
                    }

                    if !self.state.is_closed() {
                        let spin = match packet.header {
                            Header::Short { spin, .. } => spin,
                            _ => false,
                        };
                        self.on_packet_authenticated(
                            now,
                            packet.header.space(),
                            ecn,
                            number,
                            spin,
                            packet.header.is_1rtt(),
                        );
                    }

                    self.process_decrypted_packet(now, remote, number, packet)
                }
            }
        };

        // State transitions for error cases
        if let Err(conn_err) = result {
            self.error = Some(conn_err.clone());
            self.state = match conn_err {
                ConnectionError::ApplicationClosed(reason) => State::closed(reason),
                ConnectionError::ConnectionClosed(reason) => State::closed(reason),
                // Resets and AEAD-limit violations skip the closing period entirely
                ConnectionError::Reset
                | ConnectionError::TransportError(TransportError {
                    code: TransportErrorCode::AEAD_LIMIT_REACHED,
                    ..
                }) => State::Drained,
                ConnectionError::TimedOut => {
                    unreachable!("timeouts aren't generated by packet processing");
                }
                ConnectionError::TransportError(err) => {
                    debug!("closing connection due to transport error: {}", err);
                    State::closed(err)
                }
                ConnectionError::VersionMismatch => State::Draining,
                ConnectionError::LocallyClosed => {
                    unreachable!("LocallyClosed isn't generated by packet processing");
                }
                ConnectionError::CidsExhausted => {
                    unreachable!("CidsExhausted isn't generated by packet processing");
                }
                ConnectionError::JlsAuthFailed(_) => {
                    unreachable!("JLSAuthFailed isn't generated by packet processing");
                }
                ConnectionError::JlsForwardError(_) => {
                    unreachable!("JlsForwardError isn't generated by packet processing");
                }
            };
        }

        // React to transitions this packet caused, using the snapshots taken above
        if !was_closed && self.state.is_closed() {
            self.close_common();
            if !self.state.is_drained() {
                self.set_close_timer(now);
            }
        }
        if !was_drained && self.state.is_drained() {
            self.endpoint_events.push_back(EndpointEventInner::Drained);
            // Close timer may have been started previously, e.g. if we sent a close and got a
            // stateless reset in response
            self.timers.stop(Timer::Close);
        }

        // Transmit CONNECTION_CLOSE if necessary
        if let State::Closed(_) = self.state {
            self.close = remote == self.path.remote;
        }
    }
2406
    /// Dispatch an authenticated packet according to connection state and header type
    ///
    /// In `Established`, payloads go straight to the payload processors. In `Closed`, only a
    /// peer CONNECTION_CLOSE is acted on (moving us to `Draining`). During the handshake, the
    /// header type selects Retry, Initial, Handshake, 0-RTT, or Version Negotiation handling.
    /// Returns a `ConnectionError` when the packet forces the connection into an error state.
    fn process_decrypted_packet(
        &mut self,
        now: Instant,
        remote: SocketAddr,
        number: Option<u64>,
        packet: Packet,
    ) -> Result<(), ConnectionError> {
        let state = match self.state {
            State::Established => {
                match packet.header.space() {
                    SpaceId::Data => self.process_payload(now, remote, number.unwrap(), packet)?,
                    // Late handshake-space packets may still carry ACKs/CRYPTO we care about
                    _ if packet.header.has_frames() => self.process_early_payload(now, packet)?,
                    _ => {
                        trace!("discarding unexpected pre-handshake packet");
                    }
                }
                return Ok(());
            }
            State::Closed(_) => {
                // While closed we only scan for a CONNECTION_CLOSE from the peer; everything
                // else in the packet is ignored
                for result in frame::Iter::new(packet.payload.freeze())? {
                    let frame = match result {
                        Ok(frame) => frame,
                        Err(err) => {
                            debug!("frame decoding error: {err:?}");
                            continue;
                        }
                    };

                    if let Frame::Padding = frame {
                        continue;
                    };

                    self.stats.frame_rx.record(&frame);

                    if let Frame::Close(_) = frame {
                        trace!("draining");
                        self.state = State::Draining;
                        break;
                    }
                }
                return Ok(());
            }
            State::Draining | State::Drained => return Ok(()),
            State::Handshake(ref mut state) => state,
        };

        // From here on we are mid-handshake; `state` borrows the handshake-specific data
        match packet.header {
            Header::Retry {
                src_cid: rem_cid, ..
            } => {
                if self.side.is_server() {
                    return Err(TransportError::PROTOCOL_VIOLATION("client sent Retry").into());
                }

                if self.total_authed_packets > 1
                            || packet.payload.len() <= 16 // token + 16 byte tag
                            || !self.crypto.is_valid_retry(
                                &self.rem_cids.active(),
                                &packet.header_data,
                                &packet.payload,
                            )
                {
                    trace!("discarding invalid Retry");
                    // - After the client has received and processed an Initial or Retry
                    //   packet from the server, it MUST discard any subsequent Retry
                    //   packets that it receives.
                    // - A client MUST discard a Retry packet with a zero-length Retry Token
                    //   field.
                    // - Clients MUST discard Retry packets that have a Retry Integrity Tag
                    //   that cannot be validated
                    return Ok(());
                }

                trace!("retrying with CID {}", rem_cid);
                // The stashed ClientHello is retransmitted below with the new token/CID
                let client_hello = state.client_hello.take().unwrap();
                self.retry_src_cid = Some(rem_cid);
                self.rem_cids.update_initial_cid(rem_cid);
                self.rem_handshake_cid = rem_cid;

                let space = &mut self.spaces[SpaceId::Initial];
                if let Some(info) = space.take(0) {
                    self.on_packet_acked(now, info);
                };

                self.discard_space(now, SpaceId::Initial); // Make sure we clean up after any retransmitted Initials
                // Re-key the Initial space against the server's new CID, preserving the
                // packet number sequence
                self.spaces[SpaceId::Initial] = PacketSpace {
                    crypto: Some(self.crypto.initial_keys(&rem_cid, self.side.side())),
                    next_packet_number: self.spaces[SpaceId::Initial].next_packet_number,
                    crypto_offset: client_hello.len() as u64,
                    ..PacketSpace::new(now)
                };
                self.spaces[SpaceId::Initial]
                    .pending
                    .crypto
                    .push_back(frame::Crypto {
                        offset: 0,
                        data: client_hello,
                    });

                // Retransmit all 0-RTT data
                let zero_rtt = mem::take(&mut self.spaces[SpaceId::Data].sent_packets);
                for info in zero_rtt.into_values() {
                    self.remove_in_flight(&info);
                    self.spaces[SpaceId::Data].pending |= info.retransmits;
                }
                self.streams.retransmit_all_for_0rtt();

                // Payload = retry token followed by a 16-byte integrity tag
                let token_len = packet.payload.len() - 16;
                let ConnectionSide::Client { ref mut token, .. } = self.side else {
                    unreachable!("we already short-circuited if we're server");
                };
                *token = packet.payload.freeze().split_to(token_len);
                self.state = State::Handshake(state::Handshake {
                    expected_token: Bytes::new(),
                    rem_cid_set: false,
                    client_hello: None,
                });
                Ok(())
            }
            Header::Long {
                ty: LongType::Handshake,
                src_cid: rem_cid,
                ..
            } => {
                if rem_cid != self.rem_handshake_cid {
                    debug!(
                        "discarding packet with mismatched remote CID: {} != {}",
                        self.rem_handshake_cid, rem_cid
                    );
                    return Ok(());
                }
                self.on_path_validated();

                self.process_early_payload(now, packet)?;
                if self.state.is_closed() {
                    return Ok(());
                }

                if self.crypto.is_handshaking() {
                    trace!("handshake ongoing");
                    return Ok(());
                }

                // Handshake just completed; finish connection establishment
                if self.side.is_client() {
                    // Client-only because server params were set from the client's Initial
                    let params =
                        self.crypto
                            .transport_parameters()?
                            .ok_or_else(|| TransportError {
                                code: TransportErrorCode::crypto(0x6d),
                                frame: None,
                                reason: "transport parameters missing".into(),
                            })?;

                    if self.has_0rtt() {
                        if !self.crypto.early_data_accepted().unwrap() {
                            debug_assert!(self.side.is_client());
                            debug!("0-RTT rejected");
                            self.accepted_0rtt = false;
                            self.streams.zero_rtt_rejected();

                            // Discard already-queued frames
                            self.spaces[SpaceId::Data].pending = Retransmits::default();

                            // Discard 0-RTT packets
                            let sent_packets =
                                mem::take(&mut self.spaces[SpaceId::Data].sent_packets);
                            for packet in sent_packets.into_values() {
                                self.remove_in_flight(&packet);
                            }
                        } else {
                            self.accepted_0rtt = true;
                            params.validate_resumption_from(&self.peer_params)?;
                        }
                    }
                    if let Some(token) = params.stateless_reset_token {
                        self.endpoint_events
                            .push_back(EndpointEventInner::ResetToken(self.path.remote, token));
                    }
                    self.handle_peer_params(params)?;
                    self.issue_first_cids(now);
                } else {
                    // Server-only
                    self.spaces[SpaceId::Data].pending.handshake_done = true;
                    self.discard_space(now, SpaceId::Handshake);
                }

                self.events.push_back(Event::Connected);
                self.state = State::Established;
                trace!("established");
                Ok(())
            }
            Header::Initial(InitialHeader {
                src_cid: rem_cid, ..
            }) => {
                if !state.rem_cid_set {
                    // First authenticated Initial from the peer fixes the remote CID
                    trace!("switching remote CID to {}", rem_cid);
                    let mut state = state.clone();
                    self.rem_cids.update_initial_cid(rem_cid);
                    self.rem_handshake_cid = rem_cid;
                    self.orig_rem_cid = rem_cid;
                    state.rem_cid_set = true;
                    self.state = State::Handshake(state);
                } else if rem_cid != self.rem_handshake_cid {
                    debug!(
                        "discarding packet with mismatched remote CID: {} != {}",
                        self.rem_handshake_cid, rem_cid
                    );
                    return Ok(());
                }

                let starting_space = self.highest_space;
                self.process_early_payload(now, packet)?;

                // Space advanced past Initial: the server now has the client's transport
                // parameters available from the TLS layer
                if self.side.is_server()
                    && starting_space == SpaceId::Initial
                    && self.highest_space != SpaceId::Initial
                {
                    let params =
                        self.crypto
                            .transport_parameters()?
                            .ok_or_else(|| TransportError {
                                code: TransportErrorCode::crypto(0x6d),
                                frame: None,
                                reason: "transport parameters missing".into(),
                            })?;
                    self.handle_peer_params(params)?;
                    self.issue_first_cids(now);
                    self.init_0rtt();
                }
                Ok(())
            }
            Header::Long {
                ty: LongType::ZeroRtt,
                ..
            } => {
                self.process_payload(now, remote, number.unwrap(), packet)?;
                Ok(())
            }
            Header::VersionNegotiate { .. } => {
                // Only actionable before any other authenticated packet has been processed
                if self.total_authed_packets > 1 {
                    return Ok(());
                }
                // Payload is a list of 32-bit versions the server supports
                let supported = packet
                    .payload
                    .chunks(4)
                    .any(|x| match <[u8; 4]>::try_from(x) {
                        Ok(version) => self.version == u32::from_be_bytes(version),
                        Err(_) => false,
                    });
                if supported {
                    return Ok(());
                }
                debug!("remote doesn't support our version");
                Err(ConnectionError::VersionMismatch)
            }
            Header::Short { .. } => unreachable!(
                "short packets received during handshake are discarded in handle_packet"
            ),
        }
    }
2668
2669    /// Process an Initial or Handshake packet payload
2670    fn process_early_payload(
2671        &mut self,
2672        now: Instant,
2673        packet: Packet,
2674    ) -> Result<(), TransportError> {
2675        debug_assert_ne!(packet.header.space(), SpaceId::Data);
2676        let payload_len = packet.payload.len();
2677        let mut ack_eliciting = false;
2678        for result in frame::Iter::new(packet.payload.freeze())? {
2679            let frame = result?;
2680            let span = match frame {
2681                Frame::Padding => continue,
2682                _ => Some(trace_span!("frame", ty = %frame.ty())),
2683            };
2684
2685            self.stats.frame_rx.record(&frame);
2686
2687            let _guard = span.as_ref().map(|x| x.enter());
2688            ack_eliciting |= frame.is_ack_eliciting();
2689
2690            // Process frames
2691            match frame {
2692                Frame::Padding | Frame::Ping => {}
2693                Frame::Crypto(frame) => {
2694                    self.read_crypto(packet.header.space(), &frame, payload_len)?;
2695                }
2696                Frame::Ack(ack) => {
2697                    self.on_ack_received(now, packet.header.space(), ack)?;
2698                }
2699                Frame::Close(reason) => {
2700                    self.error = Some(reason.into());
2701                    self.state = State::Draining;
2702                    return Ok(());
2703                }
2704                _ => {
2705                    let mut err =
2706                        TransportError::PROTOCOL_VIOLATION("illegal frame type in handshake");
2707                    err.frame = Some(frame.ty());
2708                    return Err(err);
2709                }
2710            }
2711        }
2712
2713        if ack_eliciting {
2714            // In the initial and handshake spaces, ACKs must be sent immediately
2715            self.spaces[packet.header.space()]
2716                .pending_acks
2717                .set_immediate_ack_required();
2718        }
2719
2720        // Stop sending serverhello if jls authentication failed
2721        match (self.crypto.is_jls(),self.crypto.is_jls_enabled()) {
2722            (Some(true), true) => {
2723                debug!("JLS authenticated");
2724            }
2725            (Some(false), true) => {
2726                warn!("JLS authentication falied");
2727                if self.side() == Side::Server {
2728                    return Ok(());
2729                }
2730            }
2731            (None, true) => {
2732                warn!("JLS not authenticated");
2733            }
2734            (_, false) => {
2735                debug!("JLS disabled");
2736            }
2737        }
2738
2739        self.write_crypto();
2740        Ok(())
2741    }
2742
2743    fn process_payload(
2744        &mut self,
2745        now: Instant,
2746        remote: SocketAddr,
2747        number: u64,
2748        packet: Packet,
2749    ) -> Result<(), TransportError> {
2750        let payload = packet.payload.freeze();
2751        let mut is_probing_packet = true;
2752        let mut close = None;
2753        let payload_len = payload.len();
2754        let mut ack_eliciting = false;
2755        for result in frame::Iter::new(payload)? {
2756            let frame = result?;
2757            let span = match frame {
2758                Frame::Padding => continue,
2759                _ => Some(trace_span!("frame", ty = %frame.ty())),
2760            };
2761
2762            self.stats.frame_rx.record(&frame);
2763            // Crypto, Stream and Datagram frames are special cased in order no pollute
2764            // the log with payload data
2765            match &frame {
2766                Frame::Crypto(f) => {
2767                    trace!(offset = f.offset, len = f.data.len(), "got crypto frame");
2768                }
2769                Frame::Stream(f) => {
2770                    trace!(id = %f.id, offset = f.offset, len = f.data.len(), fin = f.fin, "got stream frame");
2771                }
2772                Frame::Datagram(f) => {
2773                    trace!(len = f.data.len(), "got datagram frame");
2774                }
2775                f => {
2776                    trace!("got frame {:?}", f);
2777                }
2778            }
2779
2780            let _guard = span.as_ref().map(|x| x.enter());
2781            if packet.header.is_0rtt() {
2782                match frame {
2783                    Frame::Crypto(_) | Frame::Close(Close::Application(_)) => {
2784                        return Err(TransportError::PROTOCOL_VIOLATION(
2785                            "illegal frame type in 0-RTT",
2786                        ));
2787                    }
2788                    _ => {}
2789                }
2790            }
2791            ack_eliciting |= frame.is_ack_eliciting();
2792
2793            // Check whether this could be a probing packet
2794            match frame {
2795                Frame::Padding
2796                | Frame::PathChallenge(_)
2797                | Frame::PathResponse(_)
2798                | Frame::NewConnectionId(_) => {}
2799                _ => {
2800                    is_probing_packet = false;
2801                }
2802            }
2803            match frame {
2804                Frame::Crypto(frame) => {
2805                    self.read_crypto(SpaceId::Data, &frame, payload_len)?;
2806                }
2807                Frame::Stream(frame) => {
2808                    if self.streams.received(frame, payload_len)?.should_transmit() {
2809                        self.spaces[SpaceId::Data].pending.max_data = true;
2810                    }
2811                }
2812                Frame::Ack(ack) => {
2813                    self.on_ack_received(now, SpaceId::Data, ack)?;
2814                }
2815                Frame::Padding | Frame::Ping => {}
2816                Frame::Close(reason) => {
2817                    close = Some(reason);
2818                }
2819                Frame::PathChallenge(token) => {
2820                    self.path_responses.push(number, token, remote);
2821                    if remote == self.path.remote {
2822                        // PATH_CHALLENGE on active path, possible off-path packet forwarding
2823                        // attack. Send a non-probing packet to recover the active path.
2824                        match self.peer_supports_ack_frequency() {
2825                            true => self.immediate_ack(),
2826                            false => self.ping(),
2827                        }
2828                    }
2829                }
2830                Frame::PathResponse(token) => {
2831                    if self.path.challenge == Some(token) && remote == self.path.remote {
2832                        trace!("new path validated");
2833                        self.timers.stop(Timer::PathValidation);
2834                        self.path.challenge = None;
2835                        self.path.validated = true;
2836                        if let Some((_, ref mut prev_path)) = self.prev_path {
2837                            prev_path.challenge = None;
2838                            prev_path.challenge_pending = false;
2839                        }
2840                    } else {
2841                        debug!(token, "ignoring invalid PATH_RESPONSE");
2842                    }
2843                }
2844                Frame::MaxData(bytes) => {
2845                    self.streams.received_max_data(bytes);
2846                }
2847                Frame::MaxStreamData { id, offset } => {
2848                    self.streams.received_max_stream_data(id, offset)?;
2849                }
2850                Frame::MaxStreams { dir, count } => {
2851                    self.streams.received_max_streams(dir, count)?;
2852                }
2853                Frame::ResetStream(frame) => {
2854                    if self.streams.received_reset(frame)?.should_transmit() {
2855                        self.spaces[SpaceId::Data].pending.max_data = true;
2856                    }
2857                }
2858                Frame::DataBlocked { offset } => {
2859                    debug!(offset, "peer claims to be blocked at connection level");
2860                }
2861                Frame::StreamDataBlocked { id, offset } => {
2862                    if id.initiator() == self.side.side() && id.dir() == Dir::Uni {
2863                        debug!("got STREAM_DATA_BLOCKED on send-only {}", id);
2864                        return Err(TransportError::STREAM_STATE_ERROR(
2865                            "STREAM_DATA_BLOCKED on send-only stream",
2866                        ));
2867                    }
2868                    debug!(
2869                        stream = %id,
2870                        offset, "peer claims to be blocked at stream level"
2871                    );
2872                }
2873                Frame::StreamsBlocked { dir, limit } => {
2874                    if limit > MAX_STREAM_COUNT {
2875                        return Err(TransportError::FRAME_ENCODING_ERROR(
2876                            "unrepresentable stream limit",
2877                        ));
2878                    }
2879                    debug!(
2880                        "peer claims to be blocked opening more than {} {} streams",
2881                        limit, dir
2882                    );
2883                }
2884                Frame::StopSending(frame::StopSending { id, error_code }) => {
2885                    if id.initiator() != self.side.side() {
2886                        if id.dir() == Dir::Uni {
2887                            debug!("got STOP_SENDING on recv-only {}", id);
2888                            return Err(TransportError::STREAM_STATE_ERROR(
2889                                "STOP_SENDING on recv-only stream",
2890                            ));
2891                        }
2892                    } else if self.streams.is_local_unopened(id) {
2893                        return Err(TransportError::STREAM_STATE_ERROR(
2894                            "STOP_SENDING on unopened stream",
2895                        ));
2896                    }
2897                    self.streams.received_stop_sending(id, error_code);
2898                }
2899                Frame::RetireConnectionId { sequence } => {
2900                    let allow_more_cids = self
2901                        .local_cid_state
2902                        .on_cid_retirement(sequence, self.peer_params.issue_cids_limit())?;
2903                    self.endpoint_events
2904                        .push_back(EndpointEventInner::RetireConnectionId(
2905                            now,
2906                            sequence,
2907                            allow_more_cids,
2908                        ));
2909                }
2910                Frame::NewConnectionId(frame) => {
2911                    trace!(
2912                        sequence = frame.sequence,
2913                        id = %frame.id,
2914                        retire_prior_to = frame.retire_prior_to,
2915                    );
2916                    if self.rem_cids.active().is_empty() {
2917                        return Err(TransportError::PROTOCOL_VIOLATION(
2918                            "NEW_CONNECTION_ID when CIDs aren't in use",
2919                        ));
2920                    }
2921                    if frame.retire_prior_to > frame.sequence {
2922                        return Err(TransportError::PROTOCOL_VIOLATION(
2923                            "NEW_CONNECTION_ID retiring unissued CIDs",
2924                        ));
2925                    }
2926
2927                    use crate::cid_queue::InsertError;
2928                    match self.rem_cids.insert(frame) {
2929                        Ok(None) => {}
2930                        Ok(Some((retired, reset_token))) => {
2931                            let pending_retired =
2932                                &mut self.spaces[SpaceId::Data].pending.retire_cids;
2933                            /// Ensure `pending_retired` cannot grow without bound. Limit is
2934                            /// somewhat arbitrary but very permissive.
2935                            const MAX_PENDING_RETIRED_CIDS: u64 = CidQueue::LEN as u64 * 10;
2936                            // We don't bother counting in-flight frames because those are bounded
2937                            // by congestion control.
2938                            if (pending_retired.len() as u64)
2939                                .saturating_add(retired.end.saturating_sub(retired.start))
2940                                > MAX_PENDING_RETIRED_CIDS
2941                            {
2942                                return Err(TransportError::CONNECTION_ID_LIMIT_ERROR(
2943                                    "queued too many retired CIDs",
2944                                ));
2945                            }
2946                            pending_retired.extend(retired);
2947                            self.set_reset_token(reset_token);
2948                        }
2949                        Err(InsertError::ExceedsLimit) => {
2950                            return Err(TransportError::CONNECTION_ID_LIMIT_ERROR(""));
2951                        }
2952                        Err(InsertError::Retired) => {
2953                            trace!("discarding already-retired");
2954                            // RETIRE_CONNECTION_ID might not have been previously sent if e.g. a
2955                            // range of connection IDs larger than the active connection ID limit
2956                            // was retired all at once via retire_prior_to.
2957                            self.spaces[SpaceId::Data]
2958                                .pending
2959                                .retire_cids
2960                                .push(frame.sequence);
2961                            continue;
2962                        }
2963                    };
2964
2965                    if self.side.is_server() && self.rem_cids.active_seq() == 0 {
2966                        // We're a server still using the initial remote CID for the client, so
2967                        // let's switch immediately to enable clientside stateless resets.
2968                        self.update_rem_cid();
2969                    }
2970                }
2971                Frame::NewToken(NewToken { token }) => {
2972                    let ConnectionSide::Client {
2973                        token_store,
2974                        server_name,
2975                        ..
2976                    } = &self.side
2977                    else {
2978                        return Err(TransportError::PROTOCOL_VIOLATION("client sent NEW_TOKEN"));
2979                    };
2980                    if token.is_empty() {
2981                        return Err(TransportError::FRAME_ENCODING_ERROR("empty token"));
2982                    }
2983                    trace!("got new token");
2984                    token_store.insert(server_name, token);
2985                }
2986                Frame::Datagram(datagram) => {
2987                    if self
2988                        .datagrams
2989                        .received(datagram, &self.config.datagram_receive_buffer_size)?
2990                    {
2991                        self.events.push_back(Event::DatagramReceived);
2992                    }
2993                }
2994                Frame::AckFrequency(ack_frequency) => {
2995                    // This frame can only be sent in the Data space
2996                    let space = &mut self.spaces[SpaceId::Data];
2997
2998                    if !self
2999                        .ack_frequency
3000                        .ack_frequency_received(&ack_frequency, &mut space.pending_acks)?
3001                    {
3002                        // The AckFrequency frame is stale (we have already received a more recent one)
3003                        continue;
3004                    }
3005
3006                    // Our `max_ack_delay` has been updated, so we may need to adjust its associated
3007                    // timeout
3008                    if let Some(timeout) = space
3009                        .pending_acks
3010                        .max_ack_delay_timeout(self.ack_frequency.max_ack_delay)
3011                    {
3012                        self.timers.set(Timer::MaxAckDelay, timeout);
3013                    }
3014                }
3015                Frame::ImmediateAck => {
3016                    // This frame can only be sent in the Data space
3017                    self.spaces[SpaceId::Data]
3018                        .pending_acks
3019                        .set_immediate_ack_required();
3020                }
3021                Frame::HandshakeDone => {
3022                    if self.side.is_server() {
3023                        return Err(TransportError::PROTOCOL_VIOLATION(
3024                            "client sent HANDSHAKE_DONE",
3025                        ));
3026                    }
3027                    if self.spaces[SpaceId::Handshake].crypto.is_some() {
3028                        self.discard_space(now, SpaceId::Handshake);
3029                    }
3030                }
3031            }
3032        }
3033
3034        let space = &mut self.spaces[SpaceId::Data];
3035        if space
3036            .pending_acks
3037            .packet_received(now, number, ack_eliciting, &space.dedup)
3038        {
3039            self.timers
3040                .set(Timer::MaxAckDelay, now + self.ack_frequency.max_ack_delay);
3041        }
3042
3043        // Issue stream ID credit due to ACKs of outgoing finish/resets and incoming finish/resets
3044        // on stopped streams. Incoming finishes/resets on open streams are not handled here as they
3045        // are only freed, and hence only issue credit, once the application has been notified
3046        // during a read on the stream.
3047        let pending = &mut self.spaces[SpaceId::Data].pending;
3048        self.streams.queue_max_stream_id(pending);
3049
3050        if let Some(reason) = close {
3051            self.error = Some(reason.into());
3052            self.state = State::Draining;
3053            self.close = true;
3054        }
3055
3056        if remote != self.path.remote
3057            && !is_probing_packet
3058            && number == self.spaces[SpaceId::Data].rx_packet
3059        {
3060            let ConnectionSide::Server { ref server_config } = self.side else {
3061                panic!("packets from unknown remote should be dropped by clients");
3062            };
3063            debug_assert!(
3064                server_config.migration,
3065                "migration-initiating packets should have been dropped immediately"
3066            );
3067            self.migrate(now, remote);
3068            // Break linkability, if possible
3069            self.update_rem_cid();
3070            self.spin = false;
3071        }
3072
3073        Ok(())
3074    }
3075
    /// Handle a migration of the peer to a new remote address
    ///
    /// Installs a fresh `PathData` for `remote`, queues a PATH_CHALLENGE on the
    /// new path (and on the previous path too, if it was never validated), and
    /// arms the path-validation timer.
    fn migrate(&mut self, now: Instant, remote: SocketAddr) {
        trace!(%remote, "migration initiated");
        self.path_counter = self.path_counter.wrapping_add(1);
        // Reset rtt/congestion state for new path unless it looks like a NAT rebinding.
        // Note that the congestion window will not grow until validation terminates. Helps mitigate
        // amplification attacks performed by spoofing source addresses.
        let mut new_path = if remote.is_ipv4() && remote.ip() == self.path.remote.ip() {
            // Same IPv4 address with a new port looks like a NAT rebinding, so
            // carry over the previous path's state rather than starting fresh
            PathData::from_previous(remote, &self.path, self.path_counter, now)
        } else {
            let peer_max_udp_payload_size =
                u16::try_from(self.peer_params.max_udp_payload_size.into_inner())
                    .unwrap_or(u16::MAX);
            PathData::new(
                remote,
                self.allow_mtud,
                Some(peer_max_udp_payload_size),
                self.path_counter,
                now,
                &self.config,
            )
        };
        // Queue a PATH_CHALLENGE so the new path gets validated
        new_path.challenge = Some(self.rng.random());
        new_path.challenge_pending = true;
        // Capture the PTO while the previous path's state is still current
        let prev_pto = self.pto(SpaceId::Data);

        let mut prev = mem::replace(&mut self.path, new_path);
        // Don't clobber the original path if the previous one hasn't been validated yet
        if prev.challenge.is_none() {
            prev.challenge = Some(self.rng.random());
            prev.challenge_pending = true;
            // We haven't updated the remote CID yet, this captures the remote CID we were using on
            // the previous path.
            self.prev_path = Some((self.rem_cids.active(), prev));
        }

        // Deadline uses the larger of the old and new paths' PTO estimates
        self.timers.set(
            Timer::PathValidation,
            now + 3 * cmp::max(self.pto(SpaceId::Data), prev_pto),
        );
    }
3116
    /// Handle a change in the local address, i.e. an active migration
    ///
    /// Rotates to a previously unused remote CID, if one is available, to
    /// avoid linkability across the address change, and issues a ping so a
    /// packet is sent promptly from the new address.
    pub fn local_address_changed(&mut self) {
        self.update_rem_cid();
        self.ping();
    }
3122
3123    /// Switch to a previously unused remote connection ID, if possible
3124    fn update_rem_cid(&mut self) {
3125        let (reset_token, retired) = match self.rem_cids.next() {
3126            Some(x) => x,
3127            None => return,
3128        };
3129
3130        // Retire the current remote CID and any CIDs we had to skip.
3131        self.spaces[SpaceId::Data]
3132            .pending
3133            .retire_cids
3134            .extend(retired);
3135        self.set_reset_token(reset_token);
3136    }
3137
3138    fn set_reset_token(&mut self, reset_token: ResetToken) {
3139        self.endpoint_events
3140            .push_back(EndpointEventInner::ResetToken(
3141                self.path.remote,
3142                reset_token,
3143            ));
3144        self.peer_params.stateless_reset_token = Some(reset_token);
3145    }
3146
3147    /// Issue an initial set of connection IDs to the peer upon connection
3148    fn issue_first_cids(&mut self, now: Instant) {
3149        if self.local_cid_state.cid_len() == 0 {
3150            return;
3151        }
3152
3153        // Subtract 1 to account for the CID we supplied while handshaking
3154        let mut n = self.peer_params.issue_cids_limit() - 1;
3155        if let ConnectionSide::Server { server_config } = &self.side {
3156            if server_config.has_preferred_address() {
3157                // We also sent a CID in the transport parameters
3158                n -= 1;
3159            }
3160        }
3161        self.endpoint_events
3162            .push_back(EndpointEventInner::NeedIdentifiers(now, n));
3163    }
3164
    /// Append pending frames for `space_id` to `buf`, keeping `buf.len()` under `max_size`
    ///
    /// Frames are written in a fixed priority order (HANDSHAKE_DONE, PING, …,
    /// STREAM last), each gated on remaining buffer space. Per-frame transmit
    /// statistics are updated as frames are written.
    ///
    /// * `now` - current time, used for ACK delay and ACK_FREQUENCY computations
    /// * `space_id` - packet number space being populated
    /// * `buf` - output buffer; frames are appended after its current contents
    /// * `max_size` - upper bound that `buf.len()` must not exceed
    /// * `pn` - packet number of the packet under construction (recorded when an
    ///   ACK_FREQUENCY frame is sent)
    ///
    /// Returns a `SentFrames` summary of what was written, used by the caller
    /// for loss and retransmission tracking.
    fn populate_packet(
        &mut self,
        now: Instant,
        space_id: SpaceId,
        buf: &mut Vec<u8>,
        max_size: usize,
        pn: u64,
    ) -> SentFrames {
        let mut sent = SentFrames::default();
        let space = &mut self.spaces[space_id];
        // 0-RTT is the Data space before 1-RTT keys exist; HANDSHAKE_DONE, ACK,
        // and CRYPTO frames are suppressed in 0-RTT below
        let is_0rtt = space_id == SpaceId::Data && space.crypto.is_none();
        space.pending_acks.maybe_ack_non_eliciting();

        // HANDSHAKE_DONE
        if !is_0rtt && mem::replace(&mut space.pending.handshake_done, false) {
            buf.write(frame::FrameType::HANDSHAKE_DONE);
            sent.retransmits.get_or_create().handshake_done = true;
            // This is just a u8 counter and the frame is typically just sent once
            self.stats.frame_tx.handshake_done =
                self.stats.frame_tx.handshake_done.saturating_add(1);
        }

        // PING
        if mem::replace(&mut space.ping_pending, false) {
            trace!("PING");
            buf.write(frame::FrameType::PING);
            sent.non_retransmits = true;
            self.stats.frame_tx.ping += 1;
        }

        // IMMEDIATE_ACK
        if mem::replace(&mut space.immediate_ack_pending, false) {
            trace!("IMMEDIATE_ACK");
            buf.write(frame::FrameType::IMMEDIATE_ACK);
            sent.non_retransmits = true;
            self.stats.frame_tx.immediate_ack += 1;
        }

        // ACK
        if space.pending_acks.can_send() {
            Self::populate_acks(
                now,
                self.receiving_ecn,
                &mut sent,
                space,
                buf,
                &mut self.stats,
            );
        }

        // ACK_FREQUENCY
        if mem::replace(&mut space.pending.ack_frequency, false) {
            let sequence_number = self.ack_frequency.next_sequence_number();

            // Safe to unwrap because this is always provided when ACK frequency is enabled
            let config = self.config.ack_frequency_config.as_ref().unwrap();

            // Ensure the delay is within bounds to avoid a PROTOCOL_VIOLATION error
            let max_ack_delay = self.ack_frequency.candidate_max_ack_delay(
                self.path.rtt.get(),
                config,
                &self.peer_params,
            );

            trace!(?max_ack_delay, "ACK_FREQUENCY");

            frame::AckFrequency {
                sequence: sequence_number,
                ack_eliciting_threshold: config.ack_eliciting_threshold,
                request_max_ack_delay: max_ack_delay.as_micros().try_into().unwrap_or(VarInt::MAX),
                reordering_threshold: config.reordering_threshold,
            }
            .encode(buf);

            sent.retransmits.get_or_create().ack_frequency = true;

            self.ack_frequency.ack_frequency_sent(pn, max_ack_delay);
            self.stats.frame_tx.ack_frequency += 1;
        }

        // PATH_CHALLENGE
        // 9 = frame type (1 byte) + 8-byte challenge token
        if buf.len() + 9 < max_size && space_id == SpaceId::Data {
            // Transmit challenges with every outgoing frame on an unvalidated path
            if let Some(token) = self.path.challenge {
                // But only send a packet solely for that purpose at most once
                self.path.challenge_pending = false;
                sent.non_retransmits = true;
                sent.requires_padding = true;
                trace!("PATH_CHALLENGE {:08x}", token);
                buf.write(frame::FrameType::PATH_CHALLENGE);
                buf.write(token);
                self.stats.frame_tx.path_challenge += 1;
            }
        }

        // PATH_RESPONSE
        // 9 = frame type (1 byte) + 8-byte token echoed from a PATH_CHALLENGE
        if buf.len() + 9 < max_size && space_id == SpaceId::Data {
            if let Some(token) = self.path_responses.pop_on_path(self.path.remote) {
                sent.non_retransmits = true;
                sent.requires_padding = true;
                trace!("PATH_RESPONSE {:08x}", token);
                buf.write(frame::FrameType::PATH_RESPONSE);
                buf.write(token);
                self.stats.frame_tx.path_response += 1;
            }
        }

        // CRYPTO
        while buf.len() + frame::Crypto::SIZE_BOUND < max_size && !is_0rtt {
            let mut frame = match space.pending.crypto.pop_front() {
                Some(x) => x,
                None => break,
            };

            // Calculate the maximum amount of crypto data we can store in the buffer.
            // Since the offset is known, we can reserve the exact size required to encode it.
            // For length we reserve 2bytes which allows to encode up to 2^14,
            // which is more than what fits into normally sized QUIC frames.
            let max_crypto_data_size = max_size
                - buf.len()
                - 1 // Frame Type
                - VarInt::size(unsafe { VarInt::from_u64_unchecked(frame.offset) })
                - 2; // Maximum encoded length for frame size, given we send less than 2^14 bytes

            let len = frame
                .data
                .len()
                .min(2usize.pow(14) - 1)
                .min(max_crypto_data_size);

            // Split off the chunk that fits; any remainder is re-queued below
            let data = frame.data.split_to(len);
            let truncated = frame::Crypto {
                offset: frame.offset,
                data,
            };
            trace!(
                "CRYPTO: off {} len {}",
                truncated.offset,
                truncated.data.len()
            );
            truncated.encode(buf);
            self.stats.frame_tx.crypto += 1;
            sent.retransmits.get_or_create().crypto.push_back(truncated);
            if !frame.data.is_empty() {
                frame.offset += len as u64;
                space.pending.crypto.push_front(frame);
            }
        }

        // Stream control frames (RESET_STREAM, STOP_SENDING, MAX_DATA, ...)
        if space_id == SpaceId::Data {
            self.streams.write_control_frames(
                buf,
                &mut space.pending,
                &mut sent.retransmits,
                &mut self.stats.frame_tx,
                max_size,
            );
        }

        // NEW_CONNECTION_ID
        while buf.len() + NewConnectionId::SIZE_BOUND < max_size {
            let issued = match space.pending.new_cids.pop() {
                Some(x) => x,
                None => break,
            };
            trace!(
                sequence = issued.sequence,
                id = %issued.id,
                "NEW_CONNECTION_ID"
            );
            frame::NewConnectionId {
                sequence: issued.sequence,
                retire_prior_to: self.local_cid_state.retire_prior_to(),
                id: issued.id,
                reset_token: issued.reset_token,
            }
            .encode(buf);
            sent.retransmits.get_or_create().new_cids.push(issued);
            self.stats.frame_tx.new_connection_id += 1;
        }

        // RETIRE_CONNECTION_ID
        while buf.len() + frame::RETIRE_CONNECTION_ID_SIZE_BOUND < max_size {
            let seq = match space.pending.retire_cids.pop() {
                Some(x) => x,
                None => break,
            };
            trace!(sequence = seq, "RETIRE_CONNECTION_ID");
            buf.write(frame::FrameType::RETIRE_CONNECTION_ID);
            buf.write_var(seq);
            sent.retransmits.get_or_create().retire_cids.push(seq);
            self.stats.frame_tx.retire_connection_id += 1;
        }

        // DATAGRAM
        let mut sent_datagrams = false;
        while buf.len() + Datagram::SIZE_BOUND < max_size && space_id == SpaceId::Data {
            match self.datagrams.write(buf, max_size) {
                true => {
                    sent_datagrams = true;
                    sent.non_retransmits = true;
                    self.stats.frame_tx.datagram += 1;
                }
                false => break,
            }
        }
        // If the application was blocked on datagram sends, draining queued
        // datagrams above means it can make progress again
        if self.datagrams.send_blocked && sent_datagrams {
            self.events.push_back(Event::DatagramsUnblocked);
            self.datagrams.send_blocked = false;
        }

        // NEW_TOKEN
        while let Some(remote_addr) = space.pending.new_tokens.pop() {
            debug_assert_eq!(space_id, SpaceId::Data);
            let ConnectionSide::Server { server_config } = &self.side else {
                panic!("NEW_TOKEN frames should not be enqueued by clients");
            };

            if remote_addr != self.path.remote {
                // NEW_TOKEN frames contain tokens bound to a client's IP address, and are only
                // useful if used from the same IP address.  Thus, we abandon enqueued NEW_TOKEN
                // frames upon an path change. Instead, when the new path becomes validated,
                // NEW_TOKEN frames may be enqueued for the new path instead.
                continue;
            }

            let token = Token::new(
                TokenPayload::Validation {
                    ip: remote_addr.ip(),
                    issued: server_config.time_source.now(),
                },
                &mut self.rng,
            );
            let new_token = NewToken {
                token: token.encode(&*server_config.token_key).into(),
            };

            // Token sizes vary, so the space check happens after encoding; if
            // it doesn't fit, put the queue entry back and stop
            if buf.len() + new_token.size() >= max_size {
                space.pending.new_tokens.push(remote_addr);
                break;
            }

            new_token.encode(buf);
            sent.retransmits
                .get_or_create()
                .new_tokens
                .push(remote_addr);
            self.stats.frame_tx.new_token += 1;
        }

        // STREAM
        if space_id == SpaceId::Data {
            sent.stream_frames =
                self.streams
                    .write_stream_frames(buf, max_size, self.config.send_fairness);
            self.stats.frame_tx.stream += sent.stream_frames.len() as u64;
        }

        sent
    }
3425
3426    /// Write pending ACKs into a buffer
3427    ///
3428    /// This method assumes ACKs are pending, and should only be called if
3429    /// `!PendingAcks::ranges().is_empty()` returns `true`.
3430    fn populate_acks(
3431        now: Instant,
3432        receiving_ecn: bool,
3433        sent: &mut SentFrames,
3434        space: &mut PacketSpace,
3435        buf: &mut Vec<u8>,
3436        stats: &mut ConnectionStats,
3437    ) {
3438        debug_assert!(!space.pending_acks.ranges().is_empty());
3439
3440        // 0-RTT packets must never carry acks (which would have to be of handshake packets)
3441        debug_assert!(space.crypto.is_some(), "tried to send ACK in 0-RTT");
3442        let ecn = if receiving_ecn {
3443            Some(&space.ecn_counters)
3444        } else {
3445            None
3446        };
3447        sent.largest_acked = space.pending_acks.ranges().max();
3448
3449        let delay_micros = space.pending_acks.ack_delay(now).as_micros() as u64;
3450
3451        // TODO: This should come from `TransportConfig` if that gets configurable.
3452        let ack_delay_exp = TransportParameters::default().ack_delay_exponent;
3453        let delay = delay_micros >> ack_delay_exp.into_inner();
3454
3455        trace!(
3456            "ACK {:?}, Delay = {}us",
3457            space.pending_acks.ranges(),
3458            delay_micros
3459        );
3460
3461        frame::Ack::encode(delay as _, space.pending_acks.ranges(), ecn, buf);
3462        stats.frame_tx.acks += 1;
3463    }
3464
3465    fn close_common(&mut self) {
3466        trace!("connection closed");
3467        for &timer in &Timer::VALUES {
3468            self.timers.stop(timer);
3469        }
3470    }
3471
3472    fn set_close_timer(&mut self, now: Instant) {
3473        self.timers
3474            .set(Timer::Close, now + 3 * self.pto(self.highest_space));
3475    }
3476
3477    /// Handle transport parameters received from the peer
3478    fn handle_peer_params(&mut self, params: TransportParameters) -> Result<(), TransportError> {
3479        if Some(self.orig_rem_cid) != params.initial_src_cid
3480            || (self.side.is_client()
3481                && (Some(self.initial_dst_cid) != params.original_dst_cid
3482                    || self.retry_src_cid != params.retry_src_cid))
3483        {
3484            return Err(TransportError::TRANSPORT_PARAMETER_ERROR(
3485                "CID authentication failure",
3486            ));
3487        }
3488
3489        self.set_peer_params(params);
3490
3491        Ok(())
3492    }
3493
    /// Adopt validated transport parameters from the peer
    ///
    /// Note the ordering: several pieces of state are derived from `params`
    /// before it is stored in `self.peer_params`, and the MTU discovery update
    /// at the end reads the stored copy.
    fn set_peer_params(&mut self, params: TransportParameters) {
        self.streams.set_params(&params);
        self.idle_timeout =
            negotiate_max_idle_timeout(self.config.max_idle_timeout, Some(params.max_idle_timeout));
        trace!("negotiated max idle timeout {:?}", self.idle_timeout);
        if let Some(ref info) = params.preferred_address {
            // The CID carried in the preferred_address parameter uses sequence number 1
            self.rem_cids.insert(frame::NewConnectionId {
                sequence: 1,
                id: info.connection_id,
                reset_token: info.stateless_reset_token,
                retire_prior_to: 0,
            }).expect("preferred address CID is the first received, and hence is guaranteed to be legal");
        }
        self.ack_frequency.peer_max_ack_delay = get_max_ack_delay(&params);
        self.peer_params = params;
        // Tell MTU discovery about the peer's payload size limit, clamped into u16 range
        self.path.mtud.on_peer_max_udp_payload_size_received(
            u16::try_from(self.peer_params.max_udp_payload_size.into_inner()).unwrap_or(u16::MAX),
        );
    }
3513
3514    fn decrypt_packet(
3515        &mut self,
3516        now: Instant,
3517        packet: &mut Packet,
3518    ) -> Result<Option<u64>, Option<TransportError>> {
3519        let result = packet_crypto::decrypt_packet_body(
3520            packet,
3521            &self.spaces,
3522            self.zero_rtt_crypto.as_ref(),
3523            self.key_phase,
3524            self.prev_crypto.as_ref(),
3525            self.next_crypto.as_ref(),
3526        )?;
3527
3528        let result = match result {
3529            Some(r) => r,
3530            None => return Ok(None),
3531        };
3532
3533        if result.outgoing_key_update_acked {
3534            if let Some(prev) = self.prev_crypto.as_mut() {
3535                prev.end_packet = Some((result.number, now));
3536                self.set_key_discard_timer(now, packet.header.space());
3537            }
3538        }
3539
3540        if result.incoming_key_update {
3541            trace!("key update authenticated");
3542            self.update_keys(Some((result.number, now)), true);
3543            self.set_key_discard_timer(now, packet.header.space());
3544        }
3545
3546        Ok(Some(result.number))
3547    }
3548
    /// Perform a 1-RTT key update
    ///
    /// * `end_packet` - for remote-initiated updates, the number and receipt
    ///   time of the packet that triggered the update, recorded on the
    ///   outgoing `prev_crypto`
    /// * `remote` - whether the peer initiated the update; stored as
    ///   `update_unacked` on the previous keys
    fn update_keys(&mut self, end_packet: Option<(u64, Instant)>, remote: bool) {
        trace!("executing key update");
        // Generate keys for the key phase after the one we're switching to, store them in
        // `next_crypto`, make the contents of `next_crypto` current, and move the current keys into
        // `prev_crypto`.
        let new = self
            .crypto
            .next_1rtt_keys()
            .expect("only called for `Data` packets");
        // Refresh the confidentiality budget for the new keys, keeping a margin
        // so the next update happens before the limit is actually reached
        self.key_phase_size = new
            .local
            .confidentiality_limit()
            .saturating_sub(KEY_UPDATE_MARGIN);
        let old = mem::replace(
            &mut self.spaces[SpaceId::Data]
                .crypto
                .as_mut()
                .unwrap() // safe because update_keys() can only be triggered by short packets
                .packet,
            mem::replace(self.next_crypto.as_mut().unwrap(), new),
        );
        self.spaces[SpaceId::Data].sent_with_keys = 0;
        self.prev_crypto = Some(PrevCrypto {
            crypto: old,
            end_packet,
            update_unacked: remote,
        });
        // Flip the key phase bit
        self.key_phase = !self.key_phase;
    }
3578
    /// Whether the peer advertised support for the ACK frequency extension
    ///
    /// Detected by the presence of the `min_ack_delay` transport parameter.
    fn peer_supports_ack_frequency(&self) -> bool {
        self.peer_params.min_ack_delay.is_some()
    }
3582
    /// Send an IMMEDIATE_ACK frame to the remote endpoint
    ///
    /// According to the spec, this will result in an error if the remote endpoint does not support
    /// the Acknowledgement Frequency extension
    ///
    /// The frame is only queued here; it is written out by `populate_packet`
    /// the next time a packet is built for the highest packet number space.
    pub(crate) fn immediate_ack(&mut self) {
        self.spaces[self.highest_space].immediate_ack_pending = true;
    }
3590
3591    /// Decodes a packet, returning its decrypted payload, so it can be inspected in tests
3592    #[cfg(test)]
3593    pub(crate) fn decode_packet(&self, event: &ConnectionEvent) -> Option<Vec<u8>> {
3594        let (first_decode, remaining) = match &event.0 {
3595            ConnectionEventInner::Datagram(DatagramConnectionEvent {
3596                first_decode,
3597                remaining,
3598                ..
3599            }) => (first_decode, remaining),
3600            _ => return None,
3601        };
3602
3603        if remaining.is_some() {
3604            panic!("Packets should never be coalesced in tests");
3605        }
3606
3607        let decrypted_header = packet_crypto::unprotect_header(
3608            first_decode.clone(),
3609            &self.spaces,
3610            self.zero_rtt_crypto.as_ref(),
3611            self.peer_params.stateless_reset_token,
3612        )?;
3613
3614        let mut packet = decrypted_header.packet?;
3615        packet_crypto::decrypt_packet_body(
3616            &mut packet,
3617            &self.spaces,
3618            self.zero_rtt_crypto.as_ref(),
3619            self.key_phase,
3620            self.prev_crypto.as_ref(),
3621            self.next_crypto.as_ref(),
3622        )
3623        .ok()?;
3624
3625        Some(packet.payload.to_vec())
3626    }
3627
    /// The number of bytes of packets containing retransmittable frames that have not been
    /// acknowledged or declared lost.
    ///
    /// Read from the current path's in-flight accounting.
    #[cfg(test)]
    pub(crate) fn bytes_in_flight(&self) -> u64 {
        self.path.in_flight.bytes
    }
3634
3635    /// Number of bytes worth of non-ack-only packets that may be sent
3636    #[cfg(test)]
3637    pub(crate) fn congestion_window(&self) -> u64 {
3638        self.path
3639            .congestion
3640            .window()
3641            .saturating_sub(self.path.in_flight.bytes)
3642    }
3643
3644    /// Whether no timers but keepalive, idle, rtt, pushnewcid, and key discard are running
3645    #[cfg(test)]
3646    pub(crate) fn is_idle(&self) -> bool {
3647        Timer::VALUES
3648            .iter()
3649            .filter(|&&t| !matches!(t, Timer::KeepAlive | Timer::PushNewCid | Timer::KeyDiscard))
3650            .filter_map(|&t| Some((t, self.timers.get(t)?)))
3651            .min_by_key(|&(_, time)| time)
3652            .map_or(true, |(timer, _)| timer == Timer::Idle)
3653    }
3654
3655    /// Whether explicit congestion notification is in use on outgoing packets.
3656    #[cfg(test)]
3657    pub(crate) fn using_ecn(&self) -> bool {
3658        self.path.sending_ecn
3659    }
3660
3661    /// The number of received bytes in the current path
3662    #[cfg(test)]
3663    pub(crate) fn total_recvd(&self) -> u64 {
3664        self.path.total_recvd
3665    }
3666
    /// The sequence-number range of locally issued CIDs currently active
    ///
    /// NOTE(review): delegates to `CidState::active_seq`; presumably returns
    /// (lowest, highest) active sequence numbers — confirm against `cid_state`.
    #[cfg(test)]
    pub(crate) fn active_local_cid_seq(&self) -> (u64, u64) {
        self.local_cid_state.active_seq()
    }
3671
3672    /// Instruct the peer to replace previously issued CIDs by sending a NEW_CONNECTION_ID frame
3673    /// with updated `retire_prior_to` field set to `v`
3674    #[cfg(test)]
3675    pub(crate) fn rotate_local_cid(&mut self, v: u64, now: Instant) {
3676        let n = self.local_cid_state.assign_retire_seq(v);
3677        self.endpoint_events
3678            .push_back(EndpointEventInner::NeedIdentifiers(now, n));
3679    }
3680
    /// Check the current active remote CID sequence
    ///
    /// NOTE(review): delegates to `CidQueue::active_seq`; presumably the sequence
    /// number of the remote CID currently in use — confirm against `cid_queue`.
    #[cfg(test)]
    pub(crate) fn active_rem_cid_seq(&self) -> u64 {
        self.rem_cids.active_seq()
    }
3686
3687    /// Returns the detected maximum udp payload size for the current path
3688    #[cfg(test)]
3689    pub(crate) fn path_mtu(&self) -> u16 {
3690        self.path.current_mtu()
3691    }
3692
3693    /// Whether we have 1-RTT data to send
3694    ///
3695    /// See also `self.space(SpaceId::Data).can_send()`
3696    fn can_send_1rtt(&self, max_size: usize) -> bool {
3697        self.streams.can_send_stream_data()
3698            || self.path.challenge_pending
3699            || self
3700                .prev_path
3701                .as_ref()
3702                .is_some_and(|(_, x)| x.challenge_pending)
3703            || !self.path_responses.is_empty()
3704            || self
3705                .datagrams
3706                .outgoing
3707                .front()
3708                .is_some_and(|x| x.size(true) <= max_size)
3709    }
3710
3711    /// Update counters to account for a packet becoming acknowledged, lost, or abandoned
3712    fn remove_in_flight(&mut self, packet: &SentPacket) {
3713        // Visit known paths from newest to oldest to find the one `packet` was sent on
3714        for path in [&mut self.path]
3715            .into_iter()
3716            .chain(self.prev_path.as_mut().map(|(_, data)| data))
3717        {
3718            if path.remove_in_flight(packet) {
3719                return;
3720            }
3721        }
3722    }
3723
    /// Terminate the connection instantly, without sending a close packet
    ///
    /// Records `reason` for delivery to the application, jumps straight to the
    /// `Drained` state (bypassing the closing/draining periods), and notifies the
    /// endpoint so the connection's resources can be reclaimed.
    fn kill(&mut self, reason: ConnectionError) {
        self.close_common();
        self.error = Some(reason);
        self.state = State::Drained;
        self.endpoint_events.push_back(EndpointEventInner::Drained);
    }
3731
3732    /// Storage size required for the largest packet known to be supported by the current path
3733    ///
3734    /// Buffers passed to [`Connection::poll_transmit`] should be at least this large.
3735    pub fn current_mtu(&self) -> u16 {
3736        self.path.current_mtu()
3737    }
3738
3739    /// Size of non-frame data for a 1-RTT packet
3740    ///
3741    /// Quantifies space consumed by the QUIC header and AEAD tag. All other bytes in a packet are
3742    /// frames. Changes if the length of the remote connection ID changes, which is expected to be
3743    /// rare. If `pn` is specified, may additionally change unpredictably due to variations in
3744    /// latency and packet loss.
3745    fn predict_1rtt_overhead(&self, pn: Option<u64>) -> usize {
3746        let pn_len = match pn {
3747            Some(pn) => PacketNumber::new(
3748                pn,
3749                self.spaces[SpaceId::Data].largest_acked_packet.unwrap_or(0),
3750            )
3751            .len(),
3752            // Upper bound
3753            None => 4,
3754        };
3755
3756        // 1 byte for flags
3757        1 + self.rem_cids.active().len() + pn_len + self.tag_len_1rtt()
3758    }
3759
3760    fn tag_len_1rtt(&self) -> usize {
3761        let key = match self.spaces[SpaceId::Data].crypto.as_ref() {
3762            Some(crypto) => Some(&*crypto.packet.local),
3763            None => self.zero_rtt_crypto.as_ref().map(|x| &*x.packet),
3764        };
3765        // If neither Data nor 0-RTT keys are available, make a reasonable tag length guess. As of
3766        // this writing, all QUIC cipher suites use 16-byte tags. We could return `None` instead,
3767        // but that would needlessly prevent sending datagrams during 0-RTT.
3768        key.map_or(16, |x| x.tag_len())
3769    }
3770
3771    /// Mark the path as validated, and enqueue NEW_TOKEN frames to be sent as appropriate
3772    fn on_path_validated(&mut self) {
3773        self.path.validated = true;
3774        let ConnectionSide::Server { server_config } = &self.side else {
3775            return;
3776        };
3777        let new_tokens = &mut self.spaces[SpaceId::Data as usize].pending.new_tokens;
3778        new_tokens.clear();
3779        for _ in 0..server_config.validation_token.sent {
3780            new_tokens.push(self.path.remote);
3781        }
3782    }
3783}
3784
3785impl fmt::Debug for Connection {
3786    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
3787        f.debug_struct("Connection")
3788            .field("handshake_cid", &self.handshake_cid)
3789            .finish()
3790    }
3791}
3792
/// Fields of `Connection` specific to it being client-side or server-side
enum ConnectionSide {
    Client {
        /// Sent in every outgoing Initial packet. Always empty after Initial keys are discarded
        token: Bytes,
        /// Store from which the address-validation token for `server_name` was taken
        token_store: Arc<dyn TokenStore>,
        /// Name of the server this connection was initiated to
        server_name: String,
    },
    Server {
        /// Configuration the server accepted this connection under
        server_config: Arc<ServerConfig>,
    },
}
3805
3806impl ConnectionSide {
3807    fn remote_may_migrate(&self) -> bool {
3808        match self {
3809            Self::Server { server_config } => server_config.migration,
3810            Self::Client { .. } => false,
3811        }
3812    }
3813
3814    fn is_client(&self) -> bool {
3815        self.side().is_client()
3816    }
3817
3818    fn is_server(&self) -> bool {
3819        self.side().is_server()
3820    }
3821
3822    fn side(&self) -> Side {
3823        match *self {
3824            Self::Client { .. } => Side::Client,
3825            Self::Server { .. } => Side::Server,
3826        }
3827    }
3828}
3829
3830impl From<SideArgs> for ConnectionSide {
3831    fn from(side: SideArgs) -> Self {
3832        match side {
3833            SideArgs::Client {
3834                token_store,
3835                server_name,
3836            } => Self::Client {
3837                token: token_store.take(&server_name).unwrap_or_default(),
3838                token_store,
3839                server_name,
3840            },
3841            SideArgs::Server {
3842                server_config,
3843                pref_addr_cid: _,
3844                path_validated: _,
3845            } => Self::Server { server_config },
3846        }
3847    }
3848}
3849
/// Parameters to `Connection::new` specific to it being client-side or server-side
pub(crate) enum SideArgs {
    Client {
        /// Store from which a cached address-validation token for `server_name` is taken
        token_store: Arc<dyn TokenStore>,
        /// Name of the server being connected to
        server_name: String,
    },
    Server {
        /// The server's configuration
        server_config: Arc<ServerConfig>,
        /// CID issued for the server's preferred address, if configured
        pref_addr_cid: Option<ConnectionId>,
        /// Whether the peer's path is already considered validated
        path_validated: bool,
    },
}
3862
3863impl SideArgs {
3864    pub(crate) fn pref_addr_cid(&self) -> Option<ConnectionId> {
3865        match *self {
3866            Self::Client { .. } => None,
3867            Self::Server { pref_addr_cid, .. } => pref_addr_cid,
3868        }
3869    }
3870
3871    pub(crate) fn path_validated(&self) -> bool {
3872        match *self {
3873            Self::Client { .. } => true,
3874            Self::Server { path_validated, .. } => path_validated,
3875        }
3876    }
3877
3878    pub(crate) fn side(&self) -> Side {
3879        match *self {
3880            Self::Client { .. } => Side::Client,
3881            Self::Server { .. } => Side::Server,
3882        }
3883    }
3884}
3885
/// Reasons why a connection might be lost
#[derive(Debug, Error, Clone, PartialEq, Eq)]
pub enum ConnectionError {
    /// The peer doesn't implement any supported version
    #[error("peer doesn't implement any supported version")]
    VersionMismatch,
    /// The peer violated the QUIC specification as understood by this implementation
    #[error(transparent)]
    TransportError(#[from] TransportError),
    /// The peer's QUIC stack aborted the connection automatically
    #[error("aborted by peer: {0}")]
    ConnectionClosed(frame::ConnectionClose),
    /// The peer closed the connection
    #[error("closed by peer: {0}")]
    ApplicationClosed(frame::ApplicationClose),
    /// The peer is unable to continue processing this connection, usually due to having restarted
    #[error("reset by peer")]
    Reset,
    /// Communication with the peer has lapsed for longer than the negotiated idle timeout
    ///
    /// If neither side is sending keep-alives, a connection will time out after a long enough idle
    /// period even if the peer is still reachable. See also [`TransportConfig::max_idle_timeout()`]
    /// and [`TransportConfig::keep_alive_interval()`].
    #[error("timed out")]
    TimedOut,
    /// The local application closed the connection
    #[error("closed")]
    LocallyClosed,
    /// The connection could not be created because not enough of the CID space is available
    ///
    /// Try using longer connection IDs.
    #[error("CIDs exhausted")]
    CidsExhausted,

    /// JLS Authentication failed, return the upstream addr for forwarding
    ///
    /// Carries a [`JlsAuthInner`] naming the upstream server (if any) that the
    /// unauthenticated traffic should be forwarded to.
    #[error("JLS Authentication failed")]
    JlsAuthFailed(JlsAuthInner),

    /// Jls Forward error
    ///
    /// The `String` carries details about the forwarding failure.
    #[error("JLS Forward Error")]
    JlsForwardError(String),
}
3928
/// Details accompanying [`ConnectionError::JlsAuthFailed`]
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct JlsAuthInner {
    /// Address of the upstream server to forward unauthenticated traffic to, if any
    pub upstream_addr: Option<String>,
}
3933
3934impl From<Close> for ConnectionError {
3935    fn from(x: Close) -> Self {
3936        match x {
3937            Close::Connection(reason) => Self::ConnectionClosed(reason),
3938            Close::Application(reason) => Self::ApplicationClosed(reason),
3939        }
3940    }
3941}
3942
3943// For compatibility with API consumers
3944impl From<ConnectionError> for io::Error {
3945    fn from(x: ConnectionError) -> Self {
3946        use ConnectionError::*;
3947        let kind = match x {
3948            TimedOut => io::ErrorKind::TimedOut,
3949            Reset => io::ErrorKind::ConnectionReset,
3950            ApplicationClosed(_) | ConnectionClosed(_) => io::ErrorKind::ConnectionAborted,
3951            TransportError(_) | VersionMismatch | LocallyClosed | CidsExhausted => {
3952                io::ErrorKind::Other
3953            }
3954            _ => io::ErrorKind::Other,
3955        };
3956        Self::new(kind, x)
3957    }
3958}
3959
/// Coarse connection lifecycle state
#[allow(unreachable_pub)] // fuzzing only
#[derive(Clone)]
pub enum State {
    /// Cryptographic handshake in progress
    Handshake(state::Handshake),
    /// Handshake complete; application data may flow
    Established,
    /// Connection closed; carries the close reason
    Closed(state::Closed),
    /// Shutting down after the peer initiated a close
    Draining,
    /// Waiting for application to call close so we can dispose of the resources
    Drained,
}
3970
3971impl State {
3972    fn closed<R: Into<Close>>(reason: R) -> Self {
3973        Self::Closed(state::Closed {
3974            reason: reason.into(),
3975        })
3976    }
3977
3978    fn is_handshake(&self) -> bool {
3979        matches!(*self, Self::Handshake(_))
3980    }
3981
3982    fn is_established(&self) -> bool {
3983        matches!(*self, Self::Established)
3984    }
3985
3986    fn is_closed(&self) -> bool {
3987        matches!(*self, Self::Closed(_) | Self::Draining | Self::Drained)
3988    }
3989
3990    fn is_drained(&self) -> bool {
3991        matches!(*self, Self::Drained)
3992    }
3993}
3994
mod state {
    use super::*;

    /// Data carried by [`State::Handshake`]
    #[allow(unreachable_pub)] // fuzzing only
    #[derive(Clone)]
    pub struct Handshake {
        /// Whether the remote CID has been set by the peer yet
        ///
        /// Always set for servers
        pub(super) rem_cid_set: bool,
        /// Stateless retry token received in the first Initial by a server.
        ///
        /// Must be present in every Initial. Always empty for clients.
        pub(super) expected_token: Bytes,
        /// First cryptographic message
        ///
        /// Only set for clients
        pub(super) client_hello: Option<Bytes>,
    }

    /// Data carried by [`State::Closed`]
    #[allow(unreachable_pub)] // fuzzing only
    #[derive(Clone)]
    pub struct Closed {
        /// Why the connection was closed
        pub(super) reason: Close,
    }
}
4021
/// Events of interest to the application
#[derive(Debug)]
pub enum Event {
    /// The connection's handshake data is ready
    HandshakeDataReady,
    /// The connection was successfully established
    Connected,
    /// The connection was lost
    ///
    /// Emitted if the peer closes the connection or an error is encountered.
    ConnectionLost {
        /// Reason that the connection was closed
        reason: ConnectionError,
    },
    /// Stream events; see [`StreamEvent`] for the specific kinds
    Stream(StreamEvent),
    /// One or more application datagrams have been received
    DatagramReceived,
    /// One or more application datagrams have been sent after blocking
    DatagramsUnblocked,
}
4043
/// Returns `x - y`, clamping at zero when `y` is not earlier than `x`
fn instant_saturating_sub(x: Instant, y: Instant) -> Duration {
    if x <= y {
        Duration::ZERO
    } else {
        x - y
    }
}
4047
4048fn get_max_ack_delay(params: &TransportParameters) -> Duration {
4049    Duration::from_micros(params.max_ack_delay.0 * 1000)
4050}
4051
// Prevents overflow and improves behavior in extreme circumstances
// NOTE(review): presumably caps an exponential (2^n) backoff computation — confirm
// at the use site.
const MAX_BACKOFF_EXPONENT: u32 = 16;

/// Minimal remaining size to allow packet coalescing, excluding cryptographic tag
///
/// This must be at least as large as the header for a well-formed empty packet to be coalesced,
/// plus some space for frames. We only care about handshake headers because short header packets
/// necessarily have smaller headers, and initial packets are only ever the first packet in a
/// datagram (because we coalesce in ascending packet space order and the only reason to split a
/// packet is when packet space changes).
const MIN_PACKET_SPACE: usize = MAX_HANDSHAKE_OR_0RTT_HEADER_SIZE + 32;

/// Largest amount of space that could be occupied by a Handshake or 0-RTT packet's header
///
/// Excludes packet-type-specific fields such as packet number or Initial token
// https://www.rfc-editor.org/rfc/rfc9000.html#name-0-rtt: flags + version + dcid len + dcid +
// scid len + scid + length + pn
const MAX_HANDSHAKE_OR_0RTT_HEADER_SIZE: usize =
    1 + 4 + 1 + MAX_CID_SIZE + 1 + MAX_CID_SIZE + VarInt::from_u32(u16::MAX as u32).size() + 4;

/// Perform key updates this many packets before the AEAD confidentiality limit.
///
/// Chosen arbitrarily, intended to be large enough to prevent spurious connection loss.
const KEY_UPDATE_MARGIN: u64 = 10_000;
4076
/// Summary of the frames written into a single outgoing packet
#[derive(Default)]
struct SentFrames {
    /// Frame data to queue for retransmission if the packet is lost
    retransmits: ThinRetransmits,
    /// Set when an ACK frame was written: the largest packet number acknowledged
    largest_acked: Option<u64>,
    /// Metadata for the STREAM frames carried by this packet
    stream_frames: StreamMetaVec,
    /// Whether the packet contains non-retransmittable frames (like datagrams)
    non_retransmits: bool,
    /// Whether the packet must be padded
    /// NOTE(review): confirm the padding rule (e.g. minimum datagram size) at call sites
    requires_padding: bool,
}
4086
4087impl SentFrames {
4088    /// Returns whether the packet contains only ACKs
4089    fn is_ack_only(&self, streams: &StreamsState) -> bool {
4090        self.largest_acked.is_some()
4091            && !self.non_retransmits
4092            && self.stream_frames.is_empty()
4093            && self.retransmits.is_empty(streams)
4094    }
4095}
4096
4097/// Compute the negotiated idle timeout based on local and remote max_idle_timeout transport parameters.
4098///
4099/// According to the definition of max_idle_timeout, a value of `0` means the timeout is disabled; see <https://www.rfc-editor.org/rfc/rfc9000#section-18.2-4.4.1.>
4100///
4101/// According to the negotiation procedure, either the minimum of the timeouts or one specified is used as the negotiated value; see <https://www.rfc-editor.org/rfc/rfc9000#section-10.1-2.>
4102///
4103/// Returns the negotiated idle timeout as a `Duration`, or `None` when both endpoints have opted out of idle timeout.
4104fn negotiate_max_idle_timeout(x: Option<VarInt>, y: Option<VarInt>) -> Option<Duration> {
4105    match (x, y) {
4106        (Some(VarInt(0)) | None, Some(VarInt(0)) | None) => None,
4107        (Some(VarInt(0)) | None, Some(y)) => Some(Duration::from_millis(y.0)),
4108        (Some(x), Some(VarInt(0)) | None) => Some(Duration::from_millis(x.0)),
4109        (Some(x), Some(y)) => Some(Duration::from_millis(cmp::min(x, y).0)),
4110    }
4111}
4112
#[cfg(test)]
mod tests {
    use super::*;

    /// `negotiate_max_idle_timeout` must yield the same result regardless of argument order
    #[test]
    fn negotiate_max_idle_timeout_commutative() {
        // (left, right, expected negotiated timeout)
        let test_params = [
            (None, None, None),
            (None, Some(VarInt(0)), None),
            (None, Some(VarInt(2)), Some(Duration::from_millis(2))),
            (Some(VarInt(0)), Some(VarInt(0)), None),
            (
                Some(VarInt(2)),
                Some(VarInt(0)),
                Some(Duration::from_millis(2)),
            ),
            (
                Some(VarInt(1)),
                Some(VarInt(4)),
                Some(Duration::from_millis(1)),
            ),
        ];

        for (left, right, result) in test_params {
            // Check both argument orders to verify commutativity
            assert_eq!(negotiate_max_idle_timeout(left, right), result);
            assert_eq!(negotiate_max_idle_timeout(right, left), result);
        }
    }
}
4141}