1use std::collections::BTreeMap;
4use std::net::SocketAddr;
5use std::time::{Duration, Instant};
6
7use bytes::{Bytes, BytesMut};
8
9use crate::error::{Error, Result};
10use crate::fingerprint::{Http3Fingerprint, QuicTransportParams, TlsFingerprint};
11use crate::headers::Headers;
12use crate::transport::h3::native;
13use crate::transport::h3::path::{match_local_connection_id, QuicConnectionIdInventory};
14use crate::transport::h3::quic::{
15 build_initial_crypto_packet, decode_frames, decode_long_header, decode_transport_parameters,
16 decode_version_negotiation_packet, derive_initial_key_material,
17 derive_next_packet_key_material, encode_frame, encode_long_header, open_long_header_packet,
18 open_short_header_packet, protect_long_header_packet, protect_short_header_packet,
19 split_long_header_datagram, validate_retry_integrity_tag_v1, ConnectionId, LongHeaderPacket,
20 LongHeaderType, OpenedShortHeaderPacket, QuicAckTracker, QuicCloseState, QuicCryptoAssembler,
21 QuicEcnMark, QuicFrame, QuicLossDetector, QuicPacketKeyMaterial, QuicPathValidator,
22 QuicPmtuProbePolicy, TransportParameter,
23};
24use crate::transport::h3::recovery::{
25 LossDetectionOutcome, PacketNumberSpace, RecoveryState, SentPacketInfo,
26};
27use crate::transport::h3::tls::{
28 build_client_initial_packet_from_capture_with_size,
29 build_client_initial_packet_from_capture_with_version_and_size, ClientInitialPacket,
30 NativeH3HandshakeStatus, NativeH3SessionTicket, NativeQuicTlsSession, QuicEncryptionLevel,
31 QuicSecretDirection, QuicTlsSecret,
32};
33
34use getrandom::fill as getrandom_fill;
35
36const QUIC_VERSION_1: u32 = 1;
37const INITIAL_PACKET_NUMBER_LEN: usize = 4;
38const AES_GCM_TAG_LEN: usize = 16;
39const MIN_PATH_VALIDATION_DATAGRAM: usize = 1200;
40const QUIC_CONNECTION_MIGRATION_ERROR: u64 = 0x0a;
41
42fn new_server_cid_inventory(
43 fingerprint: &Http3Fingerprint,
44 server_source_cid: &ConnectionId,
45 client_source_cid: &ConnectionId,
46) -> QuicConnectionIdInventory {
47 let mut inventory =
48 QuicConnectionIdInventory::new(fingerprint.transport.active_connection_id_limit);
49 inventory.install_initial_local(server_source_cid.clone(), [0u8; 16]);
50 inventory.install_initial_peer(
51 Bytes::copy_from_slice(client_source_cid.as_bytes()),
52 [0u8; 16],
53 );
54 inventory
55}
56
57fn new_client_cid_inventory(
58 fingerprint: &Http3Fingerprint,
59 source_cid: &ConnectionId,
60) -> QuicConnectionIdInventory {
61 let mut inventory =
62 QuicConnectionIdInventory::new(fingerprint.transport.active_connection_id_limit);
63 inventory.install_initial_local(source_cid.clone(), [0u8; 16]);
64 inventory
65}
66
67fn pad_short_header_payload_to_min_datagram(
68 payload: Bytes,
69 short_header_len: usize,
70 min_datagram: usize,
71) -> Bytes {
72 let mut out = payload.to_vec();
73 while short_header_len + out.len() + AES_GCM_TAG_LEN < min_datagram {
74 out.push(0);
75 }
76 Bytes::from(out)
77}
78
79fn recovery_state_from_transport(params: &QuicTransportParams) -> RecoveryState {
80 let max_ack_delay = Duration::from_millis(params.max_ack_delay_ms);
81 let datagram = params.max_recv_udp_payload_size.max(1200) as u64;
82 RecoveryState::new(max_ack_delay, datagram)
83}
84
85fn observe_packet_with_ecn(
86 tracker: &mut QuicAckTracker,
87 packet_number: u64,
88 ecn_mark: Option<QuicEcnMark>,
89 now: Instant,
90) {
91 if let Some(mark) = ecn_mark {
92 tracker.observe_ecn_at(packet_number, mark, now);
93 } else {
94 tracker.observe_at(packet_number, now);
95 }
96}
97
98#[derive(Debug, Clone, PartialEq, Eq)]
99pub struct ProcessedServerInitial {
100 pub packet_number: u64,
101 pub crypto_data: Bytes,
102 pub initial_crypto_out: Bytes,
103 pub handshake_crypto_out: Bytes,
104 pub secrets: Vec<QuicTlsSecret>,
105}
106
107#[derive(Debug, Clone, PartialEq, Eq)]
108pub struct ClientHandshakePacket {
109 pub packet: Bytes,
110 pub packet_number: u64,
111 pub packet_number_offset: usize,
112 pub crypto_data: Bytes,
113}
114
115#[derive(Debug, Clone, PartialEq, Eq)]
116pub struct ServerHandshakePacket {
117 pub packet: Bytes,
118 pub packet_type: LongHeaderType,
119 pub packet_number: u64,
120 pub packet_number_offset: usize,
121 pub crypto_data: Bytes,
122}
123
124#[derive(Debug, Clone, PartialEq, Eq)]
125pub struct ServerHandshakeFlight {
126 pub datagram: Bytes,
127 pub packets: Vec<ServerHandshakePacket>,
128 pub secrets: Vec<QuicTlsSecret>,
129}
130
131#[derive(Debug, Clone, PartialEq, Eq)]
132pub struct ProcessedClientHandshake {
133 pub packet_number: u64,
134 pub crypto_data: Bytes,
135 pub secrets: Vec<QuicTlsSecret>,
136}
137
138#[derive(Debug, Clone, PartialEq, Eq)]
139pub struct ClientAckPacket {
140 pub packet: Bytes,
141 pub packet_type: LongHeaderType,
142 pub packet_number: u64,
143 pub packet_number_offset: usize,
144}
145
146#[derive(Debug, Clone, PartialEq, Eq)]
147pub struct ClientApplicationPacket {
148 pub packet: Bytes,
149 pub packet_number: u64,
150 pub stream_id: u64,
151 pub packet_number_offset: usize,
152 pub data: Bytes,
153}
154
155#[derive(Debug, Clone, PartialEq, Eq)]
156pub struct ServerApplicationPacket {
157 pub packet: Bytes,
158 pub packet_number: u64,
159 pub stream_id: u64,
160 pub packet_number_offset: usize,
161 pub data: Bytes,
162}
163
164#[derive(Debug, Clone, PartialEq, Eq)]
165pub struct ServerApplicationAckPacket {
166 pub packet: Bytes,
167 pub packet_number: u64,
168 pub packet_number_offset: usize,
169}
170
171#[derive(Debug, Clone, PartialEq, Eq)]
172pub struct ServerApplicationControlPacket {
173 pub packet: Bytes,
174 pub packet_number: u64,
175 pub packet_number_offset: usize,
176}
177
178#[derive(Debug, Clone, PartialEq, Eq)]
179pub struct ClientApplicationAckPacket {
180 pub packet: Bytes,
181 pub packet_number: u64,
182 pub packet_number_offset: usize,
183}
184
185#[derive(Debug, Clone, PartialEq, Eq)]
186pub struct ClientApplicationControlPacket {
187 pub packet: Bytes,
188 pub packet_number: u64,
189 pub packet_number_offset: usize,
190}
191
192#[derive(Debug, Clone, PartialEq, Eq)]
193pub struct ServerH3StreamEvent {
194 pub stream_id: u64,
195 pub stream_type: Option<native::H3StreamType>,
196 pub fin: bool,
197 pub frames: Vec<native::H3Frame>,
198}
199
200#[derive(Debug, Clone, PartialEq, Eq)]
201pub struct ClientH3StreamEvent {
202 pub stream_id: u64,
203 pub stream_type: Option<native::H3StreamType>,
204 pub fin: bool,
205 pub frames: Vec<native::H3Frame>,
206}
207
208#[derive(Debug, Clone, PartialEq, Eq)]
209pub enum ClientH3Event {
210 Stream(ClientH3StreamEvent),
211 ResetStream {
212 stream_id: u64,
213 error_code: u64,
214 final_size: u64,
215 },
216 StopSending {
217 stream_id: u64,
218 error_code: u64,
219 },
220 ConnectionClose {
221 error_code: u64,
222 frame_type: Option<u64>,
223 reason: Bytes,
224 },
225 PathChallenge([u8; 8]),
226}
227
228#[derive(Debug, Clone, PartialEq, Eq)]
229pub enum ServerH3Event {
230 Stream(ServerH3StreamEvent),
231 ResetStream {
232 stream_id: u64,
233 error_code: u64,
234 final_size: u64,
235 },
236 StopSending {
237 stream_id: u64,
238 error_code: u64,
239 },
240 ConnectionClose {
241 error_code: u64,
242 frame_type: Option<u64>,
243 reason: Bytes,
244 },
245 PathChallenge([u8; 8]),
246}
247
248#[derive(Debug, Clone, PartialEq, Eq)]
249struct SentApplicationStreamPacket {
250 stream_id: u64,
251 stream_offset: u64,
252 fin: bool,
253 data: Bytes,
254}
255
256#[derive(Debug, Clone, PartialEq, Eq)]
260struct PreviousKeys {
261 keys: QuicPacketKeyMaterial,
262 phase: bool,
263 retire_at: Instant,
264}
265
266const PREVIOUS_KEY_WINDOW: Duration = Duration::from_secs(3);
271
272#[derive(Debug, Clone, PartialEq, Eq, Default)]
276struct OneRttKeyUpdate {
277 write_update_in_progress: bool,
278 write_update_anchor: Option<u64>,
279}
280
281impl OneRttKeyUpdate {
282 fn note_packet_acked(&mut self, packet_number: u64) {
283 if let Some(anchor) = self.write_update_anchor {
284 if packet_number >= anchor {
285 self.write_update_in_progress = false;
286 self.write_update_anchor = None;
287 }
288 }
289 }
290}
291
292#[derive(Debug, Clone, Copy, PartialEq, Eq)]
296enum OneRttOpenOutcome {
297 Current,
298 Previous,
299 Next,
300}
301
302#[derive(Debug, Clone, PartialEq, Eq)]
303struct OneRttOpenedPacket {
304 opened: OpenedShortHeaderPacket,
305 outcome: OneRttOpenOutcome,
306}
307
308#[allow(clippy::too_many_arguments)]
309fn try_open_one_rtt_packet(
310 current: &QuicPacketKeyMaterial,
311 next: Option<&QuicPacketKeyMaterial>,
312 previous: Option<&PreviousKeys>,
313 expected_read_phase: bool,
314 now: Instant,
315 packet: &[u8],
316 destination_cid_len: usize,
317 expected_packet_number: u64,
318) -> Result<OneRttOpenedPacket> {
319 if let Ok(opened) =
320 open_short_header_packet(current, packet, destination_cid_len, expected_packet_number)
321 {
322 if opened.key_phase == expected_read_phase {
323 return Ok(OneRttOpenedPacket {
324 opened,
325 outcome: OneRttOpenOutcome::Current,
326 });
327 }
328 }
329
330 if let Some(previous) = previous {
331 if previous.retire_at > now {
332 if let Ok(opened) = open_short_header_packet(
333 &previous.keys,
334 packet,
335 destination_cid_len,
336 expected_packet_number,
337 ) {
338 if opened.key_phase == previous.phase {
339 return Ok(OneRttOpenedPacket {
340 opened,
341 outcome: OneRttOpenOutcome::Previous,
342 });
343 }
344 }
345 }
346 }
347
348 if let Some(next) = next {
349 let expected_next_phase = !expected_read_phase;
350 if let Ok(opened) =
351 open_short_header_packet(next, packet, destination_cid_len, expected_packet_number)
352 {
353 if opened.key_phase == expected_next_phase {
354 return Ok(OneRttOpenedPacket {
355 opened,
356 outcome: OneRttOpenOutcome::Next,
357 });
358 }
359 }
360 }
361
362 Err(Error::Quic(
363 "native QUIC 1-RTT short-header packet could not be decrypted with current, previous, or next phase keys".into(),
364 ))
365}
366
367#[derive(Debug, Clone, PartialEq, Eq)]
368struct SentCryptoPacket {
369 packet_type: LongHeaderType,
370 crypto_offset: u64,
371 crypto_data: Bytes,
372}
373
374#[derive(Debug, Clone, PartialEq, Eq)]
375struct QuicApplicationFlowControl {
376 local_initiator: u64,
377 max_data: u64,
378 sent_data: u64,
379 initial_max_stream_data_bidi_local: u64,
380 initial_max_stream_data_bidi_remote: u64,
381 initial_max_stream_data_uni: u64,
382 initial_max_streams_bidi: u64,
383 initial_max_streams_uni: u64,
384 stream_sent: BTreeMap<u64, u64>,
385 stream_data_overrides: BTreeMap<u64, u64>,
386 last_blocked: Option<QuicFrame>,
387}
388
389impl QuicApplicationFlowControl {
390 fn client(peer_transport: &QuicTransportParams) -> Self {
391 Self::new(0, peer_transport)
392 }
393
394 fn server(peer_transport: &QuicTransportParams) -> Self {
395 Self::new(1, peer_transport)
396 }
397
398 fn new(local_initiator: u64, peer_transport: &QuicTransportParams) -> Self {
399 Self {
400 local_initiator,
401 max_data: peer_transport.initial_max_data,
402 sent_data: 0,
403 initial_max_stream_data_bidi_local: peer_transport.initial_max_stream_data_bidi_local,
404 initial_max_stream_data_bidi_remote: peer_transport.initial_max_stream_data_bidi_remote,
405 initial_max_stream_data_uni: peer_transport.initial_max_stream_data_uni,
406 initial_max_streams_bidi: peer_transport.initial_max_streams_bidi,
407 initial_max_streams_uni: peer_transport.initial_max_streams_uni,
408 stream_sent: BTreeMap::new(),
409 stream_data_overrides: BTreeMap::new(),
410 last_blocked: None,
411 }
412 }
413
414 fn apply_max_data(&mut self, max_data: u64) {
415 self.max_data = self.max_data.max(max_data);
416 }
417
418 fn apply_max_stream_data(&mut self, stream_id: u64, max_stream_data: u64) {
419 self.stream_data_overrides
420 .entry(stream_id)
421 .and_modify(|current| *current = (*current).max(max_stream_data))
422 .or_insert(max_stream_data);
423 }
424
425 fn apply_max_streams(&mut self, bidirectional: bool, max_streams: u64) {
426 if bidirectional {
427 self.initial_max_streams_bidi = self.initial_max_streams_bidi.max(max_streams);
428 } else {
429 self.initial_max_streams_uni = self.initial_max_streams_uni.max(max_streams);
430 }
431 }
432
433 fn take_blocked_frame(&mut self) -> Option<QuicFrame> {
434 self.last_blocked.take()
435 }
436
437 fn consume_stream_data(
438 &mut self,
439 stream_id: u64,
440 stream_offset: u64,
441 len: usize,
442 ) -> Result<()> {
443 let stream_limit = self.stream_data_limit(stream_id)?;
444 let data_end = stream_offset
445 .checked_add(len as u64)
446 .ok_or_else(|| Error::HttpProtocol("QUIC flow control range overflow".into()))?;
447 if data_end > stream_limit {
448 self.last_blocked = Some(QuicFrame::StreamDataBlocked {
449 stream_id,
450 maximum_stream_data: stream_limit,
451 });
452 return Err(Error::Quic(format!(
453 "native H3 flow control blocked stream {stream_id}: end offset {data_end} exceeds peer stream limit {stream_limit}"
454 )));
455 }
456
457 let previous_stream_sent = *self.stream_sent.get(&stream_id).unwrap_or(&0);
458 let new_connection_bytes = data_end.saturating_sub(previous_stream_sent);
459 let next_sent_data = self
460 .sent_data
461 .checked_add(new_connection_bytes)
462 .ok_or_else(|| Error::HttpProtocol("QUIC flow control data overflow".into()))?;
463 if next_sent_data > self.max_data {
464 self.last_blocked = Some(QuicFrame::DataBlocked {
465 maximum_data: self.max_data,
466 });
467 return Err(Error::Quic(format!(
468 "native H3 flow control blocked stream {stream_id}: connection data {next_sent_data} exceeds peer connection limit {}",
469 self.max_data
470 )));
471 }
472
473 self.sent_data = next_sent_data;
474 self.stream_sent
475 .insert(stream_id, previous_stream_sent.max(data_end));
476 self.last_blocked = None;
477 Ok(())
478 }
479
480 fn stream_data_limit(&mut self, stream_id: u64) -> Result<u64> {
481 let initial_limit = if is_bidirectional_stream(stream_id) {
482 if stream_initiator(stream_id) == self.local_initiator {
483 self.ensure_stream_count(
484 stream_id,
485 self.initial_max_streams_bidi,
486 "bidirectional",
487 )?;
488 self.initial_max_stream_data_bidi_remote
489 } else {
490 self.initial_max_stream_data_bidi_local
491 }
492 } else {
493 if stream_initiator(stream_id) != self.local_initiator {
494 return Err(Error::Quic(format!(
495 "native H3 flow control cannot send on peer-initiated unidirectional stream {stream_id}"
496 )));
497 }
498 self.ensure_stream_count(stream_id, self.initial_max_streams_uni, "unidirectional")?;
499 self.initial_max_stream_data_uni
500 };
501 Ok(self
502 .stream_data_overrides
503 .get(&stream_id)
504 .copied()
505 .unwrap_or(initial_limit)
506 .max(initial_limit))
507 }
508
509 fn ensure_stream_count(&mut self, stream_id: u64, max_streams: u64, label: &str) -> Result<()> {
510 let required_stream_count = (stream_id >> 2) + 1;
511 if required_stream_count > max_streams {
512 self.last_blocked = Some(QuicFrame::StreamsBlocked {
513 bidirectional: label == "bidirectional",
514 maximum_streams: max_streams,
515 });
516 return Err(Error::Quic(format!(
517 "native H3 flow control blocked stream {stream_id}: opening {required_stream_count} {label} streams exceeds peer limit {max_streams}"
518 )));
519 }
520 Ok(())
521 }
522}
523
524#[derive(Debug, Clone, PartialEq, Eq)]
540struct QuicReceiveFlowControl {
541 local_initiator: u64,
542 initial_max_data: u64,
543 max_data: u64,
544 max_connection_window: u64,
545 received_data: u64,
546 initial_max_stream_data_bidi_local: u64,
547 initial_max_stream_data_bidi_remote: u64,
548 initial_max_stream_data_uni: u64,
549 max_stream_window: u64,
550 stream_received: BTreeMap<u64, u64>,
551 stream_data_overrides: BTreeMap<u64, u64>,
552 connection_consumed: u64,
553 stream_consumed: BTreeMap<u64, u64>,
554 last_announced_max_data: u64,
555 last_announced_max_stream_data: BTreeMap<u64, u64>,
556 pending_max_data: Option<u64>,
557 pending_max_stream_data: BTreeMap<u64, u64>,
558 connection_update_threshold: u64,
559}
560
561impl QuicReceiveFlowControl {
562 fn client(local_transport: &QuicTransportParams) -> Self {
563 Self::new(0, local_transport)
564 }
565
566 fn server(local_transport: &QuicTransportParams) -> Self {
567 Self::new(1, local_transport)
568 }
569
570 fn new(local_initiator: u64, local_transport: &QuicTransportParams) -> Self {
571 let initial_max_data = local_transport.initial_max_data;
572 let max_connection_window = local_transport.max_connection_window.max(initial_max_data);
573 let connection_update_threshold = (initial_max_data / 2).max(1);
579 Self {
580 local_initiator,
581 initial_max_data,
582 max_data: initial_max_data,
583 max_connection_window,
584 received_data: 0,
585 initial_max_stream_data_bidi_local: local_transport.initial_max_stream_data_bidi_local,
586 initial_max_stream_data_bidi_remote: local_transport
587 .initial_max_stream_data_bidi_remote,
588 initial_max_stream_data_uni: local_transport.initial_max_stream_data_uni,
589 max_stream_window: local_transport.max_stream_window,
590 stream_received: BTreeMap::new(),
591 stream_data_overrides: BTreeMap::new(),
592 connection_consumed: 0,
593 stream_consumed: BTreeMap::new(),
594 last_announced_max_data: initial_max_data,
595 last_announced_max_stream_data: BTreeMap::new(),
596 pending_max_data: None,
597 pending_max_stream_data: BTreeMap::new(),
598 connection_update_threshold,
599 }
600 }
601
602 fn observe_stream_frame(
603 &mut self,
604 stream_id: u64,
605 offset: Option<u64>,
606 len: usize,
607 ) -> Result<()> {
608 let stream_limit = self.stream_data_limit(stream_id)?;
609 let stream_offset = offset.unwrap_or(0);
610 let data_end = stream_offset.checked_add(len as u64).ok_or_else(|| {
611 Error::HttpProtocol("QUIC receive flow control range overflow".into())
612 })?;
613 if data_end > stream_limit {
614 return Err(Error::Quic(format!(
615 "native H3 receive flow control blocked stream {stream_id}: end offset {data_end} exceeds local stream limit {stream_limit}"
616 )));
617 }
618
619 let previous_stream_received = *self.stream_received.get(&stream_id).unwrap_or(&0);
620 let new_connection_bytes = data_end.saturating_sub(previous_stream_received);
621 let next_received_data = self
622 .received_data
623 .checked_add(new_connection_bytes)
624 .ok_or_else(|| Error::HttpProtocol("QUIC receive flow control data overflow".into()))?;
625 if next_received_data > self.max_data {
626 return Err(Error::Quic(format!(
627 "native H3 receive flow control blocked stream {stream_id}: connection data {next_received_data} exceeds local connection limit {}",
628 self.max_data
629 )));
630 }
631
632 self.received_data = next_received_data;
633 self.stream_received
634 .insert(stream_id, previous_stream_received.max(data_end));
635 Ok(())
636 }
637
638 fn record_stream_consumed(&mut self, stream_id: u64, len: u64) -> Result<()> {
646 if len == 0 {
647 return Ok(());
648 }
649
650 let initial_stream_limit = self.initial_stream_data_limit(stream_id)?;
651 let stream_window = self.max_stream_window.max(initial_stream_limit);
652 let stream_threshold = (initial_stream_limit / 2).max(1);
653
654 let stream_total = self
655 .stream_consumed
656 .get(&stream_id)
657 .copied()
658 .unwrap_or(0)
659 .checked_add(len)
660 .ok_or_else(|| {
661 Error::HttpProtocol("QUIC receive flow control consumed overflow".into())
662 })?;
663 self.stream_consumed.insert(stream_id, stream_total);
664
665 self.connection_consumed = self.connection_consumed.checked_add(len).ok_or_else(|| {
666 Error::HttpProtocol("QUIC receive flow control connection consumed overflow".into())
667 })?;
668
669 let stream_announced = *self
670 .last_announced_max_stream_data
671 .get(&stream_id)
672 .unwrap_or(&initial_stream_limit);
673 let stream_absolute = initial_stream_limit
674 .saturating_add(stream_total)
675 .min(stream_window);
676 if stream_absolute > stream_announced
677 && stream_absolute - stream_announced >= stream_threshold
678 {
679 self.pending_max_stream_data
680 .insert(stream_id, stream_absolute);
681 self.stream_data_overrides
682 .insert(stream_id, stream_absolute);
683 self.last_announced_max_stream_data
684 .insert(stream_id, stream_absolute);
685 }
686
687 let connection_absolute = self
688 .initial_max_data
689 .saturating_add(self.connection_consumed)
690 .min(self.max_connection_window);
691 if connection_absolute > self.last_announced_max_data
692 && connection_absolute - self.last_announced_max_data
693 >= self.connection_update_threshold
694 {
695 self.pending_max_data = Some(connection_absolute);
696 self.max_data = connection_absolute;
697 self.last_announced_max_data = connection_absolute;
698 }
699 Ok(())
700 }
701
702 fn release_stream(&mut self, stream_id: u64) {
707 self.stream_consumed.remove(&stream_id);
708 self.last_announced_max_stream_data.remove(&stream_id);
709 self.pending_max_stream_data.remove(&stream_id);
710 self.stream_received.remove(&stream_id);
711 self.stream_data_overrides.remove(&stream_id);
712 }
713
714 fn take_update_frames(&mut self) -> Vec<QuicFrame> {
715 let mut frames = Vec::new();
716 if let Some(max_data) = self.pending_max_data.take() {
717 frames.push(QuicFrame::MaxData(max_data));
718 }
719 frames.extend(
720 std::mem::take(&mut self.pending_max_stream_data)
721 .into_iter()
722 .map(|(stream_id, max_stream_data)| QuicFrame::MaxStreamData {
723 stream_id,
724 max_stream_data,
725 }),
726 );
727 frames
728 }
729
730 fn stream_data_limit(&self, stream_id: u64) -> Result<u64> {
731 if let Some(max_stream_data) = self.stream_data_overrides.get(&stream_id) {
732 return Ok(*max_stream_data);
733 }
734 self.initial_stream_data_limit(stream_id)
735 }
736
737 fn initial_stream_data_limit(&self, stream_id: u64) -> Result<u64> {
738 if is_bidirectional_stream(stream_id) {
739 if stream_initiator(stream_id) == self.local_initiator {
740 Ok(self.initial_max_stream_data_bidi_local)
741 } else {
742 Ok(self.initial_max_stream_data_bidi_remote)
743 }
744 } else if stream_initiator(stream_id) == self.local_initiator {
745 Err(Error::Quic(format!(
746 "native H3 receive flow control cannot receive on local unidirectional stream {stream_id}"
747 )))
748 } else {
749 Ok(self.initial_max_stream_data_uni)
750 }
751 }
752}
753
754pub struct NativeQuicHandshake {
755 client_initial: ClientInitialPacket,
756 pending_client_initial: Option<ClientInitialPacket>,
757 tls: NativeQuicTlsSession,
758 fingerprint: Http3Fingerprint,
759 server_name: String,
760 tls_fingerprint: Option<TlsFingerprint>,
761 verify_peer: bool,
762 root_certs: Vec<Vec<u8>>,
763 use_platform_roots: bool,
764 supported_versions: Vec<u32>,
765 client_initial_version: u32,
766 retry_received: bool,
767 vn_received: bool,
768 server_initial_or_handshake_seen: bool,
769 original_destination_cid: ConnectionId,
770 retry_source_cid: Option<ConnectionId>,
771 destination_cid: ConnectionId,
772 source_cid: ConnectionId,
773 client_initial_keys: QuicPacketKeyMaterial,
774 server_initial_keys: QuicPacketKeyMaterial,
775 client_handshake_keys: Option<QuicPacketKeyMaterial>,
776 client_early_data_keys: Option<QuicPacketKeyMaterial>,
777 server_handshake_keys: Option<QuicPacketKeyMaterial>,
778 client_application_keys: Option<QuicPacketKeyMaterial>,
779 server_application_keys: Option<QuicPacketKeyMaterial>,
780 client_application_next_keys: Option<QuicPacketKeyMaterial>,
781 server_application_next_keys: Option<QuicPacketKeyMaterial>,
782 server_application_previous: Option<PreviousKeys>,
783 write_key_phase: bool,
784 read_key_phase: bool,
785 application_key_update: OneRttKeyUpdate,
786 initial_crypto: QuicCryptoAssembler,
787 handshake_crypto: QuicCryptoAssembler,
788 initial_ack_tracker: QuicAckTracker,
789 handshake_ack_tracker: QuicAckTracker,
790 application_ack_tracker: QuicAckTracker,
791 client_initial_loss_detector: QuicLossDetector,
792 client_handshake_loss_detector: QuicLossDetector,
793 client_application_loss_detector: QuicLossDetector,
794 client_application_flow_control: QuicApplicationFlowControl,
795 client_application_receive_flow_control: QuicReceiveFlowControl,
796 client_initial_sent_crypto: BTreeMap<u64, SentCryptoPacket>,
797 client_handshake_sent_crypto: BTreeMap<u64, SentCryptoPacket>,
798 client_application_sent_streams: BTreeMap<u64, SentApplicationStreamPacket>,
799 client_application_recovery_lost_packets: Vec<u64>,
800 client_application_ecn_congestion: bool,
801 client_path_validator: QuicPathValidator,
802 client_cid_inventory: QuicConnectionIdInventory,
803 client_pmtu_probe: QuicPmtuProbePolicy,
804 server_transport_parameters_validated: bool,
805 recovery: RecoveryState,
806 next_client_initial_packet_number: u64,
807 next_server_initial_packet_number: u64,
808 next_server_handshake_packet_number: u64,
809 next_client_handshake_packet_number: u64,
810 next_server_application_packet_number: u64,
811 next_client_application_packet_number: u64,
812 next_client_bidirectional_stream_id: u64,
813 next_client_unidirectional_stream_id: u64,
814 client_handshake_crypto_offset: u64,
815 client_stream_offsets: BTreeMap<u64, u64>,
816 server_h3_stream_buffers: BTreeMap<u64, BytesMut>,
817 server_h3_stream_buffer_offsets: BTreeMap<u64, u64>,
818 server_h3_stream_types: BTreeMap<u64, native::H3StreamType>,
819 close_draining: bool,
820 close_state: QuicCloseState,
821}
822
823pub struct NativeQuicServerHandshake {
824 tls: NativeQuicTlsSession,
825 client_source_cid: ConnectionId,
826 server_source_cid: ConnectionId,
827 client_initial_keys: QuicPacketKeyMaterial,
828 server_initial_keys: QuicPacketKeyMaterial,
829 client_handshake_keys: Option<QuicPacketKeyMaterial>,
830 client_early_data_keys: Option<QuicPacketKeyMaterial>,
831 server_handshake_keys: Option<QuicPacketKeyMaterial>,
832 client_initial_crypto: QuicCryptoAssembler,
833 client_handshake_crypto: QuicCryptoAssembler,
834 client_initial_ack_tracker: QuicAckTracker,
835 client_handshake_ack_tracker: QuicAckTracker,
836 client_application_ack_tracker: QuicAckTracker,
837 server_initial_loss_detector: QuicLossDetector,
838 server_handshake_loss_detector: QuicLossDetector,
839 server_application_loss_detector: QuicLossDetector,
840 server_application_flow_control: QuicApplicationFlowControl,
841 server_application_receive_flow_control: QuicReceiveFlowControl,
842 server_application_sent_streams: BTreeMap<u64, SentApplicationStreamPacket>,
843 server_application_recovery_lost_packets: Vec<u64>,
844 server_initial_sent_crypto: BTreeMap<u64, SentCryptoPacket>,
845 server_handshake_sent_crypto: BTreeMap<u64, SentCryptoPacket>,
846 server_path_validator: QuicPathValidator,
847 cid_inventory: QuicConnectionIdInventory,
848 recovery: RecoveryState,
849 ack_delay_exponent: u64,
850 next_client_initial_packet_number: u64,
851 next_client_handshake_packet_number: u64,
852 next_client_application_packet_number: u64,
853 next_server_initial_packet_number: u64,
854 next_server_handshake_packet_number: u64,
855 next_server_application_packet_number: u64,
856 next_server_unidirectional_stream_id: u64,
857 client_application_keys: Option<QuicPacketKeyMaterial>,
858 server_application_keys: Option<QuicPacketKeyMaterial>,
859 client_application_next_keys: Option<QuicPacketKeyMaterial>,
860 server_application_next_keys: Option<QuicPacketKeyMaterial>,
861 client_application_previous: Option<PreviousKeys>,
862 write_key_phase: bool,
863 read_key_phase: bool,
864 application_key_update: OneRttKeyUpdate,
865 server_initial_crypto_offset: u64,
866 server_handshake_crypto_offset: u64,
867 server_stream_offsets: BTreeMap<u64, u64>,
868 server_control_stream_id: Option<u64>,
869 client_h3_stream_buffers: BTreeMap<u64, BytesMut>,
870 client_h3_stream_buffer_offsets: BTreeMap<u64, u64>,
871 client_h3_stream_types: BTreeMap<u64, native::H3StreamType>,
872 close_draining: bool,
873 close_state: QuicCloseState,
874}
875
876impl NativeQuicServerHandshake {
877 pub fn new(
878 fingerprint: &Http3Fingerprint,
879 cert_pem: &[u8],
880 key_pem: &[u8],
881 client_destination_cid: ConnectionId,
882 client_source_cid: ConnectionId,
883 server_source_cid: ConnectionId,
884 ) -> Result<Self> {
885 let initial_keys = derive_initial_key_material(client_destination_cid.as_bytes())?;
886 let cid_inventory =
887 new_server_cid_inventory(fingerprint, &server_source_cid, &client_source_cid);
888 Ok(Self {
889 tls: NativeQuicTlsSession::server_with_connection_ids(
890 fingerprint,
891 cert_pem,
892 key_pem,
893 &client_destination_cid,
894 &server_source_cid,
895 )?,
896 client_source_cid,
897 server_source_cid,
898 client_initial_keys: initial_keys.client,
899 server_initial_keys: initial_keys.server,
900 client_handshake_keys: None,
901 client_early_data_keys: None,
902 server_handshake_keys: None,
903 client_initial_crypto: QuicCryptoAssembler::default(),
904 client_handshake_crypto: QuicCryptoAssembler::default(),
905 client_initial_ack_tracker: QuicAckTracker::default(),
906 client_handshake_ack_tracker: QuicAckTracker::default(),
907 client_application_ack_tracker: QuicAckTracker::default(),
908 server_initial_loss_detector: QuicLossDetector::default(),
909 server_handshake_loss_detector: QuicLossDetector::default(),
910 server_application_loss_detector: QuicLossDetector::default(),
911 server_application_flow_control: QuicApplicationFlowControl::server(
912 &fingerprint.transport,
913 ),
914 server_application_receive_flow_control: QuicReceiveFlowControl::server(
915 &fingerprint.transport,
916 ),
917 server_application_sent_streams: BTreeMap::new(),
918 server_application_recovery_lost_packets: Vec::new(),
919 server_initial_sent_crypto: BTreeMap::new(),
920 server_handshake_sent_crypto: BTreeMap::new(),
921 server_path_validator: QuicPathValidator::default(),
922 cid_inventory,
923 next_client_initial_packet_number: 0,
924 next_client_handshake_packet_number: 0,
925 next_client_application_packet_number: 0,
926 next_server_initial_packet_number: 0,
927 next_server_handshake_packet_number: 0,
928 next_server_application_packet_number: 0,
929 next_server_unidirectional_stream_id: 3,
930 client_application_keys: None,
931 server_application_keys: None,
932 client_application_next_keys: None,
933 server_application_next_keys: None,
934 client_application_previous: None,
935 write_key_phase: false,
936 read_key_phase: false,
937 application_key_update: OneRttKeyUpdate::default(),
938 server_initial_crypto_offset: 0,
939 server_handshake_crypto_offset: 0,
940 server_stream_offsets: BTreeMap::new(),
941 server_control_stream_id: None,
942 client_h3_stream_buffers: BTreeMap::new(),
943 client_h3_stream_buffer_offsets: BTreeMap::new(),
944 client_h3_stream_types: BTreeMap::new(),
945 close_draining: false,
946 close_state: QuicCloseState::default(),
947 recovery: recovery_state_from_transport(&fingerprint.transport),
948 ack_delay_exponent: fingerprint.transport.ack_delay_exponent,
949 })
950 }
951
952 pub fn new_with_ticket_keys(
953 fingerprint: &Http3Fingerprint,
954 cert_pem: &[u8],
955 key_pem: &[u8],
956 client_destination_cid: ConnectionId,
957 client_source_cid: ConnectionId,
958 server_source_cid: ConnectionId,
959 ticket_keys: &[u8; crate::transport::h3::tls::NATIVE_H3_TICKET_KEY_LEN],
960 ) -> Result<Self> {
961 let initial_keys = derive_initial_key_material(client_destination_cid.as_bytes())?;
962 let cid_inventory =
963 new_server_cid_inventory(fingerprint, &server_source_cid, &client_source_cid);
964 Ok(Self {
965 tls: NativeQuicTlsSession::server_with_connection_ids_and_ticket_keys(
966 fingerprint,
967 cert_pem,
968 key_pem,
969 &client_destination_cid,
970 &server_source_cid,
971 ticket_keys,
972 )?,
973 client_source_cid,
974 server_source_cid,
975 client_initial_keys: initial_keys.client,
976 server_initial_keys: initial_keys.server,
977 client_handshake_keys: None,
978 client_early_data_keys: None,
979 server_handshake_keys: None,
980 client_application_keys: None,
981 server_application_keys: None,
982 client_application_next_keys: None,
983 server_application_next_keys: None,
984 client_application_previous: None,
985 write_key_phase: false,
986 read_key_phase: false,
987 application_key_update: OneRttKeyUpdate::default(),
988 server_initial_crypto_offset: 0,
989 server_handshake_crypto_offset: 0,
990 server_stream_offsets: BTreeMap::new(),
991 server_control_stream_id: None,
992 client_h3_stream_buffers: BTreeMap::new(),
993 client_h3_stream_buffer_offsets: BTreeMap::new(),
994 client_h3_stream_types: BTreeMap::new(),
995 close_draining: false,
996 close_state: QuicCloseState::default(),
997 client_initial_crypto: QuicCryptoAssembler::default(),
998 client_handshake_crypto: QuicCryptoAssembler::default(),
999 client_initial_ack_tracker: QuicAckTracker::default(),
1000 client_handshake_ack_tracker: QuicAckTracker::default(),
1001 client_application_ack_tracker: QuicAckTracker::default(),
1002 server_initial_loss_detector: QuicLossDetector::default(),
1003 server_handshake_loss_detector: QuicLossDetector::default(),
1004 server_application_loss_detector: QuicLossDetector::default(),
1005 server_application_flow_control: QuicApplicationFlowControl::server(
1006 &fingerprint.transport,
1007 ),
1008 server_application_receive_flow_control: QuicReceiveFlowControl::server(
1009 &fingerprint.transport,
1010 ),
1011 server_application_sent_streams: BTreeMap::new(),
1012 server_application_recovery_lost_packets: Vec::new(),
1013 server_initial_sent_crypto: BTreeMap::new(),
1014 server_handshake_sent_crypto: BTreeMap::new(),
1015 server_path_validator: QuicPathValidator::default(),
1016 cid_inventory,
1017 next_client_initial_packet_number: 0,
1018 next_client_handshake_packet_number: 0,
1019 next_client_application_packet_number: 0,
1020 next_server_initial_packet_number: 0,
1021 next_server_handshake_packet_number: 0,
1022 next_server_application_packet_number: 0,
1023 next_server_unidirectional_stream_id: 3,
1024 recovery: recovery_state_from_transport(&fingerprint.transport),
1025 ack_delay_exponent: fingerprint.transport.ack_delay_exponent,
1026 })
1027 }
1028
1029 #[allow(clippy::too_many_arguments)]
1030 pub fn new_with_transport_parameter_connection_ids(
1031 fingerprint: &Http3Fingerprint,
1032 cert_pem: &[u8],
1033 key_pem: &[u8],
1034 client_destination_cid: ConnectionId,
1035 client_source_cid: ConnectionId,
1036 server_source_cid: ConnectionId,
1037 transport_original_destination_cid: ConnectionId,
1038 transport_initial_source_cid: ConnectionId,
1039 transport_retry_source_cid: Option<ConnectionId>,
1040 ) -> Result<Self> {
1041 let initial_keys = derive_initial_key_material(client_destination_cid.as_bytes())?;
1042 let cid_inventory =
1043 new_server_cid_inventory(fingerprint, &server_source_cid, &client_source_cid);
1044 Ok(Self {
1045 tls: NativeQuicTlsSession::server_with_transport_parameter_connection_ids(
1046 fingerprint,
1047 cert_pem,
1048 key_pem,
1049 &transport_original_destination_cid,
1050 &transport_initial_source_cid,
1051 transport_retry_source_cid.as_ref(),
1052 )?,
1053 client_source_cid,
1054 server_source_cid,
1055 client_initial_keys: initial_keys.client,
1056 server_initial_keys: initial_keys.server,
1057 client_handshake_keys: None,
1058 client_early_data_keys: None,
1059 server_handshake_keys: None,
1060 client_initial_crypto: QuicCryptoAssembler::default(),
1061 client_handshake_crypto: QuicCryptoAssembler::default(),
1062 client_initial_ack_tracker: QuicAckTracker::default(),
1063 client_handshake_ack_tracker: QuicAckTracker::default(),
1064 client_application_ack_tracker: QuicAckTracker::default(),
1065 server_initial_loss_detector: QuicLossDetector::default(),
1066 server_handshake_loss_detector: QuicLossDetector::default(),
1067 server_application_loss_detector: QuicLossDetector::default(),
1068 server_application_flow_control: QuicApplicationFlowControl::server(
1069 &fingerprint.transport,
1070 ),
1071 server_application_receive_flow_control: QuicReceiveFlowControl::server(
1072 &fingerprint.transport,
1073 ),
1074 server_application_sent_streams: BTreeMap::new(),
1075 server_application_recovery_lost_packets: Vec::new(),
1076 server_initial_sent_crypto: BTreeMap::new(),
1077 server_handshake_sent_crypto: BTreeMap::new(),
1078 server_path_validator: QuicPathValidator::default(),
1079 cid_inventory,
1080 next_client_initial_packet_number: 0,
1081 next_client_handshake_packet_number: 0,
1082 next_client_application_packet_number: 0,
1083 next_server_initial_packet_number: 0,
1084 next_server_handshake_packet_number: 0,
1085 next_server_application_packet_number: 0,
1086 next_server_unidirectional_stream_id: 3,
1087 client_application_keys: None,
1088 server_application_keys: None,
1089 client_application_next_keys: None,
1090 server_application_next_keys: None,
1091 client_application_previous: None,
1092 write_key_phase: false,
1093 read_key_phase: false,
1094 application_key_update: OneRttKeyUpdate::default(),
1095 server_initial_crypto_offset: 0,
1096 server_handshake_crypto_offset: 0,
1097 server_stream_offsets: BTreeMap::new(),
1098 server_control_stream_id: None,
1099 client_h3_stream_buffers: BTreeMap::new(),
1100 client_h3_stream_buffer_offsets: BTreeMap::new(),
1101 client_h3_stream_types: BTreeMap::new(),
1102 close_draining: false,
1103 close_state: QuicCloseState::default(),
1104 recovery: recovery_state_from_transport(&fingerprint.transport),
1105 ack_delay_exponent: fingerprint.transport.ack_delay_exponent,
1106 })
1107 }
1108
1109 pub fn is_application_ready(&self) -> bool {
1110 self.client_application_keys.is_some() && self.server_application_keys.is_some()
1111 }
1112
1113 pub fn handshake_status(&self) -> NativeH3HandshakeStatus {
1118 self.tls.handshake_status()
1119 }
1120
1121 pub fn early_data_reason(&self) -> u32 {
1124 self.tls.early_data_reason()
1125 }
1126
1127 pub fn is_close_draining(&self) -> bool {
1128 self.close_draining
1129 }
1130
1131 pub fn close_state(&self) -> &QuicCloseState {
1132 &self.close_state
1133 }
1134
1135 pub fn close_state_mut(&mut self) -> &mut QuicCloseState {
1136 &mut self.close_state
1137 }
1138
1139 pub fn server_enter_closing(&mut self, now: Instant) {
1143 self.close_state.enter_closing(now);
1144 self.close_draining = true;
1145 }
1146
1147 pub fn server_enter_draining(&mut self, now: Instant) {
1151 self.close_state.enter_draining(now);
1152 self.close_draining = true;
1153 }
1154
1155 pub fn server_close_window(&self) -> Duration {
1159 self.server_application_loss_detector.close_window()
1160 }
1161
1162 pub fn server_is_close_window_expired(&self, now: Instant) -> bool {
1163 self.close_state.is_expired(now, self.server_close_window())
1164 }
1165
1166 pub fn server_close_time_until_expiry(&self, now: Instant) -> Option<Duration> {
1167 self.close_state
1168 .time_until_expiry(now, self.server_close_window())
1169 }
1170
1171 pub fn server_should_replay_connection_close(&self, now: Instant) -> bool {
1172 self.close_state.should_replay(now)
1173 }
1174
1175 pub fn server_mark_connection_close_replayed(&mut self, now: Instant) {
1176 self.close_state.mark_replayed(now);
1177 }
1178
1179 pub fn server_observe_inbound_packet_for_close(&mut self) -> u64 {
1180 self.close_state.observe_inbound_packet()
1181 }
1182
1183 pub fn server_application_pto(&self) -> Duration {
1184 self.server_application_loss_detector.current_pto()
1185 }
1186
1187 pub fn write_key_phase(&self) -> bool {
1190 self.write_key_phase
1191 }
1192
1193 pub fn read_key_phase(&self) -> bool {
1196 self.read_key_phase
1197 }
1198
1199 pub fn key_update_in_progress(&self) -> bool {
1202 self.application_key_update.write_update_in_progress
1203 }
1204
1205 pub fn force_key_update(&mut self) -> Result<()> {
1213 if self.application_key_update.write_update_in_progress {
1214 return Err(Error::Quic(
1215 "RFC9001 § 6.5: cannot initiate a new key update while a previous one is unconfirmed"
1216 .into(),
1217 ));
1218 }
1219 let next = self.server_application_next_keys.take().ok_or_else(|| {
1220 Error::Quic(
1221 "native QUIC server cannot force a key update before TLS application secrets are installed"
1222 .into(),
1223 )
1224 })?;
1225 self.server_application_keys = Some(next);
1226 let new_current = self
1227 .server_application_keys
1228 .as_ref()
1229 .expect("server application keys just installed");
1230 self.server_application_next_keys = Some(derive_next_packet_key_material(new_current)?);
1231 self.write_key_phase = !self.write_key_phase;
1232 self.application_key_update.write_update_in_progress = true;
1233 self.application_key_update.write_update_anchor =
1234 Some(self.next_server_application_packet_number);
1235 Ok(())
1236 }
1237
1238 fn commit_receive_key_update(&mut self, now: Instant) -> Result<()> {
1239 let Some(current) = self.client_application_keys.take() else {
1240 return Err(Error::Quic(
1241 "native QUIC server cannot rotate read keys without an installed current key set"
1242 .into(),
1243 ));
1244 };
1245 let Some(next) = self.client_application_next_keys.take() else {
1246 return Err(Error::Quic(
1247 "native QUIC server cannot rotate read keys without precomputed next key set"
1248 .into(),
1249 ));
1250 };
1251 let old_phase = self.read_key_phase;
1252 self.client_application_keys = Some(next);
1253 let new_current = self
1254 .client_application_keys
1255 .as_ref()
1256 .expect("client application keys just installed");
1257 self.client_application_next_keys = Some(derive_next_packet_key_material(new_current)?);
1258 self.client_application_previous = Some(PreviousKeys {
1259 keys: current,
1260 phase: old_phase,
1261 retire_at: now + PREVIOUS_KEY_WINDOW,
1262 });
1263 self.read_key_phase = !self.read_key_phase;
1264
1265 if self.write_key_phase != self.read_key_phase
1266 && !self.application_key_update.write_update_in_progress
1267 {
1268 let next_write = self.server_application_next_keys.take().ok_or_else(|| {
1269 Error::Quic(
1270 "native QUIC server cannot mirror peer key update without precomputed next write keys"
1271 .into(),
1272 )
1273 })?;
1274 self.server_application_keys = Some(next_write);
1275 let new_current_write = self
1276 .server_application_keys
1277 .as_ref()
1278 .expect("server application keys just rotated");
1279 self.server_application_next_keys =
1280 Some(derive_next_packet_key_material(new_current_write)?);
1281 self.write_key_phase = !self.write_key_phase;
1282 self.application_key_update.write_update_in_progress = true;
1283 self.application_key_update.write_update_anchor =
1284 Some(self.next_server_application_packet_number);
1285 }
1286
1287 Ok(())
1288 }
1289
1290 pub fn server_application_lost_packets(&self) -> Vec<u64> {
1291 self.server_application_loss_detector.lost_packets()
1292 }
1293
1294 pub fn recovery(&self) -> &RecoveryState {
1295 &self.recovery
1296 }
1297
1298 pub fn loss_detection_timer(&self) -> Option<Instant> {
1299 self.recovery.loss_detection_timer()
1300 }
1301
1302 pub fn on_loss_detection_timeout(&mut self, now: Instant) -> LossDetectionOutcome {
1303 self.recovery.on_loss_detection_timeout(now)
1304 }
1305
1306 pub fn application_pto(&self) -> Duration {
1307 self.recovery.current_pto()
1308 }
1309
1310 pub fn application_pto_timeout(&self) -> Duration {
1311 let max_ack_delay = self.recovery.max_ack_delay();
1312 let backoff = 1u32 << self.recovery.pto_count().min(31);
1313 self.recovery
1314 .current_pto()
1315 .saturating_add(max_ack_delay.saturating_mul(backoff))
1316 }
1317
1318 pub fn retransmit_lost_server_application_stream_packets(
1319 &mut self,
1320 ) -> Result<Vec<ServerApplicationPacket>> {
1321 let mut lost_packets = self.server_application_loss_detector.lost_packets();
1322 lost_packets.append(&mut self.server_application_recovery_lost_packets);
1323 self.retransmit_server_application_stream_packets(lost_packets)
1324 }
1325
1326 pub fn retransmit_pto_server_application_stream_packets(
1327 &mut self,
1328 now: Instant,
1329 pto_timeout: Duration,
1330 ) -> Result<Vec<ServerApplicationPacket>> {
1331 let expired_packets = self
1332 .server_application_loss_detector
1333 .pto_expired_packets(now, pto_timeout);
1334 self.retransmit_server_application_stream_packets(expired_packets)
1335 }
1336
1337 fn retransmit_server_application_stream_packets<I>(
1338 &mut self,
1339 packet_numbers: I,
1340 ) -> Result<Vec<ServerApplicationPacket>>
1341 where
1342 I: IntoIterator<Item = u64>,
1343 {
1344 let mut packet_numbers = packet_numbers.into_iter().collect::<Vec<_>>();
1345 packet_numbers.sort_unstable();
1346 packet_numbers.dedup();
1347 let mut retransmits = Vec::new();
1348 for packet_number in packet_numbers {
1349 self.server_application_loss_detector
1350 .retire_packet(packet_number);
1351 let Some(sent) = self.server_application_sent_streams.remove(&packet_number) else {
1352 continue;
1353 };
1354 retransmits.push(self.build_server_application_stream_packet_at_offset(
1355 sent.stream_id,
1356 sent.stream_offset,
1357 sent.data,
1358 sent.fin,
1359 )?);
1360 }
1361 Ok(retransmits)
1362 }
1363
1364 pub fn process_client_initial(&mut self, datagram: &[u8]) -> Result<ServerHandshakeFlight> {
1365 self.process_client_initial_with_ecn(datagram, None)
1366 }
1367
1368 pub fn process_client_initial_with_ecn(
1369 &mut self,
1370 datagram: &[u8],
1371 ecn_mark: Option<QuicEcnMark>,
1372 ) -> Result<ServerHandshakeFlight> {
1373 let mut server_initial_crypto = Bytes::new();
1374 let mut server_handshake_crypto = Bytes::new();
1375
1376 for packet in split_long_header_datagram(datagram)? {
1377 if packet.packet_type != LongHeaderType::Initial {
1378 continue;
1379 }
1380
1381 let opened = open_long_header_packet(
1382 &self.client_initial_keys,
1383 &packet.packet,
1384 packet.packet_number_offset,
1385 self.next_client_initial_packet_number,
1386 )?;
1387 observe_packet_with_ecn(
1388 &mut self.client_initial_ack_tracker,
1389 opened.packet_number,
1390 ecn_mark,
1391 Instant::now(),
1392 );
1393 self.next_client_initial_packet_number = opened.packet_number + 1;
1394
1395 for frame in decode_frames(&opened.payload)? {
1396 for packet_number in self.server_initial_loss_detector.on_ack_frame(&frame)? {
1397 self.server_initial_sent_crypto.remove(&packet_number);
1398 }
1399 let outcome = self.recovery.on_ack_received(
1400 PacketNumberSpace::Initial,
1401 &frame,
1402 self.ack_delay_exponent,
1403 Instant::now(),
1404 )?;
1405 for (packet_number, _) in outcome.newly_acked {
1406 self.server_initial_sent_crypto.remove(&packet_number);
1407 }
1408 if let QuicFrame::Crypto { offset, data } = frame {
1409 self.client_initial_crypto.insert(offset, data)?;
1410 }
1411 }
1412
1413 let crypto_data = self.client_initial_crypto.take_contiguous();
1414 if crypto_data.is_empty() {
1415 continue;
1416 }
1417
1418 self.tls
1419 .provide_crypto(QuicEncryptionLevel::Initial, &crypto_data)?;
1420 self.install_tls_secrets()?;
1421 server_initial_crypto = self.tls.take_crypto(QuicEncryptionLevel::Initial);
1422 server_handshake_crypto = self.tls.take_crypto(QuicEncryptionLevel::Handshake);
1423 }
1424
1425 let secrets = self.tls.secrets();
1426 let mut packets = Vec::new();
1427 let mut datagram_out = Vec::new();
1428
1429 if !server_initial_crypto.is_empty() {
1430 let packet = self.build_server_initial_packet(server_initial_crypto)?;
1431 datagram_out.extend_from_slice(&packet.packet);
1432 packets.push(packet);
1433 }
1434 if !server_handshake_crypto.is_empty() {
1435 let packet = self.build_server_handshake_packet(server_handshake_crypto)?;
1436 datagram_out.extend_from_slice(&packet.packet);
1437 packets.push(packet);
1438 }
1439
1440 Ok(ServerHandshakeFlight {
1441 datagram: Bytes::from(datagram_out),
1442 packets,
1443 secrets,
1444 })
1445 }
1446
1447 pub fn build_server_initial_ack_packet(&mut self) -> Result<Option<ClientAckPacket>> {
1448 let packet = build_ack_packet(
1449 LongHeaderType::Initial,
1450 &self.server_initial_keys,
1451 &self.client_source_cid,
1452 &self.server_source_cid,
1453 &mut self.client_initial_ack_tracker,
1454 self.next_server_initial_packet_number,
1455 )?;
1456 if packet.is_some() {
1457 self.next_server_initial_packet_number += 1;
1458 }
1459 Ok(packet)
1460 }
1461
1462 pub fn build_server_handshake_ack_packet(&mut self) -> Result<Option<ClientAckPacket>> {
1463 if self.client_handshake_ack_tracker.is_empty() {
1464 return Ok(None);
1465 }
1466 let Some(server_handshake_keys) = &self.server_handshake_keys else {
1467 return Err(Error::Quic(
1468 "native server Handshake ACK encryption is waiting for TLS Handshake keys".into(),
1469 ));
1470 };
1471 let packet = build_ack_packet(
1472 LongHeaderType::Handshake,
1473 server_handshake_keys,
1474 &self.client_source_cid,
1475 &self.server_source_cid,
1476 &mut self.client_handshake_ack_tracker,
1477 self.next_server_handshake_packet_number,
1478 )?;
1479 if packet.is_some() {
1480 self.next_server_handshake_packet_number += 1;
1481 }
1482 Ok(packet)
1483 }
1484
1485 pub fn process_client_handshake(
1486 &mut self,
1487 datagram: &[u8],
1488 ) -> Result<ProcessedClientHandshake> {
1489 self.process_client_handshake_with_ecn(datagram, None)
1490 }
1491
1492 pub fn process_client_handshake_with_ecn(
1493 &mut self,
1494 datagram: &[u8],
1495 ecn_mark: Option<QuicEcnMark>,
1496 ) -> Result<ProcessedClientHandshake> {
1497 let Some(client_handshake_keys) = &self.client_handshake_keys else {
1498 return Err(Error::Quic(
1499 "native server Handshake packet decryption is waiting for TLS Handshake keys"
1500 .into(),
1501 ));
1502 };
1503
1504 let mut packet_number = self.next_client_handshake_packet_number;
1505 for packet in split_long_header_datagram(datagram)? {
1506 if packet.packet_type != LongHeaderType::Handshake {
1507 continue;
1508 }
1509
1510 let opened = open_long_header_packet(
1511 client_handshake_keys,
1512 &packet.packet,
1513 packet.packet_number_offset,
1514 self.next_client_handshake_packet_number,
1515 )?;
1516 packet_number = opened.packet_number;
1517 observe_packet_with_ecn(
1518 &mut self.client_handshake_ack_tracker,
1519 opened.packet_number,
1520 ecn_mark,
1521 Instant::now(),
1522 );
1523 self.next_client_handshake_packet_number = opened.packet_number + 1;
1524
1525 for frame in decode_frames(&opened.payload)? {
1526 for packet_number in self.server_handshake_loss_detector.on_ack_frame(&frame)? {
1527 self.server_handshake_sent_crypto.remove(&packet_number);
1528 }
1529 let outcome = self.recovery.on_ack_received(
1530 PacketNumberSpace::Handshake,
1531 &frame,
1532 self.ack_delay_exponent,
1533 Instant::now(),
1534 )?;
1535 for (packet_number, _) in outcome.newly_acked {
1536 self.server_handshake_sent_crypto.remove(&packet_number);
1537 }
1538 if let QuicFrame::Crypto { offset, data } = frame {
1539 self.client_handshake_crypto.insert(offset, data)?;
1540 }
1541 }
1542 }
1543
1544 let crypto_data = self.client_handshake_crypto.take_contiguous();
1545 if !crypto_data.is_empty() {
1546 self.tls
1547 .provide_crypto(QuicEncryptionLevel::Handshake, &crypto_data)?;
1548 self.install_tls_secrets()?;
1549 }
1550
1551 Ok(ProcessedClientHandshake {
1552 packet_number,
1553 crypto_data,
1554 secrets: self.tls.secrets(),
1555 })
1556 }
1557
1558 pub fn open_client_application_packet(&mut self, packet: &[u8]) -> Result<Vec<QuicFrame>> {
1559 self.open_client_application_packet_with_ecn(packet, None)
1560 }
1561
1562 pub fn open_client_application_packet_with_ecn(
1563 &mut self,
1564 packet: &[u8],
1565 ecn_mark: Option<QuicEcnMark>,
1566 ) -> Result<Vec<QuicFrame>> {
1567 if self.close_state.is_draining() {
1572 return Ok(Vec::new());
1573 }
1574 let Some(client_application_keys) = self.client_application_keys.as_ref() else {
1575 return Err(Error::Quic(
1576 "native server application packet decryption is waiting for TLS application keys"
1577 .into(),
1578 ));
1579 };
1580 let now = Instant::now();
1581 let destination_cid_len =
1582 match_local_connection_id(packet, self.cid_inventory.unretired_locals())
1583 .map(|(_, len)| len)
1584 .unwrap_or_else(|| self.server_source_cid.as_bytes().len());
1585 let opened = try_open_one_rtt_packet(
1586 client_application_keys,
1587 self.client_application_next_keys.as_ref(),
1588 self.client_application_previous.as_ref(),
1589 self.read_key_phase,
1590 now,
1591 packet,
1592 destination_cid_len,
1593 self.next_client_application_packet_number,
1594 )?;
1595 if matches!(opened.outcome, OneRttOpenOutcome::Next) {
1596 self.commit_receive_key_update(now)?;
1597 }
1598 let opened = opened.opened;
1599 self.next_client_application_packet_number = opened.packet_number + 1;
1600 let frames = decode_frames(&opened.payload)?;
1601 self.apply_opened_client_application_frames(opened.packet_number, frames, now, ecn_mark)
1602 }
1603
1604 pub fn open_client_zero_rtt_h3_event_packet(
1605 &mut self,
1606 datagram: &[u8],
1607 ) -> Result<Vec<ClientH3Event>> {
1608 self.open_client_zero_rtt_h3_event_packet_with_ecn(datagram, None)
1609 }
1610
1611 pub fn open_client_zero_rtt_h3_event_packet_with_ecn(
1612 &mut self,
1613 datagram: &[u8],
1614 ecn_mark: Option<QuicEcnMark>,
1615 ) -> Result<Vec<ClientH3Event>> {
1616 if self.close_state.is_draining() || !self.handshake_status().early_data_accepted() {
1617 return Ok(Vec::new());
1618 }
1619 let Some(client_early_data_keys) = self.client_early_data_keys.clone() else {
1620 return Ok(Vec::new());
1621 };
1622
1623 let mut events = Vec::new();
1624 for packet in split_long_header_datagram(datagram)? {
1625 if packet.packet_type != LongHeaderType::ZeroRtt {
1626 continue;
1627 }
1628 let now = Instant::now();
1629 let opened = open_long_header_packet(
1630 &client_early_data_keys,
1631 &packet.packet,
1632 packet.packet_number_offset,
1633 self.next_client_application_packet_number,
1634 )?;
1635 self.next_client_application_packet_number = opened.packet_number + 1;
1636 let frames = decode_frames(&opened.payload)?;
1637 let frames = self.apply_opened_client_application_frames(
1638 opened.packet_number,
1639 frames,
1640 now,
1641 ecn_mark,
1642 )?;
1643 events.extend(self.client_h3_events_from_frames(frames, None)?);
1644 }
1645 Ok(events)
1646 }
1647
1648 fn apply_opened_client_application_frames(
1649 &mut self,
1650 packet_number: u64,
1651 frames: Vec<QuicFrame>,
1652 now: Instant,
1653 ecn_mark: Option<QuicEcnMark>,
1654 ) -> Result<Vec<QuicFrame>> {
1655 for frame in &frames {
1656 if let QuicFrame::Stream {
1657 stream_id,
1658 offset,
1659 data,
1660 ..
1661 } = frame
1662 {
1663 self.server_application_receive_flow_control
1664 .observe_stream_frame(*stream_id, *offset, data.len())?;
1665 }
1666 for packet_number in self.server_application_loss_detector.on_ack_frame(frame)? {
1667 self.server_application_sent_streams.remove(&packet_number);
1668 self.application_key_update.note_packet_acked(packet_number);
1669 }
1670 if matches!(frame, QuicFrame::Ack { .. } | QuicFrame::AckEcn { .. }) {
1671 let outcome = self.recovery.on_ack_received(
1672 PacketNumberSpace::Application,
1673 frame,
1674 self.ack_delay_exponent,
1675 now,
1676 )?;
1677 for (packet_number, _) in outcome.newly_acked {
1678 self.server_application_sent_streams.remove(&packet_number);
1679 self.application_key_update.note_packet_acked(packet_number);
1680 }
1681 self.server_application_recovery_lost_packets.extend(
1682 outcome
1683 .lost
1684 .into_iter()
1685 .map(|(packet_number, _)| packet_number),
1686 );
1687 }
1688 match frame {
1689 QuicFrame::MaxData(max_data) => {
1690 self.server_application_flow_control
1691 .apply_max_data(*max_data);
1692 }
1693 QuicFrame::MaxStreamData {
1694 stream_id,
1695 max_stream_data,
1696 } => self
1697 .server_application_flow_control
1698 .apply_max_stream_data(*stream_id, *max_stream_data),
1699 QuicFrame::MaxStreams {
1700 bidirectional,
1701 max_streams,
1702 } => self
1703 .server_application_flow_control
1704 .apply_max_streams(*bidirectional, *max_streams),
1705 QuicFrame::NewConnectionId {
1706 sequence_number,
1707 retire_prior_to,
1708 connection_id,
1709 stateless_reset_token,
1710 } => {
1711 let _ = self.cid_inventory.observe_peer_new_connection_id(
1712 *sequence_number,
1713 *retire_prior_to,
1714 connection_id.clone(),
1715 *stateless_reset_token,
1716 );
1717 }
1718 QuicFrame::RetireConnectionId { sequence_number } => {
1719 let _ = self
1720 .cid_inventory
1721 .observe_peer_retire_connection_id(*sequence_number);
1722 }
1723 _ => {}
1724 }
1725 }
1726 if frames.iter().any(is_ack_eliciting_quic_frame) {
1727 observe_packet_with_ecn(
1728 &mut self.client_application_ack_tracker,
1729 packet_number,
1730 ecn_mark,
1731 now,
1732 );
1733 }
1734 Ok(frames.into_iter().filter(is_not_padding_frame).collect())
1735 }
1736
1737 pub fn build_server_application_ack_packet(
1738 &mut self,
1739 ) -> Result<Option<ServerApplicationAckPacket>> {
1740 self.build_server_application_ack_packet_with_delay(Instant::now(), 0)
1741 }
1742
1743 pub fn build_server_application_ack_packet_with_delay(
1744 &mut self,
1745 now: Instant,
1746 ack_delay_exponent: u64,
1747 ) -> Result<Option<ServerApplicationAckPacket>> {
1748 if self.client_application_ack_tracker.is_empty() {
1749 return Ok(None);
1750 }
1751 let Some(server_application_keys) = &self.server_application_keys else {
1752 return Err(Error::Quic(
1753 "native server application ACK encryption is waiting for TLS application keys"
1754 .into(),
1755 ));
1756 };
1757
1758 let packet_number = self.next_server_application_packet_number;
1759 let packet_number_len = 2;
1760 let frame = encode_frame(
1761 &self
1762 .client_application_ack_tracker
1763 .to_ack_frame_with_delay(now, ack_delay_exponent)?,
1764 );
1765 let packet = protect_short_header_packet(
1766 server_application_keys,
1767 &self.server_outbound_peer_cid(),
1768 packet_number,
1769 packet_number_len,
1770 self.write_key_phase,
1771 &frame,
1772 )?;
1773 self.client_application_ack_tracker.mark_ack_sent();
1774 self.next_server_application_packet_number += 1;
1775
1776 Ok(Some(ServerApplicationAckPacket {
1777 packet,
1778 packet_number,
1779 packet_number_offset: 1 + self.server_outbound_peer_cid().as_bytes().len(),
1780 }))
1781 }
1782
1783 pub fn build_server_application_ack_packet_after(
1784 &mut self,
1785 threshold: usize,
1786 ) -> Result<Option<ServerApplicationAckPacket>> {
1787 if !self
1788 .client_application_ack_tracker
1789 .should_ack_after(threshold)
1790 {
1791 return Ok(None);
1792 }
1793 self.build_server_application_ack_packet()
1794 }
1795
1796 pub fn build_server_application_ack_packet_after_or_delay(
1797 &mut self,
1798 threshold: usize,
1799 max_ack_delay: Duration,
1800 now: Instant,
1801 ack_delay_exponent: u64,
1802 ) -> Result<Option<ServerApplicationAckPacket>> {
1803 if !self
1804 .client_application_ack_tracker
1805 .should_ack_after_or_delay(threshold, max_ack_delay, now)
1806 {
1807 return Ok(None);
1808 }
1809 self.build_server_application_ack_packet_with_delay(now, ack_delay_exponent)
1810 }
1811
1812 pub fn server_application_ack_deadline(&self, max_ack_delay: Duration) -> Option<Instant> {
1813 self.client_application_ack_tracker
1814 .pending_ack_deadline(max_ack_delay)
1815 }
1816
1817 pub fn open_client_h3_stream_packet(
1818 &mut self,
1819 packet: &[u8],
1820 ) -> Result<Vec<ClientH3StreamEvent>> {
1821 Ok(self
1822 .open_client_h3_event_packet(packet)?
1823 .into_iter()
1824 .filter_map(|event| match event {
1825 ClientH3Event::Stream(event) => Some(event),
1826 ClientH3Event::ResetStream { .. }
1827 | ClientH3Event::StopSending { .. }
1828 | ClientH3Event::ConnectionClose { .. }
1829 | ClientH3Event::PathChallenge(_) => None,
1830 })
1831 .collect())
1832 }
1833
1834 pub fn open_client_h3_event_packet(&mut self, packet: &[u8]) -> Result<Vec<ClientH3Event>> {
1835 if self.close_state.is_draining() {
1840 return Ok(Vec::new());
1841 }
1842 let frames = self.open_client_application_packet(packet)?;
1843 self.client_h3_events_from_frames(frames, None)
1844 }
1845
1846 pub fn open_client_h3_event_packet_from(
1847 &mut self,
1848 packet: &[u8],
1849 remote_address: SocketAddr,
1850 ) -> Result<Vec<ClientH3Event>> {
1851 if self.close_state.is_draining() {
1852 return Ok(Vec::new());
1853 }
1854 let frames = self.open_client_application_packet(packet)?;
1855 self.client_h3_events_from_frames(frames, Some(remote_address))
1856 }
1857
1858 fn client_h3_events_from_frames(
1859 &mut self,
1860 frames: Vec<QuicFrame>,
1861 remote_address: Option<SocketAddr>,
1862 ) -> Result<Vec<ClientH3Event>> {
1863 let mut events = Vec::new();
1864 for frame in frames {
1865 match frame {
1866 QuicFrame::Stream {
1867 stream_id,
1868 offset,
1869 fin,
1870 data,
1871 } => {
1872 if let Some(event) = apply_h3_stream_frame(
1873 &mut self.client_h3_stream_buffers,
1874 &mut self.client_h3_stream_buffer_offsets,
1875 &mut self.client_h3_stream_types,
1876 stream_id,
1877 offset,
1878 fin,
1879 data,
1880 )? {
1881 events.push(ClientH3Event::Stream(ClientH3StreamEvent {
1882 stream_id: event.stream_id,
1883 stream_type: event.stream_type,
1884 fin: event.fin,
1885 frames: event.frames,
1886 }));
1887 }
1888 }
1889 QuicFrame::ResetStream {
1890 stream_id,
1891 error_code,
1892 final_size,
1893 } => events.push(ClientH3Event::ResetStream {
1894 stream_id,
1895 error_code,
1896 final_size,
1897 }),
1898 QuicFrame::StopSending {
1899 stream_id,
1900 error_code,
1901 } => events.push(ClientH3Event::StopSending {
1902 stream_id,
1903 error_code,
1904 }),
1905 QuicFrame::ConnectionClose {
1906 error_code,
1907 frame_type,
1908 reason,
1909 } => {
1910 self.close_draining = true;
1915 self.close_state.enter_draining(Instant::now());
1916 events.push(ClientH3Event::ConnectionClose {
1917 error_code,
1918 frame_type,
1919 reason,
1920 });
1921 }
1922 QuicFrame::PathChallenge(data) => events.push(ClientH3Event::PathChallenge(data)),
1923 QuicFrame::PathResponse(data) => {
1924 if let Some(remote_address) = remote_address {
1925 self.server_path_validator
1926 .on_path_response_from(remote_address, data);
1927 } else {
1928 self.server_path_validator.on_path_response(data);
1929 }
1930 }
1931 QuicFrame::Padding
1932 | QuicFrame::Ping
1933 | QuicFrame::Ack { .. }
1934 | QuicFrame::AckEcn { .. }
1935 | QuicFrame::Crypto { .. }
1936 | QuicFrame::MaxData(_)
1937 | QuicFrame::MaxStreamData { .. }
1938 | QuicFrame::MaxStreams { .. }
1939 | QuicFrame::DataBlocked { .. }
1940 | QuicFrame::StreamDataBlocked { .. }
1941 | QuicFrame::StreamsBlocked { .. }
1942 | QuicFrame::NewConnectionId { .. }
1943 | QuicFrame::RetireConnectionId { .. }
1944 | QuicFrame::HandshakeDone => {}
1945 }
1946 }
1947 Ok(events)
1948 }
1949
1950 pub fn build_server_h3_settings_packet(
1951 &mut self,
1952 fingerprint: &Http3Fingerprint,
1953 ) -> Result<ServerApplicationPacket> {
1954 let stream_id = self.server_control_stream_id.unwrap_or_else(|| {
1955 let stream_id = self.next_server_unidirectional_stream_id;
1956 self.next_server_unidirectional_stream_id += 4;
1957 self.server_control_stream_id = Some(stream_id);
1958 stream_id
1959 });
1960 let settings = native::encode_frame(&native::H3Frame::Settings(
1961 native::encode_fingerprint_settings_payload(fingerprint),
1962 ));
1963 let payload = if self.server_stream_offsets.contains_key(&stream_id) {
1964 settings
1965 } else {
1966 native::encode_unidirectional_stream(&native::H3UnidirectionalStream {
1967 stream_type: native::H3StreamType::Control,
1968 payload: settings,
1969 })
1970 };
1971 self.build_server_application_stream_packet(stream_id, payload, false)
1972 }
1973
1974 pub fn build_server_h3_goaway_packet(&mut self, id: u64) -> Result<ServerApplicationPacket> {
1975 let Some(stream_id) = self.server_control_stream_id else {
1976 return Err(Error::HttpProtocol(
1977 "native server H3 GOAWAY requires an open control stream".into(),
1978 ));
1979 };
1980 self.build_server_application_stream_packet(
1981 stream_id,
1982 native::encode_frame(&native::H3Frame::GoAway { id }),
1983 false,
1984 )
1985 }
1986
1987 pub fn build_server_reset_stream_packet(
1988 &mut self,
1989 stream_id: u64,
1990 error_code: u64,
1991 ) -> Result<ServerApplicationControlPacket> {
1992 let final_size = *self.server_stream_offsets.get(&stream_id).unwrap_or(&0);
1993 self.build_server_application_control_packet(QuicFrame::ResetStream {
1994 stream_id,
1995 error_code,
1996 final_size,
1997 })
1998 }
1999
2000 pub fn build_server_connection_close_packet(
2001 &mut self,
2002 error_code: u64,
2003 reason: Bytes,
2004 ) -> Result<ServerApplicationControlPacket> {
2005 let packet = self.build_server_application_control_packet(QuicFrame::ConnectionClose {
2006 error_code,
2007 frame_type: None,
2008 reason,
2009 })?;
2010 self.server_enter_closing(Instant::now());
2016 Ok(packet)
2017 }
2018
2019 pub fn build_server_max_data_packet(
2020 &mut self,
2021 max_data: u64,
2022 ) -> Result<ServerApplicationControlPacket> {
2023 self.build_server_application_control_packet(QuicFrame::MaxData(max_data))
2024 }
2025
2026 pub fn build_server_max_stream_data_packet(
2027 &mut self,
2028 stream_id: u64,
2029 max_stream_data: u64,
2030 ) -> Result<ServerApplicationControlPacket> {
2031 self.build_server_application_control_packet(QuicFrame::MaxStreamData {
2032 stream_id,
2033 max_stream_data,
2034 })
2035 }
2036
2037 pub fn build_server_max_streams_packet(
2038 &mut self,
2039 bidirectional: bool,
2040 max_streams: u64,
2041 ) -> Result<ServerApplicationControlPacket> {
2042 self.build_server_application_control_packet(QuicFrame::MaxStreams {
2043 bidirectional,
2044 max_streams,
2045 })
2046 }
2047
2048 pub fn build_server_flow_control_blocked_packet(
2049 &mut self,
2050 ) -> Result<Option<ServerApplicationControlPacket>> {
2051 self.server_application_flow_control
2052 .take_blocked_frame()
2053 .map(|frame| self.build_server_application_control_packet(frame))
2054 .transpose()
2055 }
2056
2057 pub fn build_server_receive_flow_control_update_packets(
2058 &mut self,
2059 ) -> Result<Vec<ServerApplicationControlPacket>> {
2060 self.server_application_receive_flow_control
2061 .take_update_frames()
2062 .into_iter()
2063 .map(|frame| self.build_server_application_control_packet(frame))
2064 .collect()
2065 }
2066
2067 pub fn record_server_stream_consumed(&mut self, stream_id: u64, len: u64) -> Result<()> {
2073 self.server_application_receive_flow_control
2074 .record_stream_consumed(stream_id, len)
2075 }
2076
2077 pub fn release_server_stream(&mut self, stream_id: u64) {
2078 self.server_application_receive_flow_control
2079 .release_stream(stream_id);
2080 }
2081
2082 pub fn build_server_handshake_done_packet(&mut self) -> Result<ServerApplicationControlPacket> {
2083 self.build_server_application_control_packet(QuicFrame::HandshakeDone)
2084 }
2085
2086 pub fn build_server_new_connection_id_packet(
2087 &mut self,
2088 sequence_number: u64,
2089 retire_prior_to: u64,
2090 connection_id: ConnectionId,
2091 stateless_reset_token: [u8; 16],
2092 ) -> Result<ServerApplicationControlPacket> {
2093 if connection_id.as_bytes().is_empty() {
2094 return Err(Error::Quic(
2095 "native QUIC NEW_CONNECTION_ID cannot carry an empty connection id".into(),
2096 ));
2097 }
2098 if retire_prior_to > sequence_number {
2099 return Err(Error::Quic(
2100 "native QUIC NEW_CONNECTION_ID retire_prior_to exceeds sequence_number".into(),
2101 ));
2102 }
2103 self.cid_inventory.register_local_issued(
2104 sequence_number,
2105 connection_id.clone(),
2106 stateless_reset_token,
2107 )?;
2108 self.build_server_application_control_packet(QuicFrame::NewConnectionId {
2109 sequence_number,
2110 retire_prior_to,
2111 connection_id: Bytes::copy_from_slice(connection_id.as_bytes()),
2112 stateless_reset_token,
2113 })
2114 }
2115
2116 pub fn build_server_path_response_packet(
2117 &mut self,
2118 data: [u8; 8],
2119 ) -> Result<ServerApplicationControlPacket> {
2120 self.build_server_application_control_packet_with_min_datagram(
2121 QuicFrame::PathResponse(data),
2122 MIN_PATH_VALIDATION_DATAGRAM,
2123 )
2124 }
2125
2126 pub fn build_server_path_challenge_packet_for_address(
2127 &mut self,
2128 remote_address: SocketAddr,
2129 data: [u8; 8],
2130 ) -> Result<ServerApplicationControlPacket> {
2131 let frame = self
2132 .server_path_validator
2133 .path_challenge_for_peer_address(remote_address, data)?;
2134 self.build_server_application_control_packet_with_min_datagram(
2135 frame,
2136 MIN_PATH_VALIDATION_DATAGRAM,
2137 )
2138 }
2139
2140 pub fn build_server_retire_connection_id_packet(
2141 &mut self,
2142 sequence_number: u64,
2143 ) -> Result<ServerApplicationControlPacket> {
2144 self.build_server_application_control_packet(QuicFrame::RetireConnectionId {
2145 sequence_number,
2146 })
2147 }
2148
2149 pub fn build_server_connection_migration_close_packet(
2150 &mut self,
2151 ) -> Result<ServerApplicationControlPacket> {
2152 let packet = self.build_server_application_control_packet(QuicFrame::ConnectionClose {
2153 error_code: QUIC_CONNECTION_MIGRATION_ERROR,
2154 frame_type: None,
2155 reason: Bytes::new(),
2156 })?;
2157 self.server_enter_closing(Instant::now());
2158 Ok(packet)
2159 }
2160
2161 pub fn server_pop_pending_peer_retires(&mut self) -> Vec<u64> {
2162 self.cid_inventory.drain_pending_peer_retires()
2163 }
2164
2165 pub fn server_promote_peer_cid(&mut self, sequence_number: u64) -> Result<()> {
2166 self.cid_inventory.promote_peer_to_active(sequence_number)
2167 }
2168
2169 pub fn server_outbound_peer_cid(&self) -> ConnectionId {
2170 self.cid_inventory
2171 .active_peer()
2172 .and_then(|entry| ConnectionId::from_bytes(entry.connection_id.clone()).ok())
2173 .unwrap_or_else(|| self.client_source_cid.clone())
2174 }
2175
2176 pub fn server_local_cids_for_routing(&self) -> Vec<ConnectionId> {
2177 self.cid_inventory
2178 .unretired_locals()
2179 .map(|entry| entry.connection_id.clone())
2180 .collect()
2181 }
2182
2183 pub fn is_server_path_address_validated(&self, remote_address: &SocketAddr) -> bool {
2184 self.server_path_validator
2185 .is_address_validated(remote_address)
2186 }
2187
2188 pub fn build_server_h3_raw_stream_packet(
2189 &mut self,
2190 stream_id: u64,
2191 data: Bytes,
2192 fin: bool,
2193 ) -> Result<ServerApplicationPacket> {
2194 self.build_server_application_stream_packet(stream_id, data, fin)
2195 }
2196
2197 pub fn build_server_h3_response_packet(
2198 &mut self,
2199 stream_id: u64,
2200 headers: Vec<native::H3Header>,
2201 body: Option<Bytes>,
2202 fin: bool,
2203 ) -> Result<ServerApplicationPacket> {
2204 let mut payload = native::encode_frame(&native::H3Frame::Headers(
2205 native::encode_header_block(&headers),
2206 ))
2207 .to_vec();
2208 if let Some(body) = body {
2209 payload.extend_from_slice(&native::encode_frame(&native::H3Frame::Data(body)));
2210 }
2211 self.build_server_application_stream_packet(stream_id, Bytes::from(payload), fin)
2212 }
2213
2214 pub fn build_server_h3_response_data_packet(
2215 &mut self,
2216 stream_id: u64,
2217 data: Bytes,
2218 fin: bool,
2219 ) -> Result<ServerApplicationPacket> {
2220 self.build_server_application_stream_packet(
2221 stream_id,
2222 native::encode_frame(&native::H3Frame::Data(data)),
2223 fin,
2224 )
2225 }
2226
2227 fn install_tls_secrets(&mut self) -> Result<()> {
2228 for secret in self.tls.secrets() {
2229 match (secret.direction, secret.level) {
2230 (QuicSecretDirection::Read, QuicEncryptionLevel::Handshake) => {
2231 self.client_handshake_keys = Some(secret.packet_key_material()?);
2232 self.recovery.set_has_handshake_keys(true);
2233 }
2234 (QuicSecretDirection::Read, QuicEncryptionLevel::EarlyData) => {
2235 self.client_early_data_keys = Some(secret.packet_key_material()?);
2236 }
2237 (QuicSecretDirection::Write, QuicEncryptionLevel::Handshake) => {
2238 self.server_handshake_keys = Some(secret.packet_key_material()?);
2239 self.recovery.set_has_handshake_keys(true);
2240 }
2241 (QuicSecretDirection::Read, QuicEncryptionLevel::Application) => {
2242 let keys = secret.packet_key_material()?;
2243 self.client_application_next_keys =
2244 Some(derive_next_packet_key_material(&keys)?);
2245 self.client_application_keys = Some(keys);
2246 }
2247 (QuicSecretDirection::Write, QuicEncryptionLevel::Application) => {
2248 let keys = secret.packet_key_material()?;
2249 self.server_application_next_keys =
2250 Some(derive_next_packet_key_material(&keys)?);
2251 self.server_application_keys = Some(keys);
2252 }
2253 _ => {}
2254 }
2255 }
2256 if self.is_application_ready() && !self.recovery.handshake_complete() {
2257 self.recovery.discard_space(PacketNumberSpace::Initial);
2258 self.recovery.discard_space(PacketNumberSpace::Handshake);
2259 self.recovery.mark_handshake_complete();
2260 }
2261 Ok(())
2262 }
2263
2264 fn build_server_initial_packet(&mut self, crypto_data: Bytes) -> Result<ServerHandshakePacket> {
2265 let crypto_offset = self.server_initial_crypto_offset;
2266 let packet = self.build_server_initial_packet_at_offset_with_sent_at(
2267 crypto_offset,
2268 crypto_data,
2269 Instant::now(),
2270 )?;
2271 self.server_initial_crypto_offset += packet.crypto_data.len() as u64;
2272 Ok(packet)
2273 }
2274
2275 fn build_server_initial_packet_at_offset_with_sent_at(
2276 &mut self,
2277 crypto_offset: u64,
2278 crypto_data: Bytes,
2279 sent_at: Instant,
2280 ) -> Result<ServerHandshakePacket> {
2281 let packet_number = self.next_server_initial_packet_number;
2282 self.next_server_initial_packet_number += 1;
2283 let packet = build_server_crypto_packet(
2284 LongHeaderType::Initial,
2285 &self.server_initial_keys,
2286 &self.client_source_cid,
2287 &self.server_source_cid,
2288 packet_number,
2289 crypto_offset,
2290 crypto_data.clone(),
2291 )?;
2292 self.server_initial_loss_detector
2293 .on_packet_sent_at(packet_number, sent_at);
2294 self.recovery.on_packet_sent(
2295 PacketNumberSpace::Initial,
2296 packet_number,
2297 SentPacketInfo::new(sent_at, packet.packet.len(), true, true),
2298 );
2299 self.server_initial_sent_crypto.insert(
2300 packet_number,
2301 SentCryptoPacket {
2302 packet_type: LongHeaderType::Initial,
2303 crypto_offset,
2304 crypto_data: crypto_data.clone(),
2305 },
2306 );
2307 Ok(packet)
2308 }
2309
2310 fn build_server_handshake_packet(
2311 &mut self,
2312 crypto_data: Bytes,
2313 ) -> Result<ServerHandshakePacket> {
2314 let crypto_offset = self.server_handshake_crypto_offset;
2315 let packet = self.build_server_handshake_packet_at_offset_with_sent_at(
2316 crypto_offset,
2317 crypto_data,
2318 Instant::now(),
2319 )?;
2320 self.server_handshake_crypto_offset += packet.crypto_data.len() as u64;
2321 Ok(packet)
2322 }
2323
2324 pub fn retransmit_pto_server_crypto_packets(
2325 &mut self,
2326 now: Instant,
2327 pto: Duration,
2328 ) -> Result<Vec<ServerHandshakePacket>> {
2329 let mut retransmits = Vec::new();
2330 for packet_number in self
2331 .server_initial_loss_detector
2332 .pto_expired_packets(now, pto)
2333 {
2334 self.server_initial_loss_detector
2335 .retire_packet(packet_number);
2336 let Some(sent) = self.server_initial_sent_crypto.remove(&packet_number) else {
2337 continue;
2338 };
2339 if sent.packet_type != LongHeaderType::Initial {
2340 continue;
2341 }
2342 retransmits.push(self.build_server_initial_packet_at_offset_with_sent_at(
2343 sent.crypto_offset,
2344 sent.crypto_data,
2345 now,
2346 )?);
2347 }
2348 for packet_number in self
2349 .server_handshake_loss_detector
2350 .pto_expired_packets(now, pto)
2351 {
2352 self.server_handshake_loss_detector
2353 .retire_packet(packet_number);
2354 let Some(sent) = self.server_handshake_sent_crypto.remove(&packet_number) else {
2355 continue;
2356 };
2357 if sent.packet_type != LongHeaderType::Handshake {
2358 continue;
2359 }
2360 retransmits.push(self.build_server_handshake_packet_at_offset_with_sent_at(
2361 sent.crypto_offset,
2362 sent.crypto_data,
2363 now,
2364 )?);
2365 }
2366 Ok(retransmits)
2367 }
2368
2369 fn build_server_handshake_packet_at_offset_with_sent_at(
2370 &mut self,
2371 crypto_offset: u64,
2372 crypto_data: Bytes,
2373 sent_at: Instant,
2374 ) -> Result<ServerHandshakePacket> {
2375 let Some(server_handshake_keys) = &self.server_handshake_keys else {
2376 return Err(Error::Quic(
2377 "native server Handshake packet encryption is waiting for TLS Handshake keys"
2378 .into(),
2379 ));
2380 };
2381 let packet_number = self.next_server_handshake_packet_number;
2382 self.next_server_handshake_packet_number += 1;
2383 let packet = build_server_crypto_packet(
2384 LongHeaderType::Handshake,
2385 server_handshake_keys,
2386 &self.client_source_cid,
2387 &self.server_source_cid,
2388 packet_number,
2389 crypto_offset,
2390 crypto_data.clone(),
2391 )?;
2392 self.server_handshake_loss_detector
2393 .on_packet_sent_at(packet_number, sent_at);
2394 self.recovery.on_packet_sent(
2395 PacketNumberSpace::Handshake,
2396 packet_number,
2397 SentPacketInfo::new(sent_at, packet.packet.len(), true, true),
2398 );
2399 self.server_handshake_sent_crypto.insert(
2400 packet_number,
2401 SentCryptoPacket {
2402 packet_type: LongHeaderType::Handshake,
2403 crypto_offset,
2404 crypto_data: crypto_data.clone(),
2405 },
2406 );
2407 Ok(packet)
2408 }
2409
2410 fn build_server_application_stream_packet(
2411 &mut self,
2412 stream_id: u64,
2413 data: Bytes,
2414 fin: bool,
2415 ) -> Result<ServerApplicationPacket> {
2416 if data.is_empty() && !fin {
2417 return Err(Error::HttpProtocol(
2418 "native server H3 response produced no payload".into(),
2419 ));
2420 }
2421 let stream_offset = *self.server_stream_offsets.get(&stream_id).unwrap_or(&0);
2422 self.server_application_flow_control.consume_stream_data(
2423 stream_id,
2424 stream_offset,
2425 data.len(),
2426 )?;
2427 let packet = self.build_server_application_stream_packet_at_offset(
2428 stream_id,
2429 stream_offset,
2430 data,
2431 fin,
2432 )?;
2433 self.server_stream_offsets
2434 .insert(stream_id, stream_offset + packet.data.len() as u64);
2435 Ok(packet)
2436 }
2437
2438 fn build_server_application_stream_packet_at_offset(
2439 &mut self,
2440 stream_id: u64,
2441 stream_offset: u64,
2442 data: Bytes,
2443 fin: bool,
2444 ) -> Result<ServerApplicationPacket> {
2445 let Some(server_application_keys) = &self.server_application_keys else {
2446 return Err(Error::Quic(
2447 "native server application packet encryption is waiting for TLS application keys"
2448 .into(),
2449 ));
2450 };
2451
2452 let packet_number = self.next_server_application_packet_number;
2453 let packet_number_len = 2;
2454 let peer_cid = self.server_outbound_peer_cid();
2455 let frame = encode_frame(&QuicFrame::Stream {
2456 stream_id,
2457 offset: (stream_offset > 0).then_some(stream_offset),
2458 fin,
2459 data: data.clone(),
2460 });
2461 let packet = protect_short_header_packet(
2462 server_application_keys,
2463 &peer_cid,
2464 packet_number,
2465 packet_number_len,
2466 self.write_key_phase,
2467 &frame,
2468 )?;
2469
2470 let now = Instant::now();
2471 let packet_size = packet.len();
2472 self.server_application_loss_detector
2473 .on_packet_sent_at(packet_number, now);
2474 self.server_application_sent_streams.insert(
2475 packet_number,
2476 SentApplicationStreamPacket {
2477 stream_id,
2478 stream_offset,
2479 fin,
2480 data: data.clone(),
2481 },
2482 );
2483 self.recovery.on_packet_sent(
2484 PacketNumberSpace::Application,
2485 packet_number,
2486 SentPacketInfo::new(now, packet_size, true, true),
2487 );
2488 self.next_server_application_packet_number += 1;
2489
2490 Ok(ServerApplicationPacket {
2491 packet,
2492 packet_number,
2493 stream_id,
2494 packet_number_offset: 1 + peer_cid.as_bytes().len(),
2495 data,
2496 })
2497 }
2498
2499 fn build_server_application_control_packet(
2500 &mut self,
2501 frame: QuicFrame,
2502 ) -> Result<ServerApplicationControlPacket> {
2503 self.build_server_application_control_packet_with_min_datagram(frame, 0)
2504 }
2505
2506 fn build_server_application_control_packet_with_min_datagram(
2507 &mut self,
2508 frame: QuicFrame,
2509 min_datagram: usize,
2510 ) -> Result<ServerApplicationControlPacket> {
2511 let Some(server_application_keys) = &self.server_application_keys else {
2512 return Err(Error::Quic(
2513 "native server application packet encryption is waiting for TLS application keys"
2514 .into(),
2515 ));
2516 };
2517
2518 let packet_number = self.next_server_application_packet_number;
2519 let packet_number_len = 2;
2520 let peer_cid = self.server_outbound_peer_cid();
2521 let short_header_len = 1 + peer_cid.as_bytes().len() + packet_number_len;
2522 let payload = if min_datagram > 0 {
2523 pad_short_header_payload_to_min_datagram(
2524 padded_short_header_payload(encode_frame(&frame)),
2525 short_header_len,
2526 min_datagram,
2527 )
2528 } else {
2529 padded_short_header_payload(encode_frame(&frame))
2530 };
2531 let packet = protect_short_header_packet(
2532 server_application_keys,
2533 &peer_cid,
2534 packet_number,
2535 packet_number_len,
2536 self.write_key_phase,
2537 &payload,
2538 )?;
2539 let now = Instant::now();
2540 let packet_size = packet.len();
2541 self.server_application_loss_detector
2542 .on_packet_sent_at(packet_number, now);
2543 self.recovery.on_packet_sent(
2544 PacketNumberSpace::Application,
2545 packet_number,
2546 SentPacketInfo::new(now, packet_size, true, true),
2547 );
2548 self.next_server_application_packet_number += 1;
2549
2550 Ok(ServerApplicationControlPacket {
2551 packet,
2552 packet_number,
2553 packet_number_offset: 1 + peer_cid.as_bytes().len(),
2554 })
2555 }
2556}
2557
2558impl NativeQuicHandshake {
2559 pub fn client(
2560 server_name: &str,
2561 fingerprint: &Http3Fingerprint,
2562 destination_cid: ConnectionId,
2563 source_cid: ConnectionId,
2564 ) -> Result<Self> {
2565 Self::client_with_verify_peer(server_name, fingerprint, destination_cid, source_cid, true)
2566 }
2567
2568 pub fn client_with_verify_peer(
2569 server_name: &str,
2570 fingerprint: &Http3Fingerprint,
2571 destination_cid: ConnectionId,
2572 source_cid: ConnectionId,
2573 verify_peer: bool,
2574 ) -> Result<Self> {
2575 Self::client_with_tls_fingerprint(
2576 server_name,
2577 fingerprint,
2578 None,
2579 destination_cid,
2580 source_cid,
2581 verify_peer,
2582 &[],
2583 false,
2584 )
2585 }
2586
2587 #[allow(clippy::too_many_arguments)]
2592 pub fn client_with_tls_fingerprint_and_session(
2593 server_name: &str,
2594 fingerprint: &Http3Fingerprint,
2595 tls_fingerprint: Option<&TlsFingerprint>,
2596 destination_cid: ConnectionId,
2597 source_cid: ConnectionId,
2598 verify_peer: bool,
2599 root_certs: &[Vec<u8>],
2600 use_platform_roots: bool,
2601 session_der: Option<&[u8]>,
2602 ) -> Result<Self> {
2603 match session_der {
2604 Some(session_der) => Self::client_with_replayed_session_ticket(
2605 server_name,
2606 fingerprint,
2607 tls_fingerprint,
2608 destination_cid,
2609 source_cid,
2610 verify_peer,
2611 root_certs,
2612 use_platform_roots,
2613 session_der,
2614 ),
2615 None => Self::client_with_tls_fingerprint(
2616 server_name,
2617 fingerprint,
2618 tls_fingerprint,
2619 destination_cid,
2620 source_cid,
2621 verify_peer,
2622 root_certs,
2623 use_platform_roots,
2624 ),
2625 }
2626 }
2627
2628 #[allow(clippy::too_many_arguments)]
2629 pub fn client_with_tls_fingerprint_and_zero_rtt_request(
2630 server_name: &str,
2631 fingerprint: &Http3Fingerprint,
2632 tls_fingerprint: Option<&TlsFingerprint>,
2633 destination_cid: ConnectionId,
2634 source_cid: ConnectionId,
2635 verify_peer: bool,
2636 root_certs: &[Vec<u8>],
2637 use_platform_roots: bool,
2638 session_der: &[u8],
2639 early_data: &[u8],
2640 ) -> Result<Self> {
2641 let initial_keys = derive_initial_key_material(destination_cid.as_bytes())?;
2642 let mut tls =
2643 NativeQuicTlsSession::client_with_initial_source_connection_id_and_zero_rtt_offer(
2644 server_name,
2645 fingerprint,
2646 &source_cid,
2647 tls_fingerprint,
2648 verify_peer,
2649 root_certs,
2650 use_platform_roots,
2651 session_der,
2652 early_data,
2653 )?;
2654 let client_initial = build_client_initial_packet_from_capture_with_size(
2655 tls.take_client_initial(),
2656 destination_cid.clone(),
2657 source_cid.clone(),
2658 fingerprint.transport.initial_datagram_size,
2659 )?;
2660 let client_early_data_keys = client_initial
2661 .secrets
2662 .iter()
2663 .find(|secret| {
2664 secret.direction == QuicSecretDirection::Write
2665 && secret.level == QuicEncryptionLevel::EarlyData
2666 })
2667 .map(QuicTlsSecret::packet_key_material)
2668 .transpose()?;
2669 let client_cid_inventory = new_client_cid_inventory(fingerprint, &source_cid);
2670
2671 Ok(Self {
2672 client_initial,
2673 pending_client_initial: None,
2674 tls,
2675 fingerprint: fingerprint.clone(),
2676 server_name: server_name.to_string(),
2677 tls_fingerprint: tls_fingerprint.cloned(),
2678 verify_peer,
2679 root_certs: root_certs.to_vec(),
2680 use_platform_roots,
2681 supported_versions: vec![QUIC_VERSION_1],
2682 client_initial_version: QUIC_VERSION_1,
2683 retry_received: false,
2684 vn_received: false,
2685 server_initial_or_handshake_seen: false,
2686 original_destination_cid: destination_cid.clone(),
2687 retry_source_cid: None,
2688 destination_cid,
2689 source_cid,
2690 client_initial_keys: initial_keys.client,
2691 server_initial_keys: initial_keys.server,
2692 client_handshake_keys: None,
2693 client_early_data_keys,
2694 server_handshake_keys: None,
2695 client_application_keys: None,
2696 server_application_keys: None,
2697 client_application_next_keys: None,
2698 server_application_next_keys: None,
2699 server_application_previous: None,
2700 write_key_phase: false,
2701 read_key_phase: false,
2702 application_key_update: OneRttKeyUpdate::default(),
2703 initial_crypto: QuicCryptoAssembler::default(),
2704 handshake_crypto: QuicCryptoAssembler::default(),
2705 initial_ack_tracker: QuicAckTracker::default(),
2706 handshake_ack_tracker: QuicAckTracker::default(),
2707 application_ack_tracker: QuicAckTracker::default(),
2708 client_initial_loss_detector: QuicLossDetector::default(),
2709 client_handshake_loss_detector: QuicLossDetector::default(),
2710 client_application_loss_detector: QuicLossDetector::default(),
2711 client_application_flow_control: QuicApplicationFlowControl::client(
2712 &fingerprint.transport,
2713 ),
2714 client_application_receive_flow_control: QuicReceiveFlowControl::client(
2715 &fingerprint.transport,
2716 ),
2717 client_initial_sent_crypto: BTreeMap::new(),
2718 client_handshake_sent_crypto: BTreeMap::new(),
2719 client_application_sent_streams: BTreeMap::new(),
2720 client_application_recovery_lost_packets: Vec::new(),
2721 client_application_ecn_congestion: false,
2722 client_path_validator: QuicPathValidator::default(),
2723 client_cid_inventory,
2724 client_pmtu_probe: QuicPmtuProbePolicy::from_transport(&fingerprint.transport),
2725 server_transport_parameters_validated: false,
2726 recovery: recovery_state_from_transport(&fingerprint.transport),
2727 next_client_initial_packet_number: 1,
2728 next_server_initial_packet_number: 0,
2729 next_server_handshake_packet_number: 0,
2730 next_client_handshake_packet_number: 0,
2731 next_server_application_packet_number: 0,
2732 next_client_application_packet_number: 0,
2733 next_client_bidirectional_stream_id: 0,
2734 next_client_unidirectional_stream_id: 2,
2735 client_handshake_crypto_offset: 0,
2736 client_stream_offsets: BTreeMap::new(),
2737 server_h3_stream_buffers: BTreeMap::new(),
2738 server_h3_stream_buffer_offsets: BTreeMap::new(),
2739 server_h3_stream_types: BTreeMap::new(),
2740 close_draining: false,
2741 close_state: QuicCloseState::default(),
2742 })
2743 }
2744
2745 #[allow(clippy::too_many_arguments)]
2746 pub fn client_with_tls_fingerprint(
2747 server_name: &str,
2748 fingerprint: &Http3Fingerprint,
2749 tls_fingerprint: Option<&TlsFingerprint>,
2750 destination_cid: ConnectionId,
2751 source_cid: ConnectionId,
2752 verify_peer: bool,
2753 root_certs: &[Vec<u8>],
2754 use_platform_roots: bool,
2755 ) -> Result<Self> {
2756 let initial_keys = derive_initial_key_material(destination_cid.as_bytes())?;
2757 let mut tls =
2758 NativeQuicTlsSession::client_with_initial_source_connection_id_and_verify_peer(
2759 server_name,
2760 fingerprint,
2761 &source_cid,
2762 tls_fingerprint,
2763 verify_peer,
2764 root_certs,
2765 use_platform_roots,
2766 )?;
2767 let client_initial = build_client_initial_packet_from_capture_with_size(
2768 tls.take_client_initial(),
2769 destination_cid.clone(),
2770 source_cid.clone(),
2771 fingerprint.transport.initial_datagram_size,
2772 )?;
2773 let client_cid_inventory = new_client_cid_inventory(fingerprint, &source_cid);
2774
2775 Ok(Self {
2776 client_initial,
2777 pending_client_initial: None,
2778 tls,
2779 fingerprint: fingerprint.clone(),
2780 server_name: server_name.to_string(),
2781 tls_fingerprint: tls_fingerprint.cloned(),
2782 verify_peer,
2783 root_certs: root_certs.to_vec(),
2784 use_platform_roots,
2785 supported_versions: vec![QUIC_VERSION_1],
2786 client_initial_version: QUIC_VERSION_1,
2787 retry_received: false,
2788 vn_received: false,
2789 server_initial_or_handshake_seen: false,
2790 original_destination_cid: destination_cid.clone(),
2791 retry_source_cid: None,
2792 destination_cid,
2793 source_cid,
2794 client_initial_keys: initial_keys.client,
2795 server_initial_keys: initial_keys.server,
2796 client_handshake_keys: None,
2797 client_early_data_keys: None,
2798 server_handshake_keys: None,
2799 client_application_keys: None,
2800 server_application_keys: None,
2801 client_application_next_keys: None,
2802 server_application_next_keys: None,
2803 server_application_previous: None,
2804 write_key_phase: false,
2805 read_key_phase: false,
2806 application_key_update: OneRttKeyUpdate::default(),
2807 initial_crypto: QuicCryptoAssembler::default(),
2808 handshake_crypto: QuicCryptoAssembler::default(),
2809 initial_ack_tracker: QuicAckTracker::default(),
2810 handshake_ack_tracker: QuicAckTracker::default(),
2811 application_ack_tracker: QuicAckTracker::default(),
2812 client_initial_loss_detector: QuicLossDetector::default(),
2813 client_handshake_loss_detector: QuicLossDetector::default(),
2814 client_application_loss_detector: QuicLossDetector::default(),
2815 client_application_flow_control: QuicApplicationFlowControl::client(
2816 &fingerprint.transport,
2817 ),
2818 client_application_receive_flow_control: QuicReceiveFlowControl::client(
2819 &fingerprint.transport,
2820 ),
2821 client_initial_sent_crypto: BTreeMap::new(),
2822 client_handshake_sent_crypto: BTreeMap::new(),
2823 client_application_sent_streams: BTreeMap::new(),
2824 client_application_recovery_lost_packets: Vec::new(),
2825 client_application_ecn_congestion: false,
2826 client_path_validator: QuicPathValidator::default(),
2827 client_cid_inventory,
2828 client_pmtu_probe: QuicPmtuProbePolicy::from_transport(&fingerprint.transport),
2829 server_transport_parameters_validated: false,
2830 recovery: recovery_state_from_transport(&fingerprint.transport),
2831 next_client_initial_packet_number: 1,
2832 next_server_initial_packet_number: 0,
2833 next_server_handshake_packet_number: 0,
2834 next_client_handshake_packet_number: 0,
2835 next_server_application_packet_number: 0,
2836 next_client_application_packet_number: 0,
2837 next_client_bidirectional_stream_id: 0,
2838 next_client_unidirectional_stream_id: 2,
2839 client_handshake_crypto_offset: 0,
2840 client_stream_offsets: BTreeMap::new(),
2841 server_h3_stream_buffers: BTreeMap::new(),
2842 server_h3_stream_buffer_offsets: BTreeMap::new(),
2843 server_h3_stream_types: BTreeMap::new(),
2844 close_draining: false,
2845 close_state: QuicCloseState::default(),
2846 })
2847 }
2848
2849 #[allow(clippy::too_many_arguments)]
2850 pub fn client_with_replayed_session_ticket(
2851 server_name: &str,
2852 fingerprint: &Http3Fingerprint,
2853 tls_fingerprint: Option<&TlsFingerprint>,
2854 destination_cid: ConnectionId,
2855 source_cid: ConnectionId,
2856 verify_peer: bool,
2857 root_certs: &[Vec<u8>],
2858 use_platform_roots: bool,
2859 session_ticket_der: &[u8],
2860 ) -> Result<Self> {
2861 let initial_keys = derive_initial_key_material(destination_cid.as_bytes())?;
2862 let mut tls =
2863 NativeQuicTlsSession::client_with_initial_source_connection_id_and_replayed_session(
2864 server_name,
2865 fingerprint,
2866 &source_cid,
2867 tls_fingerprint,
2868 verify_peer,
2869 root_certs,
2870 use_platform_roots,
2871 session_ticket_der,
2872 )?;
2873 let client_initial = build_client_initial_packet_from_capture_with_size(
2874 tls.take_client_initial(),
2875 destination_cid.clone(),
2876 source_cid.clone(),
2877 fingerprint.transport.initial_datagram_size,
2878 )?;
2879 let client_cid_inventory = new_client_cid_inventory(fingerprint, &source_cid);
2880
2881 Ok(Self {
2882 client_initial,
2883 pending_client_initial: None,
2884 tls,
2885 fingerprint: fingerprint.clone(),
2886 server_name: server_name.to_string(),
2887 tls_fingerprint: tls_fingerprint.cloned(),
2888 verify_peer,
2889 root_certs: root_certs.to_vec(),
2890 use_platform_roots,
2891 supported_versions: vec![QUIC_VERSION_1],
2892 client_initial_version: QUIC_VERSION_1,
2893 retry_received: false,
2894 vn_received: false,
2895 server_initial_or_handshake_seen: false,
2896 original_destination_cid: destination_cid.clone(),
2897 retry_source_cid: None,
2898 destination_cid,
2899 source_cid,
2900 client_initial_keys: initial_keys.client,
2901 server_initial_keys: initial_keys.server,
2902 client_handshake_keys: None,
2903 client_early_data_keys: None,
2904 server_handshake_keys: None,
2905 client_application_keys: None,
2906 server_application_keys: None,
2907 client_application_next_keys: None,
2908 server_application_next_keys: None,
2909 server_application_previous: None,
2910 write_key_phase: false,
2911 read_key_phase: false,
2912 application_key_update: OneRttKeyUpdate::default(),
2913 initial_crypto: QuicCryptoAssembler::default(),
2914 handshake_crypto: QuicCryptoAssembler::default(),
2915 initial_ack_tracker: QuicAckTracker::default(),
2916 handshake_ack_tracker: QuicAckTracker::default(),
2917 application_ack_tracker: QuicAckTracker::default(),
2918 client_initial_loss_detector: QuicLossDetector::default(),
2919 client_handshake_loss_detector: QuicLossDetector::default(),
2920 client_application_loss_detector: QuicLossDetector::default(),
2921 client_application_flow_control: QuicApplicationFlowControl::client(
2922 &fingerprint.transport,
2923 ),
2924 client_application_receive_flow_control: QuicReceiveFlowControl::client(
2925 &fingerprint.transport,
2926 ),
2927 client_initial_sent_crypto: BTreeMap::new(),
2928 client_handshake_sent_crypto: BTreeMap::new(),
2929 client_application_sent_streams: BTreeMap::new(),
2930 client_application_recovery_lost_packets: Vec::new(),
2931 client_application_ecn_congestion: false,
2932 client_path_validator: QuicPathValidator::default(),
2933 client_cid_inventory,
2934 client_pmtu_probe: QuicPmtuProbePolicy::from_transport(&fingerprint.transport),
2935 server_transport_parameters_validated: false,
2936 recovery: recovery_state_from_transport(&fingerprint.transport),
2937 next_client_initial_packet_number: 1,
2938 next_server_initial_packet_number: 0,
2939 next_server_handshake_packet_number: 0,
2940 next_client_handshake_packet_number: 0,
2941 next_server_application_packet_number: 0,
2942 next_client_application_packet_number: 0,
2943 next_client_bidirectional_stream_id: 0,
2944 next_client_unidirectional_stream_id: 2,
2945 client_handshake_crypto_offset: 0,
2946 client_stream_offsets: BTreeMap::new(),
2947 server_h3_stream_buffers: BTreeMap::new(),
2948 server_h3_stream_buffer_offsets: BTreeMap::new(),
2949 server_h3_stream_types: BTreeMap::new(),
2950 close_draining: false,
2951 close_state: QuicCloseState::default(),
2952 })
2953 }
2954
2955 pub fn take_session_tickets(&mut self) -> Vec<NativeH3SessionTicket> {
2956 self.tls.take_session_tickets()
2957 }
2958
2959 pub fn handshake_status(&self) -> NativeH3HandshakeStatus {
2966 self.tls.handshake_status()
2967 }
2968
2969 pub fn early_data_reason(&self) -> u32 {
2972 self.tls.early_data_reason()
2973 }
2974
2975 pub fn client_initial(&self) -> &ClientInitialPacket {
2976 &self.client_initial
2977 }
2978
2979 pub fn take_pending_client_initial(&mut self) -> Option<ClientInitialPacket> {
2980 self.pending_client_initial.take()
2981 }
2982
2983 pub fn supported_versions(&self) -> &[u32] {
2984 &self.supported_versions
2985 }
2986
2987 pub fn set_supported_versions(&mut self, versions: Vec<u32>) -> Result<()> {
2988 if versions.is_empty() {
2989 return Err(Error::Quic(
2990 "native H3 supported QUIC versions list cannot be empty".into(),
2991 ));
2992 }
2993 if !versions.contains(&self.client_initial_version) {
2994 return Err(Error::Quic(
2995 "native H3 supported QUIC versions must include the issued initial version".into(),
2996 ));
2997 }
2998 self.supported_versions = versions;
2999 Ok(())
3000 }
3001
3002 pub fn client_initial_version(&self) -> u32 {
3003 self.client_initial_version
3004 }
3005
3006 pub fn retry_received(&self) -> bool {
3007 self.retry_received
3008 }
3009
3010 pub fn version_negotiation_received(&self) -> bool {
3011 self.vn_received
3012 }
3013
3014 pub fn install_tls_secrets(&mut self, secrets: &[QuicTlsSecret]) -> Result<()> {
3015 for secret in secrets {
3016 if secret.direction == QuicSecretDirection::Read
3017 && secret.level == QuicEncryptionLevel::Handshake
3018 {
3019 self.server_handshake_keys = Some(secret.packet_key_material()?);
3020 } else if secret.direction == QuicSecretDirection::Write
3021 && secret.level == QuicEncryptionLevel::EarlyData
3022 {
3023 self.client_early_data_keys = Some(secret.packet_key_material()?);
3024 } else if secret.direction == QuicSecretDirection::Write
3025 && secret.level == QuicEncryptionLevel::Handshake
3026 {
3027 self.client_handshake_keys = Some(secret.packet_key_material()?);
3028 } else if secret.direction == QuicSecretDirection::Read
3029 && secret.level == QuicEncryptionLevel::Application
3030 {
3031 let keys = secret.packet_key_material()?;
3032 self.server_application_next_keys = Some(derive_next_packet_key_material(&keys)?);
3033 self.server_application_keys = Some(keys);
3034 } else if secret.direction == QuicSecretDirection::Write
3035 && secret.level == QuicEncryptionLevel::Application
3036 {
3037 let keys = secret.packet_key_material()?;
3038 self.client_application_next_keys = Some(derive_next_packet_key_material(&keys)?);
3039 self.client_application_keys = Some(keys);
3040 }
3041 }
3042 if self.is_application_ready() && !self.recovery.handshake_complete() {
3043 self.recovery.discard_space(PacketNumberSpace::Initial);
3044 self.recovery.discard_space(PacketNumberSpace::Handshake);
3045 self.recovery.mark_handshake_complete();
3046 }
3047 Ok(())
3048 }
3049
3050 pub fn server_handshake_keys(&self) -> Option<&QuicPacketKeyMaterial> {
3051 self.server_handshake_keys.as_ref()
3052 }
3053
3054 pub fn is_application_ready(&self) -> bool {
3055 self.client_application_keys.is_some() && self.server_application_keys.is_some()
3056 }
3057
3058 pub fn is_close_draining(&self) -> bool {
3059 self.close_draining
3060 }
3061
3062 pub fn close_state(&self) -> &QuicCloseState {
3063 &self.close_state
3064 }
3065
3066 pub fn close_state_mut(&mut self) -> &mut QuicCloseState {
3067 &mut self.close_state
3068 }
3069
3070 pub fn client_enter_closing(&mut self, now: Instant) {
3074 self.close_state.enter_closing(now);
3075 self.close_draining = true;
3076 }
3077
3078 pub fn client_enter_draining(&mut self, now: Instant) {
3082 self.close_state.enter_draining(now);
3083 self.close_draining = true;
3084 }
3085
3086 pub fn client_close_window(&self) -> Duration {
3089 self.client_application_loss_detector.close_window()
3090 }
3091
3092 pub fn client_is_close_window_expired(&self, now: Instant) -> bool {
3093 self.close_state.is_expired(now, self.client_close_window())
3094 }
3095
3096 pub fn client_close_time_until_expiry(&self, now: Instant) -> Option<Duration> {
3097 self.close_state
3098 .time_until_expiry(now, self.client_close_window())
3099 }
3100
3101 pub fn client_should_replay_connection_close(&self, now: Instant) -> bool {
3102 self.close_state.should_replay(now)
3103 }
3104
3105 pub fn client_mark_connection_close_replayed(&mut self, now: Instant) {
3106 self.close_state.mark_replayed(now);
3107 }
3108
3109 pub fn client_observe_inbound_packet_for_close(&mut self) -> u64 {
3110 self.close_state.observe_inbound_packet()
3111 }
3112
3113 pub fn client_application_pto(&self) -> Duration {
3114 self.client_application_loss_detector.current_pto()
3115 }
3116
3117 pub fn write_key_phase(&self) -> bool {
3119 self.write_key_phase
3120 }
3121
3122 pub fn read_key_phase(&self) -> bool {
3124 self.read_key_phase
3125 }
3126
3127 pub fn key_update_in_progress(&self) -> bool {
3130 self.application_key_update.write_update_in_progress
3131 }
3132
3133 pub fn force_key_update(&mut self) -> Result<()> {
3136 if self.application_key_update.write_update_in_progress {
3137 return Err(Error::Quic(
3138 "RFC9001 § 6.5: cannot initiate a new key update while a previous one is unconfirmed"
3139 .into(),
3140 ));
3141 }
3142 let next = self.client_application_next_keys.take().ok_or_else(|| {
3143 Error::Quic(
3144 "native QUIC client cannot force a key update before TLS application secrets are installed"
3145 .into(),
3146 )
3147 })?;
3148 self.client_application_keys = Some(next);
3149 let new_current = self
3150 .client_application_keys
3151 .as_ref()
3152 .expect("client application keys just installed");
3153 self.client_application_next_keys = Some(derive_next_packet_key_material(new_current)?);
3154 self.write_key_phase = !self.write_key_phase;
3155 self.application_key_update.write_update_in_progress = true;
3156 self.application_key_update.write_update_anchor =
3157 Some(self.next_client_application_packet_number);
3158 Ok(())
3159 }
3160
3161 fn commit_receive_key_update(&mut self, now: Instant) -> Result<()> {
3162 let Some(current) = self.server_application_keys.take() else {
3163 return Err(Error::Quic(
3164 "native QUIC client cannot rotate read keys without an installed current key set"
3165 .into(),
3166 ));
3167 };
3168 let Some(next) = self.server_application_next_keys.take() else {
3169 return Err(Error::Quic(
3170 "native QUIC client cannot rotate read keys without precomputed next key set"
3171 .into(),
3172 ));
3173 };
3174 let old_phase = self.read_key_phase;
3175 self.server_application_keys = Some(next);
3176 let new_current = self
3177 .server_application_keys
3178 .as_ref()
3179 .expect("server application keys just installed");
3180 self.server_application_next_keys = Some(derive_next_packet_key_material(new_current)?);
3181 self.server_application_previous = Some(PreviousKeys {
3182 keys: current,
3183 phase: old_phase,
3184 retire_at: now + PREVIOUS_KEY_WINDOW,
3185 });
3186 self.read_key_phase = !self.read_key_phase;
3187
3188 if self.write_key_phase != self.read_key_phase
3189 && !self.application_key_update.write_update_in_progress
3190 {
3191 let next_write = self.client_application_next_keys.take().ok_or_else(|| {
3192 Error::Quic(
3193 "native QUIC client cannot mirror peer key update without precomputed next write keys"
3194 .into(),
3195 )
3196 })?;
3197 self.client_application_keys = Some(next_write);
3198 let new_current_write = self
3199 .client_application_keys
3200 .as_ref()
3201 .expect("client application keys just rotated");
3202 self.client_application_next_keys =
3203 Some(derive_next_packet_key_material(new_current_write)?);
3204 self.write_key_phase = !self.write_key_phase;
3205 self.application_key_update.write_update_in_progress = true;
3206 self.application_key_update.write_update_anchor =
3207 Some(self.next_client_application_packet_number);
3208 }
3209
3210 Ok(())
3211 }
3212
3213 pub fn client_path_validation_pending_count(&self) -> usize {
3214 self.client_path_validator.pending_count()
3215 }
3216
3217 pub fn is_client_path_validated(&self, data: &[u8; 8]) -> bool {
3218 self.client_path_validator.is_validated(data)
3219 }
3220
3221 pub fn is_client_path_address_validated(&self, remote_address: &SocketAddr) -> bool {
3222 self.client_path_validator
3223 .is_address_validated(remote_address)
3224 }
3225
3226 pub fn client_path_migration_connection_id(
3227 &self,
3228 remote_address: &SocketAddr,
3229 ) -> Option<&ConnectionId> {
3230 self.client_path_validator
3231 .migration_connection_id(remote_address)
3232 }
3233
3234 pub fn client_pmtu_current_size(&self) -> usize {
3235 self.client_pmtu_probe.current_size()
3236 }
3237
3238 pub fn client_pmtu_pending_probe_size(&self) -> Option<usize> {
3239 self.client_pmtu_probe.pending_probe_size()
3240 }
3241
3242 pub fn client_application_lost_packets(&self) -> Vec<u64> {
3243 self.client_application_loss_detector.lost_packets()
3244 }
3245
3246 pub fn take_client_application_ecn_congestion(&mut self) -> bool {
3247 std::mem::take(&mut self.client_application_ecn_congestion)
3248 }
3249
3250 pub fn client_application_smoothed_rtt(&self) -> Option<Duration> {
3254 self.client_application_loss_detector.smoothed_rtt()
3255 }
3256
3257 pub fn client_application_min_rtt(&self) -> Option<Duration> {
3260 self.client_application_loss_detector.min_rtt()
3261 }
3262
3263 pub fn retransmit_lost_client_application_stream_packets(
3264 &mut self,
3265 ) -> Result<Vec<ClientApplicationPacket>> {
3266 let mut lost_packets = self.client_application_loss_detector.lost_packets();
3267 lost_packets.append(&mut self.client_application_recovery_lost_packets);
3268 self.retransmit_client_application_stream_packets(lost_packets)
3269 }
3270
3271 pub fn retransmit_pto_client_application_stream_packets(
3272 &mut self,
3273 now: Instant,
3274 pto_timeout: Duration,
3275 ) -> Result<Vec<ClientApplicationPacket>> {
3276 let expired_packets = self
3277 .client_application_loss_detector
3278 .pto_expired_packets(now, pto_timeout);
3279 self.retransmit_client_application_stream_packets(expired_packets)
3280 }
3281
3282 fn retransmit_client_application_stream_packets<I>(
3283 &mut self,
3284 packet_numbers: I,
3285 ) -> Result<Vec<ClientApplicationPacket>>
3286 where
3287 I: IntoIterator<Item = u64>,
3288 {
3289 let mut packet_numbers = packet_numbers.into_iter().collect::<Vec<_>>();
3290 packet_numbers.sort_unstable();
3291 packet_numbers.dedup();
3292 let mut retransmits = Vec::new();
3293 for packet_number in packet_numbers {
3294 self.client_application_loss_detector
3295 .retire_packet(packet_number);
3296 let Some(sent) = self.client_application_sent_streams.remove(&packet_number) else {
3297 continue;
3298 };
3299 retransmits.push(self.build_client_application_stream_packet_at_offset(
3300 sent.stream_id,
3301 sent.stream_offset,
3302 sent.data,
3303 sent.fin,
3304 )?);
3305 }
3306 Ok(retransmits)
3307 }
3308
3309 pub fn build_client_initial_ack_packet(&mut self) -> Result<Option<ClientAckPacket>> {
3310 let packet = build_ack_packet(
3311 LongHeaderType::Initial,
3312 &self.client_initial_keys,
3313 &self.destination_cid,
3314 &self.source_cid,
3315 &mut self.initial_ack_tracker,
3316 self.next_client_initial_packet_number,
3317 )?;
3318 if packet.is_some() {
3319 self.next_client_initial_packet_number += 1;
3320 }
3321 Ok(packet)
3322 }
3323
3324 pub fn build_client_handshake_ack_packet(&mut self) -> Result<Option<ClientAckPacket>> {
3325 if self.handshake_ack_tracker.is_empty() {
3326 return Ok(None);
3327 }
3328 let Some(client_handshake_keys) = &self.client_handshake_keys else {
3329 return Err(Error::Quic(
3330 "native Handshake ACK encryption is waiting for TLS Handshake keys".into(),
3331 ));
3332 };
3333 let packet = build_ack_packet(
3334 LongHeaderType::Handshake,
3335 client_handshake_keys,
3336 &self.destination_cid,
3337 &self.source_cid,
3338 &mut self.handshake_ack_tracker,
3339 self.next_client_handshake_packet_number,
3340 )?;
3341 if packet.is_some() {
3342 self.next_client_handshake_packet_number += 1;
3343 }
3344 Ok(packet)
3345 }
3346
3347 pub fn build_client_application_ack_packet(
3348 &mut self,
3349 ) -> Result<Option<ClientApplicationAckPacket>> {
3350 self.build_client_application_ack_packet_with_delay(Instant::now(), 0)
3351 }
3352
3353 pub fn build_client_application_ack_packet_with_delay(
3354 &mut self,
3355 now: Instant,
3356 ack_delay_exponent: u64,
3357 ) -> Result<Option<ClientApplicationAckPacket>> {
3358 if self.application_ack_tracker.is_empty() {
3359 return Ok(None);
3360 }
3361 let Some(client_application_keys) = &self.client_application_keys else {
3362 return Err(Error::Quic(
3363 "native application ACK encryption is waiting for TLS application keys".into(),
3364 ));
3365 };
3366
3367 let packet_number = self.next_client_application_packet_number;
3368 let packet_number_len = 2;
3369 let frame = encode_frame(
3370 &self
3371 .application_ack_tracker
3372 .to_ack_frame_with_delay(now, ack_delay_exponent)?,
3373 );
3374 let packet = protect_short_header_packet(
3375 client_application_keys,
3376 &self.destination_cid,
3377 packet_number,
3378 packet_number_len,
3379 self.write_key_phase,
3380 &frame,
3381 )?;
3382 self.application_ack_tracker.mark_ack_sent();
3383 self.next_client_application_packet_number += 1;
3384
3385 Ok(Some(ClientApplicationAckPacket {
3386 packet,
3387 packet_number,
3388 packet_number_offset: 1 + self.destination_cid.as_bytes().len(),
3389 }))
3390 }
3391
3392 pub fn build_client_application_ack_packet_after(
3393 &mut self,
3394 threshold: usize,
3395 ) -> Result<Option<ClientApplicationAckPacket>> {
3396 if !self.application_ack_tracker.should_ack_after(threshold) {
3397 return Ok(None);
3398 }
3399 self.build_client_application_ack_packet()
3400 }
3401
3402 pub fn build_client_application_ack_packet_after_or_delay(
3403 &mut self,
3404 threshold: usize,
3405 max_ack_delay: Duration,
3406 now: Instant,
3407 ack_delay_exponent: u64,
3408 ) -> Result<Option<ClientApplicationAckPacket>> {
3409 if !self
3410 .application_ack_tracker
3411 .should_ack_after_or_delay(threshold, max_ack_delay, now)
3412 {
3413 return Ok(None);
3414 }
3415 self.build_client_application_ack_packet_with_delay(now, ack_delay_exponent)
3416 }
3417
3418 pub fn client_application_ack_deadline(&self, max_ack_delay: Duration) -> Option<Instant> {
3419 self.application_ack_tracker
3420 .pending_ack_deadline(max_ack_delay)
3421 }
3422
3423 pub fn build_client_handshake_crypto_packet(
3424 &mut self,
3425 crypto_data: Bytes,
3426 ) -> Result<Option<ClientHandshakePacket>> {
3427 if crypto_data.is_empty() {
3428 return Ok(None);
3429 }
3430
3431 let crypto_offset = self.client_handshake_crypto_offset;
3432 let packet =
3433 self.build_client_handshake_crypto_packet_at_offset(crypto_offset, crypto_data)?;
3434 self.client_handshake_crypto_offset += packet.crypto_data.len() as u64;
3435
3436 Ok(Some(packet))
3437 }
3438
3439 pub fn retransmit_pto_client_handshake_crypto_packets(
3440 &mut self,
3441 now: Instant,
3442 pto: Duration,
3443 ) -> Result<Vec<ClientHandshakePacket>> {
3444 let expired_packets = self
3445 .client_handshake_loss_detector
3446 .pto_expired_packets(now, pto);
3447 let mut retransmits = Vec::new();
3448 for packet_number in expired_packets {
3449 self.client_handshake_loss_detector
3450 .retire_packet(packet_number);
3451 let Some(sent) = self.client_handshake_sent_crypto.remove(&packet_number) else {
3452 continue;
3453 };
3454 if sent.packet_type != LongHeaderType::Handshake {
3455 continue;
3456 }
3457 retransmits.push(
3458 self.build_client_handshake_crypto_packet_at_offset_with_sent_at(
3459 sent.crypto_offset,
3460 sent.crypto_data,
3461 now,
3462 )?,
3463 );
3464 }
3465 Ok(retransmits)
3466 }
3467
3468 pub fn record_client_initial_sent_at(&mut self, sent_at: Instant) {
3473 let packet_number = self.next_client_initial_packet_number.saturating_sub(1);
3474 if self.client_initial_sent_crypto.contains_key(&packet_number) {
3475 return;
3476 }
3477 let packet_size = self.client_initial.packet.len();
3478 let crypto_data = self.client_initial.crypto_data.clone();
3479 self.client_initial_loss_detector
3480 .on_packet_sent_at(packet_number, sent_at);
3481 self.client_initial_sent_crypto.insert(
3482 packet_number,
3483 SentCryptoPacket {
3484 packet_type: LongHeaderType::Initial,
3485 crypto_offset: 0,
3486 crypto_data,
3487 },
3488 );
3489 self.recovery.on_packet_sent(
3490 PacketNumberSpace::Initial,
3491 packet_number,
3492 SentPacketInfo::new(sent_at, packet_size, true, true),
3493 );
3494 }
3495
3496 pub fn retransmit_pto_client_initial_crypto_packets(
3501 &mut self,
3502 now: Instant,
3503 pto: Duration,
3504 ) -> Result<Vec<ClientInitialPacket>> {
3505 let expired_packets = self
3506 .client_initial_loss_detector
3507 .pto_expired_packets(now, pto);
3508 let mut retransmits = Vec::new();
3509 for packet_number in expired_packets {
3510 self.client_initial_loss_detector
3511 .retire_packet(packet_number);
3512 let Some(sent) = self.client_initial_sent_crypto.remove(&packet_number) else {
3513 continue;
3514 };
3515 if sent.packet_type != LongHeaderType::Initial {
3516 continue;
3517 }
3518 retransmits.push(self.build_client_initial_crypto_pto_packet(sent.crypto_data, now)?);
3519 }
3520 Ok(retransmits)
3521 }
3522
3523 fn build_client_initial_crypto_pto_packet(
3524 &mut self,
3525 crypto_data: Bytes,
3526 sent_at: Instant,
3527 ) -> Result<ClientInitialPacket> {
3528 let packet_number = self.next_client_initial_packet_number;
3529 let token = decode_long_header(&self.client_initial.header)?.token;
3530 let packet = build_client_initial_packet_with_token_and_version(
3531 &self.fingerprint,
3532 crypto_data.clone(),
3533 self.client_initial.transport_parameters.clone(),
3534 self.client_initial.secrets.clone(),
3535 self.destination_cid.clone(),
3536 self.source_cid.clone(),
3537 token,
3538 packet_number,
3539 self.client_initial_version,
3540 )?;
3541 let packet_size = packet.packet.len();
3542 self.next_client_initial_packet_number = packet_number + 1;
3543 self.client_initial_loss_detector
3544 .on_packet_sent_at(packet_number, sent_at);
3545 self.client_initial_sent_crypto.insert(
3546 packet_number,
3547 SentCryptoPacket {
3548 packet_type: LongHeaderType::Initial,
3549 crypto_offset: 0,
3550 crypto_data,
3551 },
3552 );
3553 self.recovery.on_packet_sent(
3554 PacketNumberSpace::Initial,
3555 packet_number,
3556 SentPacketInfo::new(sent_at, packet_size, true, true),
3557 );
3558 Ok(packet)
3559 }
3560
3561 pub fn recovery(&self) -> &RecoveryState {
3564 &self.recovery
3565 }
3566
3567 pub fn on_loss_detection_timeout(&mut self, now: Instant) -> LossDetectionOutcome {
3571 self.recovery.on_loss_detection_timeout(now)
3572 }
3573
3574 pub fn loss_detection_timer(&self) -> Option<Instant> {
3577 self.recovery.loss_detection_timer()
3578 }
3579
3580 pub fn application_pto(&self) -> Duration {
3583 self.recovery.current_pto()
3584 }
3585
3586 pub fn application_pto_timeout(&self) -> Duration {
3589 let max_ack_delay = self.recovery.max_ack_delay();
3590 let backoff = 1u32 << self.recovery.pto_count().min(31);
3591 self.recovery
3592 .current_pto()
3593 .saturating_add(max_ack_delay.saturating_mul(backoff))
3594 }
3595
3596 pub fn mark_handshake_confirmed(&mut self) {
3599 self.recovery.mark_handshake_complete();
3600 }
3601
3602 pub fn discard_packet_space(&mut self, space: PacketNumberSpace) {
3607 self.recovery.discard_space(space);
3608 }
3609
3610 fn build_client_handshake_crypto_packet_at_offset(
3611 &mut self,
3612 crypto_offset: u64,
3613 crypto_data: Bytes,
3614 ) -> Result<ClientHandshakePacket> {
3615 self.build_client_handshake_crypto_packet_at_offset_with_sent_at(
3616 crypto_offset,
3617 crypto_data,
3618 Instant::now(),
3619 )
3620 }
3621
3622 fn build_client_handshake_crypto_packet_at_offset_with_sent_at(
3623 &mut self,
3624 crypto_offset: u64,
3625 crypto_data: Bytes,
3626 sent_at: Instant,
3627 ) -> Result<ClientHandshakePacket> {
3628 let Some(client_handshake_keys) = &self.client_handshake_keys else {
3629 return Err(Error::Quic(
3630 "native Handshake packet encryption is waiting for TLS Handshake keys".into(),
3631 ));
3632 };
3633
3634 let packet_number = self.next_client_handshake_packet_number;
3635 let packet_number_len = 2;
3636 let frame = encode_frame(&QuicFrame::Crypto {
3637 offset: crypto_offset,
3638 data: crypto_data.clone(),
3639 });
3640 let header = encode_long_header(&LongHeaderPacket {
3641 packet_type: LongHeaderType::Handshake,
3642 version: 1,
3643 destination_cid: self.destination_cid.clone(),
3644 source_cid: self.source_cid.clone(),
3645 token: Bytes::new(),
3646 packet_number,
3647 packet_number_len,
3648 payload_len: frame.len() + 16,
3649 })?;
3650 let packet_number_offset = header
3651 .len()
3652 .checked_sub(packet_number_len)
3653 .ok_or_else(|| Error::HttpProtocol("invalid QUIC Handshake header length".into()))?;
3654 let packet = protect_long_header_packet(
3655 client_handshake_keys,
3656 packet_number,
3657 &header,
3658 packet_number_offset,
3659 packet_number_len,
3660 &frame,
3661 )?;
3662
3663 let packet_size = packet.len();
3664 self.next_client_handshake_packet_number += 1;
3665 self.client_handshake_loss_detector
3666 .on_packet_sent_at(packet_number, sent_at);
3667 self.client_handshake_sent_crypto.insert(
3668 packet_number,
3669 SentCryptoPacket {
3670 packet_type: LongHeaderType::Handshake,
3671 crypto_offset,
3672 crypto_data: crypto_data.clone(),
3673 },
3674 );
3675 self.recovery.on_packet_sent(
3676 PacketNumberSpace::Handshake,
3677 packet_number,
3678 SentPacketInfo::new(sent_at, packet_size, true, true),
3679 );
3680
3681 Ok(ClientHandshakePacket {
3682 packet,
3683 packet_number,
3684 packet_number_offset,
3685 crypto_data,
3686 })
3687 }
3688
3689 pub fn build_client_application_stream_packet(
3690 &mut self,
3691 stream_id: u64,
3692 data: Bytes,
3693 fin: bool,
3694 ) -> Result<Option<ClientApplicationPacket>> {
3695 if data.is_empty() && !fin {
3696 return Ok(None);
3697 }
3698 let stream_offset = *self.client_stream_offsets.get(&stream_id).unwrap_or(&0);
3699 self.client_application_flow_control.consume_stream_data(
3700 stream_id,
3701 stream_offset,
3702 data.len(),
3703 )?;
3704 let packet = self.build_client_application_stream_packet_at_offset(
3705 stream_id,
3706 stream_offset,
3707 data,
3708 fin,
3709 )?;
3710 self.client_stream_offsets
3711 .insert(stream_id, stream_offset + packet.data.len() as u64);
3712 Ok(Some(packet))
3713 }
3714
3715 pub fn build_client_h3_zero_rtt_request_packet(
3716 &mut self,
3717 method: &http::Method,
3718 uri: &http::Uri,
3719 headers: impl Into<Headers>,
3720 body: Option<Bytes>,
3721 ) -> Result<ClientApplicationPacket> {
3722 let headers = headers.into();
3723 let stream_id = self.next_client_bidirectional_stream_id;
3724 let h3_headers = native::build_request_headers(method, uri, &headers)?;
3725 let payload =
3726 native::encode_request_stream_with_fingerprint(&h3_headers, body, &self.fingerprint);
3727
3728 let packet = self
3729 .build_client_zero_rtt_stream_packet(stream_id, payload, true)?
3730 .ok_or_else(|| {
3731 Error::HttpProtocol("native H3 0-RTT request produced no payload".into())
3732 })?;
3733 self.next_client_bidirectional_stream_id += 4;
3734 Ok(packet)
3735 }
3736
3737 pub fn build_client_h3_replay_request_packet(
3738 &mut self,
3739 stream_id: u64,
3740 method: &http::Method,
3741 uri: &http::Uri,
3742 headers: impl Into<Headers>,
3743 body: Option<Bytes>,
3744 ) -> Result<ClientApplicationPacket> {
3745 let headers = headers.into();
3746 let h3_headers = native::build_request_headers(method, uri, &headers)?;
3747 let payload =
3748 native::encode_request_stream_with_fingerprint(&h3_headers, body, &self.fingerprint);
3749 let payload_len = payload.len() as u64;
3750 let packet =
3751 self.build_client_application_stream_packet_at_offset(stream_id, 0, payload, true)?;
3752 self.client_stream_offsets.insert(stream_id, payload_len);
3753 Ok(packet)
3754 }
3755
3756 fn build_client_zero_rtt_stream_packet(
3757 &mut self,
3758 stream_id: u64,
3759 data: Bytes,
3760 fin: bool,
3761 ) -> Result<Option<ClientApplicationPacket>> {
3762 if data.is_empty() && !fin {
3763 return Ok(None);
3764 }
3765 let stream_offset = *self.client_stream_offsets.get(&stream_id).unwrap_or(&0);
3766 self.client_application_flow_control.consume_stream_data(
3767 stream_id,
3768 stream_offset,
3769 data.len(),
3770 )?;
3771 let Some(client_early_data_keys) = &self.client_early_data_keys else {
3772 return Err(Error::Quic(
3773 "native 0-RTT packet encryption is waiting for TLS early-data keys".into(),
3774 ));
3775 };
3776
3777 let packet_number = self.next_client_application_packet_number;
3778 let packet_number_len = 2;
3779 let frame = encode_frame(&QuicFrame::Stream {
3780 stream_id,
3781 offset: (stream_offset > 0).then_some(stream_offset),
3782 fin,
3783 data: data.clone(),
3784 });
3785 let header = encode_long_header(&LongHeaderPacket {
3786 packet_type: LongHeaderType::ZeroRtt,
3787 version: QUIC_VERSION_1,
3788 destination_cid: self.destination_cid.clone(),
3789 source_cid: self.source_cid.clone(),
3790 token: Bytes::new(),
3791 packet_number,
3792 packet_number_len,
3793 payload_len: frame.len() + AES_GCM_TAG_LEN,
3794 })?;
3795 let packet_number_offset = header
3796 .len()
3797 .checked_sub(packet_number_len)
3798 .ok_or_else(|| Error::HttpProtocol("invalid QUIC 0-RTT header length".into()))?;
3799 let packet = protect_long_header_packet(
3800 client_early_data_keys,
3801 packet_number,
3802 &header,
3803 packet_number_offset,
3804 packet_number_len,
3805 &frame,
3806 )?;
3807
3808 let now = Instant::now();
3809 let packet_size = packet.len();
3810 self.client_application_loss_detector
3811 .on_packet_sent_at(packet_number, now);
3812 self.client_application_sent_streams.insert(
3813 packet_number,
3814 SentApplicationStreamPacket {
3815 stream_id,
3816 stream_offset,
3817 fin,
3818 data: data.clone(),
3819 },
3820 );
3821 self.recovery.on_packet_sent(
3822 PacketNumberSpace::Application,
3823 packet_number,
3824 SentPacketInfo::new(now, packet_size, true, true),
3825 );
3826 self.next_client_application_packet_number += 1;
3827 self.client_stream_offsets
3828 .insert(stream_id, stream_offset + data.len() as u64);
3829
3830 Ok(Some(ClientApplicationPacket {
3831 packet,
3832 packet_number,
3833 stream_id,
3834 packet_number_offset,
3835 data,
3836 }))
3837 }
3838
3839 fn build_client_application_stream_packet_at_offset(
3840 &mut self,
3841 stream_id: u64,
3842 stream_offset: u64,
3843 data: Bytes,
3844 fin: bool,
3845 ) -> Result<ClientApplicationPacket> {
3846 let Some(client_application_keys) = &self.client_application_keys else {
3847 return Err(Error::Quic(
3848 "native application packet encryption is waiting for TLS application keys".into(),
3849 ));
3850 };
3851
3852 let packet_number = self.next_client_application_packet_number;
3853 let packet_number_len = 2;
3854 let frame = encode_frame(&QuicFrame::Stream {
3855 stream_id,
3856 offset: (stream_offset > 0).then_some(stream_offset),
3857 fin,
3858 data: data.clone(),
3859 });
3860 let packet = protect_short_header_packet(
3861 client_application_keys,
3862 &self.destination_cid,
3863 packet_number,
3864 packet_number_len,
3865 self.write_key_phase,
3866 &frame,
3867 )?;
3868
3869 let now = Instant::now();
3870 let packet_size = packet.len();
3871 self.client_application_loss_detector
3872 .on_packet_sent_at(packet_number, now);
3873 self.client_application_sent_streams.insert(
3874 packet_number,
3875 SentApplicationStreamPacket {
3876 stream_id,
3877 stream_offset,
3878 fin,
3879 data: data.clone(),
3880 },
3881 );
3882 self.recovery.on_packet_sent(
3883 PacketNumberSpace::Application,
3884 packet_number,
3885 SentPacketInfo::new(now, packet_size, true, true),
3886 );
3887 self.next_client_application_packet_number += 1;
3888
3889 Ok(ClientApplicationPacket {
3890 packet,
3891 packet_number,
3892 stream_id,
3893 packet_number_offset: 1 + self.destination_cid.as_bytes().len(),
3894 data,
3895 })
3896 }
3897
3898 pub fn build_client_h3_preface_packets(
3899 &mut self,
3900 fingerprint: &Http3Fingerprint,
3901 ) -> Result<Vec<ClientApplicationPacket>> {
3902 if self.client_application_keys.is_none() {
3903 return Err(Error::Quic(
3904 "native application packet encryption is waiting for TLS application keys".into(),
3905 ));
3906 }
3907
3908 let mut packets = Vec::new();
3909 for stream in native::encode_client_preface_streams(fingerprint) {
3910 let stream_id = self.next_client_unidirectional_stream_id;
3911 self.next_client_unidirectional_stream_id += 4;
3912 let payload = native::encode_unidirectional_stream(&stream);
3913 if let Some(packet) =
3914 self.build_client_application_stream_packet(stream_id, payload, false)?
3915 {
3916 packets.push(packet);
3917 }
3918 }
3919 Ok(packets)
3920 }
3921
3922 pub fn build_client_h3_request_packet(
3923 &mut self,
3924 method: &http::Method,
3925 uri: &http::Uri,
3926 headers: impl Into<Headers>,
3927 body: Option<Bytes>,
3928 ) -> Result<ClientApplicationPacket> {
3929 let headers = headers.into();
3930 if self.client_application_keys.is_none() {
3931 return Err(Error::Quic(
3932 "native application packet encryption is waiting for TLS application keys".into(),
3933 ));
3934 }
3935
3936 let stream_id = self.next_client_bidirectional_stream_id;
3937 let payload = self.encode_client_h3_request_payload(method, uri, &headers, body)?;
3938
3939 let packet = self
3940 .build_client_application_stream_packet(stream_id, payload, true)?
3941 .ok_or_else(|| Error::HttpProtocol("native H3 request produced no payload".into()))?;
3942 self.next_client_bidirectional_stream_id += 4;
3943 Ok(packet)
3944 }
3945
3946 pub fn build_client_h3_request_start_packet(
3947 &mut self,
3948 method: &http::Method,
3949 uri: &http::Uri,
3950 headers: impl Into<Headers>,
3951 body: Option<Bytes>,
3952 fin: bool,
3953 ) -> Result<ClientApplicationPacket> {
3954 let headers = headers.into();
3955 if self.client_application_keys.is_none() {
3956 return Err(Error::Quic(
3957 "native application packet encryption is waiting for TLS application keys".into(),
3958 ));
3959 }
3960
3961 let stream_id = self.next_client_bidirectional_stream_id;
3962 let payload = self.encode_client_h3_request_payload(method, uri, &headers, body)?;
3963
3964 let packet = self
3965 .build_client_application_stream_packet(stream_id, payload, fin)?
3966 .ok_or_else(|| {
3967 Error::HttpProtocol("native H3 request start produced no payload".into())
3968 })?;
3969 self.next_client_bidirectional_stream_id += 4;
3970 Ok(packet)
3971 }
3972
3973 pub fn retire_client_application_packet(&mut self, packet_number: u64) {
3974 self.client_application_loss_detector
3975 .retire_packet(packet_number);
3976 self.client_application_sent_streams.remove(&packet_number);
3977 }
3978
3979 fn encode_client_h3_request_payload(
3980 &self,
3981 method: &http::Method,
3982 uri: &http::Uri,
3983 headers: &Headers,
3984 body: Option<Bytes>,
3985 ) -> Result<Bytes> {
3986 let h3_headers = native::build_request_headers(method, uri, headers)?;
3987 Ok(native::encode_request_stream_with_fingerprint(
3988 &h3_headers,
3989 body,
3990 &self.fingerprint,
3991 ))
3992 }
3993
3994 pub fn build_client_h3_websocket_connect_packet(
3995 &mut self,
3996 uri: &http::Uri,
3997 headers: impl Into<Headers>,
3998 ) -> Result<ClientApplicationPacket> {
3999 let headers = headers.into();
4000 if self.client_application_keys.is_none() {
4001 return Err(Error::Quic(
4002 "native application packet encryption is waiting for TLS application keys".into(),
4003 ));
4004 }
4005
4006 let stream_id = self.next_client_bidirectional_stream_id;
4007 let h3_headers = native::build_websocket_connect_headers(uri, &headers)?;
4008 let payload =
4009 native::encode_request_stream_with_fingerprint(&h3_headers, None, &self.fingerprint);
4010
4011 let packet = self
4012 .build_client_application_stream_packet(stream_id, payload, false)?
4013 .ok_or_else(|| Error::HttpProtocol("native H3 CONNECT produced no payload".into()))?;
4014 self.next_client_bidirectional_stream_id += 4;
4015 Ok(packet)
4016 }
4017
4018 pub fn build_client_h3_data_packet(
4019 &mut self,
4020 stream_id: u64,
4021 data: Bytes,
4022 fin: bool,
4023 ) -> Result<Option<ClientApplicationPacket>> {
4024 let payload = if data.is_empty() {
4025 Bytes::new()
4026 } else {
4027 native::encode_frame(&native::H3Frame::Data(data))
4028 };
4029 self.build_client_application_stream_packet(stream_id, payload, fin)
4030 }
4031
4032 pub fn build_client_reset_stream_packet(
4033 &mut self,
4034 stream_id: u64,
4035 error_code: u64,
4036 ) -> Result<ClientApplicationControlPacket> {
4037 let final_size = *self.client_stream_offsets.get(&stream_id).unwrap_or(&0);
4038 self.build_client_application_control_packet(QuicFrame::ResetStream {
4039 stream_id,
4040 error_code,
4041 final_size,
4042 })
4043 }
4044
4045 pub fn build_client_stop_sending_packet(
4046 &mut self,
4047 stream_id: u64,
4048 error_code: u64,
4049 ) -> Result<ClientApplicationControlPacket> {
4050 self.build_client_application_control_packet(QuicFrame::StopSending {
4051 stream_id,
4052 error_code,
4053 })
4054 }
4055
4056 pub fn build_client_path_response_packet(
4057 &mut self,
4058 data: [u8; 8],
4059 ) -> Result<ClientApplicationControlPacket> {
4060 self.build_client_application_control_packet(QuicFrame::PathResponse(data))
4061 }
4062
4063 pub fn build_client_new_connection_id_packet(
4064 &mut self,
4065 sequence_number: u64,
4066 retire_prior_to: u64,
4067 connection_id: ConnectionId,
4068 stateless_reset_token: [u8; 16],
4069 ) -> Result<ClientApplicationControlPacket> {
4070 if connection_id.as_bytes().is_empty() {
4071 return Err(Error::Quic(
4072 "native QUIC NEW_CONNECTION_ID cannot carry an empty connection id".into(),
4073 ));
4074 }
4075 if retire_prior_to > sequence_number {
4076 return Err(Error::Quic(
4077 "native QUIC NEW_CONNECTION_ID retire_prior_to exceeds sequence_number".into(),
4078 ));
4079 }
4080 self.client_cid_inventory.register_local_issued(
4081 sequence_number,
4082 connection_id.clone(),
4083 stateless_reset_token,
4084 )?;
4085 self.build_client_application_control_packet(QuicFrame::NewConnectionId {
4086 sequence_number,
4087 retire_prior_to,
4088 connection_id: Bytes::copy_from_slice(connection_id.as_bytes()),
4089 stateless_reset_token,
4090 })
4091 }
4092
4093 pub fn build_client_path_challenge_packet(
4094 &mut self,
4095 data: [u8; 8],
4096 ) -> Result<ClientApplicationControlPacket> {
4097 let frame = self.client_path_validator.path_challenge(data);
4098 self.build_client_application_control_packet(frame)
4099 }
4100
4101 pub fn build_client_path_challenge_packet_for_address(
4102 &mut self,
4103 remote_address: SocketAddr,
4104 connection_id_sequence: u64,
4105 data: [u8; 8],
4106 ) -> Result<ClientApplicationControlPacket> {
4107 let destination_cid = self
4108 .client_path_validator
4109 .connection_id(connection_id_sequence)
4110 .cloned()
4111 .ok_or_else(|| {
4112 Error::Quic("native QUIC path migration requires an available connection id".into())
4113 })?;
4114 let frame = self.client_path_validator.path_challenge_for_address(
4115 remote_address,
4116 connection_id_sequence,
4117 data,
4118 )?;
4119 self.build_client_application_control_packet_to(frame, &destination_cid)
4120 }
4121
4122 pub fn build_client_pmtu_probe_packet(
4123 &mut self,
4124 now: Instant,
4125 ) -> Result<Option<ClientApplicationControlPacket>> {
4126 let Some(target_size) = self.client_pmtu_probe.next_probe_size() else {
4127 return Ok(None);
4128 };
4129 let packet = self.build_client_application_probe_packet(target_size, now)?;
4130 self.client_pmtu_probe
4131 .on_probe_sent(packet.packet_number, packet.packet.len(), now);
4132 Ok(Some(packet))
4133 }
4134
4135 pub fn build_client_connection_close_packet(
4136 &mut self,
4137 error_code: u64,
4138 reason: Bytes,
4139 ) -> Result<ClientApplicationControlPacket> {
4140 let packet = self.build_client_application_control_packet(QuicFrame::ConnectionClose {
4141 error_code,
4142 frame_type: None,
4143 reason,
4144 })?;
4145 self.client_enter_closing(Instant::now());
4150 Ok(packet)
4151 }
4152
4153 pub fn build_client_max_data_packet(
4154 &mut self,
4155 max_data: u64,
4156 ) -> Result<ClientApplicationControlPacket> {
4157 self.build_client_application_control_packet(QuicFrame::MaxData(max_data))
4158 }
4159
4160 pub fn build_client_max_stream_data_packet(
4161 &mut self,
4162 stream_id: u64,
4163 max_stream_data: u64,
4164 ) -> Result<ClientApplicationControlPacket> {
4165 self.build_client_application_control_packet(QuicFrame::MaxStreamData {
4166 stream_id,
4167 max_stream_data,
4168 })
4169 }
4170
4171 pub fn build_client_max_streams_packet(
4172 &mut self,
4173 bidirectional: bool,
4174 max_streams: u64,
4175 ) -> Result<ClientApplicationControlPacket> {
4176 self.build_client_application_control_packet(QuicFrame::MaxStreams {
4177 bidirectional,
4178 max_streams,
4179 })
4180 }
4181
4182 pub fn build_client_flow_control_blocked_packet(
4183 &mut self,
4184 ) -> Result<Option<ClientApplicationControlPacket>> {
4185 self.client_application_flow_control
4186 .take_blocked_frame()
4187 .map(|frame| self.build_client_application_control_packet(frame))
4188 .transpose()
4189 }
4190
4191 pub fn build_client_receive_flow_control_update_packets(
4192 &mut self,
4193 ) -> Result<Vec<ClientApplicationControlPacket>> {
4194 self.client_application_receive_flow_control
4195 .take_update_frames()
4196 .into_iter()
4197 .map(|frame| self.build_client_application_control_packet(frame))
4198 .collect()
4199 }
4200
4201 pub fn record_client_stream_consumed(&mut self, stream_id: u64, len: u64) -> Result<()> {
4206 self.client_application_receive_flow_control
4207 .record_stream_consumed(stream_id, len)
4208 }
4209
4210 pub fn release_client_stream(&mut self, stream_id: u64) {
4211 self.client_application_receive_flow_control
4212 .release_stream(stream_id);
4213 }
4214
4215 fn build_client_application_control_packet(
4216 &mut self,
4217 frame: QuicFrame,
4218 ) -> Result<ClientApplicationControlPacket> {
4219 let destination_cid = self.destination_cid.clone();
4220 self.build_client_application_control_packet_to(frame, &destination_cid)
4221 }
4222
4223 fn build_client_application_control_packet_to(
4224 &mut self,
4225 frame: QuicFrame,
4226 destination_cid: &ConnectionId,
4227 ) -> Result<ClientApplicationControlPacket> {
4228 self.build_client_application_payload_packet_at_to(
4229 padded_short_header_payload(encode_frame(&frame)),
4230 Instant::now(),
4231 destination_cid,
4232 )
4233 }
4234
4235 fn build_client_application_probe_packet(
4236 &mut self,
4237 target_size: usize,
4238 now: Instant,
4239 ) -> Result<ClientApplicationControlPacket> {
4240 let Some(_client_application_keys) = &self.client_application_keys else {
4241 return Err(Error::Quic(
4242 "native application packet encryption is waiting for TLS application keys".into(),
4243 ));
4244 };
4245 let packet_number_len = 2;
4246 let header_len = 1 + self.destination_cid.as_bytes().len() + packet_number_len;
4247 let tag_len = AES_GCM_TAG_LEN;
4248 let target_payload_len = target_size.saturating_sub(header_len + tag_len);
4249 let mut payload = encode_frame(&QuicFrame::Ping).to_vec();
4250 payload.resize(target_payload_len.max(payload.len()), 0);
4251 self.build_client_application_payload_packet_at(Bytes::from(payload), now)
4252 }
4253
4254 fn build_client_application_payload_packet_at(
4255 &mut self,
4256 payload: Bytes,
4257 now: Instant,
4258 ) -> Result<ClientApplicationControlPacket> {
4259 let destination_cid = self.destination_cid.clone();
4260 self.build_client_application_payload_packet_at_to(payload, now, &destination_cid)
4261 }
4262
4263 fn build_client_application_payload_packet_at_to(
4264 &mut self,
4265 payload: Bytes,
4266 now: Instant,
4267 destination_cid: &ConnectionId,
4268 ) -> Result<ClientApplicationControlPacket> {
4269 let Some(client_application_keys) = &self.client_application_keys else {
4270 return Err(Error::Quic(
4271 "native application packet encryption is waiting for TLS application keys".into(),
4272 ));
4273 };
4274
4275 let packet_number = self.next_client_application_packet_number;
4276 let packet_number_len = 2;
4277 let packet = protect_short_header_packet(
4278 client_application_keys,
4279 destination_cid,
4280 packet_number,
4281 packet_number_len,
4282 self.write_key_phase,
4283 &payload,
4284 )?;
4285 let packet_size = packet.len();
4286 self.client_application_loss_detector
4287 .on_packet_sent_at(packet_number, now);
4288 self.recovery.on_packet_sent(
4289 PacketNumberSpace::Application,
4290 packet_number,
4291 SentPacketInfo::new(now, packet_size, true, true),
4292 );
4293 self.next_client_application_packet_number += 1;
4294
4295 Ok(ClientApplicationControlPacket {
4296 packet,
4297 packet_number,
4298 packet_number_offset: 1 + destination_cid.as_bytes().len(),
4299 })
4300 }
4301
4302 pub fn open_server_application_packet(&mut self, packet: &[u8]) -> Result<Vec<QuicFrame>> {
4303 self.open_server_application_packet_with_ecn(packet, None)
4304 }
4305
4306 pub fn open_server_application_packet_with_ecn(
4307 &mut self,
4308 packet: &[u8],
4309 ecn_mark: Option<QuicEcnMark>,
4310 ) -> Result<Vec<QuicFrame>> {
4311 self.open_server_application_packet_with_path(packet, None, ecn_mark)
4312 }
4313
4314 fn open_server_application_packet_with_path(
4315 &mut self,
4316 packet: &[u8],
4317 remote_address: Option<SocketAddr>,
4318 ecn_mark: Option<QuicEcnMark>,
4319 ) -> Result<Vec<QuicFrame>> {
4320 if self.close_state.is_draining() {
4324 return Ok(Vec::new());
4325 }
4326 let Some(server_application_keys) = self.server_application_keys.as_ref() else {
4327 return Err(Error::Quic(
4328 "native application packet decryption is waiting for TLS application keys".into(),
4329 ));
4330 };
4331 let now = Instant::now();
4332 let opened = try_open_one_rtt_packet(
4333 server_application_keys,
4334 self.server_application_next_keys.as_ref(),
4335 self.server_application_previous.as_ref(),
4336 self.read_key_phase,
4337 now,
4338 packet,
4339 self.source_cid.as_bytes().len(),
4340 self.next_server_application_packet_number,
4341 )?;
4342 if matches!(opened.outcome, OneRttOpenOutcome::Next) {
4343 self.commit_receive_key_update(now)?;
4344 }
4345 let opened = opened.opened;
4346 self.next_server_application_packet_number = opened.packet_number + 1;
4347 let frames = decode_frames(&opened.payload)?;
4348 for frame in &frames {
4349 if let QuicFrame::Stream {
4350 stream_id,
4351 offset,
4352 data,
4353 ..
4354 } = frame
4355 {
4356 self.client_application_receive_flow_control
4357 .observe_stream_frame(*stream_id, *offset, data.len())?;
4358 }
4359 for packet_number in self.client_application_loss_detector.on_ack_frame(frame)? {
4360 self.client_application_sent_streams.remove(&packet_number);
4361 self.application_key_update.note_packet_acked(packet_number);
4362 self.client_pmtu_probe.on_probe_acked(packet_number);
4363 }
4364 if matches!(frame, QuicFrame::Ack { .. } | QuicFrame::AckEcn { .. }) {
4365 let outcome = self.recovery.on_ack_received(
4366 PacketNumberSpace::Application,
4367 frame,
4368 self.fingerprint.transport.ack_delay_exponent,
4369 now,
4370 )?;
4371 for (packet_number, _) in outcome.newly_acked {
4372 self.client_application_sent_streams.remove(&packet_number);
4373 self.application_key_update.note_packet_acked(packet_number);
4374 self.client_pmtu_probe.on_probe_acked(packet_number);
4375 }
4376 for (packet_number, _) in &outcome.lost {
4377 self.client_pmtu_probe.on_probe_lost(*packet_number);
4378 }
4379 self.client_application_recovery_lost_packets.extend(
4380 outcome
4381 .lost
4382 .into_iter()
4383 .map(|(packet_number, _)| packet_number),
4384 );
4385 if outcome.ecn_congestion {
4386 self.client_application_ecn_congestion = true;
4387 }
4388 }
4389 match frame {
4390 QuicFrame::MaxData(max_data) => {
4391 self.client_application_flow_control
4392 .apply_max_data(*max_data);
4393 }
4394 QuicFrame::MaxStreamData {
4395 stream_id,
4396 max_stream_data,
4397 } => self
4398 .client_application_flow_control
4399 .apply_max_stream_data(*stream_id, *max_stream_data),
4400 QuicFrame::MaxStreams {
4401 bidirectional,
4402 max_streams,
4403 } => self
4404 .client_application_flow_control
4405 .apply_max_streams(*bidirectional, *max_streams),
4406 QuicFrame::NewConnectionId {
4407 sequence_number,
4408 retire_prior_to,
4409 connection_id,
4410 stateless_reset_token,
4411 } => self.client_path_validator.register_connection_id(
4412 *sequence_number,
4413 *retire_prior_to,
4414 ConnectionId::from_bytes(connection_id.clone())?,
4415 *stateless_reset_token,
4416 )?,
4417 QuicFrame::PathResponse(data) => {
4418 if let Some(remote_address) = remote_address {
4419 if self
4420 .client_path_validator
4421 .on_path_response_from(remote_address, *data)
4422 {
4423 if let Some(connection_id) = self
4424 .client_path_validator
4425 .migration_connection_id(&remote_address)
4426 .cloned()
4427 {
4428 self.destination_cid = connection_id;
4429 }
4430 }
4431 } else {
4432 self.client_path_validator.on_path_response(*data);
4433 }
4434 }
4435 _ => {}
4436 }
4437 }
4438 if frames.iter().any(is_ack_eliciting_quic_frame) {
4439 observe_packet_with_ecn(
4440 &mut self.application_ack_tracker,
4441 opened.packet_number,
4442 ecn_mark,
4443 now,
4444 );
4445 }
4446 Ok(frames.into_iter().filter(is_not_padding_frame).collect())
4447 }
4448
4449 pub fn open_server_h3_stream_packet(
4450 &mut self,
4451 packet: &[u8],
4452 ) -> Result<Vec<ServerH3StreamEvent>> {
4453 Ok(self
4454 .open_server_h3_event_packet(packet)?
4455 .into_iter()
4456 .filter_map(|event| match event {
4457 ServerH3Event::Stream(event) => Some(event),
4458 ServerH3Event::ResetStream { .. }
4459 | ServerH3Event::StopSending { .. }
4460 | ServerH3Event::ConnectionClose { .. }
4461 | ServerH3Event::PathChallenge(_) => None,
4462 })
4463 .collect())
4464 }
4465
4466 pub fn open_server_h3_event_packet(&mut self, packet: &[u8]) -> Result<Vec<ServerH3Event>> {
4467 self.open_server_h3_event_packet_with_ecn(packet, None)
4468 }
4469
4470 pub fn open_server_h3_event_packet_from(
4471 &mut self,
4472 packet: &[u8],
4473 remote_address: SocketAddr,
4474 ) -> Result<Vec<ServerH3Event>> {
4475 self.open_server_h3_event_packet_with_path_ecn(packet, Some(remote_address), None)
4476 }
4477
4478 pub fn open_server_h3_event_packet_from_with_ecn(
4479 &mut self,
4480 packet: &[u8],
4481 remote_address: SocketAddr,
4482 ecn_mark: Option<QuicEcnMark>,
4483 ) -> Result<Vec<ServerH3Event>> {
4484 self.open_server_h3_event_packet_with_path_ecn(packet, Some(remote_address), ecn_mark)
4485 }
4486
4487 pub fn open_server_h3_event_packet_with_ecn(
4488 &mut self,
4489 packet: &[u8],
4490 ecn_mark: Option<QuicEcnMark>,
4491 ) -> Result<Vec<ServerH3Event>> {
4492 self.open_server_h3_event_packet_with_path_ecn(packet, None, ecn_mark)
4493 }
4494
4495 fn open_server_h3_event_packet_with_path_ecn(
4496 &mut self,
4497 packet: &[u8],
4498 remote_address: Option<SocketAddr>,
4499 ecn_mark: Option<QuicEcnMark>,
4500 ) -> Result<Vec<ServerH3Event>> {
4501 if self.close_state.is_draining() {
4507 return Ok(Vec::new());
4508 }
4509 let mut events = Vec::new();
4510 for frame in
4511 self.open_server_application_packet_with_path(packet, remote_address, ecn_mark)?
4512 {
4513 match frame {
4514 QuicFrame::Stream {
4515 stream_id,
4516 offset,
4517 fin,
4518 data,
4519 ..
4520 } => {
4521 if let Some(event) =
4522 self.apply_server_quic_stream_frame(stream_id, offset, fin, data)?
4523 {
4524 events.push(ServerH3Event::Stream(event));
4525 }
4526 }
4527 QuicFrame::ResetStream {
4528 stream_id,
4529 error_code,
4530 final_size,
4531 } => events.push(ServerH3Event::ResetStream {
4532 stream_id,
4533 error_code,
4534 final_size,
4535 }),
4536 QuicFrame::StopSending {
4537 stream_id,
4538 error_code,
4539 } => events.push(ServerH3Event::StopSending {
4540 stream_id,
4541 error_code,
4542 }),
4543 QuicFrame::ConnectionClose {
4544 error_code,
4545 frame_type,
4546 reason,
4547 } => {
4548 self.close_draining = true;
4553 self.close_state.enter_draining(Instant::now());
4554 events.push(ServerH3Event::ConnectionClose {
4555 error_code,
4556 frame_type,
4557 reason,
4558 });
4559 }
4560 QuicFrame::PathChallenge(data) => events.push(ServerH3Event::PathChallenge(data)),
4561 QuicFrame::PathResponse(_) => {}
4562 QuicFrame::Padding
4563 | QuicFrame::Ping
4564 | QuicFrame::Ack { .. }
4565 | QuicFrame::AckEcn { .. }
4566 | QuicFrame::Crypto { .. }
4567 | QuicFrame::MaxData(_)
4568 | QuicFrame::MaxStreamData { .. }
4569 | QuicFrame::MaxStreams { .. }
4570 | QuicFrame::DataBlocked { .. }
4571 | QuicFrame::StreamDataBlocked { .. }
4572 | QuicFrame::StreamsBlocked { .. }
4573 | QuicFrame::NewConnectionId { .. }
4574 | QuicFrame::RetireConnectionId { .. }
4575 | QuicFrame::HandshakeDone => {}
4576 }
4577 }
4578 Ok(events)
4579 }
4580
4581 fn apply_server_quic_stream_frame(
4582 &mut self,
4583 stream_id: u64,
4584 offset: Option<u64>,
4585 fin: bool,
4586 data: Bytes,
4587 ) -> Result<Option<ServerH3StreamEvent>> {
4588 apply_h3_stream_frame(
4589 &mut self.server_h3_stream_buffers,
4590 &mut self.server_h3_stream_buffer_offsets,
4591 &mut self.server_h3_stream_types,
4592 stream_id,
4593 offset,
4594 fin,
4595 data,
4596 )
4597 }
4598
4599 pub fn process_server_datagram(
4600 &mut self,
4601 datagram: &[u8],
4602 ) -> Result<Vec<ProcessedServerInitial>> {
4603 self.process_server_datagram_with_ecn(datagram, None)
4604 }
4605
4606 pub fn process_server_datagram_with_ecn(
4607 &mut self,
4608 datagram: &[u8],
4609 ecn_mark: Option<QuicEcnMark>,
4610 ) -> Result<Vec<ProcessedServerInitial>> {
4611 if is_version_negotiation_datagram(datagram) {
4612 return self.process_version_negotiation_datagram(datagram);
4613 }
4614
4615 let mut processed = Vec::new();
4616 for packet in split_long_header_datagram(datagram)? {
4617 match packet.packet_type {
4618 LongHeaderType::Initial => {
4619 let opened = open_long_header_packet(
4620 &self.server_initial_keys,
4621 &packet.packet,
4622 packet.packet_number_offset,
4623 self.next_server_initial_packet_number,
4624 )?;
4625 self.destination_cid = packet.source_cid.clone();
4626 self.server_initial_or_handshake_seen = true;
4627 observe_packet_with_ecn(
4628 &mut self.initial_ack_tracker,
4629 opened.packet_number,
4630 ecn_mark,
4631 Instant::now(),
4632 );
4633 self.next_server_initial_packet_number = opened.packet_number + 1;
4634
4635 for frame in decode_frames(&opened.payload)? {
4636 for packet_number in
4637 self.client_initial_loss_detector.on_ack_frame(&frame)?
4638 {
4639 self.client_initial_sent_crypto.remove(&packet_number);
4640 }
4641 let outcome = self.recovery.on_ack_received(
4642 PacketNumberSpace::Initial,
4643 &frame,
4644 self.fingerprint.transport.ack_delay_exponent,
4645 Instant::now(),
4646 )?;
4647 for (packet_number, _) in outcome.newly_acked {
4648 self.client_initial_sent_crypto.remove(&packet_number);
4649 }
4650 if let QuicFrame::Crypto { offset, data } = frame {
4651 self.initial_crypto.insert(offset, data)?;
4652 }
4653 }
4654
4655 let crypto_data = self.initial_crypto.take_contiguous();
4656 if crypto_data.is_empty() {
4657 continue;
4658 }
4659
4660 self.tls
4661 .provide_crypto(QuicEncryptionLevel::Initial, &crypto_data)?;
4662 let secrets = self.tls.secrets();
4663 self.install_tls_secrets(&secrets)?;
4664 self.validate_server_transport_parameters_if_available()?;
4665 processed.push(ProcessedServerInitial {
4666 packet_number: opened.packet_number,
4667 crypto_data,
4668 initial_crypto_out: self.tls.take_crypto(QuicEncryptionLevel::Initial),
4669 handshake_crypto_out: self.tls.take_crypto(QuicEncryptionLevel::Handshake),
4670 secrets,
4671 });
4672 }
4673 LongHeaderType::Handshake => {
4674 let Some(server_handshake_keys) = &self.server_handshake_keys else {
4675 return Err(Error::Quic(
4676 "native Handshake packet decryption is waiting for TLS Handshake keys"
4677 .into(),
4678 ));
4679 };
4680 let opened = open_long_header_packet(
4681 server_handshake_keys,
4682 &packet.packet,
4683 packet.packet_number_offset,
4684 self.next_server_handshake_packet_number,
4685 )?;
4686 self.server_initial_or_handshake_seen = true;
4687 observe_packet_with_ecn(
4688 &mut self.handshake_ack_tracker,
4689 opened.packet_number,
4690 ecn_mark,
4691 Instant::now(),
4692 );
4693 self.next_server_handshake_packet_number = opened.packet_number + 1;
4694
4695 for frame in decode_frames(&opened.payload)? {
4696 for packet_number in
4697 self.client_handshake_loss_detector.on_ack_frame(&frame)?
4698 {
4699 self.client_handshake_sent_crypto.remove(&packet_number);
4700 }
4701 let outcome = self.recovery.on_ack_received(
4702 PacketNumberSpace::Handshake,
4703 &frame,
4704 self.fingerprint.transport.ack_delay_exponent,
4705 Instant::now(),
4706 )?;
4707 for (packet_number, _) in outcome.newly_acked {
4708 self.client_handshake_sent_crypto.remove(&packet_number);
4709 }
4710 if let QuicFrame::Crypto { offset, data } = frame {
4711 self.handshake_crypto.insert(offset, data)?;
4712 }
4713 }
4714
4715 let crypto_data = self.handshake_crypto.take_contiguous();
4716 if !crypto_data.is_empty() {
4717 self.tls
4718 .provide_crypto(QuicEncryptionLevel::Handshake, &crypto_data)?;
4719 let secrets = self.tls.secrets();
4720 self.install_tls_secrets(&secrets)?;
4721 self.validate_server_transport_parameters_if_available()?;
4722 let handshake_crypto_out =
4723 self.tls.take_crypto(QuicEncryptionLevel::Handshake);
4724 if !handshake_crypto_out.is_empty() {
4725 processed.push(ProcessedServerInitial {
4726 packet_number: opened.packet_number,
4727 crypto_data,
4728 initial_crypto_out: Bytes::new(),
4729 handshake_crypto_out,
4730 secrets,
4731 });
4732 }
4733 }
4734 }
4735 LongHeaderType::Retry => {
4736 self.process_retry_packet(packet.packet.as_ref())?;
4737 }
4738 LongHeaderType::ZeroRtt => {}
4739 }
4740 }
4741
4742 Ok(processed)
4743 }
4744
4745 fn process_version_negotiation_datagram(
4746 &mut self,
4747 datagram: &[u8],
4748 ) -> Result<Vec<ProcessedServerInitial>> {
4749 let packet = decode_version_negotiation_packet(datagram)?;
4750 if packet.destination_cid != self.source_cid || packet.source_cid != self.destination_cid {
4751 return Ok(Vec::new());
4752 }
4753 if self.vn_received {
4754 return Ok(Vec::new());
4755 }
4756 if packet
4757 .supported_versions
4758 .contains(&self.client_initial_version)
4759 {
4760 return Ok(Vec::new());
4761 }
4762
4763 let chosen_version = self
4764 .supported_versions
4765 .iter()
4766 .copied()
4767 .find(|version| packet.supported_versions.contains(version));
4768 let Some(chosen_version) = chosen_version else {
4769 return Err(Error::Quic(format!(
4770 "version_negotiation_failed: native H3 server did not offer QUIC version 1 or any other version we support (offered {:?})",
4771 packet.supported_versions,
4772 )));
4773 };
4774
4775 self.restart_for_version_negotiation(chosen_version)?;
4776 Ok(Vec::new())
4777 }
4778
4779 fn process_retry_packet(&mut self, retry_packet: &[u8]) -> Result<()> {
4780 if self.retry_received {
4781 return Ok(());
4782 }
4783 if self.server_initial_or_handshake_seen {
4784 return Ok(());
4785 }
4786
4787 let retry =
4788 match validate_retry_integrity_tag_v1(&self.original_destination_cid, retry_packet) {
4789 Ok(retry) => retry,
4790 Err(_) => return Ok(()),
4791 };
4792 if retry.destination_cid != self.source_cid {
4793 return Ok(());
4794 }
4795 if retry.source_cid.as_bytes() == self.original_destination_cid.as_bytes() {
4796 return Ok(());
4797 }
4798 if retry.token.is_empty() {
4799 return Ok(());
4800 }
4801
4802 let retry_keys = derive_initial_key_material(retry.source_cid.as_bytes())?;
4803 let packet_number = self.next_client_initial_packet_number;
4804 let retry_initial = build_client_initial_packet_with_token_and_version(
4805 &self.fingerprint,
4806 self.client_initial.crypto_data.clone(),
4807 self.client_initial.transport_parameters.clone(),
4808 self.client_initial.secrets.clone(),
4809 retry.source_cid.clone(),
4810 self.source_cid.clone(),
4811 retry.token,
4812 packet_number,
4813 self.client_initial_version,
4814 )?;
4815
4816 self.destination_cid = retry.source_cid.clone();
4817 self.retry_source_cid = Some(retry.source_cid);
4818 self.retry_received = true;
4819 self.client_initial_keys = retry_keys.client;
4820 self.server_initial_keys = retry_keys.server;
4821 self.client_initial = retry_initial.clone();
4822 self.pending_client_initial = Some(retry_initial);
4823 self.next_client_initial_packet_number = packet_number + 1;
4824 self.client_initial_loss_detector = QuicLossDetector::default();
4825 self.client_initial_sent_crypto.clear();
4826 self.recovery = recovery_state_from_transport(&self.fingerprint.transport);
4827 Ok(())
4828 }
4829
4830 fn restart_for_version_negotiation(&mut self, chosen_version: u32) -> Result<()> {
4831 let new_source_cid = random_connection_id(self.source_cid.as_bytes().len())?;
4832 let mut new_tls =
4833 NativeQuicTlsSession::client_with_initial_source_connection_id_and_verify_peer(
4834 &self.server_name,
4835 &self.fingerprint,
4836 &new_source_cid,
4837 self.tls_fingerprint.as_ref(),
4838 self.verify_peer,
4839 &self.root_certs,
4840 self.use_platform_roots,
4841 )?;
4842 let captured = new_tls.take_client_initial();
4843 let new_initial = build_client_initial_packet_from_capture_with_version_and_size(
4844 captured,
4845 self.destination_cid.clone(),
4846 new_source_cid.clone(),
4847 chosen_version,
4848 self.fingerprint.transport.initial_datagram_size,
4849 )?;
4850 let initial_keys = derive_initial_key_material(self.destination_cid.as_bytes())?;
4851
4852 self.tls = new_tls;
4853 self.source_cid = new_source_cid;
4854 self.client_initial_version = chosen_version;
4855 self.vn_received = true;
4856 self.retry_received = false;
4857 self.retry_source_cid = None;
4858 self.server_initial_or_handshake_seen = false;
4859 self.server_transport_parameters_validated = false;
4860 self.close_draining = false;
4861 self.client_initial = new_initial.clone();
4862 self.pending_client_initial = Some(new_initial);
4863 self.client_initial_keys = initial_keys.client;
4864 self.server_initial_keys = initial_keys.server;
4865 self.client_handshake_keys = None;
4866 self.server_handshake_keys = None;
4867 self.client_application_keys = None;
4868 self.server_application_keys = None;
4869 self.initial_crypto = QuicCryptoAssembler::default();
4870 self.handshake_crypto = QuicCryptoAssembler::default();
4871 self.initial_ack_tracker = QuicAckTracker::default();
4872 self.handshake_ack_tracker = QuicAckTracker::default();
4873 self.application_ack_tracker = QuicAckTracker::default();
4874 self.client_initial_loss_detector = QuicLossDetector::default();
4875 self.client_handshake_loss_detector = QuicLossDetector::default();
4876 self.client_application_loss_detector = QuicLossDetector::default();
4877 self.client_application_flow_control =
4878 QuicApplicationFlowControl::client(&self.fingerprint.transport);
4879 self.client_application_receive_flow_control =
4880 QuicReceiveFlowControl::client(&self.fingerprint.transport);
4881 self.client_initial_sent_crypto.clear();
4882 self.client_handshake_sent_crypto.clear();
4883 self.client_application_sent_streams.clear();
4884 self.client_application_recovery_lost_packets.clear();
4885 self.client_path_validator = QuicPathValidator::default();
4886 self.client_cid_inventory = new_client_cid_inventory(&self.fingerprint, &self.source_cid);
4887 self.client_pmtu_probe = QuicPmtuProbePolicy::from_transport(&self.fingerprint.transport);
4888 self.recovery = recovery_state_from_transport(&self.fingerprint.transport);
4889 self.next_client_initial_packet_number = 1;
4890 self.next_server_initial_packet_number = 0;
4891 self.next_server_handshake_packet_number = 0;
4892 self.next_client_handshake_packet_number = 0;
4893 self.next_server_application_packet_number = 0;
4894 self.next_client_application_packet_number = 0;
4895 self.next_client_bidirectional_stream_id = 0;
4896 self.next_client_unidirectional_stream_id = 2;
4897 self.client_handshake_crypto_offset = 0;
4898 self.client_stream_offsets.clear();
4899 self.server_h3_stream_buffers.clear();
4900 self.server_h3_stream_buffer_offsets.clear();
4901 self.server_h3_stream_types.clear();
4902 Ok(())
4903 }
4904
4905 fn validate_server_transport_parameters_if_available(&mut self) -> Result<()> {
4906 if self.server_transport_parameters_validated {
4907 return Ok(());
4908 }
4909 let peer_transport_parameters = self.tls.peer_transport_parameters();
4910 if peer_transport_parameters.is_empty() {
4911 return Ok(());
4912 }
4913 self.validate_server_transport_parameters(peer_transport_parameters.as_ref())?;
4914 self.server_transport_parameters_validated = true;
4915 Ok(())
4916 }
4917
4918 fn validate_server_transport_parameters(&self, encoded: &[u8]) -> Result<()> {
4919 let mut original_destination_cid = None;
4920 let mut initial_source_cid = None;
4921 let mut retry_source_cid = None;
4922
4923 for parameter in decode_transport_parameters(encoded)? {
4924 match parameter {
4925 TransportParameter::OriginalDestinationConnectionId(value) => {
4926 original_destination_cid = Some(value);
4927 }
4928 TransportParameter::InitialSourceConnectionId(value) => {
4929 initial_source_cid = Some(value);
4930 }
4931 TransportParameter::RetrySourceConnectionId(value) => {
4932 retry_source_cid = Some(value);
4933 }
4934 _ => {}
4935 }
4936 }
4937
4938 let original_destination_cid = original_destination_cid.ok_or_else(|| {
4939 Error::Quic("native H3 server omitted original_destination_connection_id".into())
4940 })?;
4941 if original_destination_cid.as_ref() != self.original_destination_cid.as_bytes() {
4942 return Err(Error::Quic(
4943 "native H3 server original_destination_connection_id mismatch".into(),
4944 ));
4945 }
4946
4947 let initial_source_cid = initial_source_cid.ok_or_else(|| {
4948 Error::Quic("native H3 server omitted initial_source_connection_id".into())
4949 })?;
4950 if initial_source_cid.as_ref() != self.destination_cid.as_bytes() {
4951 return Err(Error::Quic(
4952 "native H3 server initial_source_connection_id mismatch".into(),
4953 ));
4954 }
4955
4956 match (&self.retry_source_cid, retry_source_cid) {
4957 (Some(expected), Some(actual)) if actual.as_ref() == expected.as_bytes() => Ok(()),
4958 (Some(_), Some(_)) => Err(Error::Quic(
4959 "native H3 server retry_source_connection_id mismatch".into(),
4960 )),
4961 (Some(_), None) => Err(Error::Quic(
4962 "native H3 server omitted retry_source_connection_id".into(),
4963 )),
4964 (None, Some(_)) => Err(Error::Quic(
4965 "native H3 server sent unexpected retry_source_connection_id".into(),
4966 )),
4967 (None, None) => Ok(()),
4968 }
4969 }
4970}
4971
4972fn is_version_negotiation_datagram(datagram: &[u8]) -> bool {
4973 datagram.len() >= 5
4974 && datagram[0] & 0x80 != 0
4975 && u32::from_be_bytes([datagram[1], datagram[2], datagram[3], datagram[4]]) == 0
4976}
4977
4978#[allow(clippy::too_many_arguments)]
4979fn build_client_initial_packet_with_token_and_version(
4980 fingerprint: &Http3Fingerprint,
4981 crypto_data: Bytes,
4982 transport_parameters: Bytes,
4983 secrets: Vec<QuicTlsSecret>,
4984 destination_cid: ConnectionId,
4985 source_cid: ConnectionId,
4986 token: Bytes,
4987 packet_number: u64,
4988 version: u32,
4989) -> Result<ClientInitialPacket> {
4990 let header_len_without_length = 1
4991 + 4
4992 + 1
4993 + destination_cid.as_bytes().len()
4994 + 1
4995 + source_cid.as_bytes().len()
4996 + varint_len(token.len() as u64)
4997 + token.len();
4998 let padded_plaintext_len = initial_plaintext_len(
4999 header_len_without_length,
5000 crypto_data.len(),
5001 fingerprint.transport.initial_datagram_size,
5002 );
5003 let payload_len = padded_plaintext_len + AES_GCM_TAG_LEN;
5004 let header = encode_long_header(&LongHeaderPacket {
5005 packet_type: LongHeaderType::Initial,
5006 version,
5007 destination_cid: destination_cid.clone(),
5008 source_cid,
5009 token,
5010 packet_number,
5011 packet_number_len: INITIAL_PACKET_NUMBER_LEN,
5012 payload_len,
5013 })?;
5014 let packet_number_offset = header
5015 .len()
5016 .checked_sub(INITIAL_PACKET_NUMBER_LEN)
5017 .ok_or_else(|| Error::HttpProtocol("invalid QUIC Initial header length".into()))?;
5018 let keys = derive_initial_key_material(destination_cid.as_bytes())?;
5019 let packet = build_initial_crypto_packet(
5020 &keys.client,
5021 packet_number,
5022 &header,
5023 packet_number_offset,
5024 INITIAL_PACKET_NUMBER_LEN,
5025 &crypto_data,
5026 padded_plaintext_len,
5027 )?;
5028
5029 Ok(ClientInitialPacket {
5030 packet,
5031 header,
5032 packet_number_offset,
5033 crypto_data,
5034 transport_parameters,
5035 secrets,
5036 })
5037}
5038
5039fn random_connection_id(len: usize) -> Result<ConnectionId> {
5040 let mut bytes = vec![0u8; len];
5041 getrandom_fill(&mut bytes)
5042 .map_err(|err| Error::Quic(format!("native H3 connection id RNG failed: {err}")))?;
5043 ConnectionId::from_bytes(Bytes::from(bytes))
5044}
5045
5046fn initial_plaintext_len(
5047 header_len_without_length: usize,
5048 crypto_data_len: usize,
5049 initial_datagram_size: usize,
5050) -> usize {
5051 let target_datagram_len = initial_datagram_size.max(1200);
5052 let crypto_frame_len = 1 + 1 + varint_len(crypto_data_len as u64) + crypto_data_len;
5053 let mut padded_len = crypto_frame_len;
5054 loop {
5055 let payload_len = padded_len + AES_GCM_TAG_LEN;
5056 let header_len = header_len_without_length
5057 + varint_len((payload_len + INITIAL_PACKET_NUMBER_LEN) as u64)
5058 + INITIAL_PACKET_NUMBER_LEN;
5059 if header_len + payload_len >= target_datagram_len {
5060 return padded_len;
5061 }
5062 padded_len = target_datagram_len - header_len - AES_GCM_TAG_LEN;
5063 }
5064}
5065
5066fn varint_len(value: u64) -> usize {
5067 match value {
5068 0..=0x3f => 1,
5069 0x40..=0x3fff => 2,
5070 0x4000..=0x3fff_ffff => 4,
5071 _ => 8,
5072 }
5073}
5074
5075fn build_server_crypto_packet(
5076 packet_type: LongHeaderType,
5077 keys: &QuicPacketKeyMaterial,
5078 destination_cid: &ConnectionId,
5079 source_cid: &ConnectionId,
5080 packet_number: u64,
5081 crypto_offset: u64,
5082 crypto_data: Bytes,
5083) -> Result<ServerHandshakePacket> {
5084 let packet_number_len = 2;
5085 let frame = encode_frame(&QuicFrame::Crypto {
5086 offset: crypto_offset,
5087 data: crypto_data.clone(),
5088 });
5089 let header = encode_long_header(&LongHeaderPacket {
5090 packet_type,
5091 version: 1,
5092 destination_cid: destination_cid.clone(),
5093 source_cid: source_cid.clone(),
5094 token: Bytes::new(),
5095 packet_number,
5096 packet_number_len,
5097 payload_len: frame.len() + 16,
5098 })?;
5099 let packet_number_offset = header
5100 .len()
5101 .checked_sub(packet_number_len)
5102 .ok_or_else(|| Error::HttpProtocol("invalid QUIC server long-header length".into()))?;
5103 let packet = protect_long_header_packet(
5104 keys,
5105 packet_number,
5106 &header,
5107 packet_number_offset,
5108 packet_number_len,
5109 &frame,
5110 )?;
5111
5112 Ok(ServerHandshakePacket {
5113 packet,
5114 packet_type,
5115 packet_number,
5116 packet_number_offset,
5117 crypto_data,
5118 })
5119}
5120
5121fn apply_h3_stream_frame(
5122 buffers: &mut BTreeMap<u64, BytesMut>,
5123 buffer_offsets: &mut BTreeMap<u64, u64>,
5124 stream_types: &mut BTreeMap<u64, native::H3StreamType>,
5125 stream_id: u64,
5126 offset: Option<u64>,
5127 fin: bool,
5128 data: Bytes,
5129) -> Result<Option<ServerH3StreamEvent>> {
5130 let (stream_type, frames) = if data.is_empty() {
5131 (stream_types.get(&stream_id).copied(), Vec::new())
5132 } else if is_unidirectional_stream(stream_id) {
5133 let stream_type = if let Some(stream_type) = stream_types.get(&stream_id).copied() {
5134 let buffer = buffers.entry(stream_id).or_default();
5135 buffer.extend_from_slice(&data);
5136 stream_type
5137 } else {
5138 let buffer = buffers.entry(stream_id).or_default();
5139 buffer.extend_from_slice(&data);
5140 let stream = match native::decode_unidirectional_stream(buffer.as_ref()) {
5141 Ok(stream) => stream,
5142 Err(error) if !fin && is_incomplete_h3_data_error(&error) => {
5143 return Ok(None);
5144 }
5145 Err(error) => return Err(error),
5146 };
5147 stream_types.insert(stream_id, stream.stream_type);
5148 *buffer = BytesMut::from(stream.payload.as_ref());
5149 stream.stream_type
5150 };
5151 let buffer = buffers.entry(stream_id).or_default();
5152 let frames = if buffer.is_empty() {
5153 Vec::new()
5154 } else if !matches!(stream_type, native::H3StreamType::Control) {
5155 buffer.clear();
5156 Vec::new()
5157 } else {
5158 match native::decode_frames(buffer.as_ref()) {
5159 Ok(frames) => {
5160 buffer.clear();
5161 frames
5162 }
5163 Err(error) if !fin && is_incomplete_h3_data_error(&error) => {
5164 return Ok(None);
5165 }
5166 Err(error) => return Err(error),
5167 }
5168 };
5169 (Some(stream_type), frames)
5170 } else {
5171 let stream_offset = offset.unwrap_or(0);
5172 let buffer_base = *buffer_offsets.entry(stream_id).or_insert(0);
5173 let buffer = buffers.entry(stream_id).or_default();
5174 let buffered_end = buffer_base
5175 .checked_add(buffer.len() as u64)
5176 .ok_or_else(|| Error::HttpProtocol("native H3 stream range overflow".into()))?;
5177 let data_end = stream_offset
5178 .checked_add(data.len() as u64)
5179 .ok_or_else(|| Error::HttpProtocol("native H3 stream range overflow".into()))?;
5180 if data_end <= buffer_base || stream_offset > buffered_end {
5181 return Ok(None);
5182 }
5183 let already_buffered = usize::try_from(buffered_end - stream_offset)
5184 .map_err(|_| Error::HttpProtocol("native H3 stream overlap exceeds usize".into()))?;
5185 if already_buffered < data.len() {
5186 buffer.extend_from_slice(&data[already_buffered..]);
5187 }
5188 match native::decode_frames(buffer.as_ref()) {
5189 Ok(frames) => {
5190 let consumed = buffer.len() as u64;
5191 buffer.clear();
5192 buffer_offsets.insert(
5193 stream_id,
5194 buffer_base.checked_add(consumed).ok_or_else(|| {
5195 Error::HttpProtocol("native H3 stream range overflow".into())
5196 })?,
5197 );
5198 (None, frames)
5199 }
5200 Err(error) if !fin && is_incomplete_h3_data_error(&error) => {
5201 return Ok(None);
5202 }
5203 Err(error) => return Err(error),
5204 }
5205 };
5206
5207 Ok(Some(ServerH3StreamEvent {
5208 stream_id,
5209 stream_type,
5210 fin,
5211 frames,
5212 }))
5213}
5214
5215fn is_unidirectional_stream(stream_id: u64) -> bool {
5216 stream_id & 0x02 != 0
5217}
5218
5219fn is_bidirectional_stream(stream_id: u64) -> bool {
5220 !is_unidirectional_stream(stream_id)
5221}
5222
5223fn stream_initiator(stream_id: u64) -> u64 {
5224 stream_id & 0x01
5225}
5226
5227fn is_ack_eliciting_quic_frame(frame: &QuicFrame) -> bool {
5228 !matches!(
5229 frame,
5230 QuicFrame::Padding | QuicFrame::Ack { .. } | QuicFrame::AckEcn { .. }
5231 )
5232}
5233
5234fn is_not_padding_frame(frame: &QuicFrame) -> bool {
5235 !matches!(frame, QuicFrame::Padding)
5236}
5237
5238fn padded_short_header_payload(payload: Bytes) -> Bytes {
5239 const MIN_SHORT_HEADER_PAYLOAD_LEN: usize = 24;
5240 if payload.len() >= MIN_SHORT_HEADER_PAYLOAD_LEN {
5241 return payload;
5242 }
5243 let mut padded = payload.to_vec();
5244 padded.resize(MIN_SHORT_HEADER_PAYLOAD_LEN, 0);
5245 Bytes::from(padded)
5246}
5247
5248fn is_incomplete_h3_data_error(error: &Error) -> bool {
5249 let message = error.to_string();
5250 message.contains("truncated HTTP/3 frame")
5251 || message.contains("missing HTTP/3 varint")
5252 || message.contains("truncated HTTP/3 varint")
5253}
5254
5255fn build_ack_packet(
5256 packet_type: LongHeaderType,
5257 keys: &QuicPacketKeyMaterial,
5258 destination_cid: &ConnectionId,
5259 source_cid: &ConnectionId,
5260 tracker: &mut QuicAckTracker,
5261 packet_number: u64,
5262) -> Result<Option<ClientAckPacket>> {
5263 if tracker.is_empty() {
5264 return Ok(None);
5265 }
5266
5267 let packet_number_len = 2;
5268 let frame = encode_frame(&tracker.to_ack_frame(0)?);
5269 let header = encode_long_header(&LongHeaderPacket {
5270 packet_type,
5271 version: 1,
5272 destination_cid: destination_cid.clone(),
5273 source_cid: source_cid.clone(),
5274 token: Bytes::new(),
5275 packet_number,
5276 packet_number_len,
5277 payload_len: frame.len() + 16,
5278 })?;
5279 let packet_number_offset = header
5280 .len()
5281 .checked_sub(packet_number_len)
5282 .ok_or_else(|| Error::HttpProtocol("invalid QUIC ACK header length".into()))?;
5283 let packet = protect_long_header_packet(
5284 keys,
5285 packet_number,
5286 &header,
5287 packet_number_offset,
5288 packet_number_len,
5289 &frame,
5290 )?;
5291 tracker.mark_ack_sent();
5292
5293 Ok(Some(ClientAckPacket {
5294 packet,
5295 packet_type,
5296 packet_number,
5297 packet_number_offset,
5298 }))
5299}
5300
5301#[cfg(test)]
5302mod receive_flow_control_tests {
5303 use super::*;
5304 use crate::fingerprint::QuicTransportParams;
5305
5306 fn flow_control_params() -> QuicTransportParams {
5312 let mut params = QuicTransportParams::chrome();
5313 params.initial_max_data = 100;
5314 params.initial_max_stream_data_bidi_local = 40;
5315 params.initial_max_stream_data_bidi_remote = 40;
5316 params.initial_max_stream_data_uni = 40;
5317 params.max_connection_window = 100_000;
5318 params.max_stream_window = 100_000;
5319 params
5320 }
5321
5322 fn client_flow_control() -> QuicReceiveFlowControl {
5323 QuicReceiveFlowControl::client(&flow_control_params())
5324 }
5325
5326 #[test]
5332 fn record_stream_consumed_advertises_initial_plus_drained_per_stream() {
5333 let mut fc = client_flow_control();
5334 let stream_id = 0;
5335
5336 fc.record_stream_consumed(stream_id, 5)
5337 .expect("first drain");
5338 assert!(fc.take_update_frames().is_empty());
5340
5341 fc.record_stream_consumed(stream_id, 20)
5342 .expect("threshold-crossing drain");
5343 let frames = fc.take_update_frames();
5347 assert_eq!(
5348 frames,
5349 vec![QuicFrame::MaxStreamData {
5350 stream_id,
5351 max_stream_data: 65,
5352 }]
5353 );
5354
5355 fc.record_stream_consumed(stream_id, 5)
5358 .expect("small drain");
5359 assert!(fc.take_update_frames().is_empty());
5360
5361 fc.record_stream_consumed(stream_id, 16)
5366 .expect("next threshold");
5367 let frames = fc.take_update_frames();
5368 assert_eq!(
5369 frames,
5370 vec![QuicFrame::MaxStreamData {
5371 stream_id,
5372 max_stream_data: 86,
5373 }]
5374 );
5375 }
5376
5377 #[test]
5382 fn record_stream_consumed_aggregates_connection_level_across_streams() {
5383 let mut fc = client_flow_control();
5384 let stream_a = 0;
5386 let stream_b = 4;
5387
5388 fc.record_stream_consumed(stream_a, 30).expect("a drains");
5392 fc.record_stream_consumed(stream_b, 30).expect("b drains");
5393 let mut frames = fc.take_update_frames();
5394 frames.sort_by_key(|frame| match frame {
5395 QuicFrame::MaxData(_) => 0u8,
5396 QuicFrame::MaxStreamData { stream_id, .. } => 1 + (*stream_id as u8 % 8),
5397 _ => 255,
5398 });
5399
5400 assert!(frames.contains(&QuicFrame::MaxData(160)));
5402 assert!(frames.contains(&QuicFrame::MaxStreamData {
5404 stream_id: stream_a,
5405 max_stream_data: 70,
5406 }));
5407 assert!(frames.contains(&QuicFrame::MaxStreamData {
5408 stream_id: stream_b,
5409 max_stream_data: 70,
5410 }));
5411 }
5412
5413 #[test]
5417 fn threshold_gates_emit_but_does_not_round_absolute_value() {
5418 let mut fc = client_flow_control();
5419 let stream_id = 0;
5420
5421 for _ in 0..19 {
5424 fc.record_stream_consumed(stream_id, 1).expect("tiny drain");
5425 assert!(fc.take_update_frames().is_empty());
5426 }
5427
5428 fc.record_stream_consumed(stream_id, 1)
5431 .expect("crossing drain");
5432 let frames = fc.take_update_frames();
5433 assert_eq!(
5434 frames,
5435 vec![QuicFrame::MaxStreamData {
5436 stream_id,
5437 max_stream_data: 60,
5438 }]
5439 );
5440 }
5441
5442 #[test]
5447 fn release_stream_drops_per_stream_state_without_double_counting_connection() {
5448 let mut fc = client_flow_control();
5449 let stream_a = 0;
5450 let stream_b = 4;
5451
5452 fc.record_stream_consumed(stream_a, 40)
5453 .expect("a fully drains");
5454 let _ = fc.take_update_frames();
5455 fc.release_stream(stream_a);
5457 assert!(
5458 !fc.stream_consumed.contains_key(&stream_a),
5459 "release_stream must clear per-stream consumed bookkeeping"
5460 );
5461 assert!(
5462 !fc.last_announced_max_stream_data.contains_key(&stream_a),
5463 "release_stream must clear per-stream announced bookkeeping"
5464 );
5465
5466 fc.record_stream_consumed(stream_b, 30)
5469 .expect("b drains after a retired");
5470 let frames = fc.take_update_frames();
5471 assert!(frames.contains(&QuicFrame::MaxData(170)));
5474 assert!(frames.contains(&QuicFrame::MaxStreamData {
5477 stream_id: stream_b,
5478 max_stream_data: 70,
5479 }));
5480 }
5481
5482 #[test]
5487 fn observe_stream_frame_uses_advertised_limit_after_consume_grows_window() {
5488 let mut fc = client_flow_control();
5489 let stream_id = 0;
5490
5491 let too_much = fc.observe_stream_frame(stream_id, Some(0), 41);
5494 assert!(too_much.is_err(), "must reject data above initial limit");
5495
5496 fc.observe_stream_frame(stream_id, Some(0), 40)
5498 .expect("fill initial window");
5499 fc.record_stream_consumed(stream_id, 40)
5500 .expect("drain initial window");
5501 let _ = fc.take_update_frames();
5502
5503 fc.observe_stream_frame(stream_id, Some(40), 40)
5505 .expect("data within newly advertised window");
5506 }
5507}