Skip to main content

specter/transport/h3/
handshake.rs

1//! Native QUIC handshake state for HTTP/3.
2
3use std::collections::BTreeMap;
4use std::net::SocketAddr;
5use std::time::{Duration, Instant};
6
7use bytes::{Bytes, BytesMut};
8
9use crate::error::{Error, Result};
10use crate::fingerprint::{Http3Fingerprint, QuicTransportParams, TlsFingerprint};
11use crate::headers::Headers;
12use crate::transport::h3::native;
13use crate::transport::h3::path::{match_local_connection_id, QuicConnectionIdInventory};
14use crate::transport::h3::quic::{
15    build_initial_crypto_packet, decode_frames, decode_long_header, decode_transport_parameters,
16    decode_version_negotiation_packet, derive_initial_key_material,
17    derive_next_packet_key_material, encode_frame, encode_long_header, open_long_header_packet,
18    open_short_header_packet, protect_long_header_packet, protect_short_header_packet,
19    split_long_header_datagram, validate_retry_integrity_tag_v1, ConnectionId, LongHeaderPacket,
20    LongHeaderType, OpenedShortHeaderPacket, QuicAckTracker, QuicCloseState, QuicCryptoAssembler,
21    QuicEcnMark, QuicFrame, QuicLossDetector, QuicPacketKeyMaterial, QuicPathValidator,
22    QuicPmtuProbePolicy, TransportParameter,
23};
24use crate::transport::h3::recovery::{
25    LossDetectionOutcome, PacketNumberSpace, RecoveryState, SentPacketInfo,
26};
27use crate::transport::h3::tls::{
28    build_client_initial_packet_from_capture_with_size,
29    build_client_initial_packet_from_capture_with_version_and_size, ClientInitialPacket,
30    NativeH3HandshakeStatus, NativeH3SessionTicket, NativeQuicTlsSession, QuicEncryptionLevel,
31    QuicSecretDirection, QuicTlsSecret,
32};
33
34use getrandom::fill as getrandom_fill;
35
/// Wire value of QUIC version 1.
const QUIC_VERSION_1: u32 = 1;
/// Packet-number length in bytes. NOTE(review): the name suggests it applies to
/// Initial packets; the use sites are outside this view.
const INITIAL_PACKET_NUMBER_LEN: usize = 4;
/// Length in bytes of the AEAD authentication tag that is accounted for when
/// sizing protected packets (see `pad_short_header_payload_to_min_datagram`).
const AES_GCM_TAG_LEN: usize = 16;
/// Minimum datagram size in bytes. NOTE(review): presumably the target size for
/// path-validation datagrams; the use sites are outside this view.
const MIN_PATH_VALIDATION_DATAGRAM: usize = 1200;
/// Transport error code value `0x0a`.
/// NOTE(review): RFC 9000 Section 20.1 assigns 0x0a to PROTOCOL_VIOLATION;
/// confirm that the constant's name matches how it is used.
const QUIC_CONNECTION_MIGRATION_ERROR: u64 = 0x0a;
41
42fn new_server_cid_inventory(
43    fingerprint: &Http3Fingerprint,
44    server_source_cid: &ConnectionId,
45    client_source_cid: &ConnectionId,
46) -> QuicConnectionIdInventory {
47    let mut inventory =
48        QuicConnectionIdInventory::new(fingerprint.transport.active_connection_id_limit);
49    inventory.install_initial_local(server_source_cid.clone(), [0u8; 16]);
50    inventory.install_initial_peer(
51        Bytes::copy_from_slice(client_source_cid.as_bytes()),
52        [0u8; 16],
53    );
54    inventory
55}
56
57fn new_client_cid_inventory(
58    fingerprint: &Http3Fingerprint,
59    source_cid: &ConnectionId,
60) -> QuicConnectionIdInventory {
61    let mut inventory =
62        QuicConnectionIdInventory::new(fingerprint.transport.active_connection_id_limit);
63    inventory.install_initial_local(source_cid.clone(), [0u8; 16]);
64    inventory
65}
66
67fn pad_short_header_payload_to_min_datagram(
68    payload: Bytes,
69    short_header_len: usize,
70    min_datagram: usize,
71) -> Bytes {
72    let mut out = payload.to_vec();
73    while short_header_len + out.len() + AES_GCM_TAG_LEN < min_datagram {
74        out.push(0);
75    }
76    Bytes::from(out)
77}
78
79fn recovery_state_from_transport(params: &QuicTransportParams) -> RecoveryState {
80    let max_ack_delay = Duration::from_millis(params.max_ack_delay_ms);
81    let datagram = params.max_recv_udp_payload_size.max(1200) as u64;
82    RecoveryState::new(max_ack_delay, datagram)
83}
84
85fn observe_packet_with_ecn(
86    tracker: &mut QuicAckTracker,
87    packet_number: u64,
88    ecn_mark: Option<QuicEcnMark>,
89    now: Instant,
90) {
91    if let Some(mark) = ecn_mark {
92        tracker.observe_ecn_at(packet_number, mark, now);
93    } else {
94        tracker.observe_at(packet_number, now);
95    }
96}
97
/// Result record for a processed server Initial packet.
///
/// NOTE(review): the producer is outside this view; field descriptions are
/// inferred from names and types and should be confirmed there.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ProcessedServerInitial {
    /// Packet number of the processed packet.
    pub packet_number: u64,
    /// CRYPTO bytes associated with the packet.
    pub crypto_data: Bytes,
    /// Outgoing CRYPTO bytes, presumably for the Initial level.
    pub initial_crypto_out: Bytes,
    /// Outgoing CRYPTO bytes, presumably for the Handshake level.
    pub handshake_crypto_out: Bytes,
    /// TLS secrets reported alongside this packet.
    pub secrets: Vec<QuicTlsSecret>,
}
106
/// A client Handshake packet together with its bookkeeping metadata.
///
/// NOTE(review): field descriptions are inferred from names and types.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ClientHandshakePacket {
    /// Packet bytes.
    pub packet: Bytes,
    /// Packet number assigned to this packet.
    pub packet_number: u64,
    /// Offset of the packet number, presumably a byte offset into `packet`.
    pub packet_number_offset: usize,
    /// CRYPTO bytes associated with the packet.
    pub crypto_data: Bytes,
}
114
/// A server long-header packet together with its bookkeeping metadata.
///
/// NOTE(review): field descriptions are inferred from names and types.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ServerHandshakePacket {
    /// Packet bytes.
    pub packet: Bytes,
    /// Long-header type of this packet.
    pub packet_type: LongHeaderType,
    /// Packet number assigned to this packet.
    pub packet_number: u64,
    /// Offset of the packet number, presumably a byte offset into `packet`.
    pub packet_number_offset: usize,
    /// CRYPTO bytes associated with the packet.
    pub crypto_data: Bytes,
}
123
/// A server handshake flight: one datagram plus the packets described in it.
///
/// NOTE(review): presumably `datagram` is the coalesced form of `packets`;
/// confirm against the code that builds this value.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ServerHandshakeFlight {
    /// Datagram bytes for the flight.
    pub datagram: Bytes,
    /// Per-packet metadata for the flight.
    pub packets: Vec<ServerHandshakePacket>,
    /// TLS secrets reported alongside this flight.
    pub secrets: Vec<QuicTlsSecret>,
}
130
/// Result record for a processed client Handshake packet.
///
/// NOTE(review): field descriptions are inferred from names and types.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ProcessedClientHandshake {
    /// Packet number of the processed packet.
    pub packet_number: u64,
    /// CRYPTO bytes associated with the packet.
    pub crypto_data: Bytes,
    /// TLS secrets reported alongside this packet.
    pub secrets: Vec<QuicTlsSecret>,
}
137
/// A client long-header ACK packet together with its bookkeeping metadata.
///
/// NOTE(review): field descriptions are inferred from names and types.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ClientAckPacket {
    /// Packet bytes.
    pub packet: Bytes,
    /// Long-header type of this packet.
    pub packet_type: LongHeaderType,
    /// Packet number assigned to this packet.
    pub packet_number: u64,
    /// Offset of the packet number, presumably a byte offset into `packet`.
    pub packet_number_offset: usize,
}
145
/// A client application-data packet carrying data for one stream.
///
/// NOTE(review): field descriptions are inferred from names and types.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ClientApplicationPacket {
    /// Packet bytes.
    pub packet: Bytes,
    /// Packet number assigned to this packet.
    pub packet_number: u64,
    /// Stream the data belongs to.
    pub stream_id: u64,
    /// Offset of the packet number, presumably a byte offset into `packet`.
    pub packet_number_offset: usize,
    /// Stream data associated with the packet.
    pub data: Bytes,
}
154
/// A server application-data packet carrying data for one stream.
///
/// NOTE(review): field descriptions are inferred from names and types.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ServerApplicationPacket {
    /// Packet bytes.
    pub packet: Bytes,
    /// Packet number assigned to this packet.
    pub packet_number: u64,
    /// Stream the data belongs to.
    pub stream_id: u64,
    /// Offset of the packet number, presumably a byte offset into `packet`.
    pub packet_number_offset: usize,
    /// Stream data associated with the packet.
    pub data: Bytes,
}
163
/// A server application-level ACK packet with its bookkeeping metadata.
///
/// NOTE(review): field descriptions are inferred from names and types.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ServerApplicationAckPacket {
    /// Packet bytes.
    pub packet: Bytes,
    /// Packet number assigned to this packet.
    pub packet_number: u64,
    /// Offset of the packet number, presumably a byte offset into `packet`.
    pub packet_number_offset: usize,
}
170
/// A server application-level control packet with its bookkeeping metadata.
///
/// NOTE(review): which frames count as "control" is decided outside this view.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ServerApplicationControlPacket {
    /// Packet bytes.
    pub packet: Bytes,
    /// Packet number assigned to this packet.
    pub packet_number: u64,
    /// Offset of the packet number, presumably a byte offset into `packet`.
    pub packet_number_offset: usize,
}
177
/// A client application-level ACK packet with its bookkeeping metadata.
///
/// NOTE(review): field descriptions are inferred from names and types.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ClientApplicationAckPacket {
    /// Packet bytes.
    pub packet: Bytes,
    /// Packet number assigned to this packet.
    pub packet_number: u64,
    /// Offset of the packet number, presumably a byte offset into `packet`.
    pub packet_number_offset: usize,
}
184
/// A client application-level control packet with its bookkeeping metadata.
///
/// NOTE(review): which frames count as "control" is decided outside this view.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ClientApplicationControlPacket {
    /// Packet bytes.
    pub packet: Bytes,
    /// Packet number assigned to this packet.
    pub packet_number: u64,
    /// Offset of the packet number, presumably a byte offset into `packet`.
    pub packet_number_offset: usize,
}
191
/// HTTP/3 frames observed on a single stream, as a server-side event.
///
/// NOTE(review): which endpoint produces or consumes this event is decided
/// outside this view.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ServerH3StreamEvent {
    /// Stream the frames belong to.
    pub stream_id: u64,
    /// HTTP/3 stream type, when one is known for this stream.
    pub stream_type: Option<native::H3StreamType>,
    /// FIN flag reported with this event.
    pub fin: bool,
    /// HTTP/3 frames carried by this event.
    pub frames: Vec<native::H3Frame>,
}
199
/// HTTP/3 frames observed on a single stream, as a client-side event.
///
/// NOTE(review): which endpoint produces or consumes this event is decided
/// outside this view.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ClientH3StreamEvent {
    /// Stream the frames belong to.
    pub stream_id: u64,
    /// HTTP/3 stream type, when one is known for this stream.
    pub stream_type: Option<native::H3StreamType>,
    /// FIN flag reported with this event.
    pub fin: bool,
    /// HTTP/3 frames carried by this event.
    pub frames: Vec<native::H3Frame>,
}
207
/// Client-side event stream item: either HTTP/3 stream data or a QUIC-level
/// signal. The non-stream variants are named after the QUIC RESET_STREAM,
/// STOP_SENDING, CONNECTION_CLOSE and PATH_CHALLENGE frames.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ClientH3Event {
    /// HTTP/3 frames for one stream.
    Stream(ClientH3StreamEvent),
    /// A stream was reset with the given error code and final size.
    ResetStream {
        stream_id: u64,
        error_code: u64,
        final_size: u64,
    },
    /// Sending on a stream was asked to stop with the given error code.
    StopSending {
        stream_id: u64,
        error_code: u64,
    },
    /// The connection is being closed. `frame_type` is optional.
    /// NOTE(review): presumably `None` for application-level closes; confirm
    /// against the frame decoder.
    ConnectionClose {
        error_code: u64,
        frame_type: Option<u64>,
        reason: Bytes,
    },
    /// An 8-byte path challenge payload.
    PathChallenge([u8; 8]),
}
227
/// Server-side event stream item: either HTTP/3 stream data or a QUIC-level
/// signal. The non-stream variants are named after the QUIC RESET_STREAM,
/// STOP_SENDING, CONNECTION_CLOSE and PATH_CHALLENGE frames.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ServerH3Event {
    /// HTTP/3 frames for one stream.
    Stream(ServerH3StreamEvent),
    /// A stream was reset with the given error code and final size.
    ResetStream {
        stream_id: u64,
        error_code: u64,
        final_size: u64,
    },
    /// Sending on a stream was asked to stop with the given error code.
    StopSending {
        stream_id: u64,
        error_code: u64,
    },
    /// The connection is being closed. `frame_type` is optional.
    /// NOTE(review): presumably `None` for application-level closes; confirm
    /// against the frame decoder.
    ConnectionClose {
        error_code: u64,
        frame_type: Option<u64>,
        reason: Bytes,
    },
    /// An 8-byte path challenge payload.
    PathChallenge([u8; 8]),
}
247
/// Record of stream data placed in a sent application packet.
///
/// NOTE(review): presumably retained so the data can be retransmitted after
/// loss; the consumers are outside this view.
#[derive(Debug, Clone, PartialEq, Eq)]
struct SentApplicationStreamPacket {
    /// Stream the data belongs to.
    stream_id: u64,
    /// Offset of `data` within the stream.
    stream_offset: u64,
    /// FIN flag recorded for the sent data.
    fin: bool,
    /// The stream bytes that were sent.
    data: Bytes,
}
255
/// Retained previous-phase packet protection keys for the RFC9001 § 6.2
/// previous-key window. Reordered packets at the old phase are decrypted via
/// these keys until `retire_at` elapses, then they are dropped.
#[derive(Debug, Clone, PartialEq, Eq)]
struct PreviousKeys {
    /// Key material of the previous key phase.
    keys: QuicPacketKeyMaterial,
    /// Key-phase bit value that packets protected with `keys` must carry.
    phase: bool,
    /// Instant after which `keys` are no longer tried when opening packets.
    retire_at: Instant,
}
265
/// Bound on how long the previous-phase keys remain valid after a successful
/// key update. RFC9001 § 6.5 recommends retaining old keys for "three times
/// the PTO" worth of time, which is connection-specific; we use a conservative
/// fixed window that covers typical loss/reorder horizons.
///
/// The value is a constant three seconds and does not scale with the
/// connection's measured PTO.
const PREVIOUS_KEY_WINDOW: Duration = Duration::from_secs(3);
271
/// Bookkeeping for a locally initiated 1-RTT key update.
///
/// RFC 9001 Section 6.1 forbids initiating a further key update until a packet
/// protected with the current key phase has been acknowledged. This state
/// remembers that an update is outstanding and which packet number anchors it.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
struct OneRttKeyUpdate {
    /// `true` while a locally initiated update awaits acknowledgement.
    write_update_in_progress: bool,
    /// Lowest acknowledged packet number that confirms the update, if any.
    write_update_anchor: Option<u64>,
}

impl OneRttKeyUpdate {
    /// Notes that `packet_number` was acknowledged.
    ///
    /// If an anchor is recorded and the acknowledged packet number is at or
    /// beyond it, the in-progress flag and the anchor are both cleared. With
    /// no anchor recorded, or for an earlier packet number, nothing changes.
    fn note_packet_acked(&mut self, packet_number: u64) {
        let confirmed = matches!(
            self.write_update_anchor,
            Some(anchor) if packet_number >= anchor
        );
        if confirmed {
            // Both fields return to their defaults: `false` and `None`.
            *self = Self::default();
        }
    }
}
291
/// Result of opening a 1-RTT short-header packet, describing which set of
/// per-phase keys decrypted the AEAD so the caller can commit the receive
/// rotation when applicable.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum OneRttOpenOutcome {
    /// Opened with the current keys at the expected read key phase.
    Current,
    /// Opened with the retained previous-phase keys.
    Previous,
    /// Opened with the next-phase keys at the flipped key phase.
    Next,
}
301
/// A successfully opened 1-RTT packet paired with the key set that opened it.
#[derive(Debug, Clone, PartialEq, Eq)]
struct OneRttOpenedPacket {
    /// The opened short-header packet.
    opened: OpenedShortHeaderPacket,
    /// Which key generation (current, previous or next) opened the packet.
    outcome: OneRttOpenOutcome,
}
307
308#[allow(clippy::too_many_arguments)]
309fn try_open_one_rtt_packet(
310    current: &QuicPacketKeyMaterial,
311    next: Option<&QuicPacketKeyMaterial>,
312    previous: Option<&PreviousKeys>,
313    expected_read_phase: bool,
314    now: Instant,
315    packet: &[u8],
316    destination_cid_len: usize,
317    expected_packet_number: u64,
318) -> Result<OneRttOpenedPacket> {
319    if let Ok(opened) =
320        open_short_header_packet(current, packet, destination_cid_len, expected_packet_number)
321    {
322        if opened.key_phase == expected_read_phase {
323            return Ok(OneRttOpenedPacket {
324                opened,
325                outcome: OneRttOpenOutcome::Current,
326            });
327        }
328    }
329
330    if let Some(previous) = previous {
331        if previous.retire_at > now {
332            if let Ok(opened) = open_short_header_packet(
333                &previous.keys,
334                packet,
335                destination_cid_len,
336                expected_packet_number,
337            ) {
338                if opened.key_phase == previous.phase {
339                    return Ok(OneRttOpenedPacket {
340                        opened,
341                        outcome: OneRttOpenOutcome::Previous,
342                    });
343                }
344            }
345        }
346    }
347
348    if let Some(next) = next {
349        let expected_next_phase = !expected_read_phase;
350        if let Ok(opened) =
351            open_short_header_packet(next, packet, destination_cid_len, expected_packet_number)
352        {
353            if opened.key_phase == expected_next_phase {
354                return Ok(OneRttOpenedPacket {
355                    opened,
356                    outcome: OneRttOpenOutcome::Next,
357                });
358            }
359        }
360    }
361
362    Err(Error::Quic(
363        "native QUIC 1-RTT short-header packet could not be decrypted with current, previous, or next phase keys".into(),
364    ))
365}
366
/// Record of CRYPTO data placed in a sent long-header packet.
///
/// NOTE(review): presumably retained so the data can be retransmitted after
/// loss; the consumers are outside this view.
#[derive(Debug, Clone, PartialEq, Eq)]
struct SentCryptoPacket {
    /// Long-header type of the packet that carried the data.
    packet_type: LongHeaderType,
    /// Offset of `crypto_data` within the crypto stream.
    crypto_offset: u64,
    /// The CRYPTO bytes that were sent.
    crypto_data: Bytes,
}
373
/// Send-side flow control state, initialised from the peer's transport
/// parameters and raised by the peer's MAX_DATA / MAX_STREAM_DATA /
/// MAX_STREAMS updates.
#[derive(Debug, Clone, PartialEq, Eq)]
struct QuicApplicationFlowControl {
    /// Initiator value of the local endpoint (`0` for client, `1` for server),
    /// compared against `stream_initiator(stream_id)`.
    local_initiator: u64,
    /// Current connection-level send limit.
    max_data: u64,
    /// Connection-level bytes counted as sent so far.
    sent_data: u64,
    /// Peer's initial limit for bidirectional streams the peer initiated.
    initial_max_stream_data_bidi_local: u64,
    /// Peer's initial limit for bidirectional streams the local side initiated.
    initial_max_stream_data_bidi_remote: u64,
    /// Peer's initial limit for unidirectional streams.
    initial_max_stream_data_uni: u64,
    /// Current limit on locally opened bidirectional streams; despite the
    /// `initial_` prefix this is raised by `apply_max_streams`.
    initial_max_streams_bidi: u64,
    /// Current limit on locally opened unidirectional streams; also raised by
    /// `apply_max_streams`.
    initial_max_streams_uni: u64,
    /// Highest end offset sent so far, per stream.
    stream_sent: BTreeMap<u64, u64>,
    /// Per-stream limits raised via `apply_max_stream_data`.
    stream_data_overrides: BTreeMap<u64, u64>,
    /// Blocked frame produced by the most recent refused send, if any.
    last_blocked: Option<QuicFrame>,
}
388
389impl QuicApplicationFlowControl {
390    fn client(peer_transport: &QuicTransportParams) -> Self {
391        Self::new(0, peer_transport)
392    }
393
394    fn server(peer_transport: &QuicTransportParams) -> Self {
395        Self::new(1, peer_transport)
396    }
397
398    fn new(local_initiator: u64, peer_transport: &QuicTransportParams) -> Self {
399        Self {
400            local_initiator,
401            max_data: peer_transport.initial_max_data,
402            sent_data: 0,
403            initial_max_stream_data_bidi_local: peer_transport.initial_max_stream_data_bidi_local,
404            initial_max_stream_data_bidi_remote: peer_transport.initial_max_stream_data_bidi_remote,
405            initial_max_stream_data_uni: peer_transport.initial_max_stream_data_uni,
406            initial_max_streams_bidi: peer_transport.initial_max_streams_bidi,
407            initial_max_streams_uni: peer_transport.initial_max_streams_uni,
408            stream_sent: BTreeMap::new(),
409            stream_data_overrides: BTreeMap::new(),
410            last_blocked: None,
411        }
412    }
413
414    fn apply_max_data(&mut self, max_data: u64) {
415        self.max_data = self.max_data.max(max_data);
416    }
417
418    fn apply_max_stream_data(&mut self, stream_id: u64, max_stream_data: u64) {
419        self.stream_data_overrides
420            .entry(stream_id)
421            .and_modify(|current| *current = (*current).max(max_stream_data))
422            .or_insert(max_stream_data);
423    }
424
425    fn apply_max_streams(&mut self, bidirectional: bool, max_streams: u64) {
426        if bidirectional {
427            self.initial_max_streams_bidi = self.initial_max_streams_bidi.max(max_streams);
428        } else {
429            self.initial_max_streams_uni = self.initial_max_streams_uni.max(max_streams);
430        }
431    }
432
433    fn take_blocked_frame(&mut self) -> Option<QuicFrame> {
434        self.last_blocked.take()
435    }
436
437    fn consume_stream_data(
438        &mut self,
439        stream_id: u64,
440        stream_offset: u64,
441        len: usize,
442    ) -> Result<()> {
443        let stream_limit = self.stream_data_limit(stream_id)?;
444        let data_end = stream_offset
445            .checked_add(len as u64)
446            .ok_or_else(|| Error::HttpProtocol("QUIC flow control range overflow".into()))?;
447        if data_end > stream_limit {
448            self.last_blocked = Some(QuicFrame::StreamDataBlocked {
449                stream_id,
450                maximum_stream_data: stream_limit,
451            });
452            return Err(Error::Quic(format!(
453                "native H3 flow control blocked stream {stream_id}: end offset {data_end} exceeds peer stream limit {stream_limit}"
454            )));
455        }
456
457        let previous_stream_sent = *self.stream_sent.get(&stream_id).unwrap_or(&0);
458        let new_connection_bytes = data_end.saturating_sub(previous_stream_sent);
459        let next_sent_data = self
460            .sent_data
461            .checked_add(new_connection_bytes)
462            .ok_or_else(|| Error::HttpProtocol("QUIC flow control data overflow".into()))?;
463        if next_sent_data > self.max_data {
464            self.last_blocked = Some(QuicFrame::DataBlocked {
465                maximum_data: self.max_data,
466            });
467            return Err(Error::Quic(format!(
468                "native H3 flow control blocked stream {stream_id}: connection data {next_sent_data} exceeds peer connection limit {}",
469                self.max_data
470            )));
471        }
472
473        self.sent_data = next_sent_data;
474        self.stream_sent
475            .insert(stream_id, previous_stream_sent.max(data_end));
476        self.last_blocked = None;
477        Ok(())
478    }
479
480    fn stream_data_limit(&mut self, stream_id: u64) -> Result<u64> {
481        let initial_limit = if is_bidirectional_stream(stream_id) {
482            if stream_initiator(stream_id) == self.local_initiator {
483                self.ensure_stream_count(
484                    stream_id,
485                    self.initial_max_streams_bidi,
486                    "bidirectional",
487                )?;
488                self.initial_max_stream_data_bidi_remote
489            } else {
490                self.initial_max_stream_data_bidi_local
491            }
492        } else {
493            if stream_initiator(stream_id) != self.local_initiator {
494                return Err(Error::Quic(format!(
495                    "native H3 flow control cannot send on peer-initiated unidirectional stream {stream_id}"
496                )));
497            }
498            self.ensure_stream_count(stream_id, self.initial_max_streams_uni, "unidirectional")?;
499            self.initial_max_stream_data_uni
500        };
501        Ok(self
502            .stream_data_overrides
503            .get(&stream_id)
504            .copied()
505            .unwrap_or(initial_limit)
506            .max(initial_limit))
507    }
508
509    fn ensure_stream_count(&mut self, stream_id: u64, max_streams: u64, label: &str) -> Result<()> {
510        let required_stream_count = (stream_id >> 2) + 1;
511        if required_stream_count > max_streams {
512            self.last_blocked = Some(QuicFrame::StreamsBlocked {
513                bidirectional: label == "bidirectional",
514                maximum_streams: max_streams,
515            });
516            return Err(Error::Quic(format!(
517                "native H3 flow control blocked stream {stream_id}: opening {required_stream_count} {label} streams exceeds peer limit {max_streams}"
518            )));
519        }
520        Ok(())
521    }
522}
523
// QUIC receive flow control.
//
// RFC 9000 Section 4 specifies that MAX_DATA and MAX_STREAM_DATA frames
// (encodings in Sections 19.9 and 19.10) carry the *absolute* maximum the
// receiver is willing to accept on the connection or stream, not a delta.
// Per RFC 9000 Section 4.1, "a receiver MUST close the connection with an
// error of type FLOW_CONTROL_ERROR if the sender violates the advertised
// connection or stream data limits", and Section 4.2 ties window growth to
// the receiver's application drain rate so that buffers stay bounded.
//
// We therefore derive every advertised absolute value from
// `initial_max_*data + bytes_consumed_by_application`, and only *gate* the
// emission of those frames so we are not putting one frame per byte on the
// wire. The on-wire receive path (`observe_stream_frame`) is kept purely as
// an enforcement check against the limit we have already advertised.
#[derive(Debug, Clone, PartialEq, Eq)]
struct QuicReceiveFlowControl {
    // Initiator value of the local endpoint (0 for client, 1 for server),
    // compared against `stream_initiator(stream_id)`.
    local_initiator: u64,
    // Connection limit from the local transport parameters; base of every
    // advertised MAX_DATA value.
    initial_max_data: u64,
    // Connection limit currently enforced on received data.
    max_data: u64,
    // Upper clamp applied to the advertised connection limit.
    max_connection_window: u64,
    // Connection-level bytes counted as received so far.
    received_data: u64,
    // Initial limits per stream kind, from the local transport parameters.
    initial_max_stream_data_bidi_local: u64,
    initial_max_stream_data_bidi_remote: u64,
    initial_max_stream_data_uni: u64,
    // Upper clamp applied to advertised per-stream limits.
    max_stream_window: u64,
    // Highest end offset received so far, per stream.
    stream_received: BTreeMap<u64, u64>,
    // Per-stream limits that have been raised above the initial limit.
    stream_data_overrides: BTreeMap<u64, u64>,
    // Bytes drained by the application, connection-wide and per stream.
    connection_consumed: u64,
    stream_consumed: BTreeMap<u64, u64>,
    // Last limits that were queued for announcement.
    last_announced_max_data: u64,
    last_announced_max_stream_data: BTreeMap<u64, u64>,
    // Updates waiting to be emitted by `take_update_frames`.
    pending_max_data: Option<u64>,
    pending_max_stream_data: BTreeMap<u64, u64>,
    // Minimum growth of the connection limit before a new MAX_DATA is queued.
    connection_update_threshold: u64,
}
560
561impl QuicReceiveFlowControl {
562    fn client(local_transport: &QuicTransportParams) -> Self {
563        Self::new(0, local_transport)
564    }
565
566    fn server(local_transport: &QuicTransportParams) -> Self {
567        Self::new(1, local_transport)
568    }
569
570    fn new(local_initiator: u64, local_transport: &QuicTransportParams) -> Self {
571        let initial_max_data = local_transport.initial_max_data;
572        let max_connection_window = local_transport.max_connection_window.max(initial_max_data);
573        // Emit MAX_DATA when the absolute value we would announce has grown
574        // by at least half of the originally negotiated initial window since
575        // the last announcement. This keeps the same "half-window" cadence
576        // that the previous receive-threshold logic used, but applied to the
577        // app-consumed counter that RFC 9000 Section 4 requires.
578        let connection_update_threshold = (initial_max_data / 2).max(1);
579        Self {
580            local_initiator,
581            initial_max_data,
582            max_data: initial_max_data,
583            max_connection_window,
584            received_data: 0,
585            initial_max_stream_data_bidi_local: local_transport.initial_max_stream_data_bidi_local,
586            initial_max_stream_data_bidi_remote: local_transport
587                .initial_max_stream_data_bidi_remote,
588            initial_max_stream_data_uni: local_transport.initial_max_stream_data_uni,
589            max_stream_window: local_transport.max_stream_window,
590            stream_received: BTreeMap::new(),
591            stream_data_overrides: BTreeMap::new(),
592            connection_consumed: 0,
593            stream_consumed: BTreeMap::new(),
594            last_announced_max_data: initial_max_data,
595            last_announced_max_stream_data: BTreeMap::new(),
596            pending_max_data: None,
597            pending_max_stream_data: BTreeMap::new(),
598            connection_update_threshold,
599        }
600    }
601
602    fn observe_stream_frame(
603        &mut self,
604        stream_id: u64,
605        offset: Option<u64>,
606        len: usize,
607    ) -> Result<()> {
608        let stream_limit = self.stream_data_limit(stream_id)?;
609        let stream_offset = offset.unwrap_or(0);
610        let data_end = stream_offset.checked_add(len as u64).ok_or_else(|| {
611            Error::HttpProtocol("QUIC receive flow control range overflow".into())
612        })?;
613        if data_end > stream_limit {
614            return Err(Error::Quic(format!(
615                "native H3 receive flow control blocked stream {stream_id}: end offset {data_end} exceeds local stream limit {stream_limit}"
616            )));
617        }
618
619        let previous_stream_received = *self.stream_received.get(&stream_id).unwrap_or(&0);
620        let new_connection_bytes = data_end.saturating_sub(previous_stream_received);
621        let next_received_data = self
622            .received_data
623            .checked_add(new_connection_bytes)
624            .ok_or_else(|| Error::HttpProtocol("QUIC receive flow control data overflow".into()))?;
625        if next_received_data > self.max_data {
626            return Err(Error::Quic(format!(
627                "native H3 receive flow control blocked stream {stream_id}: connection data {next_received_data} exceeds local connection limit {}",
628                self.max_data
629            )));
630        }
631
632        self.received_data = next_received_data;
633        self.stream_received
634            .insert(stream_id, previous_stream_received.max(data_end));
635        Ok(())
636    }
637
638    // Record the bytes the application has drained off a stream's public body
639    // (or RFC 9220 tunnel inbound channel). Per RFC 9000 Section 4.1/4.2 the
640    // absolute MAX_DATA / MAX_STREAM_DATA values are
641    //   initial_max_data + sum(bytes_consumed_by_application across streams)
642    //   initial_max_stream_data[kind] + bytes_consumed_for_this_stream
643    // and we only enqueue a frame when the value crosses the gating
644    // threshold relative to the last announced value.
645    fn record_stream_consumed(&mut self, stream_id: u64, len: u64) -> Result<()> {
646        if len == 0 {
647            return Ok(());
648        }
649
650        let initial_stream_limit = self.initial_stream_data_limit(stream_id)?;
651        let stream_window = self.max_stream_window.max(initial_stream_limit);
652        let stream_threshold = (initial_stream_limit / 2).max(1);
653
654        let stream_total = self
655            .stream_consumed
656            .get(&stream_id)
657            .copied()
658            .unwrap_or(0)
659            .checked_add(len)
660            .ok_or_else(|| {
661                Error::HttpProtocol("QUIC receive flow control consumed overflow".into())
662            })?;
663        self.stream_consumed.insert(stream_id, stream_total);
664
665        self.connection_consumed = self.connection_consumed.checked_add(len).ok_or_else(|| {
666            Error::HttpProtocol("QUIC receive flow control connection consumed overflow".into())
667        })?;
668
669        let stream_announced = *self
670            .last_announced_max_stream_data
671            .get(&stream_id)
672            .unwrap_or(&initial_stream_limit);
673        let stream_absolute = initial_stream_limit
674            .saturating_add(stream_total)
675            .min(stream_window);
676        if stream_absolute > stream_announced
677            && stream_absolute - stream_announced >= stream_threshold
678        {
679            self.pending_max_stream_data
680                .insert(stream_id, stream_absolute);
681            self.stream_data_overrides
682                .insert(stream_id, stream_absolute);
683            self.last_announced_max_stream_data
684                .insert(stream_id, stream_absolute);
685        }
686
687        let connection_absolute = self
688            .initial_max_data
689            .saturating_add(self.connection_consumed)
690            .min(self.max_connection_window);
691        if connection_absolute > self.last_announced_max_data
692            && connection_absolute - self.last_announced_max_data
693                >= self.connection_update_threshold
694        {
695            self.pending_max_data = Some(connection_absolute);
696            self.max_data = connection_absolute;
697            self.last_announced_max_data = connection_absolute;
698        }
699        Ok(())
700    }
701
702    // Drop per-stream bookkeeping when a stream is closed. RFC 9000 Section
703    // 4.1 keeps the connection-level counter monotonic across stream
704    // lifetimes, so we never decrement `connection_consumed`; only the
705    // per-stream maps are released so completed streams cannot double-count.
706    fn release_stream(&mut self, stream_id: u64) {
707        self.stream_consumed.remove(&stream_id);
708        self.last_announced_max_stream_data.remove(&stream_id);
709        self.pending_max_stream_data.remove(&stream_id);
710        self.stream_received.remove(&stream_id);
711        self.stream_data_overrides.remove(&stream_id);
712    }
713
714    fn take_update_frames(&mut self) -> Vec<QuicFrame> {
715        let mut frames = Vec::new();
716        if let Some(max_data) = self.pending_max_data.take() {
717            frames.push(QuicFrame::MaxData(max_data));
718        }
719        frames.extend(
720            std::mem::take(&mut self.pending_max_stream_data)
721                .into_iter()
722                .map(|(stream_id, max_stream_data)| QuicFrame::MaxStreamData {
723                    stream_id,
724                    max_stream_data,
725                }),
726        );
727        frames
728    }
729
730    fn stream_data_limit(&self, stream_id: u64) -> Result<u64> {
731        if let Some(max_stream_data) = self.stream_data_overrides.get(&stream_id) {
732            return Ok(*max_stream_data);
733        }
734        self.initial_stream_data_limit(stream_id)
735    }
736
737    fn initial_stream_data_limit(&self, stream_id: u64) -> Result<u64> {
738        if is_bidirectional_stream(stream_id) {
739            if stream_initiator(stream_id) == self.local_initiator {
740                Ok(self.initial_max_stream_data_bidi_local)
741            } else {
742                Ok(self.initial_max_stream_data_bidi_remote)
743            }
744        } else if stream_initiator(stream_id) == self.local_initiator {
745            Err(Error::Quic(format!(
746                "native H3 receive flow control cannot receive on local unidirectional stream {stream_id}"
747            )))
748        } else {
749            Ok(self.initial_max_stream_data_uni)
750        }
751    }
752}
753
/// Native QUIC handshake and connection state.
///
/// NOTE(review): this appears to be the client-side counterpart of
/// `NativeQuicServerHandshake`; the methods that drive it are outside this
/// view, so the group comments below describe the fields only by name and
/// type.
pub struct NativeQuicHandshake {
    // Client Initial packet(s) and the TLS session.
    client_initial: ClientInitialPacket,
    pending_client_initial: Option<ClientInitialPacket>,
    tls: NativeQuicTlsSession,
    // Connection configuration.
    fingerprint: Http3Fingerprint,
    server_name: String,
    tls_fingerprint: Option<TlsFingerprint>,
    verify_peer: bool,
    root_certs: Vec<Vec<u8>>,
    use_platform_roots: bool,
    // Version negotiation and Retry tracking.
    supported_versions: Vec<u32>,
    client_initial_version: u32,
    retry_received: bool,
    vn_received: bool,
    server_initial_or_handshake_seen: bool,
    // Connection IDs.
    original_destination_cid: ConnectionId,
    retry_source_cid: Option<ConnectionId>,
    destination_cid: ConnectionId,
    source_cid: ConnectionId,
    // Packet protection keys per encryption level and direction.
    client_initial_keys: QuicPacketKeyMaterial,
    server_initial_keys: QuicPacketKeyMaterial,
    client_handshake_keys: Option<QuicPacketKeyMaterial>,
    client_early_data_keys: Option<QuicPacketKeyMaterial>,
    server_handshake_keys: Option<QuicPacketKeyMaterial>,
    client_application_keys: Option<QuicPacketKeyMaterial>,
    server_application_keys: Option<QuicPacketKeyMaterial>,
    // 1-RTT key update state: next and previous key generations, key phases.
    client_application_next_keys: Option<QuicPacketKeyMaterial>,
    server_application_next_keys: Option<QuicPacketKeyMaterial>,
    server_application_previous: Option<PreviousKeys>,
    write_key_phase: bool,
    read_key_phase: bool,
    application_key_update: OneRttKeyUpdate,
    // CRYPTO stream reassembly.
    initial_crypto: QuicCryptoAssembler,
    handshake_crypto: QuicCryptoAssembler,
    // ACK tracking per packet number space.
    initial_ack_tracker: QuicAckTracker,
    handshake_ack_tracker: QuicAckTracker,
    application_ack_tracker: QuicAckTracker,
    // Loss detection per packet number space.
    client_initial_loss_detector: QuicLossDetector,
    client_handshake_loss_detector: QuicLossDetector,
    client_application_loss_detector: QuicLossDetector,
    // Send- and receive-side flow control.
    client_application_flow_control: QuicApplicationFlowControl,
    client_application_receive_flow_control: QuicReceiveFlowControl,
    // Sent-data records keyed by u64 (presumably packet number — TODO confirm).
    client_initial_sent_crypto: BTreeMap<u64, SentCryptoPacket>,
    client_handshake_sent_crypto: BTreeMap<u64, SentCryptoPacket>,
    client_application_sent_streams: BTreeMap<u64, SentApplicationStreamPacket>,
    client_application_recovery_lost_packets: Vec<u64>,
    client_application_ecn_congestion: bool,
    // Path validation, connection ID inventory and PMTU probing.
    client_path_validator: QuicPathValidator,
    client_cid_inventory: QuicConnectionIdInventory,
    client_pmtu_probe: QuicPmtuProbePolicy,
    server_transport_parameters_validated: bool,
    recovery: RecoveryState,
    // Next packet numbers per direction and packet number space.
    next_client_initial_packet_number: u64,
    next_server_initial_packet_number: u64,
    next_server_handshake_packet_number: u64,
    next_client_handshake_packet_number: u64,
    next_server_application_packet_number: u64,
    next_client_application_packet_number: u64,
    // Stream ID allocation and send offsets.
    next_client_bidirectional_stream_id: u64,
    next_client_unidirectional_stream_id: u64,
    client_handshake_crypto_offset: u64,
    client_stream_offsets: BTreeMap<u64, u64>,
    // Per-stream HTTP/3 receive buffers and stream types.
    server_h3_stream_buffers: BTreeMap<u64, BytesMut>,
    server_h3_stream_buffer_offsets: BTreeMap<u64, u64>,
    server_h3_stream_types: BTreeMap<u64, native::H3StreamType>,
    // Connection close state.
    close_draining: bool,
    close_state: QuicCloseState,
}
822
/// Server-side state of a native QUIC connection carrying HTTP/3.
///
/// Bundles the TLS session, the packet-protection keys for each encryption
/// level, ACK trackers and loss detectors for each packet number space,
/// retransmission and flow-control bookkeeping, 1-RTT key-update state, and
/// the connection close state. All fields are private; the constructors in the
/// `impl` block populate every one of them.
pub struct NativeQuicServerHandshake {
    // TLS session fed through `provide_crypto` and drained through `take_crypto`.
    tls: NativeQuicTlsSession,
    // Connection IDs used as destination / source when building server packets.
    client_source_cid: ConnectionId,
    server_source_cid: ConnectionId,
    // Initial keys are derived in the constructors; the Handshake and
    // early-data keys start as `None`.
    client_initial_keys: QuicPacketKeyMaterial,
    server_initial_keys: QuicPacketKeyMaterial,
    client_handshake_keys: Option<QuicPacketKeyMaterial>,
    client_early_data_keys: Option<QuicPacketKeyMaterial>,
    server_handshake_keys: Option<QuicPacketKeyMaterial>,
    // Reassembly buffers for CRYPTO frames received from the client.
    client_initial_crypto: QuicCryptoAssembler,
    client_handshake_crypto: QuicCryptoAssembler,
    // Trackers of received client packets, one per packet number space.
    client_initial_ack_tracker: QuicAckTracker,
    client_handshake_ack_tracker: QuicAckTracker,
    client_application_ack_tracker: QuicAckTracker,
    // Loss detectors for packets sent by the server, one per packet number space.
    server_initial_loss_detector: QuicLossDetector,
    server_handshake_loss_detector: QuicLossDetector,
    server_application_loss_detector: QuicLossDetector,
    // Flow-control state built from the fingerprint's transport parameters.
    server_application_flow_control: QuicApplicationFlowControl,
    server_application_receive_flow_control: QuicReceiveFlowControl,
    // Sent-packet records keyed by packet number, kept for retransmission.
    server_application_sent_streams: BTreeMap<u64, SentApplicationStreamPacket>,
    // Extra lost application packet numbers, drained when lost packets are retransmitted.
    server_application_recovery_lost_packets: Vec<u64>,
    server_initial_sent_crypto: BTreeMap<u64, SentCryptoPacket>,
    server_handshake_sent_crypto: BTreeMap<u64, SentCryptoPacket>,
    server_path_validator: QuicPathValidator,
    cid_inventory: QuicConnectionIdInventory,
    // Recovery state that is also fed every received ACK frame.
    recovery: RecoveryState,
    // Copied from the fingerprint's transport parameters; passed to
    // `recovery.on_ack_received`.
    ack_delay_exponent: u64,
    // Expected next packet number from the client in each space (used when
    // opening packets) and next packet number the server will send.
    next_client_initial_packet_number: u64,
    next_client_handshake_packet_number: u64,
    next_client_application_packet_number: u64,
    next_server_initial_packet_number: u64,
    next_server_handshake_packet_number: u64,
    next_server_application_packet_number: u64,
    // Initialised to 3 by every constructor.
    next_server_unidirectional_stream_id: u64,
    // 1-RTT keys: current and precomputed next generation for each direction,
    // plus the previous client (read) generation kept after a key update.
    client_application_keys: Option<QuicPacketKeyMaterial>,
    server_application_keys: Option<QuicPacketKeyMaterial>,
    client_application_next_keys: Option<QuicPacketKeyMaterial>,
    server_application_next_keys: Option<QuicPacketKeyMaterial>,
    client_application_previous: Option<PreviousKeys>,
    // Key phase bits for outbound (write) and inbound (read) 1-RTT packets.
    write_key_phase: bool,
    read_key_phase: bool,
    application_key_update: OneRttKeyUpdate,
    // Offsets of CRYPTO / STREAM data sent by the server.
    server_initial_crypto_offset: u64,
    server_handshake_crypto_offset: u64,
    server_stream_offsets: BTreeMap<u64, u64>,
    server_control_stream_id: Option<u64>,
    // Per-stream buffers, offsets and stream types for HTTP/3 data from the client.
    client_h3_stream_buffers: BTreeMap<u64, BytesMut>,
    client_h3_stream_buffer_offsets: BTreeMap<u64, u64>,
    client_h3_stream_types: BTreeMap<u64, native::H3StreamType>,
    // Set when the connection enters the closing or draining state.
    close_draining: bool,
    close_state: QuicCloseState,
}
875
876impl NativeQuicServerHandshake {
877    pub fn new(
878        fingerprint: &Http3Fingerprint,
879        cert_pem: &[u8],
880        key_pem: &[u8],
881        client_destination_cid: ConnectionId,
882        client_source_cid: ConnectionId,
883        server_source_cid: ConnectionId,
884    ) -> Result<Self> {
885        let initial_keys = derive_initial_key_material(client_destination_cid.as_bytes())?;
886        let cid_inventory =
887            new_server_cid_inventory(fingerprint, &server_source_cid, &client_source_cid);
888        Ok(Self {
889            tls: NativeQuicTlsSession::server_with_connection_ids(
890                fingerprint,
891                cert_pem,
892                key_pem,
893                &client_destination_cid,
894                &server_source_cid,
895            )?,
896            client_source_cid,
897            server_source_cid,
898            client_initial_keys: initial_keys.client,
899            server_initial_keys: initial_keys.server,
900            client_handshake_keys: None,
901            client_early_data_keys: None,
902            server_handshake_keys: None,
903            client_initial_crypto: QuicCryptoAssembler::default(),
904            client_handshake_crypto: QuicCryptoAssembler::default(),
905            client_initial_ack_tracker: QuicAckTracker::default(),
906            client_handshake_ack_tracker: QuicAckTracker::default(),
907            client_application_ack_tracker: QuicAckTracker::default(),
908            server_initial_loss_detector: QuicLossDetector::default(),
909            server_handshake_loss_detector: QuicLossDetector::default(),
910            server_application_loss_detector: QuicLossDetector::default(),
911            server_application_flow_control: QuicApplicationFlowControl::server(
912                &fingerprint.transport,
913            ),
914            server_application_receive_flow_control: QuicReceiveFlowControl::server(
915                &fingerprint.transport,
916            ),
917            server_application_sent_streams: BTreeMap::new(),
918            server_application_recovery_lost_packets: Vec::new(),
919            server_initial_sent_crypto: BTreeMap::new(),
920            server_handshake_sent_crypto: BTreeMap::new(),
921            server_path_validator: QuicPathValidator::default(),
922            cid_inventory,
923            next_client_initial_packet_number: 0,
924            next_client_handshake_packet_number: 0,
925            next_client_application_packet_number: 0,
926            next_server_initial_packet_number: 0,
927            next_server_handshake_packet_number: 0,
928            next_server_application_packet_number: 0,
929            next_server_unidirectional_stream_id: 3,
930            client_application_keys: None,
931            server_application_keys: None,
932            client_application_next_keys: None,
933            server_application_next_keys: None,
934            client_application_previous: None,
935            write_key_phase: false,
936            read_key_phase: false,
937            application_key_update: OneRttKeyUpdate::default(),
938            server_initial_crypto_offset: 0,
939            server_handshake_crypto_offset: 0,
940            server_stream_offsets: BTreeMap::new(),
941            server_control_stream_id: None,
942            client_h3_stream_buffers: BTreeMap::new(),
943            client_h3_stream_buffer_offsets: BTreeMap::new(),
944            client_h3_stream_types: BTreeMap::new(),
945            close_draining: false,
946            close_state: QuicCloseState::default(),
947            recovery: recovery_state_from_transport(&fingerprint.transport),
948            ack_delay_exponent: fingerprint.transport.ack_delay_exponent,
949        })
950    }
951
952    pub fn new_with_ticket_keys(
953        fingerprint: &Http3Fingerprint,
954        cert_pem: &[u8],
955        key_pem: &[u8],
956        client_destination_cid: ConnectionId,
957        client_source_cid: ConnectionId,
958        server_source_cid: ConnectionId,
959        ticket_keys: &[u8; crate::transport::h3::tls::NATIVE_H3_TICKET_KEY_LEN],
960    ) -> Result<Self> {
961        let initial_keys = derive_initial_key_material(client_destination_cid.as_bytes())?;
962        let cid_inventory =
963            new_server_cid_inventory(fingerprint, &server_source_cid, &client_source_cid);
964        Ok(Self {
965            tls: NativeQuicTlsSession::server_with_connection_ids_and_ticket_keys(
966                fingerprint,
967                cert_pem,
968                key_pem,
969                &client_destination_cid,
970                &server_source_cid,
971                ticket_keys,
972            )?,
973            client_source_cid,
974            server_source_cid,
975            client_initial_keys: initial_keys.client,
976            server_initial_keys: initial_keys.server,
977            client_handshake_keys: None,
978            client_early_data_keys: None,
979            server_handshake_keys: None,
980            client_application_keys: None,
981            server_application_keys: None,
982            client_application_next_keys: None,
983            server_application_next_keys: None,
984            client_application_previous: None,
985            write_key_phase: false,
986            read_key_phase: false,
987            application_key_update: OneRttKeyUpdate::default(),
988            server_initial_crypto_offset: 0,
989            server_handshake_crypto_offset: 0,
990            server_stream_offsets: BTreeMap::new(),
991            server_control_stream_id: None,
992            client_h3_stream_buffers: BTreeMap::new(),
993            client_h3_stream_buffer_offsets: BTreeMap::new(),
994            client_h3_stream_types: BTreeMap::new(),
995            close_draining: false,
996            close_state: QuicCloseState::default(),
997            client_initial_crypto: QuicCryptoAssembler::default(),
998            client_handshake_crypto: QuicCryptoAssembler::default(),
999            client_initial_ack_tracker: QuicAckTracker::default(),
1000            client_handshake_ack_tracker: QuicAckTracker::default(),
1001            client_application_ack_tracker: QuicAckTracker::default(),
1002            server_initial_loss_detector: QuicLossDetector::default(),
1003            server_handshake_loss_detector: QuicLossDetector::default(),
1004            server_application_loss_detector: QuicLossDetector::default(),
1005            server_application_flow_control: QuicApplicationFlowControl::server(
1006                &fingerprint.transport,
1007            ),
1008            server_application_receive_flow_control: QuicReceiveFlowControl::server(
1009                &fingerprint.transport,
1010            ),
1011            server_application_sent_streams: BTreeMap::new(),
1012            server_application_recovery_lost_packets: Vec::new(),
1013            server_initial_sent_crypto: BTreeMap::new(),
1014            server_handshake_sent_crypto: BTreeMap::new(),
1015            server_path_validator: QuicPathValidator::default(),
1016            cid_inventory,
1017            next_client_initial_packet_number: 0,
1018            next_client_handshake_packet_number: 0,
1019            next_client_application_packet_number: 0,
1020            next_server_initial_packet_number: 0,
1021            next_server_handshake_packet_number: 0,
1022            next_server_application_packet_number: 0,
1023            next_server_unidirectional_stream_id: 3,
1024            recovery: recovery_state_from_transport(&fingerprint.transport),
1025            ack_delay_exponent: fingerprint.transport.ack_delay_exponent,
1026        })
1027    }
1028
1029    #[allow(clippy::too_many_arguments)]
1030    pub fn new_with_transport_parameter_connection_ids(
1031        fingerprint: &Http3Fingerprint,
1032        cert_pem: &[u8],
1033        key_pem: &[u8],
1034        client_destination_cid: ConnectionId,
1035        client_source_cid: ConnectionId,
1036        server_source_cid: ConnectionId,
1037        transport_original_destination_cid: ConnectionId,
1038        transport_initial_source_cid: ConnectionId,
1039        transport_retry_source_cid: Option<ConnectionId>,
1040    ) -> Result<Self> {
1041        let initial_keys = derive_initial_key_material(client_destination_cid.as_bytes())?;
1042        let cid_inventory =
1043            new_server_cid_inventory(fingerprint, &server_source_cid, &client_source_cid);
1044        Ok(Self {
1045            tls: NativeQuicTlsSession::server_with_transport_parameter_connection_ids(
1046                fingerprint,
1047                cert_pem,
1048                key_pem,
1049                &transport_original_destination_cid,
1050                &transport_initial_source_cid,
1051                transport_retry_source_cid.as_ref(),
1052            )?,
1053            client_source_cid,
1054            server_source_cid,
1055            client_initial_keys: initial_keys.client,
1056            server_initial_keys: initial_keys.server,
1057            client_handshake_keys: None,
1058            client_early_data_keys: None,
1059            server_handshake_keys: None,
1060            client_initial_crypto: QuicCryptoAssembler::default(),
1061            client_handshake_crypto: QuicCryptoAssembler::default(),
1062            client_initial_ack_tracker: QuicAckTracker::default(),
1063            client_handshake_ack_tracker: QuicAckTracker::default(),
1064            client_application_ack_tracker: QuicAckTracker::default(),
1065            server_initial_loss_detector: QuicLossDetector::default(),
1066            server_handshake_loss_detector: QuicLossDetector::default(),
1067            server_application_loss_detector: QuicLossDetector::default(),
1068            server_application_flow_control: QuicApplicationFlowControl::server(
1069                &fingerprint.transport,
1070            ),
1071            server_application_receive_flow_control: QuicReceiveFlowControl::server(
1072                &fingerprint.transport,
1073            ),
1074            server_application_sent_streams: BTreeMap::new(),
1075            server_application_recovery_lost_packets: Vec::new(),
1076            server_initial_sent_crypto: BTreeMap::new(),
1077            server_handshake_sent_crypto: BTreeMap::new(),
1078            server_path_validator: QuicPathValidator::default(),
1079            cid_inventory,
1080            next_client_initial_packet_number: 0,
1081            next_client_handshake_packet_number: 0,
1082            next_client_application_packet_number: 0,
1083            next_server_initial_packet_number: 0,
1084            next_server_handshake_packet_number: 0,
1085            next_server_application_packet_number: 0,
1086            next_server_unidirectional_stream_id: 3,
1087            client_application_keys: None,
1088            server_application_keys: None,
1089            client_application_next_keys: None,
1090            server_application_next_keys: None,
1091            client_application_previous: None,
1092            write_key_phase: false,
1093            read_key_phase: false,
1094            application_key_update: OneRttKeyUpdate::default(),
1095            server_initial_crypto_offset: 0,
1096            server_handshake_crypto_offset: 0,
1097            server_stream_offsets: BTreeMap::new(),
1098            server_control_stream_id: None,
1099            client_h3_stream_buffers: BTreeMap::new(),
1100            client_h3_stream_buffer_offsets: BTreeMap::new(),
1101            client_h3_stream_types: BTreeMap::new(),
1102            close_draining: false,
1103            close_state: QuicCloseState::default(),
1104            recovery: recovery_state_from_transport(&fingerprint.transport),
1105            ack_delay_exponent: fingerprint.transport.ack_delay_exponent,
1106        })
1107    }
1108
1109    pub fn is_application_ready(&self) -> bool {
1110        self.client_application_keys.is_some() && self.server_application_keys.is_some()
1111    }
1112
1113    /// Native HTTP/3 TLS 1.3 resumption / QUIC 0-RTT status for this server
1114    /// handshake. Server-side `EarlyAccepted` requires that the server
1115    /// configured a matching `SSL_set_quic_early_data_context` per
1116    /// RFC 9001 section 4.6.
1117    pub fn handshake_status(&self) -> NativeH3HandshakeStatus {
1118        self.tls.handshake_status()
1119    }
1120
1121    /// BoringSSL `SSL_get_early_data_reason` code for diagnostic logging on
1122    /// the server side. See `ssl_early_data_reason_t` in `openssl/ssl.h`.
1123    pub fn early_data_reason(&self) -> u32 {
1124        self.tls.early_data_reason()
1125    }
1126
1127    pub fn is_close_draining(&self) -> bool {
1128        self.close_draining
1129    }
1130
1131    pub fn close_state(&self) -> &QuicCloseState {
1132        &self.close_state
1133    }
1134
1135    pub fn close_state_mut(&mut self) -> &mut QuicCloseState {
1136        &mut self.close_state
1137    }
1138
1139    /// RFC9000 § 10.2 closing: called by the server driver after emitting a
1140    /// CONNECTION_CLOSE frame to suppress further outbound application data
1141    /// and anchor the close timer.
1142    pub fn server_enter_closing(&mut self, now: Instant) {
1143        self.close_state.enter_closing(now);
1144        self.close_draining = true;
1145    }
1146
1147    /// RFC9000 § 10.2 draining: called by the server driver when entering
1148    /// the draining phase explicitly. Receiving a peer CONNECTION_CLOSE in
1149    /// `open_client_h3_event_packet` also drives this transition.
1150    pub fn server_enter_draining(&mut self, now: Instant) {
1151        self.close_state.enter_draining(now);
1152        self.close_draining = true;
1153    }
1154
1155    /// Returns the RFC9000 § 10.2 close window derived from the server's
1156    /// application-space loss detector via RFC9002 § 6.2.1
1157    /// `current_PTO * 3`.
1158    pub fn server_close_window(&self) -> Duration {
1159        self.server_application_loss_detector.close_window()
1160    }
1161
1162    pub fn server_is_close_window_expired(&self, now: Instant) -> bool {
1163        self.close_state.is_expired(now, self.server_close_window())
1164    }
1165
1166    pub fn server_close_time_until_expiry(&self, now: Instant) -> Option<Duration> {
1167        self.close_state
1168            .time_until_expiry(now, self.server_close_window())
1169    }
1170
1171    pub fn server_should_replay_connection_close(&self, now: Instant) -> bool {
1172        self.close_state.should_replay(now)
1173    }
1174
1175    pub fn server_mark_connection_close_replayed(&mut self, now: Instant) {
1176        self.close_state.mark_replayed(now);
1177    }
1178
1179    pub fn server_observe_inbound_packet_for_close(&mut self) -> u64 {
1180        self.close_state.observe_inbound_packet()
1181    }
1182
1183    pub fn server_application_pto(&self) -> Duration {
1184        self.server_application_loss_detector.current_pto()
1185    }
1186
1187    /// Current write-side key phase bit per RFC9001 § 6 (the bit set on
1188    /// outbound 1-RTT short-header packets).
1189    pub fn write_key_phase(&self) -> bool {
1190        self.write_key_phase
1191    }
1192
1193    /// Current read-side key phase bit per RFC9001 § 6 (what the peer's
1194    /// most-recent committed write phase looks like from here).
1195    pub fn read_key_phase(&self) -> bool {
1196        self.read_key_phase
1197    }
1198
1199    /// Whether a locally-initiated key update is currently waiting for an ACK
1200    /// of a packet sent at the new write phase per RFC9001 § 6.5.
1201    pub fn key_update_in_progress(&self) -> bool {
1202        self.application_key_update.write_update_in_progress
1203    }
1204
1205    /// Force a 1-RTT key update. Returns an error when the previous local key
1206    /// update has not yet been confirmed via ACK (RFC9001 § 6.5) so callers
1207    /// cannot accidentally chain updates faster than the peer can confirm.
1208    ///
1209    /// This is the deterministic test hook called for by the production
1210    /// implementation; in production code, a key update can also be triggered
1211    /// implicitly when a peer's packet at the next phase is decrypted.
1212    pub fn force_key_update(&mut self) -> Result<()> {
1213        if self.application_key_update.write_update_in_progress {
1214            return Err(Error::Quic(
1215                "RFC9001 § 6.5: cannot initiate a new key update while a previous one is unconfirmed"
1216                    .into(),
1217            ));
1218        }
1219        let next = self.server_application_next_keys.take().ok_or_else(|| {
1220            Error::Quic(
1221                "native QUIC server cannot force a key update before TLS application secrets are installed"
1222                    .into(),
1223            )
1224        })?;
1225        self.server_application_keys = Some(next);
1226        let new_current = self
1227            .server_application_keys
1228            .as_ref()
1229            .expect("server application keys just installed");
1230        self.server_application_next_keys = Some(derive_next_packet_key_material(new_current)?);
1231        self.write_key_phase = !self.write_key_phase;
1232        self.application_key_update.write_update_in_progress = true;
1233        self.application_key_update.write_update_anchor =
1234            Some(self.next_server_application_packet_number);
1235        Ok(())
1236    }
1237
1238    fn commit_receive_key_update(&mut self, now: Instant) -> Result<()> {
1239        let Some(current) = self.client_application_keys.take() else {
1240            return Err(Error::Quic(
1241                "native QUIC server cannot rotate read keys without an installed current key set"
1242                    .into(),
1243            ));
1244        };
1245        let Some(next) = self.client_application_next_keys.take() else {
1246            return Err(Error::Quic(
1247                "native QUIC server cannot rotate read keys without precomputed next key set"
1248                    .into(),
1249            ));
1250        };
1251        let old_phase = self.read_key_phase;
1252        self.client_application_keys = Some(next);
1253        let new_current = self
1254            .client_application_keys
1255            .as_ref()
1256            .expect("client application keys just installed");
1257        self.client_application_next_keys = Some(derive_next_packet_key_material(new_current)?);
1258        self.client_application_previous = Some(PreviousKeys {
1259            keys: current,
1260            phase: old_phase,
1261            retire_at: now + PREVIOUS_KEY_WINDOW,
1262        });
1263        self.read_key_phase = !self.read_key_phase;
1264
1265        if self.write_key_phase != self.read_key_phase
1266            && !self.application_key_update.write_update_in_progress
1267        {
1268            let next_write = self.server_application_next_keys.take().ok_or_else(|| {
1269                Error::Quic(
1270                    "native QUIC server cannot mirror peer key update without precomputed next write keys"
1271                        .into(),
1272                )
1273            })?;
1274            self.server_application_keys = Some(next_write);
1275            let new_current_write = self
1276                .server_application_keys
1277                .as_ref()
1278                .expect("server application keys just rotated");
1279            self.server_application_next_keys =
1280                Some(derive_next_packet_key_material(new_current_write)?);
1281            self.write_key_phase = !self.write_key_phase;
1282            self.application_key_update.write_update_in_progress = true;
1283            self.application_key_update.write_update_anchor =
1284                Some(self.next_server_application_packet_number);
1285        }
1286
1287        Ok(())
1288    }
1289
1290    pub fn server_application_lost_packets(&self) -> Vec<u64> {
1291        self.server_application_loss_detector.lost_packets()
1292    }
1293
1294    pub fn recovery(&self) -> &RecoveryState {
1295        &self.recovery
1296    }
1297
1298    pub fn loss_detection_timer(&self) -> Option<Instant> {
1299        self.recovery.loss_detection_timer()
1300    }
1301
1302    pub fn on_loss_detection_timeout(&mut self, now: Instant) -> LossDetectionOutcome {
1303        self.recovery.on_loss_detection_timeout(now)
1304    }
1305
1306    pub fn application_pto(&self) -> Duration {
1307        self.recovery.current_pto()
1308    }
1309
1310    pub fn application_pto_timeout(&self) -> Duration {
1311        let max_ack_delay = self.recovery.max_ack_delay();
1312        let backoff = 1u32 << self.recovery.pto_count().min(31);
1313        self.recovery
1314            .current_pto()
1315            .saturating_add(max_ack_delay.saturating_mul(backoff))
1316    }
1317
1318    pub fn retransmit_lost_server_application_stream_packets(
1319        &mut self,
1320    ) -> Result<Vec<ServerApplicationPacket>> {
1321        let mut lost_packets = self.server_application_loss_detector.lost_packets();
1322        lost_packets.append(&mut self.server_application_recovery_lost_packets);
1323        self.retransmit_server_application_stream_packets(lost_packets)
1324    }
1325
1326    pub fn retransmit_pto_server_application_stream_packets(
1327        &mut self,
1328        now: Instant,
1329        pto_timeout: Duration,
1330    ) -> Result<Vec<ServerApplicationPacket>> {
1331        let expired_packets = self
1332            .server_application_loss_detector
1333            .pto_expired_packets(now, pto_timeout);
1334        self.retransmit_server_application_stream_packets(expired_packets)
1335    }
1336
1337    fn retransmit_server_application_stream_packets<I>(
1338        &mut self,
1339        packet_numbers: I,
1340    ) -> Result<Vec<ServerApplicationPacket>>
1341    where
1342        I: IntoIterator<Item = u64>,
1343    {
1344        let mut packet_numbers = packet_numbers.into_iter().collect::<Vec<_>>();
1345        packet_numbers.sort_unstable();
1346        packet_numbers.dedup();
1347        let mut retransmits = Vec::new();
1348        for packet_number in packet_numbers {
1349            self.server_application_loss_detector
1350                .retire_packet(packet_number);
1351            let Some(sent) = self.server_application_sent_streams.remove(&packet_number) else {
1352                continue;
1353            };
1354            retransmits.push(self.build_server_application_stream_packet_at_offset(
1355                sent.stream_id,
1356                sent.stream_offset,
1357                sent.data,
1358                sent.fin,
1359            )?);
1360        }
1361        Ok(retransmits)
1362    }
1363
1364    pub fn process_client_initial(&mut self, datagram: &[u8]) -> Result<ServerHandshakeFlight> {
1365        self.process_client_initial_with_ecn(datagram, None)
1366    }
1367
1368    pub fn process_client_initial_with_ecn(
1369        &mut self,
1370        datagram: &[u8],
1371        ecn_mark: Option<QuicEcnMark>,
1372    ) -> Result<ServerHandshakeFlight> {
1373        let mut server_initial_crypto = Bytes::new();
1374        let mut server_handshake_crypto = Bytes::new();
1375
1376        for packet in split_long_header_datagram(datagram)? {
1377            if packet.packet_type != LongHeaderType::Initial {
1378                continue;
1379            }
1380
1381            let opened = open_long_header_packet(
1382                &self.client_initial_keys,
1383                &packet.packet,
1384                packet.packet_number_offset,
1385                self.next_client_initial_packet_number,
1386            )?;
1387            observe_packet_with_ecn(
1388                &mut self.client_initial_ack_tracker,
1389                opened.packet_number,
1390                ecn_mark,
1391                Instant::now(),
1392            );
1393            self.next_client_initial_packet_number = opened.packet_number + 1;
1394
1395            for frame in decode_frames(&opened.payload)? {
1396                for packet_number in self.server_initial_loss_detector.on_ack_frame(&frame)? {
1397                    self.server_initial_sent_crypto.remove(&packet_number);
1398                }
1399                let outcome = self.recovery.on_ack_received(
1400                    PacketNumberSpace::Initial,
1401                    &frame,
1402                    self.ack_delay_exponent,
1403                    Instant::now(),
1404                )?;
1405                for (packet_number, _) in outcome.newly_acked {
1406                    self.server_initial_sent_crypto.remove(&packet_number);
1407                }
1408                if let QuicFrame::Crypto { offset, data } = frame {
1409                    self.client_initial_crypto.insert(offset, data)?;
1410                }
1411            }
1412
1413            let crypto_data = self.client_initial_crypto.take_contiguous();
1414            if crypto_data.is_empty() {
1415                continue;
1416            }
1417
1418            self.tls
1419                .provide_crypto(QuicEncryptionLevel::Initial, &crypto_data)?;
1420            self.install_tls_secrets()?;
1421            server_initial_crypto = self.tls.take_crypto(QuicEncryptionLevel::Initial);
1422            server_handshake_crypto = self.tls.take_crypto(QuicEncryptionLevel::Handshake);
1423        }
1424
1425        let secrets = self.tls.secrets();
1426        let mut packets = Vec::new();
1427        let mut datagram_out = Vec::new();
1428
1429        if !server_initial_crypto.is_empty() {
1430            let packet = self.build_server_initial_packet(server_initial_crypto)?;
1431            datagram_out.extend_from_slice(&packet.packet);
1432            packets.push(packet);
1433        }
1434        if !server_handshake_crypto.is_empty() {
1435            let packet = self.build_server_handshake_packet(server_handshake_crypto)?;
1436            datagram_out.extend_from_slice(&packet.packet);
1437            packets.push(packet);
1438        }
1439
1440        Ok(ServerHandshakeFlight {
1441            datagram: Bytes::from(datagram_out),
1442            packets,
1443            secrets,
1444        })
1445    }
1446
1447    pub fn build_server_initial_ack_packet(&mut self) -> Result<Option<ClientAckPacket>> {
1448        let packet = build_ack_packet(
1449            LongHeaderType::Initial,
1450            &self.server_initial_keys,
1451            &self.client_source_cid,
1452            &self.server_source_cid,
1453            &mut self.client_initial_ack_tracker,
1454            self.next_server_initial_packet_number,
1455        )?;
1456        if packet.is_some() {
1457            self.next_server_initial_packet_number += 1;
1458        }
1459        Ok(packet)
1460    }
1461
1462    pub fn build_server_handshake_ack_packet(&mut self) -> Result<Option<ClientAckPacket>> {
1463        if self.client_handshake_ack_tracker.is_empty() {
1464            return Ok(None);
1465        }
1466        let Some(server_handshake_keys) = &self.server_handshake_keys else {
1467            return Err(Error::Quic(
1468                "native server Handshake ACK encryption is waiting for TLS Handshake keys".into(),
1469            ));
1470        };
1471        let packet = build_ack_packet(
1472            LongHeaderType::Handshake,
1473            server_handshake_keys,
1474            &self.client_source_cid,
1475            &self.server_source_cid,
1476            &mut self.client_handshake_ack_tracker,
1477            self.next_server_handshake_packet_number,
1478        )?;
1479        if packet.is_some() {
1480            self.next_server_handshake_packet_number += 1;
1481        }
1482        Ok(packet)
1483    }
1484
1485    pub fn process_client_handshake(
1486        &mut self,
1487        datagram: &[u8],
1488    ) -> Result<ProcessedClientHandshake> {
1489        self.process_client_handshake_with_ecn(datagram, None)
1490    }
1491
1492    pub fn process_client_handshake_with_ecn(
1493        &mut self,
1494        datagram: &[u8],
1495        ecn_mark: Option<QuicEcnMark>,
1496    ) -> Result<ProcessedClientHandshake> {
1497        let Some(client_handshake_keys) = &self.client_handshake_keys else {
1498            return Err(Error::Quic(
1499                "native server Handshake packet decryption is waiting for TLS Handshake keys"
1500                    .into(),
1501            ));
1502        };
1503
1504        let mut packet_number = self.next_client_handshake_packet_number;
1505        for packet in split_long_header_datagram(datagram)? {
1506            if packet.packet_type != LongHeaderType::Handshake {
1507                continue;
1508            }
1509
1510            let opened = open_long_header_packet(
1511                client_handshake_keys,
1512                &packet.packet,
1513                packet.packet_number_offset,
1514                self.next_client_handshake_packet_number,
1515            )?;
1516            packet_number = opened.packet_number;
1517            observe_packet_with_ecn(
1518                &mut self.client_handshake_ack_tracker,
1519                opened.packet_number,
1520                ecn_mark,
1521                Instant::now(),
1522            );
1523            self.next_client_handshake_packet_number = opened.packet_number + 1;
1524
1525            for frame in decode_frames(&opened.payload)? {
1526                for packet_number in self.server_handshake_loss_detector.on_ack_frame(&frame)? {
1527                    self.server_handshake_sent_crypto.remove(&packet_number);
1528                }
1529                let outcome = self.recovery.on_ack_received(
1530                    PacketNumberSpace::Handshake,
1531                    &frame,
1532                    self.ack_delay_exponent,
1533                    Instant::now(),
1534                )?;
1535                for (packet_number, _) in outcome.newly_acked {
1536                    self.server_handshake_sent_crypto.remove(&packet_number);
1537                }
1538                if let QuicFrame::Crypto { offset, data } = frame {
1539                    self.client_handshake_crypto.insert(offset, data)?;
1540                }
1541            }
1542        }
1543
1544        let crypto_data = self.client_handshake_crypto.take_contiguous();
1545        if !crypto_data.is_empty() {
1546            self.tls
1547                .provide_crypto(QuicEncryptionLevel::Handshake, &crypto_data)?;
1548            self.install_tls_secrets()?;
1549        }
1550
1551        Ok(ProcessedClientHandshake {
1552            packet_number,
1553            crypto_data,
1554            secrets: self.tls.secrets(),
1555        })
1556    }
1557
1558    pub fn open_client_application_packet(&mut self, packet: &[u8]) -> Result<Vec<QuicFrame>> {
1559        self.open_client_application_packet_with_ecn(packet, None)
1560    }
1561
1562    pub fn open_client_application_packet_with_ecn(
1563        &mut self,
1564        packet: &[u8],
1565        ecn_mark: Option<QuicEcnMark>,
1566    ) -> Result<Vec<QuicFrame>> {
1567        // RFC9000 § 10.2: stop parsing inbound packets once we have entered
1568        // the draining phase. Closing-phase parsing is preserved so the
1569        // server can take the MAY-optimisation path (§ 10.2: closing -> draining
1570        // once the peer's CONNECTION_CLOSE is observed).
1571        if self.close_state.is_draining() {
1572            return Ok(Vec::new());
1573        }
1574        let Some(client_application_keys) = self.client_application_keys.as_ref() else {
1575            return Err(Error::Quic(
1576                "native server application packet decryption is waiting for TLS application keys"
1577                    .into(),
1578            ));
1579        };
1580        let now = Instant::now();
1581        let destination_cid_len =
1582            match_local_connection_id(packet, self.cid_inventory.unretired_locals())
1583                .map(|(_, len)| len)
1584                .unwrap_or_else(|| self.server_source_cid.as_bytes().len());
1585        let opened = try_open_one_rtt_packet(
1586            client_application_keys,
1587            self.client_application_next_keys.as_ref(),
1588            self.client_application_previous.as_ref(),
1589            self.read_key_phase,
1590            now,
1591            packet,
1592            destination_cid_len,
1593            self.next_client_application_packet_number,
1594        )?;
1595        if matches!(opened.outcome, OneRttOpenOutcome::Next) {
1596            self.commit_receive_key_update(now)?;
1597        }
1598        let opened = opened.opened;
1599        self.next_client_application_packet_number = opened.packet_number + 1;
1600        let frames = decode_frames(&opened.payload)?;
1601        self.apply_opened_client_application_frames(opened.packet_number, frames, now, ecn_mark)
1602    }
1603
1604    pub fn open_client_zero_rtt_h3_event_packet(
1605        &mut self,
1606        datagram: &[u8],
1607    ) -> Result<Vec<ClientH3Event>> {
1608        self.open_client_zero_rtt_h3_event_packet_with_ecn(datagram, None)
1609    }
1610
1611    pub fn open_client_zero_rtt_h3_event_packet_with_ecn(
1612        &mut self,
1613        datagram: &[u8],
1614        ecn_mark: Option<QuicEcnMark>,
1615    ) -> Result<Vec<ClientH3Event>> {
1616        if self.close_state.is_draining() || !self.handshake_status().early_data_accepted() {
1617            return Ok(Vec::new());
1618        }
1619        let Some(client_early_data_keys) = self.client_early_data_keys.clone() else {
1620            return Ok(Vec::new());
1621        };
1622
1623        let mut events = Vec::new();
1624        for packet in split_long_header_datagram(datagram)? {
1625            if packet.packet_type != LongHeaderType::ZeroRtt {
1626                continue;
1627            }
1628            let now = Instant::now();
1629            let opened = open_long_header_packet(
1630                &client_early_data_keys,
1631                &packet.packet,
1632                packet.packet_number_offset,
1633                self.next_client_application_packet_number,
1634            )?;
1635            self.next_client_application_packet_number = opened.packet_number + 1;
1636            let frames = decode_frames(&opened.payload)?;
1637            let frames = self.apply_opened_client_application_frames(
1638                opened.packet_number,
1639                frames,
1640                now,
1641                ecn_mark,
1642            )?;
1643            events.extend(self.client_h3_events_from_frames(frames, None)?);
1644        }
1645        Ok(events)
1646    }
1647
1648    fn apply_opened_client_application_frames(
1649        &mut self,
1650        packet_number: u64,
1651        frames: Vec<QuicFrame>,
1652        now: Instant,
1653        ecn_mark: Option<QuicEcnMark>,
1654    ) -> Result<Vec<QuicFrame>> {
1655        for frame in &frames {
1656            if let QuicFrame::Stream {
1657                stream_id,
1658                offset,
1659                data,
1660                ..
1661            } = frame
1662            {
1663                self.server_application_receive_flow_control
1664                    .observe_stream_frame(*stream_id, *offset, data.len())?;
1665            }
1666            for packet_number in self.server_application_loss_detector.on_ack_frame(frame)? {
1667                self.server_application_sent_streams.remove(&packet_number);
1668                self.application_key_update.note_packet_acked(packet_number);
1669            }
1670            if matches!(frame, QuicFrame::Ack { .. } | QuicFrame::AckEcn { .. }) {
1671                let outcome = self.recovery.on_ack_received(
1672                    PacketNumberSpace::Application,
1673                    frame,
1674                    self.ack_delay_exponent,
1675                    now,
1676                )?;
1677                for (packet_number, _) in outcome.newly_acked {
1678                    self.server_application_sent_streams.remove(&packet_number);
1679                    self.application_key_update.note_packet_acked(packet_number);
1680                }
1681                self.server_application_recovery_lost_packets.extend(
1682                    outcome
1683                        .lost
1684                        .into_iter()
1685                        .map(|(packet_number, _)| packet_number),
1686                );
1687            }
1688            match frame {
1689                QuicFrame::MaxData(max_data) => {
1690                    self.server_application_flow_control
1691                        .apply_max_data(*max_data);
1692                }
1693                QuicFrame::MaxStreamData {
1694                    stream_id,
1695                    max_stream_data,
1696                } => self
1697                    .server_application_flow_control
1698                    .apply_max_stream_data(*stream_id, *max_stream_data),
1699                QuicFrame::MaxStreams {
1700                    bidirectional,
1701                    max_streams,
1702                } => self
1703                    .server_application_flow_control
1704                    .apply_max_streams(*bidirectional, *max_streams),
1705                QuicFrame::NewConnectionId {
1706                    sequence_number,
1707                    retire_prior_to,
1708                    connection_id,
1709                    stateless_reset_token,
1710                } => {
1711                    let _ = self.cid_inventory.observe_peer_new_connection_id(
1712                        *sequence_number,
1713                        *retire_prior_to,
1714                        connection_id.clone(),
1715                        *stateless_reset_token,
1716                    );
1717                }
1718                QuicFrame::RetireConnectionId { sequence_number } => {
1719                    let _ = self
1720                        .cid_inventory
1721                        .observe_peer_retire_connection_id(*sequence_number);
1722                }
1723                _ => {}
1724            }
1725        }
1726        if frames.iter().any(is_ack_eliciting_quic_frame) {
1727            observe_packet_with_ecn(
1728                &mut self.client_application_ack_tracker,
1729                packet_number,
1730                ecn_mark,
1731                now,
1732            );
1733        }
1734        Ok(frames.into_iter().filter(is_not_padding_frame).collect())
1735    }
1736
1737    pub fn build_server_application_ack_packet(
1738        &mut self,
1739    ) -> Result<Option<ServerApplicationAckPacket>> {
1740        self.build_server_application_ack_packet_with_delay(Instant::now(), 0)
1741    }
1742
1743    pub fn build_server_application_ack_packet_with_delay(
1744        &mut self,
1745        now: Instant,
1746        ack_delay_exponent: u64,
1747    ) -> Result<Option<ServerApplicationAckPacket>> {
1748        if self.client_application_ack_tracker.is_empty() {
1749            return Ok(None);
1750        }
1751        let Some(server_application_keys) = &self.server_application_keys else {
1752            return Err(Error::Quic(
1753                "native server application ACK encryption is waiting for TLS application keys"
1754                    .into(),
1755            ));
1756        };
1757
1758        let packet_number = self.next_server_application_packet_number;
1759        let packet_number_len = 2;
1760        let frame = encode_frame(
1761            &self
1762                .client_application_ack_tracker
1763                .to_ack_frame_with_delay(now, ack_delay_exponent)?,
1764        );
1765        let packet = protect_short_header_packet(
1766            server_application_keys,
1767            &self.server_outbound_peer_cid(),
1768            packet_number,
1769            packet_number_len,
1770            self.write_key_phase,
1771            &frame,
1772        )?;
1773        self.client_application_ack_tracker.mark_ack_sent();
1774        self.next_server_application_packet_number += 1;
1775
1776        Ok(Some(ServerApplicationAckPacket {
1777            packet,
1778            packet_number,
1779            packet_number_offset: 1 + self.server_outbound_peer_cid().as_bytes().len(),
1780        }))
1781    }
1782
1783    pub fn build_server_application_ack_packet_after(
1784        &mut self,
1785        threshold: usize,
1786    ) -> Result<Option<ServerApplicationAckPacket>> {
1787        if !self
1788            .client_application_ack_tracker
1789            .should_ack_after(threshold)
1790        {
1791            return Ok(None);
1792        }
1793        self.build_server_application_ack_packet()
1794    }
1795
1796    pub fn build_server_application_ack_packet_after_or_delay(
1797        &mut self,
1798        threshold: usize,
1799        max_ack_delay: Duration,
1800        now: Instant,
1801        ack_delay_exponent: u64,
1802    ) -> Result<Option<ServerApplicationAckPacket>> {
1803        if !self
1804            .client_application_ack_tracker
1805            .should_ack_after_or_delay(threshold, max_ack_delay, now)
1806        {
1807            return Ok(None);
1808        }
1809        self.build_server_application_ack_packet_with_delay(now, ack_delay_exponent)
1810    }
1811
1812    pub fn server_application_ack_deadline(&self, max_ack_delay: Duration) -> Option<Instant> {
1813        self.client_application_ack_tracker
1814            .pending_ack_deadline(max_ack_delay)
1815    }
1816
1817    pub fn open_client_h3_stream_packet(
1818        &mut self,
1819        packet: &[u8],
1820    ) -> Result<Vec<ClientH3StreamEvent>> {
1821        Ok(self
1822            .open_client_h3_event_packet(packet)?
1823            .into_iter()
1824            .filter_map(|event| match event {
1825                ClientH3Event::Stream(event) => Some(event),
1826                ClientH3Event::ResetStream { .. }
1827                | ClientH3Event::StopSending { .. }
1828                | ClientH3Event::ConnectionClose { .. }
1829                | ClientH3Event::PathChallenge(_) => None,
1830            })
1831            .collect())
1832    }
1833
1834    pub fn open_client_h3_event_packet(&mut self, packet: &[u8]) -> Result<Vec<ClientH3Event>> {
1835        // RFC9000 § 10.2: once we are in the draining phase we MUST NOT
1836        // process any further inbound packets. While in the closing phase
1837        // we still parse incoming packets so we can take the MAY-optimisation
1838        // path and transition to draining when the peer also closes.
1839        if self.close_state.is_draining() {
1840            return Ok(Vec::new());
1841        }
1842        let frames = self.open_client_application_packet(packet)?;
1843        self.client_h3_events_from_frames(frames, None)
1844    }
1845
1846    pub fn open_client_h3_event_packet_from(
1847        &mut self,
1848        packet: &[u8],
1849        remote_address: SocketAddr,
1850    ) -> Result<Vec<ClientH3Event>> {
1851        if self.close_state.is_draining() {
1852            return Ok(Vec::new());
1853        }
1854        let frames = self.open_client_application_packet(packet)?;
1855        self.client_h3_events_from_frames(frames, Some(remote_address))
1856    }
1857
1858    fn client_h3_events_from_frames(
1859        &mut self,
1860        frames: Vec<QuicFrame>,
1861        remote_address: Option<SocketAddr>,
1862    ) -> Result<Vec<ClientH3Event>> {
1863        let mut events = Vec::new();
1864        for frame in frames {
1865            match frame {
1866                QuicFrame::Stream {
1867                    stream_id,
1868                    offset,
1869                    fin,
1870                    data,
1871                } => {
1872                    if let Some(event) = apply_h3_stream_frame(
1873                        &mut self.client_h3_stream_buffers,
1874                        &mut self.client_h3_stream_buffer_offsets,
1875                        &mut self.client_h3_stream_types,
1876                        stream_id,
1877                        offset,
1878                        fin,
1879                        data,
1880                    )? {
1881                        events.push(ClientH3Event::Stream(ClientH3StreamEvent {
1882                            stream_id: event.stream_id,
1883                            stream_type: event.stream_type,
1884                            fin: event.fin,
1885                            frames: event.frames,
1886                        }));
1887                    }
1888                }
1889                QuicFrame::ResetStream {
1890                    stream_id,
1891                    error_code,
1892                    final_size,
1893                } => events.push(ClientH3Event::ResetStream {
1894                    stream_id,
1895                    error_code,
1896                    final_size,
1897                }),
1898                QuicFrame::StopSending {
1899                    stream_id,
1900                    error_code,
1901                } => events.push(ClientH3Event::StopSending {
1902                    stream_id,
1903                    error_code,
1904                }),
1905                QuicFrame::ConnectionClose {
1906                    error_code,
1907                    frame_type,
1908                    reason,
1909                } => {
1910                    // RFC9000 § 10.2: peer CONNECTION_CLOSE transitions us
1911                    // into the draining phase, which forbids any further
1912                    // outbound packets except an optional one-shot
1913                    // CONNECTION_CLOSE acknowledgement.
1914                    self.close_draining = true;
1915                    self.close_state.enter_draining(Instant::now());
1916                    events.push(ClientH3Event::ConnectionClose {
1917                        error_code,
1918                        frame_type,
1919                        reason,
1920                    });
1921                }
1922                QuicFrame::PathChallenge(data) => events.push(ClientH3Event::PathChallenge(data)),
1923                QuicFrame::PathResponse(data) => {
1924                    if let Some(remote_address) = remote_address {
1925                        self.server_path_validator
1926                            .on_path_response_from(remote_address, data);
1927                    } else {
1928                        self.server_path_validator.on_path_response(data);
1929                    }
1930                }
1931                QuicFrame::Padding
1932                | QuicFrame::Ping
1933                | QuicFrame::Ack { .. }
1934                | QuicFrame::AckEcn { .. }
1935                | QuicFrame::Crypto { .. }
1936                | QuicFrame::MaxData(_)
1937                | QuicFrame::MaxStreamData { .. }
1938                | QuicFrame::MaxStreams { .. }
1939                | QuicFrame::DataBlocked { .. }
1940                | QuicFrame::StreamDataBlocked { .. }
1941                | QuicFrame::StreamsBlocked { .. }
1942                | QuicFrame::NewConnectionId { .. }
1943                | QuicFrame::RetireConnectionId { .. }
1944                | QuicFrame::HandshakeDone => {}
1945            }
1946        }
1947        Ok(events)
1948    }
1949
1950    pub fn build_server_h3_settings_packet(
1951        &mut self,
1952        fingerprint: &Http3Fingerprint,
1953    ) -> Result<ServerApplicationPacket> {
1954        let stream_id = self.server_control_stream_id.unwrap_or_else(|| {
1955            let stream_id = self.next_server_unidirectional_stream_id;
1956            self.next_server_unidirectional_stream_id += 4;
1957            self.server_control_stream_id = Some(stream_id);
1958            stream_id
1959        });
1960        let settings = native::encode_frame(&native::H3Frame::Settings(
1961            native::encode_fingerprint_settings_payload(fingerprint),
1962        ));
1963        let payload = if self.server_stream_offsets.contains_key(&stream_id) {
1964            settings
1965        } else {
1966            native::encode_unidirectional_stream(&native::H3UnidirectionalStream {
1967                stream_type: native::H3StreamType::Control,
1968                payload: settings,
1969            })
1970        };
1971        self.build_server_application_stream_packet(stream_id, payload, false)
1972    }
1973
1974    pub fn build_server_h3_goaway_packet(&mut self, id: u64) -> Result<ServerApplicationPacket> {
1975        let Some(stream_id) = self.server_control_stream_id else {
1976            return Err(Error::HttpProtocol(
1977                "native server H3 GOAWAY requires an open control stream".into(),
1978            ));
1979        };
1980        self.build_server_application_stream_packet(
1981            stream_id,
1982            native::encode_frame(&native::H3Frame::GoAway { id }),
1983            false,
1984        )
1985    }
1986
1987    pub fn build_server_reset_stream_packet(
1988        &mut self,
1989        stream_id: u64,
1990        error_code: u64,
1991    ) -> Result<ServerApplicationControlPacket> {
1992        let final_size = *self.server_stream_offsets.get(&stream_id).unwrap_or(&0);
1993        self.build_server_application_control_packet(QuicFrame::ResetStream {
1994            stream_id,
1995            error_code,
1996            final_size,
1997        })
1998    }
1999
2000    pub fn build_server_connection_close_packet(
2001        &mut self,
2002        error_code: u64,
2003        reason: Bytes,
2004    ) -> Result<ServerApplicationControlPacket> {
2005        let packet = self.build_server_application_control_packet(QuicFrame::ConnectionClose {
2006            error_code,
2007            frame_type: None,
2008            reason,
2009        })?;
2010        // RFC9000 § 10.2: emitting a CONNECTION_CLOSE transitions the
2011        // connection into the closing phase. Mirroring the client handshake
2012        // helper, we anchor the timer at build time so server-side drivers
2013        // do not have to call `server_enter_closing` separately on every
2014        // path that emits a CONNECTION_CLOSE.
2015        self.server_enter_closing(Instant::now());
2016        Ok(packet)
2017    }
2018
2019    pub fn build_server_max_data_packet(
2020        &mut self,
2021        max_data: u64,
2022    ) -> Result<ServerApplicationControlPacket> {
2023        self.build_server_application_control_packet(QuicFrame::MaxData(max_data))
2024    }
2025
2026    pub fn build_server_max_stream_data_packet(
2027        &mut self,
2028        stream_id: u64,
2029        max_stream_data: u64,
2030    ) -> Result<ServerApplicationControlPacket> {
2031        self.build_server_application_control_packet(QuicFrame::MaxStreamData {
2032            stream_id,
2033            max_stream_data,
2034        })
2035    }
2036
2037    pub fn build_server_max_streams_packet(
2038        &mut self,
2039        bidirectional: bool,
2040        max_streams: u64,
2041    ) -> Result<ServerApplicationControlPacket> {
2042        self.build_server_application_control_packet(QuicFrame::MaxStreams {
2043            bidirectional,
2044            max_streams,
2045        })
2046    }
2047
2048    pub fn build_server_flow_control_blocked_packet(
2049        &mut self,
2050    ) -> Result<Option<ServerApplicationControlPacket>> {
2051        self.server_application_flow_control
2052            .take_blocked_frame()
2053            .map(|frame| self.build_server_application_control_packet(frame))
2054            .transpose()
2055    }
2056
2057    pub fn build_server_receive_flow_control_update_packets(
2058        &mut self,
2059    ) -> Result<Vec<ServerApplicationControlPacket>> {
2060        self.server_application_receive_flow_control
2061            .take_update_frames()
2062            .into_iter()
2063            .map(|frame| self.build_server_application_control_packet(frame))
2064            .collect()
2065    }
2066
2067    // Server-side symmetric hook for app-consumed bytes. RFC 9000 Section 4
2068    // treats client and server receivers identically: both advertise
2069    // MAX_DATA / MAX_STREAM_DATA absolute values derived from
2070    // `initial_max_*data + bytes_consumed_by_application` and only emit a
2071    // frame when the gating threshold is crossed.
2072    pub fn record_server_stream_consumed(&mut self, stream_id: u64, len: u64) -> Result<()> {
2073        self.server_application_receive_flow_control
2074            .record_stream_consumed(stream_id, len)
2075    }
2076
2077    pub fn release_server_stream(&mut self, stream_id: u64) {
2078        self.server_application_receive_flow_control
2079            .release_stream(stream_id);
2080    }
2081
2082    pub fn build_server_handshake_done_packet(&mut self) -> Result<ServerApplicationControlPacket> {
2083        self.build_server_application_control_packet(QuicFrame::HandshakeDone)
2084    }
2085
2086    pub fn build_server_new_connection_id_packet(
2087        &mut self,
2088        sequence_number: u64,
2089        retire_prior_to: u64,
2090        connection_id: ConnectionId,
2091        stateless_reset_token: [u8; 16],
2092    ) -> Result<ServerApplicationControlPacket> {
2093        if connection_id.as_bytes().is_empty() {
2094            return Err(Error::Quic(
2095                "native QUIC NEW_CONNECTION_ID cannot carry an empty connection id".into(),
2096            ));
2097        }
2098        if retire_prior_to > sequence_number {
2099            return Err(Error::Quic(
2100                "native QUIC NEW_CONNECTION_ID retire_prior_to exceeds sequence_number".into(),
2101            ));
2102        }
2103        self.cid_inventory.register_local_issued(
2104            sequence_number,
2105            connection_id.clone(),
2106            stateless_reset_token,
2107        )?;
2108        self.build_server_application_control_packet(QuicFrame::NewConnectionId {
2109            sequence_number,
2110            retire_prior_to,
2111            connection_id: Bytes::copy_from_slice(connection_id.as_bytes()),
2112            stateless_reset_token,
2113        })
2114    }
2115
2116    pub fn build_server_path_response_packet(
2117        &mut self,
2118        data: [u8; 8],
2119    ) -> Result<ServerApplicationControlPacket> {
2120        self.build_server_application_control_packet_with_min_datagram(
2121            QuicFrame::PathResponse(data),
2122            MIN_PATH_VALIDATION_DATAGRAM,
2123        )
2124    }
2125
2126    pub fn build_server_path_challenge_packet_for_address(
2127        &mut self,
2128        remote_address: SocketAddr,
2129        data: [u8; 8],
2130    ) -> Result<ServerApplicationControlPacket> {
2131        let frame = self
2132            .server_path_validator
2133            .path_challenge_for_peer_address(remote_address, data)?;
2134        self.build_server_application_control_packet_with_min_datagram(
2135            frame,
2136            MIN_PATH_VALIDATION_DATAGRAM,
2137        )
2138    }
2139
2140    pub fn build_server_retire_connection_id_packet(
2141        &mut self,
2142        sequence_number: u64,
2143    ) -> Result<ServerApplicationControlPacket> {
2144        self.build_server_application_control_packet(QuicFrame::RetireConnectionId {
2145            sequence_number,
2146        })
2147    }
2148
2149    pub fn build_server_connection_migration_close_packet(
2150        &mut self,
2151    ) -> Result<ServerApplicationControlPacket> {
2152        let packet = self.build_server_application_control_packet(QuicFrame::ConnectionClose {
2153            error_code: QUIC_CONNECTION_MIGRATION_ERROR,
2154            frame_type: None,
2155            reason: Bytes::new(),
2156        })?;
2157        self.server_enter_closing(Instant::now());
2158        Ok(packet)
2159    }
2160
2161    pub fn server_pop_pending_peer_retires(&mut self) -> Vec<u64> {
2162        self.cid_inventory.drain_pending_peer_retires()
2163    }
2164
2165    pub fn server_promote_peer_cid(&mut self, sequence_number: u64) -> Result<()> {
2166        self.cid_inventory.promote_peer_to_active(sequence_number)
2167    }
2168
2169    pub fn server_outbound_peer_cid(&self) -> ConnectionId {
2170        self.cid_inventory
2171            .active_peer()
2172            .and_then(|entry| ConnectionId::from_bytes(entry.connection_id.clone()).ok())
2173            .unwrap_or_else(|| self.client_source_cid.clone())
2174    }
2175
2176    pub fn server_local_cids_for_routing(&self) -> Vec<ConnectionId> {
2177        self.cid_inventory
2178            .unretired_locals()
2179            .map(|entry| entry.connection_id.clone())
2180            .collect()
2181    }
2182
2183    pub fn is_server_path_address_validated(&self, remote_address: &SocketAddr) -> bool {
2184        self.server_path_validator
2185            .is_address_validated(remote_address)
2186    }
2187
2188    pub fn build_server_h3_raw_stream_packet(
2189        &mut self,
2190        stream_id: u64,
2191        data: Bytes,
2192        fin: bool,
2193    ) -> Result<ServerApplicationPacket> {
2194        self.build_server_application_stream_packet(stream_id, data, fin)
2195    }
2196
2197    pub fn build_server_h3_response_packet(
2198        &mut self,
2199        stream_id: u64,
2200        headers: Vec<native::H3Header>,
2201        body: Option<Bytes>,
2202        fin: bool,
2203    ) -> Result<ServerApplicationPacket> {
2204        let mut payload = native::encode_frame(&native::H3Frame::Headers(
2205            native::encode_header_block(&headers),
2206        ))
2207        .to_vec();
2208        if let Some(body) = body {
2209            payload.extend_from_slice(&native::encode_frame(&native::H3Frame::Data(body)));
2210        }
2211        self.build_server_application_stream_packet(stream_id, Bytes::from(payload), fin)
2212    }
2213
2214    pub fn build_server_h3_response_data_packet(
2215        &mut self,
2216        stream_id: u64,
2217        data: Bytes,
2218        fin: bool,
2219    ) -> Result<ServerApplicationPacket> {
2220        self.build_server_application_stream_packet(
2221            stream_id,
2222            native::encode_frame(&native::H3Frame::Data(data)),
2223            fin,
2224        )
2225    }
2226
2227    fn install_tls_secrets(&mut self) -> Result<()> {
2228        for secret in self.tls.secrets() {
2229            match (secret.direction, secret.level) {
2230                (QuicSecretDirection::Read, QuicEncryptionLevel::Handshake) => {
2231                    self.client_handshake_keys = Some(secret.packet_key_material()?);
2232                    self.recovery.set_has_handshake_keys(true);
2233                }
2234                (QuicSecretDirection::Read, QuicEncryptionLevel::EarlyData) => {
2235                    self.client_early_data_keys = Some(secret.packet_key_material()?);
2236                }
2237                (QuicSecretDirection::Write, QuicEncryptionLevel::Handshake) => {
2238                    self.server_handshake_keys = Some(secret.packet_key_material()?);
2239                    self.recovery.set_has_handshake_keys(true);
2240                }
2241                (QuicSecretDirection::Read, QuicEncryptionLevel::Application) => {
2242                    let keys = secret.packet_key_material()?;
2243                    self.client_application_next_keys =
2244                        Some(derive_next_packet_key_material(&keys)?);
2245                    self.client_application_keys = Some(keys);
2246                }
2247                (QuicSecretDirection::Write, QuicEncryptionLevel::Application) => {
2248                    let keys = secret.packet_key_material()?;
2249                    self.server_application_next_keys =
2250                        Some(derive_next_packet_key_material(&keys)?);
2251                    self.server_application_keys = Some(keys);
2252                }
2253                _ => {}
2254            }
2255        }
2256        if self.is_application_ready() && !self.recovery.handshake_complete() {
2257            self.recovery.discard_space(PacketNumberSpace::Initial);
2258            self.recovery.discard_space(PacketNumberSpace::Handshake);
2259            self.recovery.mark_handshake_complete();
2260        }
2261        Ok(())
2262    }
2263
2264    fn build_server_initial_packet(&mut self, crypto_data: Bytes) -> Result<ServerHandshakePacket> {
2265        let crypto_offset = self.server_initial_crypto_offset;
2266        let packet = self.build_server_initial_packet_at_offset_with_sent_at(
2267            crypto_offset,
2268            crypto_data,
2269            Instant::now(),
2270        )?;
2271        self.server_initial_crypto_offset += packet.crypto_data.len() as u64;
2272        Ok(packet)
2273    }
2274
2275    fn build_server_initial_packet_at_offset_with_sent_at(
2276        &mut self,
2277        crypto_offset: u64,
2278        crypto_data: Bytes,
2279        sent_at: Instant,
2280    ) -> Result<ServerHandshakePacket> {
2281        let packet_number = self.next_server_initial_packet_number;
2282        self.next_server_initial_packet_number += 1;
2283        let packet = build_server_crypto_packet(
2284            LongHeaderType::Initial,
2285            &self.server_initial_keys,
2286            &self.client_source_cid,
2287            &self.server_source_cid,
2288            packet_number,
2289            crypto_offset,
2290            crypto_data.clone(),
2291        )?;
2292        self.server_initial_loss_detector
2293            .on_packet_sent_at(packet_number, sent_at);
2294        self.recovery.on_packet_sent(
2295            PacketNumberSpace::Initial,
2296            packet_number,
2297            SentPacketInfo::new(sent_at, packet.packet.len(), true, true),
2298        );
2299        self.server_initial_sent_crypto.insert(
2300            packet_number,
2301            SentCryptoPacket {
2302                packet_type: LongHeaderType::Initial,
2303                crypto_offset,
2304                crypto_data: crypto_data.clone(),
2305            },
2306        );
2307        Ok(packet)
2308    }
2309
2310    fn build_server_handshake_packet(
2311        &mut self,
2312        crypto_data: Bytes,
2313    ) -> Result<ServerHandshakePacket> {
2314        let crypto_offset = self.server_handshake_crypto_offset;
2315        let packet = self.build_server_handshake_packet_at_offset_with_sent_at(
2316            crypto_offset,
2317            crypto_data,
2318            Instant::now(),
2319        )?;
2320        self.server_handshake_crypto_offset += packet.crypto_data.len() as u64;
2321        Ok(packet)
2322    }
2323
2324    pub fn retransmit_pto_server_crypto_packets(
2325        &mut self,
2326        now: Instant,
2327        pto: Duration,
2328    ) -> Result<Vec<ServerHandshakePacket>> {
2329        let mut retransmits = Vec::new();
2330        for packet_number in self
2331            .server_initial_loss_detector
2332            .pto_expired_packets(now, pto)
2333        {
2334            self.server_initial_loss_detector
2335                .retire_packet(packet_number);
2336            let Some(sent) = self.server_initial_sent_crypto.remove(&packet_number) else {
2337                continue;
2338            };
2339            if sent.packet_type != LongHeaderType::Initial {
2340                continue;
2341            }
2342            retransmits.push(self.build_server_initial_packet_at_offset_with_sent_at(
2343                sent.crypto_offset,
2344                sent.crypto_data,
2345                now,
2346            )?);
2347        }
2348        for packet_number in self
2349            .server_handshake_loss_detector
2350            .pto_expired_packets(now, pto)
2351        {
2352            self.server_handshake_loss_detector
2353                .retire_packet(packet_number);
2354            let Some(sent) = self.server_handshake_sent_crypto.remove(&packet_number) else {
2355                continue;
2356            };
2357            if sent.packet_type != LongHeaderType::Handshake {
2358                continue;
2359            }
2360            retransmits.push(self.build_server_handshake_packet_at_offset_with_sent_at(
2361                sent.crypto_offset,
2362                sent.crypto_data,
2363                now,
2364            )?);
2365        }
2366        Ok(retransmits)
2367    }
2368
2369    fn build_server_handshake_packet_at_offset_with_sent_at(
2370        &mut self,
2371        crypto_offset: u64,
2372        crypto_data: Bytes,
2373        sent_at: Instant,
2374    ) -> Result<ServerHandshakePacket> {
2375        let Some(server_handshake_keys) = &self.server_handshake_keys else {
2376            return Err(Error::Quic(
2377                "native server Handshake packet encryption is waiting for TLS Handshake keys"
2378                    .into(),
2379            ));
2380        };
2381        let packet_number = self.next_server_handshake_packet_number;
2382        self.next_server_handshake_packet_number += 1;
2383        let packet = build_server_crypto_packet(
2384            LongHeaderType::Handshake,
2385            server_handshake_keys,
2386            &self.client_source_cid,
2387            &self.server_source_cid,
2388            packet_number,
2389            crypto_offset,
2390            crypto_data.clone(),
2391        )?;
2392        self.server_handshake_loss_detector
2393            .on_packet_sent_at(packet_number, sent_at);
2394        self.recovery.on_packet_sent(
2395            PacketNumberSpace::Handshake,
2396            packet_number,
2397            SentPacketInfo::new(sent_at, packet.packet.len(), true, true),
2398        );
2399        self.server_handshake_sent_crypto.insert(
2400            packet_number,
2401            SentCryptoPacket {
2402                packet_type: LongHeaderType::Handshake,
2403                crypto_offset,
2404                crypto_data: crypto_data.clone(),
2405            },
2406        );
2407        Ok(packet)
2408    }
2409
2410    fn build_server_application_stream_packet(
2411        &mut self,
2412        stream_id: u64,
2413        data: Bytes,
2414        fin: bool,
2415    ) -> Result<ServerApplicationPacket> {
2416        if data.is_empty() && !fin {
2417            return Err(Error::HttpProtocol(
2418                "native server H3 response produced no payload".into(),
2419            ));
2420        }
2421        let stream_offset = *self.server_stream_offsets.get(&stream_id).unwrap_or(&0);
2422        self.server_application_flow_control.consume_stream_data(
2423            stream_id,
2424            stream_offset,
2425            data.len(),
2426        )?;
2427        let packet = self.build_server_application_stream_packet_at_offset(
2428            stream_id,
2429            stream_offset,
2430            data,
2431            fin,
2432        )?;
2433        self.server_stream_offsets
2434            .insert(stream_id, stream_offset + packet.data.len() as u64);
2435        Ok(packet)
2436    }
2437
2438    fn build_server_application_stream_packet_at_offset(
2439        &mut self,
2440        stream_id: u64,
2441        stream_offset: u64,
2442        data: Bytes,
2443        fin: bool,
2444    ) -> Result<ServerApplicationPacket> {
2445        let Some(server_application_keys) = &self.server_application_keys else {
2446            return Err(Error::Quic(
2447                "native server application packet encryption is waiting for TLS application keys"
2448                    .into(),
2449            ));
2450        };
2451
2452        let packet_number = self.next_server_application_packet_number;
2453        let packet_number_len = 2;
2454        let peer_cid = self.server_outbound_peer_cid();
2455        let frame = encode_frame(&QuicFrame::Stream {
2456            stream_id,
2457            offset: (stream_offset > 0).then_some(stream_offset),
2458            fin,
2459            data: data.clone(),
2460        });
2461        let packet = protect_short_header_packet(
2462            server_application_keys,
2463            &peer_cid,
2464            packet_number,
2465            packet_number_len,
2466            self.write_key_phase,
2467            &frame,
2468        )?;
2469
2470        let now = Instant::now();
2471        let packet_size = packet.len();
2472        self.server_application_loss_detector
2473            .on_packet_sent_at(packet_number, now);
2474        self.server_application_sent_streams.insert(
2475            packet_number,
2476            SentApplicationStreamPacket {
2477                stream_id,
2478                stream_offset,
2479                fin,
2480                data: data.clone(),
2481            },
2482        );
2483        self.recovery.on_packet_sent(
2484            PacketNumberSpace::Application,
2485            packet_number,
2486            SentPacketInfo::new(now, packet_size, true, true),
2487        );
2488        self.next_server_application_packet_number += 1;
2489
2490        Ok(ServerApplicationPacket {
2491            packet,
2492            packet_number,
2493            stream_id,
2494            packet_number_offset: 1 + peer_cid.as_bytes().len(),
2495            data,
2496        })
2497    }
2498
2499    fn build_server_application_control_packet(
2500        &mut self,
2501        frame: QuicFrame,
2502    ) -> Result<ServerApplicationControlPacket> {
2503        self.build_server_application_control_packet_with_min_datagram(frame, 0)
2504    }
2505
2506    fn build_server_application_control_packet_with_min_datagram(
2507        &mut self,
2508        frame: QuicFrame,
2509        min_datagram: usize,
2510    ) -> Result<ServerApplicationControlPacket> {
2511        let Some(server_application_keys) = &self.server_application_keys else {
2512            return Err(Error::Quic(
2513                "native server application packet encryption is waiting for TLS application keys"
2514                    .into(),
2515            ));
2516        };
2517
2518        let packet_number = self.next_server_application_packet_number;
2519        let packet_number_len = 2;
2520        let peer_cid = self.server_outbound_peer_cid();
2521        let short_header_len = 1 + peer_cid.as_bytes().len() + packet_number_len;
2522        let payload = if min_datagram > 0 {
2523            pad_short_header_payload_to_min_datagram(
2524                padded_short_header_payload(encode_frame(&frame)),
2525                short_header_len,
2526                min_datagram,
2527            )
2528        } else {
2529            padded_short_header_payload(encode_frame(&frame))
2530        };
2531        let packet = protect_short_header_packet(
2532            server_application_keys,
2533            &peer_cid,
2534            packet_number,
2535            packet_number_len,
2536            self.write_key_phase,
2537            &payload,
2538        )?;
2539        let now = Instant::now();
2540        let packet_size = packet.len();
2541        self.server_application_loss_detector
2542            .on_packet_sent_at(packet_number, now);
2543        self.recovery.on_packet_sent(
2544            PacketNumberSpace::Application,
2545            packet_number,
2546            SentPacketInfo::new(now, packet_size, true, true),
2547        );
2548        self.next_server_application_packet_number += 1;
2549
2550        Ok(ServerApplicationControlPacket {
2551            packet,
2552            packet_number,
2553            packet_number_offset: 1 + peer_cid.as_bytes().len(),
2554        })
2555    }
2556}
2557
2558impl NativeQuicHandshake {
2559    pub fn client(
2560        server_name: &str,
2561        fingerprint: &Http3Fingerprint,
2562        destination_cid: ConnectionId,
2563        source_cid: ConnectionId,
2564    ) -> Result<Self> {
2565        Self::client_with_verify_peer(server_name, fingerprint, destination_cid, source_cid, true)
2566    }
2567
2568    pub fn client_with_verify_peer(
2569        server_name: &str,
2570        fingerprint: &Http3Fingerprint,
2571        destination_cid: ConnectionId,
2572        source_cid: ConnectionId,
2573        verify_peer: bool,
2574    ) -> Result<Self> {
2575        Self::client_with_tls_fingerprint(
2576            server_name,
2577            fingerprint,
2578            None,
2579            destination_cid,
2580            source_cid,
2581            verify_peer,
2582            &[],
2583            false,
2584        )
2585    }
2586
2587    /// Build a client handshake that optionally replays a cached TLS 1.3
2588    /// session ticket. When `session_der` is `Some`, the ClientHello is emitted
2589    /// via the resumption path (`client_with_replayed_session_ticket`);
2590    /// otherwise it falls through to the ordinary first-handshake constructor.
2591    #[allow(clippy::too_many_arguments)]
2592    pub fn client_with_tls_fingerprint_and_session(
2593        server_name: &str,
2594        fingerprint: &Http3Fingerprint,
2595        tls_fingerprint: Option<&TlsFingerprint>,
2596        destination_cid: ConnectionId,
2597        source_cid: ConnectionId,
2598        verify_peer: bool,
2599        root_certs: &[Vec<u8>],
2600        use_platform_roots: bool,
2601        session_der: Option<&[u8]>,
2602    ) -> Result<Self> {
2603        match session_der {
2604            Some(session_der) => Self::client_with_replayed_session_ticket(
2605                server_name,
2606                fingerprint,
2607                tls_fingerprint,
2608                destination_cid,
2609                source_cid,
2610                verify_peer,
2611                root_certs,
2612                use_platform_roots,
2613                session_der,
2614            ),
2615            None => Self::client_with_tls_fingerprint(
2616                server_name,
2617                fingerprint,
2618                tls_fingerprint,
2619                destination_cid,
2620                source_cid,
2621                verify_peer,
2622                root_certs,
2623                use_platform_roots,
2624            ),
2625        }
2626    }
2627
2628    #[allow(clippy::too_many_arguments)]
2629    pub fn client_with_tls_fingerprint_and_zero_rtt_request(
2630        server_name: &str,
2631        fingerprint: &Http3Fingerprint,
2632        tls_fingerprint: Option<&TlsFingerprint>,
2633        destination_cid: ConnectionId,
2634        source_cid: ConnectionId,
2635        verify_peer: bool,
2636        root_certs: &[Vec<u8>],
2637        use_platform_roots: bool,
2638        session_der: &[u8],
2639        early_data: &[u8],
2640    ) -> Result<Self> {
2641        let initial_keys = derive_initial_key_material(destination_cid.as_bytes())?;
2642        let mut tls =
2643            NativeQuicTlsSession::client_with_initial_source_connection_id_and_zero_rtt_offer(
2644                server_name,
2645                fingerprint,
2646                &source_cid,
2647                tls_fingerprint,
2648                verify_peer,
2649                root_certs,
2650                use_platform_roots,
2651                session_der,
2652                early_data,
2653            )?;
2654        let client_initial = build_client_initial_packet_from_capture_with_size(
2655            tls.take_client_initial(),
2656            destination_cid.clone(),
2657            source_cid.clone(),
2658            fingerprint.transport.initial_datagram_size,
2659        )?;
2660        let client_early_data_keys = client_initial
2661            .secrets
2662            .iter()
2663            .find(|secret| {
2664                secret.direction == QuicSecretDirection::Write
2665                    && secret.level == QuicEncryptionLevel::EarlyData
2666            })
2667            .map(QuicTlsSecret::packet_key_material)
2668            .transpose()?;
2669        let client_cid_inventory = new_client_cid_inventory(fingerprint, &source_cid);
2670
2671        Ok(Self {
2672            client_initial,
2673            pending_client_initial: None,
2674            tls,
2675            fingerprint: fingerprint.clone(),
2676            server_name: server_name.to_string(),
2677            tls_fingerprint: tls_fingerprint.cloned(),
2678            verify_peer,
2679            root_certs: root_certs.to_vec(),
2680            use_platform_roots,
2681            supported_versions: vec![QUIC_VERSION_1],
2682            client_initial_version: QUIC_VERSION_1,
2683            retry_received: false,
2684            vn_received: false,
2685            server_initial_or_handshake_seen: false,
2686            original_destination_cid: destination_cid.clone(),
2687            retry_source_cid: None,
2688            destination_cid,
2689            source_cid,
2690            client_initial_keys: initial_keys.client,
2691            server_initial_keys: initial_keys.server,
2692            client_handshake_keys: None,
2693            client_early_data_keys,
2694            server_handshake_keys: None,
2695            client_application_keys: None,
2696            server_application_keys: None,
2697            client_application_next_keys: None,
2698            server_application_next_keys: None,
2699            server_application_previous: None,
2700            write_key_phase: false,
2701            read_key_phase: false,
2702            application_key_update: OneRttKeyUpdate::default(),
2703            initial_crypto: QuicCryptoAssembler::default(),
2704            handshake_crypto: QuicCryptoAssembler::default(),
2705            initial_ack_tracker: QuicAckTracker::default(),
2706            handshake_ack_tracker: QuicAckTracker::default(),
2707            application_ack_tracker: QuicAckTracker::default(),
2708            client_initial_loss_detector: QuicLossDetector::default(),
2709            client_handshake_loss_detector: QuicLossDetector::default(),
2710            client_application_loss_detector: QuicLossDetector::default(),
2711            client_application_flow_control: QuicApplicationFlowControl::client(
2712                &fingerprint.transport,
2713            ),
2714            client_application_receive_flow_control: QuicReceiveFlowControl::client(
2715                &fingerprint.transport,
2716            ),
2717            client_initial_sent_crypto: BTreeMap::new(),
2718            client_handshake_sent_crypto: BTreeMap::new(),
2719            client_application_sent_streams: BTreeMap::new(),
2720            client_application_recovery_lost_packets: Vec::new(),
2721            client_application_ecn_congestion: false,
2722            client_path_validator: QuicPathValidator::default(),
2723            client_cid_inventory,
2724            client_pmtu_probe: QuicPmtuProbePolicy::from_transport(&fingerprint.transport),
2725            server_transport_parameters_validated: false,
2726            recovery: recovery_state_from_transport(&fingerprint.transport),
2727            next_client_initial_packet_number: 1,
2728            next_server_initial_packet_number: 0,
2729            next_server_handshake_packet_number: 0,
2730            next_client_handshake_packet_number: 0,
2731            next_server_application_packet_number: 0,
2732            next_client_application_packet_number: 0,
2733            next_client_bidirectional_stream_id: 0,
2734            next_client_unidirectional_stream_id: 2,
2735            client_handshake_crypto_offset: 0,
2736            client_stream_offsets: BTreeMap::new(),
2737            server_h3_stream_buffers: BTreeMap::new(),
2738            server_h3_stream_buffer_offsets: BTreeMap::new(),
2739            server_h3_stream_types: BTreeMap::new(),
2740            close_draining: false,
2741            close_state: QuicCloseState::default(),
2742        })
2743    }
2744
2745    #[allow(clippy::too_many_arguments)]
2746    pub fn client_with_tls_fingerprint(
2747        server_name: &str,
2748        fingerprint: &Http3Fingerprint,
2749        tls_fingerprint: Option<&TlsFingerprint>,
2750        destination_cid: ConnectionId,
2751        source_cid: ConnectionId,
2752        verify_peer: bool,
2753        root_certs: &[Vec<u8>],
2754        use_platform_roots: bool,
2755    ) -> Result<Self> {
2756        let initial_keys = derive_initial_key_material(destination_cid.as_bytes())?;
2757        let mut tls =
2758            NativeQuicTlsSession::client_with_initial_source_connection_id_and_verify_peer(
2759                server_name,
2760                fingerprint,
2761                &source_cid,
2762                tls_fingerprint,
2763                verify_peer,
2764                root_certs,
2765                use_platform_roots,
2766            )?;
2767        let client_initial = build_client_initial_packet_from_capture_with_size(
2768            tls.take_client_initial(),
2769            destination_cid.clone(),
2770            source_cid.clone(),
2771            fingerprint.transport.initial_datagram_size,
2772        )?;
2773        let client_cid_inventory = new_client_cid_inventory(fingerprint, &source_cid);
2774
2775        Ok(Self {
2776            client_initial,
2777            pending_client_initial: None,
2778            tls,
2779            fingerprint: fingerprint.clone(),
2780            server_name: server_name.to_string(),
2781            tls_fingerprint: tls_fingerprint.cloned(),
2782            verify_peer,
2783            root_certs: root_certs.to_vec(),
2784            use_platform_roots,
2785            supported_versions: vec![QUIC_VERSION_1],
2786            client_initial_version: QUIC_VERSION_1,
2787            retry_received: false,
2788            vn_received: false,
2789            server_initial_or_handshake_seen: false,
2790            original_destination_cid: destination_cid.clone(),
2791            retry_source_cid: None,
2792            destination_cid,
2793            source_cid,
2794            client_initial_keys: initial_keys.client,
2795            server_initial_keys: initial_keys.server,
2796            client_handshake_keys: None,
2797            client_early_data_keys: None,
2798            server_handshake_keys: None,
2799            client_application_keys: None,
2800            server_application_keys: None,
2801            client_application_next_keys: None,
2802            server_application_next_keys: None,
2803            server_application_previous: None,
2804            write_key_phase: false,
2805            read_key_phase: false,
2806            application_key_update: OneRttKeyUpdate::default(),
2807            initial_crypto: QuicCryptoAssembler::default(),
2808            handshake_crypto: QuicCryptoAssembler::default(),
2809            initial_ack_tracker: QuicAckTracker::default(),
2810            handshake_ack_tracker: QuicAckTracker::default(),
2811            application_ack_tracker: QuicAckTracker::default(),
2812            client_initial_loss_detector: QuicLossDetector::default(),
2813            client_handshake_loss_detector: QuicLossDetector::default(),
2814            client_application_loss_detector: QuicLossDetector::default(),
2815            client_application_flow_control: QuicApplicationFlowControl::client(
2816                &fingerprint.transport,
2817            ),
2818            client_application_receive_flow_control: QuicReceiveFlowControl::client(
2819                &fingerprint.transport,
2820            ),
2821            client_initial_sent_crypto: BTreeMap::new(),
2822            client_handshake_sent_crypto: BTreeMap::new(),
2823            client_application_sent_streams: BTreeMap::new(),
2824            client_application_recovery_lost_packets: Vec::new(),
2825            client_application_ecn_congestion: false,
2826            client_path_validator: QuicPathValidator::default(),
2827            client_cid_inventory,
2828            client_pmtu_probe: QuicPmtuProbePolicy::from_transport(&fingerprint.transport),
2829            server_transport_parameters_validated: false,
2830            recovery: recovery_state_from_transport(&fingerprint.transport),
2831            next_client_initial_packet_number: 1,
2832            next_server_initial_packet_number: 0,
2833            next_server_handshake_packet_number: 0,
2834            next_client_handshake_packet_number: 0,
2835            next_server_application_packet_number: 0,
2836            next_client_application_packet_number: 0,
2837            next_client_bidirectional_stream_id: 0,
2838            next_client_unidirectional_stream_id: 2,
2839            client_handshake_crypto_offset: 0,
2840            client_stream_offsets: BTreeMap::new(),
2841            server_h3_stream_buffers: BTreeMap::new(),
2842            server_h3_stream_buffer_offsets: BTreeMap::new(),
2843            server_h3_stream_types: BTreeMap::new(),
2844            close_draining: false,
2845            close_state: QuicCloseState::default(),
2846        })
2847    }
2848
2849    #[allow(clippy::too_many_arguments)]
2850    pub fn client_with_replayed_session_ticket(
2851        server_name: &str,
2852        fingerprint: &Http3Fingerprint,
2853        tls_fingerprint: Option<&TlsFingerprint>,
2854        destination_cid: ConnectionId,
2855        source_cid: ConnectionId,
2856        verify_peer: bool,
2857        root_certs: &[Vec<u8>],
2858        use_platform_roots: bool,
2859        session_ticket_der: &[u8],
2860    ) -> Result<Self> {
2861        let initial_keys = derive_initial_key_material(destination_cid.as_bytes())?;
2862        let mut tls =
2863            NativeQuicTlsSession::client_with_initial_source_connection_id_and_replayed_session(
2864                server_name,
2865                fingerprint,
2866                &source_cid,
2867                tls_fingerprint,
2868                verify_peer,
2869                root_certs,
2870                use_platform_roots,
2871                session_ticket_der,
2872            )?;
2873        let client_initial = build_client_initial_packet_from_capture_with_size(
2874            tls.take_client_initial(),
2875            destination_cid.clone(),
2876            source_cid.clone(),
2877            fingerprint.transport.initial_datagram_size,
2878        )?;
2879        let client_cid_inventory = new_client_cid_inventory(fingerprint, &source_cid);
2880
2881        Ok(Self {
2882            client_initial,
2883            pending_client_initial: None,
2884            tls,
2885            fingerprint: fingerprint.clone(),
2886            server_name: server_name.to_string(),
2887            tls_fingerprint: tls_fingerprint.cloned(),
2888            verify_peer,
2889            root_certs: root_certs.to_vec(),
2890            use_platform_roots,
2891            supported_versions: vec![QUIC_VERSION_1],
2892            client_initial_version: QUIC_VERSION_1,
2893            retry_received: false,
2894            vn_received: false,
2895            server_initial_or_handshake_seen: false,
2896            original_destination_cid: destination_cid.clone(),
2897            retry_source_cid: None,
2898            destination_cid,
2899            source_cid,
2900            client_initial_keys: initial_keys.client,
2901            server_initial_keys: initial_keys.server,
2902            client_handshake_keys: None,
2903            client_early_data_keys: None,
2904            server_handshake_keys: None,
2905            client_application_keys: None,
2906            server_application_keys: None,
2907            client_application_next_keys: None,
2908            server_application_next_keys: None,
2909            server_application_previous: None,
2910            write_key_phase: false,
2911            read_key_phase: false,
2912            application_key_update: OneRttKeyUpdate::default(),
2913            initial_crypto: QuicCryptoAssembler::default(),
2914            handshake_crypto: QuicCryptoAssembler::default(),
2915            initial_ack_tracker: QuicAckTracker::default(),
2916            handshake_ack_tracker: QuicAckTracker::default(),
2917            application_ack_tracker: QuicAckTracker::default(),
2918            client_initial_loss_detector: QuicLossDetector::default(),
2919            client_handshake_loss_detector: QuicLossDetector::default(),
2920            client_application_loss_detector: QuicLossDetector::default(),
2921            client_application_flow_control: QuicApplicationFlowControl::client(
2922                &fingerprint.transport,
2923            ),
2924            client_application_receive_flow_control: QuicReceiveFlowControl::client(
2925                &fingerprint.transport,
2926            ),
2927            client_initial_sent_crypto: BTreeMap::new(),
2928            client_handshake_sent_crypto: BTreeMap::new(),
2929            client_application_sent_streams: BTreeMap::new(),
2930            client_application_recovery_lost_packets: Vec::new(),
2931            client_application_ecn_congestion: false,
2932            client_path_validator: QuicPathValidator::default(),
2933            client_cid_inventory,
2934            client_pmtu_probe: QuicPmtuProbePolicy::from_transport(&fingerprint.transport),
2935            server_transport_parameters_validated: false,
2936            recovery: recovery_state_from_transport(&fingerprint.transport),
2937            next_client_initial_packet_number: 1,
2938            next_server_initial_packet_number: 0,
2939            next_server_handshake_packet_number: 0,
2940            next_client_handshake_packet_number: 0,
2941            next_server_application_packet_number: 0,
2942            next_client_application_packet_number: 0,
2943            next_client_bidirectional_stream_id: 0,
2944            next_client_unidirectional_stream_id: 2,
2945            client_handshake_crypto_offset: 0,
2946            client_stream_offsets: BTreeMap::new(),
2947            server_h3_stream_buffers: BTreeMap::new(),
2948            server_h3_stream_buffer_offsets: BTreeMap::new(),
2949            server_h3_stream_types: BTreeMap::new(),
2950            close_draining: false,
2951            close_state: QuicCloseState::default(),
2952        })
2953    }
2954
    /// Returns the result of `self.tls.take_session_tickets()`.
    ///
    /// NOTE(review): presumably this drains the tickets the TLS session has
    /// collected so far, so a second call would not return them again; confirm
    /// against `NativeQuicTlsSession`.
    pub fn take_session_tickets(&mut self) -> Vec<NativeH3SessionTicket> {
        self.tls.take_session_tickets()
    }
2958
    /// Native HTTP/3 TLS 1.3 resumption / QUIC 0-RTT status for this handshake.
    ///
    /// Combines `SSL_session_reused`, `SSL_early_data_accepted`, and the
    /// per-session 0-RTT offer flag into [`NativeH3HandshakeStatus`]. Stable
    /// once the handshake has produced application secrets per RFC 9001
    /// section 4.6.
    ///
    /// This accessor only forwards to the TLS session's `handshake_status()`;
    /// the combination described above happens there.
    pub fn handshake_status(&self) -> NativeH3HandshakeStatus {
        self.tls.handshake_status()
    }
2968
    /// BoringSSL `SSL_get_early_data_reason` code (e.g. `ssl_early_data_accepted = 2`,
    /// `ssl_early_data_quic_parameter_mismatch = 13`) for diagnostic logging.
    ///
    /// Forwards to the TLS session's `early_data_reason()` without
    /// interpreting the value.
    pub fn early_data_reason(&self) -> u32 {
        self.tls.early_data_reason()
    }
2974
    /// Borrows the client Initial packet currently stored on this handshake.
    ///
    /// The constructors fill this field from the TLS capture via
    /// `build_client_initial_packet_from_capture_with_size`.
    pub fn client_initial(&self) -> &ClientInitialPacket {
        &self.client_initial
    }
2978
    /// Moves the pending client Initial packet out, leaving `None` behind.
    ///
    /// Returns `None` when nothing is pending; the constructors start with no
    /// pending packet.
    /// NOTE(review): presumably populated when a new Initial has to be sent
    /// (e.g. after Retry or Version Negotiation); the code that sets it is
    /// outside this excerpt.
    pub fn take_pending_client_initial(&mut self) -> Option<ClientInitialPacket> {
        self.pending_client_initial.take()
    }
2982
    /// Borrows the list of QUIC versions this client is configured to support.
    ///
    /// The constructors initialise it to `[QUIC_VERSION_1]`;
    /// [`Self::set_supported_versions`] replaces it.
    pub fn supported_versions(&self) -> &[u32] {
        &self.supported_versions
    }
2986
2987    pub fn set_supported_versions(&mut self, versions: Vec<u32>) -> Result<()> {
2988        if versions.is_empty() {
2989            return Err(Error::Quic(
2990                "native H3 supported QUIC versions list cannot be empty".into(),
2991            ));
2992        }
2993        if !versions.contains(&self.client_initial_version) {
2994            return Err(Error::Quic(
2995                "native H3 supported QUIC versions must include the issued initial version".into(),
2996            ));
2997        }
2998        self.supported_versions = versions;
2999        Ok(())
3000    }
3001
    /// Returns the QUIC version recorded for the client Initial.
    ///
    /// The constructors set it to `QUIC_VERSION_1`.
    pub fn client_initial_version(&self) -> u32 {
        self.client_initial_version
    }
3005
    /// Returns the `retry_received` flag, which the constructors set to `false`.
    ///
    /// NOTE(review): presumably set once a Retry packet has been processed;
    /// the code that sets it is outside this excerpt.
    pub fn retry_received(&self) -> bool {
        self.retry_received
    }
3009
    /// Returns the `vn_received` flag, which the constructors set to `false`.
    ///
    /// NOTE(review): presumably set once a Version Negotiation packet has been
    /// processed; the code that sets it is outside this excerpt.
    pub fn version_negotiation_received(&self) -> bool {
        self.vn_received
    }
3013
3014    pub fn install_tls_secrets(&mut self, secrets: &[QuicTlsSecret]) -> Result<()> {
3015        for secret in secrets {
3016            if secret.direction == QuicSecretDirection::Read
3017                && secret.level == QuicEncryptionLevel::Handshake
3018            {
3019                self.server_handshake_keys = Some(secret.packet_key_material()?);
3020            } else if secret.direction == QuicSecretDirection::Write
3021                && secret.level == QuicEncryptionLevel::EarlyData
3022            {
3023                self.client_early_data_keys = Some(secret.packet_key_material()?);
3024            } else if secret.direction == QuicSecretDirection::Write
3025                && secret.level == QuicEncryptionLevel::Handshake
3026            {
3027                self.client_handshake_keys = Some(secret.packet_key_material()?);
3028            } else if secret.direction == QuicSecretDirection::Read
3029                && secret.level == QuicEncryptionLevel::Application
3030            {
3031                let keys = secret.packet_key_material()?;
3032                self.server_application_next_keys = Some(derive_next_packet_key_material(&keys)?);
3033                self.server_application_keys = Some(keys);
3034            } else if secret.direction == QuicSecretDirection::Write
3035                && secret.level == QuicEncryptionLevel::Application
3036            {
3037                let keys = secret.packet_key_material()?;
3038                self.client_application_next_keys = Some(derive_next_packet_key_material(&keys)?);
3039                self.client_application_keys = Some(keys);
3040            }
3041        }
3042        if self.is_application_ready() && !self.recovery.handshake_complete() {
3043            self.recovery.discard_space(PacketNumberSpace::Initial);
3044            self.recovery.discard_space(PacketNumberSpace::Handshake);
3045            self.recovery.mark_handshake_complete();
3046        }
3047        Ok(())
3048    }
3049
3050    pub fn server_handshake_keys(&self) -> Option<&QuicPacketKeyMaterial> {
3051        self.server_handshake_keys.as_ref()
3052    }
3053
3054    pub fn is_application_ready(&self) -> bool {
3055        self.client_application_keys.is_some() && self.server_application_keys.is_some()
3056    }
3057
3058    pub fn is_close_draining(&self) -> bool {
3059        self.close_draining
3060    }
3061
3062    pub fn close_state(&self) -> &QuicCloseState {
3063        &self.close_state
3064    }
3065
3066    pub fn close_state_mut(&mut self) -> &mut QuicCloseState {
3067        &mut self.close_state
3068    }
3069
3070    /// RFC9000 § 10.2 closing: called by the client driver after emitting a
3071    /// CONNECTION_CLOSE frame to suppress further outbound application data
3072    /// and anchor the close timer at `now`.
3073    pub fn client_enter_closing(&mut self, now: Instant) {
3074        self.close_state.enter_closing(now);
3075        self.close_draining = true;
3076    }
3077
3078    /// RFC9000 § 10.2 draining: called when the client driver wants to
3079    /// enter draining explicitly. Peer CONNECTION_CLOSE handling in
3080    /// `open_client_h3_event_packet` also drives this transition.
3081    pub fn client_enter_draining(&mut self, now: Instant) {
3082        self.close_state.enter_draining(now);
3083        self.close_draining = true;
3084    }
3085
3086    /// Returns the RFC9000 § 10.2 close window derived from the client
3087    /// application-space loss detector via RFC9002 § 6.2.1 `current_PTO * 3`.
3088    pub fn client_close_window(&self) -> Duration {
3089        self.client_application_loss_detector.close_window()
3090    }
3091
3092    pub fn client_is_close_window_expired(&self, now: Instant) -> bool {
3093        self.close_state.is_expired(now, self.client_close_window())
3094    }
3095
3096    pub fn client_close_time_until_expiry(&self, now: Instant) -> Option<Duration> {
3097        self.close_state
3098            .time_until_expiry(now, self.client_close_window())
3099    }
3100
3101    pub fn client_should_replay_connection_close(&self, now: Instant) -> bool {
3102        self.close_state.should_replay(now)
3103    }
3104
3105    pub fn client_mark_connection_close_replayed(&mut self, now: Instant) {
3106        self.close_state.mark_replayed(now);
3107    }
3108
3109    pub fn client_observe_inbound_packet_for_close(&mut self) -> u64 {
3110        self.close_state.observe_inbound_packet()
3111    }
3112
3113    pub fn client_application_pto(&self) -> Duration {
3114        self.client_application_loss_detector.current_pto()
3115    }
3116
3117    /// Current write-side key phase bit per RFC9001 § 6.
3118    pub fn write_key_phase(&self) -> bool {
3119        self.write_key_phase
3120    }
3121
3122    /// Current read-side key phase bit per RFC9001 § 6.
3123    pub fn read_key_phase(&self) -> bool {
3124        self.read_key_phase
3125    }
3126
3127    /// Whether a locally-initiated key update is currently waiting for an ACK
3128    /// of a packet sent at the new write phase per RFC9001 § 6.5.
3129    pub fn key_update_in_progress(&self) -> bool {
3130        self.application_key_update.write_update_in_progress
3131    }
3132
3133    /// Force a 1-RTT key update. Returns an error when the previous local key
3134    /// update has not yet been confirmed via ACK (RFC9001 § 6.5).
3135    pub fn force_key_update(&mut self) -> Result<()> {
3136        if self.application_key_update.write_update_in_progress {
3137            return Err(Error::Quic(
3138                "RFC9001 § 6.5: cannot initiate a new key update while a previous one is unconfirmed"
3139                    .into(),
3140            ));
3141        }
3142        let next = self.client_application_next_keys.take().ok_or_else(|| {
3143            Error::Quic(
3144                "native QUIC client cannot force a key update before TLS application secrets are installed"
3145                    .into(),
3146            )
3147        })?;
3148        self.client_application_keys = Some(next);
3149        let new_current = self
3150            .client_application_keys
3151            .as_ref()
3152            .expect("client application keys just installed");
3153        self.client_application_next_keys = Some(derive_next_packet_key_material(new_current)?);
3154        self.write_key_phase = !self.write_key_phase;
3155        self.application_key_update.write_update_in_progress = true;
3156        self.application_key_update.write_update_anchor =
3157            Some(self.next_client_application_packet_number);
3158        Ok(())
3159    }
3160
3161    fn commit_receive_key_update(&mut self, now: Instant) -> Result<()> {
3162        let Some(current) = self.server_application_keys.take() else {
3163            return Err(Error::Quic(
3164                "native QUIC client cannot rotate read keys without an installed current key set"
3165                    .into(),
3166            ));
3167        };
3168        let Some(next) = self.server_application_next_keys.take() else {
3169            return Err(Error::Quic(
3170                "native QUIC client cannot rotate read keys without precomputed next key set"
3171                    .into(),
3172            ));
3173        };
3174        let old_phase = self.read_key_phase;
3175        self.server_application_keys = Some(next);
3176        let new_current = self
3177            .server_application_keys
3178            .as_ref()
3179            .expect("server application keys just installed");
3180        self.server_application_next_keys = Some(derive_next_packet_key_material(new_current)?);
3181        self.server_application_previous = Some(PreviousKeys {
3182            keys: current,
3183            phase: old_phase,
3184            retire_at: now + PREVIOUS_KEY_WINDOW,
3185        });
3186        self.read_key_phase = !self.read_key_phase;
3187
3188        if self.write_key_phase != self.read_key_phase
3189            && !self.application_key_update.write_update_in_progress
3190        {
3191            let next_write = self.client_application_next_keys.take().ok_or_else(|| {
3192                Error::Quic(
3193                    "native QUIC client cannot mirror peer key update without precomputed next write keys"
3194                        .into(),
3195                )
3196            })?;
3197            self.client_application_keys = Some(next_write);
3198            let new_current_write = self
3199                .client_application_keys
3200                .as_ref()
3201                .expect("client application keys just rotated");
3202            self.client_application_next_keys =
3203                Some(derive_next_packet_key_material(new_current_write)?);
3204            self.write_key_phase = !self.write_key_phase;
3205            self.application_key_update.write_update_in_progress = true;
3206            self.application_key_update.write_update_anchor =
3207                Some(self.next_client_application_packet_number);
3208        }
3209
3210        Ok(())
3211    }
3212
3213    pub fn client_path_validation_pending_count(&self) -> usize {
3214        self.client_path_validator.pending_count()
3215    }
3216
3217    pub fn is_client_path_validated(&self, data: &[u8; 8]) -> bool {
3218        self.client_path_validator.is_validated(data)
3219    }
3220
3221    pub fn is_client_path_address_validated(&self, remote_address: &SocketAddr) -> bool {
3222        self.client_path_validator
3223            .is_address_validated(remote_address)
3224    }
3225
3226    pub fn client_path_migration_connection_id(
3227        &self,
3228        remote_address: &SocketAddr,
3229    ) -> Option<&ConnectionId> {
3230        self.client_path_validator
3231            .migration_connection_id(remote_address)
3232    }
3233
3234    pub fn client_pmtu_current_size(&self) -> usize {
3235        self.client_pmtu_probe.current_size()
3236    }
3237
3238    pub fn client_pmtu_pending_probe_size(&self) -> Option<usize> {
3239        self.client_pmtu_probe.pending_probe_size()
3240    }
3241
3242    pub fn client_application_lost_packets(&self) -> Vec<u64> {
3243        self.client_application_loss_detector.lost_packets()
3244    }
3245
3246    pub fn take_client_application_ecn_congestion(&mut self) -> bool {
3247        std::mem::take(&mut self.client_application_ecn_congestion)
3248    }
3249
3250    /// Smoothed RTT observed on the application packet number space, as
3251    /// updated by RFC9002 § 5.3 from inbound ACK frames. Read-only consumer
3252    /// of the loss detector's RTT estimator.
3253    pub fn client_application_smoothed_rtt(&self) -> Option<Duration> {
3254        self.client_application_loss_detector.smoothed_rtt()
3255    }
3256
3257    /// Minimum RTT observed on the application packet number space.
3258    /// Read-only consumer of the loss detector's RTT estimator.
3259    pub fn client_application_min_rtt(&self) -> Option<Duration> {
3260        self.client_application_loss_detector.min_rtt()
3261    }
3262
3263    pub fn retransmit_lost_client_application_stream_packets(
3264        &mut self,
3265    ) -> Result<Vec<ClientApplicationPacket>> {
3266        let mut lost_packets = self.client_application_loss_detector.lost_packets();
3267        lost_packets.append(&mut self.client_application_recovery_lost_packets);
3268        self.retransmit_client_application_stream_packets(lost_packets)
3269    }
3270
3271    pub fn retransmit_pto_client_application_stream_packets(
3272        &mut self,
3273        now: Instant,
3274        pto_timeout: Duration,
3275    ) -> Result<Vec<ClientApplicationPacket>> {
3276        let expired_packets = self
3277            .client_application_loss_detector
3278            .pto_expired_packets(now, pto_timeout);
3279        self.retransmit_client_application_stream_packets(expired_packets)
3280    }
3281
3282    fn retransmit_client_application_stream_packets<I>(
3283        &mut self,
3284        packet_numbers: I,
3285    ) -> Result<Vec<ClientApplicationPacket>>
3286    where
3287        I: IntoIterator<Item = u64>,
3288    {
3289        let mut packet_numbers = packet_numbers.into_iter().collect::<Vec<_>>();
3290        packet_numbers.sort_unstable();
3291        packet_numbers.dedup();
3292        let mut retransmits = Vec::new();
3293        for packet_number in packet_numbers {
3294            self.client_application_loss_detector
3295                .retire_packet(packet_number);
3296            let Some(sent) = self.client_application_sent_streams.remove(&packet_number) else {
3297                continue;
3298            };
3299            retransmits.push(self.build_client_application_stream_packet_at_offset(
3300                sent.stream_id,
3301                sent.stream_offset,
3302                sent.data,
3303                sent.fin,
3304            )?);
3305        }
3306        Ok(retransmits)
3307    }
3308
3309    pub fn build_client_initial_ack_packet(&mut self) -> Result<Option<ClientAckPacket>> {
3310        let packet = build_ack_packet(
3311            LongHeaderType::Initial,
3312            &self.client_initial_keys,
3313            &self.destination_cid,
3314            &self.source_cid,
3315            &mut self.initial_ack_tracker,
3316            self.next_client_initial_packet_number,
3317        )?;
3318        if packet.is_some() {
3319            self.next_client_initial_packet_number += 1;
3320        }
3321        Ok(packet)
3322    }
3323
3324    pub fn build_client_handshake_ack_packet(&mut self) -> Result<Option<ClientAckPacket>> {
3325        if self.handshake_ack_tracker.is_empty() {
3326            return Ok(None);
3327        }
3328        let Some(client_handshake_keys) = &self.client_handshake_keys else {
3329            return Err(Error::Quic(
3330                "native Handshake ACK encryption is waiting for TLS Handshake keys".into(),
3331            ));
3332        };
3333        let packet = build_ack_packet(
3334            LongHeaderType::Handshake,
3335            client_handshake_keys,
3336            &self.destination_cid,
3337            &self.source_cid,
3338            &mut self.handshake_ack_tracker,
3339            self.next_client_handshake_packet_number,
3340        )?;
3341        if packet.is_some() {
3342            self.next_client_handshake_packet_number += 1;
3343        }
3344        Ok(packet)
3345    }
3346
3347    pub fn build_client_application_ack_packet(
3348        &mut self,
3349    ) -> Result<Option<ClientApplicationAckPacket>> {
3350        self.build_client_application_ack_packet_with_delay(Instant::now(), 0)
3351    }
3352
3353    pub fn build_client_application_ack_packet_with_delay(
3354        &mut self,
3355        now: Instant,
3356        ack_delay_exponent: u64,
3357    ) -> Result<Option<ClientApplicationAckPacket>> {
3358        if self.application_ack_tracker.is_empty() {
3359            return Ok(None);
3360        }
3361        let Some(client_application_keys) = &self.client_application_keys else {
3362            return Err(Error::Quic(
3363                "native application ACK encryption is waiting for TLS application keys".into(),
3364            ));
3365        };
3366
3367        let packet_number = self.next_client_application_packet_number;
3368        let packet_number_len = 2;
3369        let frame = encode_frame(
3370            &self
3371                .application_ack_tracker
3372                .to_ack_frame_with_delay(now, ack_delay_exponent)?,
3373        );
3374        let packet = protect_short_header_packet(
3375            client_application_keys,
3376            &self.destination_cid,
3377            packet_number,
3378            packet_number_len,
3379            self.write_key_phase,
3380            &frame,
3381        )?;
3382        self.application_ack_tracker.mark_ack_sent();
3383        self.next_client_application_packet_number += 1;
3384
3385        Ok(Some(ClientApplicationAckPacket {
3386            packet,
3387            packet_number,
3388            packet_number_offset: 1 + self.destination_cid.as_bytes().len(),
3389        }))
3390    }
3391
3392    pub fn build_client_application_ack_packet_after(
3393        &mut self,
3394        threshold: usize,
3395    ) -> Result<Option<ClientApplicationAckPacket>> {
3396        if !self.application_ack_tracker.should_ack_after(threshold) {
3397            return Ok(None);
3398        }
3399        self.build_client_application_ack_packet()
3400    }
3401
3402    pub fn build_client_application_ack_packet_after_or_delay(
3403        &mut self,
3404        threshold: usize,
3405        max_ack_delay: Duration,
3406        now: Instant,
3407        ack_delay_exponent: u64,
3408    ) -> Result<Option<ClientApplicationAckPacket>> {
3409        if !self
3410            .application_ack_tracker
3411            .should_ack_after_or_delay(threshold, max_ack_delay, now)
3412        {
3413            return Ok(None);
3414        }
3415        self.build_client_application_ack_packet_with_delay(now, ack_delay_exponent)
3416    }
3417
3418    pub fn client_application_ack_deadline(&self, max_ack_delay: Duration) -> Option<Instant> {
3419        self.application_ack_tracker
3420            .pending_ack_deadline(max_ack_delay)
3421    }
3422
3423    pub fn build_client_handshake_crypto_packet(
3424        &mut self,
3425        crypto_data: Bytes,
3426    ) -> Result<Option<ClientHandshakePacket>> {
3427        if crypto_data.is_empty() {
3428            return Ok(None);
3429        }
3430
3431        let crypto_offset = self.client_handshake_crypto_offset;
3432        let packet =
3433            self.build_client_handshake_crypto_packet_at_offset(crypto_offset, crypto_data)?;
3434        self.client_handshake_crypto_offset += packet.crypto_data.len() as u64;
3435
3436        Ok(Some(packet))
3437    }
3438
3439    pub fn retransmit_pto_client_handshake_crypto_packets(
3440        &mut self,
3441        now: Instant,
3442        pto: Duration,
3443    ) -> Result<Vec<ClientHandshakePacket>> {
3444        let expired_packets = self
3445            .client_handshake_loss_detector
3446            .pto_expired_packets(now, pto);
3447        let mut retransmits = Vec::new();
3448        for packet_number in expired_packets {
3449            self.client_handshake_loss_detector
3450                .retire_packet(packet_number);
3451            let Some(sent) = self.client_handshake_sent_crypto.remove(&packet_number) else {
3452                continue;
3453            };
3454            if sent.packet_type != LongHeaderType::Handshake {
3455                continue;
3456            }
3457            retransmits.push(
3458                self.build_client_handshake_crypto_packet_at_offset_with_sent_at(
3459                    sent.crypto_offset,
3460                    sent.crypto_data,
3461                    now,
3462                )?,
3463            );
3464        }
3465        Ok(retransmits)
3466    }
3467
3468    /// Record that the initial client Initial datagram (or its Retry/VN
3469    /// rebuild) has been handed to the socket. RFC9002 § 6.1 OnPacketSent for
3470    /// the Initial packet number space: tracks the packet for loss/PTO
3471    /// detection and seeds `recovery` so the loss detection timer can arm.
3472    pub fn record_client_initial_sent_at(&mut self, sent_at: Instant) {
3473        let packet_number = self.next_client_initial_packet_number.saturating_sub(1);
3474        if self.client_initial_sent_crypto.contains_key(&packet_number) {
3475            return;
3476        }
3477        let packet_size = self.client_initial.packet.len();
3478        let crypto_data = self.client_initial.crypto_data.clone();
3479        self.client_initial_loss_detector
3480            .on_packet_sent_at(packet_number, sent_at);
3481        self.client_initial_sent_crypto.insert(
3482            packet_number,
3483            SentCryptoPacket {
3484                packet_type: LongHeaderType::Initial,
3485                crypto_offset: 0,
3486                crypto_data,
3487            },
3488        );
3489        self.recovery.on_packet_sent(
3490            PacketNumberSpace::Initial,
3491            packet_number,
3492            SentPacketInfo::new(sent_at, packet_size, true, true),
3493        );
3494    }
3495
3496    /// Retransmit Initial CRYPTO whose PTO has expired. RFC9002 § 6.2.4
3497    /// triggers a probe by resending the unacknowledged CRYPTO bytes with a
3498    /// fresh Initial packet number, preserving CRYPTO offsets so the peer
3499    /// reassembler accepts the duplicate.
3500    pub fn retransmit_pto_client_initial_crypto_packets(
3501        &mut self,
3502        now: Instant,
3503        pto: Duration,
3504    ) -> Result<Vec<ClientInitialPacket>> {
3505        let expired_packets = self
3506            .client_initial_loss_detector
3507            .pto_expired_packets(now, pto);
3508        let mut retransmits = Vec::new();
3509        for packet_number in expired_packets {
3510            self.client_initial_loss_detector
3511                .retire_packet(packet_number);
3512            let Some(sent) = self.client_initial_sent_crypto.remove(&packet_number) else {
3513                continue;
3514            };
3515            if sent.packet_type != LongHeaderType::Initial {
3516                continue;
3517            }
3518            retransmits.push(self.build_client_initial_crypto_pto_packet(sent.crypto_data, now)?);
3519        }
3520        Ok(retransmits)
3521    }
3522
3523    fn build_client_initial_crypto_pto_packet(
3524        &mut self,
3525        crypto_data: Bytes,
3526        sent_at: Instant,
3527    ) -> Result<ClientInitialPacket> {
3528        let packet_number = self.next_client_initial_packet_number;
3529        let token = decode_long_header(&self.client_initial.header)?.token;
3530        let packet = build_client_initial_packet_with_token_and_version(
3531            &self.fingerprint,
3532            crypto_data.clone(),
3533            self.client_initial.transport_parameters.clone(),
3534            self.client_initial.secrets.clone(),
3535            self.destination_cid.clone(),
3536            self.source_cid.clone(),
3537            token,
3538            packet_number,
3539            self.client_initial_version,
3540        )?;
3541        let packet_size = packet.packet.len();
3542        self.next_client_initial_packet_number = packet_number + 1;
3543        self.client_initial_loss_detector
3544            .on_packet_sent_at(packet_number, sent_at);
3545        self.client_initial_sent_crypto.insert(
3546            packet_number,
3547            SentCryptoPacket {
3548                packet_type: LongHeaderType::Initial,
3549                crypto_offset: 0,
3550                crypto_data,
3551            },
3552        );
3553        self.recovery.on_packet_sent(
3554            PacketNumberSpace::Initial,
3555            packet_number,
3556            SentPacketInfo::new(sent_at, packet_size, true, true),
3557        );
3558        Ok(packet)
3559    }
3560
3561    /// Read-only access to the RFC9002 packet-space recovery state for tests
3562    /// and the H3 driver loss-detection timer wakeup.
3563    pub fn recovery(&self) -> &RecoveryState {
3564        &self.recovery
3565    }
3566
3567    /// Driver hook: when the loss detection timer fires, call this to either
3568    /// declare time-threshold losses (RFC9002 § 6.1.2) or schedule a PTO probe
3569    /// in the earliest in-flight space.
3570    pub fn on_loss_detection_timeout(&mut self, now: Instant) -> LossDetectionOutcome {
3571        self.recovery.on_loss_detection_timeout(now)
3572    }
3573
3574    /// Convenience for the driver: where to schedule the next loss detection
3575    /// timer wakeup, if any.
3576    pub fn loss_detection_timer(&self) -> Option<Instant> {
3577        self.recovery.loss_detection_timer()
3578    }
3579
3580    /// Current PTO duration (`smoothed_rtt + max(4*rttvar, kGranularity)`)
3581    /// applied to the Application space after handshake confirmation.
3582    pub fn application_pto(&self) -> Duration {
3583        self.recovery.current_pto()
3584    }
3585
3586    /// Application packet-number-space PTO including peer max_ack_delay and
3587    /// current RFC9002 PTO backoff.
3588    pub fn application_pto_timeout(&self) -> Duration {
3589        let max_ack_delay = self.recovery.max_ack_delay();
3590        let backoff = 1u32 << self.recovery.pto_count().min(31);
3591        self.recovery
3592            .current_pto()
3593            .saturating_add(max_ack_delay.saturating_mul(backoff))
3594    }
3595
3596    /// Marks Handshake confirmation so Application PTO includes `max_ack_delay`
3597    /// and the loss detection timer is rearmed. Idempotent.
3598    pub fn mark_handshake_confirmed(&mut self) {
3599        self.recovery.mark_handshake_complete();
3600    }
3601
3602    /// Discards an entire packet-number space per RFC9002 § 6.4 (e.g. when
3603    /// Handshake keys install or HANDSHAKE_DONE is received). Resets
3604    /// `pto_count` and returns bytes_in_flight credit to the congestion
3605    /// controller.
3606    pub fn discard_packet_space(&mut self, space: PacketNumberSpace) {
3607        self.recovery.discard_space(space);
3608    }
3609
3610    fn build_client_handshake_crypto_packet_at_offset(
3611        &mut self,
3612        crypto_offset: u64,
3613        crypto_data: Bytes,
3614    ) -> Result<ClientHandshakePacket> {
3615        self.build_client_handshake_crypto_packet_at_offset_with_sent_at(
3616            crypto_offset,
3617            crypto_data,
3618            Instant::now(),
3619        )
3620    }
3621
3622    fn build_client_handshake_crypto_packet_at_offset_with_sent_at(
3623        &mut self,
3624        crypto_offset: u64,
3625        crypto_data: Bytes,
3626        sent_at: Instant,
3627    ) -> Result<ClientHandshakePacket> {
3628        let Some(client_handshake_keys) = &self.client_handshake_keys else {
3629            return Err(Error::Quic(
3630                "native Handshake packet encryption is waiting for TLS Handshake keys".into(),
3631            ));
3632        };
3633
3634        let packet_number = self.next_client_handshake_packet_number;
3635        let packet_number_len = 2;
3636        let frame = encode_frame(&QuicFrame::Crypto {
3637            offset: crypto_offset,
3638            data: crypto_data.clone(),
3639        });
3640        let header = encode_long_header(&LongHeaderPacket {
3641            packet_type: LongHeaderType::Handshake,
3642            version: 1,
3643            destination_cid: self.destination_cid.clone(),
3644            source_cid: self.source_cid.clone(),
3645            token: Bytes::new(),
3646            packet_number,
3647            packet_number_len,
3648            payload_len: frame.len() + 16,
3649        })?;
3650        let packet_number_offset = header
3651            .len()
3652            .checked_sub(packet_number_len)
3653            .ok_or_else(|| Error::HttpProtocol("invalid QUIC Handshake header length".into()))?;
3654        let packet = protect_long_header_packet(
3655            client_handshake_keys,
3656            packet_number,
3657            &header,
3658            packet_number_offset,
3659            packet_number_len,
3660            &frame,
3661        )?;
3662
3663        let packet_size = packet.len();
3664        self.next_client_handshake_packet_number += 1;
3665        self.client_handshake_loss_detector
3666            .on_packet_sent_at(packet_number, sent_at);
3667        self.client_handshake_sent_crypto.insert(
3668            packet_number,
3669            SentCryptoPacket {
3670                packet_type: LongHeaderType::Handshake,
3671                crypto_offset,
3672                crypto_data: crypto_data.clone(),
3673            },
3674        );
3675        self.recovery.on_packet_sent(
3676            PacketNumberSpace::Handshake,
3677            packet_number,
3678            SentPacketInfo::new(sent_at, packet_size, true, true),
3679        );
3680
3681        Ok(ClientHandshakePacket {
3682            packet,
3683            packet_number,
3684            packet_number_offset,
3685            crypto_data,
3686        })
3687    }
3688
3689    pub fn build_client_application_stream_packet(
3690        &mut self,
3691        stream_id: u64,
3692        data: Bytes,
3693        fin: bool,
3694    ) -> Result<Option<ClientApplicationPacket>> {
3695        if data.is_empty() && !fin {
3696            return Ok(None);
3697        }
3698        let stream_offset = *self.client_stream_offsets.get(&stream_id).unwrap_or(&0);
3699        self.client_application_flow_control.consume_stream_data(
3700            stream_id,
3701            stream_offset,
3702            data.len(),
3703        )?;
3704        let packet = self.build_client_application_stream_packet_at_offset(
3705            stream_id,
3706            stream_offset,
3707            data,
3708            fin,
3709        )?;
3710        self.client_stream_offsets
3711            .insert(stream_id, stream_offset + packet.data.len() as u64);
3712        Ok(Some(packet))
3713    }
3714
3715    pub fn build_client_h3_zero_rtt_request_packet(
3716        &mut self,
3717        method: &http::Method,
3718        uri: &http::Uri,
3719        headers: impl Into<Headers>,
3720        body: Option<Bytes>,
3721    ) -> Result<ClientApplicationPacket> {
3722        let headers = headers.into();
3723        let stream_id = self.next_client_bidirectional_stream_id;
3724        let h3_headers = native::build_request_headers(method, uri, &headers)?;
3725        let payload =
3726            native::encode_request_stream_with_fingerprint(&h3_headers, body, &self.fingerprint);
3727
3728        let packet = self
3729            .build_client_zero_rtt_stream_packet(stream_id, payload, true)?
3730            .ok_or_else(|| {
3731                Error::HttpProtocol("native H3 0-RTT request produced no payload".into())
3732            })?;
3733        self.next_client_bidirectional_stream_id += 4;
3734        Ok(packet)
3735    }
3736
3737    pub fn build_client_h3_replay_request_packet(
3738        &mut self,
3739        stream_id: u64,
3740        method: &http::Method,
3741        uri: &http::Uri,
3742        headers: impl Into<Headers>,
3743        body: Option<Bytes>,
3744    ) -> Result<ClientApplicationPacket> {
3745        let headers = headers.into();
3746        let h3_headers = native::build_request_headers(method, uri, &headers)?;
3747        let payload =
3748            native::encode_request_stream_with_fingerprint(&h3_headers, body, &self.fingerprint);
3749        let payload_len = payload.len() as u64;
3750        let packet =
3751            self.build_client_application_stream_packet_at_offset(stream_id, 0, payload, true)?;
3752        self.client_stream_offsets.insert(stream_id, payload_len);
3753        Ok(packet)
3754    }
3755
3756    fn build_client_zero_rtt_stream_packet(
3757        &mut self,
3758        stream_id: u64,
3759        data: Bytes,
3760        fin: bool,
3761    ) -> Result<Option<ClientApplicationPacket>> {
3762        if data.is_empty() && !fin {
3763            return Ok(None);
3764        }
3765        let stream_offset = *self.client_stream_offsets.get(&stream_id).unwrap_or(&0);
3766        self.client_application_flow_control.consume_stream_data(
3767            stream_id,
3768            stream_offset,
3769            data.len(),
3770        )?;
3771        let Some(client_early_data_keys) = &self.client_early_data_keys else {
3772            return Err(Error::Quic(
3773                "native 0-RTT packet encryption is waiting for TLS early-data keys".into(),
3774            ));
3775        };
3776
3777        let packet_number = self.next_client_application_packet_number;
3778        let packet_number_len = 2;
3779        let frame = encode_frame(&QuicFrame::Stream {
3780            stream_id,
3781            offset: (stream_offset > 0).then_some(stream_offset),
3782            fin,
3783            data: data.clone(),
3784        });
3785        let header = encode_long_header(&LongHeaderPacket {
3786            packet_type: LongHeaderType::ZeroRtt,
3787            version: QUIC_VERSION_1,
3788            destination_cid: self.destination_cid.clone(),
3789            source_cid: self.source_cid.clone(),
3790            token: Bytes::new(),
3791            packet_number,
3792            packet_number_len,
3793            payload_len: frame.len() + AES_GCM_TAG_LEN,
3794        })?;
3795        let packet_number_offset = header
3796            .len()
3797            .checked_sub(packet_number_len)
3798            .ok_or_else(|| Error::HttpProtocol("invalid QUIC 0-RTT header length".into()))?;
3799        let packet = protect_long_header_packet(
3800            client_early_data_keys,
3801            packet_number,
3802            &header,
3803            packet_number_offset,
3804            packet_number_len,
3805            &frame,
3806        )?;
3807
3808        let now = Instant::now();
3809        let packet_size = packet.len();
3810        self.client_application_loss_detector
3811            .on_packet_sent_at(packet_number, now);
3812        self.client_application_sent_streams.insert(
3813            packet_number,
3814            SentApplicationStreamPacket {
3815                stream_id,
3816                stream_offset,
3817                fin,
3818                data: data.clone(),
3819            },
3820        );
3821        self.recovery.on_packet_sent(
3822            PacketNumberSpace::Application,
3823            packet_number,
3824            SentPacketInfo::new(now, packet_size, true, true),
3825        );
3826        self.next_client_application_packet_number += 1;
3827        self.client_stream_offsets
3828            .insert(stream_id, stream_offset + data.len() as u64);
3829
3830        Ok(Some(ClientApplicationPacket {
3831            packet,
3832            packet_number,
3833            stream_id,
3834            packet_number_offset,
3835            data,
3836        }))
3837    }
3838
3839    fn build_client_application_stream_packet_at_offset(
3840        &mut self,
3841        stream_id: u64,
3842        stream_offset: u64,
3843        data: Bytes,
3844        fin: bool,
3845    ) -> Result<ClientApplicationPacket> {
3846        let Some(client_application_keys) = &self.client_application_keys else {
3847            return Err(Error::Quic(
3848                "native application packet encryption is waiting for TLS application keys".into(),
3849            ));
3850        };
3851
3852        let packet_number = self.next_client_application_packet_number;
3853        let packet_number_len = 2;
3854        let frame = encode_frame(&QuicFrame::Stream {
3855            stream_id,
3856            offset: (stream_offset > 0).then_some(stream_offset),
3857            fin,
3858            data: data.clone(),
3859        });
3860        let packet = protect_short_header_packet(
3861            client_application_keys,
3862            &self.destination_cid,
3863            packet_number,
3864            packet_number_len,
3865            self.write_key_phase,
3866            &frame,
3867        )?;
3868
3869        let now = Instant::now();
3870        let packet_size = packet.len();
3871        self.client_application_loss_detector
3872            .on_packet_sent_at(packet_number, now);
3873        self.client_application_sent_streams.insert(
3874            packet_number,
3875            SentApplicationStreamPacket {
3876                stream_id,
3877                stream_offset,
3878                fin,
3879                data: data.clone(),
3880            },
3881        );
3882        self.recovery.on_packet_sent(
3883            PacketNumberSpace::Application,
3884            packet_number,
3885            SentPacketInfo::new(now, packet_size, true, true),
3886        );
3887        self.next_client_application_packet_number += 1;
3888
3889        Ok(ClientApplicationPacket {
3890            packet,
3891            packet_number,
3892            stream_id,
3893            packet_number_offset: 1 + self.destination_cid.as_bytes().len(),
3894            data,
3895        })
3896    }
3897
3898    pub fn build_client_h3_preface_packets(
3899        &mut self,
3900        fingerprint: &Http3Fingerprint,
3901    ) -> Result<Vec<ClientApplicationPacket>> {
3902        if self.client_application_keys.is_none() {
3903            return Err(Error::Quic(
3904                "native application packet encryption is waiting for TLS application keys".into(),
3905            ));
3906        }
3907
3908        let mut packets = Vec::new();
3909        for stream in native::encode_client_preface_streams(fingerprint) {
3910            let stream_id = self.next_client_unidirectional_stream_id;
3911            self.next_client_unidirectional_stream_id += 4;
3912            let payload = native::encode_unidirectional_stream(&stream);
3913            if let Some(packet) =
3914                self.build_client_application_stream_packet(stream_id, payload, false)?
3915            {
3916                packets.push(packet);
3917            }
3918        }
3919        Ok(packets)
3920    }
3921
3922    pub fn build_client_h3_request_packet(
3923        &mut self,
3924        method: &http::Method,
3925        uri: &http::Uri,
3926        headers: impl Into<Headers>,
3927        body: Option<Bytes>,
3928    ) -> Result<ClientApplicationPacket> {
3929        let headers = headers.into();
3930        if self.client_application_keys.is_none() {
3931            return Err(Error::Quic(
3932                "native application packet encryption is waiting for TLS application keys".into(),
3933            ));
3934        }
3935
3936        let stream_id = self.next_client_bidirectional_stream_id;
3937        let payload = self.encode_client_h3_request_payload(method, uri, &headers, body)?;
3938
3939        let packet = self
3940            .build_client_application_stream_packet(stream_id, payload, true)?
3941            .ok_or_else(|| Error::HttpProtocol("native H3 request produced no payload".into()))?;
3942        self.next_client_bidirectional_stream_id += 4;
3943        Ok(packet)
3944    }
3945
3946    pub fn build_client_h3_request_start_packet(
3947        &mut self,
3948        method: &http::Method,
3949        uri: &http::Uri,
3950        headers: impl Into<Headers>,
3951        body: Option<Bytes>,
3952        fin: bool,
3953    ) -> Result<ClientApplicationPacket> {
3954        let headers = headers.into();
3955        if self.client_application_keys.is_none() {
3956            return Err(Error::Quic(
3957                "native application packet encryption is waiting for TLS application keys".into(),
3958            ));
3959        }
3960
3961        let stream_id = self.next_client_bidirectional_stream_id;
3962        let payload = self.encode_client_h3_request_payload(method, uri, &headers, body)?;
3963
3964        let packet = self
3965            .build_client_application_stream_packet(stream_id, payload, fin)?
3966            .ok_or_else(|| {
3967                Error::HttpProtocol("native H3 request start produced no payload".into())
3968            })?;
3969        self.next_client_bidirectional_stream_id += 4;
3970        Ok(packet)
3971    }
3972
3973    pub fn retire_client_application_packet(&mut self, packet_number: u64) {
3974        self.client_application_loss_detector
3975            .retire_packet(packet_number);
3976        self.client_application_sent_streams.remove(&packet_number);
3977    }
3978
3979    fn encode_client_h3_request_payload(
3980        &self,
3981        method: &http::Method,
3982        uri: &http::Uri,
3983        headers: &Headers,
3984        body: Option<Bytes>,
3985    ) -> Result<Bytes> {
3986        let h3_headers = native::build_request_headers(method, uri, headers)?;
3987        Ok(native::encode_request_stream_with_fingerprint(
3988            &h3_headers,
3989            body,
3990            &self.fingerprint,
3991        ))
3992    }
3993
3994    pub fn build_client_h3_websocket_connect_packet(
3995        &mut self,
3996        uri: &http::Uri,
3997        headers: impl Into<Headers>,
3998    ) -> Result<ClientApplicationPacket> {
3999        let headers = headers.into();
4000        if self.client_application_keys.is_none() {
4001            return Err(Error::Quic(
4002                "native application packet encryption is waiting for TLS application keys".into(),
4003            ));
4004        }
4005
4006        let stream_id = self.next_client_bidirectional_stream_id;
4007        let h3_headers = native::build_websocket_connect_headers(uri, &headers)?;
4008        let payload =
4009            native::encode_request_stream_with_fingerprint(&h3_headers, None, &self.fingerprint);
4010
4011        let packet = self
4012            .build_client_application_stream_packet(stream_id, payload, false)?
4013            .ok_or_else(|| Error::HttpProtocol("native H3 CONNECT produced no payload".into()))?;
4014        self.next_client_bidirectional_stream_id += 4;
4015        Ok(packet)
4016    }
4017
4018    pub fn build_client_h3_data_packet(
4019        &mut self,
4020        stream_id: u64,
4021        data: Bytes,
4022        fin: bool,
4023    ) -> Result<Option<ClientApplicationPacket>> {
4024        let payload = if data.is_empty() {
4025            Bytes::new()
4026        } else {
4027            native::encode_frame(&native::H3Frame::Data(data))
4028        };
4029        self.build_client_application_stream_packet(stream_id, payload, fin)
4030    }
4031
4032    pub fn build_client_reset_stream_packet(
4033        &mut self,
4034        stream_id: u64,
4035        error_code: u64,
4036    ) -> Result<ClientApplicationControlPacket> {
4037        let final_size = *self.client_stream_offsets.get(&stream_id).unwrap_or(&0);
4038        self.build_client_application_control_packet(QuicFrame::ResetStream {
4039            stream_id,
4040            error_code,
4041            final_size,
4042        })
4043    }
4044
4045    pub fn build_client_stop_sending_packet(
4046        &mut self,
4047        stream_id: u64,
4048        error_code: u64,
4049    ) -> Result<ClientApplicationControlPacket> {
4050        self.build_client_application_control_packet(QuicFrame::StopSending {
4051            stream_id,
4052            error_code,
4053        })
4054    }
4055
4056    pub fn build_client_path_response_packet(
4057        &mut self,
4058        data: [u8; 8],
4059    ) -> Result<ClientApplicationControlPacket> {
4060        self.build_client_application_control_packet(QuicFrame::PathResponse(data))
4061    }
4062
4063    pub fn build_client_new_connection_id_packet(
4064        &mut self,
4065        sequence_number: u64,
4066        retire_prior_to: u64,
4067        connection_id: ConnectionId,
4068        stateless_reset_token: [u8; 16],
4069    ) -> Result<ClientApplicationControlPacket> {
4070        if connection_id.as_bytes().is_empty() {
4071            return Err(Error::Quic(
4072                "native QUIC NEW_CONNECTION_ID cannot carry an empty connection id".into(),
4073            ));
4074        }
4075        if retire_prior_to > sequence_number {
4076            return Err(Error::Quic(
4077                "native QUIC NEW_CONNECTION_ID retire_prior_to exceeds sequence_number".into(),
4078            ));
4079        }
4080        self.client_cid_inventory.register_local_issued(
4081            sequence_number,
4082            connection_id.clone(),
4083            stateless_reset_token,
4084        )?;
4085        self.build_client_application_control_packet(QuicFrame::NewConnectionId {
4086            sequence_number,
4087            retire_prior_to,
4088            connection_id: Bytes::copy_from_slice(connection_id.as_bytes()),
4089            stateless_reset_token,
4090        })
4091    }
4092
4093    pub fn build_client_path_challenge_packet(
4094        &mut self,
4095        data: [u8; 8],
4096    ) -> Result<ClientApplicationControlPacket> {
4097        let frame = self.client_path_validator.path_challenge(data);
4098        self.build_client_application_control_packet(frame)
4099    }
4100
4101    pub fn build_client_path_challenge_packet_for_address(
4102        &mut self,
4103        remote_address: SocketAddr,
4104        connection_id_sequence: u64,
4105        data: [u8; 8],
4106    ) -> Result<ClientApplicationControlPacket> {
4107        let destination_cid = self
4108            .client_path_validator
4109            .connection_id(connection_id_sequence)
4110            .cloned()
4111            .ok_or_else(|| {
4112                Error::Quic("native QUIC path migration requires an available connection id".into())
4113            })?;
4114        let frame = self.client_path_validator.path_challenge_for_address(
4115            remote_address,
4116            connection_id_sequence,
4117            data,
4118        )?;
4119        self.build_client_application_control_packet_to(frame, &destination_cid)
4120    }
4121
4122    pub fn build_client_pmtu_probe_packet(
4123        &mut self,
4124        now: Instant,
4125    ) -> Result<Option<ClientApplicationControlPacket>> {
4126        let Some(target_size) = self.client_pmtu_probe.next_probe_size() else {
4127            return Ok(None);
4128        };
4129        let packet = self.build_client_application_probe_packet(target_size, now)?;
4130        self.client_pmtu_probe
4131            .on_probe_sent(packet.packet_number, packet.packet.len(), now);
4132        Ok(Some(packet))
4133    }
4134
4135    pub fn build_client_connection_close_packet(
4136        &mut self,
4137        error_code: u64,
4138        reason: Bytes,
4139    ) -> Result<ClientApplicationControlPacket> {
4140        let packet = self.build_client_application_control_packet(QuicFrame::ConnectionClose {
4141            error_code,
4142            frame_type: None,
4143            reason,
4144        })?;
4145        // RFC9000 § 10.2: emitting a CONNECTION_CLOSE transitions the
4146        // connection into the closing phase. We anchor the timer here so the
4147        // driver does not have to remember to call `client_enter_closing`
4148        // separately on every send path.
4149        self.client_enter_closing(Instant::now());
4150        Ok(packet)
4151    }
4152
4153    pub fn build_client_max_data_packet(
4154        &mut self,
4155        max_data: u64,
4156    ) -> Result<ClientApplicationControlPacket> {
4157        self.build_client_application_control_packet(QuicFrame::MaxData(max_data))
4158    }
4159
4160    pub fn build_client_max_stream_data_packet(
4161        &mut self,
4162        stream_id: u64,
4163        max_stream_data: u64,
4164    ) -> Result<ClientApplicationControlPacket> {
4165        self.build_client_application_control_packet(QuicFrame::MaxStreamData {
4166            stream_id,
4167            max_stream_data,
4168        })
4169    }
4170
4171    pub fn build_client_max_streams_packet(
4172        &mut self,
4173        bidirectional: bool,
4174        max_streams: u64,
4175    ) -> Result<ClientApplicationControlPacket> {
4176        self.build_client_application_control_packet(QuicFrame::MaxStreams {
4177            bidirectional,
4178            max_streams,
4179        })
4180    }
4181
4182    pub fn build_client_flow_control_blocked_packet(
4183        &mut self,
4184    ) -> Result<Option<ClientApplicationControlPacket>> {
4185        self.client_application_flow_control
4186            .take_blocked_frame()
4187            .map(|frame| self.build_client_application_control_packet(frame))
4188            .transpose()
4189    }
4190
4191    pub fn build_client_receive_flow_control_update_packets(
4192        &mut self,
4193    ) -> Result<Vec<ClientApplicationControlPacket>> {
4194        self.client_application_receive_flow_control
4195            .take_update_frames()
4196            .into_iter()
4197            .map(|frame| self.build_client_application_control_packet(frame))
4198            .collect()
4199    }
4200
4201    // Hook for the H3 driver to surface bytes the application has actually
4202    // drained from a streaming response body or RFC 9220 tunnel inbound
4203    // channel. Per RFC 9000 Section 4 these counters drive the absolute
4204    // MAX_DATA / MAX_STREAM_DATA values we are willing to advertise.
4205    pub fn record_client_stream_consumed(&mut self, stream_id: u64, len: u64) -> Result<()> {
4206        self.client_application_receive_flow_control
4207            .record_stream_consumed(stream_id, len)
4208    }
4209
4210    pub fn release_client_stream(&mut self, stream_id: u64) {
4211        self.client_application_receive_flow_control
4212            .release_stream(stream_id);
4213    }
4214
4215    fn build_client_application_control_packet(
4216        &mut self,
4217        frame: QuicFrame,
4218    ) -> Result<ClientApplicationControlPacket> {
4219        let destination_cid = self.destination_cid.clone();
4220        self.build_client_application_control_packet_to(frame, &destination_cid)
4221    }
4222
4223    fn build_client_application_control_packet_to(
4224        &mut self,
4225        frame: QuicFrame,
4226        destination_cid: &ConnectionId,
4227    ) -> Result<ClientApplicationControlPacket> {
4228        self.build_client_application_payload_packet_at_to(
4229            padded_short_header_payload(encode_frame(&frame)),
4230            Instant::now(),
4231            destination_cid,
4232        )
4233    }
4234
4235    fn build_client_application_probe_packet(
4236        &mut self,
4237        target_size: usize,
4238        now: Instant,
4239    ) -> Result<ClientApplicationControlPacket> {
4240        let Some(_client_application_keys) = &self.client_application_keys else {
4241            return Err(Error::Quic(
4242                "native application packet encryption is waiting for TLS application keys".into(),
4243            ));
4244        };
4245        let packet_number_len = 2;
4246        let header_len = 1 + self.destination_cid.as_bytes().len() + packet_number_len;
4247        let tag_len = AES_GCM_TAG_LEN;
4248        let target_payload_len = target_size.saturating_sub(header_len + tag_len);
4249        let mut payload = encode_frame(&QuicFrame::Ping).to_vec();
4250        payload.resize(target_payload_len.max(payload.len()), 0);
4251        self.build_client_application_payload_packet_at(Bytes::from(payload), now)
4252    }
4253
4254    fn build_client_application_payload_packet_at(
4255        &mut self,
4256        payload: Bytes,
4257        now: Instant,
4258    ) -> Result<ClientApplicationControlPacket> {
4259        let destination_cid = self.destination_cid.clone();
4260        self.build_client_application_payload_packet_at_to(payload, now, &destination_cid)
4261    }
4262
4263    fn build_client_application_payload_packet_at_to(
4264        &mut self,
4265        payload: Bytes,
4266        now: Instant,
4267        destination_cid: &ConnectionId,
4268    ) -> Result<ClientApplicationControlPacket> {
4269        let Some(client_application_keys) = &self.client_application_keys else {
4270            return Err(Error::Quic(
4271                "native application packet encryption is waiting for TLS application keys".into(),
4272            ));
4273        };
4274
4275        let packet_number = self.next_client_application_packet_number;
4276        let packet_number_len = 2;
4277        let packet = protect_short_header_packet(
4278            client_application_keys,
4279            destination_cid,
4280            packet_number,
4281            packet_number_len,
4282            self.write_key_phase,
4283            &payload,
4284        )?;
4285        let packet_size = packet.len();
4286        self.client_application_loss_detector
4287            .on_packet_sent_at(packet_number, now);
4288        self.recovery.on_packet_sent(
4289            PacketNumberSpace::Application,
4290            packet_number,
4291            SentPacketInfo::new(now, packet_size, true, true),
4292        );
4293        self.next_client_application_packet_number += 1;
4294
4295        Ok(ClientApplicationControlPacket {
4296            packet,
4297            packet_number,
4298            packet_number_offset: 1 + destination_cid.as_bytes().len(),
4299        })
4300    }
4301
4302    pub fn open_server_application_packet(&mut self, packet: &[u8]) -> Result<Vec<QuicFrame>> {
4303        self.open_server_application_packet_with_ecn(packet, None)
4304    }
4305
4306    pub fn open_server_application_packet_with_ecn(
4307        &mut self,
4308        packet: &[u8],
4309        ecn_mark: Option<QuicEcnMark>,
4310    ) -> Result<Vec<QuicFrame>> {
4311        self.open_server_application_packet_with_path(packet, None, ecn_mark)
4312    }
4313
4314    fn open_server_application_packet_with_path(
4315        &mut self,
4316        packet: &[u8],
4317        remote_address: Option<SocketAddr>,
4318        ecn_mark: Option<QuicEcnMark>,
4319    ) -> Result<Vec<QuicFrame>> {
4320        // RFC9000 § 10.2: stop decrypting peer packets once we are draining;
4321        // closing-phase decryption is preserved so we can still apply the
4322        // § 10.2 MAY-optimisation (closing -> draining on peer CONNECTION_CLOSE).
4323        if self.close_state.is_draining() {
4324            return Ok(Vec::new());
4325        }
4326        let Some(server_application_keys) = self.server_application_keys.as_ref() else {
4327            return Err(Error::Quic(
4328                "native application packet decryption is waiting for TLS application keys".into(),
4329            ));
4330        };
4331        let now = Instant::now();
4332        let opened = try_open_one_rtt_packet(
4333            server_application_keys,
4334            self.server_application_next_keys.as_ref(),
4335            self.server_application_previous.as_ref(),
4336            self.read_key_phase,
4337            now,
4338            packet,
4339            self.source_cid.as_bytes().len(),
4340            self.next_server_application_packet_number,
4341        )?;
4342        if matches!(opened.outcome, OneRttOpenOutcome::Next) {
4343            self.commit_receive_key_update(now)?;
4344        }
4345        let opened = opened.opened;
4346        self.next_server_application_packet_number = opened.packet_number + 1;
4347        let frames = decode_frames(&opened.payload)?;
4348        for frame in &frames {
4349            if let QuicFrame::Stream {
4350                stream_id,
4351                offset,
4352                data,
4353                ..
4354            } = frame
4355            {
4356                self.client_application_receive_flow_control
4357                    .observe_stream_frame(*stream_id, *offset, data.len())?;
4358            }
4359            for packet_number in self.client_application_loss_detector.on_ack_frame(frame)? {
4360                self.client_application_sent_streams.remove(&packet_number);
4361                self.application_key_update.note_packet_acked(packet_number);
4362                self.client_pmtu_probe.on_probe_acked(packet_number);
4363            }
4364            if matches!(frame, QuicFrame::Ack { .. } | QuicFrame::AckEcn { .. }) {
4365                let outcome = self.recovery.on_ack_received(
4366                    PacketNumberSpace::Application,
4367                    frame,
4368                    self.fingerprint.transport.ack_delay_exponent,
4369                    now,
4370                )?;
4371                for (packet_number, _) in outcome.newly_acked {
4372                    self.client_application_sent_streams.remove(&packet_number);
4373                    self.application_key_update.note_packet_acked(packet_number);
4374                    self.client_pmtu_probe.on_probe_acked(packet_number);
4375                }
4376                for (packet_number, _) in &outcome.lost {
4377                    self.client_pmtu_probe.on_probe_lost(*packet_number);
4378                }
4379                self.client_application_recovery_lost_packets.extend(
4380                    outcome
4381                        .lost
4382                        .into_iter()
4383                        .map(|(packet_number, _)| packet_number),
4384                );
4385                if outcome.ecn_congestion {
4386                    self.client_application_ecn_congestion = true;
4387                }
4388            }
4389            match frame {
4390                QuicFrame::MaxData(max_data) => {
4391                    self.client_application_flow_control
4392                        .apply_max_data(*max_data);
4393                }
4394                QuicFrame::MaxStreamData {
4395                    stream_id,
4396                    max_stream_data,
4397                } => self
4398                    .client_application_flow_control
4399                    .apply_max_stream_data(*stream_id, *max_stream_data),
4400                QuicFrame::MaxStreams {
4401                    bidirectional,
4402                    max_streams,
4403                } => self
4404                    .client_application_flow_control
4405                    .apply_max_streams(*bidirectional, *max_streams),
4406                QuicFrame::NewConnectionId {
4407                    sequence_number,
4408                    retire_prior_to,
4409                    connection_id,
4410                    stateless_reset_token,
4411                } => self.client_path_validator.register_connection_id(
4412                    *sequence_number,
4413                    *retire_prior_to,
4414                    ConnectionId::from_bytes(connection_id.clone())?,
4415                    *stateless_reset_token,
4416                )?,
4417                QuicFrame::PathResponse(data) => {
4418                    if let Some(remote_address) = remote_address {
4419                        if self
4420                            .client_path_validator
4421                            .on_path_response_from(remote_address, *data)
4422                        {
4423                            if let Some(connection_id) = self
4424                                .client_path_validator
4425                                .migration_connection_id(&remote_address)
4426                                .cloned()
4427                            {
4428                                self.destination_cid = connection_id;
4429                            }
4430                        }
4431                    } else {
4432                        self.client_path_validator.on_path_response(*data);
4433                    }
4434                }
4435                _ => {}
4436            }
4437        }
4438        if frames.iter().any(is_ack_eliciting_quic_frame) {
4439            observe_packet_with_ecn(
4440                &mut self.application_ack_tracker,
4441                opened.packet_number,
4442                ecn_mark,
4443                now,
4444            );
4445        }
4446        Ok(frames.into_iter().filter(is_not_padding_frame).collect())
4447    }
4448
4449    pub fn open_server_h3_stream_packet(
4450        &mut self,
4451        packet: &[u8],
4452    ) -> Result<Vec<ServerH3StreamEvent>> {
4453        Ok(self
4454            .open_server_h3_event_packet(packet)?
4455            .into_iter()
4456            .filter_map(|event| match event {
4457                ServerH3Event::Stream(event) => Some(event),
4458                ServerH3Event::ResetStream { .. }
4459                | ServerH3Event::StopSending { .. }
4460                | ServerH3Event::ConnectionClose { .. }
4461                | ServerH3Event::PathChallenge(_) => None,
4462            })
4463            .collect())
4464    }
4465
4466    pub fn open_server_h3_event_packet(&mut self, packet: &[u8]) -> Result<Vec<ServerH3Event>> {
4467        self.open_server_h3_event_packet_with_ecn(packet, None)
4468    }
4469
4470    pub fn open_server_h3_event_packet_from(
4471        &mut self,
4472        packet: &[u8],
4473        remote_address: SocketAddr,
4474    ) -> Result<Vec<ServerH3Event>> {
4475        self.open_server_h3_event_packet_with_path_ecn(packet, Some(remote_address), None)
4476    }
4477
4478    pub fn open_server_h3_event_packet_from_with_ecn(
4479        &mut self,
4480        packet: &[u8],
4481        remote_address: SocketAddr,
4482        ecn_mark: Option<QuicEcnMark>,
4483    ) -> Result<Vec<ServerH3Event>> {
4484        self.open_server_h3_event_packet_with_path_ecn(packet, Some(remote_address), ecn_mark)
4485    }
4486
4487    pub fn open_server_h3_event_packet_with_ecn(
4488        &mut self,
4489        packet: &[u8],
4490        ecn_mark: Option<QuicEcnMark>,
4491    ) -> Result<Vec<ServerH3Event>> {
4492        self.open_server_h3_event_packet_with_path_ecn(packet, None, ecn_mark)
4493    }
4494
4495    fn open_server_h3_event_packet_with_path_ecn(
4496        &mut self,
4497        packet: &[u8],
4498        remote_address: Option<SocketAddr>,
4499        ecn_mark: Option<QuicEcnMark>,
4500    ) -> Result<Vec<ServerH3Event>> {
4501        // RFC9000 § 10.2: once we are draining we MUST drop inbound packets.
4502        // Closing-phase parsing remains active so the MAY optimisation in
4503        // § 10.2 ("transition from closing to draining if you can confirm
4504        // the peer is also closing") fires when the peer sends us a
4505        // CONNECTION_CLOSE in response to ours.
4506        if self.close_state.is_draining() {
4507            return Ok(Vec::new());
4508        }
4509        let mut events = Vec::new();
4510        for frame in
4511            self.open_server_application_packet_with_path(packet, remote_address, ecn_mark)?
4512        {
4513            match frame {
4514                QuicFrame::Stream {
4515                    stream_id,
4516                    offset,
4517                    fin,
4518                    data,
4519                    ..
4520                } => {
4521                    if let Some(event) =
4522                        self.apply_server_quic_stream_frame(stream_id, offset, fin, data)?
4523                    {
4524                        events.push(ServerH3Event::Stream(event));
4525                    }
4526                }
4527                QuicFrame::ResetStream {
4528                    stream_id,
4529                    error_code,
4530                    final_size,
4531                } => events.push(ServerH3Event::ResetStream {
4532                    stream_id,
4533                    error_code,
4534                    final_size,
4535                }),
4536                QuicFrame::StopSending {
4537                    stream_id,
4538                    error_code,
4539                } => events.push(ServerH3Event::StopSending {
4540                    stream_id,
4541                    error_code,
4542                }),
4543                QuicFrame::ConnectionClose {
4544                    error_code,
4545                    frame_type,
4546                    reason,
4547                } => {
4548                    // RFC9000 § 10.2: peer CONNECTION_CLOSE transitions us
4549                    // into the draining phase. Drivers must stop sending
4550                    // packets and may emit at most one CONNECTION_CLOSE in
4551                    // response.
4552                    self.close_draining = true;
4553                    self.close_state.enter_draining(Instant::now());
4554                    events.push(ServerH3Event::ConnectionClose {
4555                        error_code,
4556                        frame_type,
4557                        reason,
4558                    });
4559                }
4560                QuicFrame::PathChallenge(data) => events.push(ServerH3Event::PathChallenge(data)),
4561                QuicFrame::PathResponse(_) => {}
4562                QuicFrame::Padding
4563                | QuicFrame::Ping
4564                | QuicFrame::Ack { .. }
4565                | QuicFrame::AckEcn { .. }
4566                | QuicFrame::Crypto { .. }
4567                | QuicFrame::MaxData(_)
4568                | QuicFrame::MaxStreamData { .. }
4569                | QuicFrame::MaxStreams { .. }
4570                | QuicFrame::DataBlocked { .. }
4571                | QuicFrame::StreamDataBlocked { .. }
4572                | QuicFrame::StreamsBlocked { .. }
4573                | QuicFrame::NewConnectionId { .. }
4574                | QuicFrame::RetireConnectionId { .. }
4575                | QuicFrame::HandshakeDone => {}
4576            }
4577        }
4578        Ok(events)
4579    }
4580
4581    fn apply_server_quic_stream_frame(
4582        &mut self,
4583        stream_id: u64,
4584        offset: Option<u64>,
4585        fin: bool,
4586        data: Bytes,
4587    ) -> Result<Option<ServerH3StreamEvent>> {
4588        apply_h3_stream_frame(
4589            &mut self.server_h3_stream_buffers,
4590            &mut self.server_h3_stream_buffer_offsets,
4591            &mut self.server_h3_stream_types,
4592            stream_id,
4593            offset,
4594            fin,
4595            data,
4596        )
4597    }
4598
4599    pub fn process_server_datagram(
4600        &mut self,
4601        datagram: &[u8],
4602    ) -> Result<Vec<ProcessedServerInitial>> {
4603        self.process_server_datagram_with_ecn(datagram, None)
4604    }
4605
4606    pub fn process_server_datagram_with_ecn(
4607        &mut self,
4608        datagram: &[u8],
4609        ecn_mark: Option<QuicEcnMark>,
4610    ) -> Result<Vec<ProcessedServerInitial>> {
4611        if is_version_negotiation_datagram(datagram) {
4612            return self.process_version_negotiation_datagram(datagram);
4613        }
4614
4615        let mut processed = Vec::new();
4616        for packet in split_long_header_datagram(datagram)? {
4617            match packet.packet_type {
4618                LongHeaderType::Initial => {
4619                    let opened = open_long_header_packet(
4620                        &self.server_initial_keys,
4621                        &packet.packet,
4622                        packet.packet_number_offset,
4623                        self.next_server_initial_packet_number,
4624                    )?;
4625                    self.destination_cid = packet.source_cid.clone();
4626                    self.server_initial_or_handshake_seen = true;
4627                    observe_packet_with_ecn(
4628                        &mut self.initial_ack_tracker,
4629                        opened.packet_number,
4630                        ecn_mark,
4631                        Instant::now(),
4632                    );
4633                    self.next_server_initial_packet_number = opened.packet_number + 1;
4634
4635                    for frame in decode_frames(&opened.payload)? {
4636                        for packet_number in
4637                            self.client_initial_loss_detector.on_ack_frame(&frame)?
4638                        {
4639                            self.client_initial_sent_crypto.remove(&packet_number);
4640                        }
4641                        let outcome = self.recovery.on_ack_received(
4642                            PacketNumberSpace::Initial,
4643                            &frame,
4644                            self.fingerprint.transport.ack_delay_exponent,
4645                            Instant::now(),
4646                        )?;
4647                        for (packet_number, _) in outcome.newly_acked {
4648                            self.client_initial_sent_crypto.remove(&packet_number);
4649                        }
4650                        if let QuicFrame::Crypto { offset, data } = frame {
4651                            self.initial_crypto.insert(offset, data)?;
4652                        }
4653                    }
4654
4655                    let crypto_data = self.initial_crypto.take_contiguous();
4656                    if crypto_data.is_empty() {
4657                        continue;
4658                    }
4659
4660                    self.tls
4661                        .provide_crypto(QuicEncryptionLevel::Initial, &crypto_data)?;
4662                    let secrets = self.tls.secrets();
4663                    self.install_tls_secrets(&secrets)?;
4664                    self.validate_server_transport_parameters_if_available()?;
4665                    processed.push(ProcessedServerInitial {
4666                        packet_number: opened.packet_number,
4667                        crypto_data,
4668                        initial_crypto_out: self.tls.take_crypto(QuicEncryptionLevel::Initial),
4669                        handshake_crypto_out: self.tls.take_crypto(QuicEncryptionLevel::Handshake),
4670                        secrets,
4671                    });
4672                }
4673                LongHeaderType::Handshake => {
4674                    let Some(server_handshake_keys) = &self.server_handshake_keys else {
4675                        return Err(Error::Quic(
4676                            "native Handshake packet decryption is waiting for TLS Handshake keys"
4677                                .into(),
4678                        ));
4679                    };
4680                    let opened = open_long_header_packet(
4681                        server_handshake_keys,
4682                        &packet.packet,
4683                        packet.packet_number_offset,
4684                        self.next_server_handshake_packet_number,
4685                    )?;
4686                    self.server_initial_or_handshake_seen = true;
4687                    observe_packet_with_ecn(
4688                        &mut self.handshake_ack_tracker,
4689                        opened.packet_number,
4690                        ecn_mark,
4691                        Instant::now(),
4692                    );
4693                    self.next_server_handshake_packet_number = opened.packet_number + 1;
4694
4695                    for frame in decode_frames(&opened.payload)? {
4696                        for packet_number in
4697                            self.client_handshake_loss_detector.on_ack_frame(&frame)?
4698                        {
4699                            self.client_handshake_sent_crypto.remove(&packet_number);
4700                        }
4701                        let outcome = self.recovery.on_ack_received(
4702                            PacketNumberSpace::Handshake,
4703                            &frame,
4704                            self.fingerprint.transport.ack_delay_exponent,
4705                            Instant::now(),
4706                        )?;
4707                        for (packet_number, _) in outcome.newly_acked {
4708                            self.client_handshake_sent_crypto.remove(&packet_number);
4709                        }
4710                        if let QuicFrame::Crypto { offset, data } = frame {
4711                            self.handshake_crypto.insert(offset, data)?;
4712                        }
4713                    }
4714
4715                    let crypto_data = self.handshake_crypto.take_contiguous();
4716                    if !crypto_data.is_empty() {
4717                        self.tls
4718                            .provide_crypto(QuicEncryptionLevel::Handshake, &crypto_data)?;
4719                        let secrets = self.tls.secrets();
4720                        self.install_tls_secrets(&secrets)?;
4721                        self.validate_server_transport_parameters_if_available()?;
4722                        let handshake_crypto_out =
4723                            self.tls.take_crypto(QuicEncryptionLevel::Handshake);
4724                        if !handshake_crypto_out.is_empty() {
4725                            processed.push(ProcessedServerInitial {
4726                                packet_number: opened.packet_number,
4727                                crypto_data,
4728                                initial_crypto_out: Bytes::new(),
4729                                handshake_crypto_out,
4730                                secrets,
4731                            });
4732                        }
4733                    }
4734                }
4735                LongHeaderType::Retry => {
4736                    self.process_retry_packet(packet.packet.as_ref())?;
4737                }
4738                LongHeaderType::ZeroRtt => {}
4739            }
4740        }
4741
4742        Ok(processed)
4743    }
4744
4745    fn process_version_negotiation_datagram(
4746        &mut self,
4747        datagram: &[u8],
4748    ) -> Result<Vec<ProcessedServerInitial>> {
4749        let packet = decode_version_negotiation_packet(datagram)?;
4750        if packet.destination_cid != self.source_cid || packet.source_cid != self.destination_cid {
4751            return Ok(Vec::new());
4752        }
4753        if self.vn_received {
4754            return Ok(Vec::new());
4755        }
4756        if packet
4757            .supported_versions
4758            .contains(&self.client_initial_version)
4759        {
4760            return Ok(Vec::new());
4761        }
4762
4763        let chosen_version = self
4764            .supported_versions
4765            .iter()
4766            .copied()
4767            .find(|version| packet.supported_versions.contains(version));
4768        let Some(chosen_version) = chosen_version else {
4769            return Err(Error::Quic(format!(
4770                "version_negotiation_failed: native H3 server did not offer QUIC version 1 or any other version we support (offered {:?})",
4771                packet.supported_versions,
4772            )));
4773        };
4774
4775        self.restart_for_version_negotiation(chosen_version)?;
4776        Ok(Vec::new())
4777    }
4778
4779    fn process_retry_packet(&mut self, retry_packet: &[u8]) -> Result<()> {
4780        if self.retry_received {
4781            return Ok(());
4782        }
4783        if self.server_initial_or_handshake_seen {
4784            return Ok(());
4785        }
4786
4787        let retry =
4788            match validate_retry_integrity_tag_v1(&self.original_destination_cid, retry_packet) {
4789                Ok(retry) => retry,
4790                Err(_) => return Ok(()),
4791            };
4792        if retry.destination_cid != self.source_cid {
4793            return Ok(());
4794        }
4795        if retry.source_cid.as_bytes() == self.original_destination_cid.as_bytes() {
4796            return Ok(());
4797        }
4798        if retry.token.is_empty() {
4799            return Ok(());
4800        }
4801
4802        let retry_keys = derive_initial_key_material(retry.source_cid.as_bytes())?;
4803        let packet_number = self.next_client_initial_packet_number;
4804        let retry_initial = build_client_initial_packet_with_token_and_version(
4805            &self.fingerprint,
4806            self.client_initial.crypto_data.clone(),
4807            self.client_initial.transport_parameters.clone(),
4808            self.client_initial.secrets.clone(),
4809            retry.source_cid.clone(),
4810            self.source_cid.clone(),
4811            retry.token,
4812            packet_number,
4813            self.client_initial_version,
4814        )?;
4815
4816        self.destination_cid = retry.source_cid.clone();
4817        self.retry_source_cid = Some(retry.source_cid);
4818        self.retry_received = true;
4819        self.client_initial_keys = retry_keys.client;
4820        self.server_initial_keys = retry_keys.server;
4821        self.client_initial = retry_initial.clone();
4822        self.pending_client_initial = Some(retry_initial);
4823        self.next_client_initial_packet_number = packet_number + 1;
4824        self.client_initial_loss_detector = QuicLossDetector::default();
4825        self.client_initial_sent_crypto.clear();
4826        self.recovery = recovery_state_from_transport(&self.fingerprint.transport);
4827        Ok(())
4828    }
4829
4830    fn restart_for_version_negotiation(&mut self, chosen_version: u32) -> Result<()> {
4831        let new_source_cid = random_connection_id(self.source_cid.as_bytes().len())?;
4832        let mut new_tls =
4833            NativeQuicTlsSession::client_with_initial_source_connection_id_and_verify_peer(
4834                &self.server_name,
4835                &self.fingerprint,
4836                &new_source_cid,
4837                self.tls_fingerprint.as_ref(),
4838                self.verify_peer,
4839                &self.root_certs,
4840                self.use_platform_roots,
4841            )?;
4842        let captured = new_tls.take_client_initial();
4843        let new_initial = build_client_initial_packet_from_capture_with_version_and_size(
4844            captured,
4845            self.destination_cid.clone(),
4846            new_source_cid.clone(),
4847            chosen_version,
4848            self.fingerprint.transport.initial_datagram_size,
4849        )?;
4850        let initial_keys = derive_initial_key_material(self.destination_cid.as_bytes())?;
4851
4852        self.tls = new_tls;
4853        self.source_cid = new_source_cid;
4854        self.client_initial_version = chosen_version;
4855        self.vn_received = true;
4856        self.retry_received = false;
4857        self.retry_source_cid = None;
4858        self.server_initial_or_handshake_seen = false;
4859        self.server_transport_parameters_validated = false;
4860        self.close_draining = false;
4861        self.client_initial = new_initial.clone();
4862        self.pending_client_initial = Some(new_initial);
4863        self.client_initial_keys = initial_keys.client;
4864        self.server_initial_keys = initial_keys.server;
4865        self.client_handshake_keys = None;
4866        self.server_handshake_keys = None;
4867        self.client_application_keys = None;
4868        self.server_application_keys = None;
4869        self.initial_crypto = QuicCryptoAssembler::default();
4870        self.handshake_crypto = QuicCryptoAssembler::default();
4871        self.initial_ack_tracker = QuicAckTracker::default();
4872        self.handshake_ack_tracker = QuicAckTracker::default();
4873        self.application_ack_tracker = QuicAckTracker::default();
4874        self.client_initial_loss_detector = QuicLossDetector::default();
4875        self.client_handshake_loss_detector = QuicLossDetector::default();
4876        self.client_application_loss_detector = QuicLossDetector::default();
4877        self.client_application_flow_control =
4878            QuicApplicationFlowControl::client(&self.fingerprint.transport);
4879        self.client_application_receive_flow_control =
4880            QuicReceiveFlowControl::client(&self.fingerprint.transport);
4881        self.client_initial_sent_crypto.clear();
4882        self.client_handshake_sent_crypto.clear();
4883        self.client_application_sent_streams.clear();
4884        self.client_application_recovery_lost_packets.clear();
4885        self.client_path_validator = QuicPathValidator::default();
4886        self.client_cid_inventory = new_client_cid_inventory(&self.fingerprint, &self.source_cid);
4887        self.client_pmtu_probe = QuicPmtuProbePolicy::from_transport(&self.fingerprint.transport);
4888        self.recovery = recovery_state_from_transport(&self.fingerprint.transport);
4889        self.next_client_initial_packet_number = 1;
4890        self.next_server_initial_packet_number = 0;
4891        self.next_server_handshake_packet_number = 0;
4892        self.next_client_handshake_packet_number = 0;
4893        self.next_server_application_packet_number = 0;
4894        self.next_client_application_packet_number = 0;
4895        self.next_client_bidirectional_stream_id = 0;
4896        self.next_client_unidirectional_stream_id = 2;
4897        self.client_handshake_crypto_offset = 0;
4898        self.client_stream_offsets.clear();
4899        self.server_h3_stream_buffers.clear();
4900        self.server_h3_stream_buffer_offsets.clear();
4901        self.server_h3_stream_types.clear();
4902        Ok(())
4903    }
4904
4905    fn validate_server_transport_parameters_if_available(&mut self) -> Result<()> {
4906        if self.server_transport_parameters_validated {
4907            return Ok(());
4908        }
4909        let peer_transport_parameters = self.tls.peer_transport_parameters();
4910        if peer_transport_parameters.is_empty() {
4911            return Ok(());
4912        }
4913        self.validate_server_transport_parameters(peer_transport_parameters.as_ref())?;
4914        self.server_transport_parameters_validated = true;
4915        Ok(())
4916    }
4917
4918    fn validate_server_transport_parameters(&self, encoded: &[u8]) -> Result<()> {
4919        let mut original_destination_cid = None;
4920        let mut initial_source_cid = None;
4921        let mut retry_source_cid = None;
4922
4923        for parameter in decode_transport_parameters(encoded)? {
4924            match parameter {
4925                TransportParameter::OriginalDestinationConnectionId(value) => {
4926                    original_destination_cid = Some(value);
4927                }
4928                TransportParameter::InitialSourceConnectionId(value) => {
4929                    initial_source_cid = Some(value);
4930                }
4931                TransportParameter::RetrySourceConnectionId(value) => {
4932                    retry_source_cid = Some(value);
4933                }
4934                _ => {}
4935            }
4936        }
4937
4938        let original_destination_cid = original_destination_cid.ok_or_else(|| {
4939            Error::Quic("native H3 server omitted original_destination_connection_id".into())
4940        })?;
4941        if original_destination_cid.as_ref() != self.original_destination_cid.as_bytes() {
4942            return Err(Error::Quic(
4943                "native H3 server original_destination_connection_id mismatch".into(),
4944            ));
4945        }
4946
4947        let initial_source_cid = initial_source_cid.ok_or_else(|| {
4948            Error::Quic("native H3 server omitted initial_source_connection_id".into())
4949        })?;
4950        if initial_source_cid.as_ref() != self.destination_cid.as_bytes() {
4951            return Err(Error::Quic(
4952                "native H3 server initial_source_connection_id mismatch".into(),
4953            ));
4954        }
4955
4956        match (&self.retry_source_cid, retry_source_cid) {
4957            (Some(expected), Some(actual)) if actual.as_ref() == expected.as_bytes() => Ok(()),
4958            (Some(_), Some(_)) => Err(Error::Quic(
4959                "native H3 server retry_source_connection_id mismatch".into(),
4960            )),
4961            (Some(_), None) => Err(Error::Quic(
4962                "native H3 server omitted retry_source_connection_id".into(),
4963            )),
4964            (None, Some(_)) => Err(Error::Quic(
4965                "native H3 server sent unexpected retry_source_connection_id".into(),
4966            )),
4967            (None, None) => Ok(()),
4968        }
4969    }
4970}
4971
/// Reports whether `datagram` looks like a QUIC Version Negotiation packet.
///
/// That is the case when it is at least five bytes long, the high bit of the
/// first byte (long-header form) is set, and the four-byte big-endian version
/// field that follows is zero.
fn is_version_negotiation_datagram(datagram: &[u8]) -> bool {
    match datagram {
        [first, v0, v1, v2, v3, ..] => {
            let long_header = *first & 0x80 != 0;
            let version = u32::from_be_bytes([*v0, *v1, *v2, *v3]);
            long_header && version == 0
        }
        // Too short to hold a first byte plus a version field.
        _ => false,
    }
}
4977
4978#[allow(clippy::too_many_arguments)]
4979fn build_client_initial_packet_with_token_and_version(
4980    fingerprint: &Http3Fingerprint,
4981    crypto_data: Bytes,
4982    transport_parameters: Bytes,
4983    secrets: Vec<QuicTlsSecret>,
4984    destination_cid: ConnectionId,
4985    source_cid: ConnectionId,
4986    token: Bytes,
4987    packet_number: u64,
4988    version: u32,
4989) -> Result<ClientInitialPacket> {
4990    let header_len_without_length = 1
4991        + 4
4992        + 1
4993        + destination_cid.as_bytes().len()
4994        + 1
4995        + source_cid.as_bytes().len()
4996        + varint_len(token.len() as u64)
4997        + token.len();
4998    let padded_plaintext_len = initial_plaintext_len(
4999        header_len_without_length,
5000        crypto_data.len(),
5001        fingerprint.transport.initial_datagram_size,
5002    );
5003    let payload_len = padded_plaintext_len + AES_GCM_TAG_LEN;
5004    let header = encode_long_header(&LongHeaderPacket {
5005        packet_type: LongHeaderType::Initial,
5006        version,
5007        destination_cid: destination_cid.clone(),
5008        source_cid,
5009        token,
5010        packet_number,
5011        packet_number_len: INITIAL_PACKET_NUMBER_LEN,
5012        payload_len,
5013    })?;
5014    let packet_number_offset = header
5015        .len()
5016        .checked_sub(INITIAL_PACKET_NUMBER_LEN)
5017        .ok_or_else(|| Error::HttpProtocol("invalid QUIC Initial header length".into()))?;
5018    let keys = derive_initial_key_material(destination_cid.as_bytes())?;
5019    let packet = build_initial_crypto_packet(
5020        &keys.client,
5021        packet_number,
5022        &header,
5023        packet_number_offset,
5024        INITIAL_PACKET_NUMBER_LEN,
5025        &crypto_data,
5026        padded_plaintext_len,
5027    )?;
5028
5029    Ok(ClientInitialPacket {
5030        packet,
5031        header,
5032        packet_number_offset,
5033        crypto_data,
5034        transport_parameters,
5035        secrets,
5036    })
5037}
5038
5039fn random_connection_id(len: usize) -> Result<ConnectionId> {
5040    let mut bytes = vec![0u8; len];
5041    getrandom_fill(&mut bytes)
5042        .map_err(|err| Error::Quic(format!("native H3 connection id RNG failed: {err}")))?;
5043    ConnectionId::from_bytes(Bytes::from(bytes))
5044}
5045
5046fn initial_plaintext_len(
5047    header_len_without_length: usize,
5048    crypto_data_len: usize,
5049    initial_datagram_size: usize,
5050) -> usize {
5051    let target_datagram_len = initial_datagram_size.max(1200);
5052    let crypto_frame_len = 1 + 1 + varint_len(crypto_data_len as u64) + crypto_data_len;
5053    let mut padded_len = crypto_frame_len;
5054    loop {
5055        let payload_len = padded_len + AES_GCM_TAG_LEN;
5056        let header_len = header_len_without_length
5057            + varint_len((payload_len + INITIAL_PACKET_NUMBER_LEN) as u64)
5058            + INITIAL_PACKET_NUMBER_LEN;
5059        if header_len + payload_len >= target_datagram_len {
5060            return padded_len;
5061        }
5062        padded_len = target_datagram_len - header_len - AES_GCM_TAG_LEN;
5063    }
5064}
5065
/// Returns how many bytes a QUIC variable-length integer needs to encode
/// `value`: 1, 2, 4 or 8.
///
/// Values above the 4-byte range always report 8; no upper bound is checked.
fn varint_len(value: u64) -> usize {
    if value <= 0x3f {
        1
    } else if value <= 0x3fff {
        2
    } else if value <= 0x3fff_ffff {
        4
    } else {
        8
    }
}
5074
5075fn build_server_crypto_packet(
5076    packet_type: LongHeaderType,
5077    keys: &QuicPacketKeyMaterial,
5078    destination_cid: &ConnectionId,
5079    source_cid: &ConnectionId,
5080    packet_number: u64,
5081    crypto_offset: u64,
5082    crypto_data: Bytes,
5083) -> Result<ServerHandshakePacket> {
5084    let packet_number_len = 2;
5085    let frame = encode_frame(&QuicFrame::Crypto {
5086        offset: crypto_offset,
5087        data: crypto_data.clone(),
5088    });
5089    let header = encode_long_header(&LongHeaderPacket {
5090        packet_type,
5091        version: 1,
5092        destination_cid: destination_cid.clone(),
5093        source_cid: source_cid.clone(),
5094        token: Bytes::new(),
5095        packet_number,
5096        packet_number_len,
5097        payload_len: frame.len() + 16,
5098    })?;
5099    let packet_number_offset = header
5100        .len()
5101        .checked_sub(packet_number_len)
5102        .ok_or_else(|| Error::HttpProtocol("invalid QUIC server long-header length".into()))?;
5103    let packet = protect_long_header_packet(
5104        keys,
5105        packet_number,
5106        &header,
5107        packet_number_offset,
5108        packet_number_len,
5109        &frame,
5110    )?;
5111
5112    Ok(ServerHandshakePacket {
5113        packet,
5114        packet_type,
5115        packet_number,
5116        packet_number_offset,
5117        crypto_data,
5118    })
5119}
5120
5121fn apply_h3_stream_frame(
5122    buffers: &mut BTreeMap<u64, BytesMut>,
5123    buffer_offsets: &mut BTreeMap<u64, u64>,
5124    stream_types: &mut BTreeMap<u64, native::H3StreamType>,
5125    stream_id: u64,
5126    offset: Option<u64>,
5127    fin: bool,
5128    data: Bytes,
5129) -> Result<Option<ServerH3StreamEvent>> {
5130    let (stream_type, frames) = if data.is_empty() {
5131        (stream_types.get(&stream_id).copied(), Vec::new())
5132    } else if is_unidirectional_stream(stream_id) {
5133        let stream_type = if let Some(stream_type) = stream_types.get(&stream_id).copied() {
5134            let buffer = buffers.entry(stream_id).or_default();
5135            buffer.extend_from_slice(&data);
5136            stream_type
5137        } else {
5138            let buffer = buffers.entry(stream_id).or_default();
5139            buffer.extend_from_slice(&data);
5140            let stream = match native::decode_unidirectional_stream(buffer.as_ref()) {
5141                Ok(stream) => stream,
5142                Err(error) if !fin && is_incomplete_h3_data_error(&error) => {
5143                    return Ok(None);
5144                }
5145                Err(error) => return Err(error),
5146            };
5147            stream_types.insert(stream_id, stream.stream_type);
5148            *buffer = BytesMut::from(stream.payload.as_ref());
5149            stream.stream_type
5150        };
5151        let buffer = buffers.entry(stream_id).or_default();
5152        let frames = if buffer.is_empty() {
5153            Vec::new()
5154        } else if !matches!(stream_type, native::H3StreamType::Control) {
5155            buffer.clear();
5156            Vec::new()
5157        } else {
5158            match native::decode_frames(buffer.as_ref()) {
5159                Ok(frames) => {
5160                    buffer.clear();
5161                    frames
5162                }
5163                Err(error) if !fin && is_incomplete_h3_data_error(&error) => {
5164                    return Ok(None);
5165                }
5166                Err(error) => return Err(error),
5167            }
5168        };
5169        (Some(stream_type), frames)
5170    } else {
5171        let stream_offset = offset.unwrap_or(0);
5172        let buffer_base = *buffer_offsets.entry(stream_id).or_insert(0);
5173        let buffer = buffers.entry(stream_id).or_default();
5174        let buffered_end = buffer_base
5175            .checked_add(buffer.len() as u64)
5176            .ok_or_else(|| Error::HttpProtocol("native H3 stream range overflow".into()))?;
5177        let data_end = stream_offset
5178            .checked_add(data.len() as u64)
5179            .ok_or_else(|| Error::HttpProtocol("native H3 stream range overflow".into()))?;
5180        if data_end <= buffer_base || stream_offset > buffered_end {
5181            return Ok(None);
5182        }
5183        let already_buffered = usize::try_from(buffered_end - stream_offset)
5184            .map_err(|_| Error::HttpProtocol("native H3 stream overlap exceeds usize".into()))?;
5185        if already_buffered < data.len() {
5186            buffer.extend_from_slice(&data[already_buffered..]);
5187        }
5188        match native::decode_frames(buffer.as_ref()) {
5189            Ok(frames) => {
5190                let consumed = buffer.len() as u64;
5191                buffer.clear();
5192                buffer_offsets.insert(
5193                    stream_id,
5194                    buffer_base.checked_add(consumed).ok_or_else(|| {
5195                        Error::HttpProtocol("native H3 stream range overflow".into())
5196                    })?,
5197                );
5198                (None, frames)
5199            }
5200            Err(error) if !fin && is_incomplete_h3_data_error(&error) => {
5201                return Ok(None);
5202            }
5203            Err(error) => return Err(error),
5204        }
5205    };
5206
5207    Ok(Some(ServerH3StreamEvent {
5208        stream_id,
5209        stream_type,
5210        fin,
5211        frames,
5212    }))
5213}
5214
/// Returns `true` when bit 1 (value `0x02`) of `stream_id` is set, which
/// marks the stream as unidirectional.
fn is_unidirectional_stream(stream_id: u64) -> bool {
    (stream_id >> 1) & 1 == 1
}

/// Returns `true` when bit 1 (value `0x02`) of `stream_id` is clear, i.e. the
/// stream is not unidirectional.
fn is_bidirectional_stream(stream_id: u64) -> bool {
    stream_id & 0x02 == 0
}
5222
/// Returns the lowest bit of `stream_id` (0 or 1), which identifies the
/// endpoint that initiated the stream.
fn stream_initiator(stream_id: u64) -> u64 {
    stream_id % 2
}
5226
5227fn is_ack_eliciting_quic_frame(frame: &QuicFrame) -> bool {
5228    !matches!(
5229        frame,
5230        QuicFrame::Padding | QuicFrame::Ack { .. } | QuicFrame::AckEcn { .. }
5231    )
5232}
5233
5234fn is_not_padding_frame(frame: &QuicFrame) -> bool {
5235    !matches!(frame, QuicFrame::Padding)
5236}
5237
5238fn padded_short_header_payload(payload: Bytes) -> Bytes {
5239    const MIN_SHORT_HEADER_PAYLOAD_LEN: usize = 24;
5240    if payload.len() >= MIN_SHORT_HEADER_PAYLOAD_LEN {
5241        return payload;
5242    }
5243    let mut padded = payload.to_vec();
5244    padded.resize(MIN_SHORT_HEADER_PAYLOAD_LEN, 0);
5245    Bytes::from(padded)
5246}
5247
5248fn is_incomplete_h3_data_error(error: &Error) -> bool {
5249    let message = error.to_string();
5250    message.contains("truncated HTTP/3 frame")
5251        || message.contains("missing HTTP/3 varint")
5252        || message.contains("truncated HTTP/3 varint")
5253}
5254
5255fn build_ack_packet(
5256    packet_type: LongHeaderType,
5257    keys: &QuicPacketKeyMaterial,
5258    destination_cid: &ConnectionId,
5259    source_cid: &ConnectionId,
5260    tracker: &mut QuicAckTracker,
5261    packet_number: u64,
5262) -> Result<Option<ClientAckPacket>> {
5263    if tracker.is_empty() {
5264        return Ok(None);
5265    }
5266
5267    let packet_number_len = 2;
5268    let frame = encode_frame(&tracker.to_ack_frame(0)?);
5269    let header = encode_long_header(&LongHeaderPacket {
5270        packet_type,
5271        version: 1,
5272        destination_cid: destination_cid.clone(),
5273        source_cid: source_cid.clone(),
5274        token: Bytes::new(),
5275        packet_number,
5276        packet_number_len,
5277        payload_len: frame.len() + 16,
5278    })?;
5279    let packet_number_offset = header
5280        .len()
5281        .checked_sub(packet_number_len)
5282        .ok_or_else(|| Error::HttpProtocol("invalid QUIC ACK header length".into()))?;
5283    let packet = protect_long_header_packet(
5284        keys,
5285        packet_number,
5286        &header,
5287        packet_number_offset,
5288        packet_number_len,
5289        &frame,
5290    )?;
5291    tracker.mark_ack_sent();
5292
5293    Ok(Some(ClientAckPacket {
5294        packet,
5295        packet_type,
5296        packet_number,
5297        packet_number_offset,
5298    }))
5299}
5300
5301#[cfg(test)]
5302mod receive_flow_control_tests {
5303    use super::*;
5304    use crate::fingerprint::QuicTransportParams;
5305
5306    // Build a `QuicTransportParams` that exposes small, easy-to-reason-about
5307    // initial limits so threshold/gating math is obvious. RFC 9000 Section 4
5308    // numbers the absolute MAX_DATA / MAX_STREAM_DATA value space; the
5309    // initial limits below are what the peer is presumed to know via the
5310    // transport parameter exchange (RFC 9000 Section 18.2).
5311    fn flow_control_params() -> QuicTransportParams {
5312        let mut params = QuicTransportParams::chrome();
5313        params.initial_max_data = 100;
5314        params.initial_max_stream_data_bidi_local = 40;
5315        params.initial_max_stream_data_bidi_remote = 40;
5316        params.initial_max_stream_data_uni = 40;
5317        params.max_connection_window = 100_000;
5318        params.max_stream_window = 100_000;
5319        params
5320    }
5321
5322    fn client_flow_control() -> QuicReceiveFlowControl {
5323        QuicReceiveFlowControl::client(&flow_control_params())
5324    }
5325
5326    // Stream 0 (client-bidi-local) consumes exactly N bytes; the absolute
5327    // MAX_STREAM_DATA we are willing to advertise is initial + N, never the
5328    // bytes seen on the wire, per RFC 9000 Section 4.1 ("a receiver
5329    // advertises a credit limit based on its progress on the application
5330    // protocol").
5331    #[test]
5332    fn record_stream_consumed_advertises_initial_plus_drained_per_stream() {
5333        let mut fc = client_flow_control();
5334        let stream_id = 0;
5335
5336        fc.record_stream_consumed(stream_id, 5)
5337            .expect("first drain");
5338        // 5 bytes < threshold (40 / 2 = 20), so no frame yet.
5339        assert!(fc.take_update_frames().is_empty());
5340
5341        fc.record_stream_consumed(stream_id, 20)
5342            .expect("threshold-crossing drain");
5343        // Total drained is 25 >= threshold; emit MAX_STREAM_DATA with the
5344        // absolute initial(40) + drained(25) = 65 value, not an arbitrary
5345        // receive-threshold ceiling.
5346        let frames = fc.take_update_frames();
5347        assert_eq!(
5348            frames,
5349            vec![QuicFrame::MaxStreamData {
5350                stream_id,
5351                max_stream_data: 65,
5352            }]
5353        );
5354
5355        // A further small drain below the next threshold delta keeps the
5356        // queue empty so we do not flood the wire with one frame per byte.
5357        fc.record_stream_consumed(stream_id, 5)
5358            .expect("small drain");
5359        assert!(fc.take_update_frames().is_empty());
5360
5361        // Cumulative drain (5 + 20 + 5 + 16 = 46) crosses the next
5362        // half-window delta relative to the previously announced 65, so the
5363        // emitted frame is the exact absolute initial(40) + drained(46) = 86,
5364        // not rounded up to a static receive-threshold ceiling.
5365        fc.record_stream_consumed(stream_id, 16)
5366            .expect("next threshold");
5367        let frames = fc.take_update_frames();
5368        assert_eq!(
5369            frames,
5370            vec![QuicFrame::MaxStreamData {
5371                stream_id,
5372                max_stream_data: 86,
5373            }]
5374        );
5375    }
5376
5377    // RFC 9000 Section 4.2: the connection-level MAX_DATA value is the
5378    // initial connection window plus the sum of bytes consumed across all
5379    // streams, with no double-counting between per-stream and connection
5380    // counters.
5381    #[test]
5382    fn record_stream_consumed_aggregates_connection_level_across_streams() {
5383        let mut fc = client_flow_control();
5384        // Two distinct client-bidi-local streams.
5385        let stream_a = 0;
5386        let stream_b = 4;
5387
5388        // 30 bytes on each stream: total 60 consumed. Connection threshold
5389        // is 100 / 2 = 50, so MAX_DATA fires; each stream is above its 20
5390        // stream-level threshold, so per-stream MAX_STREAM_DATA fires too.
5391        fc.record_stream_consumed(stream_a, 30).expect("a drains");
5392        fc.record_stream_consumed(stream_b, 30).expect("b drains");
5393        let mut frames = fc.take_update_frames();
5394        frames.sort_by_key(|frame| match frame {
5395            QuicFrame::MaxData(_) => 0u8,
5396            QuicFrame::MaxStreamData { stream_id, .. } => 1 + (*stream_id as u8 % 8),
5397            _ => 255,
5398        });
5399
5400        // initial_max_data(100) + connection_consumed(60) = 160 absolute.
5401        assert!(frames.contains(&QuicFrame::MaxData(160)));
5402        // Per-stream absolute = initial(40) + per-stream drained(30) = 70.
5403        assert!(frames.contains(&QuicFrame::MaxStreamData {
5404            stream_id: stream_a,
5405            max_stream_data: 70,
5406        }));
5407        assert!(frames.contains(&QuicFrame::MaxStreamData {
5408            stream_id: stream_b,
5409            max_stream_data: 70,
5410        }));
5411    }
5412
5413    // RFC 9000 Section 19.9/19.10 forbid emitting frames for every byte
5414    // drained; we gate emission on a half-initial-window delta but the
5415    // absolute value still comes from the consumed counter.
5416    #[test]
5417    fn threshold_gates_emit_but_does_not_round_absolute_value() {
5418        let mut fc = client_flow_control();
5419        let stream_id = 0;
5420
5421        // Many tiny drains adding up to just below the half-window
5422        // threshold do not produce a frame.
5423        for _ in 0..19 {
5424            fc.record_stream_consumed(stream_id, 1).expect("tiny drain");
5425            assert!(fc.take_update_frames().is_empty());
5426        }
5427
5428        // The very next byte crosses the 20-byte half-initial-window
5429        // threshold and emits exactly initial(40) + drained(20) = 60.
5430        fc.record_stream_consumed(stream_id, 1)
5431            .expect("crossing drain");
5432        let frames = fc.take_update_frames();
5433        assert_eq!(
5434            frames,
5435            vec![QuicFrame::MaxStreamData {
5436                stream_id,
5437                max_stream_data: 60,
5438            }]
5439        );
5440    }
5441
5442    // Stream completion must release per-stream bookkeeping cleanly. The
5443    // connection-level counter is monotonic across stream lifetimes
5444    // (RFC 9000 Section 4.1) so completed streams must not be double-counted
5445    // into the next stream's absolute value.
5446    #[test]
5447    fn release_stream_drops_per_stream_state_without_double_counting_connection() {
5448        let mut fc = client_flow_control();
5449        let stream_a = 0;
5450        let stream_b = 4;
5451
5452        fc.record_stream_consumed(stream_a, 40)
5453            .expect("a fully drains");
5454        let _ = fc.take_update_frames();
5455        // Stream A retires.
5456        fc.release_stream(stream_a);
5457        assert!(
5458            !fc.stream_consumed.contains_key(&stream_a),
5459            "release_stream must clear per-stream consumed bookkeeping"
5460        );
5461        assert!(
5462            !fc.last_announced_max_stream_data.contains_key(&stream_a),
5463            "release_stream must clear per-stream announced bookkeeping"
5464        );
5465
5466        // Stream B drains fresh; the connection counter still reflects
5467        // 40 (from A) + 30 (from B) = 70, not 30 alone, and not 110.
5468        fc.record_stream_consumed(stream_b, 30)
5469            .expect("b drains after a retired");
5470        let frames = fc.take_update_frames();
5471        // 70 >= connection threshold (50): emit MAX_DATA with absolute
5472        // initial(100) + connection_consumed(70) = 170.
5473        assert!(frames.contains(&QuicFrame::MaxData(170)));
5474        // Per-stream B is still ahead of its threshold and emits
5475        // initial(40) + per-stream(30) = 70.
5476        assert!(frames.contains(&QuicFrame::MaxStreamData {
5477            stream_id: stream_b,
5478            max_stream_data: 70,
5479        }));
5480    }
5481
5482    // RFC 9000 Section 4.1: violations of an advertised limit MUST cause a
5483    // FLOW_CONTROL_ERROR. The receive-side enforcement still uses the
5484    // advertised window, which now grows from the consumed counter; once we
5485    // advertise more, the peer is allowed to send up to that new limit.
5486    #[test]
5487    fn observe_stream_frame_uses_advertised_limit_after_consume_grows_window() {
5488        let mut fc = client_flow_control();
5489        let stream_id = 0;
5490
5491        // Without any consumption, the initial 40-byte stream window is
5492        // enforced.
5493        let too_much = fc.observe_stream_frame(stream_id, Some(0), 41);
5494        assert!(too_much.is_err(), "must reject data above initial limit");
5495
5496        // Drain 40 bytes to push absolute to 80; emit and clear frames.
5497        fc.observe_stream_frame(stream_id, Some(0), 40)
5498            .expect("fill initial window");
5499        fc.record_stream_consumed(stream_id, 40)
5500            .expect("drain initial window");
5501        let _ = fc.take_update_frames();
5502
5503        // 41 more bytes (offsets 40..81) now fit under the new 80 absolute.
5504        fc.observe_stream_frame(stream_id, Some(40), 40)
5505            .expect("data within newly advertised window");
5506    }
5507}