ant_quic/connection/mod.rs

1use std::{
2    cmp,
3    collections::VecDeque,
4    convert::TryFrom,
5    fmt, io, mem,
6    net::{IpAddr, SocketAddr},
7    sync::Arc,
8};
9
10use bytes::{Bytes, BytesMut};
11use frame::StreamMetaVec;
12#[cfg(feature = "__qlog")]
13use qlog::{
14    Configuration, QLOG_VERSION, TraceSeq, VantagePoint, VantagePointType, events::EventData,
15    events::EventImportance, streamer::QlogStreamer,
16};
17
18use rand::{Rng, SeedableRng, rngs::StdRng};
19use thiserror::Error;
20use tracing::{debug, error, trace, trace_span, warn};
21
22use crate::{
23    Dir, Duration, EndpointConfig, Frame, INITIAL_MTU, Instant, MAX_CID_SIZE, MAX_STREAM_COUNT,
24    MIN_INITIAL_SIZE, Side, StreamId, TIMER_GRANULARITY, TokenStore, Transmit, TransportError,
25    TransportErrorCode, VarInt,
26    cid_generator::ConnectionIdGenerator,
27    cid_queue::CidQueue,
28    coding::BufMutExt,
29    config::{ServerConfig, TransportConfig},
30    crypto::{self, KeyPair, Keys, PacketKey},
31    frame::{self, Close, Datagram, FrameStruct, NewToken},
32    packet::{
33        FixedLengthConnectionIdParser, Header, InitialHeader, InitialPacket, LongType, Packet,
34        PacketNumber, PartialDecode, SpaceId,
35    },
36    range_set::ArrayRangeSet,
37    shared::{
38        ConnectionEvent, ConnectionEventInner, ConnectionId, DatagramConnectionEvent, EcnCodepoint,
39        EndpointEvent, EndpointEventInner,
40    },
41    token::{ResetToken, Token, TokenPayload},
42    transport_parameters::TransportParameters,
43};
44
45mod ack_frequency;
46use ack_frequency::AckFrequencyState;
47
48pub mod nat_traversal;
49pub use nat_traversal::{CoordinationPhase, NatTraversalError, NatTraversalRole};
50use nat_traversal::NatTraversalState;
51
52mod assembler;
53pub use assembler::Chunk;
54
55mod cid_state;
56use cid_state::CidState;
57
58mod datagrams;
59use datagrams::DatagramState;
60pub use datagrams::{Datagrams, SendDatagramError};
61
62mod mtud;
63mod pacing;
64
65mod packet_builder;
66use packet_builder::PacketBuilder;
67
68mod packet_crypto;
69use packet_crypto::{PrevCrypto, ZeroRttCrypto};
70
71mod paths;
72pub use paths::RttEstimator;
73use paths::{PathData, PathResponses};
74
75mod send_buffer;
76
77mod spaces;
78#[cfg(fuzzing)]
79pub use spaces::Retransmits;
80#[cfg(not(fuzzing))]
81use spaces::Retransmits;
82use spaces::{PacketNumberFilter, PacketSpace, SendableFrames, SentPacket, ThinRetransmits};
83
84mod stats;
85pub use stats::{ConnectionStats, FrameStats, PathStats, UdpStats};
86
87mod streams;
88#[cfg(fuzzing)]
89pub use streams::StreamsState;
90#[cfg(not(fuzzing))]
91use streams::StreamsState;
92pub use streams::{
93    Chunks, ClosedStream, FinishError, ReadError, ReadableError, RecvStream, SendStream,
94    ShouldTransmit, StreamEvent, Streams, WriteError, Written,
95};
96
97mod timer;
98use crate::congestion::Controller;
99use timer::{Timer, TimerTable};
100
101/// Protocol state and logic for a single QUIC connection
102///
103/// Objects of this type receive [`ConnectionEvent`]s and emit [`EndpointEvent`]s and application
104/// [`Event`]s to make progress. To handle timeouts, a `Connection` returns timer updates and
105/// expects timeouts through various methods. A number of simple getter methods are exposed
106/// to allow callers to inspect some of the connection state.
107///
108/// `Connection` has roughly 4 types of methods:
109///
110/// - A. Simple getters, taking `&self`
111/// - B. Handlers for incoming events from the network or system, named `handle_*`.
112/// - C. State machine mutators, for incoming commands from the application. For convenience we
113///   refer to this as "performing I/O" below; however, by the design of this library, none of
114///   these functions actually perform system-level I/O. Examples include [`read`](RecvStream::read)
115///   and [`write`](SendStream::write), but also things like [`reset`](SendStream::reset).
116/// - D. Polling functions for outgoing events or actions for the caller to
117///   take, named `poll_*`.
118///
119/// The simplest way to use this API correctly is to call (B) and (C) whenever
120/// appropriate, then, after each of those calls, call all polling methods (D) as soon as
121/// feasible and deal with their outputs appropriately, e.g. by passing them to the
122/// application or by making a system-level I/O call (see the sketch below). You
123/// should call the polling functions in this order:
124///
125/// 1. [`poll_transmit`](Self::poll_transmit)
126/// 2. [`poll_timeout`](Self::poll_timeout)
127/// 3. [`poll_endpoint_events`](Self::poll_endpoint_events)
128/// 4. [`poll`](Self::poll)
129///
130/// Currently the only actual dependency is from (2) to (1); however, additional
131/// dependencies may be added in the future, so the above order is recommended.
132///
133/// (A) may be called whenever desired.
134///
135/// Care should be taken to ensure that the input events represent monotonically
136/// increasing time. Specifically, calling [`handle_timeout`](Self::handle_timeout)
137/// with events of the same [`Instant`] may be interleaved in any order with a
138/// call to [`handle_event`](Self::handle_event) at that same instant; however
139/// events or timeouts with different instants must not be interleaved.
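///
/// A minimal sketch of such a driving loop, with socket, clock, and channel plumbing
/// elided; `send_datagram`, `deliver_to_endpoint`, and `deliver_to_app` are illustrative
/// helpers, not part of this crate:
///
/// ```ignore
/// let mut buf = Vec::new();
/// loop {
///     // (1) flush outgoing datagrams
///     while let Some(transmit) = conn.poll_transmit(now(), max_gso_segments, &mut buf) {
///         send_datagram(&transmit, &buf[..transmit.size]);
///         buf.clear();
///     }
///     // (2) re-arm the timer
///     let next_timeout = conn.poll_timeout();
///     // (3) forward endpoint-facing events
///     while let Some(event) = conn.poll_endpoint_events() {
///         deliver_to_endpoint(event);
///     }
///     // (4) surface application events
///     while let Some(event) = conn.poll() {
///         deliver_to_app(event);
///     }
///     // ... then wait for the next incoming datagram or for `next_timeout`,
///     // call `handle_event` / `handle_timeout` accordingly, and repeat.
/// }
/// ```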
140pub struct Connection {
141    endpoint_config: Arc<EndpointConfig>,
142    config: Arc<TransportConfig>,
143    rng: StdRng,
144    crypto: Box<dyn crypto::Session>,
145    /// The CID we initially chose, for use during the handshake
146    handshake_cid: ConnectionId,
147    /// The CID the peer initially chose, for use during the handshake
148    rem_handshake_cid: ConnectionId,
149    /// The "real" local IP address which was was used to receive the initial packet.
150    /// This is only populated for the server case, and if known
151    local_ip: Option<IpAddr>,
152    path: PathData,
153    /// Whether MTU detection is supported in this environment
154    allow_mtud: bool,
155    prev_path: Option<(ConnectionId, PathData)>,
156    state: State,
157    side: ConnectionSide,
158    /// Whether or not 0-RTT was enabled during the handshake. Does not imply acceptance.
159    zero_rtt_enabled: bool,
160    /// Set if 0-RTT is supported, then cleared when no longer needed.
161    zero_rtt_crypto: Option<ZeroRttCrypto>,
162    key_phase: bool,
163    /// How many packets are in the current key phase. Used only for `Data` space.
164    key_phase_size: u64,
165    /// Transport parameters set by the peer
166    peer_params: TransportParameters,
167    /// Source ConnectionId of the first packet received from the peer
168    orig_rem_cid: ConnectionId,
169    /// Destination ConnectionId sent by the client on the first Initial
170    initial_dst_cid: ConnectionId,
171    /// The value that the server included in the Source Connection ID field of a Retry packet, if
172    /// one was received
173    retry_src_cid: Option<ConnectionId>,
174    /// Total number of outgoing packets that have been deemed lost
175    lost_packets: u64,
176    events: VecDeque<Event>,
177    endpoint_events: VecDeque<EndpointEventInner>,
178    /// Whether the spin bit is in use for this connection
179    spin_enabled: bool,
180    /// Outgoing spin bit state
181    spin: bool,
182    /// Packet number spaces: initial, handshake, 1-RTT
183    spaces: [PacketSpace; 3],
184    /// Highest usable packet number space
185    highest_space: SpaceId,
186    /// 1-RTT keys used prior to a key update
187    prev_crypto: Option<PrevCrypto>,
188    /// 1-RTT keys to be used for the next key update
189    ///
190    /// These are generated in advance to prevent timing attacks and/or DoS by third-party attackers
191    /// spoofing key updates.
192    next_crypto: Option<KeyPair<Box<dyn PacketKey>>>,
193    accepted_0rtt: bool,
194    /// Whether the idle timer should be reset the next time an ack-eliciting packet is transmitted.
195    permit_idle_reset: bool,
196    /// Negotiated idle timeout
197    idle_timeout: Option<Duration>,
198    timers: TimerTable,
199    /// Number of packets received which could not be authenticated
200    authentication_failures: u64,
201    /// Why the connection was lost, if it has been
202    error: Option<ConnectionError>,
203    /// Identifies Data-space packet numbers to skip. Not used in earlier spaces.
204    packet_number_filter: PacketNumberFilter,
205
206    //
207    // Queued non-retransmittable 1-RTT data
208    //
209    /// Responses to PATH_CHALLENGE frames
210    path_responses: PathResponses,
211    close: bool,
212
213    //
214    // ACK frequency
215    //
216    ack_frequency: AckFrequencyState,
217
218    //
219    // Loss Detection
220    //
221    /// The number of times a PTO has been sent without receiving an ack.
222    pto_count: u32,
223
224    //
225    // Congestion Control
226    //
227    /// Whether the most recently received packet had an ECN codepoint set
228    receiving_ecn: bool,
229    /// Number of packets authenticated
230    total_authed_packets: u64,
231    /// Whether the last `poll_transmit` call yielded no data because there was
232    /// no outgoing application data.
233    app_limited: bool,
234
235    streams: StreamsState,
236    /// Surplus remote CIDs for future use on new paths
237    rem_cids: CidQueue,
238    /// Attributes of CIDs generated by the local peer
239    local_cid_state: CidState,
240    /// State of the unreliable datagram extension
241    datagrams: DatagramState,
242    /// Connection level statistics
243    stats: ConnectionStats,
244    /// QUIC version used for the connection.
245    version: u32,
246
247    /// NAT traversal state for establishing direct P2P connections
248    nat_traversal: Option<NatTraversalState>,
249
250    /// qlog streamer to output events as JSON text sequences
251    #[cfg(feature = "__qlog")]
252    qlog_streamer: Option<QlogStreamer>,
253}
254
255impl Connection {
256    pub(crate) fn new(
257        endpoint_config: Arc<EndpointConfig>,
258        config: Arc<TransportConfig>,
259        init_cid: ConnectionId,
260        loc_cid: ConnectionId,
261        rem_cid: ConnectionId,
262        remote: SocketAddr,
263        local_ip: Option<IpAddr>,
264        crypto: Box<dyn crypto::Session>,
265        cid_gen: &dyn ConnectionIdGenerator,
266        now: Instant,
267        version: u32,
268        allow_mtud: bool,
269        rng_seed: [u8; 32],
270        side_args: SideArgs,
271    ) -> Self {
272        let pref_addr_cid = side_args.pref_addr_cid();
273        let path_validated = side_args.path_validated();
274        let connection_side = ConnectionSide::from(side_args);
275        let side = connection_side.side();
276        let initial_space = PacketSpace {
277            crypto: Some(crypto.initial_keys(&init_cid, side)),
278            ..PacketSpace::new(now)
279        };
280        let state = State::Handshake(state::Handshake {
281            rem_cid_set: side.is_server(),
282            expected_token: Bytes::new(),
283            client_hello: None,
284        });
285        let mut rng = StdRng::from_seed(rng_seed);
286        let mut this = Self {
287            endpoint_config,
288            crypto,
289            handshake_cid: loc_cid,
290            rem_handshake_cid: rem_cid,
291            local_cid_state: CidState::new(
292                cid_gen.cid_len(),
293                cid_gen.cid_lifetime(),
294                now,
295                if pref_addr_cid.is_some() { 2 } else { 1 },
296            ),
297            path: PathData::new(remote, allow_mtud, None, now, &config),
298            allow_mtud,
299            local_ip,
300            prev_path: None,
301            state,
302            side: connection_side,
303            zero_rtt_enabled: false,
304            zero_rtt_crypto: None,
305            key_phase: false,
306            // A small initial key phase size ensures peers that don't handle key updates correctly
307            // fail sooner rather than later. It's okay for both peers to do this, as the first one
308            // to perform an update will reset the other's key phase size in `update_keys`, and a
309            // simultaneous key update by both is just like a regular key update with a really fast
310            // response. Inspired by quic-go's similar behavior of performing the first key update
311            // at the 100th short-header packet.
312            key_phase_size: rng.random_range(10..1000),
313            peer_params: TransportParameters::default(),
314            orig_rem_cid: rem_cid,
315            initial_dst_cid: init_cid,
316            retry_src_cid: None,
317            lost_packets: 0,
318            events: VecDeque::new(),
319            endpoint_events: VecDeque::new(),
320            spin_enabled: config.allow_spin && rng.random_ratio(7, 8),
321            spin: false,
322            spaces: [initial_space, PacketSpace::new(now), PacketSpace::new(now)],
323            highest_space: SpaceId::Initial,
324            prev_crypto: None,
325            next_crypto: None,
326            accepted_0rtt: false,
327            permit_idle_reset: true,
328            idle_timeout: match config.max_idle_timeout {
329                None | Some(VarInt(0)) => None,
330                Some(dur) => Some(Duration::from_millis(dur.0)),
331            },
332            timers: TimerTable::default(),
333            authentication_failures: 0,
334            error: None,
335            #[cfg(test)]
336            packet_number_filter: match config.deterministic_packet_numbers {
337                false => PacketNumberFilter::new(&mut rng),
338                true => PacketNumberFilter::disabled(),
339            },
340            #[cfg(not(test))]
341            packet_number_filter: PacketNumberFilter::new(&mut rng),
342
343            path_responses: PathResponses::default(),
344            close: false,
345
346            ack_frequency: AckFrequencyState::new(get_max_ack_delay(
347                &TransportParameters::default(),
348            )),
349
350            pto_count: 0,
351
352            app_limited: false,
353            receiving_ecn: false,
354            total_authed_packets: 0,
355
356            streams: StreamsState::new(
357                side,
358                config.max_concurrent_uni_streams,
359                config.max_concurrent_bidi_streams,
360                config.send_window,
361                config.receive_window,
362                config.stream_receive_window,
363            ),
364            datagrams: DatagramState::default(),
365            config,
366            rem_cids: CidQueue::new(rem_cid),
367            rng,
368            stats: ConnectionStats::default(),
369            version,
370            nat_traversal: None, // Will be initialized when NAT traversal is negotiated
371
372            #[cfg(feature = "__qlog")]
373            qlog_streamer: None,
374        };
375        if path_validated {
376            this.on_path_validated();
377        }
378        if side.is_client() {
379            // Kick off the connection
380            this.write_crypto();
381            this.init_0rtt();
382        }
383        this
384    }
385
386    /// Set up qlog for this connection
387    #[cfg(feature = "__qlog")]
388    pub fn set_qlog(
389        &mut self,
390        writer: Box<dyn io::Write + Send + Sync>,
391        title: Option<String>,
392        description: Option<String>,
393        now: Instant,
394    ) {
395        let ty = if self.side.is_server() {
396            VantagePointType::Server
397        } else {
398            VantagePointType::Client
399        };
400
401        let level = EventImportance::Core;
402
403        let trace = TraceSeq::new(
404            VantagePoint {
405                name: None,
406                ty,
407                flow: None,
408            },
409            title.clone(),
410            description.clone(),
411            Some(Configuration {
412                time_offset: Some(0.0),
413                original_uris: None,
414            }),
415            None,
416        );
417
418        let mut streamer = QlogStreamer::new(
419            QLOG_VERSION.to_string(),
420            title,
421            description,
422            None,
423            now,
424            trace,
425            level,
426            writer,
427        );
428
429        if let Err(e) = streamer.start_log() {
430            warn!("could not initialize qlog streamer: {e}");
431            return;
432        }
433
434        self.qlog_streamer = Some(streamer);
435    }
436
437    /// Emit a `MetricsUpdated` event to the qlog streamer containing only updated values
438    #[cfg(feature = "__qlog")]
439    fn emit_qlog_recovery_metrics(&mut self, now: Instant) {
440        let Some(qlog_streamer) = &mut self.qlog_streamer else {
441            return;
442        };
443
444        let Some(metrics) = self.path.qlog_congestion_metrics(self.pto_count) else {
445            return;
446        };
447
448        let event = EventData::MetricsUpdated(metrics);
449
450        if let Err(e) = qlog_streamer.add_event_data_with_instant(event, now) {
451            warn!("could not emit qlog event, dropping qlog streamer: {e}");
452            self.qlog_streamer = None;
453        }
454    }
455
456    /// Returns the next time at which `handle_timeout` should be called
457    ///
458    /// The value returned may change after:
459    /// - the application performed some I/O on the connection
460    /// - a call was made to `handle_event`
461    /// - a call to `poll_transmit` returned `Some`
462    /// - a call was made to `handle_timeout`
463    #[must_use]
464    pub fn poll_timeout(&mut self) -> Option<Instant> {
465        self.timers.next_timeout()
466    }
467
468    /// Returns application-facing events
469    ///
470    /// Connections should be polled for events after:
471    /// - a call was made to `handle_event`
472    /// - a call was made to `handle_timeout`
473    #[must_use]
474    pub fn poll(&mut self) -> Option<Event> {
475        if let Some(x) = self.events.pop_front() {
476            return Some(x);
477        }
478
479        if let Some(event) = self.streams.poll() {
480            return Some(Event::Stream(event));
481        }
482
483        if let Some(err) = self.error.take() {
484            return Some(Event::ConnectionLost { reason: err });
485        }
486
487        None
488    }
489
490    /// Return endpoint-facing events
491    #[must_use]
492    pub fn poll_endpoint_events(&mut self) -> Option<EndpointEvent> {
493        self.endpoint_events.pop_front().map(EndpointEvent)
494    }
495
496    /// Provide control over streams
497    #[must_use]
498    pub fn streams(&mut self) -> Streams<'_> {
499        Streams {
500            state: &mut self.streams,
501            conn_state: &self.state,
502        }
503    }
504
505    /// Provide control over the receive side of a stream
506    #[must_use]
507    pub fn recv_stream(&mut self, id: StreamId) -> RecvStream<'_> {
508        assert!(id.dir() == Dir::Bi || id.initiator() != self.side.side());
509        RecvStream {
510            id,
511            state: &mut self.streams,
512            pending: &mut self.spaces[SpaceId::Data].pending,
513        }
514    }
515
516    /// Provide control over the send side of a stream
517    #[must_use]
518    pub fn send_stream(&mut self, id: StreamId) -> SendStream<'_> {
519        assert!(id.dir() == Dir::Bi || id.initiator() == self.side.side());
520        SendStream {
521            id,
522            state: &mut self.streams,
523            pending: &mut self.spaces[SpaceId::Data].pending,
524            conn_state: &self.state,
525        }
526    }
527
528    /// Returns packets to transmit
529    ///
530    /// Connections should be polled for transmit after:
531    /// - the application performed some I/O on the connection
532    /// - a call was made to `handle_event`
533    /// - a call was made to `handle_timeout`
534    ///
535    /// `max_datagrams` specifies how many datagrams can be returned inside a
536    /// single Transmit using GSO. This must be at least 1.
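    ///
    /// A minimal usage sketch; `now`, `max_gso_segments`, and the `udp_send` helper are
    /// illustrative, not part of this crate:
    ///
    /// ```ignore
    /// let mut buf = Vec::new();
    /// if let Some(t) = conn.poll_transmit(now, max_gso_segments, &mut buf) {
    ///     // `buf[..t.size]` holds one or more coalesced datagrams for `t.destination`.
    ///     // When `t.segment_size` is `Some(n)`, every datagram except possibly the
    ///     // last is exactly `n` bytes (GSO); otherwise the buffer is a single datagram.
    ///     udp_send(t.destination, t.segment_size, &buf[..t.size]);
    /// }
    /// ```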
537    #[must_use]
538    pub fn poll_transmit(
539        &mut self,
540        now: Instant,
541        max_datagrams: usize,
542        buf: &mut Vec<u8>,
543    ) -> Option<Transmit> {
544        assert!(max_datagrams != 0);
545        let max_datagrams = match self.config.enable_segmentation_offload {
546            false => 1,
547            true => max_datagrams,
548        };
549
550        let mut num_datagrams = 0;
551        // Position in `buf` of the first byte of the current UDP datagram. When coalescing QUIC
552        // packets, this can be earlier than the start of the current QUIC packet.
553        let mut datagram_start = 0;
554        let mut segment_size = usize::from(self.path.current_mtu());
555
556        // Check for NAT traversal coordination timeouts
557        if let Some(nat_traversal) = &mut self.nat_traversal {
558            if nat_traversal.check_coordination_timeout(now) {
559                trace!("NAT traversal coordination timed out, may retry");
560            }
561        }
562
563        // First priority: NAT traversal PATH_CHALLENGE packets (includes coordination)
564        if let Some(challenge) = self.send_nat_traversal_challenge(now, buf) {
565            return Some(challenge);
566        }
567        
568        if let Some(challenge) = self.send_path_challenge(now, buf) {
569            return Some(challenge);
570        }
571
572        // If we need to send a probe, make sure we have something to send.
573        for space in SpaceId::iter() {
574            let request_immediate_ack =
575                space == SpaceId::Data && self.peer_supports_ack_frequency();
576            self.spaces[space].maybe_queue_probe(request_immediate_ack, &self.streams);
577        }
578
579        // Check whether we need to send a close message
580        let close = match self.state {
581            State::Drained => {
582                self.app_limited = true;
583                return None;
584            }
585            State::Draining | State::Closed(_) => {
586                // self.close is only reset once the associated packet has been
587                // encoded successfully
588                if !self.close {
589                    self.app_limited = true;
590                    return None;
591                }
592                true
593            }
594            _ => false,
595        };
596
597        // Check whether we need to send an ACK_FREQUENCY frame
598        if let Some(config) = &self.config.ack_frequency_config {
599            self.spaces[SpaceId::Data].pending.ack_frequency = self
600                .ack_frequency
601                .should_send_ack_frequency(self.path.rtt.get(), config, &self.peer_params)
602                && self.highest_space == SpaceId::Data
603                && self.peer_supports_ack_frequency();
604        }
605
606        // Reserving capacity can provide more capacity than we asked for. However, we are not
607        // allowed to write more than `segment_size`. Therefore the maximum capacity is tracked
608        // separately.
609        let mut buf_capacity = 0;
610
611        let mut coalesce = true;
612        let mut builder_storage: Option<PacketBuilder> = None;
613        let mut sent_frames = None;
614        let mut pad_datagram = false;
615        let mut pad_datagram_to_mtu = false;
616        let mut congestion_blocked = false;
617
618        // Iterate over all spaces and find data to send
619        let mut space_idx = 0;
620        let spaces = [SpaceId::Initial, SpaceId::Handshake, SpaceId::Data];
621        // This loop will potentially spend multiple iterations in the same `SpaceId`,
622        // so we cannot trivially rewrite it to take advantage of `SpaceId::iter()`.
623        while space_idx < spaces.len() {
624            let space_id = spaces[space_idx];
625            // Number of bytes available for frames if this is a 1-RTT packet. We're guaranteed to
626            // be able to send an individual frame at least this large in the next 1-RTT
627            // packet. This could be generalized to support every space, but it's only needed to
628            // handle large fixed-size frames, which only exist in 1-RTT (application datagrams). We
629            // don't account for coalesced packets potentially occupying space because frames can
630            // always spill into the next datagram.
631            let pn = self.packet_number_filter.peek(&self.spaces[SpaceId::Data]);
632            let frame_space_1rtt =
633                segment_size.saturating_sub(self.predict_1rtt_overhead(Some(pn)));
634
635            // Is there data or a close message to send in this space?
636            let can_send = self.space_can_send(space_id, frame_space_1rtt);
637            if can_send.is_empty() && (!close || self.spaces[space_id].crypto.is_none()) {
638                space_idx += 1;
639                continue;
640            }
641
642            let mut ack_eliciting = !self.spaces[space_id].pending.is_empty(&self.streams)
643                || self.spaces[space_id].ping_pending
644                || self.spaces[space_id].immediate_ack_pending;
645            if space_id == SpaceId::Data {
646                ack_eliciting |= self.can_send_1rtt(frame_space_1rtt);
647            }
648
649            pad_datagram_to_mtu |= space_id == SpaceId::Data && self.config.pad_to_mtu;
650
651            // Can we append more data into the current buffer?
652            // It is not safe to assume that `buf.len()` is the end of the data,
653            // since the last packet might not have been finished.
654            let buf_end = if let Some(builder) = &builder_storage {
655                buf.len().max(builder.min_size) + builder.tag_len
656            } else {
657                buf.len()
658            };
659
660            let tag_len = if let Some(ref crypto) = self.spaces[space_id].crypto {
661                crypto.packet.local.tag_len()
662            } else if space_id == SpaceId::Data {
663                self.zero_rtt_crypto.as_ref().expect(
664                    "sending packets in the application data space requires known 0-RTT or 1-RTT keys",
665                ).packet.tag_len()
666            } else {
667                unreachable!("tried to send {:?} packet without keys", space_id)
668            };
669            if !coalesce || buf_capacity - buf_end < MIN_PACKET_SPACE + tag_len {
670                // We need to send 1 more datagram and extend the buffer for that.
671
672                // Is 1 more datagram allowed?
673                if num_datagrams >= max_datagrams {
674                    // No more datagrams allowed
675                    break;
676                }
677
678                // Anti-amplification is only based on `total_sent`, which gets
679                // updated at the end of this method. Therefore we pass the number
680                // of bytes for datagrams that are already created, as well as 1 byte
681                // for starting another datagram. If there is any anti-amplification
682                // budget left, we always allow a full MTU to be sent
683                // (see https://github.com/quinn-rs/quinn/issues/1082)
684                if self
685                    .path
686                    .anti_amplification_blocked(segment_size as u64 * (num_datagrams as u64) + 1)
687                {
688                    trace!("blocked by anti-amplification");
689                    break;
690                }
691
692                // Congestion control and pacing checks
693                // Tail loss probes must not be blocked by congestion, or a deadlock could arise
694                if ack_eliciting && self.spaces[space_id].loss_probes == 0 {
695                    // Assume the current packet will get padded to fill the segment
696                    let untracked_bytes = if let Some(builder) = &builder_storage {
697                        buf_capacity - builder.partial_encode.start
698                    } else {
699                        0
700                    } as u64;
701                    debug_assert!(untracked_bytes <= segment_size as u64);
702
703                    let bytes_to_send = segment_size as u64 + untracked_bytes;
704                    if self.path.in_flight.bytes + bytes_to_send >= self.path.congestion.window() {
705                        space_idx += 1;
706                        congestion_blocked = true;
707                        // We continue instead of breaking here in order to avoid
708                        // blocking loss probes queued for higher spaces.
709                        trace!("blocked by congestion control");
710                        continue;
711                    }
712
713                    // Check whether the next datagram is blocked by pacing
714                    let smoothed_rtt = self.path.rtt.get();
715                    if let Some(delay) = self.path.pacing.delay(
716                        smoothed_rtt,
717                        bytes_to_send,
718                        self.path.current_mtu(),
719                        self.path.congestion.window(),
720                        now,
721                    ) {
722                        self.timers.set(Timer::Pacing, delay);
723                        congestion_blocked = true;
724                        // Loss probes should be subject to pacing, even though
725                        // they are not congestion controlled.
726                        trace!("blocked by pacing");
727                        break;
728                    }
729                }
730
731                // Finish current packet
732                if let Some(mut builder) = builder_storage.take() {
733                    if pad_datagram {
734                        builder.pad_to(MIN_INITIAL_SIZE);
735                    }
736
737                    if num_datagrams > 1 || pad_datagram_to_mtu {
738                        // If too many padding bytes would be required to continue the GSO batch
739                        // after this packet, end the GSO batch here. Ensures that fixed-size frames
740                        // with heterogeneous sizes (e.g. application datagrams) won't inadvertently
741                        // waste large amounts of bandwidth. The exact threshold is a bit arbitrary
742                        // and might benefit from further tuning, though there's no universally
743                        // optimal value.
744                        //
745                        // Additionally, if this datagram is a loss probe and `segment_size` is
746                        // larger than `INITIAL_MTU`, then padding it to `segment_size` to continue
747                        // the GSO batch would risk failure to recover from a reduction in path
748                        // MTU. Loss probes are the only packets for which we might grow
749                        // `buf_capacity` by less than `segment_size`.
750                        const MAX_PADDING: usize = 16;
751                        let packet_len_unpadded = cmp::max(builder.min_size, buf.len())
752                            - datagram_start
753                            + builder.tag_len;
754                        if (packet_len_unpadded + MAX_PADDING < segment_size
755                            && !pad_datagram_to_mtu)
756                            || datagram_start + segment_size > buf_capacity
757                        {
758                            trace!(
759                                "GSO truncated by demand for {} padding bytes or loss probe",
760                                segment_size - packet_len_unpadded
761                            );
762                            builder_storage = Some(builder);
763                            break;
764                        }
765
766                        // Pad the current datagram to GSO segment size so it can be included in the
767                        // GSO batch.
768                        builder.pad_to(segment_size as u16);
769                    }
770
771                    builder.finish_and_track(now, self, sent_frames.take(), buf);
772
773                    if num_datagrams == 1 {
774                        // Set the segment size for this GSO batch to the size of the first UDP
775                        // datagram in the batch. Larger data that cannot be fragmented
776                        // (e.g. application datagrams) will be included in a future batch. When
777                        // sending large enough volumes of data for GSO to be useful, we expect
778                        // packet sizes to usually be consistent, e.g. populated by max-size STREAM
779                        // frames or uniformly sized datagrams.
780                        segment_size = buf.len();
781                        // Clip the unused capacity out of the buffer so future packets don't
782                        // overrun
783                        buf_capacity = buf.len();
784
785                        // Check whether the data we planned to send will fit in the reduced segment
786                        // size. If not, bail out and leave it for the next GSO batch so we don't
787                        // end up trying to send an empty packet. We can't easily compute the right
788                        // segment size before the original call to `space_can_send`, because at
789                        // that time we haven't determined whether we're going to coalesce with the
790                        // first datagram or potentially pad it to `MIN_INITIAL_SIZE`.
791                        if space_id == SpaceId::Data {
792                            let frame_space_1rtt =
793                                segment_size.saturating_sub(self.predict_1rtt_overhead(Some(pn)));
794                            if self.space_can_send(space_id, frame_space_1rtt).is_empty() {
795                                break;
796                            }
797                        }
798                    }
799                }
800
801                // Allocate space for another datagram
802                let next_datagram_size_limit = match self.spaces[space_id].loss_probes {
803                    0 => segment_size,
804                    _ => {
805                        self.spaces[space_id].loss_probes -= 1;
806                        // Clamp the datagram to at most the minimum MTU to ensure that loss probes
807                        // can get through and enable recovery even if the path MTU has shrunk
808                        // unexpectedly.
809                        std::cmp::min(segment_size, usize::from(INITIAL_MTU))
810                    }
811                };
812                buf_capacity += next_datagram_size_limit;
813                if buf.capacity() < buf_capacity {
814                    // We reserve the maximum space for sending `max_datagrams` upfront
815                    // to avoid any reallocations if more datagrams have to be appended later on.
816                    // Benchmarks have shown a 5-10% throughput improvement
817                    // compared to continuously resizing the datagram buffer.
818                    // While this will lead to over-allocation for small transmits
819                    // (e.g. purely containing ACKs), modern memory allocators
820                    // (e.g. mimalloc and jemalloc) will pool certain allocation sizes
821                    // and therefore this is still rather efficient.
822                    buf.reserve(max_datagrams * segment_size);
823                }
824                num_datagrams += 1;
825                coalesce = true;
826                pad_datagram = false;
827                datagram_start = buf.len();
828
829                debug_assert_eq!(
830                    datagram_start % segment_size,
831                    0,
832                    "datagrams in a GSO batch must be aligned to the segment size"
833                );
834            } else {
835                // We can append/coalesce the next packet into the current
836                // datagram.
837                // Finish current packet without adding extra padding
838                if let Some(builder) = builder_storage.take() {
839                    builder.finish_and_track(now, self, sent_frames.take(), buf);
840                }
841            }
842
843            debug_assert!(buf_capacity - buf.len() >= MIN_PACKET_SPACE);
844
845            //
846            // From here on, we've determined that a packet will definitely be sent.
847            //
848
849            if self.spaces[SpaceId::Initial].crypto.is_some()
850                && space_id == SpaceId::Handshake
851                && self.side.is_client()
852            {
853                // A client stops both sending and processing Initial packets when it
854                // sends its first Handshake packet.
855                self.discard_space(now, SpaceId::Initial);
856            }
857            if let Some(ref mut prev) = self.prev_crypto {
858                prev.update_unacked = false;
859            }
860
861            debug_assert!(
862                builder_storage.is_none() && sent_frames.is_none(),
863                "Previous packet must have been finished"
864            );
865
866            let builder = builder_storage.insert(PacketBuilder::new(
867                now,
868                space_id,
869                self.rem_cids.active(),
870                buf,
871                buf_capacity,
872                datagram_start,
873                ack_eliciting,
874                self,
875            )?);
876            coalesce = coalesce && !builder.short_header;
877
878            // https://tools.ietf.org/html/draft-ietf-quic-transport-34#section-14.1
879            pad_datagram |=
880                space_id == SpaceId::Initial && (self.side.is_client() || ack_eliciting);
881
882            if close {
883                trace!("sending CONNECTION_CLOSE");
884                // Encode ACKs before the ConnectionClose message, to give the receiver
885                // a better approximation of what data has been processed. This is
886                // especially important with ack delay, since the peer might not
887                // have gotten any other ACK for the data earlier on.
888                if !self.spaces[space_id].pending_acks.ranges().is_empty() {
889                    Self::populate_acks(
890                        now,
891                        self.receiving_ecn,
892                        &mut SentFrames::default(),
893                        &mut self.spaces[space_id],
894                        buf,
895                        &mut self.stats,
896                    );
897                }
898
899                // Since there are only up to 64 ACK ranges, there will always be enough
900                // space to encode the ConnectionClose frame too. However, we still have
901                // the check here to prevent crashes if something changes.
902                debug_assert!(
903                    buf.len() + frame::ConnectionClose::SIZE_BOUND < builder.max_size,
904                    "ACKs should leave space for ConnectionClose"
905                );
906                if buf.len() + frame::ConnectionClose::SIZE_BOUND < builder.max_size {
907                    let max_frame_size = builder.max_size - buf.len();
908                    match self.state {
909                        State::Closed(state::Closed { ref reason }) => {
910                            if space_id == SpaceId::Data || reason.is_transport_layer() {
911                                reason.encode(buf, max_frame_size)
912                            } else {
913                                frame::ConnectionClose {
914                                    error_code: TransportErrorCode::APPLICATION_ERROR,
915                                    frame_type: None,
916                                    reason: Bytes::new(),
917                                }
918                                .encode(buf, max_frame_size)
919                            }
920                        }
921                        State::Draining => frame::ConnectionClose {
922                            error_code: TransportErrorCode::NO_ERROR,
923                            frame_type: None,
924                            reason: Bytes::new(),
925                        }
926                        .encode(buf, max_frame_size),
927                        _ => unreachable!(
928                            "tried to make a close packet when the connection wasn't closed"
929                        ),
930                    }
931                }
932                if space_id == self.highest_space {
933                    // Don't send another close packet
934                    self.close = false;
935                    // `CONNECTION_CLOSE` is the final packet
936                    break;
937                } else {
938                    // Send a close frame in every possible space for robustness, per RFC9000
939                    // "Immediate Close during the Handshake". Don't bother trying to send anything
940                    // else.
941                    space_idx += 1;
942                    continue;
943                }
944            }
945
946            // Send an off-path PATH_RESPONSE. Prioritized over on-path data to ensure that path
947            // validation can occur while the link is saturated.
948            if space_id == SpaceId::Data && num_datagrams == 1 {
949                if let Some((token, remote)) = self.path_responses.pop_off_path(self.path.remote) {
950                    // `unwrap` guaranteed to succeed because `builder_storage` was populated just
951                    // above.
952                    let mut builder = builder_storage.take().unwrap();
953                    trace!("PATH_RESPONSE {:08x} (off-path)", token);
954                    buf.write(frame::FrameType::PATH_RESPONSE);
955                    buf.write(token);
956                    self.stats.frame_tx.path_response += 1;
957                    builder.pad_to(MIN_INITIAL_SIZE);
958                    builder.finish_and_track(
959                        now,
960                        self,
961                        Some(SentFrames {
962                            non_retransmits: true,
963                            ..SentFrames::default()
964                        }),
965                        buf,
966                    );
967                    self.stats.udp_tx.on_sent(1, buf.len());
968                    return Some(Transmit {
969                        destination: remote,
970                        size: buf.len(),
971                        ecn: None,
972                        segment_size: None,
973                        src_ip: self.local_ip,
974                    });
975                }
976            }
977
978            let sent =
979                self.populate_packet(now, space_id, buf, builder.max_size, builder.exact_number);
980
981            // ACK-only packets should only be sent when explicitly allowed. If we write them for
982            // any other reason, there is a bug that leads to one component announcing write
983            // readiness while not writing any data. This degrades performance. The condition is
984            // only checked if the full MTU is available and when potentially large fixed-size
985            // frames aren't queued, so that lack of space in the datagram isn't the reason for just
986            // writing ACKs.
987            debug_assert!(
988                !(sent.is_ack_only(&self.streams)
989                    && !can_send.acks
990                    && can_send.other
991                    && (buf_capacity - builder.datagram_start) == self.path.current_mtu() as usize
992                    && self.datagrams.outgoing.is_empty()),
993                "SendableFrames was {can_send:?}, but only ACKs have been written"
994            );
995            pad_datagram |= sent.requires_padding;
996
997            if sent.largest_acked.is_some() {
998                self.spaces[space_id].pending_acks.acks_sent();
999                self.timers.stop(Timer::MaxAckDelay);
1000            }
1001
1002            // Keep information about the packet around until it gets finalized
1003            sent_frames = Some(sent);
1004
1005            // Don't increment space_idx.
1006            // We stay in the current space and check if there is more data to send.
1007        }
1008
1009        // Finish the last packet
1010        if let Some(mut builder) = builder_storage {
1011            if pad_datagram {
1012                builder.pad_to(MIN_INITIAL_SIZE);
1013            }
1014
1015            // If this datagram is a loss probe and `segment_size` is larger than `INITIAL_MTU`,
1016            // then padding it to `segment_size` would risk failure to recover from a reduction in
1017            // path MTU.
1018            // Loss probes are the only packets for which we might grow `buf_capacity`
1019            // by less than `segment_size`.
1020            if pad_datagram_to_mtu && buf_capacity >= datagram_start + segment_size {
1021                builder.pad_to(segment_size as u16);
1022            }
1023
1024            let last_packet_number = builder.exact_number;
1025            builder.finish_and_track(now, self, sent_frames, buf);
1026            self.path
1027                .congestion
1028                .on_sent(now, buf.len() as u64, last_packet_number);
1029
1030            #[cfg(feature = "__qlog")]
1031            self.emit_qlog_recovery_metrics(now);
1032        }
1033
1034        self.app_limited = buf.is_empty() && !congestion_blocked;
1035
1036        // Send MTU probe if necessary
1037        if buf.is_empty() && self.state.is_established() {
1038            let space_id = SpaceId::Data;
1039            let probe_size = self
1040                .path
1041                .mtud
1042                .poll_transmit(now, self.packet_number_filter.peek(&self.spaces[space_id]))?;
1043
1044            let buf_capacity = probe_size as usize;
1045            buf.reserve(buf_capacity);
1046
1047            let mut builder = PacketBuilder::new(
1048                now,
1049                space_id,
1050                self.rem_cids.active(),
1051                buf,
1052                buf_capacity,
1053                0,
1054                true,
1055                self,
1056            )?;
1057
1058            // We implement MTU probes as ping packets padded up to the probe size
1059            buf.write(frame::FrameType::PING);
1060            self.stats.frame_tx.ping += 1;
1061
1062            // If supported by the peer, we want no delays to the probe's ACK
1063            if self.peer_supports_ack_frequency() {
1064                buf.write(frame::FrameType::IMMEDIATE_ACK);
1065                self.stats.frame_tx.immediate_ack += 1;
1066            }
1067
1068            builder.pad_to(probe_size);
1069            let sent_frames = SentFrames {
1070                non_retransmits: true,
1071                ..Default::default()
1072            };
1073            builder.finish_and_track(now, self, Some(sent_frames), buf);
1074
1075            self.stats.path.sent_plpmtud_probes += 1;
1076            num_datagrams = 1;
1077
1078            trace!(?probe_size, "writing MTUD probe");
1079        }
1080
1081        if buf.is_empty() {
1082            return None;
1083        }
1084
1085        trace!("sending {} bytes in {} datagrams", buf.len(), num_datagrams);
1086        self.path.total_sent = self.path.total_sent.saturating_add(buf.len() as u64);
1087
1088        self.stats.udp_tx.on_sent(num_datagrams as u64, buf.len());
1089
1090        Some(Transmit {
1091            destination: self.path.remote,
1092            size: buf.len(),
1093            ecn: if self.path.sending_ecn {
1094                Some(EcnCodepoint::Ect0)
1095            } else {
1096                None
1097            },
1098            segment_size: match num_datagrams {
1099                1 => None,
1100                _ => Some(segment_size),
1101            },
1102            src_ip: self.local_ip,
1103        })
1104    }
1105
1106    /// Send PUNCH_ME_NOW for coordination if necessary
1107    fn send_coordination_request(&mut self, now: Instant, buf: &mut Vec<u8>) -> Option<Transmit> {
1108        // Get coordination info without borrowing mutably
1109        let should_send = self.nat_traversal.as_ref()?.should_send_punch_request();
1110        if !should_send {
1111            return None;
1112        }
1113        
1114        let (round, target_addrs, coordinator_addr) = {
1115            let nat_traversal = self.nat_traversal.as_ref()?;
1116            let coord = nat_traversal.coordination.as_ref()?;
1117            let addrs: Vec<_> = coord.punch_targets.iter().map(|t| t.remote_addr).collect();
1118            (coord.round, addrs, self.path.remote) // Placeholder - should be bootstrap node
1119        };
1120        
1121        if target_addrs.is_empty() {
1122            return None;
1123        }
1124        
1125        debug_assert_eq!(
1126            self.highest_space,
1127            SpaceId::Data,
1128            "PUNCH_ME_NOW queued without 1-RTT keys"
1129        );
1130        
1131        buf.reserve(MIN_INITIAL_SIZE as usize);
1132        let buf_capacity = buf.capacity();
1133        
1134        let mut builder = PacketBuilder::new(
1135            now,
1136            SpaceId::Data,
1137            self.rem_cids.active(),
1138            buf,
1139            buf_capacity,
1140            0,
1141            false,
1142            self,
1143        )?;
1144        
1145        trace!("sending PUNCH_ME_NOW round {} with {} targets", round, target_addrs.len());
1146        
1147        // Write PUNCH_ME_NOW frame
1148        buf.write(frame::FrameType::PUNCH_ME_NOW);
1149        buf.write(round);
1150        buf.write(target_addrs.len() as u8);
1151        for addr in target_addrs {
1152            match addr {
1153                SocketAddr::V4(v4) => {
1154                    buf.write(4u8); // IPv4
1155                    buf.write(u32::from(*v4.ip()));
1156                    buf.write(v4.port());
1157                }
1158                SocketAddr::V6(v6) => {
1159                    buf.write(6u8); // IPv6  
1160                    buf.write(*v6.ip());
1161                    buf.write(v6.port());
1162                }
1163            }
1164        }
1165        
1166        self.stats.frame_tx.ping += 1; // Use ping counter for now
1167        
1168        builder.pad_to(MIN_INITIAL_SIZE);
1169        builder.finish_and_track(now, self, None, buf);
1170        
1171        // Mark request sent after packet is built
1172        if let Some(nat_traversal) = &mut self.nat_traversal {
1173            nat_traversal.mark_punch_request_sent();
1174        }
1175        
1176        Some(Transmit {
1177            destination: coordinator_addr,
1178            size: buf.len(),
1179            ecn: if self.path.sending_ecn {
1180                Some(EcnCodepoint::Ect0)
1181            } else {
1182                None
1183            },
1184            segment_size: None,
1185            src_ip: self.local_ip,
1186        })
1187    }
1188
1189    /// Send coordinated PATH_CHALLENGE for hole punching
1190    fn send_coordinated_path_challenge(&mut self, now: Instant, buf: &mut Vec<u8>) -> Option<Transmit> {
1191        // Check if it's time to start synchronized hole punching
1192        if let Some(nat_traversal) = &mut self.nat_traversal {
1193            if nat_traversal.should_start_punching(now) {
1194                nat_traversal.start_punching_phase(now);
1195            }
1196        }
1197        
1198        // Get punch targets if we're in punching phase
1199        let (target_addr, challenge) = {
1200            let nat_traversal = self.nat_traversal.as_ref()?;
1201            match nat_traversal.get_coordination_phase() {
1202                Some(CoordinationPhase::Punching) => {
1203                    let targets = nat_traversal.get_punch_targets()?;
1204                    if targets.is_empty() {
1205                        return None;
1206                    }
1207                    // Send PATH_CHALLENGE to the first target (could be round-robin in future)
1208                    let target = &targets[0];
1209                    (target.remote_addr, target.challenge)
1210                }
1211                _ => return None,
1212            }
1213        };
1214        
1215        debug_assert_eq!(
1216            self.highest_space,
1217            SpaceId::Data,
1218            "PATH_CHALLENGE queued without 1-RTT keys"
1219        );
1220        
1221        buf.reserve(MIN_INITIAL_SIZE as usize);
1222        let buf_capacity = buf.capacity();
1223        
1224        let mut builder = PacketBuilder::new(
1225            now,
1226            SpaceId::Data,
1227            self.rem_cids.active(),
1228            buf,
1229            buf_capacity,
1230            0,
1231            false,
1232            self,
1233        )?;
1234        
1235        trace!("sending coordinated PATH_CHALLENGE {:08x} to {}", challenge, target_addr);
1236        buf.write(frame::FrameType::PATH_CHALLENGE);
1237        buf.write(challenge);
1238        self.stats.frame_tx.path_challenge += 1;
1239
1240        builder.pad_to(MIN_INITIAL_SIZE);
1241        builder.finish_and_track(now, self, None, buf);
1242        
1243        // Mark coordination as validating after packet is built
1244        if let Some(nat_traversal) = &mut self.nat_traversal {
1245            nat_traversal.mark_coordination_validating();
1246        }
1247        
1248        Some(Transmit {
1249            destination: target_addr,
1250            size: buf.len(),
1251            ecn: if self.path.sending_ecn {
1252                Some(EcnCodepoint::Ect0)
1253            } else {
1254                None
1255            },
1256            segment_size: None,
1257            src_ip: self.local_ip,
1258        })
1259    }
1260
1261    /// Send PATH_CHALLENGE for NAT traversal candidates if necessary
1262    fn send_nat_traversal_challenge(&mut self, now: Instant, buf: &mut Vec<u8>) -> Option<Transmit> {
1263        // Priority 1: Coordination protocol requests
1264        if let Some(request) = self.send_coordination_request(now, buf) {
1265            return Some(request);
1266        }
1267        
1268        // Priority 2: Coordinated hole punching
1269        if let Some(punch) = self.send_coordinated_path_challenge(now, buf) {
1270            return Some(punch);
1271        }
1272        
1273        // Priority 3: Regular candidate validation (fallback)
1274        let (remote_addr, remote_sequence) = {
1275            let nat_traversal = self.nat_traversal.as_ref()?;
1276            let candidates = nat_traversal.get_validation_candidates();
1277            if candidates.is_empty() {
1278                return None;
1279            }
1280            // Get the highest priority candidate
1281            let (sequence, candidate) = candidates[0];
1282            (candidate.address, sequence)
1283        };
1284        
1285        let challenge = self.rng.random::<u64>();
1286        
1287        // Start validation for this candidate
1288        if let Err(e) = self.nat_traversal.as_mut()?.start_validation(remote_sequence, challenge, now) {
1289            warn!("Failed to start NAT traversal validation: {}", e);
1290            return None;
1291        }
1292        
1293        debug_assert_eq!(
1294            self.highest_space,
1295            SpaceId::Data,
1296            "PATH_CHALLENGE queued without 1-RTT keys"
1297        );
1298        
1299        buf.reserve(MIN_INITIAL_SIZE as usize);
1300        let buf_capacity = buf.capacity();
1301        
1302        // Use current connection ID for NAT traversal PATH_CHALLENGE
1303        let mut builder = PacketBuilder::new(
1304            now,
1305            SpaceId::Data,
1306            self.rem_cids.active(),
1307            buf,
1308            buf_capacity,
1309            0,
1310            false,
1311            self,
1312        )?;
1313        
1314        trace!("sending PATH_CHALLENGE {:08x} to NAT candidate {}", challenge, remote_addr);
1315        buf.write(frame::FrameType::PATH_CHALLENGE);
1316        buf.write(challenge);
1317        self.stats.frame_tx.path_challenge += 1;
1318
1319        // Datagrams carrying a PATH_CHALLENGE frame must be expanded to at least 1200 bytes
1320        builder.pad_to(MIN_INITIAL_SIZE);
1321        
1322        builder.finish_and_track(now, self, None, buf);
1323        
1324        Some(Transmit {
1325            destination: remote_addr,
1326            size: buf.len(),
1327            ecn: if self.path.sending_ecn {
1328                Some(EcnCodepoint::Ect0)
1329            } else {
1330                None
1331            },
1332            segment_size: None,
1333            src_ip: self.local_ip,
1334        })
1335    }
1336
1337    /// Send PATH_CHALLENGE for a previous path if necessary
1338    fn send_path_challenge(&mut self, now: Instant, buf: &mut Vec<u8>) -> Option<Transmit> {
1339        let (prev_cid, prev_path) = self.prev_path.as_mut()?;
1340        if !prev_path.challenge_pending {
1341            return None;
1342        }
1343        prev_path.challenge_pending = false;
1344        let token = prev_path
1345            .challenge
1346            .expect("previous path challenge pending without token");
1347        let destination = prev_path.remote;
1348        debug_assert_eq!(
1349            self.highest_space,
1350            SpaceId::Data,
1351            "PATH_CHALLENGE queued without 1-RTT keys"
1352        );
1353        buf.reserve(MIN_INITIAL_SIZE as usize);
1354
1355        let buf_capacity = buf.capacity();
1356
1357        // Use the previous CID to avoid linking the new path with the previous path. We
1358        // don't bother accounting for possible retirement of that prev_cid because this is
1359        // sent once, immediately after migration, when the CID is known to be valid. Even
1360        // if a post-migration packet caused the CID to be retired, it's fair to pretend
1361        // this is sent first.
1362        let mut builder = PacketBuilder::new(
1363            now,
1364            SpaceId::Data,
1365            *prev_cid,
1366            buf,
1367            buf_capacity,
1368            0,
1369            false,
1370            self,
1371        )?;
1372        trace!("validating previous path with PATH_CHALLENGE {:08x}", token);
1373        buf.write(frame::FrameType::PATH_CHALLENGE);
1374        buf.write(token);
1375        self.stats.frame_tx.path_challenge += 1;
1376
1377        // An endpoint MUST expand datagrams that contain a PATH_CHALLENGE frame
1378        // to at least the smallest allowed maximum datagram size of 1200 bytes,
1379        // unless the anti-amplification limit for the path does not permit
1380        // sending a datagram of this size
1381        builder.pad_to(MIN_INITIAL_SIZE);
1382
1383        builder.finish(self, buf);
1384        self.stats.udp_tx.on_sent(1, buf.len());
1385
1386        Some(Transmit {
1387            destination,
1388            size: buf.len(),
1389            ecn: None,
1390            segment_size: None,
1391            src_ip: self.local_ip,
1392        })
1393    }
1394
1395    /// Indicate what types of frames are ready to send for the given space
1396    fn space_can_send(&self, space_id: SpaceId, frame_space_1rtt: usize) -> SendableFrames {
1397        if self.spaces[space_id].crypto.is_none()
1398            && (space_id != SpaceId::Data
1399                || self.zero_rtt_crypto.is_none()
1400                || self.side.is_server())
1401        {
1402            // No keys available for this space
1403            return SendableFrames::empty();
1404        }
1405        let mut can_send = self.spaces[space_id].can_send(&self.streams);
1406        if space_id == SpaceId::Data {
1407            can_send.other |= self.can_send_1rtt(frame_space_1rtt);
1408        }
1409        can_send
1410    }
1411
1412    /// Process `ConnectionEvent`s generated by the associated `Endpoint`
1413    ///
1414    /// Will execute protocol logic upon receipt of a connection event, in turn preparing signals
1415    /// (including application `Event`s, `EndpointEvent`s and outgoing datagrams) that should be
1416    /// extracted through the relevant methods.
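    ///
    /// A minimal driving-loop sketch, assuming the polling methods (`poll`,
    /// `poll_endpoint_events`) behave as in upstream quinn-proto; `deliver` and
    /// `endpoint_events` are placeholders for application-side plumbing, not part of this API.
    ///
    /// ```ignore
    /// conn.handle_event(event);
    /// // Drain whatever the event made ready.
    /// while let Some(app_event) = conn.poll() {
    ///     deliver(app_event);
    /// }
    /// while let Some(endpoint_event) = conn.poll_endpoint_events() {
    ///     // Feed these back to the owning endpoint.
    ///     endpoint_events.push(endpoint_event);
    /// }
    /// ```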
1417    pub fn handle_event(&mut self, event: ConnectionEvent) {
1418        use ConnectionEventInner::*;
1419        match event.0 {
1420            Datagram(DatagramConnectionEvent {
1421                now,
1422                remote,
1423                ecn,
1424                first_decode,
1425                remaining,
1426            }) => {
1427                // If this packet could initiate a migration and we're a client or a server that
1428                // forbids migration, drop the datagram. This could be relaxed to heuristically
1429                // permit NAT-rebinding-like migration.
1430                if remote != self.path.remote && !self.side.remote_may_migrate() {
1431                    trace!("discarding packet from unrecognized peer {}", remote);
1432                    return;
1433                }
1434
1435                let was_anti_amplification_blocked = self.path.anti_amplification_blocked(1);
1436
1437                self.stats.udp_rx.datagrams += 1;
1438                self.stats.udp_rx.bytes += first_decode.len() as u64;
1439                let data_len = first_decode.len();
1440
1441                self.handle_decode(now, remote, ecn, first_decode);
1442                // The current `path` might have changed inside `handle_decode`,
1443                // since the packet could have triggered a migration. Make sure
1444                // the data received is accounted for the most recent path by accessing
1445                // `path` after `handle_decode`.
1446                self.path.total_recvd = self.path.total_recvd.saturating_add(data_len as u64);
1447
1448                if let Some(data) = remaining {
1449                    self.stats.udp_rx.bytes += data.len() as u64;
1450                    self.handle_coalesced(now, remote, ecn, data);
1451                }
1452
1453                #[cfg(feature = "__qlog")]
1454                self.emit_qlog_recovery_metrics(now);
1455
1456                if was_anti_amplification_blocked {
1457                    // A prior attempt to set the loss detection timer may have failed due to
1458                    // anti-amplification, so ensure it's set now. Prevents a handshake deadlock if
1459                    // the server's first flight is lost.
1460                    self.set_loss_detection_timer(now);
1461                }
1462            }
1463            NewIdentifiers(ids, now) => {
1464                self.local_cid_state.new_cids(&ids, now);
1465                ids.into_iter().rev().for_each(|frame| {
1466                    self.spaces[SpaceId::Data].pending.new_cids.push(frame);
1467                });
1468                // Update Timer::PushNewCid
1469                if self
1470                    .timers
1471                    .get(Timer::PushNewCid)
1472                    .map_or(true, |x| x <= now)
1473                {
1474                    self.reset_cid_retirement();
1475                }
1476            }
1477        }
1478    }
1479
1480    /// Process timer expirations
1481    ///
1482    /// Executes protocol logic, potentially preparing signals (including application `Event`s,
1483    /// `EndpointEvent`s and outgoing datagrams) that should be extracted through the relevant
1484    /// methods.
1485    ///
1486    /// It is most efficient to call this immediately after the system clock reaches the latest
1487    /// `Instant` that was output by `poll_timeout`; however spurious extra calls will simply
1488    /// no-op and therefore are safe.
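    ///
    /// A hedged sketch of timer handling, assuming `poll_timeout` behaves as in upstream
    /// quinn-proto and `now` comes from the caller's clock:
    ///
    /// ```ignore
    /// if let Some(deadline) = conn.poll_timeout() {
    ///     if now >= deadline {
    ///         conn.handle_timeout(now);
    ///     }
    /// }
    /// ```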
1489    pub fn handle_timeout(&mut self, now: Instant) {
1490        for &timer in &Timer::VALUES {
1491            if !self.timers.is_expired(timer, now) {
1492                continue;
1493            }
1494            self.timers.stop(timer);
1495            trace!(timer = ?timer, "timeout");
1496            match timer {
1497                Timer::Close => {
1498                    self.state = State::Drained;
1499                    self.endpoint_events.push_back(EndpointEventInner::Drained);
1500                }
1501                Timer::Idle => {
1502                    self.kill(ConnectionError::TimedOut);
1503                }
1504                Timer::KeepAlive => {
1505                    trace!("sending keep-alive");
1506                    self.ping();
1507                }
1508                Timer::LossDetection => {
1509                    self.on_loss_detection_timeout(now);
1510
1511                    #[cfg(feature = "__qlog")]
1512                    self.emit_qlog_recovery_metrics(now);
1513                }
1514                Timer::KeyDiscard => {
1515                    self.zero_rtt_crypto = None;
1516                    self.prev_crypto = None;
1517                }
1518                Timer::PathValidation => {
1519                    debug!("path validation failed");
1520                    if let Some((_, prev)) = self.prev_path.take() {
1521                        self.path = prev;
1522                    }
1523                    self.path.challenge = None;
1524                    self.path.challenge_pending = false;
1525                }
1526                Timer::Pacing => trace!("pacing timer expired"),
1527                Timer::PushNewCid => {
1528                    // Update `retire_prior_to` field in NEW_CONNECTION_ID frame
1529                    let num_new_cid = self.local_cid_state.on_cid_timeout().into();
1530                    if !self.state.is_closed() {
1531                        trace!(
1532                            "pushing new CIDs to peer; RETIRE_PRIOR_TO = {}",
1533                            self.local_cid_state.retire_prior_to()
1534                        );
1535                        self.endpoint_events
1536                            .push_back(EndpointEventInner::NeedIdentifiers(now, num_new_cid));
1537                    }
1538                }
1539                Timer::MaxAckDelay => {
1540                    trace!("max ack delay reached");
1541                    // This timer is only armed in the Data space
1542                    self.spaces[SpaceId::Data]
1543                        .pending_acks
1544                        .on_max_ack_delay_timeout()
1545                }
1546            }
1547        }
1548    }
1549
1550    /// Close a connection immediately
1551    ///
1552    /// This does not ensure delivery of outstanding data. It is the application's responsibility to
1553    /// call this only when all important communications have been completed, e.g. by calling
1554    /// [`SendStream::finish`] on outstanding streams and waiting for the corresponding
1555    /// [`StreamEvent::Finished`] event.
1556    ///
1557    /// If [`Streams::send_streams`] returns 0, all outstanding stream data has been
1558    /// delivered. There may still be data from the peer that has not been received.
1559    ///
1560    /// [`StreamEvent::Finished`]: crate::StreamEvent::Finished
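    ///
    /// A minimal sketch, assuming all streams have already been finished; the error code and
    /// reason shown here are arbitrary application-chosen values:
    ///
    /// ```ignore
    /// conn.close(now, VarInt::from_u32(0), Bytes::from_static(b"done"));
    /// // Keep driving poll_transmit/handle_timeout until `is_drained()` returns true.
    /// ```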
1561    pub fn close(&mut self, now: Instant, error_code: VarInt, reason: Bytes) {
1562        self.close_inner(
1563            now,
1564            Close::Application(frame::ApplicationClose { error_code, reason }),
1565        )
1566    }
1567
1568    fn close_inner(&mut self, now: Instant, reason: Close) {
1569        let was_closed = self.state.is_closed();
1570        if !was_closed {
1571            self.close_common();
1572            self.set_close_timer(now);
1573            self.close = true;
1574            self.state = State::Closed(state::Closed { reason });
1575        }
1576    }
1577
1578    /// Send and receive unreliable, unordered application datagrams (QUIC DATAGRAM frames)
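    ///
    /// A hedged sketch, assuming the `Datagrams` accessor matches upstream quinn-proto;
    /// `handle` is a placeholder:
    ///
    /// ```ignore
    /// // Largest datagram the peer and current MTU allow, if the extension is enabled.
    /// let max = conn.datagrams().max_size();
    /// // Drain any datagrams received from the peer.
    /// while let Some(payload) = conn.datagrams().recv() {
    ///     handle(payload);
    /// }
    /// ```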
1579    pub fn datagrams(&mut self) -> Datagrams<'_> {
1580        Datagrams { conn: self }
1581    }
1582
1583    /// Returns connection statistics
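    ///
    /// A small sketch of inspecting path statistics, assuming an established `conn: Connection`:
    ///
    /// ```ignore
    /// let stats = conn.stats();
    /// tracing::debug!(rtt = ?stats.path.rtt, cwnd = stats.path.cwnd, "path stats");
    /// ```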
1584    pub fn stats(&self) -> ConnectionStats {
1585        let mut stats = self.stats;
1586        stats.path.rtt = self.path.rtt.get();
1587        stats.path.cwnd = self.path.congestion.window();
1588        stats.path.current_mtu = self.path.mtud.current_mtu();
1589
1590        stats
1591    }
1592
1593    /// Ping the remote endpoint
1594    ///
1595    /// Causes an ACK-eliciting packet to be transmitted.
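    ///
    /// A hedged sketch of forcing liveness traffic, assuming `poll_transmit` matches upstream
    /// quinn-proto:
    ///
    /// ```ignore
    /// conn.ping();
    /// // The PING frame rides in the next packet produced here.
    /// let transmit = conn.poll_transmit(now, 1, &mut buf);
    /// ```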
1596    pub fn ping(&mut self) {
1597        self.spaces[self.highest_space].ping_pending = true;
1598    }
1599
1600    /// Update traffic keys spontaneously
1601    ///
1602    /// This can be useful for testing key updates, as they otherwise only happen infrequently.
1603    pub fn force_key_update(&mut self) {
1604        if !self.state.is_established() {
1605            debug!("ignoring forced key update in illegal state");
1606            return;
1607        }
1608        if self.prev_crypto.is_some() {
1609            // We already just updated, or are currently updating, the keys. Concurrent key updates
1610            // are illegal.
1611            debug!("ignoring redundant forced key update");
1612            return;
1613        }
1614        self.update_keys(None, false);
1615    }
1616
1617    // Compatibility wrapper for quinn < 0.11.7. Remove for 0.12.
1618    #[doc(hidden)]
1619    #[deprecated]
1620    pub fn initiate_key_update(&mut self) {
1621        self.force_key_update();
1622    }
1623
1624    /// Get a session reference
1625    pub fn crypto_session(&self) -> &dyn crypto::Session {
1626        &*self.crypto
1627    }
1628
1629    /// Whether the connection is in the process of being established
1630    ///
1631    /// If this returns `false`, the connection may be either established or closed, signaled by the
1632    /// emission of a `Connected` or `ConnectionLost` message respectively.
1633    pub fn is_handshaking(&self) -> bool {
1634        self.state.is_handshake()
1635    }
1636
1637    /// Whether the connection is closed
1638    ///
1639    /// Closed connections cannot transport any further data. A connection becomes closed when
1640    /// either peer application intentionally closes it, or when either transport layer detects an
1641    /// error such as a time-out or certificate validation failure.
1642    ///
1643    /// A `ConnectionLost` event is emitted with details when the connection becomes closed.
1644    pub fn is_closed(&self) -> bool {
1645        self.state.is_closed()
1646    }
1647
1648    /// Whether there is no longer any need to keep the connection around
1649    ///
1650    /// Closed connections become drained after a brief timeout to absorb any remaining in-flight
1651    /// packets from the peer. All drained connections have been closed.
1652    pub fn is_drained(&self) -> bool {
1653        self.state.is_drained()
1654    }
1655
1656    /// For clients, if the peer accepted the 0-RTT data packets
1657    ///
1658    /// The value is meaningless until after the handshake completes.
1659    pub fn accepted_0rtt(&self) -> bool {
1660        self.accepted_0rtt
1661    }
1662
1663    /// Whether 0-RTT is/was possible during the handshake
1664    pub fn has_0rtt(&self) -> bool {
1665        self.zero_rtt_enabled
1666    }
1667
1668    /// Whether there are any pending retransmits
1669    pub fn has_pending_retransmits(&self) -> bool {
1670        !self.spaces[SpaceId::Data].pending.is_empty(&self.streams)
1671    }
1672
1673    /// Look up whether we're the client or server of this Connection
1674    pub fn side(&self) -> Side {
1675        self.side.side()
1676    }
1677
1678    /// The latest socket address for this connection's peer
1679    pub fn remote_address(&self) -> SocketAddr {
1680        self.path.remote
1681    }
1682
1683    /// The local IP address which was used when the peer established
1684    /// the connection
1685    ///
1686    /// This can be different from the address the endpoint is bound to, in case
1687    /// the endpoint is bound to a wildcard address like `0.0.0.0` or `::`.
1688    ///
1689    /// This will return `None` for clients, or when no `local_ip` was passed to
1690    /// [`Endpoint::handle()`](crate::Endpoint::handle) for the datagrams establishing this
1691    /// connection.
1692    pub fn local_ip(&self) -> Option<IpAddr> {
1693        self.local_ip
1694    }
1695
1696    /// Current best estimate of this connection's latency (round-trip-time)
1697    pub fn rtt(&self) -> Duration {
1698        self.path.rtt.get()
1699    }
1700
1701    /// Current state of this connection's congestion controller, for debugging purposes
1702    pub fn congestion_state(&self) -> &dyn Controller {
1703        self.path.congestion.as_ref()
1704    }
1705
1706    /// Resets path-specific settings.
1707    ///
1708    /// This will force-reset several subsystems related to a specific network path.
1709    /// Currently these are the congestion controller, the round-trip time estimator, and
1710    /// MTU discovery.
1711    ///
1712    /// This is useful when it is known the underlying network path has changed and the old
1713    /// state of these subsystems is no longer valid or optimal. In this case it might be
1714    /// faster or reduce loss to settle on optimal values by restarting from the initial
1715    /// configuration in the [`TransportConfig`].
1716    pub fn path_changed(&mut self, now: Instant) {
1717        self.path.reset(now, &self.config);
1718    }
1719
1720    /// Modify the number of remotely initiated streams that may be concurrently open
1721    ///
1722    /// No streams may be opened by the peer unless fewer than `count` are already open. Large
1723    /// `count`s increase both minimum and worst-case memory consumption.
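    ///
    /// A minimal sketch: allow the peer to keep up to 32 bidirectional streams open concurrently.
    ///
    /// ```ignore
    /// conn.set_max_concurrent_streams(Dir::Bi, VarInt::from_u32(32));
    /// ```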
1724    pub fn set_max_concurrent_streams(&mut self, dir: Dir, count: VarInt) {
1725        self.streams.set_max_concurrent(dir, count);
1726        // If the limit was reduced, then a flow control update previously deemed insignificant may
1727        // now be significant.
1728        let pending = &mut self.spaces[SpaceId::Data].pending;
1729        self.streams.queue_max_stream_id(pending);
1730    }
1731
1732    /// Current number of remotely initiated streams that may be concurrently open
1733    ///
1734    /// If the target for this limit is reduced using [`set_max_concurrent_streams`](Self::set_max_concurrent_streams),
1735    /// it will not change immediately, even if fewer streams are open. Instead, it will
1736    /// decrement by one for each time a remotely initiated stream of matching directionality is closed.
1737    pub fn max_concurrent_streams(&self, dir: Dir) -> u64 {
1738        self.streams.max_concurrent(dir)
1739    }
1740
1741    /// See [`TransportConfig::receive_window()`]
1742    pub fn set_receive_window(&mut self, receive_window: VarInt) {
1743        if self.streams.set_receive_window(receive_window) {
1744            self.spaces[SpaceId::Data].pending.max_data = true;
1745        }
1746    }
1747
1748    fn on_ack_received(
1749        &mut self,
1750        now: Instant,
1751        space: SpaceId,
1752        ack: frame::Ack,
1753    ) -> Result<(), TransportError> {
1754        if ack.largest >= self.spaces[space].next_packet_number {
1755            return Err(TransportError::PROTOCOL_VIOLATION("unsent packet acked"));
1756        }
1757        let new_largest = {
1758            let space = &mut self.spaces[space];
1759            if space
1760                .largest_acked_packet
1761                .map_or(true, |pn| ack.largest > pn)
1762            {
1763                space.largest_acked_packet = Some(ack.largest);
1764                if let Some(info) = space.sent_packets.get(&ack.largest) {
1765                    // This should always succeed, but a misbehaving peer might ACK a packet we
1766                    // haven't sent. At worst, that will result in us spuriously reducing the
1767                    // congestion window.
1768                    space.largest_acked_packet_sent = info.time_sent;
1769                }
1770                true
1771            } else {
1772                false
1773            }
1774        };
1775
1776        // Avoid DoS from unreasonably huge ack ranges by filtering out just the new acks.
1777        let mut newly_acked = ArrayRangeSet::new();
1778        for range in ack.iter() {
1779            self.packet_number_filter.check_ack(space, range.clone())?;
1780            for (&pn, _) in self.spaces[space].sent_packets.range(range) {
1781                newly_acked.insert_one(pn);
1782            }
1783        }
1784
1785        if newly_acked.is_empty() {
1786            return Ok(());
1787        }
1788
1789        let mut ack_eliciting_acked = false;
1790        for packet in newly_acked.elts() {
1791            if let Some(info) = self.spaces[space].take(packet) {
1792                if let Some(acked) = info.largest_acked {
1793                    // Assume ACKs for all packets below the largest acknowledged in `packet` have
1794                    // been received. This can cause the peer to spuriously retransmit if some of
1795                    // our earlier ACKs were lost, but allows for simpler state tracking. See
1796                    // discussion at
1797                    // https://www.rfc-editor.org/rfc/rfc9000.html#name-limiting-ranges-by-tracking
1798                    self.spaces[space].pending_acks.subtract_below(acked);
1799                }
1800                ack_eliciting_acked |= info.ack_eliciting;
1801
1802                // Notify MTU discovery that a packet was acked, because it might be an MTU probe
1803                let mtu_updated = self.path.mtud.on_acked(space, packet, info.size);
1804                if mtu_updated {
1805                    self.path
1806                        .congestion
1807                        .on_mtu_update(self.path.mtud.current_mtu());
1808                }
1809
1810                // Notify ack frequency that a packet was acked, because it might contain an ACK_FREQUENCY frame
1811                self.ack_frequency.on_acked(packet);
1812
1813                self.on_packet_acked(now, packet, info);
1814            }
1815        }
1816
1817        self.path.congestion.on_end_acks(
1818            now,
1819            self.path.in_flight.bytes,
1820            self.app_limited,
1821            self.spaces[space].largest_acked_packet,
1822        );
1823
1824        if new_largest && ack_eliciting_acked {
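            // The ACK Delay field is encoded in units of 2^ack_delay_exponent microseconds
            // (RFC 9000 §19.3). It is only meaningful in the Data space and is clamped to the
            // peer's max_ack_delay before being fed into the RTT estimator.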
1825            let ack_delay = if space != SpaceId::Data {
1826                Duration::from_micros(0)
1827            } else {
1828                cmp::min(
1829                    self.ack_frequency.peer_max_ack_delay,
1830                    Duration::from_micros(ack.delay << self.peer_params.ack_delay_exponent.0),
1831                )
1832            };
1833            let rtt = instant_saturating_sub(now, self.spaces[space].largest_acked_packet_sent);
1834            self.path.rtt.update(ack_delay, rtt);
1835            if self.path.first_packet_after_rtt_sample.is_none() {
1836                self.path.first_packet_after_rtt_sample =
1837                    Some((space, self.spaces[space].next_packet_number));
1838            }
1839        }
1840
1841        // Must be called before crypto/pto_count are clobbered
1842        self.detect_lost_packets(now, space, true);
1843
1844        if self.peer_completed_address_validation() {
1845            self.pto_count = 0;
1846        }
1847
1848        // Explicit congestion notification
1849        if self.path.sending_ecn {
1850            if let Some(ecn) = ack.ecn {
1851                // We only examine ECN counters from ACKs that we are certain we received in transmit
1852                // order, allowing us to compute an increase in ECN counts to compare against the number
1853                // of newly acked packets that remains well-defined in the presence of arbitrary packet
1854                // reordering.
1855                if new_largest {
1856                    let sent = self.spaces[space].largest_acked_packet_sent;
1857                    self.process_ecn(now, space, newly_acked.len() as u64, ecn, sent);
1858                }
1859            } else {
1860                // We always start out sending ECN, so any ack that doesn't acknowledge it disables it.
1861                debug!("ECN not acknowledged by peer");
1862                self.path.sending_ecn = false;
1863            }
1864        }
1865
1866        self.set_loss_detection_timer(now);
1867        Ok(())
1868    }
1869
1870    /// Process a new ECN block from an in-order ACK
1871    fn process_ecn(
1872        &mut self,
1873        now: Instant,
1874        space: SpaceId,
1875        newly_acked: u64,
1876        ecn: frame::EcnCounts,
1877        largest_sent_time: Instant,
1878    ) {
1879        match self.spaces[space].detect_ecn(newly_acked, ecn) {
1880            Err(e) => {
1881                debug!("halting ECN due to verification failure: {}", e);
1882                self.path.sending_ecn = false;
1883                // Wipe out the existing value because it might be garbage and could interfere with
1884                // future attempts to use ECN on new paths.
1885                self.spaces[space].ecn_feedback = frame::EcnCounts::ZERO;
1886            }
1887            Ok(false) => {}
1888            Ok(true) => {
1889                self.stats.path.congestion_events += 1;
1890                self.path
1891                    .congestion
1892                    .on_congestion_event(now, largest_sent_time, false, 0);
1893            }
1894        }
1895    }
1896
1897    // Not timing-aware, so it's safe to call this for inferred ACKs, such as those arising
1898    // from high-latency handshakes
1899    fn on_packet_acked(&mut self, now: Instant, pn: u64, info: SentPacket) {
1900        self.remove_in_flight(pn, &info);
1901        if info.ack_eliciting && self.path.challenge.is_none() {
1902            // Only pass ACKs to the congestion controller if we are not validating the current
1903            // path, so as to ignore any ACKs from older paths still coming in.
1904            self.path.congestion.on_ack(
1905                now,
1906                info.time_sent,
1907                info.size.into(),
1908                self.app_limited,
1909                &self.path.rtt,
1910            );
1911        }
1912
1913        // Update state for confirmed delivery of frames
1914        if let Some(retransmits) = info.retransmits.get() {
1915            for (id, _) in retransmits.reset_stream.iter() {
1916                self.streams.reset_acked(*id);
1917            }
1918        }
1919
1920        for frame in info.stream_frames {
1921            self.streams.received_ack_of(frame);
1922        }
1923    }
1924
1925    fn set_key_discard_timer(&mut self, now: Instant, space: SpaceId) {
1926        let start = if self.zero_rtt_crypto.is_some() {
1927            now
1928        } else {
1929            self.prev_crypto
1930                .as_ref()
1931                .expect("no previous keys")
1932                .end_packet
1933                .as_ref()
1934                .expect("update not acknowledged yet")
1935                .1
1936        };
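        // Retain the superseded keys for 3 PTOs past the reference point so that delayed,
        // reordered packets protected with them can still be decrypted before they are dropped.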
1937        self.timers
1938            .set(Timer::KeyDiscard, start + self.pto(space) * 3);
1939    }
1940
1941    fn on_loss_detection_timeout(&mut self, now: Instant) {
1942        if let Some((_, pn_space)) = self.loss_time_and_space() {
1943            // Time-threshold loss detection
1944            self.detect_lost_packets(now, pn_space, false);
1945            self.set_loss_detection_timer(now);
1946            return;
1947        }
1948
1949        let (_, space) = match self.pto_time_and_space(now) {
1950            Some(x) => x,
1951            None => {
1952                error!("PTO expired while unset");
1953                return;
1954            }
1955        };
1956        trace!(
1957            in_flight = self.path.in_flight.bytes,
1958            count = self.pto_count,
1959            ?space,
1960            "PTO fired"
1961        );
1962
1963        let count = match self.path.in_flight.ack_eliciting {
1964            // A PTO when we're not expecting any ACKs must be due to handshake anti-amplification
1965            // deadlock prevention
1966            0 => {
1967                debug_assert!(!self.peer_completed_address_validation());
1968                1
1969            }
1970            // Conventional loss probe
1971            _ => 2,
1972        };
1973        self.spaces[space].loss_probes = self.spaces[space].loss_probes.saturating_add(count);
1974        self.pto_count = self.pto_count.saturating_add(1);
1975        self.set_loss_detection_timer(now);
1976    }
1977
1978    fn detect_lost_packets(&mut self, now: Instant, pn_space: SpaceId, due_to_ack: bool) {
1979        let mut lost_packets = Vec::<u64>::new();
1980        let mut lost_mtu_probe = None;
1981        let in_flight_mtu_probe = self.path.mtud.in_flight_mtu_probe();
1982        let rtt = self.path.rtt.conservative();
1983        let loss_delay = cmp::max(rtt.mul_f32(self.config.time_threshold), TIMER_GRANULARITY);
1984
1985        // Packets sent before this time are deemed lost.
1986        let lost_send_time = now.checked_sub(loss_delay).unwrap();
1987        let largest_acked_packet = self.spaces[pn_space].largest_acked_packet.unwrap();
1988        let packet_threshold = self.config.packet_threshold as u64;
1989        let mut size_of_lost_packets = 0u64;
1990
1991        // InPersistentCongestion: Determine if all packets in the time period before the newest
1992        // lost packet, including the edges, are marked lost. PTO computation must always
1993        // include max ACK delay, i.e. operate as if in the Data space (see RFC 9002 §7.6.1).
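        // For example, with pto(Data) = 300 ms and persistent_congestion_threshold = 3, two
        // ACK-eliciting packets declared lost with send times more than 900 ms apart, and no
        // acknowledged packet in between, put the path into persistent congestion.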
1994        let congestion_period =
1995            self.pto(SpaceId::Data) * self.config.persistent_congestion_threshold;
1996        let mut persistent_congestion_start: Option<Instant> = None;
1997        let mut prev_packet = None;
1998        let mut in_persistent_congestion = false;
1999
2000        let space = &mut self.spaces[pn_space];
2001        space.loss_time = None;
2002
2003        for (&packet, info) in space.sent_packets.range(0..largest_acked_packet) {
2004            if prev_packet != Some(packet.wrapping_sub(1)) {
2005                // An intervening packet was acknowledged
2006                persistent_congestion_start = None;
2007            }
2008
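            // A packet is declared lost if it was sent long enough ago (time threshold) or if
            // the largest acknowledged packet number is at least `packet_threshold` ahead of it
            // (RFC 9002 §6.1).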
2009            if info.time_sent <= lost_send_time || largest_acked_packet >= packet + packet_threshold
2010            {
2011                if Some(packet) == in_flight_mtu_probe {
2012                    // Lost MTU probes are not included in `lost_packets`, because they should not
2013                    // trigger a congestion control response
2014                    lost_mtu_probe = in_flight_mtu_probe;
2015                } else {
2016                    lost_packets.push(packet);
2017                    size_of_lost_packets += info.size as u64;
2018                    if info.ack_eliciting && due_to_ack {
2019                        match persistent_congestion_start {
2020                            // Two ACK-eliciting packets lost more than congestion_period apart, with no
2021                            // ACKed packets in between
2022                            Some(start) if info.time_sent - start > congestion_period => {
2023                                in_persistent_congestion = true;
2024                            }
2025                            // Persistent congestion must start after the first RTT sample
2026                            None if self
2027                                .path
2028                                .first_packet_after_rtt_sample
2029                                .is_some_and(|x| x < (pn_space, packet)) =>
2030                            {
2031                                persistent_congestion_start = Some(info.time_sent);
2032                            }
2033                            _ => {}
2034                        }
2035                    }
2036                }
2037            } else {
2038                let next_loss_time = info.time_sent + loss_delay;
2039                space.loss_time = Some(
2040                    space
2041                        .loss_time
2042                        .map_or(next_loss_time, |x| cmp::min(x, next_loss_time)),
2043                );
2044                persistent_congestion_start = None;
2045            }
2046
2047            prev_packet = Some(packet);
2048        }
2049
2050        // OnPacketsLost
2051        if let Some(largest_lost) = lost_packets.last().cloned() {
2052            let old_bytes_in_flight = self.path.in_flight.bytes;
2053            let largest_lost_sent = self.spaces[pn_space].sent_packets[&largest_lost].time_sent;
2054            self.lost_packets += lost_packets.len() as u64;
2055            self.stats.path.lost_packets += lost_packets.len() as u64;
2056            self.stats.path.lost_bytes += size_of_lost_packets;
2057            trace!(
2058                "packets lost: {:?}, bytes lost: {}",
2059                lost_packets, size_of_lost_packets
2060            );
2061
2062            for &packet in &lost_packets {
2063                let info = self.spaces[pn_space].take(packet).unwrap(); // safe: lost_packets is populated just above
2064                self.remove_in_flight(packet, &info);
2065                for frame in info.stream_frames {
2066                    self.streams.retransmit(frame);
2067                }
2068                self.spaces[pn_space].pending |= info.retransmits;
2069                self.path.mtud.on_non_probe_lost(packet, info.size);
2070            }
2071
2072            if self.path.mtud.black_hole_detected(now) {
2073                self.stats.path.black_holes_detected += 1;
2074                self.path
2075                    .congestion
2076                    .on_mtu_update(self.path.mtud.current_mtu());
2077                if let Some(max_datagram_size) = self.datagrams().max_size() {
2078                    self.datagrams.drop_oversized(max_datagram_size);
2079                }
2080            }
2081
2082            // Don't apply congestion penalty for lost ack-only packets
2083            let lost_ack_eliciting = old_bytes_in_flight != self.path.in_flight.bytes;
2084
2085            if lost_ack_eliciting {
2086                self.stats.path.congestion_events += 1;
2087                self.path.congestion.on_congestion_event(
2088                    now,
2089                    largest_lost_sent,
2090                    in_persistent_congestion,
2091                    size_of_lost_packets,
2092                );
2093            }
2094        }
2095
2096        // Handle a lost MTU probe
2097        if let Some(packet) = lost_mtu_probe {
2098            let info = self.spaces[SpaceId::Data].take(packet).unwrap(); // safe: lost_mtu_probe is omitted from lost_packets, and therefore must not have been removed yet
2099            self.remove_in_flight(packet, &info);
2100            self.path.mtud.on_probe_lost();
2101            self.stats.path.lost_plpmtud_probes += 1;
2102        }
2103    }
2104
2105    fn loss_time_and_space(&self) -> Option<(Instant, SpaceId)> {
2106        SpaceId::iter()
2107            .filter_map(|id| Some((self.spaces[id].loss_time?, id)))
2108            .min_by_key(|&(time, _)| time)
2109    }
2110
2111    fn pto_time_and_space(&self, now: Instant) -> Option<(Instant, SpaceId)> {
2112        let backoff = 2u32.pow(self.pto_count.min(MAX_BACKOFF_EXPONENT));
2113        let mut duration = self.path.rtt.pto_base() * backoff;
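        // For example, pto_count = 2 yields backoff = 4, so the probe timeout starts at four
        // times the base PTO for this path.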
2114
2115        if self.path.in_flight.ack_eliciting == 0 {
2116            debug_assert!(!self.peer_completed_address_validation());
2117            let space = match self.highest_space {
2118                SpaceId::Handshake => SpaceId::Handshake,
2119                _ => SpaceId::Initial,
2120            };
2121            return Some((now + duration, space));
2122        }
2123
2124        let mut result = None;
2125        for space in SpaceId::iter() {
2126            if self.spaces[space].in_flight == 0 {
2127                continue;
2128            }
2129            if space == SpaceId::Data {
2130                // Skip ApplicationData until handshake completes.
2131                if self.is_handshaking() {
2132                    return result;
2133                }
2134                // Include max_ack_delay and backoff for ApplicationData.
2135                duration += self.ack_frequency.max_ack_delay_for_pto() * backoff;
2136            }
2137            let last_ack_eliciting = match self.spaces[space].time_of_last_ack_eliciting_packet {
2138                Some(time) => time,
2139                None => continue,
2140            };
2141            let pto = last_ack_eliciting + duration;
2142            if result.map_or(true, |(earliest_pto, _)| pto < earliest_pto) {
2143                result = Some((pto, space));
2144            }
2145        }
2146        result
2147    }
2148
2149    fn peer_completed_address_validation(&self) -> bool {
2150        if self.side.is_server() || self.state.is_closed() {
2151            return true;
2152        }
2153        // The server is guaranteed to have validated our address if any of our handshake or 1-RTT
2154        // packets are acknowledged or we've seen HANDSHAKE_DONE and discarded handshake keys.
2155        self.spaces[SpaceId::Handshake]
2156            .largest_acked_packet
2157            .is_some()
2158            || self.spaces[SpaceId::Data].largest_acked_packet.is_some()
2159            || (self.spaces[SpaceId::Data].crypto.is_some()
2160                && self.spaces[SpaceId::Handshake].crypto.is_none())
2161    }
2162
2163    fn set_loss_detection_timer(&mut self, now: Instant) {
2164        if self.state.is_closed() {
2165            // No loss detection takes place on closed connections, and `close_common` already
2166            // stopped the timer. Ensure we don't restart it inadvertently, e.g. in response to a
2167            // reordered packet being handled by state-insensitive code.
2168            return;
2169        }
2170
2171        if let Some((loss_time, _)) = self.loss_time_and_space() {
2172            // Time threshold loss detection.
2173            self.timers.set(Timer::LossDetection, loss_time);
2174            return;
2175        }
2176
2177        if self.path.anti_amplification_blocked(1) {
2178            // We wouldn't be able to send anything, so don't bother.
2179            self.timers.stop(Timer::LossDetection);
2180            return;
2181        }
2182
2183        if self.path.in_flight.ack_eliciting == 0 && self.peer_completed_address_validation() {
2184            // There is nothing in flight to declare lost, so no timer is set. However, the client needs to arm
2185            // the timer if the server might be blocked by the anti-amplification limit.
2186            self.timers.stop(Timer::LossDetection);
2187            return;
2188        }
2189
2190        // Determine which packet-number space to arm the PTO for, and calculate when it
2191        // should fire.
2192        if let Some((timeout, _)) = self.pto_time_and_space(now) {
2193            self.timers.set(Timer::LossDetection, timeout);
2194        } else {
2195            self.timers.stop(Timer::LossDetection);
2196        }
2197    }
2198
2199    /// Probe Timeout (PTO): the base PTO for `space`, plus the peer's max ACK delay in the Data space
2200    fn pto(&self, space: SpaceId) -> Duration {
2201        let max_ack_delay = match space {
2202            SpaceId::Initial | SpaceId::Handshake => Duration::ZERO,
2203            SpaceId::Data => self.ack_frequency.max_ack_delay_for_pto(),
2204        };
2205        self.path.rtt.pto_base() + max_ack_delay
2206    }
2207
2208    fn on_packet_authenticated(
2209        &mut self,
2210        now: Instant,
2211        space_id: SpaceId,
2212        ecn: Option<EcnCodepoint>,
2213        packet: Option<u64>,
2214        spin: bool,
2215        is_1rtt: bool,
2216    ) {
2217        self.total_authed_packets += 1;
2218        self.reset_keep_alive(now);
2219        self.reset_idle_timeout(now, space_id);
2220        self.permit_idle_reset = true;
2221        self.receiving_ecn |= ecn.is_some();
2222        if let Some(x) = ecn {
2223            let space = &mut self.spaces[space_id];
2224            space.ecn_counters += x;
2225
2226            if x.is_ce() {
2227                space.pending_acks.set_immediate_ack_required();
2228            }
2229        }
2230
2231        let packet = match packet {
2232            Some(x) => x,
2233            None => return,
2234        };
2235        if self.side.is_server() {
2236            if self.spaces[SpaceId::Initial].crypto.is_some() && space_id == SpaceId::Handshake {
2237                // A server stops sending and processing Initial packets when it receives its first Handshake packet.
2238                self.discard_space(now, SpaceId::Initial);
2239            }
2240            if self.zero_rtt_crypto.is_some() && is_1rtt {
2241                // Discard 0-RTT keys soon after receiving a 1-RTT packet
2242                self.set_key_discard_timer(now, space_id)
2243            }
2244        }
2245        let space = &mut self.spaces[space_id];
2246        space.pending_acks.insert_one(packet, now);
2247        if packet >= space.rx_packet {
2248            space.rx_packet = packet;
2249            // Update outgoing spin bit, inverting iff we're the client
2250            self.spin = self.side.is_client() ^ spin;
2251        }
2252    }
2253
2254    fn reset_idle_timeout(&mut self, now: Instant, space: SpaceId) {
2255        let timeout = match self.idle_timeout {
2256            None => return,
2257            Some(dur) => dur,
2258        };
2259        if self.state.is_closed() {
2260            self.timers.stop(Timer::Idle);
2261            return;
2262        }
2263        let dt = cmp::max(timeout, 3 * self.pto(space));
2264        self.timers.set(Timer::Idle, now + dt);
2265    }
2266
2267    fn reset_keep_alive(&mut self, now: Instant) {
2268        let interval = match self.config.keep_alive_interval {
2269            Some(x) if self.state.is_established() => x,
2270            _ => return,
2271        };
2272        self.timers.set(Timer::KeepAlive, now + interval);
2273    }
2274
2275    fn reset_cid_retirement(&mut self) {
2276        if let Some(t) = self.local_cid_state.next_timeout() {
2277            self.timers.set(Timer::PushNewCid, t);
2278        }
2279    }
2280
2281    /// Handle the already-decrypted first packet from the client
2282    ///
2283    /// Decrypting the first packet in the `Endpoint` allows stateless packet handling to be more
2284    /// efficient.
2285    pub(crate) fn handle_first_packet(
2286        &mut self,
2287        now: Instant,
2288        remote: SocketAddr,
2289        ecn: Option<EcnCodepoint>,
2290        packet_number: u64,
2291        packet: InitialPacket,
2292        remaining: Option<BytesMut>,
2293    ) -> Result<(), ConnectionError> {
2294        let span = trace_span!("first recv");
2295        let _guard = span.enter();
2296        debug_assert!(self.side.is_server());
2297        let len = packet.header_data.len() + packet.payload.len();
2298        self.path.total_recvd = len as u64;
2299
2300        match self.state {
2301            State::Handshake(ref mut state) => {
2302                state.expected_token = packet.header.token.clone();
2303            }
2304            _ => unreachable!("first packet must be delivered in Handshake state"),
2305        }
2306
2307        self.on_packet_authenticated(
2308            now,
2309            SpaceId::Initial,
2310            ecn,
2311            Some(packet_number),
2312            false,
2313            false,
2314        );
2315
2316        self.process_decrypted_packet(now, remote, Some(packet_number), packet.into())?;
2317        if let Some(data) = remaining {
2318            self.handle_coalesced(now, remote, ecn, data);
2319        }
2320
2321        #[cfg(feature = "__qlog")]
2322        self.emit_qlog_recovery_metrics(now);
2323
2324        Ok(())
2325    }
2326
2327    fn init_0rtt(&mut self) {
2328        let (header, packet) = match self.crypto.early_crypto() {
2329            Some(x) => x,
2330            None => return,
2331        };
2332        if self.side.is_client() {
2333            match self.crypto.transport_parameters() {
2334                Ok(params) => {
2335                    let params = params
2336                        .expect("crypto layer didn't supply transport parameters with ticket");
2337                    // Certain values must not be cached
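                    // for 0-RTT (RFC 9000 §7.4.1); they are overridden with `None` or their
                    // defaults below before the remembered parameters are applied.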
2338                    let params = TransportParameters {
2339                        initial_src_cid: None,
2340                        original_dst_cid: None,
2341                        preferred_address: None,
2342                        retry_src_cid: None,
2343                        stateless_reset_token: None,
2344                        min_ack_delay: None,
2345                        ack_delay_exponent: TransportParameters::default().ack_delay_exponent,
2346                        max_ack_delay: TransportParameters::default().max_ack_delay,
2347                        ..params
2348                    };
2349                    self.set_peer_params(params);
2350                }
2351                Err(e) => {
2352                    error!("session ticket has malformed transport parameters: {}", e);
2353                    return;
2354                }
2355            }
2356        }
2357        trace!("0-RTT enabled");
2358        self.zero_rtt_enabled = true;
2359        self.zero_rtt_crypto = Some(ZeroRttCrypto { header, packet });
2360    }
2361
2362    fn read_crypto(
2363        &mut self,
2364        space: SpaceId,
2365        crypto: &frame::Crypto,
2366        payload_len: usize,
2367    ) -> Result<(), TransportError> {
2368        let expected = if !self.state.is_handshake() {
2369            SpaceId::Data
2370        } else if self.highest_space == SpaceId::Initial {
2371            SpaceId::Initial
2372        } else {
2373            // On the server, self.highest_space can be Data after receiving the client's first
2374            // flight, but we expect Handshake CRYPTO until the handshake is complete.
2375            SpaceId::Handshake
2376        };
2377        // We can't decrypt Handshake packets when highest_space is Initial, CRYPTO frames in 0-RTT
2378        // packets are illegal, and we don't process 1-RTT packets until the handshake is
2379        // complete. Therefore, we will never see CRYPTO data from a later-than-expected space.
2380        debug_assert!(space <= expected, "received out-of-order CRYPTO data");
2381
2382        let end = crypto.offset + crypto.data.len() as u64;
2383        if space < expected && end > self.spaces[space].crypto_stream.bytes_read() {
2384            warn!(
2385                "received new {:?} CRYPTO data when expecting {:?}",
2386                space, expected
2387            );
2388            return Err(TransportError::PROTOCOL_VIOLATION(
2389                "new data at unexpected encryption level",
2390            ));
2391        }
2392
2393        let space = &mut self.spaces[space];
2394        let max = end.saturating_sub(space.crypto_stream.bytes_read());
2395        if max > self.config.crypto_buffer_size as u64 {
2396            return Err(TransportError::CRYPTO_BUFFER_EXCEEDED(""));
2397        }
2398
2399        space
2400            .crypto_stream
2401            .insert(crypto.offset, crypto.data.clone(), payload_len);
2402        while let Some(chunk) = space.crypto_stream.read(usize::MAX, true) {
2403            trace!("consumed {} CRYPTO bytes", chunk.bytes.len());
2404            if self.crypto.read_handshake(&chunk.bytes)? {
2405                self.events.push_back(Event::HandshakeDataReady);
2406            }
2407        }
2408
2409        Ok(())
2410    }
2411
2412    fn write_crypto(&mut self) {
2413        loop {
2414            let space = self.highest_space;
2415            let mut outgoing = Vec::new();
2416            if let Some(crypto) = self.crypto.write_handshake(&mut outgoing) {
2417                match space {
2418                    SpaceId::Initial => {
2419                        self.upgrade_crypto(SpaceId::Handshake, crypto);
2420                    }
2421                    SpaceId::Handshake => {
2422                        self.upgrade_crypto(SpaceId::Data, crypto);
2423                    }
2424                    _ => unreachable!("got updated secrets during 1-RTT"),
2425                }
2426            }
2427            if outgoing.is_empty() {
2428                if space == self.highest_space {
2429                    break;
2430                } else {
2431                    // Keys updated, check for more data to send
2432                    continue;
2433                }
2434            }
2435            let offset = self.spaces[space].crypto_offset;
2436            let outgoing = Bytes::from(outgoing);
2437            if let State::Handshake(ref mut state) = self.state {
2438                if space == SpaceId::Initial && offset == 0 && self.side.is_client() {
2439                    state.client_hello = Some(outgoing.clone());
2440                }
2441            }
2442            self.spaces[space].crypto_offset += outgoing.len() as u64;
2443            trace!("wrote {} {:?} CRYPTO bytes", outgoing.len(), space);
2444            self.spaces[space].pending.crypto.push_back(frame::Crypto {
2445                offset,
2446                data: outgoing,
2447            });
2448        }
2449    }
2450
2451    /// Switch to stronger cryptography during handshake
2452    fn upgrade_crypto(&mut self, space: SpaceId, crypto: Keys) {
2453        debug_assert!(
2454            self.spaces[space].crypto.is_none(),
2455            "already reached packet space {space:?}"
2456        );
2457        trace!("{:?} keys ready", space);
2458        if space == SpaceId::Data {
2459            // Precompute the first key update
2460            self.next_crypto = Some(
2461                self.crypto
2462                    .next_1rtt_keys()
2463                    .expect("handshake should be complete"),
2464            );
2465        }
2466
2467        self.spaces[space].crypto = Some(crypto);
2468        debug_assert!(space as usize > self.highest_space as usize);
2469        self.highest_space = space;
2470        if space == SpaceId::Data && self.side.is_client() {
2471            // Discard 0-RTT keys because 1-RTT keys are available.
2472            self.zero_rtt_crypto = None;
2473        }
2474    }
2475
2476    fn discard_space(&mut self, now: Instant, space_id: SpaceId) {
2477        debug_assert!(space_id != SpaceId::Data);
2478        trace!("discarding {:?} keys", space_id);
2479        if space_id == SpaceId::Initial {
2480            // No longer needed
2481            if let ConnectionSide::Client { token, .. } = &mut self.side {
2482                *token = Bytes::new();
2483            }
2484        }
2485        let space = &mut self.spaces[space_id];
2486        space.crypto = None;
2487        space.time_of_last_ack_eliciting_packet = None;
2488        space.loss_time = None;
2489        space.in_flight = 0;
2490        let sent_packets = mem::take(&mut space.sent_packets);
2491        for (pn, packet) in sent_packets.into_iter() {
2492            self.remove_in_flight(pn, &packet);
2493        }
2494        self.set_loss_detection_timer(now)
2495    }
2496
2497    fn handle_coalesced(
2498        &mut self,
2499        now: Instant,
2500        remote: SocketAddr,
2501        ecn: Option<EcnCodepoint>,
2502        data: BytesMut,
2503    ) {
2504        self.path.total_recvd = self.path.total_recvd.saturating_add(data.len() as u64);
2505        let mut remaining = Some(data);
2506        while let Some(data) = remaining {
2507            match PartialDecode::new(
2508                data,
2509                &FixedLengthConnectionIdParser::new(self.local_cid_state.cid_len()),
2510                &[self.version],
2511                self.endpoint_config.grease_quic_bit,
2512            ) {
2513                Ok((partial_decode, rest)) => {
2514                    remaining = rest;
2515                    self.handle_decode(now, remote, ecn, partial_decode);
2516                }
2517                Err(e) => {
2518                    trace!("malformed header: {}", e);
2519                    return;
2520                }
2521            }
2522        }
2523    }
2524
2525    fn handle_decode(
2526        &mut self,
2527        now: Instant,
2528        remote: SocketAddr,
2529        ecn: Option<EcnCodepoint>,
2530        partial_decode: PartialDecode,
2531    ) {
2532        if let Some(decoded) = packet_crypto::unprotect_header(
2533            partial_decode,
2534            &self.spaces,
2535            self.zero_rtt_crypto.as_ref(),
2536            self.peer_params.stateless_reset_token,
2537        ) {
2538            self.handle_packet(now, remote, ecn, decoded.packet, decoded.stateless_reset);
2539        }
2540    }
2541
2542    fn handle_packet(
2543        &mut self,
2544        now: Instant,
2545        remote: SocketAddr,
2546        ecn: Option<EcnCodepoint>,
2547        packet: Option<Packet>,
2548        stateless_reset: bool,
2549    ) {
2550        self.stats.udp_rx.ios += 1;
2551        if let Some(ref packet) = packet {
2552            trace!(
2553                "got {:?} packet ({} bytes) from {} using id {}",
2554                packet.header.space(),
2555                packet.payload.len() + packet.header_data.len(),
2556                remote,
2557                packet.header.dst_cid(),
2558            );
2559        }
2560
2561        if self.is_handshaking() && remote != self.path.remote {
2562            debug!("discarding packet with unexpected remote during handshake");
2563            return;
2564        }
2565
2566        let was_closed = self.state.is_closed();
2567        let was_drained = self.state.is_drained();
2568
2569        let decrypted = match packet {
2570            None => Err(None),
2571            Some(mut packet) => self
2572                .decrypt_packet(now, &mut packet)
2573                .map(move |number| (packet, number)),
2574        };
2575        let result = match decrypted {
2576            _ if stateless_reset => {
2577                debug!("got stateless reset");
2578                Err(ConnectionError::Reset)
2579            }
2580            Err(Some(e)) => {
2581                warn!("illegal packet: {}", e);
2582                Err(e.into())
2583            }
2584            Err(None) => {
2585                debug!("failed to authenticate packet");
2586                self.authentication_failures += 1;
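                // Enforce the AEAD integrity limit (RFC 9001 §6.6): once too many received
                // packets fail authentication, the connection must be closed with
                // AEAD_LIMIT_REACHED.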
2587                let integrity_limit = self.spaces[self.highest_space]
2588                    .crypto
2589                    .as_ref()
2590                    .unwrap()
2591                    .packet
2592                    .local
2593                    .integrity_limit();
2594                if self.authentication_failures > integrity_limit {
2595                    Err(TransportError::AEAD_LIMIT_REACHED("integrity limit violated").into())
2596                } else {
2597                    return;
2598                }
2599            }
2600            Ok((packet, number)) => {
2601                let span = match number {
2602                    Some(pn) => trace_span!("recv", space = ?packet.header.space(), pn),
2603                    None => trace_span!("recv", space = ?packet.header.space()),
2604                };
2605                let _guard = span.enter();
2606
2607                let is_duplicate = |n| self.spaces[packet.header.space()].dedup.insert(n);
2608                if number.is_some_and(is_duplicate) {
2609                    debug!("discarding possible duplicate packet");
2610                    return;
2611                } else if self.state.is_handshake() && packet.header.is_short() {
2612                    // TODO: SHOULD buffer these to improve reordering tolerance.
2613                    trace!("dropping short packet during handshake");
2614                    return;
2615                } else {
2616                    if let Header::Initial(InitialHeader { ref token, .. }) = packet.header {
2617                        if let State::Handshake(ref hs) = self.state {
2618                            if self.side.is_server() && token != &hs.expected_token {
2619                                // Clients must send the same retry token in every Initial. Initial
2620                                // packets can be spoofed, so we discard rather than killing the
2621                                // connection.
2622                                warn!("discarding Initial with invalid retry token");
2623                                return;
2624                            }
2625                        }
2626                    }
2627
2628                    if !self.state.is_closed() {
2629                        let spin = match packet.header {
2630                            Header::Short { spin, .. } => spin,
2631                            _ => false,
2632                        };
2633                        self.on_packet_authenticated(
2634                            now,
2635                            packet.header.space(),
2636                            ecn,
2637                            number,
2638                            spin,
2639                            packet.header.is_1rtt(),
2640                        );
2641                    }
2642
2643                    self.process_decrypted_packet(now, remote, number, packet)
2644                }
2645            }
2646        };
2647
2648        // State transitions for error cases
2649        if let Err(conn_err) = result {
2650            self.error = Some(conn_err.clone());
2651            self.state = match conn_err {
2652                ConnectionError::ApplicationClosed(reason) => State::closed(reason),
2653                ConnectionError::ConnectionClosed(reason) => State::closed(reason),
2654                ConnectionError::Reset
2655                | ConnectionError::TransportError(TransportError {
2656                    code: TransportErrorCode::AEAD_LIMIT_REACHED,
2657                    ..
2658                }) => State::Drained,
2659                ConnectionError::TimedOut => {
2660                    unreachable!("timeouts aren't generated by packet processing");
2661                }
2662                ConnectionError::TransportError(err) => {
2663                    debug!("closing connection due to transport error: {}", err);
2664                    State::closed(err)
2665                }
2666                ConnectionError::VersionMismatch => State::Draining,
2667                ConnectionError::LocallyClosed => {
2668                    unreachable!("LocallyClosed isn't generated by packet processing");
2669                }
2670                ConnectionError::CidsExhausted => {
2671                    unreachable!("CidsExhausted isn't generated by packet processing");
2672                }
2673            };
2674        }
2675
2676        if !was_closed && self.state.is_closed() {
2677            self.close_common();
2678            if !self.state.is_drained() {
2679                self.set_close_timer(now);
2680            }
2681        }
2682        if !was_drained && self.state.is_drained() {
2683            self.endpoint_events.push_back(EndpointEventInner::Drained);
2684            // Close timer may have been started previously, e.g. if we sent a close and got a
2685            // stateless reset in response
2686            self.timers.stop(Timer::Close);
2687        }
2688
2689        // Transmit CONNECTION_CLOSE if necessary
2690        if let State::Closed(_) = self.state {
2691            self.close = remote == self.path.remote;
2692        }
2693    }
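
    // A minimal standalone sketch (not this crate's API) of the AEAD integrity-limit
    // bookkeeping performed in the authentication-failure arm above: each packet that
    // fails authentication bumps a counter, and the connection is closed with
    // AEAD_LIMIT_REACHED once the counter exceeds the limit advertised by the current
    // packet keys (RFC 9001 §6.6).
    //
    //     struct IntegrityGuard {
    //         failures: u64,
    //         limit: u64, // e.g. 2^52 for the AES-GCM suites, 2^36 for ChaCha20-Poly1305
    //     }
    //
    //     impl IntegrityGuard {
    //         /// Returns true once the connection must be torn down.
    //         fn record_failure(&mut self) -> bool {
    //             self.failures += 1;
    //             self.failures > self.limit
    //         }
    //     }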
2694
2695    fn process_decrypted_packet(
2696        &mut self,
2697        now: Instant,
2698        remote: SocketAddr,
2699        number: Option<u64>,
2700        packet: Packet,
2701    ) -> Result<(), ConnectionError> {
2702        let state = match self.state {
2703            State::Established => {
2704                match packet.header.space() {
2705                    SpaceId::Data => self.process_payload(now, remote, number.unwrap(), packet)?,
2706                    _ if packet.header.has_frames() => self.process_early_payload(now, packet)?,
2707                    _ => {
2708                        trace!("discarding unexpected pre-handshake packet");
2709                    }
2710                }
2711                return Ok(());
2712            }
2713            State::Closed(_) => {
2714                for result in frame::Iter::new(packet.payload.freeze())? {
2715                    let frame = match result {
2716                        Ok(frame) => frame,
2717                        Err(err) => {
2718                            debug!("frame decoding error: {err:?}");
2719                            continue;
2720                        }
2721                    };
2722
2723                    if let Frame::Padding = frame {
2724                        continue;
2725                    };
2726
2727                    self.stats.frame_rx.record(&frame);
2728
2729                    if let Frame::Close(_) = frame {
2730                        trace!("draining");
2731                        self.state = State::Draining;
2732                        break;
2733                    }
2734                }
2735                return Ok(());
2736            }
2737            State::Draining | State::Drained => return Ok(()),
2738            State::Handshake(ref mut state) => state,
2739        };
2740
2741        match packet.header {
2742            Header::Retry {
2743                src_cid: rem_cid, ..
2744            } => {
2745                if self.side.is_server() {
2746                    return Err(TransportError::PROTOCOL_VIOLATION("client sent Retry").into());
2747                }
2748
2749                if self.total_authed_packets > 1
2750                            || packet.payload.len() <= 16 // token + 16 byte tag
2751                            || !self.crypto.is_valid_retry(
2752                                &self.rem_cids.active(),
2753                                &packet.header_data,
2754                                &packet.payload,
2755                            )
2756                {
2757                    trace!("discarding invalid Retry");
2758                    // - After the client has received and processed an Initial or Retry
2759                    //   packet from the server, it MUST discard any subsequent Retry
2760                    //   packets that it receives.
2761                    // - A client MUST discard a Retry packet with a zero-length Retry Token
2762                    //   field.
2763                    // - Clients MUST discard Retry packets that have a Retry Integrity Tag
2764                    //   that cannot be validated
2765                    return Ok(());
2766                }
2767
2768                trace!("retrying with CID {}", rem_cid);
2769                let client_hello = state.client_hello.take().unwrap();
2770                self.retry_src_cid = Some(rem_cid);
2771                self.rem_cids.update_initial_cid(rem_cid);
2772                self.rem_handshake_cid = rem_cid;
2773
2774                let space = &mut self.spaces[SpaceId::Initial];
2775                if let Some(info) = space.take(0) {
2776                    self.on_packet_acked(now, 0, info);
2777                };
2778
2779                self.discard_space(now, SpaceId::Initial); // Make sure we clean up after any retransmitted Initials
2780                self.spaces[SpaceId::Initial] = PacketSpace {
2781                    crypto: Some(self.crypto.initial_keys(&rem_cid, self.side.side())),
2782                    next_packet_number: self.spaces[SpaceId::Initial].next_packet_number,
2783                    crypto_offset: client_hello.len() as u64,
2784                    ..PacketSpace::new(now)
2785                };
2786                self.spaces[SpaceId::Initial]
2787                    .pending
2788                    .crypto
2789                    .push_back(frame::Crypto {
2790                        offset: 0,
2791                        data: client_hello,
2792                    });
2793
2794                // Retransmit all 0-RTT data
2795                let zero_rtt = mem::take(&mut self.spaces[SpaceId::Data].sent_packets);
2796                for (pn, info) in zero_rtt {
2797                    self.remove_in_flight(pn, &info);
2798                    self.spaces[SpaceId::Data].pending |= info.retransmits;
2799                }
2800                self.streams.retransmit_all_for_0rtt();
2801
2802                let token_len = packet.payload.len() - 16;
2803                let ConnectionSide::Client { ref mut token, .. } = self.side else {
2804                    unreachable!("we already short-circuited if we're server");
2805                };
2806                *token = packet.payload.freeze().split_to(token_len);
2807                self.state = State::Handshake(state::Handshake {
2808                    expected_token: Bytes::new(),
2809                    rem_cid_set: false,
2810                    client_hello: None,
2811                });
2812                Ok(())
2813            }
2814            Header::Long {
2815                ty: LongType::Handshake,
2816                src_cid: rem_cid,
2817                ..
2818            } => {
2819                if rem_cid != self.rem_handshake_cid {
2820                    debug!(
2821                        "discarding packet with mismatched remote CID: {} != {}",
2822                        self.rem_handshake_cid, rem_cid
2823                    );
2824                    return Ok(());
2825                }
2826                self.on_path_validated();
2827
2828                self.process_early_payload(now, packet)?;
2829                if self.state.is_closed() {
2830                    return Ok(());
2831                }
2832
2833                if self.crypto.is_handshaking() {
2834                    trace!("handshake ongoing");
2835                    return Ok(());
2836                }
2837
2838                if self.side.is_client() {
2839                    // Client-only because server params were set from the client's Initial
2840                    let params =
2841                        self.crypto
2842                            .transport_parameters()?
2843                            .ok_or_else(|| TransportError {
2844                                code: TransportErrorCode::crypto(0x6d),
2845                                frame: None,
2846                                reason: "transport parameters missing".into(),
2847                            })?;
2848
2849                    if self.has_0rtt() {
2850                        if !self.crypto.early_data_accepted().unwrap() {
2851                            debug_assert!(self.side.is_client());
2852                            debug!("0-RTT rejected");
2853                            self.accepted_0rtt = false;
2854                            self.streams.zero_rtt_rejected();
2855
2856                            // Discard already-queued frames
2857                            self.spaces[SpaceId::Data].pending = Retransmits::default();
2858
2859                            // Discard 0-RTT packets
2860                            let sent_packets =
2861                                mem::take(&mut self.spaces[SpaceId::Data].sent_packets);
2862                            for (pn, packet) in sent_packets {
2863                                self.remove_in_flight(pn, &packet);
2864                            }
2865                        } else {
2866                            self.accepted_0rtt = true;
2867                            params.validate_resumption_from(&self.peer_params)?;
2868                        }
2869                    }
2870                    if let Some(token) = params.stateless_reset_token {
2871                        self.endpoint_events
2872                            .push_back(EndpointEventInner::ResetToken(self.path.remote, token));
2873                    }
2874                    self.handle_peer_params(params)?;
2875                    self.issue_first_cids(now);
2876                } else {
2877                    // Server-only
2878                    self.spaces[SpaceId::Data].pending.handshake_done = true;
2879                    self.discard_space(now, SpaceId::Handshake);
2880                }
2881
2882                self.events.push_back(Event::Connected);
2883                self.state = State::Established;
2884                trace!("established");
2885                Ok(())
2886            }
2887            Header::Initial(InitialHeader {
2888                src_cid: rem_cid, ..
2889            }) => {
2890                if !state.rem_cid_set {
2891                    trace!("switching remote CID to {}", rem_cid);
2892                    let mut state = state.clone();
2893                    self.rem_cids.update_initial_cid(rem_cid);
2894                    self.rem_handshake_cid = rem_cid;
2895                    self.orig_rem_cid = rem_cid;
2896                    state.rem_cid_set = true;
2897                    self.state = State::Handshake(state);
2898                } else if rem_cid != self.rem_handshake_cid {
2899                    debug!(
2900                        "discarding packet with mismatched remote CID: {} != {}",
2901                        self.rem_handshake_cid, rem_cid
2902                    );
2903                    return Ok(());
2904                }
2905
2906                let starting_space = self.highest_space;
2907                self.process_early_payload(now, packet)?;
2908
2909                if self.side.is_server()
2910                    && starting_space == SpaceId::Initial
2911                    && self.highest_space != SpaceId::Initial
2912                {
2913                    let params =
2914                        self.crypto
2915                            .transport_parameters()?
2916                            .ok_or_else(|| TransportError {
2917                                code: TransportErrorCode::crypto(0x6d),
2918                                frame: None,
2919                                reason: "transport parameters missing".into(),
2920                            })?;
2921                    self.handle_peer_params(params)?;
2922                    self.issue_first_cids(now);
2923                    self.init_0rtt();
2924                }
2925                Ok(())
2926            }
2927            Header::Long {
2928                ty: LongType::ZeroRtt,
2929                ..
2930            } => {
2931                self.process_payload(now, remote, number.unwrap(), packet)?;
2932                Ok(())
2933            }
2934            Header::VersionNegotiate { .. } => {
2935                if self.total_authed_packets > 1 {
2936                    return Ok(());
2937                }
2938                let supported = packet
2939                    .payload
2940                    .chunks(4)
2941                    .any(|x| match <[u8; 4]>::try_from(x) {
2942                        Ok(version) => self.version == u32::from_be_bytes(version),
2943                        Err(_) => false,
2944                    });
2945                if supported {
2946                    return Ok(());
2947                }
2948                debug!("remote doesn't support our version");
2949                Err(ConnectionError::VersionMismatch)
2950            }
2951            Header::Short { .. } => unreachable!(
2952                "short packets received during handshake are discarded in handle_packet"
2953            ),
2954        }
2955    }
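
    // Illustrative sketch (an assumed helper, not part of this impl) of the Version
    // Negotiation handling above: the payload is a sequence of 32-bit big-endian
    // versions, and the connection only fails with VersionMismatch when none of them
    // matches the version we offered.
    //
    //     fn peer_supports(payload: &[u8], our_version: u32) -> bool {
    //         payload
    //             .chunks(4)
    //             .filter_map(|chunk| <[u8; 4]>::try_from(chunk).ok())
    //             .any(|bytes| u32::from_be_bytes(bytes) == our_version)
    //     }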
2956
2957    /// Process an Initial or Handshake packet payload
2958    fn process_early_payload(
2959        &mut self,
2960        now: Instant,
2961        packet: Packet,
2962    ) -> Result<(), TransportError> {
2963        debug_assert_ne!(packet.header.space(), SpaceId::Data);
2964        let payload_len = packet.payload.len();
2965        let mut ack_eliciting = false;
2966        for result in frame::Iter::new(packet.payload.freeze())? {
2967            let frame = result?;
2968            let span = match frame {
2969                Frame::Padding => continue,
2970                _ => Some(trace_span!("frame", ty = %frame.ty())),
2971            };
2972
2973            self.stats.frame_rx.record(&frame);
2974
2975            let _guard = span.as_ref().map(|x| x.enter());
2976            ack_eliciting |= frame.is_ack_eliciting();
2977
2978            // Process frames
2979            match frame {
2980                Frame::Padding | Frame::Ping => {}
2981                Frame::Crypto(frame) => {
2982                    self.read_crypto(packet.header.space(), &frame, payload_len)?;
2983                }
2984                Frame::Ack(ack) => {
2985                    self.on_ack_received(now, packet.header.space(), ack)?;
2986                }
2987                Frame::Close(reason) => {
2988                    self.error = Some(reason.into());
2989                    self.state = State::Draining;
2990                    return Ok(());
2991                }
2992                _ => {
2993                    let mut err =
2994                        TransportError::PROTOCOL_VIOLATION("illegal frame type in handshake");
2995                    err.frame = Some(frame.ty());
2996                    return Err(err);
2997                }
2998            }
2999        }
3000
3001        if ack_eliciting {
3002            // In the Initial and Handshake spaces, ACKs must be sent immediately
3003            self.spaces[packet.header.space()]
3004                .pending_acks
3005                .set_immediate_ack_required();
3006        }
3007
3008        self.write_crypto();
3009        Ok(())
3010    }
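
    // The frame handling above mirrors RFC 9000 §12.4: Initial and Handshake packets
    // may only carry PADDING, PING, ACK, CRYPTO, and CONNECTION_CLOSE; anything else
    // is rejected as a PROTOCOL_VIOLATION that names the offending frame type. A
    // sketch of that whitelist as a predicate:
    //
    //     fn allowed_during_handshake(frame: &Frame) -> bool {
    //         matches!(
    //             frame,
    //             Frame::Padding | Frame::Ping | Frame::Ack(_) | Frame::Crypto(_) | Frame::Close(_)
    //         )
    //     }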
3011
3012    fn process_payload(
3013        &mut self,
3014        now: Instant,
3015        remote: SocketAddr,
3016        number: u64,
3017        packet: Packet,
3018    ) -> Result<(), TransportError> {
3019        let payload = packet.payload.freeze();
3020        let mut is_probing_packet = true;
3021        let mut close = None;
3022        let payload_len = payload.len();
3023        let mut ack_eliciting = false;
3024        for result in frame::Iter::new(payload)? {
3025            let frame = result?;
3026            let span = match frame {
3027                Frame::Padding => continue,
3028                _ => Some(trace_span!("frame", ty = %frame.ty())),
3029            };
3030
3031            self.stats.frame_rx.record(&frame);
3032            // Crypto, Stream and Datagram frames are special-cased in order not to pollute
3033            // the log with payload data
3034            match &frame {
3035                Frame::Crypto(f) => {
3036                    trace!(offset = f.offset, len = f.data.len(), "got crypto frame");
3037                }
3038                Frame::Stream(f) => {
3039                    trace!(id = %f.id, offset = f.offset, len = f.data.len(), fin = f.fin, "got stream frame");
3040                }
3041                Frame::Datagram(f) => {
3042                    trace!(len = f.data.len(), "got datagram frame");
3043                }
3044                f => {
3045                    trace!("got frame {:?}", f);
3046                }
3047            }
3048
3049            let _guard = span.as_ref().map(|x| x.enter());
3050            if packet.header.is_0rtt() {
3051                match frame {
3052                    Frame::Crypto(_) | Frame::Close(Close::Application(_)) => {
3053                        return Err(TransportError::PROTOCOL_VIOLATION(
3054                            "illegal frame type in 0-RTT",
3055                        ));
3056                    }
3057                    _ => {}
3058                }
3059            }
3060            ack_eliciting |= frame.is_ack_eliciting();
3061
3062            // Check whether this could be a probing packet
3063            match frame {
3064                Frame::Padding
3065                | Frame::PathChallenge(_)
3066                | Frame::PathResponse(_)
3067                | Frame::NewConnectionId(_) => {}
3068                _ => {
3069                    is_probing_packet = false;
3070                }
3071            }
3072            match frame {
3073                Frame::Crypto(frame) => {
3074                    self.read_crypto(SpaceId::Data, &frame, payload_len)?;
3075                }
3076                Frame::Stream(frame) => {
3077                    if self.streams.received(frame, payload_len)?.should_transmit() {
3078                        self.spaces[SpaceId::Data].pending.max_data = true;
3079                    }
3080                }
3081                Frame::Ack(ack) => {
3082                    self.on_ack_received(now, SpaceId::Data, ack)?;
3083                }
3084                Frame::Padding | Frame::Ping => {}
3085                Frame::Close(reason) => {
3086                    close = Some(reason);
3087                }
3088                Frame::PathChallenge(token) => {
3089                    self.path_responses.push(number, token, remote);
3090                    if remote == self.path.remote {
3091                        // PATH_CHALLENGE on active path, possible off-path packet forwarding
3092                        // attack. Send a non-probing packet to recover the active path.
3093                        match self.peer_supports_ack_frequency() {
3094                            true => self.immediate_ack(),
3095                            false => self.ping(),
3096                        }
3097                    }
3098                }
3099                Frame::PathResponse(token) => {
3100                    if self.path.challenge == Some(token) && remote == self.path.remote {
3101                        trace!("new path validated");
3102                        self.timers.stop(Timer::PathValidation);
3103                        self.path.challenge = None;
3104                        self.path.validated = true;
3105                        if let Some((_, ref mut prev_path)) = self.prev_path {
3106                            prev_path.challenge = None;
3107                            prev_path.challenge_pending = false;
3108                        }
3109                    } else if let Some(nat_traversal) = &mut self.nat_traversal {
3110                        // Check if this is a response to NAT traversal PATH_CHALLENGE
3111                        match nat_traversal.handle_validation_success(remote, token) {
3112                            Ok(sequence) => {
3113                                trace!("NAT traversal candidate {} validated for sequence {}", remote, sequence);
3114                                
3115                                // Check if this was part of a coordination round
3116                                if nat_traversal.handle_coordination_success(remote) {
3117                                    trace!("Coordination succeeded via {}", remote);
3118                                    // Could trigger connection migration here if this is the best path
3119                                } else {
3120                                    // Mark the candidate pair as succeeded for regular validation
3121                                    if nat_traversal.mark_pair_succeeded(remote) {
3122                                        trace!("NAT traversal pair succeeded for {}", remote);
3123                                    }
3124                                }
3125                            }
3126                            Err(NatTraversalError::ChallengeMismatch) => {
3127                                debug!("PATH_RESPONSE challenge mismatch for NAT candidate {}", remote);
3128                            }
3129                            Err(e) => {
3130                                debug!("NAT traversal validation error: {}", e);
3131                            }
3132                        }
3133                    } else {
3134                        debug!(token, "ignoring invalid PATH_RESPONSE");
3135                    }
3136                }
3137                Frame::MaxData(bytes) => {
3138                    self.streams.received_max_data(bytes);
3139                }
3140                Frame::MaxStreamData { id, offset } => {
3141                    self.streams.received_max_stream_data(id, offset)?;
3142                }
3143                Frame::MaxStreams { dir, count } => {
3144                    self.streams.received_max_streams(dir, count)?;
3145                }
3146                Frame::ResetStream(frame) => {
3147                    if self.streams.received_reset(frame)?.should_transmit() {
3148                        self.spaces[SpaceId::Data].pending.max_data = true;
3149                    }
3150                }
3151                Frame::DataBlocked { offset } => {
3152                    debug!(offset, "peer claims to be blocked at connection level");
3153                }
3154                Frame::StreamDataBlocked { id, offset } => {
3155                    if id.initiator() == self.side.side() && id.dir() == Dir::Uni {
3156                        debug!("got STREAM_DATA_BLOCKED on send-only {}", id);
3157                        return Err(TransportError::STREAM_STATE_ERROR(
3158                            "STREAM_DATA_BLOCKED on send-only stream",
3159                        ));
3160                    }
3161                    debug!(
3162                        stream = %id,
3163                        offset, "peer claims to be blocked at stream level"
3164                    );
3165                }
3166                Frame::StreamsBlocked { dir, limit } => {
3167                    if limit > MAX_STREAM_COUNT {
3168                        return Err(TransportError::FRAME_ENCODING_ERROR(
3169                            "unrepresentable stream limit",
3170                        ));
3171                    }
3172                    debug!(
3173                        "peer claims to be blocked opening more than {} {} streams",
3174                        limit, dir
3175                    );
3176                }
3177                Frame::StopSending(frame::StopSending { id, error_code }) => {
3178                    if id.initiator() != self.side.side() {
3179                        if id.dir() == Dir::Uni {
3180                            debug!("got STOP_SENDING on recv-only {}", id);
3181                            return Err(TransportError::STREAM_STATE_ERROR(
3182                                "STOP_SENDING on recv-only stream",
3183                            ));
3184                        }
3185                    } else if self.streams.is_local_unopened(id) {
3186                        return Err(TransportError::STREAM_STATE_ERROR(
3187                            "STOP_SENDING on unopened stream",
3188                        ));
3189                    }
3190                    self.streams.received_stop_sending(id, error_code);
3191                }
3192                Frame::RetireConnectionId { sequence } => {
3193                    let allow_more_cids = self
3194                        .local_cid_state
3195                        .on_cid_retirement(sequence, self.peer_params.issue_cids_limit())?;
3196                    self.endpoint_events
3197                        .push_back(EndpointEventInner::RetireConnectionId(
3198                            now,
3199                            sequence,
3200                            allow_more_cids,
3201                        ));
3202                }
3203                Frame::NewConnectionId(frame) => {
3204                    trace!(
3205                        sequence = frame.sequence,
3206                        id = %frame.id,
3207                        retire_prior_to = frame.retire_prior_to,
3208                    );
3209                    if self.rem_cids.active().is_empty() {
3210                        return Err(TransportError::PROTOCOL_VIOLATION(
3211                            "NEW_CONNECTION_ID when CIDs aren't in use",
3212                        ));
3213                    }
3214                    if frame.retire_prior_to > frame.sequence {
3215                        return Err(TransportError::PROTOCOL_VIOLATION(
3216                            "NEW_CONNECTION_ID retiring unissued CIDs",
3217                        ));
3218                    }
3219
3220                    use crate::cid_queue::InsertError;
3221                    match self.rem_cids.insert(frame) {
3222                        Ok(None) => {}
3223                        Ok(Some((retired, reset_token))) => {
3224                            let pending_retired =
3225                                &mut self.spaces[SpaceId::Data].pending.retire_cids;
3226                            /// Ensure `pending_retired` cannot grow without bound. Limit is
3227                            /// somewhat arbitrary but very permissive.
3228                            const MAX_PENDING_RETIRED_CIDS: u64 = CidQueue::LEN as u64 * 10;
3229                            // We don't bother counting in-flight frames because those are bounded
3230                            // by congestion control.
3231                            if (pending_retired.len() as u64)
3232                                .saturating_add(retired.end.saturating_sub(retired.start))
3233                                > MAX_PENDING_RETIRED_CIDS
3234                            {
3235                                return Err(TransportError::CONNECTION_ID_LIMIT_ERROR(
3236                                    "queued too many retired CIDs",
3237                                ));
3238                            }
3239                            pending_retired.extend(retired);
3240                            self.set_reset_token(reset_token);
3241                        }
3242                        Err(InsertError::ExceedsLimit) => {
3243                            return Err(TransportError::CONNECTION_ID_LIMIT_ERROR(""));
3244                        }
3245                        Err(InsertError::Retired) => {
3246                            trace!("discarding already-retired");
3247                            // RETIRE_CONNECTION_ID might not have been previously sent if e.g. a
3248                            // range of connection IDs larger than the active connection ID limit
3249                            // was retired all at once via retire_prior_to.
3250                            self.spaces[SpaceId::Data]
3251                                .pending
3252                                .retire_cids
3253                                .push(frame.sequence);
3254                            continue;
3255                        }
3256                    };
3257
3258                    if self.side.is_server() && self.rem_cids.active_seq() == 0 {
3259                        // We're a server still using the initial remote CID for the client, so
3260                        // let's switch immediately to enable client-side stateless resets.
3261                        self.update_rem_cid();
3262                    }
3263                }
3264                Frame::NewToken(NewToken { token }) => {
3265                    let ConnectionSide::Client {
3266                        token_store,
3267                        server_name,
3268                        ..
3269                    } = &self.side
3270                    else {
3271                        return Err(TransportError::PROTOCOL_VIOLATION("client sent NEW_TOKEN"));
3272                    };
3273                    if token.is_empty() {
3274                        return Err(TransportError::FRAME_ENCODING_ERROR("empty token"));
3275                    }
3276                    trace!("got new token");
3277                    token_store.insert(server_name, token);
3278                }
3279                Frame::Datagram(datagram) => {
3280                    if self
3281                        .datagrams
3282                        .received(datagram, &self.config.datagram_receive_buffer_size)?
3283                    {
3284                        self.events.push_back(Event::DatagramReceived);
3285                    }
3286                }
3287                Frame::AckFrequency(ack_frequency) => {
3288                    // This frame can only be sent in the Data space
3289                    let space = &mut self.spaces[SpaceId::Data];
3290
3291                    if !self
3292                        .ack_frequency
3293                        .ack_frequency_received(&ack_frequency, &mut space.pending_acks)?
3294                    {
3295                        // The AckFrequency frame is stale (we have already received a more recent one)
3296                        continue;
3297                    }
3298
3299                    // Our `max_ack_delay` has been updated, so we may need to adjust its associated
3300                    // timeout
3301                    if let Some(timeout) = space
3302                        .pending_acks
3303                        .max_ack_delay_timeout(self.ack_frequency.max_ack_delay)
3304                    {
3305                        self.timers.set(Timer::MaxAckDelay, timeout);
3306                    }
3307                }
3308                Frame::ImmediateAck => {
3309                    // This frame can only be sent in the Data space
3310                    self.spaces[SpaceId::Data]
3311                        .pending_acks
3312                        .set_immediate_ack_required();
3313                }
3314                Frame::HandshakeDone => {
3315                    if self.side.is_server() {
3316                        return Err(TransportError::PROTOCOL_VIOLATION(
3317                            "client sent HANDSHAKE_DONE",
3318                        ));
3319                    }
3320                    if self.spaces[SpaceId::Handshake].crypto.is_some() {
3321                        self.discard_space(now, SpaceId::Handshake);
3322                    }
3323                }
3324                Frame::AddAddress(add_address) => {
3325                    self.handle_add_address(&add_address, now)?;
3326                }
3327                Frame::PunchMeNow(punch_me_now) => {
3328                    self.handle_punch_me_now(&punch_me_now, now)?;
3329                }
3330                Frame::RemoveAddress(remove_address) => {
3331                    self.handle_remove_address(&remove_address)?;
3332                }
3333            }
3334        }
3335
3336        let space = &mut self.spaces[SpaceId::Data];
3337        if space
3338            .pending_acks
3339            .packet_received(now, number, ack_eliciting, &space.dedup)
3340        {
3341            self.timers
3342                .set(Timer::MaxAckDelay, now + self.ack_frequency.max_ack_delay);
3343        }
3344
3345        // Issue stream ID credit due to ACKs of outgoing finish/resets and incoming finish/resets
3346        // on stopped streams. Incoming finishes/resets on open streams are not handled here as they
3347        // are only freed, and hence only issue credit, once the application has been notified
3348        // during a read on the stream.
3349        let pending = &mut self.spaces[SpaceId::Data].pending;
3350        self.streams.queue_max_stream_id(pending);
3351
3352        if let Some(reason) = close {
3353            self.error = Some(reason.into());
3354            self.state = State::Draining;
3355            self.close = true;
3356        }
3357
3358        if remote != self.path.remote
3359            && !is_probing_packet
3360            && number == self.spaces[SpaceId::Data].rx_packet
3361        {
3362            let ConnectionSide::Server { ref server_config } = self.side else {
3363                panic!("packets from unknown remote should be dropped by clients");
3364            };
3365            debug_assert!(
3366                server_config.migration,
3367                "migration-initiating packets should have been dropped immediately"
3368            );
3369            self.migrate(now, remote);
3370            // Break linkability, if possible
3371            self.update_rem_cid();
3372            self.spin = false;
3373        }
3374
3375        Ok(())
3376    }
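
    // Sketch of the "probing packet" classification used in `process_payload` above
    // (RFC 9000 §9.1): a packet is probing only if every frame it carries is PADDING,
    // PATH_CHALLENGE, PATH_RESPONSE, or NEW_CONNECTION_ID. A non-probing packet from a
    // new remote address carrying the highest received packet number is what triggers
    // `migrate` below.
    //
    //     fn is_probing_only(frames: &[Frame]) -> bool {
    //         frames.iter().all(|frame| matches!(
    //             frame,
    //             Frame::Padding
    //                 | Frame::PathChallenge(_)
    //                 | Frame::PathResponse(_)
    //                 | Frame::NewConnectionId(_)
    //         ))
    //     }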
3377
3378    fn migrate(&mut self, now: Instant, remote: SocketAddr) {
3379        trace!(%remote, "migration initiated");
3380        // Reset rtt/congestion state for new path unless it looks like a NAT rebinding.
3381        // Note that the congestion window will not grow until validation terminates,
3382        // which helps mitigate amplification attacks performed by spoofing source addresses.
3383        let mut new_path = if remote.is_ipv4() && remote.ip() == self.path.remote.ip() {
3384            PathData::from_previous(remote, &self.path, now)
3385        } else {
3386            let peer_max_udp_payload_size =
3387                u16::try_from(self.peer_params.max_udp_payload_size.into_inner())
3388                    .unwrap_or(u16::MAX);
3389            PathData::new(
3390                remote,
3391                self.allow_mtud,
3392                Some(peer_max_udp_payload_size),
3393                now,
3394                &self.config,
3395            )
3396        };
3397        new_path.challenge = Some(self.rng.random());
3398        new_path.challenge_pending = true;
3399        let prev_pto = self.pto(SpaceId::Data);
3400
3401        let mut prev = mem::replace(&mut self.path, new_path);
3402        // Don't clobber the original path if the previous one hasn't been validated yet
3403        if prev.challenge.is_none() {
3404            prev.challenge = Some(self.rng.random());
3405            prev.challenge_pending = true;
3406            // We haven't updated the remote CID yet, this captures the remote CID we were using on
3407            // the previous path.
3408            self.prev_path = Some((self.rem_cids.active(), prev));
3409        }
3410
3411        self.timers.set(
3412            Timer::PathValidation,
3413            now + 3 * cmp::max(self.pto(SpaceId::Data), prev_pto),
3414        );
3415    }
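
    // Sketch of the path validation deadline armed above (using std::time types for
    // illustration): three times the larger of the new path's PTO and the pre-migration
    // PTO, following RFC 9000 §8.2.4's guidance on abandoning path validation.
    //
    //     use std::time::{Duration, Instant};
    //
    //     fn path_validation_deadline(now: Instant, new_pto: Duration, prev_pto: Duration) -> Instant {
    //         now + 3 * new_pto.max(prev_pto)
    //     }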
3416
3417    /// Handle a change in the local address, i.e. an active migration
3418    pub fn local_address_changed(&mut self) {
3419        self.update_rem_cid();
3420        self.ping();
3421    }
3422
3423    /// Switch to a previously unused remote connection ID, if possible
3424    fn update_rem_cid(&mut self) {
3425        let (reset_token, retired) = match self.rem_cids.next() {
3426            Some(x) => x,
3427            None => return,
3428        };
3429
3430        // Retire the current remote CID and any CIDs we had to skip.
3431        self.spaces[SpaceId::Data]
3432            .pending
3433            .retire_cids
3434            .extend(retired);
3435        self.set_reset_token(reset_token);
3436    }
3437
3438    fn set_reset_token(&mut self, reset_token: ResetToken) {
3439        self.endpoint_events
3440            .push_back(EndpointEventInner::ResetToken(
3441                self.path.remote,
3442                reset_token,
3443            ));
3444        self.peer_params.stateless_reset_token = Some(reset_token);
3445    }
3446
3447    /// Issue an initial set of connection IDs to the peer upon connection
3448    fn issue_first_cids(&mut self, now: Instant) {
3449        if self.local_cid_state.cid_len() == 0 {
3450            return;
3451        }
3452
3453        // Subtract 1 to account for the CID we supplied while handshaking
3454        let mut n = self.peer_params.issue_cids_limit() - 1;
3455        if let ConnectionSide::Server { server_config } = &self.side {
3456            if server_config.has_preferred_address() {
3457                // We also sent a CID in the transport parameters
3458                n -= 1;
3459            }
3460        }
3461        self.endpoint_events
3462            .push_back(EndpointEventInner::NeedIdentifiers(now, n));
3463    }
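
    // Worked example for the count above: with `issue_cids_limit()` returning 8, a
    // server that also advertised a preferred address has already consumed two CIDs
    // (the handshake CID plus the preferred-address CID), so it asks the endpoint for
    // 8 - 1 - 1 = 6 more identifiers.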
3464
3465    fn populate_packet(
3466        &mut self,
3467        now: Instant,
3468        space_id: SpaceId,
3469        buf: &mut Vec<u8>,
3470        max_size: usize,
3471        pn: u64,
3472    ) -> SentFrames {
3473        let mut sent = SentFrames::default();
3474        let space = &mut self.spaces[space_id];
3475        let is_0rtt = space_id == SpaceId::Data && space.crypto.is_none();
3476        space.pending_acks.maybe_ack_non_eliciting();
3477
3478        // HANDSHAKE_DONE
3479        if !is_0rtt && mem::replace(&mut space.pending.handshake_done, false) {
3480            buf.write(frame::FrameType::HANDSHAKE_DONE);
3481            sent.retransmits.get_or_create().handshake_done = true;
3482            // This is just a u8 counter and the frame is typically just sent once
3483            self.stats.frame_tx.handshake_done =
3484                self.stats.frame_tx.handshake_done.saturating_add(1);
3485        }
3486
3487        // PING
3488        if mem::replace(&mut space.ping_pending, false) {
3489            trace!("PING");
3490            buf.write(frame::FrameType::PING);
3491            sent.non_retransmits = true;
3492            self.stats.frame_tx.ping += 1;
3493        }
3494
3495        // IMMEDIATE_ACK
3496        if mem::replace(&mut space.immediate_ack_pending, false) {
3497            trace!("IMMEDIATE_ACK");
3498            buf.write(frame::FrameType::IMMEDIATE_ACK);
3499            sent.non_retransmits = true;
3500            self.stats.frame_tx.immediate_ack += 1;
3501        }
3502
3503        // ACK
3504        if space.pending_acks.can_send() {
3505            Self::populate_acks(
3506                now,
3507                self.receiving_ecn,
3508                &mut sent,
3509                space,
3510                buf,
3511                &mut self.stats,
3512            );
3513        }
3514
3515        // ACK_FREQUENCY
3516        if mem::replace(&mut space.pending.ack_frequency, false) {
3517            let sequence_number = self.ack_frequency.next_sequence_number();
3518
3519            // Safe to unwrap because this is always provided when ACK frequency is enabled
3520            let config = self.config.ack_frequency_config.as_ref().unwrap();
3521
3522            // Ensure the delay is within bounds to avoid a PROTOCOL_VIOLATION error
3523            let max_ack_delay = self.ack_frequency.candidate_max_ack_delay(
3524                self.path.rtt.get(),
3525                config,
3526                &self.peer_params,
3527            );
3528
3529            trace!(?max_ack_delay, "ACK_FREQUENCY");
3530
3531            frame::AckFrequency {
3532                sequence: sequence_number,
3533                ack_eliciting_threshold: config.ack_eliciting_threshold,
3534                request_max_ack_delay: max_ack_delay.as_micros().try_into().unwrap_or(VarInt::MAX),
3535                reordering_threshold: config.reordering_threshold,
3536            }
3537            .encode(buf);
3538
3539            sent.retransmits.get_or_create().ack_frequency = true;
3540
3541            self.ack_frequency.ack_frequency_sent(pn, max_ack_delay);
3542            self.stats.frame_tx.ack_frequency += 1;
3543        }
3544
3545        // PATH_CHALLENGE
3546        if buf.len() + 9 < max_size && space_id == SpaceId::Data {
3547            // Transmit challenges with every outgoing frame on an unvalidated path
3548            if let Some(token) = self.path.challenge {
3549                // But only send a packet solely for that purpose at most once
3550                self.path.challenge_pending = false;
3551                sent.non_retransmits = true;
3552                sent.requires_padding = true;
3553                trace!("PATH_CHALLENGE {:08x}", token);
3554                buf.write(frame::FrameType::PATH_CHALLENGE);
3555                buf.write(token);
3556                self.stats.frame_tx.path_challenge += 1;
3557            }
3558        }
3559
3560        // PATH_RESPONSE
3561        if buf.len() + 9 < max_size && space_id == SpaceId::Data {
3562            if let Some(token) = self.path_responses.pop_on_path(self.path.remote) {
3563                sent.non_retransmits = true;
3564                sent.requires_padding = true;
3565                trace!("PATH_RESPONSE {:08x}", token);
3566                buf.write(frame::FrameType::PATH_RESPONSE);
3567                buf.write(token);
3568                self.stats.frame_tx.path_response += 1;
3569            }
3570        }
3571
3572        // CRYPTO
3573        while buf.len() + frame::Crypto::SIZE_BOUND < max_size && !is_0rtt {
3574            let mut frame = match space.pending.crypto.pop_front() {
3575                Some(x) => x,
3576                None => break,
3577            };
3578
3579            // Calculate the maximum amount of crypto data we can store in the buffer.
3580            // Since the offset is known, we can reserve the exact size required to encode it.
3581            // For the length we reserve 2 bytes, which can encode values up to 2^14 - 1,
3582            // more than fits into a normally sized QUIC frame.
3583            let max_crypto_data_size = max_size
3584                - buf.len()
3585                - 1 // Frame Type
3586                - VarInt::size(unsafe { VarInt::from_u64_unchecked(frame.offset) })
3587                - 2; // Maximum encoded length for frame size, given we send less than 2^14 bytes
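
            // Worked example of the budget above: with max_size = 1200, 40 bytes already
            // written, and an offset below 64 (a 1-byte varint), the chunk may carry
            // 1200 - 40 - 1 - 1 - 2 = 1156 bytes of CRYPTO data.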
3588
3589            let len = frame
3590                .data
3591                .len()
3592                .min(2usize.pow(14) - 1)
3593                .min(max_crypto_data_size);
3594
3595            let data = frame.data.split_to(len);
3596            let truncated = frame::Crypto {
3597                offset: frame.offset,
3598                data,
3599            };
3600            trace!(
3601                "CRYPTO: off {} len {}",
3602                truncated.offset,
3603                truncated.data.len()
3604            );
3605            truncated.encode(buf);
3606            self.stats.frame_tx.crypto += 1;
3607            sent.retransmits.get_or_create().crypto.push_back(truncated);
3608            if !frame.data.is_empty() {
3609                frame.offset += len as u64;
3610                space.pending.crypto.push_front(frame);
3611            }
3612        }
3613
3614        if space_id == SpaceId::Data {
3615            self.streams.write_control_frames(
3616                buf,
3617                &mut space.pending,
3618                &mut sent.retransmits,
3619                &mut self.stats.frame_tx,
3620                max_size,
3621            );
3622        }
3623
3624        // NEW_CONNECTION_ID
3625        while buf.len() + 44 < max_size {
3626            let issued = match space.pending.new_cids.pop() {
3627                Some(x) => x,
3628                None => break,
3629            };
3630            trace!(
3631                sequence = issued.sequence,
3632                id = %issued.id,
3633                "NEW_CONNECTION_ID"
3634            );
3635            frame::NewConnectionId {
3636                sequence: issued.sequence,
3637                retire_prior_to: self.local_cid_state.retire_prior_to(),
3638                id: issued.id,
3639                reset_token: issued.reset_token,
3640            }
3641            .encode(buf);
3642            sent.retransmits.get_or_create().new_cids.push(issued);
3643            self.stats.frame_tx.new_connection_id += 1;
3644        }
3645
3646        // RETIRE_CONNECTION_ID
3647        while buf.len() + frame::RETIRE_CONNECTION_ID_SIZE_BOUND < max_size {
3648            let seq = match space.pending.retire_cids.pop() {
3649                Some(x) => x,
3650                None => break,
3651            };
3652            trace!(sequence = seq, "RETIRE_CONNECTION_ID");
3653            buf.write(frame::FrameType::RETIRE_CONNECTION_ID);
3654            buf.write_var(seq);
3655            sent.retransmits.get_or_create().retire_cids.push(seq);
3656            self.stats.frame_tx.retire_connection_id += 1;
3657        }
3658
3659        // DATAGRAM
3660        let mut sent_datagrams = false;
3661        while buf.len() + Datagram::SIZE_BOUND < max_size && space_id == SpaceId::Data {
3662            match self.datagrams.write(buf, max_size) {
3663                true => {
3664                    sent_datagrams = true;
3665                    sent.non_retransmits = true;
3666                    self.stats.frame_tx.datagram += 1;
3667                }
3668                false => break,
3669            }
3670        }
3671        if self.datagrams.send_blocked && sent_datagrams {
3672            self.events.push_back(Event::DatagramsUnblocked);
3673            self.datagrams.send_blocked = false;
3674        }
3675
3676        // NEW_TOKEN
3677        while let Some(remote_addr) = space.pending.new_tokens.pop() {
3678            debug_assert_eq!(space_id, SpaceId::Data);
3679            let ConnectionSide::Server { server_config } = &self.side else {
3680                panic!("NEW_TOKEN frames should not be enqueued by clients");
3681            };
3682
3683            if remote_addr != self.path.remote {
3684                // NEW_TOKEN frames contain tokens bound to a client's IP address, and are only
3685                // useful if used from the same IP address.  Thus, we abandon enqueued NEW_TOKEN
3686                // frames upon a path change. Once the new path is validated, fresh NEW_TOKEN
3687                // frames may be enqueued for it.
3688                continue;
3689            }
3690
3691            let token = Token::new(
3692                TokenPayload::Validation {
3693                    ip: remote_addr.ip(),
3694                    issued: server_config.time_source.now(),
3695                },
3696                &mut self.rng,
3697            );
3698            let new_token = NewToken {
3699                token: token.encode(&*server_config.token_key).into(),
3700            };
3701
3702            if buf.len() + new_token.size() >= max_size {
3703                space.pending.new_tokens.push(remote_addr);
3704                break;
3705            }
3706
3707            new_token.encode(buf);
3708            sent.retransmits
3709                .get_or_create()
3710                .new_tokens
3711                .push(remote_addr);
3712            self.stats.frame_tx.new_token += 1;
3713        }
3714
3715        // STREAM
3716        if space_id == SpaceId::Data {
3717            sent.stream_frames =
3718                self.streams
3719                    .write_stream_frames(buf, max_size, self.config.send_fairness);
3720            self.stats.frame_tx.stream += sent.stream_frames.len() as u64;
3721        }
3722
3723        sent
3724    }
3725
3726    /// Write pending ACKs into a buffer
3727    ///
3728    /// This method assumes ACKs are pending, and should only be called if
3729    /// `!PendingAcks::ranges().is_empty()` returns `true`.
3730    fn populate_acks(
3731        now: Instant,
3732        receiving_ecn: bool,
3733        sent: &mut SentFrames,
3734        space: &mut PacketSpace,
3735        buf: &mut Vec<u8>,
3736        stats: &mut ConnectionStats,
3737    ) {
3738        debug_assert!(!space.pending_acks.ranges().is_empty());
3739
3740        // 0-RTT packets must never carry acks (which would have to be of handshake packets)
3741        debug_assert!(space.crypto.is_some(), "tried to send ACK in 0-RTT");
3742        let ecn = if receiving_ecn {
3743            Some(&space.ecn_counters)
3744        } else {
3745            None
3746        };
3747        sent.largest_acked = space.pending_acks.ranges().max();
3748
3749        let delay_micros = space.pending_acks.ack_delay(now).as_micros() as u64;
3750
3751        // TODO: This should come from `TransportConfig` if it ever becomes configurable.
3752        let ack_delay_exp = TransportParameters::default().ack_delay_exponent;
3753        let delay = delay_micros >> ack_delay_exp.into_inner();
3754
3755        trace!(
3756            "ACK {:?}, Delay = {}us",
3757            space.pending_acks.ranges(),
3758            delay_micros
3759        );
3760
3761        frame::Ack::encode(delay as _, space.pending_acks.ranges(), ecn, buf);
3762        stats.frame_tx.acks += 1;
3763    }
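
    // Worked example of the ACK delay encoding above: the delay is measured in
    // microseconds and shifted right by the ack_delay_exponent (default 3), so the
    // peer recovers an approximation by shifting left again.
    //
    //     fn encode_ack_delay(delay_micros: u64, ack_delay_exponent: u32) -> u64 {
    //         delay_micros >> ack_delay_exponent
    //     }
    //
    //     // A 1_000us delay with the default exponent encodes as 125; the receiver
    //     // decodes it back to 125 << 3 = 1_000us.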
3764
3765    fn close_common(&mut self) {
3766        trace!("connection closed");
3767        for &timer in &Timer::VALUES {
3768            self.timers.stop(timer);
3769        }
3770    }
3771
3772    fn set_close_timer(&mut self, now: Instant) {
3773        self.timers
3774            .set(Timer::Close, now + 3 * self.pto(self.highest_space));
3775    }
3776
3777    /// Handle transport parameters received from the peer
3778    fn handle_peer_params(&mut self, params: TransportParameters) -> Result<(), TransportError> {
3779        if Some(self.orig_rem_cid) != params.initial_src_cid
3780            || (self.side.is_client()
3781                && (Some(self.initial_dst_cid) != params.original_dst_cid
3782                    || self.retry_src_cid != params.retry_src_cid))
3783        {
3784            return Err(TransportError::TRANSPORT_PARAMETER_ERROR(
3785                "CID authentication failure",
3786            ));
3787        }
3788
3789        self.set_peer_params(params);
3790
3791        Ok(())
3792    }
3793
3794    fn set_peer_params(&mut self, params: TransportParameters) {
3795        self.streams.set_params(&params);
3796        self.idle_timeout =
3797            negotiate_max_idle_timeout(self.config.max_idle_timeout, Some(params.max_idle_timeout));
3798        trace!("negotiated max idle timeout {:?}", self.idle_timeout);
3799        if let Some(ref info) = params.preferred_address {
3800            self.rem_cids.insert(frame::NewConnectionId {
3801                sequence: 1,
3802                id: info.connection_id,
3803                reset_token: info.stateless_reset_token,
3804                retire_prior_to: 0,
3805            }).expect("preferred address CID is the first received, and hence is guaranteed to be legal");
3806        }
3807        self.ack_frequency.peer_max_ack_delay = get_max_ack_delay(&params);
3808        self.peer_params = params;
3809        self.path.mtud.on_peer_max_udp_payload_size_received(
3810            u16::try_from(self.peer_params.max_udp_payload_size.into_inner()).unwrap_or(u16::MAX),
3811        );
3812
3813        // Initialize NAT traversal if both peers support it
3814        if let Some(nat_config) = &params.nat_traversal {
3815            self.init_nat_traversal(nat_config);
3816        }
3817    }
3818
3819    /// Initialize NAT traversal state from negotiated transport parameters
3820    fn init_nat_traversal(&mut self, peer_config: &crate::transport_parameters::NatTraversalConfig) {
3821        use crate::transport_parameters::NatTraversalRole as TPRole;
3822        
3823        // Convert transport parameter role to internal role
3824        let role = match peer_config.role {
3825            TPRole::Client => NatTraversalRole::Client,
3826            TPRole::Server { can_relay } => NatTraversalRole::Server { can_relay },
3827            TPRole::Bootstrap => NatTraversalRole::Bootstrap,
3828        };
3829
3830        // Clamp peer-advertised values to conservative local limits
3831        let max_candidates = peer_config.max_candidates.into_inner()
3832            .min(50) as u32; // Cap at reasonable limit
3833        let coordination_timeout = Duration::from_millis(
3834            peer_config.coordination_timeout.into_inner().min(10000) // Cap at 10 seconds
3835        );
3836
3837        self.nat_traversal = Some(NatTraversalState::new(
3838            role,
3839            max_candidates,
3840            coordination_timeout,
3841        ));
3842
3843        trace!("NAT traversal initialized with role {:?}", role);
3844    }
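
    // Worked example of the clamping above: a peer advertising 200 candidates and a
    // 60_000 ms coordination timeout is reduced to 50 candidates and a 10_000 ms
    // timeout before any local state is sized from those values.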
3845
3846    /// Handle AddAddress frame from peer
3847    fn handle_add_address(
3848        &mut self,
3849        add_address: &crate::frame::AddAddress,
3850        now: Instant,
3851    ) -> Result<(), TransportError> {
3852        let nat_state = self.nat_traversal.as_mut()
3853            .ok_or_else(|| TransportError::PROTOCOL_VIOLATION("AddAddress frame without NAT traversal negotiation"))?;
3854
        match nat_state.add_remote_candidate(
            add_address.sequence,
            add_address.address,
            add_address.priority,
            now,
        ) {
            Ok(()) => {
                trace!("Added remote candidate: {} (seq={}, priority={})",
                       add_address.address, add_address.sequence, add_address.priority);

                // TODO: Trigger validation of this candidate
                // This will be implemented when we add the packet sending logic
                Ok(())
            }
            Err(NatTraversalError::TooManyCandidates) => {
                Err(TransportError::PROTOCOL_VIOLATION("too many NAT traversal candidates"))
            }
            Err(NatTraversalError::DuplicateAddress) => {
                // Silently ignore duplicates (peer may resend)
                Ok(())
            }
            Err(e) => {
                warn!("Failed to add remote candidate: {}", e);
                Ok(()) // Don't terminate connection for non-critical errors
            }
        }
    }

    /// Handle PunchMeNow frame from peer (via coordinator)
    fn handle_punch_me_now(
        &mut self,
        punch_me_now: &crate::frame::PunchMeNow,
        now: Instant,
    ) -> Result<(), TransportError> {
        let nat_state = self.nat_traversal.as_mut()
            .ok_or_else(|| TransportError::PROTOCOL_VIOLATION("PunchMeNow frame without NAT traversal negotiation"))?;

        trace!("Received PunchMeNow: round={}, target_seq={}, local_addr={}",
               punch_me_now.round, punch_me_now.target_sequence, punch_me_now.local_address);

        // Handle the peer's coordination request
        if nat_state.handle_peer_punch_request(punch_me_now.round, now) {
            trace!("Coordination synchronized for round {}", punch_me_now.round);

            // Extract the peer's target addresses if this frame was relayed by the coordinator.
            // In a full implementation the coordinator would relay both sides' addresses;
            // for now we use the frame's local_address as the peer's target.

            // TODO: Create punch targets based on the peer's candidates
            // This is where the candidate pairing system would be used
        } else {
            debug!("Failed to synchronize coordination for round {}", punch_me_now.round);
        }

        Ok(())
    }

    /// Handle RemoveAddress frame from peer
    fn handle_remove_address(
        &mut self,
        remove_address: &crate::frame::RemoveAddress,
    ) -> Result<(), TransportError> {
        let nat_state = self.nat_traversal.as_mut()
            .ok_or_else(|| TransportError::PROTOCOL_VIOLATION("RemoveAddress frame without NAT traversal negotiation"))?;

        if nat_state.remove_candidate(remove_address.sequence) {
            trace!("Removed candidate with sequence {}", remove_address.sequence);
        } else {
            trace!("Attempted to remove unknown candidate sequence {}", remove_address.sequence);
        }

        Ok(())
    }

    fn decrypt_packet(
        &mut self,
        now: Instant,
        packet: &mut Packet,
    ) -> Result<Option<u64>, Option<TransportError>> {
        let result = packet_crypto::decrypt_packet_body(
            packet,
            &self.spaces,
            self.zero_rtt_crypto.as_ref(),
            self.key_phase,
            self.prev_crypto.as_ref(),
            self.next_crypto.as_ref(),
        )?;

        let result = match result {
            Some(r) => r,
            None => return Ok(None),
        };

        if result.outgoing_key_update_acked {
            if let Some(prev) = self.prev_crypto.as_mut() {
                prev.end_packet = Some((result.number, now));
                self.set_key_discard_timer(now, packet.header.space());
            }
        }

        if result.incoming_key_update {
            trace!("key update authenticated");
            self.update_keys(Some((result.number, now)), true);
            self.set_key_discard_timer(now, packet.header.space());
        }

        Ok(Some(result.number))
    }

    fn update_keys(&mut self, end_packet: Option<(u64, Instant)>, remote: bool) {
        trace!("executing key update");
        // Generate keys for the key phase after the one we're switching to, store them in
        // `next_crypto`, make the contents of `next_crypto` current, and move the current keys into
        // `prev_crypto`.
        let new = self
            .crypto
            .next_1rtt_keys()
            .expect("only called for `Data` packets");
        self.key_phase_size = new
            .local
            .confidentiality_limit()
            .saturating_sub(KEY_UPDATE_MARGIN);
        let old = mem::replace(
            &mut self.spaces[SpaceId::Data]
                .crypto
                .as_mut()
                .unwrap() // safe because update_keys() can only be triggered by short packets
                .packet,
            mem::replace(self.next_crypto.as_mut().unwrap(), new),
        );
        self.spaces[SpaceId::Data].sent_with_keys = 0;
        self.prev_crypto = Some(PrevCrypto {
            crypto: old,
            end_packet,
            update_unacked: remote,
        });
        self.key_phase = !self.key_phase;
    }

    fn peer_supports_ack_frequency(&self) -> bool {
        self.peer_params.min_ack_delay.is_some()
    }

    /// Send an IMMEDIATE_ACK frame to the remote endpoint
    ///
    /// According to the spec, this will result in an error if the remote endpoint does not support
    /// the Acknowledgement Frequency extension
    pub(crate) fn immediate_ack(&mut self) {
        self.spaces[self.highest_space].immediate_ack_pending = true;
    }

    /// Decodes a packet, returning its decrypted payload, so it can be inspected in tests
    #[cfg(test)]
    pub(crate) fn decode_packet(&self, event: &ConnectionEvent) -> Option<Vec<u8>> {
        let (first_decode, remaining) = match &event.0 {
            ConnectionEventInner::Datagram(DatagramConnectionEvent {
                first_decode,
                remaining,
                ..
            }) => (first_decode, remaining),
            _ => return None,
        };

        if remaining.is_some() {
            panic!("Packets should never be coalesced in tests");
        }

        let decrypted_header = packet_crypto::unprotect_header(
            first_decode.clone(),
            &self.spaces,
            self.zero_rtt_crypto.as_ref(),
            self.peer_params.stateless_reset_token,
        )?;

        let mut packet = decrypted_header.packet?;
        packet_crypto::decrypt_packet_body(
            &mut packet,
            &self.spaces,
            self.zero_rtt_crypto.as_ref(),
            self.key_phase,
            self.prev_crypto.as_ref(),
            self.next_crypto.as_ref(),
        )
        .ok()?;

        Some(packet.payload.to_vec())
    }

    /// The number of bytes of packets containing retransmittable frames that have not been
    /// acknowledged or declared lost.
    #[cfg(test)]
    pub(crate) fn bytes_in_flight(&self) -> u64 {
        self.path.in_flight.bytes
    }

    /// Number of bytes worth of non-ack-only packets that may be sent
    #[cfg(test)]
    pub(crate) fn congestion_window(&self) -> u64 {
        self.path
            .congestion
            .window()
            .saturating_sub(self.path.in_flight.bytes)
    }

    /// Whether no timers other than keep-alive, push-new-CID, key-discard, and idle are running
    #[cfg(test)]
    pub(crate) fn is_idle(&self) -> bool {
        Timer::VALUES
            .iter()
            .filter(|&&t| !matches!(t, Timer::KeepAlive | Timer::PushNewCid | Timer::KeyDiscard))
            .filter_map(|&t| Some((t, self.timers.get(t)?)))
            .min_by_key(|&(_, time)| time)
            .map_or(true, |(timer, _)| timer == Timer::Idle)
    }

    /// Total number of outgoing packets that have been deemed lost
    #[cfg(test)]
    pub(crate) fn lost_packets(&self) -> u64 {
        self.lost_packets
    }

    /// Whether explicit congestion notification is in use on outgoing packets.
    #[cfg(test)]
    pub(crate) fn using_ecn(&self) -> bool {
        self.path.sending_ecn
    }

    /// The number of received bytes in the current path
    #[cfg(test)]
    pub(crate) fn total_recvd(&self) -> u64 {
        self.path.total_recvd
    }

    #[cfg(test)]
    pub(crate) fn active_local_cid_seq(&self) -> (u64, u64) {
        self.local_cid_state.active_seq()
    }

    /// Instruct the peer to replace previously issued CIDs by sending a NEW_CONNECTION_ID frame
    /// with updated `retire_prior_to` field set to `v`
    #[cfg(test)]
    pub(crate) fn rotate_local_cid(&mut self, v: u64, now: Instant) {
        let n = self.local_cid_state.assign_retire_seq(v);
        self.endpoint_events
            .push_back(EndpointEventInner::NeedIdentifiers(now, n));
    }

    /// Check the current active remote CID sequence
    #[cfg(test)]
    pub(crate) fn active_rem_cid_seq(&self) -> u64 {
        self.rem_cids.active_seq()
    }

    /// Returns the detected maximum UDP payload size for the current path
    #[cfg(test)]
    pub(crate) fn path_mtu(&self) -> u16 {
        self.path.current_mtu()
    }

    /// Whether we have 1-RTT data to send
    ///
    /// See also `self.space(SpaceId::Data).can_send()`
    fn can_send_1rtt(&self, max_size: usize) -> bool {
        self.streams.can_send_stream_data()
            || self.path.challenge_pending
            || self
                .prev_path
                .as_ref()
                .is_some_and(|(_, x)| x.challenge_pending)
            || !self.path_responses.is_empty()
            || self
                .datagrams
                .outgoing
                .front()
                .is_some_and(|x| x.size(true) <= max_size)
    }

    /// Update counters to account for a packet becoming acknowledged, lost, or abandoned
    fn remove_in_flight(&mut self, pn: u64, packet: &SentPacket) {
        // Visit known paths from newest to oldest to find the one `pn` was sent on
        for path in [&mut self.path]
            .into_iter()
            .chain(self.prev_path.as_mut().map(|(_, data)| data))
        {
            if path.remove_in_flight(pn, packet) {
                return;
            }
        }
    }

    /// Terminate the connection instantly, without sending a close packet
    fn kill(&mut self, reason: ConnectionError) {
        self.close_common();
        self.error = Some(reason);
        self.state = State::Drained;
        self.endpoint_events.push_back(EndpointEventInner::Drained);
    }

    /// Storage size required for the largest packet known to be supported by the current path
    ///
    /// Buffers passed to [`Connection::poll_transmit`] should be at least this large.
    pub fn current_mtu(&self) -> u16 {
        self.path.current_mtu()
    }

    /// Size of non-frame data for a 1-RTT packet
    ///
    /// Quantifies space consumed by the QUIC header and AEAD tag. All other bytes in a packet are
    /// frames. Changes if the length of the remote connection ID changes, which is expected to be
    /// rare. If `pn` is specified, may additionally change unpredictably due to variations in
    /// latency and packet loss.
    fn predict_1rtt_overhead(&self, pn: Option<u64>) -> usize {
        let pn_len = match pn {
            Some(pn) => PacketNumber::new(
                pn,
                self.spaces[SpaceId::Data].largest_acked_packet.unwrap_or(0),
            )
            .len(),
            // Upper bound
            None => 4,
        };

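        // Worked example (illustrative): with an 8-byte remote CID, a 2-byte packet number,
        // and a 16-byte AEAD tag, the overhead below is 1 + 8 + 2 + 16 = 27 bytes.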
        // 1 byte for flags
        1 + self.rem_cids.active().len() + pn_len + self.tag_len_1rtt()
    }

    fn tag_len_1rtt(&self) -> usize {
        let key = match self.spaces[SpaceId::Data].crypto.as_ref() {
            Some(crypto) => Some(&*crypto.packet.local),
            None => self.zero_rtt_crypto.as_ref().map(|x| &*x.packet),
        };
        // If neither Data nor 0-RTT keys are available, make a reasonable tag length guess. As of
        // this writing, all QUIC cipher suites use 16-byte tags. We could return `None` instead,
        // but that would needlessly prevent sending datagrams during 0-RTT.
        key.map_or(16, |x| x.tag_len())
    }

    /// Mark the path as validated, and enqueue NEW_TOKEN frames to be sent as appropriate
    fn on_path_validated(&mut self) {
        self.path.validated = true;
        let ConnectionSide::Server { server_config } = &self.side else {
            return;
        };
        let new_tokens = &mut self.spaces[SpaceId::Data as usize].pending.new_tokens;
        new_tokens.clear();
        for _ in 0..server_config.validation_token.sent {
            new_tokens.push(self.path.remote);
        }
    }
}

impl fmt::Debug for Connection {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.debug_struct("Connection")
            .field("handshake_cid", &self.handshake_cid)
            .finish()
    }
}

/// Fields of `Connection` specific to it being client-side or server-side
enum ConnectionSide {
    Client {
        /// Sent in every outgoing Initial packet. Always empty after Initial keys are discarded
        token: Bytes,
        token_store: Arc<dyn TokenStore>,
        server_name: String,
    },
    Server {
        server_config: Arc<ServerConfig>,
    },
}

impl ConnectionSide {
    fn remote_may_migrate(&self) -> bool {
        match self {
            Self::Server { server_config } => server_config.migration,
            Self::Client { .. } => false,
        }
    }

    fn is_client(&self) -> bool {
        self.side().is_client()
    }

    fn is_server(&self) -> bool {
        self.side().is_server()
    }

    fn side(&self) -> Side {
        match *self {
            Self::Client { .. } => Side::Client,
            Self::Server { .. } => Side::Server,
        }
    }
}

impl From<SideArgs> for ConnectionSide {
    fn from(side: SideArgs) -> Self {
        match side {
            SideArgs::Client {
                token_store,
                server_name,
            } => Self::Client {
                token: token_store.take(&server_name).unwrap_or_default(),
                token_store,
                server_name,
            },
            SideArgs::Server {
                server_config,
                pref_addr_cid: _,
                path_validated: _,
            } => Self::Server { server_config },
        }
    }
}

/// Parameters to `Connection::new` specific to it being client-side or server-side
pub(crate) enum SideArgs {
    Client {
        token_store: Arc<dyn TokenStore>,
        server_name: String,
    },
    Server {
        server_config: Arc<ServerConfig>,
        pref_addr_cid: Option<ConnectionId>,
        path_validated: bool,
    },
}

impl SideArgs {
    pub(crate) fn pref_addr_cid(&self) -> Option<ConnectionId> {
        match *self {
            Self::Client { .. } => None,
            Self::Server { pref_addr_cid, .. } => pref_addr_cid,
        }
    }

    pub(crate) fn path_validated(&self) -> bool {
        match *self {
            Self::Client { .. } => true,
            Self::Server { path_validated, .. } => path_validated,
        }
    }

    pub(crate) fn side(&self) -> Side {
        match *self {
            Self::Client { .. } => Side::Client,
            Self::Server { .. } => Side::Server,
        }
    }
}

/// Reasons why a connection might be lost
#[derive(Debug, Error, Clone, PartialEq, Eq)]
pub enum ConnectionError {
    /// The peer doesn't implement any supported version
    #[error("peer doesn't implement any supported version")]
    VersionMismatch,
    /// The peer violated the QUIC specification as understood by this implementation
    #[error(transparent)]
    TransportError(#[from] TransportError),
    /// The peer's QUIC stack aborted the connection automatically
    #[error("aborted by peer: {0}")]
    ConnectionClosed(frame::ConnectionClose),
    /// The peer closed the connection
    #[error("closed by peer: {0}")]
    ApplicationClosed(frame::ApplicationClose),
    /// The peer is unable to continue processing this connection, usually due to having restarted
    #[error("reset by peer")]
    Reset,
    /// Communication with the peer has lapsed for longer than the negotiated idle timeout
    ///
    /// If neither side is sending keep-alives, a connection will time out after a long enough idle
    /// period even if the peer is still reachable. See also [`TransportConfig::max_idle_timeout()`]
    /// and [`TransportConfig::keep_alive_interval()`].
    #[error("timed out")]
    TimedOut,
    /// The local application closed the connection
    #[error("closed")]
    LocallyClosed,
    /// The connection could not be created because not enough of the CID space is available
    ///
    /// Try using longer connection IDs.
    #[error("CIDs exhausted")]
    CidsExhausted,
}

impl From<Close> for ConnectionError {
    fn from(x: Close) -> Self {
        match x {
            Close::Connection(reason) => Self::ConnectionClosed(reason),
            Close::Application(reason) => Self::ApplicationClosed(reason),
        }
    }
}

// For compatibility with API consumers
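// For example, a connection reset by the peer maps to `io::ErrorKind::ConnectionReset` and
// an idle timeout to `io::ErrorKind::TimedOut`, so std-IO-oriented callers can match on the
// error kinds they already handle.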
impl From<ConnectionError> for io::Error {
    fn from(x: ConnectionError) -> Self {
        use ConnectionError::*;
        let kind = match x {
            TimedOut => io::ErrorKind::TimedOut,
            Reset => io::ErrorKind::ConnectionReset,
            ApplicationClosed(_) | ConnectionClosed(_) => io::ErrorKind::ConnectionAborted,
            TransportError(_) | VersionMismatch | LocallyClosed | CidsExhausted => {
                io::ErrorKind::Other
            }
        };
        Self::new(kind, x)
    }
}

#[allow(unreachable_pub)] // fuzzing only
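/// Typical lifecycle: `Handshake` → `Established`, then `Closed` or `Draining` while the
/// connection winds down, and finally `Drained` once its resources may be reclaimed.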
#[derive(Clone)]
pub enum State {
    Handshake(state::Handshake),
    Established,
    Closed(state::Closed),
    Draining,
    /// Waiting for application to call close so we can dispose of the resources
    Drained,
}

impl State {
    fn closed<R: Into<Close>>(reason: R) -> Self {
        Self::Closed(state::Closed {
            reason: reason.into(),
        })
    }

    fn is_handshake(&self) -> bool {
        matches!(*self, Self::Handshake(_))
    }

    fn is_established(&self) -> bool {
        matches!(*self, Self::Established)
    }

    fn is_closed(&self) -> bool {
        matches!(*self, Self::Closed(_) | Self::Draining | Self::Drained)
    }

    fn is_drained(&self) -> bool {
        matches!(*self, Self::Drained)
    }
}

mod state {
    use super::*;

    #[allow(unreachable_pub)] // fuzzing only
    #[derive(Clone)]
    pub struct Handshake {
        /// Whether the remote CID has been set by the peer yet
        ///
        /// Always set for servers
        pub(super) rem_cid_set: bool,
        /// Stateless retry token received in the first Initial by a server.
        ///
        /// Must be present in every Initial. Always empty for clients.
        pub(super) expected_token: Bytes,
        /// First cryptographic message
        ///
        /// Only set for clients
        pub(super) client_hello: Option<Bytes>,
    }

    #[allow(unreachable_pub)] // fuzzing only
    #[derive(Clone)]
    pub struct Closed {
        pub(super) reason: Close,
    }
}

/// Events of interest to the application
#[derive(Debug)]
pub enum Event {
    /// The connection's handshake data is ready
    HandshakeDataReady,
    /// The connection was successfully established
    Connected,
    /// The connection was lost
    ///
    /// Emitted if the peer closes the connection or an error is encountered.
    ConnectionLost {
        /// Reason that the connection was closed
        reason: ConnectionError,
    },
    /// Stream events
    Stream(StreamEvent),
    /// One or more application datagrams have been received
    DatagramReceived,
    /// One or more application datagrams have been sent after blocking
    DatagramsUnblocked,
}

fn instant_saturating_sub(x: Instant, y: Instant) -> Duration {
    if x > y { x - y } else { Duration::ZERO }
}

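// `max_ack_delay` is carried on the wire in milliseconds (RFC 9000 §18.2); the conversion
// below multiplies by 1 000 to express it in microseconds.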
fn get_max_ack_delay(params: &TransportParameters) -> Duration {
    Duration::from_micros(params.max_ack_delay.0 * 1000)
}

// Prevents overflow and improves behavior in extreme circumstances
const MAX_BACKOFF_EXPONENT: u32 = 16;

/// Minimal remaining size to allow packet coalescing, excluding cryptographic tag
///
/// This must be at least as large as the header for a well-formed empty packet to be coalesced,
/// plus some space for frames. We only care about handshake headers because short header packets
/// necessarily have smaller headers, and initial packets are only ever the first packet in a
/// datagram (because we coalesce in ascending packet space order and the only reason to split a
/// packet is when packet space changes).
const MIN_PACKET_SPACE: usize = MAX_HANDSHAKE_OR_0RTT_HEADER_SIZE + 32;

/// Largest amount of space that could be occupied by a Handshake or 0-RTT packet's header
///
/// Excludes packet-type-specific fields such as packet number or Initial token
// https://www.rfc-editor.org/rfc/rfc9000.html#name-0-rtt: flags + version + dcid len + dcid +
// scid len + scid + length + pn
const MAX_HANDSHAKE_OR_0RTT_HEADER_SIZE: usize =
    1 + 4 + 1 + MAX_CID_SIZE + 1 + MAX_CID_SIZE + VarInt::from_u32(u16::MAX as u32).size() + 4;
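// For example, if `MAX_CID_SIZE` is 20 bytes (the length field's varint encoding of
// `u16::MAX` always takes 4 bytes), this evaluates to 1 + 4 + 1 + 20 + 1 + 20 + 4 + 4 = 55.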

/// Perform key updates this many packets before the AEAD confidentiality limit.
///
/// Chosen arbitrarily, intended to be large enough to prevent spurious connection loss.
const KEY_UPDATE_MARGIN: u64 = 10_000;

#[derive(Default)]
struct SentFrames {
    retransmits: ThinRetransmits,
    largest_acked: Option<u64>,
    stream_frames: StreamMetaVec,
    /// Whether the packet contains non-retransmittable frames (like datagrams)
    non_retransmits: bool,
    requires_padding: bool,
}

impl SentFrames {
    /// Returns whether the packet contains only ACKs
    fn is_ack_only(&self, streams: &StreamsState) -> bool {
        self.largest_acked.is_some()
            && !self.non_retransmits
            && self.stream_frames.is_empty()
            && self.retransmits.is_empty(streams)
    }
}

/// Compute the negotiated idle timeout based on local and remote max_idle_timeout transport parameters.
///
/// According to the definition of max_idle_timeout, a value of `0` means the timeout is disabled; see <https://www.rfc-editor.org/rfc/rfc9000#section-18.2-4.4.1.>
///
/// According to the negotiation procedure, either the minimum of the two timeouts or the only one specified is used as the negotiated value; see <https://www.rfc-editor.org/rfc/rfc9000#section-10.1-2.>
///
/// Returns the negotiated idle timeout as a `Duration`, or `None` when both endpoints have opted out of idle timeout.
fn negotiate_max_idle_timeout(x: Option<VarInt>, y: Option<VarInt>) -> Option<Duration> {
    match (x, y) {
        (Some(VarInt(0)) | None, Some(VarInt(0)) | None) => None,
        (Some(VarInt(0)) | None, Some(y)) => Some(Duration::from_millis(y.0)),
        (Some(x), Some(VarInt(0)) | None) => Some(Duration::from_millis(x.0)),
        (Some(x), Some(y)) => Some(Duration::from_millis(cmp::min(x, y).0)),
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn negotiate_max_idle_timeout_commutative() {
        let test_params = [
            (None, None, None),
            (None, Some(VarInt(0)), None),
            (None, Some(VarInt(2)), Some(Duration::from_millis(2))),
            (Some(VarInt(0)), Some(VarInt(0)), None),
            (
                Some(VarInt(2)),
                Some(VarInt(0)),
                Some(Duration::from_millis(2)),
            ),
            (
                Some(VarInt(1)),
                Some(VarInt(4)),
                Some(Duration::from_millis(1)),
            ),
        ];

        for (left, right, result) in test_params {
            assert_eq!(negotiate_max_idle_timeout(left, right), result);
            assert_eq!(negotiate_max_idle_timeout(right, left), result);
        }
    }
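
    // A minimal sanity check for `instant_saturating_sub` (assumes the crate's `Instant` and
    // `Duration` re-exports behave like their `std::time` counterparts).
    #[test]
    fn instant_saturating_sub_clamps_to_zero() {
        let base = Instant::now();
        let later = base + Duration::from_millis(5);
        // Ordinary case: a later instant minus an earlier one yields the elapsed duration.
        assert_eq!(instant_saturating_sub(later, base), Duration::from_millis(5));
        // Equal or reversed operands saturate to zero instead of panicking.
        assert_eq!(instant_saturating_sub(base, base), Duration::ZERO);
        assert_eq!(instant_saturating_sub(base, later), Duration::ZERO);
    }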
}