ant_quic/connection/
mod.rs

1use std::{
2    cmp,
3    collections::VecDeque,
4    convert::TryFrom,
5    fmt, io, mem,
6    net::{IpAddr, SocketAddr},
7    sync::Arc,
8};
9
10use bytes::{Bytes, BytesMut};
11use frame::StreamMetaVec;
12// Removed qlog feature
13
14use rand::{Rng, SeedableRng, rngs::StdRng};
15use thiserror::Error;
16use tracing::{debug, error, info, trace, trace_span, warn};
17
18use crate::{
19    Dir, Duration, EndpointConfig, Frame, INITIAL_MTU, Instant, MAX_CID_SIZE, MAX_STREAM_COUNT,
20    MIN_INITIAL_SIZE, Side, StreamId, TIMER_GRANULARITY, TokenStore, Transmit, TransportError,
21    TransportErrorCode, VarInt,
22    cid_generator::ConnectionIdGenerator,
23    cid_queue::CidQueue,
24    coding::BufMutExt,
25    config::{ServerConfig, TransportConfig},
26    crypto::{self, KeyPair, Keys, PacketKey},
27    frame::{self, Close, Datagram, FrameStruct, NewToken},
28    packet::{
29        FixedLengthConnectionIdParser, Header, InitialHeader, InitialPacket, LongType, Packet,
30        PacketNumber, PartialDecode, SpaceId,
31    },
32    range_set::ArrayRangeSet,
33    shared::{
34        ConnectionEvent, ConnectionEventInner, ConnectionId, DatagramConnectionEvent, EcnCodepoint,
35        EndpointEvent, EndpointEventInner,
36    },
37    token::{ResetToken, Token, TokenPayload},
38    transport_parameters::TransportParameters,
39};
40
41mod ack_frequency;
42use ack_frequency::AckFrequencyState;
43
44pub(crate) mod nat_traversal;
45pub(crate) use nat_traversal::{NatTraversalRole, NatTraversalError, CoordinationPhase};
46use nat_traversal::NatTraversalState;
47
48mod assembler;
49pub use assembler::Chunk;
50
51mod cid_state;
52use cid_state::CidState;
53
54mod datagrams;
55use datagrams::DatagramState;
56pub use datagrams::{Datagrams, SendDatagramError};
57
58mod mtud;
59mod pacing;
60
61mod packet_builder;
62use packet_builder::PacketBuilder;
63
64mod packet_crypto;
65use packet_crypto::{PrevCrypto, ZeroRttCrypto};
66
67mod paths;
68pub use paths::RttEstimator;
69use paths::{PathData, PathResponses, NatTraversalChallenges};
70
71mod send_buffer;
72
73mod spaces;
74#[cfg(fuzzing)]
75pub use spaces::Retransmits;
76#[cfg(not(fuzzing))]
77use spaces::Retransmits;
78use spaces::{PacketNumberFilter, PacketSpace, SendableFrames, SentPacket, ThinRetransmits};
79
80mod stats;
81pub use stats::{ConnectionStats, FrameStats, PathStats, UdpStats};
82
83mod streams;
84#[cfg(fuzzing)]
85pub use streams::StreamsState;
86#[cfg(not(fuzzing))]
87use streams::StreamsState;
88pub use streams::{
89    Chunks, ClosedStream, FinishError, ReadError, ReadableError, RecvStream, SendStream,
90    ShouldTransmit, StreamEvent, Streams, WriteError, Written,
91};
92
93mod timer;
94use crate::congestion::Controller;
95use timer::{Timer, TimerTable};
96
/// Protocol state and logic for a single QUIC connection
///
/// Objects of this type receive [`ConnectionEvent`]s and emit [`EndpointEvent`]s and application
/// [`Event`]s to make progress. To handle timeouts, a `Connection` returns timer updates and
/// expects timeouts through various methods. A number of simple getter methods are exposed
/// to allow callers to inspect some of the connection state.
///
/// `Connection` has roughly 4 types of methods:
///
/// - A. Simple getters, taking `&self`
/// - B. Handlers for incoming events from the network or system, named `handle_*`.
/// - C. State machine mutators, for incoming commands from the application. For convenience we
///   refer to this as "performing I/O" below, however as per the design of this library none of the
///   functions actually perform system-level I/O. For example, [`read`](RecvStream::read) and
///   [`write`](SendStream::write), but also things like [`reset`](SendStream::reset).
/// - D. Polling functions for outgoing events or actions for the caller to
///   take, named `poll_*`.
///
/// The simplest way to use this API correctly is to call (B) and (C) whenever
/// appropriate, then after each of those calls, as soon as feasible call all
/// polling methods (D) and deal with their outputs appropriately, e.g. by
/// passing it to the application or by making a system-level I/O call. You
/// should call the polling functions in this order:
///
/// 1. [`poll_transmit`](Self::poll_transmit)
/// 2. [`poll_timeout`](Self::poll_timeout)
/// 3. [`poll_endpoint_events`](Self::poll_endpoint_events)
/// 4. [`poll`](Self::poll)
///
/// Currently the only actual dependency is from (2) to (1), however additional
/// dependencies may be added in future, so the above order is recommended.
///
/// (A) may be called whenever desired.
///
/// Care should be taken to ensure that the input events represent monotonically
/// increasing time. Specifically, calling [`handle_timeout`](Self::handle_timeout)
/// with events of the same [`Instant`] may be interleaved in any order with a
/// call to [`handle_event`](Self::handle_event) at that same instant; however
/// events or timeouts with different instants must not be interleaved.
pub struct Connection {
    endpoint_config: Arc<EndpointConfig>,
    config: Arc<TransportConfig>,
    rng: StdRng,
    crypto: Box<dyn crypto::Session>,
    /// The CID we initially chose, for use during the handshake
    handshake_cid: ConnectionId,
    /// The CID the peer initially chose, for use during the handshake
    rem_handshake_cid: ConnectionId,
    /// The "real" local IP address which was used to receive the initial packet.
    /// This is only populated for the server case, and if known
    local_ip: Option<IpAddr>,
    path: PathData,
    /// Whether MTU detection is supported in this environment
    allow_mtud: bool,
    prev_path: Option<(ConnectionId, PathData)>,
    state: State,
    side: ConnectionSide,
    /// Whether or not 0-RTT was enabled during the handshake. Does not imply acceptance.
    zero_rtt_enabled: bool,
    /// Set if 0-RTT is supported, then cleared when no longer needed.
    zero_rtt_crypto: Option<ZeroRttCrypto>,
    key_phase: bool,
    /// How many packets are in the current key phase. Used only for `Data` space.
    key_phase_size: u64,
    /// Transport parameters set by the peer
    peer_params: TransportParameters,
    /// Source ConnectionId of the first packet received from the peer
    orig_rem_cid: ConnectionId,
    /// Destination ConnectionId sent by the client on the first Initial
    initial_dst_cid: ConnectionId,
    /// The value that the server included in the Source Connection ID field of a Retry packet, if
    /// one was received
    retry_src_cid: Option<ConnectionId>,
    /// Total number of outgoing packets that have been deemed lost
    lost_packets: u64,
    events: VecDeque<Event>,
    endpoint_events: VecDeque<EndpointEventInner>,
    /// Whether the spin bit is in use for this connection
    spin_enabled: bool,
    /// Outgoing spin bit state
    spin: bool,
    /// Packet number spaces: initial, handshake, 1-RTT
    spaces: [PacketSpace; 3],
    /// Highest usable packet number space
    highest_space: SpaceId,
    /// 1-RTT keys used prior to a key update
    prev_crypto: Option<PrevCrypto>,
    /// 1-RTT keys to be used for the next key update
    ///
    /// These are generated in advance to prevent timing attacks and/or DoS by third-party attackers
    /// spoofing key updates.
    next_crypto: Option<KeyPair<Box<dyn PacketKey>>>,
    accepted_0rtt: bool,
    /// Whether the idle timer should be reset the next time an ack-eliciting packet is transmitted.
    permit_idle_reset: bool,
    /// Negotiated idle timeout
    idle_timeout: Option<Duration>,
    timers: TimerTable,
    /// Number of packets received which could not be authenticated
    authentication_failures: u64,
    /// Why the connection was lost, if it has been
    error: Option<ConnectionError>,
    /// Identifies Data-space packet numbers to skip. Not used in earlier spaces.
    packet_number_filter: PacketNumberFilter,

    //
    // Queued non-retransmittable 1-RTT data
    //
    /// Responses to PATH_CHALLENGE frames
    path_responses: PathResponses,
    /// Challenges for NAT traversal candidate validation
    nat_traversal_challenges: NatTraversalChallenges,
    /// Whether a close message is still waiting to be sent. While the connection is draining or
    /// closed, `poll_transmit` produces nothing unless this is set.
    close: bool,

    //
    // ACK frequency
    //
    ack_frequency: AckFrequencyState,

    //
    // Loss Detection
    //
    /// The number of times a PTO has been sent without receiving an ack.
    pto_count: u32,

    //
    // Congestion Control
    //
    /// Whether the most recently received packet had an ECN codepoint set
    receiving_ecn: bool,
    /// Number of packets authenticated
    total_authed_packets: u64,
    /// Whether the last `poll_transmit` call yielded no data because there was
    /// no outgoing application data.
    app_limited: bool,

    streams: StreamsState,
    /// Surplus remote CIDs for future use on new paths
    rem_cids: CidQueue,
    /// Attributes of CIDs generated by the local peer
    local_cid_state: CidState,
    /// State of the unreliable datagram extension
    datagrams: DatagramState,
    /// Connection level statistics
    stats: ConnectionStats,
    /// QUIC version used for the connection.
    version: u32,

    /// NAT traversal state for establishing direct P2P connections
    nat_traversal: Option<NatTraversalState>,

    /// Qlog writer, present only once `set_qlog` has been called
    #[cfg(feature = "__qlog")]
    qlog_streamer: Option<Box<dyn std::io::Write + Send + Sync>>,
}
252
253impl Connection {
254    pub(crate) fn new(
255        endpoint_config: Arc<EndpointConfig>,
256        config: Arc<TransportConfig>,
257        init_cid: ConnectionId,
258        loc_cid: ConnectionId,
259        rem_cid: ConnectionId,
260        remote: SocketAddr,
261        local_ip: Option<IpAddr>,
262        crypto: Box<dyn crypto::Session>,
263        cid_gen: &dyn ConnectionIdGenerator,
264        now: Instant,
265        version: u32,
266        allow_mtud: bool,
267        rng_seed: [u8; 32],
268        side_args: SideArgs,
269    ) -> Self {
270        let pref_addr_cid = side_args.pref_addr_cid();
271        let path_validated = side_args.path_validated();
272        let connection_side = ConnectionSide::from(side_args);
273        let side = connection_side.side();
274        let initial_space = PacketSpace {
275            crypto: Some(crypto.initial_keys(&init_cid, side)),
276            ..PacketSpace::new(now)
277        };
278        let state = State::Handshake(state::Handshake {
279            rem_cid_set: side.is_server(),
280            expected_token: Bytes::new(),
281            client_hello: None,
282        });
283        let mut rng = StdRng::from_seed(rng_seed);
284        let mut this = Self {
285            endpoint_config,
286            crypto,
287            handshake_cid: loc_cid,
288            rem_handshake_cid: rem_cid,
289            local_cid_state: CidState::new(
290                cid_gen.cid_len(),
291                cid_gen.cid_lifetime(),
292                now,
293                if pref_addr_cid.is_some() { 2 } else { 1 },
294            ),
295            path: PathData::new(remote, allow_mtud, None, now, &config),
296            allow_mtud,
297            local_ip,
298            prev_path: None,
299            state,
300            side: connection_side,
301            zero_rtt_enabled: false,
302            zero_rtt_crypto: None,
303            key_phase: false,
304            // A small initial key phase size ensures peers that don't handle key updates correctly
305            // fail sooner rather than later. It's okay for both peers to do this, as the first one
306            // to perform an update will reset the other's key phase size in `update_keys`, and a
307            // simultaneous key update by both is just like a regular key update with a really fast
308            // response. Inspired by quic-go's similar behavior of performing the first key update
309            // at the 100th short-header packet.
310            key_phase_size: rng.gen_range(10..1000),
311            peer_params: TransportParameters::default(),
312            orig_rem_cid: rem_cid,
313            initial_dst_cid: init_cid,
314            retry_src_cid: None,
315            lost_packets: 0,
316            events: VecDeque::new(),
317            endpoint_events: VecDeque::new(),
318            spin_enabled: config.allow_spin && rng.gen_ratio(7, 8),
319            spin: false,
320            spaces: [initial_space, PacketSpace::new(now), PacketSpace::new(now)],
321            highest_space: SpaceId::Initial,
322            prev_crypto: None,
323            next_crypto: None,
324            accepted_0rtt: false,
325            permit_idle_reset: true,
326            idle_timeout: match config.max_idle_timeout {
327                None | Some(VarInt(0)) => None,
328                Some(dur) => Some(Duration::from_millis(dur.0)),
329            },
330            timers: TimerTable::default(),
331            authentication_failures: 0,
332            error: None,
333            #[cfg(test)]
334            packet_number_filter: match config.deterministic_packet_numbers {
335                false => PacketNumberFilter::new(&mut rng),
336                true => PacketNumberFilter::disabled(),
337            },
338            #[cfg(not(test))]
339            packet_number_filter: PacketNumberFilter::new(&mut rng),
340
341            path_responses: PathResponses::default(),
342            nat_traversal_challenges: NatTraversalChallenges::default(),
343            close: false,
344
345            ack_frequency: AckFrequencyState::new(get_max_ack_delay(
346                &TransportParameters::default(),
347            )),
348
349            pto_count: 0,
350
351            app_limited: false,
352            receiving_ecn: false,
353            total_authed_packets: 0,
354
355            streams: StreamsState::new(
356                side,
357                config.max_concurrent_uni_streams,
358                config.max_concurrent_bidi_streams,
359                config.send_window,
360                config.receive_window,
361                config.stream_receive_window,
362            ),
363            datagrams: DatagramState::default(),
364            config,
365            rem_cids: CidQueue::new(rem_cid),
366            rng,
367            stats: ConnectionStats::default(),
368            version,
369            nat_traversal: None, // Will be initialized when NAT traversal is negotiated
370
371            #[cfg(feature = "__qlog")]
372            qlog_streamer: None,
373        };
374        if path_validated {
375            this.on_path_validated();
376        }
377        if side.is_client() {
378            // Kick off the connection
379            this.write_crypto();
380            this.init_0rtt();
381        }
382        this
383    }
384
385    /// Set up qlog for this connection
386    #[cfg(feature = "__qlog")]
387    pub fn set_qlog(
388        &mut self,
389        writer: Box<dyn std::io::Write + Send + Sync>,
390        _title: Option<String>,
391        _description: Option<String>,
392        _now: Instant,
393    ) {
394        self.qlog_streamer = Some(writer);
395    }
396
    /// Emit qlog recovery metrics
    ///
    /// Currently a no-op placeholder: nothing is written to the qlog writer and `_now` is
    /// ignored.
    #[cfg(feature = "__qlog")]
    fn emit_qlog_recovery_metrics(&mut self, _now: Instant) {
        // TODO: Implement actual qlog recovery metrics emission.
        // For now, this is a stub that only exists to allow compilation.
    }
403
404    /// Returns the next time at which `handle_timeout` should be called
405    ///
406    /// The value returned may change after:
407    /// - the application performed some I/O on the connection
408    /// - a call was made to `handle_event`
409    /// - a call to `poll_transmit` returned `Some`
410    /// - a call was made to `handle_timeout`
411    #[must_use]
412    pub fn poll_timeout(&mut self) -> Option<Instant> {
413        let mut next_timeout = self.timers.next_timeout();
414        
415        // Check NAT traversal timeouts
416        if let Some(nat_state) = &self.nat_traversal {
417            if let Some(nat_timeout) = nat_state.get_next_timeout(Instant::now()) {
418                // Schedule NAT traversal timer
419                self.timers.set(Timer::NatTraversal, nat_timeout);
420                next_timeout = Some(next_timeout.map_or(nat_timeout, |t| t.min(nat_timeout)));
421            }
422        }
423        
424        next_timeout
425    }
426
427    /// Returns application-facing events
428    ///
429    /// Connections should be polled for events after:
430    /// - a call was made to `handle_event`
431    /// - a call was made to `handle_timeout`
432    #[must_use]
433    pub fn poll(&mut self) -> Option<Event> {
434        if let Some(x) = self.events.pop_front() {
435            return Some(x);
436        }
437
438        if let Some(event) = self.streams.poll() {
439            return Some(Event::Stream(event));
440        }
441
442        if let Some(err) = self.error.take() {
443            return Some(Event::ConnectionLost { reason: err });
444        }
445
446        None
447    }
448
449    /// Return endpoint-facing events
450    #[must_use]
451    pub fn poll_endpoint_events(&mut self) -> Option<EndpointEvent> {
452        self.endpoint_events.pop_front().map(EndpointEvent)
453    }
454
455    /// Provide control over streams
456    #[must_use]
457    pub fn streams(&mut self) -> Streams<'_> {
458        Streams {
459            state: &mut self.streams,
460            conn_state: &self.state,
461        }
462    }
463
464    /// Provide control over streams
465    #[must_use]
466    pub fn recv_stream(&mut self, id: StreamId) -> RecvStream<'_> {
467        assert!(id.dir() == Dir::Bi || id.initiator() != self.side.side());
468        RecvStream {
469            id,
470            state: &mut self.streams,
471            pending: &mut self.spaces[SpaceId::Data].pending,
472        }
473    }
474
475    /// Provide control over streams
476    #[must_use]
477    pub fn send_stream(&mut self, id: StreamId) -> SendStream<'_> {
478        assert!(id.dir() == Dir::Bi || id.initiator() == self.side.side());
479        SendStream {
480            id,
481            state: &mut self.streams,
482            pending: &mut self.spaces[SpaceId::Data].pending,
483            conn_state: &self.state,
484        }
485    }
486
487    /// Returns packets to transmit
488    ///
489    /// Connections should be polled for transmit after:
490    /// - the application performed some I/O on the connection
491    /// - a call was made to `handle_event`
492    /// - a call was made to `handle_timeout`
493    ///
494    /// `max_datagrams` specifies how many datagrams can be returned inside a
495    /// single Transmit using GSO. This must be at least 1.
496    #[must_use]
497    pub fn poll_transmit(
498        &mut self,
499        now: Instant,
500        max_datagrams: usize,
501        buf: &mut Vec<u8>,
502    ) -> Option<Transmit> {
503        assert!(max_datagrams != 0);
504        let max_datagrams = match self.config.enable_segmentation_offload {
505            false => 1,
506            true => max_datagrams,
507        };
508
509        let mut num_datagrams = 0;
510        // Position in `buf` of the first byte of the current UDP datagram. When coalescing QUIC
511        // packets, this can be earlier than the start of the current QUIC packet.
512        let mut datagram_start = 0;
513        let mut segment_size = usize::from(self.path.current_mtu());
514
515        // Check for NAT traversal coordination timeouts
516        if let Some(nat_traversal) = &mut self.nat_traversal {
517            if nat_traversal.check_coordination_timeout(now) {
518                trace!("NAT traversal coordination timed out, may retry");
519            }
520        }
521
522        // First priority: NAT traversal PATH_CHALLENGE packets (includes coordination)
523        if let Some(challenge) = self.send_nat_traversal_challenge(now, buf) {
524            return Some(challenge);
525        }
526        
527        if let Some(challenge) = self.send_path_challenge(now, buf) {
528            return Some(challenge);
529        }
530
531        // If we need to send a probe, make sure we have something to send.
532        for space in SpaceId::iter() {
533            let request_immediate_ack =
534                space == SpaceId::Data && self.peer_supports_ack_frequency();
535            self.spaces[space].maybe_queue_probe(request_immediate_ack, &self.streams);
536        }
537
538        // Check whether we need to send a close message
539        let close = match self.state {
540            State::Drained => {
541                self.app_limited = true;
542                return None;
543            }
544            State::Draining | State::Closed(_) => {
545                // self.close is only reset once the associated packet had been
546                // encoded successfully
547                if !self.close {
548                    self.app_limited = true;
549                    return None;
550                }
551                true
552            }
553            _ => false,
554        };
555
556        // Check whether we need to send an ACK_FREQUENCY frame
557        if let Some(config) = &self.config.ack_frequency_config {
558            self.spaces[SpaceId::Data].pending.ack_frequency = self
559                .ack_frequency
560                .should_send_ack_frequency(self.path.rtt.get(), config, &self.peer_params)
561                && self.highest_space == SpaceId::Data
562                && self.peer_supports_ack_frequency();
563        }
564
565        // Reserving capacity can provide more capacity than we asked for. However, we are not
566        // allowed to write more than `segment_size`. Therefore the maximum capacity is tracked
567        // separately.
568        let mut buf_capacity = 0;
569
570        let mut coalesce = true;
571        let mut builder_storage: Option<PacketBuilder> = None;
572        let mut sent_frames = None;
573        let mut pad_datagram = false;
574        let mut pad_datagram_to_mtu = false;
575        let mut congestion_blocked = false;
576
577        // Iterate over all spaces and find data to send
578        let mut space_idx = 0;
579        let spaces = [SpaceId::Initial, SpaceId::Handshake, SpaceId::Data];
580        // This loop will potentially spend multiple iterations in the same `SpaceId`,
581        // so we cannot trivially rewrite it to take advantage of `SpaceId::iter()`.
582        while space_idx < spaces.len() {
583            let space_id = spaces[space_idx];
584            // Number of bytes available for frames if this is a 1-RTT packet. We're guaranteed to
585            // be able to send an individual frame at least this large in the next 1-RTT
586            // packet. This could be generalized to support every space, but it's only needed to
587            // handle large fixed-size frames, which only exist in 1-RTT (application datagrams). We
588            // don't account for coalesced packets potentially occupying space because frames can
589            // always spill into the next datagram.
590            let pn = self.packet_number_filter.peek(&self.spaces[SpaceId::Data]);
591            let frame_space_1rtt =
592                segment_size.saturating_sub(self.predict_1rtt_overhead(Some(pn)));
593
594            // Is there data or a close message to send in this space?
595            let can_send = self.space_can_send(space_id, frame_space_1rtt);
596            if can_send.is_empty() && (!close || self.spaces[space_id].crypto.is_none()) {
597                space_idx += 1;
598                continue;
599            }
600
601            let mut ack_eliciting = !self.spaces[space_id].pending.is_empty(&self.streams)
602                || self.spaces[space_id].ping_pending
603                || self.spaces[space_id].immediate_ack_pending;
604            if space_id == SpaceId::Data {
605                ack_eliciting |= self.can_send_1rtt(frame_space_1rtt);
606            }
607
608            pad_datagram_to_mtu |= space_id == SpaceId::Data && self.config.pad_to_mtu;
609
610            // Can we append more data into the current buffer?
611            // It is not safe to assume that `buf.len()` is the end of the data,
612            // since the last packet might not have been finished.
613            let buf_end = if let Some(builder) = &builder_storage {
614                buf.len().max(builder.min_size) + builder.tag_len
615            } else {
616                buf.len()
617            };
618
619            let tag_len = if let Some(ref crypto) = self.spaces[space_id].crypto {
620                crypto.packet.local.tag_len()
621            } else if space_id == SpaceId::Data {
622                self.zero_rtt_crypto.as_ref().expect(
623                    "sending packets in the application data space requires known 0-RTT or 1-RTT keys",
624                ).packet.tag_len()
625            } else {
626                unreachable!("tried to send {:?} packet without keys", space_id)
627            };
628            if !coalesce || buf_capacity - buf_end < MIN_PACKET_SPACE + tag_len {
629                // We need to send 1 more datagram and extend the buffer for that.
630
631                // Is 1 more datagram allowed?
632                if num_datagrams >= max_datagrams {
633                    // No more datagrams allowed
634                    break;
635                }
636
637                // Anti-amplification is only based on `total_sent`, which gets
638                // updated at the end of this method. Therefore we pass the amount
639                // of bytes for datagrams that are already created, as well as 1 byte
640                // for starting another datagram. If there is any anti-amplification
641                // budget left, we always allow a full MTU to be sent
642                // (see https://github.com/quinn-rs/quinn/issues/1082)
643                if self
644                    .path
645                    .anti_amplification_blocked(segment_size as u64 * (num_datagrams as u64) + 1)
646                {
647                    trace!("blocked by anti-amplification");
648                    break;
649                }
650
651                // Congestion control and pacing checks
652                // Tail loss probes must not be blocked by congestion, or a deadlock could arise
653                if ack_eliciting && self.spaces[space_id].loss_probes == 0 {
654                    // Assume the current packet will get padded to fill the segment
655                    let untracked_bytes = if let Some(builder) = &builder_storage {
656                        buf_capacity - builder.partial_encode.start
657                    } else {
658                        0
659                    } as u64;
660                    debug_assert!(untracked_bytes <= segment_size as u64);
661
662                    let bytes_to_send = segment_size as u64 + untracked_bytes;
663                    if self.path.in_flight.bytes + bytes_to_send >= self.path.congestion.window() {
664                        space_idx += 1;
665                        congestion_blocked = true;
666                        // We continue instead of breaking here in order to avoid
667                        // blocking loss probes queued for higher spaces.
668                        trace!("blocked by congestion control");
669                        continue;
670                    }
671
672                    // Check whether the next datagram is blocked by pacing
673                    let smoothed_rtt = self.path.rtt.get();
674                    if let Some(delay) = self.path.pacing.delay(
675                        smoothed_rtt,
676                        bytes_to_send,
677                        self.path.current_mtu(),
678                        self.path.congestion.window(),
679                        now,
680                    ) {
681                        self.timers.set(Timer::Pacing, delay);
682                        congestion_blocked = true;
683                        // Loss probes should be subject to pacing, even though
684                        // they are not congestion controlled.
685                        trace!("blocked by pacing");
686                        break;
687                    }
688                }
689
690                // Finish current packet
691                if let Some(mut builder) = builder_storage.take() {
692                    if pad_datagram {
693                        builder.pad_to(MIN_INITIAL_SIZE);
694                    }
695
696                    if num_datagrams > 1 || pad_datagram_to_mtu {
697                        // If too many padding bytes would be required to continue the GSO batch
698                        // after this packet, end the GSO batch here. Ensures that fixed-size frames
699                        // with heterogeneous sizes (e.g. application datagrams) won't inadvertently
700                        // waste large amounts of bandwidth. The exact threshold is a bit arbitrary
701                        // and might benefit from further tuning, though there's no universally
702                        // optimal value.
703                        //
704                        // Additionally, if this datagram is a loss probe and `segment_size` is
705                        // larger than `INITIAL_MTU`, then padding it to `segment_size` to continue
706                        // the GSO batch would risk failure to recover from a reduction in path
707                        // MTU. Loss probes are the only packets for which we might grow
708                        // `buf_capacity` by less than `segment_size`.
709                        const MAX_PADDING: usize = 16;
710                        let packet_len_unpadded = cmp::max(builder.min_size, buf.len())
711                            - datagram_start
712                            + builder.tag_len;
713                        if (packet_len_unpadded + MAX_PADDING < segment_size
714                            && !pad_datagram_to_mtu)
715                            || datagram_start + segment_size > buf_capacity
716                        {
717                            trace!(
718                                "GSO truncated by demand for {} padding bytes or loss probe",
719                                segment_size - packet_len_unpadded
720                            );
721                            builder_storage = Some(builder);
722                            break;
723                        }
724
725                        // Pad the current datagram to GSO segment size so it can be included in the
726                        // GSO batch.
727                        builder.pad_to(segment_size as u16);
728                    }
729
730                    builder.finish_and_track(now, self, sent_frames.take(), buf);
731
732                    if num_datagrams == 1 {
733                        // Set the segment size for this GSO batch to the size of the first UDP
734                        // datagram in the batch. Larger data that cannot be fragmented
735                        // (e.g. application datagrams) will be included in a future batch. When
736                        // sending large enough volumes of data for GSO to be useful, we expect
737                        // packet sizes to usually be consistent, e.g. populated by max-size STREAM
738                        // frames or uniformly sized datagrams.
739                        segment_size = buf.len();
740                        // Clip the unused capacity out of the buffer so future packets don't
741                        // overrun
742                        buf_capacity = buf.len();
743
744                        // Check whether the data we planned to send will fit in the reduced segment
745                        // size. If not, bail out and leave it for the next GSO batch so we don't
746                        // end up trying to send an empty packet. We can't easily compute the right
747                        // segment size before the original call to `space_can_send`, because at
748                        // that time we haven't determined whether we're going to coalesce with the
749                        // first datagram or potentially pad it to `MIN_INITIAL_SIZE`.
750                        if space_id == SpaceId::Data {
751                            let frame_space_1rtt =
752                                segment_size.saturating_sub(self.predict_1rtt_overhead(Some(pn)));
753                            if self.space_can_send(space_id, frame_space_1rtt).is_empty() {
754                                break;
755                            }
756                        }
757                    }
758                }
759
760                // Allocate space for another datagram
761                let next_datagram_size_limit = match self.spaces[space_id].loss_probes {
762                    0 => segment_size,
763                    _ => {
764                        self.spaces[space_id].loss_probes -= 1;
765                        // Clamp the datagram to at most the minimum MTU to ensure that loss probes
766                        // can get through and enable recovery even if the path MTU has shrunk
767                        // unexpectedly.
768                        std::cmp::min(segment_size, usize::from(INITIAL_MTU))
769                    }
770                };
771                buf_capacity += next_datagram_size_limit;
772                if buf.capacity() < buf_capacity {
773                    // We reserve the maximum space for sending `max_datagrams` upfront
774                    // to avoid any reallocations if more datagrams have to be appended later on.
775                    // Benchmarks have shown a 5-10% throughput improvement
776                    // compared to continuously resizing the datagram buffer.
777                    // While this will lead to over-allocation for small transmits
778                    // (e.g. purely containing ACKs), modern memory allocators
779                    // (e.g. mimalloc and jemalloc) will pool certain allocation sizes
780                    // and therefore this is still rather efficient.
781                    buf.reserve(max_datagrams * segment_size);
782                }
783                num_datagrams += 1;
784                coalesce = true;
785                pad_datagram = false;
786                datagram_start = buf.len();
787
788                debug_assert_eq!(
789                    datagram_start % segment_size,
790                    0,
791                    "datagrams in a GSO batch must be aligned to the segment size"
792                );
793            } else {
794                // We can append/coalesce the next packet into the current
795                // datagram.
796                // Finish current packet without adding extra padding
797                if let Some(builder) = builder_storage.take() {
798                    builder.finish_and_track(now, self, sent_frames.take(), buf);
799                }
800            }
801
802            debug_assert!(buf_capacity - buf.len() >= MIN_PACKET_SPACE);
803
804            //
805            // From here on, we've determined that a packet will definitely be sent.
806            //
807
808            if self.spaces[SpaceId::Initial].crypto.is_some()
809                && space_id == SpaceId::Handshake
810                && self.side.is_client()
811            {
812                // A client stops both sending and processing Initial packets when it
813                // sends its first Handshake packet.
814                self.discard_space(now, SpaceId::Initial);
815            }
816            if let Some(ref mut prev) = self.prev_crypto {
817                prev.update_unacked = false;
818            }
819
820            debug_assert!(
821                builder_storage.is_none() && sent_frames.is_none(),
822                "Previous packet must have been finished"
823            );
824
825            let builder = builder_storage.insert(PacketBuilder::new(
826                now,
827                space_id,
828                self.rem_cids.active(),
829                buf,
830                buf_capacity,
831                datagram_start,
832                ack_eliciting,
833                self,
834            )?);
835            coalesce = coalesce && !builder.short_header;
836
837            // https://tools.ietf.org/html/draft-ietf-quic-transport-34#section-14.1
838            pad_datagram |=
839                space_id == SpaceId::Initial && (self.side.is_client() || ack_eliciting);
840
841            if close {
842                trace!("sending CONNECTION_CLOSE");
843                // Encode ACKs before the ConnectionClose message, to give the receiver
844                // a better approximate on what data has been processed. This is
845                // especially important with ack delay, since the peer might not
846                // have gotten any other ACK for the data earlier on.
847                if !self.spaces[space_id].pending_acks.ranges().is_empty() {
848                    Self::populate_acks(
849                        now,
850                        self.receiving_ecn,
851                        &mut SentFrames::default(),
852                        &mut self.spaces[space_id],
853                        buf,
854                        &mut self.stats,
855                    );
856                }
857
858                // Since there are only 64 ACK frames, there will always be enough space
859                // to encode the ConnectionClose frame too. However we still have the
860                // check here to prevent crashes if something changes.
861                debug_assert!(
862                    buf.len() + frame::ConnectionClose::SIZE_BOUND < builder.max_size,
863                    "ACKs should leave space for ConnectionClose"
864                );
865                if buf.len() + frame::ConnectionClose::SIZE_BOUND < builder.max_size {
866                    let max_frame_size = builder.max_size - buf.len();
867                    match self.state {
868                        State::Closed(state::Closed { ref reason }) => {
869                            if space_id == SpaceId::Data || reason.is_transport_layer() {
870                                reason.encode(buf, max_frame_size)
871                            } else {
872                                frame::ConnectionClose {
873                                    error_code: TransportErrorCode::APPLICATION_ERROR,
874                                    frame_type: None,
875                                    reason: Bytes::new(),
876                                }
877                                .encode(buf, max_frame_size)
878                            }
879                        }
880                        State::Draining => frame::ConnectionClose {
881                            error_code: TransportErrorCode::NO_ERROR,
882                            frame_type: None,
883                            reason: Bytes::new(),
884                        }
885                        .encode(buf, max_frame_size),
886                        _ => unreachable!(
887                            "tried to make a close packet when the connection wasn't closed"
888                        ),
889                    }
890                }
891                if space_id == self.highest_space {
892                    // Don't send another close packet
893                    self.close = false;
894                    // `CONNECTION_CLOSE` is the final packet
895                    break;
896                } else {
897                    // Send a close frame in every possible space for robustness, per RFC9000
898                    // "Immediate Close during the Handshake". Don't bother trying to send anything
899                    // else.
900                    space_idx += 1;
901                    continue;
902                }
903            }
904
905            // Send an off-path PATH_RESPONSE. Prioritized over on-path data to ensure that path
906            // validation can occur while the link is saturated.
907            if space_id == SpaceId::Data && num_datagrams == 1 {
908                if let Some((token, remote)) = self.path_responses.pop_off_path(self.path.remote) {
909                    // `unwrap` guaranteed to succeed because `builder_storage` was populated just
910                    // above.
911                    let mut builder = builder_storage.take().unwrap();
912                    trace!("PATH_RESPONSE {:08x} (off-path)", token);
913                    buf.write(frame::FrameType::PATH_RESPONSE);
914                    buf.write(token);
915                    self.stats.frame_tx.path_response += 1;
916                    builder.pad_to(MIN_INITIAL_SIZE);
917                    builder.finish_and_track(
918                        now,
919                        self,
920                        Some(SentFrames {
921                            non_retransmits: true,
922                            ..SentFrames::default()
923                        }),
924                        buf,
925                    );
926                    self.stats.udp_tx.on_sent(1, buf.len());
927                    return Some(Transmit {
928                        destination: remote,
929                        size: buf.len(),
930                        ecn: None,
931                        segment_size: None,
932                        src_ip: self.local_ip,
933                    });
934                }
935            }
936
937            let sent =
938                self.populate_packet(now, space_id, buf, builder.max_size, builder.exact_number);
939
940            // ACK-only packets should only be sent when explicitly allowed. If we write them due to
941            // any other reason, there is a bug which leads to one component announcing write
942            // readiness while not writing any data. This degrades performance. The condition is
943            // only checked if the full MTU is available and when potentially large fixed-size
944            // frames aren't queued, so that lack of space in the datagram isn't the reason for just
945            // writing ACKs.
946            debug_assert!(
947                !(sent.is_ack_only(&self.streams)
948                    && !can_send.acks
949                    && can_send.other
950                    && (buf_capacity - builder.datagram_start) == self.path.current_mtu() as usize
951                    && self.datagrams.outgoing.is_empty()),
952                "SendableFrames was {can_send:?}, but only ACKs have been written"
953            );
954            pad_datagram |= sent.requires_padding;
955
956            if sent.largest_acked.is_some() {
957                self.spaces[space_id].pending_acks.acks_sent();
958                self.timers.stop(Timer::MaxAckDelay);
959            }
960
961            // Keep information about the packet around until it gets finalized
962            sent_frames = Some(sent);
963
964            // Don't increment space_idx.
965            // We stay in the current space and check if there is more data to send.
966        }
967
968        // Finish the last packet
969        if let Some(mut builder) = builder_storage {
970            if pad_datagram {
971                builder.pad_to(MIN_INITIAL_SIZE);
972            }
973
974            // If this datagram is a loss probe and `segment_size` is larger than `INITIAL_MTU`,
975            // then padding it to `segment_size` would risk failure to recover from a reduction in
976            // path MTU.
977            // Loss probes are the only packets for which we might grow `buf_capacity`
978            // by less than `segment_size`.
979            if pad_datagram_to_mtu && buf_capacity >= datagram_start + segment_size {
980                builder.pad_to(segment_size as u16);
981            }
982
983            let last_packet_number = builder.exact_number;
984            builder.finish_and_track(now, self, sent_frames, buf);
985            self.path
986                .congestion
987                .on_sent(now, buf.len() as u64, last_packet_number);
988
989            #[cfg(feature = "__qlog")]
990            self.emit_qlog_recovery_metrics(now);
991        }
992
993        self.app_limited = buf.is_empty() && !congestion_blocked;
994
995        // Send MTU probe if necessary
996        if buf.is_empty() && self.state.is_established() {
997            let space_id = SpaceId::Data;
998            let probe_size = self
999                .path
1000                .mtud
1001                .poll_transmit(now, self.packet_number_filter.peek(&self.spaces[space_id]))?;
1002
1003            let buf_capacity = probe_size as usize;
1004            buf.reserve(buf_capacity);
1005
1006            let mut builder = PacketBuilder::new(
1007                now,
1008                space_id,
1009                self.rem_cids.active(),
1010                buf,
1011                buf_capacity,
1012                0,
1013                true,
1014                self,
1015            )?;
1016
1017            // We implement MTU probes as ping packets padded up to the probe size
1018            buf.write(frame::FrameType::PING);
1019            self.stats.frame_tx.ping += 1;
1020
1021            // If supported by the peer, we want no delays to the probe's ACK
1022            if self.peer_supports_ack_frequency() {
1023                buf.write(frame::FrameType::IMMEDIATE_ACK);
1024                self.stats.frame_tx.immediate_ack += 1;
1025            }
1026
1027            builder.pad_to(probe_size);
1028            let sent_frames = SentFrames {
1029                non_retransmits: true,
1030                ..Default::default()
1031            };
1032            builder.finish_and_track(now, self, Some(sent_frames), buf);
1033
1034            self.stats.path.sent_plpmtud_probes += 1;
1035            num_datagrams = 1;
1036
1037            trace!(?probe_size, "writing MTUD probe");
1038        }
1039
1040        if buf.is_empty() {
1041            return None;
1042        }
1043
1044        trace!("sending {} bytes in {} datagrams", buf.len(), num_datagrams);
1045        self.path.total_sent = self.path.total_sent.saturating_add(buf.len() as u64);
1046
1047        self.stats.udp_tx.on_sent(num_datagrams as u64, buf.len());
1048
1049        Some(Transmit {
1050            destination: self.path.remote,
1051            size: buf.len(),
1052            ecn: if self.path.sending_ecn {
1053                Some(EcnCodepoint::Ect0)
1054            } else {
1055                None
1056            },
1057            segment_size: match num_datagrams {
1058                1 => None,
1059                _ => Some(segment_size),
1060            },
1061            src_ip: self.local_ip,
1062        })
1063    }
1064
1065    /// Send PUNCH_ME_NOW for coordination if necessary
1066    fn send_coordination_request(&mut self, now: Instant, buf: &mut Vec<u8>) -> Option<Transmit> {
1067        // Get coordination info without borrowing mutably
1068        let should_send = self.nat_traversal.as_ref()?.should_send_punch_request();
1069        if !should_send {
1070            return None;
1071        }
1072        
1073        let (round, target_addrs, coordinator_addr) = {
1074            let nat_traversal = self.nat_traversal.as_ref()?;
1075            let coord = nat_traversal.coordination.as_ref()?;
1076            let addrs: Vec<_> = coord.punch_targets.iter().map(|t| t.remote_addr).collect();
1077            (coord.round, addrs, self.path.remote) // Placeholder - should be bootstrap node
1078        };
1079        
1080        if target_addrs.is_empty() {
1081            return None;
1082        }
1083        
1084        debug_assert_eq!(
1085            self.highest_space,
1086            SpaceId::Data,
1087            "PUNCH_ME_NOW queued without 1-RTT keys"
1088        );
1089        
1090        buf.reserve(MIN_INITIAL_SIZE as usize);
1091        let buf_capacity = buf.capacity();
1092        
1093        let mut builder = PacketBuilder::new(
1094            now,
1095            SpaceId::Data,
1096            self.rem_cids.active(),
1097            buf,
1098            buf_capacity,
1099            0,
1100            false,
1101            self,
1102        )?;
1103        
1104        trace!("sending PUNCH_ME_NOW round {} with {} targets", round, target_addrs.len());
1105        
1106        // Write PUNCH_ME_NOW frame
1107        buf.write(frame::FrameType::PUNCH_ME_NOW);
1108        buf.write(round);
1109        buf.write(target_addrs.len() as u8);
1110        for addr in target_addrs {
1111            match addr {
1112                SocketAddr::V4(v4) => {
1113                    buf.write(4u8); // IPv4
1114                    buf.write(u32::from(*v4.ip()));
1115                    buf.write(v4.port());
1116                }
1117                SocketAddr::V6(v6) => {
1118                    buf.write(6u8); // IPv6  
1119                    buf.write(*v6.ip());
1120                    buf.write(v6.port());
1121                }
1122            }
1123        }
1124        
1125        self.stats.frame_tx.ping += 1; // Use ping counter for now
1126        
1127        builder.pad_to(MIN_INITIAL_SIZE);
1128        builder.finish_and_track(now, self, None, buf);
1129        
1130        // Mark request sent after packet is built
1131        if let Some(nat_traversal) = &mut self.nat_traversal {
1132            nat_traversal.mark_punch_request_sent();
1133        }
1134        
1135        Some(Transmit {
1136            destination: coordinator_addr,
1137            size: buf.len(),
1138            ecn: if self.path.sending_ecn {
1139                Some(EcnCodepoint::Ect0)
1140            } else {
1141                None
1142            },
1143            segment_size: None,
1144            src_ip: self.local_ip,
1145        })
1146    }
1147
1148    /// Send coordinated PATH_CHALLENGE for hole punching
1149    fn send_coordinated_path_challenge(&mut self, now: Instant, buf: &mut Vec<u8>) -> Option<Transmit> {
1150        // Check if it's time to start synchronized hole punching
1151        if let Some(nat_traversal) = &mut self.nat_traversal {
1152            if nat_traversal.should_start_punching(now) {
1153                nat_traversal.start_punching_phase(now);
1154            }
1155        }
1156        
1157        // Get punch targets if we're in punching phase
1158        let (target_addr, challenge) = {
1159            let nat_traversal = self.nat_traversal.as_ref()?;
1160            match nat_traversal.get_coordination_phase() {
1161                Some(CoordinationPhase::Punching) => {
1162                    let targets = nat_traversal.get_punch_targets_from_coordination()?;
1163                    if targets.is_empty() {
1164                        return None;
1165                    }
1166                    // Send PATH_CHALLENGE to the first target (could be round-robin in future)
1167                    let target = &targets[0];
1168                    (target.remote_addr, target.challenge)
1169                }
1170                _ => return None,
1171            }
1172        };
1173        
1174        debug_assert_eq!(
1175            self.highest_space,
1176            SpaceId::Data,
1177            "PATH_CHALLENGE queued without 1-RTT keys"
1178        );
1179        
1180        buf.reserve(MIN_INITIAL_SIZE as usize);
1181        let buf_capacity = buf.capacity();
1182        
1183        let mut builder = PacketBuilder::new(
1184            now,
1185            SpaceId::Data,
1186            self.rem_cids.active(),
1187            buf,
1188            buf_capacity,
1189            0,
1190            false,
1191            self,
1192        )?;
1193        
1194        trace!("sending coordinated PATH_CHALLENGE {:08x} to {}", challenge, target_addr);
1195        buf.write(frame::FrameType::PATH_CHALLENGE);
1196        buf.write(challenge);
1197        self.stats.frame_tx.path_challenge += 1;
1198
1199        builder.pad_to(MIN_INITIAL_SIZE);
1200        builder.finish_and_track(now, self, None, buf);
1201        
1202        // Mark coordination as validating after packet is built
1203        if let Some(nat_traversal) = &mut self.nat_traversal {
1204            nat_traversal.mark_coordination_validating();
1205        }
1206        
1207        Some(Transmit {
1208            destination: target_addr,
1209            size: buf.len(),
1210            ecn: if self.path.sending_ecn {
1211                Some(EcnCodepoint::Ect0)
1212            } else {
1213                None
1214            },
1215            segment_size: None,
1216            src_ip: self.local_ip,
1217        })
1218    }
1219
1220    /// Send PATH_CHALLENGE for NAT traversal candidates if necessary
1221    fn send_nat_traversal_challenge(&mut self, now: Instant, buf: &mut Vec<u8>) -> Option<Transmit> {
1222        // Priority 1: Coordination protocol requests
1223        if let Some(request) = self.send_coordination_request(now, buf) {
1224            return Some(request);
1225        }
1226        
1227        // Priority 2: Coordinated hole punching
1228        if let Some(punch) = self.send_coordinated_path_challenge(now, buf) {
1229            return Some(punch);
1230        }
1231        
1232        // Priority 3: Regular candidate validation (fallback)
1233        let (remote_addr, remote_sequence) = {
1234            let nat_traversal = self.nat_traversal.as_ref()?;
1235            let candidates = nat_traversal.get_validation_candidates();
1236            if candidates.is_empty() {
1237                return None;
1238            }
1239            // Get the highest priority candidate
1240            let (sequence, candidate) = candidates[0];
1241            (candidate.address, sequence)
1242        };
1243        
1244        let challenge = self.rng.gen::<u64>();
1245        
1246        // Start validation for this candidate
1247        if let Err(e) = self.nat_traversal.as_mut()?.start_validation(remote_sequence, challenge, now) {
1248            warn!("Failed to start NAT traversal validation: {}", e);
1249            return None;
1250        }
1251        
1252        debug_assert_eq!(
1253            self.highest_space,
1254            SpaceId::Data,
1255            "PATH_CHALLENGE queued without 1-RTT keys"
1256        );
1257        
1258        buf.reserve(MIN_INITIAL_SIZE as usize);
1259        let buf_capacity = buf.capacity();
1260        
1261        // Use current connection ID for NAT traversal PATH_CHALLENGE
1262        let mut builder = PacketBuilder::new(
1263            now,
1264            SpaceId::Data,
1265            self.rem_cids.active(),
1266            buf,
1267            buf_capacity,
1268            0,
1269            false,
1270            self,
1271        )?;
1272        
1273        trace!("sending PATH_CHALLENGE {:08x} to NAT candidate {}", challenge, remote_addr);
1274        buf.write(frame::FrameType::PATH_CHALLENGE);
1275        buf.write(challenge);
1276        self.stats.frame_tx.path_challenge += 1;
1277
1278        // PATH_CHALLENGE frames must be padded to at least 1200 bytes
1279        builder.pad_to(MIN_INITIAL_SIZE);
1280        
1281        builder.finish_and_track(now, self, None, buf);
1282        
1283        Some(Transmit {
1284            destination: remote_addr,
1285            size: buf.len(),
1286            ecn: if self.path.sending_ecn {
1287                Some(EcnCodepoint::Ect0)
1288            } else {
1289                None
1290            },
1291            segment_size: None,
1292            src_ip: self.local_ip,
1293        })
1294    }
1295
1296    /// Send PATH_CHALLENGE for a previous path if necessary
1297    fn send_path_challenge(&mut self, now: Instant, buf: &mut Vec<u8>) -> Option<Transmit> {
1298        let (prev_cid, prev_path) = self.prev_path.as_mut()?;
1299        if !prev_path.challenge_pending {
1300            return None;
1301        }
1302        prev_path.challenge_pending = false;
1303        let token = prev_path
1304            .challenge
1305            .expect("previous path challenge pending without token");
1306        let destination = prev_path.remote;
1307        debug_assert_eq!(
1308            self.highest_space,
1309            SpaceId::Data,
1310            "PATH_CHALLENGE queued without 1-RTT keys"
1311        );
1312        buf.reserve(MIN_INITIAL_SIZE as usize);
1313
1314        let buf_capacity = buf.capacity();
1315
1316        // Use the previous CID to avoid linking the new path with the previous path. We
1317        // don't bother accounting for possible retirement of that prev_cid because this is
1318        // sent once, immediately after migration, when the CID is known to be valid. Even
1319        // if a post-migration packet caused the CID to be retired, it's fair to pretend
1320        // this is sent first.
1321        let mut builder = PacketBuilder::new(
1322            now,
1323            SpaceId::Data,
1324            *prev_cid,
1325            buf,
1326            buf_capacity,
1327            0,
1328            false,
1329            self,
1330        )?;
1331        trace!("validating previous path with PATH_CHALLENGE {:08x}", token);
1332        buf.write(frame::FrameType::PATH_CHALLENGE);
1333        buf.write(token);
1334        self.stats.frame_tx.path_challenge += 1;
1335
1336        // An endpoint MUST expand datagrams that contain a PATH_CHALLENGE frame
1337        // to at least the smallest allowed maximum datagram size of 1200 bytes,
1338        // unless the anti-amplification limit for the path does not permit
1339        // sending a datagram of this size
1340        builder.pad_to(MIN_INITIAL_SIZE);
1341
1342        builder.finish(self, buf);
1343        self.stats.udp_tx.on_sent(1, buf.len());
1344
1345        Some(Transmit {
1346            destination,
1347            size: buf.len(),
1348            ecn: None,
1349            segment_size: None,
1350            src_ip: self.local_ip,
1351        })
1352    }
1353
1354    /// Indicate what types of frames are ready to send for the given space
1355    fn space_can_send(&self, space_id: SpaceId, frame_space_1rtt: usize) -> SendableFrames {
1356        if self.spaces[space_id].crypto.is_none()
1357            && (space_id != SpaceId::Data
1358                || self.zero_rtt_crypto.is_none()
1359                || self.side.is_server())
1360        {
1361            // No keys available for this space
1362            return SendableFrames::empty();
1363        }
1364        let mut can_send = self.spaces[space_id].can_send(&self.streams);
1365        if space_id == SpaceId::Data {
1366            can_send.other |= self.can_send_1rtt(frame_space_1rtt);
1367        }
1368        can_send
1369    }
1370
1371    /// Process `ConnectionEvent`s generated by the associated `Endpoint`
1372    ///
1373    /// Will execute protocol logic upon receipt of a connection event, in turn preparing signals
1374    /// (including application `Event`s, `EndpointEvent`s and outgoing datagrams) that should be
1375    /// extracted through the relevant methods.
1376    pub fn handle_event(&mut self, event: ConnectionEvent) {
1377        use ConnectionEventInner::*;
1378        match event.0 {
1379            Datagram(DatagramConnectionEvent {
1380                now,
1381                remote,
1382                ecn,
1383                first_decode,
1384                remaining,
1385            }) => {
1386                // If this packet could initiate a migration and we're a client or a server that
1387                // forbids migration, drop the datagram. This could be relaxed to heuristically
1388                // permit NAT-rebinding-like migration.
1389                if remote != self.path.remote && !self.side.remote_may_migrate() {
1390                    trace!("discarding packet from unrecognized peer {}", remote);
1391                    return;
1392                }
1393
1394                let was_anti_amplification_blocked = self.path.anti_amplification_blocked(1);
1395
1396                self.stats.udp_rx.datagrams += 1;
1397                self.stats.udp_rx.bytes += first_decode.len() as u64;
1398                let data_len = first_decode.len();
1399
1400                self.handle_decode(now, remote, ecn, first_decode);
1401                // The current `path` might have changed inside `handle_decode`,
1402                // since the packet could have triggered a migration. Make sure
1403                // the data received is accounted for the most recent path by accessing
1404                // `path` after `handle_decode`.
1405                self.path.total_recvd = self.path.total_recvd.saturating_add(data_len as u64);
1406
1407                if let Some(data) = remaining {
1408                    self.stats.udp_rx.bytes += data.len() as u64;
1409                    self.handle_coalesced(now, remote, ecn, data);
1410                }
1411
1412                #[cfg(feature = "__qlog")]
1413                self.emit_qlog_recovery_metrics(now);
1414
1415                if was_anti_amplification_blocked {
1416                    // A prior attempt to set the loss detection timer may have failed due to
1417                    // anti-amplification, so ensure it's set now. Prevents a handshake deadlock if
1418                    // the server's first flight is lost.
1419                    self.set_loss_detection_timer(now);
1420                }
1421            }
1422            NewIdentifiers(ids, now) => {
1423                self.local_cid_state.new_cids(&ids, now);
1424                ids.into_iter().rev().for_each(|frame| {
1425                    self.spaces[SpaceId::Data].pending.new_cids.push(frame);
1426                });
1427                // Update Timer::PushNewCid
1428                if self
1429                    .timers
1430                    .get(Timer::PushNewCid)
1431                    .map_or(true, |x| x <= now)
1432                {
1433                    self.reset_cid_retirement();
1434                }
1435            }
1436        }
1437    }
1438
1439    /// Process timer expirations
1440    ///
1441    /// Executes protocol logic, potentially preparing signals (including application `Event`s,
1442    /// `EndpointEvent`s and outgoing datagrams) that should be extracted through the relevant
1443    /// methods.
1444    ///
1445    /// It is most efficient to call this immediately after the system clock reaches the latest
1446    /// `Instant` that was output by `poll_timeout`; however spurious extra calls will simply
1447    /// no-op and therefore are safe.
1448    pub fn handle_timeout(&mut self, now: Instant) {
1449        for &timer in &Timer::VALUES {
1450            if !self.timers.is_expired(timer, now) {
1451                continue;
1452            }
1453            self.timers.stop(timer);
1454            trace!(timer = ?timer, "timeout");
1455            match timer {
1456                Timer::Close => {
1457                    self.state = State::Drained;
1458                    self.endpoint_events.push_back(EndpointEventInner::Drained);
1459                }
1460                Timer::Idle => {
1461                    self.kill(ConnectionError::TimedOut);
1462                }
1463                Timer::KeepAlive => {
1464                    trace!("sending keep-alive");
1465                    self.ping();
1466                }
1467                Timer::LossDetection => {
1468                    self.on_loss_detection_timeout(now);
1469
1470                    #[cfg(feature = "__qlog")]
1471                    self.emit_qlog_recovery_metrics(now);
1472                }
1473                Timer::KeyDiscard => {
1474                    self.zero_rtt_crypto = None;
1475                    self.prev_crypto = None;
1476                }
1477                Timer::PathValidation => {
1478                    debug!("path validation failed");
1479                    if let Some((_, prev)) = self.prev_path.take() {
1480                        self.path = prev;
1481                    }
1482                    self.path.challenge = None;
1483                    self.path.challenge_pending = false;
1484                }
1485                Timer::Pacing => trace!("pacing timer expired"),
1486                Timer::NatTraversal => {
1487                    self.handle_nat_traversal_timeout(now);
1488                }
1489                Timer::PushNewCid => {
1490                    // Update `retire_prior_to` field in NEW_CONNECTION_ID frame
1491                    let num_new_cid = self.local_cid_state.on_cid_timeout().into();
1492                    if !self.state.is_closed() {
1493                        trace!(
1494                            "push a new cid to peer RETIRE_PRIOR_TO field {}",
1495                            self.local_cid_state.retire_prior_to()
1496                        );
1497                        self.endpoint_events
1498                            .push_back(EndpointEventInner::NeedIdentifiers(now, num_new_cid));
1499                    }
1500                }
1501                Timer::MaxAckDelay => {
1502                    trace!("max ack delay reached");
1503                    // This timer is only armed in the Data space
1504                    self.spaces[SpaceId::Data]
1505                        .pending_acks
1506                        .on_max_ack_delay_timeout()
1507                }
1508            }
1509        }
1510    }
1511
1512    /// Close a connection immediately
1513    ///
1514    /// This does not ensure delivery of outstanding data. It is the application's responsibility to
1515    /// call this only when all important communications have been completed, e.g. by calling
1516    /// [`SendStream::finish`] on outstanding streams and waiting for the corresponding
1517    /// [`StreamEvent::Finished`] event.
1518    ///
1519    /// If [`Streams::send_streams`] returns 0, all outstanding stream data has been
1520    /// delivered. There may still be data from the peer that has not been received.
1521    ///
1522    /// [`StreamEvent::Finished`]: crate::StreamEvent::Finished
1523    pub fn close(&mut self, now: Instant, error_code: VarInt, reason: Bytes) {
1524        self.close_inner(
1525            now,
1526            Close::Application(frame::ApplicationClose { error_code, reason }),
1527        )
1528    }
1529
1530    fn close_inner(&mut self, now: Instant, reason: Close) {
1531        let was_closed = self.state.is_closed();
1532        if !was_closed {
1533            self.close_common();
1534            self.set_close_timer(now);
1535            self.close = true;
1536            self.state = State::Closed(state::Closed { reason });
1537        }
1538    }
1539
    /// Control datagrams
    ///
    /// Returns a [`Datagrams`] handle that mutably borrows this connection for as long as it is
    /// alive.
    pub fn datagrams(&mut self) -> Datagrams<'_> {
        Datagrams { conn: self }
    }
1544
1545    /// Returns connection statistics
1546    pub fn stats(&self) -> ConnectionStats {
1547        let mut stats = self.stats;
1548        stats.path.rtt = self.path.rtt.get();
1549        stats.path.cwnd = self.path.congestion.window();
1550        stats.path.current_mtu = self.path.mtud.current_mtu();
1551
1552        stats
1553    }
1554
1555    /// Ping the remote endpoint
1556    ///
1557    /// Causes an ACK-eliciting packet to be transmitted.
1558    pub fn ping(&mut self) {
1559        self.spaces[self.highest_space].ping_pending = true;
1560    }
1561
1562    /// Update traffic keys spontaneously
1563    ///
1564    /// This can be useful for testing key updates, as they otherwise only happen infrequently.
1565    pub fn force_key_update(&mut self) {
1566        if !self.state.is_established() {
1567            debug!("ignoring forced key update in illegal state");
1568            return;
1569        }
1570        if self.prev_crypto.is_some() {
1571            // We already just updated, or are currently updating, the keys. Concurrent key updates
1572            // are illegal.
1573            debug!("ignoring redundant forced key update");
1574            return;
1575        }
1576        self.update_keys(None, false);
1577    }
1578
    // Compatibility wrapper for quinn < 0.11.7. Remove for 0.12.
    //
    // Deprecated and hidden from the docs; simply forwards to `force_key_update`.
    #[doc(hidden)]
    #[deprecated]
    pub fn initiate_key_update(&mut self) {
        self.force_key_update();
    }
1585
    /// Get a session reference
    ///
    /// Borrows the connection's cryptographic session as a [`crypto::Session`] trait object.
    pub fn crypto_session(&self) -> &dyn crypto::Session {
        &*self.crypto
    }
1590
    /// Whether the connection is in the process of being established
    ///
    /// If this returns `false`, the connection may be either established or closed, signaled by the
    /// emission of a `Connected` or `ConnectionLost` message respectively.
    ///
    /// Reflects the connection state's `is_handshake` predicate.
    pub fn is_handshaking(&self) -> bool {
        self.state.is_handshake()
    }
1598
    /// Whether the connection is closed
    ///
    /// Closed connections cannot transport any further data. A connection becomes closed when
    /// either peer application intentionally closes it, or when either transport layer detects an
    /// error such as a time-out or certificate validation failure.
    ///
    /// A `ConnectionLost` event is emitted with details when the connection becomes closed.
    ///
    /// Reflects the connection state's `is_closed` predicate.
    pub fn is_closed(&self) -> bool {
        self.state.is_closed()
    }
1609
    /// Whether there is no longer any need to keep the connection around
    ///
    /// Closed connections become drained after a brief timeout to absorb any remaining in-flight
    /// packets from the peer. All drained connections have been closed.
    ///
    /// Reflects the connection state's `is_drained` predicate.
    pub fn is_drained(&self) -> bool {
        self.state.is_drained()
    }
1617
    /// For clients, if the peer accepted the 0-RTT data packets
    ///
    /// The value is meaningless until after the handshake completes.
    ///
    /// Returns the stored `accepted_0rtt` flag as-is.
    pub fn accepted_0rtt(&self) -> bool {
        self.accepted_0rtt
    }
1624
    /// Whether 0-RTT is/was possible during the handshake
    ///
    /// Returns the stored `zero_rtt_enabled` flag as-is.
    pub fn has_0rtt(&self) -> bool {
        self.zero_rtt_enabled
    }
1629
1630    /// Whether there are any pending retransmits
1631    pub fn has_pending_retransmits(&self) -> bool {
1632        !self.spaces[SpaceId::Data].pending.is_empty(&self.streams)
1633    }
1634
    /// Look up whether we're the client or server of this Connection
    ///
    /// Derived from the connection's stored side state.
    pub fn side(&self) -> Side {
        self.side.side()
    }
1639
    /// The latest socket address for this connection's peer
    ///
    /// This is the `remote` address of the connection's current path.
    pub fn remote_address(&self) -> SocketAddr {
        self.path.remote
    }
1644
    /// The local IP address which was used when the peer established
    /// the connection
    ///
    /// This can be different from the address the endpoint is bound to, in case
    /// the endpoint is bound to a wildcard address like `0.0.0.0` or `::`.
    ///
    /// This will return `None` for clients, or when no `local_ip` was passed to
    /// [`Endpoint::handle()`](crate::Endpoint::handle) for the datagrams establishing this
    /// connection.
    ///
    /// The stored value is returned unchanged.
    pub fn local_ip(&self) -> Option<IpAddr> {
        self.local_ip
    }
1657
    /// Current best estimate of this connection's latency (round-trip-time)
    ///
    /// Read from the RTT estimator of the connection's current path.
    pub fn rtt(&self) -> Duration {
        self.path.rtt.get()
    }
1662
    /// Current state of this connection's congestion controller, for debugging purposes
    ///
    /// Borrows the controller owned by the connection's current path.
    pub fn congestion_state(&self) -> &dyn Controller {
        self.path.congestion.as_ref()
    }
1667
    /// Resets path-specific settings.
    ///
    /// This will force-reset several subsystems related to a specific network path.
    /// Currently this is the congestion controller, round-trip estimator, and the MTU
    /// discovery.
    ///
    /// This is useful when it is known the underlying network path has changed and the old
    /// state of these subsystems is no longer valid or optimal. In this case it might be
    /// faster or reduce loss to settle on optimal values by restarting from the initial
    /// configuration in the [`TransportConfig`].
    ///
    /// `now` is forwarded, together with the connection's configuration, to the current path's
    /// `reset`.
    pub fn path_changed(&mut self, now: Instant) {
        self.path.reset(now, &self.config);
    }
1681
1682    /// Modify the number of remotely initiated streams that may be concurrently open
1683    ///
1684    /// No streams may be opened by the peer unless fewer than `count` are already open. Large
1685    /// `count`s increase both minimum and worst-case memory consumption.
1686    pub fn set_max_concurrent_streams(&mut self, dir: Dir, count: VarInt) {
1687        self.streams.set_max_concurrent(dir, count);
1688        // If the limit was reduced, then a flow control update previously deemed insignificant may
1689        // now be significant.
1690        let pending = &mut self.spaces[SpaceId::Data].pending;
1691        self.streams.queue_max_stream_id(pending);
1692    }
1693
    /// Current number of remotely initiated streams that may be concurrently open
    ///
    /// If the target for this limit is reduced using [`set_max_concurrent_streams`](Self::set_max_concurrent_streams),
    /// it will not change immediately, even if fewer streams are open. Instead, it will
    /// decrement by one for each time a remotely initiated stream of matching directionality is closed.
    ///
    /// The value is reported by the stream state for the given `dir`.
    pub fn max_concurrent_streams(&self, dir: Dir) -> u64 {
        self.streams.max_concurrent(dir)
    }
1702
1703    /// See [`TransportConfig::receive_window()`]
1704    pub fn set_receive_window(&mut self, receive_window: VarInt) {
1705        if self.streams.set_receive_window(receive_window) {
1706            self.spaces[SpaceId::Data].pending.max_data = true;
1707        }
1708    }
1709
    /// Process an ACK frame received in packet number space `space`.
    ///
    /// Fails with a `PROTOCOL_VIOLATION` transport error if the frame acknowledges a packet
    /// number that has not been sent yet, and propagates any error reported by the packet number
    /// filter's `check_ack`.
    ///
    /// Otherwise, newly acknowledged packets are removed from the sent set and reported to MTU
    /// discovery, ACK-frequency tracking and the congestion controller. The RTT estimate is
    /// refreshed when the frame advances the largest acknowledged packet and covers at least one
    /// ack-eliciting packet. Finally, lost packets are detected, ECN feedback is processed and the
    /// loss detection timer is re-armed.
    fn on_ack_received(
        &mut self,
        now: Instant,
        space: SpaceId,
        ack: frame::Ack,
    ) -> Result<(), TransportError> {
        if ack.largest >= self.spaces[space].next_packet_number {
            return Err(TransportError::PROTOCOL_VIOLATION("unsent packet acked"));
        }
        // Track whether this frame advances the largest acknowledged packet number; RTT sampling
        // and ECN processing below are restricted to that case.
        let new_largest = {
            let space = &mut self.spaces[space];
            if space
                .largest_acked_packet
                .map_or(true, |pn| ack.largest > pn)
            {
                space.largest_acked_packet = Some(ack.largest);
                if let Some(info) = space.sent_packets.get(&ack.largest) {
                    // This should always succeed, but a misbehaving peer might ACK a packet we
                    // haven't sent. At worst, that will result in us spuriously reducing the
                    // congestion window.
                    space.largest_acked_packet_sent = info.time_sent;
                }
                true
            } else {
                false
            }
        };

        // Avoid DoS from unreasonably huge ack ranges by filtering out just the new acks.
        let mut newly_acked = ArrayRangeSet::new();
        for range in ack.iter() {
            self.packet_number_filter.check_ack(space, range.clone())?;
            for (&pn, _) in self.spaces[space].sent_packets.range(range) {
                newly_acked.insert_one(pn);
            }
        }

        // Nothing new was acknowledged, so the remaining processing is skipped.
        if newly_acked.is_empty() {
            return Ok(());
        }

        let mut ack_eliciting_acked = false;
        for packet in newly_acked.elts() {
            if let Some(info) = self.spaces[space].take(packet) {
                if let Some(acked) = info.largest_acked {
                    // Assume ACKs for all packets below the largest acknowledged in `packet` have
                    // been received. This can cause the peer to spuriously retransmit if some of
                    // our earlier ACKs were lost, but allows for simpler state tracking. See
                    // discussion at
                    // https://www.rfc-editor.org/rfc/rfc9000.html#name-limiting-ranges-by-tracking
                    self.spaces[space].pending_acks.subtract_below(acked);
                }
                ack_eliciting_acked |= info.ack_eliciting;

                // Notify MTU discovery that a packet was acked, because it might be an MTU probe
                let mtu_updated = self.path.mtud.on_acked(space, packet, info.size);
                if mtu_updated {
                    self.path
                        .congestion
                        .on_mtu_update(self.path.mtud.current_mtu());
                }

                // Notify ack frequency that a packet was acked, because it might contain an ACK_FREQUENCY frame
                self.ack_frequency.on_acked(packet);

                self.on_packet_acked(now, packet, info);
            }
        }

        // Tell the congestion controller that this batch of acknowledgements is complete.
        self.path.congestion.on_end_acks(
            now,
            self.path.in_flight.bytes,
            self.app_limited,
            self.spaces[space].largest_acked_packet,
        );

        if new_largest && ack_eliciting_acked {
            // The peer-reported ACK delay is only honoured in the Data space, where it is scaled
            // by the peer's `ack_delay_exponent` and capped at the peer's maximum ACK delay.
            let ack_delay = if space != SpaceId::Data {
                Duration::from_micros(0)
            } else {
                cmp::min(
                    self.ack_frequency.peer_max_ack_delay,
                    Duration::from_micros(ack.delay << self.peer_params.ack_delay_exponent.0),
                )
            };
            let rtt = instant_saturating_sub(now, self.spaces[space].largest_acked_packet_sent);
            self.path.rtt.update(ack_delay, rtt);
            // Remember the first packet sent after the initial RTT sample; loss detection consults
            // it when deciding where persistent congestion may start.
            if self.path.first_packet_after_rtt_sample.is_none() {
                self.path.first_packet_after_rtt_sample =
                    Some((space, self.spaces[space].next_packet_number));
            }
        }

        // Must be called before crypto/pto_count are clobbered
        self.detect_lost_packets(now, space, true);

        if self.peer_completed_address_validation() {
            self.pto_count = 0;
        }

        // Explicit congestion notification
        if self.path.sending_ecn {
            if let Some(ecn) = ack.ecn {
                // We only examine ECN counters from ACKs that we are certain we received in transmit
                // order, allowing us to compute an increase in ECN counts to compare against the number
                // of newly acked packets that remains well-defined in the presence of arbitrary packet
                // reordering.
                if new_largest {
                    let sent = self.spaces[space].largest_acked_packet_sent;
                    self.process_ecn(now, space, newly_acked.len() as u64, ecn, sent);
                }
            } else {
                // We always start out sending ECN, so any ack that doesn't acknowledge it disables it.
                debug!("ECN not acknowledged by peer");
                self.path.sending_ecn = false;
            }
        }

        self.set_loss_detection_timer(now);
        Ok(())
    }
1831
1832    /// Process a new ECN block from an in-order ACK
1833    fn process_ecn(
1834        &mut self,
1835        now: Instant,
1836        space: SpaceId,
1837        newly_acked: u64,
1838        ecn: frame::EcnCounts,
1839        largest_sent_time: Instant,
1840    ) {
1841        match self.spaces[space].detect_ecn(newly_acked, ecn) {
1842            Err(e) => {
1843                debug!("halting ECN due to verification failure: {}", e);
1844                self.path.sending_ecn = false;
1845                // Wipe out the existing value because it might be garbage and could interfere with
1846                // future attempts to use ECN on new paths.
1847                self.spaces[space].ecn_feedback = frame::EcnCounts::ZERO;
1848            }
1849            Ok(false) => {}
1850            Ok(true) => {
1851                self.stats.path.congestion_events += 1;
1852                self.path
1853                    .congestion
1854                    .on_congestion_event(now, largest_sent_time, false, 0);
1855            }
1856        }
1857    }
1858
1859    // Not timing-aware, so it's safe to call this for inferred acks, such as arise from
1860    // high-latency handshakes
1861    fn on_packet_acked(&mut self, now: Instant, pn: u64, info: SentPacket) {
1862        self.remove_in_flight(pn, &info);
1863        if info.ack_eliciting && self.path.challenge.is_none() {
1864            // Only pass ACKs to the congestion controller if we are not validating the current
1865            // path, so as to ignore any ACKs from older paths still coming in.
1866            self.path.congestion.on_ack(
1867                now,
1868                info.time_sent,
1869                info.size.into(),
1870                self.app_limited,
1871                &self.path.rtt,
1872            );
1873        }
1874
1875        // Update state for confirmed delivery of frames
1876        if let Some(retransmits) = info.retransmits.get() {
1877            for (id, _) in retransmits.reset_stream.iter() {
1878                self.streams.reset_acked(*id);
1879            }
1880        }
1881
1882        for frame in info.stream_frames {
1883            self.streams.received_ack_of(frame);
1884        }
1885    }
1886
1887    fn set_key_discard_timer(&mut self, now: Instant, space: SpaceId) {
1888        let start = if self.zero_rtt_crypto.is_some() {
1889            now
1890        } else {
1891            self.prev_crypto
1892                .as_ref()
1893                .expect("no previous keys")
1894                .end_packet
1895                .as_ref()
1896                .expect("update not acknowledged yet")
1897                .1
1898        };
1899        self.timers
1900            .set(Timer::KeyDiscard, start + self.pto(space) * 3);
1901    }
1902
1903    fn on_loss_detection_timeout(&mut self, now: Instant) {
1904        if let Some((_, pn_space)) = self.loss_time_and_space() {
1905            // Time threshold loss Detection
1906            self.detect_lost_packets(now, pn_space, false);
1907            self.set_loss_detection_timer(now);
1908            return;
1909        }
1910
1911        let (_, space) = match self.pto_time_and_space(now) {
1912            Some(x) => x,
1913            None => {
1914                error!("PTO expired while unset");
1915                return;
1916            }
1917        };
1918        trace!(
1919            in_flight = self.path.in_flight.bytes,
1920            count = self.pto_count,
1921            ?space,
1922            "PTO fired"
1923        );
1924
1925        let count = match self.path.in_flight.ack_eliciting {
1926            // A PTO when we're not expecting any ACKs must be due to handshake anti-amplification
1927            // deadlock preventions
1928            0 => {
1929                debug_assert!(!self.peer_completed_address_validation());
1930                1
1931            }
1932            // Conventional loss probe
1933            _ => 2,
1934        };
1935        self.spaces[space].loss_probes = self.spaces[space].loss_probes.saturating_add(count);
1936        self.pto_count = self.pto_count.saturating_add(1);
1937        self.set_loss_detection_timer(now);
1938    }
1939
1940    fn detect_lost_packets(&mut self, now: Instant, pn_space: SpaceId, due_to_ack: bool) {
1941        let mut lost_packets = Vec::<u64>::new();
1942        let mut lost_mtu_probe = None;
1943        let in_flight_mtu_probe = self.path.mtud.in_flight_mtu_probe();
1944        let rtt = self.path.rtt.conservative();
1945        let loss_delay = cmp::max(rtt.mul_f32(self.config.time_threshold), TIMER_GRANULARITY);
1946
1947        // Packets sent before this time are deemed lost.
1948        let lost_send_time = now.checked_sub(loss_delay).unwrap();
1949        let largest_acked_packet = self.spaces[pn_space].largest_acked_packet.unwrap();
1950        let packet_threshold = self.config.packet_threshold as u64;
1951        let mut size_of_lost_packets = 0u64;
1952
1953        // InPersistentCongestion: Determine if all packets in the time period before the newest
1954        // lost packet, including the edges, are marked lost. PTO computation must always
1955        // include max ACK delay, i.e. operate as if in Data space (see RFC9001 §7.6.1).
1956        let congestion_period =
1957            self.pto(SpaceId::Data) * self.config.persistent_congestion_threshold;
1958        let mut persistent_congestion_start: Option<Instant> = None;
1959        let mut prev_packet = None;
1960        let mut in_persistent_congestion = false;
1961
1962        let space = &mut self.spaces[pn_space];
1963        space.loss_time = None;
1964
1965        for (&packet, info) in space.sent_packets.range(0..largest_acked_packet) {
1966            if prev_packet != Some(packet.wrapping_sub(1)) {
1967                // An intervening packet was acknowledged
1968                persistent_congestion_start = None;
1969            }
1970
1971            if info.time_sent <= lost_send_time || largest_acked_packet >= packet + packet_threshold
1972            {
1973                if Some(packet) == in_flight_mtu_probe {
1974                    // Lost MTU probes are not included in `lost_packets`, because they should not
1975                    // trigger a congestion control response
1976                    lost_mtu_probe = in_flight_mtu_probe;
1977                } else {
1978                    lost_packets.push(packet);
1979                    size_of_lost_packets += info.size as u64;
1980                    if info.ack_eliciting && due_to_ack {
1981                        match persistent_congestion_start {
1982                            // Two ACK-eliciting packets lost more than congestion_period apart, with no
1983                            // ACKed packets in between
1984                            Some(start) if info.time_sent - start > congestion_period => {
1985                                in_persistent_congestion = true;
1986                            }
1987                            // Persistent congestion must start after the first RTT sample
1988                            None if self
1989                                .path
1990                                .first_packet_after_rtt_sample
1991                                .is_some_and(|x| x < (pn_space, packet)) =>
1992                            {
1993                                persistent_congestion_start = Some(info.time_sent);
1994                            }
1995                            _ => {}
1996                        }
1997                    }
1998                }
1999            } else {
2000                let next_loss_time = info.time_sent + loss_delay;
2001                space.loss_time = Some(
2002                    space
2003                        .loss_time
2004                        .map_or(next_loss_time, |x| cmp::min(x, next_loss_time)),
2005                );
2006                persistent_congestion_start = None;
2007            }
2008
2009            prev_packet = Some(packet);
2010        }
2011
2012        // OnPacketsLost
2013        if let Some(largest_lost) = lost_packets.last().cloned() {
2014            let old_bytes_in_flight = self.path.in_flight.bytes;
2015            let largest_lost_sent = self.spaces[pn_space].sent_packets[&largest_lost].time_sent;
2016            self.lost_packets += lost_packets.len() as u64;
2017            self.stats.path.lost_packets += lost_packets.len() as u64;
2018            self.stats.path.lost_bytes += size_of_lost_packets;
2019            trace!(
2020                "packets lost: {:?}, bytes lost: {}",
2021                lost_packets, size_of_lost_packets
2022            );
2023
2024            for &packet in &lost_packets {
2025                let info = self.spaces[pn_space].take(packet).unwrap(); // safe: lost_packets is populated just above
2026                self.remove_in_flight(packet, &info);
2027                for frame in info.stream_frames {
2028                    self.streams.retransmit(frame);
2029                }
2030                self.spaces[pn_space].pending |= info.retransmits;
2031                self.path.mtud.on_non_probe_lost(packet, info.size);
2032            }
2033
2034            if self.path.mtud.black_hole_detected(now) {
2035                self.stats.path.black_holes_detected += 1;
2036                self.path
2037                    .congestion
2038                    .on_mtu_update(self.path.mtud.current_mtu());
2039                if let Some(max_datagram_size) = self.datagrams().max_size() {
2040                    self.datagrams.drop_oversized(max_datagram_size);
2041                }
2042            }
2043
2044            // Don't apply congestion penalty for lost ack-only packets
2045            let lost_ack_eliciting = old_bytes_in_flight != self.path.in_flight.bytes;
2046
2047            if lost_ack_eliciting {
2048                self.stats.path.congestion_events += 1;
2049                self.path.congestion.on_congestion_event(
2050                    now,
2051                    largest_lost_sent,
2052                    in_persistent_congestion,
2053                    size_of_lost_packets,
2054                );
2055            }
2056        }
2057
2058        // Handle a lost MTU probe
2059        if let Some(packet) = lost_mtu_probe {
2060            let info = self.spaces[SpaceId::Data].take(packet).unwrap(); // safe: lost_mtu_probe is omitted from lost_packets, and therefore must not have been removed yet
2061            self.remove_in_flight(packet, &info);
2062            self.path.mtud.on_probe_lost();
2063            self.stats.path.lost_plpmtud_probes += 1;
2064        }
2065    }
2066
2067    fn loss_time_and_space(&self) -> Option<(Instant, SpaceId)> {
2068        SpaceId::iter()
2069            .filter_map(|id| Some((self.spaces[id].loss_time?, id)))
2070            .min_by_key(|&(time, _)| time)
2071    }
2072
2073    fn pto_time_and_space(&self, now: Instant) -> Option<(Instant, SpaceId)> {
2074        let backoff = 2u32.pow(self.pto_count.min(MAX_BACKOFF_EXPONENT));
2075        let mut duration = self.path.rtt.pto_base() * backoff;
2076
2077        if self.path.in_flight.ack_eliciting == 0 {
2078            debug_assert!(!self.peer_completed_address_validation());
2079            let space = match self.highest_space {
2080                SpaceId::Handshake => SpaceId::Handshake,
2081                _ => SpaceId::Initial,
2082            };
2083            return Some((now + duration, space));
2084        }
2085
2086        let mut result = None;
2087        for space in SpaceId::iter() {
2088            if self.spaces[space].in_flight == 0 {
2089                continue;
2090            }
2091            if space == SpaceId::Data {
2092                // Skip ApplicationData until handshake completes.
2093                if self.is_handshaking() {
2094                    return result;
2095                }
2096                // Include max_ack_delay and backoff for ApplicationData.
2097                duration += self.ack_frequency.max_ack_delay_for_pto() * backoff;
2098            }
2099            let last_ack_eliciting = match self.spaces[space].time_of_last_ack_eliciting_packet {
2100                Some(time) => time,
2101                None => continue,
2102            };
2103            let pto = last_ack_eliciting + duration;
2104            if result.map_or(true, |(earliest_pto, _)| pto < earliest_pto) {
2105                result = Some((pto, space));
2106            }
2107        }
2108        result
2109    }
2110
2111    fn peer_completed_address_validation(&self) -> bool {
2112        if self.side.is_server() || self.state.is_closed() {
2113            return true;
2114        }
2115        // The server is guaranteed to have validated our address if any of our handshake or 1-RTT
2116        // packets are acknowledged or we've seen HANDSHAKE_DONE and discarded handshake keys.
2117        self.spaces[SpaceId::Handshake]
2118            .largest_acked_packet
2119            .is_some()
2120            || self.spaces[SpaceId::Data].largest_acked_packet.is_some()
2121            || (self.spaces[SpaceId::Data].crypto.is_some()
2122                && self.spaces[SpaceId::Handshake].crypto.is_none())
2123    }
2124
2125    fn set_loss_detection_timer(&mut self, now: Instant) {
2126        if self.state.is_closed() {
2127            // No loss detection takes place on closed connections, and `close_common` already
2128            // stopped time timer. Ensure we don't restart it inadvertently, e.g. in response to a
2129            // reordered packet being handled by state-insensitive code.
2130            return;
2131        }
2132
2133        if let Some((loss_time, _)) = self.loss_time_and_space() {
2134            // Time threshold loss detection.
2135            self.timers.set(Timer::LossDetection, loss_time);
2136            return;
2137        }
2138
2139        if self.path.anti_amplification_blocked(1) {
2140            // We wouldn't be able to send anything, so don't bother.
2141            self.timers.stop(Timer::LossDetection);
2142            return;
2143        }
2144
2145        if self.path.in_flight.ack_eliciting == 0 && self.peer_completed_address_validation() {
2146            // There is nothing to detect lost, so no timer is set. However, the client needs to arm
2147            // the timer if the server might be blocked by the anti-amplification limit.
2148            self.timers.stop(Timer::LossDetection);
2149            return;
2150        }
2151
2152        // Determine which PN space to arm PTO for.
2153        // Calculate PTO duration
2154        if let Some((timeout, _)) = self.pto_time_and_space(now) {
2155            self.timers.set(Timer::LossDetection, timeout);
2156        } else {
2157            self.timers.stop(Timer::LossDetection);
2158        }
2159    }
2160
2161    /// Probe Timeout
2162    fn pto(&self, space: SpaceId) -> Duration {
2163        let max_ack_delay = match space {
2164            SpaceId::Initial | SpaceId::Handshake => Duration::ZERO,
2165            SpaceId::Data => self.ack_frequency.max_ack_delay_for_pto(),
2166        };
2167        self.path.rtt.pto_base() + max_ack_delay
2168    }
2169
2170    fn on_packet_authenticated(
2171        &mut self,
2172        now: Instant,
2173        space_id: SpaceId,
2174        ecn: Option<EcnCodepoint>,
2175        packet: Option<u64>,
2176        spin: bool,
2177        is_1rtt: bool,
2178    ) {
2179        self.total_authed_packets += 1;
2180        self.reset_keep_alive(now);
2181        self.reset_idle_timeout(now, space_id);
2182        self.permit_idle_reset = true;
2183        self.receiving_ecn |= ecn.is_some();
2184        if let Some(x) = ecn {
2185            let space = &mut self.spaces[space_id];
2186            space.ecn_counters += x;
2187
2188            if x.is_ce() {
2189                space.pending_acks.set_immediate_ack_required();
2190            }
2191        }
2192
2193        let packet = match packet {
2194            Some(x) => x,
2195            None => return,
2196        };
2197        if self.side.is_server() {
2198            if self.spaces[SpaceId::Initial].crypto.is_some() && space_id == SpaceId::Handshake {
2199                // A server stops sending and processing Initial packets when it receives its first Handshake packet.
2200                self.discard_space(now, SpaceId::Initial);
2201            }
2202            if self.zero_rtt_crypto.is_some() && is_1rtt {
2203                // Discard 0-RTT keys soon after receiving a 1-RTT packet
2204                self.set_key_discard_timer(now, space_id)
2205            }
2206        }
2207        let space = &mut self.spaces[space_id];
2208        space.pending_acks.insert_one(packet, now);
2209        if packet >= space.rx_packet {
2210            space.rx_packet = packet;
2211            // Update outgoing spin bit, inverting iff we're the client
2212            self.spin = self.side.is_client() ^ spin;
2213        }
2214    }
2215
2216    fn reset_idle_timeout(&mut self, now: Instant, space: SpaceId) {
2217        let timeout = match self.idle_timeout {
2218            None => return,
2219            Some(dur) => dur,
2220        };
2221        if self.state.is_closed() {
2222            self.timers.stop(Timer::Idle);
2223            return;
2224        }
2225        let dt = cmp::max(timeout, 3 * self.pto(space));
2226        self.timers.set(Timer::Idle, now + dt);
2227    }
2228
2229    fn reset_keep_alive(&mut self, now: Instant) {
2230        let interval = match self.config.keep_alive_interval {
2231            Some(x) if self.state.is_established() => x,
2232            _ => return,
2233        };
2234        self.timers.set(Timer::KeepAlive, now + interval);
2235    }
2236
2237    fn reset_cid_retirement(&mut self) {
2238        if let Some(t) = self.local_cid_state.next_timeout() {
2239            self.timers.set(Timer::PushNewCid, t);
2240        }
2241    }
2242
2243    /// Handle the already-decrypted first packet from the client
2244    ///
2245    /// Decrypting the first packet in the `Endpoint` allows stateless packet handling to be more
2246    /// efficient.
2247    pub(crate) fn handle_first_packet(
2248        &mut self,
2249        now: Instant,
2250        remote: SocketAddr,
2251        ecn: Option<EcnCodepoint>,
2252        packet_number: u64,
2253        packet: InitialPacket,
2254        remaining: Option<BytesMut>,
2255    ) -> Result<(), ConnectionError> {
2256        let span = trace_span!("first recv");
2257        let _guard = span.enter();
2258        debug_assert!(self.side.is_server());
2259        let len = packet.header_data.len() + packet.payload.len();
2260        self.path.total_recvd = len as u64;
2261
2262        match self.state {
2263            State::Handshake(ref mut state) => {
2264                state.expected_token = packet.header.token.clone();
2265            }
2266            _ => unreachable!("first packet must be delivered in Handshake state"),
2267        }
2268
2269        self.on_packet_authenticated(
2270            now,
2271            SpaceId::Initial,
2272            ecn,
2273            Some(packet_number),
2274            false,
2275            false,
2276        );
2277
2278        self.process_decrypted_packet(now, remote, Some(packet_number), packet.into())?;
2279        if let Some(data) = remaining {
2280            self.handle_coalesced(now, remote, ecn, data);
2281        }
2282
2283        #[cfg(feature = "__qlog")]
2284        self.emit_qlog_recovery_metrics(now);
2285
2286        Ok(())
2287    }
2288
2289    fn init_0rtt(&mut self) {
2290        let (header, packet) = match self.crypto.early_crypto() {
2291            Some(x) => x,
2292            None => return,
2293        };
2294        if self.side.is_client() {
2295            match self.crypto.transport_parameters() {
2296                Ok(params) => {
2297                    let params = params
2298                        .expect("crypto layer didn't supply transport parameters with ticket");
2299                    // Certain values must not be cached
2300                    let params = TransportParameters {
2301                        initial_src_cid: None,
2302                        original_dst_cid: None,
2303                        preferred_address: None,
2304                        retry_src_cid: None,
2305                        stateless_reset_token: None,
2306                        min_ack_delay: None,
2307                        ack_delay_exponent: TransportParameters::default().ack_delay_exponent,
2308                        max_ack_delay: TransportParameters::default().max_ack_delay,
2309                        ..params
2310                    };
2311                    self.set_peer_params(params);
2312                }
2313                Err(e) => {
2314                    error!("session ticket has malformed transport parameters: {}", e);
2315                    return;
2316                }
2317            }
2318        }
2319        trace!("0-RTT enabled");
2320        self.zero_rtt_enabled = true;
2321        self.zero_rtt_crypto = Some(ZeroRttCrypto { header, packet });
2322    }
2323
2324    fn read_crypto(
2325        &mut self,
2326        space: SpaceId,
2327        crypto: &frame::Crypto,
2328        payload_len: usize,
2329    ) -> Result<(), TransportError> {
2330        let expected = if !self.state.is_handshake() {
2331            SpaceId::Data
2332        } else if self.highest_space == SpaceId::Initial {
2333            SpaceId::Initial
2334        } else {
2335            // On the server, self.highest_space can be Data after receiving the client's first
2336            // flight, but we expect Handshake CRYPTO until the handshake is complete.
2337            SpaceId::Handshake
2338        };
2339        // We can't decrypt Handshake packets when highest_space is Initial, CRYPTO frames in 0-RTT
2340        // packets are illegal, and we don't process 1-RTT packets until the handshake is
2341        // complete. Therefore, we will never see CRYPTO data from a later-than-expected space.
2342        debug_assert!(space <= expected, "received out-of-order CRYPTO data");
2343
2344        let end = crypto.offset + crypto.data.len() as u64;
2345        if space < expected && end > self.spaces[space].crypto_stream.bytes_read() {
2346            warn!(
2347                "received new {:?} CRYPTO data when expecting {:?}",
2348                space, expected
2349            );
2350            return Err(TransportError::PROTOCOL_VIOLATION(
2351                "new data at unexpected encryption level",
2352            ));
2353        }
2354
2355        let space = &mut self.spaces[space];
2356        let max = end.saturating_sub(space.crypto_stream.bytes_read());
2357        if max > self.config.crypto_buffer_size as u64 {
2358            return Err(TransportError::CRYPTO_BUFFER_EXCEEDED(""));
2359        }
2360
2361        space
2362            .crypto_stream
2363            .insert(crypto.offset, crypto.data.clone(), payload_len);
2364        while let Some(chunk) = space.crypto_stream.read(usize::MAX, true) {
2365            trace!("consumed {} CRYPTO bytes", chunk.bytes.len());
2366            if self.crypto.read_handshake(&chunk.bytes)? {
2367                self.events.push_back(Event::HandshakeDataReady);
2368            }
2369        }
2370
2371        Ok(())
2372    }
2373
2374    fn write_crypto(&mut self) {
2375        loop {
2376            let space = self.highest_space;
2377            let mut outgoing = Vec::new();
2378            if let Some(crypto) = self.crypto.write_handshake(&mut outgoing) {
2379                match space {
2380                    SpaceId::Initial => {
2381                        self.upgrade_crypto(SpaceId::Handshake, crypto);
2382                    }
2383                    SpaceId::Handshake => {
2384                        self.upgrade_crypto(SpaceId::Data, crypto);
2385                    }
2386                    _ => unreachable!("got updated secrets during 1-RTT"),
2387                }
2388            }
2389            if outgoing.is_empty() {
2390                if space == self.highest_space {
2391                    break;
2392                } else {
2393                    // Keys updated, check for more data to send
2394                    continue;
2395                }
2396            }
2397            let offset = self.spaces[space].crypto_offset;
2398            let outgoing = Bytes::from(outgoing);
2399            if let State::Handshake(ref mut state) = self.state {
2400                if space == SpaceId::Initial && offset == 0 && self.side.is_client() {
2401                    state.client_hello = Some(outgoing.clone());
2402                }
2403            }
2404            self.spaces[space].crypto_offset += outgoing.len() as u64;
2405            trace!("wrote {} {:?} CRYPTO bytes", outgoing.len(), space);
2406            self.spaces[space].pending.crypto.push_back(frame::Crypto {
2407                offset,
2408                data: outgoing,
2409            });
2410        }
2411    }
2412
2413    /// Switch to stronger cryptography during handshake
2414    fn upgrade_crypto(&mut self, space: SpaceId, crypto: Keys) {
2415        debug_assert!(
2416            self.spaces[space].crypto.is_none(),
2417            "already reached packet space {space:?}"
2418        );
2419        trace!("{:?} keys ready", space);
2420        if space == SpaceId::Data {
2421            // Precompute the first key update
2422            self.next_crypto = Some(
2423                self.crypto
2424                    .next_1rtt_keys()
2425                    .expect("handshake should be complete"),
2426            );
2427        }
2428
2429        self.spaces[space].crypto = Some(crypto);
2430        debug_assert!(space as usize > self.highest_space as usize);
2431        self.highest_space = space;
2432        if space == SpaceId::Data && self.side.is_client() {
2433            // Discard 0-RTT keys because 1-RTT keys are available.
2434            self.zero_rtt_crypto = None;
2435        }
2436    }
2437
2438    fn discard_space(&mut self, now: Instant, space_id: SpaceId) {
2439        debug_assert!(space_id != SpaceId::Data);
2440        trace!("discarding {:?} keys", space_id);
2441        if space_id == SpaceId::Initial {
2442            // No longer needed
2443            if let ConnectionSide::Client { token, .. } = &mut self.side {
2444                *token = Bytes::new();
2445            }
2446        }
2447        let space = &mut self.spaces[space_id];
2448        space.crypto = None;
2449        space.time_of_last_ack_eliciting_packet = None;
2450        space.loss_time = None;
2451        space.in_flight = 0;
2452        let sent_packets = mem::take(&mut space.sent_packets);
2453        for (pn, packet) in sent_packets.into_iter() {
2454            self.remove_in_flight(pn, &packet);
2455        }
2456        self.set_loss_detection_timer(now)
2457    }
2458
2459    fn handle_coalesced(
2460        &mut self,
2461        now: Instant,
2462        remote: SocketAddr,
2463        ecn: Option<EcnCodepoint>,
2464        data: BytesMut,
2465    ) {
2466        self.path.total_recvd = self.path.total_recvd.saturating_add(data.len() as u64);
2467        let mut remaining = Some(data);
2468        while let Some(data) = remaining {
2469            match PartialDecode::new(
2470                data,
2471                &FixedLengthConnectionIdParser::new(self.local_cid_state.cid_len()),
2472                &[self.version],
2473                self.endpoint_config.grease_quic_bit,
2474            ) {
2475                Ok((partial_decode, rest)) => {
2476                    remaining = rest;
2477                    self.handle_decode(now, remote, ecn, partial_decode);
2478                }
2479                Err(e) => {
2480                    trace!("malformed header: {}", e);
2481                    return;
2482                }
2483            }
2484        }
2485    }
2486
2487    fn handle_decode(
2488        &mut self,
2489        now: Instant,
2490        remote: SocketAddr,
2491        ecn: Option<EcnCodepoint>,
2492        partial_decode: PartialDecode,
2493    ) {
2494        if let Some(decoded) = packet_crypto::unprotect_header(
2495            partial_decode,
2496            &self.spaces,
2497            self.zero_rtt_crypto.as_ref(),
2498            self.peer_params.stateless_reset_token,
2499        ) {
2500            self.handle_packet(now, remote, ecn, decoded.packet, decoded.stateless_reset);
2501        }
2502    }
2503
/// Decrypts and processes a single packet whose header protection has already been removed,
/// then applies any connection-state change that results.
///
/// * `remote` - address the packet arrived from; during the handshake, packets from any address
///   other than `self.path.remote` are dropped.
/// * `ecn` - ECN codepoint associated with the packet, forwarded to `on_packet_authenticated`.
/// * `packet` - the packet to handle; `None` is treated like a packet that failed to
///   authenticate.
/// * `stateless_reset` - when `true` the packet is handled as a stateless reset and the
///   connection moves to `State::Drained`.
///
/// Errors are not returned to the caller; they are stored in `self.error` and mapped onto
/// `self.state`.
///
/// # Panics
///
/// Panics if authentication fails while `self.spaces[self.highest_space].crypto` is `None`.
2504    fn handle_packet(
2505        &mut self,
2506        now: Instant,
2507        remote: SocketAddr,
2508        ecn: Option<EcnCodepoint>,
2509        packet: Option<Packet>,
2510        stateless_reset: bool,
2511    ) {
2512        self.stats.udp_rx.ios += 1;
2513        if let Some(ref packet) = packet {
2514            trace!(
2515                "got {:?} packet ({} bytes) from {} using id {}",
2516                packet.header.space(),
2517                packet.payload.len() + packet.header_data.len(),
2518                remote,
2519                packet.header.dst_cid(),
2520            );
2521        }
2522
2523        if self.is_handshaking() && remote != self.path.remote {
2524            debug!("discarding packet with unexpected remote during handshake");
2525            return;
2526        }
2527
            // Snapshot the state so the transition handling at the end only fires on a change.
2528        let was_closed = self.state.is_closed();
2529        let was_drained = self.state.is_drained();
2530
            // `Err(None)` stands for an authentication failure, `Err(Some(_))` for an illegal
            // packet; see the match arms below.
2531        let decrypted = match packet {
2532            None => Err(None),
2533            Some(mut packet) => self
2534                .decrypt_packet(now, &mut packet)
2535                .map(move |number| (packet, number)),
2536        };
2537        let result = match decrypted {
                // Checked first: a stateless reset wins over whatever decryption produced.
2538            _ if stateless_reset => {
2539                debug!("got stateless reset");
2540                Err(ConnectionError::Reset)
2541            }
2542            Err(Some(e)) => {
2543                warn!("illegal packet: {}", e);
2544                Err(e.into())
2545            }
2546            Err(None) => {
2547                debug!("failed to authenticate packet");
2548                self.authentication_failures += 1;
                    // Failures are counted against the integrity limit of the local packet key
                    // of the highest space reached.
2549                let integrity_limit = self.spaces[self.highest_space]
2550                    .crypto
2551                    .as_ref()
2552                    .unwrap()
2553                    .packet
2554                    .local
2555                    .integrity_limit();
2556                if self.authentication_failures > integrity_limit {
2557                    Err(TransportError::AEAD_LIMIT_REACHED("integrity limit violated").into())
2558                } else {
2559                    return;
2560                }
2561            }
2562            Ok((packet, number)) => {
2563                let span = match number {
2564                    Some(pn) => trace_span!("recv", space = ?packet.header.space(), pn),
2565                    None => trace_span!("recv", space = ?packet.header.space()),
2566                };
2567                let _guard = span.enter();
2568
                    // NOTE(review): relies on `dedup.insert` returning `true` for an
                    // already-seen packet number — confirm against the dedup implementation.
2569                let is_duplicate = |n| self.spaces[packet.header.space()].dedup.insert(n);
2570                if number.is_some_and(is_duplicate) {
2571                    debug!("discarding possible duplicate packet");
2572                    return;
2573                } else if self.state.is_handshake() && packet.header.is_short() {
2574                    // TODO: SHOULD buffer these to improve reordering tolerance.
2575                    trace!("dropping short packet during handshake");
2576                    return;
2577                } else {
2578                    if let Header::Initial(InitialHeader { ref token, .. }) = packet.header {
2579                        if let State::Handshake(ref hs) = self.state {
2580                            if self.side.is_server() && token != &hs.expected_token {
2581                                // Clients must send the same retry token in every Initial. Initial
2582                                // packets can be spoofed, so we discard rather than killing the
2583                                // connection.
2584                                warn!("discarding Initial with invalid retry token");
2585                                return;
2586                            }
2587                        }
2588                    }
2589
                        // Per-packet bookkeeping (timers, ECN, ACK queueing) is skipped once
                        // the connection is closed; the packet is still processed below.
2590                    if !self.state.is_closed() {
2591                        let spin = match packet.header {
2592                            Header::Short { spin, .. } => spin,
2593                            _ => false,
2594                        };
2595                        self.on_packet_authenticated(
2596                            now,
2597                            packet.header.space(),
2598                            ecn,
2599                            number,
2600                            spin,
2601                            packet.header.is_1rtt(),
2602                        );
2603                    }
2604
2605                    self.process_decrypted_packet(now, remote, number, packet)
2606                }
2607            }
2608        };
2609
2610        // State transitions for error cases
2611        if let Err(conn_err) = result {
2612            self.error = Some(conn_err.clone());
2613            self.state = match conn_err {
2614                ConnectionError::ApplicationClosed(reason) => State::closed(reason),
2615                ConnectionError::ConnectionClosed(reason) => State::closed(reason),
                    // A stateless reset or an exceeded AEAD integrity limit goes straight to
                    // `Drained`.
2616                ConnectionError::Reset
2617                | ConnectionError::TransportError(TransportError {
2618                    code: TransportErrorCode::AEAD_LIMIT_REACHED,
2619                    ..
2620                }) => State::Drained,
2621                ConnectionError::TimedOut => {
2622                    unreachable!("timeouts aren't generated by packet processing");
2623                }
2624                ConnectionError::TransportError(err) => {
2625                    debug!("closing connection due to transport error: {}", err);
2626                    State::closed(err)
2627                }
2628                ConnectionError::VersionMismatch => State::Draining,
2629                ConnectionError::LocallyClosed => {
2630                    unreachable!("LocallyClosed isn't generated by packet processing");
2631                }
2632                ConnectionError::CidsExhausted => {
2633                    unreachable!("CidsExhausted isn't generated by packet processing");
2634                }
2635            };
2636        }
2637
            // Run the close handling only when this packet caused the transition.
2638        if !was_closed && self.state.is_closed() {
2639            self.close_common();
2640            if !self.state.is_drained() {
2641                self.set_close_timer(now);
2642            }
2643        }
2644        if !was_drained && self.state.is_drained() {
2645            self.endpoint_events.push_back(EndpointEventInner::Drained);
2646            // Close timer may have been started previously, e.g. if we sent a close and got a
2647            // stateless reset in response
2648            self.timers.stop(Timer::Close);
2649        }
2650
2651        // Transmit CONNECTION_CLOSE if necessary
            // `self.close` is set only when the packet came from the current path's remote.
2652        if let State::Closed(_) = self.state {
2653            self.close = remote == self.path.remote;
2654        }
2655    }
2656
/// Dispatches an authenticated, decrypted packet according to the current connection state.
///
/// * `State::Established`: Data-space packets go to `process_payload`, other packets whose
///   header `has_frames()` go to `process_early_payload`, anything else is discarded.
/// * `State::Closed`: frames are only scanned for a `Frame::Close`, which moves the connection
///   to `State::Draining`; frame decoding errors are logged and skipped.
/// * `State::Draining` / `State::Drained`: the packet is ignored.
/// * `State::Handshake`: the packet is handled by header type (Retry, Handshake, Initial,
///   0-RTT, Version Negotiation).
///
/// `number` is the packet number, if the packet has one.
///
/// # Errors
///
/// Returns a `PROTOCOL_VIOLATION` transport error when a server receives a Retry packet,
/// `ConnectionError::VersionMismatch` when a Version Negotiation packet does not list
/// `self.version`, and propagates errors from frame iteration, payload processing and
/// transport-parameter handling.
///
/// # Panics
///
/// Panics if `number` is `None` for a Data-space packet while established or for a 0-RTT packet
/// during the handshake, if a valid Retry arrives while no ClientHello is stored in the
/// handshake state, if `early_data_accepted()` returns `None` while 0-RTT is in use, and if a
/// short-header packet reaches the handshake branch.
2657    fn process_decrypted_packet(
2658        &mut self,
2659        now: Instant,
2660        remote: SocketAddr,
2661        number: Option<u64>,
2662        packet: Packet,
2663    ) -> Result<(), ConnectionError> {
            // Every non-handshake state returns from inside this match; only the handshake
            // state falls through to the header dispatch below.
2664        let state = match self.state {
2665            State::Established => {
2666                match packet.header.space() {
2667                    SpaceId::Data => self.process_payload(now, remote, number.unwrap(), packet)?,
2668                    _ if packet.header.has_frames() => self.process_early_payload(now, packet)?,
2669                    _ => {
2670                        trace!("discarding unexpected pre-handshake packet");
2671                    }
2672                }
2673                return Ok(());
2674            }
2675            State::Closed(_) => {
2676                for result in frame::Iter::new(packet.payload.freeze())? {
2677                    let frame = match result {
2678                        Ok(frame) => frame,
2679                        Err(err) => {
2680                            debug!("frame decoding error: {err:?}");
2681                            continue;
2682                        }
2683                    };
2684
2685                    if let Frame::Padding = frame {
2686                        continue;
2687                    };
2688
2689                    self.stats.frame_rx.record(&frame);
2690
2691                    if let Frame::Close(_) = frame {
2692                        trace!("draining");
2693                        self.state = State::Draining;
2694                        break;
2695                    }
2696                }
2697                return Ok(());
2698            }
2699            State::Draining | State::Drained => return Ok(()),
2700            State::Handshake(ref mut state) => state,
2701        };
2702
2703        match packet.header {
2704            Header::Retry {
2705                src_cid: rem_cid, ..
2706            } => {
2707                if self.side.is_server() {
2708                    return Err(TransportError::PROTOCOL_VIOLATION("client sent Retry").into());
2709                }
2710
2711                if self.total_authed_packets > 1
2712                            || packet.payload.len() <= 16 // token + 16 byte tag
2713                            || !self.crypto.is_valid_retry(
2714                                &self.rem_cids.active(),
2715                                &packet.header_data,
2716                                &packet.payload,
2717                            )
2718                {
2719                    trace!("discarding invalid Retry");
2720                    // - After the client has received and processed an Initial or Retry
2721                    //   packet from the server, it MUST discard any subsequent Retry
2722                    //   packets that it receives.
2723                    // - A client MUST discard a Retry packet with a zero-length Retry Token
2724                    //   field.
2725                    // - Clients MUST discard Retry packets that have a Retry Integrity Tag
2726                    //   that cannot be validated
2727                    return Ok(());
2728                }
2729
2730                trace!("retrying with CID {}", rem_cid);
2731                let client_hello = state.client_hello.take().unwrap();
2732                self.retry_src_cid = Some(rem_cid);
2733                self.rem_cids.update_initial_cid(rem_cid);
2734                self.rem_handshake_cid = rem_cid;
2735
                    // If Initial packet number 0 is still tracked as sent, treat it as
                    // acknowledged.
2736                let space = &mut self.spaces[SpaceId::Initial];
2737                if let Some(info) = space.take(0) {
2738                    self.on_packet_acked(now, 0, info);
2739                };
2740
2741                self.discard_space(now, SpaceId::Initial); // Make sure we clean up after any retransmitted Initials
                    // Rebuild the Initial space with keys derived from the new remote CID,
                    // keeping the packet number sequence. The stored ClientHello is re-queued
                    // at offset 0, so `crypto_offset` resumes right after it.
2742                self.spaces[SpaceId::Initial] = PacketSpace {
2743                    crypto: Some(self.crypto.initial_keys(&rem_cid, self.side.side())),
2744                    next_packet_number: self.spaces[SpaceId::Initial].next_packet_number,
2745                    crypto_offset: client_hello.len() as u64,
2746                    ..PacketSpace::new(now)
2747                };
2748                self.spaces[SpaceId::Initial]
2749                    .pending
2750                    .crypto
2751                    .push_back(frame::Crypto {
2752                        offset: 0,
2753                        data: client_hello,
2754                    });
2755
2756                // Retransmit all 0-RTT data
2757                let zero_rtt = mem::take(&mut self.spaces[SpaceId::Data].sent_packets);
2758                for (pn, info) in zero_rtt {
2759                    self.remove_in_flight(pn, &info);
2760                    self.spaces[SpaceId::Data].pending |= info.retransmits;
2761                }
2762                self.streams.retransmit_all_for_0rtt();
2763
                    // The payload is the retry token followed by the 16-byte tag; keep only the
                    // token. The subtraction cannot underflow because payloads of 16 bytes or
                    // fewer were rejected above.
2764                let token_len = packet.payload.len() - 16;
2765                let ConnectionSide::Client { ref mut token, .. } = self.side else {
2766                    unreachable!("we already short-circuited if we're server");
2767                };
2768                *token = packet.payload.freeze().split_to(token_len);
                    // Start over with a fresh handshake state.
2769                self.state = State::Handshake(state::Handshake {
2770                    expected_token: Bytes::new(),
2771                    rem_cid_set: false,
2772                    client_hello: None,
2773                });
2774                Ok(())
2775            }
2776            Header::Long {
2777                ty: LongType::Handshake,
2778                src_cid: rem_cid,
2779                ..
2780            } => {
2781                if rem_cid != self.rem_handshake_cid {
2782                    debug!(
2783                        "discarding packet with mismatched remote CID: {} != {}",
2784                        self.rem_handshake_cid, rem_cid
2785                    );
2786                    return Ok(());
2787                }
2788                self.on_path_validated();
2789
2790                self.process_early_payload(now, packet)?;
2791                if self.state.is_closed() {
2792                    return Ok(());
2793                }
2794
2795                if self.crypto.is_handshaking() {
2796                    trace!("handshake ongoing");
2797                    return Ok(());
2798                }
2799
                    // From here on the crypto handshake has finished; complete the
                    // connection-level handshake.
2800                if self.side.is_client() {
2801                    // Client-only because server params were set from the client's Initial
2802                    let params =
2803                        self.crypto
2804                            .transport_parameters()?
2805                            .ok_or_else(|| TransportError {
2806                                code: TransportErrorCode::crypto(0x6d),
2807                                frame: None,
2808                                reason: "transport parameters missing".into(),
2809                            })?;
2810
2811                    if self.has_0rtt() {
2812                        if !self.crypto.early_data_accepted().unwrap() {
2813                            debug_assert!(self.side.is_client());
2814                            debug!("0-RTT rejected");
2815                            self.accepted_0rtt = false;
2816                            self.streams.zero_rtt_rejected();
2817
2818                            // Discard already-queued frames
2819                            self.spaces[SpaceId::Data].pending = Retransmits::default();
2820
2821                            // Discard 0-RTT packets
2822                            let sent_packets =
2823                                mem::take(&mut self.spaces[SpaceId::Data].sent_packets);
2824                            for (pn, packet) in sent_packets {
2825                                self.remove_in_flight(pn, &packet);
2826                            }
2827                        } else {
2828                            self.accepted_0rtt = true;
                                // The new parameters are validated against the ones that were
                                // in effect for 0-RTT.
2829                            params.validate_resumption_from(&self.peer_params)?;
2830                        }
2831                    }
2832                    if let Some(token) = params.stateless_reset_token {
2833                        self.endpoint_events
2834                            .push_back(EndpointEventInner::ResetToken(self.path.remote, token));
2835                    }
2836                    self.handle_peer_params(params)?;
2837                    self.issue_first_cids(now);
2838                } else {
2839                    // Server-only
2840                    self.spaces[SpaceId::Data].pending.handshake_done = true;
2841                    self.discard_space(now, SpaceId::Handshake);
2842                }
2843
2844                self.events.push_back(Event::Connected);
2845                self.state = State::Established;
2846                trace!("established");
2847                Ok(())
2848            }
2849            Header::Initial(InitialHeader {
2850                src_cid: rem_cid, ..
2851            }) => {
                    // The first Initial processed fixes the remote CID; later Initials must
                    // carry the same one.
2852                if !state.rem_cid_set {
2853                    trace!("switching remote CID to {}", rem_cid);
2854                    let mut state = state.clone();
2855                    self.rem_cids.update_initial_cid(rem_cid);
2856                    self.rem_handshake_cid = rem_cid;
2857                    self.orig_rem_cid = rem_cid;
2858                    state.rem_cid_set = true;
2859                    self.state = State::Handshake(state);
2860                } else if rem_cid != self.rem_handshake_cid {
2861                    debug!(
2862                        "discarding packet with mismatched remote CID: {} != {}",
2863                        self.rem_handshake_cid, rem_cid
2864                    );
2865                    return Ok(());
2866                }
2867
2868                let starting_space = self.highest_space;
2869                self.process_early_payload(now, packet)?;
2870
                    // On the server, act once when processing this packet moved the highest
                    // space beyond Initial: read the peer's transport parameters, issue CIDs
                    // and set up 0-RTT.
2871                if self.side.is_server()
2872                    && starting_space == SpaceId::Initial
2873                    && self.highest_space != SpaceId::Initial
2874                {
2875                    let params =
2876                        self.crypto
2877                            .transport_parameters()?
2878                            .ok_or_else(|| TransportError {
2879                                code: TransportErrorCode::crypto(0x6d),
2880                                frame: None,
2881                                reason: "transport parameters missing".into(),
2882                            })?;
2883                    self.handle_peer_params(params)?;
2884                    self.issue_first_cids(now);
2885                    self.init_0rtt();
2886                }
2887                Ok(())
2888            }
2889            Header::Long {
2890                ty: LongType::ZeroRtt,
2891                ..
2892            } => {
2893                self.process_payload(now, remote, number.unwrap(), packet)?;
2894                Ok(())
2895            }
2896            Header::VersionNegotiate { .. } => {
                    // Version negotiation is ignored once more than one packet has been
                    // authenticated.
2897                if self.total_authed_packets > 1 {
2898                    return Ok(());
2899                }
                    // The payload is read as a list of 4-byte big-endian versions; a trailing
                    // partial chunk never matches.
2900                let supported = packet
2901                    .payload
2902                    .chunks(4)
2903                    .any(|x| match <[u8; 4]>::try_from(x) {
2904                        Ok(version) => self.version == u32::from_be_bytes(version),
2905                        Err(_) => false,
2906                    });
                    // A list that contains our own version is ignored.
2907                if supported {
2908                    return Ok(());
2909                }
2910                debug!("remote doesn't support our version");
2911                Err(ConnectionError::VersionMismatch)
2912            }
2913            Header::Short { .. } => unreachable!(
2914                "short packets received during handshake are discarded in handle_packet"
2915            ),
2916        }
2917    }
2918
2919    /// Process an Initial or Handshake packet payload
2920    fn process_early_payload(
2921        &mut self,
2922        now: Instant,
2923        packet: Packet,
2924    ) -> Result<(), TransportError> {
2925        debug_assert_ne!(packet.header.space(), SpaceId::Data);
2926        let payload_len = packet.payload.len();
2927        let mut ack_eliciting = false;
2928        for result in frame::Iter::new(packet.payload.freeze())? {
2929            let frame = result?;
2930            let span = match frame {
2931                Frame::Padding => continue,
2932                _ => Some(trace_span!("frame", ty = %frame.ty())),
2933            };
2934
2935            self.stats.frame_rx.record(&frame);
2936
2937            let _guard = span.as_ref().map(|x| x.enter());
2938            ack_eliciting |= frame.is_ack_eliciting();
2939
2940            // Process frames
2941            match frame {
2942                Frame::Padding | Frame::Ping => {}
2943                Frame::Crypto(frame) => {
2944                    self.read_crypto(packet.header.space(), &frame, payload_len)?;
2945                }
2946                Frame::Ack(ack) => {
2947                    self.on_ack_received(now, packet.header.space(), ack)?;
2948                }
2949                Frame::Close(reason) => {
2950                    self.error = Some(reason.into());
2951                    self.state = State::Draining;
2952                    return Ok(());
2953                }
2954                _ => {
2955                    let mut err =
2956                        TransportError::PROTOCOL_VIOLATION("illegal frame type in handshake");
2957                    err.frame = Some(frame.ty());
2958                    return Err(err);
2959                }
2960            }
2961        }
2962
2963        if ack_eliciting {
2964            // In the initial and handshake spaces, ACKs must be sent immediately
2965            self.spaces[packet.header.space()]
2966                .pending_acks
2967                .set_immediate_ack_required();
2968        }
2969
2970        self.write_crypto();
2971        Ok(())
2972    }
2973
2974    fn process_payload(
2975        &mut self,
2976        now: Instant,
2977        remote: SocketAddr,
2978        number: u64,
2979        packet: Packet,
2980    ) -> Result<(), TransportError> {
2981        let payload = packet.payload.freeze();
2982        let mut is_probing_packet = true;
2983        let mut close = None;
2984        let payload_len = payload.len();
2985        let mut ack_eliciting = false;
2986        for result in frame::Iter::new(payload)? {
2987            let frame = result?;
2988            let span = match frame {
2989                Frame::Padding => continue,
2990                _ => Some(trace_span!("frame", ty = %frame.ty())),
2991            };
2992
2993            self.stats.frame_rx.record(&frame);
2994            // Crypto, Stream and Datagram frames are special cased in order no pollute
2995            // the log with payload data
2996            match &frame {
2997                Frame::Crypto(f) => {
2998                    trace!(offset = f.offset, len = f.data.len(), "got crypto frame");
2999                }
3000                Frame::Stream(f) => {
3001                    trace!(id = %f.id, offset = f.offset, len = f.data.len(), fin = f.fin, "got stream frame");
3002                }
3003                Frame::Datagram(f) => {
3004                    trace!(len = f.data.len(), "got datagram frame");
3005                }
3006                f => {
3007                    trace!("got frame {:?}", f);
3008                }
3009            }
3010
3011            let _guard = span.as_ref().map(|x| x.enter());
3012            if packet.header.is_0rtt() {
3013                match frame {
3014                    Frame::Crypto(_) | Frame::Close(Close::Application(_)) => {
3015                        return Err(TransportError::PROTOCOL_VIOLATION(
3016                            "illegal frame type in 0-RTT",
3017                        ));
3018                    }
3019                    _ => {}
3020                }
3021            }
3022            ack_eliciting |= frame.is_ack_eliciting();
3023
3024            // Check whether this could be a probing packet
3025            match frame {
3026                Frame::Padding
3027                | Frame::PathChallenge(_)
3028                | Frame::PathResponse(_)
3029                | Frame::NewConnectionId(_) => {}
3030                _ => {
3031                    is_probing_packet = false;
3032                }
3033            }
3034            match frame {
3035                Frame::Crypto(frame) => {
3036                    self.read_crypto(SpaceId::Data, &frame, payload_len)?;
3037                }
3038                Frame::Stream(frame) => {
3039                    if self.streams.received(frame, payload_len)?.should_transmit() {
3040                        self.spaces[SpaceId::Data].pending.max_data = true;
3041                    }
3042                }
3043                Frame::Ack(ack) => {
3044                    self.on_ack_received(now, SpaceId::Data, ack)?;
3045                }
3046                Frame::Padding | Frame::Ping => {}
3047                Frame::Close(reason) => {
3048                    close = Some(reason);
3049                }
3050                Frame::PathChallenge(token) => {
3051                    self.path_responses.push(number, token, remote);
3052                    if remote == self.path.remote {
3053                        // PATH_CHALLENGE on active path, possible off-path packet forwarding
3054                        // attack. Send a non-probing packet to recover the active path.
3055                        match self.peer_supports_ack_frequency() {
3056                            true => self.immediate_ack(),
3057                            false => self.ping(),
3058                        }
3059                    }
3060                }
3061                Frame::PathResponse(token) => {
3062                    if self.path.challenge == Some(token) && remote == self.path.remote {
3063                        trace!("new path validated");
3064                        self.timers.stop(Timer::PathValidation);
3065                        self.path.challenge = None;
3066                        self.path.validated = true;
3067                        if let Some((_, ref mut prev_path)) = self.prev_path {
3068                            prev_path.challenge = None;
3069                            prev_path.challenge_pending = false;
3070                        }
3071                        self.on_path_validated();
3072                    } else if let Some(nat_traversal) = &mut self.nat_traversal {
3073                        // Check if this is a response to NAT traversal PATH_CHALLENGE
3074                        match nat_traversal.handle_validation_success(remote, token, now) {
3075                            Ok(sequence) => {
3076                                trace!("NAT traversal candidate {} validated for sequence {}", remote, sequence);
3077                                
3078                                // Check if this was part of a coordination round
3079                                if nat_traversal.handle_coordination_success(remote, now) {
3080                                    trace!("Coordination succeeded via {}", remote);
3081                                    
3082                                    // Check if we should migrate to this better path
3083                                    let can_migrate = match &self.side {
3084                                        ConnectionSide::Client { .. } => true, // Clients can always migrate
3085                                        ConnectionSide::Server { server_config } => server_config.migration,
3086                                    };
3087                                    
3088                                    if can_migrate {
3089                                        // Get the best paths to see if this new one is better
3090                                        let best_pairs = nat_traversal.get_best_succeeded_pairs();
3091                                        if let Some(best) = best_pairs.first() {
3092                                            if best.remote_addr == remote && best.remote_addr != self.path.remote {
3093                                                debug!("NAT traversal found better path, initiating migration");
3094                                                // Trigger migration to the better NAT-traversed path
3095                                                if let Err(e) = self.migrate_to_nat_traversal_path(now) {
3096                                                    warn!("Failed to migrate to NAT traversal path: {:?}", e);
3097                                                }
3098                                            }
3099                                        }
3100                                    }
3101                                } else {
3102                                    // Mark the candidate pair as succeeded for regular validation
3103                                    if nat_traversal.mark_pair_succeeded(remote) {
3104                                        trace!("NAT traversal pair succeeded for {}", remote);
3105                                    }
3106                                }
3107                            }
3108                            Err(NatTraversalError::ChallengeMismatch) => {
3109                                debug!("PATH_RESPONSE challenge mismatch for NAT candidate {}", remote);
3110                            }
3111                            Err(e) => {
3112                                debug!("NAT traversal validation error: {}", e);
3113                            }
3114                        }
3115                    } else {
3116                        debug!(token, "ignoring invalid PATH_RESPONSE");
3117                    }
3118                }
3119                Frame::MaxData(bytes) => {
3120                    self.streams.received_max_data(bytes);
3121                }
3122                Frame::MaxStreamData { id, offset } => {
3123                    self.streams.received_max_stream_data(id, offset)?;
3124                }
3125                Frame::MaxStreams { dir, count } => {
3126                    self.streams.received_max_streams(dir, count)?;
3127                }
3128                Frame::ResetStream(frame) => {
3129                    if self.streams.received_reset(frame)?.should_transmit() {
3130                        self.spaces[SpaceId::Data].pending.max_data = true;
3131                    }
3132                }
3133                Frame::DataBlocked { offset } => {
3134                    debug!(offset, "peer claims to be blocked at connection level");
3135                }
3136                Frame::StreamDataBlocked { id, offset } => {
3137                    if id.initiator() == self.side.side() && id.dir() == Dir::Uni {
3138                        debug!("got STREAM_DATA_BLOCKED on send-only {}", id);
3139                        return Err(TransportError::STREAM_STATE_ERROR(
3140                            "STREAM_DATA_BLOCKED on send-only stream",
3141                        ));
3142                    }
3143                    debug!(
3144                        stream = %id,
3145                        offset, "peer claims to be blocked at stream level"
3146                    );
3147                }
3148                Frame::StreamsBlocked { dir, limit } => {
3149                    if limit > MAX_STREAM_COUNT {
3150                        return Err(TransportError::FRAME_ENCODING_ERROR(
3151                            "unrepresentable stream limit",
3152                        ));
3153                    }
3154                    debug!(
3155                        "peer claims to be blocked opening more than {} {} streams",
3156                        limit, dir
3157                    );
3158                }
3159                Frame::StopSending(frame::StopSending { id, error_code }) => {
3160                    if id.initiator() != self.side.side() {
3161                        if id.dir() == Dir::Uni {
3162                            debug!("got STOP_SENDING on recv-only {}", id);
3163                            return Err(TransportError::STREAM_STATE_ERROR(
3164                                "STOP_SENDING on recv-only stream",
3165                            ));
3166                        }
3167                    } else if self.streams.is_local_unopened(id) {
3168                        return Err(TransportError::STREAM_STATE_ERROR(
3169                            "STOP_SENDING on unopened stream",
3170                        ));
3171                    }
3172                    self.streams.received_stop_sending(id, error_code);
3173                }
3174                Frame::RetireConnectionId { sequence } => {
3175                    let allow_more_cids = self
3176                        .local_cid_state
3177                        .on_cid_retirement(sequence, self.peer_params.issue_cids_limit())?;
3178                    self.endpoint_events
3179                        .push_back(EndpointEventInner::RetireConnectionId(
3180                            now,
3181                            sequence,
3182                            allow_more_cids,
3183                        ));
3184                }
3185                Frame::NewConnectionId(frame) => {
3186                    trace!(
3187                        sequence = frame.sequence,
3188                        id = %frame.id,
3189                        retire_prior_to = frame.retire_prior_to,
3190                    );
3191                    if self.rem_cids.active().is_empty() {
3192                        return Err(TransportError::PROTOCOL_VIOLATION(
3193                            "NEW_CONNECTION_ID when CIDs aren't in use",
3194                        ));
3195                    }
3196                    if frame.retire_prior_to > frame.sequence {
3197                        return Err(TransportError::PROTOCOL_VIOLATION(
3198                            "NEW_CONNECTION_ID retiring unissued CIDs",
3199                        ));
3200                    }
3201
3202                    use crate::cid_queue::InsertError;
3203                    match self.rem_cids.insert(frame) {
3204                        Ok(None) => {}
3205                        Ok(Some((retired, reset_token))) => {
3206                            let pending_retired =
3207                                &mut self.spaces[SpaceId::Data].pending.retire_cids;
3208                            /// Ensure `pending_retired` cannot grow without bound. Limit is
3209                            /// somewhat arbitrary but very permissive.
3210                            const MAX_PENDING_RETIRED_CIDS: u64 = CidQueue::LEN as u64 * 10;
3211                            // We don't bother counting in-flight frames because those are bounded
3212                            // by congestion control.
3213                            if (pending_retired.len() as u64)
3214                                .saturating_add(retired.end.saturating_sub(retired.start))
3215                                > MAX_PENDING_RETIRED_CIDS
3216                            {
3217                                return Err(TransportError::CONNECTION_ID_LIMIT_ERROR(
3218                                    "queued too many retired CIDs",
3219                                ));
3220                            }
3221                            pending_retired.extend(retired);
3222                            self.set_reset_token(reset_token);
3223                        }
3224                        Err(InsertError::ExceedsLimit) => {
3225                            return Err(TransportError::CONNECTION_ID_LIMIT_ERROR(""));
3226                        }
3227                        Err(InsertError::Retired) => {
3228                            trace!("discarding already-retired");
3229                            // RETIRE_CONNECTION_ID might not have been previously sent if e.g. a
3230                            // range of connection IDs larger than the active connection ID limit
3231                            // was retired all at once via retire_prior_to.
3232                            self.spaces[SpaceId::Data]
3233                                .pending
3234                                .retire_cids
3235                                .push(frame.sequence);
3236                            continue;
3237                        }
3238                    };
3239
3240                    if self.side.is_server() && self.rem_cids.active_seq() == 0 {
3241                        // We're a server still using the initial remote CID for the client, so
3242                        // let's switch immediately to enable clientside stateless resets.
3243                        self.update_rem_cid();
3244                    }
3245                }
3246                Frame::NewToken(NewToken { token }) => {
3247                    let ConnectionSide::Client {
3248                        token_store,
3249                        server_name,
3250                        ..
3251                    } = &self.side
3252                    else {
3253                        return Err(TransportError::PROTOCOL_VIOLATION("client sent NEW_TOKEN"));
3254                    };
3255                    if token.is_empty() {
3256                        return Err(TransportError::FRAME_ENCODING_ERROR("empty token"));
3257                    }
3258                    trace!("got new token");
3259                    token_store.insert(server_name, token);
3260                }
3261                Frame::Datagram(datagram) => {
3262                    if self
3263                        .datagrams
3264                        .received(datagram, &self.config.datagram_receive_buffer_size)?
3265                    {
3266                        self.events.push_back(Event::DatagramReceived);
3267                    }
3268                }
3269                Frame::AckFrequency(ack_frequency) => {
3270                    // This frame can only be sent in the Data space
3271                    let space = &mut self.spaces[SpaceId::Data];
3272
3273                    if !self
3274                        .ack_frequency
3275                        .ack_frequency_received(&ack_frequency, &mut space.pending_acks)?
3276                    {
3277                        // The AckFrequency frame is stale (we have already received a more recent one)
3278                        continue;
3279                    }
3280
3281                    // Our `max_ack_delay` has been updated, so we may need to adjust its associated
3282                    // timeout
3283                    if let Some(timeout) = space
3284                        .pending_acks
3285                        .max_ack_delay_timeout(self.ack_frequency.max_ack_delay)
3286                    {
3287                        self.timers.set(Timer::MaxAckDelay, timeout);
3288                    }
3289                }
3290                Frame::ImmediateAck => {
3291                    // This frame can only be sent in the Data space
3292                    self.spaces[SpaceId::Data]
3293                        .pending_acks
3294                        .set_immediate_ack_required();
3295                }
3296                Frame::HandshakeDone => {
3297                    if self.side.is_server() {
3298                        return Err(TransportError::PROTOCOL_VIOLATION(
3299                            "client sent HANDSHAKE_DONE",
3300                        ));
3301                    }
3302                    if self.spaces[SpaceId::Handshake].crypto.is_some() {
3303                        self.discard_space(now, SpaceId::Handshake);
3304                    }
3305                }
3306                Frame::AddAddress(add_address) => {
3307                    self.handle_add_address(&add_address, now)?;
3308                }
3309                Frame::PunchMeNow(punch_me_now) => {
3310                    self.handle_punch_me_now(&punch_me_now, now)?;
3311                }
3312                Frame::RemoveAddress(remove_address) => {
3313                    self.handle_remove_address(&remove_address)?;
3314                }
3315            }
3316        }
3317
3318        let space = &mut self.spaces[SpaceId::Data];
3319        if space
3320            .pending_acks
3321            .packet_received(now, number, ack_eliciting, &space.dedup)
3322        {
3323            self.timers
3324                .set(Timer::MaxAckDelay, now + self.ack_frequency.max_ack_delay);
3325        }
3326
3327        // Issue stream ID credit due to ACKs of outgoing finish/resets and incoming finish/resets
3328        // on stopped streams. Incoming finishes/resets on open streams are not handled here as they
3329        // are only freed, and hence only issue credit, once the application has been notified
3330        // during a read on the stream.
3331        let pending = &mut self.spaces[SpaceId::Data].pending;
3332        self.streams.queue_max_stream_id(pending);
3333
3334        if let Some(reason) = close {
3335            self.error = Some(reason.into());
3336            self.state = State::Draining;
3337            self.close = true;
3338        }
3339
3340        if remote != self.path.remote
3341            && !is_probing_packet
3342            && number == self.spaces[SpaceId::Data].rx_packet
3343        {
3344            let ConnectionSide::Server { ref server_config } = self.side else {
3345                panic!("packets from unknown remote should be dropped by clients");
3346            };
3347            debug_assert!(
3348                server_config.migration,
3349                "migration-initiating packets should have been dropped immediately"
3350            );
3351            self.migrate(now, remote);
3352            // Break linkability, if possible
3353            self.update_rem_cid();
3354            self.spin = false;
3355        }
3356
3357        Ok(())
3358    }
3359
3360    fn migrate(&mut self, now: Instant, remote: SocketAddr) {
3361        trace!(%remote, "migration initiated");
3362        // Reset rtt/congestion state for new path unless it looks like a NAT rebinding.
3363        // Note that the congestion window will not grow until validation terminates. Helps mitigate
3364        // amplification attacks performed by spoofing source addresses.
3365        let mut new_path = if remote.is_ipv4() && remote.ip() == self.path.remote.ip() {
3366            PathData::from_previous(remote, &self.path, now)
3367        } else {
3368            let peer_max_udp_payload_size =
3369                u16::try_from(self.peer_params.max_udp_payload_size.into_inner())
3370                    .unwrap_or(u16::MAX);
3371            PathData::new(
3372                remote,
3373                self.allow_mtud,
3374                Some(peer_max_udp_payload_size),
3375                now,
3376                &self.config,
3377            )
3378        };
3379        new_path.challenge = Some(self.rng.gen());
3380        new_path.challenge_pending = true;
3381        let prev_pto = self.pto(SpaceId::Data);
3382
3383        let mut prev = mem::replace(&mut self.path, new_path);
3384        // Don't clobber the original path if the previous one hasn't been validated yet
3385        if prev.challenge.is_none() {
3386            prev.challenge = Some(self.rng.gen());
3387            prev.challenge_pending = true;
3388            // We haven't updated the remote CID yet, this captures the remote CID we were using on
3389            // the previous path.
3390            self.prev_path = Some((self.rem_cids.active(), prev));
3391        }
3392
3393        self.timers.set(
3394            Timer::PathValidation,
3395            now + 3 * cmp::max(self.pto(SpaceId::Data), prev_pto),
3396        );
3397    }
3398
3399    /// Handle a change in the local address, i.e. an active migration
3400    pub fn local_address_changed(&mut self) {
3401        self.update_rem_cid();
3402        self.ping();
3403    }
3404    
3405    /// Migrate to a better path discovered through NAT traversal
3406    pub fn migrate_to_nat_traversal_path(&mut self, now: Instant) -> Result<(), TransportError> {
3407        // Extract necessary data before mutable operations
3408        let (remote_addr, local_addr) = {
3409            let nat_state = self.nat_traversal.as_ref()
3410                .ok_or_else(|| TransportError::PROTOCOL_VIOLATION("NAT traversal not enabled"))?;
3411            
3412            // Get the best validated NAT traversal path
3413            let best_pairs = nat_state.get_best_succeeded_pairs();
3414            if best_pairs.is_empty() {
3415                return Err(TransportError::PROTOCOL_VIOLATION("No validated NAT traversal paths"));
3416            }
3417            
3418            // Select the best path (highest priority that's different from current)
3419            let best_path = best_pairs.iter()
3420                .find(|pair| pair.remote_addr != self.path.remote)
3421                .or_else(|| best_pairs.first());
3422            
3423            let best_path = best_path
3424                .ok_or_else(|| TransportError::PROTOCOL_VIOLATION("No suitable NAT traversal path"))?;
3425            
3426            debug!("Migrating to NAT traversal path: {} -> {} (priority: {})",
3427                   self.path.remote, best_path.remote_addr, best_path.priority);
3428            
3429            (best_path.remote_addr, best_path.local_addr)
3430        };
3431        
3432        // Perform the migration
3433        self.migrate(now, remote_addr);
3434        
3435        // Update local address if needed
3436        if local_addr != SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED), 0) {
3437            self.local_ip = Some(local_addr.ip());
3438        }
3439        
3440        // Queue a PATH_CHALLENGE to confirm the new path
3441        self.path.challenge_pending = true;
3442        
3443        Ok(())
3444    }
3445
3446    /// Switch to a previously unused remote connection ID, if possible
3447    fn update_rem_cid(&mut self) {
3448        let (reset_token, retired) = match self.rem_cids.next() {
3449            Some(x) => x,
3450            None => return,
3451        };
3452
3453        // Retire the current remote CID and any CIDs we had to skip.
3454        self.spaces[SpaceId::Data]
3455            .pending
3456            .retire_cids
3457            .extend(retired);
3458        self.set_reset_token(reset_token);
3459    }
3460
3461    fn set_reset_token(&mut self, reset_token: ResetToken) {
3462        self.endpoint_events
3463            .push_back(EndpointEventInner::ResetToken(
3464                self.path.remote,
3465                reset_token,
3466            ));
3467        self.peer_params.stateless_reset_token = Some(reset_token);
3468    }
3469
3470    /// Issue an initial set of connection IDs to the peer upon connection
3471    fn issue_first_cids(&mut self, now: Instant) {
3472        if self.local_cid_state.cid_len() == 0 {
3473            return;
3474        }
3475
3476        // Subtract 1 to account for the CID we supplied while handshaking
3477        let mut n = self.peer_params.issue_cids_limit() - 1;
3478        if let ConnectionSide::Server { server_config } = &self.side {
3479            if server_config.has_preferred_address() {
3480                // We also sent a CID in the transport parameters
3481                n -= 1;
3482            }
3483        }
3484        self.endpoint_events
3485            .push_back(EndpointEventInner::NeedIdentifiers(now, n));
3486    }
3487
3488    fn populate_packet(
3489        &mut self,
3490        now: Instant,
3491        space_id: SpaceId,
3492        buf: &mut Vec<u8>,
3493        max_size: usize,
3494        pn: u64,
3495    ) -> SentFrames {
3496        let mut sent = SentFrames::default();
3497        let space = &mut self.spaces[space_id];
3498        let is_0rtt = space_id == SpaceId::Data && space.crypto.is_none();
3499        space.pending_acks.maybe_ack_non_eliciting();
3500
3501        // HANDSHAKE_DONE
3502        if !is_0rtt && mem::replace(&mut space.pending.handshake_done, false) {
3503            buf.write(frame::FrameType::HANDSHAKE_DONE);
3504            sent.retransmits.get_or_create().handshake_done = true;
3505            // This is just a u8 counter and the frame is typically just sent once
3506            self.stats.frame_tx.handshake_done =
3507                self.stats.frame_tx.handshake_done.saturating_add(1);
3508        }
3509
3510        // PING
3511        if mem::replace(&mut space.ping_pending, false) {
3512            trace!("PING");
3513            buf.write(frame::FrameType::PING);
3514            sent.non_retransmits = true;
3515            self.stats.frame_tx.ping += 1;
3516        }
3517
3518        // IMMEDIATE_ACK
3519        if mem::replace(&mut space.immediate_ack_pending, false) {
3520            trace!("IMMEDIATE_ACK");
3521            buf.write(frame::FrameType::IMMEDIATE_ACK);
3522            sent.non_retransmits = true;
3523            self.stats.frame_tx.immediate_ack += 1;
3524        }
3525
3526        // ACK
3527        if space.pending_acks.can_send() {
3528            Self::populate_acks(
3529                now,
3530                self.receiving_ecn,
3531                &mut sent,
3532                space,
3533                buf,
3534                &mut self.stats,
3535            );
3536        }
3537
3538        // ACK_FREQUENCY
3539        if mem::replace(&mut space.pending.ack_frequency, false) {
3540            let sequence_number = self.ack_frequency.next_sequence_number();
3541
3542            // Safe to unwrap because this is always provided when ACK frequency is enabled
3543            let config = self.config.ack_frequency_config.as_ref().unwrap();
3544
3545            // Ensure the delay is within bounds to avoid a PROTOCOL_VIOLATION error
3546            let max_ack_delay = self.ack_frequency.candidate_max_ack_delay(
3547                self.path.rtt.get(),
3548                config,
3549                &self.peer_params,
3550            );
3551
3552            trace!(?max_ack_delay, "ACK_FREQUENCY");
3553
3554            frame::AckFrequency {
3555                sequence: sequence_number,
3556                ack_eliciting_threshold: config.ack_eliciting_threshold,
3557                request_max_ack_delay: max_ack_delay.as_micros().try_into().unwrap_or(VarInt::MAX),
3558                reordering_threshold: config.reordering_threshold,
3559            }
3560            .encode(buf);
3561
3562            sent.retransmits.get_or_create().ack_frequency = true;
3563
3564            self.ack_frequency.ack_frequency_sent(pn, max_ack_delay);
3565            self.stats.frame_tx.ack_frequency += 1;
3566        }
3567
3568        // PATH_CHALLENGE
3569        if buf.len() + 9 < max_size && space_id == SpaceId::Data {
3570            // Transmit challenges with every outgoing frame on an unvalidated path
3571            if let Some(token) = self.path.challenge {
3572                // But only send a packet solely for that purpose at most once
3573                self.path.challenge_pending = false;
3574                sent.non_retransmits = true;
3575                sent.requires_padding = true;
3576                trace!("PATH_CHALLENGE {:08x}", token);
3577                buf.write(frame::FrameType::PATH_CHALLENGE);
3578                buf.write(token);
3579                self.stats.frame_tx.path_challenge += 1;
3580            }
3581            
3582            // TODO: Send NAT traversal PATH_CHALLENGE frames
3583            // Currently, the packet sending infrastructure only supports sending to the
3584            // primary path (self.path.remote). To properly support NAT traversal, we need
3585            // to modify poll_transmit and the packet building logic to generate packets
3586            // for multiple destination addresses. For now, NAT traversal challenges are
3587            // queued in self.nat_traversal_challenges but not yet sent.
3588            // This will be implemented in a future phase when we add multi-destination
3589            // packet support to the endpoint.
3590        }
3591
3592        // PATH_RESPONSE
3593        if buf.len() + 9 < max_size && space_id == SpaceId::Data {
3594            if let Some(token) = self.path_responses.pop_on_path(self.path.remote) {
3595                sent.non_retransmits = true;
3596                sent.requires_padding = true;
3597                trace!("PATH_RESPONSE {:08x}", token);
3598                buf.write(frame::FrameType::PATH_RESPONSE);
3599                buf.write(token);
3600                self.stats.frame_tx.path_response += 1;
3601            }
3602        }
3603
3604        // CRYPTO
3605        while buf.len() + frame::Crypto::SIZE_BOUND < max_size && !is_0rtt {
3606            let mut frame = match space.pending.crypto.pop_front() {
3607                Some(x) => x,
3608                None => break,
3609            };
3610
3611            // Calculate the maximum amount of crypto data we can store in the buffer.
3612            // Since the offset is known, we can reserve the exact size required to encode it.
3613            // For length we reserve 2bytes which allows to encode up to 2^14,
3614            // which is more than what fits into normally sized QUIC frames.
3615            let max_crypto_data_size = max_size
3616                - buf.len()
3617                - 1 // Frame Type
3618                - VarInt::size(unsafe { VarInt::from_u64_unchecked(frame.offset) })
3619                - 2; // Maximum encoded length for frame size, given we send less than 2^14 bytes
3620
3621            let len = frame
3622                .data
3623                .len()
3624                .min(2usize.pow(14) - 1)
3625                .min(max_crypto_data_size);
3626
3627            let data = frame.data.split_to(len);
3628            let truncated = frame::Crypto {
3629                offset: frame.offset,
3630                data,
3631            };
3632            trace!(
3633                "CRYPTO: off {} len {}",
3634                truncated.offset,
3635                truncated.data.len()
3636            );
3637            truncated.encode(buf);
3638            self.stats.frame_tx.crypto += 1;
3639            sent.retransmits.get_or_create().crypto.push_back(truncated);
3640            if !frame.data.is_empty() {
3641                frame.offset += len as u64;
3642                space.pending.crypto.push_front(frame);
3643            }
3644        }
3645
3646        if space_id == SpaceId::Data {
3647            self.streams.write_control_frames(
3648                buf,
3649                &mut space.pending,
3650                &mut sent.retransmits,
3651                &mut self.stats.frame_tx,
3652                max_size,
3653            );
3654        }
3655
3656        // NEW_CONNECTION_ID
3657        while buf.len() + 44 < max_size {
3658            let issued = match space.pending.new_cids.pop() {
3659                Some(x) => x,
3660                None => break,
3661            };
3662            trace!(
3663                sequence = issued.sequence,
3664                id = %issued.id,
3665                "NEW_CONNECTION_ID"
3666            );
3667            frame::NewConnectionId {
3668                sequence: issued.sequence,
3669                retire_prior_to: self.local_cid_state.retire_prior_to(),
3670                id: issued.id,
3671                reset_token: issued.reset_token,
3672            }
3673            .encode(buf);
3674            sent.retransmits.get_or_create().new_cids.push(issued);
3675            self.stats.frame_tx.new_connection_id += 1;
3676        }
3677
3678        // RETIRE_CONNECTION_ID
3679        while buf.len() + frame::RETIRE_CONNECTION_ID_SIZE_BOUND < max_size {
3680            let seq = match space.pending.retire_cids.pop() {
3681                Some(x) => x,
3682                None => break,
3683            };
3684            trace!(sequence = seq, "RETIRE_CONNECTION_ID");
3685            buf.write(frame::FrameType::RETIRE_CONNECTION_ID);
3686            buf.write_var(seq);
3687            sent.retransmits.get_or_create().retire_cids.push(seq);
3688            self.stats.frame_tx.retire_connection_id += 1;
3689        }
3690
3691        // DATAGRAM
3692        let mut sent_datagrams = false;
3693        while buf.len() + Datagram::SIZE_BOUND < max_size && space_id == SpaceId::Data {
3694            match self.datagrams.write(buf, max_size) {
3695                true => {
3696                    sent_datagrams = true;
3697                    sent.non_retransmits = true;
3698                    self.stats.frame_tx.datagram += 1;
3699                }
3700                false => break,
3701            }
3702        }
3703        if self.datagrams.send_blocked && sent_datagrams {
3704            self.events.push_back(Event::DatagramsUnblocked);
3705            self.datagrams.send_blocked = false;
3706        }
3707
3708        // NEW_TOKEN
3709        while let Some(remote_addr) = space.pending.new_tokens.pop() {
3710            debug_assert_eq!(space_id, SpaceId::Data);
3711            let ConnectionSide::Server { server_config } = &self.side else {
3712                panic!("NEW_TOKEN frames should not be enqueued by clients");
3713            };
3714
3715            if remote_addr != self.path.remote {
3716                // NEW_TOKEN frames contain tokens bound to a client's IP address, and are only
3717                // useful if used from the same IP address.  Thus, we abandon enqueued NEW_TOKEN
3718                // frames upon an path change. Instead, when the new path becomes validated,
3719                // NEW_TOKEN frames may be enqueued for the new path instead.
3720                continue;
3721            }
3722
3723            let token = Token::new(
3724                TokenPayload::Validation {
3725                    ip: remote_addr.ip(),
3726                    issued: server_config.time_source.now(),
3727                },
3728                &mut self.rng,
3729            );
3730            let new_token = NewToken {
3731                token: token.encode(&*server_config.token_key).into(),
3732            };
3733
3734            if buf.len() + new_token.size() >= max_size {
3735                space.pending.new_tokens.push(remote_addr);
3736                break;
3737            }
3738
3739            new_token.encode(buf);
3740            sent.retransmits
3741                .get_or_create()
3742                .new_tokens
3743                .push(remote_addr);
3744            self.stats.frame_tx.new_token += 1;
3745        }
3746
3747        // NAT traversal frames - AddAddress
3748        while buf.len() + frame::AddAddress::SIZE_BOUND < max_size && space_id == SpaceId::Data {
3749            let add_address = match space.pending.add_addresses.pop() {
3750                Some(x) => x,
3751                None => break,
3752            };
3753            trace!(
3754                sequence = %add_address.sequence,
3755                address = %add_address.address,
3756                "ADD_ADDRESS"
3757            );
3758            add_address.encode(buf);
3759            sent.retransmits.get_or_create().add_addresses.push(add_address);
3760            self.stats.frame_tx.add_address += 1;
3761        }
3762
3763        // NAT traversal frames - PunchMeNow
3764        while buf.len() + frame::PunchMeNow::SIZE_BOUND < max_size && space_id == SpaceId::Data {
3765            let punch_me_now = match space.pending.punch_me_now.pop() {
3766                Some(x) => x,
3767                None => break,
3768            };
3769            trace!(
3770                round = %punch_me_now.round,
3771                target_sequence = %punch_me_now.target_sequence,
3772                "PUNCH_ME_NOW"
3773            );
3774            punch_me_now.encode(buf);
3775            sent.retransmits.get_or_create().punch_me_now.push(punch_me_now);
3776            self.stats.frame_tx.punch_me_now += 1;
3777        }
3778
3779        // NAT traversal frames - RemoveAddress
3780        while buf.len() + frame::RemoveAddress::SIZE_BOUND < max_size && space_id == SpaceId::Data {
3781            let remove_address = match space.pending.remove_addresses.pop() {
3782                Some(x) => x,
3783                None => break,
3784            };
3785            trace!(
3786                sequence = %remove_address.sequence,
3787                "REMOVE_ADDRESS"
3788            );
3789            remove_address.encode(buf);
3790            sent.retransmits.get_or_create().remove_addresses.push(remove_address);
3791            self.stats.frame_tx.remove_address += 1;
3792        }
3793
3794        // STREAM
3795        if space_id == SpaceId::Data {
3796            sent.stream_frames =
3797                self.streams
3798                    .write_stream_frames(buf, max_size, self.config.send_fairness);
3799            self.stats.frame_tx.stream += sent.stream_frames.len() as u64;
3800        }
3801
3802        sent
3803    }
3804
3805    /// Write pending ACKs into a buffer
3806    ///
3807    /// This method assumes ACKs are pending, and should only be called if
3808    /// `!PendingAcks::ranges().is_empty()` returns `true`.
3809    fn populate_acks(
3810        now: Instant,
3811        receiving_ecn: bool,
3812        sent: &mut SentFrames,
3813        space: &mut PacketSpace,
3814        buf: &mut Vec<u8>,
3815        stats: &mut ConnectionStats,
3816    ) {
3817        debug_assert!(!space.pending_acks.ranges().is_empty());
3818
3819        // 0-RTT packets must never carry acks (which would have to be of handshake packets)
3820        debug_assert!(space.crypto.is_some(), "tried to send ACK in 0-RTT");
3821        let ecn = if receiving_ecn {
3822            Some(&space.ecn_counters)
3823        } else {
3824            None
3825        };
3826        sent.largest_acked = space.pending_acks.ranges().max();
3827
3828        let delay_micros = space.pending_acks.ack_delay(now).as_micros() as u64;
3829
3830        // TODO: This should come from `TransportConfig` if that gets configurable.
3831        let ack_delay_exp = TransportParameters::default().ack_delay_exponent;
3832        let delay = delay_micros >> ack_delay_exp.into_inner();
3833
3834        trace!(
3835            "ACK {:?}, Delay = {}us",
3836            space.pending_acks.ranges(),
3837            delay_micros
3838        );
3839
3840        frame::Ack::encode(delay as _, space.pending_acks.ranges(), ecn, buf);
3841        stats.frame_tx.acks += 1;
3842    }
3843
3844    fn close_common(&mut self) {
3845        trace!("connection closed");
3846        for &timer in &Timer::VALUES {
3847            self.timers.stop(timer);
3848        }
3849    }
3850
3851    fn set_close_timer(&mut self, now: Instant) {
3852        self.timers
3853            .set(Timer::Close, now + 3 * self.pto(self.highest_space));
3854    }
3855
3856    /// Handle transport parameters received from the peer
3857    fn handle_peer_params(&mut self, params: TransportParameters) -> Result<(), TransportError> {
3858        if Some(self.orig_rem_cid) != params.initial_src_cid
3859            || (self.side.is_client()
3860                && (Some(self.initial_dst_cid) != params.original_dst_cid
3861                    || self.retry_src_cid != params.retry_src_cid))
3862        {
3863            return Err(TransportError::TRANSPORT_PARAMETER_ERROR(
3864                "CID authentication failure",
3865            ));
3866        }
3867
3868        self.set_peer_params(params);
3869
3870        Ok(())
3871    }
3872
3873    fn set_peer_params(&mut self, params: TransportParameters) {
3874        self.streams.set_params(&params);
3875        self.idle_timeout =
3876            negotiate_max_idle_timeout(self.config.max_idle_timeout, Some(params.max_idle_timeout));
3877        trace!("negotiated max idle timeout {:?}", self.idle_timeout);
3878        if let Some(ref info) = params.preferred_address {
3879            self.rem_cids.insert(frame::NewConnectionId {
3880                sequence: 1,
3881                id: info.connection_id,
3882                reset_token: info.stateless_reset_token,
3883                retire_prior_to: 0,
3884            }).expect("preferred address CID is the first received, and hence is guaranteed to be legal");
3885        }
3886        self.ack_frequency.peer_max_ack_delay = get_max_ack_delay(&params);
3887        
3888        // Handle NAT traversal capability negotiation
3889        self.negotiate_nat_traversal_capability(&params);
3890        
3891        self.peer_params = params;
3892        self.path.mtud.on_peer_max_udp_payload_size_received(
3893            u16::try_from(self.peer_params.max_udp_payload_size.into_inner()).unwrap_or(u16::MAX),
3894        );
3895    }
3896
3897    /// Negotiate NAT traversal capability between local and peer configurations
3898    fn negotiate_nat_traversal_capability(&mut self, params: &TransportParameters) {
3899        // Check if peer supports NAT traversal
3900        let peer_nat_config = match &params.nat_traversal {
3901            Some(config) => config,
3902            None => {
3903                // Peer doesn't support NAT traversal - handle backward compatibility
3904                if self.config.nat_traversal_config.is_some() {
3905                    debug!("Peer does not support NAT traversal, maintaining backward compatibility");
3906                    self.emit_nat_traversal_capability_event(false);
3907                    
3908                    // Set connection state to indicate NAT traversal is not available
3909                    self.set_nat_traversal_compatibility_mode(false);
3910                }
3911                return;
3912            }
3913        };
3914
3915        // Check if we support NAT traversal locally
3916        let local_nat_config = match &self.config.nat_traversal_config {
3917            Some(config) => config,
3918            None => {
3919                debug!("NAT traversal not enabled locally, ignoring peer support");
3920                self.emit_nat_traversal_capability_event(false);
3921                self.set_nat_traversal_compatibility_mode(false);
3922                return;
3923            }
3924        };
3925
3926        // Both peers support NAT traversal - proceed with capability negotiation
3927        info!("Both peers support NAT traversal, negotiating capabilities");
3928        
3929        // Validate role compatibility and negotiate parameters
3930        match self.negotiate_nat_traversal_parameters(local_nat_config, peer_nat_config) {
3931            Ok(negotiated_config) => {
3932                info!("NAT traversal capability negotiated successfully");
3933                self.emit_nat_traversal_capability_event(true);
3934                
3935                // Initialize NAT traversal with negotiated parameters
3936                self.init_nat_traversal_with_negotiated_config(&negotiated_config);
3937                
3938                // Set connection state to indicate NAT traversal is available
3939                self.set_nat_traversal_compatibility_mode(true);
3940                
3941                // Start NAT traversal process if we're in a client role
3942                if matches!(negotiated_config.role, crate::transport_parameters::NatTraversalRole::Client) {
3943                    self.initiate_nat_traversal_process();
3944                }
3945            }
3946            Err(e) => {
3947                warn!("NAT traversal capability negotiation failed: {}", e);
3948                self.emit_nat_traversal_capability_event(false);
3949                self.set_nat_traversal_compatibility_mode(false);
3950            }
3951        }
3952    }
3953
3954    /// Validate that NAT traversal roles are compatible
3955    fn validate_nat_traversal_roles(
3956        &self,
3957        local_config: &crate::transport_parameters::NatTraversalConfig,
3958        peer_config: &crate::transport_parameters::NatTraversalConfig,
3959    ) -> Result<(), String> {
3960
3961        // Check for invalid role combinations
3962        match (&local_config.role, &peer_config.role) {
3963            // Both bootstrap nodes - this is unusual but allowed
3964            (crate::transport_parameters::NatTraversalRole::Bootstrap, crate::transport_parameters::NatTraversalRole::Bootstrap) => {
3965                debug!("Both endpoints are bootstrap nodes - unusual but allowed");
3966            }
3967            // Client-Server combinations are ideal
3968            (crate::transport_parameters::NatTraversalRole::Client, crate::transport_parameters::NatTraversalRole::Server { .. }) | 
3969            (crate::transport_parameters::NatTraversalRole::Server { .. }, crate::transport_parameters::NatTraversalRole::Client) => {
3970                debug!("Client-Server NAT traversal role combination");
3971            }
3972            // Bootstrap can coordinate with anyone
3973            (crate::transport_parameters::NatTraversalRole::Bootstrap, _) | (_, crate::transport_parameters::NatTraversalRole::Bootstrap) => {
3974                debug!("Bootstrap node coordination");
3975            }
3976            // Client-Client requires bootstrap coordination
3977            (crate::transport_parameters::NatTraversalRole::Client, crate::transport_parameters::NatTraversalRole::Client) => {
3978                debug!("Client-Client connection requires bootstrap coordination");
3979            }
3980            // Server-Server is allowed but may need coordination
3981            (crate::transport_parameters::NatTraversalRole::Server { .. }, crate::transport_parameters::NatTraversalRole::Server { .. }) => {
3982                debug!("Server-Server connection");
3983            }
3984        }
3985
3986        Ok(())
3987    }
3988
3989    /// Emit NAT traversal capability negotiation event
3990    fn emit_nat_traversal_capability_event(&mut self, negotiated: bool) {
3991        // For now, we'll just log the event
3992        // In a full implementation, this could emit an event that applications can listen to
3993        if negotiated {
3994            info!("NAT traversal capability successfully negotiated");
3995        } else {
3996            info!("NAT traversal capability not available (peer or local support missing)");
3997        }
3998        
3999        // Could add to events queue if needed:
4000        // self.events.push_back(Event::NatTraversalCapability { negotiated });
4001    }
4002
4003    /// Set NAT traversal compatibility mode for backward compatibility
4004    fn set_nat_traversal_compatibility_mode(&mut self, enabled: bool) {
4005        if enabled {
4006            debug!("NAT traversal enabled for this connection");
4007            // Connection supports NAT traversal - no special handling needed
4008        } else {
4009            debug!("NAT traversal disabled for this connection (backward compatibility mode)");
4010            // Ensure NAT traversal state is cleared if it was partially initialized
4011            if self.nat_traversal.is_some() {
4012                warn!("Clearing NAT traversal state due to compatibility mode");
4013                self.nat_traversal = None;
4014            }
4015        }
4016    }
4017
4018    /// Negotiate NAT traversal parameters between local and peer configurations
4019    fn negotiate_nat_traversal_parameters(
4020        &self,
4021        local_config: &crate::transport_parameters::NatTraversalConfig,
4022        peer_config: &crate::transport_parameters::NatTraversalConfig,
4023    ) -> Result<crate::transport_parameters::NatTraversalConfig, String> {
4024
4025        // Validate role compatibility first
4026        self.validate_nat_traversal_roles(local_config, peer_config)?;
4027
4028        // Use local role as the basis for negotiation
4029        let negotiated_role = local_config.role;
4030
4031        // Negotiate parameters using the more restrictive values
4032        let max_candidates = local_config.max_candidates.into_inner()
4033            .min(peer_config.max_candidates.into_inner())
4034            .min(50); // Cap at reasonable limit
4035
4036        let coordination_timeout = local_config.coordination_timeout.into_inner()
4037            .min(peer_config.coordination_timeout.into_inner())
4038            .min(30000); // Cap at 30 seconds
4039
4040        let max_concurrent_attempts = local_config.max_concurrent_attempts.into_inner()
4041            .min(peer_config.max_concurrent_attempts.into_inner())
4042            .min(10); // Cap at reasonable limit
4043
4044        // Use local peer ID if available, otherwise use peer's
4045        let peer_id = local_config.peer_id.or(peer_config.peer_id);
4046
4047        let negotiated_config = crate::transport_parameters::NatTraversalConfig {
4048            role: negotiated_role,
4049            max_candidates: VarInt::from_u64(max_candidates).unwrap(),
4050            coordination_timeout: VarInt::from_u64(coordination_timeout).unwrap(),
4051            max_concurrent_attempts: VarInt::from_u64(max_concurrent_attempts).unwrap(),
4052            peer_id,
4053        };
4054
4055        // Validate the negotiated configuration
4056        negotiated_config.validate()
4057            .map_err(|e| format!("Negotiated configuration validation failed: {:?}", e))?;
4058
4059        debug!(
4060            "NAT traversal parameters negotiated: role={:?}, max_candidates={}, timeout={}ms, max_attempts={}",
4061            negotiated_role, max_candidates, coordination_timeout, max_concurrent_attempts
4062        );
4063
4064        Ok(negotiated_config)
4065    }
4066
4067    /// Initialize NAT traversal with negotiated configuration
4068    fn init_nat_traversal_with_negotiated_config(&mut self, config: &crate::transport_parameters::NatTraversalConfig) {
4069        
4070        // Convert transport parameter role to internal role
4071        let role = match config.role {
4072            crate::transport_parameters::NatTraversalRole::Client => NatTraversalRole::Client,
4073            crate::transport_parameters::NatTraversalRole::Server { can_relay } => NatTraversalRole::Server { can_relay },
4074            crate::transport_parameters::NatTraversalRole::Bootstrap => NatTraversalRole::Bootstrap,
4075        };
4076
4077        let max_candidates = config.max_candidates.into_inner().min(50) as u32;
4078        let coordination_timeout = Duration::from_millis(
4079            config.coordination_timeout.into_inner().min(30000)
4080        );
4081
4082        // Initialize NAT traversal state
4083        self.nat_traversal = Some(NatTraversalState::new(
4084            role,
4085            max_candidates,
4086            coordination_timeout,
4087        ));
4088
4089        trace!("NAT traversal initialized with negotiated config: role={:?}", role);
4090        
4091        // Perform role-specific initialization
4092        match role {
4093            NatTraversalRole::Bootstrap => {
4094                // Bootstrap nodes should be ready to observe addresses
4095                self.prepare_address_observation();
4096            }
4097            NatTraversalRole::Client => {
4098                // Clients should start candidate discovery
4099                self.schedule_candidate_discovery();
4100            }
4101            NatTraversalRole::Server { .. } => {
4102                // Servers should be ready to accept coordination requests
4103                self.prepare_coordination_handling();
4104            }
4105        }
4106    }
4107
4108    /// Initiate NAT traversal process for client endpoints
4109    fn initiate_nat_traversal_process(&mut self) {
4110        if let Some(nat_state) = &mut self.nat_traversal {
4111            match nat_state.start_candidate_discovery() {
4112                Ok(()) => {
4113                    debug!("NAT traversal process initiated - candidate discovery started");
4114                    // Schedule the first coordination attempt
4115                    self.timers.set(Timer::NatTraversal, Instant::now() + Duration::from_millis(100));
4116                }
4117                Err(e) => {
4118                    warn!("Failed to initiate NAT traversal process: {}", e);
4119                }
4120            }
4121        }
4122    }
4123
4124    /// Prepare for address observation (bootstrap nodes)
4125    fn prepare_address_observation(&mut self) {
4126        debug!("Preparing for address observation as bootstrap node");
4127        // Bootstrap nodes are ready to observe peer addresses immediately
4128        // No additional setup needed - observation happens during connection establishment
4129    }
4130
4131    /// Schedule candidate discovery for later execution
4132    fn schedule_candidate_discovery(&mut self) {
4133        debug!("Scheduling candidate discovery for client endpoint");
4134        // Set a timer to start candidate discovery after connection establishment
4135        self.timers.set(Timer::NatTraversal, Instant::now() + Duration::from_millis(50));
4136    }
4137
4138    /// Prepare to handle coordination requests (server nodes)
4139    fn prepare_coordination_handling(&mut self) {
4140        debug!("Preparing to handle coordination requests as server endpoint");
4141        // Server nodes are ready to handle coordination requests immediately
4142        // No additional setup needed - coordination happens via frame processing
4143    }
4144
4145    /// Handle NAT traversal timeout events
4146    fn handle_nat_traversal_timeout(&mut self, now: Instant) {
4147        // First get the actions from nat_state
4148        let timeout_result = if let Some(nat_state) = &mut self.nat_traversal {
4149            nat_state.handle_timeout(now)
4150        } else {
4151            return;
4152        };
4153
4154        // Then handle the actions without holding a mutable borrow to nat_state
4155        match timeout_result {
4156            Ok(actions) => {
4157                for action in actions {
4158                    match action {
4159                        nat_traversal::TimeoutAction::RetryDiscovery => {
4160                            debug!("NAT traversal timeout: retrying candidate discovery");
4161                            if let Some(nat_state) = &mut self.nat_traversal {
4162                                if let Err(e) = nat_state.start_candidate_discovery() {
4163                                    warn!("Failed to retry candidate discovery: {}", e);
4164                                }
4165                            }
4166                        }
4167                        nat_traversal::TimeoutAction::RetryCoordination => {
4168                            debug!("NAT traversal timeout: retrying coordination");
4169                            // Schedule next coordination attempt
4170                            self.timers.set(Timer::NatTraversal, now + Duration::from_secs(2));
4171                        }
4172                        nat_traversal::TimeoutAction::StartValidation => {
4173                            debug!("NAT traversal timeout: starting path validation");
4174                            self.start_nat_traversal_validation(now);
4175                        }
4176                        nat_traversal::TimeoutAction::Complete => {
4177                            debug!("NAT traversal completed successfully");
4178                            // NAT traversal is complete, no more timeouts needed
4179                            self.timers.stop(Timer::NatTraversal);
4180                        }
4181                        nat_traversal::TimeoutAction::Failed => {
4182                            warn!("NAT traversal failed after timeout");
4183                            // Consider fallback options or connection failure
4184                            self.handle_nat_traversal_failure();
4185                        }
4186                    }
4187                }
4188            }
4189            Err(e) => {
4190                warn!("NAT traversal timeout handling failed: {}", e);
4191                self.handle_nat_traversal_failure();
4192            }
4193        }
4194    }
4195
4196    /// Start NAT traversal path validation
4197    fn start_nat_traversal_validation(&mut self, now: Instant) {
4198        if let Some(nat_state) = &mut self.nat_traversal {
4199            // Get candidate pairs that need validation
4200            let pairs = nat_state.get_next_validation_pairs(3);
4201            
4202            for pair in pairs {
4203                // Send PATH_CHALLENGE to validate the path
4204                let challenge = self.rng.gen();
4205                self.path.challenge = Some(challenge);
4206                self.path.challenge_pending = true;
4207                
4208                debug!("Starting path validation for NAT traversal candidate: {}", pair.remote_addr);
4209            }
4210            
4211            // Set validation timeout
4212            self.timers.set(Timer::PathValidation, now + Duration::from_secs(3));
4213        }
4214    }
4215
4216    /// Handle NAT traversal failure
4217    fn handle_nat_traversal_failure(&mut self) {
4218        warn!("NAT traversal failed, considering fallback options");
4219        
4220        // Clear NAT traversal state
4221        self.nat_traversal = None;
4222        self.timers.stop(Timer::NatTraversal);
4223        
4224        // In a full implementation, this could:
4225        // 1. Try relay connections
4226        // 2. Emit failure events to the application
4227        // 3. Attempt direct connection as fallback
4228        
4229        // For now, we'll just log the failure
4230        debug!("NAT traversal disabled for this connection due to failure");
4231    }
4232
4233    /// Check if NAT traversal is supported and enabled for this connection
4234    pub fn nat_traversal_supported(&self) -> bool {
4235        self.nat_traversal.is_some() && 
4236        self.config.nat_traversal_config.is_some() &&
4237        self.peer_params.nat_traversal.is_some()
4238    }
4239
4240    /// Get the negotiated NAT traversal configuration
4241    pub fn nat_traversal_config(&self) -> Option<&crate::transport_parameters::NatTraversalConfig> {
4242        self.peer_params.nat_traversal.as_ref()
4243    }
4244
4245    /// Check if the connection is ready for NAT traversal operations
4246    pub fn nat_traversal_ready(&self) -> bool {
4247        self.nat_traversal_supported() && 
4248        matches!(self.state, State::Established)
4249    }
4250
4251    /// Get NAT traversal statistics for this connection
4252    /// 
4253    /// This method is preserved for debugging and monitoring purposes.
4254    /// It may be used in future telemetry or diagnostic features.
4255    #[allow(dead_code)]
4256    pub(crate) fn nat_traversal_stats(&self) -> Option<nat_traversal::NatTraversalStats> {
4257        self.nat_traversal.as_ref().map(|state| state.stats.clone())
4258    }
4259
4260    /// Force enable NAT traversal for testing purposes
4261    #[cfg(test)]
4262    pub(crate) fn force_enable_nat_traversal(&mut self, role: NatTraversalRole) {
4263        use crate::transport_parameters::{NatTraversalConfig, NatTraversalRole as TPRole};
4264        
4265        let tp_role = match role {
4266            NatTraversalRole::Client => TPRole::Client,
4267            NatTraversalRole::Server { can_relay } => TPRole::Server { can_relay },
4268            NatTraversalRole::Bootstrap => TPRole::Bootstrap,
4269        };
4270        
4271        let config = NatTraversalConfig {
4272            role: tp_role,
4273            max_candidates: VarInt::from_u32(8),
4274            coordination_timeout: VarInt::from_u32(10000),
4275            max_concurrent_attempts: VarInt::from_u32(3),
4276            peer_id: None,
4277        };
4278        
4279        self.peer_params.nat_traversal = Some(config.clone());
4280        self.config = Arc::new({
4281            let mut transport_config = (*self.config).clone();
4282            transport_config.nat_traversal_config = Some(config);
4283            transport_config
4284        });
4285        
4286        self.nat_traversal = Some(NatTraversalState::new(
4287            role,
4288            8,
4289            Duration::from_secs(10),
4290        ));
4291    }
4292
4293
4294
4295
4296
4297    /// Queue an ADD_ADDRESS frame to be sent to the peer
4298    /// Derive peer ID from connection context
4299    fn derive_peer_id_from_connection(&self) -> [u8; 32] {
4300        // Generate a peer ID based on connection IDs
4301        let mut hasher = std::collections::hash_map::DefaultHasher::new();
4302        use std::hash::Hasher;
4303        hasher.write(&self.rem_handshake_cid);
4304        hasher.write(&self.handshake_cid);
4305        hasher.write(&self.path.remote.to_string().into_bytes());
4306        let hash = hasher.finish();
4307        let mut peer_id = [0u8; 32];
4308        peer_id[..8].copy_from_slice(&hash.to_be_bytes());
4309        // Fill remaining bytes with connection ID data
4310        let cid_bytes = self.rem_handshake_cid.as_ref();
4311        let copy_len = (cid_bytes.len()).min(24);
4312        peer_id[8..8+copy_len].copy_from_slice(&cid_bytes[..copy_len]);
4313        peer_id
4314    }
4315
4316    /// Handle AddAddress frame from peer
4317    fn handle_add_address(
4318        &mut self,
4319        add_address: &crate::frame::AddAddress,
4320        now: Instant,
4321    ) -> Result<(), TransportError> {
4322        let nat_state = self.nat_traversal.as_mut()
4323            .ok_or_else(|| TransportError::PROTOCOL_VIOLATION("AddAddress frame without NAT traversal negotiation"))?;
4324
4325        match nat_state.add_remote_candidate(
4326            add_address.sequence,
4327            add_address.address,
4328            add_address.priority,
4329            now,
4330        ) {
4331            Ok(()) => {
4332                trace!("Added remote candidate: {} (seq={}, priority={})", 
4333                       add_address.address, add_address.sequence, add_address.priority);
4334                
4335                // Trigger validation of this new candidate
4336                self.trigger_candidate_validation(add_address.address, now)?;
4337                Ok(())
4338            }
4339            Err(NatTraversalError::TooManyCandidates) => {
4340                Err(TransportError::PROTOCOL_VIOLATION("too many NAT traversal candidates"))
4341            }
4342            Err(NatTraversalError::DuplicateAddress) => {
4343                // Silently ignore duplicates (peer may resend)
4344                Ok(())
4345            }
4346            Err(e) => {
4347                warn!("Failed to add remote candidate: {}", e);
4348                Ok(()) // Don't terminate connection for non-critical errors
4349            }
4350        }
4351    }
4352
4353    /// Handle PunchMeNow frame from peer (via coordinator)
4354    fn handle_punch_me_now(
4355        &mut self,
4356        punch_me_now: &crate::frame::PunchMeNow,
4357        now: Instant,
4358    ) -> Result<(), TransportError> {
4359        trace!("Received PunchMeNow: round={}, target_seq={}, local_addr={}", 
4360               punch_me_now.round, punch_me_now.target_sequence, punch_me_now.local_address);
4361
4362        // Check if we're a bootstrap node that should coordinate this
4363        if let Some(nat_state) = &self.nat_traversal {
4364            if matches!(nat_state.role, NatTraversalRole::Bootstrap) {
4365                // We're a bootstrap node - process coordination request
4366                let from_peer_id = self.derive_peer_id_from_connection();
4367                
4368                // Clone the frame to avoid borrow checker issues
4369                let punch_me_now_clone = punch_me_now.clone();
4370                drop(nat_state); // Release the borrow
4371                
4372                match self.nat_traversal.as_mut().unwrap().handle_punch_me_now_frame(from_peer_id, self.path.remote, &punch_me_now_clone, now) {
4373                    Ok(Some(coordination_frame)) => {
4374                        trace!("Bootstrap node coordinating PUNCH_ME_NOW between peers");
4375                        
4376                        // Send coordination frame to target peer via endpoint
4377                        if let Some(target_peer_id) = punch_me_now.target_peer_id {
4378                            self.endpoint_events.push_back(
4379                                crate::shared::EndpointEventInner::RelayPunchMeNow(target_peer_id, coordination_frame)
4380                            );
4381                        }
4382                        
4383                        return Ok(());
4384                    }
4385                    Ok(None) => {
4386                        trace!("Bootstrap coordination completed or no action needed");
4387                        return Ok(());
4388                    }
4389                    Err(e) => {
4390                        warn!("Bootstrap coordination failed: {}", e);
4391                        return Ok(());
4392                    }
4393                }
4394            }
4395        }
4396
4397        // We're a regular peer receiving coordination from bootstrap
4398        let nat_state = self.nat_traversal.as_mut()
4399            .ok_or_else(|| TransportError::PROTOCOL_VIOLATION("PunchMeNow frame without NAT traversal negotiation"))?;
4400        
4401        // Handle peer's coordination request
4402        if nat_state.handle_peer_punch_request(punch_me_now.round, now)
4403            .map_err(|_e| TransportError::PROTOCOL_VIOLATION("Failed to handle peer punch request"))? {
4404            trace!("Coordination synchronized for round {}", punch_me_now.round);
4405            
4406            // Create punch targets based on the received information
4407            // The peer's local_address tells us where they'll be listening
4408            let _local_addr = self.local_ip
4409                .map(|ip| SocketAddr::new(ip, 0))
4410                .unwrap_or_else(|| SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED), 0));
4411            
4412            let target = nat_traversal::PunchTarget {
4413                remote_addr: punch_me_now.local_address,
4414                remote_sequence: punch_me_now.target_sequence,
4415                challenge: self.rng.gen(),
4416            };
4417            
4418            // Start coordination with this target
4419            nat_state.start_coordination_round(vec![target], now);
4420            
4421        } else {
4422            debug!("Failed to synchronize coordination for round {}", punch_me_now.round);
4423        }
4424        
4425        Ok(())
4426    }
4427
4428    /// Handle RemoveAddress frame from peer
4429    fn handle_remove_address(
4430        &mut self,
4431        remove_address: &crate::frame::RemoveAddress,
4432    ) -> Result<(), TransportError> {
4433        let nat_state = self.nat_traversal.as_mut()
4434            .ok_or_else(|| TransportError::PROTOCOL_VIOLATION("RemoveAddress frame without NAT traversal negotiation"))?;
4435
4436        if nat_state.remove_candidate(remove_address.sequence) {
4437            trace!("Removed candidate with sequence {}", remove_address.sequence);
4438        } else {
4439            trace!("Attempted to remove unknown candidate sequence {}", remove_address.sequence);
4440        }
4441
4442        Ok(())
4443    }
4444
4445    /// Queue an AddAddress frame to advertise a new candidate address
4446    pub fn queue_add_address(&mut self, sequence: VarInt, address: SocketAddr, priority: VarInt) {
4447        // Queue the AddAddress frame
4448        let add_address = frame::AddAddress {
4449            sequence,
4450            address,
4451            priority,
4452        };
4453        
4454        self.spaces[SpaceId::Data].pending.add_addresses.push(add_address);
4455        trace!("Queued AddAddress frame: seq={}, addr={}, priority={}", sequence, address, priority);
4456    }
4457
4458    /// Queue a PunchMeNow frame to coordinate NAT traversal
4459    pub fn queue_punch_me_now(&mut self, round: VarInt, target_sequence: VarInt, local_address: SocketAddr) {
4460        let punch_me_now = frame::PunchMeNow {
4461            round,
4462            target_sequence,
4463            local_address,
4464            target_peer_id: None, // Direct peer-to-peer communication
4465        };
4466        
4467        self.spaces[SpaceId::Data].pending.punch_me_now.push(punch_me_now);
4468        trace!("Queued PunchMeNow frame: round={}, target={}", round, target_sequence);
4469    }
4470
4471    /// Queue a RemoveAddress frame to remove a candidate
4472    pub fn queue_remove_address(&mut self, sequence: VarInt) {
4473        let remove_address = frame::RemoveAddress { sequence };
4474        
4475        self.spaces[SpaceId::Data].pending.remove_addresses.push(remove_address);
4476        trace!("Queued RemoveAddress frame: seq={}", sequence);
4477    }
4478
4479    /// Trigger validation of a candidate address using PATH_CHALLENGE
4480    fn trigger_candidate_validation(
4481        &mut self,
4482        candidate_address: SocketAddr,
4483        now: Instant,
4484    ) -> Result<(), TransportError> {
4485        let nat_state = self.nat_traversal.as_mut()
4486            .ok_or_else(|| TransportError::PROTOCOL_VIOLATION("NAT traversal not enabled"))?;
4487        
4488        // Check if we already have an active validation for this address
4489        if nat_state.active_validations.contains_key(&candidate_address) {
4490            trace!("Validation already in progress for {}", candidate_address);
4491            return Ok(());
4492        }
4493        
4494        // Generate a random challenge value
4495        let challenge = self.rng.gen::<u64>();
4496        
4497        // Create path validation state
4498        let validation_state = nat_traversal::PathValidationState {
4499            challenge,
4500            sent_at: now,
4501            retry_count: 0,
4502            max_retries: 3,
4503            coordination_round: None,
4504            timeout_state: nat_traversal::AdaptiveTimeoutState::new(),
4505            last_retry_at: None,
4506        };
4507        
4508        // Store the validation attempt
4509        nat_state.active_validations.insert(candidate_address, validation_state);
4510        
4511        // Queue PATH_CHALLENGE frame to be sent to the candidate address
4512        self.nat_traversal_challenges.push(candidate_address, challenge);
4513        
4514        // Update statistics
4515        nat_state.stats.validations_succeeded += 1; // Will be decremented if validation fails
4516        
4517        trace!("Triggered PATH_CHALLENGE validation for {} with challenge {:016x}", 
4518               candidate_address, challenge);
4519        
4520        Ok(())
4521    }
4522    
4523
4524
4525
4526    /// Get current NAT traversal state information
4527    pub fn nat_traversal_state(&self) -> Option<(NatTraversalRole, usize, usize)> {
4528        self.nat_traversal.as_ref().map(|state| {
4529            (
4530                state.role,
4531                state.local_candidates.len(),
4532                state.remote_candidates.len(),
4533            )
4534        })
4535    }
4536    
4537    /// Initiate NAT traversal coordination through a bootstrap node
4538    pub fn initiate_nat_traversal_coordination(&mut self, now: Instant) -> Result<(), TransportError> {
4539        let nat_state = self.nat_traversal.as_mut()
4540            .ok_or_else(|| TransportError::PROTOCOL_VIOLATION("NAT traversal not enabled"))?;
4541        
4542        // Check if we should send PUNCH_ME_NOW to coordinator
4543        if nat_state.should_send_punch_request() {
4544            // Generate candidate pairs for coordination
4545            nat_state.generate_candidate_pairs(now);
4546            
4547            // Get the best candidate pairs to try
4548            let pairs = nat_state.get_next_validation_pairs(3);
4549            if pairs.is_empty() {
4550                return Err(TransportError::PROTOCOL_VIOLATION("No candidate pairs for coordination"));
4551            }
4552            
4553            // Create punch targets from the pairs
4554            let targets: Vec<_> = pairs.into_iter()
4555                .map(|pair| nat_traversal::PunchTarget {
4556                    remote_addr: pair.remote_addr,
4557                    remote_sequence: pair.remote_sequence,
4558                    challenge: self.rng.gen(),
4559                })
4560                .collect();
4561            
4562            // Start coordination round
4563            let round = nat_state.start_coordination_round(targets, now)
4564                .map_err(|_e| TransportError::PROTOCOL_VIOLATION("Failed to start coordination round"))?;
4565            
4566            // Queue PUNCH_ME_NOW frame to be sent to bootstrap node
4567            // Include our best local address for the peer to target
4568            let local_addr = self.local_ip
4569                .map(|ip| SocketAddr::new(ip, self.local_ip.map(|_| 0).unwrap_or(0)))
4570                .unwrap_or_else(|| SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED), 0));
4571            
4572            let punch_me_now = frame::PunchMeNow {
4573                round,
4574                target_sequence: VarInt::from_u32(0), // Will be filled by bootstrap
4575                local_address: local_addr,
4576                target_peer_id: None, // Direct peer-to-peer communication
4577            };
4578            
4579            self.spaces[SpaceId::Data].pending.punch_me_now.push(punch_me_now);
4580            nat_state.mark_punch_request_sent();
4581            
4582            trace!("Initiated NAT traversal coordination round {}", round);
4583        }
4584        
4585        Ok(())
4586    }
4587
4588    /// Trigger validation of NAT traversal candidates using PATH_CHALLENGE
4589    pub fn validate_nat_candidates(&mut self, now: Instant) {
4590        self.generate_nat_traversal_challenges(now);
4591    }
4592
4593    // === PUBLIC NAT TRAVERSAL FRAME TRANSMISSION API ===
4594
4595    /// Send an ADD_ADDRESS frame to advertise a candidate address to the peer
4596    /// 
4597    /// This is the primary method for sending NAT traversal address advertisements.
4598    /// The frame will be transmitted in the next outgoing QUIC packet.
4599    ///
4600    /// # Arguments
4601    /// * `address` - The candidate address to advertise
4602    /// * `priority` - ICE-style priority for this candidate (higher = better)
4603    ///
4604    /// # Returns
4605    /// * `Ok(sequence)` - The sequence number assigned to this candidate
4606    /// * `Err(ConnectionError)` - If NAT traversal is not enabled or other error
4607    pub fn send_nat_address_advertisement(
4608        &mut self, 
4609        address: SocketAddr, 
4610        priority: u32
4611    ) -> Result<u64, ConnectionError> {
4612        // Verify NAT traversal is enabled
4613        let nat_state = self.nat_traversal.as_mut()
4614            .ok_or_else(|| ConnectionError::TransportError(TransportError::PROTOCOL_VIOLATION("NAT traversal not enabled on this connection")))?;
4615
4616        // Generate sequence number and add to local candidates
4617        let sequence = nat_state.next_sequence;
4618        nat_state.next_sequence = VarInt::from_u64(nat_state.next_sequence.into_inner() + 1).unwrap();
4619
4620        // Add to local candidates
4621        let now = Instant::now();
4622        nat_state.local_candidates.insert(
4623            sequence,
4624            nat_traversal::AddressCandidate {
4625                address,
4626                priority,
4627                source: nat_traversal::CandidateSource::Local,
4628                discovered_at: now,
4629                state: nat_traversal::CandidateState::New,
4630                attempt_count: 0,
4631                last_attempt: None,
4632            }
4633        );
4634
4635        // Update statistics
4636        nat_state.stats.local_candidates_sent += 1;
4637
4638        // Queue the frame for transmission (must be done after releasing nat_state borrow)
4639        self.queue_add_address(sequence, address, VarInt::from_u32(priority));
4640
4641        debug!("Queued ADD_ADDRESS frame: addr={}, priority={}, seq={}", address, priority, sequence);
4642        Ok(sequence.into_inner())
4643    }
4644
4645    /// Send a PUNCH_ME_NOW frame to coordinate hole punching with a peer
4646    ///
4647    /// This triggers synchronized hole punching for NAT traversal.
4648    ///
4649    /// # Arguments
4650    /// * `target_sequence` - Sequence number of the target candidate address
4651    /// * `local_address` - Our local address for the hole punching attempt
4652    /// * `round` - Coordination round number for synchronization
4653    ///
4654    /// # Returns
4655    /// * `Ok(())` - Frame queued for transmission
4656    /// * `Err(ConnectionError)` - If NAT traversal is not enabled
4657    pub fn send_nat_punch_coordination(
4658        &mut self,
4659        target_sequence: u64,
4660        local_address: SocketAddr,
4661        round: u32,
4662    ) -> Result<(), ConnectionError> {
4663        // Verify NAT traversal is enabled
4664        let _nat_state = self.nat_traversal.as_ref()
4665            .ok_or_else(|| ConnectionError::TransportError(TransportError::PROTOCOL_VIOLATION("NAT traversal not enabled on this connection")))?;
4666
4667        // Queue the frame for transmission
4668        self.queue_punch_me_now(
4669            VarInt::from_u32(round),
4670            VarInt::from_u64(target_sequence).map_err(|_| {
4671                ConnectionError::TransportError(TransportError::PROTOCOL_VIOLATION("Invalid target sequence number"))
4672            })?,
4673            local_address,
4674        );
4675
4676        debug!("Queued PUNCH_ME_NOW frame: target_seq={}, local_addr={}, round={}", 
4677               target_sequence, local_address, round);
4678        Ok(())
4679    }
4680
4681    /// Send a REMOVE_ADDRESS frame to remove a previously advertised candidate
4682    ///
4683    /// This removes a candidate address that is no longer valid or available.
4684    ///
4685    /// # Arguments
4686    /// * `sequence` - Sequence number of the candidate to remove
4687    ///
4688    /// # Returns
4689    /// * `Ok(())` - Frame queued for transmission
4690    /// * `Err(ConnectionError)` - If NAT traversal is not enabled
4691    pub fn send_nat_address_removal(
4692        &mut self,
4693        sequence: u64,
4694    ) -> Result<(), ConnectionError> {
4695        // Verify NAT traversal is enabled
4696        let nat_state = self.nat_traversal.as_mut()
4697            .ok_or_else(|| ConnectionError::TransportError(TransportError::PROTOCOL_VIOLATION("NAT traversal not enabled on this connection")))?;
4698
4699        let sequence_varint = VarInt::from_u64(sequence).map_err(|_| {
4700            ConnectionError::TransportError(TransportError::PROTOCOL_VIOLATION("Invalid sequence number"))
4701        })?;
4702
4703        // Remove from local candidates
4704        nat_state.local_candidates.remove(&sequence_varint);
4705
4706        // Queue the frame for transmission
4707        self.queue_remove_address(sequence_varint);
4708
4709        debug!("Queued REMOVE_ADDRESS frame: seq={}", sequence);
4710        Ok(())
4711    }
4712
4713    /// Get statistics about NAT traversal activity on this connection
4714    ///
4715    /// # Returns
4716    /// * `Some(stats)` - Current NAT traversal statistics
4717    /// * `None` - If NAT traversal is not enabled
4718    /// 
4719    /// This method is preserved for debugging and monitoring purposes.
4720    /// It may be used in future telemetry or diagnostic features.
4721    #[allow(dead_code)]
4722    pub(crate) fn get_nat_traversal_stats(&self) -> Option<&nat_traversal::NatTraversalStats> {
4723        self.nat_traversal.as_ref().map(|state| &state.stats)
4724    }
4725
4726    /// Check if NAT traversal is enabled and active on this connection
4727    pub fn is_nat_traversal_enabled(&self) -> bool {
4728        self.nat_traversal.is_some()
4729    }
4730
4731    /// Get the current NAT traversal role for this connection
4732    pub fn get_nat_traversal_role(&self) -> Option<NatTraversalRole> {
4733        self.nat_traversal.as_ref().map(|state| state.role)
4734    }
4735
4736
4737    fn decrypt_packet(
4738        &mut self,
4739        now: Instant,
4740        packet: &mut Packet,
4741    ) -> Result<Option<u64>, Option<TransportError>> {
4742        let result = packet_crypto::decrypt_packet_body(
4743            packet,
4744            &self.spaces,
4745            self.zero_rtt_crypto.as_ref(),
4746            self.key_phase,
4747            self.prev_crypto.as_ref(),
4748            self.next_crypto.as_ref(),
4749        )?;
4750
4751        let result = match result {
4752            Some(r) => r,
4753            None => return Ok(None),
4754        };
4755
4756        if result.outgoing_key_update_acked {
4757            if let Some(prev) = self.prev_crypto.as_mut() {
4758                prev.end_packet = Some((result.number, now));
4759                self.set_key_discard_timer(now, packet.header.space());
4760            }
4761        }
4762
4763        if result.incoming_key_update {
4764            trace!("key update authenticated");
4765            self.update_keys(Some((result.number, now)), true);
4766            self.set_key_discard_timer(now, packet.header.space());
4767        }
4768
4769        Ok(Some(result.number))
4770    }
4771
4772    fn update_keys(&mut self, end_packet: Option<(u64, Instant)>, remote: bool) {
4773        trace!("executing key update");
4774        // Generate keys for the key phase after the one we're switching to, store them in
4775        // `next_crypto`, make the contents of `next_crypto` current, and move the current keys into
4776        // `prev_crypto`.
4777        let new = self
4778            .crypto
4779            .next_1rtt_keys()
4780            .expect("only called for `Data` packets");
4781        self.key_phase_size = new
4782            .local
4783            .confidentiality_limit()
4784            .saturating_sub(KEY_UPDATE_MARGIN);
4785        let old = mem::replace(
4786            &mut self.spaces[SpaceId::Data]
4787                .crypto
4788                .as_mut()
4789                .unwrap() // safe because update_keys() can only be triggered by short packets
4790                .packet,
4791            mem::replace(self.next_crypto.as_mut().unwrap(), new),
4792        );
4793        self.spaces[SpaceId::Data].sent_with_keys = 0;
4794        self.prev_crypto = Some(PrevCrypto {
4795            crypto: old,
4796            end_packet,
4797            update_unacked: remote,
4798        });
4799        self.key_phase = !self.key_phase;
4800    }
4801
4802    fn peer_supports_ack_frequency(&self) -> bool {
4803        self.peer_params.min_ack_delay.is_some()
4804    }
4805
4806    /// Send an IMMEDIATE_ACK frame to the remote endpoint
4807    ///
4808    /// According to the spec, this will result in an error if the remote endpoint does not support
4809    /// the Acknowledgement Frequency extension
4810    pub(crate) fn immediate_ack(&mut self) {
4811        self.spaces[self.highest_space].immediate_ack_pending = true;
4812    }
4813
4814    /// Decodes a packet, returning its decrypted payload, so it can be inspected in tests
4815    #[cfg(test)]
4816    pub(crate) fn decode_packet(&self, event: &ConnectionEvent) -> Option<Vec<u8>> {
4817        let (first_decode, remaining) = match &event.0 {
4818            ConnectionEventInner::Datagram(DatagramConnectionEvent {
4819                first_decode,
4820                remaining,
4821                ..
4822            }) => (first_decode, remaining),
4823            _ => return None,
4824        };
4825
4826        if remaining.is_some() {
4827            panic!("Packets should never be coalesced in tests");
4828        }
4829
4830        let decrypted_header = packet_crypto::unprotect_header(
4831            first_decode.clone(),
4832            &self.spaces,
4833            self.zero_rtt_crypto.as_ref(),
4834            self.peer_params.stateless_reset_token,
4835        )?;
4836
4837        let mut packet = decrypted_header.packet?;
4838        packet_crypto::decrypt_packet_body(
4839            &mut packet,
4840            &self.spaces,
4841            self.zero_rtt_crypto.as_ref(),
4842            self.key_phase,
4843            self.prev_crypto.as_ref(),
4844            self.next_crypto.as_ref(),
4845        )
4846        .ok()?;
4847
4848        Some(packet.payload.to_vec())
4849    }
4850
4851    /// The number of bytes of packets containing retransmittable frames that have not been
4852    /// acknowledged or declared lost.
4853    #[cfg(test)]
4854    pub(crate) fn bytes_in_flight(&self) -> u64 {
4855        self.path.in_flight.bytes
4856    }
4857
4858    /// Number of bytes worth of non-ack-only packets that may be sent
4859    #[cfg(test)]
4860    pub(crate) fn congestion_window(&self) -> u64 {
4861        self.path
4862            .congestion
4863            .window()
4864            .saturating_sub(self.path.in_flight.bytes)
4865    }
4866
4867    /// Whether no timers but keepalive, idle, rtt, pushnewcid, and key discard are running
4868    #[cfg(test)]
4869    pub(crate) fn is_idle(&self) -> bool {
4870        Timer::VALUES
4871            .iter()
4872            .filter(|&&t| !matches!(t, Timer::KeepAlive | Timer::PushNewCid | Timer::KeyDiscard))
4873            .filter_map(|&t| Some((t, self.timers.get(t)?)))
4874            .min_by_key(|&(_, time)| time)
4875            .map_or(true, |(timer, _)| timer == Timer::Idle)
4876    }
4877
4878    /// Total number of outgoing packets that have been deemed lost
4879    #[cfg(test)]
4880    pub(crate) fn lost_packets(&self) -> u64 {
4881        self.lost_packets
4882    }
4883
4884    /// Whether explicit congestion notification is in use on outgoing packets.
4885    #[cfg(test)]
4886    pub(crate) fn using_ecn(&self) -> bool {
4887        self.path.sending_ecn
4888    }
4889
4890    /// The number of received bytes in the current path
4891    #[cfg(test)]
4892    pub(crate) fn total_recvd(&self) -> u64 {
4893        self.path.total_recvd
4894    }
4895
4896    #[cfg(test)]
4897    pub(crate) fn active_local_cid_seq(&self) -> (u64, u64) {
4898        self.local_cid_state.active_seq()
4899    }
4900
4901    /// Instruct the peer to replace previously issued CIDs by sending a NEW_CONNECTION_ID frame
4902    /// with updated `retire_prior_to` field set to `v`
4903    #[cfg(test)]
4904    pub(crate) fn rotate_local_cid(&mut self, v: u64, now: Instant) {
4905        let n = self.local_cid_state.assign_retire_seq(v);
4906        self.endpoint_events
4907            .push_back(EndpointEventInner::NeedIdentifiers(now, n));
4908    }
4909
4910    /// Check the current active remote CID sequence
4911    #[cfg(test)]
4912    pub(crate) fn active_rem_cid_seq(&self) -> u64 {
4913        self.rem_cids.active_seq()
4914    }
4915
4916    /// Returns the detected maximum udp payload size for the current path
4917    #[cfg(test)]
4918    pub(crate) fn path_mtu(&self) -> u16 {
4919        self.path.current_mtu()
4920    }
4921
4922    /// Whether we have 1-RTT data to send
4923    ///
4924    /// See also `self.space(SpaceId::Data).can_send()`
4925    fn can_send_1rtt(&self, max_size: usize) -> bool {
4926        self.streams.can_send_stream_data()
4927            || self.path.challenge_pending
4928            || self
4929                .prev_path
4930                .as_ref()
4931                .is_some_and(|(_, x)| x.challenge_pending)
4932            || !self.path_responses.is_empty()
4933            || !self.nat_traversal_challenges.is_empty()
4934            || self
4935                .datagrams
4936                .outgoing
4937                .front()
4938                .is_some_and(|x| x.size(true) <= max_size)
4939    }
4940
4941    /// Update counters to account for a packet becoming acknowledged, lost, or abandoned
4942    fn remove_in_flight(&mut self, pn: u64, packet: &SentPacket) {
4943        // Visit known paths from newest to oldest to find the one `pn` was sent on
4944        for path in [&mut self.path]
4945            .into_iter()
4946            .chain(self.prev_path.as_mut().map(|(_, data)| data))
4947        {
4948            if path.remove_in_flight(pn, packet) {
4949                return;
4950            }
4951        }
4952    }
4953
4954    /// Terminate the connection instantly, without sending a close packet
4955    fn kill(&mut self, reason: ConnectionError) {
4956        self.close_common();
4957        self.error = Some(reason);
4958        self.state = State::Drained;
4959        self.endpoint_events.push_back(EndpointEventInner::Drained);
4960    }
4961    
4962    /// Generate PATH_CHALLENGE frames for NAT traversal candidate validation
4963    fn generate_nat_traversal_challenges(&mut self, now: Instant) {
4964        // Get candidates ready for validation first
4965        let candidates: Vec<(VarInt, SocketAddr)> = if let Some(nat_state) = &self.nat_traversal {
4966            nat_state.get_validation_candidates()
4967                .into_iter()
4968                .take(3) // Validate up to 3 candidates in parallel
4969                .map(|(seq, candidate)| (seq, candidate.address))
4970                .collect()
4971        } else {
4972            return;
4973        };
4974        
4975        if candidates.is_empty() {
4976            return;
4977        }
4978        
4979        // Now process candidates with mutable access
4980        if let Some(nat_state) = &mut self.nat_traversal {
4981            for (seq, address) in candidates {
4982                // Generate a random challenge token
4983                let challenge: u64 = self.rng.gen();
4984                
4985                // Start validation for this candidate
4986                if let Err(e) = nat_state.start_validation(seq, challenge, now) {
4987                    debug!("Failed to start validation for candidate {}: {}", seq, e);
4988                    continue;
4989                }
4990                
4991                // Queue the challenge
4992                self.nat_traversal_challenges.push(address, challenge);
4993                trace!("Queuing NAT validation PATH_CHALLENGE for {} with token {:08x}", 
4994                       address, challenge);
4995            }
4996        }
4997    }
4998
4999    /// Storage size required for the largest packet known to be supported by the current path
5000    ///
5001    /// Buffers passed to [`Connection::poll_transmit`] should be at least this large.
5002    pub fn current_mtu(&self) -> u16 {
5003        self.path.current_mtu()
5004    }
5005
5006    /// Size of non-frame data for a 1-RTT packet
5007    ///
5008    /// Quantifies space consumed by the QUIC header and AEAD tag. All other bytes in a packet are
5009    /// frames. Changes if the length of the remote connection ID changes, which is expected to be
5010    /// rare. If `pn` is specified, may additionally change unpredictably due to variations in
5011    /// latency and packet loss.
5012    fn predict_1rtt_overhead(&self, pn: Option<u64>) -> usize {
5013        let pn_len = match pn {
5014            Some(pn) => PacketNumber::new(
5015                pn,
5016                self.spaces[SpaceId::Data].largest_acked_packet.unwrap_or(0),
5017            )
5018            .len(),
5019            // Upper bound
5020            None => 4,
5021        };
5022
5023        // 1 byte for flags
5024        1 + self.rem_cids.active().len() + pn_len + self.tag_len_1rtt()
5025    }
5026
5027    fn tag_len_1rtt(&self) -> usize {
5028        let key = match self.spaces[SpaceId::Data].crypto.as_ref() {
5029            Some(crypto) => Some(&*crypto.packet.local),
5030            None => self.zero_rtt_crypto.as_ref().map(|x| &*x.packet),
5031        };
5032        // If neither Data nor 0-RTT keys are available, make a reasonable tag length guess. As of
5033        // this writing, all QUIC cipher suites use 16-byte tags. We could return `None` instead,
5034        // but that would needlessly prevent sending datagrams during 0-RTT.
5035        key.map_or(16, |x| x.tag_len())
5036    }
5037
5038    /// Mark the path as validated, and enqueue NEW_TOKEN frames to be sent as appropriate
5039    fn on_path_validated(&mut self) {
5040        self.path.validated = true;
5041        let ConnectionSide::Server { server_config } = &self.side else {
5042            return;
5043        };
5044        let new_tokens = &mut self.spaces[SpaceId::Data as usize].pending.new_tokens;
5045        new_tokens.clear();
5046        for _ in 0..server_config.validation_token.sent {
5047            new_tokens.push(self.path.remote);
5048        }
5049    }
5050}
5051
5052impl fmt::Debug for Connection {
5053    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
5054        f.debug_struct("Connection")
5055            .field("handshake_cid", &self.handshake_cid)
5056            .finish()
5057    }
5058}
5059
/// Fields of `Connection` specific to it being client-side or server-side
enum ConnectionSide {
    /// State held only by client connections
    Client {
        /// Sent in every outgoing Initial packet. Always empty after Initial keys are discarded
        token: Bytes,
        /// Store the initial `token` is taken from when this value is built from `SideArgs`
        token_store: Arc<dyn TokenStore>,
        /// Key used to look up a token in `token_store`
        server_name: String,
    },
    /// State held only by server connections
    Server {
        /// Server configuration; its `migration` and `validation_token` settings are read here
        server_config: Arc<ServerConfig>,
    },
}
5072
5073impl ConnectionSide {
5074    fn remote_may_migrate(&self) -> bool {
5075        match self {
5076            Self::Server { server_config } => server_config.migration,
5077            Self::Client { .. } => false,
5078        }
5079    }
5080
5081    fn is_client(&self) -> bool {
5082        self.side().is_client()
5083    }
5084
5085    fn is_server(&self) -> bool {
5086        self.side().is_server()
5087    }
5088
5089    fn side(&self) -> Side {
5090        match *self {
5091            Self::Client { .. } => Side::Client,
5092            Self::Server { .. } => Side::Server,
5093        }
5094    }
5095}
5096
5097impl From<SideArgs> for ConnectionSide {
5098    fn from(side: SideArgs) -> Self {
5099        match side {
5100            SideArgs::Client {
5101                token_store,
5102                server_name,
5103            } => Self::Client {
5104                token: token_store.take(&server_name).unwrap_or_default(),
5105                token_store,
5106                server_name,
5107            },
5108            SideArgs::Server {
5109                server_config,
5110                pref_addr_cid: _,
5111                path_validated: _,
5112            } => Self::Server { server_config },
5113        }
5114    }
5115}
5116
/// Parameters to `Connection::new` specific to it being client-side or server-side
pub(crate) enum SideArgs {
    /// Arguments for a client connection
    Client {
        /// Token store, queried with `server_name` when converting into `ConnectionSide`
        token_store: Arc<dyn TokenStore>,
        /// Key used to look up a token in `token_store`
        server_name: String,
    },
    /// Arguments for a server connection
    Server {
        /// Configuration carried over into `ConnectionSide::Server`
        server_config: Arc<ServerConfig>,
        /// Returned by `SideArgs::pref_addr_cid`; not retained by `ConnectionSide`
        pref_addr_cid: Option<ConnectionId>,
        /// Returned by `SideArgs::path_validated`; not retained by `ConnectionSide`
        path_validated: bool,
    },
}
5129
5130impl SideArgs {
5131    pub(crate) fn pref_addr_cid(&self) -> Option<ConnectionId> {
5132        match *self {
5133            Self::Client { .. } => None,
5134            Self::Server { pref_addr_cid, .. } => pref_addr_cid,
5135        }
5136    }
5137
5138    pub(crate) fn path_validated(&self) -> bool {
5139        match *self {
5140            Self::Client { .. } => true,
5141            Self::Server { path_validated, .. } => path_validated,
5142        }
5143    }
5144
5145    pub(crate) fn side(&self) -> Side {
5146        match *self {
5147            Self::Client { .. } => Side::Client,
5148            Self::Server { .. } => Side::Server,
5149        }
5150    }
5151}
5152
/// Reasons why a connection might be lost
///
/// A peer-sent `Close` converts into [`ConnectionError::ConnectionClosed`] or
/// [`ConnectionError::ApplicationClosed`], and any `ConnectionError` converts into an
/// [`io::Error`] whose `ErrorKind` depends on the variant.
#[derive(Debug, Error, Clone, PartialEq, Eq)]
pub enum ConnectionError {
    /// The peer doesn't implement any supported version
    #[error("peer doesn't implement any supported version")]
    VersionMismatch,
    /// The peer violated the QUIC specification as understood by this implementation
    #[error(transparent)]
    TransportError(#[from] TransportError),
    /// The peer's QUIC stack aborted the connection automatically
    #[error("aborted by peer: {0}")]
    ConnectionClosed(frame::ConnectionClose),
    /// The peer closed the connection
    #[error("closed by peer: {0}")]
    ApplicationClosed(frame::ApplicationClose),
    /// The peer is unable to continue processing this connection, usually due to having restarted
    #[error("reset by peer")]
    Reset,
    /// Communication with the peer has lapsed for longer than the negotiated idle timeout
    ///
    /// If neither side is sending keep-alives, a connection will time out after a long enough idle
    /// period even if the peer is still reachable. See also [`TransportConfig::max_idle_timeout()`]
    /// and [`TransportConfig::keep_alive_interval()`].
    #[error("timed out")]
    TimedOut,
    /// The local application closed the connection
    #[error("closed")]
    LocallyClosed,
    /// The connection could not be created because not enough of the CID space is available
    ///
    /// Try using longer connection IDs.
    #[error("CIDs exhausted")]
    CidsExhausted,
}
5187
5188impl From<Close> for ConnectionError {
5189    fn from(x: Close) -> Self {
5190        match x {
5191            Close::Connection(reason) => Self::ConnectionClosed(reason),
5192            Close::Application(reason) => Self::ApplicationClosed(reason),
5193        }
5194    }
5195}
5196
5197// For compatibility with API consumers
5198impl From<ConnectionError> for io::Error {
5199    fn from(x: ConnectionError) -> Self {
5200        use ConnectionError::*;
5201        let kind = match x {
5202            TimedOut => io::ErrorKind::TimedOut,
5203            Reset => io::ErrorKind::ConnectionReset,
5204            ApplicationClosed(_) | ConnectionClosed(_) => io::ErrorKind::ConnectionAborted,
5205            TransportError(_) | VersionMismatch | LocallyClosed | CidsExhausted => {
5206                io::ErrorKind::Other
5207            }
5208        };
5209        Self::new(kind, x)
5210    }
5211}
5212
/// Lifecycle state of a connection
///
/// `Closed`, `Draining` and `Drained` all count as closed for `State::is_closed`.
#[allow(unreachable_pub)] // fuzzing only
#[derive(Clone)]
pub enum State {
    /// Handshake in progress; carries the handshake-specific data
    Handshake(state::Handshake),
    /// Neither handshaking nor closed
    Established,
    /// Closed, with the reason for closing
    Closed(state::Closed),
    /// Counted as closed, but not yet drained
    Draining,
    /// Waiting for application to call close so we can dispose of the resources
    Drained,
}
5223
5224impl State {
5225    fn closed<R: Into<Close>>(reason: R) -> Self {
5226        Self::Closed(state::Closed {
5227            reason: reason.into(),
5228        })
5229    }
5230
5231    fn is_handshake(&self) -> bool {
5232        matches!(*self, Self::Handshake(_))
5233    }
5234
5235    fn is_established(&self) -> bool {
5236        matches!(*self, Self::Established)
5237    }
5238
5239    fn is_closed(&self) -> bool {
5240        matches!(*self, Self::Closed(_) | Self::Draining | Self::Drained)
5241    }
5242
5243    fn is_drained(&self) -> bool {
5244        matches!(*self, Self::Drained)
5245    }
5246}
5247
/// Payload types for the data-carrying variants of `State`
mod state {
    use super::*;

    /// Data carried by the `Handshake` state
    #[allow(unreachable_pub)] // fuzzing only
    #[derive(Clone)]
    pub struct Handshake {
        /// Whether the remote CID has been set by the peer yet
        ///
        /// Always set for servers
        pub(super) rem_cid_set: bool,
        /// Stateless retry token received in the first Initial by a server.
        ///
        /// Must be present in every Initial. Always empty for clients.
        pub(super) expected_token: Bytes,
        /// First cryptographic message
        ///
        /// Only set for clients
        pub(super) client_hello: Option<Bytes>,
    }

    /// Data carried by the `Closed` state
    #[allow(unreachable_pub)] // fuzzing only
    #[derive(Clone)]
    pub struct Closed {
        /// Why the connection was closed
        pub(super) reason: Close,
    }
}
5274
/// Events of interest to the application
#[derive(Debug)]
pub enum Event {
    /// The connection's handshake data is ready
    HandshakeDataReady,
    /// The connection was successfully established
    Connected,
    /// The connection was lost
    ///
    /// Emitted if the peer closes the connection or an error is encountered.
    ConnectionLost {
        /// Reason that the connection was closed
        reason: ConnectionError,
    },
    /// An event concerning one of the connection's streams; see [`StreamEvent`] for the details
    Stream(StreamEvent),
    /// One or more application datagrams have been received
    DatagramReceived,
    /// One or more application datagrams have been sent after blocking
    DatagramsUnblocked,
}
5296
/// Time elapsed from `y` to `x`, or zero if `x` is not later than `y`
fn instant_saturating_sub(x: Instant, y: Instant) -> Duration {
    x.saturating_duration_since(y)
}
5300
5301fn get_max_ack_delay(params: &TransportParameters) -> Duration {
5302    Duration::from_micros(params.max_ack_delay.0 * 1000)
5303}
5304
// Upper bound for the backoff exponent. Prevents overflow and improves behavior in extreme
// circumstances
const MAX_BACKOFF_EXPONENT: u32 = 16;
5307
/// Minimal remaining size to allow packet coalescing, excluding cryptographic tag
///
/// This must be at least as large as the header for a well-formed empty packet to be coalesced,
/// plus some space for frames (32 bytes here). We only care about handshake headers because
/// short header packets necessarily have smaller headers, and initial packets are only ever the
/// first packet in a datagram (because we coalesce in ascending packet space order and the only
/// reason to split a packet is when packet space changes).
const MIN_PACKET_SPACE: usize = MAX_HANDSHAKE_OR_0RTT_HEADER_SIZE + 32;
5316
/// Largest amount of space that could be occupied by a Handshake or 0-RTT packet's header
///
/// Sums the fields itemized below, using `MAX_CID_SIZE` for both connection IDs, the varint
/// size of `u16::MAX` for the length field, and a trailing 4 bytes for the packet number.
/// Does not include an Initial token.
// https://www.rfc-editor.org/rfc/rfc9000.html#name-0-rtt: flags + version + dcid len + dcid +
// scid len + scid + length + pn
const MAX_HANDSHAKE_OR_0RTT_HEADER_SIZE: usize =
    1 + 4 + 1 + MAX_CID_SIZE + 1 + MAX_CID_SIZE + VarInt::from_u32(u16::MAX as u32).size() + 4;
5324
/// Perform key updates this many packets before the AEAD confidentiality limit.
///
/// Measured in packets. Chosen arbitrarily, intended to be large enough to prevent spurious
/// connection loss.
const KEY_UPDATE_MARGIN: u64 = 10_000;
5329
/// Bookkeeping about the frames placed in a packet
#[derive(Default)]
struct SentFrames {
    /// Retransmittable data included; must be empty for the packet to count as ACK-only
    retransmits: ThinRetransmits,
    /// `Some` when the packet carries an ACK (presumably the largest packet number it
    /// acknowledges; confirm where this field is written)
    largest_acked: Option<u64>,
    /// Metadata for included stream frames; must be empty for the packet to count as ACK-only
    stream_frames: StreamMetaVec,
    /// Whether the packet contains non-retransmittable frames (like datagrams)
    non_retransmits: bool,
    /// NOTE(review): not consulted by `is_ack_only`; presumably marks packets that must be
    /// padded; confirm at the use sites
    requires_padding: bool,
}
5339
5340impl SentFrames {
5341    /// Returns whether the packet contains only ACKs
5342    fn is_ack_only(&self, streams: &StreamsState) -> bool {
5343        self.largest_acked.is_some()
5344            && !self.non_retransmits
5345            && self.stream_frames.is_empty()
5346            && self.retransmits.is_empty(streams)
5347    }
5348}
5349
5350/// Compute the negotiated idle timeout based on local and remote max_idle_timeout transport parameters.
5351///
5352/// According to the definition of max_idle_timeout, a value of `0` means the timeout is disabled; see <https://www.rfc-editor.org/rfc/rfc9000#section-18.2-4.4.1.>
5353///
5354/// According to the negotiation procedure, either the minimum of the timeouts or one specified is used as the negotiated value; see <https://www.rfc-editor.org/rfc/rfc9000#section-10.1-2.>
5355///
5356/// Returns the negotiated idle timeout as a `Duration`, or `None` when both endpoints have opted out of idle timeout.
5357fn negotiate_max_idle_timeout(x: Option<VarInt>, y: Option<VarInt>) -> Option<Duration> {
5358    match (x, y) {
5359        (Some(VarInt(0)) | None, Some(VarInt(0)) | None) => None,
5360        (Some(VarInt(0)) | None, Some(y)) => Some(Duration::from_millis(y.0)),
5361        (Some(x), Some(VarInt(0)) | None) => Some(Duration::from_millis(x.0)),
5362        (Some(x), Some(y)) => Some(Duration::from_millis(cmp::min(x, y).0)),
5363    }
5364}
5365
#[cfg(test)]
mod tests {
    use super::*;

    /// Each case is checked with its arguments in both orders, so the expected results also
    /// pin down that negotiation does not depend on which side is "local".
    #[test]
    fn negotiate_max_idle_timeout_commutative() {
        let millis = |n: u64| Some(Duration::from_millis(n));
        let cases: [(Option<VarInt>, Option<VarInt>, Option<Duration>); 6] = [
            (None, None, None),
            (None, Some(VarInt(0)), None),
            (None, Some(VarInt(2)), millis(2)),
            (Some(VarInt(0)), Some(VarInt(0)), None),
            (Some(VarInt(2)), Some(VarInt(0)), millis(2)),
            (Some(VarInt(1)), Some(VarInt(4)), millis(1)),
        ];

        for (a, b, expected) in cases {
            assert_eq!(negotiate_max_idle_timeout(a, b), expected);
            assert_eq!(negotiate_max_idle_timeout(b, a), expected);
        }
    }
}